Implement redis transport
This commit is contained in:
parent
fba11b4dc3
commit
7162d2ec1d
@ -0,0 +1,54 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
|
||||
|
||||
/**
|
||||
* @requires extension redis
|
||||
*/
|
||||
class ConnectionTest extends TestCase
|
||||
{
|
||||
/**
|
||||
* @expectedException \InvalidArgumentException
|
||||
* @expectedExceptionMessage The given Redis DSN "redis://" is invalid.
|
||||
*/
|
||||
public function testItCannotBeConstructedWithAWrongDsn()
|
||||
{
|
||||
Connection::fromDsn('redis://');
|
||||
}
|
||||
|
||||
public function testItGetsParametersFromTheDsn()
|
||||
{
|
||||
$this->assertEquals(
|
||||
new Connection('queue', array(
|
||||
'host' => 'localhost',
|
||||
'port' => 6379,
|
||||
)),
|
||||
Connection::fromDsn('redis://localhost/queue')
|
||||
);
|
||||
}
|
||||
|
||||
public function testOverrideOptionsViaQueryParameters()
|
||||
{
|
||||
$this->assertEquals(
|
||||
new Connection('queue', array(
|
||||
'host' => '127.0.0.1',
|
||||
'port' => 6379,
|
||||
), array(
|
||||
'processing_ttl' => '8000',
|
||||
)),
|
||||
Connection::fromDsn('redis://127.0.0.1:6379/queue?processing_ttl=8000')
|
||||
);
|
||||
}
|
||||
}
|
@ -0,0 +1,43 @@
|
||||
<?php
|
||||
|
||||
$componentRoot = $_SERVER['COMPONENT_ROOT'];
|
||||
|
||||
if (!is_file($autoload = $componentRoot.'/vendor/autoload.php')) {
|
||||
$autoload = $componentRoot.'/../../../../vendor/autoload.php';
|
||||
}
|
||||
|
||||
if (!file_exists($autoload)) {
|
||||
exit('You should run "composer install --dev" in the component before running this script.');
|
||||
}
|
||||
|
||||
require_once $autoload;
|
||||
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
|
||||
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||
use Symfony\Component\Messenger\Worker;
|
||||
use Symfony\Component\Serializer as SerializerComponent;
|
||||
use Symfony\Component\Serializer\Encoder\JsonEncoder;
|
||||
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
|
||||
|
||||
$serializer = new Serializer(
|
||||
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
|
||||
);
|
||||
|
||||
$connection = Connection::fromDsn(getenv('DSN'));
|
||||
$receiver = new RedisReceiver($connection, $serializer);
|
||||
|
||||
$worker = new Worker($receiver, new class() implements MessageBusInterface {
|
||||
public function dispatch($envelope)
|
||||
{
|
||||
echo 'Get envelope with message: '.get_class($envelope->getMessage())."\n";
|
||||
echo sprintf("with items: %s\n", json_encode(array_keys($envelope->all()), JSON_PRETTY_PRINT));
|
||||
|
||||
sleep(30);
|
||||
echo "Done.\n";
|
||||
}
|
||||
});
|
||||
|
||||
echo "Receiving messages...\n";
|
||||
$worker->run();
|
@ -0,0 +1,146 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
|
||||
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
|
||||
use Symfony\Component\Messenger\Transport\RedisExt\RedisSender;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||
use Symfony\Component\Process\PhpProcess;
|
||||
use Symfony\Component\Process\Process;
|
||||
use Symfony\Component\Serializer as SerializerComponent;
|
||||
use Symfony\Component\Serializer\Encoder\JsonEncoder;
|
||||
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
|
||||
|
||||
/**
|
||||
* @requires extension redis
|
||||
*/
|
||||
class RedisExtIntegrationTest extends TestCase
|
||||
{
|
||||
protected function setUp()
|
||||
{
|
||||
parent::setUp();
|
||||
|
||||
if (!getenv('MESSENGER_REDIS_DSN')) {
|
||||
$this->markTestSkipped('The "MESSENGER_REDIS_DSN" environment variable is required.');
|
||||
}
|
||||
}
|
||||
|
||||
public function testItSendsAndReceivesMessages()
|
||||
{
|
||||
$serializer = new Serializer(
|
||||
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
|
||||
);
|
||||
|
||||
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));
|
||||
|
||||
$sender = new RedisSender($connection, $serializer);
|
||||
$receiver = new RedisReceiver($connection, $serializer);
|
||||
|
||||
$sender->send($first = Envelope::wrap(new DummyMessage('First')));
|
||||
$sender->send($second = Envelope::wrap(new DummyMessage('Second')));
|
||||
|
||||
$receivedMessages = 0;
|
||||
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
|
||||
$this->assertEquals(0 == $receivedMessages ? $first : $second, $envelope);
|
||||
|
||||
if (2 === ++$receivedMessages) {
|
||||
$receiver->stop();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public function testItReceivesSignals()
|
||||
{
|
||||
$serializer = new Serializer(
|
||||
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
|
||||
);
|
||||
|
||||
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));
|
||||
|
||||
$sender = new RedisSender($connection, $serializer);
|
||||
$sender->send(Envelope::wrap(new DummyMessage('Hello')));
|
||||
|
||||
$amqpReadTimeout = 30;
|
||||
$dsn = getenv('MESSENGER_REDIS_DSN').'?read_timeout='.$amqpReadTimeout;
|
||||
$process = new PhpProcess(file_get_contents(__DIR__.'/Fixtures/long_receiver.php'), null, array(
|
||||
'COMPONENT_ROOT' => __DIR__.'/../../../',
|
||||
'DSN' => $dsn,
|
||||
));
|
||||
|
||||
$process->start();
|
||||
|
||||
$this->waitForOutput($process, $expectedOutput = "Receiving messages...\n");
|
||||
|
||||
$signalTime = microtime(true);
|
||||
$timedOutTime = time() + 10;
|
||||
|
||||
$process->signal(15);
|
||||
|
||||
while ($process->isRunning() && time() < $timedOutTime) {
|
||||
usleep(100 * 1000); // 100ms
|
||||
}
|
||||
|
||||
$this->assertFalse($process->isRunning());
|
||||
$this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);
|
||||
$this->assertSame($expectedOutput.<<<'TXT'
|
||||
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
|
||||
with items: [
|
||||
"Symfony\\Component\\Messenger\\Asynchronous\\Transport\\ReceivedMessage"
|
||||
]
|
||||
Done.
|
||||
|
||||
TXT
|
||||
, $process->getOutput());
|
||||
}
|
||||
|
||||
/**
|
||||
* @runInSeparateProcess
|
||||
*/
|
||||
public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
|
||||
{
|
||||
$serializer = new Serializer(
|
||||
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
|
||||
);
|
||||
|
||||
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), array('blocking_timeout' => '1'));
|
||||
|
||||
$receiver = new RedisReceiver($connection, $serializer);
|
||||
|
||||
$receivedMessages = 0;
|
||||
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
|
||||
$this->assertNull($envelope);
|
||||
|
||||
if (2 === ++$receivedMessages) {
|
||||
$receiver->stop();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
|
||||
{
|
||||
$timedOutTime = time() + $timeoutInSeconds;
|
||||
|
||||
while (time() < $timedOutTime) {
|
||||
if (0 === strpos($process->getOutput(), $output)) {
|
||||
return;
|
||||
}
|
||||
|
||||
usleep(100 * 1000); // 100ms
|
||||
}
|
||||
|
||||
throw new \RuntimeException('Expected output never arrived. Got "'.$process->getOutput().'" instead.');
|
||||
}
|
||||
}
|
@ -0,0 +1,118 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
|
||||
use Symfony\Component\Messenger\Transport\RedisExt\Exception\RejectMessageExceptionInterface;
|
||||
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||
use Symfony\Component\Serializer as SerializerComponent;
|
||||
use Symfony\Component\Serializer\Encoder\JsonEncoder;
|
||||
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
|
||||
|
||||
/**
|
||||
* @requires extension redis
|
||||
*/
|
||||
class RedisReceiverTest extends TestCase
|
||||
{
|
||||
public function testItSendTheDecodedMessageToTheHandlerAndAcknowledgeIt()
|
||||
{
|
||||
$serializer = new Serializer(
|
||||
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
|
||||
);
|
||||
|
||||
$envelope = Envelope::wrap(new DummyMessage('Hi'));
|
||||
$encoded = $serializer->encode($envelope);
|
||||
|
||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||
$connection->method('waitAndGet')->willReturn($encoded);
|
||||
|
||||
$connection->expects($this->once())->method('ack')->with($encoded);
|
||||
|
||||
$receiver = new RedisReceiver($connection, $serializer);
|
||||
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
|
||||
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
|
||||
$receiver->stop();
|
||||
});
|
||||
}
|
||||
|
||||
public function testItSendNoMessageToTheHandler()
|
||||
{
|
||||
$serializer = new Serializer(
|
||||
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
|
||||
);
|
||||
|
||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||
$connection->method('waitAndGet')->willReturn(null);
|
||||
|
||||
$receiver = new RedisReceiver($connection, $serializer);
|
||||
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
|
||||
$this->assertNull($envelope);
|
||||
$receiver->stop();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @expectedException \Symfony\Component\Messenger\Tests\Transport\RedisExt\InterruptException
|
||||
*/
|
||||
public function testItNonAcknowledgeTheMessageIfAnExceptionHappened()
|
||||
{
|
||||
$serializer = new Serializer(
|
||||
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
|
||||
);
|
||||
|
||||
$envelope = Envelope::wrap(new DummyMessage('Hi'));
|
||||
$encoded = $serializer->encode($envelope);
|
||||
|
||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||
$connection->method('waitAndGet')->willReturn($encoded);
|
||||
$connection->expects($this->once())->method('requeue')->with($encoded);
|
||||
|
||||
$receiver = new RedisReceiver($connection, $serializer);
|
||||
$receiver->receive(function () {
|
||||
throw new InterruptException('Well...');
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @expectedException \Symfony\Component\Messenger\Tests\Transport\RedisExt\WillNeverWorkException
|
||||
*/
|
||||
public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionInterface()
|
||||
{
|
||||
$serializer = new Serializer(
|
||||
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
|
||||
);
|
||||
|
||||
$envelope = Envelope::wrap(new DummyMessage('Hi'));
|
||||
$encoded = $serializer->encode($envelope);
|
||||
|
||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||
$connection->method('waitAndGet')->willReturn($encoded);
|
||||
$connection->expects($this->once())->method('reject')->with($encoded);
|
||||
|
||||
$receiver = new RedisReceiver($connection, $serializer);
|
||||
$receiver->receive(function () {
|
||||
throw new WillNeverWorkException('Well...');
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
class InterruptException extends \Exception
|
||||
{
|
||||
}
|
||||
|
||||
class WillNeverWorkException extends \Exception implements RejectMessageExceptionInterface
|
||||
{
|
||||
}
|
@ -0,0 +1,40 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
|
||||
use Symfony\Component\Messenger\Transport\RedisExt\RedisSender;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
|
||||
/**
|
||||
* @requires extension redis
|
||||
*/
|
||||
class RedisSenderTest extends TestCase
|
||||
{
|
||||
public function testItSendsTheEncodedMessage()
|
||||
{
|
||||
$envelope = Envelope::wrap(new DummyMessage('Oy'));
|
||||
$encoded = array('body' => '...', 'headers' => array('type' => DummyMessage::class));
|
||||
|
||||
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
|
||||
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
|
||||
|
||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||
$connection->expects($this->once())->method('add')->with($encoded);
|
||||
|
||||
$sender = new RedisSender($connection, $serializer);
|
||||
$sender->send($envelope);
|
||||
}
|
||||
}
|
@ -0,0 +1,43 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
|
||||
use Symfony\Component\Messenger\Transport\RedisExt\RedisTransport;
|
||||
use Symfony\Component\Messenger\Transport\RedisExt\RedisTransportFactory;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
|
||||
class RedisTransportFactoryTest extends TestCase
|
||||
{
|
||||
public function testSupportsOnlyRedisTransports()
|
||||
{
|
||||
$factory = new RedisTransportFactory(
|
||||
$this->getMockBuilder(SerializerInterface::class)->getMock()
|
||||
);
|
||||
|
||||
$this->assertTrue($factory->supports('redis://localhost', array()));
|
||||
$this->assertFalse($factory->supports('sqs://localhost', array()));
|
||||
$this->assertFalse($factory->supports('invalid-dsn', array()));
|
||||
}
|
||||
|
||||
public function testItCreatesTheTransport()
|
||||
{
|
||||
$factory = new RedisTransportFactory(
|
||||
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock()
|
||||
);
|
||||
|
||||
$expectedTransport = new RedisTransport(Connection::fromDsn('redis://localhost', array('foo' => 'bar'), true), $serializer);
|
||||
|
||||
$this->assertEquals($expectedTransport, $factory->createTransport('redis://localhost', array('foo' => 'bar')));
|
||||
}
|
||||
}
|
@ -0,0 +1,61 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
|
||||
use Symfony\Component\Messenger\Transport\RedisExt\RedisTransport;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
|
||||
/**
|
||||
* @requires extension redis
|
||||
*/
|
||||
class RedisTransportTest extends TestCase
|
||||
{
|
||||
public function testItIsATransport()
|
||||
{
|
||||
$transport = $this->getTransport();
|
||||
|
||||
$this->assertInstanceOf(TransportInterface::class, $transport);
|
||||
}
|
||||
|
||||
public function testReceivesMessages()
|
||||
{
|
||||
$transport = $this->getTransport(
|
||||
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(),
|
||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock()
|
||||
);
|
||||
|
||||
$decodedMessage = new DummyMessage('Decoded.');
|
||||
$encodedMessage = array('body' => 'body', 'headers' => array('my' => 'header'));
|
||||
|
||||
$serializer->method('decode')->with($encodedMessage)->willReturn(Envelope::wrap($decodedMessage));
|
||||
$connection->method('waitAndGet')->willReturn($encodedMessage);
|
||||
|
||||
$transport->receive(function (Envelope $envelope) use ($transport, $decodedMessage) {
|
||||
$this->assertSame($decodedMessage, $envelope->getMessage());
|
||||
|
||||
$transport->stop();
|
||||
});
|
||||
}
|
||||
|
||||
private function getTransport(SerializerInterface $serializer = null, Connection $connection = null)
|
||||
{
|
||||
$serializer = $serializer ?: $this->getMockBuilder(SerializerInterface::class)->getMock();
|
||||
$connection = $connection ?: $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||
|
||||
return new RedisTransport($connection, $serializer);
|
||||
}
|
||||
}
|
@ -0,0 +1,156 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Transport\RedisExt;
|
||||
|
||||
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||
|
||||
/**
|
||||
* @author Antoine Bluchet <soyuka@gmail.com>
|
||||
*/
|
||||
class Connection
|
||||
{
|
||||
const PROCESSING_QUEUE_SUFFIX = '_processing';
|
||||
const DEFAULT_CONNECTION_CREDENTIALS = array('host' => '127.0.0.1', 'port' => 6379);
|
||||
const DEFAULT_REDIS_OPTIONS = array('serializer' => \Redis::SERIALIZER_PHP, 'processing_ttl' => 10000, 'blocking_timeout' => 1000);
|
||||
|
||||
/**
|
||||
* @var \Redis
|
||||
*/
|
||||
private $connection;
|
||||
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
private $queue;
|
||||
|
||||
public function __construct(string $queue, array $connectionCredentials = self::DEFAULT_CONNECTION_CREDENTIALS, array $redisOptions = self::DEFAULT_REDIS_OPTIONS)
|
||||
{
|
||||
$this->connection = new \Redis();
|
||||
$this->connection->connect($connectionCredentials['host'] ?? '127.0.0.1', $connectionCredentials['port'] ?? 6379);
|
||||
$this->connection->setOption(\Redis::OPT_SERIALIZER, $redisOptions['serializer'] ?? \Redis::SERIALIZER_PHP);
|
||||
// We force this because we rely on the fact that redis doesn't timeout with bRPopLPush
|
||||
$this->connection->setOption(\Redis::OPT_READ_TIMEOUT, -1);
|
||||
$this->queue = $queue;
|
||||
$this->processingTtl = $redisOptions['processing_ttl'] ?? self::DEFAULT_REDIS_OPTIONS['processing_ttl'];
|
||||
$this->blockingTimeout = $redisOptions['blocking_timeout'] ?? self::DEFAULT_REDIS_OPTIONS['blocking_timeout'];
|
||||
}
|
||||
|
||||
public static function fromDsn(string $dsn, array $redisOptions = self::DEFAULT_REDIS_OPTIONS): self
|
||||
{
|
||||
if (false === $parsedUrl = parse_url($dsn)) {
|
||||
throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
|
||||
}
|
||||
|
||||
$queue = isset($parsedUrl['path']) ? trim($parsedUrl['path'], '/') : $redisOptions['queue'] ?? 'messages';
|
||||
$connectionCredentials = array(
|
||||
'host' => $parsedUrl['host'] ?? '127.0.0.1',
|
||||
'port' => $parsedUrl['port'] ?? 6379,
|
||||
);
|
||||
|
||||
if (isset($parsedUrl['query'])) {
|
||||
parse_str($parsedUrl['query'], $parsedQuery);
|
||||
$redisOptions = array_replace_recursive($redisOptions, $parsedQuery);
|
||||
}
|
||||
|
||||
return new self($queue, $connectionCredentials, $redisOptions);
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes last element (tail) of the list and add it to the processing queue (head - blocking)
|
||||
* Also sets a key with TTL that will be checked by the `doCheck` method.
|
||||
*/
|
||||
public function waitAndGet(): ?array
|
||||
{
|
||||
$this->doCheck();
|
||||
$value = $this->connection->bRPopLPush($this->queue, $this->queue.self::PROCESSING_QUEUE_SUFFIX, $this->blockingTimeout);
|
||||
|
||||
// false in case of timeout
|
||||
if (false === $value) {
|
||||
return null;
|
||||
}
|
||||
|
||||
$key = md5($value['body']);
|
||||
$this->connection->set($key, 1, array('px' => $this->processingTtl));
|
||||
|
||||
return $value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Acknowledge the message:
|
||||
* 1. Remove the ttl key
|
||||
* 2. LREM the message from the processing list.
|
||||
*/
|
||||
public function ack($message)
|
||||
{
|
||||
$key = md5($message['body']);
|
||||
$processingQueue = $this->queue.self::PROCESSING_QUEUE_SUFFIX;
|
||||
$this->connection->multi()
|
||||
->lRem($processingQueue, $message)
|
||||
->del($key)
|
||||
->exec();
|
||||
}
|
||||
|
||||
/**
|
||||
* Reject the message: we acknowledge it, means we remove it form the queues.
|
||||
*
|
||||
* @TODO: log something?
|
||||
*/
|
||||
public function reject($message)
|
||||
{
|
||||
$this->ack($message);
|
||||
}
|
||||
|
||||
/**
|
||||
* Requeue - add it back to the queue
|
||||
* All we have to do is to make our key expire and let the `doCheck` system manage it.
|
||||
*/
|
||||
public function requeue($message)
|
||||
{
|
||||
$key = md5($message['body']);
|
||||
$this->connection->expire($key, -1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add item at the tail of list.
|
||||
*/
|
||||
public function add($message)
|
||||
{
|
||||
$this->connection->lpush($this->queue, $message);
|
||||
}
|
||||
|
||||
/**
|
||||
* The check:
|
||||
* 1. Get the processing queue items
|
||||
* 2. Check if the TTL is over
|
||||
* 3. If it is, rpush back the message to the origin queue.
|
||||
*/
|
||||
private function doCheck()
|
||||
{
|
||||
$processingQueue = $this->queue.self::PROCESSING_QUEUE_SUFFIX;
|
||||
$pending = $this->connection->lRange($processingQueue, 0, -1);
|
||||
|
||||
foreach ($pending as $temp) {
|
||||
$key = md5($temp['body']);
|
||||
|
||||
if ($this->connection->ttl($key) > 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$this->connection
|
||||
->multi()
|
||||
->del($key)
|
||||
->lRem($processingQueue, $temp, 1)
|
||||
->rPush($this->queue, $temp)
|
||||
->exec();
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,25 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Transport\RedisExt\Exception;
|
||||
|
||||
/**
|
||||
* If something goes wrong while consuming and handling a message from the Redis broker, there are two choices: rejecting
|
||||
* or re-queuing the message.
|
||||
*
|
||||
* If the exception that is thrown by the bus while dispatching the message implements this interface, the message will
|
||||
* be rejected. Otherwise, it will be re-queued.
|
||||
*
|
||||
* @author Antoine Bluchet <soyuka@gmail.com>
|
||||
*/
|
||||
interface RejectMessageExceptionInterface extends \Throwable
|
||||
{
|
||||
}
|
@ -0,0 +1,71 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Transport\RedisExt;
|
||||
|
||||
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
||||
use Symfony\Component\Messenger\Transport\RedisExt\Exception\RejectMessageExceptionInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
|
||||
/**
|
||||
* @author Antoine Bluchet <soyuka@gmail.com>
|
||||
*/
|
||||
class RedisReceiver implements ReceiverInterface
|
||||
{
|
||||
private $connection;
|
||||
private $serializer;
|
||||
private $shouldStop = false;
|
||||
|
||||
public function __construct(Connection $connection, SerializerInterface $serializer)
|
||||
{
|
||||
$this->connection = $connection;
|
||||
$this->serializer = $serializer;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function receive(callable $handler): void
|
||||
{
|
||||
while (!$this->shouldStop) {
|
||||
if (null === $message = $this->connection->waitAndGet()) {
|
||||
$handler(null);
|
||||
if (\function_exists('pcntl_signal_dispatch')) {
|
||||
pcntl_signal_dispatch();
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
$handler($this->serializer->decode($message));
|
||||
$this->connection->ack($message);
|
||||
} catch (RejectMessageExceptionInterface $e) {
|
||||
$this->connection->reject($message);
|
||||
|
||||
throw $e;
|
||||
} catch (\Throwable $e) {
|
||||
$this->connection->requeue($message);
|
||||
|
||||
throw $e;
|
||||
} finally {
|
||||
if (\function_exists('pcntl_signal_dispatch')) {
|
||||
pcntl_signal_dispatch();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public function stop(): void
|
||||
{
|
||||
$this->shouldStop = true;
|
||||
}
|
||||
}
|
@ -0,0 +1,39 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Transport\RedisExt;
|
||||
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Transport\SenderInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
|
||||
/**
|
||||
* @author Antoine Bluchet <soyuka@gmail.com>
|
||||
*/
|
||||
class RedisSender implements SenderInterface
|
||||
{
|
||||
private $connection;
|
||||
private $serializer;
|
||||
|
||||
public function __construct(Connection $connection, SerializerInterface $serializer)
|
||||
{
|
||||
$this->connection = $connection;
|
||||
$this->serializer = $serializer;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function send(Envelope $envelope)
|
||||
{
|
||||
$this->connection->add($this->serializer->encode($envelope));
|
||||
}
|
||||
}
|
@ -0,0 +1,68 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Transport\RedisExt;
|
||||
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
|
||||
/**
|
||||
* @author Antoine Bluchet <soyuka@gmail.com>
|
||||
*/
|
||||
class RedisTransport implements TransportInterface
|
||||
{
|
||||
private $connection;
|
||||
private $serializer;
|
||||
private $receiver;
|
||||
private $sender;
|
||||
|
||||
public function __construct(Connection $connection, SerializerInterface $serializer = null)
|
||||
{
|
||||
$this->connection = $connection;
|
||||
$this->serializer = $serializer ?? Serializer::create();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function receive(callable $handler): void
|
||||
{
|
||||
($this->receiver ?? $this->getReceiver())->receive($handler);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function stop(): void
|
||||
{
|
||||
($this->receiver ?? $this->getReceiver())->stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function send(Envelope $envelope): void
|
||||
{
|
||||
($this->sender ?? $this->getSender())->send($envelope);
|
||||
}
|
||||
|
||||
private function getReceiver()
|
||||
{
|
||||
return $this->receiver = new RedisReceiver($this->connection, $this->serializer);
|
||||
}
|
||||
|
||||
private function getSender()
|
||||
{
|
||||
return $this->sender = new RedisSender($this->connection, $this->serializer);
|
||||
}
|
||||
}
|
@ -0,0 +1,40 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Transport\RedisExt;
|
||||
|
||||
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
|
||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
|
||||
/**
|
||||
* @author Antoine Bluchet <soyuka@gmail.com>
|
||||
*/
|
||||
class RedisTransportFactory implements TransportFactoryInterface
|
||||
{
|
||||
private $serializer;
|
||||
|
||||
public function __construct(SerializerInterface $serializer = null)
|
||||
{
|
||||
$this->serializer = $serializer ?? Serializer::create();
|
||||
}
|
||||
|
||||
public function createTransport(string $dsn, array $options): TransportInterface
|
||||
{
|
||||
return new RedisTransport(Connection::fromDsn($dsn, $options), $this->serializer);
|
||||
}
|
||||
|
||||
public function supports(string $dsn, array $options): bool
|
||||
{
|
||||
return 0 === strpos($dsn, 'redis://');
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user