Change Streams in MongoDB 3.6

MongoDB 3.6 introduces an interesting API enhancement called change streams. With change streams you can watch for changes to certain collections by means of the driver API. This feature replaces all the custom oplog watcher implementations out there, including the one I used in the article on Near-Realtime Analytics with MongoDB.

For a start, we need to install MongoDB 3.6.0 or a higher version. After setting up a minimal replica set, we connect to the primary and set the feature compatibility version to 3.6 to be able to use change streams (this will hopefully be the default in future versions):

use admin
db.adminCommand( { setFeatureCompatibilityVersion: "3.6" } )

I will use the Java driver for the following examples. To have document changes to watch, we write a simple program that inserts a stream of documents into the collection test.events:

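import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

// Connect to the replica set and get a handle on test.events
MongoCollection<Document> eventCollection =
    new MongoClient(
        new MongoClientURI("mongodb://localhost:27001,localhost:27002,localhost:27003/test?replicaSet=demo-dev")
    ).getDatabase("test").getCollection("events");

long i = 0;
while (true) {
  Document doc = new Document();
  doc.put("i", i++);      // stores the current value, then increments i
  doc.put("even", i%2);   // i is already incremented here, so even is 1 when the stored i is even
  eventCollection.insertOne(doc);
  System.out.println("inserted: " + doc);
  // Pause two to three seconds between inserts
  Thread.sleep(2000L + (long)(1000*Math.random()));
}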

The output of this Java process looks like this:

inserted: Document{{i=1, even=0, _id=5a31187a21d65707e8282fa7}}
inserted: Document{{i=2, even=1, _id=5a31187d21d65707e8282fa8}}
inserted: Document{{i=3, even=0, _id=5a31187f21d65707e8282fa9}}
inserted: Document{{i=4, even=1, _id=5a31188121d65707e8282faa}}
...
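
The pairing of i and even in this output follows from the order of operations in the loop: i is stored first and incremented afterwards, while even is computed from the already incremented value. Here is a minimal, driver-free sketch of just that arithmetic. The class EvenFlagSketch and the method firstValues are hypothetical names used for illustration, and starting the counter at 1 is an assumption made to line up with the output shown.

```java
import java.util.ArrayList;
import java.util.List;

public class EvenFlagSketch {

    // Reproduces only the counter arithmetic of the insert loop (no MongoDB involved).
    // Each row holds {stored i, even flag}.
    static List<long[]> firstValues(long start, int count) {
        List<long[]> rows = new ArrayList<>();
        long i = start;
        for (int n = 0; n < count; n++) {
            long stored = i++;   // like doc.put("i", i++): old value is stored, then i grows
            long even = i % 2;   // like doc.put("even", i%2): uses the incremented i
            rows.add(new long[] {stored, even});
        }
        return rows;
    }

    public static void main(String[] args) {
        // Assumption: counter starts at 1 to mirror the sample output
        for (long[] row : firstValues(1, 4)) {
            System.out.println("i=" + row[0] + ", even=" + row[1]);
        }
    }
}
```

Run with a start value of 1, this prints the pairs (1, 0), (2, 1), (3, 0), (4, 1), so even is 1 exactly when the stored i is an even number.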

In another Java process, we retrieve the collection with the same code. On that collection we call the method watch, which takes a list of aggregation stages, just like the aggregate operation:

// Requires static imports of com.mongodb.client.model.Filters.* and java.util.Arrays.asList
ChangeStreamIterable<Document> changes = eventCollection.watch(asList(
    Aggregates.match(and(asList(
        in("operationType", asList("insert")),
        eq("fullDocument.even", 1L))))));

We register only for insert operations on the collection and additionally filter for documents with the field even being equal to 1.
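
To make the semantics of that match stage concrete, the following sketch applies the same two conditions to a handful of in-memory events. It does not use the MongoDB driver at all: ChangeEvent, matches and matchingIds are hypothetical names invented for this illustration, and the sample events (including the update) are made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ChangeFilterSketch {

    // Hypothetical stand-in for a change event; not a class from the driver.
    static final class ChangeEvent {
        final String operationType;
        final long i;
        final long even;

        ChangeEvent(String operationType, long i, long even) {
            this.operationType = operationType;
            this.i = i;
            this.even = even;
        }
    }

    // Mirrors the filter: operationType in ["insert"] AND fullDocument.even == 1
    static boolean matches(ChangeEvent e) {
        return Arrays.asList("insert").contains(e.operationType) && e.even == 1L;
    }

    // Returns the i values of all events that pass the filter, in order.
    static List<Long> matchingIds(List<ChangeEvent> events) {
        return events.stream()
                .filter(ChangeFilterSketch::matches)
                .map(e -> e.i)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<ChangeEvent> events = Arrays.asList(
                new ChangeEvent("insert", 1, 0),
                new ChangeEvent("insert", 2, 1),
                new ChangeEvent("update", 2, 1),   // dropped: not an insert
                new ChangeEvent("insert", 3, 0),   // dropped: even != 1
                new ChangeEvent("insert", 4, 1));
        System.out.println(matchingIds(events));   // prints [2, 4]
    }
}
```

In the real change stream the server evaluates these conditions, so events that do not match never reach the client.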

When we iterate over the cursor we just print out the matching documents:

changes.forEach(new Block<ChangeStreamDocument<Document>>() {
  @Override
  public void apply(ChangeStreamDocument<Document> t) {
    System.out.println("received: " + t.getFullDocument());
  }
});

The result looks like this:

received: Document{{_id=5a311e2021d657082268f38a, i=2, even=1}}
received: Document{{_id=5a311e2521d657082268f38c, i=4, even=1}}
received: Document{{_id=5a311e2a21d657082268f38e, i=6, even=1}}
...

With change streams, the MongoDB API grows even wider. You can now quite easily build things that resemble the triggers you know from traditional databases. There is no longer any need for external event processing or your own oplog watcher implementation.

The full source code can be found on GitHub.
