DEV Community

Cover image for RisingWave workshop
Cris Crawford
Cris Crawford

Posted on

RisingWave workshop

To prepare for the RisingWave workshop of the data engineering zoomcamp, I downloaded the workshop repository into my zoomcamp directory. Then I opened a terminal and changed into that directory. I executed the following commands, and then waited.

# Check version of psql
psql --version
source commands.sh

# Start the RW cluster
start-cluster
Enter fullscreen mode Exit fullscreen mode

The first time I did this, it hung. So I couldn't follow along with the live workshop. I didn't know why. start-cluster is one of the commands that's defined in commands.sh. It sets up a docker container. When it again did nothing, I realized there was something wrong with my docker setup. I looked at docker desktop and saw that there was a docker container still running. It was the "emacski" TensorFlow container that I had pulled when the official TensorFlow docker didn't work on a Mac with an Apple chip. I tried to delete it but nothing happened. I couldn't run docker ps either. So I had to close everything that was open and restart my computer. That solved the problem, and I could continue. I ran the next set of commands:

python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
pip install psycopg2-binary
Enter fullscreen mode Exit fullscreen mode

The last command wasn't in the instructions, but I had problems with installing psycopg2 and I remembered that I had to install the binary instead.

Up to now, we've been doing batch data processing. Stream is different. It has different sources, and uses different methods. In stream processing, you take data as it comes in in real time, and you add to and modify your database.

Here's an image from the workshop, where you can see the different kinds of inputs for the two methods.

Diagram of batch vs. stream processing

With stream processing, you update the data incrementally. This has low latency, meaning that it's fast.
Here's a diagram of the RisingWave architecture:

Diagram of RisingWave architecture

Here's a simple example to illustrate how RisingWave works. First, we create a table with v1, and then we create a view with the columns v1 = v1 + 1, where v1 is not zero. So the first instance of the materialized table will not have any rows. The larger query is a query plan that you get when you prefix the "CREATE MATERIALIZED VIEW" by the word "EXPLAIN".

sql command to create simple table

so we have steps: scan -> process -> filter -> table
where the intermediate steps, process and filter, are stateless.

Here's a diagram of how the system process the input of two variables, 0 and 1:

Diagram of system processing adding 0 and 1

In this picture, the two central processing units are stateless. In stream processing, adding and deleting records are often considered stateless operations because they do not require maintaining any context or history across individual records. Let's break down why they are typically viewed as stateless:

• Adding Records (Inserts): When a new record is added to a stream or a dataset, it doesn't usually depend on the previous state of the system. Each record is processed independently, and the addition of one record doesn't affect the processing or outcome of another record in the stream. Therefore, adding records is stateless because it doesn't rely on any context or history.

• Deleting Records: Similarly, deleting records in a stream processing context is often considered stateless because the removal of a record is typically based on specific criteria or conditions at the time of deletion. Once a record is deleted, it is removed from the stream, and subsequent processing doesn't depend on its existence. Like adding records, deleting records doesn't involve maintaining any state or context across records.

In contrast, operations such as aggregations, windowing, or joining streams often involve maintaining state or context across multiple records. These operations rely on previous records or events to compute the result for the current record, which makes them stateful operations.

Stateless operations are generally preferred in stream processing because they are easier to parallelize and scale horizontally across distributed systems. They also simplify fault tolerance and recovery mechanisms since there is no need to maintain the state across processing nodes. However, in many real-world scenarios, stream processing applications combine both stateless and stateful operations to achieve the desired functionality and processing logic.

Here's an example of a stateful operation, count(*):

Example of stateful operation count(*)

This is stateful because there has to be a record of how many things have been counted.

Now for the workshop. We are running a docker file using a docker-compose.yml from Rising Wave. We can edit it to use other inputs and outputs, but for now we're reading in from kafka and reading out to ClickHouse.

The workshop was set up so that python/Kafka modified the static data from January 2022 yellow taxi trips so that 10 records were released in a stream with the dropoff times set to NOW. That way we could create a materialized view that constantly updated as new data came in.

The first view we created was a table of total airport pickups.

CREATE MATERIALIZED VIEW total_airport_pickups AS
    SELECT
        count(*) AS cnt,
        taxi_zone.Zone
    FROM
        trip_data
            JOIN taxi_zone
                ON trip_data.PULocationID = taxi_zone.location_id
    WHERE taxi_zone.Zone LIKE '%Airport'
    GROUP BY taxi_zone.Zone;
Enter fullscreen mode Exit fullscreen mode

This simply groups by taxi_zone and counts only the pickups that happened at an airport. However because this is a materialized view, it's constantly updating.

Here's an explanation of how this happens:

EXPLAIN CREATE MATERIALIZED VIEW total_airport_pickups AS
    SELECT
        count(*) AS cnt,
        taxi_zone.Zone
    FROM
        trip_data
            JOIN taxi_zone
                 ON trip_data.PULocationID = taxi_zone.location_id
    WHERE taxi_zone.Zone LIKE '%Airport'
    GROUP BY taxi_zone.Zone;
Enter fullscreen mode Exit fullscreen mode

In this explanation, you read from the bottom up. The first thing that happens is that taxi zone data is parsed to find the zones that have "Airport" in them (the filter). Then it's joined to the taxi_trip data, and the output is a materialized table.

text of EXPLAIN command

Here's a schematic view of the plan:

schematic of EXPLAIN command

The other commands were more or less the same, but what was really cool was to see how the data changed from one second to the next. Near the end of the workshop we looked at an index.html file that had a canvas app that produced a bar chart of the ten longest trips of the previous five minutes. A backend python file read data from RisingWave and the index.html picked it up and displayed it. It showed the data changing in real time. Even though it was fake, it was pretty cool.

I was lucky because I had psql on my computer. In the class, we had used pgcli, but I could follow along with the commands (not in real time - I watched the workshop video again later) just as the instructor had written them. The workshop was very well organized.

Top comments (0)