feature #27130 [Messenger] Add a new time limit receiver (sdelicata)

This PR was merged into the 4.1-dev branch.

Discussion
----------

[Messenger] Add a new time limit receiver

| Q             | A
| ------------- | ---
| Branch?       | master for features
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no     <!-- see https://symfony.com/bc -->
| Deprecations? | no
| Tests pass?   | yes
| License       | MIT

New feature to add a time limit option to the ConsumeMessagesCommand.
```
bin/console messenger:consume-messages <receiver-name> -t 3600
```

Commits
-------

5536ee1b3e Add a new time limit receiver
This commit is contained in:
Samuel ROZE 2018-05-04 13:55:43 +01:00
commit d9c38310f6
3 changed files with 113 additions and 1 deletions

View File

@ -19,8 +19,9 @@ use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenTimeLimitIsReachedReceiver;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Worker;
@ -56,6 +57,7 @@ class ConsumeMessagesCommand extends Command
new InputArgument('receiver', InputArgument::REQUIRED, 'Name of the receiver'),
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
))
->setDescription('Consumes messages')
->setHelp(<<<'EOF'
@ -70,6 +72,10 @@ Use the --limit option to limit the number of messages received:
Use the --memory-limit option to stop the worker if it exceeds a given memory usage limit. You can use shorthand byte values [K, M or G]:
<info>php %command.full_name% <receiver-name> --memory-limit=128M</info>
Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached:
<info>php %command.full_name% <receiver-name> --time-limit=3600</info>
EOF
)
;
@ -96,6 +102,10 @@ EOF
$receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
}
if ($timeLimit = $input->getOption('time-limit')) {
$receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger);
}
$worker = new Worker($receiver, $this->bus);
$worker->run();
}

View File

@ -0,0 +1,48 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\Enhancers;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenTimeLimitIsReachedReceiver;
class StopWhenTimeLimitIsReachedReceiverTest extends TestCase
{
/**
* @group time-sensitive
*/
public function testReceiverStopsWhenTimeLimitIsReached()
{
$callable = function ($handler) {
$handler(new DummyMessage('API'));
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
->setConstructorArgs(array($callable))
->enableProxyingToOriginalMethods()
->getMock();
$decoratedReceiver->expects($this->once())->method('receive');
$decoratedReceiver->expects($this->once())->method('stop');
$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('info')
->with('Receiver stopped due to time limit of {timeLimit}s reached', array('timeLimit' => 1));
$timeoutReceiver = new StopWhenTimeLimitIsReachedReceiver($decoratedReceiver, 1, $logger);
$timeoutReceiver->receive(function () {
sleep(2);
});
}
}

View File

@ -0,0 +1,54 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\Enhancers;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
/**
* @author Simon Delicata <simon.delicata@free.fr>
*/
class StopWhenTimeLimitIsReachedReceiver implements ReceiverInterface
{
private $decoratedReceiver;
private $timeLimitInSeconds;
private $logger;
public function __construct(ReceiverInterface $decoratedReceiver, int $timeLimitInSeconds, LoggerInterface $logger = null)
{
$this->decoratedReceiver = $decoratedReceiver;
$this->timeLimitInSeconds = $timeLimitInSeconds;
$this->logger = $logger;
}
public function receive(callable $handler): void
{
$startTime = time();
$endTime = $startTime + $this->timeLimitInSeconds;
$this->decoratedReceiver->receive(function ($message) use ($handler, $endTime) {
$handler($message);
if ($endTime < time()) {
$this->stop();
if (null !== $this->logger) {
$this->logger->info('Receiver stopped due to time limit of {timeLimit}s reached', array('timeLimit' => $this->timeLimitInSeconds));
}
}
});
}
public function stop(): void
{
$this->decoratedReceiver->stop();
}
}