[Messenger] Added support for auto trimming of redis streams

This commit is contained in:
Yanick Witschi 2019-06-03 17:58:20 +02:00 committed by Samuel ROZE
parent c8f3cef35c
commit 7fe06bc5f6
3 changed files with 40 additions and 6 deletions

View File

@ -6,6 +6,7 @@ CHANGELOG
* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
pass a `RoutableMessageBus` instance instead.
* Added support for auto trimming of Redis streams.
4.3.0
-----

View File

@ -42,13 +42,13 @@ class ConnectionTest extends TestCase
public function testFromDsnWithOptions()
{
$this->assertEquals(
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false], [
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false, 'stream_max_entries' => 20000], [
'host' => 'localhost',
'port' => 6379,
], [
'serializer' => 2,
]),
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['serializer' => 2, 'auto_setup' => false])
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['serializer' => 2, 'auto_setup' => false, 'stream_max_entries' => 20000])
);
}
@ -142,4 +142,16 @@ class ConnectionTest extends TestCase
$connection->reject($message['id']);
$redis->del('messenger-getnonblocking');
}
public function testMaxEntries()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
$redis->expects($this->exactly(1))->method('xadd')
->with('queue', '*', ['message' => '{"body":"1","headers":[]}'], 20000, true)
->willReturn(1);
$connection = Connection::fromDsn('redis://localhost/queue?stream_max_entries=20000', [], $redis); // 1 = always
$connection->add('1', []);
}
}

View File

@ -32,6 +32,7 @@ class Connection
'group' => 'symfony',
'consumer' => 'consumer',
'auto_setup' => true,
'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries
];
private $connection;
@ -39,6 +40,7 @@ class Connection
private $group;
private $consumer;
private $autoSetup;
private $maxEntries;
private $couldHavePendingMessages = true;
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
@ -50,6 +52,7 @@ class Connection
$this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group'];
$this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer'];
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
$this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
}
public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
@ -79,7 +82,19 @@ class Connection
unset($redisOptions['auto_setup']);
}
return new self(['stream' => $stream, 'group' => $group, 'consumer' => $consumer, 'auto_setup' => $autoSetup], $connectionCredentials, $redisOptions, $redis);
$maxEntries = null;
if (\array_key_exists('stream_max_entries', $redisOptions)) {
$maxEntries = filter_var($redisOptions['stream_max_entries'], FILTER_VALIDATE_INT);
unset($redisOptions['stream_max_entries']);
}
return new self([
'stream' => $stream,
'group' => $group,
'consumer' => $consumer,
'auto_setup' => $autoSetup,
'stream_max_entries' => $maxEntries,
], $connectionCredentials, $redisOptions, $redis);
}
public function get(): ?array
@ -166,9 +181,15 @@ class Connection
$e = null;
try {
$added = $this->connection->xadd($this->stream, '*', ['message' => json_encode(
['body' => $body, 'headers' => $headers]
)]);
if ($this->maxEntries) {
$added = $this->connection->xadd($this->stream, '*', ['message' => json_encode(
['body' => $body, 'headers' => $headers]
)], $this->maxEntries, true);
} else {
$added = $this->connection->xadd($this->stream, '*', ['message' => json_encode(
['body' => $body, 'headers' => $headers]
)]);
}
} catch (\RedisException $e) {
}