
Alessio Marinelli


OpenWhisk Pulsar Spark Integration, Part 1

There are several alternatives to Kafka that could be considered to improve the resilience and manageability of Spark jobs launched by OpenWhisk.

Below is an overview of how they differ and how each of the proposed products performs in this role.

The products offered as intermediaries between OpenWhisk and Spark are the following:

· Apache Pulsar;

· Apache Flink;

· RabbitMQ;

Currently, there is a Kafka provider for Apache OpenWhisk; this feasibility study aims to find a viable alternative among the other products.

Apache Kafka

Strength: It excels at processing real-time data streams and handling large volumes of data.

Usage: It acts as a buffer for data between OpenWhisk and Spark, allowing Spark to process data in a way that is fault-resilient and scalable.

Ideal scenarios: Best for situations where efficient and scalable management of data flows is essential.

Apache Pulsar

It offers messaging and data streaming capabilities, but with some additional features such as native support for multi-tenancy and a sharper separation between storage and computing.

  1. Strength: It offers similar functionality to Kafka but with a more modern architecture that separates storage from compute, making it easier to scale.

  2. Usage: Like Kafka, Pulsar can be used to capture and store data from OpenWhisk before it is processed by Spark. It’s also suitable for multi-tenant use cases.

  3. Ideal scenarios: Great for environments that require efficient scalability and more granular data management.

Apache Flink

Flink is more of a stream processing engine than a messaging system, but it can be used in conjunction with OpenWhisk and Spark to improve real-time data management and resiliency.

It is particularly strong in processing complex data streams and can maintain a consistent state even in the event of failures.

In summary, while Kafka and Pulsar are more focused on messaging and data buffering, Flink offers more advanced stream processing capabilities.

The choice depends on the type of data processing you need to perform between OpenWhisk and Spark and the level of complexity your system needs to handle.

  1. Strength: It is not just a messaging system but a complete stream processing engine that can handle complex data processing in real time.

  2. Usage: Flink can be used to directly process streaming data from OpenWhisk before passing it to Spark for further batch processing or analysis. It gives you finer control over processing state and logic.

  3. Ideal scenarios: Ideal for applications that require complex data stream processing and real-time state management.

RabbitMQ

RabbitMQ is another messaging system, oriented more towards traditional message queuing.

It’s less suitable for processing real-time data streams than Kafka, but it can be a good choice for scenarios where reliable message delivery is the priority.

Architectural Approaches for Intermediation Between OpenWhisk and Spark

To use Kafka, Pulsar, or Flink as intermediaries in an architecture that includes OpenWhisk and Spark, there are two main approaches:

  1. Create a custom package (similar to the one that exists for Kafka in OpenWhisk)

  2. Direct use

Here’s how both approaches work:

  1. Create a Custom Package:

In this approach, you create an OpenWhisk-specific package that integrates the intermediary service (Kafka, Pulsar or Flink) as a native part of the OpenWhisk ecosystem.

This requires more upfront work, as you will have to write code to handle the interaction between OpenWhisk and the intermediary, but it offers greater integration and ease of use in the long run.

A custom package is useful if you plan to reuse this integration across many projects, or if you have specific needs that require closer interaction between OpenWhisk and the intermediary.

  2. Direct use:

Here, you use Kafka, Pulsar or Flink independently, without creating a specific package for OpenWhisk.

In this scenario, OpenWhisk handles its actions and triggers as usual, and interaction with the intermediary takes place through standard network connections, APIs, or messaging protocols.

This approach is faster to set up and doesn’t require additional development, but it may be less integrated and may require more manual management.

The choice between these two approaches depends on how often you plan to use the intermediary with OpenWhisk, the complexity of your needs, and the resources available for development.

If you plan to use it frequently and have specific needs, developing a custom package may be the best choice.

If, on the other hand, you need an easier and faster solution, direct use may be adequate.

How to write a package with Pulsar that can act as an intermediary between OpenWhisk and Spark

Writing a package to integrate Apache Pulsar with OpenWhisk is a project that requires an understanding of both OpenWhisk and Pulsar, as well as programming skills.

Here are the general steps you should follow:

  1. API knowledge: Familiarize yourself with the OpenWhisk and Apache Pulsar APIs. This will help you understand how the two platforms interact.

  2. Setting up the development environment: Set up a development environment with OpenWhisk and Apache Pulsar installed. You might need to install additional software such as Docker if you’re working locally. Make sure you have access to a Pulsar environment to test the integration.

  3. Package definition: Identify the specific Pulsar features that you want to expose through the package. This could include publishing messages, subscribing to topics, etc. Decide how to manage configuration, such as login credentials and connection parameters.

  4. Develop the code for package components, which can include actions, triggers, and rules in OpenWhisk.

  5. Write the necessary code to interface OpenWhisk with Pulsar. This could include creating Pulsar clients, managing message publishing and subscribing, and handling errors.

  6. Test the package in different scenarios to make sure it works as expected.

  7. Make sure that the package handles errors and abnormal situations correctly.

  8. Write clear documentation for your package. This should include instructions on how to install and use it, along with examples.

  9. Deployment and maintenance.
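The configuration step above could be handled by resolving connection settings from the action’s parameters and falling back to local defaults. The following is only a minimal sketch: the parameter names (`pulsar_service_url`, `pulsar_topic`, `pulsar_token`), the `PulsarSettings` class and the `load_settings` helper are hypothetical and not part of any OpenWhisk or Pulsar API.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class PulsarSettings:
    """Connection settings for the (hypothetical) Pulsar package."""
    service_url: str
    topic: str
    token: Optional[str] = None  # optional credential; never log this value


def load_settings(args: dict) -> PulsarSettings:
    """Build settings from action parameters, using local defaults if absent."""
    # Parameter names below are assumptions made for this sketch.
    service_url = args.get("pulsar_service_url", "pulsar://localhost:6650")
    topic = args.get("pulsar_topic", "persistent://public/default/my-topic")
    if not service_url.startswith(("pulsar://", "pulsar+ssl://")):
        raise ValueError(f"unsupported Pulsar service URL: {service_url}")
    return PulsarSettings(service_url, topic, args.get("pulsar_token"))
```

Since OpenWhisk lets you bind default parameters to a package or an action, values such as the service URL can arrive in `args` without every caller having to pass them.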

High-level overview.

Writing a package is a very complex project. It requires a thorough understanding of all the technologies covered and the ability to write robust and secure code.

Preparing the Environment

  1. Installing Apache OpenWhisk:

Make sure you have a working version of OpenWhisk. If you’re working locally, you may need to install Docker and run OpenWhisk via containers.

  2. Installing Apache Pulsar:

Set up an instance of Apache Pulsar. You can install it locally or use a cloud service.

  3. Installing Apache Spark:

Set up an instance of Apache Spark. You can install it locally or use a cloud service.

  4. Setting Up a Development Environment:

Configure your development environment with the necessary languages and tools (e.g., Java or Python, IDEs, version control systems).

Package Design

  1. Defining Features:

Decide which Pulsar features you want to expose. This could include creating topics, publishing messages, and subscribing to topics.

  2. Structuring the Package:

Plan how to structure the package.

Normally, an OpenWhisk package includes actions, triggers, and rules.

Development

  1. Code for the Pulsar Interface:

Develop code to interact with the Pulsar API.

For example, to post a message on a topic.

  2. OpenWhisk Integration:

Write code to integrate these features with OpenWhisk. This could include actions that trigger events in Pulsar.

Error Management and Security: Make sure your package handles errors correctly and implements standard security practices.
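As an illustration of the error-management point, the sketch below validates the input and converts failures into a result with a top-level `error` key, which OpenWhisk treats as a failed activation. The `run_action` helper and its `publish` callback are hypothetical; `publish` stands in for whatever sends the bytes to Pulsar (for example a producer’s `send` method), which also makes the logic testable without a broker.

```python
from typing import Callable


def run_action(args: dict, publish: Callable[[bytes], None]) -> dict:
    """Publish args["data"] and report failures as an OpenWhisk error result."""
    data = args.get("data")
    if not isinstance(data, str) or not data:
        return {"error": "parameter 'data' must be a non-empty string"}
    try:
        publish(data.encode("utf-8"))
    except Exception as exc:  # broad on purpose: any client failure is reported
        # Only the exception type is returned, to avoid leaking connection details.
        return {"error": f"publish failed: {type(exc).__name__}"}
    return {"result": "Data sent to Pulsar"}
```

Returning only the exception type is a deliberate choice in this sketch: activation results are often visible to callers, so connection strings or credentials should not end up in them.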

Testing and Debugging

  1. Test the Package:

Test the package in different scenarios to make sure it works as expected.

Debugging: Fix any issues or bugs that emerge from testing.

Documentation & Distribution

  1. Document the Package:

Write clear documentation that includes instructions on how to install and use the package.

  2. Deploy the Package:

Share the package with the community or deploy it in a production environment.

Basic Implementation

Project Structure

OpenWhisk Action: This will be the entry point.

The OpenWhisk action will take the input data and send it to Pulsar.

Apache Pulsar: Will act as a messaging system to transfer data from the OpenWhisk action to the Spark job.

Spark Job: Will process the data received from Pulsar.

OpenWhisk Action development

Language: Choose a language that OpenWhisk supports, such as Python or Java.

Pulsar Interface: The action must include a Pulsar client to send messages to the Pulsar topic.

Python Example: OpenWhisk Action Sending Data to Pulsar

from pulsar import Client

def main(args):
    # Connect to the Pulsar broker and create a producer for the topic
    pulsar_client = Client('pulsar://localhost:6650')
    producer = pulsar_client.create_producer('my-topic')

    # Assume that 'data' is passed as part of 'args'
    data = args.get("data", "default data")
    producer.send(data.encode('utf-8'))

    pulsar_client.close()
    return {"result": "Data sent to Pulsar"}

Apache Pulsar Configuration

Make sure Pulsar is configured to receive and store messages from the specified topic.

Configure how long messages are retained to suit your needs.

Spark Job Development

Write a Spark job that can read from the Pulsar topic.

The Spark job should be able to process the data and, if necessary, produce an output.


Python Example: Spark Job to Read from Pulsar

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PulsarSpark").getOrCreate()

# Configure the DataFrame to read data from Pulsar
df = (
    spark.read.format("pulsar")
    .option("service.url", "pulsar://localhost:6650")
    .option("admin.url", "http://localhost:8080")
    .option("topic", "persistent://public/default/my-topic")
    .load()
)

# Data processing goes here

spark.stop()

Testing & Integration

OpenWhisk Action Test: Verify that the OpenWhisk action can successfully send data to Pulsar.

Spark Job Test: Make sure the Spark job is able to read and process data from Pulsar.

Integration: Test the entire flow from OpenWhisk to Pulsar to Spark.

Deploy & Monitor

Deploy all components in production.

Monitor the application to make sure it’s working as expected and to handle any issues.

Final Thoughts

Error Handling: Make sure your code handles errors correctly in each step.

Security: Implement appropriate security measures, such as authentication for Pulsar and Spark.

Detailed Implementation

OpenWhisk Actions

First, let’s create an OpenWhisk action that sends data to a Pulsar topic.

Make sure you have the Pulsar client for Python installed.

writeOnPulsar.py

from pulsar import Client

def main(args):
    # Connect to the Pulsar broker
    pulsar_client = Client('pulsar://localhost:6650')
    try:
        producer = pulsar_client.create_producer('my-topic')

        # 'data' can be passed as part of 'args'
        data = args.get("data", "default data")
        producer.send(data.encode('utf-8'))
    finally:
        # Release the connection even if sending fails
        pulsar_client.close()

    return {"result": "Data sent to Pulsar"}

To perform this action in OpenWhisk, you’ll need to package and deploy it to your OpenWhisk instance.
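The action above sends a bare string. If the Spark job needs structured records, one option is to wrap the input in a small JSON envelope before calling `producer.send`. This is only a sketch: the `encode_payload` helper and the `sent_at` field are hypothetical additions, not something the action requires.

```python
import json
from datetime import datetime, timezone


def encode_payload(args: dict) -> bytes:
    """Serialize the action input as a UTF-8 JSON envelope for producer.send()."""
    envelope = {
        "data": args.get("data", "default data"),
        # Timestamp added by the action so consumers can estimate latency.
        "sent_at": datetime.now(timezone.utc).isoformat(),
    }
    # Compact separators keep the message small.
    return json.dumps(envelope, separators=(",", ":")).encode("utf-8")
```

Inside the action, `producer.send(encode_payload(args))` would then replace the plain `encode('utf-8')` call.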

Spark Job

Now, let’s write a simple Spark job that reads data from the Pulsar topic. Make sure you’ve set up Spark with Pulsar support.

spark_pulsar_job.py

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PulsarSpark").getOrCreate()

# Configure the DataFrame to read data from Pulsar
df = spark.read.format("pulsar") \
    .option("service.url", "pulsar://localhost:6650") \
    .option("admin.url", "http://localhost:8080") \
    .option("topic", "persistent://public/default/my-topic") \
    .load()

# Here you can process the data as needed
df.show()

spark.stop()

This code must be run in a Spark environment.

Testing & Deployment

Once you’ve written these scripts, you should test them locally or in a development environment to ensure that they work as expected.

You can start the OpenWhisk action and then run the Spark job to see if the data is being transmitted correctly.

To use the OpenWhisk action in a real-world environment, you’ll need to package it in a way that’s manageable and deployable on OpenWhisk.

Create a Virtual Environment (Optional but Recommended)

First of all, it’s a good practice to create a Python virtual environment to manage dependencies.

python -m venv myenv

source myenv/bin/activate  # on Windows use myenv\Scripts\activate

Installing Dependencies

Install the necessary dependencies, in this case the Pulsar client for Python.

pip install 'pulsar-client==3.4.0'

# optional: Avro serialization
pip install 'pulsar-client[avro]==3.4.0'

# optional: functions runtime
pip install 'pulsar-client[functions]==3.4.0'

# optional: all optional components
pip install 'pulsar-client[all]==3.4.0'

Create the Requirements File

Create a requirements.txt file that lists all dependencies.

This is important to ensure that the OpenWhisk environment has all the necessary libraries.

pulsar-client==3.4.0  # Replace with the specific version you’re using

pip install -r requirements.txt

Writing the Action Code

Write the action code (like the writeOnPulsar.py example shown earlier, saved here as openwhisk_pulsar_action.py) and make sure it’s working and tested.

Package the Action

OpenWhisk allows you to package Python actions into a zip file that includes the code and dependencies.

Here’s how:

zip -r my_action.zip openwhisk_pulsar_action.py myenv

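This command creates a zip archive containing your script and virtual environment. Note that the OpenWhisk Python runtime documentation expects the entry file inside the archive to be named __main__.py and the environment directory to be named virtualenv, so you may need to rename both before zipping; check the documentation for the runtime version you use.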
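If you prefer to build the archive from a script, the standard library is enough for the source file. The sketch below stores the action under the name `__main__.py`, the entry-file name described in the OpenWhisk Python runtime documentation. It deliberately leaves out bundling the virtual environment, and `build_action_zip` is a hypothetical helper.

```python
import zipfile
from pathlib import Path


def build_action_zip(source: Path, archive: Path) -> Path:
    """Write `source` into `archive` under the name __main__.py."""
    with zipfile.ZipFile(archive, "w", zipfile.ZIP_DEFLATED) as zf:
        # arcname renames the file inside the archive only;
        # the file on disk keeps its original name.
        zf.write(source, arcname="__main__.py")
    return archive
```

For example, `build_action_zip(Path("openwhisk_pulsar_action.py"), Path("my_action.zip"))` would produce an archive whose only entry is `__main__.py`.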

Deploying Action on OpenWhisk

You can now use the wsk command to deploy the action.

wsk action create myPulsarAction my_action.zip --kind python:3.7 --main main

This command creates an action on OpenWhisk called myPulsarAction using your zip file.

Test the action

Once you’ve deployed your action, it’s a good practice to test it to make sure it works as expected in your OpenWhisk environment.

wsk action invoke myPulsarAction --result --param data "test data"

This invokes the action with a test parameter.

Once you have zipped your OpenWhisk package, you deploy it with the wsk command of the OpenWhisk Command Line Interface (CLI).

Here are the detailed steps:

· Install and configure the OpenWhisk CLI.

· Log in to your OpenWhisk instance.

· Creating the Action from a Zip Package

· Navigate to the Zip File Directory:

· Make sure you’re in the same directory as your zip file.

· For example, if your file is called my_action.zip, navigate to the directory that contains it.

Run the following command to create the action in OpenWhisk:

wsk action create nomeAzione my_action.zip --kind python:3.7 --main nomeFunzionePrincipale

Where:

nomeAzione is the name you want to give to the action in OpenWhisk.

my_action.zip is the name of your zip file.

--kind python:3.7 specifies the Python runtime you are using. Replace it with the appropriate version if necessary.

--main nomeFunzionePrincipale specifies the name of the main function in your Python code.

For example, if your function is called main, you use --main main.
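When the deployment is scripted, it can help to build the command as an argument list rather than a hand-typed string, which avoids problems such as the two ASCII hyphens of `--kind` and `--main` being replaced by a typographic dash. The `wsk_create_command` helper below is hypothetical; it only assembles the arguments described above and does not call the CLI.

```python
import shlex


def wsk_create_command(action: str, archive: str,
                       kind: str = "python:3.7", main: str = "main") -> list:
    """Return the argv list for `wsk action create` with double-hyphen flags."""
    return ["wsk", "action", "create", action, archive,
            "--kind", kind, "--main", main]


# Show the command as it would be typed in a shell.
print(shlex.join(wsk_create_command("myPulsarAction", "my_action.zip")))
```

The returned list can be passed to `subprocess.run(..., check=True)` on a machine where the wsk CLI is installed and configured.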

Practical Example

Let’s say you have a my_action.zip file and inside it is a Python script with a main function.

The command becomes:

wsk action create myPulsarAction my_action.zip --kind python:3.7 --main main

This command tells OpenWhisk to create an action called myPulsarAction using the my_action.zip file, with the Python 3.7 environment and the main function as the entry point.

Once you’ve deployed your action, you can test it with:

wsk action invoke myPulsarAction --result --param data "test data"

This invokes the myPulsarAction action with a test parameter and returns the result.

Remember that these commands assume that you have configured your OpenWhisk CLI correctly and that you have the necessary permissions to create actions on your OpenWhisk instance. If you encounter any issues during the deployment process, it may be helpful to check the OpenWhisk documentation or seek assistance in the OpenWhisk community.

Picking up the OpenWhisk action’s output in Spark by reading from the Pulsar queue

  1. OpenWhisk Action Publishes Data on Pulsar

When the OpenWhisk action is invoked, it publishes data to a specific topic in Pulsar. This step has already been configured in the OpenWhisk action code we discussed.

  2. Configuring Spark to Read from Pulsar

To have Spark read data from a Pulsar topic, you need to set up a Spark job that uses the Pulsar-Spark Connector library.

Here’s a basic example:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkPulsarJob").getOrCreate()

# Configure the read from a Pulsar topic
df = spark.read.format("pulsar") \
    .option("service.url", "pulsar://localhost:6650") \
    .option("admin.url", "http://localhost:8080") \
    .option("topic", "persistent://public/default/my-topic") \
    .load()

# Data processing; for example, print the data for testing
df.show()

spark.stop()

This Spark script connects to the specified Pulsar topic and reads the data that is published to it.

  3. Running the Spark Job

Once you’ve configured your Spark job to read from Pulsar, you can run it in a Spark environment. When the OpenWhisk action is invoked and publishes the data to Pulsar, the Spark job will read and process it as scheduled.

  4. Synchronization and Orchestration

Make sure there is proper synchronization between invoking the OpenWhisk action and running the Spark job.

You may need to orchestrate your workflow to ensure that the Spark job is running and ready to read data when the OpenWhisk action is invoked.

Final Thoughts

Error Handling: Make sure your Spark job handles any errors reading or connecting to Pulsar correctly.

Performance and Scalability: Monitor performance and scale resources as needed, depending on the volume of data and frequency of operations.

Synchronizing Between Writing to Pulsar and Reading from Spark

Synchronizing between writing to Apache Pulsar by an OpenWhisk action and reading by a Spark job can be a challenge, especially in a distributed, scalable environment. Here are some strategies you can use:

  1. Using Persistent Topic in Pulsar

Make sure Pulsar uses persistent topics. Persistent topics store messages durably until the subscriptions that consume them have acknowledged them, reducing the chance of the Spark job losing messages.

  2. Windowing and Buffering

Configure your Spark job to use a time window or buffering. This approach allows Spark to read data from Pulsar in batches, reducing the risk of missing messages that could be published while the Spark job is not running.

  3. Checking the Status of the Spark Job

Implement logic that checks to see if the Spark job is running before invoking the OpenWhisk action. This can be accomplished through scripts or by using workflow orchestration systems.

  4. Workflow Orchestration

Use workflow orchestration tools such as Apache Airflow to coordinate when the OpenWhisk action is invoked and when the Spark job is executed. This allows you to have more precise control over when and how data is processed.

  5. Continuous Polling in Spark

Configure the Spark job to continuously poll the Pulsar topic.

This means that Spark will keep checking the topic for new data, reducing latency between writing and reading data.

  6. Exponential Backoff and Reconnections

In case of connection failures or network issues, implement an exponential backoff strategy in your Spark job.

This helps to manage times when Pulsar is unavailable or there are network issues.

  7. Monitoring & Alarms

Put in place a monitoring and alarm system to be alerted when there are problems in the flow of data between OpenWhisk, Pulsar and Spark. This allows you to intervene quickly in case of misalignments or errors.

  8. State Tracking

Consider keeping a record of the state of your messages (for example, in a database or queue system) to keep track of which messages have already been processed by Spark.
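The exponential backoff strategy from the list above can be sketched with the standard library alone. This is an illustration under assumptions: `retry_with_backoff` is a hypothetical helper, and it assumes connection problems surface as `ConnectionError`, whereas a real Pulsar or Spark client may raise different exception types.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(operation: Callable[[], T], max_attempts: int = 5,
                       base_delay: float = 0.5, max_delay: float = 30.0,
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Call `operation` until it succeeds, doubling the wait cap after each failure."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Full jitter spreads retries so clients do not reconnect in lockstep.
            sleep(random.uniform(0, delay))
    raise ValueError("max_attempts must be at least 1")
```

The `sleep` parameter exists only so the waiting can be replaced in tests; in normal use the default `time.sleep` applies.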

A very effective technique for synchronizing writing to Apache Pulsar from OpenWhisk and reading from Apache Spark is using persistent topics in Pulsar with continuous polling in Spark. This approach offers a good combination of reliability and simplicity. Here’s an example of how you could implement it:

Configuring the Persistent Topic in Pulsar

Make sure your topic in Pulsar is set up as persistent. Usually, topics in Pulsar are persistent by default. A persistent topic might have a name similar to:

persistent://public/default/my-topic.

  1. OpenWhisk Script to Publish on Pulsar

The OpenWhisk action publishes the data on the Pulsar topic.

Here there are no changes from the script provided earlier.

  2. Spark Job with Continuous Polling

The Spark job will continuously poll the Pulsar topic.

Configure the Spark job to read data continuously from Pulsar.

from pyspark.sql import SparkSession

def main():
    spark = SparkSession.builder.appName("SparkPulsarJob").getOrCreate()

    # Configuration to read continuously from a Pulsar topic
    df = spark.readStream.format("pulsar") \
        .option("service.url", "pulsar://localhost:6650") \
        .option("admin.url", "http://localhost:8080") \
        .option("topic", "persistent://public/default/my-topic") \
        .load()

    # Here you can define the data processing
    query = df.writeStream \
        .outputMode("append") \
        .format("console") \
        .start()

    # Block until the streaming query is stopped or fails
    query.awaitTermination()

if __name__ == "__main__":
    main()

This script configures Spark to stream from the Pulsar topic.

It uses readStream for continuous polling and writeStream to process the received data.
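The processing step usually starts by decoding the message payload, which reaches Spark as raw bytes. The function below is a plain-Python sketch of that decoding. It assumes, as a convention for this example only, that messages are either plain UTF-8 strings or JSON objects with a `data` field; `decode_value` is a hypothetical helper.

```python
import json
from typing import Optional


def decode_value(raw: Optional[bytes]) -> Optional[str]:
    """Return the `data` field of a JSON payload, or the raw text if not JSON."""
    if raw is None:
        return None
    # bytes() also accepts a bytearray, which is how binary values
    # typically arrive in Python code running under Spark.
    text = bytes(raw).decode("utf-8", errors="replace")
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return text  # plain-string payloads pass through unchanged
    return parsed.get("data") if isinstance(parsed, dict) else text
```

In a Spark job, a function like this could be registered with `pyspark.sql.functions.udf` and applied to the payload column; the name of that column depends on the connector version you use.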

  3. Execution & Monitoring

Run the Spark job and monitor it to make sure it continuously reads data from Pulsar. Whenever the OpenWhisk action is invoked and publishes data to Pulsar, the Spark job will detect it in near real-time.

Final Thoughts

Reliability: This approach is reliable because, thanks to topic persistence, data is not lost.

Simplicity: The logic is quite simple and straightforward.

Performance: Monitor performance and scale resources as needed.

Please note that this is only a hypothetical implementation. You will have to adapt it to your specific Pulsar and Spark configuration and to your data processing needs.
