Add a new time limit receiver
This commit is contained in:
parent
47da23c7cb
commit
5536ee1b3e
@ -19,8 +19,9 @@ use Symfony\Component\Console\Input\InputInterface;
|
|||||||
use Symfony\Component\Console\Input\InputOption;
|
use Symfony\Component\Console\Input\InputOption;
|
||||||
use Symfony\Component\Console\Output\OutputInterface;
|
use Symfony\Component\Console\Output\OutputInterface;
|
||||||
use Symfony\Component\Messenger\MessageBusInterface;
|
use Symfony\Component\Messenger\MessageBusInterface;
|
||||||
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
|
|
||||||
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver;
|
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver;
|
||||||
|
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
|
||||||
|
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenTimeLimitIsReachedReceiver;
|
||||||
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
||||||
use Symfony\Component\Messenger\Worker;
|
use Symfony\Component\Messenger\Worker;
|
||||||
|
|
||||||
@ -56,6 +57,7 @@ class ConsumeMessagesCommand extends Command
|
|||||||
new InputArgument('receiver', InputArgument::REQUIRED, 'Name of the receiver'),
|
new InputArgument('receiver', InputArgument::REQUIRED, 'Name of the receiver'),
|
||||||
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
|
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
|
||||||
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
|
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
|
||||||
|
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
|
||||||
))
|
))
|
||||||
->setDescription('Consumes messages')
|
->setDescription('Consumes messages')
|
||||||
->setHelp(<<<'EOF'
|
->setHelp(<<<'EOF'
|
||||||
@ -70,6 +72,10 @@ Use the --limit option to limit the number of messages received:
|
|||||||
Use the --memory-limit option to stop the worker if it exceeds a given memory usage limit. You can use shorthand byte values [K, M or G]:
|
Use the --memory-limit option to stop the worker if it exceeds a given memory usage limit. You can use shorthand byte values [K, M or G]:
|
||||||
|
|
||||||
<info>php %command.full_name% <receiver-name> --memory-limit=128M</info>
|
<info>php %command.full_name% <receiver-name> --memory-limit=128M</info>
|
||||||
|
|
||||||
|
Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached:
|
||||||
|
|
||||||
|
<info>php %command.full_name% <receiver-name> --time-limit=3600</info>
|
||||||
EOF
|
EOF
|
||||||
)
|
)
|
||||||
;
|
;
|
||||||
@ -96,6 +102,10 @@ EOF
|
|||||||
$receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
|
$receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ($timeLimit = $input->getOption('time-limit')) {
|
||||||
|
$receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger);
|
||||||
|
}
|
||||||
|
|
||||||
$worker = new Worker($receiver, $this->bus);
|
$worker = new Worker($receiver, $this->bus);
|
||||||
$worker->run();
|
$worker->run();
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,48 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Tests\Transport\Enhancers;
|
||||||
|
|
||||||
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Psr\Log\LoggerInterface;
|
||||||
|
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
|
||||||
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
|
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenTimeLimitIsReachedReceiver;
|
||||||
|
|
||||||
|
class StopWhenTimeLimitIsReachedReceiverTest extends TestCase
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* @group time-sensitive
|
||||||
|
*/
|
||||||
|
public function testReceiverStopsWhenTimeLimitIsReached()
|
||||||
|
{
|
||||||
|
$callable = function ($handler) {
|
||||||
|
$handler(new DummyMessage('API'));
|
||||||
|
};
|
||||||
|
|
||||||
|
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
|
||||||
|
->setConstructorArgs(array($callable))
|
||||||
|
->enableProxyingToOriginalMethods()
|
||||||
|
->getMock();
|
||||||
|
|
||||||
|
$decoratedReceiver->expects($this->once())->method('receive');
|
||||||
|
$decoratedReceiver->expects($this->once())->method('stop');
|
||||||
|
|
||||||
|
$logger = $this->createMock(LoggerInterface::class);
|
||||||
|
$logger->expects($this->once())->method('info')
|
||||||
|
->with('Receiver stopped due to time limit of {timeLimit}s reached', array('timeLimit' => 1));
|
||||||
|
|
||||||
|
$timeoutReceiver = new StopWhenTimeLimitIsReachedReceiver($decoratedReceiver, 1, $logger);
|
||||||
|
$timeoutReceiver->receive(function () {
|
||||||
|
sleep(2);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,54 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Transport\Enhancers;
|
||||||
|
|
||||||
|
use Psr\Log\LoggerInterface;
|
||||||
|
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Simon Delicata <simon.delicata@free.fr>
|
||||||
|
*/
|
||||||
|
class StopWhenTimeLimitIsReachedReceiver implements ReceiverInterface
|
||||||
|
{
|
||||||
|
private $decoratedReceiver;
|
||||||
|
private $timeLimitInSeconds;
|
||||||
|
private $logger;
|
||||||
|
|
||||||
|
public function __construct(ReceiverInterface $decoratedReceiver, int $timeLimitInSeconds, LoggerInterface $logger = null)
|
||||||
|
{
|
||||||
|
$this->decoratedReceiver = $decoratedReceiver;
|
||||||
|
$this->timeLimitInSeconds = $timeLimitInSeconds;
|
||||||
|
$this->logger = $logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function receive(callable $handler): void
|
||||||
|
{
|
||||||
|
$startTime = time();
|
||||||
|
$endTime = $startTime + $this->timeLimitInSeconds;
|
||||||
|
|
||||||
|
$this->decoratedReceiver->receive(function ($message) use ($handler, $endTime) {
|
||||||
|
$handler($message);
|
||||||
|
|
||||||
|
if ($endTime < time()) {
|
||||||
|
$this->stop();
|
||||||
|
if (null !== $this->logger) {
|
||||||
|
$this->logger->info('Receiver stopped due to time limit of {timeLimit}s reached', array('timeLimit' => $this->timeLimitInSeconds));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public function stop(): void
|
||||||
|
{
|
||||||
|
$this->decoratedReceiver->stop();
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user