feature #30708 [Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports (weaverryan)

This PR was squashed before being merged into the 4.3-dev branch (closes #30708).

Discussion
----------

[Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | Helps with #30699
| License       | MIT
| Doc PR        | TODO

Highlights:

* `messenger:consume` can now consume messages from multiple transports with priority ️

```
bin/console messenger:consume amqp_high amqp_medium amqp_low
```

* How long you want to sleep before checking more messages is now an option to `messenger:consume`
* `ReceiverInterface::receive()` is replaced with `ReceiverInterface::get()`
* Logic for looping & sleeping is moved into `Worker`

Commits
-------

e800bd5bde [Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports
This commit is contained in:
Fabien Potencier 2019-03-30 08:01:33 +01:00
commit b12351a7eb
28 changed files with 926 additions and 813 deletions

View File

@ -11,8 +11,9 @@ CHANGELOG
to the `Envelope` then find the correct bus when receiving from
the transport. See `ConsumeMessagesCommand`.
* The optional `$busNames` constructor argument of the class `ConsumeMessagesCommand` was removed.
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
`ack()` and `reject()`.
* [BC BREAK] 3 new methods were added to `ReceiverInterface`:
`ack()`, `reject()` and `get()`. The methods `receive()`
and `stop()` were removed.
* [BC BREAK] Error handling was moved from the receivers into
`Worker`. Implementations of `ReceiverInterface::handle()`
should now allow all exceptions to be thrown, except for transport
@ -24,7 +25,9 @@ CHANGELOG
* The default command name for `ConsumeMessagesCommand` was
changed from `messenger:consume-messages` to `messenger:consume`
* `ConsumeMessagesCommand` has two new optional constructor arguments
* `Worker` has 4 new option constructor arguments.
* [BC BREAK] The first argument to Worker changed from a single
`ReceiverInterface` to an array of `ReceiverInterface`.
* `Worker` has 3 new optional constructor arguments.
* The `Worker` class now handles calling `pcntl_signal_dispatch()` the
receiver no longer needs to call this.
* The `AmqpSender` will now retry messages using a dead-letter exchange

View File

@ -19,12 +19,13 @@ use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Question\ChoiceQuestion;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
/**
@ -70,10 +71,11 @@ class ConsumeMessagesCommand extends Command
$this
->setDefinition([
new InputArgument('receiver', $defaultReceiverName ? InputArgument::OPTIONAL : InputArgument::REQUIRED, 'Name of the receiver', $defaultReceiverName),
new InputArgument('receivers', InputArgument::IS_ARRAY, 'Names of the receivers/transports to consume in order of priority', $defaultReceiverName ? [$defaultReceiverName] : []),
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically.'),
])
->setDescription('Consumes messages')
@ -82,6 +84,10 @@ The <info>%command.name%</info> command consumes messages and dispatches them to
<info>php %command.full_name% <receiver-name></info>
To receive from multiple transports, pass each name:
<info>php %command.full_name% receiver1 receiver2</info>
Use the --limit option to limit the number of messages received:
<info>php %command.full_name% <receiver-name> --limit=10</info>
@ -111,16 +117,22 @@ EOF
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
if ($this->receiverNames && !$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
if (null === $receiverName) {
$io->block('Missing receiver argument.', null, 'error', ' ', true);
$input->setArgument('receiver', $io->choice('Select one of the available receivers', $this->receiverNames));
} elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) {
$io->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true);
if ($io->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) {
$input->setArgument('receiver', $alternatives[0]);
}
if ($this->receiverNames && 0 === \count($input->getArgument('receivers'))) {
$io->block('Which transports/receivers do you want to consume?', null, 'fg=white;bg=blue', ' ', true);
$io->writeln('Choose which receivers you want to consume messages from in order of priority.');
if (\count($this->receiverNames) > 1) {
$io->writeln(sprintf('Hint: to consume from multiple, use a list of their names, e.g. <comment>%s</comment>', implode(', ', $this->receiverNames)));
}
$question = new ChoiceQuestion('Select receivers to consume:', $this->receiverNames, 0);
$question->setMultiselect(true);
$input->setArgument('receivers', $io->askQuestion($question));
}
if (0 === \count($input->getArgument('receivers'))) {
throw new RuntimeException('Please pass at least one receiver.');
}
}
@ -135,16 +147,25 @@ EOF
$output->writeln(sprintf('<comment>%s</comment>', $message));
}
if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
}
$receivers = [];
$retryStrategies = [];
foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) {
if (!$this->receiverLocator->has($receiverName)) {
$message = sprintf('The receiver "%s" does not exist.', $receiverName);
if ($this->receiverNames) {
$message .= sprintf(' Valid receivers are: %s.', implode(', ', $this->receiverNames));
}
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
}
throw new RuntimeException($message);
}
$receiver = $this->receiverLocator->get($receiverName);
$retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
}
$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
$retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
}
if (null !== $input->getOption('bus')) {
$bus = $this->busLocator->get($input->getOption('bus'));
@ -152,24 +173,25 @@ EOF
$bus = new RoutableMessageBus($this->busLocator);
}
$worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger);
$stopsWhen = [];
if ($limit = $input->getOption('limit')) {
$stopsWhen[] = "processed {$limit} messages";
$receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger);
$worker = new StopWhenMessageCountIsExceededWorker($worker, $limit, $this->logger);
}
if ($memoryLimit = $input->getOption('memory-limit')) {
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
$receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
$worker = new StopWhenMemoryUsageIsExceededWorker($worker, $this->convertToBytes($memoryLimit), $this->logger);
}
if ($timeLimit = $input->getOption('time-limit')) {
$stopsWhen[] = "been running for {$timeLimit}s";
$receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger);
$worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger);
}
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$io->success(sprintf('Consuming messages from transport "%s".', $receiverName));
$io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames)));
if ($stopsWhen) {
$last = array_pop($stopsWhen);
@ -183,8 +205,9 @@ EOF
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
}
$worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
$worker->run();
$worker->run([
'sleep' => $input->getOption('sleep') * 1000000,
]);
}
private function convertToBytes(string $memoryLimit): int

View File

@ -20,16 +20,8 @@ class ConsumeMessagesCommandTest extends TestCase
public function testConfigurationWithDefaultReceiver()
{
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp']);
$inputArgument = $command->getDefinition()->getArgument('receiver');
$inputArgument = $command->getDefinition()->getArgument('receivers');
$this->assertFalse($inputArgument->isRequired());
$this->assertSame('amqp', $inputArgument->getDefault());
}
public function testConfigurationWithoutDefaultReceiver()
{
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp', 'dummy']);
$inputArgument = $command->getDefinition()->getArgument('receiver');
$this->assertTrue($inputArgument->isRequired());
$this->assertNull($inputArgument->getDefault());
$this->assertSame(['amqp'], $inputArgument->getDefault());
}
}

View File

@ -612,11 +612,9 @@ class DummyHandler
class DummyReceiver implements ReceiverInterface
{
public function receive(callable $handler): void
public function get(): iterable
{
for ($i = 0; $i < 3; ++$i) {
$handler(new Envelope(new DummyMessage("Dummy $i")));
}
yield new Envelope(new DummyMessage('Dummy'));
}
public function stop(): void

View File

@ -1,48 +0,0 @@
<?php
namespace Symfony\Component\Messenger\Tests\Fixtures;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
class CallbackReceiver implements ReceiverInterface
{
private $callable;
private $acknowledgeCount = 0;
private $rejectCount = 0;
public function __construct(callable $callable)
{
$this->callable = $callable;
}
public function receive(callable $handler): void
{
$callable = $this->callable;
$callable($handler);
}
public function stop(): void
{
}
public function ack(Envelope $envelope): void
{
++$this->acknowledgeCount;
}
public function reject(Envelope $envelope): void
{
++$this->rejectCount;
}
public function getAcknowledgeCount(): int
{
return $this->acknowledgeCount;
}
public function getRejectCount(): int
{
return $this->rejectCount;
}
}

View File

@ -0,0 +1,46 @@
<?php
namespace Symfony\Component\Messenger\Tests\Fixtures;
use Symfony\Component\Messenger\WorkerInterface;
class DummyWorker implements WorkerInterface
{
private $isStopped = false;
private $envelopesToReceive;
private $envelopesHandled = 0;
public function __construct(array $envelopesToReceive)
{
$this->envelopesToReceive = $envelopesToReceive;
}
public function run(array $options = [], callable $onHandledCallback = null): void
{
foreach ($this->envelopesToReceive as $envelope) {
if (true === $this->isStopped) {
break;
}
if ($onHandledCallback) {
$onHandledCallback($envelope);
++$this->envelopesHandled;
}
}
}
public function stop(): void
{
$this->isStopped = true;
}
public function isStopped(): bool
{
return $this->isStopped;
}
public function countEnvelopesHandled()
{
return $this->envelopesHandled;
}
}

View File

@ -57,16 +57,20 @@ class AmqpExtIntegrationTest extends TestCase
$sender->send($first = new Envelope(new DummyMessage('First')));
$sender->send($second = new Envelope(new DummyMessage('Second')));
$receivedMessages = 0;
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
$expectedEnvelope = 0 === $receivedMessages ? $first : $second;
$this->assertEquals($expectedEnvelope->getMessage(), $envelope->getMessage());
$this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));
$envelopes = iterator_to_array($receiver->get());
$this->assertCount(1, $envelopes);
/** @var Envelope $envelope */
$envelope = $envelopes[0];
$this->assertEquals($first->getMessage(), $envelope->getMessage());
$this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));
if (2 === ++$receivedMessages) {
$receiver->stop();
}
});
$envelopes = iterator_to_array($receiver->get());
$this->assertCount(1, $envelopes);
/** @var Envelope $envelope */
$envelope = $envelopes[0];
$this->assertEquals($second->getMessage(), $envelope->getMessage());
$this->assertEmpty(iterator_to_array($receiver->get()));
}
public function testRetryAndDelay()
@ -82,50 +86,38 @@ class AmqpExtIntegrationTest extends TestCase
$sender->send($first = new Envelope(new DummyMessage('First')));
$receivedMessages = 0;
$envelopes = iterator_to_array($receiver->get());
/** @var Envelope $envelope */
$envelope = $envelopes[0];
$newEnvelope = $envelope
->with(new DelayStamp(2000))
->with(new RedeliveryStamp(1, 'not_important'));
$sender->send($newEnvelope);
$receiver->ack($envelope);
$envelopes = [];
$startTime = time();
$receiver->receive(function (?Envelope $envelope) use ($receiver, $sender, &$receivedMessages, $startTime) {
if (null === $envelope) {
// if we have been processing for 4 seconds + have received 2 messages
// then it's safe to say no other messages will be received
if (time() > $startTime + 4 && 2 === $receivedMessages) {
$receiver->stop();
}
// wait for next message, but only for max 3 seconds
while (0 === \count($envelopes) && $startTime + 3 > time()) {
$envelopes = iterator_to_array($receiver->get());
}
return;
}
$this->assertCount(1, $envelopes);
/** @var Envelope $envelope */
$envelope = $envelopes[0];
++$receivedMessages;
// should have a 2 second delay
$this->assertGreaterThanOrEqual($startTime + 2, time());
// but only a 2 second delay
$this->assertLessThan($startTime + 4, time());
// retry the first time
if (1 === $receivedMessages) {
// imitate what Worker does
$envelope = $envelope
->with(new DelayStamp(2000))
->with(new RedeliveryStamp(1, 'not_important'));
$sender->send($envelope);
$receiver->ack($envelope);
/** @var RedeliveryStamp|null $retryStamp */
// verify the stamp still exists from the last send
$retryStamp = $envelope->last(RedeliveryStamp::class);
$this->assertNotNull($retryStamp);
$this->assertSame(1, $retryStamp->getRetryCount());
return;
}
if (2 === $receivedMessages) {
// should have a 2 second delay
$this->assertGreaterThanOrEqual($startTime + 2, time());
// but only a 2 second delay
$this->assertLessThan($startTime + 4, time());
/** @var RedeliveryStamp|null $retryStamp */
// verify the stamp still exists from the last send
$retryStamp = $envelope->last(RedeliveryStamp::class);
$this->assertNotNull($retryStamp);
$this->assertSame(1, $retryStamp->getRetryCount());
$receiver->ack($envelope);
return;
}
});
$receiver->ack($envelope);
}
public function testItReceivesSignals()
@ -175,29 +167,6 @@ TXT
, $process->getOutput());
}
/**
* @runInSeparateProcess
*/
public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
{
$serializer = $this->createSerializer();
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), ['read_timeout' => '1']);
$connection->setup();
$connection->queue()->purge();
$receiver = new AmqpReceiver($connection, $serializer);
$receivedMessages = 0;
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
$this->assertNull($envelope);
if (2 === ++$receivedMessages) {
$receiver->stop();
}
});
}
private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
{
$timedOutTime = time() + $timeoutInSeconds;

View File

@ -14,9 +14,11 @@ namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
@ -26,7 +28,7 @@ use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
*/
class AmqpReceiverTest extends TestCase
{
public function testItSendTheDecodedMessageToTheHandler()
public function testItReturnsTheDecodedMessageToTheHandler()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
@ -37,10 +39,9 @@ class AmqpReceiverTest extends TestCase
$connection->method('get')->willReturn($amqpEnvelope);
$receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
$receiver->stop();
});
$actualEnvelopes = iterator_to_array($receiver->get());
$this->assertCount(1, $actualEnvelopes);
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
}
/**
@ -48,20 +49,14 @@ class AmqpReceiverTest extends TestCase
*/
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);
$serializer = $this->createMock(SerializerInterface::class);
$amqpEnvelope = $this->createAMQPEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($amqpEnvelope);
$connection->method('ack')->with($amqpEnvelope)->willThrowException(new \AMQPException());
$receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$receiver->ack($envelope);
$receiver->stop();
});
$receiver->ack(new Envelope(new \stdClass(), new AmqpReceivedStamp($amqpEnvelope)));
}
/**
@ -69,20 +64,14 @@ class AmqpReceiverTest extends TestCase
*/
public function testItThrowsATransportExceptionIfItCannotRejectMessage()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);
$serializer = $this->createMock(SerializerInterface::class);
$amqpEnvelope = $this->createAMQPEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($amqpEnvelope);
$connection->method('nack')->with($amqpEnvelope, AMQP_NOPARAM)->willThrowException(new \AMQPException());
$receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$receiver->reject($envelope);
$receiver->stop();
});
$receiver->reject(new Envelope(new \stdClass(), new AmqpReceivedStamp($amqpEnvelope)));
}
private function createAMQPEnvelope()

View File

@ -47,11 +47,8 @@ class AmqpTransportTest extends TestCase
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
$connection->method('get')->willReturn($amqpEnvelope);
$transport->receive(function (Envelope $envelope) use ($transport, $decodedMessage) {
$this->assertSame($decodedMessage, $envelope->getMessage());
$transport->stop();
});
$envelopes = iterator_to_array($transport->get());
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
}
private function getTransport(SerializerInterface $serializer = null, Connection $connection = null)

View File

@ -32,7 +32,7 @@ $connection = Connection::fromDsn(getenv('DSN'));
$receiver = new AmqpReceiver($connection, $serializer);
$retryStrategy = new MultiplierRetryStrategy(3, 0);
$worker = new Worker($receiver, new class() implements MessageBusInterface {
$worker = new Worker(['the_receiver' => $receiver], new class() implements MessageBusInterface {
public function dispatch($envelope): Envelope
{
echo 'Get envelope with message: '.\get_class($envelope->getMessage())."\n";
@ -43,7 +43,7 @@ $worker = new Worker($receiver, new class() implements MessageBusInterface {
return $envelope;
}
}, 'the_receiver', $retryStrategy);
});
echo "Receiving messages...\n";
$worker->run();

View File

@ -1,84 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\Receiver;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
class StopWhenMemoryUsageIsExceededReceiverTest extends TestCase
{
/**
* @dataProvider memoryProvider
*/
public function testReceiverStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop)
{
$callable = function ($handler) {
$handler(new Envelope(new DummyMessage('API')));
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
->setConstructorArgs([$callable])
->enableProxyingToOriginalMethods()
->getMock();
$decoratedReceiver->expects($this->once())->method('receive');
if (true === $shouldStop) {
$decoratedReceiver->expects($this->once())->method('stop');
} else {
$decoratedReceiver->expects($this->never())->method('stop');
}
$memoryResolver = function () use ($memoryUsage) {
return $memoryUsage;
};
$memoryLimitReceiver = new StopWhenMemoryUsageIsExceededReceiver($decoratedReceiver, $memoryLimit, null, $memoryResolver);
$memoryLimitReceiver->receive(function () {});
}
public function memoryProvider()
{
yield [2048, 1024, true];
yield [1024, 1024, false];
yield [1024, 2048, false];
}
public function testReceiverLogsMemoryExceededWhenLoggerIsGiven()
{
$callable = function ($handler) {
$handler(new Envelope(new DummyMessage('API')));
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
->setConstructorArgs([$callable])
->enableProxyingToOriginalMethods()
->getMock();
$decoratedReceiver->expects($this->once())->method('receive');
$decoratedReceiver->expects($this->once())->method('stop');
$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('info')
->with('Receiver stopped due to memory limit of {limit} exceeded', ['limit' => 64 * 1024 * 1024]);
$memoryResolver = function () {
return 70 * 1024 * 1024;
};
$memoryLimitReceiver = new StopWhenMemoryUsageIsExceededReceiver($decoratedReceiver, 64 * 1024 * 1024, $logger, $memoryResolver);
$memoryLimitReceiver->receive(function () {});
}
}

View File

@ -1,103 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\Receiver;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
class StopWhenMessageCountIsExceededReceiverTest extends TestCase
{
/**
* @dataProvider countProvider
*/
public function testReceiverStopsWhenMaximumCountExceeded($max, $shouldStop)
{
$callable = function ($handler) {
$handler(new Envelope(new DummyMessage('First message')));
$handler(new Envelope(new DummyMessage('Second message')));
$handler(new Envelope(new DummyMessage('Third message')));
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
->setConstructorArgs([$callable])
->enableProxyingToOriginalMethods()
->getMock();
$decoratedReceiver->expects($this->once())->method('receive');
if (true === $shouldStop) {
$decoratedReceiver->expects($this->any())->method('stop');
} else {
$decoratedReceiver->expects($this->never())->method('stop');
}
$maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, $max);
$maximumCountReceiver->receive(function () {});
}
public function countProvider()
{
yield [1, true];
yield [2, true];
yield [3, true];
yield [4, false];
}
public function testReceiverDoesntIncreaseItsCounterWhenReceiveNullMessage()
{
$callable = function ($handler) {
$handler(null);
$handler(null);
$handler(null);
$handler(null);
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
->setConstructorArgs([$callable])
->enableProxyingToOriginalMethods()
->getMock();
$decoratedReceiver->expects($this->once())->method('receive');
$decoratedReceiver->expects($this->never())->method('stop');
$maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, 1);
$maximumCountReceiver->receive(function () {});
}
public function testReceiverLogsMaximumCountExceededWhenLoggerIsGiven()
{
$callable = function ($handler) {
$handler(new Envelope(new DummyMessage('First message')));
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
->setConstructorArgs([$callable])
->enableProxyingToOriginalMethods()
->getMock();
$decoratedReceiver->expects($this->once())->method('receive');
$decoratedReceiver->expects($this->once())->method('stop');
$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('info')
->with(
$this->equalTo('Receiver stopped due to maximum count of {count} exceeded'),
$this->equalTo(['count' => 1])
);
$maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, 1, $logger);
$maximumCountReceiver->receive(function () {});
}
}

View File

@ -1,49 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\Receiver;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
class StopWhenTimeLimitIsReachedReceiverTest extends TestCase
{
/**
* @group time-sensitive
*/
public function testReceiverStopsWhenTimeLimitIsReached()
{
$callable = function ($handler) {
$handler(new Envelope(new DummyMessage('API')));
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
->setConstructorArgs([$callable])
->enableProxyingToOriginalMethods()
->getMock();
$decoratedReceiver->expects($this->once())->method('receive');
$decoratedReceiver->expects($this->once())->method('stop');
$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('info')
->with('Receiver stopped due to time limit of {timeLimit}s reached', ['timeLimit' => 1]);
$timeoutReceiver = new StopWhenTimeLimitIsReachedReceiver($decoratedReceiver, 1, $logger);
$timeoutReceiver->receive(function () {
sleep(2);
});
}
}

View File

@ -0,0 +1,71 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Worker;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker;
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
class StopWhenMemoryUsageIsExceededWorkerTest extends TestCase
{
/**
* @dataProvider memoryProvider
*/
public function testWorkerStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop)
{
$handlerCalledTimes = 0;
$handledCallback = function () use (&$handlerCalledTimes) {
++$handlerCalledTimes;
};
$decoratedWorker = new DummyWorker([
new Envelope(new \stdClass()),
]);
$memoryResolver = function () use ($memoryUsage) {
return $memoryUsage;
};
$memoryLimitWorker = new StopWhenMemoryUsageIsExceededWorker($decoratedWorker, $memoryLimit, null, $memoryResolver);
$memoryLimitWorker->run([], $handledCallback);
// handler should be called exactly 1 time
$this->assertSame($handlerCalledTimes, 1);
$this->assertSame($shouldStop, $decoratedWorker->isStopped());
}
public function memoryProvider()
{
yield [2048, 1024, true];
yield [1024, 1024, false];
yield [1024, 2048, false];
}
public function testWorkerLogsMemoryExceededWhenLoggerIsGiven()
{
$decoratedWorker = new DummyWorker([
new Envelope(new \stdClass()),
]);
$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('info')
->with('Worker stopped due to memory limit of {limit} exceeded', ['limit' => 64 * 1024 * 1024]);
$memoryResolver = function () {
return 70 * 1024 * 1024;
};
$memoryLimitWorker = new StopWhenMemoryUsageIsExceededWorker($decoratedWorker, 64 * 1024 * 1024, $logger, $memoryResolver);
$memoryLimitWorker->run();
}
}

View File

@ -0,0 +1,71 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Worker;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker;
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
class StopWhenMessageCountIsExceededWorkerTest extends TestCase
{
/**
* @dataProvider countProvider
*/
public function testWorkerStopsWhenMaximumCountExceeded($max, $shouldStop)
{
$handlerCalledTimes = 0;
$handledCallback = function () use (&$handlerCalledTimes) {
++$handlerCalledTimes;
};
// receive 3 real messages
$decoratedWorker = new DummyWorker([
new Envelope(new DummyMessage('First message')),
null,
new Envelope(new DummyMessage('Second message')),
null,
new Envelope(new DummyMessage('Third message')),
]);
$maximumCountWorker = new StopWhenMessageCountIsExceededWorker($decoratedWorker, $max);
$maximumCountWorker->run([], $handledCallback);
$this->assertSame($shouldStop, $decoratedWorker->isStopped());
}
public function countProvider()
{
yield [1, true];
yield [2, true];
yield [3, true];
yield [4, false];
}
public function testWorkerLogsMaximumCountExceededWhenLoggerIsGiven()
{
$decoratedWorker = new DummyWorker([
new Envelope(new \stdClass()),
]);
$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('info')
->with(
$this->equalTo('Worker stopped due to maximum count of {count} exceeded'),
$this->equalTo(['count' => 1])
);
$maximumCountWorker = new StopWhenMessageCountIsExceededWorker($decoratedWorker, 1, $logger);
$maximumCountWorker->run();
}
}

View File

@ -0,0 +1,44 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Worker;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker;
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
class StopWhenTimeLimitIsReachedWorkerTest extends TestCase
{
/**
* @group time-sensitive
*/
public function testWorkerStopsWhenTimeLimitIsReached()
{
$decoratedWorker = new DummyWorker([
new Envelope(new \stdClass()),
new Envelope(new \stdClass()),
]);
$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('info')
->with('Worker stopped due to time limit of {timeLimit}s reached', ['timeLimit' => 1]);
$timeoutWorker = new StopWhenTimeLimitIsReachedWorker($decoratedWorker, 1, $logger);
$timeoutWorker->run([], function () {
sleep(2);
});
$this->assertTrue($decoratedWorker->isStopped());
$this->assertSame(1, $decoratedWorker->countEnvelopesHandled());
}
}

View File

@ -22,11 +22,14 @@ use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Worker;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
/**
* @group time-sensitive
*/
class WorkerTest extends TestCase
{
public function testWorkerDispatchTheReceivedMessage()
@ -34,18 +37,22 @@ class WorkerTest extends TestCase
$apiMessage = new DummyMessage('API');
$ipaMessage = new DummyMessage('IPA');
$receiver = new CallbackReceiver(function ($handler) use ($apiMessage, $ipaMessage) {
$handler(new Envelope($apiMessage));
$handler(new Envelope($ipaMessage));
});
$receiver = new DummyReceiver([
[new Envelope($apiMessage), new Envelope($ipaMessage)],
]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with($envelope = new Envelope($apiMessage, new ReceivedStamp()))->willReturn($envelope);
$bus->expects($this->at(1))->method('dispatch')->with($envelope = new Envelope($ipaMessage, new ReceivedStamp()))->willReturn($envelope);
$worker = new Worker($receiver, $bus, 'receiver_id');
$worker->run();
$worker = new Worker([$receiver], $bus);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
$worker->stop();
}
});
$this->assertSame(2, $receiver->getAcknowledgeCount());
}
@ -53,24 +60,26 @@ class WorkerTest extends TestCase
public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage()
{
$envelope = new Envelope(new DummyMessage('API'));
$receiver = new CallbackReceiver(function ($handler) use ($envelope) {
$handler($envelope);
});
$receiver = new DummyReceiver([[$envelope]]);
$envelope = $envelope->with(new ReceivedStamp());
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with($envelope)->willReturn($envelope);
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
$worker->run();
$worker = new Worker([$receiver], $bus, []);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
$worker->stop();
}
});
}
public function testDispatchCausesRetry()
{
$receiver = new CallbackReceiver(function ($handler) {
$handler(new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias')));
});
$receiver = new DummyReceiver([
[new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias'))],
]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not'));
@ -93,8 +102,13 @@ class WorkerTest extends TestCase
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(true);
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
$worker->run();
$worker = new Worker(['receiver1' => $receiver], $bus, ['receiver1' => $retryStrategy]);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
$worker->stop();
}
});
// old message acknowledged
$this->assertSame(1, $receiver->getAcknowledgeCount());
@ -102,9 +116,9 @@ class WorkerTest extends TestCase
public function testDispatchCausesRejectWhenNoRetry()
{
$receiver = new CallbackReceiver(function ($handler) {
$handler(new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias')));
});
$receiver = new DummyReceiver([
[new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias'))],
]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not'));
@ -112,17 +126,22 @@ class WorkerTest extends TestCase
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(false);
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
$worker->run();
$worker = new Worker(['receiver1' => $receiver], $bus, ['receiver1' => $retryStrategy]);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
$worker->stop();
}
});
$this->assertSame(1, $receiver->getRejectCount());
$this->assertSame(0, $receiver->getAcknowledgeCount());
}
public function testDispatchCausesRejectOnUnrecoverableMessage()
{
$receiver = new CallbackReceiver(function ($handler) {
$handler(new Envelope(new DummyMessage('Hello')));
});
$receiver = new DummyReceiver([
[new Envelope(new DummyMessage('Hello'))],
]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->method('dispatch')->willThrowException(new UnrecoverableMessageHandlingException('Will never work'));
@ -130,36 +149,42 @@ class WorkerTest extends TestCase
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$retryStrategy->expects($this->never())->method('isRetryable');
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
$worker->run();
$worker = new Worker(['receiver1' => $receiver], $bus, ['receiver1' => $retryStrategy]);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
$worker->stop();
}
});
$this->assertSame(1, $receiver->getRejectCount());
}
public function testWorkerDoesNotSendNullMessagesToTheBus()
{
$receiver = new CallbackReceiver(function ($handler) {
$handler(null);
});
$receiver = new DummyReceiver([
null,
]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->never())->method('dispatch');
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
$worker->run();
$worker = new Worker([$receiver], $bus);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
$worker->stop();
}
});
}
public function testWorkerDispatchesEventsOnSuccess()
{
$envelope = new Envelope(new DummyMessage('Hello'));
$receiver = new CallbackReceiver(function ($handler) use ($envelope) {
$handler($envelope);
});
$receiver = new DummyReceiver([[$envelope]]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->method('dispatch')->willReturn($envelope);
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock();
$eventDispatcher->expects($this->exactly(2))
@ -169,22 +194,24 @@ class WorkerTest extends TestCase
[$this->isInstanceOf(WorkerMessageHandledEvent::class)]
);
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy, $eventDispatcher);
$worker->run();
$worker = new Worker([$receiver], $bus, [], $eventDispatcher);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
$worker->stop();
}
});
}
public function testWorkerDispatchesEventsOnError()
{
$envelope = new Envelope(new DummyMessage('Hello'));
$receiver = new CallbackReceiver(function ($handler) use ($envelope) {
$handler($envelope);
});
$receiver = new DummyReceiver([[$envelope]]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$exception = new \InvalidArgumentException('Oh no!');
$bus->method('dispatch')->willThrowException($exception);
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock();
$eventDispatcher->expects($this->exactly(2))
@ -194,7 +221,143 @@ class WorkerTest extends TestCase
[$this->isInstanceOf(WorkerMessageFailedEvent::class)]
);
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy, $eventDispatcher);
$worker->run();
$worker = new Worker([$receiver], $bus, [], $eventDispatcher);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
$worker->stop();
}
});
}
public function testTimeoutIsConfigurable()
{
$apiMessage = new DummyMessage('API');
$receiver = new DummyReceiver([
[new Envelope($apiMessage), new Envelope($apiMessage)],
[], // will cause a wait
[], // will cause a wait
[new Envelope($apiMessage)],
[new Envelope($apiMessage)],
[], // will cause a wait
[new Envelope($apiMessage)],
]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$worker = new Worker([$receiver], $bus);
$receivedCount = 0;
$startTime = microtime(true);
// sleep .1 after each idle
$worker->run(['sleep' => 100000], function (?Envelope $envelope) use ($worker, &$receivedCount, $startTime) {
if (null !== $envelope) {
++$receivedCount;
}
if (5 === $receivedCount) {
$worker->stop();
$duration = microtime(true) - $startTime;
// wait time should be .3 seconds
// use .29 & .31 for timing "wiggle room"
$this->assertGreaterThanOrEqual(.29, $duration);
$this->assertLessThan(.31, $duration);
}
});
}
public function testWorkerWithMultipleReceivers()
{
// envelopes, in their expected delivery order
$envelope1 = new Envelope(new DummyMessage('message1'));
$envelope2 = new Envelope(new DummyMessage('message2'));
$envelope3 = new Envelope(new DummyMessage('message3'));
$envelope4 = new Envelope(new DummyMessage('message4'));
$envelope5 = new Envelope(new DummyMessage('message5'));
$envelope6 = new Envelope(new DummyMessage('message6'));
/*
* Round 1) receiver 1 & 2 have nothing, receiver 3 processes envelope1 and envelope2
* Round 2) receiver 1 has nothing, receiver 2 processes envelope3, receiver 3 is not called
* Round 3) receiver 1 processes envelope 4, receivers 2 & 3 are not called
* Round 4) receiver 1 processes envelope 5, receivers 2 & 3 are not called
* Round 5) receiver 1 has nothing, receiver 2 has nothing, receiver 3 has envelope 6
*/
$receiver1 = new DummyReceiver([
[],
[],
[$envelope4],
[$envelope5],
[],
]);
$receiver2 = new DummyReceiver([
[],
[$envelope3],
[],
]);
$receiver3 = new DummyReceiver([
[$envelope1, $envelope2],
[],
[$envelope6],
]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$receivedCount = 0;
$worker = new Worker([$receiver1, $receiver2, $receiver3], $bus);
$processedEnvelopes = [];
$worker->run([], function (?Envelope $envelope) use ($worker, &$receivedCount, &$processedEnvelopes) {
if (null !== $envelope) {
$processedEnvelopes[] = $envelope;
++$receivedCount;
}
// stop after the messages finish
if (6 === $receivedCount) {
$worker->stop();
}
});
// make sure they were processed in the correct order
$this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes);
}
}
class DummyReceiver implements ReceiverInterface
{
private $deliveriesOfEnvelopes;
private $acknowledgeCount = 0;
private $rejectCount = 0;
public function __construct(array $deliveriesOfEnvelopes)
{
$this->deliveriesOfEnvelopes = $deliveriesOfEnvelopes;
}
public function get(): iterable
{
$val = array_shift($this->deliveriesOfEnvelopes);
return null === $val ? [] : $val;
}
public function ack(Envelope $envelope): void
{
++$this->acknowledgeCount;
}
public function reject(Envelope $envelope): void
{
++$this->rejectCount;
}
public function getAcknowledgeCount(): int
{
return $this->acknowledgeCount;
}
public function getRejectCount(): int
{
return $this->rejectCount;
}
}

View File

@ -30,7 +30,6 @@ class AmqpReceiver implements ReceiverInterface
{
private $serializer;
private $connection;
private $shouldStop;
public function __construct(Connection $connection, SerializerInterface $serializer = null)
{
@ -41,38 +40,31 @@ class AmqpReceiver implements ReceiverInterface
/**
* {@inheritdoc}
*/
public function receive(callable $handler): void
public function get(): iterable
{
while (!$this->shouldStop) {
try {
$amqpEnvelope = $this->connection->get();
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
if (null === $amqpEnvelope) {
$handler(null);
usleep($this->connection->getConnectionConfiguration()['loop_sleep'] ?? 200000);
continue;
}
try {
$envelope = $this->serializer->decode([
'body' => $amqpEnvelope->getBody(),
'headers' => $amqpEnvelope->getHeaders(),
]);
} catch (MessageDecodingFailedException $exception) {
// invalid message of some type
$this->rejectAmqpEnvelope($amqpEnvelope);
throw $exception;
}
$envelope = $envelope->with(new AmqpReceivedStamp($amqpEnvelope));
$handler($envelope);
try {
$amqpEnvelope = $this->connection->get();
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
if (null === $amqpEnvelope) {
return [];
}
try {
$envelope = $this->serializer->decode([
'body' => $amqpEnvelope->getBody(),
'headers' => $amqpEnvelope->getHeaders(),
]);
} catch (MessageDecodingFailedException $exception) {
// invalid message of some type
$this->rejectAmqpEnvelope($amqpEnvelope);
throw $exception;
}
yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope));
}
public function ack(Envelope $envelope): void
@ -89,11 +81,6 @@ class AmqpReceiver implements ReceiverInterface
$this->rejectAmqpEnvelope($this->findAmqpEnvelope($envelope));
}
public function stop(): void
{
$this->shouldStop = true;
}
private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope): void
{
try {

View File

@ -38,17 +38,9 @@ class AmqpTransport implements TransportInterface, SetupableTransportInterface
/**
* {@inheritdoc}
*/
public function receive(callable $handler): void
public function get(): iterable
{
($this->receiver ?? $this->getReceiver())->receive($handler);
}
/**
* {@inheritdoc}
*/
public function stop(): void
{
($this->receiver ?? $this->getReceiver())->stop();
return ($this->receiver ?? $this->getReceiver())->get();
}
/**

View File

@ -25,21 +25,25 @@ interface ReceiverInterface
/**
* Receive some messages to the given handler.
*
* The handler will have, as argument, the received {@link \Symfony\Component\Messenger\Envelope} containing the message.
* Note that this envelope can be `null` if the timeout to receive something has expired.
* While this method could return an unlimited number of messages,
* the intention is that it returns only one, or a "small number"
* of messages each time. This gives the user more flexibility:
* they can finish processing the one (or "small number") of messages
* from this receiver and move on to check other receivers for messages.
* If a this method returns too many messages, it could cause a
* blocking effect where handling the messages received from one
* call to get() takes a long time, blocking other receivers from
* being called.
*
* If the received message cannot be decoded, the message should not
* If a received message cannot be decoded, the message should not
* be retried again (e.g. if there's a queue, it should be removed)
* and a MessageDecodingFailedException should be thrown.
*
* @throws TransportException If there is an issue communicating with the transport
*
* @return Envelope[]
*/
public function receive(callable $handler): void;
/**
* Stop receiving some messages.
*/
public function stop(): void;
public function get(): iterable;
/**
* Acknowledge that the passed message was handled.

View File

@ -1,68 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\Receiver;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
/**
* @author Simon Delicata <simon.delicata@free.fr>
*
* @experimental in 4.2
*/
class StopWhenMemoryUsageIsExceededReceiver implements ReceiverInterface
{
private $decoratedReceiver;
private $memoryLimit;
private $logger;
private $memoryResolver;
public function __construct(ReceiverInterface $decoratedReceiver, int $memoryLimit, LoggerInterface $logger = null, callable $memoryResolver = null)
{
$this->decoratedReceiver = $decoratedReceiver;
$this->memoryLimit = $memoryLimit;
$this->logger = $logger;
$this->memoryResolver = $memoryResolver ?: function () {
return \memory_get_usage();
};
}
public function receive(callable $handler): void
{
$this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler) {
$handler($envelope);
$memoryResolver = $this->memoryResolver;
if ($memoryResolver() > $this->memoryLimit) {
$this->stop();
if (null !== $this->logger) {
$this->logger->info('Receiver stopped due to memory limit of {limit} exceeded', ['limit' => $this->memoryLimit]);
}
}
});
}
public function stop(): void
{
$this->decoratedReceiver->stop();
}
public function ack(Envelope $envelope): void
{
$this->decoratedReceiver->ack($envelope);
}
public function reject(Envelope $envelope): void
{
$this->decoratedReceiver->reject($envelope);
}
}

View File

@ -1,65 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\Receiver;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
*
* @experimental in 4.2
*/
class StopWhenMessageCountIsExceededReceiver implements ReceiverInterface
{
private $decoratedReceiver;
private $maximumNumberOfMessages;
private $logger;
public function __construct(ReceiverInterface $decoratedReceiver, int $maximumNumberOfMessages, LoggerInterface $logger = null)
{
$this->decoratedReceiver = $decoratedReceiver;
$this->maximumNumberOfMessages = $maximumNumberOfMessages;
$this->logger = $logger;
}
public function receive(callable $handler): void
{
$receivedMessages = 0;
$this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler, &$receivedMessages) {
$handler($envelope);
if (null !== $envelope && ++$receivedMessages >= $this->maximumNumberOfMessages) {
$this->stop();
if (null !== $this->logger) {
$this->logger->info('Receiver stopped due to maximum count of {count} exceeded', ['count' => $this->maximumNumberOfMessages]);
}
}
});
}
public function stop(): void
{
$this->decoratedReceiver->stop();
}
public function ack(Envelope $envelope): void
{
$this->decoratedReceiver->ack($envelope);
}
public function reject(Envelope $envelope): void
{
$this->decoratedReceiver->reject($envelope);
}
}

View File

@ -1,66 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\Receiver;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
/**
* @author Simon Delicata <simon.delicata@free.fr>
*
* @experimental in 4.2
*/
class StopWhenTimeLimitIsReachedReceiver implements ReceiverInterface
{
private $decoratedReceiver;
private $timeLimitInSeconds;
private $logger;
public function __construct(ReceiverInterface $decoratedReceiver, int $timeLimitInSeconds, LoggerInterface $logger = null)
{
$this->decoratedReceiver = $decoratedReceiver;
$this->timeLimitInSeconds = $timeLimitInSeconds;
$this->logger = $logger;
}
public function receive(callable $handler): void
{
$startTime = microtime(true);
$endTime = $startTime + $this->timeLimitInSeconds;
$this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler, $endTime) {
$handler($envelope);
if ($endTime < microtime(true)) {
$this->stop();
if (null !== $this->logger) {
$this->logger->info('Receiver stopped due to time limit of {timeLimit}s reached', ['timeLimit' => $this->timeLimitInSeconds]);
}
}
});
}
public function stop(): void
{
$this->decoratedReceiver->stop();
}
public function ack(Envelope $envelope): void
{
$this->decoratedReceiver->ack($envelope);
}
public function reject(Envelope $envelope): void
{
$this->decoratedReceiver->reject($envelope);
}
}

View File

@ -32,112 +32,144 @@ use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
*
* @final
*/
class Worker
class Worker implements WorkerInterface
{
private $receiver;
private $receivers;
private $bus;
private $receiverName;
private $retryStrategy;
private $retryStrategies;
private $eventDispatcher;
private $logger;
private $shouldStop = false;
public function __construct(ReceiverInterface $receiver, MessageBusInterface $bus, string $receiverName = null, RetryStrategyInterface $retryStrategy = null, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
/**
* @param ReceiverInterface[] $receivers Where the key will be used as the string "identifier"
* @param RetryStrategyInterface[] $retryStrategies Retry strategies for each receiver (array keys must match)
*/
public function __construct(array $receivers, MessageBusInterface $bus, $retryStrategies = [], EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
{
$this->receiver = $receiver;
$this->receivers = $receivers;
$this->bus = $bus;
if (null === $receiverName) {
@trigger_error(sprintf('Instantiating the "%s" class without passing a third argument is deprecated since Symfony 4.3.', __CLASS__), E_USER_DEPRECATED);
$receiverName = 'unknown';
}
$this->receiverName = $receiverName;
$this->retryStrategy = $retryStrategy;
$this->retryStrategies = $retryStrategies;
$this->eventDispatcher = $eventDispatcher;
$this->logger = $logger;
}
/**
* Receive the messages and dispatch them to the bus.
*
* Valid options are:
* * sleep (default: 1000000): Time in microseconds to sleep after no messages are found
*/
public function run()
public function run(array $options = [], callable $onHandledCallback = null): void
{
$options = array_merge([
'sleep' => 1000000,
], $options);
if (\function_exists('pcntl_signal')) {
pcntl_signal(SIGTERM, function () {
$this->receiver->stop();
$this->stop();
});
}
$this->receiver->receive(function (?Envelope $envelope) {
if (null === $envelope) {
if (\function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
return;
}
$this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $this->receiverName));
$message = $envelope->getMessage();
$context = [
'message' => $message,
'class' => \get_class($message),
];
try {
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp()));
} catch (\Throwable $throwable) {
$shouldRetry = $this->shouldRetry($throwable, $envelope);
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $this->receiverName, $throwable, $shouldRetry));
if ($shouldRetry) {
if (null === $this->retryStrategy) {
// not logically allowed, but check just in case
throw new LogicException('Retrying is not supported without a retry strategy.');
}
$retryCount = $this->getRetryCount($envelope) + 1;
if (null !== $this->logger) {
$this->logger->error('Retrying {class} - retry #{retryCount}.', $context + ['retryCount' => $retryCount, 'error' => $throwable]);
}
// add the delay and retry stamp info + remove ReceivedStamp
$retryEnvelope = $envelope->with(new DelayStamp($this->retryStrategy->getWaitingTime($envelope)))
->with(new RedeliveryStamp($retryCount, $this->getSenderAlias($envelope)))
->withoutAll(ReceivedStamp::class);
// re-send the message
$this->bus->dispatch($retryEnvelope);
// acknowledge the previous message has received
$this->receiver->ack($envelope);
} else {
if (null !== $this->logger) {
$this->logger->critical('Rejecting {class} (removing from transport).', $context + ['error' => $throwable]);
}
$this->receiver->reject($envelope);
}
if (\function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
return;
}
$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $this->receiverName));
if (null !== $this->logger) {
$this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
}
$this->receiver->ack($envelope);
$onHandled = function (?Envelope $envelope) use ($onHandledCallback) {
if (\function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
});
if (null !== $onHandledCallback) {
$onHandledCallback($envelope);
}
};
while (false === $this->shouldStop) {
$envelopeHandled = false;
foreach ($this->receivers as $receiverName => $receiver) {
$envelopes = $receiver->get();
foreach ($envelopes as $envelope) {
$envelopeHandled = true;
$this->handleMessage($envelope, $receiver, $receiverName, $this->retryStrategies[$receiverName] ?? null);
$onHandled($envelope);
}
// after handling a single receiver, quit and start the loop again
// this should prevent multiple lower priority receivers from
// blocking too long before the higher priority are checked
if ($envelopeHandled) {
break;
}
}
if (false === $envelopeHandled) {
$onHandled(null);
usleep($options['sleep']);
}
}
}
private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $receiverName, ?RetryStrategyInterface $retryStrategy)
{
$this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $receiverName));
$message = $envelope->getMessage();
$context = [
'message' => $message,
'class' => \get_class($message),
];
try {
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp()));
} catch (\Throwable $throwable) {
$shouldRetry = $this->shouldRetry($throwable, $envelope, $retryStrategy);
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $receiverName, $throwable, $shouldRetry));
if ($shouldRetry) {
if (null === $retryStrategy) {
// not logically allowed, but check just in case
throw new LogicException('Retrying is not supported without a retry strategy.');
}
$retryCount = $this->getRetryCount($envelope) + 1;
if (null !== $this->logger) {
$this->logger->error('Retrying {class} - retry #{retryCount}.', $context + ['retryCount' => $retryCount, 'error' => $throwable]);
}
// add the delay and retry stamp info + remove ReceivedStamp
$retryEnvelope = $envelope->with(new DelayStamp($retryStrategy->getWaitingTime($envelope)))
->with(new RedeliveryStamp($retryCount, $this->getSenderAlias($envelope)))
->withoutAll(ReceivedStamp::class);
// re-send the message
$this->bus->dispatch($retryEnvelope);
// acknowledge the previous message has received
$receiver->ack($envelope);
} else {
if (null !== $this->logger) {
$this->logger->critical('Rejecting {class} (removing from transport).', $context + ['error' => $throwable]);
}
$receiver->reject($envelope);
}
return;
}
$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $receiverName));
if (null !== $this->logger) {
$this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
}
$receiver->ack($envelope);
}
public function stop(): void
{
$this->shouldStop = true;
}
private function dispatchEvent($event)
@ -149,17 +181,17 @@ class Worker
$this->eventDispatcher->dispatch($event);
}
private function shouldRetry(\Throwable $e, Envelope $envelope): bool
private function shouldRetry(\Throwable $e, Envelope $envelope, ?RetryStrategyInterface $retryStrategy): bool
{
if ($e instanceof UnrecoverableMessageHandlingException) {
return false;
}
if (null === $this->retryStrategy) {
if (null === $retryStrategy) {
return false;
}
return $this->retryStrategy->isRetryable($envelope);
return $retryStrategy->isRetryable($envelope);
}
private function getRetryCount(Envelope $envelope): int

View File

@ -0,0 +1,61 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Worker;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\WorkerInterface;
/**
* @author Simon Delicata <simon.delicata@free.fr>
*
* @experimental in 4.3
*/
class StopWhenMemoryUsageIsExceededWorker implements WorkerInterface
{
private $decoratedWorker;
private $memoryLimit;
private $logger;
private $memoryResolver;
public function __construct(WorkerInterface $decoratedWorker, int $memoryLimit, LoggerInterface $logger = null, callable $memoryResolver = null)
{
$this->decoratedWorker = $decoratedWorker;
$this->memoryLimit = $memoryLimit;
$this->logger = $logger;
$this->memoryResolver = $memoryResolver ?: function () {
return \memory_get_usage();
};
}
public function run(array $options = [], callable $onHandledCallback = null): void
{
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback) {
if (null !== $onHandledCallback) {
$onHandledCallback($envelope);
}
$memoryResolver = $this->memoryResolver;
if ($memoryResolver() > $this->memoryLimit) {
$this->stop();
if (null !== $this->logger) {
$this->logger->info('Worker stopped due to memory limit of {limit} exceeded', ['limit' => $this->memoryLimit]);
}
}
});
}
public function stop(): void
{
$this->decoratedWorker->stop();
}
}

View File

@ -0,0 +1,58 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Worker;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\WorkerInterface;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
*
* @experimental in 4.3
*/
class StopWhenMessageCountIsExceededWorker implements WorkerInterface
{
private $decoratedWorker;
private $maximumNumberOfMessages;
private $logger;
public function __construct(WorkerInterface $decoratedWorker, int $maximumNumberOfMessages, LoggerInterface $logger = null)
{
$this->decoratedWorker = $decoratedWorker;
$this->maximumNumberOfMessages = $maximumNumberOfMessages;
$this->logger = $logger;
}
public function run(array $options = [], callable $onHandledCallback = null): void
{
$receivedMessages = 0;
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, &$receivedMessages) {
if (null !== $onHandledCallback) {
$onHandledCallback($envelope);
}
if (null !== $envelope && ++$receivedMessages >= $this->maximumNumberOfMessages) {
$this->stop();
if (null !== $this->logger) {
$this->logger->info('Worker stopped due to maximum count of {count} exceeded', ['count' => $this->maximumNumberOfMessages]);
}
}
});
}
public function stop(): void
{
$this->decoratedWorker->stop();
}
}

View File

@ -0,0 +1,59 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Worker;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\WorkerInterface;
/**
* @author Simon Delicata <simon.delicata@free.fr>
*
* @experimental in 4.3
*/
class StopWhenTimeLimitIsReachedWorker implements WorkerInterface
{
private $decoratedWorker;
private $timeLimitInSeconds;
private $logger;
public function __construct(WorkerInterface $decoratedWorker, int $timeLimitInSeconds, LoggerInterface $logger = null)
{
$this->decoratedWorker = $decoratedWorker;
$this->timeLimitInSeconds = $timeLimitInSeconds;
$this->logger = $logger;
}
public function run(array $options = [], callable $onHandledCallback = null): void
{
$startTime = microtime(true);
$endTime = $startTime + $this->timeLimitInSeconds;
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, $endTime) {
if (null !== $onHandledCallback) {
$onHandledCallback($envelope);
}
if ($endTime < microtime(true)) {
$this->stop();
if (null !== $this->logger) {
$this->logger->info('Worker stopped due to time limit of {timeLimit}s reached', ['timeLimit' => $this->timeLimitInSeconds]);
}
}
});
}
public function stop(): void
{
$this->decoratedWorker->stop();
}
}

View File

@ -0,0 +1,37 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger;
/**
* Interface for Workers that handle messages from transports.
*
* @experimental in 4.3
*
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
interface WorkerInterface
{
/**
* Receive the messages and dispatch them to the bus.
*
* The $onHandledCallback will be passed the Envelope that was just
* handled or null if nothing was handled.
*
* @param mixed[] $options options used to control worker behavior
*/
public function run(array $options = [], callable $onHandledCallback = null): void;
/**
* Stop receiving messages.
*/
public function stop(): void;
}