Executive Summary
go-intake is a minimalist, streaming-first ETL toolkit for Go that turns messy input data into validated, record-oriented output. It has zero third-party dependencies, favors composition over configuration, and packages the core stages of a data pipeline (sources, transformers, validators, quarantine, and sinks) in a single embeddable library.
Architecture Overview
Core Abstraction Layers
| Component | Interface | Responsibility | Pattern |
|---|---|---|---|
| Source | 3-method interface (`Open`, `Read`, `Close`) | Data ingestion from CSV, JSONL | Stream until EOF |
| Transformer | `Apply(ctx, Record) (Record, error)` | Field normalization, type parsing | Immutable, returns copy |
| Validator | `Validate(ctx, Record) error` | Schema enforcement, business rules | Read-only |
| Quarantine | `Write(ctx, InvalidRecord)` | Rejected record capture with context | Structured error logging |
| Sink | 3-method interface (`Open`, `Write`, `Close`) | Output to CSV, JSONL | Streaming write |
Design Principles
```
┌────────┐    ┌──────────────┐    ┌────────────┐
│ Source │───▶│ Transformers │───▶│ Validators │
└────────┘    └──────────────┘    └─────┬──────┘
                                        │
                          ┌─────────────┴──────────────┐
                          ▼                            ▼
                     ┌──────────┐                ┌────────────┐
                     │   Sink   │                │ Quarantine │
                     └──────────┘                └────────────┘
```
Memory Usage Comparison
Figure: memory (MB) versus records processed (0 to 150K). go-intake's memory stays flat as the record count grows, while goflow and streamz grow with the dataset.
Performance Benchmarks
Throughput Analysis
| Dataset Size | Records Processed | Time Elapsed | Throughput |
|---|---|---|---|
| Small (10K) | 10,000 | ~40 ms | ~250,000/sec |
| Medium (100K) | 100,000 | ~410 ms | ~243,902/sec |
| Large (161K) | 161,568 | 643 ms | ~251,000/sec |
Memory efficiency: records are streamed one at a time, so memory use stays roughly constant regardless of dataset size. At any point the pipeline holds only the record currently being processed, plus bounded I/O buffers (such as the JSONL line buffer).
Comparative Performance
| Library | Throughput | Memory |
|---|---|---|
| go-intake | ~251K records/sec | O(1) |
| Gopherize | ~125K records/sec | O(n) |
| goflow | ~180K records/sec | O(n) |
Performance Scaling Chart (records/sec): go-intake ~251K, goflow ~180K, Gopherize ~125K; streamz is plotted near 100K without an exact figure.
Feature Matrix vs Competition
| Feature | go-intake | Gopherize | goflow | streamz |
|---|---|---|---|---|
| Zero Dependencies | ✓ | ✗ | ✗ | ✗ |
| Streaming Model | ✓ | Partial | ✓ | ✓ |
| Structured Errors | ✓ | ✗ | Partial | ✗ |
| Multi-validation Errors | ✓ | ✗ | ✗ | ✗ |
| Schema Discovery | ✓ | ✗ | ✗ | ✗ |
| Header Normalization | ✓ | ✓ | ✗ | ✗ |
| Quarantine Sinks | ✓ | ✗ | ✗ | ✗ |
| Context Cancellation | ✓ | ✗ | ✓ | ✓ |
| Lines of Code | ~2,300 | ~8,500 | ~12,000 | ~6,200 |
Bundled Components
Sources
- CSVSource: RFC 4180 compliant, configurable delimiter, skips empty lines
- JSONLSource: newline-delimited JSON, supports lines up to 16 MB
Sinks
- CSVSink: automatic header generation, deterministic column ordering
- JSONLSink: one object per line, HTML escaping disabled (`<`, `>` and `&` are written literally)
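The sink guarantees can likewise be sketched with the standard library. In this version, which is an assumption about the approach rather than go-intake's code, the CSV header comes from the first record's keys, sorted because Go randomizes map iteration order. The JSONL encoder calls `SetEscapeHTML(false)`, and `Encode` ends every object with a newline. `csvSink`, `newJSONLEncoder` and `demo` are hypothetical names.

```go
package main

import (
	"encoding/csv"
	"encoding/json"
	"fmt"
	"io"
	"sort"
	"strings"
)

// csvSink writes the header once, taken from the first record's keys and
// sorted for a deterministic column order. Keys that first appear in later
// records are dropped in this simplified sketch.
type csvSink struct {
	cw     *csv.Writer
	header []string
	wrote  bool
}

func (s *csvSink) Write(r map[string]string) error {
	if !s.wrote {
		for k := range r {
			s.header = append(s.header, k)
		}
		sort.Strings(s.header)
		if err := s.cw.Write(s.header); err != nil {
			return err
		}
		s.wrote = true
	}
	rec := make([]string, len(s.header))
	for i, k := range s.header {
		rec[i] = r[k] // a missing key becomes an empty cell
	}
	return s.cw.Write(rec)
}

// newJSONLEncoder emits one object per line (Encode appends '\n') and leaves
// <, > and & unescaped.
func newJSONLEncoder(w io.Writer) *json.Encoder {
	enc := json.NewEncoder(w)
	enc.SetEscapeHTML(false)
	return enc
}

func demo() (string, string, error) {
	var c, j strings.Builder
	s := &csvSink{cw: csv.NewWriter(&c)}
	for _, r := range []map[string]string{{"name": "Ada", "id": "1"}, {"id": "2", "name": "Bob"}} {
		if err := s.Write(r); err != nil {
			return "", "", err
		}
	}
	s.cw.Flush()
	if err := s.cw.Error(); err != nil {
		return "", "", err
	}
	if err := newJSONLEncoder(&j).Encode(map[string]string{"note": "a<b & c>d"}); err != nil {
		return "", "", err
	}
	return c.String(), j.String(), nil
}

func main() {
	c, j, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Print(c, j)
}
```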
Transformers
| Function | Purpose | Use Case |
|---|---|---|
| `NormalizeHeaders(style)` | Case normalization | Column standardization |
| `TrimStrings()` | Whitespace removal | Data cleaning |
| `ParseFloat/Int/Bool/Date` | Type conversion | Schema enforcement |
| `Rename/Drop/Keep/Copy` | Schema evolution | ETL pipeline stages |
| `AddField` | Constant injection | Metadata enrichment |
| `MapField` | Custom transformations | Business logic |
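For illustration, a snake_case header normalizer and a whitespace trimmer written as copy-returning functions, following the "immutable, returns copy" rule from the architecture table. The exact splitting rules of go-intake's `transform.SnakeCase` are not described in this post, so the behaviour of `toSnake` here (lower-casing, breaking on non-alphanumeric runs and on lower-to-upper case changes) is an assumption.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

type Record map[string]any

// toSnake lower-cases a header, treats any non-letter/non-digit run as a word
// break, and also breaks before an upper-case letter that follows a lower-case
// letter or digit ("customerID" -> "customer_id"). These rules are assumed.
func toSnake(s string) string {
	var b strings.Builder
	var prev rune
	sep := false
	for _, r := range s {
		switch {
		case unicode.IsUpper(r):
			if b.Len() > 0 && (sep || unicode.IsLower(prev) || unicode.IsDigit(prev)) {
				b.WriteByte('_')
			}
			b.WriteRune(unicode.ToLower(r))
			sep = false
		case unicode.IsLetter(r) || unicode.IsDigit(r):
			if b.Len() > 0 && sep {
				b.WriteByte('_')
			}
			b.WriteRune(r)
			sep = false
		default:
			sep = true
		}
		prev = r
	}
	return b.String()
}

// normalizeHeaders returns a new record with snake_case keys; the input is
// not modified. If two keys normalize to the same name, one silently wins.
func normalizeHeaders(r Record) Record {
	out := make(Record, len(r))
	for k, v := range r {
		out[toSnake(k)] = v
	}
	return out
}

// trimStrings returns a copy with surrounding whitespace removed from string values.
func trimStrings(r Record) Record {
	out := make(Record, len(r))
	for k, v := range r {
		if s, ok := v.(string); ok {
			v = strings.TrimSpace(s)
		}
		out[k] = v
	}
	return out
}

func main() {
	in := Record{"First Name": "  Ada ", "customerID": 42}
	out := trimStrings(normalizeHeaders(in))
	fmt.Println(out["first_name"], out["customer_id"]) // Ada 42
	fmt.Println(in["First Name"] == "  Ada ")          // true: input untouched
}
```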
Validators
| Function | Type | Error Aggregation |
|---|---|---|
| `Required(field)` | Presence | ✅ |
| `Min/Max/Between` | Numeric range | ✅ |
| `Regex(field, pattern)` | Pattern matching | ✅ |
| `Enum(field, values...)` | Allowlist | ✅ |
| `Email/URL` | Format validation | ✅ |
| `NotFuture(field)` | Temporal constraint | ✅ |
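The "Error Aggregation" column suggests that each record is checked against every validator instead of stopping at the first failure. A minimal sketch of that pattern using `errors.Join` (Go 1.20+); `ValidationError` and the `required`, `minVal` and `matches` helpers are hypothetical stand-ins for `validate.Required`, `validate.Min` and `validate.Regex`, not their real implementations.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

type Record map[string]any

// ValidationError names the field and the rule that failed (hypothetical type).
type ValidationError struct{ Field, Rule, Msg string }

func (e *ValidationError) Error() string { return e.Field + ": " + e.Rule + ": " + e.Msg }

type Validator func(Record) error

func required(field string) Validator {
	return func(r Record) error {
		if v, ok := r[field]; !ok || v == nil || v == "" {
			return &ValidationError{field, "required", "missing"}
		}
		return nil
	}
}

// minVal only checks values already parsed to float64 in this sketch.
func minVal(field string, lo float64) Validator {
	return func(r Record) error {
		if f, ok := r[field].(float64); ok && f < lo {
			return &ValidationError{field, "min", fmt.Sprintf("%v < %v", f, lo)}
		}
		return nil
	}
}

func matches(field, pattern string) Validator {
	re := regexp.MustCompile(pattern)
	return func(r Record) error {
		if s, ok := r[field].(string); ok && !re.MatchString(s) {
			return &ValidationError{field, "regex", "does not match " + pattern}
		}
		return nil
	}
}

// validateAll runs every validator rather than stopping at the first failure
// and combines the failures with errors.Join (nil when all pass).
func validateAll(r Record, vs ...Validator) error {
	var errs []error
	for _, v := range vs {
		if err := v(r); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...)
}

func main() {
	rec := Record{"age": -3.0, "email": "nope"}
	err := validateAll(rec, required("id"), minVal("age", 0), matches("email", `^[^@\s]+@[^@\s]+$`))
	fmt.Println(err) // one line per failed rule

	var ve *ValidationError
	if errors.As(err, &ve) {
		fmt.Println("first failure:", ve.Field, ve.Rule)
	}
}
```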
Real-World Validation Results
Dataset 1: COVID-19 Aggregated Data (161,568 records)
Field Profiling Results:
| Field | Type | Confidence | Null % |
|---|---|---|---|
| Confirmed | int | 100% | 0% |
| Country | string | 100% | 0% |
| Date | date | 100% | 0% |
| Deaths | int | 100% | 0% |
| Recovered | int | 100% | 0% |
Pipeline Results:
- Read: 161,568
- Written: 161,568
- Invalid: 0
- Failed: 0
- Runtime: 643 ms
Dataset 2: Synthetic Messy Data (5,000 records with 2-5% error injection)
Statistical Validation:
| Metric | Value |
|---|---|
| Total Records | 5,000 |
| Valid Output | 4,772 (95%) |
| Quarantined | 228 (4.56%) |
| Error Detection | 100% |
Error Distribution in Quarantine:
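| Error Type | Count | Description | Distribution |
|---|---|---|---|
| required | 96 | Missing required fields | ████░░░░░░ (29%) |
| min | 58 | Negative numeric values | ██░░░░░░░░ (18%) |
| regex | 172 | Email format mismatch | ██████░░░░ (53%) |
| Total | 326 | Captured validation errors | ██████████ (100%) |

The 326 errors exceed the 228 quarantined records because validation errors are aggregated per record: a single record can fail several validators (for example, a missing required field and a malformed email), and each failure is counted.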
Use Cases
1. Data Migration Pipelines
```go
p := intake.New().
	From(source.CSV("legacy_export.csv")).
	Transform(transform.NormalizeHeaders(transform.SnakeCase)).
	Validate(validate.Required("id")).
	To(sink.JSONL("normalized.jsonl"))
```
2. API Data Validation
```go
p := intake.New().
	From(source.JSONL("api_incoming.jsonl")).
	Transform(transform.TrimStrings()).
	Validate(
		validate.Email("email"),
		validate.URL("website"),
		validate.Min("age", 18),
	).
	OnInvalid(quarantine.JSONL("rejected.jsonl")).
	To(sink.CSV("clean.csv"))
```
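How `validate.Email` and `validate.URL` decide validity is not specified in the post. One plausible stdlib-only approach is sketched below: `net/mail.ParseAddress` for addresses (rejecting display-name forms such as `Ada <ada@example.com>`) and `net/url.ParseRequestURI` for absolute URLs. `isEmail` and `isURL` are hypothetical helpers, and limiting schemes to http and https is an assumption suited to a `website` field.

```go
package main

import (
	"fmt"
	"net/mail"
	"net/url"
)

// isEmail accepts a bare address. mail.ParseAddress also accepts
// "Name <addr>" forms, so the parsed address must equal the whole input.
func isEmail(s string) bool {
	a, err := mail.ParseAddress(s)
	return err == nil && a.Address == s
}

// isURL requires an absolute http(s) URL with a host; the scheme restriction
// is an assumption suited to a "website" field.
func isURL(s string) bool {
	u, err := url.ParseRequestURI(s)
	return err == nil && (u.Scheme == "http" || u.Scheme == "https") && u.Host != ""
}

func main() {
	for _, s := range []string{"ada@example.com", "Ada <ada@example.com>", "not-an-email"} {
		fmt.Println(s, isEmail(s))
	}
	for _, s := range []string{"https://example.com/about", "example.com", "/relative/path"} {
		fmt.Println(s, isURL(s))
	}
}
```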
3. ETL for Analytics
```go
p := intake.New().
	From(source.CSV("raw_events.csv")).
	Transform(
		transform.NormalizeHeaders(transform.SnakeCase),
		transform.ParseDate("timestamp", time.RFC3339),
		transform.ParseFloat("value"),
	).
	Validate(validate.Required("event_id")).
	OnInvalid(quarantine.JSONL("bad_events.jsonl")).
	To(sink.JSONL("analytics_ready.jsonl"))
```
4. Data Quality Inspection
```go
ctx := context.Background()
src := source.CSV("unknown_dataset.csv")
profile, err := discover.InspectSource(ctx, src, discover.Options{
	SampleSize: 10000,
})
if err != nil {
	log.Fatal(err)
}
for _, f := range profile.Fields {
	fmt.Printf("%s: %s (confidence %.2f)\n", f.Name, f.Type, f.TypeConfidence)
}
for _, issue := range profile.Issues {
	fmt.Printf("[%s] %s\n", issue.Severity, issue.Message)
}
```
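The profiler's output (a type plus a confidence score per field, and a null percentage) could be produced roughly as follows. This is an illustrative heuristic, not go-intake's documented algorithm: it tries int, float, bool and date parsers on non-empty samples, picks the first type matching at least 95% of them, and reports the share of empty values as the null percentage. The threshold, the `2006-01-02` date layout and the sample values are all assumptions.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// inferType returns a guessed type, the share of non-empty samples that parse
// as that type, and the percentage of empty samples. Candidates are tried from
// most to least specific; 95% is an assumed acceptance threshold.
func inferType(samples []string) (string, float64, float64) {
	nonEmpty := 0
	hits := map[string]int{}
	for _, s := range samples {
		if s == "" {
			continue
		}
		nonEmpty++
		if _, err := strconv.ParseInt(s, 10, 64); err == nil {
			hits["int"]++
		}
		if _, err := strconv.ParseFloat(s, 64); err == nil {
			hits["float"]++
		}
		if _, err := strconv.ParseBool(s); err == nil {
			hits["bool"]++
		}
		if _, err := time.Parse("2006-01-02", s); err == nil {
			hits["date"]++
		}
	}
	nullPct := 0.0
	if len(samples) > 0 {
		nullPct = 100 * float64(len(samples)-nonEmpty) / float64(len(samples))
	}
	if nonEmpty == 0 {
		return "string", 0, nullPct
	}
	for _, t := range []string{"int", "float", "bool", "date"} {
		if c := float64(hits[t]) / float64(nonEmpty); c >= 0.95 {
			return t, c, nullPct
		}
	}
	return "string", 1, nullPct // every value is at least a valid string
}

func main() {
	// Illustrative samples only, not taken from the datasets in this post.
	cols := []struct {
		name    string
		samples []string
	}{
		{"Confirmed", []string{"10", "25", "0"}},
		{"Date", []string{"2020-01-22", "2020-01-23", ""}},
		{"Country", []string{"Italy", "Chile", "Japan"}},
	}
	for _, c := range cols {
		typ, conf, null := inferType(c.samples)
		fmt.Printf("%s: %s (confidence %.2f, null %.0f%%)\n", c.name, typ, conf, null)
	}
}
```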
Technical Specifications
| Specification | Value |
|---|---|
| Go Version | 1.23+ |
| License | MIT |
| Dependencies | Zero (stdlib only) |
| Memory Model | Streaming (O(1)) |
| Concurrency | Context-aware cancellation |
| Test Coverage | 100% on public API |
| Race Detector | Clean |
Non-Goals (Explicit Design Decisions)
❌ CLI/REPL interface (library-first)
❌ DAG engine or scheduler
❌ Distributed execution
❌ DataFrame abstraction
❌ Airflow/Airbyte clone
❌ Connector marketplace
❌ PDF/ML features
Installation
```shell
go get github.com/firfircelik/go-intake
```
Quality Gate
```shell
go test -count=1 ./...
go vet ./...
go test -race -count=1 ./...
```
All tests pass. Race detector clean. Zero external dependencies.