Chen Debra
Worker Module Source Code Analysis: How DolphinScheduler Achieves Billion-Level Task Scheduling

The Worker module of Apache DolphinScheduler is one of the core components of its distributed scheduling system, responsible for task execution, resource management, and dynamic cluster scheduling. This article will analyze its source code to reveal its design philosophy and implementation details.

1. Worker Receives Master RPC Request Architecture


The Worker's Netty service exposes interfaces that the Master invokes via JDK dynamic proxies. For details on the RPC mechanism itself, please refer to the earlier analysis of the DolphinScheduler alert module.

Interface Definition:

@RpcService
public interface ITaskInstanceOperator {

    @RpcMethod
    TaskInstanceDispatchResponse dispatchTask(TaskInstanceDispatchRequest taskInstanceDispatchRequest);

    @RpcMethod
    TaskInstanceKillResponse killTask(TaskInstanceKillRequest taskInstanceKillRequest);

    @RpcMethod
    TaskInstancePauseResponse pauseTask(TaskInstancePauseRequest taskPauseRequest);

    @RpcMethod
    UpdateWorkflowHostResponse updateWorkflowInstanceHost(UpdateWorkflowHostRequest updateWorkflowHostRequest);
}

Interfaces annotated with @RpcService and methods annotated with @RpcMethod are injected into the Worker’s Netty handler and dynamically proxied by the Master.
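
To make "dynamically proxied by the Master" concrete, here is a minimal, self-contained sketch of the underlying JDK-proxy idea. The Transport interface and the helper below are illustrative stand-ins, not the actual classes from the dolphinscheduler-extract module.

import java.lang.reflect.Proxy;

public class RpcProxySketch {

    // Illustrative stand-in for the real Netty-based client transport.
    interface Transport {
        Object send(String workerHost, String serviceName, String methodName, Object[] args);
    }

    // Builds a client-side proxy: every call on the returned interface is turned into an RPC
    // payload and sent to the Worker, whose Netty handler dispatches it to the @RpcService
    // implementation registered on that side.
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> serviceInterface, String workerHost, Transport transport) {
        return (T) Proxy.newProxyInstance(
                serviceInterface.getClassLoader(),
                new Class<?>[]{serviceInterface},
                (proxy, method, args) ->
                        transport.send(workerHost, serviceInterface.getName(), method.getName(), args));
    }
}

With such a proxy, Master-side code can simply call operator.dispatchTask(request) on an ITaskInstanceOperator instance without ever holding a concrete implementation.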

2. Task Dispatch


2.1 WorkerConfig

WorkerConfig binds the configuration keys prefixed with "worker" from application.yaml in the Worker module.
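
A hedged sketch of what that binding typically looks like, assuming the standard Spring Boot @ConfigurationProperties mechanism; the fields shown are examples, so check WorkerConfig itself for the authoritative list.

// Sketch only: Spring Boot maps every key under `worker:` in application.yaml onto this bean.
@ConfigurationProperties(prefix = "worker")
public class WorkerConfigSketch {

    private int execThreads = 100;     // worker.exec-threads: size of the task executor thread pool
    private int listenPort = 1234;     // worker.listen-port: port of the Worker's Netty RPC server
    private String workerAddress;      // derived host:port, used as the host in TaskExecutionContext

    // getters and setters omitted
}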

2.2 WorkerTaskExecutorFactoryBuilder

This is a factory builder for task executors: it wraps DefaultWorkerTaskExecutorFactory, which in turn creates DefaultWorkerTaskExecutor. DefaultWorkerTaskExecutor extends WorkerTaskExecutor, which is itself a Thread.

2.3 WorkerTaskExecutorThreadPool

This is essentially a wrapper around a fixed-size thread pool.
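
A hedged simplification of the idea (not the actual class): a fixed-size pool plus a capacity check, so submitWorkerTaskExecutor can report failure, which the dispatch operator in section 2.4 turns into the "WorkerManagerThread is full" response.

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical simplification, not the real WorkerTaskExecutorThreadPool.
public class WorkerThreadPoolSketch {

    private final int queueCapacity;
    private final ThreadPoolExecutor pool;

    public WorkerThreadPoolSketch(int execThreads, int queueCapacity) {
        this.queueCapacity = queueCapacity;
        this.pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(execThreads);
    }

    // Returns false instead of queueing without bound when the Worker is saturated.
    public synchronized boolean submitWorkerTaskExecutor(Runnable workerTaskExecutor) {
        if (pool.getQueue().size() >= queueCapacity) {
            return false;
        }
        pool.execute(workerTaskExecutor);
        return true;
    }
}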

2.4 Starting from the Operator

public TaskInstanceDispatchResponse operate(TaskInstanceDispatchRequest taskInstanceDispatchRequest) {
    log.info("Receive TaskInstanceDispatchRequest: {}", taskInstanceDispatchRequest);
    TaskExecutionContext taskExecutionContext = taskInstanceDispatchRequest.getTaskExecutionContext();
    try {
        taskExecutionContext.setHost(workerConfig.getWorkerAddress());
        taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext));
        LogUtils.setWorkflowAndTaskInstanceIDMDC(
                taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId());

        if (!ServerLifeCycleManager.isRunning()) {
            log.error("Server is not running. Reject task: {}", taskExecutionContext.getProcessInstanceId());
            return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
                    "Server is not running");
        }

        TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());

        WorkerTaskExecutor workerTaskExecutor = workerTaskExecutorFactoryBuilder
                .createWorkerTaskExecutorFactory(taskExecutionContext)
                .createWorkerTaskExecutor();

        if (!workerTaskExecutorThreadPool.submitWorkerTaskExecutor(workerTaskExecutor)) {
            log.info("Submit task: {} to wait queue failed", taskExecutionContext.getTaskName());
            return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
                    "WorkerManagerThread is full");
        } else {
            log.info("Submit task: {} to wait queue success", taskExecutionContext.getTaskName());
            return TaskInstanceDispatchResponse.success(taskExecutionContext.getTaskInstanceId());
        }
    } finally {
        LogUtils.removeWorkflowAndTaskInstanceIdMDC();
    }
}

Analysis of LogUtils.getTaskInstanceLogFullPath(taskExecutionContext)

org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils#getTaskInstanceLogFullPath: gets the full path of the task instance log.

/**
     * Get task instance log full path.
     *
     * @param taskExecutionContext task execution context.
     * @return task instance log full path.
     */
    public static String getTaskInstanceLogFullPath(TaskExecutionContext taskExecutionContext) {
        return getTaskInstanceLogFullPath(
                DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
                taskExecutionContext.getProcessDefineCode(),
                taskExecutionContext.getProcessDefineVersion(),
                taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId());
    }

org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils#getTaskInstanceLogFullPath (overloaded): assembles the full task instance log path from its components.

/**
     * todo: Remove the submitTime parameter?
     * The task instance log full path, the path is like: {log.base}/{taskSubmitTime}/{workflowDefinitionCode}/{workflowDefinitionVersion}/{workflowInstanceId}/{taskInstanceId}.log
     *
     * @param taskFirstSubmitTime       task first submit time
     * @param workflowDefinitionCode    workflow definition code
     * @param workflowDefinitionVersion workflow definition version
     * @param workflowInstanceId        workflow instance id
     * @param taskInstanceId            task instance id.
     * @return task instance log full path.
     */
    public static String getTaskInstanceLogFullPath(Date taskFirstSubmitTime,
                                                    Long workflowDefinitionCode,
                                                    int workflowDefinitionVersion,
                                                    int workflowInstanceId,
                                                    int taskInstanceId) {
        if (TASK_INSTANCE_LOG_BASE_PATH == null) {
            throw new IllegalArgumentException(
                    "Cannot find the task instance log base path, please check your logback.xml file");
        }
        final String taskLogFileName = Paths.get(
                String.valueOf(workflowDefinitionCode),
                String.valueOf(workflowDefinitionVersion),
                String.valueOf(workflowInstanceId),
                String.format("%s.log", taskInstanceId)).toString();
        return TASK_INSTANCE_LOG_BASE_PATH
                .resolve(DateUtils.format(taskFirstSubmitTime, DateConstants.YYYYMMDD, null))
                .resolve(taskLogFileName)
                .toString();
    }

org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils#getTaskInstanceLogBasePath: reads the TASKLOGFILE appender configuration from logback-spring.xml and returns the base path for task instance logs; in practice this is the logs directory under the worker's root directory.

/**
     * Get task instance log base absolute path, this is defined in logback.xml
     *
     * @return
     */
    public static Path getTaskInstanceLogBasePath() {
        return Optional.of(LoggerFactory.getILoggerFactory())
                .map(e -> (AppenderAttachable<ILoggingEvent>) (e.getLogger("ROOT")))
                .map(e -> (SiftingAppender) (e.getAppender("TASKLOGFILE")))
                .map(e -> ((TaskLogDiscriminator) (e.getDiscriminator())))
                .map(TaskLogDiscriminator::getLogBase)
                .map(e -> Paths.get(e).toAbsolutePath())
                .orElse(null);
    }

The Worker's logback-spring.xml:

<configuration scan="true" scanPeriod="120 seconds">
  <property name="log.base" value="logs"/>
  ...
  <appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
          <filter class="org.apache.dolphinscheduler.plugin.task.api.log.TaskLogFilter"/>
          <Discriminator class="org.apache.dolphinscheduler.plugin.task.api.log.TaskLogDiscriminator">
              <key>taskInstanceLogFullPath</key>
              <logBase>${log.base}</logBase>
          </Discriminator>
          <sift>
              <appender name="FILE-${taskInstanceLogFullPath}" class="ch.qos.logback.core.FileAppender">
                  <file>${taskInstanceLogFullPath}</file>
                  <encoder>
                      <pattern>
                          [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} - %message%n
                      </pattern>
                      <charset>UTF-8</charset>
                  </encoder>
                  <append>true</append>
              </appender>
          </sift>
      </appender>
  ...
  <root level="INFO">
      <appender-ref ref="STDOUT"/>
      <appender-ref ref="TASKLOGFILE"/>
  </root>

</configuration>

The final log path then looks like this:

/opt/dolphinscheduler/worker-server/logs/20240615/13929490938784/1/1815/1202.log

2.5 Explanation of DefaultWorkerTaskExecutor

TaskInstanceDispatchOperationFunction#operate

// TODO Create a WorkerTaskExecutor using WorkerTaskExecutorFactoryBuilder
WorkerTaskExecutor workerTaskExecutor = workerTaskExecutorFactoryBuilder
        .createWorkerTaskExecutorFactory(taskExecutionContext)
        .createWorkerTaskExecutor();

// TODO Directly submit the task
if (!workerTaskExecutorThreadPool.submitWorkerTaskExecutor(workerTaskExecutor)) {
    log.info("Submit task: {} to wait queue failed", taskExecutionContext.getTaskName());
    return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
            "WorkerManagerThread is full");
} else {
    log.info("Submit task: {} to wait queue success", taskExecutionContext.getTaskName());
    return TaskInstanceDispatchResponse.success(taskExecutionContext.getTaskInstanceId());
}

Directly submits the task using:

workerTaskExecutorThreadPool.submitWorkerTaskExecutor(workerTaskExecutor)

WorkerTaskExecutor is a thread

Since it is a thread, let's check its run method:

public void run() {
    try {
        // TODO Set process and task instance ID in MDC (similar to ThreadLocal usage)
        LogUtils.setWorkflowAndTaskInstanceIDMDC(
                taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId());

        // TODO Set task log path in MDC
        LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());

        // TODO Print task header
        TaskInstanceLogHeader.printInitializeTaskContextHeader();

        // TODO Initialize the task (set start time and taskAppId)
        initializeTask();

        // TODO If in DRY_RUN mode, skip execution and mark success
        if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
            taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
            taskExecutionContext.setEndTime(System.currentTimeMillis());
            WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());

            // TODO Send result message
            workerMessageSender.sendMessageWithRetry(taskExecutionContext,
                    ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
            log.info("The current execute mode is dry run, stopping further execution.");
            return;
        }

        // TODO Print task plugin header
        TaskInstanceLogHeader.printLoadTaskInstancePluginHeader();

        // TODO Pre-execution steps
        beforeExecute();

        // TODO Create callback function
        TaskCallBack taskCallBack = TaskCallbackImpl.builder()
                .workerMessageSender(workerMessageSender)
                .taskExecutionContext(taskExecutionContext)
                .build();

        TaskInstanceLogHeader.printExecuteTaskHeader();

        // TODO Execute the task
        executeTask(taskCallBack);

        TaskInstanceLogHeader.printFinalizeTaskHeader();

        // TODO Post-execution cleanup
        afterExecute();

        closeLogAppender();
    } catch (Throwable ex) {
        log.error("Task execution failed due to an exception", ex);
        afterThrowing(ex);
        closeLogAppender();
    } finally {
        LogUtils.removeWorkflowAndTaskInstanceIdMDC();
        LogUtils.removeTaskInstanceLogFullPathMDC();
    }
}

2.5.1 Dry Run Mode

If the task is in dry run mode, it is marked as successful without execution:

if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
    taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
    taskExecutionContext.setEndTime(System.currentTimeMillis());
    WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
    workerMessageSender.sendMessageWithRetry(taskExecutionContext,
            ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
    log.info("The current execute mode is dry run, stopping further execution.");
    return;
}

2.5.2 Pre-Execution Setup (beforeExecute())

This phase sets up the task execution environment, such as reporting to the Master, creating a working directory, downloading necessary resources, and initializing the task.

protected void beforeExecute() {
    taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.RUNNING_EXECUTION);
    workerMessageSender.sendMessageWithRetry(taskExecutionContext,
            ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.RUNNING);
    log.info("Sent task status RUNNING_EXECUTION to master: {}", taskExecutionContext.getWorkflowInstanceHost());

    TaskExecutionContextUtils.createTaskInstanceWorkingDirectory(taskExecutionContext);
    log.info("WorkflowInstanceExecDir: {} check successfully", taskExecutionContext.getExecutePath());

    TaskChannel taskChannel = Optional.ofNullable(taskPluginManager.getTaskChannelMap()
            .get(taskExecutionContext.getTaskType()))
            .orElseThrow(() -> new TaskPluginException("Task plugin not found"));

    log.info("Create TaskChannel: {} successfully", taskChannel.getClass().getName());

    ResourceContext resourceContext = TaskExecutionContextUtils.downloadResourcesIfNeeded(taskExecutionContext);
    taskExecutionContext.setResourceContext(resourceContext);

    task = taskChannel.createTask(taskExecutionContext);
    log.info("Task plugin instance: {} created successfully", taskExecutionContext.getTaskType());

    task.init();
    log.info("Successfully initialized task plugin instance.");

    task.getParameters().setVarPool(taskExecutionContext.getVarPool());
    log.info("Set taskVarPool: {} successfully", taskExecutionContext.getVarPool());
}

2.5.3 Task Execution (executeTask())

TaskCallBack taskCallBack = TaskCallbackImpl.builder()
        .workerMessageSender(workerMessageSender)
        .taskExecutionContext(taskExecutionContext)
        .build();

TaskInstanceLogHeader.printExecuteTaskHeader();
executeTask(taskCallBack);

Execution involves replacing parameters, setting up the shell script, and running it in a process:

process = iShellInterceptor.execute();
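
What iShellInterceptor.execute() boils down to is launching the rendered script in an operating-system process. The snippet below is an illustrative simplification using plain ProcessBuilder; the real interceptor additionally handles tenant switching (sudo), sourcing environment files, and Kubernetes execution.

// Illustrative sketch only, not the interceptor implementation.
File executeDir = new File(taskExecutionContext.getExecutePath());
File script = new File(executeDir, taskExecutionContext.getTaskAppId() + ".sh");
// ... the parameter-substituted shell script has already been written to `script`

ProcessBuilder processBuilder = new ProcessBuilder("sh", script.getAbsolutePath());
processBuilder.directory(executeDir);
processBuilder.redirectErrorStream(true);   // merge stderr into stdout so one reader sees all output
Process process = processBuilder.start();   // this Process is what parseProcessOutput(...) reads from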

Shell execution logs are parsed and monitored:

parseProcessOutput(this.process);

2.5.4 Post-Execution Cleanup (afterExecute())

protected void afterExecute() {
    sendAlertIfNeeded();
    sendTaskResult();
    WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
    clearTaskExecPathIfNeeded();
}

Key Logic Explained

1. Result Log Printing

protected LinkedBlockingQueue<String> logBuffer;

public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
                               TaskExecutionContext taskRequest) {
    this.logHandler = logHandler;
    this.taskRequest = taskRequest;
    this.logBuffer = new LinkedBlockingQueue<>();
    this.logBuffer.add(EMPTY_STRING);

    if (this.taskRequest != null) {
        // set logBufferEnable=true if the task uses logHandler and logBuffer to buffer log messages
        this.taskRequest.setLogBufferEnable(true);
    }
}

The logBuffer temporarily stores log messages, which are then consumed by the parseProcessOutputExecutorService thread.


Log Production Side

while ((line = inReader.readLine()) != null) {
    // TODO Log buffer
    logBuffer.add(line);
    // TODO Parses taskOutputParams, such as parsing `echo '${setValue(output=1)}'`
    // Essentially extracting the string `${setValue(output=1)}`
    taskOutputParameterParser.appendParseLog(line);
}

Here, the log messages are added to logBuffer and processed for extracting specific output parameters.


Log Consumption Side

this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext);

public void logHandle(LinkedBlockingQueue<String> logs) {
    StringJoiner joiner = new StringJoiner("\n\t");
    while (!logs.isEmpty()) {
        joiner.add(logs.poll());
    }
    log.info(" -> {}", joiner);
}

while (logBuffer.size() > 1 || !processLogOutputIsSuccess || !podLogOutputIsFinished) {
    if (logBuffer.size() > 1) {
        logHandler.accept(logBuffer);
        logBuffer.clear();
        logBuffer.add(EMPTY_STRING);
    } else {
        // TODO If there is no log output, wait for 1 second by default
        Thread.sleep(TaskConstants.DEFAULT_LOG_FLUSH_INTERVAL);
    }
}
  1. Logs are continuously polled from logBuffer and formatted before being output.
  2. If logBuffer has more than one line, it is consumed and cleared, with an empty string added back.
  3. If no new log messages are available, the system waits for one second before checking again.

Shell Task Log Output Example

Since shell task logs are prefixed with ->, a sample log output looks like this:

[INFO] 2024-06-24 09:35:44.678 +0800 -  -> 
    .
    ├── 1893_1321.sh
    └── input_dir
        ├── test1
        │   └── text.txt
        └── test2
            └── text.txt

    3 directories, 3 files
    test1 message
    test2 message

This ensures that all shell execution logs are formatted consistently for better readability and debugging.

2. Parsing the Variable Pool

while ((line = inReader.readLine()) != null) {
    // TODO Log buffer
    logBuffer.add(line);
    // TODO Parses taskOutputParams, such as parsing `echo '${setValue(output=1)}'`
    // Essentially extracting the string `${setValue(output=1)}`
    taskOutputParameterParser.appendParseLog(line);
}

org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser#appendParseLog

public void appendParseLog(String logLine) {
    if (logLine == null) {
        return;
    }

    // TODO Initially, this part won't execute
    if (currentTaskOutputParam != null) {
        if (currentTaskOutputParam.size() > maxOneParameterRows
                || currentTaskOutputParamLength > maxOneParameterLength) {
            log.warn(
                    "The output param expression '{}' is too long, the max rows is {}, max length is {}, will skip this param",
                    String.join("\n", currentTaskOutputParam), maxOneParameterLength, maxOneParameterRows);
            currentTaskOutputParam = null;
            currentTaskOutputParamLength = 0;
            return;
        }
        // Continue parsing the rest of the line
        int i = logLine.indexOf(")}");
        if (i == -1) {
            // The end of the variable pool is not found
            currentTaskOutputParam.add(logLine);
            currentTaskOutputParamLength += logLine.length();
        } else {
            // The end of the variable pool is found
            currentTaskOutputParam.add(logLine.substring(0, i + 2));
            Pair<String, String> keyValue = parseOutputParam(String.join("\n", currentTaskOutputParam));
            if (keyValue.getKey() != null && keyValue.getValue() != null) {
                // TODO Once parsed, store it in `taskOutputParams`
                taskOutputParams.put(keyValue.getKey(), keyValue.getValue());
            }
            currentTaskOutputParam = null;
            currentTaskOutputParamLength = 0;
            // Continue parsing the remaining part of the line
            if (i + 2 != logLine.length()) {
                appendParseLog(logLine.substring(i + 2));
            }
        }
        return;
    }

    int indexOfVarPoolBegin = logLine.indexOf("${setValue(");
    if (indexOfVarPoolBegin == -1) {
        indexOfVarPoolBegin = logLine.indexOf("#{setValue(");
    }
    if (indexOfVarPoolBegin == -1) {
        return;
    }
    currentTaskOutputParam = new ArrayList<>();
    appendParseLog(logLine.substring(indexOfVarPoolBegin));
}

Once parsing is complete, the extracted values are stored in taskOutputParams.
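
For intuition, here is what feeding a few shell output lines into the parser would yield. This is a hedged usage sketch; the getter name is assumed from the taskOutputParams field used above.

TaskOutputParameterParser parser = new TaskOutputParameterParser();
parser.appendParseLog("starting job ...");               // no ${setValue(...)} marker, ignored
parser.appendParseLog("${setValue(output=123)}");        // begin and end markers on one line
parser.appendParseLog("#{setValue(region=eu-west-1)}");  // the alternative #{setValue(...)} form
Map<String, String> taskOutputParams = parser.getTaskOutputParams();
// taskOutputParams => {output=123, region=eu-west-1}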


Updating Process ID (Reporting to Master)

// Update process ID before waiting for execution to complete
if (null != taskCallBack) {
    // TODO Update task instance information
    taskCallBack.updateTaskInstanceInfo(processId);
}

Timeout Handling

long remainTime = getRemainTime();

private long getRemainTime() {
    long usedTime = (System.currentTimeMillis() - taskRequest.getStartTime()) / 1000;
    long remainTime = taskRequest.getTaskTimeout() - usedTime;

    if (remainTime < 0) {
        throw new RuntimeException("task execution time out");
    }

    return remainTime;
}

......

// Waiting for execution to finish
// TODO This is essentially a timeout wait; if no timeout is set, it will wait indefinitely
boolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS);

If execution time exceeds the allowed limit, an exception is thrown.


Handling Exit Codes

// TODO Set exit code
// If SHELL task exits normally
if (status && kubernetesStatus.isSuccess()) {
    // SHELL task exit status
    result.setExitStatusCode(this.process.exitValue());
} else {
    log.error("Process failed, task timeout value: {}, preparing to kill...",
            taskRequest.getTaskTimeout());
    result.setExitStatusCode(EXIT_CODE_FAILURE);
    cancelApplication();
}
int exitCode = this.process.exitValue();
String exitLogMessage = EXIT_CODE_KILL == exitCode ? "Process has been killed." : "Process has exited.";
log.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
        exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
  1. If the process finishes successfully and Kubernetes reports success, the exit status is stored.
  2. Otherwise, the process is terminated and marked as failed.

Processing Execution Results and Output Parameters

// TODO Store exit status code
setExitStatusCode(commandExecuteResult.getExitStatusCode());
// TODO Store process ID
setProcessId(commandExecuteResult.getProcessId());
// TODO `shellCommandExecutor.getTaskOutputParams()` returns values such as `output -> 123`
shellParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());

org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters#dealOutParam

public void dealOutParam(Map<String, String> taskOutputParams) {
    // TODO If `localParams` is empty, output parameters will not be processed
    if (CollectionUtils.isEmpty(localParams)) {
        return;
    }

    // TODO Filter out parameters where `localParams` is set to `OUT`
    List<Property> outProperty = getOutProperty(localParams);
    if (CollectionUtils.isEmpty(outProperty)) {
        return;
    }

    // TODO If `taskOutputParams` is empty, store output parameters in `varPool`
    if (MapUtils.isEmpty(taskOutputParams)) {
        outProperty.forEach(this::addPropertyToValPool);
        return;
    }

    // TODO Match `outProperty` with `taskOutputParams` keys and update values accordingly
    // TODO Finally, store in the variable pool
    for (Property info : outProperty) {
        String propValue = taskOutputParams.get(info.getProp());
        if (StringUtils.isNotEmpty(propValue)) {
            info.setValue(propValue);
            addPropertyToValPool(info);
        } else {
            log.warn("Cannot find the output parameter {} in the task output parameters", info.getProp());
        }
    }
}
  1. Filters Output Parameters: Extracts parameters from localParams that are marked as OUT.
  2. Stores in Variable Pool: If taskOutputParams is empty, the extracted values are placed in varPool.
  3. Updates Values: If taskOutputParams contains matching keys, it updates outProperty accordingly.

Updating the Variable Pool and Reporting to Master

At this stage, the system matches keys in outProperty and taskOutputParams, updating the corresponding values in taskOutputParams. These values are then stored in the TaskInstance variable pool before being reported to the Master.
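
As a concrete illustration, consider a shell task that declares an OUT parameter named output in localParams and echoes ${setValue(output=123)}. The sketch below is hedged: the Property constructor follows the same signature used in uploadOutputFiles further down, and setLocalParams is assumed to exist on the task's parameters object.

// Hedged illustration of dealOutParam's effect.
List<Property> localParams = new ArrayList<>();
localParams.add(new Property("output", Direct.OUT, DataType.VARCHAR, ""));
shellParameters.setLocalParams(localParams);

Map<String, String> taskOutputParams = new HashMap<>();
taskOutputParams.put("output", "123");      // parsed from `echo '${setValue(output=123)}'`

shellParameters.dealOutParam(taskOutputParams);
// The varPool now contains Property(prop=output, direct=OUT, type=VARCHAR, value=123),
// which sendTaskResult() serializes to JSON and reports to the Master.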


2.5.5 Post-Execution Cleanup in Detail (afterExecute())

protected void afterExecute() throws TaskException {
    if (task == null) {
        throw new TaskException("The current task instance is null");
    }
    // TODO Determine whether to send an alert via RPC communication using JDK dynamic proxy
    // Calls the alert module `AlertBootstrapService`
    sendAlertIfNeeded();

    // TODO Send execution results
    sendTaskResult();

    WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());

    // TODO If `development.state=false` in `common.properties` (default is false)
    // TODO If set to `true`, DolphinScheduler will not delete scripts and JAR files
    log.info("Remove the current task execution context from worker cache");
    clearTaskExecPathIfNeeded();
}

Sending Task Execution Results

protected void sendTaskResult() {
    taskExecutionContext.setCurrentExecutionStatus(task.getExitStatus());
    taskExecutionContext.setProcessId(task.getProcessId());
    taskExecutionContext.setAppIds(task.getAppIds());

    // TODO Sending the variable pool (this is where the variable pool is transmitted)
    taskExecutionContext.setVarPool(JSONUtils.toJsonString(task.getParameters().getVarPool()));
    taskExecutionContext.setEndTime(System.currentTimeMillis());

    // Upload output files and modify the "OUT FILE" property in the variable pool
    // TODO Upload output files and update them in the variable pool
    TaskFilesTransferUtils.uploadOutputFiles(taskExecutionContext, storageOperate);
    log.info("Upload output files: {} successfully",
            TaskFilesTransferUtils.getFileLocalParams(taskExecutionContext, Direct.OUT));

    // TODO Send the task execution result
    workerMessageSender.sendMessageWithRetry(taskExecutionContext,
            ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
    log.info("Send task execution status: {} to master : {}", 
            taskExecutionContext.getCurrentExecutionStatus().name(),
            taskExecutionContext.getWorkflowInstanceHost());
}
  1. Set Task Status: Updates the task’s execution status, process ID, and application IDs.
  2. Store Variable Pool Data: Converts the variable pool into JSON format and stores it in taskExecutionContext.
  3. Upload Output Files: Transfers any output files and updates their references in the variable pool.
  4. Report Results to Master: Sends the execution status and variable pool data to the Master node for further processing.

Shell Exit Codes: A Quick Note

[root@node opt]# vim test.sh 
[root@node opt]# sh test.sh 
me is journey
[root@node opt]# echo $?
0
[root@node opt]# vim test.sh 
[root@node opt]# sh test.sh 
test.sh: line 2: echo1: command not found
[root@node opt]# echo $?
127
[root@node opt]# vim test.sh 
[root@node opt]# sh test.sh 
me is 10.253.26.85
Killed
[root@node opt]# echo $?
137

Summary:

  • A normal exit for a Shell task returns 0.
  • If the task fails due to a missing command, the exit code is 127.
  • If the process is killed, the exit code is 137.
  • Any other exit codes indicate an unexpected error.

Task Status Code Determination Logic

taskExecutionContext.setCurrentExecutionStatus(task.getExitStatus());

org.apache.dolphinscheduler.plugin.task.api.AbstractTask#getExitStatus

// If the exit code is 0, the task is marked as SUCCESS;
// If the exit code is 137, it is marked as KILL;
// Any other exit code is considered FAILURE.
public TaskExecutionStatus getExitStatus() {
    switch (getExitStatusCode()) {
        case TaskConstants.EXIT_CODE_SUCCESS:
            return TaskExecutionStatus.SUCCESS;
        case TaskConstants.EXIT_CODE_KILL:
            return TaskExecutionStatus.KILL;
        default:
            return TaskExecutionStatus.FAILURE;
    }
}

This method determines the final execution status of the task based on the exit code received from the execution process.

  • Exit code 0 → task is successful
  • Exit code 137 → task was killed
  • Any other exit code → task failed

The exit status is set during the execution of executeTask().


Uploading Output Files to the Resource Center

org.apache.dolphinscheduler.server.worker.utils.TaskFilesTransferUtils#uploadOutputFiles

public static void uploadOutputFiles(TaskExecutionContext taskExecutionContext,
                                     StorageOperate storageOperate) throws TaskException {
    List<Property> varPools = getVarPools(taskExecutionContext);
    // Create a map from varPools for quick lookup
    Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));

    // Get OUTPUT FILE parameters
    List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.OUT);

    if (localParamsProperty.isEmpty()) {
        return;
    }

    log.info("Uploading output files ...");

    for (Property property : localParamsProperty) {
        // Get local file path
        String path = String.format("%s/%s", taskExecutionContext.getExecutePath(), property.getValue());

        // TODO `packIfDir` and `crc`:
        // If the output is a directory, it will be compressed into a zip file and a CRC (checksum) is generated.
        // If it is a file, only a CRC will be generated.
        String srcPath = packIfDir(path);

        // Get the CRC file path
        String srcCRCPath = srcPath + CRC_SUFFIX;
        try {
            FileUtils.writeContent2File(FileUtils.getFileChecksum(path), srcCRCPath);
        } catch (IOException ex) {
            throw new TaskException(ex.getMessage(), ex);
        }

        // Get the remote file path
        // TODO Storage structure: 
        // `DATA_TRANSFER/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID/TaskName_TaskInstanceID_FileName`
        String resourcePath = getResourcePath(taskExecutionContext, new File(srcPath).getName());
        String resourceCRCPath = resourcePath + CRC_SUFFIX;

        try {
            // Upload file to storage
            // TODO For example, in HDFS:
            // HDFS base path: `/tenantCode/resources/DATA_TRANSFER/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID/TaskName_TaskInstanceID_FileName`
            String resourceWholePath =
                    storageOperate.getResourceFullName(taskExecutionContext.getTenantCode(), resourcePath);
            String resourceCRCWholePath =
                    storageOperate.getResourceFullName(taskExecutionContext.getTenantCode(), resourceCRCPath);
            log.info("{} --- Local:{} to Remote:{}", property, srcPath, resourceWholePath);
            storageOperate.upload(taskExecutionContext.getTenantCode(), srcPath, resourceWholePath, false, true);
            log.info("{} --- Local:{} to Remote:{}", "CRC file", srcCRCPath, resourceCRCWholePath);
            storageOperate.upload(taskExecutionContext.getTenantCode(), srcCRCPath, resourceCRCWholePath, false, true);
        } catch (IOException ex) {
            throw new TaskException("Upload file to storage error", ex);
        }

        // Update varPool
        Property oriProperty;
        // If the property is not already in varPool, add it
        if (varPoolsMap.containsKey(property.getProp())) { // This branch should theoretically never be executed
            oriProperty = varPoolsMap.get(property.getProp());
        } else {
            oriProperty = new Property(property.getProp(), Direct.OUT, DataType.FILE, property.getValue());
            // TODO Add to variable pool
            varPools.add(oriProperty);
        }

        // TODO Assign the task name as a prefix to the property name
        oriProperty.setProp(String.format("%s.%s", taskExecutionContext.getTaskName(), oriProperty.getProp()));
        // TODO This is crucial: The resource's relative path is stored as the value in the variable pool
        oriProperty.setValue(resourcePath);
    }

    // TODO Store FILE-related variables in the variable pool
    taskExecutionContext.setVarPool(JSONUtils.toJsonString(varPools));
}

Breakdown of the Output File Upload Process

  1. Retrieve Output Files

    • Extracts the OUTPUT FILE parameters from the task context.
  2. Check if Directory or File

    • If the output is a directory, it is compressed into a zip file, and a CRC checksum is generated.
    • If it is a file, only the CRC checksum is generated.
  3. Determine Storage Path

    • The path follows a structured format: DATA_TRANSFER/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID/TaskName_TaskInstanceID_FileName
    • The storage location depends on the system (e.g., HDFS or other storage solutions).
  4. Upload File and CRC Checksum

    • Uploads both the file and its CRC checksum to the remote storage.
  5. Update Variable Pool

    • The uploaded file's relative path is stored in the variable pool, which is later sent to the Master.

Sending Task Execution Results

workerMessageSender.sendMessageWithRetry(taskExecutionContext,
                ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);

This ensures that the task execution result is reliably reported to the Master.


3. Role of the WorkerMessageSender Component

WorkerMessageSender is the component the executor uses throughout the lifecycle above (via sendMessageWithRetry) to report task state transitions such as RUNNING and FINISH to the Master. Pending events are tracked by a retry runner so they can be re-sent if the Master does not acknowledge them (see messageRetryRunner.removeRetryMessages in the kill logic below).
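
A conceptual sketch of the send-with-retry idea follows; it is simplified, and every method name except sendMessageWithRetry is illustrative.

// Conceptual sketch only, not the actual WorkerMessageSender.
public void sendMessageWithRetry(TaskExecutionContext ctx,
                                 ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType eventType) {
    ITaskInstanceExecutionEvent event = buildEvent(ctx, eventType);       // RUNNING, FINISH, ...
    messageRetryRunner.addRetryMessage(ctx.getTaskInstanceId(), event);   // re-sent periodically until acknowledged
    sendToMaster(ctx.getWorkflowInstanceHost(), event);                   // best-effort immediate send
}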

4. Task Kill Logic

org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction#operate

public TaskInstanceKillResponse operate(TaskInstanceKillRequest taskInstanceKillRequest) {
    log.info("Receive TaskInstanceKillRequest: {}", taskInstanceKillRequest);

    // TODO Task Instance
    int taskInstanceId = taskInstanceKillRequest.getTaskInstanceId();
    try {
        LogUtils.setTaskInstanceIdMDC(taskInstanceId);
        // TODO Worker Task Executor
        WorkerTaskExecutor workerTaskExecutor = WorkerTaskExecutorHolder.get(taskInstanceId);
        if (workerTaskExecutor == null) {
            log.error("Cannot find WorkerTaskExecutor for taskInstance: {}", taskInstanceId);
            return TaskInstanceKillResponse.fail("Cannot find WorkerTaskExecutor");
        }

        // TODO Task Execution Context
        TaskExecutionContext taskExecutionContext = workerTaskExecutor.getTaskExecutionContext();
        LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());

        // TODO Kill process execution
        boolean result = doKill(taskExecutionContext);

        // TODO `Process.destroy()` in Java terminates the subprocess associated with this `Process` object.
        this.cancelApplication(workerTaskExecutor);

        int processId = taskExecutionContext.getProcessId();
        // TODO If `processId == 0`, mark the task as KILL and report it accordingly.
        // TODO Note: This does not necessarily mean the process was actually killed, but ensures DS has the correct status.
        if (processId == 0) {
            workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
            taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.KILL);
            // TODO The task might have been executed, but the process ID is 0.
            WorkerTaskExecutorHolder.remove(taskInstanceId);
            log.info("The task has not been executed and has been cancelled, task id:{}", taskInstanceId);
            return TaskInstanceKillResponse.success(taskExecutionContext);
        }

        // TODO If successful, Worker will detect the task was killed and report it in `sendResult FINISH`.
        taskExecutionContext
                .setCurrentExecutionStatus(result ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);

        WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
        // TODO Remove retry messages
        messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId());
        return TaskInstanceKillResponse.success(taskExecutionContext);
    } finally {
        LogUtils.removeTaskInstanceIdMDC();
        LogUtils.removeTaskInstanceLogFullPathMDC();
    }
}

Killing the Process and Yarn/Kubernetes Tasks

// TODO Kill process execution
boolean result = doKill(taskExecutionContext);

org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction#doKill

private boolean doKill(TaskExecutionContext taskExecutionContext) {
    // Kill system process
    // TODO Kill the Shell-related process
    boolean processFlag = killProcess(taskExecutionContext.getTenantCode(), taskExecutionContext.getProcessId());

    // TODO Kill Yarn or Kubernetes application
    try {
        ProcessUtils.cancelApplication(taskExecutionContext);
    } catch (TaskException e) {
        return false;
    }
    return processFlag;
}

org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction#killProcess

This function kills the main process and its child processes.

Note: this implementation has been modified relative to the official version. If an exception occurs while killing, only a warning is logged instead of failing the whole operation, because some environments do not grant permission to kill every process.

Killing the Process (Including Child Processes)

killProcess(String tenantCode, Integer processId)

protected boolean killProcess(String tenantCode, Integer processId) {
    // TODO Directly interrupt the process
    if (processId == null || processId.equals(0)) {
        return true;
    }

    try {
        String pidsStr = ProcessUtils.getPidsStr(processId);
        if (!Strings.isNullOrEmpty(pidsStr)) {
            String cmd = String.format("kill -9 %s", pidsStr);
            cmd = OSUtils.getSudoCmd(tenantCode, cmd);
            log.info("process id:{}, cmd:{}", processId, cmd);
            OSUtils.exeCmd(cmd);
        }
    } catch (Exception e) {
        log.warn("kill task error", e);
    }
    return true;
}
  • If processId is null or 0, the function returns immediately, assuming no process needs to be killed.
  • It retrieves the list of process IDs using ProcessUtils.getPidsStr(processId) (a hedged sketch of one way to do this follows this list).
  • If valid process IDs are found, it constructs a kill -9 command and executes it using OSUtils.exeCmd(cmd).
  • Any exceptions during process termination only trigger warnings instead of causing a failure.
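
On Linux, one common way to collect a process together with its children is pstree -p. The sketch below illustrates the idea; it is not necessarily how ProcessUtils.getPidsStr is implemented.

// Hedged sketch: parse `pstree -p <pid>` output such as "sh(1234)---sleep(1235)" into "1234 1235".
static String getPidsStrSketch(int processId) throws IOException, InterruptedException {
    Process process = Runtime.getRuntime().exec(new String[]{"pstree", "-p", String.valueOf(processId)});
    String output = new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
    process.waitFor();
    Matcher matcher = Pattern.compile("\\((\\d+)\\)").matcher(output);
    StringBuilder pids = new StringBuilder();
    while (matcher.find()) {
        pids.append(matcher.group(1)).append(' ');
    }
    return pids.toString().trim();   // e.g. "1234 1235", then: kill -9 1234 1235 (optionally via sudo)
}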

Killing YARN/Kubernetes Jobs

org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils#cancelApplication

public static void cancelApplication(TaskExecutionContext taskExecutionContext) {
    try {
        // TODO Handle Kubernetes-based tasks
        if (Objects.nonNull(taskExecutionContext.getK8sTaskExecutionContext())) {
            if (!TASK_TYPE_SET_K8S.contains(taskExecutionContext.getTaskType())) {
                // Set an empty container name for Spark on K8S tasks
                applicationManagerMap.get(ResourceManagerType.KUBERNETES)
                        .killApplication(new KubernetesApplicationManagerContext(
                                taskExecutionContext.getK8sTaskExecutionContext(),
                                taskExecutionContext.getTaskAppId(), ""));
            }
        } else {
            // TODO Handle YARN-based tasks
            String host = taskExecutionContext.getHost();
            String executePath = taskExecutionContext.getExecutePath();
            String tenantCode = taskExecutionContext.getTenantCode();
            List<String> appIds;

            // TODO If failover occurred, use the stored app IDs
            if (StringUtils.isNotEmpty(taskExecutionContext.getAppIds())) {
                appIds = Arrays.asList(taskExecutionContext.getAppIds().split(COMMA));
            } else {
                String logPath = taskExecutionContext.getLogPath();
                String appInfoPath = taskExecutionContext.getAppInfoPath();
                if (logPath == null || appInfoPath == null || executePath == null || tenantCode == null) {
                    log.error("Kill YARN job error, missing required parameters: host: {}, logPath: {}, appInfoPath: {}, executePath: {}, tenantCode: {}",
                            host, logPath, appInfoPath, executePath, tenantCode);
                    throw new TaskException("Cancel application failed!");
                }

                log.info("Extracting appIds from worker {}, taskLogPath: {}", host, logPath);
                // TODO Use regex to extract app IDs from logs
                appIds = LogUtils.getAppIds(logPath, appInfoPath,
                        PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY));
                taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, appIds));
            }

            // TODO If no appIds were found, return immediately
            if (CollectionUtils.isEmpty(appIds)) {
                log.info("The appId list is empty.");
                return;
            }

            ApplicationManager applicationManager = applicationManagerMap.get(ResourceManagerType.YARN);
            applicationManager.killApplication(new YarnApplicationManagerContext(executePath, tenantCode, appIds));
        }
    } catch (Exception e) {
        log.error("Cancel application failed.", e);
    }
}

Key Process for Killing YARN Jobs

  1. Kubernetes Task Handling

    • If the task is running on Kubernetes, it fetches the KubernetesApplicationManager and kills the corresponding container or pod.
  2. YARN Task Handling

    • If the task is running on YARN, it determines whether it is part of a failover scenario.
    • If no appIds are available, it extracts them from task logs using regex parsing (LogUtils.getAppIds()).
    • The list of appIds is then passed to ApplicationManager, which attempts to terminate them.

Extracting YARN App IDs from Logs

appIds = LogUtils.getAppIds(logPath, appInfoPath,
                            PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY));

public List<String> getAppIds(String logPath, String appInfoPath, String fetchWay) {
    if (!StringUtils.isEmpty(fetchWay) && fetchWay.equals("aop")) {
        log.info("Start finding appId in {}, fetch way: {} ", appInfoPath, fetchWay);
        // TODO If using AOP-based interception, read from the app info file
        return getAppIdsFromAppInfoFile(appInfoPath);
    } else {
        log.info("Start finding appId in {}, fetch way: {} ", logPath, fetchWay);
        // TODO Extract App IDs from logs using regex
        return getAppIdsFromLogFile(logPath);
    }
}
  • By default, app IDs are extracted from logs instead of using AOP-based interception.
  • The function getAppIdsFromLogFile(logPath) scans the log file for YARN application IDs using regular expressions, as sketched below.
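
YARN application IDs follow the well-known pattern application_<clusterTimestamp>_<sequence>, so the scan can be as simple as the sketch below (illustrative; the real getAppIdsFromLogFile may differ in detail).

// Hedged sketch: collect distinct YARN application ids from a task log file.
static List<String> getAppIdsFromLogFileSketch(String logPath) throws IOException {
    Pattern appIdPattern = Pattern.compile("application_\\d+_\\d+");
    Set<String> appIds = new LinkedHashSet<>();
    for (String line : Files.readAllLines(Paths.get(logPath))) {
        Matcher matcher = appIdPattern.matcher(line);
        while (matcher.find()) {
            appIds.add(matcher.group());
        }
    }
    return new ArrayList<>(appIds);
}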

Executing YARN Kill Command

applicationManager.killApplication(new YarnApplicationManagerContext(executePath, tenantCode, appIds));

org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManager#killApplication

public boolean killApplication(ApplicationManagerContext applicationManagerContext) throws TaskException {
    YarnApplicationManagerContext yarnApplicationManagerContext =
            (YarnApplicationManagerContext) applicationManagerContext;
    String executePath = yarnApplicationManagerContext.getExecutePath();
    String tenantCode = yarnApplicationManagerContext.getTenantCode();
    List<String> appIds = yarnApplicationManagerContext.getAppIds();

    try {
        String commandFile = String.format("%s/%s.kill", executePath, String.join(Constants.UNDERLINE, appIds));
        String cmd = getKerberosInitCommand() + "yarn application -kill " + String.join(Constants.SPACE, appIds);
        execYarnKillCommand(tenantCode, commandFile, cmd);
    } catch (Exception e) {
         log.warn("Kill YARN application {} failed", appIds, e);
    }
    return true;
}

Executing the Kill Command

private void execYarnKillCommand(String tenantCode, String commandFile, String cmd) throws Exception {
    StringBuilder sb = new StringBuilder();
    sb.append("#!/bin/sh\n");
    sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
    sb.append("cd $BASEDIR\n");

    // TODO Set the default environment source, e.g., `/etc/profile`
    if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
        // TODO Add environment files from `ENV_SOURCE_LIST` to `systemEnvs`
        ShellUtils.ENV_SOURCE_LIST.forEach(env -> sb.append("source " + env + "\n"));
    }

    sb.append("\n\n");
    sb.append(cmd);

    File f = new File(commandFile);

    if (!f.exists()) {
        org.apache.commons.io.FileUtils.writeStringToFile(new File(commandFile), sb.toString(),
                StandardCharsets.UTF_8);
    }

    String runCmd = String.format("%s %s", Constants.SH, commandFile);
    runCmd = org.apache.dolphinscheduler.common.utils.OSUtils.getSudoCmd(tenantCode, runCmd);
    log.info("Kill command: {}", runCmd);
    org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(runCmd);
}
  • Constructs a shell script to execute the yarn application -kill command (a sketch of the generated file follows this list).
  • Ensures environment variables are correctly set before execution.
  • Uses OSUtils.exeCmd(runCmd) to execute the termination command with the required privileges.
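
Putting the pieces together, the generated {appIds}.kill file would look roughly like this, assuming ENV_SOURCE_LIST contains only /etc/profile and a single (made-up) application id; it is then executed as sh <commandFile>, wrapped by OSUtils.getSudoCmd so that it runs as the task's tenant user.

#!/bin/sh
BASEDIR=$(cd `dirname $0`; pwd)
cd $BASEDIR
source /etc/profile


yarn application -kill application_1717992580000_0001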

Summary

  1. If the task is running on a process (e.g., Shell, Java)

    • It retrieves child process IDs and kills them using kill -9.
  2. If the task is running on Kubernetes

    • It terminates the container or pod associated with the task.
  3. If the task is running on YARN

    • It extracts YARN app IDs using log analysis.
    • Executes yarn application -kill to terminate the job.
  4. If the kill operation succeeds

    • The WorkerTaskExecutor detects the termination and reports KILL status.
    • If the task is already completed (PID=0), the task context is updated, and the thread is removed.

5. Pause

public class TaskInstancePauseOperationFunction
        implements
            ITaskInstanceOperationFunction<TaskInstancePauseRequest, TaskInstancePauseResponse> {

    @Override
    public TaskInstancePauseResponse operate(TaskInstancePauseRequest taskInstancePauseRequest) {
        try {
            LogUtils.setTaskInstanceIdMDC(taskInstancePauseRequest.getTaskInstanceId());
            log.info("Receive TaskInstancePauseRequest: {}", taskInstancePauseRequest);
            log.warn("TaskInstancePauseOperationFunction is not support for worker task yet!");
            return TaskInstancePauseResponse.success();
        } finally {
            LogUtils.removeTaskInstanceIdMDC();
        }
    }
}

Key points:
In practice, the Worker does nothing when a pause is requested, and it really cannot: a running task can only be paused if the execution engine itself supports it, and engines such as MapReduce, Spark, and Flink do not expose that kind of control. The real pause logic works at the workflow level: the Master notifies the workflow instance that it is pausing, so downstream tasks are not started once the currently running task finishes. As a consequence, a pause has no visible effect in several cases: when the workflow contains only a single task, when the task being "paused" is the last one, or when the task finishes just as the pause arrives and there is nothing downstream left to hold back. In all of these cases the task simply runs to completion.

6. Update process instance host

This is part of fault tolerance and will be explained in more detail in the fault-tolerance section.
