DEV Community

Shalvah

Originally published at blog.shalvah.me

Building a PHP client for Faktory, Part 2: Converting to code

Now I'll turn what I did in Part 1 into code.

First, I'll set up a new Composer project. My composer.json looks like this (truncated):

{
    "require": {
        "php": ">= 8.1",
        "clue/redis-protocol": "^0.3.1"
    },
    "require-dev": {
        "pestphp/pest": "^1.22"
    },
    "scripts": {
        "test": "pest"
    }
}

clue/redis-protocol is a library I found for parsing and writing RESP (the Redis serialization protocol), which is what Faktory speaks over the wire. Good news, as it means I don't have to implement that myself.

Next, I'll try starting off with a simple test, using Pest:

it('can connect to the Faktory server', function () {
    $client = new TcpClient;
    expect($client->connect())->toBeTrue();
});

Note that I haven't written the code yet—there's no TcpClient class. And I'm not certain this is the interface I'll eventually go with, but it works for now. Now I'll try to get this working:

use Clue\Redis\Protocol\Factory as ProtocolFactory;
use Clue\Redis\Protocol\Parser\ParserInterface;

class TcpClient
{
  protected ParserInterface $responseParser;
  /** @var resource|null */
  protected $connection;
  protected string $hostname = 'tcp://dreamatorium.local';
  protected int $port = 7419;
  protected bool $connected = false;
  protected array $workerInfo;

  public function __construct()
  {
    $factory = new ProtocolFactory();
    // For parsing RESP protocol messages from Faktory
    $this->responseParser = $factory->createResponseParser();
    $this->connection = null;

    $this->workerInfo = [
      "hostname" => gethostname(),
      "pid" => getmypid(),
      "labels" => [],
      "v" => 2
    ];
  }

  public function connect(): bool
  {
    $this->createTcpConnection();
    self::checkOk($this->handshake(), operation: "Handshake");

    return $this->connected = true;
  }

  // Utility function to verify that the server responded with an "OK"
  private static function checkOk(mixed $result, $operation = "Operation")
  {
    if ($result !== "OK") {
      throw new \Exception("$operation failed with response: $result");
    }
    return true;
  }

The idea here is that connect() will create the TCP connection, then try the handshake, and verify the response is "OK". Now, let's implement these methods in detail. For the TCP connection, I'm using PHP's fsockopen() function.

  protected function createTcpConnection()
  {
    $filePointer = fsockopen($this->hostname, $this->port, $errorCode, $errorMessage, timeout: 3);
    if ($filePointer === false) {
      throw new \Exception("Failed to connect to Faktory on {$this->hostname}:{$this->port}: $errorMessage (error code $errorCode)");
    }

    $this->connection = $filePointer;
    stream_set_timeout($this->connection, seconds: 5);
  }

As for the handshake, remember that it's a three-step process (HI, HELLO, OK):

Running netcat session, showing a HI message from Faktory, a HELLO from me and an OK from Faktory

First, we must read the HI message. Since fsockopen() returns a file pointer, we can use it with other file-based functions (such as fwrite() for writing, and fread() and fgets() for reading). I'll create a small wrapper method that we can use to read and parse a response with the Redis protocol parser:

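  protected function readLine()
  {
    $line = fgets($this->connection);
    // fgets() returns false on a timeout or a closed connection
    if ($line === false) return null;

    // The parser returns an array of complete messages (empty if it needs more data)
    $messages = $this->responseParser->pushIncoming($line);
    if (empty($messages)) return null;

    return $messages[0]?->getValueNative();
  }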

Okay, now we can read the HI. It should contain the protocol version, so we can do a defensive check that ensures we're on a supported version.

  protected function readHi()
  {
    $hi = $this->readLine();
    if (empty($hi)) throw new \Exception("Handshake failed");

    $version = json_decode(str_replace("HI ", "", $hi))->v;
    if (intval($version) > 2) echo "Expected Faktory protocol v2 or lower; found $version";
  }
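
The version check can also be pulled out into a small pure function, which makes it testable without a live server. This is only a sketch: parseHiVersion() is a name I'm making up here, and it assumes the greeting has already been unwrapped by the RESP parser (so it looks like HI {"v":2}, with no leading + or trailing line ending).

```php
// Hypothetical helper (not part of the client above): extracts the protocol
// version from an already-parsed greeting such as 'HI {"v":2}'.
function parseHiVersion(string $hi): int
{
    if (!str_starts_with($hi, "HI ")) {
        throw new InvalidArgumentException("Not a HI message: $hi");
    }

    // Everything after "HI " should be a JSON object; throw on malformed JSON
    $payload = json_decode(substr($hi, 3), true, flags: JSON_THROW_ON_ERROR);
    if (!is_array($payload) || !isset($payload["v"])) {
        throw new InvalidArgumentException("HI message has no version: $hi");
    }

    return (int) $payload["v"];
}

var_dump(parseHiVersion('HI {"v":2}')); // int(2)
```

Unlike the str_replace() version, this rejects anything that doesn't start with "HI " instead of trying to decode it anyway.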

Next, the HELLO. As with readLine(), I'll create a helper method, send(), that writes a command and its args via fwrite(), and use that:

  protected function sendHello()
  {
    $workerInfo = json_encode($this->workerInfo, JSON_THROW_ON_ERROR);
    $this->send("HELLO", $workerInfo);
  }

  protected function send($command, ...$args): void
  {
    fwrite(
      $this->connection, $command . " " . join(' ', $args) . "\r\n"
    );
  }

An important detail is that we have to end our messages with carriage-return + newline (\r\n), as the Faktory protocol spec dictates (although it also seems to work with only \n). I wasted an hour debugging because I forgot to include the newline.😬
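
To make the framing easier to check, the string-building part can be split from the fwrite() call. A minimal sketch, assuming a helper named formatCommand() (my own name, not something in the client yet):

```php
// Hypothetical helper: builds the exact bytes for one command line.
function formatCommand(string $command, string ...$args): string
{
    // Join the command and its arguments with single spaces...
    $parts = array_merge([$command], $args);
    // ...and terminate with CRLF, as the protocol spec asks
    return implode(" ", $parts) . "\r\n";
}

$line = formatCommand("HELLO", json_encode(["v" => 2], JSON_THROW_ON_ERROR));
var_dump($line === "HELLO {\"v\":2}\r\n"); // bool(true)
```

One small difference from send() as written: with no arguments, this produces "COMMAND\r\n" rather than "COMMAND \r\n" with a trailing space.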

The last piece in our handshake is to get the response (which will then be checked for an OK):

  public function handshake()
  {
    $this->readHi();
    $this->sendHello();
    return $this->readLine();
  }

And with that, the test passes. ✅

So let's move forward. Next test: PUSH and FETCH.

it('can push to and fetch from the Faktory server', function () {
    $client = new TcpClient;
    $client->connect();

    $job = [
        "jid" => "123861239abnadsa",
        "jobtype" => "SomeJobClass",
        "args" => [1, 2, true, "hello"],
    ];
    expect($client->push($job))->toBeTrue();

    expect($client->fetch(queues: "default"))->toEqual($job);
});

Note that I'm testing pushing and fetching together because I haven't yet found a way to auto-initialize Faktory with existing jobs. Which reminds me: I'm currently testing against my local Faktory, which means my tests won't work in CI. I should either mock the Faktory server (I wouldn't recommend this, as I think it's a brittle approach), or get Faktory running in CI (and maybe also figure out how to start it up with existing state).
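
Either way, the client shouldn't hard-code my local hostname. Here's a rough sketch of reading the address from an environment variable instead. Both the faktoryAddress() helper and the FAKTORY_URL variable name are assumptions on my part, as is the localhost default:

```php
// Hypothetical helper: turns a URL such as "tcp://localhost:7419" into the
// hostname/port pair that fsockopen() expects.
function faktoryAddress(?string $url, string $defaultUrl = "tcp://localhost:7419"): array
{
    $parts = parse_url($url ?: $defaultUrl);
    if ($parts === false || !isset($parts["host"])) {
        throw new InvalidArgumentException("Invalid Faktory URL: $url");
    }

    return [
        // Keep the scheme prefix, since the client passes it to fsockopen() as-is
        "hostname" => ($parts["scheme"] ?? "tcp") . "://" . $parts["host"],
        // Fall back to the port used elsewhere in this post
        "port" => $parts["port"] ?? 7419,
    ];
}

// FAKTORY_URL is an assumed name; getenv() returns false when it is unset
$address = faktoryAddress(getenv("FAKTORY_URL") ?: null);
```

With something like this, the same tests could point at my local server or at whatever hostname a CI job exposes.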

To make this pass, we can reuse our existing building blocks of send() and readLine():

public function push(array $job)
{
  $this->send("PUSH", json_encode($job, JSON_THROW_ON_ERROR));
  return self::checkOk($this->readLine(), operation: "Job push");
}

public function fetch(string ...$queues)
{
  $this->send("FETCH", ...$queues);
  // The first line of the response just contains the length of the next line; skip it
  $this->readLine();
  // Note: json_decode()'s third parameter is $depth, so the flag must be passed by name
  return json_decode($this->readLine(), true, flags: JSON_THROW_ON_ERROR);
}
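
Why two readLine() calls? The FETCH reply is a RESP bulk string: one line with a $ and the payload length, then the payload itself. The sketch below decodes that shape by hand, purely for illustration. decodeBulkString() is a made-up name, and this is not how clue/redis-protocol does it internally.

```php
// Hypothetical, hand-rolled decoder for a single RESP bulk string.
// Returns the payload, or null for RESP's "null bulk string" ($-1).
function decodeBulkString(string $raw): ?string
{
    if ($raw === "" || $raw[0] !== '$') {
        throw new InvalidArgumentException("Not a bulk string");
    }
    $headerEnd = strpos($raw, "\r\n");
    if ($headerEnd === false) {
        throw new InvalidArgumentException("Incomplete bulk string header");
    }

    // First line: "$" followed by the payload length in bytes
    $length = (int) substr($raw, 1, $headerEnd - 1);
    if ($length < 0) {
        return null;
    }

    // Second line: exactly $length bytes of payload
    $payload = substr($raw, $headerEnd + 2, $length);
    if (strlen($payload) !== $length) {
        throw new InvalidArgumentException("Incomplete bulk string payload");
    }
    return $payload;
}

$json = '{"jid":"abc","jobtype":"SomeJobClass","args":[]}';
$raw = '$' . strlen($json) . "\r\n" . $json . "\r\n";
var_dump(decodeBulkString($raw) === $json);   // bool(true)
var_dump(decodeBulkString('$-1' . "\r\n"));   // NULL
```

The null case matters for fetch(): a null reply has no JSON to decode, so it needs handling before json_decode() is called.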

And now, the test passes...sorta:

  Failed asserting that two arrays are equal.

  at C:\Users\shalvah\Projects\faktory-php\tests\tcp_client_test.php:21
     19▕     expect($client->push($job))->toBeTrue();
     20▕
  ➜  21▕     expect($client->fetch(queues: "default"))->toEqual($job);
     22▕ });
     23▕

  --- Expected
  +++ Actual
  @@ @@
       'jobtype' => 'SomeJobClass'
       'args' => Array (...)
  +    'queue' => 'default'
  +    'created_at' => '2023-01-27T01:52:24.7119459Z'
  +    'enqueued_at' => '2023-01-27T21:21:15.3376584Z'
  +    'retry' => 25

  Tests:  1 failed, 1 passed

No biggie. Faktory adds some extra fields to the job when storing it, and returns those to us, so we need to adjust the test a bit to ignore the time fields and expect the others:

$fetched = $client->fetch(queues: "default");
expect($fetched['created_at'])->not->toBeEmpty();
unset($fetched['created_at']);

expect($fetched['enqueued_at'])->not->toBeEmpty();
unset($fetched['enqueued_at']);

expect($fetched)->toEqual(array_merge($job, ['queue' => 'default', 'retry' => 25]));
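
If more tests end up needing this, the unset() calls could move into a small helper. A sketch, with withoutTimestamps() being a hypothetical name of my own:

```php
// Hypothetical test helper: drops server-generated timestamp fields so the
// rest of the job can be compared with toEqual().
function withoutTimestamps(array $job, array $volatile = ["created_at", "enqueued_at"]): array
{
    // array_diff_key() keeps only the entries whose keys are NOT in the second array
    return array_diff_key($job, array_flip($volatile));
}

$fetched = [
    "jid" => "123861239abnadsa",
    "queue" => "default",
    "created_at" => "2023-01-27T01:52:24.7119459Z",
    "enqueued_at" => "2023-01-27T21:21:15.3376584Z",
    "retry" => 25,
];
$comparable = withoutTimestamps($fetched);
// keys left in $comparable: jid, queue, retry
```

Note that this only strips the fields; the not-empty checks on created_at and enqueued_at would still need to run before calling it.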

And everything is good!

Well, that wasn't too hard (I'm lying; it took a bunch of trial and error 😅).

Here are some things I've noted during this task that I'd like to address soon:

  • Logging: I'd like to more easily see what is being sent and received. I was doing this manually with some dump() calls, but it should be a log setting on the client that I can easily turn on and off.
  • Custom exceptions. Throwing just \Exception is never a good idea. Custom exception classes allow users to handle specific errors. But I'll do this later, when I have a larger sample of the kinds of errors we can encounter (thus far, I know we can at least have one for connection failure, and one for not-OK response).
  • Error handling. Even with custom exceptions, there's still room for a mix-up, thanks to PHP's wonky error handling. By default, if fsockopen() fails to establish the connection (e.g. the Faktory server is down), PHP will only emit a warning, return false and continue executing. This is why I had to add a check for false and throw an exception. So, supposing we throw a custom ConnectionFailed exception, this code will work:

    $client = new TcpClient;
    try {
      $client->connect(); // Supposing connection fails
    } catch (ConnectionFailed $e) {
      dump($e::class); // "ConnectionFailed"
    }
    

    However, in many PHP frameworks, a custom error handler is registered which auto-converts warnings to ErrorExceptions (or possibly some other error type), in which case your custom exception will never get thrown, but the framework's exception will!

    set_error_handler(function ($code, $message) {
      throw new ErrorException($message, $code);
    });
    $client = new TcpClient;
    try {
      $client->connect(); // Supposing connection fails
    } catch (ConnectionFailed $e) {
      dump($e::class); // ❌ Never called
    }
    // Script will crash with an ErrorException
    

    Anyway, that's a topic for another time. Until then!
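
For reference, here is one possible way around that mix-up, sketched under some assumptions: ConnectionFailed and openFaktoryConnection() are hypothetical names, and I'm assuming the framework's handler throws ErrorException specifically (a handler that throws something else would need its own catch).

```php
// Hypothetical exception class; the client above does not define it yet.
class ConnectionFailed extends RuntimeException {}

// Sketch: open the socket and always surface failures as ConnectionFailed,
// whether or not a global error handler turns warnings into exceptions.
function openFaktoryConnection(string $hostname, int $port, float $timeout = 3.0)
{
    try {
        $connection = fsockopen($hostname, $port, $errorCode, $errorMessage, $timeout);
    } catch (ErrorException $e) {
        // A registered error handler converted the warning; re-wrap it
        throw new ConnectionFailed(
            "Failed to connect to Faktory on $hostname:$port: {$e->getMessage()}", 0, $e
        );
    }

    if ($connection === false) {
        // No such handler is registered: PHP emitted a warning and returned false
        throw new ConnectionFailed(
            "Failed to connect to Faktory on $hostname:$port: $errorMessage (error code $errorCode)"
        );
    }

    return $connection;
}
```

With this, a catch (ConnectionFailed $e) block is reached in both scenarios, and the original ErrorException (if any) stays available via getPrevious().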
