Benthos (now Redpanda Connect) is a stream processor that connects just about everything: Kafka, HTTP, S3, databases, and more than 200 other connectors. Pipelines are configured entirely in YAML, with no code required.
## Why Benthos?
- 200+ connectors: Kafka, NATS, S3, HTTP, AMQP, Redis, PostgreSQL
- YAML config: No code needed for data pipelines
- Processors: Map, filter, transform with Bloblang
- Delivery guarantees: Transaction-based, at-least-once delivery without disk persistence
- Single binary: No JVM, no dependencies
- Testable: Built-in unit testing for pipelines
## Install

```bash
# Binary
curl -Lsf https://sh.benthos.dev | bash

# Docker
docker run --rm -v $(pwd)/config.yaml:/benthos.yaml jeffail/benthos

# Homebrew
brew install benthos
```
## Basic Pipeline

```yaml
# config.yaml
input:
  http_server:
    path: /post
    allowed_verbs: [POST]

pipeline:
  processors:
    - mapping: |
        root.message = this.text.uppercase()
        root.timestamp = now()
        root.source = "api"

output:
  stdout: {}
```
Run: `benthos -c config.yaml`
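The `http_server` input can also answer the caller directly instead of printing to stdout, using the `sync_response` output. A minimal sketch (the path is carried over from the example above):

```yaml
input:
  http_server:
    path: /post
    allowed_verbs: [POST]

pipeline:
  processors:
    - mapping: root.message = this.text.uppercase()

output:
  sync_response: {}
```

With this config, a `curl -d '{"text":"hi"}' localhost:4195/post` gets the transformed document back as the HTTP response body.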
## Kafka to S3 Pipeline

```yaml
input:
  kafka:
    addresses: [localhost:9092]
    topics: [events]
    consumer_group: benthos-group

pipeline:
  processors:
    - mapping: |
        root = this
        root.processed_at = now()
    # Drop everything that isn't a purchase event
    - mapping: root = if this.type != "purchase" { deleted() }

output:
  aws_s3:
    bucket: my-data-lake
    path: 'events/${!count("files")}.json'
```
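Writing one S3 object per message gets expensive at scale, so S3 outputs are usually batched. A sketch using the output's `batching` block with an `archive` processor to combine each batch into a single JSON array (the count and period here are illustrative, not recommendations):

```yaml
output:
  aws_s3:
    bucket: my-data-lake
    path: 'events/${!timestamp_unix_nano()}.json'
    batching:
      count: 100
      period: 10s
      processors:
        - archive:
            format: json_array
```

Whichever limit is hit first (100 messages or 10 seconds) flushes the batch as one object.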
## Bloblang Transformations

```yaml
pipeline:
  processors:
    - mapping: |
        root.user_id = this.user.id
        root.total = this.items.map_each(item -> item.price * item.qty).sum()
        root.currency = this.currency.or("USD")
        root.is_large_order = root.total > 1000
```
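Real-world payloads have missing or malformed fields, and Bloblang's `catch` method handles them without crashing the pipeline. A sketch assuming an optional `timestamp` field that may not parse:

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # Fall back to the current time if the field is absent or unparsable
        root.ts = this.timestamp.ts_parse("2006-01-02T15:04:05Z07:00").catch(now())
```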
## HTTP to Database

```yaml
input:
  http_client:
    url: https://api.example.com/data
    verb: GET
    rate_limit: api_limit
    headers:
      Authorization: 'Bearer ${API_TOKEN}'

rate_limit_resources:
  - label: api_limit
    local:
      count: 10
      interval: 1s

output:
  sql_insert:
    driver: postgres
    dsn: 'postgres://user:pass@localhost:5432/mydb'
    table: events
    columns: [id, type, data, created_at]
    args_mapping: 'root = [this.id, this.type, this.data.string(), now()]'
```
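Hard-coding credentials in the DSN is risky. Benthos interpolates environment variables anywhere in the config, so the connection string can be injected at runtime (`POSTGRES_DSN` is an assumed variable name, not a Benthos built-in):

```yaml
output:
  sql_insert:
    driver: postgres
    dsn: ${POSTGRES_DSN}
    table: events
    columns: [id, type, data, created_at]
    args_mapping: 'root = [this.id, this.type, this.data.string(), now()]'
```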
## Fan-Out (Multiple Outputs)

```yaml
output:
  broker:
    pattern: fan_out
    outputs:
      - kafka:
          addresses: [localhost:9092]
          topic: processed-events
      - aws_s3:
          bucket: archive
          path: '${!count("s3")}.json'
      - http_client:
          url: https://webhook.example.com
          verb: POST
```
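When an output can fail, the `fallback` output tries each child in order, which makes a simple dead-letter setup. A sketch (the webhook URL and topic name are illustrative):

```yaml
output:
  fallback:
    - http_client:
        url: https://webhook.example.com
        verb: POST
    - kafka:
        addresses: [localhost:9092]
        topic: dead-letter
```

Messages only land on the `dead-letter` topic if the webhook delivery fails.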
## Unit Testing

```yaml
tests:
  - name: Test purchase filter
    target_processors: /pipeline/processors
    input_batch:
      - content: '{"type": "purchase", "amount": 50}'
      - content: '{"type": "view", "page": "/home"}'
    output_batches:
      - - content_equals: '{"type": "purchase", "amount": 50}'
```
Run: `benthos test config.yaml`
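Exact string matches are brittle when a mapping adds dynamic fields like timestamps; the `json_contains` predicate asserts only a subset of fields instead. A sketch:

```yaml
tests:
  - name: purchase keeps its type
    target_processors: /pipeline/processors
    input_batch:
      - content: '{"type": "purchase", "amount": 50}'
    output_batches:
      - - json_contains: '{"type": "purchase"}'
```

This passes even if the pipeline also sets a `processed_at` field on the message.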
## Real-World Use Case

One data team replaced roughly 2,000 lines of Python ETL scripts with a 40-line Benthos YAML config, turning 45-minute batch jobs into a real-time streaming pipeline with transaction-based delivery guarantees.
Need to automate data collection? Check out my Apify actors for ready-made scrapers, or email spinov001@gmail.com for custom solutions.