
Olena Kutsenko

Originally published at aiven.io

How to stream data from Mastodon public timelines to Apache Kafka with NodeJS and TypeScript

Mastodon has been rising in popularity over recent months. If you're not yet familiar with this exotic online creature, Mastodon is open-source social networking software for microblogging. Instead of being a single network like Twitter, Mastodon is a federated platform that connects many independently run, interconnected servers, making it a fully decentralised system. It relies on the ActivityPub protocol and uses the ActivityStreams 2.0 data format with JSON-LD. Functionally, it closely resembles Twitter: you can read the timeline, post messages and interact with other users.

If you've just recently joined Mastodon and are still exploring it, you might find that scrolling the timeline only gets you so far in understanding everything that is happening there. Applying some engineering skills will give you a better overview of the topics and discussions happening on the platform.

Since Mastodon's timeline is nothing more than a stream of continuously arriving events, the data feed is well suited for Apache Kafka®. Adding Kafka connectors on top of that opens multiple possibilities to use the data for aggregations and visualisations.

Continue reading to learn how to bring data from Mastodon to Kafka using TypeScript and a couple of helpful libraries.

Prepare the Apache Kafka cluster

To bring the data from the Mastodon timeline to a topic in Apache Kafka, you'll need an Apache Kafka cluster and some code to stream the data there. For the former, you can use either your own cluster, or a managed version that runs in the cloud, such as Aiven for Apache Kafka. If you don't have an Aiven account yet, sign up for a free trial and create your cluster; the setup only takes a few minutes.

Once your cluster is running, add a topic with the name mastodon. Alternatively, you can use any other name; just remember it, as you'll need it a bit later.
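If you prefer the command line, you can also create the topic with the Aiven CLI. Here's a sketch, assuming you have the avn tool installed and logged in, with a placeholder service name:

avn service topic-create my-kafka-service mastodon --partitions 3 --replication 2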

To connect securely to the cluster we'll use SSL. Aiven already takes care of the configuration of the server, but you'll need to download three files for the client to establish the connection. Download these files from Aiven's console:

Screenshot of the Aiven for Apache Kafka page in Aiven's console showing where to take certificates and keys

You will need these files later, so put them somewhere safe for now.

Working with the Mastodon API

The Mastodon API has excellent documentation that makes it straightforward to access the public data feeds. You don't need to be registered to retrieve a public feed, which makes it very convenient. Actually, just give it a try right now. Run the line below in your terminal to start retrieving a stream of data from mastodon.social:

curl https://mastodon.social/api/v1/streaming/public

In response, you should see an endless flow of incoming events:

Running curl https://mastodon.social/api/v1/streaming/public in your terminal shows a response with event data
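The stream uses the server-sent events format: lines starting with a colon are heartbeats, and each new post arrives as an event line followed by a data line carrying the status as JSON. Roughly (abridged illustration, not verbatim output):

:thump
event: update
data: {"id":"…","created_at":"…","content":"<p>…</p>", …}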

Parsing the response from the server manually is a monotonous and tedious operation. Rather than reinvent the wheel, you can use one of the available libraries for Mastodon. For this example we'll be using masto.js.

Jump into the code

To get a head start bringing Mastodon data into an Apache Kafka cluster, clone this repository:

git clone https://github.com/aiven/mastodon-to-kafka

Once you have the contents of the repo locally, follow these steps:

  1. Create a folder certificates/ and put the SSL files you downloaded earlier into it. We'll need these to connect securely to Apache Kafka.
  2. Copy the file .env.example and rename it to .env; this file will hold the environment variables (a filled-in example follows this list).
  3. Set kafka.uri in .env to the address of your cluster. You can take it from the connection information of your Aiven for Apache Kafka service.
  4. Run npm install to install all dependencies (if you don't have npm or NodeJS yet, follow the installation instructions).
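
For reference, here's a sketch of what a filled-in .env could look like. The variable names match the ones producer.ts reads; the host, port and file names are placeholders to replace with your own values:

kafka.uri=your-kafka-service.aivencloud.com:12345
ssl.key.location=certificates/service.key
ssl.certificate.location=certificates/service.cert
ssl.ca.location=certificates/ca.pem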

Finally, run npm run start and you should see a flow of delivery reports for every new message coming from the Mastodon public feed that is defined in the code (in the next section you'll see how to change it to whichever Mastodon feed you like!).

Screenshot showing running code to send data to the Kafka topic

If things don't work the first time, check for error messages printed in the terminal; they will help you pinpoint the problem.

You can verify that the data is flowing and see what messages you get by enabling the Apache Kafka REST API.

In the contextual menu for the topic, select Apache Kafka REST:
Screenshot showing Apache Kafka REST menu option for a topic
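
If you'd rather check from the terminal, a consumer such as kcat can read the topic directly. A minimal sketch, assuming kcat is installed and reusing the same SSL files (adjust the broker address and file names to match your setup):

kcat -C -t mastodon \
  -b your-kafka-service.aivencloud.com:12345 \
  -X security.protocol=ssl \
  -X ssl.key.location=certificates/service.key \
  -X ssl.certificate.location=certificates/service.cert \
  -X ssl.ca.location=certificates/ca.pem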

Now let's step back and look at exactly how the code sends the data from Mastodon to Apache Kafka. The flow can be divided into two logical steps:

  1. Streaming messages from a public Mastodon timeline.
  2. Sending these messages to Apache Kafka.

In the sections below you can see these steps in detail.

Streaming messages from a public Mastodon timeline

Open the file mastostream.ts. It contains a small module to stream the Mastodon data.

To initialise the Mastodon client, call login() from the masto.js client library and provide the required configuration. This is also the place to provide authentication information; however, since we're only interested in public feeds, the url property is enough. As the URL, use your favourite Mastodon server.

const masto = await login({
    url: 'https://mastodon.social/', // choose your favourite mastodon server
});

With the initialised Mastodon client you connect to the public stream API by calling the asynchronous function masto.stream.streamPublicTimeline().

const stream = await masto.stream.streamPublicTimeline();

Finally, you're ready to subscribe to the updates from the public stream provided by the API.

stream.on('update', (status) => {
    console.log(status)
    // next - send status data to Apache Kafka topic
});

Now it's time to put these building blocks together.

For the sake of encapsulation, you wouldn't want the mastostream module to know directly about the Apache Kafka producer. That's why, when putting all the above ingredients together, we give the mastostream module a more generic callback argument. This callback function receives the Mastodon status message converted to a string, so the party that triggered mastostream gets the data and can act on it:

export default async (callback: (status: string) => void) => {
    try {
        const masto = await login({
            url: 'https://fosstodon.org/',
        });

        // Connect to the streaming api
        const stream = await masto.stream.streamPublicTimeline();

        // Subscribe to updates
        stream.on('update', (status) => {
            try {
                callback(JSON.stringify(status));
            } catch (err) {
                console.log('Callback failed', err);
            }
        });
    } catch (err) {
        console.log(err)
    }
};
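That's the whole module. As a quick sanity check, here's a minimal sketch of how it could be consumed on its own, before wiring in Kafka (the import path assumes the repository's file layout):

import mastostream from './mastostream';

// Print each incoming status to the console instead of producing to Kafka
mastostream((status: string) => {
    console.log(status);
}).catch((err) => console.error(err));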

This is all you need to stream the data from Mastodon! Time to send these messages to an Apache Kafka topic.

Sending messages to Apache Kafka using node-rdkafka

Open producer.ts to see the code you need to send the data to an Apache Kafka topic. To work with Apache Kafka you can use one of the existing client libraries; there are several options available. This project uses node-rdkafka, a NodeJS wrapper for the librdkafka C/C++ library. Check the README in its GitHub repository for installation steps.

With node-rdkafka you can create a producer to send data to the cluster. This is where you'll use the Apache Kafka configuration settings defined earlier in .env, together with the certificates you downloaded, to establish a secure connection.


//create a producer
const producer = new Kafka.Producer({
    'metadata.broker.list': process.env["kafka.uri"],
    'security.protocol': 'ssl',
    'ssl.key.location': process.env["ssl.key.location"],
    'ssl.certificate.location': process.env["ssl.certificate.location"],
    'ssl.ca.location': process.env["ssl.ca.location"],
    'dr_cb': true
});

The producer emits events when things happen, so to understand what is going on and to catch any errors, we subscribe to several events, including delivery reports.


producer.on('event.log', function (log) {
    console.log(log);
});

//logging all errors
producer.on('event.error', function (err) {
    console.error(err);
});

producer.on('connection.failure', function (err) {
    console.error(err);
});

producer.on('delivery-report', function (err, report) {
    if (err) {
        console.error('Delivery failed: ', err);
    } else {
        console.log('Message was delivered: ' + JSON.stringify(report));
    }
});

producer.on('disconnected', function (arg) {
    console.log('producer disconnected. ' + JSON.stringify(arg));
});

One last event, and an especially important one, is ready. It fires once the producer is ready to dispatch messages to the topic. The handler relies on the callback provided by the mastostream module that we implemented in the previous section:

producer.on('ready', async () => {
    mastostream((status) => { 
        producer.produce(
            'mastodon',  // topic to send the message to
            null,  // partition, null for librdkafka default partitioner
            Buffer.from(status),  // value
            null,  // optional key
            Date.now()  // optional timestamp
        );
        producer.flush(2000);
    }).catch((error) => {
        throw error;
    });
});

Yet, none of the above will work until you call the connect() method. With the snippet below in place, run your code and watch the data start to flow!

producer.connect({}, (err) => {
    if (err) {
        console.error(err);
    }
});

This method has an optional second parameter, which is a callback that you can use to be informed about any errors during the connection.

We've now seen all the code and examined how it works together. By separating the concerns of collecting data from Mastodon and passing it to Apache Kafka, we have a system that can also be adapted to handle different data sources as needed.

What's next

With the data constantly collected in the topic, you can now use it as an input for other tools and databases, such as OpenSearch®, ClickHouse® or PostgreSQL®. Apache Kafka® Connect connectors will help you bring the data into other systems with no code required. Learn more about Apache Kafka and Kafka Connect, and check the full list of sink connectors available on the Aiven platform to see where you can send the data for further storage and analysis.
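
As an illustration, here's roughly what a sink connector configuration could look like for draining the mastodon topic into OpenSearch. This is a sketch: the connector class matches Aiven's open-source OpenSearch connector, but the connection values are placeholders and exact property names can vary between connector versions, so check the connector documentation before using it:

{
    "name": "mastodon-opensearch-sink",
    "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
    "topics": "mastodon",
    "connection.url": "https://your-opensearch-host:443",
    "connection.username": "avnadmin",
    "connection.password": "your-password",
    "key.ignore": "true",
    "schema.ignore": "true"
}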
