[Messenger] Ensure that a TransportException is thrown on redis error

This commit is contained in:
Robin Chalas 2019-04-28 01:35:59 +02:00
parent f24e9a4973
commit 1857395137
2 changed files with 51 additions and 19 deletions

View File

@ -12,7 +12,7 @@
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
/**
@ -103,7 +103,7 @@ class ConnectionTest extends TestCase
public function testUnexpectedRedisError()
{
$this->expectException(LogicException::class);
$this->expectException(TransportException::class);
$this->expectExceptionMessage('Redis error happens');
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
$redis->expects($this->once())->method('xreadgroup')->willReturn(false);

View File

@ -12,7 +12,7 @@
namespace Symfony\Component\Messenger\Transport\RedisExt;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\TransportException;
/**
* A Redis connection.
@ -77,17 +77,21 @@ class Connection
$messageId = '0'; // will receive consumers pending messages
}
$messages = $this->connection->xreadgroup(
$this->group,
$this->consumer,
[$this->stream => $messageId],
1,
$this->blockingTimeout
);
$e = null;
try {
$messages = $this->connection->xReadGroup(
$this->group,
$this->consumer,
[$this->stream => $messageId],
1,
$this->blockingTimeout
);
} catch (\RedisException $e) {
}
if (false === $messages) {
throw new LogicException(
$this->connection->getLastError() ?: 'Unexpected redis stream error happened.'
if (false === $messages || $e) {
throw new TransportException(
($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not read messages from the redis stream.'
);
}
@ -113,23 +117,51 @@ class Connection
public function ack(string $id): void
{
$this->connection->xack($this->stream, $this->group, [$id]);
$e = null;
try {
$acknowledged = $this->connection->xAck($this->stream, $this->group, [$id]);
} catch (\RedisException $e) {
}
if (!$acknowledged || $e) {
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not acknowledge redis message "%s".', $id), 0, $e);
}
}
public function reject(string $id): void
{
$this->connection->xdel($this->stream, [$id]);
$e = null;
try {
$deleted = $this->connection->xDel($this->stream, [$id]);
} catch (\RedisException $e) {
}
if (!$deleted || $e) {
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not delete message "%s" from the redis stream.', $id), 0, $e);
}
}
public function add(string $body, array $headers)
{
$this->connection->xadd($this->stream, '*', ['message' => json_encode(
['body' => $body, 'headers' => $headers]
)]);
$e = null;
try {
$added = $this->connection->xAdd($this->stream, '*', ['message' => json_encode(
['body' => $body, 'headers' => $headers]
)]);
} catch (\RedisException $e) {
}
if (!$added || $e) {
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not add a message to the redis stream.', 0, $e);
}
}
public function setup(): void
{
$this->connection->xgroup('CREATE', $this->stream, $this->group, 0, true);
try {
$this->connection->xGroup('CREATE', $this->stream, $this->group, 0, true);
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
}
}