From 9cb6fdfabb6af71ac22cd09a15e9fb7dbf6aab50 Mon Sep 17 00:00:00 2001 From: Toni Rudolf Date: Wed, 5 Feb 2020 09:54:08 +0100 Subject: [PATCH] Implemted receiving of old pending messages --- .../Redis/Tests/Transport/ConnectionTest.php | 66 ++++++++++++--- .../Transport/RedisExtIntegrationTest.php | 83 +++++++++++++++++++ .../Bridge/Redis/Transport/Connection.php | 68 +++++++++++++++ 3 files changed, 206 insertions(+), 11 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php index 50c3fef8f0..8530ae68b1 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php @@ -123,7 +123,7 @@ class ConnectionTest extends TestCase $redis->expects($this->exactly(3))->method('xreadgroup') ->with('symfony', 'consumer', ['queue' => 0], 1, null) - ->willReturn(['queue' => [['message' => json_encode(['body' => 'Test', 'headers' => []])]]]); + ->willReturn(['queue' => [['message' => '{"body":"Test","headers":[]}']]]); $connection = Connection::fromDsn('redis://localhost/queue', [], $redis); $this->assertNotNull($connection->get()); @@ -164,23 +164,67 @@ class ConnectionTest extends TestCase $this->assertSame(2, $redis->getDbNum()); } - public function testFirstGetPendingMessagesThenNewMessages() + public function testGetPendingMessageFirst() { $redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock(); - $count = 0; + $redis->expects($this->exactly(1))->method('xreadgroup') + ->with('symfony', 'consumer', ['queue' => '0'], 1, null) + ->willReturn(['queue' => [['message' => '{"body":"1","headers":[]}']]]); + + $connection = Connection::fromDsn('redis://localhost/queue', [], $redis); + $connection->get(); + } + + public function testClaimAbandonedMessageWithRaceCondition() + { + $redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock(); + + $redis->expects($this->exactly(3))->method('xreadgroup') + ->withConsecutive( + ['symfony', 'consumer', ['queue' => '0'], 1, null], // first call for pending messages + ['symfony', 'consumer', ['queue' => '0'], 1, null], // sencond call because of claimed message (redisid-123) + ['symfony', 'consumer', ['queue' => '>'], 1, null] // third call because of no result (other consumer claimed message redisid-123) + ) + ->willReturnOnConsecutiveCalls([], [], []); + + $redis->expects($this->once())->method('xpending')->willReturn([[ + 0 => 'redisid-123', // message-id + 1 => 'consumer-2', // consumer-name + 2 => 3600001, // idle + ]]); + + $redis->expects($this->exactly(1))->method('xclaim') + ->with('queue', 'symfony', 'consumer', 3600000, ['redisid-123'], ['JUSTID']) + ->willReturn([]); + + $connection = Connection::fromDsn('redis://localhost/queue', [], $redis); + $connection->get(); + } + + public function testClaimAbandonedMessage() + { + $redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock(); $redis->expects($this->exactly(2))->method('xreadgroup') - ->with('symfony', 'consumer', $this->callback(function ($arr_streams) use (&$count) { - ++$count; + ->withConsecutive( + ['symfony', 'consumer', ['queue' => '0'], 1, null], // first call for pending messages + ['symfony', 'consumer', ['queue' => '0'], 1, null] // sencond call because of claimed message (redisid-123) + ) + ->willReturnOnConsecutiveCalls( + [], // first call returns no result + ['queue' => [['message' => '{"body":"1","headers":[]}']]] // second call returns clamed message (redisid-123) + ); - if (1 === $count) { - return '0' === $arr_streams['queue']; - } + $redis->expects($this->once())->method('xpending')->willReturn([[ + 0 => 'redisid-123', // message-id + 1 => 'consumer-2', // consumer-name + 2 => 3600001, // idle + ]]); - return '>' === $arr_streams['queue']; - }), 1, null) - ->willReturn(['queue' => []]); + $redis->expects($this->exactly(1))->method('xclaim') + ->with('queue', 'symfony', 'consumer', 3600000, ['redisid-123'], ['JUSTID']) + ->willReturn([]); $connection = Connection::fromDsn('redis://localhost/queue', [], $redis); $connection->get(); diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php index cb65eddf03..4343c9c342 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php @@ -100,4 +100,87 @@ class RedisExtIntegrationTest extends TestCase $this->assertEquals($body, $encoded['body']); $this->assertEquals($headers, $encoded['headers']); } + + public function testConnectionBelowRedeliverTimeout() + { + // lower redeliver timeout and claim interval + $connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis); + + $connection->cleanup(); + $connection->setup(); + + $body = '{"message": "Hi"}'; + $headers = ['type' => DummyMessage::class]; + + // Add two messages + $connection->add($body, $headers); + + // Read first message with other consumer + $this->redis->xreadgroup( + $this->getConnectionGroup($connection), + 'other-consumer2', + [$this->getConnectionStream($connection) => '>'], + 1 + ); + + // Queue will not have any messages yet + $this->assertNull($connection->get()); + } + + public function testConnectionClaimAndRedeliver() + { + // lower redeliver timeout and claim interval + $connection = Connection::fromDsn( + getenv('MESSENGER_REDIS_DSN'), + ['redeliver_timeout' => 0, 'claim_interval' => 500], + $this->redis + ); + + $connection->cleanup(); + $connection->setup(); + + $body1 = '{"message": "Hi"}'; + $body2 = '{"message": "Bye"}'; + $headers = ['type' => DummyMessage::class]; + + // Add two messages + $connection->add($body1, $headers); + $connection->add($body2, $headers); + + // Read first message with other consumer + $this->redis->xreadgroup( + $this->getConnectionGroup($connection), + 'other-consumer2', + [$this->getConnectionStream($connection) => '>'], + 1 + ); + + // Queue will return the pending message first because redeliver_timeout = 0 + $encoded = $connection->get(); + $this->assertEquals($body1, $encoded['body']); + $this->assertEquals($headers, $encoded['headers']); + $connection->ack($encoded['id']); + + // Queue will return the second message + $encoded = $connection->get(); + $this->assertEquals($body2, $encoded['body']); + $this->assertEquals($headers, $encoded['headers']); + $connection->ack($encoded['id']); + } + + private function getConnectionGroup(Connection $connection): string + { + $property = (new \ReflectionClass(Connection::class))->getProperty('group'); + $property->setAccessible(true); + + return $property->getValue($connection); + } + + private function getConnectionStream(Connection $connection): string + { + $property = (new \ReflectionClass(Connection::class))->getProperty('stream'); + $property->setAccessible(true); + + return $property->getValue($connection); + } } diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php index 00cd63d7e1..f5499ec789 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php @@ -35,6 +35,8 @@ class Connection 'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries 'dbindex' => 0, 'tls' => false, + 'redeliver_timeout' => 3600, // Timeout before redeliver messages still in pending state (seconds) + 'claim_interval' => 60000, // Interval by which pending/abandoned messages should be checked ]; private $connection; @@ -44,6 +46,9 @@ class Connection private $consumer; private $autoSetup; private $maxEntries; + private $redeliverTimeout; + private $nextClaim = 0; + private $claimInterval; private $couldHavePendingMessages = true; public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null) @@ -70,6 +75,8 @@ class Connection $this->queue = $this->stream.'__queue'; $this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup']; $this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries']; + $this->redeliverTimeout = ($configuration['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout']) * 1000; + $this->claimInterval = $configuration['claim_interval'] ?? self::DEFAULT_OPTIONS['claim_interval']; } public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self @@ -113,6 +120,18 @@ class Connection unset($redisOptions['tls']); } + $redeliverTimeout = null; + if (\array_key_exists('redeliver_timeout', $redisOptions)) { + $redeliverTimeout = filter_var($redisOptions['redeliver_timeout'], FILTER_VALIDATE_INT); + unset($redisOptions['redeliver_timeout']); + } + + $claimInterval = null; + if (\array_key_exists('claim_interval', $redisOptions)) { + $claimInterval = filter_var($redisOptions['claim_interval'], FILTER_VALIDATE_INT); + unset($redisOptions['claim_interval']); + } + $configuration = [ 'stream' => $redisOptions['stream'] ?? null, 'group' => $redisOptions['group'] ?? null, @@ -120,6 +139,8 @@ class Connection 'auto_setup' => $autoSetup, 'stream_max_entries' => $maxEntries, 'dbindex' => $dbIndex, + 'redeliver_timeout' => $redeliverTimeout, + 'claim_interval' => $claimInterval, ]; if (isset($parsedUrl['host'])) { @@ -157,6 +178,49 @@ class Connection } } + private function claimOldPendingMessages() + { + try { + // This could soon be optimized with https://github.com/antirez/redis/issues/5212 or + // https://github.com/antirez/redis/issues/6256 + $pendingMessages = $this->connection->xpending($this->stream, $this->group, '-', '+', 1); + } catch (\RedisException $e) { + throw new TransportException($e->getMessage(), 0, $e); + } + + $claimableIds = []; + foreach ($pendingMessages as $pendingMessage) { + if ($pendingMessage[1] === $this->consumer) { + $this->couldHavePendingMessages = true; + + return; + } + + if ($pendingMessage[2] >= $this->redeliverTimeout) { + $claimableIds[] = $pendingMessage[0]; + } + } + + if (\count($claimableIds) > 0) { + try { + $this->connection->xclaim( + $this->stream, + $this->group, + $this->consumer, + $this->redeliverTimeout, + $claimableIds, + ['JUSTID'] + ); + + $this->couldHavePendingMessages = true; + } catch (\RedisException $e) { + throw new TransportException($e->getMessage(), 0, $e); + } + } + + $this->nextClaim = $this->getCurrentTimeInMilliseconds() + $this->claimInterval; + } + public function get(): ?array { if ($this->autoSetup) { @@ -191,6 +255,10 @@ class Connection } } + if (!$this->couldHavePendingMessages && $this->nextClaim <= $this->getCurrentTimeInMilliseconds()) { + $this->claimOldPendingMessages(); + } + $messageId = '>'; // will receive new messages if ($this->couldHavePendingMessages) {