
Stefan 🚀

Originally published at wundergraph.com

Scaling GraphQL Subscriptions in Go with Epoll and Event Driven Architecture

Make it work, make it right, make it fast. This is a mantra that you've probably heard before. It's a good mantra that helps you focus on not over-engineering a solution. What I've come to realize is that it's usually enough to make it right: a solution that's right is usually fast enough.

When we started implementing GraphQL Subscriptions in Cosmo Router, we focused on making it work. This was a few months ago. It was good enough for the first iteration and allowed us to get feedback from our users and better understand the problem space.

In the process of making it right, we've reduced the number of goroutines by 99% and the memory consumption by 90% without sacrificing performance. In this article, I'll explain how we've achieved this. Using Epoll/Kqueue played a big role in this, but also rethinking the architecture to be more event-driven.

Let's take a step back so that we're all on the same page.

What are GraphQL Subscriptions?

GraphQL Subscriptions are a way to subscribe to events that are happening in your application. For example, you could subscribe to a new comment being created. Whenever a new comment is created, the server will send you a notification. This is a very powerful feature that allows you to build real-time applications.

How do GraphQL Subscriptions work?

GraphQL Subscriptions are usually implemented using WebSockets, but they can also be run over HTTP/2 with Server-Sent Events (SSE).

The client opens a WebSocket connection to the server by sending an HTTP Upgrade request. The server upgrades the connection and negotiates the GraphQL Subscriptions protocol. There are currently two main protocols in use, graphql-ws and graphql-transport-ws.

Once the GraphQL Subscription protocol is negotiated, the client can send a message to the server to start a Subscription. The server will then send a message to the client whenever new data is available. To end a Subscription, either the client or the server can send a message to stop the Subscription.
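To make the message flow a bit more concrete, here's a minimal sketch of the JSON envelope used by graphql-transport-ws, where messages carry a "type" such as connection_init, connection_ack, subscribe, next or complete. The Message struct, the subscribeMessage helper and the commentAdded field are names I made up for illustration; they are not taken from Cosmo Router.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message mirrors the envelope used by the graphql-transport-ws protocol:
// a "type" discriminator, an optional operation "id" and an optional payload.
// The struct itself is a hypothetical helper for this example.
type Message struct {
	Type    string          `json:"type"`
	ID      string          `json:"id,omitempty"`
	Payload json.RawMessage `json:"payload,omitempty"`
}

// subscribeMessage builds the message a client sends to start a Subscription.
func subscribeMessage(id, query string) ([]byte, error) {
	payload, err := json.Marshal(map[string]string{"query": query})
	if err != nil {
		return nil, err
	}
	return json.Marshal(Message{Type: "subscribe", ID: id, Payload: payload})
}

func main() {
	// commentAdded is an assumed field name, matching the comment example above.
	msg, err := subscribeMessage("1", "subscription { commentAdded { id } }")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(msg))
}
```

The server would answer with "next" messages carrying the same id, and either side can send "complete" for that id to end the Subscription.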

How do federated GraphQL Subscriptions work?

Federated GraphQL Subscriptions are a bit more complicated than regular GraphQL Subscriptions. There's not just the client and the server, but also a Gateway or Router in between. That said, the flow is still very similar.

The client opens a WebSocket connection to the Gateway. The Gateway then opens a WebSocket connection to the origin server. The Gateway then forwards all messages between the client and the origin server.

Instead of setting up and negotiating a single GraphQL Subscription protocol, the Gateway sets up and negotiates two GraphQL Subscription protocols.

What are the limitations of using classic GraphQL Subscriptions with Federation?

The idea of GraphQL Federation is that resolving the fields of an Entity can be split up into multiple services. An Entity is a type that is defined in the GraphQL Schema. Entities have a @key directive that defines the fields that are used to identify the Entity. For example, a User Entity could have a @key(fields: "id") directive. Keys are used to resolve (join) Entity fields across services.

The problem with Entities and Subscriptions is that a Subscription must have a single root field, which ties the "invalidation" of the Subscription to a single service. So, if multiple services contribute fields to our User Entity, they have to coordinate with each other to invalidate the Subscription.

This creates dependencies between Subgraphs, which is something that we want to avoid for two reasons. First, it means that we have to implement some coordination mechanism between Subgraphs. Second, it means that different teams owning the Subgraphs cannot move independently anymore but have to coordinate deployments, etc... Both of these things defeat the purpose of using GraphQL Federation.

Introducing Event-Driven Federated Subscriptions (EDFS)

To solve the limitations of classic GraphQL Subscriptions with Federation, we've introduced Event-Driven Federated Subscriptions (EDFS). In a nutshell, EDFS allows you to drive Subscriptions from an Event Stream like Kafka or NATS. This decouples the Subscription root field from the Subgraphs, solving the aforementioned limitations. You can read the full announcement of EDFS here.

Getting EDFS right

When we started implementing EDFS in Cosmo Router, we focused on making it work. Our "naive" implementation was very simple:

  1. Client opens a WebSocket connection to the Router
  2. Client and Router negotiate the GraphQL Subscriptions protocol
  3. Client sends a message to start a Subscription
  4. Router subscribes to the event stream
  5. Router sends a message to the client whenever a new event is received

We got this working, but there were a few problems with this approach. We set up a benchmark to measure some stats with pprof when connecting 10,000 clients. We configured our Router to enable the pprof HTTP server and started the benchmark.

Using pprof to measure a running Go program

To measure a running Go program, we can use the pprof HTTP server. You can enable it like this:

//go:build pprof

package profile

import (
    "flag"
    "log"
    "net/http"
    "net/http/pprof"
    "strconv"
)

var (
    pprofPort = flag.Int("pprof-port", 6060, "Port for pprof server, set to zero to disable")
)

func initPprofHandlers() {
    // Allow compiling in pprof but still disabling it at runtime
    if *pprofPort == 0 {
        return
    }
    mux := http.NewServeMux()
    mux.HandleFunc("/debug/pprof/", pprof.Index)
    mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
    mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
    mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
    mux.HandleFunc("/debug/pprof/trace", pprof.Trace)

    server := &http.Server{
        Addr: ":" + strconv.Itoa(*pprofPort),
        // Serve only the pprof routes registered above instead of http.DefaultServeMux
        Handler: mux,
    }
    log.Printf("starting pprof server on port %d - do not use this in production, it is a security risk", *pprofPort)
    go func() {
        if err := server.ListenAndServe(); err != nil {
            log.Fatalf("error starting pprof server: %v", err)
        }
    }()
}


We can now start our program, connect 10,000 clients and run the following command to measure the number of goroutines:

go tool pprof http://localhost:6060/debug/pprof/goroutine


In addition, we can also measure the heap allocations / memory consumption:

go tool pprof http://localhost:6060/debug/pprof/heap


Let's take a look at the results!

Goroutines (naive implementation of EDFS)

First, let's take a look at the number of goroutines that are created.

This is a lot of goroutines! It looks like we're creating 4 goroutines per client and Subscription. Let's take a closer look at what's going on.

All 4 goroutines call runtime.gopark which means that they are waiting for something. 3 of them are calling runtime.selectgo which means that they are waiting for a channel to receive a message. The other one is calling runtime.netpollblock which means that it's waiting for a network event.

Of the 3 other goroutines, one is calling core.(*wsConnectionWrapper).ReadJSON, so it's waiting for the client to send a message. The second one is calling resolve.(*Resolver).ResolveGraphQLSubscription, so it's waiting on a channel for the next event to be resolved. The third one is calling pubsub.(*natsPubSub).Subscribe which means that it's waiting for a message from the event stream.

Lots of waiting going on here if you ask me. You might have heard that goroutines are cheap and that you can create millions of them. You can indeed create a lot of goroutines, but they are not free. Let's take a look at the memory consumption to see if the number of goroutines impacts the memory consumption.

Heap Allocations / Memory Consumption (naive implementation of EDFS)

The heap is at almost 2GB, which means that we're requesting around 3.5GB of memory from the OS (you can check this with top). Let's take a look at the allocations to see where all this memory is going. 92% of the memory is allocated by the resolve.NewResolvable func, which is called by resolve.(*Resolver).ResolveGraphQLSubscription, which is called by core.(*GraphQLHandler).ServeHTTP. The rest is negligible.

Next, let's contrast this with the optimized implementation of EDFS.

Goroutines (optimized implementation of EDFS)

We're now down to 42 goroutines, which is a reduction of 99%! How is it possible that we're doing the same thing with 99% fewer goroutines? We'll get to that in a bit. Let's first take a look at the memory consumption.

Heap Allocations / Memory Consumption (optimized implementation of EDFS)

The heap is down to 200MB, which is a reduction of 90%! The main contributors are now bufio.NewReaderSize and bufio.NewWriterSize, which are tied to http.(*conn).serve. Side note, these allocations were previously not really visible because they were hidden by other allocations.

Root cause analysis: How to reduce the number of goroutines and memory consumption for GraphQL Subscriptions?

There are two main questions that we need to answer:

  1. Why are we creating 4 goroutines per client and Subscription?
  2. Why is resolve.NewResolvable allocating so much memory?

Let's eliminate them one by one, starting with the easiest one.

Don't block in the ServeHTTP method

Here's the code of the WebsocketHandler ServeHTTP func:

func (h *WebsocketHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if isWsUpgradeRequest(r) {
        upgrader := websocket.Upgrader{
            HandshakeTimeout: 5 * time.Second,
            // TODO: WriteBufferPool,
            EnableCompression: true,
            Subprotocols: wsproto.Subprotocols(),
            CheckOrigin: func(_ *http.Request) bool {
                // Allow any origin to subscribe via WS
                return true
            },
        }
        c, err := upgrader.Upgrade(w, r, nil)
        if err != nil {
            // Upgrade() sends an error response already, just log the error
            h.logger.Warn("upgrading websocket", zap.Error(err))
            return
        }
        connectionHandler := NewWebsocketConnectionHandler(h.ctx, WebSocketConnectionHandlerOptions{
            IDs: h.ids,
            Parser: h.parser,
            Planner: h.planner,
            GraphQLHandler: h.graphqlHandler,
            Metrics: h.metrics,
            ResponseWriter: w,
            Request: r,
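            Connection: conn, // wrapper around c, construction not shown in this excerpt
            Protocol: protocol, // handler for the negotiated subprotocol, construction not shown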
            Logger: h.logger,
        })
        defer connectionHandler.Close()
        connectionHandler.Serve()
        return
    }
    h.next.ServeHTTP(w, r)
}


The ServeHTTP method is blocking until the connectionHandler.Serve() method returns. This was convenient because it allowed us to use the defer statement to close the connection, and we could use the r.Context() to propagate the context as it wasn't canceled until the ServeHTTP method returned.

The problem with this approach is that it keeps the goroutine alive for the entire duration of the Subscription. As Go's net/http package spawns a new goroutine for each request, we keep one goroutine alive for each client. That's completely unnecessary, because we've already hijacked (upgraded) the connection.
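Here's a minimal sketch of the idea, using only the standard library and no WebSocket handling at all. The handler hijacks the connection, hands it off through a channel and returns right away, and the connection stays usable afterwards. hijackHandler and demo are hypothetical names for this example and not what the Router does internally.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
	"net/http"
	"net/http/httptest"
)

// hijackHandler takes over the TCP connection and hands it to conns, then
// returns immediately so the net/http goroutine for this request can exit.
func hijackHandler(conns chan<- net.Conn) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		hj, ok := w.(http.Hijacker)
		if !ok {
			http.Error(w, "hijacking not supported", http.StatusInternalServerError)
			return
		}
		conn, _, err := hj.Hijack()
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		conns <- conn // ownership moves to whoever reads from conns
	}
}

// demo sends one request and returns the line that is written to the
// hijacked connection after the handler has handed it off.
func demo() (string, error) {
	conns := make(chan net.Conn, 1)
	srv := httptest.NewServer(hijackHandler(conns))
	defer srv.Close()

	client, err := net.Dial("tcp", srv.Listener.Addr().String())
	if err != nil {
		return "", err
	}
	defer client.Close()
	if _, err := fmt.Fprint(client, "GET / HTTP/1.1\r\nHost: example\r\n\r\n"); err != nil {
		return "", err
	}

	server := <-conns // the handler no longer owns the connection
	defer server.Close()
	if _, err := fmt.Fprint(server, "hello from outside ServeHTTP\n"); err != nil {
		return "", err
	}
	return bufio.NewReader(client).ReadString('\n')
}

func main() {
	line, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Print(line)
}
```

Once a connection is hijacked, net/http stops managing it, so whoever receives it from the channel is responsible for reading, writing and closing it.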

But instead of simply not blocking, we can do even better and eliminate another goroutine.

Don't read from a connection when you don't have to

Do we really need a blocking read with one goroutine per connected client? And how is it possible that single-threaded servers like nginx or Node.js can handle thousands of concurrent connections?

The answer is that these servers are driven by events. They don't block on a connection, but instead wait for an event to happen. This is usually done using Epoll/Kqueue on Linux/BSD and IOCP on Windows.

With Epoll/Kqueue, we can delegate the waiting for an event to the Operating System (OS). We can tell the OS to notify us when there is data to read from a connection.

The typical usage pattern of WebSocket connections with GraphQL Subscriptions is that the client initiates the connection, sends a message to start a Subscription and then waits for the server to send a message. So there's not a lot of back and forth going on. That's a perfect fit for Epoll/Kqueue.

Let's take a look at how we can use Epoll/Kqueue to manage our WebSocket connections:

func (h *WebsocketHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    upgrader := ws.HTTPUpgrader{
        Timeout: time.Second * 5,
        Protocol: func(s string) bool {
            if wsproto.IsSupportedSubprotocol(s) {
                subProtocol = s
                return true
            }
            return false
        },
    }
    c, rw, _, err := upgrader.Upgrade(r, w)
    if err != nil {
        requestLogger.Warn("Websocket upgrade", zap.Error(err))
        // c may be nil if the upgrade failed before the connection was hijacked
        if c != nil {
            _ = c.Close()
        }
        return
    }

    // After successful upgrade, we can't write to the response writer anymore
    // because it's hijacked by the websocket connection

    conn := newWSConnectionWrapper(c, rw)
    protocol, err := wsproto.NewProtocol(subProtocol, conn)
    if err != nil {
        requestLogger.Error("Create websocket protocol", zap.Error(err))
        _ = c.Close()
        return
    }

    handler := NewWebsocketConnectionHandler(h.ctx, WebSocketConnectionHandlerOptions{
        Parser: h.parser,
        Planner: h.planner,
        GraphQLHandler: h.graphqlHandler,
        Metrics: h.metrics,
        ResponseWriter: w,
        Request: r,
        Connection: conn,
        Protocol: protocol,
        Logger: h.logger,
        Stats: h.stats,
        ConnectionID: h.connectionIDs.Inc(),
        ClientInfo: clientInfo,
        InitRequestID: requestID,
    })
    err = handler.Initialize()
    if err != nil {
        requestLogger.Error("Initializing websocket connection", zap.Error(err))
        handler.Close()
        return
    }

    // Only when epoll is available. On Windows, epoll is not available
    if h.epoll != nil {
        err = h.addConnection(c, handler)
        if err != nil {
            requestLogger.Error("Adding connection to epoll", zap.Error(err))
            handler.Close()
        }
        return
    }

    // Handle messages sync when epoll is not available

    go h.handleConnectionSync(handler)
}


If epoll is available, we add the connection to the epoll instance and return. Otherwise, we handle the connection synchronously in a new goroutine as a fallback, but at least we don't block the ServeHTTP method anymore.

Here's the code for using the epoll instance:

func (h *WebsocketHandler) runPoller() {
    done := h.ctx.Done()
    defer func() {
        h.connectionsMu.Lock()
        _ = h.epoll.Close(true)
        h.connectionsMu.Unlock()
    }()
    for {
        select {
        case <-done:
            return
        default:
            connections, err := h.epoll.Wait(128)
            if err != nil {
                h.logger.Warn("Epoll wait", zap.Error(err))
                continue
            }
            for i := 0; i < len(connections); i++ {
                if connections[i] == nil {
                    continue
                }
                conn := connections[i].(epoller.ConnImpl)
                // check if the connection is still valid
                fd := socketFd(conn)
                h.connectionsMu.RLock()
                handler, exists := h.connections[fd]
                h.connectionsMu.RUnlock()
                if !exists {
                    continue
                }

                err = handler.conn.conn.SetReadDeadline(time.Now().Add(h.readTimeout))
                if err != nil {
                    h.logger.Debug("Setting read deadline", zap.Error(err))
                    h.removeConnection(conn, handler, fd)
                    continue
                }

                msg, err := handler.protocol.ReadMessage()
                if err != nil {
                    if isReadTimeout(err) {
                        continue
                    }
                    h.logger.Debug("Client closed connection")
                    h.removeConnection(conn, handler, fd)
                    continue
                }

                err = h.HandleMessage(handler, msg)
                if err != nil {
                    h.logger.Debug("Handling websocket message", zap.Error(err))
                    if errors.Is(err, errClientTerminatedConnection) {
                        h.removeConnection(conn, handler, fd)
                        continue // keep the poller running for the remaining connections
                    }
                }
            }
        }
    }
}


We use one single goroutine to wait for events from the epoll instance. If a connection has an event, we check if the connection is still valid. If it is, we read the message and handle it. To handle the message, we use a thread pool behind the scenes to not block the epoll goroutine for too long. You can find the full implementation on GitHub.

With this approach, we've already reduced the number of goroutines by 50%. We're now down to 2 goroutines per client and Subscription. We're not blocking in the ServeHTTP func anymore, and we're not blocking to read from the connection anymore.

That leaves 3 problems. We need to eliminate another 2 goroutines for the Subscription and reduce the memory consumption of resolve.NewResolvable. As it turns out, all of these problems are related.

Blocking reads vs event-driven architecture

Let's take a look at the naive implementation of ResolveGraphQLSubscription:

func (r *Resolver) ResolveGraphQLSubscription(ctx *Context, subscription *GraphQLSubscription, writer FlushWriter) (err error) {

    buf := pool.BytesBuffer.Get()
    defer pool.BytesBuffer.Put(buf)
    if err := subscription.Trigger.InputTemplate.Render(ctx, nil, buf); err != nil {
        return err
    }
    rendered := buf.Bytes()
    subscriptionInput := make([]byte, len(rendered))
    copy(subscriptionInput, rendered)

    if len(ctx.InitialPayload) > 0 {
        subscriptionInput, err = jsonparser.Set(subscriptionInput, ctx.InitialPayload, "initial_payload")
        if err != nil {
            return err
        }
    }

    if ctx.Extensions != nil {
        subscriptionInput, err = jsonparser.Set(subscriptionInput, ctx.Extensions, "body", "extensions")
        if err != nil {
            return err
        }
    }

    c, cancel := context.WithCancel(ctx.Context())
    defer cancel()
    resolverDone := r.ctx.Done()

    next := make(chan []byte)

    cancellableContext := ctx.WithContext(c)

    if err := subscription.Trigger.Source.Start(cancellableContext, subscriptionInput, next); err != nil {
        if errors.Is(err, ErrUnableToResolve) {
            msg := []byte(`{"errors":[{"message":"unable to resolve"}]}`)
            return writeAndFlush(writer, msg)
        }
        return err
    }

    t := r.getTools()
    defer r.putTools(t)

    for {
        select {
        case <-resolverDone:
            return nil
        case data, ok := <-next:
            if !ok {
                return nil
            }
            t.resolvable.Reset()
            if err := t.resolvable.InitSubscription(ctx, data, subscription.Trigger.PostProcessing); err != nil {
                return err
            }
            if err := t.loader.LoadGraphQLResponseData(ctx, subscription.Response, t.resolvable); err != nil {
                return err
            }
            if err := t.resolvable.Resolve(ctx.ctx, subscription.Response.Data, writer); err != nil {
                return err
            }
            writer.Flush()
        }
    }
}


There are a lot of problems with this implementation:

  1. It's blocking
  2. It keeps a buffer around for the entire duration of the Subscription, even if there's no data to send
  3. We create a trigger for each Subscription, even though the trigger might be the same for multiple Subscriptions
  4. We do a blocking read from the trigger
  5. The getTools func allocates a lot of memory. As we're blocking the parent func for the entire duration of the Subscription, this memory is not freed until the Subscription is done. That's the line of code that's allocating most of the memory.

To fix these problems, you have to recall what Rob Pike famously said about Go:

Don't communicate by sharing memory, share memory by communicating.

Instead of having a goroutine that's blocking on a channel, and another one that's blocking on the trigger, and all of this memory that's allocated while we're blocking, we could instead have a single goroutine that's waiting for events like client subscribes, client unsubscribes, trigger has data, trigger is done, etc...

This one goroutine would manage all of the events in a single loop, which is actually quite simple to implement and maintain. In addition, we can use a thread pool to handle the events to not block the main loop for too long. This sounds a lot like the epoll approach that we've used for the WebSocket connections, doesn't it?

Let's take a look at the optimized implementation of ResolveGraphQLSubscription:

func (r *Resolver) AsyncResolveGraphQLSubscription(ctx *Context, subscription *GraphQLSubscription, writer SubscriptionResponseWriter, id SubscriptionIdentifier) (err error) {
    if subscription.Trigger.Source == nil {
        return errors.New("no data source found")
    }
    input, err := r.subscriptionInput(ctx, subscription)
    if err != nil {
        msg := []byte(`{"errors":[{"message":"invalid input"}]}`)
        return writeFlushComplete(writer, msg)
    }
    xxh := pool.Hash64.Get()
    defer pool.Hash64.Put(xxh)
    err = subscription.Trigger.Source.UniqueRequestID(ctx, input, xxh)
    if err != nil {
        msg := []byte(`{"errors":[{"message":"unable to resolve"}]}`)
        return writeFlushComplete(writer, msg)
    }
    uniqueID := xxh.Sum64()
    select {
    case <-r.ctx.Done():
        return ErrResolverClosed
    case r.events <- subscriptionEvent{
        triggerID: uniqueID,
        kind: subscriptionEventKindAddSubscription,
        addSubscription: &addSubscription{
            ctx: ctx,
            input: input,
            resolve: subscription,
            writer: writer,
            id: id,
        },
    }:
    }
    return nil
}


We've added a func to the Trigger interface to generate a unique ID. This is used to uniquely identify a Trigger. Internally, this func takes into consideration the input, request context, Headers, extra fields, etc. to ensure that we're not accidentally using the same Trigger for different Subscriptions.
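To illustrate the idea of deriving a Trigger ID, here's a simplified sketch that hashes the Subscription input together with the headers. It uses FNV-1a from the standard library instead of the xxhash pool shown above, and triggerID, the "subject" input and the Authorization values are assumptions for this example, not the Router's actual implementation.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// triggerID derives a stable 64-bit ID from the subscription input and the
// headers that influence the upstream subscription.
func triggerID(input []byte, headers map[string]string) uint64 {
	h := fnv.New64a()
	h.Write(input) // writes to a hash.Hash never return an error
	keys := make([]string, 0, len(headers))
	for k := range headers {
		keys = append(keys, k)
	}
	sort.Strings(keys) // map iteration order is random, so sort for stability
	for _, k := range keys {
		h.Write([]byte{0}) // separator so "ab"+"c" hashes differently from "a"+"bc"
		h.Write([]byte(k))
		h.Write([]byte{0})
		h.Write([]byte(headers[k]))
	}
	return h.Sum64()
}

func main() {
	input := []byte(`{"subject":"comments.created"}`)
	a := triggerID(input, map[string]string{"Authorization": "token-a"})
	b := triggerID(input, map[string]string{"Authorization": "token-a"})
	c := triggerID(input, map[string]string{"Authorization": "token-b"})
	fmt.Println("same input and headers share a trigger:", a == b)
	fmt.Println("different headers share a trigger:", a == c)
}
```

Two Subscriptions with the same input and headers end up with the same ID and can share one Trigger, while a different header value yields a different ID.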

Once we have a unique ID for the Trigger, we send an event to the main loop to "subscribe" to the Trigger. That's all we're doing in this func. We're not blocking anymore, no heavy allocations.

Next, let's take a look at the main loop:

func (r *Resolver) handleEvents() {
    done := r.ctx.Done()
    for {
        select {
        case <-done:
            r.handleShutdown()
            return
        case event := <-r.events:
            r.handleEvent(event)
        }
    }
}

func (r *Resolver) handleEvent(event subscriptionEvent) {
    switch event.kind {
    case subscriptionEventKindAddSubscription:
        r.handleAddSubscription(event.triggerID, event.addSubscription)
    case subscriptionEventKindRemoveSubscription:
        r.handleRemoveSubscription(event.id)
    case subscriptionEventKindRemoveClient:
        r.handleRemoveClient(event.id.ConnectionID)
    case subscriptionEventKindTriggerUpdate:
        r.handleTriggerUpdate(event.triggerID, event.data)
    case subscriptionEventKindTriggerDone:
        r.handleTriggerDone(event.triggerID)
    case subscriptionEventKindUnknown:
        panic("unknown event")
    }
}


It's a simple loop that runs in one goroutine, waiting for events until the context is canceled. When an event is received, it's handled by calling the appropriate handler func.

There's something powerful about this pattern that might not be obvious at first glance. If we run this loop in a single goroutine, we don't have to use any locks to synchronize access to the triggers. E.g. when we add a subscriber to a trigger, or remove one, we don't have to use a lock because we're always doing this in the same goroutine.
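Here's a stripped-down sketch of that pattern, independent of the Router's types. One goroutine owns the map of triggers and is the only one that ever reads or writes it, and everyone else talks to it through a channel. The event kinds, runLoop and demo are hypothetical names for this example.

```go
package main

import "fmt"

type eventKind int

const (
	addSubscriber eventKind = iota
	removeSubscriber
	triggerUpdate
)

type event struct {
	kind       eventKind
	triggerID  uint64
	subscriber string
	data       string
}

// runLoop owns the triggers map. Because only this goroutine touches it,
// no mutex is needed. It exits when events is closed.
func runLoop(events <-chan event, deliver func(subscriber, data string), done chan<- struct{}) {
	triggers := map[uint64]map[string]struct{}{}
	for ev := range events {
		switch ev.kind {
		case addSubscriber:
			if triggers[ev.triggerID] == nil {
				triggers[ev.triggerID] = map[string]struct{}{}
			}
			triggers[ev.triggerID][ev.subscriber] = struct{}{}
		case removeSubscriber:
			delete(triggers[ev.triggerID], ev.subscriber)
			if len(triggers[ev.triggerID]) == 0 {
				delete(triggers, ev.triggerID) // last subscriber gone, drop the trigger
			}
		case triggerUpdate:
			for sub := range triggers[ev.triggerID] {
				deliver(sub, ev.data)
			}
		}
	}
	close(done)
}

// demo feeds a few events through the loop and returns what was delivered.
func demo() []string {
	events := make(chan event)
	done := make(chan struct{})
	var delivered []string
	record := func(sub, data string) { delivered = append(delivered, sub+":"+data) }
	go runLoop(events, record, done)

	events <- event{kind: addSubscriber, triggerID: 1, subscriber: "client-a"}
	events <- event{kind: triggerUpdate, triggerID: 1, data: "first"}
	events <- event{kind: removeSubscriber, triggerID: 1, subscriber: "client-a"}
	events <- event{kind: triggerUpdate, triggerID: 1, data: "second"}
	close(events)
	<-done // wait for the loop to finish before reading delivered
	return delivered
}

func main() {
	fmt.Println(demo()) // prints [client-a:first]
}
```

The second update is never delivered because the subscriber was removed before it arrived. Since all four events are processed in order by the same goroutine, there's no window in which the map could be observed in a half-updated state.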

Let's take a look at how we handle a trigger update:

func (r *Resolver) handleTriggerUpdate(id uint64, data []byte) {
    trig, ok := r.triggers[id]
    if !ok {
        return
    }
    if r.options.Debug {
        fmt.Printf("resolver:trigger:update:%d\n", id)
    }
    wg := &sync.WaitGroup{}
    wg.Add(len(trig.subscriptions))
    trig.inFlight = wg
    for c, s := range trig.subscriptions {
        c, s := c, s
        r.triggerUpdatePool.Submit(func() {
            r.executeSubscriptionUpdate(c, s, data)
            wg.Done()
        })
    }
}

func (r *Resolver) executeSubscriptionUpdate(ctx *Context, sub *sub, sharedInput []byte) {
    sub.mux.Lock()
    sub.pendingUpdates++
    sub.mux.Unlock()
    if r.options.Debug {
        fmt.Printf("resolver:trigger:subscription:update:%d\n", sub.id.SubscriptionID)
        defer fmt.Printf("resolver:trigger:subscription:update:done:%d\n", sub.id.SubscriptionID)
    }
    t := r.getTools()
    defer r.putTools(t)
    input := make([]byte, len(sharedInput))
    copy(input, sharedInput)
    if err := t.resolvable.InitSubscription(ctx, input, sub.resolve.Trigger.PostProcessing); err != nil {
        return
    }
    if err := t.loader.LoadGraphQLResponseData(ctx, sub.resolve.Response, t.resolvable); err != nil {
        return
    }
    sub.mux.Lock()
    sub.pendingUpdates--
    defer sub.mux.Unlock()
    if sub.writer == nil {
        return // subscription was already closed by the client
    }
    if err := t.resolvable.Resolve(ctx.ctx, sub.resolve.Response.Data, sub.writer); err != nil {
        return
    }
    sub.writer.Flush()
    if r.reporter != nil {
        r.reporter.SubscriptionUpdateSent()
    }
}


In the first func you can see how we're modifying the trigger and subscription structs. Keep in mind that all of this is still happening in the main loop, so it's safe to do this without any locks.

We're creating a wait group to prevent closing the trigger before all subscribers have been notified of an update. It's being used in another func in case we're closing the trigger.

Next, you can see that we're submitting the actual process of resolving the update per subscriber to a thread pool. This is the only place where we're using concurrency when handling events. Using a thread pool here has two advantages. First, the obvious one: we're not blocking the main loop while resolving the update. Second, and no less important, we're able to limit the number of concurrently resolving updates. This matters because the previous implementation had no such limit: every active Subscription held on to its own set of tools from getTools, which is where most of the memory went.

You can see that we're only calling getTools in the executeSubscriptionUpdate func when we're actually resolving the update. This func is very short lived, and as we're using a thread pool for the execution and a sync.Pool for the tools, we're able to re-use the tools efficiently and therefore reduce the overall memory consumption.
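As a rough sketch of how these two pieces work together, here's a bounded set of workers that borrow a buffer from a sync.Pool only for the duration of one update. It's built with plain goroutines and channels rather than the pool library the Router uses, and bufPool and resolveAll are made-up names for this example.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool recycles scratch buffers between updates instead of keeping one
// allocated per subscription.
var bufPool = sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}

// resolveAll renders one update per subscriber using at most `workers`
// goroutines, so at most `workers` buffers are in use at any time.
func resolveAll(subscribers []string, data string, workers int) []string {
	jobs := make(chan int)
	out := make([]string, len(subscribers))
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range jobs {
				buf := bufPool.Get().(*bytes.Buffer)
				buf.Reset()
				fmt.Fprintf(buf, "%s <- %s", subscribers[i], data)
				out[i] = buf.String() // copy the result out before returning the buffer
				bufPool.Put(buf)
			}
		}()
	}
	for i := range subscribers {
		jobs <- i // blocks while all workers are busy, which bounds concurrency
	}
	close(jobs)
	wg.Wait()
	return out
}

func main() {
	fmt.Println(resolveAll([]string{"a", "b", "c"}, "update", 2))
}
```

With 10,000 subscribers and, say, 8 workers, at most 8 buffers are in use at once, no matter how many Subscriptions are open. That's the same effect we get from combining the thread pool with getTools and putTools.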

If you're interested in the full implementation of the resolver, you can find it on GitHub.

Summary

We started with a naive implementation of EDFS that was working fine, but we realized that it had some limitations. Thanks to having an initial implementation, we were able to define a test suite to "nail down" the expected behaviour of the system.

Next, we've identified the key problems with our initial implementation:

  1. We were creating 4 goroutines per client and Subscription
  2. We were allocating a lot of memory in the resolve.NewResolvable func
  3. We were blocking in the ServeHTTP func
  4. We were blocking to read from the connection

We've solved these problems by:

  1. Using Epoll/Kqueue to not block on the connection
  2. Using an event-driven architecture to not block on the Subscription
  3. Using a thread pool to not block the main loop when resolving a Subscription update (and limit the number of concurrent updates)
  4. Using a sync.Pool to re-use the tools when resolving a Subscription update

With these changes, we've reduced the number of goroutines by 99% and the memory consumption by 90% without sacrificing performance.

We didn't fish in the dark. We used pprof to analyze exactly what was going on and where the bottlenecks were. Furthermore, we used pprof to measure the impact of our changes.

Thanks to our test-suite, we were able to make these changes without breaking anything.

Final thoughts

We could probably reduce memory consumption even further, as the allocations of the bufio package are still quite visible in the heap profile. That said, we're aware that premature optimization is the root of all evil, so we hold off on further optimizations until we really need them.

There's a spectrum between "fast code" and "fast to reason about code". The more you optimize, the more complex the code becomes. At this point, we're happy with the results and we're confident that we can still efficiently maintain the code.

If you're interested in learning more about EDFS, you can read the full announcement here. There's also some documentation available on the Cosmo Docs. If you enjoy watching videos more than reading, you can also watch a Demo of EDFS on YouTube.

I hope you've enjoyed this blog post and learned something new. If you have any questions or feedback, feel free to comment or reach out to me on Twitter.

Credits

I took inspiration from the 1M-go-websockets project. Thank you, Eran Yanay, for creating this project and sharing your learnings with the community.
