Striim 3.9.4 / 3.9.5 documentation

Replicating MongoDB data to Azure CosmosDB

To replicate one or more MongoDB collections to Cosmos DB, specify the collections in the Collections properties of MongoDBReader and CosmosDBWriter. You may use wildcards ($ for MongoDB, % for Cosmos DB) to replicate all collections in a database, as in the examples below, or specify multiple collections manually, as described in the notes for CosmosDBWriter's Collections property.
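
For example, to replicate just two collections rather than a whole database, MongoDBReader's Collections value might list them explicitly:

  Collections: 'mydb.employee;mydb.department'

and CosmosDBWriter's Collections value might pair each source collection with its target:

  Collections: 'mydb.employee,mydb.employee;mydb.department,mydb.department'

The collection names above are hypothetical and the separator syntax is only a sketch; see the notes for CosmosDBWriter's Collections property for the exact format.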

You must create the target collections in Cosmos DB manually. Each collection's partition key name must match a field in the corresponding MongoDB documents.

Data will be read only from collections that exist when the source starts. Additional collections added later will be ignored until the source is restarted. When the target collection is in a fixed container (see Partition and scale in Azure Cosmos DB), inserts, updates, and deletes are handled automatically. When the target collection is in an unlimited container, updates require special handling and deletes must be done manually, as discussed below.

If you wish to run the examples, adjust the MongoDBReader properties and CosmosDBWriter properties to reflect your own environment.
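
For example, MongoDBReader's ConnectionURL is typically the host and port of the MongoDB server (or a comma-separated list of replica set members), and CosmosDBWriter's ServiceEndpoint is the account URI shown in the Azure portal, with AccessKey set to one of the account's read-write keys. The values below are hypothetical placeholders, not working credentials:

  ConnectionURL: 'mongodb1.example.com:27017'

  ServiceEndpoint: 'https://myaccount.documents.azure.com:443/',
  AccessKey: '<primary or secondary read-write key from the Azure portal>'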

When the target collection is in a fixed container

Note

Writing to a target collection in a fixed container will not be possible until Microsoft fixes the bug discussed in this Azure forum discussion.

  1. In Cosmos DB, create the database mydb containing the collection employee with the partition key /name (note that collection and partition key names are case-sensitive).

  2. In MongoDB, create the collection employee and populate it as follows:

    use mydb;
    db.employee.insertMany([
    {_id:1,"name":"employee1","company":"Striim","city":"Madras"},
    {_id:2,"name":"employee2","company":"Striim","city":"Seattle"},
    {_id:3,"name":"employee3","company":"Striim","city":"California"}
    ]);
  3. In Striim, run the following application to perform the initial load of the existing data:

    CREATE APPLICATION Mongo2CosmosInitialLoad; 
     
    CREATE SOURCE MongoDBIn USING MongoDBReader (
      Username:'striim',
      Password:'******',
      ConnectionURL:'<MongoDB connection string>',
      Collections:'mydb.$'
     )
    OUTPUT TO MongoDBStream;
     
    CREATE TARGET WriteToCosmos USING CosmosDBWriter (
      ServiceEndpoint: '<Cosmos DB connection string>',
      AccessKey: '<Cosmos DB account read-write key>',
      Collections: 'mydb.$,mydb.%'
    )
    INPUT FROM MongoDBStream;
     
    END APPLICATION Mongo2CosmosInitialLoad;

    After the application is finished, the Cosmos DB employee collection should contain the following:

    {
        "_id": 1,
        "name": "employee1",
        "company": "striim",
        "city": "madras",
        "id": "1.0",
        "_rid": "HnpSALVXpu4BAAAAAAAACA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAACA==/",
        "_etag": "\"0800b33d-0000-0000-0000-5bb5aafa0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632442
    }
    {
        "_id": 2,
        "name": "employee2",
        "company": "striim",
        "city": "seattle",
        "id": "2.0",
        "_rid": "HnpSALVXpu4BAAAAAAAADA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAADA==/",
        "_etag": "\"2b00f87b-0000-0000-0000-5bb5aafb0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632443
    }
    {
        "_id": 3,
        "name": "employee3",
        "company": "striim",
        "city": "california",
        "id": "3.0",
        "_rid": "HnpSALVXpu4BAAAAAAAAAA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAAAA==/",
        "_etag": "\"2700ad2a-0000-0000-0000-5bb5aafb0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632443
    }
    
  4. In Striim, run the following application to continuously replicate new data from MongoDB to Cosmos DB:

    CREATE APPLICATION Mongo2CosmosIncrementalFixedContainer; 
     
    CREATE SOURCE MongoDBIn USING MongoDBReader (
      Username:'striim',
      Password:'******',
      ConnectionURL:'<MongoDB connection string>',
      authType: 'NoAuth',
      Mode:'Incremental',
      startTimestamp: '<timestamp>',
      Collections:'mydb.$'
     )
    OUTPUT TO MongoDBStream;
    
    CREATE TARGET WriteToCosmos USING CosmosDBWriter (
      ServiceEndpoint:'<Cosmos DB connection string>',
      AccessKey:'<Cosmos DB account read-write key>',
      Collections:'mydb.$,mydb.%'
    )
    INPUT FROM MongoDBStream;
    
    CREATE CQ SelectDeleteOperations
    INSERT INTO DeleteOpsStream
    SELECT META(MongoDBStream,"DatabaseName"),
      META(MongoDBStream,"CollectionName"),
      META(MongoDBStream,"DocumentKey")
    FROM MongoDBStream
    WHERE META(MongoDBStream,"OperationName").toString() = "DELETE";
    
    CREATE TARGET WriteIgnoredDeleteOps USING FileWriter (
      filename:'DeleteOperations.json'
    )
    FORMAT USING JSONFormatter()
    INPUT FROM DeleteOpsStream;
     
    END APPLICATION Mongo2CosmosIncrementalFixedContainer;
  5. In MongoDB, modify the employee collection as follows to add employee4, update employee1 and employee2, and remove employee3:

    use mydb;
    db.employee.save({_id:4,"name":"employee4","company":"Striim","city":"Palo Alto"});  // insert a new document
    db.employee.save({_id:1,"name":"employee1","company":"Striim","city":"Seattle"});    // replace an existing document
    db.employee.update({_id:2},{$set : {"city":"Palo Alto"}});                           // update a single field
    db.employee.remove({_id:3});                                                         // delete a document

    Within 30 seconds, those changes should be replicated to the corresponding Cosmos DB collection with results similar to the following:

    {
        "_id": 1,
        "name": "employee1",
        "company": "striim",
        "city": "Seattle",
        "id": "1.0",
        "_rid": "HnpSALVXpu4BAAAAAAAACA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAACA==/",
        "_etag": "\"0800b33d-0000-0000-0000-5bb5aafa0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632442
    }
    {
        "_id": 2,
        "name": "employee2",
        "company": "striim",
        "city": "Palo Alto",
        "id": "2.0",
        "_rid": "HnpSALVXpu4BAAAAAAAADA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAADA==/",
        "_etag": "\"2b00f87b-0000-0000-0000-5bb5aafb0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632443
    }
    {
        "_id": 3,
        "name": "employee3",
        "company": "striim",
        "city": "california",
        "id": "3.0",
        "_rid": "HnpSALVXpu4BAAAAAAAAAA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAAAA==/",
        "_etag": "\"2700ad2a-0000-0000-0000-5bb5aafb0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632443
    }
    {
        "_id": 4,
        "name": "employee4",
        "company": "striim",
        "city": "Palo Alto",
        "id": "4.0",
        "_rid": "HnpSALVXpu4BAAAAAAAAAE==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAAAE==/",
        "_etag": "\"2700ad2a-0000-0000-0000-5bb5aafb0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632443
    }
When the target collection is in an unlimited container

When a Cosmos DB collection is in an unlimited container, it must have a partition key, which must be specified when you create the collection.

  • When MongoDB save operations create new documents, all fields are included in MongoDBReader's output, so CosmosDBWriter can write to the correct partition.

  • When MongoDB save operations update existing documents, all fields are included in MongoDBReader's output, so CosmosDBWriter can use the partition key and document ID to update the correct target document.

  • MongoDB update operations do not include all fields, so the partition key may be missing from MongoDBReader's output. In those cases, the PartialRecordPolicy open processor retrieves the missing fields from MongoDB and adds them before passing the data to CosmosDBWriter.

  • MongoDB remove operations include only the document ID, so the partition key is missing from MongoDBReader's output. Since CosmosDBWriter would be unable to determine the correct partition, the application instead writes the database name, collection name, and document key to a DeleteOps collection in Cosmos DB, from which you can perform the deletes manually.

[Application flow diagram: incrementalMongo2Cosmos.png]
CREATE APPLICATION Mongo2CosmosIncrementalUnlimitedContainer; 
 
CREATE SOURCE MongoDBIn USING MongoDBReader (
  Username:'striim',
  Password:'******',
  ConnectionURL:'<MongoDB connection string>',
  authType: 'NoAuth',
  Mode:'Incremental',
  startTimestamp: '<timestamp>',
  Collections:'mydb.$'
 )
OUTPUT TO MongoDBStream;

CREATE STREAM FilteredMongoDBStream OF Global.JsonNodeEvent;

CREATE CQ ExcludeDeleteOperations
INSERT INTO FilteredMongoDBStream
SELECT * FROM MongoDBStream
WHERE META(MongoDBStream,"OperationName").toString() != "DELETE";

CREATE STREAM FullDocstream OF Global.JsonNodeEvent;

CREATE OPEN PROCESSOR CompletePartialDocs USING PartialRecordPolicy ( 
  ConnectionURL:'<MongoDB connection string>', 
  authType:'NoAuth',
  OnMissingDocument: 'Process'
)
INSERT INTO FullDocstream
FROM FilteredMongoDBStream;

CREATE TARGET WriteToCosmos USING CosmosDBWriter (
  ServiceEndpoint:'<Cosmos DB connection string>',
  AccessKey:'<Cosmos DB account read-write key>',
  Collections:'mydb.$,mydb.%',
  IgnorableExceptionCode:'PARTITION_KEY_NOT_FOUND'
)
INPUT FROM FullDocstream;

CREATE CQ SelectDeleteOperations
INSERT INTO DeleteOpsStream
SELECT TO_STRING(META(MongoDBStream,"DatabaseName")) AS DatabaseName,
  TO_STRING(META(MongoDBStream,"CollectionName")) AS CollectionName,
  TO_STRING(META(MongoDBStream,"DocumentKey")) AS DocumentKey
FROM MongoDBStream
WHERE META(MongoDBStream,"OperationName").toString() = "DELETE";

CREATE TARGET WriteDeleteOpsToCosmos USING CosmosDBWriter (
  ServiceEndpoint:'<Cosmos DB connection string>',
  AccessKey:'<Cosmos DB account read-write key>',
  Collections:'mydb.DeleteOps'
)
INPUT FROM DeleteOpsStream;
 
END APPLICATION Mongo2CosmosIncrementalUnlimitedContainer;
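
If you also want a local record of the ignored delete operations, DeleteOpsStream could additionally feed a FileWriter target inside the same application. This sketch simply reuses the FileWriter and JSONFormatter configuration from the fixed-container example above; adjust the file name for your environment:

CREATE TARGET WriteIgnoredDeleteOps USING FileWriter (
  filename:'DeleteOperations.json'
)
FORMAT USING JSONFormatter()
INPUT FROM DeleteOpsStream;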