DEV Community

Apache SeaTunnel
Apache SeaTunnel

Posted on

How to Add DingTalk Notifications to Apache SeaTunnel with a Custom Event Listener

Overview

Background

Use SeaTunnel to execute data synchronization tasks.
Deployment reference: Deploy Apache SeaTunnel Services.

Problem

  • Notifications need to be sent to DingTalk when tasks fail or when other critical events occur.
  • SeaTunnel itself does not provide built-in message notification capabilities and usually relies on DolphinScheduler or other external tools.

Solution

  • Use the event listener feature provided by SeaTunnel.
  • Develop a custom plugin to capture failure events and send notification messages.
  • Configure the group robot parameters through command-line submission.

Deployment

  • If you don’t want to write and package code and only need failure notifications, you can skip the plugin development steps and directly download the JAR package.
  • If you need customized notification content or additional event handling, you can modify the code yourself.

Develop the Plugin

The project can be obtained from:

Project Structure

The package name com.ts7ming can be customized.

│  pom.xml
│
└─src
    └─main
        ├─java
        │  └─com
        │      └─ts7ming
        │              DingTalkEventListener.java
        │
        └─resources
            └─META-INF
                └─services
                        org.apache.seatunnel.api.event.EventHandler
Enter fullscreen mode Exit fullscreen mode

pom.xml

  • Version 2.3.13 is used here. Adjust it according to your actual environment.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://www.w3.org/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ts7ming</groupId>
    <artifactId>SeatunnelExt</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <seatunnel.version>2.3.13</seatunnel.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>seatunnel-api</artifactId>
            <version>${seatunnel.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>seatunnel-engine-common</artifactId>
            <version>${seatunnel.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
Enter fullscreen mode Exit fullscreen mode

DingTalkEventListener.java

  • SeaTunnel supports the following event types.
package com.ts7ming;

import lombok.extern.slf4j.Slf4j;
import org.apache.seatunnel.api.event.Event;
import org.apache.seatunnel.api.event.EventHandler;
import org.apache.seatunnel.api.event.EventType;
import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.job.JobStateEvent;
import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

@Slf4j
public class DingTalkEventListener implements EventHandler {
    private static final String WEBHOOK_URL = System.getProperty("dingtalk.webhook.url", "https://oapi.dingtalk.com/robot/send?access_token=YOUR_ACCESS_TOKEN");
    private static final String SECRET = System.getProperty("dingtalk.secret", "YOUR_SECRET");

    @Override
    public void handle(Event event) {
        EventType eventType = event.getEventType();

        if (eventType == EventType.JOB_STATUS) {
            handleJobStateEvent((JobStateEvent) event);
        } 
//        else if (eventType.name().equals("SCHEMA_CHANGE_ADD_COLUMN")) {
//            handleAddColumnEvent((AlterTableAddColumnEvent) event);
//        }
//        else if (eventType.name().equals("SCHEMA_CHANGE_UPDATE_COLUMNS")) {
//            handleUpdateColumnEvent((AlterTableColumnsEvent) event);
//        }
//        else if (eventType.name().equals("SCHEMA_CHANGE_DROP_COLUMN")) {
//            handleDropColumnEvent((AlterTableDropColumnEvent) event);
//        }
//        else if (eventType.name().equals("SCHEMA_CHANGE_MODIFY_COLUMN")) {
//            handleModifyColumnEvent((AlterTableModifyColumnEvent) event);
//        }
        else {
            log.debug("Ignore unsupported event type: {}", eventType);
        }
    }

    private void handleJobStateEvent(JobStateEvent jobEvent) {
        String jobId = jobEvent.getJobId();
        String jobName = jobEvent.getJobName();
        JobStatus status = jobEvent.getJobStatus();
        long eventTime = jobEvent.getCreatedTime();

        switch (status) {
            case FAILED:
                sendAlert("【Task Failed】jobId: " + jobId + ", jobName: " + jobName);
                break;

            case FINISHED:
                //sendAlert("Task Finished: " + jobId + ", jobName: " + jobName);
                break;

            default:
                log.debug("Job status changed | jobId: {}, status: {}, time: {}", jobId, status, eventTime);
        }
    }

    private void handleAddColumnEvent(AlterTableAddColumnEvent event) {
        String tableName = event.getTableIdentifier().getTableName();
        String columnName = event.getColumn() != null ? event.getColumn().getName() : "Unknown Column";
        sendAlert("【Schema Change】Table: " + tableName + ", Added Column: " + columnName);
    }

    private void handleUpdateColumnEvent(AlterTableColumnsEvent event) {
        String tableName = event.getTableIdentifier().getTableName();
        sendAlert("【Schema Change】Table: " + tableName + ", Updated Content: " + event);
    }

    private void handleDropColumnEvent(AlterTableDropColumnEvent event) {
        String tableName = event.getTableIdentifier().getTableName();
        String columnName = event.getColumn() != null ? event.getColumn() : "Unknown Column";
        sendAlert("【Schema Change】Table: " + tableName + ", Dropped Column: " + columnName);
    }

    private void handleModifyColumnEvent(AlterTableModifyColumnEvent event) {
        String tableName = event.getTableIdentifier().getTableName();
        String columnName = event.getColumn() != null ? event.getColumn().getName() : "Unknown Column";
        sendAlert("【Schema Change】Table: " + tableName + ", Modified Column: " + columnName);
    }

    private void sendAlert(String content) {
        sendDingTalkMessage(content);
    }

    void sendDingTalkMessage(String message) {
        try {
            long timestamp = System.currentTimeMillis();
            String sign = generateSign(timestamp, SECRET);
            String fullUrl = WEBHOOK_URL + "&timestamp=" + timestamp + "&sign=" + sign;

            String escapedMessage = message.replace("\\", "\\\\")
                                           .replace("\"", "\\\"")
                                           .replace("\n", "\\n")
                                           .replace("\r", "\\r")
                                           .replace("\t", "\\t");

            String jsonPayload = String.format(
                "{\"msgtype\":\"text\",\"text\":{\"content\":\"%s\"}}",
                escapedMessage
            );

            URL url = new URL(fullUrl);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();

            conn.setRequestMethod("POST");
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setDoOutput(true);
            conn.setConnectTimeout(5000);
            conn.setReadTimeout(5000);

            try (OutputStream os = conn.getOutputStream()) {
                os.write(jsonPayload.getBytes(StandardCharsets.UTF_8));
                os.flush();
            }

            int responseCode = conn.getResponseCode();

            if (responseCode == 200) {
                log.info("DingTalk message sent successfully: {}", message);
            } else {
                log.error("Failed to send DingTalk message, response code: {}, message: {}", responseCode, message);
            }

        } catch (Exception e) {
            log.error("Exception while sending DingTalk message: {}", message, e);
        }
    }

    private String generateSign(long timestamp, String secret) throws Exception {
        String stringToSign = timestamp + "\n" + secret;

        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));

        byte[] signData = mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8));

        return URLEncoder.encode(
            new String(Base64.getEncoder().encode(signData)),
            "UTF-8"
        );
    }
}
Enter fullscreen mode Exit fullscreen mode

org.apache.seatunnel.api.event.EventHandler

com.ts7ming.DingTalkEventListener
Enter fullscreen mode Exit fullscreen mode

Package the Project

mvn clean package
Enter fullscreen mode Exit fullscreen mode

Deploy the Plugin

Directly download the ready-to-use JAR file if you don’t want to package it yourself.

cd /opt/apache-seatunnel/lib

wget https://github.com/ts7ming/SeatunnelExt/releases/download/v1/SeatunnelExt-1.0-SNAPSHOT.jar

# Use Gitee if GitHub network access is slow
wget https://gitee.com/ts7ming/SeatunnelExt/releases/download/v1/SeatunnelExt-1.0-SNAPSHOT.jar

# If downloaded by another user, pay attention to permissions
chown -R seatunnel:seatunnel /opt/apache-seatunnel
Enter fullscreen mode Exit fullscreen mode

Upload

Upload the JAR package to the lib directory under the SeaTunnel root path.
For example:

/opt/apache-seatunnel/lib/
Enter fullscreen mode Exit fullscreen mode

Restart SeaTunnel Services

systemctl stop seatunnel-master.service
systemctl stop seatunnel-worker.service

systemctl start seatunnel-master.service
systemctl start seatunnel-worker.service
Enter fullscreen mode Exit fullscreen mode

Check Whether the Plugin Is Loaded

grep "DingTalk" /opt/apache-seatunnel/logs/seatunnel-engine-master.log
Enter fullscreen mode Exit fullscreen mode

Expected output:

INFO  [o.a.s.e.s.CoordinatorService  ] [pool-4-thread-1] - [localhost]:5801 [seatunnel] [5.1] Loaded event handlers: [com.ts7ming.DingTalkEventListener@20eaeaed, org.apache.seatunnel.api.event.LoggingEventHandler@59c99cb9]
Enter fullscreen mode Exit fullscreen mode

Run the Task

Important:

  • The -D parameters must be placed before --config.
  • The SeaTunnel startup script does not flexibly parse parameter order.
sh bin/seatunnel.sh --async \
-Ddingtalk.webhook.url="https://oapi.dingtalk.com/robot/send?access_token=Your_DingTalk_Token" \
-Ddingtalk.secret="Your_DingTalk_Secret" \
--config task.conf \
-n "Task Name"
Enter fullscreen mode Exit fullscreen mode

Done!

Top comments (0)