From 135c6504f1766ce116cb1a87055250a76dd7aacf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Niedzielski?= Date: Wed, 6 May 2020 15:47:05 +0200 Subject: [PATCH] [Messenger] Add option to prevent Redis from deleting messages on rejection --- .../Messenger/Bridge/Redis/CHANGELOG.md | 6 ++++++ .../Redis/Tests/Transport/ConnectionTest.php | 15 +++++++++++++++ .../Bridge/Redis/Transport/Connection.php | 18 +++++++++++++++--- 3 files changed, 36 insertions(+), 3 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/CHANGELOG.md b/src/Symfony/Component/Messenger/Bridge/Redis/CHANGELOG.md index ddf294b6ae..050bef29f6 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/Bridge/Redis/CHANGELOG.md @@ -1,6 +1,12 @@ CHANGELOG ========= +5.2.0 +----- + + * Added a `delete_after_reject` option to the DSN to allow control over message + deletion, similar to `delete_after_ack`. + 5.1.0 ----- diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php index eff5951e75..ab1a3217b1 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php @@ -346,6 +346,21 @@ class ConnectionTest extends TestCase $connection->ack('1'); } + public function testDeleteAfterReject() + { + $redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock(); + + $redis->expects($this->exactly(1))->method('xack') + ->with('queue', 'symfony', ['1']) + ->willReturn(1); + $redis->expects($this->exactly(1))->method('xdel') + ->with('queue', ['1']) + ->willReturn(1); + + $connection = Connection::fromDsn('redis://localhost/queue?delete_after_reject=true', [], $redis); // 1 = always + $connection->reject('1'); + } + public function testLastErrorGetsCleared() { $redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock(); diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php index b0a4279ea1..984b03f82c 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php @@ -33,6 +33,7 @@ class Connection 'consumer' => 'consumer', 'auto_setup' => true, 'delete_after_ack' => false, + 'delete_after_reject' => true, 'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries 'dbindex' => 0, 'tls' => false, @@ -51,6 +52,7 @@ class Connection private $nextClaim = 0; private $claimInterval; private $deleteAfterAck; + private $deleteAfterReject; private $couldHavePendingMessages = true; public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null) @@ -89,6 +91,7 @@ class Connection $this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup']; $this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries']; $this->deleteAfterAck = $configuration['delete_after_ack'] ?? self::DEFAULT_OPTIONS['delete_after_ack']; + $this->deleteAfterReject = $configuration['delete_after_reject'] ?? self::DEFAULT_OPTIONS['delete_after_reject']; $this->redeliverTimeout = ($configuration['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout']) * 1000; $this->claimInterval = $configuration['claim_interval'] ?? self::DEFAULT_OPTIONS['claim_interval']; } @@ -128,6 +131,12 @@ class Connection unset($redisOptions['delete_after_ack']); } + $deleteAfterReject = null; + if (\array_key_exists('delete_after_reject', $redisOptions)) { + $deleteAfterReject = filter_var($redisOptions['delete_after_reject'], \FILTER_VALIDATE_BOOLEAN); + unset($redisOptions['delete_after_reject']); + } + $dbIndex = null; if (\array_key_exists('dbindex', $redisOptions)) { $dbIndex = filter_var($redisOptions['dbindex'], \FILTER_VALIDATE_INT); @@ -159,6 +168,7 @@ class Connection 'auto_setup' => $autoSetup, 'stream_max_entries' => $maxEntries, 'delete_after_ack' => $deleteAfterAck, + 'delete_after_reject' => $deleteAfterReject, 'dbindex' => $dbIndex, 'redeliver_timeout' => $redeliverTimeout, 'claim_interval' => $claimInterval, @@ -348,7 +358,9 @@ class Connection { try { $deleted = $this->connection->xack($this->stream, $this->group, [$id]); - $deleted = $this->connection->xdel($this->stream, [$id]) && $deleted; + if ($this->deleteAfterReject) { + $deleted = $this->connection->xdel($this->stream, [$id]) && $deleted; + } } catch (\RedisException $e) { throw new TransportException($e->getMessage(), 0, $e); } @@ -426,7 +438,7 @@ class Connection $this->connection->clearLastError(); } - if ($this->deleteAfterAck) { + if ($this->deleteAfterAck || $this->deleteAfterReject) { $groups = $this->connection->xinfo('GROUPS', $this->stream); if ( // support for Redis extension version 5+ @@ -434,7 +446,7 @@ class Connection // support for Redis extension version 4.x || (\is_string($groups) && substr_count($groups, '"name"')) ) { - throw new LogicException(sprintf('More than one group exists for stream "%s", delete_after_ack can not be enabled as it risks deleting messages before all groups could consume them.', $this->stream)); + throw new LogicException(sprintf('More than one group exists for stream "%s", delete_after_ack and delete_after_reject can not be enabled as it risks deleting messages before all groups could consume them.', $this->stream)); } }