diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
index be976f63c0..94684aacfb 100644
--- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
@@ -19,8 +19,9 @@ use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Messenger\MessageBusInterface;
-use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver;
+use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
+use Symfony\Component\Messenger\Transport\Enhancers\StopWhenTimeLimitIsReachedReceiver;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Worker;
@@ -56,6 +57,7 @@ class ConsumeMessagesCommand extends Command
new InputArgument('receiver', InputArgument::REQUIRED, 'Name of the receiver'),
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
+ new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
))
->setDescription('Consumes messages')
->setHelp(<<<'EOF'
@@ -70,6 +72,10 @@ Use the --limit option to limit the number of messages received:
Use the --memory-limit option to stop the worker if it exceeds a given memory usage limit. You can use shorthand byte values [K, M or G]:
php %command.full_name% --memory-limit=128M
+
+Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached:
+
+ php %command.full_name% --time-limit=3600
EOF
)
;
@@ -96,6 +102,10 @@ EOF
$receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
}
+ if ($timeLimit = $input->getOption('time-limit')) {
+ $receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger);
+ }
+
$worker = new Worker($receiver, $this->bus);
$worker->run();
}
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Enhancers/StopWhenTimeLimitIsReachedReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Enhancers/StopWhenTimeLimitIsReachedReceiverTest.php
new file mode 100644
index 0000000000..51119b36f5
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/Transport/Enhancers/StopWhenTimeLimitIsReachedReceiverTest.php
@@ -0,0 +1,48 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\Transport\Enhancers;
+
+use PHPUnit\Framework\TestCase;
+use Psr\Log\LoggerInterface;
+use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
+use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
+use Symfony\Component\Messenger\Transport\Enhancers\StopWhenTimeLimitIsReachedReceiver;
+
+class StopWhenTimeLimitIsReachedReceiverTest extends TestCase
+{
+ /**
+ * @group time-sensitive
+ */
+ public function testReceiverStopsWhenTimeLimitIsReached()
+ {
+ $callable = function ($handler) {
+ $handler(new DummyMessage('API'));
+ };
+
+ $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
+ ->setConstructorArgs(array($callable))
+ ->enableProxyingToOriginalMethods()
+ ->getMock();
+
+ $decoratedReceiver->expects($this->once())->method('receive');
+ $decoratedReceiver->expects($this->once())->method('stop');
+
+ $logger = $this->createMock(LoggerInterface::class);
+ $logger->expects($this->once())->method('info')
+ ->with('Receiver stopped due to time limit of {timeLimit}s reached', array('timeLimit' => 1));
+
+ $timeoutReceiver = new StopWhenTimeLimitIsReachedReceiver($decoratedReceiver, 1, $logger);
+ $timeoutReceiver->receive(function () {
+ sleep(2);
+ });
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Transport/Enhancers/StopWhenTimeLimitIsReachedReceiver.php b/src/Symfony/Component/Messenger/Transport/Enhancers/StopWhenTimeLimitIsReachedReceiver.php
new file mode 100644
index 0000000000..b88a0cbd04
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Transport/Enhancers/StopWhenTimeLimitIsReachedReceiver.php
@@ -0,0 +1,54 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Transport\Enhancers;
+
+use Psr\Log\LoggerInterface;
+use Symfony\Component\Messenger\Transport\ReceiverInterface;
+
+/**
+ * @author Simon Delicata
+ */
+class StopWhenTimeLimitIsReachedReceiver implements ReceiverInterface
+{
+ private $decoratedReceiver;
+ private $timeLimitInSeconds;
+ private $logger;
+
+ public function __construct(ReceiverInterface $decoratedReceiver, int $timeLimitInSeconds, LoggerInterface $logger = null)
+ {
+ $this->decoratedReceiver = $decoratedReceiver;
+ $this->timeLimitInSeconds = $timeLimitInSeconds;
+ $this->logger = $logger;
+ }
+
+ public function receive(callable $handler): void
+ {
+ $startTime = time();
+ $endTime = $startTime + $this->timeLimitInSeconds;
+
+ $this->decoratedReceiver->receive(function ($message) use ($handler, $endTime) {
+ $handler($message);
+
+ if ($endTime < time()) {
+ $this->stop();
+ if (null !== $this->logger) {
+ $this->logger->info('Receiver stopped due to time limit of {timeLimit}s reached', array('timeLimit' => $this->timeLimitInSeconds));
+ }
+ }
+ });
+ }
+
+ public function stop(): void
+ {
+ $this->decoratedReceiver->stop();
+ }
+}