[messenger] AMQP configurable routing key & multiple queues
This commit is contained in:
parent
45fd75ea20
commit
3151b54b7a
@ -18,6 +18,9 @@ CHANGELOG
|
||||
changed: a required 3rd `SerializerInterface` argument was added.
|
||||
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
|
||||
explicitly handle messages synchronously.
|
||||
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
|
||||
* Removed publishing with a `routing_key` option from queue configuration, for
|
||||
AMQP. Use exchange `default_publish_routing_key` or `AmqpRoutingKeyStamp` instead.
|
||||
* Added optional parameter `prefetch_count` in connection configuration,
|
||||
to setup channel prefetch count.
|
||||
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
|
||||
@ -71,6 +74,8 @@ CHANGELOG
|
||||
only. Pass the `auto_setup` connection option to control this.
|
||||
* Added a `SetupTransportsCommand` command to setup the transports
|
||||
* Added a Doctrine transport. For example, use the `doctrine://default` DSN (this uses the `default` Doctrine entity manager)
|
||||
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
|
||||
* Deprecated publishing with a routing key from queue configuration, use exchange configuration instead.
|
||||
|
||||
4.2.0
|
||||
-----
|
||||
|
@ -49,7 +49,7 @@ class AmqpExtIntegrationTest extends TestCase
|
||||
|
||||
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
|
||||
$connection->setup();
|
||||
$connection->queue()->purge();
|
||||
$connection->purgeQueues();
|
||||
|
||||
$sender = new AmqpSender($connection, $serializer);
|
||||
$receiver = new AmqpReceiver($connection, $serializer);
|
||||
@ -79,7 +79,7 @@ class AmqpExtIntegrationTest extends TestCase
|
||||
|
||||
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
|
||||
$connection->setup();
|
||||
$connection->queue()->purge();
|
||||
$connection->purgeQueues();
|
||||
|
||||
$sender = new AmqpSender($connection, $serializer);
|
||||
$receiver = new AmqpReceiver($connection, $serializer);
|
||||
@ -126,7 +126,7 @@ class AmqpExtIntegrationTest extends TestCase
|
||||
|
||||
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
|
||||
$connection->setup();
|
||||
$connection->queue()->purge();
|
||||
$connection->purgeQueues();
|
||||
|
||||
$sender = new AmqpSender($connection, $serializer);
|
||||
$sender->send(new Envelope(new DummyMessage('Hello')));
|
||||
@ -173,7 +173,7 @@ TXT
|
||||
|
||||
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
|
||||
$connection->setup();
|
||||
$connection->queue()->purge();
|
||||
$connection->purgeQueues();
|
||||
|
||||
$sender = new AmqpSender($connection, $serializer);
|
||||
|
||||
@ -182,7 +182,7 @@ TXT
|
||||
$sender->send(new Envelope(new DummyMessage('Third')));
|
||||
|
||||
sleep(1); // give amqp a moment to have the messages ready
|
||||
$this->assertSame(3, $connection->countMessagesInQueue());
|
||||
$this->assertSame(3, $connection->countMessagesInQueues());
|
||||
}
|
||||
|
||||
private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
|
||||
|
@ -0,0 +1,31 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
|
||||
|
||||
/**
|
||||
* @requires extension amqp
|
||||
*/
|
||||
class AmqpReceivedStampTest extends TestCase
|
||||
{
|
||||
public function testStamp()
|
||||
{
|
||||
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
|
||||
|
||||
$stamp = new AmqpReceivedStamp($amqpEnvelope, 'queueName');
|
||||
|
||||
$this->assertSame($amqpEnvelope, $stamp->getAmqpEnvelope());
|
||||
$this->assertSame('queueName', $stamp->getQueueName());
|
||||
}
|
||||
}
|
@ -36,7 +36,8 @@ class AmqpReceiverTest extends TestCase
|
||||
|
||||
$amqpEnvelope = $this->createAMQPEnvelope();
|
||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||
$connection->method('get')->willReturn($amqpEnvelope);
|
||||
$connection->method('getQueueNames')->willReturn(['queueName']);
|
||||
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
|
||||
|
||||
$receiver = new AmqpReceiver($connection, $serializer);
|
||||
$actualEnvelopes = iterator_to_array($receiver->get());
|
||||
@ -52,11 +53,12 @@ class AmqpReceiverTest extends TestCase
|
||||
$serializer = $this->createMock(SerializerInterface::class);
|
||||
$amqpEnvelope = $this->createAMQPEnvelope();
|
||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||
$connection->method('get')->willReturn($amqpEnvelope);
|
||||
$connection->method('ack')->with($amqpEnvelope)->willThrowException(new \AMQPException());
|
||||
$connection->method('getQueueNames')->willReturn(['queueName']);
|
||||
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
|
||||
$connection->method('ack')->with($amqpEnvelope, 'queueName')->willThrowException(new \AMQPException());
|
||||
|
||||
$receiver = new AmqpReceiver($connection, $serializer);
|
||||
$receiver->ack(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope)]));
|
||||
$receiver->ack(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope, 'queueName')]));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -67,11 +69,12 @@ class AmqpReceiverTest extends TestCase
|
||||
$serializer = $this->createMock(SerializerInterface::class);
|
||||
$amqpEnvelope = $this->createAMQPEnvelope();
|
||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||
$connection->method('get')->willReturn($amqpEnvelope);
|
||||
$connection->method('nack')->with($amqpEnvelope, AMQP_NOPARAM)->willThrowException(new \AMQPException());
|
||||
$connection->method('getQueueNames')->willReturn(['queueName']);
|
||||
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
|
||||
$connection->method('nack')->with($amqpEnvelope, 'queueName', AMQP_NOPARAM)->willThrowException(new \AMQPException());
|
||||
|
||||
$receiver = new AmqpReceiver($connection, $serializer);
|
||||
$receiver->reject(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope)]));
|
||||
$receiver->reject(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope, 'queueName')]));
|
||||
}
|
||||
|
||||
private function createAMQPEnvelope()
|
||||
|
@ -0,0 +1,24 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;
|
||||
|
||||
class AmqpRoutingKeyStampTest extends TestCase
|
||||
{
|
||||
public function testStamp()
|
||||
{
|
||||
$stamp = new AmqpRoutingKeyStamp('routing_key');
|
||||
$this->assertSame('routing_key', $stamp->getRoutingKey());
|
||||
}
|
||||
}
|
@ -14,6 +14,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
@ -38,6 +39,21 @@ class AmqpSenderTest extends TestCase
|
||||
$sender->send($envelope);
|
||||
}
|
||||
|
||||
public function testItSendsTheEncodedMessageUsingARoutingKey()
|
||||
{
|
||||
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new AmqpRoutingKeyStamp('rk'));
|
||||
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
|
||||
|
||||
$serializer = $this->createMock(SerializerInterface::class);
|
||||
$serializer->method('encode')->with($envelope)->willReturn($encoded);
|
||||
|
||||
$connection = $this->createMock(Connection::class);
|
||||
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, 'rk');
|
||||
|
||||
$sender = new AmqpSender($connection, $serializer);
|
||||
$sender->send($envelope);
|
||||
}
|
||||
|
||||
public function testItSendsTheEncodedMessageWithoutHeaders()
|
||||
{
|
||||
$envelope = new Envelope(new DummyMessage('Oy'));
|
||||
|
@ -45,7 +45,8 @@ class AmqpTransportTest extends TestCase
|
||||
$amqpEnvelope->method('getHeaders')->willReturn(['my' => 'header']);
|
||||
|
||||
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
|
||||
$connection->method('get')->willReturn($amqpEnvelope);
|
||||
$connection->method('getQueueNames')->willReturn(['queueName']);
|
||||
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
|
||||
|
||||
$envelopes = iterator_to_array($transport->get());
|
||||
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
|
||||
|
@ -40,7 +40,7 @@ class ConnectionTest extends TestCase
|
||||
], [
|
||||
'name' => 'messages',
|
||||
], [
|
||||
'name' => 'messages',
|
||||
'messages' => [],
|
||||
]),
|
||||
Connection::fromDsn('amqp://localhost/%2f/messages')
|
||||
);
|
||||
@ -58,9 +58,9 @@ class ConnectionTest extends TestCase
|
||||
], [
|
||||
'name' => 'exchangeName',
|
||||
], [
|
||||
'name' => 'queue',
|
||||
'queueName' => [],
|
||||
]),
|
||||
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName')
|
||||
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queues[queueName]')
|
||||
);
|
||||
}
|
||||
|
||||
@ -77,9 +77,9 @@ class ConnectionTest extends TestCase
|
||||
], [
|
||||
'name' => 'exchangeName',
|
||||
], [
|
||||
'name' => 'queueName',
|
||||
'queueName' => [],
|
||||
]),
|
||||
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queue[name]=queueName', [
|
||||
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queues[queueName]', [
|
||||
'persistent' => 'true',
|
||||
'exchange' => ['name' => 'toBeOverwritten'],
|
||||
])
|
||||
@ -89,10 +89,10 @@ class ConnectionTest extends TestCase
|
||||
public function testSetsParametersOnTheQueueAndExchange()
|
||||
{
|
||||
$factory = new TestAmqpFactory(
|
||||
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
|
||||
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(),
|
||||
$amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(),
|
||||
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
|
||||
$amqpConnection = $this->createMock(\AMQPConnection::class),
|
||||
$amqpChannel = $this->createMock(\AMQPChannel::class),
|
||||
$amqpQueue = $this->createMock(\AMQPQueue::class),
|
||||
$amqpExchange = $this->createMock(\AMQPExchange::class)
|
||||
);
|
||||
|
||||
$amqpQueue->expects($this->once())->method('setArguments')->with([
|
||||
@ -110,17 +110,19 @@ class ConnectionTest extends TestCase
|
||||
]);
|
||||
|
||||
$dsn = 'amqp://localhost/%2f/messages?'.
|
||||
'queue[arguments][x-dead-letter-exchange]=dead-exchange&'.
|
||||
'queue[arguments][x-message-ttl]=100&'.
|
||||
'queue[arguments][x-delay]=100&'.
|
||||
'queue[arguments][x-expires]=150&'
|
||||
'queues[messages][arguments][x-dead-letter-exchange]=dead-exchange&'.
|
||||
'queues[messages][arguments][x-message-ttl]=100&'.
|
||||
'queues[messages][arguments][x-delay]=100&'.
|
||||
'queues[messages][arguments][x-expires]=150&'
|
||||
;
|
||||
$connection = Connection::fromDsn($dsn, [
|
||||
'queue' => [
|
||||
'arguments' => [
|
||||
'x-max-length' => '200',
|
||||
'x-max-length-bytes' => '300',
|
||||
'x-max-priority' => '4',
|
||||
'queues' => [
|
||||
'messages' => [
|
||||
'arguments' => [
|
||||
'x-max-length' => '200',
|
||||
'x-max-length-bytes' => '300',
|
||||
'x-max-priority' => '4',
|
||||
],
|
||||
],
|
||||
],
|
||||
'exchange' => [
|
||||
@ -135,20 +137,23 @@ class ConnectionTest extends TestCase
|
||||
public function invalidQueueArgumentsDataProvider(): iterable
|
||||
{
|
||||
$baseDsn = 'amqp://localhost/%2f/messages';
|
||||
yield [$baseDsn.'?queue[arguments][x-delay]=not-a-number', []];
|
||||
yield [$baseDsn.'?queue[arguments][x-expires]=not-a-number', []];
|
||||
yield [$baseDsn.'?queue[arguments][x-max-length]=not-a-number', []];
|
||||
yield [$baseDsn.'?queue[arguments][x-max-length-bytes]=not-a-number', []];
|
||||
yield [$baseDsn.'?queue[arguments][x-max-priority]=not-a-number', []];
|
||||
yield [$baseDsn.'?queue[arguments][x-message-ttl]=not-a-number', []];
|
||||
|
||||
// Ensure the exception is thrown when the arguments are passed via the array options
|
||||
yield [$baseDsn, ['queue' => ['arguments' => ['x-delay' => 'not-a-number']]]];
|
||||
yield [$baseDsn, ['queue' => ['arguments' => ['x-expires' => 'not-a-number']]]];
|
||||
yield [$baseDsn, ['queue' => ['arguments' => ['x-max-length' => 'not-a-number']]]];
|
||||
yield [$baseDsn, ['queue' => ['arguments' => ['x-max-length-bytes' => 'not-a-number']]]];
|
||||
yield [$baseDsn, ['queue' => ['arguments' => ['x-max-priority' => 'not-a-number']]]];
|
||||
yield [$baseDsn, ['queue' => ['arguments' => ['x-message-ttl' => 'not-a-number']]]];
|
||||
return [
|
||||
[$baseDsn.'?queues[messages][arguments][x-delay]=not-a-number', []],
|
||||
[$baseDsn.'?queues[messages][arguments][x-expires]=not-a-number', []],
|
||||
[$baseDsn.'?queues[messages][arguments][x-max-length]=not-a-number', []],
|
||||
[$baseDsn.'?queues[messages][arguments][x-max-length-bytes]=not-a-number', []],
|
||||
[$baseDsn.'?queues[messages][arguments][x-max-priority]=not-a-number', []],
|
||||
[$baseDsn.'?queues[messages][arguments][x-message-ttl]=not-a-number', []],
|
||||
|
||||
// Ensure the exception is thrown when the arguments are passed via the array options
|
||||
[$baseDsn, ['queues' => ['messages' => ['arguments' => ['x-delay' => 'not-a-number']]]]],
|
||||
[$baseDsn, ['queues' => ['messages' => ['arguments' => ['x-expires' => 'not-a-number']]]]],
|
||||
[$baseDsn, ['queues' => ['messages' => ['arguments' => ['x-max-length' => 'not-a-number']]]]],
|
||||
[$baseDsn, ['queues' => ['messages' => ['arguments' => ['x-max-length-bytes' => 'not-a-number']]]]],
|
||||
[$baseDsn, ['queues' => ['messages' => ['arguments' => ['x-max-priority' => 'not-a-number']]]]],
|
||||
[$baseDsn, ['queues' => ['messages' => ['arguments' => ['x-message-ttl' => 'not-a-number']]]]],
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
@ -165,10 +170,10 @@ class ConnectionTest extends TestCase
|
||||
public function testItUsesANormalConnectionByDefault()
|
||||
{
|
||||
$factory = new TestAmqpFactory(
|
||||
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
|
||||
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(),
|
||||
$amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(),
|
||||
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
|
||||
$amqpConnection = $this->createMock(\AMQPConnection::class),
|
||||
$amqpChannel = $this->createMock(\AMQPChannel::class),
|
||||
$amqpQueue = $this->createMock(\AMQPQueue::class),
|
||||
$amqpExchange = $this->createMock(\AMQPExchange::class)
|
||||
);
|
||||
|
||||
// makes sure the channel looks connected, so it's not re-created
|
||||
@ -182,10 +187,10 @@ class ConnectionTest extends TestCase
|
||||
public function testItAllowsToUseAPersistentConnection()
|
||||
{
|
||||
$factory = new TestAmqpFactory(
|
||||
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
|
||||
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(),
|
||||
$amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(),
|
||||
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
|
||||
$amqpConnection = $this->createMock(\AMQPConnection::class),
|
||||
$amqpChannel = $this->createMock(\AMQPChannel::class),
|
||||
$amqpQueue = $this->createMock(\AMQPQueue::class),
|
||||
$amqpExchange = $this->createMock(\AMQPExchange::class)
|
||||
);
|
||||
|
||||
// makes sure the channel looks connected, so it's not re-created
|
||||
@ -196,49 +201,7 @@ class ConnectionTest extends TestCase
|
||||
$connection->publish('body');
|
||||
}
|
||||
|
||||
public function testItSetupsTheConnectionByDefault()
|
||||
{
|
||||
$factory = new TestAmqpFactory(
|
||||
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
|
||||
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(),
|
||||
$amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(),
|
||||
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
|
||||
);
|
||||
|
||||
$amqpExchange->method('getName')->willReturn('exchange_name');
|
||||
$amqpExchange->expects($this->once())->method('declareExchange');
|
||||
$amqpQueue->expects($this->once())->method('declareQueue');
|
||||
$amqpQueue->expects($this->once())->method('bind')->with('exchange_name', 'my_key');
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', [], $factory);
|
||||
$connection->publish('body');
|
||||
}
|
||||
|
||||
public function testItCanDisableTheSetup()
|
||||
{
|
||||
$factory = new TestAmqpFactory(
|
||||
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
|
||||
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(),
|
||||
$amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(),
|
||||
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
|
||||
);
|
||||
|
||||
$amqpExchange->method('getName')->willReturn('exchange_name');
|
||||
$amqpExchange->expects($this->never())->method('declareExchange');
|
||||
$amqpQueue->expects($this->never())->method('declareQueue');
|
||||
$amqpQueue->expects($this->never())->method('bind');
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto_setup' => 'false'], $factory);
|
||||
$connection->publish('body');
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto_setup' => false], $factory);
|
||||
$connection->publish('body');
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key&auto_setup=false', [], $factory);
|
||||
$connection->publish('body');
|
||||
}
|
||||
|
||||
public function testPublishWithQueueOptions()
|
||||
public function testItSetupsTheConnectionWithDefaults()
|
||||
{
|
||||
$factory = new TestAmqpFactory(
|
||||
$amqpConnection = $this->createMock(\AMQPConnection::class),
|
||||
@ -246,14 +209,78 @@ class ConnectionTest extends TestCase
|
||||
$amqpQueue = $this->createMock(\AMQPQueue::class),
|
||||
$amqpExchange = $this->createMock(\AMQPExchange::class)
|
||||
);
|
||||
$headers = [
|
||||
'type' => '*',
|
||||
];
|
||||
$amqpExchange->expects($this->once())->method('publish')
|
||||
->with('body', null, 1, ['delivery_mode' => 2, 'headers' => ['token' => 'uuid', 'type' => '*']]);
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[attributes][delivery_mode]=2&queue[attributes][headers][token]=uuid&queue[flags]=1', [], $factory);
|
||||
$connection->publish('body', $headers);
|
||||
$amqpExchange->method('getName')->willReturn('exchange_name');
|
||||
$amqpExchange->expects($this->once())->method('declareExchange');
|
||||
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => []]);
|
||||
$amqpQueue->expects($this->once())->method('declareQueue');
|
||||
$amqpQueue->expects($this->once())->method('bind')->with('exchange_name', null);
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
|
||||
$connection->publish('body');
|
||||
}
|
||||
|
||||
public function testItSetupsTheConnection()
|
||||
{
|
||||
$amqpConnection = $this->createMock(\AMQPConnection::class);
|
||||
$amqpChannel = $this->createMock(\AMQPChannel::class);
|
||||
$amqpExchange = $this->createMock(\AMQPExchange::class);
|
||||
$amqpQueue0 = $this->createMock(\AMQPQueue::class);
|
||||
$amqpQueue1 = $this->createMock(\AMQPQueue::class);
|
||||
|
||||
$factory = $this->createMock(AmqpFactory::class);
|
||||
$factory->method('createConnection')->willReturn($amqpConnection);
|
||||
$factory->method('createChannel')->willReturn($amqpChannel);
|
||||
$factory->method('createExchange')->willReturn($amqpExchange);
|
||||
$factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1));
|
||||
|
||||
$amqpExchange->method('getName')->willReturn('exchange_name');
|
||||
$amqpExchange->expects($this->once())->method('declareExchange');
|
||||
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => []]);
|
||||
$amqpQueue0->expects($this->once())->method('declareQueue');
|
||||
$amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive(
|
||||
['exchange_name', 'binding_key0'],
|
||||
['exchange_name', 'binding_key1']
|
||||
);
|
||||
$amqpQueue1->expects($this->once())->method('declareQueue');
|
||||
$amqpQueue1->expects($this->exactly(2))->method('bind')->withConsecutive(
|
||||
['exchange_name', 'binding_key2'],
|
||||
['exchange_name', 'binding_key3']
|
||||
);
|
||||
|
||||
$dsn = 'amqp://localhost/%2f/messages?'.
|
||||
'exchange[default_publish_routing_key]=routing_key&'.
|
||||
'queues[queue0][binding_keys][0]=binding_key0&'.
|
||||
'queues[queue0][binding_keys][1]=binding_key1&'.
|
||||
'queues[queue1][binding_keys][0]=binding_key2&'.
|
||||
'queues[queue1][binding_keys][1]=binding_key3';
|
||||
|
||||
$connection = Connection::fromDsn($dsn, [], $factory);
|
||||
$connection->publish('body');
|
||||
}
|
||||
|
||||
public function testItCanDisableTheSetup()
|
||||
{
|
||||
$factory = new TestAmqpFactory(
|
||||
$amqpConnection = $this->createMock(\AMQPConnection::class),
|
||||
$amqpChannel = $this->createMock(\AMQPChannel::class),
|
||||
$amqpQueue = $this->createMock(\AMQPQueue::class),
|
||||
$amqpExchange = $this->createMock(\AMQPExchange::class)
|
||||
);
|
||||
|
||||
$amqpExchange->method('getName')->willReturn('exchange_name');
|
||||
$amqpExchange->expects($this->never())->method('declareExchange');
|
||||
$amqpQueue->expects($this->never())->method('declareQueue');
|
||||
$amqpQueue->expects($this->never())->method('bind');
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['auto_setup' => 'false'], $factory);
|
||||
$connection->publish('body');
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['auto_setup' => false], $factory);
|
||||
$connection->publish('body');
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?auto_setup=false', [], $factory);
|
||||
$connection->publish('body');
|
||||
}
|
||||
|
||||
public function testSetChannelPrefetchWhenSetup()
|
||||
@ -277,11 +304,11 @@ class ConnectionTest extends TestCase
|
||||
|
||||
public function testItDelaysTheMessage()
|
||||
{
|
||||
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock();
|
||||
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock();
|
||||
$delayQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock();
|
||||
$amqpConnection = $this->createMock(\AMQPConnection::class);
|
||||
$amqpChannel = $this->createMock(\AMQPChannel::class);
|
||||
$delayQueue = $this->createMock(\AMQPQueue::class);
|
||||
|
||||
$factory = $this->getMockBuilder(AmqpFactory::class)->getMock();
|
||||
$factory = $this->createMock(AmqpFactory::class);
|
||||
$factory->method('createConnection')->willReturn($amqpConnection);
|
||||
$factory->method('createChannel')->willReturn($amqpChannel);
|
||||
$factory->method('createQueue')->willReturn($delayQueue);
|
||||
@ -314,11 +341,11 @@ class ConnectionTest extends TestCase
|
||||
|
||||
public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
|
||||
{
|
||||
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock();
|
||||
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock();
|
||||
$delayQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock();
|
||||
$amqpConnection = $this->createMock(\AMQPConnection::class);
|
||||
$amqpChannel = $this->createMock(\AMQPChannel::class);
|
||||
$delayQueue = $this->createMock(\AMQPQueue::class);
|
||||
|
||||
$factory = $this->getMockBuilder(AmqpFactory::class)->getMock();
|
||||
$factory = $this->createMock(AmqpFactory::class);
|
||||
$factory->method('createConnection')->willReturn($amqpConnection);
|
||||
$factory->method('createChannel')->willReturn($amqpChannel);
|
||||
$factory->method('createQueue')->willReturn($delayQueue);
|
||||
@ -375,6 +402,83 @@ class ConnectionTest extends TestCase
|
||||
$connection = Connection::fromDsn('amqp://user:secretpassword@localhost/%2f/messages', [], $factory);
|
||||
$connection->channel();
|
||||
}
|
||||
|
||||
public function testItCanPublishWithTheDefaultRoutingKey()
|
||||
{
|
||||
$factory = new TestAmqpFactory(
|
||||
$amqpConnection = $this->createMock(\AMQPConnection::class),
|
||||
$amqpChannel = $this->createMock(\AMQPChannel::class),
|
||||
$amqpQueue = $this->createMock(\AMQPQueue::class),
|
||||
$amqpExchange = $this->createMock(\AMQPExchange::class)
|
||||
);
|
||||
|
||||
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key');
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=routing_key', [], $factory);
|
||||
$connection->publish('body');
|
||||
}
|
||||
|
||||
public function testItCanPublishWithASuppliedRoutingKey()
|
||||
{
|
||||
$factory = new TestAmqpFactory(
|
||||
$amqpConnection = $this->createMock(\AMQPConnection::class),
|
||||
$amqpChannel = $this->createMock(\AMQPChannel::class),
|
||||
$amqpQueue = $this->createMock(\AMQPQueue::class),
|
||||
$amqpExchange = $this->createMock(\AMQPExchange::class)
|
||||
);
|
||||
|
||||
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key');
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=default_routing_key', [], $factory);
|
||||
$connection->publish('body', [], 0, 'routing_key');
|
||||
}
|
||||
|
||||
public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument()
|
||||
{
|
||||
$amqpConnection = $this->createMock(\AMQPConnection::class);
|
||||
$amqpChannel = $this->createMock(\AMQPChannel::class);
|
||||
$delayQueue = $this->createMock(\AMQPQueue::class);
|
||||
|
||||
$factory = $this->createMock(AmqpFactory::class);
|
||||
$factory->method('createConnection')->willReturn($amqpConnection);
|
||||
$factory->method('createChannel')->willReturn($amqpChannel);
|
||||
$factory->method('createQueue')->willReturn($delayQueue);
|
||||
$factory->method('createExchange')->will($this->onConsecutiveCalls(
|
||||
$delayExchange = $this->createMock(\AMQPExchange::class),
|
||||
$amqpExchange = $this->createMock(\AMQPExchange::class)
|
||||
));
|
||||
|
||||
$amqpExchange->expects($this->once())->method('setName')->with('messages');
|
||||
$amqpExchange->method('getName')->willReturn('messages');
|
||||
|
||||
$delayExchange->expects($this->once())->method('setName')->with('delay');
|
||||
$delayExchange->expects($this->once())->method('declareExchange');
|
||||
$delayExchange->method('getName')->willReturn('delay');
|
||||
|
||||
$connectionOptions = [
|
||||
'retry' => [
|
||||
'dead_routing_key' => 'my_dead_routing_key',
|
||||
],
|
||||
];
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, $factory);
|
||||
|
||||
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_120000');
|
||||
$delayQueue->expects($this->once())->method('setArguments')->with([
|
||||
'x-message-ttl' => 120000,
|
||||
'x-dead-letter-exchange' => 'messages',
|
||||
]);
|
||||
$delayQueue->expects($this->once())->method('setArgument')->with(
|
||||
'x-dead-letter-routing-key',
|
||||
'routing_key'
|
||||
);
|
||||
|
||||
$delayQueue->expects($this->once())->method('declareQueue');
|
||||
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_120000');
|
||||
|
||||
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_120000', AMQP_NOPARAM, ['headers' => []]);
|
||||
$connection->publish('{}', [], 120000, 'routing_key');
|
||||
}
|
||||
}
|
||||
|
||||
class TestAmqpFactory extends AmqpFactory
|
||||
|
@ -21,14 +21,21 @@ use Symfony\Component\Messenger\Stamp\StampInterface;
|
||||
class AmqpReceivedStamp implements StampInterface
|
||||
{
|
||||
private $amqpEnvelope;
|
||||
private $queueName;
|
||||
|
||||
public function __construct(\AMQPEnvelope $amqpEnvelope)
|
||||
public function __construct(\AMQPEnvelope $amqpEnvelope, string $queueName)
|
||||
{
|
||||
$this->amqpEnvelope = $amqpEnvelope;
|
||||
$this->queueName = $queueName;
|
||||
}
|
||||
|
||||
public function getAmqpEnvelope(): \AMQPEnvelope
|
||||
{
|
||||
return $this->amqpEnvelope;
|
||||
}
|
||||
|
||||
public function getQueueName(): string
|
||||
{
|
||||
return $this->queueName;
|
||||
}
|
||||
}
|
||||
|
@ -42,9 +42,16 @@ class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function get(): iterable
|
||||
{
|
||||
foreach ($this->connection->getQueueNames() as $queueName) {
|
||||
yield from $this->getEnvelope($queueName);
|
||||
}
|
||||
}
|
||||
|
||||
private function getEnvelope(string $queueName): iterable
|
||||
{
|
||||
try {
|
||||
$amqpEnvelope = $this->connection->get();
|
||||
$amqpEnvelope = $this->connection->get($queueName);
|
||||
} catch (\AMQPException $exception) {
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
@ -60,12 +67,12 @@ class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
|
||||
]);
|
||||
} catch (MessageDecodingFailedException $exception) {
|
||||
// invalid message of some type
|
||||
$this->rejectAmqpEnvelope($amqpEnvelope);
|
||||
$this->rejectAmqpEnvelope($amqpEnvelope, $queueName);
|
||||
|
||||
throw $exception;
|
||||
}
|
||||
|
||||
yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope));
|
||||
yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -74,7 +81,11 @@ class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
|
||||
public function ack(Envelope $envelope): void
|
||||
{
|
||||
try {
|
||||
$this->connection->ack($this->findAmqpEnvelope($envelope));
|
||||
/* @var AmqpReceivedStamp $amqpReceivedStamp */
|
||||
$this->connection->ack(
|
||||
$this->findAmqpEnvelope($envelope, $amqpReceivedStamp),
|
||||
$amqpReceivedStamp->getQueueName()
|
||||
);
|
||||
} catch (\AMQPException $exception) {
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
@ -85,7 +96,11 @@ class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
|
||||
*/
|
||||
public function reject(Envelope $envelope): void
|
||||
{
|
||||
$this->rejectAmqpEnvelope($this->findAmqpEnvelope($envelope));
|
||||
/* @var AmqpReceivedStamp $amqpReceivedStamp */
|
||||
$this->rejectAmqpEnvelope(
|
||||
$this->findAmqpEnvelope($envelope, $amqpReceivedStamp),
|
||||
$amqpReceivedStamp->getQueueName()
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -93,21 +108,20 @@ class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
|
||||
*/
|
||||
public function getMessageCount(): int
|
||||
{
|
||||
return $this->connection->countMessagesInQueue();
|
||||
return $this->connection->countMessagesInQueues();
|
||||
}
|
||||
|
||||
private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope): void
|
||||
private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, string $queueName): void
|
||||
{
|
||||
try {
|
||||
$this->connection->nack($amqpEnvelope, AMQP_NOPARAM);
|
||||
$this->connection->nack($amqpEnvelope, $queueName, AMQP_NOPARAM);
|
||||
} catch (\AMQPException $exception) {
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
}
|
||||
|
||||
private function findAmqpEnvelope(Envelope $envelope): \AMQPEnvelope
|
||||
private function findAmqpEnvelope(Envelope $envelope, AmqpReceivedStamp &$amqpReceivedStamp = null): \AMQPEnvelope
|
||||
{
|
||||
/** @var AmqpReceivedStamp|null $amqpReceivedStamp */
|
||||
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
|
||||
|
||||
if (null === $amqpReceivedStamp) {
|
||||
|
@ -0,0 +1,34 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
||||
|
||||
use Symfony\Component\Messenger\Stamp\StampInterface;
|
||||
|
||||
/**
|
||||
* @author Guillaume Gammelin <ggammelin@gmail.com>
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*/
|
||||
final class AmqpRoutingKeyStamp implements StampInterface
|
||||
{
|
||||
private $routingKey;
|
||||
|
||||
public function __construct(string $routingKey)
|
||||
{
|
||||
$this->routingKey = $routingKey;
|
||||
}
|
||||
|
||||
public function getRoutingKey(): string
|
||||
{
|
||||
return $this->routingKey;
|
||||
}
|
||||
}
|
@ -51,7 +51,11 @@ class AmqpSender implements SenderInterface
|
||||
}
|
||||
|
||||
try {
|
||||
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay);
|
||||
/** @var $routingKeyStamp AmqpRoutingKeyStamp */
|
||||
$routingKeyStamp = $envelope->last(AmqpRoutingKeyStamp::class);
|
||||
$routingKey = $routingKeyStamp ? $routingKeyStamp->getRoutingKey() : null;
|
||||
|
||||
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay, $routingKey);
|
||||
} catch (\AMQPException $e) {
|
||||
throw new TransportException($e->getMessage(), 0, $e);
|
||||
}
|
||||
|
@ -35,7 +35,7 @@ class Connection
|
||||
|
||||
private $connectionConfiguration;
|
||||
private $exchangeConfiguration;
|
||||
private $queueConfiguration;
|
||||
private $queuesConfiguration;
|
||||
private $amqpFactory;
|
||||
|
||||
/**
|
||||
@ -49,9 +49,9 @@ class Connection
|
||||
private $amqpExchange;
|
||||
|
||||
/**
|
||||
* @var \AMQPQueue|null
|
||||
* @var \AMQPQueue[]|null
|
||||
*/
|
||||
private $amqpQueue;
|
||||
private $amqpQueues = [];
|
||||
|
||||
/**
|
||||
* @var \AMQPExchange|null
|
||||
@ -68,14 +68,14 @@ class Connection
|
||||
* * vhost: Virtual Host to use with the AMQP service
|
||||
* * user: Username to use to connect the the AMQP service
|
||||
* * password: Password to use the connect to the AMQP service
|
||||
* * queue:
|
||||
* * name: Name of the queue
|
||||
* * routing_key: The routing key (if any) to use to push the messages to
|
||||
* * queues[name]: An array of queues, keyed by the name
|
||||
* * binding_keys: The binding keys (if any) to bind to this queue
|
||||
* * flags: Queue flags (Default: AMQP_DURABLE)
|
||||
* * arguments: Extra arguments
|
||||
* * exchange:
|
||||
* * name: Name of the exchange
|
||||
* * type: Type of exchange (Default: fanout)
|
||||
* * default_publish_routing_key: Routing key to use when publishing, if none is specified on the message
|
||||
* * flags: Exchange flags (Default: AMQP_DURABLE)
|
||||
* * arguments: Extra arguments
|
||||
* * delay:
|
||||
@ -86,7 +86,7 @@ class Connection
|
||||
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
|
||||
* * prefetch_count: set channel prefetch count
|
||||
*/
|
||||
public function __construct(array $connectionConfiguration, array $exchangeConfiguration, array $queueConfiguration, AmqpFactory $amqpFactory = null)
|
||||
public function __construct(array $connectionConfiguration, array $exchangeConfiguration, array $queuesConfiguration, AmqpFactory $amqpFactory = null)
|
||||
{
|
||||
$this->connectionConfiguration = array_replace_recursive([
|
||||
'delay' => [
|
||||
@ -96,7 +96,7 @@ class Connection
|
||||
],
|
||||
], $connectionConfiguration);
|
||||
$this->exchangeConfiguration = $exchangeConfiguration;
|
||||
$this->queueConfiguration = $queueConfiguration;
|
||||
$this->queuesConfiguration = $queuesConfiguration;
|
||||
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
|
||||
}
|
||||
|
||||
@ -107,17 +107,17 @@ class Connection
|
||||
}
|
||||
|
||||
$pathParts = isset($parsedUrl['path']) ? explode('/', trim($parsedUrl['path'], '/')) : [];
|
||||
$exchangeName = $pathParts[1] ?? 'messages';
|
||||
parse_str($parsedUrl['query'] ?? '', $parsedQuery);
|
||||
|
||||
$amqpOptions = array_replace_recursive([
|
||||
'host' => $parsedUrl['host'] ?? 'localhost',
|
||||
'port' => $parsedUrl['port'] ?? 5672,
|
||||
'vhost' => isset($pathParts[0]) ? urldecode($pathParts[0]) : '/',
|
||||
'queue' => [
|
||||
'name' => $queueName = $pathParts[1] ?? 'messages',
|
||||
],
|
||||
'exchange' => [
|
||||
'name' => $queueName,
|
||||
'name' => $exchangeName,
|
||||
],
|
||||
], $options);
|
||||
], $options, $parsedQuery);
|
||||
|
||||
if (isset($parsedUrl['user'])) {
|
||||
$amqpOptions['login'] = $parsedUrl['user'];
|
||||
@ -127,21 +127,26 @@ class Connection
|
||||
$amqpOptions['password'] = $parsedUrl['pass'];
|
||||
}
|
||||
|
||||
if (isset($parsedUrl['query'])) {
|
||||
parse_str($parsedUrl['query'], $parsedQuery);
|
||||
|
||||
$amqpOptions = array_replace_recursive($amqpOptions, $parsedQuery);
|
||||
if (!isset($amqpOptions['queues'])) {
|
||||
$amqpOptions['queues'][$exchangeName] = [];
|
||||
}
|
||||
|
||||
$exchangeOptions = $amqpOptions['exchange'];
|
||||
$queueOptions = $amqpOptions['queue'];
|
||||
unset($amqpOptions['queue'], $amqpOptions['exchange']);
|
||||
$queuesOptions = $amqpOptions['queues'];
|
||||
unset($amqpOptions['queues'], $amqpOptions['exchange']);
|
||||
|
||||
if (\is_array($queueOptions['arguments'] ?? false)) {
|
||||
$queueOptions['arguments'] = self::normalizeQueueArguments($queueOptions['arguments']);
|
||||
}
|
||||
$queuesOptions = array_map(function ($queueOptions) {
|
||||
if (!\is_array($queueOptions)) {
|
||||
$queueOptions = [];
|
||||
}
|
||||
if (\is_array($queueOptions['arguments'] ?? false)) {
|
||||
$queueOptions['arguments'] = self::normalizeQueueArguments($queueOptions['arguments']);
|
||||
}
|
||||
|
||||
return new self($amqpOptions, $exchangeOptions, $queueOptions, $amqpFactory);
|
||||
return $queueOptions;
|
||||
}, $queuesOptions);
|
||||
|
||||
return new self($amqpOptions, $exchangeOptions, $queuesOptions, $amqpFactory);
|
||||
}
|
||||
|
||||
private static function normalizeQueueArguments(array $arguments): array
|
||||
@ -166,10 +171,10 @@ class Connection
|
||||
*
|
||||
* @throws \AMQPException
|
||||
*/
|
||||
public function publish(string $body, array $headers = [], int $delay = 0): void
|
||||
public function publish(string $body, array $headers = [], int $delay = 0, string $routingKey = null): void
|
||||
{
|
||||
if (0 !== $delay) {
|
||||
$this->publishWithDelay($body, $headers, $delay);
|
||||
$this->publishWithDelay($body, $headers, $delay, $routingKey);
|
||||
|
||||
return;
|
||||
}
|
||||
@ -178,37 +183,50 @@ class Connection
|
||||
$this->setup();
|
||||
}
|
||||
|
||||
$flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM;
|
||||
$attributes = $this->getAttributes($headers);
|
||||
// TODO - allow flag & attributes to be configured on the message
|
||||
|
||||
$this->exchange()->publish($body, $this->queueConfiguration['routing_key'] ?? null, $flags, $attributes);
|
||||
$this->exchange()->publish(
|
||||
$body,
|
||||
$routingKey ?? $this->getDefaultPublishRoutingKey(),
|
||||
AMQP_NOPARAM,
|
||||
[
|
||||
'headers' => $headers,
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an approximate count of the messages in a queue.
|
||||
* Returns an approximate count of the messages in defined queues.
|
||||
*/
|
||||
public function countMessagesInQueue(): int
|
||||
public function countMessagesInQueues(): int
|
||||
{
|
||||
return $this->queue()->declareQueue();
|
||||
return array_sum(array_map(function ($queueName) {
|
||||
return $this->queue($queueName)->declareQueue();
|
||||
}, $this->getQueueNames()));
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws \AMQPException
|
||||
*/
|
||||
private function publishWithDelay(string $body, array $headers = [], int $delay)
|
||||
private function publishWithDelay(string $body, array $headers, int $delay, ?string $exchangeRoutingKey)
|
||||
{
|
||||
if ($this->shouldSetup()) {
|
||||
$this->setupDelay($delay);
|
||||
$this->setupDelay($delay, $exchangeRoutingKey);
|
||||
}
|
||||
|
||||
$routingKey = $this->getRoutingKeyForDelay($delay);
|
||||
$flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM;
|
||||
$attributes = $this->getAttributes($headers);
|
||||
// TODO - allow flag & attributes to be configured on the message
|
||||
|
||||
$this->getDelayExchange()->publish($body, $routingKey, $flags, $attributes);
|
||||
$this->getDelayExchange()->publish(
|
||||
$body,
|
||||
$this->getRoutingKeyForDelay($delay),
|
||||
AMQP_NOPARAM,
|
||||
[
|
||||
'headers' => $headers,
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
private function setupDelay(int $delay)
|
||||
private function setupDelay(int $delay, ?string $routingKey)
|
||||
{
|
||||
if (!$this->channel()->isConnected()) {
|
||||
$this->clear();
|
||||
@ -217,7 +235,7 @@ class Connection
|
||||
$exchange = $this->getDelayExchange();
|
||||
$exchange->declareExchange();
|
||||
|
||||
$queue = $this->createDelayQueue($delay);
|
||||
$queue = $this->createDelayQueue($delay, $routingKey);
|
||||
$queue->declareQueue();
|
||||
$queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay));
|
||||
}
|
||||
@ -242,7 +260,7 @@ class Connection
|
||||
* which is the original exchange, resulting on it being put back into
|
||||
* the original queue.
|
||||
*/
|
||||
private function createDelayQueue(int $delay)
|
||||
private function createDelayQueue(int $delay, ?string $routingKey)
|
||||
{
|
||||
$delayConfiguration = $this->connectionConfiguration['delay'];
|
||||
|
||||
@ -253,9 +271,10 @@ class Connection
|
||||
'x-dead-letter-exchange' => $this->exchange()->getName(),
|
||||
]);
|
||||
|
||||
if (isset($this->queueConfiguration['routing_key'])) {
|
||||
$routingKey = $routingKey ?? $this->getDefaultPublishRoutingKey();
|
||||
if (null !== $routingKey) {
|
||||
// after being released from to DLX, this routing key will be used
|
||||
$queue->setArgument('x-dead-letter-routing-key', $this->queueConfiguration['routing_key']);
|
||||
$queue->setArgument('x-dead-letter-routing-key', $routingKey);
|
||||
}
|
||||
|
||||
return $queue;
|
||||
@ -267,18 +286,18 @@ class Connection
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits and gets a message from the configured queue.
|
||||
* Gets a message from the specified queue.
|
||||
*
|
||||
* @throws \AMQPException
|
||||
*/
|
||||
public function get(): ?\AMQPEnvelope
|
||||
public function get(string $queueName): ?\AMQPEnvelope
|
||||
{
|
||||
if ($this->shouldSetup()) {
|
||||
$this->setup();
|
||||
}
|
||||
|
||||
try {
|
||||
if (false !== $message = $this->queue()->get()) {
|
||||
if (false !== $message = $this->queue($queueName)->get()) {
|
||||
return $message;
|
||||
}
|
||||
} catch (\AMQPQueueException $e) {
|
||||
@ -295,14 +314,14 @@ class Connection
|
||||
return null;
|
||||
}
|
||||
|
||||
public function ack(\AMQPEnvelope $message): bool
|
||||
public function ack(\AMQPEnvelope $message, string $queueName): bool
|
||||
{
|
||||
return $this->queue()->ack($message->getDeliveryTag());
|
||||
return $this->queue($queueName)->ack($message->getDeliveryTag());
|
||||
}
|
||||
|
||||
public function nack(\AMQPEnvelope $message, int $flags = AMQP_NOPARAM): bool
|
||||
public function nack(\AMQPEnvelope $message, string $queueName, int $flags = AMQP_NOPARAM): bool
|
||||
{
|
||||
return $this->queue()->nack($message->getDeliveryTag(), $flags);
|
||||
return $this->queue($queueName)->nack($message->getDeliveryTag(), $flags);
|
||||
}
|
||||
|
||||
public function setup(): void
|
||||
@ -313,10 +332,25 @@ class Connection
|
||||
|
||||
$this->exchange()->declareExchange();
|
||||
|
||||
$this->queue()->declareQueue();
|
||||
$this->queue()->bind($this->exchange()->getName(), $this->queueConfiguration['routing_key'] ?? null);
|
||||
foreach ($this->queuesConfiguration as $queueName => $queueConfig) {
|
||||
$this->queue($queueName)->declareQueue();
|
||||
foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) {
|
||||
$this->queue($queueName)->bind($this->exchange()->getName(), $bindingKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string[]
|
||||
*/
|
||||
public function getQueueNames(): array
|
||||
{
|
||||
return array_keys($this->queuesConfiguration);
|
||||
}
|
||||
|
||||
/**
|
||||
* @internal
|
||||
*/
|
||||
public function channel(): \AMQPChannel
|
||||
{
|
||||
if (null === $this->amqpChannel) {
|
||||
@ -341,22 +375,26 @@ class Connection
|
||||
return $this->amqpChannel;
|
||||
}
|
||||
|
||||
public function queue(): \AMQPQueue
|
||||
private function queue(string $queueName): \AMQPQueue
|
||||
{
|
||||
if (null === $this->amqpQueue) {
|
||||
$this->amqpQueue = $this->amqpFactory->createQueue($this->channel());
|
||||
$this->amqpQueue->setName($this->queueConfiguration['name']);
|
||||
$this->amqpQueue->setFlags($this->queueConfiguration['flags'] ?? AMQP_DURABLE);
|
||||
if (!isset($this->amqpQueues[$queueName])) {
|
||||
$queueConfig = $this->queuesConfiguration[$queueName];
|
||||
|
||||
if (isset($this->queueConfiguration['arguments'])) {
|
||||
$this->amqpQueue->setArguments($this->queueConfiguration['arguments']);
|
||||
$amqpQueue = $this->amqpFactory->createQueue($this->channel());
|
||||
$amqpQueue->setName($queueName);
|
||||
$amqpQueue->setFlags($queueConfig['flags'] ?? AMQP_DURABLE);
|
||||
|
||||
if (isset($queueConfig['arguments'])) {
|
||||
$amqpQueue->setArguments($queueConfig['arguments']);
|
||||
}
|
||||
|
||||
$this->amqpQueues[$queueName] = $amqpQueue;
|
||||
}
|
||||
|
||||
return $this->amqpQueue;
|
||||
return $this->amqpQueues[$queueName];
|
||||
}
|
||||
|
||||
public function exchange(): \AMQPExchange
|
||||
private function exchange(): \AMQPExchange
|
||||
{
|
||||
if (null === $this->amqpExchange) {
|
||||
$this->amqpExchange = $this->amqpFactory->createExchange($this->channel());
|
||||
@ -380,7 +418,7 @@ class Connection
|
||||
private function clear(): void
|
||||
{
|
||||
$this->amqpChannel = null;
|
||||
$this->amqpQueue = null;
|
||||
$this->amqpQueues = [];
|
||||
$this->amqpExchange = null;
|
||||
}
|
||||
|
||||
@ -397,8 +435,15 @@ class Connection
|
||||
return true;
|
||||
}
|
||||
|
||||
private function getAttributes(array $headers): array
|
||||
private function getDefaultPublishRoutingKey(): ?string
|
||||
{
|
||||
return array_merge_recursive($this->queueConfiguration['attributes'] ?? [], ['headers' => $headers]);
|
||||
return $this->exchangeConfiguration['default_publish_routing_key'] ?? null;
|
||||
}
|
||||
|
||||
public function purgeQueues()
|
||||
{
|
||||
foreach ($this->getQueueNames() as $queueName) {
|
||||
$this->queue($queueName)->purge();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user