[Messenger] New messenger:stop-workers Command
This commit is contained in:
parent
88042a317a
commit
58971627f5
@ -36,6 +36,10 @@
|
||||
<tag name="cache.pool" />
|
||||
</service>
|
||||
|
||||
<service id="cache.messenger.restart_workers_signal" parent="cache.app" public="false">
|
||||
<tag name="cache.pool" />
|
||||
</service>
|
||||
|
||||
<service id="cache.adapter.system" class="Symfony\Component\Cache\Adapter\AdapterInterface" abstract="true">
|
||||
<factory class="Symfony\Component\Cache\Adapter\AbstractAdapter" method="createSystemCache" />
|
||||
<tag name="cache.pool" clearer="cache.system_clearer" />
|
||||
|
@ -83,6 +83,9 @@
|
||||
<argument type="collection" /> <!-- Receiver names -->
|
||||
<argument type="service" id="messenger.retry_strategy_locator" />
|
||||
<argument type="service" id="event_dispatcher" />
|
||||
<call method="setCachePoolForRestartSignal">
|
||||
<argument type="service" id="cache.messenger.restart_workers_signal" />
|
||||
</call>
|
||||
|
||||
<tag name="console.command" command="messenger:consume" />
|
||||
<tag name="console.command" command="messenger:consume-messages" />
|
||||
@ -101,6 +104,11 @@
|
||||
<tag name="console.command" command="debug:messenger" />
|
||||
</service>
|
||||
|
||||
<service id="console.command.messenger_stop_workers" class="Symfony\Component\Messenger\Command\StopWorkersCommand">
|
||||
<argument type="service" id="cache.messenger.restart_workers_signal" />
|
||||
<tag name="console.command" command="messenger:stop-workers" />
|
||||
</service>
|
||||
|
||||
<service id="console.command.router_debug" class="Symfony\Bundle\FrameworkBundle\Command\RouterDebugCommand">
|
||||
<argument type="service" id="router" />
|
||||
<tag name="console.command" command="debug:router" />
|
||||
|
@ -4,6 +4,8 @@ CHANGELOG
|
||||
4.3.0
|
||||
-----
|
||||
|
||||
* Added new `messenger:stop-workers` command that sends a signal
|
||||
to stop all `messenger:consume` workers.
|
||||
* [BC BREAK] The `TransportFactoryInterface::createTransport()` signature
|
||||
changed: a required 3rd `SerializerInterface` argument was added.
|
||||
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
|
||||
|
@ -11,6 +11,7 @@
|
||||
|
||||
namespace Symfony\Component\Messenger\Command;
|
||||
|
||||
use Psr\Cache\CacheItemPoolInterface;
|
||||
use Psr\Container\ContainerInterface;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Console\Command\Command;
|
||||
@ -25,6 +26,7 @@ use Symfony\Component\Messenger\RoutableMessageBus;
|
||||
use Symfony\Component\Messenger\Worker;
|
||||
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
|
||||
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
|
||||
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
|
||||
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
|
||||
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
|
||||
|
||||
@ -43,6 +45,8 @@ class ConsumeMessagesCommand extends Command
|
||||
private $receiverNames;
|
||||
private $retryStrategyLocator;
|
||||
private $eventDispatcher;
|
||||
/** @var CacheItemPoolInterface|null */
|
||||
private $restartSignalCachePool;
|
||||
|
||||
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
|
||||
{
|
||||
@ -62,6 +66,11 @@ class ConsumeMessagesCommand extends Command
|
||||
parent::__construct();
|
||||
}
|
||||
|
||||
public function setCachePoolForRestartSignal(CacheItemPoolInterface $restartSignalCachePool)
|
||||
{
|
||||
$this->restartSignalCachePool = $restartSignalCachePool;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
@ -190,6 +199,11 @@ EOF
|
||||
$worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger);
|
||||
}
|
||||
|
||||
if (null !== $this->restartSignalCachePool) {
|
||||
$stopsWhen[] = 'received a stop signal via the messenger:stop-workers command';
|
||||
$worker = new StopWhenRestartSignalIsReceived($worker, $this->restartSignalCachePool, $this->logger);
|
||||
}
|
||||
|
||||
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
|
||||
$io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames)));
|
||||
|
||||
|
@ -0,0 +1,73 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Command;
|
||||
|
||||
use Psr\Cache\CacheItemPoolInterface;
|
||||
use Symfony\Component\Console\Command\Command;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
|
||||
|
||||
/**
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*/
|
||||
class StopWorkersCommand extends Command
|
||||
{
|
||||
protected static $defaultName = 'messenger:stop-workers';
|
||||
|
||||
private $restartSignalCachePool;
|
||||
|
||||
public function __construct(CacheItemPoolInterface $restartSignalCachePool)
|
||||
{
|
||||
$this->restartSignalCachePool = $restartSignalCachePool;
|
||||
|
||||
parent::__construct();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function configure(): void
|
||||
{
|
||||
$this
|
||||
->setDefinition([])
|
||||
->setDescription('Stops workers after their current message')
|
||||
->setHelp(<<<'EOF'
|
||||
The <info>%command.name%</info> command sends a signal to stop any <info>messenger:consume</info> processes that are running.
|
||||
|
||||
<info>php %command.full_name%</info>
|
||||
|
||||
Each worker command will finish the message they are currently processing
|
||||
and then exit. Worker commands are *not* automatically restarted: that
|
||||
should be handled by something like supervisord.
|
||||
EOF
|
||||
)
|
||||
;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function execute(InputInterface $input, OutputInterface $output): void
|
||||
{
|
||||
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
|
||||
|
||||
$cacheItem = $this->restartSignalCachePool->getItem(StopWhenRestartSignalIsReceived::RESTART_REQUESTED_TIMESTAMP_KEY);
|
||||
$cacheItem->set(time());
|
||||
$this->restartSignalCachePool->save($cacheItem);
|
||||
|
||||
$io->success('Signal successfully sent to stop any running workers.');
|
||||
}
|
||||
}
|
@ -0,0 +1,35 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Command;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Psr\Cache\CacheItemInterface;
|
||||
use Psr\Cache\CacheItemPoolInterface;
|
||||
use Symfony\Component\Console\Tester\CommandTester;
|
||||
use Symfony\Component\Messenger\Command\StopWorkersCommand;
|
||||
|
||||
class StopWorkersCommandTest extends TestCase
|
||||
{
|
||||
public function testItSetsCacheItem()
|
||||
{
|
||||
$cachePool = $this->createMock(CacheItemPoolInterface::class);
|
||||
$cacheItem = $this->createMock(CacheItemInterface::class);
|
||||
$cacheItem->expects($this->once())->method('set');
|
||||
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
|
||||
$cachePool->expects($this->once())->method('save')->with($cacheItem);
|
||||
|
||||
$command = new StopWorkersCommand($cachePool);
|
||||
|
||||
$tester = new CommandTester($command);
|
||||
$tester->execute([]);
|
||||
}
|
||||
}
|
@ -0,0 +1,71 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Worker;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Psr\Cache\CacheItemInterface;
|
||||
use Psr\Cache\CacheItemPoolInterface;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker;
|
||||
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
|
||||
|
||||
/**
|
||||
* @group time-sensitive
|
||||
*/
|
||||
class StopWhenRestartSignalIsReceivedTest extends TestCase
|
||||
{
|
||||
/**
|
||||
* @dataProvider restartTimeProvider
|
||||
*/
|
||||
public function testWorkerStopsWhenMemoryLimitExceeded(?int $lastRestartTimeOffset, bool $shouldStop)
|
||||
{
|
||||
$decoratedWorker = new DummyWorker([
|
||||
new Envelope(new \stdClass()),
|
||||
]);
|
||||
|
||||
$cachePool = $this->createMock(CacheItemPoolInterface::class);
|
||||
$cacheItem = $this->createMock(CacheItemInterface::class);
|
||||
$cacheItem->expects($this->once())->method('isHIt')->willReturn(true);
|
||||
$cacheItem->expects($this->once())->method('get')->willReturn(null === $lastRestartTimeOffset ? null : time() + $lastRestartTimeOffset);
|
||||
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
|
||||
|
||||
$stopOnSignalWorker = new StopWhenRestartSignalIsReceived($decoratedWorker, $cachePool);
|
||||
$stopOnSignalWorker->run();
|
||||
|
||||
$this->assertSame($shouldStop, $decoratedWorker->isStopped());
|
||||
}
|
||||
|
||||
public function restartTimeProvider()
|
||||
{
|
||||
yield [null, false]; // no cached restart time, do not restart
|
||||
yield [+10, true]; // 10 seconds after starting, a restart was requested
|
||||
yield [-10, false]; // a restart was requested, but 10 seconds before we started
|
||||
}
|
||||
|
||||
public function testWorkerDoesNotStopIfRestartNotInCache()
|
||||
{
|
||||
$decoratedWorker = new DummyWorker([
|
||||
new Envelope(new \stdClass()),
|
||||
]);
|
||||
|
||||
$cachePool = $this->createMock(CacheItemPoolInterface::class);
|
||||
$cacheItem = $this->createMock(CacheItemInterface::class);
|
||||
$cacheItem->expects($this->once())->method('isHIt')->willReturn(false);
|
||||
$cacheItem->expects($this->never())->method('get');
|
||||
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
|
||||
|
||||
$stopOnSignalWorker = new StopWhenRestartSignalIsReceived($decoratedWorker, $cachePool);
|
||||
$stopOnSignalWorker->run();
|
||||
|
||||
$this->assertFalse($decoratedWorker->isStopped());
|
||||
}
|
||||
}
|
@ -0,0 +1,72 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Worker;
|
||||
|
||||
use Psr\Cache\CacheItemPoolInterface;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\WorkerInterface;
|
||||
|
||||
/**
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*/
|
||||
class StopWhenRestartSignalIsReceived implements WorkerInterface
|
||||
{
|
||||
public const RESTART_REQUESTED_TIMESTAMP_KEY = 'workers.restart_requested_timestamp';
|
||||
|
||||
private $decoratedWorker;
|
||||
private $cachePool;
|
||||
private $logger;
|
||||
|
||||
public function __construct(WorkerInterface $decoratedWorker, CacheItemPoolInterface $cachePool, LoggerInterface $logger = null)
|
||||
{
|
||||
$this->decoratedWorker = $decoratedWorker;
|
||||
$this->cachePool = $cachePool;
|
||||
$this->logger = $logger;
|
||||
}
|
||||
|
||||
public function run(array $options = [], callable $onHandledCallback = null): void
|
||||
{
|
||||
$workerStartedTimestamp = time();
|
||||
|
||||
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, $workerStartedTimestamp) {
|
||||
if (null !== $onHandledCallback) {
|
||||
$onHandledCallback($envelope);
|
||||
}
|
||||
|
||||
if ($this->shouldRestart($workerStartedTimestamp)) {
|
||||
$this->stop();
|
||||
if (null !== $this->logger) {
|
||||
$this->logger->info('Worker stopped because a restart was requested.');
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public function stop(): void
|
||||
{
|
||||
$this->decoratedWorker->stop();
|
||||
}
|
||||
|
||||
private function shouldRestart(int $workerStartedAt)
|
||||
{
|
||||
$cacheItem = $this->cachePool->getItem(self::RESTART_REQUESTED_TIMESTAMP_KEY);
|
||||
if (!$cacheItem->isHit()) {
|
||||
// no restart has ever been scheduled
|
||||
return false;
|
||||
}
|
||||
|
||||
return $workerStartedAt < $cacheItem->get();
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user