DEV Community

Alex Spinov

Benthos Has a Free API: Here's How to Build Stream Processing Pipelines

Benthos (now Redpanda Connect) is a stream processor with 200+ connectors, including Kafka, HTTP, S3, and databases. You configure pipelines in YAML, with no application code needed.

Why Benthos?

  • 200+ connectors: Kafka, NATS, S3, HTTP, AMQP, Redis, PostgreSQL
  • YAML config: No code needed for data pipelines
  • Processors: Map, filter, transform with Bloblang
  • At-least-once: Delivery guarantees when sources and sinks support it, with back pressure
  • Single binary: No JVM, no dependencies
  • Testable: Built-in unit testing for pipelines

Install

# Binary
curl -Lsf https://sh.benthos.dev | bash

# Docker
docker run --rm -v "$(pwd)/config.yaml:/benthos.yaml" jeffail/benthos

# Homebrew
brew install benthos

Basic Pipeline

# config.yaml
input:
  http_server:
    path: /post
    allowed_verbs: [POST]

pipeline:
  processors:
    - mapping: |
        root.message = this.text.uppercase()
        root.timestamp = now()
        root.source = "api"

output:
  stdout: {}

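Run: benthos -c config.yaml, then POST a JSON body such as {"text": "hello"} to http://localhost:4195/post (4195 is the default HTTP port).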
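
To try the same mapping without sending HTTP requests, one option is to swap the input for a generate input that emits a few synthetic messages. This is a minimal sketch: the file name try.yaml, the message text, and the count and interval values are illustrative assumptions, not part of the original config.

```yaml
# try.yaml (hypothetical file name)
input:
  generate:
    count: 3        # stop after three messages
    interval: 1s    # emit one message per second
    mapping: 'root.text = "hello world"'

pipeline:
  processors:
    # Same mapping as the basic pipeline
    - mapping: |
        root.message = this.text.uppercase()
        root.timestamp = now()
        root.source = "api"

output:
  stdout: {}
```

Running benthos -c try.yaml should print three JSON documents whose message field is the uppercased text, after which the process exits because the input is exhausted.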

Kafka to S3 Pipeline

input:
  kafka:
    addresses: [localhost:9092]
    topics: [events]
    consumer_group: benthos-group

pipeline:
  processors:
    - mapping: |
        root = this
        root.processed_at = now()
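    # Keep only purchases. Benthos v4 has no filter processor, so non-matching messages are deleted in a mapping.
    - mapping: 'root = if this.type != "purchase" { deleted() }'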

output:
  aws_s3:
    bucket: my-data-lake
    path: 'events/${!count("files")}-${!timestamp_unix_nano()}.json'
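
Without batching, the aws_s3 output writes one object per message, which can produce a very large number of tiny files. A hedged sketch of one way to group messages into newline-delimited objects follows. The batch size, flush period, and .jsonl naming are assumptions chosen for illustration; tune them for your own volume.

```yaml
output:
  aws_s3:
    bucket: my-data-lake
    # One object per batch; the nanosecond timestamp keeps object names unique.
    path: 'events/${!timestamp_unix_nano()}.jsonl'
    batching:
      count: 500    # flush after 500 messages...
      period: 30s   # ...or after 30 seconds, whichever comes first
      processors:
        # Join the batch into a single newline-delimited payload
        - archive:
            format: lines
```

Larger batches mean fewer S3 PUT requests but more latency before data lands in the bucket, so the two settings trade cost against freshness.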

Bloblang Transformations

pipeline:
  processors:
    - mapping: |
        root.user_id = this.user.id
        root.total = this.items.map_each(item -> item.price * item.qty).sum()
        root.currency = this.currency.or("USD")
        root.is_large_order = root.total > 1000
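
A worked example helps show what this mapping produces. The sketch below expresses it as a test case placed in the same file as the mapping (tests are covered under Unit Testing). The sample order is made up: two items totalling 600 * 2 + 50 * 1 = 1250, with no currency field so the "USD" fallback applies.

```yaml
tests:
  - name: computes order total and defaults currency
    target_processors: /pipeline/processors
    input_batch:
      # Hypothetical order with no currency field
      - content: '{"user": {"id": "u1"}, "items": [{"price": 600, "qty": 2}, {"price": 50, "qty": 1}]}'
    output_batches:
      - - json_equals:
            user_id: u1
            total: 1250
            currency: USD
            is_large_order: true
```

Because the mapping never assigns root = this, the output contains only the four fields it sets explicitly; the original user and items fields are not carried over.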

HTTP to Database

input:
  http_client:
    url: https://api.example.com/data
    verb: GET
    rate_limit: api_limit
    headers:
      Authorization: 'Bearer ${API_TOKEN}'

rate_limit_resources:
  - label: api_limit
    local:
      count: 10
      interval: 1s

output:
  sql_insert:
    driver: postgres
    dsn: 'postgres://user:pass@localhost:5432/mydb'
    table: events
    columns: [id, type, data, created_at]
    args_mapping: 'root = [this.id, this.type, this.data.string(), now()]'

Fan-Out (Multiple Outputs)

output:
  broker:
    pattern: fan_out
    outputs:
      - kafka:
          addresses: [localhost:9092]
          topic: processed-events
      - aws_s3:
          bucket: archive
          path: '${!count("s3")}-${!timestamp_unix_nano()}.json'
      - http_client:
          url: https://webhook.example.com
          verb: POST
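
Fan-out copies every message to every output. When each message should go to only one destination instead, a switch output is one alternative. This is a sketch under assumptions: the routing rule (purchases to Kafka, everything else to S3) is illustrative, and the topic and bucket names are reused from the example above.

```yaml
output:
  switch:
    cases:
      # Purchases go to Kafka only
      - check: 'this.type == "purchase"'
        output:
          kafka:
            addresses: [localhost:9092]
            topic: processed-events
      # A case with no check always matches, so it catches everything else
      - output:
          aws_s3:
            bucket: archive
            path: '${!uuid_v4()}.json'
```

Cases are evaluated in order and a message stops at the first match, so the catch-all case belongs last.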

Unit Testing

tests:
  - name: Test purchase filter
    target_processors: /pipeline/processors
    input_batch:
      - content: '{"type": "purchase", "amount": 50}'
      - content: '{"type": "view", "page": "/home"}'
    output_batches:
      # json_contains tolerates extra fields (such as processed_at) that the pipeline adds
      - - json_contains: {"type": "purchase", "amount": 50}

Run: benthos test config.yaml

Real-World Use Case

A data team replaced 2,000 lines of Python ETL scripts with a 40-line Benthos YAML config. The pipeline went from 45-minute batch jobs to real-time streaming with at-least-once delivery.


