[Messenger] fix manual amqp setup when autosetup disabled
This commit is contained in:
parent
69de4d2225
commit
29e5bde6c0
@ -80,7 +80,7 @@ class Connection
|
|||||||
private $queuesOptions;
|
private $queuesOptions;
|
||||||
private $amqpFactory;
|
private $amqpFactory;
|
||||||
private $autoSetupExchange;
|
private $autoSetupExchange;
|
||||||
private $autoSetup;
|
private $autoSetupDelayExchange;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @var \AMQPChannel|null
|
* @var \AMQPChannel|null
|
||||||
@ -114,7 +114,7 @@ class Connection
|
|||||||
'queue_name_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%',
|
'queue_name_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%',
|
||||||
],
|
],
|
||||||
], $connectionOptions);
|
], $connectionOptions);
|
||||||
$this->autoSetupExchange = $this->autoSetup = $connectionOptions['auto_setup'] ?? true;
|
$this->autoSetupExchange = $this->autoSetupDelayExchange = $connectionOptions['auto_setup'] ?? true;
|
||||||
$this->exchangeOptions = $exchangeOptions;
|
$this->exchangeOptions = $exchangeOptions;
|
||||||
$this->queuesOptions = $queuesOptions;
|
$this->queuesOptions = $queuesOptions;
|
||||||
$this->amqpFactory = $amqpFactory ?? new AmqpFactory();
|
$this->amqpFactory = $amqpFactory ?? new AmqpFactory();
|
||||||
@ -288,16 +288,16 @@ class Connection
|
|||||||
{
|
{
|
||||||
$this->clearWhenDisconnected();
|
$this->clearWhenDisconnected();
|
||||||
|
|
||||||
|
if ($this->autoSetupExchange) {
|
||||||
|
$this->setupExchangeAndQueues(); // also setup normal exchange for delayed messages so delay queue can DLX messages to it
|
||||||
|
}
|
||||||
|
|
||||||
if (0 !== $delayInMs) {
|
if (0 !== $delayInMs) {
|
||||||
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);
|
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($this->autoSetupExchange) {
|
|
||||||
$this->setupExchangeAndQueues();
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->publishOnExchange(
|
$this->publishOnExchange(
|
||||||
$this->exchange(),
|
$this->exchange(),
|
||||||
$body,
|
$body,
|
||||||
@ -356,8 +356,8 @@ class Connection
|
|||||||
|
|
||||||
private function setupDelay(int $delay, ?string $routingKey)
|
private function setupDelay(int $delay, ?string $routingKey)
|
||||||
{
|
{
|
||||||
if ($this->autoSetup) {
|
if ($this->autoSetupDelayExchange) {
|
||||||
$this->setup(); // setup delay exchange and normal exchange for delay queue to DLX messages to
|
$this->setupDelayExchange();
|
||||||
}
|
}
|
||||||
|
|
||||||
$queue = $this->createDelayQueue($delay, $routingKey);
|
$queue = $this->createDelayQueue($delay, $routingKey);
|
||||||
@ -450,11 +450,8 @@ class Connection
|
|||||||
|
|
||||||
public function setup(): void
|
public function setup(): void
|
||||||
{
|
{
|
||||||
if ($this->autoSetupExchange) {
|
$this->setupExchangeAndQueues();
|
||||||
$this->setupExchangeAndQueues();
|
$this->setupDelayExchange();
|
||||||
}
|
|
||||||
$this->getDelayExchange()->declareExchange();
|
|
||||||
$this->autoSetup = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private function setupExchangeAndQueues(): void
|
private function setupExchangeAndQueues(): void
|
||||||
@ -470,6 +467,12 @@ class Connection
|
|||||||
$this->autoSetupExchange = false;
|
$this->autoSetupExchange = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private function setupDelayExchange(): void
|
||||||
|
{
|
||||||
|
$this->getDelayExchange()->declareExchange();
|
||||||
|
$this->autoSetupDelayExchange = false;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return string[]
|
* @return string[]
|
||||||
*/
|
*/
|
||||||
|
Reference in New Issue
Block a user