Pavel Sanikovich

I Built chanprobe Because My Go Queues Were Invisible

I like Go channels.

They are one of those language features that feel simple in the best possible way.

You can write something like this:

jobs := make(chan Job, 1024)

go func() {
    for job := range jobs {
        process(job)
    }
}()

And for a lot of cases, that is enough.

Clean, readable, idiomatic.

But after using channels in real services, I kept running into the same uncomfortable problem:

once a channel becomes part of your production pipeline, it also becomes a place where latency can hide.

And native channels do not tell you much.

You can check:

len(jobs)
cap(jobs)

But that is basically it.

That tells you how many items are buffered right now, but it does not answer the questions I usually care about when something is slow.

For example:

Is the producer blocked?
Is the consumer too slow?
How long does an item wait before processing?
Did we drop anything?
When did backpressure start?
Which internal queue is causing the delay?

That is why I started building chanprobe.

Repository:

github.com/devflex-pro/chanprobe

The kind of problem I wanted to solve

Imagine a service that delivers webhooks.

The flow is simple:

HTTP request -> validate -> enrich -> queue -> deliver to customer

Somewhere in the middle, there is usually a channel:

deliveryQueue := make(chan WebhookJob, 10_000)

This works fine until customers start saying:

“Sometimes webhooks arrive 30 seconds late.”

At that point, you start looking around.

CPU looks fine.

Memory looks fine.

The database is not obviously slow.

Logs do not show errors.

The service is not crashing.

But something is still wrong.

One possible explanation is that the delivery workers are slower than the producers. The queue starts filling up. Jobs spend more and more time waiting before a worker picks them up. Eventually, your latency is not in the database or in the network.

It is sitting inside an in-memory channel.

With a normal channel, I can inspect the current length:

fmt.Println(len(deliveryQueue))

But that does not tell me how long the oldest job has been waiting.

And for production debugging, this difference matters a lot.

This is useful:

queue length: 8241 / 10000

But this is much more useful:

oldest item age: 37s

Because now I know that at least one job has already waited 37 seconds before processing.

That is not just a metric.

That is an explanation.

What I wanted the API to feel like

I did not want to build a huge framework.

I also did not want to replace every channel in a Go codebase.

I wanted something explicit that I could use at important async boundaries.

Something like this:

jobs := chanprobe.New[Job]("webhook_delivery", 10_000)

if err := jobs.Send(ctx, job); err != nil {
    return err
}

job, ok := jobs.Recv(ctx)
if !ok {
    return
}

process(job)

The queue has a name, because names matter in observability.

I do not want to know that “some goroutine is blocked on channel send”.

I want to know that webhook_delivery is full, or that email_sender is dropping work, or that image_resize has items waiting for 12 seconds.

Basic usage

Here is a small example:

package main

import (
    "context"
    "fmt"

    "github.com/devflex-pro/chanprobe"
)

func main() {
    ctx := context.Background()

    jobs := chanprobe.New[string]("jobs", 1024)
    defer jobs.Close()

    if err := jobs.Send(ctx, "hello"); err != nil {
        panic(err)
    }

    job, ok := jobs.Recv(ctx)
    if !ok {
        return
    }

    fmt.Println("processed:", job)
}

This is intentionally boring.

The interesting part is not that it can send and receive values.

Channels already do that.

The interesting part is that the queue can describe what is happening inside it.

snapshot := jobs.Snapshot()

fmt.Printf("name: %s\n", snapshot.Name)
fmt.Printf("len: %d\n", snapshot.Len)
fmt.Printf("cap: %d\n", snapshot.Cap)
fmt.Printf("sent: %d\n", snapshot.SentTotal)
fmt.Printf("received: %d\n", snapshot.ReceivedTotal)
fmt.Printf("dropped: %d\n", snapshot.DroppedTotal)
fmt.Printf("oldest item age: %s\n", snapshot.OldestItemAge)

In a real service, this gives me a much better starting point during debugging.

Instead of guessing where latency lives, I can ask the queue directly.

Context-aware send and receive

One thing I wanted from the beginning was context support.

With a native channel send, this can block forever:

jobs <- job

Of course, you can write a select manually:

select {
case jobs <- job:
    return nil
case <-ctx.Done():
    return ctx.Err()
}

That is fine, but if every important queue needs the same behavior, I prefer to make it part of the abstraction.

With chanprobe:

if err := jobs.Send(ctx, job); err != nil {
    return err
}

And receiving is similar:

job, ok := jobs.Recv(ctx)
if !ok {
    return
}

For me, this is less about saving a few lines of code and more about making queue behavior consistent across the project.

Drop policies

Not every queue should block forever when it is full.

Sometimes blocking is correct.

For example, if every job must be processed, backpressure should probably propagate to the producer.

Sometimes dropping the newest item is correct.

For example, if the system is overloaded and new work can be rejected.

Sometimes dropping the oldest item is correct.

For example, if you only care about the latest state and old queued values are already stale.

So chanprobe supports different policies.

The default policy is blocking:

jobs := chanprobe.New[Job]("jobs", 1024)

You can also choose DropNewest:

jobs := chanprobe.New[Job](
    "jobs",
    1024,
    chanprobe.WithDropPolicy(chanprobe.DropNewest),
)

Or DropOldest:

jobs := chanprobe.New[Job](
    "latest_events",
    1024,
    chanprobe.WithDropPolicy(chanprobe.DropOldest),
)

The point is not that one policy is better than another.

The point is that queue behavior should be intentional.

If work can be dropped, I want that to be visible.

If producers are blocked, I want that to be visible too.
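
To show what the two drop policies mean in practice, here is a rough sketch of both on a plain buffered channel. It is not chanprobe's implementation. The helper names are hypothetical, the channel is assumed to have capacity greater than zero, and the drop-oldest variant assumes a single producer.

```go
package main

import "fmt"

// trySendDropNewest attempts a non-blocking send.
// It reports false when the buffer is full and v was rejected.
func trySendDropNewest[T any](ch chan T, v T) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// sendDropOldest makes room by discarding the oldest buffered item when the
// buffer is full, and returns how many items were evicted.
// Assumes cap(ch) > 0 and a single producer: with concurrent producers the
// evict-then-send steps are not atomic.
func sendDropOldest[T any](ch chan T, v T) (evicted int) {
	for {
		select {
		case ch <- v:
			return evicted
		default:
		}
		select {
		case <-ch:
			evicted++
		default:
			// A consumer drained the buffer in the meantime; retry the send.
		}
	}
}

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2

	fmt.Println(trySendDropNewest(ch, 3)) // false: 3 is rejected
	fmt.Println(sendDropOldest(ch, 4))    // 1: evicts the value 1
	fmt.Println(<-ch, <-ch)               // 2 4
}
```

Both helpers return enough information for the caller to count drops, which is the part that makes the policy visible rather than silent.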

What the queue can tell you

A snapshot contains things like:

type Snapshot struct {
    Name              string
    Len               int
    Cap               int
    Closed            bool

    SentTotal         uint64
    ReceivedTotal     uint64
    DroppedTotal      uint64

    SendBlockedTotal  uint64
    RecvBlockedTotal  uint64

    SendWaitTotal     time.Duration
    RecvWaitTotal     time.Duration
    ItemWaitTotal     time.Duration

    OldestItemAge     time.Duration
}

The fields I personally care about most are usually not Len and Cap.

They are useful, but they are not enough.

The more interesting fields are:

snapshot.OldestItemAge
snapshot.DroppedTotal
snapshot.SendBlockedTotal
snapshot.SendWaitTotal
snapshot.ItemWaitTotal

Because they explain behavior.

If DroppedTotal is growing, the system is losing work.

If SendBlockedTotal is growing, producers are being slowed down.

If OldestItemAge is high, queue latency is becoming part of user-visible latency.

That is the signal I wanted.

Debugging with expvar

I wanted the core package to stay lightweight.

I did not want to force Prometheus, OpenTelemetry, or any other dependency on users.

So the first built-in exporter is based on expvar.

Example:

package main

import (
    "log"
    "net/http"

    "github.com/devflex-pro/chanprobe"
)

func main() {
    chanprobe.PublishExpvar("chanprobe", nil)

    // ListenAndServe only returns on failure, so surface the error.
    log.Fatal(http.ListenAndServe(":8080", nil))
}

Then you can inspect:

curl http://localhost:8080/debug/vars

This is not meant to be the final observability story for every production system.

It is just a simple way to expose what the queues know.

Prometheus and OpenTelemetry exporters can live separately without making the core package heavier.

Why not just use pprof or runtime/trace?

I use those tools too.

They are extremely useful.

But I see them as solving a slightly different problem.

pprof and runtime/trace help me understand what the Go runtime is doing: where CPU time goes, which goroutines are blocked, and how they are scheduled.

chanprobe is more application-level.

It is not trying to tell me only that goroutines are blocked.

It is trying to tell me which named queue is responsible.

There is a big practical difference between these two statements:

some goroutines are blocked on channel send

and:

webhook_delivery is 98% full and the oldest item has been waiting for 37s

The second one is much closer to the way I debug real services.

What I intentionally did not build

I deliberately avoided the “clever” version of this project.

There is no unsafe.

There is no runtime monkey-patching.

There is no attempt to resize Go channels magically.

There is no global goroutine scanning.

There is no promise that this is faster than channels.

To be clear: this adds instrumentation, so it has overhead compared with a native channel.

That is why I would not use it everywhere.

I would use it only where queue visibility is worth the cost.

For small internal coordination channels, native Go channels are still perfect.

For important queues in production pipelines, I want more information.

What I learned while building it

The hard part was not making a queue.

It was deciding which behavior should be explicit.

What should happen when the queue is full?

Should send block or fail?

What should happen after close?

Should existing items still be receivable?

What exactly counts as a dropped item?

Which metrics are actually useful, and which ones are just noise?

I also realized that a queue is not just an implementation detail.

In many services, it is part of the system’s behavior.

It can hide latency.

It can create backpressure.

It can drop work.

It can make producers slow.

It can make consumers look fine while users are waiting.

If a queue can affect production behavior, I think it deserves observability.

Current status

chanprobe currently has:

generic bounded queues
context-aware Send and Recv
non-blocking TrySend and TryRecv
drop policies
snapshots
registry
expvar support
examples
tests
benchmarks

The repository is here:

github.com/devflex-pro/chanprobe

It is still small, but already useful enough to try in real Go services.

My next ideas are a Prometheus exporter, better examples, and maybe more detailed latency metrics without making the core package too heavy.

If you have Go services with worker pools, event pipelines, background jobs, or internal queues, I would be curious to hear if this kind of visibility would help you debug production issues faster.
