[Messenger] Fix missing auto_setup for RedisTransport

This commit is contained in:
Robin Chalas 2019-05-25 19:07:58 +02:00
parent 6aacfea491
commit d27bc2a87d
2 changed files with 34 additions and 17 deletions

View File

@ -42,13 +42,13 @@ class ConnectionTest extends TestCase
public function testFromDsnWithOptions()
{
$this->assertEquals(
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false], [
'host' => 'localhost',
'port' => 6379,
], [
'serializer' => 2,
]),
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['serializer' => 2])
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['serializer' => 2, 'auto_setup' => false])
);
}
@ -117,10 +117,6 @@ class ConnectionTest extends TestCase
{
$redis = new \Redis();
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', [], $redis);
try {
$connection->setup();
} catch (TransportException $e) {
}
$connection->add('1', []);
$connection->add('2', []);
@ -139,10 +135,6 @@ class ConnectionTest extends TestCase
$redis = new \Redis();
$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis);
try {
$connection->setup();
} catch (TransportException $e) {
}
$this->assertNull($connection->get()); // no message, should return null immediately
$connection->add('1', []);

View File

@ -27,10 +27,18 @@ use Symfony\Component\Messenger\Exception\TransportException;
*/
class Connection
{
private const DEFAULT_OPTIONS = [
'stream' => 'messages',
'group' => 'symfony',
'consumer' => 'consumer',
'auto_setup' => true,
];
private $connection;
private $stream;
private $group;
private $consumer;
private $autoSetup;
private $couldHavePendingMessages = true;
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
@ -38,9 +46,10 @@ class Connection
$this->connection = $redis ?: new \Redis();
$this->connection->connect($connectionCredentials['host'] ?? '127.0.0.1', $connectionCredentials['port'] ?? 6379);
$this->connection->setOption(\Redis::OPT_SERIALIZER, $redisOptions['serializer'] ?? \Redis::SERIALIZER_PHP);
$this->stream = $configuration['stream'] ?? '' ?: 'messages';
$this->group = $configuration['group'] ?? '' ?: 'symfony';
$this->consumer = $configuration['consumer'] ?? '' ?: 'consumer';
$this->stream = $configuration['stream'] ?? self::DEFAULT_OPTIONS['stream'];
$this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group'];
$this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer'];
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
}
public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
@ -51,9 +60,9 @@ class Connection
$pathParts = explode('/', $parsedUrl['path'] ?? '');
$stream = $pathParts[1] ?? '';
$group = $pathParts[2] ?? '';
$consumer = $pathParts[3] ?? '';
$stream = $pathParts[1] ?? null;
$group = $pathParts[2] ?? null;
$consumer = $pathParts[3] ?? null;
$connectionCredentials = [
'host' => $parsedUrl['host'] ?? '127.0.0.1',
@ -64,11 +73,21 @@ class Connection
parse_str($parsedUrl['query'], $redisOptions);
}
return new self(['stream' => $stream, 'group' => $group, 'consumer' => $consumer], $connectionCredentials, $redisOptions, $redis);
$autoSetup = null;
if (\array_key_exists('auto_setup', $redisOptions)) {
$autoSetup = filter_var($redisOptions['auto_setup'], FILTER_VALIDATE_BOOLEAN);
unset($redisOptions['auto_setup']);
}
return new self(['stream' => $stream, 'group' => $group, 'consumer' => $consumer, 'auto_setup' => $autoSetup], $connectionCredentials, $redisOptions, $redis);
}
public function get(): ?array
{
if ($this->autoSetup) {
$this->setup();
}
$messageId = '>'; // will receive new messages
if ($this->couldHavePendingMessages) {
@ -141,6 +160,10 @@ class Connection
public function add(string $body, array $headers): void
{
if ($this->autoSetup) {
$this->setup();
}
$e = null;
try {
$added = $this->connection->xadd($this->stream, '*', ['message' => json_encode(
@ -161,5 +184,7 @@ class Connection
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
$this->autoSetup = false;
}
}