[Messenger] Ensure that a TransportException is thrown on redis error
This commit is contained in:
parent
f24e9a4973
commit
1857395137
@ -12,7 +12,7 @@
|
||||
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Exception\LogicException;
|
||||
use Symfony\Component\Messenger\Exception\TransportException;
|
||||
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
|
||||
|
||||
/**
|
||||
@ -103,7 +103,7 @@ class ConnectionTest extends TestCase
|
||||
|
||||
public function testUnexpectedRedisError()
|
||||
{
|
||||
$this->expectException(LogicException::class);
|
||||
$this->expectException(TransportException::class);
|
||||
$this->expectExceptionMessage('Redis error happens');
|
||||
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
|
||||
$redis->expects($this->once())->method('xreadgroup')->willReturn(false);
|
||||
|
@ -12,7 +12,7 @@
|
||||
namespace Symfony\Component\Messenger\Transport\RedisExt;
|
||||
|
||||
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||
use Symfony\Component\Messenger\Exception\LogicException;
|
||||
use Symfony\Component\Messenger\Exception\TransportException;
|
||||
|
||||
/**
|
||||
* A Redis connection.
|
||||
@ -77,17 +77,21 @@ class Connection
|
||||
$messageId = '0'; // will receive consumers pending messages
|
||||
}
|
||||
|
||||
$messages = $this->connection->xreadgroup(
|
||||
$this->group,
|
||||
$this->consumer,
|
||||
[$this->stream => $messageId],
|
||||
1,
|
||||
$this->blockingTimeout
|
||||
);
|
||||
$e = null;
|
||||
try {
|
||||
$messages = $this->connection->xReadGroup(
|
||||
$this->group,
|
||||
$this->consumer,
|
||||
[$this->stream => $messageId],
|
||||
1,
|
||||
$this->blockingTimeout
|
||||
);
|
||||
} catch (\RedisException $e) {
|
||||
}
|
||||
|
||||
if (false === $messages) {
|
||||
throw new LogicException(
|
||||
$this->connection->getLastError() ?: 'Unexpected redis stream error happened.'
|
||||
if (false === $messages || $e) {
|
||||
throw new TransportException(
|
||||
($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not read messages from the redis stream.'
|
||||
);
|
||||
}
|
||||
|
||||
@ -113,23 +117,51 @@ class Connection
|
||||
|
||||
public function ack(string $id): void
|
||||
{
|
||||
$this->connection->xack($this->stream, $this->group, [$id]);
|
||||
$e = null;
|
||||
try {
|
||||
$acknowledged = $this->connection->xAck($this->stream, $this->group, [$id]);
|
||||
} catch (\RedisException $e) {
|
||||
}
|
||||
|
||||
if (!$acknowledged || $e) {
|
||||
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not acknowledge redis message "%s".', $id), 0, $e);
|
||||
}
|
||||
}
|
||||
|
||||
public function reject(string $id): void
|
||||
{
|
||||
$this->connection->xdel($this->stream, [$id]);
|
||||
$e = null;
|
||||
try {
|
||||
$deleted = $this->connection->xDel($this->stream, [$id]);
|
||||
} catch (\RedisException $e) {
|
||||
}
|
||||
|
||||
if (!$deleted || $e) {
|
||||
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not delete message "%s" from the redis stream.', $id), 0, $e);
|
||||
}
|
||||
}
|
||||
|
||||
public function add(string $body, array $headers)
|
||||
{
|
||||
$this->connection->xadd($this->stream, '*', ['message' => json_encode(
|
||||
['body' => $body, 'headers' => $headers]
|
||||
)]);
|
||||
$e = null;
|
||||
try {
|
||||
$added = $this->connection->xAdd($this->stream, '*', ['message' => json_encode(
|
||||
['body' => $body, 'headers' => $headers]
|
||||
)]);
|
||||
} catch (\RedisException $e) {
|
||||
}
|
||||
|
||||
if (!$added || $e) {
|
||||
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not add a message to the redis stream.', 0, $e);
|
||||
}
|
||||
}
|
||||
|
||||
public function setup(): void
|
||||
{
|
||||
$this->connection->xgroup('CREATE', $this->stream, $this->group, 0, true);
|
||||
try {
|
||||
$this->connection->xGroup('CREATE', $this->stream, $this->group, 0, true);
|
||||
} catch (\RedisException $e) {
|
||||
throw new TransportException($e->getMessage(), 0, $e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user