diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index 673c89f9fe..ef6a2147e3 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -1608,7 +1608,7 @@ class FrameworkExtension extends Extension } $defaultMiddleware = [ - 'before' => [], + 'before' => [['id' => 'dispatch_after_current_bus']], 'after' => [['id' => 'send_message'], ['id' => 'handle_message']], ]; foreach ($config['buses'] as $busId => $bus) { diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml index 5e3c042f47..d50d13bde2 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml @@ -39,6 +39,8 @@ + + diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php index 6a98f7c184..0c0b5cc6c3 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php @@ -702,12 +702,14 @@ abstract class FrameworkExtensionTest extends TestCase $this->assertTrue($container->has('messenger.bus.commands')); $this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0)); $this->assertEquals([ + ['id' => 'dispatch_after_current_bus'], ['id' => 'send_message'], ['id' => 'handle_message'], ], $container->getParameter('messenger.bus.commands.middleware')); $this->assertTrue($container->has('messenger.bus.events')); $this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0)); $this->assertEquals([ + ['id' => 'dispatch_after_current_bus'], ['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]], ['id' => 'send_message'], ['id' => 'handle_message'], diff --git a/src/Symfony/Bundle/FrameworkBundle/composer.json b/src/Symfony/Bundle/FrameworkBundle/composer.json index 06d16c65d4..f065dafc66 100644 --- a/src/Symfony/Bundle/FrameworkBundle/composer.json +++ b/src/Symfony/Bundle/FrameworkBundle/composer.json @@ -43,7 +43,7 @@ "symfony/form": "^4.3", "symfony/expression-language": "~3.4|~4.0", "symfony/http-client": "^4.3", - "symfony/messenger": "^4.2", + "symfony/messenger": "^4.3", "symfony/mime": "^4.3", "symfony/process": "~3.4|~4.0", "symfony/security-core": "~3.4|~4.0", diff --git a/src/Symfony/Component/Messenger/Exception/DelayedMessageHandlingException.php b/src/Symfony/Component/Messenger/Exception/DelayedMessageHandlingException.php new file mode 100644 index 0000000000..313d6f672c --- /dev/null +++ b/src/Symfony/Component/Messenger/Exception/DelayedMessageHandlingException.php @@ -0,0 +1,48 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Exception; + +/** + * When handling queued messages from {@link DispatchAfterCurrentBusMiddleware}, + * some handlers caused an exception. This exception contains all those handler exceptions. + * + * @author Tobias Nyholm + */ +class DelayedMessageHandlingException extends \RuntimeException implements ExceptionInterface +{ + private $exceptions; + + public function __construct(array $exceptions) + { + $exceptionMessages = implode(", \n", array_map( + function (\Throwable $e) { + return \get_class($e).': '.$e->getMessage(); + }, + $exceptions + )); + + if (1 === \count($exceptions)) { + $message = sprintf("A delayed message handler threw an exception: \n\n%s", $exceptionMessages); + } else { + $message = sprintf("Some delayed message handlers threw an exception: \n\n%s", $exceptionMessages); + } + + $this->exceptions = $exceptions; + + parent::__construct($message, 0, $exceptions[0]); + } + + public function getExceptions(): array + { + return $this->exceptions; + } +} diff --git a/src/Symfony/Component/Messenger/Middleware/DispatchAfterCurrentBusMiddleware.php b/src/Symfony/Component/Messenger/Middleware/DispatchAfterCurrentBusMiddleware.php new file mode 100644 index 0000000000..4c098c79b7 --- /dev/null +++ b/src/Symfony/Component/Messenger/Middleware/DispatchAfterCurrentBusMiddleware.php @@ -0,0 +1,128 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Middleware; + +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\DelayedMessageHandlingException; +use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp; + +/** + * Allow to configure messages to be handled after the current bus is finished. + * + * I.e, messages dispatched from a handler with a DispatchAfterCurrentBus stamp + * will actually be handled once the current message being dispatched is fully + * handled. + * + * For instance, using this middleware before the DoctrineTransactionMiddleware + * means sub-dispatched messages with a DispatchAfterCurrentBus stamp would be + * handled after the Doctrine transaction has been committed. + * + * @author Tobias Nyholm + */ +class DispatchAfterCurrentBusMiddleware implements MiddlewareInterface +{ + /** + * @var QueuedEnvelope[] A queue of messages and next middleware + */ + private $queue = []; + + /** + * @var bool this property is used to signal if we are inside a the first/root call to + * MessageBusInterface::dispatch() or if dispatch has been called inside a message handler + */ + private $isRootDispatchCallRunning = false; + + public function handle(Envelope $envelope, StackInterface $stack): Envelope + { + if (null !== $envelope->last(DispatchAfterCurrentBusStamp::class)) { + if (!$this->isRootDispatchCallRunning) { + throw new \LogicException(sprintf('You can only use a "%s" stamp in the context of a message handler.', DispatchAfterCurrentBusStamp::class)); + } + $this->queue[] = new QueuedEnvelope($envelope, $stack); + + return $envelope; + } + + if ($this->isRootDispatchCallRunning) { + /* + * A call to MessageBusInterface::dispatch() was made from inside the main bus handling, + * but the message does not have the stamp. So, process it like normal. + */ + return $stack->next()->handle($envelope, $stack); + } + + // First time we get here, mark as inside a "root dispatch" call: + $this->isRootDispatchCallRunning = true; + try { + // Execute the whole middleware stack & message handling for main dispatch: + $returnedEnvelope = $stack->next()->handle($envelope, $stack); + } catch (\Throwable $exception) { + /* + * Whenever an exception occurs while handling a message that has + * queued other messages, we drop the queued ones. + * This is intentional since the queued commands were likely dependent + * on the preceding command. + */ + $this->queue = []; + $this->isRootDispatchCallRunning = false; + + throw $exception; + } + + // "Root dispatch" call is finished, dispatch stored messages. + $exceptions = []; + while (null !== $queueItem = array_shift($this->queue)) { + try { + // Execute the stored messages + $queueItem->getStack()->next()->handle($queueItem->getEnvelope(), $queueItem->getStack()); + } catch (\Exception $exception) { + // Gather all exceptions + $exceptions[] = $exception; + } + } + + $this->isRootDispatchCallRunning = false; + if (\count($exceptions) > 0) { + throw new DelayedMessageHandlingException($exceptions); + } + + return $returnedEnvelope; + } +} + +/** + * @internal + */ +final class QueuedEnvelope +{ + /** @var Envelope */ + private $envelope; + + /** @var StackInterface */ + private $stack; + + public function __construct(Envelope $envelope, StackInterface $stack) + { + $this->envelope = $envelope; + $this->stack = $stack; + } + + public function getEnvelope(): Envelope + { + return $this->envelope; + } + + public function getStack(): StackInterface + { + return $this->stack; + } +} diff --git a/src/Symfony/Component/Messenger/Stamp/DispatchAfterCurrentBusStamp.php b/src/Symfony/Component/Messenger/Stamp/DispatchAfterCurrentBusStamp.php new file mode 100644 index 0000000000..38222cbc3b --- /dev/null +++ b/src/Symfony/Component/Messenger/Stamp/DispatchAfterCurrentBusStamp.php @@ -0,0 +1,25 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace Symfony\Component\Messenger\Stamp; + +/** + * Marker item to tell this message should be handled in after the current bus has finished. + * + * @see \Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware + * + * @author Tobias Nyholm + */ +class DispatchAfterCurrentBusStamp implements StampInterface +{ +} diff --git a/src/Symfony/Component/Messenger/Tests/Middleware/DispatchAfterCurrentBusMiddlewareTest.php b/src/Symfony/Component/Messenger/Tests/Middleware/DispatchAfterCurrentBusMiddlewareTest.php new file mode 100644 index 0000000000..6b6d99c4e1 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Middleware/DispatchAfterCurrentBusMiddlewareTest.php @@ -0,0 +1,161 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Middleware; + +use PHPUnit\Framework\MockObject\MockObject; +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\DelayedMessageHandlingException; +use Symfony\Component\Messenger\MessageBus; +use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware; +use Symfony\Component\Messenger\Middleware\MiddlewareInterface; +use Symfony\Component\Messenger\Middleware\StackInterface; +use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; + +class DispatchAfterCurrentBusMiddlewareTest extends TestCase +{ + public function testEventsInNewTransactionAreHandledAfterMainMessage() + { + $message = new DummyMessage('Hello'); + + $firstEvent = new DummyEvent('First event'); + $secondEvent = new DummyEvent('Second event'); + $thirdEvent = new DummyEvent('Third event'); + + $middleware = new DispatchAfterCurrentBusMiddleware(); + $handlingMiddleware = $this->createMock(MiddlewareInterface::class); + + $eventBus = new MessageBus([ + $middleware, + $handlingMiddleware, + ]); + + $messageBus = new MessageBus([ + $middleware, + new DispatchingMiddleware($eventBus, [ + new Envelope($firstEvent, new DispatchAfterCurrentBusStamp()), + new Envelope($secondEvent, new DispatchAfterCurrentBusStamp()), + $thirdEvent, // Not in a new transaction + ]), + $handlingMiddleware, + ]); + + // Third event is dispatch within main dispatch, but before its handling: + $this->expectHandledMessage($handlingMiddleware, 0, $thirdEvent); + // Then expect main dispatched message to be handled first: + $this->expectHandledMessage($handlingMiddleware, 1, $message); + // Then, expect events in new transaction to be handled next, in dispatched order: + $this->expectHandledMessage($handlingMiddleware, 2, $firstEvent); + $this->expectHandledMessage($handlingMiddleware, 3, $secondEvent); + + $messageBus->dispatch($message); + } + + public function testThrowingEventsHandlingWontStopExecution() + { + $message = new DummyMessage('Hello'); + + $firstEvent = new DummyEvent('First event'); + $secondEvent = new DummyEvent('Second event'); + + $middleware = new DispatchAfterCurrentBusMiddleware(); + $handlingMiddleware = $this->createMock(MiddlewareInterface::class); + + $eventBus = new MessageBus([ + $middleware, + $handlingMiddleware, + ]); + + $messageBus = new MessageBus([ + $middleware, + new DispatchingMiddleware($eventBus, [ + new Envelope($firstEvent, new DispatchAfterCurrentBusStamp()), + new Envelope($secondEvent, new DispatchAfterCurrentBusStamp()), + ]), + $handlingMiddleware, + ]); + + // Expect main dispatched message to be handled first: + $this->expectHandledMessage($handlingMiddleware, 0, $message); + // Then, expect events in new transaction to be handled next, in dispatched order: + $this->expectThrowingHandling($handlingMiddleware, 1, $firstEvent, new \RuntimeException('Some exception while handling first event')); + // Next event is still handled despite the previous exception: + $this->expectHandledMessage($handlingMiddleware, 2, $secondEvent); + + $this->expectException(DelayedMessageHandlingException::class); + $this->expectExceptionMessage('RuntimeException: Some exception while handling first event'); + + $messageBus->dispatch($message); + } + + /** + * @param MiddlewareInterface|MockObject $handlingMiddleware + */ + private function expectHandledMessage(MiddlewareInterface $handlingMiddleware, int $at, $message): void + { + $handlingMiddleware->expects($this->at($at))->method('handle')->with($this->callback(function (Envelope $envelope) use ($message) { + return $envelope->getMessage() === $message; + }))->willReturnCallback(function ($envelope, StackInterface $stack) { + return $stack->next()->handle($envelope, $stack); + }); + } + + /** + * @param MiddlewareInterface|MockObject $handlingMiddleware + */ + private function expectThrowingHandling(MiddlewareInterface $handlingMiddleware, int $at, $message, \Throwable $throwable): void + { + $handlingMiddleware->expects($this->at($at))->method('handle')->with($this->callback(function (Envelope $envelope) use ($message) { + return $envelope->getMessage() === $message; + }))->willReturnCallback(function () use ($throwable) { + throw $throwable; + }); + } +} + +class DummyEvent +{ + private $message; + + public function __construct(string $message) + { + $this->message = $message; + } + + public function getMessage(): string + { + return $this->message; + } +} + +class DispatchingMiddleware implements MiddlewareInterface +{ + private $bus; + private $messages; + + public function __construct(MessageBusInterface $bus, array $messages) + { + $this->bus = $bus; + $this->messages = $messages; + } + + public function handle(Envelope $envelope, StackInterface $stack): Envelope + { + foreach ($this->messages as $event) { + $this->bus->dispatch($event); + } + + return $stack->next()->handle($envelope, $stack); + } +}