feature #29303 [Messenger] add welcome notice when running the command (nicolas-grekas)
This PR was merged into the 4.3-dev branch.
Discussion
----------
[Messenger] add welcome notice when running the command
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | no
| New feature? | yes
| BC breaks? | no
| Deprecations? | no
| Tests pass? | yes
| Fixed tickets | -
| License | MIT
| Doc PR | -
The current behavior of `./bin/console messenger:consume-messages` is totally silent: you run it and nothing visible happens.
Here is what is displayed with this PR:
![image](https://user-images.githubusercontent.com/243674/54235039-af0a6c80-4510-11e9-89d8-3c1c55e946c0.png)
Combined with #30539, it gives:
![image](https://user-images.githubusercontent.com/243674/54235156-ed079080-4510-11e9-9d4d-9f27c87e16e5.png)
Commits
-------
673b58b964
[Messenger] add welcome notice when running the command
This commit is contained in:
commit
b6f3932004
@ -94,15 +94,15 @@ EOF
|
|||||||
*/
|
*/
|
||||||
protected function interact(InputInterface $input, OutputInterface $output)
|
protected function interact(InputInterface $input, OutputInterface $output)
|
||||||
{
|
{
|
||||||
$style = new SymfonyStyle($input, $output);
|
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
|
||||||
|
|
||||||
if ($this->receiverNames && !$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
|
if ($this->receiverNames && !$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
|
||||||
if (null === $receiverName) {
|
if (null === $receiverName) {
|
||||||
$style->block('Missing receiver argument.', null, 'error', ' ', true);
|
$io->block('Missing receiver argument.', null, 'error', ' ', true);
|
||||||
$input->setArgument('receiver', $style->choice('Select one of the available receivers', $this->receiverNames));
|
$input->setArgument('receiver', $io->choice('Select one of the available receivers', $this->receiverNames));
|
||||||
} elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) {
|
} elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) {
|
||||||
$style->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true);
|
$io->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true);
|
||||||
if ($style->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) {
|
if ($io->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) {
|
||||||
$input->setArgument('receiver', $alternatives[0]);
|
$input->setArgument('receiver', $alternatives[0]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -111,17 +111,17 @@ EOF
|
|||||||
$busName = $input->getOption('bus');
|
$busName = $input->getOption('bus');
|
||||||
if ($this->busNames && !$this->busLocator->has($busName)) {
|
if ($this->busNames && !$this->busLocator->has($busName)) {
|
||||||
if (null === $busName) {
|
if (null === $busName) {
|
||||||
$style->block('Missing bus argument.', null, 'error', ' ', true);
|
$io->block('Missing bus argument.', null, 'error', ' ', true);
|
||||||
$input->setOption('bus', $style->choice('Select one of the available buses', $this->busNames));
|
$input->setOption('bus', $io->choice('Select one of the available buses', $this->busNames));
|
||||||
} elseif ($alternatives = $this->findAlternatives($busName, $this->busNames)) {
|
} elseif ($alternatives = $this->findAlternatives($busName, $this->busNames)) {
|
||||||
$style->block(sprintf('Bus "%s" is not defined.', $busName), null, 'error', ' ', true);
|
$io->block(sprintf('Bus "%s" is not defined.', $busName), null, 'error', ' ', true);
|
||||||
|
|
||||||
if (1 === \count($alternatives)) {
|
if (1 === \count($alternatives)) {
|
||||||
if ($style->confirm(sprintf('Do you want to dispatch to "%s" instead? ', $alternatives[0]), true)) {
|
if ($io->confirm(sprintf('Do you want to dispatch to "%s" instead? ', $alternatives[0]), true)) {
|
||||||
$input->setOption('bus', $alternatives[0]);
|
$input->setOption('bus', $alternatives[0]);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
$input->setOption('bus', $style->choice('Did you mean one of the following buses instead?', $alternatives, $alternatives[0]));
|
$input->setOption('bus', $io->choice('Did you mean one of the following buses instead?', $alternatives, $alternatives[0]));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -143,18 +143,37 @@ EOF
|
|||||||
$receiver = $this->receiverLocator->get($receiverName);
|
$receiver = $this->receiverLocator->get($receiverName);
|
||||||
$bus = $this->busLocator->get($busName);
|
$bus = $this->busLocator->get($busName);
|
||||||
|
|
||||||
|
$stopsWhen = [];
|
||||||
if ($limit = $input->getOption('limit')) {
|
if ($limit = $input->getOption('limit')) {
|
||||||
|
$stopsWhen[] = "processed {$limit} messages";
|
||||||
$receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger);
|
$receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($memoryLimit = $input->getOption('memory-limit')) {
|
if ($memoryLimit = $input->getOption('memory-limit')) {
|
||||||
|
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
|
||||||
$receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
|
$receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($timeLimit = $input->getOption('time-limit')) {
|
if ($timeLimit = $input->getOption('time-limit')) {
|
||||||
|
$stopsWhen[] = "been running for {$timeLimit}s";
|
||||||
$receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger);
|
$receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
|
||||||
|
$io->success(sprintf('Consuming messages from transport "%s" on bus "%s".', $receiverName, $busName));
|
||||||
|
|
||||||
|
if ($stopsWhen) {
|
||||||
|
$last = array_pop($stopsWhen);
|
||||||
|
$stopsWhen = ($stopsWhen ? implode(', ', $stopsWhen).' or ' : '').$last;
|
||||||
|
$io->comment("The worker will automatically exit once it has {$stopsWhen}.");
|
||||||
|
}
|
||||||
|
|
||||||
|
$io->comment('Quit the worker with CONTROL-C.');
|
||||||
|
|
||||||
|
if (!$output->isDebug()) {
|
||||||
|
$io->comment('Re-run the command with a -vvv option to see logs about consumed messages.');
|
||||||
|
}
|
||||||
|
|
||||||
$worker = new Worker($receiver, $bus);
|
$worker = new Worker($receiver, $bus);
|
||||||
$worker->run();
|
$worker->run();
|
||||||
}
|
}
|
||||||
@ -171,7 +190,7 @@ EOF
|
|||||||
$max = (int) $max;
|
$max = (int) $max;
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (substr($memoryLimit, -1)) {
|
switch (substr(rtrim($memoryLimit, 'b'), -1)) {
|
||||||
case 't': $max *= 1024;
|
case 't': $max *= 1024;
|
||||||
// no break
|
// no break
|
||||||
case 'g': $max *= 1024;
|
case 'g': $max *= 1024;
|
||||||
|
@ -18,6 +18,8 @@ use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
|||||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||||
*
|
*
|
||||||
* @experimental in 4.2
|
* @experimental in 4.2
|
||||||
|
*
|
||||||
|
* @final
|
||||||
*/
|
*/
|
||||||
class Worker
|
class Worker
|
||||||
{
|
{
|
||||||
|
Reference in New Issue
Block a user