
MongoDB ChangeStreams - Filtering and Reshaping

  • Writer: Rishaab
  • Jun 7, 2021

Updated: Jul 20, 2023

In Part 1 of the series, we saw how to keep a watch on a collection and receive events. With a plain watch, the subscriber receives every change that occurs on a particular collection or database. This can be overwhelming for the subscriber if the producer's workload is write-intensive. The subscriber will also receive events that it does not care about, which causes unnecessary traffic from MongoDB to the subscriber and can hurt overall performance.


Consider the case where the client application is only interested in receiving events with the update operationType on a collection, and does not care about inserts or deletes. How can we achieve this? With change streams, we can specify a custom aggregation pipeline that performs the filtering.


The syntax looks like this:

var cursor = db.test.watch([<user pipeline>])


Filtering

For illustration purposes, we will keep a watch on the test collection for only the update operationType. We will achieve this by creating a pipeline with a $match stage.


The command below watches the collection and filters out every event except those whose operationType is "update".

var cursor = db.test.watch([{$match: {operationType: "update"}}])

Next, insert a document and then update its field.

db.test.insert({_id: 0, sample:"test"})
db.test.update({_id: 0}, {$set: {sample: "test 2"}})

Poll for events

cursor.next()

Voila! We can see that we get only the update event.

{
	"_id" : {
		"_data" : "8260BD54C8000000012B022C0100296E5A100478C19FC0B29344D88E8C9784C6494D3E461E5F696400290004",
		"_typeBits" : BinData(0,"QA==")
	},
	"operationType" : "update",
	"clusterTime" : Timestamp(1623020744, 1),
	"ns" : {
		"db" : "test",
		"coll" : "test"
	},
	"documentKey" : {
		"_id" : 0
	},
	"updateDescription" : {
		"updatedFields" : {
			"sample" : "test 2"
		},
		"removedFields" : [ ]
	}
}

Invoking cursor.hasNext() should now return false, meaning there are no further events to be dispatched. The insert event was filtered out by the $match stage.
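
The same $match stage can cover more than one operation type. The snippet below is a minimal sketch, not something from the original walkthrough: matchOperationTypes is a hypothetical helper name, and it only builds the pipeline array using the standard $in query operator. It is plain JavaScript, so it runs without a server.

```javascript
// Hypothetical helper (not part of MongoDB): builds a change stream pipeline
// that keeps only events whose operationType is in the given list.
function matchOperationTypes(types) {
  return [{ $match: { operationType: { $in: types } } }];
}

// Keep updates and deletes, drop everything else (for example inserts).
const pipeline = matchOperationTypes(["update", "delete"]);
console.log(JSON.stringify(pipeline));

// In the shell, the pipeline would be passed to watch() as before:
//   var cursor = db.test.watch(pipeline)
```

Building the pipeline as a regular array keeps the filter easy to adjust when the subscriber's interests change.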



Reshaping

What if we only want the documentKey field in each event? We can reshape the events by adding a $project stage to our custom pipeline.


Our new change stream pipeline will look like this

var cursor = db.test.watch([{$match: {operationType: "update"}}, {$project: {documentKey: 1}}])

Now, let's update the previous document to generate an event.

db.test.update({_id: 0}, {$set: {sample: "test 3"}})

cursor.next() now displays only the _id and documentKey fields.

{
	"_id" : {
		"_data" : "8260BD58F8000000012B022C0100296E5A100478C19FC0B29344D88E8C9784C6494D3E461E5F696400290004",
		"_typeBits" : BinData(0,"QA==")
	},
	"documentKey" : {
		"_id" : 0
	}
}

Note that the _id field acts as the resume token (covered later in the series) and cannot be suppressed in change events.
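
To make the combined effect of the two stages concrete, here is a small plain-JavaScript sketch that mimics them on in-memory sample events. It is only an illustration of the outcome, not how the server evaluates a pipeline. The events array, the shortened placeholder tokens, and the function names matchUpdate and projectDocumentKey are assumptions made for this example.

```javascript
// Sample events shaped like the ones in this post.
// The resume tokens are shortened placeholders, not real values.
const events = [
  {
    _id: { _data: "token-1" },
    operationType: "insert",
    documentKey: { _id: 0 },
    fullDocument: { _id: 0, sample: "test" },
  },
  {
    _id: { _data: "token-2" },
    operationType: "update",
    documentKey: { _id: 0 },
    updateDescription: { updatedFields: { sample: "test 2" }, removedFields: [] },
  },
];

// Mimics {$match: {operationType: "update"}}.
const matchUpdate = (event) => event.operationType === "update";

// Mimics {$project: {documentKey: 1}}: keeps documentKey, and also _id,
// because $project includes _id unless it is explicitly excluded.
const projectDocumentKey = (event) => ({
  _id: event._id,
  documentKey: event.documentKey,
});

// Filter first, then reshape, in the same order as the pipeline stages.
const received = events.filter(matchUpdate).map(projectDocumentKey);
console.log(JSON.stringify(received, null, 2));
```

Only the update event survives, and it carries just the _id and documentKey fields, which matches the shape of the event shown above.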



In the next chapter, we will see how to resume a change stream from a certain event.



©2023 by Rishab Joshi.
