<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Memphis.dev</title>
    <description>The latest articles on DEV Community by Memphis.dev (@memphis_dev).</description>
    <link>https://dev.to/memphis_dev</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Forganization%2Fprofile_image%2F5970%2F150e2662-f962-46b8-9a8c-b42f4e536694.jpg</url>
      <title>DEV Community: Memphis.dev</title>
      <link>https://dev.to/memphis_dev</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/memphis_dev"/>
    <language>en</language>
    <item>
      <title>Ingesting Webhooks From Stripe – The Better Way</title>
      <dc:creator>Memphis.dev team</dc:creator>
      <pubDate>Thu, 18 Jan 2024 12:22:37 +0000</pubDate>
      <link>https://dev.to/memphis_dev/ingesting-webhooks-from-stripe-the-better-way-h58</link>
      <guid>https://dev.to/memphis_dev/ingesting-webhooks-from-stripe-the-better-way-h58</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;This post explains what webhooks are, how they work with Stripe, where they fall short, and how routing them through a streaming platform such as Memphis lets you react to events in real time with greater reliability.&lt;/p&gt;

&lt;h2&gt;
  
  
  What are Webhooks?
&lt;/h2&gt;

&lt;p&gt;Imagine a world where information flows seamlessly between systems. In this world, there’s no need for constant browser refreshing or sending numerous requests for updates. Welcome to the domain of webhooks, where real-time communication glides with the rhythm of efficiency and automation.&lt;/p&gt;

&lt;p&gt;Webhooks stand out for their effectiveness for both the provider and the user. The main challenge with webhooks, however, is the complexity involved in their initial setup.&lt;/p&gt;

&lt;p&gt;Often likened to reverse APIs, webhooks offer something akin to an API specification, requiring you to craft an API for the webhook to interact with. When the webhook initiates an HTTP request to your application, usually through a POST method, your task is to interpret and handle this incoming data effectively.&lt;/p&gt;

&lt;h2&gt;
  
  
  The downsides of webhooks
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Push-based&lt;/strong&gt;:
Webhooks deliver or push events to your clients’ services, requiring them to handle the resulting back pressure. While understandable, this approach can impede your customers’ progress.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Implementing a server&lt;/strong&gt;:
For your client’s services to receive webhooks, they need a server that listens to incoming events. This involves managing CORS, middleware, opening ports, and securing network access, which adds extra load to their service by increasing overall memory consumption.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Retry&lt;/strong&gt;:
Services crash or become unavailable for many reasons. While some missed webhooks carry insignificant events, others cause critical issues, such as incomplete datasets where orders fail to be documented in the CRM or new shipping instructions go unprocessed. Hence, a robust retry mechanism is crucial.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Persistent&lt;/strong&gt;:
Standard webhook systems generally lack event persistence for future audits and replays.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Replay&lt;/strong&gt;:
Similarly, it boils down to the user or developer experience you aim to provide. While setting up an endpoint for users to retrieve past events is feasible, it demands meticulous handling, intricate business logic, an extra database, and increased complexity for the client.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Throttling&lt;/strong&gt;:
Throttling is a technique used in computing and networking to control the flow of data, requests, or operations so that a system or service is not overwhelmed. It limits the rate or quantity of incoming or outgoing data, requests, or actions. The primary challenge lies not in implementing throttling but in managing distinct access levels for various customers. Consider an enterprise client with notably higher throughput needs than others. To accommodate this, you’d require a multi-tenant webhook system tailored to support diverse demands.&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  Why use webhooks with Stripe
&lt;/h2&gt;

&lt;p&gt;When you’re piecing together Stripe integrations, it’s crucial to have your applications tuned in to live events from your Stripe accounts. This way, your backend systems are always ready to spring into action based on these events.&lt;/p&gt;

&lt;p&gt;To get this real-time event flow, you’ll need to set up webhook endpoints in your application. Once you’ve registered these endpoints, Stripe becomes your real-time informant, pushing event data directly to your application’s webhook endpoint as things happen in your Stripe account. Stripe uses HTTPS to deliver these events, packaging them as JSON payloads that feature an Event object.&lt;/p&gt;

&lt;p&gt;Webhook events are your go-to for monitoring asynchronous activities. They’re perfect for keeping tabs on events like a customer’s bank giving the green light on a payment, charge disputes from customers, successful recurring payments, or managing subscription billing. With webhooks, you’re not just informed; you’re always a step ahead.&lt;/p&gt;
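
&lt;p&gt;As a rough illustration of handling such a payload, the sketch below parses a JSON event and routes it by its type field. The handler names and the sample payload are hypothetical, the event is trimmed to its id, type, and data.object fields, and signature verification is left out for brevity.&lt;/p&gt;

```python
import json

def on_payment_succeeded(obj):
    # Hypothetical handler: mark the order as paid.
    return "paid:" + obj["id"]

def on_dispute_created(obj):
    # Hypothetical handler: flag the charge for review.
    return "review:" + obj["id"]

# Map event types to handlers; types without a handler are ignored.
HANDLERS = {
    "payment_intent.succeeded": on_payment_succeeded,
    "charge.dispute.created": on_dispute_created,
}

def handle_event(raw_body):
    """Parse a webhook body and route it by event type."""
    event = json.loads(raw_body)
    handler = HANDLERS.get(event["type"])
    if handler is None:
        return "ignored:" + event["type"]
    return handler(event["data"]["object"])

# Trimmed, made-up payload used only for illustration.
sample = json.dumps({
    "id": "evt_123",
    "type": "payment_intent.succeeded",
    "data": {"object": {"id": "pi_456"}},
})
print(handle_event(sample))
```

&lt;p&gt;Keeping the routing table separate from the handlers means supporting a new event type is a one-line change.&lt;/p&gt;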

&lt;h2&gt;
  
  
  Why use Memphis as your Stripe webhook destination
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;Convert the push to pull: Memphis.dev operates as a pull-based message broker where clients actively pull and consume data from the broker.&lt;/li&gt;
&lt;li&gt;Retry: Memphis provides a built-in retry system that maintains client states and offsets, even during disconnections. This configurable mechanism resends unacknowledged events until they’re acknowledged or until the maximum number of retries is reached.&lt;/li&gt;
&lt;li&gt;Persistent: Memphis ensures message persistence by assigning a retention policy to each topic and message.&lt;/li&gt;
&lt;li&gt;Replay: The client has the flexibility to rotate the active offset, enabling easy access to read and replay any past event that complies with the retention policy and is still stored.&lt;/li&gt;
&lt;li&gt;Back pressure: Let Memphis absorb back pressure and scaling on behalf of your team and clients.&lt;/li&gt;
&lt;li&gt;Backup: You can easily enable automatic backup that will back up each and every message to an external S3-compatible storage.&lt;/li&gt;
&lt;li&gt;Dead-letter: A dead-letter queue preserves unconsumed messages, rather than discarding them, so you can diagnose why their processing was unsuccessful.&lt;/li&gt;
&lt;/ol&gt;
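
&lt;p&gt;To make the pull, retry, and dead-letter ideas above concrete, here is a toy in-memory model. It is not the Memphis API: the ToyStation class, its methods, and the max_retries parameter are assumptions made purely for illustration.&lt;/p&gt;

```python
from collections import deque

class ToyStation:
    """In-memory stand-in for a station; not the Memphis API."""

    def __init__(self, max_retries=3):
        self.pending = deque()   # messages waiting to be pulled
        self.dead_letter = []    # messages that exhausted their retries
        self.max_retries = max_retries

    def produce(self, message):
        self.pending.append({"body": message, "attempts": 0})

    def consume(self, handler):
        """Pull messages one by one; requeue failures until retries run out."""
        processed = []
        while self.pending:
            msg = self.pending.popleft()
            msg["attempts"] += 1
            try:
                handler(msg["body"])
                processed.append(msg["body"])  # acknowledged
            except Exception:
                if msg["attempts"] == self.max_retries:
                    self.dead_letter.append(msg["body"])
                else:
                    self.pending.append(msg)   # redeliver later
        return processed

def handler(body):
    # Hypothetical consumer logic that rejects one malformed message.
    if body == "bad":
        raise ValueError("cannot process")

station = ToyStation(max_retries=2)
for body in ["order-1", "bad", "order-2"]:
    station.produce(body)

done = station.consume(handler)
print(done, station.dead_letter)
```

&lt;p&gt;The consumer decides when to pull, a failed message is redelivered instead of lost, and a message that keeps failing is set aside for inspection rather than blocking the rest.&lt;/p&gt;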

&lt;h2&gt;
  
  
  How to get started
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;Head to Stripe’s webhook &lt;a href="https://dashboard.stripe.com/login?redirect=%2Fwebhooks%2Fcreate" rel="noopener noreferrer"&gt;dashboard&lt;/a&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fr0wl9szswfs86l42ouei.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fr0wl9szswfs86l42ouei.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Create a &lt;a href="https://cloud.memphis.dev/" rel="noopener noreferrer"&gt;Memphis account&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;Create a new Memphis station&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fow3gzkv7xjxfz2flc0s3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fow3gzkv7xjxfz2flc0s3.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Create a new client-type user and generate a URL for producing data&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fxnv8mnt8svy6lw6n69uy.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fxnv8mnt8svy6lw6n69uy.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F763yrw00ivvuhaskkdrq.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F763yrw00ivvuhaskkdrq.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F7t073580jfuvux9wcx37.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F7t073580jfuvux9wcx37.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Copy the produce URL to the Stripe dashboard and click “Add endpoint”&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fyl9b7cn2di5rzlxwbkku.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fyl9b7cn2di5rzlxwbkku.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Once one of the selected events occurs, Stripe sends it to your Memphis station&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fmdceam0z2zezg9kns2bx.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fmdceam0z2zezg9kns2bx.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;&lt;a href="https://share.hsforms.com/1lBALaPyfSRS3FLLLy_Hsfgcqtej" rel="noopener noreferrer"&gt;Join 4500+ others and sign up for our data engineering newsletter&lt;/a&gt;.&lt;/p&gt;




&lt;p&gt;Originally published at Memphis.dev By &lt;a href="https://www.linkedin.com/in/shoham-roditi-elimelech-0b933314a/" rel="noopener noreferrer"&gt;Shoham Roditi Elimelech&lt;/a&gt;, software engineer at @&lt;a href="https://memphis.dev/blog/ingesting-webhooks-from-stripe-the-better-way/" rel="noopener noreferrer"&gt;Memphis.dev&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;Follow Us to get the latest updates!&lt;br&gt;
&lt;a href="https://github.com/memphisdev/memphis" rel="noopener noreferrer"&gt;Github&lt;/a&gt; • &lt;a href="https://docs.memphis.dev/memphis/getting-started/readme" rel="noopener noreferrer"&gt;Docs&lt;/a&gt; • &lt;a href="https://discord.gg/sGgmCQP7jc" rel="noopener noreferrer"&gt;Discord&lt;/a&gt;&lt;/p&gt;

</description>
      <category>webhooks</category>
      <category>stripe</category>
      <category>messagebroker</category>
    </item>
    <item>
      <title>Event Sourcing with Memphis.dev: A Beginner’s Guide</title>
      <dc:creator>Memphis.dev team</dc:creator>
      <pubDate>Thu, 04 Jan 2024 12:14:54 +0000</pubDate>
      <link>https://dev.to/memphis_dev/event-sourcing-with-memphisdev-a-beginners-guide-71a</link>
      <guid>https://dev.to/memphis_dev/event-sourcing-with-memphisdev-a-beginners-guide-71a</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;In the realm of modern software development, managing and maintaining data integrity is paramount. Traditional approaches often involve updating the state of an application directly within a database. However, as systems grow in complexity, ensuring data consistency and traceability becomes more challenging. This is where Event Sourcing, coupled with a powerful distributed streaming platform like Memphis.dev, emerges as a robust solution and a great data structure to work with.&lt;/p&gt;

&lt;h2&gt;
  
  
  What is Event Sourcing?
&lt;/h2&gt;

&lt;p&gt;At its core, Event Sourcing is a design pattern that captures every change or event that occurs in a system as an immutable and sequentially ordered log of events. Instead of persisting the current state of an entity, Event Sourcing stores a sequence of state-changing events. These events serve as a single source of truth for the system’s state at any given point in time.&lt;/p&gt;

&lt;h2&gt;
  
  
  Understanding Event Sourcing in Action
&lt;/h2&gt;

&lt;p&gt;Imagine a banking application that tracks an account’s balance. Instead of only storing the current balance in a database, Event Sourcing captures all the events that alter the balance. Deposits, withdrawals, or any adjustments are recorded as individual events in chronological order.&lt;/p&gt;

&lt;p&gt;Let’s break down how this might work:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Event Creation&lt;/strong&gt;: When a deposit of $100 occurs in the account, an event, such as FundsDeposited, with relevant metadata (timestamp, amount, account number) is created.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Event Storage&lt;/strong&gt;: These events are then appended to an immutable log, forming a sequential history of transactions specific to that account.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;State Reconstruction&lt;/strong&gt;: To obtain the current state of an account, the application replays these events sequentially to compute the current balance. Each event is applied in order to derive the current balance, enabling the system to rebuild state at any given point in time.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
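
&lt;p&gt;The three steps above can be sketched in a few lines. This is a minimal illustration, not a production design: FundsDeposited comes from the example above, while the FundsWithdrawn event name, the dictionary layout, and the replay helper are assumptions.&lt;/p&gt;

```python
def apply(balance, event):
    """Apply one event to the balance and return the new balance."""
    if event["type"] == "FundsDeposited":
        return balance + event["amount"]
    if event["type"] == "FundsWithdrawn":
        return balance - event["amount"]
    raise ValueError("unknown event type: " + event["type"])

def replay(events, upto=None):
    """Rebuild the balance from the first `upto` events (all if None)."""
    balance = 0
    for event in events[:upto]:
        balance = apply(balance, event)
    return balance

# Append-only log of events for a single account, oldest first.
log = [
    {"type": "FundsDeposited", "amount": 100},
    {"type": "FundsWithdrawn", "amount": 30},
    {"type": "FundsDeposited", "amount": 20},
]

print(replay(log))          # current balance
print(replay(log, upto=2))  # balance after the first two events
```

&lt;p&gt;Because the log is never modified, replaying a prefix of it yields the state at that earlier point in time.&lt;/p&gt;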

&lt;h2&gt;
  
  
  Leveraging Memphis in Event Sourcing
&lt;/h2&gt;

&lt;p&gt;Memphis.dev, an open-source distributed event streaming platform, is perfect for implementing Event Sourcing due to its features:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Scalability and Fault Tolerance&lt;/strong&gt;: Memphis’ distributed nature allows for horizontal scalability and ensures fault tolerance by replicating data across multiple brokers (nodes).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Ordered and Immutable Event Logs&lt;/strong&gt;: Memphis’ log-based architecture aligns seamlessly with the principles of Event Sourcing. It maintains ordered, immutable logs, preserving the sequence of events.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Real-time Event Processing&lt;/strong&gt;: Memphis Functions offers a serverless framework, built within the Memphis platform to handle high-throughput, real-time event streams. Applications can process events as they occur, enabling near real-time reactions to changes in state.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Managing Schemas&lt;/strong&gt;: One of the major challenges in event sourcing is maintaining schemas across the different events to avoid upstream breaks and client crashes.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  Benefits of Event Sourcing with Memphis
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Temporal Queries and Auditing&lt;/strong&gt;: By retaining a complete history of events, it becomes possible to perform temporal queries and reconstruct past states, aiding in auditing and compliance.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Flexibility and Scalability&lt;/strong&gt;: As the system grows, Event Sourcing with Memphis allows for easy scalability, as new consumers can independently process the event log.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Fault Tolerance and Recovery&lt;/strong&gt;: In the event of failures, the ability to rebuild state from events ensures resiliency and quick recovery.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  Let’s see what it looks like via code
&lt;/h2&gt;

&lt;p&gt;Events are produced, in order of creation, to a Memphis station (the equivalent of a topic).&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Event Log:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;class EventLog:
    def __init__(self):
        self.events = []

    def append_event(self, event):
        self.events.append(event)

    def get_events(self):
        return self.events
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Memphis Producer:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from __future__ import annotations
import asyncio
import json
from memphis import Memphis, MemphisError, MemphisConnectError, MemphisHeaderError, MemphisSchemaError

class MemphisEventProducer:
    def __init__(self, host="my.memphis.dev"):
        self.host = host
        self.memphis = Memphis()

    async def connect(self):
        # connect() is a coroutine, so it is awaited here rather than in __init__
        await self.memphis.connect(host=self.host, username="&amp;lt;application type username&amp;gt;", password="&amp;lt;password&amp;gt;")

    async def send_event(self, topic, event):
        try:
            # Serialize the event dict to JSON bytes before producing
            await self.memphis.produce(station_name=topic, producer_name='prod_py',
                                       message=bytearray(json.dumps(event), "utf-8"), nonblocking=False)
        except (MemphisError, MemphisConnectError, MemphisHeaderError, MemphisSchemaError) as e:
            print(e)

    async def close(self):
        await self.memphis.close()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Usage:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;async def main():
    # Initialize Event Log
    event_log = EventLog()

    # Initialize Memphis Producer and connect
    producer = MemphisEventProducer()
    await producer.connect()

    # Append events to the event log and produce them to Memphis
    events_to_publish = [
        {"type": "Deposit", "amount": 100},
        {"type": "Withdrawal", "amount": 50},
        # Add more events as needed
    ]

    try:
        for event in events_to_publish:
            event_log.append_event(event)
            await producer.send_event('account-events', event)
    finally:
        # Close the connection once, after all events are sent
        await producer.close()

asyncio.run(main())

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;
  
  
  Criteria to choose the right event streaming platform for the job
&lt;/h2&gt;

&lt;p&gt;When implementing Event Sourcing with a message broker, several key features are crucial for a streamlined and efficient system:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Persistent Message Storage:&lt;/strong&gt;&lt;br&gt;
Durability: Messages should be reliably stored even in the event of failures. This ensures that no events are lost and the event log remains intact.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Ordered and Immutable Event Logs:&lt;/strong&gt;&lt;br&gt;
Sequential Order: Preserving the order of events is critical for accurate state reconstruction. Events must be processed in the same sequence they were produced.&lt;br&gt;
Immutability: Once an event is stored, it should not be altered. This guarantees the integrity and consistency of the event log.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Scalability and Performance:&lt;/strong&gt;&lt;br&gt;
Horizontal Scalability: The message broker should support horizontal scaling to accommodate increased event volume without sacrificing performance.&lt;br&gt;
Low Latency: Minimizing message delivery time ensures near real-time processing of events, enabling quick reactions to state changes.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Fault Tolerance and High Availability:&lt;/strong&gt;&lt;br&gt;
Redundancy: Ensuring data redundancy across multiple nodes or partitions prevents data loss in the event of node failures.&lt;br&gt;
High Availability: Continuous availability of the message broker is essential to maintain system functionality.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Consumer Flexibility and State Rebuilding:&lt;/strong&gt;&lt;br&gt;
Consumer Groups: Support for consumer groups allows multiple consumers to independently process the same set of events, aiding in parallel processing and scalability.&lt;br&gt;
State Rebuilding: The broker should facilitate easy rebuilding of the application state by replaying events, enabling historical data retrieval.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Retention Policies and Archiving:&lt;/strong&gt;&lt;br&gt;
Retention Policies: Configurable retention policies allow managing the duration or size of stored messages. This ensures efficient storage management.&lt;br&gt;
Archiving: Ability to archive or offload older events to long-term storage for compliance or historical analysis purposes.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Monitoring and Management:&lt;/strong&gt;&lt;br&gt;
Metrics and Monitoring: Providing insights into message throughput, latency, and system health helps in monitoring and optimizing system performance.&lt;br&gt;
Admin Tools: Easy-to-use administrative tools for managing topics, partitions, and configurations streamline system management.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Security and Compliance:&lt;/strong&gt;&lt;br&gt;
Encryption and Authentication: Support for encryption and authentication mechanisms ensures the confidentiality and integrity of transmitted events.&lt;br&gt;
Compliance Standards: Adherence to compliance standards (such as GDPR, SOC2) ensures that sensitive data is handled appropriately.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Seamless Integration and Ecosystem Support:&lt;/strong&gt;&lt;br&gt;
Compatibility and Integrations: Seamless integration with various programming languages and frameworks, along with support for diverse ecosystems, enhances usability.&lt;br&gt;
Ecosystem Tools: Availability of connectors, libraries, and frameworks that facilitate Event Sourcing simplifies implementation and reduces development efforts.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Choosing a message broker that aligns with these critical features is essential for implementing robust Event Sourcing, ensuring data integrity, scalability, and resilience within your application architecture.&lt;/p&gt;




&lt;h2&gt;
  
  
  Event Sourcing using a Database vs a Message Broker (Streaming Platform)
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Use Case Complexity&lt;/strong&gt;: For simpler applications, or where scalability isn’t a primary concern, a database might suffice. For distributed systems that need higher reliability, high scalability, and real-time processing, a message broker can be more suitable.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Replay&lt;/strong&gt;: Event streaming platforms and message brokers store events in FIFO order, one after another as they arrive. This makes it easier for a consumer to follow the natural flow of events and replay the entire “scene.” A database offers no such ordering by default: additional fields, such as timestamps, must be added to order the data by time, and extra logic is needed to derive the latest state of an entity.&lt;/p&gt;




&lt;p&gt;Continue your learning: &lt;a href="https://memphis.dev/blog/event-sourcing-outgrows-databases/"&gt;read&lt;/a&gt; how and why event sourcing outgrows the database.&lt;/p&gt;








&lt;p&gt;Originally published at Memphis.dev By &lt;a href="https://www.linkedin.com/in/idan-asulin/"&gt;Idan Asulin&lt;/a&gt;, Co-Founder &amp;amp; CTO at @Memphis.dev.&lt;/p&gt;





</description>
      <category>dataintegrity</category>
      <category>dataconsistency</category>
      <category>streamingdata</category>
    </item>
    <item>
      <title>Comparing Webhooks and Event Consumption: A Comparative Analysis</title>
      <dc:creator>Memphis.dev team</dc:creator>
      <pubDate>Thu, 28 Dec 2023 14:59:11 +0000</pubDate>
      <link>https://dev.to/memphis_dev/comparing-webhooks-and-event-consumption-a-comparative-analysis-37a3</link>
      <guid>https://dev.to/memphis_dev/comparing-webhooks-and-event-consumption-a-comparative-analysis-37a3</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;In event-driven architecture and API integration, two vital concepts stand out: webhooks and event consumption. Both are mechanisms used to facilitate communication between different applications or services. Yet, they differ significantly in their approaches and functionalities, and by the end of this article, you will learn why consuming events can be a much more robust option than serving them using a webhook.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;This article assumes you operate a platform that wants to deliver, or already delivers, internal events to your clients through webhooks.&lt;/strong&gt;&lt;/p&gt;




&lt;h2&gt;
  
  
  Webhooks
&lt;/h2&gt;

&lt;p&gt;Webhooks are user-defined HTTP callbacks triggered by specific events on a service. They enable real-time communication between systems by notifying other applications when a particular event occurs. Essentially, webhooks eliminate the need to do manual polling or checking for updates, allowing for a more efficient, event-driven, and responsive system.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Key Features of Webhooks:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Event-driven:&lt;/strong&gt; Webhooks are event-driven and are only triggered when a specified event occurs. For example, a webhook can notify an application when a new user signs up or when an order is placed.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Outbound Requests:&lt;/strong&gt; They use HTTP POST requests to send data payloads to a predefined URL the receiving application provides.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Asynchronous Nature:&lt;/strong&gt; Webhooks operate asynchronously, allowing the sending and receiving systems to continue their processes independently.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Event Consumption
&lt;/h2&gt;

&lt;p&gt;Event consumption involves receiving, processing, and acting upon events emitted by various systems or services. This mechanism facilitates the seamless integration and synchronization of data across different applications.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Key Features of Event Consumption:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Message Queues or Brokers:&lt;/strong&gt; Event consumption often involves utilizing message brokers like Memphis.dev, Kafka, RabbitMQ, or AWS SQS to manage and distribute events.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Subscriber-Driven:&lt;/strong&gt; Unlike webhooks, event consumption relies on subscribers who listen to event streams and process incoming events.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Scalability:&lt;/strong&gt; Event consumption systems are highly scalable, efficiently handling large volumes of events.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  Architectural questions for a better decision
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Push-based vs. pull-based:&lt;/strong&gt;&lt;br&gt;
Webhooks deliver or push events to your clients’ services, requiring them to handle the resulting back pressure. While understandable, this approach can impede your customers’ progress. Using a scalable message broker to support consumption can alleviate this burden for your clients. How? By allowing clients to pull events based on their availability.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Implementing a server vs. implementing a broker SDK:&lt;/strong&gt;&lt;br&gt;
For your client’s services to receive webhooks, they need a server that listens to incoming events. This involves managing CORS, middleware, opening ports, and securing network access, which adds extra load to their service by increasing overall memory consumption.&lt;br&gt;
Opting for pull-based consumption eliminates most of these requirements. With pull-based consumption, as the traffic is egress (outgoing) rather than ingress (incoming), there’s no need to set up a server, open ports, or handle CORS. Instead, the client’s service initiates the communication, significantly reducing complexity.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Retry:&lt;/strong&gt;&lt;br&gt;
Services crash or become unavailable for many reasons. While some missed webhooks carry insignificant events, others cause critical issues, such as incomplete datasets where orders fail to be documented in the CRM or new shipping instructions go unprocessed. Hence, a robust retry mechanism is crucial. This can be achieved by incorporating a retry mechanism within the webhook system or introducing an endpoint within the service.&lt;br&gt;
In contrast, when utilizing a message broker, events are acknowledged only after processing. Although implementing a retry mechanism is necessary in most cases, it’s typically more straightforward and native than handling retries with webhooks.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Persistent:&lt;/strong&gt;&lt;br&gt;
Standard webhook systems generally lack event persistence for future audits and replays, a capability inherently provided by persisted message brokers.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Replay:&lt;/strong&gt;&lt;br&gt;
Similarly, it boils down to the user or developer experience you aim to provide. While setting up an endpoint for users to retrieve past events is feasible, it demands meticulous handling, intricate business logic, an extra database, and increased complexity for the client. In contrast, using a message broker supporting this feature condenses the process to just a line or two of code, significantly reducing complexity.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Throttling:&lt;/strong&gt;&lt;br&gt;
Throttling is a technique used in computing and networking to control the flow of data, requests, or operations so that a system or service is not overwhelmed. It limits the rate or quantity of incoming or outgoing data, requests, or actions. The primary challenge lies not in implementing throttling but in managing distinct access levels for various customers. Consider an enterprise client with notably higher throughput needs than others. To accommodate this, you’d require a multi-tenant webhook system tailored to support diverse demands, or a message broker or streaming platform designed to handle such differential requirements.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
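
&lt;p&gt;To make the throttling point more concrete, here is a minimal sketch of a per-customer token bucket. It is only an illustration: the &lt;code&gt;TenantThrottle&lt;/code&gt; class, the plan names, and the rates are all hypothetical and not part of any webhook system or broker mentioned here. The current time is passed in explicitly so the behavior is easy to follow.&lt;/p&gt;

```javascript
// Minimal token-bucket limiter with a separate bucket per customer.
// Plan names and rates are hypothetical; time is passed in explicitly.
class TenantThrottle {
  constructor(ratesPerSecond) {
    this.rates = ratesPerSecond; // e.g. { standard: 2, enterprise: 50 }
    this.buckets = new Map();    // tenantId -> { tokens, lastRefillMs }
  }

  // Returns true if the tenant may send one more event at time nowMs.
  tryAcquire(tenantId, plan, nowMs) {
    const rate = this.rates[plan];
    let bucket = this.buckets.get(tenantId);
    if (!bucket) {
      bucket = { tokens: rate, lastRefillMs: nowMs };
      this.buckets.set(tenantId, bucket);
    }
    // Refill in proportion to elapsed time, capped at one second's worth of tokens.
    const elapsedSeconds = (nowMs - bucket.lastRefillMs) / 1000;
    bucket.tokens = Math.min(rate, bucket.tokens + elapsedSeconds * rate);
    bucket.lastRefillMs = nowMs;
    if (bucket.tokens >= 1) {
      bucket.tokens -= 1;
      return true;
    }
    return false;
  }
}

const throttle = new TenantThrottle({ standard: 2, enterprise: 50 });
console.log(throttle.tryAcquire('customer-a', 'standard', 0));
```

&lt;p&gt;Each customer gets its own bucket, so a high-throughput enterprise tenant does not consume the allowance of smaller tenants. A broker or streaming platform would typically provide this kind of isolation for you.&lt;/p&gt;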




&lt;h2&gt;
  
  
  Memphis as a tailor-made solution for the task
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--GyNsQpM5--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/b9yom07p44e0fmw3yrfk.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--GyNsQpM5--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/b9yom07p44e0fmw3yrfk.png" alt="Image description" width="800" height="834"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;We are still iterating on the subject, so if you have any thoughts or ideas, I would love to learn from them: &lt;a href="mailto:idan@memphis.dev"&gt;idan@memphis.dev&lt;/a&gt;&lt;/p&gt;

</description>
      <category>webhooks</category>
      <category>eventconsumption</category>
      <category>eventdriven</category>
      <category>datastructures</category>
    </item>
    <item>
      <title>How to handle API rate limitations with a queue</title>
      <dc:creator>Memphis.dev team</dc:creator>
      <pubDate>Wed, 20 Dec 2023 14:57:10 +0000</pubDate>
      <link>https://dev.to/memphis_dev/how-to-handle-api-rate-limitations-with-a-queue-8a8</link>
      <guid>https://dev.to/memphis_dev/how-to-handle-api-rate-limitations-with-a-queue-8a8</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;Rate limitation refers to restricting the number of times a specific action can be performed within a certain time frame. For example, an API might have rate limitations restricting user or app requests within a given period. This helps prevent server overload, ensures fair usage, and maintains system stability and security.&lt;/p&gt;

&lt;p&gt;Rate limitation is also a challenge for the apps that encounter it, as it requires them to “slow down” or pause. Here’s a typical scenario:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Initial Request:&lt;/strong&gt; When the app initiates communication with the API, it requests specific data or functionality.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;API Response:&lt;/strong&gt; The API processes the request and responds with the requested information or performs the desired action.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Rate-Limitation:&lt;/strong&gt; If the app has reached the limit, it will usually need to wait until the next designated time frame (anywhere from a minute to an hour) before making additional requests. If it is a “soft” rate limitation and the time frames are known and linear, it’s easier to handle. Often, however, the waiting time grows with every block, requiring custom handling for each API.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Handling Rate Limit Exceedances:&lt;/strong&gt; If the app exceeds the rate limit, it might receive an error response from the API (such as a “429 Too Many Requests” status code). The app needs to handle this gracefully, possibly by queuing requests, implementing backoff strategies (waiting for progressively longer periods before retrying), or informing the user that the rate limit has been reached.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;To effectively operate within rate limitations, apps often incorporate strategies like:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Throttling:&lt;/strong&gt; Regulating the rate of outgoing requests to align with the API’s rate limit.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Caching:&lt;/strong&gt; Storing frequently requested data locally to reduce the need for repeated API calls.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Exponential Backoff:&lt;/strong&gt; Implementing a strategy where the app waits increasingly longer between subsequent retries after hitting a rate limit to reduce server load and prevent immediate retries.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Queue:&lt;/strong&gt; More on this in the next section.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
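
&lt;p&gt;As a rough illustration of the exponential backoff strategy listed above, here is a minimal sketch. The helper names (&lt;code&gt;backoffDelayMs&lt;/code&gt;, &lt;code&gt;withBackoff&lt;/code&gt;) and the default values are assumptions made for this example, and the check on &lt;code&gt;error.response.status&lt;/code&gt; assumes errors shaped like those thrown by axios.&lt;/p&gt;

```javascript
// Hypothetical helper: how long to wait before retry number `attempt` (0-based).
// baseMs and maxMs are illustrative defaults, not values from any specific API.
function backoffDelayMs(attempt, baseMs = 1000, maxMs = 60000) {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

function defaultSleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

// Retry `fn` when it fails with HTTP 429, waiting longer after each attempt.
// Any other error, or a 429 on the last allowed attempt, is rethrown.
async function withBackoff(fn, maxAttempts = 5, sleep = defaultSleep) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (error) {
      const status = error.response ? error.response.status : undefined;
      const lastAttempt = attempt >= maxAttempts - 1;
      if (status !== 429 || lastAttempt) throw error;
      await sleep(backoffDelayMs(attempt));
    }
  }
}

// The wait doubles with each attempt: 1s, 2s, 4s, 8s.
console.log([0, 1, 2, 3].map(attempt => backoffDelayMs(attempt)));
```

&lt;p&gt;Capping the delay keeps a long outage from producing unbounded waits. In practice, many clients also add random jitter so that multiple instances do not retry at the same moment.&lt;/p&gt;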

&lt;h2&gt;
  
  
  Using a queue
&lt;/h2&gt;

&lt;p&gt;A queue serves as an excellent “sidekick” or tool for helping services manage rate limitations due to its ability to handle tasks systematically. However, while it offers significant benefits, it’s not a standalone solution for this purpose.&lt;/p&gt;

&lt;p&gt;In a robust architecture, the service or app that interacts with a rate-limited external API often handles tasks asynchronously. This service is typically triggered by tasks pulled from a queue. When the service encounters a rate limit, it can easily return the job to the main queue, or assign it to a separate queue designated for delayed tasks, and revisit it after a specific waiting period, say X seconds.&lt;/p&gt;

&lt;p&gt;This reliance on a queue system is highly advantageous, primarily because of its temporary nature and ordering. However, the queue alone cannot fully address rate limitations; it requires additional features or help from the service itself to effectively handle these constraints.&lt;/p&gt;

&lt;h2&gt;
  
  
  Challenges may arise when utilizing a queue:
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;Tasks re-entering the queue might return earlier than necessary, as their timing isn’t directly controlled by your service.&lt;/li&gt;
&lt;li&gt;Exceeding rate limitations due to frequent calls within restricted timeframes. This may necessitate implementing sleep or wait mechanisms, commonly considered poor practice due to their potential impact on performance and responsiveness.&lt;/li&gt;
&lt;/ol&gt;




&lt;h2&gt;
  
  
  Here is what it will look like with RabbitMQ:
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;const amqp = require('amqplib');
const axios = require('axios');

// Function to make API requests, simulating rate limitations
async function makeAPICall(url) {
  try {
    const response = await axios.get(url);
    console.log('API Response:', response.data);
  } catch (error) {
    console.error('API Error:', error.message);
  }
}

// Connect to RabbitMQ server
async function connect() {
  try {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    const queue = 'rateLimitedQueue';
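    await channel.assertQueue(queue, { durable: true });
    await channel.prefetch(1); // Deliver one unacknowledged message at a time so delays do not overlap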

    // Consume messages from the queue
    channel.consume(queue, async msg =&amp;gt; {
      const { url, delayInSeconds } = JSON.parse(msg.content.toString());

      // Simulating rate limitation
      await new Promise(resolve =&amp;gt; setTimeout(resolve, delayInSeconds * 1000));

      await makeAPICall(url); // Make the API call

      channel.ack(msg); // Acknowledge message processing completion
    });
  } catch (error) {
    console.error('RabbitMQ Connection Error:', error.message);
  }
}

// Function to send a message to the queue
async function addToQueue(url, delayInSeconds) {
  try {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    const queue = 'rateLimitedQueue';
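    await channel.assertQueue(queue, { durable: true });

    const message = JSON.stringify({ url, delayInSeconds });
    channel.sendToQueue(queue, Buffer.from(message), { persistent: true });

    console.log('Task added to the queue');

    // Close the channel and connection so the producer does not leak connections
    await channel.close();
    await connection.close();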
  } catch (error) {
    console.error('RabbitMQ Error:', error.message);
  }
}

// Usage example
addToQueue('https://api.example.com/data', 5); // Add an API call with a delay of 5 seconds

// Start the consumer
connect();

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;
  
  
  Or with Kafka
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;const { Kafka } = require('kafkajs');
const axios = require('axios');

// Function to make API requests, simulating rate limitations
async function makeAPICall(url) {
  try {
    const response = await axios.get(url);
    console.log('API Response:', response.data);
  } catch (error) {
    console.error('API Error:', error.message);
  }
}

// Kafka configuration
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'], // Replace with your Kafka broker address
});

// Create a Kafka producer
const producer = kafka.producer();

// Connect to Kafka and send messages
async function produceToKafka(topic, message) {
  await producer.connect();
  await producer.send({
    topic,
    messages: [{ value: message }],
  });
  await producer.disconnect();
}

// Create a Kafka consumer
const consumer = kafka.consumer({ groupId: 'my-group' });

// Consume messages from Kafka topic
async function consumeFromKafka(topic) {
  await consumer.connect();
  await consumer.subscribe({ topic });
  await consumer.run({
    eachMessage: async ({ message }) =&amp;gt; {
      const { url, delayInSeconds } = JSON.parse(message.value.toString());

      // Simulating rate limitation
      await new Promise(resolve =&amp;gt; setTimeout(resolve, delayInSeconds * 1000));

      await makeAPICall(url); // Make the API call
    },
  });
}

// Usage example - Sending messages to Kafka topic
async function addToKafka(topic, url, delayInSeconds) {
  const message = JSON.stringify({ url, delayInSeconds });
  await produceToKafka(topic, message);
  console.log('Message added to Kafka topic');
}

// Start consuming messages from Kafka topic
const kafkaTopic = 'rateLimitedTopic';
consumeFromKafka(kafkaTopic).catch(console.error);

// Usage example - Adding messages to Kafka topic
addToKafka(kafkaTopic, 'https://api.example.com/data', 5).catch(console.error); // Add an API call with a delay of 5 seconds
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;p&gt;Both approaches are legitimate, yet both require your service to incorporate a ‘sleep’ mechanism.&lt;/p&gt;

&lt;p&gt;With Memphis, you can offload the delay from the client to the queue using a simple feature made just for that purpose, called “Delayed Messages”. Delayed messages allow you to send a received message back to the broker when your consumer application requires extra processing time.&lt;/p&gt;

&lt;p&gt;What sets apart Memphis’ implementation is the consumer’s capability to control this delay independently and atomically.&lt;br&gt;
Within the station, the count of unconsumed messages doesn’t impact the consumption of delayed messages. For instance, if a 60-second delay is necessary, it precisely configures the invisibility time for that specific message.&lt;/p&gt;

&lt;h2&gt;
  
  
  Memphis.dev Delayed Messages
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;A message is received by the consumer group.&lt;/li&gt;
&lt;li&gt;An event occurs, prompting the consumer group to pause processing the message.&lt;/li&gt;
&lt;li&gt;Assuming &lt;code&gt;maxMsgDeliveries&lt;/code&gt; hasn’t hit its limit, the consumer calls &lt;code&gt;message.delay(delayInMilliseconds)&lt;/code&gt; to set the message aside. Instead of immediately redelivering the same message, the broker will retain it for the specified duration.&lt;/li&gt;
&lt;li&gt;The subsequent message will be consumed.&lt;/li&gt;
&lt;li&gt;Once the requested delayInMilliseconds has passed, the broker will halt the primary message flow and reintroduce the delayed message into circulation.
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;const { memphis } = require('memphis-dev');
const axios = require('axios');

// Function to make API requests, simulating rate limitations 
async function makeAPICall(message) 
{ 
  try { 
    const response = await axios.get(message.getDataAsJson()['url']); 
    console.log('API Response:', response.data); 
    message.ack();
  } catch (error) { 
    console.error('API Error:', error.message); 
    console.log("Delaying message for 1 minute"); 
    message.delay(60000);
  } 
}

(async function () {
    let memphisConnection;

    try {
        memphisConnection = await memphis.connect({
            host: '&amp;lt;broker-hostname&amp;gt;',
            username: '&amp;lt;application-type username&amp;gt;',
            password: '&amp;lt;password&amp;gt;'
        });

        const consumer = await memphisConnection.consumer({
            stationName: '&amp;lt;station-name&amp;gt;',
            consumerName: '&amp;lt;consumer-name&amp;gt;',
            consumerGroup: ''
        });

        consumer.setContext({ key: "value" });
        consumer.on('message', async (message, context) =&amp;gt; {
            await makeAPICall(message);
        });

        consumer.on('error', (error) =&amp;gt; { });
    } catch (ex) {
        console.log(ex);
        if (memphisConnection) memphisConnection.close();
    }
})();
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;
  
  
  Wrapping up
&lt;/h2&gt;

&lt;p&gt;Understanding and adhering to rate limitations is crucial for app developers working with APIs. It involves managing request frequency, handling errors when limits are reached, implementing backoff strategies to avoid overloading the API servers, and using the rate limit information provided by the API to optimize app performance. Now you know how to do it with a queue as well!&lt;/p&gt;

&lt;p&gt;Head to our &lt;a href="https://memphis.dev/blog/"&gt;blog&lt;/a&gt; or &lt;a href="https://docs.memphis.dev/memphis/getting-started/readme"&gt;docs&lt;/a&gt; for more examples like this!&lt;/p&gt;

&lt;p&gt;&lt;a href="https://share.hsforms.com/1lBALaPyfSRS3FLLLy_Hsfgcqtej"&gt;Join 4500+ others and sign up for our data engineering newsletter.&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;Originally published at Memphis.dev By &lt;a href="https://www.linkedin.com/in/idan-asulin/"&gt;Idan Asulin&lt;/a&gt;, Co-Founder &amp;amp; CTO at @Memphis.dev.&lt;/p&gt;




&lt;p&gt;Follow Us to get the latest updates!&lt;br&gt;
&lt;a href="https://github.com/memphisdev/memphis"&gt;Github&lt;/a&gt; • &lt;a href="https://twitter.com/Memphis_Dev"&gt;Twitter&lt;/a&gt; • &lt;a href="https://discord.com/invite/WZpysvAeTf"&gt;Discord&lt;/a&gt;&lt;/p&gt;

</description>
      <category>api</category>
      <category>ratelimitations</category>
      <category>messagequeue</category>
    </item>
    <item>
      <title>Real-Time Data Scrubbing Before Storing In A Data Warehouse</title>
      <dc:creator>Memphis.dev team</dc:creator>
      <pubDate>Wed, 20 Dec 2023 08:22:03 +0000</pubDate>
      <link>https://dev.to/memphis_dev/real-time-data-scrubbing-before-storing-in-a-data-warehouse-1a1</link>
      <guid>https://dev.to/memphis_dev/real-time-data-scrubbing-before-storing-in-a-data-warehouse-1a1</guid>
      <description>&lt;p&gt;Between January 2023 and May 2023, companies violating general data processing principles incurred fines totaling 1.86 billion USD (!!!).&lt;/p&gt;

&lt;p&gt;In today’s data-driven landscape, the importance of data accuracy and compliance cannot be overstated. As businesses amass vast amounts of information, the need to ensure data integrity, especially when storing PII, becomes paramount. Data scrubbing emerges as a crucial process, particularly in real-time scenarios, before storing information in a data warehouse.&lt;/p&gt;




&lt;p&gt;&lt;strong&gt;Data Scrubbing in the context of compliance&lt;/strong&gt;&lt;br&gt;
Data scrubbing, often referred to as data cleansing or data cleaning, involves the identification and rectification of errors or inconsistencies in a dataset. In the context of compliance, it means removing certain values that qualify as PII that cannot be stored or should be handled differently.&lt;/p&gt;

&lt;p&gt;Real-time data scrubbing takes the cleansing process a step further by ensuring that incoming data is cleaned and validated instantly, before being stored in a data warehouse.&lt;/p&gt;

&lt;p&gt;Compliance standards, such as GDPR, HIPAA, or industry-specific regulations, mandate stringent requirements for data accuracy, privacy, and security. Failure to adhere to these standards can result in severe repercussions, including financial penalties and reputational damage. Real-time data scrubbing acts as a robust preemptive measure, ensuring that only compliant data is integrated into the warehouse.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Event-driven Scrubbing&lt;/strong&gt;&lt;br&gt;
Event-driven applications stand as stateful systems that intake events from one or multiple streams and respond to these incoming events by initiating computations, updating their state, or triggering external actions.&lt;/p&gt;

&lt;p&gt;They represent a progressive shift from the conventional application structure that segregates computation and data storage into distinct tiers. In that conventional architecture, applications retrieve data from and save data to a remote transactional database.&lt;/p&gt;

&lt;p&gt;In stark contrast, event-driven applications revolve around stateful stream processing frameworks. This approach intertwines data and computation, facilitating localized data access either in-memory or through disk storage. To ensure resilience, these applications implement fault-tolerance measures by periodically storing checkpoints in remote persistent storage.&lt;/p&gt;

&lt;p&gt;In the context of scrubbing, this means that scrubbing runs for each ingested event in real time, triggered only when new events arrive and immediately after they do. This contrasts with scrubbing at fixed intervals, which is usually performed on the database after the data has been stored, meaning the potential violation has already taken place.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;How does Memphis Functions support such a use case?&lt;/strong&gt;&lt;br&gt;
At times, a more comprehensive policy-driven cleansing may be necessary. However, if a quick, large-scale ‘eraser’ is what you require, Memphis Functions can offer an excellent solution. The diagram below illustrates two options: data sourced from either a Kafka topic or a Memphis station, potentially both concurrently. This data passes through a Memphis Function named ‘&lt;a href="https://github.com/memphisdev/memphis-dev-functions/tree/master/remove-fields"&gt;remove-fields&lt;/a&gt;’ before progressing to the data warehouse for storage.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--N2Q-RJ5H--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/fxy7vof8xeb9cv1fzygn.jpeg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--N2Q-RJ5H--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/fxy7vof8xeb9cv1fzygn.jpeg" alt="Image description" width="800" height="191"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Behind the curtain, events or streaming data are grouped into batches according to a user-defined configuration. Each batch is then processed by a serverless function, in this case ‘remove-fields’, which cleanses the ingested data according to pre-established rules. After scrubbing, the refined data is consumed internally, routed to a different Kafka topic, or sent straight to the data warehouse (DWH) for immediate use.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Usage example&lt;/strong&gt;&lt;br&gt;
Before&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
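&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "id": 123456789,
  "full_name": "Peter Parker",
  "gender": "male"
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;After (removing ‘gender’)&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "id": 123456789,
  "full_name": "Peter Parker"
}

&lt;/code&gt;&lt;/pre&gt;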

&lt;/div&gt;
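
&lt;p&gt;The before/after transformation above can be sketched in a few lines of JavaScript. This is a hypothetical stand-in written for illustration, not the actual source of the ‘remove-fields’ function; the names &lt;code&gt;removeFields&lt;/code&gt; and &lt;code&gt;scrubBatch&lt;/code&gt; are assumptions.&lt;/p&gt;

```javascript
// Hypothetical stand-in for a field-removal step; not the actual 'remove-fields' source.
// Returns a copy of the event without the listed top-level fields.
function removeFields(event, fieldsToRemove) {
  const scrubbed = { ...event };
  for (const field of fieldsToRemove) {
    delete scrubbed[field];
  }
  return scrubbed;
}

// Apply the same rule to every event in a batch.
function scrubBatch(events, fieldsToRemove) {
  return events.map(event => removeFields(event, fieldsToRemove));
}

const batch = [{ id: 123456789, full_name: 'Peter Parker', gender: 'male' }];
console.log(scrubBatch(batch, ['gender']));
```

&lt;p&gt;Working on a copy leaves the original event untouched, which is convenient if the unscrubbed payload is still needed elsewhere in the pipeline. Nested fields would need additional handling.&lt;/p&gt;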



&lt;p&gt;&lt;strong&gt;Next steps&lt;/strong&gt;&lt;br&gt;
An ideal follow-up action would involve implementing schema enforcement. Data warehouses are renowned for their rigorous schema enforcement practices. By integrating both a transformation layer and schema validation, it’s possible to significantly elevate data quality while reducing the risk of potential disruptions or breaks in the system. This can be done simply by attaching a Schemaverse schema to the station.&lt;/p&gt;

&lt;p&gt;Start by &lt;a href="https://cloud.memphis.dev/"&gt;signing up&lt;/a&gt; to Memphis Cloud. We have a great free plan that can get you up and running in no time, so you can try building a pipeline yourself.&lt;/p&gt;








&lt;p&gt;Originally published at Memphis.dev By &lt;a href="https://www.linkedin.com/in/idan-asulin/"&gt;Idan Asulin&lt;/a&gt;, Co-Founder &amp;amp; CTO at @Memphis.dev.&lt;/p&gt;





</description>
      <category>datascrubbing</category>
      <category>dataprocessing</category>
      <category>dataengineering</category>
    </item>
    <item>
      <title>Introducing Memphis Functions</title>
      <dc:creator>avital trifsik</dc:creator>
      <pubDate>Thu, 09 Nov 2023 14:14:39 +0000</pubDate>
      <link>https://dev.to/memphis_dev/introducing-memphis-functions-4b1f</link>
      <guid>https://dev.to/memphis_dev/introducing-memphis-functions-4b1f</guid>
      <description>&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fn2grkgm6p8sg9xt47t57.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fn2grkgm6p8sg9xt47t57.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  The story
&lt;/h2&gt;

&lt;p&gt;Organizations are increasingly embracing real-time event processing, intercepting data streams before they enter the data warehouse, and embracing event-driven architectural paradigms. However, they must contend with the ever-evolving landscape of data and technology. Development teams face the challenge of maintaining alignment with these changes while also striving for greater development efficiency and agility.&lt;/p&gt;

&lt;h3&gt;
  
  
  Further challenges lie ahead:
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Developing new stream processing flows is a formidable task. &lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Code exhibits high coupling to particular flows or event types.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;There is no opportunity for code reuse or sharing.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Debugging, troubleshooting, and rectifying issues pose ongoing challenges.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Managing code evolution remains a persistent concern.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  The shortcomings of current solutions are as follows:
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;They impose the use of SQL or other vendor-specific, lock-in languages on developers.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;They lack support for custom logic.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;They add complexity to the infrastructure, particularly as operations scale.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;They do not facilitate code reusability or sharing.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Ultimately, they demand a significant amount of time to construct a real-time application or pipeline.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Introducing Memphis Functions
&lt;/h2&gt;

&lt;p&gt;The Memphis platform is composed of four independent components:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Memphis Broker, serving as the storage layer.&lt;/li&gt;
&lt;li&gt;Schemaverse, responsible for schema management.&lt;/li&gt;
&lt;li&gt;Memphis Functions, designed for serverless stream processing.&lt;/li&gt;
&lt;li&gt;Memphis Connectors, facilitating data retrieval and delivery through pre-built connectors.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Memphis Functions empower developers and data engineers with the ability to seamlessly process, transform, and enrich incoming events in real-time through a serverless paradigm, all within the familiar AWS Lambda syntax.&lt;/p&gt;

&lt;p&gt;This means they can achieve these operations without being burdened by boilerplate code, intricate orchestration, error-handling complexities, or the need to manage underlying infrastructure.&lt;/p&gt;

&lt;p&gt;Memphis Functions provides this versatility in an array of programming languages, including but not limited to Go, Python, JavaScript, .NET, Java, and SQL. This flexibility ensures that development teams have the freedom to select the language best suited to their specific needs, making the event processing experience more accessible and efficient.&lt;/p&gt;

&lt;h3&gt;
  
  
  What’s more?
&lt;/h3&gt;

&lt;p&gt;In addition to orchestrating various functions, Memphis Functions offer a comprehensive suite for the end-to-end management and observability of these functions. This suite encompasses features such as a robust retry mechanism, dynamic auto-scaling utilizing both Kubernetes-based and established public cloud serverless technologies, extensive monitoring capabilities, dead-letter handling, efficient buffering, distributed security measures, and customizable notifications.&lt;/p&gt;

&lt;p&gt;It’s important to note that Memphis Functions are designed to seamlessly complement existing streaming platforms, such as Kafka, without imposing the necessity of adopting the Memphis broker. This flexibility allows organizations to leverage Memphis Functions while maintaining compatibility with their current infrastructures and preferences.&lt;/p&gt;

&lt;h2&gt;
  
  
  Getting started
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Step 1: Write your processing function&lt;/strong&gt;&lt;br&gt;
Utilize the same syntax as you would when crafting a function for AWS Lambda, taking advantage of the familiar and powerful AWS Lambda framework. This approach ensures that you can tap into AWS Lambda’s extensive ecosystem and development resources, making your serverless function creation a seamless and efficient process, without learning yet another framework syntax.&lt;/p&gt;

&lt;p&gt;Functions can range from a simple string-to-JSON conversion all the way to pushing a webhook based on some event’s payload.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Step 2: Connect Memphis to your git repository&lt;/strong&gt;&lt;br&gt;
Integrating Memphis with your git repository is the next crucial step. By doing so, Memphis establishes an automated link to your codebase, effortlessly fetching the functions you’ve developed. These functions are then conveniently showcased within the Memphis Dashboard, streamlining the entire process of managing and monitoring your serverless workflows. This seamless connection simplifies collaboration, version control, and overall visibility into your stream processing application development.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Step 3: Attach functions to streams&lt;/strong&gt;&lt;br&gt;
Now it’s time to integrate your functions with the streams. By attaching your developed functions to the streams, you establish a dynamic pathway for ingested events. These events will seamlessly traverse through the connected functions, undergoing processing as specified in your serverless workflow. This crucial step ensures that the events are handled efficiently, allowing you to unleash the full potential of your processing application with agility and scalability.&lt;/p&gt;




&lt;p&gt;&lt;strong&gt;Gain early access and sign up to our Private Beta Functions waiting list &lt;a href="https://www.functions.memphis.dev/" rel="noopener noreferrer"&gt;here&lt;/a&gt;!&lt;/strong&gt;&lt;/p&gt;








&lt;p&gt;Follow Us to get the latest updates!&lt;br&gt;
&lt;a href="https://github.com/memphisdev/memphis-broker" rel="noopener noreferrer"&gt;Github&lt;/a&gt;•&lt;a href="https://docs.memphis.dev/memphis/getting-started/readme" rel="noopener noreferrer"&gt;Docs&lt;/a&gt;•&lt;a href="https://discord.com/invite/DfWFT7fzUu" rel="noopener noreferrer"&gt;Discord&lt;/a&gt;&lt;/p&gt;

</description>
      <category>dataengineering</category>
      <category>dataprocessing</category>
    </item>
    <item>
      <title>Event-Driven Architecture with Serverless Functions – Part 1</title>
      <dc:creator>avital trifsik</dc:creator>
      <pubDate>Mon, 09 Oct 2023 08:38:24 +0000</pubDate>
      <link>https://dev.to/memphis_dev/event-driven-architecture-with-serverless-functions-part-1-1ei3</link>
      <guid>https://dev.to/memphis_dev/event-driven-architecture-with-serverless-functions-part-1-1ei3</guid>
      <description>&lt;p&gt;&lt;em&gt;This is the 1st part of the series “A new type of stream processing”.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;In this series of articles, we are going to explain what the missing piece in stream processing is, and in this part, we’ll start from the source. We’ll break down the different components and walk through how they can be used in tandem to drive modern software.&lt;/p&gt;

&lt;p&gt;First things first, Event-driven architecture. EDA and serverless functions are two powerful software patterns and concepts that have become popular in recent years with the rise of cloud-native computing. While one is more of an architecture pattern and the other a deployment or implementation detail, when combined, they provide a scalable and efficient solution for modern applications.&lt;/p&gt;




&lt;h2&gt;
  
  
  What is Event-Driven Architecture
&lt;/h2&gt;

&lt;p&gt;EDA is a software architecture pattern that utilizes events to decouple various components of an application. In this context, an event is defined as a change in state. For example, for an e-commerce application, an event could be a customer clicking on a listing, adding that item to their shopping cart, or submitting their credit card information to buy. Events also encompass non-user-initiated state changes, such as scheduled jobs or notifications from a monitoring system.&lt;/p&gt;

&lt;p&gt;The primary goal of EDA is to create loosely coupled components or microservices that can communicate by producing and consuming events between one another in an asynchronous way. This way, different components of the system can scale up or down independently for availability and resilience. Also, this decoupling allows development teams to add or release new features more quickly and safely as long as their interfaces remain compatible.&lt;/p&gt;

&lt;h2&gt;
  
  
  The Usual Components
&lt;/h2&gt;

&lt;p&gt;A scalable event-driven architecture will comprise three key components:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Producer&lt;/strong&gt;: components that publish or produce events. These can be frontend services that take in user input, edge devices like IoT systems, or other types of applications.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Broker&lt;/strong&gt;: components that take in events from producers and deliver them to consumers. Examples include Kafka, Memphis.dev, or AWS SQS.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Consumer&lt;/strong&gt;: components that listen to events and act on them. &lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;It’s important also to note that some components may be a producer for one workflow while being a consumer for another. For example, if we look at a credit card processing service, it could be a consumer for events that involve credit cards, such as new purchases or updating credit card information. At the same time, this service may be a producer for downstream services that record purchase history or detect fraudulent activity. &lt;/p&gt;

&lt;h2&gt;
  
  
  Common Patterns
&lt;/h2&gt;

&lt;p&gt;Since EDA is a broad architectural pattern, it can be applied in many ways. Some common patterns include:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Point-to-point messaging&lt;/strong&gt;: For applications that need a simple one-to-one communication channel, a point-to-point messaging pattern may be used with a simple queue. Events are sent to a queue (a messaging channel) and buffered for consumers.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Pub/sub&lt;/strong&gt;: If multiple consumers need to listen to the same events, pub/sub style messaging may be used. In this scenario, the producer generates events on a topic that consumers can subscribe to. This is useful for scenarios where events need to be broadcast (e.g. replication) or different business logic must be applied to the same event.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Communication models&lt;/strong&gt;: Different use cases dictate how communication should be coordinated. In some cases, it must be orchestrated by a centralized service, for example when the logic involves distinct steps with dependencies. In other cases, it can be choreographed: producers generate events without worrying about downstream dependencies, as long as the events adhere to a predetermined schema or format.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
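&lt;p&gt;The difference between the first two patterns can be sketched in a few lines. The PointToPointQueue and Topic classes below are hypothetical in-memory stand-ins, not the API of any particular broker: the queue hands each event to a single consumer, while the topic gives every subscriber its own copy.&lt;/p&gt;

```javascript
// Hypothetical in-memory channels contrasting the two delivery patterns.

// Point-to-point: events are buffered and each one goes to a single consumer.
class PointToPointQueue {
  constructor() {
    this.buffer = [];
  }
  send(event) {
    this.buffer.push(event);
  }
  receive() {
    // Removing the event means no other consumer will see it.
    return this.buffer.shift();
  }
}

// Pub/sub: every subscriber receives its own copy of each event.
class Topic {
  constructor() {
    this.subscribers = [];
  }
  subscribe(handler) {
    this.subscribers.push(handler);
  }
  publish(event) {
    for (const handler of this.subscribers) {
      handler(event);
    }
  }
}

const queue = new PointToPointQueue();
queue.send("order-1");
const firstRead = queue.receive();  // the buffered event
const secondRead = queue.receive(); // nothing left, already consumed

const topic = new Topic();
const billing = [];
const analytics = [];
topic.subscribe(function (event) { billing.push(event); });
topic.subscribe(function (event) { analytics.push(event); });
topic.publish("order-1");

console.log(firstRead, secondRead, billing, analytics);
```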

&lt;h2&gt;
  
  
  EDA Use Cases
&lt;/h2&gt;

&lt;p&gt;Event-driven architecture became much more popular with the rise of cloud-native applications and microservices. We are not always aware of it, but services such as Uber, DoorDash, Netflix, Lyft, Instacart, and many more rely heavily on event-driven, asynchronous architectures.&lt;/p&gt;

&lt;p&gt;Another key use case is data processing of events that require massive parallelization and elasticity to changes.&lt;/p&gt;




&lt;h2&gt;
  
  
  Let’s talk about Serverless Functions
&lt;/h2&gt;

&lt;p&gt;Serverless functions are a subset of the serverless computing model, where a third party (typically a cloud or service provider) or some orchestration engine manages the infrastructure on behalf of the users and only charges on a per-use basis. In particular, serverless functions or Function-as-a-Service (FaaS) allow users to write small functions as opposed to full-fledged services with server logic and abstract away the typical “server” functionality such as HTTP listeners, scaling, and monitoring. To developers, serverless functions can simplify their workflow significantly as they can focus on the business logic and allow the service provider to bootstrap the infrastructure and server functionalities.&lt;/p&gt;

&lt;p&gt;Serverless functions are usually triggered by external events. This can be an HTTP call or events on a queue or a pub/sub-like messaging system. Serverless functions are generally stateless and designed to handle individual events. When there are multiple calls, the service provider will automatically scale up functions as needed unless parallelism is limited to one by the user. While different implementations of serverless functions have varying execution limits, in general, serverless functions are meant to be short-lived and should not be used for long-running jobs.&lt;/p&gt;




&lt;h2&gt;
  
  
  How about combining EDA and Serverless Functions?
&lt;/h2&gt;

&lt;p&gt;As you probably noticed, since serverless functions are triggered by events, they make for a great pairing with event-driven architectures. This is especially true for stateless services that can be short-lived. A lot of microservices probably fall into this bucket unless they are responsible for batch processing or heavy analytics that push the execution limit.&lt;/p&gt;

&lt;p&gt;The benefit of utilizing serverless functions with event-driven architecture is the reduced overhead of managing the underlying infrastructure, which frees up developers’ time to focus on logic that drives business value. Also, since service providers only charge per use, it can be a cost-efficient alternative to running self-hosted infrastructure on servers, VMs, or containers.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Sounds like the perfect match, right? Why isn’t it everywhere?&lt;/strong&gt;&lt;br&gt;
While AWS is doing its best to push us to use Lambda, Cloudflare invests hundreds of millions of dollars to convince developers to use its serverless framework, and GCP and others do the same, it still feels “harder” than building a traditional service in the context of EDA or data processing.&lt;/p&gt;

&lt;p&gt;Among the reasons are:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Lack of observability. What went in and what went out?&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Debugging is still hard. When dealing with data whose shape may change in the future, a great debugging experience is a must, as the slightest change in its structure will break the function.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Retry mechanism.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Convert a batch of messages into a processing batch.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
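&lt;p&gt;To make the first and third points concrete, here is a rough sketch of the plumbing a developer often ends up writing by hand around a stateless function: logging what went in and out, and retrying on failure. The withRetry helper and the flakyHandler function are hypothetical and not part of any serverless platform’s API.&lt;/p&gt;

```javascript
// Hypothetical wrapper that adds input/output logging and retries
// around a stateless event handler.
function withRetry(handler, maxAttempts) {
  return function (event) {
    let lastError = new Error("handler was never attempted");
    let attemptsLeft = maxAttempts;
    while (attemptsLeft !== 0) {
      attemptsLeft -= 1;
      try {
        const result = handler(event);
        // Observability: record what went in and what came out.
        console.log("in:", JSON.stringify(event), "out:", JSON.stringify(result));
        return result;
      } catch (error) {
        lastError = error;
        console.log("attempt failed:", error.message, "attempts left:", attemptsLeft);
      }
    }
    // Every attempt failed, so surface the last error to the caller.
    throw lastError;
  };
}

// A handler that fails on its first call to simulate a transient error.
let calls = 0;
function flakyHandler(event) {
  calls += 1;
  if (calls === 1) {
    throw new Error("transient failure");
  }
  return { taskId: event.taskId, done: true };
}

const result = withRetry(flakyHandler, 3)({ taskId: 7 });
```

&lt;p&gt;Even this small wrapper ignores backoff, batching, and dead-lettering, which hints at why the experience can feel harder than it should.&lt;/p&gt;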

&lt;p&gt;The idea of combining both technologies is very interesting. Ultimately, it can save a great number of dev hours and effort, and add abilities that are extremely hard to develop with traditional methodologies, but the reasons mentioned above are definitely major blockers.&lt;/p&gt;

&lt;p&gt;I will end part 1 with a simple, open question/teaser –&lt;br&gt;
Have you ever used Zapier?&lt;/p&gt;

&lt;p&gt;&lt;a href="https://share.hsforms.com/1lBALaPyfSRS3FLLLy_Hsfgcqtej"&gt;Stay tuned&lt;/a&gt; for part 2 and get one step closer to learning more about the new way to do stream processing.&lt;/p&gt;




&lt;p&gt;&lt;a href="https://share.hsforms.com/1lBALaPyfSRS3FLLLy_Hsfgcqtej"&gt;Join 4500+ others and sign up for our data engineering newsletter.&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Originally published at Memphis.dev By Yaniv Ben Hemo, Co-Founder &amp;amp; CEO at &lt;a href="https://memphis.dev/blog/"&gt;Memphis.dev&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Follow Us to get the latest updates!&lt;br&gt;
&lt;a href="https://github.com/memphisdev/memphis"&gt;Github&lt;/a&gt; • &lt;a href="https://docs.memphis.dev/memphis/getting-started/readme"&gt;Docs&lt;/a&gt; • &lt;a href="https://discord.com/invite/DfWFT7fzUu"&gt;Discord&lt;/a&gt;&lt;/p&gt;

</description>
      <category>eventdriven</category>
      <category>dataengineering</category>
      <category>datastructures</category>
      <category>dataprocessing</category>
    </item>
    <item>
      <title>Task scheduling with a message broker</title>
      <dc:creator>avital trifsik</dc:creator>
      <pubDate>Thu, 17 Aug 2023 08:38:45 +0000</pubDate>
      <link>https://dev.to/memphis_dev/task-scheduling-with-a-message-broker-2lj5</link>
      <guid>https://dev.to/memphis_dev/task-scheduling-with-a-message-broker-2lj5</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;Task scheduling is essential in modern applications to maximize resource utilization and improve user experience through non-blocking task fulfillment.&lt;br&gt;
A queue is a powerful tool that allows your application to manage and prioritize tasks in a structured, persistent, and scalable way.&lt;br&gt;
While there are multiple possible solutions, working with a queue (which is also the perfect data structure for that type of work) can ensure that tasks are completed in their creation order without the risk of forgetting, overlooking, or double-processing critical tasks.&lt;/p&gt;

&lt;p&gt;An interesting account of how this need evolves as scale grows can be found in a blog post by one of DigitalOcean’s co-founders:&lt;br&gt;
&lt;a href="https://www.digitalocean.com/blog/from-15-000-database-connections-to-under-100-digitaloceans-tale-of-tech-debt"&gt;From 15,000 database connections to under 100&lt;/a&gt;.&lt;/p&gt;




&lt;h2&gt;
  
  
  Any other solutions besides a queue?
&lt;/h2&gt;

&lt;p&gt;Multiple. Each with its own advantages and disadvantages.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Cron&lt;/strong&gt;&lt;br&gt;
You can use cron job schedulers to automate such tasks. The issue with cron is that the job and its execution time have to be written explicitly, ahead of the actual execution, which makes your architecture highly static rather than event-driven. Cron is mainly suitable for a well-defined, known set of tasks that have to run regardless, not for tasks triggered by a user action.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Database&lt;/strong&gt;&lt;br&gt;
A database can be a good and simple place to store tasks, and it is often used that way in the early days of a product (the MVP stage),&lt;br&gt;
but there are multiple issues with that approach, for example:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Ordering of insertion is not guaranteed, and therefore task handling might not take place in the order in which the tasks were created.&lt;/li&gt;
&lt;li&gt;Double processing can happen as the nature of a database is not to delete a record once read, so there is a potential of double reading and processing a specific task, and the results of that can be catastrophic to a system’s behavior.&lt;/li&gt;
&lt;/ol&gt;




&lt;h2&gt;
  
  
  Traditional queues
&lt;/h2&gt;

&lt;p&gt;Often, for task scheduling, the chosen queue would probably be a pub/sub system like RabbitMQ.&lt;/p&gt;

&lt;p&gt;In the context of task scheduling, choosing RabbitMQ over a classic broker such as Kafka makes sense, because Kafka by design retains records (or tasks) until a specific point in time, whether or not they have been acknowledged.&lt;/p&gt;

&lt;p&gt;The downside of choosing RabbitMQ can be limited scale, robustness, and performance, which become increasingly important over time.&lt;/p&gt;

&lt;p&gt;With that idea in mind, Memphis is a broker that presents scale, robustness, and high throughput alongside a type of retention that fully enables task scheduling over a message broker.&lt;/p&gt;




&lt;h2&gt;
  
  
  Memphis Broker is a perfect queue for task scheduling
&lt;/h2&gt;

&lt;p&gt;In v1.2, Memphis released support for ACK-based retention through Memphis Cloud. Read more &lt;a href="https://docs.memphis.dev/memphis/memphis-broker/concepts/station#retention"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Messages will be removed from a station only when &lt;strong&gt;acknowledged by all&lt;/strong&gt; the connected consumer groups. For example:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;If we have only one connected consumer group when a message/record is acknowledged, it will be automatically removed from the station.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;If we have two connected consumer groups, the message will be removed from the station (=queue) once all CGs acknowledge the message.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
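&lt;p&gt;The removal rule can be modeled with a small in-memory sketch. The AckBasedStation class below is hypothetical and is not how Memphis implements retention internally; it only illustrates that a message stays in the station until every connected consumer group has acknowledged it.&lt;/p&gt;

```javascript
// Hypothetical in-memory model of ACK-based retention. This is not the
// Memphis implementation, only an illustration of the rule described above.
class AckBasedStation {
  constructor(consumerGroups) {
    this.consumerGroups = consumerGroups;
    this.pendingAcks = new Map(); // maps a message id to the groups yet to ack
  }

  produce(id) {
    this.pendingAcks.set(id, new Set(this.consumerGroups));
  }

  ack(id, group) {
    const waiting = this.pendingAcks.get(id);
    if (waiting === undefined) return;
    waiting.delete(group);
    // Remove the message only once every consumer group has acknowledged it.
    if (waiting.size === 0) {
      this.pendingAcks.delete(id);
    }
  }

  has(id) {
    return this.pendingAcks.has(id);
  }
}

const station = new AckBasedStation(["cg_workers", "cg_audit"]);
station.produce("task-123");

station.ack("task-123", "cg_workers");
const retainedAfterFirstAck = station.has("task-123"); // cg_audit has not acked yet

station.ack("task-123", "cg_audit");
const retainedAfterAllAcks = station.has("task-123"); // every group has acked

console.log(retainedAfterFirstAck, retainedAfterAllAcks);
```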

&lt;p&gt;We mentioned earlier the advantages and disadvantages of using traditional queues such as RabbitMQ in comparison to common brokers such as Kafka in the context of task scheduling. When comparing both tools to Memphis, it’s all about getting the best from both worlds.&lt;/p&gt;

&lt;p&gt;A few of Memphis.dev’s advantages:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Ordering&lt;/li&gt;
&lt;li&gt;Exactly-once delivery guarantee&lt;/li&gt;
&lt;li&gt;Highly scalable, serving data in high throughput with low latency&lt;/li&gt;
&lt;li&gt;Ack-based retention&lt;/li&gt;
&lt;li&gt;Many-to-Many pattern&lt;/li&gt;
&lt;/ol&gt;




&lt;h2&gt;
  
  
  Getting started with Memphis Broker as a tasks queue
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;a href="https://cloud.memphis.dev/"&gt;Sign up&lt;/a&gt; to Memphis Cloud.&lt;/li&gt;
&lt;li&gt;Connect your task producer –&lt;/li&gt;
&lt;li&gt;Producers are the entities that insert new records or tasks.&lt;/li&gt;
&lt;li&gt;Consumers are the entities who read and process them.&lt;/li&gt;
&lt;li&gt;A single client with a single connection object can act as both a producer and a consumer at the same time, though not on the same station, because that would lead to an infinite loop. It’s doable, but it doesn’t make much sense.
This pattern mainly reduces the footprint and the number of “workers” needed: a single worker can produce tasks to one station while also acting as a consumer or processor for another station that serves a different use case.
The code example below creates an ACK-based station and initializes a producer in Node.js:
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;const { memphis } = require("memphis-dev");

(async function () {
  let memphisConnection;

  try {
    memphisConnection = await memphis.connect({
      host: "MEMPHIS_BROKER_HOSTNAME",
      username: "CLIENT_TYPE_USERNAME",
      password: "PASSWORD",
      accountId: ACCOUNT_ID
    });

    const station = await memphis.station({
      name: 'tasks',
      retentionType: memphis.retentionTypes.ACK_BASED,
    })

    const producer = await memphisConnection.producer({
      stationName: "tasks",
      producerName: "producer-1",
    });

    const headers = memphis.headers();
    headers.add("Some_KEY", "Some_VALUE");
    await producer.produce({
      message: {taskID: 123, task: "deploy a new instance"}, // a JS object here; a string or Uint8Array also works
      headers: headers,
    });

    memphisConnection.close();
  } catch (ex) {
    console.log(ex);
    if (memphisConnection) memphisConnection.close();
  }
})();
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ol&gt;
&lt;li&gt;Connect your task consumer –&lt;/li&gt;
&lt;li&gt;The consumer group below consumes tasks, processes them, and acknowledges them once finished.
Once the tasks are acknowledged, the broker removes those records so that each task is processed only once. We declare the station here as well in case the consumer starts before the producer.
No need to worry: the declaration only takes effect if the station does not exist yet. Another thing to remember is that a consumer group can contain multiple consumers to increase parallelism and read throughput. Within each consumer group, only a single consumer reads and acks a given message, not all of the group’s consumers. If every consumer needs to receive every message, use multiple consumer groups.
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;const { memphis } = require("memphis-dev");

(async function () {
  let memphisConnection;

  try {
    memphisConnection = await memphis.connect({
      host: "MEMPHIS_BROKER_HOSTNAME",
      username: "APPLICATION_TYPE_USERNAME",
      password: "PASSWORD",
      accountId: ACCOUNT_ID
    });

    const station = await memphis.station({
      name: 'tasks',
      retentionType: memphis.retentionTypes.ACK_BASED,
    })

    const consumer = await memphisConnection.consumer({
      stationName: "tasks",
      consumerName: "worker1",
      consumerGroup: "cg_workers",
    });

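    consumer.setContext({ key: "value" });
    consumer.on("message", (message, context) =&amp;gt; {
      // Process the task first, then acknowledge it so the broker can remove it.
      const headers = message.getHeaders();
      console.log(message.getData().toString(), headers);
      message.ack();
    });

    // Log consumer errors instead of silently discarding them.
    consumer.on("error", (error) =&amp;gt; console.log(error));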
  } catch (ex) {
    console.log(ex);
    if (memphisConnection) memphisConnection.close();
  }
})();
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;p&gt;If you liked the tutorial and want to learn what else you can do with Memphis, head &lt;a href="https://docs.memphis.dev/memphis/getting-started/tutorials"&gt;here&lt;/a&gt;.&lt;/p&gt;




&lt;p&gt;&lt;a href="https://mailchi.mp/memphis.dev/newslettersub"&gt;Join 4500+ others and sign up for our data engineering newsletter.&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;Follow Us to get the latest updates!&lt;br&gt;
&lt;a href="https://github.com/memphisdev/memphis-broker"&gt;Github&lt;/a&gt; • &lt;a href="https://docs.memphis.dev/memphis/getting-started/readme"&gt;Docs&lt;/a&gt; • &lt;a href="https://discord.com/invite/DfWFT7fzUu"&gt;Discord&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;Originally published at &lt;a href="https://memphis.dev/blog/streaming-first-infrastructure-for-real-time-machine-learning/"&gt;Memphis.dev&lt;/a&gt; By &lt;a href="https://www.linkedin.com/in/shay-bratslavsky/"&gt;Shay Bratslavsky&lt;/a&gt;, Software Engineer at @Memphis.dev&lt;/p&gt;

</description>
      <category>messagebroker</category>
      <category>messagequeue</category>
      <category>tasmanagement</category>
      <category>memphisdev</category>
    </item>
    <item>
      <title>You have a chance to save the world! 🔥</title>
      <dc:creator>avital trifsik</dc:creator>
      <pubDate>Mon, 17 Jul 2023 12:11:02 +0000</pubDate>
      <link>https://dev.to/memphis_dev/you-have-a-chance-to-save-the-world-g5e</link>
      <guid>https://dev.to/memphis_dev/you-have-a-chance-to-save-the-world-g5e</guid>
      <description>&lt;p&gt;&lt;a href="https://www.hackathon.memphis.dev/"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--UqRjg2VD--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/j3vl99ybzt3gtzvcr1be.png" alt="Savezakar hackathon" width="800" height="240"&gt;&lt;/a&gt;&lt;br&gt;
We are happy to announce Memphis #1 hackathon #SaveZakar!📣📣📣&lt;/p&gt;

&lt;p&gt;Sponsored by &lt;a href="https://memphis.dev/"&gt;Memphis.dev&lt;/a&gt;, &lt;a href="https://streamlit.io/"&gt;Streamlit&lt;/a&gt; and &lt;a href="https://supabase.com/"&gt;Supabase&lt;/a&gt; - Join us to save the world from wildfires using real-time data and AI!&lt;/p&gt;

&lt;p&gt;&lt;a href="https://www.hackathon.memphis.dev/"&gt;Sign up&lt;/a&gt; 🔥&lt;/p&gt;




&lt;h2&gt;
  
  
  What is the hackathon all about? 🌎
&lt;/h2&gt;

&lt;p&gt;Wildfires wreak havoc every year. They take human and animal lives. The fires destroy homes and other property. They destroy agricultural and industrial crops and cause famines. They damage the environment, contribute to global warming, and generate smoke that pollutes the air. Their overall impact runs into billions of dollars and includes incalculable harm to people and animals.&lt;/p&gt;

&lt;p&gt;Where and when wildfires will occur is difficult to predict ahead of time. Instead, researchers, federal, state, and municipal governments and international non-profits have invested heavily in early warning systems. If fires can be detected early, firefighters can be deployed to prevent their spread. If people can be notified, they can be evacuated to avoid loss of life.&lt;/p&gt;




&lt;h2&gt;
  
  
  Your mission 🔥
&lt;/h2&gt;

&lt;p&gt;In this hackathon, you are going to create a wildfire early warning system for the fictional island nation of Zakar. Zakar has been struggling with wildfires in the last few years. As a small island nation, fires are particularly problematic. Importing materials is expensive and takes time, so they must do everything they can to protect their homes, farms, and natural resources. Similarly, with a relatively small geographic footprint, smoke can quickly pollute the air, causing health problems for their people.&lt;/p&gt;




&lt;h2&gt;
  
  
  Prizes 🏆
&lt;/h2&gt;

&lt;p&gt;Each project will be judged in the following categories:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Creativity&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Most informative visualization&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Most accurate solution (For the early warning system)&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Most interesting architecture&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Most interesting algorithm&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Besides eternal glory 😉, the best project will get the perfect gaming package, which includes the following:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;SteelSeries Arctis Nova Pro Wireless Multi-System Gaming Headset.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Logitech G Pro Wireless Gaming Mouse - League of Legends Edition.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;RARE Framed Nintendo Game Boy Color GBC Disassembled.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Tons of swag from Memphis.dev and Streamlit!&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The second-best project will receive:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Logitech G Pro Wireless Gaming Mouse - League of Legends Edition.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Tons of swag from Memphis.dev and Streamlit!&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  Useful information💡
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;The hackathon week runs from July 31 to August 7, 2023.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;There are two types of potential submitted projects: early warning system and data visualization.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The submission deadline is Monday, August 7, 2023.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Join our &lt;a href="https://discord.gg/q37A5ZF4yH"&gt;Discord&lt;/a&gt; channel to get full support.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The winners will be announced on August 21, 2023.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  &lt;a href="https://www.hackathon.memphis.dev/"&gt;Sign up&lt;/a&gt; 🔥
&lt;/h2&gt;




&lt;p&gt;Follow Us to get the latest updates!&lt;br&gt;
&lt;a href="https://github.com/memphisdev/memphis"&gt;Github&lt;/a&gt; • &lt;a href="https://docs.memphis.dev/memphis/getting-started/readme"&gt;Docs&lt;/a&gt; • &lt;a href="https://discord.com/invite/DfWFT7fzUu"&gt;Discord&lt;/a&gt;&lt;/p&gt;





</description>
      <category>datahackathon</category>
      <category>data</category>
      <category>streamlit</category>
      <category>supabase</category>
    </item>
    <item>
      <title>Introducing Memphis.dev Cloud: Empowering Developers with the Next Generation of Streaming</title>
      <dc:creator>avital trifsik</dc:creator>
      <pubDate>Mon, 03 Jul 2023 13:30:37 +0000</pubDate>
      <link>https://dev.to/memphis_dev/introducing-memphisdev-cloud-empowering-developers-with-the-next-generation-of-streaming-50kp</link>
      <guid>https://dev.to/memphis_dev/introducing-memphisdev-cloud-empowering-developers-with-the-next-generation-of-streaming-50kp</guid>
      <description>&lt;p&gt;Event processing innovator Memphis.dev today introduced Memphis Cloud to enable a full serverless experience for massive scale event streaming and processing, and announced it had secured $5.58 million in seed funding co-led by Angular Ventures and boldstart ventures, with participation from JFrog co-founder and CTO Fred Simon, Snyk co-founder Guy Podjarny, CircleCI CEO Jim Rose, Console.dev co-founder David Mytton, and Priceline CTO Martin Brodbeck.&lt;/p&gt;

&lt;h2&gt;
  
  
  Introducing Memphis.dev Cloud
&lt;/h2&gt;

&lt;p&gt;Memphis.dev, the next-generation event streaming platform, is ready to make waves in the world and disrupt data streaming with its highly anticipated cloud service launch.&lt;br&gt;
With a firm commitment to providing developers and data engineers with a powerful and unified streaming engine, Memphis.dev aims to revolutionize the way software uses message brokers. In this blog post, we delve into the key features and benefits of Memphis.dev’s cloud service, highlighting how it empowers organizations and developers to unleash the full potential of their data.&lt;/p&gt;




&lt;h2&gt;
  
  
  What to expect?
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;1. The Serverless Experience&lt;/strong&gt;&lt;br&gt;
Memphis’ platform was intentionally designed to be deployed in minutes on any Kubernetes cluster and any cloud, whether on-prem, in a public cloud, or even in air-gapped environments.&lt;br&gt;
With multi-cloud architectures on the rise, Memphis streamlines development from the local dev station all the way to production, both on-prem and across various clouds. To reduce TCO and overall complexity, the serverless cloud will enable even faster onboarding and time-to-value.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;2. Enable “Day 2” operations&lt;/strong&gt;&lt;br&gt;
Message brokers need to evolve to handle the vast number and complexity of events that occur, and they need to incorporate three critical elements: reliability; ease of management and scaling; and what we call “Day 2” operations on top, to help build queue-based, stream-driven applications in minutes.&lt;/p&gt;

&lt;p&gt;To support both the small and the massive scale and workloads Memphis is built for, some key features can only be delivered via the cloud.&lt;/p&gt;

&lt;p&gt;Key features in Memphis Cloud include:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Augmenting Kafka clusters – providing the missing piece in modern stream processing with the ability to augment Kafka clusters;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Schemaverse – Enabling built-in schema management, enforcement, and transformation to ensure data quality as our data gets complicated and branched;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Multi-tenancy – Offering the perfect solution for users of SaaS platforms who want to isolate traffic between their customers;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;True multi-cloud – creating primary instances on GCP, and a replica on AWS.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;3. A Developer-Centric Approach (and obsession)&lt;/strong&gt;&lt;br&gt;
Memphis.dev’s cloud service launch is driven by a developer-centric philosophy, recognizing that developers are the driving force behind technological innovation. With a deep understanding of developers’ and data engineers’ needs, especially in the current era of much more complicated pipelines with much fewer hands, Memphis.dev has created a comprehensive suite of out-of-the-box tools and features tailored specifically to enhance productivity, streamline workflows, and facilitate collaboration. By prioritizing the developer experience, Memphis.dev aims to empower developers to focus on what they do best: writing exceptional code, and extracting value out of their data!&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;4. No code changes. Open-source to Cloud.&lt;/strong&gt;&lt;br&gt;
The development experience is fully aligned between the open-source version and the cloud. No code changes or application config modifications are needed.&lt;br&gt;
The cloud does expose one additional, optional parameter: an account ID.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;5. Enhanced Security and Compliance&lt;/strong&gt;&lt;br&gt;
Memphis.dev prioritizes the security and compliance of its cloud service, recognizing the critical importance of protecting sensitive data. With robust security measures, including data encryption, role-based identity and access management, integration with 3rd party identity managers, and regular security audits, Memphis.dev ensures that developers’ applications and data are safeguarded. By adhering to industry-standard compliance frameworks, Memphis.dev helps developers meet regulatory requirements and build applications with confidence.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;6. Support and Success&lt;/strong&gt;&lt;br&gt;
The core support and customer service ethos of Memphis.dev is customer success and enablement. A successful customer is a happy customer, and we are working hard to support our customers not just with Memphis, but with their bigger picture and data journey. We have three global customer support teams spread across three time zones, alongside highly experienced data engineers and data architects who serve as Customer Success Engineers and are willing to dive into the internals of our customers’ systems and help them achieve their goals.&lt;/p&gt;




&lt;p&gt;“Cluster setup, fault tolerance, high availability, data replication, performance tuning, multitenancy, security, monitoring, and troubleshooting all are headaches everyone who has deployed traditional message broker platforms is familiar with,” said Torsten Volk, senior analyst, EMA. “Memphis however is incredibly simple so that I had my first Python app sending and receiving messages in less than 15 minutes.”    &lt;/p&gt;

&lt;p&gt;“The world is asynchronous and built out of events. Message brokers are the engine behind their flow in the modern software architecture, and when we looked at the bigger picture and the role message brokers play, we immediately understood that the modern message broker should be much more intelligent and by far with much less friction,” said Yaniv Ben Hemo, co-founder and CEO, Memphis. “With that idea, we built Memphis.dev which takes five minutes on average for a user to get to production and start building queue-based applications and distributed streaming pipelines.”&lt;/p&gt;





&lt;p&gt;Originally published at Memphis.dev By Yaniv Ben Hemo, Co-Founder &amp;amp; CEO at &lt;a href="https://memphis.dev/blog/"&gt;Memphis.dev&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Follow Us to get the latest updates!&lt;br&gt;
&lt;a href="https://github.com/memphisdev/memphis"&gt;Github&lt;/a&gt; • &lt;a href="https://docs.memphis.dev/memphis/getting-started/readme"&gt;Docs&lt;/a&gt; • &lt;a href="https://discord.com/invite/DfWFT7fzUu"&gt;Discord&lt;/a&gt;&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Part 4: Validating CDC Messages with Schemaverse</title>
      <dc:creator>avital trifsik</dc:creator>
      <pubDate>Thu, 22 Jun 2023 10:58:53 +0000</pubDate>
      <link>https://dev.to/memphis_dev/part-4-validating-cdc-messages-with-schemaverse-2cfk</link>
      <guid>https://dev.to/memphis_dev/part-4-validating-cdc-messages-with-schemaverse-2cfk</guid>
      <description>&lt;p&gt;&lt;em&gt;This is part four of a series of blog posts on building a modern event-driven system using Memphis.dev.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;In the previous two blog posts (&lt;a href="https://memphis.dev/blog/part-2-change-data-capture-cdc-for-mongodb-with-debezium-and-memphis-dev/"&gt;part 2&lt;/a&gt; and &lt;a href="https://memphis.dev/blog/part-3-transforming-mongodb-cdc-event-messages/"&gt;part 3&lt;/a&gt;), we described how to implement a change data capture (CDC) pipeline for &lt;a href="https://www.mongodb.com/"&gt;MongoDB&lt;/a&gt; using &lt;a href="https://debezium.io/documentation/reference/stable/operations/debezium-server.html"&gt;Debezium Server&lt;/a&gt; and &lt;a href="https://memphis.dev/"&gt;Memphis.dev&lt;/a&gt;.&lt;/p&gt;




&lt;h2&gt;
  
  
  Schema on Write, Schema on Read
&lt;/h2&gt;

&lt;p&gt;With relational databases, schemas are defined before any data are ingested.  Only data that conforms to the schema can be inserted into the database.  This is known as “schema on write.”  This pattern ensures data integrity but can limit flexibility and the ability to evolve a system.  &lt;/p&gt;

&lt;p&gt;Predefined schemas are optional in NoSQL databases like MongoDB.  MongoDB models collections of objects.  In the most extreme case, collections can contain completely different types of objects such as cats, tanks, and books.  More commonly, fields may only be present on a subset of objects or the value types may vary from one object to another.  This flexibility makes it easier to evolve schemas over time and efficiently support objects with many optional fields.&lt;/p&gt;

&lt;p&gt;Schema flexibility puts more onus on applications that read the data.  Clients need to check for any desired field and confirm their data types.  This pattern is called "schema on read."&lt;/p&gt;




&lt;h2&gt;
  
  
  Malformed Records Cause Crashes
&lt;/h2&gt;

&lt;p&gt;In one of my positions earlier in my career, I worked on a team that developed and maintained data pipelines for an online ad recommendation system.  One of the most common sources of downtime was malformed records.  Pipeline code can fail if a field is missing, an unexpected value is encountered, or when trying to parse badly-formatted data.  If the pipeline isn't developed with errors in mind (e.g., using &lt;a href="https://en.wikipedia.org/wiki/Defensive_programming"&gt;defensive programming techniques&lt;/a&gt;, explicitly-defined data models, and validating data), the entire pipeline may crash and require manual intervention by an operator.&lt;/p&gt;

&lt;p&gt;Unfortunately, malformed data, especially when handling large volumes of data, is a frequent occurrence.  Simply hoping for the best won't lead to resilient pipelines.  As the saying goes, "Hope for the best. Plan for the worst."&lt;/p&gt;




&lt;h2&gt;
  
  
  The Best of Both Worlds: Data Validation with Schemaverse
&lt;/h2&gt;

&lt;p&gt;Fortunately, Memphis.dev has an awesome feature called Schemaverse.  Schemaverse provides a mechanism to check messages for compliance with a specified schema and handle non-conforming messages.&lt;/p&gt;

&lt;p&gt;To use Schemaverse, the operator needs to first define a schema.  Message schemas can be defined using JSON Schema, Google Protocol Buffers, or GraphQL.  The operator will choose the schema definition language appropriate to the format of the message payloads.&lt;/p&gt;

&lt;p&gt;Once a schema is defined, the operator can "attach" the schema to a station.  The schema will be downloaded by clients using the Memphis.dev client SDKs.  The client SDK will validate each message before sending it to the Memphis broker.  If a message doesn't validate, the client will redirect the message to the dead-letter queue, trigger a notification, and raise an exception to notify the user of the client.&lt;/p&gt;
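
&lt;p&gt;The validate-before-send flow can be sketched in plain Python.  This is not the Memphis.dev SDK: the station and the dead-letter queue are ordinary lists, and is_valid, send_validated, and SchemaValidationError are hypothetical names chosen for illustration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import json


class SchemaValidationError(Exception):
    """Raised when a message does not match the attached schema (hypothetical)."""


def is_valid(message):
    # Stand-in for real schema validation: require a "payload" object.
    return isinstance(message, dict) and isinstance(message.get("payload"), dict)


def send_validated(raw, station, dead_letter):
    """Validate a JSON message, then deliver it or divert it to the dead-letter list."""
    message = json.loads(raw)
    if not is_valid(message):
        dead_letter.append(message)
        raise SchemaValidationError("message failed schema validation")
    station.append(message)


station, dead_letter = [], []
send_validated('{"payload": {"before": null, "after": null}}', station, dead_letter)
try:
    send_validated('{"unexpected": true}', station, dead_letter)
except SchemaValidationError as exc:
    print("rejected:", exc)

print(len(station), len(dead_letter))
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The invalid message never reaches the station, and the caller still learns about the failure through the exception.&lt;/p&gt;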

&lt;p&gt;In this example, we'll look at using Schemaverse to validate change data capture (CDC) events from MongoDB.&lt;/p&gt;




&lt;h2&gt;
  
  
  Review of the Solution
&lt;/h2&gt;

&lt;p&gt;In our &lt;a href="https://memphis.dev/blog/part-3-transforming-mongodb-cdc-event-messages/"&gt;previous post&lt;/a&gt;, we described a change data capture (CDC) pipeline for a collection of todo items stored in MongoDB.  Our solution consists of eight components:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Todo Item Generator&lt;/strong&gt;: Inserts a randomly-generated todo item in the MongoDB collection every 0.5 seconds.  Each todo item contains a description, creation timestamp, optional due date, and completion status.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;MongoDB&lt;/strong&gt;: Configured with a single database containing a single collection (todo_items).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Debezium Server&lt;/strong&gt;: Instance of Debezium Server configured with MongoDB source and HTTP Client sink connectors.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Memphis.dev REST Gateway&lt;/strong&gt;: Uses the out-of-the-box configuration.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Memphis.dev&lt;/strong&gt;: Configured with a single station (todo-cdc-events) and single user (todocdcservice).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Printing Consumer&lt;/strong&gt;: A script that uses the &lt;a href="https://github.com/memphisdev/memphis.py"&gt;Memphis.dev Python SDK&lt;/a&gt; to consume messages and print them to the console.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Transformer Service&lt;/strong&gt;: A &lt;a href="https://github.com/memphisdev/memphis-example-solutions/blob/master/mongodb-debezium-cdc-example/cdc-transformer/cdc_transformer.py"&gt;transformer&lt;/a&gt; service that consumes messages from the todo-cdc-events station, deserializes the MongoDB records, and pushes them to the cleaned-todo-cdc-events station.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Cleaned Printing Consumer&lt;/strong&gt;: A second instance of the printing consumer that prints messages pushed to the cleaned-todo-cdc-events station.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--i4llCJng--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/5xr8j1rklgc20bi14mb0.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--i4llCJng--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/5xr8j1rklgc20bi14mb0.jpg" alt="dataflow diagram" width="800" height="333"&gt;&lt;/a&gt;&lt;br&gt;
In this iteration, we aren't adding or removing any of the components.  Rather, we're just going to change Memphis.dev's configuration to perform schema validation on messages sent to the "cleaned-todo-cdc-events" station.&lt;/p&gt;


&lt;h2&gt;
  
  
  Schema for Todo Change Data Capture (CDC) Events
&lt;/h2&gt;

&lt;p&gt;In &lt;a href="https://memphis.dev/blog/part-3-transforming-mongodb-cdc-event-messages/"&gt;part 3&lt;/a&gt;, we transformed the messages by deserializing the JSON subdocuments that were embedded as strings, producing fully deserialized JSON messages.  The resulting messages looked like so:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
    "schema" : ...,

    "payload" : {
        "before" : null,

        "after" : {
            "_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
            "creation_timestamp": { "$date": 1684007402978 },
            "due_date": { "$date" : 1684266602978 },
            "description": "buy milk",
            "completed": false
        },

        ...
    }
}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Each JSON-encoded message has two top-level fields, "schema" and "payload."  We are concerned with the "payload" field.  The payload object has two required fields, "before" and "after", that we are concerned with.  The before field contains a copy of the record before being modified (or null if it didn't exist), while the after field contains a copy of the record after being modified (or null if the record is being deleted).&lt;/p&gt;

&lt;p&gt;From this example, we can define criteria that messages must satisfy to be considered valid.  Let's write the criteria out as a set of rules:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;The payload/before field may contain a todo object or null.&lt;/li&gt;
&lt;li&gt;The payload/after field may contain a todo object or null.&lt;/li&gt;
&lt;li&gt;A todo object must have four fields ("_id", "creation_timestamp", "description", and "completed").  A fifth field, "due_date", is optional.&lt;/li&gt;
&lt;li&gt;The creation_timestamp must be an object with a single field ("$date").  The "$date" field must have a positive integer value (Unix timestamp in milliseconds).&lt;/li&gt;
&lt;li&gt;The due_date, when present, must be either null or an object with a single field ("$date").  The "$date" field must have a positive integer value (Unix timestamp in milliseconds).&lt;/li&gt;
&lt;li&gt;The description field should have a string value.  Nulls are not allowed.&lt;/li&gt;
&lt;li&gt;The completed field should have a boolean value.  Nulls are not allowed.&lt;/li&gt;
&lt;/ol&gt;
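
&lt;p&gt;To make the rules concrete, here is a hand-written sketch of the same checks in Python.  The function names are hypothetical.  The sketch follows the JSON Schema used in Step 12 in treating due_date as optional and nullable, and it checks only that "$date" is an integer, not that it is positive:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def is_timestamp(value):
    """True for an object of the form {"$date": integer}."""
    if not isinstance(value, dict) or set(value) != {"$date"}:
        return False
    date = value["$date"]
    # bool is a subclass of int in Python, so exclude it explicitly.
    return isinstance(date, int) and not isinstance(date, bool)


def is_todo(obj):
    """Check a todo object against rules 3 to 7."""
    if not isinstance(obj, dict):
        return False
    required = ("_id", "creation_timestamp", "description", "completed")
    if any(key not in obj for key in required):
        return False
    due_date = obj.get("due_date")
    return (
        is_timestamp(obj["creation_timestamp"])
        and (due_date is None or is_timestamp(due_date))
        and isinstance(obj["description"], str)
        and isinstance(obj["completed"], bool)
    )


def is_valid_event(event):
    """Check rules 1 and 2: before and after are each null or a todo object."""
    payload = event.get("payload")
    if not isinstance(payload, dict):
        return False
    if "before" not in payload or "after" not in payload:
        return False
    return all(
        payload[key] is None or is_todo(payload[key]) for key in ("before", "after")
    )


event = {
    "payload": {
        "before": None,
        "after": {
            "_id": {"$oid": "645fe9eaf4790c34c8fcc2ed"},
            "creation_timestamp": {"$date": 1684007402978},
            "due_date": {"$date": 1684266602978},
            "description": "buy milk",
            "completed": False,
        },
    }
}
print(is_valid_event(event))
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Writing the checks by hand works for seven rules, but it gets tedious and error-prone as the rules grow, which is the motivation for a declarative schema language.&lt;/p&gt;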

&lt;p&gt;For this project, we'll define the schema using &lt;a href="https://json-schema.org/"&gt;JSON Schema&lt;/a&gt;. JSON Schema is a very powerful data modeling language.  It supports defining required fields, field types (e.g., integers, strings, etc.), whether fields are nullable, field formats (e.g., date / times, email addresses), and field constraints (e.g., minimum or maximum values).  Objects can be defined and referenced by name, allowing recursive schemas and reuse of definitions.  Schemas can be further combined using the allOf, anyOf, oneOf, and not keywords.  As one might expect, this expressiveness comes with a cost: the JSON Schema definition language is complex, and unfortunately, covering it is beyond the scope of this tutorial.&lt;/p&gt;




&lt;h2&gt;
  
  
  Creating a Schema and Attaching it to a Station
&lt;/h2&gt;

&lt;p&gt;Let's walk through the process of creating a schema and attaching it to a station.  You'll first need to complete the first 10 steps from &lt;a href="https://memphis.dev/blog/part-2-change-data-capture-cdc-for-mongodb-with-debezium-and-memphis-dev/"&gt;part 2&lt;/a&gt; and &lt;a href="https://memphis.dev/blog/part-3-transforming-mongodb-cdc-event-messages/"&gt;part 3&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Step 11: Navigate to the Schemaverse Tab&lt;/strong&gt;&lt;br&gt;
Navigate to the Memphis UI in your browser.  For example, you might be able to find it at &lt;a href="https://localhost:9000/"&gt;https://localhost:9000/&lt;/a&gt;.  Once you are signed in, navigate to the Schemaverse tab:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--2nruqPkO--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/3mrrv389yavvu6salpbv.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--2nruqPkO--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/3mrrv389yavvu6salpbv.png" alt="Image description" width="512" height="241"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Step 12: Create the Schema&lt;/strong&gt;&lt;br&gt;
Click the "Create from blank" button to create a new schema.  Set the schema name to "todo-cdc-schema" and the schema type to "JSON schema."  Paste the following JSON Schema document into the textbox on the right.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "$id": "https://example.com/product.schema.json",
    "type" : "object",
    "properties" : {
        "payload" : {
            "type" : "object",
            "properties" : {
                "before" : {
                    "oneOf" : [{ "type" : "null" }, { "$ref" : "#/$defs/todoItem" }]
                },
                "after" : {
                    "oneOf" : [{ "type" : "null" }, { "$ref" : "#/$defs/todoItem" }]
                }
            },
            "required" : ["before", "after"]
        }
    },
    "required" : ["payload"],
    "$defs" : {
        "todoItem" : {
            "title": "TodoItem",
            "description": "An item in a todo checklist",
            "type" : "object",
            "properties" : {
                "_id" : {
                    "type" : "object",
                    "properties" : {
                        "$oid" : {
                            "type" : "string"
                        }
                    }
                },
                "description" : {
                    "type" : "string"
                },
                "creation_timestamp" : {
                    "type" : "object",
                    "properties" : {
                        "$date" : {
                            "type" : "integer"
                        }
                    }
                },
                "due_date" : {
                    "anyOf" : [
                        {
                            "type" : "object",
                            "properties" : {
                                "$date" : {
                                    "type" : "integer"
                                }
                            }
                        },
                        {
                            "type" : "null"
                        }
                    ]
                },
                "completed" : {
                    "type" : "boolean"
                }
            },
            "required" : ["_id", "description", "creation_timestamp", "completed"]
        }
    }
}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
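
&lt;p&gt;Before pasting a schema into the UI, it can help to confirm that the document parses as JSON and that every local "$ref" points at a definition that exists.  The sketch below uses only the Python standard library and an abbreviated copy of the schema.  The helpers find_refs and resolves are hypothetical, and the sketch ignores JSON Pointer escape sequences and remote references:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import json

# Abbreviated copy of the schema; paste the full document in practice.
SCHEMA_TEXT = """
{
    "type": "object",
    "properties": {
        "payload": {
            "type": "object",
            "properties": {
                "before": {"oneOf": [{"type": "null"}, {"$ref": "#/$defs/todoItem"}]},
                "after": {"oneOf": [{"type": "null"}, {"$ref": "#/$defs/todoItem"}]}
            },
            "required": ["before", "after"]
        }
    },
    "required": ["payload"],
    "$defs": {"todoItem": {"type": "object"}}
}
"""


def find_refs(node):
    """Yield every "$ref" string found anywhere in a parsed schema."""
    if isinstance(node, dict):
        for key, value in node.items():
            if key == "$ref" and isinstance(value, str):
                yield value
            else:
                yield from find_refs(value)
    elif isinstance(node, list):
        for item in node:
            yield from find_refs(item)


def resolves(schema, ref):
    """True if a local reference such as "#/$defs/todoItem" points at something."""
    if not ref.startswith("#/"):
        return False
    node = schema
    for part in ref[2:].split("/"):
        if not isinstance(node, dict) or part not in node:
            return False
        node = node[part]
    return True


schema = json.loads(SCHEMA_TEXT)  # raises json.JSONDecodeError on a syntax error
unresolved = [ref for ref in find_refs(schema) if not resolves(schema, ref)]
print(unresolved)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;An empty list means every reference resolved.  A typo such as "#/$defs/todoitem" would show up in the list instead of surfacing later as a confusing validation failure.&lt;/p&gt;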



&lt;p&gt;When done, your window should look like so:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--VU3PPqk2--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/0b81bwyzcme5wviv2rkb.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--VU3PPqk2--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/0b81bwyzcme5wviv2rkb.png" alt="schemaverse" width="800" height="520"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Next, click the "Create schema" button. Once the schema has been created, you'll be returned to the Schemaverse tab.  You should see an entry for the newly created schema like so:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--4ATDAiU3--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/8cdem09g467p976jr19r.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--4ATDAiU3--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/8cdem09g467p976jr19r.png" alt="Image description" width="800" height="227"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Step 13: Attach the Schema to the Station&lt;/strong&gt;&lt;br&gt;
Once the schema is created, we want to attach the schema to the "cleaned-todo-cdc-events" station. Double-click on the "todo-cdc-schema" window to bring up its details window like so:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--7y_Pg5Ku--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/vhihxzs4hbolg874szvn.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--7y_Pg5Ku--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/vhihxzs4hbolg874szvn.png" alt="todo cdc schema" width="800" height="729"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Next, click on the "+ Attach to Station" button.  This will bring up the following window:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--awNMOoLn--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/fitl5kqwgrql1jv3tehl.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--awNMOoLn--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/fitl5kqwgrql1jv3tehl.png" alt="enforce schema" width="800" height="1268"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Select the "cleaned-todo-cdc-events" station, and click "Attach Selected."  The producers attached to the station will automatically download the schema and begin validating outgoing messages within a few minutes.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Step 14: Confirm that Messages are Being Filtered&lt;/strong&gt;&lt;br&gt;
Navigate to the station overview page for the "cleaned-todo-cdc-events" station.  After a couple of minutes, you should see a red warning notification icon next to the "Dead-letter" tab name.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--KNbaOmRf--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/3yttzmt80x63flcxdu0y.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--KNbaOmRf--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/3yttzmt80x63flcxdu0y.png" alt="Image description" width="800" height="494"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;If you click on the "Dead-letter" tab and then the "Schema violation" subtab, you'll see the messages that failed the schema validation.  These messages have been re-routed to the dead-letter queue so that they don't cause bugs in the downstream pipelines.  The window will look like so:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--Eo98aEtk--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/cs57i1x337o7u4h53owv.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--Eo98aEtk--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/cs57i1x337o7u4h53owv.png" alt="Image description" width="800" height="836"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Congratulations!  You're now using Schemaverse to validate messages.  This is one small but incredibly impactful step towards making your pipeline more reliable.&lt;/p&gt;




&lt;p&gt;In case you missed parts 1, 2, and 3:&lt;br&gt;
&lt;a href="https://memphis.dev/blog/part-3-transforming-mongodb-cdc-event-messages/"&gt;Part 3: Transforming MongoDB CDC Event Messages&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://memphis.dev/blog/part-2-change-data-capture-cdc-for-mongodb-with-debezium-and-memphis-dev/"&gt;Part 2: Change Data Capture (CDC) for MongoDB with Debezium and Memphis.dev&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://memphis.dev/blog/part-1-integrating-debezium-server-and-memphis-dev-for-streaming-change-data-capture-cdc-events/"&gt;Part 1: Integrating Debezium Server and Memphis.dev for Streaming Change Data Capture (CDC) Events&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;Originally published at Memphis.dev by RJ Nowling, Developer Advocate at &lt;a href="https://memphis.dev/blog/part-3-transforming-mongodb-cdc-event-messages/"&gt;Memphis.dev&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;Follow Us to get the latest updates!&lt;br&gt;
&lt;a href="https://github.com/memphisdev/memphis"&gt;GitHub&lt;/a&gt; • &lt;a href="https://docs.memphis.dev/memphis/getting-started/readme"&gt;Docs&lt;/a&gt; • &lt;a href="https://discord.com/invite/DfWFT7fzUu"&gt;Discord&lt;/a&gt;&lt;/p&gt;

</description>
      <category>cdc</category>
      <category>schemaverse</category>
      <category>data</category>
    </item>
    <item>
      <title>Part 3: Transforming MongoDB CDC Event Messages</title>
      <dc:creator>avital trifsik</dc:creator>
      <pubDate>Tue, 06 Jun 2023 10:26:31 +0000</pubDate>
      <link>https://dev.to/memphis_dev/part-3-transforming-mongodb-cdc-event-messages-a8p</link>
      <guid>https://dev.to/memphis_dev/part-3-transforming-mongodb-cdc-event-messages-a8p</guid>
      <description>&lt;p&gt;&lt;em&gt;This is part three of a series of blog posts on building a modern event-driven system using Memphis.dev.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;In our &lt;a href="https://memphis.dev/blog/part-2-change-data-capture-cdc-for-mongodb-with-debezium-and-memphis-dev/"&gt;last blog post&lt;/a&gt;, we introduced a reference implementation for capturing change data capture (CDC) events from a &lt;a href="https://www.mongodb.com/"&gt;MongoDB&lt;/a&gt; database using &lt;a href="https://debezium.io/documentation/reference/2.2/operations/debezium-server.html"&gt;Debezium Server&lt;/a&gt; and &lt;a href="https://memphis.dev/"&gt;Memphis.dev&lt;/a&gt;.  At the end of the post we noted that MongoDB records are serialized as strings in Debezium CDC messages like so:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
    "schema" : ...,

    "payload" : {
        "before" : null,

        "after" : "{\"_id\": {\"$oid\": \"645fe9eaf4790c34c8fcc2ed\"},\"creation_timestamp\": {\"$date\": 1684007402978},\"due_date\": {\"$date\": 1684266602978},\"description\": \"buy milk\",\"completed\": false}",

        ...
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We want to use the &lt;a href="https://docs.memphis.dev/memphis/memphis/schemaverse-schema-management"&gt;Schemaverse&lt;/a&gt; functionality of Memphis.dev to check messages against an expected schema.  Messages that don’t match the schema are routed to a dead letter station so that they don’t impact downstream consumers.  If this all sounds like ancient Greek, don’t worry!  We’ll explain the details in our next blog post.&lt;/p&gt;

&lt;p&gt;To use functionality like Schemaverse, we need to deserialize the MongoDB records as JSON documents.  In this blog post, we describe a modification to our MongoDB CDC pipeline that adds a transformer service to deserialize the MongoDB records to JSON documents.&lt;/p&gt;




&lt;h2&gt;
  
  
  Overview of the Solution
&lt;/h2&gt;

&lt;p&gt;The &lt;a href="https://memphis.dev/blog/part-2-change-data-capture-cdc-for-mongodb-with-debezium-and-memphis-dev/"&gt;previous solution&lt;/a&gt; consisted of six components:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Todo Item Generator&lt;/strong&gt;: Inserts a randomly-generated todo item in the MongoDB collection every 0.5 seconds.  Each todo item contains a description, creation timestamp, optional due date, and completion status.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;MongoDB&lt;/strong&gt;: Configured with a single database containing a single collection (todo_items).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Debezium Server&lt;/strong&gt;: Instance of Debezium Server configured with MongoDB source and HTTP Client sink connectors. &lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Memphis.dev REST Gateway&lt;/strong&gt;: Uses the out-of-the-box configuration.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Memphis.dev&lt;/strong&gt;: Configured with a single station (todo-cdc-events) and single user (todocdcservice).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Printing Consumer&lt;/strong&gt;: A script that uses the Memphis.dev Python SDK to consume messages and print them to the console.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--AghDMpS0--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/gzsz7y5tf54tqpckoqiq.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--AghDMpS0--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/gzsz7y5tf54tqpckoqiq.jpg" alt="mongocdcd example" width="800" height="255"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In this iteration, we are adding two additional components:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Transformer Service&lt;/strong&gt;: A &lt;a href="https://github.com/memphisdev/memphis-example-solutions/blob/master/mongodb-debezium-cdc-example/cdc-transformer/cdc_transformer.py"&gt;transformer&lt;/a&gt; service that consumes messages from the todo-cdc-events station, deserializes the MongoDB records, and pushes them to the cleaned-todo-cdc-events station.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Cleaned Printing Consumer&lt;/strong&gt;: A second instance of the printing consumer that prints messages pushed to the cleaned-todo-cdc-events station.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Our updated architecture looks like this:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--xLcRiKeb--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/kvrythdmblgiti7ob40m.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--xLcRiKeb--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/kvrythdmblgiti7ob40m.jpg" alt="data flow diagram" width="800" height="333"&gt;&lt;/a&gt;&lt;/p&gt;




&lt;h2&gt;
  
  
  A Deep Dive Into the Transformer Service
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Skeleton of the Message Transformer Service&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Our &lt;a href="https://github.com/memphisdev/memphis-example-solutions/blob/master/mongodb-debezium-cdc-example/cdc-transformer/cdc_transformer.py"&gt;transformer&lt;/a&gt; service uses the &lt;a href="https://github.com/memphisdev/memphis.py"&gt;Memphis.dev Python SDK&lt;/a&gt;.  Let’s walk through the transformer implementation.  The main() function of our transformer first connects to the &lt;a href="https://github.com/memphisdev/memphis"&gt;Memphis.dev broker&lt;/a&gt;.  The host, username, password, input station name, and output station name are read from environment variables, in accordance with the recommendations of the &lt;a href="https://12factor.net/config"&gt;Twelve-Factor App manifesto&lt;/a&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;async def main():
    try:
        print("Waiting on messages...")
        memphis = Memphis()
        await memphis.connect(host=os.environ[HOST_KEY],
                              username=os.environ[USERNAME_KEY],
                              password=os.environ[PASSWORD_KEY])
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
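
&lt;p&gt;Indexing os.environ directly raises a KeyError on the first missing variable.  As an optional refinement, the sketch below checks all required variables up front and reports every missing one at once.  The variable names are hypothetical; the real service defines its own key constants (HOST_KEY and so on), whose values are not shown here:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import os

# Hypothetical variable names, used only for this illustration.
REQUIRED_KEYS = ("MEMPHIS_HOST", "MEMPHIS_USERNAME", "MEMPHIS_PASSWORD")


def load_settings(environ):
    """Return the required settings, or raise one error naming every missing key."""
    missing = [key for key in REQUIRED_KEYS if key not in environ]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return {key: environ[key] for key in REQUIRED_KEYS}


try:
    settings = load_settings(os.environ)
    print("loaded settings for host", settings["MEMPHIS_HOST"])
except RuntimeError as exc:
    print(exc)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;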



&lt;p&gt;Once a connection is established, we create consumer and producer objects.  In Memphis.dev, consumers and producers have names.  These names appear in the Memphis.dev UI, offering transparency into the system operations.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;        print("Creating consumer")
        consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
                                          consumer_name="transformer",
                                          consumer_group="")

        print("Creating producer")
        producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
                                          producer_name="transformer")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The consumer API uses the &lt;a href="https://en.wikipedia.org/wiki/Callback_(computer_programming)"&gt;callback function&lt;/a&gt; design pattern. When messages are pulled from the broker, the provided function is called with the list of messages, along with an error and a context argument.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;        print("Creating handler")
        msg_handler = create_handler(producer)

        print("Setting handler")
        consumer.consume(msg_handler)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;After setting up the callback, we kick off the asyncio event loop.  At this point, the transformer service pauses and waits until messages are available to pull from the broker.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;        # Keep your main thread alive so the consumer will keep receiving data
        await asyncio.Event().wait()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
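
&lt;p&gt;Waiting on an asyncio.Event that is never set is a common way to keep a coroutine alive while background tasks do the work.  The self-contained sketch below shows the mechanism.  Unlike the service, it sets the event after three simulated messages so that the example terminates; the worker function is a hypothetical stand-in for the consumer callback:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import asyncio


async def worker(received, stop):
    # Stand-in for the consumer callback: record three "messages", then stop.
    for number in range(3):
        await asyncio.sleep(0.01)
        received.append(number)
    stop.set()


async def main():
    received = []
    stop = asyncio.Event()
    task = asyncio.create_task(worker(received, stop))
    # main() is suspended here, but the event loop keeps running the worker.
    await stop.wait()
    await task
    return received


result = asyncio.run(main())
print(result)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;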




&lt;h2&gt;
  
  
  Creating the Message Handler Function
&lt;/h2&gt;

&lt;p&gt;The create function for the message handler takes a producer object and returns a callback function. Since the callback signature is fixed by the SDK and has no parameter for the producer, we use the &lt;a href="https://en.wikipedia.org/wiki/Closure_(computer_programming)"&gt;closure pattern&lt;/a&gt; to implicitly pass the producer to the msg_handler function when we create it.&lt;/p&gt;

&lt;p&gt;The &lt;code&gt;msg_handler&lt;/code&gt; function is passed three arguments when called: a list of messages, an error (if one occurred), and a context consisting of a dictionary.  Our handler loops over the messages, calls the transform function on each, sends the messages to the second station using the producer, and acknowledges that the message has been processed.  In Memphis.dev, messages are not marked off as delivered until the consumer acknowledges them.  This prevents messages from being dropped if an error occurs during processing.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def create_handler(producer):
    async def msg_handler(msgs, error, context):
        try:
            for msg in msgs:
                transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
                await producer.produce(message=transformed_msg)
                await msg.ack()
        except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
            print(e)
            return

    return msg_handler
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
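
&lt;p&gt;To see the closure pattern and the acknowledge-after-send ordering in isolation, here is a sketch that runs without a broker.  FakeProducer and FakeMessage are hypothetical stand-ins that mimic only the methods used above, and upper-casing the data stands in for the real transformation:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import asyncio


class FakeProducer:
    """Stand-in for a producer: collects messages in a list."""

    def __init__(self):
        self.sent = []

    async def produce(self, message):
        self.sent.append(message)


class FakeMessage:
    """Stand-in for a consumed message with get_data() and ack()."""

    def __init__(self, data):
        self.data = data
        self.acked = False

    def get_data(self):
        return self.data

    async def ack(self):
        self.acked = True


def create_handler(producer):
    # msg_handler closes over producer, so callers pass only the three
    # callback arguments.
    async def msg_handler(msgs, error, context):
        for msg in msgs:
            await producer.produce(message=msg.get_data().upper())
            await msg.ack()  # acknowledge only after the send succeeded

    return msg_handler


producer = FakeProducer()
messages = [FakeMessage("a"), FakeMessage("b")]
asyncio.run(create_handler(producer)(messages, None, {}))
print(producer.sent, [m.acked for m in messages])
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Because ack() is called after produce(), a message whose send raises an exception is left unacknowledged and can be redelivered.&lt;/p&gt;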






&lt;h2&gt;
  
  
  The Message Transformer Function
&lt;/h2&gt;

&lt;p&gt;Now, we get to the meat of the service: the message transformer function.  Message payloads (returned by the get_data() method) are stored as &lt;a href="https://docs.python.org/3/library/stdtypes.html#bytearray"&gt;bytearray&lt;/a&gt; objects.  We use the Python json library to deserialize the messages into a hierarchy of Python collections (list and dict) and primitive types (int, float, str, bool, and None).&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def deserialize_mongodb_cdc_event(input_msg):
    obj = json.loads(input_msg)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We expect the object to have a payload property with an object as the value.  That object then has two properties (“before” and “after”) which are either None or strings containing serialized JSON objects.  We use the json library again to deserialize the strings and replace them with the resulting objects.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;    if "payload" in obj:
        payload = obj["payload"]

        if "before" in payload:
            before_payload = payload["before"]
            if before_payload is not None:
                payload["before"] = json.loads(before_payload)

        if "after" in payload:
            after_payload = payload["after"]
            if after_payload is not None:
                payload["after"] = json.loads(after_payload)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Lastly, we reserialize the entire JSON record and convert it back into a bytearray for transmission to the broker.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;    output_s = json.dumps(obj)
    output_msg = bytearray(output_s, "utf-8")
    return output_msg

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Hooray! Our objects now look like so:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
    "schema" : ...,

    "payload" : {
        "before" : null,

        "after" : {
            "_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
            "creation_timestamp": { "$date": 1684007402978 },
            "due_date": { "$date" : 1684266602978 },
            "description": "buy milk",
            "completed": false
        },

        ...
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
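
&lt;p&gt;Putting the fragments together, the sketch below is a condensed variation of the transformer function that can be run on its own.  It differs slightly from the code above: it loops over the two keys and deserializes a value only when it is a string.  The sample record is a hypothetical, shortened todo item:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import json


def deserialize_mongodb_cdc_event(input_msg):
    """Replace the serialized "before" and "after" strings with JSON objects."""
    obj = json.loads(input_msg)
    payload = obj.get("payload")
    if isinstance(payload, dict):
        for key in ("before", "after"):
            if isinstance(payload.get(key), str):
                payload[key] = json.loads(payload[key])
    return bytearray(json.dumps(obj), "utf-8")


# Build an input message whose "after" field is a serialized JSON string.
record = {"description": "buy milk", "completed": False}
raw = json.dumps({"payload": {"before": None, "after": json.dumps(record)}})

output = json.loads(deserialize_mongodb_cdc_event(bytearray(raw, "utf-8")))
print(output["payload"]["after"])
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;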






&lt;h2&gt;
  
  
  Running the Transformer Service
&lt;/h2&gt;

&lt;p&gt;If you followed the 7 steps in the &lt;a href="https://memphis.dev/blog/part-2-change-data-capture-cdc-for-mongodb-with-debezium-and-memphis-dev/"&gt;previous blog post&lt;/a&gt;, you only need to run three additional steps to start the transformer service and verify that it's working:&lt;/p&gt;

&lt;h2&gt;
  
  
  Step 8: Start the Transformer Service
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ docker compose up -d cdc-transformer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cdc-transformer                                  Started                                                            1.3s
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Step 9: Start the Second Printing Consumer
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cleaned-printing-consumer                        Started                                                            1.3s
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Step 10: Check the Memphis UI
&lt;/h2&gt;

&lt;p&gt;When the transformer starts producing messages to Memphis.dev, a second station named "cleaned-todo-cdc-events" will be created.  You should see this new station on the Station Overview page in the Memphis.dev UI like so:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--jrjoaM1S--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/4037i8dcy49e2y3nsljo.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--jrjoaM1S--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/4037i8dcy49e2y3nsljo.png" alt="Check memphis ui" width="800" height="227"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The details page for the "cleaned-todo-cdc-events" station should show the transformer attached as a producer, the printing consumer, and the transformed messages:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--2zZSSU0---/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/r3xdims35q2t5gdeojo4.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--2zZSSU0---/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/r3xdims35q2t5gdeojo4.png" alt="Image description" width="800" height="494"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Congratulations! We’re now ready to tackle validating messages using Schemaverse in our next blog post. Subscribe to our newsletter to stay tuned!&lt;/p&gt;

&lt;p&gt;Head over to &lt;a href="https://memphis.dev/blog/part-4-validating-cdc-messages-with-schemaverse/"&gt;Part 4: Validating CDC Messages with Schemaverse&lt;/a&gt; to learn more.&lt;/p&gt;




&lt;p&gt;In case you missed parts 1 &amp;amp; 2:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://memphis.dev/blog/part-2-change-data-capture-cdc-for-mongodb-with-debezium-and-memphis-dev/"&gt;Part 2: Change Data Capture (CDC) for MongoDB with Debezium and Memphis.dev&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://memphis.dev/blog/part-1-integrating-debezium-server-and-memphis-dev-for-streaming-change-data-capture-cdc-events/"&gt;Part 1: Integrating Debezium Server and Memphis.dev for Streaming Change Data Capture (CDC) Events&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;Originally published at Memphis.dev by RJ Nowling, Developer Advocate at &lt;a href="https://memphis.dev/blog/part-3-transforming-mongodb-cdc-event-messages/"&gt;Memphis.dev&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;Follow us to get the latest updates!&lt;br&gt;
&lt;a href="https://github.com/memphisdev/memphis"&gt;GitHub&lt;/a&gt; • &lt;a href="https://docs.memphis.dev/memphis/getting-started/readme"&gt;Docs&lt;/a&gt; • &lt;a href="https://discord.com/invite/DfWFT7fzUu"&gt;Discord&lt;/a&gt;&lt;/p&gt;

</description>
      <category>mongodb</category>
      <category>cdc</category>
      <category>memphisdev</category>
      <category>dataprocessing</category>
    </item>
  </channel>
</rss>
