In this blog post, we will explore MongoDB 3.6 change streams – a powerful new feature that was added in the latest version of MongoDB.

Streaming data workflows are becoming increasingly popular in open source because they enable efficient, asynchronous, near-real-time updates – something users have grown more and more accustomed to in technology.
Before the rise of streaming workflows and open-source streaming frameworks such as Akka, Spark and Storm, it was much more common to process data in what is called a “batch-processing” workflow. Here, potentially massive amounts of data are queried and processed in a large batch, often once or a few times daily. This processing style has several drawbacks: it operates on data only after it was written or changed, querying large amounts of data in a single pass is inefficient, and results arrive with significant latency.
Stream workflows benefit from the high-efficiency of processing at change-time (usually asynchronously) while also providing more up-to-date results as a free side effect. These benefits make this approach popular in “real-time” user-facing systems like social media, gaming and trading and even backend ETL systems.
Before MongoDB 3.6, the most common way to implement a “change stream”-like data system in MongoDB was by using tailable cursors to “watch” the MongoDB replication oplog.
Using the oplog requires that replication is enabled, even on a single node. The tailable cursor method is possible because the replication oplog is a queryable capped collection in MongoDB, available to any MongoDB shell or driver.
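To make the pre-3.6 approach concrete, here is a minimal sketch of the filtering a consumer must do by hand. The sample documents only mimic the shape of real oplog entries (“op” is the operation type, “ns” the namespace, “o” the document); a real implementation would read them from “local.oplog.rs” with a tailable cursor via a MongoDB driver.

```python
def changes_for_namespace(oplog_entries, namespace):
    """Yield (operation, document) pairs for one namespace.

    Oplog op codes: 'i' = insert, 'u' = update, 'd' = delete;
    'n' (no-op) and other internal entries are skipped.
    """
    op_names = {"i": "insert", "u": "update", "d": "delete"}
    for entry in oplog_entries:
        if entry.get("ns") == namespace and entry.get("op") in op_names:
            yield op_names[entry["op"]], entry.get("o")

# Hand-written sample entries standing in for a tailed oplog cursor:
sample_oplog = [
    {"op": "n", "ns": "", "o": {"msg": "periodic noop"}},
    {"op": "i", "ns": "test.test", "o": {"_id": 1, "x": 1}},
    {"op": "u", "ns": "test.other", "o": {"$set": {"x": 2}}},
    {"op": "d", "ns": "test.test", "o": {"_id": 1}},
]

print(list(changes_for_namespace(sample_oplog, "test.test")))
# → [('insert', {'_id': 1, 'x': 1}), ('delete', {'_id': 1})]
```

Notice that every consumer repeats this kind of decoding logic itself – exactly the boilerplate that change streams remove.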
The drawbacks to using the oplog as a change stream source are that the oplog format is an internal implementation detail that may change between versions, every consumer must filter and interpret the raw entries itself, and in a sharded cluster each shard’s oplog must be tailed separately.
MongoDB 3.6 added “Change Streams”, handled via the new collection-level method named “db.collection.watch()”. This function opens a Change Stream Cursor on a given collection, returning changes to the collection in a predictable format called Change Events.
An example of a change returned by this new feature (due to an insert to “test.test” in another session):
```
rs0:PRIMARY> db.test.watch().pretty()
{
    "_id" : {
        "_data" : BinData(0,"glqW/CsAAAABRmRfaWQAZFqW/CukzAygJGLkVwBaEASV+CeIpHBBKKVaH0KcDV5OBA==")
    },
    "operationType" : "insert",
    "fullDocument" : {
        "_id" : ObjectId("5a96fc2ba4cc0ca02462e457"),
        "x" : 1
    },
    "ns" : {
        "db" : "test",
        "coll" : "test"
    },
    "documentKey" : {
        "_id" : ObjectId("5a96fc2ba4cc0ca02462e457")
    }
}
...
```
The “db.collection.watch()” function takes an aggregation pipeline as its first optional argument and a document of options as the second. Passing no arguments causes it to perform no aggregation and use the default options.
“db.collection.watch()” supports the following aggregation stages in the optional pipeline: $match, $project, $addFields, $replaceRoot and $redact.
Similar to the pre-3.6 method described earlier, the change stream feature requires replication to be enabled, and the operation returns an error if it is not. If you run a standalone server, you can still enable replication with a single-member replica set.
The benefits of the new feature are numerous: a documented, predictable Change Event format, server-side filtering and reshaping via an aggregation pipeline, built-in resumability, and no need to parse raw oplog entries yourself.
By default, Change Streams stop on error, or when no changes occur within the default timeout of 1000 milliseconds. This timeout can be overridden using the ‘maxAwaitTimeMS’ option to your db.collection.watch() operation.
This behavior means Change Streams sometimes need to be resumed from the last successful change. To do so, pass the ‘_id’ of the last event read as the ‘resumeAfter’ option to your db.collection.watch() operation.
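Resume handling can be sketched as follows. This is a minimal illustration assuming change events are dicts with the ‘_id’ resume token shown earlier; a real application would pass the returned options to its driver’s watch() call when re-opening the stream after an error or timeout.

```python
def track_resume_token(events, state):
    """Process events, remembering the last seen resume token in `state`."""
    for event in events:
        # ... handle the event here (send email, update cache, etc.) ...
        state["last_token"] = event["_id"]

def resume_options(state):
    """Build the watch() options for resuming after the last seen event."""
    if state.get("last_token") is None:
        return {}  # no prior event: start a fresh stream
    return {"resumeAfter": state["last_token"]}

# Simulate processing two events, then building the resume options:
state = {"last_token": None}
track_resume_token(
    [{"_id": {"_data": "token-1"}, "operationType": "insert"},
     {"_id": {"_data": "token-2"}, "operationType": "update"}],
    state,
)
print(resume_options(state))
# → {'resumeAfter': {'_data': 'token-2'}}
```

The key point is to persist the token after each successfully processed event, so a restart never re-processes or skips changes.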
As change streams use MongoDB’s replication technologies behind the scenes, there are some things to consider in production: change notifications are only sent for writes that have been majority-committed, so a majority of data-bearing members must be available, and the oplog must be sized large enough to cover the window in which a stream may need to be resumed.
In a sharded cluster, there are additional items to consider: change streams must be opened through a mongos, which returns events in a global order across shards, so a shard with little or no write activity can increase the latency of the overall stream.
Let’s pretend we have an email system that uses MongoDB as the source of truth. A single email is a single MongoDB document in the “email.emails” collection.
In our example, we must write an application that sends our emails (over SMTP) when the “sendNow” boolean field is set to “true” in the emails collection document.
When an email is ready to be sent, the application issues this update on a single ‘_id’ field:
```
> db.emails.update(
    { "_id": ObjectId("5a97fdd4a4cc0ca02462e45c") },
    { $set: { sendNow: true } }
)
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
```
Using change streams, we can “watch” the emails collection for update operations that match our criteria!
Below I have created a change stream that uses an aggregation. The aggregation matches change events with the “update” operationType where the “sendNow” field is set to “true” in the update. Updated fields are stored under the “updateDescription.updatedFields” sub-document in Change Events, so the full name of the “sendNow” field becomes “updateDescription.updatedFields.sendNow”.
As our sender application only needs the document key to query an email for sending, I added a $project aggregation step to strip down the result to just the Change Event “documentKey”, plus the “_id” field that is returned by default.
The result is this operation:
```
> db.emails.watch([
    { $match: {
        "operationType": "update",
        "updateDescription.updatedFields.sendNow": true
    } },
    { $project: {
        documentKey: 1
    } }
]).pretty()
...
{
    "_id" : {
        "_data" : BinData(0,"glqYAA4AAAABRmRfaWQAZFqX/dSkzAygJGLkXABaEARxbXGgdm5K9ZnzwSfCfmNbBA==")
    },
    "documentKey" : {
        "_id" : ObjectId("5a97fdd4a4cc0ca02462e45c")
    }
}
...
```
Now, when emails get marked as “sendNow” we have a stream of document keys that are ready to be sent immediately!
This makes for a very intuitive and responsive workflow! In this case, our email sender now knows it can send the email with the ‘_id’ of ObjectId(“5a97fdd4a4cc0ca02462e45c”)!
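On the application side, the sender’s decision logic mirrors the $match stage above. Here is a sketch of that check in plain code, using the Change Event field names from the example; the event samples and the “should_send” helper name are illustrative, not part of any driver API.

```python
def should_send(event):
    """Return the document key if this change event marks an email as ready."""
    updated = event.get("updateDescription", {}).get("updatedFields", {})
    if event.get("operationType") == "update" and updated.get("sendNow") is True:
        return event["documentKey"]
    return None

# A change event shaped like the watch() output above:
ready = should_send({
    "operationType": "update",
    "updateDescription": {"updatedFields": {"sendNow": True}},
    "documentKey": {"_id": "5a97fdd4a4cc0ca02462e45c"},
})
print(ready)
# → {'_id': '5a97fdd4a4cc0ca02462e45c'}
```

In practice the $match stage already does this filtering server-side, so the application would only receive events for which this check holds – a belt-and-suspenders guard like this is optional.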
Often large infrastructures have several data-related components that require synchronization. Some examples are caching tiers (Redis, Memcache, etc.), search engines (Apache Solr, Elasticsearch, etc.) and backend analytics systems.
Change streams make it easy for systems other than MongoDB to “hook into” a real-time stream of events, making synchronization of several backends straightforward. Used correctly, this feature can also reduce or remove the reliance on message queues.
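As one illustration of such a synchronization hook, here is a minimal sketch of keeping a cache consistent from change events, using a plain dict as a stand-in for Redis or Memcache. The event shapes follow the Change Event format; the eviction policy shown (drop on update, repopulate on next read) is just one reasonable choice.

```python
def apply_event_to_cache(cache, event):
    """Apply one change event to a cache keyed by the document's _id."""
    key = str(event["documentKey"]["_id"])
    op = event["operationType"]
    if op in ("insert", "replace") and "fullDocument" in event:
        cache[key] = event["fullDocument"]
    elif op == "update":
        cache.pop(key, None)  # evict; the next read repopulates from MongoDB
    elif op == "delete":
        cache.pop(key, None)

cache = {}
apply_event_to_cache(cache, {"operationType": "insert",
                             "documentKey": {"_id": 1},
                             "fullDocument": {"_id": 1, "x": 1}})
apply_event_to_cache(cache, {"operationType": "delete",
                             "documentKey": {"_id": 1}})
print(cache)
# → {}
```

The same event-dispatch shape works for a search-engine index or an analytics pipeline – only the body of each branch changes.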
Some ideas this brings to mind: invalidating or refreshing cache entries as the source documents change, feeding new and updated documents to a search engine index in near-real-time, and triggering backend analytics or ETL jobs from a stream of change events.
I hope this article gives you some ideas on how to use MongoDB 3.6 change streams, a powerful new feature!
MongoDB has undergone several important updates since we last left off, including adding support for multi-document ACID transactions. Our webinar, MongoDB 4.0 Features – Transactions & More, covers several new MongoDB 4.0 features, from transactions to non-blocking secondary reads.
MongoDB is document-based, scalable and was designed so you could easily alter your database schema. If you’re interested in learning if MongoDB is a good fit for your situation, our whitepaper Why Choose MongoDB? covers the important details you need to know to make an informed choice.
While you’re here, the introduction of JSON Schema Validator offers new techniques for enforcing the integrity of your data at the database level. For a rundown on how to use the validator and to better understand the importance of validation, see our post MongoDB: how to use the JSON Schema Validator.