Recently a NATS user was looking for a generic way to be able to
increase the concurrency of the async NATS handler subscriptions which
by default process the messages sequentially in order. This means that
if a message takes very long to be processed, then the next message
will be processed with some delay even if the server has already
delivered it to the client:
nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatalf("Failed to create a connection: %v\n", err)
}
nc.Subscribe("foo", func(m *nats.Msg) {
log.Printf("[Received on '>' ] %+v", string(msg.Data))
time.Sleep(1 * time.Second) // Blocks each message for 1 second
})
An easy way to work around the head of line blocking, would be to
simply add a Goroutine:
nc.Subscribe("foo", func(m *nats.Msg) {
log.Printf("[Received on '>' ] %+v", string(msg.Data))
go func(){
// ... Processing ...
time.Sleep(1 * time.Second)
}()
})
This works for regular NATS connections and subscriptions so it is
often the approach I suggest. There is one exception to this though,
which is when using NATS encoded connections, the type of the function
can be of a multiple number of types, with the NATS client internally
handling the serialization/deserialization of the payload:
package main
import (
"log"
"time"
"github.com/nats-io/go-nats"
)
func init() {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
}
func main() {
nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatalf("Failed to create a connection: %v\n", err)
}
ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
log.Fatalf("Failed to create an encoded connection: %v\n", err)
}
defer ec.Close()
type Event struct {
Message string `json:"message"`
}
ec.Subscribe(">", func(b []byte) {
log.Printf("[Received on '>' ] %+v", string(b))
time.Sleep(1 * time.Second)
})
ec.Subscribe("foo", func(s string) {
log.Printf("[Received on 'foo'] %+v", s)
time.Sleep(1 * time.Second)
})
ec.Subscribe("bar", func(e *Event) {
log.Printf("[Received on 'bar'] %+v", e)
time.Sleep(1 * time.Second)
})
payload := []byte("Hello World!")
for i := 0; i < 3; i++ {
ec.Publish("foo", payload)
ec.Publish("bar", &Event{Message: string(payload)})
}
select {
case <-time.After(2 * time.Second):
}
}
Now the simple workaround for the Goroutine does not work since would
have to implement it for a multiple number of function types. This is
because internally the NATS client uses reflect
to find out how to
properly call the callback using the decoded payload.
In order to workaround this issue, I dug my way out
using more reflection to dynamically rewrap a function so that it is
ran in its own Goroutine:
type subscriber interface {
Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error)
QueueSubscribe(subject, group string, cb nats.Handler) (*nats.Subscription, error)
}
func Subscribe(ec subscriber, subject string, cb nats.Handler) (*nats.Subscription, error) {
parallelize := func(f, fptr interface{}) {
// Original function.
oV := reflect.ValueOf(f)
// Pointer to the function that will be replaced.
ffn := reflect.ValueOf(fptr).Interface()
fn := reflect.ValueOf(ffn).Elem()
// Type of original function used to create new one
// that will replace the original.
typ := reflect.TypeOf(f)
// Rewrap original function.
handler := func(in []reflect.Value) []reflect.Value {
go oV.Call(in)
return nil
}
v := reflect.MakeFunc(typ, handler)
fn.Set(v)
}
parallelize(cb, &cb)
return ec.Subscribe(subject, cb)
}
Now a user can use this helper function that has a similar API as the
original NATS client subscribe, though internally it is handling the
messages each in its own Goroutine:
Subscribe(ec, ">", func(b []byte) {
log.Printf("[Received on '>' ] %+v", string(b))
time.Sleep(1 * time.Second)
})
Subscribe(ec, "foo", func(s string) {
log.Printf("[Received on 'foo'] %+v", s)
time.Sleep(1 * time.Second)
})
Subscribe(ec, "bar", func(e *Event) {
log.Printf("[Received on 'bar'] %+v", e)
time.Sleep(1 * time.Second)
})
Full example below:
package main
import (
"log"
"reflect"
"time"
"github.com/nats-io/go-nats"
)
type subscriber interface {
Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error)
QueueSubscribe(subject, group string, cb nats.Handler) (*nats.Subscription, error)
}
func Subscribe(ec subscriber, subject string, cb nats.Handler) (*nats.Subscription, error) {
parallelize := func(f, fptr interface{}) {
// Original function.
oV := reflect.ValueOf(f)
// Pointer to the function that will be replaced.
ffn := reflect.ValueOf(fptr).Interface()
fn := reflect.ValueOf(ffn).Elem()
// Type of original function used to create new one
// that will replace the original.
typ := reflect.TypeOf(f)
// Rewrap original function.
handler := func(in []reflect.Value) []reflect.Value {
go oV.Call(in)
return nil
}
v := reflect.MakeFunc(typ, handler)
fn.Set(v)
}
parallelize(cb, &cb)
return ec.Subscribe(subject, cb)
}
func init() {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
}
func main() {
nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatalf("Failed to create a connection: %v\n", err)
}
ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
log.Fatalf("Failed to create an encoded connection: %v\n", err)
}
defer ec.Close()
type Event struct {
Message string `json:"message"`
}
Subscribe(ec, ">", func(b []byte) {
log.Printf("[Received on '>' ] %+v", string(b))
time.Sleep(1 * time.Second)
})
Subscribe(ec, "foo", func(s string) {
log.Printf("[Received on 'foo'] %+v", s)
time.Sleep(1 * time.Second)
})
Subscribe(ec, "bar", func(e *Event) {
log.Printf("[Received on 'bar'] %+v", e)
time.Sleep(1 * time.Second)
})
testBytes := []byte("Hello World!")
ec.Publish("foo", testBytes)
ec.Publish("foo", testBytes)
ec.Publish("foo", testBytes)
ec.Publish("bar", &Event{Message: string(testBytes)})
ec.Publish("bar", &Event{Message: string(testBytes)})
ec.Publish("bar", &Event{Message: string(testBytes)})
ec.PublishRequest("bar", "inbox", &Event{Message: string(testBytes)})
select {
case <-time.After(2 * time.Second):
}
}
Top comments (0)