DEV Community

Alex Spinov


Benthos Has a Free API: Build Data Pipelines in YAML Without Writing Code

Benthos (now Redpanda Connect) is a stream processor that lets you build data pipelines entirely in YAML. It ships with 200+ connectors spanning inputs, processors, and outputs, and exposes a built-in REST API for monitoring and management.

Why Benthos?

  • No code required — declarative YAML pipelines
  • 200+ connectors — Kafka, HTTP, S3, GCP, AMQP, NATS, SQL, and more
  • Built-in REST API — health checks, metrics, and runtime config
  • Single binary — download and run, no dependencies

Quick Start

# Install
curl -Lsf https://sh.benthos.dev | bash

# Or via Docker
docker run --rm -p 4195:4195 redpandadata/connect
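Once installed, the smallest useful pipeline makes a good smoke test. This sketch uses the standard generate, mapping, and stdout components; the one-second interval and the message itself are just illustrative:

```yaml
# hello.yaml — minimal smoke-test pipeline
input:
  generate:
    interval: 1s
    mapping: 'root = {"message": "hello"}'

pipeline:
  processors:
    - mapping: |
        root = this
        root.emitted_at = now()

output:
  stdout: {}
```

Run it with `benthos -c hello.yaml` and stop with Ctrl+C.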

Example Pipeline (YAML)

# pipeline.yaml — HTTP to Kafka
input:
  http_server:
    path: /ingest
    allowed_verbs: [POST]

pipeline:
  processors:
    - mapping: |
        root.timestamp = now()
        root.data = this
        root.source = "api"

output:
  kafka:
    addresses: ["localhost:9092"]
    topic: events
# Run the pipeline
benthos -c pipeline.yaml

# Send data
curl -X POST http://localhost:4195/ingest \
  -d '{"user": "alice", "action": "login"}'
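To try this pipeline without a Kafka broker running, the output section can be swapped for stdout; everything else in pipeline.yaml stays the same, and this substitution is purely for local testing:

```yaml
# Local testing: replace the kafka output with stdout
output:
  stdout: {}
```

With this in place, each POST to /ingest echoes the enriched document to the terminal.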

Built-in REST API

# Health check
curl http://localhost:4195/ping
# Response: pong

# Version and build info
curl http://localhost:4195/version

# Metrics (Prometheus format)
curl http://localhost:4195/metrics

# Readiness probe (for k8s)
curl http://localhost:4195/ready
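Because /ping and /ready are plain HTTP endpoints, they slot straight into Kubernetes probes. Here is a sketch of the relevant container spec fragment; the port matches Benthos's default, but the timings are illustrative rather than prescriptive:

```yaml
# Fragment of a k8s Deployment container spec (illustrative timings)
livenessProbe:
  httpGet:
    path: /ping
    port: 4195
  periodSeconds: 10
readinessProbe:
  httpGet:
    path: /ready
    port: 4195
  initialDelaySeconds: 5
  periodSeconds: 5
```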

Real-World Pipeline Examples

CSV to PostgreSQL

input:
  file:
    paths: ["./data/*.csv"]
    codec: csv

pipeline:
  processors:
    - mapping: |
        root.name = this.name.uppercase()
        root.email = this.email.lowercase()
        root.created_at = now()

output:
  sql_insert:
    driver: postgres
    dsn: "postgres://user:pass@localhost:5432/mydb"
    table: users
    columns: [name, email, created_at]
    args_mapping: root = [this.name, this.email, this.created_at]
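For larger CSV files, inserting one row at a time is slow. Benthos outputs accept a standard batching block, so the sql_insert above can write in batches; the count and period below are illustrative starting points, not tuned values:

```yaml
output:
  sql_insert:
    driver: postgres
    dsn: "postgres://user:pass@localhost:5432/mydb"
    table: users
    columns: [name, email, created_at]
    args_mapping: root = [this.name, this.email, this.created_at]
    batching:
      count: 100    # flush after 100 rows...
      period: 1s    # ...or after one second, whichever comes first
```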

S3 to Elasticsearch

input:
  aws_s3:
    bucket: my-logs
    prefix: "2026/"

pipeline:
  processors:
    - mapping: |
        root = this.parse_json()
        root.indexed_at = now()

output:
  elasticsearch:
    urls: ["http://localhost:9200"]
    index: logs-2026
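The aws_s3 input picks up credentials from the usual AWS sources (environment variables, shared config files, IAM roles), but the region and explicit credentials can also be set in the config. A sketch with placeholder values — the environment variable interpolation is optional and can be dropped to fall back on the default credential chain:

```yaml
input:
  aws_s3:
    bucket: my-logs
    prefix: "2026/"
    region: us-east-1                       # adjust to your bucket's region
    credentials:
      id: "${AWS_ACCESS_KEY_ID}"            # optional: omit to use the default chain
      secret: "${AWS_SECRET_ACCESS_KEY}"
```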

Bloblang: The Built-in Mapping Language

processors:
  - mapping: |
      # Filter
      root = if this.status == "error" { this } else { deleted() }

      # Transform
      root.full_name = this.first_name + " " + this.last_name

      # Enrich
      root.hash = this.email.hash("sha256").encode("hex")

Key Features

| Feature        | Details                                        |
| -------------- | ---------------------------------------------- |
| Connectors     | 200+ (Kafka, S3, HTTP, SQL, AMQP, NATS, GCP)   |
| Processing     | Mapping, filtering, batching, deduplication    |
| Error handling | Dead-letter queues, retries, circuit breakers  |
| Monitoring     | Prometheus metrics, health endpoints           |
| Deployment     | Binary, Docker, Kubernetes, Lambda             |
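The error-handling row maps to concrete components. The fallback output, for example, gives you a simple dead-letter pattern by trying child outputs in order; in this sketch (the file path is illustrative) messages that Kafka rejects are written to a local file instead of being lost:

```yaml
output:
  fallback:
    - kafka:
        addresses: ["localhost:9092"]
        topic: events
    - file:
        path: ./dead_letter.jsonl   # messages land here if Kafka rejects them
        codec: lines
```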

Resources


Need custom data pipelines or web scraping automation? Check out my scraping tools on Apify or email spinov001@gmail.com for tailored solutions.
