bug #32497 [Messenger] DispatchAfterCurrentBusMiddleware does not cancel messages from delayed handlers (Nyholm, BastienClement)
This PR was merged into the 4.3 branch. Discussion ---------- [Messenger] DispatchAfterCurrentBusMiddleware does not cancel messages from delayed handlers | Q | A | ------------- | --- | Branch? | 4.3 for bug fixes | Bug fix? | yes | New feature? | no | BC breaks? | no | Deprecations? |no | Tests pass? | yes (thanks @Nyholm) | Fixed tickets | #32370 | License | MIT | Doc PR | - This is a fix for #32370. There is no need for anything sophisticated. There is no recursion or fancy stuff going on, just a queue of message handled sequentially. A simple variable is enough to keep track of the queue state. Thanks @Nyholm for the test, it would clearly have been the hardest part of the job. Commits -------1f5c8a6790
Cancel delayed message if handler fails35c76a385d
Added tests for #32370
This commit is contained in:
commit
d1d0943f80
@ -81,12 +81,16 @@ class DispatchAfterCurrentBusMiddleware implements MiddlewareInterface
|
|||||||
// "Root dispatch" call is finished, dispatch stored messages.
|
// "Root dispatch" call is finished, dispatch stored messages.
|
||||||
$exceptions = [];
|
$exceptions = [];
|
||||||
while (null !== $queueItem = array_shift($this->queue)) {
|
while (null !== $queueItem = array_shift($this->queue)) {
|
||||||
|
// Save how many messages are left in queue before handling the message
|
||||||
|
$queueLengthBefore = \count($this->queue);
|
||||||
try {
|
try {
|
||||||
// Execute the stored messages
|
// Execute the stored messages
|
||||||
$queueItem->getStack()->next()->handle($queueItem->getEnvelope(), $queueItem->getStack());
|
$queueItem->getStack()->next()->handle($queueItem->getEnvelope(), $queueItem->getStack());
|
||||||
} catch (\Exception $exception) {
|
} catch (\Exception $exception) {
|
||||||
// Gather all exceptions
|
// Gather all exceptions
|
||||||
$exceptions[] = $exception;
|
$exceptions[] = $exception;
|
||||||
|
// Restore queue to previous state
|
||||||
|
$this->queue = \array_slice($this->queue, 0, $queueLengthBefore);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -99,6 +99,86 @@ class DispatchAfterCurrentBusMiddlewareTest extends TestCase
|
|||||||
$messageBus->dispatch($message);
|
$messageBus->dispatch($message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testLongChainWithExceptions()
|
||||||
|
{
|
||||||
|
$command = new DummyMessage('Level 0');
|
||||||
|
|
||||||
|
$eventL1a = new DummyEvent('Event level 1A');
|
||||||
|
$eventL1b = new DummyEvent('Event level 1B'); // will dispatch 2 more events
|
||||||
|
$eventL1c = new DummyEvent('Event level 1C');
|
||||||
|
|
||||||
|
$eventL2a = new DummyEvent('Event level 2A'); // Will dispatch 1 event and throw exception
|
||||||
|
$eventL2b = new DummyEvent('Event level 2B'); // Will dispatch 1 event
|
||||||
|
|
||||||
|
$eventL3a = new DummyEvent('Event level 3A'); // This should never get handled.
|
||||||
|
$eventL3b = new DummyEvent('Event level 3B');
|
||||||
|
|
||||||
|
$middleware = new DispatchAfterCurrentBusMiddleware();
|
||||||
|
$handlingMiddleware = $this->createMock(MiddlewareInterface::class);
|
||||||
|
|
||||||
|
$eventBus = new MessageBus([
|
||||||
|
$middleware,
|
||||||
|
$handlingMiddleware,
|
||||||
|
]);
|
||||||
|
|
||||||
|
// The command bus will dispatch 3 events.
|
||||||
|
$commandBus = new MessageBus([
|
||||||
|
$middleware,
|
||||||
|
new DispatchingMiddleware($eventBus, [
|
||||||
|
new Envelope($eventL1a, [new DispatchAfterCurrentBusStamp()]),
|
||||||
|
new Envelope($eventL1b, [new DispatchAfterCurrentBusStamp()]),
|
||||||
|
new Envelope($eventL1c, [new DispatchAfterCurrentBusStamp()]),
|
||||||
|
]),
|
||||||
|
$handlingMiddleware,
|
||||||
|
]);
|
||||||
|
|
||||||
|
// Expect main dispatched message to be handled first:
|
||||||
|
$this->expectHandledMessage($handlingMiddleware, 0, $command);
|
||||||
|
|
||||||
|
$this->expectHandledMessage($handlingMiddleware, 1, $eventL1a);
|
||||||
|
|
||||||
|
// Handling $eventL1b will dispatch 2 more events
|
||||||
|
$handlingMiddleware->expects($this->at(2))->method('handle')->with($this->callback(function (Envelope $envelope) use ($eventL1b) {
|
||||||
|
return $envelope->getMessage() === $eventL1b;
|
||||||
|
}))->willReturnCallback(function ($envelope, StackInterface $stack) use ($eventBus, $eventL2a, $eventL2b) {
|
||||||
|
$envelope1 = new Envelope($eventL2a, [new DispatchAfterCurrentBusStamp()]);
|
||||||
|
$eventBus->dispatch($envelope1);
|
||||||
|
$eventBus->dispatch(new Envelope($eventL2b, [new DispatchAfterCurrentBusStamp()]));
|
||||||
|
|
||||||
|
return $stack->next()->handle($envelope, $stack);
|
||||||
|
});
|
||||||
|
|
||||||
|
$this->expectHandledMessage($handlingMiddleware, 3, $eventL1c);
|
||||||
|
|
||||||
|
// Handle $eventL2a will dispatch event and throw exception
|
||||||
|
$handlingMiddleware->expects($this->at(4))->method('handle')->with($this->callback(function (Envelope $envelope) use ($eventL2a) {
|
||||||
|
return $envelope->getMessage() === $eventL2a;
|
||||||
|
}))->willReturnCallback(function ($envelope, StackInterface $stack) use ($eventBus, $eventL3a) {
|
||||||
|
$eventBus->dispatch(new Envelope($eventL3a, [new DispatchAfterCurrentBusStamp()]));
|
||||||
|
|
||||||
|
throw new \RuntimeException('Some exception while handling Event level 2a');
|
||||||
|
});
|
||||||
|
|
||||||
|
// Make sure $eventL2b is handled, since it was dispatched from $eventL1b
|
||||||
|
$handlingMiddleware->expects($this->at(5))->method('handle')->with($this->callback(function (Envelope $envelope) use ($eventL2b) {
|
||||||
|
return $envelope->getMessage() === $eventL2b;
|
||||||
|
}))->willReturnCallback(function ($envelope, StackInterface $stack) use ($eventBus, $eventL3b) {
|
||||||
|
$eventBus->dispatch(new Envelope($eventL3b, [new DispatchAfterCurrentBusStamp()]));
|
||||||
|
|
||||||
|
return $stack->next()->handle($envelope, $stack);
|
||||||
|
});
|
||||||
|
|
||||||
|
// We dont handle exception L3a since L2a threw an exception.
|
||||||
|
$this->expectHandledMessage($handlingMiddleware, 6, $eventL3b);
|
||||||
|
|
||||||
|
// Note: $eventL3a should not be handled.
|
||||||
|
|
||||||
|
$this->expectException(DelayedMessageHandlingException::class);
|
||||||
|
$this->expectExceptionMessage('RuntimeException: Some exception while handling Event level 2a');
|
||||||
|
|
||||||
|
$commandBus->dispatch($command);
|
||||||
|
}
|
||||||
|
|
||||||
public function testHandleDelayedEventFromQueue()
|
public function testHandleDelayedEventFromQueue()
|
||||||
{
|
{
|
||||||
$message = new DummyMessage('Hello');
|
$message = new DummyMessage('Hello');
|
||||||
|
Reference in New Issue
Block a user