I created a basic pub-sub model in Go.
Please go through it and let me know about any mistakes or suggestions.
package main
import (
"context"
"fmt"
"math/rand"
"time"
)
// Subscribers maps each registered subscriber name to the channel its
// messages are delivered on. The map is allocated empty when the package is
// initialised, so it can be written to without any further setup.
var Subscribers = make(map[string]chan interface{})
// SubscriberClient is the handle RegisterClient returns to a subscriber.
type SubscriberClient struct {
// Name is the name the client was registered under.
Name string
// Ch is the channel the client receives published values on.
Ch chan interface{}
}
// main wires up two subscribers and one publisher, gives each its own
// timeout, and keeps the process alive long enough for all of them to exit.
func main() {
	// Register the clients before any goroutine is started, so the Subscribers
	// map is fully populated before the publisher begins ranging over it.
	subscriberOne := RegisterClient("one")
	subscriberTwo := RegisterClient("two")

	// Every timeout is derived from the same root context. Deriving each one
	// from the previous timeout context would silently cap it at the shortest
	// deadline in the chain, which is easy to get wrong when tuning the values.
	root := context.Background()

	// Subscriber one is told to exit after 12 seconds.
	ctxOne, cancelOne := context.WithTimeout(root, 12*time.Second)
	defer cancelOne()
	go subscriberOne.sub(ctxOne)

	// Subscriber two is told to exit after 8 seconds.
	ctxTwo, cancelTwo := context.WithTimeout(root, 8*time.Second)
	defer cancelTwo()
	go subscriberTwo.sub(ctxTwo)

	// The publisher is told to exit after 6 seconds, i.e. before either
	// subscriber stops reading.
	ctxPub, cancelPub := context.WithTimeout(root, 6*time.Second)
	defer cancelPub()
	go pub(ctxPub)

	// Keep main alive for 13 seconds, one second longer than the longest
	// timeout above, so every goroutine gets to print its exit message.
	// A long-running program (for example an API server) would not need this
	// wait, because the publisher would simply run until the program exits.
	time.Sleep(13 * time.Second)
	fmt.Println("main exiting")
}
// RegisterClient creates a subscriber called name, records its channel in
// Subscribers under that name, and returns the client so the caller can read
// from the channel. Registering a name that already exists replaces the
// channel previously stored for it.
func RegisterClient(name string) *SubscriberClient {
	client := &SubscriberClient{
		Name: name,
		// Buffered so a send can complete without a receiver waiting, as
		// long as fewer than 4 values are queued; an unbuffered channel
		// would make every send synchronous.
		Ch: make(chan interface{}, 4),
	}

	// The map and the returned client share the same channel.
	Subscribers[name] = client.Ch
	return client
}
// pub publishes a random integer to every registered subscriber once every
// two seconds, and returns as soon as ctx is cancelled or its deadline passes.
func pub(ctx context.Context) {
	// Create a single ticker for the whole loop. Calling time.Tick inside the
	// select would allocate a brand-new ticker on every iteration, none of
	// which could ever be stopped.
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			// The caller decides when publishing stops.
			fmt.Println("publisher exiting")
			return
		case <-ticker.C:
			publishToAllConsumers(rand.Int())
		}
	}
}
// sub is the subscriber's receive loop: it prints every value that arrives on
// s.Ch and returns once ctx is done.
func (s *SubscriberClient) sub(ctx context.Context) {
	stop := ctx.Done()
	for {
		select {
		case msg := <-s.Ch:
			fmt.Println(msg, " value from ", s.Name, " subscriber")
		case <-stop:
			// The caller decides when the subscriber stops.
			fmt.Println(s.Name, "exiting")
			return
		}
	}
}
// publishToAllConsumers offers data to every channel registered in
// Subscribers; the cost is linear in the number of subscribers.
//
// The send is non-blocking: if a subscriber's buffer is full (for example
// because that subscriber has stopped reading), the value is dropped for that
// subscriber and a message is printed. A blocking send here would let a
// single stalled subscriber hold up delivery to all the others and would keep
// the publisher from ever noticing its own cancellation.
func publishToAllConsumers(data interface{}) {
	for name, ch := range Subscribers {
		fmt.Println("sending data to ", name)
		select {
		case ch <- data:
		default:
			fmt.Println("dropped data for ", name, ": subscriber buffer is full")
		}
	}
}
Please follow me on Medium for more interesting posts.
Top comments (0)