Chen Debra
In-Depth Analysis of DolphinScheduler Task Scheduling, Splitting, and Execution Workflow

Introduction to Apache DolphinScheduler

Apache DolphinScheduler is an open-source distributed, scalable, and visual DAG-based workflow scheduling system. It is designed for enterprise-level scenarios and provides a visual solution for task operation, workflow management, and the full lifecycle of data processing.

Background Knowledge on DAG

A standard definition of a directed acyclic graph (DAG) is as follows:

A graph is formed by vertices and edges connecting pairs of vertices, where the vertices can be any kind of object that is connected in pairs by edges. In a directed graph, each edge has an orientation, from one vertex to another. A path in a directed graph is a sequence of edges where the ending vertex of each edge is the starting vertex of the next one. If the starting vertex of the first edge equals the ending vertex of the last, the path forms a cycle.

A directed acyclic graph (DAG) is a directed graph without cycles.

A vertex in a directed graph is said to be reachable from another vertex if a path exists between them. If a vertex can reach itself via a nontrivial path, then the graph contains a cycle. A DAG is a graph where no vertex can reach itself via a nontrivial path.

In DolphinScheduler, the DAG structure is represented by the following data structure:

import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class DAG<Node, NodeInfo, EdgeInfo> {

    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /**
     * node map, key is node, value is node information
     */
    private final Map<Node, NodeInfo> nodesMap;

    /**
     * edge map, key is origin node, value is map with key for destination node and value for edge
     */
    private final Map<Node, Map<Node, EdgeInfo>> edgesMap;

    /**
     * reversed edge set, key is destination node, value is map with key for origin node and value for edge
     */
    private final Map<Node, Map<Node, EdgeInfo>> reverseEdgesMap;
}
  • Node: Represents the task ID.
  • NodeInfo: Represents detailed information about the task.
  • EdgeInfo: Contains the task ID and the dependent task ID.
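
To make the generics concrete, here is a small, self-contained sketch (plain Java, not DolphinScheduler source) that populates the three maps for a two-task workflow A -> B, using a Long task code as Node, a description string as NodeInfo, and a label string as EdgeInfo:

import java.util.HashMap;
import java.util.Map;

// Minimal, self-contained illustration of how the three maps above
// are populated for a workflow "A -> B".
public class DagExample {
    public static void main(String[] args) {
        Map<Long, String> nodesMap = new HashMap<>();                 // Node -> NodeInfo
        Map<Long, Map<Long, String>> edgesMap = new HashMap<>();      // origin -> (dest -> EdgeInfo)
        Map<Long, Map<Long, String>> reverseEdgesMap = new HashMap<>();

        // Two tasks, keyed by task code
        nodesMap.put(1L, "task A: extract");
        nodesMap.put(2L, "task B: transform, depends on A");

        // Edge A -> B, recorded in both directions for fast traversal
        edgesMap.computeIfAbsent(1L, k -> new HashMap<>()).put(2L, "A->B");
        reverseEdgesMap.computeIfAbsent(2L, k -> new HashMap<>()).put(1L, "A->B");

        // reverseEdgesMap answers "what does B depend on?" without scanning all edges
        System.out.println("B depends on: " + reverseEdgesMap.get(2L).keySet());
    }
}

Keeping both edgesMap and reverseEdgesMap trades a little memory for constant-time lookups in both directions: forward edges drive the dispatch of downstream tasks, while reverse edges let the scheduler check whether all of a task's dependencies have finished.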

Data Warehouse Tasks and Dependencies

In enterprise data warehouse construction, tasks are typically organized in layers, and dependencies between tasks are numerous because business logic is widely distributed and data is stored in diverse systems. This makes a DAG the natural structure for storing task dependencies and scheduling information.

In a DAG, each node represents a specific scheduling task, and the edges represent dependencies between tasks. Traversing the DAG structure corresponds to executing the data warehouse tasks.

[Figure: a layered data warehouse workflow as a DAG, with tasks as nodes and dependencies as edges]

Role Breakdown in the DolphinScheduler System

Apache DolphinScheduler has two core roles: MasterServer and WorkerServer. Following a modular design, the Master focuses on DAG task splitting and task submission, while the Worker handles task execution and log services.

  • MasterServer: Handles DAG splitting, task submission, and task monitoring. It registers an ephemeral node in ZooKeeper so that another master can take over its workflows on failure (fault tolerance).
  • WorkerServer: Executes tasks and serves their logs. It also registers an ephemeral node in ZooKeeper and maintains a heartbeat.

DolphinScheduler Task Scheduling Workflow

The core task execution flow is as follows (based on the official documentation):

[Figure: DolphinScheduler core task scheduling flow, from the official documentation]

Given the complexity of task scheduling, the overall flow can be broken into smaller sub-processes, and alongside the main flow there are auxiliary sub-processes. The table below analyzes each sub-process of execution scheduling to make the design easier to follow.


| Sub-process | Consumption Mode | Why is it designed this way? |
| --- | --- | --- |
| Command distribution flow | Asynchronous distributed consumption | The cluster works as a service; easy to develop and extend |
| DAG traversal for task execution | Break large tasks into smaller ones (task splitting) | Separating task scheduling from task execution simplifies the process |
| Executable task dispatch | Asynchronous distributed consumption | The cluster works as a service; easy to develop and extend |
| Task execution status callback | RPC service with asynchronous processing; the Master handles task status immediately | Stable and reliable |
| Task status processing | Submit to a thread pool and register a callback | Database writes may introduce delays |

Command Distribution Flow

Commands are distributed asynchronously across Master servers.

Producer

The API server encapsulates the user's HTTP request to run a workflow into command data and inserts it into the t_ds_command table. Here's an example of a command to start a workflow instance:

{
    "commandType": "START_PROCESS",
    "processDefinitionCode": 14285512555584,
    "executorId": 1,
    "commandParam": "{}",
    "taskDependType": "TASK_POST",
    "failureStrategy": "CONTINUE",
    "warningType": "NONE",
    "startTime": 1723444881372,
    "processInstancePriority": "MEDIUM",
    "updateTime": 1723444881372,
    "workerGroup": "default",
    "tenantCode": "default",
    "environmentCode": -1,
    "dryRun": 0,
    "processInstanceId": 0,
    "processDefinitionVersion": 1,
    "testFlag": 0
}

Consumer

On the Master server, the MasterSchedulerBootstrap runs a polling loop. Each master is assigned a slot via ZooKeeper, and it selects only the commands belonging to its slot from the t_ds_command table using the following query:

<select id="queryCommandPageBySlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">
    select *
    from t_ds_command
    where id % #{masterCount} = #{thisMasterSlot}
    order by process_instance_priority, id asc
    limit #{limit}
</select>

For example, with masterCount = 3, the master holding slot 1 picks up only commands whose id % 3 == 1, so the masters partition the command table among themselves without locking.

The MasterSchedulerBootstrap loop polls for pending commands, builds a ProcessInstance from each command together with the master host, and inserts the ProcessInstance into the t_ds_process_instance table.

It also builds an executable task, a WorkflowExecuteRunnable, that carries all the required context information and caches it locally in the processInstanceExecCacheManager. At the same time, a workflow event of type WorkflowEventType.START_WORKFLOW is produced and pushed onto the workflowEventQueue.
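
Putting the producer side together, the loop looks roughly like the sketch below. This is a condensed illustration of the steps just described, not the actual source; helper names such as createProcessInstance are assumptions.

// Condensed sketch of the MasterSchedulerBootstrap producer loop described
// above (illustrative names, not the exact DolphinScheduler source).
List<Command> commands = findCommandsBySlot(masterCount, thisMasterSlot);
for (Command command : commands) {
    // 1. Turn the command into a workflow instance bound to this master
    ProcessInstance processInstance = createProcessInstance(command, masterHost);
    insertIntoProcessInstanceTable(processInstance);          // t_ds_process_instance

    // 2. Build the executable wrapper with all required context and cache it
    WorkflowExecuteRunnable runnable = buildWorkflowExecuteRunnable(processInstance);
    processInstanceExecCacheManager.cache(processInstance.getId(), runnable);

    // 3. Publish a START_WORKFLOW event for the DAG-traversal stage to consume
    workflowEventQueue.addEvent(
            new WorkflowEvent(WorkflowEventType.START_WORKFLOW, processInstance.getId()));
}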

DAG Traversal for Task Execution

Master local cache buffering

The cache is implemented via ProcessInstanceExecCacheManagerImpl and provides the following core functionalities:

import java.util.Collection;

import lombok.NonNull;

public interface ProcessInstanceExecCacheManager {

    /**
     * get WorkflowExecuteThread by process instance id
     *
     * @param processInstanceId processInstanceId
     * @return WorkflowExecuteThread
     */
    WorkflowExecuteRunnable getByProcessInstanceId(int processInstanceId);

    /**
     * judge the process instance does it exist
     *
     * @param processInstanceId processInstanceId
     * @return true - if process instance id exists in cache
     */
    boolean contains(int processInstanceId);

    /**
     * remove cache by process instance id
     *
     * @param processInstanceId processInstanceId
     */
    void removeByProcessInstanceId(int processInstanceId);

    /**
     * cache
     *
     * @param processInstanceId     processInstanceId
     * @param workflowExecuteThread if it is null, will not be cached
     */
    void cache(int processInstanceId, @NonNull WorkflowExecuteRunnable workflowExecuteThread);

    /**
     * get all WorkflowExecuteThread from cache
     *
     * @return all WorkflowExecuteThread in cache
     */
    Collection<WorkflowExecuteRunnable> getAll();

    void clearCache();
}
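The usage pattern implied by this interface looks like the following (an illustrative sketch, assuming an injected cacheManager instance):

// Cache the runnable when the workflow instance starts...
cacheManager.cache(processInstance.getId(), workflowExecuteRunnable);

// ...look it up when an event for that workflow instance arrives...
if (cacheManager.contains(processInstanceId)) {
    WorkflowExecuteRunnable runnable = cacheManager.getByProcessInstanceId(processInstanceId);
    // handle the event with the cached context
}

// ...and evict it once the workflow instance reaches a terminal state.
cacheManager.removeByProcessInstanceId(processInstanceId);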

Producer:

The MasterSchedulerBootstrap loop transforms commands into tasks with the necessary context and submits them for execution.

Consumer:

The EventExecuteService adds tasks whose dependencies are all satisfied to the standByTaskInstancePriorityQueue, from which they are dispatched in priority order to the globalTaskDispatchWaitingQueue, as sketched below.
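
A task is "without dependencies" once every upstream node in the DAG has finished. The self-contained sketch below (plain Java, not the actual EventExecuteService code) computes that ready set from a reverse-edge map like the one shown earlier:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ReadyTasks {

    // Return the task codes whose upstream dependencies have all finished.
    static Set<Long> readyTasks(Map<Long, Map<Long, String>> reverseEdgesMap,
                                Set<Long> allTasks, Set<Long> finished) {
        Set<Long> ready = new HashSet<>();
        for (Long task : allTasks) {
            if (finished.contains(task)) {
                continue; // already done, nothing to schedule
            }
            Map<Long, String> upstream = reverseEdgesMap.get(task);
            if (upstream == null || finished.containsAll(upstream.keySet())) {
                ready.add(task); // no unfinished upstream task
            }
        }
        return ready;
    }

    public static void main(String[] args) {
        // B (2) depends on A (1), mirroring the reverseEdgesMap layout above
        Map<Long, Map<Long, String>> reverse = new HashMap<>();
        reverse.computeIfAbsent(2L, k -> new HashMap<>()).put(1L, "A->B");

        Set<Long> all = Set.of(1L, 2L);
        System.out.println(readyTasks(reverse, all, Set.of()));   // [1]: only A is ready
        System.out.println(readyTasks(reverse, all, Set.of(1L))); // [2]: A finished, B is ready
    }
}

Re-running this check each time a task finishes is, in effect, the breadth-first traversal described in the next subsection: completing one task can unlock its downstream neighbors, which then enter the priority queue.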

Task Dispatch

Master internal priority queue

Once tasks reach the globalTaskDispatchWaitingQueue, they have already become the smallest executable task units.

Producer:

The EventExecuteService traverses the DAG using a breadth-first search to submit tasks to the globalTaskDispatchWaitingQueue.

Consumer:

The consumer is GlobalTaskDispatchWaitingQueueLooper, which consumes tasks waiting to be dispatched from the globalTaskDispatchWaitingQueue. Task dispatching is executed based on the task type, using an RPC interface. Currently, there are two types of dispatchers:

  1. MasterTaskDispatcher
  2. WorkerTaskDispatcher

For the WorkerTaskDispatcher, once the RPC server on the Worker receives the request, the task is submitted to the workerTaskExecutorThreadPool for execution. Dispatch is therefore asynchronous: the Master does not block at this point, and the task's progress is reported back through callbacks at key points in its lifecycle.
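
The routing decision itself is straightforward; the snippet below is an illustrative sketch with hypothetical names, not the exact source:

// Hypothetical routing sketch: "logic" tasks (sub-workflow, dependent,
// switch, ...) execute inside the Master; everything else goes to a Worker.
TaskDispatcher dispatcher = isLogicTask(taskInstance.getTaskType())
        ? masterTaskDispatcher
        : workerTaskDispatcher;
dispatcher.dispatchTask(taskExecuteRunnable); // returns once the task is queued remotely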

Task Execution Status Callback

When a task is dispatched to the Worker, it is asynchronously submitted to the thread pool for execution. During various stages of asynchronous task execution, the Worker communicates the task status back to the Master by calling the RPC interface.

Producer:

In the asynchronous execution node of the Worker, the task execution status callback includes four possible states:

  1. TaskExecutionStatus.FAILURE: The task encountered an exception during execution and failed.
  2. TaskExecutionStatus.RUNNING_EXECUTION: The task has started executing.
  3. TaskExecutionStatus.KILL: The task was killed.
  4. TaskExecutionStatus.SUCCESS: The task executed successfully.
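
On the Worker side, these states map onto the task lifecycle roughly as follows (a sketch with illustrative callback names, not the exact RPC interface):

// Each transition below triggers an RPC callback from Worker to Master
// (callback names are illustrative).
listener.onTaskRunning(taskContext);        // RUNNING_EXECUTION: task has started
try {
    task.handle();                          // the task's own logic runs in the thread pool
    listener.onTaskSuccess(taskContext);    // SUCCESS: finished normally
} catch (Exception e) {
    listener.onTaskFailure(taskContext, e); // FAILURE: exception during execution
}
// A kill request interrupts the task and reports KILL instead.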

Note: In the official event flowchart, the direction of the ACK was incorrect. The ACK is not sent from the Worker to the Master; rather, the Master notifies the Worker that it has finished processing this event status.

After making this correction, the overall process can be summarized as shown in the following diagram:

[Figure: overall task status callback flow after the correction, with the ACK sent from Master to Worker]

Consumer:

On the master node, the ITaskInstanceExecutionEventListener service receives RPC requests and adds tasks to the TaskEventService event queue for further processing.

Task Status Processing

Buffer Queue

The buffer queue is the TaskEventService eventQueue at the Master node.

Producer

There can be multiple producers, including:

  • User actions from the API server
  • Task scheduling at the Master node
  • Task execution at the Worker nodes
  • Task execution at the Master node

Consumer

The consumer is the TaskInstanceListenerImpl service on the Master node. It transforms each TaskEvent into a TaskExecuteRunnable, tracks it in the taskExecuteThreadMap, and submits it to the thread pool, where the task's execution status is updated (including the database record).
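
A condensed sketch of this consumer path, with illustrative names based on the description above:

// Drain the buffer queue and hand each event to the thread pool
// (illustrative names, not the exact DolphinScheduler source).
TaskEvent event = eventQueue.take();  // blocking take from TaskEventService's queue
TaskExecuteRunnable runnable = taskExecuteThreadMap.computeIfAbsent(
        event.getProcessInstanceId(), id -> new TaskExecuteRunnable(id));
runnable.addEvent(event);             // events for one workflow instance stay ordered
taskEventThreadPool.submit(runnable); // the status change and DB update happen in the pool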
