Chen Debra

Driving Flink to Consume Kafka Data with Apache DolphinScheduler on Linux

Apache DolphinScheduler is already deployed on a virtual machine.
The next step is to create a Flink task node in DolphinScheduler and use Flink to consume data from Kafka.

Apache DolphinScheduler is deployed in standalone mode.
For detailed installation steps, please refer to the official documentation:
DolphinScheduler | Documentation Center
https://dolphinscheduler.apache.org/zh-cn/docs/3.3.2/guide/installation/standalone

Prerequisites

The following components have already been installed:

  • Java 11
  • Apache DolphinScheduler 3.3.2
  • Apache Flink 1.18.1
  • Apache Kafka 3.6.0
  • ZooKeeper (the version bundled with Kafka)

It is recommended to download the binary packages and install them directly on the virtual machine. Installing through a system package manager may pull in dependencies and versions that are hard to control.

The downloaded binary packages are shown below:

Configure Flink Environment Variables

1. Edit environment variables

vim ~/.bashrc

Add the Flink installation path:
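A minimal sketch of the lines to append to ~/.bashrc, assuming Flink was unpacked to /data/flink-1.18.1. That path is only an illustration; use the directory where your Flink distribution actually lives.

```shell
# Assumed install location; replace it with the directory where you unpacked Flink.
export FLINK_HOME=/data/flink-1.18.1
# Put the Flink scripts (flink, start-cluster.sh, ...) on the PATH.
export PATH="$FLINK_HOME/bin:$PATH"
```

With FLINK_HOME exported this way, the flink command can be run from any directory after the shell configuration is reloaded.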

2. Apply the configuration

# Reload environment variables
source ~/.bashrc

# Verify Flink environment variable
echo $FLINK_HOME

Modify Kafka, Flink, and DolphinScheduler Configuration Files

Since this setup runs inside a virtual machine, configuration changes are required so that services can be accessed from the host machine.

1. Modify Kafka configuration

Navigate to the config directory under the Kafka installation path and edit server.properties.

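The listeners and advertised.listeners settings must be changed so that Kafka can be reached from outside the VM. If advertised.listeners is left unset, the broker advertises an address derived from its own hostname, which clients on the host machine often cannot resolve or which points back to localhost, so connections fail.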

broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
# Replace with your VM IP address
advertised.listeners=PLAINTEXT://192.168.146.132:9092

2. Modify Flink configuration

Go to the conf directory under the Flink installation path and edit flink-conf.yaml.

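Replace the localhost addresses with 0.0.0.0 so that the Flink Web UI can be reached from outside the VM. Note that the interface the Web UI listens on is controlled by rest.bind-address, which also defaults to localhost in the stock configuration.
Additionally, adjust the JobManager and TaskManager memory settings to fit the VM.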

jobmanager.rpc.address: 0.0.0.0
jobmanager.bind-host: 0.0.0.0
jobmanager.cpu.cores: 1
jobmanager.memory.process.size: 1600m

taskmanager.bind-host: 0.0.0.0
taskmanager.host: 0.0.0.0
taskmanager.memory.process.size: 2048m
taskmanager.cpu.cores: 1


3. Modify Apache DolphinScheduler configuration

From the dolphinscheduler-daemon.sh startup script, we can see that DolphinScheduler loads environment variables from:

bin/env/dolphinscheduler_env.sh

View dolphinscheduler-daemon.sh:

Edit dolphinscheduler_env.sh and add Java and Flink paths:

# Replace with your actual Java and Flink paths
export JAVA_HOME=/data/jdk-11.0.29
export FLINK_HOME=/data/flink-1.18.1

Disable Firewall and Start Services

Start all required services, including ZooKeeper, Kafka, Flink, and Apache DolphinScheduler.

# Stop the firewall (it starts again on reboot unless it is also disabled)
sudo systemctl stop firewalld

# Start the Flink cluster (run from the Flink installation directory)
bin/start-cluster.sh

# Start ZooKeeper (run this and the Kafka commands below from the Kafka installation directory)
bin/zookeeper-server-start.sh config/zookeeper.properties &

# Start the Kafka broker
bin/kafka-server-start.sh config/server.properties &

# Create the Kafka topic
bin/kafka-topics.sh --create --topic test \
  --bootstrap-server localhost:9092 \
  --partitions 1 --replication-factor 1

# Produce messages (interactive: type one message per line)
bin/kafka-console-producer.sh --topic test \
  --bootstrap-server localhost:9092

# Consume messages (run in a second terminal to confirm they arrive)
bin/kafka-console-consumer.sh --topic test \
  --from-beginning --bootstrap-server localhost:9092

# Start the DolphinScheduler standalone server (run from the DolphinScheduler installation directory)
bash ./bin/dolphinscheduler-daemon.sh start standalone-server

Verification

1. Verify Flink Web UI

Access the Flink dashboard at:

http://<VM-IP>:8081

2. Verify DolphinScheduler Web UI

Access DolphinScheduler at:

http://<VM-IP>:12345/dolphinscheduler/ui

Default credentials:

  • Username: admin
  • Password: dolphinscheduler123
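
If either page fails to load, it can help to check from the host machine whether the ports are reachable at all. The sketch below is only an illustration: check_port is a helper introduced here (not part of any of the tools), the IP address is the example VM address used in this article, and it assumes bash (for /dev/tcp) and the coreutils timeout command are available on the host.

```shell
# Example VM address from this article; override it with your own.
VM_IP="${VM_IP:-192.168.146.132}"

# check_port HOST PORT: succeeds if a TCP connection opens within 2 seconds.
# Hypothetical helper; relies on bash's /dev/tcp redirection and `timeout`.
check_port() {
  timeout 2 bash -c "exec 3<>/dev/tcp/$1/$2" 2>/dev/null
}

# 9092 = Kafka broker, 8081 = Flink Web UI, 12345 = DolphinScheduler UI
for port in 9092 8081 12345; do
  if check_port "$VM_IP" "$port"; then
    echo "port $port reachable"
  else
    echo "port $port NOT reachable"
  fi
done
```

A port reported as not reachable usually points at the firewall, a service that is not running, or a bind address that is still localhost.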

Sample Implementation

This example shows a Flink job that consumes data from Kafka, and how to package it, upload it to DolphinScheduler, and run it as a Flink task.

1. Sample Code

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>flink-kafka-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.18.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kafka.version>3.6.0</kafka.version>
    </properties>

    <dependencies>
        <!-- Flink core dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Connector base dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Kafka connector (versioned separately from Flink since 1.17) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>3.1.0-1.18</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <!-- Logging Dependency -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.36</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>https://maven.aliyun.com/repository/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>apache-releases</id>
            <url>https://repository.apache.org/content/repositories/releases/</url>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

FlinkKafkaConsumerExample.java

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;
import java.util.concurrent.CompletableFuture;

public class FlinkKafkaConsumerExample {
    // Shared between the sink and main(); this only works when both run in the same JVM.
    private static volatile int messageCount = 0;
    private static volatile boolean shouldStop = false;

    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka configuration
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.146.132:9092"); // Kafka broker address
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // Consumer group
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Create the Kafka consumer (FlinkKafkaConsumer is deprecated in favor of KafkaSource,
        // but is still shipped in flink-connector-kafka 3.1.0-1.18)
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        kafkaConsumer.setStartFromEarliest(); // Consume from the earliest messages
        DataStream<String> stream = env.addSource(kafkaConsumer);

        // Process data: split each message into words and keep a running count per word
        DataStream<Tuple2<String, Integer>> counts = stream
                .flatMap(new Tokenizer())
                .keyBy(value -> value.f0)
                .sum(1);

        // Print every (word, count) update and raise the stop flag after two records
        counts.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {
            @Override
            public void invoke(Tuple2<String, Integer> value, Context context) {
                System.out.println(value);
                messageCount++;

                // Check whether the stop condition is met
                if (messageCount >= 2 && !shouldStop) {
                    System.out.println("Processed 2 records, stopping job.");
                    shouldStop = true; // Signal main() that the job should stop
                }
            }
        });

        // Submit the job asynchronously and cancel it once the stop flag is set
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                // Start the job and obtain the JobClient
                JobClient jobClient = env.executeAsync("Flink Kafka WordCount");
                System.out.println("Job ID: " + jobClient.getJobID());

                // Poll the stop flag every 100 milliseconds
                while (!shouldStop) {
                    Thread.sleep(100);
                }

                // Cancel the job when the stop condition is met
                System.out.println("Cancelling the job...");
                jobClient.cancel().get();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        // Block the main thread until the job has been cancelled
        future.join();
    }

    // Tokenizer: splits an input line into lowercase words and emits (word, 1) pairs
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }

}

2. Package and Upload to DolphinScheduler
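A rough sketch of the packaging step, assuming Maven is installed and the command is run from the project root. ARTIFACT_ID and VERSION are assumptions that must match the artifactId and version in your pom.xml. Because the shade plugin is bound to the package phase, the jar under target/ should already contain the Kafka connector classes.

```shell
# Assumptions: these must match <artifactId> and <version> in your pom.xml.
ARTIFACT_ID="flink-kafka-demo"
VERSION="1.0-SNAPSHOT"
JAR="target/${ARTIFACT_ID}-${VERSION}.jar"

# Build only when run from a project root with Maven available.
if [ -f pom.xml ] && command -v mvn >/dev/null 2>&1; then
  # The shade plugin runs during `package` and replaces the main jar with the shaded one.
  mvn clean package -DskipTests
  ls -lh "$JAR"
else
  echo "pom.xml or mvn not found; run this from the project root with Maven installed"
fi

echo "Jar to upload: $JAR"
```

The resulting jar is the file to upload to DolphinScheduler, typically through the Resource Center in the web UI.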

3. Create and Run a Flink Task Node
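Before wiring the jar into a task node, it can be worth confirming that it runs when submitted by hand, since the Flink task node appears to invoke the same flink command using the FLINK_HOME configured in dolphinscheduler_env.sh. The sketch below assumes a running standalone Flink cluster. The jar path and the FLINK_HOME fallback are hypothetical values for illustration.

```shell
# Assumptions: FLINK_HOME falls back to an illustrative path, and JAR points at
# the shaded jar built from the sample project (hypothetical location).
FLINK_HOME="${FLINK_HOME:-/data/flink-1.18.1}"
MAIN_CLASS="FlinkKafkaConsumerExample"
JAR="${JAR:-target/flink-kafka-demo-1.0-SNAPSHOT.jar}"

if [ -x "$FLINK_HOME/bin/flink" ] && [ -f "$JAR" ]; then
  # -c names the entry class; the job goes to the cluster started by start-cluster.sh.
  "$FLINK_HOME/bin/flink" run -c "$MAIN_CLASS" "$JAR"
else
  echo "flink CLI or jar not found; check FLINK_HOME and JAR"
fi
```

The sample class has no package declaration, so the main class to enter in the task node is simply FlinkKafkaConsumerExample.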

Kafka Producer and Flink Output

Start the Kafka producer in the virtual machine and send messages.
Flink successfully consumes and processes the Kafka data.
