Argo Workflows is an open-source, container-native workflow engine for orchestrating parallel jobs on Kubernetes. It implements workflows as Kubernetes CRDs, making it ideal for ML pipelines, data processing, and CI/CD.
## What Is Argo Workflows?
Argo Workflows is a CNCF graduated project that runs multi-step workflows where each step is a container. It supports DAG-based and step-based workflows with features like retries, timeouts, and conditional execution.
**Key Features:**
- DAG and step-based workflows
- Artifact passing between steps
- Parameterized workflows (templates)
- Cron workflows
- Retry and timeout policies
- Web UI dashboard
- REST and gRPC API
- S3/GCS/MinIO artifact storage
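The DAG examples below use the `dag` template type; for completeness, a minimal steps-based Workflow (a sketch, not tested against a cluster) uses nested lists, where each outer group runs sequentially and items inside a group run in parallel:

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hello-steps-
spec:
  entrypoint: main
  templates:
    - name: main
      steps:
        - - name: step-a          # first group
            template: say-hello
        - - name: step-b          # second group, runs after step-a finishes
            template: say-hello
    - name: say-hello
      container:
        image: python:3.12-slim
        command: [python, -c]
        args: ["print('hello')"]
```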
## Installation

```bash
kubectl create namespace argo
kubectl apply -n argo -f https://github.com/argoproj/argo-workflows/releases/download/v3.6.0/install.yaml

# Install the CLI (macOS; see the releases page for other platforms)
brew install argo

# Port-forward the UI
kubectl -n argo port-forward svc/argo-server 2746:2746
```
## Workflow Examples

### DAG Workflow
```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: data-pipeline
spec:
  entrypoint: pipeline
  templates:
    - name: pipeline
      dag:
        tasks:
          - name: extract
            template: extract-data
          - name: transform
            template: transform-data
            dependencies: [extract]
          - name: validate
            template: validate-data
            dependencies: [extract]
          - name: load
            template: load-data
            dependencies: [transform, validate]
    - name: extract-data
      container:
        image: python:3.12-slim
        command: [python, -c]
        args: ["print('Extracting data...')"]
    - name: transform-data
      container:
        image: python:3.12-slim
        command: [python, -c]
        args: ["print('Transforming data...')"]
    - name: validate-data
      container:
        image: python:3.12-slim
        command: [python, -c]
        args: ["print('Validating data...')"]
    - name: load-data
      container:
        image: python:3.12-slim
        command: [python, -c]
        args: ["print('Loading to warehouse...')"]
```
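The dependencies here form a diamond: `transform` and `validate` fan out from `extract` and join at `load`. The execution order a DAG like this implies can be checked with a quick topological sort (an illustration only, not part of Argo):

```python
from graphlib import TopologicalSorter  # Python 3.9+ standard library

# Task -> predecessors, copied from the data-pipeline DAG above
deps = {
    "extract": [],
    "transform": ["extract"],
    "validate": ["extract"],
    "load": ["transform", "validate"],
}

ts = TopologicalSorter(deps)
ts.prepare()
while ts.is_active():
    ready = sorted(ts.get_ready())  # tasks whose dependencies are all done
    print("run in parallel:", ready)
    ts.done(*ready)
# run in parallel: ['extract']
# run in parallel: ['transform', 'validate']
# run in parallel: ['load']
```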
### Parameterized Workflow
```yaml
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: ml-training
spec:
  entrypoint: train
  arguments:
    parameters:
      - name: dataset
      - name: epochs
        value: "100"
      - name: learning-rate
        value: "0.001"
  templates:
    - name: train
      container:
        image: ml-trainer:latest
        args:
          - --dataset={{workflow.parameters.dataset}}
          - --epochs={{workflow.parameters.epochs}}
          - --lr={{workflow.parameters.learning-rate}}
        resources:
          requests:
            memory: 4Gi
          limits:
            nvidia.com/gpu: 1  # Kubernetes requires GPU resources under limits
```
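Argo resolves the `{{workflow.parameters.*}}` tags before the pod is created. For simple tags the effect is plain string replacement, which can be sketched like this (an illustration of the substitution behavior, not Argo's actual template engine):

```python
import re

def render(text: str, params: dict) -> str:
    """Replace {{workflow.parameters.<name>}} tags with values from params."""
    def sub(match: re.Match) -> str:
        return str(params[match.group(1)])
    return re.sub(r"\{\{workflow\.parameters\.([\w-]+)\}\}", sub, text)

args = ["--dataset={{workflow.parameters.dataset}}",
        "--epochs={{workflow.parameters.epochs}}"]
params = {"dataset": "s3://bucket/train.csv", "epochs": "100"}
print([render(a, params) for a in args])
# → ['--dataset=s3://bucket/train.csv', '--epochs=100']
```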
## Argo Workflows API
```python
import requests

ARGO = "https://localhost:2746/api/v1"
HEADERS = {"Authorization": "Bearer your-token"}

# List workflows in the "argo" namespace
# (verify=False only because the local dev server uses a self-signed cert)
workflows = requests.get(f"{ARGO}/workflows/argo", headers=HEADERS, verify=False).json()
for wf in workflows.get("items") or []:
    print(f"Workflow: {wf['metadata']['name']}, Phase: {wf['status']['phase']}")

# Submit a workflow
result = requests.post(f"{ARGO}/workflows/argo", headers=HEADERS, verify=False, json={
    "workflow": {
        "metadata": {"generateName": "data-pipeline-"},
        "spec": {
            "entrypoint": "main",
            "templates": [{
                "name": "main",
                "container": {
                    "image": "python:3.12-slim",
                    "command": ["python", "-c"],
                    "args": ["print('Hello from Argo!')"]
                }
            }]
        }
    }
}).json()
wf_name = result["metadata"]["name"]
print(f"Submitted: {wf_name}")

# Get workflow logs
logs = requests.get(
    f"{ARGO}/workflows/argo/{wf_name}/log",
    headers=HEADERS, verify=False,
    params={"logOptions.container": "main"},
).text
print(logs)
```
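A freshly submitted workflow starts in `Pending`/`Running`; to block until it finishes you can poll the status endpoint. A minimal sketch, where the `fetch_phase` callable stands in for the GET request shown above:

```python
import time

TERMINAL_PHASES = {"Succeeded", "Failed", "Error"}

def wait_for_workflow(fetch_phase, timeout: float = 600, interval: float = 5) -> str:
    """Poll fetch_phase() until it returns a terminal phase or timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        phase = fetch_phase()
        if phase in TERMINAL_PHASES:
            return phase
        time.sleep(interval)
    raise TimeoutError("workflow did not finish in time")

# Hypothetical wiring to the REST client above:
# phase = wait_for_workflow(
#     lambda: requests.get(f"{ARGO}/workflows/argo/{wf_name}",
#                          headers=HEADERS, verify=False).json()["status"]["phase"])
```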
## Cron Workflows
```yaml
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: daily-report
spec:
  schedule: "0 8 * * *"
  timezone: "UTC"
  concurrencyPolicy: Replace
  workflowSpec:
    entrypoint: report
    templates:
      - name: report
        container:
          image: reporter:latest
          command: [python, generate_report.py]
```
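The schedule `"0 8 * * *"` fires at minute 0, hour 8, every day. For a fixed daily time like this, the next run is easy to compute by hand (a sketch for this specific schedule only, not a general cron parser):

```python
from datetime import datetime, timedelta, timezone

def next_daily_run(now: datetime, hour: int = 8, minute: int = 0) -> datetime:
    """Next occurrence of a daily 'M H * * *' schedule strictly after `now`."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)  # today's slot already passed
    return candidate

now = datetime(2025, 1, 15, 9, 30, tzinfo=timezone.utc)
print(next_daily_run(now))  # → 2025-01-16 08:00:00+00:00
```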
## Resources
- Argo Workflows Docs
- Argo Workflows GitHub — 15K+ stars
- Examples
Need to scrape web data for your data pipelines? Check out my web scraping tools on Apify — production-ready actors for Reddit, Google Maps, and more. Questions? Email me at spinov001@gmail.com