sebk69

Small Swoole Rx Events

Reactive event bus for PHP powered by RxPHP and Swoole.

It lets you publish/subscribe domain and infrastructure events, compose pipelines with Rx operators, and run time-based operators on Swoole’s event loop.

  • EventBus: simple Rx-backed bus with on(), onMany(), payloads(), once(), request()
  • SwooleScheduler: AsyncSchedulerInterface implementation using Swoole\Timer (works with RxPHP time operators)
  • Event model: BasicEvent (name, payload, meta, rid) and EventInterface (correlation id)

Requirements

  • PHP 8.3+
  • ext-swoole 4.8+ / 5.x
  • reactivex/rxphp (2.x)

Installation

composer require small/swoole-rx-events

Quick start

use Small\SwooleRxEvents\EventBus;
use Small\SwooleRxEvents\SwooleScheduler;
use Small\SwooleRxEvents\Event\BasicEvent;

// Use the Swoole async scheduler
$bus = new EventBus(new SwooleScheduler());

// Subscribe by name
$bus->on('order.created')->subscribe(function ($e) {
    echo "order rid={$e->getRid()} payload=", json_encode($e->getPayload()), PHP_EOL;
});

// Emit an event
$bus->emitName('order.created', ['id' => 123]);

// If you’re in a plain CLI script, keep the loop alive briefly:
\Swoole\Timer::after(20, fn () => \Swoole\Event::exit());
\Swoole\Event::wait();

Concepts

Event

All events must implement EventInterface:

namespace Small\SwooleRxEvents\Contract;

interface EventInterface
{

    public function getName(): string;
    public function getRid(): string;
    public function setRid(string $rid): self;

}

BasicEvent carries:

  • name (string)
  • payload (array)
  • meta (array, e.g. tracing, user)
  • rid (string, auto‐generated correlation id)
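BasicEvent covers the common case, but the interface above suggests that any class implementing EventInterface can serve as an event. Here is a minimal sketch of a custom event. The interface is redeclared locally only so the snippet runs standalone; the OrderShipped class, its orderId field, and the rid format are hypothetical and not part of the library.

```php
<?php
declare(strict_types=1);

// Local copy of the interface shown above so this sketch runs standalone.
// In a real project, import Small\SwooleRxEvents\Contract\EventInterface instead.
interface EventInterface
{
    public function getName(): string;
    public function getRid(): string;
    public function setRid(string $rid): self;
}

// Hypothetical domain event: the class name and orderId field are illustrative only.
final class OrderShipped implements EventInterface
{
    private string $rid;

    public function __construct(private readonly int $orderId)
    {
        // Random correlation id (assumption: the format BasicEvent uses may differ).
        $this->rid = bin2hex(random_bytes(8));
    }

    public function getName(): string
    {
        return 'order.shipped';
    }

    public function getRid(): string
    {
        return $this->rid;
    }

    public function setRid(string $rid): self
    {
        $this->rid = $rid;
        return $this; // fluent, so it chains like BasicEvent::setRid()
    }

    public function getOrderId(): int
    {
        return $this->orderId;
    }
}

// Override the generated rid to correlate with an existing request.
$event = (new OrderShipped(123))->setRid('req-1');
echo $event->getName(), ' rid=', $event->getRid(), PHP_EOL;
```

Typed getters such as getOrderId() keep subscribers away from untyped payload arrays, at the cost of one class per event type.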

Bus

  • stream() — all events
  • on($name) / onMany([...]) — filtered streams
  • payloads($name) — payload‐only stream
  • once($name, ?map, ?timeoutMs) — resolve first matching event (optionally mapped)
  • request($requestName, $responseName, $payload = [], $meta = [], ?$timeoutMs): emits a request with a new rid, then waits for the first response carrying the same rid.

Timeouts require an async scheduler. This library provides SwooleScheduler, which implements AsyncSchedulerInterface.
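To make the rid correlation behind request() concrete, here is a toy, synchronous sketch in plain PHP. It is not the library's implementation: ToyBus and its array-shaped events are hypothetical, and timeouts are left out because they need an async scheduler. It only illustrates the two ideas described above: subscribe to the response before emitting the request, and match on both name and rid.

```php
<?php
declare(strict_types=1);

// Toy synchronous bus, NOT the library's EventBus.
final class ToyBus
{
    /** @var array<int, callable> */
    private array $listeners = [];

    public function subscribe(callable $listener): int
    {
        $this->listeners[] = $listener;
        return array_key_last($this->listeners);
    }

    public function unsubscribe(int $id): void
    {
        unset($this->listeners[$id]);
    }

    public function emit(array $event): void
    {
        // foreach iterates over a snapshot of the array,
        // so listeners may unsubscribe while an event is being dispatched.
        foreach ($this->listeners as $listener) {
            $listener($event);
        }
    }

    public function request(string $reqName, string $respName, array $payload, callable $onResponse): void
    {
        $rid = bin2hex(random_bytes(8)); // fresh correlation id per request
        $id = null;

        // 1) Subscribe to responses first, filtered by name AND rid.
        $id = $this->subscribe(function (array $e) use (&$id, $rid, $respName, $onResponse): void {
            if ($e['name'] === $respName && $e['rid'] === $rid) {
                $this->unsubscribe($id); // keep only the first match
                $onResponse($e);
            }
        });

        // 2) Only then emit the request carrying that rid.
        $this->emit(['name' => $reqName, 'rid' => $rid, 'payload' => $payload]);
    }
}

$bus = new ToyBus();

// Responder: copies the rid from the request onto the response.
$bus->subscribe(function (array $e) use ($bus): void {
    if ($e['name'] === 'REQ') {
        $bus->emit(['name' => 'RESP', 'rid' => $e['rid'], 'payload' => ['ok' => true]]);
    }
});

$received = [];
$bus->request('REQ', 'RESP', ['foo' => 'bar'], function (array $resp) use (&$received): void {
    $received[] = $resp['payload'];
});
```

Because the response listener is registered before the request is emitted, even a responder that replies synchronously (as here) cannot slip its answer past the caller.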

API Examples

1) Listen & emit

$bus->on('user.created')->subscribe(fn($e) => audit($e->getMeta(), $e->getPayload()));
$bus->emitName('user.created', ['id' => 42], ['by' => 'admin']);

2) Request/Response with correlation id

// Responder: copies rid from incoming 'REQ' and emits 'RESP'
$bus->on('REQ')->subscribe(function ($e) use ($bus) {
    $bus->emit(
        (new BasicEvent('RESP', ['ok' => true], $e->getMeta()))
            ->setRid($e->getRid())   // correlate
    );
});

// Caller: request() subscribes FIRST, then emits; no race conditions
$bus->request('REQ', 'RESP', ['foo' => 'bar'], ['trace' => 'abc'], 100)
    ->subscribe(
        fn($resp) => var_dump($resp->getPayload()),          // ['ok' => true]
        fn($err)  => error_log($err->getMessage())
    );

3) once() with mapping & timeout

$bus->once('health.ok', fn($e) => $e->getMeta()['node'] ?? 'unknown', 50)
    ->subscribe(
        function ($node) { echo "node=$node\n"; },   // echo is a statement, so it cannot be an arrow-function body
        function ($err) { echo "timeout\n"; }
    );
$bus->emitName('health.ok', [], ['node' => 'api-1']);

4) Backpressure / batching (Rx composition)

$bus->on('order.created')
    ->bufferWithTimeOrCount(500, 100, $bus->scheduler()) // every 0.5s or 100 items
    ->filter(fn($batch) => !empty($batch))
    ->subscribe(fn(array $batch) => persist_batch($batch));

Swoole integration tips

  • HTTP server: in on('request'), emit an event with meta containing a respond callable or the Response object. Downstream subscribers can produce a ResponseEvent.
  • Coroutines per subscriber: use Swoole coroutines in your subscribers if you do IO; Rx operators will orchestrate sequencing.
  • Event loop in CLI: outside a Swoole Server, start/stop the reactor with Swoole\Event::wait() / Event::exit() for timers to fire.
