diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 664ab03e49..47f72d7919 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -8,6 +8,7 @@ CHANGELOG * Moved Doctrine transport to package `symfony/doctrine-messenger`. All classes in `Symfony\Component\Messenger\Transport\Doctrine` have been moved to `Symfony\Component\Messenger\Bridge\Doctrine\Transport` * Moved RedisExt transport to package `symfony/redis-messenger`. All classes in `Symfony\Component\Messenger\Transport\RedisExt` have been moved to `Symfony\Component\Messenger\Bridge\Redis\Transport` * Added support for passing a `\Throwable` argument to `RetryStrategyInterface` methods. This allows to define strategies based on the reason of the handling failure. + * Added `StopWorkerOnFailureLimitListener` to stop the worker after a specified amount of failed messages is reached. 5.0.0 ----- diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index ac718153d2..8dfcbe240f 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -23,6 +23,7 @@ use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Question\ChoiceQuestion; use Symfony\Component\Console\Style\SymfonyStyle; use Symfony\Component\EventDispatcher\EventDispatcherInterface; +use Symfony\Component\Messenger\EventListener\StopWorkerOnFailureLimitListener; use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener; use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener; use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener; @@ -64,6 +65,7 @@ class ConsumeMessagesCommand extends Command ->setDefinition([ new InputArgument('receivers', InputArgument::IS_ARRAY, 'Names of the receivers/transports to consume in order of priority', $defaultReceiverName ? [$defaultReceiverName] : []), new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'), + new InputOption('failure-limit', 'f', InputOption::VALUE_REQUIRED, 'The number of failed messages the worker can consume'), new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'), new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'), new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1), @@ -82,6 +84,10 @@ To receive from multiple transports, pass each name: Use the --limit option to limit the number of messages received: php %command.full_name% --limit=10 + +Use the --failure-limit option to stop the worker when the given number of failed messages is reached: + + php %command.full_name% --failure-limit=2 Use the --memory-limit option to stop the worker if it exceeds a given memory usage limit. You can use shorthand byte values [K, M or G]: @@ -152,6 +158,11 @@ EOF $this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($limit, $this->logger)); } + if ($failureLimit = $input->getOption('failure-limit')) { + $stopsWhen[] = "reached {$failureLimit} failed messages"; + $this->eventDispatcher->addSubscriber(new StopWorkerOnFailureLimitListener($failureLimit, $this->logger)); + } + if ($memoryLimit = $input->getOption('memory-limit')) { $stopsWhen[] = "exceeded {$memoryLimit} of memory"; $this->eventDispatcher->addSubscriber(new StopWorkerOnMemoryLimitListener($this->convertToBytes($memoryLimit), $this->logger)); diff --git a/src/Symfony/Component/Messenger/EventListener/StopWorkerOnFailureLimitListener.php b/src/Symfony/Component/Messenger/EventListener/StopWorkerOnFailureLimitListener.php new file mode 100644 index 0000000000..29dc6aaaf2 --- /dev/null +++ b/src/Symfony/Component/Messenger/EventListener/StopWorkerOnFailureLimitListener.php @@ -0,0 +1,63 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\EventListener; + +use Psr\Log\LoggerInterface; +use Symfony\Component\EventDispatcher\EventSubscriberInterface; +use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; +use Symfony\Component\Messenger\Event\WorkerRunningEvent; +use Symfony\Component\Messenger\Exception\InvalidArgumentException; + +/** + * @author Michel Hunziker + */ +class StopWorkerOnFailureLimitListener implements EventSubscriberInterface +{ + private $maximumNumberOfFailures; + private $logger; + private $failedMessages = 0; + + public function __construct(int $maximumNumberOfFailures, LoggerInterface $logger = null) + { + $this->maximumNumberOfFailures = $maximumNumberOfFailures; + $this->logger = $logger; + + if ($maximumNumberOfFailures <= 0) { + throw new InvalidArgumentException('Failure limit must be greater than zero.'); + } + } + + public function onMessageFailed(WorkerMessageFailedEvent $event): void + { + ++$this->failedMessages; + } + + public function onWorkerRunning(WorkerRunningEvent $event): void + { + if (!$event->isWorkerIdle() && $this->failedMessages >= $this->maximumNumberOfFailures) { + $this->failedMessages = 0; + $event->getWorker()->stop(); + + if (null !== $this->logger) { + $this->logger->info('Worker stopped due to limit of {count} failed message(s) is reached', ['count' => $this->maximumNumberOfFailures]); + } + } + } + + public static function getSubscribedEvents(): array + { + return [ + WorkerMessageFailedEvent::class => 'onMessageFailed', + WorkerRunningEvent::class => 'onWorkerRunning', + ]; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnFailureLimitListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnFailureLimitListenerTest.php new file mode 100644 index 0000000000..9f12b0b258 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnFailureLimitListenerTest.php @@ -0,0 +1,79 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\EventListener; + +use PHPUnit\Framework\TestCase; +use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; +use Symfony\Component\Messenger\Event\WorkerRunningEvent; +use Symfony\Component\Messenger\EventListener\StopWorkerOnFailureLimitListener; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Worker; +use Throwable; + +class StopWorkerOnFailureLimitListenerTest extends TestCase +{ + /** + * @dataProvider countProvider + */ + public function testWorkerStopsWhenMaximumCountReached(int $max, bool $shouldStop): void + { + $worker = $this->createMock(Worker::class); + $worker->expects($shouldStop ? $this->atLeastOnce() : $this->never())->method('stop'); + + $failedEvent = $this->createFailedEvent(); + $runningEvent = new WorkerRunningEvent($worker, false); + + $failureLimitListener = new StopWorkerOnFailureLimitListener($max); + // simulate three messages (of which 2 failed) + $failureLimitListener->onMessageFailed($failedEvent); + $failureLimitListener->onWorkerRunning($runningEvent); + + $failureLimitListener->onWorkerRunning($runningEvent); + + $failureLimitListener->onMessageFailed($failedEvent); + $failureLimitListener->onWorkerRunning($runningEvent); + } + + public function countProvider(): iterable + { + yield [1, true]; + yield [2, true]; + yield [3, false]; + yield [4, false]; + } + + public function testWorkerLogsMaximumCountReachedWhenLoggerIsGiven(): void + { + $logger = $this->createMock(LoggerInterface::class); + $logger->expects($this->once())->method('info') + ->with( + $this->equalTo('Worker stopped due to limit of {count} failed message(s) is reached'), + $this->equalTo(['count' => 1]) + ); + + $worker = $this->createMock(Worker::class); + $event = new WorkerRunningEvent($worker, false); + + $failureLimitListener = new StopWorkerOnFailureLimitListener(1, $logger); + $failureLimitListener->onMessageFailed($this->createFailedEvent()); + $failureLimitListener->onWorkerRunning($event); + } + + private function createFailedEvent(): WorkerMessageFailedEvent + { + $envelope = new Envelope(new DummyMessage('hello')); + + return new WorkerMessageFailedEvent($envelope, 'default', $this->createMock(Throwable::class)); + } +}