[Messenger] Support configuring messages when dispatching

This commit is contained in:
Maxime Steinhausser 2018-04-16 13:07:19 +02:00 committed by Samuel ROZE
parent 47da23c7cb
commit 749054a1e8
36 changed files with 649 additions and 126 deletions

View File

@ -13,12 +13,14 @@ namespace Symfony\Component\Messenger\Asynchronous\Middleware;
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocatorInterface;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\EnvelopeAwareInterface;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class SendMessageMiddleware implements MiddlewareInterface
class SendMessageMiddleware implements MiddlewareInterface, EnvelopeAwareInterface
{
private $senderLocator;
@ -32,17 +34,19 @@ class SendMessageMiddleware implements MiddlewareInterface
*/
public function handle($message, callable $next)
{
if ($message instanceof ReceivedMessage) {
return $next($message->getMessage());
$envelope = Envelope::wrap($message);
if ($envelope->get(ReceivedMessage::class)) {
// It's a received message. Do not send it back:
return $next($message);
}
if (!empty($senders = $this->senderLocator->getSendersForMessage($message))) {
if (!empty($senders = $this->senderLocator->getSendersForMessage($envelope->getMessage()))) {
foreach ($senders as $sender) {
if (null === $sender) {
continue;
}
$sender->send($message);
$sender->send($envelope);
}
if (!\in_array(null, $senders, true)) {

View File

@ -12,26 +12,26 @@
namespace Symfony\Component\Messenger\Asynchronous\Transport;
use Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\EnvelopeItemInterface;
/**
* Wraps a received message. This is mainly used by the `SendMessageMiddleware` middleware to identify
* Marker config for a received message.
* This is mainly used by the `SendMessageMiddleware` middleware to identify
* a message should not be sent if it was just received.
*
* @see SendMessageMiddleware
*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
final class ReceivedMessage
final class ReceivedMessage implements EnvelopeItemInterface
{
private $message;
public function __construct($message)
public function serialize()
{
$this->message = $message;
return '';
}
public function getMessage()
public function unserialize($serialized)
{
return $this->message;
// noop
}
}

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Asynchronous\Transport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
/**
@ -27,12 +28,12 @@ class WrapIntoReceivedMessage implements ReceiverInterface
public function receive(callable $handler): void
{
$this->decoratedReceiver->receive(function ($message) use ($handler) {
if (null !== $message) {
$message = new ReceivedMessage($message);
$this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler) {
if (null !== $envelope) {
$envelope = $envelope->with(new ReceivedMessage());
}
$handler($message);
$handler($envelope);
});
}

View File

@ -0,0 +1,80 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger;
/**
* A message wrapped in an envelope with items (configurations, markers, ...).
*
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*
* @experimental in 4.1
*/
final class Envelope
{
private $items = array();
private $message;
/**
* @param object $message
* @param EnvelopeItemInterface[] $items
*/
public function __construct($message, array $items = array())
{
$this->message = $message;
foreach ($items as $item) {
$this->items[\get_class($item)] = $item;
}
}
/**
* Wrap a message into an envelope if not already wrapped.
*
* @param Envelope|object $message
*/
public static function wrap($message): self
{
return $message instanceof self ? $message : new self($message);
}
/**
* @return Envelope a new Envelope instance with additional item
*/
public function with(EnvelopeItemInterface $item): self
{
$cloned = clone $this;
$cloned->items[\get_class($item)] = $item;
return $cloned;
}
public function get(string $itemFqcn): ?EnvelopeItemInterface
{
return $this->items[$itemFqcn] ?? null;
}
/**
* @return EnvelopeItemInterface[] indexed by fqcn
*/
public function all(): array
{
return $this->items;
}
/**
* @return object The original message contained in the envelope
*/
public function getMessage()
{
return $this->message;
}
}

View File

@ -0,0 +1,23 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger;
/**
* A Messenger protagonist aware of the message envelope and its content.
*
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*
* @experimental in 4.1
*/
interface EnvelopeAwareInterface
{
}

View File

@ -0,0 +1,24 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger;
/**
* An envelope item related to a message.
* This item must be serializable for transport.
*
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*
* @experimental in 4.1
*/
interface EnvelopeItemInterface extends \Serializable
{
}

View File

@ -55,6 +55,12 @@ class MessageBus implements MessageBusInterface
$middleware = $this->indexedMiddlewares[$index];
return function ($message) use ($middleware, $index) {
$message = Envelope::wrap($message);
if (!$middleware instanceof EnvelopeAwareInterface) {
// Do not provide the envelope if the middleware cannot read it:
$message = $message->getMessage();
}
return $middleware->handle($message, $this->callableForNextMiddleware($index + 1));
};
}

View File

@ -23,7 +23,7 @@ interface MessageBusInterface
*
* The bus can return a value coming from handlers, but is not required to do so.
*
* @param object $message
* @param object|Envelope $message The message or the message pre-wrapped in an envelope
*
* @return mixed
*/

View File

@ -0,0 +1,58 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Middleware\Configuration;
use Symfony\Component\Messenger\EnvelopeItemInterface;
use Symfony\Component\Validator\Constraints\GroupSequence;
/**
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*
* @experimental in 4.1
*/
final class ValidationConfiguration implements EnvelopeItemInterface
{
private $groups;
/**
* @param string[]|GroupSequence $groups
*/
public function __construct($groups)
{
$this->groups = $groups;
}
public function getGroups()
{
return $this->groups;
}
public function serialize()
{
$isGroupSequence = $this->groups instanceof GroupSequence;
return serialize(array(
'groups' => $isGroupSequence ? $this->groups->groups : $this->groups,
'is_group_sequence' => $isGroupSequence,
));
}
public function unserialize($serialized)
{
list(
'groups' => $groups,
'is_group_sequence' => $isGroupSequence
) = unserialize($serialized, array('allowed_classes' => false));
$this->__construct($isGroupSequence ? new GroupSequence($groups) : $groups);
}
}

View File

@ -11,7 +11,6 @@
namespace Symfony\Component\Messenger\Middleware;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Psr\Log\LoggerInterface;
/**
@ -51,10 +50,6 @@ class LoggingMiddleware implements MiddlewareInterface
private function createContext($message): array
{
if ($message instanceof ReceivedMessage) {
$message = $message->getMessage();
}
return array(
'message' => $message,
'class' => \get_class($message),

View File

@ -11,13 +11,16 @@
namespace Symfony\Component\Messenger\Middleware;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\EnvelopeAwareInterface;
use Symfony\Component\Messenger\Exception\ValidationFailedException;
use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration;
use Symfony\Component\Validator\Validator\ValidatorInterface;
/**
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
*/
class ValidationMiddleware implements MiddlewareInterface
class ValidationMiddleware implements MiddlewareInterface, EnvelopeAwareInterface
{
private $validator;
@ -28,9 +31,17 @@ class ValidationMiddleware implements MiddlewareInterface
public function handle($message, callable $next)
{
$violations = $this->validator->validate($message);
$envelope = Envelope::wrap($message);
$subject = $envelope->getMessage();
$groups = null;
/** @var ValidationConfiguration|null $validationConfig */
if ($validationConfig = $envelope->get(ValidationConfiguration::class)) {
$groups = $validationConfig->getGroups();
}
$violations = $this->validator->validate($subject, null, $groups);
if (\count($violations)) {
throw new ValidationFailedException($message, $violations);
throw new ValidationFailedException($subject, $violations);
}
return $next($message);

View File

@ -15,6 +15,7 @@ use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocatorInterface;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\SenderInterface;
@ -30,12 +31,28 @@ class SendMessageMiddlewareTest extends TestCase
$sender,
)));
$sender->expects($this->once())->method('send')->with($message);
$sender->expects($this->once())->method('send')->with(Envelope::wrap($message));
$next->expects($this->never())->method($this->anything());
$middleware->handle($message, $next);
}
public function testItSendsTheMessageToAssignedSenderWithPreWrappedMessage()
{
$envelope = Envelope::wrap(new DummyMessage('Hey'));
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(array(
$sender,
)));
$sender->expects($this->once())->method('send')->with($envelope);
$next->expects($this->never())->method($this->anything());
$middleware->handle($envelope, $next);
}
public function testItAlsoCallsTheNextMiddlewareIfASenderIsNull()
{
$message = new DummyMessage('Hey');
@ -47,7 +64,7 @@ class SendMessageMiddlewareTest extends TestCase
null,
)));
$sender->expects($this->once())->method('send')->with($message);
$sender->expects($this->once())->method('send')->with(Envelope::wrap($message));
$next->expects($this->once())->method($this->anything());
$middleware->handle($message, $next);
@ -67,8 +84,7 @@ class SendMessageMiddlewareTest extends TestCase
public function testItSkipsReceivedMessages()
{
$innerMessage = new DummyMessage('Hey');
$message = new ReceivedMessage($innerMessage);
$envelope = Envelope::wrap(new DummyMessage('Hey'))->with(new ReceivedMessage());
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
@ -78,9 +94,9 @@ class SendMessageMiddlewareTest extends TestCase
)));
$sender->expects($this->never())->method('send');
$next->expects($this->once())->method('__invoke')->with($innerMessage);
$next->expects($this->once())->method('__invoke')->with($envelope);
$middleware->handle($message, $next);
$middleware->handle($envelope, $next);
}
}

View File

@ -0,0 +1,30 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Asynchronous\Serialization;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Transport\Serialization\SerializerConfiguration;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
/**
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*/
class SerializerConfigurationTest extends TestCase
{
public function testSerialiazable()
{
$config = new SerializerConfiguration(array(ObjectNormalizer::GROUPS => array('Default', 'Extra')));
$this->assertTrue(is_subclass_of(SerializerConfiguration::class, \Serializable::class, true));
$this->assertEquals($config, unserialize(serialize($config)));
}
}

View File

@ -16,6 +16,7 @@ use Symfony\Component\DependencyInjection\Argument\ServiceClosureArgument;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Reference;
use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Handler\Locator\ContainerHandlerLocator;
@ -324,7 +325,7 @@ class DummyReceiver implements ReceiverInterface
public function receive(callable $handler): void
{
for ($i = 0; $i < 3; ++$i) {
$handler(new DummyMessage("Dummy $i"));
$handler(Envelope::wrap(new DummyMessage("Dummy $i")));
}
}

View File

@ -0,0 +1,82 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\EnvelopeAwareInterface;
use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
/**
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*/
class EnvelopeTest extends TestCase
{
public function testConstruct()
{
$envelope = new Envelope($dummy = new DummyMessage('dummy'), array(
$receivedConfig = new ReceivedMessage(),
));
$this->assertSame($dummy, $envelope->getMessage());
$this->assertArrayHasKey(ReceivedMessage::class, $configs = $envelope->all());
$this->assertSame($receivedConfig, $configs[ReceivedMessage::class]);
}
public function testWrap()
{
$first = Envelope::wrap($dummy = new DummyMessage('dummy'));
$this->assertInstanceOf(Envelope::class, $first);
$this->assertSame($dummy, $first->getMessage());
$envelope = Envelope::wrap($first);
$this->assertSame($first, $envelope);
}
public function testWithReturnsNewInstance()
{
$envelope = Envelope::wrap($dummy = new DummyMessage('dummy'));
$this->assertNotSame($envelope, $envelope->with(new ReceivedMessage()));
}
public function testGet()
{
$envelope = Envelope::wrap($dummy = new DummyMessage('dummy'))
->with($config = new ReceivedMessage())
;
$this->assertSame($config, $envelope->get(ReceivedMessage::class));
$this->assertNull($envelope->get(ValidationConfiguration::class));
}
public function testAll()
{
$envelope = Envelope::wrap($dummy = new DummyMessage('dummy'))
->with($receivedConfig = new ReceivedMessage())
->with($validationConfig = new ValidationConfiguration(array('foo')))
;
$configs = $envelope->all();
$this->assertArrayHasKey(ReceivedMessage::class, $configs);
$this->assertSame($receivedConfig, $configs[ReceivedMessage::class]);
$this->assertArrayHasKey(ValidationConfiguration::class, $configs);
$this->assertSame($validationConfig, $configs[ValidationConfiguration::class]);
}
}
class FooConfigurationConsumer implements EnvelopeAwareInterface
{
}

View File

@ -0,0 +1,38 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Middleware\Configuration;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration;
use Symfony\Component\Validator\Constraints\GroupSequence;
/**
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*/
class ValidationConfigurationTest extends TestCase
{
public function testConfig()
{
$config = new ValidationConfiguration($groups = array('Default', 'Extra'));
$this->assertSame($groups, $config->getGroups());
$config = new ValidationConfiguration($groups = new GroupSequence(array('Default', 'Then')));
$this->assertSame($groups, $config->getGroups());
}
public function testSerialiazable()
{
$this->assertTrue(is_subclass_of(ValidationConfiguration::class, \Serializable::class, true));
$this->assertEquals($config = new ValidationConfiguration(array('Default', 'Extra')), unserialize(serialize($config)));
$this->assertEquals($config = new ValidationConfiguration(new GroupSequence(array('Default', 'Then'))), unserialize(serialize($config)));
}
}

View File

@ -12,6 +12,8 @@
namespace Symfony\Component\Messenger\Tests\Middleware;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration;
use Symfony\Component\Messenger\Middleware\ValidationMiddleware;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Validator\ConstraintViolationListInterface;
@ -43,6 +45,30 @@ class ValidationMiddlewareTest extends TestCase
$this->assertSame('Hello', $result);
}
public function testValidateWithConfigurationAndNextMiddleware()
{
$envelope = Envelope::wrap($message = new DummyMessage('Hey'))->with(new ValidationConfiguration($groups = array('Default', 'Extra')));
$validator = $this->createMock(ValidatorInterface::class);
$validator
->expects($this->once())
->method('validate')
->with($message, null, $groups)
->willReturn($this->createMock(ConstraintViolationListInterface::class))
;
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$next
->expects($this->once())
->method('__invoke')
->with($envelope)
->willReturn('Hello')
;
$result = (new ValidationMiddleware($validator))->handle($envelope, $next);
$this->assertSame('Hello', $result);
}
/**
* @expectedException \Symfony\Component\Messenger\Exception\ValidationFailedException
* @expectedExceptionMessage Message of type "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage" failed validation.

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
@ -50,12 +51,12 @@ class AmqpExtIntegrationTest extends TestCase
$sender = new AmqpSender($serializer, $connection);
$receiver = new AmqpReceiver($serializer, $connection);
$sender->send($firstMessage = new DummyMessage('First'));
$sender->send($secondMessage = new DummyMessage('Second'));
$sender->send($first = Envelope::wrap(new DummyMessage('First')));
$sender->send($second = Envelope::wrap(new DummyMessage('Second')));
$receivedMessages = 0;
$receiver->receive(function ($message) use ($receiver, &$receivedMessages, $firstMessage, $secondMessage) {
$this->assertEquals(0 == $receivedMessages ? $firstMessage : $secondMessage, $message);
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
$this->assertEquals(0 == $receivedMessages ? $first : $second, $envelope);
if (2 === ++$receivedMessages) {
$receiver->stop();
@ -74,7 +75,7 @@ class AmqpExtIntegrationTest extends TestCase
$connection->queue()->purge();
$sender = new AmqpSender($serializer, $connection);
$sender->send(new DummyMessage('Hello'));
$sender->send(Envelope::wrap(new DummyMessage('Hello')));
$amqpReadTimeout = 30;
$dsn = getenv('MESSENGER_AMQP_DSN').'?read_timeout='.$amqpReadTimeout;
@ -98,7 +99,15 @@ class AmqpExtIntegrationTest extends TestCase
$this->assertFalse($process->isRunning());
$this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);
$this->assertEquals($expectedOutput."Get message: Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage\nDone.\n", $process->getOutput());
$this->assertSame($expectedOutput.<<<'TXT'
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
with items: [
"Symfony\\Component\\Messenger\\Asynchronous\\Transport\\ReceivedMessage"
]
Done.
TXT
, $process->getOutput());
}
/**
@ -114,12 +123,11 @@ class AmqpExtIntegrationTest extends TestCase
$connection->setup();
$connection->queue()->purge();
$sender = new AmqpSender($serializer, $connection);
$receiver = new AmqpReceiver($serializer, $connection);
$receivedMessages = 0;
$receiver->receive(function ($message) use ($receiver, &$receivedMessages) {
$this->assertNull($message);
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
$this->assertNull($envelope);
if (2 === ++$receivedMessages) {
$receiver->stop();

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
@ -44,8 +45,8 @@ class AmqpReceiverTest extends TestCase
$connection->expects($this->once())->method('ack')->with($envelope);
$receiver = new AmqpReceiver($serializer, $connection);
$receiver->receive(function ($message) use ($receiver) {
$this->assertEquals(new DummyMessage('Hi'), $message);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
$receiver->stop();
});
}

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
@ -24,16 +25,16 @@ class AmqpSenderTest extends TestCase
{
public function testItSendsTheEncodedMessage()
{
$message = new DummyMessage('Oy');
$envelope = Envelope::wrap(new DummyMessage('Oy'));
$encoded = array('body' => '...', 'headers' => array('type' => DummyMessage::class));
$encoder = $this->getMockBuilder(EncoderInterface::class)->getMock();
$encoder->method('encode')->with($message)->willReturnOnConsecutiveCalls($encoded);
$encoder->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers']);
$sender = new AmqpSender($encoder, $connection);
$sender->send($message);
$sender->send($envelope);
}
}

View File

@ -12,10 +12,9 @@ if (!file_exists($autoload)) {
require_once $autoload;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Serializer as SerializerComponent;
@ -27,13 +26,14 @@ $serializer = new Serializer(
);
$connection = Connection::fromDsn(getenv('DSN'));
$sender = new AmqpSender($serializer, $connection);
$receiver = new AmqpReceiver($serializer, $connection);
$worker = new Worker($receiver, new class() implements MessageBusInterface {
public function dispatch($message)
public function dispatch($envelope)
{
echo 'Get message: '.get_class($message)."\n";
echo 'Get envelope with message: '.get_class($envelope->getMessage())."\n";
echo sprintf("with items: %s\n", json_encode(array_keys($envelope->all()), JSON_PRETTY_PRINT));
sleep(30);
echo "Done.\n";
}

View File

@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\Enhancers;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver;
@ -25,7 +26,7 @@ class StopWhenMemoryUsageIsExceededReceiverTest extends TestCase
public function testReceiverStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop)
{
$callable = function ($handler) {
$handler(new DummyMessage('API'));
$handler(Envelope::wrap(new DummyMessage('API')));
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
@ -58,7 +59,7 @@ class StopWhenMemoryUsageIsExceededReceiverTest extends TestCase
public function testReceiverLogsMemoryExceededWhenLoggerIsGiven()
{
$callable = function ($handler) {
$handler(new DummyMessage('API'));
$handler(Envelope::wrap(new DummyMessage('API')));
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)

View File

@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\Enhancers;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
@ -25,9 +26,9 @@ class StopWhenMessageCountIsExceededReceiverTest extends TestCase
public function testReceiverStopsWhenMaximumCountExceeded($max, $shouldStop)
{
$callable = function ($handler) {
$handler(new DummyMessage('First message'));
$handler(new DummyMessage('Second message'));
$handler(new DummyMessage('Third message'));
$handler(Envelope::wrap(new DummyMessage('First message')));
$handler(Envelope::wrap(new DummyMessage('Second message')));
$handler(Envelope::wrap(new DummyMessage('Third message')));
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
@ -78,7 +79,7 @@ class StopWhenMessageCountIsExceededReceiverTest extends TestCase
public function testReceiverLogsMaximumCountExceededWhenLoggerIsGiven()
{
$callable = function ($handler) {
$handler(new DummyMessage('First message'));
$handler(Envelope::wrap(new DummyMessage('First message')));
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)

View File

@ -12,8 +12,11 @@
namespace Symfony\Component\Messenger\Tests\Transport\Serialization;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerConfiguration;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
@ -26,9 +29,23 @@ class SerializerTest extends TestCase
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$message = new DummyMessage('Hello');
$envelope = Envelope::wrap(new DummyMessage('Hello'));
$this->assertEquals($message, $serializer->decode($serializer->encode($message)));
$this->assertEquals($envelope, $serializer->decode($serializer->encode($envelope)));
}
public function testEncodedWithConfigurationIsDecodable()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$envelope = Envelope::wrap(new DummyMessage('Hello'))
->with(new SerializerConfiguration(array(ObjectNormalizer::GROUPS => array('foo'))))
->with(new ValidationConfiguration(array('foo', 'bar')))
;
$this->assertEquals($envelope, $serializer->decode($serializer->encode($envelope)));
}
public function testEncodedIsHavingTheBodyAndTypeHeader()
@ -37,11 +54,12 @@ class SerializerTest extends TestCase
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$encoded = $serializer->encode(new DummyMessage('Hello'));
$encoded = $serializer->encode(Envelope::wrap(new DummyMessage('Hello')));
$this->assertArrayHasKey('body', $encoded);
$this->assertArrayHasKey('headers', $encoded);
$this->assertArrayHasKey('type', $encoded['headers']);
$this->assertArrayNotHasKey('X-Message-Envelope-Items', $encoded['headers']);
$this->assertEquals(DummyMessage::class, $encoded['headers']['type']);
}
@ -55,10 +73,31 @@ class SerializerTest extends TestCase
$encoder = new Serializer($serializer, 'csv', array('foo' => 'bar'));
$encoded = $encoder->encode($message);
$encoded = $encoder->encode(Envelope::wrap($message));
$decoded = $encoder->decode($encoded);
$this->assertSame('Yay', $encoded['body']);
$this->assertSame($message, $decoded);
$this->assertSame($message, $decoded->getMessage());
}
public function testEncodedWithSerializationConfiguration()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$envelope = Envelope::wrap(new DummyMessage('Hello'))
->with(new SerializerConfiguration(array(ObjectNormalizer::GROUPS => array('foo'))))
->with(new ValidationConfiguration(array('foo', 'bar')))
;
$encoded = $serializer->encode($envelope);
$this->assertArrayHasKey('body', $encoded);
$this->assertArrayHasKey('headers', $encoded);
$this->assertArrayHasKey('type', $encoded['headers']);
$this->assertEquals(DummyMessage::class, $encoded['headers']['type']);
$this->assertArrayHasKey('X-Message-Envelope-Items', $encoded['headers']);
$this->assertSame('a:2:{s:75:"Symfony\Component\Messenger\Transport\Serialization\SerializerConfiguration";C:75:"Symfony\Component\Messenger\Transport\Serialization\SerializerConfiguration":59:{a:1:{s:7:"context";a:1:{s:6:"groups";a:1:{i:0;s:3:"foo";}}}}s:76:"Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration";C:76:"Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration":82:{a:2:{s:6:"groups";a:2:{i:0;s:3:"foo";i:1;s:3:"bar";}s:17:"is_group_sequence";b:0;}}}', $encoded['headers']['X-Message-Envelope-Items']);
}
}

View File

@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
@ -22,29 +23,33 @@ class WorkerTest extends TestCase
{
public function testWorkerDispatchTheReceivedMessage()
{
$receiver = new CallbackReceiver(function ($handler) {
$handler(new DummyMessage('API'));
$handler(new DummyMessage('IPA'));
$apiMessage = new DummyMessage('API');
$ipaMessage = new DummyMessage('IPA');
$receiver = new CallbackReceiver(function ($handler) use ($apiMessage, $ipaMessage) {
$handler(Envelope::wrap($apiMessage));
$handler(Envelope::wrap($ipaMessage));
});
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('API')));
$bus->expects($this->at(1))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('IPA')));
$bus->expects($this->at(0))->method('dispatch')->with(Envelope::wrap($apiMessage)->with(new ReceivedMessage()));
$bus->expects($this->at(1))->method('dispatch')->with(Envelope::wrap($ipaMessage)->with(new ReceivedMessage()));
$worker = new Worker($receiver, $bus);
$worker->run();
}
public function testWorkerDoesNotWrapMessagesAlreadyWrappedInReceivedMessages()
public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage()
{
$receiver = new CallbackReceiver(function ($handler) {
$handler(new ReceivedMessage(new DummyMessage('API')));
$envelop = Envelope::wrap(new DummyMessage('API'))->with(new ReceivedMessage());
$receiver = new CallbackReceiver(function ($handler) use ($envelop) {
$handler($envelop);
});
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('API')));
$bus->expects($this->at(0))->method('dispatch')->with($envelop);
$worker = new Worker($receiver, $bus);
$worker->run();
@ -54,7 +59,7 @@ class WorkerTest extends TestCase
{
$receiver = new CallbackReceiver(function ($handler) {
try {
$handler(new DummyMessage('Hello'));
$handler(Envelope::wrap(new DummyMessage('Hello')));
$this->assertTrue(false, 'This should not be called because the exception is sent back to the generator.');
} catch (\InvalidArgumentException $e) {

View File

@ -22,13 +22,13 @@ use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
*/
class AmqpReceiver implements ReceiverInterface
{
private $messageDecoder;
private $decoder;
private $connection;
private $shouldStop;
public function __construct(DecoderInterface $messageDecoder, Connection $connection)
public function __construct(DecoderInterface $decoder, Connection $connection)
{
$this->messageDecoder = $messageDecoder;
$this->decoder = $decoder;
$this->connection = $connection;
}
@ -38,8 +38,8 @@ class AmqpReceiver implements ReceiverInterface
public function receive(callable $handler): void
{
while (!$this->shouldStop) {
$message = $this->connection->get();
if (null === $message) {
$AMQPEnvelope = $this->connection->get();
if (null === $AMQPEnvelope) {
$handler(null);
usleep($this->connection->getConnectionCredentials()['loop_sleep'] ?? 200000);
@ -51,18 +51,18 @@ class AmqpReceiver implements ReceiverInterface
}
try {
$handler($this->messageDecoder->decode(array(
'body' => $message->getBody(),
'headers' => $message->getHeaders(),
$handler($this->decoder->decode(array(
'body' => $AMQPEnvelope->getBody(),
'headers' => $AMQPEnvelope->getHeaders(),
)));
$this->connection->ack($message);
$this->connection->ack($AMQPEnvelope);
} catch (RejectMessageExceptionInterface $e) {
$this->connection->reject($message);
$this->connection->reject($AMQPEnvelope);
throw $e;
} catch (\Throwable $e) {
$this->connection->nack($message, AMQP_REQUEUE);
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE);
throw $e;
} finally {

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Transport\AmqpExt;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
@ -21,21 +22,21 @@ use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
*/
class AmqpSender implements SenderInterface
{
private $messageEncoder;
private $encoder;
private $connection;
public function __construct(EncoderInterface $messageEncoder, Connection $connection)
public function __construct(EncoderInterface $encoder, Connection $connection)
{
$this->messageEncoder = $messageEncoder;
$this->encoder = $encoder;
$this->connection = $connection;
}
/**
* {@inheritdoc}
*/
public function send($message)
public function send(Envelope $envelope)
{
$encodedMessage = $this->messageEncoder->encode($message);
$encodedMessage = $this->encoder->encode($envelope);
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers']);
}

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport\Enhancers;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
/**
@ -36,8 +37,8 @@ class StopWhenMemoryUsageIsExceededReceiver implements ReceiverInterface
public function receive(callable $handler): void
{
$this->decoratedReceiver->receive(function ($message) use ($handler) {
$handler($message);
$this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler) {
$handler($envelope);
$memoryResolver = $this->memoryResolver;
if ($memoryResolver() > $this->memoryLimit) {

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport\Enhancers;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
/**
@ -34,10 +35,10 @@ class StopWhenMessageCountIsExceededReceiver implements ReceiverInterface
{
$receivedMessages = 0;
$this->decoratedReceiver->receive(function ($message) use ($handler, &$receivedMessages) {
$handler($message);
$this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler, &$receivedMessages) {
$handler($envelope);
if (null !== $message && ++$receivedMessages >= $this->maximumNumberOfMessages) {
if (null !== $envelope && ++$receivedMessages >= $this->maximumNumberOfMessages) {
$this->stop();
if (null !== $this->logger) {
$this->logger->info('Receiver stopped due to maximum count of {count} exceeded', array('count' => $this->maximumNumberOfMessages));

View File

@ -21,10 +21,10 @@ interface ReceiverInterface
/**
* Receive some messages to the given handler.
*
* The handler will have, as argument, the received message. Note that this message
* can be `null` if the timeout to receive something has expired.
* The handler will have, as argument, the received {@link \Symfony\Component\Messenger\Envelope} containing the message.
* Note that this envelope can be `null` if the timeout to receive something has expired.
*/
public function receive(callable $handler) : void;
public function receive(callable $handler): void;
/**
* Stop receiving some messages.

View File

@ -11,6 +11,8 @@
namespace Symfony\Component\Messenger\Transport;
use Symfony\Component\Messenger\Envelope;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
*
@ -19,9 +21,9 @@ namespace Symfony\Component\Messenger\Transport;
interface SenderInterface
{
/**
* Sends the given message.
* Sends the given envelope.
*
* @param object $message
* @param Envelope $envelope
*/
public function send($message);
public function send(Envelope $envelope);
}

View File

@ -11,6 +11,8 @@
namespace Symfony\Component\Messenger\Transport\Serialization;
use Symfony\Component\Messenger\Envelope;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
*
@ -19,16 +21,16 @@ namespace Symfony\Component\Messenger\Transport\Serialization;
interface DecoderInterface
{
/**
* Decodes the message from an encoded-form.
* Decodes an envelope and its message from an encoded-form.
*
* The `$encodedMessage` parameter is a key-value array that
* describes the message, that will be used by the different transports.
* The `$encodedEnvelope` parameter is a key-value array that
* describes the envelope and its content, that will be used by the different transports.
*
* The most common keys are:
* - `body` (string) - the message body
* - `headers` (string<string>) - a key/value pair of headers
*
* @return object
* @return Envelope
*/
public function decode(array $encodedMessage);
public function decode(array $encodedEnvelope): Envelope;
}

View File

@ -10,6 +10,7 @@
*/
namespace Symfony\Component\Messenger\Transport\Serialization;
use Symfony\Component\Messenger\Envelope;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
@ -19,14 +20,14 @@ namespace Symfony\Component\Messenger\Transport\Serialization;
interface EncoderInterface
{
/**
* Encodes a message to a common format understandable by transports. The encoded array should only
* contain scalar and arrays.
* Encodes an envelope content (message & items) to a common format understandable by transports.
* The encoded array should only contain scalar and arrays.
*
* The most common keys of the encoded array are:
* - `body` (string) - the message body
* - `headers` (string<string>) - a key/value pair of headers
*
* @param object $message The object that is put on the MessageBus by the user
* @param Envelope $envelope The envelop containing the message put on the MessageBus by the user
*/
public function encode($message): array;
public function encode(Envelope $envelope): array;
}

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Transport\Serialization;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Serializer\SerializerInterface;
/**
@ -32,27 +33,48 @@ class Serializer implements DecoderInterface, EncoderInterface
/**
* {@inheritdoc}
*/
public function decode(array $encodedMessage)
public function decode(array $encodedEnvelope): Envelope
{
if (empty($encodedMessage['body']) || empty($encodedMessage['headers'])) {
throw new \InvalidArgumentException('Encoded message should have at least a `body` and some `headers`.');
if (empty($encodedEnvelope['body']) || empty($encodedEnvelope['headers'])) {
throw new \InvalidArgumentException('Encoded envelope should have at least a `body` and some `headers`.');
}
if (empty($encodedMessage['headers']['type'])) {
throw new \InvalidArgumentException('Encoded message does not have a `type` header.');
if (empty($encodedEnvelope['headers']['type'])) {
throw new \InvalidArgumentException('Encoded envelope does not have a `type` header.');
}
return $this->serializer->deserialize($encodedMessage['body'], $encodedMessage['headers']['type'], $this->format, $this->context);
$envelopeItems = isset($encodedEnvelope['headers']['X-Message-Envelope-Items']) ? unserialize($encodedEnvelope['headers']['X-Message-Envelope-Items']) : array();
$context = $this->context;
/** @var SerializerConfiguration|null $serializerConfig */
if ($serializerConfig = $envelopeItems[SerializerConfiguration::class] ?? null) {
$context = $serializerConfig->getContext() + $context;
}
$message = $this->serializer->deserialize($encodedEnvelope['body'], $encodedEnvelope['headers']['type'], $this->format, $context);
return new Envelope($message, $envelopeItems);
}
/**
* {@inheritdoc}
*/
public function encode($message): array
public function encode(Envelope $envelope): array
{
$context = $this->context;
/** @var SerializerConfiguration|null $serializerConfig */
if ($serializerConfig = $envelope->get(SerializerConfiguration::class)) {
$context = $serializerConfig->getContext() + $context;
}
$headers = array('type' => \get_class($envelope->getMessage()));
if ($configurations = $envelope->all()) {
$headers['X-Message-Envelope-Items'] = serialize($configurations);
}
return array(
'body' => $this->serializer->serialize($message, $this->format, $this->context),
'headers' => array('type' => \get_class($message)),
'body' => $this->serializer->serialize($envelope->getMessage(), $this->format, $context),
'headers' => $headers,
);
}
}

View File

@ -0,0 +1,46 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\Serialization;
use Symfony\Component\Messenger\EnvelopeItemInterface;
/**
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*
* @experimental in 4.1
*/
final class SerializerConfiguration implements EnvelopeItemInterface
{
private $context;
public function __construct(array $context)
{
$this->context = $context;
}
public function getContext(): array
{
return $this->context;
}
public function serialize()
{
return serialize(array('context' => $this->context));
}
public function unserialize($serialized)
{
list('context' => $context) = unserialize($serialized, array('allowed_classes' => false));
$this->__construct($context);
}
}

View File

@ -41,16 +41,12 @@ class Worker
});
}
$this->receiver->receive(function ($message) {
if (null === $message) {
$this->receiver->receive(function (?Envelope $envelope) {
if (null === $envelope) {
return;
}
if (!$message instanceof ReceivedMessage) {
$message = new ReceivedMessage($message);
}
$this->bus->dispatch($message);
$this->bus->dispatch($envelope->with(new ReceivedMessage()));
});
}
}