[Messenger] Make redis Connection::get() non blocking by default
This commit is contained in:
parent
2ecad3ffa9
commit
229502a89a
@ -46,9 +46,9 @@ class ConnectionTest extends TestCase
|
|||||||
'host' => 'localhost',
|
'host' => 'localhost',
|
||||||
'port' => 6379,
|
'port' => 6379,
|
||||||
], [
|
], [
|
||||||
'blocking_timeout' => 30,
|
'serializer' => 2,
|
||||||
]),
|
]),
|
||||||
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['blocking_timeout' => 30])
|
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['serializer' => 2])
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -59,9 +59,9 @@ class ConnectionTest extends TestCase
|
|||||||
'host' => 'localhost',
|
'host' => 'localhost',
|
||||||
'port' => 6379,
|
'port' => 6379,
|
||||||
], [
|
], [
|
||||||
'blocking_timeout' => 30,
|
'serializer' => 2,
|
||||||
]),
|
]),
|
||||||
Connection::fromDsn('redis://localhost/queue/group1/consumer1?blocking_timeout=30')
|
Connection::fromDsn('redis://localhost/queue/group1/consumer1?serializer=2')
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -134,16 +134,20 @@ class ConnectionTest extends TestCase
|
|||||||
$redis->del('messenger-rejectthenget');
|
$redis->del('messenger-rejectthenget');
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testBlockingTimeout()
|
public function testGetNonBlocking()
|
||||||
{
|
{
|
||||||
$redis = new \Redis();
|
$redis = new \Redis();
|
||||||
$connection = Connection::fromDsn('redis://localhost/messenger-blockingtimeout', ['blocking_timeout' => 1], $redis);
|
|
||||||
|
$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis);
|
||||||
try {
|
try {
|
||||||
$connection->setup();
|
$connection->setup();
|
||||||
} catch (TransportException $e) {
|
} catch (TransportException $e) {
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->assertNull($connection->get());
|
$this->assertNull($connection->get()); // no message, should return null immediately
|
||||||
$redis->del('messenger-blockingtimeout');
|
$connection->add('1', []);
|
||||||
|
$this->assertNotEmpty($message = $connection->get());
|
||||||
|
$connection->reject($message['id']);
|
||||||
|
$redis->del('messenger-getnonblocking');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -31,7 +31,6 @@ class Connection
|
|||||||
private $stream;
|
private $stream;
|
||||||
private $group;
|
private $group;
|
||||||
private $consumer;
|
private $consumer;
|
||||||
private $blockingTimeout;
|
|
||||||
private $couldHavePendingMessages = true;
|
private $couldHavePendingMessages = true;
|
||||||
|
|
||||||
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
|
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
|
||||||
@ -42,7 +41,6 @@ class Connection
|
|||||||
$this->stream = $configuration['stream'] ?? '' ?: 'messages';
|
$this->stream = $configuration['stream'] ?? '' ?: 'messages';
|
||||||
$this->group = $configuration['group'] ?? '' ?: 'symfony';
|
$this->group = $configuration['group'] ?? '' ?: 'symfony';
|
||||||
$this->consumer = $configuration['consumer'] ?? '' ?: 'consumer';
|
$this->consumer = $configuration['consumer'] ?? '' ?: 'consumer';
|
||||||
$this->blockingTimeout = $redisOptions['blocking_timeout'] ?? null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
|
public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
|
||||||
@ -83,8 +81,7 @@ class Connection
|
|||||||
$this->group,
|
$this->group,
|
||||||
$this->consumer,
|
$this->consumer,
|
||||||
[$this->stream => $messageId],
|
[$this->stream => $messageId],
|
||||||
1,
|
1
|
||||||
$this->blockingTimeout
|
|
||||||
);
|
);
|
||||||
} catch (\RedisException $e) {
|
} catch (\RedisException $e) {
|
||||||
}
|
}
|
||||||
@ -142,7 +139,7 @@ class Connection
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public function add(string $body, array $headers)
|
public function add(string $body, array $headers): void
|
||||||
{
|
{
|
||||||
$e = null;
|
$e = null;
|
||||||
try {
|
try {
|
||||||
|
Reference in New Issue
Block a user