DEV Community

Abhishek Gupta
Abhishek Gupta

Posted on

Tutorial: How to use MongoDB Change Streams [Part 2]

This is the second part of a blog series that covers MongoDB Change Streams and how it can be used with Azure Cosmos DB which has wire protocol support for MongoDB server version 3.6 (including the Change Streams feature). Part 1 covered the introduction, overview of the Change streams processor service and walked you through how to run the application so that you can witness Changes streams in action.

In this instalment, we will go over the code and see how things work behind the scenes.

As always, the code is available on GitHub

Overview: Change processor app

Before moving on, here is a quick refresher of the Change Processor service (copied over from part 1):

The application is a change processor service that uses the Change stream feature. It's a Go application that uses the official MongoDB Go driver but the concepts should be applicable to any other language whose native driver supports Change Streams.

It uses the Watch API to subscribe to the change events feed in a specific Collection so that it is notified of documents being created, updated and deleted. It extracts the relevant information from the change event payload i.e. the document which was affected and saves it locally to a file. It also demonstrates how to use Resume Tokens to save processing progress.

Code walkthrough

Here is the project layout: the main.go file consists of most of the logic around subscribing and processing change streams while token/resume_token.go handles saving/retrieving resume tokens.

.
├── go.mod
├── go.sum
├── main.go
└── token
    └── resume_token.go
Enter fullscreen mode Exit fullscreen mode

The application starts by connecting to the Azure Cosmos DB MongoDB API and bails out if that fails for some reason. If the connectivity succeeds, we get a handle to the MongoDB Collection we want to watch

    client, err := mongo.NewClient(options.Client().ApplyURI(mongoURI))
    if err != nil {
        log.Fatal("failed to create client: ", err)
    }

    ctx, cancel := context.WithCancel(context.Background())

    err = client.Connect(ctx)
    if err != nil {
        log.Fatal("failed to connect", err)
    }

  coll := client.Database(mongoDBName).Collection(mongoCollectionName)
    defer func() {
        err = client.Disconnect(context.Background())
        if err != nil {
            fmt.Println("failed to close connection")
        }
    }()
Enter fullscreen mode Exit fullscreen mode

It's time to start "tracking" the collection for change events. Notice the pipeline and opts arguments

cs, err := coll.Watch(ctx, pipeline, opts)
Enter fullscreen mode Exit fullscreen mode

pipeline is a *mongo.Pipeline. We specify a couple of stages as a part of the Pipeline - $match and $project along with the fullDocument option

At the time of writing, these options are mandatory due to constraints specific to Azure Cosmos DB.

This is how these are manifested in the code:

    matchStage := bson.D{{"$match", bson.D{{"operationType", bson.D{{"$in", bson.A{"insert", "update", "replace"}}}}}}}
    //matchStage := bson.D{{"$match", bson.D{{"operationType", "insert"}}}}
    projectStage := bson.D{{"$project", bson.M{"_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1}}}
    pipeline := mongo.Pipeline{matchStage, projectStage}
    opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)
Enter fullscreen mode Exit fullscreen mode

You have a choice of using the resume token (specified by an environment variable - more on this in the next section). The resume token is set as an *options.ChangeStreamOptions using the ResumeAfter field. The resume token (if present) is stored locally in a file named token - we check for the token before starting our change stream and use it if it's present

    if resumeSupported {
        t, err := token.RetrieveToken()
        if err != nil {
            log.Fatal("failed to fetch resume token: ", err)
        }
        if t != nil {
            opts.SetResumeAfter(t)
        }
    }
Enter fullscreen mode Exit fullscreen mode

As mentioned earlier, token operations (save and retrieve) are part of token/resume_token.go. Here is how an existing token is fetched:

func RetrieveToken() (bson.Raw, error) {
    tf, err := os.Open(tokenFileName)
    if err != nil {
        if os.IsNotExist(err) {
            return nil, nil
        }
        return nil, err
    }
    token, err := bson.NewFromIOReader(tf)
    if err != nil {
        return nil, err
    }
    return token, nil
}
Enter fullscreen mode Exit fullscreen mode

We create the output file to store the change events and start watching the collection

  op, err := os.OpenFile(outputfileName,
        os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  ...

    cs, err := coll.Watch(ctx, pipeline, opts)
    if err != nil {
        log.Fatal("failed to start change stream watch: ", err)
    }

Enter fullscreen mode Exit fullscreen mode

A different goroutine is used to process the change events

    go func() {
        fmt.Println("started change stream...")
        for cs.Next(ctx) {
            re := cs.Current.Index(1)
            _, err := op.WriteString(re.Value().String() + "\n")
            if err != nil {
                fmt.Println("failed to save change event", err)
            }
        }
    }()
Enter fullscreen mode Exit fullscreen mode

The goroutine is the heart of this (rather simple) processing service and runs as a tight for loop which uses Next to fetch the change event. Since Next is a blocking call and we need to think about clean/graceful exit mechanism. The ctx argument which is passed into Next plays an important role - it is a cancellable context (more in how this is used in just a bit)

ctx, cancel := context.WithCancel(context.Background())
Enter fullscreen mode Exit fullscreen mode

An exit channel is used to detect program termination (e.g. ctrl+c)

exit := make(chan os.Signal)
signal.Notify(exit, syscall.SIGINT, syscall.SIGTERM)
Enter fullscreen mode Exit fullscreen mode

While the change stream detection and processing is running in a separate goroutine, the main goroutine is blocked using the exit channel

    <-exit
    fmt.Println("exit signalled. cancelling context")
    cancel()
    if resumeSupported {
        token.SaveToken(cs.ResumeToken())
    }
Enter fullscreen mode Exit fullscreen mode

It does a couple of things:

Invokes cancel() which is nothing but the Cancel function returned as a by-product of creating the cancellable Context (which is passed into Next). Calling the CancelFunc propagates program termination to the change stream for loop, allowing it to exit.

Please note that I have used user-initiated (manual) program termination (e.g. pressing ctrl+c) as an example. The same concept applies if the processor service was executing as a long-running server component

Another key bit is to save the Resume token to the local file:

if resumeSupported {
  token.SaveToken(cs.ResumeToken())
}
Enter fullscreen mode Exit fullscreen mode

Here is how the resume token is saved to a local file:

func SaveToken(token []byte) {

    if len(token) == 0 {
        return
    }
    tf, err := os.Create(tokenFileName)

    if err != nil {
        fmt.Println("token file creation failed", err)
        return
    }
    _, err = tf.Write(token)
    if err != nil {
        fmt.Println("failed to save token", err)
        return
    }
    fmt.Println("saved token to file")
}
Enter fullscreen mode Exit fullscreen mode

That concludes the two-part series on MongoDB change streams. I hope this was useful and helped understand the feature with the help of a practical example!

Top comments (1)

Collapse
 
dingzhanjun profile image
John Ding

I got this error when running the program: Stage $changeStream is not supported yet in native pipeline