In this post, we’ll look at MySQL CDC, streaming binary logs and asynchronous triggers.
Change Data Capture (CDC) tracks data changes, usually in close to real time. In MySQL, the easiest and probably most efficient way to track data changes is to use binary logs, although other approaches (such as triggers or polling timestamp columns) exist as well.
One of the first implementations of CDC for MySQL was the FlexCDC project by Justin Swanhart. Nowadays, there are a lot of CDC implementations (see the mysql-cdc-projects wiki for a long list).
CDC can be implemented for various tasks, such as auditing, copying data to another system, or processing (and reacting to) events. In this blog post, I will demonstrate how to use a CDC approach to stream MySQL binary logs, process events, and save them (stream them) to another MySQL instance or to MongoDB. In addition, I will show how to implement asynchronous triggers by streaming binary logs.
You can read binary logs using the mysqlbinlog utility with the “-vvv” (verbose) option, which also prints a human-readable version of ROW-based replication events. For example:
```
# mysqlbinlog -vvv /var/lib/mysql/master.000001

BINLOG '
JxiqVxMBAAAALAAAAI7LegAAAHQAAAAAAAEABHRlc3QAAWEAAQMAAUTAFAY=
JxiqVx4BAAAAKAAAALbLegAAAHQAAAAAAAEAAgAB//5kAAAAedRLHg==
'/*!*/;
### INSERT INTO `test`.`a`
### SET
###   @1=100 /* INT meta=0 nullable=1 is_null=0 */
# at 8047542
#160809 17:51:35 server id 1  end_log_pos 8047573 CRC32 0x56b36ca5  Xid = 24453
COMMIT/*!*/;
```
Starting with MySQL 5.6, mysqlbinlog can also read binary log events from a remote master, acting as a “fake” replication slave.
Reading binary logs is a great basis for CDC. However, there are still some challenges: ROW-based events do not carry table metadata (as seen above, columns appear only as @1, @2, and so on, so the consumer has to track table definitions itself), and the consumer also has to persist its binlog position so it can resume correctly after restarts and binlog rotation.
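To make the position-tracking challenge concrete, here is a minimal sketch of how a binlog consumer might checkpoint its coordinates between restarts. This is an illustration only; the file name and JSON layout are made up for this example (real tools like Maxwell store this state in MySQL instead):

```python
import json
import os
import tempfile

CHECKPOINT_FILE = "binlog_position.json"  # hypothetical checkpoint location

def save_position(log_file, offset, path=CHECKPOINT_FILE):
    """Atomically persist the last processed binlog coordinates."""
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
    with os.fdopen(fd, "w") as f:
        json.dump({"file": log_file, "position": offset}, f)
    os.replace(tmp, path)  # atomic rename: never leaves a half-written checkpoint

def load_position(path=CHECKPOINT_FILE):
    """Return the saved coordinates, or None when starting fresh."""
    try:
        with open(path) as f:
            return json.load(f)
    except FileNotFoundError:
        return None

save_position("master.000001", 8047573)
print(load_position())  # {'file': 'master.000001', 'position': 8047573}
```

On restart, the consumer would reconnect and resume from the saved file/offset pair instead of re-reading the whole log.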
Maxwell’s daemon (Maxwell = MySQL + Kafka), an application recently released by Zendesk, reads MySQL binlogs and writes row updates as JSON. It can write to Kafka, which is its primary goal, but it can also write to stdout and can be extended for other purposes. Maxwell stores the metadata about MySQL tables and binary log events (and other metadata) inside MySQL, which addresses the challenges above.
Here is a quick demo of Maxwell:
Session 1 (Insert into MySQL):
```
mysql> insert into a (i) values (151);
Query OK, 1 row affected (0.00 sec)

mysql> update a set i = 300 limit 5;
Query OK, 5 rows affected (0.01 sec)
Rows matched: 5  Changed: 5  Warnings: 0
```
Session 2 (starting Maxwell):
```
$ ./bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=stdout
16:00:15,303 INFO  Maxwell - Maxwell is booting (StdoutProducer), starting at BinlogPosition[master.000001:15494460]
16:00:15,327 INFO  TransportImpl - connecting to host: 127.0.0.1, port: 3306
16:00:15,350 INFO  TransportImpl - connected to host: 127.0.0.1, port: 3306, context: AbstractTransport.Context[threadId=9,...
16:00:15,350 INFO  AuthenticatorImpl - start to login, user: maxwell, host: 127.0.0.1, port: 3306
16:00:15,354 INFO  AuthenticatorImpl - login successfully, user: maxwell, detail: OKPacket[packetMarker=0,affectedRows=0,insertId=0,serverStatus=2,warningCount=0,message=<null>]
16:00:15,533 INFO  MysqlSavedSchema - Restoring schema id 1 (last modified at BinlogPosition[master.000001:3921])

{"database":"test","table":"a","type":"insert","ts":1472937475,"xid":211209,"commit":true,"data":{"i":151}}
{"database":"test","table":"a","type":"update","ts":1472937535,"xid":211333,"data":{"i":300},"old":{"i":150}}
{"database":"test","table":"a","type":"update","ts":1472937535,"xid":211333,"data":{"i":300},"old":{"i":150}}
{"database":"test","table":"a","type":"update","ts":1472937535,"xid":211333,"data":{"i":300},"old":{"i":150}}
{"database":"test","table":"a","type":"update","ts":1472937535,"xid":211333,"data":{"i":300},"old":{"i":150}}
{"database":"test","table":"a","type":"update","ts":1472937535,"xid":211333,"commit":true,"data":{"i":300},"old":{"i":150}}
```
As we can see in this example, Maxwell gets the events from the MySQL replication stream and outputs them to stdout (if we change the producer, it can send them to Apache Kafka).
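Note that each event in the output carries an `xid` (transaction ID), and the last event of a transaction is marked with `"commit":true`. A consumer can use this to reassemble whole transactions. A minimal sketch (the event strings below are shortened versions of the output above):

```python
import json

def group_transactions(lines):
    """Group Maxwell JSON events into transactions, using the
    'commit': true marker Maxwell puts on a transaction's last event."""
    current, transactions = [], []
    for line in lines:
        event = json.loads(line)
        current.append(event)
        if event.get("commit"):
            transactions.append(current)
            current = []
    return transactions

stream = [
    '{"database":"test","table":"a","type":"insert","xid":211209,"commit":true,"data":{"i":151}}',
    '{"database":"test","table":"a","type":"update","xid":211333,"data":{"i":300},"old":{"i":150}}',
    '{"database":"test","table":"a","type":"update","xid":211333,"commit":true,"data":{"i":300},"old":{"i":150}}',
]
txns = group_transactions(stream)
print(len(txns))  # 2
```

This matters if the downstream system should apply changes with the same transactional boundaries as the source.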
If we want to save the events somewhere else, we can use MongoDB, or MySQL JSON fields and the document store (as Maxwell provides us with JSON documents). For a simple proof of concept, I’ve created a Node.js script to implement a CDC “pipeline”:
```javascript
var mysqlx = require('mysqlx');
var mySession = mysqlx.getSession({
    host: '10.0.0.2',
    port: 33060,
    dbUser: 'root',
    dbPassword: 'xxx'
});

process.on('SIGINT', function() {
    console.log("Caught interrupt signal. Exiting...");
    process.exit()
});

process.stdin.setEncoding('utf8');

process.stdin.on('readable', () => {
    var chunk = process.stdin.read();
    if (chunk != null) {
        process.stdout.write(`data: ${chunk}`);
        mySession.then(session => {
            session.getSchema("mysqlcdc").getCollection("mysqlcdc")
                .add( JSON.parse(chunk) )
                .execute(function (row) {
                    // can log something here
                }).catch(err => {
                    console.log(err);
                })
                .then( function (notices) {
                    console.log("Wrote to MySQL: " + JSON.stringify(notices))
                });
        }).catch(function (err) {
            console.log(err);
            process.exit();
        });
    }
});

process.stdin.on('end', () => {
    process.stdout.write('end');
    process.stdin.resume();
});
```
And to run it we can use the pipeline:
```
./bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=stdout --log_level=ERROR | node ./maxwell_to_mysql.js
```
The same approach can be used to save the CDC events to MongoDB with mongoimport:
```
$ ./bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=stdout --log_level=ERROR | mongoimport -d mysqlcdc -c mysqlcdc --host localhost:27017
```
In the above example, we only recorded the binary log events. Now we can add “reactions”.
One practical application is re-implementing MySQL triggers as something more performant. MySQL triggers are executed for each row and are synchronous (the query does not return until the trigger event finishes). This is known to cause poor performance, and it can significantly slow down bulk operations (i.e., “load data infile” or “insert into … values (…), (…)”): with triggers, MySQL has to process the “bulk” operations row by row, killing performance. In addition, when using statement-based replication, triggers on the slave can slow down the replication thread (this is much less relevant nowadays with ROW-based replication and potentially multi-threaded slaves).
With the ability to read binary logs from MySQL (using Maxwell), we can process the events and re-implement triggers — now in asynchronous mode — without delaying MySQL operations. As Maxwell gives us a JSON document with the “new” and “old” values (with the default option binlog_row_image=FULL, MySQL records the previous values for updates and deletes), we can use it to re-create triggers.
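Because an update event carries both `data` (new values) and `old` (previous values of the columns that changed, as in the update output shown earlier), a trigger replacement can determine exactly which columns changed. A small illustrative sketch, assuming Maxwell's `old` object lists only the changed columns:

```python
def changed_columns(event):
    """For a Maxwell 'update' event, return {column: (old_value, new_value)}.
    Assumes the 'old' object contains the previous values of changed columns
    (complete before-images require binlog_row_image=FULL on the server)."""
    old = event.get("old", {})
    new = event["data"]
    return {col: (old[col], new[col]) for col in old}

# Event shaped like the update rows in the Maxwell demo output above
event = {"database": "test", "table": "a", "type": "update",
         "data": {"i": 300}, "old": {"i": 150}}
print(changed_columns(event))  # {'i': (150, 300)}
```

A trigger replacement can then fire only when a column it cares about actually changed, something a row-level MySQL trigger has to check with OLD./NEW. comparisons.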
Not all triggers can easily be re-implemented from the binary logs. However, in my experience most MySQL triggers are used for auditing changes or for maintaining summary/counter tables.
The general approach to re-implementing triggers with Maxwell: read Maxwell’s JSON stream, filter events by database, table and event type, then generate and execute the SQL that the trigger would have run.
For example, if I want to audit all deletes in the “transactions” table, I can do it with Maxwell and a simple Python script (do not use this in production; it is a very basic sample):
```python
import json,sys

line = sys.stdin.readline()
while line:
    print line,
    obj=json.loads(line);
    if obj["type"] == "delete":
        print "INSERT INTO transactions_delete_log VALUES ('" + str(obj["data"]) + "', Now() )"
    line = sys.stdin.readline()
```
MySQL:
```
mysql> delete from transactions where user_id = 2;
Query OK, 1 row affected (0.00 sec)
```
Maxwell pipeline:
```
$ ./bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=stdout --log_level=ERROR | python trigger.py

{"database":"test","table":"transactions","type":"delete","ts":1472942384,"xid":214395,"commit":true,"data":{"id":2,"user_id":2,"value":2,"last_updated":"2016-09-03 22:39:31"}}
INSERT INTO transactions_delete_log VALUES ('{u'last_updated': u'2016-09-03 22:39:31', u'user_id': 2, u'id': 2, u'value': 2}', Now() )
```
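The other common trigger use case, maintaining a summary (counter) table, can be re-implemented the same way. Here is a hypothetical sketch that translates Maxwell events for the “transactions” table into counter updates; the `user_txn_counts` table is made up for this example, and unlike the string concatenation in the quick demo above, it produces parameterized SQL suitable for a MySQL driver:

```python
import json

def counter_sql(line):
    """Translate a Maxwell event into an (sql, params) pair for a
    hypothetical per-user counter table, i.e. what an INSERT/DELETE
    trigger on `transactions` would maintain. Returns None for
    events this 'trigger' does not care about."""
    obj = json.loads(line)
    if obj["table"] != "transactions":
        return None
    if obj["type"] == "insert":
        return ("INSERT INTO user_txn_counts (user_id, cnt) VALUES (%s, 1) "
                "ON DUPLICATE KEY UPDATE cnt = cnt + 1",
                (obj["data"]["user_id"],))
    if obj["type"] == "delete":
        return ("UPDATE user_txn_counts SET cnt = cnt - 1 WHERE user_id = %s",
                (obj["data"]["user_id"],))
    return None

sql, params = counter_sql('{"database":"test","table":"transactions",'
                          '"type":"insert","data":{"id":3,"user_id":2,"value":5}}')
print(params)  # (2,)
```

Because this runs outside the write path, a heavy bulk load on `transactions` is no longer slowed down row by row; the counters simply catch up asynchronously.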
Maxwell was designed for MySQL 5.6 with ROW-based replication. Although it can work with MySQL 5.7, it does not support the new MySQL 5.7 data types (i.e., JSON fields). Maxwell does not support GTID and can’t fail over based on GTID (it can parse events with GTIDs, though).
Streaming MySQL binary logs (for example, with the Maxwell application) can help implement CDC for auditing and other purposes, as well as asynchronous triggers (removing MySQL-level triggers can increase MySQL performance).