DEV Community

Chen Debra

Dolphinscheduler DAG Core Source Code Analysis

Background


Note: In Dolphinscheduler, T+1 offline tasks have a complete lifecycle (stop, pause, resume from pause, rerun, and so on), and they are all organized in the form of a DAG (Directed Acyclic Graph).

Dolphinscheduler DAG Implementation

org.apache.dolphinscheduler.common.graph.DAG

The DAG class is built on three core data structures:

// Vertex map: each vertex and its payload
private final Map<Node, NodeInfo> nodesMap;

// Forward edge map: for each vertex, its outgoing edges keyed by target vertex; used to find downstream nodes and leaf nodes
private final Map<Node, Map<Node, EdgeInfo>> edgesMap;

// Reverse edge map: for each vertex, its incoming edges keyed by source vertex; used to quickly find nodes with an in-degree of 0 (start nodes) and to get upstream nodes
private final Map<Node, Map<Node, EdgeInfo>> reverseEdgesMap;
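To make the roles of the three maps concrete, here is a minimal sketch, not the DolphinScheduler class: MiniDag and its method names are hypothetical, edge info is reduced to a String label, and locking and cycle checking are omitted. It replays the A, B, C example used in this article and answers the start-node and leaf-node questions from reverseEdgesMap and edgesMap respectively.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the three maps; no locking, no cycle check.
class MiniDag<Node, NodeInfo> {
    // vertex -> payload
    private final Map<Node, NodeInfo> nodesMap = new LinkedHashMap<>();
    // fromNode -> (toNode -> edge info): outgoing edges
    private final Map<Node, Map<Node, String>> edgesMap = new LinkedHashMap<>();
    // toNode -> (fromNode -> edge info): incoming edges
    private final Map<Node, Map<Node, String>> reverseEdgesMap = new LinkedHashMap<>();

    void addNode(Node node, NodeInfo info) {
        nodesMap.put(node, info);
    }

    // Assumes both endpoints were already added with addNode.
    // Every edge is written twice: once forward, once reversed.
    void addEdge(Node from, Node to) {
        String label = from + "->" + to;
        edgesMap.computeIfAbsent(from, k -> new LinkedHashMap<>()).put(to, label);
        reverseEdgesMap.computeIfAbsent(to, k -> new LinkedHashMap<>()).put(from, label);
    }

    // Downstream neighbours come straight from edgesMap.
    List<Node> getSubsequentNodes(Node node) {
        return new ArrayList<>(edgesMap.getOrDefault(node, Map.of()).keySet());
    }

    // Upstream neighbours come straight from reverseEdgesMap.
    List<Node> getPreviousNodes(Node node) {
        return new ArrayList<>(reverseEdgesMap.getOrDefault(node, Map.of()).keySet());
    }

    // Start nodes: in-degree 0, i.e. no incoming entries in reverseEdgesMap.
    List<Node> getBeginNodes() {
        List<Node> result = new ArrayList<>();
        for (Node node : nodesMap.keySet()) {
            if (getPreviousNodes(node).isEmpty()) {
                result.add(node);
            }
        }
        return result;
    }

    // Leaf nodes: out-degree 0, i.e. no outgoing entries in edgesMap.
    List<Node> getEndNodes() {
        List<Node> result = new ArrayList<>();
        for (Node node : nodesMap.keySet()) {
            if (getSubsequentNodes(node).isEmpty()) {
                result.add(node);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        MiniDag<String, String> graph = new MiniDag<>();
        graph.addNode("A", "A");
        graph.addNode("B", "B");
        graph.addNode("C", "C");
        graph.addEdge("B", "C");
        graph.addEdge("A", "B");
        System.out.println(graph.getBeginNodes()); // [A]
        System.out.println(graph.getEndNodes());   // [C]
    }
}
```

Keeping the reverse map costs a second write per edge, but it turns "which nodes have no upstream?" into one lookup per vertex instead of a scan over every edge.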

For example:

DAG<String, String, String> graph = new DAG<>();
graph.addNode("A", "A");
graph.addNode("B", "B");
graph.addNode("C", "C");

// Add the edge B -> C; A is still isolated
graph.addEdge("B", "C");

// Adding A -> B starts a search from B and checks whether A is reachable from it. If it is, A -> B would form a cycle and is rejected; otherwise the edge is added. Here B only reaches C, so the edge is added.
graph.addEdge("A", "B");

Source code analysis:
org.apache.dolphinscheduler.common.graph.DAG#addEdge

public boolean addEdge(Node fromNode, Node toNode, EdgeInfo edge, boolean createNode) {
    lock.writeLock().lock();

    try {
        // Check whether the edge can be added (no self-loop, no missing vertex, no cycle)
        if (!isLegalAddEdge(fromNode, toNode, createNode)) {
            log.error("serious error: add edge({} -> {}) is invalid, cause cycle!", fromNode, toNode);
            return false;
        }

        // Add both vertices if they are not present yet
        addNodeIfAbsent(fromNode, null);
        addNodeIfAbsent(toNode, null);

        // Record the edge in both the forward and the reverse edge maps
        addEdge(fromNode, toNode, edge, edgesMap);
        addEdge(toNode, fromNode, edge, reverseEdgesMap);

        return true;
    } finally {
        lock.writeLock().unlock();
    }
}

private boolean isLegalAddEdge(Node fromNode, Node toNode, boolean createNode) {
    // A self-loop (fromNode equals toNode) can never be added
    if (fromNode.equals(toNode)) {
        log.error("edge fromNode({}) can't equals toNode({})", fromNode, toNode);
        return false;
    }

    // If vertices may not be created implicitly, both endpoints must already exist
    if (!createNode) {
        if (!containsNode(fromNode) || !containsNode(toNode)) {
            log.error("edge fromNode({}) or toNode({}) is not in vertices map", fromNode, toNode);
            return false;
        }
    }

    // The edge fromNode -> toNode can be added only if it does not create a cycle,
    // i.e. only if fromNode is not already reachable from toNode.
    // The vertex count caps how many vertices are polled from the queue.
    int verticesCount = getNodesCount();

    Queue<Node> queue = new LinkedList<>();

    // Start a breadth-first search from toNode
    queue.add(toNode);

    // If the search never reaches fromNode, the new edge cannot close a cycle.
    // On the first iteration the queue is guaranteed to contain toNode.
    while (!queue.isEmpty() && (--verticesCount > 0)) {
        // Take the next vertex to expand
        Node key = queue.poll();

        for (Node subsequentNode : getSubsequentNodes(key)) {
            // Example: when adding A -> B, the search starts at B. If A is a downstream
            // node of B, a path B -> ... -> A already exists and A -> B would close a cycle,
            // so the edge is rejected. Otherwise each downstream node (for B -> C, that is C)
            // is enqueued and expanded in turn.
            if (subsequentNode.equals(fromNode)) {
                return false;
            }

            queue.add(subsequentNode);
        }
    }

    return true;
}
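Note that the quoted loop does not remember which vertices it has already visited. A vertex reachable along several paths is enqueued once per path, while the --verticesCount > 0 guard counts polls rather than distinct vertices. Take the graph X -> a, X -> b, a -> c, b -> c, c -> d, d -> F (six vertices) and a request to add F -> X: tracing the loop, its five polls are X, a, b, c, c, so d is never expanded, F is never compared, and the method returns true even though X -> a -> c -> d -> F already exists. The sketch below is a common alternative, not DolphinScheduler code: a breadth-first search that keeps a visited set, so each vertex is expanded at most once and no iteration cap is needed. CycleCheck and wouldCreateCycle are hypothetical names, and the graph is assumed to be given as a plain adjacency map.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: would adding fromNode -> toNode close a cycle?
final class CycleCheck {
    private CycleCheck() {}

    // Returns true if fromNode is reachable from toNode via existing edges.
    // `edges` maps each vertex to its direct successors.
    static <N> boolean wouldCreateCycle(Map<N, List<N>> edges, N fromNode, N toNode) {
        if (fromNode.equals(toNode)) {
            return true; // a self-loop is a cycle of length 1
        }
        Deque<N> queue = new ArrayDeque<>();
        Set<N> visited = new HashSet<>();
        queue.add(toNode);
        visited.add(toNode);
        while (!queue.isEmpty()) {
            N current = queue.poll();
            for (N next : edges.getOrDefault(current, List.of())) {
                if (next.equals(fromNode)) {
                    return true;
                }
                if (visited.add(next)) { // enqueue each vertex at most once
                    queue.add(next);
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Diamond: X -> a, X -> b, a -> c, b -> c, c -> d, d -> F
        Map<String, List<String>> edges = Map.of(
                "X", List.of("a", "b"),
                "a", List.of("c"),
                "b", List.of("c"),
                "c", List.of("d"),
                "d", List.of("F"));
        System.out.println(wouldCreateCycle(edges, "F", "X")); // true: X -> a -> c -> d -> F exists
        System.out.println(wouldCreateCycle(edges, "X", "F")); // false: F has no successors
    }
}
```

Because every vertex enters the queue at most once, this check costs O(V + E) per added edge. Note the inverted sense compared with isLegalAddEdge: true here means the edge is illegal.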

Dolphinscheduler DagHelper Explanation

The DAG class is a general-purpose graph utility, while DagHelper is a business-level helper that assembles task definitions and the relations between them into a DAG.

org.apache.dolphinscheduler.server.master.graph.WorkflowGraphFactory#createWorkflowGraph

public IWorkflowGraph createWorkflowGraph(ProcessInstance workflowInstance) throws Exception {

    // Fetch the task relations of the process definition (by code and version) that this workflow instance runs
    List<ProcessTaskRelation> processTaskRelations = processService.findRelationByCode(
            workflowInstance.getProcessDefinitionCode(),
            workflowInstance.getProcessDefinitionVersion());

    // Fetch the task definition logs referenced by those relations
    List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogDao.queryTaskDefineLogList(processTaskRelations);

    // Convert the relations and task definitions into TaskNode objects
    List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs);

    // generate process to get DAG info
    // Parse the recovery start nodes (task codes) from the command parameters, if any
    List<Long> recoveryTaskNodeCodeList = getRecoveryTaskNodeCodeList(workflowInstance.getCommandParam());

    // Parse the manually specified start nodes; none are specified by default, so the list is empty
    List<Long> startNodeNameList = parseStartNodeName(workflowInstance.getCommandParam());

    // Build a ProcessDag instance
    ProcessDag processDag = DagHelper.generateFlowDag(
            taskNodeList,
            startNodeNameList,
            recoveryTaskNodeCodeList,
            workflowInstance.getTaskDependType());

    if (processDag == null) {
        log.error("ProcessDag is null");
        throw new IllegalArgumentException("Create WorkflowGraph failed, ProcessDag is null");
    }

    // Build the DAG from the ProcessDag
    DAG<Long, TaskNode, TaskNodeRelation> dagGraph = DagHelper.buildDagGraph(processDag);
    log.debug("Build dag success, dag: {}", dagGraph);

    // Wrap the task node list and the DAG in a WorkflowGraph
    return new WorkflowGraph(taskNodeList, dagGraph);
}

org.apache.dolphinscheduler.service.utils.DagHelper#generateFlowDag

public static ProcessDag generateFlowDag(List<TaskNode> totalTaskNodeList,
                                         List<Long> startNodeNameList,
                                         List<Long> recoveryNodeCodeList,
                                         TaskDependType depNodeType) throws Exception {

    // Select the task nodes for this run from the start nodes, recovery nodes and
    // dependency type; in the default case this is all nodes
    List<TaskNode> destTaskNodeList =
            generateFlowNodeListByStartNode(
                    totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType);

    if (destTaskNodeList.isEmpty()) {
        return null;
    }

    // Derive the relations (edges) between the selected task nodes
    List<TaskNodeRelation> taskNodeRelations = generateRelationListByFlowNodes(destTaskNodeList);

    // Instantiate a ProcessDag
    ProcessDag processDag = new ProcessDag();
    // Set the edges of the DAG
    processDag.setEdges(taskNodeRelations);
    // Set the vertices of the DAG
    processDag.setNodes(destTaskNodeList);
    return processDag;
}
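The body of generateRelationListByFlowNodes is not quoted here. Below is a minimal sketch of one way such a method can derive the edges, assuming (as a simplification) that every task node records the codes of its upstream tasks. SimpleTaskNode, SimpleRelation and relationsFromPreTasks are hypothetical names, not DolphinScheduler types.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for TaskNode: a task code plus the codes of its upstream tasks.
record SimpleTaskNode(long code, List<Long> preTaskCodes) {}

// Hypothetical stand-in for TaskNodeRelation: an edge startNode -> endNode.
record SimpleRelation(long startNode, long endNode) {}

final class RelationSketch {
    private RelationSketch() {}

    // Turn each "pre task" link into an edge, keeping only edges between selected nodes.
    static List<SimpleRelation> relationsFromPreTasks(List<SimpleTaskNode> selectedNodes) {
        Set<Long> selectedCodes = new HashSet<>();
        for (SimpleTaskNode node : selectedNodes) {
            selectedCodes.add(node.code());
        }
        List<SimpleRelation> relations = new ArrayList<>();
        for (SimpleTaskNode node : selectedNodes) {
            for (long preCode : node.preTaskCodes()) {
                // An upstream task that is not part of this run contributes no edge.
                if (selectedCodes.contains(preCode)) {
                    relations.add(new SimpleRelation(preCode, node.code()));
                }
            }
        }
        return relations;
    }

    public static void main(String[] args) {
        List<SimpleTaskNode> nodes = List.of(
                new SimpleTaskNode(1L, List.of()),
                new SimpleTaskNode(2L, List.of(1L)),
                new SimpleTaskNode(3L, List.of(1L, 2L)),
                new SimpleTaskNode(4L, List.of(99L))); // task 99 was not selected
        // Edges 1 -> 2, 1 -> 3 and 2 -> 3; the link 99 -> 4 is dropped
        System.out.println(relationsFromPreTasks(nodes));
    }
}
```

The filter matters when only part of a workflow runs: an edge pointing at a task outside destTaskNodeList would reference a vertex that is never added to the DAG.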

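The ProcessDag now holds destTaskNodeList as its vertices and taskNodeRelations as its edges. buildDagGraph turns it into a DAG by adding all vertices first and then all edges: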

org.apache.dolphinscheduler.service.utils.DagHelper#buildDagGraph

public static DAG<Long, TaskNode, TaskNodeRelation> buildDagGraph(ProcessDag processDag) {

    DAG<Long, TaskNode, TaskNodeRelation> dag = new DAG<>();

    // Add vertices
    if (CollectionUtils.isNotEmpty(processDag.getNodes())) {
        for (TaskNode node : processDag.getNodes()) {
            dag.addNode(node.getCode(), node);
        }
    }

    // Add edges
    if (CollectionUtils.isNotEmpty(processDag.getEdges())) {
        for (TaskNodeRelation edge : processDag.getEdges()) {
            dag.addEdge(edge.getStartNode(), edge.getEndNode());
        }
    }
    return dag;
}
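In the quoted buildDagGraph, the boolean returned by dag.addEdge is not checked, so a relation that would close a cycle is only reported by the error log inside addEdge and is otherwise skipped. If a caller wanted to surface such relations, one option is to collect them, as in this minimal sketch. RelationEdge and CheckedDagBuilder are hypothetical, vertices are plain long task codes, and the cycle check uses an iterative depth-first search with a visited set rather than DolphinScheduler's breadth-first loop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for TaskNodeRelation.
record RelationEdge(long start, long end) {}

final class CheckedDagBuilder {
    // vertex -> direct successors (a simplified edgesMap)
    private final Map<Long, Set<Long>> successors = new LinkedHashMap<>();
    private final List<RelationEdge> rejected = new ArrayList<>();

    // Add all vertices first, then all edges, recording every edge that could not be added.
    static CheckedDagBuilder build(List<Long> nodeCodes, List<RelationEdge> edges) {
        CheckedDagBuilder dag = new CheckedDagBuilder();
        for (long code : nodeCodes) {
            dag.successors.putIfAbsent(code, new LinkedHashSet<>());
        }
        for (RelationEdge edge : edges) {
            if (!dag.tryAddEdge(edge)) {
                dag.rejected.add(edge); // keep the rejected relation instead of ignoring it
            }
        }
        return dag;
    }

    List<RelationEdge> rejectedEdges() {
        return rejected;
    }

    private boolean tryAddEdge(RelationEdge edge) {
        // Both endpoints must already exist (mirrors the !createNode check).
        if (!successors.containsKey(edge.start()) || !successors.containsKey(edge.end())) {
            return false;
        }
        // A self-loop, or a start already reachable from the end, would close a cycle.
        if (edge.start() == edge.end() || reachable(edge.end(), edge.start())) {
            return false;
        }
        successors.get(edge.start()).add(edge.end());
        return true;
    }

    // Iterative DFS with a visited set: is target reachable from source?
    private boolean reachable(long source, long target) {
        Deque<Long> stack = new ArrayDeque<>();
        Set<Long> seen = new HashSet<>();
        stack.push(source);
        while (!stack.isEmpty()) {
            long current = stack.pop();
            if (current == target) {
                return true;
            }
            if (seen.add(current)) {
                for (long next : successors.get(current)) {
                    stack.push(next);
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        CheckedDagBuilder dag = build(
                List.of(1L, 2L, 3L),
                List.of(new RelationEdge(1, 2), new RelationEdge(2, 3),
                        new RelationEdge(3, 1), new RelationEdge(1, 4)));
        // 3 -> 1 would close 1 -> 2 -> 3 -> 1, and node 4 does not exist
        System.out.println(dag.rejectedEdges());
    }
}
```

Returning the rejected relations lets the caller decide whether a cyclic workflow definition should fail fast or merely be logged.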
