MongoDB ChangeStreams - Filtering and Reshaping
- Rishaab
- Jun 7, 2021
- 2 min read
Updated: Jul 20, 2023
In Part 1 of the series, we saw how to watch a collection and receive its events. In that setup, the subscriber receives every change that occurs on a particular collection or database. If the producer's workload is write-intensive, this can overwhelm the subscriber, which also receives events it does not care about. The result is unnecessary traffic from MongoDB to the subscriber, which can hurt overall performance.
Consider the case where the client application is only interested in update events on a collection and does not care about inserts or deletes. How can we achieve this? Well, with change streams we can specify a custom aggregation pipeline that performs the filtering.
The syntax looks like this:
var cursor = db.test.watch([ /* user pipeline stages */ ])
Filtering
For illustration purposes, we will watch the test collection for update events only. We will achieve this by creating a pipeline with a $match stage.
The command below watches the collection and filters out all events except those whose operationType is "update".
var cursor = db.test.watch([{$match: {operationType: "update"}}])
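Conceptually, the $match stage is a predicate that the server applies to every event before sending it to the subscriber. The plain JavaScript below only mimics that effect locally on hand-written event objects: `matchOperationType` and the sample events are hypothetical, not part of any MongoDB API. In a real change stream the filtering happens inside MongoDB, so discarded events never cross the network.

```javascript
// Hypothetical helper: mimics {$match: {operationType: type}} on an array of
// change events. In a real change stream, MongoDB applies $match on the server.
function matchOperationType(events, type) {
  return events.filter((event) => event.operationType === type);
}

// Hand-written sample events, shaped like the ones db.test.watch() emits
// for the insert and update in this article (resume tokens omitted).
const events = [
  {
    operationType: "insert",
    ns: { db: "test", coll: "test" },
    documentKey: { _id: 0 },
    fullDocument: { _id: 0, sample: "test" },
  },
  {
    operationType: "update",
    ns: { db: "test", coll: "test" },
    documentKey: { _id: 0 },
    updateDescription: { updatedFields: { sample: "test 2" }, removedFields: [] },
  },
];

const delivered = matchOperationType(events, "update");
console.log(delivered.map((e) => e.operationType)); // [ 'update' ]
```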
Next, insert a document and then update its field.
db.test.insert({_id: 0, sample:"test"})
db.test.update({_id: 0}, {$set: {sample: "test 2"}})
Poll for events
cursor.next()
Voila! We receive only the update event; the insert event was filtered out.
{
"_id" : {
"_data" : "8260BD54C8000000012B022C0100296E5A100478C19FC0B29344D88E8C9784C6494D3E461E5F696400290004",
"_typeBits" : BinData(0,"QA==")
},
"operationType" : "update",
"clusterTime" : Timestamp(1623020744, 1),
"ns" : {
"db" : "test",
"coll" : "test"
},
"documentKey" : {
"_id" : 0
},
"updateDescription" : {
"updatedFields" : {
"sample" : "test 2"
},
"removedFields" : [ ]
}
}
Invoking cursor.hasNext() now returns false, meaning there are no further events to be dispatched.
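For an update event, the fields a consumer usually acts on are documentKey (which document changed) and updateDescription (which fields were set or removed). A minimal sketch, assuming the event has already been fetched with cursor.next(); `describeUpdate` is a hypothetical helper, and the event literal copies the output above minus the resume token.

```javascript
// Hypothetical helper: turns an update change event into a short summary.
function describeUpdate(event) {
  if (event.operationType !== "update") {
    throw new Error("expected an update event, got " + event.operationType);
  }
  const { updatedFields, removedFields } = event.updateDescription;
  return {
    id: event.documentKey._id,     // _id of the changed document
    set: Object.keys(updatedFields), // names of fields that were set
    unset: removedFields,          // names of fields that were removed
  };
}

// The update event printed above, with the resume token (_id) left out.
const updateEvent = {
  operationType: "update",
  ns: { db: "test", coll: "test" },
  documentKey: { _id: 0 },
  updateDescription: { updatedFields: { sample: "test 2" }, removedFields: [] },
};

console.log(describeUpdate(updateEvent)); // { id: 0, set: [ 'sample' ], unset: [] }
```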
Reshaping
What if we want to drop every field except documentKey from the events? We can reshape the events by adding a $project stage to our custom pipeline.
Our new change stream pipeline looks like this:
var cursor = db.test.watch([{$match: {operationType: "update"}}, {$project: {documentKey: 1}}])
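An inclusion projection such as {documentKey: 1} keeps only the listed top-level fields, plus _id, which $project includes by default unless it is explicitly excluded. The sketch below imitates that rule in plain JavaScript for top-level field names only; `projectInclude` is a hypothetical helper, and the real $project also supports dotted paths and expressions.

```javascript
// Hypothetical helper: imitates an inclusion $project on top-level fields.
// Like $project, it keeps _id unless the spec sets _id to 0 or false.
function projectInclude(doc, spec) {
  const out = {};
  const keepId = !(spec._id === 0 || spec._id === false);
  if (keepId && "_id" in doc) {
    out._id = doc._id;
  }
  for (const [field, include] of Object.entries(spec)) {
    if (field !== "_id" && include && field in doc) {
      out[field] = doc[field];
    }
  }
  return out;
}

// Sample update event; the resume token is copied from this section's output
// (_typeBits and clusterTime omitted).
const event = {
  _id: { _data: "8260BD58F8000000012B022C0100296E5A100478C19FC0B29344D88E8C9784C6494D3E461E5F696400290004" },
  operationType: "update",
  ns: { db: "test", coll: "test" },
  documentKey: { _id: 0 },
  updateDescription: { updatedFields: { sample: "test 3" }, removedFields: [] },
};

const reshaped = projectInclude(event, { documentKey: 1 });
console.log(Object.keys(reshaped)); // [ '_id', 'documentKey' ]
```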
Now, let's update the previous document to generate an event.
db.test.update({_id: 0}, {$set: {sample: "test 3"}})
cursor.next() now displays only the _id and documentKey fields.
{
"_id" : {
"_data" : "8260BD58F8000000012B022C0100296E5A100478C19FC0B29344D88E8C9784C6494D3E461E5F696400290004",
"_typeBits" : BinData(0,"QA==")
},
"documentKey" : {
"_id" : 0
}
}
Note that the _id field holds the resume token (covered later in the series), which cannot be suppressed in change events.
In the next chapter, we will see how to resume a change stream from a certain event.
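Since MongoDB 4.2, a change stream whose pipeline removes or modifies the event's _id fails with an error instead of silently losing the resume token. A small client-side guard can catch the most obvious cases before the pipeline is sent. This is a hypothetical check (`excludesResumeToken` is not a MongoDB API): it only looks for $project with _id set to 0 or false and for $unset of _id; other stages, such as $addFields or $replaceRoot, can also alter _id and are not checked.

```javascript
// Hypothetical guard: flags pipeline stages that obviously drop the resume
// token. Only $project {_id: 0 | false} and $unset of "_id" are detected.
function excludesResumeToken(pipeline) {
  return pipeline.some((stage) => {
    const projection = stage.$project;
    if (projection !== undefined && (projection._id === 0 || projection._id === false)) {
      return true;
    }
    const unset = stage.$unset;
    // $unset accepts a single field name or an array of field names.
    return unset === "_id" || (Array.isArray(unset) && unset.includes("_id"));
  });
}

console.log(excludesResumeToken([{ $match: { operationType: "update" } }, { $project: { documentKey: 1 } }])); // false
console.log(excludesResumeToken([{ $project: { _id: 0, documentKey: 1 } }])); // true
```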