bug #33298 [Messenger] Stop worker when it should stop (tienvx)
This PR was merged into the 4.3 branch.
Discussion
----------
[Messenger] Stop worker when it should stop
| Q | A
| ------------- | ---
| Branch? | 4.3 <!-- see below -->
| Bug fix? | yes
| New feature? | no <!-- please update src/**/CHANGELOG.md files -->
| BC breaks? | no <!-- see https://symfony.com/bc -->
| Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tests pass? | yes <!-- please add some, will be required by reviewers -->
| Fixed tickets | NA <!-- #-prefixed issue number(s), if any -->
| License | MIT
| Doc PR | NA <!-- required for new features -->
<!--
Replace this notice by a short README for your feature/bugfix. This will help people
understand your PR and can be used as a start for the documentation.
Additionally (see https://symfony.com/roadmap):
- Bug fixes must be submitted against the lowest maintained branch where they apply
(lowest branches are regularly merged to upper ones so they get the fixes too).
- Features and deprecations must be submitted against branch 4.4.
- Legacy code removals go to the master branch.
-->
There are 2 things about this PR:
* This PR fix the bug when using `limit`, `memory-limit`, `time-limit` options with command `messenger:consume`, these options does not work if the receiver return multiple messages
* This PR is the continue work of https://github.com/symfony/symfony/pull/32783
Commits
-------
5c1f3a2414
[Messenger] Stop worker when it should stop
This commit is contained in:
commit
113fa0bc50
@ -27,6 +27,7 @@ use Symfony\Component\Messenger\Stamp\SentStamp;
|
|||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||||
use Symfony\Component\Messenger\Worker;
|
use Symfony\Component\Messenger\Worker;
|
||||||
|
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
|
||||||
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
|
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -361,6 +362,30 @@ class WorkerTest extends TestCase
|
|||||||
// make sure they were processed in the correct order
|
// make sure they were processed in the correct order
|
||||||
$this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes);
|
$this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testWorkerWithDecorator()
|
||||||
|
{
|
||||||
|
$envelope1 = new Envelope(new DummyMessage('message1'));
|
||||||
|
$envelope2 = new Envelope(new DummyMessage('message2'));
|
||||||
|
$envelope3 = new Envelope(new DummyMessage('message3'));
|
||||||
|
|
||||||
|
$receiver = new DummyReceiver([
|
||||||
|
[$envelope1, $envelope2, $envelope3],
|
||||||
|
]);
|
||||||
|
|
||||||
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
|
|
||||||
|
$worker = new Worker([$receiver], $bus);
|
||||||
|
$workerWithDecorator = new StopWhenMessageCountIsExceededWorker($worker, 2);
|
||||||
|
$processedEnvelopes = [];
|
||||||
|
$workerWithDecorator->run([], function (?Envelope $envelope) use ($worker, &$processedEnvelopes) {
|
||||||
|
if (null !== $envelope) {
|
||||||
|
$processedEnvelopes[] = $envelope;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
$this->assertSame([$envelope1, $envelope2], $processedEnvelopes);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class DummyReceiver implements ReceiverInterface
|
class DummyReceiver implements ReceiverInterface
|
||||||
|
@ -92,6 +92,10 @@ class Worker implements WorkerInterface
|
|||||||
|
|
||||||
$this->handleMessage($envelope, $receiver, $transportName, $this->retryStrategies[$transportName] ?? null);
|
$this->handleMessage($envelope, $receiver, $transportName, $this->retryStrategies[$transportName] ?? null);
|
||||||
$onHandled($envelope);
|
$onHandled($envelope);
|
||||||
|
|
||||||
|
if ($this->shouldStop) {
|
||||||
|
break 2;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// after handling a single receiver, quit and start the loop again
|
// after handling a single receiver, quit and start the loop again
|
||||||
|
Reference in New Issue
Block a user