[Messenger] fix manual amqp setup when autosetup disabled

This commit is contained in:
Tobias Schultze 2021-04-28 00:28:04 +02:00
parent 69de4d2225
commit 29e5bde6c0

View File

@ -80,7 +80,7 @@ class Connection
private $queuesOptions;
private $amqpFactory;
private $autoSetupExchange;
private $autoSetup;
private $autoSetupDelayExchange;
/**
* @var \AMQPChannel|null
@ -114,7 +114,7 @@ class Connection
'queue_name_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%',
],
], $connectionOptions);
$this->autoSetupExchange = $this->autoSetup = $connectionOptions['auto_setup'] ?? true;
$this->autoSetupExchange = $this->autoSetupDelayExchange = $connectionOptions['auto_setup'] ?? true;
$this->exchangeOptions = $exchangeOptions;
$this->queuesOptions = $queuesOptions;
$this->amqpFactory = $amqpFactory ?? new AmqpFactory();
@ -288,16 +288,16 @@ class Connection
{
$this->clearWhenDisconnected();
if ($this->autoSetupExchange) {
$this->setupExchangeAndQueues(); // also setup normal exchange for delayed messages so delay queue can DLX messages to it
}
if (0 !== $delayInMs) {
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);
return;
}
if ($this->autoSetupExchange) {
$this->setupExchangeAndQueues();
}
$this->publishOnExchange(
$this->exchange(),
$body,
@ -356,8 +356,8 @@ class Connection
private function setupDelay(int $delay, ?string $routingKey)
{
if ($this->autoSetup) {
$this->setup(); // setup delay exchange and normal exchange for delay queue to DLX messages to
if ($this->autoSetupDelayExchange) {
$this->setupDelayExchange();
}
$queue = $this->createDelayQueue($delay, $routingKey);
@ -450,11 +450,8 @@ class Connection
public function setup(): void
{
if ($this->autoSetupExchange) {
$this->setupExchangeAndQueues();
}
$this->getDelayExchange()->declareExchange();
$this->autoSetup = false;
$this->setupExchangeAndQueues();
$this->setupDelayExchange();
}
private function setupExchangeAndQueues(): void
@ -470,6 +467,12 @@ class Connection
$this->autoSetupExchange = false;
}
private function setupDelayExchange(): void
{
$this->getDelayExchange()->declareExchange();
$this->autoSetupDelayExchange = false;
}
/**
* @return string[]
*/