
Tutorial: How to use MongoDB Change Streams [Part 2]

Abhishek Gupta ・5 min read

This is the second part of a blog series that covers MongoDB Change Streams and how they can be used with Azure Cosmos DB, which has wire protocol support for MongoDB server version 3.6 (including the Change Streams feature). Part 1 covered the introduction, gave an overview of the change streams processor service, and walked you through running the application so that you could see Change Streams in action.

In this instalment, we will go over the code and see how things work behind the scenes.

As always, the code is available on GitHub.

Overview: Change processor app

Before moving on, here is a quick refresher of the Change Processor service (copied over from part 1):

The application is a change processor service that uses the Change stream feature. It's a Go application that uses the official MongoDB Go driver but the concepts should be applicable to any other language whose native driver supports Change Streams.

It uses the Watch API to subscribe to the change events feed in a specific Collection so that it is notified of documents being created, updated and deleted. It extracts the relevant information from the change event payload i.e. the document which was affected and saves it locally to a file. It also demonstrates how to use Resume Tokens to save processing progress.

Code walkthrough

Here is the project layout: main.go contains most of the logic for subscribing to and processing change streams, while token/resume_token.go handles saving and retrieving resume tokens.

.
├── go.mod
├── go.sum
├── main.go
└── token
    └── resume_token.go

The application starts by connecting to the Azure Cosmos DB MongoDB API and bails out if that fails. If the connection succeeds, we get a handle to the MongoDB Collection we want to watch.

    client, err := mongo.NewClient(options.Client().ApplyURI(mongoURI))
    if err != nil {
        log.Fatal("failed to create client: ", err)
    }

    ctx, cancel := context.WithCancel(context.Background())

    err = client.Connect(ctx)
    if err != nil {
        log.Fatal("failed to connect: ", err)
    }

    coll := client.Database(mongoDBName).Collection(mongoCollectionName)
    defer func() {
        err = client.Disconnect(context.Background())
        if err != nil {
            fmt.Println("failed to close connection")
        }
    }()

It's time to start "tracking" the collection for change events. Notice the pipeline and opts arguments:

cs, err := coll.Watch(ctx, pipeline, opts)

pipeline is a mongo.Pipeline. We specify two stages as part of the pipeline, $match and $project, and set the fullDocument option through opts.

At the time of writing, these options are mandatory due to constraints specific to Azure Cosmos DB.

Here is how this looks in the code:

    matchStage := bson.D{{"$match", bson.D{{"operationType", bson.D{{"$in", bson.A{"insert", "update", "replace"}}}}}}}
    //matchStage := bson.D{{"$match", bson.D{{"operationType", "insert"}}}}
    projectStage := bson.D{{"$project", bson.M{"_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1}}}
    pipeline := mongo.Pipeline{matchStage, projectStage}
    opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)

Using a resume token is optional and is controlled by an environment variable (more on this in the next section). The token is set on the *options.ChangeStreamOptions with SetResumeAfter. If a token exists, it is stored locally in a file named token: we check for it before starting the change stream and use it if it is present.

    if resumeSupported {
        t, err := token.RetrieveToken()
        if err != nil {
            log.Fatal("failed to fetch resume token: ", err)
        }
        if t != nil {
            opts.SetResumeAfter(t)
        }
    }

As mentioned earlier, token operations (save and retrieve) are part of token/resume_token.go. Here is how an existing token is fetched:

func RetrieveToken() (bson.Raw, error) {
    tf, err := os.Open(tokenFileName)
    if err != nil {
        if os.IsNotExist(err) {
            return nil, nil
        }
        return nil, err
    }
    defer tf.Close()

    token, err := bson.NewFromIOReader(tf)
    if err != nil {
        return nil, err
    }
    return token, nil
}
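
To make the "missing file means no token" behaviour and the shape of the stored bytes more concrete, here is a driver-free sketch. It relies on one fact from the BSON format: a document starts with a little-endian int32 holding its total length and ends with a zero byte. The names readTokenFile and checkBSONFraming are hypothetical, and the check covers only this outer framing, not the elements inside the document.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"io/fs"
	"os"
)

// readTokenFile is a hypothetical stand-in for RetrieveToken. It returns
// (nil, nil) when the file does not exist, so a first run starts without a
// resume token instead of failing.
func readTokenFile(path string) ([]byte, error) {
	b, err := os.ReadFile(path)
	if err != nil {
		if errors.Is(err, fs.ErrNotExist) {
			return nil, nil
		}
		return nil, err
	}
	if err := checkBSONFraming(b); err != nil {
		return nil, err
	}
	return b, nil
}

// checkBSONFraming verifies only the outer framing of a BSON document: a
// little-endian int32 length prefix that matches the byte count, and a
// trailing zero byte. It does not validate the elements inside.
func checkBSONFraming(b []byte) error {
	if len(b) < 5 {
		return errors.New("token too short to be a BSON document")
	}
	if n := binary.LittleEndian.Uint32(b[:4]); int64(n) != int64(len(b)) {
		return fmt.Errorf("length prefix %d does not match %d bytes read", n, len(b))
	}
	if b[len(b)-1] != 0 {
		return errors.New("token is missing the trailing zero byte")
	}
	return nil
}

func main() {
	t, err := readTokenFile("token")
	if err != nil {
		fmt.Println("failed to fetch resume token:", err)
		return
	}
	fmt.Printf("read %d token bytes\n", len(t))
}
```

A truncated or corrupted token file is reported as an error here rather than being handed to the change stream.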

We create the output file to store the change events and start watching the collection:

    op, err := os.OpenFile(outputfileName,
        os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
    ...

    cs, err := coll.Watch(ctx, pipeline, opts)
    if err != nil {
        log.Fatal("failed to start change stream watch: ", err)
    }

A separate goroutine is used to process the change events:

    go func() {
        fmt.Println("started change stream...")
        for cs.Next(ctx) {
            re := cs.Current.Index(1)
            _, err := op.WriteString(re.Value().String() + "\n")
            if err != nil {
                fmt.Println("failed to save change event", err)
            }
        }
    }()

The goroutine is the heart of this (rather simple) processing service. It runs a tight for loop that uses Next to fetch each change event. Since Next is a blocking call, we need a clean, graceful exit mechanism. The ctx argument passed into Next plays an important role here: it is a cancellable context (more on how this is used in just a bit).

ctx, cancel := context.WithCancel(context.Background())

An exit channel is used to detect program termination (e.g. ctrl+c):

// signal.Notify does not block when sending, so the channel must be buffered
exit := make(chan os.Signal, 1)
signal.Notify(exit, syscall.SIGINT, syscall.SIGTERM)

While change stream detection and processing runs in a separate goroutine, the main goroutine is blocked on the exit channel:

    <-exit
    fmt.Println("exit signalled. cancelling context")
    cancel()
    if resumeSupported {
        token.SaveToken(cs.ResumeToken())
    }

It does a couple of things:

Invokes cancel(), which is the CancelFunc returned when the cancellable Context (the one passed into Next) was created. Calling it propagates program termination to the change stream for loop, allowing it to exit.

Please note that I have used user-initiated (manual) program termination (e.g. pressing ctrl+c) as an example. The same concept applies if the processor service were running as a long-running server component.

Another key bit is to save the Resume token to the local file:

if resumeSupported {
  token.SaveToken(cs.ResumeToken())
}

Here is how the resume token is saved to a local file:

func SaveToken(token []byte) {

    if len(token) == 0 {
        return
    }
    tf, err := os.Create(tokenFileName)

    if err != nil {
        fmt.Println("token file creation failed", err)
        return
    }
    defer tf.Close()

    _, err = tf.Write(token)
    if err != nil {
        fmt.Println("failed to save token", err)
        return
    }
    fmt.Println("saved token to file")
}

That concludes the two-part series on MongoDB change streams. I hope this was useful and helped you understand the feature through a practical example!
