feature #26945 [Messenger] Support configuring messages when dispatching (ogizanagi)

This PR was squashed before being merged into the 4.1-dev branch (closes #26945).

Discussion
----------

[Messenger] Support configuring messages when dispatching

| Q             | A
| ------------- | ---
| Branch?       | master <!-- see below -->
| Bug fix?      | no
| New feature?  | yes <!-- don't forget to update src/**/CHANGELOG.md files -->
| BC breaks?    | no     <!-- see https://symfony.com/bc -->
| Deprecations? | no <!-- don't forget to update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tests pass?   | see CI checks    <!-- please add some, will be required by reviewers -->
| Fixed tickets | N/A   <!-- #-prefixed issue number(s), if any -->
| License       | MIT
| Doc PR        | todo

For now, this is mainly a RFC to get your opinion on such a feature (so it misses proper tests).
Supporting wrapped messages out-of-the-box would mainly allow to easily create middlewares that should act only or be configured differently for specific messages (by using wrappers instead of making messages implements specific interfaces and without requiring dedicated buses). For instance, I might want to track specific messages and expose metrics about it.

The Symfony Serializer transport (relates to #26900) and Validator middleware serve as guinea pigs for sample purpose here. But despite message validation usually is simple as it matches one use-case and won't require specific validation groups in most cases, I still think it can be useful sometimes. Also there is the case of the [`AllValidator` which currently requires using a `GroupSequence`](https://github.com/symfony/symfony/pull/26477) as a workaround, but that could also be specified directly in message metadata instead.

## Latest updates

PR updated to use a flat version of configurations instead of wrappers, using an `Envelope` wrapper and a list of envelope items.
Usage:

```php
$message = Envelope::wrap(new DummyMessage('Hello'))
    ->with(new SerializerConfiguration(array(ObjectNormalizer::GROUPS => array('foo'))))
    ->with(new ValidationConfiguration(array('foo', 'bar')))
;
```

~~By default, Messenger protagonists receive the original message. But each of them can opt-in to receive the envelope instead, by implementing `EnvelopeReaderInterface`.
Then, they can read configurations from it and forward the original message or the envelope to another party.~~

Senders, encoders/decoders & receivers always get an `Envelope`.
Middlewares & handlers always get a message. But middlewares can opt-in to receive the envelope instead, by implementing `EnvelopeReaderInterface`.

Commits
-------

749054a1e8 [Messenger] Support configuring messages when dispatching
This commit is contained in:
Samuel ROZE 2018-05-07 14:50:19 +01:00
commit e7a0b80cb2
36 changed files with 649 additions and 126 deletions

View File

@ -13,12 +13,14 @@ namespace Symfony\Component\Messenger\Asynchronous\Middleware;
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocatorInterface;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\EnvelopeAwareInterface;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class SendMessageMiddleware implements MiddlewareInterface
class SendMessageMiddleware implements MiddlewareInterface, EnvelopeAwareInterface
{
private $senderLocator;
@ -32,17 +34,19 @@ class SendMessageMiddleware implements MiddlewareInterface
*/
public function handle($message, callable $next)
{
if ($message instanceof ReceivedMessage) {
return $next($message->getMessage());
$envelope = Envelope::wrap($message);
if ($envelope->get(ReceivedMessage::class)) {
// It's a received message. Do not send it back:
return $next($message);
}
if (!empty($senders = $this->senderLocator->getSendersForMessage($message))) {
if (!empty($senders = $this->senderLocator->getSendersForMessage($envelope->getMessage()))) {
foreach ($senders as $sender) {
if (null === $sender) {
continue;
}
$sender->send($message);
$sender->send($envelope);
}
if (!\in_array(null, $senders, true)) {

View File

@ -12,26 +12,26 @@
namespace Symfony\Component\Messenger\Asynchronous\Transport;
use Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\EnvelopeItemInterface;
/**
* Wraps a received message. This is mainly used by the `SendMessageMiddleware` middleware to identify
* Marker config for a received message.
* This is mainly used by the `SendMessageMiddleware` middleware to identify
* a message should not be sent if it was just received.
*
* @see SendMessageMiddleware
*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
final class ReceivedMessage
final class ReceivedMessage implements EnvelopeItemInterface
{
private $message;
public function __construct($message)
public function serialize()
{
$this->message = $message;
return '';
}
public function getMessage()
public function unserialize($serialized)
{
return $this->message;
// noop
}
}

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Asynchronous\Transport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
/**
@ -27,12 +28,12 @@ class WrapIntoReceivedMessage implements ReceiverInterface
public function receive(callable $handler): void
{
$this->decoratedReceiver->receive(function ($message) use ($handler) {
if (null !== $message) {
$message = new ReceivedMessage($message);
$this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler) {
if (null !== $envelope) {
$envelope = $envelope->with(new ReceivedMessage());
}
$handler($message);
$handler($envelope);
});
}

View File

@ -0,0 +1,80 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger;
/**
* A message wrapped in an envelope with items (configurations, markers, ...).
*
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*
* @experimental in 4.1
*/
final class Envelope
{
private $items = array();
private $message;
/**
* @param object $message
* @param EnvelopeItemInterface[] $items
*/
public function __construct($message, array $items = array())
{
$this->message = $message;
foreach ($items as $item) {
$this->items[\get_class($item)] = $item;
}
}
/**
* Wrap a message into an envelope if not already wrapped.
*
* @param Envelope|object $message
*/
public static function wrap($message): self
{
return $message instanceof self ? $message : new self($message);
}
/**
* @return Envelope a new Envelope instance with additional item
*/
public function with(EnvelopeItemInterface $item): self
{
$cloned = clone $this;
$cloned->items[\get_class($item)] = $item;
return $cloned;
}
public function get(string $itemFqcn): ?EnvelopeItemInterface
{
return $this->items[$itemFqcn] ?? null;
}
/**
* @return EnvelopeItemInterface[] indexed by fqcn
*/
public function all(): array
{
return $this->items;
}
/**
* @return object The original message contained in the envelope
*/
public function getMessage()
{
return $this->message;
}
}

View File

@ -0,0 +1,23 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger;
/**
* A Messenger protagonist aware of the message envelope and its content.
*
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*
* @experimental in 4.1
*/
interface EnvelopeAwareInterface
{
}

View File

@ -0,0 +1,24 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger;
/**
* An envelope item related to a message.
* This item must be serializable for transport.
*
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*
* @experimental in 4.1
*/
interface EnvelopeItemInterface extends \Serializable
{
}

View File

@ -55,6 +55,12 @@ class MessageBus implements MessageBusInterface
$middleware = $this->indexedMiddlewareHandlers[$index];
return function ($message) use ($middleware, $index) {
$message = Envelope::wrap($message);
if (!$middleware instanceof EnvelopeAwareInterface) {
// Do not provide the envelope if the middleware cannot read it:
$message = $message->getMessage();
}
return $middleware->handle($message, $this->callableForNextMiddleware($index + 1));
};
}

View File

@ -23,7 +23,7 @@ interface MessageBusInterface
*
* The bus can return a value coming from handlers, but is not required to do so.
*
* @param object $message
* @param object|Envelope $message The message or the message pre-wrapped in an envelope
*
* @return mixed
*/

View File

@ -0,0 +1,58 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Middleware\Configuration;
use Symfony\Component\Messenger\EnvelopeItemInterface;
use Symfony\Component\Validator\Constraints\GroupSequence;
/**
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*
* @experimental in 4.1
*/
final class ValidationConfiguration implements EnvelopeItemInterface
{
private $groups;
/**
* @param string[]|GroupSequence $groups
*/
public function __construct($groups)
{
$this->groups = $groups;
}
public function getGroups()
{
return $this->groups;
}
public function serialize()
{
$isGroupSequence = $this->groups instanceof GroupSequence;
return serialize(array(
'groups' => $isGroupSequence ? $this->groups->groups : $this->groups,
'is_group_sequence' => $isGroupSequence,
));
}
public function unserialize($serialized)
{
list(
'groups' => $groups,
'is_group_sequence' => $isGroupSequence
) = unserialize($serialized, array('allowed_classes' => false));
$this->__construct($isGroupSequence ? new GroupSequence($groups) : $groups);
}
}

View File

@ -11,7 +11,6 @@
namespace Symfony\Component\Messenger\Middleware;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Psr\Log\LoggerInterface;
/**
@ -51,10 +50,6 @@ class LoggingMiddleware implements MiddlewareInterface
private function createContext($message): array
{
if ($message instanceof ReceivedMessage) {
$message = $message->getMessage();
}
return array(
'message' => $message,
'class' => \get_class($message),

View File

@ -11,13 +11,16 @@
namespace Symfony\Component\Messenger\Middleware;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\EnvelopeAwareInterface;
use Symfony\Component\Messenger\Exception\ValidationFailedException;
use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration;
use Symfony\Component\Validator\Validator\ValidatorInterface;
/**
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
*/
class ValidationMiddleware implements MiddlewareInterface
class ValidationMiddleware implements MiddlewareInterface, EnvelopeAwareInterface
{
private $validator;
@ -28,9 +31,17 @@ class ValidationMiddleware implements MiddlewareInterface
public function handle($message, callable $next)
{
$violations = $this->validator->validate($message);
$envelope = Envelope::wrap($message);
$subject = $envelope->getMessage();
$groups = null;
/** @var ValidationConfiguration|null $validationConfig */
if ($validationConfig = $envelope->get(ValidationConfiguration::class)) {
$groups = $validationConfig->getGroups();
}
$violations = $this->validator->validate($subject, null, $groups);
if (\count($violations)) {
throw new ValidationFailedException($message, $violations);
throw new ValidationFailedException($subject, $violations);
}
return $next($message);

View File

@ -15,6 +15,7 @@ use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocatorInterface;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\SenderInterface;
@ -30,12 +31,28 @@ class SendMessageMiddlewareTest extends TestCase
$sender,
)));
$sender->expects($this->once())->method('send')->with($message);
$sender->expects($this->once())->method('send')->with(Envelope::wrap($message));
$next->expects($this->never())->method($this->anything());
$middleware->handle($message, $next);
}
public function testItSendsTheMessageToAssignedSenderWithPreWrappedMessage()
{
$envelope = Envelope::wrap(new DummyMessage('Hey'));
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(array(
$sender,
)));
$sender->expects($this->once())->method('send')->with($envelope);
$next->expects($this->never())->method($this->anything());
$middleware->handle($envelope, $next);
}
public function testItAlsoCallsTheNextMiddlewareIfASenderIsNull()
{
$message = new DummyMessage('Hey');
@ -47,7 +64,7 @@ class SendMessageMiddlewareTest extends TestCase
null,
)));
$sender->expects($this->once())->method('send')->with($message);
$sender->expects($this->once())->method('send')->with(Envelope::wrap($message));
$next->expects($this->once())->method($this->anything());
$middleware->handle($message, $next);
@ -67,8 +84,7 @@ class SendMessageMiddlewareTest extends TestCase
public function testItSkipsReceivedMessages()
{
$innerMessage = new DummyMessage('Hey');
$message = new ReceivedMessage($innerMessage);
$envelope = Envelope::wrap(new DummyMessage('Hey'))->with(new ReceivedMessage());
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
@ -78,9 +94,9 @@ class SendMessageMiddlewareTest extends TestCase
)));
$sender->expects($this->never())->method('send');
$next->expects($this->once())->method('__invoke')->with($innerMessage);
$next->expects($this->once())->method('__invoke')->with($envelope);
$middleware->handle($message, $next);
$middleware->handle($envelope, $next);
}
}

View File

@ -0,0 +1,30 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Asynchronous\Serialization;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Transport\Serialization\SerializerConfiguration;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
/**
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*/
class SerializerConfigurationTest extends TestCase
{
public function testSerialiazable()
{
$config = new SerializerConfiguration(array(ObjectNormalizer::GROUPS => array('Default', 'Extra')));
$this->assertTrue(is_subclass_of(SerializerConfiguration::class, \Serializable::class, true));
$this->assertEquals($config, unserialize(serialize($config)));
}
}

View File

@ -16,6 +16,7 @@ use Symfony\Component\DependencyInjection\Argument\ServiceClosureArgument;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Reference;
use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Handler\Locator\ContainerHandlerLocator;
@ -321,7 +322,7 @@ class DummyReceiver implements ReceiverInterface
public function receive(callable $handler): void
{
for ($i = 0; $i < 3; ++$i) {
$handler(new DummyMessage("Dummy $i"));
$handler(Envelope::wrap(new DummyMessage("Dummy $i")));
}
}

View File

@ -0,0 +1,82 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\EnvelopeAwareInterface;
use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
/**
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*/
class EnvelopeTest extends TestCase
{
public function testConstruct()
{
$envelope = new Envelope($dummy = new DummyMessage('dummy'), array(
$receivedConfig = new ReceivedMessage(),
));
$this->assertSame($dummy, $envelope->getMessage());
$this->assertArrayHasKey(ReceivedMessage::class, $configs = $envelope->all());
$this->assertSame($receivedConfig, $configs[ReceivedMessage::class]);
}
public function testWrap()
{
$first = Envelope::wrap($dummy = new DummyMessage('dummy'));
$this->assertInstanceOf(Envelope::class, $first);
$this->assertSame($dummy, $first->getMessage());
$envelope = Envelope::wrap($first);
$this->assertSame($first, $envelope);
}
public function testWithReturnsNewInstance()
{
$envelope = Envelope::wrap($dummy = new DummyMessage('dummy'));
$this->assertNotSame($envelope, $envelope->with(new ReceivedMessage()));
}
public function testGet()
{
$envelope = Envelope::wrap($dummy = new DummyMessage('dummy'))
->with($config = new ReceivedMessage())
;
$this->assertSame($config, $envelope->get(ReceivedMessage::class));
$this->assertNull($envelope->get(ValidationConfiguration::class));
}
public function testAll()
{
$envelope = Envelope::wrap($dummy = new DummyMessage('dummy'))
->with($receivedConfig = new ReceivedMessage())
->with($validationConfig = new ValidationConfiguration(array('foo')))
;
$configs = $envelope->all();
$this->assertArrayHasKey(ReceivedMessage::class, $configs);
$this->assertSame($receivedConfig, $configs[ReceivedMessage::class]);
$this->assertArrayHasKey(ValidationConfiguration::class, $configs);
$this->assertSame($validationConfig, $configs[ValidationConfiguration::class]);
}
}
class FooConfigurationConsumer implements EnvelopeAwareInterface
{
}

View File

@ -0,0 +1,38 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Middleware\Configuration;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration;
use Symfony\Component\Validator\Constraints\GroupSequence;
/**
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*/
class ValidationConfigurationTest extends TestCase
{
public function testConfig()
{
$config = new ValidationConfiguration($groups = array('Default', 'Extra'));
$this->assertSame($groups, $config->getGroups());
$config = new ValidationConfiguration($groups = new GroupSequence(array('Default', 'Then')));
$this->assertSame($groups, $config->getGroups());
}
public function testSerialiazable()
{
$this->assertTrue(is_subclass_of(ValidationConfiguration::class, \Serializable::class, true));
$this->assertEquals($config = new ValidationConfiguration(array('Default', 'Extra')), unserialize(serialize($config)));
$this->assertEquals($config = new ValidationConfiguration(new GroupSequence(array('Default', 'Then'))), unserialize(serialize($config)));
}
}

View File

@ -12,6 +12,8 @@
namespace Symfony\Component\Messenger\Tests\Middleware;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration;
use Symfony\Component\Messenger\Middleware\ValidationMiddleware;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Validator\ConstraintViolationListInterface;
@ -43,6 +45,30 @@ class ValidationMiddlewareTest extends TestCase
$this->assertSame('Hello', $result);
}
public function testValidateWithConfigurationAndNextMiddleware()
{
$envelope = Envelope::wrap($message = new DummyMessage('Hey'))->with(new ValidationConfiguration($groups = array('Default', 'Extra')));
$validator = $this->createMock(ValidatorInterface::class);
$validator
->expects($this->once())
->method('validate')
->with($message, null, $groups)
->willReturn($this->createMock(ConstraintViolationListInterface::class))
;
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$next
->expects($this->once())
->method('__invoke')
->with($envelope)
->willReturn('Hello')
;
$result = (new ValidationMiddleware($validator))->handle($envelope, $next);
$this->assertSame('Hello', $result);
}
/**
* @expectedException \Symfony\Component\Messenger\Exception\ValidationFailedException
* @expectedExceptionMessage Message of type "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage" failed validation.

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
@ -50,12 +51,12 @@ class AmqpExtIntegrationTest extends TestCase
$sender = new AmqpSender($serializer, $connection);
$receiver = new AmqpReceiver($serializer, $connection);
$sender->send($firstMessage = new DummyMessage('First'));
$sender->send($secondMessage = new DummyMessage('Second'));
$sender->send($first = Envelope::wrap(new DummyMessage('First')));
$sender->send($second = Envelope::wrap(new DummyMessage('Second')));
$receivedMessages = 0;
$receiver->receive(function ($message) use ($receiver, &$receivedMessages, $firstMessage, $secondMessage) {
$this->assertEquals(0 == $receivedMessages ? $firstMessage : $secondMessage, $message);
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
$this->assertEquals(0 == $receivedMessages ? $first : $second, $envelope);
if (2 === ++$receivedMessages) {
$receiver->stop();
@ -74,7 +75,7 @@ class AmqpExtIntegrationTest extends TestCase
$connection->queue()->purge();
$sender = new AmqpSender($serializer, $connection);
$sender->send(new DummyMessage('Hello'));
$sender->send(Envelope::wrap(new DummyMessage('Hello')));
$amqpReadTimeout = 30;
$dsn = getenv('MESSENGER_AMQP_DSN').'?read_timeout='.$amqpReadTimeout;
@ -98,7 +99,15 @@ class AmqpExtIntegrationTest extends TestCase
$this->assertFalse($process->isRunning());
$this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);
$this->assertEquals($expectedOutput."Get message: Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage\nDone.\n", $process->getOutput());
$this->assertSame($expectedOutput.<<<'TXT'
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
with items: [
"Symfony\\Component\\Messenger\\Asynchronous\\Transport\\ReceivedMessage"
]
Done.
TXT
, $process->getOutput());
}
/**
@ -114,12 +123,11 @@ class AmqpExtIntegrationTest extends TestCase
$connection->setup();
$connection->queue()->purge();
$sender = new AmqpSender($serializer, $connection);
$receiver = new AmqpReceiver($serializer, $connection);
$receivedMessages = 0;
$receiver->receive(function ($message) use ($receiver, &$receivedMessages) {
$this->assertNull($message);
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
$this->assertNull($envelope);
if (2 === ++$receivedMessages) {
$receiver->stop();

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
@ -44,8 +45,8 @@ class AmqpReceiverTest extends TestCase
$connection->expects($this->once())->method('ack')->with($envelope);
$receiver = new AmqpReceiver($serializer, $connection);
$receiver->receive(function ($message) use ($receiver) {
$this->assertEquals(new DummyMessage('Hi'), $message);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
$receiver->stop();
});
}

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
@ -24,16 +25,16 @@ class AmqpSenderTest extends TestCase
{
public function testItSendsTheEncodedMessage()
{
$message = new DummyMessage('Oy');
$envelope = Envelope::wrap(new DummyMessage('Oy'));
$encoded = array('body' => '...', 'headers' => array('type' => DummyMessage::class));
$encoder = $this->getMockBuilder(EncoderInterface::class)->getMock();
$encoder->method('encode')->with($message)->willReturnOnConsecutiveCalls($encoded);
$encoder->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers']);
$sender = new AmqpSender($encoder, $connection);
$sender->send($message);
$sender->send($envelope);
}
}

View File

@ -12,10 +12,9 @@ if (!file_exists($autoload)) {
require_once $autoload;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Serializer as SerializerComponent;
@ -27,13 +26,14 @@ $serializer = new Serializer(
);
$connection = Connection::fromDsn(getenv('DSN'));
$sender = new AmqpSender($serializer, $connection);
$receiver = new AmqpReceiver($serializer, $connection);
$worker = new Worker($receiver, new class() implements MessageBusInterface {
public function dispatch($message)
public function dispatch($envelope)
{
echo 'Get message: '.get_class($message)."\n";
echo 'Get envelope with message: '.get_class($envelope->getMessage())."\n";
echo sprintf("with items: %s\n", json_encode(array_keys($envelope->all()), JSON_PRETTY_PRINT));
sleep(30);
echo "Done.\n";
}

View File

@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\Enhancers;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver;
@ -25,7 +26,7 @@ class StopWhenMemoryUsageIsExceededReceiverTest extends TestCase
public function testReceiverStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop)
{
$callable = function ($handler) {
$handler(new DummyMessage('API'));
$handler(Envelope::wrap(new DummyMessage('API')));
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
@ -58,7 +59,7 @@ class StopWhenMemoryUsageIsExceededReceiverTest extends TestCase
public function testReceiverLogsMemoryExceededWhenLoggerIsGiven()
{
$callable = function ($handler) {
$handler(new DummyMessage('API'));
$handler(Envelope::wrap(new DummyMessage('API')));
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)

View File

@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\Enhancers;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
@ -25,9 +26,9 @@ class StopWhenMessageCountIsExceededReceiverTest extends TestCase
public function testReceiverStopsWhenMaximumCountExceeded($max, $shouldStop)
{
$callable = function ($handler) {
$handler(new DummyMessage('First message'));
$handler(new DummyMessage('Second message'));
$handler(new DummyMessage('Third message'));
$handler(Envelope::wrap(new DummyMessage('First message')));
$handler(Envelope::wrap(new DummyMessage('Second message')));
$handler(Envelope::wrap(new DummyMessage('Third message')));
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
@ -78,7 +79,7 @@ class StopWhenMessageCountIsExceededReceiverTest extends TestCase
public function testReceiverLogsMaximumCountExceededWhenLoggerIsGiven()
{
$callable = function ($handler) {
$handler(new DummyMessage('First message'));
$handler(Envelope::wrap(new DummyMessage('First message')));
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)

View File

@ -12,8 +12,11 @@
namespace Symfony\Component\Messenger\Tests\Transport\Serialization;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerConfiguration;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
@ -26,9 +29,23 @@ class SerializerTest extends TestCase
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$message = new DummyMessage('Hello');
$envelope = Envelope::wrap(new DummyMessage('Hello'));
$this->assertEquals($message, $serializer->decode($serializer->encode($message)));
$this->assertEquals($envelope, $serializer->decode($serializer->encode($envelope)));
}
public function testEncodedWithConfigurationIsDecodable()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$envelope = Envelope::wrap(new DummyMessage('Hello'))
->with(new SerializerConfiguration(array(ObjectNormalizer::GROUPS => array('foo'))))
->with(new ValidationConfiguration(array('foo', 'bar')))
;
$this->assertEquals($envelope, $serializer->decode($serializer->encode($envelope)));
}
public function testEncodedIsHavingTheBodyAndTypeHeader()
@ -37,11 +54,12 @@ class SerializerTest extends TestCase
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$encoded = $serializer->encode(new DummyMessage('Hello'));
$encoded = $serializer->encode(Envelope::wrap(new DummyMessage('Hello')));
$this->assertArrayHasKey('body', $encoded);
$this->assertArrayHasKey('headers', $encoded);
$this->assertArrayHasKey('type', $encoded['headers']);
$this->assertArrayNotHasKey('X-Message-Envelope-Items', $encoded['headers']);
$this->assertEquals(DummyMessage::class, $encoded['headers']['type']);
}
@ -55,10 +73,31 @@ class SerializerTest extends TestCase
$encoder = new Serializer($serializer, 'csv', array('foo' => 'bar'));
$encoded = $encoder->encode($message);
$encoded = $encoder->encode(Envelope::wrap($message));
$decoded = $encoder->decode($encoded);
$this->assertSame('Yay', $encoded['body']);
$this->assertSame($message, $decoded);
$this->assertSame($message, $decoded->getMessage());
}
public function testEncodedWithSerializationConfiguration()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$envelope = Envelope::wrap(new DummyMessage('Hello'))
->with(new SerializerConfiguration(array(ObjectNormalizer::GROUPS => array('foo'))))
->with(new ValidationConfiguration(array('foo', 'bar')))
;
$encoded = $serializer->encode($envelope);
$this->assertArrayHasKey('body', $encoded);
$this->assertArrayHasKey('headers', $encoded);
$this->assertArrayHasKey('type', $encoded['headers']);
$this->assertEquals(DummyMessage::class, $encoded['headers']['type']);
$this->assertArrayHasKey('X-Message-Envelope-Items', $encoded['headers']);
$this->assertSame('a:2:{s:75:"Symfony\Component\Messenger\Transport\Serialization\SerializerConfiguration";C:75:"Symfony\Component\Messenger\Transport\Serialization\SerializerConfiguration":59:{a:1:{s:7:"context";a:1:{s:6:"groups";a:1:{i:0;s:3:"foo";}}}}s:76:"Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration";C:76:"Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration":82:{a:2:{s:6:"groups";a:2:{i:0;s:3:"foo";i:1;s:3:"bar";}s:17:"is_group_sequence";b:0;}}}', $encoded['headers']['X-Message-Envelope-Items']);
}
}

View File

@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
@ -22,29 +23,33 @@ class WorkerTest extends TestCase
{
public function testWorkerDispatchTheReceivedMessage()
{
$receiver = new CallbackReceiver(function ($handler) {
$handler(new DummyMessage('API'));
$handler(new DummyMessage('IPA'));
$apiMessage = new DummyMessage('API');
$ipaMessage = new DummyMessage('IPA');
$receiver = new CallbackReceiver(function ($handler) use ($apiMessage, $ipaMessage) {
$handler(Envelope::wrap($apiMessage));
$handler(Envelope::wrap($ipaMessage));
});
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('API')));
$bus->expects($this->at(1))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('IPA')));
$bus->expects($this->at(0))->method('dispatch')->with(Envelope::wrap($apiMessage)->with(new ReceivedMessage()));
$bus->expects($this->at(1))->method('dispatch')->with(Envelope::wrap($ipaMessage)->with(new ReceivedMessage()));
$worker = new Worker($receiver, $bus);
$worker->run();
}
public function testWorkerDoesNotWrapMessagesAlreadyWrappedInReceivedMessages()
public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage()
{
$receiver = new CallbackReceiver(function ($handler) {
$handler(new ReceivedMessage(new DummyMessage('API')));
$envelop = Envelope::wrap(new DummyMessage('API'))->with(new ReceivedMessage());
$receiver = new CallbackReceiver(function ($handler) use ($envelop) {
$handler($envelop);
});
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('API')));
$bus->expects($this->at(0))->method('dispatch')->with($envelop);
$worker = new Worker($receiver, $bus);
$worker->run();
@ -54,7 +59,7 @@ class WorkerTest extends TestCase
{
$receiver = new CallbackReceiver(function ($handler) {
try {
$handler(new DummyMessage('Hello'));
$handler(Envelope::wrap(new DummyMessage('Hello')));
$this->assertTrue(false, 'This should not be called because the exception is sent back to the generator.');
} catch (\InvalidArgumentException $e) {

View File

@ -22,13 +22,13 @@ use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
*/
class AmqpReceiver implements ReceiverInterface
{
private $messageDecoder;
private $decoder;
private $connection;
private $shouldStop;
public function __construct(DecoderInterface $messageDecoder, Connection $connection)
public function __construct(DecoderInterface $decoder, Connection $connection)
{
$this->messageDecoder = $messageDecoder;
$this->decoder = $decoder;
$this->connection = $connection;
}
@ -38,8 +38,8 @@ class AmqpReceiver implements ReceiverInterface
public function receive(callable $handler): void
{
while (!$this->shouldStop) {
$message = $this->connection->get();
if (null === $message) {
$AMQPEnvelope = $this->connection->get();
if (null === $AMQPEnvelope) {
$handler(null);
usleep($this->connection->getConnectionCredentials()['loop_sleep'] ?? 200000);
@ -51,18 +51,18 @@ class AmqpReceiver implements ReceiverInterface
}
try {
$handler($this->messageDecoder->decode(array(
'body' => $message->getBody(),
'headers' => $message->getHeaders(),
$handler($this->decoder->decode(array(
'body' => $AMQPEnvelope->getBody(),
'headers' => $AMQPEnvelope->getHeaders(),
)));
$this->connection->ack($message);
$this->connection->ack($AMQPEnvelope);
} catch (RejectMessageExceptionInterface $e) {
$this->connection->reject($message);
$this->connection->reject($AMQPEnvelope);
throw $e;
} catch (\Throwable $e) {
$this->connection->nack($message, AMQP_REQUEUE);
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE);
throw $e;
} finally {

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Transport\AmqpExt;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
@ -21,21 +22,21 @@ use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
*/
class AmqpSender implements SenderInterface
{
private $messageEncoder;
private $encoder;
private $connection;
public function __construct(EncoderInterface $messageEncoder, Connection $connection)
public function __construct(EncoderInterface $encoder, Connection $connection)
{
$this->messageEncoder = $messageEncoder;
$this->encoder = $encoder;
$this->connection = $connection;
}
/**
* {@inheritdoc}
*/
public function send($message)
public function send(Envelope $envelope)
{
$encodedMessage = $this->messageEncoder->encode($message);
$encodedMessage = $this->encoder->encode($envelope);
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers']);
}

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport\Enhancers;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
/**
@ -36,8 +37,8 @@ class StopWhenMemoryUsageIsExceededReceiver implements ReceiverInterface
public function receive(callable $handler): void
{
$this->decoratedReceiver->receive(function ($message) use ($handler) {
$handler($message);
$this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler) {
$handler($envelope);
$memoryResolver = $this->memoryResolver;
if ($memoryResolver() > $this->memoryLimit) {

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport\Enhancers;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
/**
@ -34,10 +35,10 @@ class StopWhenMessageCountIsExceededReceiver implements ReceiverInterface
{
$receivedMessages = 0;
$this->decoratedReceiver->receive(function ($message) use ($handler, &$receivedMessages) {
$handler($message);
$this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler, &$receivedMessages) {
$handler($envelope);
if (null !== $message && ++$receivedMessages >= $this->maximumNumberOfMessages) {
if (null !== $envelope && ++$receivedMessages >= $this->maximumNumberOfMessages) {
$this->stop();
if (null !== $this->logger) {
$this->logger->info('Receiver stopped due to maximum count of {count} exceeded', array('count' => $this->maximumNumberOfMessages));

View File

@ -21,10 +21,10 @@ interface ReceiverInterface
/**
* Receive some messages to the given handler.
*
* The handler will have, as argument, the received message. Note that this message
* can be `null` if the timeout to receive something has expired.
* The handler will have, as argument, the received {@link \Symfony\Component\Messenger\Envelope} containing the message.
* Note that this envelope can be `null` if the timeout to receive something has expired.
*/
public function receive(callable $handler) : void;
public function receive(callable $handler): void;
/**
* Stop receiving some messages.

View File

@ -11,6 +11,8 @@
namespace Symfony\Component\Messenger\Transport;
use Symfony\Component\Messenger\Envelope;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
*
@ -19,9 +21,9 @@ namespace Symfony\Component\Messenger\Transport;
interface SenderInterface
{
/**
* Sends the given message.
* Sends the given envelope.
*
* @param object $message
* @param Envelope $envelope
*/
public function send($message);
public function send(Envelope $envelope);
}

View File

@ -11,6 +11,8 @@
namespace Symfony\Component\Messenger\Transport\Serialization;
use Symfony\Component\Messenger\Envelope;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
*
@ -19,16 +21,16 @@ namespace Symfony\Component\Messenger\Transport\Serialization;
interface DecoderInterface
{
/**
* Decodes the message from an encoded-form.
* Decodes an envelope and its message from an encoded-form.
*
* The `$encodedMessage` parameter is a key-value array that
* describes the message, that will be used by the different transports.
* The `$encodedEnvelope` parameter is a key-value array that
* describes the envelope and its content, that will be used by the different transports.
*
* The most common keys are:
* - `body` (string) - the message body
* - `headers` (string<string>) - a key/value pair of headers
*
* @return object
* @return Envelope
*/
public function decode(array $encodedMessage);
public function decode(array $encodedEnvelope): Envelope;
}

View File

@ -10,6 +10,7 @@
*/
namespace Symfony\Component\Messenger\Transport\Serialization;
use Symfony\Component\Messenger\Envelope;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
@ -19,14 +20,14 @@ namespace Symfony\Component\Messenger\Transport\Serialization;
interface EncoderInterface
{
/**
* Encodes a message to a common format understandable by transports. The encoded array should only
* contain scalar and arrays.
* Encodes an envelope content (message & items) to a common format understandable by transports.
* The encoded array should only contain scalar and arrays.
*
* The most common keys of the encoded array are:
* - `body` (string) - the message body
* - `headers` (string<string>) - a key/value pair of headers
*
* @param object $message The object that is put on the MessageBus by the user
* @param Envelope $envelope The envelop containing the message put on the MessageBus by the user
*/
public function encode($message): array;
public function encode(Envelope $envelope): array;
}

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Transport\Serialization;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Serializer\SerializerInterface;
/**
@ -32,27 +33,48 @@ class Serializer implements DecoderInterface, EncoderInterface
/**
* {@inheritdoc}
*/
public function decode(array $encodedMessage)
public function decode(array $encodedEnvelope): Envelope
{
if (empty($encodedMessage['body']) || empty($encodedMessage['headers'])) {
throw new \InvalidArgumentException('Encoded message should have at least a `body` and some `headers`.');
if (empty($encodedEnvelope['body']) || empty($encodedEnvelope['headers'])) {
throw new \InvalidArgumentException('Encoded envelope should have at least a `body` and some `headers`.');
}
if (empty($encodedMessage['headers']['type'])) {
throw new \InvalidArgumentException('Encoded message does not have a `type` header.');
if (empty($encodedEnvelope['headers']['type'])) {
throw new \InvalidArgumentException('Encoded envelope does not have a `type` header.');
}
return $this->serializer->deserialize($encodedMessage['body'], $encodedMessage['headers']['type'], $this->format, $this->context);
$envelopeItems = isset($encodedEnvelope['headers']['X-Message-Envelope-Items']) ? unserialize($encodedEnvelope['headers']['X-Message-Envelope-Items']) : array();
$context = $this->context;
/** @var SerializerConfiguration|null $serializerConfig */
if ($serializerConfig = $envelopeItems[SerializerConfiguration::class] ?? null) {
$context = $serializerConfig->getContext() + $context;
}
$message = $this->serializer->deserialize($encodedEnvelope['body'], $encodedEnvelope['headers']['type'], $this->format, $context);
return new Envelope($message, $envelopeItems);
}
/**
* {@inheritdoc}
*/
public function encode($message): array
public function encode(Envelope $envelope): array
{
$context = $this->context;
/** @var SerializerConfiguration|null $serializerConfig */
if ($serializerConfig = $envelope->get(SerializerConfiguration::class)) {
$context = $serializerConfig->getContext() + $context;
}
$headers = array('type' => \get_class($envelope->getMessage()));
if ($configurations = $envelope->all()) {
$headers['X-Message-Envelope-Items'] = serialize($configurations);
}
return array(
'body' => $this->serializer->serialize($message, $this->format, $this->context),
'headers' => array('type' => \get_class($message)),
'body' => $this->serializer->serialize($envelope->getMessage(), $this->format, $context),
'headers' => $headers,
);
}
}

View File

@ -0,0 +1,46 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\Serialization;
use Symfony\Component\Messenger\EnvelopeItemInterface;
/**
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*
* @experimental in 4.1
*/
final class SerializerConfiguration implements EnvelopeItemInterface
{
private $context;
public function __construct(array $context)
{
$this->context = $context;
}
public function getContext(): array
{
return $this->context;
}
public function serialize()
{
return serialize(array('context' => $this->context));
}
public function unserialize($serialized)
{
list('context' => $context) = unserialize($serialized, array('allowed_classes' => false));
$this->__construct($context);
}
}

View File

@ -41,16 +41,12 @@ class Worker
});
}
$this->receiver->receive(function ($message) {
if (null === $message) {
$this->receiver->receive(function (?Envelope $envelope) {
if (null === $envelope) {
return;
}
if (!$message instanceof ReceivedMessage) {
$message = new ReceivedMessage($message);
}
$this->bus->dispatch($message);
$this->bus->dispatch($envelope->with(new ReceivedMessage()));
});
}
}