feature #38973 [Messenger] Allow to limit consumer to specific queues (dbu)

This PR was squashed before being merged into the 5.3-dev branch.

Discussion
----------

[Messenger] Allow to limit consumer to specific queues

| Q             | A
| ------------- | ---
| Branch?       | 5.x for features
| Bug fix?      | no
| New feature?  | yes (TODO: changelog)
| Deprecations? | no
| Tickets       | Fix #38630 (i think)
| License       | MIT
| Doc PR        | symfony/symfony-docs#... TODO

(Note: I am aware that there are other solutions for #38630 that might be more elegant. Our usecase does not use fanout, the reason why we need the functionality is different)

**Description**
We have a large application where one part is creating messages for products that need reindexing. A transport decorator decides before queueing whether this is a large effort or a small effort, based on some metric. Based on that, it adds a routing key which is then used in rabbitmq to put the message into the "small" or "large" queue.

We need two separate consumer processes that consume the small and the large queue, for separate scaling and such.

I looked into how we could achieve that. One option is to offer another option in the consume command. That would need to be forwarded to the receiver somehow, i added an interface for it now. The current PR is an illustration of the idea. If you specify a queue that the receiver does not have, things will fail in an inlegeant. If you specify multiple receivers, you can't specify the queue per receiver (though that starts being an odd usecase imho, you could then run two consumers instead)

Another option could be to allow configuring multiple receivers for the same transport that get the queue name(s) injected into their constructor. Then you can consume them separately. This currently needs a ton of configuration and some custom code to work. I can look at doing a PR to make this approach simpler, if you prefer it over the option to the consume command...

Commits
-------

9af1e20dae Adding changelog
81d6a49750 [Messenger] Allow to limit consumer to specific queues
This commit is contained in:
Nyholm 2021-02-02 21:33:54 +01:00
commit 84faecf77a
No known key found for this signature in database
GPG Key ID: D6332DE2B6F8FA38
9 changed files with 115 additions and 8 deletions

View File

@ -4,6 +4,7 @@
"preferred-install": {
"symfony/form": "source",
"symfony/http-kernel": "source",
"symfony/messenger": "source",
"symfony/notifier": "source",
"symfony/validator": "source",
"*": "dist"

View File

@ -4,7 +4,8 @@ CHANGELOG
5.3
---
* Deprecated the `prefetch_count` parameter, it has no effect and will be removed in Symfony 6.0.
* Deprecated the `prefetch_count` parameter, it has no effect and will be removed in Symfony 6.0.
* `AmqpReceiver` implements `QueueReceiverInterface` to fetch messages from a specific set of queues.
5.2.0
-----

View File

@ -16,7 +16,7 @@ use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@ -25,7 +25,7 @@ use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
class AmqpReceiver implements QueueReceiverInterface, MessageCountAwareInterface
{
private $serializer;
private $connection;
@ -41,7 +41,15 @@ class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
*/
public function get(): iterable
{
foreach ($this->connection->getQueueNames() as $queueName) {
yield from $this->getFromQueues($this->connection->getQueueNames());
}
/**
* {@inheritdoc}
*/
public function getFromQueues(array $queueNames): iterable
{
foreach ($queueNames as $queueName) {
yield from $this->getEnvelope($queueName);
}
}

View File

@ -18,7 +18,7 @@
"require": {
"php": ">=7.2.5",
"symfony/deprecation-contracts": "^2.1",
"symfony/messenger": "^5.1"
"symfony/messenger": "^5.3"
},
"require-dev": {
"symfony/event-dispatcher": "^4.4|^5.0",

View File

@ -5,6 +5,7 @@ CHANGELOG
---
* `InMemoryTransport` can perform message serialization through dsn `in-memory://?serialize=true`.
* Added `queues` option to `Worker` to only fetch messages from a specific queue from a receiver implementing `QueueReceiverInterface`.
5.2.0
-----

View File

@ -71,6 +71,7 @@ class ConsumeMessagesCommand extends Command
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can handle new messages'),
new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'),
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
])
->setDescription(self::$defaultDescription)
->setHelp(<<<'EOF'
@ -104,6 +105,10 @@ to instead of trying to determine it automatically. This is required if the
messages didn't originate from Messenger:
<info>php %command.full_name% <receiver-name> --bus=event_bus</info>
Use the --queues option to limit a receiver to only certain queues (only supported by some receivers):
<info>php %command.full_name% <receiver-name> --queues=fasttrack</info>
EOF
)
;
@ -195,9 +200,13 @@ EOF
$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;
$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger);
$worker->run([
$options = [
'sleep' => $input->getOption('sleep') * 1000000,
]);
];
if ($queues = $input->getOption('queues')) {
$options['queues'] = $queues;
}
$worker->run($options);
return 0;
}

View File

@ -21,12 +21,14 @@ use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\Exception\RuntimeException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Worker;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
@ -245,6 +247,41 @@ class WorkerTest extends TestCase
$this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes);
}
public function testWorkerLimitQueues()
{
$envelope = [new Envelope(new DummyMessage('message1'))];
$receiver = $this->createMock(QueueReceiverInterface::class);
$receiver->expects($this->once())
->method('getFromQueues')
->with(['foo'])
->willReturn($envelope)
;
$receiver->expects($this->never())
->method('get')
;
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$dispatcher = new EventDispatcher();
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
$worker = new Worker(['transport' => $receiver], $bus, $dispatcher);
$worker->run(['queues' => ['foo']]);
}
public function testWorkerLimitQueuesUnsupported()
{
$receiver1 = $this->createMock(QueueReceiverInterface::class);
$receiver2 = $this->createMock(ReceiverInterface::class);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$worker = new Worker(['transport1' => $receiver1, 'transport2' => $receiver2], $bus);
$this->expectException(RuntimeException::class);
$this->expectExceptionMessage(sprintf('Receiver for "transport2" does not implement "%s".', QueueReceiverInterface::class));
$worker->run(['queues' => ['foo']]);
}
public function testWorkerMessageReceivedEventMutability()
{
$envelope = new Envelope(new DummyMessage('Hello'));

View File

@ -0,0 +1,33 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\Receiver;
use Symfony\Component\Messenger\Envelope;
/**
* Some transports may have multiple queues. This interface is used to read from only some queues.
*
* @author David Buchmann <mail@davidbu.ch>
*
* @experimental in 5.3
*/
interface QueueReceiverInterface extends ReceiverInterface
{
/**
* Get messages from the specified queue names instead of consuming from all queues.
*
* @param string[] $queueNames
*
* @return Envelope[]
*/
public function getFromQueues(array $queueNames): iterable;
}

View File

@ -22,8 +22,10 @@ use Symfony\Component\Messenger\Event\WorkerStartedEvent;
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
use Symfony\Component\Messenger\Exception\RuntimeException;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
@ -57,6 +59,7 @@ class Worker
*
* Valid options are:
* * sleep (default: 1000000): Time in microseconds to sleep after no messages are found
* * queues: The queue names to consume from, instead of consuming from all queues. When this is used, all receivers must implement the QueueReceiverInterface
*/
public function run(array $options = []): void
{
@ -65,11 +68,25 @@ class Worker
$options = array_merge([
'sleep' => 1000000,
], $options);
$queueNames = $options['queues'] ?? false;
if ($queueNames) {
// if queue names are specified, all receivers must implement the QueueReceiverInterface
foreach ($this->receivers as $transportName => $receiver) {
if (!$receiver instanceof QueueReceiverInterface) {
throw new RuntimeException(sprintf('Receiver for "%s" does not implement "%s".', $transportName, QueueReceiverInterface::class));
}
}
}
while (false === $this->shouldStop) {
$envelopeHandled = false;
foreach ($this->receivers as $transportName => $receiver) {
$envelopes = $receiver->get();
if ($queueNames) {
$envelopes = $receiver->getFromQueues($queueNames);
} else {
$envelopes = $receiver->get();
}
foreach ($envelopes as $envelope) {
$envelopeHandled = true;