Implement redis transport

This commit is contained in:
soyuka 2018-09-28 17:26:27 +02:00 committed by Alexander Schranz
parent fba11b4dc3
commit 7162d2ec1d
13 changed files with 904 additions and 0 deletions

View File

@ -0,0 +1,54 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
/**
* @requires extension redis
*/
class ConnectionTest extends TestCase
{
/**
* @expectedException \InvalidArgumentException
* @expectedExceptionMessage The given Redis DSN "redis://" is invalid.
*/
public function testItCannotBeConstructedWithAWrongDsn()
{
Connection::fromDsn('redis://');
}
public function testItGetsParametersFromTheDsn()
{
$this->assertEquals(
new Connection('queue', array(
'host' => 'localhost',
'port' => 6379,
)),
Connection::fromDsn('redis://localhost/queue')
);
}
public function testOverrideOptionsViaQueryParameters()
{
$this->assertEquals(
new Connection('queue', array(
'host' => '127.0.0.1',
'port' => 6379,
), array(
'processing_ttl' => '8000',
)),
Connection::fromDsn('redis://127.0.0.1:6379/queue?processing_ttl=8000')
);
}
}

View File

@ -0,0 +1,43 @@
<?php
$componentRoot = $_SERVER['COMPONENT_ROOT'];
if (!is_file($autoload = $componentRoot.'/vendor/autoload.php')) {
$autoload = $componentRoot.'/../../../../vendor/autoload.php';
}
if (!file_exists($autoload)) {
exit('You should run "composer install --dev" in the component before running this script.');
}
require_once $autoload;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$connection = Connection::fromDsn(getenv('DSN'));
$receiver = new RedisReceiver($connection, $serializer);
$worker = new Worker($receiver, new class() implements MessageBusInterface {
public function dispatch($envelope)
{
echo 'Get envelope with message: '.get_class($envelope->getMessage())."\n";
echo sprintf("with items: %s\n", json_encode(array_keys($envelope->all()), JSON_PRETTY_PRINT));
sleep(30);
echo "Done.\n";
}
});
echo "Receiving messages...\n";
$worker->run();

View File

@ -0,0 +1,146 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
use Symfony\Component\Messenger\Transport\RedisExt\RedisSender;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Process\PhpProcess;
use Symfony\Component\Process\Process;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
/**
* @requires extension redis
*/
class RedisExtIntegrationTest extends TestCase
{
protected function setUp()
{
parent::setUp();
if (!getenv('MESSENGER_REDIS_DSN')) {
$this->markTestSkipped('The "MESSENGER_REDIS_DSN" environment variable is required.');
}
}
public function testItSendsAndReceivesMessages()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));
$sender = new RedisSender($connection, $serializer);
$receiver = new RedisReceiver($connection, $serializer);
$sender->send($first = Envelope::wrap(new DummyMessage('First')));
$sender->send($second = Envelope::wrap(new DummyMessage('Second')));
$receivedMessages = 0;
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
$this->assertEquals(0 == $receivedMessages ? $first : $second, $envelope);
if (2 === ++$receivedMessages) {
$receiver->stop();
}
});
}
public function testItReceivesSignals()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));
$sender = new RedisSender($connection, $serializer);
$sender->send(Envelope::wrap(new DummyMessage('Hello')));
$amqpReadTimeout = 30;
$dsn = getenv('MESSENGER_REDIS_DSN').'?read_timeout='.$amqpReadTimeout;
$process = new PhpProcess(file_get_contents(__DIR__.'/Fixtures/long_receiver.php'), null, array(
'COMPONENT_ROOT' => __DIR__.'/../../../',
'DSN' => $dsn,
));
$process->start();
$this->waitForOutput($process, $expectedOutput = "Receiving messages...\n");
$signalTime = microtime(true);
$timedOutTime = time() + 10;
$process->signal(15);
while ($process->isRunning() && time() < $timedOutTime) {
usleep(100 * 1000); // 100ms
}
$this->assertFalse($process->isRunning());
$this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);
$this->assertSame($expectedOutput.<<<'TXT'
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
with items: [
"Symfony\\Component\\Messenger\\Asynchronous\\Transport\\ReceivedMessage"
]
Done.
TXT
, $process->getOutput());
}
/**
* @runInSeparateProcess
*/
public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), array('blocking_timeout' => '1'));
$receiver = new RedisReceiver($connection, $serializer);
$receivedMessages = 0;
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
$this->assertNull($envelope);
if (2 === ++$receivedMessages) {
$receiver->stop();
}
});
}
private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
{
$timedOutTime = time() + $timeoutInSeconds;
while (time() < $timedOutTime) {
if (0 === strpos($process->getOutput(), $output)) {
return;
}
usleep(100 * 1000); // 100ms
}
throw new \RuntimeException('Expected output never arrived. Got "'.$process->getOutput().'" instead.');
}
}

View File

@ -0,0 +1,118 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
use Symfony\Component\Messenger\Transport\RedisExt\Exception\RejectMessageExceptionInterface;
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
/**
* @requires extension redis
*/
class RedisReceiverTest extends TestCase
{
public function testItSendTheDecodedMessageToTheHandlerAndAcknowledgeIt()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$envelope = Envelope::wrap(new DummyMessage('Hi'));
$encoded = $serializer->encode($envelope);
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('waitAndGet')->willReturn($encoded);
$connection->expects($this->once())->method('ack')->with($encoded);
$receiver = new RedisReceiver($connection, $serializer);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
$receiver->stop();
});
}
public function testItSendNoMessageToTheHandler()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('waitAndGet')->willReturn(null);
$receiver = new RedisReceiver($connection, $serializer);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$this->assertNull($envelope);
$receiver->stop();
});
}
/**
* @expectedException \Symfony\Component\Messenger\Tests\Transport\RedisExt\InterruptException
*/
public function testItNonAcknowledgeTheMessageIfAnExceptionHappened()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$envelope = Envelope::wrap(new DummyMessage('Hi'));
$encoded = $serializer->encode($envelope);
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('waitAndGet')->willReturn($encoded);
$connection->expects($this->once())->method('requeue')->with($encoded);
$receiver = new RedisReceiver($connection, $serializer);
$receiver->receive(function () {
throw new InterruptException('Well...');
});
}
/**
* @expectedException \Symfony\Component\Messenger\Tests\Transport\RedisExt\WillNeverWorkException
*/
public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionInterface()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$envelope = Envelope::wrap(new DummyMessage('Hi'));
$encoded = $serializer->encode($envelope);
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('waitAndGet')->willReturn($encoded);
$connection->expects($this->once())->method('reject')->with($encoded);
$receiver = new RedisReceiver($connection, $serializer);
$receiver->receive(function () {
throw new WillNeverWorkException('Well...');
});
}
}
class InterruptException extends \Exception
{
}
class WillNeverWorkException extends \Exception implements RejectMessageExceptionInterface
{
}

View File

@ -0,0 +1,40 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
use Symfony\Component\Messenger\Transport\RedisExt\RedisSender;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
/**
* @requires extension redis
*/
class RedisSenderTest extends TestCase
{
public function testItSendsTheEncodedMessage()
{
$envelope = Envelope::wrap(new DummyMessage('Oy'));
$encoded = array('body' => '...', 'headers' => array('type' => DummyMessage::class));
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->expects($this->once())->method('add')->with($encoded);
$sender = new RedisSender($connection, $serializer);
$sender->send($envelope);
}
}

View File

@ -0,0 +1,43 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
use Symfony\Component\Messenger\Transport\RedisExt\RedisTransport;
use Symfony\Component\Messenger\Transport\RedisExt\RedisTransportFactory;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
class RedisTransportFactoryTest extends TestCase
{
public function testSupportsOnlyRedisTransports()
{
$factory = new RedisTransportFactory(
$this->getMockBuilder(SerializerInterface::class)->getMock()
);
$this->assertTrue($factory->supports('redis://localhost', array()));
$this->assertFalse($factory->supports('sqs://localhost', array()));
$this->assertFalse($factory->supports('invalid-dsn', array()));
}
public function testItCreatesTheTransport()
{
$factory = new RedisTransportFactory(
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock()
);
$expectedTransport = new RedisTransport(Connection::fromDsn('redis://localhost', array('foo' => 'bar'), true), $serializer);
$this->assertEquals($expectedTransport, $factory->createTransport('redis://localhost', array('foo' => 'bar')));
}
}

View File

@ -0,0 +1,61 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
use Symfony\Component\Messenger\Transport\RedisExt\RedisTransport;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
/**
* @requires extension redis
*/
class RedisTransportTest extends TestCase
{
public function testItIsATransport()
{
$transport = $this->getTransport();
$this->assertInstanceOf(TransportInterface::class, $transport);
}
public function testReceivesMessages()
{
$transport = $this->getTransport(
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(),
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock()
);
$decodedMessage = new DummyMessage('Decoded.');
$encodedMessage = array('body' => 'body', 'headers' => array('my' => 'header'));
$serializer->method('decode')->with($encodedMessage)->willReturn(Envelope::wrap($decodedMessage));
$connection->method('waitAndGet')->willReturn($encodedMessage);
$transport->receive(function (Envelope $envelope) use ($transport, $decodedMessage) {
$this->assertSame($decodedMessage, $envelope->getMessage());
$transport->stop();
});
}
private function getTransport(SerializerInterface $serializer = null, Connection $connection = null)
{
$serializer = $serializer ?: $this->getMockBuilder(SerializerInterface::class)->getMock();
$connection = $connection ?: $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
return new RedisTransport($connection, $serializer);
}
}

View File

@ -0,0 +1,156 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\RedisExt;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
/**
* @author Antoine Bluchet <soyuka@gmail.com>
*/
class Connection
{
const PROCESSING_QUEUE_SUFFIX = '_processing';
const DEFAULT_CONNECTION_CREDENTIALS = array('host' => '127.0.0.1', 'port' => 6379);
const DEFAULT_REDIS_OPTIONS = array('serializer' => \Redis::SERIALIZER_PHP, 'processing_ttl' => 10000, 'blocking_timeout' => 1000);
/**
* @var \Redis
*/
private $connection;
/**
* @var string
*/
private $queue;
public function __construct(string $queue, array $connectionCredentials = self::DEFAULT_CONNECTION_CREDENTIALS, array $redisOptions = self::DEFAULT_REDIS_OPTIONS)
{
$this->connection = new \Redis();
$this->connection->connect($connectionCredentials['host'] ?? '127.0.0.1', $connectionCredentials['port'] ?? 6379);
$this->connection->setOption(\Redis::OPT_SERIALIZER, $redisOptions['serializer'] ?? \Redis::SERIALIZER_PHP);
// We force this because we rely on the fact that redis doesn't timeout with bRPopLPush
$this->connection->setOption(\Redis::OPT_READ_TIMEOUT, -1);
$this->queue = $queue;
$this->processingTtl = $redisOptions['processing_ttl'] ?? self::DEFAULT_REDIS_OPTIONS['processing_ttl'];
$this->blockingTimeout = $redisOptions['blocking_timeout'] ?? self::DEFAULT_REDIS_OPTIONS['blocking_timeout'];
}
public static function fromDsn(string $dsn, array $redisOptions = self::DEFAULT_REDIS_OPTIONS): self
{
if (false === $parsedUrl = parse_url($dsn)) {
throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
}
$queue = isset($parsedUrl['path']) ? trim($parsedUrl['path'], '/') : $redisOptions['queue'] ?? 'messages';
$connectionCredentials = array(
'host' => $parsedUrl['host'] ?? '127.0.0.1',
'port' => $parsedUrl['port'] ?? 6379,
);
if (isset($parsedUrl['query'])) {
parse_str($parsedUrl['query'], $parsedQuery);
$redisOptions = array_replace_recursive($redisOptions, $parsedQuery);
}
return new self($queue, $connectionCredentials, $redisOptions);
}
/**
* Takes last element (tail) of the list and add it to the processing queue (head - blocking)
* Also sets a key with TTL that will be checked by the `doCheck` method.
*/
public function waitAndGet(): ?array
{
$this->doCheck();
$value = $this->connection->bRPopLPush($this->queue, $this->queue.self::PROCESSING_QUEUE_SUFFIX, $this->blockingTimeout);
// false in case of timeout
if (false === $value) {
return null;
}
$key = md5($value['body']);
$this->connection->set($key, 1, array('px' => $this->processingTtl));
return $value;
}
/**
* Acknowledge the message:
* 1. Remove the ttl key
* 2. LREM the message from the processing list.
*/
public function ack($message)
{
$key = md5($message['body']);
$processingQueue = $this->queue.self::PROCESSING_QUEUE_SUFFIX;
$this->connection->multi()
->lRem($processingQueue, $message)
->del($key)
->exec();
}
/**
* Reject the message: we acknowledge it, means we remove it form the queues.
*
* @TODO: log something?
*/
public function reject($message)
{
$this->ack($message);
}
/**
* Requeue - add it back to the queue
* All we have to do is to make our key expire and let the `doCheck` system manage it.
*/
public function requeue($message)
{
$key = md5($message['body']);
$this->connection->expire($key, -1);
}
/**
* Add item at the tail of list.
*/
public function add($message)
{
$this->connection->lpush($this->queue, $message);
}
/**
* The check:
* 1. Get the processing queue items
* 2. Check if the TTL is over
* 3. If it is, rpush back the message to the origin queue.
*/
private function doCheck()
{
$processingQueue = $this->queue.self::PROCESSING_QUEUE_SUFFIX;
$pending = $this->connection->lRange($processingQueue, 0, -1);
foreach ($pending as $temp) {
$key = md5($temp['body']);
if ($this->connection->ttl($key) > 0) {
continue;
}
$this->connection
->multi()
->del($key)
->lRem($processingQueue, $temp, 1)
->rPush($this->queue, $temp)
->exec();
}
}
}

View File

@ -0,0 +1,25 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\RedisExt\Exception;
/**
* If something goes wrong while consuming and handling a message from the Redis broker, there are two choices: rejecting
* or re-queuing the message.
*
* If the exception that is thrown by the bus while dispatching the message implements this interface, the message will
* be rejected. Otherwise, it will be re-queued.
*
* @author Antoine Bluchet <soyuka@gmail.com>
*/
interface RejectMessageExceptionInterface extends \Throwable
{
}

View File

@ -0,0 +1,71 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\RedisExt;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\RedisExt\Exception\RejectMessageExceptionInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
/**
* @author Antoine Bluchet <soyuka@gmail.com>
*/
class RedisReceiver implements ReceiverInterface
{
private $connection;
private $serializer;
private $shouldStop = false;
public function __construct(Connection $connection, SerializerInterface $serializer)
{
$this->connection = $connection;
$this->serializer = $serializer;
}
/**
* {@inheritdoc}
*/
public function receive(callable $handler): void
{
while (!$this->shouldStop) {
if (null === $message = $this->connection->waitAndGet()) {
$handler(null);
if (\function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
continue;
}
try {
$handler($this->serializer->decode($message));
$this->connection->ack($message);
} catch (RejectMessageExceptionInterface $e) {
$this->connection->reject($message);
throw $e;
} catch (\Throwable $e) {
$this->connection->requeue($message);
throw $e;
} finally {
if (\function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
}
}
}
public function stop(): void
{
$this->shouldStop = true;
}
}

View File

@ -0,0 +1,39 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\RedisExt;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
/**
* @author Antoine Bluchet <soyuka@gmail.com>
*/
class RedisSender implements SenderInterface
{
private $connection;
private $serializer;
public function __construct(Connection $connection, SerializerInterface $serializer)
{
$this->connection = $connection;
$this->serializer = $serializer;
}
/**
* {@inheritdoc}
*/
public function send(Envelope $envelope)
{
$this->connection->add($this->serializer->encode($envelope));
}
}

View File

@ -0,0 +1,68 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\RedisExt;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
/**
* @author Antoine Bluchet <soyuka@gmail.com>
*/
class RedisTransport implements TransportInterface
{
private $connection;
private $serializer;
private $receiver;
private $sender;
public function __construct(Connection $connection, SerializerInterface $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? Serializer::create();
}
/**
* {@inheritdoc}
*/
public function receive(callable $handler): void
{
($this->receiver ?? $this->getReceiver())->receive($handler);
}
/**
* {@inheritdoc}
*/
public function stop(): void
{
($this->receiver ?? $this->getReceiver())->stop();
}
/**
* {@inheritdoc}
*/
public function send(Envelope $envelope): void
{
($this->sender ?? $this->getSender())->send($envelope);
}
private function getReceiver()
{
return $this->receiver = new RedisReceiver($this->connection, $this->serializer);
}
private function getSender()
{
return $this->sender = new RedisSender($this->connection, $this->serializer);
}
}

View File

@ -0,0 +1,40 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\RedisExt;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
/**
* @author Antoine Bluchet <soyuka@gmail.com>
*/
class RedisTransportFactory implements TransportFactoryInterface
{
private $serializer;
public function __construct(SerializerInterface $serializer = null)
{
$this->serializer = $serializer ?? Serializer::create();
}
public function createTransport(string $dsn, array $options): TransportInterface
{
return new RedisTransport(Connection::fromDsn($dsn, $options), $this->serializer);
}
public function supports(string $dsn, array $options): bool
{
return 0 === strpos($dsn, 'redis://');
}
}