feature #35384 [Messenger] Add receiving of old pending messages (redis) (toooni)

This PR was merged into the 5.1-dev branch.

Discussion
----------

[Messenger] Add receiving of old pending messages (redis)

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| Deprecations? | no
| Tickets       |
| License       | MIT
| Doc PR        | https://github.com/symfony/symfony-docs/pull/12976

This PR makes it possible for the redis transport to get abandoned messages from not running/idle consumers by using `XPENDING` and `XCLAIM`.
Usually it would be best to let the claiming of pending messages be handled by a separate command. Since the messenger component's commands are fixed, we do need to set a `claimTimeout`. The `claimTimeout` defines how long an idle message should be left alone until it will get claimed by the current consumer (Must be a value higher than the longest running handling time of a message or else the message will be handled twice).
Using this solution makes the remarks (https://github.com/symfony/symfony-docs/pull/11869#pullrequestreview-257483666) regarding not being able to use the hostname as consumer name obsolete. I would even recommend the hostname as the consumer name.

**Questions**

- [x] Which value should we use as default `claimTimeout`?
- [x] How should the `claimTimeout` be configured?
- [x] Feature or Bugfix?

I will create a docs PR and a PR for the other branches as soon as the questions are resolved.

Commits
-------

9cb6fdfabb Implemted receiving of old pending messages
This commit is contained in:
Fabien Potencier 2020-02-08 08:48:00 +01:00
commit 4bc1ea21b9
3 changed files with 206 additions and 11 deletions

View File

@ -123,7 +123,7 @@ class ConnectionTest extends TestCase
$redis->expects($this->exactly(3))->method('xreadgroup')
->with('symfony', 'consumer', ['queue' => 0], 1, null)
->willReturn(['queue' => [['message' => json_encode(['body' => 'Test', 'headers' => []])]]]);
->willReturn(['queue' => [['message' => '{"body":"Test","headers":[]}']]]);
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$this->assertNotNull($connection->get());
@ -164,23 +164,67 @@ class ConnectionTest extends TestCase
$this->assertSame(2, $redis->getDbNum());
}
public function testFirstGetPendingMessagesThenNewMessages()
public function testGetPendingMessageFirst()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
$count = 0;
$redis->expects($this->exactly(1))->method('xreadgroup')
->with('symfony', 'consumer', ['queue' => '0'], 1, null)
->willReturn(['queue' => [['message' => '{"body":"1","headers":[]}']]]);
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$connection->get();
}
public function testClaimAbandonedMessageWithRaceCondition()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
$redis->expects($this->exactly(3))->method('xreadgroup')
->withConsecutive(
['symfony', 'consumer', ['queue' => '0'], 1, null], // first call for pending messages
['symfony', 'consumer', ['queue' => '0'], 1, null], // sencond call because of claimed message (redisid-123)
['symfony', 'consumer', ['queue' => '>'], 1, null] // third call because of no result (other consumer claimed message redisid-123)
)
->willReturnOnConsecutiveCalls([], [], []);
$redis->expects($this->once())->method('xpending')->willReturn([[
0 => 'redisid-123', // message-id
1 => 'consumer-2', // consumer-name
2 => 3600001, // idle
]]);
$redis->expects($this->exactly(1))->method('xclaim')
->with('queue', 'symfony', 'consumer', 3600000, ['redisid-123'], ['JUSTID'])
->willReturn([]);
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$connection->get();
}
public function testClaimAbandonedMessage()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
$redis->expects($this->exactly(2))->method('xreadgroup')
->with('symfony', 'consumer', $this->callback(function ($arr_streams) use (&$count) {
++$count;
->withConsecutive(
['symfony', 'consumer', ['queue' => '0'], 1, null], // first call for pending messages
['symfony', 'consumer', ['queue' => '0'], 1, null] // sencond call because of claimed message (redisid-123)
)
->willReturnOnConsecutiveCalls(
[], // first call returns no result
['queue' => [['message' => '{"body":"1","headers":[]}']]] // second call returns clamed message (redisid-123)
);
if (1 === $count) {
return '0' === $arr_streams['queue'];
}
$redis->expects($this->once())->method('xpending')->willReturn([[
0 => 'redisid-123', // message-id
1 => 'consumer-2', // consumer-name
2 => 3600001, // idle
]]);
return '>' === $arr_streams['queue'];
}), 1, null)
->willReturn(['queue' => []]);
$redis->expects($this->exactly(1))->method('xclaim')
->with('queue', 'symfony', 'consumer', 3600000, ['redisid-123'], ['JUSTID'])
->willReturn([]);
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$connection->get();

View File

@ -100,4 +100,87 @@ class RedisExtIntegrationTest extends TestCase
$this->assertEquals($body, $encoded['body']);
$this->assertEquals($headers, $encoded['headers']);
}
public function testConnectionBelowRedeliverTimeout()
{
// lower redeliver timeout and claim interval
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis);
$connection->cleanup();
$connection->setup();
$body = '{"message": "Hi"}';
$headers = ['type' => DummyMessage::class];
// Add two messages
$connection->add($body, $headers);
// Read first message with other consumer
$this->redis->xreadgroup(
$this->getConnectionGroup($connection),
'other-consumer2',
[$this->getConnectionStream($connection) => '>'],
1
);
// Queue will not have any messages yet
$this->assertNull($connection->get());
}
public function testConnectionClaimAndRedeliver()
{
// lower redeliver timeout and claim interval
$connection = Connection::fromDsn(
getenv('MESSENGER_REDIS_DSN'),
['redeliver_timeout' => 0, 'claim_interval' => 500],
$this->redis
);
$connection->cleanup();
$connection->setup();
$body1 = '{"message": "Hi"}';
$body2 = '{"message": "Bye"}';
$headers = ['type' => DummyMessage::class];
// Add two messages
$connection->add($body1, $headers);
$connection->add($body2, $headers);
// Read first message with other consumer
$this->redis->xreadgroup(
$this->getConnectionGroup($connection),
'other-consumer2',
[$this->getConnectionStream($connection) => '>'],
1
);
// Queue will return the pending message first because redeliver_timeout = 0
$encoded = $connection->get();
$this->assertEquals($body1, $encoded['body']);
$this->assertEquals($headers, $encoded['headers']);
$connection->ack($encoded['id']);
// Queue will return the second message
$encoded = $connection->get();
$this->assertEquals($body2, $encoded['body']);
$this->assertEquals($headers, $encoded['headers']);
$connection->ack($encoded['id']);
}
private function getConnectionGroup(Connection $connection): string
{
$property = (new \ReflectionClass(Connection::class))->getProperty('group');
$property->setAccessible(true);
return $property->getValue($connection);
}
private function getConnectionStream(Connection $connection): string
{
$property = (new \ReflectionClass(Connection::class))->getProperty('stream');
$property->setAccessible(true);
return $property->getValue($connection);
}
}

View File

@ -35,6 +35,8 @@ class Connection
'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries
'dbindex' => 0,
'tls' => false,
'redeliver_timeout' => 3600, // Timeout before redeliver messages still in pending state (seconds)
'claim_interval' => 60000, // Interval by which pending/abandoned messages should be checked
];
private $connection;
@ -44,6 +46,9 @@ class Connection
private $consumer;
private $autoSetup;
private $maxEntries;
private $redeliverTimeout;
private $nextClaim = 0;
private $claimInterval;
private $couldHavePendingMessages = true;
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
@ -70,6 +75,8 @@ class Connection
$this->queue = $this->stream.'__queue';
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
$this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
$this->redeliverTimeout = ($configuration['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout']) * 1000;
$this->claimInterval = $configuration['claim_interval'] ?? self::DEFAULT_OPTIONS['claim_interval'];
}
public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
@ -113,6 +120,18 @@ class Connection
unset($redisOptions['tls']);
}
$redeliverTimeout = null;
if (\array_key_exists('redeliver_timeout', $redisOptions)) {
$redeliverTimeout = filter_var($redisOptions['redeliver_timeout'], FILTER_VALIDATE_INT);
unset($redisOptions['redeliver_timeout']);
}
$claimInterval = null;
if (\array_key_exists('claim_interval', $redisOptions)) {
$claimInterval = filter_var($redisOptions['claim_interval'], FILTER_VALIDATE_INT);
unset($redisOptions['claim_interval']);
}
$configuration = [
'stream' => $redisOptions['stream'] ?? null,
'group' => $redisOptions['group'] ?? null,
@ -120,6 +139,8 @@ class Connection
'auto_setup' => $autoSetup,
'stream_max_entries' => $maxEntries,
'dbindex' => $dbIndex,
'redeliver_timeout' => $redeliverTimeout,
'claim_interval' => $claimInterval,
];
if (isset($parsedUrl['host'])) {
@ -157,6 +178,49 @@ class Connection
}
}
private function claimOldPendingMessages()
{
try {
// This could soon be optimized with https://github.com/antirez/redis/issues/5212 or
// https://github.com/antirez/redis/issues/6256
$pendingMessages = $this->connection->xpending($this->stream, $this->group, '-', '+', 1);
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
$claimableIds = [];
foreach ($pendingMessages as $pendingMessage) {
if ($pendingMessage[1] === $this->consumer) {
$this->couldHavePendingMessages = true;
return;
}
if ($pendingMessage[2] >= $this->redeliverTimeout) {
$claimableIds[] = $pendingMessage[0];
}
}
if (\count($claimableIds) > 0) {
try {
$this->connection->xclaim(
$this->stream,
$this->group,
$this->consumer,
$this->redeliverTimeout,
$claimableIds,
['JUSTID']
);
$this->couldHavePendingMessages = true;
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
}
$this->nextClaim = $this->getCurrentTimeInMilliseconds() + $this->claimInterval;
}
public function get(): ?array
{
if ($this->autoSetup) {
@ -191,6 +255,10 @@ class Connection
}
}
if (!$this->couldHavePendingMessages && $this->nextClaim <= $this->getCurrentTimeInMilliseconds()) {
$this->claimOldPendingMessages();
}
$messageId = '>'; // will receive new messages
if ($this->couldHavePendingMessages) {