DEV Community

beefed.ai

Posted on • Originally published at beefed.ai

Scaling multi-node GPU data pipelines using Dask on Kubernetes

  • Architectural patterns that enable linear multi-node GPU scaling
  • Allocating GPUs and scheduling with the Kubernetes GPU Operator
  • Design GPU partitioning and minimize shuffle to keep GPUs fed
  • Monitoring and profiling to find the real bottlenecks
  • Scaling strategies across nodes, fabrics, and failure domains
  • Production-ready checklist and step-by-step deploy protocol

Linear, predictable scaling on multi-node GPU pipelines doesn’t come from adding GPUs — it comes from removing the friction that starves them: bad partitioning, host/device hops, and expensive shuffles. I’ve engineered Dask GPU pipelines that scale near-linearly by treating data layout, communication fabric, and memory management as first‑class design constraints.

You see low GPU utilization, frequent OOMs, and long tail latencies while the cluster network screams during shuffles — those are the symptoms. On the ground this looks like: tiny partitions generating enormous scheduler overhead, workers thrashing to spill to host, host-to-device copies multiplying, and the scheduler becoming the single-threaded choke point for shuffle coordination. The practical consequence: adding GPUs gives diminishing returns because the system is limited by communication and memory-management mistakes you can fix.

Architectural patterns that enable linear multi-node GPU scaling

  • One-worker-per-GPU as the default unit. Treat each GPU as a capacity unit and run one dask-worker / dask-cuda-worker process per GPU. This model simplifies memory accounting, lets you set a deterministic rmm pool per process, and avoids complex intra-process GPU allocator interactions that lead to fragmentation and OOMs. Use multi-process-per-GPU only for very specific micro-batch workloads where you measure benefit.

  • Design the data plane first: choose whether the data plane will be (a) object-store-backed, with data read into GPU memory per task via Arrow IPC, or (b) long-lived GPU-resident partitions. For streaming/near-real-time pipelines keep a small set of GPU-resident partitions; for large-batch ETL use columnar formats (Parquet/Arrow) and read into GPU buffers with zero-copy paths when possible. cuDF supports Arrow device-array interoperability, which lets you hand buffers between libraries without copying them through the host.

  • Use UCX + GPUDirect for inter-GPU transfers. When nodes have NVLink or InfiniBand, configure the cluster to use UCX as the transport so you get peer-to-peer GPU transfers (NVLink or GPUDirect RDMA) rather than falling back to host-mediated TCP copies. That change is frequently the single biggest runtime improvement for shuffle-heavy jobs. dask-cuda and ucx-py provide the integration and configuration knobs.

  • Memory management is not optional: enable the RAPIDS Memory Manager (RMM) pool on every worker so allocations and temporary buffers reuse the same device memory and reduce fragmentation and allocation latency. Tune rmm_pool_size to leave 20–40% headroom for system and ML libraries unless you're using MIG/explicit sharing. dask-cuda exposes these flags and integrates with external allocators like PyTorch and CuPy.

  • Prefer columnar, vectorized operators (cuDF, cuGraph, cuML). When your compute is GPU-native, ensure upstream IO produces columnar buffers that map to GPU memory with minimal conversion. This avoids serializing rows, which is expensive in distributed pipelines.

Sources for these architectural levers: dask-cuda configuration for RMM and UCX examples; cuDF Arrow-device interop; UCX/ucx-py explanation of GPU communication.
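To make the RMM headroom guidance concrete, here is a minimal sketch of the arithmetic. The helper name rmm_pool_bytes and the 30% default are illustrative assumptions, not part of dask-cuda or RMM; the only library constraint relied on is that RMM pool sizes are expected to be a multiple of 256 bytes.

```python
GIB = 1024**3
RMM_ALIGNMENT = 256  # RMM pool sizes are expected to be a multiple of 256 bytes


def rmm_pool_bytes(total_gpu_bytes: int, headroom_fraction: float = 0.3) -> int:
    """Return a pool size that leaves `headroom_fraction` of device memory free.

    Hypothetical helper: the name and the default headroom are assumptions.
    """
    if not 0.0 <= headroom_fraction < 1.0:
        raise ValueError("headroom_fraction must be in [0, 1)")
    usable = int(total_gpu_bytes * (1.0 - headroom_fraction))
    # Round down to the alignment so the pool never exceeds the intended budget.
    return usable - (usable % RMM_ALIGNMENT)


if __name__ == "__main__":
    # Example: an 80 GiB device with 30% left for other libraries and the driver.
    pool = rmm_pool_bytes(80 * GIB, headroom_fraction=0.3)
    print(f"rmm pool size in bytes: {pool}")
```

The resulting byte count could be passed as the worker's RMM pool size. Measure actual peak usage before tightening the headroom, since other libraries on the same GPU allocate outside the pool unless they are configured to use RMM.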

Allocating GPUs and scheduling with the Kubernetes GPU Operator

  • Automate the GPU stack with the NVIDIA GPU Operator. Use the GPU Operator to install drivers, device plugin, Container Toolkit, DCGM monitoring and Node Feature Discovery (NFD) so GPU nodes are automatically labeled for scheduling; this avoids manual host maintenance and makes node reprovisioning safe. The operator also bundles DCGM telemetry for Prometheus integration.

  • Request GPUs via extended resources. Pods request GPUs via limits like nvidia.com/gpu: 1, and Kubernetes schedules those pods only onto nodes where the device plugin advertises that resource. GPU counts must be whole integers and cannot be overcommitted; to share a physical GPU, use MIG (Multi-Instance GPU) only where the hardware supports it and the slices are intentionally allocated. Example pod fragment:

spec:
  containers:
    - name: dask-worker
      image: your-registry/dask-gpu:2025.04.1
      resources:
        limits:
          nvidia.com/gpu: 1
  • Match Kubernetes resource limits to the worker process flags. The worker’s --memory-limit and --nthreads must reflect the Kubernetes resources so the kubelet doesn’t evict the process. Use the restartPolicy: Never pattern for ephemeral workers launched from the Dask operator or gateway to avoid Kubernetes repeatedly scheduling failing workers.

  • Leverage Node Feature Discovery labels. Use the GPU Operator’s NFD labels or cloud provider labels in nodeSelector/nodeAffinity to ensure pods land on the right GPU type (e.g., A100 vs T4). The exact label key varies by install; query your NFD/cluster to use the canonical label.

  • MIG and CDI for multi-tenant GPU sharing. When you must multiplex GPUs between tenants, advertise MIG partitions and use Container Device Interface (CDI) to ensure consistent device mappings in pods. The GPU Operator integrates MIG and CDI tooling.

  • Prefer one process per GPU and pin CPUs. Set requests/limits for CPU and memory, and keep CPU-heavy work (IO/serialization) on the same NUMA domain as the GPU where possible. nodeAffinity only selects the node; NUMA alignment inside the node comes from the Kubernetes Topology Manager acting on hints from the CPU Manager and the device plugin.

Practical mapping: install the GPU Operator via Helm, then deploy the Dask Helm chart (or Dask Operator / Dask Gateway) for cluster lifecycle management; pin chart versions in production.
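The rule about matching Kubernetes limits to worker flags is easier to enforce when both are derived from the same inputs. The sketch below builds a pod spec as a plain dictionary; worker_pod_spec is a hypothetical helper, the nvidia.com/gpu.product label key and its value are assumptions to verify against your cluster, and the scheduler address is a placeholder.

```python
import json


def worker_pod_spec(image, scheduler_address, memory_gib, cpus, gpu_product):
    """Build a one-GPU worker pod spec whose limits and worker flags agree."""
    return {
        "restartPolicy": "Never",
        # Assumed label key; query your cluster for the label NFD actually sets.
        "nodeSelector": {"nvidia.com/gpu.product": gpu_product},
        "containers": [
            {
                "name": "dask-worker",
                "image": image,
                "args": [
                    "dask-cuda-worker", scheduler_address,
                    "--nthreads", str(cpus),
                    # Derived from the same number as resources.limits.memory.
                    "--memory-limit", f"{memory_gib}GiB",
                ],
                "resources": {
                    "limits": {
                        "nvidia.com/gpu": 1,
                        "cpu": str(cpus),
                        "memory": f"{memory_gib}Gi",
                    }
                },
            }
        ],
    }


if __name__ == "__main__":
    spec = worker_pod_spec(
        image="your-registry/dask-gpu:2025.04.1",
        scheduler_address="tcp://scheduler:8786",  # placeholder address
        memory_gib=16,
        cpus=4,
        gpu_product="NVIDIA-A100-SXM4-80GB",  # example label value
    )
    print(json.dumps({"spec": spec}, indent=2))
```

Generating the spec from one set of numbers means a change to the memory budget cannot update the container limit without also updating the worker flag.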

Design GPU partitioning and minimize shuffle to keep GPUs fed

  • Partition sizing is a trade-off: aim for partitions that make each GPU task run in the high‑tens to low‑hundreds of milliseconds but that also fit comfortably inside the working set of GPU memory. Rule-of-thumb ranges for GPU-backed DataFrames: 100MB – 1GB per partition, adjusting for complex string-heavy columns or wide schemas; for ETL and NVTabular-style flows a part_size of ~100MB is a common starting point. Too many tiny partitions inflate scheduler overhead; too few reduce parallelism and make shuffles expensive.

  • Avoid full-data shuffles whenever possible. Shuffles are all-to-all by nature: minimize them by:

    • Partitioning on your join/group key at source (Hive/Parquet partitioning or pre-partition writing).
    • Broadcasting small lookup tables to workers instead of shuffling them. Broadcasting a small table once costs far less than repeated all-to-all movements.
    • Using pre-aggregation / combiner steps (map → partial aggregate → reduce) so the amount of data sent in the shuffle is reduced.
  • Leverage Dask’s newer P2P shuffle when beneficial. The P2P shuffle avoids the task-count blowup of the task-based shuffle because its graph grows linearly with the number of partitions; when you pair it with UCX, confirm that your cluster’s fabric and UCX setup support RDMA/NVLink before switching. The optimizer will try to avoid shuffles when it can, so chain operations and persist strategic intermediates so the planner can exploit existing partitioning.

  • Use cuDF spilling carefully. Enable --enable-cudf-spill only when you understand its semantics; spilling moves device data to host/disk and can cost you significant transfer time. In many pipelines it’s better to rework partitioning or use rmm pools and controlled spilling thresholds. dask-cuda offers flags to configure these behaviors.

  • Materialize and persist heavy intermediates. After an expensive shuffle, client.persist() the resulting dataset and client.rebalance() to avoid hotspots when downstream tasks read the same data many times. Keep an eye on memory headroom — persistent GPU datasets are fast but occupy device memory.
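As a rough illustration of the partition-sizing rule of thumb above, the sketch below turns a dataset size into a partition count. The function name, the 256 MB default target, and the choice to round up to a multiple of the GPU count are assumptions for illustration; dataset_bytes should be the in-memory size, not the compressed size on disk.

```python
import math

MB = 10**6


def target_npartitions(dataset_bytes: int, n_gpus: int,
                       partition_bytes: int = 256 * MB) -> int:
    """Pick a partition count near the target size, as a multiple of n_gpus."""
    by_size = math.ceil(dataset_bytes / partition_bytes)
    # Round up to a multiple of the GPU count so work divides evenly,
    # and never return fewer partitions than there are GPUs.
    return max(n_gpus, math.ceil(by_size / n_gpus) * n_gpus)


if __name__ == "__main__":
    # 100 GB of in-memory data on 8 GPUs with ~256 MB partitions.
    print(target_npartitions(100 * 10**9, n_gpus=8))
```

The result would typically feed a repartition call on the DataFrame before the expensive stage. Treat it as a starting point and adjust after looking at task durations in the dashboard.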

Example broadcast-join pattern (Dask DataFrame):

# small_ddf is small enough to materialize on the client and ship to every worker
small_local = small_ddf.compute()
result = big_ddf.map_partitions(lambda part: part.merge(small_local, on='key'))

Sources: Dask DataFrame best-practices and shuffle documentation, NVTabular examples and Dask-cuda RMM/shuffle flags.
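The pre-aggregation (map → partial aggregate → reduce) idea from the shuffle list can be shown without any cluster at all. This is a pure-Python sketch of the pattern, not Dask's implementation; the partitions are made-up data and the function names are illustrative.

```python
from collections import Counter
from functools import reduce


def partial_aggregate(partition):
    """Map side: collapse one partition to a single key -> sum table."""
    sums = Counter()
    for key, value in partition:
        sums[key] += value
    return sums


def combine(left, right):
    """Reduce side: merge two partial tables by adding values per key."""
    merged = Counter(left)
    merged.update(right)  # Counter.update adds values instead of replacing them
    return merged


# Two made-up partitions of (key, value) rows.
partitions = [
    [("a", 1), ("b", 2), ("a", 3)],
    [("b", 4), ("a", 5), ("b", 6)],
]

partials = [partial_aggregate(p) for p in partitions]
totals = reduce(combine, partials, Counter())

rows_without_combiner = sum(len(p) for p in partitions)  # raw rows that would move
rows_with_combiner = sum(len(p) for p in partials)       # partial rows that move

print(dict(totals), rows_without_combiner, rows_with_combiner)
```

Only the partial tables cross the network, so the shuffled volume scales with the number of distinct keys per partition rather than the number of rows. The saving shrinks as key cardinality approaches row count.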

Monitoring and profiling to find the real bottlenecks

  • Observe GPU-level telemetry first. Use the DCGM exporter (deployed as part of the GPU Operator or standalone daemonset) to collect DCGM_FI_DEV_* metrics into Prometheus and display them in Grafana templates. Monitor GPU memory usage, SM utilization, memory bandwidth, PCIe/NVLink traffic, and power/thermal events — those tell you whether you are compute‑bound, memory‑bound, or network‑bound.

  • Combine Dask-level metrics with GPU metrics. The Dask scheduler and workers expose Prometheus metrics and the live dashboard. Capture dask_scheduler_tasks, dask_worker_memory, and network bandwidth alongside GPU metrics to correlate scheduler stalls with physical bottlenecks. Dask’s performance_report, Client.profile() and get_task_stream() are invaluable for offline post-mortems.

  • Kernel and stream profiling for hot kernels. Use NVIDIA Nsight Systems for timeline traces and Nsight Compute for kernel-level metrics when you need to inspect kernel occupancy, tensor core usage, or memory utilization per kernel. Add NVTX ranges in your code path so GPU traces map to logical phases of your pipeline.

  • Watch the right alerts. Typical alert examples:

    • GPU memory > 90% for 3 minutes — likely imminent OOM.
    • Sustained low SM utilization (< 20%) while PCIe is saturated — likely host-mediated transfers.
    • Scheduler backlog (# tasks queued) rising while overall GPU utilization stays low — likely too many tiny tasks or heavy serialization overhead.

Important: GPU utilization alone is a misleading health signal. Low SM utilization with high PCIe traffic means the GPUs are waiting on data; high utilization but high spill rates means memory pressure. Correlate multiple signals before making scaling decisions.
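One way to encode "correlate multiple signals" is a small classifier over a metrics sample. The sketch below is illustrative only: the GpuSample fields, the diagnose function, and the 80% PCIe and 70% SM cut-offs are assumptions, while the 90% memory and 20% SM thresholds come from the alert examples above. It looks at a single sample, so the "for 3 minutes" duration would still need to be handled by the alerting system.

```python
from dataclasses import dataclass


@dataclass
class GpuSample:
    sm_util: float            # SM utilization, 0..1
    mem_used: float           # fraction of device memory in use, 0..1
    pcie_util: float          # PCIe throughput as a fraction of link capacity, 0..1
    spill_bytes_per_s: float  # device-to-host spill rate


def diagnose(s: GpuSample) -> str:
    """Classify one sample; thresholds are starting points, not fixed rules."""
    if s.mem_used > 0.90:
        return "memory_pressure"       # likely imminent OOM
    if s.sm_util < 0.20 and s.pcie_util > 0.80:
        return "starved_by_transfers"  # GPUs waiting on host-mediated copies
    if s.sm_util > 0.70 and s.spill_bytes_per_s > 0:
        return "busy_but_spilling"     # high utilization hiding memory pressure
    return "ok"


if __name__ == "__main__":
    print(diagnose(GpuSample(sm_util=0.1, mem_used=0.5, pcie_util=0.9,
                             spill_bytes_per_s=0.0)))
```

The ordering of the checks matters: memory pressure is reported first because an OOM invalidates every other reading.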

Operational plumbing: deploy kube-prometheus-stack + dcgm-exporter and import NVIDIA’s DCGM Grafana dashboard for fast insights.

Scaling strategies across nodes, fabrics, and failure domains

  • Use adaptive scaling at the right layer. For developer experimentation and bursty workloads, run Dask adaptive scaling (cluster.adapt(minimum=..., maximum=...)) so the worker count follows the backlog. For production, rely on the Kubernetes cluster autoscaler for node provisioning and control cluster shape (GPU types, accelerators) with node pools. Combine Dask adaptive scaling with the Kubernetes autoscaler, keeping Dask's maximum within what the node pools can supply, so you don’t oversubscribe nodes or trigger churn.

  • Warm pools and image pre-pull reduce startup friction. GPU instance boot and driver init are expensive. Keep a small warm pool of pre-warmed nodes or use DaemonSet pre-pulls to minimize time-to-capacity during scaling events.

  • Tune UCX per fabric. On NVLink-only nodes enable the NVLink transport; on InfiniBand clusters enable InfiniBand and RDMA-CM and select the right interface in the UCX config. Explicitly set DASK_DISTRIBUTED_UCXX__CREATE_CUDA_CONTEXT=True where recommended so UCX initializes correctly in scheduler/worker processes. These settings enable GPUDirect paths and remove host-copy dominated transfers.

  • Design for fault domains. Spread replicas across Kubernetes topology zones and nodes; use application-level checkpointing on critical intermediates (e.g., write pre-shuffle aggregates to S3 or Parquet) so retries do not re-run large upstream pipelines. Use Dask-friendly object stores (S3, GCS, or a shared POSIX layer) for durable intermediate storage.

  • Resistance to stragglers. Use partial aggregations and replication of hot partitions where acceptable (keep a few extra copies of critical partitions) so the scheduler can reschedule work without waiting on a slow node.

Operational citations: UCX and Dask integration examples; Dask Kubernetes and Dask Gateway deployment patterns for autoscaling and multi-tenant management.

Production-ready checklist and step-by-step deploy protocol

  1. Image and dependency hygiene

    • Build a GPU base image with the exact CUDA, cuDF/cuML and dask/dask-cuda versions your pipeline uses. Pin versions and publish with digest tags to your registry.
    • Install dcgm-exporter and ensure the GPU Operator’s DCGM integration is enabled for metrics.
  2. Install infrastructure via Helm (example commands)

# GPU Operator
helm repo add nvidia https://helm.ngc.nvidia.com/nvidia && helm repo update
helm install nvidia-gpu-operator nvidia/gpu-operator -n gpu-operator --create-namespace --wait

# Dask (single-tenant) - pin chart versions for repeatability
helm repo add dask https://helm.dask.org && helm repo update
helm install my-dask dask/dask -n dask --create-namespace --wait

Sources: GPU Operator and Dask Helm charts.

  3. Configure UCX + RMM for scheduler and workers (scheduler example)
# Scheduler (run in a Pod spec or container command)
env:
  - name: DASK_DISTRIBUTED_UCXX__CREATE_CUDA_CONTEXT
    value: "True"
  - name: DASK_DISTRIBUTED_UCXX__RMM__POOL_SIZE
    value: "12GB"
command: ["dask-scheduler", "--protocol", "ucx", "--interface", "ib0"]

Worker example (dask-cuda worker CLI):

dask-cuda-worker ucx://scheduler:8786 \
  --nthreads 1 \
  --memory-limit 0.85 \
  --rmm-pool-size 12GB \
  --enable-cudf-spill \
  --protocol ucx

Validate that UCX picks the correct transports and that workers show ucx traffic in the dashboard.
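A cheap first validation is to confirm that every registered worker address actually uses the UCX scheme. The sketch below works on the dictionary shape returned by Client.scheduler_info(), where the "workers" key maps addresses to worker details; the helper name is hypothetical, and some versions of distributed may return only a subset of workers by default, so check that call's parameters on your version.

```python
def workers_not_using(scheduler_info: dict, scheme: str = "ucx") -> list:
    """Return worker addresses whose scheme differs from the expected one."""
    prefix = f"{scheme}://"
    workers = scheduler_info.get("workers", {})
    return sorted(addr for addr in workers if not addr.startswith(prefix))


if __name__ == "__main__":
    # Stand-in for client.scheduler_info(); addresses are made up.
    info = {
        "workers": {
            "ucx://10.0.0.1:40001": {},
            "tcp://10.0.0.2:40001": {},
        }
    }
    print(workers_not_using(info))
```

An empty result only shows that workers registered over UCX. It does not prove that NVLink or RDMA is in use, so still check transport selection in the UCX logs and the fabric counters.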

  4. Kubernetes pod spec details

    • limits.nvidia.com/gpu: 1 in the container.
    • Match container --memory-limit to the pod resources.limits.memory.
    • Set nodeSelector/nodeAffinity to GPU node labels set by NFD or your cloud provider.
  5. Test and CI

    • Unit tests run locally in a small CPU/GPU matrix.
    • Integration: spin up a minimal test cluster using kind, k3d, or a small cloud staging cluster with the GPU Operator and a single GPU node (or use a mocked workflow where CI does not require GPUs but still exercises the operator and CRDs). Dask Gateway test strategies show patterns for CI with Kubernetes backends.
    • Add performance_report capture in integration tests for a reproducible profiling artifact.
  6. Observability and runbook

    • Dashboards: Dask UI + Grafana dashboard with DCGM panel.
    • Alerts: GPU memory pressure, scheduler backlog, long-running tasks, spill thresholds.
    • Runbook: documented steps for diagnosing OOMs (check rmm pool, inspect dask-worker logs, capture performance_report, collect DCGM timeseries).
  7. Progressive rollout

    • Deploy changes to a staging namespace with identical GPU type and drivers.
    • Use canary traffic for heavy shuffle jobs (run subset of production queries) and compare latency/throughput against baseline.
    • Promote images by digest; do not depend on :latest in production.
  8. Cost & capacity planning

    • Measure TB/hour processed and GPU hours per TB as a KPI. Use these metrics to size node pools and balance TCO vs latency requirements.
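The two capacity KPIs are simple ratios, shown here as a sketch. The function name and the decimal definition of a terabyte (10^12 bytes) are assumptions; use whichever unit your billing and storage reports use, as long as it is consistent.

```python
def pipeline_kpis(bytes_processed: int, wall_seconds: float, n_gpus: int) -> dict:
    """Compute throughput (TB/hour) and cost intensity (GPU-hours per TB)."""
    if bytes_processed <= 0 or wall_seconds <= 0 or n_gpus <= 0:
        raise ValueError("all inputs must be positive")
    terabytes = bytes_processed / 1e12  # decimal TB, an assumption
    hours = wall_seconds / 3600
    return {
        "tb_per_hour": terabytes / hours,
        "gpu_hours_per_tb": (n_gpus * hours) / terabytes,
    }


if __name__ == "__main__":
    # Example run: 4 TB processed in 2 hours on 8 GPUs.
    print(pipeline_kpis(4 * 10**12, wall_seconds=7200, n_gpus=8))
```

Tracking GPU-hours per TB across cluster sizes shows directly whether scaling is near-linear: the number stays roughly flat when it is, and climbs when added GPUs are being starved.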

Quick checklist table

| Phase | Must-have artifacts |
| --- | --- |
| Image build | Pinned image with CUDA & RAPIDS, digest tag |
| Infra | GPU Operator Helm + Dask Helm install manifests |
| Run config | UCX env, rmm_pool_size, --enable-cudf-spill flags |
| Observability | DCGM exporter + Dask Prometheus + Grafana dashboards |
| CI | Integration test that runs performance_report |

Sources and further reading used for these steps: GPU Operator install guides; dask-cuda UCX & RMM flags; Dask Helm chart and Gateway docs; DCGM exporter guidance.

Treat this as an engineering checklist you run down before scaling your next pipeline: pin images and libraries, let the GPU Operator manage drivers and telemetry, tune RMM and UCX for your fabric, partition and pre-aggregate to avoid shuffles, instrument both Dask and GPU stacks, and use adaptive + cluster autoscaling in concert rather than separately. This approach turns GPU counts into predictable capacity rather than a hope.

Sources:
NVIDIA GPU Operator (latest docs) - Operator responsibilities, NFD node labeling, DCGM integration, MIG and CDI support, and Helm install examples.

dask-cuda (RAPIDS) deployment docs - dask-cuda-worker / UCX examples, rmm_pool_size and --enable-cudf-spill flags and per-worker memory controls.

Dask DataFrame best practices & shuffle documentation - Partition sizing guidance, avoiding shuffles, broadcast patterns and optimizer notes.

NVIDIA dcgm-exporter (GitHub) - How to deploy DCGM exporter, Prometheus integration, and recommended Grafana dashboards.

cuDF Arrow interop documentation - ArrowDeviceArray and zero-copy device <-> Arrow interop details for avoiding host copies.

Dask Helm charts and Kubernetes deployment docs - Dask Helm charts, Dask Kubernetes operator and Dask Gateway deployment patterns for Kubernetes.

RMM (RAPIDS Memory Manager) GitHub repo - RMM features, pool and async allocator options, and integration notes for other libraries.

UCX / ucx-py and integration guidance - UCX/ucx-py rationale for NVLink / RDMA and how it enables GPU-to-GPU communication; plus dask-cuda UCX configuration references.

Dask diagnostics: performance_report, Client.profile, task streams - performance_report, Client.profile() and get_task_stream() usage for offline analysis.

Kubernetes device plugins and scheduling GPUs - How Kubernetes advertises and schedules GPUs (nvidia.com/gpu), and device plugin behavior and constraints.
