As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!
I remember the first time I tried to handle live data. A stream of clicks from our website was piling up in a log file, and by the time I processed it, the day was over. Real-time streaming changed everything. Instead of waiting, you act the moment an event happens. In Python, Faust and Apache Kafka give you the power to build these pipelines. I’ll walk you through eight techniques that turned my messy data into a fast, reliable stream processor. Each technique is a tool you can pick up and use right now.
1. Defining Stream Processors with Faust
The heart of your pipeline is an agent – a function that runs forever, listening to a Kafka topic. Every message triggers it. Faust calls these agents, and they run as async functions inside a worker app. You tell the app which Kafka broker to talk to, how to serialize your data, and where to keep local state.
import faust
app = faust.App(
'streaming-pipeline',
broker='kafka://localhost:9092',
value_serializer='json',
store='rocksdb://',
)
class ClickEvent(faust.Record):
user_id: str
page: str
timestamp: float
click_topic = app.topic('clicks', value_type=ClickEvent)
@app.agent(click_topic)
async def process_clicks(clicks):
async for click in clicks:
# Process each click event
print(f"User {click.user_id} visited {click.page}")
# Enrich and forward to another topic
enriched = {'user': click.user_id, 'url': click.page}
await enrich_topic.send(value=enriched)
# Run: faust -A main worker -l info
I used to think I needed a separate framework for every minor change. With Faust, the agent loops over each incoming message. You write normal Python code inside the loop. The worker automatically commits offsets after each successful iteration, so if the process crashes, it restarts from where it left off. The async nature means one worker can handle thousands of messages per second without blocking.
2. Stateful Stream Processing
Sometimes you need to remember things. How many times has this user clicked? What was the last page they saw? Faust gives you a Table – a key-value store backed by RocksDB. The table lives on disk, so even if your worker restarts, the counts survive.
# Count page views per user
page_views = app.Table('page_views', default=int)
@app.agent(click_topic)
async def count_views(clicks):
async for click in clicks:
page_views[click.user_id] += 1
current = page_views[click.user_id]
if current % 100 == 0:
print(f"User {click.user_id} has {current} views")
I added a table to my click pipeline to track users hitting 100, 200 views – it felt like magic. You can share tables across agents. A second agent can join clicks with user profiles stored in another table:
user_profiles = app.Table('user_profiles', default=dict, partitions=8)
@app.agent(profile_topic)
async def update_profiles(profiles):
async for profile in profiles:
user_profiles[profile['user_id']] = profile
@app.agent(click_topic)
async def join_clicks(clicks):
async for click in clicks:
profile = user_profiles.get(click.user_id, {})
# Combine click with profile data
print(f"{profile.get('name')} clicked {click.page}")
The table partitions data so it stays close to the agent processing that partition. This keeps joins fast.
3. Windowed Aggregations
Counting total clicks is fine, but often you need time windows – how many clicks per minute? Per hour? Faust supports tumbling windows (non-overlapping) and hopping windows (overlapping).
from datetime import timedelta
# Tumbling window (non-overlapping)
windowed_views = app.Table(
'windowed_views',
default=int,
window=app.tumbling(timedelta(minutes=1)),
)
@app.agent(click_topic)
async def window_count(clicks):
async for click in clicks:
windowed_views[click.page] += 1
I used a hopping window for rolling averages on sensor data. Each window covers five minutes but slides every minute – you get smooth, overlapping snapshots.
rolling_avg = app.Table(
'rolling_avg',
default=float,
window=app.hopping(timedelta(minutes=5), timedelta(minutes=1)),
)
@app.agent(metric_topic)
async def compute_rolling(metrics):
async for m in metrics:
current = rolling_avg[m['sensor']]
print(f"Sensor {m['sensor']} window value: {current}")
Internally, Faust cleans up old windows automatically. You set the retention, and it handles the rest.
4. Stream-Stream Joins
Sometimes you need to match two event streams. For example, when an order arrives, you want to check if a confirmation comes within an hour. Faust lets you join two topics using a table or a time-based join.
orders_topic = app.topic('orders', value_type=Order)
confirmations_topic = app.topic('confirmations', value_type=Confirmation)
# Using a table to store confirmations
confirmations_table = app.Table('confirmations', default=dict, window=app.tumbling(timedelta(hours=1)))
@app.agent(confirmations_topic)
async def store_confirmations(confs):
async for conf in confs:
confirmations_table[conf.order_id] = conf
@app.agent(orders_topic)
async def match_orders(orders):
async for order in orders:
conf = confirmations_table.get(order.order_id)
if conf:
print(f"Order {order.order_id} confirmed at {conf.timestamp}")
The window on the confirmation table means old confirmations expire. You can also use the built-in join operator that matches within a time range:
@app.agent(orders_topic)
async def join_within_window(orders):
await orders.join(
confirmations_topic,
within=timedelta(hours=1),
on_match=lambda order, conf: print(f"Matched {order.id}"),
on_mismatch=lambda order: print(f"Unmatched {order.id}"),
)
I use this pattern to detect abandoned carts – orders without a matching payment within ten minutes.
5. Error Handling and Dead Letter Queues
Bad data happens. A missing field, a malformed JSON, a temporary database hiccup. If your agent crashes, the entire pipeline can stall. Faust gives you two strategies: continue or stop.
@app.agent(click_topic, on_error='continue')
async def resilient_processing(clicks):
async for click in clicks:
try:
if click.page is None:
raise ValueError("Missing page")
print(f"OK: {click.user_id}")
except Exception as e:
# Send to dead letter topic
await dead_letter_topic.send(
value={'error': str(e), 'data': click}
)
I set on_error to 'continue' for most agents. The worker skips the bad message and logs it. To avoid losing data, I route those messages to a separate Kafka topic (dead letter queue). Later, I can reprocess them after fixing the issue.
For agents where data integrity matters more than throughput, you can set on_error='stop'. The worker pauses until you fix the problem manually. Combine it with retries:
@app.agent(click_topic, on_error='stop')
async def strict_processing(clicks):
async for click in clicks:
await process_with_retries(click)
async def process_with_retries(click, max_retries=3):
for attempt in range(max_retries):
try:
return await risky_operation(click)
except:
await asyncio.sleep(0.5 * attempt)
raise
The dead letter queue and retry patterns saved me countless hours of debugging.
6. Watermarks and Event Time Processing
Messages often arrive late. A user’s click might be timestamped five minutes ago but reaches the broker only now. If you process based on processing time, you count that click in the current window, not the window it belongs to. Faust supports event time: you tell it which field holds the original timestamp, and it uses watermarks to handle lateness.
@app.agent(click_topic, processing_time=0.1)
async def event_time_aggregation(clicks):
async for click in clicks:
watermarked = app.Table(
'watermarked',
default=int,
window=app.tumbling(timedelta(minutes=5)),
expire=timedelta(minutes=10), # allow 10 min lateness
)
watermarked[click.page] += 1
You set the expire parameter to how much late data you want to accept. The watermark advances as you see timestamps, and windows close only after the watermark passes their end. I use this for IoT sensors that sometimes buffer data offline. Without event time, those delayed readings would distort the hourly average.
7. Partition Rebalancing and Scaling
As your traffic grows, you add more worker instances. Kafka partitions get redistributed among workers. Faust handles this automatically, but you might need custom logic for hot keys. You can write your own assignor.
class CustomAssignor(faust.assignors.Assignor):
def assign(self, cluster):
workers = list(cluster.workers)
partitions = list(cluster.partitions)
assignments = {}
for i, p in enumerate(partitions):
worker = workers[i % len(workers)]
assignments.setdefault(worker, []).append(p)
return assignments
app.conf.assignor = CustomAssignor()
I once had a single user generating more events than the rest of the users combined. The default round‑robin assigned that user’s partition to one worker, which overloaded it. With a custom assignor, I spread partitions based on estimated load.
Scaling is simple: start more workers. Each worker registers with the same consumer group. Faust rebalances seamlessly.
faust -A main worker -l info --web-port=6066
8. Testing Streaming Pipelines
Testing a real‑time pipeline without a Kafka cluster used to be a pain. Faust provides a test harness that simulates everything.
import pytest
from faust.testing import FaustAppTester
@pytest.fixture
def app_test():
tester = FaustAppTester(app)
yield tester
tester.stop()
@pytest.mark.asyncio
async def test_click_counting(app_test):
await click_topic.send(value=ClickEvent('user1', '/home', 100.0))
await click_topic.send(value=ClickEvent('user1', '/about', 200.0))
await app_test.worker_loop()
assert page_views['user1'] == 2
@pytest.mark.asyncio
async def test_windowed_aggregation(app_test):
now = datetime.now().timestamp()
for i in range(10):
await click_topic.send(value=ClickEvent('u2', '/blog', now + i))
await app_test.worker_loop()
window = windowed_views['/blog']
assert window > 0
I write these tests before changing any agent logic. They catch regressions fast. The tester injects events directly into the agent and lets you inspect tables after processing.
Deploying to Production
Once everything works locally, you need to run it reliably. Configure replication, monitoring, and graceful shutdown.
app.conf.update(
broker='kafka://prod-kafka:9092',
store='rocksdb://',
topic_replication_factor=3,
topic_min_insync_replicas=2,
consumer_auto_offset_reset='earliest',
table_cleanup_interval=60.0,
web_port=6066,
web_host='0.0.0.0',
)
app.metrics.collector = faust.metrics.collectors.PrometheusCollector()
@app.web_route('/health')
async def health(websocket):
return {'status': 'healthy', 'partitions': app.conf.num_partitions}
@app.on_shutdown
async def cleanup():
print("Flushing pending operations...")
await app.producer.flush()
I use systemd to start the worker with ExecStart=/usr/local/bin/faust -A main worker -l info. The web interface on port 6066 shows internal metrics. Prometheus scrapes it. Grafana dashboards give me real-time visibility.
Wrapping Up
These eight techniques turned me from a batch processor into someone who handles millions of events per second. Start with a simple agent, add a table for state, introduce windows for time analytics, join streams to enrich data, handle errors gracefully, respect event time, plan for scaling, and test early. Python and Faust make it all feel natural. You can apply these patterns to clickstream analytics, fraud detection, IoT pipelines, or any live data system. Just pick a topic and start streaming.
📘 Checkout my latest ebook for free on my channel!
Be sure to like, share, comment, and subscribe to the channel!
101 Books
101 Books is an AI-driven publishing company co-founded by author Aarav Joshi. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as $4—making quality knowledge accessible to everyone.
Check out our book Golang Clean Code available on Amazon.
Stay tuned for updates and exciting news. When shopping for books, search for Aarav Joshi to find more of our titles. Use the provided link to enjoy special discounts!
Our Creations
Be sure to check out our creations:
Investor Central | Investor Central Spanish | Investor Central German | Smart Living | Epochs & Echoes | Puzzling Mysteries | Hindutva | Elite Dev | Java Elite Dev | Golang Elite Dev | Python Elite Dev | JS Elite Dev | JS Schools
We are on Medium
Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Puzzling Mysteries Medium | Science & Epochs Medium | Modern Hindutva
Top comments (0)