[Messenger] Fix missing auto_setup for RedisTransport

This commit is contained in:
Robin Chalas 2019-05-25 19:07:58 +02:00
parent 6aacfea491
commit d27bc2a87d
2 changed files with 34 additions and 17 deletions

View File

@ -42,13 +42,13 @@ class ConnectionTest extends TestCase
public function testFromDsnWithOptions() public function testFromDsnWithOptions()
{ {
$this->assertEquals( $this->assertEquals(
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [ new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false], [
'host' => 'localhost', 'host' => 'localhost',
'port' => 6379, 'port' => 6379,
], [ ], [
'serializer' => 2, 'serializer' => 2,
]), ]),
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['serializer' => 2]) Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['serializer' => 2, 'auto_setup' => false])
); );
} }
@ -117,10 +117,6 @@ class ConnectionTest extends TestCase
{ {
$redis = new \Redis(); $redis = new \Redis();
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', [], $redis); $connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', [], $redis);
try {
$connection->setup();
} catch (TransportException $e) {
}
$connection->add('1', []); $connection->add('1', []);
$connection->add('2', []); $connection->add('2', []);
@ -139,10 +135,6 @@ class ConnectionTest extends TestCase
$redis = new \Redis(); $redis = new \Redis();
$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis); $connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis);
try {
$connection->setup();
} catch (TransportException $e) {
}
$this->assertNull($connection->get()); // no message, should return null immediately $this->assertNull($connection->get()); // no message, should return null immediately
$connection->add('1', []); $connection->add('1', []);

View File

@ -27,10 +27,18 @@ use Symfony\Component\Messenger\Exception\TransportException;
*/ */
class Connection class Connection
{ {
private const DEFAULT_OPTIONS = [
'stream' => 'messages',
'group' => 'symfony',
'consumer' => 'consumer',
'auto_setup' => true,
];
private $connection; private $connection;
private $stream; private $stream;
private $group; private $group;
private $consumer; private $consumer;
private $autoSetup;
private $couldHavePendingMessages = true; private $couldHavePendingMessages = true;
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null) public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
@ -38,9 +46,10 @@ class Connection
$this->connection = $redis ?: new \Redis(); $this->connection = $redis ?: new \Redis();
$this->connection->connect($connectionCredentials['host'] ?? '127.0.0.1', $connectionCredentials['port'] ?? 6379); $this->connection->connect($connectionCredentials['host'] ?? '127.0.0.1', $connectionCredentials['port'] ?? 6379);
$this->connection->setOption(\Redis::OPT_SERIALIZER, $redisOptions['serializer'] ?? \Redis::SERIALIZER_PHP); $this->connection->setOption(\Redis::OPT_SERIALIZER, $redisOptions['serializer'] ?? \Redis::SERIALIZER_PHP);
$this->stream = $configuration['stream'] ?? '' ?: 'messages'; $this->stream = $configuration['stream'] ?? self::DEFAULT_OPTIONS['stream'];
$this->group = $configuration['group'] ?? '' ?: 'symfony'; $this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group'];
$this->consumer = $configuration['consumer'] ?? '' ?: 'consumer'; $this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer'];
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
} }
public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
@ -51,9 +60,9 @@ class Connection
$pathParts = explode('/', $parsedUrl['path'] ?? ''); $pathParts = explode('/', $parsedUrl['path'] ?? '');
$stream = $pathParts[1] ?? ''; $stream = $pathParts[1] ?? null;
$group = $pathParts[2] ?? ''; $group = $pathParts[2] ?? null;
$consumer = $pathParts[3] ?? ''; $consumer = $pathParts[3] ?? null;
$connectionCredentials = [ $connectionCredentials = [
'host' => $parsedUrl['host'] ?? '127.0.0.1', 'host' => $parsedUrl['host'] ?? '127.0.0.1',
@ -64,11 +73,21 @@ class Connection
parse_str($parsedUrl['query'], $redisOptions); parse_str($parsedUrl['query'], $redisOptions);
} }
return new self(['stream' => $stream, 'group' => $group, 'consumer' => $consumer], $connectionCredentials, $redisOptions, $redis); $autoSetup = null;
if (\array_key_exists('auto_setup', $redisOptions)) {
$autoSetup = filter_var($redisOptions['auto_setup'], FILTER_VALIDATE_BOOLEAN);
unset($redisOptions['auto_setup']);
}
return new self(['stream' => $stream, 'group' => $group, 'consumer' => $consumer, 'auto_setup' => $autoSetup], $connectionCredentials, $redisOptions, $redis);
} }
public function get(): ?array public function get(): ?array
{ {
if ($this->autoSetup) {
$this->setup();
}
$messageId = '>'; // will receive new messages $messageId = '>'; // will receive new messages
if ($this->couldHavePendingMessages) { if ($this->couldHavePendingMessages) {
@ -141,6 +160,10 @@ class Connection
public function add(string $body, array $headers): void public function add(string $body, array $headers): void
{ {
if ($this->autoSetup) {
$this->setup();
}
$e = null; $e = null;
try { try {
$added = $this->connection->xadd($this->stream, '*', ['message' => json_encode( $added = $this->connection->xadd($this->stream, '*', ['message' => json_encode(
@ -161,5 +184,7 @@ class Connection
} catch (\RedisException $e) { } catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e); throw new TransportException($e->getMessage(), 0, $e);
} }
$this->autoSetup = false;
} }
} }