This series of articles is a beginner-to-advanced tutorial on DolphinScheduler, covering setup, secondary development, core principle analysis, operations, and management. It is suitable for readers who want to understand DolphinScheduler or deepen their knowledge.
Today’s article is the second on cleaning scheduling data. DolphinScheduler workflows are version-controlled: each time a workflow or one of its tasks is added, updated, or otherwise modified, a new version is generated and new rows are written to process_definition_log and process_task_relation_log. Over time, a large amount of “useless” data accumulates and the MySQL tables keep growing, which can degrade both the scheduling service and the user experience.
Here’s an example: 👇👇👇
As shown in the figure, this workflow has accumulated over 600 versions through successive iterations. In all that time, we have never needed to switch back to a historical version, so these old versions are essentially “useless” data. To keep the service stable, we agreed with the data warehouse team to retain only the 20 most recent versions.
Therefore, it is necessary to clean the above historical version records to ensure page responsiveness and MySQL service performance.
Cleaning workflow history versions is still best done via API, as directly operating on the database carries a high risk.
This article is simple: first, we explain the API logic, then show how to use a Python script to call the API to delete historical versions.
1. API Logic Introduction
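DolphinScheduler provides an API for deleting version records. The request method is DELETE and the endpoint is process-definition/{dag_code}/versions/{version}, where dag_code is the workflow definition code and version is the version number to delete. The logic is simple and will not be detailed here.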
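As a rough illustration, a call to this endpoint might look like the sketch below. The `/dolphinscheduler/projects/{project_code}/` prefix is an assumption inferred from the project-list URL used later in the script, and the helper names `build_delete_url` and `delete_version` are hypothetical; check the path against your DolphinScheduler version before relying on it.

```python
def build_delete_url(base_url, project_code, dag_code, version):
    """Build the DELETE URL for one historical workflow version."""
    # Assumption: the endpoint lives under /dolphinscheduler/projects/{project_code}/.
    return (
        "{base}/dolphinscheduler/projects/{project}"
        "/process-definition/{dag}/versions/{version}"
    ).format(
        base=base_url.rstrip("/"),
        project=project_code,
        dag=dag_code,
        version=version,
    )


def delete_version(session, base_url, token, project_code, dag_code, version):
    """Send the DELETE request through a requests.Session-like object."""
    url = build_delete_url(base_url, project_code, dag_code, version)
    # The token is passed in the "token" header, as in the script's other calls.
    response = session.delete(url, headers={"token": token}, timeout=30)
    return response.status_code, response.text
```

Passing in the session (for example a `requests.Session()`) keeps the helper easy to exercise with a stub before pointing it at a real server.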
2. Using Python Script to Call API
The Python script uses four APIs in sequence:
- Get the project list
- Get the workflow list
- Get the current workflow version information
- Delete historical versions
In step three, note that the version list is requested with a page size of 20 and read from page 2 onward. Page 1 holds the 20 most recent versions, which are the ones we want to keep, so everything from page 2 on is a candidate for deletion.
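The paging idea can be sketched independently of the HTTP details. In the sketch below, `fetch_page` is a hypothetical callable standing in for the version-list request; it is assumed to return version numbers newest first and an empty list once the pages run out. Skipping `current_version` is an extra safeguard added here for illustration, not something the original script is confirmed to do.

```python
def collect_old_versions(fetch_page, current_version, page_size=20, start_page=2):
    """Return the version numbers found on page `start_page` and later.

    `fetch_page(page_no, page_size)` is a hypothetical callable that returns
    the version numbers on one page (newest first), or an empty list when the
    page does not exist.
    """
    old_versions = []
    page_no = start_page
    while True:
        page = fetch_page(page_no, page_size)
        if not page:
            break
        # Never schedule the version that is currently in use for deletion.
        old_versions.extend(v for v in page if v != current_version)
        page_no += 1
    return old_versions
```

Collecting the full list before deleting anything matters: deleting while paging would shift later records onto earlier pages and cause some versions to be skipped.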
Input: None
Python environment: 2.7
The code is as follows:
#!/usr/bin/python
# -*- coding: utf8 -*-
# Clean workflow history versions via API to reduce data in process_definition_log and process_task_relation_log.
import io
import subprocess
import requests
import json
import time
import datetime
# Configuration: modify IP, port, token
base_url = 'http://xxxx:xxxx'
token = 'xxxxx'
# Get project list
def get_project_list():
    url = "{base_url}/dolphinscheduler/projects?pageSize=100&pageNo=1&searchVal=&_t=0.3741042528841678".format(base_url=base_url)
    payload = {}
    headers = {
        'Connection': 'keep-alive',
        'Accept': 'application/json, text/plain, */*',
        'language': 'zh_CN',
        'sessionId': '680b2a0e-624c-4804-9e9e-58c7d4a0b44c',
        'User-Agent': 'Mozilla/5.0 ...',
        'Referer': "{base_url}/dolphinscheduler/ui/".format(base_url=base_url),
        'Accept-Language': 'zh-CN,zh;q=0.9,pt;q=0.8,en;q=0.7',
        'token': token
    }
    response = requests.request("GET", url, headers=headers, data=payload)
    response_data = json.loads(response.text)
    totalList = response_data['data']['totalList']
    return totalList
# Get workflow definition list
def get_definition_detail(project_code):
    ...
    return all_data
# Get workflow version info list, starting from page 2, size 20
def get_version_detail(project_code, dag_code, current_version):
    ...
    return all_version
# Delete a version
def delete(project_code, dag_code, version):
    ...
    response = requests.request("DELETE", url, headers=headers, data=payload)
    print('Execution result:')
    print(response.text)
if __name__ == '__main__':
    projects = get_project_list()
    for project in projects:
        project_code = project['code']
        print('Processing project:' + str(project_code))
        all_dags = get_definition_detail(project_code)
        for dag in all_dags:
            dag_code = dag['code']
            current_version = dag['version']
            print(dag_code)
            print(current_version)
            all_data = get_version_detail(project_code, dag_code, current_version)
            print(all_data)
            for v in all_data:
                delete(project_code, dag_code, v)
Usage example: save the script as dolphin_clean_version.py
python dolphin_clean_version.py
The script is also maintained on GitHub. Stars are welcome:
https://github.com/aikuyun/dolphin_practices/blob/main/dolphin_clean_version.py
3. Notes
- How to obtain the token: You can get the token from Apache DolphinScheduler's "Security Center."
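Rather than hard-coding the token in the script, it could be read from the environment. The following is only a sketch: the `DS_TOKEN` variable name and the `build_headers` helper are hypothetical, and it assumes the API needs nothing beyond the `token` header (the original script also sends browser-style headers such as `sessionId` and `User-Agent`, which may or may not be required on your deployment).

```python
import os


def build_headers(token=None):
    """Return minimal request headers carrying the DolphinScheduler token."""
    # DS_TOKEN is a hypothetical environment variable name used for illustration.
    token = token or os.environ.get("DS_TOKEN", "")
    if not token:
        raise ValueError("No token given and DS_TOKEN is not set")
    return {"Accept": "application/json", "token": token}
```

Keeping the token out of the source file also makes it safer to share the script or commit it to a repository.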
The above shows how to shrink the process_definition_log and process_task_relation_log tables with a single script run via the API. If you have any questions, feel free to reach out. Hope this helps! See you next time.
