A couple of weeks ago, I talked about how Woovi uses MongoDB Change Streams to emit events in our event-driven architecture. Now, I'm going to talk about the repercussions of this kind of architectural decision and how much value lies solely in decision-making and good software development principles.
Principle
Principles help narrow down options when deciding how to implement a new feature or adopt a new technology for it.
At Woovi, we adopt an event-driven architecture to scale our platform. This means that in every design decision, we consider whether it aligns with this perspective.
- Is there an organic event for that implementation, meaning the user produces some behavior that can be captured as an event? Or do we need to produce an event based on what we want to happen?
- Does it have to be processed fast, or is that costly?
- Does it need to happen as fast as possible, or can it be delayed?
- Does it need to be atomic?
- Does it need to be idempotent?
We can take a different approach for each of those questions, but it is common for the process to be very similar for every implementation due to the event-driven principle.
The problem and decision
Our platform is very data-heavy, and we have always wanted to offer the best search feature for our users. Being able to find anything from a single search bar is our goal with this feature, so we decided to use Elasticsearch to power our search solution.
The first problem we encountered was: how do we index all our data?
- First, we need to be event-driven, so no cron job polling all of our data from the database and indexing it.
- Second, we need to be fast: as soon as data is created on our platform, it needs to be searchable for the user. We deal with financial transactions, so speed is essential. These two points complement each other: if we need to be event-driven, we have to find a way to index data as it is created on our platform.
- Third, we can't create a point of stress on our system, so it needs to be lean. We can't tax our database with heavy queries in order to create events.
- And last, we need to be resilient: we need to deliver all the data the user wants access to.
The solution was to create events from the database itself.
The publisher
A couple of weeks ago, we talked about this package, and it has matured a lot since then. Now, it consists of a changeStreamListen method that takes a mongoose Model for the change stream and a function that will receive the data from the stream. This approach allows us to scale by adding multiple methods inside this function for the same model and different types of streams.
import type {
  ChangeStreamDeleteDocument,
  ChangeStreamUpdateDocument,
  ChangeStreamInsertDocument,
} from 'mongodb';
import type { Model } from 'mongoose';

import { changeStreamMiddleware } from './changeStreamMiddleware';

export type AllowedChangeStreams =
  | ChangeStreamInsertDocument
  | ChangeStreamUpdateDocument
  | ChangeStreamDeleteDocument;

export type OperationType = AllowedChangeStreams['operationType'];

export type ChangeStreamData<T> = {
  _id: Record<string, unknown>;
  operationType: OperationType;
  wallTime: Date;
  fullDocument: T;
  ns: Record<string, unknown>;
};

export const changeStreamListen = <T>(
  model: Model<T>,
  fn: (data: ChangeStreamData<T>) => void,
) => {
  // 'updateLookup' makes update events carry the full, current document.
  const stream = model.watch([], { fullDocument: 'updateLookup' });

  stream.on('change', changeStreamMiddleware(model.collection.name, fn));
};
Notice the changeStreamMiddleware method encapsulating the model and the function? This is a Higher-Order Function (HoF) designed to attach APM (Application Performance Monitoring) and index transactions in our observability tools, reducing the risk of intrusion or failure for the processes being watched. More on that in a future article.
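To make the idea concrete, here is a minimal sketch of what such a higher-order function could look like. This is not our production code: the apm module, its calls, and the transaction naming are assumptions used purely for illustration.

import type { ChangeStreamData } from './changeStreamListen';

// Hypothetical APM client module; swap in your observability SDK of choice.
import { apm } from './apm';

export const changeStreamMiddleware =
  <T>(collectionName: string, fn: (data: ChangeStreamData<T>) => void) =>
  (data: ChangeStreamData<T>) => {
    // Each stream event gets its own transaction, so a failing handler is
    // recorded in the observability tools without killing the stream.
    const transaction = apm.startTransaction(
      `changeStream/${collectionName}/${data.operationType}`,
    );

    try {
      fn(data);
    } catch (error) {
      apm.captureError(error);
    } finally {
      transaction?.end();
    }
  };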
After that, we have setupSubscribers, which instantiates one subscriber for each model we want to watch and passes a function to handle the data from the events.
import { Charge } from '@woovi/charge';
import { Company } from '@woovi/company';
import { Customer } from '@woovi/customer';
import { PixTransaction } from '@woovi/transaction';
import { User } from '@woovi/user';

import { changeStreamListen } from './changeStreamListen';

import { chargeChangeStreamHandler } from '../charge/chargeChangeStreamHandler';
import { handleCompanySubscriberEvent } from '../company/handleCompanySubscriberEvent';
import { handleCustomerSubscriberEvent } from '../customer/handleCustomerSubscriberEvent';
import { handleTransactionSubscriberEvent } from '../transaction/handleTransactionSubscriberEvent';
import { handleUserSubscriberEvent } from '../user/handleUserSubscriberEvent';

export const setupSubscribers = () => {
  changeStreamListen(Charge, chargeChangeStreamHandler);
  changeStreamListen(PixTransaction, handleTransactionSubscriberEvent);
  changeStreamListen(Customer, handleCustomerSubscriberEvent);
  changeStreamListen(User, handleUserSubscriberEvent);
  changeStreamListen(Company, handleCompanySubscriberEvent);
};
This implementation performs really well and delivers events very fast, as soon as the change reaches consensus among the replica set members.
This allows us to achieve one of the goals for the search implementation: fast indexing times. It takes less than one second for a transaction or charge to be available in Elasticsearch from a stream event.
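As an illustration of what one of those handlers can do, the sketch below upserts the fullDocument of a charge into Elasticsearch. The index name, the document shape, and the client setup are assumptions for the example, not Woovi's actual mapping.

import { Client } from '@elastic/elasticsearch';

import type { ChangeStreamData } from './changeStreamListen';

// Hypothetical document shape; the real Charge model has many more fields.
type ChargeDocument = {
  _id: string;
  value: number;
  status: string;
  customerName: string;
};

const elastic = new Client({
  node: process.env.ELASTICSEARCH_URL ?? 'http://localhost:9200',
});

export const chargeChangeStreamHandler = async (
  data: ChangeStreamData<ChargeDocument>,
) => {
  // Thanks to 'updateLookup', inserts and updates arrive with the latest
  // fullDocument, so a single upsert keeps the index in sync.
  if (data.operationType === 'insert' || data.operationType === 'update') {
    await elastic.index({
      index: 'charges',
      id: data.fullDocument._id,
      document: data.fullDocument,
    });
  }
};

This sketch assumes the v8 @elastic/elasticsearch client; older versions take body instead of document.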
It is very resilient when compared with other event-driven implementations.
We use BullJs as our event bus and worker implementation. One concept we always consider while working with jobs is that a job can always fail. For that reason, each queue has its own retry and rate policies. The publisher is intended to be used for processes that require both speed and resilience in execution.
While using jobs, you always have to implement two sides of the process: the creation of the event and the consumer that handles it. By using the publisher, we can reduce the complexity of our processes and become even more resilient against unintended effects.
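For comparison, a job-based version of the same flow has to wire both sides explicitly. The sketch below is illustrative only: the queue name, Redis URL, and retry numbers are made up for the example, not our production values.

import Queue from 'bull';

// Each queue carries its own retry and rate policies, because a job can always fail.
const indexChargeQueue = new Queue(
  'INDEX_CHARGE',
  process.env.REDIS_URL ?? 'redis://127.0.0.1:6379',
  {
    limiter: { max: 100, duration: 1000 }, // at most 100 jobs per second
    defaultJobOptions: {
      attempts: 5,
      backoff: { type: 'exponential', delay: 1000 },
    },
  },
);

// Producer side: every code path that creates a charge must remember to enqueue it...
export const enqueueIndexCharge = (chargeId: string) =>
  indexChargeQueue.add({ chargeId });

// ...and the consumer side has to process it and survive every failure mode.
indexChargeQueue.process(async (job) => {
  // look up the charge by job.data.chargeId and index it
});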
Benefits and Concerns of using Change Data Capture
Almost every database has some kind of event capture implementation. There are many ways in which you can achieve CDC (Change Data Capture), but log-based approaches are generally considered the better choice. Examples of log-based approaches include write-ahead logs in PostgreSQL, MySQL binary logs, and the MongoDB oplog. These methods offer more benefits than simply using database triggers or, even worse, a query-based approach. They are usually more reliable, since they utilize the database driver to perform the necessary security checks before emitting an event. Additionally, log-based approaches have a low impact on database performance, especially in the case of MongoDB, where it is close to zero. Moreover, these methods typically have a low cost of implementation and add minimal complexity.
On the other hand, the downsides of this approach include the lack of information on how the change takes place and its context. Much of the logic is obscured by the database layer, making it inaccessible from the publisher's side and limiting the implementation from that perspective. Another important point is that change streams are infinite: as long as you are watching for changes on the model, they will be continuously streamed. Hence, it becomes even more crucial to adhere to software development principles and good practices.
As every single event on your database is being streamed, every query you run as a result of it becomes even more important and has to be optimized with that in mind. It's quite trivial to create a loop on an update event, but those are easy to avoid. On the other hand, heavy and unnecessary queries are easy to overlook. So keep that in mind: simple and lean code is best for this kind of implementation.
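One practical way to stay lean is to filter at the stream itself rather than inside the handler: watch accepts an aggregation pipeline, so events you do not care about never reach your code. A minimal sketch, assuming we only care about inserts and status changes on charges:

import { Charge } from '@woovi/charge';

// Only inserts and updates that actually touch the 'status' field reach the
// handler; everything else is filtered out by MongoDB before it hits our code.
const pipeline = [
  {
    $match: {
      $or: [
        { operationType: 'insert' },
        {
          operationType: 'update',
          'updateDescription.updatedFields.status': { $exists: true },
        },
      ],
    },
  },
];

const stream = Charge.watch(pipeline, { fullDocument: 'updateLookup' });

In our case this would mean extending changeStreamListen to accept a pipeline per model, but the idea is the same: push filtering down to the database.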
Enabling more Products
While jobs are a great way to scale asynchronous processes, it is possible to have too many, very fast. This is a common problem in event-driven systems. Woovi has more than 100 jobs, and we manage well with a very mature codebase. However, the problem is that it does not scale forever. To move forward, we need more ways to scale events so we can implement more products for our users.
This implementation has enabled not just our search project (which is soon moving out of beta) but much more to come.
Now, we have a strong base to implement:
Mqtt integration
We are implementing Mqtt to integrate our Maquininha and solve an old problem with Windows machines. This will allow our customers to print QR codes with ease using any commercial thermal printer.
PubSub
We are implementing real-time updates to charges and transactions in our platform. This way, customers can be notified live and track information in real time.
Analytics
We are creating a complete analytics pipeline to offer data-driven insights to ourselves and our customers. This implementation uses the publisher and jobs to push data to multiple pipelines.
And much more to come.
We are shipping a lot every day. If you are interested in joining a fast-growing startup, we are hiring!
Photo by Possessed Photography on Unsplash