feature #26975 [Messenger] Add a memory limit option for ConsumeMessagesCommand
(sdelicata)
This PR was merged into the 4.1-dev branch.
Discussion
----------
[Messenger] Add a memory limit option for `ConsumeMessagesCommand`
| Q | A
| ------------- | ---
| Branch? | master for features
| Bug fix? | no
| New feature? | yes
| BC breaks? | no
| Deprecations? | no
| Tests pass? | yes
| License | MIT
New feature to add a memory limit option to the ConsumeMessagesCommand.
```
bin/console messenger:consume-messages <receiver-name> -m 128
```
Commits
-------
08f98cfaf1
[Messenger] Add a memory limit option for `ConsumeMessagesCommand`
This commit is contained in:
commit
ee0967f0ea
@ -72,6 +72,7 @@
|
|||||||
<service id="console.command.messenger_consume_messages" class="Symfony\Component\Messenger\Command\ConsumeMessagesCommand">
|
<service id="console.command.messenger_consume_messages" class="Symfony\Component\Messenger\Command\ConsumeMessagesCommand">
|
||||||
<argument type="service" id="message_bus" />
|
<argument type="service" id="message_bus" />
|
||||||
<argument type="service" id="messenger.receiver_locator" />
|
<argument type="service" id="messenger.receiver_locator" />
|
||||||
|
<argument type="service" id="logger" on-invalid="null" />
|
||||||
|
|
||||||
<tag name="console.command" command="messenger:consume-messages" />
|
<tag name="console.command" command="messenger:consume-messages" />
|
||||||
</service>
|
</service>
|
||||||
|
@ -12,13 +12,15 @@
|
|||||||
namespace Symfony\Component\Messenger\Command;
|
namespace Symfony\Component\Messenger\Command;
|
||||||
|
|
||||||
use Psr\Container\ContainerInterface;
|
use Psr\Container\ContainerInterface;
|
||||||
|
use Psr\Log\LoggerInterface;
|
||||||
use Symfony\Component\Console\Command\Command;
|
use Symfony\Component\Console\Command\Command;
|
||||||
use Symfony\Component\Console\Input\InputArgument;
|
use Symfony\Component\Console\Input\InputArgument;
|
||||||
use Symfony\Component\Console\Input\InputInterface;
|
use Symfony\Component\Console\Input\InputInterface;
|
||||||
use Symfony\Component\Console\Input\InputOption;
|
use Symfony\Component\Console\Input\InputOption;
|
||||||
use Symfony\Component\Console\Output\OutputInterface;
|
use Symfony\Component\Console\Output\OutputInterface;
|
||||||
use Symfony\Component\Messenger\MessageBusInterface;
|
use Symfony\Component\Messenger\MessageBusInterface;
|
||||||
use Symfony\Component\Messenger\Transport\Enhancers\MaximumCountReceiver;
|
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
|
||||||
|
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver;
|
||||||
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
||||||
use Symfony\Component\Messenger\Worker;
|
use Symfony\Component\Messenger\Worker;
|
||||||
|
|
||||||
@ -33,24 +35,27 @@ class ConsumeMessagesCommand extends Command
|
|||||||
|
|
||||||
private $bus;
|
private $bus;
|
||||||
private $receiverLocator;
|
private $receiverLocator;
|
||||||
|
private $logger;
|
||||||
|
|
||||||
public function __construct(MessageBusInterface $bus, ContainerInterface $receiverLocator)
|
public function __construct(MessageBusInterface $bus, ContainerInterface $receiverLocator, LoggerInterface $logger = null)
|
||||||
{
|
{
|
||||||
parent::__construct();
|
parent::__construct();
|
||||||
|
|
||||||
$this->bus = $bus;
|
$this->bus = $bus;
|
||||||
$this->receiverLocator = $receiverLocator;
|
$this->receiverLocator = $receiverLocator;
|
||||||
|
$this->logger = $logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@inheritdoc}
|
* {@inheritdoc}
|
||||||
*/
|
*/
|
||||||
protected function configure()
|
protected function configure(): void
|
||||||
{
|
{
|
||||||
$this
|
$this
|
||||||
->setDefinition(array(
|
->setDefinition(array(
|
||||||
new InputArgument('receiver', InputArgument::REQUIRED, 'Name of the receiver'),
|
new InputArgument('receiver', InputArgument::REQUIRED, 'Name of the receiver'),
|
||||||
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
|
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
|
||||||
|
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
|
||||||
))
|
))
|
||||||
->setDescription('Consumes messages')
|
->setDescription('Consumes messages')
|
||||||
->setHelp(<<<'EOF'
|
->setHelp(<<<'EOF'
|
||||||
@ -61,6 +66,10 @@ The <info>%command.name%</info> command consumes messages and dispatches them to
|
|||||||
Use the --limit option to limit the number of messages received:
|
Use the --limit option to limit the number of messages received:
|
||||||
|
|
||||||
<info>php %command.full_name% <receiver-name> --limit=10</info>
|
<info>php %command.full_name% <receiver-name> --limit=10</info>
|
||||||
|
|
||||||
|
Use the --memory-limit option to stop the worker if it exceeds a given memory usage limit. You can use shorthand byte values [K, M or G]:
|
||||||
|
|
||||||
|
<info>php %command.full_name% <receiver-name> --memory-limit=128M</info>
|
||||||
EOF
|
EOF
|
||||||
)
|
)
|
||||||
;
|
;
|
||||||
@ -69,7 +78,7 @@ EOF
|
|||||||
/**
|
/**
|
||||||
* {@inheritdoc}
|
* {@inheritdoc}
|
||||||
*/
|
*/
|
||||||
protected function execute(InputInterface $input, OutputInterface $output)
|
protected function execute(InputInterface $input, OutputInterface $output): void
|
||||||
{
|
{
|
||||||
if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
|
if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
|
||||||
throw new \RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
|
throw new \RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
|
||||||
@ -80,10 +89,39 @@ EOF
|
|||||||
}
|
}
|
||||||
|
|
||||||
if ($limit = $input->getOption('limit')) {
|
if ($limit = $input->getOption('limit')) {
|
||||||
$receiver = new MaximumCountReceiver($receiver, $limit);
|
$receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger);
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($memoryLimit = $input->getOption('memory-limit')) {
|
||||||
|
$receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
|
||||||
}
|
}
|
||||||
|
|
||||||
$worker = new Worker($receiver, $this->bus);
|
$worker = new Worker($receiver, $this->bus);
|
||||||
$worker->run();
|
$worker->run();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private function convertToBytes(string $memoryLimit): int
|
||||||
|
{
|
||||||
|
$memoryLimit = strtolower($memoryLimit);
|
||||||
|
$max = strtolower(ltrim($memoryLimit, '+'));
|
||||||
|
if (0 === strpos($max, '0x')) {
|
||||||
|
$max = intval($max, 16);
|
||||||
|
} elseif (0 === strpos($max, '0')) {
|
||||||
|
$max = intval($max, 8);
|
||||||
|
} else {
|
||||||
|
$max = (int) $max;
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (substr($memoryLimit, -1)) {
|
||||||
|
case 't': $max *= 1024;
|
||||||
|
// no break
|
||||||
|
case 'g': $max *= 1024;
|
||||||
|
// no break
|
||||||
|
case 'm': $max *= 1024;
|
||||||
|
// no break
|
||||||
|
case 'k': $max *= 1024;
|
||||||
|
}
|
||||||
|
|
||||||
|
return $max;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -16,7 +16,6 @@ use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpReceiver;
|
|||||||
use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpSender;
|
use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpSender;
|
||||||
use Symfony\Component\Messenger\Adapter\AmqpExt\Connection;
|
use Symfony\Component\Messenger\Adapter\AmqpExt\Connection;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
use Symfony\Component\Messenger\Transport\Enhancers\MaximumCountReceiver;
|
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||||
use Symfony\Component\Process\PhpProcess;
|
use Symfony\Component\Process\PhpProcess;
|
||||||
use Symfony\Component\Process\Process;
|
use Symfony\Component\Process\Process;
|
||||||
@ -58,7 +57,7 @@ class AmqpExtIntegrationTest extends TestCase
|
|||||||
$receiver->receive(function ($message) use ($receiver, &$receivedMessages, $firstMessage, $secondMessage) {
|
$receiver->receive(function ($message) use ($receiver, &$receivedMessages, $firstMessage, $secondMessage) {
|
||||||
$this->assertEquals(0 == $receivedMessages ? $firstMessage : $secondMessage, $message);
|
$this->assertEquals(0 == $receivedMessages ? $firstMessage : $secondMessage, $message);
|
||||||
|
|
||||||
if (2 == ++$receivedMessages) {
|
if (2 === ++$receivedMessages) {
|
||||||
$receiver->stop();
|
$receiver->stop();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -116,9 +115,15 @@ class AmqpExtIntegrationTest extends TestCase
|
|||||||
$connection->queue()->purge();
|
$connection->queue()->purge();
|
||||||
|
|
||||||
$sender = new AmqpSender($serializer, $connection);
|
$sender = new AmqpSender($serializer, $connection);
|
||||||
$receiver = new MaximumCountReceiver(new AmqpReceiver($serializer, $connection), 2);
|
$receiver = new AmqpReceiver($serializer, $connection);
|
||||||
$receiver->receive(function ($message) {
|
|
||||||
|
$receivedMessages = 0;
|
||||||
|
$receiver->receive(function ($message) use ($receiver, &$receivedMessages) {
|
||||||
$this->assertNull($message);
|
$this->assertNull($message);
|
||||||
|
|
||||||
|
if (2 === ++$receivedMessages) {
|
||||||
|
$receiver->stop();
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1,25 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Tests\Fixtures;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
||||||
|
|
||||||
|
class CallbackReceiver implements ReceiverInterface
|
||||||
|
{
|
||||||
|
private $callable;
|
||||||
|
|
||||||
|
public function __construct(callable $callable)
|
||||||
|
{
|
||||||
|
$this->callable = $callable;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function receive(callable $handler): void
|
||||||
|
{
|
||||||
|
$callable = $this->callable;
|
||||||
|
$callable($handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function stop(): void
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,83 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Tests\Transport\Enhancers;
|
||||||
|
|
||||||
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Psr\Log\LoggerInterface;
|
||||||
|
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
|
||||||
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
|
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver;
|
||||||
|
|
||||||
|
class StopWhenMemoryUsageIsExceededReceiverTest extends TestCase
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* @dataProvider memoryProvider
|
||||||
|
*/
|
||||||
|
public function testReceiverStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop)
|
||||||
|
{
|
||||||
|
$callable = function ($handler) {
|
||||||
|
$handler(new DummyMessage('API'));
|
||||||
|
};
|
||||||
|
|
||||||
|
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
|
||||||
|
->setConstructorArgs(array($callable))
|
||||||
|
->enableProxyingToOriginalMethods()
|
||||||
|
->getMock();
|
||||||
|
|
||||||
|
$decoratedReceiver->expects($this->once())->method('receive');
|
||||||
|
if (true === $shouldStop) {
|
||||||
|
$decoratedReceiver->expects($this->once())->method('stop');
|
||||||
|
} else {
|
||||||
|
$decoratedReceiver->expects($this->never())->method('stop');
|
||||||
|
}
|
||||||
|
|
||||||
|
$memoryResolver = function () use ($memoryUsage) {
|
||||||
|
return $memoryUsage;
|
||||||
|
};
|
||||||
|
|
||||||
|
$memoryLimitReceiver = new StopWhenMemoryUsageIsExceededReceiver($decoratedReceiver, $memoryLimit, null, $memoryResolver);
|
||||||
|
$memoryLimitReceiver->receive(function () {});
|
||||||
|
}
|
||||||
|
|
||||||
|
public function memoryProvider()
|
||||||
|
{
|
||||||
|
yield array(2048, 1024, true);
|
||||||
|
yield array(1024, 1024, false);
|
||||||
|
yield array(1024, 2048, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testReceiverLogsMemoryExceededWhenLoggerIsGiven()
|
||||||
|
{
|
||||||
|
$callable = function ($handler) {
|
||||||
|
$handler(new DummyMessage('API'));
|
||||||
|
};
|
||||||
|
|
||||||
|
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
|
||||||
|
->setConstructorArgs(array($callable))
|
||||||
|
->enableProxyingToOriginalMethods()
|
||||||
|
->getMock();
|
||||||
|
|
||||||
|
$decoratedReceiver->expects($this->once())->method('receive');
|
||||||
|
$decoratedReceiver->expects($this->once())->method('stop');
|
||||||
|
|
||||||
|
$logger = $this->createMock(LoggerInterface::class);
|
||||||
|
$logger->expects($this->once())->method('info')
|
||||||
|
->with('Receiver stopped due to memory limit of {limit} exceeded', array('limit' => 64 * 1024 * 1024));
|
||||||
|
|
||||||
|
$memoryResolver = function () {
|
||||||
|
return 70 * 1024 * 1024;
|
||||||
|
};
|
||||||
|
|
||||||
|
$memoryLimitReceiver = new StopWhenMemoryUsageIsExceededReceiver($decoratedReceiver, 64 * 1024 * 1024, $logger, $memoryResolver);
|
||||||
|
$memoryLimitReceiver->receive(function () {});
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,102 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Tests\Transport\Enhancers;
|
||||||
|
|
||||||
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Psr\Log\LoggerInterface;
|
||||||
|
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
|
||||||
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
|
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
|
||||||
|
|
||||||
|
class StopWhenMessageCountIsExceededReceiverTest extends TestCase
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* @dataProvider countProvider
|
||||||
|
*/
|
||||||
|
public function testReceiverStopsWhenMaximumCountExceeded($max, $shouldStop)
|
||||||
|
{
|
||||||
|
$callable = function ($handler) {
|
||||||
|
$handler(new DummyMessage('First message'));
|
||||||
|
$handler(new DummyMessage('Second message'));
|
||||||
|
$handler(new DummyMessage('Third message'));
|
||||||
|
};
|
||||||
|
|
||||||
|
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
|
||||||
|
->setConstructorArgs(array($callable))
|
||||||
|
->enableProxyingToOriginalMethods()
|
||||||
|
->getMock();
|
||||||
|
|
||||||
|
$decoratedReceiver->expects($this->once())->method('receive');
|
||||||
|
if (true === $shouldStop) {
|
||||||
|
$decoratedReceiver->expects($this->any())->method('stop');
|
||||||
|
} else {
|
||||||
|
$decoratedReceiver->expects($this->never())->method('stop');
|
||||||
|
}
|
||||||
|
|
||||||
|
$maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, $max);
|
||||||
|
$maximumCountReceiver->receive(function () {});
|
||||||
|
}
|
||||||
|
|
||||||
|
public function countProvider()
|
||||||
|
{
|
||||||
|
yield array(1, true);
|
||||||
|
yield array(2, true);
|
||||||
|
yield array(3, true);
|
||||||
|
yield array(4, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testReceiverDoesntIncreaseItsCounterWhenReceiveNullMessage()
|
||||||
|
{
|
||||||
|
$callable = function ($handler) {
|
||||||
|
$handler(null);
|
||||||
|
$handler(null);
|
||||||
|
$handler(null);
|
||||||
|
$handler(null);
|
||||||
|
};
|
||||||
|
|
||||||
|
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
|
||||||
|
->setConstructorArgs(array($callable))
|
||||||
|
->enableProxyingToOriginalMethods()
|
||||||
|
->getMock();
|
||||||
|
|
||||||
|
$decoratedReceiver->expects($this->once())->method('receive');
|
||||||
|
$decoratedReceiver->expects($this->never())->method('stop');
|
||||||
|
|
||||||
|
$maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, 1);
|
||||||
|
$maximumCountReceiver->receive(function () {});
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testReceiverLogsMaximumCountExceededWhenLoggerIsGiven()
|
||||||
|
{
|
||||||
|
$callable = function ($handler) {
|
||||||
|
$handler(new DummyMessage('First message'));
|
||||||
|
};
|
||||||
|
|
||||||
|
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
|
||||||
|
->setConstructorArgs(array($callable))
|
||||||
|
->enableProxyingToOriginalMethods()
|
||||||
|
->getMock();
|
||||||
|
|
||||||
|
$decoratedReceiver->expects($this->once())->method('receive');
|
||||||
|
$decoratedReceiver->expects($this->once())->method('stop');
|
||||||
|
|
||||||
|
$logger = $this->createMock(LoggerInterface::class);
|
||||||
|
$logger->expects($this->once())->method('info')
|
||||||
|
->with(
|
||||||
|
$this->equalTo('Receiver stopped due to maximum count of {count} exceeded'),
|
||||||
|
$this->equalTo(array('count' => 1))
|
||||||
|
);
|
||||||
|
|
||||||
|
$maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, 1, $logger);
|
||||||
|
$maximumCountReceiver->receive(function () {});
|
||||||
|
}
|
||||||
|
}
|
@ -14,8 +14,8 @@ namespace Symfony\Component\Messenger\Tests;
|
|||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
|
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
|
||||||
use Symfony\Component\Messenger\MessageBusInterface;
|
use Symfony\Component\Messenger\MessageBusInterface;
|
||||||
|
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
|
||||||
use Symfony\Component\Messenger\Worker;
|
use Symfony\Component\Messenger\Worker;
|
||||||
|
|
||||||
class WorkerTest extends TestCase
|
class WorkerTest extends TestCase
|
||||||
@ -83,23 +83,3 @@ class WorkerTest extends TestCase
|
|||||||
$worker->run();
|
$worker->run();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class CallbackReceiver implements ReceiverInterface
|
|
||||||
{
|
|
||||||
private $callable;
|
|
||||||
|
|
||||||
public function __construct(callable $callable)
|
|
||||||
{
|
|
||||||
$this->callable = $callable;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function receive(callable $handler): void
|
|
||||||
{
|
|
||||||
$callable = $this->callable;
|
|
||||||
$callable($handler);
|
|
||||||
}
|
|
||||||
|
|
||||||
public function stop(): void
|
|
||||||
{
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -0,0 +1,56 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Transport\Enhancers;
|
||||||
|
|
||||||
|
use Psr\Log\LoggerInterface;
|
||||||
|
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Simon Delicata <simon.delicata@free.fr>
|
||||||
|
*/
|
||||||
|
class StopWhenMemoryUsageIsExceededReceiver implements ReceiverInterface
|
||||||
|
{
|
||||||
|
private $decoratedReceiver;
|
||||||
|
private $memoryLimit;
|
||||||
|
private $logger;
|
||||||
|
private $memoryResolver;
|
||||||
|
|
||||||
|
public function __construct(ReceiverInterface $decoratedReceiver, int $memoryLimit, LoggerInterface $logger = null, callable $memoryResolver = null)
|
||||||
|
{
|
||||||
|
$this->decoratedReceiver = $decoratedReceiver;
|
||||||
|
$this->memoryLimit = $memoryLimit;
|
||||||
|
$this->logger = $logger;
|
||||||
|
$this->memoryResolver = $memoryResolver ?: function () {
|
||||||
|
return \memory_get_usage();
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
public function receive(callable $handler): void
|
||||||
|
{
|
||||||
|
$this->decoratedReceiver->receive(function ($message) use ($handler) {
|
||||||
|
$handler($message);
|
||||||
|
|
||||||
|
$memoryResolver = $this->memoryResolver;
|
||||||
|
if ($memoryResolver() > $this->memoryLimit) {
|
||||||
|
$this->stop();
|
||||||
|
if (null !== $this->logger) {
|
||||||
|
$this->logger->info('Receiver stopped due to memory limit of {limit} exceeded', array('limit' => $this->memoryLimit));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public function stop(): void
|
||||||
|
{
|
||||||
|
$this->decoratedReceiver->stop();
|
||||||
|
}
|
||||||
|
}
|
@ -11,20 +11,23 @@
|
|||||||
|
|
||||||
namespace Symfony\Component\Messenger\Transport\Enhancers;
|
namespace Symfony\Component\Messenger\Transport\Enhancers;
|
||||||
|
|
||||||
|
use Psr\Log\LoggerInterface;
|
||||||
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||||
*/
|
*/
|
||||||
class MaximumCountReceiver implements ReceiverInterface
|
class StopWhenMessageCountIsExceededReceiver implements ReceiverInterface
|
||||||
{
|
{
|
||||||
private $decoratedReceiver;
|
private $decoratedReceiver;
|
||||||
private $maximumNumberOfMessages;
|
private $maximumNumberOfMessages;
|
||||||
|
private $logger;
|
||||||
|
|
||||||
public function __construct(ReceiverInterface $decoratedReceiver, int $maximumNumberOfMessages)
|
public function __construct(ReceiverInterface $decoratedReceiver, int $maximumNumberOfMessages, LoggerInterface $logger = null)
|
||||||
{
|
{
|
||||||
$this->decoratedReceiver = $decoratedReceiver;
|
$this->decoratedReceiver = $decoratedReceiver;
|
||||||
$this->maximumNumberOfMessages = $maximumNumberOfMessages;
|
$this->maximumNumberOfMessages = $maximumNumberOfMessages;
|
||||||
|
$this->logger = $logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function receive(callable $handler): void
|
public function receive(callable $handler): void
|
||||||
@ -34,8 +37,11 @@ class MaximumCountReceiver implements ReceiverInterface
|
|||||||
$this->decoratedReceiver->receive(function ($message) use ($handler, &$receivedMessages) {
|
$this->decoratedReceiver->receive(function ($message) use ($handler, &$receivedMessages) {
|
||||||
$handler($message);
|
$handler($message);
|
||||||
|
|
||||||
if (++$receivedMessages >= $this->maximumNumberOfMessages) {
|
if (null !== $message && ++$receivedMessages >= $this->maximumNumberOfMessages) {
|
||||||
$this->stop();
|
$this->stop();
|
||||||
|
if (null !== $this->logger) {
|
||||||
|
$this->logger->info('Receiver stopped due to maximum count of {count} exceeded', array('count' => $this->maximumNumberOfMessages));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
Reference in New Issue
Block a user