Chen Debra

Using DolphinScheduler API to Achieve Efficient Batch Workflow Import and Script Deployment


When I implemented batch generation of DolphinScheduler tasks, I found that workflows could only be imported one at a time through the web UI, so using the API is clearly more convenient.

DolphinScheduler API Documentation

DolphinScheduler has API documentation available at:

http://IP:12345/dolphinscheduler/swagger-ui/index.html?language=zh_CN&lang=cn

However, the documentation is relatively brief, so you need to study it yourself.

Token: All APIs require a token.

  1. In Security Center - Token Management, create a token. Remember this token, as it will be needed for all subsequent API calls.

Header: Construct the request header using the token mentioned above:

import requests
import json

token = ''  # the token created in Security Center - Token Management
headers = {
    'Accept': 'application/json',
    'token': token
}

Project ID: project_id can be found in the URL when viewing the project workflow.

DolphinScheduler Task Import API

The API for importing tasks is:

project_id = 123456  # placeholder: use your own project's ID from the URL
import_url = f'http://IP:12345/dolphinscheduler/projects/{project_id}/process-definition/import'

Knowing the API, you can proceed with the import:

def import_job(file_path):
    # Open the file and read it as binary data
    with open(file_path, 'rb') as file:
        files = {'file': file}
        # Import the workflow
        response = requests.post(import_url, headers=headers, files=files)
        print(response.status_code)
        if response.status_code != 200:
            print('Upload failed: ' + file_path)

Note that the import endpoint only accepts the file as binary data, which is why the file is opened in 'rb' mode.

file_path is the path to a workflow definition file. To get the format right, you can export an existing workflow from the UI and use it as a reference.

By calling this function once per file, you can import tasks in batch, as sketched below.
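For example, a minimal batch-import loop, assuming the exported workflow files sit in a hypothetical local directory named workflows/ and were exported as JSON:

import glob

# Import every exported workflow file in the directory (hypothetical path)
for file_path in glob.glob('workflows/*.json'):
    import_job(file_path)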

Workflow Deployment

After completing batch task uploads using the method above, one issue remains: bringing each workflow online manually is still a significant workload. So we continue using the API.

After some research, I found that bringing a workflow online requires first obtaining the workflow's schedule ID:

Get Workflow List -> Get Workflow Code -> Get Schedule IDs for All Workflows -> Deploy Workflows

  • Get Workflow List

The API address is:

jobs_url = f'http://IP:12345/dolphinscheduler/projects/{project_id}/process-definition'

However, this requires paginated queries, which is slightly inconvenient:

def get_jobs_list():
    # Paginated query: initialize pagination parameters
    pageNo = 1
    pageSize = 10
    # Store all results
    all_items = []
    while True:
        # Construct the complete URL for the current page
        url = f'{jobs_url}?pageSize={pageSize}&pageNo={pageNo}&searchVal='

        # Send GET request
        response = requests.get(url, headers=headers)

        # Check response status code
        if response.status_code == 200:
            # Request succeeded, process response data
            data = json.loads(response.content.decode())["data"]
            total = data["total"]
            # Add the current page's data to the result list
            all_items.extend(data["totalList"])

            # Exit the loop once every page has been fetched
            if pageNo * pageSize >= total:
                break
            # Increment page number
            pageNo += 1
        else:
            # Request failed, print error information
            print('Request failed:', response.status_code, response.text)
            break

    return all_items

all_items contains the specific details of all workflows, which need to be extracted:

all_jobs = get_jobs_list()
job_codes = [job['code'] for job in all_jobs]

This gives you the codes for all workflows.

  • Get Schedule ID

The API for schedule IDs is as follows; to avoid pagination, 1000 entries per page are requested directly:

schedules_url = f'http://IP:12345/dolphinscheduler/projects/{project_id}/schedules?pageSize=1000&pageNo=1&processDefinitionCode='

Using this API, you can obtain all schedule IDs:

def schedule_id(job_code):
    url = schedules_url + str(job_code)
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        js = json.loads(response.content.decode())
        total_list = js['data']['totalList']
        # Only return the ID if a schedule exists and is still offline
        if len(total_list) > 0 and total_list[0]['releaseState'] == 'OFFLINE':
            return total_list[0]['id']
    # No schedule found, already online, or the request failed
    return None

Here, schedules that are already online are filtered out: only workflows whose schedule is still OFFLINE return an ID. Collecting the IDs for every workflow is then a simple loop, as sketched below.
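A minimal sketch, assuming job_codes from the previous step; schedule_id() returns None for workflows that are already online or have no schedule, so those are skipped:

# Gather schedule IDs for all workflows that still need to go online
scheduler_ids = []
for code in job_codes:
    sid = schedule_id(code)
    if sid is not None:
        scheduler_ids.append(sid)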

  • Deploy

Everything is ready, and deployment can finally proceed:

online_url = 'http://IP:12345/dolphinscheduler/projects/{project_id}/schedules/{scheduler_id}/online'

Specific implementation:

def online_job(scheduler_id):
    # Fill in both placeholders; leaving one unfilled would raise a KeyError
    url = online_url.format(project_id=project_id, scheduler_id=scheduler_id)
    response = requests.post(url, headers=headers)
    if response.status_code == 200:
        print('success')
    else:
        print('online job failed: ' + str(scheduler_id))

At this point, fully automated batch import and deployment is achieved. A short driver tying all the steps together is sketched below.
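As a wrap-up, here is a minimal end-to-end sketch under the assumptions made above (token, project_id, and the URLs configured as shown; workflows/ is a hypothetical directory of exported workflow JSON files):

import glob

# 1. Batch-import all exported workflow files
for file_path in glob.glob('workflows/*.json'):
    import_job(file_path)

# 2. Fetch all workflows and extract their codes
job_codes = [job['code'] for job in get_jobs_list()]

# 3. Resolve schedule IDs for workflows whose schedules are still offline
scheduler_ids = [schedule_id(code) for code in job_codes]

# 4. Bring every offline schedule online
for sid in scheduler_ids:
    if sid is not None:
        online_job(sid)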
