Benthos (now Redpanda Connect) is a stream processor that lets you build data pipelines entirely in YAML. It ships with 200+ inputs, processors, and outputs, plus a built-in REST API for monitoring and management.
## Why Benthos?
- No code required — declarative YAML pipelines
- 200+ connectors — Kafka, HTTP, S3, GCP, AMQP, NATS, SQL, and more
- Built-in REST API — health checks, metrics, and runtime config
- Single binary — download and run, no dependencies
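To give a taste of the declarative style, here is about the smallest useful pipeline: the standard `generate` input emits a message every second and `stdout` prints it. The filename and message body are illustrative.

```yaml
# hello.yaml — minimal pipeline (illustrative)
input:
  generate:
    interval: 1s
    mapping: root = {"msg": "hello"}

output:
  stdout: {}
```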
## Quick Start

```bash
# Install
curl -Lsf https://sh.benthos.dev | bash

# Or via Docker
docker run --rm -p 4195:4195 redpandadata/connect
```
## Example Pipeline (YAML)

```yaml
# pipeline.yaml — HTTP to Kafka
input:
  http_server:
    path: /ingest
    allowed_verbs: [POST]

pipeline:
  processors:
    - mapping: |
        root.timestamp = now()
        root.data = this
        root.source = "api"

output:
  kafka:
    addresses: ["localhost:9092"]
    topic: events
```
```bash
# Run it
benthos -c pipeline.yaml

# Send data
curl -X POST http://localhost:4195/ingest \
  -d '{"user": "alice", "action": "login"}'
```
## Built-in REST API

```bash
# Health check
curl http://localhost:4195/ping
# Response: {"status": "ok"}

# Get current config
curl http://localhost:4195/resources

# Metrics (Prometheus format)
curl http://localhost:4195/metrics

# Readiness probe (for k8s)
curl http://localhost:4195/ready
```
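The `/ping` and `/ready` endpoints map directly onto Kubernetes probes. A sketch of the relevant container spec fragment, assuming the default port 4195:

```yaml
# deployment snippet (illustrative)
livenessProbe:
  httpGet:
    path: /ping
    port: 4195
readinessProbe:
  httpGet:
    path: /ready
    port: 4195
```

`/ready` only succeeds once inputs and outputs are connected, which is exactly what a readiness probe should gate on.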
## Real-World Pipeline Examples

### CSV to PostgreSQL
```yaml
input:
  file:
    paths: ["./data/*.csv"]
    codec: csv

pipeline:
  processors:
    - mapping: |
        root.name = this.name.uppercase()
        root.email = this.email.lowercase()
        root.created_at = now()

output:
  sql_insert:
    driver: postgres
    dsn: "postgres://user:pass@localhost:5432/mydb"
    table: users
    columns: [name, email, created_at]
    args_mapping: root = [this.name, this.email, this.created_at]
```
### S3 to Elasticsearch
```yaml
input:
  aws_s3:
    bucket: my-logs
    prefix: "2026/"

pipeline:
  processors:
    - mapping: |
        root = content().parse_json()
        root.indexed_at = now()

output:
  elasticsearch:
    urls: ["http://localhost:9200"]
    index: logs-2026
```
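Bulk indexing benefits from batching, and batched Benthos outputs such as `elasticsearch` accept a `batching` block. A sketch that flushes every 100 messages or every 5 seconds, whichever comes first (the thresholds are illustrative):

```yaml
output:
  elasticsearch:
    urls: ["http://localhost:9200"]
    index: logs-2026
    batching:
      count: 100
      period: 5s
```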
## Bloblang: The Built-in Mapping Language
```yaml
processors:
  - mapping: |
      # Filter: keep errors, drop everything else
      root = if this.status == "error" { this } else { deleted() }
      # Transform
      root.full_name = this.first_name + " " + this.last_name
      # Enrich (hash returns bytes, so hex-encode for JSON output)
      root.hash = this.email.hash("sha256").encode("hex")
```
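Mappings like these can be exercised without touching real brokers via `benthos test`, which runs assertions defined in YAML against your processors. A sketch of the test-definition format, with field names as documented and the input value purely illustrative:

```yaml
# pipeline_test.yaml (illustrative)
tests:
  - name: keeps error messages
    target_processors: '/pipeline/processors'
    input_batch:
      - content: '{"status": "error"}'
    output_batches:
      - - json_contains:
            status: "error"
```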
## Key Features
| Feature | Details |
|---|---|
| Connectors | 200+ (Kafka, S3, HTTP, SQL, AMQP, NATS, GCP) |
| Processing | Mapping, filtering, batching, deduplication |
| Error handling | Dead-letter queues, retries, circuit breakers |
| Monitoring | Prometheus metrics, health endpoints |
| Deployment | Binary, Docker, Kubernetes, Lambda |
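The dead-letter-queue pattern from the table can be expressed with the `fallback` output, which tries each child output in order and only moves on when a write fails. A sketch that diverts failed Kafka writes to a local file (the path and topic are assumptions):

```yaml
output:
  fallback:
    - kafka:
        addresses: ["localhost:9092"]
        topic: events
    - file:
        path: ./dead_letter.jsonl
        codec: lines
```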
## Resources
Need custom data pipelines or web scraping automation? Check out my scraping tools on Apify or email spinov001@gmail.com for tailored solutions.