[Messenger] Make redis Connection::get() non blocking by default

This commit is contained in:
Robin Chalas 2019-05-19 17:43:57 +02:00
parent 2ecad3ffa9
commit 229502a89a
2 changed files with 14 additions and 13 deletions

View File

@ -46,9 +46,9 @@ class ConnectionTest extends TestCase
'host' => 'localhost', 'host' => 'localhost',
'port' => 6379, 'port' => 6379,
], [ ], [
'blocking_timeout' => 30, 'serializer' => 2,
]), ]),
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['blocking_timeout' => 30]) Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['serializer' => 2])
); );
} }
@ -59,9 +59,9 @@ class ConnectionTest extends TestCase
'host' => 'localhost', 'host' => 'localhost',
'port' => 6379, 'port' => 6379,
], [ ], [
'blocking_timeout' => 30, 'serializer' => 2,
]), ]),
Connection::fromDsn('redis://localhost/queue/group1/consumer1?blocking_timeout=30') Connection::fromDsn('redis://localhost/queue/group1/consumer1?serializer=2')
); );
} }
@ -134,16 +134,20 @@ class ConnectionTest extends TestCase
$redis->del('messenger-rejectthenget'); $redis->del('messenger-rejectthenget');
} }
public function testBlockingTimeout() public function testGetNonBlocking()
{ {
$redis = new \Redis(); $redis = new \Redis();
$connection = Connection::fromDsn('redis://localhost/messenger-blockingtimeout', ['blocking_timeout' => 1], $redis);
$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis);
try { try {
$connection->setup(); $connection->setup();
} catch (TransportException $e) { } catch (TransportException $e) {
} }
$this->assertNull($connection->get()); $this->assertNull($connection->get()); // no message, should return null immediately
$redis->del('messenger-blockingtimeout'); $connection->add('1', []);
$this->assertNotEmpty($message = $connection->get());
$connection->reject($message['id']);
$redis->del('messenger-getnonblocking');
} }
} }

View File

@ -31,7 +31,6 @@ class Connection
private $stream; private $stream;
private $group; private $group;
private $consumer; private $consumer;
private $blockingTimeout;
private $couldHavePendingMessages = true; private $couldHavePendingMessages = true;
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null) public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
@ -42,7 +41,6 @@ class Connection
$this->stream = $configuration['stream'] ?? '' ?: 'messages'; $this->stream = $configuration['stream'] ?? '' ?: 'messages';
$this->group = $configuration['group'] ?? '' ?: 'symfony'; $this->group = $configuration['group'] ?? '' ?: 'symfony';
$this->consumer = $configuration['consumer'] ?? '' ?: 'consumer'; $this->consumer = $configuration['consumer'] ?? '' ?: 'consumer';
$this->blockingTimeout = $redisOptions['blocking_timeout'] ?? null;
} }
public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
@ -83,8 +81,7 @@ class Connection
$this->group, $this->group,
$this->consumer, $this->consumer,
[$this->stream => $messageId], [$this->stream => $messageId],
1, 1
$this->blockingTimeout
); );
} catch (\RedisException $e) { } catch (\RedisException $e) {
} }
@ -142,7 +139,7 @@ class Connection
} }
} }
public function add(string $body, array $headers) public function add(string $body, array $headers): void
{ {
$e = null; $e = null;
try { try {