DEV Community

sai teja
sai teja

Posted on

Pub Sub model in golang

created a basic pub-sub model in GO
please go thru it and let me know if any mistakes/suggestions etc

package main

import (
    "context"
    "fmt"
    "math/rand"
    "time"
)

var Subscribers map[string]chan interface{}

// init empty subscribers
func init() {
    Subscribers = make(map[string]chan interface{})
}

type SubscriberClient struct {
    Name string
    Ch   chan interface{}
}

func main() {

    // register clients to subscribe
    subscriberOne := RegisterClient("one")
    subscriberTwo := RegisterClient("two")

    ctx := context.Background()

    // initiate subscribers 

    // set timeout to exit the subscriber 1
    ctx, cfunc := context.WithTimeout(ctx, time.Second*12)

    // it will send the signal to subscriber to exit
    defer cfunc()
    go subscriberOne.sub(ctx)

    // set timeout to exit the subscriber 2
    ctx, cfunc2 := context.WithTimeout(ctx, time.Second*8)
    defer cfunc2()
    go subscriberTwo.sub(ctx)

    // initiate publisher

    // set timeout to exit the publisher
    ctx, cfunc3 := context.WithTimeout(ctx, time.Second*6)
    defer cfunc3()
    go pub(ctx)

    // i am just keeping ticker to send events only for 6 seconds after that it will exit the main
    // else based on use case publisher can handle this . if you are implementing this in
    // API's then no need for ticker because publisher will run until program exists
    select {
    case <-time.Tick(time.Second * 13):
        fmt.Println("main exiting")
        break
    }
}

// it will return subscriber client with name and channel to read
func RegisterClient(name string) *SubscriberClient {
    // for async we  use buffered channels
    // for sync we use unbuffered channels
    ch := make(chan interface{}, 4)

    // store it channel in our global producer list
    Subscribers[name] = ch

    // return the channel to subscriber so they can read from it
    return &SubscriberClient{Name: name, Ch: ch}
}

func pub(ctx context.Context) {
    for {
        select {
        //when to stop condition (publisher need to handle this ...)
        case <-ctx.Done():
            fmt.Println("publisher exiting")
            return

        // it will publish messages every 2 seconds
        // other way we can keep constraints when to publish
        case <-time.Tick(time.Second * 2):
            publishToAllConsumers(rand.Int())
        }
    }
}

// subscriber
func (s *SubscriberClient) sub(ctx context.Context) {
    for {
        select {
        //when to stop condition (subscriber need to handle this ...)
        case <-ctx.Done():
            fmt.Println(s.Name, "exiting")
            return
        case x := <-s.Ch:
            fmt.Println(x, " value from ", s.Name, " subscriber")
        }
    }
}

// it will publish all registered subscribers (implemented TC:-O(N)// we can enhance it further....)
func publishToAllConsumers(data interface{}) {
    for k, v := range Subscribers {
        fmt.Println("sending data to ", k)
        v <- data
    }
}

Enter fullscreen mode Exit fullscreen mode

please follow medium for interesting things

Sai Teja – Medium

Read writing from Sai Teja on Medium. Software Engineer | Fitness trainer | Random Thinker. Every day, Sai Teja and thousands of other voices read, write, and share important stories on Medium.

favicon medium.com

Top comments (0)