<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Zander</title>
    <description>The latest articles on DEV Community by Zander (@awmatheson).</description>
    <link>https://dev.to/awmatheson</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F877646%2F018b7296-f5e1-4974-999b-79878d41cd9c.jpeg</url>
      <title>DEV Community: Zander</title>
      <link>https://dev.to/awmatheson</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/awmatheson"/>
    <language>en</language>
    <item>
      <title>Data Parallel, Task Parallel, and Agent Actor Architectures</title>
      <dc:creator>Zander</dc:creator>
      <pubDate>Thu, 13 Jul 2023 19:26:35 +0000</pubDate>
      <link>https://dev.to/bytewax/data-parallel-task-parallel-and-agent-actor-architectures-dm6</link>
      <guid>https://dev.to/bytewax/data-parallel-task-parallel-and-agent-actor-architectures-dm6</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;In the rapidly evolving world of data processing, understanding the various architectural approaches is pivotal to choosing the right tools for your specific needs. The three dominant architectures that have emerged—data parallel, task parallel, and agent actor—each offer unique strengths that cater to different types of data workloads.&lt;/p&gt;

&lt;p&gt;Data parallel architectures shine when large datasets need to be processed in parallel. This model divides data into smaller chunks, each processed independently but in the same manner on different workers or nodes. Apache Spark, a well-known data processing framework, uses this architecture. Spark's resilience, capacity for handling vast amounts of data, and ability to perform complex transformations make it a favorite in big data landscapes. Bytewax also follows this model with the same transformations happening on each worker, but on different data.&lt;/p&gt;

&lt;p&gt;On the other hand, task parallel architectures, as exemplified by Apache Flink and Dask, focus on executing different tasks concurrently across distributed systems. This approach is particularly effective for workflows with a wide variety of tasks that can be performed independently or have complex dependencies. Flink's stream-first philosophy provides robustness for real-time processing tasks, while Dask's flexibility makes it a great choice for parallel computing tasks in Python environments.&lt;/p&gt;

&lt;p&gt;Finally, the agent actor architecture, the foundation for Ray, presents a flexible and robust model for handling complex, stateful, and concurrent computations. In this model, "actors" encapsulate state and behavior, communicating through message passing. Ray's ability to scale from a single node to a large cluster makes it a popular choice for machine learning tasks.&lt;/p&gt;

&lt;p&gt;As we delve deeper into these architectures in the following sections, we will explore their pros and cons, use cases, and the unique features offered by Spark, Flink, Dask, Ray, and Bytewax. By understanding these architectures, you'll be better equipped to select the right framework for your next data processing venture. Stay tuned!&lt;/p&gt;

&lt;h2&gt;
  
  
  Data Parallel Architectures
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--ttZGQmfL--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/Data_parallelism_d1e340a7c5.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--ttZGQmfL--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/Data_parallelism_d1e340a7c5.png" alt="Data parallelism.png" width="776" height="886"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Data parallelism is a form of parallelization that distributes the data across different nodes, which operate independently of each other. Each node applies the same operation on its allocated subset of data. This approach is particularly effective when dealing with large datasets where the task can be divided and executed simultaneously, reducing computational time significantly.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Mechanism&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;In data parallel architectures, the dataset is split into smaller, more manageable chunks, or partitions. Each partition is processed independently by separate tasks running the same operation. This distribution is done in a way that each task operates on a different core or processor, enabling high-level parallel computation.&lt;/p&gt;
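
&lt;p&gt;The mechanism above can be sketched in a few lines of standard-library Python. This is only an illustration, not how Spark or Bytewax are implemented: the function names &lt;code&gt;partition&lt;/code&gt;, &lt;code&gt;sum_of_squares&lt;/code&gt;, and &lt;code&gt;data_parallel_sum_of_squares&lt;/code&gt; are hypothetical, and a thread pool stands in for the separate cores or nodes a real framework would use.&lt;/p&gt;

```python
from concurrent.futures import ThreadPoolExecutor


def partition(data, num_partitions):
    """Split data into num_partitions contiguous chunks of near-equal size."""
    size, extra = divmod(len(data), num_partitions)
    # The first `extra` chunks receive one additional element each.
    bounds = [i * size + min(i, extra) for i in range(num_partitions + 1)]
    return [data[bounds[i]:bounds[i + 1]] for i in range(num_partitions)]


def sum_of_squares(chunk):
    """The single operation that every worker applies to its own chunk."""
    return sum(x * x for x in chunk)


def data_parallel_sum_of_squares(data, num_workers=4):
    """Run the same operation on each partition, then combine the partial results."""
    chunks = partition(data, num_workers)
    # Assumption: threads stand in for the workers of a real cluster.
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partials = list(pool.map(sum_of_squares, chunks))
    return sum(partials)
```

&lt;p&gt;Every worker runs the same function on different data, and the only coordination is the final step that combines the partial results.&lt;/p&gt;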

&lt;p&gt;&lt;strong&gt;Advantages&lt;/strong&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Scalability:&lt;/strong&gt; Data parallel architectures are designed to handle large volumes of data. As data grows, you can simply add more nodes to the system to maintain performance.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Performance:&lt;/strong&gt; The ability to perform computations in parallel leads to significant speedups, particularly for large datasets and computationally intensive operations. Because data moves between workers less often, there can be a further performance gain.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Simplicity:&lt;/strong&gt; Since the same operation is applied to each partition, this model is relatively simple to understand and implement.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;Disadvantages&lt;/strong&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Communication Overhead:&lt;/strong&gt; The nodes need to communicate with each other to synchronize and aggregate results, which can add overhead, particularly for large numbers of nodes.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Limited Use Cases:&lt;/strong&gt; Data parallelism works best when the same operation can be applied to all data partitions. It's less suitable for tasks that require complex interdependencies or shared state across tasks. Frameworks such as Spark show that this limitation is not absolute, though.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;Best Use Cases&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Data parallel architectures excel in situations where large volumes of data need to be processed quickly and in a similar manner. Some of the best use cases include:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Batch Processing:&lt;/strong&gt; In scenarios where large amounts of data need to be processed all at once, data parallel architectures shine. This is a common use case in big data analytics, where massive datasets are processed in batch jobs.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Machine Learning:&lt;/strong&gt; Many machine learning algorithms, especially those that involve matrix operations, can be easily parallelized. For instance, when training a neural network, each worker can compute gradients for the same model on a different slice of the training batch, and the gradients are then aggregated to update the weights, making data parallelism a great fit.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Highly Partitioned Input and Output:&lt;/strong&gt; Data parallel frameworks excel when the input and output are partitioned in such a way that the workers can evenly match the partitions and redistribution of the data is limited.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Stream Processing:&lt;/strong&gt; The data parallelism approach is well suited to stream processing, where the same operation is applied to data in real time as it arrives.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Apache Spark, a notable data parallel framework, is widely used in big data analytics for tasks like ETL (Extract, Transform, Load), predictive analytics, and data mining. It's particularly known for its ability to perform complex data transformations and aggregations across large datasets.&lt;/p&gt;

&lt;p&gt;Bytewax is known for its ability to handle large continuous streams of data and perform complex transformations on them in real time.&lt;/p&gt;

&lt;p&gt;As we continue our exploration into the different data processing architectures, we'll see how other approaches handle tasks that might not be as suitable for data parallel processing.&lt;/p&gt;

&lt;h2&gt;
  
  
  Task Parallel Architectures: Unlocking Concurrent Processing
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--vkVYkOoH--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/Task_parallelism_4cc6d9f034.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--vkVYkOoH--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/Task_parallelism_4cc6d9f034.png" alt="Task parallelism.png" width="776" height="886"&gt;&lt;/a&gt;Task parallelism, also known as function parallelism, is an architectural approach that focuses on distributing tasks—rather than data—across different processing units. Each of these tasks can be a separate function or a method operating on different data or performing different computations. This type of parallelism is a great fit for problems where different operations can be performed concurrently on the same or different data.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Mechanism&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;In a task parallel model, the focus is on concurrent execution of many different tasks that are part of a larger computation. These tasks can be independent, or they can have defined dependencies and need to be executed in a certain order. The tasks are scheduled and dispatched to different processors in the system, enabling parallel execution.&lt;/p&gt;
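
&lt;p&gt;As a rough sketch of this mechanism, the standard-library Python below submits three different functions to a pool so they can run concurrently, and then runs a final step that depends on their results. It does not use the Flink or Dask APIs; the function names are hypothetical, and a thread pool stands in for the scheduler and processors of a real system.&lt;/p&gt;

```python
from concurrent.futures import ThreadPoolExecutor


def compute_mean(values):
    """Task 1: arithmetic mean of the input."""
    return sum(values) / len(values)


def find_extremes(values):
    """Task 2: smallest and largest value."""
    return min(values), max(values)


def count_distinct(values):
    """Task 3: number of distinct values."""
    return len(set(values))


def summarize(values):
    """Schedule three different tasks, then combine their results."""
    with ThreadPoolExecutor() as pool:
        # Each submit() schedules a different function; none depends on another.
        mean_future = pool.submit(compute_mean, values)
        extremes_future = pool.submit(find_extremes, values)
        distinct_future = pool.submit(count_distinct, values)

        # Dependent step: it can only run once the tasks above have finished.
        low, high = extremes_future.result()
        return {
            "mean": mean_future.result(),
            "range": high - low,
            "distinct": distinct_future.result(),
        }
```

&lt;p&gt;Unlike the data parallel case, the workers here run different code, and the dependency between tasks is expressed by waiting on the futures that a later step needs.&lt;/p&gt;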

&lt;p&gt;&lt;strong&gt;Advantages&lt;/strong&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Diverse Workloads:&lt;/strong&gt; Task parallel architectures excel in scenarios where the problem can be broken down into a variety of tasks that can be executed in parallel.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Flexibility:&lt;/strong&gt; Since tasks don't necessarily need to operate on the same data or perform the same operation, this model offers a high level of flexibility.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Efficiency:&lt;/strong&gt; Task parallelism can lead to improved resource utilization, as tasks can be scheduled to keep all processors busy.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;Disadvantages&lt;/strong&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Complexity:&lt;/strong&gt; Managing and scheduling tasks, especially when there are dependencies, can add complexity to the system.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Inter-task Communication:&lt;/strong&gt; Tasks often need to communicate with each other to synchronize or to pass data, which can lead to overhead and can be a challenge for performance.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;Best Use Cases&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Task parallel architectures are best suited to problems that can be broken down into discrete tasks that can run concurrently. This includes:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Complex Computations:&lt;/strong&gt; Scenarios where a complex problem can be broken down into a number of separate tasks, such as simulations or optimization problems, are a good fit for task parallel architectures.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Real-Time Processing On Diverse Datasets:&lt;/strong&gt; Task parallel architectures are often used in systems that require real-time processing and low latency, such as stream processing systems.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Apache Flink is an excellent example of a system that uses a task parallel architecture. Flink is designed for stream processing, where real-time results are of utmost importance. It breaks down stream processing into a number of tasks that can be executed in parallel, providing low-latency and high-throughput processing of data streams.&lt;/p&gt;

&lt;p&gt;Similarly, Dask is a flexible library for parallel computing in Python that uses task scheduling for complex computations. Dask allows you to parallelize and distribute computation by breaking it down into smaller tasks, making it a popular choice for tasks that go beyond the capabilities of typical data parallel tools.&lt;/p&gt;

&lt;p&gt;In the next section, we'll explore the agent actor model, a different approach to managing concurrency and state that opens up new possibilities for parallel computation.&lt;/p&gt;

&lt;h2&gt;
  
  
  Agent Actor Architectures: Pioneering Concurrent Computations
&lt;/h2&gt;

&lt;p&gt;Agent actor architectures introduce a fundamentally different approach to handling parallel computations, particularly for problems that involve complex, stateful computations. This approach builds on task parallelism with the addition of an actor. An actor is a computational entity that, in response to a message it receives, can concurrently: make local decisions, create more actors, send more messages, and determine how to respond to the next message received. In that sense, actor-based systems resemble task-distributed or function-distributed systems.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Mechanism&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;In the agent actor model, actors are the universal primitives of concurrent computation. Upon receiving a message, an actor can change its local state, send messages to other actors, or create new actors. Actors encapsulate their state, avoiding common pitfalls of multithreaded programming such as race conditions. Actor systems are inherently message-driven and can be distributed across many nodes, making them highly scalable.&lt;/p&gt;
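
&lt;p&gt;A minimal sketch of a single actor, using only the Python standard library, may make this more concrete. It is not Ray's API: the class &lt;code&gt;CounterActor&lt;/code&gt; and its message format are hypothetical, and the actor lives on a local thread rather than on a cluster node. The point is that its state is only ever touched by its own message loop.&lt;/p&gt;

```python
import queue
import threading


class CounterActor:
    """An actor that owns a dict of counters and changes it only via messages."""

    def __init__(self):
        self._mailbox = queue.Queue()   # incoming messages, handled in order
        self._counts = {}               # private state, never shared directly
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def send(self, message):
        """Fire-and-forget: enqueue a message such as ("add", key, amount)."""
        self._mailbox.put(message)

    def ask(self, key):
        """Request the current count for key and wait for the actor's reply."""
        reply = queue.Queue(maxsize=1)
        self._mailbox.put(("get", key, reply))
        return reply.get()

    def stop(self):
        """Ask the actor to finish, then wait for its thread to exit."""
        self._mailbox.put(("stop",))
        self._thread.join()

    def _run(self):
        # One message at a time, so no lock is needed around self._counts.
        while True:
            message = self._mailbox.get()
            kind = message[0]
            if kind == "stop":
                break
            if kind == "add":
                _, key, amount = message
                self._counts[key] = self._counts.get(key, 0) + amount
            elif kind == "get":
                _, key, reply = message
                reply.put(self._counts.get(key, 0))
```

&lt;p&gt;Any number of threads can call &lt;code&gt;send&lt;/code&gt; at the same time. Because the mailbox serializes the messages, the counter updates never race with each other.&lt;/p&gt;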

&lt;p&gt;&lt;strong&gt;Advantages&lt;/strong&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Concurrent State Management:&lt;/strong&gt; Actors provide a safe way to handle mutable state in a concurrent system. Since each actor processes messages sequentially and has isolated state, there is no need for locks or other synchronization mechanisms.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Scalability:&lt;/strong&gt; Actor systems are inherently distributed and can easily scale out across many nodes.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Fault Tolerance:&lt;/strong&gt; Actor systems can be designed to be resilient with self-healing capabilities. If an actor fails, it can be restarted, and messages it was processing can be redirected to other actors.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;Disadvantages&lt;/strong&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Complexity:&lt;/strong&gt; Building systems with the actor model can be more complex than traditional paradigms due to the asynchronous and distributed nature of actors.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Message Overhead:&lt;/strong&gt; Communication between actors is done with messages, which can lead to overhead, especially in systems with a large number of actors.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;Best Use Cases&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Agent actor architectures are best suited for problems that involve complex, stateful computations and require high levels of concurrency. This includes:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Real-time Systems:&lt;/strong&gt; The actor model is well suited for real-time systems where you need to process high volumes of data concurrently, such as trading systems or real-time analytics.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Distributed Systems:&lt;/strong&gt; The actor model can be a good fit for building distributed systems where you need to manage state across multiple nodes, like IoT systems or multiplayer online games.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Ray is an example of a system that employs the actor model. It was designed to scale Python applications from a single node to a large cluster, and it's commonly used for machine learning tasks, which often require complex, stateful computations.&lt;/p&gt;

&lt;p&gt;As we've seen, the landscape of data processing architectures is rich and diverse, with each model offering unique strengths and potential challenges. Whether it's data parallel, task parallel, or agent actor, the choice of architecture will depend largely on the nature of the data workload and the specific requirements of the system you're building.&lt;/p&gt;

</description>
      <category>architecture</category>
      <category>data</category>
      <category>actors</category>
    </item>
    <item>
      <title>Reasoning about Streaming vs Batch with a Case Study from GitHub</title>
      <dc:creator>Zander</dc:creator>
      <pubDate>Thu, 15 Jun 2023 20:26:32 +0000</pubDate>
      <link>https://dev.to/bytewax/reasoning-about-streaming-vs-batch-with-a-case-study-from-github-5g3l</link>
      <guid>https://dev.to/bytewax/reasoning-about-streaming-vs-batch-with-a-case-study-from-github-5g3l</guid>
      <description>&lt;p&gt;&lt;em&gt;If you prefer videos check out Zander's talk at Data Council 2023 &lt;a href="//youtu.be/qJ3PWyx7w2Q"&gt;"When to Move from Batch to Streaming and how to do it without hiring an entirely new team"&lt;/a&gt;&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;The world of data processing is undergoing a significant shift, moving towards real-time processing. Despite an increase in understanding that shifting workloads to real-time can increase ROI and lower costs, there isn't consensus in the industry around how to best transition workloads to real-time and what the best tools are for different types of real-time workloads. While traditional analytical tools such as data warehouses, business intelligence layers, and metrics are widely accepted and understood, the concept of real-time data processing and the technologies that enable it are not as widely recognized or agreed upon.&lt;/p&gt;

&lt;p&gt;In this post, we aim to demystify real-time data processing, discussing its relevance within an organization, the different types of real-time workloads, and some real-world examples from my time at GitHub. But first, let's clarify some definitions.&lt;/p&gt;

&lt;h2&gt;
  
  
  Understanding Real-Time and Stream Processing
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--MAmdy1fh--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/real_time_53cfa27cf7.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--MAmdy1fh--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/real_time_53cfa27cf7.jpg" alt="real-time.jpg" width="800" height="420"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;What is Real-Time? "Real-Time" refers to anything that is perceived to happen in real time by a human - an admittedly fuzzy definition. Quantitatively, this usually refers to processes that happen in the sub-second realm. Interestingly, based on this definition, real-time data processing can actually occur with both batch and stream processing technologies depending on the end-to-end latency.&lt;/p&gt;

&lt;p&gt;Stream processing refers to processing a single datum at a time, flowing in a continuous stream, while batch processing is when you gather a batch of data and process it all at once. By reducing the size of the batch progressively, we can edge closer to real-time processing. This is precisely what technologies like Spark's structured streaming do with micro-batches.&lt;/p&gt;
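
&lt;p&gt;The difference can be sketched with two small Python generators that compute the same running total. This is an illustration only and does not reflect how Spark's structured streaming is implemented; the function names are hypothetical.&lt;/p&gt;

```python
from itertools import islice


def process_stream(events):
    """Stream style: update and emit the running total after every event."""
    total = 0
    for event in events:
        total += event
        yield total


def process_micro_batches(events, batch_size):
    """Micro-batch style: gather batch_size events, then emit once per batch."""
    iterator = iter(events)
    total = 0
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        total += sum(batch)
        yield total
```

&lt;p&gt;With a &lt;code&gt;batch_size&lt;/code&gt; of 1 the two functions emit the same sequence, which is the sense in which shrinking the batch edges closer to stream processing.&lt;/p&gt;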

&lt;p&gt;Now that we have some definitions out of the way, let's dive into real-time processing and the different types of real-time workloads.&lt;/p&gt;

&lt;h2&gt;
  
  
  The Relevance of Real-Time Data Processing
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--DOxOtcp---/c_limit%2Cf_auto%2Cfl_progressive%2Cq_66%2Cw_800/https://images.production.bytewax.io/tenor_45419acb5c.gif" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--DOxOtcp---/c_limit%2Cf_auto%2Cfl_progressive%2Cq_66%2Cw_800/https://images.production.bytewax.io/tenor_45419acb5c.gif" alt="tenor.gif" width="576" height="576"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In our day-to-day lives, we're constantly receiving and processing information in real time. Consider driving a car - an activity that requires processing multiple inputs and making decisions in real time. If we were to approach driving in a batch processing manner, waiting to gather information for a duration and then trying to forecast the next 15 seconds, it would likely end in disaster. As another example, imagine a sport like basketball. At each moment, the players are receiving tens or hundreds of inputs and reacting to them in real time. A non-real-time version, in which each player waited a certain number of seconds while receiving input and only then tried to react, would not be nearly as exciting to watch or play.&lt;/p&gt;

&lt;p&gt;These examples help to highlight why we might choose to process things in real-time. In the context of driving, we're making decisions that could potentially be a matter of life or death. And in our basketball example, the real-time processing elevates the user experience. However, while these examples provide some understanding, they don't necessarily help us generalize the concept.&lt;/p&gt;

&lt;h2&gt;
  
  
  Types of Real-Time Workloads
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--KablIMzo--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/analytical_vs_operational_49781ffe70.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--KablIMzo--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/analytical_vs_operational_49781ffe70.png" alt="analytical vs operational" width="800" height="450"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;We can broadly categorize real-time processing into two types of workloads: analytical and operational.&lt;/p&gt;

&lt;h3&gt;
  
  
  Analytical workloads
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--UTlf0zho--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/linked_in_views_71137c4795.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--UTlf0zho--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/linked_in_views_71137c4795.png" alt="linked-in-views.png" width="800" height="375"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Analytical workloads require low latency, freshness, and the capability of retrieval at scale. Real-time analytical workloads must be queryable. A good example of this is LinkedIn's profile view notification. When you click into the profile view notification, you're taken to a page that shows your profile views history, all the way up to the most recent data. This demonstrates the freshness of data and the ability to query it as you can filter and interact with the data, querying the freshest data.&lt;/p&gt;

&lt;p&gt;Another example of a real-time analytical workload is an Instacart order. When you place an order on Instacart, you can go into your order and see the updated Estimated Time of Arrival (ETA). This is another instance of an analytical real-time workload where the user is interacting with analytical data in real-time.&lt;/p&gt;

&lt;h3&gt;
  
  
  Operational workloads
&lt;/h3&gt;

&lt;p&gt;Operational workloads, on the other hand, require low latency and freshness, but they also need to be reactive. This means that some of the decision-making or business logic is embedded inline in the system. For example, in a streaming use case, this would be inside the stream processor. The data is received, transformed, and then a decision is made in an online fashion. Bytewax is a great example of a framework that can be used to make real-time decisions for operational workloads.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--eITPBuJE--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/fraud_4cc560b3e9.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--eITPBuJE--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/fraud_4cc560b3e9.png" alt="fraud.png" width="236" height="512"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;A good example of operational processing is fraud detection. The fraud detection system takes all the inputs in real time and evaluates them without a human in the loop. It then decides what to do and communicates with the user to confirm whether its suspicion of fraud is correct.&lt;/p&gt;

&lt;p&gt;Another example in financial markets is high-frequency trading. The software system consumes inputs from a variety of different data sources, processes them in real time, and then makes a decision whether to buy or sell. The speed of making that decision is a key factor in this context.&lt;/p&gt;

&lt;h3&gt;
  
  
  Analytical vs Operational
&lt;/h3&gt;

&lt;p&gt;One more aspect I wanted to touch on here is the difference between having a human in the loop versus having a machine in the loop. Across the examples above, a human tends to be more involved in analytical workloads and less involved, or not involved at all, in operational ones.&lt;/p&gt;

&lt;p&gt;To summarize, if there is a situation where you believe there's value to be derived and there's a human in the loop, there's probably a subset of tools within the real-time space that fall under the analytical workload. If you're building something like an algorithmic trading system, where you believe that there's no requirement for a human in the loop, you're more likely to fall under the operational category, and you should look at tools, like Bytewax, that support operational processing.&lt;/p&gt;

&lt;h1&gt;
  
  
  Case Studies: GitHub's Real-Time Data Processing Decisions
&lt;/h1&gt;

&lt;p&gt;Let's make things more concrete by discussing a couple of case studies involving decisions we made at GitHub concerning real-time data processing.&lt;/p&gt;

&lt;h2&gt;
  
  
  Trending Repositories and Developers: A Batch Processing Approach
&lt;/h2&gt;

&lt;p&gt;The team I was a part of at GitHub was responsible for several data products that were featured on github.com, including Trending Repositories and Trending Developers. These features were located on the GitHub Explore page and aimed to identify trending repositories and developers based on a variety of metrics, such as stars, forks, and views. We had access to this data in real time through a streaming platform (Kafka) managed by another team.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--003vyP93--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/trending_1bd345b99f.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--003vyP93--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/trending_1bd345b99f.png" alt="trending.png" width="512" height="357"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Although we had the capacity to implement these as real-time features, we decided against it. Our team primarily consisted of data scientists and machine learning engineers who hadn't worked with streaming platforms or stream processors before. Moreover, these features were new products, and we didn't know how impactful they would be or whether users would find them valuable and engage with them repeatedly.&lt;/p&gt;

&lt;p&gt;Instead of implementing these features to use real-time data, we decided to process this data in a batch format. We would run nightly queries against Presto, where the data landed from Kafka, then store the processed data in a MySQL database for retrieval from github.com. These features were not real-time workloads, but they could have been. If we had determined that they would be useful as real-time data products, they would have served as excellent examples of analytical use cases.&lt;/p&gt;

&lt;h2&gt;
  
  
  Star Spam Detection: A Real-Time Processing Solution
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--rgcu5vPH--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/star_history_c2a1668eae.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--rgcu5vPH--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/star_history_c2a1668eae.png" alt="star-history.png" width="512" height="348"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Another task we undertook was star spam detection. The concept of "stars" on GitHub repositories is used as a proxy to gauge the health and utility of the project. If we were unable to detect star spammers, it would degrade the platform's value for users, potentially leading to a downward spiral in the platform's overall value.&lt;/p&gt;

&lt;p&gt;We decided to tackle this problem in a real-time manner to limit the exposure of the users and potential degradation of the platform from star spam. The data was available in Kafka and could therefore be consumed as it became available. Based on certain criteria, users could be flagged as spammers. Once a user was flagged, they were submitted for human review to decide what the next steps should be. This is an excellent example of operational processing.&lt;/p&gt;

&lt;h1&gt;
  
  
  The Impact of Real-Time Processing Decisions
&lt;/h1&gt;

&lt;p&gt;The point is that the decision to implement real-time processing can have significant impacts on the return on investment for a project, and this correlation should be carefully considered. If we had decided to make the trending feature real-time, it would have been even more necessary to maintain the platform's value by detecting star spam as close to real-time as possible.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--8KNcxPFc--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/Copy_of_Zander_Data_Council_2023_1_c28bfe1372.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--8KNcxPFc--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://images.production.bytewax.io/Copy_of_Zander_Data_Council_2023_1_c28bfe1372.png" alt="roi and latency.png" width="800" height="450"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The value of data often degrades over time, and while it's usually depicted as a sharp decline (see graph on the left), most projects or data tend to follow more of an S-curve (on the right). After a certain point, the return or value of the data caps out with respect to latency. In these case studies, neither project saw an exponential increase in return on investment as latency was reduced, and we were able to tackle star spammers on a timescale of hours rather than milliseconds. This demonstrates that not all data projects need to move towards zero latency to provide significant value.&lt;/p&gt;

&lt;p&gt;If you are interested in moving some of your workloads to real time and are not sure where to start, please reach out to us in &lt;a href="https://join.slack.com/t/bytewaxcommunity/shared_invite/zt-1lhq9bxbr-T3CXxR_9RIUGb4qcBK26Qw"&gt;our Slack channel&lt;/a&gt; and we would be happy to help you figure out if the value is there and where to start.&lt;/p&gt;

</description>
      <category>python</category>
      <category>streaming</category>
      <category>opensource</category>
    </item>
    <item>
      <title>Real-Time Anomaly Detection Visualization with Bytewax and Rerun</title>
      <dc:creator>Zander</dc:creator>
      <pubDate>Thu, 13 Apr 2023 22:39:53 +0000</pubDate>
      <link>https://dev.to/bytewax/real-time-anomaly-detection-visualization-with-bytewax-and-rerun-1574</link>
      <guid>https://dev.to/bytewax/real-time-anomaly-detection-visualization-with-bytewax-and-rerun-1574</guid>
      <description>&lt;p&gt;&lt;a href="https://www.rerun.io/"&gt;Rerun's&lt;/a&gt; open sourcing in February marked a significant step for those looking for accessible yet potent Python visualization libraries. Why is visualization important? Companies like Scale.ai, Weights &amp;amp; Biases, and Hugging Face have streamlined deep learning by addressing dataset labeling, experiment tracking, and pre-trained models, but a void still exists in rapid data capture and visualization.&lt;/p&gt;

&lt;p&gt;Many companies develop in-house data visualization solutions but often end up with suboptimal tools due to high development costs. Moreover, Python visualization of &lt;em&gt;streaming&lt;/em&gt; data is not a well-solved problem either, which leads to &lt;a href="https://bytewax.io/blog/visualize-streaming-data-in-python"&gt;JavaScript-based solutions in notebooks&lt;/a&gt;. Rerun provides a Python interface to a high-performance Rust visualization engine (much like Bytewax!), which makes it dead easy to analyze streaming data.&lt;/p&gt;

&lt;p&gt;In this blog post, we will explore how to use Bytewax and Rerun to visualize real-time streaming data in Python and create a real-time anomaly detection visualization. We chose anomaly detection, a.k.a. outlier detection, because it is a critical component in numerous applications, such as cybersecurity, fraud detection, and monitoring of industrial processes. Visualizing these anomalies in real time can aid in quickly identifying potential issues and taking necessary actions to mitigate them.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;For those eager to dive in, check out our end-to-end Python solution on our &lt;a href="https://github.com/bytewax/visualizing-anomalies/blob/main/dataflow.py"&gt;GitHub&lt;/a&gt;. Don't forget to star Bytewax!&lt;/em&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Overview
&lt;/h2&gt;

&lt;p&gt;Here is what we'll cover:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;We will navigate the code and briefly discuss top-level entities&lt;/li&gt;
&lt;li&gt;Then we will discuss each step of the dataflow in greater detail: initialization of our dataflow, input source, stateful anomaly detection, data visualization &amp;amp; output, and how to spawn a cluster&lt;/li&gt;
&lt;li&gt;Finally, we will learn how to run it and see the beautiful visualization, all in Python &amp;lt;3&lt;/li&gt;
&lt;li&gt;As a bonus, we will think about other use cases&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Let's go!&lt;/p&gt;

&lt;h2&gt;
  
  
  Set up your environment
&lt;/h2&gt;

&lt;p&gt;This blog post is based on the following versions of Bytewax and Rerun:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bytewax==0.15.1
rerun-sdk==0.4.0

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Rerun and Bytewax can both be installed with pip:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;pip install rerun-sdk
pip install bytewax

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
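
&lt;p&gt;Because the code in this post targets specific releases, it can help to confirm what is installed before running it. The snippet below is a small sketch that uses only the standard library's &lt;code&gt;importlib.metadata&lt;/code&gt;; the &lt;code&gt;PINNED&lt;/code&gt; dictionary and the helper names are illustrative and are not part of Bytewax or Rerun.&lt;/p&gt;

```python
from importlib.metadata import PackageNotFoundError, version

# Versions this post is based on (taken from the list above).
PINNED = {"bytewax": "0.15.1", "rerun-sdk": "0.4.0"}


def installed_version(package):
    """Return the installed version string, or None if the package is missing."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


def report(pins):
    """Build one human-readable status line per pinned package."""
    lines = []
    for package, wanted in pins.items():
        found = installed_version(package)
        if found is None:
            status = "not installed"
        elif found == wanted:
            status = "matches"
        else:
            status = f"differs (found {found})"
        lines.append(f"{package}: wanted {wanted}, {status}")
    return lines


for line in report(PINNED):
    print(line)
```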



&lt;p&gt;&lt;em&gt;Follow Bytewax for updates as we are baking a new version that will ease the development of data streaming apps in Python further.&lt;/em&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Code
&lt;/h2&gt;

&lt;p&gt;The solution is relatively compact, so we copy the entire code example here. Please feel free to skip this big chunk if it looks overwhelming; we will discuss each function later.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;random&lt;/span&gt;
&lt;span class="c1"&gt;# pip install rerun-sdk
&lt;/span&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;rerun&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;rr&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;time&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;sleep&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;datetime&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;bytewax.dataflow&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Dataflow&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;bytewax.execution&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;spawn_cluster&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;bytewax.inputs&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;ManualInputConfig&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;distribute&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;bytewax.outputs&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;ManualOutputConfig&lt;/span&gt;

&lt;span class="n"&gt;rr&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;init&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"metrics"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;rr&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;spawn&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="n"&gt;start&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;input_builder&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;worker_index&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_count&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;resume_state&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;assert&lt;/span&gt; &lt;span class="n"&gt;resume_state&lt;/span&gt; &lt;span class="ow"&gt;is&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;
    &lt;span class="n"&gt;keys&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"1"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"2"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"3"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"4"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"5"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"6"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="n"&gt;this_workers_keys&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;distribute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;keys&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_index&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_count&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="nb"&gt;range&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1000&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;this_workers_keys&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;randrange&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mf"&gt;0.9&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="o"&gt;*=&lt;/span&gt; &lt;span class="mf"&gt;2.0&lt;/span&gt;
            &lt;span class="k"&gt;yield&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="n"&gt;start&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;total_seconds&lt;/span&gt;&lt;span class="p"&gt;()))&lt;/span&gt;
            &lt;span class="n"&gt;sleep&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="mf"&gt;10.0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;ZTestDetector&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="s"&gt;"""Anomaly detector.

    Use with a call to flow.stateful_map().

    Looks at how many standard deviations the current item is away
    from the mean (Z-score) of the last 10 items. Mark as anomalous if
    over the threshold specified.
    """&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;threshold_z&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;threshold_z&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;threshold_z&lt;/span&gt;

        &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;last_10&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;
        &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mu&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;
        &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sigma&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;_push&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;last_10&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;insert&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;del&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;last_10&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;:]&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;_recalc_stats&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;last_len&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;last_10&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mu&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;sum&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;last_10&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="n"&gt;last_len&lt;/span&gt;
        &lt;span class="n"&gt;sigma_sq&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;sum&lt;/span&gt;&lt;span class="p"&gt;((&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mu&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;**&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;last_10&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="n"&gt;last_len&lt;/span&gt;
        &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sigma&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;sigma_sq&lt;/span&gt;&lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="mf"&gt;0.5&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;push&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;key__value__t&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;key__value__t&lt;/span&gt;
        &lt;span class="n"&gt;is_anomalous&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mu&lt;/span&gt; &lt;span class="ow"&gt;and&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sigma&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;is_anomalous&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;abs&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mu&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sigma&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;threshold_z&lt;/span&gt;

        &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_push&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_recalc_stats&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;rr&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;log_scalar&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="s"&gt;"temp_{key}/data"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;color&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;155&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;155&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;155&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;is_anomalous&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;rr&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;log_point&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="s"&gt;"3dpoint/anomaly/{key}"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;float&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;radius&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;0.3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;color&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;255&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
            &lt;span class="n"&gt;rr&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;log_scalar&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="s"&gt;"temp_{key}/data/anomaly"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;scattered&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;radius&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;3.0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;color&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;255&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
            &lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;rr&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;log_point&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="s"&gt;"3dpoint/data/{key}"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;float&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;radius&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;0.1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mu&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sigma&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;is_anomalous&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;output_builder&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;worker_index&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_count&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;inspector&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;input&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;metric&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;mu&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sigma&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;is_anomalous&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;input&lt;/span&gt;
        &lt;span class="k"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="s"&gt;"{metric}: "&lt;/span&gt;
            &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="s"&gt;"value = {value}, "&lt;/span&gt;
            &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="s"&gt;"mu = {mu:.2f}, "&lt;/span&gt;
            &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="s"&gt;"sigma = {sigma:.2f}, "&lt;/span&gt;
            &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="s"&gt;"{is_anomalous}"&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;inspector&lt;/span&gt;

&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;__name__&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="s"&gt;"__main__"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;flow&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Dataflow&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;input&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"input"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ManualInputConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input_builder&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
    &lt;span class="c1"&gt;# ("metric", (key, value, t))
&lt;/span&gt;    &lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;stateful_map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"AnomalyDetector"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;lambda&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;ZTestDetector&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mf"&gt;2.0&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="n"&gt;ZTestDetector&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;push&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="c1"&gt;# ("metric", (value, mu, sigma, is_anomalous))
&lt;/span&gt;    &lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;capture&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ManualOutputConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;output_builder&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
    &lt;span class="n"&gt;spawn_cluster&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The provided code demonstrates how to create a real-time anomaly detection pipeline using Bytewax and Rerun. Let's break down the essential components of this code:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;input_builder&lt;/strong&gt; : This function generates random metrics simulating real-world data streams. It generates data points with a small chance of having an anomaly (values doubled).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;ZTestDetector&lt;/strong&gt; : This class implements an anomaly detector using the Z-score method. It maintains the mean and standard deviation of the last 10 values and marks a value as anomalous if its Z-score is greater than a specified threshold.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;output_builder&lt;/strong&gt; : This function is used to define the output behavior for the data pipeline. In this case, it prints the metric name, value, mean, standard deviation, and whether the value is anomalous.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Dataflow&lt;/strong&gt; : The main part of the code constructs the dataflow using Bytewax, connecting the input_builder source, the ZTestDetector, and the output builder.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Rerun visualization&lt;/strong&gt; : The Rerun visualization is integrated into the ZTestDetector class. The rr.log_scalar and rr.log_point functions are used to plot the data points and their corresponding anomaly status.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Now, with an understanding of the code's main components, let's discuss how the visualization is created step by step.&lt;/p&gt;

&lt;h2&gt;
  
  
  Building the Dataflow
&lt;/h2&gt;

&lt;p&gt;To create a dataflow pipeline, you need to:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Initialize a new dataflow with &lt;code&gt;flow = Dataflow()&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;Define the input source using &lt;code&gt;flow.input("input", ManualInputConfig(input_builder))&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;Apply the stateful anomaly detector using &lt;code&gt;flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push)&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;Configure the output behavior with &lt;code&gt;flow.capture(ManualOutputConfig(output_builder))&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;Finally, spawn a cluster to execute the dataflow with &lt;code&gt;spawn_cluster(flow)&lt;/code&gt;.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The resulting dataflow reads the randomly generated metric values from &lt;code&gt;input_builder&lt;/code&gt;, passes them through the &lt;code&gt;ZTestDetector&lt;/code&gt; for anomaly detection, and outputs the results using the &lt;code&gt;output_builder&lt;/code&gt; function. Let's clarify the details for each step.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;code&gt;input_builder&lt;/code&gt; function
&lt;/h3&gt;

&lt;p&gt;The &lt;code&gt;input_builder&lt;/code&gt; function serves as the input source for the dataflow pipeline, generating random metric values that can be split across multiple workers. It accepts three parameters: &lt;code&gt;worker_index&lt;/code&gt;, &lt;code&gt;worker_count&lt;/code&gt;, and &lt;code&gt;resume_state&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;input_builder&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;worker_index&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_count&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;resume_state&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;assert&lt;/span&gt; &lt;span class="n"&gt;resume_state&lt;/span&gt; &lt;span class="ow"&gt;is&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;
    &lt;span class="n"&gt;keys&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;,&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;,&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;,&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;&lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;,&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;,&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;&lt;span class="mi"&gt;6&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;]&lt;/span&gt;
    &lt;span class="n"&gt;this_workers_keys&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;distribute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;keys&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_index&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_count&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="nb"&gt;range&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1000&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;keys&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;randrange&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;gt&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="mf"&gt;0.9&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="o"&gt;*=&lt;/span&gt; &lt;span class="mf"&gt;2.0&lt;/span&gt;
            &lt;span class="k"&gt;yield&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="n"&gt;start&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;total_seconds&lt;/span&gt;&lt;span class="p"&gt;()))&lt;/span&gt;
            &lt;span class="n"&gt;sleep&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="mf"&gt;10.0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;worker_index&lt;/code&gt;: The index of the current worker in the dataflow pipeline.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;worker_count&lt;/code&gt;: The total number of workers in the dataflow pipeline.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;resume_state&lt;/code&gt;: The state of the input source from which to resume. In this case, it is asserted to be &lt;code&gt;None&lt;/code&gt;, indicating that the input source does not support resuming from a previous state.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Here's a step-by-step description of the &lt;code&gt;input_builder&lt;/code&gt; function:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Assert that &lt;code&gt;resume_state&lt;/code&gt; is &lt;code&gt;None&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;Define a list of keys representing the metrics.&lt;/li&gt;
&lt;li&gt;Distribute the keys among the workers using the &lt;code&gt;distribute&lt;/code&gt; function (not provided in the code snippet). The keys for the current worker are assigned to &lt;code&gt;this_workers_keys&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;Iterate 1,000 times and, for each iteration, iterate through the list of keys:

&lt;ul&gt;
&lt;li&gt;Generate a random integer from 0 to 9 with &lt;code&gt;random.randrange(0, 10)&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;With a 10% probability, double the value to simulate an anomaly.&lt;/li&gt;
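&lt;li&gt;Yield a two-tuple: &lt;code&gt;None&lt;/code&gt; as the resume state (this input does not support resuming), followed by the item &lt;code&gt;(key, (key, value, elapsed_seconds))&lt;/code&gt;, where the elapsed time is measured from a &lt;code&gt;start&lt;/code&gt; timestamp that is not provided in the code snippet.&lt;/li&gt;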
&lt;li&gt;Introduce a sleep time between each generated value to simulate real-time data generation.&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ol&gt;
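&lt;p&gt;Since the &lt;code&gt;distribute&lt;/code&gt; helper is not shown, here is a minimal sketch of how such a round-robin split could work. The name &lt;code&gt;distribute_keys&lt;/code&gt; and its behavior are assumptions made for illustration, not necessarily the implementation the script imports.&lt;/p&gt;

```python
def distribute_keys(keys, worker_index, worker_count):
    """Hypothetical round-robin split; returns the keys owned by one worker."""
    if worker_index not in range(worker_count):
        raise ValueError("worker_index must be between 0 and worker_count - 1")
    # Key i goes to worker (i mod worker_count), so every key has one owner.
    return [key for i, key in enumerate(keys) if i % worker_count == worker_index]


keys = ["1", "2", "3", "4", "5", "6"]
# One list of keys per worker in a three-worker cluster.
shares = [distribute_keys(keys, index, 3) for index in range(3)]
```

&lt;p&gt;With three workers, each worker ends up owning two of the six metrics, so no metric is generated twice.&lt;/p&gt;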

&lt;p&gt;The &lt;code&gt;input_builder&lt;/code&gt; function is used in the dataflow as the input source with the following line of code:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;input&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;&lt;span class="nb"&gt;input&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;,&lt;/span&gt; &lt;span class="n"&gt;ManualInputConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input_builder&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This line tells the dataflow to use the &lt;code&gt;input_builder&lt;/code&gt; function, wrapped in &lt;code&gt;ManualInputConfig&lt;/code&gt;, to generate the input data for the pipeline.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;code&gt;ZTestDetector&lt;/code&gt; Class
&lt;/h3&gt;

&lt;p&gt;The &lt;code&gt;ZTestDetector&lt;/code&gt; class is an anomaly detector that uses the Z-score method to identify whether a data point is anomalous or not. The Z-score is the number of standard deviations a data point is from the mean of a dataset. If a data point's Z-score is higher than a specified threshold, it is considered anomalous.&lt;/p&gt;

&lt;p&gt;The class has the following methods:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;__init__(self, threshold_z)&lt;/code&gt;: The constructor initializes the ZTestDetector with a threshold Z-score value. It also initializes the list of the last 10 values (&lt;code&gt;self.last_10&lt;/code&gt;), the mean (&lt;code&gt;self.mu&lt;/code&gt;), and the standard deviation (&lt;code&gt;self.sigma&lt;/code&gt;).&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;_push(self, value)&lt;/code&gt;: This private method is used to update the list of last 10 values with the new value. It inserts the new value at the beginning of the list and removes the oldest value, maintaining the list length at 10.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;_recalc_stats(self)&lt;/code&gt;: This private method recalculates the mean and standard deviation based on the current values in the self.last_10 list.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;push(self, key__value__t)&lt;/code&gt;: This public method takes a single tuple containing a key, a value, and a timestamp as input. It calculates the Z-score for the value, updates the last 10 values list, and recalculates the mean and standard deviation. It also logs the data point and its anomaly status using Rerun's visualization functions. Finally, it returns the updated instance of the ZTestDetector class and a tuple containing the value, mean, standard deviation, and anomaly status.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The ZTestDetector class is used in the dataflow pipeline as a stateful map with the following code:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;stateful_map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;&lt;span class="n"&gt;AnomalyDetector&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;,&lt;/span&gt; &lt;span class="k"&gt;lambda&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;ZTestDetector&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mf"&gt;2.0&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="n"&gt;ZTestDetector&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;push&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This line tells the dataflow to apply the &lt;code&gt;ZTestDetector&lt;/code&gt; with a Z-score threshold of &lt;code&gt;2.0&lt;/code&gt; and use the &lt;code&gt;push&lt;/code&gt; method to process the data points.&lt;/p&gt;
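&lt;p&gt;To make the per-key state handling concrete, the following plain-Python sketch emulates the idea: one detector is built lazily for each key, and the mapper returns the updated state together with the output. It is an illustration under stated assumptions, not Bytewax itself. &lt;code&gt;MiniZTest&lt;/code&gt; and &lt;code&gt;emulate_stateful_map&lt;/code&gt; are hypothetical names, and the detector takes a bare value and omits the Rerun logging.&lt;/p&gt;

```python
import math


class MiniZTest:
    """Hypothetical stand-in for the article's detector, without Rerun logging."""

    def __init__(self, threshold_z):
        self.threshold_z = threshold_z
        self.window = []  # most recent values, newest first
        self.mu = None
        self.sigma = None

    def push(self, value):
        # Score the new value against the statistics of the previous window.
        is_anomalous = False
        if self.mu is not None and self.sigma:
            is_anomalous = abs(value - self.mu) / self.sigma > self.threshold_z
        # Keep the 10 newest values, then recompute mean and standard deviation.
        self.window.insert(0, value)
        del self.window[10:]
        self.mu = sum(self.window) / len(self.window)
        variance = sum((v - self.mu) ** 2 for v in self.window) / len(self.window)
        self.sigma = math.sqrt(variance)
        return self, (value, self.mu, self.sigma, is_anomalous)


def emulate_stateful_map(items, builder, mapper):
    """Plain-Python emulation of per-key state handling (not Bytewax itself)."""
    states = {}
    for key, value in items:
        if key not in states:
            states[key] = builder()  # state is created once per key
        states[key], output = mapper(states[key], value)
        yield key, output


stream = [("1", 5.0), ("2", 1.0), ("1", 6.0), ("1", 5.0), ("1", 6.0), ("1", 50.0)]
results = list(emulate_stateful_map(stream, lambda: MiniZTest(2.0), MiniZTest.push))
```

&lt;p&gt;In this sketch only the final value for key "1" is flagged, because it lies far outside the window built from that key's earlier values; key "2" keeps its own independent state.&lt;/p&gt;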

&lt;h3&gt;
  
  
  Visualizing Anomalies
&lt;/h3&gt;

&lt;p&gt;To visualize the anomalies, the &lt;code&gt;ZTestDetector&lt;/code&gt; class logs the data points and their corresponding anomaly status using Rerun's visualization functions. Specifically, &lt;code&gt;rr.log_scalar&lt;/code&gt; is used to plot a scalar value, while &lt;code&gt;rr.log_point&lt;/code&gt; is used to plot 3D points.&lt;/p&gt;

&lt;p&gt;The following code snippet shows how the visualization is created:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;rr&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;log_scalar&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;&lt;span class="n"&gt;temp_&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="o"&gt;/&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;color&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;155&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;155&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;155&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;is_anomalous&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;rr&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;log_point&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="n"&gt;dpoint&lt;/span&gt;&lt;span class="o"&gt;/&lt;/span&gt;&lt;span class="n"&gt;anomaly&lt;/span&gt;&lt;span class="o"&gt;/&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;,&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;float&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;radius&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;0.3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;color&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;255&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
    &lt;span class="n"&gt;rr&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;log_scalar&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;&lt;span class="n"&gt;temp_&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="o"&gt;/&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="o"&gt;/&lt;/span&gt;&lt;span class="n"&gt;anomaly&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;,&lt;/span&gt;
        &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;scattered&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;radius&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;3.0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;color&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;255&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;rr&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;log_point&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="n"&gt;dpoint&lt;/span&gt;&lt;span class="o"&gt;/&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="o"&gt;/&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;,&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;float&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;radius&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;0.1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here, we first log a scalar value representing the metric. Then, depending on whether the value is anomalous, we log a 3D point with a different radius and color. Anomalous points are logged in red with a larger radius, while non-anomalous points are logged with a smaller radius.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;code&gt;output_builder&lt;/code&gt; Function
&lt;/h3&gt;

&lt;p&gt;The &lt;code&gt;output_builder&lt;/code&gt; function defines the output behavior of the data pipeline. In this example, it prints the metric name, value, mean, standard deviation, and whether the value is anomalous. The function takes two arguments, &lt;code&gt;worker_index&lt;/code&gt; and &lt;code&gt;worker_count&lt;/code&gt;: the index of the current worker and the total number of workers in the dataflow pipeline.&lt;/p&gt;

&lt;p&gt;Here's the definition of the &lt;code&gt;output_builder&lt;/code&gt; function:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;output_builder&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;worker_index&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_count&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;inspector&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;input&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;metric&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;mu&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sigma&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;is_anomalous&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;input&lt;/span&gt;
        &lt;span class="k"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;{&lt;/span&gt;&lt;span class="n"&gt;metric&lt;/span&gt;&lt;span class="p"&gt;}:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
            &lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
            &lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;&lt;span class="n"&gt;mu&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;mu&lt;/span&gt;&lt;span class="p"&gt;:.&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
            &lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;&lt;span class="n"&gt;sigma&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;sigma&lt;/span&gt;&lt;span class="p"&gt;:.&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
            &lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;{&lt;/span&gt;&lt;span class="n"&gt;is_anomalous&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;quot&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;inspector&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is a higher-order function: it returns another function, &lt;code&gt;inspector&lt;/code&gt;, which unpacks each output tuple and prints it. The output builder is passed to the dataflow when configuring the output behavior with the following line:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;capture&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ManualOutputConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;output_builder&lt;/span&gt;&lt;span class="p"&gt;)).&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
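&lt;p&gt;Because the builder returns a closure, the same pattern can send results somewhere other than standard output. The sketch below is a hypothetical variation that appends formatted lines to a list instead of printing. The name &lt;code&gt;make_collecting_builder&lt;/code&gt; and the worker label in the output are assumptions added for illustration.&lt;/p&gt;

```python
def make_collecting_builder(sink):
    """Return an output builder whose inspector appends formatted lines to sink."""

    def output_builder(worker_index, worker_count):
        def inspector(item):
            # Same tuple shape as the article's inspector receives.
            metric, (value, mu, sigma, is_anomalous) = item
            sink.append(
                f"[worker {worker_index}/{worker_count}] {metric}: "
                f"value = {value}, mu = {mu:.2f}, sigma = {sigma:.2f}, {is_anomalous}"
            )

        return inspector

    return output_builder


lines = []
# Call the builder by hand, as a runtime would do once per worker.
inspector = make_collecting_builder(lines)(0, 1)
inspector(("1", (12.0, 5.5, 0.5, True)))
```

&lt;p&gt;The inner function keeps access to &lt;code&gt;worker_index&lt;/code&gt;, &lt;code&gt;worker_count&lt;/code&gt;, and &lt;code&gt;sink&lt;/code&gt; after the builder has returned, which is what makes per-worker output configuration possible.&lt;/p&gt;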



&lt;h2&gt;
  
  
  Running the Dataflow
&lt;/h2&gt;

&lt;p&gt;Bytewax can run as a single process or across multiple processes. This dataflow is written to scale across multiple processes, but we will start by running it as a single process using &lt;code&gt;spawn_cluster&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;spawn_cluster&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To increase the parallelism, pass a process count to &lt;code&gt;spawn_cluster&lt;/code&gt;, for example &lt;code&gt;spawn_cluster(flow, proc_count=3)&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;To run the provided code we can simply run it as a Python script, but first we need to install the dependencies.&lt;/p&gt;

&lt;p&gt;Create a new file in the same directory as dataflow.py and name it requirements.txt. Add the following content to the requirements.txt file:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bytewax==0.15.1
rerun-sdk==0.4.0

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Open a terminal in the directory containing the requirements.txt and dataflow.py files.&lt;/p&gt;

&lt;p&gt;Install the dependencies using the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;pip install -r requirements.txt

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And run the dataflow!&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;python dataflow.py

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Expanding the Use Case
&lt;/h2&gt;

&lt;p&gt;While the provided code serves as a basic example of real-time anomaly detection, you can expand this pipeline to accommodate more complex scenarios. For example:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Incorporate real-world data sources&lt;/strong&gt; : Replace the input_builder function with a custom input source that reads data from a real-world source, such as IoT sensors, log files, or streaming APIs.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Implement more sophisticated anomaly detection techniques&lt;/strong&gt; : You can replace the ZTestDetector class with other stateful anomaly detection methods, such as moving average, exponential smoothing, or machine learning-based approaches.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Customize the visualization&lt;/strong&gt; : Enhance the Rerun visualization by adding more data dimensions, adjusting the color schemes, or modifying the plot styles to better suit your needs.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Integrate with alerting and monitoring systems&lt;/strong&gt; : Instead of simply printing the anomaly results, you can integrate the pipeline with alerting or monitoring systems to notify the appropriate stakeholders when an anomaly is detected.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
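&lt;p&gt;As one illustration of swapping in a different technique, here is a minimal sketch of a detector based on exponential smoothing. It keeps an exponentially weighted mean and variance instead of a 10-value window and returns the same &lt;code&gt;(state, output)&lt;/code&gt; shape as the Z-test detector. The class name &lt;code&gt;EwmaDetector&lt;/code&gt; and the &lt;code&gt;alpha&lt;/code&gt; default are assumptions, the method takes a bare value, and Rerun logging is left out.&lt;/p&gt;

```python
import math


class EwmaDetector:
    """Hypothetical alternative detector using exponentially weighted statistics."""

    def __init__(self, threshold_z, alpha=0.3):
        self.threshold_z = threshold_z
        self.alpha = alpha  # weight given to the newest value
        self.mu = None
        self.var = 0.0

    def push(self, value):
        if self.mu is None:
            # First value: nothing to compare against yet.
            self.mu = value
            return self, (value, self.mu, 0.0, False)
        sigma = math.sqrt(self.var)
        deviation = value - self.mu
        # Flag the value if it sits more than threshold_z deviations from the mean.
        is_anomalous = sigma > 0 and abs(deviation) / sigma > self.threshold_z
        # Incremental updates of the exponentially weighted mean and variance.
        self.mu += self.alpha * deviation
        self.var = (1 - self.alpha) * (self.var + self.alpha * deviation ** 2)
        return self, (value, self.mu, math.sqrt(self.var), is_anomalous)


detector = EwmaDetector(2.0)
flags = []
for value in [5.0, 6.0, 5.0, 5.0, 50.0]:
    detector, (_, _, _, is_anomalous) = detector.push(value)
    flags.append(is_anomalous)
```

&lt;p&gt;Older values fade out gradually rather than dropping out of a fixed window, and &lt;code&gt;alpha&lt;/code&gt; controls how quickly the detector adapts to a new level.&lt;/p&gt;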

&lt;p&gt;By customizing and extending the dataflow pipeline, you can create a powerful real-time anomaly detection and visualization solution tailored to your specific use case. The combination of Bytewax and Rerun offers a versatile and scalable foundation for building real-time data processing and visualization systems.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;This blog post has demonstrated how to use Bytewax and Rerun to create a real-time anomaly detection visualization. By building a dataflow pipeline with Bytewax and integrating Rerun's powerful visualization capabilities, we can monitor and identify anomalies in our data as they occur.&lt;/p&gt;

</description>
      <category>visualization</category>
      <category>machinelearning</category>
      <category>datastreaming</category>
    </item>
    <item>
      <title>Using Language Models in a Streaming Context to Understand Financial Markets</title>
      <dc:creator>Zander</dc:creator>
      <pubDate>Thu, 16 Mar 2023 08:09:14 +0000</pubDate>
      <link>https://dev.to/bytewax/using-language-models-in-a-streaming-context-to-understand-financial-markets-1o9d</link>
      <guid>https://dev.to/bytewax/using-language-models-in-a-streaming-context-to-understand-financial-markets-1o9d</guid>
      <description>&lt;p&gt;For those who are eager to dive into the code, it's available:&lt;/p&gt;


&lt;div class="ltag-github-readme-tag"&gt;
  &lt;div class="readme-overview"&gt;
    &lt;h2&gt;
      &lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev.to%2Fassets%2Fgithub-logo-5a155e1f9a670af7944dd5e12375bc76ed542ea80224905ecaf878b9157cdefc.svg" alt="GitHub logo"&gt;
      &lt;a href="https://github.com/bytewax" rel="noopener noreferrer"&gt;
        bytewax
      &lt;/a&gt; / &lt;a href="https://github.com/bytewax/news-analyzer" rel="noopener noreferrer"&gt;
        news-analyzer
      &lt;/a&gt;
    &lt;/h2&gt;
    &lt;h3&gt;
      Analyze financial news in real-time with Machine Learning
    &lt;/h3&gt;
  &lt;/div&gt;
  &lt;div class="ltag-github-body"&gt;
    
&lt;div id="readme" class="md"&gt;
&lt;div class="markdown-heading"&gt;
&lt;h1 class="heading-element"&gt;news-analyzer&lt;/h1&gt;

&lt;/div&gt;
&lt;p&gt;Analyze financial news in real-time with Machine Learning&lt;/p&gt;
&lt;p&gt;See &lt;a href="https://github.com/bytewax/news-analyzer/FinancialNewsAnalysis.ipynb" rel="noopener noreferrer"&gt;FinancialNewsAnalysis.ipynb&lt;/a&gt;&lt;/p&gt;
&lt;/div&gt;



&lt;/div&gt;
&lt;br&gt;
  &lt;div class="gh-btn-container"&gt;&lt;a class="gh-btn" href="https://github.com/bytewax/news-analyzer" rel="noopener noreferrer"&gt;View on GitHub&lt;/a&gt;&lt;/div&gt;
&lt;br&gt;
&lt;/div&gt;
&lt;br&gt;


&lt;p&gt;Effective analysis of news is crucial for understanding the world, especially when it comes to financial markets. Being able to quickly identify significant events, such as a major corporation being hacked and sensitive customer data being compromised, can enable you to respond rapidly and either capitalize on opportunities or minimize losses. In this blog post, we'll delve into how Bytewax and large language models can be leveraged to analyze financial news in real time, providing you with the ability to respond to breaking news more effectively. We need to answer at least three questions to implement our little project successfully:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Where do we get the data?&lt;/li&gt;
&lt;li&gt;How do we analyze it?&lt;/li&gt;
&lt;li&gt;How do we access the data source and perform analysis in real-time?&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Data Source
&lt;/h2&gt;

&lt;p&gt;For the data source used in this demo, we will use the &lt;a href="https://alpaca.markets/docs/market-data/news/#real-time-streaming" rel="noopener noreferrer"&gt;Alpaca news API&lt;/a&gt;, which provides websocket access to news articles from Benzinga. To set up an account and create an API key and secret, you can follow the &lt;a href="https://alpaca.markets/docs/market-data/getting-started/" rel="noopener noreferrer"&gt;Alpaca documentation&lt;/a&gt;. &lt;em&gt;You can use any websocket as a data source. A future follow-up will look at how we can build our own real-time news aggregation pipeline for analysis.&lt;/em&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Content Analysis
&lt;/h2&gt;

&lt;p&gt;We'll use large language models (LLMs) to analyze the news articles, and a natural place to look for them is &lt;a href="https://huggingface.co/" rel="noopener noreferrer"&gt;Hugging Face&lt;/a&gt;. Hugging Face is a company that provides a hub where researchers can release models and datasets, which other researchers and developers can then use through hosted model endpoints or the Transformers library. First, we'll perform sentiment analysis on the headline, which can quickly provide valuable insights. For this, we'll use a fine-tuned BERT model called &lt;a href="https://huggingface.co/ahmedrachid/FinancialBERT-Sentiment-Analysis" rel="noopener noreferrer"&gt;FinancialBERT&lt;/a&gt;. Then we'll summarize the content of the article with a fine-tuned &lt;a href="https://huggingface.co/facebook/bart-large-cnn" rel="noopener noreferrer"&gt;BART model&lt;/a&gt;. Both can be found on &lt;a href="https://huggingface.co" rel="noopener noreferrer"&gt;huggingface.co&lt;/a&gt;. We'll also cover how to run the models with the Transformers library.&lt;/p&gt;

&lt;h2&gt;
  
  
  Real-Time Data Processing with Bytewax
&lt;/h2&gt;

&lt;p&gt;If you are not familiar with Bytewax, it is a stateful stream processor that can be used to analyze data in real time, with support for stateful operators like windowing and aggregation. Bytewax is especially suitable for workflows that leverage the Python ecosystem of tools, from data crunching tools like Pandas to machine learning-focused tools like Hugging Face Transformers. It also supports a variety of data sources, including websockets.&lt;/p&gt;

&lt;p&gt;Let's get started analyzing the news in real-time. First things first! Dependencies:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="o"&gt;!&lt;/span&gt;pip &lt;span class="nb"&gt;install &lt;/span&gt;bytewax transformers torch sentencepiece websocket-client
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Constructing Our Dataflow
&lt;/h2&gt;

&lt;p&gt;A Bytewax dataflow is a sequence of steps that transform data from an input source and then write it to an output. At each step, an operator controls the flow of data: whether it should be filtered, aggregated, or accumulated. Developers supply the Python code that performs the data transformation at each step.&lt;/p&gt;
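
&lt;p&gt;To make the idea of "a sequence of steps" concrete before touching the real API, here is a plain-Python sketch. It does not use Bytewax at all; the &lt;code&gt;run_pipeline&lt;/code&gt; helper and the step tuples are hypothetical names invented for this illustration, and the real dataflow is built in the sections that follow.&lt;/p&gt;

```python
def run_pipeline(source, steps):
    """Hypothetical helper: push each item through every step, keeping what survives."""
    output = []
    for item in source:
        keep = True
        for kind, fn in steps:
            if kind == "map":
                item = fn(item)            # transform the item
            elif kind == "filter" and not fn(item):
                keep = False               # drop the item and skip the remaining steps
                break
        if keep:
            output.append(item)
    return output


headlines = ["  Tesla recall ", "", "Apple earnings beat"]
steps = [
    ("map", str.strip),    # clean up whitespace
    ("filter", bool),      # drop empty headlines
    ("map", str.upper),    # stand-in for a real transformation
]
result = run_pipeline(headlines, steps)
```

&lt;p&gt;Each item flows through the steps in order, and a filter can remove it partway through. A stream processor adds the parts this sketch leaves out, such as unbounded input, state, and distribution across workers.&lt;/p&gt;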

&lt;h3&gt;
  
  
  Input
&lt;/h3&gt;

&lt;p&gt;To begin the dataflow, we'll create an input using the Alpaca websocket, which we'll use to subscribe to articles on multiple tickers. It's important to note that you'll require an Alpaca API key and secret, and it's recommended to store them as environment variables.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;bytewax.dataflow&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Dataflow&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;bytewax.inputs&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;ManualInputConfig&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;distribute&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;websocket&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;create_connection&lt;/span&gt;

&lt;span class="n"&gt;API_KEY&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;API_KEY&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;API_SECRET&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;API_SECRET&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;ticker_list&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;*&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;


&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;input_builder&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;worker_index&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_count&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;resume_state&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;state&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;resume_state&lt;/span&gt; &lt;span class="ow"&gt;or&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;
    &lt;span class="n"&gt;worker_tickers&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;list&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;distribute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ticker_list&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_index&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_count&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
    &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;subscribing to&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;worker_tickers&lt;/span&gt;&lt;span class="p"&gt;})&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;news_input&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;worker_tickers&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;state&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;ws&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;create_connection&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;wss://stream.data.alpaca.markets/v1beta1/news&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ws&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;recv&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
        &lt;span class="n"&gt;ws&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;send&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;dumps&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;action&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;auth&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;key&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;API_KEY&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;secret&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;API_SECRET&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;}))&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ws&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;recv&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
        &lt;span class="n"&gt;ws&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;send&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;dumps&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;action&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;subscribe&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;news&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="n"&gt;worker_tickers&lt;/span&gt;&lt;span class="p"&gt;}))&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ws&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;recv&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;

        &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="c1"&gt;# to use without API uncomment the below line and comment the one below that
&lt;/span&gt;        &lt;span class="c1"&gt;# articles = [{"T":"n","id":31248067,"headline":"Tesla Vehicles Could Be Banned From Leaving During A Hurricane In This State","summary":"A lawmaker in one American state could make it hard for owners of electric vehicles to get out of the state in the event of a hurricane. Here’s the potential law and why it’s important.","author":"Chris Katje","created_at":"2023-03-07T22:58:40Z","updated_at":"2023-03-07T22:58:40Z","url":"https://www.benzinga.com/news/23/03/31248067/tesla-vehicles-could-be-banned-from-leaving-during-a-hurricane-in-this-state","content":"\u003cp\u003eA lawmaker in one American state could make it hard for owners of electric vehicles to get out of the state in the event of a hurricane. Here\u0026rsquo;s the potential law and why it\u0026rsquo;s important.\u003c/p\u003e\r\n\r\n\u003cp\u003e\u003cstrong\u003eWhat Happened:\u003c/strong\u003e States have passed laws aimed at banning the sale of gas-powered vehicles in the future. One state took it a step further by seeking to ban electric vehicle \u003ca href=\"https://www.benzinga.com/news/23/01/30424292/taking-on-elon-musk-this-state-legislature-could-ban-electric-vehicle-sales-by-2035\"\u003esales in the future.\u003c/a\u003e\u003c/p\u003e\r\n\r\n\u003cp\u003eOne of the leading states for electric vehicle purchases could now see a temporary ban on using electric vehicles during the time of a crisis.\u003c/p\u003e\r\n\r\n\u003cp\u003eFlorida Republican state Sen.\u0026nbsp;\u003cstrong\u003eJonathan Martin\u003c/strong\u003e is considering legislation to ban electric vehicles like those from \u003cstrong\u003eTesla Inc\u003c/strong\u003e (NASDAQ:\u003ca class=\"ticker\" href=\"https://www.benzinga.com/stock/TSLA#NASDAQ\"\u003eTSLA\u003c/a\u003e) to be used during hurricane evacuations in the state, according to \u003ca 
href=\"https://electrek.co/2023/03/06/florida-lawmaker-wants-to-ban-evs-from-hurricane-evacuations/\"\u003eElectrek\u003c/a\u003e.\u0026nbsp;\u003c/p\u003e\r\n\r\n\u003cp\u003eMartin told the state\u0026rsquo;s Department of Transportation that electric vehicles could block traffic during evacuations if they run out of battery charge.\u003c/p\u003e\r\n\r\n\u003cp\u003eMartin serves on the Committee on Environment and Natural Resources and the Select Committee on Resiliency.\u003c/p\u003e\r\n\r\n\u003cp\u003eThe Select Committee on Resiliency met with the Florida Department of Transportation executive director of transportation technologies in Florida.\u003c/p\u003e\r\n\r\n\u003cp\u003eAmong the topics discussed were the $198 million the state is going to get from the Bipartisan Infrastructure Law for electric vehicle charging infrastructure from the current administration led by \u003cstrong\u003ePresident Joe Biden.\u003c/strong\u003e\u003c/p\u003e\r\n\r\n\u003cp\u003eThe legislation requires electric vehicle charging stations to be 50 miles apart and serve all electric vehicles.\u003c/p\u003e\r\n\r\n\u003cp\u003e\u0026ldquo;With a couple of guys behind you, you can\u0026rsquo;t get out of the car and push it to the side of the road. Traffic backs up. 
And what might look like a two-hour trip might turn into an eight-hour trip once you\u0026rsquo;re on the road,\u0026rdquo; Martin said.\u003c/p\u003e\r\n\r\n\u003cp\u003eMartin said his concern is with the electric vehicle infrastructure available in the state of Florida.\u003c/p\u003e\r\n\r\n\u003cp\u003e\u003cem\u003eRelated Link: \u003ca href=\"https://www.benzinga.com/trading-ideas/22/06/27568560/4-stocks-to-watch-this-hurricane-season\"\u003e4 Stocks To Watch This Hurricane Season\u0026nbsp;\u003c/a\u003e\u003c/em\u003e\u003c/p\u003e\r\n\r\n\u003cp\u003e\u003cstrong\u003eWhy It\u0026rsquo;s Important:\u003c/strong\u003e The Florida Department of Transportation told Martin it isn\u0026rsquo;t a fan of banning electric vehicles during hurricane evacuations and that it is looking into portable EV chargers.\u003c/p\u003e\r\n\r\n\u003cp\u003e\u0026ldquo;We have our emergency assistance vehicles that we deploy during a hurricane evacuation that have gas \u0026hellip; we need to provide that same level of service to electrical vehicles,\u0026rdquo; Department of Transportation director of transportation technologies \u003cstrong\u003eTrey Tillander \u003c/strong\u003esaid.\u003c/p\u003e\r\n\r\n\u003cp\u003eThe Tampa Bay Times \u003ca href=\"https://www.tampabay.com/hurricane/2023/02/24/florida-lawmaker-suggests-limiting-electric-vehicles-during-hurricane-evacuations/\"\u003ereported\u003c/a\u003e\u0026nbsp;around 1% of the vehicles in Florida are electric vehicles. 
One of the owners of an EV is state Sen.\u0026nbsp;\u003cstrong\u003eTina Polsky.\u003c/strong\u003e\u003c/p\u003e\r\n\r\n\u003cp\u003e\u0026ldquo;I don\u0026rsquo;t think you can ban an electric vehicle from evacuating because that may be the only car someone has,\u0026rdquo; Polsky said.\u003c/p\u003e\r\n\r\n\u003cp\u003eIn December 2022, there were 203,094 electric vehicles registered in the state of Florida.\u003c/p\u003e\r\n\r\n\u003cp\u003eThe increased funding for charging infrastructure could help ease concerns over charging.\u003c/p\u003e\r\n\r\n\u003cp\u003eUltimately, once people are on the road headed out of the state, they likely won\u0026rsquo;t be able to stop at a charging station, similar to people not being able to quickly stop at a gas station.\u003c/p\u003e\r\n\r\n\u003cp\u003eJust like people prepare for the evacuation by filling up their vehicle with gas, owners of electric vehicles will likely need to fully charge their vehicle before evacuating the state.\u003c/p\u003e\r\n\r\n\u003cp\u003eThe comments from the state senator may have Florida residents thinking about owning at least one non-electric vehicle or a hybrid to ensure they have the best chance to exit the state without future restrictions and without the potential of running out of charge and not finding stations prevalent.\u003c/p\u003e\r\n\r\n\u003cp\u003e\u003cem\u003eRead Next:\u0026nbsp;\u003ca href=\"https://www.benzinga.com/analyst-ratings/analyst-color/23/03/31172188/tesla-analysts-praise-vertical-integration-after-investor-day-but-want-more-from-el\"\u003eTesla Analysts Praise Vertical Integration After Investor Day, But Want More From Elon Musk: \u0026#39;Long On Vision, Short On Specifics\u0026#39;\u003c/a\u003e\u003c/em\u003e\u003c/p\u003e\r\n\r\n\u003cp\u003e\u003cem\u003ePhoto:\u0026nbsp;\u003ca href=\"https://www.shutterstock.com/g/hsaduraphotos\"\u003eHenryk Sadura\u003c/a\u003e\u0026nbsp;via Shutterstock\u003c/em\u003e\u003c/p\u003e\r\n\r\n\u003cp\u003e\u003cbr 
/\u003e\r\n\u0026nbsp;\u003c/p\u003e\r\n","symbols":["TSLA"],"source":"benzinga"}]
&lt;/span&gt;            &lt;span class="n"&gt;articles&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;loads&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ws&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;recv&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
            &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;article&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;articles&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="k"&gt;yield&lt;/span&gt; &lt;span class="n"&gt;state&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;article&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;source&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;article&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nf"&gt;news_input&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;worker_tickers&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;state&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;


&lt;span class="n"&gt;flow&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Dataflow&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;input&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;inp&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;ManualInputConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input_builder&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
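
&lt;p&gt;The &lt;code&gt;distribute&lt;/code&gt; call in the input builder decides which tickers each worker subscribes to. The sketch below is a stand-in written for this explanation, assuming the elements are handed out round-robin by worker index; &lt;code&gt;distribute_round_robin&lt;/code&gt; is a hypothetical name, not the Bytewax function itself.&lt;/p&gt;

```python
def distribute_round_robin(elements, index, count):
    """Hypothetical stand-in: every count-th element, starting at position index."""
    return list(elements)[index::count]


tickers = ["AAPL", "TSLA", "MSFT", "AMZN", "GOOG"]
# Which tickers each of three workers would subscribe to under this assumption.
assignments = [distribute_round_robin(tickers, i, 3) for i in range(3)]

# With the single wildcard used above, only one worker receives a subscription.
wildcard = [distribute_round_robin(["*"], i, 2) for i in range(2)]
```

&lt;p&gt;Under this assumption, a ticker list containing only the wildcard gives every worker but one an empty subscription, so listing individual tickers is what would spread the websocket load across workers.&lt;/p&gt;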



&lt;p&gt;The data returned from the news API looks like the JSON shown here.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;[{&lt;/span&gt;&lt;span class="nl"&gt;"T"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"n"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;31248067&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"headline"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"Tesla Vehicles Could Be Banned From Leaving During A Hurricane In This State"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"summary"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"A lawmaker in one American state could make it hard for owners of electric vehicles to get out of the state in the event of a hurricane. Here’s the potential law and why it’s important."&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"author"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"Chris Katje"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"created_at"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"2023-03-07T22:58:40Z"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"updated_at"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"2023-03-07T22:58:40Z"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"url"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"https://www.benzinga.com/news/23/03/31248067/tesla-vehicles-could-be-banned-from-leaving-during-a-hurricane-in-this-state"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"content"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003cp&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span 
class="s2"&gt;003eA lawmaker in one American state could make it hard for owners of electric vehicles ... ertical Integration After Investor Day, But Want More From Elon Musk: &lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;0026#39;Long On Vision, Short On Specifics&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;0026#39;&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003c/a&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003e&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003c/em&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003e&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003c/p&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003e&lt;/span&gt;&lt;span class="se"&gt;\r\n\r\n\u&lt;/span&gt;&lt;span class="s2"&gt;003cp&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003e&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003cem&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003ePhoto:&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;0026nbsp;&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003ca href=&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;https://www.shutterstock.com/g/hsaduraphotos&lt;/span&gt;&lt;span class="se"&gt;\"\u&lt;/span&gt;&lt;span class="s2"&gt;003eHenryk Sadura&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003c/a&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003e&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;0026nbsp;via Shutterstock&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003c/em&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003e&lt;/span&gt;&lt;span 
class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003c/p&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003e&lt;/span&gt;&lt;span class="se"&gt;\r\n\r\n\u&lt;/span&gt;&lt;span class="s2"&gt;003cp&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003e&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003cbr /&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003e&lt;/span&gt;&lt;span class="se"&gt;\r\n\u&lt;/span&gt;&lt;span class="s2"&gt;0026nbsp;&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003c/p&lt;/span&gt;&lt;span class="se"&gt;\u&lt;/span&gt;&lt;span class="s2"&gt;003e&lt;/span&gt;&lt;span class="se"&gt;\r\n&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"symbols"&lt;/span&gt;&lt;span class="p"&gt;:[&lt;/span&gt;&lt;span class="s2"&gt;"TSLA"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;&lt;span class="nl"&gt;"source"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"benzinga"&lt;/span&gt;&lt;span class="p"&gt;}]&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We will use this in the next steps in our dataflow to analyze the sentiment and provide a summary.&lt;/p&gt;
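
&lt;p&gt;Before handing an article to a model, it can help to see which fields are available and what the escaped &lt;code&gt;content&lt;/code&gt; field looks like once decoded. The sketch below uses only the standard library and an abbreviated copy of the payload above (most of the content is left out). The &lt;code&gt;TextExtractor&lt;/code&gt; class is a hypothetical helper for this example and is not part of the original pipeline.&lt;/p&gt;

```python
import json
from html.parser import HTMLParser

# Abbreviated copy of the payload shown above; only the first paragraph is kept.
raw = (
    r'[{"T":"n","id":31248067,'
    r'"headline":"Tesla Vehicles Could Be Banned From Leaving During A Hurricane In This State",'
    r'"content":"\u003cp\u003eA lawmaker in one American state could make it hard for owners of '
    r'electric vehicles to get out of the state in the event of a hurricane. '
    r'Here\u0026rsquo;s the potential law and why it\u0026rsquo;s important.\u003c/p\u003e\r\n",'
    r'"symbols":["TSLA"],"source":"benzinga"}]'
)


class TextExtractor(HTMLParser):
    """Hypothetical helper: collect the text nodes and ignore the markup."""

    def __init__(self):
        super().__init__()
        self.parts = []

    def handle_data(self, data):
        self.parts.append(data)


article = json.loads(raw)[0]   # each websocket message is a list of articles

parser = TextExtractor()
parser.feed(article["content"])
parser.close()                 # flush any text still buffered by the parser
text = " ".join("".join(parser.parts).split())
```

&lt;p&gt;After this runs, &lt;code&gt;article["headline"]&lt;/code&gt; and &lt;code&gt;article["symbols"]&lt;/code&gt; are ready to use, and &lt;code&gt;text&lt;/code&gt; holds the article body with tags removed and character references decoded.&lt;/p&gt;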

&lt;h3&gt;
  
  
  Managing Duplicates and Updates
&lt;/h3&gt;

&lt;p&gt;When working with news stories from RSS/Atom feeds or news APIs, it's common to receive duplicates as stories are created and then updated. To prevent these duplicates from being analyzed multiple times, which would incur the additional overhead of running ML models on the same story, we'll use the Bytewax operator &lt;a href="///apidocs/bytewax.dataflow#bytewax.dataflow.Dataflow.stateful_map"&gt;&lt;code&gt;stateful_map&lt;/code&gt;&lt;/a&gt; to create a simplified storage layer. We'll store a list of unique identifiers for each news article we encounter. If an article has been seen before, we'll mark it as an update. Otherwise, we'll add the article's ID to the stateful object. To filter out the updates and avoid reclassifying and summarizing them, we'll use the &lt;a href="///apidocs/bytewax.dataflow#bytewax.dataflow.Dataflow.filter"&gt;&lt;code&gt;filter&lt;/code&gt;&lt;/a&gt; operator. Think of this process as the equivalent of checking a database for a unique ID.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;update_articles&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;articles&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;news&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;news&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;articles&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;news&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;update&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;
    &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;articles&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;append&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;news&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
        &lt;span class="n"&gt;news&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;update&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;articles&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;news&lt;/span&gt;

&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;stateful_map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;source_articles&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;lambda&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nf"&gt;list&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;update_articles&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;filter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;lambda&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;update&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Sentiment Analysis
&lt;/h3&gt;

&lt;p&gt;Sentiment analysis is the next step in our process. We use a fine-tuned Hugging Face model, FinancialBERT, to classify the sentiment of each article's headline. It is based on BERT (Bidirectional Encoder Representations from Transformers), which was developed by Google. For details on how this model operates and was trained, refer to the &lt;a href="https://huggingface.co/ahmedrachid/FinancialBERT-Sentiment-Analysis" rel="noopener noreferrer"&gt;model card&lt;/a&gt; on Hugging Face or the accompanying &lt;a href="https://www.researchgate.net/publication/358284785_FinancialBERT_-_A_Pretrained_Language_Model_for_Financial_Text_Mining" rel="noopener noreferrer"&gt;research paper&lt;/a&gt;. Since we want to analyze each news article independently, the sentiment classification takes place in a &lt;a href="///apidocs/bytewax.dataflow#bytewax.dataflow.Dataflow.map"&gt;&lt;code&gt;map&lt;/code&gt;&lt;/a&gt; operator. Despite the extensive research that goes into designing novel model architectures and creating training datasets, implementing sentiment analysis is remarkably straightforward. &lt;em&gt;Note that if you're following along in a notebook, the model will take some time to download initially.&lt;/em&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;transformers&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;AutoTokenizer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;AutoModelForSequenceClassification&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;pipeline&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;AutoModelForSeq2SeqLM&lt;/span&gt;

&lt;span class="n"&gt;sent_tokenizer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;AutoTokenizer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;from_pretrained&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;ahmedrachid/FinancialBERT-Sentiment-Analysis&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;sent_model&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;AutoModelForSequenceClassification&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;from_pretrained&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;ahmedrachid/FinancialBERT-Sentiment-Analysis&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;sent_nlp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;pipeline&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;sentiment-analysis&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;model&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;sent_model&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;tokenizer&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;sent_tokenizer&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;sentiment_analysis&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ticker__news&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;ticker&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;news&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;ticker__news&lt;/span&gt;
    &lt;span class="n"&gt;sentiment&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;sent_nlp&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;&lt;span class="n"&gt;news&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;headline&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]])&lt;/span&gt;
    &lt;span class="n"&gt;news&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;sentiment&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;sentiment&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sentiment&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
    &lt;span class="nf"&gt;return &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ticker&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;news&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sentiment_analysis&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
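
&lt;p&gt;To check the shape of the data without downloading the model, the step can be exercised with a stand-in classifier. This is a minimal sketch: it assumes the pipeline returns a list with one &lt;code&gt;{'label': ..., 'score': ...}&lt;/code&gt; dictionary per input text, and &lt;code&gt;make_sentiment_step&lt;/code&gt; and &lt;code&gt;fake_classifier&lt;/code&gt; are hypothetical names used only for illustration.&lt;/p&gt;

```python
def make_sentiment_step(classifier):
    """Build a map step that attaches a sentiment result to each news item."""
    def sentiment_step(ticker__news):
        ticker, news = ticker__news
        # The classifier takes a list of texts and returns one result per text.
        result = classifier([news["headline"]])[0]
        # Copy the dict so the incoming item is left unchanged.
        return (ticker, {**news, "sentiment": result})
    return sentiment_step


def fake_classifier(texts):
    # Hypothetical stand-in for the real pipeline: labels every text "neutral".
    return [{"label": "neutral", "score": 1.0} for _ in texts]


step = make_sentiment_step(fake_classifier)
ticker, news = step(("AAPL", {"headline": "Apple reports quarterly results"}))
print(ticker, news["sentiment"]["label"])
```

&lt;p&gt;Passing the classifier in as an argument keeps the step easy to test; in the dataflow itself you would pass the real &lt;code&gt;sent_nlp&lt;/code&gt; pipeline instead of the stand-in.&lt;/p&gt;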



&lt;h3&gt;
  
  
  Article Summarization
&lt;/h3&gt;

&lt;p&gt;After analyzing the article sentiment, we summarize the article's content with BART (Bidirectional Auto-Regressive Transformers), a model architecture that combines ideas from Google's BERT and OpenAI's GPT. Despite the significant effort that goes into creating the model, using it through the Hugging Face Transformers library is relatively easy. We can create a summarization pipeline and apply it in a &lt;a href="///apidocs/bytewax.dataflow#bytewax.dataflow.Dataflow.map"&gt;&lt;code&gt;map&lt;/code&gt;&lt;/a&gt; step. To obtain better results, the same map step also cleans the text before summarizing it by stripping HTML tags and line breaks.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;re&lt;/span&gt;

&lt;span class="c1"&gt;# Let's create a summarization pipeline
&lt;/span&gt;&lt;span class="n"&gt;sum_tokenizer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;AutoTokenizer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;from_pretrained&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;facebook/bart-large-cnn&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;sum_model&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;AutoModelForSeq2SeqLM&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;from_pretrained&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;facebook/bart-large-cnn&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;summarizer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;pipeline&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;summarization&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;tokenizer&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;sum_tokenizer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;model&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;sum_model&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;tag_re&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;re&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;compile&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;r&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;(&amp;lt;!--.*?--&amp;gt;|&amp;lt;[^&amp;gt;]*&amp;gt;)&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;summarize&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ticker__news&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;ticker&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;news&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;ticker__news&lt;/span&gt;
    &lt;span class="n"&gt;article&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;news&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;content&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="n"&gt;article_no_tags&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;tag_re&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;sub&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;''&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;article&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;article_no_tags&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;article_no_tags&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;replace&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="se"&gt;\r&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;""&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;replace&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="se"&gt;\n&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;""&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;summary&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;summarizer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;article_no_tags&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;max_length&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;130&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;min_length&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;30&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;do_sample&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;news&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;bart_summary&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;summary&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;summary_text&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;bart summary:&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;summary&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;summary_text&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="nf"&gt;return &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ticker&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;news&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;summarize&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
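
&lt;p&gt;One detail of the cleaning step is worth a second look: replacing line breaks with an empty string joins the last word of one line to the first word of the next. If that matters for your articles, a possible alternative is to collapse all whitespace into single spaces. The sketch below is an assumption about what you might prefer, not part of the original pipeline, and &lt;code&gt;normalize_whitespace&lt;/code&gt; is a hypothetical helper name.&lt;/p&gt;

```python
def normalize_whitespace(text):
    """Collapse runs of spaces, tabs, and line breaks into single spaces."""
    # str.split() with no arguments splits on any whitespace and drops empties.
    return " ".join(text.split())


raw = "Shares rose\r\nafter the\n\nearnings   call."
print(normalize_whitespace(raw))
```

&lt;p&gt;The result could then be passed to &lt;code&gt;summarizer&lt;/code&gt; in place of the text produced by the two &lt;code&gt;replace&lt;/code&gt; calls.&lt;/p&gt;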



&lt;h3&gt;
  
  
  Output
&lt;/h3&gt;

&lt;p&gt;With our news analyzed, we can add a capture step to output the modified news object and then run our dataflow. In this example we write the output to stdout so we can easily view it, but in a production system we could write the results to a downstream Kafka topic or database for further analysis.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;If you are following along in a notebook, remember that you must be authenticated for this to work, so you will need to set your Alpaca API key and secret.&lt;/em&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;bytewax.execution&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;run_main&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;bytewax.outputs&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;StdOutputConfig&lt;/span&gt;

&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;capture&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;StdOutputConfig&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;

&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;__name__&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;__main__&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="nf"&gt;run_main&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Wrapping up
&lt;/h2&gt;

&lt;p&gt;While our example is simplified, it showcases the power of Bytewax and Hugging Face's language models for analyzing financial news articles in real time. Using the Alpaca news API as our data source, we constructed a dataflow that deduplicates stories, classifies the sentiment of each headline, and summarizes the content of each article.&lt;/p&gt;

&lt;p&gt;The ease of implementation through Python-native Bytewax and the Hugging Face Transformers library makes these state-of-the-art language models accessible to data engineers and researchers for use in their own projects. We hope this blog post serves as a useful guide for anyone looking to leverage real-time news analysis in their financial decision-making process.&lt;/p&gt;

</description>
      <category>llm</category>
      <category>machinelearning</category>
      <category>python</category>
      <category>datastreaming</category>
    </item>
    <item>
      <title>How to Analyze Cryptocurrency Orders with Python</title>
      <dc:creator>Zander</dc:creator>
      <pubDate>Thu, 16 Jun 2022 16:37:17 +0000</pubDate>
      <link>https://dev.to/awmatheson/how-to-analyze-cryptocurrency-orders-with-python-1m3m</link>
      <guid>https://dev.to/awmatheson/how-to-analyze-cryptocurrency-orders-with-python-1m3m</guid>
      <description>&lt;p&gt;&lt;em&gt;This post was originally published on the &lt;a href="https://www.bytewax.io/blog/crypto-sockets/"&gt;Bytewax blog&lt;/a&gt;&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;In this example we are going to walk through how you can maintain a limit order book in real-time in Python. Take advantage of the ability to accrue state and parallelize input with Bytewax, an open source Python library for stateful streaming.&lt;/p&gt;

&lt;p&gt;We are going to:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Use websockets to connect to an exchange (Coinbase)&lt;/li&gt;
&lt;li&gt;Set up an order book using a snapshot&lt;/li&gt;
&lt;li&gt;Update the order book in real-time&lt;/li&gt;
&lt;li&gt;Use algorithms to trade crypto and profit. Just kidding, this is left as an exercise for the reader.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;To start off, we are going to digress into some concepts around markets, exchanges and orders.&lt;/p&gt;

&lt;h2&gt;
  
  
  Concepts
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Order Book
&lt;/h3&gt;

&lt;p&gt;A limit order book, or simply an order book, is a record of all limit orders that have been placed. A limit order is an order to buy (bid) or sell (ask) an asset at a given price. This could facilitate the exchange of dollars for shares or, as in our case, the exchange of cryptocurrencies. On exchanges, the limit order book is constantly changing as orders are placed every fraction of a second. The order book can give a trader insight into the market, whether they are looking to determine liquidity, create liquidity, design a trading algorithm, or detect when bad actors are trying to manipulate the market.&lt;/p&gt;

&lt;h3&gt;
  
  
  Bid and Ask
&lt;/h3&gt;

&lt;p&gt;In the order book, the &lt;strong&gt;ask price&lt;/strong&gt; is the lowest price that a seller is willing to sell at and the &lt;strong&gt;bid price&lt;/strong&gt; is the highest price that a buyer is willing to buy at. A limit order differs from a market order in that a limit order is generally placed with four dimensions: the direction (buy/sell), the size, the price and the duration (time to expire). A market order, in comparison, has two dimensions: the direction and the size. It is up to the exchange to fill a market order, and it is filled from what is available in the order book.&lt;/p&gt;
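
&lt;p&gt;As a small illustration of these definitions, the sketch below finds the bid price, the ask price and the difference between them (the spread) in a toy order book. The prices and sizes are made up, and storing each side as a dictionary of price to size is an assumption made for this example.&lt;/p&gt;

```python
from decimal import Decimal

# Hypothetical order book: price mapped to the total size resting at that price.
bids = {Decimal("10101.10"): Decimal("0.45"), Decimal("10100.00"): Decimal("1.20")}
asks = {Decimal("10102.55"): Decimal("0.57"), Decimal("10103.00"): Decimal("2.00")}

best_bid = max(bids)  # highest price a buyer is willing to pay
best_ask = min(asks)  # lowest price a seller is willing to accept
spread = best_ask - best_bid

print(best_bid, best_ask, spread)
```

&lt;p&gt;&lt;code&gt;Decimal&lt;/code&gt; is used here instead of &lt;code&gt;float&lt;/code&gt; so that prices arriving as strings can be compared and subtracted without rounding surprises.&lt;/p&gt;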

&lt;h3&gt;
  
  
  Level 2 Data
&lt;/h3&gt;

&lt;p&gt;An exchange will generally offer a few different tiers of information that traders can subscribe to. These can be quite expensive for some exchanges, but luckily for us, most crypto exchanges provide access to the various levels for free! To maintain our order book we are going to need at least level 2 order information. This gives us the granularity of each limit order so that we can adjust the book in real-time. Without the ability to maintain our own order book, the snapshot would go out of date almost instantly and we would not be able to act on up-to-date information. We would have to download the order book every millisecond, or faster, to stay up to date, and since the order book can be quite large, this isn't really feasible.&lt;/p&gt;

&lt;p&gt;Alright, let's get started!&lt;/p&gt;

&lt;h2&gt;
  
  
  Inputs &amp;amp; Outputs
&lt;/h2&gt;

&lt;p&gt;We are eventually going to create a cluster of dataflows where multiple currency pairs can run in parallel on different workers. To follow this approach, we will use the &lt;a href="https://docs.bytewax.io/apidocs#bytewax.spawn_cluster"&gt;&lt;code&gt;spawn_cluster&lt;/code&gt;&lt;/a&gt; method of kicking off our dataflow. To start, we will build a websocket input function that uses the Coinbase Pro websocket URL (&lt;code&gt;wss://ws-feed.pro.coinbase.com&lt;/code&gt;) and the Python websocket library to create a connection. Once connected, we send a message to the websocket subscribing to product_ids (pairs of currencies, BTC-USD and ETH-USD in this example) and channels (level2 order book data). Finally, since we know there will be some sort of acknowledgement message, we grab that with &lt;code&gt;ws.recv()&lt;/code&gt; and print it out.&lt;/p&gt;

&lt;p&gt;In this example we assume that we receive our data in order, and we assign a monotonically increasing epoch to each new message we receive. In Bytewax, an epoch is assigned to data, and yielding &lt;code&gt;Emit&lt;/code&gt; instructs Bytewax to move that data through the dataflow as far as possible. At some point, we consider that epoch complete because there is no additional data to be received for it. We then instruct Bytewax to advance to the next epoch with &lt;code&gt;AdvanceTo&lt;/code&gt;, which leads to completion of the dataflow process for the previous epoch. This allows for flexibility: you could span an epoch over a time window and complete the processing at the end of the window. For more details on how this works, check the &lt;a href="https://docs.bytewax.io/getting-started/epochs"&gt;epoch documentation&lt;/a&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;websocket&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;create_connection&lt;/span&gt;


&lt;span class="n"&gt;PRODUCT_IDS&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;'BTC-USD'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="s"&gt;'ETH-USD'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;ws_input&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;product_ids&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;ws&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;create_connection&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"wss://ws-feed.pro.coinbase.com"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;ws&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;send&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;dumps&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="s"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="s"&gt;"subscribe"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="s"&gt;"product_ids"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;product_ids&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="s"&gt;"channels"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"level2"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ws&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;recv&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
    &lt;span class="n"&gt;epoch&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
    &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="k"&gt;yield&lt;/span&gt; &lt;span class="n"&gt;Emit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ws&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;recv&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
        &lt;span class="n"&gt;epoch&lt;/span&gt; &lt;span class="o"&gt;+=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;
        &lt;span class="k"&gt;yield&lt;/span&gt; &lt;span class="n"&gt;AdvanceTo&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;epoch&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
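
&lt;p&gt;The idea of spanning an epoch over a time window can be sketched without Bytewax at all. The example below assumes each message carries a timestamp in seconds and yields plain &lt;code&gt;(epoch, payload)&lt;/code&gt; tuples rather than Bytewax's &lt;code&gt;Emit&lt;/code&gt; and &lt;code&gt;AdvanceTo&lt;/code&gt; objects; &lt;code&gt;window_epochs&lt;/code&gt; and the sample messages are hypothetical.&lt;/p&gt;

```python
def window_epochs(messages, window_seconds=5):
    """Yield (epoch, payload) pairs, one epoch per fixed-size time window."""
    start = None
    for timestamp, payload in messages:
        if start is None:
            start = timestamp  # the first message anchors epoch 0
        # Floor division maps every timestamp in a window to the same epoch.
        epoch = int((timestamp - start) // window_seconds)
        yield epoch, payload


# Hypothetical (timestamp_in_seconds, payload) messages, already in order.
messages = [(100.0, "a"), (102.5, "b"), (105.0, "c"), (111.0, "d")]
print(list(window_epochs(messages)))
```

&lt;p&gt;All messages that share an epoch would be processed together, so the window size controls how often results are completed.&lt;/p&gt;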



&lt;p&gt;Now that we have our websocket-based data generator built, we will write an input builder for our dataflow. The input builder is called on each worker, and the function receives the &lt;code&gt;worker_index&lt;/code&gt; and the total number of workers (&lt;code&gt;worker_count&lt;/code&gt;). In this case we are designing our input builder to handle multiple workers and multiple currency pairs so that we can parallelize the input, and we distribute the currency pairs with the logic in the code below. Note that this split assumes the number of currency pairs is a multiple of the number of workers: because &lt;code&gt;prods_per_worker&lt;/code&gt; is rounded down, leftover pairs are not assigned to any worker, and running more workers than currency pairs leaves every worker with an empty list.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;bytewax&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Dataflow&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;inputs&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;spawn_cluster&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;input_builder&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;worker_index&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_count&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;prods_per_worker&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;int&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;PRODUCT_IDS&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;/&lt;/span&gt;&lt;span class="n"&gt;worker_count&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;product_ids&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;PRODUCT_IDS&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;int&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;worker_index&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;prods_per_worker&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;&lt;span class="nb"&gt;int&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;worker_index&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;prods_per_worker&lt;/span&gt;&lt;span class="o"&gt;+&lt;/span&gt;&lt;span class="n"&gt;prods_per_worker&lt;/span&gt;&lt;span class="p"&gt;)]&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;ws_input&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;product_ids&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
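
&lt;p&gt;If the number of currency pairs is not a multiple of the number of workers, a round-robin assignment is one possible alternative to the integer-division split. This is a sketch rather than the approach used in the rest of this post; &lt;code&gt;products_for_worker&lt;/code&gt; is a hypothetical helper, and the third pair is added only to show an uneven split.&lt;/p&gt;

```python
# The third pair is added for illustration only.
PRODUCT_IDS = ["BTC-USD", "ETH-USD", "SOL-USD"]


def products_for_worker(worker_index, worker_count, product_ids=PRODUCT_IDS):
    """Return the pairs handled by one worker, assigned round-robin."""
    # Worker i takes items i, i + worker_count, i + 2 * worker_count, ...
    return product_ids[worker_index::worker_count]


for index in range(2):
    print(index, products_for_worker(index, 2))
```

&lt;p&gt;With this slicing every pair is assigned to exactly one worker, and a worker only ends up with an empty list when there are more workers than pairs.&lt;/p&gt;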



&lt;p&gt;Now that we have our input builder finished, we can create our output builder. The output builder is used in conjunction with &lt;code&gt;spawn_cluster&lt;/code&gt;. In a dataflow, when you use the &lt;code&gt;capture&lt;/code&gt; operator, the &lt;code&gt;output_builder&lt;/code&gt; is called and receives information about the worker as well as the epoch and the data (in the format &lt;code&gt;(epoch, data)&lt;/code&gt;). For this example we keep it simple and just print the result of our dataflow to the terminal.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;output_builder&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;worker_index&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_count&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;print&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Building Our Dataflow
&lt;/h2&gt;

&lt;p&gt;Before we get to the exciting part of our order book dataflow we need to prep the data. We initially receive some JSON formatted text, so we will first deserialize the JSON we are receiving from the websocket into a dictionary. Once deserialized, we can reformat the data to be a tuple of the shape (product_id, data). This will permit us to aggregate by the product_id as our key in the next step.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;key_on_product&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;'product_id'&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;flow&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Dataflow&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;loads&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="c1"&gt;# {'type': 'l2update', 'product_id': 'BTC-USD', 'changes': [['buy', '36905.39', '0.00334873']], 'time': '2022-05-05T17:25:09.072519Z'}
&lt;/span&gt;&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key_on_product&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="c1"&gt;# ('BTC-USD', {'type': 'l2update', 'product_id': 'BTC-USD', 'changes': [['buy', '36905.39', '0.00334873']], 'time': '2022-05-05T17:25:09.072519Z'})
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now for the exciting part. The code below is what we are using to:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Construct the order book as two dictionaries, one for asks and one for bids.&lt;/li&gt;
&lt;li&gt;Assign a value to the ask and bid price.&lt;/li&gt;
&lt;li&gt;For each new order, update the order book and then update the bid and ask prices where required.&lt;/li&gt;
&lt;li&gt;Return the bid and ask prices, their respective volumes and the difference between the prices.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The data from the Coinbase Pro websocket starts with a snapshot of the current order book in the format:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"snapshot"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"product_id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"BTC-USD"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"bids"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[[&lt;/span&gt;&lt;span class="s2"&gt;"10101.10"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"0.45054140"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="err"&gt;...&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"asks"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[[&lt;/span&gt;&lt;span class="s2"&gt;"10102.55"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"0.57753524"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="err"&gt;...&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Each subsequent message is an update describing one or more changes to the book, in the format:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"l2update"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"product_id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"BTC-USD"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"time"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"2019-08-14T20:42:27.265Z"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"changes"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="s2"&gt;"buy"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="s2"&gt;"10101.80000000"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="s2"&gt;"0.162567"&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
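
&lt;p&gt;As a rough illustration of how the two message shapes differ, the sketch below separates them by their &lt;code&gt;type&lt;/code&gt; field. The helper name &lt;code&gt;parse_message&lt;/code&gt; is hypothetical and not part of the original dataflow; the sample messages are the ones shown above with the elided entries left out.&lt;/p&gt;

```python
import json


def parse_message(raw):
    """Hypothetical helper: return (kind, product_id, payload) for one message."""
    msg = json.loads(raw)
    kind = msg["type"]
    if kind == "snapshot":
        # The snapshot carries the full book as [price, size] string pairs.
        payload = {"bids": msg["bids"], "asks": msg["asks"]}
    elif kind == "l2update":
        # An update carries a list of [side, price, size] changes.
        payload = {"changes": msg["changes"]}
    else:
        # Anything else (for example the subscription confirmation) has no book data.
        payload = {}
    return kind, msg.get("product_id"), payload


snapshot_raw = (
    '{"type": "snapshot", "product_id": "BTC-USD", '
    '"bids": [["10101.10", "0.45054140"]], '
    '"asks": [["10102.55", "0.57753524"]]}'
)
update_raw = (
    '{"type": "l2update", "product_id": "BTC-USD", '
    '"time": "2019-08-14T20:42:27.265Z", '
    '"changes": [["buy", "10101.80000000", "0.162567"]]}'
)

print(parse_message(snapshot_raw))
print(parse_message(update_raw))
```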



&lt;p&gt;To maintain an order book in real time, we first need to construct an object that holds the orders from the snapshot and then update that object with each subsequent message. This is a good use case for the &lt;a href="https://docs.bytewax.io/apidocs#bytewax.Dataflow.stateful_map"&gt;&lt;code&gt;stateful_map&lt;/code&gt;&lt;/a&gt; operator, which can aggregate by key over many epochs. &lt;code&gt;stateful_map&lt;/code&gt; aggregates data into an object that you define, using a function (the mapper). The object must be defined via a builder, because the builder is called to create a new object for each new key received. The mapper must return the updated object, along with the value to emit downstream, so that the state carries over to the next item.&lt;/p&gt;
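
&lt;p&gt;To make the builder and mapper contract more concrete, here is a minimal plain-Python emulation of per-key state handling. It is only a sketch of the idea and not how Bytewax implements the operator; &lt;code&gt;run_stateful_map&lt;/code&gt; and &lt;code&gt;add&lt;/code&gt; are hypothetical names used for illustration.&lt;/p&gt;

```python
def run_stateful_map(builder, mapper, keyed_items):
    """Emulate per-key state: builder(key) creates state, mapper updates it.

    This is an illustration only, not Bytewax code.
    """
    states = {}
    outputs = []
    for key, value in keyed_items:
        if key not in states:
            # First time we see this key: ask the builder for a fresh state.
            states[key] = builder(key)
        # The mapper returns the updated state and the value to emit.
        states[key], out = mapper(states[key], value)
        outputs.append((key, out))
    return outputs


def add(total, value):
    """Hypothetical mapper: keep a running total per key."""
    total = total + value
    return total, total


items = [("BTC-USD", 1), ("ETH-USD", 10), ("BTC-USD", 2)]
results = run_stateful_map(lambda key: 0, add, items)
print(results)  # [('BTC-USD', 1), ('ETH-USD', 10), ('BTC-USD', 3)]
```

&lt;p&gt;Each key gets its own state, so the two &lt;code&gt;BTC-USD&lt;/code&gt; items accumulate independently of the &lt;code&gt;ETH-USD&lt;/code&gt; item.&lt;/p&gt;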

&lt;p&gt;Below we have the code for the OrderBook object, which has a bids dictionary and an asks dictionary. These are first used to build the order book from the snapshot; once it is built, we can obtain the first bid price and ask price. The bid price is the highest buy order placed and the ask price is the lowest sell order placed. Once we have determined the bid and ask prices, we can calculate the spread and track that as well.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;OrderBook&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="c1"&gt;# if using Python &amp;lt; 3.7 need to use OrderedDict here
&lt;/span&gt;        &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bids&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;
        &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;asks&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;update&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bids&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="p"&gt;{}:&lt;/span&gt;
            &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bids&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nb"&gt;float&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;&lt;span class="nb"&gt;float&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;size&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;size&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;'bids'&lt;/span&gt;&lt;span class="p"&gt;]}&lt;/span&gt;
            &lt;span class="c1"&gt;# The bid_price is the highest priced buy limit order.
&lt;/span&gt;            &lt;span class="c1"&gt;# since the bids are in order, the first item of our newly constructed bids
&lt;/span&gt;            &lt;span class="c1"&gt;# will have our bid price, so we can track the best bid
&lt;/span&gt;            &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bid_price&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;next&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;iter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bids&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;asks&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="p"&gt;{}:&lt;/span&gt;
            &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;asks&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nb"&gt;float&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;&lt;span class="nb"&gt;float&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;size&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;size&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;'asks'&lt;/span&gt;&lt;span class="p"&gt;]}&lt;/span&gt;
            &lt;span class="c1"&gt;# The ask price is the lowest priced sell limit order.
&lt;/span&gt;            &lt;span class="c1"&gt;# since the asks are in order, the first item of our newly constructed 
&lt;/span&gt;            &lt;span class="c1"&gt;# asks will be our ask price, so we can track the best ask
&lt;/span&gt;            &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ask_price&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;next&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;iter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;asks&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
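
&lt;p&gt;The code above relies on the snapshot arriving in price order. If you would rather not depend on that, a small variation is to take the maximum bid and the minimum ask explicitly. The sketch below assumes the same snapshot shape; &lt;code&gt;best_prices&lt;/code&gt; is a hypothetical helper and the sample data is deliberately out of order.&lt;/p&gt;

```python
def best_prices(snapshot):
    """Hypothetical helper: return (best_bid, best_ask) regardless of ordering."""
    bids = {float(price): float(size) for price, size in snapshot["bids"]}
    asks = {float(price): float(size) for price, size in snapshot["asks"]}
    # Highest buy price and lowest sell price.
    return max(bids), min(asks)


# Sample data, intentionally not sorted by price.
sample_snapshot = {
    "bids": [["10100.00", "1.0"], ["10101.10", "0.45054140"]],
    "asks": [["10103.00", "2.0"], ["10102.55", "0.57753524"]],
}

best_bid, best_ask = best_prices(sample_snapshot)
print(best_bid, best_ask)  # 10101.1 10102.55
```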



&lt;p&gt;With our snapshot processed, we can update the order book, the bid and ask prices, and the spread for each new message we receive from the websocket. When an order is filled or cancelled, the update we receive looks similar to &lt;code&gt;'changes': [['buy', '36905.39', '0.00000000']]&lt;/code&gt;. When we receive an update of size &lt;code&gt;'0.00000000'&lt;/code&gt;, we can remove that item from our book and, if needed, update our bid or ask price. The code below checks whether the order should be removed and otherwise updates it. If the order was removed, it also makes sure the bid and ask prices are adjusted where required.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;        &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="c1"&gt;# We receive a list of lists here, normally it is only one change, 
&lt;/span&gt;            &lt;span class="c1"&gt;# but could be more than one.
&lt;/span&gt;            &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;update&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;'changes'&lt;/span&gt;&lt;span class="p"&gt;]:&lt;/span&gt;
                &lt;span class="n"&gt;price&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;float&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;update&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
                &lt;span class="n"&gt;size&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;float&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;update&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
                &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;update&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="s"&gt;'sell'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                    &lt;span class="c1"&gt;# first check if the size is zero and needs to be removed
&lt;/span&gt;                    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;size&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="mf"&gt;0.0&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                        &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                            &lt;span class="k"&gt;del&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;asks&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
                            &lt;span class="c1"&gt;# if it was the ask price removed, 
&lt;/span&gt;                            &lt;span class="c1"&gt;# update with new ask price
&lt;/span&gt;                            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;price&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;=&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ask_price&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                                &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ask_price&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;min&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;asks&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;keys&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
                        &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="nb"&gt;KeyError&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                            &lt;span class="c1"&gt;# don't need to add price with size zero
&lt;/span&gt;                            &lt;span class="k"&gt;pass&lt;/span&gt;
                    &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                        &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;asks&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;size&lt;/span&gt;
                        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;price&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ask_price&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                            &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ask_price&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;price&lt;/span&gt;
                &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;update&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="s"&gt;'buy'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                    &lt;span class="c1"&gt;# first check if the size is zero and needs to be removed
&lt;/span&gt;                    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;size&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="mf"&gt;0.0&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                        &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                            &lt;span class="k"&gt;del&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bids&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
                            &lt;span class="c1"&gt;# if it was the bid price removed, 
&lt;/span&gt;                            &lt;span class="c1"&gt;# update with new bid price
&lt;/span&gt;                            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;price&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bid_price&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                                &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bid_price&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;max&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bids&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;keys&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
                        &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="nb"&gt;KeyError&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                            &lt;span class="c1"&gt;# don't need to add price with size zero
&lt;/span&gt;                            &lt;span class="k"&gt;pass&lt;/span&gt;
                    &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                        &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bids&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;size&lt;/span&gt;
                        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;price&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bid_price&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                            &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bid_price&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;price&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="s"&gt;'bid'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bid_price&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'bid_size'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bids&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bid_price&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="s"&gt;'ask'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ask_price&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'ask_size'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;asks&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ask_price&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="s"&gt;'spread'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ask_price&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bid_price&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;stateful_map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;lambda&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;OrderBook&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;OrderBook&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;update&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="c1"&gt;# if using bytewax&amp;gt;0.9.0 --&amp;gt; flow.stateful_map("order_book", lambda key: OrderBook(), OrderBook.update)
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
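
&lt;p&gt;To see the removal and update logic in isolation, here is a small worked sketch for the ask side only. It is a simplified stand-in for the method above, not a replacement for it: &lt;code&gt;apply_ask_change&lt;/code&gt; is a hypothetical helper, and it assumes the book never becomes empty (otherwise &lt;code&gt;min&lt;/code&gt; would raise a &lt;code&gt;ValueError&lt;/code&gt;).&lt;/p&gt;

```python
def apply_ask_change(asks, ask_price, price, size):
    """Hypothetical helper: apply one sell-side change and return the best ask."""
    if size == 0.0:
        # pop() with a default sidesteps the KeyError for unknown price levels.
        removed = asks.pop(price, None)
        if removed is not None and price == ask_price:
            # The best ask was removed, so find the next lowest price.
            ask_price = min(asks)
    else:
        asks[price] = size
        # A new, lower sell price becomes the best ask.
        ask_price = min(ask_price, price)
    return ask_price


asks = {10102.55: 0.57753524, 10103.0: 2.0}
ask_price = 10102.55

# The best ask is filled or cancelled (size zero), so the next level takes over.
ask_price = apply_ask_change(asks, ask_price, 10102.55, 0.0)
print(ask_price)  # 10103.0

# A new sell order below the current best ask becomes the new best ask.
ask_price = apply_ask_change(asks, ask_price, 10102.9, 1.5)
print(ask_price)  # 10102.9
```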



&lt;p&gt;To finish up, for fun we can filter for a spread greater than 0.01% of the ask price and then capture the output. Maybe we can profit off of this spread... or maybe not.&lt;/p&gt;

&lt;p&gt;The &lt;code&gt;capture&lt;/code&gt; operator is designed to use the output builder function that we defined earlier. In this case it will print out to our terminal.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;filter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;lambda&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="s"&gt;'spread'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="s"&gt;'ask'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mf"&gt;0.0001&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;capture&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
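
&lt;p&gt;The filter predicate can be tried on its own with a couple of sample items. The sketch below assumes each item is a &lt;code&gt;(key, summary)&lt;/code&gt; pair with the &lt;code&gt;ask&lt;/code&gt; and &lt;code&gt;spread&lt;/code&gt; fields used above; the function name &lt;code&gt;wide_spread&lt;/code&gt; and the sample values are made up for illustration.&lt;/p&gt;

```python
def wide_spread(item, threshold=0.0001):
    """Hypothetical predicate: True when spread / ask exceeds the threshold."""
    key, summary = item
    return summary["spread"] / summary["ask"] > threshold


# Illustrative values only: a spread of 6.63 on an ask of 38596.73 is about 0.017%.
wide = ("BTC-USD", {"bid": 38590.1, "ask": 38596.73, "spread": 6.63})
# A spread of 1.0 on the same ask is about 0.0026%, below the 0.01% threshold.
narrow = ("BTC-USD", {"bid": 38595.73, "ask": 38596.73, "spread": 1.0})

print(wide_spread(wide))    # True
print(wide_spread(narrow))  # False
```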



&lt;p&gt;&lt;a href="https://dev.to/getting-started/execution/"&gt;Bytewax provides a few different entry points for executing a dataflow&lt;/a&gt;. In this example we are using &lt;code&gt;bytewax.spawn_cluster&lt;/code&gt;, which allows us to run dataflows in parallel on threads and processes.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;__name__&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="s"&gt;"__main__"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;spawn_cluster&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;input_builder&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;output_builder&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="n"&gt;parse&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;cluster_args&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;That's it, let's run it and verify our output:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;python orderbook.py
&lt;span class="c"&gt;# for multiple workers --&amp;gt; python orderbook.py -w 2&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Eventually, once the spread exceeds the 0.01% threshold set in the filter, we will see output similar to what is below.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s2"&gt;"type"&lt;/span&gt;:&lt;span class="s2"&gt;"subscriptions"&lt;/span&gt;,&lt;span class="s2"&gt;"channels"&lt;/span&gt;:[&lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s2"&gt;"name"&lt;/span&gt;:&lt;span class="s2"&gt;"level2"&lt;/span&gt;,&lt;span class="s2"&gt;"product_ids"&lt;/span&gt;:[&lt;span class="s2"&gt;"BTC-USD"&lt;/span&gt;&lt;span class="o"&gt;]}]}&lt;/span&gt;
&lt;span class="o"&gt;(&lt;/span&gt;1046, &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'BTC-USD'&lt;/span&gt;, &lt;span class="o"&gt;(&lt;/span&gt;38590.1, 0.00945844, 38596.73, 0.01347429, 6.630000000004657&lt;span class="o"&gt;)))&lt;/span&gt;
&lt;span class="o"&gt;(&lt;/span&gt;1047, &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'BTC-USD'&lt;/span&gt;, &lt;span class="o"&gt;(&lt;/span&gt;38590.1, 0.00945844, 38597.13, 0.02591147, 7.029999999998836&lt;span class="o"&gt;)))&lt;/span&gt;
&lt;span class="o"&gt;(&lt;/span&gt;1048, &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'BTC-USD'&lt;/span&gt;, &lt;span class="o"&gt;(&lt;/span&gt;38590.1, 0.00945844, 38597.13, 0.02591147, 7.029999999998836&lt;span class="o"&gt;)))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
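
&lt;p&gt;The long decimal tails in the spread values above are ordinary floating-point rounding. If exact decimal arithmetic matters for your use case, one option (not used in the original example) is to parse the price strings with Python's &lt;code&gt;decimal.Decimal&lt;/code&gt; instead of &lt;code&gt;float&lt;/code&gt;. A minimal sketch, using the first bid and ask from the output above:&lt;/p&gt;

```python
from decimal import Decimal

# Prices as they would arrive from the websocket: strings, not numbers.
bid_text = "38590.1"
ask_text = "38596.73"

# Float arithmetic can leave a small rounding tail in the result.
float_spread = float(ask_text) - float(bid_text)

# Decimal arithmetic on the original strings stays exact.
decimal_spread = Decimal(ask_text) - Decimal(bid_text)

print(float_spread)
print(decimal_spread)  # 6.63
```

&lt;p&gt;The trade-off is that &lt;code&gt;Decimal&lt;/code&gt; is slower than &lt;code&gt;float&lt;/code&gt;, so whether it is worth it depends on how the spread is used downstream.&lt;/p&gt;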



&lt;p&gt;That's it!&lt;/p&gt;

&lt;p&gt;We would love to see what you build on top of this example. Feel free to share it in our &lt;a href="https://join.slack.com/t/bytewaxcommunity/shared_invite/zt-vkos2f6r-_SeT9pF2~n9ArOaeI3ND2w"&gt;community Slack channel&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>python</category>
      <category>crypto</category>
      <category>datascience</category>
    </item>
  </channel>
</rss>
