 In this post, we’ll look at MySQL CDC, streaming binary logs and asynchronous triggers.
In this post, we’ll look at MySQL CDC, streaming binary logs and asynchronous triggers.
What is Change Data Capture and why do we need it?
Change Data Capture (CDC) tracks data changes (usually close to realtime). In MySQL, the easiest and probably most efficient way to track data changes is to use binary logs. However, other approaches exist. For example:
- General log or Audit Log Plugin (which logs all queries, not just the changes)
- MySQL triggers (not recommended, as it can slow down the application — more below)
One of the first implementations of CDC for MySQL was the FlexCDC project by Justin Swanhart. Nowadays, there are a lot of CDC implementations (see mysql-cdc-projects wiki for a long list).
CDC can be implemented for various tasks such as auditing, copying data to another system or processing (and reacting to) events. In this blog post, I will demonstrate how to use a CDC approach to stream MySQL binary logs, process events and save it (stream to) another MySQL instance (or MongoDB). In addition, I will show how to implement asynchronous triggers by streaming binary logs.
Streaming binary logs
You can read binary logs using the mysqlbinlog utility, by adding “-vvv” (verbose option). mysqlbinlog can also show human readable version for the ROW based replication. For example:
| 1 2 3 4 5 6 7 8 9 10 11 12 | # mysqlbinlog -vvv /var/lib/mysql/master.000001  BINLOG ' JxiqVxMBAAAALAAAAI7LegAAAHQAAAAAAAEABHRlc3QAAWEAAQMAAUTAFAY= JxiqVx4BAAAAKAAAALbLegAAAHQAAAAAAAEAAgAB//5kAAAAedRLHg== '/*!*/; ### INSERT INTO `test`.`a` ### SET ###   @1=100 /* INT meta=0 nullable=1 is_null=0 */ # at 8047542 #160809 17:51:35 server id 1  end_log_pos 8047573 CRC32 0x56b36ca5      Xid = 24453 COMMIT/*!*/; | 
Starting with MySQL 5.6, mysqlbinlog can also read the binary log events from a remote master (“fake” replication slave).
Reading binary logs is a great basis for CDC. However, there are still some challenges:
- ROW-based replication is probably the easiest way to get the RAW changes, otherwise we will have to parse SQL. At the same time, ROW-based replication binary logs don’t contain the table metadata, i.e. it does not record the field names, only field number (as in the example above “@1” is the first field in table “a”).
- We will need to somehow record and store the binary log positions so that the tool can be restarted at any time and proceed from the last position (like a MySQL replication slave).
Maxwell’s daemon (Maxwell = Mysql + Kafka), an application recently released by Zendesk, reads MySQL binlogs and writes row updates as JSON (it can write to Kafka, which is its primary goal, but can also write to stdout and can be extended for other purposes). Maxwell stores the metadata about MySQL tables and binary log events (and other metadata) inside MySQL, so it solves the potential issues from the above list.
Here is a quick demo of Maxwell:
Session 1 (Insert into MySQL):
| 1 2 3 4 5 6 | mysql> insert into a (i) values (151); Query OK, 1 row affected (0.00 sec) mysql> update a set i = 300 limit 5; Query OK, 5 rows affected (0.01 sec) Rows matched: 5  Changed: 5  Warnings: 0 | 
Session 2 (starting Maxwell):
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | $ ./bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=stdout                                                                            16:00:15,303 INFO  Maxwell - Maxwell is booting (StdoutProducer), starting at BinlogPosition[master.000001:15494460] 16:00:15,327 INFO  TransportImpl - connecting to host: 127.0.0.1, port: 3306 16:00:15,350 INFO  TransportImpl - connected to host: 127.0.0.1, port: 3306, context: AbstractTransport.Context[threadId=9,... 16:00:15,350 INFO  AuthenticatorImpl - start to login, user: maxwell, host: 127.0.0.1, port: 3306 16:00:15,354 INFO  AuthenticatorImpl - login successfully, user: maxwell, detail: OKPacket[packetMarker=0,affectedRows=0,insertId=0,serverStatus=2,warningCount=0,message=<null>] 16:00:15,533 INFO  MysqlSavedSchema - Restoring schema id 1 (last modified at BinlogPosition[master.000001:3921]) {"database":"test","table":"a","type":"insert","ts":1472937475,"xid":211209,"commit":true,"data":{"i":151}} {"database":"test","table":"a","type":"insert","ts":1472937475,"xid":211209,"commit":true,"data":{"i":151}} {"database":"test","table":"a","type":"update","ts":1472937535,"xid":211333,"data":{"i":300},"old":{"i":150}} {"database":"test","table":"a","type":"update","ts":1472937535,"xid":211333,"data":{"i":300},"old":{"i":150}} {"database":"test","table":"a","type":"update","ts":1472937535,"xid":211333,"data":{"i":300},"old":{"i":150}} {"database":"test","table":"a","type":"update","ts":1472937535,"xid":211333,"data":{"i":300},"old":{"i":150}} {"database":"test","table":"a","type":"update","ts":1472937535,"xid":211333,"commit":true,"data":{"i":300},"old":{"i":150}} | 
As we can see in this example, Maxwell get the events from MySQL replication stream and outputs it into stdout (if we change the producer, it can save it to Apache Kafka).
Saving binlog events to MySQL document store or MongoDB
If we want to save the events to some other place we can use MongoDB or MySQL JSON fields and document store (as Maxwell will provide use with JSON documents). For a simple proof of concept, I’ve created nodeJS scripts to implement a CDC “pipleline”:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | var mysqlx = require('mysqlx'); var mySession = mysqlx.getSession({     host: '10.0.0.2',     port: 33060,     dbUser: 'root',     dbPassword: 'xxx' }); process.on('SIGINT', function() {     console.log("Caught interrupt signal. Exiting...");     process.exit() }); process.stdin.setEncoding('utf8'); process.stdin.on('readable', () => {   var chunk = process.stdin.read();   if(chunk != null) {       process.stdout.write(`data: ${chunk}`);     mySession.then(session => {                     session.getSchema("mysqlcdc").getCollection("mysqlcdc")                     .add(  JSON.parse(chunk)  ) .execute(function (row) {                             // can log something here                     }).catch(err => {                             console.log(err);                     })                     .then( function (notices) {                             console.log("Wrote to MySQL: " + JSON.stringify(notices))                     });     }).catch(function (err) {                   console.log(err);                   process.exit();     });   } }); process.stdin.on('end', () => {   process.stdout.write('end');   process.stdin.resume(); }); | 
And to run it we can use the pipeline:
| 1 | ./bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=stdout --log_level=ERROR  | node ./maxwell_to_mysql.js | 
The same approach can be used to save the CDC events to MongoDB with mongoimport:
| 1 | $ ./bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=stdout --log_level=ERROR |mongoimport -d mysqlcdc -c mysqlcdc --host localhost:27017 | 
Reacting to binary log events: asynchronous triggers
In the above example, we only recorded the binary log events. Now we can add “reactions”.
One of the practical applications is re-implementing MySQL triggers to something more performant. MySQL triggers are executed for each row, and are synchronous (the query will not return until the trigger event finishes). This was known to cause poor performance, and can significantly slow down bulk operations (i.e., “load data infile” or “insert into … values (…), (…)”). With triggers, MySQL will have to process the “bulk” operations row by row, killing the performance. In addition, when using statement-based replication, triggers on the slave can slow down the replication thread (it is much less relevant nowadays with ROW-based replication and potentially multithreaded slaves).
With the ability to read binary logs from MySQL (using Maxwell), we can process the events and re-implement triggers — now in asynchronous mode — without delaying MySQL operations. As Maxwell gives us a JSON document with the “new” and “old” values (with the default option binlog_row_image=FULL, MySQL records the previous values for updates and deletes) we can use it to create triggers.
Not all triggers can be easily re-implemented based on the binary logs. However, in my experience most of the triggers in MySQL are used for:
- auditing (if you deleted a row, what was the previous value and/or who did and when)
- enriching the existing table (i.e., update the field in the same table)
Here is a quick algorithm for how to re-implement the triggers with Maxwell:
- Find the trigger table and trigger event text (SQL)
- Create an app or a script to parse JSON for the trigger table
- Create a new version of the SQL changing the NEW.<field> to “data.field” (from JSON) and OLD.<field> to “old.field” (from JSON)
For example, if I want to audit all deletes in the “transactions” table, I can do it with Maxwell and a simple Python script (do not use this in production, it is a very basic sample):
| 1 2 3 4 5 6 7 8 9 | import json,sys line = sys.stdin.readline() while line:     print line,     obj=json.loads(line);     if obj["type"] == "delete":         print "INSERT INTO transactions_delete_log VALUES ('" + str(obj["data"]) + "', Now() )"     line = sys.stdin.readline() | 
MySQL:
| 1 2 | mysql> delete from transactions where user_id = 2; Query OK, 1 row affected (0.00 sec) | 
Maxwell pipeline:
| 1 2 3 4 | $ ./bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=stdout --log_level=ERROR  | python trigger.py {"database":"test","table":"transactions","type":"delete","ts":1472942384,"xid":214395,"commit":true,"data":{"id":2,"user_id":2,"value":2,"last_updated":"2016-09-03 22:39:31"}} INSERT INTO transactions_delete_log VALUES ('{u'last_updated': u'2016-09-03 22:39:31', u'user_id': 2, u'id': 2, u'value': 2}', Now() ) | 
Maxwell limitations
Maxwell was designed for MySQL 5.6 with ROW-based replication. Although it can work with MySQL 5.7, it does not support new MySQL 5.7 data types (i.e., JSON fields). Maxwell does not support GTID, and can’t failover based on GTID (it can parse events with GTID thou).
Conclusion
Streaming MySQL binary logs (for example with Maxwell application) can help to implement CDC for auditing and other purposes, and also implement asynchronous triggers (removing the MySQL level triggers can increase MySQL performance).
 
 
 
 
						 
						 
						 
						 
						 
						
Just a reminder that FlexCDC is pluggable. So you can just write a short little bit of PHP code to do whatever trigger events you want, or even to replicate the changes to another RDBMS.
Here is an example (work-in-progress) replication plugin to replicate from MySQL into a MongoDB database that has been set up from a MySQL dump:
https://github.com/greenlion/swanhart-tools/blob/mongo/flexviews/consumer/include/mongo.php
I am getting error as mysqlx module is missing. Please help me with that.
Hi
I am seeing issue with stream import to mongodb. maxwell seems to be working fine. but with pipe it seems to not tranfering the data.
./bin/maxwell –user=’root’ –password=’****’ –host=’*****’ –producer=stdout –log_level=ERROR| mongoimport -d mysqlcdc -c mysqlcdc
2017-07-19T14:45:51.004-0700 connected to: localhost
Wed Jul 19 14:45:52 PDT 2017 WARN: Establishing SSL connection without server’s identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn’t set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to ‘false’. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
Wed Jul 19 14:45:53 PDT 2017 WARN: Establishing SSL connection without server’s identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn’t set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to ‘false’. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
Wed Jul 19 14:45:53 PDT 2017 WARN: Establishing SSL connection without server’s identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn’t set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to ‘false’. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
Wed Jul 19 14:45:53 PDT 2017 WARN: Establishing SSL connection without server’s identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn’t set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to ‘false’. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
2017-07-19T14:45:54.002-0700 mysqlcdc.mysqlcdc 0B
2017-07-19T14:45:57.002-0700 mysqlcdc.mysqlcdc 0B
2017-07-19T14:46:00.002-0700 mysqlcdc.mysqlcdc 0B
2017-07-19T14:46:03.002-0700 mysqlcdc.mysqlcdc 0B
2017-07-19T14:46:06.002-0700 mysqlcdc.mysqlcdc 0B
2017-07-19T14:46:09.002-0700 mysqlcdc.mysqlcdc 0B
2017-07-19T14:46:12.002-0700 mysqlcdc.mysqlcdc 0B
2017-07-19T14:46:15.002-0700 mysqlcdc.mysqlcdc 0B
2017-07-19T14:46:18.002-0700 mysqlcdc.mysqlcdc 0B
2017-07-19T14:46:21.002-0700 mysqlcdc.mysqlcdc 109B
2017-07-19T14:46:24.002-0700 mysqlcdc.mysqlcdc 109B
2017-07-19T14:46:27.002-0700 mysqlcdc.mysqlcdc 109B
Can you please suggest.
@Alexander Rubin, Can you please suggest on my last message ?
Hi Alexander Rubin, I am getting below error while running maxwell with your node.js script. Can you help me on this.
[root@hostname04]# ./bin/maxwell –user=’maxwell’ –password=’m@xW#ll’ –host=’127.0.0.1′ –producer=stdout –log_level=ERROR | node ./maxwell_to_mysql.js
/maxwell_to_mysql.js:17
process.stdin.on(‘readable’, () => {
^
SyntaxError: Unexpected token )
at Module._compile (module.js:439:25)
at Object.Module._extensions..js (module.js:474:10)
at Module.load (module.js:356:32)
at Function.Module._load (module.js:312:12)
at Function.Module.runMain (module.js:497:10)
at startup (node.js:119:16)
at node.js:945:3