diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php index 1a602f41f2..177f6b9038 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php @@ -42,13 +42,13 @@ class ConnectionTest extends TestCase public function testFromDsnWithOptions() { $this->assertEquals( - new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [ + new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false], [ 'host' => 'localhost', 'port' => 6379, ], [ 'serializer' => 2, ]), - Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['serializer' => 2]) + Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['serializer' => 2, 'auto_setup' => false]) ); } @@ -117,10 +117,6 @@ class ConnectionTest extends TestCase { $redis = new \Redis(); $connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', [], $redis); - try { - $connection->setup(); - } catch (TransportException $e) { - } $connection->add('1', []); $connection->add('2', []); @@ -139,10 +135,6 @@ class ConnectionTest extends TestCase $redis = new \Redis(); $connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis); - try { - $connection->setup(); - } catch (TransportException $e) { - } $this->assertNull($connection->get()); // no message, should return null immediately $connection->add('1', []); diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php index b190f3105d..5757f0be7f 100644 --- a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php @@ -27,10 +27,18 @@ use Symfony\Component\Messenger\Exception\TransportException; */ class Connection { + private const DEFAULT_OPTIONS = [ + 'stream' => 'messages', + 'group' => 'symfony', + 'consumer' => 'consumer', + 'auto_setup' => true, + ]; + private $connection; private $stream; private $group; private $consumer; + private $autoSetup; private $couldHavePendingMessages = true; public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null) @@ -38,9 +46,10 @@ class Connection $this->connection = $redis ?: new \Redis(); $this->connection->connect($connectionCredentials['host'] ?? '127.0.0.1', $connectionCredentials['port'] ?? 6379); $this->connection->setOption(\Redis::OPT_SERIALIZER, $redisOptions['serializer'] ?? \Redis::SERIALIZER_PHP); - $this->stream = $configuration['stream'] ?? '' ?: 'messages'; - $this->group = $configuration['group'] ?? '' ?: 'symfony'; - $this->consumer = $configuration['consumer'] ?? '' ?: 'consumer'; + $this->stream = $configuration['stream'] ?? self::DEFAULT_OPTIONS['stream']; + $this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group']; + $this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer']; + $this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup']; } public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self @@ -51,9 +60,9 @@ class Connection $pathParts = explode('/', $parsedUrl['path'] ?? ''); - $stream = $pathParts[1] ?? ''; - $group = $pathParts[2] ?? ''; - $consumer = $pathParts[3] ?? ''; + $stream = $pathParts[1] ?? null; + $group = $pathParts[2] ?? null; + $consumer = $pathParts[3] ?? null; $connectionCredentials = [ 'host' => $parsedUrl['host'] ?? '127.0.0.1', @@ -64,11 +73,21 @@ class Connection parse_str($parsedUrl['query'], $redisOptions); } - return new self(['stream' => $stream, 'group' => $group, 'consumer' => $consumer], $connectionCredentials, $redisOptions, $redis); + $autoSetup = null; + if (\array_key_exists('auto_setup', $redisOptions)) { + $autoSetup = filter_var($redisOptions['auto_setup'], FILTER_VALIDATE_BOOLEAN); + unset($redisOptions['auto_setup']); + } + + return new self(['stream' => $stream, 'group' => $group, 'consumer' => $consumer, 'auto_setup' => $autoSetup], $connectionCredentials, $redisOptions, $redis); } public function get(): ?array { + if ($this->autoSetup) { + $this->setup(); + } + $messageId = '>'; // will receive new messages if ($this->couldHavePendingMessages) { @@ -141,6 +160,10 @@ class Connection public function add(string $body, array $headers): void { + if ($this->autoSetup) { + $this->setup(); + } + $e = null; try { $added = $this->connection->xadd($this->stream, '*', ['message' => json_encode( @@ -161,5 +184,7 @@ class Connection } catch (\RedisException $e) { throw new TransportException($e->getMessage(), 0, $e); } + + $this->autoSetup = false; } }