<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: The Team @ Redpanda</title>
    <description>The latest articles on DEV Community by The Team @ Redpanda (@redpandadata).</description>
    <link>https://dev.to/redpandadata</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F815304%2F42869667-db86-4e0f-beac-d3ce6f812dea.png</url>
      <title>DEV Community: The Team @ Redpanda</title>
      <link>https://dev.to/redpandadata</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/redpandadata"/>
    <language>en</language>
    <item>
      <title>Using Bytewax to build an anomaly detection app</title>
      <dc:creator>The Team @ Redpanda</dc:creator>
      <pubDate>Wed, 05 Oct 2022 15:42:15 +0000</pubDate>
      <link>https://dev.to/redpanda-data/using-bytewax-to-build-an-anomaly-detection-app-doj</link>
      <guid>https://dev.to/redpanda-data/using-bytewax-to-build-an-anomaly-detection-app-doj</guid>
      <description>&lt;p&gt;Highly scalable distributed processing has been a traditionally difficult task for any team to achieve. However, with technologies like Timely Dataflow and Redpanda, today it is easier than ever to build real-time fault-tolerant and easy-to-scale systems.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://timelydataflow.github.io/timely-dataflow/" rel="noopener noreferrer"&gt;Timely Dataflow&lt;/a&gt; is a low-latency cyclic dataflow computational model, meaning it allows you to build data-parallel systems that can be scaled up from one thread on your laptop to a distributed execution environment across a cluster of computers.&lt;/p&gt;

&lt;p&gt;A great use case for Timely Dataflow is detecting anomalies in real-time data. Redpanda — as an Apache Kafka&lt;sup&gt;Ⓡ&lt;/sup&gt; API-compatible data store — enables us to easily create an application that can monitor data coming from multiple sources in real time.&lt;/p&gt;

&lt;p&gt;Live anomaly detection usually requires a pre-trained model, but for this project we opted for an online algorithm that learns as the data arrives.&lt;/p&gt;

&lt;p&gt;Bytewax is a Python-native binding to the Rust-based Timely Dataflow library, which allows us to quickly build powerful applications in the same language as our mock producer.&lt;/p&gt;

&lt;p&gt;Integrating Bytewax with Redpanda lets us harness the power of the Rust-based Timely Dataflow framework; combined with the robustness and developer-friendliness of Redpanda, this makes it quick to build real-time data processing systems.&lt;/p&gt;

&lt;p&gt;The main flow of the application can be described using the following diagram:&lt;/p&gt;

&lt;p&gt;&lt;a href="//images.ctfassets.net/paqvtpyf8rwu/4I92CPSBeEDBpvukpU7ILM/534bd35bd499ce258e6d8e951bf274d8/anomaly_detection_redpanda_bytewax.png" class="article-body-image-wrapper"&gt;&lt;img src="//images.ctfassets.net/paqvtpyf8rwu/4I92CPSBeEDBpvukpU7ILM/534bd35bd499ce258e6d8e951bf274d8/anomaly_detection_redpanda_bytewax.png" alt="anomaly detection redpanda bytewax"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Bytewax reads from the Redpanda topic, calculates the anomalies and produces the results to a different topic. As mentioned above, there is no distinction between Bytewax and Timely Dataflow in the diagram because Bytewax is a wrapper around the Rust Timely Dataflow library.&lt;/p&gt;

&lt;p&gt;For the sensor data, we use a mock data generator that generates random air quality values and pushes them into a topic in Redpanda.&lt;/p&gt;

&lt;p&gt;To calculate the outliers, we will use a five-second window to aggregate the data coming from the sensors, detect anomalies based on these window averages, and push them into a new Redpanda topic. Alerting can be easily set up using the anomaly topic (watch out for a later post on this).&lt;/p&gt;

&lt;p&gt;You can access the code used in this demo &lt;a href="https://github.com/redpanda-data-blog/2022-bytewax-redpanda-air-quality-monitoring"&gt;in this GitHub repo&lt;/a&gt;. &lt;/p&gt;

&lt;h2&gt;
  
  
  The tech
&lt;/h2&gt;

&lt;p&gt;To build our real-time monitoring application, we will use Redpanda for storage and Bytewax for the anomaly detection.&lt;/p&gt;

&lt;h3&gt;
  
  
  Redpanda
&lt;/h3&gt;

&lt;p&gt;Redpanda is a source-available, Kafka API-compatible data store. This API compatibility allows us to use it in place of Kafka very quickly. In our case, the producer- and consumer-side code can stay exactly the same as if it were targeting Kafka!&lt;/p&gt;

&lt;h3&gt;
  
  
  Bytewax
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://github.com/bytewax/bytewax" rel="noopener noreferrer"&gt;Bytewax&lt;/a&gt; is an up-and-coming data processing framework that is built on top of Timely Dataflow, which is a cyclic dataflow computational model. At a high-level, dataflow programming is a programming paradigm where program execution is conceptualized as data flowing through a series of operator based steps. The Timely Dataflow library is written in Rust which makes it blazingly fast and easy to use due to the language's great Python bindings.&lt;/p&gt;

&lt;h2&gt;
  
  
  Setting up Redpanda
&lt;/h2&gt;

&lt;p&gt;To make setting up Redpanda for this project super easy, we can use the provided docker-compose.yml file. In this file, we define a Redpanda service as follows:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;redpanda&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;docker.vectorized.io/vectorized/redpanda:v22.1.4&lt;/span&gt;
  &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;redpanda&lt;/span&gt;
  &lt;span class="na"&gt;command&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;redpanda start&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--overprovisioned&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--smp &lt;/span&gt;&lt;span class="m"&gt;1&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--memory 1G&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--reserve-memory 0M&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--node-id &lt;/span&gt;&lt;span class="m"&gt;0&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--check=false&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--kafka-addr 0.0.0.0:9092&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--advertise-kafka-addr redpanda:9092&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--pandaproxy-addr 0.0.0.0:8082&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--advertise-pandaproxy-addr redpanda:8082&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--set redpanda.enable_transactions=true&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--set redpanda.enable_idempotence=true&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--set redpanda.auto_create_topics_enabled=true&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This will start a Redpanda instance which we can access on port &lt;code&gt;9092&lt;/code&gt;.&lt;br&gt;
Let's create this service first using &lt;code&gt;docker-compose up -d redpanda&lt;/code&gt;. This will start the container in the background.&lt;/p&gt;

&lt;p&gt;After the container is started we can run &lt;code&gt;docker exec -it redpanda /bin/bash&lt;/code&gt; to access a shell inside the container, which allows us to use &lt;code&gt;rpk&lt;/code&gt;, the official bundled CLI application for Redpanda clusters.&lt;/p&gt;

&lt;p&gt;The CLI allows us to create topics with a simple command, so let's create two. One will store the data coming from our mock sensors, and one will store the calculated anomalies.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;rpk create topic &lt;span class="nt"&gt;--topic&lt;/span&gt; air-quality &lt;span class="nt"&gt;--brokers&lt;/span&gt; 127.0.0.1.9092
rpk create topic &lt;span class="nt"&gt;--topic&lt;/span&gt; air-quality-anomalies &lt;span class="nt"&gt;--brokers&lt;/span&gt; 127.0.0.1.9092
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To verify, we can list the topics with &lt;code&gt;rpk topic list --brokers 127.0.0.1:9092&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;In an environment where we are unable to access the cluster manually to create topics, we can automate their creation from Python using the Kafka Admin API.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;admin_client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;KafkaAdminClient&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;bootstrap_servers&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;BOOTSTRAP_SERVERS&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;client_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"air-quality-producer"&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;topics&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"air-quality"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"air-quality-anomalies"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
&lt;span class="c1"&gt;# Check if topics already exist first
&lt;/span&gt;&lt;span class="n"&gt;existing_topics&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;admin_client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;list_topics&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;topics&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;existing_topics&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;admin_client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;create_topics&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;NewTopic&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;num_partitions&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;replication_factor&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)]&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Data Ingestion
&lt;/h2&gt;

&lt;p&gt;The script we will use to generate the data is located at &lt;code&gt;producer/main.py&lt;/code&gt;. The Kafka producer's configuration looks like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;producer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;KafkaProducer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;bootstrap_servers&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"127.0.0.1:9092"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The script will start five asynchronous workers that generate data every three seconds (configurable, but a short, predictable interval helps with the demo) with the following minimal schema:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"timestamp"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"2022-07-15 11:34:32.1134000"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"value"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;23&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
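
&lt;p&gt;As a rough illustration (the real producer lives in the repo, and &lt;code&gt;make_reading&lt;/code&gt; here is a hypothetical name), a single mock reading matching this schema could be generated like so:&lt;/p&gt;

```python
import datetime
import json
import random

def make_reading():
    # One mock air-quality reading matching the schema above.
    return {
        "timestamp": str(datetime.datetime.now()),
        "value": random.randint(0, 100),
    }

# Each worker would serialize a reading like this before sending it.
payload = json.dumps(make_reading()).encode("utf-8")
```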



&lt;p&gt;All five workers produce records to the same topic, so to identify which sensor a record came from, we attach a key to each record produced into Redpanda. This is done by adding a &lt;code&gt;key&lt;/code&gt; argument to the &lt;code&gt;producer.send&lt;/code&gt; function call:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;send&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="s"&gt;"air-quality"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;sensor_name&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;encode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"utf-8"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;dumps&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;encode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"utf-8"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If we want to test the application outside of Docker we can run the producer with the &lt;code&gt;python producer/main.py&lt;/code&gt; command.&lt;/p&gt;

&lt;p&gt;After a few seconds data should be flowing into Redpanda.&lt;br&gt;
The output of the script will look something like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Sent data to Redpanda: {'timestamp': '2022-07-18 16:20:06.799306', 'value': 95}, sleeping for 3 seconds
Sent data to Redpanda: {'timestamp': '2022-07-18 16:20:06.910629', 'value': 98}, sleeping for 3 seconds
Sent data to Redpanda: {'timestamp': '2022-07-18 16:20:07.020454', 'value': 49}, sleeping for 3 seconds
Sent data to Redpanda: {'timestamp': '2022-07-18 16:20:07.127894', 'value': 59}, sleeping for 3 seconds
Sent data to Redpanda: {'timestamp': '2022-07-18 16:20:07.243341', 'value': 81}, sleeping for 3 seconds
Sent data to Redpanda: {'timestamp': '2022-07-18 16:20:11.812876', 'value': 13}, sleeping for 3 seconds
Sent data to Redpanda: {'timestamp': '2022-07-18 16:20:11.913514', 'value': 15}, sleeping for 3 seconds
Sent data to Redpanda: {'timestamp': '2022-07-18 16:20:12.023514', 'value': 43}, sleeping for 3 seconds
Sent data to Redpanda: {'timestamp': '2022-07-18 16:20:12.132398', 'value': 39}, sleeping for 3 seconds
Sent data to Redpanda: {'timestamp': '2022-07-18 16:20:12.247009', 'value': 38}, sleeping for 3 seconds
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To verify that the data is arriving in Redpanda, we can inspect the topic from inside the container using the &lt;code&gt;rpk&lt;/code&gt; command.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker &lt;span class="nb"&gt;exec&lt;/span&gt; &lt;span class="nt"&gt;-it&lt;/span&gt; redpanda /bin/bash
rpk topic consume air-quality &lt;span class="nt"&gt;--brokers&lt;/span&gt; 127.0.0.1:9092
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This will show us all the records in the topic so far.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "topic": "air-quality",
  "key": "Sensor3",
  "value": "{\"timestamp\": \"2022-07-18 16:20:17.024915\", \"value\": 4}",
  "timestamp": 1658154017025,
  "partition": 0,
  "offset": 257
}
{
  "topic": "air-quality",
  "key": "Sensor4",
  "value": "{\"timestamp\": \"2022-07-18 16:20:17.133643\", \"value\": 21}",
  "timestamp": 1658154017133,
  "partition": 0,
  "offset": 258
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This fake data producer application is also part of the &lt;code&gt;docker-compose.yml&lt;/code&gt; configuration file, so when you start all containers at once, you won't have to launch the script manually. In the &lt;code&gt;entrypoint.sh&lt;/code&gt; script, we poll Redpanda through its exposed admin API, and once it reports &lt;em&gt;ready&lt;/em&gt;, the producer app starts generating data.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="o"&gt;[[&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="si"&gt;$(&lt;/span&gt;curl &lt;span class="nt"&gt;-s&lt;/span&gt; localhost:9644/v1/status/ready&lt;span class="si"&gt;)&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="s2"&gt;"{&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;status&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;:&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;ready&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;}"&lt;/span&gt; &lt;span class="o"&gt;]]&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;do &lt;/span&gt;&lt;span class="nb"&gt;sleep &lt;/span&gt;5&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;done
&lt;/span&gt;python /app/main.py
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Machine learning with Bytewax
&lt;/h2&gt;

&lt;p&gt;Let's do some machine learning using our data from the Redpanda topic with Bytewax!&lt;/p&gt;

&lt;p&gt;The code located in &lt;code&gt;consumer/main.py&lt;/code&gt; is the consumer-side code that consumes the data from the Redpanda topic, runs the aggregation on a five-second window, calculates the anomalies using an online (unsupervised) learning algorithm, and pushes them into the Redpanda anomaly topic.&lt;/p&gt;

&lt;p&gt;The Consumer configuration looks like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="s"&gt;"air-quality"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;bootstrap_servers&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"127.0.0.1:9092"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;auto_offset_reset&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"earliest"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;code&gt;auto_offset_reset="earliest"&lt;/code&gt; option makes the consumer start from the beginning of the topic. With some Bytewax helper functions, we sort our incoming data and group it into five-second windows before returning them as a generator.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt; &lt;span class="c1"&gt;# Ensure inputs are sorted by timestamp
&lt;/span&gt;&lt;span class="n"&gt;sorted_inputs&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;sorted_window&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;get_records&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;timedelta&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;seconds&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="k"&gt;lambda&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"timestamp"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="c1"&gt;# All inputs within a tumbling window are part of the same epoch.
&lt;/span&gt;&lt;span class="n"&gt;tumbling_window&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;tumbling_epoch&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;sorted_inputs&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;timedelta&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;seconds&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="k"&gt;lambda&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"timestamp"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
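
&lt;p&gt;To see what these helpers accomplish, here is a minimal pure-Python stand-in that sorts records by timestamp and buckets them into five-second tumbling windows. It assumes already-parsed &lt;code&gt;datetime&lt;/code&gt; timestamps and is not the actual Bytewax implementation:&lt;/p&gt;

```python
import datetime
from itertools import groupby

WINDOW_SECONDS = 5

def tumbling_windows(records):
    # Sort records by timestamp, then bucket them into consecutive
    # five-second epochs: a pure-Python stand-in for sorted_window
    # plus tumbling_epoch.
    ordered = sorted(records, key=lambda r: r["timestamp"])

    def epoch(record):
        return int(record["timestamp"].timestamp()) // WINDOW_SECONDS

    return [list(group) for _, group in groupby(ordered, key=epoch)]

t0 = datetime.datetime(2022, 7, 18, 16, 20, 0)
records = [
    {"timestamp": t0 + datetime.timedelta(seconds=s), "value": v}
    for s, v in [(7, 13), (1, 95), (3, 49)]
]
windows = tumbling_windows(records)
# windows holds two epochs: values 95 and 49 in the first, 13 in the second.
```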



&lt;p&gt;The high-level flow of the Timely Dataflow pipeline looks like this in Python:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# Create a dataflow
&lt;/span&gt;&lt;span class="n"&gt;flow&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Dataflow&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="c1"&gt;# Group by sensor name
&lt;/span&gt;&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;group_by_sensor&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="c1"&gt;# Calculate the rolling average of Air Quality values
&lt;/span&gt;&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;calculate_avg&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="c1"&gt;# Calculate anomaly score in tumbling window of 5 seconds
&lt;/span&gt;&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;stateful_map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;step_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"anomaly_detector"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="k"&gt;lambda&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;AnomalyDetector&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;n_trees&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;height&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;window_size&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;seed&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;42&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;mapper&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;AnomalyDetector&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;update&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="c1"&gt;# Annotate with anomaly
&lt;/span&gt;&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;annotate_with_anomaly&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="c1"&gt;# Send to anomaly Redpanda
&lt;/span&gt;&lt;span class="n"&gt;flow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;capture&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;First, we instantiate a &lt;code&gt;Dataflow&lt;/code&gt; object, then we sequentially apply the following transformations:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Group the data by sensor name&lt;/li&gt;
&lt;li&gt;Calculate the rolling average of Air Quality values&lt;/li&gt;
&lt;li&gt;Calculate anomaly score in this tumbling window of 5 seconds&lt;/li&gt;
&lt;li&gt;Annotate with an anomaly flag&lt;/li&gt;
&lt;li&gt;Send the data to the anomaly Redpanda topic&lt;/li&gt;
&lt;/ol&gt;
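
&lt;p&gt;The &lt;code&gt;group_by_sensor&lt;/code&gt; and &lt;code&gt;calculate_avg&lt;/code&gt; helpers are defined in the repo; as a rough sketch of what such steps might do (not the exact implementation), they could look like this:&lt;/p&gt;

```python
def group_by_sensor(epoch_records):
    # Collect each sensor's values within one window, keyed by sensor name.
    grouped = {}
    for record in epoch_records:
        grouped.setdefault(record["sensor_name"], []).append(record["value"])
    return grouped

def calculate_avg(grouped):
    # Average each sensor's values for the window, mirroring the
    # "aiq_avg" field seen in the output records.
    return {
        sensor: {"aiq_avg": [sum(values) / len(values)]}
        for sensor, values in grouped.items()
    }

window = [
    {"sensor_name": "Sensor1", "value": 90},
    {"sensor_name": "Sensor1", "value": 100},
    {"sensor_name": "Sensor2", "value": 40},
]
averages = calculate_avg(group_by_sensor(window))
```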

&lt;p&gt;The Producer object is configured like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;producer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;KafkaProducer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;value_serializer&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="k"&gt;lambda&lt;/span&gt; &lt;span class="n"&gt;m&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;dumps&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;m&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;encode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"utf-8"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;bootstrap_servers&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"127.0.0.1:9092"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The actual anomaly detection is done in the &lt;code&gt;AnomalyDetector&lt;/code&gt; class.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;AnomalyDetector&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;anomaly&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;HalfSpaceTrees&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;update&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;normalized_value&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;float&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"aiq_avg"&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt;
        &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;learn_one&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;&lt;span class="s"&gt;"value"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;normalized_value&lt;/span&gt;&lt;span class="p"&gt;})&lt;/span&gt;
        &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"score"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;score_one&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;&lt;span class="s"&gt;"value"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;normalized_value&lt;/span&gt;&lt;span class="p"&gt;})&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Half-space trees are an online variant of isolation forests. Before feeding the data to the anomaly detector, we normalize the values to the range [0, 1], as the algorithm expects inputs in this range.&lt;/p&gt;
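
&lt;p&gt;Since the mock sensor values fall between 0 and 100, a simple division suffices. Here is a small sketch of such a normalization step (illustrative, not the repo's exact code):&lt;/p&gt;

```python
def normalize(aiq_value, max_value=100.0):
    # Map a raw reading (0-100 in this demo) onto [0, 1], clamping
    # any stray out-of-range values.
    return min(max(float(aiq_value) / max_value, 0.0), 1.0)

scores = [normalize(v) for v in (94, 6, 150)]
# scores is [0.94, 0.06, 1.0]
```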

&lt;p&gt;Running the script with &lt;code&gt;python consumer/main.py&lt;/code&gt; should print some logs that look like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Sending anomalous data to Redpanda: {'sensor_name': ['Sensor3'], 'timestamp': '2022-07-14 19:16:17.583034', 'aiq_avg': [94.0], 'score': 0.7786666666666666, 'anomaly': True} with key b'Sensor3'
Sending anomalous data to Redpanda: {'sensor_name': ['Sensor5'], 'timestamp': '2022-07-14 19:16:17.807478', 'aiq_avg': [6.0], 'score': 0.7253333333333334, 'anomaly': True} with key b'Sensor5'
Sending anomalous data to Redpanda: {'sensor_name': ['Sensor1'], 'timestamp': '2022-07-14 19:16:22.370494', 'aiq_avg': [91.0], 'score': 0.7413333333333334, 'anomaly': True} with key b'Sensor1'
Sending anomalous data to Redpanda: {'sensor_name': ['Sensor2'], 'timestamp': '2022-07-14 19:16:22.476654', 'aiq_avg': [53.0], 'score': 0.7573333333333333, 'anomaly': True} with key b'Sensor2'
Sending anomalous data to Redpanda: {'sensor_name': ['Sensor4'], 'timestamp': '2022-07-14 19:16:22.697965', 'aiq_avg': [96.0], 'score': 0.8213333333333334, 'anomaly': True} with key b'Sensor4'
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;After a few calculations we can check the target Redpanda topic to see if the data is arriving correctly.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker &lt;span class="nb"&gt;exec&lt;/span&gt; &lt;span class="nt"&gt;-it&lt;/span&gt; redpanda /bin/bash

rpk topic consume air-quality-anomalies &lt;span class="nt"&gt;--brokers&lt;/span&gt; 127.0.0.1:9092
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The records in this topic should look like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "topic": "air-quality-anomalies",
  "key": "Sensor1",
  "value": "\"{\\\"sensor_name\\\": [\\\"Sensor1\\\"], \\\"timestamp\\\": \\\"2022-07-18 16:20:06.799306\\\", \\\"aiq_avg\\\": [95.0], \\\"score\\\": 0.8693333333333333, \\\"anomaly\\\": true}\"",
  "timestamp": 1658154557943,
  "partition": 0,
  "offset": 282
}
{
  "topic": "air-quality-anomalies",
  "key": "Sensor2",
  "value": "\"{\\\"sensor_name\\\": [\\\"Sensor2\\\"], \\\"timestamp\\\": \\\"2022-07-18 16:20:11.913514\\\", \\\"aiq_avg\\\": [15.0], \\\"score\\\": 0.9226666666666666, \\\"anomaly\\\": true}\"",
  "timestamp": 1658154557944,
  "partition": 0,
  "offset": 283
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This data means that, in the five seconds following the given timestamp, the average air quality value for that sensor was considered anomalous compared to all of the previous rolling averages.&lt;/p&gt;
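
&lt;p&gt;The per-sensor rolling average described above can be sketched in plain Python. This is a simplified stand-in for the demo's Bytewax windowing logic: it uses a fixed-size window rather than a five-second one, and all names are illustrative.&lt;/p&gt;

```python
from collections import defaultdict, deque

# Simplified stand-in for the demo's windowed rolling average: keep the
# most recent readings per sensor and average them. (The real demo uses
# Bytewax's time-based windowing; a fixed-size deque is only illustrative.)
WINDOW_SIZE = 5
windows = defaultdict(lambda: deque(maxlen=WINDOW_SIZE))

def update_average(sensor_name, value):
    """Record a reading for a sensor and return its current rolling average."""
    window = windows[sensor_name]
    window.append(value)
    return sum(window) / len(window)
```

&lt;p&gt;Feeding &lt;code&gt;Sensor1&lt;/code&gt; the readings 10, 20, and 30 yields averages of 10.0, 15.0, and 20.0; it is these per-sensor averages that get scored by the anomaly detector.&lt;/p&gt;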

&lt;h2&gt;
  
  
  Running the demo
&lt;/h2&gt;

&lt;p&gt;To run the demo end to end, all you have to do is run the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker-compose up
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This will start our Redpanda container as well as our Python producer and consumer scripts. The Python containers will wait until Redpanda is ready to accept data, after which you should see the producer pushing mock air quality data into the &lt;code&gt;air-quality&lt;/code&gt; topic. The Bytewax consumer will also get to work, calculating the rolling average of the inputs and running the unsupervised anomaly detection algorithm. The output will be sent to the &lt;code&gt;air-quality-anomalies&lt;/code&gt; topic.&lt;/p&gt;
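
&lt;p&gt;The producer side of this flow can be sketched roughly as follows. This is a hypothetical outline, not the demo's actual code: the function names are invented, and it assumes the &lt;code&gt;kafka-python&lt;/code&gt; client, though any Kafka-compatible client works against Redpanda.&lt;/p&gt;

```python
import json
import random
from datetime import datetime

def make_reading():
    """Generate one mock air quality reading (illustrative schema)."""
    return {"timestamp": str(datetime.now()), "value": random.randint(0, 100)}

def run_producer(bootstrap="localhost:9092", topic="air-quality"):
    """Publish a mock reading to Redpanda every few seconds (hypothetical sketch)."""
    import time
    from kafka import KafkaProducer  # assumes the kafka-python package

    producer = KafkaProducer(
        bootstrap_servers=bootstrap,
        value_serializer=lambda d: json.dumps(d).encode("utf-8"),
    )
    while True:
        reading = make_reading()
        producer.send(topic, reading)
        print(f"Sent data to Redpanda: {reading}, sleeping for 3 seconds")
        time.sleep(3)
```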

&lt;p&gt;If you want to see the logs of the containers, you can use the &lt;code&gt;docker-compose logs&lt;/code&gt; command.&lt;/p&gt;

&lt;p&gt;When the services start successfully, they will log output like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;redpanda  | INFO  2022-08-03 08:51:28,298 &lt;span class="o"&gt;[&lt;/span&gt;shard 0] redpanda::main - application.cc:1021 - Successfully started Redpanda!
consumer  | ++ curl &lt;span class="nt"&gt;-s&lt;/span&gt; redpanda:9644/v1/status/ready
producer  | ++ curl &lt;span class="nt"&gt;-s&lt;/span&gt; redpanda:9644/v1/status/ready
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The producer will start logging the data it sends to Redpanda:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;producer  | Sent data to Redpanda: &lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s1"&gt;'timestamp'&lt;/span&gt;: &lt;span class="s1"&gt;'2022-08-03 08:51:32.740664'&lt;/span&gt;, &lt;span class="s1"&gt;'value'&lt;/span&gt;: 14&lt;span class="o"&gt;}&lt;/span&gt;, sleeping &lt;span class="k"&gt;for &lt;/span&gt;3 seconds
producer  | Sent data to Redpanda: &lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s1"&gt;'timestamp'&lt;/span&gt;: &lt;span class="s1"&gt;'2022-08-03 08:51:32.844237'&lt;/span&gt;, &lt;span class="s1"&gt;'value'&lt;/span&gt;: 16&lt;span class="o"&gt;}&lt;/span&gt;, sleeping &lt;span class="k"&gt;for &lt;/span&gt;3 seconds
producer  | Sent data to Redpanda: &lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s1"&gt;'timestamp'&lt;/span&gt;: &lt;span class="s1"&gt;'2022-08-03 08:51:32.947056'&lt;/span&gt;, &lt;span class="s1"&gt;'value'&lt;/span&gt;: 83&lt;span class="o"&gt;}&lt;/span&gt;, sleeping &lt;span class="k"&gt;for &lt;/span&gt;3 seconds
producer  | Sent data to Redpanda: &lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s1"&gt;'timestamp'&lt;/span&gt;: &lt;span class="s1"&gt;'2022-08-03 08:51:33.050752'&lt;/span&gt;, &lt;span class="s1"&gt;'value'&lt;/span&gt;: 76&lt;span class="o"&gt;}&lt;/span&gt;, sleeping &lt;span class="k"&gt;for &lt;/span&gt;3 seconds
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;After a few seconds (once we have enough data to fill the rolling average window), we should see the following:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;consumer  | Sending anomalous data to Redpanda: &lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s1"&gt;'sensor_name'&lt;/span&gt;: &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="s1"&gt;'Sensor4'&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt;, &lt;span class="s1"&gt;'timestamp'&lt;/span&gt;: &lt;span class="s1"&gt;'2022-08-03 08:51:53.065961'&lt;/span&gt;, &lt;span class="s1"&gt;'aiq_avg'&lt;/span&gt;: &lt;span class="o"&gt;[&lt;/span&gt;18.0], &lt;span class="s1"&gt;'score'&lt;/span&gt;: 0.72, &lt;span class="s1"&gt;'anomaly'&lt;/span&gt;: True&lt;span class="o"&gt;}&lt;/span&gt; with key b&lt;span class="s1"&gt;'Sensor4'&lt;/span&gt;
consumer  | Sending anomalous data to Redpanda: &lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s1"&gt;'sensor_name'&lt;/span&gt;: &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="s1"&gt;'Sensor2'&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt;, &lt;span class="s1"&gt;'timestamp'&lt;/span&gt;: &lt;span class="s1"&gt;'2022-08-03 08:51:57.855306'&lt;/span&gt;, &lt;span class="s1"&gt;'aiq_avg'&lt;/span&gt;: &lt;span class="o"&gt;[&lt;/span&gt;62.0], &lt;span class="s1"&gt;'score'&lt;/span&gt;: 0.7573333333333333, &lt;span class="s1"&gt;'anomaly'&lt;/span&gt;: True&lt;span class="o"&gt;}&lt;/span&gt; with key b&lt;span class="s1"&gt;'Sensor2'&lt;/span&gt;
consumer  | Sending anomalous data to Redpanda: &lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s1"&gt;'sensor_name'&lt;/span&gt;: &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="s1"&gt;'Sensor3'&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt;, &lt;span class="s1"&gt;'timestamp'&lt;/span&gt;: &lt;span class="s1"&gt;'2022-08-03 08:51:57.961218'&lt;/span&gt;, &lt;span class="s1"&gt;'aiq_avg'&lt;/span&gt;: &lt;span class="o"&gt;[&lt;/span&gt;20.0], &lt;span class="s1"&gt;'score'&lt;/span&gt;: 0.752, &lt;span class="s1"&gt;'anomaly'&lt;/span&gt;: True&lt;span class="o"&gt;}&lt;/span&gt; with key b&lt;span class="s1"&gt;'Sensor3'&lt;/span&gt;
consumer  | Sending anomalous data to Redpanda: &lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s1"&gt;'sensor_name'&lt;/span&gt;: &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="s1"&gt;'Sensor1'&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt;, &lt;span class="s1"&gt;'timestamp'&lt;/span&gt;: &lt;span class="s1"&gt;'2022-08-03 08:52:02.782120'&lt;/span&gt;, &lt;span class="s1"&gt;'aiq_avg'&lt;/span&gt;: &lt;span class="o"&gt;[&lt;/span&gt;5.0], &lt;span class="s1"&gt;'score'&lt;/span&gt;: 0.7893333333333333, &lt;span class="s1"&gt;'anomaly'&lt;/span&gt;: True&lt;span class="o"&gt;}&lt;/span&gt; with key b&lt;span class="s1"&gt;'Sensor1'&lt;/span&gt;
consumer  | Sending anomalous data to Redpanda: &lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s1"&gt;'sensor_name'&lt;/span&gt;: &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="s1"&gt;'Sensor2'&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt;, &lt;span class="s1"&gt;'timestamp'&lt;/span&gt;: &lt;span class="s1"&gt;'2022-08-03 08:52:02.860648'&lt;/span&gt;, &lt;span class="s1"&gt;'aiq_avg'&lt;/span&gt;: &lt;span class="o"&gt;[&lt;/span&gt;66.0], &lt;span class="s1"&gt;'score'&lt;/span&gt;: 0.736, &lt;span class="s1"&gt;'anomaly'&lt;/span&gt;: True&lt;span class="o"&gt;}&lt;/span&gt; with key b&lt;span class="s1"&gt;'Sensor2'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;As you can see, creating a minimal, real-time, highly scalable data processing pipeline is a breeze with Redpanda and Bytewax.&lt;/p&gt;

&lt;p&gt;Redpanda's Kafka compatibility allows us to integrate any tool built with Kafka in mind, like Bytewax. This combination of a fault-tolerant streaming data platform and a high-performance data processing framework is a powerful tool not just for hobby projects, but also for large-scale production systems.&lt;/p&gt;

&lt;p&gt;Now that you know how to use these tools together, you can apply this knowledge to any number of applications you dream up in the future. &lt;/p&gt;

&lt;p&gt;To learn more about Redpanda, check out &lt;a href="https://docs.redpanda.com/docs/home"&gt;the documentation here&lt;/a&gt; or join &lt;a href="https://redpanda.com/slack" rel="noopener noreferrer"&gt;the Redpanda Community on Slack&lt;/a&gt;. &lt;/p&gt;

</description>
      <category>tutorial</category>
      <category>programming</category>
      <category>beginners</category>
      <category>kafka</category>
    </item>
    <item>
      <title>Why we built our streaming data platform in C++</title>
      <dc:creator>The Team @ Redpanda</dc:creator>
      <pubDate>Wed, 28 Sep 2022 17:09:44 +0000</pubDate>
      <link>https://dev.to/redpanda-data/why-we-built-our-streaming-data-platform-in-c-2913</link>
      <guid>https://dev.to/redpanda-data/why-we-built-our-streaming-data-platform-in-c-2913</guid>
      <description>&lt;p&gt;We're reinventing and expanding what was previously possible with data streaming by building a platform from the ground up for cloud-native computing platforms and by designing a system that’s easy to use, even for non-experts. &lt;/p&gt;

&lt;p&gt;For years, inefficiencies in infrastructure have resulted in a significant amount of compute waste, but hardware is fundamentally different today than it was a decade ago, as &lt;a href="https://www.usenix.org/publications/loginonline/jurassic-cloud"&gt;this article by Avishai Ish-Shalom&lt;/a&gt; eloquently explains. Disk speeds, for example, grew by 1,000 times over the past 10 years. Processing capabilities have also significantly increased alongside developments in multi-core processing.&lt;/p&gt;

&lt;p&gt;Despite these critical improvements in computing hardware, today's software hasn't caught up - it's still engineered for the computing paradigm of a decade ago. That sets up a disparity between hardware and software that’s difficult to reconcile.&lt;/p&gt;

&lt;p&gt;At Redpanda, we firmly believe that the only true platform is the hardware, so we asked ourselves: if we were to design software for modern hardware, what could we do differently? The answer is Redpanda.&lt;/p&gt;

&lt;p&gt;Redpanda differs from other projects of its kind by streamlining the complexity of the program and by presenting a simple interface to the user. We cannot entirely remove complexity from the system, but we can move it around. Because our developers are the experts, it makes more sense for us to own the complexity rather than push it down to the end user. &lt;/p&gt;

&lt;p&gt;We do this by focusing on two core principles to shift that complexity: Redpanda needs to function well without constant human attention, and the results and output need to be predictable. &lt;/p&gt;

&lt;h2&gt;
  
  
  The advantages of C++ 
&lt;/h2&gt;

&lt;p&gt;For Redpanda to pull this off, we chose to use a programming language that both allows direct communication with hardware and has predictable latencies. &lt;/p&gt;

&lt;p&gt;We wrote the early Redpanda prototypes in several different programming languages, but only C++ gave us the ability to create a developer and user experience aligned with our goals. It allows Redpanda to extract every ounce of performance from the available hardware while also maintaining predictability. &lt;/p&gt;

&lt;p&gt;Most programmers only view performance in terms of latency averages, and we think that's a misleading metric. Latency is measured in percentiles, and there’s no way to measure the average of a percentile. It’s math that doesn’t tell a useful story.&lt;/p&gt;

&lt;p&gt;Instead of focusing on the latencies at 99.9%, 99.99%, or even 99.999%, we focus on the entire 0-100% latency distribution. It’s not enough to look at the experience of 99.999% of the transactions – we need to fix the problems that show up at the 100th percentile. When a system is processing millions of messages a second, the difference between 99.999% and 100% matters. C++ provides a high level of tail latency predictability. &lt;/p&gt;
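
&lt;p&gt;A toy calculation (with invented numbers, not Redpanda measurements) shows why an average can hide the tail that the high percentiles expose:&lt;/p&gt;

```python
import statistics

# Toy latency sample in milliseconds: 990 fast requests plus 10 slow
# outliers. The numbers are invented purely to illustrate the point.
latencies = [1.0] * 990 + [250.0] * 10

mean = statistics.mean(latencies)                      # 3.49 ms: looks healthy
p999 = sorted(latencies)[int(0.999 * len(latencies))]  # 250.0 ms: the real tail

print(f"mean={mean:.2f} ms, p99.9={p999:.1f} ms")
```

&lt;p&gt;The average suggests a fast system, while the tail shows some requests taking hundreds of milliseconds - which is exactly why we look at the whole distribution.&lt;/p&gt;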

&lt;p&gt;Another benefit of C++ is its stable and mature repository of libraries. Redpanda only uses &lt;a href="https://github.com/redpanda-data/redpanda/blob/dev/cmake/oss.cmake.in"&gt;a few dozen libraries&lt;/a&gt;, while other comparatively sized projects use hundreds of dependent libraries. Having so many dependencies weakens the security posture of the software. We avoid vulnerabilities by relying on C++ libraries that have been battle-tested for decades.&lt;/p&gt;

&lt;p&gt;C++ also allows us to control as much as possible from the platform. Through the efficiency of our own code, combined with the amazing &lt;a href="https://seastar.io"&gt;Seastar framework&lt;/a&gt; and other best-in-class libraries, Redpanda speaks directly to the hardware. It only depends on the Linux kernel to launch the process, after which Redpanda is &lt;strong&gt;very&lt;/strong&gt; deterministic in terms of performance, runtime characteristics, memory utilization, and CPU speed. We own the entire end-to-end experience, which provides safety and allows Redpanda to build impactful products. &lt;/p&gt;

&lt;h2&gt;
  
  
  Building the best present and future for streaming data
&lt;/h2&gt;

&lt;p&gt;Redpanda creates new possibilities for developers, much as airplanes did for travelers. Ships are a slower mode of transportation, even if they're reliable, and even today, you can take a passenger ship from New York to London. Transcontinental travel relied on passenger ships for centuries, but when airplanes came into existence, they fundamentally changed the way people traveled. In doing so, air travel created entire industries that people had never thought of before.&lt;/p&gt;

&lt;p&gt;That’s the impact of Redpanda on where the streaming industry is headed. &lt;/p&gt;

&lt;p&gt;We discovered that when you give programmers a new infrastructure primitive like Redpanda, something that's fast, predictable, and geared towards zero data loss, it expands the realm of possibilities about what they can do. Although Redpanda was initially designed to be a replacement for Kafka, it has started to transition into operational workloads. &lt;/p&gt;

&lt;p&gt;For us, discovering new ways that developers are using Redpanda is probably the most exciting aspect of our job. For example, a satellite currently in orbit is running Redpanda, and &lt;a href="https://www.youtube.com/watch?v=3td64fGIT8U"&gt;the Alpaca platform&lt;/a&gt; uses Redpanda to trade millions of dollars in securities every single day. Redpanda will soon power the process of monitoring both a pregnant mother's heartbeat and her baby's vital signs during labor. &lt;/p&gt;

&lt;p&gt;Redpanda Data is the only company that can cross this chasm and move to the foreground of operational workloads. &lt;/p&gt;

&lt;p&gt;Redpanda expands the toolset for developers and crosses multiple computing paradigms, allowing us to expand what's possible in software development and operational workloads. &lt;/p&gt;

&lt;p&gt;While we couldn't have imagined this when we started, we can't wait to hear what developers are going to build tomorrow. Take Redpanda &lt;a href="https://redpanda.com/try-redpanda"&gt;for a test drive today&lt;/a&gt;, and introduce yourself to &lt;a href="https://redpanda.com/slack"&gt;the Redpanda Community&lt;/a&gt; in Slack. &lt;/p&gt;

</description>
    </item>
    <item>
      <title>Testcontainers &amp; Zerocode: An integration testing tutorial</title>
      <dc:creator>The Team @ Redpanda</dc:creator>
      <pubDate>Wed, 21 Sep 2022 17:20:03 +0000</pubDate>
      <link>https://dev.to/redpanda-data/testcontainers-zerocode-an-integration-testing-tutorial-212b</link>
      <guid>https://dev.to/redpanda-data/testcontainers-zerocode-an-integration-testing-tutorial-212b</guid>
      <description>&lt;p&gt;When setting up a streaming application, especially if you’re new to streaming data platforms like Redpanda, you’ll want to test that your application is set up correctly. You can do this using integration tests.&lt;/p&gt;

&lt;p&gt;Integration tests check your producers and consumers against your data stream. They push test data through your application, allowing you to see if your architecture is correctly set up and working as expected. &lt;/p&gt;

&lt;p&gt;Below, I discuss two popular libraries for integration testing: Testcontainers and Zerocode. I use these when I need to run integration tests, and nearly every developer I know uses them, as well. &lt;/p&gt;

&lt;p&gt;In this post, you’ll learn how to run integration tests with them, too, so you can ensure your streaming application is properly configured. You can find the resources for the demos below in &lt;a href="https://github.com/redpanda-data-blog/2022-integration-testing-tools" rel="noopener noreferrer"&gt;this GitHub repository&lt;/a&gt;. &lt;/p&gt;

&lt;h2&gt;
  
  
  The 2 best integration testing tools for streaming data stacks
&lt;/h2&gt;

&lt;h3&gt;
  
  
  1. Testcontainers
&lt;/h3&gt;

&lt;p&gt;Testcontainers is a Java library that you can use to test anything that runs in a Docker container. You can use it to do integration testing on your data stream. &lt;/p&gt;

&lt;h4&gt;
  
  
  1.1 Prerequisites
&lt;/h4&gt;

&lt;p&gt;&lt;a href="https://www.testcontainers.org/" rel="noopener noreferrer"&gt;Testcontainers&lt;/a&gt; can only be used within the Java ecosystem. For this reason, you will need to import it as a dependency. You can do this with Maven as shown here:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight xml"&gt;&lt;code&gt;&lt;span class="nt"&gt;&amp;lt;dependency&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;groupId&amp;gt;&lt;/span&gt;org.testcontainers&lt;span class="nt"&gt;&amp;lt;/groupId&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;artifactId&amp;gt;&lt;/span&gt;testcontainers&lt;span class="nt"&gt;&amp;lt;/artifactId&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;version&amp;gt;&lt;/span&gt;1.17.2&lt;span class="nt"&gt;&amp;lt;/version&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;scope&amp;gt;&lt;/span&gt;test&lt;span class="nt"&gt;&amp;lt;/scope&amp;gt;&lt;/span&gt;
&lt;span class="nt"&gt;&amp;lt;/dependency&amp;gt;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can check out the latest dependencies in Maven’s &lt;a href="https://mvnrepository.com/artifact/org.testcontainers/testcontainers" rel="noopener noreferrer"&gt;central repository&lt;/a&gt;. You can also access the complete demo we’re about to walk through &lt;a href="https://github.com/redpanda-data-blog/2022-integration-testing-tools/tree/master/testcontainers" rel="noopener noreferrer"&gt;in this GitHub repo&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Next, you can move on to setting up your producer and consumer for testing.&lt;/p&gt;

&lt;h4&gt;
  
  
  1.2 Producer and consumer setup
&lt;/h4&gt;

&lt;p&gt;Typical integration tests will include a producer that creates and sends an event. To do this with Redpanda, you can use the Apache Kafka&lt;sup&gt;Ⓡ&lt;/sup&gt; API since Redpanda is API-compatible with Kafka.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;producer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;
       &lt;span class="nc"&gt;ImmutableMap&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;of&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
               &lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;BOOTSTRAP_SERVERS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
               &lt;span class="n"&gt;bootstrapServers&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
               &lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;CLIENT_ID_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
               &lt;span class="no"&gt;UUID&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;randomUUID&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
       &lt;span class="o"&gt;),&lt;/span&gt;
       &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;StringSerializer&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt;
       &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;StringSerializer&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;send&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;topicName&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"testcontainers"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"redpanda"&lt;/span&gt;&lt;span class="o"&gt;)).&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You will also need a consumer that consumes the event from the same topic as your producer. Again, we can set this up using the Kafka API.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;
       &lt;span class="nc"&gt;ImmutableMap&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;of&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
               &lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;BOOTSTRAP_SERVERS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
               &lt;span class="n"&gt;bootstrapServers&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
               &lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;GROUP_ID_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
               &lt;span class="s"&gt;"tc-"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="no"&gt;UUID&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;randomUUID&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt;
               &lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;AUTO_OFFSET_RESET_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
               &lt;span class="s"&gt;"earliest"&lt;/span&gt;
       &lt;span class="o"&gt;),&lt;/span&gt;
       &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;StringDeserializer&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt;
       &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;StringDeserializer&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;subscribe&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Collections&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;singletonList&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topicName&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
&lt;span class="nc"&gt;ConsumerRecords&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;poll&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Duration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofMillis&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  1.3 Redpanda node setup
&lt;/h4&gt;

&lt;p&gt;Since we are running this test against Redpanda, you will also need to set up a Redpanda node. This is where Testcontainers comes into the picture. It allows you to create throwaway instances of the node, which are destroyed when the tests finish running.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nd"&gt;@Before&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;init&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
   &lt;span class="n"&gt;redpanda&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;RedpandaContainer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"vectorized/redpanda:v22.1.4"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
   &lt;span class="n"&gt;redpanda&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;start&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In the above snippet, I tell Testcontainers to pull down the Redpanda Docker image and start the container behind the scenes.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker container &lt;span class="nb"&gt;ls
&lt;/span&gt;CONTAINER ID   IMAGE                            COMMAND                  CREATED         STATUS         PORTS                                                                                         NAMES
34a719219ff9   vectorized/redpanda:v22.1.4      &lt;span class="s2"&gt;"sh -c 'while [ ! -f..."&lt;/span&gt;   7 minutes ago   Up 7 minutes   8081-8082/tcp, 9644/tcp, 0.0.0.0:49167-&amp;gt;9092/tcp, :::49167-&amp;gt;9092/tcp                          zealous_elion
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As you can see, Testcontainers created the Redpanda container. Next, you’ll define your test as a regular JUnit test:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;    &lt;span class="nd"&gt;@Test&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;testUsage&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;testRedpandaFunctionality&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;redpanda&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getHost&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="s"&gt;":"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;   &lt;span class="n"&gt;redpanda&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getMappedPort&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;9092&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You then run the test using the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;mvn &lt;span class="nb"&gt;test&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If a test fails, you will see this printed in the log under the Failures and Errors sections:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;Tests run: 1, Failures: 1, Errors: 0, Skipped: 0
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If your test is successful, you will instead see a 0 in the Failures and Errors sections:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;After you are done testing, you can stop the container in a teardown method:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nd"&gt;@After&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;tearDown&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
   &lt;span class="n"&gt;redpanda&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stop&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And that’s it! You’ve successfully run an integration test with Testcontainers. &lt;/p&gt;

&lt;p&gt;Next, we’ll move on to another popular integration testing tool. &lt;/p&gt;

&lt;h3&gt;
  
  
  2. Zerocode
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://github.com/authorjapps/zerocode" rel="noopener noreferrer"&gt;Zerocode&lt;/a&gt; is an open-source Java test automation framework that uses a declarative style of testing. In declarative testing, you don't write code; instead, you declare scenarios that describe each step of a test in a JSON or YAML file. The Zerocode framework then interprets the scenario and executes the instructions that you specify via a custom DSL. Zerocode can be used for end-to-end testing of your data stream.&lt;/p&gt;

&lt;h4&gt;
  
  
  2.1 Prerequisites
&lt;/h4&gt;

&lt;p&gt;Zerocode is a Java library, so it can only be used in the Java ecosystem. You can get it from &lt;a href="https://mvnrepository.com/artifact/org.jsmart/zerocode-tdd" rel="noopener noreferrer"&gt;this central repo&lt;/a&gt; and declare it as a dependency:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight xml"&gt;&lt;code&gt;&lt;span class="nt"&gt;&amp;lt;dependency&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;groupId&amp;gt;&lt;/span&gt;org.jsmart&lt;span class="nt"&gt;&amp;lt;/groupId&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;artifactId&amp;gt;&lt;/span&gt;zerocode-tdd&lt;span class="nt"&gt;&amp;lt;/artifactId&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;version&amp;gt;&lt;/span&gt;1.3.28&lt;span class="nt"&gt;&amp;lt;/version&amp;gt;&lt;/span&gt;
&lt;span class="nt"&gt;&amp;lt;/dependency&amp;gt;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can also check out the official GitHub repository of &lt;a href="https://github.com/authorjapps/zerocode" rel="noopener noreferrer"&gt;the Zerocode framework here&lt;/a&gt;, and you can find the full demo I’m about to walk you through &lt;a href="https://github.com/redpanda-data-blog/2022-integration-testing-tools/tree/master/zerocode" rel="noopener noreferrer"&gt;on GitHub here&lt;/a&gt;.&lt;/p&gt;

&lt;h4&gt;
  
  
  2.2 Producer and consumer setup
&lt;/h4&gt;

&lt;p&gt;As I did in my integration testing with Testcontainers, I also need a producer to create events with Zerocode:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="nl"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"produce_test_message"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="nl"&gt;"url"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"kafka-topic:test-topic"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="nl"&gt;"method"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"produce"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="nl"&gt;"request"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
   &lt;/span&gt;&lt;span class="nl"&gt;"recordType"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"JSON"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
   &lt;/span&gt;&lt;span class="nl"&gt;"records"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
     &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
       &lt;/span&gt;&lt;span class="nl"&gt;"key"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"${RANDOM.NUMBER}"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
       &lt;/span&gt;&lt;span class="nl"&gt;"value"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Hello Redpanda"&lt;/span&gt;&lt;span class="w"&gt;
     &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
   &lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="nl"&gt;"assertions"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
   &lt;/span&gt;&lt;span class="nl"&gt;"status"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Ok"&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In the above snippet, I declare what our producer should do:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;name&lt;/strong&gt; - The scenario step name. This can be anything you want. &lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;url&lt;/strong&gt; - Specifies the Redpanda topic via the kafka-topic property and tells the producer which topic events should be sent to (Note: Although there is no Redpanda keyword here, kafka-topic will work with Redpanda).&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;method&lt;/strong&gt; - Tells Zerocode to create a Redpanda producer.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;request&lt;/strong&gt; - Specifies data that should be produced.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;recordType&lt;/strong&gt; - The type of records to be produced/consumed. In this example, it's JSON.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;assertions&lt;/strong&gt; - Checks the execution response. In this example, we are verifying that producing the event was successful.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The producer declared above will send one event (with a JSON payload) whose value is “Hello Redpanda”. You then need to consume that event and check the payload. For that reason, I declare the consumer as well:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="nl"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"consume_test_message"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="nl"&gt;"url"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"kafka-topic:test-topic"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="nl"&gt;"method"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"consume"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="nl"&gt;"request"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
   &lt;/span&gt;&lt;span class="nl"&gt;"consumerLocalConfigs"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
     &lt;/span&gt;&lt;span class="nl"&gt;"recordType"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"JSON"&lt;/span&gt;&lt;span class="w"&gt;
   &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="nl"&gt;"retry"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
   &lt;/span&gt;&lt;span class="nl"&gt;"max"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
   &lt;/span&gt;&lt;span class="nl"&gt;"delay"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;30&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="nl"&gt;"validators"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
   &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
     &lt;/span&gt;&lt;span class="nl"&gt;"field"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"records[0].value"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
     &lt;/span&gt;&lt;span class="nl"&gt;"value"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Hello Redpanda"&lt;/span&gt;&lt;span class="w"&gt;
   &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;url&lt;/strong&gt; - Specifies the topic (via kafka-topic keyword) to consume from. This should be the same as the url you set in your producer.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;method&lt;/strong&gt; - Tells Zerocode to create a Redpanda consumer.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;retry&lt;/strong&gt; - Sets a max number of retries and the delay between retries in case the consumer did not find any events.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Other keywords in our consumer are the same as in the producer. In the validators block, you can see that I’m verifying that the consumed event’s value is “Hello Redpanda”, as it was written by the producer.&lt;/p&gt;

&lt;p&gt;With the above steps, I verify that I can produce an event to the Redpanda stream and consume that same event. Once you’ve completed this step, you can move on to configuring the test.&lt;/p&gt;
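&lt;p&gt;The produce and consume steps are then combined into a single scenario file — the one referenced later as &lt;code&gt;redpanda-stream-test.json&lt;/code&gt;. A sketch of its overall shape might look like this (the &lt;code&gt;scenarioName&lt;/code&gt; value is an assumption; the step bodies are the ones shown above):&lt;/p&gt;

```json
{
  "scenarioName": "produce_and_consume_via_redpanda",
  "steps": [
    {
      "name": "produce_test_message",
      "url": "kafka-topic:test-topic",
      "method": "produce",
      "request": {
        "recordType": "JSON",
        "records": [
          { "key": "${RANDOM.NUMBER}", "value": "Hello Redpanda" }
        ]
      },
      "assertions": { "status": "Ok" }
    },
    {
      "name": "consume_test_message",
      "url": "kafka-topic:test-topic",
      "method": "consume",
      "request": {
        "consumerLocalConfigs": { "recordType": "JSON" }
      },
      "retry": { "max": 2, "delay": 30 },
      "validators": [
        { "field": "records[0].value", "value": "Hello Redpanda" }
      ]
    }
  ]
}
```

&lt;p&gt;Zerocode runs the steps in order, so the consume step only executes after the produce step has been asserted.&lt;/p&gt;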

&lt;h4&gt;
  
  
  2.3 Configuration
&lt;/h4&gt;

&lt;p&gt;In the case of Testcontainers, the library itself created a Redpanda broker (via Docker). With Zerocode, however, you need to have a Redpanda broker up and running before launching the tests. For local testing, you can &lt;a href="https://github.com/redpanda-data-blog/2022-data-libraries-list/blob/master/docker-compose.yaml" rel="noopener noreferrer"&gt;create a YAML file and use Docker Compose&lt;/a&gt; to do this. &lt;/p&gt;
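&lt;p&gt;For reference, a minimal Compose file for a single local broker might look like the sketch below. The image tag and addresses are assumptions for illustration; treat the linked file as the authoritative version:&lt;/p&gt;

```yaml
version: '3.9'
services:
  redpanda:
    image: docker.vectorized.io/vectorized/redpanda:v21.11.11
    container_name: redpanda
    command:
      - redpanda
      - start
      - --smp
      - '1'
      - --overprovisioned
      - --node-id
      - '0'
      - --kafka-addr
      - PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      - --advertise-kafka-addr
      - PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
    ports:
      - 9092:9092
```

&lt;p&gt;With this file in place, &lt;code&gt;docker-compose up -d&lt;/code&gt; gives you a broker reachable at &lt;code&gt;localhost:9092&lt;/code&gt;, which is the address the Zerocode configuration below expects.&lt;/p&gt;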

&lt;p&gt;After the Redpanda broker is ready, you need to tell Zerocode how to reach it. You may also need to specify some properties that Zerocode will use when creating your producer and consumer. For that reason, create a &lt;strong&gt;properties&lt;/strong&gt; file with the following content:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="err"&gt;kafka.bootstrap.servers=localhost:&lt;/span&gt;&lt;span class="mi"&gt;9092&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt;kafka.producer.properties=producer.properties&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt;kafka.consumer.properties=consumer.properties&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;kafka.bootstrap.servers&lt;/strong&gt; - Here you specify the bootstrap servers of Redpanda. Keep in mind that there is no Redpanda-specific keyword, but the Kafka keyword works with Redpanda.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;kafka.producer.properties&lt;/strong&gt; - Name of the file that contains producer properties. In this example, the file is in the same folder.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;kafka.consumer.properties&lt;/strong&gt; - Name of the file that contains consumer properties. In this example, the file is in the same folder.&lt;/li&gt;
&lt;/ul&gt;
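&lt;p&gt;The article doesn’t show the two referenced files, so here is a sketch of what a minimal &lt;code&gt;producer.properties&lt;/code&gt; and &lt;code&gt;consumer.properties&lt;/code&gt; might contain. These are standard Kafka client settings, not values taken from the demo repo; adjust the serializers and group ID to your setup:&lt;/p&gt;

```properties
# producer.properties (assumed minimal example)
client.id=zerocode-producer
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

# consumer.properties (assumed minimal example)
group.id=zerocode-consumer-group
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
enable.auto.commit=true
```

&lt;p&gt;Zerocode passes these files straight to the underlying Kafka clients, so any valid producer or consumer property can go here.&lt;/p&gt;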

&lt;p&gt;Once you’ve configured the test, it’s time to write the test case.&lt;/p&gt;

&lt;h4&gt;
  
  
  2.4 Writing the test case
&lt;/h4&gt;

&lt;p&gt;At this point in the demo, you have the scenario file and the configuration. So, how do you link them and run the scenario? Behind the scenes, Zerocode uses JUnit 4 runners, so you now create a Java test class that uses JUnit annotations:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nd"&gt;@RunWith&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ZeroCodeUnitRunner&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="nd"&gt;@TargetEnv&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"redpanda.properties"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;RedpandaTest&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
   &lt;span class="nd"&gt;@Test&lt;/span&gt;
   &lt;span class="nd"&gt;@Scenario&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"redpanda-stream-test.json"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
   &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;test_redpanda&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
   &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;@RunWith&lt;/strong&gt; - You specify the Zerocode runner that will be responsible for running your scenario.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;@TargetEnv&lt;/strong&gt; - The name of the configuration file that Zerocode will use for the scenario. This is how you link configurations files to scenarios.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;@Scenario&lt;/strong&gt; - The name of our scenario that Zerocode will run.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;&lt;a class="mentioned-user" href="https://dev.to/test"&gt;@test&lt;/a&gt;&lt;/strong&gt; - This is the Junit annotation.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;You can run the test using the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;mvn &lt;span class="nb"&gt;test&lt;/span&gt; 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Just as with our Testcontainers test above, you will see any errors or failures printed in the log:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;Tests run: 1, Failures: 1, Errors: 0, Skipped: 0
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If your test is successful, no failures or errors will be noted.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;Now that you’ve learned how to run two types of integration tests on your applications, you can validate that your data streams are configured correctly. These tests are also useful for checking your producers and consumers against your Redpanda nodes. &lt;/p&gt;

&lt;p&gt;As I mentioned at the start of this article, Zerocode and Testcontainers are the two integration testing tools that I and other devs tend to use, and there aren’t many other integration testing tools available. If you know of others that we should look into, share them &lt;a href="https://redpanda.com/slack" rel="noopener noreferrer"&gt;in the Redpanda Community on Slack&lt;/a&gt;, or share them &lt;a href="https://twitter.com/redpandadata" rel="noopener noreferrer"&gt;on Twitter: @redpandadata&lt;/a&gt;. To learn more about getting started with Redpanda, &lt;a href="https://docs.redpanda.com/docs/quickstart/" rel="noopener noreferrer"&gt;view the documentation here&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>beginners</category>
      <category>tutorial</category>
      <category>testing</category>
      <category>productivity</category>
    </item>
    <item>
      <title>Building a real-time materialized cache</title>
      <dc:creator>The Team @ Redpanda</dc:creator>
      <pubDate>Wed, 14 Sep 2022 14:23:22 +0000</pubDate>
      <link>https://dev.to/redpanda-data/building-a-real-time-materialized-cache-2ni5</link>
      <guid>https://dev.to/redpanda-data/building-a-real-time-materialized-cache-2ni5</guid>
<description>&lt;p&gt;Organizations often need to build real-time data-processing applications. Specialized tools for stream processing can help build such applications. &lt;a href="https://redpanda.com/blog/apache-flink-redpanda-real-time-word-count-application"&gt;In another article&lt;/a&gt;, you learned how to process data streams with Apache Flink&lt;sup&gt;Ⓡ&lt;/sup&gt;. This article will show you how to do something similar with ksqlDB.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://ksqldb.io/" rel="nofollow noopener noreferrer"&gt;ksqlDB&lt;/a&gt; is an event-streaming database that simplifies real-time application building with two Apache Kafka&lt;sup&gt;Ⓡ&lt;/sup&gt; API components — Kafka Connect&lt;sup&gt;Ⓡ&lt;/sup&gt; and Kafka Streams&lt;sup&gt;Ⓡ&lt;/sup&gt; — into a single system. This makes it possible to integrate the stream-processing application with different source systems.&lt;/p&gt;

&lt;p&gt;With ksqlDB, you can use SQL queries for processing streaming data. Examples of such use cases include identifying anomalies in real-time data, log monitoring, tracking, and alerting. Using ksqlDB on top of Redpanda, which is API-compatible with Kafka, allows you to explore topics, transform data within topics, copy existing topics from one format to another, and more.&lt;/p&gt;

&lt;h2&gt;
  
  
  What is ksqlDB?
&lt;/h2&gt;

&lt;p&gt;ksqlDB differs from other popular data-processing tools like Flink or Apache Spark&lt;sup&gt;Ⓡ&lt;/sup&gt; in its ability to build complete streaming applications with only a small set of SQL statements—you don’t need to write Java/Scala/Python in addition to SQL statements when using ksqlDB.&lt;/p&gt;

&lt;p&gt;ksqlDB has a simplified architecture and is deployed as a separate, scalable cluster. The interface for event capturing, processing, and query serving is combined into a single system.&lt;/p&gt;

&lt;p&gt;Let’s take a look at how Redpanda and ksqlDB can be used together to build a stream-processing application.&lt;/p&gt;

&lt;h2&gt;
  
  
  Integrating ksqlDB with Redpanda
&lt;/h2&gt;

&lt;p&gt;To set the scene, imagine that you have a database that stores emergency calls made by residents of different locations. It contains their names, emergency type, and area code. You frequently make a few specific queries, and you want to move those out of the database, precompute them, and store the results for fast access. Here, you can leverage the power of ksqlDB (computing) with Redpanda (storage) to build a materialized cache for quick access to the data.&lt;/p&gt;

&lt;p&gt;&lt;a href="//images.ctfassets.net/paqvtpyf8rwu/52caOQM3jUYPkMXfDYraTM/7a3275a204a74ffb15dac43af09f2243/ksqldb_and_redpanda.png" class="article-body-image-wrapper"&gt;&lt;img src="//images.ctfassets.net/paqvtpyf8rwu/52caOQM3jUYPkMXfDYraTM/7a3275a204a74ffb15dac43af09f2243/ksqldb_and_redpanda.png" alt="ksqldb and redpanda"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;This tutorial will walk you through creating this materialized cache using Redpanda, ksqlDB server, and ksqlDB CLI and show you how to query it.&lt;/p&gt;

&lt;p&gt;Specifically, you’ll learn how to do the following:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Install Redpanda&lt;/li&gt;
&lt;li&gt;Install ksqlDB&lt;/li&gt;
&lt;li&gt;Configure ksqlDB to ingest data from Redpanda&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Prerequisites
&lt;/h3&gt;

&lt;p&gt;Before getting started, you’ll need to have the following:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;a href="https://docs.docker.com/get-docker/" rel="nofollow noopener noreferrer"&gt;Docker&lt;/a&gt; and &lt;a href="https://docs.docker.com/compose/install/" rel="nofollow noopener noreferrer"&gt;Docker Compose&lt;/a&gt; installed&lt;/li&gt;
&lt;li&gt;Familiarity with Apache Kafka or other messaging systems (recommended for Redpanda)&lt;/li&gt;
&lt;li&gt;Familiarity with SQL syntax (recommended for ksqlDB)&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Setting up the stack
&lt;/h2&gt;

&lt;p&gt;The image below is a schematic view of data flow within the system you’d use to process external data. To connect to the external sources, you’d have to set up your &lt;a href="https://docs.ksqldb.io/en/latest/concepts/connectors/" rel="nofollow noopener noreferrer"&gt;connectors&lt;/a&gt;. This tutorial doesn’t connect to any external source and uses mock data for the sake of simplicity.&lt;/p&gt;

&lt;p&gt;&lt;a href="//images.ctfassets.net/paqvtpyf8rwu/1yQ15QcJWQsidVsLHuO1MM/80e9f40a8ab85d420a00801cb19b7506/ksqldb_2.png" class="article-body-image-wrapper"&gt;&lt;img src="//images.ctfassets.net/paqvtpyf8rwu/1yQ15QcJWQsidVsLHuO1MM/80e9f40a8ab85d420a00801cb19b7506/ksqldb_2.png" alt="ksqldb 2"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Installing Redpanda
&lt;/h2&gt;

&lt;p&gt;You can follow the detailed steps for installing Redpanda from the &lt;a href="https://docs.redpanda.com/docs/quickstart/"&gt;official documentation&lt;/a&gt; on your platform of choice. In this tutorial, you’ll install Redpanda using Docker Compose.&lt;/p&gt;

&lt;p&gt;First, add the following configurations to a &lt;code&gt;docker-compose.yml&lt;/code&gt; file to install Redpanda &lt;a href="https://hub.docker.com/r/vectorized/redpanda" rel="nofollow noopener noreferrer"&gt;from its Docker image&lt;/a&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;version: '3.9'
services:
  redpanda:
    command:
    - redpanda
    - start
    - --smp
    - '1'
    - --reserve-memory
    - 0M
    - --overprovisioned
    - --set
    - redpanda.cluster_id=turning-red
    - --set 
    - redpanda.enable_idempotence=true
    - --set 
    - redpanda.enable_transactions=true
    - --set
    - redpanda.auto_create_topics_enabled=true
    - --node-id
    - '0'
    - --kafka-addr
    - PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
    - --advertise-kafka-addr
    - PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
    image: docker.vectorized.io/vectorized/redpanda:v21.11.11
    container_name: redpanda
    ports:
    - 9092:9092
    - 29092:29092
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Next, run the command below in the root directory of your &lt;code&gt;docker-compose&lt;/code&gt; file to start a local Redpanda cluster:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker-compose up -d
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now that your Redpanda cluster is running, you can do some test streaming.&lt;/p&gt;

&lt;h3&gt;
  
  
  Starting Redpanda
&lt;/h3&gt;

&lt;p&gt;Run the command below to access the Redpanda Docker container’s command line:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -it redpanda /bin/sh
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then run the following command to create a &lt;code&gt;calls&lt;/code&gt; topic (note the use of the &lt;a href="https://docs.redpanda.com/docs/reference/rpk-commands/"&gt;rpk&lt;/a&gt; command-line utility for Redpanda):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ rpk topic create calls --brokers=localhost:9092
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Next, produce a message on the topic:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ rpk topic produce calls --brokers=localhost:9092
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Input some text into the topic, and once you’re finished, press &lt;strong&gt;Ctrl+C&lt;/strong&gt; to exit the prompt.&lt;/p&gt;

&lt;p&gt;Finally, consume the messages on the topic:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ rpk topic consume calls --brokers=localhost:9092
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Below is a sample output when consuming the messages:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;##Output
{
  "topic": "calls",
  "value": "3",
  "timestamp": 1650692216007,
  "partition": 0,
  "offset": 2
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;That’s it! You now have enough Redpanda knowledge to leverage the power of ksqlDB. Before moving on, do some cleanup by running the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker compose down
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This command stops and removes the Redpanda container.&lt;/p&gt;

&lt;h2&gt;
  
  
  Installing ksqlDB
&lt;/h2&gt;

&lt;p&gt;First, add the services below in your &lt;code&gt;docker-compose.yml&lt;/code&gt; file:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt; ksqldb-server:
    image: confluentinc/ksqldb-server:0.25.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - redpanda
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_BOOTSTRAP_SERVERS: "redpanda:29092"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"  

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.25.1
    container_name: ksqldb-cli
    depends_on:
      - redpanda
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This code snippet creates two containers—the ksqlDB server and ksqlDB CLI—from their respective Docker images. The ksqlDB server is where your application runs, and the ksqlDB CLI allows you to interact with the server.&lt;/p&gt;

&lt;p&gt;Then run the command below in the root directory of your &lt;code&gt;docker-compose.yml&lt;/code&gt; file to start all three services:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker-compose up -d
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Finally, run the following command to check if the containers are running as expected:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker stats
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If everything is okay, you should have three containers running: &lt;code&gt;ksqldb-cli&lt;/code&gt;, &lt;code&gt;ksqldb-server&lt;/code&gt;, and &lt;code&gt;redpanda&lt;/code&gt;.&lt;/p&gt;

&lt;h3&gt;
  
  
  Starting ksqlDB
&lt;/h3&gt;

&lt;p&gt;To start ksqlDB and access its interface, run the command below:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You should see something similar to this:&lt;/p&gt;

&lt;p&gt;&lt;a href="//images.ctfassets.net/paqvtpyf8rwu/7cpNCB84CF8ZdNMvpCDGFa/3909dd0c7f2099276ebbb0951363065c/ksqldb_3.png" class="article-body-image-wrapper"&gt;&lt;img src="//images.ctfassets.net/paqvtpyf8rwu/7cpNCB84CF8ZdNMvpCDGFa/3909dd0c7f2099276ebbb0951363065c/ksqldb_3.png" alt="ksqldb 3"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;If the server isn’t responding, give it a while, exit the ksqlDB CLI, and then retry.&lt;/p&gt;

&lt;h2&gt;
  
  
  Configuring ksqlDB to ingest data from Redpanda
&lt;/h2&gt;

&lt;p&gt;Now that your stack is running, it’s time to execute some ksqlDB code. You’ll use the ksqlDB CLI to interact with the server.&lt;/p&gt;

&lt;h3&gt;
  
  
  Creating a stream
&lt;/h3&gt;

&lt;p&gt;Before you create your stream, enter the command below in the running instance of ksqlDB CLI:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;SHOW TOPICS;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This displays a list of existing topics. At this point, you will see only default topics. You can now create a stream that matches the data in your database as shown below:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE STREAM emergencies (name VARCHAR, reason VARCHAR, area VARCHAR)
  WITH (kafka_topic='call-center', value_format='json', partitions=1);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This command creates not only a stream but also a Redpanda topic named &lt;code&gt;call-center&lt;/code&gt;, if it does not already exist. If the topic already exists, the command simply defines the stream over it, which you can then query with SQL syntax.&lt;/p&gt;

&lt;p&gt;Running the &lt;code&gt;SHOW TOPICS;&lt;/code&gt; command displays the newly created topic.&lt;/p&gt;

&lt;h3&gt;
  
  
  Creating materialized views
&lt;/h3&gt;

&lt;p&gt;To keep track of certain logic, you need to create a materialized view for the logic. Run the following commands in the ksqlDB CLI instance to do so.&lt;/p&gt;

&lt;p&gt;The &lt;code&gt;location_of_interest&lt;/code&gt; materialized view counts the number of distinct areas, identifies the latest area of the emergency call, and then groups the rows returned by the reason for the call:&lt;/p&gt;

&lt;p&gt;// RUN 1&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE TABLE location_of_interest AS
    SELECT reason,
           count_distinct(area) AS distinct_pings,
           latest_by_offset(area) AS last_location
    FROM emergencies
    GROUP BY reason
    EMIT CHANGES;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;code&gt;call_record&lt;/code&gt; materialized view counts the number of times a resident called based on the reason and groups them by the resident’s name:&lt;/p&gt;

&lt;p&gt;// RUN 2&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE TABLE call_record AS
    SELECT name,
           count(reason) AS total_emergencies
    FROM emergencies
    GROUP BY name
    EMIT CHANGES;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Adding mock data
&lt;/h3&gt;

&lt;p&gt;Now that you have a topic, a stream linked to your topic, and a materialized view to make your queries persistent, you can add some mock data to test your application.&lt;/p&gt;

&lt;p&gt;First, open a new terminal and open the Redpanda terminal using the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -it redpanda /bin/sh
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then you can produce messages on the topic:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ rpk topic produce call-center --brokers=localhost:9092
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Add the messages below in the terminal. Each message is produced to a partition and given a timestamp.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{"name":"Liam", "reason": "allergy", "area": "Florida"}
{"name":"Fiona", "reason": "dizziness", "area": "Orlando"}
{"name":"Mike", "reason": "pain", "area": "Florida"}
{"name":"Louise", "reason": "allergy", "area": "Orlando"}
{"name":"Steven", "reason": "stroke", "area": "New York"}
{"name":"Liam", "reason": "pain", "area": "Florida"}
{"name":"Louise", "reason": "dizziness", "area": "Hawai"}
{"name":"Ivor", "reason": "choking", "area": "New York"}
{"name":"Louise", "reason": "pain", "area": "Florida"}
{"name":"Beckham", "reason": "allergy", "area": "New York"}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You are now ready to test your application by running some queries.&lt;/p&gt;
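&lt;p&gt;Before running those queries, it can help to predict what the &lt;code&gt;call_record&lt;/code&gt; view should contain for this data. The following Python sketch replicates the view's &lt;code&gt;COUNT(reason) ... GROUP BY name&lt;/code&gt; logic over the ten mock messages; it is purely an illustration of the aggregation (ksqlDB performs the real work):&lt;/p&gt;

```python
from collections import Counter

# The ten mock messages produced to the call-center topic above
messages = [
    {"name": "Liam", "reason": "allergy", "area": "Florida"},
    {"name": "Fiona", "reason": "dizziness", "area": "Orlando"},
    {"name": "Mike", "reason": "pain", "area": "Florida"},
    {"name": "Louise", "reason": "allergy", "area": "Orlando"},
    {"name": "Steven", "reason": "stroke", "area": "New York"},
    {"name": "Liam", "reason": "pain", "area": "Florida"},
    {"name": "Louise", "reason": "dizziness", "area": "Hawai"},
    {"name": "Ivor", "reason": "choking", "area": "New York"},
    {"name": "Louise", "reason": "pain", "area": "Florida"},
    {"name": "Beckham", "reason": "allergy", "area": "New York"},
]

# SELECT name, count(reason) AS total_emergencies FROM emergencies GROUP BY name
call_record = Counter(m["name"] for m in messages)

print(call_record["Louise"])  # 3
print(call_record["Liam"])    # 2
```

&lt;p&gt;Louise should therefore show three emergencies in total and Liam two.&lt;/p&gt;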

&lt;h3&gt;
  
  
  Running queries
&lt;/h3&gt;

&lt;p&gt;Before you run any query, set the property below to ensure the queries run from the beginning of the topic:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;SET 'auto.offset.reset' = 'earliest';
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To run a query that terminates immediately after it has returned the results, test with the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;SELECT * FROM location_of_interest
WHERE reason = 'allergy';
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To run a query that keeps running and updates the results as more data comes in, use this command (note the use of the “EMIT CHANGES” clause):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;SELECT * FROM call_record
WHERE name = 'Louise' EMIT CHANGES;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If you keep this push query running and produce more mock data to the topic from another terminal, the results above will update with the new data.&lt;/p&gt;
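&lt;p&gt;Conceptually, an &lt;code&gt;EMIT CHANGES&lt;/code&gt; query updates incrementally: when a new event arrives, only the affected row of the running aggregation changes and is emitted. This small Python sketch pictures that model (an illustration only, not how ksqlDB is implemented internally):&lt;/p&gt;

```python
from collections import Counter

# Running state of the call_record view after the ten mock messages
call_record = Counter({"Louise": 3, "Liam": 2, "Fiona": 1, "Mike": 1,
                       "Steven": 1, "Ivor": 1, "Beckham": 1})

# A new message arrives on the call-center topic...
new_message = {"name": "Louise", "reason": "allergy", "area": "Orlando"}

# ...and only the affected row is updated and emitted to the push query
call_record[new_message["name"]] += 1
print(call_record["Louise"])  # 4
```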

&lt;p&gt;You can view failed ksqlDB messages by adding the following statements in your &lt;code&gt;docker-compose.yml&lt;/code&gt; file:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;environment:
    …
    KSQL_LOG4J_ROOT_LOGLEVEL: "ERROR"
    KSQL_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
    KSQL_LOG4J_PROCESSING_LOG_BROKERLIST: kafka:29092
    KSQL_LOG4J_PROCESSING_LOG_TOPIC: &amp;lt;ksql-processing-log-topic-name&amp;gt;
    KSQL_KSQL_LOGGING_PROCESSING_TOPIC_NAME: &amp;lt;ksql-processing-log-topic-name&amp;gt;
    KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
    KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To stop and remove the containers, run the command below:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker-compose down
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;Now that you've been properly introduced to ksqlDB and how to use it with Redpanda, you can take what you've learned in this tutorial and create a data stream-processing application with a materialized cache for any number of use cases.&lt;/p&gt;

&lt;p&gt;As you saw, ksqlDB is easy to install and configure, and it lets you run standard SQL queries.&lt;/p&gt;

&lt;p&gt;All the code in this tutorial can be found on &lt;a href="https://github.com/redpanda-data-blog/2022-ksqlDB-stream-processing" rel="noopener noreferrer"&gt;GitHub&lt;/a&gt;. Try out Redpanda using the tutorial, interact with Redpanda’s developers directly in &lt;a href="https://redpanda.com/slack" rel="nofollow noopener noreferrer"&gt;the Redpanda Community on Slack&lt;/a&gt;, or contribute to Redpanda’s &lt;a href="https://github.com/redpanda-data/redpanda/" rel="noopener noreferrer"&gt;source-available GitHub repo here&lt;/a&gt;. To learn more about everything you can do with Redpanda, check out &lt;a href="https://docs.redpanda.com/docs/home"&gt;our documentation here&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>tutorial</category>
      <category>beginners</category>
      <category>kafka</category>
      <category>redpanda</category>
    </item>
    <item>
      <title>Change data capture with CockroachDB and Redpanda</title>
      <dc:creator>The Team @ Redpanda</dc:creator>
      <pubDate>Wed, 07 Sep 2022 18:15:31 +0000</pubDate>
      <link>https://dev.to/redpanda-data/change-data-capture-with-cockroachdb-and-redpanda-4peg</link>
      <guid>https://dev.to/redpanda-data/change-data-capture-with-cockroachdb-and-redpanda-4peg</guid>
      <description>&lt;p&gt;In the cloud-native era, applications have gradually transformed towards a more distributed, less coupled architecture. Monolithic architectures have evolved into microservices, and microservices are evolving into ever smaller services or functions.&lt;/p&gt;

&lt;p&gt;Apart from all the benefits of distributed architecture — &lt;a href="https://en.wikipedia.org/wiki/Separation_of_concerns" rel="nofollow noopener noreferrer"&gt;like separation of concerns&lt;/a&gt; — this approach can have drawbacks, one of which is the data itself.&lt;/p&gt;

&lt;p&gt;Data becomes a real problem when you want to share it in a distributed system. For monolithic applications, it used to be the case that you’d have a single database as a cluster with replicated nodes, but things have changed.&lt;/p&gt;

&lt;p&gt;Distributed applications like microservices need their dedicated databases or any other middlewares as a data store, such as a dedicated cache system like &lt;a href="https://redis.io/docs/about/" rel="nofollow noopener noreferrer"&gt;Redis&lt;/a&gt; or a search engine like &lt;a href="https://www.elastic.co/what-is/elasticsearch" rel="nofollow noopener noreferrer"&gt;Elasticsearch&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;&lt;a href="//images.ctfassets.net/paqvtpyf8rwu/4LAT6pxJe05l86YMYObWxY/9e52c6fdc3e851d94640b58d1f3458c4/cockroach_1.png" class="article-body-image-wrapper"&gt;&lt;img src="//images.ctfassets.net/paqvtpyf8rwu/4LAT6pxJe05l86YMYObWxY/9e52c6fdc3e851d94640b58d1f3458c4/cockroach_1.png" alt="cockroach 1"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Because of the distributed architecture, you need to keep the same data in different databases or middleware systems, and you must keep this data consistent. In most cases, developers try to do so by doing dual writes.&lt;/p&gt;

&lt;p&gt;&lt;a href="//images.ctfassets.net/paqvtpyf8rwu/6KecY43hfE41LewJ7onkax/c77bf2a0108f5ad38f7be773c4aa81da/cockroach_2.png" class="article-body-image-wrapper"&gt;&lt;img src="//images.ctfassets.net/paqvtpyf8rwu/6KecY43hfE41LewJ7onkax/c77bf2a0108f5ad38f7be773c4aa81da/cockroach_2.png" alt="cockroach 2"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;A dual write happens when an application changes the same data in two different systems without any layer for data consistency, like transactions or distributed transactions. Not every system supports distributed transactions, so you cannot guarantee data consistency in those cases.&lt;/p&gt;

&lt;p&gt;&lt;a href="//images.ctfassets.net/paqvtpyf8rwu/01D5cNDN4NRJnLE9BCs0o9/16d4d614fbc484ac3d5d1df8016839da/cockroach_3.png" class="article-body-image-wrapper"&gt;&lt;img src="//images.ctfassets.net/paqvtpyf8rwu/01D5cNDN4NRJnLE9BCs0o9/16d4d614fbc484ac3d5d1df8016839da/cockroach_3.png" alt="cockroach 3"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;However, change data capture (CDC), a data integration pattern, enables capturing row-level changes into a configurable sink for downstream processing such as reporting, caching, full-text indexing, or — most importantly — helping avoid dual writes and ensuring data durability and consistency.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://www.cockroachlabs.com/" rel="nofollow noopener noreferrer"&gt;CockroachDB&lt;/a&gt;, a distributed and reliable database, &lt;a href="https://www.cockroachlabs.com/docs/stable/change-data-capture-overview.html" rel="nofollow noopener noreferrer"&gt;supports CDC&lt;/a&gt; via its &lt;a href="https://www.cockroachlabs.com/docs/v21.2/create-changefeed" rel="nofollow noopener noreferrer"&gt;Changefeeds&lt;/a&gt;. CockroachDB provides Changefeeds for data sinks like &lt;a href="https://aws.amazon.com/s3/" rel="nofollow noopener noreferrer"&gt;AWS S3&lt;/a&gt;, &lt;a href="https://en.wikipedia.org/wiki/Webhook" rel="nofollow noopener noreferrer"&gt;webhooks&lt;/a&gt;, or an Apache Kafka&lt;sup&gt;Ⓡ&lt;/sup&gt; API-compatible streaming data system like Redpanda.&lt;/p&gt;

&lt;p&gt;Redpanda provides a fast, fault-tolerant, safe-by-default system that is fully compatible with the Kafka API. You can use CockroachDB CDC to capture changes and stream them into Redpanda to implement vital CDC use cases more efficiently, such as reporting, avoiding dual writes, or, most importantly, keeping data consistent across the shards of CockroachDB. Because CockroachDB has a distributed architecture, keeping transactional jobs consistent across its shards is crucial, and CDC with Changefeeds addresses this by emitting changes to sinks like Kafka or Redpanda.&lt;/p&gt;

&lt;p&gt;The CDC mechanism of CockroachDB not only provides a data capturing mechanism but also an integration point to Redpanda, which can stream the captured change events to other data points like data warehouses, OLAP databases, or search engines.&lt;/p&gt;

&lt;p&gt;In this article, you will learn how to stream CDC from CockroachDB to Redpanda by completing a tutorial involving the following steps:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Run a Redpanda cluster in a containerized way using Docker.&lt;/li&gt;
&lt;li&gt;Create a topic within Redpanda using its &lt;code&gt;rpk&lt;/code&gt; CLI.&lt;/li&gt;
&lt;li&gt;Install CockroachDB and use its SQL client.&lt;/li&gt;
&lt;li&gt;Create a table on CockroachDB and configure it for using CDC.&lt;/li&gt;
&lt;li&gt;Create, update, and delete records in the CockroachDB table.&lt;/li&gt;
&lt;li&gt;Consume the change events from the relevant Redpanda topic using the &lt;code&gt;rpk&lt;/code&gt; CLI.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;If you’d like to follow along in your own editor, you can access all the resources for this tutorial in &lt;a href="https://github.com/redpanda-data-blog/2022-cdc-with-cockroachdb" rel="nofollow noopener noreferrer"&gt;this repository&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Prerequisites
&lt;/h2&gt;

&lt;p&gt;To complete this tutorial, you’ll need the following:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;A macOS environment with the Homebrew package manager &lt;a href="https://docs.brew.sh/Installation" rel="nofollow noopener noreferrer"&gt;installed&lt;/a&gt;.&lt;/li&gt;
&lt;li&gt;A recent version of &lt;a href="https://docs.docker.com/get-docker/" rel="nofollow noopener noreferrer"&gt;Docker&lt;/a&gt; installed on your machine. (Docker Desktop 4.6.1 was used at the time of writing this article.)&lt;/li&gt;
&lt;li&gt;A &lt;a href="https://www.cockroachlabs.com/get-cockroachdb/enterprise/" rel="nofollow noopener noreferrer"&gt;30-day trial license&lt;/a&gt; for CockroachDB, which is required in order to use CDC capabilities.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Use case: Change data capture with CockroachDB and Redpanda
&lt;/h2&gt;

&lt;p&gt;Suppose that you are a contractor who is about to sign a five-year contract with a potential customer, PandaBank. Before closing the deal, they would like you to complete a small task as an assignment to see if you are suitable for the work.&lt;/p&gt;

&lt;p&gt;PandaBank uses CockroachDB internally, and most of the daily account transactions are kept in this database. Currently, they have a mechanism for indexing the account transaction changes in Elasticsearch, but they noticed that it creates data inconsistencies between the actual data and the indexed log data that is in Elasticsearch.&lt;/p&gt;

&lt;p&gt;They want you to create a base mechanism to avoid any data inconsistency issues between the systems. They require you to create a basic implementation of a CDC using CockroachDB’s changefeed mechanism and Redpanda for a durable, Kafka API-compliant messaging system.&lt;/p&gt;

&lt;p&gt;For their assignment, you don’t need to implement the Elasticsearch part, just the CDC part. You are responsible for creating a CockroachDB instance and a Redpanda instance on your local machine. Because PandaBank runs Redpanda on Docker, you’ll need to do so as well.&lt;/p&gt;

&lt;p&gt;The following image shows the architectural diagram of the system they require you to implement:&lt;/p&gt;

&lt;p&gt;&lt;a href="//images.ctfassets.net/paqvtpyf8rwu/21W2Ka3fmPL0LqEaOnktP/5a688a6a707aab32c9df8944d6085be8/cockroach_4.png" class="article-body-image-wrapper"&gt;&lt;img src="//images.ctfassets.net/paqvtpyf8rwu/21W2Ka3fmPL0LqEaOnktP/5a688a6a707aab32c9df8944d6085be8/cockroach_4.png" alt="cockroach 4"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Running Redpanda
&lt;/h2&gt;

&lt;p&gt;In this tutorial, you will run Redpanda in a container via Docker.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;&lt;strong&gt;Note:&lt;/strong&gt; For more information on installing Redpanda on other platforms, refer to this &lt;a href="https://docs.redpanda.com/docs/quickstart/"&gt;documentation&lt;/a&gt;.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Make sure that you have installed Docker and started the Docker daemon in your macOS environment. Then, open a terminal window and run the following command to run Redpanda:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker run -d --pull=always --name=redpanda-1 --rm \
    -p 9092:9092 \
    -p 9644:9644 \
    docker.vectorized.io/vectorized/redpanda:latest \
    redpanda start \
    --overprovisioned \
    --smp 1  \
    --memory 1G \
    --reserve-memory 0M \
    --node-id 0 \
    --check=false
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Redpanda will be accessible via &lt;code&gt;localhost:9092&lt;/code&gt; on your computer.&lt;/p&gt;

&lt;h2&gt;
  
  
  Installing and running CockroachDB
&lt;/h2&gt;

&lt;p&gt;In order to install CockroachDB on your local macOS environment, run the following command:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;&lt;strong&gt;Note:&lt;/strong&gt; Make sure you’ve already installed the Homebrew package manager, as noted in the prerequisites.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;&lt;code&gt;brew install cockroachdb/tap/cockroach&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;After installing CockroachDB, run a single-node cluster that is in insecure mode:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;&lt;strong&gt;Note:&lt;/strong&gt; For information on starting a CockroachDB cluster in secure mode, you can refer to &lt;a href="https://www.cockroachlabs.com/docs/stable/secure-a-cluster.html" rel="noopener noreferrer"&gt;&lt;br&gt;
this documentation&lt;/a&gt;.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;&lt;code&gt;cockroach start-single-node --insecure&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;In another terminal, run the following command to access the CockroachDB SQL client interface:&lt;/p&gt;

&lt;p&gt;&lt;code&gt;cockroach sql --insecure&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;On the client interface, run the following commands to enable enterprise usage, since this kind of CDC uses an Enterprise Changefeed. Refer to the prerequisites section if you have not signed up for a trial license yet.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;SET CLUSTER SETTING cluster.organization = '_YOUR_ORGANIZATION_';
SET CLUSTER SETTING enterprise.license = '_YOUR_LICENSE_';
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Creating and configuring the CockroachDB table
&lt;/h2&gt;

&lt;p&gt;On the terminal window where the SQL query client is open, run the following command to create a database called bank in CockroachDB:&lt;/p&gt;

&lt;p&gt;&lt;code&gt;root@:26257/defaultdb&amp;gt; CREATE DATABASE bank;&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Select the bank database to be used for the rest of the actions in the query window:&lt;/p&gt;

&lt;p&gt;&lt;code&gt;root@:26257/defaultdb&amp;gt; USE bank;&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Create a table called accounts with integer fields named id and balance:&lt;/p&gt;

&lt;p&gt;&lt;code&gt;root@:26257/bank&amp;gt; CREATE TABLE accounts (id INT PRIMARY KEY, balance INT);&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Create a Changefeed for the table accounts. Set the Redpanda broker address for the Changefeed to configure it to send the captured change data:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;root@:26257/bank&amp;gt; CREATE CHANGEFEED FOR TABLE accounts INTO 'kafka://localhost:9092' WITH UPDATED;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;blockquote&gt;
&lt;p&gt;&lt;strong&gt;Note:&lt;/strong&gt; For more information on creating a Changefeed on CockroachDB, refer to &lt;a href="https://www.cockroachlabs.com/docs/v21.2/create-changefeed" rel="noopener noreferrer"&gt;&lt;br&gt;
their documentation&lt;/a&gt;.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Leave the terminal window open for later use.&lt;/p&gt;

&lt;h2&gt;
  
  
  Creating the Redpanda topic and consuming data
&lt;/h2&gt;

&lt;p&gt;In another terminal window, run the following command to create a Redpanda topic called accounts:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -it redpanda-1 \
rpk topic create accounts
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The output should look like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;TOPIC     STATUS
accounts  OK
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Notice that the topic has the same name as the CockroachDB table accounts. CockroachDB CDC produces data to a topic with the same name as the table by default.&lt;/p&gt;

&lt;p&gt;In the same terminal window, run the following command to start consuming from the accounts topic:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -it redpanda-1 \
rpk topic consume accounts
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Leave the terminal window open to view the consumed messages in the following steps.&lt;/p&gt;

&lt;h2&gt;
  
  
  Capturing the change events
&lt;/h2&gt;

&lt;p&gt;In order to confirm that the CDC mechanism works, you must create, update, and delete some data in the accounts table. You’ll also observe and examine the captured events in the Redpanda accounts topic.&lt;/p&gt;

&lt;h3&gt;
  
  
  Creating the accounts data
&lt;/h3&gt;

&lt;p&gt;In the SQL client terminal window, run the following command to insert some data into the accounts table:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;root@:26257/bank&amp;gt; INSERT INTO accounts (id, balance) VALUES (1, 1000), (2, 250), (3, 700);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This creates the following accounts in the CockroachDB accounts table:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Account ID  Balance
1           1000
2           250
3           700

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;After inserting the data, verify that the Redpanda CLI consumer prints out the consumed data:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "topic": "accounts",
  "key": "[1]",
  "value": "{\"after\": {\"balance\": 1000, \"id\": 1}, \"updated\": \"1648496379523876000.0000000000\"}",
  "timestamp": 1648496379856,
  "partition": 0,
  "offset": 0
}
{
  "topic": "accounts",
  "key": "[2]",
  "value": "{\"after\": {\"balance\": 250, \"id\": 2}, \"updated\": \"1648496379523876000.0000000000\"}",
  "timestamp": 1648496379856,
  "partition": 0,
  "offset": 1
}
{
  "topic": "accounts",
  "key": "[3]",
  "value": "{\"after\": {\"balance\": 700, \"id\": 3}, \"updated\": \"1648496379523876000.0000000000\"}",
  "timestamp": 1648496379856,
  "partition": 0,
  "offset": 2
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Notice that you have all the data from the accounts table as separate event logs in your Redpanda instance’s accounts topic.&lt;/p&gt;

&lt;h3&gt;
  
  
  Running account transactions
&lt;/h3&gt;

&lt;p&gt;The application development team of PandaBank shared a small containerized application with you that runs some transactions on these bank accounts. This application connects to CockroachDB over &lt;code&gt;localhost:26257&lt;/code&gt;, so be sure that CockroachDB is accessible in your local environment.&lt;/p&gt;

&lt;p&gt;Use the following command to run the transaction between accounts:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;git clone git@github.com:redpanda-data-blog/2022-cdc-with-cockroachdb.git
cd 2022-cdc-with-cockroachdb/account_transaction_manager/
docker build -t account-transaction-manager .
docker run account-transaction-manager
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The output of this command should look like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;DEBUG:root:print_balances(): status message: SELECT 3
Balances at Mon Mar 28 19:43:34 2022:
(1, 1000)
(2, 250)
(3, 700)
DEBUG:root:transfer_funds(): status message: UPDATE 1
DEBUG:root:transfer_funds(): status message: UPDATE 1
DEBUG:root:print_balances(): status message: SELECT 3
Balances at Mon Mar 28 19:43:35 2022:
(1, 700)
(2, 350)
(3, 900)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;When you run the &lt;code&gt;SELECT * FROM accounts;&lt;/code&gt; command in the CockroachDB SQL client, you will see the following results:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Account ID  Balance
1           700
2           350
3           900
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Verify that the new changes captured by CockroachDB CDC are reflected in your Redpanda consumer. In the consumer’s terminal window, you should see the following result:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "topic": "accounts",
  "key": "[1]",
  "value": "{\"after\": {\"balance\": 900, \"id\": 1}, \"updated\": \"1648496614787637000.0000000000\"}",
  "timestamp": 1648496615283,
  "partition": 0,
  "offset": 3
}
{
  "topic": "accounts",
  "key": "[2]",
  "value": "{\"after\": {\"balance\": 350, \"id\": 2}, \"updated\": \"1648496614787637000.0000000000\"}",
  "timestamp": 1648496615283,
  "partition": 0,
  "offset": 4
}
{
  "topic": "accounts",
  "key": "[1]",
  "value": "{\"after\": {\"balance\": 700, \"id\": 1}, \"updated\": \"1648496614835272000.0000000000\"}",
  "timestamp": 1648496615283,
  "partition": 0,
  "offset": 5
}
{
  "topic": "accounts",
  "key": "[3]",
  "value": "{\"after\": {\"balance\": 900, \"id\": 3}, \"updated\": \"1648496614835272000.0000000000\"}",
  "timestamp": 1648496615283,
  "partition": 0,
  "offset": 6
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Notice the balance changes that represent the transaction history in the consumer output.&lt;/p&gt;

&lt;h3&gt;
  
  
  Deleting the accounts
&lt;/h3&gt;

&lt;p&gt;As the last step, delete the bank accounts and see how CDC captures them. Run the following SQL query in the query console of CockroachDB:&lt;/p&gt;

&lt;p&gt;&lt;code&gt;root@:26257/bank&amp;gt; DELETE FROM bank.accounts WHERE id &amp;lt;&amp;gt; 0;&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;The Redpanda consumer should have the following captured events:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "topic": "accounts",
  "key": "[1]",
  "value": "{\"after\": null, \"updated\": \"1648497640110587000.0000000000\"}",
  "timestamp": 1648497640200,
  "partition": 0,
  "offset": 6
}
{
  "topic": "accounts",
  "key": "[2]",
  "value": "{\"after\": null, \"updated\": \"1648497640110587000.0000000000\"}",
  "timestamp": 1648497640200,
  "partition": 0,
  "offset": 7
}
{
  "topic": "accounts",
  "key": "[3]",
  "value": "{\"after\": null, \"updated\": \"1648497640110587000.0000000000\"}",
  "timestamp": 1648497640200,
  "partition": 0,
  "offset": 8
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Notice that the &lt;code&gt;after&lt;/code&gt; field in the value becomes null when the change event is a delete. CockroachDB CDC successfully captures data change events and sends them to Redpanda for further consumption.&lt;/p&gt;
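&lt;p&gt;A downstream consumer can rebuild the table state by replaying these envelopes in order, treating a null &lt;code&gt;after&lt;/code&gt; as a delete. The following Python sketch shows that replay logic over a condensed version of the events seen in this tutorial (hypothetical consumer code for illustration):&lt;/p&gt;

```python
import json

# Condensed (key, value) pairs from the accounts topic: the initial inserts,
# the transaction updates, and the final deletes (where "after" is null)
events = [
    ("[1]", '{"after": {"balance": 1000, "id": 1}}'),
    ("[2]", '{"after": {"balance": 250, "id": 2}}'),
    ("[3]", '{"after": {"balance": 700, "id": 3}}'),
    ("[1]", '{"after": {"balance": 700, "id": 1}}'),
    ("[2]", '{"after": {"balance": 350, "id": 2}}'),
    ("[3]", '{"after": {"balance": 900, "id": 3}}'),
    ("[1]", '{"after": null}'),
    ("[2]", '{"after": null}'),
    ("[3]", '{"after": null}'),
]

accounts = {}
for key, value in events:
    row_id = json.loads(key)[0]      # the Kafka key is a JSON array like [1]
    after = json.loads(value)["after"]
    if after is None:
        accounts.pop(row_id, None)   # null "after" means the row was deleted
    else:
        accounts[row_id] = after["balance"]

print(accounts)  # {} -- all three accounts were deleted at the end
```

&lt;p&gt;Replaying only the first six events would leave the post-transaction balances (700, 350, and 900), matching the &lt;code&gt;SELECT * FROM accounts;&lt;/code&gt; output above.&lt;/p&gt;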

&lt;p&gt;Congratulations! You’ve successfully captured the bank account transaction changes and made them consumable as events from Redpanda.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;In this tutorial, you’ve learned how to run Redpanda in a container using Docker and how to create a topic and consume messages from it. You also learned to install CockroachDB, create a table using its SQL query interface, and configure it for using CDC. &lt;/p&gt;

&lt;p&gt;You can now use CockroachDB to capture changes and stream them to Redpanda to implement CDC use cases such as reporting, caching, full-text indexing, avoiding dual writes, and much more.&lt;/p&gt;

&lt;p&gt;Find all the resources for this tutorial in &lt;a href="https://github.com/redpanda-data-blog/2022-cdc-with-cockroachdb"&gt;this repository&lt;/a&gt;, or &lt;a href="https://redpanda.com/slack"&gt;join the Redpanda Community on Slack&lt;/a&gt; to ask specific questions. View &lt;a href="https://docs.redpanda.com/"&gt;Redpanda’s documentation here&lt;/a&gt;. &lt;/p&gt;

</description>
      <category>tutorial</category>
      <category>kafka</category>
      <category>beginners</category>
      <category>productivity</category>
    </item>
    <item>
      <title>Using GitHub Actions to automate development</title>
      <dc:creator>The Team @ Redpanda</dc:creator>
      <pubDate>Wed, 31 Aug 2022 15:29:23 +0000</pubDate>
      <link>https://dev.to/redpanda-data/using-github-actions-to-automate-development-2mea</link>
      <guid>https://dev.to/redpanda-data/using-github-actions-to-automate-development-2mea</guid>
      <description>&lt;p&gt;If you’re already using Redpanda, then you know one of its most alluring draws is its intention to make data streaming development work as simple as possible. (And, if you aren’t already using Redpanda, you can learn how we deliver that simplicity &lt;a rel="noopener noreferrer" href="https://redpanda.com/blog/rpk-container/"&gt;in this blog post&lt;/a&gt;.) &lt;/p&gt;

&lt;p&gt;Having scorned complexity, the logical next step was to create an easy and efficient way to automate and test builds that depend on Redpanda. In this post, we’ll show you how to do just that, using the Redpanda GitHub Action. &lt;/p&gt;

&lt;h2&gt;
  
  
  What are GitHub Actions?
&lt;/h2&gt;

&lt;p&gt;Before digging into how to test our code, it’s worth understanding what a &lt;a rel="noopener noreferrer" href="https://github.com/features/actions"&gt;GitHub Action&lt;/a&gt; is. According to GitHub’s website, “GitHub Actions makes it easy to automate all your software workflows.”&lt;/p&gt;

&lt;p&gt;In our case, we'll focus specifically on the continuous integration workflow so that we can run automated tests &lt;a rel="noopener noreferrer" href="https://resources.github.com/ci-cd/"&gt;on GitHub CI&lt;/a&gt;. After you get your code running and working with Redpanda on your local development environment, how do you ensure that your teammates won’t introduce breaking changes to code? Running the automated tests on the CI will ensure it's working as expected.&lt;/p&gt;

&lt;p&gt;Sometimes it makes sense to isolate third-party dependencies in the software architecture, but what if we want to test using a &lt;em&gt;real&lt;/em&gt; Redpanda instance? That's the main reason why the Action was created: to bring all the Redpanda power to the GitHub CI environment. It makes the test suite faster and more reliable.&lt;/p&gt;

&lt;p&gt;Even if you aren’t using Redpanda in &lt;a rel="noopener noreferrer" href="https://docs.redpanda.com/docs/deployment/production-deployment/"&gt;your production environment&lt;/a&gt;, you can still benefit from using it as a drop-in replacement for other Apache Kafka&lt;sup&gt;Ⓡ&lt;/sup&gt; distributions and take advantage of faster boot times and lower RAM usage. Using the Redpanda GitHub Action also saves build minutes (and CI costs).&lt;/p&gt;

&lt;p&gt;So how do you use it? Let's configure it together.&lt;/p&gt;

&lt;h2&gt;
  
  
  Local development
&lt;/h2&gt;

&lt;p&gt;Disclaimer: The &lt;a rel="noopener noreferrer" href="https://www.ruby-lang.org/en/"&gt;Ruby&lt;/a&gt; language is an entirely arbitrary choice. You can write the code in any language you prefer.&lt;/p&gt;

&lt;p&gt;All the code discussed here is available on the &lt;a rel="noopener noreferrer" href="https://github.com/fernandes/Redpanda-action-demo"&gt;Redpanda-action-demo&lt;/a&gt; repository.&lt;/p&gt;

&lt;p&gt;Here we have a test suite that performs two tests:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;It publishes to a Redpanda topic&lt;/li&gt;
&lt;li&gt;It fetches the message &lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Both tests are supported by a setup that connects to Redpanda using the &lt;a rel="noopener noreferrer" href="https://rubygems.org/gems/ruby-kafka"&gt;ruby-kafka gem&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Note: Always use &lt;code&gt;localhost:9092&lt;/code&gt; as your Redpanda address. It works locally, and GitHub CI will spin up the Docker image and bind its port to 9092.&lt;/p&gt;

&lt;h2&gt;
  
  
  Continuous Integration
&lt;/h2&gt;

&lt;p&gt;To use the Redpanda GitHub Action, you’ll need to configure GitHub CI to run the test. This is configured in &lt;code&gt;.github/workflows/ci.yml&lt;/code&gt;. Here are its contents, which we will talk more about below:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# the name of our job
name: CI

# yes, we want to run for all branches and pull requests
on:
  push:
    branches: "*"
  pull_request:
    branches: "*"

# we have just our job `test`
jobs:
  test:
    runs-on: ubuntu-latest
    # here is the main section for us, where we spin up the Redpanda instance
    # using the Redpanda GitHub action,pay attention on the `.with.version` key, we are using _latest_ but you can use any Redpanda version
    # tip: version is exactly the same as the Redpanda docker image
    steps:
    - name: start Redpanda
        uses: Redpanda-data/github-action@v0.1.3
        with:
        version: "latest"
    - uses: actions/checkout@v2
    # below is how we setup ruby and run the tests using `rake`
    - name: Set up Ruby
        uses: ruby/setup-ruby@359bebbc29cbe6c87da6bc9ea3bc930432750108
        with:
        ruby-version: '3.0'
        bundler-cache: true
    - name: Install dependencies
        run: bundle install
    - name: Run tests
        run: bundle exec rake
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;After committing this file and pushing it to a branch, GitHub CI will run the test suite automatically and give you feedback about the changes on your pull request, showing you if the test failed or was successful.&lt;/p&gt;

&lt;p&gt;Using the Redpanda GitHub Action is a straightforward way to get Kafka-based projects tested. It's simple, using just one Docker image with everything you need. There’s no JVM, no Zookeeper&lt;sup&gt;Ⓡ&lt;/sup&gt;: it's an all-in-one solution. And it's super fast! Redpanda will boot up and be ready to work in a few seconds. It's a huge win in the developer experience.&lt;/p&gt;

&lt;p&gt;When compared to a popular Kafka Docker image, the Redpanda GitHub Action offers smaller RAM usage (47MB vs. 465MB) and a smaller Docker image size (only 130MB vs. 465MB). Considering GitHub Actions is billed by minutes of usage, the smaller the footprint, the cheaper it is to run the tests.&lt;/p&gt;

&lt;p&gt;After getting set up and running the tests locally, we know that everything is working properly and can publish to our repo and collaborate.&lt;/p&gt;

&lt;h2&gt;
  
  
  How will you use the Redpanda GitHub Action?
&lt;/h2&gt;

&lt;p&gt;We want to hear how you use your newfound knowledge about the Redpanda Action on GitHub CI. Join &lt;a rel="noopener noreferrer" href="https://rpnda.co/slack"&gt;the community Slack&lt;/a&gt; to share your ideas and experience, and check out &lt;a rel="noopener noreferrer" href="https://docs.redpanda.com/"&gt;Redpanda’s documentation&lt;/a&gt; for information on the other things you can do with Redpanda and your applications.&lt;/p&gt;

&lt;p&gt;For more information on the Redpanda GitHub Action, or to report an issue, please see &lt;a rel="noopener noreferrer" href="https://github.com/redpanda-data/github-action"&gt;the GitHub repository for the project&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;You can also visit &lt;a rel="noopener noreferrer" href="https://github.com/fernandes/redpanda-action-demo"&gt;the repository for the demo in this article&lt;/a&gt;, where you’ll find the code from this article and a sample test suite that uses the GitHub Action in a real-world scenario.&lt;/p&gt;

&lt;p&gt;Happy testing!&lt;/p&gt;

</description>
      <category>productivity</category>
      <category>devops</category>
      <category>github</category>
      <category>redpanda</category>
    </item>
    <item>
      <title>Clickstream data analysis with Databricks and Redpanda</title>
      <dc:creator>The Team @ Redpanda</dc:creator>
      <pubDate>Wed, 24 Aug 2022 17:36:16 +0000</pubDate>
      <link>https://dev.to/redpanda-data/clickstream-data-analysis-with-databricks-and-redpanda-6d4</link>
      <guid>https://dev.to/redpanda-data/clickstream-data-analysis-with-databricks-and-redpanda-6d4</guid>
      <description>&lt;p&gt;Global organizations need a way to process the massive amounts of data they produce for real-time decision making. They often utilize event-streaming tools like Redpanda with stream-processing tools like &lt;a href="https://databricks.com/" rel="nofollow noopener noreferrer"&gt;Databricks&lt;/a&gt; for this purpose. &lt;/p&gt;

&lt;p&gt;An example use case is recommending content to users based on their clicks on a mobile or web app. The clickstreams will be streamed through Redpanda to Databricks, where a recommendation engine will analyze their data and recommend content:&lt;/p&gt;

&lt;p&gt;&lt;a href="//images.ctfassets.net/paqvtpyf8rwu/6j0zMK5LSd4cFKCiZapMXH/416fb4510f377e3ce61c912594a08f97/databricks_1.png" class="article-body-image-wrapper"&gt;&lt;img src="//images.ctfassets.net/paqvtpyf8rwu/6j0zMK5LSd4cFKCiZapMXH/416fb4510f377e3ce61c912594a08f97/databricks_1.png" alt="databricks 1"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Redpanda is a fast and scalable real-time event streaming platform that serves as an Apache Kafka&lt;sup&gt;Ⓡ&lt;/sup&gt; alternative. It’s API-compatible with Kafka, so all of your existing Kafka tooling works with Redpanda, too. It also ships as a single binary and can run on a virtual machine, in Docker, or on Kubernetes.&lt;/p&gt;

&lt;p&gt;This tutorial covers event streaming and data analytics using Redpanda and Databricks. You will learn how to produce data to a Redpanda topic from a shell, store the produced data in CSV files within Databricks, and analyze the data in real time to obtain insights.&lt;/p&gt;

&lt;h2&gt;
  
  
  Using Redpanda to send data to Databricks
&lt;/h2&gt;

&lt;p&gt;Let’s get started!&lt;/p&gt;

&lt;p&gt;First, the prerequisites. To complete this tutorial, you’ll need the following:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;A physical or virtual machine with a publicly accessible IP address&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://docs.docker.com/get-docker/" rel="nofollow noopener noreferrer"&gt;Docker&lt;/a&gt; and docker-compose installed on that virtual machine&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://github.com/edenhill/kcat" rel="nofollow noopener noreferrer"&gt;Kafkacat&lt;/a&gt; (or any client compatible with the Kafka API) for connecting to Redpanda as a producer&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Setting up Redpanda
&lt;/h2&gt;

&lt;p&gt;To set up Redpanda, create a &lt;code&gt;docker-compose.yml&lt;/code&gt; file in a server that can be accessed over the internet. This ensures that the Redpanda broker can communicate with your deployed Databricks instance:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;version&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;3.7"&lt;/span&gt;
&lt;span class="na"&gt;services&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;redpanda&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;command&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;redpanda&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;start&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--smp&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;1"&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--reserve-memory&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;0M&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--overprovisioned&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--node-id&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;0"&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--kafka-addr&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--advertise-kafka-addr&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;docker.vectorized.io/vectorized/redpanda:latest&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;redpanda-1&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;9092:9092&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;29092:29092&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;8081:8081&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
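&lt;p&gt;Note the two listeners in the compose file: &lt;code&gt;PLAINTEXT&lt;/code&gt; for traffic inside the Docker network and &lt;code&gt;OUTSIDE&lt;/code&gt; for external clients. As a quick illustration (plain Python, nothing Redpanda-specific), splitting the advertised-address flag shows which address each named listener hands back to connecting clients:&lt;/p&gt;

```python
# The value passed to --advertise-kafka-addr in the compose file above.
adv = "PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092"

# Split the flag into a mapping of listener name to advertised address.
listeners = dict(part.split("://", 1) for part in adv.split(","))
print(listeners["OUTSIDE"])  # localhost:9092
```

&lt;p&gt;External clients such as Databricks connect through the &lt;code&gt;OUTSIDE&lt;/code&gt; listener on port 9092, which is why that port needs to be reachable over the internet.&lt;/p&gt;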



&lt;p&gt;Start the Redpanda container by changing to the directory containing the &lt;code&gt;docker-compose.yml&lt;/code&gt; file and executing the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker compose up &lt;span class="nt"&gt;-d&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This operation will pull the Redpanda Docker image and start Redpanda on port 9092. Ensure your virtual machine instance has a static public IP address and that port 9092 is publicly accessible.&lt;/p&gt;
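&lt;p&gt;If you want to confirm the broker port is reachable before moving on, a small Python check does the trick (standard library only; the throwaway local listener below is just a stand-in for your server):&lt;/p&gt;

```python
import socket

def port_reachable(host, port, timeout=3.0):
    """Return True when a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Demo against a throwaway local listener; for a real deployment you would
# call port_reachable("SERVER_IP", 9092) from outside your network instead.
listener = socket.socket()
listener.bind(("127.0.0.1", 0))
listener.listen(1)
reachable = port_reachable("127.0.0.1", listener.getsockname()[1])
listener.close()
unreachable = port_reachable("127.0.0.1", 1, timeout=0.5)
print(reachable, unreachable)  # True False
```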

&lt;p&gt;Refer to the following guides to add a public IP address and port on your virtual machine instance:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;a href="https://aws.amazon.com/premiumsupport/knowledge-center/ec2-associate-static-public-ip/" rel="nofollow noopener noreferrer"&gt;Adding a public IP address on Amazon EC2&lt;/a&gt; &lt;/li&gt;
&lt;li&gt;&lt;a href="https://docs.microsoft.com/en-us/azure/virtual-network/ip-services/associate-public-ip-address-vm" rel="nofollow noopener noreferrer"&gt;Adding a public IP address on Azure Virtual Machine &lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://cloud.google.com/compute/docs/ip-addresses/reserve-static-external-ip-address" rel="nofollow noopener noreferrer"&gt;Adding a public IP address on Google Compute Engine&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Setting up Databricks
&lt;/h2&gt;

&lt;p&gt;To get started, &lt;a href="https://databricks.com/try-databricks" rel="nofollow noopener noreferrer"&gt;create a Databricks account&lt;/a&gt; (your account is free for a 14-day trial period). After filling in your account details, you'll be redirected to choose a cloud provider. Go ahead with your preferred cloud provider, choosing the appropriate setup instructions from the list below:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://docs.databricks.com/getting-started/account-setup.html" rel="nofollow noopener noreferrer"&gt;Setting up Databricks on AWS&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://docs.microsoft.com/en-us/azure/databricks/scenarios/quickstart-create-databricks-workspace-portal?tabs=azure-portal" rel="nofollow noopener noreferrer"&gt;Setting up Databricks on Azure&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://docs.gcp.databricks.com/getting-started/try-databricks-gcp.html" rel="nofollow noopener noreferrer"&gt;Setting up Databricks on GCP&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="//images.ctfassets.net/paqvtpyf8rwu/7njzynPxeBjQjlQKmUBDkW/6656adac0cdce8a39baa5b0603fcf307/databricks_2.png" class="article-body-image-wrapper"&gt;&lt;img src="//images.ctfassets.net/paqvtpyf8rwu/7njzynPxeBjQjlQKmUBDkW/6656adac0cdce8a39baa5b0603fcf307/databricks_2.png" alt="databricks 2"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;After a successful setup, you should land on a dashboard with links to various aspects of Databricks.&lt;/p&gt;

&lt;p&gt;&lt;a href="//images.ctfassets.net/paqvtpyf8rwu/7mZtFvqiJ2ufJonAq4ihHp/a93c500d1f61dce351a605c2ef1bc026/databricks_3.png" class="article-body-image-wrapper"&gt;&lt;img src="//images.ctfassets.net/paqvtpyf8rwu/7mZtFvqiJ2ufJonAq4ihHp/a93c500d1f61dce351a605c2ef1bc026/databricks_3.png" alt="databricks 3"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Your first task is to create a Databricks cluster. A cluster is a set of computational resources and configurations that lets you run data science and engineering workloads. In this case, you’ll be running a data engineering workload to stream data from a Redpanda topic to a CSV file within Databricks.&lt;/p&gt;

&lt;p&gt;&lt;a href="//images.ctfassets.net/paqvtpyf8rwu/66i9aVLkoPf0piYLVXiv0n/a6faada1914f6f58533c05a1f5d9987f/New_website_image_size.png" class="article-body-image-wrapper"&gt;&lt;img src="//images.ctfassets.net/paqvtpyf8rwu/66i9aVLkoPf0piYLVXiv0n/a6faada1914f6f58533c05a1f5d9987f/New_website_image_size.png" alt="databricks 4"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;On the dashboard, click the Create button at the top left, then click Cluster. You’ll be taken to a page where you can configure your cluster’s properties. Choose a name for your cluster, leave the other fields unchanged, and then click Create Cluster.&lt;/p&gt;

&lt;p&gt;&lt;a href="//images.ctfassets.net/paqvtpyf8rwu/2bec2gXpdIQLuw5BUodm5O/5673cf51563c7c02d9c5332bee058f4e/New_website_image_size__1_.png" class="article-body-image-wrapper"&gt;&lt;img src="//images.ctfassets.net/paqvtpyf8rwu/2bec2gXpdIQLuw5BUodm5O/5673cf51563c7c02d9c5332bee058f4e/New_website_image_size__1_.png" alt="Databricks new cluster"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;You’ll be using a Databricks notebook to carry out all the tasks in this tutorial. A Databricks notebook is an interface that can contain runnable code, documentation, and visualization, similar to a Jupyter notebook. This notebook will serve as the scratchpad for running your commands.&lt;br&gt;
Again, click on the Create button at the top left of your dashboard and this time select the option to create a new Notebook. Choose a descriptive name like &lt;code&gt;redpanda-kconnect-scratchpad&lt;/code&gt; and set Scala as the default language.&lt;/p&gt;

&lt;p&gt;&lt;a href="//images.ctfassets.net/paqvtpyf8rwu/1MYcbBTfO99rurgklo2pVd/97ccf6e8605d27b5ad9021bfaa9c94de/databricks_6.png" class="article-body-image-wrapper"&gt;&lt;img src="//images.ctfassets.net/paqvtpyf8rwu/1MYcbBTfO99rurgklo2pVd/97ccf6e8605d27b5ad9021bfaa9c94de/databricks_6.png" alt="databricks 6"&gt;&lt;/a&gt;&lt;/p&gt;
&lt;h2&gt;
  
  
  Setting up streaming in Databricks
&lt;/h2&gt;

&lt;p&gt;After setting up your first notebook, paste the content below in the notebook’s first cell:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight javascript"&gt;&lt;code&gt;&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nx"&gt;org&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;apache&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;sql&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;functions&lt;/span&gt;&lt;span class="p"&gt;.{&lt;/span&gt;&lt;span class="nx"&gt;get_json_object&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;json_tuple&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="kd"&gt;var&lt;/span&gt; &lt;span class="nx"&gt;streamingInputDF&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
  &lt;span class="nx"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;readStream&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;format&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;kafka&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;kafka.bootstrap.servers&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;SERVER_IP:9092&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;subscribe&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;csv_input&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;startingOffsets&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;latest&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;minPartitions&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;10&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;failOnDataLoss&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;true&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;load&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;select&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;$&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;value&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;cast&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;string&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="k"&gt;as&lt;/span&gt;&lt;span class="p"&gt;[(&lt;/span&gt;&lt;span class="nb"&gt;String&lt;/span&gt;&lt;span class="p"&gt;)]&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The code snippet above creates a streaming data frame assigned to the variable &lt;code&gt;streamingInputDF&lt;/code&gt;. This data frame subscribes to the topic of interest, &lt;code&gt;csv_input&lt;/code&gt;, in the Redpanda cluster. The cluster is identified by the server IP and port. The port in this case is &lt;code&gt;9092&lt;/code&gt;, the same default port that Kafka uses.&lt;/p&gt;

&lt;p&gt;Replace &lt;code&gt;SERVER_IP&lt;/code&gt; with the deployed IP address of your server. After setting the &lt;code&gt;SERVER_IP&lt;/code&gt;, run the cell to initialize the configuration. You should get an output similar to the one below:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;streamingInputDF:org.apache.spark.sql.Dataset[String] = [value: string]
import org.apache.spark.sql.functions.{get_json_object, json_tuple}
streamingInputDF: org.apache.spark.sql.Dataset[String] = [value: string]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In order to save data in Databricks, you need to define a write stream to a file. This write stream should be of the same file type as the input stream. The snippet below reads data from the &lt;code&gt;streamingInputDF&lt;/code&gt; data frame and writes it to CSV files. The write operation is triggered every thirty seconds, and all new entries to the Redpanda topic are read and written to a new CSV file.&lt;/p&gt;
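&lt;p&gt;To make the append behavior concrete, here is a plain-Python sketch (not Spark code) of what each trigger does on disk: every micro-batch lands in its own part file instead of rewriting files from earlier triggers:&lt;/p&gt;

```python
import csv
import pathlib
import tempfile

def write_batch(out_dir, batch_id, rows):
    """Write one micro-batch to its own part file.

    This mimics Spark's append output mode: every trigger emits a brand-new
    file rather than modifying files written by earlier triggers.
    """
    path = pathlib.Path(out_dir) / f"part-{batch_id:05d}.csv"
    with path.open("w", newline="") as f:
        csv.writer(f).writerows(rows)
    return path

# Two simulated 30-second triggers, each landing as a separate CSV file.
out_dir = tempfile.mkdtemp()
write_batch(out_dir, 0, [["1", "Israel", "Edeh"]])
write_batch(out_dir, 1, [["2", "John", "Doe"]])
part_files = sorted(p.name for p in pathlib.Path(out_dir).glob("*.csv"))
print(part_files)  # ['part-00000.csv', 'part-00001.csv']
```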

&lt;p&gt;Create a second cell and add the content in the snippet below:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight javascript"&gt;&lt;code&gt;&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nx"&gt;org&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;apache&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;sql&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;streaming&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;Trigger&lt;/span&gt;

&lt;span class="nx"&gt;val&lt;/span&gt; &lt;span class="nx"&gt;query&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
  &lt;span class="nx"&gt;streamingInputDF&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;writeStream&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;format&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;csv&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;outputMode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;append&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;path&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;/FileStore/tables/user-details-data&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;checkpointLocation&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;/FileStore/tables/user-details-check&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;trigger&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;Trigger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;ProcessingTime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;30 seconds&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;start&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, run the command to start the streaming operation. Your output should look similar to the image below.&lt;/p&gt;

&lt;p&gt;&lt;a href="//images.ctfassets.net/paqvtpyf8rwu/1HAQz09wI6BvA5TwEVej2n/30853861e854bf3dcbf933c64f9ea8a1/New_website_image_size__6_.png" class="article-body-image-wrapper"&gt;&lt;img src="//images.ctfassets.net/paqvtpyf8rwu/1HAQz09wI6BvA5TwEVej2n/30853861e854bf3dcbf933c64f9ea8a1/New_website_image_size__6_.png" alt="databricks start streaming"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Loading data from Redpanda to Databricks
&lt;/h2&gt;

&lt;p&gt;In order to see actual data in Databricks, you’ll stream data to Redpanda using Kafkacat. Run the command below in your shell to create a Redpanda console producer. Replace &lt;code&gt;SERVER_IP&lt;/code&gt; with the IP address of the server running Redpanda:&lt;br&gt;
&lt;br&gt;&lt;br&gt;
&lt;code&gt;kafkacat -b SERVER_IP:9092 -t csv_input -P&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Now paste the CSV content into the producer line by line:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;id,first_name,last_name
1,Israel,Edeh
2,John,Doe
3,Jane,Austin
4,Omo,Lawal
5,John,Manson
6,John,Rinzler
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
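&lt;p&gt;Conceptually, each pasted line becomes one record whose value is the raw CSV text. The illustration below (plain Python, no broker involved) shows the shape of what the producer sends, with the topic name matching the stream's subscription:&lt;/p&gt;

```python
# The header and first rows pasted into the producer; each line is sent
# as one record whose value is the raw CSV line.
csv_text = """id,first_name,last_name
1,Israel,Edeh
2,John,Doe"""

records = [{"topic": "csv_input", "value": line} for line in csv_text.splitlines()]
print(len(records), records[1]["value"])  # 3 1,Israel,Edeh
```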



&lt;p&gt;Depending on how quickly you paste the content, you should see five completed jobs in the output area of the write stream cell.&lt;/p&gt;

&lt;p&gt;&lt;a href="//images.ctfassets.net/paqvtpyf8rwu/3P6SJ3XAWM78Q2XQZu5SAo/edee5f0ce9b5f49332e48eda8597df1f/New_website_image_size__5_.png" class="article-body-image-wrapper"&gt;&lt;img src="//images.ctfassets.net/paqvtpyf8rwu/3P6SJ3XAWM78Q2XQZu5SAo/edee5f0ce9b5f49332e48eda8597df1f/New_website_image_size__5_.png" alt="Databricks complete streaming"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;To see the CSV files, create a new cell and run the following command:&lt;br&gt;
&lt;br&gt;&lt;br&gt;
&lt;code&gt;%fs ls /FileStore/tables/user-details-data/&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;You should get a table showing all created CSV files in the output area.&lt;/p&gt;

&lt;p&gt;&lt;a href="//images.ctfassets.net/paqvtpyf8rwu/41lTLsmodUaXtKosZ1AE0k/a7d0b9762c8cf0ce6ca2b503324a715a/New_website_image_size__4_.png" class="article-body-image-wrapper"&gt;&lt;img src="//images.ctfassets.net/paqvtpyf8rwu/41lTLsmodUaXtKosZ1AE0k/a7d0b9762c8cf0ce6ca2b503324a715a/New_website_image_size__4_.png" alt="databricks user details data"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Run the command below to see the actual content of the files:&lt;br&gt;
&lt;br&gt;&lt;br&gt;
&lt;code&gt;spark.read.csv("/FileStore/tables/user-details-data/").show()&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;You should get an output listing the entries you’ve streamed so far. It will look something like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;+-------------+
|          _c0|
+-------------+
|id, first_name,last_name|
|1,Israel,Edeh|
|3,Jane,Austin|
|5,John,Manson|
|6,John,Rinzler|
|  4,Omo,Lawal|
|   2,John,Doe|
+-------------+
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Analyzing the streamed user data
&lt;/h2&gt;

&lt;p&gt;Databricks offers data analysis and machine learning tools to help organizations make sense of their data. You can perform simple analysis on the data you streamed in this tutorial using Apache Spark&lt;sup&gt;Ⓡ&lt;/sup&gt; queries. Say you want to group users by their first name to find out how many users share each first name. You can use the following code to achieve this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="o"&gt;%&lt;/span&gt;&lt;span class="n"&gt;python&lt;/span&gt;
&lt;span class="n"&gt;users_df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;csv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"/FileStore/tables/user-details-data/"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;header&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"true"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;inferSchema&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"true"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;users_df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;groupBy&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"first_name"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;count&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="n"&gt;show&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Running the command above will produce the following result:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;+----------+-----+
|first_name|count|
+----------+-----+
|      John|    3|
|    Israel|    1|
|       Omo|    1|
|      Jane|    1|
+----------+-----+
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can see from the analysis that three users have "John" as their first name. You can run further analysis with a dataset with more rows.&lt;/p&gt;
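&lt;p&gt;You can also sanity-check that aggregation outside Spark. A plain-Python equivalent of the &lt;code&gt;groupBy("first_name").count()&lt;/code&gt; query over the six sample rows gives the same counts:&lt;/p&gt;

```python
from collections import Counter

# The six sample rows streamed in this tutorial, as plain tuples.
rows = [
    (1, "Israel", "Edeh"),
    (2, "John", "Doe"),
    (3, "Jane", "Austin"),
    (4, "Omo", "Lawal"),
    (5, "John", "Manson"),
    (6, "John", "Rinzler"),
]

# Equivalent of groupBy("first_name").count() for this tiny dataset.
counts = Counter(first for _, first, _ in rows)
print(counts["John"])  # 3
```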

&lt;h2&gt;
  
  
  Plotting the streamed data
&lt;/h2&gt;

&lt;p&gt;Databricks also allows you to visualize and plot your data. You can prepare your CSV data for plotting by selecting the headers and configuring the plot options. Your first task is to display the data as a table. Run the command below in a new cell to do so:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="o"&gt;%&lt;/span&gt;&lt;span class="n"&gt;python&lt;/span&gt;
&lt;span class="n"&gt;diamonds_df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;csv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"/FileStore/tables/user-details-data/"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;header&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"true"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;inferSchema&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"true"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;display&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;diamonds_df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;select&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"first_name"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"last_name"&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, change the display type to &lt;code&gt;bar&lt;/code&gt; and then click on the Plot Options… button to customize the bar chart. Drag and drop &lt;code&gt;first_name&lt;/code&gt; to the &lt;code&gt;keys&lt;/code&gt; and &lt;code&gt;values&lt;/code&gt; boxes and remove other fields. Then set the aggregation type to &lt;code&gt;COUNT&lt;/code&gt;. Finally, click on Apply to apply your customization.&lt;/p&gt;

&lt;p&gt;&lt;a href="//images.ctfassets.net/paqvtpyf8rwu/4oIvoBmtg7LEYdSJC1PlWr/88ac8c406fb2a27bce21fcdcaae8a037/New_website_image_size__3_.png" class="article-body-image-wrapper"&gt;&lt;img src="//images.ctfassets.net/paqvtpyf8rwu/4oIvoBmtg7LEYdSJC1PlWr/88ac8c406fb2a27bce21fcdcaae8a037/New_website_image_size__3_.png" alt="databricks customize plot"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  What will you build with Databricks and Redpanda?
&lt;/h2&gt;

&lt;p&gt;Distributed systems require speed at all levels and in every component. Redpanda scales particularly well for mission-critical systems, without depending on a JVM or ZooKeeper.&lt;/p&gt;

&lt;p&gt;Now that you know how to stream data from Redpanda to Databricks and analyze and plot data using Databricks’s native display function, you can use this setup to analyze data in real time for nearly any project.&lt;/p&gt;

&lt;p&gt;Interact with Redpanda’s developers directly in &lt;a href="https://redpanda.com/slack" rel="nofollow noopener noreferrer"&gt;the Redpanda Slack community&lt;/a&gt;, or contribute to Redpanda’s &lt;a href="https://github.com/redpanda-data/redpanda/" rel="noopener noreferrer"&gt;source-available GitHub repo&lt;/a&gt;. To learn more about everything you can do with Redpanda, check out &lt;a href="https://docs.redpanda.com/docs/home"&gt;the documentation here&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>tutorial</category>
      <category>beginners</category>
      <category>webdev</category>
      <category>kafka</category>
    </item>
    <item>
      <title>Spark vs Flink vs ksqlDB for stream processing</title>
      <dc:creator>The Team @ Redpanda</dc:creator>
      <pubDate>Wed, 17 Aug 2022 14:19:00 +0000</pubDate>
      <link>https://dev.to/redpanda-data/spark-vs-flink-vs-ksqldb-for-stream-processing-5a5h</link>
      <guid>https://dev.to/redpanda-data/spark-vs-flink-vs-ksqldb-for-stream-processing-5a5h</guid>
      <description>&lt;p&gt;Modern business is digital and happens in real time. Users expect more interactive and instantaneous experiences all the time, which must be facilitated with suitable real-time data processing. Distributed applications like microservices, with automated deployments to public or private cloud platforms, have also incorporated more event-driven systems with a corresponding increased need for real-time capabilities. In this context, applications rely on real-time stream processing to power their business logic and deliver the appropriate real-time experiences for their users and decision-making capabilities for themselves.&lt;/p&gt;

&lt;p&gt;As the amount of data that must be processed has grown, companies have focused on large-scale data processing technologies that can analyze data, run machine learning functions, and create materialized views and time windows. There are many available stream processing technologies, but this article focuses on three of the most popular:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;a href="https://spark.apache.org/" rel="nofollow noopener noreferrer"&gt;Apache Spark&lt;sup&gt;Ⓡ&lt;/sup&gt;&lt;/a&gt; is a multi-language framework designed for executing data engineering, data science, and machine learning computation on single-node machines or clusters. &lt;/li&gt;
&lt;li&gt;
&lt;a href="https://flink.apache.org/" rel="nofollow noopener noreferrer"&gt;Apache Flink&lt;sup&gt;Ⓡ&lt;/sup&gt;&lt;/a&gt; is a stream and batch processing framework designed for data analytics, data pipelines, ETL, and event-driven applications. Like Spark, Flink helps process large-scale data streams and delivers real-time analytical insights.
&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://ksqldb.io/" rel="nofollow noopener noreferrer"&gt;ksqlDB&lt;/a&gt; is an Apache Kafka&lt;sup&gt;Ⓡ&lt;/sup&gt;-native stream processing framework that provides a useful, lightweight SQL interface for event capturing, continuous event transformations, aggregations, and serving materialized views.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This article introduces these stream processing frameworks and compares the pros and cons of the tools and some of their more unique features. You'll also learn how to use each tool with Redpanda for real-time data processing.&lt;/p&gt;

&lt;h2&gt;
  
  
  Apache Spark
&lt;/h2&gt;

&lt;p&gt;Apache Spark is a popular open-source analytics engine that is designed for scalable big data analytics. The Apache Spark research project started at UC Berkeley’s AMPLab in 2009 as a response to the limitations of the MapReduce model, and it became a top-level Apache project in 2014.&lt;/p&gt;

&lt;p&gt;MapReduce is a first-generation distributed data processing system. It processes parallelizable data on a distributed, horizontally scalable infrastructure. As a distributed system, MapReduce enforces a particular linear data flow: a MapReduce program reads input from disk, maps a function across the data, reduces the results of the map, and writes the output back to disk.&lt;/p&gt;
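&lt;p&gt;As a plain-Python sketch (a toy word count, not actual MapReduce or Spark code), that linear read, map, reduce, write flow looks like this:&lt;/p&gt;

```python
from itertools import groupby

# "Read" input (in real MapReduce this comes from disk/HDFS)
lines = ["to be or not to be"]

# Map: emit a (word, 1) pair for every word
mapped = [(word, 1) for line in lines for word in line.split()]

# Shuffle: bring pairs with the same key together
mapped.sort(key=lambda kv: kv[0])

# Reduce: sum the counts per word
counts = {word: sum(c for _, c in pairs)
          for word, pairs in groupby(mapped, key=lambda kv: kv[0])}

print(counts)  # {'be': 2, 'not': 1, 'or': 1, 'to': 2}
```

In real MapReduce, each stage also writes its intermediate results back to disk, which is exactly the overhead Spark’s in-memory approach avoids.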

&lt;p&gt;&lt;a href="//images.ctfassets.net/paqvtpyf8rwu/6ZGBa7Ltvc7Xr1ET3ruW2l/5b557c934f707a4b41615a9b73ae1b6b/redpanda_and_apache_spark.png" class="article-body-image-wrapper"&gt;&lt;img src="//images.ctfassets.net/paqvtpyf8rwu/6ZGBa7Ltvc7Xr1ET3ruW2l/5b557c934f707a4b41615a9b73ae1b6b/redpanda_and_apache_spark.png" alt="redpanda and apache spark"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Spark is a third-generation data processing framework that improves on MapReduce’s performance by processing data in memory instead of writing it to disk at each step. Spark partitions the in-memory data logically across many machines into &lt;code&gt;Resilient Distributed Datasets (RDDs)&lt;/code&gt;, which serve as an abstraction layer for managing the logically distributed data.&lt;/p&gt;

&lt;p&gt;&lt;a href="//images.ctfassets.net/paqvtpyf8rwu/5kyKn3UtaO08HKfFJHYAJG/2f3e424fc23274f04f90697e1549320f/redpanda_and_apache_spark_2.png" class="article-body-image-wrapper"&gt;&lt;img src="//images.ctfassets.net/paqvtpyf8rwu/5kyKn3UtaO08HKfFJHYAJG/2f3e424fc23274f04f90697e1549320f/redpanda_and_apache_spark_2.png" alt="redpanda and apache spark 2"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Spark consists of many components, such as Spark SQL, MLlib, Spark Streaming, and GraphX. These components were not all included from the start but have been developed over the years to satisfy many big data system requirements. Spark Streaming is one of the most important components; it provides support for live data streams generated by a variety of sources such as Apache Kafka, Apache Flume, Twitter, ZeroMQ, Amazon Kinesis, and more.&lt;/p&gt;

&lt;p&gt;&lt;a href="//images.ctfassets.net/paqvtpyf8rwu/19FMj0yKjo5VvmJK60zDrl/d4820d6e1a7adbd835644108a3b1bade/spark_core_api.png" class="article-body-image-wrapper"&gt;&lt;img src="//images.ctfassets.net/paqvtpyf8rwu/19FMj0yKjo5VvmJK60zDrl/d4820d6e1a7adbd835644108a3b1bade/spark_core_api.png" alt="spark core api"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Moreover, Spark has a high-level API called Structured Streaming, which is built on top of the Spark SQL API. Structured Streaming can run the same operations on streams that you would perform in batch mode, such as querying a static RDD.&lt;/p&gt;
&lt;h3&gt;
  
  
  Using Apache Spark with Redpanda
&lt;/h3&gt;

&lt;p&gt;Both the Spark Streaming and Structured Streaming APIs integrate well with the Kafka API.&lt;/p&gt;

&lt;p&gt;Because Redpanda is API-compatible with Kafka, you can use Redpanda for mission-critical workloads that you need to process via Apache Spark.&lt;/p&gt;

&lt;p&gt;You can stream data from Redpanda and process it in batches in Apache Spark. Alternatively, you can use the Structured Streaming API to consume data from Redpanda, process it in Spark, and save it to a Spark SQL DataFrame, for example.&lt;/p&gt;

&lt;p&gt;The following Python code snippet is a very high-level example of reading a stream from a Redpanda topic called &lt;code&gt;my-redpanda-topic&lt;/code&gt;, by accessing the Redpanda cluster via &lt;code&gt;redpandahost:9092&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt; \
  &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;readStream&lt;/span&gt; \
  &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;format&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"kafka"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
  &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"kafka.bootstrap.servers"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"redpandahost:9092"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
  &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"subscribe"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"my-redpanda-topic"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
  &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;load&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;selectExpr&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"CAST(key AS STRING)"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"CAST(value AS STRING)"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can use Redpanda and Spark Streaming for real-time analysis, sentiment analysis, real-time fraud detection, and many more real-time stream processing use-cases that require the computational capabilities of Apache Spark. For more detailed information on how to use Apache Spark with Redpanda, check out our blog on &lt;a href="https://redpanda.com/blog/buildling-streaming-application-docker-spark-redpanda/"&gt;structured stream processing with Redpanda and Apache Spark&lt;/a&gt;. &lt;/p&gt;

&lt;h3&gt;
  
  
  Pros and cons of Apache Spark
&lt;/h3&gt;

&lt;p&gt;Apache Spark is a great framework to use with Redpanda streaming, but as is the case with many tools or frameworks, there are a few pros and cons worth mentioning.&lt;/p&gt;

&lt;p&gt;One of the game-changing advantages of Spark is its in-memory structure, which provides very fast performance. However, Spark’s in-memory structure can cause high memory consumption as it might respond to many stream processing requests simultaneously.&lt;/p&gt;

&lt;p&gt;Apart from the in-memory structure, the following is an aggregated list of pros and cons for Apache Spark:&lt;/p&gt;

&lt;h4&gt;
  
  
  Pros:
&lt;/h4&gt;

&lt;ul&gt;
&lt;li&gt;supports multiple languages (Scala, Java, Python, R, C#, F#)&lt;/li&gt;
&lt;li&gt;fault-tolerant&lt;/li&gt;
&lt;li&gt;integrates with many technologies&lt;/li&gt;
&lt;li&gt;advanced analysis capability&lt;/li&gt;
&lt;li&gt;easily does batch processing (micro-batches)&lt;/li&gt;
&lt;li&gt;supports stream processing&lt;/li&gt;
&lt;li&gt;fast performance (because of the in-memory structure)&lt;/li&gt;
&lt;li&gt;supports SQL&lt;/li&gt;
&lt;/ul&gt;

&lt;h4&gt;
  
  
  Cons:
&lt;/h4&gt;

&lt;ul&gt;
&lt;li&gt;high memory consumption&lt;/li&gt;
&lt;li&gt;HDFS as the only state backend&lt;/li&gt;
&lt;li&gt;steep learning curve&lt;/li&gt;
&lt;li&gt;time windowing only&lt;/li&gt;
&lt;li&gt;no native stream processing&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;These are just some examples, and there will likely be many other pros and cons of Apache Spark that are either use-case specific or related to the streaming technology being used.&lt;/p&gt;

&lt;h2&gt;
  
  
  Apache Flink
&lt;/h2&gt;

&lt;p&gt;Apache Flink is an open-source distributed processing engine that provides stateful computations over unbounded and bounded data streams. In Flink, everything is considered a stream, including batch files.&lt;/p&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;&lt;img src="//images.ctfassets.net/paqvtpyf8rwu/1TsrVsQNPdgY2M82ilYLUS/2b731c6467b9e91e30514c6f21482785/redpanda_and_apache_flink.png" alt="redpanda and apache flink"&gt;&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;&lt;em&gt;&lt;a href="https://flink.apache.org/"&gt;https://flink.apache.org/&lt;/a&gt;&lt;/em&gt;&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;p&gt;Flink is a fourth-generation data processing framework and supports both batch and stream processing. Unlike Apache Spark, Flink is natively designed for stream processing. It treats batch files as bounded streams.&lt;/p&gt;

&lt;p&gt;You can ingest streaming data from many sources, process them, and distribute them across various nodes with Apache Flink.&lt;/p&gt;

&lt;p&gt;Apache Kafka is one such streaming source, which is considered a great persistent layer for stream processing applications. Flink provides a &lt;a href="https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/" rel="noopener noreferrer"&gt;Kafka connector library&lt;/a&gt; for reading data from a Kafka topic or writing data to Kafka topics.&lt;/p&gt;

&lt;h3&gt;
  
  
  Using Apache Flink with Redpanda
&lt;/h3&gt;

&lt;p&gt;Apache Flink can easily read data from or write data to Redpanda. It does not provide a particular API or connector for Redpanda, but because Redpanda is fully Kafka API compatible, you can just configure it as a Kafka connection, and Redpanda takes care of the rest.&lt;/p&gt;

&lt;p&gt;You can create continuous streaming pipelines where event computations are triggered in Flink as soon as the event is received from Redpanda.&lt;/p&gt;

&lt;p&gt;The following Java code snippet is a small example of using the connector library to read a stream from a Redpanda topic called &lt;code&gt;my-redpanda-topic&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;KafkaSource&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;source&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;KafkaSource&lt;/span&gt;&lt;span class="o"&gt;.&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setBootstrapServers&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"redpandahost:9092"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setTopics&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"my-redpanda-topic"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setGroupId&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"my-group"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setStartingOffsets&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;OffsetsInitializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;earliest&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setValueOnlyDeserializer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;SimpleStringSchema&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

  &lt;span class="nc"&gt;DataStream&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;dataStreamSource&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;env&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;fromSource&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;source&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;WatermarkStrategy&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;noWatermarks&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="s"&gt;"Redpanda Source"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

  &lt;span class="c1"&gt;//Stream processing actions on dataStreamSource...&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note that a class called &lt;code&gt;KafkaSource&lt;/code&gt; is used to consume the data from Redpanda. Conversely, to send messages to Redpanda, you use &lt;code&gt;KafkaSink&lt;/code&gt; to stream data out. You might come across examples that use classes like &lt;code&gt;FlinkKafkaConsumer&lt;/code&gt; and &lt;code&gt;FlinkKafkaProducer&lt;/code&gt;; these are deprecated Flink Kafka connector classes.&lt;/p&gt;

&lt;p&gt;For more detailed information about using Apache Flink with Redpanda, check out the &lt;a href="https://redpanda.com/blog/redpanda-flink-docker/"&gt;building streaming applications using Flink SQL and Redpanda&lt;/a&gt; tutorial.&lt;/p&gt;

&lt;h3&gt;
  
  
  Pros and cons of Apache Flink
&lt;/h3&gt;

&lt;p&gt;Apache Flink is a great native stream processing system to use with Redpanda. However, as with the other streaming technologies, there are several pros and cons of Apache Flink you should consider. &lt;/p&gt;

&lt;p&gt;One of the great benefits of Apache Flink is its very shallow learning curve. It’s very easy to get started with, and it has good documentation. However, good documentation is not enough when it comes to support, particularly if you run into more advanced issues; compared to Apache Spark, Flink has a smaller community, so the support it can provide is more limited.&lt;/p&gt;

&lt;p&gt;Apart from the learning curve, documentation, and the support, the following is an aggregated list of pros and cons for Apache Flink:&lt;/p&gt;

&lt;h4&gt;
  
  
  Pros:
&lt;/h4&gt;

&lt;ul&gt;
&lt;li&gt;easy to start / very low learning curve&lt;/li&gt;
&lt;li&gt;good documentation&lt;/li&gt;
&lt;li&gt;low latency / high throughput&lt;/li&gt;
&lt;li&gt;clean data stream API&lt;/li&gt;
&lt;li&gt;simple UI and UX&lt;/li&gt;
&lt;li&gt;in-memory, file system, RocksDB as the state backend&lt;/li&gt;
&lt;li&gt;windowing by both time and count&lt;/li&gt;
&lt;li&gt;SQL support&lt;/li&gt;
&lt;/ul&gt;

&lt;h4&gt;
  
  
  Cons:
&lt;/h4&gt;

&lt;ul&gt;
&lt;li&gt;difficult Hadoop integration (Apache Spark integrates better)&lt;/li&gt;
&lt;li&gt;limited language support (Java, Scala, Python, and SQL)&lt;/li&gt;
&lt;li&gt;limited community support (compared to Apache Spark)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Again, these are simply some examples of Apache Flink’s pros and cons, which can vary depending on your specific use-case or the streaming technology you are using.&lt;/p&gt;

&lt;h2&gt;
  
  
  ksqlDB
&lt;/h2&gt;

&lt;p&gt;ksqlDB is a database for building stream processing applications on top of Apache Kafka. It is based on the &lt;a href="https://kafka.apache.org/documentation/streams/" rel="noopener noreferrer"&gt;Kafka Streams API&lt;/a&gt; and licensed under the &lt;a href="https://www.confluent.io/confluent-community-license/" rel="nofollow noopener noreferrer"&gt;Confluent Community License Agreement&lt;/a&gt;. It is a distributed, scalable, real-time stream processing framework that provides a lightweight SQL syntax.&lt;/p&gt;

&lt;p&gt;Powered by the Kafka Streams API, ksqlDB is a robust, embeddable stream processing engine that provides a simple way to build standard Java stream processing applications. It extends the Kafka Streams API with more features, such as a streaming server and an easy-to-use SQL interface.&lt;/p&gt;

&lt;p&gt;&lt;a href="//images.ctfassets.net/paqvtpyf8rwu/40hFuxVhg61EJLBRQxiYnp/b11ab6faa27ccbbaeca897b44ca2f897/redpanda_and_ksqldb.png" class="article-body-image-wrapper"&gt;&lt;img src="//images.ctfassets.net/paqvtpyf8rwu/40hFuxVhg61EJLBRQxiYnp/b11ab6faa27ccbbaeca897b44ca2f897/redpanda_and_ksqldb.png" alt="redpanda and ksqldb"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;ksqlDB has Apache Kafka at its core as the persistence layer. Because it uses the Kafka Streams API, it can access and use any Kafka cluster without extra integration configuration.&lt;/p&gt;
&lt;h3&gt;
  
  
  Using ksqlDB with Redpanda
&lt;/h3&gt;

&lt;p&gt;Redpanda can easily serve as the Kafka backbone for ksqlDB. A standalone installation of ksqlDB requires one, and because Redpanda is fully compatible with the Kafka API, it makes a great drop-in replacement.&lt;/p&gt;

&lt;p&gt;The following Docker Compose YAML is an example of setting up a ksqlDB server and a ksqlDB CLI container that is connected to the Redpanda cluster:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="nn"&gt;---&lt;/span&gt;
&lt;span class="na"&gt;version&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;2'&lt;/span&gt;

&lt;span class="na"&gt;services&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;redpanda&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
     &lt;span class="na"&gt;command&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
     &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;redpanda&lt;/span&gt;
     &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;start&lt;/span&gt;
     &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--smp&lt;/span&gt;
     &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;1'&lt;/span&gt;
     &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--reserve-memory&lt;/span&gt;
     &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;0M&lt;/span&gt;
     &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--overprovisioned&lt;/span&gt;
     &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--node-id&lt;/span&gt;
     &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;0'&lt;/span&gt;
     &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--kafka-addr&lt;/span&gt;
     &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092&lt;/span&gt;
     &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;--advertise-kafka-addr&lt;/span&gt;
     &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092&lt;/span&gt;
     &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;docker.vectorized.io/vectorized/redpanda:latest&lt;/span&gt;
     &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;redpanda-1&lt;/span&gt;
     &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
     &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;9092:9092&lt;/span&gt;
     &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;29092:29092&lt;/span&gt;

  &lt;span class="na"&gt;ksqldb-server&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;confluentinc/ksqldb-server:0.25.1&lt;/span&gt;
    &lt;span class="na"&gt;hostname&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;ksqldb-server&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;ksqldb-server&lt;/span&gt;
    &lt;span class="na"&gt;depends_on&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;redpanda&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;8088:8088"&lt;/span&gt;
    &lt;span class="na"&gt;environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;KSQL_LISTENERS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;http://0.0.0.0:8088&lt;/span&gt;
      &lt;span class="na"&gt;KSQL_BOOTSTRAP_SERVERS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;redpanda:9092&lt;/span&gt;
      &lt;span class="na"&gt;KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;true"&lt;/span&gt;
      &lt;span class="na"&gt;KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;true"&lt;/span&gt;

  &lt;span class="na"&gt;ksqldb-cli&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;confluentinc/ksqldb-cli:0.25.1&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;ksqldb-cli&lt;/span&gt;
    &lt;span class="na"&gt;depends_on&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;ksqldb-server&lt;/span&gt;
    &lt;span class="na"&gt;entrypoint&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;/bin/sh&lt;/span&gt;
    &lt;span class="na"&gt;tty&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="no"&gt;true&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can use Redpanda and ksqlDB to create a streaming ETL pipeline (aka a streaming data pipeline) for real-time data analysis, creating materialized views of event-driven microservices, predictive analytics, and many more similar use-cases.&lt;/p&gt;
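&lt;p&gt;As a rough sketch of what such a pipeline can look like (the topic, stream, and column names below are hypothetical), you declare a stream over a Redpanda topic and let a persistent query continuously materialize an aggregate from it:&lt;/p&gt;

```sql
-- Declare a stream over an existing Redpanda topic (hypothetical names)
CREATE STREAM orders (order_id VARCHAR, amount DOUBLE)
  WITH (KAFKA_TOPIC = 'orders', VALUE_FORMAT = 'JSON');

-- A persistent query that keeps a materialized view up to date
CREATE TABLE order_totals AS
  SELECT order_id, SUM(amount) AS total
  FROM orders
  GROUP BY order_id
  EMIT CHANGES;
```

Because Redpanda speaks the Kafka protocol, ksqlDB treats the Redpanda topic exactly as it would a Kafka topic.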

&lt;p&gt;For more detailed information on how to use ksqlDB with Redpanda, check out &lt;a href="https://redpanda.com/blog/ksqldb-materialized-cache"&gt;this tutorial on how to build a materialized cache with ksqlDB and Redpanda&lt;/a&gt;.&lt;/p&gt;

&lt;h3&gt;
  
  
  Pros and cons of ksqlDB
&lt;/h3&gt;

&lt;p&gt;ksqlDB is a Kafka-native stream processing system that is very easy to use with Redpanda. However, as with the other tools, it has its pros and cons.&lt;/p&gt;

&lt;p&gt;One of ksqlDB’s greatest advantages is its strong integration with Apache Kafka. Whereas other streaming frameworks manage this integration using connectors or Kafka libraries, ksqlDB has Kafka at its core. However, ksqlDB’s analytics capabilities are weak compared to those of Flink and Spark, both of which have more tools for handling analytical workloads.&lt;/p&gt;

&lt;p&gt;Apart from the Kafka integration, analytics capabilities, and licensing model, here is an aggregated list of pros and cons for ksqlDB:&lt;/p&gt;

&lt;h4&gt;
  
  
  Pros:
&lt;/h4&gt;

&lt;ul&gt;
&lt;li&gt;very easy Apache Kafka / Redpanda integration&lt;/li&gt;
&lt;li&gt;low latency of up to ten milliseconds&lt;/li&gt;
&lt;li&gt;easy-to-use SQL interface&lt;/li&gt;
&lt;li&gt;integrates with existing applications (because of Kafka Streams API)&lt;/li&gt;
&lt;li&gt;RocksDB as the state backend&lt;/li&gt;
&lt;li&gt;less steep learning curve (compared to Spark)&lt;/li&gt;
&lt;li&gt;integrates through Kafka Connect&lt;/li&gt;
&lt;/ul&gt;

&lt;h4&gt;
  
  
  Cons:
&lt;/h4&gt;

&lt;ul&gt;
&lt;li&gt;poor analytics capability (compared to Flink and Spark)&lt;/li&gt;
&lt;li&gt;higher learning curve (compared to Flink)&lt;/li&gt;
&lt;li&gt;unbounded data streams only (compared to Flink)&lt;/li&gt;
&lt;li&gt;license (Confluent Community License)&lt;/li&gt;
&lt;li&gt;no direct integration to Hadoop or other big data frameworks&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;As was the case with the other tools, the stated pros and cons are just examples, and there may be many more depending on the specific use-case or the streaming technology being used.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;Now that you understand the differences between the popular stream processing frameworks Apache Spark, Apache Flink, and ksqlDB, you can make a more informed decision about when to use each tool. And, thanks to the integration tutorials linked in each section above, you know how to use any of them with Redpanda to meet your stream processing needs. Use any of these tools with Redpanda for real-time sentiment analysis, fraud detection, predictive analytics, and more.&lt;/p&gt;

&lt;p&gt;Follow the &lt;a href="https://redpanda.com/blog"&gt;Redpanda Blog&lt;/a&gt; for future tutorials and articles about integration use-cases of Redpanda and other cool data technologies, and &lt;a href="https://redpanda.com/slack" rel="noopener noreferrer"&gt;join our Slack community&lt;/a&gt; to share what you plan to build with Redpanda. To contribute to our GitHub repo, &lt;a href="https://github.com/redpanda-data/redpanda/" rel="noopener noreferrer"&gt;submit an issue here&lt;/a&gt;. For specific feature and usability questions, &lt;a href="https://docs.redpanda.com/docs/home"&gt;our documentation&lt;/a&gt; can help.&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Using Buildkite and GitHub to automate parallel CI steps</title>
      <dc:creator>The Team @ Redpanda</dc:creator>
      <pubDate>Wed, 10 Aug 2022 14:09:31 +0000</pubDate>
      <link>https://dev.to/redpanda-data/using-buildkite-and-github-to-automate-parallel-ci-steps-50ba</link>
      <guid>https://dev.to/redpanda-data/using-buildkite-and-github-to-automate-parallel-ci-steps-50ba</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;At Redpanda, we always want to provide an experience that is fast, simple, and productive for developers. That applies to our own team of engineers, too. When considering how we could achieve a more stable continuous integration (CI) pipeline, we wanted that same experience: fast, simple, productive. By running multiple instances of our pipeline steps in parallel on our CI platform, &lt;a href="https://buildkite.com/" rel="noopener noreferrer"&gt;Buildkite&lt;/a&gt;, we can now run many repetitions of the same step in roughly the time needed for a single one.&lt;/p&gt;

&lt;p&gt;Today, our devs can kick off any number of builds in parallel simply by attaching a label like “ci-repeat-X” to their PR. In the rest of this post, I’ll discuss how we made this easy dev experience possible: we achieve repeatable builds by taking advantage of Buildkite’s &lt;code&gt;parallelism&lt;/code&gt; attribute and &lt;code&gt;pre-command&lt;/code&gt; hook, in combination with GitHub labels on pull requests for triggering parallel builds.&lt;/p&gt;

&lt;h2&gt;
  
  
  Buildkite parallel programming
&lt;/h2&gt;

&lt;p&gt;When Buildkite introduced &lt;a href="https://buildkite.com/docs/tutorials/parallel-builds#parallel-jobs" rel="noopener noreferrer"&gt;a new feature&lt;/a&gt; to run multiple repetitions of a build step in parallel, we took advantage of it by adding an attribute called &lt;code&gt;parallelism&lt;/code&gt; to our CI pipeline configuration. We use this attribute to define the desired level of parallelism, and we started off with a constant value of 1.&lt;/p&gt;

&lt;p&gt;However, the challenge is to make the &lt;code&gt;parallelism&lt;/code&gt; value configurable so that users can enable or disable it whenever they want, providing a value of their choice for the number of parallel instances per step. Ideally, we want to let developers configure this number outside Buildkite’s context. A good candidate for that is GitHub, but we need a “bridge” between it and Buildkite. The bridge cannot be a step’s command that queries the GitHub pull request, because at that point it is too late to configure the step’s &lt;code&gt;parallelism&lt;/code&gt; attribute. In seeking a way to do this, we discovered Buildkite’s &lt;code&gt;pre-command&lt;/code&gt; hook.&lt;/p&gt;

&lt;h2&gt;
  
  
  Buildkite pre-command
&lt;/h2&gt;

&lt;p&gt;Buildkite includes hooks that we can enable in order to have them automatically executed before a step’s command is initiated (&lt;code&gt;pre-command&lt;/code&gt;), or after a step run (&lt;code&gt;post-command&lt;/code&gt;). We took advantage of the &lt;code&gt;pre-command&lt;/code&gt; hook to discover the value that the user wants to configure as the &lt;code&gt;parallelism&lt;/code&gt; value. By doing this, we created a way to run any bash script we want before a pipeline’s step gets executed. This means that we can tweak a variable in the &lt;code&gt;pre-command&lt;/code&gt; hook in order to update the &lt;code&gt;parallelism&lt;/code&gt; attribute of a Buildkite step. &lt;/p&gt;

&lt;p&gt;Having done this, we addressed the next natural question: what is the most productive process for users to follow in order to update this variable when opening a pull request? Our options were:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Comment on the pull request (e.g. &lt;code&gt;/hey-buildkite repeat 5&lt;/code&gt;)&lt;/li&gt;
&lt;li&gt;Edit a file to update the value and push the code&lt;/li&gt;
&lt;li&gt;Add a GitHub label (e.g. &lt;code&gt;ci-repeat-5&lt;/code&gt;)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Our selection criteria were:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Productive&lt;/li&gt;
&lt;li&gt;Easy-to-use&lt;/li&gt;
&lt;li&gt;Clean&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;If we went with choice number one, we would end up with a big pull request conversation full of scattered comments, cluttering up what should be a discussion between developers about the pull request. We didn’t select this option because it violates the second and third criteria.&lt;/p&gt;

&lt;p&gt;For choice number two, we would have to answer the questions:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;What happens when we want to merge the PR? &lt;/li&gt;
&lt;li&gt;Do we want our default branch to be based on this file and run in parallel? (If so, what’s the impact on our cost?) &lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Thus, the questions raised by option two also suggested it wasn’t the best course to take. Besides, it violates the second criterion, because the user has to push code each time they want to update the requested level of parallelism.&lt;/p&gt;

&lt;p&gt;So, we decided to go with the third and best choice: adding a GitHub label. With this process, users who want to run their PR tests in parallel need only add a label to their PR and rebuild the pipeline.&lt;/p&gt;

&lt;h2&gt;
  
  
  The workflow
&lt;/h2&gt;

&lt;p&gt;The &lt;code&gt;parallelism&lt;/code&gt; attribute is set in each Buildkite step of the &lt;code&gt;pipeline.yml&lt;/code&gt; configuration. Its value is dynamically provided via an environment variable called &lt;code&gt;PARALLEL_STEPS&lt;/code&gt;. We just have to modify this environment variable using the &lt;code&gt;pre-command&lt;/code&gt; hook. &lt;/p&gt;
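&lt;p&gt;As an illustrative sketch (the step label and command below are hypothetical, not taken from Redpanda’s actual pipeline), a step in &lt;code&gt;pipeline.yml&lt;/code&gt; might look like this:&lt;/p&gt;

```yaml
steps:
  - label: "unit-tests"           # hypothetical step name
    command: "./run-tests.sh"     # hypothetical command
    parallelism: $PARALLEL_STEPS  # interpolated when the pipeline is uploaded
```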

&lt;p&gt;We wrote a script, run before the steps are loaded into Buildkite, that queries the GitHub API. Buildkite provides the PR number in the &lt;code&gt;BUILDKITE_PULL_REQUEST&lt;/code&gt; environment variable, so we can fetch the labels of the PR and match them against the pattern &lt;code&gt;ci-repeat-NN&lt;/code&gt;. With that, the whole workflow is ready: the hook queries the labels, finds the matching one, extracts the number, and exports it as the environment variable &lt;code&gt;PARALLEL_STEPS&lt;/code&gt;.&lt;/p&gt;
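&lt;p&gt;A minimal sketch of that label-matching logic (the function name is ours, not Redpanda’s; the real hook would first fetch the label names from the GitHub API for the PR identified by &lt;code&gt;BUILDKITE_PULL_REQUEST&lt;/code&gt;):&lt;/p&gt;

```shell
#!/usr/bin/env bash
# Sketch of a pre-command hook helper: given a whitespace-separated list of
# GitHub label names, find one matching ci-repeat-NN and emit NN (default 1).
parallel_steps_from_labels() {
  local label steps=1
  for label in $1; do
    if [[ "$label" =~ ^ci-repeat-([0-9]+)$ ]]; then
      steps="${BASH_REMATCH[1]}"
    fi
  done
  echo "$steps"
}

# In the real hook, the label list would come from the GitHub API and the
# result would be exported for the step that follows, e.g.:
# export PARALLEL_STEPS="$(parallel_steps_from_labels "$labels")"
```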

&lt;p&gt;What about the cost? Shouldn’t we require users to delete this label after their job is done? Otherwise, won’t every subsequent commit have Buildkite run multiple steps in parallel? As mentioned, we aim to increase developer productivity, and requiring users to delete the label afterwards is exactly the kind of manual step we avoid. Once the &lt;code&gt;pre-command&lt;/code&gt; hook discovers the label, it’s useless to keep it on the PR, so the bot we’re using deletes it. Thus, we reduce the manual steps required of the developer and keep costs down, just by deleting a label.&lt;/p&gt;

&lt;h2&gt;
  
  
  Building with DevProd in mind
&lt;/h2&gt;

&lt;p&gt;In summary, our process for running multiple instances of CI steps in parallel was created with developer productivity in mind. By parallelizing and running multiple instances of CI steps on Buildkite, we decreased our build’s total running time and improved the stability of CI testing in Redpanda. &lt;/p&gt;

&lt;p&gt;Learn more about Redpanda and download &lt;a href="https://github.com/redpanda-data/redpanda/"&gt;our binary on GitHub&lt;/a&gt;. Interact with our developers directly by joining &lt;a href="https://redpanda.com/slack"&gt;our Slack Community&lt;/a&gt; to ask questions about our CI steps or anything else. For more information about Redpanda and its features, &lt;a href="https://docs.redpanda.com/"&gt;browse our documentation&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>github</category>
      <category>programming</category>
      <category>productivity</category>
      <category>tutorial</category>
    </item>
    <item>
      <title>Simplifying Java development for real-time applications with Redpanda</title>
      <dc:creator>The Team @ Redpanda</dc:creator>
      <pubDate>Wed, 03 Aug 2022 17:46:14 +0000</pubDate>
      <link>https://dev.to/redpanda-data/simplifying-java-development-for-real-time-applications-with-redpanda-3hap</link>
      <guid>https://dev.to/redpanda-data/simplifying-java-development-for-real-time-applications-with-redpanda-3hap</guid>
      <description>&lt;p&gt;At &lt;a rel="nofollow noopener noreferrer" href="https://datacater.io"&gt;DataCater&lt;/a&gt; we make real-time streaming a commodity for data and developer teams. DataCater provides a user interface, API, and declarative formats for creating production-grade Kafka Streams applications. Consequently, our products utilize and evolve around Apache Kafka&lt;sup&gt;Ⓡ&lt;/sup&gt; API-compatible technologies.&lt;/p&gt;

&lt;p&gt;Kafka is a heavy piece of machinery, with multiple components that must be managed together to create a running instance. Components such as ZooKeeper&lt;sup&gt;Ⓡ&lt;/sup&gt; and a separate schema registry make it unfit for modern continuous integration / continuous deployment (CI/CD) pipelines, wherein we need to spin up clusters quickly and efficiently.&lt;/p&gt;

&lt;p&gt;Therefore, to accelerate our development cycle, we make use of Redpanda and &lt;a rel="nofollow noopener noreferrer" href="https://quarkus.io/"&gt;Java Quarkus&lt;/a&gt; to keep our workstations lightweight and simplify our CI pipeline for end-to-end testing.&lt;/p&gt;

&lt;p&gt;We develop our applications and services in Java to utilize the wide variety of connections and protocols readily available through frameworks like Apache Kafka Connect&lt;sup&gt;Ⓡ&lt;/sup&gt; and Apache Camel&lt;sup&gt;Ⓡ&lt;/sup&gt;. &lt;/p&gt;

&lt;p&gt;In this blog, we’ll show you how to use Redpanda to speed up and ease the burden of developing a Java-based streaming application.&lt;/p&gt;

&lt;h2&gt;
  
  
  Integrating Redpanda into Java Quarkus
&lt;/h2&gt;

&lt;p&gt;Java Quarkus is a Kubernetes-native Java stack. One goal of Java Quarkus is packaging libraries such that Java developers can create single binaries from Java programs via GraalVM&lt;sup&gt;TM&lt;/sup&gt;. This leads to fast start-up times and JVM-free containers, making Quarkus a go-to choice for developers targeting Kubernetes as their runtime. We use multiple Java Quarkus packages, including &lt;code&gt;quarkus-smallrye-reactive-messaging-kafka&lt;/code&gt;, which pulls Redpanda’s docker image and runs it as a process for streaming applications.&lt;/p&gt;

&lt;p&gt;The ease of installation, zero-config process starts, and fast startup of Redpanda has led to increasing adoption of Redpanda as the default Kafka-compatible messaging technology for development. At DataCater, we chose Redpanda as our default with Java Quarkus as well.&lt;/p&gt;

&lt;p&gt;Once you have &lt;code&gt;quarkus-smallrye-reactive-messaging-kafka&lt;/code&gt; extension as a dependency in your build system, Quarkus will automatically pull and run a single node Redpanda installation in your local docker engine.&lt;/p&gt;
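&lt;p&gt;For reference, adding the extension in a Gradle build looks roughly like the following (Maven users would declare the corresponding &lt;code&gt;io.quarkus&lt;/code&gt; dependency instead; the version is managed by the Quarkus BOM):&lt;/p&gt;

```groovy
dependencies {
    // Quarkus extension for Kafka-compatible reactive messaging; in dev mode
    // it starts a single-node Redpanda container automatically
    implementation 'io.quarkus:quarkus-smallrye-reactive-messaging-kafka'
}
```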

&lt;p&gt;Starting Quarkus in dev mode will yield the following output, and you can get started working interactively with Redpanda. This example uses the &lt;a rel="nofollow noopener noreferrer" href="https://www.testcontainers.org/"&gt;Testcontainers library&lt;/a&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ ./gradlew quarkusDev
…

2022-04-14 11:33:16,880 INFO  [🐳 .io/.11.3]] (build-27) Pulling docker image: docker.io/vectorized/redpanda:v21.11.3. Please be patient; this may take some time but only needs to be done once.
…
2022-04-14 11:33:41,037 INFO  [🐳 .io/.11.3]] (build-27) Container docker.io/vectorized/redpanda:v21.11.3 is starting: c41e717b516ad9810ca93828a72cf5203068c607996962951aa4979e08a6f15e
2022-04-14 11:33:42,539 INFO  [🐳 .io/.11.3]] (build-27) Container docker.io/vectorized/redpanda:v21.11.3 started in PT25.70221S
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As you can see, the command &lt;code&gt;./gradlew quarkusDev&lt;/code&gt; gets you started with Redpanda, whereas a Kafka cluster would require you to set up ZooKeeper alongside. Configuring ZooKeeper, Kafka, and networking on a development machine is brittle and makes setting up development environments complex.  &lt;/p&gt;

&lt;p&gt;This setup, instead, enables you to tear down and restart without having to think or be time-constrained by setting up anything new on your developer machine.&lt;/p&gt;

&lt;h2&gt;
  
  
  Perfect fit for continuous integration
&lt;/h2&gt;

&lt;p&gt;Redpanda’s small footprint compared to the Kafka stack and startup time make it a perfect fit for integration testing in our CI pipeline. First, let’s take a look at the image size:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ docker images | grep 'debezium\|redpanda'
debezium/kafka                                  1.8         5da8d5410fe6   2 days ago   764MB
debezium/zookeeper                              1.8         c76319be13f3   2 days ago   547MB
vectorized/redpanda                             v21.11.12   31f4853dadff   6 days ago   302MB
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The above example takes containers for Kafka and ZooKeeper distributed by the Debezium project as examples. By the nature of Kafka and ZooKeeper, a Java runtime has to be packaged into the container. As a result, Redpanda’s image is about 1GB smaller than the combined Kafka and ZooKeeper container images. That is a huge difference, especially if you cannot guarantee that pulled images are cached when using cloud-provided CI pipelines.&lt;/p&gt;

&lt;p&gt;Further, starting the Kafka and ZooKeeper containers takes around 10 seconds until the broker endpoint is ready, while Redpanda takes roughly six seconds. Over many commits and continuous testing, this difference easily accumulates to minutes and hours over the course of a week or month.&lt;/p&gt;

&lt;h2&gt;
  
  
  Redpanda simplifies networking for development
&lt;/h2&gt;

&lt;p&gt;Engineering teams are adopting Kubernetes (K8s) and, in the process, trying to make development environments as close to production as possible. As a software engineer, I want my tools to be simple to set up and to run with similar configurations across multiple stages. Redpanda’s setup remains just as simple and quick for production as it is for development or staging.&lt;/p&gt;

&lt;p&gt;At DataCater we target K8s as our runtime environment. A common approach for exposing K8s resources outside of a given cluster is to use an &lt;a rel="nofollow noopener noreferrer" href="https://kubernetes.io/docs/concepts/services-networking/ingress/"&gt;Ingress&lt;/a&gt;, which we also use for developing a local application against services in a minikube cluster.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--weML-sA8--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/id5d30xsibdvttcsefsd.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--weML-sA8--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/id5d30xsibdvttcsefsd.png" alt="Image description" width="880" height="284"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;To achieve this with Redpanda, you simply deploy Redpanda &lt;a href="https://docs.redpanda.com/docs/deployment/kubernetes-connectivity/#custom-resource"&gt;via its custom resource&lt;/a&gt; and configure it with the following YAML description:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;configuration:
 kafkaApi:
 - port: 9092
 pandaproxyApi:
 - port: 8082
 adminApi:
 - port: 9644 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The Redpanda operator prepares a &lt;code&gt;redpanda.yaml&lt;/code&gt; and starts a Redpanda cluster with its configuration including an &lt;code&gt;advertised_kafka_api&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;The &lt;code&gt;strimzi-kafka-operator&lt;/code&gt; deploys a Kafka cluster with all its dependencies and networking configuration. &lt;/p&gt;

&lt;p&gt;Kafka’s networking makes this much more complicated for developers using &lt;code&gt;strimzi-kafka-operator&lt;/code&gt;. The equivalent of &lt;code&gt;advertised_kafka_api&lt;/code&gt; is only known upon initial deployment. Hence, developers need to adjust their K8s manifest and re-deploy Kafka via &lt;code&gt;strimzi-kafka-operator&lt;/code&gt; after any change to a Kafka cluster.&lt;/p&gt;

&lt;p&gt;An example &lt;a rel="nofollow noopener noreferrer" href="https://strimzi.io/blog/2019/05/23/accessing-kafka-part-5/"&gt;from their blog&lt;/a&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;listeners:
  external:
    type: ingress
    configuration:
      bootstrap:
        host: kafka-bootstrap.localhost
      brokers:
      - broker: 0
        host: kafka-broker-0.localhost
      - broker: 1
        host: kafka-broker-1.localhost
      - broker: 2
        host: kafka-broker-2.localhost
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  How will you speed up development with Redpanda?
&lt;/h2&gt;

&lt;p&gt;Now that you’ve seen how we use Redpanda to speed up and simplify the development of a data-streaming application, you can improve your own development projects!&lt;/p&gt;

&lt;p&gt;At DataCater, we use Redpanda to speed up the development of our core product. Redpanda integrates easily into our development stack with Java Quarkus, reducing the runtime and footprint of our CI pipeline, and Redpanda’s ease of configuration helps us test Kafka workloads easily on a new Kubernetes cluster. Setting up &lt;code&gt;advertised_kafka_api&lt;/code&gt; for access from outside the cluster makes working with Redpanda a charm, and really reaps the benefit of thinking cloud-native and Kubernetes-first.&lt;/p&gt;

&lt;p&gt;So, a big thank you to the engineering team at Redpanda for creating a great developer experience! Check out the &lt;a href="https://github.com/redpanda-data/redpanda/" rel="noopener noreferrer"&gt;Redpanda GitHub repo&lt;/a&gt;, or go to their &lt;a href="https://docs.redpanda.com/"&gt;documentation&lt;/a&gt; to learn more. Join &lt;a href="https://redpanda.com/slack" rel="noopener noreferrer"&gt;the Redpanda Community&lt;/a&gt; on Slack to interact with me and Redpanda’s engineers directly. &lt;/p&gt;

</description>
      <category>java</category>
      <category>programming</category>
      <category>devops</category>
      <category>redpanda</category>
    </item>
    <item>
      <title>Using Flink SQL and Redpanda for stream processing</title>
      <dc:creator>The Team @ Redpanda</dc:creator>
      <pubDate>Wed, 13 Jul 2022 20:59:41 +0000</pubDate>
      <link>https://dev.to/redpanda-data/using-flink-sql-and-redpanda-for-stream-processing-4e0j</link>
      <guid>https://dev.to/redpanda-data/using-flink-sql-and-redpanda-for-stream-processing-4e0j</guid>
      <description>&lt;p&gt;Stream-based processing has risen in popularity in recent years, spurred on by event-driven architectures, the need for faster analytics, and the availability of various technology stacks that make it all feasible. One popular component of such a stack is Apache Flink&lt;sup&gt;Ⓡ&lt;/sup&gt;, a stream processing framework that boasts one of the most active open source communities today. Flink takes a stream-first approach — everything is a stream, and a batch file is treated as a special case of a bounded stream. At Redpanda, we share the view that streaming is a superset of batch, and our goal is to make Redpanda the best persistence layer for stream processors.&lt;/p&gt;

&lt;p&gt;Flink has several API layers that provide different levels of abstraction. At the highest level is Flink SQL, which allows non-programmers to easily build complex streaming pipelines in a familiar language. Flink SQL is ANSI compliant, and supports constructs such as joins, aggregations, windowing, and even user-defined functions. It can integrate with a number of data sources and sinks declaratively, via DDL statements. For example, to allow Flink SQL to read from an Apache Kafka&lt;sup&gt;Ⓡ&lt;/sup&gt; topic:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE TABLE access_logs (
    event_time TIMESTAMP(3) METADATA FROM 'timestamp',
    host STRING,
    ident STRING,
    authuser STRING,
    request STRING,
    status SMALLINT,
    bytes BIGINT
) WITH (
    'connector' = 'kafka',  -- using kafka connector
    'topic' = 'logs',  -- kafka topic
    'scan.startup.mode' = 'earliest-offset',  -- reading from the beginning
    'properties.bootstrap.servers' = 'kafka:9094',  -- kafka broker address
    'format' = 'csv'
);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once the table is declared, reading and processing the stream coming from the topic is straightforward. For example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;SELECT count(1), status FROM access_logs GROUP BY status;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
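&lt;p&gt;Windowed aggregations are just as declarative. As a sketch (this assumes a &lt;code&gt;WATERMARK&lt;/code&gt; has been declared on &lt;code&gt;event_time&lt;/code&gt; so it can serve as a time attribute, which the DDL above does not show):&lt;/p&gt;

```sql
-- Count requests per status over one-minute tumbling windows
SELECT
  TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
  status,
  COUNT(*) AS cnt
FROM access_logs
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE), status;
```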



&lt;h2&gt;
  
  
  Connecting to Redpanda
&lt;/h2&gt;

&lt;p&gt;Flink SQL does not ship with a specific connector for Redpanda. However, given Redpanda’s strong wire &lt;a href="https://redpanda.com/blog/codegen/"&gt;compatibility&lt;/a&gt; with the Kafka protocol, the standard &lt;a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/kafka/"&gt;Kafka connector&lt;/a&gt; works perfectly. As an example, we take an existing &lt;a href="https://flink.apache.org/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html"&gt;Flink SQL demo&lt;/a&gt; that shows an end-to-end streaming application. The demo shows Flink SQL reading a stream from a Kafka topic, which is then processed via streaming SQL. The results are written to Elastic, which are then presented as dashboards using Kibana. We replaced Kafka with Redpanda while keeping the rest of the application intact. &lt;br&gt;
TL;DR: It just works.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--hb4vc79l--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/qzd5v5qheo2hqo3a7lbm.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--hb4vc79l--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/qzd5v5qheo2hqo3a7lbm.png" alt="dashboard" width="880" height="523"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;To see for yourself, &lt;a href="https://flink.apache.org/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html"&gt;go to the demo article&lt;/a&gt; at the Apache Flink project website. (Props to Flink PMC / Committer &lt;a href="https://twitter.com/JarkWu"&gt;@JarkWu&lt;/a&gt; for putting together this excellent demo.) The demo requires Docker and Docker Compose to bring together the various components to run in your local environment.&lt;/p&gt;

&lt;p&gt;You can follow the step-by-step instructions in the article, except for the initial step of grabbing the &lt;code&gt;docker-compose.yml&lt;/code&gt; file. We had to modify the file to substitute Redpanda for Kafka, so the first step should be to enter the following from your command line to get our version of &lt;code&gt;docker-compose.yml&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;mkdir flink-sql-demo-redpanda; cd flink-sql-demo-redpanda;
wget https://raw.githubusercontent.com/patrickangeles/flink-sql-demo-redpanda/main/docker-compose.yml
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The rest of the demo walks you through establishing connectors to Redpanda and Elastic via SQL DDL statements, building streaming jobs via SQL DML statements, and wiring the data and visualizations together via Kibana. We won’t repeat the steps here; instead, we encourage you to follow the instructions exactly as described in &lt;a href="https://flink.apache.org/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html"&gt;the original article&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Dissecting &lt;code&gt;docker-compose.yml&lt;/code&gt;
&lt;/h2&gt;

&lt;p&gt;It’s worth going through the changes made to &lt;code&gt;docker-compose.yml&lt;/code&gt; in case you want to build your own Redpanda-powered projects using Docker Compose. We updated to a more current compose version (3.7), replaced the &lt;code&gt;kafka&lt;/code&gt; and &lt;code&gt;zookeeper&lt;/code&gt; services with a &lt;code&gt;redpanda&lt;/code&gt; service, and updated the service dependency graph appropriately. The Redpanda service declaration looks like the following:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;redpanda:
    image: docker.vectorized.io/vectorized/redpanda:v21.8.1
    command:
      - redpanda start
      - --smp 1
      - --memory 512M
      - --overprovisioned
      - --node-id 0
      - --set redpanda.auto_create_topics_enabled=true
      - --kafka-addr INSIDE://0.0.0.0:9094,OUTSIDE://0.0.0.0:9092
      - --advertise-kafka-addr INSIDE://kafka:9094,OUTSIDE://localhost:9092
    hostname: kafka
    ports:
      - "9092:9092"
      - "9094:9094"
    volumes:
      - /var/lib/redpanda/data
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Some of these parameters are worth mentioning, especially if you’re new to Redpanda. For one thing, Redpanda follows a &lt;a href="https://redpanda.com/blog/tpc-buffers/"&gt;thread-per-core model&lt;/a&gt; and likes to consume all available resources in the host environment when permitted. This is great for production deployments, but not ideal when you are prototyping on your laptop. The first three parameters mentioned below are startup flags that tell Redpanda to play nice with other processes in a shared host or VM.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;--smp 1&lt;/code&gt; Limits Redpanda to only use one logical core.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;--memory 512M&lt;/code&gt; Limits Redpanda to 512M of memory. Alternatively, you can specify &lt;code&gt;--reserve-memory N&lt;/code&gt;, which lets Redpanda grab all available memory while reserving N for the OS and other processes.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;--overprovisioned&lt;/code&gt; Indicates to Redpanda that there are other processes running on the host. Redpanda will not pin its threads or memory, and will reduce the amount of polling to lower CPU utilization.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;--node-id 0&lt;/code&gt; This is a required parameter. Every broker in Redpanda is identified by a node-id that survives restarts.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;--set redpanda.auto_create_topics_enabled=true&lt;/code&gt; Equivalent to setting &lt;code&gt;auto.create.topics.enable=true&lt;/code&gt; in Kafka.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;hostname: kafka&lt;/code&gt; In Docker Compose, the default hostname is based on the service name. We override this with &lt;code&gt;hostname: kafka&lt;/code&gt;, so we can stay compatible with the connector declaration from the original demo script.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;volumes: /var/lib/redpanda/data&lt;/code&gt; This tells Docker Compose to make a volume available for that path, which is the default Redpanda data directory.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;Docker and Docker Compose are great for developer productivity as they allow for quick assembly of different application components. Developers can build rapid prototypes of end-to-end applications all within their local environment. In this article, we showed how to retrofit an existing application prototype using Kafka (as well as Flink, MySQL, Elastic, Kibana) with Redpanda. Using Redpanda containers in lieu of Kafka and ZooKeeper for your streaming stack has some nice benefits, including faster startup times and more efficient resource consumption.&lt;/p&gt;

&lt;p&gt;We believe that this way of prototyping is conducive to building new streaming applications. In particular, Redpanda for event sourcing and Flink SQL for stream processing is a powerful, easy-to-use combination. The upcoming Redpanda Data Policies feature will allow for outbound data transformation via WASM. Eventually, we can use this to implement capabilities like predicate and projection push-down, which have the potential to speed up basic streaming operations by reducing the amount of data that goes from Redpanda to your stream processors.&lt;/p&gt;

&lt;p&gt;In the future, we want to provide more prototype examples of Redpanda with Flink SQL, and also explore Redpanda in combination with other streaming engines. Our goal is to make Redpanda the best persistence layer for streaming. Watch this space!&lt;/p&gt;

&lt;p&gt;If you have any questions about this example project or Redpanda in general, you can interact with our team &lt;a href="https://github.com/redpanda-data/redpanda"&gt;on GitHub&lt;/a&gt; or by joining &lt;a href="https://redpanda.com/slack"&gt;our Community on Slack&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>sql</category>
      <category>beginners</category>
      <category>programming</category>
      <category>tutorial</category>
    </item>
    <item>
      <title>How to join multiple data streams with Kafka Streams and Redpanda</title>
      <dc:creator>The Team @ Redpanda</dc:creator>
      <pubDate>Wed, 06 Jul 2022 21:11:43 +0000</pubDate>
      <link>https://dev.to/redpanda-data/how-to-join-multiple-data-streams-with-kafka-streams-and-redpanda-18pi</link>
      <guid>https://dev.to/redpanda-data/how-to-join-multiple-data-streams-with-kafka-streams-and-redpanda-18pi</guid>
      <description>&lt;p&gt;Apache Kafka Streams® (KStreams) is a client library and Java dependency that performs stream processing, or real-time processing of incoming streams of data, for smoother builds of applications and microservices. Common use cases for KStreams include aggregating the occurrence of certain words in a chat application, or filtering fraudulent transactions in a credit card processing system.&lt;/p&gt;

&lt;p&gt;Message brokers—which act as a buffer for events produced by different services—are a vital component in the development of distributed applications. For the mission-critical systems serviced by KStreams, message brokers with minimal latencies are the best option. Redpanda is an Apache Kafka® API-compatible system that integrates with the Kafka ecosystem, is easy to manage, and can deliver significant performance improvements over other systems. &lt;a href="https://redpanda.com/blog/fast-and-safe/" rel="noopener noreferrer"&gt;Check this benchmark&lt;/a&gt; to learn more.&lt;/p&gt;

&lt;p&gt;In this tutorial, you will learn how to aggregate multiple event streams using a KStreams application and Redpanda as the message broker. You’ll set up a KStreams application but use Redpanda instead of Kafka as the message broker without any code changes.&lt;/p&gt;

&lt;h2&gt;
  
  
  Prerequisites
&lt;/h2&gt;

&lt;p&gt;In order to follow this tutorial, you need the following:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Access to &lt;a href="https://www.docker.com/products/docker-desktop" rel="noopener noreferrer"&gt;Docker&lt;/a&gt; and Docker Compose&lt;/li&gt;
&lt;li&gt;An appropriate IDE like &lt;a href="https://www.jetbrains.com/idea/download/" rel="noopener noreferrer"&gt;IntelliJ IDEA&lt;/a&gt; or &lt;a href="https://www.eclipse.org/downloads/" rel="noopener noreferrer"&gt;Eclipse&lt;/a&gt; to run Java applications&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;You can find the demo application in &lt;a href="https://github.com/redpanda-data-blog/2022-aggregation-with-kstreams" rel="noopener noreferrer"&gt;this GitHub repo&lt;/a&gt;. To follow the tutorial, you can clone the project with the below command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;git clone https://github.com/redpanda-data-blog/2022-aggregation-with-kstreams.git
cd 2022-aggregation-with-kstreams
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Overview of a KStreams application
&lt;/h2&gt;

&lt;p&gt;KStreams is a Java dependency added to a Java application like a Spring Boot backend. When it connects to a Kafka cluster, it takes input streams from a Kafka topic, transforms them, and sends the output as a stream to a different topic.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fdj0rn19vwxqvjgr1xd6k.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fdj0rn19vwxqvjgr1xd6k.png" alt="kstreams streaming architecture"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;A KStreams application is an important component of a distributed system because of how data is processed within such systems. Without a stream processing layer, each consumer that wants the data in a particular shape would have to perform the transformation itself. KStreams ensures that stream processing happens in one place, only once. For a KStreams application to function properly, it requires a Kafka cluster with topics that serve as input and output stream sources.&lt;/p&gt;

&lt;p&gt;A typical stream processing application contains several methods, such as &lt;code&gt;Map&lt;/code&gt;, &lt;code&gt;MapValues&lt;/code&gt;, &lt;code&gt;Filter&lt;/code&gt;, and &lt;code&gt;Join&lt;/code&gt;, that transform or exclude data in one way or another. KStreams implements these methods in its domain-specific language (DSL). The DSL gives you all the tools required to perform stream processing, so it integrates easily with your Java application.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fs5y70wa7jjjsyeik2tx1.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fs5y70wa7jjjsyeik2tx1.png" alt="combining multiple streams"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;To understand how a KStreams application works, navigate to the cloned KStreams demo application and open the &lt;code&gt;/src/main/java/com/application/WordCountDemo.java&lt;/code&gt; file. In this file, you will find a &lt;code&gt;createWordCountStream&lt;/code&gt; method. This method contains the main logic of the application.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;static void createWordCountStream(final StreamsBuilder builder) {
        final KStream&amp;lt;String, String&amp;gt; source = builder.stream(INPUT_TOPIC);

        final KTable&amp;lt;String, Long&amp;gt; counts = source
                .flatMapValues(value -&amp;gt; Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                .groupBy((key, value) -&amp;gt; value)
                .count();

        // need to override value serde to Long type
        counts.toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
    }
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The method uses a StreamsBuilder object passed to it to create a KStream called &lt;code&gt;source&lt;/code&gt;. The values in this KStream are then grouped and counted using a combination of &lt;code&gt;flatMapValues&lt;/code&gt;, &lt;code&gt;groupBy&lt;/code&gt;, and &lt;code&gt;count&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;The &lt;code&gt;source&lt;/code&gt; KStream object represents the stream of data from the &lt;code&gt;streams-plaintext-input&lt;/code&gt; topic, while the &lt;code&gt;counts&lt;/code&gt; KTable object represents the transformed data. These two objects are fundamental to every KStreams application.&lt;/p&gt;

&lt;p&gt;A KStream and KTable are two sides of the same coin. A KStream is a continuous stream of data in which every new event is recorded as a unique piece of data. As an example, consider the following events sent through a KStream: &lt;code&gt;('eggs', 34)&lt;/code&gt;, &lt;code&gt;('bread', 10)&lt;/code&gt;, &lt;code&gt;('bread', 8)&lt;/code&gt;. These events are composed as key-value pairs in which the string is the key and the number is the value. The second event will not be affected by the third in a KStream even if they have the same key.&lt;/p&gt;

&lt;p&gt;A KTable, on the other hand, updates events that share a key. If the same three events, &lt;code&gt;('eggs', 34)&lt;/code&gt;, &lt;code&gt;('bread', 10)&lt;/code&gt;, and &lt;code&gt;('bread', 8)&lt;/code&gt;, are sent to a KTable, it resolves them into &lt;code&gt;('eggs', 34)&lt;/code&gt; and &lt;code&gt;('bread', 8)&lt;/code&gt;. Much like a relational database table, it keeps only the latest value for each key. A KTable can also be converted to a KStream, and a KStream to a KTable: think of a KTable as the latest values of a KStream, and a KStream as the change history of a KTable. This duality is the basis of aggregation and data joining in a KStreams application.&lt;/p&gt;
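
&lt;p&gt;The upsert behavior of a KTable can be sketched with a plain map. Again, this is a hypothetical illustration using only &lt;code&gt;java.util&lt;/code&gt;, not the KStreams API:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import java.util.*;

// Events arriving on the stream, in order:
List&amp;lt;Map.Entry&amp;lt;String, Integer&amp;gt;&amp;gt; events = List.of(
        Map.entry("eggs", 34), Map.entry("bread", 10), Map.entry("bread", 8));

// A KTable keeps only the latest value per key, like a database table:
Map&amp;lt;String, Integer&amp;gt; table = new LinkedHashMap&amp;lt;&amp;gt;();
for (Map.Entry&amp;lt;String, Integer&amp;gt; event : events) {
    table.put(event.getKey(), event.getValue()); // later events overwrite earlier ones
}
// table = {eggs=34, bread=8}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;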

&lt;h2&gt;
  
  
  Setting up Redpanda
&lt;/h2&gt;

&lt;p&gt;You will be using the Redpanda Docker image to run examples in this tutorial. To ensure that you can start Redpanda with a single command, you will use a &lt;code&gt;docker-compose.yml&lt;/code&gt; file to define the Redpanda configuration. Create a &lt;code&gt;docker-compose.yml&lt;/code&gt; file in a directory of your choice, then add the following content to it:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;version: "3.7"
services:
  redpanda:
    command:
      - redpanda
      - start
      - --smp
      - "1"
      - --memory
      - 1G
      - --reserve-memory
      - 0M
      - --overprovisioned
      - --node-id
      - "0"
      - --kafka-addr
      - PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      - --advertise-kafka-addr
      - PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
    # NOTE: Please use the latest version here!
    image: docker.vectorized.io/vectorized/redpanda:v21.11.11
    container_name: redpanda-1
    ports:
      - 9092:9092
      - 29092:29092
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can run this file in detached mode by running &lt;code&gt;docker compose up -d&lt;/code&gt; in the directory where the file lives. You should get an output like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;[+] Running 1/1
 - Container redpanda-1  Started
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In event streaming architectures, topics live in the message brokers. These topics store the messages or events sent by a producer. A stream is a continuous flow of data to or from the topic. Use the below command to create the topics needed in this KStreams application:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -it redpanda-1 \
rpk topic create streams-plaintext-input streams-wordcount-output
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now you need to set up the producer to write to the input topic and a consumer to read from the output topic. Redpanda offers an easy way to set up a producer without adding command line arguments. Run the below command to create the producer:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -it redpanda-1 \
rpk topic produce streams-plaintext-input
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Setting up the consumer follows a similar approach. Open a new terminal and run the below command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -it redpanda-1 \
rpk topic consume streams-wordcount-output
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now start the KStreams application in your IDE. In the terminal running the Redpanda &lt;code&gt;streams-plaintext-input&lt;/code&gt; producer, type in the sentence &lt;code&gt;all streams lead to kafka&lt;/code&gt;, then check the Redpanda &lt;code&gt;streams-wordcount-output&lt;/code&gt; terminal. Each record's &lt;code&gt;value&lt;/code&gt; is the word count serialized as an 8-byte &lt;code&gt;Long&lt;/code&gt;. Your output should look like the following:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "topic": "streams-wordcount-output",
  "key": "all",
  "value": "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0003",
  "timestamp": 1646660088785,
  "partition": 0,
  "offset": 1
}
{
  "topic": "streams-wordcount-output",
  "key": "streams",
  "value": "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0003",
  "timestamp": 1646660088785,
  "partition": 0,
  "offset": 2
}
{
  "topic": "streams-wordcount-output",
  "key": "lead",
  "value": "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0003",
  "timestamp": 1646660088785,
  "partition": 0,
  "offset": 3
}
{
  "topic": "streams-wordcount-output",
  "key": "to",
  "value": "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0003",
  "timestamp": 1646660088785,
  "partition": 0,
  "offset": 4
}
{
  "topic": "streams-wordcount-output",
  "key": "kafka",
  "value": "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0004",
  "timestamp": 1646660088785,
  "partition": 0,
  "offset": 5
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Joining multiple streams in Redpanda
&lt;/h2&gt;

&lt;p&gt;A KStreams application is not limited to stream aggregation. It can also be used to join multiple streams together, as long as they have the same key. Typically a Kafka Streams application can join any of the following:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;One KStream to another KStream; produces a KStream&lt;/li&gt;
&lt;li&gt;One KStream to a KTable; produces a KTable&lt;/li&gt;
&lt;li&gt;One KTable to another KTable; produces a KTable&lt;/li&gt;
&lt;li&gt;A KStream to a GlobalKTable; produces a KStream&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Similar to a typical database, these join operations can be any of the following:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;A left join&lt;/li&gt;
&lt;li&gt;An inner join&lt;/li&gt;
&lt;li&gt;An outer join&lt;/li&gt;
&lt;/ol&gt;
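
&lt;p&gt;The difference between these join types can be sketched with two keyed maps. This is a hypothetical plain-Java snippet; real KStreams joins additionally handle serdes, time windows, and co-partitioning:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import java.util.*;

// Orders keyed by user ID; the order with key "9" has no matching profile.
Map&amp;lt;String, String&amp;gt; orders = Map.of("1", "order-33", "9", "order-1005");
Map&amp;lt;String, String&amp;gt; users  = Map.of("1", "John Wick", "2", "Malik Gruder");

Map&amp;lt;String, String&amp;gt; innerJoin = new HashMap&amp;lt;&amp;gt;();
Map&amp;lt;String, String&amp;gt; leftJoin  = new HashMap&amp;lt;&amp;gt;();
for (Map.Entry&amp;lt;String, String&amp;gt; order : orders.entrySet()) {
    String user = users.get(order.getKey());
    if (user != null) {
        innerJoin.put(order.getKey(), order.getValue() + "/" + user); // inner: both sides must match
    }
    leftJoin.put(order.getKey(), order.getValue() + "/" + user);      // left: unmatched right side is null
}
// innerJoin = {1=order-33/John Wick}
// leftJoin  = {1=order-33/John Wick, 9=order-1005/null}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;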

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Foa95almjk5b7k2ihoqi7.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Foa95almjk5b7k2ihoqi7.png" alt="kstreams joins"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In this section, you will create an enhanced orders stream formed by joining user and order streams. It follows a similar architecture to the previous example, but in this case, you will join the &lt;code&gt;orders&lt;/code&gt; stream with the &lt;code&gt;userProfiles&lt;/code&gt; table.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Frt37nq9bfvdc4s02j6ws.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Frt37nq9bfvdc4s02j6ws.png" alt="joining user and order streams"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;To get started, create new topics for &lt;code&gt;userProfiles&lt;/code&gt;, &lt;code&gt;orders&lt;/code&gt;, and &lt;code&gt;enhancedOrders&lt;/code&gt; using the below command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -it redpanda-1 \
rpk topic create userProfiles orders enhancedOrders
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The code that handles the joining can be found in &lt;code&gt;src/main/java/com/application/EnhancedOrdersApplication.java&lt;/code&gt; in the cloned repo. Find the part where the &lt;code&gt;userProfiles&lt;/code&gt; table and &lt;code&gt;orders&lt;/code&gt; stream are initialized. A simple join is achieved by concatenating each order with its matching user profile. The joined data is then streamed to the output topic.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;final KTable&amp;lt;String, String&amp;gt; userProfiles = builder.table("userProfiles");
final KStream&amp;lt;String, String&amp;gt; orders = builder.stream("orders");

KStream&amp;lt;String, String&amp;gt; joined = orders.join(userProfiles,
        (order, userProfile) -&amp;gt; order + userProfile
);
joined.to("enhancedOrders");
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, run the application in IntelliJ. Open three terminals and run the commands below for the two producers and the consumer. The messages will be streamed via stdin. The producers set the delimiter to a newline (&lt;code&gt;\n&lt;/code&gt;) and the key-value separator to a colon (&lt;code&gt;:&lt;/code&gt;), so each line you type becomes a message with a defined key and value.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -it redpanda-1 \
rpk topic produce userProfiles -f "%k:%v\n"

docker exec -it redpanda-1 \
rpk topic produce orders -f "%k:%v\n"

docker exec -it redpanda-1 \
rpk topic consume enhancedOrders
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In the first terminal, produce the following data:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;1:{"id":"1", "email":"john.wick@gmail.com", "first_name":"John", "last_name":"Wick"}
2:{"id":"2", "email":"malik.gruder@gmail.com", "first_name":"Malik", "last_name":"Gruder"}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Produce data to the orders topic in the second terminal using the command below:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;1:{"id":"1", "product_id":"33","user_id":"1"}
2:{"id":"2", "product_id":"75","user_id":"2"}
1:{"id":"3", "product_id":"1005","user_id":"1"}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now observe the third terminal for the output of the &lt;code&gt;enhancedOrders&lt;/code&gt; stream. You should get data similar to what’s shown below:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "topic": "enhancedOrders",
  "key": "1",
  "value": "{\"id\":\"1\", \"product_id\":\"33\",\"user_id\":\"1\"}{\"id\":\"1\", \"email\":\"john.wick@gmail.com\", \"first_name\":\"John\", \"last_name\":\"Wick\"}",
  "timestamp": 1649141650456,
  "partition": 0,
  "offset": 0
}
{
  "topic": "enhancedOrders",
  "key": "2",
  "value": "{\"id\":\"2\", \"product_id\":\"75\",\"user_id\":\"2\"}{\"id\":\"2\", \"email\":\"malik.gruder@gmail.com\", \"first_name\":\"Malik\", \"last_name\":\"Gruder\"}",
  "timestamp": 1649141650458,
  "partition": 0,
  "offset": 1
}
{
  "topic": "enhancedOrders",
  "key": "1",
  "value": "{\"id\":\"3\", \"product_id\":\"1005\",\"user_id\":\"1\"}{\"id\":\"1\", \"email\":\"john.wick@gmail.com\", \"first_name\":\"John\", \"last_name\":\"Wick\"}",
  "timestamp": 1649141650461,
  "partition": 0,
  "offset": 2
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You’ll notice that the second output contains the second user’s details, while the first and third outputs contain the first user’s details. This matches the expected result from the join diagram above.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;Distributed applications that use message brokers often need a way to process streams of data for easier consumption and less computational overhead in consumer applications. KStreams serves as an excellent stream-processing tool for this purpose. While KStreams was built with Kafka in mind, it works with other Kafka API-compatible systems, too.&lt;/p&gt;

&lt;p&gt;As you saw in this tutorial, your KStreams application works well using Redpanda, which is a drop-in replacement for Kafka, without Zookeeper®, and without a JVM. This enables you to increase your application’s message-processing ability with minimal effort.&lt;/p&gt;

&lt;p&gt;To check your work, use the demo application in &lt;a href="https://github.com/redpanda-data-blog/2022-aggregation-with-kstreams" rel="noopener noreferrer"&gt;this GitHub repo&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;You can also discuss the demo, KStreams, or ask questions about anything else you can do with Redpanda in &lt;a href="https://redpanda.com/slack" rel="noopener noreferrer"&gt;the Redpanda Slack community&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>tutorial</category>
      <category>programming</category>
      <category>kafka</category>
      <category>redpanda</category>
    </item>
  </channel>
</rss>
