This series of articles is based on Apache SeaTunnel version 2.3.6 and introduces the full process of handling a task from submission to execution with the Zeta engine. This document aims to assist newcomers to SeaTunnel by providing some guidance.
The article will be divided into three parts, covering the following aspects:
- Initialization of the SeaTunnel Server
- Task submission process on the Client side
- Task execution process upon receiving the task on the Server side
Due to the extensive source code analysis involved, this series of articles will document the overall task process.
References
- [ST-Engine][Design] The Design of LogicalPlan to PhysicalPlan: https://github.com/apache/seatunnel/issues/2269
Introduction to the Author
Hi the community, I'm Liu Naijie, a big data developer who has been involved in Apache SeaTunnel development for over a year. I have contributed some PRs to SeaTunnel and added some interesting features, including support for Avro file formats, nested structure queries in SQL Transform, and adding tags to nodes for resource isolation.
Recently, SeaTunnel has been implemented internally at my company, and I need to introduce SeaTunnel's technical architecture to my colleagues and bosses, as well as provide a detailed running process to help them better understand development and maintenance.
However, I found that there doesn't seem to be an article that analyzes the entire task execution process in detail, which would help developers more easily locate issues and add features.
So, I took some time to write this article, hoping to inspire other experts to write more source code analysis articles.
Cluster Topology
First, let's get an overview of the SeaTunnel Zeta engine architecture. SeaTunnel is implemented using Hazelcast for distributed cluster communication.
Since version 2.3.6, nodes in the cluster can be assigned as Master or Worker nodes, separating scheduling from execution to prevent excessive load on the Master node and avoid potential issues.
Version 2.3.6 also added a feature to add tag attributes to each node. When submitting a task, tags can be used to select the nodes where the task will run, achieving resource isolation.
The server side of the cluster is divided into Master and Worker nodes. The Master node is responsible for receiving requests, generating logical plans, allocating tasks, etc. (compared to previous versions, it now includes additional Backup nodes, which is a significant improvement for cluster stability).
The Worker node, on the other hand, is responsible only for task execution, which includes data reading and writing.
When submitting a task, you can create a Hazelcast client connection to the cluster for communication or use the REST API for communication.
Server Startup
After getting a general understanding of the cluster architecture, let's look at the specific process.
First, let's examine the Server startup process.
The command to start the Server is:
sh bin/seatunnel-cluster.sh -d -r <node role type>
Looking into this script, you’ll find that it ultimately executes the following command:
java -cp seatunnel-starter.jar org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer <other_java_jvm_config_and_args>
Let’s check the code for starter.seatunnel.SeaTunnelServer
:
public class SeaTunnelServer {
public static void main(String[] args) throws CommandException {
ServerCommandArgs serverCommandArgs =
CommandLineUtils.parse(
args,
new ServerCommandArgs(),
EngineType.SEATUNNEL.getStarterShellName(),
true);
SeaTunnel.run(serverCommandArgs.buildCommand());
}
}
This part uses JCommander to parse user-provided arguments and build and run a command. The serverCommandArgs.buildCommand
returns the class:
public class ServerExecuteCommand implements Command<ServerCommandArgs> {
private final ServerCommandArgs serverCommandArgs;
public ServerExecuteCommand(ServerCommandArgs serverCommandArgs) {
this.serverCommandArgs = serverCommandArgs;
}
@Override
public void execute() {
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
String clusterRole = this.serverCommandArgs.getClusterRole();
if (StringUtils.isNotBlank(clusterRole)) {
if (EngineConfig.ClusterRole.MASTER.toString().equalsIgnoreCase(clusterRole)) {
seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.MASTER);
} else if (EngineConfig.ClusterRole.WORKER.toString().equalsIgnoreCase(clusterRole)) {
seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.WORKER);
// In Hazelcast, lite members will not store IMap data.
seaTunnelConfig.getHazelcastConfig().setLiteMember(true);
} else {
throw new SeaTunnelEngineException("Not supported cluster role: " + clusterRole);
}
} else {
seaTunnelConfig
.getEngineConfig()
.setClusterRole(EngineConfig.ClusterRole.MASTER_AND_WORKER);
}
HazelcastInstanceFactory.newHazelcastInstance(
seaTunnelConfig.getHazelcastConfig(),
Thread.currentThread().getName(),
new SeaTunnelNodeContext(seaTunnelConfig));
}
}
Here, the configuration information is modified based on the role type.
When it is a Worker node, the Hazelcast node type is set to lite member
. In Hazelcast, lite members do not store data.
Then, a Hazelcast
instance is created and passed the SeaTunnelNodeContext
instance and the modified configuration information.
public class SeaTunnelNodeContext extends DefaultNodeContext {
private final SeaTunnelConfig seaTunnelConfig;
public SeaTunnelNodeContext(@NonNull SeaTunnelConfig seaTunnelConfig) {
this.seaTunnelConfig = seaTunnelConfig;
}
@Override
public NodeExtension createNodeExtension(@NonNull Node node) {
return new org.apache.seatunnel.engine.server.NodeExtension(node, seaTunnelConfig);
}
@Override
public Joiner createJoiner(Node node) {
JoinConfig join =
getActiveMemberNetworkConfig(seaTunnelConfig.getHazelcastConfig()).getJoin();
join.verify();
if (node.shouldUseMulticastJoiner(join) && node.multicastService != null) {
super.createJoiner(node);
} else if (join.getTcpIpConfig().isEnabled()) {
log.info("Using LiteNodeDropOutTcpIpJoiner TCP/IP discovery");
return new LiteNodeDropOutTcpIpJoiner(node);
} else if (node.getProperties().getBoolean(DISCOVERY_SPI_ENABLED)
|| isAnyAliasedConfigEnabled(join)
|| join.isAutoDetectionEnabled()) {
super.createJoiner(node);
}
return null;
}
private static boolean isAnyAliasedConfigEnabled(JoinConfig join) {
return !AliasedDiscoveryConfigUtils.createDiscoveryStrategyConfigs(join).isEmpty();
}
private boolean usePublicAddress(JoinConfig join, Node node) {
return node.getProperties().getBoolean(DISCOVERY_SPI_PUBLIC_IP_ENABLED)
|| allUsePublicAddress(
AliasedDiscoveryConfigUtils.aliasedDiscoveryConfigsFrom(join));
}
}
In SeaTunnelNodeContext
, the createNodeExtension
method is overridden to use the engine.server.NodeExtension
class.
The code for this class is:
public class NodeExtension extends DefaultNodeExtension {
private final NodeExtensionCommon extCommon;
public NodeExtension(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelConfig) {
super(node);
extCommon = new NodeExtensionCommon(node, new SeaTunnelServer(seaTunnelConfig));
}
@Override
public void beforeStart() {
// TODO Get Config from Node here
super.beforeStart();
}
@Override
public void afterStart() {
super.afterStart();
extCommon.afterStart();
}
@Override
public void beforeClusterStateChange(
ClusterState currState, ClusterState requestedState, boolean isTransient) {
super.beforeClusterStateChange(currState, requestedState, isTransient);
extCommon.beforeClusterStateChange(requestedState);
}
@Override
public void onClusterStateChange(ClusterState newState, boolean isTransient) {
super.onClusterStateChange(newState, isTransient);
extCommon.onClusterStateChange(newState);
}
@Override
public Map<String, Object> createExtensionServices() {
return extCommon.createExtensionServices();
}
@Override
public TextCommandService createTextCommandService() {
return new TextCommandServiceImpl(node) {
{
register(HTTP_GET, new Log4j2HttpGetCommandProcessor(this));
register(HTTP_POST, new Log4j2HttpPostCommandProcessor(this));
register(HTTP_GET, new RestHttpGetCommandProcessor(this));
register(HTTP_POST, new RestHttpPostCommandProcessor(this));
}
};
}
@Override
public void printNodeInfo() {
extCommon.printNodeInfo(systemLogger);
}
}
In this part, we see that the SeaTunnelServer
class is initialized in the constructor. This class is the core server-side class, and its full class name is org.apache.seatunnel.engine.server.SeaTunnelServer
.
Let's review the code for this class:
public class SeaTunnelServer
implements ManagedService, MembershipAwareService, LiveOperationsTracker {
private static final ILogger LOGGER = Logger.getLogger(SeaTunnelServer.class);
public static final String SERVICE_NAME = "st:impl:seaTunnelServer";
private NodeEngineImpl nodeEngine;
private final LiveOperationRegistry liveOperationRegistry;
private volatile SlotService slotService;
private TaskExecutionService taskExecutionService;
private ClassLoaderService classLoaderService;
private CoordinatorService coordinatorService;
private ScheduledExecutorService monitorService;
@Getter private SeaTunnelHealthMonitor seaTunnelHealthMonitor;
private final SeaTunnelConfig seaTunnelConfig;
private volatile boolean isRunning = true;
public SeaTunnelServer(@NonNull SeaTunnelConfig seaTunnelConfig) {
this.liveOperationRegistry = new LiveOperationRegistry();
this.seaTunnelConfig = seaTunnelConfig;
LOGGER.info("SeaTunnel server start...");
}
@Override
public void init(NodeEngine engine, Properties hzProperties) {
...
if (EngineConfig.ClusterRole.MASTER_AND_WORKER.ordinal()
== seaTunnelConfig.getEngineConfig().getClusterRole().ordinal()) {
startWorker();
startMaster();
} else if (EngineConfig.ClusterRole.WORKER.ordinal()
== seaTunnelConfig.getEngineConfig().getClusterRole().ordinal()) {
startWorker();
} else {
startMaster();
}
...
}
....
}
This class is the core code on the SeaTunnel Server side, and it starts relevant components based on the role of the node.
A brief summary of the SeaTunnel process:
SeaTunnel utilizes Hazelcast's foundational capabilities to implement cluster networking and invoke core startup code.
For those interested in a deeper understanding of this area, it's worth checking out Hazelcast's related content. Here is a summary of the invocation path:
Classes loaded in sequence:
starter.SeaTunnelServer
ServerExecuteCommand
SeaTunnelNodeContext
NodeExtension
server.SeaTunnelServer
Next, let's look in detail at the components created in the Master and Worker nodes.
Worker Node
private void startWorker() {
taskExecutionService =
new TaskExecutionService(
classLoaderService, nodeEngine, nodeEngine.getProperties());
nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService);
taskExecutionService.start();
getSlotService();
}
public SlotService getSlotService() {
if (slotService == null) {
synchronized (this) {
if (slotService == null) {
SlotService service =
new DefaultSlotService(
nodeEngine,
taskExecutionService,
seaTunnelConfig.getEngineConfig().getSlotServiceConfig());
service.init();
slotService = service;
}
}
}
return slotService;
}
In the startWorker
method, two components are initialized: taskExecutionService
and slotService
. Both are related to task execution.
SlotService
First, let's look at the initialization of SlotService
.
@Override
public void init() {
initStatus = true;
slotServiceSequence = UUID.randomUUID().toString();
contexts = new ConcurrentHashMap<>();
assignedSlots = new ConcurrentHashMap<>();
unassignedSlots = new ConcurrentHashMap<>();
unassignedResource = new AtomicReference<>(new ResourceProfile());
assignedResource = new AtomicReference<>(new ResourceProfile());
scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(
r ->
new Thread(
r,
String.format(
"hz.%s.seaTunnel.slotService.thread",
nodeEngine.getHazelcastInstance().getName())));
if (!config.isDynamicSlot()) {
initFixedSlots();
}
unassignedResource.set(getNodeResource());
scheduledExecutorService.scheduleAtFixedRate(
() -> {
try {
LOGGER.fine(
"start send heartbeat to resource manager, this address: "
+ nodeEngine.getClusterService().getThisAddress());
sendToMaster(new WorkerHeartbeatOperation(getWorkerProfile())).join();
} catch (Exception e) {
LOGGER.warning(
"failed send heartbeat to resource manager, will retry later. this address: "
+ nodeEngine.getClusterService().getThisAddress());
}
},
0,
DEFAULT_HEARTBEAT_TIMEOUT,
TimeUnit.MILLISECONDS);
}
In SeaTunnel, there is a concept of dynamic Slots. If set to true
, each node does not have a fixed number of Slots and can accept any number of tasks. If set to a fixed number of Slots, the node can only accept the number of tasks equal to the fixed Slots.
During initialization, the number of Slots is set based on whether dynamic Slots are enabled or not.
private void initFixedSlots() {
long maxMemory = Runtime.getRuntime().maxMemory();
for (int i = 0; i < config.getSlotNum(); i++) {
unassignedSlots.put(
i,
new SlotProfile(
nodeEngine.getThisAddress(),
i,
new ResourceProfile(
CPU.of(0), Memory.of(maxMemory / config.getSlotNum())),
slotServiceSequence));
}
}
It can also be seen that a thread is started to periodically send heartbeats to the Master node. The heartbeat information includes the current node's information, such as the number of assigned and unassigned Slots. The Worker node updates this information to the Master node periodically through heartbeats.
@Override
public synchronized WorkerProfile getWorkerProfile() {
WorkerProfile workerProfile = new WorkerProfile(nodeEngine.getThisAddress());
workerProfile.setProfile(getNodeResource());
workerProfile.setAssignedSlots(assignedSlots.values().toArray(new SlotProfile[0]));
workerProfile.setUnassignedSlots(unassignedSlots.values().toArray(new SlotProfile[0]));
workerProfile.setUnassignedResource(unassignedResource.get());
workerProfile.setAttributes(nodeEngine.getLocalMember().getAttributes());
workerProfile.setDynamicSlot(config.isDynamicSlot());
return workerProfile;
}
private ResourceProfile getNodeResource() {
return new ResourceProfile(CPU.of(0), Memory.of(Runtime.getRuntime().maxMemory()));
}
TaskExecutionService
This component is related to task submission. Here, we briefly look at the related code and will delve into it further later.
When the Worker node initializes, it creates a TaskExecutionService
object and calls its start
method.
private final ExecutorService executorService =
newCachedThreadPool(new BlockingTaskThreadFactory());
public TaskExecutionService(
ClassLoaderService classLoaderService,
NodeEngineImpl nodeEngine,
HazelcastProperties properties) {
// Load configuration
seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
this.nodeEngine = nodeEngine;
this.classLoaderService = classLoaderService;
this.logger = nodeEngine.getLoggingService().getLogger(TaskExecutionService.class);
// Metrics related
MetricsRegistry registry = nodeEngine.getMetricsRegistry();
MetricDescriptor descriptor =
registry.newMetricDescriptor()
.withTag(MetricTags.SERVICE, this.getClass().getSimpleName());
registry.registerStaticMetrics(descriptor, this);
// Scheduled task to update metrics in IMAP
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleAtFixedRate(
this::updateMetricsContextInImap,
0,
seaTunnelConfig.getEngineConfig().getJobMetricsBackupInterval(),
TimeUnit.SECONDS);
serverConnectorPackageClient =
new ServerConnectorPackageClient(nodeEngine, seaTunnelConfig);
eventBuffer = new ArrayBlockingQueue<>(2048);
// Event forwarding service
eventForwardService =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("event-forwarder-%d").build());
eventForwardService.submit(
() -> {
List<Event> events = new ArrayList<>();
RetryUtils.RetryMaterial retryMaterial =
new RetryUtils.RetryMaterial(2, true, e -> true);
while (!Thread.currentThread().isInterrupted()) {
try {
events.clear();
Event first = eventBuffer.take();
events.add(first);
eventBuffer.drainTo(events, 500);
JobEventReportOperation operation = new JobEventReportOperation(events);
RetryUtils.retryWithException(
() ->
NodeEngineUtil.sendOperationToMasterNode(
nodeEngine, operation)
.join(),
retryMaterial);
logger.fine("Event forward success, events " + events.size());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info("Event forward thread interrupted");
} catch (Throwable t) {
logger.warning(
"Event forward failed, discard events " + events.size(), t);
}
}
});
}
public void start() {
runBusWorkSupplier.runNewBusWork(false);
}
In this class, a thread pool is created as a member variable. A scheduled task is created to update job status in IMAP, and a task is created to send Event information to the Master node. The Master node then sends these Events to external services.
Master Node
private void startMaster() {
coordinatorService =
new CoordinatorService(nodeEngine, this, seaTunnelConfig.getEngineConfig());
monitorService = Executors.newSingleThreadScheduledExecutor();
monitorService.scheduleAtFixedRate(
this::printExecutionInfo,
0,
seaTunnelConfig.getEngineConfig().getPrintExecutionInfoInterval(),
TimeUnit.SECONDS);
}
We can see that two components are started in the Master node: the coordinator component and the monitoring component.
The monitoring component's task is straightforward: it periodically prints cluster information.
CoordinatorService
public CoordinatorService(
@NonNull NodeEngineImpl nodeEngine,
@NonNull SeaTunnelServer seaTunnelServer,
EngineConfig engineConfig) {
this.nodeEngine = nodeEngine;
this.logger = nodeEngine.getLogger(getClass());
this.executorService =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("seatunnel-coordinator-service-%d")
.build());
this.seaTunnelServer = seaTunnelServer;
this.engineConfig = engineConfig;
masterActiveListener = Executors.newSingleThreadScheduledExecutor();
masterActiveListener.scheduleAtFixedRate(
this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);
}
private void checkNewActiveMaster() {
try {
if (!isActive && this.seaTunnelServer.isMasterNode()) {
logger.info(
"This node become a new active master node, begin init coordinator service");
if (this.executorService.isShutdown()) {
this.executorService =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("seatunnel-coordinator-service-%d")
.build());
}
initCoordinatorService();
isActive = true;
} else if (isActive && !this.seaTunnelServer.isMasterNode()) {
isActive = false;
logger.info(
"This node become leave active master node, begin clear coordinator service");
clearCoordinatorService();
}
} catch (Exception e) {
isActive = false;
logger.severe(ExceptionUtils.getMessage(e));
throw new SeaTunnelEngineException("check new active master error, stop loop", e);
}
}
During initialization, a thread is started to periodically check if the current node is a Master node. If the current node is not a Master but becomes one in the cluster, it will call initCoordinatorService()
to initialize its state and set the status to True
.
If the node is marked as a Master but is no longer a Master in the cluster, it will perform a state cleanup.
private void initCoordinatorService() {
// Retrieve distributed IMAP from Hazelcast
runningJobInfoIMap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_INFO);
runningJobStateIMap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
runningJobStateTimestampsIMap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_STATE_TIMESTAMPS);
ownedSlotProfilesIMap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES);
metricsImap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
// Initialize JobHistoryService
jobHistoryService =
new JobHistoryService(
runningJobStateIMap,
logger,
runningJobMasterMap,
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE),
nodeEngine
.getHazelcastInstance()
.getMap(Constant.IMAP_FINISHED_JOB_METRICS),
nodeEngine
.getHazelcastInstance()
.getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO),
engineConfig.getHistoryJobExpireMinutes());
// Initialize EventProcessor for sending events to other services
eventProcessor =
createJobEventProcessor(
engineConfig.getEventReportHttpApi(),
engineConfig.getEventReportHttpHeaders(),
nodeEngine);
// If the user has configured the connector package service, create it on the master node.
ConnectorJarStorageConfig connectorJarStorageConfig =
engineConfig.getConnectorJarStorageConfig();
if (connectorJarStorageConfig.getEnable()) {
connectorPackageService = new ConnectorPackageService(seaTunnelServer);
}
// After cluster recovery, attempt to restore previous historical tasks
restoreAllJobFromMasterNodeSwitchFuture =
new PassiveCompletableFuture(
CompletableFuture.runAsync(
this::restoreAllRunningJobFromMasterNodeSwitch, executorService));
}
In CoordinatorService
, distributed maps (IMAPs), which are a data structure provided by Hazelcast, are pulled. This structure ensures data consistency across the cluster and is used in SeaTunnel to store task information, slot information, etc.
An EventProcessor
is also created here. This class is used to send event notifications to other services. For example, if a task fails, it can send a message to a configured endpoint to achieve event-driven notifications.
Lastly, since the node startup could be due to a cluster crash or a node switch, historical running tasks need to be restored. It will attempt to restore these tasks by fetching the list of previously running tasks from the IMAP.
The IMAP data can be persisted to file systems like HDFS, allowing task states to be retrieved and restored even after a complete system reboot.
Components running within CoordinatorService
include:
-
executorService
(available on all nodes that can be elected as Master) -
jobHistoryService
(runs on the Master node) -
eventProcessor
(runs on the Master node)
On both Master and standby nodes:
- Periodically check if the node is a Master; if it is, perform the corresponding state transition.
On the Master node:
- Periodically print cluster state information.
- Start the forwarding service to relay events to external services.
On Worker nodes, after startup:
- Periodically report state information to the Master node.
- Update task information in the IMAP.
- Forward events generated by the Worker to the Master node to be pushed to external services.
At this point, all server-side service components have been successfully started. This concludes the article!
Top comments (0)