<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Davor Runje</title>
    <description>The latest articles on DEV Community by Davor Runje (@davorrunje).</description>
    <link>https://dev.to/davorrunje</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1045526%2F93af0d3b-533e-44a6-a2dc-7f83f45fea4e.jpeg</url>
      <title>DEV Community: Davor Runje</title>
      <link>https://dev.to/davorrunje</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/davorrunje"/>
    <language>en</language>
    <item>
      <title>How we deprecated two successful projects and joined forces to create an even more successful one</title>
      <dc:creator>Davor Runje</dc:creator>
      <pubDate>Mon, 09 Oct 2023 17:23:10 +0000</pubDate>
      <link>https://dev.to/airtai/how-we-deprecated-two-successful-projects-and-joined-forces-to-create-an-even-more-successful-one-3lon</link>
      <guid>https://dev.to/airtai/how-we-deprecated-two-successful-projects-and-joined-forces-to-create-an-even-more-successful-one-3lon</guid>
      <description>&lt;p&gt;We had two large clients requesting to integrate our ML pipelines with their backend using Apache Kafka. Our previous HTTP API was based on FastAPI and we thought no problem, let's search for something similar but for Kafka. It turns out the one available solution was Faust, a framework with the last release at the time more than two years old. There is a community-maintained fork of it, but it was not very active and we just couldn't use it in an enterprise-level deployment.&lt;/p&gt;

&lt;p&gt;After a short discussion, we concluded we were just too spoiled to use low-level libraries that were nothing more than just tiny wrappers around C++ libs and that we could just build our own. So, we shamelessly made one by reusing beloved paradigms from FastAPI and we shamelessly named it &lt;a href="https://github.com/airtai/fastkafka" rel="noopener noreferrer"&gt;FastKafka&lt;/a&gt;. The point was to set the expectations right - you get pretty much what you would expect: function decorators for consumers and producers with type hints specifying Pydantic classes for JSON encoding/decoding, automatic message routing to Kafka brokers and documentation generation.&lt;/p&gt;

&lt;p&gt;After deploying it with our clients, we decided to open-source it just to see if there was any interest in something like this. We made a post about it on the Hacker news and the thing just took off overnight! I remember posting it in the evening and looking into an overnight (literally) success: there were the first 50 stars in a few hours. Five days later, we crossed the 200 stars and then the HN post started to wear off and growth slowed down.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3ohe2o1n81gcfkkxh4e6.jpeg" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3ohe2o1n81gcfkkxh4e6.jpeg" alt="star-history.com"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The next step was to figure out what to do next. We posted questions on a few relevant subreddits and got quite a few feature requests, mostly around supporting other protocols, encoding schemas etc. But, we also got a message from a developer of a similar framework &lt;a href="https://github.com/Lancetnik/Propan" rel="noopener noreferrer"&gt;Propan&lt;/a&gt; that was released at about the same time and was gaining quite a traction in the RabbitMQ community. That developer was &lt;a href="https://github.com/Lancetnik" rel="noopener noreferrer"&gt;Nikita Pastukhov&lt;/a&gt; and he made an intriguing proposal: &lt;strong&gt;let's join our efforts and create one framework with the best features of both&lt;/strong&gt;. Both projects were growing at roughly the same speed but targeted different communities. So the potential for double growth was there. After a quick consideration, we realized there was not much to lose and there was a lot to gain. Of course, we would lose absolute control over the project but losing control to the community is the only way for an open-source project to succeed. On the positive side, we would gain a very skilled maintainer who single-handedly created a similar framework all by himself. The frameworks were conceptually very similar so we concluded there would not be much friction of ideas and we should be able to reach consensus on the most important design issues.&lt;/p&gt;

&lt;p&gt;However, as developers, we test our hypothesis early to avoid failing later. So we started with creating a design document where we specified the main features and started discussing high-level API. There was no discussion on the implementation at all, just on what should be the look and feel of the framework to the end user. The process took roughly one month and after reaching a consensus on the design, we started the work on the implementation. The self-imposed deadline was mid-September because we already had two conference talks coming up and we wanted to use those opportunities to launch a new version.&lt;/p&gt;

&lt;p&gt;After two months of hard work, we presented the newly released &lt;a href="https://github.com/airtai/faststream" rel="noopener noreferrer"&gt;FastStream&lt;/a&gt; framework at &lt;a href="https://shift.infobip.com/" rel="noopener noreferrer"&gt;Infobip Shift&lt;/a&gt; conference and got featured at &lt;a href="https://shiftmag.dev/fast-stream-python-framework-1646/" rel="noopener noreferrer"&gt;ShiftMag&lt;/a&gt;. The framework now supports both Apache Kafka and RabbitMQ, but also NATS protocol with the plan to add more protocols in the near future. The overall code is much cleaner and the implementation is streamlined with abstractions covering the common functionality across the protocols. We deprecated both FastKafka and Propan, but promised to fix bugs as long as needed. However, it seems like the community already decided to switch over to gain new functionalities.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;A few posts on Reddit later and the community feedback is fantastic. We have never before experienced such growth and we are just 50 stars short from our goal of 1000, less than four week from the launch. I have no doubt that this was the right move and that collaboration and not competition is the right way to build long-term success in the open-source world.&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fbg4hqkoteqwaupxt3r77.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fbg4hqkoteqwaupxt3r77.png" alt="star-history.com"&gt;&lt;/a&gt;&lt;/p&gt;

</description>
      <category>opensource</category>
      <category>python</category>
      <category>programming</category>
      <category>webdev</category>
    </item>
    <item>
      <title>Generating production-level streaming microservices using AI</title>
      <dc:creator>Davor Runje</dc:creator>
      <pubDate>Thu, 05 Oct 2023 13:16:16 +0000</pubDate>
      <link>https://dev.to/airtai/generating-production-level-streaming-microservices-using-ai-41ji</link>
      <guid>https://dev.to/airtai/generating-production-level-streaming-microservices-using-ai-41ji</guid>
      <description>&lt;p&gt;&lt;a href="https://github.com/airtai/faststream-gen/" rel="noopener noreferrer"&gt;faststream-gen&lt;/a&gt; is a Python library that uses generative AI to automatically generate &lt;a href="https://github.com/airtai/faststream" rel="noopener noreferrer"&gt;FastStream&lt;/a&gt; applications. Simply describe your microservice in plain English, and &lt;a href="https://github.com/airtai/faststream-gen/" rel="noopener noreferrer"&gt;faststream-gen&lt;/a&gt; will generate a production-level FastStream application ready to deploy in a few minutes and under $1 cost.&lt;/p&gt;




&lt;p&gt;Documentation: &lt;a href="https://faststream-gen.airt.ai" rel="noopener noreferrer"&gt;https://faststream-gen.airt.ai&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Source Code: &lt;a href="https://github.com/airtai/faststream-gen" rel="noopener noreferrer"&gt;https://github.com/airtai/faststream-gen&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;The code generator for FastStream is a Python library that automates the process of creating streaming microservices for Apache Kafka, RabbitMQ and other supported protocols. It works by taking your application descriptions and swiftly turning them into a ready-to-deploy FastStream applications.&lt;/p&gt;

&lt;p&gt;The key features are:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Automatic code generation&lt;/strong&gt;: faststream-gen enables you to easily generate complete FastStream application with minimal effort. This library allows you to outline your application requirements, and it will quickly transform them into a fully-fledged FastStream project.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Automatic test generation&lt;/strong&gt;: faststream-gen provides dependable code through rigorous testing, including pre-implemented integration tests, ensuring stability and functionality, saving development time, and preventing common bugs.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Script Templates&lt;/strong&gt;: Streamline the deployment of your FastStream application using faststream-gen’s built-in scripts, tailored for initiating, subscribing to Kafka topic and shutting down the local Kafka broker.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Automatic CI/CD integration&lt;/strong&gt;: faststream-gen integrates seamlessly with your version control and continuous integration pipeline through its GitHub workflow files. These predefined configuration files are optimized for FastStream projects, enabling smooth integration with GitHub Actions. You can automate tasks such as code validation, testing, and deployment, ensuring that your FastStream application remains in top shape throughout its development lifecycle.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;In this tutorial, we will walk through the process of using the faststream-gen Python library to generate two FastStream applications.&lt;/p&gt;

&lt;p&gt;The first application will demonstrate how to retrieve current cryptocurrency prices from various web sources. Subsequently, we will utilize the functionalities of FastStream to publish retrieved data as messages to a Kafka topic.&lt;/p&gt;

&lt;p&gt;The second application will showcase how to consume messages from the Kafka topic using FastStream consumer capabilities. It will process these messages to extract information about cryptocurrencies and calculate the price mean for each cryptocurrency within a certain time window and publish the price mean to another topic.&lt;/p&gt;

&lt;p&gt;A short version of the tutorial is available as a video at the following link (roughly half of it):&lt;/p&gt;

&lt;p&gt;&lt;a href="https://youtu.be/zklmEzifKdk" rel="noopener noreferrer"&gt;https://youtu.be/zklmEzifKdk&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;iframe width="710" height="399" src="https://www.youtube.com/embed/zklmEzifKdk"&gt;
&lt;/iframe&gt;
&lt;/p&gt;

&lt;p&gt;Let’s get started!&lt;/p&gt;

&lt;h2&gt;
  
  
  Cryptocurrency analysis with FastStream
&lt;/h2&gt;

&lt;p&gt;In this tutorial, we will walk you through the process of using the &lt;code&gt;faststream-gen&lt;/code&gt; Python library to retrieve cryptocurrency prices in real time and calculate their moving average. To accomplish that, we will generate the following two &lt;a href="https://faststream.airt.ai" rel="noopener noreferrer"&gt;FastStream&lt;/a&gt; applications:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;A microservice retrieving current cryptocurrency prices from an external &lt;a href="https://docs.cloud.coinbase.com/sign-in-with-coinbase/docs/api-prices#get-spot-price" rel="noopener noreferrer"&gt;web service&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;A microservice consuming such messages, calculating the moving average price for each cryptocurrency and publishing it to another Kafka topic.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;Let’s get started!&lt;/strong&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Installation
&lt;/h2&gt;

&lt;p&gt;To complete this tutorial, you will need the following software and&lt;br&gt;
Python library:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;a href="https://www.python.org/" rel="noopener noreferrer"&gt;Python&lt;/a&gt; (version 3.8 and upward)&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;a valid &lt;a href="https://platform.openai.com/account/api-keys" rel="noopener noreferrer"&gt;OPENAI API key&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;[optional] &lt;a href="https://github.com/" rel="noopener noreferrer"&gt;github account&lt;/a&gt; and installed &lt;a href="https://git-scm.com/" rel="noopener noreferrer"&gt;git command&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;It is recommended to use a virtual environment for your Python projects. In this tutorial, we will be using Python’s &lt;code&gt;venv&lt;/code&gt; module to create a virtual environment.&lt;/p&gt;

&lt;p&gt;First, create a root directory for this tutorial. Navigate to the&lt;br&gt;
desired location and create a new directory called&lt;br&gt;
&lt;code&gt;faststream_gen_tutorial&lt;/code&gt; and enter it.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;mkdir &lt;/span&gt;faststream_gen_tutorial
&lt;span class="nb"&gt;cd &lt;/span&gt;faststream_gen_tutorial
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Creating and activating a new Python virtual environment
&lt;/h3&gt;

&lt;p&gt;Create a new virtual environment using &lt;a href="https://docs.python.org/3/library/venv.html" rel="noopener noreferrer"&gt;venv&lt;/a&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;python3 &lt;span class="nt"&gt;-m&lt;/span&gt; venv venv
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Next, activate your new virtual environment:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;source &lt;/span&gt;venv/bin/activate
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Installing the packages
&lt;/h3&gt;

&lt;p&gt;Upgrade pip if needed and install &lt;code&gt;faststream-gen&lt;/code&gt; package:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;pip &lt;span class="nb"&gt;install&lt;/span&gt; &lt;span class="nt"&gt;--upgrade&lt;/span&gt; pip &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; pip &lt;span class="nb"&gt;install &lt;/span&gt;faststream-gen
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Check that the installation was successful by running the following&lt;br&gt;
command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;faststream_gen &lt;span class="nt"&gt;--help&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You should see the full list of options of the command in the output.&lt;/p&gt;

&lt;p&gt;Now you have successfully set up the environment and installed the&lt;br&gt;
&lt;code&gt;faststream-gen&lt;/code&gt; package.&lt;/p&gt;
&lt;h3&gt;
  
  
  Setting up OpenAI API key
&lt;/h3&gt;

&lt;p&gt;&lt;code&gt;faststream-gen&lt;/code&gt; uses OpenAI API and you need to export your API key in&lt;br&gt;
environment variable &lt;code&gt;OPENAI_API_KEY&lt;/code&gt;. If you use bash or compatible&lt;br&gt;
shell, you can do that with the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;export &lt;/span&gt;&lt;span class="nv"&gt;OPENAI_API_KEY&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"sk-your-openai-api-key"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If you don’t already have &lt;code&gt;OPENAI_API_KEY&lt;/code&gt;, you can create one &lt;a href="https://platform.openai.com/account/api-keys" rel="noopener noreferrer"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Generating FastStream applications
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Retrieving and publishing crypto prices
&lt;/h3&gt;

&lt;p&gt;Now, we will create an application which retrieves information about&lt;br&gt;
current cryptocurrency prices from an external &lt;a href="https://docs.cloud.coinbase.com/sign-in-with-coinbase/docs/api-prices#get-spot-price" rel="noopener noreferrer"&gt;web&lt;br&gt;
service&lt;/a&gt; and publishes messages to a Kafka topic. In order to achieve&lt;br&gt;
this, we need to provide a high-level description of the application in&lt;br&gt;
&lt;strong&gt;plain English&lt;/strong&gt; containing only the &lt;strong&gt;necessary information&lt;/strong&gt; needed&lt;br&gt;
by a &lt;strong&gt;knowledgeable Python developer&lt;/strong&gt; familiar with FastStream&lt;br&gt;
framework to implement it. This should include details such as the&lt;br&gt;
message schema, instructions on external API-s and web service, and&lt;br&gt;
guidance on selecting the appropriate topic and partition keys.&lt;/p&gt;

&lt;p&gt;Below is an example of such a description used in this particular case.&lt;br&gt;
Notice that we did not specify steps needed to actually implement, we&lt;br&gt;
specified &lt;strong&gt;what&lt;/strong&gt; the service should do, but not &lt;strong&gt;how&lt;/strong&gt; to do it.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Create a FastStream application which will retrieve the current cryptocurrency
price and publish it to the new_crypto_price topic. 

The application should retrieve the data every 2 seconds.

A message which will be produced is JSON with the two attributes:
- price: non-negative float (current price of cryptocurrency in USD)
- crypto_currency: string (the cryptocurrency e.g. BTC, ETH...)

The current price of Bitcoin can be retrieved by a simple GET request to:
    - https://api.coinbase.com/v2/prices/BTC-USD/spot

The current price of Ethereum can be retrieved by a simple GET request to:
    - https://api.coinbase.com/v2/prices/ETH-USD/spot

The response of this GET request is a JSON and you can get information about
the crypto_currency in:
    response['data']['base']

and the information about the price in:
    response['data']['amount']

Use utf-8 encoded crypto_currency attribute as a partition key when publishing
the message to new_crypto_price topic.

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let’s generate a new &lt;code&gt;FastStream&lt;/code&gt; project inside the&lt;br&gt;
&lt;code&gt;retrieve-publish-crypto&lt;/code&gt; directory. First, copy the previous&lt;br&gt;
description and paste it into a file called&lt;br&gt;
&lt;code&gt;description_retrieve_publish.txt&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;Next, run the following command (parameter &lt;code&gt;-i&lt;/code&gt; specifies the file path&lt;br&gt;
for the app description file, while the parameter &lt;code&gt;-o&lt;/code&gt; specifies the&lt;br&gt;
directory where the generated project files will be saved):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;faststream_gen &lt;span class="nt"&gt;-i&lt;/span&gt; description_retrieve_publish.txt &lt;span class="nt"&gt;-o&lt;/span&gt; retrieve-publish-crypto
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight console"&gt;&lt;code&gt;&lt;span class="go"&gt;✨  Generating a new FastStream application!
 ✔ Application description validated. 
 ✔ FastStream app skeleton code generated. 
 ✔ The app and the tests are generated. 
 ✔ New FastStream project created. 
 ✔ Integration tests were successfully completed. 
 Tokens used: 36938
&lt;/span&gt;&lt;span class="gp"&gt; Total Cost (USD): $&lt;/span&gt;0.11436
&lt;span class="go"&gt;✨  All files were successfully generated!
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Failed generation
&lt;/h4&gt;

&lt;p&gt;The generation process is not bullet-proof and it could fail with a message like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight console"&gt;&lt;code&gt;&lt;span class="go"&gt;✨  Generating a new FastStream application!
✔ Application description validated. 
✔ New FastStream project created. 
✔ FastStream app skeleton code generated. 
✘ Error: Failed to generate a valid application and test code. 
✘ Error: Integration tests failed.
Tokens used: 79384
&lt;/span&gt;&lt;span class="gp"&gt;Total Cost (USD): $&lt;/span&gt;0.24567
&lt;span class="go"&gt;
Apologies, we couldn't generate a working application and test code from your application description.

Please run the following command to start manual debugging:

cd retrieve_without_logs &amp;amp;&amp;amp; pytest

For in-depth debugging, check the retrieve-publish-crypto/_faststream_gen_logs directory for complete logs, including individual step information.
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;There can be a number of reasons for that, the most common being:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;The specification you provided was not sufficiently detailed to generate the application. In such cases, you could try to add more detailed instructions and try again.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The task is too difficult for the default GPT-3.5 model to handle and you could try to use GPT-4 instead:&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight console"&gt;&lt;code&gt;&lt;span class="go"&gt;faststream_gen --model gpt-4 -i description_retrieve_publish.txt -o retrieve-publish-crypto
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;You were unlucky and you just need to execute the command again. Large language models are stochastic in their nature and they always give different answers to the same questions. There are retry mechanisms in the engine, but sometimes they are not enough and just rerunning the command can mediate the problem.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;If none of the above strategies work, check the already generated files and see what the potential problem could be. You can also finish the implementation or tests yourself.&lt;/p&gt;

&lt;h4&gt;
  
  
  Successful generation
&lt;/h4&gt;

&lt;p&gt;If successful, the command will generate a &lt;code&gt;FastStream&lt;/code&gt; project with the following structure:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight console"&gt;&lt;code&gt;&lt;span class="go"&gt;retrieve-publish-crypto
├── .github
│   └── workflows
│       ├── build_docker.yml
│       ├── deploy_docs.yml
│       └── test.yml
├── .gitignore
├── Dockerfile
├── LICENSE
├── README.md
├── app
│   └── application.py
├── pyproject.toml
├── scripts
│   ├── build_docker.sh
│   ├── lint.sh
│   ├── services.yml
│   ├── start_kafka_broker_locally.sh
│   ├── static-analysis.sh
│   ├── stop_kafka_broker_locally.sh
│   └── subscribe_to_kafka_broker_locally.sh
└── tests
    ├── __init__.py
    └── test_application.py
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The generated application is located in the &lt;code&gt;app/&lt;/code&gt; directory, while the tests are located in the &lt;code&gt;tests/&lt;/code&gt; directory. It is important to keep in mind that these files are generated by LLM and may vary with each generation.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;&lt;code&gt;app/application.py&lt;/code&gt;&lt;/strong&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;asyncio&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;aiohttp&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;pydantic&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;BaseModel&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Field&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;NonNegativeFloat&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;faststream&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;ContextRepo&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;FastStream&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Logger&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;faststream.kafka&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;KafkaBroker&lt;/span&gt;

&lt;span class="n"&gt;broker&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;KafkaBroker&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;localhost:9092&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;app&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;FastStream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;broker&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;


&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;CryptoPrice&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;BaseModel&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;NonNegativeFloat&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Field&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="p"&gt;...,&lt;/span&gt; &lt;span class="n"&gt;examples&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mf"&gt;50000.0&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;description&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Current price of cryptocurrency in USD&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;crypto_currency&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Field&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="p"&gt;...,&lt;/span&gt; &lt;span class="n"&gt;examples&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;BTC&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;description&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;The cryptocurrency&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;publisher&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;broker&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;publisher&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;new_crypto_price&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;fetch_crypto_price&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;url&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;crypto_currency&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Logger&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;ContextRepo&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;time_interval&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;int&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="c1"&gt;# Always use context: ContextRepo for storing app_is_running variable
&lt;/span&gt;    &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;app_is_running&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="n"&gt;aiohttp&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;ClientSession&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;url&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;status&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="mi"&gt;200&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                    &lt;span class="n"&gt;data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;json&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
                    &lt;span class="n"&gt;price&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;amount&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
                    &lt;span class="n"&gt;new_crypto_price&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;CryptoPrice&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                        &lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;crypto_currency&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;crypto_currency&lt;/span&gt;
                    &lt;span class="p"&gt;)&lt;/span&gt;
                    &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;publisher&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;publish&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                        &lt;span class="n"&gt;new_crypto_price&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                        &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;crypto_currency&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;encode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;utf-8&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
                    &lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                    &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;warning&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                        &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Failed API request &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;url&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; at time &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
                    &lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;asyncio&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;sleep&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;time_interval&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;


&lt;span class="nd"&gt;@app.on_startup&lt;/span&gt;
&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;app_setup&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;ContextRepo&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;set_global&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;app_is_running&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;


&lt;span class="nd"&gt;@app.on_shutdown&lt;/span&gt;
&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;shutdown&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;ContextRepo&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;set_global&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;app_is_running&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Get all the running tasks and wait them to finish
&lt;/span&gt;    &lt;span class="n"&gt;fetch_tasks&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;fetch_tasks&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;asyncio&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;gather&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;fetch_tasks&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;


&lt;span class="nd"&gt;@app.after_startup&lt;/span&gt;
&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;publish_crypto_price&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Logger&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;ContextRepo&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Starting publishing:&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;cryptocurrencies&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Bitcoin&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;BTC&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Ethereum&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;ETH&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)]&lt;/span&gt;
    &lt;span class="n"&gt;fetch_tasks&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;
        &lt;span class="n"&gt;asyncio&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;create_task&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="nf"&gt;fetch_crypto_price&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;https://api.coinbase.com/v2/prices/&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;crypto_currency&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;-USD/spot&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;crypto_currency&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;crypto_currency&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;cryptocurrencies&lt;/span&gt;
    &lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="c1"&gt;# you need to save asyncio tasks so you can wait them to finish at app shutdown (the function with @app.on_shutdown function)
&lt;/span&gt;    &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;set_global&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;fetch_tasks&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;fetch_tasks&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;&lt;code&gt;tests/test_application.py&lt;/code&gt;&lt;/strong&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;pytest&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;faststream&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;TestApp&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;faststream.kafka&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;TestKafkaBroker&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;app.application&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;CryptoPrice&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;app&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;broker&lt;/span&gt;


&lt;span class="nd"&gt;@broker.subscriber&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;new_crypto_price&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;on_new_crypto_price&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;CryptoPrice&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;bytes&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;message.raw_message.key&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;pass&lt;/span&gt;


&lt;span class="nd"&gt;@pytest.mark.asyncio&lt;/span&gt;
&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;test_fetch_crypto_price&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
    &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="nc"&gt;TestKafkaBroker&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;broker&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="nc"&gt;TestApp&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;app&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
            &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;on_new_crypto_price&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;wait_call&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="n"&gt;on_new_crypto_price&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mock&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;assert_called&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Creating a new Python virtual environment
&lt;/h4&gt;

&lt;p&gt;All the required dependencies to run the newly generated FastStream&lt;br&gt;
project are located within the &lt;code&gt;pyproject.toml&lt;/code&gt; file.&lt;/p&gt;

&lt;p&gt;Create and activate a new virtual environment inside the&lt;br&gt;
&lt;code&gt;retrieve-publish-crypto&lt;/code&gt; directory by using &lt;a href="https://docs.python.org/3/library/venv.html" rel="noopener noreferrer"&gt;venv&lt;/a&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;cd &lt;/span&gt;retrieve-publish-crypto
python3 &lt;span class="nt"&gt;-m&lt;/span&gt; venv venv
&lt;span class="nb"&gt;source &lt;/span&gt;venv/bin/activate
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Upgrade pip if needed and install the development dependencies:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;pip &lt;span class="nb"&gt;install&lt;/span&gt; &lt;span class="nt"&gt;--upgrade&lt;/span&gt; pip &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; pip &lt;span class="nb"&gt;install&lt;/span&gt; &lt;span class="nt"&gt;-e&lt;/span&gt; .[dev]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Testing the application
&lt;/h4&gt;

&lt;p&gt;In order to verify the functional correctness of the application, it is recommended to execute the generated unit and integration test by running the &lt;code&gt;pytest&lt;/code&gt; command.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;pytest
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;============================= test session starts ==============================
platform linux -- Python 3.11.4, pytest-7.4.2, pluggy-1.3.0
rootdir: /work/fastkafka-gen/docs_src/tutorial/retrieve-publish-crypto
configfile: pyproject.toml
plugins: anyio-3.7.1, asyncio-0.21.1
asyncio: mode=Mode.STRICT
collected 1 item                                                               

tests/test_application.py .                                              [100%]

============================== 1 passed in 2.65s ===============================
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h4&gt;
  
  
  Previewing AsyncAPI Docs
&lt;/h4&gt;

&lt;p&gt;To preview AsyncAPI Docs for the application, execute the following&lt;br&gt;
command:&lt;br&gt;
&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;faststream docs serve app.application:app
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight console"&gt;&lt;code&gt;&lt;span class="go"&gt;INFO:     Started server process [3575270]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can now access the AsyncAPI Docs by opening &lt;a href="http://localhost:8000" rel="noopener noreferrer"&gt;localhost:8000&lt;/a&gt; in your browser.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fgu069n1a3yltk5k5khzr.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fgu069n1a3yltk5k5khzr.png" alt="AsyncAPI Docs" width="800" height="651"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h4&gt;
  
  
  Starting localhost Kafka broker
&lt;/h4&gt;

&lt;p&gt;To run the &lt;code&gt;FastStream&lt;/code&gt; application locally, ensure that you have a&lt;br&gt;
running Kafka broker. You can start a Kafka Docker container by&lt;br&gt;
executing the &lt;code&gt;start_kafka_broker_locally.sh&lt;/code&gt; shell script:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;./scripts/start_kafka_broker_locally.sh
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight console"&gt;&lt;code&gt;&lt;span class="go"&gt;[+] Running 2/2
 ⠿ Network scripts_default  Created                                                                                                             0.1s
 ⠿ Container bitnami_kafka  Started 
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Starting the application
&lt;/h4&gt;

&lt;p&gt;To start the application, execute the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;faststream run app.application:app
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight console"&gt;&lt;code&gt;&lt;span class="go"&gt;2023-09-15 13:41:21,948 INFO     - FastStream app starting...
2023-09-15 13:41:22,144 INFO     -      |            - Starting publishing:
2023-09-15 13:41:22,144 INFO     - FastStream app started successfully! To exit press CTRL+C
Topic new_crypto_price not found in cluster metadata
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Ensure that the app remains running, it is needed for the subsequent&lt;br&gt;
steps.&lt;/p&gt;
&lt;h3&gt;
  
  
  Calculating the moving average
&lt;/h3&gt;

&lt;p&gt;Let’s develop an application that calculates the average price of the&lt;br&gt;
three most recent messages received from the &lt;code&gt;new_crypto_price&lt;/code&gt; topic&lt;br&gt;
for each cryptocurrency. Subsequently, we will publish the computed&lt;br&gt;
average price to the &lt;code&gt;price_mean&lt;/code&gt; topic.&lt;/p&gt;

&lt;p&gt;Here is the full description of the desired application:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Create a FastStream application for consuming messages
from the new_crypto_price topic. 

This topic needs to use a partition key.

new_crypto_price messages use JSON with two attributes 
(create class CryptoPrice with these attributes):
- price: non-negative float (it represents the current price of the crypto)
- crypto_currency: string (it represents the cryptocurrency e.g. BTC, ETH...)

The application should save each message to a dictionary (global variable) 
- partition key should be used as a dictionary key 
  and value should be a List of prices.

Keep only the last 100 messages in the dictionary.

If there are fewer than 3 messages for a given partition key,
do not publish any messages.

Otherwise, Calculate the price mean of the last 3 messages
for the given partition key.

Publish the price mean to the price_mean topic and use 
the same partition key that the new_crypto_price topic is using.

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Please open a new terminal and navigate to the root directory of this&lt;br&gt;
tutorial, which is called &lt;code&gt;faststream_gen_tutorial&lt;/code&gt;. Once you are inside&lt;br&gt;
the &lt;code&gt;faststream_gen_tutorial&lt;/code&gt; folder, please activate the virtual&lt;br&gt;
environment.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;cd &lt;/span&gt;path_to/faststream_gen_tutorial
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;source &lt;/span&gt;venv/bin/activate
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To create a &lt;code&gt;faststream&lt;/code&gt; application inside the &lt;code&gt;calculate-mean-app&lt;/code&gt;&lt;br&gt;
directory, first copy the previous description and paste it into the&lt;br&gt;
&lt;code&gt;description_calculate_mean.txt&lt;/code&gt; file.&lt;/p&gt;

&lt;p&gt;Next, run the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;faststream_gen &lt;span class="nt"&gt;-i&lt;/span&gt; description_calculate_mean.txt &lt;span class="nt"&gt;-o&lt;/span&gt; calculate-mean-app
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight console"&gt;&lt;code&gt;&lt;span class="go"&gt;✨  Generating a new FastStream application!
 ✔ Application description validated. 
 ✔ FastStream app skeleton code generated. 
 ✔ The app and the tests are generated. 
 ✔ New FastStream project created. 
 ✔ Integration tests were successfully completed. 
 Tokens used: 13367
&lt;/span&gt;&lt;span class="gp"&gt; Total Cost (USD): $&lt;/span&gt;0.04147
&lt;span class="go"&gt;✨  All files were successfully generated!
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If successful, the command will generate &lt;code&gt;calculate-mean-app&lt;/code&gt; directory&lt;br&gt;
with &lt;code&gt;app/application.py&lt;/code&gt; and &lt;code&gt;tests/test_application.py&lt;/code&gt; inside. If&lt;br&gt;
not, just rerun the command again until it succeds.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;&lt;code&gt;app/application.py&lt;/code&gt;&lt;/strong&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;typing&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Dict&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;List&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;pydantic&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;BaseModel&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Field&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;NonNegativeFloat&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;faststream&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ContextRepo&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;FastStream&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Logger&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;faststream.kafka&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;KafkaBroker&lt;/span&gt;


&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;CryptoPrice&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;BaseModel&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;NonNegativeFloat&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Field&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="p"&gt;...,&lt;/span&gt; &lt;span class="n"&gt;examples&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;50000&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;description&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Current price of the cryptocurrency&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;crypto_currency&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Field&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="p"&gt;...,&lt;/span&gt; &lt;span class="n"&gt;examples&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;BTC&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;description&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Cryptocurrency symbol&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;


&lt;span class="n"&gt;broker&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;KafkaBroker&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;localhost:9092&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;app&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;FastStream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;broker&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;


&lt;span class="n"&gt;publisher&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;broker&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;publisher&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;price_mean&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;


&lt;span class="nd"&gt;@app.on_startup&lt;/span&gt;
&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;app_setup&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;ContextRepo&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;message_history&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Dict&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;List&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;float&lt;/span&gt;&lt;span class="p"&gt;]]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;
    &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;set_global&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;message_history&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;message_history&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;


&lt;span class="nd"&gt;@broker.subscriber&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;new_crypto_price&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;on_new_crypto_price&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;CryptoPrice&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Logger&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;message_history&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Dict&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;List&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;float&lt;/span&gt;&lt;span class="p"&gt;]]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
    &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;bytes&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;message.raw_message.key&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;New crypto price &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;crypto_key&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;decode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;utf-8&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;crypto_key&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;message_history&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;message_history&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;crypto_key&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;

    &lt;span class="n"&gt;message_history&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;crypto_key&lt;/span&gt;&lt;span class="p"&gt;].&lt;/span&gt;&lt;span class="nf"&gt;append&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="nf"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message_history&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;crypto_key&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;message_history&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;crypto_key&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;message_history&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;crypto_key&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;:]&lt;/span&gt;

    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="nf"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message_history&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;crypto_key&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;price_mean&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;sum&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message_history&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;crypto_key&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;:])&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="mi"&gt;3&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;publisher&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;publish&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;price_mean&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;&lt;code&gt;tests/test_application.py&lt;/code&gt;&lt;/strong&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;pytest&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;faststream&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;TestApp&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;faststream.kafka&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;TestKafkaBroker&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;app.application&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;CryptoPrice&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;app&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;broker&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;on_new_crypto_price&lt;/span&gt;


&lt;span class="nd"&gt;@broker.subscriber&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;price_mean&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;on_price_mean&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;float&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;bytes&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;message.raw_message.key&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)):&lt;/span&gt;
    &lt;span class="k"&gt;pass&lt;/span&gt;


&lt;span class="nd"&gt;@pytest.mark.asyncio&lt;/span&gt;
&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;test_app&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
    &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="nc"&gt;TestKafkaBroker&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;broker&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="nc"&gt;TestApp&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;app&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
            &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;broker&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;publish&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                &lt;span class="nc"&gt;CryptoPrice&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;50000&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;crypto_currency&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;BTC&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
                &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;new_crypto_price&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sa"&gt;b&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;BTC&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="n"&gt;on_new_crypto_price&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mock&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;assert_called_with&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                &lt;span class="nf"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;CryptoPrice&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;50000&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;crypto_currency&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;BTC&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
            &lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="n"&gt;on_price_mean&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mock&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;assert_not_called&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

            &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;broker&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;publish&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                &lt;span class="nc"&gt;CryptoPrice&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;60000&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;crypto_currency&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;BTC&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
                &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;new_crypto_price&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sa"&gt;b&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;BTC&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="n"&gt;on_new_crypto_price&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mock&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;assert_called_with&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                &lt;span class="nf"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;CryptoPrice&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;60000&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;crypto_currency&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;BTC&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
            &lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="n"&gt;on_price_mean&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mock&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;assert_not_called&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

            &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;broker&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;publish&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                &lt;span class="nc"&gt;CryptoPrice&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;70000&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;crypto_currency&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;BTC&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
                &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;new_crypto_price&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sa"&gt;b&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;BTC&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="n"&gt;on_new_crypto_price&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mock&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;assert_called_with&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                &lt;span class="nf"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;CryptoPrice&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;70000&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;crypto_currency&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;BTC&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
            &lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="n"&gt;on_price_mean&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mock&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;assert_called_with&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mf"&gt;60000.0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Creating a new Python virtual environment
&lt;/h4&gt;

&lt;p&gt;All the required dependencies to run the newly generated FastStream&lt;br&gt;
project are located within the &lt;code&gt;pyproject.toml&lt;/code&gt; file. Create a new&lt;br&gt;
virtual environment and install the development dependencies for the&lt;br&gt;
project.&lt;/p&gt;

&lt;p&gt;Create and activate a new virtual environment inside the&lt;br&gt;
&lt;code&gt;calculate-mean-app&lt;/code&gt; directory by using &lt;a href="https://docs.python.org/3/library/venv.html" rel="noopener noreferrer"&gt;env&lt;/a&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;cd &lt;/span&gt;calculate-mean-app
python3 &lt;span class="nt"&gt;-m&lt;/span&gt; venv venv
&lt;span class="nb"&gt;source &lt;/span&gt;venv/bin/activate
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Upgrade pip if needed and install the development dependencies:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;pip &lt;span class="nb"&gt;install&lt;/span&gt; &lt;span class="nt"&gt;--upgrade&lt;/span&gt; pip &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; pip &lt;span class="nb"&gt;install&lt;/span&gt; &lt;span class="nt"&gt;-e&lt;/span&gt; .[dev]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Testing the application
&lt;/h4&gt;

&lt;p&gt;In order to verify functional correctness of the application, it is&lt;br&gt;
recommended to execute the generated unit and integration test by&lt;br&gt;
running the &lt;code&gt;pytest&lt;/code&gt; command.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;pytest
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;============================= test session starts ==============================
platform linux -- Python 3.11.4, pytest-7.4.2, pluggy-1.3.0
rootdir: /work/fastkafka-gen/docs_src/tutorial/calculate-mean-app
configfile: pyproject.toml
plugins: anyio-3.7.1, asyncio-0.21.1
asyncio: mode=Mode.STRICT
collected 1 item                                                               

tests/test_application.py .                                              [100%]

============================== 1 passed in 0.44s ===============================
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h4&gt;
  
  
  Previewing AsyncAPI Docs
&lt;/h4&gt;

&lt;p&gt;To preview AsyncAPI Docs for the application, execute the following&lt;br&gt;
command:&lt;br&gt;
&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;faststream docs serve app.application:app
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight console"&gt;&lt;code&gt;&lt;span class="go"&gt;INFO:     Started server process [3596205]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can now access the AsyncAPI Docs by opening &lt;a href="http://localhost:8000" rel="noopener noreferrer"&gt;http://localhost:8000&lt;/a&gt; in your browser.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ffferz5qxdvvv4gq17l4k.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ffferz5qxdvvv4gq17l4k.png" alt="AsyncAPI Docs" width="800" height="589"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h4&gt;
  
  
  Starting the application
&lt;/h4&gt;

&lt;p&gt;To start the application, execute the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;faststream run app.application:app
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight console"&gt;&lt;code&gt;&lt;span class="go"&gt;2023-10-04 09:31:18,926 INFO     - FastStream app starting...
2023-10-04 09:31:18,948 INFO     - new_crypto_price |            - `OnNewCryptoPrice` waiting for messages
Topic new_crypto_price not found in cluster metadata
2023-10-04 09:31:19,069 INFO     - FastStream app started successfully! To exit, press CTRL+C
2023-10-04 09:31:40,876 INFO     - new_crypto_price | 0-16964047 - Received
2023-10-04 09:31:40,878 INFO     - new_crypto_price | 0-16964047 - New crypto price msg=CryptoPrice(price=27414.085, crypto_currency='BTC')
2023-10-04 09:31:40,878 INFO     - new_crypto_price | 0-16964047 - Processed
2023-10-04 09:31:40,878 INFO     - new_crypto_price | 1-16964047 - Received
2023-10-04 09:31:40,879 INFO     - new_crypto_price | 1-16964047 - New crypto price msg=CryptoPrice(price=1642.425, crypto_currency='ETH')
2023-10-04 09:31:40,879 INFO     - new_crypto_price | 1-16964047 - Processed
2023-10-04 09:31:43,053 INFO     - new_crypto_price | 2-16964047 - Received
2023-10-04 09:31:43,054 INFO     - new_crypto_price | 2-16964047 - New crypto price msg=CryptoPrice(price=27414.085, crypto_currency='BTC')
2023-10-04 09:31:43,054 INFO     - new_crypto_price | 2-16964047 - Processed
2023-10-04 09:31:43,054 INFO     - new_crypto_price | 3-16964047 - Received
2023-10-04 09:31:43,055 INFO     - new_crypto_price | 3-16964047 - New crypto price msg=CryptoPrice(price=1642.425, crypto_currency='ETH')
&lt;/span&gt;&lt;span class="c"&gt;...
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can see in the terminal that the application is reading the messages&lt;br&gt;
from the &lt;code&gt;new_crypto_price&lt;/code&gt; topic. Ensure that the app remains running&lt;br&gt;
as it is needed for the next step.&lt;/p&gt;
&lt;h4&gt;
  
  
  Subscribing directly to local Kafka broker topic
&lt;/h4&gt;

&lt;p&gt;Open the &lt;strong&gt;new terminal&lt;/strong&gt;, navigate to the &lt;code&gt;calculate-mean-app&lt;/code&gt;&lt;br&gt;
directory.&lt;/p&gt;

&lt;p&gt;To check if the &lt;code&gt;calculate-mean-app&lt;/code&gt; is publishing messages to the&lt;br&gt;
&lt;code&gt;price_mean&lt;/code&gt; topic, run the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;./scripts/subscribe_to_kafka_broker_locally.sh price_mean
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight console"&gt;&lt;code&gt;&lt;span class="go"&gt;BTC     26405.745
ETH     1621.3733333333332
BTC     26404.865
ETH     1621.375
BTC     26404.865
&lt;/span&gt;&lt;span class="c"&gt;...
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Stopping Kafka broker
&lt;/h4&gt;

&lt;p&gt;To stop the Kafka broker after analyzing the mean price of&lt;br&gt;
cryptocurrencies, you can execute the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;./scripts/stop_kafka_broker_locally.sh
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight console"&gt;&lt;code&gt;&lt;span class="go"&gt;[+] Running 2/2
 ⠿ Container bitnami_kafka  Removed                                                                            1.2s
 ⠿ Network scripts_default  Removed 
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  [Optional] GitHub integration
&lt;/h2&gt;

&lt;p&gt;To successfully complete this optional tutorial chapter, make sure you&lt;br&gt;
have a &lt;a href="https://github.com/" rel="noopener noreferrer"&gt;GitHub account&lt;/a&gt; and the &lt;a href="https://git-scm.com/" rel="noopener noreferrer"&gt;Git command&lt;/a&gt; installed. Additionally, ensure that your &lt;a href="https://docs.github.com/authentication" rel="noopener noreferrer"&gt;authentication&lt;/a&gt; for GitHub is properly set.&lt;/p&gt;

&lt;p&gt;We need to create two GitHub repositories, one for the &lt;code&gt;FastStream&lt;/code&gt; project in the &lt;code&gt;retrieve-publish-crypto&lt;/code&gt; directory and another for the &lt;code&gt;FastStream&lt;/code&gt; project in the &lt;code&gt;calculate-mean-app&lt;/code&gt; directory. In this chapter, we will upload the &lt;code&gt;FastStream&lt;/code&gt; project to GitHub, which includes the &lt;code&gt;retrieve-publish-crypto&lt;/code&gt; project. We will also provide an&lt;br&gt;
explanation of the generated CI workflows.&lt;/p&gt;
&lt;h3&gt;
  
  
  Adding locally hosted code to GitHub
&lt;/h3&gt;

&lt;p&gt;To create a GitHub repository, click on the following &lt;a href="https://github.com/new" rel="noopener noreferrer"&gt;link&lt;/a&gt;. For the&lt;br&gt;
&lt;code&gt;Repository name&lt;/code&gt;, use &lt;code&gt;retrieve-publish-crypto&lt;/code&gt; and click on&lt;br&gt;
&lt;code&gt;Create repository&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4o2nqb96vdbja04v0m9s.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4o2nqb96vdbja04v0m9s.png" alt="Create GitHub Repository" width="800" height="731"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Please open your development environment and go to the&lt;br&gt;
&lt;code&gt;retrieve-publish-crypto&lt;/code&gt; directory that was generated in the previous&lt;br&gt;
steps.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;cd &lt;/span&gt;path_to/faststream_gen_tutorial/retrieve-publish-crypto
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Next, execute the following commands:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;git init
git add &lt;span class="nb"&gt;.&lt;/span&gt;
git commit &lt;span class="nt"&gt;-m&lt;/span&gt; &lt;span class="s2"&gt;"Retrieve and publish crypto prices application implemented"&lt;/span&gt;
git branch &lt;span class="nt"&gt;-M&lt;/span&gt; main
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If you are using &lt;strong&gt;HTTPS&lt;/strong&gt; authentication execute (&lt;strong&gt;replace the&lt;br&gt;
&lt;code&gt;git_username&lt;/code&gt;&lt;/strong&gt; in the URL with your own GitHub username):&lt;/p&gt;

&lt;p&gt;&lt;code&gt;git remote add origin https://github.com/git_username/retrieve-publish-crypto.git&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;If you are using &lt;strong&gt;SSH&lt;/strong&gt; authentication execute (&lt;strong&gt;replace the&lt;br&gt;
&lt;code&gt;git_username&lt;/code&gt;&lt;/strong&gt; in the URL with your own GitHub username):&lt;/p&gt;

&lt;p&gt;&lt;code&gt;git remote add origin git@github.com:git_username/retrieve-publish-crypto.git&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;To update the remote branch with local commits, execute:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;git push &lt;span class="nt"&gt;-u&lt;/span&gt; origin main
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Continuous integration with GitHub Actions
&lt;/h3&gt;

&lt;p&gt;Once the changes are pushed, CI pipeline will run the tests again,&lt;br&gt;
create and publish the documentation and build Docker container with the&lt;br&gt;
application.&lt;/p&gt;

&lt;p&gt;To verify the passing status of the CI, open your web browser, go to the&lt;br&gt;
newly created GitHub repository, and click on the Actions tab.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fzix9xb0hz770gmrp2jky.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fzix9xb0hz770gmrp2jky.png" alt="GitHub Actions" width="800" height="396"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The tests executed with &lt;code&gt;pytest&lt;/code&gt; are passing successfully.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fppfwfs2tfbwzb03v1cfn.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fppfwfs2tfbwzb03v1cfn.png" alt="Tests" width="800" height="383"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The Docker image has been successfully built and pushed to the GitHub&lt;br&gt;
Container registry under the repository&lt;br&gt;
&lt;code&gt;ghcr.io/your_username/retrieve-publish-crypto&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fwgc3iedkb6zaa3v0gshj.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fwgc3iedkb6zaa3v0gshj.png" alt="Build Docker" width="800" height="454"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The AsyncAPI docs have been successfully generated and deployed to&lt;br&gt;
GitHub Pages.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fk2vzs3b5tsn293tc61wf.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fk2vzs3b5tsn293tc61wf.png" alt="Deploy FastStream AsyncAPI Docs" width="800" height="462"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;After the successful execution of the &lt;code&gt;Deploy FastStream AsyncAPI Docs&lt;/code&gt;&lt;br&gt;
workflow, a new branch named &lt;code&gt;gh-pages&lt;/code&gt; will be created. To access the&lt;br&gt;
GitHub Pages settings, navigate to the &lt;code&gt;Settings -&amp;gt; Pages&lt;/code&gt;, select the&lt;br&gt;
&lt;code&gt;gh-pages&lt;/code&gt; branch as the designated branch for hosting the GitHub Pages&lt;br&gt;
and click on &lt;code&gt;Save&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fck0822fhjh9q1zwe2xl2.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fck0822fhjh9q1zwe2xl2.png" alt="Deploy FastStream AsyncAPI Docs" width="800" height="455"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;By setting up a branch on GitHub Pages, a new workflow will&lt;br&gt;
automatically be triggered within the GitHub Actions. To access this&lt;br&gt;
workflow, simply click on the &lt;code&gt;Actions&lt;/code&gt; tab and open the&lt;br&gt;
&lt;code&gt;pages build and deployment&lt;/code&gt; workflow.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F6hwzlouetx35cqkcevk5.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F6hwzlouetx35cqkcevk5.png" alt="Pages build and deploy" width="800" height="339"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Once all the jobs within the workflow are completed, you can click on&lt;br&gt;
the provided URL to conveniently view and explore the AsyncAPI Docs that&lt;br&gt;
have been generated for your application.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fnraletqbhh1m1d27u6k2.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fnraletqbhh1m1d27u6k2.png" alt="AsyncAPI Docs" width="800" height="589"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Repeat
&lt;/h3&gt;

&lt;p&gt;Repeat the same process with &lt;code&gt;calculate-mean-app&lt;/code&gt; project.&lt;/p&gt;

&lt;h2&gt;
  
  
  Next steps
&lt;/h2&gt;

&lt;p&gt;Congratulations! You have successfully completed this tutorial and&lt;br&gt;
gained a new set of skills. Now that you have learned how to use&lt;br&gt;
&lt;code&gt;faststream-gen&lt;/code&gt;, try it out with your own example!&lt;/p&gt;

</description>
      <category>tutorial</category>
      <category>ai</category>
      <category>kafka</category>
      <category>faststream</category>
    </item>
    <item>
      <title>FastKafka - free open source python lib for building Kafka-based services</title>
      <dc:creator>Davor Runje</dc:creator>
      <pubDate>Wed, 15 Mar 2023 05:18:11 +0000</pubDate>
      <link>https://dev.to/airtai/fastkafka-free-open-source-python-lib-for-building-kafka-based-services-4b15</link>
      <guid>https://dev.to/airtai/fastkafka-free-open-source-python-lib-for-building-kafka-based-services-4b15</guid>
      <description>&lt;p&gt;We were searching for something like FastAPI for Kafka-based service we were developing, but couldn’t find anything similar. So we shamelessly made one by reusing beloved paradigms from FastAPI and we shamelessly named it FastKafka. The point was to set the expectations right - you get pretty much what you would expect: function decorators for consumers and producers with type hints specifying Pydantic classes for JSON encoding/decoding, automatic message routing to Kafka brokers and documentation generation.&lt;/p&gt;

&lt;p&gt;Please take a look and tell us how to make it better. Our goal is to make using it as easy as possible for someone with experience with FastAPI.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://github.com/airtai/fastkafka"&gt;https://github.com/airtai/fastkafka&lt;/a&gt;&lt;/p&gt;

</description>
      <category>webdev</category>
      <category>python</category>
      <category>fastapi</category>
      <category>apachekafka</category>
    </item>
  </channel>
</rss>
