From 749054a1e8c3b2c18fcdd088219f4928020a327e Mon Sep 17 00:00:00 2001 From: Maxime Steinhausser Date: Mon, 16 Apr 2018 13:07:19 +0200 Subject: [PATCH] [Messenger] Support configuring messages when dispatching --- .../Middleware/SendMessageMiddleware.php | 14 ++-- .../Transport/ReceivedMessage.php | 16 ++-- .../Transport/WrapIntoReceivedMessage.php | 9 +- src/Symfony/Component/Messenger/Envelope.php | 80 ++++++++++++++++++ .../Messenger/EnvelopeAwareInterface.php | 23 ++++++ .../Messenger/EnvelopeItemInterface.php | 24 ++++++ .../Component/Messenger/MessageBus.php | 6 ++ .../Messenger/MessageBusInterface.php | 2 +- .../Configuration/ValidationConfiguration.php | 58 +++++++++++++ .../Middleware/LoggingMiddleware.php | 5 -- .../Middleware/ValidationMiddleware.php | 17 +++- .../Middleware/SendMessageMiddlewareTest.php | 28 +++++-- .../SerializerConfigurationTest.php | 30 +++++++ .../DependencyInjection/MessengerPassTest.php | 3 +- .../Messenger/Tests/EnvelopeTest.php | 82 +++++++++++++++++++ .../ValidationConfigurationTest.php | 38 +++++++++ .../Middleware/ValidationMiddlewareTest.php | 26 ++++++ .../AmqpExt/AmqpExtIntegrationTest.php | 26 ++++-- .../Transport/AmqpExt/AmqpReceiverTest.php | 5 +- .../Transport/AmqpExt/AmqpSenderTest.php | 7 +- .../AmqpExt/Fixtures/long_receiver.php | 12 +-- ...pWhenMemoryUsageIsExceededReceiverTest.php | 5 +- ...WhenMessageCountIsExceededReceiverTest.php | 9 +- .../Serialization/SerializerTest.php | 49 +++++++++-- .../Component/Messenger/Tests/WorkerTest.php | 25 +++--- .../Transport/AmqpExt/AmqpReceiver.php | 22 ++--- .../Transport/AmqpExt/AmqpSender.php | 11 +-- .../StopWhenMemoryUsageIsExceededReceiver.php | 5 +- ...StopWhenMessageCountIsExceededReceiver.php | 7 +- .../Messenger/Transport/ReceiverInterface.php | 6 +- .../Messenger/Transport/SenderInterface.php | 8 +- .../Serialization/DecoderInterface.php | 12 +-- .../Serialization/EncoderInterface.php | 9 +- .../Transport/Serialization/Serializer.php | 40 +++++++-- .../Serialization/SerializerConfiguration.php | 46 +++++++++++ src/Symfony/Component/Messenger/Worker.php | 10 +-- 36 files changed, 649 insertions(+), 126 deletions(-) create mode 100644 src/Symfony/Component/Messenger/Envelope.php create mode 100644 src/Symfony/Component/Messenger/EnvelopeAwareInterface.php create mode 100644 src/Symfony/Component/Messenger/EnvelopeItemInterface.php create mode 100644 src/Symfony/Component/Messenger/Middleware/Configuration/ValidationConfiguration.php create mode 100644 src/Symfony/Component/Messenger/Tests/Asynchronous/Transport/Serialization/SerializerConfigurationTest.php create mode 100644 src/Symfony/Component/Messenger/Tests/EnvelopeTest.php create mode 100644 src/Symfony/Component/Messenger/Tests/Middleware/Configuration/ValidationConfigurationTest.php create mode 100644 src/Symfony/Component/Messenger/Transport/Serialization/SerializerConfiguration.php diff --git a/src/Symfony/Component/Messenger/Asynchronous/Middleware/SendMessageMiddleware.php b/src/Symfony/Component/Messenger/Asynchronous/Middleware/SendMessageMiddleware.php index eeba363cc8..9aabe794a1 100644 --- a/src/Symfony/Component/Messenger/Asynchronous/Middleware/SendMessageMiddleware.php +++ b/src/Symfony/Component/Messenger/Asynchronous/Middleware/SendMessageMiddleware.php @@ -13,12 +13,14 @@ namespace Symfony\Component\Messenger\Asynchronous\Middleware; use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocatorInterface; use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\EnvelopeAwareInterface; use Symfony\Component\Messenger\Middleware\MiddlewareInterface; /** * @author Samuel Roze */ -class SendMessageMiddleware implements MiddlewareInterface +class SendMessageMiddleware implements MiddlewareInterface, EnvelopeAwareInterface { private $senderLocator; @@ -32,17 +34,19 @@ class SendMessageMiddleware implements MiddlewareInterface */ public function handle($message, callable $next) { - if ($message instanceof ReceivedMessage) { - return $next($message->getMessage()); + $envelope = Envelope::wrap($message); + if ($envelope->get(ReceivedMessage::class)) { + // It's a received message. Do not send it back: + return $next($message); } - if (!empty($senders = $this->senderLocator->getSendersForMessage($message))) { + if (!empty($senders = $this->senderLocator->getSendersForMessage($envelope->getMessage()))) { foreach ($senders as $sender) { if (null === $sender) { continue; } - $sender->send($message); + $sender->send($envelope); } if (!\in_array(null, $senders, true)) { diff --git a/src/Symfony/Component/Messenger/Asynchronous/Transport/ReceivedMessage.php b/src/Symfony/Component/Messenger/Asynchronous/Transport/ReceivedMessage.php index 1b1298da63..c713a589ad 100644 --- a/src/Symfony/Component/Messenger/Asynchronous/Transport/ReceivedMessage.php +++ b/src/Symfony/Component/Messenger/Asynchronous/Transport/ReceivedMessage.php @@ -12,26 +12,26 @@ namespace Symfony\Component\Messenger\Asynchronous\Transport; use Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware; +use Symfony\Component\Messenger\EnvelopeItemInterface; /** - * Wraps a received message. This is mainly used by the `SendMessageMiddleware` middleware to identify + * Marker config for a received message. + * This is mainly used by the `SendMessageMiddleware` middleware to identify * a message should not be sent if it was just received. * * @see SendMessageMiddleware * * @author Samuel Roze */ -final class ReceivedMessage +final class ReceivedMessage implements EnvelopeItemInterface { - private $message; - - public function __construct($message) + public function serialize() { - $this->message = $message; + return ''; } - public function getMessage() + public function unserialize($serialized) { - return $this->message; + // noop } } diff --git a/src/Symfony/Component/Messenger/Asynchronous/Transport/WrapIntoReceivedMessage.php b/src/Symfony/Component/Messenger/Asynchronous/Transport/WrapIntoReceivedMessage.php index 8af87a2a45..4b10d6445f 100644 --- a/src/Symfony/Component/Messenger/Asynchronous/Transport/WrapIntoReceivedMessage.php +++ b/src/Symfony/Component/Messenger/Asynchronous/Transport/WrapIntoReceivedMessage.php @@ -11,6 +11,7 @@ namespace Symfony\Component\Messenger\Asynchronous\Transport; +use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Transport\ReceiverInterface; /** @@ -27,12 +28,12 @@ class WrapIntoReceivedMessage implements ReceiverInterface public function receive(callable $handler): void { - $this->decoratedReceiver->receive(function ($message) use ($handler) { - if (null !== $message) { - $message = new ReceivedMessage($message); + $this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler) { + if (null !== $envelope) { + $envelope = $envelope->with(new ReceivedMessage()); } - $handler($message); + $handler($envelope); }); } diff --git a/src/Symfony/Component/Messenger/Envelope.php b/src/Symfony/Component/Messenger/Envelope.php new file mode 100644 index 0000000000..14778593c4 --- /dev/null +++ b/src/Symfony/Component/Messenger/Envelope.php @@ -0,0 +1,80 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger; + +/** + * A message wrapped in an envelope with items (configurations, markers, ...). + * + * @author Maxime Steinhausser + * + * @experimental in 4.1 + */ +final class Envelope +{ + private $items = array(); + private $message; + + /** + * @param object $message + * @param EnvelopeItemInterface[] $items + */ + public function __construct($message, array $items = array()) + { + $this->message = $message; + foreach ($items as $item) { + $this->items[\get_class($item)] = $item; + } + } + + /** + * Wrap a message into an envelope if not already wrapped. + * + * @param Envelope|object $message + */ + public static function wrap($message): self + { + return $message instanceof self ? $message : new self($message); + } + + /** + * @return Envelope a new Envelope instance with additional item + */ + public function with(EnvelopeItemInterface $item): self + { + $cloned = clone $this; + + $cloned->items[\get_class($item)] = $item; + + return $cloned; + } + + public function get(string $itemFqcn): ?EnvelopeItemInterface + { + return $this->items[$itemFqcn] ?? null; + } + + /** + * @return EnvelopeItemInterface[] indexed by fqcn + */ + public function all(): array + { + return $this->items; + } + + /** + * @return object The original message contained in the envelope + */ + public function getMessage() + { + return $this->message; + } +} diff --git a/src/Symfony/Component/Messenger/EnvelopeAwareInterface.php b/src/Symfony/Component/Messenger/EnvelopeAwareInterface.php new file mode 100644 index 0000000000..c19bc84362 --- /dev/null +++ b/src/Symfony/Component/Messenger/EnvelopeAwareInterface.php @@ -0,0 +1,23 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger; + +/** + * A Messenger protagonist aware of the message envelope and its content. + * + * @author Maxime Steinhausser + * + * @experimental in 4.1 + */ +interface EnvelopeAwareInterface +{ +} diff --git a/src/Symfony/Component/Messenger/EnvelopeItemInterface.php b/src/Symfony/Component/Messenger/EnvelopeItemInterface.php new file mode 100644 index 0000000000..2561a12754 --- /dev/null +++ b/src/Symfony/Component/Messenger/EnvelopeItemInterface.php @@ -0,0 +1,24 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger; + +/** + * An envelope item related to a message. + * This item must be serializable for transport. + * + * @author Maxime Steinhausser + * + * @experimental in 4.1 + */ +interface EnvelopeItemInterface extends \Serializable +{ +} diff --git a/src/Symfony/Component/Messenger/MessageBus.php b/src/Symfony/Component/Messenger/MessageBus.php index ef0eb5b1c4..08d7486737 100644 --- a/src/Symfony/Component/Messenger/MessageBus.php +++ b/src/Symfony/Component/Messenger/MessageBus.php @@ -55,6 +55,12 @@ class MessageBus implements MessageBusInterface $middleware = $this->indexedMiddlewares[$index]; return function ($message) use ($middleware, $index) { + $message = Envelope::wrap($message); + if (!$middleware instanceof EnvelopeAwareInterface) { + // Do not provide the envelope if the middleware cannot read it: + $message = $message->getMessage(); + } + return $middleware->handle($message, $this->callableForNextMiddleware($index + 1)); }; } diff --git a/src/Symfony/Component/Messenger/MessageBusInterface.php b/src/Symfony/Component/Messenger/MessageBusInterface.php index 1d441ea568..8c2b91d1af 100644 --- a/src/Symfony/Component/Messenger/MessageBusInterface.php +++ b/src/Symfony/Component/Messenger/MessageBusInterface.php @@ -23,7 +23,7 @@ interface MessageBusInterface * * The bus can return a value coming from handlers, but is not required to do so. * - * @param object $message + * @param object|Envelope $message The message or the message pre-wrapped in an envelope * * @return mixed */ diff --git a/src/Symfony/Component/Messenger/Middleware/Configuration/ValidationConfiguration.php b/src/Symfony/Component/Messenger/Middleware/Configuration/ValidationConfiguration.php new file mode 100644 index 0000000000..1b6180857b --- /dev/null +++ b/src/Symfony/Component/Messenger/Middleware/Configuration/ValidationConfiguration.php @@ -0,0 +1,58 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Middleware\Configuration; + +use Symfony\Component\Messenger\EnvelopeItemInterface; +use Symfony\Component\Validator\Constraints\GroupSequence; + +/** + * @author Maxime Steinhausser + * + * @experimental in 4.1 + */ +final class ValidationConfiguration implements EnvelopeItemInterface +{ + private $groups; + + /** + * @param string[]|GroupSequence $groups + */ + public function __construct($groups) + { + $this->groups = $groups; + } + + public function getGroups() + { + return $this->groups; + } + + public function serialize() + { + $isGroupSequence = $this->groups instanceof GroupSequence; + + return serialize(array( + 'groups' => $isGroupSequence ? $this->groups->groups : $this->groups, + 'is_group_sequence' => $isGroupSequence, + )); + } + + public function unserialize($serialized) + { + list( + 'groups' => $groups, + 'is_group_sequence' => $isGroupSequence + ) = unserialize($serialized, array('allowed_classes' => false)); + + $this->__construct($isGroupSequence ? new GroupSequence($groups) : $groups); + } +} diff --git a/src/Symfony/Component/Messenger/Middleware/LoggingMiddleware.php b/src/Symfony/Component/Messenger/Middleware/LoggingMiddleware.php index 4de6e42575..ebaf8525c0 100644 --- a/src/Symfony/Component/Messenger/Middleware/LoggingMiddleware.php +++ b/src/Symfony/Component/Messenger/Middleware/LoggingMiddleware.php @@ -11,7 +11,6 @@ namespace Symfony\Component\Messenger\Middleware; -use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage; use Psr\Log\LoggerInterface; /** @@ -51,10 +50,6 @@ class LoggingMiddleware implements MiddlewareInterface private function createContext($message): array { - if ($message instanceof ReceivedMessage) { - $message = $message->getMessage(); - } - return array( 'message' => $message, 'class' => \get_class($message), diff --git a/src/Symfony/Component/Messenger/Middleware/ValidationMiddleware.php b/src/Symfony/Component/Messenger/Middleware/ValidationMiddleware.php index 3b168367cd..e588d9256b 100644 --- a/src/Symfony/Component/Messenger/Middleware/ValidationMiddleware.php +++ b/src/Symfony/Component/Messenger/Middleware/ValidationMiddleware.php @@ -11,13 +11,16 @@ namespace Symfony\Component\Messenger\Middleware; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\EnvelopeAwareInterface; use Symfony\Component\Messenger\Exception\ValidationFailedException; +use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration; use Symfony\Component\Validator\Validator\ValidatorInterface; /** * @author Tobias Nyholm */ -class ValidationMiddleware implements MiddlewareInterface +class ValidationMiddleware implements MiddlewareInterface, EnvelopeAwareInterface { private $validator; @@ -28,9 +31,17 @@ class ValidationMiddleware implements MiddlewareInterface public function handle($message, callable $next) { - $violations = $this->validator->validate($message); + $envelope = Envelope::wrap($message); + $subject = $envelope->getMessage(); + $groups = null; + /** @var ValidationConfiguration|null $validationConfig */ + if ($validationConfig = $envelope->get(ValidationConfiguration::class)) { + $groups = $validationConfig->getGroups(); + } + + $violations = $this->validator->validate($subject, null, $groups); if (\count($violations)) { - throw new ValidationFailedException($message, $violations); + throw new ValidationFailedException($subject, $violations); } return $next($message); diff --git a/src/Symfony/Component/Messenger/Tests/Asynchronous/Middleware/SendMessageMiddlewareTest.php b/src/Symfony/Component/Messenger/Tests/Asynchronous/Middleware/SendMessageMiddlewareTest.php index 6398aff361..aa22741c39 100644 --- a/src/Symfony/Component/Messenger/Tests/Asynchronous/Middleware/SendMessageMiddlewareTest.php +++ b/src/Symfony/Component/Messenger/Tests/Asynchronous/Middleware/SendMessageMiddlewareTest.php @@ -15,6 +15,7 @@ use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware; use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocatorInterface; use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage; +use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Transport\SenderInterface; @@ -30,12 +31,28 @@ class SendMessageMiddlewareTest extends TestCase $sender, ))); - $sender->expects($this->once())->method('send')->with($message); + $sender->expects($this->once())->method('send')->with(Envelope::wrap($message)); $next->expects($this->never())->method($this->anything()); $middleware->handle($message, $next); } + public function testItSendsTheMessageToAssignedSenderWithPreWrappedMessage() + { + $envelope = Envelope::wrap(new DummyMessage('Hey')); + $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); + $next = $this->createPartialMock(\stdClass::class, array('__invoke')); + + $middleware = new SendMessageMiddleware(new InMemorySenderLocator(array( + $sender, + ))); + + $sender->expects($this->once())->method('send')->with($envelope); + $next->expects($this->never())->method($this->anything()); + + $middleware->handle($envelope, $next); + } + public function testItAlsoCallsTheNextMiddlewareIfASenderIsNull() { $message = new DummyMessage('Hey'); @@ -47,7 +64,7 @@ class SendMessageMiddlewareTest extends TestCase null, ))); - $sender->expects($this->once())->method('send')->with($message); + $sender->expects($this->once())->method('send')->with(Envelope::wrap($message)); $next->expects($this->once())->method($this->anything()); $middleware->handle($message, $next); @@ -67,8 +84,7 @@ class SendMessageMiddlewareTest extends TestCase public function testItSkipsReceivedMessages() { - $innerMessage = new DummyMessage('Hey'); - $message = new ReceivedMessage($innerMessage); + $envelope = Envelope::wrap(new DummyMessage('Hey'))->with(new ReceivedMessage()); $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); $next = $this->createPartialMock(\stdClass::class, array('__invoke')); @@ -78,9 +94,9 @@ class SendMessageMiddlewareTest extends TestCase ))); $sender->expects($this->never())->method('send'); - $next->expects($this->once())->method('__invoke')->with($innerMessage); + $next->expects($this->once())->method('__invoke')->with($envelope); - $middleware->handle($message, $next); + $middleware->handle($envelope, $next); } } diff --git a/src/Symfony/Component/Messenger/Tests/Asynchronous/Transport/Serialization/SerializerConfigurationTest.php b/src/Symfony/Component/Messenger/Tests/Asynchronous/Transport/Serialization/SerializerConfigurationTest.php new file mode 100644 index 0000000000..6ebcc8ddfa --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Asynchronous/Transport/Serialization/SerializerConfigurationTest.php @@ -0,0 +1,30 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Asynchronous\Serialization; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Transport\Serialization\SerializerConfiguration; +use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; + +/** + * @author Maxime Steinhausser + */ +class SerializerConfigurationTest extends TestCase +{ + public function testSerialiazable() + { + $config = new SerializerConfiguration(array(ObjectNormalizer::GROUPS => array('Default', 'Extra'))); + + $this->assertTrue(is_subclass_of(SerializerConfiguration::class, \Serializable::class, true)); + $this->assertEquals($config, unserialize(serialize($config))); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php index 4f2636ab10..bba1020b28 100644 --- a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php +++ b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php @@ -16,6 +16,7 @@ use Symfony\Component\DependencyInjection\Argument\ServiceClosureArgument; use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\DependencyInjection\Reference; use Symfony\Component\DependencyInjection\ServiceLocator; +use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver; use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender; use Symfony\Component\Messenger\Handler\Locator\ContainerHandlerLocator; @@ -324,7 +325,7 @@ class DummyReceiver implements ReceiverInterface public function receive(callable $handler): void { for ($i = 0; $i < 3; ++$i) { - $handler(new DummyMessage("Dummy $i")); + $handler(Envelope::wrap(new DummyMessage("Dummy $i"))); } } diff --git a/src/Symfony/Component/Messenger/Tests/EnvelopeTest.php b/src/Symfony/Component/Messenger/Tests/EnvelopeTest.php new file mode 100644 index 0000000000..053275b6e7 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/EnvelopeTest.php @@ -0,0 +1,82 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\EnvelopeAwareInterface; +use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; + +/** + * @author Maxime Steinhausser + */ +class EnvelopeTest extends TestCase +{ + public function testConstruct() + { + $envelope = new Envelope($dummy = new DummyMessage('dummy'), array( + $receivedConfig = new ReceivedMessage(), + )); + + $this->assertSame($dummy, $envelope->getMessage()); + $this->assertArrayHasKey(ReceivedMessage::class, $configs = $envelope->all()); + $this->assertSame($receivedConfig, $configs[ReceivedMessage::class]); + } + + public function testWrap() + { + $first = Envelope::wrap($dummy = new DummyMessage('dummy')); + + $this->assertInstanceOf(Envelope::class, $first); + $this->assertSame($dummy, $first->getMessage()); + + $envelope = Envelope::wrap($first); + $this->assertSame($first, $envelope); + } + + public function testWithReturnsNewInstance() + { + $envelope = Envelope::wrap($dummy = new DummyMessage('dummy')); + + $this->assertNotSame($envelope, $envelope->with(new ReceivedMessage())); + } + + public function testGet() + { + $envelope = Envelope::wrap($dummy = new DummyMessage('dummy')) + ->with($config = new ReceivedMessage()) + ; + + $this->assertSame($config, $envelope->get(ReceivedMessage::class)); + $this->assertNull($envelope->get(ValidationConfiguration::class)); + } + + public function testAll() + { + $envelope = Envelope::wrap($dummy = new DummyMessage('dummy')) + ->with($receivedConfig = new ReceivedMessage()) + ->with($validationConfig = new ValidationConfiguration(array('foo'))) + ; + + $configs = $envelope->all(); + $this->assertArrayHasKey(ReceivedMessage::class, $configs); + $this->assertSame($receivedConfig, $configs[ReceivedMessage::class]); + $this->assertArrayHasKey(ValidationConfiguration::class, $configs); + $this->assertSame($validationConfig, $configs[ValidationConfiguration::class]); + } +} + +class FooConfigurationConsumer implements EnvelopeAwareInterface +{ +} diff --git a/src/Symfony/Component/Messenger/Tests/Middleware/Configuration/ValidationConfigurationTest.php b/src/Symfony/Component/Messenger/Tests/Middleware/Configuration/ValidationConfigurationTest.php new file mode 100644 index 0000000000..6fd6962994 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Middleware/Configuration/ValidationConfigurationTest.php @@ -0,0 +1,38 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Middleware\Configuration; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration; +use Symfony\Component\Validator\Constraints\GroupSequence; + +/** + * @author Maxime Steinhausser + */ +class ValidationConfigurationTest extends TestCase +{ + public function testConfig() + { + $config = new ValidationConfiguration($groups = array('Default', 'Extra')); + $this->assertSame($groups, $config->getGroups()); + + $config = new ValidationConfiguration($groups = new GroupSequence(array('Default', 'Then'))); + $this->assertSame($groups, $config->getGroups()); + } + + public function testSerialiazable() + { + $this->assertTrue(is_subclass_of(ValidationConfiguration::class, \Serializable::class, true)); + $this->assertEquals($config = new ValidationConfiguration(array('Default', 'Extra')), unserialize(serialize($config))); + $this->assertEquals($config = new ValidationConfiguration(new GroupSequence(array('Default', 'Then'))), unserialize(serialize($config))); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Middleware/ValidationMiddlewareTest.php b/src/Symfony/Component/Messenger/Tests/Middleware/ValidationMiddlewareTest.php index 2bd2ba8af2..b2a44b5fa6 100644 --- a/src/Symfony/Component/Messenger/Tests/Middleware/ValidationMiddlewareTest.php +++ b/src/Symfony/Component/Messenger/Tests/Middleware/ValidationMiddlewareTest.php @@ -12,6 +12,8 @@ namespace Symfony\Component\Messenger\Tests\Middleware; use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration; use Symfony\Component\Messenger\Middleware\ValidationMiddleware; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Validator\ConstraintViolationListInterface; @@ -43,6 +45,30 @@ class ValidationMiddlewareTest extends TestCase $this->assertSame('Hello', $result); } + public function testValidateWithConfigurationAndNextMiddleware() + { + $envelope = Envelope::wrap($message = new DummyMessage('Hey'))->with(new ValidationConfiguration($groups = array('Default', 'Extra'))); + + $validator = $this->createMock(ValidatorInterface::class); + $validator + ->expects($this->once()) + ->method('validate') + ->with($message, null, $groups) + ->willReturn($this->createMock(ConstraintViolationListInterface::class)) + ; + $next = $this->createPartialMock(\stdClass::class, array('__invoke')); + $next + ->expects($this->once()) + ->method('__invoke') + ->with($envelope) + ->willReturn('Hello') + ; + + $result = (new ValidationMiddleware($validator))->handle($envelope, $next); + + $this->assertSame('Hello', $result); + } + /** * @expectedException \Symfony\Component\Messenger\Exception\ValidationFailedException * @expectedExceptionMessage Message of type "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage" failed validation. diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php index 5ce9c457e9..c01ffdadb7 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt; use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver; use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender; use Symfony\Component\Messenger\Transport\AmqpExt\Connection; @@ -50,12 +51,12 @@ class AmqpExtIntegrationTest extends TestCase $sender = new AmqpSender($serializer, $connection); $receiver = new AmqpReceiver($serializer, $connection); - $sender->send($firstMessage = new DummyMessage('First')); - $sender->send($secondMessage = new DummyMessage('Second')); + $sender->send($first = Envelope::wrap(new DummyMessage('First'))); + $sender->send($second = Envelope::wrap(new DummyMessage('Second'))); $receivedMessages = 0; - $receiver->receive(function ($message) use ($receiver, &$receivedMessages, $firstMessage, $secondMessage) { - $this->assertEquals(0 == $receivedMessages ? $firstMessage : $secondMessage, $message); + $receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) { + $this->assertEquals(0 == $receivedMessages ? $first : $second, $envelope); if (2 === ++$receivedMessages) { $receiver->stop(); @@ -74,7 +75,7 @@ class AmqpExtIntegrationTest extends TestCase $connection->queue()->purge(); $sender = new AmqpSender($serializer, $connection); - $sender->send(new DummyMessage('Hello')); + $sender->send(Envelope::wrap(new DummyMessage('Hello'))); $amqpReadTimeout = 30; $dsn = getenv('MESSENGER_AMQP_DSN').'?read_timeout='.$amqpReadTimeout; @@ -98,7 +99,15 @@ class AmqpExtIntegrationTest extends TestCase $this->assertFalse($process->isRunning()); $this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime); - $this->assertEquals($expectedOutput."Get message: Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage\nDone.\n", $process->getOutput()); + $this->assertSame($expectedOutput.<<<'TXT' +Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage +with items: [ + "Symfony\\Component\\Messenger\\Asynchronous\\Transport\\ReceivedMessage" +] +Done. + +TXT + , $process->getOutput()); } /** @@ -114,12 +123,11 @@ class AmqpExtIntegrationTest extends TestCase $connection->setup(); $connection->queue()->purge(); - $sender = new AmqpSender($serializer, $connection); $receiver = new AmqpReceiver($serializer, $connection); $receivedMessages = 0; - $receiver->receive(function ($message) use ($receiver, &$receivedMessages) { - $this->assertNull($message); + $receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) { + $this->assertNull($envelope); if (2 === ++$receivedMessages) { $receiver->stop(); diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php index 2045764216..4d0b779e3b 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt; use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver; use Symfony\Component\Messenger\Transport\AmqpExt\Connection; use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface; @@ -44,8 +45,8 @@ class AmqpReceiverTest extends TestCase $connection->expects($this->once())->method('ack')->with($envelope); $receiver = new AmqpReceiver($serializer, $connection); - $receiver->receive(function ($message) use ($receiver) { - $this->assertEquals(new DummyMessage('Hi'), $message); + $receiver->receive(function (?Envelope $envelope) use ($receiver) { + $this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage()); $receiver->stop(); }); } diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php index 54859a58ac..8848b022f6 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php @@ -11,6 +11,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt; +use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender; use Symfony\Component\Messenger\Transport\AmqpExt\Connection; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; @@ -24,16 +25,16 @@ class AmqpSenderTest extends TestCase { public function testItSendsTheEncodedMessage() { - $message = new DummyMessage('Oy'); + $envelope = Envelope::wrap(new DummyMessage('Oy')); $encoded = array('body' => '...', 'headers' => array('type' => DummyMessage::class)); $encoder = $this->getMockBuilder(EncoderInterface::class)->getMock(); - $encoder->method('encode')->with($message)->willReturnOnConsecutiveCalls($encoded); + $encoder->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); $connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers']); $sender = new AmqpSender($encoder, $connection); - $sender->send($message); + $sender->send($envelope); } } diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php index 2115b1a81d..b7231a5d47 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php @@ -12,10 +12,9 @@ if (!file_exists($autoload)) { require_once $autoload; -use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver; -use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender; -use Symfony\Component\Messenger\Transport\AmqpExt\Connection; use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver; +use Symfony\Component\Messenger\Transport\AmqpExt\Connection; use Symfony\Component\Messenger\Transport\Serialization\Serializer; use Symfony\Component\Messenger\Worker; use Symfony\Component\Serializer as SerializerComponent; @@ -27,13 +26,14 @@ $serializer = new Serializer( ); $connection = Connection::fromDsn(getenv('DSN')); -$sender = new AmqpSender($serializer, $connection); $receiver = new AmqpReceiver($serializer, $connection); $worker = new Worker($receiver, new class() implements MessageBusInterface { - public function dispatch($message) + public function dispatch($envelope) { - echo 'Get message: '.get_class($message)."\n"; + echo 'Get envelope with message: '.get_class($envelope->getMessage())."\n"; + echo sprintf("with items: %s\n", json_encode(array_keys($envelope->all()), JSON_PRETTY_PRINT)); + sleep(30); echo "Done.\n"; } diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Enhancers/StopWhenMemoryUsageIsExceededReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Enhancers/StopWhenMemoryUsageIsExceededReceiverTest.php index f317c6a5bf..a34be3bfc2 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/Enhancers/StopWhenMemoryUsageIsExceededReceiverTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/Enhancers/StopWhenMemoryUsageIsExceededReceiverTest.php @@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\Enhancers; use PHPUnit\Framework\TestCase; use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver; @@ -25,7 +26,7 @@ class StopWhenMemoryUsageIsExceededReceiverTest extends TestCase public function testReceiverStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop) { $callable = function ($handler) { - $handler(new DummyMessage('API')); + $handler(Envelope::wrap(new DummyMessage('API'))); }; $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class) @@ -58,7 +59,7 @@ class StopWhenMemoryUsageIsExceededReceiverTest extends TestCase public function testReceiverLogsMemoryExceededWhenLoggerIsGiven() { $callable = function ($handler) { - $handler(new DummyMessage('API')); + $handler(Envelope::wrap(new DummyMessage('API'))); }; $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class) diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Enhancers/StopWhenMessageCountIsExceededReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Enhancers/StopWhenMessageCountIsExceededReceiverTest.php index 7a516744e1..e5c51335b3 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/Enhancers/StopWhenMessageCountIsExceededReceiverTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/Enhancers/StopWhenMessageCountIsExceededReceiverTest.php @@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\Enhancers; use PHPUnit\Framework\TestCase; use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver; @@ -25,9 +26,9 @@ class StopWhenMessageCountIsExceededReceiverTest extends TestCase public function testReceiverStopsWhenMaximumCountExceeded($max, $shouldStop) { $callable = function ($handler) { - $handler(new DummyMessage('First message')); - $handler(new DummyMessage('Second message')); - $handler(new DummyMessage('Third message')); + $handler(Envelope::wrap(new DummyMessage('First message'))); + $handler(Envelope::wrap(new DummyMessage('Second message'))); + $handler(Envelope::wrap(new DummyMessage('Third message'))); }; $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class) @@ -78,7 +79,7 @@ class StopWhenMessageCountIsExceededReceiverTest extends TestCase public function testReceiverLogsMaximumCountExceededWhenLoggerIsGiven() { $callable = function ($handler) { - $handler(new DummyMessage('First message')); + $handler(Envelope::wrap(new DummyMessage('First message'))); }; $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class) diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Serialization/SerializerTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Serialization/SerializerTest.php index 2e227c0f2f..214b773092 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/Serialization/SerializerTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/Serialization/SerializerTest.php @@ -12,8 +12,11 @@ namespace Symfony\Component\Messenger\Tests\Transport\Serialization; use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Transport\Serialization\Serializer; +use Symfony\Component\Messenger\Transport\Serialization\SerializerConfiguration; use Symfony\Component\Serializer as SerializerComponent; use Symfony\Component\Serializer\Encoder\JsonEncoder; use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; @@ -26,9 +29,23 @@ class SerializerTest extends TestCase new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) ); - $message = new DummyMessage('Hello'); + $envelope = Envelope::wrap(new DummyMessage('Hello')); - $this->assertEquals($message, $serializer->decode($serializer->encode($message))); + $this->assertEquals($envelope, $serializer->decode($serializer->encode($envelope))); + } + + public function testEncodedWithConfigurationIsDecodable() + { + $serializer = new Serializer( + new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) + ); + + $envelope = Envelope::wrap(new DummyMessage('Hello')) + ->with(new SerializerConfiguration(array(ObjectNormalizer::GROUPS => array('foo')))) + ->with(new ValidationConfiguration(array('foo', 'bar'))) + ; + + $this->assertEquals($envelope, $serializer->decode($serializer->encode($envelope))); } public function testEncodedIsHavingTheBodyAndTypeHeader() @@ -37,11 +54,12 @@ class SerializerTest extends TestCase new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) ); - $encoded = $serializer->encode(new DummyMessage('Hello')); + $encoded = $serializer->encode(Envelope::wrap(new DummyMessage('Hello'))); $this->assertArrayHasKey('body', $encoded); $this->assertArrayHasKey('headers', $encoded); $this->assertArrayHasKey('type', $encoded['headers']); + $this->assertArrayNotHasKey('X-Message-Envelope-Items', $encoded['headers']); $this->assertEquals(DummyMessage::class, $encoded['headers']['type']); } @@ -55,10 +73,31 @@ class SerializerTest extends TestCase $encoder = new Serializer($serializer, 'csv', array('foo' => 'bar')); - $encoded = $encoder->encode($message); + $encoded = $encoder->encode(Envelope::wrap($message)); $decoded = $encoder->decode($encoded); $this->assertSame('Yay', $encoded['body']); - $this->assertSame($message, $decoded); + $this->assertSame($message, $decoded->getMessage()); + } + + public function testEncodedWithSerializationConfiguration() + { + $serializer = new Serializer( + new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) + ); + + $envelope = Envelope::wrap(new DummyMessage('Hello')) + ->with(new SerializerConfiguration(array(ObjectNormalizer::GROUPS => array('foo')))) + ->with(new ValidationConfiguration(array('foo', 'bar'))) + ; + + $encoded = $serializer->encode($envelope); + + $this->assertArrayHasKey('body', $encoded); + $this->assertArrayHasKey('headers', $encoded); + $this->assertArrayHasKey('type', $encoded['headers']); + $this->assertEquals(DummyMessage::class, $encoded['headers']['type']); + $this->assertArrayHasKey('X-Message-Envelope-Items', $encoded['headers']); + $this->assertSame('a:2:{s:75:"Symfony\Component\Messenger\Transport\Serialization\SerializerConfiguration";C:75:"Symfony\Component\Messenger\Transport\Serialization\SerializerConfiguration":59:{a:1:{s:7:"context";a:1:{s:6:"groups";a:1:{i:0;s:3:"foo";}}}}s:76:"Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration";C:76:"Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration":82:{a:2:{s:6:"groups";a:2:{i:0;s:3:"foo";i:1;s:3:"bar";}s:17:"is_group_sequence";b:0;}}}', $encoded['headers']['X-Message-Envelope-Items']); } } diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index 7599d8dadc..bdfa6fe188 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests; use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage; +use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; @@ -22,29 +23,33 @@ class WorkerTest extends TestCase { public function testWorkerDispatchTheReceivedMessage() { - $receiver = new CallbackReceiver(function ($handler) { - $handler(new DummyMessage('API')); - $handler(new DummyMessage('IPA')); + $apiMessage = new DummyMessage('API'); + $ipaMessage = new DummyMessage('IPA'); + + $receiver = new CallbackReceiver(function ($handler) use ($apiMessage, $ipaMessage) { + $handler(Envelope::wrap($apiMessage)); + $handler(Envelope::wrap($ipaMessage)); }); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); - $bus->expects($this->at(0))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('API'))); - $bus->expects($this->at(1))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('IPA'))); + $bus->expects($this->at(0))->method('dispatch')->with(Envelope::wrap($apiMessage)->with(new ReceivedMessage())); + $bus->expects($this->at(1))->method('dispatch')->with(Envelope::wrap($ipaMessage)->with(new ReceivedMessage())); $worker = new Worker($receiver, $bus); $worker->run(); } - public function testWorkerDoesNotWrapMessagesAlreadyWrappedInReceivedMessages() + public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage() { - $receiver = new CallbackReceiver(function ($handler) { - $handler(new ReceivedMessage(new DummyMessage('API'))); + $envelop = Envelope::wrap(new DummyMessage('API'))->with(new ReceivedMessage()); + $receiver = new CallbackReceiver(function ($handler) use ($envelop) { + $handler($envelop); }); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); - $bus->expects($this->at(0))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('API'))); + $bus->expects($this->at(0))->method('dispatch')->with($envelop); $worker = new Worker($receiver, $bus); $worker->run(); @@ -54,7 +59,7 @@ class WorkerTest extends TestCase { $receiver = new CallbackReceiver(function ($handler) { try { - $handler(new DummyMessage('Hello')); + $handler(Envelope::wrap(new DummyMessage('Hello'))); $this->assertTrue(false, 'This should not be called because the exception is sent back to the generator.'); } catch (\InvalidArgumentException $e) { diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php index e61ef03895..0e6fbff8ee 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php @@ -22,13 +22,13 @@ use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface; */ class AmqpReceiver implements ReceiverInterface { - private $messageDecoder; + private $decoder; private $connection; private $shouldStop; - public function __construct(DecoderInterface $messageDecoder, Connection $connection) + public function __construct(DecoderInterface $decoder, Connection $connection) { - $this->messageDecoder = $messageDecoder; + $this->decoder = $decoder; $this->connection = $connection; } @@ -38,8 +38,8 @@ class AmqpReceiver implements ReceiverInterface public function receive(callable $handler): void { while (!$this->shouldStop) { - $message = $this->connection->get(); - if (null === $message) { + $AMQPEnvelope = $this->connection->get(); + if (null === $AMQPEnvelope) { $handler(null); usleep($this->connection->getConnectionCredentials()['loop_sleep'] ?? 200000); @@ -51,18 +51,18 @@ class AmqpReceiver implements ReceiverInterface } try { - $handler($this->messageDecoder->decode(array( - 'body' => $message->getBody(), - 'headers' => $message->getHeaders(), + $handler($this->decoder->decode(array( + 'body' => $AMQPEnvelope->getBody(), + 'headers' => $AMQPEnvelope->getHeaders(), ))); - $this->connection->ack($message); + $this->connection->ack($AMQPEnvelope); } catch (RejectMessageExceptionInterface $e) { - $this->connection->reject($message); + $this->connection->reject($AMQPEnvelope); throw $e; } catch (\Throwable $e) { - $this->connection->nack($message, AMQP_REQUEUE); + $this->connection->nack($AMQPEnvelope, AMQP_REQUEUE); throw $e; } finally { diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php index 0c4bb18f31..397c52dec2 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php @@ -11,6 +11,7 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt; +use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Transport\SenderInterface; use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface; @@ -21,21 +22,21 @@ use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface; */ class AmqpSender implements SenderInterface { - private $messageEncoder; + private $encoder; private $connection; - public function __construct(EncoderInterface $messageEncoder, Connection $connection) + public function __construct(EncoderInterface $encoder, Connection $connection) { - $this->messageEncoder = $messageEncoder; + $this->encoder = $encoder; $this->connection = $connection; } /** * {@inheritdoc} */ - public function send($message) + public function send(Envelope $envelope) { - $encodedMessage = $this->messageEncoder->encode($message); + $encodedMessage = $this->encoder->encode($envelope); $this->connection->publish($encodedMessage['body'], $encodedMessage['headers']); } diff --git a/src/Symfony/Component/Messenger/Transport/Enhancers/StopWhenMemoryUsageIsExceededReceiver.php b/src/Symfony/Component/Messenger/Transport/Enhancers/StopWhenMemoryUsageIsExceededReceiver.php index 4a05afe770..37b1978233 100644 --- a/src/Symfony/Component/Messenger/Transport/Enhancers/StopWhenMemoryUsageIsExceededReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/Enhancers/StopWhenMemoryUsageIsExceededReceiver.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Messenger\Transport\Enhancers; use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Transport\ReceiverInterface; /** @@ -36,8 +37,8 @@ class StopWhenMemoryUsageIsExceededReceiver implements ReceiverInterface public function receive(callable $handler): void { - $this->decoratedReceiver->receive(function ($message) use ($handler) { - $handler($message); + $this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler) { + $handler($envelope); $memoryResolver = $this->memoryResolver; if ($memoryResolver() > $this->memoryLimit) { diff --git a/src/Symfony/Component/Messenger/Transport/Enhancers/StopWhenMessageCountIsExceededReceiver.php b/src/Symfony/Component/Messenger/Transport/Enhancers/StopWhenMessageCountIsExceededReceiver.php index dc61466bbf..420eb8fa63 100644 --- a/src/Symfony/Component/Messenger/Transport/Enhancers/StopWhenMessageCountIsExceededReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/Enhancers/StopWhenMessageCountIsExceededReceiver.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Messenger\Transport\Enhancers; use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Transport\ReceiverInterface; /** @@ -34,10 +35,10 @@ class StopWhenMessageCountIsExceededReceiver implements ReceiverInterface { $receivedMessages = 0; - $this->decoratedReceiver->receive(function ($message) use ($handler, &$receivedMessages) { - $handler($message); + $this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler, &$receivedMessages) { + $handler($envelope); - if (null !== $message && ++$receivedMessages >= $this->maximumNumberOfMessages) { + if (null !== $envelope && ++$receivedMessages >= $this->maximumNumberOfMessages) { $this->stop(); if (null !== $this->logger) { $this->logger->info('Receiver stopped due to maximum count of {count} exceeded', array('count' => $this->maximumNumberOfMessages)); diff --git a/src/Symfony/Component/Messenger/Transport/ReceiverInterface.php b/src/Symfony/Component/Messenger/Transport/ReceiverInterface.php index 1c29fbe43a..0f61fe3428 100644 --- a/src/Symfony/Component/Messenger/Transport/ReceiverInterface.php +++ b/src/Symfony/Component/Messenger/Transport/ReceiverInterface.php @@ -21,10 +21,10 @@ interface ReceiverInterface /** * Receive some messages to the given handler. * - * The handler will have, as argument, the received message. Note that this message - * can be `null` if the timeout to receive something has expired. + * The handler will have, as argument, the received {@link \Symfony\Component\Messenger\Envelope} containing the message. + * Note that this envelope can be `null` if the timeout to receive something has expired. */ - public function receive(callable $handler) : void; + public function receive(callable $handler): void; /** * Stop receiving some messages. diff --git a/src/Symfony/Component/Messenger/Transport/SenderInterface.php b/src/Symfony/Component/Messenger/Transport/SenderInterface.php index a142e1f009..ba36b80fe2 100644 --- a/src/Symfony/Component/Messenger/Transport/SenderInterface.php +++ b/src/Symfony/Component/Messenger/Transport/SenderInterface.php @@ -11,6 +11,8 @@ namespace Symfony\Component\Messenger\Transport; +use Symfony\Component\Messenger\Envelope; + /** * @author Samuel Roze * @@ -19,9 +21,9 @@ namespace Symfony\Component\Messenger\Transport; interface SenderInterface { /** - * Sends the given message. + * Sends the given envelope. * - * @param object $message + * @param Envelope $envelope */ - public function send($message); + public function send(Envelope $envelope); } diff --git a/src/Symfony/Component/Messenger/Transport/Serialization/DecoderInterface.php b/src/Symfony/Component/Messenger/Transport/Serialization/DecoderInterface.php index a232dfbf10..026a321033 100644 --- a/src/Symfony/Component/Messenger/Transport/Serialization/DecoderInterface.php +++ b/src/Symfony/Component/Messenger/Transport/Serialization/DecoderInterface.php @@ -11,6 +11,8 @@ namespace Symfony\Component\Messenger\Transport\Serialization; +use Symfony\Component\Messenger\Envelope; + /** * @author Samuel Roze * @@ -19,16 +21,16 @@ namespace Symfony\Component\Messenger\Transport\Serialization; interface DecoderInterface { /** - * Decodes the message from an encoded-form. + * Decodes an envelope and its message from an encoded-form. * - * The `$encodedMessage` parameter is a key-value array that - * describes the message, that will be used by the different transports. + * The `$encodedEnvelope` parameter is a key-value array that + * describes the envelope and its content, that will be used by the different transports. * * The most common keys are: * - `body` (string) - the message body * - `headers` (string) - a key/value pair of headers * - * @return object + * @return Envelope */ - public function decode(array $encodedMessage); + public function decode(array $encodedEnvelope): Envelope; } diff --git a/src/Symfony/Component/Messenger/Transport/Serialization/EncoderInterface.php b/src/Symfony/Component/Messenger/Transport/Serialization/EncoderInterface.php index 658b9e5b5e..1dce6fdae3 100644 --- a/src/Symfony/Component/Messenger/Transport/Serialization/EncoderInterface.php +++ b/src/Symfony/Component/Messenger/Transport/Serialization/EncoderInterface.php @@ -10,6 +10,7 @@ */ namespace Symfony\Component\Messenger\Transport\Serialization; +use Symfony\Component\Messenger\Envelope; /** * @author Samuel Roze @@ -19,14 +20,14 @@ namespace Symfony\Component\Messenger\Transport\Serialization; interface EncoderInterface { /** - * Encodes a message to a common format understandable by transports. The encoded array should only - * contain scalar and arrays. + * Encodes an envelope content (message & items) to a common format understandable by transports. + * The encoded array should only contain scalar and arrays. * * The most common keys of the encoded array are: * - `body` (string) - the message body * - `headers` (string) - a key/value pair of headers * - * @param object $message The object that is put on the MessageBus by the user + * @param Envelope $envelope The envelop containing the message put on the MessageBus by the user */ - public function encode($message): array; + public function encode(Envelope $envelope): array; } diff --git a/src/Symfony/Component/Messenger/Transport/Serialization/Serializer.php b/src/Symfony/Component/Messenger/Transport/Serialization/Serializer.php index 65c2ac55e8..cfb30090e5 100644 --- a/src/Symfony/Component/Messenger/Transport/Serialization/Serializer.php +++ b/src/Symfony/Component/Messenger/Transport/Serialization/Serializer.php @@ -11,6 +11,7 @@ namespace Symfony\Component\Messenger\Transport\Serialization; +use Symfony\Component\Messenger\Envelope; use Symfony\Component\Serializer\SerializerInterface; /** @@ -32,27 +33,48 @@ class Serializer implements DecoderInterface, EncoderInterface /** * {@inheritdoc} */ - public function decode(array $encodedMessage) + public function decode(array $encodedEnvelope): Envelope { - if (empty($encodedMessage['body']) || empty($encodedMessage['headers'])) { - throw new \InvalidArgumentException('Encoded message should have at least a `body` and some `headers`.'); + if (empty($encodedEnvelope['body']) || empty($encodedEnvelope['headers'])) { + throw new \InvalidArgumentException('Encoded envelope should have at least a `body` and some `headers`.'); } - if (empty($encodedMessage['headers']['type'])) { - throw new \InvalidArgumentException('Encoded message does not have a `type` header.'); + if (empty($encodedEnvelope['headers']['type'])) { + throw new \InvalidArgumentException('Encoded envelope does not have a `type` header.'); } - return $this->serializer->deserialize($encodedMessage['body'], $encodedMessage['headers']['type'], $this->format, $this->context); + $envelopeItems = isset($encodedEnvelope['headers']['X-Message-Envelope-Items']) ? unserialize($encodedEnvelope['headers']['X-Message-Envelope-Items']) : array(); + + $context = $this->context; + /** @var SerializerConfiguration|null $serializerConfig */ + if ($serializerConfig = $envelopeItems[SerializerConfiguration::class] ?? null) { + $context = $serializerConfig->getContext() + $context; + } + + $message = $this->serializer->deserialize($encodedEnvelope['body'], $encodedEnvelope['headers']['type'], $this->format, $context); + + return new Envelope($message, $envelopeItems); } /** * {@inheritdoc} */ - public function encode($message): array + public function encode(Envelope $envelope): array { + $context = $this->context; + /** @var SerializerConfiguration|null $serializerConfig */ + if ($serializerConfig = $envelope->get(SerializerConfiguration::class)) { + $context = $serializerConfig->getContext() + $context; + } + + $headers = array('type' => \get_class($envelope->getMessage())); + if ($configurations = $envelope->all()) { + $headers['X-Message-Envelope-Items'] = serialize($configurations); + } + return array( - 'body' => $this->serializer->serialize($message, $this->format, $this->context), - 'headers' => array('type' => \get_class($message)), + 'body' => $this->serializer->serialize($envelope->getMessage(), $this->format, $context), + 'headers' => $headers, ); } } diff --git a/src/Symfony/Component/Messenger/Transport/Serialization/SerializerConfiguration.php b/src/Symfony/Component/Messenger/Transport/Serialization/SerializerConfiguration.php new file mode 100644 index 0000000000..478e197080 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Serialization/SerializerConfiguration.php @@ -0,0 +1,46 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Serialization; + +use Symfony\Component\Messenger\EnvelopeItemInterface; + +/** + * @author Maxime Steinhausser + * + * @experimental in 4.1 + */ +final class SerializerConfiguration implements EnvelopeItemInterface +{ + private $context; + + public function __construct(array $context) + { + $this->context = $context; + } + + public function getContext(): array + { + return $this->context; + } + + public function serialize() + { + return serialize(array('context' => $this->context)); + } + + public function unserialize($serialized) + { + list('context' => $context) = unserialize($serialized, array('allowed_classes' => false)); + + $this->__construct($context); + } +} diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index 2033f3a770..918b61a52a 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -41,16 +41,12 @@ class Worker }); } - $this->receiver->receive(function ($message) { - if (null === $message) { + $this->receiver->receive(function (?Envelope $envelope) { + if (null === $envelope) { return; } - if (!$message instanceof ReceivedMessage) { - $message = new ReceivedMessage($message); - } - - $this->bus->dispatch($message); + $this->bus->dispatch($envelope->with(new ReceivedMessage())); }); } }