## Background
Go channels are one of the best things about the language. But the moment you need context cancellation, error propagation, and safe concurrent shutdown all at once, a simple chan T starts asking you to write a lot of code just to use it correctly. A common pattern looks something like this:
```go
out := make(chan Result, len(urls))
errc := make(chan error, 1)

go func() {
	defer close(out)
	for _, url := range urls {
		select {
		case <-ctx.Done():
			errc <- ctx.Err()
			return
		default:
		}

		resp, err := fetch(ctx, url)
		if err != nil {
			errc <- err
			return
		}

		select {
		case out <- Result{Data: resp}:
		case <-ctx.Done():
			errc <- ctx.Err()
			return
		}
	}
}()
```
This works. But if you're not careful, it's easy to introduce bugs:
- Double-close panics — closing a channel twice crashes the program
- Goroutine leaks — the writer forgets to close, readers block forever
- Lost values — values buffered before close are never read
- Zombie producers — the writer keeps running after the consumer has already failed
The core problem is that Go channels are low-level primitives. They're powerful, but they leave all the hard parts entirely to the developer.
## The Inspiration: io.Pipe
Before reaching for a channel, consider io.Pipe. It has an elegant design that solves many of the problems above.
io.Pipe is purpose-built for streaming bytes between goroutines — reading a file, proxying an HTTP body, piping command output. It handles backpressure, idempotent close, and error propagation cleanly:
```go
pr, pw := io.Pipe()

go func() {
	defer pw.Close()
	io.Copy(pw, file) // stream file bytes to the reader
}()

io.Copy(dst, pr) // consume on the other end
```
But it only works with `[]byte`. The moment your data is a struct, io.Pipe can't help you unless you serialize it first.
That's exactly the gap go-typedpipe fills — taking the same design philosophy as io.Pipe and making it work for any type T.
## Enter go-typedpipe
go-typedpipe is io.Pipe, but for any type T.
```go
w, r := typedpipe.New[Event]()

go func() {
	defer w.Close()
	for _, event := range events {
		if err := w.Write(ctx, event); err != nil {
			return
		}
	}
}()

err := r.ReadAll(ctx, func(event Event) error {
	return process(event)
})
```
No separate error channel. No manual select on every send. No serialization. The same guarantees as io.Pipe — plus buffering, context-awareness, and a drain guarantee.
## What it adds over a raw channel

|  | `chan T` | go-typedpipe |
|---|---|---|
| Context-aware blocking | Manual `select` on every send/receive | Built into `Write` and `Read` |
| Close error propagation | Not supported | `CloseWithError` propagates to all consumers |
| Safe concurrent close | Panics on double-close | Idempotent, safe to call multiple times |
| Drain guarantee | Values may be lost after close | All buffered values remain readable after close |
| Consumer loop | Boilerplate `for range` or `select` | `ReadAll` encapsulates the loop |
## How It Works
Understanding the internals will make you a better Go developer, not just a better user of this library. Let's walk through the key decisions one by one.
Under the hood, the pipe holds two channels:
```go
type pipe[T any] struct {
	ch   chan T        // carries values from writers to readers
	done chan struct{} // closed once on shutdown
	once sync.Once
	err  pipeError
}
```
### Why `ch` is never closed
ch is never closed. This is the most important design decision in the library. In Go, sending to a closed channel causes an immediate panic — and there's no way to recover from it gracefully. Here's what that looks like:
```go
ch := make(chan int, 1)
close(ch)
ch <- 1 // panic: send on closed channel
```
In a concurrent setting with multiple writers, this becomes a real danger. Two goroutines racing to close the same channel will also panic:
```go
go func() { close(ch) }() // goroutine 1
go func() { close(ch) }() // goroutine 2 — panic: close of closed channel
```
go-typedpipe sidesteps the problem entirely by never closing ch. Instead, shutdown is signaled by closing done — a separate zero-value channel that carries no data, only a signal. Writers and readers select on done to know when to stop. ch stays open for the lifetime of the pipe, so there's no window for a send-on-closed panic, no matter how many goroutines are writing concurrently.
You might wonder: is it safe to never close `ch`? Yes — Go has a garbage collector that automatically reclaims memory the program no longer uses. As long as something in your code still holds a reference to an object, the GC leaves it alone; once nothing references it anymore, the GC reclaims the memory, no manual cleanup needed. This applies to channels too. Once the pipe goes out of scope and no goroutines hold references to it, the GC cleans up `ch` along with the rest of the struct. You can read more about how Go's GC works in the official guide.

Not closing a channel never causes a memory leak — it only means receivers won't get the zero-value signal that closing normally provides. In this library, `done` takes over that responsibility entirely.
### Why `Write` uses two selects
The double select in Write exists to give shutdown priority. To understand why, you need to know one thing about Go's select: when multiple cases are ready at the same time, Go picks one at random. That means in a single select, even if done is already closed, Go might still choose to send to ch:
```go
// Dangerous — Go might pick p.ch <- v even after Close() is called
select {
case <-p.done:
	return
case p.ch <- v: // could still be picked randomly
	return
}
```
The fix is a non-blocking pre-check before the main select. If the pipe is already closed, we return immediately without ever touching ch:
```go
// Priority check: bail immediately if already closed.
select {
case <-p.done:
	return p.err.Load()
case <-ctx.Done():
	return ctx.Err()
default: // not closed, continue
}

// Safe to proceed with the blocking send.
select {
case <-p.done:
	return p.err.Load()
case <-ctx.Done():
	return ctx.Err()
case p.ch <- v:
	return nil
}
```
### How the drain guarantee works
The drain guarantee is handled in Read. When done fires, Read does one non-blocking attempt to pull a buffered value before returning the close error. This means values written before Close are never lost — readers will consume everything in the buffer before seeing the close error.
### How errors are propagated safely
Errors are stored atomically so that concurrent goroutines can safely read and write the close error without data races. The first error set always wins — subsequent calls to CloseWithError are silently ignored.
## Real Examples
### Using ReadAll
ReadAll is the right choice for the consume-all case. It handles ErrPipeClosed internally — returning nil on normal close — so callers only see real errors. It also closes the pipe automatically when it returns.
```go
type Result struct {
	URL        string
	StatusCode int
	Body       []byte
}

func scrape(ctx context.Context, urls []string, w typedpipe.Writer[Result]) {
	defer w.Close()

	var wg sync.WaitGroup
	for _, url := range urls {
		wg.Add(1)
		go func(url string) {
			defer wg.Done()

			req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
			if err != nil {
				w.CloseWithError(fmt.Errorf("build request %s: %w", url, err))
				return
			}

			resp, err := http.DefaultClient.Do(req)
			if err != nil {
				w.CloseWithError(fmt.Errorf("fetch %s: %w", url, err))
				return
			}
			defer resp.Body.Close()

			body, _ := io.ReadAll(resp.Body)
			if err := w.Write(ctx, Result{
				URL:        url,
				StatusCode: resp.StatusCode,
				Body:       body,
			}); err != nil {
				return
			}
		}(url)
	}
	wg.Wait()
}

func main() {
	ctx := context.Background()

	urls := []string{
		"https://example.com",
		"https://example.org",
		"https://example.net",
	}

	w, r := typedpipe.New[Result](typedpipe.WithBufferSize(len(urls)))
	go scrape(ctx, urls, w)

	err := r.ReadAll(ctx, func(result Result) error {
		if err := saveToDatabase(result); err != nil {
			return fmt.Errorf("save %s: %w", result.URL, err)
		}
		log.Printf("saved %s (%d)", result.URL, result.StatusCode)
		return nil
	})
	if err != nil {
		log.Fatal("scraper stopped:", err)
	}
}
```
Important: always use `defer w.Close()` in the writer goroutine. If the writer exits without closing — for example when its context is canceled — readers will block forever. Since `Close` is idempotent, calling it after `CloseWithError` is always safe.
If saveToDatabase fails, ReadAll propagates that error back to the writer via CloseWithError — so the scraper stops fetching on its next Write call.
### Using Read
Use Read when you need branching logic between reads — for example, routing results differently based on status code.
```go
for {
	result, err := r.Read(ctx)
	if err != nil {
		if !errors.Is(err, typedpipe.ErrPipeClosed) {
			log.Fatal("reader stopped:", err)
		}
		break
	}

	switch {
	case result.StatusCode == http.StatusOK:
		if err := saveToDatabase(result); err != nil {
			r.CloseWithError(fmt.Errorf("save %s: %w", result.URL, err))
			return
		}
		log.Printf("saved %s (%d)", result.URL, result.StatusCode)
	case result.StatusCode >= 500:
		log.Printf("server error %s (%d), retrying later", result.URL, result.StatusCode)
		scheduleRetry(result.URL)
	default:
		log.Printf("skipping %s (%d)", result.URL, result.StatusCode)
	}
}
```
When using Read directly, you're responsible for distinguishing ErrPipeClosed (normal exit) from a real error. ReadAll handles this internally — one reason to prefer it for the straightforward case.
## Performance
The design decisions in "How It Works" aren't free — the double select, the done channel check, and the atomic error storage all add overhead over a raw channel. Here's what that costs in practice:
```
BenchmarkPipe_WriteRead/buffer_64-14      27531162    128.9 ns/op    0 B/op    0 allocs/op
BenchmarkPipe_WriteRead/buffer_1024-14    33059798    109.3 ns/op    0 B/op    0 allocs/op
BenchmarkPipe_ReadAll/buffer_64-14        27784093    130.7 ns/op    0 B/op    0 allocs/op
BenchmarkPipe_ReadAll/buffer_1024-14      33114181    108.6 ns/op    0 B/op    0 allocs/op
```
Zero heap allocations per operation — the pipe struct, channel, and synchronization primitives are allocated once at construction, not on every Write or Read. The extra nanoseconds come from the shutdown-priority select and context checks. That's the deliberate tradeoff: a small, predictable CPU cost in exchange for correctness guarantees that are very hard to get right with raw channels.
## When Not to Use It
go-typedpipe is a synchronization primitive for in-process goroutine communication. It's not the right tool when:
- You need to broadcast the same value to multiple consumers — each value is delivered to exactly one reader
- You need durable messaging across process restarts
- You need cross-process communication — use a message broker instead
## Getting Started

```sh
go get github.com/fikrimohammad/go-typedpipe/v2
```
Requires Go 1.18 or later. Full documentation and source at github.com/fikrimohammad/go-typedpipe.
Channels are one of Go's best features — but like any primitive, they come with sharp edges when used in complex concurrent scenarios. go-typedpipe doesn't replace channels; it wraps them with the patterns you'd write anyway, so you can focus on your business logic instead of synchronization correctness. If it saves you from one subtle goroutine leak or one send-on-closed panic, it's done its job.