feature #30754 [Messenger] New messenger:stop-workers Command (weaverryan)

This PR was squashed before being merged into the 4.3-dev branch (closes #30754).

Discussion
----------

[Messenger] New messenger:stop-workers Command

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | Kinda of #29451
| License       | MIT
| Doc PR        | symfony/symfony-docs#11236

o/ me again.

This requires and is built on top of #30708

When you deploy, all workers need to be stopped and restarted. That's not currently possible, unless you manually track the pids and send a SIGTERM signal. We can make that much easier :).

Now run:

```
bin/console messenger:stop-workers
```

And it will signal to all workers (even if they're distributed on other servers) that they should stop, once they finish processing their current message. This is done via a key in `cache.app`.

Cheers!

Commits
-------

58971627f5 [Messenger] New messenger:stop-workers Command
This commit is contained in:
Fabien Potencier 2019-03-31 18:41:56 +02:00
commit 343d28e3d1
8 changed files with 279 additions and 0 deletions

View File

@ -36,6 +36,10 @@
<tag name="cache.pool" />
</service>
<service id="cache.messenger.restart_workers_signal" parent="cache.app" public="false">
<tag name="cache.pool" />
</service>
<service id="cache.adapter.system" class="Symfony\Component\Cache\Adapter\AdapterInterface" abstract="true">
<factory class="Symfony\Component\Cache\Adapter\AbstractAdapter" method="createSystemCache" />
<tag name="cache.pool" clearer="cache.system_clearer" />

View File

@ -83,6 +83,9 @@
<argument type="collection" /> <!-- Receiver names -->
<argument type="service" id="messenger.retry_strategy_locator" />
<argument type="service" id="event_dispatcher" />
<call method="setCachePoolForRestartSignal">
<argument type="service" id="cache.messenger.restart_workers_signal" />
</call>
<tag name="console.command" command="messenger:consume" />
<tag name="console.command" command="messenger:consume-messages" />
@ -101,6 +104,11 @@
<tag name="console.command" command="debug:messenger" />
</service>
<service id="console.command.messenger_stop_workers" class="Symfony\Component\Messenger\Command\StopWorkersCommand">
<argument type="service" id="cache.messenger.restart_workers_signal" />
<tag name="console.command" command="messenger:stop-workers" />
</service>
<service id="console.command.router_debug" class="Symfony\Bundle\FrameworkBundle\Command\RouterDebugCommand">
<argument type="service" id="router" />
<tag name="console.command" command="debug:router" />

View File

@ -10,6 +10,8 @@ CHANGELOG
second argument.
* [BC BREAK] The `MessageBusInterface::dispatch()` signature changed:
a second argument `array $stamps = []` was added.
* Added new `messenger:stop-workers` command that sends a signal
to stop all `messenger:consume` workers.
* [BC BREAK] The `TransportFactoryInterface::createTransport()` signature
changed: a required 3rd `SerializerInterface` argument was added.
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Command;
use Psr\Cache\CacheItemPoolInterface;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
@ -25,6 +26,7 @@ use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
@ -43,6 +45,8 @@ class ConsumeMessagesCommand extends Command
private $receiverNames;
private $retryStrategyLocator;
private $eventDispatcher;
/** @var CacheItemPoolInterface|null */
private $restartSignalCachePool;
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
{
@ -62,6 +66,11 @@ class ConsumeMessagesCommand extends Command
parent::__construct();
}
public function setCachePoolForRestartSignal(CacheItemPoolInterface $restartSignalCachePool)
{
$this->restartSignalCachePool = $restartSignalCachePool;
}
/**
* {@inheritdoc}
*/
@ -190,6 +199,11 @@ EOF
$worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger);
}
if (null !== $this->restartSignalCachePool) {
$stopsWhen[] = 'received a stop signal via the messenger:stop-workers command';
$worker = new StopWhenRestartSignalIsReceived($worker, $this->restartSignalCachePool, $this->logger);
}
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames)));

View File

@ -0,0 +1,73 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Command;
use Psr\Cache\CacheItemPoolInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @experimental in 4.3
*/
class StopWorkersCommand extends Command
{
protected static $defaultName = 'messenger:stop-workers';
private $restartSignalCachePool;
public function __construct(CacheItemPoolInterface $restartSignalCachePool)
{
$this->restartSignalCachePool = $restartSignalCachePool;
parent::__construct();
}
/**
* {@inheritdoc}
*/
protected function configure(): void
{
$this
->setDefinition([])
->setDescription('Stops workers after their current message')
->setHelp(<<<'EOF'
The <info>%command.name%</info> command sends a signal to stop any <info>messenger:consume</info> processes that are running.
<info>php %command.full_name%</info>
Each worker command will finish the message they are currently processing
and then exit. Worker commands are *not* automatically restarted: that
should be handled by something like supervisord.
EOF
)
;
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output): void
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$cacheItem = $this->restartSignalCachePool->getItem(StopWhenRestartSignalIsReceived::RESTART_REQUESTED_TIMESTAMP_KEY);
$cacheItem->set(time());
$this->restartSignalCachePool->save($cacheItem);
$io->success('Signal successfully sent to stop any running workers.');
}
}

View File

@ -0,0 +1,35 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Command;
use PHPUnit\Framework\TestCase;
use Psr\Cache\CacheItemInterface;
use Psr\Cache\CacheItemPoolInterface;
use Symfony\Component\Console\Tester\CommandTester;
use Symfony\Component\Messenger\Command\StopWorkersCommand;
class StopWorkersCommandTest extends TestCase
{
public function testItSetsCacheItem()
{
$cachePool = $this->createMock(CacheItemPoolInterface::class);
$cacheItem = $this->createMock(CacheItemInterface::class);
$cacheItem->expects($this->once())->method('set');
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
$cachePool->expects($this->once())->method('save')->with($cacheItem);
$command = new StopWorkersCommand($cachePool);
$tester = new CommandTester($command);
$tester->execute([]);
}
}

View File

@ -0,0 +1,71 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Worker;
use PHPUnit\Framework\TestCase;
use Psr\Cache\CacheItemInterface;
use Psr\Cache\CacheItemPoolInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker;
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
/**
* @group time-sensitive
*/
class StopWhenRestartSignalIsReceivedTest extends TestCase
{
/**
* @dataProvider restartTimeProvider
*/
public function testWorkerStopsWhenMemoryLimitExceeded(?int $lastRestartTimeOffset, bool $shouldStop)
{
$decoratedWorker = new DummyWorker([
new Envelope(new \stdClass()),
]);
$cachePool = $this->createMock(CacheItemPoolInterface::class);
$cacheItem = $this->createMock(CacheItemInterface::class);
$cacheItem->expects($this->once())->method('isHIt')->willReturn(true);
$cacheItem->expects($this->once())->method('get')->willReturn(null === $lastRestartTimeOffset ? null : time() + $lastRestartTimeOffset);
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
$stopOnSignalWorker = new StopWhenRestartSignalIsReceived($decoratedWorker, $cachePool);
$stopOnSignalWorker->run();
$this->assertSame($shouldStop, $decoratedWorker->isStopped());
}
public function restartTimeProvider()
{
yield [null, false]; // no cached restart time, do not restart
yield [+10, true]; // 10 seconds after starting, a restart was requested
yield [-10, false]; // a restart was requested, but 10 seconds before we started
}
public function testWorkerDoesNotStopIfRestartNotInCache()
{
$decoratedWorker = new DummyWorker([
new Envelope(new \stdClass()),
]);
$cachePool = $this->createMock(CacheItemPoolInterface::class);
$cacheItem = $this->createMock(CacheItemInterface::class);
$cacheItem->expects($this->once())->method('isHIt')->willReturn(false);
$cacheItem->expects($this->never())->method('get');
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
$stopOnSignalWorker = new StopWhenRestartSignalIsReceived($decoratedWorker, $cachePool);
$stopOnSignalWorker->run();
$this->assertFalse($decoratedWorker->isStopped());
}
}

View File

@ -0,0 +1,72 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Worker;
use Psr\Cache\CacheItemPoolInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\WorkerInterface;
/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @experimental in 4.3
*/
class StopWhenRestartSignalIsReceived implements WorkerInterface
{
public const RESTART_REQUESTED_TIMESTAMP_KEY = 'workers.restart_requested_timestamp';
private $decoratedWorker;
private $cachePool;
private $logger;
public function __construct(WorkerInterface $decoratedWorker, CacheItemPoolInterface $cachePool, LoggerInterface $logger = null)
{
$this->decoratedWorker = $decoratedWorker;
$this->cachePool = $cachePool;
$this->logger = $logger;
}
public function run(array $options = [], callable $onHandledCallback = null): void
{
$workerStartedTimestamp = time();
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, $workerStartedTimestamp) {
if (null !== $onHandledCallback) {
$onHandledCallback($envelope);
}
if ($this->shouldRestart($workerStartedTimestamp)) {
$this->stop();
if (null !== $this->logger) {
$this->logger->info('Worker stopped because a restart was requested.');
}
}
});
}
public function stop(): void
{
$this->decoratedWorker->stop();
}
private function shouldRestart(int $workerStartedAt)
{
$cacheItem = $this->cachePool->getItem(self::RESTART_REQUESTED_TIMESTAMP_KEY);
if (!$cacheItem->isHit()) {
// no restart has ever been scheduled
return false;
}
return $workerStartedAt < $cacheItem->get();
}
}