//

Change Streams in MongoDB 3.6

15.1.2018 | 2 minutes of reading time

MongoDB 3.6 introduces an interesting API enhancement called change streams. With change streams you can watch for changes to certain collections by means of the driver API. This feature replaces all the custom oplog watcher implementations out there, including the one I used in the article on Near-Realtime Analytics with MongoDB .

For a start, we need to install MongoDB 3.6.0 or a higher version. After setting up a minimal replica set we connect to the primary and set the feature compatibility to 3.6 be able to use change streams (this will hopefully be the default in future version):

1use admin
2db.adminCommand( { setFeatureCompatibilityVersion: "3.6" } )
3

I will use the Java driver for the following examples. In order the be able to watch for documents changes, we write a simple program the inserts a bunch of document to a collection test.events:

1MongoCollection<Document> eventCollection =
2    new MongoClient(
3        new MongoClientURI("mongodb://localhost:27001,localhost:27002,localhost:27003/test?replicatSet=demo-dev")
4    ).getDatabase("test").getCollection("events");
5 
6long i = 0;
7while (true) {
8  Document doc = new Document();
9  doc.put("i", i++);
10  doc.put("even", i%2);
11  eventCollection.insertOne(doc);
12  System.out.println("inserted: " + doc);
13  Thread.sleep(2000L + (long)(1000*Math.random()));
14}
15

The output of this Java process looks like this:

1inserted: Document{{i=1, even=0, _id=5a31187a21d65707e8282fa7}}
2inserted: Document{{i=2, even=1, _id=5a31187d21d65707e8282fa8}}
3inserted: Document{{i=3, even=0, _id=5a31187f21d65707e8282fa9}}
4inserted: Document{{i=4, even=1, _id=5a31188121d65707e8282faa}}
5...
6

In another Java process, we use the same code for retrieving the collection. On that collection we call a method watch with takes a list of aggregation stages, just like the aggregate operation:

1ChangeStreamIterable<Document> changes = eventCollection.watch(asList(
2    Aggregates.match( and( asList(
3      in("operationType", asList("insert")),
4      eq("fullDocument.even", 1L)))
5  )));
6

We register only for insert operations on the collection and additionally filter for documents with the field even being equal to 1.

When we iterate over the cursor we just print out the matching documents:

1changes.forEach(new Block<ChangeStreamDocument<Document>>() {
2  @Override
3  public void apply(ChangeStreamDocument<Document> t) {
4    System.out.println("received: " + t.getFullDocument());
5    }
6});
7

The result looks like this:

1received: Document{{_id=5a311e2021d657082268f38a, i=2, even=1}}
2received: Document{{_id=5a311e2521d657082268f38c, i=4, even=1}}
3received: Document{{_id=5a311e2a21d657082268f38e, i=6, even=1}}
4...
5

With change streams, the MongoDB API grows even wider. Now you can quite easily build things that resemble triggers you know from traditional databases. There is no need for external event processing or your own oplog watcher implementation anymore.

The full source code can be found at GitHub .

share post

Likes

0

//

More articles in this subject area\n

Discover exciting further topics and let the codecentric world inspire you.

//

Gemeinsam bessere Projekte umsetzen

Wir helfen Deinem Unternehmen

Du stehst vor einer großen IT-Herausforderung? Wir sorgen für eine maßgeschneiderte Unterstützung. Informiere dich jetzt.

Hilf uns, noch besser zu werden.

Wir sind immer auf der Suche nach neuen Talenten. Auch für dich ist die passende Stelle dabei.