Implemted receiving of old pending messages
This commit is contained in:
parent
33e2735d14
commit
9cb6fdfabb
@ -123,7 +123,7 @@ class ConnectionTest extends TestCase
|
|||||||
|
|
||||||
$redis->expects($this->exactly(3))->method('xreadgroup')
|
$redis->expects($this->exactly(3))->method('xreadgroup')
|
||||||
->with('symfony', 'consumer', ['queue' => 0], 1, null)
|
->with('symfony', 'consumer', ['queue' => 0], 1, null)
|
||||||
->willReturn(['queue' => [['message' => json_encode(['body' => 'Test', 'headers' => []])]]]);
|
->willReturn(['queue' => [['message' => '{"body":"Test","headers":[]}']]]);
|
||||||
|
|
||||||
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
|
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
|
||||||
$this->assertNotNull($connection->get());
|
$this->assertNotNull($connection->get());
|
||||||
@ -164,23 +164,67 @@ class ConnectionTest extends TestCase
|
|||||||
$this->assertSame(2, $redis->getDbNum());
|
$this->assertSame(2, $redis->getDbNum());
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testFirstGetPendingMessagesThenNewMessages()
|
public function testGetPendingMessageFirst()
|
||||||
{
|
{
|
||||||
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
|
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
|
||||||
|
|
||||||
$count = 0;
|
$redis->expects($this->exactly(1))->method('xreadgroup')
|
||||||
|
->with('symfony', 'consumer', ['queue' => '0'], 1, null)
|
||||||
|
->willReturn(['queue' => [['message' => '{"body":"1","headers":[]}']]]);
|
||||||
|
|
||||||
$redis->expects($this->exactly(2))->method('xreadgroup')
|
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
|
||||||
->with('symfony', 'consumer', $this->callback(function ($arr_streams) use (&$count) {
|
$connection->get();
|
||||||
++$count;
|
|
||||||
|
|
||||||
if (1 === $count) {
|
|
||||||
return '0' === $arr_streams['queue'];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return '>' === $arr_streams['queue'];
|
public function testClaimAbandonedMessageWithRaceCondition()
|
||||||
}), 1, null)
|
{
|
||||||
->willReturn(['queue' => []]);
|
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
|
||||||
|
|
||||||
|
$redis->expects($this->exactly(3))->method('xreadgroup')
|
||||||
|
->withConsecutive(
|
||||||
|
['symfony', 'consumer', ['queue' => '0'], 1, null], // first call for pending messages
|
||||||
|
['symfony', 'consumer', ['queue' => '0'], 1, null], // sencond call because of claimed message (redisid-123)
|
||||||
|
['symfony', 'consumer', ['queue' => '>'], 1, null] // third call because of no result (other consumer claimed message redisid-123)
|
||||||
|
)
|
||||||
|
->willReturnOnConsecutiveCalls([], [], []);
|
||||||
|
|
||||||
|
$redis->expects($this->once())->method('xpending')->willReturn([[
|
||||||
|
0 => 'redisid-123', // message-id
|
||||||
|
1 => 'consumer-2', // consumer-name
|
||||||
|
2 => 3600001, // idle
|
||||||
|
]]);
|
||||||
|
|
||||||
|
$redis->expects($this->exactly(1))->method('xclaim')
|
||||||
|
->with('queue', 'symfony', 'consumer', 3600000, ['redisid-123'], ['JUSTID'])
|
||||||
|
->willReturn([]);
|
||||||
|
|
||||||
|
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
|
||||||
|
$connection->get();
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testClaimAbandonedMessage()
|
||||||
|
{
|
||||||
|
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
|
||||||
|
|
||||||
|
$redis->expects($this->exactly(2))->method('xreadgroup')
|
||||||
|
->withConsecutive(
|
||||||
|
['symfony', 'consumer', ['queue' => '0'], 1, null], // first call for pending messages
|
||||||
|
['symfony', 'consumer', ['queue' => '0'], 1, null] // sencond call because of claimed message (redisid-123)
|
||||||
|
)
|
||||||
|
->willReturnOnConsecutiveCalls(
|
||||||
|
[], // first call returns no result
|
||||||
|
['queue' => [['message' => '{"body":"1","headers":[]}']]] // second call returns clamed message (redisid-123)
|
||||||
|
);
|
||||||
|
|
||||||
|
$redis->expects($this->once())->method('xpending')->willReturn([[
|
||||||
|
0 => 'redisid-123', // message-id
|
||||||
|
1 => 'consumer-2', // consumer-name
|
||||||
|
2 => 3600001, // idle
|
||||||
|
]]);
|
||||||
|
|
||||||
|
$redis->expects($this->exactly(1))->method('xclaim')
|
||||||
|
->with('queue', 'symfony', 'consumer', 3600000, ['redisid-123'], ['JUSTID'])
|
||||||
|
->willReturn([]);
|
||||||
|
|
||||||
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
|
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
|
||||||
$connection->get();
|
$connection->get();
|
||||||
|
@ -100,4 +100,87 @@ class RedisExtIntegrationTest extends TestCase
|
|||||||
$this->assertEquals($body, $encoded['body']);
|
$this->assertEquals($body, $encoded['body']);
|
||||||
$this->assertEquals($headers, $encoded['headers']);
|
$this->assertEquals($headers, $encoded['headers']);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testConnectionBelowRedeliverTimeout()
|
||||||
|
{
|
||||||
|
// lower redeliver timeout and claim interval
|
||||||
|
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis);
|
||||||
|
|
||||||
|
$connection->cleanup();
|
||||||
|
$connection->setup();
|
||||||
|
|
||||||
|
$body = '{"message": "Hi"}';
|
||||||
|
$headers = ['type' => DummyMessage::class];
|
||||||
|
|
||||||
|
// Add two messages
|
||||||
|
$connection->add($body, $headers);
|
||||||
|
|
||||||
|
// Read first message with other consumer
|
||||||
|
$this->redis->xreadgroup(
|
||||||
|
$this->getConnectionGroup($connection),
|
||||||
|
'other-consumer2',
|
||||||
|
[$this->getConnectionStream($connection) => '>'],
|
||||||
|
1
|
||||||
|
);
|
||||||
|
|
||||||
|
// Queue will not have any messages yet
|
||||||
|
$this->assertNull($connection->get());
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testConnectionClaimAndRedeliver()
|
||||||
|
{
|
||||||
|
// lower redeliver timeout and claim interval
|
||||||
|
$connection = Connection::fromDsn(
|
||||||
|
getenv('MESSENGER_REDIS_DSN'),
|
||||||
|
['redeliver_timeout' => 0, 'claim_interval' => 500],
|
||||||
|
$this->redis
|
||||||
|
);
|
||||||
|
|
||||||
|
$connection->cleanup();
|
||||||
|
$connection->setup();
|
||||||
|
|
||||||
|
$body1 = '{"message": "Hi"}';
|
||||||
|
$body2 = '{"message": "Bye"}';
|
||||||
|
$headers = ['type' => DummyMessage::class];
|
||||||
|
|
||||||
|
// Add two messages
|
||||||
|
$connection->add($body1, $headers);
|
||||||
|
$connection->add($body2, $headers);
|
||||||
|
|
||||||
|
// Read first message with other consumer
|
||||||
|
$this->redis->xreadgroup(
|
||||||
|
$this->getConnectionGroup($connection),
|
||||||
|
'other-consumer2',
|
||||||
|
[$this->getConnectionStream($connection) => '>'],
|
||||||
|
1
|
||||||
|
);
|
||||||
|
|
||||||
|
// Queue will return the pending message first because redeliver_timeout = 0
|
||||||
|
$encoded = $connection->get();
|
||||||
|
$this->assertEquals($body1, $encoded['body']);
|
||||||
|
$this->assertEquals($headers, $encoded['headers']);
|
||||||
|
$connection->ack($encoded['id']);
|
||||||
|
|
||||||
|
// Queue will return the second message
|
||||||
|
$encoded = $connection->get();
|
||||||
|
$this->assertEquals($body2, $encoded['body']);
|
||||||
|
$this->assertEquals($headers, $encoded['headers']);
|
||||||
|
$connection->ack($encoded['id']);
|
||||||
|
}
|
||||||
|
|
||||||
|
private function getConnectionGroup(Connection $connection): string
|
||||||
|
{
|
||||||
|
$property = (new \ReflectionClass(Connection::class))->getProperty('group');
|
||||||
|
$property->setAccessible(true);
|
||||||
|
|
||||||
|
return $property->getValue($connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
private function getConnectionStream(Connection $connection): string
|
||||||
|
{
|
||||||
|
$property = (new \ReflectionClass(Connection::class))->getProperty('stream');
|
||||||
|
$property->setAccessible(true);
|
||||||
|
|
||||||
|
return $property->getValue($connection);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -35,6 +35,8 @@ class Connection
|
|||||||
'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries
|
'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries
|
||||||
'dbindex' => 0,
|
'dbindex' => 0,
|
||||||
'tls' => false,
|
'tls' => false,
|
||||||
|
'redeliver_timeout' => 3600, // Timeout before redeliver messages still in pending state (seconds)
|
||||||
|
'claim_interval' => 60000, // Interval by which pending/abandoned messages should be checked
|
||||||
];
|
];
|
||||||
|
|
||||||
private $connection;
|
private $connection;
|
||||||
@ -44,6 +46,9 @@ class Connection
|
|||||||
private $consumer;
|
private $consumer;
|
||||||
private $autoSetup;
|
private $autoSetup;
|
||||||
private $maxEntries;
|
private $maxEntries;
|
||||||
|
private $redeliverTimeout;
|
||||||
|
private $nextClaim = 0;
|
||||||
|
private $claimInterval;
|
||||||
private $couldHavePendingMessages = true;
|
private $couldHavePendingMessages = true;
|
||||||
|
|
||||||
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
|
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
|
||||||
@ -70,6 +75,8 @@ class Connection
|
|||||||
$this->queue = $this->stream.'__queue';
|
$this->queue = $this->stream.'__queue';
|
||||||
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
|
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
|
||||||
$this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
|
$this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
|
||||||
|
$this->redeliverTimeout = ($configuration['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout']) * 1000;
|
||||||
|
$this->claimInterval = $configuration['claim_interval'] ?? self::DEFAULT_OPTIONS['claim_interval'];
|
||||||
}
|
}
|
||||||
|
|
||||||
public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
|
public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
|
||||||
@ -113,6 +120,18 @@ class Connection
|
|||||||
unset($redisOptions['tls']);
|
unset($redisOptions['tls']);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$redeliverTimeout = null;
|
||||||
|
if (\array_key_exists('redeliver_timeout', $redisOptions)) {
|
||||||
|
$redeliverTimeout = filter_var($redisOptions['redeliver_timeout'], FILTER_VALIDATE_INT);
|
||||||
|
unset($redisOptions['redeliver_timeout']);
|
||||||
|
}
|
||||||
|
|
||||||
|
$claimInterval = null;
|
||||||
|
if (\array_key_exists('claim_interval', $redisOptions)) {
|
||||||
|
$claimInterval = filter_var($redisOptions['claim_interval'], FILTER_VALIDATE_INT);
|
||||||
|
unset($redisOptions['claim_interval']);
|
||||||
|
}
|
||||||
|
|
||||||
$configuration = [
|
$configuration = [
|
||||||
'stream' => $redisOptions['stream'] ?? null,
|
'stream' => $redisOptions['stream'] ?? null,
|
||||||
'group' => $redisOptions['group'] ?? null,
|
'group' => $redisOptions['group'] ?? null,
|
||||||
@ -120,6 +139,8 @@ class Connection
|
|||||||
'auto_setup' => $autoSetup,
|
'auto_setup' => $autoSetup,
|
||||||
'stream_max_entries' => $maxEntries,
|
'stream_max_entries' => $maxEntries,
|
||||||
'dbindex' => $dbIndex,
|
'dbindex' => $dbIndex,
|
||||||
|
'redeliver_timeout' => $redeliverTimeout,
|
||||||
|
'claim_interval' => $claimInterval,
|
||||||
];
|
];
|
||||||
|
|
||||||
if (isset($parsedUrl['host'])) {
|
if (isset($parsedUrl['host'])) {
|
||||||
@ -157,6 +178,49 @@ class Connection
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private function claimOldPendingMessages()
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
// This could soon be optimized with https://github.com/antirez/redis/issues/5212 or
|
||||||
|
// https://github.com/antirez/redis/issues/6256
|
||||||
|
$pendingMessages = $this->connection->xpending($this->stream, $this->group, '-', '+', 1);
|
||||||
|
} catch (\RedisException $e) {
|
||||||
|
throw new TransportException($e->getMessage(), 0, $e);
|
||||||
|
}
|
||||||
|
|
||||||
|
$claimableIds = [];
|
||||||
|
foreach ($pendingMessages as $pendingMessage) {
|
||||||
|
if ($pendingMessage[1] === $this->consumer) {
|
||||||
|
$this->couldHavePendingMessages = true;
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($pendingMessage[2] >= $this->redeliverTimeout) {
|
||||||
|
$claimableIds[] = $pendingMessage[0];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (\count($claimableIds) > 0) {
|
||||||
|
try {
|
||||||
|
$this->connection->xclaim(
|
||||||
|
$this->stream,
|
||||||
|
$this->group,
|
||||||
|
$this->consumer,
|
||||||
|
$this->redeliverTimeout,
|
||||||
|
$claimableIds,
|
||||||
|
['JUSTID']
|
||||||
|
);
|
||||||
|
|
||||||
|
$this->couldHavePendingMessages = true;
|
||||||
|
} catch (\RedisException $e) {
|
||||||
|
throw new TransportException($e->getMessage(), 0, $e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->nextClaim = $this->getCurrentTimeInMilliseconds() + $this->claimInterval;
|
||||||
|
}
|
||||||
|
|
||||||
public function get(): ?array
|
public function get(): ?array
|
||||||
{
|
{
|
||||||
if ($this->autoSetup) {
|
if ($this->autoSetup) {
|
||||||
@ -191,6 +255,10 @@ class Connection
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!$this->couldHavePendingMessages && $this->nextClaim <= $this->getCurrentTimeInMilliseconds()) {
|
||||||
|
$this->claimOldPendingMessages();
|
||||||
|
}
|
||||||
|
|
||||||
$messageId = '>'; // will receive new messages
|
$messageId = '>'; // will receive new messages
|
||||||
|
|
||||||
if ($this->couldHavePendingMessages) {
|
if ($this->couldHavePendingMessages) {
|
||||||
|
Reference in New Issue
Block a user