feature #28983 [Messenger] make dispatch(), handle() and send() methods return Envelope (nicolas-grekas)
This PR was merged into the 4.2-dev branch.
Discussion
----------
[Messenger] make dispatch(), handle() and send() methods return Envelope
| Q | A
| ------------- | ---
| Branch? | 4.2
| Bug fix? | no
| New feature? | yes
| BC breaks? | no (already broken ;) )
| Deprecations? | no
| Tests pass? | yes
| Fixed tickets | -
| License | MIT
| Doc PR | -
Follow up of #28909. We can do better than return `void`: let's return `Envelope`!
This means middleware and senders should also return `Envelope`, so that we can typically read back the stamps that were added during the sending process (eg to get the Kafka queue id).
ping @dunglas as we discussed that first on Slack, and @sroze as we confirmed interest IRL today.
(User handlers don't know anything about envelopes so they still should return `void` - use senders or middleware if you need to populate/read envelopes.)
Commits
-------
4b0e015402
[Messenger] make dispatch(), handle() and send() methods return Envelope
This commit is contained in:
commit
d5771cc55e
@ -27,6 +27,7 @@ use Symfony\Component\HttpFoundation\ResponseHeaderBag;
|
||||
use Symfony\Component\HttpFoundation\StreamedResponse;
|
||||
use Symfony\Component\HttpKernel\Exception\NotFoundHttpException;
|
||||
use Symfony\Component\HttpKernel\HttpKernelInterface;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Routing\Generator\UrlGeneratorInterface;
|
||||
use Symfony\Component\Security\Core\Exception\AccessDeniedException;
|
||||
use Symfony\Component\Security\Csrf\CsrfToken;
|
||||
@ -394,7 +395,7 @@ trait ControllerTrait
|
||||
*
|
||||
* @final
|
||||
*/
|
||||
protected function dispatchMessage($message)
|
||||
protected function dispatchMessage($message): Envelope
|
||||
{
|
||||
if (!$this->container->has('message_bus')) {
|
||||
throw new \LogicException('The message bus is not enabled in your application. Try running "composer require symfony/messenger".');
|
||||
|
@ -6,7 +6,7 @@ CHANGELOG
|
||||
|
||||
* The component is not experimental anymore
|
||||
* All the changes below are BC BREAKS
|
||||
* `MessageBusInterface::dispatch()` and `MiddlewareInterface::handle()` now return `void`
|
||||
* `MessageBusInterface::dispatch()`, `MiddlewareInterface::handle()` and `SenderInterface::send()` return `Envelope`
|
||||
* `MiddlewareInterface::handle()` now require an `Envelope` as first argument and a `StackInterface` as second
|
||||
* `EnvelopeAwareInterface` has been removed
|
||||
* The signature of `Amqp*` classes changed to take a `Connection` as a first argument and an optional
|
||||
@ -31,7 +31,6 @@ CHANGELOG
|
||||
* `AbstractHandlerLocator` is now internal
|
||||
* `HandlerLocatorInterface::resolve()` has been replaced by `getHandler(Envelope $envelope): ?callable` and shouldn't throw when no handlers are found
|
||||
* `SenderLocatorInterface::getSenderForMessage()` has been replaced by `getSender(Envelope $envelope)`
|
||||
* `SenderInterface::send()` returns `void`
|
||||
* Classes in the `Middleware\Enhancers` sub-namespace have been moved to the `Middleware` one
|
||||
* Classes in the `Asynchronous\Routing` sub-namespace have been moved to the `Transport\Sender\Locator` sub-namespace
|
||||
* The `Asynchronous/Middleware/SendMessageMiddleware` class has been moved to the `Middleware` namespace
|
||||
|
@ -50,11 +50,12 @@ class MessageBus implements MessageBusInterface
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function dispatch($message): void
|
||||
public function dispatch($message): Envelope
|
||||
{
|
||||
if (!\is_object($message)) {
|
||||
throw new \TypeError(sprintf('Invalid argument provided to "%s()": expected object, but got %s.', __METHOD__, \gettype($message)));
|
||||
}
|
||||
$envelope = $message instanceof Envelope ? $message : new Envelope($message);
|
||||
$middlewareIterator = $this->middlewareAggregate->getIterator();
|
||||
|
||||
while ($middlewareIterator instanceof \IteratorAggregate) {
|
||||
@ -63,10 +64,10 @@ class MessageBus implements MessageBusInterface
|
||||
$middlewareIterator->rewind();
|
||||
|
||||
if (!$middlewareIterator->valid()) {
|
||||
return;
|
||||
return $envelope;
|
||||
}
|
||||
$stack = new StackMiddleware($middlewareIterator);
|
||||
|
||||
$middlewareIterator->current()->handle($message instanceof Envelope ? $message : new Envelope($message), $stack);
|
||||
return $middlewareIterator->current()->handle($envelope, $stack);
|
||||
}
|
||||
}
|
||||
|
@ -21,5 +21,5 @@ interface MessageBusInterface
|
||||
*
|
||||
* @param object|Envelope $message The message or the message pre-wrapped in an envelope
|
||||
*/
|
||||
public function dispatch($message): void;
|
||||
public function dispatch($message): Envelope;
|
||||
}
|
||||
|
@ -35,12 +35,12 @@ class ActivationMiddleware implements MiddlewareInterface
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function handle(Envelope $envelope, StackInterface $stack): void
|
||||
public function handle(Envelope $envelope, StackInterface $stack): Envelope
|
||||
{
|
||||
if (\is_callable($this->activated) ? ($this->activated)($envelope) : $this->activated) {
|
||||
$this->inner->handle($envelope, $stack);
|
||||
} else {
|
||||
$stack->next()->handle($envelope, $stack);
|
||||
return $this->inner->handle($envelope, $stack);
|
||||
}
|
||||
|
||||
return $stack->next()->handle($envelope, $stack);
|
||||
}
|
||||
}
|
||||
|
@ -34,13 +34,14 @@ class HandleMessageMiddleware implements MiddlewareInterface
|
||||
*
|
||||
* @throws NoHandlerForMessageException When no handler is found and $allowNoHandlers is false
|
||||
*/
|
||||
public function handle(Envelope $envelope, StackInterface $stack): void
|
||||
public function handle(Envelope $envelope, StackInterface $stack): Envelope
|
||||
{
|
||||
if (null !== $handler = $this->messageHandlerLocator->getHandler($envelope)) {
|
||||
$handler($envelope->getMessage());
|
||||
$stack->next()->handle($envelope, $stack);
|
||||
} elseif (!$this->allowNoHandlers) {
|
||||
throw new NoHandlerForMessageException(sprintf('No handler for message "%s".', \get_class($envelope->getMessage())));
|
||||
}
|
||||
|
||||
return $stack->next()->handle($envelope, $stack);
|
||||
}
|
||||
}
|
||||
|
@ -29,7 +29,7 @@ class LoggingMiddleware implements MiddlewareInterface
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function handle(Envelope $envelope, StackInterface $stack): void
|
||||
public function handle(Envelope $envelope, StackInterface $stack): Envelope
|
||||
{
|
||||
$message = $envelope->getMessage();
|
||||
$context = array(
|
||||
@ -39,7 +39,7 @@ class LoggingMiddleware implements MiddlewareInterface
|
||||
$this->logger->debug('Starting handling message {name}', $context);
|
||||
|
||||
try {
|
||||
$stack->next()->handle($envelope, $stack);
|
||||
$envelope = $stack->next()->handle($envelope, $stack);
|
||||
} catch (\Throwable $e) {
|
||||
$context['exception'] = $e;
|
||||
$this->logger->warning('An exception occurred while handling message {name}', $context);
|
||||
@ -48,5 +48,7 @@ class LoggingMiddleware implements MiddlewareInterface
|
||||
}
|
||||
|
||||
$this->logger->debug('Finished handling message {name}', $context);
|
||||
|
||||
return $envelope;
|
||||
}
|
||||
}
|
||||
|
@ -18,5 +18,5 @@ use Symfony\Component\Messenger\Envelope;
|
||||
*/
|
||||
interface MiddlewareInterface
|
||||
{
|
||||
public function handle(Envelope $envelope, StackInterface $stack): void;
|
||||
public function handle(Envelope $envelope, StackInterface $stack): Envelope;
|
||||
}
|
||||
|
@ -34,26 +34,24 @@ class SendMessageMiddleware implements MiddlewareInterface
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function handle(Envelope $envelope, StackInterface $stack): void
|
||||
public function handle(Envelope $envelope, StackInterface $stack): Envelope
|
||||
{
|
||||
if ($envelope->get(ReceivedStamp::class)) {
|
||||
// It's a received message. Do not send it back:
|
||||
$stack->next()->handle($envelope, $stack);
|
||||
|
||||
return;
|
||||
return $stack->next()->handle($envelope, $stack);
|
||||
}
|
||||
|
||||
$sender = $this->senderLocator->getSender($envelope);
|
||||
|
||||
if ($sender) {
|
||||
$sender->send($envelope);
|
||||
$envelope = $sender->send($envelope);
|
||||
|
||||
if (!AbstractSenderLocator::getValueFromMessageRouting($this->messagesToSendAndHandleMapping, $envelope)) {
|
||||
// message has no corresponding handler
|
||||
return;
|
||||
return $envelope;
|
||||
}
|
||||
}
|
||||
|
||||
$stack->next()->handle($envelope, $stack);
|
||||
return $stack->next()->handle($envelope, $stack);
|
||||
}
|
||||
}
|
||||
|
@ -41,8 +41,8 @@ class StackMiddleware implements MiddlewareInterface, StackInterface
|
||||
return $iterator->current();
|
||||
}
|
||||
|
||||
public function handle(Envelope $envelope, StackInterface $stack): void
|
||||
public function handle(Envelope $envelope, StackInterface $stack): Envelope
|
||||
{
|
||||
// no-op: this is the last null middleware
|
||||
return $envelope;
|
||||
}
|
||||
}
|
||||
|
@ -37,7 +37,7 @@ class TraceableMiddleware implements MiddlewareInterface
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function handle(Envelope $envelope, StackInterface $stack): void
|
||||
public function handle(Envelope $envelope, StackInterface $stack): Envelope
|
||||
{
|
||||
$class = \get_class($this->inner);
|
||||
$eventName = 'c' === $class[0] && 0 === strpos($class, "class@anonymous\0") ? get_parent_class($class).'@anonymous' : $class;
|
||||
@ -49,7 +49,7 @@ class TraceableMiddleware implements MiddlewareInterface
|
||||
$this->stopwatch->start($eventName, $this->eventCategory);
|
||||
|
||||
try {
|
||||
$this->inner->handle($envelope, new TraceableInnerMiddleware($stack, $this->stopwatch, $eventName, $this->eventCategory));
|
||||
return $this->inner->handle($envelope, new TraceableInnerMiddleware($stack, $this->stopwatch, $eventName, $this->eventCategory));
|
||||
} finally {
|
||||
if ($this->stopwatch->isStarted($eventName)) {
|
||||
$this->stopwatch->stop($eventName);
|
||||
@ -79,15 +79,17 @@ class TraceableInnerMiddleware implements MiddlewareInterface, StackInterface
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function handle(Envelope $envelope, StackInterface $stack): void
|
||||
public function handle(Envelope $envelope, StackInterface $stack): Envelope
|
||||
{
|
||||
$this->stopwatch->stop($this->eventName);
|
||||
if ($this === $stack) {
|
||||
$this->stack->next()->handle($envelope, $this->stack);
|
||||
$envelope = $this->stack->next()->handle($envelope, $this->stack);
|
||||
} else {
|
||||
$stack->next()->handle($envelope, $stack);
|
||||
$envelope = $stack->next()->handle($envelope, $stack);
|
||||
}
|
||||
$this->stopwatch->start($this->eventName, $this->eventCategory);
|
||||
|
||||
return $envelope;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -31,7 +31,7 @@ class ValidationMiddleware implements MiddlewareInterface
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function handle(Envelope $envelope, StackInterface $stack): void
|
||||
public function handle(Envelope $envelope, StackInterface $stack): Envelope
|
||||
{
|
||||
$message = $envelope->getMessage();
|
||||
$groups = null;
|
||||
@ -45,6 +45,6 @@ class ValidationMiddleware implements MiddlewareInterface
|
||||
throw new ValidationFailedException($message, $violations);
|
||||
}
|
||||
|
||||
$stack->next()->handle($envelope, $stack);
|
||||
return $stack->next()->handle($envelope, $stack);
|
||||
}
|
||||
}
|
||||
|
@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests\DataCollector;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\DataCollector\MessengerDataCollector;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||
use Symfony\Component\Messenger\TraceableMessageBus;
|
||||
@ -36,9 +37,10 @@ class MessengerDataCollectorTest extends TestCase
|
||||
public function testHandle()
|
||||
{
|
||||
$message = new DummyMessage('dummy message');
|
||||
$envelope = new Envelope($message);
|
||||
|
||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||
$bus->method('dispatch')->with($message);
|
||||
$bus->method('dispatch')->with($message)->willReturn($envelope);
|
||||
$bus = new TraceableMessageBus($bus);
|
||||
|
||||
$collector = new MessengerDataCollector();
|
||||
@ -124,9 +126,11 @@ DUMP
|
||||
public function testKeepsOrderedDispatchCalls()
|
||||
{
|
||||
$firstBus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||
$firstBus->method('dispatch')->willReturn(new Envelope(new \stdClass()));
|
||||
$firstBus = new TraceableMessageBus($firstBus);
|
||||
|
||||
$secondBus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||
$secondBus->method('dispatch')->willReturn(new Envelope(new \stdClass()));
|
||||
$secondBus = new TraceableMessageBus($secondBus);
|
||||
|
||||
$collector = new MessengerDataCollector();
|
||||
|
@ -847,8 +847,8 @@ class HandlerOnUndefinedBus implements MessageSubscriberInterface
|
||||
|
||||
class UselessMiddleware implements MiddlewareInterface
|
||||
{
|
||||
public function handle(Envelope $message, StackInterface $stack): void
|
||||
public function handle(Envelope $message, StackInterface $stack): Envelope
|
||||
{
|
||||
$stack->next()->handle($message, $stack);
|
||||
return $stack->next()->handle($message, $stack);
|
||||
}
|
||||
}
|
||||
|
@ -49,13 +49,14 @@ class MessageBusTest extends TestCase
|
||||
->method('handle')
|
||||
->with($envelope, $this->anything())
|
||||
->will($this->returnCallback(function ($envelope, $stack) {
|
||||
$stack->next()->handle($envelope, $stack);
|
||||
return $stack->next()->handle($envelope, $stack);
|
||||
}));
|
||||
|
||||
$secondMiddleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
|
||||
$secondMiddleware->expects($this->once())
|
||||
->method('handle')
|
||||
->with($envelope, $this->anything())
|
||||
->willReturn($envelope)
|
||||
;
|
||||
|
||||
$bus = new MessageBus(array(
|
||||
@ -77,7 +78,7 @@ class MessageBusTest extends TestCase
|
||||
->method('handle')
|
||||
->with($envelope, $this->anything())
|
||||
->will($this->returnCallback(function ($envelope, $stack) {
|
||||
$stack->next()->handle($envelope->with(new AnEnvelopeStamp()), $stack);
|
||||
return $stack->next()->handle($envelope->with(new AnEnvelopeStamp()), $stack);
|
||||
}));
|
||||
|
||||
$secondMiddleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
|
||||
@ -85,13 +86,14 @@ class MessageBusTest extends TestCase
|
||||
->method('handle')
|
||||
->with($envelopeWithAnotherStamp, $this->anything())
|
||||
->will($this->returnCallback(function ($envelope, $stack) {
|
||||
$stack->next()->handle($envelope, $stack);
|
||||
return $stack->next()->handle($envelope, $stack);
|
||||
}));
|
||||
|
||||
$thirdMiddleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
|
||||
$thirdMiddleware->expects($this->once())
|
||||
->method('handle')
|
||||
->with($envelopeWithAnotherStamp, $this->anything())
|
||||
->willReturn($envelopeWithAnotherStamp)
|
||||
;
|
||||
|
||||
$bus = new MessageBus(array(
|
||||
@ -116,13 +118,14 @@ class MessageBusTest extends TestCase
|
||||
->method('handle')
|
||||
->with($envelope, $this->anything())
|
||||
->will($this->returnCallback(function ($envelope, $stack) use ($expectedEnvelope) {
|
||||
$stack->next()->handle($expectedEnvelope, $stack);
|
||||
return $stack->next()->handle($expectedEnvelope, $stack);
|
||||
}));
|
||||
|
||||
$secondMiddleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
|
||||
$secondMiddleware->expects($this->once())
|
||||
->method('handle')
|
||||
->with($expectedEnvelope, $this->anything())
|
||||
->willReturn($envelope)
|
||||
;
|
||||
|
||||
$bus = new MessageBus(array(
|
||||
|
@ -31,7 +31,7 @@ class ActivationMiddlewareTest extends MiddlewareTestCase
|
||||
$stack->expects($this->never())->method('next');
|
||||
|
||||
$middleware = $this->createMock(MiddlewareInterface::class);
|
||||
$middleware->expects($this->once())->method('handle')->with($envelope, $stack);
|
||||
$middleware->expects($this->once())->method('handle')->with($envelope, $stack)->willReturn($envelope);
|
||||
|
||||
$decorator = new ActivationMiddleware($middleware, true);
|
||||
|
||||
@ -50,7 +50,7 @@ class ActivationMiddlewareTest extends MiddlewareTestCase
|
||||
$stack->expects($this->never())->method('next');
|
||||
|
||||
$middleware = $this->createMock(MiddlewareInterface::class);
|
||||
$middleware->expects($this->once())->method('handle')->with($envelope, $stack);
|
||||
$middleware->expects($this->once())->method('handle')->with($envelope, $stack)->willReturn($envelope);
|
||||
|
||||
$decorator = new ActivationMiddleware($middleware, $activated);
|
||||
|
||||
|
@ -50,6 +50,6 @@ class HandleMessageMiddlewareTest extends MiddlewareTestCase
|
||||
{
|
||||
$middleware = new HandleMessageMiddleware(new HandlerLocator(array()), true);
|
||||
|
||||
$this->assertNull($middleware->handle(new Envelope(new DummyMessage('Hey')), new StackMiddleware()));
|
||||
$this->assertInstanceOf(Envelope::class, $middleware->handle(new Envelope(new DummyMessage('Hey')), new StackMiddleware()));
|
||||
}
|
||||
}
|
||||
|
@ -23,6 +23,9 @@ abstract class MiddlewareTestCase extends TestCase
|
||||
$nextMiddleware
|
||||
->expects($nextIsCalled ? $this->once() : $this->never())
|
||||
->method('handle')
|
||||
->will($this->returnCallback(function ($envelope, StackInterface $stack) {
|
||||
return $envelope;
|
||||
}))
|
||||
;
|
||||
|
||||
$stack = $this->createMock(StackInterface::class);
|
||||
|
@ -31,7 +31,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
|
||||
$middleware = new SendMessageMiddleware(new InMemorySenderLocator($sender));
|
||||
|
||||
$sender->expects($this->once())->method('send')->with($envelope);
|
||||
$sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope);
|
||||
|
||||
$middleware->handle($envelope, $this->getStackMock(false));
|
||||
}
|
||||
@ -43,7 +43,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
|
||||
$middleware = new SendMessageMiddleware(new InMemorySenderLocator($sender));
|
||||
|
||||
$sender->expects($this->once())->method('send')->with($envelope);
|
||||
$sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope);
|
||||
|
||||
$middleware->handle($envelope, $this->getStackMock(false));
|
||||
}
|
||||
@ -58,7 +58,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
DummyMessage::class => true,
|
||||
));
|
||||
|
||||
$sender->expects($this->once())->method('send')->with($envelope);
|
||||
$sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope);
|
||||
|
||||
$middleware->handle($envelope, $this->getStackMock());
|
||||
}
|
||||
@ -73,7 +73,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
DummyMessage::class => true,
|
||||
));
|
||||
|
||||
$sender->expects($this->once())->method('send')->with($envelope);
|
||||
$sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope);
|
||||
|
||||
$middleware->handle($envelope, $this->getStackMock());
|
||||
}
|
||||
@ -88,7 +88,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
DummyMessageInterface::class => true,
|
||||
));
|
||||
|
||||
$sender->expects($this->once())->method('send')->with($envelope);
|
||||
$sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope);
|
||||
|
||||
$middleware->handle($envelope, $this->getStackMock());
|
||||
}
|
||||
@ -103,7 +103,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
'*' => true,
|
||||
));
|
||||
|
||||
$sender->expects($this->once())->method('send')->with($envelope);
|
||||
$sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope);
|
||||
|
||||
$middleware->handle($envelope, $this->getStackMock());
|
||||
}
|
||||
|
@ -33,7 +33,7 @@ class TraceableMiddlewareTest extends MiddlewareTestCase
|
||||
->method('handle')
|
||||
->with($envelope, $this->anything())
|
||||
->will($this->returnCallback(function ($envelope, StackInterface $stack) {
|
||||
$stack->next()->handle($envelope, $stack);
|
||||
return $stack->next()->handle($envelope, $stack);
|
||||
}))
|
||||
;
|
||||
|
||||
@ -67,7 +67,7 @@ class TraceableMiddlewareTest extends MiddlewareTestCase
|
||||
->method('handle')
|
||||
->with($envelope, $this->anything())
|
||||
->will($this->returnCallback(function ($envelope, StackInterface $stack) {
|
||||
$stack->next()->handle($envelope, $stack);
|
||||
return $stack->next()->handle($envelope, $stack);
|
||||
}))
|
||||
;
|
||||
|
||||
|
@ -25,10 +25,11 @@ class TraceableMessageBusTest extends TestCase
|
||||
$message = new DummyMessage('Hello');
|
||||
|
||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||
$bus->expects($this->once())->method('dispatch')->with($message)->willReturn(new Envelope($message));
|
||||
|
||||
$traceableBus = new TraceableMessageBus($bus);
|
||||
$line = __LINE__ + 1;
|
||||
$this->assertNull($traceableBus->dispatch($message));
|
||||
$this->assertInstanceOf(Envelope::class, $traceableBus->dispatch($message));
|
||||
$this->assertCount(1, $tracedMessages = $traceableBus->getDispatchedMessages());
|
||||
$this->assertArraySubset(array(
|
||||
'message' => $message,
|
||||
@ -47,10 +48,11 @@ class TraceableMessageBusTest extends TestCase
|
||||
$envelope = (new Envelope($message))->with($stamp = new AnEnvelopeStamp());
|
||||
|
||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||
$bus->expects($this->once())->method('dispatch')->with($envelope)->willReturn($envelope);
|
||||
|
||||
$traceableBus = new TraceableMessageBus($bus);
|
||||
$line = __LINE__ + 1;
|
||||
$this->assertNull($traceableBus->dispatch($envelope));
|
||||
$this->assertInstanceOf(Envelope::class, $traceableBus->dispatch($envelope));
|
||||
$this->assertCount(1, $tracedMessages = $traceableBus->getDispatchedMessages());
|
||||
$this->assertArraySubset(array(
|
||||
'message' => $message,
|
||||
|
@ -12,6 +12,7 @@ if (!file_exists($autoload)) {
|
||||
|
||||
require_once $autoload;
|
||||
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
||||
@ -29,13 +30,15 @@ $connection = Connection::fromDsn(getenv('DSN'));
|
||||
$receiver = new AmqpReceiver($connection, $serializer);
|
||||
|
||||
$worker = new Worker($receiver, new class() implements MessageBusInterface {
|
||||
public function dispatch($envelope): void
|
||||
public function dispatch($envelope): Envelope
|
||||
{
|
||||
echo 'Get envelope with message: '.get_class($envelope->getMessage())."\n";
|
||||
echo sprintf("with stamps: %s\n", json_encode(array_keys($envelope->all()), JSON_PRETTY_PRINT));
|
||||
|
||||
sleep(30);
|
||||
echo "Done.\n";
|
||||
|
||||
return $envelope;
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -33,8 +33,8 @@ class WorkerTest extends TestCase
|
||||
|
||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||
|
||||
$bus->expects($this->at(0))->method('dispatch')->with((new Envelope($apiMessage))->with(new ReceivedStamp()));
|
||||
$bus->expects($this->at(1))->method('dispatch')->with((new Envelope($ipaMessage))->with(new ReceivedStamp()));
|
||||
$bus->expects($this->at(0))->method('dispatch')->with(($envelope = new Envelope($apiMessage))->with(new ReceivedStamp()))->willReturn($envelope);
|
||||
$bus->expects($this->at(1))->method('dispatch')->with(($envelope = new Envelope($ipaMessage))->with(new ReceivedStamp()))->willReturn($envelope);
|
||||
|
||||
$worker = new Worker($receiver, $bus);
|
||||
$worker->run();
|
||||
@ -42,14 +42,13 @@ class WorkerTest extends TestCase
|
||||
|
||||
public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage()
|
||||
{
|
||||
$envelop = (new Envelope(new DummyMessage('API')))->with(new ReceivedStamp());
|
||||
$receiver = new CallbackReceiver(function ($handler) use ($envelop) {
|
||||
$handler($envelop);
|
||||
$envelope = (new Envelope(new DummyMessage('API')))->with(new ReceivedStamp());
|
||||
$receiver = new CallbackReceiver(function ($handler) use ($envelope) {
|
||||
$handler($envelope);
|
||||
});
|
||||
|
||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||
|
||||
$bus->expects($this->at(0))->method('dispatch')->with($envelop);
|
||||
$bus->expects($this->at(0))->method('dispatch')->with($envelope)->willReturn($envelope);
|
||||
|
||||
$worker = new Worker($receiver, $bus);
|
||||
$worker->run();
|
||||
|
@ -27,7 +27,7 @@ class TraceableMessageBus implements MessageBusInterface
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function dispatch($message): void
|
||||
public function dispatch($message): Envelope
|
||||
{
|
||||
$envelope = $message instanceof Envelope ? $message : new Envelope($message);
|
||||
$context = array(
|
||||
@ -38,7 +38,7 @@ class TraceableMessageBus implements MessageBusInterface
|
||||
);
|
||||
|
||||
try {
|
||||
$this->decoratedBus->dispatch($message);
|
||||
return $this->decoratedBus->dispatch($message);
|
||||
} catch (\Throwable $e) {
|
||||
$context['exception'] = $e;
|
||||
|
||||
|
@ -35,10 +35,12 @@ class AmqpSender implements SenderInterface
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function send(Envelope $envelope): void
|
||||
public function send(Envelope $envelope): Envelope
|
||||
{
|
||||
$encodedMessage = $this->serializer->encode($envelope);
|
||||
|
||||
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers']);
|
||||
|
||||
return $envelope;
|
||||
}
|
||||
}
|
||||
|
@ -51,9 +51,9 @@ class AmqpTransport implements TransportInterface
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function send(Envelope $envelope): void
|
||||
public function send(Envelope $envelope): Envelope
|
||||
{
|
||||
($this->sender ?? $this->getSender())->send($envelope);
|
||||
return ($this->sender ?? $this->getSender())->send($envelope);
|
||||
}
|
||||
|
||||
private function getReceiver()
|
||||
|
@ -17,6 +17,8 @@ use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||
* An AMQP connection.
|
||||
*
|
||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||
*
|
||||
* @final
|
||||
*/
|
||||
class Connection
|
||||
{
|
||||
|
@ -31,10 +31,12 @@ class ChainSender implements SenderInterface
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function send(Envelope $message): void
|
||||
public function send(Envelope $envelope): Envelope
|
||||
{
|
||||
foreach ($this->senders as $sender) {
|
||||
$sender->send($message);
|
||||
$envelope = $sender->send($envelope);
|
||||
}
|
||||
|
||||
return $envelope;
|
||||
}
|
||||
}
|
||||
|
@ -21,5 +21,5 @@ interface SenderInterface
|
||||
/**
|
||||
* Sends the given envelope.
|
||||
*/
|
||||
public function send(Envelope $envelope): void;
|
||||
public function send(Envelope $envelope): Envelope;
|
||||
}
|
||||
|
Reference in New Issue
Block a user