Nithin Bharadwaj

**Event-Driven Programming in Python: Pub/Sub, Kafka, Event Sourcing, and Reactive Streams**

As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!

You know how most computer programs work? They do step one, then step two, then step three, in a straight line. You call a function, it runs, and you get a result. But what if your program needs to wait for something to happen—like a user clicking a button, a sensor sending data, or a payment being confirmed? Waiting around ruins the straight line.

That's where event-driven thinking comes in. Instead of a straight line, we build a system that sits and listens. When something happens—an "event"—the system reacts. It's like setting up dominoes. You don't push each one individually; you set them up so that when the first one falls, it triggers a whole chain reaction.

In my work, I've found this approach is perfect for modern software: websites that update in real time, apps that process data from many sources, or services that need to stay responsive no matter what. Python, with its clear syntax and powerful libraries, is a great tool for building these reactive systems. Let me show you some of the most effective ways I've learned to do it.

First, let's talk about the simplest idea: the Publisher and the Subscriber. Imagine a newsletter. You (the publisher) write an issue and send it out. You don't know who exactly will read it. People who signed up (the subscribers) get it in their inbox. They don't know when you'll write it; they just wait for it.

In code, this pattern helps separate parts of your application. One piece of code (the publisher) announces that something happened, like "user_logged_in." Other pieces of code (subscribers) that care about that event can do their own work, like "update_last_seen_timestamp" or "send_welcome_email." The publisher doesn't wait for the subscribers to finish. It just shouts the news and moves on.

Here’s a basic way to build this in pure Python. We'll create a central message broker that keeps track of who is listening to what.

class EventBroker:
    def __init__(self):
        # A dictionary to store lists of subscriber functions for each event type
        self._subscribers = {}

    def subscribe(self, event_type, handler_function):
        """Tell the broker: 'When this event happens, call this function.'"""
        if event_type not in self._subscribers:
            self._subscribers[event_type] = []
        self._subscribers[event_type].append(handler_function)
        print(f"Handler for '{event_type}' subscribed.")

    def unsubscribe(self, event_type, handler_function):
        """Tell the broker: 'Stop calling this function for that event.'"""
        if event_type in self._subscribers:
            try:
                self._subscribers[event_type].remove(handler_function)
                print(f"Handler for '{event_type}' removed.")
            except ValueError:
                pass

    def publish(self, event_type, data=None):
        """Announce an event. The broker calls every subscribed function."""
        print(f"\nPublishing event: '{event_type}' with data: {data}")
        if event_type in self._subscribers:
            # Call each handler one after the other
            for handler in self._subscribers[event_type]:
                try:
                    handler(data)  # Execute the subscriber's code
                except Exception as e:
                    print(f"Handler for {event_type} failed: {e}")

# Example Subscriber Functions
def send_invoice(data):
    print(f"  [Invoice System] Creating invoice for order {data['order_id']}.")

def update_inventory(data):
    print(f"  [Inventory System] Reducing stock for item {data['product_id']}.")

def notify_shipping(data):
    print(f"  [Shipping System] Preparing shipment to {data['zip_code']}.")

# Let's use it
broker = EventBroker()

# Services subscribe to the events they care about
broker.subscribe("order.placed", send_invoice)
broker.subscribe("order.placed", update_inventory)
broker.subscribe("order.shipped", notify_shipping)

# A user action triggers an event
broker.publish("order.placed", {"order_id": "12345", "product_id": "ABC99", "zip_code": "90210"})

# Later, another event happens
broker.publish("order.shipped", {"order_id": "12345", "tracking": "ZYX987", "zip_code": "90210"})

When you run this, you'll see the broker receive the "order.placed" event and call both the invoice and inventory functions. They run in order. The shipping function doesn't run yet because it's only listening for "order.shipped." The part of your code that published the event doesn't need to know about invoices or inventory. It just says, "Hey, an order was placed," and continues. This separation is powerful.

However, you might notice a problem. What if update_inventory takes a long time? Because we call the handlers one by one, the publisher, and every handler after the slow one, has to wait for it to finish. In a real system, we often want these tasks to run independently, or "asynchronously." This is where message queues come in.

Think of a message queue like a conveyor belt in a factory. One machine (the publisher) puts items (messages) on the belt. Other machines (the consumers) further down the line pick items off the belt and process them at their own pace. If one machine is slow or breaks, items just wait on the conveyor belt; they don't block the first machine.
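The standard library's queue.Queue is enough to sketch this conveyor belt on a single machine (the worker name and order IDs below are just illustrative):

```python
import queue
import threading

belt = queue.Queue()  # the conveyor belt

def consumer(name, results):
    """Pick items off the belt and process them at this worker's own pace."""
    while True:
        item = belt.get()
        if item is None:          # sentinel value: shut this worker down
            belt.task_done()
            break
        results.append(f"{name} processed order {item}")
        belt.task_done()

results = []
worker = threading.Thread(target=consumer, args=("worker-1", results))
worker.start()

# The publisher drops items on the belt and moves on immediately;
# it never waits for the worker to finish.
for order_id in ("12345", "12346", "12347"):
    belt.put(order_id)

belt.put(None)   # tell the worker to stop
worker.join()
print(results)
```

If the worker is slow, orders simply accumulate in the queue; the publisher's loop is never blocked.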

For this, we often use a dedicated "message broker" server. Apache Kafka is a popular one, and Python can talk to it easily. A Kafka broker holds "topics" (like named conveyor belts). Producers write messages to topics, and Consumers read from them. The beauty is that many consumers can read from the same topic, and Kafka keeps track of what each one has read.

In Python, a robust Kafka setup usually wraps the producer and consumer in small classes, say an EventProducer and an EventConsumer, that reliably send messages, process them in batches, and handle errors. Let me break down why that structure is useful.

The EventProducer class is careful. It waits for an acknowledgment (acks='all') that the message was safely stored by Kafka. It retries if sending fails. This ensures we don't lose an important event like "payment_received." The EventConsumer works with a group_id. This means if I run two instances of my inventory service with the same group_id, Kafka will automatically split the messages between them. This is how you scale out: add more consumer instances to handle more work.
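Here is a sketch of that producer/consumer pair using the kafka-python package. The broker address, topic name, and group id are placeholders, and the Kafka imports are deferred into the functions so the module loads even where the library isn't installed:

```python
import json

def make_order_producer(bootstrap="localhost:9092"):
    """Producer that waits for full acknowledgment before a send counts as done."""
    from kafka import KafkaProducer  # requires: pip install kafka-python
    return KafkaProducer(
        bootstrap_servers=bootstrap,
        acks="all",    # wait until all in-sync replicas have stored the message
        retries=5,     # retry transient send failures
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )

def run_inventory_consumer(bootstrap="localhost:9092"):
    """Consumer in a group; run more instances with the same group_id to scale out."""
    from kafka import KafkaConsumer
    consumer = KafkaConsumer(
        "order.placed",
        bootstrap_servers=bootstrap,
        group_id="inventory-service",
        enable_auto_commit=False,  # we commit manually, after success
        value_deserializer=lambda b: json.loads(b.decode("utf-8")),
    )
    for message in consumer:
        reduce_stock(message.value)
        consumer.commit()  # commit only after handling: at-least-once delivery

def reduce_stock(order):
    print(f"Reducing stock for item {order['product_id']}")
```

A producer call then looks like make_order_producer().send("order.placed", {...}); send returns a future, so you can block on .get() when you want per-message confirmation.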

The consumer also uses enable_auto_commit=False and manually commits after successful processing. This is crucial. It means if my program crashes while handling a message, it will get that same message again when it restarts. This "at-least-once" delivery prevents data loss. I always design my event handlers to be idempotent—meaning processing the same event twice has the same effect as processing it once—just in case.
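At-least-once delivery means a handler can see the same event twice, so I guard it with a set of processed event IDs. A real service would persist those IDs in a database or Redis; this in-memory sketch just shows the idea:

```python
processed_ids = set()  # in production: a database table or Redis set

def handle_payment(event):
    """Idempotent handler: a redelivered event is recognized and skipped."""
    if event["event_id"] in processed_ids:
        return "skipped"               # already handled; do nothing
    processed_ids.add(event["event_id"])
    # ... apply the payment exactly once ...
    return "processed"

evt = {"event_id": "evt-001", "amount": 25.00}
print(handle_payment(evt))  # processed
print(handle_payment(evt))  # skipped: same net effect as processing once
```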

Sometimes, you need more than just processing a message. You need to keep a complete, immutable history of every change that ever happened to a piece of data. This is called Event Sourcing. It's like the history feature in a Google Doc. You don't just see the final text; you can see every single edit that led to it.

In a traditional system, you might have a users table with a status column. To change a user from "pending" to "active," you UPDATE that column. The old "pending" state is gone. With Event Sourcing, you never update in place. Instead, you store an event: "UserActivated." The current state is rebuilt by playing back all the events for that user: "UserRegistered," "EmailVerified," "UserActivated."

Two pieces form the core of an event-sourced design. The EventStore is an append-only log. The other piece, an aggregate such as an OrderAggregate, embodies a critical concept: an "aggregate" is a cluster of data that changes together. Here, an "Order" is an aggregate. It ensures that business rules (like "you can't ship a cancelled order") are checked before a new event is created.
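A minimal sketch of those two pieces, with the event names and the business rule invented for illustration:

```python
class EventStore:
    """Append-only log of events, grouped by aggregate id."""
    def __init__(self):
        self._events = {}

    def append(self, aggregate_id, event):
        self._events.setdefault(aggregate_id, []).append(event)

    def events_for(self, aggregate_id):
        return list(self._events.get(aggregate_id, []))

class OrderAggregate:
    """Rebuilds its state by replaying events; checks rules before new ones."""
    def __init__(self, order_id, events):
        self.order_id = order_id
        self.status = None
        for event in events:          # replay history to reach current state
            self._apply(event)

    def _apply(self, event):
        if event["type"] == "order.created":
            self.status = "open"
        elif event["type"] == "order.cancelled":
            self.status = "cancelled"
        elif event["type"] == "order.shipped":
            self.status = "shipped"

    def ship(self, store):
        if self.status == "cancelled":   # the business rule lives here
            raise ValueError("you can't ship a cancelled order")
        event = {"type": "order.shipped"}
        store.append(self.order_id, event)
        self._apply(event)

store = EventStore()
store.append("order-1", {"type": "order.created"})
store.append("order-1", {"type": "order.cancelled"})

order = OrderAggregate("order-1", store.events_for("order-1"))
print(order.status)  # cancelled: the state was rebuilt purely from the log
```

Note that nothing ever overwrote the "open" status; the current state is simply the result of replaying every event in order.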

The real power comes when you need to ask new questions of your data. Say your business wants to know "how often do customers add an item within 5 minutes of creating an order?" In a traditional system, you'd have to scour application logs. With Event Sourcing, you just re-process the stream of "order.created" and "item.added" events. You can build new "projections" of your data anytime without changing the original event log.

Rebuilding the order state from scratch works, but for large aggregates with thousands of events it would be slow. That's why we take "snapshots." Periodically, you save the complete current state (say, version 142 of the order), and then to rebuild, you load the snapshot (version 142) and only apply the events that came after it (143, 144...). Hashing the event history the snapshot was taken from is a smart way to ensure the snapshot and the log stay consistent.
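Snapshotting can be sketched on top of such a store. The hash check below is one simple way to detect a snapshot that no longer matches its history; the exact format is an assumption, not a fixed convention:

```python
import hashlib
import json

def history_hash(events):
    """Fingerprint of the event list the snapshot was taken from."""
    return hashlib.sha256(json.dumps(events, sort_keys=True).encode()).hexdigest()

def take_snapshot(state, events):
    return {"state": dict(state), "version": len(events), "hash": history_hash(events)}

def rebuild(snapshot, events, apply_fn):
    """Start from the snapshot and apply only the events after its version."""
    prefix = events[:snapshot["version"]]
    if history_hash(prefix) != snapshot["hash"]:
        raise ValueError("snapshot does not match the event history")
    state = dict(snapshot["state"])
    for event in events[snapshot["version"]:]:   # just the tail: 143, 144...
        apply_fn(state, event)
    return state

def apply_count(state, event):
    state["items"] = state.get("items", 0) + 1

events = [{"type": "item.added"}] * 142
snap = take_snapshot({"items": 142}, events)     # snapshot at version 142

events.append({"type": "item.added"})            # one new event arrives later
state = rebuild(snap, events, apply_count)
print(state["items"])  # 143: replayed only the single event after the snapshot
```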

Another fascinating way to handle events is Reactive Programming. It treats data as streams that flow over time. You can filter these streams, combine them, or transform them. It's like Excel formulas for asynchronous events. If cell A1 changes, everything that depends on it updates automatically. The RxPy library brings this concept to Python.

Live market data is a classic use case: a continuous stream of prices and volumes. A reactive pipeline lets you declare what you want: "Alert me if the price moves more than 1% and volume is twice the 5-minute average." You build this by connecting operators.

The pipe method is where the magic happens. ops.filter lets only certain data through. ops.group_by splits the main stream into smaller streams per stock symbol. ops.buffer_with_count collects items into small windows (like the last 10 prices). ops.map transforms each buffer into a new value, like a moving average.
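RxPy ships these operators ready-made (pip install reactivex). To make the mechanics concrete without the dependency, here is a toy push-based stream with just filter, map, and buffer_with_count; it is a deliberate simplification of what ops.filter, ops.map, and ops.buffer_with_count do, with made-up prices and threshold:

```python
class Stream:
    """Toy push-based stream: subscribers are called as values flow through."""
    def __init__(self):
        self._subscribers = []

    def subscribe(self, fn):
        self._subscribers.append(fn)

    def push(self, value):
        for fn in self._subscribers:
            fn(value)

    def filter(self, predicate):
        out = Stream()
        self.subscribe(lambda v: out.push(v) if predicate(v) else None)
        return out

    def map(self, fn):
        out = Stream()
        self.subscribe(lambda v: out.push(fn(v)))
        return out

    def buffer_with_count(self, n):
        out, buf = Stream(), []
        def on_value(v):
            buf.append(v)
            if len(buf) == n:        # emit the window, then start a new one
                out.push(list(buf))
                buf.clear()
        self.subscribe(on_value)
        return out

# Declare the pipeline: average every 3 prices, keep averages above 100.
prices = Stream()
alerts = []
(prices
    .buffer_with_count(3)
    .map(lambda window: sum(window) / len(window))   # moving average
    .filter(lambda avg: avg > 100)
    .subscribe(alerts.append))

for p in [99, 100, 101, 102, 103, 104]:
    prices.push(p)
print(alerts)  # [103.0]: only the second window's average crossed 100
```

Notice that the pipeline is declared once, up front; pushing prices afterwards is all it takes to drive the whole chain.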

The combine_latest operator is especially powerful. It watches multiple streams (moving average, volume spike, price change) and emits a new value anytime any of its input streams emit. This new value is a tuple of the latest from each stream, perfect for making a decision based on several factors at once.

What I like about this approach is its declarative nature. The code describes the relationships and transformations. The library (RxPy) handles the complexity of listening, timing, and memory management. It makes complex asynchronous logic much easier to reason about.

These techniques—pub/sub, message queues, event sourcing, and reactive streams—are tools in a toolbox. You don't usually use all of them in one project. The choice depends on what you need.

Use a simple Event Broker when your system is small and runs on one machine. It's the quickest way to decouple components.

Use a Message Queue (like Kafka) when you need durability (no lost messages), need to scale across multiple servers, or have different services written in different languages that need to communicate.

Use Event Sourcing when the complete audit trail of changes is a business requirement, or when you need the flexibility to derive new views of your data later.

Use Reactive Programming when you are dealing with continuous, real-time data streams and need to perform complex, live transformations and analyses.

The common thread is thinking in events. Instead of asking "what should I do next?" you design your system to ask "what should I do when this happens?" It's a shift in mindset, but it leads to systems that are more flexible, scalable, and resilient. They can handle unexpected load, they can be understood in pieces, and they can evolve over time as you add new reactions to old events. In my experience, that's a foundation you can build on for a long time.

📘 Checkout my latest ebook for free on my channel!

Be sure to like, share, comment, and subscribe to the channel!


101 Books

101 Books is an AI-driven publishing company co-founded by author Aarav Joshi. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as $4—making quality knowledge accessible to everyone.

Check out our book Golang Clean Code available on Amazon.

Stay tuned for updates and exciting news. When shopping for books, search for Aarav Joshi to find more of our titles. Use the provided link to enjoy special discounts!

Our Creations

Be sure to check out our creations:

Investor Central | Investor Central Spanish | Investor Central German | Smart Living | Epochs & Echoes | Puzzling Mysteries | Hindutva | Elite Dev | Java Elite Dev | Golang Elite Dev | Python Elite Dev | JS Elite Dev | JS Schools


We are on Medium

Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Puzzling Mysteries Medium | Science & Epochs Medium | Modern Hindutva
