Add handling for delayed message to redis transport
This commit is contained in:
parent
a0cefaa103
commit
cfece10569
|
@ -20,7 +20,7 @@ env:
|
||||||
- MIN_PHP=7.1.3
|
- MIN_PHP=7.1.3
|
||||||
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
|
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
|
||||||
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
|
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
|
||||||
- MESSENGER_REDIS_DSN=redis://127.0.0.1:7001/messages
|
- MESSENGER_REDIS_DSN=redis://127.0.0.1:7006/messages
|
||||||
- SYMFONY_PHPUNIT_DISABLE_RESULT_CACHE=1
|
- SYMFONY_PHPUNIT_DISABLE_RESULT_CACHE=1
|
||||||
|
|
||||||
matrix:
|
matrix:
|
||||||
|
@ -59,7 +59,7 @@ before_install:
|
||||||
- |
|
- |
|
||||||
# Start Redis cluster
|
# Start Redis cluster
|
||||||
docker pull grokzen/redis-cluster:5.0.4
|
docker pull grokzen/redis-cluster:5.0.4
|
||||||
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:5.0.4
|
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 -p 7006:7006 -p 7007:7007 -e "STANDALONE=true" --name redis-cluster grokzen/redis-cluster:5.0.4
|
||||||
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
|
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
|
||||||
|
|
||||||
- |
|
- |
|
||||||
|
|
|
@ -23,6 +23,7 @@ CHANGELOG
|
||||||
* [BC BREAK] Removed `StopWhenRestartSignalIsReceived` in favor of `StopWorkerOnRestartSignalListener`.
|
* [BC BREAK] Removed `StopWhenRestartSignalIsReceived` in favor of `StopWorkerOnRestartSignalListener`.
|
||||||
* The component is not marked as `@experimental` anymore.
|
* The component is not marked as `@experimental` anymore.
|
||||||
* Marked the `MessengerDataCollector` class as `@final`.
|
* Marked the `MessengerDataCollector` class as `@final`.
|
||||||
|
* Added support for `DelayStamp` to the `redis` transport.
|
||||||
|
|
||||||
4.3.0
|
4.3.0
|
||||||
-----
|
-----
|
||||||
|
|
|
@ -17,6 +17,7 @@ use Symfony\Component\Messenger\Transport\RedisExt\Connection;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @requires extension redis
|
* @requires extension redis
|
||||||
|
* @group time-sensitive
|
||||||
*/
|
*/
|
||||||
class RedisExtIntegrationTest extends TestCase
|
class RedisExtIntegrationTest extends TestCase
|
||||||
{
|
{
|
||||||
|
@ -31,7 +32,7 @@ class RedisExtIntegrationTest extends TestCase
|
||||||
|
|
||||||
$this->redis = new \Redis();
|
$this->redis = new \Redis();
|
||||||
$this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis);
|
$this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis);
|
||||||
$this->clearRedis();
|
$this->connection->cleanup();
|
||||||
$this->connection->setup();
|
$this->connection->setup();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -55,11 +56,48 @@ class RedisExtIntegrationTest extends TestCase
|
||||||
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
|
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
|
||||||
}
|
}
|
||||||
|
|
||||||
private function clearRedis()
|
public function testConnectionSendWithSameContent()
|
||||||
{
|
{
|
||||||
$parsedUrl = parse_url(getenv('MESSENGER_REDIS_DSN'));
|
$body = '{"message": "Hi"}';
|
||||||
$pathParts = explode('/', $parsedUrl['path'] ?? '');
|
$headers = ['type' => DummyMessage::class];
|
||||||
$stream = $pathParts[1] ?? 'symfony';
|
|
||||||
$this->redis->del($stream);
|
$this->connection->add($body, $headers);
|
||||||
|
$this->connection->add($body, $headers);
|
||||||
|
|
||||||
|
$encoded = $this->connection->get();
|
||||||
|
$this->assertEquals($body, $encoded['body']);
|
||||||
|
$this->assertEquals($headers, $encoded['headers']);
|
||||||
|
|
||||||
|
$encoded = $this->connection->get();
|
||||||
|
$this->assertEquals($body, $encoded['body']);
|
||||||
|
$this->assertEquals($headers, $encoded['headers']);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testConnectionSendAndGetDelayed()
|
||||||
|
{
|
||||||
|
$this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class], 500);
|
||||||
|
$encoded = $this->connection->get();
|
||||||
|
$this->assertNull($encoded);
|
||||||
|
sleep(2);
|
||||||
|
$encoded = $this->connection->get();
|
||||||
|
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
|
||||||
|
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testConnectionSendDelayedMessagesWithSameContent()
|
||||||
|
{
|
||||||
|
$body = '{"message": "Hi"}';
|
||||||
|
$headers = ['type' => DummyMessage::class];
|
||||||
|
|
||||||
|
$this->connection->add($body, $headers, 500);
|
||||||
|
$this->connection->add($body, $headers, 500);
|
||||||
|
sleep(2);
|
||||||
|
$encoded = $this->connection->get();
|
||||||
|
$this->assertEquals($body, $encoded['body']);
|
||||||
|
$this->assertEquals($headers, $encoded['headers']);
|
||||||
|
|
||||||
|
$encoded = $this->connection->get();
|
||||||
|
$this->assertEquals($body, $encoded['body']);
|
||||||
|
$this->assertEquals($headers, $encoded['headers']);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,6 +38,7 @@ class Connection
|
||||||
|
|
||||||
private $connection;
|
private $connection;
|
||||||
private $stream;
|
private $stream;
|
||||||
|
private $queue;
|
||||||
private $group;
|
private $group;
|
||||||
private $consumer;
|
private $consumer;
|
||||||
private $autoSetup;
|
private $autoSetup;
|
||||||
|
@ -65,6 +66,7 @@ class Connection
|
||||||
$this->stream = $configuration['stream'] ?? self::DEFAULT_OPTIONS['stream'];
|
$this->stream = $configuration['stream'] ?? self::DEFAULT_OPTIONS['stream'];
|
||||||
$this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group'];
|
$this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group'];
|
||||||
$this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer'];
|
$this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer'];
|
||||||
|
$this->queue = $this->stream.'__queue';
|
||||||
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
|
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
|
||||||
$this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
|
$this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
|
||||||
}
|
}
|
||||||
|
@ -125,6 +127,34 @@ class Connection
|
||||||
$this->setup();
|
$this->setup();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
$queuedMessageCount = $this->connection->zcount($this->queue, 0, $this->getCurrentTimeInMilliseconds());
|
||||||
|
} catch (\RedisException $e) {
|
||||||
|
throw new TransportException($e->getMessage(), 0, $e);
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($queuedMessageCount) {
|
||||||
|
for ($i = 0; $i < $queuedMessageCount; ++$i) {
|
||||||
|
try {
|
||||||
|
$queuedMessages = $this->connection->zpopmin($this->queue, 1);
|
||||||
|
} catch (\RedisException $e) {
|
||||||
|
throw new TransportException($e->getMessage(), 0, $e);
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach ($queuedMessages as $queuedMessage => $time) {
|
||||||
|
$queuedMessage = json_decode($queuedMessage, true);
|
||||||
|
// if a futured placed message is actually popped because of a race condition with
|
||||||
|
// another running message consumer, the message is readded to the queue by add function
|
||||||
|
// else its just added stream and will be available for all stream consumers
|
||||||
|
$this->add(
|
||||||
|
$queuedMessage['body'],
|
||||||
|
$queuedMessage['headers'],
|
||||||
|
$time - $this->getCurrentTimeInMilliseconds()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
$messageId = '>'; // will receive new messages
|
$messageId = '>'; // will receive new messages
|
||||||
|
|
||||||
if ($this->couldHavePendingMessages) {
|
if ($this->couldHavePendingMessages) {
|
||||||
|
@ -203,24 +233,40 @@ class Connection
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public function add(string $body, array $headers): void
|
public function add(string $body, array $headers, int $delayInMs = 0): void
|
||||||
{
|
{
|
||||||
if ($this->autoSetup) {
|
if ($this->autoSetup) {
|
||||||
$this->setup();
|
$this->setup();
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if ($this->maxEntries) {
|
if ($delayInMs > 0) { // the delay could be smaller 0 in a queued message
|
||||||
$added = $this->connection->xadd($this->stream, '*', ['message' => json_encode(
|
$message = json_encode([
|
||||||
['body' => $body, 'headers' => $headers]
|
'body' => $body,
|
||||||
)], $this->maxEntries, true);
|
'headers' => $headers,
|
||||||
|
// Entry need to be unique in the sorted set else it would only be added once to the delayed messages queue
|
||||||
|
'uniqid' => uniqid('', true),
|
||||||
|
]);
|
||||||
|
|
||||||
|
$score = (int) ($this->getCurrentTimeInMilliseconds() + $delayInMs);
|
||||||
|
$added = $this->connection->zadd($this->queue, ['NX'], $score, $message);
|
||||||
} else {
|
} else {
|
||||||
$added = $this->connection->xadd($this->stream, '*', ['message' => json_encode(
|
$message = json_encode([
|
||||||
['body' => $body, 'headers' => $headers]
|
'body' => $body,
|
||||||
)]);
|
'headers' => $headers,
|
||||||
|
]);
|
||||||
|
|
||||||
|
if ($this->maxEntries) {
|
||||||
|
$added = $this->connection->xadd($this->stream, '*', ['message' => $message], $this->maxEntries, true);
|
||||||
|
} else {
|
||||||
|
$added = $this->connection->xadd($this->stream, '*', ['message' => $message]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (\RedisException $e) {
|
} catch (\RedisException $e) {
|
||||||
throw new TransportException($e->getMessage(), 0, $e);
|
if ($error = $this->connection->getLastError() ?: null) {
|
||||||
|
$this->connection->clearLastError();
|
||||||
|
}
|
||||||
|
throw new TransportException($error ?? $e->getMessage(), 0, $e);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!$added) {
|
if (!$added) {
|
||||||
|
@ -246,4 +292,15 @@ class Connection
|
||||||
|
|
||||||
$this->autoSetup = false;
|
$this->autoSetup = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private function getCurrentTimeInMilliseconds(): int
|
||||||
|
{
|
||||||
|
return (int) (microtime(true) * 1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function cleanup(): void
|
||||||
|
{
|
||||||
|
$this->connection->del($this->stream);
|
||||||
|
$this->connection->del($this->queue);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,7 @@
|
||||||
namespace Symfony\Component\Messenger\Transport\RedisExt;
|
namespace Symfony\Component\Messenger\Transport\RedisExt;
|
||||||
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||||
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||||
|
|
||||||
|
@ -37,7 +38,11 @@ class RedisSender implements SenderInterface
|
||||||
{
|
{
|
||||||
$encodedMessage = $this->serializer->encode($envelope);
|
$encodedMessage = $this->serializer->encode($envelope);
|
||||||
|
|
||||||
$this->connection->add($encodedMessage['body'], $encodedMessage['headers'] ?? []);
|
/** @var DelayStamp|null $delayStamp */
|
||||||
|
$delayStamp = $envelope->last(DelayStamp::class);
|
||||||
|
$delayInMs = null !== $delayStamp ? $delayStamp->getDelay() : 0;
|
||||||
|
|
||||||
|
$this->connection->add($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delayInMs);
|
||||||
|
|
||||||
return $envelope;
|
return $envelope;
|
||||||
}
|
}
|
||||||
|
|
Reference in New Issue