DEV Community

Ayat Saadat

Ayat Saadati: Pioneering EventStream Processing

It's a genuine pleasure to dive into the work of someone who's truly moving the needle in the developer community. Ayat Saadati, whose insights you can often find on their excellent dev.to profile, is one of those individuals. While Ayat contributes across a spectrum of technical topics, they're particularly renowned for their work in crafting elegant, efficient solutions for real-time data processing.

One of their most impactful contributions, and the focus of this documentation, is Ayat's EventStream (AES). This isn't just another library; it's a meticulously designed Python framework that simplifies the often-daunting task of building reactive, event-driven data pipelines. I've personally found it to be an absolute game-changer for projects needing robust, scalable, and understandable stream processing.

What is Ayat's EventStream (AES)?

Ayat's EventStream is a lightweight, opinionated Python library designed to help developers ingest, transform, and route continuous streams of data with remarkable ease. Forget the boilerplate and the steep learning curves often associated with distributed stream processing systems. AES distills the essence of event-driven architecture into a Pythonic, user-friendly package.

My initial reaction when I first played with AES was, "Finally, someone built this the right way!" It strikes a beautiful balance between raw power and developer ergonomics. If you've ever wrestled with Kafka consumers, custom message queues, or just trying to keep your data synchronized across microservices, AES offers a refreshing perspective and a tangible solution.

Key Features

  • Pythonic API: Integrates seamlessly into existing Python projects. If you know Python, you're halfway there.
  • Source Agnostic: Built-in connectors for various data sources (Kafka, RabbitMQ, HTTP streams, local files).
  • Powerful Transformations: A flexible StreamProcessor API that allows for complex data manipulations, filtering, and aggregations.
  • Multiple Sinks: Output processed data to databases, other message queues, APIs, or custom handlers.
  • Pluggable Architecture: Easily extendable with custom sources, processors, and sinks.
  • Observability Hooks: Designed with monitoring in mind, providing hooks for metrics and logging.
  • Focus on Developer Experience: Clear error messages, sensible defaults, and comprehensive documentation (which we're expanding on right here!).

Installation

Getting started with Ayat's EventStream is, thankfully, a breeze. It’s a standard Python package, so pip is your best friend here.

First, I always recommend working within a virtual environment. It keeps your project dependencies clean and avoids conflicts.

# Create a virtual environment (if you haven't already)
python3 -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate

# Now, install Ayat's EventStream
pip install ayats-eventstream

Optional Dependencies

Depending on your data sources and sinks, you might need additional dependencies. AES is smart about this; it doesn't force you to install everything. Here are some common ones:

  • Kafka Connector:

    pip install ayats-eventstream[kafka]
    
  • RabbitMQ Connector:

    pip install ayats-eventstream[rabbitmq]
    
  • HTTP Stream Source (for webhooks, SSE):

    pip install ayats-eventstream[http]
    
  • All (for local development, if you want everything):

    pip install ayats-eventstream[all]
    

Honestly, I usually just install [all] during initial development to avoid juggling dependencies, then prune it down for production. It saves a bit of head-scratching.

Quick Start: Your First EventStream

Let's get a basic stream up and running. This example will simulate a simple event source, process each event, and then print it to the console.

import time
import json
from ayats_eventstream import (
    EventSource,
    EventStream,
    StreamProcessor,
    EventSink,
    Event
)

# 1. Define a custom EventSource (simulated for simplicity)
class MySimulatedSource(EventSource):
    def __init__(self, num_events: int = 5):
        self.count = 0
        self.num_events = num_events

    def get_events(self):
        while self.count < self.num_events:
            event_data = {"id": self.count, "timestamp": time.time(), "value": f"data_{self.count}"}
            # AES expects Event objects
            yield Event(data=json.dumps(event_data), metadata={"source": "simulated"})
            self.count += 1
            time.sleep(0.5) # Simulate some delay
        print("Simulated source exhausted.")

# 2. Define a custom StreamProcessor
class MySimpleProcessor(StreamProcessor):
    def process(self, event: Event) -> Event | None:
        try:
            payload = json.loads(event.data)
            payload["processed_at"] = time.time()
            payload["status"] = "processed"
            # Return a new Event with transformed data
            return Event(data=json.dumps(payload), metadata=event.metadata)
        except json.JSONDecodeError:
            print(f"Skipping malformed event: {event.data}")
            return None # Indicate to drop this event

# 3. Define a custom EventSink
class MyConsoleSink(EventSink):
    def consume(self, event: Event):
        print(f"[{event.metadata.get('source', 'unknown')}] Consumed event: {event.data}")

# 4. Assemble the EventStream
if __name__ == "__main__":
    print("Starting Ayat's EventStream example...")
    source = MySimulatedSource()
    processor = MySimpleProcessor()
    sink = MyConsoleSink()

    # The magic happens here: chaining the components
    stream = EventStream(source=source, processor=processor, sink=sink)

    try:
        stream.run()
    except KeyboardInterrupt:
        print("\nEventStream stopped by user.")
    print("Example finished.")

Run this file, and you'll see your simulated events flow through the processor and get printed to the console. It's a foundational example, but it demonstrates the core concepts beautifully.

Usage: Building More Complex Pipelines

The real power of AES comes when you start swapping out the simple components for more sophisticated ones or chaining multiple processors.

Connecting to External Sources (e.g., Kafka)

Let's imagine you have a Kafka topic named my_raw_events.

from ayats_eventstream.sources import KafkaSource
from ayats_eventstream import EventStream
# ... (your processor and sink definitions from above)

if __name__ == '__main__':
    print("Starting Kafka-backed EventStream...")
    kafka_source = KafkaSource(
        bootstrap_servers='localhost:9092', # Or your Kafka cluster addresses
        topic='my_raw_events',
        group_id='my_aes_consumer_group',
        auto_offset_reset='earliest'
    )
    processor = MySimpleProcessor()
    sink = MyConsoleSink()

    stream = EventStream(source=kafka_source, processor=processor, sink=sink)

    try:
        stream.run()
    except KeyboardInterrupt:
        print("\nKafka EventStream stopped by user.")
    print("Kafka example finished.")

Developer's Note: When working with Kafka, ensure your bootstrap_servers are correct and your consumer group group_id is unique if you're deploying multiple instances. It's a common oversight!

Chaining Multiple Processors

Sometimes, a single process method isn't enough, and you want to apply transformations in stages. AES supports this in two ways: you can pass a list of processors directly to the EventStream, or you can define a StreamProcessor that internally orchestrates multiple steps (a custom CompositeProcessor, if you like).

# ... (imports from above)
import re

class EventValidator(StreamProcessor):
    def process(self, event: Event) -> Event | None:
        payload = json.loads(event.data)
        if "value" not in payload or not isinstance(payload["value"], str):
            print(f"Validation failed: 'value' missing or not string in {event.data}")
            return None # Drop invalid events
        return event # Pass valid events through

class DataCleaner(StreamProcessor):
    def process(self, event: Event) -> Event:
        payload = json.loads(event.data)
        # Remove non-alphanumeric characters from 'value'
        payload["value"] = re.sub(r'[^a-zA-Z0-9\s]', '', payload["value"])
        return Event(data=json.dumps(payload), metadata=event.metadata)

class DataEnricher(StreamProcessor):
    def process(self, event: Event) -> Event:
        payload = json.loads(event.data)
        payload["enriched_data"] = f"Processed by {self.__class__.__name__}"
        return Event(data=json.dumps(payload), metadata=event.metadata)

if __name__ == '__main__':
    print("Starting Multi-Processor EventStream example...")
    source = MySimulatedSource(num_events=3) # Use simulated source for easy testing

    # Chain processors directly in the EventStream constructor (implicitly handled)
    # The order matters! Events flow from left to right.
    stream = EventStream(
        source=source,
        processor=[
            EventValidator(),
            DataCleaner(),
            DataEnricher(),
            MySimpleProcessor() # Our initial processor
        ],
        sink=MyConsoleSink()
    )

    try:
        stream.run()
    except KeyboardInterrupt:
        print("\nMulti-Processor EventStream stopped by user.")
    print("Multi-Processor example finished.")

When you pass a list of processors to the EventStream, AES automatically chains them. An event will pass through EventValidator, then DataCleaner, then DataEnricher, and finally MySimpleProcessor. If any processor returns None, the event is dropped from the pipeline at that stage. This modularity is pure gold for maintainability.
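That drop-on-None chaining is also handy outside of EventStream, for example when unit-testing a processing stage in isolation. Here's a hedged sketch of a standalone CompositeProcessor. To keep it self-contained, the Event class and demo processors below are simplified stand-ins of my own, not AES's actual implementation:

```python
from dataclasses import dataclass, field

@dataclass
class Event:
    # Simplified stand-in for AES's Event: a payload plus context metadata.
    data: str
    metadata: dict = field(default_factory=dict)

class CompositeProcessor:
    """Applies processors in order; a None result drops the event immediately."""
    def __init__(self, processors):
        self.processors = processors

    def process(self, event):
        for processor in self.processors:
            event = processor.process(event)
            if event is None:
                return None  # Dropped at this stage; later processors never run.
        return event

# Two tiny demo stages to exercise the chain:
class DropEmpty:
    def process(self, event):
        return event if event.data else None

class Upper:
    def process(self, event):
        return Event(data=event.data.upper(), metadata=event.metadata)

pipeline = CompositeProcessor([DropEmpty(), Upper()])
print(pipeline.process(Event(data="hello")).data)  # HELLO
print(pipeline.process(Event(data="")))            # None (dropped by DropEmpty)
```

Because the composite itself exposes a process method, it can be nested inside other composites or handed to anything expecting a single processor.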

Custom Event Handling with EventStream.on_error

Robust stream processing isn't just about happy paths. What happens when things go wrong? AES provides an on_error hook.

# ... (imports, source, processor, sink definitions)

class MyErrorLoggerSink(EventSink):
    def consume(self, event: Event):
        print(f"[ERROR] Failed event: {event.data} - Reason: {event.metadata.get('error', 'Unknown')}")

def handle_stream_error(original_event: Event, exception: Exception):
    print(f"!!! CRITICAL ERROR IN PIPELINE !!!")
    print(f"Original event: {original_event.data}")
    print(f"Exception: {exception}")
    # You might want to send this to a dead-letter queue, log to Sentry, etc.
    error_sink = MyErrorLoggerSink()
    error_event = Event(
        data=original_event.data,
        metadata={"error": str(exception), "original_source": original_event.metadata.get('source', 'unknown')}
    )
    error_sink.consume(error_event)

if __name__ == '__main__':
    print("Starting Error Handling EventStream...")

    # Let's create a processor that intentionally fails sometimes
    class FailingProcessor(StreamProcessor):
        def process(self, event: Event) -> Event | None:
            payload = json.loads(event.data)
            if payload.get("id") == 2:
                raise ValueError("Simulated processing error for event ID 2!")
            payload["status"] = "processed_ok"
            return Event(data=json.dumps(payload), metadata=event.metadata)

    source = MySimulatedSource(num_events=5)
    processor = FailingProcessor()
    sink = MyConsoleSink()

    stream = EventStream(source=source, processor=processor, sink=sink)
    stream.on_error = handle_stream_error # Assign our custom error handler

    try:
        stream.run()
    except KeyboardInterrupt:
        print("\nError Handling EventStream stopped by user.")
    print("Error Handling example finished.")

When event ID 2 passes through FailingProcessor, handle_stream_error will be invoked, demonstrating how you can gracefully manage exceptions within your stream. This is critical for production readiness.
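The nice thing about the on_error hook is that it keeps failure handling out of your processors entirely. To make the control flow concrete, here is a hedged sketch of how such a hook might be wired into a run loop. This is my own illustration of the pattern, not AES's actual source; the MiniStream, source, processor, and sink names are stand-ins:

```python
class MiniStream:
    """Illustrative run loop: process each event, routing failures to on_error."""
    def __init__(self, source, processor, sink, on_error=None):
        self.source = source
        self.processor = processor
        self.sink = sink
        self.on_error = on_error

    def run(self):
        for event in self.source.get_events():
            try:
                processed = self.processor.process(event)
            except Exception as exc:
                if self.on_error is not None:
                    self.on_error(event, exc)  # Delegate; the stream keeps going.
                    continue
                raise  # No handler registered: fail loudly instead of silently.
            if processed is not None:
                self.sink.consume(processed)

# Minimal stand-ins to exercise the loop:
class ListSource:
    def __init__(self, items):
        self.items = items
    def get_events(self):
        yield from self.items

class FlakyProcessor:
    def process(self, event):
        if event == "bad":
            raise ValueError("boom")
        return event.upper()

class CollectSink:
    def __init__(self):
        self.seen = []
    def consume(self, event):
        self.seen.append(event)

errors = []
sink = CollectSink()
stream = MiniStream(ListSource(["ok", "bad", "fine"]), FlakyProcessor(), sink,
                    on_error=lambda ev, exc: errors.append((ev, str(exc))))
stream.run()
print(sink.seen)   # ['OK', 'FINE']
print(errors)      # [('bad', 'boom')]
```

The key design point: the failing event is skipped, not the whole stream. One poison message shouldn't take down your pipeline.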

Core Concepts

  • Event: The atomic unit of data in AES. It's a simple dataclass holding data (usually a string or bytes, often JSON) and metadata (a dictionary for context like source, timestamp, tracing IDs).
  • EventSource: An abstract base class that defines how events are ingested. Your custom sources must implement the get_events() method, which should be a generator yielding Event objects.
  • StreamProcessor: An abstract base class for transforming events. It implements a process(event: Event) method that takes an Event and returns either a transformed Event or None to drop the event.
  • EventSink: An abstract base class for sending processed events to their final destination. It implements a consume(event: Event) method.
  • EventStream: The orchestrator. It ties a source, processor(s), and sink together, providing the run() method to start the pipeline.
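To make these contracts concrete, here's a minimal sketch of how the abstractions fit together. Only the method names and shapes described above come from the library's documented contract; the class bodies (and the demo NumberSource / EvenOnly / Collector classes) are my own illustrative stand-ins, not AES's actual definitions:

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Iterator, Optional

@dataclass
class Event:
    data: str                                     # Payload, often a JSON string.
    metadata: dict = field(default_factory=dict)  # Context: source, tracing IDs, etc.

class EventSource(ABC):
    @abstractmethod
    def get_events(self) -> Iterator[Event]:
        """Generator yielding Event objects from the underlying source."""

class StreamProcessor(ABC):
    @abstractmethod
    def process(self, event: Event) -> Optional[Event]:
        """Return a transformed Event, or None to drop it."""

class EventSink(ABC):
    @abstractmethod
    def consume(self, event: Event) -> None:
        """Deliver a processed Event to its final destination."""

class EventStream:
    """Orchestrator: pulls from the source, runs the processor, feeds the sink."""
    def __init__(self, source: EventSource, processor: StreamProcessor, sink: EventSink):
        self.source, self.processor, self.sink = source, processor, sink

    def run(self) -> None:
        for event in self.source.get_events():
            processed = self.processor.process(event)
            if processed is not None:
                self.sink.consume(processed)

# Tiny demonstration wiring the four pieces together:
class NumberSource(EventSource):
    def get_events(self):
        for i in range(3):
            yield Event(data=str(i), metadata={"source": "demo"})

class EvenOnly(StreamProcessor):
    def process(self, event):
        return event if int(event.data) % 2 == 0 else None

class Collector(EventSink):
    def __init__(self):
        self.items = []
    def consume(self, event):
        self.items.append(event.data)

collector = Collector()
EventStream(NumberSource(), EvenOnly(), collector).run()
print(collector.items)  # ['0', '2']
```

Reading it this way, the division of labor is clear: sources produce, processors transform or veto, sinks deliver, and the stream is just the glue.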

API Reference (Simplified)

Here's a quick look at some essential classes and methods. For a comprehensive reference, I always refer to Ayat's official documentation (or the source code, if I'm feeling adventurous!).

| Class / Method | Description |
| --- | --- |
| `Event(data, metadata)` | The atomic unit of data flowing through the pipeline. |
| `EventSource.get_events()` | Generator yielding `Event` objects from the underlying source. |
| `StreamProcessor.process(event)` | Returns a transformed `Event`, or `None` to drop it. |
| `EventSink.consume(event)` | Delivers a processed `Event` to its final destination. |
| `EventStream(source, processor, sink)` | Ties the components together; `processor` may be a single processor or a list. |
| `EventStream.run()` | Starts the pipeline and runs until the source is exhausted or interrupted. |
| `EventStream.on_error` | Optional `(event, exception)` callback invoked when processing raises. |
