<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: bytewax</title>
    <description>The latest articles on DEV Community by bytewax (@bytewax).</description>
    <link>https://dev.to/bytewax</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Forganization%2Fprofile_image%2F6754%2F3bdc2c57-9051-412a-a445-6b558a9c64e8.png</url>
      <title>DEV Community: bytewax</title>
      <link>https://dev.to/bytewax</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/bytewax"/>
    <language>en</language>
    <item>
      <title>M12 invests in the Future of Stream Processing with Bytewax</title>
      <dc:creator>Oli Makhasoeva</dc:creator>
      <pubDate>Wed, 09 Aug 2023 16:25:50 +0000</pubDate>
      <link>https://dev.to/bytewax/m12-invests-in-the-future-of-stream-processing-with-bytewax-3n43</link>
      <guid>https://dev.to/bytewax/m12-invests-in-the-future-of-stream-processing-with-bytewax-3n43</guid>
      <description>&lt;p&gt;At Bytewax, we're passionate about the power of real-time data. With AI and automation on the rise, accessing data instantly isn't just a cool perk—it's becoming a necessity. Our mission is to build software that will strip away the complexities of streaming and make it accessible for &lt;strong&gt;every developer&lt;/strong&gt; to build real-time data applications.&lt;/p&gt;

&lt;p&gt;We started with the Rust-powered, open source Python stream processor, &lt;a href="https://github.com/bytewax/bytewax"&gt;Bytewax&lt;/a&gt;, which debuted in February 2022 and is now a year and a half old. Since starting the project, we have grown and matured the Bytewax open source offering to include persistent state, different windowing configurations, and new operators for increased performance and scalability. We have also focused on improving the developer experience from integration to deployment with our deployment tool, &lt;a href="https://dev.to/docs/deployment/waxctl"&gt;waxctl&lt;/a&gt;, the ability to rescale without losing data stored in state, and the ability to connect to various input and output sources as well as build your own.&lt;/p&gt;

&lt;p&gt;We are excited to announce a new partner on our journey: M12/GitHub has invested in Bytewax to support further development of the open source project as well as the development of &lt;a href="https://dev.to/platform"&gt;&lt;strong&gt;the Bytewax Platform&lt;/strong&gt;&lt;/a&gt;, which will help businesses scale out their Bytewax usage, starting with features like disaster recovery, collaboration and observability tools, and a management layer.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;How Bytewax Supports AI and Real-Time Applications&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The world has moved into a new wave of computing where businesses power their operations and consumer interactions with AI. Sophisticated AI models require a real-time understanding of the world to make accurate decisions. Real-time ML, as it is often called, is when a system reacts to the inputs it receives in real-time with a decision powered by an ML model. Stream processing, and more importantly &lt;strong&gt;stream processing with a Python interface&lt;/strong&gt;, is pivotal for real-time ML because it transforms data into features for models.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;You can read more about real-time ML with Bytewax in &lt;a href="https://dev.to/blog/real-time-ml"&gt;our blog post here&lt;/a&gt;.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;There are many other use cases currently being powered by Bytewax from monitoring and reacting to &lt;a href="https://dev.to/blog/online-machine-learning-in-practice-interactive-dashboards-to-detect-data-anomalies-in-real-time"&gt;IoT sensors&lt;/a&gt; for vehicle fleets or across the energy grid, to monitoring &lt;a href="https://dev.to/blog/real-time-stock-prices-with-numpy"&gt;market data&lt;/a&gt; or analyzing &lt;a href="https://dev.to/blog/aws-anomaly-detection"&gt;infrastructure&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;New Investment: A Vote of Confidence&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Microsoft is known for its investments in &lt;a href="https://devblogs.microsoft.com/python/supporting-the-python-community/"&gt;Python&lt;/a&gt; and &lt;a href="https://blogs.microsoft.com/blog/2023/01/23/microsoftandopenaiextendpartnership/"&gt;AI&lt;/a&gt;, and for creating partnerships with pivotal developers and teams that are moving the industry forward. Their investment in Bytewax is a vote of confidence in the Bytewax vision and mission and in the importance of stream processing in the next wave of computing.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;“We believe that Zander and the Bytewax team are building a cutting edge tool that simplifies event and stream processing, and appreciate their thoughtful technical approach leveraging a Python framework to build highly scalable streaming dataflows” said Priyanka Mitra, Partner at M12 and co-founder of the M12 GitHub Fund. “We are impressed with their engagement of the open-source community and are committed to supporting Bytewax in accomplishing their mission, especially as they explore cutting edge AI and ML use cases” she added.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;&lt;strong&gt;Future Bytewax&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The Microsoft investment will help Bytewax establish a thriving community around the open source project and build out features for the paid platform to support adoption of the technology. We have been working to solve exceptionally hard problems like rescaling dataflows and cloud backup for disaster recovery as well as improving performance. We are excited to continue to bring features like these to Bytewax with a simple user interface and low complexity to support users across all stages of their journey.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Connect with us&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;We would love to hear from our users and any Python and streaming enthusiasts on how we can increase our support for workloads and Python development patterns. Please feel free to reach out via our &lt;a href="https://join.slack.com/t/bytewaxcommunity/shared_invite/zt-vkos2f6r-_SeT9pF2~n9ArOaeI3ND2w"&gt;Slack community&lt;/a&gt; or the &lt;a href="https://github.com/bytewax/bytewax"&gt;GitHub repo&lt;/a&gt;. We would also like to take this opportunity to thank our users, investors, and community for their continued support! If you like what we are building, please &lt;a href="https://github.com/bytewax/bytewax"&gt;⭐ the repo&lt;/a&gt; 😀.&lt;/p&gt;

</description>
      <category>investment</category>
      <category>streaming</category>
    </item>
    <item>
      <title>Data Parallel, Task Parallel, and Agent Actor Architectures</title>
      <dc:creator>Zander</dc:creator>
      <pubDate>Thu, 13 Jul 2023 19:26:35 +0000</pubDate>
      <link>https://dev.to/bytewax/data-parallel-task-parallel-and-agent-actor-architectures-dm6</link>
      <guid>https://dev.to/bytewax/data-parallel-task-parallel-and-agent-actor-architectures-dm6</guid>
      <description>&lt;h2&gt;
  
  
  Introduction:
&lt;/h2&gt;

&lt;p&gt;In the rapidly evolving world of data processing, understanding the various architectural approaches is pivotal to choosing the right tools for your specific needs. The three dominant architectures that have emerged—data parallel, task parallel, and agent actor—each offer unique strengths that cater to different types of data workloads.&lt;/p&gt;

&lt;p&gt;Data parallel architectures shine when large datasets need to be processed in parallel. This model divides data into smaller chunks, each processed independently but in the same manner on different workers or nodes. Apache Spark, a well-known data processing framework, uses this architecture. Spark's resilience, capacity for handling vast amounts of data, and ability to perform complex transformations make it a favorite in big data landscapes. Bytewax also follows this model with the same transformations happening on each worker, but on different data.&lt;/p&gt;

&lt;p&gt;On the other hand, task parallel architectures, as exemplified by Apache Flink and Dask, focus on executing different tasks concurrently across distributed systems. This approach is particularly effective for workflows with a wide variety of tasks that can be performed independently or have complex dependencies. Flink's stream-first philosophy provides robustness for real-time processing tasks, while Dask's flexibility makes it a great choice for parallel computing tasks in Python environments.&lt;/p&gt;

&lt;p&gt;Finally, the agent actor architecture, the foundation for Ray, presents a flexible and robust model for handling complex, stateful, and concurrent computations. In this model, "actors" encapsulate state and behavior, communicating through message passing. Ray's ability to scale from a single node to a large cluster makes it a popular choice for machine learning tasks.&lt;/p&gt;

&lt;p&gt;As we delve deeper into these architectures in the following sections, we will explore their pros and cons, use cases, and the unique features offered by Spark, Flink, Dask, Ray, and Bytewax. By understanding these architectures, you'll be better equipped to select the right framework for your next data processing venture. Stay tuned!&lt;/p&gt;

&lt;h2&gt;
  
  
  Data Parallel Architectures
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--ttZGQmfL--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/Data_parallelism_d1e340a7c5.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--ttZGQmfL--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/Data_parallelism_d1e340a7c5.png" alt="Data parallelism.png" width="776" height="886"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Data parallelism is a form of parallelization that distributes the data across different nodes, which operate independently of each other. Each node applies the same operation on its allocated subset of data. This approach is particularly effective when dealing with large datasets where the task can be divided and executed simultaneously, reducing computational time significantly.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Mechanism&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;In data parallel architectures, the dataset is split into smaller, more manageable chunks, or partitions. Each partition is processed independently by separate tasks running the same operation. This distribution is done in a way that each task operates on a different core or processor, enabling high-level parallel computation.&lt;/p&gt;
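&lt;p&gt;The mechanism above can be sketched in a few lines of Python. This is a minimal illustration rather than how Spark or Bytewax are implemented: the function names are hypothetical, and a thread pool stands in for the separate cores or nodes of a real cluster.&lt;/p&gt;

```python
from concurrent.futures import ThreadPoolExecutor


def partition(data, num_partitions):
    """Split data into num_partitions roughly equal chunks."""
    return [data[i::num_partitions] for i in range(num_partitions)]


def sum_of_squares(chunk):
    """The single operation that every worker applies to its own partition."""
    return sum(x * x for x in chunk)


def data_parallel_sum_of_squares(data, num_workers=4):
    chunks = partition(data, num_workers)
    # Threads stand in for the separate cores or nodes of a real cluster.
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partial_results = list(pool.map(sum_of_squares, chunks))
    # Aggregate the per-partition results into one answer.
    return sum(partial_results)


total = data_parallel_sum_of_squares(list(range(10)))
```

&lt;p&gt;Every worker runs the same &lt;code&gt;sum_of_squares&lt;/code&gt; function, and only the data it receives differs. The final aggregation step is where the communication overhead of this model tends to show up.&lt;/p&gt;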

&lt;p&gt;&lt;strong&gt;Advantages&lt;/strong&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Scalability:&lt;/strong&gt; Data parallel architectures are designed to handle large volumes of data. As data grows, you can simply add more nodes to the system to maintain performance.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Performance:&lt;/strong&gt; The ability to perform computations in parallel leads to significant speedups, particularly for large datasets and computationally intensive operations. Because data moves between workers less often, there can be an additional performance gain.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Simplicity:&lt;/strong&gt; Since the same operation is applied to each partition, this model is relatively simple to understand and implement.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;Disadvantages&lt;/strong&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Communication Overhead:&lt;/strong&gt; The nodes need to communicate with each other to synchronize and aggregate results, which can add overhead, particularly for large numbers of nodes.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Limited Use Cases:&lt;/strong&gt; Data parallelism works best when the same operation can be applied to all data partitions. It's less suitable for tasks that require complex interdependencies or shared state across tasks. As Spark shows, though, this limitation is not absolute.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;Best Use Cases&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Data parallel architectures excel in situations where large volumes of data need to be processed quickly and in a similar manner. Some of the best use cases include:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Batch Processing:&lt;/strong&gt; In scenarios where large amounts of data need to be processed all at once, data parallel architectures shine. This is a common use case in big data analytics, where massive datasets are processed in batch jobs.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Machine Learning:&lt;/strong&gt; Many machine learning algorithms, especially those that involve matrix operations, can be easily parallelized. For instance, when training a neural network, each worker can compute gradients on a different slice of the training data, and the gradients are then combined to update the weights. This makes data parallelism a great fit.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Highly Partitioned Input and Output:&lt;/strong&gt; Data parallel frameworks excel when the input and output are partitioned in such a way that the workers can evenly match the partitions and redistribution of the data is limited.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Stream Processing:&lt;/strong&gt; The data parallelism approach is well suited to stream processing where the same operation is happening to data in real-time.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Apache Spark, a notable data parallel framework, is widely used in big data analytics for tasks like ETL (Extract, Transform, Load), predictive analytics, and data mining. It's particularly known for its ability to perform complex data transformations and aggregations across large datasets.&lt;/p&gt;

&lt;p&gt;Bytewax is known for its ability to handle large continuous streams of data and perform complex transformations on them in real-time.&lt;/p&gt;

&lt;p&gt;As we continue our exploration into the different data processing architectures, we'll see how other approaches handle tasks that might not be as suitable for data parallel processing.&lt;/p&gt;

&lt;h2&gt;
  
  
  Task Parallel Architectures: Unlocking Concurrent Processing
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--vkVYkOoH--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/Task_parallelism_4cc6d9f034.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--vkVYkOoH--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/Task_parallelism_4cc6d9f034.png" alt="Task parallelism.png" width="776" height="886"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Task parallelism, also known as function parallelism, is an architectural approach that focuses on distributing tasks, rather than data, across different processing units. Each of these tasks can be a separate function or a method operating on different data or performing different computations. This type of parallelism is a great fit for problems where different operations can be performed concurrently on the same or different data.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Mechanism&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;In a task parallel model, the focus is on concurrent execution of many different tasks that are part of a larger computation. These tasks can be independent, or they can have defined dependencies and need to be executed in a certain order. The tasks are scheduled and dispatched to different processors in the system, enabling parallel execution.&lt;/p&gt;
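&lt;p&gt;As a rough sketch of this idea, the example below dispatches two different tasks concurrently and then runs a third task that depends on both. The function names and numbers are made up for illustration, and a thread pool stands in for the scheduler of a real framework such as Dask or Flink.&lt;/p&gt;

```python
from concurrent.futures import ThreadPoolExecutor


def load_orders():
    """First independent task (hypothetical data)."""
    return [12.0, 30.5, 7.5]


def load_refunds():
    """Second independent task, doing different work on different data."""
    return [5.0]


def net_revenue(orders, refunds):
    """Dependent task: it needs the results of both tasks above."""
    return sum(orders) - sum(refunds)


def run_pipeline():
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Two different, independent tasks are dispatched concurrently.
        orders_future = pool.submit(load_orders)
        refunds_future = pool.submit(load_refunds)
        # The final task depends on both results, so it waits for them.
        return net_revenue(orders_future.result(), refunds_future.result())


revenue = run_pipeline()
```

&lt;p&gt;Unlike the data parallel case, each worker here runs a different function. The dependency between tasks is what a task scheduler has to track, and it is the source of the scheduling complexity discussed below.&lt;/p&gt;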

&lt;p&gt;&lt;strong&gt;Advantages&lt;/strong&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Diverse Workloads:&lt;/strong&gt; Task parallel architectures excel in scenarios where the problem can be broken down into a variety of tasks that can be executed in parallel.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Flexibility:&lt;/strong&gt; Since tasks don't necessarily need to operate on the same data or perform the same operation, this model offers a high level of flexibility.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Efficiency:&lt;/strong&gt; Task parallelism can lead to improved resource utilization, as tasks can be scheduled to keep all processors busy.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;Disadvantages&lt;/strong&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Complexity:&lt;/strong&gt; Managing and scheduling tasks, especially when there are dependencies, can add complexity to the system.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Inter-task Communication:&lt;/strong&gt; Tasks often need to communicate with each other to synchronize or to pass data, which can lead to overhead and can be a challenge for performance.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;Best Use Cases&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Task parallel architectures are best suited to problems that can be broken down into discrete tasks that can run concurrently. This includes:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Complex Computations:&lt;/strong&gt; Scenarios where a complex problem can be broken down into a number of separate tasks, such as simulations or optimization problems, are a good fit for task parallel architectures.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Real-Time Processing On Diverse Datasets:&lt;/strong&gt; Task parallel architectures are often used in systems that require real-time processing and low latency, such as stream processing systems.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Apache Flink is an excellent example of a system that uses a task parallel architecture. Flink is designed for stream processing, where real-time results are of utmost importance. It breaks down stream processing into a number of tasks that can be executed in parallel, providing low-latency and high-throughput processing of data streams.&lt;/p&gt;

&lt;p&gt;Similarly, Dask is a flexible library for parallel computing in Python that uses task scheduling for complex computations. Dask allows you to parallelize and distribute computation by breaking it down into smaller tasks, making it a popular choice for tasks that go beyond the capabilities of typical data parallel tools.&lt;/p&gt;

&lt;p&gt;In the next section, we'll explore the agent actor model, a different approach to managing concurrency and state that opens up new possibilities for parallel computation.&lt;/p&gt;

&lt;h2&gt;
  
  
  Agent Actor Architectures: Pioneering Concurrent Computations
&lt;/h2&gt;

&lt;p&gt;Agent actor architectures introduce a fundamentally different approach to handling parallel computations, particularly for problems that involve complex, stateful computations. This approach builds on task parallelism with the addition of an actor. An actor is a computational entity that, in response to a message it receives, can concurrently: make local decisions, create more actors, send more messages, and determine how to respond to the next message received. In that sense, agent actor systems resemble task-distributed or function-distributed systems.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Mechanism&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;In the agent actor model, actors are the universal primitives of concurrent computation. Upon receiving a message, an actor can change its local state, send messages to other actors, or create new actors. Actors encapsulate their state, avoiding common pitfalls of multithreaded programming such as race conditions. Actor systems are inherently message-driven and can be distributed across many nodes, making them highly scalable.&lt;/p&gt;
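&lt;p&gt;A toy version of this model can be written with the Python standard library. The sketch below is not Ray's API: &lt;code&gt;CounterActor&lt;/code&gt; and its message names are hypothetical, and a queue plus one thread play the roles of the mailbox and the actor's own execution context.&lt;/p&gt;

```python
import queue
import threading


class CounterActor:
    """A toy actor: private state, a mailbox, and one message at a time."""

    def __init__(self):
        self._count = 0  # state is only touched by the actor's own thread
        self._mailbox = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def send(self, message, reply_to=None):
        """Deliver a message asynchronously; the caller does not block."""
        self._mailbox.put((message, reply_to))

    def _run(self):
        # Messages are handled strictly one after another, so no lock is needed.
        while True:
            message, reply_to = self._mailbox.get()
            if message == "increment":
                self._count += 1
            elif message == "get":
                reply_to.put(self._count)
            elif message == "stop":
                break

    def join(self):
        self._thread.join()


actor = CounterActor()
# Four concurrent senders each deliver 100 increment messages.
senders = [
    threading.Thread(target=lambda: [actor.send("increment") for _ in range(100)])
    for _ in range(4)
]
for sender in senders:
    sender.start()
for sender in senders:
    sender.join()

reply = queue.Queue()
actor.send("get", reply_to=reply)
final_count = reply.get(timeout=5)
actor.send("stop")
actor.join()
```

&lt;p&gt;Even though four threads send messages at the same time, the counter is never corrupted, because only the actor's own thread reads or writes &lt;code&gt;_count&lt;/code&gt;.&lt;/p&gt;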

&lt;p&gt;&lt;strong&gt;Advantages&lt;/strong&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Concurrent State Management:&lt;/strong&gt; Actors provide a safe way to handle mutable state in a concurrent system. Since each actor processes messages sequentially and has isolated state, there is no need for locks or other synchronization mechanisms.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Scalability:&lt;/strong&gt; Actor systems are inherently distributed and can easily scale out across many nodes.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Fault Tolerance:&lt;/strong&gt; Actor systems can be designed to be resilient with self-healing capabilities. If an actor fails, it can be restarted, and messages it was processing can be redirected to other actors.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;Disadvantages&lt;/strong&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Complexity:&lt;/strong&gt; Building systems with the actor model can be more complex than traditional paradigms due to the asynchronous and distributed nature of actors.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Message Overhead:&lt;/strong&gt; Communication between actors is done with messages, which can lead to overhead, especially in systems with a large number of actors.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;Best Use Cases&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Agent actor architectures are best suited for problems that involve complex, stateful computations and require high levels of concurrency. This includes:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Real-time Systems:&lt;/strong&gt; The actor model is well suited for real-time systems where you need to process high volumes of data concurrently, such as trading systems or real-time analytics.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Distributed Systems:&lt;/strong&gt; The actor model can be a good fit for building distributed systems where you need to manage state across multiple nodes, like IoT systems or multiplayer online games.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Ray is an example of a system that employs the actor model. It was designed to scale Python applications from a single node to a large cluster, and it's commonly used for machine learning tasks, which often require complex, stateful computations.&lt;/p&gt;

&lt;p&gt;As we've seen, the landscape of data processing architectures is rich and diverse, with each model offering unique strengths and potential challenges. Whether it's data parallel, task parallel, or agent actor, the choice of architecture will depend largely on the nature of the data workload and the specific requirements of the system you're building.&lt;/p&gt;

</description>
      <category>architecture</category>
      <category>data</category>
      <category>actors</category>
    </item>
    <item>
      <title>Reasoning about Streaming vs Batch with a Case Study from GitHub</title>
      <dc:creator>Zander</dc:creator>
      <pubDate>Thu, 15 Jun 2023 20:26:32 +0000</pubDate>
      <link>https://dev.to/bytewax/reasoning-about-streaming-vs-batch-with-a-case-study-from-github-5g3l</link>
      <guid>https://dev.to/bytewax/reasoning-about-streaming-vs-batch-with-a-case-study-from-github-5g3l</guid>
      <description>&lt;p&gt;&lt;em&gt;If you prefer videos check out Zander's talk at Data Council 2023 &lt;a href="//youtu.be/qJ3PWyx7w2Q"&gt;"When to Move from Batch to Streaming and how to do it without hiring an entirely new team"&lt;/a&gt;&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;The world of data processing is undergoing a significant shift, moving towards real-time processing. Despite an increase in understanding that shifting workloads to real-time can increase ROI and lower costs, there isn't consensus in the industry around how to best transition workloads to real-time and what the best tools are for different types of real-time workloads. While traditional analytical tools such as data warehouses, business intelligence layers, and metrics are widely accepted and understood, the concept of real-time data processing and the technologies that enable it are not as widely recognized or agreed upon.&lt;/p&gt;

&lt;p&gt;In this post, we aim to demystify real-time data processing, discussing its relevance within an organization, the different types of real-time workloads, and some real-world examples from my time at GitHub. But first, let's clarify some definitions.&lt;/p&gt;

&lt;h2&gt;
  
  
  Understanding Real-Time and Stream Processing
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--MAmdy1fh--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/real_time_53cfa27cf7.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--MAmdy1fh--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/real_time_53cfa27cf7.jpg" alt="real-time.jpg" width="800" height="420"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;What is Real-Time? "Real-Time" refers to anything that is perceived to happen in real time by a human - an admittedly fuzzy definition. Quantitatively, this usually refers to processes that happen in the sub-second realm. Interestingly, based on this definition, real-time data processing can actually occur with both batch and stream processing technologies depending on the end-to-end latency.&lt;/p&gt;

&lt;p&gt;Stream processing refers to processing one datum at a time as it flows in a continuous stream, while batch processing is when you gather a batch of data and process it all at once. By progressively reducing the size of the batch, we can edge closer to real-time processing. This is precisely what technologies like Spark's Structured Streaming do with micro-batches.&lt;/p&gt;
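&lt;p&gt;The difference can be illustrated with a small, self-contained sketch. The functions below are hypothetical and do not use the Spark or Bytewax APIs. They compute a running total over the same events, once per event and once per micro-batch.&lt;/p&gt;

```python
from itertools import islice


def process_stream(events):
    """Stream style: emit an updated running total after every event."""
    total = 0
    for event in events:
        total += event
        yield total


def process_micro_batches(events, batch_size):
    """Micro-batch style: gather batch_size events, then emit one result."""
    iterator = iter(events)
    total = 0
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        total += sum(batch)
        yield total


events = [3, 1, 4, 1, 5, 9]
per_event = list(process_stream(events))
per_batch = list(process_micro_batches(events, batch_size=3))
one_batch = list(process_micro_batches(events, batch_size=len(events)))
```

&lt;p&gt;With a batch size of 1, the micro-batch version produces the same outputs as the streaming version. With a batch size equal to the whole input, it behaves like a classic batch job that reports a single result at the end.&lt;/p&gt;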

&lt;p&gt;Now that we have some definitions out of the way, let's dive into real-time processing and the different types of real-time workloads.&lt;/p&gt;

&lt;h2&gt;
  
  
  The Relevance of Real-Time Data Processing
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--DOxOtcp---/c_limit%2Cf_auto%2Cfl_progressive%2Cq_66%2Cw_800/https://images.production.bytewax.io/tenor_45419acb5c.gif" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--DOxOtcp---/c_limit%2Cf_auto%2Cfl_progressive%2Cq_66%2Cw_800/https://images.production.bytewax.io/tenor_45419acb5c.gif" alt="tenor.gif" width="576" height="576"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In our day-to-day lives, we're constantly receiving and processing information in real-time. Consider driving a car - an activity that requires processing multiple inputs and making decisions in real-time. If we were to approach driving in a batch processing manner, waiting to gather information for a duration and then trying to forecast the next 15 seconds, it would likely end in disaster. As another example, imagine a sport like basketball. At each moment, the players are receiving tens or hundreds of inputs and reacting to them in real-time. A non-real-time version, in which each player waited a few seconds to collect input before reacting, would not be nearly as exciting to watch or play.&lt;/p&gt;

&lt;p&gt;These examples help to highlight why we might choose to process things in real-time. In the context of driving, we're making decisions that could potentially be a matter of life or death. And in our basketball example, the real-time processing elevates the user experience. However, while these examples provide some understanding, they don't necessarily help us generalize the concept.&lt;/p&gt;

&lt;h2&gt;
  
  
  Types of Real-Time Workloads
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--KablIMzo--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/analytical_vs_operational_49781ffe70.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--KablIMzo--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/analytical_vs_operational_49781ffe70.png" alt="analytical vs operational" width="800" height="450"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;We can broadly categorize real-time processing into two types of workloads: analytical and operational.&lt;/p&gt;

&lt;h3&gt;
  
  
  Analytical workloads
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--UTlf0zho--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/linked_in_views_71137c4795.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--UTlf0zho--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/linked_in_views_71137c4795.png" alt="linked-in-views.png" width="800" height="375"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Analytical workloads require low latency, freshness, and the capability of retrieval at scale. Real-time analytical workloads must be queryable. A good example of this is LinkedIn's profile view notification. When you click into the profile view notification, you're taken to a page that shows your profile views history, all the way up to the most recent data. This demonstrates the freshness of data and the ability to query it as you can filter and interact with the data, querying the freshest data.&lt;/p&gt;

&lt;p&gt;Another example of a real-time analytical workload is an Instacart order. When you place an order on Instacart, you can go into your order and see the updated Estimated Time of Arrival (ETA). This is another instance of an analytical real-time workload where the user is interacting with analytical data in real-time.&lt;/p&gt;

&lt;h3&gt;
  
  
  Operational workloads
&lt;/h3&gt;

&lt;p&gt;Operational workloads, on the other hand, require low latency and freshness, but they also need to be reactive. This means that some of the decision-making or business logic is embedded inline in the system. For example, in a streaming use case, this would be inside the stream processor. The data is received, transformed, and then a decision is made in an online fashion. Bytewax is a great example of a framework that can be used to make real-time decisions for operational workloads.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--eITPBuJE--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/fraud_4cc560b3e9.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--eITPBuJE--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/fraud_4cc560b3e9.png" alt="fraud.png" width="236" height="512"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;A good example of operational processing is fraud detection. The fraud detection system takes all the inputs in real time and evaluates them without a human in the loop. It then decides what action to take and communicates with the user to confirm whether its suspicion of fraud is correct.&lt;/p&gt;

&lt;p&gt;Another example in financial markets is high-frequency trading. The software system consumes inputs from a variety of different data sources, processes them in real time, and then makes a decision whether to buy or sell. The speed of making that decision is a key factor in this context.&lt;/p&gt;

&lt;h3&gt;
  
  
  Analytical vs Operational
&lt;/h3&gt;

&lt;p&gt;One more aspect I want to touch on here is the difference between having a human in the loop and having a machine in the loop. Across the examples above, a human tends to be more involved in analytical workloads and less involved, or not involved at all, in operational ones.&lt;/p&gt;

&lt;p&gt;To summarize, if there is a situation where you believe there's value to be derived and there's a human in the loop, there's probably a subset of tools within the real-time space that fall under the analytical workload. If you're building something like an algorithmic trading system, where you believe that there's no requirement for a human in the loop, you're more likely to fall under the operational category, and you should look at tools, like Bytewax, that support operational processing.&lt;/p&gt;

&lt;h1&gt;
  
  
  Case Studies: GitHub's Real-Time Data Processing Decisions
&lt;/h1&gt;

&lt;p&gt;Let's make things more concrete by discussing a couple of case studies involving decisions we made at GitHub concerning real-time data processing.&lt;/p&gt;

&lt;h2&gt;
  
  
  Trending Repositories and Developers: A Batch Processing Approach
&lt;/h2&gt;

&lt;p&gt;The team I was a part of at GitHub was responsible for several data products that were featured on github.com, including Trending Repositories and Trending Developers. These features were located on the GitHub Explore page and aimed to identify trending repositories and developers based on a variety of metrics, such as stars, forks, and views. We had access to this data in real time through a streaming platform (Kafka) managed by another team.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--003vyP93--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/trending_1bd345b99f.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--003vyP93--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/trending_1bd345b99f.png" alt="trending.png" width="512" height="357"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Although we had the capacity to implement these as real-time features, we decided against it. Our team primarily consisted of data scientists and machine learning engineers who hadn't worked with streaming platforms or stream processors before. Moreover, these features were new products, and we didn't know how impactful they would be or whether users would find them valuable and engage with them repeatedly.&lt;/p&gt;

&lt;p&gt;Instead of building these features on real-time data, we decided to process the data in batches. We would run nightly queries against Presto, where the data landed from Kafka, then store the processed data in a MySQL database for retrieval from github.com. These features were not real-time workloads, but they could have been. Had we determined that they would be valuable as real-time data products, they would have served as excellent examples of analytical use cases.&lt;/p&gt;

&lt;h2&gt;
  
  
  Star Spam Detection: A Real-Time Processing Solution
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--rgcu5vPH--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/star_history_c2a1668eae.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--rgcu5vPH--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/star_history_c2a1668eae.png" alt="star-history.png" width="512" height="348"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Another task we undertook was star spam detection. The concept of "stars" on GitHub repositories is used as a proxy to gauge the health and utility of the project. If we were unable to detect star spammers, it would degrade the platform's value for users, potentially leading to a downward spiral in the platform's overall value.&lt;/p&gt;

&lt;p&gt;We decided to tackle this problem in real time to limit users' exposure to star spam and the resulting degradation of the platform. The data was available in Kafka and could therefore be consumed as it arrived. Based on certain criteria, users could be flagged as spammers so that action could be taken. Once a user was flagged, they were submitted for human review to decide on the next steps. This is an excellent example of operational processing.&lt;/p&gt;
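
&lt;p&gt;To make "based on certain criteria" a little more concrete, here is a minimal TypeScript sketch of one possible rule: flag any user who stars more than a set number of repositories within a short time window. Everything in it is an assumption made for illustration (the &lt;code&gt;StarEvent&lt;/code&gt; shape, the &lt;code&gt;flagSuspects&lt;/code&gt; function, the window and the threshold). It is not the set of criteria used at GitHub, and in practice this logic would run inside a stream processor consuming from Kafka rather than over an in-memory array.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;// Hypothetical event shape; the real schema is not described in this article.
type StarEvent = { userId: string; timestampMs: number }

// Assumed rule, for illustration only: flag a user once they have starred
// more than `maxStars` repositories within the last `windowMs` milliseconds.
// Events are expected in timestamp order, as they would arrive from a stream.
function flagSuspects(events: StarEvent[], windowMs: number, maxStars: number): string[] {
  const recentByUser: { [userId: string]: number[] } = {}
  const flagged: string[] = []
  for (const event of events) {
    // Keep only this user's earlier stars that still fall inside the window.
    const recent = (recentByUser[event.userId] ?? []).filter(
      (t) => t > event.timestampMs - windowMs
    )
    recent.push(event.timestampMs)
    recentByUser[event.userId] = recent
    if (recent.length > maxStars) {
      if (!flagged.includes(event.userId)) flagged.push(event.userId)
    }
  }
  return flagged
}

const events: StarEvent[] = [
  { userId: 'alice', timestampMs: 0 },
  { userId: 'bob', timestampMs: 0 },
  { userId: 'alice', timestampMs: 1000 },
  { userId: 'alice', timestampMs: 2000 },
  { userId: 'alice', timestampMs: 3000 },
  { userId: 'bob', timestampMs: 600000 }
]

// Four stars from alice within one minute exceed the assumed limit of three.
const suspects = flagSuspects(events, 60000, 3)
console.log(suspects)

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;In this sketch only &lt;code&gt;alice&lt;/code&gt; is flagged. Flagged users would then go to human review, as described above, rather than being acted on automatically.&lt;/p&gt;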

&lt;h1&gt;
  
  
  The Impact of Real-Time Processing Decisions
&lt;/h1&gt;

&lt;p&gt;The point is that the decision to implement real-time processing can significantly affect a project's return on investment, and this relationship should be considered carefully. If we had decided to make the trending feature real-time, it would have been even more important to protect the platform's value by detecting star spam as close to real time as possible.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--8KNcxPFc--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/Copy_of_Zander_Data_Council_2023_1_c28bfe1372.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--8KNcxPFc--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/Copy_of_Zander_Data_Council_2023_1_c28bfe1372.png" alt="roi and latency.png" width="800" height="450"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The value of data often degrades over time, and while it's usually depicted as a sharp decline (see graph on the left), most projects or data tend to follow more of an S-curve (on the right). After a certain point, the return or value of the data caps out with respect to latency. In these case studies, neither project saw an exponential increase in return on investment as latency was reduced, and we were able to tackle star spammers on a timescale of hours rather than milliseconds. This demonstrates that not all data projects need to move towards zero latency to provide significant value.&lt;/p&gt;

&lt;p&gt;If you are interested in moving some of your workloads to real time and are not sure where to start, please reach out to us in &lt;a href="https://join.slack.com/t/bytewaxcommunity/shared_invite/zt-1lhq9bxbr-T3CXxR_9RIUGb4qcBK26Qw"&gt;our Slack channel&lt;/a&gt;, and we would be happy to help you figure out whether the value is there and where to begin.&lt;/p&gt;

</description>
      <category>python</category>
      <category>streaming</category>
      <category>opensource</category>
    </item>
    <item>
      <title>Bytewax v0.16.2 is out!</title>
      <dc:creator>Oli Makhasoeva</dc:creator>
      <pubDate>Thu, 08 Jun 2023 20:35:31 +0000</pubDate>
      <link>https://dev.to/bytewax/bytewax-v0162-is-out-1dbb</link>
      <guid>https://dev.to/bytewax/bytewax-v0162-is-out-1dbb</guid>
      <description>&lt;p&gt;🎉 Exciting News from Bytewax! 🎉&lt;/p&gt;

&lt;p&gt;We're thrilled to announce the release of Bytewax v0.16.2!&lt;/p&gt;

&lt;p&gt;Firstly, support for Windows builds is here! 🖥️&lt;/p&gt;

&lt;p&gt;This is a significant step forward, not only because it makes Bytewax more accessible to developers across different platforms but also because we're particularly excited to welcome the first contribution from a member of our community, Jim Zhang &lt;a href="https://github.com/bytewax/bytewax/pull/249"&gt;@zzl221000&lt;/a&gt;!&lt;/p&gt;

&lt;p&gt;A big shout-out to Jim!!!&lt;/p&gt;

&lt;p&gt;In addition to Windows support, v0.16.2 also introduces a CSVInput subclass of FileInput, further expanding the versatility of Bytewax.&lt;/p&gt;

&lt;p&gt;Here's a quick rundown of what's changed in this release:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;a href="https://github.com/bytewax/bytewax/pull/244"&gt;PyO3 has been updated by @whoahbot&lt;/a&gt;.&lt;/li&gt;
&lt;li&gt;&lt;a href="https://github.com/bytewax/bytewax/pull/245"&gt;Added a _CSVSource and CSVInput subclass of FileInput by @awmatheson&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://github.com/bytewax/bytewax/pull/247"&gt;Fixed an encoder issue by @Psykopear&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://github.com/bytewax/bytewax/pull/249"&gt;Windows build support by @zzl221000&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;We're OSS and incredibly grateful for the community's contributions! &lt;a href="https://join.slack.com/t/bytewaxcommunity/shared_invite/zt-1lhq9bxbr-T3CXxR_9RIUGb4qcBK26Qw"&gt;Share&lt;/a&gt; what you're building with Bytewax, and happy coding! 🚀 Check out the changes on &lt;a href="https://github.com/bytewax/bytewax/"&gt;our GitHub&lt;/a&gt;.&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Bytewax at Data Science Summit. Interactive Dashboards To Detect Data Anomalies In Real Time</title>
      <dc:creator>Oli Makhasoeva</dc:creator>
      <pubDate>Wed, 24 May 2023 21:41:03 +0000</pubDate>
      <link>https://dev.to/bytewax/bytewax-at-data-science-summit-interactive-dashboards-to-detect-data-anomalies-in-real-time-5e3c</link>
      <guid>https://dev.to/bytewax/bytewax-at-data-science-summit-interactive-dashboards-to-detect-data-anomalies-in-real-time-5e3c</guid>
      <description>&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--NOnrBjg1--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/jdl0zu6hnx1png37c1gt.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--NOnrBjg1--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/jdl0zu6hnx1png37c1gt.png" alt="talk invite" width="800" height="800"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Data Science Summit
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://dssconf.pl/en/"&gt;Data Science Summit&lt;/a&gt; is the largest and oldest independent data science conference in the CEE region. This year, we are joining them online, and our CEO, Zander Matheson, is presenting! For the sixth time, Data Science Summit is sharing knowledge on topics ranging from analysis and processing (including big data) and implementation issues to visualisation (BI) and management. This year's edition of the most important Data Science event in Poland is dedicated to Machine Learning!&lt;/p&gt;

&lt;p&gt;10 tracks, 100+ talks, the agenda is packed with cutting-edge insights 💡&lt;/p&gt;

&lt;p&gt;🎟️ Use code DSSML23RP20 until 09.06.2023 to grab a Standard or PRO ticket at a 20% discount&lt;/p&gt;

&lt;p&gt;Here are details of the talk Zander is presenting:&lt;/p&gt;

&lt;h2&gt;
  
  
  Interactive dashboards to detect data anomalies in real time
&lt;/h2&gt;

&lt;p&gt;Join Zander for a technical exploration of crafting interactive dashboards that employ online machine learning algorithms for real-time anomaly detection across hundreds of sensors. He will guide you through how to set up a development environment with a streaming system (Kafka or similar), load sensor data to the streaming system with Bytewax, and write a dataflow using River that will transform the data and use different anomaly detection algorithms to determine if there are anomalies in the sensor data. The icing on the cake? Visualize all these complex processes on a dynamic, real-time dashboard using Rerun! Equip yourself with the tools and knowledge to monitor and react to data anomalies as they happen. Come, experience the power of Python in data anomaly detection and interactive visualization in real time!&lt;/p&gt;

&lt;p&gt;If this abstract sounds interesting, you might want to check out these blog posts: &lt;a href="https://bytewax.io/blog/data-visualization-with-rerun"&gt;Real-Time Anomaly Detection Visualization with Bytewax and Rerun&lt;/a&gt; and &lt;a href="https://bytewax.io/blog/online-machine-learning-iot#online-machine-learning-in-python"&gt;Online Machine Learning for IoT&lt;/a&gt;. The talk is going to go beyond these, but it covers the same domains.&lt;/p&gt;

&lt;p&gt;We are looking forward to exchanging knowledge, sharing our ideas, and learning from the experiences of other attendees and speakers. Stay tuned for updates from &lt;a href="https://ml.dssconf.pl/en/"&gt;the conference!&lt;/a&gt;&lt;/p&gt;

</description>
      <category>conference</category>
      <category>datascience</category>
    </item>
    <item>
      <title>Easy yet flexible way to display child routes in tabs with Vue 3</title>
      <dc:creator>Oli Makhasoeva</dc:creator>
      <pubDate>Tue, 09 May 2023 18:22:17 +0000</pubDate>
      <link>https://dev.to/bytewax/easy-yet-flexible-way-to-display-child-routes-in-tabs-with-vue-3-2ng8</link>
      <guid>https://dev.to/bytewax/easy-yet-flexible-way-to-display-child-routes-in-tabs-with-vue-3-2ng8</guid>
      <description>&lt;p&gt;Hello, I'm Konrad Sieńkowski and I am a front-end developer &amp;amp; UI designer here at Bytewax. I want to share with you something that I worked on recently. In this article, I'll walk through the steps to set up a new Vue application, configure the router for nested routes, create the AppTabs.vue component, and customize your tabs using route meta fields for labels and icons. By the end, you'll know how to make an easy yet flexible solution for displaying child routes in tabs. So, let's dive in!&lt;/p&gt;

&lt;p&gt;&lt;em&gt;For those eager to dive in, check out the &lt;a href="https://github.com/konradsienkowski/vue-3-child-route-tabs/"&gt;project repository&lt;/a&gt; on GitHub.&lt;/em&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Prerequisites
&lt;/h2&gt;

&lt;p&gt;First of all, we're going to create a fresh, new application using &lt;code&gt;npm init vue@latest&lt;/code&gt;. The &lt;code&gt;create-vue&lt;/code&gt; tool is going to ask you about including optional features in the project. The only one required for this tutorial is &lt;strong&gt;Vue Router&lt;/strong&gt;. I chose TypeScript &amp;amp; Prettier as well, but that's up to your personal preferences.&lt;/p&gt;

&lt;h2&gt;
  
  
  Preparing routes &amp;amp; structure
&lt;/h2&gt;

&lt;p&gt;Once you've followed the instructions for installing dependencies and running the app, you can start customizing the application. My first step was to simplify &lt;code&gt;App.vue&lt;/code&gt; a bit:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;&amp;lt;template&amp;gt;
  &amp;lt;nav&amp;gt;
    &amp;lt;RouterLink to="/"&amp;gt;Home&amp;lt;/RouterLink&amp;gt;
    &amp;lt;RouterLink to="/tabs"&amp;gt;Tabs demo&amp;lt;/RouterLink&amp;gt;
  &amp;lt;/nav&amp;gt;

  &amp;lt;RouterView /&amp;gt;
&amp;lt;/template&amp;gt;

&amp;lt;script setup lang="ts"&amp;gt;
import { RouterLink, RouterView } from 'vue-router'
&amp;lt;/script&amp;gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Since we're focusing on nested/child routes in this article, there's no need to spend much time on the homepage. I've also renamed the default &lt;code&gt;AboutView.vue&lt;/code&gt; to &lt;code&gt;TabsView.vue&lt;/code&gt; and created a bunch of example views in &lt;code&gt;views/tabs&lt;/code&gt;, called &lt;code&gt;TabsAbout.vue&lt;/code&gt;, &lt;code&gt;TabsBlog.vue&lt;/code&gt;, &lt;code&gt;TabsContact.vue&lt;/code&gt;, and &lt;code&gt;TabsRelated.vue&lt;/code&gt;. We're going to include them in our routes structure in the next step.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;- views
-- tabs
--- TabsAbout.vue
--- TabsBlog.vue
--- TabsContact.vue
--- TabsRelated.vue
-- HomeView.vue
-- TabsView.vue

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As we have a simple structure for our views/pages, now it's time to include them in router configuration. Let's open &lt;code&gt;router/index.ts&lt;/code&gt; now and adjust it to our needs:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import { createRouter, createWebHistory } from 'vue-router'
import HomeView from '../views/HomeView.vue'

const router = createRouter({
  history: createWebHistory(import.meta.env.BASE_URL),
  routes: [
    {
      path: '/',
      name: 'home',
      component: HomeView
    },
    {
      path: '/tabs',
      name: 'tabs',
      component: () =&amp;gt; import('../views/TabsView.vue'),
      children: [
        {
          name: 'about',
          path: '',
          component: () =&amp;gt; import('../views/tabs/TabsAbout.vue'),
        },
        {
          name: 'blog',
          path: 'blog',
          component: () =&amp;gt; import('../views/tabs/TabsBlog.vue'),
        },
        {
          name: 'contact',
          path: 'contact',
          component: () =&amp;gt; import('../views/tabs/TabsContact.vue'),
        },
        {
          name: 'related',
          path: 'related',
          component: () =&amp;gt; import('../views/tabs/TabsRelated.vue'),
        },
      ]
    }
  ]
})

export default router

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, our application has nested/children routes which we can use to display tabs in the component.&lt;/p&gt;

&lt;h2&gt;
  
  
  Tabs component
&lt;/h2&gt;

&lt;p&gt;In this step, we're going to create our tab component, include it in the first-level route view and then extend it with additional features. First of all, we're going to create a file called &lt;code&gt;AppTabs.vue&lt;/code&gt; in the &lt;code&gt;components&lt;/code&gt; directory. Since our component is going to be flexible and might be used in different routes, we're following the &lt;a href="https://v2.vuejs.org/v2/style-guide/?redirect=true#Base-component-names-strongly-recommended"&gt;Vue naming convention&lt;/a&gt; for base components.&lt;/p&gt;

&lt;p&gt;Let's start with the &lt;code&gt;&amp;lt;script setup&amp;gt;&lt;/code&gt; section. We use the &lt;code&gt;useRouter()&lt;/code&gt; composable there to access the router instance, and then use it to define the &lt;code&gt;tabs&lt;/code&gt; computed property.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;&amp;lt;script setup lang="ts"&amp;gt;
import { computed, type ComputedRef } from 'vue'
import { useRouter, RouterView, type RouteRecordRaw } from 'vue-router'

// Use children routes for the tabs
const router = useRouter()
const tabs: ComputedRef&amp;lt;RouteRecordRaw[] | undefined&amp;gt; = computed(() =&amp;gt; {
  const currentRoute = router.currentRoute.value.name
  return router.options.routes?.find(
    (route) =&amp;gt;
      route.name === currentRoute || route.children?.find((child) =&amp;gt; child.name === currentRoute)
  )?.children
})
&amp;lt;/script&amp;gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
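
&lt;p&gt;To see what this lookup returns without running the app, the sketch below reproduces the same search over plain objects. It is a simplified illustration rather than part of the component: &lt;code&gt;SimpleRoute&lt;/code&gt; and &lt;code&gt;findTabs&lt;/code&gt; are hypothetical names standing in for vue-router's &lt;code&gt;RouteRecordRaw&lt;/code&gt; and the &lt;code&gt;tabs&lt;/code&gt; computed property, and route names are assumed to be strings.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;// Hypothetical, simplified stand-in for vue-router's RouteRecordRaw.
type SimpleRoute = { name: string; children?: SimpleRoute[] }

// Same idea as the computed property: find the top-level route that either
// is the current route or contains it as a child, then return its children.
function findTabs(routes: SimpleRoute[], currentName: string): SimpleRoute[] | undefined {
  return routes.find(
    (route) =>
      route.name === currentName ||
      (route.children ?? []).some((child) => child.name === currentName)
  )?.children
}

const routes: SimpleRoute[] = [
  { name: 'home' },
  { name: 'tabs', children: [{ name: 'about' }, { name: 'blog' }] }
]

// Looking up a child route returns all of its siblings as tabs.
const tabNames = (findTabs(routes, 'blog') ?? []).map((tab) => tab.name)
console.log(tabNames)

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Whether the current route is the parent (&lt;code&gt;tabs&lt;/code&gt;) or one of its children (&lt;code&gt;blog&lt;/code&gt;), the function returns the same list of sibling routes, while a route without children (&lt;code&gt;home&lt;/code&gt;) yields &lt;code&gt;undefined&lt;/code&gt;.&lt;/p&gt;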



&lt;p&gt;After getting the current route name from the &lt;code&gt;router.currentRoute&lt;/code&gt; property, we look for it in the routes array (among the top-level routes and their children) and return the child routes of the matching top-level route. Now it's time to include it in the component template:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;&amp;lt;template&amp;gt;
  &amp;lt;div class="tabs" v-if="tabs"&amp;gt;
    &amp;lt;nav class="tabs__nav"&amp;gt;
      &amp;lt;RouterLink
        v-for="tab in tabs"
        :key="tab.name"
        class="tabs__nav-item"
        :to="{ name: tab.name }"
      &amp;gt;
        {{ tab.name }}
      &amp;lt;/RouterLink&amp;gt;
    &amp;lt;/nav&amp;gt;
    &amp;lt;div class="tabs__wrapper"&amp;gt;
      &amp;lt;RouterView v-slot="{ Component }"&amp;gt;
        &amp;lt;Transition name="fade" mode="out-in"&amp;gt;
          &amp;lt;component :is="Component"&amp;gt;&amp;lt;/component&amp;gt;
        &amp;lt;/Transition&amp;gt;
      &amp;lt;/RouterView&amp;gt;
    &amp;lt;/div&amp;gt;
  &amp;lt;/div&amp;gt;
&amp;lt;/template&amp;gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Inside the &lt;code&gt;&amp;lt;div&amp;gt;&lt;/code&gt; wrapper, we have two parts of our component:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;navigation / tabs, where we iterate over the output of the &lt;code&gt;tabs&lt;/code&gt; computed property and display links to the child routes,&lt;/li&gt;
&lt;li&gt;tabs wrapper, where we use the built-in &lt;code&gt;&amp;lt;RouterView&amp;gt;&lt;/code&gt; and its v-slot API to &lt;a href="https://router.vuejs.org/guide/advanced/transitions.html#transitions"&gt;wrap the nested route's content in a &lt;code&gt;&amp;lt;Transition&amp;gt;&lt;/code&gt; component&lt;/a&gt;.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Now we can include our component in the &lt;code&gt;TabsView.vue&lt;/code&gt; code:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;&amp;lt;template&amp;gt;
  &amp;lt;div class="view"&amp;gt;
    &amp;lt;AppTabs /&amp;gt;
  &amp;lt;/div&amp;gt;
&amp;lt;/template&amp;gt;

&amp;lt;script setup lang="ts"&amp;gt;
import AppTabs from '@/components/AppTabs.vue'
&amp;lt;/script&amp;gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And take a look at the result: &lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--cSpP8Rkh--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_66%2Cw_800/https://images.production.bytewax.io/Vite_App_2f6e016637.gif" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--cSpP8Rkh--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_66%2Cw_800/https://images.production.bytewax.io/Vite_App_2f6e016637.gif" alt="Vite-App.gif" width="800" height="450"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Extending &amp;amp; styling up the tabs
&lt;/h2&gt;

&lt;p&gt;Our tabs work nicely, and we can easily include them in any view that has child routes. However, the tabs navigation uses &lt;code&gt;route.name&lt;/code&gt; as the link label, and &lt;a href="https://router.vuejs.org/guide/essentials/named-routes.html"&gt;route names&lt;/a&gt; should remain simple and easy to use. We can extend our solution with route meta fields to include a custom tab label &amp;amp; icon for each child route.&lt;/p&gt;

&lt;h3&gt;
  
  
  Use custom route meta fields
&lt;/h3&gt;

&lt;p&gt;Before extending our component's code, let's add a &lt;a href="https://router.vuejs.org/guide/advanced/meta.html"&gt;meta field&lt;/a&gt; to each nested route in &lt;code&gt;router/index.ts&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;children: [
  {
    name: 'about',
    path: '',
    component: () =&amp;gt; import('../views/tabs/TabsAbout.vue'),
    meta: { tabLabel: 'About' }
  },
  {
    name: 'blog',
    path: 'blog',
    component: () =&amp;gt; import('../views/tabs/TabsBlog.vue'),
    meta: { tabLabel: 'Blog' }
  },
  {
    name: 'contact',
    path: 'contact',
    component: () =&amp;gt; import('../views/tabs/TabsContact.vue'),
    meta: { tabLabel: 'Contact' }
  },
  {
    name: 'related',
    path: 'related',
    component: () =&amp;gt; import('../views/tabs/TabsRelated.vue'),
    meta: { tabLabel: 'Related' }
  },
]

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, we can use the &lt;code&gt;tabLabel&lt;/code&gt; value in our &lt;code&gt;AppTabs.vue&lt;/code&gt; component:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;&amp;lt;RouterLink
  v-for="tab in tabs"
  :key="tab.name"
  class="tabs__nav-item"
  :to="{ name: tab.name }"
&amp;gt;
  &amp;lt;span class="tabs__nav-label" v-if="tab.meta?.tabLabel"&amp;gt;{{ tab.meta.tabLabel }}&amp;lt;/span&amp;gt;
&amp;lt;/RouterLink&amp;gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
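
&lt;p&gt;One thing to note in the template above is that a route without a &lt;code&gt;tabLabel&lt;/code&gt; renders a link with no text. A small fallback to the route name avoids that. The sketch below only illustrates the idea: &lt;code&gt;TabRoute&lt;/code&gt; and &lt;code&gt;tabLabelFor&lt;/code&gt; are hypothetical names, and the simplified type assumes route names are strings.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;// Hypothetical, simplified route shape (a stand-in for RouteRecordRaw).
type TabRoute = { name: string; meta?: { tabLabel?: string } }

// Prefer the custom label; otherwise fall back to the route name.
function tabLabelFor(tab: TabRoute): string {
  return tab.meta?.tabLabel ?? tab.name
}

const sampleTabs: TabRoute[] = [
  { name: 'about', meta: { tabLabel: 'About us' } },
  { name: 'blog' }
]

const labels = sampleTabs.map(tabLabelFor)
console.log(labels)

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;In the component itself, the same fallback could be written inline as &lt;code&gt;{{ tab.meta?.tabLabel ?? tab.name }}&lt;/code&gt;.&lt;/p&gt;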



&lt;h3&gt;
  
  
  Add material icons to tabs navigation
&lt;/h3&gt;

&lt;p&gt;Our tabs navigation is going to look better with icons. Let's install Google's Material Symbols library from npm with &lt;code&gt;npm install material-symbols@latest&lt;/code&gt; and include it in &lt;code&gt;main.ts&lt;/code&gt; (&lt;code&gt;main.js&lt;/code&gt; if you're not using TypeScript):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import { createApp } from 'vue'
import App from './App.vue'
import router from './router'

import 'material-symbols/outlined.css';
import './assets/main.css'

const app = createApp(App)

app.use(router)

app.mount('#app')

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then, we can add a &lt;code&gt;tabIcon&lt;/code&gt; property to each route's meta field, filling it with an icon name:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;children: [
  {
    name: 'about',
    path: '',
    component: () =&amp;gt; import('../views/tabs/TabsAbout.vue'),
    meta: { tabLabel: 'About', tabIcon: 'group' }
  },
  {
    name: 'blog',
    path: 'blog',
    component: () =&amp;gt; import('../views/tabs/TabsBlog.vue'),
    meta: { tabLabel: 'Blog', tabIcon: 'feed' }
  },
  {
    name: 'contact',
    path: 'contact',
    component: () =&amp;gt; import('../views/tabs/TabsContact.vue'),
    meta: { tabLabel: 'Contact', tabIcon: 'email' }
  },
  {
    name: 'related',
    path: 'related',
    component: () =&amp;gt; import('../views/tabs/TabsRelated.vue'),
    meta: { tabLabel: 'Related', tabIcon: 'star' }
  },
]

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;After that, we're ready to include them in the component:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;&amp;lt;RouterLink
  v-for="tab in tabs"
  :key="tab.name"
  class="tabs__nav-item"
  :to="{ name: tab.name }"
&amp;gt;
  &amp;lt;span class="tabs__nav-icon material-symbols-outlined" v-if="tab.meta?.tabIcon"&amp;gt;{{
    tab.meta.tabIcon
  }}&amp;lt;/span&amp;gt;
  &amp;lt;span class="tabs__nav-label" v-if="tab.meta?.tabLabel"&amp;gt;{{ tab.meta.tabLabel }}&amp;lt;/span&amp;gt;
&amp;lt;/RouterLink&amp;gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
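
&lt;p&gt;Since this project uses TypeScript, it may be worth typing the custom meta fields. By default, vue-router types every meta value as &lt;code&gt;unknown&lt;/code&gt;, and its documented way to change that is to augment the &lt;code&gt;RouteMeta&lt;/code&gt; interface. The snippet below is an optional sketch that assumes the two field names used in this article. It could live in &lt;code&gt;router/index.ts&lt;/code&gt; or in a separate &lt;code&gt;.d.ts&lt;/code&gt; file included by your &lt;code&gt;tsconfig.json&lt;/code&gt;; the location is a matter of preference.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import 'vue-router'

// Makes sure this file is treated as a module.
export {}

declare module 'vue-router' {
  interface RouteMeta {
    // Both fields are optional, so routes without tabs stay valid.
    tabLabel?: string
    tabIcon?: string
  }
}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;With this in place, a typo such as &lt;code&gt;tablabel&lt;/code&gt; in a route definition should be reported by the type checker instead of silently producing an unlabeled tab.&lt;/p&gt;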



&lt;p&gt;Done! We have custom icons &amp;amp; labels based on route meta fields displayed in our Tabs component. Now it's time to add the final styling touches with CSS. &lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--1TDttIo9--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/icons_e127e03029.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--1TDttIo9--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/icons_e127e03029.png" alt="icons.png" width="800" height="450"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Styling up the component
&lt;/h2&gt;

&lt;p&gt;You can style the component on your own, customizing it fully to your needs, or use the code below by adding it to &lt;code&gt;AppTabs.vue&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;&amp;lt;style&amp;gt;
.tabs {
  border: 1px solid rgba(0, 0, 0, 0.2);
  border-radius: 0.5rem;
}
.tabs__wrapper {
  padding: 1.5rem 2rem 2rem 2rem;
}
.tabs__nav {
  display: flex;
  flex-direction: row;
  border-bottom: 1px solid rgba(0, 0, 0, 0.2);
}
.tabs__nav-item {
  display: flex;
  flex-direction: row;
  align-items: center;
  flex-wrap: nowrap;
  text-decoration: none;
  padding: 1rem;
  border-bottom: 3px solid transparent;
  margin-bottom: -1px;
  color: rgba(0, 0, 0, 0.87);
  transition: border-color 0.25s ease-in-out;
}
.tabs__nav-icon {
  margin-right: 0.5rem;
  color: rgba(0, 0, 0, 0.38);
}
.tabs__nav-item:hover {
  border-color: #ccc;
}
.tabs__nav-item.router-link-exact-active {
  border-color: var(--green);
  font-weight: 600;
}
&amp;lt;/style&amp;gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;em&gt;Note: Following the &lt;a href="https://getbem.com/naming/"&gt;BEM naming convention&lt;/a&gt; is easier with SCSS, but I didn't want to fill the example with extra dependencies.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;Our tab component looks pretty slick now: &lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--3LfxBnHl--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_66%2Cw_800/https://images.production.bytewax.io/Vite_App_2_739c51d2d1.gif" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--3LfxBnHl--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_66%2Cw_800/https://images.production.bytewax.io/Vite_App_2_739c51d2d1.gif" alt="Vite-App-2.gif" width="800" height="450"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Instead of a conclusion
&lt;/h2&gt;

&lt;p&gt;Now, I encourage you to give it a try, explore further customizations, and share your experiences and improvements with &lt;a href="https://join.slack.com/t/bytewaxcommunity/shared_invite/zt-1lhq9bxbr-T3CXxR_9RIUGb4qcBK26Qw"&gt;our community&lt;/a&gt;. Let's continue building more efficient and elegant applications together!&lt;/p&gt;

</description>
      <category>vue</category>
      <category>ui</category>
    </item>
    <item>
      <title>Lessons we learned while building a stateful Kafka connector and tips for creating yours</title>
      <dc:creator>Oli Makhasoeva</dc:creator>
      <pubDate>Wed, 03 May 2023 20:16:54 +0000</pubDate>
      <link>https://dev.to/bytewax/lessons-we-learned-while-building-a-stateful-kafka-connector-and-tips-for-creating-yours-157b</link>
      <guid>https://dev.to/bytewax/lessons-we-learned-while-building-a-stateful-kafka-connector-and-tips-for-creating-yours-157b</guid>
      <description>&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--3zQ82qXy--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/fjksww988bqqyata57p8.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--3zQ82qXy--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/fjksww988bqqyata57p8.png" alt="Bytewax" width="798" height="594"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The Bytewax framework is a flexible tool designed to meet the challenges faced by Python developers in today's data-driven world. It aims to provide seamless integrations and time-saving shortcuts for data engineers dealing with streaming data, making their work more efficient and effective. Input connectors are an important part of Bytewax: they establish the connection between Bytewax and external systems so that users can import data from those systems.&lt;/p&gt;

&lt;p&gt;Here we're going to show how to write a custom input connector by walking through how we wrote &lt;a href="https://github.com/bytewax/bytewax/blob/5d5ec04851c2e254cf1aaf429f4890be3a3ce070/pysrc/bytewax/connectors/kafka.py"&gt;our built-in Kafka input connector&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Writing input connectors for arbitrary systems while supporting failure recovery and strong delivery guarantees requires a solid understanding of how recovery works internally in Bytewax and in the chosen input system. We strongly encourage you to use the connectors we have built into &lt;a href="https://bytewax.io/apidocs/bytewax.connectors/index"&gt;&lt;code&gt;bytewax.connectors&lt;/code&gt;&lt;/a&gt; if possible, and to read the documentation on their limits.&lt;/p&gt;

&lt;p&gt;If you are interested in writing your own, this article gives you an introduction to some of the decisions involved in writing an input connector for an ordered, partitioned input stream.&lt;/p&gt;

&lt;p&gt;If you need any help at all writing a connector, come say "hi" and ask questions in &lt;a href="https://join.slack.com/t/bytewaxcommunity/shared_invite/zt-1lhq9bxbr-T3CXxR_9RIUGb4qcBK26Qw"&gt;the Bytewax community Slack&lt;/a&gt;! We are happy to help!&lt;/p&gt;

&lt;h2&gt;
  
  
  Partitions
&lt;/h2&gt;

&lt;p&gt;Writing a subclass for &lt;a href="https://bytewax.io/apidocs/bytewax.inputs#bytewax.inputs.PartitionedInput"&gt;&lt;code&gt;bytewax.inputs.PartitionedInput&lt;/code&gt;&lt;/a&gt; is the core API for writing an input connector when you have an input that has a fixed number of &lt;strong&gt;partitions&lt;/strong&gt;. A partition is a "sub-stream" of data that can be read concurrently and independently.&lt;/p&gt;

&lt;p&gt;To write a &lt;code&gt;PartitionedInput&lt;/code&gt; subclass, you need to answer three questions:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;How many partitions are there?&lt;/li&gt;
&lt;li&gt;How can I build a source that reads a single partition?&lt;/li&gt;
&lt;li&gt;How can I rewind a partition and read from a specific item?&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;These are answered by the abstract methods &lt;code&gt;list_parts&lt;/code&gt; and &lt;code&gt;build_part&lt;/code&gt;, and by the &lt;code&gt;resume_state&lt;/code&gt; argument, respectively.&lt;/p&gt;

&lt;p&gt;We're going to use the &lt;a href="https://github.com/confluentinc/confluent-kafka-python"&gt;&lt;code&gt;confluent-kafka&lt;/code&gt;&lt;/a&gt; package to actually communicate with the Kafka cluster. Let's import all the things we'll need for this input source.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from typing import Dict, Iterable

from confluent_kafka import (
    Consumer,
    KafkaError,
    OFFSET_BEGINNING,
    TopicPartition,
)
from confluent_kafka.admin import AdminClient

from bytewax.inputs import PartitionedInput, StatefulSource

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Our KafkaInput connector is going to read from a specific set of topics on a cluster. First, let's define our class and write a constructor that takes all the arguments that make sense for configuring this specific kind of input source. This is going to be the public entry point to this connector, and is what you'll pass to the &lt;a href="https://bytewax.io/apidocs/bytewax.dataflow#bytewax.dataflow.Dataflow.input"&gt;&lt;code&gt;bytewax.dataflow.Dataflow.input&lt;/code&gt;&lt;/a&gt; operator.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;class KafkaInput(PartitionedInput):
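    def __init__(
        self,
        brokers: Iterable[str],
        topics: Iterable[str],
        tail: bool = True,
        starting_offset: int = OFFSET_BEGINNING,
        add_config: Dict[str, str] = None,
    ):
        add_config = add_config or {}

        if isinstance(brokers, str):
            raise TypeError(&amp;amp;quot;brokers must be an iterable and not a string&amp;amp;quot;)
        # Store the configuration; the methods below read these attributes.
        self._brokers = brokers
        self._topics = topics
        self._tail = tail
        self._starting_offset = starting_offset
        self._add_config = add_config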

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Listing Partitions
&lt;/h3&gt;

&lt;p&gt;Next, let's answer question one: how many partitions are there? Conveniently, &lt;code&gt;confluent-kafka&lt;/code&gt; provides an &lt;a href="https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.admin.AdminClient.list_topics"&gt;&lt;code&gt;AdminClient.list_topics&lt;/code&gt;&lt;/a&gt; method, which gives you the partition count of each topic, packed deep in a metadata object. The signature of &lt;code&gt;PartitionedInput.list_parts&lt;/code&gt; says it must return a set of strings with the IDs of all the partitions. Let's build the &lt;code&gt;AdminClient&lt;/code&gt; from our configuration instance variables and then delegate to a &lt;code&gt;_list_parts&lt;/code&gt; function so we can re-use it if necessary.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Continued
# class KafkaInput(PartitionedInput):
    def list_parts(self):
        config = {
            &amp;amp;quot;bootstrap.servers&amp;amp;quot;: &amp;amp;quot;,&amp;amp;quot;.join(self._brokers),
        }
        config.update(self._add_config)
        client = AdminClient(config)

        return set(_list_parts(client, self._topics))

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;code&gt;_list_parts&lt;/code&gt; function unpacks the nested metadata returned from &lt;code&gt;AdminClient.list_topics&lt;/code&gt; and yields one string per partition, such as "3-my_topic" for partition index 3 of the topic &lt;code&gt;my_topic&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def _list_parts(client, topics):
    for topic in topics:
        # List topics one-by-one so if auto-create is turned on,
        # we respect that.
        cluster_metadata = client.list_topics(topic)
        topic_metadata = cluster_metadata.topics[topic]
        if topic_metadata.error is not None:
            raise RuntimeError(
                f&amp;amp;quot;error listing partitions for Kafka topic `{topic!r}`: &amp;amp;quot;
                f&amp;amp;quot;{topic_metadata.error.str()}&amp;amp;quot;
            )
        part_idxs = topic_metadata.partitions.keys()
        for i in part_idxs:
            yield f&amp;amp;quot;{i}-{topic}&amp;amp;quot;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;How do you decide what the partition ID string should be? It should be something that globally identifies this partition, hence combining partition number and topic name.&lt;/p&gt;
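
&lt;p&gt;To make the ID format concrete, here is a minimal sketch of building such an ID and parsing it back, as a connector has to do when it later builds a partition. The helper names &lt;code&gt;format_part_id&lt;/code&gt; and &lt;code&gt;parse_part_id&lt;/code&gt; and the topic name are hypothetical and are not part of Bytewax.&lt;/p&gt;

```python
# Hypothetical helpers for illustration only; they are not part of Bytewax.
def format_part_id(part_idx, topic):
    # Combine the partition index and the topic name into one ID string.
    return f"{part_idx}-{topic}"


def parse_part_id(part_id):
    # Split only on the first hyphen so that topic names which
    # themselves contain hyphens survive the round trip.
    part_idx, topic = part_id.split("-", 1)
    return int(part_idx), topic


part_id = format_part_id(3, "ec2-metrics")
print(part_id)
print(parse_part_id(part_id))
```

&lt;p&gt;Putting the index first and splitting once is one way to keep the ID unambiguous even when a topic name contains the separator.&lt;/p&gt;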

&lt;p&gt;&lt;code&gt;PartitionedInput.list_parts&lt;/code&gt; might be called multiple times from multiple workers as a Bytewax cluster is set up and resumed, so it must return exactly the same set of partitions on every call in order to work correctly. Changing the number of partitions is not currently supported with recovery.&lt;/p&gt;

&lt;h3&gt;
  
  
  Building Partitions
&lt;/h3&gt;

&lt;p&gt;Next, let's answer question two: how can I build a source that reads a single partition? We can use &lt;code&gt;confluent-kafka&lt;/code&gt;'s &lt;a href="https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#consumer"&gt;&lt;code&gt;Consumer&lt;/code&gt;&lt;/a&gt; to make a Kafka consumer that will read a specific topic and partition starting from an offset. The signature of &lt;code&gt;PartitionedInput.build_part&lt;/code&gt; takes a specific partition ID (we'll ignore the resume state for now) and must return a stateful source.&lt;/p&gt;

&lt;p&gt;We parse the partition ID to determine which Kafka partition we should be consuming from. (Hence the importance of having a globally unique partition ID.) Then we build a &lt;code&gt;Consumer&lt;/code&gt; that connects to the Kafka cluster, and build our custom &lt;code&gt;_KafkaSource&lt;/code&gt; stateful source. That is where the actual reading of input items happens.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Continued
# class KafkaInput(PartitionedInput):
    def build_part(self, for_part, resume_state):
        part_idx, topic = for_part.split(&amp;amp;quot;-&amp;amp;quot;, 1)
        part_idx = int(part_idx)
        assert topic in self._topics, &amp;amp;quot;Can&amp;amp;apos;t resume from different set of Kafka topics&amp;amp;quot;

        config = {
            # We&amp;amp;apos;ll manage our own &amp;amp;quot;consumer group&amp;amp;quot; via recovery
            # system.
            &amp;amp;quot;group.id&amp;amp;quot;: &amp;amp;quot;BYTEWAX_IGNORED&amp;amp;quot;,
            &amp;amp;quot;enable.auto.commit&amp;amp;quot;: &amp;amp;quot;false&amp;amp;quot;,
            &amp;amp;quot;bootstrap.servers&amp;amp;quot;: &amp;amp;quot;,&amp;amp;quot;.join(self._brokers),
            &amp;amp;quot;enable.partition.eof&amp;amp;quot;: str(not self._tail),
        }
        config.update(self._add_config)
        consumer = Consumer(config)
        return _KafkaSource(
            consumer, topic, part_idx, self._starting_offset, resume_state
        )

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Stateful Input Source
&lt;/h2&gt;

&lt;p&gt;What is a stateful source? It is defined by subclassing &lt;a href="https://bytewax.io/apidocs/bytewax.inputs#bytewax.inputs.StatefulSource"&gt;&lt;code&gt;bytewax.inputs.StatefulSource&lt;/code&gt;&lt;/a&gt;. You can think about it as a "snapshot-able Python iterator": something that produces a stream of items via &lt;code&gt;StatefulSource.next&lt;/code&gt;, and also lets the Bytewax runtime ask for a snapshot of the position of the source via &lt;code&gt;StatefulSource.snapshot&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;Our &lt;code&gt;_KafkaSource&lt;/code&gt; is going to read items from a specific Kafka topic's partition. Let's define that class and have a constructor that takes in all the details to start reading that partition: the consumer (already configured to connect to the correct Kafka cluster), the topic, the specific partition index, the default starting offset (beginning or end of the topic), and again we'll ignore the resume state for just another moment.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;class _KafkaSource(StatefulSource):
    def __init__(self, consumer, topic, part_idx, starting_offset, resume_state):
        self._offset = resume_state or starting_offset
        # Assign does not activate consumer grouping.
        consumer.assign([TopicPartition(topic, part_idx, self._offset)])
        self._consumer = consumer
        self._topic = topic

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The beating heart of the input source is the &lt;code&gt;StatefulSource.next&lt;/code&gt; method. It is periodically called by Bytewax and behaves similarly to a &lt;a href="https://docs.python.org/3/library/stdtypes.html#iterator.__next__"&gt;built-in Python iterator's &lt;code&gt;__next__&lt;/code&gt; method&lt;/a&gt;. It must do one of three things: return a new item to send into the dataflow, return &lt;code&gt;None&lt;/code&gt; to signal that there is no data currently but there might be later, or raise &lt;code&gt;StopIteration&lt;/code&gt; when the partition is complete.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;Consumer.poll&lt;/code&gt; lets us ask whether there are any new messages on the partition we set up this consumer to follow. If there is one, we unpack the data message and return it. Otherwise we handle the no-data case, the end-of-stream case, or an exceptional error case.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Continued
# class _KafkaSource(StatefulSource):
    def next(self):
        msg = self._consumer.poll(0.001) # seconds
        if msg is None:
            return
        elif msg.error() is not None:
            if msg.error().code() == KafkaError._PARTITION_EOF:
                raise StopIteration()
            else:
                raise RuntimeError(
                    f&amp;amp;quot;error consuming from Kafka topic `{self._topic!r}`: {msg.error()}&amp;amp;quot;
                )
        else:
            item = (msg.key(), msg.value())
            # Resume reading from the next message, not this one.
            self._offset = msg.offset() + 1
            return item

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;An important thing to note here is that StatefulSource.next must never block. The Bytewax runtime employs a sort of cooperative multitasking, and so each operator must return quickly, even if it has nothing to do, so other operators in the dataflow that do have work can run. Unfortunately, currently there is no way in the Bytewax API to prevent polling of input sources (as input comes from outside the dataflow, Bytewax has no way of knowing when more data is available, so must constantly check). The best practice here is to pause briefly if there is no data to prevent a full spin-loop on no new data, but not so long you block other operators from doing their work.&lt;/p&gt;
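
&lt;p&gt;The polling contract can be sketched without Kafka at all. The &lt;code&gt;ToySource&lt;/code&gt; class and the &lt;code&gt;drain&lt;/code&gt; loop below are hypothetical stand-ins written for illustration; they are not the Bytewax runtime or its API. The source pauses very briefly when it has nothing to return, and the loop simply calls it again.&lt;/p&gt;

```python
import time


class ToySource:
    """Hypothetical stand-in for a stateful source; not the Bytewax API."""

    def __init__(self, polls, idle_pause=0.001):
        # Each entry is either an item or None ("nothing available yet").
        self._polls = list(polls)
        self._idle_pause = idle_pause

    def next(self):
        if not self._polls:
            # The partition is complete.
            raise StopIteration()
        item = self._polls.pop(0)
        if item is None:
            # No data right now: pause briefly instead of spinning,
            # but return quickly so other work can run.
            time.sleep(self._idle_pause)
        return item


def drain(source):
    """Keep calling next() until the source signals completion."""
    items = []
    while True:
        try:
            item = source.next()
        except StopIteration:
            return items
        if item is not None:
            items.append(item)


print(drain(ToySource(["a", None, "b", None, None, "c"])))
```

&lt;p&gt;The pause length here is an arbitrary choice for the sketch; the point is only that it is short and bounded.&lt;/p&gt;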

&lt;p&gt;There is also a &lt;code&gt;StatefulSource.close&lt;/code&gt; method which enables you to do any well-behaved shutdown when EOF is reached. This is not guaranteed to be called in a failure situation and should not be crucial to the connecting system. In this case, &lt;code&gt;Consumer.close&lt;/code&gt; does graceful shutdown.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# class _KafkaSource(StatefulSource):
    def close(self):
        self._consumer.close()

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Resume State
&lt;/h3&gt;

&lt;p&gt;Let's explain how failure recovery works for input connectors. Bytewax's recovery system allows the dataflow to quickly resume processing and output without needing to replay all input. It does this by periodically snapshotting all internal state, input positions, and output positions of the dataflow. Then, when it needs to recover after a failure, it loads all state from a recent snapshot, and starts replaying input items in the same order from the instant of the snapshot and overwriting output items. This causes the state and output of the dataflow to evolve in the same way during the resumed execution as during the previous execution.&lt;/p&gt;

&lt;h4&gt;
  
  
  Snapshotting
&lt;/h4&gt;

&lt;p&gt;So, we need to keep track of the current position somewhere in each partition. Kafka has the concept of message offsets, which is an incrementing immutable integer that is the position of each message. In &lt;code&gt;_KafkaSource.next&lt;/code&gt;, we kept track of the offset of the next message that partition will read via &lt;code&gt;self._offset = msg.offset() + 1&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;Bytewax calls &lt;code&gt;StatefulSource.snapshot&lt;/code&gt; when it needs to record that partition's position, and we return that internally stored next message offset.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Continued
# class _KafkaSource(StatefulSource):
    def snapshot(self):
        return self._offset

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Resume
&lt;/h4&gt;

&lt;p&gt;On resume after a failure, Bytewax's recovery machinery does the hard work of collecting all the snapshots, finding the ones that represent a coherent set of states across the previous execution's cluster, and threading each bit of snapshot data back through into &lt;code&gt;PartitionedInput.build_part&lt;/code&gt; for the same partition. To properly take advantage of that, your resulting partition must resume reading from the same spot represented by that snapshot.&lt;/p&gt;

&lt;p&gt;Since we were storing the Kafka message offset of the next message to be read in &lt;code&gt;_KafkaSource._offset&lt;/code&gt;, we need to thread that message offset back into the &lt;code&gt;Consumer&lt;/code&gt; when it is built. That happens by passing &lt;code&gt;resume_state&lt;/code&gt; into the &lt;code&gt;_KafkaSource&lt;/code&gt; constructor, which assigns the consumer to start reading from that offset. Looking at that code again:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Continued
# class _KafkaSource(StatefulSource):
# def __init__(self, consumer, topic, part_idx, starting_offset, resume_state):
        self._offset = resume_state or starting_offset
        # Assign does not activate consumer grouping.
        consumer.assign([TopicPartition(topic, part_idx, self._offset)])
        ...

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As one extra wrinkle, if the partition is being built for the first time, there is no resume state for it, and &lt;code&gt;None&lt;/code&gt; will be passed for &lt;code&gt;resume_state&lt;/code&gt; in &lt;code&gt;PartitionedInput.build_part&lt;/code&gt;. In that case, we need to fill in the requested "default starting offset": either "beginning of topic" or "end of topic". When we do have resume state, we should ignore the default starting offset, since we need to start from the specific saved offset to uphold the recovery contract.&lt;/p&gt;
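
&lt;p&gt;The snapshot and resume round trip can be sketched with an in-memory list standing in for a Kafka partition. The &lt;code&gt;ListSource&lt;/code&gt; class below is hypothetical and is not the Bytewax API; it only mirrors the shape of &lt;code&gt;_KafkaSource&lt;/code&gt;. As an assumption of this sketch, it checks &lt;code&gt;resume_state is None&lt;/code&gt; explicitly rather than using &lt;code&gt;or&lt;/code&gt;.&lt;/p&gt;

```python
class ListSource:
    """Hypothetical replayable source over a list; not the Bytewax API."""

    def __init__(self, messages, starting_offset, resume_state):
        # Use the snapshot when there is one; otherwise use the default.
        if resume_state is None:
            self._offset = starting_offset
        else:
            self._offset = resume_state
        self._messages = messages

    def next(self):
        if self._offset == len(self._messages):
            raise StopIteration()
        item = self._messages[self._offset]
        # Resume reading from the next message, not this one.
        self._offset += 1
        return item

    def snapshot(self):
        return self._offset


messages = ["m0", "m1", "m2", "m3"]

first_run = ListSource(messages, starting_offset=0, resume_state=None)
seen = [first_run.next(), first_run.next()]
state = first_run.snapshot()  # position of the next unread message

# Simulate a crash: discard first_run and rebuild from the snapshot.
resumed = ListSource(messages, starting_offset=0, resume_state=state)
seen.append(resumed.next())
print(state, seen)
```

&lt;p&gt;The explicit &lt;code&gt;None&lt;/code&gt; check is a design choice for the sketch: it keeps a saved position of &lt;code&gt;0&lt;/code&gt; from being mistaken for "no resume state".&lt;/p&gt;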

&lt;h2&gt;
  
  
  Delivery Guarantees
&lt;/h2&gt;

&lt;p&gt;Let's talk for a moment about how this recovery model with snapshots impacts delivery guarantees. A well-designed input connector on its own can only guarantee that the output of a dataflow to a downstream system is at-least-once. The recovery system ensures that we replay any input that might not have been output before the execution cluster failed, but achieving exactly-once processing requires coordination with the output connector (via something like transactions or two-phase commits) so that the replay does not result in duplicated writes downstream.&lt;/p&gt;

&lt;h3&gt;
  
  
  Non-Replay-Able Sources
&lt;/h3&gt;

&lt;p&gt;If your input source does not have the ability to replay old data, you can still use it with Bytewax, but your delivery guarantees are limited to at-most-once. For example, when listening to an ephemeral SSE or WebSocket stream, you can always start listening, but the request API often gives you no way to replay missed events. When Bytewax attempts to resume, all the other operators will have their internal state returned to the last coherent snapshot, but since the input sources do not rewind, the dataflow will appear to have missed all input between when that snapshot was taken and the resume.&lt;/p&gt;

&lt;p&gt;In this case, your &lt;code&gt;StatefulSource.snapshot&lt;/code&gt; can return &lt;code&gt;None&lt;/code&gt; and no recovery data will be saved. You can then ignore the &lt;code&gt;resume_state&lt;/code&gt; argument of &lt;code&gt;PartitionedInput.build_part&lt;/code&gt; because it will always be &lt;code&gt;None&lt;/code&gt;.&lt;/p&gt;
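
&lt;p&gt;A minimal sketch of that situation, using a shared Python iterator as a stand-in for a live stream that cannot be rewound. The &lt;code&gt;EphemeralSource&lt;/code&gt; class is hypothetical and is not the Bytewax API.&lt;/p&gt;

```python
class EphemeralSource:
    """Hypothetical non-replayable source; not the Bytewax API."""

    def __init__(self, events, resume_state=None):
        # resume_state is accepted but never used, because snapshot()
        # below never saves anything.
        self._events = iter(events)

    def next(self):
        # Raises StopIteration by itself when the stream ends.
        return next(self._events)

    def snapshot(self):
        # Nothing to record: the upstream cannot be rewound.
        return None


# One shared iterator plays the role of the live stream.
live_stream = iter(["e0", "e1", "e2", "e3"])

first_run = EphemeralSource(live_stream)
first_run.next()  # reads "e0"
state = first_run.snapshot()
first_run.next()  # reads "e1" after the snapshot, then the worker fails

# The rebuilt source continues from wherever the live stream is now.
resumed = EphemeralSource(live_stream, resume_state=state)
after_resume = resumed.next()
print(state, after_resume)
```

&lt;p&gt;In this sketch the event read after the snapshot is not seen again after the resume, which is the gap described above.&lt;/p&gt;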

</description>
      <category>kafka</category>
      <category>connectors</category>
    </item>
    <item>
      <title>How We Detect Anomalies In Our AWS Infrastructure (And Have Peaceful Nights)</title>
      <dc:creator>Oli Makhasoeva</dc:creator>
      <pubDate>Tue, 02 May 2023 18:50:54 +0000</pubDate>
      <link>https://dev.to/bytewax/how-we-detect-anomalies-in-our-aws-infrastructure-and-have-peaceful-nights-19k1</link>
      <guid>https://dev.to/bytewax/how-we-detect-anomalies-in-our-aws-infrastructure-and-have-peaceful-nights-19k1</guid>
      <description>&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--mE8HEAOX--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/d85qqusgtuorj5buicp0.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--mE8HEAOX--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/d85qqusgtuorj5buicp0.png" alt="Post image" width="800" height="450"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;Everyone who's using a cloud provider wants to monitor the system to detect anomalies in the usage. We run some internal data services, our website/blog and a few demo clusters on AWS and we wanted a low-maintenance way to monitor the infrastructure for issues, so we took the opportunity to dogfood Bytewax, of course :).&lt;/p&gt;

&lt;p&gt;In this blog post, we will walk you through the process of building a cloud-based anomaly detection system using Bytewax, Redpanda, and Amazon Web Services (AWS). Our goal is to create a dataflow that detects anomalies in EC2 instance CPU utilization. To achieve this, we will collect usage data from AWS CloudWatch using &lt;a href="https://www.elastic.co/logstash/"&gt;Logstash&lt;/a&gt; and store it using &lt;a href="https://redpanda.com/"&gt;Redpanda&lt;/a&gt;, a Kafka-compatible streaming data platform. Finally, we will use Bytewax, a Python stream processor, to build our anomaly detection system.&lt;/p&gt;

&lt;p&gt;This is exactly the same infrastructure we use internally at Bytewax and, in fact, we haven't touched it for months!&lt;/p&gt;

&lt;h2&gt;
  
  
  Setting Up the Required Infrastructure on AWS
&lt;/h2&gt;

&lt;p&gt;Before we begin, ensure that you have the following prerequisites set up:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;AWS CLI configured with admin access&lt;/li&gt;
&lt;li&gt;Helm&lt;/li&gt;
&lt;li&gt;Docker&lt;/li&gt;
&lt;li&gt;A Kubernetes cluster running in AWS and kubectl configured to access it&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Configuring Kubernetes and Redpanda
&lt;/h3&gt;

&lt;p&gt;In this section, we will configure Kubernetes and Redpanda using the provided code snippets. Make sure you have a running Kubernetes cluster in AWS and kubectl configured to access it.&lt;/p&gt;

&lt;h3&gt;
  
  
  Step 1: Set up a namespace
&lt;/h3&gt;

&lt;p&gt;Create a new namespace for Redpanda and set it as the active context:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kubectl create ns redpanda-bytewax


kubectl config set-context --current --namespace=redpanda-bytewax

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Step 2: Install Cert-Manager and Redpanda Operator
&lt;/h3&gt;

&lt;p&gt;The Redpanda operator requires cert-manager to create certificates for TLS communication. To install cert-manager with Helm:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;helm repo add jetstack https://charts.jetstack.io &amp;amp;amp;&amp;amp;amp; \
helm repo update &amp;amp;amp;&amp;amp;amp; \
helm install \
  cert-manager jetstack/cert-manager \
  --namespace cert-manager \
  --create-namespace \
  --version v1.4.4 \
  --set installCRDs=true

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Fetch the latest Redpanda Operator version, add the Redpanda Helm repo, and install the Redpanda Operator:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;export VERSION=$(curl -s https://api.github.com/repos/redpanda-data/redpanda/releases/latest | jq -r .tag_name)


helm repo add redpanda https://charts.vectorized.io/ &amp;amp;amp;&amp;amp;amp; helm repo update


kubectl apply -k https://github.com/redpanda-data/redpanda/src/go/k8s/config/crd?ref=$VERSION


helm install redpanda-operator redpanda/redpanda-operator --namespace redpanda-system --create-namespace --version $VERSION

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Step 3: Create Redpanda cluster
&lt;/h3&gt;

&lt;p&gt;Save the following YAML configuration in a file named &lt;code&gt;3_node_cluster.yaml&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;apiVersion: redpanda.vectorized.io/v1alpha1
kind: Cluster
metadata:
  name: three-node-cluster
spec:
  image: &amp;amp;quot;vectorized/redpanda&amp;amp;quot;
  version: &amp;amp;quot;latest&amp;amp;quot;
  replicas: 3
  resources:
    requests:
      cpu: 1
      memory: 1.2Gi
    limits:
      cpu: 1
      memory: 1.2Gi
  configuration:
    rpcServer:
      port: 33145
    kafkaApi:
    - port: 9092
    pandaproxyApi:
    - port: 8082
    schemaRegistry:
      port: 8081
    adminApi:
    - port: 9644
    developerMode: true

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Apply the Redpanda cluster configuration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kubectl apply -f ./3_node_cluster.yaml

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Check the status of Redpanda pods:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kubectl get po -lapp.kubernetes.io/component=redpanda

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Export the broker addresses:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;export BROKERS=`kubectl get clusters three-node-cluster -o=jsonpath=&amp;amp;apos;{.status.nodes.internal}&amp;amp;apos; | jq -r &amp;amp;apos;join(&amp;amp;quot;,&amp;amp;quot;)&amp;amp;apos;`

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Step 4: Set up topics
&lt;/h3&gt;

&lt;p&gt;Run an rpk container to create and manage topics:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kubectl run rpk-shell --rm -i --tty --image vectorized/redpanda --command /bin/bash

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In the rpk terminal, export the broker addresses:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;export BROKERS=three-node-cluster-0.three-node-cluster.redpanda-bytewax.svc.cluster.local.,three-node-cluster-1.three-node-cluster.redpanda-bytewax.svc.cluster.local.,three-node-cluster-2.three-node-cluster.redpanda-bytewax.svc.cluster.local.

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;View the cluster information:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;rpk --brokers $BROKERS cluster info

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Create two topics with 5 partitions each:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;rpk --brokers $BROKERS topic create ec2_metrics -p 5


rpk --brokers $BROKERS topic create ec2_metrics_anomalies -p 5

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;List the topics:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;rpk --brokers $BROKERS topic list

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Consume messages from the &lt;code&gt;ec2_metrics&lt;/code&gt; topic:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;rpk --brokers $BROKERS topic consume ec2_metrics -o start

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Exporting CloudWatch EC2 Metrics to our Redpanda Cluster with Logstash
&lt;/h2&gt;

&lt;p&gt;Logstash is an open-source data processing pipeline that can ingest data from multiple sources, transform it, and send it to various destinations, such as Redpanda. In this case, we'll use Logstash to collect EC2 metrics from CloudWatch and send them to our Redpanda cluster for further processing.&lt;/p&gt;

&lt;h4&gt;
  
  
  Logstash Permissions
&lt;/h4&gt;

&lt;p&gt;First, we need to create an AWS policy and user with the required permissions for Logstash to access CloudWatch and EC2. Save the following JSON configuration in a file named &lt;code&gt;cloudwatch-logstash-policy.json&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
    &amp;amp;quot;Version&amp;amp;quot;: &amp;amp;quot;2012-10-17&amp;amp;quot;,
    &amp;amp;quot;Statement&amp;amp;quot;: [
        {
            &amp;amp;quot;Sid&amp;amp;quot;: &amp;amp;quot;Stmt1444715676000&amp;amp;quot;,
            &amp;amp;quot;Effect&amp;amp;quot;: &amp;amp;quot;Allow&amp;amp;quot;,
            &amp;amp;quot;Action&amp;amp;quot;: [
                &amp;amp;quot;cloudwatch:GetMetricStatistics&amp;amp;quot;,
                &amp;amp;quot;cloudwatch:ListMetrics&amp;amp;quot;
            ],
            &amp;amp;quot;Resource&amp;amp;quot;: &amp;amp;quot;*&amp;amp;quot;
        },
        {
            &amp;amp;quot;Sid&amp;amp;quot;: &amp;amp;quot;Stmt1444716576170&amp;amp;quot;,
            &amp;amp;quot;Effect&amp;amp;quot;: &amp;amp;quot;Allow&amp;amp;quot;,
            &amp;amp;quot;Action&amp;amp;quot;: [
                &amp;amp;quot;ec2:DescribeInstances&amp;amp;quot;
            ],
            &amp;amp;quot;Resource&amp;amp;quot;: &amp;amp;quot;*&amp;amp;quot;
        }
    ]
}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now we can create the policy and user, and attach the policy to the user:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;aws iam create-policy --policy-name CloudwatchLogstash --policy-document file://cloudwatch-logstash-policy.json
aws iam create-user --user-name logstash-user


export AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query &amp;amp;quot;Account&amp;amp;quot; --output text)


aws iam attach-user-policy --policy-arn arn:aws:iam::$AWS_ACCOUNT_ID:policy/CloudwatchLogstash --user-name logstash-user

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To provide access, we can create Kubernetes secrets for the AWS access key and secret access key:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kubectl create secret generic aws-secret-access-key --from-literal=value=$(aws iam create-access-key --user-name logstash-user | jq -r .AccessKey.SecretAccessKey)


kubectl create secret generic aws-access-key-id --from-literal=value=$(aws iam list-access-keys --user-name logstash-user --query &amp;amp;quot;AccessKeyMetadata[0].AccessKeyId&amp;amp;quot; --output text)

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now we can create an Amazon Elastic Container Registry (ECR) repository to store the custom Logstash image:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;aws ecr create-repository --repository-name redpanda-bytewax


export REPOSITORY_URI=$(aws ecr describe-repositories --repository-names redpanda-bytewax --profile sso-admin --output text --query &amp;amp;quot;repositories[0].repositoryUri&amp;amp;quot;)

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Next, we create a Logstash image with the CloudWatch input plugin installed. Create a Dockerfile named &lt;code&gt;logstash-Dockerfile&lt;/code&gt; that installs the plugin in a &lt;code&gt;RUN&lt;/code&gt; step:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;FROM docker.elastic.co/logstash/logstash:7.17.3
RUN bin/logstash-plugin install logstash-input-cloudwatch

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Finally, we build and push the Logstash image to the ECR repository:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker build -f logstash-Dockerfile -t $REPOSITORY_URI:logstash-cloudwatch .


export AWS_REGION=us-west-2


aws ecr get-login-password --region $AWS_REGION --profile sso-admin | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com


docker push $REPOSITORY_URI:logstash-cloudwatch

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Deploy Logstash on Kubernetes
&lt;/h2&gt;

&lt;p&gt;Now that we have our custom Logstash image, we will deploy it on Kubernetes using the Helm chart provided by Elastic. First, we need to gather some information and create a logstash-values.yaml file with the necessary configuration.&lt;/p&gt;

&lt;p&gt;Run the following commands to obtain the required information:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;echo $REPOSITORY_URI


echo $AWS_REGION


echo $BROKERS | sed -e &amp;amp;apos;s/local\./local\:9092/g&amp;amp;apos;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Create a logstash-values.yaml file and replace the placeholders (shown with &lt;code&gt;&amp;amp;lt;&amp;amp;gt;&lt;/code&gt;) with the information obtained above:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;image: &amp;amp;quot;&amp;amp;lt;YOUR REPOSITORY URI&amp;amp;gt;&amp;amp;quot;
imageTag: &amp;amp;quot;logstash-cloudwatch&amp;amp;quot;
imagePullPolicy: &amp;amp;quot;Always&amp;amp;quot;

persistence:
  enabled: true

logstashConfig:
  logstash.yml: |
    http.host: 0.0.0.0
    xpack.monitoring.enabled: false

logstashPipeline:
  uptime.conf: |
    input {
      cloudwatch {
        namespace =&amp;amp;gt; &amp;amp;quot;AWS/EC2&amp;amp;quot;
        metrics =&amp;amp;gt; [&amp;amp;quot;CPUUtilization&amp;amp;quot;]
        region =&amp;amp;gt; &amp;amp;quot;&amp;amp;lt;YOUR AWS REGION&amp;amp;gt;&amp;amp;quot;
        interval =&amp;amp;gt; 300
        period =&amp;amp;gt; 300
      }       
    }
    filter {
      mutate {
        add_field =&amp;amp;gt; {
          &amp;amp;quot;[index]&amp;amp;quot; =&amp;amp;gt; &amp;amp;quot;0&amp;amp;quot;
          &amp;amp;quot;[value]&amp;amp;quot; =&amp;amp;gt; &amp;amp;quot;%{maximum}&amp;amp;quot;
          &amp;amp;quot;[instance]&amp;amp;quot; =&amp;amp;gt; &amp;amp;quot;%{InstanceId}&amp;amp;quot;                      
        }
      }
    }
    output {
        kafka {
          bootstrap_servers =&amp;amp;gt; &amp;amp;quot;&amp;amp;lt;YOUR REDPANDA BROKERS&amp;amp;gt;&amp;amp;quot;
          topic_id =&amp;amp;gt; &amp;amp;apos;ec2_metrics&amp;amp;apos;
          codec =&amp;amp;gt; json
        }
    }

extraEnvs:
  - name: &amp;amp;apos;AWS_ACCESS_KEY_ID&amp;amp;apos;
    valueFrom:
      secretKeyRef:
        name: aws-access-key-id
        key: value
  - name: &amp;amp;apos;AWS_SECRET_ACCESS_KEY&amp;amp;apos;
    valueFrom:
      secretKeyRef:
        name: aws-secret-access-key
        key: value

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;With the logstash-values.yaml file ready, install the Logstash Helm chart:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;helm upgrade --install logstash elastic/logstash -f logstash-values.yaml

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now to verify that Logstash is exporting the EC2 metrics to the Redpanda cluster, open a terminal with rpk and consume the ec2_metrics topic:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;rpk --brokers $BROKERS topic consume ec2_metrics -o start

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Use &lt;code&gt;CTRL-C&lt;/code&gt; to quit the rpk terminal when you're done.&lt;/p&gt;

&lt;h2&gt;
  
  
  Building a Dataflow to Detect Anomalies with Bytewax
&lt;/h2&gt;

&lt;p&gt;With our infrastructure in place, it's time to build a dataflow to detect anomalies. We will use Bytewax and &lt;a href="https://www.bytewax.io/docs/deployment/waxctl"&gt;Waxctl&lt;/a&gt; to define and deploy a dataflow that processes the EC2 instance CPU utilization data stored in the Redpanda cluster.&lt;/p&gt;

&lt;h3&gt;
  
  
  Anomaly Detection with Half Space Trees
&lt;/h3&gt;

&lt;p&gt;Half Space Trees (HST) is an unsupervised machine learning algorithm used for detecting anomalies in streaming data. The algorithm is designed to efficiently handle high-dimensional and high-velocity data streams. HST builds a set of binary trees to partition the feature space into half spaces, where each tree captures a different view of the data. By observing the frequency of points falling into each half space, the algorithm can identify regions that are less dense than others, suggesting that data points within those regions are potential anomalies.&lt;/p&gt;

&lt;p&gt;In our case, we will use HST to detect anomalous CPU usage in EC2 metrics. We'll leverage the Python library River, which provides an implementation of the HST algorithm, and Bytewax, a platform for creating data processing pipelines.&lt;/p&gt;
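
&lt;p&gt;To make the mass-counting idea concrete, here is a deliberately simplified, one-dimensional sketch in plain Python. It is not River's implementation: the class names &lt;code&gt;ToyHalfSpaceTree&lt;/code&gt; and &lt;code&gt;ToyDetector&lt;/code&gt; are hypothetical, only leaf counts are kept, and there is no sliding window. Each tree picks a random working range, halves it repeatedly, and counts how many learned points land in each leaf; a value that falls into an empty leaf in every tree gets the highest score.&lt;/p&gt;

```python
import random


class ToyHalfSpaceTree:
    # Toy 1-D tree: every level halves a randomly chosen working range.

    def __init__(self, height, rng):
        self.height = height
        s = rng.random()
        half_width = 2 * max(s, 1 - s)
        # The working range [low, low + width) always contains [0, 1].
        self.low = s - half_width
        self.width = 2 * half_width
        self.leaf_mass = {}  # leaf index -> number of points learned

    def _leaf(self, x):
        # Halving the range `height` times gives 2**height equal-width leaves.
        return int((x - self.low) / self.width * 2 ** self.height)

    def learn(self, x):
        leaf = self._leaf(x)
        self.leaf_mass[leaf] = self.leaf_mass.get(leaf, 0) + 1

    def mass(self, x):
        return self.leaf_mass.get(self._leaf(x), 0)


class ToyDetector:
    def __init__(self, n_trees=5, height=6, seed=42):
        rng = random.Random(seed)
        self.trees = [ToyHalfSpaceTree(height, rng) for _ in range(n_trees)]
        self.seen = 0

    def learn_one(self, x):
        self.seen += 1
        for tree in self.trees:
            tree.learn(x)

    def score_one(self, x):
        if self.seen == 0:
            return 0.0
        # Average share of learned points that landed in the same leaf as x.
        total = sum(tree.mass(x) for tree in self.trees)
        density = total / (len(self.trees) * self.seen)
        # Sparse regions score close to 1, dense regions closer to 0.
        return 1.0 - density


detector = ToyDetector()
for value in [0.10, 0.11, 0.09, 0.12, 0.10, 0.11, 0.10, 0.09]:
    detector.learn_one(value)

typical = detector.score_one(0.10)  # lands in a well-populated leaf
spike = detector.score_one(0.90)    # lands in leaves no learned point touched
```

&lt;p&gt;With the readings above, &lt;code&gt;typical&lt;/code&gt; stays well below &lt;code&gt;spike&lt;/code&gt;, because 0.90 falls in a leaf that no learned point has touched in any tree.&lt;/p&gt;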

&lt;h3&gt;
  
  
  Building the Dataflow for Anomaly Detection
&lt;/h3&gt;

&lt;p&gt;To create our dataflow, we'll first import the necessary libraries and set up Kafka connections. The following code snippet demonstrates how to create a dataflow with River and Bytewax to consume EC2 metrics from Kafka and detect anomalous CPU usage using HST:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import json
import os
import datetime as dt
from pathlib import Path

from bytewax.connectors.kafka import KafkaInput, KafkaOutput
from bytewax.dataflow import Dataflow
from bytewax.recovery import SqliteRecoveryConfig

from river import anomaly

kafka_servers = os.getenv(&amp;amp;quot;BYTEWAX_KAFKA_SERVER&amp;amp;quot;, &amp;amp;quot;localhost:9092&amp;amp;quot;)
kafka_topic = os.getenv(&amp;amp;quot;BYTEWAX_KAFKA_TOPIC&amp;amp;quot;, &amp;amp;quot;ec2_metrics&amp;amp;quot;)
kafka_output_topic = os.getenv(&amp;amp;quot;BYTEWAX_KAFKA_OUTPUT_TOPIC&amp;amp;quot;, &amp;amp;quot;ec2_metrics_anomalies&amp;amp;quot;)

# Define the dataflow object and kafka input.
flow = Dataflow()
flow.input(&amp;amp;quot;inp&amp;amp;quot;, KafkaInput(kafka_servers.split(&amp;amp;quot;,&amp;amp;quot;), [kafka_topic]))

# convert to percentages and group by instance id
def group_instance_and_normalize(key__data):
  _, data = key__data
  data = json.loads(data)
  data[&amp;amp;quot;value&amp;amp;quot;] = float(data[&amp;amp;quot;value&amp;amp;quot;]) / 100
  return data[&amp;amp;quot;instance&amp;amp;quot;], data

flow.map(group_instance_and_normalize)
# (&amp;amp;quot;c6585a&amp;amp;quot;, {&amp;amp;quot;index&amp;amp;quot;: &amp;amp;quot;1&amp;amp;quot;, &amp;amp;quot;value&amp;amp;quot;: 0.11, &amp;amp;quot;instance&amp;amp;quot;: &amp;amp;quot;c6585a&amp;amp;quot;})

# Stateful operator for anomaly detection
class AnomalyDetector(anomaly.HalfSpaceTrees):

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



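&lt;p&gt;Our anomaly detector inherits from the &lt;code&gt;HalfSpaceTrees&lt;/code&gt; class in the River package, which accepts the following parameters:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;code&gt;n_trees&lt;/code&gt; – defaults to 10&lt;/li&gt;
&lt;li&gt;&lt;code&gt;height&lt;/code&gt; – defaults to 8&lt;/li&gt;
&lt;li&gt;&lt;code&gt;window_size&lt;/code&gt; – defaults to 250&lt;/li&gt;
&lt;li&gt;&lt;code&gt;limits&lt;/code&gt; (Dict[Hashable, Tuple[float, float]]) – defaults to None&lt;/li&gt;
&lt;li&gt;&lt;code&gt;seed&lt;/code&gt; (int) – defaults to None&lt;/li&gt;
&lt;/ul&gt;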

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
  def __init__(self, *args, **kwargs):
      super().__init__(*args, n_trees=5, height=3, window_size=5, seed=42, **kwargs)

  def update(self, data):
      self.learn_one({&amp;amp;quot;value&amp;amp;quot;: data[&amp;amp;quot;value&amp;amp;quot;]})
      data[&amp;amp;quot;score&amp;amp;quot;] = self.score_one({&amp;amp;quot;value&amp;amp;quot;: data[&amp;amp;quot;value&amp;amp;quot;]})
      if data[&amp;amp;quot;score&amp;amp;quot;] &amp;amp;gt; 0.7:
          data[&amp;amp;quot;anom&amp;amp;quot;] = 1
      else:
          data[&amp;amp;quot;anom&amp;amp;quot;] = 0
      return self, (
          data[&amp;amp;quot;index&amp;amp;quot;],
          data[&amp;amp;quot;timestamp&amp;amp;quot;],
          data[&amp;amp;quot;value&amp;amp;quot;],
          data[&amp;amp;quot;score&amp;amp;quot;],
          data[&amp;amp;quot;anom&amp;amp;quot;],
      )

flow.stateful_map(&amp;amp;quot;detector&amp;amp;quot;, lambda: AnomalyDetector(), AnomalyDetector.update)
# (&amp;amp;quot;c6585a&amp;amp;quot;, (index, timestamp, value, score, anom))

# filter out non-anomalous values
flow.filter(lambda x: bool(x[1][4]))

flow.map(lambda x: (x[0], json.dumps(x[1])))
flow.output(&amp;amp;quot;output&amp;amp;quot;, KafkaOutput(kafka_servers.split(&amp;amp;quot;,&amp;amp;quot;), kafka_output_topic))

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In this dataflow, we first read data from Kafka and deserialize the JSON message. We then normalize the CPU usage values and group them by the instance ID. Next, we apply the AnomalyDetector class inside a stateful operator, which calculates the anomaly score for each data point using HST. We set a threshold for the anomaly score (0.7 in this example) and mark data points as anomalous if their scores exceed the threshold. Finally, we filter out non-anomalous values and output the anomalous data points to a separate Kafka topic.&lt;/p&gt;
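
&lt;p&gt;To see what the first step does to a single record, the sketch below runs the same normalization function on a made-up message. The instance ID, timestamp, and field values are assumptions for illustration, not real CloudWatch output, and &lt;code&gt;is_anomalous&lt;/code&gt; is a hypothetical helper that mirrors the 0.7 threshold.&lt;/p&gt;

```python
import json

ANOMALY_THRESHOLD = 0.7  # same cut-off the AnomalyDetector uses


def group_instance_and_normalize(key__data):
    # Kafka items arrive as (key, payload); only the JSON payload is needed.
    _, data = key__data
    data = json.loads(data)
    # CPU utilization arrives as a percentage; scale it to the 0..1 range.
    data["value"] = float(data["value"]) / 100
    return data["instance"], data


def is_anomalous(score):
    # Hypothetical helper: 1 when the score exceeds the threshold, else 0.
    return 1 if score > ANOMALY_THRESHOLD else 0


# Made-up message shaped like the fields added by the Logstash filter.
sample = json.dumps(
    {
        "index": "0",
        "value": "11.0",
        "instance": "i-0abc",
        "timestamp": "2023-04-01T00:00:00Z",
    }
)
key, record = group_instance_and_normalize((None, sample))
flags = [is_anomalous(0.02), is_anomalous(0.85)]
```

&lt;p&gt;The record is now keyed by its instance ID, so the stateful operator keeps a separate detector for each EC2 instance.&lt;/p&gt;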

&lt;p&gt;Using this dataflow, we can continuously monitor EC2 metrics and detect anomalous CPU usage, helping us identify potential issues in our infrastructure.&lt;/p&gt;

&lt;h2&gt;
  
  
  Creating a Dataflow Docker image
&lt;/h2&gt;

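&lt;p&gt;Create a Dockerfile named &lt;code&gt;dataflow-Dockerfile&lt;/code&gt; that adds the dataflow's Python dependencies to the Bytewax base image:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;FROM bytewax/bytewax:0.16.0-python3.9
RUN /venv/bin/pip install river==0.10.1 pandas confluent-kafka

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then build the image and push it to the ECR repository:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker build -f dataflow-Dockerfile -t $REPOSITORY_URI:dataflow .


docker push $REPOSITORY_URI:dataflow

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;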



&lt;h2&gt;
  
  
  Deploying the Dataflow
&lt;/h2&gt;

&lt;p&gt;To deploy the dataflow, we'll use the Bytewax command-line tool, waxctl. There are two options for deploying the dataflow, depending on how you have set up your Kafka server environment variable. In both cases, we set the number of processes (the &lt;code&gt;-p&lt;/code&gt; flag) to 5 to match the number of partitions we set when we initially created our Redpanda topic.&lt;/p&gt;

&lt;h4&gt;
  
  
  Option 1: Generate waxctl command
&lt;/h4&gt;

&lt;p&gt;Use the following command to generate the waxctl command with the appropriate environment variables:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;echo &amp;amp;quot;
waxctl df deploy ./dataflow.py \\
  --name ec2-cpu-ad \\
  -p 5 \\
  -i $REPOSITORY_URI \\
  -t dataflow \\
  -e &amp;amp;apos;\&amp;amp;quot;BYTEWAX_KAFKA_SERVER=$BROKERS\&amp;amp;quot;&amp;amp;apos; \\
  -e BYTEWAX_KAFKA_TOPIC_GROUP_ID=dataflow_group \\
  --debug
&amp;amp;quot;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This will output the waxctl command with the correct Kafka server values. Copy the output and run it to deploy the dataflow.&lt;/p&gt;

&lt;h4&gt;
  
  
  Option 2: Hardcoded BYTEWAX_KAFKA_SERVER value
&lt;/h4&gt;

&lt;p&gt;If you prefer to hardcode the Kafka server values, use the following command to deploy the dataflow:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;waxctl df deploy ./dataflow.py \
  --name ec2-cpu-ad \
  -p 5 \
  -i $REPOSITORY_URI \
  -t dataflow \
  -e &amp;amp;apos;&amp;amp;quot;BYTEWAX_KAFKA_SERVER=three-node-cluster-0.three-node-cluster.redpanda-bytewax.svc.cluster.local.,three-node-cluster-1.three-node-cluster.redpanda-bytewax.svc.cluster.local.,three-node-cluster-2.three-node-cluster.redpanda-bytewax.svc.cluster.local.&amp;amp;quot;&amp;amp;apos; \
  -e BYTEWAX_KAFKA_TOPIC_GROUP_ID=dataflow_group \
  --debug

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;With the dataflow deployed, give it some time to process incoming metrics, then consume from the anomalies topic to see any anomalies it has detected:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;rpk --brokers $BROKERS topic consume ec2_metrics_anomalies -o start

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As a next step, you could deploy a dataflow that consumes from the anomalies topic and alerts you in Slack. You could also add &lt;a href="https://github.com/rerun-io/rerun"&gt;rerun&lt;/a&gt;, as we demonstrated in the previous blog post, to visualize the anomalies.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;In this blog post, we have demonstrated how to set up a system for monitoring EC2 metrics and detecting anomalous CPU usage. By leveraging tools like Logstash, &lt;a href="https://redpanda.com/"&gt;Redpanda&lt;/a&gt;, &lt;a href="https://riverml.xyz/0.15.0/"&gt;River&lt;/a&gt;, and Bytewax, we've created a robust and scalable pipeline for processing and analyzing streaming data.&lt;/p&gt;

&lt;p&gt;This system provides a range of benefits, including:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Efficiently processing high-dimensional and high-velocity data streams&lt;/li&gt;
&lt;li&gt;Using the Half Space Trees unsupervised machine learning algorithm for detecting anomalies in streaming data&lt;/li&gt;
&lt;li&gt;Continuously monitoring EC2 metrics and identifying potential issues in the infrastructure&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;With this setup, you can effectively monitor your EC2 instances and ensure that your infrastructure is running smoothly, helping you proactively address any issues that may arise.&lt;/p&gt;

&lt;p&gt;That's it! You now have a working cloud-based anomaly detection system using &lt;a href="https://bytewax.io/"&gt;Bytewax&lt;/a&gt;, &lt;a href="https://redpanda.com/"&gt;Redpanda&lt;/a&gt;, and AWS. Feel free to adapt this setup to your specific use case and explore the various features and capabilities offered by these tools.&lt;/p&gt;

</description>
      <category>anomalydetection</category>
      <category>aws</category>
      <category>redpanda</category>
    </item>
    <item>
      <title>Real-Time Anomaly Detection Visualization with Bytewax and Rerun</title>
      <dc:creator>Zander</dc:creator>
      <pubDate>Thu, 13 Apr 2023 22:39:53 +0000</pubDate>
      <link>https://dev.to/bytewax/real-time-anomaly-detection-visualization-with-bytewax-and-rerun-1574</link>
      <guid>https://dev.to/bytewax/real-time-anomaly-detection-visualization-with-bytewax-and-rerun-1574</guid>
      <description>&lt;p&gt;&lt;a href="https://www.rerun.io/"&gt;Rerun's&lt;/a&gt; open sourcing in February marked a significant step for those looking for accessible yet potent Python visualization libraries. Why is visualization important? Visualization is essential since companies like Scale.ai, Weights &amp;amp; Biases, and Hugging Face have streamlined deep learning by addressing dataset labeling, experiment tracking, and pre-trained models. However, a void still exists in rapid data capture and visualization.&lt;/p&gt;

&lt;p&gt;Many companies develop in-house data visualization solutions but often end up with suboptimal tools due to high development costs. Moreover, Python visualization of &lt;em&gt;streaming&lt;/em&gt; data is not a well-solved problem either, which leads to &lt;a href="https://bytewax.io/blog/visualize-streaming-data-in-python"&gt;JavaScript-based solutions in notebooks&lt;/a&gt;. Rerun pairs a Python interface with a high-performance Rust visualization engine (much like Bytewax!), which makes it dead easy to analyze streaming data.&lt;/p&gt;

&lt;p&gt;In this blog post, we will explore how to use Bytewax and Rerun to visualize real-time streaming data in Python and create a real-time anomaly detection visualization. We chose anomaly detection, a.k.a. outlier detection, because it is a critical component in numerous applications, such as cybersecurity, fraud detection, and monitoring of industrial processes. Visualizing these anomalies in real time can aid in quickly identifying potential issues and taking necessary actions to mitigate them.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;For those eager to dive in, check out our end-to-end Python solution on our &lt;a href="https://github.com/bytewax/visualizing-anomalies/blob/main/dataflow.py"&gt;GitHub&lt;/a&gt;. Don't forget to star Bytewax!&lt;/em&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Overview
&lt;/h2&gt;

&lt;p&gt;Here is what we'll cover:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;We will navigate the code and briefly discuss top-level entities&lt;/li&gt;
&lt;li&gt;Then we will discuss each step of the dataflow in greater detail: initialization of our dataflow, input source, stateful anomaly detection, data visualization &amp;amp; output, and how to spawn a cluster&lt;/li&gt;
&lt;li&gt;Finally, we will learn how to run it and see the beautiful visualization, all in Python &amp;lt;3&lt;/li&gt;
&lt;li&gt;As a bonus, we will think about other use cases&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Let's go!&lt;/p&gt;

&lt;h2&gt;
  
  
  Set up your environment
&lt;/h2&gt;

&lt;p&gt;This blog post is based on the following versions of Bytewax and Rerun:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bytewax==0.15.1
rerun-sdk==0.4.0

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Rerun and Bytewax can both be installed with pip:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;pip install rerun-sdk
pip install bytewax

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;em&gt;Follow Bytewax for updates as we are baking a new version that will ease the development of data streaming apps in Python further.&lt;/em&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Code
&lt;/h2&gt;

&lt;p&gt;The solution is relatively compact, so we copy the entire code example here. Please feel free to skip this big chunk if it looks overwhelming; we will discuss each function later.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;random&lt;/span&gt;
&lt;span class="c1"&gt;# pip install rerun-sdk
&lt;/span&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;rerun&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;rr&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;time&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;sleep&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;datetime&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;bytewax.dataflow&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Dataflow&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;bytewax.execution&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;spawn_cluster&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;bytewax.inputs&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;ManualInputConfig&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;distribute&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;bytewax.outputs&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;ManualOutputConfig&lt;/span&gt;

&lt;span class="n"&gt;rr&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;init&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;&amp;amp;quot;metrics&amp;amp;quot;&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;rr&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;spawn&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="n"&gt;start&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;input_builder&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;worker_index&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_count&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;resume_state&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;assert&lt;/span&gt; &lt;span class="n"&gt;resume_state&lt;/span&gt; &lt;span class="ow"&gt;is&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;
    &lt;span class="n"&gt;keys&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;&amp;amp;quot;1&amp;amp;quot;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;&amp;amp;quot;2&amp;amp;quot;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;&amp;amp;quot;3&amp;amp;quot;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;&amp;amp;quot;4&amp;amp;quot;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;&amp;amp;quot;5&amp;amp;quot;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;&amp;amp;quot;6&amp;amp;quot;&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="n"&gt;this_workers_keys&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;distribute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;keys&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_index&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_count&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="nb"&gt;range&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1000&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;this_workers_keys&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;randrange&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;gt;&lt;/span&gt; &lt;span class="mf"&gt;0.9&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="o"&gt;*=&lt;/span&gt; &lt;span class="mf"&gt;2.0&lt;/span&gt;
            &lt;span class="k"&gt;yield&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="n"&gt;start&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;total_seconds&lt;/span&gt;&lt;span class="p"&gt;()))&lt;/span&gt;
            &lt;span class="n"&gt;sleep&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="mf"&gt;10.0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;ZTestDetector&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="s"&gt;&amp;amp;quot;&amp;amp;quot;&amp;amp;quot;Anomaly detector.

    Use with a call to flow.stateful_map().

    Looks at how many standard deviations the current item is away
    from the mean (Z-score) of the last 10 items. Mark as anomalous if
    over the threshold specified.
    &amp;amp;quot;&amp;amp;quot;&amp;amp;quot;&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;threshold_z&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;threshold_z&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;threshold_z&lt;/span&gt;

        &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;last_10&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;
        &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mu&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;
        &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sigma&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;_push&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;last_10&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;insert&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;del&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;last_10&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;:]&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;_recalc_stats&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;last_len&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;last_10&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mu&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;sum&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;last_10&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="n"&gt;last_len&lt;/span&gt;
        &lt;span class="n"&gt;sigma_sq&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;sum&lt;/span&gt;&lt;span class="p"&gt;((&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mu&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;**&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;last_10&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="n"&gt;last_len&lt;/span&gt;
        &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sigma&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;sigma_sq&lt;/span&gt;&lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="mf"&gt;0.5&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;push&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;key__value__t&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;key__value__t&lt;/span&gt;
        &lt;span class="n"&gt;is_anomalous&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mu&lt;/span&gt; &lt;span class="ow"&gt;and&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sigma&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;is_anomalous&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;abs&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mu&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sigma&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;gt;&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;threshold_z&lt;/span&gt;

        &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_push&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_recalc_stats&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;rr&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;log_scalar&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;f&amp;quot;temp_{key}/data&amp;quot;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;color&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;155&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;155&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;155&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;is_anomalous&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;rr&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;log_point&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;f&amp;quot;3dpoint/anomaly/{key}&amp;quot;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;float&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;radius&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;0.3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;color&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;255&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
            &lt;span class="n"&gt;rr&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;log_scalar&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                &lt;span class="s"&gt;f&amp;quot;temp_{key}/data/anomaly&amp;quot;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;scattered&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;radius&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;3.0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;color&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;255&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
            &lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;rr&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;log_point&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;f&amp;quot;3dpoint/data/{key}&amp;quot;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;float&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;radius&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;0.1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mu&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sigma&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;is_anomalous&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;output_builder&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;worker_index&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_count&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;inspector&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;input&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;metric&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;mu&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sigma&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;is_anomalous&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;input&lt;/span&gt;
        &lt;span class="k"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="s"&gt;f&amp;quot;{metric}: &amp;quot;&lt;/span&gt;
            &lt;span class="s"&gt;f&amp;quot;value = {value}, &amp;quot;&lt;/span&gt;
            &lt;span class="s"&gt;f&amp;quot;mu = {mu:.2f}, &amp;quot;&lt;/span&gt;
            &lt;span class="s"&gt;f&amp;quot;sigma = {sigma:.2f}, &amp;quot;&lt;/span&gt;
            &lt;span class="s"&gt;f&amp;quot;{is_anomalous}&amp;quot;&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;inspector&lt;/span&gt;

&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;__name__&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="s"&gt;&amp;quot;__main__&amp;quot;&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;flow&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Dataflow&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;input&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;&amp;quot;input&amp;quot;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ManualInputConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input_builder&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
    &lt;span class="c1"&gt;# (&amp;quot;metric&amp;quot;, value)
&lt;/span&gt;    &lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;stateful_map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;&amp;quot;AnomalyDetector&amp;quot;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;lambda&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;ZTestDetector&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mf"&gt;2.0&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="n"&gt;ZTestDetector&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;push&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="c1"&gt;# (&amp;quot;metric&amp;quot;, (value, mu, sigma, is_anomalous))
&lt;/span&gt;    &lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;capture&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ManualOutputConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;output_builder&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
    &lt;span class="n"&gt;spawn_cluster&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The provided code demonstrates how to create a real-time anomaly detection pipeline using Bytewax and Rerun. Let's break down the essential components of this code:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;input_builder&lt;/strong&gt; : This function generates random metrics simulating real-world data streams. It generates data points with a small chance of having an anomaly (values doubled).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;ZTestDetector&lt;/strong&gt; : This class implements an anomaly detector using the Z-score method. It maintains the mean and standard deviation of the last 10 values and marks a value as anomalous if its Z-score is greater than a specified threshold.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;output_builder&lt;/strong&gt; : This function is used to define the output behavior for the data pipeline. In this case, it prints the metric name, value, mean, standard deviation, and whether the value is anomalous.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Dataflow&lt;/strong&gt; : The main part of the code constructs the dataflow using Bytewax, connecting the &lt;code&gt;input_builder&lt;/code&gt; input, the ZTestDetector, and the output builder.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Rerun visualization&lt;/strong&gt; : The Rerun visualization is integrated into the ZTestDetector class. The rr.log_scalar and rr.log_point functions are used to plot the data points and their corresponding anomaly status.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Now, with an understanding of the code's main components, let's discuss how the visualization is created step by step.&lt;/p&gt;

&lt;h2&gt;
  
  
  Building the Dataflow
&lt;/h2&gt;

&lt;p&gt;To create a dataflow pipeline, you need to:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Initialize a new dataflow with &lt;code&gt;flow = Dataflow()&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;Define the input source using &lt;code&gt;flow.input(&amp;quot;input&amp;quot;, ManualInputConfig(input_builder))&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;Apply the stateful anomaly detector using &lt;code&gt;flow.stateful_map(&amp;quot;AnomalyDetector&amp;quot;, lambda: ZTestDetector(2.0), ZTestDetector.push)&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;Configure the output behavior with &lt;code&gt;flow.capture(ManualOutputConfig(output_builder))&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;Finally, spawn a cluster to execute the dataflow with &lt;code&gt;spawn_cluster(flow)&lt;/code&gt;.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The resulting dataflow reads the randomly generated metric values from &lt;code&gt;input_builder&lt;/code&gt;, passes them through the &lt;code&gt;ZTestDetector&lt;/code&gt; for anomaly detection, and outputs the results using the &lt;code&gt;output_builder&lt;/code&gt; function. Let's clarify the details for each step.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;code&gt;input_builder&lt;/code&gt; function
&lt;/h3&gt;

&lt;p&gt;The &lt;code&gt;input_builder&lt;/code&gt; function serves as a custom input source for the dataflow pipeline, generating random metric values that can be split across multiple workers. It accepts three parameters: &lt;code&gt;worker_index&lt;/code&gt;, &lt;code&gt;worker_count&lt;/code&gt;, and &lt;code&gt;resume_state&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;input_builder&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;worker_index&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_count&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;resume_state&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;assert&lt;/span&gt; &lt;span class="n"&gt;resume_state&lt;/span&gt; &lt;span class="ow"&gt;is&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;
    &lt;span class="n"&gt;keys&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;&amp;quot;1&amp;quot;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;&amp;quot;2&amp;quot;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;&amp;quot;3&amp;quot;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;&amp;quot;4&amp;quot;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;&amp;quot;5&amp;quot;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;&amp;quot;6&amp;quot;&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="n"&gt;this_workers_keys&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;distribute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;keys&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_index&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_count&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="nb"&gt;range&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1000&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;keys&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;randrange&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;gt;&lt;/span&gt; &lt;span class="mf"&gt;0.9&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="o"&gt;*=&lt;/span&gt; &lt;span class="mf"&gt;2.0&lt;/span&gt;
            &lt;span class="k"&gt;yield&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="n"&gt;start&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;total_seconds&lt;/span&gt;&lt;span class="p"&gt;()))&lt;/span&gt;
            &lt;span class="n"&gt;sleep&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="mf"&gt;10.0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;worker_index&lt;/code&gt;: The index of the current worker in the dataflow pipeline.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;worker_count&lt;/code&gt;: The total number of workers in the dataflow pipeline.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;resume_state&lt;/code&gt;: The state of the input source from which to resume. In this case, it is asserted to be &lt;code&gt;None&lt;/code&gt;, indicating that the input source does not support resuming from a previous state.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Here's a step-by-step description of the &lt;code&gt;input_builder&lt;/code&gt; function:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Assert that &lt;code&gt;resume_state&lt;/code&gt; is &lt;code&gt;None&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;Define a list of keys representing the metrics.&lt;/li&gt;
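&lt;li&gt;Distribute the keys among the workers using the &lt;code&gt;distribute&lt;/code&gt; function (not provided in the code snippet). The keys for the current worker are assigned to &lt;code&gt;this_workers_keys&lt;/code&gt;. As written, the loop in the next step iterates over the full &lt;code&gt;keys&lt;/code&gt; list rather than &lt;code&gt;this_workers_keys&lt;/code&gt;.&lt;/li&gt;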
&lt;li&gt;Iterate 1,000 times and, for each iteration, iterate through the list of keys:

&lt;ul&gt;
&lt;li&gt;Generate a random integer from 0 to 9 (&lt;code&gt;random.randrange(0, 10)&lt;/code&gt; excludes the upper bound).&lt;/li&gt;
&lt;li&gt;With a 10% probability, double the value to simulate an anomaly.&lt;/li&gt;
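&lt;li&gt;Yield a 2-tuple: &lt;code&gt;None&lt;/code&gt; as the resume state (this input keeps nothing to resume from), followed by the item itself, which pairs the key with a tuple of the key, the generated value, and the seconds elapsed since &lt;code&gt;start&lt;/code&gt; (defined outside this snippet).&lt;/li&gt;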
&lt;li&gt;Introduce a sleep time between each generated value to simulate real-time data generation.&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ol&gt;
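
&lt;p&gt;Since the &lt;code&gt;distribute&lt;/code&gt; helper is not shown in the snippet, here is a minimal sketch of what such a helper could look like, assuming a simple round-robin split. The name &lt;code&gt;distribute_keys&lt;/code&gt; is hypothetical and the helper that ships with Bytewax may behave differently.&lt;/p&gt;

```python
def distribute_keys(keys, worker_index, worker_count):
    """Hypothetical stand-in: give each worker every worker_count-th key."""
    return [
        key
        for position, key in enumerate(keys)
        if position % worker_count == worker_index
    ]


keys = ["1", "2", "3", "4", "5", "6"]

# With three workers, each worker ends up owning two of the six keys.
for worker_index in range(3):
    print(worker_index, distribute_keys(keys, worker_index, 3))
```

&lt;p&gt;Returning a list rather than a generator matters here, because the input loop walks the worker's keys once per outer iteration.&lt;/p&gt;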

&lt;p&gt;The &lt;code&gt;input_builder&lt;/code&gt; function is used in the dataflow as the input source with the following line of code:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;input&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;&amp;quot;input&amp;quot;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ManualInputConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input_builder&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This line tells the dataflow to call the &lt;code&gt;input_builder&lt;/code&gt; function, wrapped in a &lt;code&gt;ManualInputConfig&lt;/code&gt;, to generate the input data for the pipeline.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;code&gt;ZTestDetector&lt;/code&gt; Class
&lt;/h3&gt;

&lt;p&gt;The &lt;code&gt;ZTestDetector&lt;/code&gt; class is an anomaly detector that uses the Z-score method to identify whether a data point is anomalous or not. The Z-score is the number of standard deviations a data point is from the mean of a dataset. If a data point's Z-score is higher than a specified threshold, it is considered anomalous.&lt;/p&gt;
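
&lt;p&gt;To make the Z-score rule concrete, here is a small standalone sketch. The &lt;code&gt;z_score&lt;/code&gt; helper and the sample window are illustrative assumptions, not part of the pipeline. It uses the population standard deviation (dividing by the window length), which matches how &lt;code&gt;_recalc_stats&lt;/code&gt; computes sigma.&lt;/p&gt;

```python
from statistics import fmean, pstdev


def z_score(value, window):
    """Return how many population standard deviations value lies from the window mean."""
    mu = fmean(window)
    sigma = pstdev(window)  # population std dev: divides by len(window)
    if sigma == 0:
        return 0.0  # all values identical: treated as not anomalous in this sketch
    return abs(value - mu) / sigma


# Hypothetical window of ten recent readings with mean 5.
window = [4, 5, 6, 5, 4, 6, 5, 5, 4, 6]

print(z_score(5, window))   # a typical reading
print(z_score(12, window))  # a reading far outside the recent range
```

&lt;p&gt;With a threshold of 2.0, the first reading sits exactly on the mean and passes, while the second is roughly nine standard deviations away and would be flagged.&lt;/p&gt;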

&lt;p&gt;The class has the following methods:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
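&lt;code&gt;__init__(self, threshold_z)&lt;/code&gt;: The constructor stores the threshold Z-score value. It also initializes the list of the last 10 values (&lt;code&gt;self.last_10&lt;/code&gt;) as empty and sets the mean (&lt;code&gt;self.mu&lt;/code&gt;) and standard deviation (&lt;code&gt;self.sigma&lt;/code&gt;) to &lt;code&gt;None&lt;/code&gt; until the first value arrives.&lt;/li&gt;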
&lt;li&gt;
&lt;code&gt;_push(self, value)&lt;/code&gt;: This private method updates the list of last 10 values with the new value. It inserts the new value at the beginning of the list and deletes everything past the tenth entry, so the list never holds more than 10 values.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;_recalc_stats(self)&lt;/code&gt;: This private method recalculates the mean and standard deviation based on the current values in the self.last_10 list.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;push(self, key__value__t)&lt;/code&gt;: This public method takes a tuple containing a key, a value, and a timestamp as input. It first checks the value's Z-score against the mean and standard deviation of the previous values, then adds the value to the last 10 values list and recalculates the mean and standard deviation. It also logs the data point and its anomaly status using Rerun's visualization functions. Finally, it returns the updated instance of the ZTestDetector class and a tuple containing the value, mean, standard deviation, and anomaly status.&lt;/li&gt;
&lt;/ul&gt;
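
&lt;p&gt;The window bookkeeping in &lt;code&gt;_push&lt;/code&gt; can be tried in isolation. The sketch below mirrors the insert-and-trim approach and compares it with a &lt;code&gt;collections.deque&lt;/code&gt; with &lt;code&gt;maxlen=10&lt;/code&gt;, which is an assumed alternative and not what the article's class uses.&lt;/p&gt;

```python
from collections import deque


def push_list(window, value, size=10):
    """Mirror of the insert-and-trim approach: newest value first, tail trimmed."""
    window.insert(0, value)
    del window[size:]


list_window = []
deque_window = deque(maxlen=10)  # assumption: a deque as a drop-in alternative

for value in range(15):
    push_list(list_window, value)
    deque_window.appendleft(value)  # the oldest value falls off the right end

print(list_window)
print(list(deque_window))
```

&lt;p&gt;Both containers end up holding the ten most recent values, newest first. The deque avoids shifting the whole list on every insert, although for ten elements the difference is negligible.&lt;/p&gt;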

&lt;p&gt;The ZTestDetector class is used in the dataflow pipeline as a stateful map with the following code:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;stateful_map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;&amp;quot;AnomalyDetector&amp;quot;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;lambda&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;ZTestDetector&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mf"&gt;2.0&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="n"&gt;ZTestDetector&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;push&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This line tells the dataflow to apply the &lt;code&gt;ZTestDetector&lt;/code&gt; with a Z-score threshold of &lt;code&gt;2.0&lt;/code&gt; and use the &lt;code&gt;push&lt;/code&gt; method to process the data points.&lt;/p&gt;
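&lt;p&gt;Conceptually, a stateful map keeps one state object per key, builds it the first time a key is seen, and lets the mapper return both the updated state and the value to emit. The sketch below illustrates that idea in plain Python. The helper &lt;code&gt;emulate_stateful_map&lt;/code&gt; and the &lt;code&gt;RunningCount&lt;/code&gt; class are hypothetical names used only for illustration; this is not how Bytewax implements the operator.&lt;/p&gt;

```python
def emulate_stateful_map(builder, mapper, keyed_items):
    """Plain-Python illustration of per-key state (hypothetical helper)."""
    states = {}
    for key, value in keyed_items:
        if key not in states:
            # First time this key is seen: build a fresh state object.
            states[key] = builder()
        # The mapper returns the updated state and the value to emit.
        states[key], output = mapper(states[key], value)
        yield key, output


class RunningCount:
    """Toy state object standing in for ZTestDetector."""

    def __init__(self):
        self.count = 0

    def push(self, value):
        self.count += 1
        return self, (value, self.count)


items = [("a", 1.0), ("b", 5.0), ("a", 2.0)]
results = list(emulate_stateful_map(lambda: RunningCount(), RunningCount.push, items))
```

&lt;p&gt;Each key gets its own counter here, in the same way that each metric would get its own detector instance in the dataflow.&lt;/p&gt;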

&lt;h3&gt;
  
  
  Visualizing Anomalies
&lt;/h3&gt;

&lt;p&gt;To visualize the anomalies, the &lt;code&gt;ZTestDetector&lt;/code&gt; class logs the data points and their corresponding anomaly status using Rerun's visualization functions. Specifically, &lt;code&gt;rr.log_scalar&lt;/code&gt; is used to plot a scalar value, while &lt;code&gt;rr.log_point&lt;/code&gt; is used to plot 3D points.&lt;/p&gt;

&lt;p&gt;The following code snippet shows how the visualization is created:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;rr&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;log_scalar&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="s"&gt;"temp_&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;/data"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;color&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;155&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;155&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;155&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;is_anomalous&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;rr&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;log_point&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="s"&gt;"3dpoint/anomaly/&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;float&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;radius&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;0.3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;color&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;255&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
    &lt;span class="n"&gt;rr&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;log_scalar&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="s"&gt;"temp_&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;/data/anomaly"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;scattered&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;radius&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;3.0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;color&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;255&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;rr&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;log_point&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="s"&gt;"3dpoint/data/&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;float&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;radius&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;0.1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here, we first log a scalar value representing the metric. Then, depending on whether the value is anomalous, we log a 3D point with a different radius and color. Anomalous points are logged in red with a larger radius, while non-anomalous points are logged with a smaller radius.&lt;/p&gt;
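&lt;p&gt;The branching described here can be checked without a running Rerun viewer by computing the arguments for the 3D point separately from the logging call. The helper &lt;code&gt;point_log_args&lt;/code&gt; below is a hypothetical name, not part of Rerun or of the example; it is a small sketch that mirrors the entity paths, radii, and colors used in the snippet, and assumes the key is a numeric string.&lt;/p&gt;

```python
def point_log_args(key, value, t, is_anomalous):
    """Return (entity_path, position, radius, color) for one data point.

    Hypothetical helper: it only mirrors the two branches of the snippet
    and performs no logging itself.
    """
    # The key is assumed to be a numeric string, as float(key) requires.
    position = [t, value, float(key) * 10]
    if is_anomalous:
        # Anomalies: larger radius, red color.
        return f"3dpoint/anomaly/{key}", position, 0.3, [255, 100, 100]
    # Regular points: smaller radius, no explicit color.
    return f"3dpoint/data/{key}", position, 0.1, None


anomaly_args = point_log_args("1", 42.0, 7, True)
normal_args = point_log_args("2", 10.0, 8, False)
```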

&lt;h3&gt;
  
  
  &lt;code&gt;output_builder&lt;/code&gt; Function
&lt;/h3&gt;

&lt;p&gt;The &lt;code&gt;output_builder&lt;/code&gt; function defines the output behavior for the data pipeline. In this example, it prints the metric name, value, mean, standard deviation, and whether the value is anomalous. The function takes two arguments, &lt;code&gt;worker_index&lt;/code&gt; and &lt;code&gt;worker_count&lt;/code&gt;, which give it the index of the current worker and the total number of workers in the dataflow.&lt;/p&gt;

&lt;p&gt;Here's the definition of the &lt;code&gt;output_builder&lt;/code&gt; function:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;output_builder&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;worker_index&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_count&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;inspector&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;input&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;metric&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;mu&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sigma&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;is_anomalous&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;input&lt;/span&gt;
        &lt;span class="k"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="s"&gt;"&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;metric&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;: "&lt;/span&gt;
            &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="s"&gt;"value = &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;, "&lt;/span&gt;
            &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="s"&gt;"mu = &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;mu&lt;/span&gt;&lt;span class="si"&gt;:.2f}&lt;/span&gt;&lt;span class="s"&gt;, "&lt;/span&gt;
            &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="s"&gt;"sigma = &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;sigma&lt;/span&gt;&lt;span class="si"&gt;:.2f}&lt;/span&gt;&lt;span class="s"&gt;, "&lt;/span&gt;
            &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="s"&gt;"&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;is_anomalous&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;"&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;inspector&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is a higher-order function: it returns another function, &lt;code&gt;inspector&lt;/code&gt;, which unpacks each input tuple and prints the desired output. The &lt;code&gt;output_builder&lt;/code&gt; function is later passed to the dataflow when configuring the output behavior:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;capture&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ManualOutputConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;output_builder&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
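&lt;p&gt;The builder and inspector pair can also be exercised by hand, outside of a dataflow, which is a quick way to check the output format. The sketch below is a variant of the function that builds the same line and additionally returns it; the return value, the sample item, and the worker arguments are assumptions made for illustration.&lt;/p&gt;

```python
def output_builder(worker_index, worker_count):
    def inspector(item):
        metric, (value, mu, sigma, is_anomalous) = item
        line = (
            f"{metric}: "
            f"value = {value}, "
            f"mu = {mu:.2f}, "
            f"sigma = {sigma:.2f}, "
            f"{is_anomalous}"
        )
        print(line)
        # Returned only so the sketch is easy to check (assumption).
        return line

    return inspector


# Build the inspector as a single worker would, then feed it one made-up item.
inspector = output_builder(worker_index=0, worker_count=1)
line = inspector(("temp_1", (12.5, 10.0, 1.25, True)))
```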



&lt;h2&gt;
  
  
  Running the Dataflow
&lt;/h2&gt;

&lt;p&gt;Bytewax can run as a single process or across multiple processes. This dataflow has been created to scale across multiple processes, but we will start off running it as a single process with the &lt;code&gt;spawn_cluster&lt;/code&gt; function from the execution module.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;spawn_cluster&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To increase the parallelism, we would pass a larger process count as an argument, for example &lt;code&gt;spawn_cluster(flow, proc_count=3)&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;To run the provided code we can simply run it as a Python script, but first we need to install the dependencies.&lt;/p&gt;

&lt;p&gt;Create a new file in the same directory as dataflow.py and name it requirements.txt. Add the following content to the requirements.txt file:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bytewax==0.15.1
rerun-sdk==0.4.0

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Open a terminal in the directory containing the requirements.txt and dataflow.py files.&lt;/p&gt;

&lt;p&gt;Install the dependencies using the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;pip install -r requirements.txt

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And run the dataflow!&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;python dataflow.py

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Expanding the Use Case
&lt;/h2&gt;

&lt;p&gt;While the provided code serves as a basic example of real-time anomaly detection, you can expand this pipeline to accommodate more complex scenarios. For example:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Incorporate real-world data sources&lt;/strong&gt;: Replace the RandomMetricInput class with a custom class that reads data from a real-world source, such as IoT sensors, log files, or streaming APIs.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Implement more sophisticated anomaly detection techniques&lt;/strong&gt;: You can replace the ZTestDetector class with other stateful anomaly detection methods, such as moving average, exponential smoothing, or machine learning-based approaches.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Customize the visualization&lt;/strong&gt;: Enhance the Rerun visualization by adding more data dimensions, adjusting the color schemes, or modifying the plot styles to better suit your needs.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Integrate with alerting and monitoring systems&lt;/strong&gt;: Instead of simply printing the anomaly results, you can integrate the pipeline with alerting or monitoring systems to notify the appropriate stakeholders when an anomaly is detected.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
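&lt;p&gt;As one illustration of the second point, a detector based on exponential smoothing can keep the same &lt;code&gt;push&lt;/code&gt; interface as the ZTestDetector class, so it could be swapped into the stateful map step. The class below is a hypothetical sketch, not part of the example: the name &lt;code&gt;EwmaDetector&lt;/code&gt;, the smoothing factor &lt;code&gt;alpha&lt;/code&gt;, and the incremental variance update are assumptions, and the Rerun logging is omitted.&lt;/p&gt;

```python
class EwmaDetector:
    """Hypothetical alternative to ZTestDetector using exponential smoothing."""

    def __init__(self, threshold_z, alpha=0.3):
        self.threshold_z = threshold_z
        self.alpha = alpha  # weight given to the newest value (assumption)
        self.mu = None
        self.var = 0.0

    def push(self, key__value__t):
        key, value, t = key__value__t
        is_anomalous = False
        if self.mu is None:
            # The first value only initializes the smoothed mean.
            self.mu = value
        else:
            diff = value - self.mu
            sigma = self.var ** 0.5
            if sigma:
                is_anomalous = abs(diff) / sigma > self.threshold_z
            # Exponentially weighted updates of the mean and variance.
            self.mu += self.alpha * diff
            self.var = (1 - self.alpha) * (self.var + self.alpha * diff * diff)
        return self, (value, self.mu, self.var ** 0.5, is_anomalous)


# Feed a few made-up readings through the detector by hand.
detector = EwmaDetector(3.0)
flags = []
for t, value in enumerate([10.0, 10.5, 9.5, 10.0, 50.0]):
    detector, (_, mu, sigma, is_anomalous) = detector.push(("temp", value, t))
    flags.append(is_anomalous)
```

&lt;p&gt;Unlike the fixed window of 10 values, this variant stores only two numbers per key, at the cost of never fully forgetting old values.&lt;/p&gt;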

&lt;p&gt;By customizing and extending the dataflow pipeline, you can create a powerful real-time anomaly detection and visualization solution tailored to your specific use case. The combination of Bytewax and Rerun offers a versatile and scalable foundation for building real-time data processing and visualization systems.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;This blog post has demonstrated how to use Bytewax and Rerun to create a real-time anomaly detection visualization. By building a dataflow pipeline with Bytewax and integrating Rerun's powerful visualization capabilities, we can monitor and identify anomalies in our data as they occur.&lt;/p&gt;

</description>
      <category>visualization</category>
      <category>machinelearning</category>
      <category>datastreaming</category>
    </item>
    <item>
      <title>Data Council: The Highlights of Day 2</title>
      <dc:creator>Oli Makhasoeva</dc:creator>
      <pubDate>Sun, 26 Mar 2023 00:45:10 +0000</pubDate>
      <link>https://dev.to/bytewax/data-council-the-highlights-of-day-2-183e</link>
      <guid>https://dev.to/bytewax/data-council-the-highlights-of-day-2-183e</guid>
      <description>&lt;p&gt;Welcome back, data enthusiasts! I'm excited to dive into the second installment of my blog series covering the extraordinary Data Council Conference. If you haven't already, be sure to check out &lt;a href="https://dev.to/bytewax/data-council-the-highlights-of-day-1-493h"&gt;my first post&lt;/a&gt;, which provided a comprehensive overview of the engaging talks and workshops from Day 1.&lt;/p&gt;

&lt;p&gt;On Day 2, before sessions, we are organizing an informal #StreamBrew coffee gathering for early birds at 7:15 am at KesosTacos near the conference venue. RSVP &lt;a href="https://bitly.com/m/bytewax"&gt;here&lt;/a&gt;. I hope to mingle, network, and enjoy some scrumptious breakfast migas alongside morning coffee. If you've never had migas, don't worry - I haven't either - you won't experiment alone! &lt;/p&gt;

&lt;h2&gt;
  
  
  Panels
&lt;/h2&gt;

&lt;h3&gt;
  
  
  AI Panel
&lt;/h3&gt;

&lt;p&gt;One of the most highly anticipated events on Day 2 of the Data Council Conference is the AI Panel. Though details about the panel's specific focus remain under wraps, the excitement is palpable. I expect a riveting discussion featuring top-tier experts, who will undoubtedly share their unique perspectives on artificial intelligence's current state and future directions. AI is changing the world we live in, and it seems to happen every week, or every month for sure!&lt;/p&gt;

&lt;h3&gt;
  
  
  How Investors Think About Data
&lt;/h3&gt;

&lt;p&gt;Another must-attend event on Day 2 is the panel titled "How Investors Think About Data," featuring an impressive lineup of investment professionals. Gain valuable insights from Lauren Reeder, Partner at Sequoia Capital; Slater Stich, Partner at Bain Capital Ventures; Leigh Marie Braswell, Principal at Founders Fund; and Pete Soderling, Founder of Data Community Fund.&lt;/p&gt;

&lt;p&gt;I work for a data-oriented startup, and given the current state of the economy, including the infamous SVB disaster, I am curious about what fundraising will look like in the mid to long term and how to maximize our chances to succeed. Also, Pete is the founder and chair of the Data Council conference, and I am eager to hear from him too!&lt;/p&gt;

&lt;h2&gt;
  
  
  Talks
&lt;/h2&gt;

&lt;p&gt;Day 2 of the Data Council Conference offers three tracks; the full schedule is &lt;a href="https://docs.google.com/document/d/1T3dtBXeEyrujeg-5H8L5ncWKGuq3vMFYMyjriXWMAAI/edit"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;The first track, "Applied &amp;amp; Generative AI," covers topics such as Large Language/Transformer Models, generative AI, product-based implementations of new research methods, and exciting new features powered by machine learning inside products.&lt;/p&gt;

&lt;p&gt;The second track, "Analytics," focuses on the latest tools, techniques, and best practices for extracting valuable insights from data. You'll learn how top teams are solving their analytics challenges and discover the best new tools in the process.&lt;/p&gt;

&lt;p&gt;Finally, my favorite one, the "Data Culture &amp;amp; Community" track. It emphasizes fostering a vibrant data ecosystem and promoting collaboration among data professionals. Sessions in this track will highlight the role of community building, open-source projects, and knowledge sharing in advancing data science and data engineering. &lt;/p&gt;

&lt;p&gt;In case you're torn between multiple sessions like me, remember that many of the presentations will be recorded and made available for viewing later. With that in mind, I will highlight only a fraction of what sparks my interest.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;a href="https://www.datacouncil.ai/talks/generative-ai-for-product-builders?hsLang=en"&gt;Tristan Zajonc - Generative AI for Product Builders &lt;/a&gt;
&lt;/h3&gt;

&lt;p&gt;I always considered no-code or low-code solutions an excellent option for a non-technical (and technical, too, in some cases) founder to build a prototype and get their MVP out there as soon as possible without hiring a bunch of developers. DALL·E, Midjourney, and Stable Diffusion did a similar thing and unlocked creativity for the rest of us. In that light, Tristan's talk about the caveats and nuances of building products using generative AI is very well-timed and relevant.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;a href="https://www.datacouncil.ai/talks/how-vercel-builds-dozens-of-metrics-from-one-heterogenous-table?hsLang=en"&gt;Thomas Mickley-Doyle "How Vercel Builds Dozens of Metrics from One Heterogenous Table"&lt;/a&gt;
&lt;/h3&gt;

&lt;p&gt;I remember quite a few blog posts about the importance of reacting quickly to changes, partly because Bytewax is enabling real-time ML and partly because it's a hot topic. Thomas Mickley-Doyle from Vercel will share their innovative approach to data-driven decision-making. Vercel's strategy has increased stakeholder participation in analytics, reduced troubleshooting time for outlier events, and eliminated the data team as a bottleneck for data-related tasks. Sounds like a lot of fun!&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;a href="https://www.datacouncil.ai/talks/behind-the-curtain-what-it-takes-to-support-the-worlds-most-popular-open-source-communities?hsLang=en"&gt;Katrina Riehl "Behind the Curtain: What it Takes to Support the World's Most Popular Open Source Communities"&lt;/a&gt;
&lt;/h3&gt;

&lt;p&gt;Dr. Katrina Riehl is President of the Board of Directors at NumFOCUS, Head of the Streamlit Data Team at Snowflake, and Adjunct Lecturer at Georgetown University. If you are building an OSS-driven business or care about how the community perceives your brand (and you'd better :)), her talk is a must-go. NumFOCUS is operating on a vast scale: 50 sponsored projects and 60 affiliated projects, including some of the world's most popular open-source projects like NumPy, SciPy, Jupyter, and Pandas. There is definitely a ton to learn from NumFOCUS and Katrina.&lt;/p&gt;

&lt;p&gt;I can't wait to share more of the content from the conference itself! I expect no less than an unforgettable experience!&lt;/p&gt;

</description>
      <category>conference</category>
      <category>realtimeanalytics</category>
      <category>datascience</category>
      <category>dataengineering</category>
    </item>
    <item>
      <title>Data Council: The Highlights of Day 1</title>
      <dc:creator>Oli Makhasoeva</dc:creator>
      <pubDate>Thu, 23 Mar 2023 05:44:54 +0000</pubDate>
      <link>https://dev.to/bytewax/data-council-the-highlights-of-day-1-493h</link>
      <guid>https://dev.to/bytewax/data-council-the-highlights-of-day-1-493h</guid>
      <description>&lt;p&gt;The COVID-19 pandemic has profoundly impacted how we work and learn, and the conference industry is no exception. Many events have moved to virtual formats, allowing attendees to participate from the comfort of their own homes. I even built a business around it! And while I absolutely love virtual events and can talk about their advantages endlessly, there's an undeniable charm to in-person conferences, too.&lt;/p&gt;

&lt;p&gt;After &lt;strong&gt;three years&lt;/strong&gt; of remote work, I am thrilled to finally attend &lt;a href="https://www.datacouncil.ai/"&gt;the Data Council conference&lt;/a&gt; in person in Austin and connect with fellow tech enthusiasts face-to-face as soon as next week!&lt;/p&gt;

&lt;p&gt;The conference attracts diverse data professionals from various industries. While I've been to events that featured data talks or data tracks, and even organized a virtual data-focused conference myself, this is the first time I have a chance to see so many professionals interested in the latest developments in data engineering, data science, machine learning, and AI.&lt;/p&gt;

&lt;p&gt;Come say hi 👋 I'm also bringing &lt;a href="https://bytewax.io/"&gt;Bytewax's&lt;/a&gt; swag that you don't want to miss, so &lt;a href="https://join.slack.com/t/bytewaxcommunity/shared_invite/zt-1lhq9bxbr-T3CXxR_9RIUGb4qcBK26Qw"&gt;let's keep in touch!&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Today I want to share some of the sessions that I found particularly exciting and would like to attend.&lt;/p&gt;

&lt;p&gt;I have to split this post because it's too much to cover in one shot; you are reading about Day 1, March 28th.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;a href="https://docs.google.com/document/d/1T3dtBXeEyrujeg-5H8L5ncWKGuq3vMFYMyjriXWMAAI/edit"&gt;Agenda&lt;/a&gt;
&lt;/h2&gt;

&lt;p&gt;The conference features an action-packed schedule across three days, including regular and lightning talks, workshops, and even speaker office hours.  The latter is especially helpful for newcomers to the community (like me), facilitating connections with experts.&lt;/p&gt;

&lt;p&gt;Beyond the formal sessions, the conference also offers plenty of opportunities for informal networking (see &lt;a href="https://twitter.com/DataCouncilAI/status/1630994017679802371?s=20"&gt;this thread&lt;/a&gt;). We (Bytewax) are organizing &lt;a href="https://bit.ly/3YszNvd?r=lp"&gt;#StreamBrew coffee&lt;/a&gt; on March 29th in the morning (7:15 AM) and &lt;a href="https://bit.ly/3ZGzRsw?r=lp"&gt;#StreamBrew Beer&lt;/a&gt; in the evening on March 30th.&lt;/p&gt;

&lt;p&gt;No wonder that with so much to offer this conference is a must-attend event for data folks!&lt;/p&gt;

&lt;h2&gt;
  
  
  Keynotes
&lt;/h2&gt;

&lt;p&gt;As I said before, the conference's schedule is crowded, and keynotes are no exception: two each day!&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;a href="https://www.datacouncil.ai/talks/building-a-control-plane-for-data?hsLang=en"&gt;Shirshanka Das "Building a Control Plane for Data"&lt;/a&gt;
&lt;/h3&gt;

&lt;p&gt;The conference kicks off with an exciting keynote by &lt;a href="https://www.linkedin.com/in/shirshankadas/"&gt;Shirshanka Das&lt;/a&gt;. Shirshanka is a co-founder and CEO of Acryl Data. He will discuss the control plane for data, a harmonizing layer powered by metadata that unifies data discovery, observability, quality, governance, and management. He will describe the fundamental characteristics of a control plane and explain the use cases that can be accomplished with a unified control plane.&lt;/p&gt;

&lt;p&gt;I am obsessed with unification and simplification. It brings order and enables teams to work more effectively. Thrilled to hear Shirshanka's thoughts on how to do that for data stacks.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;a href="https://www.datacouncil.ai/talks/big-data-is-dead?hsLang=en"&gt;Jordan Tigani "Big Data is Dead" &lt;/a&gt;
&lt;/h3&gt;

&lt;p&gt;Next up is &lt;a href="https://twitter.com/jrdntgn"&gt;Jordan Tigani&lt;/a&gt; of MotherDuck with an intriguing title, "Big Data is Dead." The conference's website didn't have a description of the talk at the time I was writing this, but I googled and found &lt;a href="https://motherduck.com/blog/big-data-is-dead/"&gt;a fresh blog post&lt;/a&gt; by Jordan. &lt;br&gt;
I have to admit, I was a little skeptical about the title as it sounds like clickbait (unrelated, but I have a background in Scala, and Scala is dead forever and dies every year again and again, so it's not news). &lt;/p&gt;

&lt;p&gt;Nonetheless, Jordan is exceptionally qualified to talk about this topic: he shares graphs based on query logs, deal post-mortems, benchmark results, customer support tickets, customer conversations, service logs, and published blog posts. He has his points, and I won't post spoilers by citing his blog post. Besides, I am sure he has more to share in his keynote.&lt;/p&gt;

&lt;h2&gt;
  
  
  Talks
&lt;/h2&gt;

&lt;p&gt;There are three tracks on Day 1:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Data Engineering &amp;amp; Infra&lt;/li&gt;
&lt;li&gt;Data Science &amp;amp; Algos&lt;/li&gt;
&lt;li&gt;ML Ops &amp;amp; Platforms&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;It is challenging to choose what to highlight, and I might overlook or forget some talks, so if your favorite one is not on the list, please feel free to let me know on &lt;a href="https://join.slack.com/t/bytewaxcommunity/shared_invite/zt-1lhq9bxbr-T3CXxR_9RIUGb4qcBK26Qw"&gt;our Slack&lt;/a&gt;, or tag us on &lt;a href="https://twitter.com/bytewax"&gt;Twitter&lt;/a&gt; or &lt;a href="https://www.linkedin.com/company/bytewax"&gt;LinkedIn&lt;/a&gt;; &lt;a href="https://twitter.com/Oli_kitty"&gt;my DMs&lt;/a&gt; are open too.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;a href="https://www.datacouncil.ai/talks/data-contracts-accountable-data-quality?hsLang=en"&gt;Chad Sanderson "Data Contracts: Accountable Data Quality."&lt;/a&gt;
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://www.linkedin.com/in/chad-sanderson/"&gt;Chad Sanderson&lt;/a&gt; is the Founder of Data Quality Camp, and &lt;a href="https://join.slack.com/t/dataqualitycamp/shared_invite/zt-1rk5xsx5j-o3dnRa75iM1mY5~R9HWJMg"&gt;the Data Quality Camp's Slack&lt;/a&gt; is the friendliest place to be. The channels are active, members are helpful, and you can even shamelessly promote whatever you want in the #be-shameless channel :D&lt;/p&gt;

&lt;p&gt;If you're interested in data contracts, then Chad's talk is definitely worth checking out. He recently &lt;a href="https://www.linkedin.com/feed/update/urn:li:activity:7044381753561497600/"&gt;posted on his LinkedIn&lt;/a&gt; that it's going to be the most in-depth presentation yet on how they implemented data contracts at scale at Convoy.&lt;/p&gt;

&lt;p&gt;You may also want to attend Data Quality Camp's first-ever in-person happy hour on Monday the 27th at the Stay Put Brewery near the event venue.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;a href="https://www.datacouncil.ai/talks/extinguishing-the-garbage-fire-of-ml-testing?hsLang=en"&gt;Emily Curtin "Extinguishing the Garbage Fire of ML Testing"&lt;/a&gt;
&lt;/h3&gt;

&lt;p&gt;The abstract of the talk by &lt;a href="https://www.linkedin.com/in/emilymaycurtin"&gt;Emily Curtin&lt;/a&gt; (Staff MLOps Engineer at Intuit Mailchimp) resonates with me: I also think that testing should be front of mind for people implementing complex systems. Emily is focusing on testing in MLOps and Data Science, which I need to familiarize myself with, and I look forward to learning about it from her.&lt;/p&gt;

&lt;p&gt;I also adore that she says in her bio that she gets paid to say "it depends" and "well actually."&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;a href="https://www.datacouncil.ai/talks/how-to-interpret-and-explain-your-black-box-models?hsLang=en"&gt;Sophia Yang "How to Interpret &amp;amp; Explain Your Black-Box Models?"&lt;/a&gt;
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://www.linkedin.com/in/sophiamyang?trk=public_profile_browsemap"&gt;Sophia Yang&lt;/a&gt; is a Senior Data Scientist and a Developer Advocate at Anaconda. She is highly knowledgeable about technology and passionate about data science and Python open-source communities.&lt;br&gt;
I think we share many interests, so I won't miss her talk, in which she covers popular model explanation techniques such as explainable boosting machines, visual analytics, distillation, prototypes, saliency maps, counterfactuals, feature visualization, LIME, SHAP, InterpretML, and TCAV.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--V53xU38b--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/qot8zk7yk5b0uke5dwbm.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--V53xU38b--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/qot8zk7yk5b0uke5dwbm.jpg" alt="Jules Damji at Data Love" width="880" height="440"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;a href="https://www.datacouncil.ai/talks/huggingface-ray-air-integration-a-python-developers-guide-to-scaling-transformers?hsLang=en"&gt;Jules Damji &amp;amp; Antoni Baum "HuggingFace + Ray AIR Integration: A Python Developer's Guide to Scaling Transformers"&lt;/a&gt;
&lt;/h3&gt;

&lt;p&gt;Last but not least, I want to highlight a talk by &lt;a href="https://twitter.com/2twitme"&gt;Jules Damji&lt;/a&gt;, who has spoken at one of my events before (check out his handmade avatar from the pre-Midjourney era). Jules and Antoni will talk about Hugging Face Transformers and Ray AIR. It's cutting-edge Machine Learning, and I'm always willing to discover more about it.&lt;/p&gt;

&lt;h2&gt;
  
  
  Workshops
&lt;/h2&gt;

&lt;p&gt;At Data Council, all workshops are included in the cost of your ticket, so I will try to attend them too.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;a href="https://www.datacouncil.ai/talks/urgent-help-these-pets-find-homes-working-across-teams-in-datahub?hsLang=en"&gt;Maggie Hays &amp;amp; Paul Logan "URGENT! Help these Pets Find Homes: Working Across Teams in DataHub"&lt;/a&gt;
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://www.linkedin.com/in/maggie-hays/"&gt;Maggie&lt;/a&gt; and Paul's workshop is about Long Tail Companions (a hypothetical pet adoption service). It is in crisis – its data infrastructure has ground to a halt, and they cannot process any adoptions. I care about pets, love fixing failures, and enjoy teamwork. All things combined, it sounds like an excellent session for me.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;a href="https://www.datacouncil.ai/talks/how-to-make-marketing-fall-in-love-with-data-modeling?hsLang=en"&gt;Erik Edelmann &amp;amp; Meredith Adler "How to Make Marketing Fall In Love with Data Modeling"&lt;/a&gt;
&lt;/h3&gt;

&lt;p&gt;Data Modeling applied to marketing is obviously something that I care about. I'm joining &lt;a href="https://www.linkedin.com/in/erik-edelmann-43247358"&gt;Erik&lt;/a&gt; and Meredith for a demo of the campaign they built at Hightouch. They will cover how the team modeled the data, validated the results, and created a reusable process to support future marketing campaigns.&lt;/p&gt;

&lt;h2&gt;
  
  
  🎈Community party
&lt;/h2&gt;

&lt;p&gt;The day wraps up with a Community Party at 5:30 pm (kudos to Databand for supporting it).&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--RwyiABu3--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/o17mfpl6gr7x1w5oa2or.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--RwyiABu3--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/o17mfpl6gr7x1w5oa2or.png" alt="Zander Matheson - getting real time" width="880" height="880"&gt;&lt;/a&gt;&lt;br&gt;
Don't forget to attend &lt;a href="https://www.datacouncil.ai/talks/getting-real-time-when-to-move-from-batch-to-streaming-and-how-to-do-it-without-hiring-an-entirely-new-team?hsLang=en"&gt;Zander's awesome talk&lt;/a&gt;. I'll be giving away swag there!&lt;/p&gt;

&lt;p&gt;Also see you at #StreamBrew, RSVP &lt;a href="https://bitly.com/m/bytewax"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;In the next posts, I'll cover the following days. Stay tuned!&lt;br&gt;
See you in Austin!&lt;/p&gt;

&lt;p&gt;UPD: &lt;a href="https://dev.to/bytewax/data-council-the-highlights-of-day-2-183e"&gt;Day 2&lt;/a&gt;&lt;/p&gt;

</description>
      <category>conference</category>
      <category>datascience</category>
      <category>dataengineering</category>
      <category>machinelearning</category>
    </item>
    <item>
      <title>Using Language Models in a Streaming Context to Understand Financial Markets</title>
      <dc:creator>Zander</dc:creator>
      <pubDate>Thu, 16 Mar 2023 08:09:14 +0000</pubDate>
      <link>https://dev.to/bytewax/using-language-models-in-a-streaming-context-to-understand-financial-markets-1o9d</link>
      <guid>https://dev.to/bytewax/using-language-models-in-a-streaming-context-to-understand-financial-markets-1o9d</guid>
      <description>&lt;p&gt;For those who are eager to dive into the code, it's available:&lt;/p&gt;


&lt;div class="ltag-github-readme-tag"&gt;
  &lt;div class="readme-overview"&gt;
    &lt;h2&gt;
      &lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev.to%2Fassets%2Fgithub-logo-5a155e1f9a670af7944dd5e12375bc76ed542ea80224905ecaf878b9157cdefc.svg" alt="GitHub logo"&gt;
      &lt;a href="https://github.com/bytewax" rel="noopener noreferrer"&gt;
        bytewax
      &lt;/a&gt; / &lt;a href="https://github.com/bytewax/news-analyzer" rel="noopener noreferrer"&gt;
        news-analyzer
      &lt;/a&gt;
    &lt;/h2&gt;
    &lt;h3&gt;
      Analyze financial news in real-time with Machine Learning
    &lt;/h3&gt;
  &lt;/div&gt;
  &lt;div class="ltag-github-body"&gt;
    
&lt;div id="readme" class="md"&gt;
&lt;div class="markdown-heading"&gt;
&lt;h1 class="heading-element"&gt;news-analyzer&lt;/h1&gt;

&lt;/div&gt;
&lt;p&gt;Analyze financial news in real-time with Machine Learning&lt;/p&gt;
&lt;p&gt;See &lt;a href="https://github.com/bytewax/news-analyzer/FinancialNewsAnalysis.ipynb" rel="noopener noreferrer"&gt;FinancialNewsAnalysis.ipynb&lt;/a&gt;&lt;/p&gt;
&lt;/div&gt;



&lt;/div&gt;
&lt;br&gt;
  &lt;div class="gh-btn-container"&gt;&lt;a class="gh-btn" href="https://github.com/bytewax/news-analyzer" rel="noopener noreferrer"&gt;View on GitHub&lt;/a&gt;&lt;/div&gt;
&lt;br&gt;
&lt;/div&gt;
&lt;br&gt;


&lt;p&gt;Effective analysis of news is crucial for understanding the world, especially when it comes to financial markets. Being able to quickly identify significant events, such as a major corporation being hacked and sensitive customer data being compromised, can enable you to respond rapidly and either capitalize on opportunities or minimize losses. In this blog post, we'll delve into how Bytewax and large language models can be leveraged to analyze financial news in real time, providing you with the ability to respond to breaking news more effectively. We need to answer at least three questions to implement our little project successfully:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Where do we get the data?&lt;/li&gt;
&lt;li&gt;How do we analyze it?&lt;/li&gt;
&lt;li&gt;How do we access the data source and perform analysis in real-time?&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Data Source
&lt;/h2&gt;

&lt;p&gt;For the data source in this demo, we will use the &lt;a href="https://alpaca.markets/docs/market-data/news/#real-time-streaming" rel="noopener noreferrer"&gt;Alpaca news API&lt;/a&gt;, which provides websocket access to news articles from Benzinga. To set up an account and create an API key and secret, you can follow the &lt;a href="https://alpaca.markets/docs/market-data/getting-started/" rel="noopener noreferrer"&gt;Alpaca documentation&lt;/a&gt;. &lt;em&gt;You can use any websocket as a data source. A future follow-up will look at how we can build our own real-time news aggregation pipeline for analysis.&lt;/em&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Content Analysis
&lt;/h2&gt;

&lt;p&gt;We're going to leverage Large Language Models (LLMs) to analyze news articles, and the first place that comes to mind when looking for LLMs is Hugging Face. &lt;a href="https://huggingface.co/" rel="noopener noreferrer"&gt;Hugging Face&lt;/a&gt; is a company that provides a marketplace where researchers can release models and datasets on its hub, which other researchers and developers can then use via its hosted model endpoints and its Transformers library. First, we need to perform sentiment analysis on the headline, which can quickly provide valuable insights. For this, we'll use a fine-tuned BERT model called &lt;a href="https://huggingface.co/ahmedrachid/FinancialBERT-Sentiment-Analysis" rel="noopener noreferrer"&gt;FinancialBERT&lt;/a&gt;. Then we will summarize the content of the article, and a fine-tuned &lt;a href="https://huggingface.co/facebook/bart-large-cnn" rel="noopener noreferrer"&gt;BART model&lt;/a&gt; will come in handy for this. Both can be found on &lt;a href="https://huggingface.co" rel="noopener noreferrer"&gt;huggingface.co&lt;/a&gt;. We'll also cover how to use the Transformers library to run the models.&lt;/p&gt;

&lt;h2&gt;
  
  
  Real-Time Data Processing with Bytewax
&lt;/h2&gt;

&lt;p&gt;If you are not familiar with Bytewax, it is a stateful stream processor that can be used to analyze data in real time, with support for stateful operators like windowing and aggregation. Bytewax is especially suitable for workflows that leverage the Python ecosystem of tools, from data crunching tools like Pandas to machine learning-focused tools like Hugging Face Transformers. It also supports a variety of data sources, including websockets.&lt;/p&gt;

&lt;p&gt;Let's get started analyzing the news in real time. First things first: dependencies.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="o"&gt;!&lt;/span&gt;pip &lt;span class="nb"&gt;install &lt;/span&gt;bytewax transformers torch sentencepiece websocket-client
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Constructing Our Dataflow
&lt;/h2&gt;

&lt;p&gt;A Bytewax dataflow is a sequence of steps that transform data from an input source and then write it to an output. At each step, an operator controls the flow of data: whether it should be filtered, aggregated, or accumulated. Developers write the Python code that performs the data transformation at each step.&lt;/p&gt;
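&lt;p&gt;To make the idea of chained steps concrete, here is a minimal sketch in plain Python. It does not use the Bytewax API at all: the generator functions and their names are hypothetical stand-ins for an input, a filter step, and a map step, and the headlines are made up.&lt;/p&gt;

```python
# Illustrative sketch only: plain-Python generators standing in for dataflow
# steps. None of these helper names are part of the Bytewax API.
def source():
    """Hypothetical input step: emit a few raw headlines."""
    yield from ["Acme shares jump", "", "Acme recalls product"]


def drop_empty(items):
    """Filter step: discard empty headlines."""
    return (item for item in items if item)


def to_record(items):
    """Map step: wrap each headline in a dict with its length."""
    return ({"headline": item, "length": len(item)} for item in items)


def run_pipeline():
    """Chain the steps in order and collect the output."""
    return list(to_record(drop_empty(source())))


if __name__ == "__main__":
    for record in run_pipeline():
        print(record)
```

&lt;p&gt;Each function consumes the output of the previous one, which is roughly the shape a dataflow takes once operators are attached to it.&lt;/p&gt;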

&lt;h3&gt;
  
  
  Input
&lt;/h3&gt;

&lt;p&gt;To begin the dataflow, we'll create an input using the Alpaca websocket, which we'll use to subscribe to articles on multiple tickers. You'll need an Alpaca API key and secret, and we recommend storing them as environment variables.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;bytewax.dataflow&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Dataflow&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;bytewax.inputs&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;ManualInputConfig&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;distribute&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;websocket&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;create_connection&lt;/span&gt;

&lt;span class="n"&gt;API_KEY&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;API_KEY&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;API_SECRET&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;API_SECRET&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;ticker_list&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;*&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;


&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;input_builder&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;worker_index&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_count&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;resume_state&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;state&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;resume_state&lt;/span&gt; &lt;span class="ow"&gt;or&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;
    &lt;span class="n"&gt;worker_tickers&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;list&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;distribute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ticker_list&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_index&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_count&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
    &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;subscribing to&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;worker_tickers&lt;/span&gt;&lt;span class="p"&gt;})&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;news_input&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;worker_tickers&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;state&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;ws&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;create_connection&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;wss://stream.data.alpaca.markets/v1beta1/news&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ws&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;recv&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
        &lt;span class="n"&gt;ws&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;send&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;dumps&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;action&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;auth&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;key&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;API_KEY&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;secret&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;API_SECRET&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;}))&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ws&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;recv&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
        &lt;span class="n"&gt;ws&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;send&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;dumps&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;action&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;subscribe&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;news&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="n"&gt;worker_tickers&lt;/span&gt;&lt;span class="p"&gt;}))&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ws&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;recv&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;

        &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="c1"&gt;# to use without API uncomment the below line and comment the one below that
&lt;/span&gt;        &lt;span class="c1"&gt;# articles = [{"T":"n","id":31248067,"headline":"Tesla Vehicles Could Be Banned From Leaving During A Hurricane In This State","summary":"A lawmaker in one American state could make it hard for owners of electric vehicles to get out of the state in the event of a hurricane. Here’s the potential law and why it’s important.","author":"Chris Katje","created_at":"2023-03-07T22:58:40Z","updated_at":"2023-03-07T22:58:40Z","url":"https://www.benzinga.com/news/23/03/31248067/tesla-vehicles-could-be-banned-from-leaving-during-a-hurricane-in-this-state","content":"\u003cp\u003eA lawmaker in one American state could make it hard for owners of electric vehicles to get out of the state in the event of a hurricane. Here\u0026rsquo;s the potential law and why it\u0026rsquo;s important.\u003c/p\u003e\r\n\r\n\u003cp\u003e\u003cstrong\u003eWhat Happened:\u003c/strong\u003e States have passed laws aimed at banning the sale of gas-powered vehicles in the future. One state took it a step further by seeking to ban electric vehicle \u003ca href=\"https://www.benzinga.com/news/23/01/30424292/taking-on-elon-musk-this-state-legislature-could-ban-electric-vehicle-sales-by-2035\"\u003esales in the future.\u003c/a\u003e\u003c/p\u003e\r\n\r\n\u003cp\u003eOne of the leading states for electric vehicle purchases could now see a temporary ban on using electric vehicles during the time of a crisis.\u003c/p\u003e\r\n\r\n\u003cp\u003eFlorida Republican state Sen.\u0026nbsp;\u003cstrong\u003eJonathan Martin\u003c/strong\u003e is considering legislation to ban electric vehicles like those from \u003cstrong\u003eTesla Inc\u003c/strong\u003e (NASDAQ:\u003ca class=\"ticker\" href=\"https://www.benzinga.com/stock/TSLA#NASDAQ\"\u003eTSLA\u003c/a\u003e) to be used during hurricane evacuations in the state, according to \u003ca 
href=\"https://electrek.co/2023/03/06/florida-lawmaker-wants-to-ban-evs-from-hurricane-evacuations/\"\u003eElectrek\u003c/a\u003e.\u0026nbsp;\u003c/p\u003e\r\n\r\n\u003cp\u003eMartin told the state\u0026rsquo;s Department of Transportation that electric vehicles could block traffic during evacuations if they run out of battery charge.\u003c/p\u003e\r\n\r\n\u003cp\u003eMartin serves on the Committee on Environment and Natural Resources and the Select Committee on Resiliency.\u003c/p\u003e\r\n\r\n\u003cp\u003eThe Select Committee on Resiliency met with the Florida Department of Transportation executive director of transportation technologies in Florida.\u003c/p\u003e\r\n\r\n\u003cp\u003eAmong the topics discussed were the $198 million the state is going to get from the Bipartisan Infrastructure Law for electric vehicle charging infrastructure from the current administration led by \u003cstrong\u003ePresident Joe Biden.\u003c/strong\u003e\u003c/p\u003e\r\n\r\n\u003cp\u003eThe legislation requires electric vehicle charging stations to be 50 miles apart and serve all electric vehicles.\u003c/p\u003e\r\n\r\n\u003cp\u003e\u0026ldquo;With a couple of guys behind you, you can\u0026rsquo;t get out of the car and push it to the side of the road. Traffic backs up. 
And what might look like a two-hour trip might turn into an eight-hour trip once you\u0026rsquo;re on the road,\u0026rdquo; Martin said.\u003c/p\u003e\r\n\r\n\u003cp\u003eMartin said his concern is with the electric vehicle infrastructure available in the state of Florida.\u003c/p\u003e\r\n\r\n\u003cp\u003e\u003cem\u003eRelated Link: \u003ca href=\"https://www.benzinga.com/trading-ideas/22/06/27568560/4-stocks-to-watch-this-hurricane-season\"\u003e4 Stocks To Watch This Hurricane Season\u0026nbsp;\u003c/a\u003e\u003c/em\u003e\u003c/p\u003e\r\n\r\n\u003cp\u003e\u003cstrong\u003eWhy It\u0026rsquo;s Important:\u003c/strong\u003e The Florida Department of Transportation told Martin it isn\u0026rsquo;t a fan of banning electric vehicles during hurricane evacuations and that it is looking into portable EV chargers.\u003c/p\u003e\r\n\r\n\u003cp\u003e\u0026ldquo;We have our emergency assistance vehicles that we deploy during a hurricane evacuation that have gas \u0026hellip; we need to provide that same level of service to electrical vehicles,\u0026rdquo; Department of Transportation director of transportation technologies \u003cstrong\u003eTrey Tillander \u003c/strong\u003esaid.\u003c/p\u003e\r\n\r\n\u003cp\u003eThe Tampa Bay Times \u003ca href=\"https://www.tampabay.com/hurricane/2023/02/24/florida-lawmaker-suggests-limiting-electric-vehicles-during-hurricane-evacuations/\"\u003ereported\u003c/a\u003e\u0026nbsp;around 1% of the vehicles in Florida are electric vehicles. 
One of the owners of an EV is state Sen.\u0026nbsp;\u003cstrong\u003eTina Polsky.\u003c/strong\u003e\u003c/p\u003e\r\n\r\n\u003cp\u003e\u0026ldquo;I don\u0026rsquo;t think you can ban an electric vehicle from evacuating because that may be the only car someone has,\u0026rdquo; Polsky said.\u003c/p\u003e\r\n\r\n\u003cp\u003eIn December 2022, there were 203,094 electric vehicles registered in the state of Florida.\u003c/p\u003e\r\n\r\n\u003cp\u003eThe increased funding for charging infrastructure could help ease concerns over charging.\u003c/p\u003e\r\n\r\n\u003cp\u003eUltimately, once people are on the road headed out of the state, they likely won\u0026rsquo;t be able to stop at a charging station, similar to people not being able to quickly stop at a gas station.\u003c/p\u003e\r\n\r\n\u003cp\u003eJust like people prepare for the evacuation by filling up their vehicle with gas, owners of electric vehicles will likely need to fully charge their vehicle before evacuating the state.\u003c/p\u003e\r\n\r\n\u003cp\u003eThe comments from the state senator may have Florida residents thinking about owning at least one non-electric vehicle or a hybrid to ensure they have the best chance to exit the state without future restrictions and without the potential of running out of charge and not finding stations prevalent.\u003c/p\u003e\r\n\r\n\u003cp\u003e\u003cem\u003eRead Next:\u0026nbsp;\u003ca href=\"https://www.benzinga.com/analyst-ratings/analyst-color/23/03/31172188/tesla-analysts-praise-vertical-integration-after-investor-day-but-want-more-from-el\"\u003eTesla Analysts Praise Vertical Integration After Investor Day, But Want More From Elon Musk: \u0026#39;Long On Vision, Short On Specifics\u0026#39;\u003c/a\u003e\u003c/em\u003e\u003c/p\u003e\r\n\r\n\u003cp\u003e\u003cem\u003ePhoto:\u0026nbsp;\u003ca href=\"https://www.shutterstock.com/g/hsaduraphotos\"\u003eHenryk Sadura\u003c/a\u003e\u0026nbsp;via Shutterstock\u003c/em\u003e\u003c/p\u003e\r\n\r\n\u003cp\u003e\u003cbr 
/\u003e\r\n\u0026nbsp;\u003c/p\u003e\r\n","symbols":["TSLA"],"source":"benzinga"}]
&lt;/span&gt;            &lt;span class="n"&gt;articles&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;loads&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ws&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;recv&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
            &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;article&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;articles&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="k"&gt;yield&lt;/span&gt; &lt;span class="n"&gt;state&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;article&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;source&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;article&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nf"&gt;news_input&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;worker_tickers&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;state&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;


&lt;span class="n"&gt;flow&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Dataflow&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;input&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;inp&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;ManualInputConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input_builder&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
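&lt;p&gt;Two details of the input code are worth a closer look. First, &lt;code&gt;os.getenv&lt;/code&gt; returns &lt;code&gt;None&lt;/code&gt; when a variable is unset, so the f-strings would send the literal text "None" as credentials. Second, the ticker list is split across workers. The sketch below is a standalone illustration, not Bytewax code: &lt;code&gt;round_robin&lt;/code&gt; and &lt;code&gt;build_messages&lt;/code&gt; are hypothetical helpers, and the round-robin split is an assumption about how such a distribution could behave.&lt;/p&gt;

```python
import json


def round_robin(items, worker_index, worker_count):
    """Hypothetical stand-in for splitting tickers across workers.

    Assumption: items are dealt out in turn, so worker i receives every
    worker_count-th item starting at position i.
    """
    return list(items)[worker_index::worker_count]


def build_messages(api_key, api_secret, tickers):
    """Build the auth and subscribe payloads sent over the websocket."""
    # Fail early instead of sending the string "None" as a credential.
    if not api_key or not api_secret:
        raise ValueError("API_KEY and API_SECRET must be set")
    auth = json.dumps({"action": "auth", "key": api_key, "secret": api_secret})
    subscribe = json.dumps({"action": "subscribe", "news": tickers})
    return auth, subscribe


if __name__ == "__main__":
    # With a single wildcard ticker and two workers, only worker 0 gets it.
    for index in range(2):
        print(index, round_robin(["*"], index, 2))
```

&lt;p&gt;Under that assumption, a single wildcard entry leaves every worker but the first with nothing to subscribe to, which is fine for a demo but worth keeping in mind when scaling out.&lt;/p&gt;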



&lt;p&gt;The data returned from the news API looks like the JSON shown here.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;[{&lt;/span&gt;&lt;span class="nl"&gt;"T"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"n"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;31248067&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"headline"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"Tesla Vehicles Could Be Banned From Leaving During A Hurricane In This State"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"summary"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"A lawmaker in one American state could make it hard for owners of electric vehicles to get out of the state in the event of a hurricane. Here’s the potential law and why it’s important."&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"author"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"Chris Katje"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"created_at"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"2023-03-07T22:58:40Z"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"updated_at"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"2023-03-07T22:58:40Z"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"url"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"https://www.benzinga.com/news/23/03/31248067/tesla-vehicles-could-be-banned-from-leaving-during-a-hurricane-in-this-state"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"content"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003cp&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span 
class="s2"&gt;003eA lawmaker in one American state could make it hard for owners of electric vehicles ... ertical Integration After Investor Day, But Want More From Elon Musk: &lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;0026#39;Long On Vision, Short On Specifics&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;0026#39;&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003c/a&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003e&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003c/em&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003e&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003c/p&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003e&lt;/span&gt;&lt;span class="se"&gt;\r\n\r\n\u&lt;/span&gt;&lt;span class="s2"&gt;003cp&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003e&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003cem&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003ePhoto:&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;0026nbsp;&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003ca href=&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;https://www.shutterstock.com/g/hsaduraphotos&lt;/span&gt;&lt;span class="se"&gt;\"\u&lt;/span&gt;&lt;span class="s2"&gt;003eHenryk Sadura&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003c/a&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003e&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;0026nbsp;via Shutterstock&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003c/em&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003e&lt;/span&gt;&lt;span 
class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003c/p&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003e&lt;/span&gt;&lt;span class="se"&gt;\r\n\r\n\u&lt;/span&gt;&lt;span class="s2"&gt;003cp&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003e&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003cbr /&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003e&lt;/span&gt;&lt;span class="se"&gt;\r\n\u&lt;/span&gt;&lt;span class="s2"&gt;0026nbsp;&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003c/p&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003e&lt;/span&gt;&lt;span class="se"&gt;\r\n&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"symbols"&lt;/span&gt;&lt;span class="p"&gt;:[&lt;/span&gt;&lt;span class="s2"&gt;"TSLA"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;&lt;span class="nl"&gt;"source"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"benzinga"&lt;/span&gt;&lt;span class="p"&gt;}]&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We will use this in the next steps in our dataflow to analyze the sentiment and provide a summary.&lt;/p&gt;
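&lt;p&gt;Note that the &lt;code&gt;content&lt;/code&gt; field in the payload above is HTML with escaped entities, so it may need cleaning before it reaches a model. As a rough sketch using only the standard library (the &lt;code&gt;plain_text&lt;/code&gt; helper and the shortened sample payload are assumptions for illustration, not part of the original pipeline), the markup could be stripped like this:&lt;/p&gt;

```python
import json
from html.parser import HTMLParser


class _TextExtractor(HTMLParser):
    """Collect only text nodes, ignoring tags."""

    def __init__(self):
        # convert_charrefs=True turns entities into characters for us.
        super().__init__(convert_charrefs=True)
        self.parts = []

    def handle_data(self, data):
        self.parts.append(data)


def plain_text(markup):
    """Return markup with tags removed and whitespace collapsed."""
    parser = _TextExtractor()
    parser.feed(markup)
    parser.close()  # flush any text still buffered by the parser
    text = "".join(parser.parts).replace("\xa0", " ")
    return " ".join(text.split())


# Shortened, made-up payload in the same escaped style as the API response.
SAMPLE = '[{"id": 1, "content": "\\u003cp\\u003eTesla\\u0026nbsp;news\\u003c/p\\u003e\\r\\n"}]'

if __name__ == "__main__":
    article = json.loads(SAMPLE)[0]
    print(plain_text(article["content"]))
```

&lt;p&gt;A dedicated HTML library would handle more edge cases; this only shows the shape of the step.&lt;/p&gt;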

&lt;h3&gt;
  
  
  Managing Duplicates and Updates
&lt;/h3&gt;

&lt;p&gt;When working with news stories from RSS/Atom feeds or news APIs, it's common to receive the same story more than once: first when it is created and again each time it is updated. To avoid running the ML models repeatedly on the same story, we'll use the Bytewax operator &lt;a href="///apidocs/bytewax.dataflow#bytewax.dataflow.Dataflow.stateful_map"&gt;&lt;code&gt;stateful_map&lt;/code&gt;&lt;/a&gt; as a simplified storage layer. The state is a list of the unique identifiers of the news articles we have encountered, and it is kept separately for each key in the stream. If an article's ID is already in the list, we mark the article as an update. Otherwise, we add its ID to the list. We then use the &lt;a href="///apidocs/bytewax.dataflow#bytewax.dataflow.Dataflow.filter"&gt;&lt;code&gt;filter&lt;/code&gt;&lt;/a&gt; operator to drop the updates so they are not classified and summarized again. Think of this process as the equivalent of checking a database for a unique ID.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;update_articles&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;articles&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;news&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="c1"&gt;# articles is the state for this key: the IDs already seen.
&lt;/span&gt;    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;news&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;articles&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="c1"&gt;# Seen before: flag it so the filter step can drop it.
&lt;/span&gt;        &lt;span class="n"&gt;news&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;update&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;
    &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="c1"&gt;# First sighting: remember the ID and let the article through.
&lt;/span&gt;        &lt;span class="n"&gt;articles&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;append&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;news&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
        &lt;span class="n"&gt;news&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;update&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;
    &lt;span class="c1"&gt;# Return the updated state and the value to emit downstream.
&lt;/span&gt;    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;articles&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;news&lt;/span&gt;

&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;stateful_map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;source_articles&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;lambda&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nf"&gt;list&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;update_articles&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;filter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;lambda&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;update&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
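
&lt;p&gt;To see what this step does without running a dataflow, here is a minimal plain-Python sketch that simulates it. It is not how Bytewax manages state internally: the &lt;code&gt;states&lt;/code&gt; dictionary, the &lt;code&gt;dedupe&lt;/code&gt; helper, and the sample records are made up for illustration, and the stream is assumed to consist of (ticker, news) pairs.&lt;/p&gt;

```python
# Plain-Python simulation of the deduplication step; no Bytewax required.
# The sample records and the states dict are made up for illustration.

def update_articles(articles, news):
    # articles is the state: the IDs already seen for this key.
    if news["id"] in articles:
        news["update"] = True
    else:
        articles.append(news["id"])
        news["update"] = False
    return articles, news


def dedupe(stream):
    """Apply update_articles per key, then drop items flagged as updates."""
    states = {}  # one ID list per key, standing in for Bytewax-managed state
    fresh = []
    for key, news in stream:
        state = states.get(key, [])
        states[key], news = update_articles(state, dict(news))
        if not news["update"]:
            fresh.append((key, news))
    return fresh


sample = [
    ("TSLA", {"id": 1, "headline": "First story"}),
    ("TSLA", {"id": 1, "headline": "First story (edited)"}),
    ("TSLA", {"id": 2, "headline": "Second story"}),
    ("AAPL", {"id": 1, "headline": "Another ticker, same ID"}),
]

fresh_items = dedupe(sample)
print([(key, news["id"]) for key, news in fresh_items])
```

&lt;p&gt;The edited TSLA story is dropped, while the AAPL record with the same ID passes through because each key has its own list. If an article is keyed under several tickers, it would therefore be processed once per ticker.&lt;/p&gt;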



&lt;h3&gt;
  
  
  Sentiment Analysis
&lt;/h3&gt;

&lt;p&gt;Sentiment analysis is the next step in our process. We use a fine-tuned model from Hugging Face to classify the sentiment of each article's headline. The model is based on BERT (Bidirectional Encoder Representations from Transformers), an architecture developed by Google. For details on how the model works and how it was trained, refer to the &lt;a href="https://huggingface.co/ahmedrachid/FinancialBERT-Sentiment-Analysis" rel="noopener noreferrer"&gt;model card&lt;/a&gt; on Hugging Face or the accompanying &lt;a href="https://www.researchgate.net/publication/358284785_FinancialBERT_-_A_Pretrained_Language_Model_for_Financial_Text_Mining" rel="noopener noreferrer"&gt;research paper&lt;/a&gt;. Since each news article is analyzed independently, the sentiment classification takes place in a &lt;a href="///apidocs/bytewax.dataflow#bytewax.dataflow.Dataflow.map"&gt;&lt;code&gt;map&lt;/code&gt;&lt;/a&gt; operator. Despite the extensive research that goes into designing novel model architectures and creating training datasets, implementing sentiment analysis is remarkably straightforward. &lt;em&gt;Note that if you're following along in a notebook, the model will take some time to download the first time you run it.&lt;/em&gt;&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;transformers&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;AutoTokenizer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;AutoModelForSequenceClassification&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;pipeline&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;AutoModelForSeq2SeqLM&lt;/span&gt;

&lt;span class="n"&gt;sent_tokenizer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;AutoTokenizer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;from_pretrained&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;ahmedrachid/FinancialBERT-Sentiment-Analysis&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;sent_model&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;AutoModelForSequenceClassification&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;from_pretrained&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;ahmedrachid/FinancialBERT-Sentiment-Analysis&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;sent_nlp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;pipeline&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;sentiment-analysis&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;model&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;sent_model&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;tokenizer&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;sent_tokenizer&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;sentiment_analysis&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ticker__news&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;ticker&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;news&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;ticker__news&lt;/span&gt;
    &lt;span class="n"&gt;sentiment&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;sent_nlp&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;&lt;span class="n"&gt;news&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;headline&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]])&lt;/span&gt;
    &lt;span class="n"&gt;news&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;sentiment&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;sentiment&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sentiment&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
    &lt;span class="nf"&gt;return &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ticker&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;news&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sentiment_analysis&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
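
&lt;p&gt;The value stored in &lt;code&gt;news['sentiment']&lt;/code&gt; is a dictionary with a &lt;code&gt;label&lt;/code&gt; and a &lt;code&gt;score&lt;/code&gt;. As a rough sketch of how a downstream step might use it, the helper below converts that dictionary into a signed number. It assumes the labels are &lt;code&gt;positive&lt;/code&gt;, &lt;code&gt;neutral&lt;/code&gt;, and &lt;code&gt;negative&lt;/code&gt;, as described on the model card. The &lt;code&gt;signed_sentiment&lt;/code&gt; helper is hypothetical, and the example dictionaries are hand-written, not real model output.&lt;/p&gt;

```python
# Sketch: turn one result dict from the sentiment pipeline into a signed score.
# Assumption: labels are "positive", "neutral" and "negative", as listed on the
# model card; signed_sentiment is a hypothetical helper, not a library function.

SIGN_BY_LABEL = {"positive": 1.0, "neutral": 0.0, "negative": -1.0}


def signed_sentiment(result):
    """Map a dict with 'label' and 'score' keys to a number between -1 and 1."""
    # Unknown labels fall back to 0.0 rather than raising.
    sign = SIGN_BY_LABEL.get(result["label"].lower(), 0.0)
    return sign * result["score"]


# Hand-written examples in the shape the pipeline returns (not real model output).
examples = [
    {"label": "positive", "score": 0.75},
    {"label": "negative", "score": 0.5},
    {"label": "neutral", "score": 0.9},
]
signed = [signed_sentiment(r) for r in examples]
print(signed)
```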



&lt;h3&gt;
  
  
  Article Summarization
&lt;/h3&gt;

&lt;p&gt;After analyzing the headline sentiment, we summarize the article's content with a BART (Bidirectional and Auto-Regressive Transformers) model. BART pairs a bidirectional encoder, as in Google's BERT, with an autoregressive decoder, as in OpenAI's GPT. Despite the significant effort that goes into creating the model, using it through the Hugging Face Transformers library is relatively easy: we create a summarization pipeline and apply it in a &lt;a href="///apidocs/bytewax.dataflow#bytewax.dataflow.Dataflow.map"&gt;&lt;code&gt;map&lt;/code&gt;&lt;/a&gt; step. To obtain better results, this map step also cleans the text before summarizing it by stripping HTML tags and line breaks.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;re&lt;/span&gt;

&lt;span class="c1"&gt;# Let's create a summarization pipeline
&lt;/span&gt;&lt;span class="n"&gt;sum_tokenizer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;AutoTokenizer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;from_pretrained&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;facebook/bart-large-cnn&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;sum_model&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;AutoModelForSeq2SeqLM&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;from_pretrained&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;facebook/bart-large-cnn&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;summarizer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;pipeline&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;summarization&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;tokenizer&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;sum_tokenizer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;model&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;sum_model&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;tag_re&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;re&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;compile&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;r&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;(&amp;lt;!--.*?--&amp;gt;|&amp;lt;[^&amp;gt;]*&amp;gt;)&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;summarize&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ticker__news&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;ticker&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;news&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;ticker__news&lt;/span&gt;
    &lt;span class="n"&gt;article&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;news&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;content&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="n"&gt;article_no_tags&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;tag_re&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;sub&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;''&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;article&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;article_no_tags&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;article_no_tags&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;replace&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="se"&gt;\r&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;""&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;replace&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="se"&gt;\n&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;""&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;summary&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;summarizer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;article_no_tags&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;max_length&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;130&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;min_length&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;30&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;do_sample&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;news&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;bart_summary&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;summary&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;summary_text&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;bart summary:&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;summary&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;summary_text&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="nf"&gt;return &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ticker&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;news&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;summarize&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
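
&lt;p&gt;One detail of the cleaning step is worth noting: replacing &lt;code&gt;\r&lt;/code&gt; and &lt;code&gt;\n&lt;/code&gt; with an empty string can join the last word of one line to the first word of the next. As a possible refinement (a sketch, not what the dataflow above does), whitespace can be collapsed into single spaces instead. The &lt;code&gt;normalize_whitespace&lt;/code&gt; helper and the sample text are made up for illustration.&lt;/p&gt;

```python
# Sketch: collapse line breaks and repeated spaces into single spaces.
# normalize_whitespace is a hypothetical helper, not part of the dataflow above.

def normalize_whitespace(text):
    """Replace every run of whitespace, line breaks included, with one space."""
    return " ".join(text.split())


raw = "Tesla shares rose.\r\n\r\nAnalysts were surprised.\n"

# What the replace-based cleaning produces for this sample.
joined = raw.replace("\r", "").replace("\n", "")
# What the whitespace-collapsing alternative produces.
normalized = normalize_whitespace(raw)

print(joined)
print(normalized)
```

&lt;p&gt;With the replace-based cleaning the two sentences run together, while the alternative keeps a single space between them.&lt;/p&gt;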



&lt;h3&gt;
  
  
  Output
&lt;/h3&gt;

&lt;p&gt;With our news analyzed, we can add a capture step to output the modified news object and then run our dataflow. In this example we write the output to stdout so we can easily view it, but in a production system we could write the results to a downstream Kafka topic or database for further analysis.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;If you are following along in a notebook, remember that you must be authenticated for this to work, so set your Alpaca API key and secret first.&lt;/em&gt;&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;bytewax.execution&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;run_main&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;bytewax.outputs&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;StdOutputConfig&lt;/span&gt;

&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;capture&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;StdOutputConfig&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;

&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;__name__&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;__main__&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="nf"&gt;run_main&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Wrapping up
&lt;/h2&gt;

&lt;p&gt;While our example is simplified, it showcases the power of Bytewax and Hugging Face's language models for analyzing financial news articles in real time. Using the Alpaca news API as our data source, we constructed a dataflow that deduplicates stories, classifies the sentiment of each headline, and summarizes the content of each article.&lt;/p&gt;

&lt;p&gt;Because Bytewax is Python-native and the Hugging Face Transformers library is easy to use, data engineers and researchers can readily apply these state-of-the-art language models in their own projects. We hope this blog post serves as a useful guide for anyone looking to leverage real-time news analysis in their financial decision-making process.&lt;/p&gt;

</description>
      <category>llm</category>
      <category>machinelearning</category>
      <category>python</category>
      <category>datastreaming</category>
    </item>
  </channel>
</rss>
