[Messenger] Add a memory limit option for `ConsumeMessagesCommand`

This commit is contained in:
Simon DELICATA 2018-04-18 15:40:19 +02:00 committed by Simon Delicata
parent cef8d2823a
commit 08f98cfaf1
9 changed files with 329 additions and 33 deletions

View File

@ -72,6 +72,7 @@
<service id="console.command.messenger_consume_messages" class="Symfony\Component\Messenger\Command\ConsumeMessagesCommand">
<argument type="service" id="message_bus" />
<argument type="service" id="messenger.receiver_locator" />
<argument type="service" id="logger" on-invalid="null" />
<tag name="console.command" command="messenger:consume-messages" />
</service>

View File

@ -12,13 +12,15 @@
namespace Symfony\Component\Messenger\Command;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Enhancers\MaximumCountReceiver;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Worker;
@ -33,24 +35,27 @@ class ConsumeMessagesCommand extends Command
private $bus;
private $receiverLocator;
private $logger;
public function __construct(MessageBusInterface $bus, ContainerInterface $receiverLocator)
public function __construct(MessageBusInterface $bus, ContainerInterface $receiverLocator, LoggerInterface $logger = null)
{
parent::__construct();
$this->bus = $bus;
$this->receiverLocator = $receiverLocator;
$this->logger = $logger;
}
/**
* {@inheritdoc}
*/
protected function configure()
protected function configure(): void
{
$this
->setDefinition(array(
new InputArgument('receiver', InputArgument::REQUIRED, 'Name of the receiver'),
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
))
->setDescription('Consumes messages')
->setHelp(<<<'EOF'
@ -61,6 +66,10 @@ The <info>%command.name%</info> command consumes messages and dispatches them to
Use the --limit option to limit the number of messages received:
<info>php %command.full_name% <receiver-name> --limit=10</info>
Use the --memory-limit option to stop the worker if it exceeds a given memory usage limit. You can use shorthand byte values [K, M or G]:
<info>php %command.full_name% <receiver-name> --memory-limit=128M</info>
EOF
)
;
@ -69,7 +78,7 @@ EOF
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): void
{
if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
throw new \RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
@ -80,10 +89,39 @@ EOF
}
if ($limit = $input->getOption('limit')) {
$receiver = new MaximumCountReceiver($receiver, $limit);
$receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger);
}
if ($memoryLimit = $input->getOption('memory-limit')) {
$receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
}
$worker = new Worker($receiver, $this->bus);
$worker->run();
}
private function convertToBytes(string $memoryLimit): int
{
$memoryLimit = strtolower($memoryLimit);
$max = strtolower(ltrim($memoryLimit, '+'));
if (0 === strpos($max, '0x')) {
$max = intval($max, 16);
} elseif (0 === strpos($max, '0')) {
$max = intval($max, 8);
} else {
$max = (int) $max;
}
switch (substr($memoryLimit, -1)) {
case 't': $max *= 1024;
// no break
case 'g': $max *= 1024;
// no break
case 'm': $max *= 1024;
// no break
case 'k': $max *= 1024;
}
return $max;
}
}

View File

@ -16,7 +16,6 @@ use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Adapter\AmqpExt\Connection;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Enhancers\MaximumCountReceiver;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Process\PhpProcess;
use Symfony\Component\Process\Process;
@ -58,7 +57,7 @@ class AmqpExtIntegrationTest extends TestCase
$receiver->receive(function ($message) use ($receiver, &$receivedMessages, $firstMessage, $secondMessage) {
$this->assertEquals(0 == $receivedMessages ? $firstMessage : $secondMessage, $message);
if (2 == ++$receivedMessages) {
if (2 === ++$receivedMessages) {
$receiver->stop();
}
});
@ -116,9 +115,15 @@ class AmqpExtIntegrationTest extends TestCase
$connection->queue()->purge();
$sender = new AmqpSender($serializer, $connection);
$receiver = new MaximumCountReceiver(new AmqpReceiver($serializer, $connection), 2);
$receiver->receive(function ($message) {
$receiver = new AmqpReceiver($serializer, $connection);
$receivedMessages = 0;
$receiver->receive(function ($message) use ($receiver, &$receivedMessages) {
$this->assertNull($message);
if (2 === ++$receivedMessages) {
$receiver->stop();
}
});
}

View File

@ -0,0 +1,25 @@
<?php
namespace Symfony\Component\Messenger\Tests\Fixtures;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
class CallbackReceiver implements ReceiverInterface
{
private $callable;
public function __construct(callable $callable)
{
$this->callable = $callable;
}
public function receive(callable $handler): void
{
$callable = $this->callable;
$callable($handler);
}
public function stop(): void
{
}
}

View File

@ -0,0 +1,83 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\Enhancers;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver;
class StopWhenMemoryUsageIsExceededReceiverTest extends TestCase
{
/**
* @dataProvider memoryProvider
*/
public function testReceiverStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop)
{
$callable = function ($handler) {
$handler(new DummyMessage('API'));
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
->setConstructorArgs(array($callable))
->enableProxyingToOriginalMethods()
->getMock();
$decoratedReceiver->expects($this->once())->method('receive');
if (true === $shouldStop) {
$decoratedReceiver->expects($this->once())->method('stop');
} else {
$decoratedReceiver->expects($this->never())->method('stop');
}
$memoryResolver = function () use ($memoryUsage) {
return $memoryUsage;
};
$memoryLimitReceiver = new StopWhenMemoryUsageIsExceededReceiver($decoratedReceiver, $memoryLimit, null, $memoryResolver);
$memoryLimitReceiver->receive(function () {});
}
public function memoryProvider()
{
yield array(2048, 1024, true);
yield array(1024, 1024, false);
yield array(1024, 2048, false);
}
public function testReceiverLogsMemoryExceededWhenLoggerIsGiven()
{
$callable = function ($handler) {
$handler(new DummyMessage('API'));
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
->setConstructorArgs(array($callable))
->enableProxyingToOriginalMethods()
->getMock();
$decoratedReceiver->expects($this->once())->method('receive');
$decoratedReceiver->expects($this->once())->method('stop');
$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('info')
->with('Receiver stopped due to memory limit of {limit} exceeded', array('limit' => 64 * 1024 * 1024));
$memoryResolver = function () {
return 70 * 1024 * 1024;
};
$memoryLimitReceiver = new StopWhenMemoryUsageIsExceededReceiver($decoratedReceiver, 64 * 1024 * 1024, $logger, $memoryResolver);
$memoryLimitReceiver->receive(function () {});
}
}

View File

@ -0,0 +1,102 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\Enhancers;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
class StopWhenMessageCountIsExceededReceiverTest extends TestCase
{
/**
* @dataProvider countProvider
*/
public function testReceiverStopsWhenMaximumCountExceeded($max, $shouldStop)
{
$callable = function ($handler) {
$handler(new DummyMessage('First message'));
$handler(new DummyMessage('Second message'));
$handler(new DummyMessage('Third message'));
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
->setConstructorArgs(array($callable))
->enableProxyingToOriginalMethods()
->getMock();
$decoratedReceiver->expects($this->once())->method('receive');
if (true === $shouldStop) {
$decoratedReceiver->expects($this->any())->method('stop');
} else {
$decoratedReceiver->expects($this->never())->method('stop');
}
$maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, $max);
$maximumCountReceiver->receive(function () {});
}
public function countProvider()
{
yield array(1, true);
yield array(2, true);
yield array(3, true);
yield array(4, false);
}
public function testReceiverDoesntIncreaseItsCounterWhenReceiveNullMessage()
{
$callable = function ($handler) {
$handler(null);
$handler(null);
$handler(null);
$handler(null);
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
->setConstructorArgs(array($callable))
->enableProxyingToOriginalMethods()
->getMock();
$decoratedReceiver->expects($this->once())->method('receive');
$decoratedReceiver->expects($this->never())->method('stop');
$maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, 1);
$maximumCountReceiver->receive(function () {});
}
public function testReceiverLogsMaximumCountExceededWhenLoggerIsGiven()
{
$callable = function ($handler) {
$handler(new DummyMessage('First message'));
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
->setConstructorArgs(array($callable))
->enableProxyingToOriginalMethods()
->getMock();
$decoratedReceiver->expects($this->once())->method('receive');
$decoratedReceiver->expects($this->once())->method('stop');
$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('info')
->with(
$this->equalTo('Receiver stopped due to maximum count of {count} exceeded'),
$this->equalTo(array('count' => 1))
);
$maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, 1, $logger);
$maximumCountReceiver->receive(function () {});
}
}

View File

@ -14,8 +14,8 @@ namespace Symfony\Component\Messenger\Tests;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Worker;
class WorkerTest extends TestCase
@ -83,23 +83,3 @@ class WorkerTest extends TestCase
$worker->run();
}
}
class CallbackReceiver implements ReceiverInterface
{
private $callable;
public function __construct(callable $callable)
{
$this->callable = $callable;
}
public function receive(callable $handler): void
{
$callable = $this->callable;
$callable($handler);
}
public function stop(): void
{
}
}

View File

@ -0,0 +1,56 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\Enhancers;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
/**
* @author Simon Delicata <simon.delicata@free.fr>
*/
class StopWhenMemoryUsageIsExceededReceiver implements ReceiverInterface
{
private $decoratedReceiver;
private $memoryLimit;
private $logger;
private $memoryResolver;
public function __construct(ReceiverInterface $decoratedReceiver, int $memoryLimit, LoggerInterface $logger = null, callable $memoryResolver = null)
{
$this->decoratedReceiver = $decoratedReceiver;
$this->memoryLimit = $memoryLimit;
$this->logger = $logger;
$this->memoryResolver = $memoryResolver ?: function () {
return \memory_get_usage();
};
}
public function receive(callable $handler): void
{
$this->decoratedReceiver->receive(function ($message) use ($handler) {
$handler($message);
$memoryResolver = $this->memoryResolver;
if ($memoryResolver() > $this->memoryLimit) {
$this->stop();
if (null !== $this->logger) {
$this->logger->info('Receiver stopped due to memory limit of {limit} exceeded', array('limit' => $this->memoryLimit));
}
}
});
}
public function stop(): void
{
$this->decoratedReceiver->stop();
}
}

View File

@ -11,20 +11,23 @@
namespace Symfony\Component\Messenger\Transport\Enhancers;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class MaximumCountReceiver implements ReceiverInterface
class StopWhenMessageCountIsExceededReceiver implements ReceiverInterface
{
private $decoratedReceiver;
private $maximumNumberOfMessages;
private $logger;
public function __construct(ReceiverInterface $decoratedReceiver, int $maximumNumberOfMessages)
public function __construct(ReceiverInterface $decoratedReceiver, int $maximumNumberOfMessages, LoggerInterface $logger = null)
{
$this->decoratedReceiver = $decoratedReceiver;
$this->maximumNumberOfMessages = $maximumNumberOfMessages;
$this->logger = $logger;
}
public function receive(callable $handler): void
@ -34,8 +37,11 @@ class MaximumCountReceiver implements ReceiverInterface
$this->decoratedReceiver->receive(function ($message) use ($handler, &$receivedMessages) {
$handler($message);
if (++$receivedMessages >= $this->maximumNumberOfMessages) {
if (null !== $message && ++$receivedMessages >= $this->maximumNumberOfMessages) {
$this->stop();
if (null !== $this->logger) {
$this->logger->info('Receiver stopped due to maximum count of {count} exceeded', array('count' => $this->maximumNumberOfMessages));
}
}
});
}