[Messenger] use events consistently in worker
This commit is contained in:
parent
72dd176132
commit
201f159303
@ -168,13 +168,17 @@ Lock
|
||||
Messenger
|
||||
---------
|
||||
|
||||
* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
|
||||
pass a `RoutableMessageBus` instance instead.
|
||||
* [BC BREAK] Removed `SendersLocatorInterface::getSenderByAlias` added in 4.3.
|
||||
* [BC BREAK] Removed `$retryStrategies` argument from `Worker::__construct`.
|
||||
* [BC BREAK] Removed `$retryStrategyLocator` argument from `ConsumeMessagesCommand::__construct`.
|
||||
* [BC BREAK] Changed arguments of `ConsumeMessagesCommand::__construct`.
|
||||
* [BC BREAK] Removed `$senderClassOrAlias` argument from `RedeliveryStamp::__construct`.
|
||||
* [BC BREAK] Removed `UnknownSenderException`.
|
||||
* [BC BREAK] Removed `WorkerInterface`.
|
||||
* [BC BREAK] Removed `$onHandledCallback` of `Worker::run(array $options = [], callable $onHandledCallback = null)`.
|
||||
* [BC BREAK] Removed `StopWhenMemoryUsageIsExceededWorker` in favor of `StopWorkerOnMemoryLimitListener`.
|
||||
* [BC BREAK] Removed `StopWhenMessageCountIsExceededWorker` in favor of `StopWorkerOnMessageLimitListener`.
|
||||
* [BC BREAK] Removed `StopWhenTimeLimitIsReachedWorker` in favor of `StopWorkerOnTimeLimitListener`.
|
||||
* [BC BREAK] Removed `StopWhenRestartSignalIsReceived` in favor of `StopWorkerOnRestartSignalListener`.
|
||||
* Marked the `MessengerDataCollector` class as `@final`.
|
||||
|
||||
Mime
|
||||
|
@ -88,12 +88,9 @@
|
||||
<service id="console.command.messenger_consume_messages" class="Symfony\Component\Messenger\Command\ConsumeMessagesCommand">
|
||||
<argument /> <!-- Routable message bus -->
|
||||
<argument type="service" id="messenger.receiver_locator" />
|
||||
<argument type="service" id="event_dispatcher" />
|
||||
<argument type="service" id="logger" on-invalid="null" />
|
||||
<argument type="collection" /> <!-- Receiver names -->
|
||||
<argument type="service" id="event_dispatcher" />
|
||||
<call method="setCachePoolForRestartSignal">
|
||||
<argument type="service" id="cache.messenger.restart_workers_signal" />
|
||||
</call>
|
||||
|
||||
<tag name="console.command" command="messenger:consume" />
|
||||
<tag name="console.command" command="messenger:consume-messages" />
|
||||
|
@ -98,6 +98,7 @@
|
||||
<argument /> <!-- max delay ms -->
|
||||
</service>
|
||||
|
||||
<!-- worker event listeners -->
|
||||
<service id="messenger.retry.send_failed_message_for_retry_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener">
|
||||
<tag name="kernel.event_subscriber" />
|
||||
<tag name="monolog.logger" channel="messenger" />
|
||||
@ -106,7 +107,6 @@
|
||||
<argument type="service" id="logger" on-invalid="ignore" />
|
||||
</service>
|
||||
|
||||
<!-- failed handling -->
|
||||
<service id="messenger.failure.send_failed_message_to_failure_transport_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener">
|
||||
<tag name="kernel.event_subscriber" />
|
||||
<tag name="monolog.logger" channel="messenger" />
|
||||
@ -114,6 +114,21 @@
|
||||
<argument type="service" id="logger" on-invalid="ignore" />
|
||||
</service>
|
||||
|
||||
<service id="messenger.listener.dispatch_pcntl_signal_listener" class="Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener">
|
||||
<tag name="kernel.event_subscriber" />
|
||||
</service>
|
||||
|
||||
<service id="messenger.listener.stop_worker_on_restart_signal_listener" class="Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener">
|
||||
<tag name="kernel.event_subscriber" />
|
||||
<tag name="monolog.logger" channel="messenger" />
|
||||
<argument type="service" id="cache.messenger.restart_workers_signal" />
|
||||
<argument type="service" id="logger" on-invalid="ignore" />
|
||||
</service>
|
||||
|
||||
<service id="messenger.listener.stop_worker_on_sigterm_signal_listener" class="Symfony\Component\Messenger\EventListener\StopWorkerOnSigtermSignalListener">
|
||||
<tag name="kernel.event_subscriber" />
|
||||
</service>
|
||||
|
||||
<!-- routable message bus -->
|
||||
<service id="messenger.routable_message_bus" class="Symfony\Component\Messenger\RoutableMessageBus">
|
||||
<argument /> <!-- Message bus locator -->
|
||||
|
@ -4,18 +4,23 @@ CHANGELOG
|
||||
4.4.0
|
||||
-----
|
||||
|
||||
* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
|
||||
pass a `RoutableMessageBus` instance instead.
|
||||
* Added support for auto trimming of Redis streams.
|
||||
* `InMemoryTransport` handle acknowledged and rejected messages.
|
||||
* Made all dispatched worker event classes final.
|
||||
* Added support for `from_transport` attribute on `messenger.message_handler` tag.
|
||||
* Added support for passing `dbindex` as a query parameter to the redis transport DSN.
|
||||
* Added `WorkerStartedEvent` and `WorkerRunningEvent`
|
||||
* [BC BREAK] Removed `SendersLocatorInterface::getSenderByAlias` added in 4.3.
|
||||
* [BC BREAK] Removed `$retryStrategies` argument from `Worker::__construct`.
|
||||
* [BC BREAK] Removed `$retryStrategyLocator` argument from `ConsumeMessagesCommand::__construct`.
|
||||
* [BC BREAK] Changed arguments of `ConsumeMessagesCommand::__construct`.
|
||||
* [BC BREAK] Removed `$senderClassOrAlias` argument from `RedeliveryStamp::__construct`.
|
||||
* [BC BREAK] Removed `UnknownSenderException`.
|
||||
* [BC BREAK] Removed `WorkerInterface`.
|
||||
* [BC BREAK] Removed `$onHandledCallback` of `Worker::run(array $options = [], callable $onHandledCallback = null)`.
|
||||
* [BC BREAK] Removed `StopWhenMemoryUsageIsExceededWorker` in favor of `StopWorkerOnMemoryLimitListener`.
|
||||
* [BC BREAK] Removed `StopWhenMessageCountIsExceededWorker` in favor of `StopWorkerOnMessageLimitListener`.
|
||||
* [BC BREAK] Removed `StopWhenTimeLimitIsReachedWorker` in favor of `StopWorkerOnTimeLimitListener`.
|
||||
* [BC BREAK] Removed `StopWhenRestartSignalIsReceived` in favor of `StopWorkerOnRestartSignalListener`.
|
||||
* The component is not marked as `@experimental` anymore.
|
||||
* Marked the `MessengerDataCollector` class as `@final`.
|
||||
|
||||
|
@ -11,7 +11,6 @@
|
||||
|
||||
namespace Symfony\Component\Messenger\Command;
|
||||
|
||||
use Psr\Cache\CacheItemPoolInterface;
|
||||
use Psr\Container\ContainerInterface;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Console\Command\Command;
|
||||
@ -23,13 +22,12 @@ use Symfony\Component\Console\Output\ConsoleOutputInterface;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
use Symfony\Component\Console\Question\ChoiceQuestion;
|
||||
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
|
||||
use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener;
|
||||
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
|
||||
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
|
||||
use Symfony\Component\Messenger\RoutableMessageBus;
|
||||
use Symfony\Component\Messenger\Worker;
|
||||
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
|
||||
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
|
||||
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
|
||||
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
|
||||
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
|
||||
|
||||
/**
|
||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||
@ -43,13 +41,11 @@ class ConsumeMessagesCommand extends Command
|
||||
private $logger;
|
||||
private $receiverNames;
|
||||
private $eventDispatcher;
|
||||
/** @var CacheItemPoolInterface|null */
|
||||
private $restartSignalCachePool;
|
||||
|
||||
/**
|
||||
* @param RoutableMessageBus $routableBus
|
||||
*/
|
||||
public function __construct($routableBus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* EventDispatcherInterface */ $eventDispatcher = null)
|
||||
public function __construct($routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, array $receiverNames = [])
|
||||
{
|
||||
if ($routableBus instanceof ContainerInterface) {
|
||||
@trigger_error(sprintf('Passing a "%s" instance as first argument to "%s()" is deprecated since Symfony 4.4, pass a "%s" instance instead.', ContainerInterface::class, __METHOD__, RoutableMessageBus::class), E_USER_DEPRECATED);
|
||||
@ -58,12 +54,6 @@ class ConsumeMessagesCommand extends Command
|
||||
throw new \TypeError(sprintf('The first argument must be an instance of "%s".', RoutableMessageBus::class));
|
||||
}
|
||||
|
||||
if (null !== $eventDispatcher && !$eventDispatcher instanceof EventDispatcherInterface) {
|
||||
@trigger_error(sprintf('The 5th argument of the class "%s" should be a "%s"', __CLASS__, EventDispatcherInterface::class), E_USER_DEPRECATED);
|
||||
|
||||
$eventDispatcher = null;
|
||||
}
|
||||
|
||||
$this->routableBus = $routableBus;
|
||||
$this->receiverLocator = $receiverLocator;
|
||||
$this->logger = $logger;
|
||||
@ -73,11 +63,6 @@ class ConsumeMessagesCommand extends Command
|
||||
parent::__construct();
|
||||
}
|
||||
|
||||
public function setCachePoolForRestartSignal(CacheItemPoolInterface $restartSignalCachePool)
|
||||
{
|
||||
$this->restartSignalCachePool = $restartSignalCachePool;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
@ -177,29 +162,23 @@ EOF
|
||||
$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
|
||||
}
|
||||
|
||||
$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;
|
||||
|
||||
$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger);
|
||||
$stopsWhen = [];
|
||||
if ($limit = $input->getOption('limit')) {
|
||||
$stopsWhen[] = "processed {$limit} messages";
|
||||
$worker = new StopWhenMessageCountIsExceededWorker($worker, $limit, $this->logger);
|
||||
$this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($limit, $this->logger));
|
||||
}
|
||||
|
||||
if ($memoryLimit = $input->getOption('memory-limit')) {
|
||||
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
|
||||
$worker = new StopWhenMemoryUsageIsExceededWorker($worker, $this->convertToBytes($memoryLimit), $this->logger);
|
||||
$this->eventDispatcher->addSubscriber(new StopWorkerOnMemoryLimitListener($this->convertToBytes($memoryLimit), $this->logger));
|
||||
}
|
||||
|
||||
if ($timeLimit = $input->getOption('time-limit')) {
|
||||
$stopsWhen[] = "been running for {$timeLimit}s";
|
||||
$worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger);
|
||||
$this->eventDispatcher->addSubscriber(new StopWorkerOnTimeLimitListener($timeLimit, $this->logger));
|
||||
}
|
||||
|
||||
if (null !== $this->restartSignalCachePool) {
|
||||
$stopsWhen[] = 'received a stop signal via the messenger:stop-workers command';
|
||||
$worker = new StopWhenRestartSignalIsReceived($worker, $this->restartSignalCachePool, $this->logger);
|
||||
}
|
||||
$stopsWhen[] = 'received a stop signal via the messenger:stop-workers command';
|
||||
|
||||
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
|
||||
$io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames)));
|
||||
@ -216,6 +195,9 @@ EOF
|
||||
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
|
||||
}
|
||||
|
||||
$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;
|
||||
|
||||
$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger);
|
||||
$worker->run([
|
||||
'sleep' => $input->getOption('sleep') * 1000000,
|
||||
]);
|
||||
|
@ -20,8 +20,8 @@ use Symfony\Component\Console\Output\ConsoleOutputInterface;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
|
||||
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
|
||||
use Symfony\Component\Messenger\Exception\LogicException;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
|
||||
@ -87,6 +87,8 @@ EOF
|
||||
*/
|
||||
protected function execute(InputInterface $input, OutputInterface $output)
|
||||
{
|
||||
$this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
|
||||
|
||||
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
|
||||
$io->comment('Quit this command with CONTROL-C.');
|
||||
if (!$output->isVeryVerbose()) {
|
||||
@ -158,7 +160,9 @@ EOF
|
||||
|
||||
private function runWorker(ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce): int
|
||||
{
|
||||
$listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io, $receiver, $shouldForce) {
|
||||
$count = 0;
|
||||
$listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io, $receiver, $shouldForce, &$count) {
|
||||
++$count;
|
||||
$envelope = $messageReceivedEvent->getEnvelope();
|
||||
|
||||
$this->displaySingleMessage($envelope, $io);
|
||||
@ -181,14 +185,8 @@ EOF
|
||||
$this->logger
|
||||
);
|
||||
|
||||
$count = 0;
|
||||
try {
|
||||
$worker->run([], function (?Envelope $envelope) use ($worker, &$count) {
|
||||
++$count;
|
||||
if (null === $envelope) {
|
||||
$worker->stop();
|
||||
}
|
||||
});
|
||||
$worker->run();
|
||||
} finally {
|
||||
$this->eventDispatcher->removeListener(WorkerMessageReceivedEvent::class, $listener);
|
||||
}
|
||||
|
@ -17,7 +17,7 @@ use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Output\ConsoleOutputInterface;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
|
||||
use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener;
|
||||
|
||||
/**
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
@ -63,7 +63,7 @@ EOF
|
||||
{
|
||||
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
|
||||
|
||||
$cacheItem = $this->restartSignalCachePool->getItem(StopWhenRestartSignalIsReceived::RESTART_REQUESTED_TIMESTAMP_KEY);
|
||||
$cacheItem = $this->restartSignalCachePool->getItem(StopWorkerOnRestartSignalListener::RESTART_REQUESTED_TIMESTAMP_KEY);
|
||||
$cacheItem->set(microtime(true));
|
||||
$this->restartSignalCachePool->save($cacheItem);
|
||||
|
||||
|
@ -276,7 +276,7 @@ class MessengerPass implements CompilerPassInterface
|
||||
$consumeCommandDefinition->replaceArgument(0, new Reference('messenger.routable_message_bus'));
|
||||
}
|
||||
|
||||
$consumeCommandDefinition->replaceArgument(3, array_values($receiverNames));
|
||||
$consumeCommandDefinition->replaceArgument(4, array_values($receiverNames));
|
||||
}
|
||||
|
||||
if ($container->hasDefinition('console.command.messenger_setup_transports')) {
|
||||
|
44
src/Symfony/Component/Messenger/Event/WorkerRunningEvent.php
Normal file
44
src/Symfony/Component/Messenger/Event/WorkerRunningEvent.php
Normal file
@ -0,0 +1,44 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Event;
|
||||
|
||||
use Symfony\Component\Messenger\Worker;
|
||||
|
||||
/**
|
||||
* Dispatched after the worker processed a message or didn't receive a message at all.
|
||||
*
|
||||
* @author Tobias Schultze <http://tobion.de>
|
||||
*/
|
||||
final class WorkerRunningEvent
|
||||
{
|
||||
private $worker;
|
||||
private $isWorkerIdle;
|
||||
|
||||
public function __construct(Worker $worker, bool $isWorkerIdle)
|
||||
{
|
||||
$this->worker = $worker;
|
||||
$this->isWorkerIdle = $isWorkerIdle;
|
||||
}
|
||||
|
||||
public function getWorker(): Worker
|
||||
{
|
||||
return $this->worker;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true when no message has been received by the worker.
|
||||
*/
|
||||
public function isWorkerIdle(): bool
|
||||
{
|
||||
return $this->isWorkerIdle;
|
||||
}
|
||||
}
|
34
src/Symfony/Component/Messenger/Event/WorkerStartedEvent.php
Normal file
34
src/Symfony/Component/Messenger/Event/WorkerStartedEvent.php
Normal file
@ -0,0 +1,34 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Event;
|
||||
|
||||
use Symfony\Component\Messenger\Worker;
|
||||
|
||||
/**
|
||||
* Dispatched when a worker has been started.
|
||||
*
|
||||
* @author Tobias Schultze <http://tobion.de>
|
||||
*/
|
||||
final class WorkerStartedEvent
|
||||
{
|
||||
private $worker;
|
||||
|
||||
public function __construct(Worker $worker)
|
||||
{
|
||||
$this->worker = $worker;
|
||||
}
|
||||
|
||||
public function getWorker(): Worker
|
||||
{
|
||||
return $this->worker;
|
||||
}
|
||||
}
|
@ -11,6 +11,8 @@
|
||||
|
||||
namespace Symfony\Component\Messenger\Event;
|
||||
|
||||
use Symfony\Component\Messenger\Worker;
|
||||
|
||||
/**
|
||||
* Dispatched when a worker has been stopped.
|
||||
*
|
||||
@ -18,4 +20,15 @@ namespace Symfony\Component\Messenger\Event;
|
||||
*/
|
||||
final class WorkerStoppedEvent
|
||||
{
|
||||
private $worker;
|
||||
|
||||
public function __construct(Worker $worker)
|
||||
{
|
||||
$this->worker = $worker;
|
||||
}
|
||||
|
||||
public function getWorker(): Worker
|
||||
{
|
||||
return $this->worker;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,37 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\EventListener;
|
||||
|
||||
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
||||
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
|
||||
|
||||
/**
|
||||
* @author Tobias Schultze <http://tobion.de>
|
||||
*/
|
||||
class DispatchPcntlSignalListener implements EventSubscriberInterface
|
||||
{
|
||||
public function onWorkerRunning(): void
|
||||
{
|
||||
pcntl_signal_dispatch();
|
||||
}
|
||||
|
||||
public static function getSubscribedEvents()
|
||||
{
|
||||
if (!\function_exists('pcntl_signal_dispatch')) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return [
|
||||
WorkerRunningEvent::class => ['onWorkerRunning', 100],
|
||||
];
|
||||
}
|
||||
}
|
@ -0,0 +1,55 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\EventListener;
|
||||
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
||||
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
|
||||
|
||||
/**
|
||||
* @author Simon Delicata <simon.delicata@free.fr>
|
||||
* @author Tobias Schultze <http://tobion.de>
|
||||
*/
|
||||
class StopWorkerOnMemoryLimitListener implements EventSubscriberInterface
|
||||
{
|
||||
private $memoryLimit;
|
||||
private $logger;
|
||||
private $memoryResolver;
|
||||
|
||||
public function __construct(int $memoryLimit, LoggerInterface $logger = null, callable $memoryResolver = null)
|
||||
{
|
||||
$this->memoryLimit = $memoryLimit;
|
||||
$this->logger = $logger;
|
||||
$this->memoryResolver = $memoryResolver ?: static function () {
|
||||
return memory_get_usage(true);
|
||||
};
|
||||
}
|
||||
|
||||
public function onWorkerRunning(WorkerRunningEvent $event): void
|
||||
{
|
||||
$memoryResolver = $this->memoryResolver;
|
||||
$usedMemory = $memoryResolver();
|
||||
if ($usedMemory > $this->memoryLimit) {
|
||||
$event->getWorker()->stop();
|
||||
if (null !== $this->logger) {
|
||||
$this->logger->info('Worker stopped due to memory limit of {limit} bytes exceeded ({memory} bytes used)', ['limit' => $this->memoryLimit, 'memory' => $usedMemory]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static function getSubscribedEvents()
|
||||
{
|
||||
return [
|
||||
WorkerRunningEvent::class => 'onWorkerRunning',
|
||||
];
|
||||
}
|
||||
}
|
@ -0,0 +1,57 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\EventListener;
|
||||
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
||||
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
|
||||
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||
|
||||
/**
|
||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||
* @author Tobias Schultze <http://tobion.de>
|
||||
*/
|
||||
class StopWorkerOnMessageLimitListener implements EventSubscriberInterface
|
||||
{
|
||||
private $maximumNumberOfMessages;
|
||||
private $logger;
|
||||
private $receivedMessages = 0;
|
||||
|
||||
public function __construct(int $maximumNumberOfMessages, LoggerInterface $logger = null)
|
||||
{
|
||||
$this->maximumNumberOfMessages = $maximumNumberOfMessages;
|
||||
$this->logger = $logger;
|
||||
|
||||
if ($maximumNumberOfMessages <= 0) {
|
||||
throw new InvalidArgumentException('Message limit must be greater than zero.');
|
||||
}
|
||||
}
|
||||
|
||||
public function onWorkerRunning(WorkerRunningEvent $event): void
|
||||
{
|
||||
if (!$event->isWorkerIdle() && ++$this->receivedMessages >= $this->maximumNumberOfMessages) {
|
||||
$this->receivedMessages = 0;
|
||||
$event->getWorker()->stop();
|
||||
|
||||
if (null !== $this->logger) {
|
||||
$this->logger->info('Worker stopped due to maximum count of {count} messages processed', ['count' => $this->maximumNumberOfMessages]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static function getSubscribedEvents()
|
||||
{
|
||||
return [
|
||||
WorkerRunningEvent::class => 'onWorkerRunning',
|
||||
];
|
||||
}
|
||||
}
|
@ -0,0 +1,71 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\EventListener;
|
||||
|
||||
use Psr\Cache\CacheItemPoolInterface;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
||||
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
|
||||
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
|
||||
|
||||
/**
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*/
|
||||
class StopWorkerOnRestartSignalListener implements EventSubscriberInterface
|
||||
{
|
||||
public const RESTART_REQUESTED_TIMESTAMP_KEY = 'workers.restart_requested_timestamp';
|
||||
|
||||
private $cachePool;
|
||||
private $logger;
|
||||
private $workerStartedAt;
|
||||
|
||||
public function __construct(CacheItemPoolInterface $cachePool, LoggerInterface $logger = null)
|
||||
{
|
||||
$this->cachePool = $cachePool;
|
||||
$this->logger = $logger;
|
||||
}
|
||||
|
||||
public function onWorkerStarted(): void
|
||||
{
|
||||
$this->workerStartedAt = microtime(true);
|
||||
}
|
||||
|
||||
public function onWorkerRunning(WorkerRunningEvent $event): void
|
||||
{
|
||||
if ($this->shouldRestart()) {
|
||||
$event->getWorker()->stop();
|
||||
if (null !== $this->logger) {
|
||||
$this->logger->info('Worker stopped because a restart was requested.');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static function getSubscribedEvents()
|
||||
{
|
||||
return [
|
||||
WorkerStartedEvent::class => 'onWorkerStarted',
|
||||
WorkerRunningEvent::class => 'onWorkerRunning',
|
||||
];
|
||||
}
|
||||
|
||||
private function shouldRestart(): bool
|
||||
{
|
||||
$cacheItem = $this->cachePool->getItem(self::RESTART_REQUESTED_TIMESTAMP_KEY);
|
||||
|
||||
if (!$cacheItem->isHit()) {
|
||||
// no restart has ever been scheduled
|
||||
return false;
|
||||
}
|
||||
|
||||
return $this->workerStartedAt < $cacheItem->get();
|
||||
}
|
||||
}
|
@ -0,0 +1,39 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\EventListener;
|
||||
|
||||
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
||||
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
|
||||
|
||||
/**
|
||||
* @author Tobias Schultze <http://tobion.de>
|
||||
*/
|
||||
class StopWorkerOnSigtermSignalListener implements EventSubscriberInterface
|
||||
{
|
||||
public function onWorkerStarted(WorkerStartedEvent $event): void
|
||||
{
|
||||
pcntl_signal(SIGTERM, static function () use ($event) {
|
||||
$event->getWorker()->stop();
|
||||
});
|
||||
}
|
||||
|
||||
public static function getSubscribedEvents()
|
||||
{
|
||||
if (!\function_exists('pcntl_signal')) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return [
|
||||
WorkerStartedEvent::class => ['onWorkerStarted', 100],
|
||||
];
|
||||
}
|
||||
}
|
@ -0,0 +1,58 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\EventListener;
|
||||
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
||||
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
|
||||
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
|
||||
|
||||
/**
|
||||
* @author Simon Delicata <simon.delicata@free.fr>
|
||||
* @author Tobias Schultze <http://tobion.de>
|
||||
*/
|
||||
class StopWorkerOnTimeLimitListener implements EventSubscriberInterface
|
||||
{
|
||||
private $timeLimitInSeconds;
|
||||
private $logger;
|
||||
private $endTime;
|
||||
|
||||
public function __construct(int $timeLimitInSeconds, LoggerInterface $logger = null)
|
||||
{
|
||||
$this->timeLimitInSeconds = $timeLimitInSeconds;
|
||||
$this->logger = $logger;
|
||||
}
|
||||
|
||||
public function onWorkerStarted(): void
|
||||
{
|
||||
$startTime = microtime(true);
|
||||
$this->endTime = $startTime + $this->timeLimitInSeconds;
|
||||
}
|
||||
|
||||
public function onWorkerRunning(WorkerRunningEvent $event): void
|
||||
{
|
||||
if ($this->endTime < microtime(true)) {
|
||||
$event->getWorker()->stop();
|
||||
if (null !== $this->logger) {
|
||||
$this->logger->info('Worker stopped due to time limit of {timeLimit}s exceeded', ['timeLimit' => $this->timeLimitInSeconds]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static function getSubscribedEvents()
|
||||
{
|
||||
return [
|
||||
WorkerStartedEvent::class => 'onWorkerStarted',
|
||||
WorkerRunningEvent::class => 'onWorkerRunning',
|
||||
];
|
||||
}
|
||||
}
|
@ -16,6 +16,8 @@ use Symfony\Component\Console\Application;
|
||||
use Symfony\Component\Console\Tester\CommandTester;
|
||||
use Symfony\Component\DependencyInjection\ContainerInterface;
|
||||
use Symfony\Component\DependencyInjection\ServiceLocator;
|
||||
use Symfony\Component\EventDispatcher\EventDispatcher;
|
||||
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
|
||||
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
@ -27,7 +29,7 @@ class ConsumeMessagesCommandTest extends TestCase
|
||||
{
|
||||
public function testConfigurationWithDefaultReceiver()
|
||||
{
|
||||
$command = new ConsumeMessagesCommand($this->createMock(RoutableMessageBus::class), $this->createMock(ServiceLocator::class), null, ['amqp']);
|
||||
$command = new ConsumeMessagesCommand($this->createMock(RoutableMessageBus::class), $this->createMock(ServiceLocator::class), $this->createMock(EventDispatcherInterface::class), null, ['amqp']);
|
||||
$inputArgument = $command->getDefinition()->getArgument('receivers');
|
||||
$this->assertFalse($inputArgument->isRequired());
|
||||
$this->assertSame(['amqp'], $inputArgument->getDefault());
|
||||
@ -51,7 +53,7 @@ class ConsumeMessagesCommandTest extends TestCase
|
||||
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
|
||||
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
|
||||
|
||||
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator);
|
||||
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator, new EventDispatcher());
|
||||
|
||||
$application = new Application();
|
||||
$application->add($command);
|
||||
@ -83,7 +85,7 @@ class ConsumeMessagesCommandTest extends TestCase
|
||||
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
|
||||
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
|
||||
|
||||
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator);
|
||||
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator, new EventDispatcher());
|
||||
|
||||
$application = new Application();
|
||||
$application->add($command);
|
||||
@ -120,7 +122,7 @@ class ConsumeMessagesCommandTest extends TestCase
|
||||
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
|
||||
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
|
||||
|
||||
$command = new ConsumeMessagesCommand($busLocator, $receiverLocator);
|
||||
$command = new ConsumeMessagesCommand($busLocator, $receiverLocator, new EventDispatcher());
|
||||
|
||||
$application = new Application();
|
||||
$application->add($command);
|
||||
@ -156,7 +158,7 @@ class ConsumeMessagesCommandTest extends TestCase
|
||||
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
|
||||
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
|
||||
|
||||
$command = new ConsumeMessagesCommand($busLocator, $receiverLocator);
|
||||
$command = new ConsumeMessagesCommand($busLocator, $receiverLocator, new EventDispatcher());
|
||||
|
||||
$application = new Application();
|
||||
$application->add($command);
|
||||
|
@ -13,7 +13,7 @@ namespace Symfony\Component\Messenger\Tests\Command;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Console\Tester\CommandTester;
|
||||
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
|
||||
use Symfony\Component\EventDispatcher\EventDispatcher;
|
||||
use Symfony\Component\Messenger\Command\FailedMessagesRetryCommand;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
@ -28,7 +28,7 @@ class FailedMessagesRetryCommandTest extends TestCase
|
||||
// message will eventually be ack'ed in Worker
|
||||
$receiver->expects($this->exactly(2))->method('ack');
|
||||
|
||||
$dispatcher = $this->createMock(EventDispatcherInterface::class);
|
||||
$dispatcher = new EventDispatcher();
|
||||
$bus = $this->createMock(MessageBusInterface::class);
|
||||
// the bus should be called in the worker
|
||||
$bus->expects($this->exactly(2))->method('dispatch')->willReturn(new Envelope(new \stdClass()));
|
||||
@ -41,7 +41,7 @@ class FailedMessagesRetryCommandTest extends TestCase
|
||||
);
|
||||
|
||||
$tester = new CommandTester($command);
|
||||
$tester->execute(['id' => [10, 12]]);
|
||||
$tester->execute(['id' => [10, 12], '--force' => true]);
|
||||
|
||||
$this->assertStringContainsString('[OK]', $tester->getDisplay());
|
||||
}
|
||||
|
@ -291,7 +291,7 @@ class MessengerPassTest extends TestCase
|
||||
|
||||
(new MessengerPass())->process($container);
|
||||
|
||||
$this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(3));
|
||||
$this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(4));
|
||||
}
|
||||
|
||||
public function testItSetsTheReceiverNamesOnTheSetupTransportsCommand()
|
||||
|
@ -46,10 +46,23 @@ class SendFailedMessageForRetryListenerTest extends TestCase
|
||||
$envelope = new Envelope(new \stdClass());
|
||||
|
||||
$sender = $this->createMock(SenderInterface::class);
|
||||
$sender->expects($this->once())->method('send')->with($envelope->with(new DelayStamp(1000), new RedeliveryStamp(1)))->willReturnArgument(0);
|
||||
$sender->expects($this->once())->method('send')->willReturnCallback(function (Envelope $envelope) {
|
||||
/** @var DelayStamp $delayStamp */
|
||||
$delayStamp = $envelope->last(DelayStamp::class);
|
||||
/** @var RedeliveryStamp $redeliveryStamp */
|
||||
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
|
||||
|
||||
$this->assertInstanceOf(DelayStamp::class, $delayStamp);
|
||||
$this->assertSame(1000, $delayStamp->getDelay());
|
||||
|
||||
$this->assertInstanceOf(RedeliveryStamp::class, $redeliveryStamp);
|
||||
$this->assertSame(1, $redeliveryStamp->getRetryCount());
|
||||
|
||||
return $envelope;
|
||||
});
|
||||
$senderLocator = $this->createMock(ContainerInterface::class);
|
||||
$senderLocator->expects($this->once())->method('has')->willReturn(true);
|
||||
$senderLocator->expects($this->never())->method('get')->willReturn($sender);
|
||||
$senderLocator->expects($this->once())->method('get')->willReturn($sender);
|
||||
$retryStategy = $this->createMock(RetryStrategyInterface::class);
|
||||
$retryStategy->expects($this->once())->method('isRetryable')->willReturn(true);
|
||||
$retryStategy->expects($this->once())->method('getWaitingTime')->willReturn(1000);
|
||||
|
@ -0,0 +1,62 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\EventListener;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
|
||||
use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener;
|
||||
use Symfony\Component\Messenger\Worker;
|
||||
|
||||
class StopWorkerOnMemoryLimitListenerTest extends TestCase
|
||||
{
|
||||
/**
|
||||
* @dataProvider memoryProvider
|
||||
*/
|
||||
public function testWorkerStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop)
|
||||
{
|
||||
$memoryResolver = function () use ($memoryUsage) {
|
||||
return $memoryUsage;
|
||||
};
|
||||
|
||||
$worker = $this->createMock(Worker::class);
|
||||
$worker->expects($shouldStop ? $this->once() : $this->never())->method('stop');
|
||||
$event = new WorkerRunningEvent($worker, false);
|
||||
|
||||
$memoryLimitListener = new StopWorkerOnMemoryLimitListener($memoryLimit, null, $memoryResolver);
|
||||
$memoryLimitListener->onWorkerRunning($event);
|
||||
}
|
||||
|
||||
public function memoryProvider(): iterable
|
||||
{
|
||||
yield [2048, 1024, true];
|
||||
yield [1024, 1024, false];
|
||||
yield [1024, 2048, false];
|
||||
}
|
||||
|
||||
public function testWorkerLogsMemoryExceededWhenLoggerIsGiven()
|
||||
{
|
||||
$logger = $this->createMock(LoggerInterface::class);
|
||||
$logger->expects($this->once())->method('info')
|
||||
->with('Worker stopped due to memory limit of {limit} bytes exceeded ({memory} bytes used)', ['limit' => 64, 'memory' => 70]);
|
||||
|
||||
$memoryResolver = function () {
|
||||
return 70;
|
||||
};
|
||||
|
||||
$worker = $this->createMock(Worker::class);
|
||||
$event = new WorkerRunningEvent($worker, false);
|
||||
|
||||
$memoryLimitListener = new StopWorkerOnMemoryLimitListener(64, $logger, $memoryResolver);
|
||||
$memoryLimitListener->onWorkerRunning($event);
|
||||
}
|
||||
}
|
@ -0,0 +1,61 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\EventListener;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
|
||||
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
|
||||
use Symfony\Component\Messenger\Worker;
|
||||
|
||||
class StopWorkerOnMessageLimitListenerTest extends TestCase
|
||||
{
|
||||
/**
|
||||
* @dataProvider countProvider
|
||||
*/
|
||||
public function testWorkerStopsWhenMaximumCountExceeded(int $max, bool $shouldStop)
|
||||
{
|
||||
$worker = $this->createMock(Worker::class);
|
||||
$worker->expects($shouldStop ? $this->atLeastOnce() : $this->never())->method('stop');
|
||||
$event = new WorkerRunningEvent($worker, false);
|
||||
|
||||
$maximumCountListener = new StopWorkerOnMessageLimitListener($max);
|
||||
// simulate three messages processed
|
||||
$maximumCountListener->onWorkerRunning($event);
|
||||
$maximumCountListener->onWorkerRunning($event);
|
||||
$maximumCountListener->onWorkerRunning($event);
|
||||
}
|
||||
|
||||
public function countProvider(): iterable
|
||||
{
|
||||
yield [1, true];
|
||||
yield [2, true];
|
||||
yield [3, true];
|
||||
yield [4, false];
|
||||
}
|
||||
|
||||
public function testWorkerLogsMaximumCountExceededWhenLoggerIsGiven()
|
||||
{
|
||||
$logger = $this->createMock(LoggerInterface::class);
|
||||
$logger->expects($this->once())->method('info')
|
||||
->with(
|
||||
$this->equalTo('Worker stopped due to maximum count of {count} messages processed'),
|
||||
$this->equalTo(['count' => 1])
|
||||
);
|
||||
|
||||
$worker = $this->createMock(Worker::class);
|
||||
$event = new WorkerRunningEvent($worker, false);
|
||||
|
||||
$maximumCountListener = new StopWorkerOnMessageLimitListener(1, $logger);
|
||||
$maximumCountListener->onWorkerRunning($event);
|
||||
}
|
||||
}
|
@ -9,39 +9,38 @@
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Worker;
|
||||
namespace Symfony\Component\Messenger\Tests\EventListener;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Psr\Cache\CacheItemInterface;
|
||||
use Psr\Cache\CacheItemPoolInterface;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker;
|
||||
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
|
||||
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
|
||||
use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener;
|
||||
use Symfony\Component\Messenger\Worker;
|
||||
|
||||
/**
|
||||
* @group time-sensitive
|
||||
*/
|
||||
class StopWhenRestartSignalIsReceivedTest extends TestCase
|
||||
class StopWorkerOnRestartSignalListenerTest extends TestCase
|
||||
{
|
||||
/**
|
||||
* @dataProvider restartTimeProvider
|
||||
*/
|
||||
public function testWorkerStopsWhenMemoryLimitExceeded(?int $lastRestartTimeOffset, bool $shouldStop)
|
||||
{
|
||||
$decoratedWorker = new DummyWorker([
|
||||
new Envelope(new \stdClass()),
|
||||
]);
|
||||
|
||||
$cachePool = $this->createMock(CacheItemPoolInterface::class);
|
||||
$cacheItem = $this->createMock(CacheItemInterface::class);
|
||||
$cacheItem->expects($this->once())->method('isHIt')->willReturn(true);
|
||||
$cacheItem->expects($this->once())->method('get')->willReturn(null === $lastRestartTimeOffset ? null : time() + $lastRestartTimeOffset);
|
||||
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
|
||||
|
||||
$stopOnSignalWorker = new StopWhenRestartSignalIsReceived($decoratedWorker, $cachePool);
|
||||
$stopOnSignalWorker->run();
|
||||
$worker = $this->createMock(Worker::class);
|
||||
$worker->expects($shouldStop ? $this->once() : $this->never())->method('stop');
|
||||
$event = new WorkerRunningEvent($worker, false);
|
||||
|
||||
$this->assertSame($shouldStop, $decoratedWorker->isStopped());
|
||||
$stopOnSignalListener = new StopWorkerOnRestartSignalListener($cachePool);
|
||||
$stopOnSignalListener->onWorkerStarted();
|
||||
$stopOnSignalListener->onWorkerRunning($event);
|
||||
}
|
||||
|
||||
public function restartTimeProvider()
|
||||
@ -53,19 +52,18 @@ class StopWhenRestartSignalIsReceivedTest extends TestCase
|
||||
|
||||
public function testWorkerDoesNotStopIfRestartNotInCache()
|
||||
{
|
||||
$decoratedWorker = new DummyWorker([
|
||||
new Envelope(new \stdClass()),
|
||||
]);
|
||||
|
||||
$cachePool = $this->createMock(CacheItemPoolInterface::class);
|
||||
$cacheItem = $this->createMock(CacheItemInterface::class);
|
||||
$cacheItem->expects($this->once())->method('isHIt')->willReturn(false);
|
||||
$cacheItem->expects($this->never())->method('get');
|
||||
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
|
||||
|
||||
$stopOnSignalWorker = new StopWhenRestartSignalIsReceived($decoratedWorker, $cachePool);
|
||||
$stopOnSignalWorker->run();
|
||||
$worker = $this->createMock(Worker::class);
|
||||
$worker->expects($this->never())->method('stop');
|
||||
$event = new WorkerRunningEvent($worker, false);
|
||||
|
||||
$this->assertFalse($decoratedWorker->isStopped());
|
||||
$stopOnSignalListener = new StopWorkerOnRestartSignalListener($cachePool);
|
||||
$stopOnSignalListener->onWorkerStarted();
|
||||
$stopOnSignalListener->onWorkerRunning($event);
|
||||
}
|
||||
}
|
@ -0,0 +1,40 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\EventListener;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
|
||||
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
|
||||
use Symfony\Component\Messenger\Worker;
|
||||
|
||||
class StopWorkerOnTimeLimitListenerTest extends TestCase
|
||||
{
|
||||
/**
|
||||
* @group time-sensitive
|
||||
*/
|
||||
public function testWorkerStopsWhenTimeLimitIsReached()
|
||||
{
|
||||
$logger = $this->createMock(LoggerInterface::class);
|
||||
$logger->expects($this->once())->method('info')
|
||||
->with('Worker stopped due to time limit of {timeLimit}s exceeded', ['timeLimit' => 1]);
|
||||
|
||||
$worker = $this->createMock(Worker::class);
|
||||
$worker->expects($this->once())->method('stop');
|
||||
$event = new WorkerRunningEvent($worker, false);
|
||||
|
||||
$timeoutListener = new StopWorkerOnTimeLimitListener(1, $logger);
|
||||
$timeoutListener->onWorkerStarted();
|
||||
sleep(2);
|
||||
$timeoutListener->onWorkerRunning($event);
|
||||
}
|
||||
}
|
@ -18,6 +18,7 @@ use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
||||
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
|
||||
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
|
||||
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
|
||||
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
||||
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
|
||||
use Symfony\Component\Messenger\Handler\HandlersLocator;
|
||||
@ -97,6 +98,7 @@ class FailureIntegrationTest extends TestCase
|
||||
]);
|
||||
$dispatcher->addSubscriber(new SendFailedMessageForRetryListener($locator, $retryStrategyLocator));
|
||||
$dispatcher->addSubscriber(new SendFailedMessageToFailureTransportListener($failureTransport));
|
||||
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
|
||||
|
||||
$runWorker = function (string $transportName) use ($transports, $bus, $dispatcher): ?\Throwable {
|
||||
$throwable = null;
|
||||
@ -107,12 +109,7 @@ class FailureIntegrationTest extends TestCase
|
||||
|
||||
$worker = new Worker([$transportName => $transports[$transportName]], $bus, $dispatcher);
|
||||
|
||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||
// handle one envelope, then stop
|
||||
if (null !== $envelope) {
|
||||
$worker->stop();
|
||||
}
|
||||
});
|
||||
$worker->run();
|
||||
|
||||
$dispatcher->removeListener(WorkerMessageFailedEvent::class, $failedListener);
|
||||
|
||||
@ -208,7 +205,8 @@ class FailureIntegrationTest extends TestCase
|
||||
* Dispatch the original message again
|
||||
*/
|
||||
$bus->dispatch($envelope);
|
||||
// handle the message, but with no retries
|
||||
// handle the failing message so it goes into the failure transport
|
||||
$runWorker('transport1');
|
||||
$runWorker('transport1');
|
||||
// now make the handler work!
|
||||
$transport1HandlerThatFails->setShouldThrow(false);
|
||||
|
@ -1,46 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Fixtures;
|
||||
|
||||
use Symfony\Component\Messenger\WorkerInterface;
|
||||
|
||||
class DummyWorker implements WorkerInterface
|
||||
{
|
||||
private $isStopped = false;
|
||||
private $envelopesToReceive;
|
||||
private $envelopesHandled = 0;
|
||||
|
||||
public function __construct(array $envelopesToReceive)
|
||||
{
|
||||
$this->envelopesToReceive = $envelopesToReceive;
|
||||
}
|
||||
|
||||
public function run(array $options = [], callable $onHandledCallback = null): void
|
||||
{
|
||||
foreach ($this->envelopesToReceive as $envelope) {
|
||||
if (true === $this->isStopped) {
|
||||
break;
|
||||
}
|
||||
|
||||
if ($onHandledCallback) {
|
||||
$onHandledCallback($envelope);
|
||||
++$this->envelopesHandled;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public function stop(): void
|
||||
{
|
||||
$this->isStopped = true;
|
||||
}
|
||||
|
||||
public function isStopped(): bool
|
||||
{
|
||||
return $this->isStopped;
|
||||
}
|
||||
|
||||
public function countEnvelopesHandled()
|
||||
{
|
||||
return $this->envelopesHandled;
|
||||
}
|
||||
}
|
@ -12,9 +12,11 @@ if (!file_exists($autoload)) {
|
||||
|
||||
require_once $autoload;
|
||||
|
||||
use Symfony\Component\EventDispatcher\EventDispatcher;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener;
|
||||
use Symfony\Component\Messenger\EventListener\StopWorkerOnSigtermSignalListener;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||
@ -30,7 +32,9 @@ $serializer = new Serializer(
|
||||
|
||||
$connection = Connection::fromDsn(getenv('DSN'));
|
||||
$receiver = new AmqpReceiver($connection, $serializer);
|
||||
$retryStrategy = new MultiplierRetryStrategy(3, 0);
|
||||
$eventDispatcher = new EventDispatcher();
|
||||
$eventDispatcher->addSubscriber(new StopWorkerOnSigtermSignalListener());
|
||||
$eventDispatcher->addSubscriber(new DispatchPcntlSignalListener());
|
||||
|
||||
$worker = new Worker(['the_receiver' => $receiver], new class() implements MessageBusInterface {
|
||||
public function dispatch($envelope, array $stamps = []): Envelope
|
||||
@ -43,7 +47,7 @@ $worker = new Worker(['the_receiver' => $receiver], new class() implements Messa
|
||||
|
||||
return $envelope;
|
||||
}
|
||||
});
|
||||
}, $eventDispatcher);
|
||||
|
||||
echo "Receiving messages...\n";
|
||||
$worker->run();
|
||||
|
@ -1,71 +0,0 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Worker;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker;
|
||||
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
|
||||
|
||||
class StopWhenMemoryUsageIsExceededWorkerTest extends TestCase
|
||||
{
|
||||
/**
|
||||
* @dataProvider memoryProvider
|
||||
*/
|
||||
public function testWorkerStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop)
|
||||
{
|
||||
$handlerCalledTimes = 0;
|
||||
$handledCallback = function () use (&$handlerCalledTimes) {
|
||||
++$handlerCalledTimes;
|
||||
};
|
||||
$decoratedWorker = new DummyWorker([
|
||||
new Envelope(new \stdClass()),
|
||||
]);
|
||||
|
||||
$memoryResolver = function () use ($memoryUsage) {
|
||||
return $memoryUsage;
|
||||
};
|
||||
|
||||
$memoryLimitWorker = new StopWhenMemoryUsageIsExceededWorker($decoratedWorker, $memoryLimit, null, $memoryResolver);
|
||||
$memoryLimitWorker->run([], $handledCallback);
|
||||
|
||||
// handler should be called exactly 1 time
|
||||
$this->assertSame($handlerCalledTimes, 1);
|
||||
$this->assertSame($shouldStop, $decoratedWorker->isStopped());
|
||||
}
|
||||
|
||||
public function memoryProvider(): iterable
|
||||
{
|
||||
yield [2048, 1024, true];
|
||||
yield [1024, 1024, false];
|
||||
yield [1024, 2048, false];
|
||||
}
|
||||
|
||||
public function testWorkerLogsMemoryExceededWhenLoggerIsGiven()
|
||||
{
|
||||
$decoratedWorker = new DummyWorker([
|
||||
new Envelope(new \stdClass()),
|
||||
]);
|
||||
|
||||
$logger = $this->createMock(LoggerInterface::class);
|
||||
$logger->expects($this->once())->method('info')
|
||||
->with('Worker stopped due to memory limit of {limit} exceeded', ['limit' => 64 * 1024 * 1024]);
|
||||
|
||||
$memoryResolver = function () {
|
||||
return 70 * 1024 * 1024;
|
||||
};
|
||||
|
||||
$memoryLimitWorker = new StopWhenMemoryUsageIsExceededWorker($decoratedWorker, 64 * 1024 * 1024, $logger, $memoryResolver);
|
||||
$memoryLimitWorker->run();
|
||||
}
|
||||
}
|
@ -1,71 +0,0 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Worker;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker;
|
||||
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
|
||||
|
||||
class StopWhenMessageCountIsExceededWorkerTest extends TestCase
|
||||
{
|
||||
/**
|
||||
* @dataProvider countProvider
|
||||
*/
|
||||
public function testWorkerStopsWhenMaximumCountExceeded(int $max, bool $shouldStop)
|
||||
{
|
||||
$handlerCalledTimes = 0;
|
||||
$handledCallback = function () use (&$handlerCalledTimes) {
|
||||
++$handlerCalledTimes;
|
||||
};
|
||||
// receive 3 real messages
|
||||
$decoratedWorker = new DummyWorker([
|
||||
new Envelope(new DummyMessage('First message')),
|
||||
null,
|
||||
new Envelope(new DummyMessage('Second message')),
|
||||
null,
|
||||
new Envelope(new DummyMessage('Third message')),
|
||||
]);
|
||||
|
||||
$maximumCountWorker = new StopWhenMessageCountIsExceededWorker($decoratedWorker, $max);
|
||||
$maximumCountWorker->run([], $handledCallback);
|
||||
|
||||
$this->assertSame($shouldStop, $decoratedWorker->isStopped());
|
||||
}
|
||||
|
||||
public function countProvider(): iterable
|
||||
{
|
||||
yield [1, true];
|
||||
yield [2, true];
|
||||
yield [3, true];
|
||||
yield [4, false];
|
||||
}
|
||||
|
||||
public function testWorkerLogsMaximumCountExceededWhenLoggerIsGiven()
|
||||
{
|
||||
$decoratedWorker = new DummyWorker([
|
||||
new Envelope(new \stdClass()),
|
||||
]);
|
||||
|
||||
$logger = $this->createMock(LoggerInterface::class);
|
||||
$logger->expects($this->once())->method('info')
|
||||
->with(
|
||||
$this->equalTo('Worker stopped due to maximum count of {count} exceeded'),
|
||||
$this->equalTo(['count' => 1])
|
||||
);
|
||||
|
||||
$maximumCountWorker = new StopWhenMessageCountIsExceededWorker($decoratedWorker, 1, $logger);
|
||||
$maximumCountWorker->run();
|
||||
}
|
||||
}
|
@ -1,44 +0,0 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Worker;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker;
|
||||
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
|
||||
|
||||
class StopWhenTimeLimitIsReachedWorkerTest extends TestCase
|
||||
{
|
||||
/**
|
||||
* @group time-sensitive
|
||||
*/
|
||||
public function testWorkerStopsWhenTimeLimitIsReached()
|
||||
{
|
||||
$decoratedWorker = new DummyWorker([
|
||||
new Envelope(new \stdClass()),
|
||||
new Envelope(new \stdClass()),
|
||||
]);
|
||||
|
||||
$logger = $this->createMock(LoggerInterface::class);
|
||||
$logger->expects($this->once())->method('info')
|
||||
->with('Worker stopped due to time limit of {timeLimit}s reached', ['timeLimit' => 1]);
|
||||
|
||||
$timeoutWorker = new StopWhenTimeLimitIsReachedWorker($decoratedWorker, 1, $logger);
|
||||
$timeoutWorker->run([], function () {
|
||||
sleep(2);
|
||||
});
|
||||
|
||||
$this->assertTrue($decoratedWorker->isStopped());
|
||||
$this->assertSame(1, $decoratedWorker->countEnvelopesHandled());
|
||||
}
|
||||
}
|
@ -12,11 +12,15 @@
|
||||
namespace Symfony\Component\Messenger\Tests;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\EventDispatcher\EventDispatcher;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
||||
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
|
||||
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
|
||||
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
|
||||
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
|
||||
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
|
||||
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
|
||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||
@ -24,7 +28,6 @@ use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||
use Symfony\Component\Messenger\Worker;
|
||||
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
|
||||
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
|
||||
|
||||
/**
|
||||
@ -51,13 +54,11 @@ class WorkerTest extends TestCase
|
||||
new Envelope($ipaMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])
|
||||
)->willReturnArgument(0);
|
||||
|
||||
$worker = new Worker(['transport' => $receiver], $bus);
|
||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||
// stop after the messages finish
|
||||
if (null === $envelope) {
|
||||
$worker->stop();
|
||||
}
|
||||
});
|
||||
$dispatcher = new EventDispatcher();
|
||||
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(2));
|
||||
|
||||
$worker = new Worker(['transport' => $receiver], $bus, $dispatcher);
|
||||
$worker->run();
|
||||
|
||||
$this->assertSame(2, $receiver->getAcknowledgeCount());
|
||||
}
|
||||
@ -71,13 +72,12 @@ class WorkerTest extends TestCase
|
||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||
$bus->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not'));
|
||||
|
||||
$worker = new Worker(['transport1' => $receiver], $bus);
|
||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||
// stop after the messages finish
|
||||
if (null === $envelope) {
|
||||
$worker->stop();
|
||||
}
|
||||
});
|
||||
$dispatcher = new EventDispatcher();
|
||||
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
|
||||
|
||||
$worker = new Worker(['transport1' => $receiver], $bus, $dispatcher);
|
||||
$worker->run();
|
||||
|
||||
$this->assertSame(1, $receiver->getRejectCount());
|
||||
$this->assertSame(0, $receiver->getAcknowledgeCount());
|
||||
}
|
||||
@ -91,13 +91,13 @@ class WorkerTest extends TestCase
|
||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||
$bus->expects($this->never())->method('dispatch');
|
||||
|
||||
$worker = new Worker([$receiver], $bus);
|
||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||
// stop after the messages finish
|
||||
if (null === $envelope) {
|
||||
$worker->stop();
|
||||
}
|
||||
$dispatcher = new EventDispatcher();
|
||||
$dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) {
|
||||
$event->getWorker()->stop();
|
||||
});
|
||||
|
||||
$worker = new Worker([$receiver], $bus, $dispatcher);
|
||||
$worker->run();
|
||||
}
|
||||
|
||||
public function testWorkerDispatchesEventsOnSuccess()
|
||||
@ -110,21 +110,22 @@ class WorkerTest extends TestCase
|
||||
|
||||
$eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock();
|
||||
|
||||
$eventDispatcher->expects($this->exactly(3))
|
||||
$eventDispatcher->expects($this->exactly(5))
|
||||
->method('dispatch')
|
||||
->withConsecutive(
|
||||
[$this->isInstanceOf(WorkerStartedEvent::class)],
|
||||
[$this->isInstanceOf(WorkerMessageReceivedEvent::class)],
|
||||
[$this->isInstanceOf(WorkerMessageHandledEvent::class)],
|
||||
[$this->isInstanceOf(WorkerRunningEvent::class)],
|
||||
[$this->isInstanceOf(WorkerStoppedEvent::class)]
|
||||
);
|
||||
)->willReturnCallback(function ($event) {
|
||||
if ($event instanceof WorkerRunningEvent) {
|
||||
$event->getWorker()->stop();
|
||||
}
|
||||
});
|
||||
|
||||
$worker = new Worker([$receiver], $bus, $eventDispatcher);
|
||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||
// stop after the messages finish
|
||||
if (null === $envelope) {
|
||||
$worker->stop();
|
||||
}
|
||||
});
|
||||
$worker->run();
|
||||
}
|
||||
|
||||
public function testWorkerDispatchesEventsOnError()
|
||||
@ -138,21 +139,22 @@ class WorkerTest extends TestCase
|
||||
|
||||
$eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock();
|
||||
|
||||
$eventDispatcher->expects($this->exactly(3))
|
||||
$eventDispatcher->expects($this->exactly(5))
|
||||
->method('dispatch')
|
||||
->withConsecutive(
|
||||
[$this->isInstanceOf(WorkerStartedEvent::class)],
|
||||
[$this->isInstanceOf(WorkerMessageReceivedEvent::class)],
|
||||
[$this->isInstanceOf(WorkerMessageFailedEvent::class)],
|
||||
[$this->isInstanceOf(WorkerRunningEvent::class)],
|
||||
[$this->isInstanceOf(WorkerStoppedEvent::class)]
|
||||
);
|
||||
)->willReturnCallback(function ($event) {
|
||||
if ($event instanceof WorkerRunningEvent) {
|
||||
$event->getWorker()->stop();
|
||||
}
|
||||
});
|
||||
|
||||
$worker = new Worker([$receiver], $bus, $eventDispatcher);
|
||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||
// stop after the messages finish
|
||||
if (null === $envelope) {
|
||||
$worker->stop();
|
||||
}
|
||||
});
|
||||
$worker->run();
|
||||
}
|
||||
|
||||
public function testTimeoutIsConfigurable()
|
||||
@ -170,25 +172,19 @@ class WorkerTest extends TestCase
|
||||
|
||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||
|
||||
$worker = new Worker([$receiver], $bus);
|
||||
$receivedCount = 0;
|
||||
$dispatcher = new EventDispatcher();
|
||||
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(5));
|
||||
|
||||
$worker = new Worker([$receiver], $bus, $dispatcher);
|
||||
$startTime = microtime(true);
|
||||
// sleep .1 after each idle
|
||||
$worker->run(['sleep' => 100000], function (?Envelope $envelope) use ($worker, &$receivedCount, $startTime) {
|
||||
if (null !== $envelope) {
|
||||
++$receivedCount;
|
||||
}
|
||||
$worker->run(['sleep' => 100000]);
|
||||
|
||||
if (5 === $receivedCount) {
|
||||
$worker->stop();
|
||||
$duration = microtime(true) - $startTime;
|
||||
|
||||
// wait time should be .3 seconds
|
||||
// use .29 & .31 for timing "wiggle room"
|
||||
$this->assertGreaterThanOrEqual(.29, $duration);
|
||||
$this->assertLessThan(.31, $duration);
|
||||
}
|
||||
});
|
||||
$duration = microtime(true) - $startTime;
|
||||
// wait time should be .3 seconds
|
||||
// use .29 & .31 for timing "wiggle room"
|
||||
$this->assertGreaterThanOrEqual(.29, $duration);
|
||||
$this->assertLessThan(.31, $duration);
|
||||
}
|
||||
|
||||
public function testWorkerWithMultipleReceivers()
|
||||
@ -228,48 +224,18 @@ class WorkerTest extends TestCase
|
||||
|
||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||
|
||||
$receivedCount = 0;
|
||||
$worker = new Worker([$receiver1, $receiver2, $receiver3], $bus);
|
||||
$processedEnvelopes = [];
|
||||
$worker->run([], function (?Envelope $envelope) use ($worker, &$receivedCount, &$processedEnvelopes) {
|
||||
if (null !== $envelope) {
|
||||
$processedEnvelopes[] = $envelope;
|
||||
++$receivedCount;
|
||||
}
|
||||
|
||||
// stop after the messages finish
|
||||
if (6 === $receivedCount) {
|
||||
$worker->stop();
|
||||
}
|
||||
$dispatcher = new EventDispatcher();
|
||||
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(6));
|
||||
$dispatcher->addListener(WorkerMessageReceivedEvent::class, function (WorkerMessageReceivedEvent $event) use (&$processedEnvelopes) {
|
||||
$processedEnvelopes[] = $event->getEnvelope();
|
||||
});
|
||||
$worker = new Worker([$receiver1, $receiver2, $receiver3], $bus, $dispatcher);
|
||||
$worker->run();
|
||||
|
||||
// make sure they were processed in the correct order
|
||||
$this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes);
|
||||
}
|
||||
|
||||
public function testWorkerWithDecorator()
|
||||
{
|
||||
$envelope1 = new Envelope(new DummyMessage('message1'));
|
||||
$envelope2 = new Envelope(new DummyMessage('message2'));
|
||||
$envelope3 = new Envelope(new DummyMessage('message3'));
|
||||
|
||||
$receiver = new DummyReceiver([
|
||||
[$envelope1, $envelope2, $envelope3],
|
||||
]);
|
||||
|
||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||
|
||||
$worker = new Worker([$receiver], $bus);
|
||||
$workerWithDecorator = new StopWhenMessageCountIsExceededWorker($worker, 2);
|
||||
$processedEnvelopes = [];
|
||||
$workerWithDecorator->run([], function (?Envelope $envelope) use (&$processedEnvelopes) {
|
||||
if (null !== $envelope) {
|
||||
$processedEnvelopes[] = $envelope;
|
||||
}
|
||||
});
|
||||
|
||||
$this->assertSame([$envelope1, $envelope2], $processedEnvelopes);
|
||||
}
|
||||
}
|
||||
|
||||
class DummyReceiver implements ReceiverInterface
|
||||
|
@ -16,6 +16,8 @@ use Symfony\Component\EventDispatcher\LegacyEventDispatcherProxy;
|
||||
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
||||
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
|
||||
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
|
||||
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
|
||||
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
|
||||
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
|
||||
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
||||
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
|
||||
@ -26,10 +28,11 @@ use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
|
||||
|
||||
/**
|
||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||
* @author Tobias Schultze <http://tobion.de>
|
||||
*
|
||||
* @final
|
||||
*/
|
||||
class Worker implements WorkerInterface
|
||||
class Worker
|
||||
{
|
||||
private $receivers;
|
||||
private $bus;
|
||||
@ -54,28 +57,14 @@ class Worker implements WorkerInterface
|
||||
* Valid options are:
|
||||
* * sleep (default: 1000000): Time in microseconds to sleep after no messages are found
|
||||
*/
|
||||
public function run(array $options = [], callable $onHandledCallback = null): void
|
||||
public function run(array $options = []): void
|
||||
{
|
||||
$this->dispatchEvent(new WorkerStartedEvent($this));
|
||||
|
||||
$options = array_merge([
|
||||
'sleep' => 1000000,
|
||||
], $options);
|
||||
|
||||
if (\function_exists('pcntl_signal')) {
|
||||
pcntl_signal(SIGTERM, function () {
|
||||
$this->stop();
|
||||
});
|
||||
}
|
||||
|
||||
$onHandled = function (?Envelope $envelope) use ($onHandledCallback) {
|
||||
if (\function_exists('pcntl_signal_dispatch')) {
|
||||
pcntl_signal_dispatch();
|
||||
}
|
||||
|
||||
if (null !== $onHandledCallback) {
|
||||
$onHandledCallback($envelope);
|
||||
}
|
||||
};
|
||||
|
||||
while (false === $this->shouldStop) {
|
||||
$envelopeHandled = false;
|
||||
foreach ($this->receivers as $transportName => $receiver) {
|
||||
@ -85,7 +74,7 @@ class Worker implements WorkerInterface
|
||||
$envelopeHandled = true;
|
||||
|
||||
$this->handleMessage($envelope, $receiver, $transportName);
|
||||
$onHandled($envelope);
|
||||
$this->dispatchEvent(new WorkerRunningEvent($this, false));
|
||||
|
||||
if ($this->shouldStop) {
|
||||
break 2;
|
||||
@ -101,13 +90,13 @@ class Worker implements WorkerInterface
|
||||
}
|
||||
|
||||
if (false === $envelopeHandled) {
|
||||
$onHandled(null);
|
||||
$this->dispatchEvent(new WorkerRunningEvent($this, true));
|
||||
|
||||
usleep($options['sleep']);
|
||||
}
|
||||
}
|
||||
|
||||
$this->dispatchEvent(new WorkerStoppedEvent());
|
||||
$this->dispatchEvent(new WorkerStoppedEvent($this));
|
||||
}
|
||||
|
||||
private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName): void
|
||||
|
@ -1,59 +0,0 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Worker;
|
||||
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\WorkerInterface;
|
||||
|
||||
/**
|
||||
* @author Simon Delicata <simon.delicata@free.fr>
|
||||
*/
|
||||
class StopWhenMemoryUsageIsExceededWorker implements WorkerInterface
|
||||
{
|
||||
private $decoratedWorker;
|
||||
private $memoryLimit;
|
||||
private $logger;
|
||||
private $memoryResolver;
|
||||
|
||||
public function __construct(WorkerInterface $decoratedWorker, int $memoryLimit, LoggerInterface $logger = null, callable $memoryResolver = null)
|
||||
{
|
||||
$this->decoratedWorker = $decoratedWorker;
|
||||
$this->memoryLimit = $memoryLimit;
|
||||
$this->logger = $logger;
|
||||
$this->memoryResolver = $memoryResolver ?: function () {
|
||||
return memory_get_usage(true);
|
||||
};
|
||||
}
|
||||
|
||||
public function run(array $options = [], callable $onHandledCallback = null): void
|
||||
{
|
||||
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback) {
|
||||
if (null !== $onHandledCallback) {
|
||||
$onHandledCallback($envelope);
|
||||
}
|
||||
|
||||
$memoryResolver = $this->memoryResolver;
|
||||
if ($memoryResolver() > $this->memoryLimit) {
|
||||
$this->stop();
|
||||
if (null !== $this->logger) {
|
||||
$this->logger->info('Worker stopped due to memory limit of {limit} exceeded', ['limit' => $this->memoryLimit]);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public function stop(): void
|
||||
{
|
||||
$this->decoratedWorker->stop();
|
||||
}
|
||||
}
|
@ -1,56 +0,0 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Worker;
|
||||
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\WorkerInterface;
|
||||
|
||||
/**
|
||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||
*/
|
||||
class StopWhenMessageCountIsExceededWorker implements WorkerInterface
|
||||
{
|
||||
private $decoratedWorker;
|
||||
private $maximumNumberOfMessages;
|
||||
private $logger;
|
||||
|
||||
public function __construct(WorkerInterface $decoratedWorker, int $maximumNumberOfMessages, LoggerInterface $logger = null)
|
||||
{
|
||||
$this->decoratedWorker = $decoratedWorker;
|
||||
$this->maximumNumberOfMessages = $maximumNumberOfMessages;
|
||||
$this->logger = $logger;
|
||||
}
|
||||
|
||||
public function run(array $options = [], callable $onHandledCallback = null): void
|
||||
{
|
||||
$receivedMessages = 0;
|
||||
|
||||
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, &$receivedMessages) {
|
||||
if (null !== $onHandledCallback) {
|
||||
$onHandledCallback($envelope);
|
||||
}
|
||||
|
||||
if (null !== $envelope && ++$receivedMessages >= $this->maximumNumberOfMessages) {
|
||||
$this->stop();
|
||||
if (null !== $this->logger) {
|
||||
$this->logger->info('Worker stopped due to maximum count of {count} exceeded', ['count' => $this->maximumNumberOfMessages]);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public function stop(): void
|
||||
{
|
||||
$this->decoratedWorker->stop();
|
||||
}
|
||||
}
|
@ -1,71 +0,0 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Worker;
|
||||
|
||||
use Psr\Cache\CacheItemPoolInterface;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\WorkerInterface;
|
||||
|
||||
/**
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*/
|
||||
class StopWhenRestartSignalIsReceived implements WorkerInterface
|
||||
{
|
||||
public const RESTART_REQUESTED_TIMESTAMP_KEY = 'workers.restart_requested_timestamp';
|
||||
|
||||
private $decoratedWorker;
|
||||
private $cachePool;
|
||||
private $logger;
|
||||
|
||||
public function __construct(WorkerInterface $decoratedWorker, CacheItemPoolInterface $cachePool, LoggerInterface $logger = null)
|
||||
{
|
||||
$this->decoratedWorker = $decoratedWorker;
|
||||
$this->cachePool = $cachePool;
|
||||
$this->logger = $logger;
|
||||
}
|
||||
|
||||
public function run(array $options = [], callable $onHandledCallback = null): void
|
||||
{
|
||||
$workerStartedAt = microtime(true);
|
||||
|
||||
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, $workerStartedAt) {
|
||||
if (null !== $onHandledCallback) {
|
||||
$onHandledCallback($envelope);
|
||||
}
|
||||
|
||||
if ($this->shouldRestart($workerStartedAt)) {
|
||||
$this->stop();
|
||||
if (null !== $this->logger) {
|
||||
$this->logger->info('Worker stopped because a restart was requested.');
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public function stop(): void
|
||||
{
|
||||
$this->decoratedWorker->stop();
|
||||
}
|
||||
|
||||
private function shouldRestart(float $workerStartedAt): bool
|
||||
{
|
||||
$cacheItem = $this->cachePool->getItem(self::RESTART_REQUESTED_TIMESTAMP_KEY);
|
||||
|
||||
if (!$cacheItem->isHit()) {
|
||||
// no restart has ever been scheduled
|
||||
return false;
|
||||
}
|
||||
|
||||
return $workerStartedAt < $cacheItem->get();
|
||||
}
|
||||
}
|
@ -1,57 +0,0 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Worker;
|
||||
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\WorkerInterface;
|
||||
|
||||
/**
|
||||
* @author Simon Delicata <simon.delicata@free.fr>
|
||||
*/
|
||||
class StopWhenTimeLimitIsReachedWorker implements WorkerInterface
|
||||
{
|
||||
private $decoratedWorker;
|
||||
private $timeLimitInSeconds;
|
||||
private $logger;
|
||||
|
||||
public function __construct(WorkerInterface $decoratedWorker, int $timeLimitInSeconds, LoggerInterface $logger = null)
|
||||
{
|
||||
$this->decoratedWorker = $decoratedWorker;
|
||||
$this->timeLimitInSeconds = $timeLimitInSeconds;
|
||||
$this->logger = $logger;
|
||||
}
|
||||
|
||||
public function run(array $options = [], callable $onHandledCallback = null): void
|
||||
{
|
||||
$startTime = microtime(true);
|
||||
$endTime = $startTime + $this->timeLimitInSeconds;
|
||||
|
||||
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, $endTime) {
|
||||
if (null !== $onHandledCallback) {
|
||||
$onHandledCallback($envelope);
|
||||
}
|
||||
|
||||
if ($endTime < microtime(true)) {
|
||||
$this->stop();
|
||||
if (null !== $this->logger) {
|
||||
$this->logger->info('Worker stopped due to time limit of {timeLimit}s reached', ['timeLimit' => $this->timeLimitInSeconds]);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public function stop(): void
|
||||
{
|
||||
$this->decoratedWorker->stop();
|
||||
}
|
||||
}
|
@ -1,35 +0,0 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger;
|
||||
|
||||
/**
|
||||
* Interface for Workers that handle messages from transports.
|
||||
*
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*/
|
||||
interface WorkerInterface
|
||||
{
|
||||
/**
|
||||
* Receives the messages and dispatch them to the bus.
|
||||
*
|
||||
* The $onHandledCallback will be passed the Envelope that was just
|
||||
* handled or null if nothing was handled.
|
||||
*
|
||||
* @param mixed[] $options options used to control worker behavior
|
||||
*/
|
||||
public function run(array $options = [], callable $onHandledCallback = null): void;
|
||||
|
||||
/**
|
||||
* Stops receiving messages.
|
||||
*/
|
||||
public function stop(): void;
|
||||
}
|
Reference in New Issue
Block a user