I like Go channels.
They are one of those language features that feel simple in the best possible way.
You can write something like this:
jobs := make(chan Job, 1024)

go func() {
	for job := range jobs {
		process(job)
	}
}()
And for a lot of cases, that is enough.
Clean, readable, idiomatic.
But after using channels in real services, I kept running into the same uncomfortable problem:
once a channel becomes part of your production pipeline, it also becomes a place where latency can hide.
And native channels do not tell you much.
You can check:
len(jobs)
cap(jobs)
But that is basically it.
That tells you how many items are buffered right now, but it does not answer the questions I usually care about when something is slow.
For example:
Is the producer blocked?
Is the consumer too slow?
How long does an item wait before processing?
Did we drop anything?
When did backpressure start?
Which internal queue is causing the delay?
That is why I started building chanprobe.
Repository:
github.com/devflex-pro/chanprobe
The kind of problem I wanted to solve
Imagine a service that delivers webhooks.
The flow is simple:
HTTP request -> validate -> enrich -> queue -> deliver to customer
Somewhere in the middle, there is usually a channel:
deliveryQueue := make(chan WebhookJob, 10_000)
This works fine until customers start saying:
“Sometimes webhooks arrive 30 seconds late.”
At that point, you start looking around.
CPU looks fine.
Memory looks fine.
The database is not obviously slow.
Logs do not show errors.
The service is not crashing.
But something is still wrong.
One possible explanation is that the delivery workers are slower than the producers. The queue starts filling up. Jobs spend more and more time waiting before a worker picks them up. Eventually, your latency is not in the database or in the network.
It is sitting inside an in-memory channel.
With a normal channel, I can inspect the current length:
fmt.Println(len(deliveryQueue))
But that does not tell me how long the oldest job has been waiting.
And for production debugging, this difference matters a lot.
This is useful:
queue length: 8241 / 10000
But this is much more useful:
oldest item age: 37s
Because now I know that at least one job has been waiting for 37 seconds and has not even been picked up yet.
That is not just a metric.
That is an explanation.
What I wanted the API to feel like
I did not want to build a huge framework.
I also did not want to replace every channel in a Go codebase.
I wanted something explicit that I could use at important async boundaries.
Something like this:
jobs := chanprobe.New[Job]("webhook_delivery", 10_000)

// Producer side:
if err := jobs.Send(ctx, job); err != nil {
	return err
}

// Consumer side (typically a different goroutine):
job, ok := jobs.Recv(ctx)
if !ok {
	return
}
process(job)
The queue has a name, because names matter in observability.
I do not want to know that “some goroutine is blocked on channel send”.
I want to know that webhook_delivery is full, or that email_sender is dropping work, or that image_resize has items waiting for 12 seconds.
Basic usage
Here is a small example:
package main

import (
	"context"
	"fmt"

	"github.com/devflex-pro/chanprobe"
)

func main() {
	ctx := context.Background()

	jobs := chanprobe.New[string]("jobs", 1024)
	defer jobs.Close()

	if err := jobs.Send(ctx, "hello"); err != nil {
		panic(err)
	}

	job, ok := jobs.Recv(ctx)
	if !ok {
		return
	}

	fmt.Println("processed:", job)
}
This is intentionally boring.
The interesting part is not that it can send and receive values.
Channels already do that.
The interesting part is that the queue can describe what is happening inside it.
snapshot := jobs.Snapshot()
fmt.Printf("name: %s\n", snapshot.Name)
fmt.Printf("len: %d\n", snapshot.Len)
fmt.Printf("cap: %d\n", snapshot.Cap)
fmt.Printf("sent: %d\n", snapshot.SentTotal)
fmt.Printf("received: %d\n", snapshot.ReceivedTotal)
fmt.Printf("dropped: %d\n", snapshot.DroppedTotal)
fmt.Printf("oldest item age: %s\n", snapshot.OldestItemAge)
In a real service, this gives me a much better starting point during debugging.
Instead of guessing where latency lives, I can ask the queue directly.
Context-aware send and receive
One thing I wanted from the beginning was context support.
With a native channel send, this can block forever:
jobs <- job
Of course, you can write a select manually:
select {
case jobs <- job:
	return nil
case <-ctx.Done():
	return ctx.Err()
}
That is fine, but if every important queue needs the same behavior, I prefer to make it part of the abstraction.
With chanprobe:
if err := jobs.Send(ctx, job); err != nil {
	return err
}
And receiving is similar:
job, ok := jobs.Recv(ctx)
if !ok {
	return
}
For me, this is less about saving a few lines of code and more about making queue behavior consistent across the project.
Drop policies
Not every queue should block forever when it is full.
Sometimes blocking is correct.
For example, if every job must be processed, backpressure should probably propagate to the producer.
Sometimes dropping the newest item is correct.
For example, an overloaded system may be better off rejecting new work.
Sometimes dropping the oldest item is correct.
For example, if you only care about the latest state, older queued values are already stale.
So chanprobe supports different policies.
The default policy is blocking:
jobs := chanprobe.New[Job]("jobs", 1024)
You can also choose DropNewest:
jobs := chanprobe.New[Job](
	"jobs",
	1024,
	chanprobe.WithDropPolicy(chanprobe.DropNewest),
)
Or DropOldest:
jobs := chanprobe.New[Job](
	"latest_events",
	1024,
	chanprobe.WithDropPolicy(chanprobe.DropOldest),
)
The point is not that one policy is better than another.
The point is that queue behavior should be intentional.
If work can be dropped, I want that to be visible.
If producers are blocked, I want that to be visible too.
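You can approximate both drop behaviors on a plain buffered channel with non-blocking selects. This also gives a natural place to count what was dropped. The following is a sketch with hypothetical helpers `sendDropNewest` and `sendDropOldest`. chanprobe's own `DropNewest` and `DropOldest` may differ in detail, for example in how they behave under concurrent producers.

```go
package main

import "fmt"

// sendDropNewest tries to enqueue v; if the buffer is full, v is discarded.
// It reports whether v was dropped.
func sendDropNewest[T any](ch chan T, v T) (dropped bool) {
	select {
	case ch <- v:
		return false
	default:
		return true
	}
}

// sendDropOldest enqueues v, evicting items from the head until v fits,
// and reports how many items it evicted. It assumes cap(ch) > 0: on an
// unbuffered channel with no waiting receiver it would spin forever.
// With concurrent consumers a consumer may free the slot first (zero
// evictions); with concurrent producers more than one item may be evicted.
func sendDropOldest[T any](ch chan T, v T) (evicted int) {
	for {
		select {
		case ch <- v:
			return evicted
		default:
		}
		// Buffer full: discard the oldest buffered item, then retry.
		select {
		case <-ch:
			evicted++
		default:
		}
	}
}

func main() {
	newest := make(chan int, 2)
	for i := 1; i <= 3; i++ {
		if sendDropNewest(newest, i) {
			fmt.Println("DropNewest discarded", i) // DropNewest discarded 3
		}
	}
	fmt.Println(<-newest, <-newest) // 1 2

	oldest := make(chan int, 2)
	for i := 1; i <= 3; i++ {
		sendDropOldest(oldest, i)
	}
	fmt.Println(<-oldest, <-oldest) // 2 3
}
```

Both helpers report the drop to the caller instead of hiding it. That return value is exactly what a `DroppedTotal` counter would be fed from.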
What the queue can tell you
A snapshot contains things like:
type Snapshot struct {
	Name             string
	Len              int
	Cap              int
	Closed           bool
	SentTotal        uint64
	ReceivedTotal    uint64
	DroppedTotal     uint64
	SendBlockedTotal uint64
	RecvBlockedTotal uint64
	SendWaitTotal    time.Duration
	RecvWaitTotal    time.Duration
	ItemWaitTotal    time.Duration
	OldestItemAge    time.Duration
}
The fields I personally care about most are usually not Len and Cap.
They are useful, but they are not enough.
The more interesting fields are:
snapshot.OldestItemAge
snapshot.DroppedTotal
snapshot.SendBlockedTotal
snapshot.SendWaitTotal
snapshot.ItemWaitTotal
Because they explain behavior.
If DroppedTotal is growing, the system is losing work.
If SendBlockedTotal is growing, producers are being slowed down.
If OldestItemAge is high, queue latency is becoming part of user-visible latency.
That is the signal I wanted.
Debugging with expvar
I wanted the core package to stay lightweight.
I did not want to force Prometheus, OpenTelemetry, or any other dependency on users.
So the first built-in exporter is based on expvar.
Example:
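package main

import (
	"log"
	"net/http"

	"github.com/devflex-pro/chanprobe"
)

func main() {
	chanprobe.PublishExpvar("chanprobe", nil)

	// /debug/vars is served by http.DefaultServeMux, where expvar registers itself.
	// log.Fatal surfaces startup errors such as the port already being in use.
	log.Fatal(http.ListenAndServe(":8080", nil))
}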
Then you can inspect:
curl http://localhost:8080/debug/vars
This is not meant to be the final observability story for every production system.
It is just a simple way to expose what the queues know.
Prometheus and OpenTelemetry exporters can live separately without making the core package heavier.
Why not just use pprof or runtime/trace?
I use those tools too.
They are extremely useful.
But I see them as solving a slightly different problem.
pprof and runtime/trace help me understand what the Go runtime is doing.
chanprobe is more application-level.
It is not trying to tell me only that goroutines are blocked.
It is trying to tell me which named queue is responsible.
There is a big practical difference between these two statements:
some goroutines are blocked on channel send
and:
webhook_delivery is 98% full and the oldest item has been waiting for 37s
The second one is much closer to the way I debug real services.
What I intentionally did not build
I deliberately avoided the “clever” version of this project.
There is no unsafe.
There is no runtime monkey-patching.
There is no attempt to resize Go channels magically.
There is no global goroutine scanning.
There is no promise that this is faster than channels.
Actually, it should be obvious: this adds instrumentation, so it has overhead.
That is why I would not use it everywhere.
I would use it only where queue visibility is worth the cost.
For small internal coordination channels, native Go channels are still perfect.
For important queues in production pipelines, I want more information.
What I learned while building it
The hardest part was not making a queue.
The hard part was deciding what behavior should be explicit.
What should happen when the queue is full?
Should send block or fail?
What should happen after close?
Should existing items still be receivable?
What exactly counts as a dropped item?
Which metrics are actually useful, and which ones are just noise?
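As a reference point for the close questions, native Go channels already answer them one way. After `close`, buffered items remain receivable, and a receive reports `ok == false` only once the buffer is drained. Sending on a closed channel panics. The following plain-channel demonstration uses a hypothetical helper, `sendPanics`, that only exists to observe that panic.

```go
package main

import "fmt"

// sendPanics reports whether sending v on ch panicked.
func sendPanics(ch chan int, v int) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	ch <- v
	return false
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	close(ch)

	// Buffered items survive close and are received in order.
	for {
		v, ok := <-ch
		fmt.Println(v, ok) // prints "1 true", then "2 true", then "0 false"
		if !ok {
			break
		}
	}

	fmt.Println("send after close panics:", sendPanics(ch, 3)) // send after close panics: true
}
```

A queue wrapper can keep these semantics or deliberately change them, for example by returning an error from send after close instead of panicking. Either way, the choice is part of the queue's contract.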
I also realized that a queue is not just an implementation detail.
In many services, it is part of the system’s behavior.
It can hide latency.
It can create backpressure.
It can drop work.
It can make producers slow.
It can make consumers look fine while users are waiting.
If a queue can affect production behavior, I think it deserves observability.
Current status
chanprobe currently has:
generic bounded queues
context-aware Send and Recv
non-blocking TrySend and TryRecv
drop policies
snapshots
registry
expvar support
examples
tests
benchmarks
The repository is here:
github.com/devflex-pro/chanprobe
It is still small, but already useful enough to try in real Go services.
My next ideas are a Prometheus exporter, better examples, and maybe more detailed latency metrics without making the core package too heavy.
If you have Go services with worker pools, event pipelines, background jobs, or internal queues, I would be curious to hear if this kind of visibility would help you debug production issues faster.