DEV Community

Apache SeaTunnel
Apache SeaTunnel

Posted on

Apache SeaTunnel 2.3.10 Source Code Analysis: Zeta Engine Service Startup

Today, we focus on SeaTunnel’s self-developed data synchronization engine, called Zeta.
First, if you are using the Zeta engine, the first step is always to run the bin/seatunnel-cluster.sh script. This script starts the Zeta server.

Opening seatunnel-cluster.sh, we can see that it actually launches the main() method in seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java.

This is the core startup method of Zeta.

As shown in the code below:

public class SeaTunnelServer {

    public static void main(String[] args) throws CommandException {

        ServerCommandArgs serverCommandArgs =
                CommandLineUtils.parse(
                        args,
                        new ServerCommandArgs(),
                        EngineType.SEATUNNEL.getStarterShellName(),
                        acceptUnknownOptions: true
                );

        SeaTunnel.run(serverCommandArgs.buildCommand());
    }
}
Enter fullscreen mode Exit fullscreen mode

We should first look at the ServerCommandArgs class, which assembles the startup command based on command-line arguments. The entry point is serverCommandArgs.buildCommand().

@EqualsAndHashCode(callSuper = true)
@Data
public class ServerCommandArgs extends CommandArgs {

    @Parameter(
            names = {"-cn", "--cluster"},
            description = "The name of cluster"
    )
    private String clusterName;

    @Parameter(
            names = {"-d", "--daemon"},
            description = "The cluster daemon mode"
    )
    private boolean daemonMode = false;

    @Parameter(
            names = {"-r", "--role"},
            description =
                    "The cluster node role, default is master_and_worker, " +
                    "support master, worker, master_and_worker"
    )
    private String clusterRole;

    @Override
    public Command<?> buildCommand() {
        return new ServerExecuteCommand(this);
    }
}
Enter fullscreen mode Exit fullscreen mode

Next, SeaTunnel.run() starts the SeaTunnelServer. The startup process can be summarized as follows:

Steps:

  1. Validate the current environment.
  2. Load SeaTunnel configuration.
  3. Set node role, including Master, Worker, and Master_And_Worker.
  4. Create a Hazelcast instance for cluster discovery, registration, and distributed data management.
@Override
public void execute() {
    // Validate environment
    checkEnvironment();

    // Load SeaTunnel configuration
    SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
    String clusterRole = this.serverCommandArgs.getClusterRole();

    // Set node role
    if (StringUtils.isNotBlank(clusterRole)) {
        if (EngineConfig.ClusterRole.MASTER.toString().equalsIgnoreCase(clusterRole)) {
            seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.MASTER);
        } else if (EngineConfig.ClusterRole.WORKER.toString().equalsIgnoreCase(clusterRole)) {
            seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.WORKER);
            // In Hazelcast lite node, it will not store IMap data.
            seaTunnelConfig.getHazelcastConfig().setLiteMember(true);
        } else {
            throw new SeaTunnelEngineException("Not supported cluster role: " + clusterRole);
        }
    } else {
        seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.MASTER_AND_WORKER);
    }

    // Create Hazelcast instance for cluster discovery, registration, and distributed data management
    SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig, Thread.currentThread().getName());
}
Enter fullscreen mode Exit fullscreen mode

Node Roles:

  1. Master Node:
  • Core Responsibilities: Handles cluster job scheduling, state management, and resource coordination. Runs CoordinatorService to transform logical plans (LogicalDAG) into physical execution plans (PhysicalDAG) and generates execution schedules. It also manages checkpoints and job monitoring metrics.
  • High Availability: Active/Standby mode. Only one active master exists at a time; standby masters take over upon failure to ensure continuous operation.
  • Data Storage: Uses built-in distributed memory grids (e.g., Hazelcast IMap) to store job state and metadata without relying on external systems like ZooKeeper. In separate deployment mode, all state is stored on the master node to prevent worker load from affecting data stability.
  1. Worker Node:
  • Core Responsibilities: Executes specific data processing tasks. Runs TaskExecutionService and SlotService; the former provides runtime environment for tasks, the latter manages resources like CPU cores.
  • Dynamic Resource Allocation: SlotService allows dynamic resource allocation according to task parallelism, improving utilization.
  • Stateless Design: Does not store job state; if a worker fails, tasks are rescheduled by the master node.
  1. Hybrid Node (Legacy Architecture):
  • Early versions allowed nodes to act as both Master and Worker (master_and_worker mode), which could reduce fault tolerance efficiency under heavy load.
  • Optimization: Since version 2.3.6, separate deployment of Master and Worker is recommended for stability and scalability.

Returning to Hazelcast instance creation:

The core code is HazelcastInstanceFactory.newHazelcastInstance(), which creates the Hazelcast instance.

private static HazelcastInstanceImpl initializeHazelcastInstance(
        @NonNull SeaTunnelConfig seaTunnelConfig, String customInstanceName) {

    // Set the default async executor for Hazelcast InvocationFuture
    ConcurrencyUtil.setDefaultAsyncExecutor(CompletableFuture.EXECUTOR);

    // Check whether to enable metrics reporting
    boolean condition = checkTelemetryConfig(seaTunnelConfig);

    String instanceName = customInstanceName != null
            ? customInstanceName
            : HazelcastInstanceFactory.createInstanceName(seaTunnelConfig.getHazelcastConfig());

    // Create Hazelcast instance
    HazelcastInstanceImpl original = ((HazelcastInstanceProxy)
            HazelcastInstanceFactory.newHazelcastInstance(
                    seaTunnelConfig.getHazelcastConfig(),
                    instanceName,
                    new SeaTunnelNodeContext(seaTunnelConfig)))
            .getOriginal();

    // Initialize telemetry instance
    // Enable metrics reporting, including: JvmCollector, JobInfoDetail, ThreadPoolStatus, NodeMetrics, ClusterMetrics
    if (condition) {
        initTelemetryInstance(original.node);
    }

    return original;
}
Enter fullscreen mode Exit fullscreen mode

The most important part here is new SeaTunnelNodeContext(seaTunnelConfig).
This returns a SeaTunnelNodeContext class, which extends Hazelcast’s DefaultNodeContext. During Hazelcast startup, it calls createNodeExtension(), which in this case is implemented in SeaTunnelNodeContext.

@Slf4j
public class SeaTunnelNodeContext extends DefaultNodeContext {

    private final SeaTunnelConfig seaTunnelConfig;

    public SeaTunnelNodeContext(@NonNull SeaTunnelConfig seaTunnelConfig) {
        this.seaTunnelConfig = seaTunnelConfig;
    }

    @Override
    public NodeExtension createNodeExtension(@NonNull Node node) {
        return new org.apache.seatunnel.engine.server.NodeExtension(node, seaTunnelConfig);
    }
}
Enter fullscreen mode Exit fullscreen mode

Tracing into the node extension, we see that the Zeta engine is initialized here.

Zeta engine init

SeaTunnelServer implements a series of Hazelcast interfaces to listen for cluster state changes, including node initialization, member join/leave events, and distributed system operations.

Hazelcast interfaces

Detailed operations:

  1. Node Initialization: Provides a complete startup process, ensuring all services are initialized correctly. Key methods: startMaster() and startWorker().
@Override
public void init(NodeEngine engine, Properties hzProperties) {
    this.nodeEngine = (NodeEngineImpl) engine;

    // Initialize class loader service
    classLoaderService = new DefaultClassLoaderService(
            seaTunnelConfig.getEngineConfig().isClassLoaderCacheMode(), nodeEngine);

    // Event service for processing and forwarding
    eventService = new EventService(nodeEngine);

    // Start Master / Worker capabilities
    if (EngineConfig.ClusterRole.MASTER_AND_WORKER.ordinal()
            == seaTunnelConfig.getEngineConfig().getClusterRole().ordinal()) {
        startWorker();
        startMaster();
    } else if (EngineConfig.ClusterRole.WORKER.ordinal()
            == seaTunnelConfig.getEngineConfig().getClusterRole().ordinal()) {
        startWorker();
    } else {
        startMaster();
    }

    // Health monitor
    seaTunnelHealthMonitor = new SeaTunnelHealthMonitor(((NodeEngineImpl) engine).getNode());

    // Task log management
    if (seaTunnelConfig.getEngineConfig().getTelemetryConfig() != null
            && seaTunnelConfig.getEngineConfig().getTelemetryConfig().getLogs() != null
            && seaTunnelConfig.getEngineConfig().getTelemetryConfig().getLogs().isEnabled()) {
        taskLogManagerService = new TaskLogManagerService(
                seaTunnelConfig.getEngineConfig().getTelemetryConfig().getLogs());
        taskLogManagerService.initClean();
    }

    // Jetty service: HTTP REST API, Web UI, job management, monitoring endpoints
    if (seaTunnelConfig.getEngineConfig().getHttpConfig().isEnabled()) {
        jettyService = new JettyService(nodeEngine, seaTunnelConfig);
        jettyService.createJettyServer();
    }

    // Fix Hadoop Statistics cleaner thread class-loader leak
    FileSystem.Statistics statistics = new FileSystem.Statistics("SeaTunnel");
}
Enter fullscreen mode Exit fullscreen mode

Master Node Startup (startMaster): Initializes CoordinatorService, CheckpointService, and monitoring service.

private void startMaster() {
    coordinatorService = new CoordinatorService(nodeEngine, this, seaTunnelConfig.getEngineConfig());
    checkpointService = new CheckpointService(seaTunnelConfig.getEngineConfig().getCheckpointConfig());

    monitorService = Executors.newSingleThreadScheduledExecutor();
    monitorService.scheduleAtFixedRate(
            this::printExecutionInfo,
            0,
            seaTunnelConfig.getEngineConfig().getPrintExecutionInfoInterval(),
            TimeUnit.SECONDS);
}
Enter fullscreen mode Exit fullscreen mode

Worker Node Startup (startWorker): Focuses on TaskExecutionService, which executes data processing tasks efficiently and reliably.

private void startWorker() {
    taskExecutionService = new TaskExecutionService(classLoaderService, nodeEngine, eventService);
    nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService);
    taskExecutionService.start();
    getSlotService();
}
Enter fullscreen mode Exit fullscreen mode

Core process of TaskExecutionService.start():

TaskExecutionService flow

Official description:

  • TaskExecutionService: Runs on every node; receives TaskGroups from JobMaster and executes Tasks. Maintains TaskID -> TaskContext. Tasks hold an OperationService for remote calls.
  • CoordinatorService: Handles client commands and job recovery after master failover; caches task information and archives tasks after completion.
  • SlotService: Manages available slots; reports resource info periodically to master.
  1. Cluster Member Join/Leave:
  • memberAdded: handles new members joining (empty implementation).
  • memberRemoved: handles members leaving; critical for resource cleanup, task redistribution, and cluster state maintenance.

Cluster join/leave

Why memberAdded is empty:

  • Normal event; auto-initialized via other mechanisms (e.g., SlotService).
  • Task scheduling is dynamic; no special handling needed.

memberRemoved responsibilities:

  • Resource cleanup, task redistribution, cluster state update, maintaining reliability and data consistency.
  1. Distributed System Operation Tracking:
  • Current implementation is empty; no special operations to track.
@Override
public void populate(LiveOperations liveOperations) {
    // In SeaTunnelServer this implementation is empty,
    // indicating the current version has no special operations to track.
}
Enter fullscreen mode Exit fullscreen mode

Local IDEA startup note:

  • Default config uses HDFS; local environment may lack HDFS, preventing service startup. Change to localfile to start locally.

Localfile startup

Finally, visit localhost:8080 to check service status:

Service status
Service monitoring
Task overview

Top comments (0)