DEV Community

Manish
Manish

Posted on

Learning, Experimenting - Concurrency in Go - Part 2

Refresher - I'm building a distributed chunked filestore in Go, and I setup a post for Part 1 here. That part dealt with uploading a file - this post is about downloads.

Setup

Requirements

  • User hits our endpoint with the filename/fileid
  • We use this fileid to get a list of chunks
  • Our retrieve mechanism only depends on this list of chunks
  • We want to be able to retrieve the associated chunks in parallel
  • We want correctness (duh, but I'll explain why I explicitly stated this)

The Test Suite

First up, a simple test suite:

type fakeWriter struct {
    hdr http.Header
}

func (w *fakeWriter) Header() http.Header {
    if w.hdr == nil {
        w.hdr = make(http.Header)
    }
    return w.hdr
}

func (w *fakeWriter) WriteHeader(int) {}
func (w *fakeWriter) Write(p []byte) (int, error) {
    return len(p), nil
}

func BenchmarkRetrieveChunk(b *testing.B) {
    dir := b.TempDir()
    const numChunks = 12
    const chunkBytes = 64 << 10

    chunks := make([]Chunk, numChunks)
    for i:= range(numChunks) {
        body := bytes.Repeat([]byte{byte('A'+i)}, chunkBytes)
        path := filepath.Join(dir, fmt.Sprintf("%v.%v", "chunk", i))
        if err := os.WriteFile(path, body, 0o600); err != nil {
            b.Fatal(fmt.Errorf("Error while setting up benchmark - %v", err))
        }
        chunks[i] = Chunk{ChunkIndex: i, ChunkSize: chunkBytes, StoreLoc: path}
    }
    s := &Server{
    config: Config{
        Store: b.TempDir(), 
        Chunksize: 10,
    }, 
    concurrencyConfig: ConcurrencyConfig{
        MaxUploadConcurrency: 5, 
        MaxDownloadConcurrencyPerFile: 1,
    }}

    b.SetBytes(int64(numChunks)*chunkBytes)
    b.ResetTimer()
    for b.Loop() {
        s.retrieveChunks(&fakeWriter{}, "chunk", &chunks)
    }
}
Enter fullscreen mode Exit fullscreen mode

A bit more involved than our storing mechanism.

Why define a fakeWriter? The files are usually large, which means we'd have to stream the chunks as we read them. In such a case, as soon as a chunk is read, we want to immediately write it to the upstream http writer while the other chunks download in the background in parallel.

The Concurrent Chunk retrieval

With the concurrent file upload, we are able to reuse a global chan. This is because once the chunk info is fed, it does not matter in which order the goroutines complete - they just need to get stored anyway. However, with our concurrent file download, we want to have them fetched in order. For example, for a file split into 5 chunks, we have to stream the file back in order of 1..5. However, if chunk 5 finishes before chunk 4, we have to wait until chunk 4 is finished before streaming it back.

But that just means we have to fetch them in order. Why would it mean we can't reuse a global chan? We could, but it would be a bit more complex than what we want to explore at the moment. With a global chan, we also have to handle ordering in such a way that it doesn't overlap across multiple file download requests i.e. separate requests must be isolated from their ordering. I will be exploring ways to do it eventually, but for now we shall move on.

Implementing ordering

How do we go about implementing ordering? We know that chunks are fetched from the bufpool. We can start by doing spawning them in order like this:

queue := make(chan []byte, s.concurrencyConfig.MaxDownloadConcurrencyPerFile)
g.Go(func() error {
    defer close(queue)
    for i := 0; i < numChunks; i++ {
        g.Go(func() error {
            return s.readChunk(ctx, queue, (*chunkInfo)[i]) 
        })
    }
    return nil
})
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%v", filename))
for res := range(queue) {
    if _, err := w.Write(res); err != nil {
        return
    }
}
if err := g.Wait(); err != nil{
    http.Error(w, "Stream disconnected", http.StatusInternalServerError)
    return
}

Enter fullscreen mode Exit fullscreen mode

This would spawn the goroutine for chunk i before i+1. But that is simply ordered spawning, not ordered completion. A chan simply says - give me the next value that was sent. In that case, what happens if chunk i+1 finishes before chunk i? When looping through the chan to retrieve data, we get chunk i+1 before chunk i, which we then write back the upstream client - which violates correctness.

So what we want is to retrieve chunks in parallel but write them back sequentially. One naive way to do so would be something like this - since we know the number of chunks in advance, we can have a var readmask bool of size numchunks and block our write loop on the ith element until chunk i completes:

queue := make(chan []byte, s.concurrencyConfig.MaxDownloadConcurrencyPerFile)
readMask := make([]bool, numChunks)
g.Go(func() error {
    defer close(queue)
    for i := 0; i < numChunks; i++ {
        g.Go(func() error {
                        defer func() { readMask[i] = true }
            return s.readChunk(ctx, queue, (*chunkInfo)[i]) 
        })
    }
    return nil
})
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%v", filename))
for res := range(queue) {
        while !readMask[i] {}
    if _, err := w.Write(res); err != nil {
        return
    }
}
if err := g.Wait(); err != nil{
    http.Error(w, "Stream disconnected", http.StatusInternalServerError)
    return
}
Enter fullscreen mode Exit fullscreen mode

We have a readmask defined that waits and runs an inf loop until the ith chunk is read. Seems simple, right? But this leads to a new problem: we are modifying the readmask within multiple goroutines, which means we have to wrap a mutex to prevent simultaneous data access i.e. data race conditions.

What we could do to solve this problem, is introduce an array of chans, like so:

queue := make([]chan []byte, numChunks)
g.Go(func() error {
...
    for i := 0; i < numChunks; i++ {
        g.Go(func() error {
            return s.readChunk(ctx, queue[i], (*chunkInfo)[i]) 
...
})
for i := range queue {
        data, ok := <- queue[i]
        // handle data and ok
    if _, err := w.Write(data); err != nil {
        return
    }
}

Enter fullscreen mode Exit fullscreen mode

queue[i] is closed at readChunk, and so, our <-queue[i] blocks until this closure. How does this our mutex problem, you ask? Well, we are not mutating the slice after setup. Every subsequent operation is read only. queue[i] simply points to the chan at that index, which was already initialized. The readChunk op just writes to the same underlying chan, not the queue[i] object - which means queue itself doesn't need a mutex.

This solves our concurrency problem. But what about boundedness? We could encounter a situation where while the 1st chunk is being streamed, all 100 chunks are read and sit in memory - because nothing is stopping us from limiting the number of goroutines we spawn. We could do something like holding a sem with capacity and releasing it. I chose a different approach - make our queue a chan of chans!

queue := make(chan chan []byte, s.concurrencyConfig.MaxDownloadConcurrencyPerFile)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
    defer close(queue)
    for i := 0; i < numChunks; i++ {
        res := make(chan []byte, 1)
        select {
            case queue <- res:
            case <- ctx.Done():
                return ctx.Err()
        }
        g.Go(func() error { 
            defer close(res)
            return s.readChunk(ctx, res, (*chunkInfo)[i]) 
        })
    }
    return nil
})
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%v", filename))
for res := range(queue) {
    data, ok := <- res
    if !ok {
        break
    }
    if _, err := w.Write(data); err != nil {
        return
    }
}
Enter fullscreen mode Exit fullscreen mode

Now we have everything we want! Concurrent workers and correctness in writes.

Benchmarking

I tried running the benchmark as in, and ran into similar results (N = number of concurrent workers):

N=1:    246297966 ns/op 510.88 MB/s 125860644 B/op  188 allocs/op
N=5:    253591228 ns/op 496.19 MB/s 125853408 B/op  201 allocs/op
Enter fullscreen mode Exit fullscreen mode

Interesting, we would expect concurrency to speed things up. But its just slowing things down. The main culprit - page cache.

Looking at my benchmark - I am writing my file to tempDir and reading it immediately. The read is served fresh from the page cache, which means it is not read-bound at all. We see no benefits from concurrency since we don't simulate cache misses and disk fetch.

What I tried next is to introduce an artifical lag in my read routine, like so:

func (s *Server) readChunk(ctx context.Context, cache chan[]byte, chunk Chunk) error {
    time.Sleep(50*time.Millisecond)
...
Enter fullscreen mode Exit fullscreen mode

This delays every goroutine by 50ms - cache misses are in ms range, so this should be a decent approximation for cold fetches.

And so, the results - read with 50ms delay in readChunk, 12 chunks × 10MB each:

N=1:    464712476 ns/op 270.77 MB/s 125873928 B/op  231 allocs/op
N=5:    306165247 ns/op 410.98 MB/s 125854244 B/op  217 allocs/op
N=10:   314576026 ns/op 400.00 MB/s 125857106 B/op  232 allocs/op
Enter fullscreen mode Exit fullscreen mode

This tells us where concurrency wins out - when our reads from disk are slow and data encounters a cache-miss, which means they are fetched cold from our disk.

What next?

I don't know, honestly. On the concurrency front, I think I've explored a decent amount. I can work on making this a working product but that requires auth, ui, storing file info etc. I am working on them, but they are more trivial than the above experiments. If I do learn something new, I will be sure to post them. So, until next time!

Top comments (0)