Tvrtko Sternak for airt

FastStream: Python's framework for Efficient Message Queue Handling

Ever felt lost in the complexity of microservices and message queues like Kafka and RabbitMQ? That's precisely why we created FastStream. Initially, it was our solution to the challenges we faced with message queues in our own projects. But as it simplified our lives, we realized it could do the same for others, so we decided to share it with the world.

FastStream streamlines the entire process of working with message queues in microservices. Parsing messages, managing networking, and keeping documentation up to date are all handled for you.

In this blog post, we'll explore how FastStream simplifies microservices development. Let's dive in and discover how FastStream can revolutionize your workflow.

Hint: If you want to dive into the code right away, check out the hands-on tutorial in the FastStream documentation.

Our motivation

Our journey with FastStream started when we needed to integrate our machine learning models into a customer's Apache Kafka environment. To streamline this process, we created FastKafka using AIOKafka, AsyncAPI, and asyncio. It was our first step in making message queue management easier.

Later, we discovered Propan, a library created by Nikita Pastukhov, which solved similar problems but for RabbitMQ. Recognizing the potential for collaboration, we joined forces with Nikita to build a unified library that could work seamlessly with both Kafka and RabbitMQ. And that's how FastStream came to be: a solution born out of the need for simplicity and efficiency in microservices development.

Key features that set FastStream apart 🚀

FastStream is more than just another library; it's a powerful toolkit designed to simplify and supercharge your microservices development. Let's dive into the key features that make FastStream stand out:

Multiple Broker Support: FastStream provides a unified API that works seamlessly across multiple message brokers. Whether you're dealing with Kafka, RabbitMQ, or others, FastStream has you covered, making it effortless to switch between them.

from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")

@broker.publisher("prediction")
@broker.subscriber("input_data")
async def on_input_data(msg: InputData) -> Prediction:
  # your processing logic
  return prediction

# Just change the broker class,
# the rest of the code stays the same
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("localhost:5672")

@broker.publisher("prediction")
@broker.subscriber("input_data")
async def on_input_data(msg: InputData) -> Prediction:
  # your processing logic
  return prediction

Pydantic Validation: Leverage the robust validation capabilities of Pydantic to serialize and validate incoming messages. With Pydantic, you can ensure that your data is always in the right format.

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("input_data")
async def on_input_data(msg: InputData): # <- decodes consumed message using InputData(**json.loads(data))
  ...  # your processing logic

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.publisher("prediction")
@broker.subscriber("input_data")
async def on_input_data(msg: InputData) -> Prediction: # <- encodes produced message using Prediction.json()
  # some processing
  return prediction
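To make the decode-and-validate step concrete, here is a minimal sketch that uses Pydantic on its own, without a broker. The InputData schema with a single float field is an assumption made for illustration, and decode is a hypothetical helper that mirrors the InputData(**json.loads(data)) comment in the snippet above; it is not FastStream's internal code.

```python
import json

from pydantic import BaseModel, ValidationError


class InputData(BaseModel):
    # Hypothetical schema: one numeric field.
    data: float


def decode(raw: bytes) -> InputData:
    # Hypothetical helper: parse the JSON payload, then let Pydantic validate it.
    return InputData(**json.loads(raw))


# A well-formed payload becomes a typed object.
ok = decode(b'{"data": 0.2}')

# A malformed payload is rejected before any handler logic would run.
try:
    decode(b'{"data": "not a number"}')
    rejected = False
except ValidationError:
    rejected = True
```

With this shape, a handler annotated with InputData only ever sees payloads that already match the schema.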

Automatic Documentation: FastStream keeps you ahead of the game with automatic AsyncAPI documentation generation. Say goodbye to outdated documentation – FastStream ensures it's always up-to-date.

Basic FastStream documentation example

Intuitive Development: FastStream offers full-typed editor support, catching errors before they reach runtime. This means you can code with confidence, knowing that issues are caught early in the development process.

Powerful Dependency Injection System: Manage your service dependencies efficiently with FastStream's built-in Dependency Injection (DI) system. Say goodbye to spaghetti code and embrace clean, modular architecture.

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("input_data")
# Load a global logger instance from faststream.Context()
async def on_input_data(msg: InputData, logger=Context()):
  logger.info(msg)
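If the logger=Context() default looks like magic, the toy sketch below shows one way such default-parameter injection can work. Everything in it is hypothetical: the Context marker class, the registry dictionary, and the inject decorator are stand-ins written with the standard library only, and the lookup by parameter name is an assumption of this sketch, not a description of FastStream's implementation.

```python
import inspect
import logging
from typing import Any, Callable, Dict


class Context:
    """Hypothetical marker meaning 'fill this parameter from the registry'."""


# Hypothetical global registry, keyed by parameter name.
registry: Dict[str, Any] = {"logger": logging.getLogger("toy")}


def inject(func: Callable[..., Any]) -> Callable[..., Any]:
    """Supply every parameter whose default value is a Context marker."""
    params = inspect.signature(func).parameters

    def wrapper(*args: Any, **kwargs: Any) -> Any:
        for name, param in params.items():
            if isinstance(param.default, Context) and name not in kwargs:
                kwargs[name] = registry[name]
        return func(*args, **kwargs)

    return wrapper


@inject
def on_input_data(msg: str, logger=Context()):
    logger.info(msg)
    return logger.name


# The caller passes only the message; the logger comes from the registry.
injected_name = on_input_data("hello")
```

The handler declares what it needs in its signature, and the wiring stays outside the business logic.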

Testability: FastStream supports in-memory tests, making your Continuous Integration and Continuous Deployment (CI/CD) pipeline faster and more reliable. Test your microservices with ease, ensuring they perform as expected.

async def test_base_app():
  # Subscribe to prediction topic so that we can assert incoming msgs
  @broker.subscriber("prediction")
  async def on_prediction(msg: Prediction):
    pass

  async with TestKafkaBroker(broker):
    # Publish a test message to the input_data topic
    await broker.publish(InputData(data=0.2), "input_data")

    # Check that the handle function for "input_data" topic was called with the correct msg
    on_input_data.mock.assert_called_once_with(dict(InputData(data=0.2)))

    # Check that the service responded with the correct prediction in the "prediction" topic
    on_prediction.mock.assert_called_once_with(dict(Prediction(score=1.2)))

Seamless Integrations: FastStream plays well with others. It's fully compatible with any HTTP framework you prefer, with a special emphasis on compatibility with FastAPI.

# Create a FastStream router
router = KafkaRouter("localhost:9092")

...

# Connect a FastStream router to a FastAPI application lifespan
app = FastAPI(lifespan=router.lifespan_context)

Built for Automatic Code Generation: FastStream is optimized for automatic code generation using advanced models like GPT. This means you can leverage the power of code generation to boost your productivity. Check out the tool we built for microservice code generation: faststream-gen.

FsLovesGPT


FastStream, in a nutshell, offers ease, efficiency, and power in your microservices development journey. Whether you're just starting or looking to scale your microservices, FastStream is your trusted companion. With these core features at your disposal, you'll be well-equipped to tackle the challenges of modern, data-centric microservices.

Let's build something!

Now, let's get our hands a bit dirty 👷.
Let's implement an example Python app using FastStream that consumes names from the "names" topic and outputs greetings to the "greetings" topic.

Cookiecutter project

To start our project, we will use the prepared cookiecutter FastStream project. To find out more about it, check our detailed guide.

Install the cookiecutter package using the following command:

pip install cookiecutter

Now, run the provided cookiecutter command and fill out the relevant details to generate a new FastStream project. We will name this project "greetings_app":

cookiecutter https://github.com/airtai/cookiecutter-faststream.git

The creation process should look like this:

You've downloaded /Users/tvrtko/.cookiecutters/cookiecutter-faststream before. Is it okay to delete and re-download it? [y/n] (y): y
  [1/4] username (github-username): sternakt
  [2/4] project_name (My FastStream App): Greetings App
  [3/4] project_slug (greetings_app): greetings_app
  [4/4] Select streaming_service
    1 - kafka
    2 - nats
    3 - rabbit
    Choose from [1/2/3] (1): 1

Change the working directory to the newly created directory and install all development requirements using pip:

cd greetings_app
pip install -e ".[dev]"

Now we are ready to edit our greetings_app/application.py and tests/test_application.py files to implement our application logic.

Writing app code

FastStream brokers provide convenient function decorators @broker.subscriber and @broker.publisher to allow you to delegate the actual process of:

  • consuming and producing data to event queues, and

  • decoding and encoding JSON-encoded messages.

These decorators make it easy to specify the processing logic for your consumers and producers, allowing you to focus on the core business logic of your application without worrying about the underlying integration.
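To give a feel for what these decorators delegate, here is a toy in-memory sketch written with the standard library only. ToyBroker and all of its methods are hypothetical stand-ins, not FastStream's implementation: subscriber registers a handler for a topic, publisher forwards the handler's return value to another topic, and publish performs the JSON encoding and decoding that the handler never has to see.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], Awaitable[Any]]


class ToyBroker:
    """Hypothetical in-memory stand-in for a message broker client."""

    def __init__(self) -> None:
        self.handlers: Dict[str, List[Handler]] = {}
        self.sent: Dict[str, List[bytes]] = {}

    def subscriber(self, topic: str) -> Callable[[Handler], Handler]:
        def register(func: Handler) -> Handler:
            self.handlers.setdefault(topic, []).append(func)
            return func

        return register

    def publisher(self, topic: str) -> Callable[[Handler], Handler]:
        def wrap(func: Handler) -> Handler:
            async def wrapper(msg: Dict[str, Any]) -> Any:
                result = await func(msg)
                await self.publish(result, topic)
                return result

            return wrapper

        return wrap

    async def publish(self, payload: Any, topic: str) -> None:
        raw = json.dumps(payload).encode()  # encode on the way out
        self.sent.setdefault(topic, []).append(raw)
        for handler in self.handlers.get(topic, []):
            await handler(json.loads(raw))  # decode on the way in


broker = ToyBroker()


# In this toy, subscriber must be the outer decorator so that the
# registered handler is the one that also publishes its result.
@broker.subscriber("names")
@broker.publisher("greetings")
async def on_names(msg: Dict[str, Any]) -> Dict[str, Any]:
    return {"greeting": f"hello {msg['name']}"}


asyncio.run(broker.publish({"name": "John"}, "names"))
```

The handler body contains only business logic; topic names, encoding, and delivery all live in the decorators and the broker object.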

Also, FastStream uses Pydantic to parse input JSON-encoded data into Python objects, making it easy to work with structured data in your applications. You can parse your input messages using nothing more than type annotations.

Here is the example Python app described above:

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker
from pydantic import BaseModel, Field

version = "0.1.0"
title = "My FastStream service"
description = "Description of my FastStream service"


class Name(BaseModel):
    name: str = Field(..., description="Name of the person")


class Greeting(BaseModel):
    greeting: str = Field(..., description="Greeting message")


broker = KafkaBroker()
app = FastStream(broker, title=title, version=version, description=description)

to_greetings = broker.publisher(
    "greetings",
    description="Produces a message on greetings after receiving a message on names",
)


@broker.subscriber("names", description="Consumes messages from names topic and produces messages to greetings topic")
async def on_names(msg: Name, logger: Logger) -> None:
    result = f"hello {msg.name}"
    logger.info(result)
    greeting = Greeting(greeting=result)
    await to_greetings.publish(greeting)

The example application will subscribe to the names Kafka topic and consume Name JSON messages from it. When the application consumes a message, it will publish a Greeting JSON message to the greetings topic.

We can save the application in the application.py file. Let's take a closer look at the code.

Creating a broker
To create an application, we first need to create a broker. This is the main piece of FastStream and takes care of defining subscribers and publishers.

version = "0.1.0"
title = "My FastStream service"
description = "Description of my FastStream service"

...

broker = KafkaBroker()
app = FastStream(broker, title=title, version=version, description=description)

Defining data structures
Next, we need to define the structure of incoming and outgoing data. FastStream is integrated with Pydantic and offers automatic encoding and decoding of JSON formatted messages into Pydantic classes.

class Name(BaseModel):
    name: str = Field(..., description="Name of the person")


class Greeting(BaseModel):
    greeting: str = Field(..., description="Greeting message")

Defining a publisher
Now, we define the publishing logic of our application.

to_greetings = broker.publisher(
    "greetings",
    description="Produces a message on greetings after receiving a message on names",
)

Defining a subscriber
Finally, we can define the subscribing logic of our application. The app will consume data from the "names" topic and use the defined publisher to produce to the "greetings" topic whenever a message is consumed.

@broker.subscriber("names", description="Consumes messages from names topic and produces messages to greetings topic")
async def on_names(msg: Name, logger: Logger) -> None:
    result = f"hello {msg.name}"
    logger.info(result)
    greeting = Greeting(greeting=result)
    await to_greetings.publish(greeting)

Testing the service

The service can be tested using the TestBroker context managers, which, by default, put the broker into "testing mode".

The Tester will redirect your subscriber and publisher decorated functions to the InMemory brokers, allowing you to quickly test your app without the need for a running broker and all its dependencies.

Using pytest, the test for our service would look like this:

import pytest
from faststream.kafka import TestKafkaBroker

from greetings_app.application import Greeting, Name, broker, on_names


# Subscribe to the "greetings" topic so we can monitor 
# messages our application is producing
@broker.subscriber("greetings")
async def on_greetings(msg: Greeting) -> None:
    pass


@pytest.mark.asyncio
async def test_on_names():
    async with TestKafkaBroker(broker):
        # Send John to "names" topic
        await broker.publish(Name(name="John"), "names")

        # Assert that our application has consumed "John"
        on_names.mock.assert_called_with(dict(Name(name="John")))

        # Assert that our application has greeted John in the "greetings" topic
        on_greetings.mock.assert_called_with(dict(Greeting(greeting="hello John")))

In the test, we send a test Name JSON message to the names topic, and then we assert that the application has responded on the greetings topic with the appropriate message.
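The assertions in the test compare against dict(Name(name="John")) rather than against the model instance. The short sketch below shows what that expression evaluates to, using Pydantic alone; the Name model is repeated here only so that the snippet runs on its own.

```python
from pydantic import BaseModel


class Name(BaseModel):
    name: str


# Iterating over a Pydantic model yields (field, value) pairs,
# so dict() turns it into a plain dictionary of field values.
as_dict = dict(Name(name="John"))
print(as_dict)  # {'name': 'John'}
```

In other words, the mock is checked against the plain dictionary form of the message.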

We can save the test in the tests/test_application.py file and run it by executing the following command in the application's root directory.

pytest

Here is what the test execution should look like in your terminal:

===================================== test session starts =====================================
platform darwin -- Python 3.11.5, pytest-7.4.2, pluggy-1.3.0
rootdir: /Users/tvrtko/Documents/Airt Projects/FastStream/faststream-cookiecutter/greetings_app
configfile: pyproject.toml
plugins: asyncio-0.21.1, anyio-3.7.1
asyncio: mode=Mode.STRICT
collected 1 item                                                                              

tests/test_application.py .                                                             [100%]

====================================== 1 passed in 0.34s ======================================

Running the application

The application can be started using the built-in FastStream CLI command.

To run the service, use the FastStream CLI command and pass the module (in this case, the file where the app implementation is located) and the app symbol to the command.

faststream run greetings_app.application:app
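The greetings_app.application:app argument follows the common "import path, colon, attribute name" convention. The sketch below shows how such a string can be resolved with the standard library. The resolve function is a hypothetical helper and not the FastStream CLI's actual code, and json:dumps is used only because it can be imported anywhere.

```python
import importlib
from typing import Any


def resolve(import_string: str) -> Any:
    """Hypothetical helper: turn 'package.module:attribute' into the object."""
    module_path, _, attribute = import_string.partition(":")
    module = importlib.import_module(module_path)
    return getattr(module, attribute)


# Resolves the dumps function from the standard json module.
dumps = resolve("json:dumps")
```

By the same convention, greetings_app.application:app points at the app object defined in greetings_app/application.py.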

After running the command, you should see the following output:

2023-10-13 08:36:32,162 INFO     - FastStream app starting...
2023-10-13 08:36:32,170 INFO     - names |            - `OnNames` waiting for messages
2023-10-13 08:36:32,177 INFO     - FastStream app started successfully! To exit, press CTRL+C

FastStream also provides a hot reload feature to improve your development experience:

faststream run greetings_app.application:app --reload

It also offers horizontal scaling across multiple worker processes:

faststream run greetings_app.application:app --workers 3

Documentation

FastStream provides a command to serve the AsyncAPI documentation. Let's use it to document our application.
To generate and serve the documentation, run the following command:

faststream docs serve greetings_app.application:app

Now, you should see the following output:

INFO:     Started server process [47151]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)

Now open your browser at http://localhost:8000 and enjoy your automatically generated documentation! 🎉

Generated docs

Aaaand, that's it! 🎉 🎉 Feel free to experiment further with your application and check out the documentation for more complex examples.

Support us on GitHub and join our community ⭐

Ready to join the FastStream revolution? Head over to our GitHub repository and show your support by starring it. By doing so, you'll stay in the loop with the latest developments, updates, and enhancements as we continue to refine and expand FastStream.

Don't forget, we also have an active Discord channel where you can connect with fellow FastStream enthusiasts, ask questions, and share your experiences. Your active participation in our growing community, whether on GitHub or Discord, is invaluable, and we're grateful for your interest and potential contributions. Together, we can make microservices development simpler and more efficient with FastStream.

Conclusion

FastStream is your go-to tool for efficient microservices development. It simplifies message queues, supports various brokers, and offers Pydantic validation and auto-doc generation.

We're immensely grateful for your interest, and we look forward to your potential contributions. With FastStream in your toolkit, you're prepared to conquer the challenges of data-centric microservices like never before. Happy coding!

Top comments (2)

Nikita Vstovsky

Looks very promising, you did a great job!

Mirek

Thank you for sharing. It works with Kafka, unlike the faust library, which gives me errors.