[Messenger] Add option to stop the worker after a message failed
This commit is contained in:
parent
362c5d4700
commit
ea79206470
@ -8,6 +8,7 @@ CHANGELOG
|
|||||||
* Moved Doctrine transport to package `symfony/doctrine-messenger`. All classes in `Symfony\Component\Messenger\Transport\Doctrine` have been moved to `Symfony\Component\Messenger\Bridge\Doctrine\Transport`
|
* Moved Doctrine transport to package `symfony/doctrine-messenger`. All classes in `Symfony\Component\Messenger\Transport\Doctrine` have been moved to `Symfony\Component\Messenger\Bridge\Doctrine\Transport`
|
||||||
* Moved RedisExt transport to package `symfony/redis-messenger`. All classes in `Symfony\Component\Messenger\Transport\RedisExt` have been moved to `Symfony\Component\Messenger\Bridge\Redis\Transport`
|
* Moved RedisExt transport to package `symfony/redis-messenger`. All classes in `Symfony\Component\Messenger\Transport\RedisExt` have been moved to `Symfony\Component\Messenger\Bridge\Redis\Transport`
|
||||||
* Added support for passing a `\Throwable` argument to `RetryStrategyInterface` methods. This allows to define strategies based on the reason of the handling failure.
|
* Added support for passing a `\Throwable` argument to `RetryStrategyInterface` methods. This allows to define strategies based on the reason of the handling failure.
|
||||||
|
* Added `StopWorkerOnFailureLimitListener` to stop the worker after a specified amount of failed messages is reached.
|
||||||
|
|
||||||
5.0.0
|
5.0.0
|
||||||
-----
|
-----
|
||||||
|
@ -23,6 +23,7 @@ use Symfony\Component\Console\Output\OutputInterface;
|
|||||||
use Symfony\Component\Console\Question\ChoiceQuestion;
|
use Symfony\Component\Console\Question\ChoiceQuestion;
|
||||||
use Symfony\Component\Console\Style\SymfonyStyle;
|
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||||
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
|
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
|
||||||
|
use Symfony\Component\Messenger\EventListener\StopWorkerOnFailureLimitListener;
|
||||||
use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener;
|
use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener;
|
||||||
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
|
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
|
||||||
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
|
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
|
||||||
@ -64,6 +65,7 @@ class ConsumeMessagesCommand extends Command
|
|||||||
->setDefinition([
|
->setDefinition([
|
||||||
new InputArgument('receivers', InputArgument::IS_ARRAY, 'Names of the receivers/transports to consume in order of priority', $defaultReceiverName ? [$defaultReceiverName] : []),
|
new InputArgument('receivers', InputArgument::IS_ARRAY, 'Names of the receivers/transports to consume in order of priority', $defaultReceiverName ? [$defaultReceiverName] : []),
|
||||||
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
|
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
|
||||||
|
new InputOption('failure-limit', 'f', InputOption::VALUE_REQUIRED, 'The number of failed messages the worker can consume'),
|
||||||
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
|
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
|
||||||
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
|
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
|
||||||
new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
|
new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
|
||||||
@ -83,6 +85,10 @@ Use the --limit option to limit the number of messages received:
|
|||||||
|
|
||||||
<info>php %command.full_name% <receiver-name> --limit=10</info>
|
<info>php %command.full_name% <receiver-name> --limit=10</info>
|
||||||
|
|
||||||
|
Use the --failure-limit option to stop the worker when the given number of failed messages is reached:
|
||||||
|
|
||||||
|
<info>php %command.full_name% <receiver-name> --failure-limit=2</info>
|
||||||
|
|
||||||
Use the --memory-limit option to stop the worker if it exceeds a given memory usage limit. You can use shorthand byte values [K, M or G]:
|
Use the --memory-limit option to stop the worker if it exceeds a given memory usage limit. You can use shorthand byte values [K, M or G]:
|
||||||
|
|
||||||
<info>php %command.full_name% <receiver-name> --memory-limit=128M</info>
|
<info>php %command.full_name% <receiver-name> --memory-limit=128M</info>
|
||||||
@ -152,6 +158,11 @@ EOF
|
|||||||
$this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($limit, $this->logger));
|
$this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($limit, $this->logger));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ($failureLimit = $input->getOption('failure-limit')) {
|
||||||
|
$stopsWhen[] = "reached {$failureLimit} failed messages";
|
||||||
|
$this->eventDispatcher->addSubscriber(new StopWorkerOnFailureLimitListener($failureLimit, $this->logger));
|
||||||
|
}
|
||||||
|
|
||||||
if ($memoryLimit = $input->getOption('memory-limit')) {
|
if ($memoryLimit = $input->getOption('memory-limit')) {
|
||||||
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
|
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
|
||||||
$this->eventDispatcher->addSubscriber(new StopWorkerOnMemoryLimitListener($this->convertToBytes($memoryLimit), $this->logger));
|
$this->eventDispatcher->addSubscriber(new StopWorkerOnMemoryLimitListener($this->convertToBytes($memoryLimit), $this->logger));
|
||||||
|
@ -0,0 +1,63 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\EventListener;
|
||||||
|
|
||||||
|
use Psr\Log\LoggerInterface;
|
||||||
|
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
||||||
|
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
||||||
|
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
|
||||||
|
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Michel Hunziker <info@michelhunziker.com>
|
||||||
|
*/
|
||||||
|
class StopWorkerOnFailureLimitListener implements EventSubscriberInterface
|
||||||
|
{
|
||||||
|
private $maximumNumberOfFailures;
|
||||||
|
private $logger;
|
||||||
|
private $failedMessages = 0;
|
||||||
|
|
||||||
|
public function __construct(int $maximumNumberOfFailures, LoggerInterface $logger = null)
|
||||||
|
{
|
||||||
|
$this->maximumNumberOfFailures = $maximumNumberOfFailures;
|
||||||
|
$this->logger = $logger;
|
||||||
|
|
||||||
|
if ($maximumNumberOfFailures <= 0) {
|
||||||
|
throw new InvalidArgumentException('Failure limit must be greater than zero.');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public function onMessageFailed(WorkerMessageFailedEvent $event): void
|
||||||
|
{
|
||||||
|
++$this->failedMessages;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function onWorkerRunning(WorkerRunningEvent $event): void
|
||||||
|
{
|
||||||
|
if (!$event->isWorkerIdle() && $this->failedMessages >= $this->maximumNumberOfFailures) {
|
||||||
|
$this->failedMessages = 0;
|
||||||
|
$event->getWorker()->stop();
|
||||||
|
|
||||||
|
if (null !== $this->logger) {
|
||||||
|
$this->logger->info('Worker stopped due to limit of {count} failed message(s) is reached', ['count' => $this->maximumNumberOfFailures]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static function getSubscribedEvents(): array
|
||||||
|
{
|
||||||
|
return [
|
||||||
|
WorkerMessageFailedEvent::class => 'onMessageFailed',
|
||||||
|
WorkerRunningEvent::class => 'onWorkerRunning',
|
||||||
|
];
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,79 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Tests\EventListener;
|
||||||
|
|
||||||
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Psr\Log\LoggerInterface;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
||||||
|
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
|
||||||
|
use Symfony\Component\Messenger\EventListener\StopWorkerOnFailureLimitListener;
|
||||||
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
|
use Symfony\Component\Messenger\Worker;
|
||||||
|
use Throwable;
|
||||||
|
|
||||||
|
class StopWorkerOnFailureLimitListenerTest extends TestCase
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* @dataProvider countProvider
|
||||||
|
*/
|
||||||
|
public function testWorkerStopsWhenMaximumCountReached(int $max, bool $shouldStop): void
|
||||||
|
{
|
||||||
|
$worker = $this->createMock(Worker::class);
|
||||||
|
$worker->expects($shouldStop ? $this->atLeastOnce() : $this->never())->method('stop');
|
||||||
|
|
||||||
|
$failedEvent = $this->createFailedEvent();
|
||||||
|
$runningEvent = new WorkerRunningEvent($worker, false);
|
||||||
|
|
||||||
|
$failureLimitListener = new StopWorkerOnFailureLimitListener($max);
|
||||||
|
// simulate three messages (of which 2 failed)
|
||||||
|
$failureLimitListener->onMessageFailed($failedEvent);
|
||||||
|
$failureLimitListener->onWorkerRunning($runningEvent);
|
||||||
|
|
||||||
|
$failureLimitListener->onWorkerRunning($runningEvent);
|
||||||
|
|
||||||
|
$failureLimitListener->onMessageFailed($failedEvent);
|
||||||
|
$failureLimitListener->onWorkerRunning($runningEvent);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function countProvider(): iterable
|
||||||
|
{
|
||||||
|
yield [1, true];
|
||||||
|
yield [2, true];
|
||||||
|
yield [3, false];
|
||||||
|
yield [4, false];
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testWorkerLogsMaximumCountReachedWhenLoggerIsGiven(): void
|
||||||
|
{
|
||||||
|
$logger = $this->createMock(LoggerInterface::class);
|
||||||
|
$logger->expects($this->once())->method('info')
|
||||||
|
->with(
|
||||||
|
$this->equalTo('Worker stopped due to limit of {count} failed message(s) is reached'),
|
||||||
|
$this->equalTo(['count' => 1])
|
||||||
|
);
|
||||||
|
|
||||||
|
$worker = $this->createMock(Worker::class);
|
||||||
|
$event = new WorkerRunningEvent($worker, false);
|
||||||
|
|
||||||
|
$failureLimitListener = new StopWorkerOnFailureLimitListener(1, $logger);
|
||||||
|
$failureLimitListener->onMessageFailed($this->createFailedEvent());
|
||||||
|
$failureLimitListener->onWorkerRunning($event);
|
||||||
|
}
|
||||||
|
|
||||||
|
private function createFailedEvent(): WorkerMessageFailedEvent
|
||||||
|
{
|
||||||
|
$envelope = new Envelope(new DummyMessage('hello'));
|
||||||
|
|
||||||
|
return new WorkerMessageFailedEvent($envelope, 'default', $this->createMock(Throwable::class));
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user