feature #28849 [Messenger] Support for handling messages after current bus is finished (Nyholm)

This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Support for handling messages after current bus is finished

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets |
| License       | MIT
| Doc PR        | https://github.com/symfony/symfony-docs/pull/10015

This is a replacement for #27844. We achieve the same goals without introducing the new concept of "recorder".

```php
class CreateUserHandler
{
    private $em;
    private $eventBus;

    public function __construct(MessageBus $eventBus, EntityManagerInterface $em)
    {
        $this->eventBus = $eventBus;
        $this->em = $em;
    }

    public function __invoke(CreateUser $command)
    {
        $user = new User($command->getUuid(), $command->getName(), $command->getEmail());
        $this->em->persist($user);

        $message = new UserCreatedEvent($command->getUuid();
        $this->eventBus->dispatch((new Envelope($message))->with(new DispatchAfterCurrentBus()));
    }
}
```

Note that this `DispatchAfterCurrentBusMiddleware` is added automatically as the first middleware.

2019-03-13: I updated the PR description.

Commits
-------

903355fbcc Support for handling messages after current bus is finished
This commit is contained in:
Samuel ROZE 2019-03-19 12:36:52 +07:00
commit b15eee97b4
8 changed files with 368 additions and 2 deletions

View File

@ -1608,7 +1608,7 @@ class FrameworkExtension extends Extension
}
$defaultMiddleware = [
'before' => [],
'before' => [['id' => 'dispatch_after_current_bus']],
'after' => [['id' => 'send_message'], ['id' => 'handle_message']],
];
foreach ($config['buses'] as $busId => $bus) {

View File

@ -39,6 +39,8 @@
</call>
</service>
<service id="messenger.middleware.dispatch_after_current_bus" class="Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware" />
<service id="messenger.middleware.validation" class="Symfony\Component\Messenger\Middleware\ValidationMiddleware">
<argument type="service" id="validator" />
</service>

View File

@ -702,12 +702,14 @@ abstract class FrameworkExtensionTest extends TestCase
$this->assertTrue($container->has('messenger.bus.commands'));
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
$this->assertEquals([
['id' => 'dispatch_after_current_bus'],
['id' => 'send_message'],
['id' => 'handle_message'],
], $container->getParameter('messenger.bus.commands.middleware'));
$this->assertTrue($container->has('messenger.bus.events'));
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
$this->assertEquals([
['id' => 'dispatch_after_current_bus'],
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
['id' => 'send_message'],
['id' => 'handle_message'],

View File

@ -43,7 +43,7 @@
"symfony/form": "^4.3",
"symfony/expression-language": "~3.4|~4.0",
"symfony/http-client": "^4.3",
"symfony/messenger": "^4.2",
"symfony/messenger": "^4.3",
"symfony/mime": "^4.3",
"symfony/process": "~3.4|~4.0",
"symfony/security-core": "~3.4|~4.0",

View File

@ -0,0 +1,48 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Exception;
/**
* When handling queued messages from {@link DispatchAfterCurrentBusMiddleware},
* some handlers caused an exception. This exception contains all those handler exceptions.
*
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
*/
class DelayedMessageHandlingException extends \RuntimeException implements ExceptionInterface
{
private $exceptions;
public function __construct(array $exceptions)
{
$exceptionMessages = implode(", \n", array_map(
function (\Throwable $e) {
return \get_class($e).': '.$e->getMessage();
},
$exceptions
));
if (1 === \count($exceptions)) {
$message = sprintf("A delayed message handler threw an exception: \n\n%s", $exceptionMessages);
} else {
$message = sprintf("Some delayed message handlers threw an exception: \n\n%s", $exceptionMessages);
}
$this->exceptions = $exceptions;
parent::__construct($message, 0, $exceptions[0]);
}
public function getExceptions(): array
{
return $this->exceptions;
}
}

View File

@ -0,0 +1,128 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Middleware;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\DelayedMessageHandlingException;
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;
/**
* Allow to configure messages to be handled after the current bus is finished.
*
* I.e, messages dispatched from a handler with a DispatchAfterCurrentBus stamp
* will actually be handled once the current message being dispatched is fully
* handled.
*
* For instance, using this middleware before the DoctrineTransactionMiddleware
* means sub-dispatched messages with a DispatchAfterCurrentBus stamp would be
* handled after the Doctrine transaction has been committed.
*
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
*/
class DispatchAfterCurrentBusMiddleware implements MiddlewareInterface
{
/**
* @var QueuedEnvelope[] A queue of messages and next middleware
*/
private $queue = [];
/**
* @var bool this property is used to signal if we are inside a the first/root call to
* MessageBusInterface::dispatch() or if dispatch has been called inside a message handler
*/
private $isRootDispatchCallRunning = false;
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
if (null !== $envelope->last(DispatchAfterCurrentBusStamp::class)) {
if (!$this->isRootDispatchCallRunning) {
throw new \LogicException(sprintf('You can only use a "%s" stamp in the context of a message handler.', DispatchAfterCurrentBusStamp::class));
}
$this->queue[] = new QueuedEnvelope($envelope, $stack);
return $envelope;
}
if ($this->isRootDispatchCallRunning) {
/*
* A call to MessageBusInterface::dispatch() was made from inside the main bus handling,
* but the message does not have the stamp. So, process it like normal.
*/
return $stack->next()->handle($envelope, $stack);
}
// First time we get here, mark as inside a "root dispatch" call:
$this->isRootDispatchCallRunning = true;
try {
// Execute the whole middleware stack & message handling for main dispatch:
$returnedEnvelope = $stack->next()->handle($envelope, $stack);
} catch (\Throwable $exception) {
/*
* Whenever an exception occurs while handling a message that has
* queued other messages, we drop the queued ones.
* This is intentional since the queued commands were likely dependent
* on the preceding command.
*/
$this->queue = [];
$this->isRootDispatchCallRunning = false;
throw $exception;
}
// "Root dispatch" call is finished, dispatch stored messages.
$exceptions = [];
while (null !== $queueItem = array_shift($this->queue)) {
try {
// Execute the stored messages
$queueItem->getStack()->next()->handle($queueItem->getEnvelope(), $queueItem->getStack());
} catch (\Exception $exception) {
// Gather all exceptions
$exceptions[] = $exception;
}
}
$this->isRootDispatchCallRunning = false;
if (\count($exceptions) > 0) {
throw new DelayedMessageHandlingException($exceptions);
}
return $returnedEnvelope;
}
}
/**
* @internal
*/
final class QueuedEnvelope
{
/** @var Envelope */
private $envelope;
/** @var StackInterface */
private $stack;
public function __construct(Envelope $envelope, StackInterface $stack)
{
$this->envelope = $envelope;
$this->stack = $stack;
}
public function getEnvelope(): Envelope
{
return $this->envelope;
}
public function getStack(): StackInterface
{
return $this->stack;
}
}

View File

@ -0,0 +1,25 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
declare(strict_types=1);
namespace Symfony\Component\Messenger\Stamp;
/**
* Marker item to tell this message should be handled in after the current bus has finished.
*
* @see \Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware
*
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
*/
class DispatchAfterCurrentBusStamp implements StampInterface
{
}

View File

@ -0,0 +1,161 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Middleware;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\DelayedMessageHandlingException;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
class DispatchAfterCurrentBusMiddlewareTest extends TestCase
{
public function testEventsInNewTransactionAreHandledAfterMainMessage()
{
$message = new DummyMessage('Hello');
$firstEvent = new DummyEvent('First event');
$secondEvent = new DummyEvent('Second event');
$thirdEvent = new DummyEvent('Third event');
$middleware = new DispatchAfterCurrentBusMiddleware();
$handlingMiddleware = $this->createMock(MiddlewareInterface::class);
$eventBus = new MessageBus([
$middleware,
$handlingMiddleware,
]);
$messageBus = new MessageBus([
$middleware,
new DispatchingMiddleware($eventBus, [
new Envelope($firstEvent, new DispatchAfterCurrentBusStamp()),
new Envelope($secondEvent, new DispatchAfterCurrentBusStamp()),
$thirdEvent, // Not in a new transaction
]),
$handlingMiddleware,
]);
// Third event is dispatch within main dispatch, but before its handling:
$this->expectHandledMessage($handlingMiddleware, 0, $thirdEvent);
// Then expect main dispatched message to be handled first:
$this->expectHandledMessage($handlingMiddleware, 1, $message);
// Then, expect events in new transaction to be handled next, in dispatched order:
$this->expectHandledMessage($handlingMiddleware, 2, $firstEvent);
$this->expectHandledMessage($handlingMiddleware, 3, $secondEvent);
$messageBus->dispatch($message);
}
public function testThrowingEventsHandlingWontStopExecution()
{
$message = new DummyMessage('Hello');
$firstEvent = new DummyEvent('First event');
$secondEvent = new DummyEvent('Second event');
$middleware = new DispatchAfterCurrentBusMiddleware();
$handlingMiddleware = $this->createMock(MiddlewareInterface::class);
$eventBus = new MessageBus([
$middleware,
$handlingMiddleware,
]);
$messageBus = new MessageBus([
$middleware,
new DispatchingMiddleware($eventBus, [
new Envelope($firstEvent, new DispatchAfterCurrentBusStamp()),
new Envelope($secondEvent, new DispatchAfterCurrentBusStamp()),
]),
$handlingMiddleware,
]);
// Expect main dispatched message to be handled first:
$this->expectHandledMessage($handlingMiddleware, 0, $message);
// Then, expect events in new transaction to be handled next, in dispatched order:
$this->expectThrowingHandling($handlingMiddleware, 1, $firstEvent, new \RuntimeException('Some exception while handling first event'));
// Next event is still handled despite the previous exception:
$this->expectHandledMessage($handlingMiddleware, 2, $secondEvent);
$this->expectException(DelayedMessageHandlingException::class);
$this->expectExceptionMessage('RuntimeException: Some exception while handling first event');
$messageBus->dispatch($message);
}
/**
* @param MiddlewareInterface|MockObject $handlingMiddleware
*/
private function expectHandledMessage(MiddlewareInterface $handlingMiddleware, int $at, $message): void
{
$handlingMiddleware->expects($this->at($at))->method('handle')->with($this->callback(function (Envelope $envelope) use ($message) {
return $envelope->getMessage() === $message;
}))->willReturnCallback(function ($envelope, StackInterface $stack) {
return $stack->next()->handle($envelope, $stack);
});
}
/**
* @param MiddlewareInterface|MockObject $handlingMiddleware
*/
private function expectThrowingHandling(MiddlewareInterface $handlingMiddleware, int $at, $message, \Throwable $throwable): void
{
$handlingMiddleware->expects($this->at($at))->method('handle')->with($this->callback(function (Envelope $envelope) use ($message) {
return $envelope->getMessage() === $message;
}))->willReturnCallback(function () use ($throwable) {
throw $throwable;
});
}
}
class DummyEvent
{
private $message;
public function __construct(string $message)
{
$this->message = $message;
}
public function getMessage(): string
{
return $this->message;
}
}
class DispatchingMiddleware implements MiddlewareInterface
{
private $bus;
private $messages;
public function __construct(MessageBusInterface $bus, array $messages)
{
$this->bus = $bus;
$this->messages = $messages;
}
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
foreach ($this->messages as $event) {
$this->bus->dispatch($event);
}
return $stack->next()->handle($envelope, $stack);
}
}