MongoDB ChangeStreams - Introduction
- Rishaab
- Jun 5, 2021
Updated: Jul 20, 2023
MongoDB 3.6 introduced change streams, a powerful feature that lets applications receive events in near real time. An event is simply a change made to a collection or a database in MongoDB. For example, an insert, update, or delete on a collection generates a corresponding event in the change stream. A client application subscribes to these events by watching a collection or a whole database.
The event-driven nature of change streams opens up a number of use cases. A fraud detection system might use change streams to get notified of suspicious activity. In IoT, device measurements could be monitored for unusual readings. The possibilities are endless, but you get the idea.
Setting up
Change streams are available only on replica sets and sharded clusters.
For illustration purposes, I will run change streams on a replica set, assuming you have a recent version of MongoDB (3.6 or later) installed.
For this demonstration, I am using the following configuration for the replica set, saved in a file named shard.conf. You can change the configuration to suit your needs.
storage:
  dbPath: /data/shard
replication:
  replSetName: shard
sharding:
  clusterRole: shardsvr
net:
  bindIp: localhost
  port: 20001
For detailed instructions on setting up the server, refer to the links below.
Provided the paths to the mongod and mongo binaries are set up correctly, follow the steps below.
Start the server:
mongod -f shard.conf
Connect with the mongo shell:
mongo --port 20001
Initiate the replica set from the shell using the command below. Once it completes, the shell prompt should show shard:PRIMARY.
rs.initiate()
Now watch the test collection. This opens a change stream on the collection.
let cursor = db.test.watch()
Insert a document into the test collection:
db.test.insert({sample: "test"})
Poll the cursor to receive the change event:
cursor.next()
You should see an event similar to the following.
{
    "_id" : {
        "_data" : "8260BB5717000000022B022C0100296E5A1004CBD9F06FDB214B63AF909E9CF03F34BE46645F6964006460BB571785B57FD58C40A1680004"
    },
    "operationType" : "insert",
    "clusterTime" : Timestamp(1622890263, 2),
    "fullDocument" : {
        "_id" : ObjectId("60bb571785b57fd58c40a168"),
        "sample" : "test"
    },
    "ns" : {
        "db" : "test",
        "coll" : "test"
    },
    "documentKey" : {
        "_id" : ObjectId("60bb571785b57fd58c40a168")
    }
}
Here is a quick interpretation of the output. We will go into more detail in the coming topics.
_id: The ID of the event. It also acts as a token for resuming the change stream (more about this later in the course).
operationType: The type of operation performed on the collection or the database. We inserted a document, hence the insert type in our case.
clusterTime: The timestamp at which the operation happened. In our case, this is when the insert happened.
fullDocument: The full contents of the document. For an insert operation, this is the inserted document. For an update, this field is not present by default (more about this later in the course).
ns: The namespace (database and collection) affected by the event. In our case, both the database and the collection are named test.
documentKey: The _id of the document for which this event was generated.
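To make the field descriptions above concrete, here is a minimal sketch in plain JavaScript that turns a change event into a one-line summary. The helper name describeEvent is hypothetical, and the sample object is a simplified stand-in for the event shown earlier: the shell's Timestamp and ObjectId values are replaced by a plain object and plain strings, which is an assumption made only so the snippet runs without a server.

```javascript
// Hypothetical helper: summarize a change event in one line.
// Assumes clusterTime is given as { t: <seconds since epoch>, i: <counter> },
// a plain-object stand-in for the shell's Timestamp type.
function describeEvent(event) {
  const namespace = `${event.ns.db}.${event.ns.coll}`;
  const when = new Date(event.clusterTime.t * 1000).toISOString();
  switch (event.operationType) {
    case "insert":
      // Insert events carry the new document in fullDocument.
      return `${when} insert into ${namespace}: ${JSON.stringify(event.fullDocument)}`;
    case "delete":
      // Delete events only identify the removed document.
      return `${when} delete from ${namespace}: ${JSON.stringify(event.documentKey)}`;
    default:
      return `${when} ${event.operationType} on ${namespace}`;
  }
}

// Simplified copy of the event from this post (ObjectId values as strings).
const sampleEvent = {
  operationType: "insert",
  clusterTime: { t: 1622890263, i: 2 },
  fullDocument: { _id: "60bb571785b57fd58c40a168", sample: "test" },
  ns: { db: "test", coll: "test" },
  documentKey: { _id: "60bb571785b57fd58c40a168" },
};

console.log(describeEvent(sampleEvent));
```

Switching on operationType keeps the handling for each kind of event in one place, which is roughly how an application consuming a change stream tends to be structured.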
It is recommended to call hasNext() before calling next() on the cursor, to check whether a new event is available.
if (cursor.hasNext()) {
    newEvent = cursor.next();
}
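The same check can be put in a loop to collect every event that is currently available. The following is a minimal sketch, not the only way to do it: drainEvents and makeFakeCursor are hypothetical names, and the fake cursor is just an array-backed stand-in so the snippet can be tried without a running server. It assumes the cursor's hasNext() returns false once no more events are available.

```javascript
// Hypothetical helper: collect every event that is currently available.
// Works with any object exposing hasNext() and next(), such as the
// cursor returned by db.test.watch() in the mongo shell.
function drainEvents(cursor) {
  const events = [];
  while (cursor.hasNext()) {
    events.push(cursor.next());
  }
  return events;
}

// Stand-in cursor backed by an array, for trying the helper without a server.
function makeFakeCursor(items) {
  let position = 0;
  return {
    hasNext: () => position < items.length,
    next: () => items[position++],
  };
}

const fakeCursor = makeFakeCursor([
  { operationType: "insert" },
  { operationType: "delete" },
]);
const drained = drainEvents(fakeCursor);
console.log(drained.map((event) => event.operationType));
```

In the mongo shell, you would pass the real cursor instead, for example drainEvents(cursor), after inserting a few documents into the test collection.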
In the next chapter, we will see how to filter and transform documents with change streams.