
Roman Samoilov

Posted on • Originally published at reddit.com

Applying some Rage to Discourse, Mastodon, and GitLab

I wanted to look at some real-world patterns from popular Ruby open-source codebases and show how they could be modelled using Rage, a Rails-compatible framework built on fibers.

I picked Discourse, Mastodon, and GitLab because they share a pattern: in each case, what would normally require extra complexity, infrastructure, or indirection becomes a few lines of application code with Rage.

Request fan-out | Discourse

One of the patterns fibers make especially straightforward is concurrent I/O.

Consider this code from Discourse:

def fetch_pr_or_issue_texts(project, number)
  [
    client.get("/repos/#{project}/issues/#{number}")["body"].to_s,
    *client
      .get("/repos/#{project}/issues/#{number}/comments", per_page: 100)
      .map { |comment| comment["body"].to_s },
  ]
end

Two sequential requests to build a return value. I've seen this pattern in many codebases, and the reason is usually the same: parallelising the requests would add more complexity than the latency gain seems to justify.

How Rage does it

In Rage, you just wrap the requests into fibers:

def fetch_pr_or_issue_texts(project, number)
  issues_request = Fiber.schedule do
    client.get("/repos/#{project}/issues/#{number}")["body"].to_s
  end

  comments_request = Fiber.schedule do
    client
      .get("/repos/#{project}/issues/#{number}/comments", per_page: 100)
      .map { |comment| comment["body"].to_s }
  end

  Fiber.await([issues_request, comments_request]).flatten
end

The two requests now run concurrently, improving latency at the price of two Fiber.schedule blocks and one Fiber.await call.

The same pattern scales to loops. Discourse's PushNotificationPusher iterates over a user's subscriptions and sends notifications sequentially. Wrapping those calls in Fiber.schedule + Fiber.await would send them all concurrently, with the total time dropping to roughly the duration of the slowest call:

class PushNotificationPusher
  def self.push(user, payload)
    # ...

    Fiber.await(
      subscriptions(user).map { |subscription| Fiber.schedule { send_notification(user, subscription, message) } }
    )
  end
end
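To see why the total drops to roughly the slowest call, here is a minimal sketch in plain Ruby. It only illustrates the latency arithmetic: it uses threads as a stand-in, because Fiber.await is not part of core Ruby and core Fiber.schedule needs a scheduler to be installed first. fake_request and the delays are made up for the example.

```ruby
require "benchmark"

# Hypothetical stand-in for a slow network call: sleeps, then returns a label.
def fake_request(name, seconds)
  sleep(seconds)
  "#{name} done"
end

# Runs the calls one after another.
def run_sequentially(delays)
  delays.map { |name, seconds| fake_request(name, seconds) }
end

# Starts every call first, then waits for all of them.
# Results come back in input order, whichever call finishes first.
def run_concurrently(delays)
  delays
    .map { |name, seconds| Thread.new { fake_request(name, seconds) } }
    .map(&:value)
end

delays = { "issue" => 0.2, "comments" => 0.3 }

sequential = Benchmark.realtime { run_sequentially(delays) }
concurrent = Benchmark.realtime { run_concurrently(delays) }

puts format("sequential: %.2fs, concurrent: %.2fs", sequential, concurrent)
```

The sequential run should take about the sum of the delays, and the concurrent run about as long as the slowest one. The fiber version has the same shape, just without the threads.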

Streaming | Mastodon

Mastodon uses a separate streaming service for real-time events:

  1. Ruby (Rails + Sidekiq) - workers serialise events and publish them to a Redis channel.
  2. Node.js (Express + ws) - a separate ~1400-line server subscribes to Redis and pushes events to clients over SSE or WebSockets.

Here's what the Node streaming handler looks like:

const streamToHttp = (req, res) => {
  const channelName = channelNameFromPath(req);

  // ...

  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'private, no-store');
  res.setHeader('Transfer-Encoding', 'chunked');

  res.write(':)\n');

  const heartbeat = setInterval(() => res.write(':thump\n\n'), 15000);

  req.on('close', () => {
    // ...

    clearInterval(heartbeat);
  });

  return (event, payload) => {
    res.write(`event: ${event}\n`);
    res.write(`data: ${payload}\n\n`);
  };
};

To send an event, Rails first publishes to Redis:

def publish!
  redis.publish(@timeline_id, message)
end

The Node service receives and relays it:

const listener = message => {
  const { event, payload } = message;

  if (!needsFiltering || (event !== 'update' && event !== 'status.update')) {
    transmit(event, payload);
    return;
  }

  // ...
  pgPool.connect((err, client, release) => {
    // ...
    transmit(event, payload);
  });
};

How Rage does it

A fiber-based server can hold thousands of concurrent connections in a single Ruby process without blocking. That is the same property Mastodon gets by offloading streaming to a separate Node process.

With Rage, the same Redis-backed stream becomes:

class Api::V1::Streaming::UserController < RageController::API
  before_action :require_user!

  def index
    render sse: Rage::SSE.stream([:timeline, current_account.id])
  end
end

The framework handles the SSE headers, heartbeats, subscription lifecycle, and connection cleanup. When the client disconnects, Rage removes it from the stream.
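For a sense of what is being taken over here, the SSE wire format itself is small. Below is a minimal sketch in plain Ruby that mirrors what the Node handler above writes to the response. format_sse and HEARTBEAT are hypothetical names for illustration and are not part of Rage's API.

```ruby
# Hypothetical helper, not part of Rage: formats one Server-Sent Events message.
def format_sse(data, event: nil)
  lines = []
  lines << "event: #{event}" if event
  # Each line of the payload needs its own "data:" field.
  data.to_s.lines(chomp: true).each { |line| lines << "data: #{line}" }
  # A blank line terminates the message.
  lines.join("\n") + "\n\n"
end

# Lines starting with a colon are comments; clients ignore them,
# which makes them usable as keep-alive heartbeats.
HEARTBEAT = ":thump\n\n"

print format_sse('{"id":1}', event: "update")
print HEARTBEAT
```

The part that takes real effort is not the format but everything around it: holding the connection open, sending heartbeats on a timer, and cleaning up when the client goes away.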

Publishing uses a Redis pub/sub adapter:

# config/pubsub.yml
production:
  adapter: redis
  url: <%= ENV["REDIS_URL"] %>

Then, publish from anywhere:

def publish!
  Rage::SSE.broadcast(
    [:timeline, @account_id],
    Rage::SSE.message(@payload, event: update? ? "status.update" : "update")
  )
end

The streaming server lives in the same Ruby process, with access to the same Active Record models and the rest of the stack.

Domain Events | GitLab

GitLab has built its own domain event system to decouple bounded contexts.

To publish an event, you instantiate a class inheriting from Gitlab::EventStore::Event and pass it to the event store:

Gitlab::EventStore.publish(
  Ci::PipelineCreatedEvent.new(data: { pipeline_id: pipeline.id, partition_id: pipeline.partition_id })
)

Subscribers are Sidekiq workers that include a Subscriber concern and implement handle_event:

class UpdateHeadPipelineWorker
  include Gitlab::EventStore::Subscriber
  # …

  def handle_event(event)
    # ...
  end
end

Nothing in this file tells you what event is: the worker never references PipelineCreatedEvent, and the wiring lives in a separate subscription registry. And because every subscriber is a Sidekiq worker, every reaction goes through the full serialise-enqueue-deserialise-execute cycle, no matter how lightweight it is.

How Rage does it

Publishing looks similar:

Rage::Events.publish(
  Ci::PipelineCreatedEvent.new(data: { pipeline_id: pipeline.id, partition_id: pipeline.partition_id })
)

The difference is in the subscriber. Instead of wiring events in a separate registry, each subscriber declares what it listens to:

class UpdateHeadPipelineWorker
  include Rage::Events::Subscriber
  subscribe_to Ci::PipelineCreatedEvent

  def call(event)
    # `event` is a Ci::PipelineCreatedEvent
  end
end

Open this file and you immediately know: this subscriber handles Ci::PipelineCreatedEvent, which has pipeline_id and partition_id fields.

For subscribers that do require background execution, you simply add deferred: true:

class UpdateHeadPipelineWorker
  include Rage::Events::Subscriber
  subscribe_to Ci::PipelineCreatedEvent, deferred: true

  def call(event)
    # ...
  end
end

Light reactions run inline; heavy or failure-prone ones are deferred to the background. You choose per subscriber, rather than routing everything through a job queue by default.
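The inline-versus-deferred split can be sketched with a toy event bus in plain Ruby. This is only an illustration of the dispatch idea, not how Rage::Events is implemented. Every name in it (MiniEvents, PipelineCreated, LOG) is hypothetical, and the deferred queue is a plain array standing in for a real background worker.

```ruby
# Toy event bus for illustration only; not Rage's implementation.
module MiniEvents
  @subscribers = Hash.new { |hash, key| hash[key] = [] }
  @deferred_jobs = []

  class << self
    def subscribe(event_class, handler, deferred: false)
      @subscribers[event_class] << [handler, deferred]
    end

    # Inline handlers run before publish returns; deferred ones are only queued.
    def publish(event)
      @subscribers[event.class].each do |handler, deferred|
        if deferred
          @deferred_jobs << [handler, event]
        else
          handler.call(event)
        end
      end
    end

    # Stand-in for a background worker draining the queue.
    def run_deferred
      while (job = @deferred_jobs.shift)
        handler, event = job
        handler.call(event)
      end
    end
  end
end

PipelineCreated = Struct.new(:pipeline_id)
LOG = []

MiniEvents.subscribe(PipelineCreated, ->(event) { LOG << "inline #{event.pipeline_id}" })
MiniEvents.subscribe(PipelineCreated, ->(event) { LOG << "deferred #{event.pipeline_id}" }, deferred: true)

MiniEvents.publish(PipelineCreated.new(42))
p LOG # only the inline handler has run at this point
MiniEvents.run_deferred
p LOG # now both handlers have run
```

The publisher never learns which handlers are deferred. That decision sits with each subscription, which is the point of choosing per subscriber.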

Understanding what happens when a PipelineCreatedEvent is published also gets simpler. Instead of grepping registry files, you run:

$ rage events

├─ Ci::PipelineCreatedEvent
│   ├─ UpdateHeadPipeline
│   └─ TrackPipelineTriggerEvents
├─ Ci::PipelineFinishedEvent
│   └─ UpdateWorkloadStatus

The entire subscription graph, visible in one command.


The common thread across all three examples: the framework handles the machinery, so the application code just says what it wants to happen - run these concurrently, stream this channel, react to this event.
