minor #31298 [Messenger] Ensure that a TransportException is thrown on redis error (chalasr)
This PR was merged into the 4.3-dev branch.
Discussion
----------
[Messenger] Ensure that a TransportException is thrown on redis error
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | no
| New feature? | no
| BC breaks? | no
| Deprecations? | no
| Tests pass? | yes
| Fixed tickets | n/a
| License | MIT
| Doc PR | n/a
<!--
Write a short README entry for your feature/bugfix here (replace this comment block.)
This will help people understand your PR and can be used as a start of the Doc PR.
Additionally:
- Bug fixes must be submitted against the lowest branch where they apply
(lowest branches are regularly merged to upper ones so they get the fixes too).
- Features and deprecations must be submitted against the master branch.
-->
Commits
-------
1857395137
[Messenger] Ensure that a TransportException is thrown on redis error
This commit is contained in:
commit
8aa02358d6
@ -12,7 +12,7 @@
|
||||
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Exception\LogicException;
|
||||
use Symfony\Component\Messenger\Exception\TransportException;
|
||||
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
|
||||
|
||||
/**
|
||||
@ -103,7 +103,7 @@ class ConnectionTest extends TestCase
|
||||
|
||||
public function testUnexpectedRedisError()
|
||||
{
|
||||
$this->expectException(LogicException::class);
|
||||
$this->expectException(TransportException::class);
|
||||
$this->expectExceptionMessage('Redis error happens');
|
||||
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
|
||||
$redis->expects($this->once())->method('xreadgroup')->willReturn(false);
|
||||
|
@ -12,7 +12,7 @@
|
||||
namespace Symfony\Component\Messenger\Transport\RedisExt;
|
||||
|
||||
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||
use Symfony\Component\Messenger\Exception\LogicException;
|
||||
use Symfony\Component\Messenger\Exception\TransportException;
|
||||
|
||||
/**
|
||||
* A Redis connection.
|
||||
@ -77,17 +77,21 @@ class Connection
|
||||
$messageId = '0'; // will receive consumers pending messages
|
||||
}
|
||||
|
||||
$messages = $this->connection->xreadgroup(
|
||||
$this->group,
|
||||
$this->consumer,
|
||||
[$this->stream => $messageId],
|
||||
1,
|
||||
$this->blockingTimeout
|
||||
);
|
||||
$e = null;
|
||||
try {
|
||||
$messages = $this->connection->xReadGroup(
|
||||
$this->group,
|
||||
$this->consumer,
|
||||
[$this->stream => $messageId],
|
||||
1,
|
||||
$this->blockingTimeout
|
||||
);
|
||||
} catch (\RedisException $e) {
|
||||
}
|
||||
|
||||
if (false === $messages) {
|
||||
throw new LogicException(
|
||||
$this->connection->getLastError() ?: 'Unexpected redis stream error happened.'
|
||||
if (false === $messages || $e) {
|
||||
throw new TransportException(
|
||||
($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not read messages from the redis stream.'
|
||||
);
|
||||
}
|
||||
|
||||
@ -113,23 +117,51 @@ class Connection
|
||||
|
||||
public function ack(string $id): void
|
||||
{
|
||||
$this->connection->xack($this->stream, $this->group, [$id]);
|
||||
$e = null;
|
||||
try {
|
||||
$acknowledged = $this->connection->xAck($this->stream, $this->group, [$id]);
|
||||
} catch (\RedisException $e) {
|
||||
}
|
||||
|
||||
if (!$acknowledged || $e) {
|
||||
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not acknowledge redis message "%s".', $id), 0, $e);
|
||||
}
|
||||
}
|
||||
|
||||
public function reject(string $id): void
|
||||
{
|
||||
$this->connection->xdel($this->stream, [$id]);
|
||||
$e = null;
|
||||
try {
|
||||
$deleted = $this->connection->xDel($this->stream, [$id]);
|
||||
} catch (\RedisException $e) {
|
||||
}
|
||||
|
||||
if (!$deleted || $e) {
|
||||
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not delete message "%s" from the redis stream.', $id), 0, $e);
|
||||
}
|
||||
}
|
||||
|
||||
public function add(string $body, array $headers)
|
||||
{
|
||||
$this->connection->xadd($this->stream, '*', ['message' => json_encode(
|
||||
['body' => $body, 'headers' => $headers]
|
||||
)]);
|
||||
$e = null;
|
||||
try {
|
||||
$added = $this->connection->xAdd($this->stream, '*', ['message' => json_encode(
|
||||
['body' => $body, 'headers' => $headers]
|
||||
)]);
|
||||
} catch (\RedisException $e) {
|
||||
}
|
||||
|
||||
if (!$added || $e) {
|
||||
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not add a message to the redis stream.', 0, $e);
|
||||
}
|
||||
}
|
||||
|
||||
public function setup(): void
|
||||
{
|
||||
$this->connection->xgroup('CREATE', $this->stream, $this->group, 0, true);
|
||||
try {
|
||||
$this->connection->xGroup('CREATE', $this->stream, $this->group, 0, true);
|
||||
} catch (\RedisException $e) {
|
||||
throw new TransportException($e->getMessage(), 0, $e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user