This is the second part of a blog series that covers MongoDB Change Streams and how it can be used with Azure Cosmos DB
which has wire protocol support for MongoDB server version 3.6 (including the Change Streams feature). Part 1 covered the introduction, overview of the Change streams processor service and walked you through how to run the application so that you can witness Changes streams in action.
In this instalment, we will go over the code and see how things work behind the scenes.
As always, the code is available on GitHub
Overview: Change processor app
Before moving on, here is a quick refresher of the Change Processor service (copied over from part 1):
The application is a change processor service that uses the Change stream feature. It's a Go
application that uses the official MongoDB Go driver but the concepts should be applicable to any other language whose native driver supports Change Streams.
It uses the Watch
API to subscribe to the change events feed in a specific Collection
so that it is notified of documents being created, updated and deleted. It extracts the relevant information from the change event payload i.e. the document which was affected and saves it locally to a file. It also demonstrates how to use Resume Tokens
to save processing progress.
Code walkthrough
Here is the project layout: the main.go
file consists of most of the logic around subscribing and processing change streams while token/resume_token.go
handles saving/retrieving resume tokens.
.
├── go.mod
├── go.sum
├── main.go
└── token
└── resume_token.go
The application starts by connecting to the Azure Cosmos DB MongoDB API and bails out if that fails for some reason. If the connectivity succeeds, we get a handle to the MongoDB Collection we want to watch
client, err := mongo.NewClient(options.Client().ApplyURI(mongoURI))
if err != nil {
log.Fatal("failed to create client: ", err)
}
ctx, cancel := context.WithCancel(context.Background())
err = client.Connect(ctx)
if err != nil {
log.Fatal("failed to connect", err)
}
coll := client.Database(mongoDBName).Collection(mongoCollectionName)
defer func() {
err = client.Disconnect(context.Background())
if err != nil {
fmt.Println("failed to close connection")
}
}()
It's time to start "tracking" the collection for change events. Notice the pipeline
and opts
arguments
cs, err := coll.Watch(ctx, pipeline, opts)
pipeline
is a *mongo.Pipeline
. We specify a couple of stages as a part of the Pipeline
- $match
and $project
along with the fullDocument
option
At the time of writing, these options are mandatory due to constraints specific to Azure Cosmos DB.
This is how these are manifested in the code:
matchStage := bson.D{{"$match", bson.D{{"operationType", bson.D{{"$in", bson.A{"insert", "update", "replace"}}}}}}}
//matchStage := bson.D{{"$match", bson.D{{"operationType", "insert"}}}}
projectStage := bson.D{{"$project", bson.M{"_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1}}}
pipeline := mongo.Pipeline{matchStage, projectStage}
opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)
You have a choice of using the resume token (specified by an environment variable - more on this in the next section). The resume token is set as an *options.ChangeStreamOptions
using the ResumeAfter
field. The resume token (if present) is stored locally in a file named token
- we check for the token before starting our change stream and use it if it's present
if resumeSupported {
t, err := token.RetrieveToken()
if err != nil {
log.Fatal("failed to fetch resume token: ", err)
}
if t != nil {
opts.SetResumeAfter(t)
}
}
As mentioned earlier, token operations (save and retrieve) are part of token/resume_token.go
. Here is how an existing token is fetched:
func RetrieveToken() (bson.Raw, error) {
tf, err := os.Open(tokenFileName)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, err
}
token, err := bson.NewFromIOReader(tf)
if err != nil {
return nil, err
}
return token, nil
}
We create the output file to store the change events and start watching the collection
op, err := os.OpenFile(outputfileName,
os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
...
cs, err := coll.Watch(ctx, pipeline, opts)
if err != nil {
log.Fatal("failed to start change stream watch: ", err)
}
A different goroutine is used to process the change events
go func() {
fmt.Println("started change stream...")
for cs.Next(ctx) {
re := cs.Current.Index(1)
_, err := op.WriteString(re.Value().String() + "\n")
if err != nil {
fmt.Println("failed to save change event", err)
}
}
}()
The goroutine is the heart of this (rather simple) processing service and runs as a tight for
loop which uses Next
to fetch the change event. Since Next
is a blocking call and we need to think about clean/graceful exit mechanism. The ctx
argument which is passed into Next
plays an important role - it is a cancellable context
(more in how this is used in just a bit)
ctx, cancel := context.WithCancel(context.Background())
An exit
channel is used to detect program termination (e.g. ctrl+c
)
exit := make(chan os.Signal)
signal.Notify(exit, syscall.SIGINT, syscall.SIGTERM)
While the change stream detection and processing is running in a separate goroutine, the main
goroutine is blocked using the exit
channel
<-exit
fmt.Println("exit signalled. cancelling context")
cancel()
if resumeSupported {
token.SaveToken(cs.ResumeToken())
}
It does a couple of things:
Invokes cancel()
which is nothing but the Cancel function
returned as a by-product of creating the cancellable
Context (which is passed into Next
). Calling the CancelFunc
propagates program termination to the change stream for loop, allowing it to exit.
Please note that I have used user-initiated (manual) program termination (e.g. pressing ctrl+c) as an example. The same concept applies if the processor service was executing as a long-running server component
Another key bit is to save the Resume token to the local file:
if resumeSupported {
token.SaveToken(cs.ResumeToken())
}
Here is how the resume token is saved to a local file:
func SaveToken(token []byte) {
if len(token) == 0 {
return
}
tf, err := os.Create(tokenFileName)
if err != nil {
fmt.Println("token file creation failed", err)
return
}
_, err = tf.Write(token)
if err != nil {
fmt.Println("failed to save token", err)
return
}
fmt.Println("saved token to file")
}
That concludes the two-part series on MongoDB change streams. I hope this was useful and helped understand the feature with the help of a practical example!
Top comments (1)
I got this error when running the program: Stage $changeStream is not supported yet in native pipeline