bug #32726 [Messenger] Fix redis last error not cleared between calls (chalasr)

This PR was merged into the 4.3 branch.

Discussion
----------

[Messenger] Fix redis last error not cleared between calls

| Q             | A
| ------------- | ---
| Branch?       | 4.3
| Bug fix?      | yes
| New feature?  | no
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | -
| License       | MIT
| Doc PR        | -

Not clearing it gives misleading errors coming from previous calls which makes debugging hard.
@alexander-schranz FYI

Commits
-------

9c263ffca8 [Messenger] Fix redis last error not cleared between calls
This commit is contained in:
Fabien Potencier 2019-07-27 19:04:32 +02:00
commit 90c6482b7e
2 changed files with 59 additions and 15 deletions

View File

@ -16,7 +16,7 @@ use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
/**
* @requires extension redis
* @requires extension redis >= 4.3.0
*/
class ConnectionTest extends TestCase
{
@ -119,7 +119,7 @@ class ConnectionTest extends TestCase
$redis->expects($this->once())->method('xreadgroup')->willReturn(false);
$redis->expects($this->once())->method('getLastError')->willReturn('Redis error happens');
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$connection = Connection::fromDsn('redis://localhost/queue', ['auto_setup' => false], $redis);
$connection->get();
}
@ -152,4 +152,31 @@ class ConnectionTest extends TestCase
$connection->reject($message['id']);
$redis->del('messenger-getnonblocking');
}
public function testLastErrorGetsCleared()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
$redis->expects($this->once())->method('xadd')->willReturn(0);
$redis->expects($this->once())->method('xack')->willReturn(0);
$redis->method('getLastError')->willReturnOnConsecutiveCalls('xadd error', 'xack error');
$redis->expects($this->exactly(2))->method('clearLastError');
$connection = Connection::fromDsn('redis://localhost/messenger-clearlasterror', ['auto_setup' => false], $redis);
try {
$connection->add('message', []);
} catch (TransportException $e) {
}
$this->assertSame('xadd error', $e->getMessage());
try {
$connection->ack('1');
} catch (TransportException $e) {
}
$this->assertSame('xack error', $e->getMessage());
}
}

View File

@ -114,12 +114,15 @@ class Connection
1
);
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
if ($e || false === $messages) {
throw new TransportException(
($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not read messages from the redis stream.'
);
if (false === $messages) {
if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? 'Could not read messages from the redis stream.');
}
if ($this->couldHavePendingMessages && empty($messages[$this->stream])) {
@ -144,28 +147,34 @@ class Connection
public function ack(string $id): void
{
$e = null;
try {
$acknowledged = $this->connection->xack($this->stream, $this->group, [$id]);
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
if ($e || !$acknowledged) {
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not acknowledge redis message "%s".', $id), 0, $e);
if (!$acknowledged) {
if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? sprintf('Could not acknowledge redis message "%s".', $id));
}
}
public function reject(string $id): void
{
$e = null;
try {
$deleted = $this->connection->xack($this->stream, $this->group, [$id]);
$deleted = $this->connection->xdel($this->stream, [$id]) && $deleted;
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
if ($e || !$deleted) {
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not delete message "%s" from the redis stream.', $id), 0, $e);
if (!$deleted) {
if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? sprintf('Could not delete message "%s" from the redis stream.', $id));
}
}
@ -175,16 +184,19 @@ class Connection
$this->setup();
}
$e = null;
try {
$added = $this->connection->xadd($this->stream, '*', ['message' => json_encode(
['body' => $body, 'headers' => $headers]
)]);
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
if ($e || !$added) {
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not add a message to the redis stream.', 0, $e);
if (!$added) {
if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? 'Could not add a message to the redis stream.');
}
}
@ -196,6 +208,11 @@ class Connection
throw new TransportException($e->getMessage(), 0, $e);
}
// group might already exist, ignore
if ($this->connection->getLastError()) {
$this->connection->clearLastError();
}
$this->autoSetup = false;
}
}