From c05273f793650d3bfa802a150a938f0e1fc2f6fe Mon Sep 17 00:00:00 2001 From: Robin Chalas Date: Wed, 1 May 2019 15:19:10 +0200 Subject: [PATCH] [Messenger] Fix Redis Connection::get() after reject() --- .../Transport/RedisExt/ConnectionTest.php | 18 ++++++++++++++++++ .../Transport/RedisExt/Connection.php | 8 ++++---- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php index c48bfd4d56..fae72fa781 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php @@ -112,4 +112,22 @@ class ConnectionTest extends TestCase $connection = Connection::fromDsn('redis://localhost/queue', [], $redis); $connection->get(); } + + public function testGetAfterReject() + { + $connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget'); + try { + $connection->setup(); + } catch (TransportException $e) { + } + + $connection->add('1', []); + $connection->add('2', []); + + $failing = $connection->get(); + $connection->reject($failing['id']); + + $connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget'); + $this->assertNotNull($connection->get()); + } } diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php index c90a475406..6219c46a5f 100644 --- a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php @@ -89,7 +89,7 @@ class Connection } catch (\RedisException $e) { } - if (false === $messages || $e) { + if ($e || (false === $messages && !$this->couldHavePendingMessages)) { throw new TransportException( ($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not read messages from the redis stream.' ); @@ -123,7 +123,7 @@ class Connection } catch (\RedisException $e) { } - if (!$acknowledged || $e) { + if ($e || !$acknowledged) { throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not acknowledge redis message "%s".', $id), 0, $e); } } @@ -136,7 +136,7 @@ class Connection } catch (\RedisException $e) { } - if (!$deleted || $e) { + if ($e || !$deleted) { throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not delete message "%s" from the redis stream.', $id), 0, $e); } } @@ -151,7 +151,7 @@ class Connection } catch (\RedisException $e) { } - if (!$added || $e) { + if ($e || !$added) { throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not add a message to the redis stream.', 0, $e); } }