From 08f98cfaf1f09b9f854483ba59202eef737e70bd Mon Sep 17 00:00:00 2001 From: Simon DELICATA Date: Wed, 18 Apr 2018 15:40:19 +0200 Subject: [PATCH] [Messenger] Add a memory limit option for `ConsumeMessagesCommand` --- .../Resources/config/console.xml | 1 + .../Command/ConsumeMessagesCommand.php | 48 ++++++++- .../AmqpExt/AmqpExtIntegrationTest.php | 13 ++- .../Tests/Fixtures/CallbackReceiver.php | 25 +++++ ...pWhenMemoryUsageIsExceededReceiverTest.php | 83 ++++++++++++++ ...WhenMessageCountIsExceededReceiverTest.php | 102 ++++++++++++++++++ .../Component/Messenger/Tests/WorkerTest.php | 22 +--- .../StopWhenMemoryUsageIsExceededReceiver.php | 56 ++++++++++ ...topWhenMessageCountIsExceededReceiver.php} | 12 ++- 9 files changed, 329 insertions(+), 33 deletions(-) create mode 100644 src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php create mode 100644 src/Symfony/Component/Messenger/Tests/Transport/Enhancers/StopWhenMemoryUsageIsExceededReceiverTest.php create mode 100644 src/Symfony/Component/Messenger/Tests/Transport/Enhancers/StopWhenMessageCountIsExceededReceiverTest.php create mode 100644 src/Symfony/Component/Messenger/Transport/Enhancers/StopWhenMemoryUsageIsExceededReceiver.php rename src/Symfony/Component/Messenger/Transport/Enhancers/{MaximumCountReceiver.php => StopWhenMessageCountIsExceededReceiver.php} (66%) diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml index 989da40e00..cb71bbb8de 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml @@ -72,6 +72,7 @@ + diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index 1ef2e843ce..be976f63c0 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -12,13 +12,15 @@ namespace Symfony\Component\Messenger\Command; use Psr\Container\ContainerInterface; +use Psr\Log\LoggerInterface; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Messenger\MessageBusInterface; -use Symfony\Component\Messenger\Transport\Enhancers\MaximumCountReceiver; +use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver; +use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver; use Symfony\Component\Messenger\Transport\ReceiverInterface; use Symfony\Component\Messenger\Worker; @@ -33,24 +35,27 @@ class ConsumeMessagesCommand extends Command private $bus; private $receiverLocator; + private $logger; - public function __construct(MessageBusInterface $bus, ContainerInterface $receiverLocator) + public function __construct(MessageBusInterface $bus, ContainerInterface $receiverLocator, LoggerInterface $logger = null) { parent::__construct(); $this->bus = $bus; $this->receiverLocator = $receiverLocator; + $this->logger = $logger; } /** * {@inheritdoc} */ - protected function configure() + protected function configure(): void { $this ->setDefinition(array( new InputArgument('receiver', InputArgument::REQUIRED, 'Name of the receiver'), new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'), + new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'), )) ->setDescription('Consumes messages') ->setHelp(<<<'EOF' @@ -61,6 +66,10 @@ The %command.name% command consumes messages and dispatches them to Use the --limit option to limit the number of messages received: php %command.full_name% --limit=10 + +Use the --memory-limit option to stop the worker if it exceeds a given memory usage limit. You can use shorthand byte values [K, M or G]: + + php %command.full_name% --memory-limit=128M EOF ) ; @@ -69,7 +78,7 @@ EOF /** * {@inheritdoc} */ - protected function execute(InputInterface $input, OutputInterface $output) + protected function execute(InputInterface $input, OutputInterface $output): void { if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) { throw new \RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName)); @@ -80,10 +89,39 @@ EOF } if ($limit = $input->getOption('limit')) { - $receiver = new MaximumCountReceiver($receiver, $limit); + $receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger); + } + + if ($memoryLimit = $input->getOption('memory-limit')) { + $receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger); } $worker = new Worker($receiver, $this->bus); $worker->run(); } + + private function convertToBytes(string $memoryLimit): int + { + $memoryLimit = strtolower($memoryLimit); + $max = strtolower(ltrim($memoryLimit, '+')); + if (0 === strpos($max, '0x')) { + $max = intval($max, 16); + } elseif (0 === strpos($max, '0')) { + $max = intval($max, 8); + } else { + $max = (int) $max; + } + + switch (substr($memoryLimit, -1)) { + case 't': $max *= 1024; + // no break + case 'g': $max *= 1024; + // no break + case 'm': $max *= 1024; + // no break + case 'k': $max *= 1024; + } + + return $max; + } } diff --git a/src/Symfony/Component/Messenger/Tests/Adapter/AmqpExt/AmqpExtIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/Adapter/AmqpExt/AmqpExtIntegrationTest.php index 61d1ebe984..a1718af02f 100644 --- a/src/Symfony/Component/Messenger/Tests/Adapter/AmqpExt/AmqpExtIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Tests/Adapter/AmqpExt/AmqpExtIntegrationTest.php @@ -16,7 +16,6 @@ use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpReceiver; use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpSender; use Symfony\Component\Messenger\Adapter\AmqpExt\Connection; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; -use Symfony\Component\Messenger\Transport\Enhancers\MaximumCountReceiver; use Symfony\Component\Messenger\Transport\Serialization\Serializer; use Symfony\Component\Process\PhpProcess; use Symfony\Component\Process\Process; @@ -58,7 +57,7 @@ class AmqpExtIntegrationTest extends TestCase $receiver->receive(function ($message) use ($receiver, &$receivedMessages, $firstMessage, $secondMessage) { $this->assertEquals(0 == $receivedMessages ? $firstMessage : $secondMessage, $message); - if (2 == ++$receivedMessages) { + if (2 === ++$receivedMessages) { $receiver->stop(); } }); @@ -116,9 +115,15 @@ class AmqpExtIntegrationTest extends TestCase $connection->queue()->purge(); $sender = new AmqpSender($serializer, $connection); - $receiver = new MaximumCountReceiver(new AmqpReceiver($serializer, $connection), 2); - $receiver->receive(function ($message) { + $receiver = new AmqpReceiver($serializer, $connection); + + $receivedMessages = 0; + $receiver->receive(function ($message) use ($receiver, &$receivedMessages) { $this->assertNull($message); + + if (2 === ++$receivedMessages) { + $receiver->stop(); + } }); } diff --git a/src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php b/src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php new file mode 100644 index 0000000000..3580b0ef0c --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php @@ -0,0 +1,25 @@ +callable = $callable; + } + + public function receive(callable $handler): void + { + $callable = $this->callable; + $callable($handler); + } + + public function stop(): void + { + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Enhancers/StopWhenMemoryUsageIsExceededReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Enhancers/StopWhenMemoryUsageIsExceededReceiverTest.php new file mode 100644 index 0000000000..f317c6a5bf --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/Enhancers/StopWhenMemoryUsageIsExceededReceiverTest.php @@ -0,0 +1,83 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\Enhancers; + +use PHPUnit\Framework\TestCase; +use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver; + +class StopWhenMemoryUsageIsExceededReceiverTest extends TestCase +{ + /** + * @dataProvider memoryProvider + */ + public function testReceiverStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop) + { + $callable = function ($handler) { + $handler(new DummyMessage('API')); + }; + + $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class) + ->setConstructorArgs(array($callable)) + ->enableProxyingToOriginalMethods() + ->getMock(); + + $decoratedReceiver->expects($this->once())->method('receive'); + if (true === $shouldStop) { + $decoratedReceiver->expects($this->once())->method('stop'); + } else { + $decoratedReceiver->expects($this->never())->method('stop'); + } + + $memoryResolver = function () use ($memoryUsage) { + return $memoryUsage; + }; + + $memoryLimitReceiver = new StopWhenMemoryUsageIsExceededReceiver($decoratedReceiver, $memoryLimit, null, $memoryResolver); + $memoryLimitReceiver->receive(function () {}); + } + + public function memoryProvider() + { + yield array(2048, 1024, true); + yield array(1024, 1024, false); + yield array(1024, 2048, false); + } + + public function testReceiverLogsMemoryExceededWhenLoggerIsGiven() + { + $callable = function ($handler) { + $handler(new DummyMessage('API')); + }; + + $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class) + ->setConstructorArgs(array($callable)) + ->enableProxyingToOriginalMethods() + ->getMock(); + + $decoratedReceiver->expects($this->once())->method('receive'); + $decoratedReceiver->expects($this->once())->method('stop'); + + $logger = $this->createMock(LoggerInterface::class); + $logger->expects($this->once())->method('info') + ->with('Receiver stopped due to memory limit of {limit} exceeded', array('limit' => 64 * 1024 * 1024)); + + $memoryResolver = function () { + return 70 * 1024 * 1024; + }; + + $memoryLimitReceiver = new StopWhenMemoryUsageIsExceededReceiver($decoratedReceiver, 64 * 1024 * 1024, $logger, $memoryResolver); + $memoryLimitReceiver->receive(function () {}); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Enhancers/StopWhenMessageCountIsExceededReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Enhancers/StopWhenMessageCountIsExceededReceiverTest.php new file mode 100644 index 0000000000..7a516744e1 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/Enhancers/StopWhenMessageCountIsExceededReceiverTest.php @@ -0,0 +1,102 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\Enhancers; + +use PHPUnit\Framework\TestCase; +use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver; + +class StopWhenMessageCountIsExceededReceiverTest extends TestCase +{ + /** + * @dataProvider countProvider + */ + public function testReceiverStopsWhenMaximumCountExceeded($max, $shouldStop) + { + $callable = function ($handler) { + $handler(new DummyMessage('First message')); + $handler(new DummyMessage('Second message')); + $handler(new DummyMessage('Third message')); + }; + + $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class) + ->setConstructorArgs(array($callable)) + ->enableProxyingToOriginalMethods() + ->getMock(); + + $decoratedReceiver->expects($this->once())->method('receive'); + if (true === $shouldStop) { + $decoratedReceiver->expects($this->any())->method('stop'); + } else { + $decoratedReceiver->expects($this->never())->method('stop'); + } + + $maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, $max); + $maximumCountReceiver->receive(function () {}); + } + + public function countProvider() + { + yield array(1, true); + yield array(2, true); + yield array(3, true); + yield array(4, false); + } + + public function testReceiverDoesntIncreaseItsCounterWhenReceiveNullMessage() + { + $callable = function ($handler) { + $handler(null); + $handler(null); + $handler(null); + $handler(null); + }; + + $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class) + ->setConstructorArgs(array($callable)) + ->enableProxyingToOriginalMethods() + ->getMock(); + + $decoratedReceiver->expects($this->once())->method('receive'); + $decoratedReceiver->expects($this->never())->method('stop'); + + $maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, 1); + $maximumCountReceiver->receive(function () {}); + } + + public function testReceiverLogsMaximumCountExceededWhenLoggerIsGiven() + { + $callable = function ($handler) { + $handler(new DummyMessage('First message')); + }; + + $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class) + ->setConstructorArgs(array($callable)) + ->enableProxyingToOriginalMethods() + ->getMock(); + + $decoratedReceiver->expects($this->once())->method('receive'); + $decoratedReceiver->expects($this->once())->method('stop'); + + $logger = $this->createMock(LoggerInterface::class); + $logger->expects($this->once())->method('info') + ->with( + $this->equalTo('Receiver stopped due to maximum count of {count} exceeded'), + $this->equalTo(array('count' => 1)) + ); + + $maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, 1, $logger); + $maximumCountReceiver->receive(function () {}); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index 3c9aeb12e9..7599d8dadc 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -14,8 +14,8 @@ namespace Symfony\Component\Messenger\Tests; use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage; use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; -use Symfony\Component\Messenger\Transport\ReceiverInterface; use Symfony\Component\Messenger\Worker; class WorkerTest extends TestCase @@ -83,23 +83,3 @@ class WorkerTest extends TestCase $worker->run(); } } - -class CallbackReceiver implements ReceiverInterface -{ - private $callable; - - public function __construct(callable $callable) - { - $this->callable = $callable; - } - - public function receive(callable $handler): void - { - $callable = $this->callable; - $callable($handler); - } - - public function stop(): void - { - } -} diff --git a/src/Symfony/Component/Messenger/Transport/Enhancers/StopWhenMemoryUsageIsExceededReceiver.php b/src/Symfony/Component/Messenger/Transport/Enhancers/StopWhenMemoryUsageIsExceededReceiver.php new file mode 100644 index 0000000000..4a05afe770 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Enhancers/StopWhenMemoryUsageIsExceededReceiver.php @@ -0,0 +1,56 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Enhancers; + +use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Transport\ReceiverInterface; + +/** + * @author Simon Delicata + */ +class StopWhenMemoryUsageIsExceededReceiver implements ReceiverInterface +{ + private $decoratedReceiver; + private $memoryLimit; + private $logger; + private $memoryResolver; + + public function __construct(ReceiverInterface $decoratedReceiver, int $memoryLimit, LoggerInterface $logger = null, callable $memoryResolver = null) + { + $this->decoratedReceiver = $decoratedReceiver; + $this->memoryLimit = $memoryLimit; + $this->logger = $logger; + $this->memoryResolver = $memoryResolver ?: function () { + return \memory_get_usage(); + }; + } + + public function receive(callable $handler): void + { + $this->decoratedReceiver->receive(function ($message) use ($handler) { + $handler($message); + + $memoryResolver = $this->memoryResolver; + if ($memoryResolver() > $this->memoryLimit) { + $this->stop(); + if (null !== $this->logger) { + $this->logger->info('Receiver stopped due to memory limit of {limit} exceeded', array('limit' => $this->memoryLimit)); + } + } + }); + } + + public function stop(): void + { + $this->decoratedReceiver->stop(); + } +} diff --git a/src/Symfony/Component/Messenger/Transport/Enhancers/MaximumCountReceiver.php b/src/Symfony/Component/Messenger/Transport/Enhancers/StopWhenMessageCountIsExceededReceiver.php similarity index 66% rename from src/Symfony/Component/Messenger/Transport/Enhancers/MaximumCountReceiver.php rename to src/Symfony/Component/Messenger/Transport/Enhancers/StopWhenMessageCountIsExceededReceiver.php index 37fa2b649a..dc61466bbf 100644 --- a/src/Symfony/Component/Messenger/Transport/Enhancers/MaximumCountReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/Enhancers/StopWhenMessageCountIsExceededReceiver.php @@ -11,20 +11,23 @@ namespace Symfony\Component\Messenger\Transport\Enhancers; +use Psr\Log\LoggerInterface; use Symfony\Component\Messenger\Transport\ReceiverInterface; /** * @author Samuel Roze */ -class MaximumCountReceiver implements ReceiverInterface +class StopWhenMessageCountIsExceededReceiver implements ReceiverInterface { private $decoratedReceiver; private $maximumNumberOfMessages; + private $logger; - public function __construct(ReceiverInterface $decoratedReceiver, int $maximumNumberOfMessages) + public function __construct(ReceiverInterface $decoratedReceiver, int $maximumNumberOfMessages, LoggerInterface $logger = null) { $this->decoratedReceiver = $decoratedReceiver; $this->maximumNumberOfMessages = $maximumNumberOfMessages; + $this->logger = $logger; } public function receive(callable $handler): void @@ -34,8 +37,11 @@ class MaximumCountReceiver implements ReceiverInterface $this->decoratedReceiver->receive(function ($message) use ($handler, &$receivedMessages) { $handler($message); - if (++$receivedMessages >= $this->maximumNumberOfMessages) { + if (null !== $message && ++$receivedMessages >= $this->maximumNumberOfMessages) { $this->stop(); + if (null !== $this->logger) { + $this->logger->info('Receiver stopped due to maximum count of {count} exceeded', array('count' => $this->maximumNumberOfMessages)); + } } }); }