Today, we focus on SeaTunnel’s self-developed data synchronization engine, called Zeta.
First, if you are using the Zeta engine, the first step is always to run the bin/seatunnel-cluster.sh script. This script starts the Zeta server.
Opening seatunnel-cluster.sh, we can see that it actually launches the main() method in seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java.
This is the core startup method of Zeta.
As shown in the code below:
public class SeaTunnelServer {
public static void main(String[] args) throws CommandException {
ServerCommandArgs serverCommandArgs =
CommandLineUtils.parse(
args,
new ServerCommandArgs(),
EngineType.SEATUNNEL.getStarterShellName(),
acceptUnknownOptions: true
);
SeaTunnel.run(serverCommandArgs.buildCommand());
}
}
We should first look at the ServerCommandArgs class, which assembles the startup command based on command-line arguments. The entry point is serverCommandArgs.buildCommand().
@EqualsAndHashCode(callSuper = true)
@Data
public class ServerCommandArgs extends CommandArgs {
@Parameter(
names = {"-cn", "--cluster"},
description = "The name of cluster"
)
private String clusterName;
@Parameter(
names = {"-d", "--daemon"},
description = "The cluster daemon mode"
)
private boolean daemonMode = false;
@Parameter(
names = {"-r", "--role"},
description =
"The cluster node role, default is master_and_worker, " +
"support master, worker, master_and_worker"
)
private String clusterRole;
@Override
public Command<?> buildCommand() {
return new ServerExecuteCommand(this);
}
}
Next, SeaTunnel.run() starts the SeaTunnelServer. The startup process can be summarized as follows:
Steps:
- Validate the current environment.
- Load SeaTunnel configuration.
- Set node role, including Master, Worker, and Master_And_Worker.
- Create a Hazelcast instance for cluster discovery, registration, and distributed data management.
@Override
public void execute() {
// Validate environment
checkEnvironment();
// Load SeaTunnel configuration
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
String clusterRole = this.serverCommandArgs.getClusterRole();
// Set node role
if (StringUtils.isNotBlank(clusterRole)) {
if (EngineConfig.ClusterRole.MASTER.toString().equalsIgnoreCase(clusterRole)) {
seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.MASTER);
} else if (EngineConfig.ClusterRole.WORKER.toString().equalsIgnoreCase(clusterRole)) {
seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.WORKER);
// In Hazelcast lite node, it will not store IMap data.
seaTunnelConfig.getHazelcastConfig().setLiteMember(true);
} else {
throw new SeaTunnelEngineException("Not supported cluster role: " + clusterRole);
}
} else {
seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.MASTER_AND_WORKER);
}
// Create Hazelcast instance for cluster discovery, registration, and distributed data management
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig, Thread.currentThread().getName());
}
Node Roles:
- Master Node:
-
Core Responsibilities: Handles cluster job scheduling, state management, and resource coordination. Runs
CoordinatorServiceto transform logical plans (LogicalDAG) into physical execution plans (PhysicalDAG) and generates execution schedules. It also manages checkpoints and job monitoring metrics. - High Availability: Active/Standby mode. Only one active master exists at a time; standby masters take over upon failure to ensure continuous operation.
- Data Storage: Uses built-in distributed memory grids (e.g., Hazelcast IMap) to store job state and metadata without relying on external systems like ZooKeeper. In separate deployment mode, all state is stored on the master node to prevent worker load from affecting data stability.
- Worker Node:
-
Core Responsibilities: Executes specific data processing tasks. Runs
TaskExecutionServiceandSlotService; the former provides runtime environment for tasks, the latter manages resources like CPU cores. -
Dynamic Resource Allocation:
SlotServiceallows dynamic resource allocation according to task parallelism, improving utilization. - Stateless Design: Does not store job state; if a worker fails, tasks are rescheduled by the master node.
- Hybrid Node (Legacy Architecture):
- Early versions allowed nodes to act as both Master and Worker (
master_and_workermode), which could reduce fault tolerance efficiency under heavy load. - Optimization: Since version 2.3.6, separate deployment of Master and Worker is recommended for stability and scalability.
Returning to Hazelcast instance creation:
The core code is HazelcastInstanceFactory.newHazelcastInstance(), which creates the Hazelcast instance.
private static HazelcastInstanceImpl initializeHazelcastInstance(
@NonNull SeaTunnelConfig seaTunnelConfig, String customInstanceName) {
// Set the default async executor for Hazelcast InvocationFuture
ConcurrencyUtil.setDefaultAsyncExecutor(CompletableFuture.EXECUTOR);
// Check whether to enable metrics reporting
boolean condition = checkTelemetryConfig(seaTunnelConfig);
String instanceName = customInstanceName != null
? customInstanceName
: HazelcastInstanceFactory.createInstanceName(seaTunnelConfig.getHazelcastConfig());
// Create Hazelcast instance
HazelcastInstanceImpl original = ((HazelcastInstanceProxy)
HazelcastInstanceFactory.newHazelcastInstance(
seaTunnelConfig.getHazelcastConfig(),
instanceName,
new SeaTunnelNodeContext(seaTunnelConfig)))
.getOriginal();
// Initialize telemetry instance
// Enable metrics reporting, including: JvmCollector, JobInfoDetail, ThreadPoolStatus, NodeMetrics, ClusterMetrics
if (condition) {
initTelemetryInstance(original.node);
}
return original;
}
The most important part here is new SeaTunnelNodeContext(seaTunnelConfig).
This returns a SeaTunnelNodeContext class, which extends Hazelcast’s DefaultNodeContext. During Hazelcast startup, it calls createNodeExtension(), which in this case is implemented in SeaTunnelNodeContext.
@Slf4j
public class SeaTunnelNodeContext extends DefaultNodeContext {
private final SeaTunnelConfig seaTunnelConfig;
public SeaTunnelNodeContext(@NonNull SeaTunnelConfig seaTunnelConfig) {
this.seaTunnelConfig = seaTunnelConfig;
}
@Override
public NodeExtension createNodeExtension(@NonNull Node node) {
return new org.apache.seatunnel.engine.server.NodeExtension(node, seaTunnelConfig);
}
}
Tracing into the node extension, we see that the Zeta engine is initialized here.
SeaTunnelServer implements a series of Hazelcast interfaces to listen for cluster state changes, including node initialization, member join/leave events, and distributed system operations.
Detailed operations:
-
Node Initialization:
Provides a complete startup process, ensuring all services are initialized correctly. Key methods:
startMaster()andstartWorker().
@Override
public void init(NodeEngine engine, Properties hzProperties) {
this.nodeEngine = (NodeEngineImpl) engine;
// Initialize class loader service
classLoaderService = new DefaultClassLoaderService(
seaTunnelConfig.getEngineConfig().isClassLoaderCacheMode(), nodeEngine);
// Event service for processing and forwarding
eventService = new EventService(nodeEngine);
// Start Master / Worker capabilities
if (EngineConfig.ClusterRole.MASTER_AND_WORKER.ordinal()
== seaTunnelConfig.getEngineConfig().getClusterRole().ordinal()) {
startWorker();
startMaster();
} else if (EngineConfig.ClusterRole.WORKER.ordinal()
== seaTunnelConfig.getEngineConfig().getClusterRole().ordinal()) {
startWorker();
} else {
startMaster();
}
// Health monitor
seaTunnelHealthMonitor = new SeaTunnelHealthMonitor(((NodeEngineImpl) engine).getNode());
// Task log management
if (seaTunnelConfig.getEngineConfig().getTelemetryConfig() != null
&& seaTunnelConfig.getEngineConfig().getTelemetryConfig().getLogs() != null
&& seaTunnelConfig.getEngineConfig().getTelemetryConfig().getLogs().isEnabled()) {
taskLogManagerService = new TaskLogManagerService(
seaTunnelConfig.getEngineConfig().getTelemetryConfig().getLogs());
taskLogManagerService.initClean();
}
// Jetty service: HTTP REST API, Web UI, job management, monitoring endpoints
if (seaTunnelConfig.getEngineConfig().getHttpConfig().isEnabled()) {
jettyService = new JettyService(nodeEngine, seaTunnelConfig);
jettyService.createJettyServer();
}
// Fix Hadoop Statistics cleaner thread class-loader leak
FileSystem.Statistics statistics = new FileSystem.Statistics("SeaTunnel");
}
Master Node Startup (startMaster): Initializes CoordinatorService, CheckpointService, and monitoring service.
private void startMaster() {
coordinatorService = new CoordinatorService(nodeEngine, this, seaTunnelConfig.getEngineConfig());
checkpointService = new CheckpointService(seaTunnelConfig.getEngineConfig().getCheckpointConfig());
monitorService = Executors.newSingleThreadScheduledExecutor();
monitorService.scheduleAtFixedRate(
this::printExecutionInfo,
0,
seaTunnelConfig.getEngineConfig().getPrintExecutionInfoInterval(),
TimeUnit.SECONDS);
}
Worker Node Startup (startWorker): Focuses on TaskExecutionService, which executes data processing tasks efficiently and reliably.
private void startWorker() {
taskExecutionService = new TaskExecutionService(classLoaderService, nodeEngine, eventService);
nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService);
taskExecutionService.start();
getSlotService();
}
Core process of TaskExecutionService.start():
Official description:
-
TaskExecutionService: Runs on every node; receives TaskGroups from JobMaster and executes Tasks. Maintains
TaskID -> TaskContext. Tasks hold anOperationServicefor remote calls. - CoordinatorService: Handles client commands and job recovery after master failover; caches task information and archives tasks after completion.
- SlotService: Manages available slots; reports resource info periodically to master.
- Cluster Member Join/Leave:
-
memberAdded: handles new members joining (empty implementation). -
memberRemoved: handles members leaving; critical for resource cleanup, task redistribution, and cluster state maintenance.
Why memberAdded is empty:
- Normal event; auto-initialized via other mechanisms (e.g., SlotService).
- Task scheduling is dynamic; no special handling needed.
memberRemoved responsibilities:
- Resource cleanup, task redistribution, cluster state update, maintaining reliability and data consistency.
- Distributed System Operation Tracking:
- Current implementation is empty; no special operations to track.
@Override
public void populate(LiveOperations liveOperations) {
// In SeaTunnelServer this implementation is empty,
// indicating the current version has no special operations to track.
}
Local IDEA startup note:
- Default config uses HDFS; local environment may lack HDFS, preventing service startup. Change to
localfileto start locally.
Finally, visit localhost:8080 to check service status:








Top comments (0)