From 5c1f3a2414a6b580b7989b90d6ad02278e32e696 Mon Sep 17 00:00:00 2001 From: "tien.xuan.vo" Date: Fri, 23 Aug 2019 10:48:20 +0700 Subject: [PATCH] [Messenger] Stop worker when it should stop --- .../Component/Messenger/Tests/WorkerTest.php | 25 +++++++++++++++++++ src/Symfony/Component/Messenger/Worker.php | 4 +++ 2 files changed, 29 insertions(+) diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index ac8c2a7ad8..8c8e54dda9 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -27,6 +27,7 @@ use Symfony\Component\Messenger\Stamp\SentStamp; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Worker; +use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker; use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; /** @@ -361,6 +362,30 @@ class WorkerTest extends TestCase // make sure they were processed in the correct order $this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes); } + + public function testWorkerWithDecorator() + { + $envelope1 = new Envelope(new DummyMessage('message1')); + $envelope2 = new Envelope(new DummyMessage('message2')); + $envelope3 = new Envelope(new DummyMessage('message3')); + + $receiver = new DummyReceiver([ + [$envelope1, $envelope2, $envelope3], + ]); + + $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); + + $worker = new Worker([$receiver], $bus); + $workerWithDecorator = new StopWhenMessageCountIsExceededWorker($worker, 2); + $processedEnvelopes = []; + $workerWithDecorator->run([], function (?Envelope $envelope) use ($worker, &$processedEnvelopes) { + if (null !== $envelope) { + $processedEnvelopes[] = $envelope; + } + }); + + $this->assertSame([$envelope1, $envelope2], $processedEnvelopes); + } } class DummyReceiver implements ReceiverInterface diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index d0c98b76ff..6bfb4cb675 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -92,6 +92,10 @@ class Worker implements WorkerInterface $this->handleMessage($envelope, $receiver, $transportName, $this->retryStrategies[$transportName] ?? null); $onHandled($envelope); + + if ($this->shouldStop) { + break 2; + } } // after handling a single receiver, quit and start the loop again