[Messenger] Fix Redis Connection::get() after reject()
This commit is contained in:
parent
ad6e888a6f
commit
c05273f793
@ -112,4 +112,22 @@ class ConnectionTest extends TestCase
|
|||||||
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
|
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
|
||||||
$connection->get();
|
$connection->get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testGetAfterReject()
|
||||||
|
{
|
||||||
|
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget');
|
||||||
|
try {
|
||||||
|
$connection->setup();
|
||||||
|
} catch (TransportException $e) {
|
||||||
|
}
|
||||||
|
|
||||||
|
$connection->add('1', []);
|
||||||
|
$connection->add('2', []);
|
||||||
|
|
||||||
|
$failing = $connection->get();
|
||||||
|
$connection->reject($failing['id']);
|
||||||
|
|
||||||
|
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget');
|
||||||
|
$this->assertNotNull($connection->get());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -89,7 +89,7 @@ class Connection
|
|||||||
} catch (\RedisException $e) {
|
} catch (\RedisException $e) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (false === $messages || $e) {
|
if ($e || (false === $messages && !$this->couldHavePendingMessages)) {
|
||||||
throw new TransportException(
|
throw new TransportException(
|
||||||
($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not read messages from the redis stream.'
|
($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not read messages from the redis stream.'
|
||||||
);
|
);
|
||||||
@ -123,7 +123,7 @@ class Connection
|
|||||||
} catch (\RedisException $e) {
|
} catch (\RedisException $e) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!$acknowledged || $e) {
|
if ($e || !$acknowledged) {
|
||||||
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not acknowledge redis message "%s".', $id), 0, $e);
|
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not acknowledge redis message "%s".', $id), 0, $e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -136,7 +136,7 @@ class Connection
|
|||||||
} catch (\RedisException $e) {
|
} catch (\RedisException $e) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!$deleted || $e) {
|
if ($e || !$deleted) {
|
||||||
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not delete message "%s" from the redis stream.', $id), 0, $e);
|
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not delete message "%s" from the redis stream.', $id), 0, $e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -151,7 +151,7 @@ class Connection
|
|||||||
} catch (\RedisException $e) {
|
} catch (\RedisException $e) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!$added || $e) {
|
if ($e || !$added) {
|
||||||
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not add a message to the redis stream.', 0, $e);
|
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not add a message to the redis stream.', 0, $e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user