Ever wondered how systems like Amazon S3 or MinIO actually work under the hood? I spent the last 2 months building Lilio — a distributed object storage system from scratch in Go. No frameworks, no shortcuts, just pure distributed systems engineering.
In this post, I'll walk you through the architecture, the problems I faced, the tradeoffs I made, and some benchmarks. Let's dive in.
What is Lilio?
Lilio is an S3-inspired distributed object storage system that handles:
- Chunking: Splitting large files into smaller pieces
- Replication: Storing chunks on multiple nodes for fault tolerance
- Consistent Hashing: Distributing data evenly across nodes
- Quorum Reads/Writes: Ensuring consistency in a distributed environment
- Encryption: AES-256-GCM at the bucket level
- Pluggable Backends: Support for local storage, S3, Google Drive
- Observability: Prometheus metrics out of the box
┌─────────────────────────────────────────────────────────────┐
│ Lilio │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Chunking│──│ Hashing │──│ Store │──│ Encrypt │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Storage Backends (Pluggable) │ │
│ │ ┌───────┐ ┌───────┐ ┌───────┐ │ │
│ │ │ Local │ │ S3 │ │ GDrive│ │ │
│ │ └───────┘ └───────┘ └───────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
The Architecture
High-Level Flow
When you upload a file to Lilio, here's what happens:
1. File comes in (e.g., 100MB video)
2. Split into chunks (1MB each = 100 chunks)
3. For each chunk:
a. Generate chunk ID
b. Hash to find target nodes (consistent hashing)
c. Replicate to N nodes (quorum write)
d. Encrypt if bucket has encryption enabled
4. Store metadata (chunk locations, checksums)
5. Return success when write quorum achieved
Component Breakdown
┌────────────────────────────────────────────────────────────────┐
│ Lilio Engine │
├────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ HTTP API │ │ CLI │ │
│ │ (server.go) │ │ (cli/) │ │
│ └────────┬─────────┘ └────────┬─────────┘ │
│ │ │ │
│ └────────────┬───────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Core Engine │ │
│ │ (lilio.go) │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Streaming │ │ Quorum │ │ Encryption │ │ │
│ │ │ Chunker │ │ Manager │ │ (AES-GCM) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌────────────┼────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Metadata │ │ Hash Ring │ │ Backends │ │
│ │ Store │ │ (consist.) │ │ Registry │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└────────────────────────────────────────────────────────────────┘
Deep Dive: The Interesting Parts
Here are the 5 most interesting technical challenges I solved:
1. Consistent Hashing — Why Simple Modulo Fails
My first implementation used simple modulo hashing:
// DON'T DO THIS
func getNode(chunkID string, totalNodes int) int {
    hash := sha256.Sum256([]byte(chunkID))
    return int(hash[0]) % totalNodes
}
The problem? When I added a 5th node to my 4-node cluster, approximately 80% of my data needed to relocate. In production, that's hours of downtime and massive network load.
The solution: Consistent Hashing with Virtual Nodes
type HashRing struct {
    ring           []uint32          // Sorted positions
    positionToNode map[uint32]string // Position -> Node name
    virtualNodes   int               // 150 per physical node
}

func (hr *HashRing) AddNode(nodeName string) {
    for i := 0; i < hr.virtualNodes; i++ {
        key := fmt.Sprintf("%s#%d", nodeName, i)
        position := hash(key)
        hr.ring = append(hr.ring, position)
        hr.positionToNode[position] = nodeName
    }
    sort.Slice(hr.ring, func(i, j int) bool {
        return hr.ring[i] < hr.ring[j]
    })
}
Now when I add a node, only ~20% of data moves. That's a 4x improvement.
But I hit another bug. When two virtual nodes hashed to the same position, my collision handling was:
// WRONG: Causes clustering
for hr.positionToNode[position] != "" {
    position++ // Sequential increment
}
If positions 5000, 5001, 5002 all collide, they cluster together — defeating the purpose of consistent hashing.
Fix: Rehash on collision
// CORRECT: Maintains random distribution
retryCount := 0
for hr.positionToNode[position] != "" && retryCount < 10 {
    retryKey := fmt.Sprintf("%s#%d#retry%d", nodeName, i, retryCount)
    position = hash(retryKey) // New random position
    retryCount++
}
2. Streaming I/O — Memory Matters
My first upload implementation:
// DON'T DO THIS
func uploadFile(file io.Reader) {
    data, _ := io.ReadAll(file)     // 1GB in RAM
    chunks := splitIntoChunks(data) // Another 1GB
    for _, chunk := range chunks {
        encrypt(chunk) // More copies
        store(chunk)
    }
}
// Result: 1GB file = 3GB RAM usage. Server crashes at 500MB.
The fix: Stream chunk by chunk
type ChunkReader struct {
    reader    io.Reader
    chunkSize int
    buffer    []byte
    index     int
}

func (cr *ChunkReader) NextChunk() ([]byte, int, error) {
    n, err := io.ReadFull(cr.reader, cr.buffer)
    if err == io.EOF {
        // Clean EOF: nothing left to read at all
        return nil, cr.index, io.EOF
    }
    if err != nil && err != io.ErrUnexpectedEOF {
        return nil, cr.index, err
    }
    // io.ErrUnexpectedEOF means a short final chunk: still valid data
    chunk := cr.buffer[:n]
    cr.index++
    return chunk, cr.index - 1, nil
}
Now I process one chunk at a time. Memory in the chunking path stays at a roughly constant ~2 MB regardless of file size (end-to-end usage is higher once encryption and replication buffers are counted, as the benchmarks later show).
| File Size | Before (Buffered) | After (Streaming) |
|---|---|---|
| 10 MB | ~30 MB RAM | ~2 MB RAM |
| 100 MB | ~300 MB RAM | ~2 MB RAM |
| 1 GB | ~3 GB RAM (crash) | ~2 MB RAM |
| 10 GB | 💀 | ~2 MB RAM ✅ |
3. Quorum — The Heart of Distributed Consensus
In a distributed system, nodes can fail, be slow, or have stale data. Quorum ensures consistency.
The formula:
N = Total replicas
W = Write quorum (minimum nodes for successful write)
R = Read quorum (minimum nodes to read from)
Rule: W + R > N (guarantees overlap)
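The rule is cheap to enforce at startup. A minimal sketch (`QuorumConfig` and `Validate` are hypothetical names, not necessarily Lilio's):

```go
package main

import "fmt"

// QuorumConfig holds the replication parameters described above.
type QuorumConfig struct{ N, W, R int }

// Validate rejects configs that can't guarantee read/write overlap.
func (q QuorumConfig) Validate() error {
    if q.W < 1 || q.R < 1 || q.W > q.N || q.R > q.N {
        return fmt.Errorf("W and R must be between 1 and N=%d", q.N)
    }
    if q.W+q.R <= q.N {
        // A read could otherwise hit only nodes the write skipped.
        return fmt.Errorf("W+R=%d must exceed N=%d", q.W+q.R, q.N)
    }
    return nil
}

func main() {
    fmt.Println(QuorumConfig{N: 3, W: 2, R: 2}.Validate()) // <nil>
    fmt.Println(QuorumConfig{N: 3, W: 1, R: 1}.Validate()) // rejected: no overlap
}
```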
Example with N=3, W=2, R=2:
WRITE (storing chunk):
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Node A │ │ Node B │ │ Node C │
│ v2 ✅ │ │ v2 ✅ │ │ (slow) │
└─────────┘ └─────────┘ └─────────┘
↑ ↑
Write Write
W=2 achieved → Write successful!
(Node C will catch up eventually)
READ (later):
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Node A │ │ Node B │ │ Node C │
│ v2 │ │ v2 │ │ v1 │ ← stale!
└─────────┘ └─────────┘ └─────────┘
↑ ↑ ↑
Read Read Read
R=2 → Got v2 from A and B → Return v2 ✅
Background: Repair Node C with v2 🔧
Implementation:
func (s *Lilio) retrieveChunkWithQuorum(chunkInfo ChunkInfo) ([]byte, error) {
    var (
        mu        sync.Mutex // guards responses across goroutines
        responses []ChunkResponse
        wg        sync.WaitGroup
    )
    // Read from ALL nodes in parallel
    for _, nodeName := range chunkInfo.StorageNodes {
        wg.Add(1)
        go func(name string) {
            defer wg.Done()
            backend := s.backendFor(name) // look up this node's backend (helper name assumed)
            data, err := backend.RetrieveChunk(chunkInfo.ChunkID)
            if err != nil {
                return // a failed node simply doesn't count toward quorum
            }
            checksum := calculateChecksum(data)
            mu.Lock()
            defer mu.Unlock()
            responses = append(responses, ChunkResponse{
                NodeName: name,
                Data:     data,
                Checksum: checksum,
                Valid:    checksum == chunkInfo.Checksum,
            })
        }(nodeName)
    }
    wg.Wait()
    // Check quorum
    if len(responses) < s.Quorum.R {
        return nil, errors.New("read quorum failed")
    }
    // Find valid responses, trigger repair for stale ones
    var validData []byte
    var staleNodes []string
    for _, resp := range responses {
        if resp.Valid {
            validData = resp.Data
        } else {
            staleNodes = append(staleNodes, resp.NodeName)
        }
    }
    // Self-healing: fix stale nodes in background
    if len(staleNodes) > 0 {
        go s.readRepair(chunkInfo.ChunkID, validData, staleNodes)
    }
    return validData, nil
}
4. Pluggable Everything — Interface-Driven Design
I wanted Lilio to work with different storage backends and metadata stores without code changes. Go interfaces made this clean:
Storage Backend Interface:
type StorageBackend interface {
    StoreChunk(chunkID string, data []byte) error
    RetrieveChunk(chunkID string) ([]byte, error)
    DeleteChunk(chunkID string) error
    ListChunks() ([]string, error)
    Info() BackendInfo
}

// Implementations
type LocalBackend struct{ ... }  // Local filesystem
type S3Backend struct{ ... }     // Amazon S3
type GDriveBackend struct{ ... } // Google Drive
Metadata Store Interface:
type MetadataStore interface {
    CreateBucket(name string) error
    GetBucket(name string) (*BucketMetadata, error)
    SaveObjectMetadata(meta *ObjectMetadata) error
    GetObjectMetadata(bucket, key string) (*ObjectMetadata, error)
    ListObjects(bucket, prefix string) ([]string, error)
    Type() string
}

// Implementations
type LocalStore struct{ ... }  // JSON files
type EtcdStore struct{ ... }   // Distributed etcd
type MemoryStore struct{ ... } // In-memory (testing)
Config-driven selection:
{
  "metadata": {
    "type": "etcd",
    "etcd": {
      "endpoints": ["node1:2379", "node2:2379", "node3:2379"]
    }
  },
  "storages": [
    {"name": "local-1", "type": "local", "path": "/data/1"},
    {"name": "s3-backup", "type": "s3", "bucket": "my-bucket"}
  ]
}
Switch from local development to distributed production? Just change the config. Zero code changes.
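The wiring behind that is essentially a switch over the config's type field. A sketch with toy backend types (the real registry is more involved):

```go
package main

import (
    "encoding/json"
    "fmt"
)

// StorageConfig mirrors one entry of the "storages" array above.
type StorageConfig struct {
    Name   string `json:"name"`
    Type   string `json:"type"`
    Path   string `json:"path,omitempty"`
    Bucket string `json:"bucket,omitempty"`
}

type StorageBackend interface{ Info() string }

type LocalBackend struct{ path string }

func (b LocalBackend) Info() string { return "local:" + b.path }

type S3Backend struct{ bucket string }

func (b S3Backend) Info() string { return "s3:" + b.bucket }

// newBackend picks an implementation based on the config's type field.
func newBackend(c StorageConfig) (StorageBackend, error) {
    switch c.Type {
    case "local":
        return LocalBackend{path: c.Path}, nil
    case "s3":
        return S3Backend{bucket: c.Bucket}, nil
    default:
        return nil, fmt.Errorf("unknown backend type %q", c.Type)
    }
}

func main() {
    raw := `[{"name":"local-1","type":"local","path":"/data/1"},
             {"name":"s3-backup","type":"s3","bucket":"my-bucket"}]`
    var configs []StorageConfig
    if err := json.Unmarshal([]byte(raw), &configs); err != nil {
        panic(err)
    }
    for _, c := range configs {
        b, err := newBackend(c)
        if err != nil {
            panic(err)
        }
        fmt.Println(c.Name, "->", b.Info())
    }
}
```

An unknown type fails loudly at startup instead of silently at the first write, which is the behavior you want from config-driven selection.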
5. Observability with Prometheus Metrics
One lesson I learned: metrics aren't optional. Adding them later is painful. I instrumented from day one.
Metrics Interface:
type Collector interface {
    RecordPutObject(bucket string, sizeBytes int64, duration time.Duration)
    RecordQuorumWrite(success bool, nodesAttempted, nodesSucceeded int)
    RecordReadRepair(node string)
    RecordBackendHealth(node string, healthy bool)
    // ... 15+ metrics total
}
What I track:
lilio_objects_total{bucket="photos", operation="put"}
lilio_object_size_bytes{bucket="photos"} (histogram)
lilio_request_duration_seconds{operation="put"} (histogram)
lilio_quorum_write_total{status="success"}
lilio_read_repairs_total{node="backend-1"}
lilio_backend_health{node="backend-1"} (gauge: 1=up, 0=down)
Example PromQL queries:
# Average upload speed
rate(lilio_object_size_bytes_sum[5m]) / rate(lilio_object_size_bytes_count[5m])
# Quorum success rate
sum(rate(lilio_quorum_write_total{status="success"}[5m]))
/
sum(rate(lilio_quorum_write_total[5m]))
# P95 latency
histogram_quantile(0.95, lilio_request_duration_seconds_bucket)
Setup:
# docker-compose.yaml
services:
  prometheus:
    image: prom/prometheus
    ports: ["9090:9090"]
  grafana:
    image: grafana/grafana
    ports: ["3000:3000"]
    volumes:
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
Pre-built dashboard included with:
- Request rates per bucket
- P95/P99 latencies
- Quorum success rates
- Backend health status
- Read repair activity
Why this matters: When I ran load tests and saw P99 latency spike at 10MB files, metrics helped me identify the bottleneck in under 5 minutes. No metrics = flying blind.
Tradeoffs I Made
Every system has tradeoffs. Here's my CAP theorem positioning:
         Consistency
             /\
            /  \
           /    \
          / Lilio\
         /  (CP)  \
        /          \
       /____________\
  Availability    Partition
                  Tolerance
Lilio prioritizes Consistency + Partition Tolerance (CP):
| Tradeoff | Choice | Why |
|---|---|---|
| Consistency vs Availability | Consistency | Quorum writes fail if W nodes unavailable |
| Memory vs Speed | Memory | Streaming is slower but handles any file size |
| Complexity vs Features | Complexity | Pluggable backends add code but enable flexibility |
| Chunk Size | 1MB default | Balance between metadata overhead and parallelism |
Chunk Size Analysis:
Small chunks (64KB):
✅ Better parallelism
✅ Faster recovery (less data to re-replicate)
❌ More metadata overhead
❌ More network round trips
Large chunks (64MB):
✅ Less metadata
✅ Fewer network calls
❌ Slower recovery
❌ More memory per operation
Sweet spot: 1MB - 16MB depending on use case
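The metadata side of that tradeoff is easy to quantify. Assuming roughly 200 bytes of metadata per chunk record (an illustrative figure, not Lilio's actual overhead), here's what a 10 GB object costs at each chunk size:

```go
package main

import "fmt"

const metadataPerChunk = 200 // bytes per chunk record (assumed figure)

// chunkCount is a ceiling division: the last chunk may be partial.
func chunkCount(fileSize, chunkSize int64) int64 {
    return (fileSize + chunkSize - 1) / chunkSize
}

func main() {
    fileSize := int64(10) << 30 // 10 GB
    for _, cs := range []int64{64 << 10, 1 << 20, 64 << 20} {
        n := chunkCount(fileSize, cs)
        fmt.Printf("chunk=%6d KB  chunks=%7d  metadata=%6.1f MB\n",
            cs>>10, n, float64(n*metadataPerChunk)/(1<<20))
    }
}
```

At 64 KB chunks the 10 GB object needs 163,840 chunk records (about 31 MB of metadata); at 1 MB it's 10,240 records (about 2 MB); at 64 MB it's only 160. That's the metadata-overhead axis the analysis above is trading against parallelism and recovery time.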
Benchmarks
Tested on: MacBook Air M1, 8GB RAM, 3 in-memory storage backends
Configuration: N=3, W=2, R=2 (majority quorum)
Upload Performance
| File Size | Latency | Throughput | Memory Usage |
|---|---|---|---|
| 1 KB | 0.32ms | 3.16 MB/s | 1.0 MB |
| 100 KB | 0.70ms | 145.83 MB/s | 1.1 MB |
| 1 MB | 2.63ms | 398.04 MB/s | 2.1 MB |
| 10 MB | 25ms | 419.35 MB/s | 11.6 MB |
| 100 MB | 270ms | 387.68 MB/s | 106.1 MB |
Key Insight: Memory scales at ~1.06× file size. Streaming prevents loading the entire file into memory at once, but we still need buffers for chunking and encryption. The sweet spot is 1-10MB files where we hit peak throughput of 420 MB/s.
The Small File Problem
During benchmarking, I discovered an interesting bottleneck:
| File Size | Throughput | What's happening? |
|---|---|---|
| 1 KB | 3.16 MB/s | Quorum overhead dominates |
| 100 KB | 145.83 MB/s | Still overhead-heavy |
| 1 MB | 398.04 MB/s | 100× faster than 1KB! |
For tiny files, the cost of coordinating with W=2 nodes (network calls, checksums, metadata updates) overshadows the actual data transfer.
The fix? Implement a fast path for files <10KB that:
- Skips chunking (store as single chunk)
- Batches metadata updates
- Uses simpler replication logic
Estimated improvement: 10-100× throughput for small files.
Concurrent Performance (Where It Shines)
| Test | Sequential | 10 Concurrent | Scaling Factor |
|---|---|---|---|
| 1MB uploads | 398 MB/s | 1,014 MB/s | 2.5× |
The system scales well with concurrency: ten parallel uploads achieve 2.5× the sequential throughput, validating the parallel replication architecture.
Download Performance
| File Size | Latency | Throughput |
|---|---|---|
| 1 KB | 0.14ms | 7.41 MB/s |
| 100 KB | 0.47ms | 217.79 MB/s |
| 1 MB | 2.66ms | 394.74 MB/s |
| 100 MB | 485ms | 216.15 MB/s |
Reads are generally faster than writes since there's no replication coordination; we fetch from the replicas and can return as soon as R=2 valid responses arrive.
Quorum Impact (Surprising Results)
| Config | Throughput (1MB) | Expected Behavior |
|---|---|---|
| N=3, W=1, R=1 | ❌ Rejected | W+R must be > N |
| N=3, W=2, R=2 | 185.82 MB/s | Baseline (majority) |
| N=3, W=3, R=3 | 264.95 MB/s | Should be slower! |
Wait, W=3 is faster than W=2?
In my test environment (no network latency), waiting for all 3 nodes is actually simpler than checking "did we hit quorum of 2 yet?". The goroutine coordination overhead is lower.
In production with real network delays, W=2 would be faster since it doesn't wait for the slowest node. This shows why real-world testing matters — local benchmarks can mislead you.
Production Architecture
For production, here's the recommended setup:
┌─────────────────────────────────────────────────────────────┐
│ Load Balancer (NGINX/ALB) │
└─────────────────────────┬───────────────────────────────────┘
│
┌─────────────────────┼─────────────────────┐
│ │ │
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐
│Lilio 1 │ │Lilio 2 │ │Lilio 3 │
│ AZ-1 │ │ AZ-2 │ │ AZ-3 │
└───┬────┘ └───┬────┘ └───┬────┘
│ │ │
└────────────────────┼────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ etcd Cluster (3 nodes) │
│ Distributed Metadata + Consensus │
└─────────────────────────────────────────────────────────────┘
│
┌────────────────────┼────────────────────┐
│ │ │
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐
│Local │ │ S3 │ │ GDrive │
│Storage │ │(backup)│ │(backup)│
└────────┘ └────────┘ └────────┘
Why etcd for metadata?
- Raft consensus: Battle-tested distributed coordination (same algorithm as Consul, CockroachDB)
- Strong consistency: Linearizable reads/writes for metadata
- Production proven: Powers Kubernetes
- Watch API: Get notified when metadata changes (enables caching)
- Multi-node: Survives node failures with quorum
Alternative I considered:
- PostgreSQL: Good for single-region, but complex to distribute
- Consul: Similar to etcd, good choice too
- DynamoDB: AWS-only, vendor lock-in
- Local JSON: Works for dev, not for production
Chose etcd because it's designed for exactly this use case — distributed metadata with strong consistency.
Limitations & What I'd Do Differently
Being honest about limitations is important. Here's what's not perfect:
1. Small File Performance
- Current: 3.16 MB/s for 1KB files
- Issue: Quorum coordination overhead dominates
- Fix: Fast path for <10KB files (planned)
2. Memory Not Truly Constant
- I initially thought streaming would keep memory flat
- Reality: Memory scales at 1.06× file size
- Why: Still need buffers for chunks, encryption, checksums
- It's still efficient (prevents 2-3× overhead of naive approach)
3. Test vs Production
- My benchmarks show W=3 faster than W=2 (no network latency)
- Production would flip this (network delays matter)
- Learning: Always test in environment similar to production
4. No Rollback on Partial Writes
- If 1 of 2 nodes fails mid-write, the successful chunk stays
- This leaves the system in inconsistent state until retry
- Fix: Implement two-phase commit or rollback logic
5. Version-Based Conflict Resolution Incomplete
- ChunkInfo has a Version field but it's not used for conflict resolution
- Read repair uses checksums, not timestamps
- Fix: Implement proper last-write-wins with version comparison
These aren't bugs — they're priorities. For a portfolio project, getting 80% working beats 100% planning.
Lessons Learned
Start with interfaces: I refactored twice before learning this. Design interfaces first, implement later.
Streaming isn't optional: Memory will bite you. Always assume files can be larger than RAM.
Distributed ≠ Complicated: Consistent hashing and quorum are surprisingly implementable. Don't be scared.
Test failure modes: Kill nodes randomly. Corrupt data. See what breaks. That's where bugs hide.
Metrics from day one: Adding Prometheus later is painful. Instrument everything upfront.
What's Next?
Performance Optimizations (from benchmarking):
- [ ] Small file fast path (10-100× improvement for <10KB files)
- [ ] Early quorum exit (20-40% latency reduction when W < N)
- [ ] Buffer pooling (30% memory reduction for high-throughput scenarios)
- [ ] Parallel chunk upload (2-3× faster for large files)
Features:
- [ ] Erasure coding (Reed-Solomon) for storage efficiency
- [ ] Multi-region replication
- [ ] Garbage collection for orphaned chunks
- [ ] Web UI dashboard
- [ ] Kubernetes operator
Try It Yourself
git clone https://github.com/subhammahanty235/lilio
cd lilio
go build -o lilio ./cmd/lilio
# Start server
./lilio server
# Create bucket and upload
./lilio bucket create photos
./lilio put vacation.jpg photos/vacation.jpg
# Download
./lilio get photos/vacation.jpg downloaded.jpg
Want to see it in action with metrics?
# Start infrastructure (etcd + Prometheus + Grafana)
docker-compose up -d
# Run with etcd metadata store
./lilio server --metadata etcd
# Open Grafana dashboard
open http://localhost:3000
# Login: admin/admin
# Navigate to pre-configured Lilio dashboard
Conclusion
Building a distributed storage system taught me more about systems design than any course or book. The concepts — consistent hashing, quorum, streaming I/O, interface-driven design — are applicable everywhere.
If you're learning Go or distributed systems, I highly recommend building something like this. Start simple, add complexity incrementally, and don't be afraid to break things.
Repository: github.com/subhammahanty235/lilio
Questions? Feedback? Found a bug? Open an issue or reach out!
If you found this useful, consider:
- ⭐ Starring the repo on GitHub
- 📢 Sharing this post with someone learning distributed systems
- 💬 Leaving feedback — what worked? what confused you?
Tags: #go #golang #distributedsystems #systemdesign #backend #storage #programming #s3 #objectstorage