Support for handling messages after current bus is finished
Co-authored-by: Maxime Steinhausser <ogizanagi@users.noreply.github.com>
This commit is contained in:
parent
2ff8c19609
commit
903355fbcc
@ -1608,7 +1608,7 @@ class FrameworkExtension extends Extension
|
|||||||
}
|
}
|
||||||
|
|
||||||
$defaultMiddleware = [
|
$defaultMiddleware = [
|
||||||
'before' => [],
|
'before' => [['id' => 'dispatch_after_current_bus']],
|
||||||
'after' => [['id' => 'send_message'], ['id' => 'handle_message']],
|
'after' => [['id' => 'send_message'], ['id' => 'handle_message']],
|
||||||
];
|
];
|
||||||
foreach ($config['buses'] as $busId => $bus) {
|
foreach ($config['buses'] as $busId => $bus) {
|
||||||
|
@ -39,6 +39,8 @@
|
|||||||
</call>
|
</call>
|
||||||
</service>
|
</service>
|
||||||
|
|
||||||
|
<service id="messenger.middleware.dispatch_after_current_bus" class="Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware" />
|
||||||
|
|
||||||
<service id="messenger.middleware.validation" class="Symfony\Component\Messenger\Middleware\ValidationMiddleware">
|
<service id="messenger.middleware.validation" class="Symfony\Component\Messenger\Middleware\ValidationMiddleware">
|
||||||
<argument type="service" id="validator" />
|
<argument type="service" id="validator" />
|
||||||
</service>
|
</service>
|
||||||
|
@ -702,12 +702,14 @@ abstract class FrameworkExtensionTest extends TestCase
|
|||||||
$this->assertTrue($container->has('messenger.bus.commands'));
|
$this->assertTrue($container->has('messenger.bus.commands'));
|
||||||
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
|
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
|
||||||
$this->assertEquals([
|
$this->assertEquals([
|
||||||
|
['id' => 'dispatch_after_current_bus'],
|
||||||
['id' => 'send_message'],
|
['id' => 'send_message'],
|
||||||
['id' => 'handle_message'],
|
['id' => 'handle_message'],
|
||||||
], $container->getParameter('messenger.bus.commands.middleware'));
|
], $container->getParameter('messenger.bus.commands.middleware'));
|
||||||
$this->assertTrue($container->has('messenger.bus.events'));
|
$this->assertTrue($container->has('messenger.bus.events'));
|
||||||
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
|
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
|
||||||
$this->assertEquals([
|
$this->assertEquals([
|
||||||
|
['id' => 'dispatch_after_current_bus'],
|
||||||
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
|
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
|
||||||
['id' => 'send_message'],
|
['id' => 'send_message'],
|
||||||
['id' => 'handle_message'],
|
['id' => 'handle_message'],
|
||||||
|
@ -43,7 +43,7 @@
|
|||||||
"symfony/form": "^4.3",
|
"symfony/form": "^4.3",
|
||||||
"symfony/expression-language": "~3.4|~4.0",
|
"symfony/expression-language": "~3.4|~4.0",
|
||||||
"symfony/http-client": "^4.3",
|
"symfony/http-client": "^4.3",
|
||||||
"symfony/messenger": "^4.2",
|
"symfony/messenger": "^4.3",
|
||||||
"symfony/mime": "^4.3",
|
"symfony/mime": "^4.3",
|
||||||
"symfony/process": "~3.4|~4.0",
|
"symfony/process": "~3.4|~4.0",
|
||||||
"symfony/security-core": "~3.4|~4.0",
|
"symfony/security-core": "~3.4|~4.0",
|
||||||
|
@ -0,0 +1,48 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When handling queued messages from {@link DispatchAfterCurrentBusMiddleware},
|
||||||
|
* some handlers caused an exception. This exception contains all those handler exceptions.
|
||||||
|
*
|
||||||
|
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
|
||||||
|
*/
|
||||||
|
class DelayedMessageHandlingException extends \RuntimeException implements ExceptionInterface
|
||||||
|
{
|
||||||
|
private $exceptions;
|
||||||
|
|
||||||
|
public function __construct(array $exceptions)
|
||||||
|
{
|
||||||
|
$exceptionMessages = implode(", \n", array_map(
|
||||||
|
function (\Throwable $e) {
|
||||||
|
return \get_class($e).': '.$e->getMessage();
|
||||||
|
},
|
||||||
|
$exceptions
|
||||||
|
));
|
||||||
|
|
||||||
|
if (1 === \count($exceptions)) {
|
||||||
|
$message = sprintf("A delayed message handler threw an exception: \n\n%s", $exceptionMessages);
|
||||||
|
} else {
|
||||||
|
$message = sprintf("Some delayed message handlers threw an exception: \n\n%s", $exceptionMessages);
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->exceptions = $exceptions;
|
||||||
|
|
||||||
|
parent::__construct($message, 0, $exceptions[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getExceptions(): array
|
||||||
|
{
|
||||||
|
return $this->exceptions;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,128 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Middleware;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Exception\DelayedMessageHandlingException;
|
||||||
|
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allow to configure messages to be handled after the current bus is finished.
|
||||||
|
*
|
||||||
|
* I.e, messages dispatched from a handler with a DispatchAfterCurrentBus stamp
|
||||||
|
* will actually be handled once the current message being dispatched is fully
|
||||||
|
* handled.
|
||||||
|
*
|
||||||
|
* For instance, using this middleware before the DoctrineTransactionMiddleware
|
||||||
|
* means sub-dispatched messages with a DispatchAfterCurrentBus stamp would be
|
||||||
|
* handled after the Doctrine transaction has been committed.
|
||||||
|
*
|
||||||
|
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
|
||||||
|
*/
|
||||||
|
class DispatchAfterCurrentBusMiddleware implements MiddlewareInterface
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* @var QueuedEnvelope[] A queue of messages and next middleware
|
||||||
|
*/
|
||||||
|
private $queue = [];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @var bool this property is used to signal if we are inside a the first/root call to
|
||||||
|
* MessageBusInterface::dispatch() or if dispatch has been called inside a message handler
|
||||||
|
*/
|
||||||
|
private $isRootDispatchCallRunning = false;
|
||||||
|
|
||||||
|
public function handle(Envelope $envelope, StackInterface $stack): Envelope
|
||||||
|
{
|
||||||
|
if (null !== $envelope->last(DispatchAfterCurrentBusStamp::class)) {
|
||||||
|
if (!$this->isRootDispatchCallRunning) {
|
||||||
|
throw new \LogicException(sprintf('You can only use a "%s" stamp in the context of a message handler.', DispatchAfterCurrentBusStamp::class));
|
||||||
|
}
|
||||||
|
$this->queue[] = new QueuedEnvelope($envelope, $stack);
|
||||||
|
|
||||||
|
return $envelope;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($this->isRootDispatchCallRunning) {
|
||||||
|
/*
|
||||||
|
* A call to MessageBusInterface::dispatch() was made from inside the main bus handling,
|
||||||
|
* but the message does not have the stamp. So, process it like normal.
|
||||||
|
*/
|
||||||
|
return $stack->next()->handle($envelope, $stack);
|
||||||
|
}
|
||||||
|
|
||||||
|
// First time we get here, mark as inside a "root dispatch" call:
|
||||||
|
$this->isRootDispatchCallRunning = true;
|
||||||
|
try {
|
||||||
|
// Execute the whole middleware stack & message handling for main dispatch:
|
||||||
|
$returnedEnvelope = $stack->next()->handle($envelope, $stack);
|
||||||
|
} catch (\Throwable $exception) {
|
||||||
|
/*
|
||||||
|
* Whenever an exception occurs while handling a message that has
|
||||||
|
* queued other messages, we drop the queued ones.
|
||||||
|
* This is intentional since the queued commands were likely dependent
|
||||||
|
* on the preceding command.
|
||||||
|
*/
|
||||||
|
$this->queue = [];
|
||||||
|
$this->isRootDispatchCallRunning = false;
|
||||||
|
|
||||||
|
throw $exception;
|
||||||
|
}
|
||||||
|
|
||||||
|
// "Root dispatch" call is finished, dispatch stored messages.
|
||||||
|
$exceptions = [];
|
||||||
|
while (null !== $queueItem = array_shift($this->queue)) {
|
||||||
|
try {
|
||||||
|
// Execute the stored messages
|
||||||
|
$queueItem->getStack()->next()->handle($queueItem->getEnvelope(), $queueItem->getStack());
|
||||||
|
} catch (\Exception $exception) {
|
||||||
|
// Gather all exceptions
|
||||||
|
$exceptions[] = $exception;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->isRootDispatchCallRunning = false;
|
||||||
|
if (\count($exceptions) > 0) {
|
||||||
|
throw new DelayedMessageHandlingException($exceptions);
|
||||||
|
}
|
||||||
|
|
||||||
|
return $returnedEnvelope;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @internal
|
||||||
|
*/
|
||||||
|
final class QueuedEnvelope
|
||||||
|
{
|
||||||
|
/** @var Envelope */
|
||||||
|
private $envelope;
|
||||||
|
|
||||||
|
/** @var StackInterface */
|
||||||
|
private $stack;
|
||||||
|
|
||||||
|
public function __construct(Envelope $envelope, StackInterface $stack)
|
||||||
|
{
|
||||||
|
$this->envelope = $envelope;
|
||||||
|
$this->stack = $stack;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getEnvelope(): Envelope
|
||||||
|
{
|
||||||
|
return $this->envelope;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getStack(): StackInterface
|
||||||
|
{
|
||||||
|
return $this->stack;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,25 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Stamp;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Marker item to tell this message should be handled in after the current bus has finished.
|
||||||
|
*
|
||||||
|
* @see \Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware
|
||||||
|
*
|
||||||
|
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
|
||||||
|
*/
|
||||||
|
class DispatchAfterCurrentBusStamp implements StampInterface
|
||||||
|
{
|
||||||
|
}
|
@ -0,0 +1,161 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Tests\Middleware;
|
||||||
|
|
||||||
|
use PHPUnit\Framework\MockObject\MockObject;
|
||||||
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Exception\DelayedMessageHandlingException;
|
||||||
|
use Symfony\Component\Messenger\MessageBus;
|
||||||
|
use Symfony\Component\Messenger\MessageBusInterface;
|
||||||
|
use Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware;
|
||||||
|
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
|
||||||
|
use Symfony\Component\Messenger\Middleware\StackInterface;
|
||||||
|
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;
|
||||||
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
|
|
||||||
|
class DispatchAfterCurrentBusMiddlewareTest extends TestCase
|
||||||
|
{
|
||||||
|
public function testEventsInNewTransactionAreHandledAfterMainMessage()
|
||||||
|
{
|
||||||
|
$message = new DummyMessage('Hello');
|
||||||
|
|
||||||
|
$firstEvent = new DummyEvent('First event');
|
||||||
|
$secondEvent = new DummyEvent('Second event');
|
||||||
|
$thirdEvent = new DummyEvent('Third event');
|
||||||
|
|
||||||
|
$middleware = new DispatchAfterCurrentBusMiddleware();
|
||||||
|
$handlingMiddleware = $this->createMock(MiddlewareInterface::class);
|
||||||
|
|
||||||
|
$eventBus = new MessageBus([
|
||||||
|
$middleware,
|
||||||
|
$handlingMiddleware,
|
||||||
|
]);
|
||||||
|
|
||||||
|
$messageBus = new MessageBus([
|
||||||
|
$middleware,
|
||||||
|
new DispatchingMiddleware($eventBus, [
|
||||||
|
new Envelope($firstEvent, new DispatchAfterCurrentBusStamp()),
|
||||||
|
new Envelope($secondEvent, new DispatchAfterCurrentBusStamp()),
|
||||||
|
$thirdEvent, // Not in a new transaction
|
||||||
|
]),
|
||||||
|
$handlingMiddleware,
|
||||||
|
]);
|
||||||
|
|
||||||
|
// Third event is dispatch within main dispatch, but before its handling:
|
||||||
|
$this->expectHandledMessage($handlingMiddleware, 0, $thirdEvent);
|
||||||
|
// Then expect main dispatched message to be handled first:
|
||||||
|
$this->expectHandledMessage($handlingMiddleware, 1, $message);
|
||||||
|
// Then, expect events in new transaction to be handled next, in dispatched order:
|
||||||
|
$this->expectHandledMessage($handlingMiddleware, 2, $firstEvent);
|
||||||
|
$this->expectHandledMessage($handlingMiddleware, 3, $secondEvent);
|
||||||
|
|
||||||
|
$messageBus->dispatch($message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testThrowingEventsHandlingWontStopExecution()
|
||||||
|
{
|
||||||
|
$message = new DummyMessage('Hello');
|
||||||
|
|
||||||
|
$firstEvent = new DummyEvent('First event');
|
||||||
|
$secondEvent = new DummyEvent('Second event');
|
||||||
|
|
||||||
|
$middleware = new DispatchAfterCurrentBusMiddleware();
|
||||||
|
$handlingMiddleware = $this->createMock(MiddlewareInterface::class);
|
||||||
|
|
||||||
|
$eventBus = new MessageBus([
|
||||||
|
$middleware,
|
||||||
|
$handlingMiddleware,
|
||||||
|
]);
|
||||||
|
|
||||||
|
$messageBus = new MessageBus([
|
||||||
|
$middleware,
|
||||||
|
new DispatchingMiddleware($eventBus, [
|
||||||
|
new Envelope($firstEvent, new DispatchAfterCurrentBusStamp()),
|
||||||
|
new Envelope($secondEvent, new DispatchAfterCurrentBusStamp()),
|
||||||
|
]),
|
||||||
|
$handlingMiddleware,
|
||||||
|
]);
|
||||||
|
|
||||||
|
// Expect main dispatched message to be handled first:
|
||||||
|
$this->expectHandledMessage($handlingMiddleware, 0, $message);
|
||||||
|
// Then, expect events in new transaction to be handled next, in dispatched order:
|
||||||
|
$this->expectThrowingHandling($handlingMiddleware, 1, $firstEvent, new \RuntimeException('Some exception while handling first event'));
|
||||||
|
// Next event is still handled despite the previous exception:
|
||||||
|
$this->expectHandledMessage($handlingMiddleware, 2, $secondEvent);
|
||||||
|
|
||||||
|
$this->expectException(DelayedMessageHandlingException::class);
|
||||||
|
$this->expectExceptionMessage('RuntimeException: Some exception while handling first event');
|
||||||
|
|
||||||
|
$messageBus->dispatch($message);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param MiddlewareInterface|MockObject $handlingMiddleware
|
||||||
|
*/
|
||||||
|
private function expectHandledMessage(MiddlewareInterface $handlingMiddleware, int $at, $message): void
|
||||||
|
{
|
||||||
|
$handlingMiddleware->expects($this->at($at))->method('handle')->with($this->callback(function (Envelope $envelope) use ($message) {
|
||||||
|
return $envelope->getMessage() === $message;
|
||||||
|
}))->willReturnCallback(function ($envelope, StackInterface $stack) {
|
||||||
|
return $stack->next()->handle($envelope, $stack);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param MiddlewareInterface|MockObject $handlingMiddleware
|
||||||
|
*/
|
||||||
|
private function expectThrowingHandling(MiddlewareInterface $handlingMiddleware, int $at, $message, \Throwable $throwable): void
|
||||||
|
{
|
||||||
|
$handlingMiddleware->expects($this->at($at))->method('handle')->with($this->callback(function (Envelope $envelope) use ($message) {
|
||||||
|
return $envelope->getMessage() === $message;
|
||||||
|
}))->willReturnCallback(function () use ($throwable) {
|
||||||
|
throw $throwable;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class DummyEvent
|
||||||
|
{
|
||||||
|
private $message;
|
||||||
|
|
||||||
|
public function __construct(string $message)
|
||||||
|
{
|
||||||
|
$this->message = $message;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getMessage(): string
|
||||||
|
{
|
||||||
|
return $this->message;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class DispatchingMiddleware implements MiddlewareInterface
|
||||||
|
{
|
||||||
|
private $bus;
|
||||||
|
private $messages;
|
||||||
|
|
||||||
|
public function __construct(MessageBusInterface $bus, array $messages)
|
||||||
|
{
|
||||||
|
$this->bus = $bus;
|
||||||
|
$this->messages = $messages;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function handle(Envelope $envelope, StackInterface $stack): Envelope
|
||||||
|
{
|
||||||
|
foreach ($this->messages as $event) {
|
||||||
|
$this->bus->dispatch($event);
|
||||||
|
}
|
||||||
|
|
||||||
|
return $stack->next()->handle($envelope, $stack);
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user