Using AMQP auto-setup in all cases, not just in debug
This commit is contained in:
parent
b7e798ef74
commit
503c20989c
@ -61,7 +61,6 @@
|
||||
<service id="messenger.transport.amqp.factory" class="Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransportFactory">
|
||||
<tag name="messenger.transport_factory" />
|
||||
<argument type="service" id="messenger.transport.serializer" />
|
||||
<argument>%kernel.debug%</argument>
|
||||
</service>
|
||||
</services>
|
||||
</container>
|
||||
|
@ -13,6 +13,11 @@ CHANGELOG
|
||||
* [BC BREAK] If listening to exceptions while using `AmqpSender` or `AmqpReceiver`, `\AMQPException` is
|
||||
no longer thrown in favor of `TransportException`.
|
||||
* Deprecated `LoggingMiddleware`, pass a logger to `SendMessageMiddleware` instead.
|
||||
* [BC BREAK] `Connection::__construct()` and `Connection::fromDsn()`
|
||||
both no longer have `$isDebug` arguments.
|
||||
* [BC BREAK] The Amqp Transport now automatically sets up the exchanges
|
||||
and queues by default. Previously, this was done when in "debug" mode
|
||||
only. Pass the `auto-setup` connection option to control this.
|
||||
|
||||
4.2.0
|
||||
-----
|
||||
|
@ -22,8 +22,7 @@ class AmqpTransportFactoryTest extends TestCase
|
||||
public function testSupportsOnlyAmqpTransports()
|
||||
{
|
||||
$factory = new AmqpTransportFactory(
|
||||
$this->getMockBuilder(SerializerInterface::class)->getMock(),
|
||||
true
|
||||
$this->getMockBuilder(SerializerInterface::class)->getMock()
|
||||
);
|
||||
|
||||
$this->assertTrue($factory->supports('amqp://localhost', []));
|
||||
@ -34,11 +33,10 @@ class AmqpTransportFactoryTest extends TestCase
|
||||
public function testItCreatesTheTransport()
|
||||
{
|
||||
$factory = new AmqpTransportFactory(
|
||||
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(),
|
||||
true
|
||||
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock()
|
||||
);
|
||||
|
||||
$expectedTransport = new AmqpTransport(Connection::fromDsn('amqp://localhost', ['foo' => 'bar'], true), $serializer);
|
||||
$expectedTransport = new AmqpTransport(Connection::fromDsn('amqp://localhost', ['foo' => 'bar']), $serializer);
|
||||
|
||||
$this->assertEquals($expectedTransport, $factory->createTransport('amqp://localhost', ['foo' => 'bar']));
|
||||
}
|
||||
|
@ -128,7 +128,7 @@ class ConnectionTest extends TestCase
|
||||
'alternate-exchange' => 'alternate',
|
||||
],
|
||||
],
|
||||
], true, $factory);
|
||||
], $factory);
|
||||
$connection->publish('body');
|
||||
}
|
||||
|
||||
@ -171,9 +171,11 @@ class ConnectionTest extends TestCase
|
||||
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
|
||||
);
|
||||
|
||||
// makes sure the channel looks connected, so it's not re-created
|
||||
$amqpChannel->expects($this->once())->method('isConnected')->willReturn(true);
|
||||
$amqpConnection->expects($this->once())->method('connect');
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], false, $factory);
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
|
||||
$connection->publish('body');
|
||||
}
|
||||
|
||||
@ -186,13 +188,15 @@ class ConnectionTest extends TestCase
|
||||
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
|
||||
);
|
||||
|
||||
// makes sure the channel looks connected, so it's not re-created
|
||||
$amqpChannel->expects($this->once())->method('isConnected')->willReturn(true);
|
||||
$amqpConnection->expects($this->once())->method('pconnect');
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?persistent=true', [], false, $factory);
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?persistent=true', [], $factory);
|
||||
$connection->publish('body');
|
||||
}
|
||||
|
||||
public function testItSetupsTheConnectionWhenDebug()
|
||||
public function testItSetupsTheConnectionByDefault()
|
||||
{
|
||||
$factory = new TestAmqpFactory(
|
||||
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
|
||||
@ -206,7 +210,7 @@ class ConnectionTest extends TestCase
|
||||
$amqpQueue->expects($this->once())->method('declareQueue');
|
||||
$amqpQueue->expects($this->once())->method('bind')->with('exchange_name', 'my_key');
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', [], true, $factory);
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', [], $factory);
|
||||
$connection->publish('body');
|
||||
}
|
||||
|
||||
@ -224,13 +228,13 @@ class ConnectionTest extends TestCase
|
||||
$amqpQueue->expects($this->never())->method('declareQueue');
|
||||
$amqpQueue->expects($this->never())->method('bind');
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto-setup' => 'false'], true, $factory);
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto-setup' => 'false'], $factory);
|
||||
$connection->publish('body');
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto-setup' => false], true, $factory);
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto-setup' => false], $factory);
|
||||
$connection->publish('body');
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key&auto-setup=false', [], true, $factory);
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key&auto-setup=false', [], $factory);
|
||||
$connection->publish('body');
|
||||
}
|
||||
|
||||
@ -248,7 +252,7 @@ class ConnectionTest extends TestCase
|
||||
$amqpExchange->expects($this->once())->method('publish')
|
||||
->with('body', null, 1, ['delivery_mode' => 2, 'headers' => ['token' => 'uuid', 'type' => '*']]);
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[attributes][delivery_mode]=2&queue[attributes][headers][token]=uuid&queue[flags]=1', [], true, $factory);
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[attributes][delivery_mode]=2&queue[attributes][headers][token]=uuid&queue[flags]=1', [], $factory);
|
||||
$connection->publish('body', $headers);
|
||||
}
|
||||
}
|
||||
|
@ -24,17 +24,15 @@ use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
class AmqpTransportFactory implements TransportFactoryInterface
|
||||
{
|
||||
private $serializer;
|
||||
private $debug;
|
||||
|
||||
public function __construct(SerializerInterface $serializer = null, bool $debug = false)
|
||||
public function __construct(SerializerInterface $serializer = null)
|
||||
{
|
||||
$this->serializer = $serializer ?? new PhpSerializer();
|
||||
$this->debug = $debug;
|
||||
}
|
||||
|
||||
public function createTransport(string $dsn, array $options): TransportInterface
|
||||
{
|
||||
return new AmqpTransport(Connection::fromDsn($dsn, $options, $this->debug), $this->serializer);
|
||||
return new AmqpTransport(Connection::fromDsn($dsn, $options), $this->serializer);
|
||||
}
|
||||
|
||||
public function supports(string $dsn, array $options): bool
|
||||
|
@ -36,7 +36,6 @@ class Connection
|
||||
private $connectionCredentials;
|
||||
private $exchangeConfiguration;
|
||||
private $queueConfiguration;
|
||||
private $debug;
|
||||
private $amqpFactory;
|
||||
|
||||
/**
|
||||
@ -54,16 +53,15 @@ class Connection
|
||||
*/
|
||||
private $amqpQueue;
|
||||
|
||||
public function __construct(array $connectionCredentials, array $exchangeConfiguration, array $queueConfiguration, bool $debug = false, AmqpFactory $amqpFactory = null)
|
||||
public function __construct(array $connectionCredentials, array $exchangeConfiguration, array $queueConfiguration, AmqpFactory $amqpFactory = null)
|
||||
{
|
||||
$this->connectionCredentials = $connectionCredentials;
|
||||
$this->debug = $debug;
|
||||
$this->exchangeConfiguration = $exchangeConfiguration;
|
||||
$this->queueConfiguration = $queueConfiguration;
|
||||
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
|
||||
}
|
||||
|
||||
public static function fromDsn(string $dsn, array $options = [], bool $debug = false, AmqpFactory $amqpFactory = null): self
|
||||
public static function fromDsn(string $dsn, array $options = [], AmqpFactory $amqpFactory = null): self
|
||||
{
|
||||
if (false === $parsedUrl = parse_url($dsn)) {
|
||||
throw new InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn));
|
||||
@ -104,7 +102,7 @@ class Connection
|
||||
$queueOptions['arguments'] = self::normalizeQueueArguments($queueOptions['arguments']);
|
||||
}
|
||||
|
||||
return new self($amqpOptions, $exchangeOptions, $queueOptions, $debug, $amqpFactory);
|
||||
return new self($amqpOptions, $exchangeOptions, $queueOptions, $amqpFactory);
|
||||
}
|
||||
|
||||
private static function normalizeQueueArguments(array $arguments): array
|
||||
@ -129,7 +127,7 @@ class Connection
|
||||
*/
|
||||
public function publish(string $body, array $headers = []): void
|
||||
{
|
||||
if ($this->debug && $this->shouldSetup()) {
|
||||
if ($this->shouldSetup()) {
|
||||
$this->setup();
|
||||
}
|
||||
|
||||
@ -146,7 +144,7 @@ class Connection
|
||||
*/
|
||||
public function get(): ?\AMQPEnvelope
|
||||
{
|
||||
if ($this->debug && $this->shouldSetup()) {
|
||||
if ($this->shouldSetup()) {
|
||||
$this->setup();
|
||||
}
|
||||
|
||||
@ -256,6 +254,14 @@ class Connection
|
||||
|
||||
private function shouldSetup(): bool
|
||||
{
|
||||
return !\array_key_exists('auto-setup', $this->connectionCredentials) || !\in_array($this->connectionCredentials['auto-setup'], [false, 'false'], true);
|
||||
if (!\array_key_exists('auto-setup', $this->connectionCredentials)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (\in_array($this->connectionCredentials['auto-setup'], [false, 'false'], true)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user