Using AMQP auto-setup in all cases, not just in debug

This commit is contained in:
Ryan Weaver 2019-03-15 10:14:02 -04:00
parent b7e798ef74
commit 503c20989c
6 changed files with 37 additions and 27 deletions

View File

@ -61,7 +61,6 @@
<service id="messenger.transport.amqp.factory" class="Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransportFactory">
<tag name="messenger.transport_factory" />
<argument type="service" id="messenger.transport.serializer" />
<argument>%kernel.debug%</argument>
</service>
</services>
</container>

View File

@ -13,6 +13,11 @@ CHANGELOG
* [BC BREAK] If listening to exceptions while using `AmqpSender` or `AmqpReceiver`, `\AMQPException` is
no longer thrown in favor of `TransportException`.
* Deprecated `LoggingMiddleware`, pass a logger to `SendMessageMiddleware` instead.
* [BC BREAK] `Connection::__construct()` and `Connection::fromDsn()`
both no longer have `$isDebug` arguments.
* [BC BREAK] The Amqp Transport now automatically sets up the exchanges
and queues by default. Previously, this was done when in "debug" mode
only. Pass the `auto-setup` connection option to control this.
4.2.0
-----

View File

@ -22,8 +22,7 @@ class AmqpTransportFactoryTest extends TestCase
public function testSupportsOnlyAmqpTransports()
{
$factory = new AmqpTransportFactory(
$this->getMockBuilder(SerializerInterface::class)->getMock(),
true
$this->getMockBuilder(SerializerInterface::class)->getMock()
);
$this->assertTrue($factory->supports('amqp://localhost', []));
@ -34,11 +33,10 @@ class AmqpTransportFactoryTest extends TestCase
public function testItCreatesTheTransport()
{
$factory = new AmqpTransportFactory(
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(),
true
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock()
);
$expectedTransport = new AmqpTransport(Connection::fromDsn('amqp://localhost', ['foo' => 'bar'], true), $serializer);
$expectedTransport = new AmqpTransport(Connection::fromDsn('amqp://localhost', ['foo' => 'bar']), $serializer);
$this->assertEquals($expectedTransport, $factory->createTransport('amqp://localhost', ['foo' => 'bar']));
}

View File

@ -128,7 +128,7 @@ class ConnectionTest extends TestCase
'alternate-exchange' => 'alternate',
],
],
], true, $factory);
], $factory);
$connection->publish('body');
}
@ -171,9 +171,11 @@ class ConnectionTest extends TestCase
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
);
// makes sure the channel looks connected, so it's not re-created
$amqpChannel->expects($this->once())->method('isConnected')->willReturn(true);
$amqpConnection->expects($this->once())->method('connect');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], false, $factory);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
$connection->publish('body');
}
@ -186,13 +188,15 @@ class ConnectionTest extends TestCase
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
);
// makes sure the channel looks connected, so it's not re-created
$amqpChannel->expects($this->once())->method('isConnected')->willReturn(true);
$amqpConnection->expects($this->once())->method('pconnect');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?persistent=true', [], false, $factory);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?persistent=true', [], $factory);
$connection->publish('body');
}
public function testItSetupsTheConnectionWhenDebug()
public function testItSetupsTheConnectionByDefault()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
@ -206,7 +210,7 @@ class ConnectionTest extends TestCase
$amqpQueue->expects($this->once())->method('declareQueue');
$amqpQueue->expects($this->once())->method('bind')->with('exchange_name', 'my_key');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', [], true, $factory);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', [], $factory);
$connection->publish('body');
}
@ -224,13 +228,13 @@ class ConnectionTest extends TestCase
$amqpQueue->expects($this->never())->method('declareQueue');
$amqpQueue->expects($this->never())->method('bind');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto-setup' => 'false'], true, $factory);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto-setup' => 'false'], $factory);
$connection->publish('body');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto-setup' => false], true, $factory);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto-setup' => false], $factory);
$connection->publish('body');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key&auto-setup=false', [], true, $factory);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key&auto-setup=false', [], $factory);
$connection->publish('body');
}
@ -248,7 +252,7 @@ class ConnectionTest extends TestCase
$amqpExchange->expects($this->once())->method('publish')
->with('body', null, 1, ['delivery_mode' => 2, 'headers' => ['token' => 'uuid', 'type' => '*']]);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[attributes][delivery_mode]=2&queue[attributes][headers][token]=uuid&queue[flags]=1', [], true, $factory);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[attributes][delivery_mode]=2&queue[attributes][headers][token]=uuid&queue[flags]=1', [], $factory);
$connection->publish('body', $headers);
}
}

View File

@ -24,17 +24,15 @@ use Symfony\Component\Messenger\Transport\TransportInterface;
class AmqpTransportFactory implements TransportFactoryInterface
{
private $serializer;
private $debug;
public function __construct(SerializerInterface $serializer = null, bool $debug = false)
public function __construct(SerializerInterface $serializer = null)
{
$this->serializer = $serializer ?? new PhpSerializer();
$this->debug = $debug;
}
public function createTransport(string $dsn, array $options): TransportInterface
{
return new AmqpTransport(Connection::fromDsn($dsn, $options, $this->debug), $this->serializer);
return new AmqpTransport(Connection::fromDsn($dsn, $options), $this->serializer);
}
public function supports(string $dsn, array $options): bool

View File

@ -36,7 +36,6 @@ class Connection
private $connectionCredentials;
private $exchangeConfiguration;
private $queueConfiguration;
private $debug;
private $amqpFactory;
/**
@ -54,16 +53,15 @@ class Connection
*/
private $amqpQueue;
public function __construct(array $connectionCredentials, array $exchangeConfiguration, array $queueConfiguration, bool $debug = false, AmqpFactory $amqpFactory = null)
public function __construct(array $connectionCredentials, array $exchangeConfiguration, array $queueConfiguration, AmqpFactory $amqpFactory = null)
{
$this->connectionCredentials = $connectionCredentials;
$this->debug = $debug;
$this->exchangeConfiguration = $exchangeConfiguration;
$this->queueConfiguration = $queueConfiguration;
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
}
public static function fromDsn(string $dsn, array $options = [], bool $debug = false, AmqpFactory $amqpFactory = null): self
public static function fromDsn(string $dsn, array $options = [], AmqpFactory $amqpFactory = null): self
{
if (false === $parsedUrl = parse_url($dsn)) {
throw new InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn));
@ -104,7 +102,7 @@ class Connection
$queueOptions['arguments'] = self::normalizeQueueArguments($queueOptions['arguments']);
}
return new self($amqpOptions, $exchangeOptions, $queueOptions, $debug, $amqpFactory);
return new self($amqpOptions, $exchangeOptions, $queueOptions, $amqpFactory);
}
private static function normalizeQueueArguments(array $arguments): array
@ -129,7 +127,7 @@ class Connection
*/
public function publish(string $body, array $headers = []): void
{
if ($this->debug && $this->shouldSetup()) {
if ($this->shouldSetup()) {
$this->setup();
}
@ -146,7 +144,7 @@ class Connection
*/
public function get(): ?\AMQPEnvelope
{
if ($this->debug && $this->shouldSetup()) {
if ($this->shouldSetup()) {
$this->setup();
}
@ -256,6 +254,14 @@ class Connection
private function shouldSetup(): bool
{
return !\array_key_exists('auto-setup', $this->connectionCredentials) || !\in_array($this->connectionCredentials['auto-setup'], [false, 'false'], true);
if (!\array_key_exists('auto-setup', $this->connectionCredentials)) {
return true;
}
if (\in_array($this->connectionCredentials['auto-setup'], [false, 'false'], true)) {
return false;
}
return true;
}
}