[Messenger] Fix missing auto_setup for RedisTransport
This commit is contained in:
parent
6aacfea491
commit
d27bc2a87d
@ -42,13 +42,13 @@ class ConnectionTest extends TestCase
|
|||||||
public function testFromDsnWithOptions()
|
public function testFromDsnWithOptions()
|
||||||
{
|
{
|
||||||
$this->assertEquals(
|
$this->assertEquals(
|
||||||
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
|
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false], [
|
||||||
'host' => 'localhost',
|
'host' => 'localhost',
|
||||||
'port' => 6379,
|
'port' => 6379,
|
||||||
], [
|
], [
|
||||||
'serializer' => 2,
|
'serializer' => 2,
|
||||||
]),
|
]),
|
||||||
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['serializer' => 2])
|
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['serializer' => 2, 'auto_setup' => false])
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -117,10 +117,6 @@ class ConnectionTest extends TestCase
|
|||||||
{
|
{
|
||||||
$redis = new \Redis();
|
$redis = new \Redis();
|
||||||
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', [], $redis);
|
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', [], $redis);
|
||||||
try {
|
|
||||||
$connection->setup();
|
|
||||||
} catch (TransportException $e) {
|
|
||||||
}
|
|
||||||
|
|
||||||
$connection->add('1', []);
|
$connection->add('1', []);
|
||||||
$connection->add('2', []);
|
$connection->add('2', []);
|
||||||
@ -139,10 +135,6 @@ class ConnectionTest extends TestCase
|
|||||||
$redis = new \Redis();
|
$redis = new \Redis();
|
||||||
|
|
||||||
$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis);
|
$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis);
|
||||||
try {
|
|
||||||
$connection->setup();
|
|
||||||
} catch (TransportException $e) {
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->assertNull($connection->get()); // no message, should return null immediately
|
$this->assertNull($connection->get()); // no message, should return null immediately
|
||||||
$connection->add('1', []);
|
$connection->add('1', []);
|
||||||
|
@ -27,10 +27,18 @@ use Symfony\Component\Messenger\Exception\TransportException;
|
|||||||
*/
|
*/
|
||||||
class Connection
|
class Connection
|
||||||
{
|
{
|
||||||
|
private const DEFAULT_OPTIONS = [
|
||||||
|
'stream' => 'messages',
|
||||||
|
'group' => 'symfony',
|
||||||
|
'consumer' => 'consumer',
|
||||||
|
'auto_setup' => true,
|
||||||
|
];
|
||||||
|
|
||||||
private $connection;
|
private $connection;
|
||||||
private $stream;
|
private $stream;
|
||||||
private $group;
|
private $group;
|
||||||
private $consumer;
|
private $consumer;
|
||||||
|
private $autoSetup;
|
||||||
private $couldHavePendingMessages = true;
|
private $couldHavePendingMessages = true;
|
||||||
|
|
||||||
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
|
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
|
||||||
@ -38,9 +46,10 @@ class Connection
|
|||||||
$this->connection = $redis ?: new \Redis();
|
$this->connection = $redis ?: new \Redis();
|
||||||
$this->connection->connect($connectionCredentials['host'] ?? '127.0.0.1', $connectionCredentials['port'] ?? 6379);
|
$this->connection->connect($connectionCredentials['host'] ?? '127.0.0.1', $connectionCredentials['port'] ?? 6379);
|
||||||
$this->connection->setOption(\Redis::OPT_SERIALIZER, $redisOptions['serializer'] ?? \Redis::SERIALIZER_PHP);
|
$this->connection->setOption(\Redis::OPT_SERIALIZER, $redisOptions['serializer'] ?? \Redis::SERIALIZER_PHP);
|
||||||
$this->stream = $configuration['stream'] ?? '' ?: 'messages';
|
$this->stream = $configuration['stream'] ?? self::DEFAULT_OPTIONS['stream'];
|
||||||
$this->group = $configuration['group'] ?? '' ?: 'symfony';
|
$this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group'];
|
||||||
$this->consumer = $configuration['consumer'] ?? '' ?: 'consumer';
|
$this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer'];
|
||||||
|
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
|
||||||
}
|
}
|
||||||
|
|
||||||
public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
|
public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
|
||||||
@ -51,9 +60,9 @@ class Connection
|
|||||||
|
|
||||||
$pathParts = explode('/', $parsedUrl['path'] ?? '');
|
$pathParts = explode('/', $parsedUrl['path'] ?? '');
|
||||||
|
|
||||||
$stream = $pathParts[1] ?? '';
|
$stream = $pathParts[1] ?? null;
|
||||||
$group = $pathParts[2] ?? '';
|
$group = $pathParts[2] ?? null;
|
||||||
$consumer = $pathParts[3] ?? '';
|
$consumer = $pathParts[3] ?? null;
|
||||||
|
|
||||||
$connectionCredentials = [
|
$connectionCredentials = [
|
||||||
'host' => $parsedUrl['host'] ?? '127.0.0.1',
|
'host' => $parsedUrl['host'] ?? '127.0.0.1',
|
||||||
@ -64,11 +73,21 @@ class Connection
|
|||||||
parse_str($parsedUrl['query'], $redisOptions);
|
parse_str($parsedUrl['query'], $redisOptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
return new self(['stream' => $stream, 'group' => $group, 'consumer' => $consumer], $connectionCredentials, $redisOptions, $redis);
|
$autoSetup = null;
|
||||||
|
if (\array_key_exists('auto_setup', $redisOptions)) {
|
||||||
|
$autoSetup = filter_var($redisOptions['auto_setup'], FILTER_VALIDATE_BOOLEAN);
|
||||||
|
unset($redisOptions['auto_setup']);
|
||||||
|
}
|
||||||
|
|
||||||
|
return new self(['stream' => $stream, 'group' => $group, 'consumer' => $consumer, 'auto_setup' => $autoSetup], $connectionCredentials, $redisOptions, $redis);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function get(): ?array
|
public function get(): ?array
|
||||||
{
|
{
|
||||||
|
if ($this->autoSetup) {
|
||||||
|
$this->setup();
|
||||||
|
}
|
||||||
|
|
||||||
$messageId = '>'; // will receive new messages
|
$messageId = '>'; // will receive new messages
|
||||||
|
|
||||||
if ($this->couldHavePendingMessages) {
|
if ($this->couldHavePendingMessages) {
|
||||||
@ -141,6 +160,10 @@ class Connection
|
|||||||
|
|
||||||
public function add(string $body, array $headers): void
|
public function add(string $body, array $headers): void
|
||||||
{
|
{
|
||||||
|
if ($this->autoSetup) {
|
||||||
|
$this->setup();
|
||||||
|
}
|
||||||
|
|
||||||
$e = null;
|
$e = null;
|
||||||
try {
|
try {
|
||||||
$added = $this->connection->xadd($this->stream, '*', ['message' => json_encode(
|
$added = $this->connection->xadd($this->stream, '*', ['message' => json_encode(
|
||||||
@ -161,5 +184,7 @@ class Connection
|
|||||||
} catch (\RedisException $e) {
|
} catch (\RedisException $e) {
|
||||||
throw new TransportException($e->getMessage(), 0, $e);
|
throw new TransportException($e->getMessage(), 0, $e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$this->autoSetup = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user