In this post, we’ll look at MySQL CDC, streaming binary logs and asynchronous triggers.
What is Change Data Capture and why do we need it?
Change Data Capture (CDC) tracks data changes, usually close to real time. In MySQL, the easiest and probably most efficient way to track data changes is to use binary logs. However, other approaches exist. For example:
- General log or Audit Log Plugin (which logs all queries, not just the changes)
- MySQL triggers (not recommended, as they can slow down the application; more on this below)
One of the first implementations of CDC for MySQL was the FlexCDC project by Justin Swanhart. Nowadays, there are a lot of CDC implementations (see mysql-cdc-projects wiki for a long list).
CDC can be used for various tasks, such as auditing, copying data to another system, or processing (and reacting to) events. In this blog post, I will demonstrate how to use a CDC approach to stream MySQL binary logs, process events and save them to another MySQL instance (or MongoDB). In addition, I will show how to implement asynchronous triggers by streaming binary logs.
Streaming binary logs
You can read binary logs using the mysqlbinlog utility with the “-vvv” (verbose) option. mysqlbinlog can also show a human-readable version of ROW-based replication events. For example:
```
# mysqlbinlog -vvv /var/lib/mysql/master.000001
BINLOG '
JxiqVxMBAAAALAAAAI7LegAAAHQAAAAAAAEABHRlc3QAAWEAAQMAAUTAFAY=
JxiqVx4BAAAAKAAAALbLegAAAHQAAAAAAAEAAgAB//5kAAAAedRLHg==
'/*!*/;
### INSERT INTO `test`.`a`
### SET
###   @1=100 /* INT meta=0 nullable=1 is_null=0 */
# at 8047542
#160809 17:51:35 server id 1  end_log_pos 8047573 CRC32 0x56b36ca5  Xid = 24453
COMMIT/*!*/;
```
Starting with MySQL 5.6, mysqlbinlog can also read binary log events from a remote master, acting as a “fake” replication slave (via the --read-from-remote-server option).
Reading binary logs is a great basis for CDC. However, there are still some challenges:
- ROW-based replication is probably the easiest way to get the raw changes; otherwise we would have to parse SQL. At the same time, ROW-based binary logs do not contain the table metadata: they record only field numbers, not field names (in the example above, “@1” is the first field in table “a”).
- We will need to somehow record and store the binary log positions so that the tool can be restarted at any time and proceed from the last position (like a MySQL replication slave).
Maxwell’s daemon (Maxwell = Mysql + Kafka), an application recently released by Zendesk, reads MySQL binlogs and writes row updates as JSON (it can write to Kafka, which is its primary goal, but can also write to stdout and can be extended for other purposes). Maxwell stores the metadata about MySQL tables and binary log events (and other metadata) inside MySQL, so it solves the potential issues from the above list.
Here is a quick demo of Maxwell:
Session 1 (Insert into MySQL):
```
mysql> insert into a (i) values (151);
Query OK, 1 row affected (0.00 sec)

mysql> update a set i = 300 limit 5;
Query OK, 5 rows affected (0.01 sec)
Rows matched: 5  Changed: 5  Warnings: 0
```
Session 2 (starting Maxwell):
```
$ ./bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=stdout
16:00:15,303 INFO  Maxwell - Maxwell is booting (StdoutProducer), starting at BinlogPosition[master.000001:15494460]
16:00:15,327 INFO  TransportImpl - connecting to host: 127.0.0.1, port: 3306
16:00:15,350 INFO  TransportImpl - connected to host: 127.0.0.1, port: 3306, context: AbstractTransport.Context[threadId=9,...
16:00:15,350 INFO  AuthenticatorImpl - start to login, user: maxwell, host: 127.0.0.1, port: 3306
16:00:15,354 INFO  AuthenticatorImpl - login successfully, user: maxwell, detail: OKPacket[packetMarker=0,affectedRows=0,insertId=0,serverStatus=2,warningCount=0,message=<null>]
16:00:15,533 INFO  MysqlSavedSchema - Restoring schema id 1 (last modified at BinlogPosition[master.000001:3921])
{"database":"test","table":"a","type":"insert","ts":1472937475,"xid":211209,"commit":true,"data":{"i":151}}
{"database":"test","table":"a","type":"insert","ts":1472937475,"xid":211209,"commit":true,"data":{"i":151}}
{"database":"test","table":"a","type":"update","ts":1472937535,"xid":211333,"data":{"i":300},"old":{"i":150}}
{"database":"test","table":"a","type":"update","ts":1472937535,"xid":211333,"data":{"i":300},"old":{"i":150}}
{"database":"test","table":"a","type":"update","ts":1472937535,"xid":211333,"data":{"i":300},"old":{"i":150}}
{"database":"test","table":"a","type":"update","ts":1472937535,"xid":211333,"data":{"i":300},"old":{"i":150}}
{"database":"test","table":"a","type":"update","ts":1472937535,"xid":211333,"commit":true,"data":{"i":300},"old":{"i":150}}
```
As we can see in this example, Maxwell gets the events from the MySQL replication stream and outputs them to stdout (if we change the producer, it can write them to Apache Kafka instead).
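Because each output line is a self-contained JSON document, an “asynchronous trigger” boils down to parsing each line and dispatching it to a handler outside the original transaction. The sketch below illustrates the idea; the function and handler names are mine, not part of Maxwell:

```javascript
// Dispatch one Maxwell JSON event line to a handler keyed by "db.table:type".
// Unlike a real MySQL trigger, this runs asynchronously: the original
// transaction has already committed and is not slowed down.
function handleEvent(line, handlers) {
  const event = JSON.parse(line);
  const key = `${event.database}.${event.table}:${event.type}`;
  if (handlers[key]) {
    handlers[key](event);
  }
  return event;
}

// Example: react to every insert into test.a
const seen = [];
const handlers = {
  'test.a:insert': (e) => seen.push(e.data.i)
};

handleEvent(
  '{"database":"test","table":"a","type":"insert","ts":1472937475,"data":{"i":151}}',
  handlers
);
console.log(seen); // prints [ 151 ]
```

In a real pipeline, the handler would do the work a trigger would normally do (audit logging, cache invalidation, notifications) without adding latency to the write path.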
Saving binlog events to MySQL document store or MongoDB
If we want to save the events somewhere else, we can use MongoDB or MySQL JSON fields and the document store (as Maxwell provides us with JSON documents). For a simple proof of concept, I’ve created Node.js scripts to implement a CDC “pipeline”:
```javascript
var mysqlx = require('mysqlx');

var mySession = mysqlx.getSession({
    host: '10.0.0.2',
    port: 33060,
    dbUser: 'root',
    dbPassword: 'xxx'
});

process.on('SIGINT', function() {
    console.log("Caught interrupt signal. Exiting...");
    process.exit();
});

process.stdin.setEncoding('utf8');

process.stdin.on('readable', () => {
    var chunk = process.stdin.read();
    if (chunk != null) {
        process.stdout.write(`data: ${chunk}`);
        mySession.then(session => {
            session.getSchema("mysqlcdc").getCollection("mysqlcdc")
                .add(JSON.parse(chunk))
                .execute(function (row) {
                    // can log something here
                })
                .catch(err => {
                    console.log(err);
                })
                .then(function (notices) {
                    console.log("Wrote to MySQL: " + JSON.stringify(notices));
                });
        }).catch(function (err) {
            console.log(err);
            process.exit();
        });
    }
});

process.stdin.on('end', () => {
    process.stdout.write('end');
    process.stdin.resume();
});
```
And to run it we can use the pipeline:
```
./bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=stdout --log_level=ERROR | node ./maxwell_to_mysql.js
```
The same approach can be used to save the CDC events to MongoDB with mongoimport:
```
$ ./bin/
```