Real-Time Replication From DynamoDB to MongoDB

Recently I’ve been faced with an interesting challenge. How can I replicate data, in real time, from DynamoDB to MongoDB?

Why would I need real-time replication like that? For example:

  • Running different queries on MongoDB, relying on different indexes
  • Having additional or converted fields on MongoDB (you can transform the data during the replication) so you can use other applications designed for MongoDB
  • Migrating a large dataset. While the bulk copy of the existing documents is running, live replication is needed to avoid stopping the applications for a long time.

If you need to migrate data from DynamoDB to MongoDB, there are tools and solutions available on the internet, but basically all of them require stopping the application for some time. There are not a lot of solutions for real-time replication, however.

In this article, I’ll show the solution I have implemented to achieve that.

I’ll show you how to deploy a Replicator for a single DynamoDB table. You can scale to more tables by deploying more Replicators the same way.

DynamoDB stores documents using JSON, the same as MongoDB. This simplifies the development of the Replicator’s code, but the documents still need a few small adjustments to be fully compatible.

Let’s Now Create a Proof of Concept to Test Replication From DynamoDB to MongoDB

Note: for the test, we assume you already have an AWS account with the proper rights for creating and managing all the features we’ll use. We also assume you have the aws CLI (Command Line Interface) installed and that you know the AWS basics about networking, IAM roles, firewall rules, EC2 creation, and so on. Finally, we assume you already have a MongoDB server installed on an EC2 instance.

What We Need

The solution uses the following technologies:

  • A DynamoDB Table: we’ll create a very simple table with just a few fields.
  • DynamoDB Stream: it is an ordered flow of information about changes to items in a DynamoDB table. When you enable a stream on a table, DynamoDB captures information about every modification to data items in the table. Think of it as the equivalent of MongoDB’s oplog or MySQL’s binlog. Just remember that only the last 24 hours of events are available on the stream. As a consequence of this limitation, the replication process has to keep pace with the writes on the table so it never falls more than 24 hours behind.
  • A Lambda function: it is a compute service that lets you run code without provisioning or managing servers. AWS Lambda executes your code only when needed and scales automatically, from a few requests per day to thousands per second. AWS Lambda runs your code on a high-availability compute infrastructure and performs all of the administration of the compute resources, including server and operating system maintenance, capacity provisioning and automatic scaling, code monitoring, and logging. All you need to do is deploy your code in one of the languages that AWS Lambda supports. We’ll use Python for the test, but you can also use Node.js, Java, .NET, Go, or Ruby.
  • A MongoDB instance: we’ll create a simple MongoDB standalone server on an EC2 machine. The target database could also be a replica set or a sharded cluster with many shards and mongos nodes. In those cases, we’d only need to change the connection string.

The Replication Schema

A DynamoDB stream consists of stream records. Each stream record represents a single data modification in the DynamoDB table to which the stream belongs. Each stream record is assigned a sequence number, reflecting the order in which the record was published to the stream.

Stream records are organized into groups, or shards. Each shard acts as a container for multiple stream records. The stream records within a shard are removed automatically after 24 hours.

Each item in the stream has a type: INSERT, MODIFY, or REMOVE, and it can contain the new and old images of the document. An image is the complete JSON document. In the case of INSERT, only the new image is present on the stream. In the case of REMOVE, the old image of the document is available on the stream. In the case of MODIFY, both images can be available on the stream.
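To fix ideas, here is a simplified, hypothetical stream record for a MODIFY event (the values are invented and some fields are omitted; a real event from the stream is shown later in the article):

```python
# Simplified, hypothetical stream record for a MODIFY event (values are invented)
record = {
    'eventName': 'MODIFY',
    'dynamodb': {
        'Keys': {'id': {'N': '1'}},
        'OldImage': {'id': {'N': '1'}, 'name': {'S': 'Jon'}, 'created_at': {'N': '1588752553'}},
        'NewImage': {'id': {'N': '1'}, 'name': {'S': 'John'}, 'created_at': {'N': '1588752553'}},
        'SequenceNumber': '111',
        'StreamViewType': 'NEW_AND_OLD_IMAGES',
    },
}
```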

The Lambda function is automatically triggered when it is connected to a source stream like the DynamoDB Stream. As soon as new events are available on the stream, the Lambda function is triggered: AWS creates a container and executes the code associated with the function. The number of events processed by any Lambda container is managed by AWS. We can set some configuration parameters, but we cannot take complete control of it. AWS automatically manages the scaling of Lambda, providing all the resources it needs. In case of a very high write load, we can see hundreds or thousands of Lambda invocations per second.

The Lambda function reads the events from the stream, applies some conversions, and connects to MongoDB to execute the operations:

  • A delete_one in case of a REMOVE event
  • An update_one using upsert=True in case of INSERT and MODIFY events. With upsert=True we can manage both operations at the same time (see the snippet below).
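In pymongo terms, the two operations can look like the following minimal snippet (the connection string, database, collection, and sample document are hypothetical):

```python
from pymongo import MongoClient

# Hypothetical connection details and document; adapt them to your environment.
collection = MongoClient('mongodb://127.0.0.1:27017')['mydb']['mytable']
doc = {'_id': 1, 'name': 'John', 'created_at': 1588752553}

# INSERT or MODIFY event: a single upsert covers both cases
collection.update_one({'_id': doc['_id']}, {'$set': doc}, upsert=True)

# REMOVE event: delete the document by its primary key
collection.delete_one({'_id': doc['_id']})
```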

When Lambda finishes processing a batch of events, the container is not immediately destroyed. In fact, a container can be reused by other incoming Lambda invocations. This is an optimization that can help in case of a massive load. As a consequence, you can see thousands of Lambda invocations but fewer running instances.

The following picture summarizes the idea of the solution.

Replication From DynamoDB to MongoDB

 

Note: for the sake of simplicity we are using a MongoDB server on an EC2 machine on AWS. But you can also replicate to a MongoDB deployed elsewhere, on another cloud platform like Google Cloud for example, or in your on-premise data center. Just remember in this case to open firewall rules, set up SSH tunnels to encrypt connections, or do whatever is needed to enable proper connectivity to the remote MongoDB ports. This is not covered in this article.

Create a DynamoDB Table

Be sure you have aws CLI installed and your credentials stored in the file ~/.aws/credentials.

 

Create a DynamoDB table named mytable (see the boto3 sketch after the field list below).

We’ll store very simple documents with 3 fields:

  • id: a unique integer number we use as the primary key
  • name: a string containing a person’s name
  • created_at: a numeric field containing the item’s creation time as a Unix epoch
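As a sketch of this step, the table can also be created from Python with boto3 instead of the aws CLI. The region, billing mode, and stream view type below are example values; adjust them to your setup:

```python
import boto3

# Uses the credentials stored in ~/.aws/credentials; the region is an example value.
dynamodb = boto3.client('dynamodb', region_name='us-east-1')

dynamodb.create_table(
    TableName='mytable',
    AttributeDefinitions=[{'AttributeName': 'id', 'AttributeType': 'N'}],
    KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}],
    BillingMode='PAY_PER_REQUEST',
    # Enable the stream with both images so the Lambda can read old and new documents.
    # The stream can also be enabled later from the console or with update_table().
    StreamSpecification={'StreamEnabled': True,
                         'StreamViewType': 'NEW_AND_OLD_IMAGES'},
)
```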

 

Create the Execution Role for Lambda

A Lambda function requires a role with specific policies in order to be executed and to access other AWS resources.

In the IAM dashboard you can create the role lambda-dynamodb-role and assign to it the policies you can see in the following picture:

Execution Role for Lambda

 

The Python Code of the Function

Let’s create the following file with the name replicator.py. This will be the code executed by Lambda at each invocation.
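As a condensed reference, here is a minimal sketch of what replicator.py can look like. The line numbers in the analysis below refer to the complete file, which is longer than this sketch. The environment variable names, the default value for the age field, and the use of boto3’s TypeDeserializer for the document conversion (instead of the json.loads()-based conversion mentioned in the analysis) are assumptions for illustration.

```python
import os
from datetime import datetime
from decimal import Decimal

from boto3.dynamodb.types import TypeDeserializer
from pymongo import MongoClient

# Hypothetical environment variable names: adapt them to your Lambda configuration.
client = MongoClient(os.environ['MONGO_URI'])
collection = client[os.environ['MONGO_DB']][os.environ['MONGO_COLLECTION']]
deserializer = TypeDeserializer()


def clean(value):
    """Convert the Decimal values produced by TypeDeserializer into int/float for MongoDB."""
    if isinstance(value, Decimal):
        return int(value) if value % 1 == 0 else float(value)
    if isinstance(value, dict):
        return {k: clean(v) for k, v in value.items()}
    if isinstance(value, list):
        return [clean(v) for v in value]
    return value


def to_document(image):
    """Turn a DynamoDB stream image into a plain Python dict suitable for MongoDB."""
    return clean({k: deserializer.deserialize(v) for k, v in image.items()})


def handler(event, context):
    errors = 0
    # Sessions require MongoDB >= 3.6; drop them if you run an older version.
    with client.start_session() as session:
        for record in event['Records']:
            data = record['dynamodb']
            try:
                if record['eventName'] in ('INSERT', 'MODIFY'):
                    doc = to_document(data['NewImage'])
                    doc['_id'] = doc['id']  # reuse the DynamoDB primary key as MongoDB's _id
                    # Custom conversions during the replication (examples only):
                    doc.setdefault('age', None)  # add the field if missing; the default is an assumption
                    doc['created_at'] = datetime.utcfromtimestamp(doc['created_at'])
                    collection.update_one({'_id': doc['_id']}, {'$set': doc},
                                          upsert=True, session=session)
                elif record['eventName'] == 'REMOVE':
                    old = to_document(data['OldImage'])
                    collection.delete_one({'_id': old['id']}, session=session)
            except Exception as exc:
                errors += 1
                print('replication error: %s' % exc)  # visible in the CloudWatch logs
    # 200 only if every record was processed correctly, 500 otherwise
    return {'statusCode': 500 if errors else 200}
```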

Analysis of the code:

  • Line 14: at the time you create a Lambda function, you specify a handler, which is a function in your code that AWS Lambda can invoke when the service executes your code. AWS Lambda uses the event parameter to pass event data to the handler and the context parameter to provide runtime information to your handler.
  • Lines 16-24: we have decided to set environment variables at Lambda creation time. This gives you more flexibility if you need to deploy multiple Lambda functions for many DynamoDB Streams.
  • Line 28: we explicitly create a session to execute all the writes of the Lambda invocation. This way we can minimize the impact on MongoDB resources, particularly in the case of a very high load. Sessions were introduced in MongoDB 3.6; if you are using an older version, drop this line together with line 69, and drop session=session in update_one and delete_one.
  • Line 30: a for loop to scan, one by one, all the items of the batch.
  • Line 32: the sub-document dynamodb contains the data about the primary key and the full new image of the document (in case of INSERT), the full old image of the document (in case of REMOVE), or both new and old images (in case of MODIFY).
  • Line 34: check the operation type.
  • Lines 36-37: extraction of the new image of the document. We need to use the json.loads() function in order to convert the JSON document into a valid one for MongoDB.
  • Line 40: DynamoDB documents don’t have the _id field as the primary key, but MongoDB needs it. In this case, we use the existing primary key id to create the new field _id.
  • Lines 42-48: if you need to do any kind of conversion on the data during the replication, you can put your custom code here. In the example, we add a new age field if it is missing from the source document, and we convert the created_at field from epoch to ISODate. You can do more transformations if you want, or you can comment out these lines if you don’t need any conversion.
  • Lines 50-55: execution of update_one to insert or update the document on MongoDB. In case of an error, the error string is written to the logs.
  • Lines 57-67: execution of delete_one if the operation is REMOVE.
  • Lines 73-85: here is the end of the function. We need to return a status code to Lambda. We have decided to return the status code 500 in case not all the events have been processed correctly; the status code 200 (success) is returned only if all the records have been processed correctly. Looking at the error messages in the logs, you can decide what to do about missed replication events (see the quick local test after this list).
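As a quick usage example, the condensed sketch shown earlier can be exercised locally with a fake event, assuming a MongoDB instance is reachable at the hypothetical URI below:

```python
import os

# Hypothetical local settings; they must be set before importing the module.
os.environ.update({'MONGO_URI': 'mongodb://127.0.0.1:27017',
                   'MONGO_DB': 'mydb',
                   'MONGO_COLLECTION': 'mytable'})

import replicator  # the sketch above, saved as replicator.py

fake_event = {'Records': [{
    'eventName': 'INSERT',
    'dynamodb': {'NewImage': {'id': {'N': '1'},
                              'name': {'S': 'John'},
                              'created_at': {'N': '1588752553'}}},
}]}

print(replicator.handler(fake_event, None))  # expect {'statusCode': 200}
```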

Further notes:

  • In case your target MongoDB is a sharded cluster instead of a standalone or a replica set, you also need to include a condition on the shard key in update_one and delete_one (see the short snippet after this list).
  • The print() output is written to the logs, which you can access from CloudWatch.
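For example, assuming a hypothetical shard key field named country, and the same collection, doc, and session as in the sketch above, the calls would become:

```python
# 'country' is a hypothetical shard key field; include it in the filter on a sharded cluster
collection.update_one({'_id': doc['_id'], 'country': doc['country']},
                      {'$set': doc}, upsert=True, session=session)
collection.delete_one({'_id': doc['_id'], 'country': doc['country']}, session=session)
```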

As a reference, see below what an event of the DynamoDB Stream looks like: