[messenger] AMQP configurable routing key & multiple queues

This commit is contained in:
Guillaume Gammelin 2019-01-28 16:06:43 +01:00 committed by Samuel ROZE
parent 45fd75ea20
commit 3151b54b7a
13 changed files with 473 additions and 185 deletions

View File

@ -18,6 +18,9 @@ CHANGELOG
changed: a required 3rd `SerializerInterface` argument was added.
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
explicitly handle messages synchronously.
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
* Removed publishing with a `routing_key` option from queue configuration, for
AMQP. Use exchange `default_publish_routing_key` or `AmqpRoutingKeyStamp` instead.
* Added optional parameter `prefetch_count` in connection configuration,
to setup channel prefetch count.
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
@ -71,6 +74,8 @@ CHANGELOG
only. Pass the `auto_setup` connection option to control this.
* Added a `SetupTransportsCommand` command to setup the transports
* Added a Doctrine transport. For example, use the `doctrine://default` DSN (this uses the `default` Doctrine entity manager)
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
* Deprecated publishing with a routing key from queue configuration, use exchange configuration instead.
4.2.0
-----

View File

@ -49,7 +49,7 @@ class AmqpExtIntegrationTest extends TestCase
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
$connection->setup();
$connection->queue()->purge();
$connection->purgeQueues();
$sender = new AmqpSender($connection, $serializer);
$receiver = new AmqpReceiver($connection, $serializer);
@ -79,7 +79,7 @@ class AmqpExtIntegrationTest extends TestCase
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
$connection->setup();
$connection->queue()->purge();
$connection->purgeQueues();
$sender = new AmqpSender($connection, $serializer);
$receiver = new AmqpReceiver($connection, $serializer);
@ -126,7 +126,7 @@ class AmqpExtIntegrationTest extends TestCase
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
$connection->setup();
$connection->queue()->purge();
$connection->purgeQueues();
$sender = new AmqpSender($connection, $serializer);
$sender->send(new Envelope(new DummyMessage('Hello')));
@ -173,7 +173,7 @@ TXT
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
$connection->setup();
$connection->queue()->purge();
$connection->purgeQueues();
$sender = new AmqpSender($connection, $serializer);
@ -182,7 +182,7 @@ TXT
$sender->send(new Envelope(new DummyMessage('Third')));
sleep(1); // give amqp a moment to have the messages ready
$this->assertSame(3, $connection->countMessagesInQueue());
$this->assertSame(3, $connection->countMessagesInQueues());
}
private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)

View File

@ -0,0 +1,31 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
/**
* @requires extension amqp
*/
class AmqpReceivedStampTest extends TestCase
{
public function testStamp()
{
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
$stamp = new AmqpReceivedStamp($amqpEnvelope, 'queueName');
$this->assertSame($amqpEnvelope, $stamp->getAmqpEnvelope());
$this->assertSame('queueName', $stamp->getQueueName());
}
}

View File

@ -36,7 +36,8 @@ class AmqpReceiverTest extends TestCase
$amqpEnvelope = $this->createAMQPEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($amqpEnvelope);
$connection->method('getQueueNames')->willReturn(['queueName']);
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
$receiver = new AmqpReceiver($connection, $serializer);
$actualEnvelopes = iterator_to_array($receiver->get());
@ -52,11 +53,12 @@ class AmqpReceiverTest extends TestCase
$serializer = $this->createMock(SerializerInterface::class);
$amqpEnvelope = $this->createAMQPEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($amqpEnvelope);
$connection->method('ack')->with($amqpEnvelope)->willThrowException(new \AMQPException());
$connection->method('getQueueNames')->willReturn(['queueName']);
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
$connection->method('ack')->with($amqpEnvelope, 'queueName')->willThrowException(new \AMQPException());
$receiver = new AmqpReceiver($connection, $serializer);
$receiver->ack(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope)]));
$receiver->ack(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope, 'queueName')]));
}
/**
@ -67,11 +69,12 @@ class AmqpReceiverTest extends TestCase
$serializer = $this->createMock(SerializerInterface::class);
$amqpEnvelope = $this->createAMQPEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($amqpEnvelope);
$connection->method('nack')->with($amqpEnvelope, AMQP_NOPARAM)->willThrowException(new \AMQPException());
$connection->method('getQueueNames')->willReturn(['queueName']);
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
$connection->method('nack')->with($amqpEnvelope, 'queueName', AMQP_NOPARAM)->willThrowException(new \AMQPException());
$receiver = new AmqpReceiver($connection, $serializer);
$receiver->reject(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope)]));
$receiver->reject(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope, 'queueName')]));
}
private function createAMQPEnvelope()

View File

@ -0,0 +1,24 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;
class AmqpRoutingKeyStampTest extends TestCase
{
public function testStamp()
{
$stamp = new AmqpRoutingKeyStamp('routing_key');
$this->assertSame('routing_key', $stamp->getRoutingKey());
}
}

View File

@ -14,6 +14,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@ -38,6 +39,21 @@ class AmqpSenderTest extends TestCase
$sender->send($envelope);
}
public function testItSendsTheEncodedMessageUsingARoutingKey()
{
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new AmqpRoutingKeyStamp('rk'));
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
$serializer = $this->createMock(SerializerInterface::class);
$serializer->method('encode')->with($envelope)->willReturn($encoded);
$connection = $this->createMock(Connection::class);
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, 'rk');
$sender = new AmqpSender($connection, $serializer);
$sender->send($envelope);
}
public function testItSendsTheEncodedMessageWithoutHeaders()
{
$envelope = new Envelope(new DummyMessage('Oy'));

View File

@ -45,7 +45,8 @@ class AmqpTransportTest extends TestCase
$amqpEnvelope->method('getHeaders')->willReturn(['my' => 'header']);
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
$connection->method('get')->willReturn($amqpEnvelope);
$connection->method('getQueueNames')->willReturn(['queueName']);
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
$envelopes = iterator_to_array($transport->get());
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());

View File

@ -40,7 +40,7 @@ class ConnectionTest extends TestCase
], [
'name' => 'messages',
], [
'name' => 'messages',
'messages' => [],
]),
Connection::fromDsn('amqp://localhost/%2f/messages')
);
@ -58,9 +58,9 @@ class ConnectionTest extends TestCase
], [
'name' => 'exchangeName',
], [
'name' => 'queue',
'queueName' => [],
]),
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName')
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queues[queueName]')
);
}
@ -77,9 +77,9 @@ class ConnectionTest extends TestCase
], [
'name' => 'exchangeName',
], [
'name' => 'queueName',
'queueName' => [],
]),
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queue[name]=queueName', [
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queues[queueName]', [
'persistent' => 'true',
'exchange' => ['name' => 'toBeOverwritten'],
])
@ -89,10 +89,10 @@ class ConnectionTest extends TestCase
public function testSetsParametersOnTheQueueAndExchange()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(),
$amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(),
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
$amqpConnection = $this->createMock(\AMQPConnection::class),
$amqpChannel = $this->createMock(\AMQPChannel::class),
$amqpQueue = $this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);
$amqpQueue->expects($this->once())->method('setArguments')->with([
@ -110,17 +110,19 @@ class ConnectionTest extends TestCase
]);
$dsn = 'amqp://localhost/%2f/messages?'.
'queue[arguments][x-dead-letter-exchange]=dead-exchange&'.
'queue[arguments][x-message-ttl]=100&'.
'queue[arguments][x-delay]=100&'.
'queue[arguments][x-expires]=150&'
'queues[messages][arguments][x-dead-letter-exchange]=dead-exchange&'.
'queues[messages][arguments][x-message-ttl]=100&'.
'queues[messages][arguments][x-delay]=100&'.
'queues[messages][arguments][x-expires]=150&'
;
$connection = Connection::fromDsn($dsn, [
'queue' => [
'arguments' => [
'x-max-length' => '200',
'x-max-length-bytes' => '300',
'x-max-priority' => '4',
'queues' => [
'messages' => [
'arguments' => [
'x-max-length' => '200',
'x-max-length-bytes' => '300',
'x-max-priority' => '4',
],
],
],
'exchange' => [
@ -135,20 +137,23 @@ class ConnectionTest extends TestCase
public function invalidQueueArgumentsDataProvider(): iterable
{
$baseDsn = 'amqp://localhost/%2f/messages';
yield [$baseDsn.'?queue[arguments][x-delay]=not-a-number', []];
yield [$baseDsn.'?queue[arguments][x-expires]=not-a-number', []];
yield [$baseDsn.'?queue[arguments][x-max-length]=not-a-number', []];
yield [$baseDsn.'?queue[arguments][x-max-length-bytes]=not-a-number', []];
yield [$baseDsn.'?queue[arguments][x-max-priority]=not-a-number', []];
yield [$baseDsn.'?queue[arguments][x-message-ttl]=not-a-number', []];
// Ensure the exception is thrown when the arguments are passed via the array options
yield [$baseDsn, ['queue' => ['arguments' => ['x-delay' => 'not-a-number']]]];
yield [$baseDsn, ['queue' => ['arguments' => ['x-expires' => 'not-a-number']]]];
yield [$baseDsn, ['queue' => ['arguments' => ['x-max-length' => 'not-a-number']]]];
yield [$baseDsn, ['queue' => ['arguments' => ['x-max-length-bytes' => 'not-a-number']]]];
yield [$baseDsn, ['queue' => ['arguments' => ['x-max-priority' => 'not-a-number']]]];
yield [$baseDsn, ['queue' => ['arguments' => ['x-message-ttl' => 'not-a-number']]]];
return [
[$baseDsn.'?queues[messages][arguments][x-delay]=not-a-number', []],
[$baseDsn.'?queues[messages][arguments][x-expires]=not-a-number', []],
[$baseDsn.'?queues[messages][arguments][x-max-length]=not-a-number', []],
[$baseDsn.'?queues[messages][arguments][x-max-length-bytes]=not-a-number', []],
[$baseDsn.'?queues[messages][arguments][x-max-priority]=not-a-number', []],
[$baseDsn.'?queues[messages][arguments][x-message-ttl]=not-a-number', []],
// Ensure the exception is thrown when the arguments are passed via the array options
[$baseDsn, ['queues' => ['messages' => ['arguments' => ['x-delay' => 'not-a-number']]]]],
[$baseDsn, ['queues' => ['messages' => ['arguments' => ['x-expires' => 'not-a-number']]]]],
[$baseDsn, ['queues' => ['messages' => ['arguments' => ['x-max-length' => 'not-a-number']]]]],
[$baseDsn, ['queues' => ['messages' => ['arguments' => ['x-max-length-bytes' => 'not-a-number']]]]],
[$baseDsn, ['queues' => ['messages' => ['arguments' => ['x-max-priority' => 'not-a-number']]]]],
[$baseDsn, ['queues' => ['messages' => ['arguments' => ['x-message-ttl' => 'not-a-number']]]]],
];
}
/**
@ -165,10 +170,10 @@ class ConnectionTest extends TestCase
public function testItUsesANormalConnectionByDefault()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(),
$amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(),
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
$amqpConnection = $this->createMock(\AMQPConnection::class),
$amqpChannel = $this->createMock(\AMQPChannel::class),
$amqpQueue = $this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);
// makes sure the channel looks connected, so it's not re-created
@ -182,10 +187,10 @@ class ConnectionTest extends TestCase
public function testItAllowsToUseAPersistentConnection()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(),
$amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(),
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
$amqpConnection = $this->createMock(\AMQPConnection::class),
$amqpChannel = $this->createMock(\AMQPChannel::class),
$amqpQueue = $this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);
// makes sure the channel looks connected, so it's not re-created
@ -196,49 +201,7 @@ class ConnectionTest extends TestCase
$connection->publish('body');
}
public function testItSetupsTheConnectionByDefault()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(),
$amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(),
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
);
$amqpExchange->method('getName')->willReturn('exchange_name');
$amqpExchange->expects($this->once())->method('declareExchange');
$amqpQueue->expects($this->once())->method('declareQueue');
$amqpQueue->expects($this->once())->method('bind')->with('exchange_name', 'my_key');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', [], $factory);
$connection->publish('body');
}
public function testItCanDisableTheSetup()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(),
$amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(),
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
);
$amqpExchange->method('getName')->willReturn('exchange_name');
$amqpExchange->expects($this->never())->method('declareExchange');
$amqpQueue->expects($this->never())->method('declareQueue');
$amqpQueue->expects($this->never())->method('bind');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto_setup' => 'false'], $factory);
$connection->publish('body');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto_setup' => false], $factory);
$connection->publish('body');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key&auto_setup=false', [], $factory);
$connection->publish('body');
}
public function testPublishWithQueueOptions()
public function testItSetupsTheConnectionWithDefaults()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->createMock(\AMQPConnection::class),
@ -246,14 +209,78 @@ class ConnectionTest extends TestCase
$amqpQueue = $this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);
$headers = [
'type' => '*',
];
$amqpExchange->expects($this->once())->method('publish')
->with('body', null, 1, ['delivery_mode' => 2, 'headers' => ['token' => 'uuid', 'type' => '*']]);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[attributes][delivery_mode]=2&queue[attributes][headers][token]=uuid&queue[flags]=1', [], $factory);
$connection->publish('body', $headers);
$amqpExchange->method('getName')->willReturn('exchange_name');
$amqpExchange->expects($this->once())->method('declareExchange');
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => []]);
$amqpQueue->expects($this->once())->method('declareQueue');
$amqpQueue->expects($this->once())->method('bind')->with('exchange_name', null);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
$connection->publish('body');
}
public function testItSetupsTheConnection()
{
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);
$amqpExchange = $this->createMock(\AMQPExchange::class);
$amqpQueue0 = $this->createMock(\AMQPQueue::class);
$amqpQueue1 = $this->createMock(\AMQPQueue::class);
$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createExchange')->willReturn($amqpExchange);
$factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1));
$amqpExchange->method('getName')->willReturn('exchange_name');
$amqpExchange->expects($this->once())->method('declareExchange');
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => []]);
$amqpQueue0->expects($this->once())->method('declareQueue');
$amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive(
['exchange_name', 'binding_key0'],
['exchange_name', 'binding_key1']
);
$amqpQueue1->expects($this->once())->method('declareQueue');
$amqpQueue1->expects($this->exactly(2))->method('bind')->withConsecutive(
['exchange_name', 'binding_key2'],
['exchange_name', 'binding_key3']
);
$dsn = 'amqp://localhost/%2f/messages?'.
'exchange[default_publish_routing_key]=routing_key&'.
'queues[queue0][binding_keys][0]=binding_key0&'.
'queues[queue0][binding_keys][1]=binding_key1&'.
'queues[queue1][binding_keys][0]=binding_key2&'.
'queues[queue1][binding_keys][1]=binding_key3';
$connection = Connection::fromDsn($dsn, [], $factory);
$connection->publish('body');
}
public function testItCanDisableTheSetup()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->createMock(\AMQPConnection::class),
$amqpChannel = $this->createMock(\AMQPChannel::class),
$amqpQueue = $this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);
$amqpExchange->method('getName')->willReturn('exchange_name');
$amqpExchange->expects($this->never())->method('declareExchange');
$amqpQueue->expects($this->never())->method('declareQueue');
$amqpQueue->expects($this->never())->method('bind');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['auto_setup' => 'false'], $factory);
$connection->publish('body');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['auto_setup' => false], $factory);
$connection->publish('body');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?auto_setup=false', [], $factory);
$connection->publish('body');
}
public function testSetChannelPrefetchWhenSetup()
@ -277,11 +304,11 @@ class ConnectionTest extends TestCase
public function testItDelaysTheMessage()
{
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock();
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock();
$delayQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock();
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);
$delayQueue = $this->createMock(\AMQPQueue::class);
$factory = $this->getMockBuilder(AmqpFactory::class)->getMock();
$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
@ -314,11 +341,11 @@ class ConnectionTest extends TestCase
public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
{
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock();
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock();
$delayQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock();
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);
$delayQueue = $this->createMock(\AMQPQueue::class);
$factory = $this->getMockBuilder(AmqpFactory::class)->getMock();
$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
@ -375,6 +402,83 @@ class ConnectionTest extends TestCase
$connection = Connection::fromDsn('amqp://user:secretpassword@localhost/%2f/messages', [], $factory);
$connection->channel();
}
public function testItCanPublishWithTheDefaultRoutingKey()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->createMock(\AMQPConnection::class),
$amqpChannel = $this->createMock(\AMQPChannel::class),
$amqpQueue = $this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=routing_key', [], $factory);
$connection->publish('body');
}
public function testItCanPublishWithASuppliedRoutingKey()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->createMock(\AMQPConnection::class),
$amqpChannel = $this->createMock(\AMQPChannel::class),
$amqpQueue = $this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=default_routing_key', [], $factory);
$connection->publish('body', [], 0, 'routing_key');
}
public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument()
{
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);
$delayQueue = $this->createMock(\AMQPQueue::class);
$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$delayExchange = $this->createMock(\AMQPExchange::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
));
$amqpExchange->expects($this->once())->method('setName')->with('messages');
$amqpExchange->method('getName')->willReturn('messages');
$delayExchange->expects($this->once())->method('setName')->with('delay');
$delayExchange->expects($this->once())->method('declareExchange');
$delayExchange->method('getName')->willReturn('delay');
$connectionOptions = [
'retry' => [
'dead_routing_key' => 'my_dead_routing_key',
],
];
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, $factory);
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_120000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 120000,
'x-dead-letter-exchange' => 'messages',
]);
$delayQueue->expects($this->once())->method('setArgument')->with(
'x-dead-letter-routing-key',
'routing_key'
);
$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_120000');
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_120000', AMQP_NOPARAM, ['headers' => []]);
$connection->publish('{}', [], 120000, 'routing_key');
}
}
class TestAmqpFactory extends AmqpFactory

View File

@ -21,14 +21,21 @@ use Symfony\Component\Messenger\Stamp\StampInterface;
class AmqpReceivedStamp implements StampInterface
{
private $amqpEnvelope;
private $queueName;
public function __construct(\AMQPEnvelope $amqpEnvelope)
public function __construct(\AMQPEnvelope $amqpEnvelope, string $queueName)
{
$this->amqpEnvelope = $amqpEnvelope;
$this->queueName = $queueName;
}
public function getAmqpEnvelope(): \AMQPEnvelope
{
return $this->amqpEnvelope;
}
public function getQueueName(): string
{
return $this->queueName;
}
}

View File

@ -42,9 +42,16 @@ class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
* {@inheritdoc}
*/
public function get(): iterable
{
foreach ($this->connection->getQueueNames() as $queueName) {
yield from $this->getEnvelope($queueName);
}
}
private function getEnvelope(string $queueName): iterable
{
try {
$amqpEnvelope = $this->connection->get();
$amqpEnvelope = $this->connection->get($queueName);
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
@ -60,12 +67,12 @@ class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
]);
} catch (MessageDecodingFailedException $exception) {
// invalid message of some type
$this->rejectAmqpEnvelope($amqpEnvelope);
$this->rejectAmqpEnvelope($amqpEnvelope, $queueName);
throw $exception;
}
yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope));
yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName));
}
/**
@ -74,7 +81,11 @@ class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
public function ack(Envelope $envelope): void
{
try {
$this->connection->ack($this->findAmqpEnvelope($envelope));
/* @var AmqpReceivedStamp $amqpReceivedStamp */
$this->connection->ack(
$this->findAmqpEnvelope($envelope, $amqpReceivedStamp),
$amqpReceivedStamp->getQueueName()
);
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
@ -85,7 +96,11 @@ class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
*/
public function reject(Envelope $envelope): void
{
$this->rejectAmqpEnvelope($this->findAmqpEnvelope($envelope));
/* @var AmqpReceivedStamp $amqpReceivedStamp */
$this->rejectAmqpEnvelope(
$this->findAmqpEnvelope($envelope, $amqpReceivedStamp),
$amqpReceivedStamp->getQueueName()
);
}
/**
@ -93,21 +108,20 @@ class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
*/
public function getMessageCount(): int
{
return $this->connection->countMessagesInQueue();
return $this->connection->countMessagesInQueues();
}
private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope): void
private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, string $queueName): void
{
try {
$this->connection->nack($amqpEnvelope, AMQP_NOPARAM);
$this->connection->nack($amqpEnvelope, $queueName, AMQP_NOPARAM);
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}
private function findAmqpEnvelope(Envelope $envelope): \AMQPEnvelope
private function findAmqpEnvelope(Envelope $envelope, AmqpReceivedStamp &$amqpReceivedStamp = null): \AMQPEnvelope
{
/** @var AmqpReceivedStamp|null $amqpReceivedStamp */
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
if (null === $amqpReceivedStamp) {

View File

@ -0,0 +1,34 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\AmqpExt;
use Symfony\Component\Messenger\Stamp\StampInterface;
/**
* @author Guillaume Gammelin <ggammelin@gmail.com>
*
* @experimental in 4.3
*/
final class AmqpRoutingKeyStamp implements StampInterface
{
private $routingKey;
public function __construct(string $routingKey)
{
$this->routingKey = $routingKey;
}
public function getRoutingKey(): string
{
return $this->routingKey;
}
}

View File

@ -51,7 +51,11 @@ class AmqpSender implements SenderInterface
}
try {
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay);
/** @var $routingKeyStamp AmqpRoutingKeyStamp */
$routingKeyStamp = $envelope->last(AmqpRoutingKeyStamp::class);
$routingKey = $routingKeyStamp ? $routingKeyStamp->getRoutingKey() : null;
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay, $routingKey);
} catch (\AMQPException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}

View File

@ -35,7 +35,7 @@ class Connection
private $connectionConfiguration;
private $exchangeConfiguration;
private $queueConfiguration;
private $queuesConfiguration;
private $amqpFactory;
/**
@ -49,9 +49,9 @@ class Connection
private $amqpExchange;
/**
* @var \AMQPQueue|null
* @var \AMQPQueue[]|null
*/
private $amqpQueue;
private $amqpQueues = [];
/**
* @var \AMQPExchange|null
@ -68,14 +68,14 @@ class Connection
* * vhost: Virtual Host to use with the AMQP service
* * user: Username to use to connect the the AMQP service
* * password: Password to use the connect to the AMQP service
* * queue:
* * name: Name of the queue
* * routing_key: The routing key (if any) to use to push the messages to
* * queues[name]: An array of queues, keyed by the name
* * binding_keys: The binding keys (if any) to bind to this queue
* * flags: Queue flags (Default: AMQP_DURABLE)
* * arguments: Extra arguments
* * exchange:
* * name: Name of the exchange
* * type: Type of exchange (Default: fanout)
* * default_publish_routing_key: Routing key to use when publishing, if none is specified on the message
* * flags: Exchange flags (Default: AMQP_DURABLE)
* * arguments: Extra arguments
* * delay:
@ -86,7 +86,7 @@ class Connection
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
* * prefetch_count: set channel prefetch count
*/
public function __construct(array $connectionConfiguration, array $exchangeConfiguration, array $queueConfiguration, AmqpFactory $amqpFactory = null)
public function __construct(array $connectionConfiguration, array $exchangeConfiguration, array $queuesConfiguration, AmqpFactory $amqpFactory = null)
{
$this->connectionConfiguration = array_replace_recursive([
'delay' => [
@ -96,7 +96,7 @@ class Connection
],
], $connectionConfiguration);
$this->exchangeConfiguration = $exchangeConfiguration;
$this->queueConfiguration = $queueConfiguration;
$this->queuesConfiguration = $queuesConfiguration;
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
}
@ -107,17 +107,17 @@ class Connection
}
$pathParts = isset($parsedUrl['path']) ? explode('/', trim($parsedUrl['path'], '/')) : [];
$exchangeName = $pathParts[1] ?? 'messages';
parse_str($parsedUrl['query'] ?? '', $parsedQuery);
$amqpOptions = array_replace_recursive([
'host' => $parsedUrl['host'] ?? 'localhost',
'port' => $parsedUrl['port'] ?? 5672,
'vhost' => isset($pathParts[0]) ? urldecode($pathParts[0]) : '/',
'queue' => [
'name' => $queueName = $pathParts[1] ?? 'messages',
],
'exchange' => [
'name' => $queueName,
'name' => $exchangeName,
],
], $options);
], $options, $parsedQuery);
if (isset($parsedUrl['user'])) {
$amqpOptions['login'] = $parsedUrl['user'];
@ -127,21 +127,26 @@ class Connection
$amqpOptions['password'] = $parsedUrl['pass'];
}
if (isset($parsedUrl['query'])) {
parse_str($parsedUrl['query'], $parsedQuery);
$amqpOptions = array_replace_recursive($amqpOptions, $parsedQuery);
if (!isset($amqpOptions['queues'])) {
$amqpOptions['queues'][$exchangeName] = [];
}
$exchangeOptions = $amqpOptions['exchange'];
$queueOptions = $amqpOptions['queue'];
unset($amqpOptions['queue'], $amqpOptions['exchange']);
$queuesOptions = $amqpOptions['queues'];
unset($amqpOptions['queues'], $amqpOptions['exchange']);
if (\is_array($queueOptions['arguments'] ?? false)) {
$queueOptions['arguments'] = self::normalizeQueueArguments($queueOptions['arguments']);
}
$queuesOptions = array_map(function ($queueOptions) {
if (!\is_array($queueOptions)) {
$queueOptions = [];
}
if (\is_array($queueOptions['arguments'] ?? false)) {
$queueOptions['arguments'] = self::normalizeQueueArguments($queueOptions['arguments']);
}
return new self($amqpOptions, $exchangeOptions, $queueOptions, $amqpFactory);
return $queueOptions;
}, $queuesOptions);
return new self($amqpOptions, $exchangeOptions, $queuesOptions, $amqpFactory);
}
private static function normalizeQueueArguments(array $arguments): array
@ -166,10 +171,10 @@ class Connection
*
* @throws \AMQPException
*/
public function publish(string $body, array $headers = [], int $delay = 0): void
public function publish(string $body, array $headers = [], int $delay = 0, string $routingKey = null): void
{
if (0 !== $delay) {
$this->publishWithDelay($body, $headers, $delay);
$this->publishWithDelay($body, $headers, $delay, $routingKey);
return;
}
@ -178,37 +183,50 @@ class Connection
$this->setup();
}
$flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM;
$attributes = $this->getAttributes($headers);
// TODO - allow flag & attributes to be configured on the message
$this->exchange()->publish($body, $this->queueConfiguration['routing_key'] ?? null, $flags, $attributes);
$this->exchange()->publish(
$body,
$routingKey ?? $this->getDefaultPublishRoutingKey(),
AMQP_NOPARAM,
[
'headers' => $headers,
]
);
}
/**
* Returns an approximate count of the messages in a queue.
* Returns an approximate count of the messages in defined queues.
*/
public function countMessagesInQueue(): int
public function countMessagesInQueues(): int
{
return $this->queue()->declareQueue();
return array_sum(array_map(function ($queueName) {
return $this->queue($queueName)->declareQueue();
}, $this->getQueueNames()));
}
/**
* @throws \AMQPException
*/
private function publishWithDelay(string $body, array $headers = [], int $delay)
private function publishWithDelay(string $body, array $headers, int $delay, ?string $exchangeRoutingKey)
{
if ($this->shouldSetup()) {
$this->setupDelay($delay);
$this->setupDelay($delay, $exchangeRoutingKey);
}
$routingKey = $this->getRoutingKeyForDelay($delay);
$flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM;
$attributes = $this->getAttributes($headers);
// TODO - allow flag & attributes to be configured on the message
$this->getDelayExchange()->publish($body, $routingKey, $flags, $attributes);
$this->getDelayExchange()->publish(
$body,
$this->getRoutingKeyForDelay($delay),
AMQP_NOPARAM,
[
'headers' => $headers,
]
);
}
private function setupDelay(int $delay)
private function setupDelay(int $delay, ?string $routingKey)
{
if (!$this->channel()->isConnected()) {
$this->clear();
@ -217,7 +235,7 @@ class Connection
$exchange = $this->getDelayExchange();
$exchange->declareExchange();
$queue = $this->createDelayQueue($delay);
$queue = $this->createDelayQueue($delay, $routingKey);
$queue->declareQueue();
$queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay));
}
@ -242,7 +260,7 @@ class Connection
* which is the original exchange, resulting on it being put back into
* the original queue.
*/
private function createDelayQueue(int $delay)
private function createDelayQueue(int $delay, ?string $routingKey)
{
$delayConfiguration = $this->connectionConfiguration['delay'];
@ -253,9 +271,10 @@ class Connection
'x-dead-letter-exchange' => $this->exchange()->getName(),
]);
if (isset($this->queueConfiguration['routing_key'])) {
$routingKey = $routingKey ?? $this->getDefaultPublishRoutingKey();
if (null !== $routingKey) {
// after being released from to DLX, this routing key will be used
$queue->setArgument('x-dead-letter-routing-key', $this->queueConfiguration['routing_key']);
$queue->setArgument('x-dead-letter-routing-key', $routingKey);
}
return $queue;
@ -267,18 +286,18 @@ class Connection
}
/**
* Waits and gets a message from the configured queue.
* Gets a message from the specified queue.
*
* @throws \AMQPException
*/
public function get(): ?\AMQPEnvelope
public function get(string $queueName): ?\AMQPEnvelope
{
if ($this->shouldSetup()) {
$this->setup();
}
try {
if (false !== $message = $this->queue()->get()) {
if (false !== $message = $this->queue($queueName)->get()) {
return $message;
}
} catch (\AMQPQueueException $e) {
@ -295,14 +314,14 @@ class Connection
return null;
}
public function ack(\AMQPEnvelope $message): bool
public function ack(\AMQPEnvelope $message, string $queueName): bool
{
return $this->queue()->ack($message->getDeliveryTag());
return $this->queue($queueName)->ack($message->getDeliveryTag());
}
public function nack(\AMQPEnvelope $message, int $flags = AMQP_NOPARAM): bool
public function nack(\AMQPEnvelope $message, string $queueName, int $flags = AMQP_NOPARAM): bool
{
return $this->queue()->nack($message->getDeliveryTag(), $flags);
return $this->queue($queueName)->nack($message->getDeliveryTag(), $flags);
}
public function setup(): void
@ -313,10 +332,25 @@ class Connection
$this->exchange()->declareExchange();
$this->queue()->declareQueue();
$this->queue()->bind($this->exchange()->getName(), $this->queueConfiguration['routing_key'] ?? null);
foreach ($this->queuesConfiguration as $queueName => $queueConfig) {
$this->queue($queueName)->declareQueue();
foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) {
$this->queue($queueName)->bind($this->exchange()->getName(), $bindingKey);
}
}
}
/**
* @return string[]
*/
public function getQueueNames(): array
{
return array_keys($this->queuesConfiguration);
}
/**
* @internal
*/
public function channel(): \AMQPChannel
{
if (null === $this->amqpChannel) {
@ -341,22 +375,26 @@ class Connection
return $this->amqpChannel;
}
public function queue(): \AMQPQueue
private function queue(string $queueName): \AMQPQueue
{
if (null === $this->amqpQueue) {
$this->amqpQueue = $this->amqpFactory->createQueue($this->channel());
$this->amqpQueue->setName($this->queueConfiguration['name']);
$this->amqpQueue->setFlags($this->queueConfiguration['flags'] ?? AMQP_DURABLE);
if (!isset($this->amqpQueues[$queueName])) {
$queueConfig = $this->queuesConfiguration[$queueName];
if (isset($this->queueConfiguration['arguments'])) {
$this->amqpQueue->setArguments($this->queueConfiguration['arguments']);
$amqpQueue = $this->amqpFactory->createQueue($this->channel());
$amqpQueue->setName($queueName);
$amqpQueue->setFlags($queueConfig['flags'] ?? AMQP_DURABLE);
if (isset($queueConfig['arguments'])) {
$amqpQueue->setArguments($queueConfig['arguments']);
}
$this->amqpQueues[$queueName] = $amqpQueue;
}
return $this->amqpQueue;
return $this->amqpQueues[$queueName];
}
public function exchange(): \AMQPExchange
private function exchange(): \AMQPExchange
{
if (null === $this->amqpExchange) {
$this->amqpExchange = $this->amqpFactory->createExchange($this->channel());
@ -380,7 +418,7 @@ class Connection
private function clear(): void
{
$this->amqpChannel = null;
$this->amqpQueue = null;
$this->amqpQueues = [];
$this->amqpExchange = null;
}
@ -397,8 +435,15 @@ class Connection
return true;
}
private function getAttributes(array $headers): array
private function getDefaultPublishRoutingKey(): ?string
{
return array_merge_recursive($this->queueConfiguration['attributes'] ?? [], ['headers' => $headers]);
return $this->exchangeConfiguration['default_publish_routing_key'] ?? null;
}
public function purgeQueues()
{
foreach ($this->getQueueNames() as $queueName) {
$this->queue($queueName)->purge();
}
}
}