DEV Community

Alex Spinov
Alex Spinov

Posted on

Argo Workflows Has a Free API: Kubernetes-Native Workflow Engine for Data Pipelines

Argo Workflows is an open-source, container-native workflow engine for orchestrating parallel jobs on Kubernetes. It implements workflows as Kubernetes CRDs, making it ideal for ML pipelines, data processing, and CI/CD.

What Is Argo Workflows?

Argo Workflows is a CNCF graduated project that runs multi-step workflows where each step is a container. It supports DAG-based and step-based workflows with features like retries, timeouts, and conditional execution.

Key Features:

  • DAG and step-based workflows
  • Artifact passing between steps
  • Parameterized workflows (templates)
  • Cron workflows
  • Retry and timeout policies
  • Web UI dashboard
  • REST and gRPC API
  • S3/GCS/MinIO artifact storage

Installation

kubectl create namespace argo
kubectl apply -n argo -f https://github.com/argoproj/argo-workflows/releases/download/v3.6.0/install.yaml

# Install CLI
brew install argo

# Port-forward UI
kubectl -n argo port-forward svc/argo-server 2746:2746
Enter fullscreen mode Exit fullscreen mode

Workflow Examples

DAG Workflow

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: data-pipeline
spec:
  entrypoint: pipeline
  templates:
    - name: pipeline
      dag:
        tasks:
          - name: extract
            template: extract-data
          - name: transform
            template: transform-data
            dependencies: [extract]
          - name: validate
            template: validate-data
            dependencies: [extract]
          - name: load
            template: load-data
            dependencies: [transform, validate]

    - name: extract-data
      container:
        image: python:3.12-slim
        command: [python, -c]
        args: ["print('Extracting data...')"]

    - name: transform-data
      container:
        image: python:3.12-slim
        command: [python, -c]
        args: ["print('Transforming data...')"]

    - name: validate-data
      container:
        image: python:3.12-slim
        command: [python, -c]
        args: ["print('Validating data...')"]

    - name: load-data
      container:
        image: python:3.12-slim
        command: [python, -c]
        args: ["print('Loading to warehouse...')"]
Enter fullscreen mode Exit fullscreen mode

Parameterized Workflow

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: ml-training
spec:
  entrypoint: train
  arguments:
    parameters:
      - name: dataset
      - name: epochs
        value: "100"
      - name: learning-rate
        value: "0.001"
  templates:
    - name: train
      container:
        image: ml-trainer:latest
        args:
          - --dataset={{workflow.parameters.dataset}}
          - --epochs={{workflow.parameters.epochs}}
          - --lr={{workflow.parameters.learning-rate}}
        resources:
          requests:
            memory: 4Gi
            nvidia.com/gpu: 1
Enter fullscreen mode Exit fullscreen mode

Argo Workflows API

import requests

ARGO = "https://localhost:2746/api/v1"
HEADERS = {"Authorization": "Bearer your-token"}

# List workflows
workflows = requests.get(f"{ARGO}/workflows/argo", headers=HEADERS, verify=False).json()
for wf in workflows.get("items", []):
    print(f"Workflow: {wf['metadata']['name']}, Phase: {wf['status']['phase']}")

# Submit a workflow
result = requests.post(f"{ARGO}/workflows/argo", headers=HEADERS, verify=False, json={
    "workflow": {
        "metadata": {"generateName": "data-pipeline-"},
        "spec": {
            "entrypoint": "main",
            "templates": [{
                "name": "main",
                "container": {
                    "image": "python:3.12-slim",
                    "command": ["python", "-c"],
                    "args": ["print('Hello from Argo!')"]
                }
            }]
        }
    }
}).json()
print(f"Submitted: {result['metadata']['name']}")

# Get workflow logs
logs = requests.get(
    f"{ARGO}/workflows/argo/{wf_name}/log",
    headers=HEADERS, verify=False,
    params={"logOptions.container": "main"}
).text
print(logs)
Enter fullscreen mode Exit fullscreen mode

Cron Workflows

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: daily-report
spec:
  schedule: "0 8 * * *"
  timezone: "UTC"
  concurrencyPolicy: Replace
  workflowSpec:
    entrypoint: report
    templates:
      - name: report
        container:
          image: reporter:latest
          command: [python, generate_report.py]
Enter fullscreen mode Exit fullscreen mode

Resources


Need to scrape web data for your data pipelines? Check out my web scraping tools on Apify — production-ready actors for Reddit, Google Maps, and more. Questions? Email me at spinov001@gmail.com

Top comments (0)