DEV Community

Cover image for go-pubsub: Lightweight Pub/Sub for Go.
F2077
F2077

Posted on

go-pubsub: Lightweight Pub/Sub for Go.

go-pubsub - A Lightweight Pub-Sub Library for Golang

Hey everyone, I've been working on a Golang library called go-pubsub. It's a lightweight publish-subscribe tool designed for scenarios where you need to handle transient data flows efficiently. Think live dashboards, game events, short-lived alerts, or real-time streaming-media packet fan-out. The library is built with a fire-and-forget approach: no persistence, no delivery guarantees—just ultra-fast, one-way messaging.

Why I Built This

I created go-pubsub while working on a Golang-based streaming media protocol conversion gateway. One of the core features of this gateway was real-time media stream fan-out, where a single input stream needed to be distributed to multiple output streams. This required an efficient Pub-Sub mechanism.

Initially, I used Redis's Pub-Sub to implement this functionality, but that made my application dependent on an external service, which I wanted to avoid for a self-contained solution. So, I decided to roll my own lightweight Pub-Sub library, and that's how go-pubsub came to be—a simple, dependency-free solution focused on real-time, low-latency scenarios.

Quick Start

package main

import (
    "fmt"
    "github.com/F2077/go-pubsub/pubsub"
    "time"
)

func main() {
    // 1. Create a broker (supports generics)
    broker, _ := pubsub.NewBroker[string]()

    // 2. Create a publisher
    publisher := pubsub.NewPublisher[string](broker)

    // 3. Create a subscriber
    subscriber := pubsub.NewSubscriber[string](broker)

    // 4. Subscribe to a topic with buffer size and timeout
    sub, _ := subscriber.Subscribe("alerts",
        pubsub.WithChannelSize[string](pubsub.Medium), // Buffer 100 messages
        pubsub.WithTimeout[string](5*time.Second),     // Auto-close if idle
    )
    defer func(sub *pubsub.Subscription[string]) {
        _ = sub.Close()
    }(sub)

    // 5. Publish a message
    go func() {
        _ = publisher.Publish("alerts", "CPU over 90%!")
    }()

    // 6. Listen for messages or timeouts
    select {
    case msg := <-sub.Ch:
        fmt.Println("Received:", msg) // Output: "CPU over 90%!"
    case err := <-sub.ErrCh:
        fmt.Println("Error:", err)
    }
}
Enter fullscreen mode Exit fullscreen mode

Please try it out and share your thoughts - feedback, ideas, or questions are all welcome!

Top comments (1)

Collapse
 
ryansgi profile image
Ryan B

While I commend your efforts - pub/sub is easy to just drop a fire & forget solution like redis - I think there's room for improvement. I haven't done a deep dive, but one thing is your BenchmarkMultiPublisher path. The allocs look very high; once you add concurrency it looks like per publish slice copies, and what I'm seeing may be timer churn on the sub timeout feature.

  • Switch subscriber list to copy on write (COW) & atomics = no allocs on publish
  • Make sure channels are typed (chan T), not chan any to avoid boxing allocs
  • Use sync.Pool for short-lived envelopes if you're wrapping messages
  • Fix timeout handling - one Timer per sub or a global sweeper goroutine
  • Nuke logging from hot path (varargs alloc like you wouldn't believe)
  • Only allocate ErrCh if needed
  • Prefer map + RWMutex over sync.Map - this has bitten me before, you'll have less hidden allocs
  • Watch for closures in publish loop because they can escape

Otherwise, I dig it! Good looking project!