[Messenger] Fix redis last error not cleared between calls

This commit is contained in:
Robin Chalas 2019-07-24 18:10:47 +02:00
parent a07d6a2d53
commit 9c263ffca8
2 changed files with 59 additions and 15 deletions

View File

@ -16,7 +16,7 @@ use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\RedisExt\Connection; use Symfony\Component\Messenger\Transport\RedisExt\Connection;
/** /**
* @requires extension redis * @requires extension redis >= 4.3.0
*/ */
class ConnectionTest extends TestCase class ConnectionTest extends TestCase
{ {
@ -119,7 +119,7 @@ class ConnectionTest extends TestCase
$redis->expects($this->once())->method('xreadgroup')->willReturn(false); $redis->expects($this->once())->method('xreadgroup')->willReturn(false);
$redis->expects($this->once())->method('getLastError')->willReturn('Redis error happens'); $redis->expects($this->once())->method('getLastError')->willReturn('Redis error happens');
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis); $connection = Connection::fromDsn('redis://localhost/queue', ['auto_setup' => false], $redis);
$connection->get(); $connection->get();
} }
@ -152,4 +152,31 @@ class ConnectionTest extends TestCase
$connection->reject($message['id']); $connection->reject($message['id']);
$redis->del('messenger-getnonblocking'); $redis->del('messenger-getnonblocking');
} }
public function testLastErrorGetsCleared()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
$redis->expects($this->once())->method('xadd')->willReturn(0);
$redis->expects($this->once())->method('xack')->willReturn(0);
$redis->method('getLastError')->willReturnOnConsecutiveCalls('xadd error', 'xack error');
$redis->expects($this->exactly(2))->method('clearLastError');
$connection = Connection::fromDsn('redis://localhost/messenger-clearlasterror', ['auto_setup' => false], $redis);
try {
$connection->add('message', []);
} catch (TransportException $e) {
}
$this->assertSame('xadd error', $e->getMessage());
try {
$connection->ack('1');
} catch (TransportException $e) {
}
$this->assertSame('xack error', $e->getMessage());
}
} }

View File

@ -114,12 +114,15 @@ class Connection
1 1
); );
} catch (\RedisException $e) { } catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
} }
if ($e || false === $messages) { if (false === $messages) {
throw new TransportException( if ($error = $this->connection->getLastError() ?: null) {
($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not read messages from the redis stream.' $this->connection->clearLastError();
); }
throw new TransportException($error ?? 'Could not read messages from the redis stream.');
} }
if ($this->couldHavePendingMessages && empty($messages[$this->stream])) { if ($this->couldHavePendingMessages && empty($messages[$this->stream])) {
@ -144,28 +147,34 @@ class Connection
public function ack(string $id): void public function ack(string $id): void
{ {
$e = null;
try { try {
$acknowledged = $this->connection->xack($this->stream, $this->group, [$id]); $acknowledged = $this->connection->xack($this->stream, $this->group, [$id]);
} catch (\RedisException $e) { } catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
} }
if ($e || !$acknowledged) { if (!$acknowledged) {
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not acknowledge redis message "%s".', $id), 0, $e); if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? sprintf('Could not acknowledge redis message "%s".', $id));
} }
} }
public function reject(string $id): void public function reject(string $id): void
{ {
$e = null;
try { try {
$deleted = $this->connection->xack($this->stream, $this->group, [$id]); $deleted = $this->connection->xack($this->stream, $this->group, [$id]);
$deleted = $this->connection->xdel($this->stream, [$id]) && $deleted; $deleted = $this->connection->xdel($this->stream, [$id]) && $deleted;
} catch (\RedisException $e) { } catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
} }
if ($e || !$deleted) { if (!$deleted) {
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not delete message "%s" from the redis stream.', $id), 0, $e); if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? sprintf('Could not delete message "%s" from the redis stream.', $id));
} }
} }
@ -175,16 +184,19 @@ class Connection
$this->setup(); $this->setup();
} }
$e = null;
try { try {
$added = $this->connection->xadd($this->stream, '*', ['message' => json_encode( $added = $this->connection->xadd($this->stream, '*', ['message' => json_encode(
['body' => $body, 'headers' => $headers] ['body' => $body, 'headers' => $headers]
)]); )]);
} catch (\RedisException $e) { } catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
} }
if ($e || !$added) { if (!$added) {
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not add a message to the redis stream.', 0, $e); if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? 'Could not add a message to the redis stream.');
} }
} }
@ -196,6 +208,11 @@ class Connection
throw new TransportException($e->getMessage(), 0, $e); throw new TransportException($e->getMessage(), 0, $e);
} }
// group might already exist, ignore
if ($this->connection->getLastError()) {
$this->connection->clearLastError();
}
$this->autoSetup = false; $this->autoSetup = false;
} }
} }