[Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports
This commit is contained in:
parent
2389d7c686
commit
e800bd5bde
@ -11,8 +11,9 @@ CHANGELOG
|
|||||||
to the `Envelope` then find the correct bus when receiving from
|
to the `Envelope` then find the correct bus when receiving from
|
||||||
the transport. See `ConsumeMessagesCommand`.
|
the transport. See `ConsumeMessagesCommand`.
|
||||||
* The optional `$busNames` constructor argument of the class `ConsumeMessagesCommand` was removed.
|
* The optional `$busNames` constructor argument of the class `ConsumeMessagesCommand` was removed.
|
||||||
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
|
* [BC BREAK] 3 new methods were added to `ReceiverInterface`:
|
||||||
`ack()` and `reject()`.
|
`ack()`, `reject()` and `get()`. The methods `receive()`
|
||||||
|
and `stop()` were removed.
|
||||||
* [BC BREAK] Error handling was moved from the receivers into
|
* [BC BREAK] Error handling was moved from the receivers into
|
||||||
`Worker`. Implementations of `ReceiverInterface::handle()`
|
`Worker`. Implementations of `ReceiverInterface::handle()`
|
||||||
should now allow all exceptions to be thrown, except for transport
|
should now allow all exceptions to be thrown, except for transport
|
||||||
@ -24,7 +25,9 @@ CHANGELOG
|
|||||||
* The default command name for `ConsumeMessagesCommand` was
|
* The default command name for `ConsumeMessagesCommand` was
|
||||||
changed from `messenger:consume-messages` to `messenger:consume`
|
changed from `messenger:consume-messages` to `messenger:consume`
|
||||||
* `ConsumeMessagesCommand` has two new optional constructor arguments
|
* `ConsumeMessagesCommand` has two new optional constructor arguments
|
||||||
* `Worker` has 4 new option constructor arguments.
|
* [BC BREAK] The first argument to Worker changed from a single
|
||||||
|
`ReceiverInterface` to an array of `ReceiverInterface`.
|
||||||
|
* `Worker` has 3 new optional constructor arguments.
|
||||||
* The `Worker` class now handles calling `pcntl_signal_dispatch()` the
|
* The `Worker` class now handles calling `pcntl_signal_dispatch()` the
|
||||||
receiver no longer needs to call this.
|
receiver no longer needs to call this.
|
||||||
* The `AmqpSender` will now retry messages using a dead-letter exchange
|
* The `AmqpSender` will now retry messages using a dead-letter exchange
|
||||||
|
@ -19,12 +19,13 @@ use Symfony\Component\Console\Input\InputArgument;
|
|||||||
use Symfony\Component\Console\Input\InputInterface;
|
use Symfony\Component\Console\Input\InputInterface;
|
||||||
use Symfony\Component\Console\Input\InputOption;
|
use Symfony\Component\Console\Input\InputOption;
|
||||||
use Symfony\Component\Console\Output\OutputInterface;
|
use Symfony\Component\Console\Output\OutputInterface;
|
||||||
|
use Symfony\Component\Console\Question\ChoiceQuestion;
|
||||||
use Symfony\Component\Console\Style\SymfonyStyle;
|
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||||
use Symfony\Component\Messenger\RoutableMessageBus;
|
use Symfony\Component\Messenger\RoutableMessageBus;
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
|
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
|
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
|
|
||||||
use Symfony\Component\Messenger\Worker;
|
use Symfony\Component\Messenger\Worker;
|
||||||
|
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
|
||||||
|
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
|
||||||
|
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
|
||||||
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
|
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -70,10 +71,11 @@ class ConsumeMessagesCommand extends Command
|
|||||||
|
|
||||||
$this
|
$this
|
||||||
->setDefinition([
|
->setDefinition([
|
||||||
new InputArgument('receiver', $defaultReceiverName ? InputArgument::OPTIONAL : InputArgument::REQUIRED, 'Name of the receiver', $defaultReceiverName),
|
new InputArgument('receivers', InputArgument::IS_ARRAY, 'Names of the receivers/transports to consume in order of priority', $defaultReceiverName ? [$defaultReceiverName] : []),
|
||||||
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
|
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
|
||||||
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
|
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
|
||||||
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
|
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
|
||||||
|
new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
|
||||||
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically.'),
|
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically.'),
|
||||||
])
|
])
|
||||||
->setDescription('Consumes messages')
|
->setDescription('Consumes messages')
|
||||||
@ -82,6 +84,10 @@ The <info>%command.name%</info> command consumes messages and dispatches them to
|
|||||||
|
|
||||||
<info>php %command.full_name% <receiver-name></info>
|
<info>php %command.full_name% <receiver-name></info>
|
||||||
|
|
||||||
|
To receive from multiple transports, pass each name:
|
||||||
|
|
||||||
|
<info>php %command.full_name% receiver1 receiver2</info>
|
||||||
|
|
||||||
Use the --limit option to limit the number of messages received:
|
Use the --limit option to limit the number of messages received:
|
||||||
|
|
||||||
<info>php %command.full_name% <receiver-name> --limit=10</info>
|
<info>php %command.full_name% <receiver-name> --limit=10</info>
|
||||||
@ -111,16 +117,22 @@ EOF
|
|||||||
{
|
{
|
||||||
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
|
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
|
||||||
|
|
||||||
if ($this->receiverNames && !$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
|
if ($this->receiverNames && 0 === \count($input->getArgument('receivers'))) {
|
||||||
if (null === $receiverName) {
|
$io->block('Which transports/receivers do you want to consume?', null, 'fg=white;bg=blue', ' ', true);
|
||||||
$io->block('Missing receiver argument.', null, 'error', ' ', true);
|
|
||||||
$input->setArgument('receiver', $io->choice('Select one of the available receivers', $this->receiverNames));
|
$io->writeln('Choose which receivers you want to consume messages from in order of priority.');
|
||||||
} elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) {
|
if (\count($this->receiverNames) > 1) {
|
||||||
$io->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true);
|
$io->writeln(sprintf('Hint: to consume from multiple, use a list of their names, e.g. <comment>%s</comment>', implode(', ', $this->receiverNames)));
|
||||||
if ($io->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) {
|
|
||||||
$input->setArgument('receiver', $alternatives[0]);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$question = new ChoiceQuestion('Select receivers to consume:', $this->receiverNames, 0);
|
||||||
|
$question->setMultiselect(true);
|
||||||
|
|
||||||
|
$input->setArgument('receivers', $io->askQuestion($question));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (0 === \count($input->getArgument('receivers'))) {
|
||||||
|
throw new RuntimeException('Please pass at least one receiver.');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -135,16 +147,25 @@ EOF
|
|||||||
$output->writeln(sprintf('<comment>%s</comment>', $message));
|
$output->writeln(sprintf('<comment>%s</comment>', $message));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
|
$receivers = [];
|
||||||
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
|
$retryStrategies = [];
|
||||||
|
foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) {
|
||||||
|
if (!$this->receiverLocator->has($receiverName)) {
|
||||||
|
$message = sprintf('The receiver "%s" does not exist.', $receiverName);
|
||||||
|
if ($this->receiverNames) {
|
||||||
|
$message .= sprintf(' Valid receivers are: %s.', implode(', ', $this->receiverNames));
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new RuntimeException($message);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
|
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
|
||||||
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
|
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
|
||||||
}
|
}
|
||||||
|
|
||||||
$receiver = $this->receiverLocator->get($receiverName);
|
$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
|
||||||
$retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
|
$retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
|
||||||
|
}
|
||||||
|
|
||||||
if (null !== $input->getOption('bus')) {
|
if (null !== $input->getOption('bus')) {
|
||||||
$bus = $this->busLocator->get($input->getOption('bus'));
|
$bus = $this->busLocator->get($input->getOption('bus'));
|
||||||
@ -152,24 +173,25 @@ EOF
|
|||||||
$bus = new RoutableMessageBus($this->busLocator);
|
$bus = new RoutableMessageBus($this->busLocator);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger);
|
||||||
$stopsWhen = [];
|
$stopsWhen = [];
|
||||||
if ($limit = $input->getOption('limit')) {
|
if ($limit = $input->getOption('limit')) {
|
||||||
$stopsWhen[] = "processed {$limit} messages";
|
$stopsWhen[] = "processed {$limit} messages";
|
||||||
$receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger);
|
$worker = new StopWhenMessageCountIsExceededWorker($worker, $limit, $this->logger);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($memoryLimit = $input->getOption('memory-limit')) {
|
if ($memoryLimit = $input->getOption('memory-limit')) {
|
||||||
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
|
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
|
||||||
$receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
|
$worker = new StopWhenMemoryUsageIsExceededWorker($worker, $this->convertToBytes($memoryLimit), $this->logger);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($timeLimit = $input->getOption('time-limit')) {
|
if ($timeLimit = $input->getOption('time-limit')) {
|
||||||
$stopsWhen[] = "been running for {$timeLimit}s";
|
$stopsWhen[] = "been running for {$timeLimit}s";
|
||||||
$receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger);
|
$worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger);
|
||||||
}
|
}
|
||||||
|
|
||||||
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
|
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
|
||||||
$io->success(sprintf('Consuming messages from transport "%s".', $receiverName));
|
$io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames)));
|
||||||
|
|
||||||
if ($stopsWhen) {
|
if ($stopsWhen) {
|
||||||
$last = array_pop($stopsWhen);
|
$last = array_pop($stopsWhen);
|
||||||
@ -183,8 +205,9 @@ EOF
|
|||||||
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
|
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
|
||||||
}
|
}
|
||||||
|
|
||||||
$worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
|
$worker->run([
|
||||||
$worker->run();
|
'sleep' => $input->getOption('sleep') * 1000000,
|
||||||
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
private function convertToBytes(string $memoryLimit): int
|
private function convertToBytes(string $memoryLimit): int
|
||||||
|
@ -20,16 +20,8 @@ class ConsumeMessagesCommandTest extends TestCase
|
|||||||
public function testConfigurationWithDefaultReceiver()
|
public function testConfigurationWithDefaultReceiver()
|
||||||
{
|
{
|
||||||
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp']);
|
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp']);
|
||||||
$inputArgument = $command->getDefinition()->getArgument('receiver');
|
$inputArgument = $command->getDefinition()->getArgument('receivers');
|
||||||
$this->assertFalse($inputArgument->isRequired());
|
$this->assertFalse($inputArgument->isRequired());
|
||||||
$this->assertSame('amqp', $inputArgument->getDefault());
|
$this->assertSame(['amqp'], $inputArgument->getDefault());
|
||||||
}
|
|
||||||
|
|
||||||
public function testConfigurationWithoutDefaultReceiver()
|
|
||||||
{
|
|
||||||
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp', 'dummy']);
|
|
||||||
$inputArgument = $command->getDefinition()->getArgument('receiver');
|
|
||||||
$this->assertTrue($inputArgument->isRequired());
|
|
||||||
$this->assertNull($inputArgument->getDefault());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -612,11 +612,9 @@ class DummyHandler
|
|||||||
|
|
||||||
class DummyReceiver implements ReceiverInterface
|
class DummyReceiver implements ReceiverInterface
|
||||||
{
|
{
|
||||||
public function receive(callable $handler): void
|
public function get(): iterable
|
||||||
{
|
{
|
||||||
for ($i = 0; $i < 3; ++$i) {
|
yield new Envelope(new DummyMessage('Dummy'));
|
||||||
$handler(new Envelope(new DummyMessage("Dummy $i")));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public function stop(): void
|
public function stop(): void
|
||||||
|
@ -1,48 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
namespace Symfony\Component\Messenger\Tests\Fixtures;
|
|
||||||
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
|
||||||
|
|
||||||
class CallbackReceiver implements ReceiverInterface
|
|
||||||
{
|
|
||||||
private $callable;
|
|
||||||
private $acknowledgeCount = 0;
|
|
||||||
private $rejectCount = 0;
|
|
||||||
|
|
||||||
public function __construct(callable $callable)
|
|
||||||
{
|
|
||||||
$this->callable = $callable;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function receive(callable $handler): void
|
|
||||||
{
|
|
||||||
$callable = $this->callable;
|
|
||||||
$callable($handler);
|
|
||||||
}
|
|
||||||
|
|
||||||
public function stop(): void
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
public function ack(Envelope $envelope): void
|
|
||||||
{
|
|
||||||
++$this->acknowledgeCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function reject(Envelope $envelope): void
|
|
||||||
{
|
|
||||||
++$this->rejectCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function getAcknowledgeCount(): int
|
|
||||||
{
|
|
||||||
return $this->acknowledgeCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function getRejectCount(): int
|
|
||||||
{
|
|
||||||
return $this->rejectCount;
|
|
||||||
}
|
|
||||||
}
|
|
@ -0,0 +1,46 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Tests\Fixtures;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\WorkerInterface;
|
||||||
|
|
||||||
|
class DummyWorker implements WorkerInterface
|
||||||
|
{
|
||||||
|
private $isStopped = false;
|
||||||
|
private $envelopesToReceive;
|
||||||
|
private $envelopesHandled = 0;
|
||||||
|
|
||||||
|
public function __construct(array $envelopesToReceive)
|
||||||
|
{
|
||||||
|
$this->envelopesToReceive = $envelopesToReceive;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function run(array $options = [], callable $onHandledCallback = null): void
|
||||||
|
{
|
||||||
|
foreach ($this->envelopesToReceive as $envelope) {
|
||||||
|
if (true === $this->isStopped) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($onHandledCallback) {
|
||||||
|
$onHandledCallback($envelope);
|
||||||
|
++$this->envelopesHandled;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public function stop(): void
|
||||||
|
{
|
||||||
|
$this->isStopped = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function isStopped(): bool
|
||||||
|
{
|
||||||
|
return $this->isStopped;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function countEnvelopesHandled()
|
||||||
|
{
|
||||||
|
return $this->envelopesHandled;
|
||||||
|
}
|
||||||
|
}
|
@ -57,16 +57,20 @@ class AmqpExtIntegrationTest extends TestCase
|
|||||||
$sender->send($first = new Envelope(new DummyMessage('First')));
|
$sender->send($first = new Envelope(new DummyMessage('First')));
|
||||||
$sender->send($second = new Envelope(new DummyMessage('Second')));
|
$sender->send($second = new Envelope(new DummyMessage('Second')));
|
||||||
|
|
||||||
$receivedMessages = 0;
|
$envelopes = iterator_to_array($receiver->get());
|
||||||
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
|
$this->assertCount(1, $envelopes);
|
||||||
$expectedEnvelope = 0 === $receivedMessages ? $first : $second;
|
/** @var Envelope $envelope */
|
||||||
$this->assertEquals($expectedEnvelope->getMessage(), $envelope->getMessage());
|
$envelope = $envelopes[0];
|
||||||
|
$this->assertEquals($first->getMessage(), $envelope->getMessage());
|
||||||
$this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));
|
$this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));
|
||||||
|
|
||||||
if (2 === ++$receivedMessages) {
|
$envelopes = iterator_to_array($receiver->get());
|
||||||
$receiver->stop();
|
$this->assertCount(1, $envelopes);
|
||||||
}
|
/** @var Envelope $envelope */
|
||||||
});
|
$envelope = $envelopes[0];
|
||||||
|
$this->assertEquals($second->getMessage(), $envelope->getMessage());
|
||||||
|
|
||||||
|
$this->assertEmpty(iterator_to_array($receiver->get()));
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testRetryAndDelay()
|
public function testRetryAndDelay()
|
||||||
@ -82,34 +86,26 @@ class AmqpExtIntegrationTest extends TestCase
|
|||||||
|
|
||||||
$sender->send($first = new Envelope(new DummyMessage('First')));
|
$sender->send($first = new Envelope(new DummyMessage('First')));
|
||||||
|
|
||||||
$receivedMessages = 0;
|
$envelopes = iterator_to_array($receiver->get());
|
||||||
$startTime = time();
|
/** @var Envelope $envelope */
|
||||||
$receiver->receive(function (?Envelope $envelope) use ($receiver, $sender, &$receivedMessages, $startTime) {
|
$envelope = $envelopes[0];
|
||||||
if (null === $envelope) {
|
$newEnvelope = $envelope
|
||||||
// if we have been processing for 4 seconds + have received 2 messages
|
|
||||||
// then it's safe to say no other messages will be received
|
|
||||||
if (time() > $startTime + 4 && 2 === $receivedMessages) {
|
|
||||||
$receiver->stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
++$receivedMessages;
|
|
||||||
|
|
||||||
// retry the first time
|
|
||||||
if (1 === $receivedMessages) {
|
|
||||||
// imitate what Worker does
|
|
||||||
$envelope = $envelope
|
|
||||||
->with(new DelayStamp(2000))
|
->with(new DelayStamp(2000))
|
||||||
->with(new RedeliveryStamp(1, 'not_important'));
|
->with(new RedeliveryStamp(1, 'not_important'));
|
||||||
$sender->send($envelope);
|
$sender->send($newEnvelope);
|
||||||
$receiver->ack($envelope);
|
$receiver->ack($envelope);
|
||||||
|
|
||||||
return;
|
$envelopes = [];
|
||||||
|
$startTime = time();
|
||||||
|
// wait for next message, but only for max 3 seconds
|
||||||
|
while (0 === \count($envelopes) && $startTime + 3 > time()) {
|
||||||
|
$envelopes = iterator_to_array($receiver->get());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (2 === $receivedMessages) {
|
$this->assertCount(1, $envelopes);
|
||||||
|
/** @var Envelope $envelope */
|
||||||
|
$envelope = $envelopes[0];
|
||||||
|
|
||||||
// should have a 2 second delay
|
// should have a 2 second delay
|
||||||
$this->assertGreaterThanOrEqual($startTime + 2, time());
|
$this->assertGreaterThanOrEqual($startTime + 2, time());
|
||||||
// but only a 2 second delay
|
// but only a 2 second delay
|
||||||
@ -122,10 +118,6 @@ class AmqpExtIntegrationTest extends TestCase
|
|||||||
$this->assertSame(1, $retryStamp->getRetryCount());
|
$this->assertSame(1, $retryStamp->getRetryCount());
|
||||||
|
|
||||||
$receiver->ack($envelope);
|
$receiver->ack($envelope);
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testItReceivesSignals()
|
public function testItReceivesSignals()
|
||||||
@ -175,29 +167,6 @@ TXT
|
|||||||
, $process->getOutput());
|
, $process->getOutput());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @runInSeparateProcess
|
|
||||||
*/
|
|
||||||
public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
|
|
||||||
{
|
|
||||||
$serializer = $this->createSerializer();
|
|
||||||
|
|
||||||
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), ['read_timeout' => '1']);
|
|
||||||
$connection->setup();
|
|
||||||
$connection->queue()->purge();
|
|
||||||
|
|
||||||
$receiver = new AmqpReceiver($connection, $serializer);
|
|
||||||
|
|
||||||
$receivedMessages = 0;
|
|
||||||
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
|
|
||||||
$this->assertNull($envelope);
|
|
||||||
|
|
||||||
if (2 === ++$receivedMessages) {
|
|
||||||
$receiver->stop();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
|
private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
|
||||||
{
|
{
|
||||||
$timedOutTime = time() + $timeoutInSeconds;
|
$timedOutTime = time() + $timeoutInSeconds;
|
||||||
|
@ -14,9 +14,11 @@ namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
|||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||||
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||||
use Symfony\Component\Serializer as SerializerComponent;
|
use Symfony\Component\Serializer as SerializerComponent;
|
||||||
use Symfony\Component\Serializer\Encoder\JsonEncoder;
|
use Symfony\Component\Serializer\Encoder\JsonEncoder;
|
||||||
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
|
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
|
||||||
@ -26,7 +28,7 @@ use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
|
|||||||
*/
|
*/
|
||||||
class AmqpReceiverTest extends TestCase
|
class AmqpReceiverTest extends TestCase
|
||||||
{
|
{
|
||||||
public function testItSendTheDecodedMessageToTheHandler()
|
public function testItReturnsTheDecodedMessageToTheHandler()
|
||||||
{
|
{
|
||||||
$serializer = new Serializer(
|
$serializer = new Serializer(
|
||||||
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
|
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
|
||||||
@ -37,10 +39,9 @@ class AmqpReceiverTest extends TestCase
|
|||||||
$connection->method('get')->willReturn($amqpEnvelope);
|
$connection->method('get')->willReturn($amqpEnvelope);
|
||||||
|
|
||||||
$receiver = new AmqpReceiver($connection, $serializer);
|
$receiver = new AmqpReceiver($connection, $serializer);
|
||||||
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
|
$actualEnvelopes = iterator_to_array($receiver->get());
|
||||||
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
|
$this->assertCount(1, $actualEnvelopes);
|
||||||
$receiver->stop();
|
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -48,20 +49,14 @@ class AmqpReceiverTest extends TestCase
|
|||||||
*/
|
*/
|
||||||
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
|
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
|
||||||
{
|
{
|
||||||
$serializer = new Serializer(
|
$serializer = $this->createMock(SerializerInterface::class);
|
||||||
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
|
|
||||||
);
|
|
||||||
|
|
||||||
$amqpEnvelope = $this->createAMQPEnvelope();
|
$amqpEnvelope = $this->createAMQPEnvelope();
|
||||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||||
$connection->method('get')->willReturn($amqpEnvelope);
|
$connection->method('get')->willReturn($amqpEnvelope);
|
||||||
$connection->method('ack')->with($amqpEnvelope)->willThrowException(new \AMQPException());
|
$connection->method('ack')->with($amqpEnvelope)->willThrowException(new \AMQPException());
|
||||||
|
|
||||||
$receiver = new AmqpReceiver($connection, $serializer);
|
$receiver = new AmqpReceiver($connection, $serializer);
|
||||||
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
|
$receiver->ack(new Envelope(new \stdClass(), new AmqpReceivedStamp($amqpEnvelope)));
|
||||||
$receiver->ack($envelope);
|
|
||||||
$receiver->stop();
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -69,20 +64,14 @@ class AmqpReceiverTest extends TestCase
|
|||||||
*/
|
*/
|
||||||
public function testItThrowsATransportExceptionIfItCannotRejectMessage()
|
public function testItThrowsATransportExceptionIfItCannotRejectMessage()
|
||||||
{
|
{
|
||||||
$serializer = new Serializer(
|
$serializer = $this->createMock(SerializerInterface::class);
|
||||||
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
|
|
||||||
);
|
|
||||||
|
|
||||||
$amqpEnvelope = $this->createAMQPEnvelope();
|
$amqpEnvelope = $this->createAMQPEnvelope();
|
||||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||||
$connection->method('get')->willReturn($amqpEnvelope);
|
$connection->method('get')->willReturn($amqpEnvelope);
|
||||||
$connection->method('nack')->with($amqpEnvelope, AMQP_NOPARAM)->willThrowException(new \AMQPException());
|
$connection->method('nack')->with($amqpEnvelope, AMQP_NOPARAM)->willThrowException(new \AMQPException());
|
||||||
|
|
||||||
$receiver = new AmqpReceiver($connection, $serializer);
|
$receiver = new AmqpReceiver($connection, $serializer);
|
||||||
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
|
$receiver->reject(new Envelope(new \stdClass(), new AmqpReceivedStamp($amqpEnvelope)));
|
||||||
$receiver->reject($envelope);
|
|
||||||
$receiver->stop();
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private function createAMQPEnvelope()
|
private function createAMQPEnvelope()
|
||||||
|
@ -47,11 +47,8 @@ class AmqpTransportTest extends TestCase
|
|||||||
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
|
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
|
||||||
$connection->method('get')->willReturn($amqpEnvelope);
|
$connection->method('get')->willReturn($amqpEnvelope);
|
||||||
|
|
||||||
$transport->receive(function (Envelope $envelope) use ($transport, $decodedMessage) {
|
$envelopes = iterator_to_array($transport->get());
|
||||||
$this->assertSame($decodedMessage, $envelope->getMessage());
|
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
|
||||||
|
|
||||||
$transport->stop();
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private function getTransport(SerializerInterface $serializer = null, Connection $connection = null)
|
private function getTransport(SerializerInterface $serializer = null, Connection $connection = null)
|
||||||
|
@ -32,7 +32,7 @@ $connection = Connection::fromDsn(getenv('DSN'));
|
|||||||
$receiver = new AmqpReceiver($connection, $serializer);
|
$receiver = new AmqpReceiver($connection, $serializer);
|
||||||
$retryStrategy = new MultiplierRetryStrategy(3, 0);
|
$retryStrategy = new MultiplierRetryStrategy(3, 0);
|
||||||
|
|
||||||
$worker = new Worker($receiver, new class() implements MessageBusInterface {
|
$worker = new Worker(['the_receiver' => $receiver], new class() implements MessageBusInterface {
|
||||||
public function dispatch($envelope): Envelope
|
public function dispatch($envelope): Envelope
|
||||||
{
|
{
|
||||||
echo 'Get envelope with message: '.\get_class($envelope->getMessage())."\n";
|
echo 'Get envelope with message: '.\get_class($envelope->getMessage())."\n";
|
||||||
@ -43,7 +43,7 @@ $worker = new Worker($receiver, new class() implements MessageBusInterface {
|
|||||||
|
|
||||||
return $envelope;
|
return $envelope;
|
||||||
}
|
}
|
||||||
}, 'the_receiver', $retryStrategy);
|
});
|
||||||
|
|
||||||
echo "Receiving messages...\n";
|
echo "Receiving messages...\n";
|
||||||
$worker->run();
|
$worker->run();
|
||||||
|
@ -1,84 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This file is part of the Symfony package.
|
|
||||||
*
|
|
||||||
* (c) Fabien Potencier <fabien@symfony.com>
|
|
||||||
*
|
|
||||||
* For the full copyright and license information, please view the LICENSE
|
|
||||||
* file that was distributed with this source code.
|
|
||||||
*/
|
|
||||||
|
|
||||||
namespace Symfony\Component\Messenger\Tests\Transport\Receiver;
|
|
||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
|
||||||
use Psr\Log\LoggerInterface;
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
|
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
|
|
||||||
|
|
||||||
class StopWhenMemoryUsageIsExceededReceiverTest extends TestCase
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* @dataProvider memoryProvider
|
|
||||||
*/
|
|
||||||
public function testReceiverStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop)
|
|
||||||
{
|
|
||||||
$callable = function ($handler) {
|
|
||||||
$handler(new Envelope(new DummyMessage('API')));
|
|
||||||
};
|
|
||||||
|
|
||||||
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
|
|
||||||
->setConstructorArgs([$callable])
|
|
||||||
->enableProxyingToOriginalMethods()
|
|
||||||
->getMock();
|
|
||||||
|
|
||||||
$decoratedReceiver->expects($this->once())->method('receive');
|
|
||||||
if (true === $shouldStop) {
|
|
||||||
$decoratedReceiver->expects($this->once())->method('stop');
|
|
||||||
} else {
|
|
||||||
$decoratedReceiver->expects($this->never())->method('stop');
|
|
||||||
}
|
|
||||||
|
|
||||||
$memoryResolver = function () use ($memoryUsage) {
|
|
||||||
return $memoryUsage;
|
|
||||||
};
|
|
||||||
|
|
||||||
$memoryLimitReceiver = new StopWhenMemoryUsageIsExceededReceiver($decoratedReceiver, $memoryLimit, null, $memoryResolver);
|
|
||||||
$memoryLimitReceiver->receive(function () {});
|
|
||||||
}
|
|
||||||
|
|
||||||
public function memoryProvider()
|
|
||||||
{
|
|
||||||
yield [2048, 1024, true];
|
|
||||||
yield [1024, 1024, false];
|
|
||||||
yield [1024, 2048, false];
|
|
||||||
}
|
|
||||||
|
|
||||||
public function testReceiverLogsMemoryExceededWhenLoggerIsGiven()
|
|
||||||
{
|
|
||||||
$callable = function ($handler) {
|
|
||||||
$handler(new Envelope(new DummyMessage('API')));
|
|
||||||
};
|
|
||||||
|
|
||||||
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
|
|
||||||
->setConstructorArgs([$callable])
|
|
||||||
->enableProxyingToOriginalMethods()
|
|
||||||
->getMock();
|
|
||||||
|
|
||||||
$decoratedReceiver->expects($this->once())->method('receive');
|
|
||||||
$decoratedReceiver->expects($this->once())->method('stop');
|
|
||||||
|
|
||||||
$logger = $this->createMock(LoggerInterface::class);
|
|
||||||
$logger->expects($this->once())->method('info')
|
|
||||||
->with('Receiver stopped due to memory limit of {limit} exceeded', ['limit' => 64 * 1024 * 1024]);
|
|
||||||
|
|
||||||
$memoryResolver = function () {
|
|
||||||
return 70 * 1024 * 1024;
|
|
||||||
};
|
|
||||||
|
|
||||||
$memoryLimitReceiver = new StopWhenMemoryUsageIsExceededReceiver($decoratedReceiver, 64 * 1024 * 1024, $logger, $memoryResolver);
|
|
||||||
$memoryLimitReceiver->receive(function () {});
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,103 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This file is part of the Symfony package.
|
|
||||||
*
|
|
||||||
* (c) Fabien Potencier <fabien@symfony.com>
|
|
||||||
*
|
|
||||||
* For the full copyright and license information, please view the LICENSE
|
|
||||||
* file that was distributed with this source code.
|
|
||||||
*/
|
|
||||||
|
|
||||||
namespace Symfony\Component\Messenger\Tests\Transport\Receiver;
|
|
||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
|
||||||
use Psr\Log\LoggerInterface;
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
|
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
|
|
||||||
|
|
||||||
class StopWhenMessageCountIsExceededReceiverTest extends TestCase
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* @dataProvider countProvider
|
|
||||||
*/
|
|
||||||
public function testReceiverStopsWhenMaximumCountExceeded($max, $shouldStop)
|
|
||||||
{
|
|
||||||
$callable = function ($handler) {
|
|
||||||
$handler(new Envelope(new DummyMessage('First message')));
|
|
||||||
$handler(new Envelope(new DummyMessage('Second message')));
|
|
||||||
$handler(new Envelope(new DummyMessage('Third message')));
|
|
||||||
};
|
|
||||||
|
|
||||||
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
|
|
||||||
->setConstructorArgs([$callable])
|
|
||||||
->enableProxyingToOriginalMethods()
|
|
||||||
->getMock();
|
|
||||||
|
|
||||||
$decoratedReceiver->expects($this->once())->method('receive');
|
|
||||||
if (true === $shouldStop) {
|
|
||||||
$decoratedReceiver->expects($this->any())->method('stop');
|
|
||||||
} else {
|
|
||||||
$decoratedReceiver->expects($this->never())->method('stop');
|
|
||||||
}
|
|
||||||
|
|
||||||
$maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, $max);
|
|
||||||
$maximumCountReceiver->receive(function () {});
|
|
||||||
}
|
|
||||||
|
|
||||||
public function countProvider()
|
|
||||||
{
|
|
||||||
yield [1, true];
|
|
||||||
yield [2, true];
|
|
||||||
yield [3, true];
|
|
||||||
yield [4, false];
|
|
||||||
}
|
|
||||||
|
|
||||||
public function testReceiverDoesntIncreaseItsCounterWhenReceiveNullMessage()
|
|
||||||
{
|
|
||||||
$callable = function ($handler) {
|
|
||||||
$handler(null);
|
|
||||||
$handler(null);
|
|
||||||
$handler(null);
|
|
||||||
$handler(null);
|
|
||||||
};
|
|
||||||
|
|
||||||
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
|
|
||||||
->setConstructorArgs([$callable])
|
|
||||||
->enableProxyingToOriginalMethods()
|
|
||||||
->getMock();
|
|
||||||
|
|
||||||
$decoratedReceiver->expects($this->once())->method('receive');
|
|
||||||
$decoratedReceiver->expects($this->never())->method('stop');
|
|
||||||
|
|
||||||
$maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, 1);
|
|
||||||
$maximumCountReceiver->receive(function () {});
|
|
||||||
}
|
|
||||||
|
|
||||||
public function testReceiverLogsMaximumCountExceededWhenLoggerIsGiven()
|
|
||||||
{
|
|
||||||
$callable = function ($handler) {
|
|
||||||
$handler(new Envelope(new DummyMessage('First message')));
|
|
||||||
};
|
|
||||||
|
|
||||||
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
|
|
||||||
->setConstructorArgs([$callable])
|
|
||||||
->enableProxyingToOriginalMethods()
|
|
||||||
->getMock();
|
|
||||||
|
|
||||||
$decoratedReceiver->expects($this->once())->method('receive');
|
|
||||||
$decoratedReceiver->expects($this->once())->method('stop');
|
|
||||||
|
|
||||||
$logger = $this->createMock(LoggerInterface::class);
|
|
||||||
$logger->expects($this->once())->method('info')
|
|
||||||
->with(
|
|
||||||
$this->equalTo('Receiver stopped due to maximum count of {count} exceeded'),
|
|
||||||
$this->equalTo(['count' => 1])
|
|
||||||
);
|
|
||||||
|
|
||||||
$maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, 1, $logger);
|
|
||||||
$maximumCountReceiver->receive(function () {});
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,49 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This file is part of the Symfony package.
|
|
||||||
*
|
|
||||||
* (c) Fabien Potencier <fabien@symfony.com>
|
|
||||||
*
|
|
||||||
* For the full copyright and license information, please view the LICENSE
|
|
||||||
* file that was distributed with this source code.
|
|
||||||
*/
|
|
||||||
|
|
||||||
namespace Symfony\Component\Messenger\Tests\Transport\Receiver;
|
|
||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
|
||||||
use Psr\Log\LoggerInterface;
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
|
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
|
|
||||||
|
|
||||||
class StopWhenTimeLimitIsReachedReceiverTest extends TestCase
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* @group time-sensitive
|
|
||||||
*/
|
|
||||||
public function testReceiverStopsWhenTimeLimitIsReached()
|
|
||||||
{
|
|
||||||
$callable = function ($handler) {
|
|
||||||
$handler(new Envelope(new DummyMessage('API')));
|
|
||||||
};
|
|
||||||
|
|
||||||
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
|
|
||||||
->setConstructorArgs([$callable])
|
|
||||||
->enableProxyingToOriginalMethods()
|
|
||||||
->getMock();
|
|
||||||
|
|
||||||
$decoratedReceiver->expects($this->once())->method('receive');
|
|
||||||
$decoratedReceiver->expects($this->once())->method('stop');
|
|
||||||
|
|
||||||
$logger = $this->createMock(LoggerInterface::class);
|
|
||||||
$logger->expects($this->once())->method('info')
|
|
||||||
->with('Receiver stopped due to time limit of {timeLimit}s reached', ['timeLimit' => 1]);
|
|
||||||
|
|
||||||
$timeoutReceiver = new StopWhenTimeLimitIsReachedReceiver($decoratedReceiver, 1, $logger);
|
|
||||||
$timeoutReceiver->receive(function () {
|
|
||||||
sleep(2);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
@ -0,0 +1,71 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Tests\Worker;
|
||||||
|
|
||||||
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Psr\Log\LoggerInterface;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker;
|
||||||
|
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
|
||||||
|
|
||||||
|
class StopWhenMemoryUsageIsExceededWorkerTest extends TestCase
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* @dataProvider memoryProvider
|
||||||
|
*/
|
||||||
|
public function testWorkerStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop)
|
||||||
|
{
|
||||||
|
$handlerCalledTimes = 0;
|
||||||
|
$handledCallback = function () use (&$handlerCalledTimes) {
|
||||||
|
++$handlerCalledTimes;
|
||||||
|
};
|
||||||
|
$decoratedWorker = new DummyWorker([
|
||||||
|
new Envelope(new \stdClass()),
|
||||||
|
]);
|
||||||
|
|
||||||
|
$memoryResolver = function () use ($memoryUsage) {
|
||||||
|
return $memoryUsage;
|
||||||
|
};
|
||||||
|
|
||||||
|
$memoryLimitWorker = new StopWhenMemoryUsageIsExceededWorker($decoratedWorker, $memoryLimit, null, $memoryResolver);
|
||||||
|
$memoryLimitWorker->run([], $handledCallback);
|
||||||
|
|
||||||
|
// handler should be called exactly 1 time
|
||||||
|
$this->assertSame($handlerCalledTimes, 1);
|
||||||
|
$this->assertSame($shouldStop, $decoratedWorker->isStopped());
|
||||||
|
}
|
||||||
|
|
||||||
|
public function memoryProvider()
|
||||||
|
{
|
||||||
|
yield [2048, 1024, true];
|
||||||
|
yield [1024, 1024, false];
|
||||||
|
yield [1024, 2048, false];
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testWorkerLogsMemoryExceededWhenLoggerIsGiven()
|
||||||
|
{
|
||||||
|
$decoratedWorker = new DummyWorker([
|
||||||
|
new Envelope(new \stdClass()),
|
||||||
|
]);
|
||||||
|
|
||||||
|
$logger = $this->createMock(LoggerInterface::class);
|
||||||
|
$logger->expects($this->once())->method('info')
|
||||||
|
->with('Worker stopped due to memory limit of {limit} exceeded', ['limit' => 64 * 1024 * 1024]);
|
||||||
|
|
||||||
|
$memoryResolver = function () {
|
||||||
|
return 70 * 1024 * 1024;
|
||||||
|
};
|
||||||
|
|
||||||
|
$memoryLimitWorker = new StopWhenMemoryUsageIsExceededWorker($decoratedWorker, 64 * 1024 * 1024, $logger, $memoryResolver);
|
||||||
|
$memoryLimitWorker->run();
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,71 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Tests\Worker;
|
||||||
|
|
||||||
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Psr\Log\LoggerInterface;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker;
|
||||||
|
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
|
||||||
|
|
||||||
|
class StopWhenMessageCountIsExceededWorkerTest extends TestCase
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* @dataProvider countProvider
|
||||||
|
*/
|
||||||
|
public function testWorkerStopsWhenMaximumCountExceeded($max, $shouldStop)
|
||||||
|
{
|
||||||
|
$handlerCalledTimes = 0;
|
||||||
|
$handledCallback = function () use (&$handlerCalledTimes) {
|
||||||
|
++$handlerCalledTimes;
|
||||||
|
};
|
||||||
|
// receive 3 real messages
|
||||||
|
$decoratedWorker = new DummyWorker([
|
||||||
|
new Envelope(new DummyMessage('First message')),
|
||||||
|
null,
|
||||||
|
new Envelope(new DummyMessage('Second message')),
|
||||||
|
null,
|
||||||
|
new Envelope(new DummyMessage('Third message')),
|
||||||
|
]);
|
||||||
|
|
||||||
|
$maximumCountWorker = new StopWhenMessageCountIsExceededWorker($decoratedWorker, $max);
|
||||||
|
$maximumCountWorker->run([], $handledCallback);
|
||||||
|
|
||||||
|
$this->assertSame($shouldStop, $decoratedWorker->isStopped());
|
||||||
|
}
|
||||||
|
|
||||||
|
public function countProvider()
|
||||||
|
{
|
||||||
|
yield [1, true];
|
||||||
|
yield [2, true];
|
||||||
|
yield [3, true];
|
||||||
|
yield [4, false];
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testWorkerLogsMaximumCountExceededWhenLoggerIsGiven()
|
||||||
|
{
|
||||||
|
$decoratedWorker = new DummyWorker([
|
||||||
|
new Envelope(new \stdClass()),
|
||||||
|
]);
|
||||||
|
|
||||||
|
$logger = $this->createMock(LoggerInterface::class);
|
||||||
|
$logger->expects($this->once())->method('info')
|
||||||
|
->with(
|
||||||
|
$this->equalTo('Worker stopped due to maximum count of {count} exceeded'),
|
||||||
|
$this->equalTo(['count' => 1])
|
||||||
|
);
|
||||||
|
|
||||||
|
$maximumCountWorker = new StopWhenMessageCountIsExceededWorker($decoratedWorker, 1, $logger);
|
||||||
|
$maximumCountWorker->run();
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,44 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Tests\Worker;
|
||||||
|
|
||||||
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Psr\Log\LoggerInterface;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker;
|
||||||
|
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
|
||||||
|
|
||||||
|
class StopWhenTimeLimitIsReachedWorkerTest extends TestCase
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* @group time-sensitive
|
||||||
|
*/
|
||||||
|
public function testWorkerStopsWhenTimeLimitIsReached()
|
||||||
|
{
|
||||||
|
$decoratedWorker = new DummyWorker([
|
||||||
|
new Envelope(new \stdClass()),
|
||||||
|
new Envelope(new \stdClass()),
|
||||||
|
]);
|
||||||
|
|
||||||
|
$logger = $this->createMock(LoggerInterface::class);
|
||||||
|
$logger->expects($this->once())->method('info')
|
||||||
|
->with('Worker stopped due to time limit of {timeLimit}s reached', ['timeLimit' => 1]);
|
||||||
|
|
||||||
|
$timeoutWorker = new StopWhenTimeLimitIsReachedWorker($decoratedWorker, 1, $logger);
|
||||||
|
$timeoutWorker->run([], function () {
|
||||||
|
sleep(2);
|
||||||
|
});
|
||||||
|
|
||||||
|
$this->assertTrue($decoratedWorker->isStopped());
|
||||||
|
$this->assertSame(1, $decoratedWorker->countEnvelopesHandled());
|
||||||
|
}
|
||||||
|
}
|
@ -22,11 +22,14 @@ use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
|||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\SentStamp;
|
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
|
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
|
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||||
use Symfony\Component\Messenger\Worker;
|
use Symfony\Component\Messenger\Worker;
|
||||||
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
|
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @group time-sensitive
|
||||||
|
*/
|
||||||
class WorkerTest extends TestCase
|
class WorkerTest extends TestCase
|
||||||
{
|
{
|
||||||
public function testWorkerDispatchTheReceivedMessage()
|
public function testWorkerDispatchTheReceivedMessage()
|
||||||
@ -34,18 +37,22 @@ class WorkerTest extends TestCase
|
|||||||
$apiMessage = new DummyMessage('API');
|
$apiMessage = new DummyMessage('API');
|
||||||
$ipaMessage = new DummyMessage('IPA');
|
$ipaMessage = new DummyMessage('IPA');
|
||||||
|
|
||||||
$receiver = new CallbackReceiver(function ($handler) use ($apiMessage, $ipaMessage) {
|
$receiver = new DummyReceiver([
|
||||||
$handler(new Envelope($apiMessage));
|
[new Envelope($apiMessage), new Envelope($ipaMessage)],
|
||||||
$handler(new Envelope($ipaMessage));
|
]);
|
||||||
});
|
|
||||||
|
|
||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
|
|
||||||
$bus->expects($this->at(0))->method('dispatch')->with($envelope = new Envelope($apiMessage, new ReceivedStamp()))->willReturn($envelope);
|
$bus->expects($this->at(0))->method('dispatch')->with($envelope = new Envelope($apiMessage, new ReceivedStamp()))->willReturn($envelope);
|
||||||
$bus->expects($this->at(1))->method('dispatch')->with($envelope = new Envelope($ipaMessage, new ReceivedStamp()))->willReturn($envelope);
|
$bus->expects($this->at(1))->method('dispatch')->with($envelope = new Envelope($ipaMessage, new ReceivedStamp()))->willReturn($envelope);
|
||||||
|
|
||||||
$worker = new Worker($receiver, $bus, 'receiver_id');
|
$worker = new Worker([$receiver], $bus);
|
||||||
$worker->run();
|
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||||
|
// stop after the messages finish
|
||||||
|
if (null === $envelope) {
|
||||||
|
$worker->stop();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
$this->assertSame(2, $receiver->getAcknowledgeCount());
|
$this->assertSame(2, $receiver->getAcknowledgeCount());
|
||||||
}
|
}
|
||||||
@ -53,24 +60,26 @@ class WorkerTest extends TestCase
|
|||||||
public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage()
|
public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage()
|
||||||
{
|
{
|
||||||
$envelope = new Envelope(new DummyMessage('API'));
|
$envelope = new Envelope(new DummyMessage('API'));
|
||||||
$receiver = new CallbackReceiver(function ($handler) use ($envelope) {
|
$receiver = new DummyReceiver([[$envelope]]);
|
||||||
$handler($envelope);
|
|
||||||
});
|
|
||||||
$envelope = $envelope->with(new ReceivedStamp());
|
$envelope = $envelope->with(new ReceivedStamp());
|
||||||
|
|
||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
$bus->expects($this->at(0))->method('dispatch')->with($envelope)->willReturn($envelope);
|
$bus->expects($this->at(0))->method('dispatch')->with($envelope)->willReturn($envelope);
|
||||||
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
|
||||||
|
|
||||||
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
|
$worker = new Worker([$receiver], $bus, []);
|
||||||
$worker->run();
|
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||||
|
// stop after the messages finish
|
||||||
|
if (null === $envelope) {
|
||||||
|
$worker->stop();
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testDispatchCausesRetry()
|
public function testDispatchCausesRetry()
|
||||||
{
|
{
|
||||||
$receiver = new CallbackReceiver(function ($handler) {
|
$receiver = new DummyReceiver([
|
||||||
$handler(new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias')));
|
[new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias'))],
|
||||||
});
|
]);
|
||||||
|
|
||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
$bus->expects($this->at(0))->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not'));
|
$bus->expects($this->at(0))->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not'));
|
||||||
@ -93,8 +102,13 @@ class WorkerTest extends TestCase
|
|||||||
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
||||||
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(true);
|
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(true);
|
||||||
|
|
||||||
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
|
$worker = new Worker(['receiver1' => $receiver], $bus, ['receiver1' => $retryStrategy]);
|
||||||
$worker->run();
|
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||||
|
// stop after the messages finish
|
||||||
|
if (null === $envelope) {
|
||||||
|
$worker->stop();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
// old message acknowledged
|
// old message acknowledged
|
||||||
$this->assertSame(1, $receiver->getAcknowledgeCount());
|
$this->assertSame(1, $receiver->getAcknowledgeCount());
|
||||||
@ -102,9 +116,9 @@ class WorkerTest extends TestCase
|
|||||||
|
|
||||||
public function testDispatchCausesRejectWhenNoRetry()
|
public function testDispatchCausesRejectWhenNoRetry()
|
||||||
{
|
{
|
||||||
$receiver = new CallbackReceiver(function ($handler) {
|
$receiver = new DummyReceiver([
|
||||||
$handler(new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias')));
|
[new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias'))],
|
||||||
});
|
]);
|
||||||
|
|
||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
$bus->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not'));
|
$bus->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not'));
|
||||||
@ -112,17 +126,22 @@ class WorkerTest extends TestCase
|
|||||||
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
||||||
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(false);
|
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(false);
|
||||||
|
|
||||||
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
|
$worker = new Worker(['receiver1' => $receiver], $bus, ['receiver1' => $retryStrategy]);
|
||||||
$worker->run();
|
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||||
|
// stop after the messages finish
|
||||||
|
if (null === $envelope) {
|
||||||
|
$worker->stop();
|
||||||
|
}
|
||||||
|
});
|
||||||
$this->assertSame(1, $receiver->getRejectCount());
|
$this->assertSame(1, $receiver->getRejectCount());
|
||||||
$this->assertSame(0, $receiver->getAcknowledgeCount());
|
$this->assertSame(0, $receiver->getAcknowledgeCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testDispatchCausesRejectOnUnrecoverableMessage()
|
public function testDispatchCausesRejectOnUnrecoverableMessage()
|
||||||
{
|
{
|
||||||
$receiver = new CallbackReceiver(function ($handler) {
|
$receiver = new DummyReceiver([
|
||||||
$handler(new Envelope(new DummyMessage('Hello')));
|
[new Envelope(new DummyMessage('Hello'))],
|
||||||
});
|
]);
|
||||||
|
|
||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
$bus->method('dispatch')->willThrowException(new UnrecoverableMessageHandlingException('Will never work'));
|
$bus->method('dispatch')->willThrowException(new UnrecoverableMessageHandlingException('Will never work'));
|
||||||
@ -130,36 +149,42 @@ class WorkerTest extends TestCase
|
|||||||
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
||||||
$retryStrategy->expects($this->never())->method('isRetryable');
|
$retryStrategy->expects($this->never())->method('isRetryable');
|
||||||
|
|
||||||
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
|
$worker = new Worker(['receiver1' => $receiver], $bus, ['receiver1' => $retryStrategy]);
|
||||||
$worker->run();
|
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||||
|
// stop after the messages finish
|
||||||
|
if (null === $envelope) {
|
||||||
|
$worker->stop();
|
||||||
|
}
|
||||||
|
});
|
||||||
$this->assertSame(1, $receiver->getRejectCount());
|
$this->assertSame(1, $receiver->getRejectCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testWorkerDoesNotSendNullMessagesToTheBus()
|
public function testWorkerDoesNotSendNullMessagesToTheBus()
|
||||||
{
|
{
|
||||||
$receiver = new CallbackReceiver(function ($handler) {
|
$receiver = new DummyReceiver([
|
||||||
$handler(null);
|
null,
|
||||||
});
|
]);
|
||||||
|
|
||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
$bus->expects($this->never())->method('dispatch');
|
$bus->expects($this->never())->method('dispatch');
|
||||||
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
|
||||||
|
|
||||||
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
|
$worker = new Worker([$receiver], $bus);
|
||||||
$worker->run();
|
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||||
|
// stop after the messages finish
|
||||||
|
if (null === $envelope) {
|
||||||
|
$worker->stop();
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testWorkerDispatchesEventsOnSuccess()
|
public function testWorkerDispatchesEventsOnSuccess()
|
||||||
{
|
{
|
||||||
$envelope = new Envelope(new DummyMessage('Hello'));
|
$envelope = new Envelope(new DummyMessage('Hello'));
|
||||||
$receiver = new CallbackReceiver(function ($handler) use ($envelope) {
|
$receiver = new DummyReceiver([[$envelope]]);
|
||||||
$handler($envelope);
|
|
||||||
});
|
|
||||||
|
|
||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
$bus->method('dispatch')->willReturn($envelope);
|
$bus->method('dispatch')->willReturn($envelope);
|
||||||
|
|
||||||
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
|
||||||
$eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock();
|
$eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock();
|
||||||
|
|
||||||
$eventDispatcher->expects($this->exactly(2))
|
$eventDispatcher->expects($this->exactly(2))
|
||||||
@ -169,22 +194,24 @@ class WorkerTest extends TestCase
|
|||||||
[$this->isInstanceOf(WorkerMessageHandledEvent::class)]
|
[$this->isInstanceOf(WorkerMessageHandledEvent::class)]
|
||||||
);
|
);
|
||||||
|
|
||||||
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy, $eventDispatcher);
|
$worker = new Worker([$receiver], $bus, [], $eventDispatcher);
|
||||||
$worker->run();
|
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||||
|
// stop after the messages finish
|
||||||
|
if (null === $envelope) {
|
||||||
|
$worker->stop();
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testWorkerDispatchesEventsOnError()
|
public function testWorkerDispatchesEventsOnError()
|
||||||
{
|
{
|
||||||
$envelope = new Envelope(new DummyMessage('Hello'));
|
$envelope = new Envelope(new DummyMessage('Hello'));
|
||||||
$receiver = new CallbackReceiver(function ($handler) use ($envelope) {
|
$receiver = new DummyReceiver([[$envelope]]);
|
||||||
$handler($envelope);
|
|
||||||
});
|
|
||||||
|
|
||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
$exception = new \InvalidArgumentException('Oh no!');
|
$exception = new \InvalidArgumentException('Oh no!');
|
||||||
$bus->method('dispatch')->willThrowException($exception);
|
$bus->method('dispatch')->willThrowException($exception);
|
||||||
|
|
||||||
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
|
||||||
$eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock();
|
$eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock();
|
||||||
|
|
||||||
$eventDispatcher->expects($this->exactly(2))
|
$eventDispatcher->expects($this->exactly(2))
|
||||||
@ -194,7 +221,143 @@ class WorkerTest extends TestCase
|
|||||||
[$this->isInstanceOf(WorkerMessageFailedEvent::class)]
|
[$this->isInstanceOf(WorkerMessageFailedEvent::class)]
|
||||||
);
|
);
|
||||||
|
|
||||||
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy, $eventDispatcher);
|
$worker = new Worker([$receiver], $bus, [], $eventDispatcher);
|
||||||
$worker->run();
|
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||||
|
// stop after the messages finish
|
||||||
|
if (null === $envelope) {
|
||||||
|
$worker->stop();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testTimeoutIsConfigurable()
|
||||||
|
{
|
||||||
|
$apiMessage = new DummyMessage('API');
|
||||||
|
$receiver = new DummyReceiver([
|
||||||
|
[new Envelope($apiMessage), new Envelope($apiMessage)],
|
||||||
|
[], // will cause a wait
|
||||||
|
[], // will cause a wait
|
||||||
|
[new Envelope($apiMessage)],
|
||||||
|
[new Envelope($apiMessage)],
|
||||||
|
[], // will cause a wait
|
||||||
|
[new Envelope($apiMessage)],
|
||||||
|
]);
|
||||||
|
|
||||||
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
|
|
||||||
|
$worker = new Worker([$receiver], $bus);
|
||||||
|
$receivedCount = 0;
|
||||||
|
$startTime = microtime(true);
|
||||||
|
// sleep .1 after each idle
|
||||||
|
$worker->run(['sleep' => 100000], function (?Envelope $envelope) use ($worker, &$receivedCount, $startTime) {
|
||||||
|
if (null !== $envelope) {
|
||||||
|
++$receivedCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (5 === $receivedCount) {
|
||||||
|
$worker->stop();
|
||||||
|
$duration = microtime(true) - $startTime;
|
||||||
|
|
||||||
|
// wait time should be .3 seconds
|
||||||
|
// use .29 & .31 for timing "wiggle room"
|
||||||
|
$this->assertGreaterThanOrEqual(.29, $duration);
|
||||||
|
$this->assertLessThan(.31, $duration);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testWorkerWithMultipleReceivers()
|
||||||
|
{
|
||||||
|
// envelopes, in their expected delivery order
|
||||||
|
$envelope1 = new Envelope(new DummyMessage('message1'));
|
||||||
|
$envelope2 = new Envelope(new DummyMessage('message2'));
|
||||||
|
$envelope3 = new Envelope(new DummyMessage('message3'));
|
||||||
|
$envelope4 = new Envelope(new DummyMessage('message4'));
|
||||||
|
$envelope5 = new Envelope(new DummyMessage('message5'));
|
||||||
|
$envelope6 = new Envelope(new DummyMessage('message6'));
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Round 1) receiver 1 & 2 have nothing, receiver 3 processes envelope1 and envelope2
|
||||||
|
* Round 2) receiver 1 has nothing, receiver 2 processes envelope3, receiver 3 is not called
|
||||||
|
* Round 3) receiver 1 processes envelope 4, receivers 2 & 3 are not called
|
||||||
|
* Round 4) receiver 1 processes envelope 5, receivers 2 & 3 are not called
|
||||||
|
* Round 5) receiver 1 has nothing, receiver 2 has nothing, receiver 3 has envelope 6
|
||||||
|
*/
|
||||||
|
$receiver1 = new DummyReceiver([
|
||||||
|
[],
|
||||||
|
[],
|
||||||
|
[$envelope4],
|
||||||
|
[$envelope5],
|
||||||
|
[],
|
||||||
|
]);
|
||||||
|
$receiver2 = new DummyReceiver([
|
||||||
|
[],
|
||||||
|
[$envelope3],
|
||||||
|
[],
|
||||||
|
]);
|
||||||
|
$receiver3 = new DummyReceiver([
|
||||||
|
[$envelope1, $envelope2],
|
||||||
|
[],
|
||||||
|
[$envelope6],
|
||||||
|
]);
|
||||||
|
|
||||||
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
|
|
||||||
|
$receivedCount = 0;
|
||||||
|
$worker = new Worker([$receiver1, $receiver2, $receiver3], $bus);
|
||||||
|
$processedEnvelopes = [];
|
||||||
|
$worker->run([], function (?Envelope $envelope) use ($worker, &$receivedCount, &$processedEnvelopes) {
|
||||||
|
if (null !== $envelope) {
|
||||||
|
$processedEnvelopes[] = $envelope;
|
||||||
|
++$receivedCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
// stop after the messages finish
|
||||||
|
if (6 === $receivedCount) {
|
||||||
|
$worker->stop();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// make sure they were processed in the correct order
|
||||||
|
$this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class DummyReceiver implements ReceiverInterface
|
||||||
|
{
|
||||||
|
private $deliveriesOfEnvelopes;
|
||||||
|
private $acknowledgeCount = 0;
|
||||||
|
private $rejectCount = 0;
|
||||||
|
|
||||||
|
public function __construct(array $deliveriesOfEnvelopes)
|
||||||
|
{
|
||||||
|
$this->deliveriesOfEnvelopes = $deliveriesOfEnvelopes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function get(): iterable
|
||||||
|
{
|
||||||
|
$val = array_shift($this->deliveriesOfEnvelopes);
|
||||||
|
|
||||||
|
return null === $val ? [] : $val;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function ack(Envelope $envelope): void
|
||||||
|
{
|
||||||
|
++$this->acknowledgeCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function reject(Envelope $envelope): void
|
||||||
|
{
|
||||||
|
++$this->rejectCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getAcknowledgeCount(): int
|
||||||
|
{
|
||||||
|
return $this->acknowledgeCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getRejectCount(): int
|
||||||
|
{
|
||||||
|
return $this->rejectCount;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -30,7 +30,6 @@ class AmqpReceiver implements ReceiverInterface
|
|||||||
{
|
{
|
||||||
private $serializer;
|
private $serializer;
|
||||||
private $connection;
|
private $connection;
|
||||||
private $shouldStop;
|
|
||||||
|
|
||||||
public function __construct(Connection $connection, SerializerInterface $serializer = null)
|
public function __construct(Connection $connection, SerializerInterface $serializer = null)
|
||||||
{
|
{
|
||||||
@ -41,9 +40,8 @@ class AmqpReceiver implements ReceiverInterface
|
|||||||
/**
|
/**
|
||||||
* {@inheritdoc}
|
* {@inheritdoc}
|
||||||
*/
|
*/
|
||||||
public function receive(callable $handler): void
|
public function get(): iterable
|
||||||
{
|
{
|
||||||
while (!$this->shouldStop) {
|
|
||||||
try {
|
try {
|
||||||
$amqpEnvelope = $this->connection->get();
|
$amqpEnvelope = $this->connection->get();
|
||||||
} catch (\AMQPException $exception) {
|
} catch (\AMQPException $exception) {
|
||||||
@ -51,11 +49,7 @@ class AmqpReceiver implements ReceiverInterface
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (null === $amqpEnvelope) {
|
if (null === $amqpEnvelope) {
|
||||||
$handler(null);
|
return [];
|
||||||
|
|
||||||
usleep($this->connection->getConnectionConfiguration()['loop_sleep'] ?? 200000);
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -70,9 +64,7 @@ class AmqpReceiver implements ReceiverInterface
|
|||||||
throw $exception;
|
throw $exception;
|
||||||
}
|
}
|
||||||
|
|
||||||
$envelope = $envelope->with(new AmqpReceivedStamp($amqpEnvelope));
|
yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope));
|
||||||
$handler($envelope);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public function ack(Envelope $envelope): void
|
public function ack(Envelope $envelope): void
|
||||||
@ -89,11 +81,6 @@ class AmqpReceiver implements ReceiverInterface
|
|||||||
$this->rejectAmqpEnvelope($this->findAmqpEnvelope($envelope));
|
$this->rejectAmqpEnvelope($this->findAmqpEnvelope($envelope));
|
||||||
}
|
}
|
||||||
|
|
||||||
public function stop(): void
|
|
||||||
{
|
|
||||||
$this->shouldStop = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope): void
|
private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope): void
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
|
@ -38,17 +38,9 @@ class AmqpTransport implements TransportInterface, SetupableTransportInterface
|
|||||||
/**
|
/**
|
||||||
* {@inheritdoc}
|
* {@inheritdoc}
|
||||||
*/
|
*/
|
||||||
public function receive(callable $handler): void
|
public function get(): iterable
|
||||||
{
|
{
|
||||||
($this->receiver ?? $this->getReceiver())->receive($handler);
|
return ($this->receiver ?? $this->getReceiver())->get();
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritdoc}
|
|
||||||
*/
|
|
||||||
public function stop(): void
|
|
||||||
{
|
|
||||||
($this->receiver ?? $this->getReceiver())->stop();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -25,21 +25,25 @@ interface ReceiverInterface
|
|||||||
/**
|
/**
|
||||||
* Receive some messages to the given handler.
|
* Receive some messages to the given handler.
|
||||||
*
|
*
|
||||||
* The handler will have, as argument, the received {@link \Symfony\Component\Messenger\Envelope} containing the message.
|
* While this method could return an unlimited number of messages,
|
||||||
* Note that this envelope can be `null` if the timeout to receive something has expired.
|
* the intention is that it returns only one, or a "small number"
|
||||||
|
* of messages each time. This gives the user more flexibility:
|
||||||
|
* they can finish processing the one (or "small number") of messages
|
||||||
|
* from this receiver and move on to check other receivers for messages.
|
||||||
|
* If a this method returns too many messages, it could cause a
|
||||||
|
* blocking effect where handling the messages received from one
|
||||||
|
* call to get() takes a long time, blocking other receivers from
|
||||||
|
* being called.
|
||||||
*
|
*
|
||||||
* If the received message cannot be decoded, the message should not
|
* If a received message cannot be decoded, the message should not
|
||||||
* be retried again (e.g. if there's a queue, it should be removed)
|
* be retried again (e.g. if there's a queue, it should be removed)
|
||||||
* and a MessageDecodingFailedException should be thrown.
|
* and a MessageDecodingFailedException should be thrown.
|
||||||
*
|
*
|
||||||
* @throws TransportException If there is an issue communicating with the transport
|
* @throws TransportException If there is an issue communicating with the transport
|
||||||
|
*
|
||||||
|
* @return Envelope[]
|
||||||
*/
|
*/
|
||||||
public function receive(callable $handler): void;
|
public function get(): iterable;
|
||||||
|
|
||||||
/**
|
|
||||||
* Stop receiving some messages.
|
|
||||||
*/
|
|
||||||
public function stop(): void;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Acknowledge that the passed message was handled.
|
* Acknowledge that the passed message was handled.
|
||||||
|
@ -1,68 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This file is part of the Symfony package.
|
|
||||||
*
|
|
||||||
* (c) Fabien Potencier <fabien@symfony.com>
|
|
||||||
*
|
|
||||||
* For the full copyright and license information, please view the LICENSE
|
|
||||||
* file that was distributed with this source code.
|
|
||||||
*/
|
|
||||||
|
|
||||||
namespace Symfony\Component\Messenger\Transport\Receiver;
|
|
||||||
|
|
||||||
use Psr\Log\LoggerInterface;
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author Simon Delicata <simon.delicata@free.fr>
|
|
||||||
*
|
|
||||||
* @experimental in 4.2
|
|
||||||
*/
|
|
||||||
class StopWhenMemoryUsageIsExceededReceiver implements ReceiverInterface
|
|
||||||
{
|
|
||||||
private $decoratedReceiver;
|
|
||||||
private $memoryLimit;
|
|
||||||
private $logger;
|
|
||||||
private $memoryResolver;
|
|
||||||
|
|
||||||
public function __construct(ReceiverInterface $decoratedReceiver, int $memoryLimit, LoggerInterface $logger = null, callable $memoryResolver = null)
|
|
||||||
{
|
|
||||||
$this->decoratedReceiver = $decoratedReceiver;
|
|
||||||
$this->memoryLimit = $memoryLimit;
|
|
||||||
$this->logger = $logger;
|
|
||||||
$this->memoryResolver = $memoryResolver ?: function () {
|
|
||||||
return \memory_get_usage();
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
public function receive(callable $handler): void
|
|
||||||
{
|
|
||||||
$this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler) {
|
|
||||||
$handler($envelope);
|
|
||||||
|
|
||||||
$memoryResolver = $this->memoryResolver;
|
|
||||||
if ($memoryResolver() > $this->memoryLimit) {
|
|
||||||
$this->stop();
|
|
||||||
if (null !== $this->logger) {
|
|
||||||
$this->logger->info('Receiver stopped due to memory limit of {limit} exceeded', ['limit' => $this->memoryLimit]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public function stop(): void
|
|
||||||
{
|
|
||||||
$this->decoratedReceiver->stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
public function ack(Envelope $envelope): void
|
|
||||||
{
|
|
||||||
$this->decoratedReceiver->ack($envelope);
|
|
||||||
}
|
|
||||||
|
|
||||||
public function reject(Envelope $envelope): void
|
|
||||||
{
|
|
||||||
$this->decoratedReceiver->reject($envelope);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,65 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This file is part of the Symfony package.
|
|
||||||
*
|
|
||||||
* (c) Fabien Potencier <fabien@symfony.com>
|
|
||||||
*
|
|
||||||
* For the full copyright and license information, please view the LICENSE
|
|
||||||
* file that was distributed with this source code.
|
|
||||||
*/
|
|
||||||
|
|
||||||
namespace Symfony\Component\Messenger\Transport\Receiver;
|
|
||||||
|
|
||||||
use Psr\Log\LoggerInterface;
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
|
||||||
*
|
|
||||||
* @experimental in 4.2
|
|
||||||
*/
|
|
||||||
class StopWhenMessageCountIsExceededReceiver implements ReceiverInterface
|
|
||||||
{
|
|
||||||
private $decoratedReceiver;
|
|
||||||
private $maximumNumberOfMessages;
|
|
||||||
private $logger;
|
|
||||||
|
|
||||||
public function __construct(ReceiverInterface $decoratedReceiver, int $maximumNumberOfMessages, LoggerInterface $logger = null)
|
|
||||||
{
|
|
||||||
$this->decoratedReceiver = $decoratedReceiver;
|
|
||||||
$this->maximumNumberOfMessages = $maximumNumberOfMessages;
|
|
||||||
$this->logger = $logger;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function receive(callable $handler): void
|
|
||||||
{
|
|
||||||
$receivedMessages = 0;
|
|
||||||
|
|
||||||
$this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler, &$receivedMessages) {
|
|
||||||
$handler($envelope);
|
|
||||||
|
|
||||||
if (null !== $envelope && ++$receivedMessages >= $this->maximumNumberOfMessages) {
|
|
||||||
$this->stop();
|
|
||||||
if (null !== $this->logger) {
|
|
||||||
$this->logger->info('Receiver stopped due to maximum count of {count} exceeded', ['count' => $this->maximumNumberOfMessages]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public function stop(): void
|
|
||||||
{
|
|
||||||
$this->decoratedReceiver->stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
public function ack(Envelope $envelope): void
|
|
||||||
{
|
|
||||||
$this->decoratedReceiver->ack($envelope);
|
|
||||||
}
|
|
||||||
|
|
||||||
public function reject(Envelope $envelope): void
|
|
||||||
{
|
|
||||||
$this->decoratedReceiver->reject($envelope);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,66 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This file is part of the Symfony package.
|
|
||||||
*
|
|
||||||
* (c) Fabien Potencier <fabien@symfony.com>
|
|
||||||
*
|
|
||||||
* For the full copyright and license information, please view the LICENSE
|
|
||||||
* file that was distributed with this source code.
|
|
||||||
*/
|
|
||||||
|
|
||||||
namespace Symfony\Component\Messenger\Transport\Receiver;
|
|
||||||
|
|
||||||
use Psr\Log\LoggerInterface;
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author Simon Delicata <simon.delicata@free.fr>
|
|
||||||
*
|
|
||||||
* @experimental in 4.2
|
|
||||||
*/
|
|
||||||
class StopWhenTimeLimitIsReachedReceiver implements ReceiverInterface
|
|
||||||
{
|
|
||||||
private $decoratedReceiver;
|
|
||||||
private $timeLimitInSeconds;
|
|
||||||
private $logger;
|
|
||||||
|
|
||||||
public function __construct(ReceiverInterface $decoratedReceiver, int $timeLimitInSeconds, LoggerInterface $logger = null)
|
|
||||||
{
|
|
||||||
$this->decoratedReceiver = $decoratedReceiver;
|
|
||||||
$this->timeLimitInSeconds = $timeLimitInSeconds;
|
|
||||||
$this->logger = $logger;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function receive(callable $handler): void
|
|
||||||
{
|
|
||||||
$startTime = microtime(true);
|
|
||||||
$endTime = $startTime + $this->timeLimitInSeconds;
|
|
||||||
|
|
||||||
$this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler, $endTime) {
|
|
||||||
$handler($envelope);
|
|
||||||
|
|
||||||
if ($endTime < microtime(true)) {
|
|
||||||
$this->stop();
|
|
||||||
if (null !== $this->logger) {
|
|
||||||
$this->logger->info('Receiver stopped due to time limit of {timeLimit}s reached', ['timeLimit' => $this->timeLimitInSeconds]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public function stop(): void
|
|
||||||
{
|
|
||||||
$this->decoratedReceiver->stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
public function ack(Envelope $envelope): void
|
|
||||||
{
|
|
||||||
$this->decoratedReceiver->ack($envelope);
|
|
||||||
}
|
|
||||||
|
|
||||||
public function reject(Envelope $envelope): void
|
|
||||||
{
|
|
||||||
$this->decoratedReceiver->reject($envelope);
|
|
||||||
}
|
|
||||||
}
|
|
@ -32,51 +32,87 @@ use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
|
|||||||
*
|
*
|
||||||
* @final
|
* @final
|
||||||
*/
|
*/
|
||||||
class Worker
|
class Worker implements WorkerInterface
|
||||||
{
|
{
|
||||||
private $receiver;
|
private $receivers;
|
||||||
private $bus;
|
private $bus;
|
||||||
private $receiverName;
|
private $retryStrategies;
|
||||||
private $retryStrategy;
|
|
||||||
private $eventDispatcher;
|
private $eventDispatcher;
|
||||||
private $logger;
|
private $logger;
|
||||||
|
private $shouldStop = false;
|
||||||
|
|
||||||
public function __construct(ReceiverInterface $receiver, MessageBusInterface $bus, string $receiverName = null, RetryStrategyInterface $retryStrategy = null, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
|
/**
|
||||||
|
* @param ReceiverInterface[] $receivers Where the key will be used as the string "identifier"
|
||||||
|
* @param RetryStrategyInterface[] $retryStrategies Retry strategies for each receiver (array keys must match)
|
||||||
|
*/
|
||||||
|
public function __construct(array $receivers, MessageBusInterface $bus, $retryStrategies = [], EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
|
||||||
{
|
{
|
||||||
$this->receiver = $receiver;
|
$this->receivers = $receivers;
|
||||||
$this->bus = $bus;
|
$this->bus = $bus;
|
||||||
if (null === $receiverName) {
|
$this->retryStrategies = $retryStrategies;
|
||||||
@trigger_error(sprintf('Instantiating the "%s" class without passing a third argument is deprecated since Symfony 4.3.', __CLASS__), E_USER_DEPRECATED);
|
|
||||||
|
|
||||||
$receiverName = 'unknown';
|
|
||||||
}
|
|
||||||
$this->receiverName = $receiverName;
|
|
||||||
$this->retryStrategy = $retryStrategy;
|
|
||||||
$this->eventDispatcher = $eventDispatcher;
|
$this->eventDispatcher = $eventDispatcher;
|
||||||
$this->logger = $logger;
|
$this->logger = $logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Receive the messages and dispatch them to the bus.
|
* Receive the messages and dispatch them to the bus.
|
||||||
|
*
|
||||||
|
* Valid options are:
|
||||||
|
* * sleep (default: 1000000): Time in microseconds to sleep after no messages are found
|
||||||
*/
|
*/
|
||||||
public function run()
|
public function run(array $options = [], callable $onHandledCallback = null): void
|
||||||
{
|
{
|
||||||
|
$options = array_merge([
|
||||||
|
'sleep' => 1000000,
|
||||||
|
], $options);
|
||||||
|
|
||||||
if (\function_exists('pcntl_signal')) {
|
if (\function_exists('pcntl_signal')) {
|
||||||
pcntl_signal(SIGTERM, function () {
|
pcntl_signal(SIGTERM, function () {
|
||||||
$this->receiver->stop();
|
$this->stop();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->receiver->receive(function (?Envelope $envelope) {
|
$onHandled = function (?Envelope $envelope) use ($onHandledCallback) {
|
||||||
if (null === $envelope) {
|
|
||||||
if (\function_exists('pcntl_signal_dispatch')) {
|
if (\function_exists('pcntl_signal_dispatch')) {
|
||||||
pcntl_signal_dispatch();
|
pcntl_signal_dispatch();
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
if (null !== $onHandledCallback) {
|
||||||
|
$onHandledCallback($envelope);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
while (false === $this->shouldStop) {
|
||||||
|
$envelopeHandled = false;
|
||||||
|
foreach ($this->receivers as $receiverName => $receiver) {
|
||||||
|
$envelopes = $receiver->get();
|
||||||
|
|
||||||
|
foreach ($envelopes as $envelope) {
|
||||||
|
$envelopeHandled = true;
|
||||||
|
|
||||||
|
$this->handleMessage($envelope, $receiver, $receiverName, $this->retryStrategies[$receiverName] ?? null);
|
||||||
|
$onHandled($envelope);
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $this->receiverName));
|
// after handling a single receiver, quit and start the loop again
|
||||||
|
// this should prevent multiple lower priority receivers from
|
||||||
|
// blocking too long before the higher priority are checked
|
||||||
|
if ($envelopeHandled) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (false === $envelopeHandled) {
|
||||||
|
$onHandled(null);
|
||||||
|
|
||||||
|
usleep($options['sleep']);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $receiverName, ?RetryStrategyInterface $retryStrategy)
|
||||||
|
{
|
||||||
|
$this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $receiverName));
|
||||||
|
|
||||||
$message = $envelope->getMessage();
|
$message = $envelope->getMessage();
|
||||||
$context = [
|
$context = [
|
||||||
@ -87,12 +123,12 @@ class Worker
|
|||||||
try {
|
try {
|
||||||
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp()));
|
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp()));
|
||||||
} catch (\Throwable $throwable) {
|
} catch (\Throwable $throwable) {
|
||||||
$shouldRetry = $this->shouldRetry($throwable, $envelope);
|
$shouldRetry = $this->shouldRetry($throwable, $envelope, $retryStrategy);
|
||||||
|
|
||||||
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $this->receiverName, $throwable, $shouldRetry));
|
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $receiverName, $throwable, $shouldRetry));
|
||||||
|
|
||||||
if ($shouldRetry) {
|
if ($shouldRetry) {
|
||||||
if (null === $this->retryStrategy) {
|
if (null === $retryStrategy) {
|
||||||
// not logically allowed, but check just in case
|
// not logically allowed, but check just in case
|
||||||
throw new LogicException('Retrying is not supported without a retry strategy.');
|
throw new LogicException('Retrying is not supported without a retry strategy.');
|
||||||
}
|
}
|
||||||
@ -103,41 +139,37 @@ class Worker
|
|||||||
}
|
}
|
||||||
|
|
||||||
// add the delay and retry stamp info + remove ReceivedStamp
|
// add the delay and retry stamp info + remove ReceivedStamp
|
||||||
$retryEnvelope = $envelope->with(new DelayStamp($this->retryStrategy->getWaitingTime($envelope)))
|
$retryEnvelope = $envelope->with(new DelayStamp($retryStrategy->getWaitingTime($envelope)))
|
||||||
->with(new RedeliveryStamp($retryCount, $this->getSenderAlias($envelope)))
|
->with(new RedeliveryStamp($retryCount, $this->getSenderAlias($envelope)))
|
||||||
->withoutAll(ReceivedStamp::class);
|
->withoutAll(ReceivedStamp::class);
|
||||||
|
|
||||||
// re-send the message
|
// re-send the message
|
||||||
$this->bus->dispatch($retryEnvelope);
|
$this->bus->dispatch($retryEnvelope);
|
||||||
// acknowledge the previous message has received
|
// acknowledge the previous message has received
|
||||||
$this->receiver->ack($envelope);
|
$receiver->ack($envelope);
|
||||||
} else {
|
} else {
|
||||||
if (null !== $this->logger) {
|
if (null !== $this->logger) {
|
||||||
$this->logger->critical('Rejecting {class} (removing from transport).', $context + ['error' => $throwable]);
|
$this->logger->critical('Rejecting {class} (removing from transport).', $context + ['error' => $throwable]);
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->receiver->reject($envelope);
|
$receiver->reject($envelope);
|
||||||
}
|
|
||||||
|
|
||||||
if (\function_exists('pcntl_signal_dispatch')) {
|
|
||||||
pcntl_signal_dispatch();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $this->receiverName));
|
$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $receiverName));
|
||||||
|
|
||||||
if (null !== $this->logger) {
|
if (null !== $this->logger) {
|
||||||
$this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
|
$this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->receiver->ack($envelope);
|
$receiver->ack($envelope);
|
||||||
|
|
||||||
if (\function_exists('pcntl_signal_dispatch')) {
|
|
||||||
pcntl_signal_dispatch();
|
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
public function stop(): void
|
||||||
|
{
|
||||||
|
$this->shouldStop = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private function dispatchEvent($event)
|
private function dispatchEvent($event)
|
||||||
@ -149,17 +181,17 @@ class Worker
|
|||||||
$this->eventDispatcher->dispatch($event);
|
$this->eventDispatcher->dispatch($event);
|
||||||
}
|
}
|
||||||
|
|
||||||
private function shouldRetry(\Throwable $e, Envelope $envelope): bool
|
private function shouldRetry(\Throwable $e, Envelope $envelope, ?RetryStrategyInterface $retryStrategy): bool
|
||||||
{
|
{
|
||||||
if ($e instanceof UnrecoverableMessageHandlingException) {
|
if ($e instanceof UnrecoverableMessageHandlingException) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (null === $this->retryStrategy) {
|
if (null === $retryStrategy) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
return $this->retryStrategy->isRetryable($envelope);
|
return $retryStrategy->isRetryable($envelope);
|
||||||
}
|
}
|
||||||
|
|
||||||
private function getRetryCount(Envelope $envelope): int
|
private function getRetryCount(Envelope $envelope): int
|
||||||
|
@ -0,0 +1,61 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Worker;
|
||||||
|
|
||||||
|
use Psr\Log\LoggerInterface;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\WorkerInterface;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Simon Delicata <simon.delicata@free.fr>
|
||||||
|
*
|
||||||
|
* @experimental in 4.3
|
||||||
|
*/
|
||||||
|
class StopWhenMemoryUsageIsExceededWorker implements WorkerInterface
|
||||||
|
{
|
||||||
|
private $decoratedWorker;
|
||||||
|
private $memoryLimit;
|
||||||
|
private $logger;
|
||||||
|
private $memoryResolver;
|
||||||
|
|
||||||
|
public function __construct(WorkerInterface $decoratedWorker, int $memoryLimit, LoggerInterface $logger = null, callable $memoryResolver = null)
|
||||||
|
{
|
||||||
|
$this->decoratedWorker = $decoratedWorker;
|
||||||
|
$this->memoryLimit = $memoryLimit;
|
||||||
|
$this->logger = $logger;
|
||||||
|
$this->memoryResolver = $memoryResolver ?: function () {
|
||||||
|
return \memory_get_usage();
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
public function run(array $options = [], callable $onHandledCallback = null): void
|
||||||
|
{
|
||||||
|
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback) {
|
||||||
|
if (null !== $onHandledCallback) {
|
||||||
|
$onHandledCallback($envelope);
|
||||||
|
}
|
||||||
|
|
||||||
|
$memoryResolver = $this->memoryResolver;
|
||||||
|
if ($memoryResolver() > $this->memoryLimit) {
|
||||||
|
$this->stop();
|
||||||
|
if (null !== $this->logger) {
|
||||||
|
$this->logger->info('Worker stopped due to memory limit of {limit} exceeded', ['limit' => $this->memoryLimit]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public function stop(): void
|
||||||
|
{
|
||||||
|
$this->decoratedWorker->stop();
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,58 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Worker;
|
||||||
|
|
||||||
|
use Psr\Log\LoggerInterface;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\WorkerInterface;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||||
|
*
|
||||||
|
* @experimental in 4.3
|
||||||
|
*/
|
||||||
|
class StopWhenMessageCountIsExceededWorker implements WorkerInterface
|
||||||
|
{
|
||||||
|
private $decoratedWorker;
|
||||||
|
private $maximumNumberOfMessages;
|
||||||
|
private $logger;
|
||||||
|
|
||||||
|
public function __construct(WorkerInterface $decoratedWorker, int $maximumNumberOfMessages, LoggerInterface $logger = null)
|
||||||
|
{
|
||||||
|
$this->decoratedWorker = $decoratedWorker;
|
||||||
|
$this->maximumNumberOfMessages = $maximumNumberOfMessages;
|
||||||
|
$this->logger = $logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function run(array $options = [], callable $onHandledCallback = null): void
|
||||||
|
{
|
||||||
|
$receivedMessages = 0;
|
||||||
|
|
||||||
|
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, &$receivedMessages) {
|
||||||
|
if (null !== $onHandledCallback) {
|
||||||
|
$onHandledCallback($envelope);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (null !== $envelope && ++$receivedMessages >= $this->maximumNumberOfMessages) {
|
||||||
|
$this->stop();
|
||||||
|
if (null !== $this->logger) {
|
||||||
|
$this->logger->info('Worker stopped due to maximum count of {count} exceeded', ['count' => $this->maximumNumberOfMessages]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public function stop(): void
|
||||||
|
{
|
||||||
|
$this->decoratedWorker->stop();
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,59 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Worker;
|
||||||
|
|
||||||
|
use Psr\Log\LoggerInterface;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\WorkerInterface;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Simon Delicata <simon.delicata@free.fr>
|
||||||
|
*
|
||||||
|
* @experimental in 4.3
|
||||||
|
*/
|
||||||
|
class StopWhenTimeLimitIsReachedWorker implements WorkerInterface
|
||||||
|
{
|
||||||
|
private $decoratedWorker;
|
||||||
|
private $timeLimitInSeconds;
|
||||||
|
private $logger;
|
||||||
|
|
||||||
|
public function __construct(WorkerInterface $decoratedWorker, int $timeLimitInSeconds, LoggerInterface $logger = null)
|
||||||
|
{
|
||||||
|
$this->decoratedWorker = $decoratedWorker;
|
||||||
|
$this->timeLimitInSeconds = $timeLimitInSeconds;
|
||||||
|
$this->logger = $logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function run(array $options = [], callable $onHandledCallback = null): void
|
||||||
|
{
|
||||||
|
$startTime = microtime(true);
|
||||||
|
$endTime = $startTime + $this->timeLimitInSeconds;
|
||||||
|
|
||||||
|
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, $endTime) {
|
||||||
|
if (null !== $onHandledCallback) {
|
||||||
|
$onHandledCallback($envelope);
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($endTime < microtime(true)) {
|
||||||
|
$this->stop();
|
||||||
|
if (null !== $this->logger) {
|
||||||
|
$this->logger->info('Worker stopped due to time limit of {timeLimit}s reached', ['timeLimit' => $this->timeLimitInSeconds]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public function stop(): void
|
||||||
|
{
|
||||||
|
$this->decoratedWorker->stop();
|
||||||
|
}
|
||||||
|
}
|
37
src/Symfony/Component/Messenger/WorkerInterface.php
Normal file
37
src/Symfony/Component/Messenger/WorkerInterface.php
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Interface for Workers that handle messages from transports.
|
||||||
|
*
|
||||||
|
* @experimental in 4.3
|
||||||
|
*
|
||||||
|
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||||
|
*/
|
||||||
|
interface WorkerInterface
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Receive the messages and dispatch them to the bus.
|
||||||
|
*
|
||||||
|
* The $onHandledCallback will be passed the Envelope that was just
|
||||||
|
* handled or null if nothing was handled.
|
||||||
|
*
|
||||||
|
* @param mixed[] $options options used to control worker behavior
|
||||||
|
*/
|
||||||
|
public function run(array $options = [], callable $onHandledCallback = null): void;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stop receiving messages.
|
||||||
|
*/
|
||||||
|
public function stop(): void;
|
||||||
|
}
|
Reference in New Issue
Block a user