
Orchestrate Dataflow pipelines easily with GCP Workflows

Dataflow pipelines are rarely on their own. Most of the time, they are part of a larger process. For example: one pipeline collects events from the source into Bigtable, then a second pipeline computes aggregated data from Bigtable and stores it in BigQuery.

(Diagram: a first Dataflow pipeline loads events into Bigtable, then a second pipeline aggregates the Bigtable data into BigQuery.)

Of course, each pipeline could be scheduled independently with Cloud Scheduler.
But if these pipelines need to be linked somehow, such as launching the second pipeline when the first is done, then orchestration is required.

Until recently, GCP had one tool in the box for this kind of purpose: Cloud Composer, a (slightly) managed Apache Airflow. Despite its rich feature set and broad community, this service has several drawbacks for the kind of simple orchestration I was after:

  • it's not fully integrated: you need to manage costly resources such as a GKE cluster and a Cloud SQL instance

  • it pushes Python into your codebase; there is no other choice

  • any change in the setup (like adding an environment variable) is painfully slow to propagate

  • the wide variety of operators in the ecosystem can lead to a poor separation of concerns between orchestration and business processes

And I won't even talk about the Airflow UI... (I've heard that some people like it)

Because of all this, orchestrating with Composer felt overly difficult. Yet, as is often the case with GCP, if you face too many difficulties doing something that should be simple, you're probably not doing it right. This proved true once again: Cloud Composer wasn't the right product for my needs...

Enter GCP Workflows!

Workflows is a new service: it was promoted out of beta very recently. Luckily, it already offers most of the functionality needed to orchestrate GCP services' jobs, and to do it simply:

  • it is fully managed and serverless, which means you don't pay when you don't use it

  • it does only one job and does it well: orchestrating HTTP calls

  • all is configured in YAML files, whose syntax is short and easy to learn

  • the UI is neatly integrated and feels more "part of GCP" than Composer (although there are still quite a few display bugs at the moment)

With this new product, it becomes really easy to write a workflow that chains multiple Dataflow jobs like in the diagram above.

A sample workflow for Dataflow jobs

Workflow files are written in YAML, and the syntax is simple and straightforward:

main:
  steps:
    - init:
        assign:
          - project: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - region: "europe-west1"
          - topic: "myTopic"
    - firstPipeline:
        call: LaunchDataflow
        args:
          project: ${project}
          region: ${region}
          template: "first"
        result: firstJobId
    - waitFirstDone:
        call: DataflowWaitUntilStatus
        args:
          project: ${project}
          region: ${region}
          jobId: ${firstJobId}
          status: "JOB_STATE_DONE"
    - secondPipeline:
        call: LaunchDataflow
        args:
          project: ${project}
          region: ${region}
          template: "second"
        result: secondJobId    
    - waitSecondDone:
        call: DataflowWaitUntilStatus
        args:
          project: ${project}
          region: ${region}
          jobId: ${secondJobId}
          status: "JOB_STATE_DONE"
    - publish:
        call: googleapis.pubsub.v1.projects.topics.publish
        args:
          topic: ${"projects/" + project + "/topics/" + topic}
          body:
            messages:
              - data: ${base64.encode(text.encode("{\"message\":\"workflow done\"}"))}

Let's break it down. The sample workflow has the following steps:

  • init: preprocessing step, where workflow variables are initialized
  • firstPipeline: launch the first Dataflow job
  • waitFirstDone: wait until the first Dataflow job is completed
  • secondPipeline: launch the second Dataflow job
  • waitSecondDone: wait until the second Dataflow job is completed
  • publish: push a sample PubSub notification at the end of the workflow

As you have noticed, firstPipeline and secondPipeline call a custom routine, a subworkflow, which is defined in the same file:

LaunchDataflow:
  params: [project, region, template]
  steps:
    - launch:
        call: http.post
        args:
          url: ${"https://dataflow.googleapis.com/v1b3/projects/"+project+"/locations/"+region+"/flexTemplates:launch"}
          auth:
            type: OAuth2
          body:
            launchParameter:
              jobName: ${"workflow-" + template }
              environment:
                numWorkers: 1
                maxWorkers: 8
              containerSpecGcsPath: ${template}
        result: dataflowResponse
        next: jobCreated
    - jobCreated:
        return: ${dataflowResponse.body.job.id}

This subworkflow calls the Dataflow REST API to launch a job (here, a Flex Template). With Workflows you can easily call any GCP service's API or any external HTTP endpoint.
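For instance, a step calling an arbitrary external endpoint could look like the following sketch (the URL, step name and result variable are made-up placeholders):

    - callExternalService:
        call: http.get
        args:
          url: "https://example.com/api/v1/status"
          headers:
            Accept: "application/json"
        result: externalResponse

The parsed response is then available to the following steps as externalResponse.body.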

Similarly, waitFirstDone and waitSecondDone call another subworkflow:

DataflowWaitUntilStatus:
  params: [project, region, jobId, status]
  steps:
    - init:
        assign:
          - currentStatus: ""
          - failureStatuses: ["JOB_STATE_FAILED", "JOB_STATE_CANCELLED", "JOB_STATE_UPDATED", "JOB_STATE_DRAINED"]
    - check_condition:
        switch:
          - condition: ${currentStatus in failureStatuses}
            next: exit_fail
          - condition: ${currentStatus != status}
            next: iterate
        next: exit_success
    - iterate:
        steps:
          - sleep30s:
              call: sys.sleep
              args:
                seconds: 30
          - getJob:
              call: http.get
              args:
                url: ${"https://dataflow.googleapis.com/v1b3/projects/"+project+"/locations/"+region+"/jobs/"+jobId}
                auth:
                  type: OAuth2
              result: getJobResponse
          - getStatus:
              assign:
                - currentStatus: ${getJobResponse.body.currentState}
          - log:
              call: sys.log
              args:
                text: ${"Current job status="+currentStatus}
                severity: "INFO"
        next: check_condition
    - exit_success:
        return: ${currentStatus}
    - exit_fail:
        raise: ${"Job in unexpected terminal status "+currentStatus}

This subworkflow also calls the Dataflow REST API, this time in a polling loop until the job reaches a terminal status. In case of an unexpected state, an exception is raised: the workflow stops and is marked as failed. Otherwise, it proceeds to the next step.
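As a side note, if you would rather handle such a failure within the workflow instead of letting the whole execution fail, the Workflows syntax also supports try/except blocks. Here is a minimal, hypothetical sketch wrapping the first wait step (the logged message and the return value are arbitrary):

    - waitFirstDone:
        try:
          call: DataflowWaitUntilStatus
          args:
            project: ${project}
            region: ${region}
            jobId: ${firstJobId}
            status: "JOB_STATE_DONE"
        except:
          as: e
          steps:
            - logFailure:
                call: sys.log
                args:
                  text: "First pipeline did not reach JOB_STATE_DONE, stopping the workflow"
                  severity: "ERROR"
            - endWorkflow:
                return: "first pipeline failed"

In the sample above, though, letting the execution fail is the desired behaviour, so the raise is left unhandled.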

Finally, deploy the workflow, for example via the UI or with gcloud:

#! /bin/bash
localDir=$(dirname "$0")

WORKFLOW="sample"
DESCRIPTION="Sample workflow"
SOURCE="sample.yaml"
PROJECT="my-gcp-project"
REGION="europe-west4"
SERVICE_ACCOUNT="sa-workflows@my-gcp-project.iam.gserviceaccount.com"

gcloud beta workflows deploy "${WORKFLOW}" --location="${REGION}" --service-account="${SERVICE_ACCOUNT}" --source="${localDir}/${SOURCE}" --description="${DESCRIPTION}"

Breaking news: Workflows resources are now available in Terraform, for you IaC freaks.

Once deployed, the workflow can be launched, for example from Cloud Scheduler, by POSTing to this endpoint: https://workflowexecutions.googleapis.com/v1/projects/${PROJECT}/locations/${REGION}/workflows/${WORKFLOW}/executions
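For example, a Cloud Scheduler job triggering the workflow every morning could be created like this (the schedule and job name are arbitrary, the other values reuse the placeholders from the deployment script, and the service account is assumed to have the Workflows Invoker role):

#! /bin/bash
# Same placeholder values as in the deployment script above
WORKFLOW="sample"
PROJECT="my-gcp-project"
REGION="europe-west4"
SERVICE_ACCOUNT="sa-workflows@my-gcp-project.iam.gserviceaccount.com"

# POST to the workflow executions endpoint every day at 6 AM
gcloud scheduler jobs create http "trigger-${WORKFLOW}" \
    --schedule="0 6 * * *" \
    --http-method=POST \
    --uri="https://workflowexecutions.googleapis.com/v1/projects/${PROJECT}/locations/${REGION}/workflows/${WORKFLOW}/executions" \
    --oauth-service-account-email="${SERVICE_ACCOUNT}"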

Conclusion

Thanks to Workflows, with just a relatively small YAML file we were able to chain two Dataflow jobs the easy way: serverlessly.


Thanks for reading! I’m Matthieu, data engineer at Stack Labs.
If you want to discover the Stack Labs Data Platform or join an enthusiastic Data Engineering team, please contact us.
