[Messenger] Add option to stop the worker after a message failed

This commit is contained in:
Michel Hunziker 2020-01-23 14:32:06 +01:00 committed by Fabien Potencier
parent 362c5d4700
commit ea79206470
4 changed files with 154 additions and 0 deletions

View File

@ -8,6 +8,7 @@ CHANGELOG
* Moved Doctrine transport to package `symfony/doctrine-messenger`. All classes in `Symfony\Component\Messenger\Transport\Doctrine` have been moved to `Symfony\Component\Messenger\Bridge\Doctrine\Transport`
* Moved RedisExt transport to package `symfony/redis-messenger`. All classes in `Symfony\Component\Messenger\Transport\RedisExt` have been moved to `Symfony\Component\Messenger\Bridge\Redis\Transport`
* Added support for passing a `\Throwable` argument to `RetryStrategyInterface` methods. This allows to define strategies based on the reason of the handling failure.
* Added `StopWorkerOnFailureLimitListener` to stop the worker after a specified amount of failed messages is reached.
5.0.0
-----

View File

@ -23,6 +23,7 @@ use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Question\ChoiceQuestion;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\EventListener\StopWorkerOnFailureLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
@ -64,6 +65,7 @@ class ConsumeMessagesCommand extends Command
->setDefinition([
new InputArgument('receivers', InputArgument::IS_ARRAY, 'Names of the receivers/transports to consume in order of priority', $defaultReceiverName ? [$defaultReceiverName] : []),
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
new InputOption('failure-limit', 'f', InputOption::VALUE_REQUIRED, 'The number of failed messages the worker can consume'),
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
@ -82,6 +84,10 @@ To receive from multiple transports, pass each name:
Use the --limit option to limit the number of messages received:
<info>php %command.full_name% <receiver-name> --limit=10</info>
Use the --failure-limit option to stop the worker when the given number of failed messages is reached:
<info>php %command.full_name% <receiver-name> --failure-limit=2</info>
Use the --memory-limit option to stop the worker if it exceeds a given memory usage limit. You can use shorthand byte values [K, M or G]:
@ -152,6 +158,11 @@ EOF
$this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($limit, $this->logger));
}
if ($failureLimit = $input->getOption('failure-limit')) {
$stopsWhen[] = "reached {$failureLimit} failed messages";
$this->eventDispatcher->addSubscriber(new StopWorkerOnFailureLimitListener($failureLimit, $this->logger));
}
if ($memoryLimit = $input->getOption('memory-limit')) {
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
$this->eventDispatcher->addSubscriber(new StopWorkerOnMemoryLimitListener($this->convertToBytes($memoryLimit), $this->logger));

View File

@ -0,0 +1,63 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\EventListener;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
/**
* @author Michel Hunziker <info@michelhunziker.com>
*/
class StopWorkerOnFailureLimitListener implements EventSubscriberInterface
{
private $maximumNumberOfFailures;
private $logger;
private $failedMessages = 0;
public function __construct(int $maximumNumberOfFailures, LoggerInterface $logger = null)
{
$this->maximumNumberOfFailures = $maximumNumberOfFailures;
$this->logger = $logger;
if ($maximumNumberOfFailures <= 0) {
throw new InvalidArgumentException('Failure limit must be greater than zero.');
}
}
public function onMessageFailed(WorkerMessageFailedEvent $event): void
{
++$this->failedMessages;
}
public function onWorkerRunning(WorkerRunningEvent $event): void
{
if (!$event->isWorkerIdle() && $this->failedMessages >= $this->maximumNumberOfFailures) {
$this->failedMessages = 0;
$event->getWorker()->stop();
if (null !== $this->logger) {
$this->logger->info('Worker stopped due to limit of {count} failed message(s) is reached', ['count' => $this->maximumNumberOfFailures]);
}
}
}
public static function getSubscribedEvents(): array
{
return [
WorkerMessageFailedEvent::class => 'onMessageFailed',
WorkerRunningEvent::class => 'onWorkerRunning',
];
}
}

View File

@ -0,0 +1,79 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\EventListener;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\EventListener\StopWorkerOnFailureLimitListener;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Worker;
use Throwable;
class StopWorkerOnFailureLimitListenerTest extends TestCase
{
/**
* @dataProvider countProvider
*/
public function testWorkerStopsWhenMaximumCountReached(int $max, bool $shouldStop): void
{
$worker = $this->createMock(Worker::class);
$worker->expects($shouldStop ? $this->atLeastOnce() : $this->never())->method('stop');
$failedEvent = $this->createFailedEvent();
$runningEvent = new WorkerRunningEvent($worker, false);
$failureLimitListener = new StopWorkerOnFailureLimitListener($max);
// simulate three messages (of which 2 failed)
$failureLimitListener->onMessageFailed($failedEvent);
$failureLimitListener->onWorkerRunning($runningEvent);
$failureLimitListener->onWorkerRunning($runningEvent);
$failureLimitListener->onMessageFailed($failedEvent);
$failureLimitListener->onWorkerRunning($runningEvent);
}
public function countProvider(): iterable
{
yield [1, true];
yield [2, true];
yield [3, false];
yield [4, false];
}
public function testWorkerLogsMaximumCountReachedWhenLoggerIsGiven(): void
{
$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('info')
->with(
$this->equalTo('Worker stopped due to limit of {count} failed message(s) is reached'),
$this->equalTo(['count' => 1])
);
$worker = $this->createMock(Worker::class);
$event = new WorkerRunningEvent($worker, false);
$failureLimitListener = new StopWorkerOnFailureLimitListener(1, $logger);
$failureLimitListener->onMessageFailed($this->createFailedEvent());
$failureLimitListener->onWorkerRunning($event);
}
private function createFailedEvent(): WorkerMessageFailedEvent
{
$envelope = new Envelope(new DummyMessage('hello'));
return new WorkerMessageFailedEvent($envelope, 'default', $this->createMock(Throwable::class));
}
}