[Messenger] Fix Redis Connection::get() after reject()

This commit is contained in:
Robin Chalas 2019-05-01 15:19:10 +02:00
parent ad6e888a6f
commit c05273f793
2 changed files with 22 additions and 4 deletions

View File

@ -112,4 +112,22 @@ class ConnectionTest extends TestCase
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$connection->get();
}
public function testGetAfterReject()
{
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget');
try {
$connection->setup();
} catch (TransportException $e) {
}
$connection->add('1', []);
$connection->add('2', []);
$failing = $connection->get();
$connection->reject($failing['id']);
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget');
$this->assertNotNull($connection->get());
}
}

View File

@ -89,7 +89,7 @@ class Connection
} catch (\RedisException $e) {
}
if (false === $messages || $e) {
if ($e || (false === $messages && !$this->couldHavePendingMessages)) {
throw new TransportException(
($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not read messages from the redis stream.'
);
@ -123,7 +123,7 @@ class Connection
} catch (\RedisException $e) {
}
if (!$acknowledged || $e) {
if ($e || !$acknowledged) {
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not acknowledge redis message "%s".', $id), 0, $e);
}
}
@ -136,7 +136,7 @@ class Connection
} catch (\RedisException $e) {
}
if (!$deleted || $e) {
if ($e || !$deleted) {
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not delete message "%s" from the redis stream.', $id), 0, $e);
}
}
@ -151,7 +151,7 @@ class Connection
} catch (\RedisException $e) {
}
if (!$added || $e) {
if ($e || !$added) {
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not add a message to the redis stream.', 0, $e);
}
}