feature #31977 Add handling for delayed message to redis transport (alexander-schranz)

This PR was merged into the 4.4 branch.

Discussion
----------

Add handling for delayed message to redis transport

| Q             | A
| ------------- | ---
| Branch?       | 4.4
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | WIP
| Fixed tickets | Fixes #31711
| License       | MIT
| Doc PR        | symfony/symfony-docs#... TODO

Still in WIP: This pull request implements delayed messages for redis transport. It will park the messages in an own sorted set and if the time comes it will push the messages to the stream to make them available for all consumers. Because of a race condition when having multiple consumers it need to be checked if not accidently a message from the future is popped by zpopmin so the add function is called and there is check if the delay is in the present/past and only then add the message to the stream.

Commits
-------

cfece10569 Add handling for delayed message to redis transport
This commit is contained in:
Robin Chalas 2019-11-05 20:03:26 +01:00
commit a307733751
5 changed files with 119 additions and 18 deletions

View File

@ -20,7 +20,7 @@ env:
- MIN_PHP=7.1.3
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
- MESSENGER_REDIS_DSN=redis://127.0.0.1:7001/messages
- MESSENGER_REDIS_DSN=redis://127.0.0.1:7006/messages
- SYMFONY_PHPUNIT_DISABLE_RESULT_CACHE=1
matrix:
@ -59,7 +59,7 @@ before_install:
- |
# Start Redis cluster
docker pull grokzen/redis-cluster:5.0.4
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:5.0.4
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 -p 7006:7006 -p 7007:7007 -e "STANDALONE=true" --name redis-cluster grokzen/redis-cluster:5.0.4
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
- |

View File

@ -23,6 +23,7 @@ CHANGELOG
* [BC BREAK] Removed `StopWhenRestartSignalIsReceived` in favor of `StopWorkerOnRestartSignalListener`.
* The component is not marked as `@experimental` anymore.
* Marked the `MessengerDataCollector` class as `@final`.
* Added support for `DelayStamp` to the `redis` transport.
4.3.0
-----

View File

@ -17,6 +17,7 @@ use Symfony\Component\Messenger\Transport\RedisExt\Connection;
/**
* @requires extension redis
* @group time-sensitive
*/
class RedisExtIntegrationTest extends TestCase
{
@ -31,7 +32,7 @@ class RedisExtIntegrationTest extends TestCase
$this->redis = new \Redis();
$this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis);
$this->clearRedis();
$this->connection->cleanup();
$this->connection->setup();
}
@ -55,11 +56,48 @@ class RedisExtIntegrationTest extends TestCase
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
}
private function clearRedis()
public function testConnectionSendWithSameContent()
{
$parsedUrl = parse_url(getenv('MESSENGER_REDIS_DSN'));
$pathParts = explode('/', $parsedUrl['path'] ?? '');
$stream = $pathParts[1] ?? 'symfony';
$this->redis->del($stream);
$body = '{"message": "Hi"}';
$headers = ['type' => DummyMessage::class];
$this->connection->add($body, $headers);
$this->connection->add($body, $headers);
$encoded = $this->connection->get();
$this->assertEquals($body, $encoded['body']);
$this->assertEquals($headers, $encoded['headers']);
$encoded = $this->connection->get();
$this->assertEquals($body, $encoded['body']);
$this->assertEquals($headers, $encoded['headers']);
}
public function testConnectionSendAndGetDelayed()
{
$this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class], 500);
$encoded = $this->connection->get();
$this->assertNull($encoded);
sleep(2);
$encoded = $this->connection->get();
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
}
public function testConnectionSendDelayedMessagesWithSameContent()
{
$body = '{"message": "Hi"}';
$headers = ['type' => DummyMessage::class];
$this->connection->add($body, $headers, 500);
$this->connection->add($body, $headers, 500);
sleep(2);
$encoded = $this->connection->get();
$this->assertEquals($body, $encoded['body']);
$this->assertEquals($headers, $encoded['headers']);
$encoded = $this->connection->get();
$this->assertEquals($body, $encoded['body']);
$this->assertEquals($headers, $encoded['headers']);
}
}

View File

@ -38,6 +38,7 @@ class Connection
private $connection;
private $stream;
private $queue;
private $group;
private $consumer;
private $autoSetup;
@ -65,6 +66,7 @@ class Connection
$this->stream = $configuration['stream'] ?? self::DEFAULT_OPTIONS['stream'];
$this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group'];
$this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer'];
$this->queue = $this->stream.'__queue';
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
$this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
}
@ -125,6 +127,34 @@ class Connection
$this->setup();
}
try {
$queuedMessageCount = $this->connection->zcount($this->queue, 0, $this->getCurrentTimeInMilliseconds());
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
if ($queuedMessageCount) {
for ($i = 0; $i < $queuedMessageCount; ++$i) {
try {
$queuedMessages = $this->connection->zpopmin($this->queue, 1);
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
foreach ($queuedMessages as $queuedMessage => $time) {
$queuedMessage = json_decode($queuedMessage, true);
// if a futured placed message is actually popped because of a race condition with
// another running message consumer, the message is readded to the queue by add function
// else its just added stream and will be available for all stream consumers
$this->add(
$queuedMessage['body'],
$queuedMessage['headers'],
$time - $this->getCurrentTimeInMilliseconds()
);
}
}
}
$messageId = '>'; // will receive new messages
if ($this->couldHavePendingMessages) {
@ -203,24 +233,40 @@ class Connection
}
}
public function add(string $body, array $headers): void
public function add(string $body, array $headers, int $delayInMs = 0): void
{
if ($this->autoSetup) {
$this->setup();
}
try {
if ($this->maxEntries) {
$added = $this->connection->xadd($this->stream, '*', ['message' => json_encode(
['body' => $body, 'headers' => $headers]
)], $this->maxEntries, true);
if ($delayInMs > 0) { // the delay could be smaller 0 in a queued message
$message = json_encode([
'body' => $body,
'headers' => $headers,
// Entry need to be unique in the sorted set else it would only be added once to the delayed messages queue
'uniqid' => uniqid('', true),
]);
$score = (int) ($this->getCurrentTimeInMilliseconds() + $delayInMs);
$added = $this->connection->zadd($this->queue, ['NX'], $score, $message);
} else {
$added = $this->connection->xadd($this->stream, '*', ['message' => json_encode(
['body' => $body, 'headers' => $headers]
)]);
$message = json_encode([
'body' => $body,
'headers' => $headers,
]);
if ($this->maxEntries) {
$added = $this->connection->xadd($this->stream, '*', ['message' => $message], $this->maxEntries, true);
} else {
$added = $this->connection->xadd($this->stream, '*', ['message' => $message]);
}
}
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? $e->getMessage(), 0, $e);
}
if (!$added) {
@ -246,4 +292,15 @@ class Connection
$this->autoSetup = false;
}
private function getCurrentTimeInMilliseconds(): int
{
return (int) (microtime(true) * 1000);
}
public function cleanup(): void
{
$this->connection->del($this->stream);
$this->connection->del($this->queue);
}
}

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport\RedisExt;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@ -37,7 +38,11 @@ class RedisSender implements SenderInterface
{
$encodedMessage = $this->serializer->encode($envelope);
$this->connection->add($encodedMessage['body'], $encodedMessage['headers'] ?? []);
/** @var DelayStamp|null $delayStamp */
$delayStamp = $envelope->last(DelayStamp::class);
$delayInMs = null !== $delayStamp ? $delayStamp->getDelay() : 0;
$this->connection->add($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delayInMs);
return $envelope;
}