DEV Community

Manish
Manish

Posted on

Learning, Experimenting - Concurrency in Go

Torn between writing a toy project I'd use vs learning Go, I decided to try my hand at writing a simple distributed chunked filestore. Here are a few things I learned and how I moved from a naive implementation to a parallelized implementation.

There are a lot of filestore papers already that involve much more sophisticated distribution/replication strategies, and I will be building upon my project as I continue - I've got lots to learn. Anyhow, let's get started!

The Test Suite

First up, a simple test suite:

type fakeReader struct {
    remaining int
}

func (r *fakeReader) Read(p []byte) (int, error) {
    if r.remaining <= 0 {
        return 0, io.EOF
    }
    n := len(p)
    if n > r.remaining {
        n = r.remaining
    }
    for i := 0; i < n; i++ {
        p[i] = 'x'
    }
    r.remaining -= n
    return n, nil
}

func BenchmarkStoreChunk(b *testing.B) {
    const size = 1000<<20
    s := &Server{config: Config{Store: b.TempDir(), Chunksize: 10, MaxConcurrency: 1}}

    b.ResetTimer()
    b.SetBytes(int64(size))

    for b.Loop(){
        r := &fakeReader{remaining: size}
        if _, err := s.storeChunks(r, "bench"); err != nil {
            b.Fatal(err)
        }
    }
}

Enter fullscreen mode Exit fullscreen mode

Pretty straightforward. Initialize a 1000 MB byte array, call my main storeChunks function to handle the write to disk. Note that Chunksize (in MB) and MaxConcurrency (num of concurrent workers) is a configurable parameter at server initialization (for the parallelized implementation). Running it is simply:

go test -bench=. -benchmem -count=3
Enter fullscreen mode Exit fullscreen mode

The option count parameter runs it 3 times in this example.

Note - I will be skipping the server handling and the input read part, and will be going over the core storage logic only.

1. The Naive Write

numChunks := 0
chunkSize := 5 << 20
buf := make([]byte, chunkSize)
for {
    part, err := reader.NextPart()
    if err == io.EOF {
        break
    }
    if err != nil {
        // handle
    }
    for i:=0;;i++{
        n, err := io.ReadFull(part, buf)
        if err != io.EOF && err != io.ErrUnexpectedEOF && err != nil {
            // handle
        }
        if n == 0 {
            break
        }
        chunkPath := fmt.Sprintf("%v/%v.%v", s.config.Store, part.FileName(), numChunks)
        chunk, createErr := os.Create(chunkPath)
        if createErr != nil {
            // handle
        }
        _, writeErr := chunk.Write(buf[:n])
        if writeErr != nil {
            // handle
        }
        chunk.Close()
        numChunks += 1
        if err == io.EOF || err == io.ErrUnexpectedEOF {
            break
        }
    }
    // handle
}

Enter fullscreen mode Exit fullscreen mode

Simple enough. Read from the incoming part in full and write into a buf of size 5 MB. Each buf chunk is then sequentially written to disk. Taking a look at the benchmark:

3   339820142 ns/op 3085.68 MB/s       10514901 B/op     608 allocs/op
Enter fullscreen mode Exit fullscreen mode

Not bad. 3 runs, 3GB/s, 10.5MB/op, 600 allocs/op. We will come back to these numbers later.

2. The Naive Concurrent rewrite

The main issue with the naive write is that concurrency is used nowhere. The incoming part is read in 5MB chunks and written to disk sequentially. This means there is a lot of idle CPU just waiting for the write to finish before reading the next 5MB chunk. Situations like these call for concurrency.

When spawning concurrent workers, it is inevitable that they will expect to share a common variable. This introduces the need for mutexes to prevent corruption/race-conditions. And so:

var wg sync.WaitGroup
var mu sync.Mutex
var firstErr error
numChunks := 0
chunkSize := s.config.Chunksize << 20

for {
    buf := make([]byte, chunkSize)          // Comment 1 - fresh allocation every chunk
    n, err := io.ReadFull(part, buf)
    if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
        // handle
    }
    if n == 0 {
        break
    }

    chunkPath := fmt.Sprintf("%v/%v.%v", s.config.Store, fileName, numChunks)
    b, nn, c := buf, n, chunkPath
    wg.Add(1)
    go func() {                              // Comment 2 - unbounded: one goroutine per chunk, no limit
        defer wg.Done()
        chunk, err := os.Create(c)
        if err != nil {
            mu.Lock()
            if firstErr == nil { firstErr = err }
            mu.Unlock()
            return
        }
        defer chunk.Close()
        if _, err := chunk.Write(b[:nn]); err != nil {
            mu.Lock()
            if firstErr == nil { firstErr = err }
            mu.Unlock()
        }
    }()
    numChunks++
    if err == io.EOF || err == io.ErrUnexpectedEOF {
        break
    }
}

wg.Wait()
if firstErr != nil {
    // handle
}
return numChunks, nil
Enter fullscreen mode Exit fullscreen mode

Okay. Now we have a concurrent implementation that spawns multiple workers to write in parallel. This should improve our benchmark - let's test it:

3       340605556 ns/op 3078.56 MB/s    1059307005 B/op 1028 allocs/op
Enter fullscreen mode Exit fullscreen mode

So - 3 runs, 3GB/s, 1GB/op, 1028 allocs/op.
Wait - 1 GB/op?

I've let the goroutines be created unbounded, which means that every goroutine uses a freshly initialized buf. 100 goroutines with 10MB chunks -> 1 GB. As pointed out in the Comments #1 and #2.

This is a problem. I've regressed - no improvements over my disk writes while eating up my RAM, inviting pesky OOM-Killed issues as well. We can do better.

3. The Better Concurrent rewrite

With the current concurrent implementation, we would need a way to limit the number of goroutines that can be spawned. One way is to simply initialize n buffers, spawn n goroutines at once, and wait for them to complete before moving on to the next iteration(s). While this approach would cap RAM for a single file, it wouldn't work effectively for highly parallelized workloads - the number of buffers is simply number of incoming requests * n. We could also implement a global cap, which would eventually be useful to handle multiple incoming requests.

The inherent problem here is that memory is held on to for far longer than needed. At n=50, we wait for all 50 goroutines to finish. But by the time we start n=25, the goroutines that were spawned for n=1..10 might've completed. A better way would be to have some sort of a resizable record that can provide already processed buffers for reuse. And hence, channels. With that in mind:

var wg sync.WaitGroup
var mu sync.Mutex
var Err error = nil
numChunks := 0
chunkSize := s.config.Chunksize << 20
bufPool := make(chan []byte, s.config.MaxConcurrency)
for range s.config.MaxConcurrency {
    bufPool <- make([]byte, chunkSize)
}
for {
    buf := <-bufPool 
    n, err := io.ReadFull(part, buf)
    if err != io.EOF && err != io.ErrUnexpectedEOF && err != nil {
        // handle
    }
    if n == 0 {
        break
    }
    chunkPath := fmt.Sprintf("%v/%v.%v", s.config.Store, fileName, numChunks)
    wg.Add(1)
    go func(chunkPath string, buf []byte, n int) {
        defer wg.Done()
        defer func() { bufPool <- buf }() 
        chunk, err := os.Create(chunkPath)
        if err != nil {
            mu.Lock()
            if Err == nil { Err = err }
            mu.Unlock()
            return
        }
        defer chunk.Close()
        _, err = chunk.Write(buf[:n])
        if err != nil {
            mu.Lock()
            if Err == nil { Err = err }
            mu.Unlock()
            return
        }
    }(chunkPath, buf, n)
    numChunks += 1

    if err == io.EOF || err == io.ErrUnexpectedEOF {
        fmt.Println("Reached end of input!")
        break
    }
}
wg.Wait()
if Err != nil {
    return numChunks, Err
}
return numChunks, nil
Enter fullscreen mode Exit fullscreen mode

Note that we have an additional configurable parameter now - the number of concurrent workers to spawn. Here's a few benchmarks for different values of that parameter:

N=1:   3    336508657 ns/op 3116.04 MB/s       10525466 B/op     816 allocs/op
N=4:   5    217452331 ns/op 4822.10 MB/s       42000254 B/op     858 allocs/op
N=6:   5    215637466 ns/op 4862.68 MB/s       62963259 B/op     842 allocs/op
Enter fullscreen mode Exit fullscreen mode

At 1 worker - basically sequential - we see 3.1GB/s with 10.5MB/op
At 4 workers, we see 4.8GB/s with 42MB/op.
At 6 workers, we see 4.8GB/s with 63MB/op.

The number of bytes per op is simply number of workers * chunk size, as you can see. Which gives us predictable bound to stay within.

The N=1 case is an interesting datapoint here. It is just as fast as the sequential write and naive concurrent rewrite (3.1 GB/s) while saving 100x the memory (10.5MB/op vs 1GB/op) as compared to the naive write-up.

Simulating environments

The measurements above were taken from a 64GB RAM machine. Target workloads might be different, such as kubernetes pods with a limits.memory of 2GB, or a memory-capped systemd service. K8s pods use cGroups anyway, so the aforementioned scenarios are identical. Luckily, there is an easy way to simulate this cGroup behaviour:

systemd-run --scope -p MemoryMax=$MEM go test -bench=BenchmarkStoreChunk -benchmem
Enter fullscreen mode Exit fullscreen mode

Simulating for $MEM=2G and $MEM=512M gives:

Version 2GB cap (MB/s) 512M cap (MB/s)
sequential 2991 1571
naive 2869 1858
N=1 3023 1627
N=4 4762 3382
N=6 4889 3294

Multiple factors come into play with such restrictions - Go Runtime Memory limits, Garbage collection parameters etc.

What next?

This was just a simple experiment I'm documenting to come back to when I need to refresh concurrency. As for the project, I hope to integrate it with my homelab and start using it properly. I haven't even written a download handler yet, so I suppose that would be next. And a UI, a store to maintain records of file uploads/locations, and then run it as a service/container, expose to relevant devices over LAN or TailScale / Reverse-Proxy - there's so much left to do, and I'm sure I'll learn more as I progress. If you see another blog post in a few months, know that I have succeeded. If not, I've probably procrastinated on it. Anyhow, until next time!

Top comments (0)