DEV Community

Chen Debra
Chen Debra

Posted on

Demystifying DolphinScheduler’s Plugin Mechanism: Extend Task Types & Data Sources with Ease

1. High-Level Architecture Overview

DolphinScheduler’s plugin ecosystem is built upon the Java SPI (Service Provider Interface) framework, paired with Google AutoService to auto-generate registration files, enabling non-intrusive plugin-based extension capabilities.

dolphinscheduler-spi                    ← Core Interface Layer (SPI Infrastructure)
dolphinscheduler-datasource-plugin      ← Data Source Plugin Layer
dolphinscheduler-task-plugin            ← Task Plugin Layer
dolphinscheduler-worker                 ← Plugin Consumer Layer (Worker for task execution)
Enter fullscreen mode Exit fullscreen mode

2. SPI Core Infrastructure

PrioritySPI (Root Interface)
  └── getIdentify(): SPIIdentify  ← Unique plugin identifier + priority weight
  └── compareTo(Integer)          ← Priority comparison logic
Enter fullscreen mode Exit fullscreen mode

PrioritySPIFactory<T> acts as the core engine for plugin discovery

// Scan classpath via Java native ServiceLoader
for (T t : ServiceLoader.load(spiClass)) {
    if (map.containsKey(t.getIdentify().getName())) {
        resolveConflict(t);   // Resolve duplicate plugins by priority; throw exception if priorities match
    } else {
        map.put(t.getIdentify().getName(), t);
    }
}
Enter fullscreen mode Exit fullscreen mode

Plugin Registration Mechanism: Each plugin module leverages the @AutoService annotation. During compilation, SPI configuration files are automatically generated under the META-INF/services/ directory, eliminating manual config maintenance.

3. Deep Dive into Data Source Plugin Workflow

3.1 Interface Hierarchy

DataSourceChannelFactory (SPI Entry Point)
  └── getName()          ← Unique plugin identifier, e.g. "MYSQL"
  └── create()           ← Instantiate DataSourceChannel

DataSourceChannel (Connection Channel)
  └── createAdHocDataSourceClient()    ← Create one-off temporary connection client
  └── createPooledDataSourceClient()   ← Create pooled connection client

DataSourceClient (Base Interface)
  └── getConnection(): Connection

PooledDataSourceClient extends DataSourceClient
  └── createDataSourcePool()           ← Initialize HikariCP connection pool
Enter fullscreen mode Exit fullscreen mode

3.2 End-to-End Implementation Example: MySQL Plugin

MySQLDataSourceChannelFactory          ← Registered via @AutoService annotation
  └── create() → MySQLDataSourceChannel
        └── createPooledDataSourceClient() → MySQLPooledDataSourceClient
              └── extends BasePooledDataSourceClient
                    └── createDataSourcePool() → HikariDataSource
                          ├── setDriverClassName()
                          ├── setJdbcUrl()
                          ├── setUsername() / setPassword()
                          ├── setMinimumIdle() / setMaximumPoolSize()
                          └── setConnectionTestQuery()
Enter fullscreen mode Exit fullscreen mode

4. Deep Dive into Task Plugin Workflow

4.1 Interface Hierarchy

TaskChannelFactory (SPI Entry Point) extends UiChannelFactory, PrioritySPI
  └── getName()          ← Unique task type identifier, e.g. "SHELL"
  └── create()           ← Instantiate TaskChannel
  └── getParams()        ← Return UI configuration parameters for frontend rendering

TaskChannel (Task Execution Channel)
  └── createTask(TaskExecutionContext) → AbstractTask
  └── parseParameters(ParametersNode) → AbstractParameters
  └── getResources(parameters)        → ResourceParametersHelper
  └── cancelApplication(boolean)

AbstractTask (Base Task Execution Class)
  └── init()             ← Task initialization logic
  └── handle(callback)   ← Core execution logic (abstract method)
  └── cancel()           ← Task termination logic (abstract method)
  └── getExitStatus()    ← Map exit code to standardized task status
Enter fullscreen mode Exit fullscreen mode

4.2 End-to-End Implementation Example: Shell Task Plugin

ShellTaskChannelFactory                ← Registered via @AutoService annotation
  └── getName() → "SHELL"
  └── getParams() → [nodeName, runFlag, ...]  ← Frontend UI configuration fields
  └── create() → ShellTaskChannel
        └── createTask(ctx) → ShellTask
              └── handle(callback)
                    └── ShellCommandExecutor.run(shellActuatorBuilder)
                          └── Spawn & execute shell script process
Enter fullscreen mode Exit fullscreen mode

5. Side-by-Side Comparison of Two Plugin Types

Feature Redshift Concurrency Scaling DolphinScheduler Task Groups
Control Granularity Cluster-level, auto-scaling Task/Workflow-level, manual control
Modification Cost Low; only parameter configuration required Medium; task grouping planning needed
Cost Impact Pay-as-you-go, potential extra charges No additional fees
Applicable Scenarios Sudden, unpredictable load spikes Known, stable high-concurrency scheduling scenarios
Recommended Usage Acts as a "fuse" to handle unexpected traffic surges Acts as a "throttle valve" for daily concurrency governance

Top comments (0)