bug #31545 [Messenger] Fix redis Connection::get() should be non blocking by default (chalasr)

This PR was merged into the 4.3 branch.

Discussion
----------

[Messenger] Fix redis Connection::get() should be non blocking by default

| Q             | A
| ------------- | ---
| Branch?       | 4.3
| Bug fix?      | yes
| New feature?  | no
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | n/a
| License       | MIT
| Doc PR        | todo

The `\Redis::xreadgroup()` method waits until a message arrives or the specified timeout is reached before returning, which means that `RedisExt\Connection::get()` is blocking.
That's inconsistent with other transports which all returns immediately in case there is no message, for instance the AMQP transport uses `\Amqp::get()` instead of `\Amqp::consume()` for this reason.
It also short-circuits the worker's stop logic: both the `--time-limit` option of the `messenger:consume` command and the `messenger:stop-workers` don't work with the redis transport.
This returns early in case the message count is 0 and no blocking timeout has been configured.

Commits
-------

229502a89a [Messenger] Make redis Connection::get() non blocking by default
This commit is contained in:
Fabien Potencier 2019-05-21 06:50:56 +02:00
commit 6fb05dc8dc
2 changed files with 14 additions and 13 deletions

View File

@ -46,9 +46,9 @@ class ConnectionTest extends TestCase
'host' => 'localhost',
'port' => 6379,
], [
'blocking_timeout' => 30,
'serializer' => 2,
]),
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['blocking_timeout' => 30])
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['serializer' => 2])
);
}
@ -59,9 +59,9 @@ class ConnectionTest extends TestCase
'host' => 'localhost',
'port' => 6379,
], [
'blocking_timeout' => 30,
'serializer' => 2,
]),
Connection::fromDsn('redis://localhost/queue/group1/consumer1?blocking_timeout=30')
Connection::fromDsn('redis://localhost/queue/group1/consumer1?serializer=2')
);
}
@ -134,16 +134,20 @@ class ConnectionTest extends TestCase
$redis->del('messenger-rejectthenget');
}
public function testBlockingTimeout()
public function testGetNonBlocking()
{
$redis = new \Redis();
$connection = Connection::fromDsn('redis://localhost/messenger-blockingtimeout', ['blocking_timeout' => 1], $redis);
$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis);
try {
$connection->setup();
} catch (TransportException $e) {
}
$this->assertNull($connection->get());
$redis->del('messenger-blockingtimeout');
$this->assertNull($connection->get()); // no message, should return null immediately
$connection->add('1', []);
$this->assertNotEmpty($message = $connection->get());
$connection->reject($message['id']);
$redis->del('messenger-getnonblocking');
}
}

View File

@ -31,7 +31,6 @@ class Connection
private $stream;
private $group;
private $consumer;
private $blockingTimeout;
private $couldHavePendingMessages = true;
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
@ -42,7 +41,6 @@ class Connection
$this->stream = $configuration['stream'] ?? '' ?: 'messages';
$this->group = $configuration['group'] ?? '' ?: 'symfony';
$this->consumer = $configuration['consumer'] ?? '' ?: 'consumer';
$this->blockingTimeout = $redisOptions['blocking_timeout'] ?? null;
}
public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
@ -83,8 +81,7 @@ class Connection
$this->group,
$this->consumer,
[$this->stream => $messageId],
1,
$this->blockingTimeout
1
);
} catch (\RedisException $e) {
}
@ -142,7 +139,7 @@ class Connection
}
}
public function add(string $body, array $headers)
public function add(string $body, array $headers): void
{
$e = null;
try {