diff --git a/src/Symfony/Component/Messenger/Middleware/DispatchAfterCurrentBusMiddleware.php b/src/Symfony/Component/Messenger/Middleware/DispatchAfterCurrentBusMiddleware.php index 19c714d1b8..05ee86ffeb 100644 --- a/src/Symfony/Component/Messenger/Middleware/DispatchAfterCurrentBusMiddleware.php +++ b/src/Symfony/Component/Messenger/Middleware/DispatchAfterCurrentBusMiddleware.php @@ -81,12 +81,16 @@ class DispatchAfterCurrentBusMiddleware implements MiddlewareInterface // "Root dispatch" call is finished, dispatch stored messages. $exceptions = []; while (null !== $queueItem = array_shift($this->queue)) { + // Save how many messages are left in queue before handling the message + $queueLengthBefore = \count($this->queue); try { // Execute the stored messages $queueItem->getStack()->next()->handle($queueItem->getEnvelope(), $queueItem->getStack()); } catch (\Exception $exception) { // Gather all exceptions $exceptions[] = $exception; + // Restore queue to previous state + $this->queue = \array_slice($this->queue, 0, $queueLengthBefore); } } diff --git a/src/Symfony/Component/Messenger/Tests/Middleware/DispatchAfterCurrentBusMiddlewareTest.php b/src/Symfony/Component/Messenger/Tests/Middleware/DispatchAfterCurrentBusMiddlewareTest.php index fe6d190c10..3054429e29 100644 --- a/src/Symfony/Component/Messenger/Tests/Middleware/DispatchAfterCurrentBusMiddlewareTest.php +++ b/src/Symfony/Component/Messenger/Tests/Middleware/DispatchAfterCurrentBusMiddlewareTest.php @@ -99,6 +99,86 @@ class DispatchAfterCurrentBusMiddlewareTest extends TestCase $messageBus->dispatch($message); } + public function testLongChainWithExceptions() + { + $command = new DummyMessage('Level 0'); + + $eventL1a = new DummyEvent('Event level 1A'); + $eventL1b = new DummyEvent('Event level 1B'); // will dispatch 2 more events + $eventL1c = new DummyEvent('Event level 1C'); + + $eventL2a = new DummyEvent('Event level 2A'); // Will dispatch 1 event and throw exception + $eventL2b = new DummyEvent('Event level 2B'); // Will dispatch 1 event + + $eventL3a = new DummyEvent('Event level 3A'); // This should never get handled. + $eventL3b = new DummyEvent('Event level 3B'); + + $middleware = new DispatchAfterCurrentBusMiddleware(); + $handlingMiddleware = $this->createMock(MiddlewareInterface::class); + + $eventBus = new MessageBus([ + $middleware, + $handlingMiddleware, + ]); + + // The command bus will dispatch 3 events. + $commandBus = new MessageBus([ + $middleware, + new DispatchingMiddleware($eventBus, [ + new Envelope($eventL1a, [new DispatchAfterCurrentBusStamp()]), + new Envelope($eventL1b, [new DispatchAfterCurrentBusStamp()]), + new Envelope($eventL1c, [new DispatchAfterCurrentBusStamp()]), + ]), + $handlingMiddleware, + ]); + + // Expect main dispatched message to be handled first: + $this->expectHandledMessage($handlingMiddleware, 0, $command); + + $this->expectHandledMessage($handlingMiddleware, 1, $eventL1a); + + // Handling $eventL1b will dispatch 2 more events + $handlingMiddleware->expects($this->at(2))->method('handle')->with($this->callback(function (Envelope $envelope) use ($eventL1b) { + return $envelope->getMessage() === $eventL1b; + }))->willReturnCallback(function ($envelope, StackInterface $stack) use ($eventBus, $eventL2a, $eventL2b) { + $envelope1 = new Envelope($eventL2a, [new DispatchAfterCurrentBusStamp()]); + $eventBus->dispatch($envelope1); + $eventBus->dispatch(new Envelope($eventL2b, [new DispatchAfterCurrentBusStamp()])); + + return $stack->next()->handle($envelope, $stack); + }); + + $this->expectHandledMessage($handlingMiddleware, 3, $eventL1c); + + // Handle $eventL2a will dispatch event and throw exception + $handlingMiddleware->expects($this->at(4))->method('handle')->with($this->callback(function (Envelope $envelope) use ($eventL2a) { + return $envelope->getMessage() === $eventL2a; + }))->willReturnCallback(function ($envelope, StackInterface $stack) use ($eventBus, $eventL3a) { + $eventBus->dispatch(new Envelope($eventL3a, [new DispatchAfterCurrentBusStamp()])); + + throw new \RuntimeException('Some exception while handling Event level 2a'); + }); + + // Make sure $eventL2b is handled, since it was dispatched from $eventL1b + $handlingMiddleware->expects($this->at(5))->method('handle')->with($this->callback(function (Envelope $envelope) use ($eventL2b) { + return $envelope->getMessage() === $eventL2b; + }))->willReturnCallback(function ($envelope, StackInterface $stack) use ($eventBus, $eventL3b) { + $eventBus->dispatch(new Envelope($eventL3b, [new DispatchAfterCurrentBusStamp()])); + + return $stack->next()->handle($envelope, $stack); + }); + + // We dont handle exception L3a since L2a threw an exception. + $this->expectHandledMessage($handlingMiddleware, 6, $eventL3b); + + // Note: $eventL3a should not be handled. + + $this->expectException(DelayedMessageHandlingException::class); + $this->expectExceptionMessage('RuntimeException: Some exception while handling Event level 2a'); + + $commandBus->dispatch($command); + } + public function testHandleDelayedEventFromQueue() { $message = new DummyMessage('Hello');