Overview
Background
Use SeaTunnel to execute data synchronization tasks.
Deployment reference: Deploy Apache SeaTunnel Services.
Problem
- Notifications need to be sent to DingTalk when tasks fail or when other critical events occur.
- SeaTunnel itself does not provide built-in message notification capabilities and usually relies on DolphinScheduler or other external tools.
Solution
- Use the event listener feature provided by SeaTunnel.
- Develop a custom plugin to capture failure events and send notification messages.
- Configure the group robot parameters through command-line submission.
Deployment
- If you don’t want to write and package code and only need failure notifications, you can skip the plugin development steps and directly download the JAR package.
- If you need customized notification content or additional event handling, you can modify the code yourself.
Develop the Plugin
The project can be obtained from:
Project Structure
The package name com.ts7ming can be customized.
│ pom.xml
│
└─src
└─main
├─java
│ └─com
│ └─ts7ming
│ DingTalkEventListener.java
│
└─resources
└─META-INF
└─services
org.apache.seatunnel.api.event.EventHandler
pom.xml
- Version
2.3.13is used here. Adjust it according to your actual environment.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://www.w3.org/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ts7ming</groupId>
<artifactId>SeatunnelExt</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<seatunnel.version>2.3.13</seatunnel.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<version>${seatunnel.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-engine-common</artifactId>
<version>${seatunnel.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
DingTalkEventListener.java
- SeaTunnel supports the following event types.
package com.ts7ming;
import lombok.extern.slf4j.Slf4j;
import org.apache.seatunnel.api.event.Event;
import org.apache.seatunnel.api.event.EventHandler;
import org.apache.seatunnel.api.event.EventType;
import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.job.JobStateEvent;
import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
@Slf4j
public class DingTalkEventListener implements EventHandler {
private static final String WEBHOOK_URL = System.getProperty("dingtalk.webhook.url", "https://oapi.dingtalk.com/robot/send?access_token=YOUR_ACCESS_TOKEN");
private static final String SECRET = System.getProperty("dingtalk.secret", "YOUR_SECRET");
@Override
public void handle(Event event) {
EventType eventType = event.getEventType();
if (eventType == EventType.JOB_STATUS) {
handleJobStateEvent((JobStateEvent) event);
}
// else if (eventType.name().equals("SCHEMA_CHANGE_ADD_COLUMN")) {
// handleAddColumnEvent((AlterTableAddColumnEvent) event);
// }
// else if (eventType.name().equals("SCHEMA_CHANGE_UPDATE_COLUMNS")) {
// handleUpdateColumnEvent((AlterTableColumnsEvent) event);
// }
// else if (eventType.name().equals("SCHEMA_CHANGE_DROP_COLUMN")) {
// handleDropColumnEvent((AlterTableDropColumnEvent) event);
// }
// else if (eventType.name().equals("SCHEMA_CHANGE_MODIFY_COLUMN")) {
// handleModifyColumnEvent((AlterTableModifyColumnEvent) event);
// }
else {
log.debug("Ignore unsupported event type: {}", eventType);
}
}
private void handleJobStateEvent(JobStateEvent jobEvent) {
String jobId = jobEvent.getJobId();
String jobName = jobEvent.getJobName();
JobStatus status = jobEvent.getJobStatus();
long eventTime = jobEvent.getCreatedTime();
switch (status) {
case FAILED:
sendAlert("【Task Failed】jobId: " + jobId + ", jobName: " + jobName);
break;
case FINISHED:
//sendAlert("Task Finished: " + jobId + ", jobName: " + jobName);
break;
default:
log.debug("Job status changed | jobId: {}, status: {}, time: {}", jobId, status, eventTime);
}
}
private void handleAddColumnEvent(AlterTableAddColumnEvent event) {
String tableName = event.getTableIdentifier().getTableName();
String columnName = event.getColumn() != null ? event.getColumn().getName() : "Unknown Column";
sendAlert("【Schema Change】Table: " + tableName + ", Added Column: " + columnName);
}
private void handleUpdateColumnEvent(AlterTableColumnsEvent event) {
String tableName = event.getTableIdentifier().getTableName();
sendAlert("【Schema Change】Table: " + tableName + ", Updated Content: " + event);
}
private void handleDropColumnEvent(AlterTableDropColumnEvent event) {
String tableName = event.getTableIdentifier().getTableName();
String columnName = event.getColumn() != null ? event.getColumn() : "Unknown Column";
sendAlert("【Schema Change】Table: " + tableName + ", Dropped Column: " + columnName);
}
private void handleModifyColumnEvent(AlterTableModifyColumnEvent event) {
String tableName = event.getTableIdentifier().getTableName();
String columnName = event.getColumn() != null ? event.getColumn().getName() : "Unknown Column";
sendAlert("【Schema Change】Table: " + tableName + ", Modified Column: " + columnName);
}
private void sendAlert(String content) {
sendDingTalkMessage(content);
}
void sendDingTalkMessage(String message) {
try {
long timestamp = System.currentTimeMillis();
String sign = generateSign(timestamp, SECRET);
String fullUrl = WEBHOOK_URL + "×tamp=" + timestamp + "&sign=" + sign;
String escapedMessage = message.replace("\\", "\\\\")
.replace("\"", "\\\"")
.replace("\n", "\\n")
.replace("\r", "\\r")
.replace("\t", "\\t");
String jsonPayload = String.format(
"{\"msgtype\":\"text\",\"text\":{\"content\":\"%s\"}}",
escapedMessage
);
URL url = new URL(fullUrl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
conn.setRequestProperty("Content-Type", "application/json");
conn.setDoOutput(true);
conn.setConnectTimeout(5000);
conn.setReadTimeout(5000);
try (OutputStream os = conn.getOutputStream()) {
os.write(jsonPayload.getBytes(StandardCharsets.UTF_8));
os.flush();
}
int responseCode = conn.getResponseCode();
if (responseCode == 200) {
log.info("DingTalk message sent successfully: {}", message);
} else {
log.error("Failed to send DingTalk message, response code: {}, message: {}", responseCode, message);
}
} catch (Exception e) {
log.error("Exception while sending DingTalk message: {}", message, e);
}
}
private String generateSign(long timestamp, String secret) throws Exception {
String stringToSign = timestamp + "\n" + secret;
Mac mac = Mac.getInstance("HmacSHA256");
mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
byte[] signData = mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8));
return URLEncoder.encode(
new String(Base64.getEncoder().encode(signData)),
"UTF-8"
);
}
}
org.apache.seatunnel.api.event.EventHandler
com.ts7ming.DingTalkEventListener
Package the Project
mvn clean package
Deploy the Plugin
Directly download the ready-to-use JAR file if you don’t want to package it yourself.
cd /opt/apache-seatunnel/lib
wget https://github.com/ts7ming/SeatunnelExt/releases/download/v1/SeatunnelExt-1.0-SNAPSHOT.jar
# Use Gitee if GitHub network access is slow
wget https://gitee.com/ts7ming/SeatunnelExt/releases/download/v1/SeatunnelExt-1.0-SNAPSHOT.jar
# If downloaded by another user, pay attention to permissions
chown -R seatunnel:seatunnel /opt/apache-seatunnel
Upload
Upload the JAR package to the lib directory under the SeaTunnel root path.
For example:
/opt/apache-seatunnel/lib/
Restart SeaTunnel Services
systemctl stop seatunnel-master.service
systemctl stop seatunnel-worker.service
systemctl start seatunnel-master.service
systemctl start seatunnel-worker.service
Check Whether the Plugin Is Loaded
grep "DingTalk" /opt/apache-seatunnel/logs/seatunnel-engine-master.log
Expected output:
INFO [o.a.s.e.s.CoordinatorService ] [pool-4-thread-1] - [localhost]:5801 [seatunnel] [5.1] Loaded event handlers: [com.ts7ming.DingTalkEventListener@20eaeaed, org.apache.seatunnel.api.event.LoggingEventHandler@59c99cb9]
Run the Task
Important:
- The
-Dparameters must be placed before--config. - The SeaTunnel startup script does not flexibly parse parameter order.
sh bin/seatunnel.sh --async \
-Ddingtalk.webhook.url="https://oapi.dingtalk.com/robot/send?access_token=Your_DingTalk_Token" \
-Ddingtalk.secret="Your_DingTalk_Secret" \
--config task.conf \
-n "Task Name"
Done!
Top comments (0)