diff --git a/.travis.yml b/.travis.yml index 86ae88730c..691f8ee2d6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -20,7 +20,7 @@ env: - MIN_PHP=7.1.3 - SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php - MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages - - MESSENGER_REDIS_DSN=redis://127.0.0.1:7001/messages + - MESSENGER_REDIS_DSN=redis://127.0.0.1:7006/messages - SYMFONY_PHPUNIT_DISABLE_RESULT_CACHE=1 matrix: @@ -59,7 +59,7 @@ before_install: - | # Start Redis cluster docker pull grokzen/redis-cluster:5.0.4 - docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:5.0.4 + docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 -p 7006:7006 -p 7007:7007 -e "STANDALONE=true" --name redis-cluster grokzen/redis-cluster:5.0.4 export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005' - | diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 939ce83833..7ae4a2b63c 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -23,6 +23,7 @@ CHANGELOG * [BC BREAK] Removed `StopWhenRestartSignalIsReceived` in favor of `StopWorkerOnRestartSignalListener`. * The component is not marked as `@experimental` anymore. * Marked the `MessengerDataCollector` class as `@final`. + * Added support for `DelayStamp` to the `redis` transport. 4.3.0 ----- diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php index 66ada676ed..e2375511d6 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php @@ -17,6 +17,7 @@ use Symfony\Component\Messenger\Transport\RedisExt\Connection; /** * @requires extension redis + * @group time-sensitive */ class RedisExtIntegrationTest extends TestCase { @@ -31,7 +32,7 @@ class RedisExtIntegrationTest extends TestCase $this->redis = new \Redis(); $this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis); - $this->clearRedis(); + $this->connection->cleanup(); $this->connection->setup(); } @@ -55,11 +56,48 @@ class RedisExtIntegrationTest extends TestCase $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']); } - private function clearRedis() + public function testConnectionSendWithSameContent() { - $parsedUrl = parse_url(getenv('MESSENGER_REDIS_DSN')); - $pathParts = explode('/', $parsedUrl['path'] ?? ''); - $stream = $pathParts[1] ?? 'symfony'; - $this->redis->del($stream); + $body = '{"message": "Hi"}'; + $headers = ['type' => DummyMessage::class]; + + $this->connection->add($body, $headers); + $this->connection->add($body, $headers); + + $encoded = $this->connection->get(); + $this->assertEquals($body, $encoded['body']); + $this->assertEquals($headers, $encoded['headers']); + + $encoded = $this->connection->get(); + $this->assertEquals($body, $encoded['body']); + $this->assertEquals($headers, $encoded['headers']); + } + + public function testConnectionSendAndGetDelayed() + { + $this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class], 500); + $encoded = $this->connection->get(); + $this->assertNull($encoded); + sleep(2); + $encoded = $this->connection->get(); + $this->assertEquals('{"message": "Hi"}', $encoded['body']); + $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']); + } + + public function testConnectionSendDelayedMessagesWithSameContent() + { + $body = '{"message": "Hi"}'; + $headers = ['type' => DummyMessage::class]; + + $this->connection->add($body, $headers, 500); + $this->connection->add($body, $headers, 500); + sleep(2); + $encoded = $this->connection->get(); + $this->assertEquals($body, $encoded['body']); + $this->assertEquals($headers, $encoded['headers']); + + $encoded = $this->connection->get(); + $this->assertEquals($body, $encoded['body']); + $this->assertEquals($headers, $encoded['headers']); } } diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php index 2cb5fb233a..33d6057f67 100644 --- a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php @@ -38,6 +38,7 @@ class Connection private $connection; private $stream; + private $queue; private $group; private $consumer; private $autoSetup; @@ -65,6 +66,7 @@ class Connection $this->stream = $configuration['stream'] ?? self::DEFAULT_OPTIONS['stream']; $this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group']; $this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer']; + $this->queue = $this->stream.'__queue'; $this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup']; $this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries']; } @@ -125,6 +127,34 @@ class Connection $this->setup(); } + try { + $queuedMessageCount = $this->connection->zcount($this->queue, 0, $this->getCurrentTimeInMilliseconds()); + } catch (\RedisException $e) { + throw new TransportException($e->getMessage(), 0, $e); + } + + if ($queuedMessageCount) { + for ($i = 0; $i < $queuedMessageCount; ++$i) { + try { + $queuedMessages = $this->connection->zpopmin($this->queue, 1); + } catch (\RedisException $e) { + throw new TransportException($e->getMessage(), 0, $e); + } + + foreach ($queuedMessages as $queuedMessage => $time) { + $queuedMessage = json_decode($queuedMessage, true); + // if a futured placed message is actually popped because of a race condition with + // another running message consumer, the message is readded to the queue by add function + // else its just added stream and will be available for all stream consumers + $this->add( + $queuedMessage['body'], + $queuedMessage['headers'], + $time - $this->getCurrentTimeInMilliseconds() + ); + } + } + } + $messageId = '>'; // will receive new messages if ($this->couldHavePendingMessages) { @@ -203,24 +233,40 @@ class Connection } } - public function add(string $body, array $headers): void + public function add(string $body, array $headers, int $delayInMs = 0): void { if ($this->autoSetup) { $this->setup(); } try { - if ($this->maxEntries) { - $added = $this->connection->xadd($this->stream, '*', ['message' => json_encode( - ['body' => $body, 'headers' => $headers] - )], $this->maxEntries, true); + if ($delayInMs > 0) { // the delay could be smaller 0 in a queued message + $message = json_encode([ + 'body' => $body, + 'headers' => $headers, + // Entry need to be unique in the sorted set else it would only be added once to the delayed messages queue + 'uniqid' => uniqid('', true), + ]); + + $score = (int) ($this->getCurrentTimeInMilliseconds() + $delayInMs); + $added = $this->connection->zadd($this->queue, ['NX'], $score, $message); } else { - $added = $this->connection->xadd($this->stream, '*', ['message' => json_encode( - ['body' => $body, 'headers' => $headers] - )]); + $message = json_encode([ + 'body' => $body, + 'headers' => $headers, + ]); + + if ($this->maxEntries) { + $added = $this->connection->xadd($this->stream, '*', ['message' => $message], $this->maxEntries, true); + } else { + $added = $this->connection->xadd($this->stream, '*', ['message' => $message]); + } } } catch (\RedisException $e) { - throw new TransportException($e->getMessage(), 0, $e); + if ($error = $this->connection->getLastError() ?: null) { + $this->connection->clearLastError(); + } + throw new TransportException($error ?? $e->getMessage(), 0, $e); } if (!$added) { @@ -246,4 +292,15 @@ class Connection $this->autoSetup = false; } + + private function getCurrentTimeInMilliseconds(): int + { + return (int) (microtime(true) * 1000); + } + + public function cleanup(): void + { + $this->connection->del($this->stream); + $this->connection->del($this->queue); + } } diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/RedisSender.php b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisSender.php index ecbdb1ed27..beda996870 100644 --- a/src/Symfony/Component/Messenger/Transport/RedisExt/RedisSender.php +++ b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisSender.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Messenger\Transport\RedisExt; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -37,7 +38,11 @@ class RedisSender implements SenderInterface { $encodedMessage = $this->serializer->encode($envelope); - $this->connection->add($encodedMessage['body'], $encodedMessage['headers'] ?? []); + /** @var DelayStamp|null $delayStamp */ + $delayStamp = $envelope->last(DelayStamp::class); + $delayInMs = null !== $delayStamp ? $delayStamp->getDelay() : 0; + + $this->connection->add($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delayInMs); return $envelope; }