diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php index 96f2942050..c48bfd4d56 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php @@ -12,7 +12,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\RedisExt; use PHPUnit\Framework\TestCase; -use Symfony\Component\Messenger\Exception\LogicException; +use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Component\Messenger\Transport\RedisExt\Connection; /** @@ -103,7 +103,7 @@ class ConnectionTest extends TestCase public function testUnexpectedRedisError() { - $this->expectException(LogicException::class); + $this->expectException(TransportException::class); $this->expectExceptionMessage('Redis error happens'); $redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock(); $redis->expects($this->once())->method('xreadgroup')->willReturn(false); diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php index 056159818e..90e7e2cb2c 100644 --- a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php @@ -12,7 +12,7 @@ namespace Symfony\Component\Messenger\Transport\RedisExt; use Symfony\Component\Messenger\Exception\InvalidArgumentException; -use Symfony\Component\Messenger\Exception\LogicException; +use Symfony\Component\Messenger\Exception\TransportException; /** * A Redis connection. @@ -77,17 +77,21 @@ class Connection $messageId = '0'; // will receive consumers pending messages } - $messages = $this->connection->xreadgroup( - $this->group, - $this->consumer, - [$this->stream => $messageId], - 1, - $this->blockingTimeout - ); + $e = null; + try { + $messages = $this->connection->xReadGroup( + $this->group, + $this->consumer, + [$this->stream => $messageId], + 1, + $this->blockingTimeout + ); + } catch (\RedisException $e) { + } - if (false === $messages) { - throw new LogicException( - $this->connection->getLastError() ?: 'Unexpected redis stream error happened.' + if (false === $messages || $e) { + throw new TransportException( + ($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not read messages from the redis stream.' ); } @@ -113,23 +117,51 @@ class Connection public function ack(string $id): void { - $this->connection->xack($this->stream, $this->group, [$id]); + $e = null; + try { + $acknowledged = $this->connection->xAck($this->stream, $this->group, [$id]); + } catch (\RedisException $e) { + } + + if (!$acknowledged || $e) { + throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not acknowledge redis message "%s".', $id), 0, $e); + } } public function reject(string $id): void { - $this->connection->xdel($this->stream, [$id]); + $e = null; + try { + $deleted = $this->connection->xDel($this->stream, [$id]); + } catch (\RedisException $e) { + } + + if (!$deleted || $e) { + throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not delete message "%s" from the redis stream.', $id), 0, $e); + } } public function add(string $body, array $headers) { - $this->connection->xadd($this->stream, '*', ['message' => json_encode( - ['body' => $body, 'headers' => $headers] - )]); + $e = null; + try { + $added = $this->connection->xAdd($this->stream, '*', ['message' => json_encode( + ['body' => $body, 'headers' => $headers] + )]); + } catch (\RedisException $e) { + } + + if (!$added || $e) { + throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not add a message to the redis stream.', 0, $e); + } } public function setup(): void { - $this->connection->xgroup('CREATE', $this->stream, $this->group, 0, true); + try { + $this->connection->xGroup('CREATE', $this->stream, $this->group, 0, true); + } catch (\RedisException $e) { + throw new TransportException($e->getMessage(), 0, $e); + } } }