
Mohammad Fikri


go-typedpipe: A Typed, Context-Aware Pipe for Go

Background

Go channels are one of the best things about the language. But the moment you need context cancellation, error propagation, and safe concurrent shutdown all at once, a simple chan T starts asking you to write a lot of code just to use it correctly. A common pattern looks something like this:

out := make(chan Result, len(urls))
errc := make(chan error, 1)

go func() {
    defer close(out)
    for _, url := range urls {
        select {
        case <-ctx.Done():
            errc <- ctx.Err()
            return
        default:
        }
        resp, err := fetch(ctx, url)
        if err != nil {
            errc <- err
            return
        }
        select {
        case out <- Result{Data: resp}:
        case <-ctx.Done():
            errc <- ctx.Err()
            return
        }
    }
}()

This works. But if you're not careful, it's easy to introduce bugs:

  • Double-close panics: closing a channel twice crashes the program
  • Goroutine leaks: the writer forgets to close, and readers block forever
  • Lost values: a reader that bails out on ctx.Done() or a shutdown signal can leave buffered values unread
  • Zombie producers: the writer keeps running after the consumer has already failed

The core problem is that Go channels are low-level primitives. They're powerful, but they leave all the hard parts entirely to the developer.


The Inspiration: io.Pipe

Before reaching for a channel, consider io.Pipe. It has an elegant design that solves many of the problems above.

io.Pipe is purpose-built for streaming bytes between goroutines — reading a file, proxying an HTTP body, piping command output. It handles backpressure, idempotent close, and error propagation cleanly:

pr, pw := io.Pipe()

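go func() {
    // CloseWithError(nil) behaves like Close: the reader sees io.EOF on
    // success and the copy error otherwise.
    _, err := io.Copy(pw, file) // stream file bytes to the reader
    pw.CloseWithError(err)
}()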

io.Copy(dst, pr) // consume on the other end

But it only works with []byte. The moment your data is a struct, you're out of luck — io.Pipe can't help you there.

That's exactly the gap go-typedpipe fills — taking the same design philosophy as io.Pipe and making it work for any type T.


Enter go-typedpipe

go-typedpipe is io.Pipe, but for any type T.

w, r := typedpipe.New[Event]()

go func() {
    defer w.Close()
    for _, event := range events {
        if err := w.Write(ctx, event); err != nil {
            return
        }
    }
}()

err := r.ReadAll(ctx, func(event Event) error {
    return process(event)
})

No separate error channel. No manual select on every send. No serialization. The same guarantees as io.Pipe — plus buffering, context-awareness, and a drain guarantee.

What it adds over a raw channel

| | chan T | go-typedpipe |
|---|---|---|
| Context-aware blocking | Manual select on every send/receive | Built into Write and Read |
| Close error propagation | Not supported | CloseWithError propagates to all consumers |
| Safe concurrent close | Panics on double-close | Idempotent, safe to call multiple times |
| Drain guarantee | Buffered values can be missed if the reader exits on a shutdown signal | All buffered values remain readable after close |
| Consumer loop | Boilerplate for range or select | ReadAll encapsulates the loop |

How It Works

Understanding the internals will make you a better Go developer, not just a better user of this library. Let's walk through the key decisions one by one.

Under the hood, the pipe holds two channels:

type pipe[T any] struct {
    ch   chan T        // carries values from writers to readers
    done chan struct{} // closed once on shutdown
    once sync.Once
    err  pipeError
}

Why ch is never closed

ch is never closed. This is the most important design decision in the library. In Go, sending to a closed channel causes an immediate panic — and there's no way to recover from it gracefully. Here's what that looks like:

ch := make(chan int, 1)
close(ch)
ch <- 1 // panic: send on closed channel

In a concurrent setting with multiple writers, this becomes a real danger. Two goroutines racing to close the same channel will also panic:

go func() { close(ch) }() // goroutine 1
go func() { close(ch) }() // goroutine 2: whichever close runs second panics with "close of closed channel"

go-typedpipe sidesteps the problem entirely by never closing ch. Instead, shutdown is signaled by closing done, a separate chan struct{} that carries no data, only a signal. Writers and readers select on done to know when to stop. ch stays open for the lifetime of the pipe, so there's no window for a send-on-closed panic, no matter how many goroutines are writing concurrently.
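
To make the shutdown signal concrete, here is a minimal sketch of the done-plus-sync.Once pattern in isolation. The closer type and the closeConcurrently helper are hypothetical names made up for illustration, not the library's code. The only point is that wrapping close(done) in once.Do makes Close safe to call from many goroutines.

```go
package main

import (
	"fmt"
	"sync"
)

// closer is a hypothetical stand-in for the shutdown half of the pipe.
type closer struct {
	done chan struct{}
	once sync.Once
}

func newCloser() *closer {
	return &closer{done: make(chan struct{})}
}

// Close may be called from any number of goroutines: sync.Once makes sure
// close(done) runs exactly once, so later calls are no-ops instead of panics.
func (c *closer) Close() {
	c.once.Do(func() { close(c.done) })
}

// closed reports, without blocking, whether Close has been called.
func (c *closer) closed() bool {
	select {
	case <-c.done:
		return true
	default:
		return false
	}
}

// closeConcurrently calls Close from n goroutines at once and reports
// whether the closer ended up closed.
func closeConcurrently(n int) bool {
	c := newCloser()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Close()
		}()
	}
	wg.Wait()
	return c.closed()
}

func main() {
	fmt.Println(closeConcurrently(100)) // true
}
```

Doing the same thing with a bare close(ch) in each goroutine would panic as soon as the second one ran.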

You might wonder: is it safe to never close ch? Yes. A channel is an ordinary garbage-collected object: as long as something in your code still holds a reference to it, the GC leaves it alone, and once nothing does, the GC reclaims it with no manual cleanup. So when the pipe goes out of scope and no goroutines hold references to it, ch is collected along with the rest of the struct. You can read more about how Go's GC works in the official guide. An unclosed channel is not a memory leak in itself; leaks come from goroutines left blocked on a channel. Skipping close only means receivers won't get the zero-value signal that closing normally provides, and in this library done takes over that responsibility entirely.

Why Write uses two selects

The double select in Write exists to give shutdown priority. To understand why, you need to know one thing about Go's select: when multiple cases are ready at the same time, Go picks one at random. That means in a single select, even if done is already closed, Go might still choose to send to ch:

// Dangerous — Go might pick p.ch <- v even after Close() is called
select {
case <-p.done:
    return
case p.ch <- v: // could still be picked randomly
    return
}

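The fix is a non-blocking pre-check before the main select. If the pipe is already closed, we return immediately without ever touching ch, so any Write that starts after Close has returned is guaranteed to fail; only a Close that lands between the two selects can still race with the send: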

// Priority check: bail immediately if already closed.
select {
case <-p.done:
    return p.err.Load()
case <-ctx.Done():
    return ctx.Err()
default: // not closed, continue
}

// Safe to proceed with the blocking send.
select {
case <-p.done:
    return p.err.Load()
case <-ctx.Done():
    return ctx.Err()
case p.ch <- v:
    return nil
}
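
The difference is easy to observe. The sketch below is a standalone experiment rather than library code: it closes done up front, leaves room in the buffer, and counts how many sends still get through with and without the pre-check. The names sendSingle, sendWithPrecheck, and countSends are made up for this example, and the ctx case is left out to keep it short.

```go
package main

import "fmt"

// sendSingle uses one select. When done is closed and ch has room, both
// cases are ready and Go picks between them pseudo-randomly.
func sendSingle(done <-chan struct{}, ch chan<- int, v int) bool {
	select {
	case <-done:
		return false
	case ch <- v:
		return true
	}
}

// sendWithPrecheck polls done first, so a call that starts after shutdown
// never reaches the send.
func sendWithPrecheck(done <-chan struct{}, ch chan<- int, v int) bool {
	select {
	case <-done:
		return false
	default: // not closed yet, fall through to the blocking select
	}
	select {
	case <-done:
		return false
	case ch <- v:
		return true
	}
}

// countSends closes done before sending anything, then calls send n times
// on a channel with room for every value and counts the successful sends.
func countSends(n int, send func(<-chan struct{}, chan<- int, int) bool) int {
	done := make(chan struct{})
	close(done)
	ch := make(chan int, n)
	sent := 0
	for i := 0; i < n; i++ {
		if send(done, ch, i) {
			sent++
		}
	}
	return sent
}

func main() {
	// The first count varies from run to run; it is typically around half.
	fmt.Println("single select, sends after close:", countSends(1000, sendSingle))
	fmt.Println("with pre-check, sends after close:", countSends(1000, sendWithPrecheck)) // 0
}
```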

How the drain guarantee works

The drain guarantee is handled in Read. When done fires, Read does one non-blocking attempt to pull a buffered value before returning the close error. This means values written before Close are never lost — readers will consume everything in the buffer before seeing the close error.
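
As a rough sketch of that drain step, assuming Read is shaped much like Write: the pipe type below is a pared-down model with only the two channels, errClosed is a stand-in for the library's ErrPipeClosed, and drain is a helper invented for the demo. The library's actual Read may differ in detail.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errClosed is a hypothetical stand-in for typedpipe.ErrPipeClosed.
var errClosed = errors.New("pipe closed")

// pipe is a pared-down model: just the data channel and the shutdown signal.
type pipe[T any] struct {
	ch   chan T
	done chan struct{}
}

// Read returns the next value. If shutdown has been signaled, it makes one
// non-blocking attempt to pull a buffered value before reporting the close.
func (p *pipe[T]) Read(ctx context.Context) (T, error) {
	var zero T
	select {
	case v := <-p.ch:
		return v, nil
	case <-ctx.Done():
		return zero, ctx.Err()
	case <-p.done:
		select {
		case v := <-p.ch:
			return v, nil // a value was still buffered: hand it out first
		default:
			return zero, errClosed // buffer is empty: now report the close
		}
	}
}

// drain buffers vals, signals shutdown, and then reads until the close error.
func drain(vals []int) ([]int, error) {
	p := &pipe[int]{ch: make(chan int, len(vals)), done: make(chan struct{})}
	for _, v := range vals {
		p.ch <- v
	}
	close(p.done) // ch itself stays open
	var got []int
	for {
		v, err := p.Read(context.Background())
		if err != nil {
			return got, err
		}
		got = append(got, v)
	}
}

func main() {
	got, err := drain([]int{1, 2, 3})
	fmt.Println(got, err) // [1 2 3] pipe closed
}
```

Whichever case the outer select picks, a buffered value is returned before the close error, so nothing written before shutdown is skipped.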

How errors are propagated safely

Errors are stored atomically so that concurrent goroutines can safely read and write the close error without data races. The first error set always wins — subsequent calls to CloseWithError are silently ignored.
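
One way to get first-error-wins semantics is a compare-and-swap on an atomic pointer. The article doesn't show what pipeError looks like inside, so the firstErr type below is an assumption rather than the library's implementation; note that atomic.Pointer and atomic.Int32 need Go 1.19 or later. raceSet is a helper invented for the demo.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// firstErr is a hypothetical first-write-wins error holder.
type firstErr struct {
	p atomic.Pointer[error]
}

// Set stores err only if nothing has been stored yet and reports whether
// this call was the one that won.
func (f *firstErr) Set(err error) bool {
	return f.p.CompareAndSwap(nil, &err)
}

// Load returns the stored error, or nil if none has been set.
func (f *firstErr) Load() error {
	if e := f.p.Load(); e != nil {
		return *e
	}
	return nil
}

// raceSet has n goroutines try to set a distinct error at the same time and
// returns how many of them won, together with the error that stuck.
func raceSet(n int) (int, error) {
	var f firstErr
	var won atomic.Int32
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if f.Set(fmt.Errorf("error from goroutine %d", i)) {
				won.Add(1)
			}
		}(i)
	}
	wg.Wait()
	return int(won.Load()), f.Load()
}

func main() {
	var f firstErr
	fmt.Println(f.Set(errors.New("first")))  // true
	fmt.Println(f.Set(errors.New("second"))) // false
	fmt.Println(f.Load())                    // first
}
```

A plain atomic.Value would be a riskier choice here, because it panics when values of different concrete types are stored, and error values usually have different concrete types.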


Real Examples

Using ReadAll

ReadAll is the right choice for the consume-all case. It handles ErrPipeClosed internally — returning nil on normal close — so callers only see real errors. It also closes the pipe automatically when it returns.

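package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "net/http"
    "sync"

    typedpipe "github.com/fikrimohammad/go-typedpipe/v2"
)

type Result struct {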
    URL        string
    StatusCode int
    Body       []byte
}

func scrape(ctx context.Context, urls []string, w typedpipe.Writer[Result]) {
    defer w.Close()
    var wg sync.WaitGroup
    for _, url := range urls {
        wg.Add(1)
        go func(url string) {
            defer wg.Done()
            req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
            if err != nil {
                w.CloseWithError(fmt.Errorf("build request %s: %w", url, err))
                return
            }
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                w.CloseWithError(fmt.Errorf("fetch %s: %w", url, err))
                return
            }
            defer resp.Body.Close()
            body, err := io.ReadAll(resp.Body)
            if err != nil {
                w.CloseWithError(fmt.Errorf("read body %s: %w", url, err))
                return
            }
            if err := w.Write(ctx, Result{
                URL:        url,
                StatusCode: resp.StatusCode,
                Body:       body,
            }); err != nil {
                return
            }
        }(url)
    }
    wg.Wait()
}

func main() {
    ctx := context.Background()
    urls := []string{
        "https://example.com",
        "https://example.org",
        "https://example.net",
    }

    w, r := typedpipe.New[Result](typedpipe.WithBufferSize(len(urls)))
    go scrape(ctx, urls, w)

    err := r.ReadAll(ctx, func(result Result) error {
        if err := saveToDatabase(result); err != nil {
            return fmt.Errorf("save %s: %w", result.URL, err)
        }
        log.Printf("saved %s (%d)", result.URL, result.StatusCode)
        return nil
    })
    if err != nil {
        log.Fatalf("scraper stopped: %v", err)
    }
}

Important: always use defer w.Close() in the writer goroutine. If the writer exits without closing (for example, when its context is canceled), readers block until their own context is canceled, which with context.Background() means forever. Since Close is idempotent, calling it after CloseWithError is always safe.

If saveToDatabase fails, ReadAll propagates that error back to the writer via CloseWithError — so the scraper stops fetching on its next Write call.
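
To see how a consumer-side failure can travel back to the producer, here is a deliberately small model of the pipe. It is a sketch under stated assumptions, not the library's source: the pipe type, errClosed, and failAt are invented for this example, and the context handling, the Write pre-check, and the drain step are all left out for brevity.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errClosed is a hypothetical stand-in for typedpipe.ErrPipeClosed.
var errClosed = errors.New("pipe closed")

// pipe is a simplified model of the design described in this article.
type pipe[T any] struct {
	ch   chan T
	done chan struct{}
	once sync.Once
	err  error // written once before done is closed, read only afterwards
}

// CloseWithError records the first error and signals shutdown exactly once.
func (p *pipe[T]) CloseWithError(err error) {
	p.once.Do(func() {
		if err == nil {
			err = errClosed
		}
		p.err = err
		close(p.done)
	})
}

// Write blocks until v is accepted or the pipe is closed.
func (p *pipe[T]) Write(v T) error {
	select {
	case <-p.done:
		return p.err
	case p.ch <- v:
		return nil
	}
}

// Read blocks until a value arrives or the pipe is closed.
func (p *pipe[T]) Read() (T, error) {
	select {
	case v := <-p.ch:
		return v, nil
	case <-p.done:
		var zero T
		return zero, p.err
	}
}

// ReadAll feeds every value to fn and closes the pipe on the way out, so a
// callback failure reaches the writer through a later Write call.
func (p *pipe[T]) ReadAll(fn func(T) error) error {
	for {
		v, err := p.Read()
		if err == nil {
			err = fn(v)
		}
		if err != nil {
			p.CloseWithError(err)
			if errors.Is(err, errClosed) {
				return nil // a normal close is not an error for the caller
			}
			return err
		}
	}
}

// failAt runs an endless writer against a consumer that fails on value n and
// returns the error seen by the consumer and the one seen by the writer.
func failAt(n int) (error, error) {
	p := &pipe[int]{ch: make(chan int, 4), done: make(chan struct{})}
	werr := make(chan error, 1)
	go func() {
		for i := 0; ; i++ {
			if err := p.Write(i); err != nil {
				werr <- err
				return
			}
		}
	}()
	cerr := p.ReadAll(func(v int) error {
		if v == n {
			return fmt.Errorf("cannot process %d", v)
		}
		return nil
	})
	return cerr, <-werr
}

func main() {
	c, w := failAt(3)
	fmt.Println(c)      // cannot process 3
	fmt.Println(c == w) // true
}
```

The writer never checks a separate error channel. It simply gets the consumer's error back from Write and stops.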

Using Read

Use Read when you need branching logic between reads — for example, routing results differently based on status code.

for {
    result, err := r.Read(ctx)
    if err != nil {
        if !errors.Is(err, typedpipe.ErrPipeClosed) {
            log.Fatalf("reader stopped: %v", err)
        }
        break
    }
    switch {
    case result.StatusCode == http.StatusOK:
        if err := saveToDatabase(result); err != nil {
            r.CloseWithError(fmt.Errorf("save %s: %w", result.URL, err))
            return
        }
        log.Printf("saved %s (%d)", result.URL, result.StatusCode)
    case result.StatusCode >= 500:
        log.Printf("server error %s (%d), retrying later", result.URL, result.StatusCode)
        scheduleRetry(result.URL)
    default:
        log.Printf("skipping %s (%d)", result.URL, result.StatusCode)
    }
}

When using Read directly, you're responsible for distinguishing ErrPipeClosed (normal exit) from a real error. ReadAll handles this internally — one reason to prefer it for the straightforward case.


Performance

The design decisions in "How It Works" aren't free — the double select, the done channel check, and the atomic error storage all add overhead over a raw channel. Here's what that costs in practice:

BenchmarkPipe_WriteRead/buffer_64-14      27531162    128.9 ns/op    0 B/op    0 allocs/op
BenchmarkPipe_WriteRead/buffer_1024-14    33059798    109.3 ns/op    0 B/op    0 allocs/op
BenchmarkPipe_ReadAll/buffer_64-14        27784093    130.7 ns/op    0 B/op    0 allocs/op
BenchmarkPipe_ReadAll/buffer_1024-14      33114181    108.6 ns/op    0 B/op    0 allocs/op

Zero heap allocations per operation — the pipe struct, channel, and synchronization primitives are allocated once at construction, not on every Write or Read. The extra nanoseconds come from the shutdown-priority select and context checks. That's the deliberate tradeoff: a small, predictable CPU cost in exchange for correctness guarantees that are very hard to get right with raw channels.


When Not to Use It

go-typedpipe is a synchronization primitive for in-process goroutine communication. It's not the right tool when:

  • You need to broadcast the same value to multiple consumers — each value is delivered to exactly one reader
  • You need durable messaging across process restarts
  • You need cross-process communication — use a message broker instead

Getting Started

go get github.com/fikrimohammad/go-typedpipe/v2

Requires Go 1.18 or later. Full documentation and source at github.com/fikrimohammad/go-typedpipe.

Channels are one of Go's best features — but like any primitive, they come with sharp edges when used in complex concurrent scenarios. go-typedpipe doesn't replace channels; it wraps them with the patterns you'd write anyway, so you can focus on your business logic instead of synchronization correctness. If it saves you from one subtle goroutine leak or one send-on-closed panic, it's done its job.
