Sibelius Seraphini for Woovi


Robust MongoDB Change Stream implementation

Change streams allow applications to access real-time data changes without the prior complexity and risk of manually tailing the oplog.

Change streams enable you to update other data sources, such as a search index in Elasticsearch, or to publish events to subscribers.

A naive implementation would simply listen to some collections, but it won't handle the case where the listening service crashes or is briefly down during a rolling deployment.

This article provides a more robust solution that avoids losing change stream events when your service crashes or has some downtime.

Robust Change Streams using a Resume Token

const stream = model.watch([], {
  fullDocument: 'updateLookup',
});

stream.on('change', (data) => {})

A stream will emit a ChangeStreamData for each data change in your collection.

Each change event also carries a resumeToken, which lets you resume processing from that point in the stream.

A resumeToken is like a cursor in cursor-based pagination.
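To make the cursor analogy concrete, here is a minimal in-memory sketch. It is not MongoDB's API: the `ChangeEvent` shape is trimmed down and `eventsAfter` is a hypothetical helper that mimics what resuming does conceptually. The one detail taken from MongoDB is that the `_id` field of a change event serves as its resume token.

```typescript
// Hypothetical in-memory model of a change stream; not MongoDB's API.
type ResumeToken = { _data: string };

interface ChangeEvent {
  _id: ResumeToken; // in MongoDB change events, _id is the resume token
  operationType: 'insert' | 'update' | 'delete';
  documentKey: { _id: string };
}

// Return the events that come strictly after the given token,
// the same way a pagination cursor returns the items after it.
const eventsAfter = (log: ChangeEvent[], token: ResumeToken): ChangeEvent[] => {
  const index = log.findIndex((event) => event._id._data === token._data);

  // This sketch treats an unknown token as an error instead of guessing a position.
  if (index === -1) {
    throw new Error('resume token not found in log');
  }

  return log.slice(index + 1);
};
```

If the service stored the token of the last event it processed before going down, calling `eventsAfter(log, storedToken)` yields exactly the events it missed, in order.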

Using a resume token is as easy as this:

const changeStreamListen = async <T extends any>(
  model: Model<T>,
  fn: (data: ChangeStreamData<T>) => void,
) => {
  // one resume token per collection, keyed by the collection name
  const name = model.collection.name;
  const key = `resumetoken:${name}`;
  const resumeToken = await RedisCache.get(key);

  // only pass resumeAfter when a token was stored by a previous run
  const getResumeOptions = () => {
    if (resumeToken) {
      return {
        resumeAfter: resumeToken,
      };
    }

    return {};
  };

  const stream = model.watch([], {
    // include the current version of the document on update events
    fullDocument: 'updateLookup',
    ...getResumeOptions(),
  });

  // changeStreamMiddleware wraps the handler; its body is not shown in this article
  stream.on('change', changeStreamMiddleware(name, fn)).on('error', (err) => {
    // eslint-disable-next-line
    console.log('change error:', err);
    Sentry.setExtra('error', err);
    Sentry.captureException(err);
  });
};

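This implementation keeps the resume token for each collection in Redis and, on startup, resumes from it if one is available. The listing above only reads the token; writing it back after each event is presumably handled inside changeStreamMiddleware, which is not shown here.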
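The article does not show how the token gets written, so the following is only a sketch of one way a wrapper could do it, not the author's actual middleware. `TokenStore`, `withResumeToken` and `createMemoryStore` are hypothetical names introduced for illustration; in a real service the store would be backed by Redis instead of a `Map`.

```typescript
// Hypothetical storage interface; stands in for a Redis-backed cache.
interface TokenStore {
  get(key: string): Promise<unknown>;
  set(key: string, value: unknown): Promise<void>;
}

// Minimal shape of a change event: `_id` carries the resume token.
interface ChangeEventLike {
  _id: unknown;
}

// Wrap a handler so the token is persisted only after the handler succeeds.
const withResumeToken =
  <E extends ChangeEventLike>(
    store: TokenStore,
    collectionName: string,
    handler: (event: E) => Promise<void> | void,
  ) =>
  async (event: E): Promise<void> => {
    // if the handler throws, the stored token stays at the previous event
    await handler(event);
    await store.set(`resumetoken:${collectionName}`, event._id);
  };

// In-memory store for local experiments and tests.
const createMemoryStore = (): TokenStore => {
  const values = new Map<string, unknown>();

  return {
    get: async (key) => values.get(key),
    set: async (key, value) => {
      values.set(key, value);
    },
  };
};
```

Saving the token after the handler finishes means an event is replayed if the service crashes between handling it and storing its token. That gives at-least-once processing, so the handler should be safe to run twice for the same event.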

In Summary

Change streams can greatly simplify your software architecture.
A robust implementation is important to make sure you don't lose any data changes.


Woovi
Woovi is a Startup that enables shoppers to pay as they like. Woovi provides instant payment solutions for merchants to accept orders to make this possible.

If you want to work with us, we are hiring!
