From 5536ee1b3e85e3f6b5b778860a291d5f8995cc06 Mon Sep 17 00:00:00 2001 From: Simon DELICATA Date: Wed, 2 May 2018 23:18:35 +0200 Subject: [PATCH] Add a new time limit receiver --- .../Command/ConsumeMessagesCommand.php | 12 ++++- ...StopWhenTimeLimitIsReachedReceiverTest.php | 48 +++++++++++++++++ .../StopWhenTimeLimitIsReachedReceiver.php | 54 +++++++++++++++++++ 3 files changed, 113 insertions(+), 1 deletion(-) create mode 100644 src/Symfony/Component/Messenger/Tests/Transport/Enhancers/StopWhenTimeLimitIsReachedReceiverTest.php create mode 100644 src/Symfony/Component/Messenger/Transport/Enhancers/StopWhenTimeLimitIsReachedReceiver.php diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index be976f63c0..94684aacfb 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -19,8 +19,9 @@ use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Messenger\MessageBusInterface; -use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver; use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver; +use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver; +use Symfony\Component\Messenger\Transport\Enhancers\StopWhenTimeLimitIsReachedReceiver; use Symfony\Component\Messenger\Transport\ReceiverInterface; use Symfony\Component\Messenger\Worker; @@ -56,6 +57,7 @@ class ConsumeMessagesCommand extends Command new InputArgument('receiver', InputArgument::REQUIRED, 'Name of the receiver'), new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'), new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'), + new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'), )) ->setDescription('Consumes messages') ->setHelp(<<<'EOF' @@ -70,6 +72,10 @@ Use the --limit option to limit the number of messages received: Use the --memory-limit option to stop the worker if it exceeds a given memory usage limit. You can use shorthand byte values [K, M or G]: php %command.full_name% --memory-limit=128M + +Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached: + + php %command.full_name% --time-limit=3600 EOF ) ; @@ -96,6 +102,10 @@ EOF $receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger); } + if ($timeLimit = $input->getOption('time-limit')) { + $receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger); + } + $worker = new Worker($receiver, $this->bus); $worker->run(); } diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Enhancers/StopWhenTimeLimitIsReachedReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Enhancers/StopWhenTimeLimitIsReachedReceiverTest.php new file mode 100644 index 0000000000..51119b36f5 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/Enhancers/StopWhenTimeLimitIsReachedReceiverTest.php @@ -0,0 +1,48 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\Enhancers; + +use PHPUnit\Framework\TestCase; +use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\Enhancers\StopWhenTimeLimitIsReachedReceiver; + +class StopWhenTimeLimitIsReachedReceiverTest extends TestCase +{ + /** + * @group time-sensitive + */ + public function testReceiverStopsWhenTimeLimitIsReached() + { + $callable = function ($handler) { + $handler(new DummyMessage('API')); + }; + + $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class) + ->setConstructorArgs(array($callable)) + ->enableProxyingToOriginalMethods() + ->getMock(); + + $decoratedReceiver->expects($this->once())->method('receive'); + $decoratedReceiver->expects($this->once())->method('stop'); + + $logger = $this->createMock(LoggerInterface::class); + $logger->expects($this->once())->method('info') + ->with('Receiver stopped due to time limit of {timeLimit}s reached', array('timeLimit' => 1)); + + $timeoutReceiver = new StopWhenTimeLimitIsReachedReceiver($decoratedReceiver, 1, $logger); + $timeoutReceiver->receive(function () { + sleep(2); + }); + } +} diff --git a/src/Symfony/Component/Messenger/Transport/Enhancers/StopWhenTimeLimitIsReachedReceiver.php b/src/Symfony/Component/Messenger/Transport/Enhancers/StopWhenTimeLimitIsReachedReceiver.php new file mode 100644 index 0000000000..b88a0cbd04 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Enhancers/StopWhenTimeLimitIsReachedReceiver.php @@ -0,0 +1,54 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Enhancers; + +use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Transport\ReceiverInterface; + +/** + * @author Simon Delicata + */ +class StopWhenTimeLimitIsReachedReceiver implements ReceiverInterface +{ + private $decoratedReceiver; + private $timeLimitInSeconds; + private $logger; + + public function __construct(ReceiverInterface $decoratedReceiver, int $timeLimitInSeconds, LoggerInterface $logger = null) + { + $this->decoratedReceiver = $decoratedReceiver; + $this->timeLimitInSeconds = $timeLimitInSeconds; + $this->logger = $logger; + } + + public function receive(callable $handler): void + { + $startTime = time(); + $endTime = $startTime + $this->timeLimitInSeconds; + + $this->decoratedReceiver->receive(function ($message) use ($handler, $endTime) { + $handler($message); + + if ($endTime < time()) { + $this->stop(); + if (null !== $this->logger) { + $this->logger->info('Receiver stopped due to time limit of {timeLimit}s reached', array('timeLimit' => $this->timeLimitInSeconds)); + } + } + }); + } + + public function stop(): void + { + $this->decoratedReceiver->stop(); + } +}