<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: RisingWave Labs</title>
    <description>The latest articles on DEV Community by RisingWave Labs (@risingwavelabs).</description>
    <link>https://dev.to/risingwavelabs</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1118264%2F4a6edd31-76ad-4e56-9f8d-6e0a23d888e8.png</url>
      <title>DEV Community: RisingWave Labs</title>
      <link>https://dev.to/risingwavelabs</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/risingwavelabs"/>
    <language>en</language>
    <item>
      <title>RisingWave v2.8: Query Your Lakehouse, Backfill Faster, and Tune Jobs Individually</title>
      <dc:creator>RisingWave Labs</dc:creator>
      <pubDate>Mon, 20 Apr 2026 11:22:15 +0000</pubDate>
      <link>https://dev.to/risingwavelabs/risingwave-v28-query-your-lakehouse-backfill-faster-and-tune-jobs-individually-52f1</link>
      <guid>https://dev.to/risingwavelabs/risingwave-v28-query-your-lakehouse-backfill-faster-and-tune-jobs-individually-52f1</guid>
      <description>&lt;h1&gt;
  
  
  RisingWave v2.8: Query Your Lakehouse, Backfill Faster, and Tune Jobs Individually
&lt;/h1&gt;

&lt;p&gt;If you run streaming pipelines on RisingWave and batch queries on a separate engine, v2.8 changes the equation. This release adds a DataFusion-powered query engine that lets you run batch SQL directly on your data lake (Iceberg, Delta Lake, Hudi) without moving data.&lt;/p&gt;

&lt;p&gt;Beyond the lakehouse, we’ve overhauled backfilling to make it significantly faster and introduced per-job resource isolation, giving you granular control over how individual streaming jobs consume CPU and memory.&lt;/p&gt;

&lt;p&gt;Here are the highlights of RisingWave v2.8.&lt;/p&gt;




&lt;h2&gt;
  
  
  Query Your Lakehouse Directly (Powered by DataFusion)
&lt;/h2&gt;

&lt;p&gt;RisingWave has long supported sinking data to data lakes like Apache Iceberg, Delta Lake, and Apache Hudi. However, querying that data usually required an external engine like Trino, StarRocks, or Spark.&lt;/p&gt;

&lt;p&gt;In v2.8, we’ve integrated &lt;strong&gt;Apache DataFusion&lt;/strong&gt; as a native batch query engine. You can now use RisingWave to:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;  &lt;strong&gt;Query Sinks Directly:&lt;/strong&gt; Run SQL queries on the data you’ve already exported to your lakehouse.&lt;/li&gt;
&lt;li&gt;  &lt;strong&gt;Join Streams with Lake Data:&lt;/strong&gt; Perform complex joins between real-time streaming data and historical data stored in the lake.&lt;/li&gt;
&lt;li&gt;  &lt;strong&gt;Unified Experience:&lt;/strong&gt; Use the same SQL dialect and connection for both real-time and historical analysis.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This turns RisingWave into a unified engine for the entire data lifecycle—from ingestion and streaming to long-term storage and ad-hoc analysis.&lt;/p&gt;
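&lt;p&gt;As a rough sketch of what this looks like in practice (the connector options, bucket, and table names below are illustrative -- check the v2.8 docs for the exact DDL for your catalog), you might expose an Iceberg table and join it with a live view:&lt;/p&gt;

```sql
-- Hypothetical sketch: read an Iceberg table from the lakehouse,
-- then query it alongside live streaming state. All names and
-- connector options here are placeholders, not reference syntax.
CREATE SOURCE lake_orders
WITH (
    connector = 'iceberg',
    warehouse.path = 's3://my-bucket/warehouse',
    database.name = 'sales',
    table.name = 'orders'
);

-- Batch SQL over lake data, joined with a live materialized view.
SELECT o.order_id, o.amount, s.status
FROM lake_orders AS o
JOIN live_order_status AS s ON o.order_id = s.order_id;
```

&lt;p&gt;The point is that both sides of the join are served by the same engine and the same Postgres-compatible connection.&lt;/p&gt;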




&lt;h2&gt;
  
  
  Faster Backfilling with Parallelism
&lt;/h2&gt;

&lt;p&gt;Backfilling—the process of populating a new materialized view with historical data—is a critical but resource-intensive task. In previous versions, backfilling was often a bottleneck for large datasets.&lt;/p&gt;

&lt;p&gt;v2.8 introduces &lt;strong&gt;Parallel Backfilling&lt;/strong&gt;. By distributing the backfill task across multiple nodes and utilizing more CPU cores, RisingWave can now process historical data significantly faster. In our internal benchmarks, backfill times for large-scale tables dropped by up to 50%, so new materialized views come online much sooner.&lt;/p&gt;
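&lt;p&gt;In recent RisingWave versions the relevant knobs look roughly like the sketch below -- the &lt;code&gt;streaming_parallelism&lt;/code&gt; session variable and &lt;code&gt;ALTER ... SET PARALLELISM&lt;/code&gt; exist today, but verify the exact v2.8 behavior in the release notes before relying on it:&lt;/p&gt;

```sql
-- Raise the parallelism for streaming jobs created in this session,
-- so the backfill of a new materialized view fans out across more cores.
SET streaming_parallelism = 8;

CREATE MATERIALIZED VIEW product_revenue AS
SELECT product_id, SUM(amount) AS revenue
FROM orders
GROUP BY product_id;

-- Adjust an existing job after creation (names are illustrative).
ALTER MATERIALIZED VIEW product_revenue SET PARALLELISM = 8;
```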




&lt;h2&gt;
  
  
  Individual Job Tuning &amp;amp; Resource Isolation
&lt;/h2&gt;

&lt;p&gt;In a multi-tenant or complex streaming environment, one "heavy" job can sometimes impact the performance of others. v2.8 addresses this with &lt;strong&gt;Per-Job Resource Isolation&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;You can now:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;  &lt;strong&gt;Set Resource Limits:&lt;/strong&gt; Define specific CPU and memory limits for individual streaming jobs.&lt;/li&gt;
&lt;li&gt;  &lt;strong&gt;Prioritize Critical Jobs:&lt;/strong&gt; Ensure that your most important pipelines always have the resources they need, regardless of other activity on the cluster.&lt;/li&gt;
&lt;li&gt;  &lt;strong&gt;Granular Monitoring:&lt;/strong&gt; Track resource consumption at the job level to identify and optimize inefficient queries.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This level of control makes RisingWave more robust for production environments where predictable performance is non-negotiable.&lt;/p&gt;
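&lt;p&gt;To make the idea concrete, here is one hypothetical shape such a limit could take -- an illustration of the feature, not v2.8 reference syntax:&lt;/p&gt;

```sql
-- Hypothetical illustration of per-job resource isolation: pin a
-- critical pipeline to its own resource group so a noisy neighbor
-- cannot starve it. The WITH clause shown here is assumed syntax.
CREATE MATERIALIZED VIEW fraud_alerts
WITH (resource_group = 'critical') AS
SELECT user_id, COUNT(*) AS suspicious_events
FROM events
WHERE kind = 'suspicious'
GROUP BY user_id;
```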




&lt;h2&gt;
  
  
  Other Notable Improvements
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;  &lt;strong&gt;Enhanced Connector Support:&lt;/strong&gt; Improved stability and performance for Kafka, Pulsar, and Kinesis connectors.&lt;/li&gt;
&lt;li&gt;  &lt;strong&gt;SQL Enhancements:&lt;/strong&gt; Support for more window functions and improved query optimization for complex joins.&lt;/li&gt;
&lt;li&gt;  &lt;strong&gt;Dashboard Updates:&lt;/strong&gt; The RisingWave dashboard now provides more detailed insights into cluster health and job performance.&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  Get Started with v2.8
&lt;/h2&gt;

&lt;p&gt;RisingWave v2.8 is available now. You can try it out via:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;  &lt;strong&gt;RisingWave Cloud:&lt;/strong&gt; The easiest way to run RisingWave. &lt;a href="https://cloud.risingwave.com/" rel="noopener noreferrer"&gt;Sign up for a free account&lt;/a&gt;.&lt;/li&gt;
&lt;li&gt;  &lt;strong&gt;Docker:&lt;/strong&gt; &lt;code&gt;docker pull risingwavelabs/risingwave:v2.8.0&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;  &lt;strong&gt;Binary:&lt;/strong&gt; Download from our &lt;a href="https://github.com/risingwavelabs/risingwave/releases" rel="noopener noreferrer"&gt;GitHub releases page&lt;/a&gt;.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;For a full list of changes, check out the &lt;a href="https://github.com/risingwavelabs/risingwave/releases/tag/v2.8.0" rel="noopener noreferrer"&gt;v2.8 Release Notes&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>database</category>
      <category>dataengineering</category>
      <category>opensource</category>
      <category>sql</category>
    </item>
    <item>
      <title>RisingWave AI Developer Tools: CLI, Agent Skills, and MCP</title>
      <dc:creator>RisingWave Labs</dc:creator>
      <pubDate>Mon, 20 Apr 2026 11:00:32 +0000</pubDate>
      <link>https://dev.to/risingwavelabs/risingwave-ai-developer-tools-cli-agent-skills-and-mcp-4c4p</link>
      <guid>https://dev.to/risingwavelabs/risingwave-ai-developer-tools-cli-agent-skills-and-mcp-4c4p</guid>
      <description>&lt;p&gt;We are entering a world where most database code will be written by AI agents, not humans. This is already happening. Developers ask Claude, Copilot, or Cursor to write their SQL, and the agent produces something that looks right. For traditional databases, it usually is right.&lt;/p&gt;

&lt;p&gt;For streaming databases, it is not.&lt;/p&gt;

&lt;p&gt;AI agents have seen billions of lines of batch SQL in their training data. They know PostgreSQL, MySQL, and SQLite inside out. But streaming SQL is different. The semantics are different. The architecture is different. And the mistakes are subtle -- queries that parse and execute correctly but build pipelines that silently duplicate data, miss events, or never produce results.&lt;/p&gt;

&lt;p&gt;We decided to fix this at the source. Instead of writing better documentation and hoping agents would find it, we built three tools that meet AI agents where they already work.&lt;/p&gt;

&lt;h2&gt;
  
  
  The Cloud CLI That Distributes Knowledge
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://risingwave.com/blog/introducing-risingwave-cloud-cli" rel="noopener noreferrer"&gt;RisingWave Cloud CLI&lt;/a&gt; (&lt;code&gt;rwc&lt;/code&gt;) started as a straightforward infrastructure tool -- manage clusters, configure auth, take snapshots. But we realized the CLI is the one tool that every RisingWave Cloud user already installs. So we turned it into a distribution channel.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;rwc skill &lt;span class="nb"&gt;install&lt;/span&gt; &lt;span class="nt"&gt;--target&lt;/span&gt; claude-code
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;One command. Your AI coding assistant now understands RisingWave. No repo to clone, no config file to edit, no documentation to bookmark. The knowledge travels with the tool.&lt;/p&gt;

&lt;p&gt;This matters because the hardest problem in developer tooling is not building the tool. It is getting it into the developer's workflow. The CLI was already there.&lt;/p&gt;

&lt;h2&gt;
  
  
  Teaching Agents What LLMs Cannot Learn From Training Data
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://github.com/risingwavelabs/agent-skills" rel="noopener noreferrer"&gt;RisingWave Agent Skills&lt;/a&gt; is the knowledge itself -- an open-source pack of structured reference documents that AI agents load into their context window.&lt;/p&gt;

&lt;p&gt;Why does this need to exist? Because streaming SQL is a tiny fraction of what LLMs have been trained on. When an agent encounters RisingWave, it pattern-matches against what it knows: batch SQL. And the results are predictable.&lt;/p&gt;

&lt;p&gt;It writes &lt;code&gt;CREATE TABLE&lt;/code&gt; when it should write &lt;code&gt;CREATE SOURCE&lt;/code&gt;. It uses &lt;code&gt;date_trunc&lt;/code&gt; when &lt;code&gt;TUMBLE&lt;/code&gt; is the right choice. It sets up CDC by creating one table per source, paying the ingestion cost multiple times instead of sharing a single connection. It forgets to set watermarks, and then wonders why time windows never close.&lt;/p&gt;

&lt;p&gt;These are not bugs the agent can debug. They are architectural mistakes baked into the first line of code.&lt;/p&gt;
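&lt;p&gt;For example, the streaming-native version of "hourly counts" pairs a watermark with a &lt;code&gt;TUMBLE&lt;/code&gt; window so that windows can actually close (topic and field names here are illustrative):&lt;/p&gt;

```sql
-- Define the source with a watermark so event-time windows can close.
CREATE SOURCE events (
    event_id BIGINT,
    ts TIMESTAMPTZ,
    WATERMARK FOR ts AS ts - INTERVAL '5 seconds'
) WITH (
    connector = 'kafka',
    topic = 'events',
    properties.bootstrap.server = 'localhost:9092'
) FORMAT PLAIN ENCODE JSON;

-- Aggregate with TUMBLE instead of date_trunc: each one-hour window
-- is finalized once the watermark passes its end.
CREATE MATERIALIZED VIEW hourly_counts AS
SELECT window_start, COUNT(*) AS cnt
FROM TUMBLE(events, ts, INTERVAL '1 hour')
GROUP BY window_start;
```

&lt;p&gt;A batch-trained agent tends to get every line of this wrong -- table instead of source, no watermark, calendar functions instead of windows -- which is exactly the gap the skill pack closes.&lt;/p&gt;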

&lt;p&gt;The skill pack encodes 14 rules across schema design, materialized views, streaming SQL patterns, sink configuration, and performance optimization. Each rule exists because we have seen the same mistake made by different agents, in different codebases, repeatedly. They follow the open &lt;a href="https://agentskills.io/" rel="noopener noreferrer"&gt;Agent Skills specification&lt;/a&gt; and work with Claude Code, Cursor, GitHub Copilot, Windsurf, Gemini CLI, and 18+ other tools.&lt;/p&gt;

&lt;h2&gt;
  
  
  Giving Agents Eyes Into Your Database
&lt;/h2&gt;

&lt;p&gt;Static knowledge is necessary but not sufficient. An agent that knows the right patterns still cannot see your actual schema, your existing materialized views, or your cluster state.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://github.com/risingwavelabs/risingwave-mcp" rel="noopener noreferrer"&gt;RisingWave MCP Server&lt;/a&gt; bridges this gap. It connects AI agents directly to a running RisingWave instance through the Model Context Protocol, exposing 100+ tools for querying, schema inspection, DDL operations, streaming job monitoring, and cluster management.&lt;/p&gt;

&lt;p&gt;The difference is concrete. Without the MCP server, an agent might suggest creating a materialized view that duplicates one you already have. With it, the agent can inspect your existing views, check your source schemas, and write SQL that fits your actual system -- not a hypothetical one.&lt;/p&gt;

&lt;p&gt;It integrates with VS Code Copilot and Claude Desktop, and supports both STDIO and HTTP transports.&lt;/p&gt;

&lt;h2&gt;
  
  
  Three Layers, One Workflow
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fksnlm9lfu2pw8jo7nrlo.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fksnlm9lfu2pw8jo7nrlo.png" alt=" " width="800" height="211"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;These tools are independent -- you can use any one without the others. But they are designed to work together. The CLI installs the skills. The skills tell the agent how to think. The MCP server shows the agent what is actually there.&lt;/p&gt;

&lt;h2&gt;
  
  
  Why This Matters Now
&lt;/h2&gt;

&lt;p&gt;The window for shaping how AI agents write streaming SQL is right now. Patterns established today will be reinforced in every future codebase these agents touch. If agents learn the wrong patterns, those patterns will propagate. If they learn the right ones, every developer who uses an AI assistant gets the benefit.&lt;/p&gt;

&lt;p&gt;We are building these tools because we believe the quality of AI-generated streaming code should not depend on the developer's expertise in streaming systems. The agent should know what to do. That is the standard we are working toward.&lt;/p&gt;

&lt;p&gt;All three tools are open source and free to use:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;  &lt;a href="https://docs.risingwave.com/cloud/install-cli" rel="noopener noreferrer"&gt;Cloud CLI&lt;/a&gt; -- install &lt;code&gt;rwc&lt;/code&gt; and get started&lt;/li&gt;
&lt;li&gt;  &lt;a href="https://github.com/risingwavelabs/agent-skills" rel="noopener noreferrer"&gt;Agent Skills&lt;/a&gt; -- teach your AI assistant streaming SQL&lt;/li&gt;
&lt;li&gt;  &lt;a href="https://github.com/risingwavelabs/risingwave-mcp" rel="noopener noreferrer"&gt;MCP Server&lt;/a&gt; -- connect your agent to a live instance&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  FAQ
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Do I need RisingWave Cloud to use these tools?&lt;/strong&gt; Only the &lt;code&gt;rwc&lt;/code&gt; CLI requires a Cloud account. Agent Skills and the MCP Server work with any RisingWave deployment, including open-source self-hosted.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Which AI coding assistants are supported?&lt;/strong&gt; Agent Skills work with Claude Code, Cursor, GitHub Copilot, Windsurf, Gemini CLI, and 18+ other tools. The MCP Server works with any MCP-compatible client.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Can I contribute to the agent skills?&lt;/strong&gt; Yes. Both the &lt;a href="https://github.com/risingwavelabs/agent-skills" rel="noopener noreferrer"&gt;Agent Skills&lt;/a&gt; and &lt;a href="https://github.com/risingwavelabs/risingwave-mcp" rel="noopener noreferrer"&gt;MCP Server&lt;/a&gt; repos are open source. We welcome contributions from the community.&lt;/p&gt;

</description>
      <category>ai</category>
      <category>database</category>
      <category>sql</category>
      <category>opensource</category>
    </item>
    <item>
      <title>Building a Polymarket-Style Prediction Engine with RisingWave</title>
      <dc:creator>RisingWave Labs</dc:creator>
      <pubDate>Wed, 17 Dec 2025 17:49:44 +0000</pubDate>
      <link>https://dev.to/risingwavelabs/building-a-polymarket-style-prediction-engine-with-risingwave-3672</link>
      <guid>https://dev.to/risingwavelabs/building-a-polymarket-style-prediction-engine-with-risingwave-3672</guid>
      <description>&lt;p&gt;&lt;a href="https://polymarket.com/" rel="noopener noreferrer"&gt;&lt;strong&gt;Polymarket&lt;/strong&gt;&lt;/a&gt; and similar platforms have proved something simple and powerful: &lt;strong&gt;price behaves like probability&lt;/strong&gt;. Traders want to bet on elections, sports, and crypto outcomes with the speed and responsiveness of a modern exchange.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fr8xp8dui6x1lzucmdumh.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fr8xp8dui6x1lzucmdumh.png" alt="Building a Polymarket-Style Prediction Engine with RisingWave" width="800" height="461"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;While the user experience feels simple, a prediction market is a distributed system balancing two conflicting engineering constraints. First, real-time price discovery: you must continuously update odds, order books, and liquidity as trades stream in at high volume. Second, the &lt;strong&gt;"fan-out" settlement problem&lt;/strong&gt;: when a real-world event resolves (e.g., "Did Candidate X win?"), a single message from an oracle must instantly settle hundreds of thousands of open positions and update user balances without race conditions.&lt;/p&gt;

&lt;p&gt;Most teams attack this with a fragmented stack: Kafka for ingestion, Flink for windowing, Redis for hot state, and Postgres for the ledger. This requires extensive glue code to orchestrate, and often forces the application layer to handle the complex logic of settlement fan-out.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;RisingWave offers a different path.&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;It is a Postgres-compatible streaming database. Instead of writing glue code, you describe your entire backend logic (from live odds to settlement and balances) as &lt;strong&gt;Materialized Views&lt;/strong&gt; in SQL. RisingWave maintains these views incrementally, effectively turning your database into a reactive engine that solves the "fan-out" problem automatically.&lt;/p&gt;

&lt;p&gt;In this post, we will walk through how to build a production-grade prediction market architecture, moving from raw streams to instant settlement.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;1. The Anatomy of a Prediction Market&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;To understand the architecture, it helps to first understand the &lt;strong&gt;two data streams&lt;/strong&gt; that drive a prediction market.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;strong&gt;Trading Stream: All the Bets&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;The first is the &lt;strong&gt;Trading Stream&lt;/strong&gt;. Every time a user places a bet, whether matched via an AMM or a CLOB (Central Limit Order Book), the system appends a record like:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="nv"&gt;"user_id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;12345&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nv"&gt;"market_id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;42&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nv"&gt;"side"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nv"&gt;"YES"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nv"&gt;"size"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nv"&gt;"price"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;57&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nv"&gt;"timestamp"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nv"&gt;"2025-06-01T12:34:56Z"&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This stream is high volume and continuous. It is what drives the live price you see on the frontend.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;strong&gt;Oracle Stream: The Final Answer&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;The second is the &lt;strong&gt;Oracle Stream&lt;/strong&gt;. It is low volume but high impact. At some point, each market receives a resolution event, for example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="nv"&gt;"market_id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;42&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nv"&gt;"outcome"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nv"&gt;"YES"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nv"&gt;"resolved_at"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nv"&gt;"2025-06-02T03:00:00Z"&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This single message decides who won and who lost in that market.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;strong&gt;Where the Hard Part Lives: When Streams Meet&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;The core engineering challenge is what happens when these two streams meet.&lt;/p&gt;

&lt;p&gt;An oracle event arrives for &lt;code&gt;market_id = 42&lt;/code&gt;. The system must:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Find every trade for that market&lt;/li&gt;
&lt;li&gt;Compute Profit and Loss (PnL) for each user based on their entry price&lt;/li&gt;
&lt;li&gt;Update user balances safely and atomically&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;In a traditional architecture, this often triggers a large batch job or a heavy, locking SQL update once the market resolves.&lt;/p&gt;

&lt;p&gt;With a streaming database, you can instead treat this as a continuous join between the Trading Stream and the Oracle Stream. When the oracle event appears, the fan-out settlement is performed automatically as part of the normal stream processing.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;2. RisingWave: Streaming SQL with Tiered Storage&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;RisingWave looks and feels like Postgres, but it processes data as it arrives. You connect it to sources like Kafka or database CDC, define Materialized Views (MVs) to express your logic, and then query those views directly.&lt;/p&gt;

&lt;p&gt;Crucially for this use case, RisingWave offloads its internal state to object storage (S3). This "Tiered Storage" architecture means you can maintain the state of millions of open positions or historical trades without being limited by RAM or managing complex RocksDB instances.&lt;/p&gt;

&lt;p&gt;For our market, we will define four core views to handle prices, settlement, risk, and the ledger.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;3. Real-Time Market State&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;The most immediate requirement is that when a user buys "YES", the price on the dashboard should tick from 0.50 to 0.51 instantly.&lt;/p&gt;

&lt;p&gt;We start by ingesting the raw trades from Kafka.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;SOURCE&lt;/span&gt; &lt;span class="n"&gt;trades&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="n"&gt;trade_id&lt;/span&gt;   &lt;span class="nb"&gt;BIGINT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;user_id&lt;/span&gt;    &lt;span class="nb"&gt;BIGINT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;market_id&lt;/span&gt;  &lt;span class="nb"&gt;BIGINT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;side&lt;/span&gt;       &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="c1"&gt;-- 'YES' or 'NO'&lt;/span&gt;
  &lt;span class="k"&gt;size&lt;/span&gt;       &lt;span class="nb"&gt;NUMERIC&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;price&lt;/span&gt;      &lt;span class="nb"&gt;NUMERIC&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;ts&lt;/span&gt;         &lt;span class="n"&gt;TIMESTAMPTZ&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="n"&gt;connector&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'kafka'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;topic&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'trades'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bootstrap&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;server&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'...'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;format&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'json'&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To drive the UI, we create a materialized view that calculates the latest price and 24-hour volume. We use &lt;code&gt;last_value&lt;/code&gt; ordered by the timestamp to ensure the price is deterministic.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="n"&gt;MATERIALIZED&lt;/span&gt; &lt;span class="k"&gt;VIEW&lt;/span&gt; &lt;span class="n"&gt;market_prices&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt;
  &lt;span class="n"&gt;market_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;last_value&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt; &lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;last_price&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="k"&gt;AVG&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;                    &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;avg_price&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="k"&gt;SUM&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;size&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;                     &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;total_volume_24h&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;trades&lt;/span&gt;
&lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;ts&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="n"&gt;interval&lt;/span&gt; &lt;span class="s1"&gt;'24 hours'&lt;/span&gt;
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;market_id&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Because this view is materialized, your API can serve the frontend by simply querying &lt;code&gt;SELECT * FROM market_prices WHERE market_id = 42&lt;/code&gt;. The result is always fresh, eliminating the need to sync data to a separate Redis cache.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;4. Instant Settlement: The Streaming Join&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;This is where the architecture diverges from the traditional approach. Instead of writing a batch script that runs "after the game" to calculate winners, we define settlement as a continuous join between the &lt;code&gt;trades&lt;/code&gt; stream and the &lt;code&gt;oracle_feed&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;First, we define the oracle source:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;SOURCE&lt;/span&gt; &lt;span class="n"&gt;oracle_feed&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="n"&gt;market_id&lt;/span&gt;   &lt;span class="nb"&gt;BIGINT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;outcome&lt;/span&gt;     &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;resolved_at&lt;/span&gt; &lt;span class="n"&gt;TIMESTAMPTZ&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="n"&gt;connector&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'kafka'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;topic&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'oracle_feed'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;format&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'json'&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Next, we create the settlement engine. RisingWave maintains the state of all open trades. When an outcome message arrives, it immediately matches against that index and emits the calculated PnL for every affected trade.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="n"&gt;MATERIALIZED&lt;/span&gt; &lt;span class="k"&gt;VIEW&lt;/span&gt; &lt;span class="n"&gt;settled_trades&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt;
  &lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;trade_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;market_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="c1"&gt;-- If you bought YES and outcome is YES, you get $1.00 per share.&lt;/span&gt;
  &lt;span class="c1"&gt;-- Otherwise, you lose your principal (price * size).&lt;/span&gt;
  &lt;span class="k"&gt;CASE&lt;/span&gt;
    &lt;span class="k"&gt;WHEN&lt;/span&gt; &lt;span class="n"&gt;o&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;outcome&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'YES'&lt;/span&gt; &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;side&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'YES'&lt;/span&gt; &lt;span class="k"&gt;THEN&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="k"&gt;size&lt;/span&gt;
    &lt;span class="k"&gt;WHEN&lt;/span&gt; &lt;span class="n"&gt;o&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;outcome&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'NO'&lt;/span&gt;  &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;side&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'NO'&lt;/span&gt;  &lt;span class="k"&gt;THEN&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="k"&gt;size&lt;/span&gt;
    &lt;span class="k"&gt;ELSE&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="k"&gt;size&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="k"&gt;END&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;pnl&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;o&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;resolved_at&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;trades&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt;
&lt;span class="k"&gt;JOIN&lt;/span&gt; &lt;span class="n"&gt;oracle_feed&lt;/span&gt; &lt;span class="n"&gt;o&lt;/span&gt;
  &lt;span class="k"&gt;ON&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;market_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;o&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;market_id&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This view is the engine of the platform. It handles the 1-to-N fan-out automatically. There is no loop logic to write and no state consistency to manage manually. As soon as the oracle message hits the Kafka topic, the PnL is calculated and ready for the ledger.&lt;/p&gt;
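&lt;p&gt;Serving from this view is then a plain query. A minimal sketch, assuming the columns defined above (&lt;code&gt;u_123&lt;/code&gt; is a placeholder user ID, not part of the original schema):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;-- Fetch a user's realized results for the ledger or the UI
SELECT market_id, side, pnl, resolved_at
FROM settled_trades
WHERE user_id = 'u_123'
ORDER BY resolved_at DESC;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;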

&lt;h2&gt;
  
  
  &lt;strong&gt;5. User Balances: The Ledger Pattern&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;A common anti-pattern in SQL is to attempt joining trades, transfers, and settlements all at once to calculate a user's balance. In a streaming context, this is inefficient: every additional join adds internal state that the engine must keep consistent on each incoming event.&lt;/p&gt;

&lt;p&gt;A more robust approach is the &lt;strong&gt;Ledger Pattern&lt;/strong&gt;. We treat every financial event—whether it's a deposit, a withdrawal, or a trading profit—as a line item in a unified ledger. We can then use a &lt;code&gt;UNION ALL&lt;/code&gt; to combine these streams into a single flow of balance changes.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="n"&gt;MATERIALIZED&lt;/span&gt; &lt;span class="k"&gt;VIEW&lt;/span&gt; &lt;span class="n"&gt;balance_ledger&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt;
&lt;span class="c1"&gt;-- 1. Cash movements from the transfers stream&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;amount&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;change&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ts&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;transfers&lt;/span&gt;
&lt;span class="k"&gt;UNION&lt;/span&gt; &lt;span class="k"&gt;ALL&lt;/span&gt;
&lt;span class="c1"&gt;-- 2. Realized PnL from the settled trades view we created above&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;pnl&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;change&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;resolved_at&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;ts&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;settled_trades&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;With the streams unified, calculating the live balance becomes a simple aggregation:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="n"&gt;MATERIALIZED&lt;/span&gt; &lt;span class="k"&gt;VIEW&lt;/span&gt; &lt;span class="n"&gt;user_balances&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt;
  &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="k"&gt;SUM&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;change&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;current_balance&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;balance_ledger&lt;/span&gt;
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Finally, to show users their active risk, we can aggregate the trades that have &lt;em&gt;not&lt;/em&gt; yet settled into their current exposure:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="n"&gt;MATERIALIZED&lt;/span&gt; &lt;span class="k"&gt;VIEW&lt;/span&gt; &lt;span class="n"&gt;user_positions&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt;
  &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;market_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="k"&gt;SUM&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;CASE&lt;/span&gt; &lt;span class="k"&gt;WHEN&lt;/span&gt; &lt;span class="n"&gt;side&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'YES'&lt;/span&gt; &lt;span class="k"&gt;THEN&lt;/span&gt; &lt;span class="k"&gt;size&lt;/span&gt; &lt;span class="k"&gt;ELSE&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="k"&gt;size&lt;/span&gt; &lt;span class="k"&gt;END&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;net_exposure&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;trades&lt;/span&gt;
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;market_id&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;
  
  
  &lt;strong&gt;6. End-to-End Architecture&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;By pushing the complexity down into the database, the resulting architecture is remarkably clean.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fharnabftd823i3d6qxzc.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fharnabftd823i3d6qxzc.png" width="800" height="297"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In a typical stack, the box labeled &lt;code&gt;MV: settled_trades&lt;/code&gt; would be a complex microservice or a heavy Spark/Flink job responsible for the fan‑out settlement logic. Here, that complexity is absorbed by the database’s join operator.&lt;/p&gt;

&lt;p&gt;Similarly, the box labeled &lt;code&gt;MV: user_balances&lt;/code&gt; replaces what is usually a fragile orchestration of updates between Redis and a permanent data store.&lt;/p&gt;
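&lt;p&gt;The serving path collapses to a single point lookup. A sketch against the &lt;code&gt;user_balances&lt;/code&gt; view defined earlier (&lt;code&gt;u_123&lt;/code&gt; is a placeholder user ID):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;-- One indexed read on the materialized view replaces the
-- Redis-plus-database synchronization dance
SELECT current_balance
FROM user_balances
WHERE user_id = 'u_123';
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;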

&lt;p&gt;Because RisingWave offloads the internal state of these joins to S3, you can scale to millions of open positions without managing expensive infrastructure. The result is a system where correctness is derived from the stream itself, providing built‑in auditability through the immutable &lt;code&gt;MV: balance_ledger&lt;/code&gt; pattern.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;7. Conclusion&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;To build a prediction market like Polymarket, you need streaming performance for the UI and relational correctness for the money. RisingWave bridges this gap by allowing you to ingest raw streams, define complex business logic using standard SQL, and serve the results directly to your application.&lt;/p&gt;

&lt;p&gt;If you are designing a high-frequency trading application or a prediction market, consider pushing the "hard parts" (state management and stream joining) down into the database. It allows you to focus on the market mechanics rather than the plumbing.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;Get Started with RisingWave&lt;/strong&gt;
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Try RisingWave Today:&lt;/strong&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;a href="https://github.com/risingwavelabs/risingwave" rel="noopener noreferrer"&gt;&lt;strong&gt;Download the open-sourced version of RisingWave&lt;/strong&gt;&lt;/a&gt; to deploy on your own infrastructure.&lt;/li&gt;
&lt;li&gt;Get started quickly with &lt;a href="https://cloud.risingwave.com/auth/signin/" rel="noopener noreferrer"&gt;&lt;strong&gt;RisingWave Cloud&lt;/strong&gt;&lt;/a&gt; for a fully managed experience.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;/ul&gt;

</description>
      <category>tutorial</category>
      <category>database</category>
      <category>datascience</category>
      <category>coding</category>
    </item>
    <item>
      <title>EXPLAIN ANALYZE in RisingWave: Profiling Real‑Time Query Performance</title>
      <dc:creator>RisingWave Labs</dc:creator>
      <pubDate>Tue, 02 Dec 2025 12:58:00 +0000</pubDate>
      <link>https://dev.to/risingwavelabs/explain-analyze-in-risingwave-profiling-real-time-query-performance-5dk3</link>
      <guid>https://dev.to/risingwavelabs/explain-analyze-in-risingwave-profiling-real-time-query-performance-5dk3</guid>
      <description>&lt;p&gt;You’ve meticulously crafted a real-time streaming pipeline in RisingWave. You’ve created a materialized view to power a critical dashboard, but something is wrong. The data on the dashboard is lagging, alerts are delayed, and you suspect a performance bottleneck somewhere in your query.&lt;/p&gt;

&lt;p&gt;You reach for a familiar tool: &lt;code&gt;EXPLAIN&lt;/code&gt;. You check the query plan, and it looks perfectly optimal. The join order is correct, filters are being pushed down, and the distribution strategy makes sense. Yet, the problem persists.&lt;/p&gt;

&lt;p&gt;This is a common frustration in stream processing. A static execution plan only tells you &lt;em&gt;what&lt;/em&gt; the system intends to do, not &lt;em&gt;how well&lt;/em&gt; it's actually doing it under a live workload. Is one of your joins struggling to keep up? Is a specific operator getting overwhelmed and buffering too much data? &lt;code&gt;EXPLAIN&lt;/code&gt; can't answer these questions.&lt;/p&gt;

&lt;p&gt;To bridge this gap between plan and reality, we introduced &lt;code&gt;EXPLAIN ANALYZE&lt;/code&gt;, a powerful diagnostic tool that enriches the execution plan with live, real-time performance statistics.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;What Is &lt;code&gt;EXPLAIN ANALYZE&lt;/code&gt;?&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;While &lt;code&gt;EXPLAIN&lt;/code&gt; shows the logical, physical, and distributed plan for a query, &lt;code&gt;EXPLAIN ANALYZE&lt;/code&gt; goes a step further. It executes the query (or, for a materialized view, attaches to the already-running job) and collects live metrics from each operator in the dataflow graph. With this information, you can stop guessing and start diagnosing.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;Metrics Collection and Reporting&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;When you run:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="k"&gt;EXPLAIN&lt;/span&gt; &lt;span class="k"&gt;ANALYZE&lt;/span&gt; &lt;span class="n"&gt;MATERIALIZED&lt;/span&gt; &lt;span class="k"&gt;VIEW&lt;/span&gt; &lt;span class="n"&gt;my_view&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;&lt;span class="o"&gt;**&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;RisingWave performs the following steps:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;The &lt;strong&gt;serving node&lt;/strong&gt; sends an RPC to the &lt;strong&gt;meta service&lt;/strong&gt; to fetch fragment assignments and streaming worker locations.&lt;/li&gt;
&lt;li&gt;It sends &lt;strong&gt;two RPC calls&lt;/strong&gt; to each streaming node:

&lt;ul&gt;
&lt;li&gt;The first collects a baseline metric snapshot.&lt;/li&gt;
&lt;li&gt;The second (after a brief delay) captures a follow-up snapshot.&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;The frontend computes &lt;strong&gt;deltas&lt;/strong&gt; (e.g., records/sec) from these two samples.&lt;/li&gt;
&lt;li&gt;It maps the collected metrics to a reconstructed &lt;strong&gt;streaming execution graph&lt;/strong&gt;, using fragment metadata.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The result is a per-operator breakdown of live performance, including:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Records per second (RPS)&lt;/strong&gt;: Throughput per operator.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Average output pending ratio&lt;/strong&gt;: How full the output buffer is; indicates potential backpressure or downstream delays.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Actor IDs&lt;/strong&gt;: Actors contributing to each operator.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="o"&gt;***&lt;/span&gt;&lt;span class="c1"&gt;-- Create a table and a materialized view*CREATE TABLE t(v1 int);&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="n"&gt;MATERIALIZED&lt;/span&gt; &lt;span class="k"&gt;VIEW&lt;/span&gt; &lt;span class="n"&gt;m1&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt; &lt;span class="k"&gt;JOIN&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt; &lt;span class="n"&gt;y&lt;/span&gt; &lt;span class="k"&gt;USING&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;v1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;JOIN&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt; &lt;span class="n"&gt;z&lt;/span&gt; &lt;span class="k"&gt;USING&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;v1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;JOIN&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt; &lt;span class="k"&gt;c&lt;/span&gt; &lt;span class="k"&gt;USING&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;v1&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="c1"&gt;-- Analyze the materialized view*EXPLAIN ANALYZE MATERIALIZED VIEW m1;**&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Example output:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="k"&gt;identity&lt;/span&gt;                                        &lt;span class="o"&gt;|&lt;/span&gt;  &lt;span class="n"&gt;actor_ids&lt;/span&gt;  &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="n"&gt;output_rps&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="n"&gt;avg_output_pending_ratio&lt;/span&gt;
&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="c1"&gt;------------------------------------------------+-------------+------------+--------------------------*&lt;/span&gt;
 &lt;span class="n"&gt;StreamMaterialize&lt;/span&gt;                              &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;12&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;11&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;9&lt;/span&gt;  &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;41589&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;    &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;
 &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;MergeExecutor&lt;/span&gt;                               &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;12&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;11&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;9&lt;/span&gt;  &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;41589&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;    &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;81&lt;/span&gt;
    &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;Dispatcher&lt;/span&gt;                               &lt;span class="o"&gt;|&lt;/span&gt;             &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="n"&gt;NaN&lt;/span&gt;
       &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;StreamHashJoin&lt;/span&gt;                        &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;16&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;15&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;14&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;13&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;41560&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;    &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;85&lt;/span&gt;
          &lt;span class="err"&gt;├─&lt;/span&gt; &lt;span class="n"&gt;StreamHashJoin&lt;/span&gt;                     &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;16&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;15&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;14&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;13&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;152&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;6&lt;/span&gt;      &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;93&lt;/span&gt;
          &lt;span class="err"&gt;│&lt;/span&gt;  &lt;span class="err"&gt;├─&lt;/span&gt; &lt;span class="n"&gt;MergeExecutor&lt;/span&gt;                   &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;16&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;15&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;14&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;13&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;25&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;6&lt;/span&gt;       &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;63&lt;/span&gt;
          &lt;span class="err"&gt;│&lt;/span&gt;  &lt;span class="err"&gt;│&lt;/span&gt;  &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;Dispatcher&lt;/span&gt;                   &lt;span class="o"&gt;|&lt;/span&gt;             &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="n"&gt;NaN&lt;/span&gt;
          &lt;span class="err"&gt;│&lt;/span&gt;  &lt;span class="err"&gt;│&lt;/span&gt;     &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;StreamFilter&lt;/span&gt;              &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;19&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;18&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;17&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
          &lt;span class="err"&gt;│&lt;/span&gt;  &lt;span class="err"&gt;│&lt;/span&gt;        &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;StreamTableScan&lt;/span&gt;        &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;19&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;18&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;17&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
          &lt;span class="err"&gt;│&lt;/span&gt;  &lt;span class="err"&gt;│&lt;/span&gt;           &lt;span class="err"&gt;├─&lt;/span&gt; &lt;span class="n"&gt;BatchPlanNode&lt;/span&gt;       &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;19&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;18&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;17&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
          &lt;span class="err"&gt;│&lt;/span&gt;  &lt;span class="err"&gt;│&lt;/span&gt;           &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;Upstream&lt;/span&gt;            &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;19&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;18&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;17&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
          &lt;span class="err"&gt;│&lt;/span&gt;  &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;MergeExecutor&lt;/span&gt;                   &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;16&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;15&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;14&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;13&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
          &lt;span class="err"&gt;│&lt;/span&gt;     &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;Dispatcher&lt;/span&gt;                   &lt;span class="o"&gt;|&lt;/span&gt;             &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="n"&gt;NaN&lt;/span&gt;
          &lt;span class="err"&gt;│&lt;/span&gt;        &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;StreamFilter&lt;/span&gt;              &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;28&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;27&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;26&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;25&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
          &lt;span class="err"&gt;│&lt;/span&gt;           &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;StreamTableScan&lt;/span&gt;        &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;28&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;27&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;26&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;25&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
          &lt;span class="err"&gt;│&lt;/span&gt;              &lt;span class="err"&gt;├─&lt;/span&gt; &lt;span class="n"&gt;BatchPlanNode&lt;/span&gt;       &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;28&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;27&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;26&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;25&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
          &lt;span class="err"&gt;│&lt;/span&gt;              &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;Upstream&lt;/span&gt;            &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;28&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;27&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;26&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;25&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
          &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;MergeExecutor&lt;/span&gt;                      &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;16&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;15&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;14&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;13&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;51&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;       &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;71&lt;/span&gt;
             &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;Dispatcher&lt;/span&gt;                      &lt;span class="o"&gt;|&lt;/span&gt;             &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="n"&gt;NaN&lt;/span&gt;
                &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;StreamHashJoin&lt;/span&gt;               &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;24&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;23&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;22&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;21&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;51&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;       &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;71&lt;/span&gt;
                   &lt;span class="err"&gt;├─&lt;/span&gt; &lt;span class="n"&gt;MergeExecutor&lt;/span&gt;             &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;24&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;23&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;22&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;21&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
                   &lt;span class="err"&gt;│&lt;/span&gt;  &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;Dispatcher&lt;/span&gt;             &lt;span class="o"&gt;|&lt;/span&gt;             &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="n"&gt;NaN&lt;/span&gt;
                   &lt;span class="err"&gt;│&lt;/span&gt;     &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;StreamFilter&lt;/span&gt;        &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;36&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;35&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;34&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;33&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
                   &lt;span class="err"&gt;│&lt;/span&gt;        &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;StreamTableScan&lt;/span&gt;  &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;36&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;35&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;34&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;33&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
                   &lt;span class="err"&gt;│&lt;/span&gt;           &lt;span class="err"&gt;├─&lt;/span&gt; &lt;span class="n"&gt;BatchPlanNode&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;36&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;35&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;34&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;33&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
                   &lt;span class="err"&gt;│&lt;/span&gt;           &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;Upstream&lt;/span&gt;      &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;36&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;35&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;34&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;33&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
                   &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;MergeExecutor&lt;/span&gt;             &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;24&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;23&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;22&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;21&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
                      &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;Dispatcher&lt;/span&gt;             &lt;span class="o"&gt;|&lt;/span&gt;             &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="n"&gt;NaN&lt;/span&gt;
                         &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;StreamFilter&lt;/span&gt;        &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;32&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;31&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;30&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;29&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
                            &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;StreamTableScan&lt;/span&gt;  &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;32&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;31&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;30&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;29&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
                               &lt;span class="err"&gt;├─&lt;/span&gt; &lt;span class="n"&gt;BatchPlanNode&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;32&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;31&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;30&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;29&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
                               &lt;span class="err"&gt;└─&lt;/span&gt; &lt;span class="n"&gt;Upstream&lt;/span&gt;      &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;32&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;31&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;30&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;29&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;          &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;32&lt;/span&gt; &lt;span class="k"&gt;rows&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;**&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  &lt;strong&gt;Practical Use Cases&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;With &lt;code&gt;EXPLAIN ANALYZE&lt;/code&gt;, stream processing engineers can:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Pinpoint bottlenecks: Use low &lt;code&gt;output_rps&lt;/code&gt; or high &lt;code&gt;avg_output_pending_ratio&lt;/code&gt; to identify slow or blocked operators.&lt;/li&gt;
&lt;li&gt;Tune query plans: Compare runtime performance across strategies (e.g., local vs. global aggregation, filter pushdown, partitioning schemes).&lt;/li&gt;
&lt;li&gt;Diagnose backpressure: A consistently high pending ratio may indicate downstream congestion or under-provisioned sinks.&lt;/li&gt;
&lt;li&gt;Test at scale: Run synthetic load tests on new materialized views and verify they can scale before rollout.&lt;/li&gt;
&lt;/ul&gt;
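As a rough illustration of the first two checks, here is a plain-Python sketch that flags operators with a low `output_rps` or a high `avg_output_pending_ratio`. The operator stats and thresholds below are made-up sample values, not real RisingWave output:

```python
# Flag likely bottlenecks from per-operator metrics, mirroring the
# EXPLAIN ANALYZE columns output_rps and avg_output_pending_ratio.
# The stats and thresholds are illustrative, not RisingWave defaults.
def find_bottlenecks(stats, min_rps=100.0, max_pending=0.8):
    suspects = []
    for op in stats:
        if op["output_rps"] < min_rps or op["avg_output_pending_ratio"] > max_pending:
            suspects.append(op["operator"])
    return suspects

stats = [
    {"operator": "StreamHashAgg", "output_rps": 12.0, "avg_output_pending_ratio": 0.95},
    {"operator": "StreamFilter", "output_rps": 900.0, "avg_output_pending_ratio": 0.10},
]
print(find_bottlenecks(stats))  # ['StreamHashAgg']
```

In practice you would read these columns straight from the `EXPLAIN ANALYZE` output rather than scripting it, but the thresholds capture the intuition: a slow or blocked operator stands out on exactly these two metrics.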

&lt;h2&gt;
  
  
  &lt;strong&gt;Try It Today&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;If you're running RisingWave v2.4 or later, you can start using &lt;code&gt;EXPLAIN ANALYZE&lt;/code&gt; immediately on materialized views, sinks, and indexes. It’s a low-overhead, high-impact tool for keeping your streaming jobs efficient and transparent.&lt;/p&gt;

&lt;p&gt;Want to explore further?&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Check out our &lt;a href="https://docs.risingwave.com/sql/commands/sql-explain-analyze" rel="noopener noreferrer"&gt;&lt;strong&gt;EXPLAIN ANALYZE&lt;/strong&gt;&lt;/a&gt; documentation for detailed syntax and parameters.&lt;/li&gt;
&lt;li&gt;Join the conversation in our &lt;a href="http://go.risingwave.com/slack" rel="noopener noreferrer"&gt;&lt;strong&gt;Slack community&lt;/strong&gt;&lt;/a&gt; to ask questions and connect with others.&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>productivity</category>
      <category>opensource</category>
      <category>database</category>
      <category>datascience</category>
    </item>
    <item>
      <title>What Are Process-Time Temporal Joins?</title>
      <dc:creator>RisingWave Labs</dc:creator>
      <pubDate>Tue, 02 Dec 2025 03:00:00 +0000</pubDate>
      <link>https://dev.to/risingwavelabs/what-are-process-time-temporal-joins-799</link>
      <guid>https://dev.to/risingwavelabs/what-are-process-time-temporal-joins-799</guid>
      <description>&lt;p&gt;Imagine you run an online store where product prices change frequently. A customer places an order, and five minutes later, you update the price of an item they bought. When you review the order, how do you determine the correct price the customer paid? A &lt;strong&gt;standard database join&lt;/strong&gt; would likely show you the &lt;em&gt;newest&lt;/em&gt; price, which is incorrect for that past order.&lt;/p&gt;

&lt;p&gt;This is a common challenge in &lt;strong&gt;real-time data systems&lt;/strong&gt;. You need to know what the data looked like at the specific moment when the data is processed. In &lt;strong&gt;RisingWave&lt;/strong&gt;, you can solve this easily with &lt;strong&gt;process-time temporal joins&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;This blog post will explain what process-time temporal joins are and demonstrate how to use them to achieve point-in-time correctness in your applications.&lt;/p&gt;

&lt;h3&gt;
  
  
  What Are Process-Time Temporal Joins?
&lt;/h3&gt;

&lt;p&gt;A process-time temporal join is a special type of join that connects a data stream to a table based on the time the system processes each event. Instead of joining with the latest version of a record, it joins with the version of the record that was active at the moment the event from the stream is processed.&lt;/p&gt;

&lt;p&gt;This is perfect for our e-commerce scenario. We can join an &lt;code&gt;orders&lt;/code&gt; stream with a &lt;code&gt;product_prices&lt;/code&gt; table to find the price that was active &lt;em&gt;at the exact moment each order was processed&lt;/em&gt;.&lt;/p&gt;
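The semantics can be sketched in a few lines of plain Python (illustrative only, not how RisingWave is implemented): the stream side looks up whichever version of the row is current at processing time, and that result is never revised afterwards.

```python
# Process-time temporal join, sketched: each order is joined with the
# price that is current at the moment the order is processed, and that
# result stays locked in even if the price changes later.
prices = {}   # product_id -> current price (the versioned table side)
joined = []   # materialized results; never retroactively updated

def update_price(product_id, price):
    prices[product_id] = price

def process_order(order_id, product_id, quantity):
    # Look up the price as of *now*, the processing time of this event
    joined.append((order_id, product_id, prices[product_id]))

update_price(101, 110.0); update_price(102, 200.0)   # prices set at 08:00
process_order(1, 101, 2); process_order(2, 102, 1)   # orders processed at 08:30
update_price(101, 150.0); update_price(102, 250.0)   # prices updated at 09:00
process_order(3, 102, 3)                             # order processed at 09:30

# Orders 1 and 2 keep the 08:00 prices; order 3 sees the 09:00 price.
print(joined)  # [(1, 101, 110.0), (2, 102, 200.0), (3, 102, 250.0)]
```

This mirrors the demo below: updating the table later does not retract results that were already emitted.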

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhb34b948gqgmyswah9rh.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhb34b948gqgmyswah9rh.png" alt="What Are Process-Time Temporal Joins" width="800" height="758"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  A Step-by-Step Demo: Tracking Prices for Orders
&lt;/h3&gt;

&lt;p&gt;Let’s build a real-time materialized view that correctly calculates the total cost of an order using the product price at the time of purchase.&lt;/p&gt;

&lt;h3&gt;
  
  
  Step 1: Set up Your Tables
&lt;/h3&gt;

&lt;p&gt;First, we need two data structures. One will be a table to store product prices, which can be updated. The other will be an append-only stream of new orders.&lt;/p&gt;

&lt;p&gt;The &lt;code&gt;product_prices&lt;/code&gt; table requires a &lt;code&gt;PRIMARY KEY&lt;/code&gt;. This ensures that new rows with the same &lt;code&gt;product_id&lt;/code&gt; are treated as updates to the existing product's price.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- 1. Tracks product prices with automatic processing time
CREATE TABLE product_prices (
    product_id INT PRIMARY KEY, -- NOTICE: a primary key is needed
    price FLOAT
);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;code&gt;orders&lt;/code&gt; table will represent an append-only stream, meaning we only insert new orders and never update or delete existing ones.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- 2. Contains historical orders
CREATE TABLE orders (
        order_id INT,
        product_id INT,
        quantity INT
) APPEND ONLY; -- NOTICE: rows are only inserted, never updated or deleted
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Step 2: Insert Sample Data
&lt;/h3&gt;

&lt;p&gt;Let’s add some products with their initial prices. Later, we’ll update them to simulate a price change.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- Initial prices (we assume that these prices are updated at 08:00)
INSERT INTO product_prices VALUES
(101, 110.0),
(102, 200.0);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If you query &lt;code&gt;product_prices&lt;/code&gt; now, you'll only see the latest values (&lt;code&gt;110.0&lt;/code&gt; and &lt;code&gt;200.0&lt;/code&gt;).&lt;/p&gt;

&lt;p&gt;Now, let’s add two customer orders. These will be processed by the system &lt;em&gt;after&lt;/em&gt; the prices have been updated.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- We assume that these orders are processed at 08:30
INSERT INTO orders VALUES
(1, 101, 2),
(2, 102, 1);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Step 3: Create a Materialized View with a Temporal Join
&lt;/h3&gt;

&lt;p&gt;This is where the magic happens. We’ll create a materialized view that joins &lt;code&gt;orders&lt;/code&gt; to &lt;code&gt;product_prices&lt;/code&gt;. The key is the &lt;code&gt;FOR SYSTEM_TIME AS OF PROCTIME()&lt;/code&gt; syntax. This tells RisingWave: "For each order, join it with the version of the product price that was valid at the moment the order is processed."&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE MATERIALIZED VIEW order_with_price AS
SELECT
    o.order_id,
    o.product_id,
    p.price AS price_at_purchase
FROM orders o
-- Temporal join: Get price version active when the order was processed
JOIN product_prices FOR SYSTEM_TIME AS OF PROCTIME() p
ON o.product_id = p.product_id;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Step 4: Query the Results and See It in Action
&lt;/h3&gt;

&lt;p&gt;Let’s check our materialized view.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;SELECT * FROM order_with_price;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The output shows each order matched with the price that was active when it was processed. Note that the last two columns are mock data, included only to illustrate the temporal logic; they will not appear in the actual results.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;order_id | product_id |price_at_purchase| price_update_time | order_process_time
                                          -- (mock up data)      (mock up data)
----------+------------+-----------------+-------------------+------------------
        1 |        101 |           110   |--    08:00:00     |      08:30:00
        2 |        102 |           200   |--    08:00:00     |      08:30:00
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, let’s update the prices again.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- We assume that these prices are updated at 09:00
INSERT INTO product_prices VALUES
(101, 150.0),
(102, 250.0);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If we query the view again, the existing results remain unchanged because the temporal join “locks in” the price from when the order was first processed.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;SELECT * FROM order_with_price;
-- The output is the same as before!
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;But what about a new order? Let’s add one that arrives after the latest price change.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- We assume that these orders are processed at 09:30
INSERT INTO orders VALUES
(3, 102, 3);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Query the view one last time. The new order (ID 3) correctly uses the latest price (&lt;code&gt;250.0&lt;/code&gt;), while the old orders retain their original prices.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;SELECT * FROM order_with_price;

-- Final Result
 order_id | product_id | price_at_purchase | price_update_time | order_process_time
          |            |                   |  (mock up data)   |   (mock up data)
----------+------------+-------------------+-------------------+--------------------
        1 |        101 |               110 |          08:00:00 |           08:30:00
        2 |        102 |               200 |          08:00:00 |           08:30:00
        3 |        102 |               250 |          09:00:00 |           09:30:00
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Optimizing Your Joins: Append-Only vs. Non-Append-Only
&lt;/h3&gt;

&lt;p&gt;RisingWave is smart about how it handles temporal joins. The type of join it uses depends on the properties of your data streams.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Append-Only Temporal Join: This is what we used in our demo. It’s highly efficient because the left side of the join (&lt;code&gt;orders&lt;/code&gt;) is append-only. RisingWave knows that old orders will never be updated or deleted, so it doesn't need to maintain a large state to track changes. This is ideal for streaming data like events, logs, or orders.&lt;/li&gt;
&lt;li&gt;Non-Append-Only Temporal Join: If the table on the left side can receive updates or deletes (i.e., it’s not strictly append-only), RisingWave can still perform a temporal join. However, it requires more resources because it has to maintain an internal state to manage potential retractions or changes to past results.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;For the best performance, use an append-only source or table on the left side of your temporal join whenever possible.&lt;/p&gt;

&lt;h3&gt;
  
  
  Conclusion
&lt;/h3&gt;

&lt;p&gt;Process-time temporal joins are a powerful feature in RisingWave for building applications that require point-in-time accuracy. By using the &lt;code&gt;FOR SYSTEM_TIME AS OF PROCTIME()&lt;/code&gt; syntax, you can ensure that your joins reflect the correct state of your data at the exact moment it was processed. This eliminates a whole class of data consistency problems in real-time systems.&lt;/p&gt;

</description>
      <category>programming</category>
      <category>tutorial</category>
      <category>database</category>
      <category>datascience</category>
    </item>
    <item>
      <title>Convert millisecond price gaps into real profits</title>
      <dc:creator>RisingWave Labs</dc:creator>
      <pubDate>Tue, 25 Nov 2025 07:08:15 +0000</pubDate>
      <link>https://dev.to/risingwavelabs/convert-millisecond-price-gaps-into-real-profits-1kn9</link>
      <guid>https://dev.to/risingwavelabs/convert-millisecond-price-gaps-into-real-profits-1kn9</guid>
      <description>&lt;div class="crayons-card c-embed text-styles text-styles--secondary"&gt;
    &lt;div class="c-embed__content"&gt;
        &lt;div class="c-embed__cover"&gt;
          &lt;a href="https://dev.to/risingwavelabs/building-a-real-time-crypto-arbitrage-monitoring-system-4chf" class="c-link align-middle" rel="noopener noreferrer"&gt;
            &lt;img alt="" src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F71s7tnqb6o98ljr5mc6m.png" height="533" class="m-0" width="800"&gt;
          &lt;/a&gt;
        &lt;/div&gt;
      &lt;div class="c-embed__body"&gt;
        &lt;h2 class="fs-xl lh-tight"&gt;
          &lt;a href="https://dev.to/risingwavelabs/building-a-real-time-crypto-arbitrage-monitoring-system-4chf" rel="noopener noreferrer" class="c-link"&gt;
            Building a Real-Time Crypto Arbitrage Monitoring System - DEV Community
          &lt;/a&gt;
        &lt;/h2&gt;
          &lt;p class="truncate-at-3"&gt;
            Arbitrage is a simple strategy. You buy an asset on one exchange where the price is low and sell it...
          &lt;/p&gt;
        &lt;div class="color-secondary fs-s flex items-center"&gt;
            &lt;img alt="favicon" class="c-embed__favicon m-0 mr-2 radius-0" src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8j7kvp660rqzt99zui8e.png" width="300" height="299"&gt;
          dev.to
        &lt;/div&gt;
      &lt;/div&gt;
    &lt;/div&gt;
&lt;/div&gt;


</description>
      <category>productivity</category>
      <category>bitcoin</category>
      <category>database</category>
      <category>datascience</category>
    </item>
    <item>
      <title>Building a Real-Time Crypto Arbitrage Monitoring System</title>
      <dc:creator>RisingWave Labs</dc:creator>
      <pubDate>Tue, 25 Nov 2025 07:06:01 +0000</pubDate>
      <link>https://dev.to/risingwavelabs/building-a-real-time-crypto-arbitrage-monitoring-system-4chf</link>
      <guid>https://dev.to/risingwavelabs/building-a-real-time-crypto-arbitrage-monitoring-system-4chf</guid>
      <description>&lt;p&gt;Arbitrage is a simple strategy. You buy an asset on one exchange where the price is low and sell it on another where the price is high.&lt;/p&gt;

&lt;p&gt;In crypto markets, these price differences, or spreads, appear and vanish in milliseconds. If your data pipeline takes five seconds to process a batch of prices, the opportunity is already gone. This post demonstrates how to use &lt;a href="https://github.com/risingwavelabs/risingwave" rel="noopener noreferrer"&gt;RisingWave&lt;/a&gt;—an open-source real-time event streaming platform—to detect arbitrage opportunities with sub-second latency using standard SQL.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;The Engineering Bottleneck&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;Arbitrage requires monitoring fragmented liquidity across &lt;a href="https://www.binance.com/en" rel="noopener noreferrer"&gt;Binance&lt;/a&gt;, &lt;a href="https://www.coinbase.com/" rel="noopener noreferrer"&gt;Coinbase&lt;/a&gt;, &lt;a href="https://www.okx.com/" rel="noopener noreferrer"&gt;OKX&lt;/a&gt;, and &lt;a href="https://www.coinbase.com/learn/crypto-basics/what-is-a-dex" rel="noopener noreferrer"&gt;DEXs&lt;/a&gt; simultaneously. This creates three specific engineering hurdles.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Velocity:&lt;/strong&gt; During volatility, you might ingest over 10,000 price ticks per second.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Synchronization:&lt;/strong&gt; You cannot compare a Binance price from 10:00:01 with a Coinbase price from 10:00:05. The comparison must be time-aligned.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Latency:&lt;/strong&gt; The profit potential decays effectively to zero within hundreds of milliseconds.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Traditional ETL pipelines are too slow. We need a system that processes data as it arrives, not in batches.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;Why RisingWave Fits&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;RisingWave is a streaming database designed exactly for this velocity. Unlike a traditional Postgres database where a materialized view is a static snapshot, a &lt;strong&gt;RisingWave materialized view updates continuously&lt;/strong&gt;.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;It connects natively to event streams like Kafka or WebSockets.&lt;/li&gt;
&lt;li&gt;It calculates results incrementally, processing only new data points.&lt;/li&gt;
&lt;li&gt;It handles time-windowing logic natively.&lt;/li&gt;
&lt;li&gt;It uses SQL, so you don’t need to write complex Java or Flink jobs to define trading strategies.&lt;/li&gt;
&lt;/ul&gt;
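To illustrate what "incremental" means here (plain Python, not RisingWave internals): a continuously maintained aggregate does O(1) work per new tick instead of rescanning history the way a batch query would.

```python
# Incremental maintenance sketch: running min/max updated per tick.
# Naming follows the article's view, where MIN(price) is labeled
# best_bid and MAX(price) best_offer.
class RunningSpread:
    def __init__(self):
        self.best_bid = None    # lowest price seen so far
        self.best_offer = None  # highest price seen so far

    def on_tick(self, price):
        # O(1) work per new data point; no rescan of earlier ticks
        self.best_bid = price if self.best_bid is None else min(self.best_bid, price)
        self.best_offer = price if self.best_offer is None else max(self.best_offer, price)

    @property
    def spread(self):
        return self.best_offer - self.best_bid

agg = RunningSpread()
for p in [100.0, 101.5, 99.8, 100.7]:
    agg.on_tick(p)
print(round(agg.spread, 2))  # 1.7
```

A RisingWave materialized view applies the same principle automatically: only the delta introduced by each new event is computed.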

&lt;h2&gt;
  
  
  How to build the monitor
&lt;/h2&gt;

&lt;h3&gt;
  
  
  &lt;strong&gt;1. Ingesting Market Data&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;We assume market data is flowing into a message queue like Kafka. We define two sources: one for Binance and one for Coinbase.&lt;/p&gt;

&lt;p&gt;We treat these Kafka topics as tables. RisingWave consumes the JSON events directly.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;SOURCE&lt;/span&gt; &lt;span class="n"&gt;binance_prices&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;symbol&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;price&lt;/span&gt; &lt;span class="nb"&gt;DECIMAL&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;ts&lt;/span&gt; &lt;span class="nb"&gt;TIMESTAMP&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;connector&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'kafka'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;topic&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'binance.ticker'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bootstrap&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;server&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'localhost:9092'&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;FORMAT&lt;/span&gt; &lt;span class="n"&gt;PLAIN&lt;/span&gt; &lt;span class="n"&gt;ENCODE&lt;/span&gt; &lt;span class="n"&gt;JSON&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;SOURCE&lt;/span&gt; &lt;span class="n"&gt;coinbase_prices&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;symbol&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;price&lt;/span&gt; &lt;span class="nb"&gt;DECIMAL&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;ts&lt;/span&gt; &lt;span class="nb"&gt;TIMESTAMP&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;connector&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'kafka'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;topic&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'coinbase.ticker'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bootstrap&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;server&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'localhost:9092'&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;FORMAT&lt;/span&gt; &lt;span class="n"&gt;PLAIN&lt;/span&gt; &lt;span class="n"&gt;ENCODE&lt;/span&gt; &lt;span class="n"&gt;JSON&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  &lt;strong&gt;2. The Core Logic: Detecting the Spread&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;We need to find instances where the price difference for the same symbol crosses a profitability threshold (e.g., 0.5%).&lt;/p&gt;

&lt;p&gt;Crucially, we must ensure we compare prices that exist at roughly the same moment. A price mismatch is not a valid opportunity if the data points are seconds apart.&lt;/p&gt;

&lt;p&gt;We use an interval join condition: &lt;code&gt;ABS(EXTRACT(EPOCH FROM (b.ts - c.ts))) &amp;lt; 1&lt;/code&gt;. This restricts matches to price ticks that occurred within one second of each other.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="n"&gt;MATERIALIZED&lt;/span&gt; &lt;span class="k"&gt;VIEW&lt;/span&gt; &lt;span class="n"&gt;arbitrage_opportunities&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt;
    &lt;span class="n"&gt;b&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;symbol&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;b&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;binance_price&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="k"&gt;c&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;coinbase_price&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;c&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="n"&gt;b&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="n"&gt;b&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;spread_percent&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;b&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ts&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;binance_prices&lt;/span&gt; &lt;span class="n"&gt;b&lt;/span&gt;
&lt;span class="k"&gt;JOIN&lt;/span&gt; &lt;span class="n"&gt;coinbase_prices&lt;/span&gt; &lt;span class="k"&gt;c&lt;/span&gt;
  &lt;span class="k"&gt;ON&lt;/span&gt; &lt;span class="n"&gt;b&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;symbol&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;c&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;symbol&lt;/span&gt;
&lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="k"&gt;ABS&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;EXTRACT&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;EPOCH&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;b&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ts&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="k"&gt;c&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;)))&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;  &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="c1"&gt;-- align within 1s window*WHERE ABS((c.price - b.price) / b.price * 100) &amp;gt; 0.5;  *-- threshold 0.5%*&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As new ticks arrive, RisingWave updates this view instantly. If a spread appears, it shows up here immediately.&lt;/p&gt;
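The view's two predicates, the one-second alignment check and the 0.5% spread threshold, can be checked tick-by-tick in plain Python (illustrative only; the sample prices and timestamps are made up):

```python
# Mirror the materialized view's predicates on one pair of ticks:
# time-aligned within 1 second AND spread above the 0.5% threshold.
def is_opportunity(binance_tick, coinbase_tick, max_skew_s=1.0, threshold_pct=0.5):
    b_price, b_ts = binance_tick
    c_price, c_ts = coinbase_tick
    if abs(b_ts - c_ts) >= max_skew_s:
        return False  # ticks too far apart in time to compare
    spread_pct = (c_price - b_price) / b_price * 100
    return abs(spread_pct) > threshold_pct

# Each tick is (price, epoch seconds) -- sample values
print(is_opportunity((100.0, 10.0), (100.8, 10.4)))  # True: 0.8% spread, 0.4s apart
print(is_opportunity((100.0, 10.0), (100.8, 12.0)))  # False: ticks 2s apart
```

The streaming join evaluates exactly this pair of conditions continuously, for every combination of recent ticks on both sides.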

&lt;h3&gt;
  
  
  &lt;strong&gt;3. Market-Wide Aggregation&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;You may also want to see the global "Best Bid and Offer" (BBO) across all exchanges to visualize market fragmentation.&lt;/p&gt;

&lt;p&gt;We combine streams using &lt;code&gt;UNION ALL&lt;/code&gt; and apply a hopping window via &lt;code&gt;HOP&lt;/code&gt; to calculate spread statistics over short intervals.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="n"&gt;MATERIALIZED&lt;/span&gt; &lt;span class="k"&gt;VIEW&lt;/span&gt; &lt;span class="n"&gt;best_bid_offer&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt;
&lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;symbol&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ts&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;binance_prices&lt;/span&gt;
           &lt;span class="k"&gt;UNION&lt;/span&gt; &lt;span class="k"&gt;ALL&lt;/span&gt;
           &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;symbol&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ts&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;coinbase_prices&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt;
    &lt;span class="n"&gt;symbol&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="k"&gt;MIN&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;best_bid&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="k"&gt;MAX&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;best_offer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="k"&gt;MAX&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="k"&gt;MIN&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;spread&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;HOP&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;INTERVAL&lt;/span&gt; &lt;span class="s1"&gt;'1 second'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;INTERVAL&lt;/span&gt; &lt;span class="s1"&gt;'5 seconds'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; 
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;symbol&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;window_start&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  &lt;strong&gt;4. Actionable Output (Subscriptions and Sinks)&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;Detecting the spread is half the battle. You need to trigger your trading bot immediately.&lt;/p&gt;

&lt;p&gt;Traditionally, you would sink this data into Redis or Kafka and have your bot listen there. RisingWave simplifies this with &lt;a href="https://docs.risingwave.com/serve/subscription" rel="noopener noreferrer"&gt;&lt;strong&gt;Subscriptions&lt;/strong&gt;&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Subscriptions allow your application to listen directly to changes in a materialized view. Your trading bot connects to RisingWave and receives a stream of updates—new arbitrage opportunities—as they are generated. This removes the need for an intermediate message queue, reducing both latency and infrastructure complexity.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="n"&gt;SUBSCRIPTION&lt;/span&gt; &lt;span class="n"&gt;arb_subscription&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;arbitrage_opportunities&lt;/span&gt;
&lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;retention&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'1h'&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
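&lt;p&gt;As a sketch of the consuming side, the bot declares a cursor over the subscription and polls it. (The cursor name below is made up, and the exact cursor syntax may vary by RisingWave version; see the subscription docs linked above.)&lt;/p&gt;

```sql
-- Declare a cursor over the subscription created above
DECLARE arb_cursor SUBSCRIPTION CURSOR FOR arb_subscription;

-- Each FETCH returns the next change row from the materialized view
FETCH NEXT FROM arb_cursor;
```

&lt;p&gt;Because RisingWave speaks the PostgreSQL wire protocol, any Postgres client library can issue these statements in a polling loop.&lt;/p&gt;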



&lt;p&gt;However, if you need to archive data for backtesting or integrate with other existing systems, you can still use &lt;a href="https://docs.risingwave.com/sql/commands/sql-create-sink" rel="noopener noreferrer"&gt;&lt;strong&gt;Sinks&lt;/strong&gt;&lt;/a&gt;. For example, you might sink to S3 for long-term storage or to Redis if multiple external services need to access the state.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="n"&gt;SINK&lt;/span&gt; &lt;span class="n"&gt;arb_signals&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;arbitrage_opportunities&lt;/span&gt;
&lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;connector&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'redis'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;primary_key&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'arb_signals'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;url&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'redis://localhost:6379/'&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;FORMAT&lt;/span&gt; &lt;span class="n"&gt;PLAIN&lt;/span&gt; &lt;span class="n"&gt;ENCODE&lt;/span&gt; &lt;span class="n"&gt;JSON&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;force_append_only&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'true'&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  &lt;strong&gt;Expanding the Strategy&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;Once the pipeline is running, you can use SQL to prototype more complex strategies.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Triangular Arbitrage:&lt;/strong&gt; Join three pairs (BTC/USDT, ETH/BTC, ETH/USDT) within the same exchange.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Funding Rate Arbitrage:&lt;/strong&gt; Monitor divergence between spot prices and perpetual swap funding rates.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Latency Monitoring:&lt;/strong&gt; Compare exchange API timestamps against your ingestion timestamps to detect network lag.&lt;/li&gt;
&lt;/ul&gt;
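&lt;p&gt;For instance, the triangular case reduces to checking whether a round trip through three quotes returns more than it started with. A minimal sketch (the function and prices below are illustrative, not part of the pipeline above):&lt;/p&gt;

```python
def triangular_spread(btc_usdt: float, eth_btc: float, eth_usdt: float) -> float:
    """Round-trip return of USDT -> BTC -> ETH -> USDT, ignoring fees.

    A positive result signals a triangular arbitrage opportunity.
    """
    btc = 1.0 / btc_usdt       # buy BTC with 1 USDT
    eth = btc / eth_btc        # convert BTC to ETH
    usdt_out = eth * eth_usdt  # sell ETH back to USDT
    return usdt_out - 1.0

print(triangular_spread(50_000.0, 0.05, 2_500.0))  # ~0 when the quotes are consistent
print(triangular_spread(50_000.0, 0.05, 2_525.0))  # ~0.01 when ETH/USDT is 1% rich
```

&lt;p&gt;In practice you would express the same check in SQL over a three-way join and subtract exchange fees before acting.&lt;/p&gt;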

&lt;h2&gt;
  
  
  &lt;strong&gt;Summary&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;Speed is the primary competitive advantage in crypto trading. By moving processing logic into RisingWave, you achieve sub-second detection of market anomalies without complex application code. The database handles the windowing and joining state, allowing you to define strategies in pure SQL.&lt;/p&gt;

</description>
      <category>productivity</category>
      <category>bitcoin</category>
      <category>database</category>
      <category>datascience</category>
    </item>
    <item>
      <title>From designing Kafka Consumers to building a unified Source–Sink streaming engine that powers real-time data pipelines.</title>
      <dc:creator>RisingWave Labs</dc:creator>
      <pubDate>Wed, 29 Oct 2025 07:12:04 +0000</pubDate>
      <link>https://dev.to/risingwavelabs/from-designing-kafka-consumers-to-building-a-unified-source-sink-streaming-engine-that-powers-4om6</link>
      <guid>https://dev.to/risingwavelabs/from-designing-kafka-consumers-to-building-a-unified-source-sink-streaming-engine-that-powers-4om6</guid>
      <description>&lt;div class="crayons-card c-embed text-styles text-styles--secondary"&gt;
    &lt;div class="c-embed__content"&gt;
        &lt;div class="c-embed__cover"&gt;
          &lt;a href="https://dev.to/risingwavelabs/a-hands-on-guide-to-building-the-speed-layer-of-the-lambda-architecture-3jjh" class="c-link align-middle" rel="noopener noreferrer"&gt;
            &lt;img alt="" src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fj0w37f2028rjhi10yl9a.png" height="420" class="m-0" width="800"&gt;
          &lt;/a&gt;
        &lt;/div&gt;
      &lt;div class="c-embed__body"&gt;
        &lt;h2 class="fs-xl lh-tight"&gt;
          &lt;a href="https://dev.to/risingwavelabs/a-hands-on-guide-to-building-the-speed-layer-of-the-lambda-architecture-3jjh" rel="noopener noreferrer" class="c-link"&gt;
            A Hands-On Guide to Building the Speed Layer of the Lambda Architecture - DEV Community
          &lt;/a&gt;
        &lt;/h2&gt;
          &lt;p class="truncate-at-3"&gt;
            In the previous article, we introduced the design principles and implementation of the Lambda...
          &lt;/p&gt;
        &lt;div class="color-secondary fs-s flex items-center"&gt;
            &lt;img alt="favicon" class="c-embed__favicon m-0 mr-2 radius-0" src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8j7kvp660rqzt99zui8e.png" width="300" height="299"&gt;
          dev.to
        &lt;/div&gt;
      &lt;/div&gt;
    &lt;/div&gt;
&lt;/div&gt;


</description>
      <category>beginners</category>
      <category>architecture</category>
      <category>python</category>
      <category>datascience</category>
    </item>
    <item>
      <title>A Hands-On Guide to Building the Speed Layer of the Lambda Architecture</title>
      <dc:creator>RisingWave Labs</dc:creator>
      <pubDate>Wed, 29 Oct 2025 07:07:44 +0000</pubDate>
      <link>https://dev.to/risingwavelabs/a-hands-on-guide-to-building-the-speed-layer-of-the-lambda-architecture-3jjh</link>
      <guid>https://dev.to/risingwavelabs/a-hands-on-guide-to-building-the-speed-layer-of-the-lambda-architecture-3jjh</guid>
      <description>&lt;p&gt;In the previous article, we introduced the design principles and implementation of the Lambda Architecture.&lt;/p&gt;

&lt;p&gt;We talked about its three layers:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Batch Layer&lt;/strong&gt; – slow but precise; processes full historical data to ensure eventual consistency&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Speed Layer&lt;/strong&gt; – handles incremental data in real time&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Serving Layer&lt;/strong&gt; – merges results from both layers and provides a unified query interface&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Starting today, we’ll dive into the &lt;strong&gt;implementation of the Speed Layer&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;We’ll begin by writing a Kafka Consumer and gradually explore the core mechanisms of stream processing.&lt;/p&gt;

&lt;p&gt;In today’s article, we’ll write a Kafka Consumer — a key component of the Speed Layer — responsible for instantly receiving order data and writing it into the Serving DB (while the Batch Layer focuses on processing full historical data each day).&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;⚠️ Important&lt;/p&gt;

&lt;p&gt;All the code in this article is &lt;strong&gt;for educational and conceptual demonstration only (pseudo code)&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;It’s designed to help you understand and solve real-world problems, but it’s &lt;strong&gt;not production-ready&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;Focus on grasping the overall architecture and design logic — you can skim over low-level code details.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;Step 1: Speed Layer Kafka Consumer: Ingesting Orders into the Serving DB&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;Let’s start with the simplest version:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;kafka&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;KafkaConsumer&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;

&lt;span class="c1"&gt;# Subscribe to the orders topic
&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;orders&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;[Speed Layer] Waiting for fresh orders...&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;order&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;loads&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;decode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;utf-8&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;

    &lt;span class="nf"&gt;insert_db&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;order&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;conn&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;commit&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;[Speed Layer] Inserted order &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;order&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This logic is straightforward:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Kafka provides a real-time stream of order data&lt;/li&gt;
&lt;li&gt;The Consumer consumes and processes the stream&lt;/li&gt;
&lt;li&gt;The Serving DB stores the processed results&lt;/li&gt;
&lt;li&gt;Core task: write incoming data into storage as fast as possible&lt;/li&gt;
&lt;/ul&gt;
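&lt;p&gt;The snippet above leaves &lt;code&gt;insert_db&lt;/code&gt; and &lt;code&gt;conn&lt;/code&gt; undefined. Here is a runnable sketch of the same write path, with SQLite standing in for the Serving DB (the table and column names are assumptions):&lt;/p&gt;

```python
import json
import sqlite3

def insert_order(conn: sqlite3.Connection, order: dict) -> None:
    """Write one order into the stand-in Serving DB and commit immediately."""
    conn.execute(
        "INSERT INTO orders_realtime (id, status) VALUES (?, ?)",
        (order["id"], order["status"]),
    )
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders_realtime (id TEXT PRIMARY KEY, status TEXT)")

# Stand-in for the Kafka loop: replay two raw JSON payloads.
for raw in (b'{"id": "o-1", "status": "paid"}', b'{"id": "o-2", "status": "removed"}'):
    insert_order(conn, json.loads(raw.decode("utf-8")))

print(conn.execute("SELECT COUNT(*) FROM orders_realtime").fetchone()[0])  # 2
```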

&lt;h2&gt;
  
  
  &lt;strong&gt;Step 2: Designing the Serving DB&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;Here we have two key tables:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;orders_batch_summary&lt;/code&gt; – pre-aggregated historical data computed daily by the Batch Layer&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;orders_realtime&lt;/code&gt; – detailed real-time orders sent by the Speed Layer&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;When querying from the dashboard, the system merges both tables while filtering out invalid orders with &lt;code&gt;status = 'removed'&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;status&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;SUM&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;count&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;total&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;status&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;count&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;orders_batch_summary&lt;/span&gt;
    &lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;status&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="s1"&gt;'removed'&lt;/span&gt;

    &lt;span class="k"&gt;UNION&lt;/span&gt; &lt;span class="k"&gt;ALL&lt;/span&gt;

    &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;status&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;COUNT&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="k"&gt;count&lt;/span&gt;
    &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;orders_realtime&lt;/span&gt;
    &lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;status&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="s1"&gt;'removed'&lt;/span&gt;
    &lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;status&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt;
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;status&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;However, as the company grows, more Consumers are written independently by different developers, each with their own logic. Over time, the codebase becomes tangled and unmaintainable.&lt;/p&gt;

&lt;p&gt;We need to &lt;strong&gt;refactor&lt;/strong&gt; — to establish a unified Stream Processing architecture so everyone can build upon a consistent framework.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;Designing a Source Abstraction Layer&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;In a team project, divergent Kafka Consumer implementations make integration harder.&lt;/p&gt;

&lt;p&gt;The solution is to define a unified &lt;strong&gt;Source Interface&lt;/strong&gt;.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;strong&gt;Source Architecture Design&lt;/strong&gt;
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scss"&gt;&lt;code&gt;    &lt;span class="err"&gt;┌─────────────┐&lt;/span&gt;
    &lt;span class="err"&gt;│&lt;/span&gt; &lt;span class="nt"&gt;BaseSource&lt;/span&gt;  &lt;span class="err"&gt;│&lt;/span&gt;  &lt;span class="err"&gt;◄──&lt;/span&gt; &lt;span class="nt"&gt;Abstract&lt;/span&gt; &lt;span class="nt"&gt;Interface&lt;/span&gt;
    &lt;span class="err"&gt;│&lt;/span&gt;             &lt;span class="err"&gt;│&lt;/span&gt;
    &lt;span class="err"&gt;│&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="nt"&gt;run&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;     &lt;span class="err"&gt;│&lt;/span&gt;
    &lt;span class="err"&gt;└─────────────┘&lt;/span&gt;
           &lt;span class="err"&gt;△&lt;/span&gt;
           &lt;span class="err"&gt;│&lt;/span&gt; &lt;span class="nt"&gt;implements&lt;/span&gt;
    &lt;span class="err"&gt;┌─────────────┐&lt;/span&gt;
    &lt;span class="err"&gt;│&lt;/span&gt;&lt;span class="nt"&gt;KafkaSource&lt;/span&gt;  &lt;span class="err"&gt;│&lt;/span&gt;  &lt;span class="err"&gt;◄──&lt;/span&gt; &lt;span class="nt"&gt;Concrete&lt;/span&gt; &lt;span class="nt"&gt;Implementation&lt;/span&gt;
    &lt;span class="err"&gt;│&lt;/span&gt;             &lt;span class="err"&gt;│&lt;/span&gt;
    &lt;span class="err"&gt;│&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="nt"&gt;run&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;     &lt;span class="err"&gt;│&lt;/span&gt;
    &lt;span class="err"&gt;└─────────────┘&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  &lt;strong&gt;Step-by-Step Explanation of the Source Core&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;Step 1: Define the BaseSource Abstract Interface&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;abc&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;ABC&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;abstractmethod&lt;/span&gt;

&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;BaseSource&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ABC&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;

    &lt;span class="nd"&gt;@abstractmethod&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;run&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="k"&gt;pass&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Key Points:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Each Source must have a unique &lt;code&gt;name&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;run()&lt;/code&gt; is abstract — forcing subclasses to implement it&lt;/li&gt;
&lt;/ul&gt;
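&lt;p&gt;What the abstract method buys you: a subclass that forgets &lt;code&gt;run()&lt;/code&gt; cannot even be instantiated. A quick self-contained illustration (the &lt;code&gt;BrokenSource&lt;/code&gt; class is made up):&lt;/p&gt;

```python
from abc import ABC, abstractmethod

class BaseSource(ABC):
    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    def run(self):
        pass

class BrokenSource(BaseSource):
    pass  # forgot to implement run()

try:
    BrokenSource("oops")
except TypeError as e:
    print(f"rejected: {e}")  # e.g. "Can't instantiate abstract class BrokenSource ..."
```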

&lt;p&gt;&lt;strong&gt;Step 2: Initialize &lt;code&gt;SimpleKafkaSource&lt;/code&gt;&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;SimpleKafkaSource&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;BaseSource&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;broker_address&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;localhost:9092&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="nf"&gt;super&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;broker_address&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;broker_address&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;message_handler&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_default_handler&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Key Points:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Inherits from &lt;code&gt;BaseSource&lt;/code&gt; to conform to a unified interface&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;message_handler&lt;/code&gt; is replaceable, offering flexibility in message processing logic&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Step 3: Set Up the Kafka Consumer&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;_setup_consumer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;bootstrap_servers&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;broker_address&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;group_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;simple-source-&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;auto_offset_reset&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;latest&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;value_deserializer&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="k"&gt;lambda&lt;/span&gt; &lt;span class="n"&gt;m&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;loads&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;m&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;decode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;utf-8&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt; &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;m&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Key Points:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;group_id&lt;/code&gt; is derived from the Source name, so each Source gets its own consumer group&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;auto_offset_reset='latest'&lt;/code&gt; starts consumption from the newest messages when no committed offset exists&lt;/li&gt;
&lt;li&gt;Automatic JSON deserialization&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Step 4: Core Runtime Logic&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;run&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;_setup_consumer&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;  &lt;span class="c1"&gt;# initialize Consumer
&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;  &lt;span class="c1"&gt;# continuously listen for messages
&lt;/span&gt;        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;message_handler&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;key&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;value&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;topic&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;offset&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;offset&lt;/span&gt;
        &lt;span class="p"&gt;})&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Execution Flow:&lt;/strong&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Initialize Kafka Consumer&lt;/li&gt;
&lt;li&gt;Continuously read messages from the Topic&lt;/li&gt;
&lt;li&gt;Wrap messages in a standard format&lt;/li&gt;
&lt;li&gt;Call &lt;code&gt;message_handler&lt;/code&gt; for processing&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;Key Points:&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The Source only handles &lt;strong&gt;data ingestion&lt;/strong&gt;; message processing logic is injected externally through &lt;code&gt;message_handler&lt;/code&gt;, enabling high flexibility.&lt;br&gt;
&lt;/p&gt;
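&lt;p&gt;Because the handler is injected, wiring up custom processing is a single assignment. A sketch (the handler and the simulated message are made up; the commented lines show how it would attach to &lt;code&gt;SimpleKafkaSource&lt;/code&gt;):&lt;/p&gt;

```python
received = []

def my_handler(msg: dict) -> None:
    # Application-specific logic goes here; this one just collects the payloads.
    received.append(msg["value"])

# source = SimpleKafkaSource("orders-source", topic="orders")
# source.message_handler = my_handler
# source.run()  # blocks, pulling messages from Kafka

# Simulate one wrapped message exactly as run() would deliver it:
my_handler({"key": None, "value": {"id": "o-1"}, "topic": "orders", "offset": 42})
print(received)  # [{'id': 'o-1'}]
```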

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;logging&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;abc&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;ABC&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;abstractmethod&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;typing&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Optional&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Callable&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Any&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;kafka&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;KafkaConsumer&lt;/span&gt;

&lt;span class="n"&gt;logger&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getLogger&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;__name__&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;BaseSource&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ABC&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Base abstract class for all Sources&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_running&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;

    &lt;span class="nd"&gt;@abstractmethod&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;run&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Main execution method&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="k"&gt;pass&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;stop&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Stop the Source&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_running&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;
        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Source &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; stopped&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;SimpleKafkaSource&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;BaseSource&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Simple Kafka Source implementation&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;broker_address&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;localhost:9092&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;consumer_group&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Optional&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;message_handler&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Optional&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;Callable&lt;/span&gt;&lt;span class="p"&gt;[[&lt;/span&gt;&lt;span class="n"&gt;Any&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;]]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;
    &lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="nf"&gt;super&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;broker_address&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;broker_address&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;consumer_group&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;consumer_group&lt;/span&gt; &lt;span class="ow"&gt;or&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;simple-source-&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;message_handler&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;message_handler&lt;/span&gt; &lt;span class="ow"&gt;or&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_default_handler&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Optional&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;_default_handler&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;[&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;] Received message: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;_setup_consumer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;bootstrap_servers&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;broker_address&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;group_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;consumer_group&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;auto_offset_reset&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;latest&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;value_deserializer&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="k"&gt;lambda&lt;/span&gt; &lt;span class="n"&gt;m&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;loads&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;m&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;decode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;utf-8&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt; &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;m&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;key_deserializer&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="k"&gt;lambda&lt;/span&gt; &lt;span class="n"&gt;m&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;m&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;decode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;utf-8&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;m&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;
            &lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Consumer setup for topic: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;, group: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;consumer_group&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="nb"&gt;Exception&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Failed to setup consumer: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;raise&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;run&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Starting Source &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; for topic &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;_setup_consumer&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_running&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;

        &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_running&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="n"&gt;message_batch&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;poll&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timeout_ms&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;1000&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;topic_partition&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;messages&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;message_batch&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;items&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
                    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;messages&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_running&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                            &lt;span class="k"&gt;break&lt;/span&gt;
                        &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                            &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;message_handler&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
                                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;key&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;value&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;topic&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;partition&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;partition&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;offset&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;offset&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;timestamp&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;timestamp&lt;/span&gt;
                            &lt;span class="p"&gt;})&lt;/span&gt;
                        &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="nb"&gt;Exception&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                            &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Error processing message: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="nb"&gt;KeyboardInterrupt&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Received interrupt signal&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="nb"&gt;Exception&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Error in run loop: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;finally&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;close&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
            &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Source &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; finished&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;stop&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="nf"&gt;super&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nf"&gt;stop&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;close&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
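&lt;p&gt;Notice that &lt;code&gt;run()&lt;/code&gt; never hands raw Kafka records to user code: each record is repackaged into a plain dict with &lt;code&gt;key&lt;/code&gt;, &lt;code&gt;value&lt;/code&gt;, &lt;code&gt;topic&lt;/code&gt;, &lt;code&gt;partition&lt;/code&gt;, &lt;code&gt;offset&lt;/code&gt;, and &lt;code&gt;timestamp&lt;/code&gt; fields, so a handler only needs to understand that envelope. A minimal sketch (the handler and the sample event are ours, purely for illustration):&lt;/p&gt;

```python
def count_events_by_type(message, counts):
    # `message` is the envelope dict that SimpleKafkaSource.run()
    # builds for every Kafka record before invoking the handler.
    event = message["value"] or {}
    event_type = event.get("type", "unknown")
    counts[event_type] = counts.get(event_type, 0) + 1

# A fabricated envelope, shaped like the one run() produces:
envelope = {
    "key": "user-42",
    "value": {"type": "click", "page": "/home"},
    "topic": "events",
    "partition": 0,
    "offset": 1337,
    "timestamp": 1700000000000,
}

counts = {}
count_events_by_type(envelope, counts)
```

&lt;p&gt;In a real pipeline this function (with the &lt;code&gt;counts&lt;/code&gt; argument bound, e.g. via &lt;code&gt;functools.partial&lt;/code&gt;) would be passed as &lt;code&gt;message_handler&lt;/code&gt; when constructing &lt;code&gt;SimpleKafkaSource&lt;/code&gt;.&lt;/p&gt;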



&lt;h2&gt;
  
  
  &lt;strong&gt;Designing the Sink Abstraction Layer&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;In the Speed Layer architecture, &lt;strong&gt;Sources&lt;/strong&gt; handle data input, while &lt;strong&gt;Sinks&lt;/strong&gt; handle data output.&lt;/p&gt;

&lt;p&gt;To avoid inconsistent implementations, we define a unified Sink interface.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;strong&gt;Sink Architecture Design&lt;/strong&gt;
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;    ┌─────────────┐
    │  BaseSink   │  ◄── Abstract Interface
    │             │
    │ + write()   │
    └─────────────┘
           △
           │ implements
    ┌──────────────────┐
    │SimplePostgreSQL  │  ◄── Concrete Implementation
    │Sink              │
    │ + write()        │
    └──────────────────┘

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  &lt;strong&gt;Step-by-Step Explanation of the Sink Core&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;Step 1: Define the Base Sink Interface&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;abc&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;ABC&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;abstractmethod&lt;/span&gt;

&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;BaseSink&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ABC&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;

    &lt;span class="nd"&gt;@abstractmethod&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;write&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="k"&gt;pass&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;setup&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="k"&gt;pass&lt;/span&gt;  &lt;span class="c1"&gt;# default no-op
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Key Points:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Each Sink must have a unique &lt;code&gt;name&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;write()&lt;/code&gt; handles actual data writes&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;setup()&lt;/code&gt; is optional and can be overridden&lt;/li&gt;
&lt;/ul&gt;
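&lt;p&gt;Because &lt;code&gt;write()&lt;/code&gt; is the only abstract method, a new Sink can be very small. The sketch below restates the interface and adds a hypothetical in-memory Sink (&lt;code&gt;ListSink&lt;/code&gt; is our own name, handy in tests, not part of the article's codebase):&lt;/p&gt;

```python
from abc import ABC, abstractmethod

class BaseSink(ABC):
    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    def write(self, message):
        pass

    def setup(self):
        pass  # default no-op

class ListSink(BaseSink):
    """Hypothetical Sink that just collects messages in memory."""

    def __init__(self, name: str):
        super().__init__(name)
        self.messages = []

    def write(self, message):
        self.messages.append(message)

sink = ListSink("test-sink")
sink.setup()                      # inherited no-op
sink.write({"value": {"id": 1}})
```

&lt;p&gt;Any code that accepts a &lt;code&gt;BaseSink&lt;/code&gt; can now be exercised against &lt;code&gt;ListSink&lt;/code&gt; without a running database.&lt;/p&gt;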

&lt;p&gt;&lt;strong&gt;Step 2: Initialize the Simple PostgreSQL Sink&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;SimplePostgreSQLSink&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;BaseSink&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;host&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;dbname&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;table_name&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="nf"&gt;super&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;host&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;host&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;dbname&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;dbname&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;table_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;table_name&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;connection&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Key Points:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Inherits from &lt;code&gt;BaseSink&lt;/code&gt; for interface consistency&lt;/li&gt;
&lt;li&gt;Stores database connection information&lt;/li&gt;
&lt;li&gt;Lazy connection initialization (&lt;code&gt;connection = None&lt;/code&gt;)&lt;/li&gt;
&lt;/ul&gt;
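&lt;p&gt;The &lt;code&gt;connection = None&lt;/code&gt; line is the lazy-initialization pattern: the potentially slow (or failing) database connection is not opened in &lt;code&gt;__init__&lt;/code&gt; but deferred until it is first needed. The pattern in isolation, with a stub factory standing in for &lt;code&gt;psycopg2.connect&lt;/code&gt; (all names here are ours, for illustration only):&lt;/p&gt;

```python
class LazyResource:
    """Defer creating an expensive resource until first use."""

    def __init__(self, factory):
        self._factory = factory
        self._conn = None  # nothing opened yet

    @property
    def conn(self):
        if self._conn is None:          # first access only
            self._conn = self._factory()
        return self._conn

calls = []
resource = LazyResource(lambda: calls.append("connect") or "CONN")
# Construction alone triggers no connection attempt:
assert calls == []
```

&lt;p&gt;This is why constructing a &lt;code&gt;SimplePostgreSQLSink&lt;/code&gt; cannot fail on an unreachable database: errors surface only when the Sink actually connects.&lt;/p&gt;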

&lt;p&gt;&lt;strong&gt;Step 3: Core Logic of &lt;code&gt;write()&lt;/code&gt;&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;write&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="c1"&gt;# Automatically detect columns and insert into DB
&lt;/span&gt;    &lt;span class="n"&gt;data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;value&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;{})&lt;/span&gt;
    &lt;span class="c1"&gt;# ... dynamically generate SQL and insert
&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Key Points:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Automatically detects the field structure of &lt;code&gt;message['value']&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;Dynamically generates a parameterized &lt;code&gt;INSERT&lt;/code&gt; statement&lt;/li&gt;
&lt;li&gt;Writes the row to the database&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This allows the Sink to adapt to various data schemas automatically.&lt;/p&gt;
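&lt;p&gt;The column-detection idea can be sketched in a few lines. This is not the article's exact implementation: it derives the statement text from the message's keys and leaves identifier quoting aside (real code should quote table and column names, e.g. with &lt;code&gt;psycopg2.sql.Identifier&lt;/code&gt;, and must only be fed trusted field names):&lt;/p&gt;

```python
def build_insert(table_name, data):
    """Build a parameterized INSERT from a flat dict.

    Sketch only: values go through %s placeholders, but the
    table/column identifiers are interpolated directly and so
    must come from a trusted source.
    """
    columns = list(data.keys())
    placeholders = ", ".join(["%s"] * len(columns))
    query = (
        f"INSERT INTO {table_name} "
        f"({', '.join(columns)}) VALUES ({placeholders})"
    )
    return query, [data[col] for col in columns]

query, params = build_insert("events", {"user_id": 42, "action": "click"})
```

&lt;p&gt;Because the column list is rebuilt per message, adding a field upstream needs no Sink change, only a matching column in the target table.&lt;/p&gt;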

&lt;h3&gt;
  
  
  &lt;strong&gt;Complete Sink Implementation&lt;/strong&gt;
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;logging&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;abc&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;ABC&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;abstractmethod&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;typing&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Any&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Dict&lt;/span&gt;

&lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;psycopg2&lt;/span&gt;
    &lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;psycopg2.extras&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Json&lt;/span&gt;
    &lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;psycopg2&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;sql&lt;/span&gt;
&lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="nb"&gt;ImportError&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;psycopg2&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;
    &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Warning: psycopg2 not installed. Run: pip install psycopg2-binary&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;logger&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getLogger&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;__name__&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;BaseSink&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ABC&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Base abstract class for all Sinks&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;

    &lt;span class="nd"&gt;@abstractmethod&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;write&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Dict&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Any&lt;/span&gt;&lt;span class="p"&gt;]):&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Write a message&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="k"&gt;pass&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;setup&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Setup connection&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="k"&gt;pass&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;close&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Close connection&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="k"&gt;pass&lt;/span&gt;

&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;SimplePostgreSQLSink&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;BaseSink&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;PostgreSQL Sink with automatic schema detection&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;host&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;port&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;int&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;dbname&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;user&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;password&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;table_name&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;
    &lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="nf"&gt;super&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;host&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;host&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;port&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;port&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;dbname&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;dbname&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;user&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;user&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;password&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;password&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;table_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;table_name&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;connection&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;setup&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;psycopg2&lt;/span&gt; &lt;span class="ow"&gt;is&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="k"&gt;raise&lt;/span&gt; &lt;span class="nc"&gt;ImportError&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;psycopg2 is required&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;connection&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;psycopg2&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="n"&gt;host&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;host&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;port&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;port&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;dbname&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;dbname&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;user&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;user&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;password&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;password&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Connected to PostgreSQL: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;host&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;:&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;port&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;/&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;dbname&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;write&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Dict&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Any&lt;/span&gt;&lt;span class="p"&gt;]):&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Automatically detect fields and write to PostgreSQL&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="n"&gt;data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;value&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;{})&lt;/span&gt;
        &lt;span class="c1"&gt;# ... dynamic field detection and SQL execution logic
&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;close&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;connection&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;connection&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;close&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
            &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;PostgreSQL connection closed&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  &lt;strong&gt;Simple Streaming Engine: Unified Management Layer&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;Between the &lt;strong&gt;Source&lt;/strong&gt; (data input) and the &lt;strong&gt;Sink&lt;/strong&gt; (data output), we need a unified management layer to handle orchestration, monitoring, and component lifecycles.&lt;/p&gt;

&lt;p&gt;This component is the &lt;strong&gt;SimpleStreamingEngine&lt;/strong&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;Simple Streaming Engine Architecture Design&lt;/strong&gt;
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scss"&gt;&lt;code&gt;    &lt;span class="err"&gt;┌─────────────────────┐&lt;/span&gt;
    &lt;span class="err"&gt;│&lt;/span&gt;&lt;span class="nt"&gt;SimpleStreamingEngine&lt;/span&gt;&lt;span class="err"&gt;│&lt;/span&gt;  &lt;span class="err"&gt;◄──&lt;/span&gt; &lt;span class="nt"&gt;Central&lt;/span&gt; &lt;span class="nt"&gt;Manager&lt;/span&gt;
    &lt;span class="err"&gt;│&lt;/span&gt;                     &lt;span class="err"&gt;│&lt;/span&gt;
    &lt;span class="err"&gt;│&lt;/span&gt;    &lt;span class="o"&gt;+&lt;/span&gt;&lt;span class="nt"&gt;add_source&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;    &lt;span class="err"&gt;│&lt;/span&gt;
    &lt;span class="err"&gt;│&lt;/span&gt;    &lt;span class="o"&gt;+&lt;/span&gt;&lt;span class="nt"&gt;add_sink&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;      &lt;span class="err"&gt;│&lt;/span&gt;
    &lt;span class="err"&gt;│&lt;/span&gt;    &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="nt"&gt;run&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;          &lt;span class="err"&gt;│&lt;/span&gt;
    &lt;span class="err"&gt;└─────────────────────┘&lt;/span&gt;
           &lt;span class="err"&gt;│&lt;/span&gt;
           &lt;span class="err"&gt;│&lt;/span&gt; &lt;span class="nt"&gt;manages&lt;/span&gt;
           &lt;span class="err"&gt;▼&lt;/span&gt;
    &lt;span class="err"&gt;┌──────────────┐&lt;/span&gt;    &lt;span class="err"&gt;┌──────────────┐&lt;/span&gt;
    &lt;span class="err"&gt;│&lt;/span&gt;    &lt;span class="nt"&gt;Source&lt;/span&gt;    &lt;span class="err"&gt;│───▶│&lt;/span&gt;     &lt;span class="nt"&gt;Sink&lt;/span&gt;     &lt;span class="err"&gt;│&lt;/span&gt;
    &lt;span class="err"&gt;│&lt;/span&gt;              &lt;span class="err"&gt;│&lt;/span&gt;    &lt;span class="err"&gt;│&lt;/span&gt;              &lt;span class="err"&gt;│&lt;/span&gt;
    &lt;span class="err"&gt;│&lt;/span&gt; &lt;span class="nt"&gt;KafkaSource&lt;/span&gt;  &lt;span class="err"&gt;│&lt;/span&gt;    &lt;span class="err"&gt;│&lt;/span&gt;&lt;span class="nt"&gt;PostgreSQLSink&lt;/span&gt;&lt;span class="err"&gt;│&lt;/span&gt;
    &lt;span class="err"&gt;└──────────────┘&lt;/span&gt;    &lt;span class="err"&gt;└──────────────┘&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  &lt;strong&gt;Step-by-Step Breakdown of the Simple Streaming Engine Core Code&lt;/strong&gt;
&lt;/h2&gt;

&lt;h3&gt;
  
  
  &lt;strong&gt;Step 1: Initializing the Simple Streaming Engine&lt;/strong&gt;
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;SimpleStreamingEngine&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;simple-streaming-app&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_sources&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;  &lt;span class="c1"&gt;# List of Sources
&lt;/span&gt;        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_sinks&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;    &lt;span class="c1"&gt;# List of Sinks
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Key Points:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The &lt;code&gt;SimpleStreamingEngine&lt;/code&gt; manages two lists: &lt;strong&gt;Sources&lt;/strong&gt; and &lt;strong&gt;Sinks&lt;/strong&gt;.&lt;/li&gt;
&lt;li&gt;It provides unified registration interfaces for both.&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  &lt;strong&gt;Step 2: Registering Sources and Sinks&lt;/strong&gt;
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;add_source&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;source&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;BaseSource&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_sources&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;append&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;source&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;add_sink&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sink&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;BaseSink&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_sinks&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;append&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sink&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Key Points:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Simple list-based management supporting multiple sources and sinks.&lt;/li&gt;
&lt;li&gt;Follows a unified interface — any class implementing &lt;code&gt;BaseSource&lt;/code&gt; or &lt;code&gt;BaseSink&lt;/code&gt; can be registered.&lt;/li&gt;
&lt;/ul&gt;
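&lt;p&gt;Because registration only depends on the base interfaces, any conforming class can plug in. As an illustration (a hypothetical &lt;code&gt;ConsoleSink&lt;/code&gt;, not part of the article's code, with a minimal stand-in for &lt;code&gt;BaseSink&lt;/code&gt;), a new sink only needs to subclass &lt;code&gt;BaseSink&lt;/code&gt; and implement &lt;code&gt;write()&lt;/code&gt;:&lt;/p&gt;

```python
from abc import ABC, abstractmethod
from typing import Any, Dict

# Minimal stand-in for the article's BaseSink interface (sketch).
class BaseSink(ABC):
    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    def write(self, message: Dict[str, Any]): ...

    def setup(self): ...
    def close(self): ...

# Any subclass of BaseSink can be registered -- no engine changes needed.
class ConsoleSink(BaseSink):
    """A trivial sink that prints each message instead of persisting it."""
    def write(self, message: Dict[str, Any]):
        print(f"[{self.name}] {message}")

sink = ConsoleSink("console")
sink.setup()
sink.write({"value": {"order_id": 1}})
sink.close()
```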

&lt;h3&gt;
  
  
  &lt;strong&gt;Step 3: Core Execution Logic&lt;/strong&gt;
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;run&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="c1"&gt;# Initialize all sinks
&lt;/span&gt;    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;sink&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_sinks&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;sink&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;setup&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="c1"&gt;# Set message handler for each source
&lt;/span&gt;    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;source&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_sources&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;source&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;message_handler&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;_create_message_handler&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;source&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;run&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;  &lt;span class="c1"&gt;# Start consuming data
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Execution Flow:&lt;/strong&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Initialize connections for all sinks.&lt;/li&gt;
&lt;li&gt;Assign message handlers to sources.&lt;/li&gt;
&lt;li&gt;Start all sources to begin consuming data.&lt;/li&gt;
&lt;/ol&gt;
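&lt;p&gt;One subtlety in the loop above: &lt;code&gt;source.run()&lt;/code&gt; is typically a blocking consume loop, so with more than one source only the first would ever start. A threaded variant (a hypothetical extension with stand-in classes, not the article's engine) sidesteps this:&lt;/p&gt;

```python
import threading
import time

class FakeSource:
    """Stand-in source whose run() blocks, like a real Kafka consume loop."""
    def __init__(self, name, messages):
        self.name = name
        self.messages = messages
        self.message_handler = None  # injected before run()

    def run(self):
        for m in self.messages:
            self.message_handler({"source": self.name, "value": m})
            time.sleep(0.01)  # simulate waiting on the broker

received = []
sources = [FakeSource("a", [1, 2]), FakeSource("b", [3, 4])]
for s in sources:
    s.message_handler = received.append

# Start every source on its own thread so one blocking run()
# does not prevent the other sources from consuming.
threads = [threading.Thread(target=s.run, daemon=True) for s in sources]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(m["value"] for m in received))  # all four messages arrive
```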

&lt;h3&gt;
  
  
  &lt;strong&gt;Step 4: Message Handler Core Logic&lt;/strong&gt;
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;_create_message_handler&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;handler&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="c1"&gt;# Forward message to all sinks
&lt;/span&gt;        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;sink&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_sinks&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;sink&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;write&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;handler&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Detailed Data Flow Explanation:&lt;/strong&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;When SimpleStreamingEngine starts:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# Inside SimpleStreamingEngine.run()
&lt;/span&gt;&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;source&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_sources&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;source&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;message_handler&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;_create_message_handler&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;  &lt;span class="c1"&gt;# Inject handler
&lt;/span&gt;    &lt;span class="n"&gt;source&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;run&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;  &lt;span class="c1"&gt;# Start source
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;When Source receives data:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# Inside SimpleKafkaSource.run()
&lt;/span&gt;&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;  &lt;span class="c1"&gt;# Fetch messages from Kafka
&lt;/span&gt;    &lt;span class="n"&gt;formatted_message&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;key&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;value&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;message_handler&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;formatted_message&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;  &lt;span class="c1"&gt;# Call injected handler
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;When handler forwards messages:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# Handler returned by _create_message_handler()
&lt;/span&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;handler&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;  &lt;span class="c1"&gt;# message = formatted data from source
&lt;/span&gt;    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;sink&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_sinks&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;sink&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;write&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;Overall Data Flow:&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;code&gt;Kafka&lt;/code&gt; → &lt;code&gt;Source.run()&lt;/code&gt; → &lt;code&gt;message_handler()&lt;/code&gt; → &lt;code&gt;Sink.write()&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Key Point:&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The &lt;code&gt;SimpleStreamingEngine&lt;/code&gt; uses &lt;strong&gt;function injection&lt;/strong&gt; so that sources are unaware of sinks, achieving complete &lt;strong&gt;decoupling&lt;/strong&gt; between components.&lt;/p&gt;
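&lt;p&gt;The decoupling is easy to verify in isolation. In the sketch below (hypothetical toy classes mirroring the engine's structure), the source only ever calls its injected &lt;code&gt;message_handler&lt;/code&gt; and never references a sink:&lt;/p&gt;

```python
class TinySource:
    """Knows nothing about sinks -- only the injected callable."""
    def __init__(self, data):
        self.data = data
        self.message_handler = None  # injected later by the engine

    def run(self):
        for item in self.data:
            self.message_handler(item)

class ListSink:
    """Toy sink that collects messages in memory."""
    def __init__(self):
        self.items = []

    def write(self, message):
        self.items.append(message)

class TinyEngine:
    def __init__(self):
        self._sources, self._sinks = [], []

    def add_source(self, source):
        self._sources.append(source)

    def add_sink(self, sink):
        self._sinks.append(sink)

    def _create_message_handler(self):
        def handler(message):
            for sink in self._sinks:  # fan-out lives here, not in the source
                sink.write(message)
        return handler

    def run(self):
        for source in self._sources:
            source.message_handler = self._create_message_handler()
            source.run()

engine = TinyEngine()
src = TinySource(["a", "b"])
out = ListSink()
engine.add_source(src)
engine.add_sink(out)
engine.run()
print(out.items)  # → ['a', 'b']
```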

&lt;h2&gt;
  
  
  &lt;strong&gt;Why Do We Need the Simple Streaming Engine?&lt;/strong&gt;
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Decoupled Design:&lt;/strong&gt; Sources and sinks are fully independent and interchangeable.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Scalability:&lt;/strong&gt; Supports multiple sinks simultaneously (e.g., PostgreSQL + Elasticsearch).&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Unified Management:&lt;/strong&gt; Offers a consistent API for registration and execution.&lt;/li&gt;
&lt;/ul&gt;
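&lt;p&gt;The multi-sink point can be sketched with two toy in-memory sinks sharing one handler (a hypothetical &lt;code&gt;MemorySink&lt;/code&gt;, mirroring what &lt;code&gt;_create_message_handler()&lt;/code&gt; does):&lt;/p&gt;

```python
class MemorySink:
    """Toy sink: collects messages in a list."""
    def __init__(self):
        self.items = []

    def write(self, message):
        self.items.append(message)

primary, audit = MemorySink(), MemorySink()
sinks = [primary, audit]

def handler(message):
    # One incoming message is written to every registered sink.
    for sink in sinks:
        sink.write(message)

handler({"value": {"order_id": 42}})
print(primary.items == audit.items)  # → True
```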

&lt;h2&gt;
  
  
  &lt;strong&gt;Complete Simple Streaming Engine Code&lt;/strong&gt;
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;logging&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;typing&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;List&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;.source&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;BaseSource&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;.sink&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;BaseSink&lt;/span&gt;

&lt;span class="n"&gt;logger&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getLogger&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;__name__&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;SimpleStreamingEngine&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
    A minimal streaming processing engine
    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;simple-streaming-engine&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_sources&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;List&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;BaseSource&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;  &lt;span class="c1"&gt;# List of sources
&lt;/span&gt;        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_sinks&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;List&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;BaseSink&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;      &lt;span class="c1"&gt;# List of sinks
&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;add_source&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;source&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;BaseSource&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
        Register a source with the streaming engine
        &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_sources&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;append&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;source&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;add_sink&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sink&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;BaseSink&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
        Register a sink with the streaming engine
        &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_sinks&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;append&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sink&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;run&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
        Start the streaming engine and process data streams
        &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="c1"&gt;# Initialize all sinks
&lt;/span&gt;        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;sink&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_sinks&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;sink&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;setup&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

        &lt;span class="c1"&gt;# Assign message handler to each source
&lt;/span&gt;        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;source&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_sources&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;source&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;message_handler&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;_create_message_handler&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
            &lt;span class="n"&gt;source&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;run&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;  &lt;span class="c1"&gt;# Start consuming data
&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;_create_message_handler&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
        Create a message handler that dispatches data to all sinks
        &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;handler&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
            &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;sink&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_sinks&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="n"&gt;sink&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;write&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;handler&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  &lt;strong&gt;Putting It All Together: Automatic Data Flow in Action&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;At this point, we have:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Source:&lt;/strong&gt; Ingests data from Kafka&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Sink:&lt;/strong&gt; Delivers data into PostgreSQL&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;SimpleStreamingEngine:&lt;/strong&gt; Connects, manages, and orchestrates both&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Once assembled, we can kick off the &lt;strong&gt;automatic end-to-end data flow&lt;/strong&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# 1. Create the SimpleStreamingEngine
&lt;/span&gt;&lt;span class="n"&gt;engine&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;SimpleStreamingEngine&lt;/span&gt;&lt;span class="p"&gt;(...)&lt;/span&gt;

&lt;span class="c1"&gt;# 2. Create a Kafka Source
&lt;/span&gt;&lt;span class="n"&gt;orders_source&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;SimpleKafkaSource&lt;/span&gt;&lt;span class="p"&gt;(...)&lt;/span&gt;

&lt;span class="c1"&gt;# 3. Create a PostgreSQL Sink
&lt;/span&gt;&lt;span class="n"&gt;pg_sink&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;SimplePostgreSQLSink&lt;/span&gt;&lt;span class="p"&gt;(...)&lt;/span&gt;

&lt;span class="c1"&gt;# 4. Assemble and start
&lt;/span&gt;&lt;span class="n"&gt;engine&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_source&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;orders_source&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;engine&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_sink&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;pg_sink&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;engine&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;run&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;  &lt;span class="c1"&gt;# Start processing: Kafka → PostgreSQL
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  &lt;strong&gt;Summary&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;In this section, we explored the &lt;strong&gt;core implementation of the Speed Layer&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The &lt;strong&gt;Batch Layer&lt;/strong&gt; provides reliable historical data processing.&lt;/li&gt;
&lt;li&gt;The &lt;strong&gt;Speed Layer&lt;/strong&gt; delivers real-time responsiveness and streaming data handling.&lt;/li&gt;
&lt;li&gt;Without the Speed Layer, true real-time capability in the Lambda Architecture is impossible.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Through the &lt;strong&gt;Source–Sink–SimpleStreamingEngine&lt;/strong&gt; architecture, we built:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;A unified data processing interface&lt;/li&gt;
&lt;li&gt;A scalable stream processing framework&lt;/li&gt;
&lt;li&gt;A fully functional Speed Layer implementation&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;Day 5 Preview: Tackling Performance Bottlenecks&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;At first, the system runs smoothly — the consumer processes order data without issue.&lt;/p&gt;

&lt;p&gt;But under heavy traffic:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The console starts showing latency warnings.&lt;/li&gt;
&lt;li&gt;The consumer’s processing power hits its limit.&lt;/li&gt;
&lt;li&gt;Orders begin to queue up for processing.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;In the next part, we’ll explore &lt;strong&gt;Speed Layer performance challenges&lt;/strong&gt; and how to optimize for &lt;strong&gt;high-throughput&lt;/strong&gt; workloads.&lt;/p&gt;

</description>
      <category>beginners</category>
      <category>architecture</category>
      <category>python</category>
      <category>datascience</category>
    </item>
    <item>
      <title>Fixing the Half-Transaction Problem in Real-Time CDC</title>
      <dc:creator>RisingWave Labs</dc:creator>
      <pubDate>Tue, 14 Oct 2025 06:43:59 +0000</pubDate>
      <link>https://dev.to/risingwavelabs/fixing-the-half-transaction-problem-in-real-time-cdc-57o4</link>
      <guid>https://dev.to/risingwavelabs/fixing-the-half-transaction-problem-in-real-time-cdc-57o4</guid>
      <description>&lt;p&gt;Modern real-time data systems rely heavily on Change Data Capture (CDC) to power downstream stream processing. CDC continuously captures &lt;code&gt;INSERT&lt;/code&gt;, &lt;code&gt;UPDATE&lt;/code&gt;, and &lt;code&gt;DELETE&lt;/code&gt; operations from databases, allowing downstream systems to subscribe to these event streams to:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Build real-time reports and monitoring dashboards&lt;/li&gt;
&lt;li&gt;Drive risk control and alerting logic&lt;/li&gt;
&lt;li&gt;Synchronize data to data warehouses, search engines, or caches&lt;/li&gt;
&lt;li&gt;Support applications like real-time analytics, recommendations, and trade matching&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;However, a critical issue is often overlooked: &lt;strong&gt;transactional boundaries&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;Database writes are transactional. But when those changes are broken down into individual CDC events, a single transaction can be split apart in flight, so downstream systems process and expose “half-finished” data. Such transient inconsistencies can cause fluctuating reports, false alarms, and even the ingestion of incomplete data into data warehouses.&lt;/p&gt;

&lt;p&gt;This article details the common problems with transactional boundaries when processing CDC streams. We will then explore how to recognize and respect these boundaries in practice to guarantee atomicity and consistency, ensuring that materialized views and join queries always reflect the true, complete state of the database.&lt;/p&gt;

&lt;h2&gt;
  
  
  Two Examples
&lt;/h2&gt;

&lt;p&gt;Consider a typical payment scenario where a user completes a transfer, and the system executes a transaction to:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Debit an account&lt;/li&gt;
&lt;li&gt;Credit another account&lt;/li&gt;
&lt;li&gt;Insert a record into an audit table&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;If we send each &lt;code&gt;UPDATE&lt;/code&gt; and &lt;code&gt;INSERT&lt;/code&gt; event downstream as soon as it's received, a materialized view might see the debit before the credit, or the audit log before the balances are updated. For real-time risk control and monitoring systems, this could mean:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;False alarms&lt;/strong&gt;: A risk control rule sees an anomalous balance after the debit but before the credit, mistakenly flagging it as a fund loss.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Incorrect analysis&lt;/strong&gt;: A real-time report shows a decrease in total assets, leading to wrong decisions by operations or traders.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Data corruption&lt;/strong&gt;: Writing data to a warehouse mid-transaction might only capture half of the transaction, breaking consistency and foreign key constraints.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Let’s look at another join scenario. Suppose we have a real-time job that joins an &lt;code&gt;orders&lt;/code&gt; table with a &lt;code&gt;payments&lt;/code&gt; table to calculate Gross Merchandise Volume (GMV):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;o&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;order_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;o&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;amount&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;orders&lt;/span&gt; &lt;span class="n"&gt;o&lt;/span&gt;
&lt;span class="k"&gt;JOIN&lt;/span&gt; &lt;span class="n"&gt;payments&lt;/span&gt; &lt;span class="n"&gt;p&lt;/span&gt; &lt;span class="k"&gt;ON&lt;/span&gt; &lt;span class="n"&gt;o&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;order_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;order_id&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If a transaction first inserts an order and then a payment record, but the CDC stream is split by a Barrier — with the order landing in one Epoch and the payment in the next — the join’s intermediate state could return an empty result for a cycle because the payment hasn’t arrived yet. A downstream dashboard would show a &lt;strong&gt;temporary drop in GMV&lt;/strong&gt;, triggering unnecessary alerts or manual investigations.&lt;/p&gt;

&lt;p&gt;This problem is particularly critical in domains like finance, payments, e-commerce, and logistics:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Transaction Monitoring&lt;/strong&gt;: Trade matching, payment processing, and risk validation must be based on complete transactions, not partial data.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Real-time Reporting&lt;/strong&gt;: Metrics like GMV, inventory levels, and warehouse movements require transactional consistency; otherwise, they will fluctuate, misleading operations.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Risk Control &amp;amp; Alerting&lt;/strong&gt;: Partial transactions can trigger false alarms, leading to frozen funds or intercepted orders, which negatively impacts the user experience.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Therefore, preserving transactional atomicity and order within CDC streams is fundamental to ensuring the reliability of real-time computation and analysis.&lt;/p&gt;

&lt;h2&gt;
  
  
  Barriers and Epochs: The “Commit Points” of Stream Processing
&lt;/h2&gt;

&lt;p&gt;To understand the problem of transactional boundaries, we must first understand how stream processing systems organize data and ensure consistency. Unlike batch processing, stream processing deals with an infinite stream of events, making it impossible to simply “commit” everything at once as in a database transaction. To achieve consistency and fault tolerance, these systems introduce the concept of a &lt;strong&gt;Barrier&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;A Barrier is a special control message injected into the data stream. It carries no business data but propagates through the entire stream graph. When an operator receives a barrier, it first processes all data in its buffer, then snapshots its state, and finally passes the barrier downstream. Once all operators in the topology have received the barrier, it signifies that all data up to that point has been fully processed and can be safely made visible. The current state can also be persisted as a consistent checkpoint.&lt;/p&gt;

&lt;p&gt;The data between two consecutive barriers is called an &lt;strong&gt;Epoch&lt;/strong&gt;. You can think of it as a “micro-batch” in stream processing:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Within an Epoch, an operator’s state is continuously updated, but the output is buffered until the epoch is complete.&lt;/li&gt;
&lt;li&gt;The boundary of an Epoch is the barrier, which acts like a commit point in a database.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This design allows stream processing systems to achieve &lt;strong&gt;exactly-once&lt;/strong&gt; semantics (each event’s effects are applied exactly once) and recover from the latest checkpoint after a failure, avoiding data loss or duplicate processing.&lt;/p&gt;

&lt;p&gt;However, Barriers are typically injected based on a fixed &lt;strong&gt;schedule&lt;/strong&gt; (e.g., every second) or &lt;strong&gt;event count&lt;/strong&gt; (e.g., after every N records). They are completely unaware of when a transaction in the upstream database begins or ends. If a transaction’s updates span a barrier, they are split into two Epochs, causing temporary inconsistencies in downstream states and materialized views. This is the “half-a-transaction” problem.&lt;/p&gt;
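
&lt;p&gt;To make the failure mode concrete, here is a small, self-contained Python sketch (illustrative only, not RisingWave code) that replays the GMV join from the earlier example with a barrier injected mid-transaction:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;# Hypothetical illustration: a barrier that lands mid-transaction
# splits the order and its payment into different Epochs.
events = [
    ("orders", {"order_id": 1, "user_id": 42}),
    "BARRIER",  # injected on a fixed schedule, unaware of the transaction
    ("payments", {"order_id": 1, "amount": 99.0}),
    "BARRIER",
]

orders, payments, epoch = {}, {}, 0
for ev in events:
    if ev == "BARRIER":
        # At each barrier, the current join state becomes visible downstream.
        gmv = sum(p["amount"] for oid, p in payments.items() if oid in orders)
        print("epoch", epoch, "GMV:", gmv)
        epoch = epoch + 1
    else:
        table, row = ev
        if table == "orders":
            orders[row["order_id"]] = row
        else:
            payments[row["order_id"]] = row
# The first epoch sees the order but no payment, so it reports GMV 0,
# even though the upstream transaction wrote both rows atomically.
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The dip only disappears at the next barrier; this is exactly the transient inconsistency a downstream dashboard or alerting rule would observe.&lt;/p&gt;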

&lt;h2&gt;
  
  
  The Technical Challenge: Conflicts Between Barriers and Transactions
&lt;/h2&gt;

&lt;p&gt;We now understand the roles of Barriers and Epochs — they guarantee exactly-once semantics and recoverability, but they are oblivious to upstream transactional boundaries. Herein lies the problem: Barriers are injected periodically, while transaction commits are unpredictable. If a transaction spans two Barriers, it gets split into two Epochs.&lt;/p&gt;

&lt;p&gt;Imagine a typical bank transfer: first a debit, then a credit, then an audit log entry. If the debit event falls into one Epoch, while the credit and log events fall into the next, downstream materialized views and joins will show incorrect results for a period: Account A’s balance decreases, Account B’s balance remains unchanged, and the total assets mysteriously shrink. This not only triggers false risk alerts but can also corrupt the downstream data warehouse by causing ETL jobs to write incomplete data.&lt;/p&gt;

&lt;p&gt;Cross-table transactions make things even worse. CDC often splits changes from different tables into separate, parallel streams. Without unified coordination, a join operator might see an update from Table A before seeing the corresponding update from Table B, causing the join to return empty or partially matched results in some windows. For GMV calculations, order-payment reconciliation, or inventory-shipment consistency analysis, such transient inconsistencies are unacceptable.&lt;/p&gt;

&lt;p&gt;Therefore, the real challenge is this: &lt;strong&gt;how can we aggregate all events of a transaction into the same Epoch to ensure atomic commits, without breaking the fault-tolerance semantics of stream processing&lt;/strong&gt;? This also requires handling cross-table transactions by merging changes from multiple tables into a single logical stream; otherwise, downstream joins cannot guarantee consistency.&lt;/p&gt;

&lt;h2&gt;
  
  
  RisingWave’s Solution: Perceiving and Respecting Transactional Boundaries
&lt;/h2&gt;

&lt;p&gt;To ensure that CDC streams correctly reflect the transaction semantics of the upstream database, RisingWave’s design considers several key aspects: ensuring input stream order, preventing Barriers from splitting transactions, and handling cross-table transactions, schema evolution, and fault tolerance. Let’s dive into how RisingWave tackles these challenges.&lt;/p&gt;

&lt;h3&gt;
  
  
  1. Acquiring a Strictly Ordered Transaction Stream
&lt;/h3&gt;

&lt;p&gt;The first step is to ensure the input stream is correctly ordered. While the change log of an upstream database is inherently ordered, middleware like a Debezium Connector with Kafka can split changes from different tables into different topics. With transaction metadata in a separate topic, the downstream system must reassemble the transaction, which adds complexity and introduces risks of out-of-order events and delays.&lt;/p&gt;

&lt;p&gt;RisingWave embeds the &lt;strong&gt;Debezium Embedded Engine&lt;/strong&gt;, running the parsing logic directly within the connector node. This provides a single, strictly linear stream of events whose order is identical to the commit order of the upstream database, complete with transaction markers like &lt;code&gt;BEGIN&lt;/code&gt;, &lt;code&gt;COMMIT&lt;/code&gt;, and &lt;code&gt;ROLLBACK&lt;/code&gt;. As a result, the downstream system doesn't need any extra "assembly" logic; all transaction events are naturally complete and contiguous.&lt;/p&gt;

&lt;p&gt;Technically, this step eliminates the complexities of multi-threaded consumption and multi-topic merging, allowing for precise perception of transactional boundaries. This is the foundation for everything that follows.&lt;/p&gt;

&lt;h3&gt;
  
  
  2. Pausing Barriers to Keep Transactions in the Same Epoch
&lt;/h3&gt;

&lt;p&gt;With an ordered transaction stream, the next problem is preventing barriers from splitting transactions. RisingWave’s approach is straightforward: &lt;strong&gt;pause the propagation of barriers during a transaction&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;Specifically, when the Source Executor receives a &lt;code&gt;BEGIN&lt;/code&gt; message, it enters a "transaction mode." All incoming events are buffered in memory instead of being immediately dispatched downstream, and any Barrier that arrives mid-transaction is held back rather than forwarded. Only upon receiving a &lt;code&gt;COMMIT&lt;/code&gt; message are all buffered events atomically flushed and the pending Barrier released. Downstream operators then consume these events, update their state, and generate a new Epoch snapshot.&lt;/p&gt;
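
&lt;p&gt;A minimal Python sketch of this buffering behavior (a simplified illustration with invented names, not RisingWave’s actual implementation):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;class TransactionAwareSource:
    """Sketch of a source that pauses barriers while a transaction is open."""

    def __init__(self, downstream):
        self.downstream = downstream    # callable that receives released events
        self.buffer = []                # data events of the open transaction
        self.held_barriers = []         # barriers paused during the transaction
        self.in_txn = False

    def on_event(self, event):
        kind = event["kind"]
        if kind == "BEGIN":
            self.in_txn = True
        elif kind == "COMMIT":
            for e in self.buffer:       # flush the whole transaction at once
                self.downstream(e)
            self._end_txn()
        elif kind == "ROLLBACK":
            self._end_txn()             # discard the aborted transaction
        elif kind == "BARRIER" and self.in_txn:
            self.held_barriers.append(event)   # pause: never split the txn
        elif self.in_txn:
            self.buffer.append(event)   # hold data events until commit
        else:
            self.downstream(event)      # outside a transaction, pass through

    def _end_txn(self):
        self.buffer = []
        self.in_txn = False
        for b in self.held_barriers:    # release paused barriers only now
            self.downstream(b)
        self.held_barriers = []
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Feeding it &lt;code&gt;BEGIN&lt;/code&gt;, two updates, a barrier, and &lt;code&gt;COMMIT&lt;/code&gt; delivers both updates before the barrier, so the whole transaction lands in a single Epoch.&lt;/p&gt;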

&lt;p&gt;The key advantages of this design are its simplicity and correctness:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Atomicity&lt;/strong&gt;: All events within a transaction are either completely invisible or become visible all at once.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Consistency&lt;/strong&gt;: Materialized views and joins always reflect the true state of the database, without any intermediate “half-transaction” states.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Recoverability&lt;/strong&gt;: Since Barriers only advance after a transaction is complete, checkpoints always align with transaction boundaries. Upon recovery, transactions won’t be lost or re-applied.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Of course, this approach has a trade-off: a particularly large transaction can delay Barrier propagation, causing an Epoch to last longer than usual, which in turn delays checkpoints and downstream visibility. However, we believe this is a reasonable trade-off. A delay of a few hundred milliseconds, or even a few seconds, is more acceptable than exposing partial transactions.&lt;/p&gt;

&lt;h3&gt;
  
  
  3. Multi-Table CDC Source for Cross-Table Transaction Guarantees
&lt;/h3&gt;

&lt;p&gt;Ensuring single-table transactionality is not enough. In real-world business scenarios, multi-table transactions are common: updating an order’s status, inserting a payment record, and modifying inventory all within a single transaction. If these tables are ingested using different sources, the events within the transaction will be scattered across different streams, and downstream joins cannot guarantee consistency.&lt;/p&gt;

&lt;p&gt;To address this, RisingWave introduces the multi-table CDC source. Users can specify multiple tables within a single source, and RisingWave will use one connection to subscribe to their change logs. This way, all events within a transaction appear in the same physical stream with their order fully preserved.&lt;/p&gt;

&lt;p&gt;For example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;SOURCE&lt;/span&gt; &lt;span class="n"&gt;my_cdc&lt;/span&gt; &lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="n"&gt;connector&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'postgres-cdc'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="k"&gt;database&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'mydb'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="k"&gt;table&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'public.orders, public.payments, public.inventory'&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;orders&lt;/span&gt; &lt;span class="p"&gt;(...)&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="k"&gt;SOURCE&lt;/span&gt; &lt;span class="n"&gt;my_cdc&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="s1"&gt;'public.orders'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;payments&lt;/span&gt; &lt;span class="p"&gt;(...)&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="k"&gt;SOURCE&lt;/span&gt; &lt;span class="n"&gt;my_cdc&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="s1"&gt;'public.payments'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;inventory&lt;/span&gt; &lt;span class="p"&gt;(...)&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="k"&gt;SOURCE&lt;/span&gt; &lt;span class="n"&gt;my_cdc&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="s1"&gt;'public.inventory'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, if a transaction updates all three tables simultaneously, these changes will be buffered and flushed together, landing in the same Epoch. Downstream joins and aggregations will see an atomically consistent result.&lt;/p&gt;

&lt;h3&gt;
  
  
  4. Streaming Graph Implementation Details
&lt;/h3&gt;

&lt;p&gt;From an implementation perspective, this means that when RisingWave builds a streaming graph, it places the source node and all its dependent materialized views into the same “transaction domain.” A Barrier is intercepted upon entering this domain and only propagates downstream after the transaction is complete. This ensures the integrity of the transaction and keeps checkpoints consistently aligned.&lt;/p&gt;

&lt;p&gt;RisingWave’s Meta Service manages the lifecycle of these “transaction domains.” During a DDL transaction, if a user creates multiple tables that reference the same CDC source, the system recognizes this and runs them under the same physical source job, sharing a single connection and barrier stream.&lt;/p&gt;

&lt;h3&gt;
  
  
  5. Schema Evolution and Transactional Boundaries
&lt;/h3&gt;

&lt;p&gt;Transaction awareness also simplifies schema evolution. When an upstream table’s schema changes (e.g., adding a new column), a user can execute &lt;code&gt;ALTER TABLE&lt;/code&gt; in RisingWave. The system will replan the execution graph at the next transaction boundary, ensuring the schema change and data processing remain consistent. This avoids the awkward situation where half a transaction uses the old schema and the other half uses the new one.&lt;/p&gt;

&lt;h3&gt;
  
  
  6. Trade-offs: Large Transactions and Latency
&lt;/h3&gt;

&lt;p&gt;While pausing Barriers guarantees transactional atomicity, it has an inevitable side effect: if a transaction is very large, the time needed to receive and buffer all its events might exceed a normal Barrier interval. This forces events that could have been spread across multiple epochs to be processed atomically in a single, longer epoch, thus increasing the system’s end-to-end latency.&lt;/p&gt;

&lt;p&gt;We evaluated several options during the design phase:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Propagate Barriers early&lt;/strong&gt;: Send the Barrier downstream first and then send the data after the transaction commits. This maintains a fixed checkpoint interval but exposes an incomplete transaction downstream, destroying consistency.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Optimistic execution with transaction rollback&lt;/strong&gt;: Send events downstream for preliminary computation, confirm visibility upon transaction commit, or roll back otherwise. This requires almost all stateful operators to support multi-version storage and rollback logic, which is extremely complex.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Strict buffering with delayed Barriers&lt;/strong&gt;: Simple and direct, ensuring a one-to-one alignment between transactions and Epochs at the cost of increased latency.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;RisingWave chose the third option because it guarantees consistency without introducing additional state management complexity. We found that in real-world scenarios, large and cross-table transactions are uncommon, and most transactions complete within a normal Barrier interval, so the added latency is acceptable. For the rare, extremely large transaction, trading latency for atomicity is a reasonable engineering trade-off.&lt;/p&gt;

&lt;h3&gt;
  
  
  7. Recovery and Fault Tolerance
&lt;/h3&gt;

&lt;p&gt;Another benefit of this design is simpler recovery logic. When the system fails, RisingWave recovers from the snapshot corresponding to the most recent Barrier, which is always located after a transaction boundary. This ensures that committed transactions are not re-applied and that no partial transaction data is lost upon recovery. In contrast, if a Barrier were to land in the middle of a transaction, recovery would require special logic to skip events that have already been sent, making the process complex and error-prone.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion: Making CDC Reliable
&lt;/h2&gt;

&lt;p&gt;Correctly processing CDC data isn’t just about pushing individual change events downstream; it’s about ensuring that the combination of these changes is consistent with the true state of the database. Without respecting transactional boundaries, downstream systems may see intermediate states that never actually existed, leading to fluctuating reports, false risk alerts, data inconsistencies, and even business incidents.&lt;/p&gt;

&lt;p&gt;RisingWave’s design philosophy is simple:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Use the Debezium Embedded Engine to obtain a strictly ordered transaction stream.&lt;/li&gt;
&lt;li&gt;Pause Barriers during transactions to align transactions with Epochs.&lt;/li&gt;
&lt;li&gt;Support multi-table CDC sources to handle cross-table transactions within a single logical stream.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This way, materialized views and join queries always reflect the true state of the database; what you see at any moment is a complete, atomic result. This enhances data reliability and makes real-time analytics, risk control, trade matching, and monitoring scenarios dependable.&lt;/p&gt;

&lt;p&gt;For engineers, this means you can confidently use RisingWave to drive real-time reports, risk control, and trading decisions without worrying about “half a transaction” wandering through your system and breaking business logic. In other words, we bring the atomicity of databases to the streaming world.&lt;/p&gt;

</description>
      <category>programming</category>
      <category>productivity</category>
      <category>opensource</category>
      <category>datascience</category>
    </item>
    <item>
      <title>Why Engineers Try HTTP for Streaming — And Where It Breaks</title>
      <dc:creator>RisingWave Labs</dc:creator>
      <pubDate>Mon, 13 Oct 2025 10:09:47 +0000</pubDate>
      <link>https://dev.to/risingwavelabs/why-engineers-try-http-for-streaming-and-where-it-breaks-1blg</link>
      <guid>https://dev.to/risingwavelabs/why-engineers-try-http-for-streaming-and-where-it-breaks-1blg</guid>
      <description>&lt;p&gt;In the &lt;a href="https://dev.to/__354f265b41dafa0d901b/why-do-data-pipelines-need-streaming-isnt-batch-processing-enough-17l4"&gt;previous article&lt;/a&gt;, we discussed the evolution of stream processing engines. Today, let’s talk about an interesting phenomenon: &lt;strong&gt;why do engineers often think of HTTP when faced with real-time processing requirements?&lt;/strong&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;HTTP: The engineer’s Swiss Army knife&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;A product manager excitedly announces: "We need a &lt;strong&gt;real-time order statistics&lt;/strong&gt; system!"&lt;/p&gt;

&lt;p&gt;An engineer’s first instinct would be: "Just POST new orders to the stats service, update the counts, respond with success, done."&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Why this instinct?&lt;/strong&gt; Because HTTP feels like a Swiss Army knife for engineers:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Familiarity&lt;/strong&gt;: Both frontend and backend engineers use it daily.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Rich tooling&lt;/strong&gt;: Postman, curl, Swagger—whatever you need, it’s there.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Minimal learning curve&lt;/strong&gt;: Way easier than learning Kafka.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Easy debugging&lt;/strong&gt;: A single &lt;code&gt;curl&lt;/code&gt; command is enough to test.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This naturally leads to a &lt;strong&gt;synchronous model&lt;/strong&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;A closer look at the HTTP approach&lt;/strong&gt;
&lt;/h2&gt;

&lt;h3&gt;
  
  
  &lt;strong&gt;1. Architecture&lt;/strong&gt;
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Backend (Producer)&lt;/strong&gt;: POSTs new orders to the stats service.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Stats service (Server)&lt;/strong&gt;: Receives a request → updates in-memory counters → responds with success.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Dashboard (Consumer)&lt;/strong&gt;: Sends a GET request to &lt;code&gt;/stats&lt;/code&gt; every few seconds to fetch the latest data.
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Flowchart:
Shop System --sync HTTP--&amp;gt; Stats Service &amp;lt;--sync HTTP-- Dashboard
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
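
&lt;p&gt;A bare-bones sketch of this synchronous design (illustrative names only, with a &lt;code&gt;time.sleep&lt;/code&gt; standing in for the network hop):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import time

stats = {"orders": 0, "revenue": 0.0}

def stats_service_update(order):
    """Stands in for the stats service's HTTP POST handler."""
    time.sleep(0.01)                      # pretend network plus processing time
    stats["orders"] = stats["orders"] + 1
    stats["revenue"] = stats["revenue"] + order["amount"]
    return {"status": 200}

def place_order(order):
    # ... write the order to the shop database ...
    resp = stats_service_update(order)    # blocks until the stats service responds
    return resp["status"] == 200          # order latency now includes stats latency

place_order({"id": 1, "amount": 25.0})
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The catch is that &lt;code&gt;place_order&lt;/code&gt; blocks on the stats call: every millisecond the stats service takes is added to the customer-facing order path.&lt;/p&gt;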



&lt;p&gt;Looks perfect, right? Any engineer could implement this in 5 minutes with no new tech to learn.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;But…&lt;/strong&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;strong&gt;2. Problems emerge when traffic grows&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;The HTTP synchronous model works fine under &lt;strong&gt;low traffic&lt;/strong&gt;, but when the system scales:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;(1) Tight coupling&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The shop system has to wait for the stats service to process the request before completing the order. If the stats service slows down, the entire order API slows down.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;(2) Traffic spike challenge&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;During high traffic, say 100,000 orders in a short period, the stats service would need to handle 100,000 concurrent HTTP requests—an obvious bottleneck.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;(3) Fault tolerance issues&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Network glitches or service restarts can result in lost stats, requiring additional retry or compensation mechanisms.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;strong&gt;3. Why switch to an asynchronous model?&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;The core problem with the synchronous model is that &lt;strong&gt;transmission and processing are locked to the same timeline&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;If one side slows down, the other suffers—like two people running tied together with a chain.&lt;/p&gt;

&lt;p&gt;A better approach is to introduce a &lt;strong&gt;middle layer (message queue/event streaming platform)&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Backend&lt;/strong&gt;: Just push new orders into the event stream, no need to wait for stats processing.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Stats service&lt;/strong&gt;: Pull data from the event stream and process at its own pace.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Three major benefits of asynchronous architecture:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Stress-resistant&lt;/strong&gt;: During spikes, queue orders first and process them gradually.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Decoupled&lt;/strong&gt;: Shop system and stats service don’t block each other.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Fault-tolerant&lt;/strong&gt;: Event streams can persist data, so services can recover after crashes.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;Conclusion&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;The HTTP synchronous model is a great starting point and &lt;strong&gt;works well for small-scale systems&lt;/strong&gt;. But as traffic grows, synchronous processing introduces coupling and scaling challenges.&lt;/p&gt;

&lt;p&gt;That’s when &lt;strong&gt;asynchronous, event-driven architectures&lt;/strong&gt; come into play. By decoupling producers and consumers via a middle layer, the system becomes much more resilient.&lt;/p&gt;

&lt;p&gt;In the next article, we’ll dive into the classic &lt;strong&gt;Lambda Architecture&lt;/strong&gt;, exploring how it elegantly addresses these problems while ensuring &lt;strong&gt;both speed and accuracy&lt;/strong&gt;.&lt;/p&gt;

</description>
      <category>programming</category>
      <category>productivity</category>
      <category>discuss</category>
      <category>datascience</category>
    </item>
    <item>
      <title>Most people know what streaming frameworks can do — but few dig into why they work.
Shih-min (Stan) Hsu, Team Lead of Data Engineering at SHOPLINE is embarking on a 30-day challenge to Dive Deep into Streaming Pipelines.</title>
      <dc:creator>RisingWave Labs</dc:creator>
      <pubDate>Fri, 10 Oct 2025 04:32:13 +0000</pubDate>
      <link>https://dev.to/risingwavelabs/most-people-know-what-streaming-frameworks-can-do-but-few-dig-into-why-they-work-shih-min-4b66</link>
      <guid>https://dev.to/risingwavelabs/most-people-know-what-streaming-frameworks-can-do-but-few-dig-into-why-they-work-shih-min-4b66</guid>
      <description>&lt;div class="crayons-card c-embed text-styles text-styles--secondary"&gt;
    &lt;div class="c-embed__content"&gt;
        &lt;div class="c-embed__cover"&gt;
          &lt;a href="https://dev.to/risingwavelabs/why-do-data-pipelines-need-streaming-isnt-batch-processing-enough-17l4" class="c-link align-middle" rel="noopener noreferrer"&gt;
            &lt;img alt="" src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fzwvvuts3e159wxhvnwt4.png" height="420" class="m-0" width="800"&gt;
          &lt;/a&gt;
        &lt;/div&gt;
      &lt;div class="c-embed__body"&gt;
        &lt;h2 class="fs-xl lh-tight"&gt;
          &lt;a href="https://dev.to/risingwavelabs/why-do-data-pipelines-need-streaming-isnt-batch-processing-enough-17l4" rel="noopener noreferrer" class="c-link"&gt;
            Why Do Data Pipelines Need Streaming — Isn’t Batch Processing Enough? - DEV Community
          &lt;/a&gt;
        &lt;/h2&gt;
          &lt;p class="truncate-at-3"&gt;
            About this series   This time, I’m taking on a 30-day challenge to focus on streaming...
          &lt;/p&gt;
        &lt;div class="color-secondary fs-s flex items-center"&gt;
            &lt;img alt="favicon" class="c-embed__favicon m-0 mr-2 radius-0" src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8j7kvp660rqzt99zui8e.png" width="300" height="299"&gt;
          dev.to
        &lt;/div&gt;
      &lt;/div&gt;
    &lt;/div&gt;
&lt;/div&gt;


</description>
      <category>beginners</category>
      <category>discuss</category>
      <category>datascience</category>
      <category>database</category>
    </item>
  </channel>
</rss>
