Now I'll turn what I did in Part 1 into code.
First, I'll set up a new Composer project. My composer.json looks like this (truncated):
{
    "require": {
        "php": ">= 8.1",
        "clue/redis-protocol": "^0.3.1"
    },
    "require-dev": {
        "pestphp/pest": "^1.22"
    },
    "scripts": {
        "test": "pest"
    }
}
clue/redis-protocol is a library I found for parsing and writing messages in RESP (the Redis serialization protocol). Good news, as it means I don't have to implement that myself.
Next, I'll try starting off with a simple test, using Pest:
it('can connect to the Faktory server', function () {
    $client = new TcpClient;
    expect($client->connect())->toBeTrue();
});
Note that I haven't written the code yet; there's no TcpClient class. And I'm not certain this is the interface I'll eventually go with, but it works for now. Now I'll try to get this working:
use Clue\Redis\Protocol\Factory as ProtocolFactory;
use Clue\Redis\Protocol\Parser\ParserInterface;

class TcpClient
{
    protected ParserInterface $responseParser;
    /** @var resource|null */
    protected $connection;
    protected string $hostname = 'tcp://dreamatorium.local';
    protected int $port = 7419;
    protected bool $connected = false;
    protected array $workerInfo;

    public function __construct()
    {
        $factory = new ProtocolFactory();
        // For parsing RESP protocol messages from Faktory
        $this->responseParser = $factory->createResponseParser();
        $this->connection = null;
        $this->workerInfo = [
            "hostname" => gethostname(),
            "pid" => getmypid(),
            "labels" => [],
            "v" => 2
        ];
    }

    public function connect(): bool
    {
        $this->createTcpConnection();
        self::checkOk($this->handshake(), operation: "Handshake");
        return $this->connected = true;
    }

    // Utility function to verify that the server responded with an "OK"
    private static function checkOk(mixed $result, $operation = "Operation")
    {
        if ($result !== "OK") {
            throw new \Exception("$operation failed with response: $result");
        }
        return true;
    }
The idea here is that connect() will create the TCP connection, then try the handshake, and verify the response is "OK". Now, let's implement these methods in detail. For the TCP connection, I'm using PHP's fsockopen() function.
protected function createTcpConnection()
{
    $filePointer = fsockopen($this->hostname, $this->port, $errorCode, $errorMessage, timeout: 3);
    if ($filePointer === false) {
        throw new \Exception("Failed to connect to Faktory on {$this->hostname}:{$this->port}: $errorMessage (error code $errorCode)");
    }
    $this->connection = $filePointer;
    stream_set_timeout($this->connection, seconds: 5);
}
As for the handshake, remember that it's a three-step process (HI, HELLO, OK):
First, we must read the HI message. Since fsockopen() returns a file pointer, we can use it with other file-based functions, such as fwrite() for writing, and fread() and fgets() for reading. I'll create a small wrapper method that we can use to read and parse a response with the Redis protocol parser:
protected function readLine()
{
    $line = fgets($this->connection);
    $messages = $this->responseParser->pushIncoming($line);
    if (empty($messages)) return null;
    return $messages[0]?->getValueNative();
}
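For intuition about what the parser does with each line it receives, here's a rough hand-rolled sketch of decoding a single RESP reply. To be clear, this is not how clue/redis-protocol is implemented, and decodeRespFrame() is a hypothetical name. It only handles simple strings, errors, integers and bulk strings, and it assumes the whole frame is already in memory.

```php
<?php

/**
 * Decode one complete RESP frame held in a string.
 * Hypothetical helper for illustration; not part of clue/redis-protocol.
 */
function decodeRespFrame(string $frame): string|int|null
{
    $type = $frame[0] ?? '';
    // Everything between the type byte and the first CRLF is the header
    $headerEnd = strpos($frame, "\r\n");
    if ($type === '' || $headerEnd === false) {
        throw new InvalidArgumentException("Incomplete RESP frame");
    }
    $header = substr($frame, 1, $headerEnd - 1);

    switch ($type) {
        case '+': // Simple string, e.g. "+OK\r\n"
            return $header;
        case '-': // Error reply
            throw new RuntimeException($header);
        case ':': // Integer
            return (int) $header;
        case '$': // Bulk string: a length header, then the payload
            $length = (int) $header;
            if ($length < 0) {
                return null; // A length of -1 is the null bulk string
            }
            return substr($frame, $headerEnd + 2, $length);
        default:
            throw new InvalidArgumentException("Unknown RESP type: $type");
    }
}

echo decodeRespFrame("+OK\r\n"), PHP_EOL;           // OK
echo decodeRespFrame("\$5\r\nhello\r\n"), PHP_EOL;  // hello
```

Note how a bulk string spans two lines (length, then payload). That is why a line-by-line reader gets nothing back from the parser until the second line arrives.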
Okay, now we can read the HI. It should contain the protocol version, so we can do a defensive check that ensures we're on a supported version.
protected function readHi()
{
    $hi = $this->readLine();
    if (empty($hi)) throw new \Exception("Handshake failed");
    $version = json_decode(str_replace("HI ", "", $hi))->v;
    if (intval($version) > 2) echo "Expected Faktory protocol v2 or lower; found $version";
}
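The version check could also be pulled out into a small standalone function, which makes it easy to test without a server. This is just a sketch: parseHiVersion() is a hypothetical helper, and I'm assuming the HI payload looks like 'HI {"v":2}'. Unlike the str_replace() call above, it only strips "HI " when it is a prefix, and it throws on malformed JSON.

```php
<?php

/**
 * Extract the protocol version from a HI payload such as 'HI {"v":2}'.
 * Hypothetical helper; not part of the TcpClient class above.
 */
function parseHiVersion(string $hi): int
{
    if (!str_starts_with($hi, 'HI ')) {
        throw new UnexpectedValueException("Expected a HI message, got: $hi");
    }
    // Drop only the leading "HI " prefix, then decode the JSON that follows
    $payload = json_decode(substr($hi, 3), true, flags: JSON_THROW_ON_ERROR);
    if (!is_array($payload) || !isset($payload['v'])) {
        throw new UnexpectedValueException("HI message has no version field");
    }
    return (int) $payload['v'];
}

echo parseHiVersion('HI {"v":2}'), PHP_EOL; // 2
```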
Next, the HELLO. As with readLine(), I'll create a method to send a command and args via fwrite(), and use that:
protected function sendHello()
{
    $workerInfo = json_encode($this->workerInfo, JSON_THROW_ON_ERROR);
    $this->send("HELLO", $workerInfo);
}

protected function send($command, ...$args): void
{
    fwrite(
        $this->connection, $command . " " . join(' ', $args) . "\r\n"
    );
}
An important detail is that we have to end our messages with carriage-return + newline (\r\n), as the Faktory protocol spec dictates (although it also seems to work with only \n). I wasted an hour debugging because I forgot to include the newline. 😬
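To make the framing easy to check without a socket, the string-building part can be separated from the write. A minimal sketch, where buildCommand() is a hypothetical helper that produces the bytes send() would write (with one small difference: it doesn't leave a trailing space when there are no arguments):

```php
<?php

/**
 * Build the bytes for one command line, terminated by CRLF.
 * Hypothetical helper mirroring what send() writes to the socket.
 */
function buildCommand(string $command, string ...$args): string
{
    // array_values() drops any string keys that named arguments would introduce
    $parts = array_merge([$command], array_values($args));
    return implode(' ', $parts) . "\r\n";
}

// Example worker info; the hostname and pid here are made-up values
$workerInfo = json_encode(
    ['hostname' => 'example-host', 'pid' => 1234, 'labels' => [], 'v' => 2],
    JSON_THROW_ON_ERROR
);
$hello = buildCommand('HELLO', $workerInfo);

// Make the line terminator visible when printing
echo addcslashes($hello, "\r\n"), PHP_EOL;
```

With this split, forgetting the terminator becomes something a unit test can catch, instead of an hour of staring at a hanging connection.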
The last piece in our handshake is to get the response (which will then be checked for an OK):
public function handshake()
{
    $this->readHi();
    $this->sendHello();
    return $this->readLine();
}
And with that, the test passes. ✅
So let's move forward. Next test: PUSH and FETCH.
it('can push to and fetch from the Faktory server', function () {
    $client = new TcpClient;
    $client->connect();
    $job = [
        "jid" => "123861239abnadsa",
        "jobtype" => "SomeJobClass",
        "args" => [1, 2, true, "hello"],
    ];
    expect($client->push($job))->toBeTrue();
    expect($client->fetch(queues: "default"))->toEqual($job);
});
Note that I'm doing pushing and fetching as one test because I haven't yet found a way to auto-initialize Faktory with existing jobs. Which reminds me... I'm currently testing against my local Faktory, which means my tests won't work in CI. I should either mock the Faktory server (I wouldn't recommend this, as I think it's a brittle approach), or get Faktory running in CI (and maybe also figure out how to start it up with existing state).
To make this pass, we can reuse our existing building blocks of send() and readLine():
public function push(array $job)
{
    $this->send("PUSH", json_encode($job, JSON_THROW_ON_ERROR));
    return self::checkOk($this->readLine(), operation: "Job push");
}

public function fetch(string ...$queues)
{
    $this->send("FETCH", ...$queues);
    // The first line of the response just contains the length of the next line; skip it
    $this->readLine();
    // json_decode()'s third positional parameter is $depth, so the flag must be passed by name
    return json_decode($this->readLine(), true, flags: JSON_THROW_ON_ERROR);
}
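To see why fetch() needs two reads, here's a small sketch that fakes the bytes of a FETCH reply with an in-memory stream, so no Faktory server is needed. I'm assuming the reply is a RESP bulk string (a length line, then the JSON payload) and that the payload contains no raw line breaks. The job data is the one from the test above.

```php
<?php

// Simulate the reply bytes: "$<length>\r\n<json>\r\n"
$json = '{"jid":"123861239abnadsa","jobtype":"SomeJobClass","args":[1,2,true,"hello"]}';
$stream = fopen('php://memory', 'r+');
fwrite($stream, '$' . strlen($json) . "\r\n" . $json . "\r\n");
rewind($stream);

$header = fgets($stream); // First line: only the "$<length>" header
$body = fgets($stream);   // Second line: the JSON payload plus the trailing CRLF
fclose($stream);

// The header tells us how many bytes of the second line belong to the payload
$length = (int) rtrim(substr($header, 1));
$job = json_decode(substr($body, 0, $length), true, flags: JSON_THROW_ON_ERROR);

echo $job['jobtype'], PHP_EOL; // SomeJobClass
```

In the real client the Redis protocol parser does this bookkeeping; the sketch only shows where the two lines come from.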
And now, the test passes...sorta:
Failed asserting that two arrays are equal.
at C:\Users\shalvah\Projects\faktory-php\tests\tcp_client_test.php:21
19▕ expect($client->push($job))->toBeTrue();
20▕
➜ 21▕ expect($client->fetch(queues: "default"))->toEqual($job);
22▕ });
23▕
--- Expected
+++ Actual
@@ @@
'jobtype' => 'SomeJobClass'
'args' => Array (...)
+ 'queue' => 'default'
+ 'created_at' => '2023-01-27T01:52:24.7119459Z'
+ 'enqueued_at' => '2023-01-27T21:21:15.3376584Z'
+ 'retry' => 25
Tests: 1 failed, 1 passed
No biggie. Faktory adds some extra fields to the job when storing it, and returns those to us, so we need to adjust the test a bit to ignore the time fields and expect the others:
$fetched = $client->fetch(queues: "default");

expect($fetched['created_at'])->not->toBeEmpty();
unset($fetched['created_at']);

expect($fetched['enqueued_at'])->not->toBeEmpty();
unset($fetched['enqueued_at']);

expect($fetched)->toEqual(array_merge($job, ['queue' => 'default', 'retry' => 25]));
And everything is good!
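If more tests end up needing the same clean-up, the unset() calls could move into a tiny helper. A sketch only: withoutTimestamps() is a hypothetical name, and the sample data is copied from the failing test output above.

```php
<?php

/**
 * Return the job without the server-generated timestamp fields.
 * Hypothetical test helper; the default field names come from the test output.
 */
function withoutTimestamps(array $job, array $volatile = ['created_at', 'enqueued_at']): array
{
    // array_diff_key() keeps every entry whose key is not in the volatile list
    return array_diff_key($job, array_flip($volatile));
}

$fetched = [
    'jid' => '123861239abnadsa',
    'jobtype' => 'SomeJobClass',
    'args' => [1, 2, true, 'hello'],
    'queue' => 'default',
    'created_at' => '2023-01-27T01:52:24.7119459Z',
    'enqueued_at' => '2023-01-27T21:21:15.3376584Z',
    'retry' => 25,
];

$stable = withoutTimestamps($fetched);
echo implode(', ', array_keys($stable)), PHP_EOL; // jid, jobtype, args, queue, retry
```

The test would still assert separately that the timestamp fields are present before dropping them.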
Well, that wasn't too hard (I'm lying; it took a bunch of trial and error 😅).
Here are some things I've noted during this task that I'd like to address soon:
- Logging: I'd like an easier way to see what is being sent and received. I was doing this manually with some dump() calls, but it should be a log setting on the client that I can easily turn on and off.
- Custom exceptions: Throwing just \Exception is never a good idea. Custom exception classes allow users to handle specific errors. But I'll do this later, when I have a larger sample of the kinds of errors we can encounter (thus far, I know we can have at least one for connection failure, and one for a not-OK response).
- Error handling: Even with custom exceptions, there's still room for a mix-up, thanks to PHP's wonky error handling. By default, if fsockopen fails to establish the connection (e.g. the Faktory server is down), PHP will only log a warning, return false and continue executing. This is why I had to add a check for false and throw an exception. So, supposing we throw a custom ConnectionFailed class, this code will work:
$client = new TcpClient;
try {
    $client->connect(); // Supposing connection fails
} catch (ConnectionFailed $e) {
    dump($e::class); // "ConnectionFailed"
}
However, in many PHP frameworks, a custom error handler is registered which auto-converts warnings to ErrorExceptions (or possibly some other error type), in which case your custom exception will never get thrown, but the framework's exception will!
set_error_handler(function ($code, $message) {
    throw new ErrorException($message, $code);
});
$client = new TcpClient;
try {
    $client->connect(); // Supposing connection fails
} catch (ConnectionFailed $e) {
    dump($e::class); // ❌ Never called
}
// Script will crash with an ErrorException
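One possible direction, sketched here only as a starting point: wrap the fsockopen() call so that both failure paths end up as the same exception. FaktoryException and openConnection() are hypothetical names, and this only covers handlers that throw ErrorException; a framework that throws some other type would need its own catch.

```php
<?php

// Hypothetical exception classes; the names are illustrative only
class FaktoryException extends RuntimeException {}
class ConnectionFailed extends FaktoryException {}

/**
 * Open a TCP connection, always reporting failure as ConnectionFailed.
 *
 * @return resource
 */
function openConnection(string $hostname, int $port, float $timeout = 3.0)
{
    try {
        // "@" silences the default warning output, but a custom error handler still runs
        $connection = @fsockopen($hostname, $port, $errorCode, $errorMessage, $timeout);
    } catch (ErrorException $e) {
        // A framework-style handler turned the warning into an exception
        throw new ConnectionFailed("Failed to connect to $hostname:$port: {$e->getMessage()}", 0, $e);
    }

    if ($connection === false) {
        // No throwing handler registered: PHP just returned false
        throw new ConnectionFailed("Failed to connect to $hostname:$port: $errorMessage (error code $errorCode)");
    }

    return $connection;
}

// Same handler as in the example above
set_error_handler(function ($code, $message) {
    throw new ErrorException($message, $code);
});

try {
    openConnection('tcp://127.0.0.1', 1, 0.5); // Assumes nothing is listening on port 1
} catch (ConnectionFailed $e) {
    echo $e::class, PHP_EOL; // ConnectionFailed
}
```

Keeping the original ErrorException as the previous exception means the framework's details aren't lost for whoever is debugging.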
Anyway, that's a topic for another time. Until then!