
The Team @ Redpanda for Redpanda Data

Originally published at redpanda.com

Building a real-time data pipeline with Ably and Redpanda

This blog was written by Tom Camp, DevRel Engineer at Ably.

The value of your data decays the longer it takes to extract and process it. If your users expect their online experiences to happen instantaneously, you face the added challenge of not only processing at great speed, but also streaming the processed data reliably from your private network to (and from) millions of concurrent users on internet-connected devices.

Together, Ably and Redpanda provide a simple solution for streaming and processing mission-critical data to and from millions of end-users on the internet instantaneously, and without data loss.


Benefits of integrating Ably with Redpanda

Using Ably and Redpanda together provides a helpful solution for any company that needs mission-critical data ingestion and processing at speed with resilience and guaranteed data integrity. This is especially true if your company:

  • does not have a dedicated team of distributed systems engineers available to build and operate a real-time, event-driven system at scale.
  • needs to add real-time features to its data product quickly in order to stay competitive.
  • is scaling fast, to millions of mobile users across multiple geographies.
  • does not have a JVM background.

For these companies, Ably and Redpanda provide a data ingestion and processing solution that is accessible and easy to implement.

As a use case example, this combined solution can connect a network of global stores like Target or Ikea using Redpanda at the edge in a store-and-forward fashion, relying on Ably’s network to synchronize events across the world.

Applications of the Redpanda and Ably pipeline are wide-ranging: from financial products sending time-sensitive information such as stock data or currency exchange rates to mobile users; to medical and emergency services dealing with patient data and deploying emergency support; to live streaming and audience engagement apps fanning out real-time scores, updates, and commentary to millions of devices; and many more.

How does it work?

Redpanda is an Apache Kafka® API-compatible real-time event streaming platform. It was built from the ground up with performance and simplicity in mind. It requires no Zookeeper®, no JVM, and no code changes. With support for on-broker WebAssembly (Wasm) transforms, it’s extremely powerful when it comes to transforming and filtering any data that comes into it.

Ably is a pub/sub messaging platform built for streaming messages to client devices over the internet. It was designed with resilience at its heart and mathematically modelled to work regardless of the volume of messages, the number of open connections, or how patchy the network conditions are. It delivers predictable 65ms round-trip latency for the 99th percentile and guarantees message delivery, message ordering, and exactly-once semantics.

Combining Redpanda and Ably is simple, and only requires a few lines of code. To demonstrate the fundamentals, we’ll create a Redpanda instance with a topic and a Wasm function that passes any messages that come through to an Ably channel. To make this as accessible as possible, we’ll run everything in a CentOS-based Docker container, so make sure you’ve got Docker on your machine.

1. Creating the Docker Container with Redpanda

Firstly, create a file called dockerfile with the following contents:

FROM centos
MAINTAINER "Tom Camp" tom.camp@ably.com
ENV container docker
RUN yum -y update; yum clean all
RUN sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-Linux-*
RUN sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-Linux-*
RUN curl -sL https://rpm.nodesource.com/setup_14.x | bash -;
RUN yum -y install nodejs;
RUN curl -1sLf 'https://packages.vectorized.io/HxYRCzL4xbbaEtPi/redpanda-beta/setup.rpm.sh' | bash;
RUN yum -y install redpanda;
RUN yum -y install vim;
RUN yum -y install systemd; yum clean all;
RUN (cd /lib/systemd/system/sysinit.target.wants/; for i in *; do [ $i == systemd-tmpfiles-setup.service ] || rm -f $i; done);
RUN rm -rf /lib/systemd/system/multi-user.target.wants/*;
RUN rm -rf /etc/systemd/system/*.wants/*;
RUN rm -rf /lib/systemd/system/local-fs.target.wants/*;
RUN rm -rf /lib/systemd/system/sockets.target.wants/*udev*;
RUN rm -rf /lib/systemd/system/sockets.target.wants/*initctl*;
RUN rm -rf /lib/systemd/system/basic.target.wants/*;
RUN rm -rf /lib/systemd/system/anaconda.target.wants/*;
VOLUME [ "/sys/fs/cgroup" ]
CMD ["/usr/sbin/init"]

This file defines the initial setup for our Docker Image we’ll be creating. Overall, it:

  • Makes use of a CentOS image to initialize our new image
  • Gets a few essentials for this demo, such as installing Node.js and Redpanda. It also installs vim as an easy way to edit documents, but feel free to use whatever method of editing files you’d like within the container
  • As certain parts of using the Wasm engine use systemctl, we need to remove some files and define the start command as /usr/sbin/init. This is a bit of an antipattern for Docker as generally you only want a single process running per container. For this demonstration, however, it’s functional for use. More details on this can be found on a thread on the Docker forums.

With our dockerfile defined, we can build it into an image, which we’ll call redpanda-image:

docker build --rm -t redpanda-image .

With the image built we can run it as a container:

docker run -itd --privileged --name=redpanda-container redpanda-image

If this has worked correctly you should get the new container’s ID returned to you in the terminal. If not, you can run docker ps to get all currently running container IDs. Use this ID in the following command to instantiate a bash instance in the container:

docker exec -it <CONTAINER ID> /bin/bash

Now we have a bash instance running in the container, let’s check Redpanda is available and we have the right version:

rpk version

2. Testing out the Redpanda topics

Let’s try out the core Redpanda functionality by setting up a topic.

rpk topic create test-topic

To make sure everything’s working as expected, try subscribing to the topic then publishing a message into it. To set up a consumer, run:

rpk topic consume test-topic

Then, to publish, in another terminal:

rpk topic produce test-topic

Type a message, and press Ctrl + D to send it. If everything’s working, you should see the message come through to the client consuming from the topic!

3. Enabling Wasm in Redpanda

Although we have Redpanda running, we’ll need to adjust its configuration file in order to allow us to correctly use the Wasm engine it provides. To do this we need to edit the file which should be found in /etc/redpanda/redpanda.yaml. If you’re happy using vim, run the following command:

vim /etc/redpanda/redpanda.yaml

Within this YAML file, there should be a top-level redpanda: key with various configuration lines nested under it. We need to add the following lines to this section, indented to match its other entries, to enable the Wasm engine functionality:

  developer_mode: true
  enable_coproc: true
  enable_idempotence : false

The file after adding these in should look something like this:

config_file: /etc/redpanda/redpanda.yaml
node_uuid: mecCncdqA9842798tg4v98b3289b298fg9Kwe6VTgvHoJ
pandaproxy: {}
redpanda:
  admin:
  - address: 0.0.0.0
    port: 9644
  data_directory: /var/lib/redpanda/data
  developer_mode: true      ## since this is a tech preview you must set developer mode as true
  enable_coproc: true       ## data transform flag here. make sure it's under redpanda
  enable_idempotence : false  ## this flag is also mandatory because we're running a tech preview
  coproc_engine:
  logFilePath: <file_path>  ## remember to change the file_path
  coproc_supervisor_server:
    address: 0.0.0.0        ## if you want, you can change the ip address here
    port: <new port>        ## remember to change the port here
  kafka_api:
  - address: 0.0.0.0
    port: 9092
  node_id: 0
  rpc_server:
    address: 0.0.0.0
    port: 33145
  seed_servers: []
rpk:
  coredump_dir: /var/lib/redpanda/coredump
  enable_memory_locking: false
  enable_usage_stats: false
  overprovisioned: false
  tune_aio_events: false
  tune_ballast_file: false
  tune_clocksource: false
  tune_coredump: false
  tune_cpu: false
  tune_disk_irq: false
  tune_disk_nomerges: false
  tune_disk_scheduler: false
  tune_disk_write_cache: false
  tune_fstrim: false
  tune_network: false
  tune_swappiness: false
  tune_transparent_hugepages: false
schema_registry: {}
With the configuration updated, we now just need to restart redpanda and start up the wasm_engine:

systemctl restart redpanda
systemctl start wasm_engine

You can check if the wasm_engine has started up correctly by running systemctl status wasm_engine, and checking to see if you get the following output:

wasm_engine.service - Redpandas wasm engine, your on-broker programmable data transformer
   Loaded: loaded (/usr/lib/systemd/system/wasm_engine.service; disabled; vendor preset: disabled)
   Active: active (running) since Fri 2022-02-11 11:56:11 UTC; 25s ago
 Main PID: 143 (node)
    Tasks: 11 (limit: 30846)
   Memory: 23.4M
   CGroup: /wasm.slice/wasm_engine.service
        └─143 /opt/redpanda/bin/node /opt/wasm/main.js /etc/redpanda/redpanda.yaml

If so, the Wasm engine should be ready for us to start using.

4. Creating a new Wasm function

Now the Wasm engine is running, let’s create a Wasm function to publish messages into Ably. To create a template for our function, run the following:

rpk wasm generate redpanda-to-ably

The generated folder contains everything we’ll need to create our Wasm function, and comes with a default example inside src/main.js. When we eventually run npm run build, this will be used to generate a Wasm function in a dist folder.

Within this generated folder, we need to replace the contents of the src/main.js file with our own code, which takes messages from the topic test-topic and sends them to the Ably channel my_ably_channel. To use Ably, you’ll need to create a free account. Once you’ve got an account, you will need to get an API key from one of your account’s apps.

Equipped with your Ably API key, add the following to main.js, replacing INSERT_API_KEY_HERE with yours:

const {
  SimpleTransform,
  PolicyError,
  PolicyInjection
} = require("@vectorizedio/wasm-api");
const Ably = require('ably');
/* Ably REST client and the channel we will publish to */
const rest = new Ably.Rest('INSERT_API_KEY_HERE');
const channel = rest.channels.get('my_ably_channel');
const transform = new SimpleTransform();
/* Topics that fire the transform function */
transform.subscribe([["test-topic", PolicyInjection.Stored]]);
/* The strategy the transform engine will use when handling errors */
transform.errorHandler(PolicyError.Deregister);
/* Auxiliary function: publishes the record's value to Ably and returns the record unchanged */
const sendToAbly = (record) => {
  channel.publish('redpanda', record.value);
  return record;
}
/* Transform function */
transform.processRecord((recordBatch) => {
  const result = new Map();
  const transformedRecord = recordBatch.map(({ header, records }) => {
    return {
      header,
      records: records.map(sendToAbly),
    };
  });
  /* The key is the name used for the output topic */
  result.set("result-topic", transformedRecord);
  // processRecord function returns a Promise
  return Promise.resolve(result);
});
exports["default"] = transform;

In effect, the code is doing the following:

  • Instantiating the Ably REST client and an Ably channel object, which we’ll use to publish to Ably.
  • Creating a Transform, which takes data going into a topic, does something to that data, and then outputs it somewhere else. Whilst we’ll be outputting the data into Ably as part of this process, we’ll also be outputting the data into another topic for visibility. This will be test-topic._result-topic_, an amalgamation of the original topic name and the string defined in result.set(...).
  • Subscribing this transform to a specific topic, test-topic.
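
To make the data flow easier to follow, here is a minimal standalone sketch of the same pass-through logic that runs in plain Node.js, without Redpanda or Ably. The fakeChannel object, the processBatch function, and the shape of sampleBatch are stand-ins invented for illustration; the real engine supplies its own batch and record objects, so treat this as a model of the idea, not of the actual API.

```javascript
// Stand-in for the Ably channel object: it records what would have been published.
const published = [];
const fakeChannel = {
  publish: (eventName, data) => published.push({ eventName, data }),
};

// Same shape as sendToAbly: publish as a side effect, return the record unchanged.
const sendToAbly = (record) => {
  fakeChannel.publish('redpanda', record.value);
  return record;
};

// Same shape as the processRecord callback, written as a plain function.
const processBatch = (recordBatch) => {
  const result = new Map();
  const transformed = recordBatch.map(({ header, records }) => ({
    header,
    records: records.map(sendToAbly),
  }));
  result.set('result-topic', transformed);
  return Promise.resolve(result);
};

// Hypothetical input: one batch holding two records (assumed shape).
const sampleBatch = [
  {
    header: { recordCount: 2 },
    records: [{ value: Buffer.from('hello') }, { value: Buffer.from('world') }],
  },
];

const pending = processBatch(sampleBatch);
pending.then((result) => {
  console.log('destination keys:', [...result.keys()]);
  console.log('records kept:', result.get('result-topic')[0].records.length);
});
console.log('published:', published.map((m) => m.data.toString('utf8')));
```

The point to notice is that publishing is a side effect: every record is still returned, so everything that reaches Ably also lands in the result topic.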

With our Wasm function defined, we now need to build it to be used by the Wasm engine. We’ll need to install Ably and build it with the following, run in the base folder of the function:

npm install --save ably
npm install
npm run build

With the Wasm function built, we can now deploy it to the Wasm engine:

rpk wasm deploy dist/main.js --name redpanda-to-ably

With that we should be all set! Let’s try publishing a message to our topic again, and confirm that a message is being sent to Ably. Run the following command again and publish a message:

rpk topic produce test-topic

We can make a quick curl request to the Ably App to check if the Ably Channel specified has received the message. Make sure to replace MY_API_KEY with the Ably API key you used in your Wasm function.

curl https://rest.ably.io/channels/my_ably_channel/history -u "MY_API_KEY"

If everything has worked as expected, you should see the message you sent to your local Redpanda topic returned from the Ably app.
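
If you’d rather run this check from Node.js, the curl request can be sketched with the built-in https module. The helper names historyRequestOptions and fetchHistory are made up for this example. It assumes the API key has the usual name:secret form, which is what lets curl’s -u flag treat it as Basic auth credentials.

```javascript
const https = require('https');

// Builds request options for the same history endpoint the curl command calls.
// Encoding the whole "name:secret" key as base64 matches what curl -u sends.
const historyRequestOptions = (apiKey, channelName) => ({
  hostname: 'rest.ably.io',
  path: `/channels/${encodeURIComponent(channelName)}/history`,
  method: 'GET',
  headers: {
    Authorization: `Basic ${Buffer.from(apiKey).toString('base64')}`,
  },
});

// Sends the request and resolves with the status code and raw response body.
const fetchHistory = (apiKey, channelName) =>
  new Promise((resolve, reject) => {
    const req = https.request(historyRequestOptions(apiKey, channelName), (res) => {
      let body = '';
      res.setEncoding('utf8');
      res.on('data', (chunk) => { body += chunk; });
      res.on('end', () => resolve({ status: res.statusCode, body }));
    });
    req.on('error', reject);
    req.end();
  });

// Usage (needs network access and a real key):
// fetchHistory('MY_API_KEY', 'my_ably_channel').then(console.log);
```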

5. Filtering and organizing your data

With the above example, we’ve made use of Wasm to pass messages from Redpanda into Ably for distribution. However, we can extend this basic functionality to create considerably more powerful and interesting manipulations of our data.

As a simple example, if we wanted to make use of a parameter in our message’s header to dictate which Ably channel we publish it to, we could replace our main.js file with the following. Make sure to replace INSERT_API_KEY_HERE with your Ably API key as before:

const {
  SimpleTransform,
  PolicyError,
  PolicyInjection
} = require("@vectorizedio/wasm-api");
const Ably = require('ably');
const rest = new Ably.Rest('INSERT_API_KEY_HERE');

/* Returns the value of the header named `key`, or undefined if the record has no such header */
const channelToUse = (key, record) => {
  for (let i = 0; i < record.headers.length; i++) {
    const header = record.headers[i];
    if (header.headerKey.toString('utf8') === key) {
      return header.value.toString('utf8');
    }
  }
}

/* Publishes the record to the chosen Ably channel and returns an uppercased copy */
const sendToAbly = (record) => {
  let channelName = channelToUse('channel_name', record);
  if (!channelName) {
    /* Fall back to a default channel when the header is missing */
    channelName = 'test';
  }
  const channel = rest.channels.get(channelName);
  channel.publish('redpanda', record.value.toString('utf8'));
  const newRecord = {
    ...record,
    /* Convert lowercase ASCII letters (codes 97 to 122) to uppercase */
    value: record.value.map((char) => {
      if (char >= 97 && char <= 122) {
        return char - 32;
      } else {
        return char;
      }
    }),
  };
  return newRecord;
}

const transform = new SimpleTransform();
/* Topics that fire the transform function */
transform.subscribe([["test-topic", PolicyInjection.Stored]]);
/* The strategy the transform engine will use when handling errors */
transform.errorHandler(PolicyError.SkipOnFailure);

/* Transform function */
transform.processRecord((recordBatch) => {
  const result = new Map();
  const transformedRecord = recordBatch.map(({ header, records }) => {
    return {
      header,
      records: records.map(sendToAbly),
    };
  });
  result.set("result-topic", transformedRecord);
  // processRecord function returns a Promise
  return Promise.resolve(result);
});

exports["default"] = transform;

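Here, all we’re doing is checking whether a record has a channel_name header field and, if so, using its value to decide which Ably channel to publish to. Records without that header go to the test channel. The copy of each record written to the result topic also has its lowercase ASCII letters converted to uppercase. Deploying this takes exactly the same steps as before: npm run build, followed by rpk wasm deploy dist/main.js --name redpanda-to-ably.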
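
To see how the routing behaves without deploying anything, the header lookup and the uppercasing can be tried on hand-built records in plain Node.js. This is only a sketch: channelFor and upperCaseAscii are illustrative helper names, and the record shape (headerKey and value stored as Buffers) is an assumption that mirrors the fields used in the function.

```javascript
// Header lookup with an explicit fallback channel name.
const channelFor = (record, key, fallback) => {
  const match = (record.headers || []).find(
    (header) => header.headerKey.toString('utf8') === key
  );
  return match ? match.value.toString('utf8') : fallback;
};

// Shifts ASCII a-z (codes 97 to 122) to A-Z and leaves every other byte alone.
const upperCaseAscii = (buffer) =>
  Buffer.from(Array.from(buffer, (char) => (char >= 97 && char <= 122 ? char - 32 : char)));

// Hypothetical records: one names its channel in a header, one does not.
const routed = {
  headers: [{ headerKey: Buffer.from('channel_name'), value: Buffer.from('scores') }],
  value: Buffer.from('goal!'),
};
const unrouted = { headers: [], value: Buffer.from('no header') };

console.log(channelFor(routed, 'channel_name', 'test'));    // scores
console.log(channelFor(unrouted, 'channel_name', 'test'));  // test
console.log(upperCaseAscii(routed.value).toString('utf8')); // GOAL!
```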

In addition, you could easily add filters on the data itself. If you wanted to skip any messages that contain the word ‘foo’, simply wrap the publish call in an if statement as follows:

...
if (channelName && !record.value.includes('foo')) {
    /* Publish message */
}
...
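
As a quick sanity check of that condition, the filter can be pulled out into a small predicate and tried on sample values. The shouldPublish name and its blockedWord parameter are hypothetical, and the sketch assumes record values arrive as Node.js Buffers, as in the code above.

```javascript
// Returns true when the value should be forwarded to Ably,
// that is, when it does not contain the blocked word.
const shouldPublish = (value, blockedWord = 'foo') => !value.includes(blockedWord);

console.log(shouldPublish(Buffer.from('hello world')));   // true
console.log(shouldPublish(Buffer.from('some foo here'))); // false
```

Keeping the check in its own function makes it easy to add further rules later without touching the publish call.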

Conclusion

Now that you have the pipeline set up, you have a simple solution for streaming and processing data to and from millions of end-users on the internet instantaneously, with no data loss.

If you have any questions or thoughts on using these two technologies together, please reach out to either the team at Ably or Slack the team at Redpanda in their community.
