bug #31387 [Messenger] Fix Redis Connection::get() after reject() (chalasr)

This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Fix Redis Connection::get() after reject()

| Q             | A
| ------------- | ---
| Branch?       | 4.3
| Bug fix?      | yes
| New feature?  |no
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | n/a
| License       | MIT
| Doc PR        | n/a

If a message is rejected, another consumer cannot read from the stream because the first subsequent call to `\Redis::xreadgroup()` returns false for some reason.
Reproducer: https://github.com/chalasr/redis-transport-bug

ping @alexander-schranz

Commits
-------

c05273f793 [Messenger] Fix Redis Connection::get() after reject()
This commit is contained in:
Fabien Potencier 2019-05-06 13:25:34 +02:00
commit 4e61ff535e
2 changed files with 22 additions and 4 deletions

View File

@ -112,4 +112,22 @@ class ConnectionTest extends TestCase
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$connection->get();
}
public function testGetAfterReject()
{
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget');
try {
$connection->setup();
} catch (TransportException $e) {
}
$connection->add('1', []);
$connection->add('2', []);
$failing = $connection->get();
$connection->reject($failing['id']);
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget');
$this->assertNotNull($connection->get());
}
}

View File

@ -89,7 +89,7 @@ class Connection
} catch (\RedisException $e) {
}
if (false === $messages || $e) {
if ($e || (false === $messages && !$this->couldHavePendingMessages)) {
throw new TransportException(
($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not read messages from the redis stream.'
);
@ -123,7 +123,7 @@ class Connection
} catch (\RedisException $e) {
}
if (!$acknowledged || $e) {
if ($e || !$acknowledged) {
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not acknowledge redis message "%s".', $id), 0, $e);
}
}
@ -136,7 +136,7 @@ class Connection
} catch (\RedisException $e) {
}
if (!$deleted || $e) {
if ($e || !$deleted) {
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not delete message "%s" from the redis stream.', $id), 0, $e);
}
}
@ -151,7 +151,7 @@ class Connection
} catch (\RedisException $e) {
}
if (!$added || $e) {
if ($e || !$added) {
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not add a message to the redis stream.', 0, $e);
}
}