[Messenger] use events consistently in worker

This commit is contained in:
Tobias Schultze 2019-11-01 19:40:04 +01:00 committed by Nicolas Grekas
parent 72dd176132
commit 201f159303
38 changed files with 745 additions and 713 deletions

View File

@ -168,13 +168,17 @@ Lock
Messenger
---------
* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
pass a `RoutableMessageBus` instance instead.
* [BC BREAK] Removed `SendersLocatorInterface::getSenderByAlias` added in 4.3.
* [BC BREAK] Removed `$retryStrategies` argument from `Worker::__construct`.
* [BC BREAK] Removed `$retryStrategyLocator` argument from `ConsumeMessagesCommand::__construct`.
* [BC BREAK] Changed arguments of `ConsumeMessagesCommand::__construct`.
* [BC BREAK] Removed `$senderClassOrAlias` argument from `RedeliveryStamp::__construct`.
* [BC BREAK] Removed `UnknownSenderException`.
* [BC BREAK] Removed `WorkerInterface`.
* [BC BREAK] Removed `$onHandledCallback` of `Worker::run(array $options = [], callable $onHandledCallback = null)`.
* [BC BREAK] Removed `StopWhenMemoryUsageIsExceededWorker` in favor of `StopWorkerOnMemoryLimitListener`.
* [BC BREAK] Removed `StopWhenMessageCountIsExceededWorker` in favor of `StopWorkerOnMessageLimitListener`.
* [BC BREAK] Removed `StopWhenTimeLimitIsReachedWorker` in favor of `StopWorkerOnTimeLimitListener`.
* [BC BREAK] Removed `StopWhenRestartSignalIsReceived` in favor of `StopWorkerOnRestartSignalListener`.
* Marked the `MessengerDataCollector` class as `@final`.
Mime

View File

@ -88,12 +88,9 @@
<service id="console.command.messenger_consume_messages" class="Symfony\Component\Messenger\Command\ConsumeMessagesCommand">
<argument /> <!-- Routable message bus -->
<argument type="service" id="messenger.receiver_locator" />
<argument type="service" id="event_dispatcher" />
<argument type="service" id="logger" on-invalid="null" />
<argument type="collection" /> <!-- Receiver names -->
<argument type="service" id="event_dispatcher" />
<call method="setCachePoolForRestartSignal">
<argument type="service" id="cache.messenger.restart_workers_signal" />
</call>
<tag name="console.command" command="messenger:consume" />
<tag name="console.command" command="messenger:consume-messages" />

View File

@ -98,6 +98,7 @@
<argument /> <!-- max delay ms -->
</service>
<!-- worker event listeners -->
<service id="messenger.retry.send_failed_message_for_retry_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener">
<tag name="kernel.event_subscriber" />
<tag name="monolog.logger" channel="messenger" />
@ -106,7 +107,6 @@
<argument type="service" id="logger" on-invalid="ignore" />
</service>
<!-- failed handling -->
<service id="messenger.failure.send_failed_message_to_failure_transport_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener">
<tag name="kernel.event_subscriber" />
<tag name="monolog.logger" channel="messenger" />
@ -114,6 +114,21 @@
<argument type="service" id="logger" on-invalid="ignore" />
</service>
<service id="messenger.listener.dispatch_pcntl_signal_listener" class="Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener">
<tag name="kernel.event_subscriber" />
</service>
<service id="messenger.listener.stop_worker_on_restart_signal_listener" class="Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener">
<tag name="kernel.event_subscriber" />
<tag name="monolog.logger" channel="messenger" />
<argument type="service" id="cache.messenger.restart_workers_signal" />
<argument type="service" id="logger" on-invalid="ignore" />
</service>
<service id="messenger.listener.stop_worker_on_sigterm_signal_listener" class="Symfony\Component\Messenger\EventListener\StopWorkerOnSigtermSignalListener">
<tag name="kernel.event_subscriber" />
</service>
<!-- routable message bus -->
<service id="messenger.routable_message_bus" class="Symfony\Component\Messenger\RoutableMessageBus">
<argument /> <!-- Message bus locator -->

View File

@ -4,18 +4,23 @@ CHANGELOG
4.4.0
-----
* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
pass a `RoutableMessageBus` instance instead.
* Added support for auto trimming of Redis streams.
* `InMemoryTransport` handle acknowledged and rejected messages.
* Made all dispatched worker event classes final.
* Added support for `from_transport` attribute on `messenger.message_handler` tag.
* Added support for passing `dbindex` as a query parameter to the redis transport DSN.
* Added `WorkerStartedEvent` and `WorkerRunningEvent`
* [BC BREAK] Removed `SendersLocatorInterface::getSenderByAlias` added in 4.3.
* [BC BREAK] Removed `$retryStrategies` argument from `Worker::__construct`.
* [BC BREAK] Removed `$retryStrategyLocator` argument from `ConsumeMessagesCommand::__construct`.
* [BC BREAK] Changed arguments of `ConsumeMessagesCommand::__construct`.
* [BC BREAK] Removed `$senderClassOrAlias` argument from `RedeliveryStamp::__construct`.
* [BC BREAK] Removed `UnknownSenderException`.
* [BC BREAK] Removed `WorkerInterface`.
* [BC BREAK] Removed `$onHandledCallback` of `Worker::run(array $options = [], callable $onHandledCallback = null)`.
* [BC BREAK] Removed `StopWhenMemoryUsageIsExceededWorker` in favor of `StopWorkerOnMemoryLimitListener`.
* [BC BREAK] Removed `StopWhenMessageCountIsExceededWorker` in favor of `StopWorkerOnMessageLimitListener`.
* [BC BREAK] Removed `StopWhenTimeLimitIsReachedWorker` in favor of `StopWorkerOnTimeLimitListener`.
* [BC BREAK] Removed `StopWhenRestartSignalIsReceived` in favor of `StopWorkerOnRestartSignalListener`.
* The component is not marked as `@experimental` anymore.
* Marked the `MessengerDataCollector` class as `@final`.

View File

@ -11,7 +11,6 @@
namespace Symfony\Component\Messenger\Command;
use Psr\Cache\CacheItemPoolInterface;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
@ -23,13 +22,12 @@ use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Question\ChoiceQuestion;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
@ -43,13 +41,11 @@ class ConsumeMessagesCommand extends Command
private $logger;
private $receiverNames;
private $eventDispatcher;
/** @var CacheItemPoolInterface|null */
private $restartSignalCachePool;
/**
* @param RoutableMessageBus $routableBus
*/
public function __construct($routableBus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* EventDispatcherInterface */ $eventDispatcher = null)
public function __construct($routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, array $receiverNames = [])
{
if ($routableBus instanceof ContainerInterface) {
@trigger_error(sprintf('Passing a "%s" instance as first argument to "%s()" is deprecated since Symfony 4.4, pass a "%s" instance instead.', ContainerInterface::class, __METHOD__, RoutableMessageBus::class), E_USER_DEPRECATED);
@ -58,12 +54,6 @@ class ConsumeMessagesCommand extends Command
throw new \TypeError(sprintf('The first argument must be an instance of "%s".', RoutableMessageBus::class));
}
if (null !== $eventDispatcher && !$eventDispatcher instanceof EventDispatcherInterface) {
@trigger_error(sprintf('The 5th argument of the class "%s" should be a "%s"', __CLASS__, EventDispatcherInterface::class), E_USER_DEPRECATED);
$eventDispatcher = null;
}
$this->routableBus = $routableBus;
$this->receiverLocator = $receiverLocator;
$this->logger = $logger;
@ -73,11 +63,6 @@ class ConsumeMessagesCommand extends Command
parent::__construct();
}
public function setCachePoolForRestartSignal(CacheItemPoolInterface $restartSignalCachePool)
{
$this->restartSignalCachePool = $restartSignalCachePool;
}
/**
* {@inheritdoc}
*/
@ -177,29 +162,23 @@ EOF
$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
}
$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;
$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger);
$stopsWhen = [];
if ($limit = $input->getOption('limit')) {
$stopsWhen[] = "processed {$limit} messages";
$worker = new StopWhenMessageCountIsExceededWorker($worker, $limit, $this->logger);
$this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($limit, $this->logger));
}
if ($memoryLimit = $input->getOption('memory-limit')) {
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
$worker = new StopWhenMemoryUsageIsExceededWorker($worker, $this->convertToBytes($memoryLimit), $this->logger);
$this->eventDispatcher->addSubscriber(new StopWorkerOnMemoryLimitListener($this->convertToBytes($memoryLimit), $this->logger));
}
if ($timeLimit = $input->getOption('time-limit')) {
$stopsWhen[] = "been running for {$timeLimit}s";
$worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger);
$this->eventDispatcher->addSubscriber(new StopWorkerOnTimeLimitListener($timeLimit, $this->logger));
}
if (null !== $this->restartSignalCachePool) {
$stopsWhen[] = 'received a stop signal via the messenger:stop-workers command';
$worker = new StopWhenRestartSignalIsReceived($worker, $this->restartSignalCachePool, $this->logger);
}
$stopsWhen[] = 'received a stop signal via the messenger:stop-workers command';
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames)));
@ -216,6 +195,9 @@ EOF
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
}
$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;
$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger);
$worker->run([
'sleep' => $input->getOption('sleep') * 1000000,
]);

View File

@ -20,8 +20,8 @@ use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
@ -87,6 +87,8 @@ EOF
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$io->comment('Quit this command with CONTROL-C.');
if (!$output->isVeryVerbose()) {
@ -158,7 +160,9 @@ EOF
private function runWorker(ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce): int
{
$listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io, $receiver, $shouldForce) {
$count = 0;
$listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io, $receiver, $shouldForce, &$count) {
++$count;
$envelope = $messageReceivedEvent->getEnvelope();
$this->displaySingleMessage($envelope, $io);
@ -181,14 +185,8 @@ EOF
$this->logger
);
$count = 0;
try {
$worker->run([], function (?Envelope $envelope) use ($worker, &$count) {
++$count;
if (null === $envelope) {
$worker->stop();
}
});
$worker->run();
} finally {
$this->eventDispatcher->removeListener(WorkerMessageReceivedEvent::class, $listener);
}

View File

@ -17,7 +17,7 @@ use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener;
/**
* @author Ryan Weaver <ryan@symfonycasts.com>
@ -63,7 +63,7 @@ EOF
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$cacheItem = $this->restartSignalCachePool->getItem(StopWhenRestartSignalIsReceived::RESTART_REQUESTED_TIMESTAMP_KEY);
$cacheItem = $this->restartSignalCachePool->getItem(StopWorkerOnRestartSignalListener::RESTART_REQUESTED_TIMESTAMP_KEY);
$cacheItem->set(microtime(true));
$this->restartSignalCachePool->save($cacheItem);

View File

@ -276,7 +276,7 @@ class MessengerPass implements CompilerPassInterface
$consumeCommandDefinition->replaceArgument(0, new Reference('messenger.routable_message_bus'));
}
$consumeCommandDefinition->replaceArgument(3, array_values($receiverNames));
$consumeCommandDefinition->replaceArgument(4, array_values($receiverNames));
}
if ($container->hasDefinition('console.command.messenger_setup_transports')) {

View File

@ -0,0 +1,44 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Event;
use Symfony\Component\Messenger\Worker;
/**
* Dispatched after the worker processed a message or didn't receive a message at all.
*
* @author Tobias Schultze <http://tobion.de>
*/
final class WorkerRunningEvent
{
private $worker;
private $isWorkerIdle;
public function __construct(Worker $worker, bool $isWorkerIdle)
{
$this->worker = $worker;
$this->isWorkerIdle = $isWorkerIdle;
}
public function getWorker(): Worker
{
return $this->worker;
}
/**
* Returns true when no message has been received by the worker.
*/
public function isWorkerIdle(): bool
{
return $this->isWorkerIdle;
}
}

View File

@ -0,0 +1,34 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Event;
use Symfony\Component\Messenger\Worker;
/**
* Dispatched when a worker has been started.
*
* @author Tobias Schultze <http://tobion.de>
*/
final class WorkerStartedEvent
{
private $worker;
public function __construct(Worker $worker)
{
$this->worker = $worker;
}
public function getWorker(): Worker
{
return $this->worker;
}
}

View File

@ -11,6 +11,8 @@
namespace Symfony\Component\Messenger\Event;
use Symfony\Component\Messenger\Worker;
/**
* Dispatched when a worker has been stopped.
*
@ -18,4 +20,15 @@ namespace Symfony\Component\Messenger\Event;
*/
final class WorkerStoppedEvent
{
private $worker;
public function __construct(Worker $worker)
{
$this->worker = $worker;
}
public function getWorker(): Worker
{
return $this->worker;
}
}

View File

@ -0,0 +1,37 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\EventListener;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
/**
* @author Tobias Schultze <http://tobion.de>
*/
class DispatchPcntlSignalListener implements EventSubscriberInterface
{
public function onWorkerRunning(): void
{
pcntl_signal_dispatch();
}
public static function getSubscribedEvents()
{
if (!\function_exists('pcntl_signal_dispatch')) {
return [];
}
return [
WorkerRunningEvent::class => ['onWorkerRunning', 100],
];
}
}

View File

@ -0,0 +1,55 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\EventListener;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
/**
* @author Simon Delicata <simon.delicata@free.fr>
* @author Tobias Schultze <http://tobion.de>
*/
class StopWorkerOnMemoryLimitListener implements EventSubscriberInterface
{
private $memoryLimit;
private $logger;
private $memoryResolver;
public function __construct(int $memoryLimit, LoggerInterface $logger = null, callable $memoryResolver = null)
{
$this->memoryLimit = $memoryLimit;
$this->logger = $logger;
$this->memoryResolver = $memoryResolver ?: static function () {
return memory_get_usage(true);
};
}
public function onWorkerRunning(WorkerRunningEvent $event): void
{
$memoryResolver = $this->memoryResolver;
$usedMemory = $memoryResolver();
if ($usedMemory > $this->memoryLimit) {
$event->getWorker()->stop();
if (null !== $this->logger) {
$this->logger->info('Worker stopped due to memory limit of {limit} bytes exceeded ({memory} bytes used)', ['limit' => $this->memoryLimit, 'memory' => $usedMemory]);
}
}
}
public static function getSubscribedEvents()
{
return [
WorkerRunningEvent::class => 'onWorkerRunning',
];
}
}

View File

@ -0,0 +1,57 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\EventListener;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
* @author Tobias Schultze <http://tobion.de>
*/
class StopWorkerOnMessageLimitListener implements EventSubscriberInterface
{
private $maximumNumberOfMessages;
private $logger;
private $receivedMessages = 0;
public function __construct(int $maximumNumberOfMessages, LoggerInterface $logger = null)
{
$this->maximumNumberOfMessages = $maximumNumberOfMessages;
$this->logger = $logger;
if ($maximumNumberOfMessages <= 0) {
throw new InvalidArgumentException('Message limit must be greater than zero.');
}
}
public function onWorkerRunning(WorkerRunningEvent $event): void
{
if (!$event->isWorkerIdle() && ++$this->receivedMessages >= $this->maximumNumberOfMessages) {
$this->receivedMessages = 0;
$event->getWorker()->stop();
if (null !== $this->logger) {
$this->logger->info('Worker stopped due to maximum count of {count} messages processed', ['count' => $this->maximumNumberOfMessages]);
}
}
}
public static function getSubscribedEvents()
{
return [
WorkerRunningEvent::class => 'onWorkerRunning',
];
}
}

View File

@ -0,0 +1,71 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\EventListener;
use Psr\Cache\CacheItemPoolInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
class StopWorkerOnRestartSignalListener implements EventSubscriberInterface
{
public const RESTART_REQUESTED_TIMESTAMP_KEY = 'workers.restart_requested_timestamp';
private $cachePool;
private $logger;
private $workerStartedAt;
public function __construct(CacheItemPoolInterface $cachePool, LoggerInterface $logger = null)
{
$this->cachePool = $cachePool;
$this->logger = $logger;
}
public function onWorkerStarted(): void
{
$this->workerStartedAt = microtime(true);
}
public function onWorkerRunning(WorkerRunningEvent $event): void
{
if ($this->shouldRestart()) {
$event->getWorker()->stop();
if (null !== $this->logger) {
$this->logger->info('Worker stopped because a restart was requested.');
}
}
}
public static function getSubscribedEvents()
{
return [
WorkerStartedEvent::class => 'onWorkerStarted',
WorkerRunningEvent::class => 'onWorkerRunning',
];
}
private function shouldRestart(): bool
{
$cacheItem = $this->cachePool->getItem(self::RESTART_REQUESTED_TIMESTAMP_KEY);
if (!$cacheItem->isHit()) {
// no restart has ever been scheduled
return false;
}
return $this->workerStartedAt < $cacheItem->get();
}
}

View File

@ -0,0 +1,39 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\EventListener;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
/**
* @author Tobias Schultze <http://tobion.de>
*/
class StopWorkerOnSigtermSignalListener implements EventSubscriberInterface
{
public function onWorkerStarted(WorkerStartedEvent $event): void
{
pcntl_signal(SIGTERM, static function () use ($event) {
$event->getWorker()->stop();
});
}
public static function getSubscribedEvents()
{
if (!\function_exists('pcntl_signal')) {
return [];
}
return [
WorkerStartedEvent::class => ['onWorkerStarted', 100],
];
}
}

View File

@ -0,0 +1,58 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\EventListener;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
/**
* @author Simon Delicata <simon.delicata@free.fr>
* @author Tobias Schultze <http://tobion.de>
*/
class StopWorkerOnTimeLimitListener implements EventSubscriberInterface
{
private $timeLimitInSeconds;
private $logger;
private $endTime;
public function __construct(int $timeLimitInSeconds, LoggerInterface $logger = null)
{
$this->timeLimitInSeconds = $timeLimitInSeconds;
$this->logger = $logger;
}
public function onWorkerStarted(): void
{
$startTime = microtime(true);
$this->endTime = $startTime + $this->timeLimitInSeconds;
}
public function onWorkerRunning(WorkerRunningEvent $event): void
{
if ($this->endTime < microtime(true)) {
$event->getWorker()->stop();
if (null !== $this->logger) {
$this->logger->info('Worker stopped due to time limit of {timeLimit}s exceeded', ['timeLimit' => $this->timeLimitInSeconds]);
}
}
}
public static function getSubscribedEvents()
{
return [
WorkerStartedEvent::class => 'onWorkerStarted',
WorkerRunningEvent::class => 'onWorkerRunning',
];
}
}

View File

@ -16,6 +16,8 @@ use Symfony\Component\Console\Application;
use Symfony\Component\Console\Tester\CommandTester;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
@ -27,7 +29,7 @@ class ConsumeMessagesCommandTest extends TestCase
{
public function testConfigurationWithDefaultReceiver()
{
$command = new ConsumeMessagesCommand($this->createMock(RoutableMessageBus::class), $this->createMock(ServiceLocator::class), null, ['amqp']);
$command = new ConsumeMessagesCommand($this->createMock(RoutableMessageBus::class), $this->createMock(ServiceLocator::class), $this->createMock(EventDispatcherInterface::class), null, ['amqp']);
$inputArgument = $command->getDefinition()->getArgument('receivers');
$this->assertFalse($inputArgument->isRequired());
$this->assertSame(['amqp'], $inputArgument->getDefault());
@ -51,7 +53,7 @@ class ConsumeMessagesCommandTest extends TestCase
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator);
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator, new EventDispatcher());
$application = new Application();
$application->add($command);
@ -83,7 +85,7 @@ class ConsumeMessagesCommandTest extends TestCase
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator);
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator, new EventDispatcher());
$application = new Application();
$application->add($command);
@ -120,7 +122,7 @@ class ConsumeMessagesCommandTest extends TestCase
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
$command = new ConsumeMessagesCommand($busLocator, $receiverLocator);
$command = new ConsumeMessagesCommand($busLocator, $receiverLocator, new EventDispatcher());
$application = new Application();
$application->add($command);
@ -156,7 +158,7 @@ class ConsumeMessagesCommandTest extends TestCase
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
$command = new ConsumeMessagesCommand($busLocator, $receiverLocator);
$command = new ConsumeMessagesCommand($busLocator, $receiverLocator, new EventDispatcher());
$application = new Application();
$application->add($command);

View File

@ -13,7 +13,7 @@ namespace Symfony\Component\Messenger\Tests\Command;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Console\Tester\CommandTester;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\Messenger\Command\FailedMessagesRetryCommand;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
@ -28,7 +28,7 @@ class FailedMessagesRetryCommandTest extends TestCase
// message will eventually be ack'ed in Worker
$receiver->expects($this->exactly(2))->method('ack');
$dispatcher = $this->createMock(EventDispatcherInterface::class);
$dispatcher = new EventDispatcher();
$bus = $this->createMock(MessageBusInterface::class);
// the bus should be called in the worker
$bus->expects($this->exactly(2))->method('dispatch')->willReturn(new Envelope(new \stdClass()));
@ -41,7 +41,7 @@ class FailedMessagesRetryCommandTest extends TestCase
);
$tester = new CommandTester($command);
$tester->execute(['id' => [10, 12]]);
$tester->execute(['id' => [10, 12], '--force' => true]);
$this->assertStringContainsString('[OK]', $tester->getDisplay());
}

View File

@ -291,7 +291,7 @@ class MessengerPassTest extends TestCase
(new MessengerPass())->process($container);
$this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(3));
$this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(4));
}
public function testItSetsTheReceiverNamesOnTheSetupTransportsCommand()

View File

@ -46,10 +46,23 @@ class SendFailedMessageForRetryListenerTest extends TestCase
$envelope = new Envelope(new \stdClass());
$sender = $this->createMock(SenderInterface::class);
$sender->expects($this->once())->method('send')->with($envelope->with(new DelayStamp(1000), new RedeliveryStamp(1)))->willReturnArgument(0);
$sender->expects($this->once())->method('send')->willReturnCallback(function (Envelope $envelope) {
/** @var DelayStamp $delayStamp */
$delayStamp = $envelope->last(DelayStamp::class);
/** @var RedeliveryStamp $redeliveryStamp */
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
$this->assertInstanceOf(DelayStamp::class, $delayStamp);
$this->assertSame(1000, $delayStamp->getDelay());
$this->assertInstanceOf(RedeliveryStamp::class, $redeliveryStamp);
$this->assertSame(1, $redeliveryStamp->getRetryCount());
return $envelope;
});
$senderLocator = $this->createMock(ContainerInterface::class);
$senderLocator->expects($this->once())->method('has')->willReturn(true);
$senderLocator->expects($this->never())->method('get')->willReturn($sender);
$senderLocator->expects($this->once())->method('get')->willReturn($sender);
$retryStategy = $this->createMock(RetryStrategyInterface::class);
$retryStategy->expects($this->once())->method('isRetryable')->willReturn(true);
$retryStategy->expects($this->once())->method('getWaitingTime')->willReturn(1000);

View File

@ -0,0 +1,62 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\EventListener;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener;
use Symfony\Component\Messenger\Worker;
class StopWorkerOnMemoryLimitListenerTest extends TestCase
{
/**
* @dataProvider memoryProvider
*/
public function testWorkerStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop)
{
$memoryResolver = function () use ($memoryUsage) {
return $memoryUsage;
};
$worker = $this->createMock(Worker::class);
$worker->expects($shouldStop ? $this->once() : $this->never())->method('stop');
$event = new WorkerRunningEvent($worker, false);
$memoryLimitListener = new StopWorkerOnMemoryLimitListener($memoryLimit, null, $memoryResolver);
$memoryLimitListener->onWorkerRunning($event);
}
public function memoryProvider(): iterable
{
yield [2048, 1024, true];
yield [1024, 1024, false];
yield [1024, 2048, false];
}
public function testWorkerLogsMemoryExceededWhenLoggerIsGiven()
{
$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('info')
->with('Worker stopped due to memory limit of {limit} bytes exceeded ({memory} bytes used)', ['limit' => 64, 'memory' => 70]);
$memoryResolver = function () {
return 70;
};
$worker = $this->createMock(Worker::class);
$event = new WorkerRunningEvent($worker, false);
$memoryLimitListener = new StopWorkerOnMemoryLimitListener(64, $logger, $memoryResolver);
$memoryLimitListener->onWorkerRunning($event);
}
}

View File

@ -0,0 +1,61 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\EventListener;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\Worker;
class StopWorkerOnMessageLimitListenerTest extends TestCase
{
/**
* @dataProvider countProvider
*/
public function testWorkerStopsWhenMaximumCountExceeded(int $max, bool $shouldStop)
{
$worker = $this->createMock(Worker::class);
$worker->expects($shouldStop ? $this->atLeastOnce() : $this->never())->method('stop');
$event = new WorkerRunningEvent($worker, false);
$maximumCountListener = new StopWorkerOnMessageLimitListener($max);
// simulate three messages processed
$maximumCountListener->onWorkerRunning($event);
$maximumCountListener->onWorkerRunning($event);
$maximumCountListener->onWorkerRunning($event);
}
public function countProvider(): iterable
{
yield [1, true];
yield [2, true];
yield [3, true];
yield [4, false];
}
public function testWorkerLogsMaximumCountExceededWhenLoggerIsGiven()
{
$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('info')
->with(
$this->equalTo('Worker stopped due to maximum count of {count} messages processed'),
$this->equalTo(['count' => 1])
);
$worker = $this->createMock(Worker::class);
$event = new WorkerRunningEvent($worker, false);
$maximumCountListener = new StopWorkerOnMessageLimitListener(1, $logger);
$maximumCountListener->onWorkerRunning($event);
}
}

View File

@ -9,39 +9,38 @@
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Worker;
namespace Symfony\Component\Messenger\Tests\EventListener;
use PHPUnit\Framework\TestCase;
use Psr\Cache\CacheItemInterface;
use Psr\Cache\CacheItemPoolInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker;
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener;
use Symfony\Component\Messenger\Worker;
/**
* @group time-sensitive
*/
class StopWhenRestartSignalIsReceivedTest extends TestCase
class StopWorkerOnRestartSignalListenerTest extends TestCase
{
/**
* @dataProvider restartTimeProvider
*/
public function testWorkerStopsWhenMemoryLimitExceeded(?int $lastRestartTimeOffset, bool $shouldStop)
{
$decoratedWorker = new DummyWorker([
new Envelope(new \stdClass()),
]);
$cachePool = $this->createMock(CacheItemPoolInterface::class);
$cacheItem = $this->createMock(CacheItemInterface::class);
$cacheItem->expects($this->once())->method('isHIt')->willReturn(true);
$cacheItem->expects($this->once())->method('get')->willReturn(null === $lastRestartTimeOffset ? null : time() + $lastRestartTimeOffset);
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
$stopOnSignalWorker = new StopWhenRestartSignalIsReceived($decoratedWorker, $cachePool);
$stopOnSignalWorker->run();
$worker = $this->createMock(Worker::class);
$worker->expects($shouldStop ? $this->once() : $this->never())->method('stop');
$event = new WorkerRunningEvent($worker, false);
$this->assertSame($shouldStop, $decoratedWorker->isStopped());
$stopOnSignalListener = new StopWorkerOnRestartSignalListener($cachePool);
$stopOnSignalListener->onWorkerStarted();
$stopOnSignalListener->onWorkerRunning($event);
}
public function restartTimeProvider()
@ -53,19 +52,18 @@ class StopWhenRestartSignalIsReceivedTest extends TestCase
public function testWorkerDoesNotStopIfRestartNotInCache()
{
$decoratedWorker = new DummyWorker([
new Envelope(new \stdClass()),
]);
$cachePool = $this->createMock(CacheItemPoolInterface::class);
$cacheItem = $this->createMock(CacheItemInterface::class);
$cacheItem->expects($this->once())->method('isHIt')->willReturn(false);
$cacheItem->expects($this->never())->method('get');
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
$stopOnSignalWorker = new StopWhenRestartSignalIsReceived($decoratedWorker, $cachePool);
$stopOnSignalWorker->run();
$worker = $this->createMock(Worker::class);
$worker->expects($this->never())->method('stop');
$event = new WorkerRunningEvent($worker, false);
$this->assertFalse($decoratedWorker->isStopped());
$stopOnSignalListener = new StopWorkerOnRestartSignalListener($cachePool);
$stopOnSignalListener->onWorkerStarted();
$stopOnSignalListener->onWorkerRunning($event);
}
}

View File

@ -0,0 +1,40 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\EventListener;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
use Symfony\Component\Messenger\Worker;
class StopWorkerOnTimeLimitListenerTest extends TestCase
{
/**
* @group time-sensitive
*/
public function testWorkerStopsWhenTimeLimitIsReached()
{
$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('info')
->with('Worker stopped due to time limit of {timeLimit}s exceeded', ['timeLimit' => 1]);
$worker = $this->createMock(Worker::class);
$worker->expects($this->once())->method('stop');
$event = new WorkerRunningEvent($worker, false);
$timeoutListener = new StopWorkerOnTimeLimitListener(1, $logger);
$timeoutListener->onWorkerStarted();
sleep(2);
$timeoutListener->onWorkerRunning($event);
}
}

View File

@ -18,6 +18,7 @@ use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocator;
@ -97,6 +98,7 @@ class FailureIntegrationTest extends TestCase
]);
$dispatcher->addSubscriber(new SendFailedMessageForRetryListener($locator, $retryStrategyLocator));
$dispatcher->addSubscriber(new SendFailedMessageToFailureTransportListener($failureTransport));
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
$runWorker = function (string $transportName) use ($transports, $bus, $dispatcher): ?\Throwable {
$throwable = null;
@ -107,12 +109,7 @@ class FailureIntegrationTest extends TestCase
$worker = new Worker([$transportName => $transports[$transportName]], $bus, $dispatcher);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// handle one envelope, then stop
if (null !== $envelope) {
$worker->stop();
}
});
$worker->run();
$dispatcher->removeListener(WorkerMessageFailedEvent::class, $failedListener);
@ -208,7 +205,8 @@ class FailureIntegrationTest extends TestCase
* Dispatch the original message again
*/
$bus->dispatch($envelope);
// handle the message, but with no retries
// handle the failing message so it goes into the failure transport
$runWorker('transport1');
$runWorker('transport1');
// now make the handler work!
$transport1HandlerThatFails->setShouldThrow(false);

View File

@ -1,46 +0,0 @@
<?php
namespace Symfony\Component\Messenger\Tests\Fixtures;
use Symfony\Component\Messenger\WorkerInterface;
class DummyWorker implements WorkerInterface
{
private $isStopped = false;
private $envelopesToReceive;
private $envelopesHandled = 0;
public function __construct(array $envelopesToReceive)
{
$this->envelopesToReceive = $envelopesToReceive;
}
public function run(array $options = [], callable $onHandledCallback = null): void
{
foreach ($this->envelopesToReceive as $envelope) {
if (true === $this->isStopped) {
break;
}
if ($onHandledCallback) {
$onHandledCallback($envelope);
++$this->envelopesHandled;
}
}
}
public function stop(): void
{
$this->isStopped = true;
}
public function isStopped(): bool
{
return $this->isStopped;
}
public function countEnvelopesHandled()
{
return $this->envelopesHandled;
}
}

View File

@ -12,9 +12,11 @@ if (!file_exists($autoload)) {
require_once $autoload;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnSigtermSignalListener;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
@ -30,7 +32,9 @@ $serializer = new Serializer(
$connection = Connection::fromDsn(getenv('DSN'));
$receiver = new AmqpReceiver($connection, $serializer);
$retryStrategy = new MultiplierRetryStrategy(3, 0);
$eventDispatcher = new EventDispatcher();
$eventDispatcher->addSubscriber(new StopWorkerOnSigtermSignalListener());
$eventDispatcher->addSubscriber(new DispatchPcntlSignalListener());
$worker = new Worker(['the_receiver' => $receiver], new class() implements MessageBusInterface {
public function dispatch($envelope, array $stamps = []): Envelope
@ -43,7 +47,7 @@ $worker = new Worker(['the_receiver' => $receiver], new class() implements Messa
return $envelope;
}
});
}, $eventDispatcher);
echo "Receiving messages...\n";
$worker->run();

View File

@ -1,71 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Worker;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker;
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
class StopWhenMemoryUsageIsExceededWorkerTest extends TestCase
{
/**
* @dataProvider memoryProvider
*/
public function testWorkerStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop)
{
$handlerCalledTimes = 0;
$handledCallback = function () use (&$handlerCalledTimes) {
++$handlerCalledTimes;
};
$decoratedWorker = new DummyWorker([
new Envelope(new \stdClass()),
]);
$memoryResolver = function () use ($memoryUsage) {
return $memoryUsage;
};
$memoryLimitWorker = new StopWhenMemoryUsageIsExceededWorker($decoratedWorker, $memoryLimit, null, $memoryResolver);
$memoryLimitWorker->run([], $handledCallback);
// handler should be called exactly 1 time
$this->assertSame($handlerCalledTimes, 1);
$this->assertSame($shouldStop, $decoratedWorker->isStopped());
}
public function memoryProvider(): iterable
{
yield [2048, 1024, true];
yield [1024, 1024, false];
yield [1024, 2048, false];
}
public function testWorkerLogsMemoryExceededWhenLoggerIsGiven()
{
$decoratedWorker = new DummyWorker([
new Envelope(new \stdClass()),
]);
$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('info')
->with('Worker stopped due to memory limit of {limit} exceeded', ['limit' => 64 * 1024 * 1024]);
$memoryResolver = function () {
return 70 * 1024 * 1024;
};
$memoryLimitWorker = new StopWhenMemoryUsageIsExceededWorker($decoratedWorker, 64 * 1024 * 1024, $logger, $memoryResolver);
$memoryLimitWorker->run();
}
}

View File

@ -1,71 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Worker;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker;
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
class StopWhenMessageCountIsExceededWorkerTest extends TestCase
{
/**
* @dataProvider countProvider
*/
public function testWorkerStopsWhenMaximumCountExceeded(int $max, bool $shouldStop)
{
$handlerCalledTimes = 0;
$handledCallback = function () use (&$handlerCalledTimes) {
++$handlerCalledTimes;
};
// receive 3 real messages
$decoratedWorker = new DummyWorker([
new Envelope(new DummyMessage('First message')),
null,
new Envelope(new DummyMessage('Second message')),
null,
new Envelope(new DummyMessage('Third message')),
]);
$maximumCountWorker = new StopWhenMessageCountIsExceededWorker($decoratedWorker, $max);
$maximumCountWorker->run([], $handledCallback);
$this->assertSame($shouldStop, $decoratedWorker->isStopped());
}
public function countProvider(): iterable
{
yield [1, true];
yield [2, true];
yield [3, true];
yield [4, false];
}
public function testWorkerLogsMaximumCountExceededWhenLoggerIsGiven()
{
$decoratedWorker = new DummyWorker([
new Envelope(new \stdClass()),
]);
$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('info')
->with(
$this->equalTo('Worker stopped due to maximum count of {count} exceeded'),
$this->equalTo(['count' => 1])
);
$maximumCountWorker = new StopWhenMessageCountIsExceededWorker($decoratedWorker, 1, $logger);
$maximumCountWorker->run();
}
}

View File

@ -1,44 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Worker;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker;
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
class StopWhenTimeLimitIsReachedWorkerTest extends TestCase
{
/**
* @group time-sensitive
*/
public function testWorkerStopsWhenTimeLimitIsReached()
{
$decoratedWorker = new DummyWorker([
new Envelope(new \stdClass()),
new Envelope(new \stdClass()),
]);
$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('info')
->with('Worker stopped due to time limit of {timeLimit}s reached', ['timeLimit' => 1]);
$timeoutWorker = new StopWhenTimeLimitIsReachedWorker($decoratedWorker, 1, $logger);
$timeoutWorker->run([], function () {
sleep(2);
});
$this->assertTrue($decoratedWorker->isStopped());
$this->assertSame(1, $decoratedWorker->countEnvelopesHandled());
}
}

View File

@ -12,11 +12,15 @@
namespace Symfony\Component\Messenger\Tests;
use PHPUnit\Framework\TestCase;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
@ -24,7 +28,6 @@ use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
/**
@ -51,13 +54,11 @@ class WorkerTest extends TestCase
new Envelope($ipaMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])
)->willReturnArgument(0);
$worker = new Worker(['transport' => $receiver], $bus);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
$worker->stop();
}
});
$dispatcher = new EventDispatcher();
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(2));
$worker = new Worker(['transport' => $receiver], $bus, $dispatcher);
$worker->run();
$this->assertSame(2, $receiver->getAcknowledgeCount());
}
@ -71,13 +72,12 @@ class WorkerTest extends TestCase
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not'));
$worker = new Worker(['transport1' => $receiver], $bus);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
$worker->stop();
}
});
$dispatcher = new EventDispatcher();
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
$worker = new Worker(['transport1' => $receiver], $bus, $dispatcher);
$worker->run();
$this->assertSame(1, $receiver->getRejectCount());
$this->assertSame(0, $receiver->getAcknowledgeCount());
}
@ -91,13 +91,13 @@ class WorkerTest extends TestCase
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->never())->method('dispatch');
$worker = new Worker([$receiver], $bus);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
$worker->stop();
}
$dispatcher = new EventDispatcher();
$dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) {
$event->getWorker()->stop();
});
$worker = new Worker([$receiver], $bus, $dispatcher);
$worker->run();
}
public function testWorkerDispatchesEventsOnSuccess()
@ -110,21 +110,22 @@ class WorkerTest extends TestCase
$eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock();
$eventDispatcher->expects($this->exactly(3))
$eventDispatcher->expects($this->exactly(5))
->method('dispatch')
->withConsecutive(
[$this->isInstanceOf(WorkerStartedEvent::class)],
[$this->isInstanceOf(WorkerMessageReceivedEvent::class)],
[$this->isInstanceOf(WorkerMessageHandledEvent::class)],
[$this->isInstanceOf(WorkerRunningEvent::class)],
[$this->isInstanceOf(WorkerStoppedEvent::class)]
);
)->willReturnCallback(function ($event) {
if ($event instanceof WorkerRunningEvent) {
$event->getWorker()->stop();
}
});
$worker = new Worker([$receiver], $bus, $eventDispatcher);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
$worker->stop();
}
});
$worker->run();
}
public function testWorkerDispatchesEventsOnError()
@ -138,21 +139,22 @@ class WorkerTest extends TestCase
$eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock();
$eventDispatcher->expects($this->exactly(3))
$eventDispatcher->expects($this->exactly(5))
->method('dispatch')
->withConsecutive(
[$this->isInstanceOf(WorkerStartedEvent::class)],
[$this->isInstanceOf(WorkerMessageReceivedEvent::class)],
[$this->isInstanceOf(WorkerMessageFailedEvent::class)],
[$this->isInstanceOf(WorkerRunningEvent::class)],
[$this->isInstanceOf(WorkerStoppedEvent::class)]
);
)->willReturnCallback(function ($event) {
if ($event instanceof WorkerRunningEvent) {
$event->getWorker()->stop();
}
});
$worker = new Worker([$receiver], $bus, $eventDispatcher);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
$worker->stop();
}
});
$worker->run();
}
public function testTimeoutIsConfigurable()
@ -170,25 +172,19 @@ class WorkerTest extends TestCase
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$worker = new Worker([$receiver], $bus);
$receivedCount = 0;
$dispatcher = new EventDispatcher();
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(5));
$worker = new Worker([$receiver], $bus, $dispatcher);
$startTime = microtime(true);
// sleep .1 after each idle
$worker->run(['sleep' => 100000], function (?Envelope $envelope) use ($worker, &$receivedCount, $startTime) {
if (null !== $envelope) {
++$receivedCount;
}
$worker->run(['sleep' => 100000]);
if (5 === $receivedCount) {
$worker->stop();
$duration = microtime(true) - $startTime;
// wait time should be .3 seconds
// use .29 & .31 for timing "wiggle room"
$this->assertGreaterThanOrEqual(.29, $duration);
$this->assertLessThan(.31, $duration);
}
});
$duration = microtime(true) - $startTime;
// wait time should be .3 seconds
// use .29 & .31 for timing "wiggle room"
$this->assertGreaterThanOrEqual(.29, $duration);
$this->assertLessThan(.31, $duration);
}
public function testWorkerWithMultipleReceivers()
@ -228,48 +224,18 @@ class WorkerTest extends TestCase
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$receivedCount = 0;
$worker = new Worker([$receiver1, $receiver2, $receiver3], $bus);
$processedEnvelopes = [];
$worker->run([], function (?Envelope $envelope) use ($worker, &$receivedCount, &$processedEnvelopes) {
if (null !== $envelope) {
$processedEnvelopes[] = $envelope;
++$receivedCount;
}
// stop after the messages finish
if (6 === $receivedCount) {
$worker->stop();
}
$dispatcher = new EventDispatcher();
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(6));
$dispatcher->addListener(WorkerMessageReceivedEvent::class, function (WorkerMessageReceivedEvent $event) use (&$processedEnvelopes) {
$processedEnvelopes[] = $event->getEnvelope();
});
$worker = new Worker([$receiver1, $receiver2, $receiver3], $bus, $dispatcher);
$worker->run();
// make sure they were processed in the correct order
$this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes);
}
public function testWorkerWithDecorator()
{
$envelope1 = new Envelope(new DummyMessage('message1'));
$envelope2 = new Envelope(new DummyMessage('message2'));
$envelope3 = new Envelope(new DummyMessage('message3'));
$receiver = new DummyReceiver([
[$envelope1, $envelope2, $envelope3],
]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$worker = new Worker([$receiver], $bus);
$workerWithDecorator = new StopWhenMessageCountIsExceededWorker($worker, 2);
$processedEnvelopes = [];
$workerWithDecorator->run([], function (?Envelope $envelope) use (&$processedEnvelopes) {
if (null !== $envelope) {
$processedEnvelopes[] = $envelope;
}
});
$this->assertSame([$envelope1, $envelope2], $processedEnvelopes);
}
}
class DummyReceiver implements ReceiverInterface

View File

@ -16,6 +16,8 @@ use Symfony\Component\EventDispatcher\LegacyEventDispatcherProxy;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
@ -26,10 +28,11 @@ use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
* @author Tobias Schultze <http://tobion.de>
*
* @final
*/
class Worker implements WorkerInterface
class Worker
{
private $receivers;
private $bus;
@ -54,28 +57,14 @@ class Worker implements WorkerInterface
* Valid options are:
* * sleep (default: 1000000): Time in microseconds to sleep after no messages are found
*/
public function run(array $options = [], callable $onHandledCallback = null): void
public function run(array $options = []): void
{
$this->dispatchEvent(new WorkerStartedEvent($this));
$options = array_merge([
'sleep' => 1000000,
], $options);
if (\function_exists('pcntl_signal')) {
pcntl_signal(SIGTERM, function () {
$this->stop();
});
}
$onHandled = function (?Envelope $envelope) use ($onHandledCallback) {
if (\function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
if (null !== $onHandledCallback) {
$onHandledCallback($envelope);
}
};
while (false === $this->shouldStop) {
$envelopeHandled = false;
foreach ($this->receivers as $transportName => $receiver) {
@ -85,7 +74,7 @@ class Worker implements WorkerInterface
$envelopeHandled = true;
$this->handleMessage($envelope, $receiver, $transportName);
$onHandled($envelope);
$this->dispatchEvent(new WorkerRunningEvent($this, false));
if ($this->shouldStop) {
break 2;
@ -101,13 +90,13 @@ class Worker implements WorkerInterface
}
if (false === $envelopeHandled) {
$onHandled(null);
$this->dispatchEvent(new WorkerRunningEvent($this, true));
usleep($options['sleep']);
}
}
$this->dispatchEvent(new WorkerStoppedEvent());
$this->dispatchEvent(new WorkerStoppedEvent($this));
}
private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName): void

View File

@ -1,59 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Worker;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\WorkerInterface;
/**
* @author Simon Delicata <simon.delicata@free.fr>
*/
class StopWhenMemoryUsageIsExceededWorker implements WorkerInterface
{
private $decoratedWorker;
private $memoryLimit;
private $logger;
private $memoryResolver;
public function __construct(WorkerInterface $decoratedWorker, int $memoryLimit, LoggerInterface $logger = null, callable $memoryResolver = null)
{
$this->decoratedWorker = $decoratedWorker;
$this->memoryLimit = $memoryLimit;
$this->logger = $logger;
$this->memoryResolver = $memoryResolver ?: function () {
return memory_get_usage(true);
};
}
public function run(array $options = [], callable $onHandledCallback = null): void
{
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback) {
if (null !== $onHandledCallback) {
$onHandledCallback($envelope);
}
$memoryResolver = $this->memoryResolver;
if ($memoryResolver() > $this->memoryLimit) {
$this->stop();
if (null !== $this->logger) {
$this->logger->info('Worker stopped due to memory limit of {limit} exceeded', ['limit' => $this->memoryLimit]);
}
}
});
}
public function stop(): void
{
$this->decoratedWorker->stop();
}
}

View File

@ -1,56 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Worker;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\WorkerInterface;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class StopWhenMessageCountIsExceededWorker implements WorkerInterface
{
private $decoratedWorker;
private $maximumNumberOfMessages;
private $logger;
public function __construct(WorkerInterface $decoratedWorker, int $maximumNumberOfMessages, LoggerInterface $logger = null)
{
$this->decoratedWorker = $decoratedWorker;
$this->maximumNumberOfMessages = $maximumNumberOfMessages;
$this->logger = $logger;
}
public function run(array $options = [], callable $onHandledCallback = null): void
{
$receivedMessages = 0;
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, &$receivedMessages) {
if (null !== $onHandledCallback) {
$onHandledCallback($envelope);
}
if (null !== $envelope && ++$receivedMessages >= $this->maximumNumberOfMessages) {
$this->stop();
if (null !== $this->logger) {
$this->logger->info('Worker stopped due to maximum count of {count} exceeded', ['count' => $this->maximumNumberOfMessages]);
}
}
});
}
public function stop(): void
{
$this->decoratedWorker->stop();
}
}

View File

@ -1,71 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Worker;
use Psr\Cache\CacheItemPoolInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\WorkerInterface;
/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
class StopWhenRestartSignalIsReceived implements WorkerInterface
{
public const RESTART_REQUESTED_TIMESTAMP_KEY = 'workers.restart_requested_timestamp';
private $decoratedWorker;
private $cachePool;
private $logger;
public function __construct(WorkerInterface $decoratedWorker, CacheItemPoolInterface $cachePool, LoggerInterface $logger = null)
{
$this->decoratedWorker = $decoratedWorker;
$this->cachePool = $cachePool;
$this->logger = $logger;
}
public function run(array $options = [], callable $onHandledCallback = null): void
{
$workerStartedAt = microtime(true);
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, $workerStartedAt) {
if (null !== $onHandledCallback) {
$onHandledCallback($envelope);
}
if ($this->shouldRestart($workerStartedAt)) {
$this->stop();
if (null !== $this->logger) {
$this->logger->info('Worker stopped because a restart was requested.');
}
}
});
}
public function stop(): void
{
$this->decoratedWorker->stop();
}
private function shouldRestart(float $workerStartedAt): bool
{
$cacheItem = $this->cachePool->getItem(self::RESTART_REQUESTED_TIMESTAMP_KEY);
if (!$cacheItem->isHit()) {
// no restart has ever been scheduled
return false;
}
return $workerStartedAt < $cacheItem->get();
}
}

View File

@ -1,57 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Worker;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\WorkerInterface;
/**
* @author Simon Delicata <simon.delicata@free.fr>
*/
class StopWhenTimeLimitIsReachedWorker implements WorkerInterface
{
private $decoratedWorker;
private $timeLimitInSeconds;
private $logger;
public function __construct(WorkerInterface $decoratedWorker, int $timeLimitInSeconds, LoggerInterface $logger = null)
{
$this->decoratedWorker = $decoratedWorker;
$this->timeLimitInSeconds = $timeLimitInSeconds;
$this->logger = $logger;
}
public function run(array $options = [], callable $onHandledCallback = null): void
{
$startTime = microtime(true);
$endTime = $startTime + $this->timeLimitInSeconds;
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, $endTime) {
if (null !== $onHandledCallback) {
$onHandledCallback($envelope);
}
if ($endTime < microtime(true)) {
$this->stop();
if (null !== $this->logger) {
$this->logger->info('Worker stopped due to time limit of {timeLimit}s reached', ['timeLimit' => $this->timeLimitInSeconds]);
}
}
});
}
public function stop(): void
{
$this->decoratedWorker->stop();
}
}

View File

@ -1,35 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger;
/**
* Interface for Workers that handle messages from transports.
*
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
interface WorkerInterface
{
/**
* Receives the messages and dispatch them to the bus.
*
* The $onHandledCallback will be passed the Envelope that was just
* handled or null if nothing was handled.
*
* @param mixed[] $options options used to control worker behavior
*/
public function run(array $options = [], callable $onHandledCallback = null): void;
/**
* Stops receiving messages.
*/
public function stop(): void;
}