DEV Community

Nithin Bharadwaj

# Build Real-Time Data Pipelines With Python, Faust, and Apache Kafka: 8 Proven Techniques


I remember the first time I tried to handle live data. A stream of clicks from our website was piling up in a log file, and by the time I processed it, the day was over. Real-time streaming changed everything. Instead of waiting, you act the moment an event happens. In Python, Faust and Apache Kafka give you the power to build these pipelines. I’ll walk you through eight techniques that turned my messy data into a fast, reliable stream processor. Each technique is a tool you can pick up and use right now.

1. Defining Stream Processors with Faust

The heart of your pipeline is an agent: an async function that runs forever inside a worker app, listening to a Kafka topic and firing once per message. You tell the app which Kafka broker to talk to, how to serialize your data, and where to keep local state.

import faust

app = faust.App(
    'streaming-pipeline',
    broker='kafka://localhost:9092',
    value_serializer='json',
    store='rocksdb://',
)

class ClickEvent(faust.Record):
    user_id: str
    page: str
    timestamp: float

click_topic = app.topic('clicks', value_type=ClickEvent)
enrich_topic = app.topic('clicks_enriched')  # downstream topic for enriched events

@app.agent(click_topic)
async def process_clicks(clicks):
    async for click in clicks:
        # Process each click event
        print(f"User {click.user_id} visited {click.page}")
        # Enrich and forward to another topic
        enriched = {'user': click.user_id, 'url': click.page}
        await enrich_topic.send(value=enriched)

# Run: faust -A main worker -l info

I used to think I needed a separate framework for every minor change. With Faust, the agent loops over each incoming message, and you write normal Python inside the loop. The worker commits offsets automatically at a regular interval, so if the process crashes it resumes close to where it left off, reprocessing at most the uncommitted tail. The async design means one worker can handle thousands of messages per second without blocking.
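The async advantage is easy to see even outside Faust. This toy sketch (plain asyncio, with a hypothetical `handle` coroutine standing in for an agent body) keeps a hundred simulated I/O-bound messages in flight on one event loop:

```python
import asyncio

async def handle(msg: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for network I/O (enrichment, DB lookup)
    return msg.upper()

async def main() -> list:
    # One event loop interleaves all 100 messages at once,
    # so the total wall time is ~0.01s rather than 100 * 0.01s
    return await asyncio.gather(*(handle(f"msg{i}") for i in range(100)))

results = asyncio.run(main())
print(len(results), results[0])
```

This is the same mechanism that lets a single Faust worker overlap thousands of in-flight messages.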

2. Stateful Stream Processing

Sometimes you need to remember things. How many times has this user clicked? What was the last page they saw? Faust gives you a Table – a key-value store backed by RocksDB. The table lives on disk, so even if your worker restarts, the counts survive.

# Count page views per user
page_views = app.Table('page_views', default=int)

@app.agent(click_topic)
async def count_views(clicks):
    async for click in clicks:
        page_views[click.user_id] += 1
        current = page_views[click.user_id]
        if current % 100 == 0:
            print(f"User {click.user_id} has {current} views")

I added a table to my click pipeline to track users hitting 100, 200 views – it felt like magic. You can share tables across agents. A second agent can join clicks with user profiles stored in another table:

profile_topic = app.topic('user_profiles')
user_profiles = app.Table('user_profiles', default=dict, partitions=8)

@app.agent(profile_topic)
async def update_profiles(profiles):
    async for profile in profiles:
        user_profiles[profile['user_id']] = profile

@app.agent(click_topic)
async def join_clicks(clicks):
    async for click in clicks:
        profile = user_profiles.get(click.user_id, {})
        # Combine click with profile data
        print(f"{profile.get('name')} clicked {click.page}")

The table partitions data so it stays close to the agent processing that partition. This keeps joins fast.
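That locality comes from Kafka's keyed partitioning: the same key always hashes to the same partition. A rough stand-in (CRC32 here; Kafka's default partitioner actually uses murmur2) shows the idea:

```python
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    # Deterministic hash: the same key always lands on the same partition,
    # so the worker that owns the partition also owns that key's table shard
    return zlib.crc32(key) % num_partitions

# Every click for user-42 goes to one partition; its profile row lives there too
p1 = partition_for(b"user-42", 8)
p2 = partition_for(b"user-42", 8)
print(p1 == p2)
```

The practical consequence: send both streams keyed by the join key, or the click and its profile may end up on different workers.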

3. Windowed Aggregations

Counting total clicks is fine, but often you need time windows – how many clicks per minute? Per hour? Faust supports tumbling windows (non-overlapping) and hopping windows (overlapping).

from datetime import timedelta

# Tumbling window (non-overlapping)
windowed_views = app.Table(
    'windowed_views',
    default=int,
).tumbling(timedelta(minutes=1), expires=timedelta(hours=1))

@app.agent(click_topic)
async def window_count(clicks):
    async for click in clicks:
        windowed_views[click.page] += 1

I used a hopping window for rolling averages on sensor data. Each window covers five minutes but slides every minute – you get smooth, overlapping snapshots.

metric_topic = app.topic('metrics')

rolling_avg = app.Table(
    'rolling_avg',
    default=float,
).hopping(timedelta(minutes=5), timedelta(minutes=1), expires=timedelta(hours=1))

@app.agent(metric_topic)
async def compute_rolling(metrics):
    async for m in metrics:
        # .current() reads the value for the window this event falls into
        current = rolling_avg[m['sensor']].current()
        print(f"Sensor {m['sensor']} window value: {current}")

Internally, Faust cleans up old windows automatically. You set the retention with the expires argument, and it handles the rest.
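Stripped of Faust, tumbling-window bookkeeping is just flooring timestamps to window starts. A minimal sketch of one-minute buckets:

```python
from collections import defaultdict

WINDOW = 60.0  # one-minute tumbling windows, in seconds

def window_start(ts: float) -> float:
    # Floor the timestamp to the start of its window
    return ts - (ts % WINDOW)

counts = defaultdict(int)  # (page, window_start) -> count

def record(page: str, ts: float) -> None:
    counts[(page, window_start(ts))] += 1

record("/home", 30.0)
record("/home", 59.0)
record("/home", 61.0)  # falls into the next one-minute window
print(counts[("/home", 0.0)], counts[("/home", 60.0)])
```

Expiry then amounts to dropping keys whose window start is older than the retention, which is exactly the cleanup Faust runs for you.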

4. Stream-Stream Joins

Sometimes you need to match two event streams. For example, when an order arrives, you want to check whether a confirmation shows up within an hour. The reliable way to do this in Faust is to join the two topics through a shared table.

class Order(faust.Record):
    order_id: str
    timestamp: float

class Confirmation(faust.Record):
    order_id: str
    timestamp: float

orders_topic = app.topic('orders', value_type=Order)
confirmations_topic = app.topic('confirmations', value_type=Confirmation)

# Windowed table: confirmations disappear once their window expires
confirmations_table = app.Table(
    'confirmations_store',
    default=dict,
).tumbling(timedelta(hours=1), expires=timedelta(hours=1))

@app.agent(confirmations_topic)
async def store_confirmations(confs):
    async for conf in confs:
        confirmations_table[conf.order_id] = conf

@app.agent(orders_topic)
async def match_orders(orders):
    async for order in orders:
        # .current() looks in the window containing the current event time
        conf = confirmations_table[order.order_id].current()
        if conf:
            print(f"Order {order.order_id} confirmed at {conf.timestamp}")

The window on the confirmations table means old confirmations expire on their own. Faust's stream API also declares a built-in join operator, but it was never fully implemented, so in practice the table-based pattern above is how you do stream-stream joins.

I use this pattern to detect abandoned carts – orders without a matching payment within ten minutes.
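The timeout half of that pattern can be sketched without Faust: keep pending orders keyed by arrival time and sweep periodically. In a real app the sweep would live in an @app.timer coroutine; the names below are hypothetical:

```python
pending_orders = {}  # order_id -> arrival time (epoch seconds)
TIMEOUT = 600.0      # ten minutes

def on_order(order_id: str, now: float) -> None:
    pending_orders[order_id] = now

def on_payment(order_id: str) -> None:
    pending_orders.pop(order_id, None)  # payment arrived, forget the order

def sweep(now: float) -> list:
    # Anything still pending past TIMEOUT is an abandoned cart
    abandoned = [oid for oid, t in pending_orders.items() if now - t > TIMEOUT]
    for oid in abandoned:
        del pending_orders[oid]
    return abandoned

on_order("o1", 0.0)
on_order("o2", 100.0)
on_payment("o2")
abandoned_now = sweep(now=700.0)
print(abandoned_now)
```

The windowed table in the Faust version plays the role of this dict, with expiry doing the deletion for you.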

5. Error Handling and Dead Letter Queues

Bad data happens. A missing field, malformed JSON, a temporary database hiccup. If an exception escapes your agent, the worker crashes and the pipeline stalls. The defense is a try/except inside the loop: either handle the error and keep consuming, or re-raise and stop loudly.

dead_letter_topic = app.topic('clicks_dead_letter')

@app.agent(click_topic)
async def resilient_processing(clicks):
    async for click in clicks:
        try:
            if click.page is None:
                raise ValueError("Missing page")
            print(f"OK: {click.user_id}")
        except Exception as e:
            # Route the failure to a dead letter topic instead of crashing
            await dead_letter_topic.send(
                value={'error': str(e), 'data': click.asdict()}
            )

Catching inside the loop means the worker skips the bad message and keeps going. To avoid losing data, I route those messages to a separate Kafka topic (a dead letter queue). Later, I can reprocess them after fixing the issue.
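What goes into the dead letter record matters: keep the raw payload alongside the error so reprocessing is possible later. A small helper sketch (hypothetical, not a Faust API):

```python
import json

def to_dead_letter(raw: bytes, error: Exception) -> bytes:
    # Wrap the failed payload with enough context to diagnose and replay it
    envelope = {
        'error': repr(error),
        'payload': raw.decode('utf-8', errors='replace'),
    }
    return json.dumps(envelope).encode()

msg = to_dead_letter(b'{"user_id": null}', ValueError("Missing page"))
print(msg)
```

A reprocessing agent can then subscribe to the dead letter topic, unwrap the envelope, and replay the payload once the bug is fixed.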

For agents where data integrity matters more than throughput, let the exception propagate instead: the worker stops, your supervisor restarts it, and the uncommitted messages are redelivered. Combine that with retries:

import asyncio

@app.agent(click_topic)
async def strict_processing(clicks):
    async for click in clicks:
        await process_with_retries(click)

async def process_with_retries(click, max_retries=3):
    last_error = None
    for attempt in range(max_retries):
        try:
            return await risky_operation(click)  # your I/O call goes here
        except Exception as e:
            last_error = e
            await asyncio.sleep(0.5 * (attempt + 1))  # linear backoff
    raise last_error

The dead letter queue and retry patterns saved me countless hours of debugging.

6. Watermarks and Event Time Processing

Messages often arrive late. A user's click might be timestamped five minutes ago but reach the broker only now. If you bucket by processing time, that click lands in the current window, not the window it belongs to. Faust supports event time: point a windowed table at the field holding the original timestamp with relative_to_field, and use window expiry to decide how much lateness to tolerate.

# Define the windowed table once, at module level
watermarked = app.Table(
    'watermarked',
    default=int,
).tumbling(
    timedelta(minutes=5),
    expires=timedelta(minutes=10),  # keep windows alive for 10 min of lateness
).relative_to_field(ClickEvent.timestamp)

@app.agent(click_topic)
async def event_time_aggregation(clicks):
    async for click in clicks:
        # The event's own timestamp decides which window it lands in
        watermarked[click.page] += 1

The expires argument sets how much late data you want to accept: windows stay writable and queryable until they expire. I use this for IoT sensors that sometimes buffer data offline. Without event time, those delayed readings would distort the hourly average.
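The watermark idea behind this fits in a few lines: track the highest timestamp seen and reject events that fall more than the allowed lateness behind it. A simplified sketch, not Faust's internals:

```python
ALLOWED_LATENESS = 600.0  # seconds of lateness we tolerate

watermark = 0.0

def accept(event_ts: float) -> bool:
    global watermark
    watermark = max(watermark, event_ts)  # the watermark only moves forward
    return event_ts >= watermark - ALLOWED_LATENESS

r1 = accept(1000.0)  # advances the watermark
r2 = accept(500.0)   # late, but inside the grace period
r3 = accept(300.0)   # too late: its window has already closed
print(r1, r2, r3)
```

In Faust, window expiry plays the role of the cutoff: once a window expires, late events for it are simply no longer counted.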

7. Partition Rebalancing and Scaling

As your traffic grows, you add more worker instances, and Kafka redistributes partitions among them. Faust handles rebalancing automatically through Kafka's consumer group protocol, but hot keys can still pile all their traffic onto one partition. The fix is to repartition the stream with group_by, routing events by a key of your choosing.

@app.agent(click_topic)
async def spread_hot_keys(clicks):
    # Salt the key so one very active user's events spread across
    # several partitions instead of hammering just one
    async for click in clicks.group_by(
        lambda click: f"{click.user_id}:{int(click.timestamp) % 4}",
        name='clicks-salted',
    ):
        print(f"{click.user_id} -> {click.page}")

I once had a single user generating more events than the rest of the users combined. All of that user's events hashed to one partition, which overloaded its worker. Salting the key spread the load, at the cost of having to merge the per-salt aggregates downstream.

Scaling is simple: start more workers. Each worker registers with the same consumer group. Faust rebalances seamlessly.

faust -A main worker -l info --web-port=6066

8. Testing Streaming Pipelines

Testing a real-time pipeline without a Kafka cluster used to be a pain. Faust lets you drive an agent directly with test_context(), no broker required.

from datetime import datetime

import pytest

@pytest.mark.asyncio
async def test_click_counting():
    async with count_views.test_context() as agent:
        # put() feeds events straight to the agent, bypassing Kafka
        await agent.put(ClickEvent(user_id='user1', page='/home', timestamp=100.0))
        await agent.put(ClickEvent(user_id='user1', page='/about', timestamp=200.0))
    assert page_views['user1'] == 2

@pytest.mark.asyncio
async def test_windowed_aggregation():
    now = datetime.now().timestamp()
    async with window_count.test_context() as agent:
        for i in range(10):
            await agent.put(ClickEvent(user_id='u2', page='/blog', timestamp=now + i))
    assert windowed_views['/blog'].now() > 0

I write these tests before changing any agent logic. They catch regressions fast. test_context() injects events directly into the agent, and you inspect the tables after processing.

Deploying to Production

Once everything works locally, you need to run it reliably. Configure replication, offset behavior, table cleanup, and a health endpoint. Faust flushes the producer and commits offsets for you during a graceful shutdown.

app = faust.App(
    'streaming-pipeline',
    broker='kafka://prod-kafka:9092',
    store='rocksdb://',
    topic_replication_factor=3,
    consumer_auto_offset_reset='earliest',
    table_cleanup_interval=60.0,
)

@app.page('/health/')
async def health(web, request):
    return web.json({'status': 'healthy'})

I use systemd to start the worker with ExecStart=/usr/local/bin/faust -A main worker -l info. The built-in web server (port set with --web-port) serves the health check and internal stats, and Faust ships sensor integrations (statsd, Datadog) that can feed Grafana dashboards.

Wrapping Up

These eight techniques turned me from a batch processor into someone who handles millions of events per second. Start with a simple agent, add a table for state, introduce windows for time analytics, join streams to enrich data, handle errors gracefully, respect event time, plan for scaling, and test early. Python and Faust make it all feel natural. You can apply these patterns to clickstream analytics, fraud detection, IoT pipelines, or any live data system. Just pick a topic and start streaming.
