feature #27182 [Messenger] Re-introduce wrapped message configuration (with fix) (sroze, ogizanagi)
This PR was merged into the 4.1 branch. Discussion ---------- [Messenger] Re-introduce wrapped message configuration (with fix) | Q | A | ------------- | --- | Branch? | 4.1 | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | #26945 | License | MIT | Doc PR | ø The pull request was merged before beta1, but because it introduced a bug, it has been reverted. This adds back the merged PR but pushes a fix for the found bug. Commits -------21e49d21d8
[Messenger] Fix TraceableBus with envelope599f32c085
Ensure the envelope is passed back and can be altered Ensure that the middlewares can also update the message within the envelope7c33cb27ab
feature #26945 [Messenger] Support configuring messages when dispatching (ogizanagi)
This commit is contained in:
commit
ae6c96ea38
@ -13,12 +13,14 @@ namespace Symfony\Component\Messenger\Asynchronous\Middleware;
|
|||||||
|
|
||||||
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocatorInterface;
|
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocatorInterface;
|
||||||
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
|
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\EnvelopeAwareInterface;
|
||||||
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
|
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||||
*/
|
*/
|
||||||
class SendMessageMiddleware implements MiddlewareInterface
|
class SendMessageMiddleware implements MiddlewareInterface, EnvelopeAwareInterface
|
||||||
{
|
{
|
||||||
private $senderLocator;
|
private $senderLocator;
|
||||||
|
|
||||||
@ -32,17 +34,19 @@ class SendMessageMiddleware implements MiddlewareInterface
|
|||||||
*/
|
*/
|
||||||
public function handle($message, callable $next)
|
public function handle($message, callable $next)
|
||||||
{
|
{
|
||||||
if ($message instanceof ReceivedMessage) {
|
$envelope = Envelope::wrap($message);
|
||||||
return $next($message->getMessage());
|
if ($envelope->get(ReceivedMessage::class)) {
|
||||||
|
// It's a received message. Do not send it back:
|
||||||
|
return $next($message);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!empty($senders = $this->senderLocator->getSendersForMessage($message))) {
|
if (!empty($senders = $this->senderLocator->getSendersForMessage($envelope->getMessage()))) {
|
||||||
foreach ($senders as $sender) {
|
foreach ($senders as $sender) {
|
||||||
if (null === $sender) {
|
if (null === $sender) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
$sender->send($message);
|
$sender->send($envelope);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!\in_array(null, $senders, true)) {
|
if (!\in_array(null, $senders, true)) {
|
||||||
|
@ -12,26 +12,26 @@
|
|||||||
namespace Symfony\Component\Messenger\Asynchronous\Transport;
|
namespace Symfony\Component\Messenger\Asynchronous\Transport;
|
||||||
|
|
||||||
use Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware;
|
use Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware;
|
||||||
|
use Symfony\Component\Messenger\EnvelopeItemInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wraps a received message. This is mainly used by the `SendMessageMiddleware` middleware to identify
|
* Marker config for a received message.
|
||||||
|
* This is mainly used by the `SendMessageMiddleware` middleware to identify
|
||||||
* a message should not be sent if it was just received.
|
* a message should not be sent if it was just received.
|
||||||
*
|
*
|
||||||
* @see SendMessageMiddleware
|
* @see SendMessageMiddleware
|
||||||
*
|
*
|
||||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||||
*/
|
*/
|
||||||
final class ReceivedMessage
|
final class ReceivedMessage implements EnvelopeItemInterface
|
||||||
{
|
{
|
||||||
private $message;
|
public function serialize()
|
||||||
|
|
||||||
public function __construct($message)
|
|
||||||
{
|
{
|
||||||
$this->message = $message;
|
return '';
|
||||||
}
|
}
|
||||||
|
|
||||||
public function getMessage()
|
public function unserialize($serialized)
|
||||||
{
|
{
|
||||||
return $this->message;
|
// noop
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -11,6 +11,7 @@
|
|||||||
|
|
||||||
namespace Symfony\Component\Messenger\Asynchronous\Transport;
|
namespace Symfony\Component\Messenger\Asynchronous\Transport;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -27,12 +28,12 @@ class WrapIntoReceivedMessage implements ReceiverInterface
|
|||||||
|
|
||||||
public function receive(callable $handler): void
|
public function receive(callable $handler): void
|
||||||
{
|
{
|
||||||
$this->decoratedReceiver->receive(function ($message) use ($handler) {
|
$this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler) {
|
||||||
if (null !== $message) {
|
if (null !== $envelope) {
|
||||||
$message = new ReceivedMessage($message);
|
$envelope = $envelope->with(new ReceivedMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
$handler($message);
|
$handler($envelope);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
89
src/Symfony/Component/Messenger/Envelope.php
Normal file
89
src/Symfony/Component/Messenger/Envelope.php
Normal file
@ -0,0 +1,89 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A message wrapped in an envelope with items (configurations, markers, ...).
|
||||||
|
*
|
||||||
|
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
|
||||||
|
*
|
||||||
|
* @experimental in 4.1
|
||||||
|
*/
|
||||||
|
final class Envelope
|
||||||
|
{
|
||||||
|
private $items = array();
|
||||||
|
private $message;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param object $message
|
||||||
|
* @param EnvelopeItemInterface[] $items
|
||||||
|
*/
|
||||||
|
public function __construct($message, array $items = array())
|
||||||
|
{
|
||||||
|
$this->message = $message;
|
||||||
|
foreach ($items as $item) {
|
||||||
|
$this->items[\get_class($item)] = $item;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap a message into an envelope if not already wrapped.
|
||||||
|
*
|
||||||
|
* @param Envelope|object $message
|
||||||
|
*/
|
||||||
|
public static function wrap($message): self
|
||||||
|
{
|
||||||
|
return $message instanceof self ? $message : new self($message);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Envelope a new Envelope instance with additional item
|
||||||
|
*/
|
||||||
|
public function with(EnvelopeItemInterface $item): self
|
||||||
|
{
|
||||||
|
$cloned = clone $this;
|
||||||
|
|
||||||
|
$cloned->items[\get_class($item)] = $item;
|
||||||
|
|
||||||
|
return $cloned;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function withMessage($message): self
|
||||||
|
{
|
||||||
|
$cloned = clone $this;
|
||||||
|
|
||||||
|
$cloned->message = $message;
|
||||||
|
|
||||||
|
return $cloned;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function get(string $itemFqcn): ?EnvelopeItemInterface
|
||||||
|
{
|
||||||
|
return $this->items[$itemFqcn] ?? null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return EnvelopeItemInterface[] indexed by fqcn
|
||||||
|
*/
|
||||||
|
public function all(): array
|
||||||
|
{
|
||||||
|
return $this->items;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return object The original message contained in the envelope
|
||||||
|
*/
|
||||||
|
public function getMessage()
|
||||||
|
{
|
||||||
|
return $this->message;
|
||||||
|
}
|
||||||
|
}
|
23
src/Symfony/Component/Messenger/EnvelopeAwareInterface.php
Normal file
23
src/Symfony/Component/Messenger/EnvelopeAwareInterface.php
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Messenger protagonist aware of the message envelope and its content.
|
||||||
|
*
|
||||||
|
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
|
||||||
|
*
|
||||||
|
* @experimental in 4.1
|
||||||
|
*/
|
||||||
|
interface EnvelopeAwareInterface
|
||||||
|
{
|
||||||
|
}
|
24
src/Symfony/Component/Messenger/EnvelopeItemInterface.php
Normal file
24
src/Symfony/Component/Messenger/EnvelopeItemInterface.php
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An envelope item related to a message.
|
||||||
|
* This item must be serializable for transport.
|
||||||
|
*
|
||||||
|
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
|
||||||
|
*
|
||||||
|
* @experimental in 4.1
|
||||||
|
*/
|
||||||
|
interface EnvelopeItemInterface extends \Serializable
|
||||||
|
{
|
||||||
|
}
|
@ -44,10 +44,10 @@ class MessageBus implements MessageBusInterface
|
|||||||
throw new InvalidArgumentException(sprintf('Invalid type for message argument. Expected object, but got "%s".', \gettype($message)));
|
throw new InvalidArgumentException(sprintf('Invalid type for message argument. Expected object, but got "%s".', \gettype($message)));
|
||||||
}
|
}
|
||||||
|
|
||||||
return \call_user_func($this->callableForNextMiddleware(0), $message);
|
return \call_user_func($this->callableForNextMiddleware(0, Envelope::wrap($message)), $message);
|
||||||
}
|
}
|
||||||
|
|
||||||
private function callableForNextMiddleware(int $index): callable
|
private function callableForNextMiddleware(int $index, Envelope $currentEnvelope): callable
|
||||||
{
|
{
|
||||||
if (null === $this->indexedMiddlewareHandlers) {
|
if (null === $this->indexedMiddlewareHandlers) {
|
||||||
$this->indexedMiddlewareHandlers = \is_array($this->middlewareHandlers) ? array_values($this->middlewareHandlers) : iterator_to_array($this->middlewareHandlers, false);
|
$this->indexedMiddlewareHandlers = \is_array($this->middlewareHandlers) ? array_values($this->middlewareHandlers) : iterator_to_array($this->middlewareHandlers, false);
|
||||||
@ -59,8 +59,19 @@ class MessageBus implements MessageBusInterface
|
|||||||
|
|
||||||
$middleware = $this->indexedMiddlewareHandlers[$index];
|
$middleware = $this->indexedMiddlewareHandlers[$index];
|
||||||
|
|
||||||
return function ($message) use ($middleware, $index) {
|
return function ($message) use ($middleware, $index, $currentEnvelope) {
|
||||||
return $middleware->handle($message, $this->callableForNextMiddleware($index + 1));
|
if ($message instanceof Envelope) {
|
||||||
|
$currentEnvelope = $message;
|
||||||
|
} else {
|
||||||
|
$message = $currentEnvelope->withMessage($message);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!$middleware instanceof EnvelopeAwareInterface) {
|
||||||
|
// Do not provide the envelope if the middleware cannot read it:
|
||||||
|
$message = $message->getMessage();
|
||||||
|
}
|
||||||
|
|
||||||
|
return $middleware->handle($message, $this->callableForNextMiddleware($index + 1, $currentEnvelope));
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -23,7 +23,7 @@ interface MessageBusInterface
|
|||||||
*
|
*
|
||||||
* The bus can return a value coming from handlers, but is not required to do so.
|
* The bus can return a value coming from handlers, but is not required to do so.
|
||||||
*
|
*
|
||||||
* @param object $message
|
* @param object|Envelope $message The message or the message pre-wrapped in an envelope
|
||||||
*
|
*
|
||||||
* @return mixed
|
* @return mixed
|
||||||
*/
|
*/
|
||||||
|
@ -0,0 +1,58 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Middleware\Configuration;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\EnvelopeItemInterface;
|
||||||
|
use Symfony\Component\Validator\Constraints\GroupSequence;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
|
||||||
|
*
|
||||||
|
* @experimental in 4.1
|
||||||
|
*/
|
||||||
|
final class ValidationConfiguration implements EnvelopeItemInterface
|
||||||
|
{
|
||||||
|
private $groups;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param string[]|GroupSequence $groups
|
||||||
|
*/
|
||||||
|
public function __construct($groups)
|
||||||
|
{
|
||||||
|
$this->groups = $groups;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getGroups()
|
||||||
|
{
|
||||||
|
return $this->groups;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function serialize()
|
||||||
|
{
|
||||||
|
$isGroupSequence = $this->groups instanceof GroupSequence;
|
||||||
|
|
||||||
|
return serialize(array(
|
||||||
|
'groups' => $isGroupSequence ? $this->groups->groups : $this->groups,
|
||||||
|
'is_group_sequence' => $isGroupSequence,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
public function unserialize($serialized)
|
||||||
|
{
|
||||||
|
list(
|
||||||
|
'groups' => $groups,
|
||||||
|
'is_group_sequence' => $isGroupSequence
|
||||||
|
) = unserialize($serialized, array('allowed_classes' => false));
|
||||||
|
|
||||||
|
$this->__construct($isGroupSequence ? new GroupSequence($groups) : $groups);
|
||||||
|
}
|
||||||
|
}
|
@ -11,7 +11,6 @@
|
|||||||
|
|
||||||
namespace Symfony\Component\Messenger\Middleware;
|
namespace Symfony\Component\Messenger\Middleware;
|
||||||
|
|
||||||
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
|
|
||||||
use Psr\Log\LoggerInterface;
|
use Psr\Log\LoggerInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -51,10 +50,6 @@ class LoggingMiddleware implements MiddlewareInterface
|
|||||||
|
|
||||||
private function createContext($message): array
|
private function createContext($message): array
|
||||||
{
|
{
|
||||||
if ($message instanceof ReceivedMessage) {
|
|
||||||
$message = $message->getMessage();
|
|
||||||
}
|
|
||||||
|
|
||||||
return array(
|
return array(
|
||||||
'message' => $message,
|
'message' => $message,
|
||||||
'class' => \get_class($message),
|
'class' => \get_class($message),
|
||||||
|
@ -11,13 +11,16 @@
|
|||||||
|
|
||||||
namespace Symfony\Component\Messenger\Middleware;
|
namespace Symfony\Component\Messenger\Middleware;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\EnvelopeAwareInterface;
|
||||||
use Symfony\Component\Messenger\Exception\ValidationFailedException;
|
use Symfony\Component\Messenger\Exception\ValidationFailedException;
|
||||||
|
use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration;
|
||||||
use Symfony\Component\Validator\Validator\ValidatorInterface;
|
use Symfony\Component\Validator\Validator\ValidatorInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
|
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
|
||||||
*/
|
*/
|
||||||
class ValidationMiddleware implements MiddlewareInterface
|
class ValidationMiddleware implements MiddlewareInterface, EnvelopeAwareInterface
|
||||||
{
|
{
|
||||||
private $validator;
|
private $validator;
|
||||||
|
|
||||||
@ -28,9 +31,17 @@ class ValidationMiddleware implements MiddlewareInterface
|
|||||||
|
|
||||||
public function handle($message, callable $next)
|
public function handle($message, callable $next)
|
||||||
{
|
{
|
||||||
$violations = $this->validator->validate($message);
|
$envelope = Envelope::wrap($message);
|
||||||
|
$subject = $envelope->getMessage();
|
||||||
|
$groups = null;
|
||||||
|
/** @var ValidationConfiguration|null $validationConfig */
|
||||||
|
if ($validationConfig = $envelope->get(ValidationConfiguration::class)) {
|
||||||
|
$groups = $validationConfig->getGroups();
|
||||||
|
}
|
||||||
|
|
||||||
|
$violations = $this->validator->validate($subject, null, $groups);
|
||||||
if (\count($violations)) {
|
if (\count($violations)) {
|
||||||
throw new ValidationFailedException($message, $violations);
|
throw new ValidationFailedException($subject, $violations);
|
||||||
}
|
}
|
||||||
|
|
||||||
return $next($message);
|
return $next($message);
|
||||||
|
@ -15,6 +15,7 @@ use PHPUnit\Framework\TestCase;
|
|||||||
use Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware;
|
use Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware;
|
||||||
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocatorInterface;
|
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocatorInterface;
|
||||||
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
|
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
use Symfony\Component\Messenger\Transport\SenderInterface;
|
use Symfony\Component\Messenger\Transport\SenderInterface;
|
||||||
|
|
||||||
@ -30,12 +31,28 @@ class SendMessageMiddlewareTest extends TestCase
|
|||||||
$sender,
|
$sender,
|
||||||
)));
|
)));
|
||||||
|
|
||||||
$sender->expects($this->once())->method('send')->with($message);
|
$sender->expects($this->once())->method('send')->with(Envelope::wrap($message));
|
||||||
$next->expects($this->never())->method($this->anything());
|
$next->expects($this->never())->method($this->anything());
|
||||||
|
|
||||||
$middleware->handle($message, $next);
|
$middleware->handle($message, $next);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testItSendsTheMessageToAssignedSenderWithPreWrappedMessage()
|
||||||
|
{
|
||||||
|
$envelope = Envelope::wrap(new DummyMessage('Hey'));
|
||||||
|
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||||
|
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
|
||||||
|
|
||||||
|
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(array(
|
||||||
|
$sender,
|
||||||
|
)));
|
||||||
|
|
||||||
|
$sender->expects($this->once())->method('send')->with($envelope);
|
||||||
|
$next->expects($this->never())->method($this->anything());
|
||||||
|
|
||||||
|
$middleware->handle($envelope, $next);
|
||||||
|
}
|
||||||
|
|
||||||
public function testItAlsoCallsTheNextMiddlewareIfASenderIsNull()
|
public function testItAlsoCallsTheNextMiddlewareIfASenderIsNull()
|
||||||
{
|
{
|
||||||
$message = new DummyMessage('Hey');
|
$message = new DummyMessage('Hey');
|
||||||
@ -47,7 +64,7 @@ class SendMessageMiddlewareTest extends TestCase
|
|||||||
null,
|
null,
|
||||||
)));
|
)));
|
||||||
|
|
||||||
$sender->expects($this->once())->method('send')->with($message);
|
$sender->expects($this->once())->method('send')->with(Envelope::wrap($message));
|
||||||
$next->expects($this->once())->method($this->anything());
|
$next->expects($this->once())->method($this->anything());
|
||||||
|
|
||||||
$middleware->handle($message, $next);
|
$middleware->handle($message, $next);
|
||||||
@ -67,8 +84,7 @@ class SendMessageMiddlewareTest extends TestCase
|
|||||||
|
|
||||||
public function testItSkipsReceivedMessages()
|
public function testItSkipsReceivedMessages()
|
||||||
{
|
{
|
||||||
$innerMessage = new DummyMessage('Hey');
|
$envelope = Envelope::wrap(new DummyMessage('Hey'))->with(new ReceivedMessage());
|
||||||
$message = new ReceivedMessage($innerMessage);
|
|
||||||
|
|
||||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||||
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
|
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
|
||||||
@ -78,9 +94,9 @@ class SendMessageMiddlewareTest extends TestCase
|
|||||||
)));
|
)));
|
||||||
|
|
||||||
$sender->expects($this->never())->method('send');
|
$sender->expects($this->never())->method('send');
|
||||||
$next->expects($this->once())->method('__invoke')->with($innerMessage);
|
$next->expects($this->once())->method('__invoke')->with($envelope);
|
||||||
|
|
||||||
$middleware->handle($message, $next);
|
$middleware->handle($envelope, $next);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1,30 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Tests\Asynchronous\Serialization;
|
||||||
|
|
||||||
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerConfiguration;
|
||||||
|
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
|
||||||
|
*/
|
||||||
|
class SerializerConfigurationTest extends TestCase
|
||||||
|
{
|
||||||
|
public function testSerialiazable()
|
||||||
|
{
|
||||||
|
$config = new SerializerConfiguration(array(ObjectNormalizer::GROUPS => array('Default', 'Extra')));
|
||||||
|
|
||||||
|
$this->assertTrue(is_subclass_of(SerializerConfiguration::class, \Serializable::class, true));
|
||||||
|
$this->assertEquals($config, unserialize(serialize($config)));
|
||||||
|
}
|
||||||
|
}
|
@ -17,6 +17,7 @@ use Symfony\Component\DependencyInjection\ContainerBuilder;
|
|||||||
use Symfony\Component\DependencyInjection\Reference;
|
use Symfony\Component\DependencyInjection\Reference;
|
||||||
use Symfony\Component\DependencyInjection\ServiceLocator;
|
use Symfony\Component\DependencyInjection\ServiceLocator;
|
||||||
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
|
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
|
||||||
use Symfony\Component\Messenger\Handler\Locator\ContainerHandlerLocator;
|
use Symfony\Component\Messenger\Handler\Locator\ContainerHandlerLocator;
|
||||||
@ -357,7 +358,7 @@ class DummyReceiver implements ReceiverInterface
|
|||||||
public function receive(callable $handler): void
|
public function receive(callable $handler): void
|
||||||
{
|
{
|
||||||
for ($i = 0; $i < 3; ++$i) {
|
for ($i = 0; $i < 3; ++$i) {
|
||||||
$handler(new DummyMessage("Dummy $i"));
|
$handler(Envelope::wrap(new DummyMessage("Dummy $i")));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
82
src/Symfony/Component/Messenger/Tests/EnvelopeTest.php
Normal file
82
src/Symfony/Component/Messenger/Tests/EnvelopeTest.php
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Tests;
|
||||||
|
|
||||||
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\EnvelopeAwareInterface;
|
||||||
|
use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration;
|
||||||
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
|
||||||
|
*/
|
||||||
|
class EnvelopeTest extends TestCase
|
||||||
|
{
|
||||||
|
public function testConstruct()
|
||||||
|
{
|
||||||
|
$envelope = new Envelope($dummy = new DummyMessage('dummy'), array(
|
||||||
|
$receivedConfig = new ReceivedMessage(),
|
||||||
|
));
|
||||||
|
|
||||||
|
$this->assertSame($dummy, $envelope->getMessage());
|
||||||
|
$this->assertArrayHasKey(ReceivedMessage::class, $configs = $envelope->all());
|
||||||
|
$this->assertSame($receivedConfig, $configs[ReceivedMessage::class]);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testWrap()
|
||||||
|
{
|
||||||
|
$first = Envelope::wrap($dummy = new DummyMessage('dummy'));
|
||||||
|
|
||||||
|
$this->assertInstanceOf(Envelope::class, $first);
|
||||||
|
$this->assertSame($dummy, $first->getMessage());
|
||||||
|
|
||||||
|
$envelope = Envelope::wrap($first);
|
||||||
|
$this->assertSame($first, $envelope);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testWithReturnsNewInstance()
|
||||||
|
{
|
||||||
|
$envelope = Envelope::wrap($dummy = new DummyMessage('dummy'));
|
||||||
|
|
||||||
|
$this->assertNotSame($envelope, $envelope->with(new ReceivedMessage()));
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testGet()
|
||||||
|
{
|
||||||
|
$envelope = Envelope::wrap($dummy = new DummyMessage('dummy'))
|
||||||
|
->with($config = new ReceivedMessage())
|
||||||
|
;
|
||||||
|
|
||||||
|
$this->assertSame($config, $envelope->get(ReceivedMessage::class));
|
||||||
|
$this->assertNull($envelope->get(ValidationConfiguration::class));
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testAll()
|
||||||
|
{
|
||||||
|
$envelope = Envelope::wrap($dummy = new DummyMessage('dummy'))
|
||||||
|
->with($receivedConfig = new ReceivedMessage())
|
||||||
|
->with($validationConfig = new ValidationConfiguration(array('foo')))
|
||||||
|
;
|
||||||
|
|
||||||
|
$configs = $envelope->all();
|
||||||
|
$this->assertArrayHasKey(ReceivedMessage::class, $configs);
|
||||||
|
$this->assertSame($receivedConfig, $configs[ReceivedMessage::class]);
|
||||||
|
$this->assertArrayHasKey(ValidationConfiguration::class, $configs);
|
||||||
|
$this->assertSame($validationConfig, $configs[ValidationConfiguration::class]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class FooConfigurationConsumer implements EnvelopeAwareInterface
|
||||||
|
{
|
||||||
|
}
|
@ -12,6 +12,10 @@
|
|||||||
namespace Symfony\Component\Messenger\Tests;
|
namespace Symfony\Component\Messenger\Tests;
|
||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\EnvelopeAwareInterface;
|
||||||
|
use Symfony\Component\Messenger\EnvelopeItemInterface;
|
||||||
use Symfony\Component\Messenger\MessageBus;
|
use Symfony\Component\Messenger\MessageBus;
|
||||||
use Symfony\Component\Messenger\MessageBusInterface;
|
use Symfony\Component\Messenger\MessageBusInterface;
|
||||||
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
|
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
|
||||||
@ -61,4 +65,117 @@ class MessageBusTest extends TestCase
|
|||||||
|
|
||||||
$this->assertEquals($responseFromDepthMiddleware, $bus->dispatch($message));
|
$this->assertEquals($responseFromDepthMiddleware, $bus->dispatch($message));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testItKeepsTheEnvelopeEvenThroughAMiddlewareThatIsNotEnvelopeAware()
|
||||||
|
{
|
||||||
|
$message = new DummyMessage('Hello');
|
||||||
|
$envelope = new Envelope($message, array(new ReceivedMessage()));
|
||||||
|
|
||||||
|
$firstMiddleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
|
||||||
|
$firstMiddleware->expects($this->once())
|
||||||
|
->method('handle')
|
||||||
|
->with($message, $this->anything())
|
||||||
|
->will($this->returnCallback(function ($message, $next) {
|
||||||
|
return $next($message);
|
||||||
|
}));
|
||||||
|
|
||||||
|
$secondMiddleware = $this->getMockBuilder(array(MiddlewareInterface::class, EnvelopeAwareInterface::class))->getMock();
|
||||||
|
$secondMiddleware->expects($this->once())
|
||||||
|
->method('handle')
|
||||||
|
->with($envelope, $this->anything())
|
||||||
|
;
|
||||||
|
|
||||||
|
$bus = new MessageBus(array(
|
||||||
|
$firstMiddleware,
|
||||||
|
$secondMiddleware,
|
||||||
|
));
|
||||||
|
|
||||||
|
$bus->dispatch($envelope);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testThatAMiddlewareCanAddSomeItemsToTheEnvelope()
|
||||||
|
{
|
||||||
|
$message = new DummyMessage('Hello');
|
||||||
|
$envelope = new Envelope($message, array(new ReceivedMessage()));
|
||||||
|
$envelopeWithAnotherItem = $envelope->with(new AnEnvelopeItem());
|
||||||
|
|
||||||
|
$firstMiddleware = $this->getMockBuilder(array(MiddlewareInterface::class, EnvelopeAwareInterface::class))->getMock();
|
||||||
|
$firstMiddleware->expects($this->once())
|
||||||
|
->method('handle')
|
||||||
|
->with($envelope, $this->anything())
|
||||||
|
->will($this->returnCallback(function ($message, $next) {
|
||||||
|
return $next($message->with(new AnEnvelopeItem()));
|
||||||
|
}));
|
||||||
|
|
||||||
|
$secondMiddleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
|
||||||
|
$secondMiddleware->expects($this->once())
|
||||||
|
->method('handle')
|
||||||
|
->with($message, $this->anything())
|
||||||
|
->will($this->returnCallback(function ($message, $next) {
|
||||||
|
return $next($message);
|
||||||
|
}));
|
||||||
|
|
||||||
|
$thirdMiddleware = $this->getMockBuilder(array(MiddlewareInterface::class, EnvelopeAwareInterface::class))->getMock();
|
||||||
|
$thirdMiddleware->expects($this->once())
|
||||||
|
->method('handle')
|
||||||
|
->with($envelopeWithAnotherItem, $this->anything())
|
||||||
|
;
|
||||||
|
|
||||||
|
$bus = new MessageBus(array(
|
||||||
|
$firstMiddleware,
|
||||||
|
$secondMiddleware,
|
||||||
|
$thirdMiddleware,
|
||||||
|
));
|
||||||
|
|
||||||
|
$bus->dispatch($envelope);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testThatAMiddlewareCanUpdateTheMessageWhileKeepingTheEnvelopeItems()
|
||||||
|
{
|
||||||
|
$message = new DummyMessage('Hello');
|
||||||
|
$envelope = new Envelope($message, $items = array(new ReceivedMessage()));
|
||||||
|
|
||||||
|
$changedMessage = new DummyMessage('Changed');
|
||||||
|
$expectedEnvelope = new Envelope($changedMessage, $items);
|
||||||
|
|
||||||
|
$firstMiddleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
|
||||||
|
$firstMiddleware->expects($this->once())
|
||||||
|
->method('handle')
|
||||||
|
->with($message, $this->anything())
|
||||||
|
->will($this->returnCallback(function ($message, $next) use ($changedMessage) {
|
||||||
|
return $next($changedMessage);
|
||||||
|
}));
|
||||||
|
|
||||||
|
$secondMiddleware = $this->getMockBuilder(array(MiddlewareInterface::class, EnvelopeAwareInterface::class))->getMock();
|
||||||
|
$secondMiddleware->expects($this->once())
|
||||||
|
->method('handle')
|
||||||
|
->with($expectedEnvelope, $this->anything())
|
||||||
|
;
|
||||||
|
|
||||||
|
$bus = new MessageBus(array(
|
||||||
|
$firstMiddleware,
|
||||||
|
$secondMiddleware,
|
||||||
|
));
|
||||||
|
|
||||||
|
$bus->dispatch($envelope);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class AnEnvelopeItem implements EnvelopeItemInterface
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* {@inheritdoc}
|
||||||
|
*/
|
||||||
|
public function serialize()
|
||||||
|
{
|
||||||
|
return '';
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritdoc}
|
||||||
|
*/
|
||||||
|
public function unserialize($serialized)
|
||||||
|
{
|
||||||
|
return new self();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,38 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Tests\Middleware\Configuration;
|
||||||
|
|
||||||
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration;
|
||||||
|
use Symfony\Component\Validator\Constraints\GroupSequence;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
|
||||||
|
*/
|
||||||
|
class ValidationConfigurationTest extends TestCase
|
||||||
|
{
|
||||||
|
public function testConfig()
|
||||||
|
{
|
||||||
|
$config = new ValidationConfiguration($groups = array('Default', 'Extra'));
|
||||||
|
$this->assertSame($groups, $config->getGroups());
|
||||||
|
|
||||||
|
$config = new ValidationConfiguration($groups = new GroupSequence(array('Default', 'Then')));
|
||||||
|
$this->assertSame($groups, $config->getGroups());
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testSerialiazable()
|
||||||
|
{
|
||||||
|
$this->assertTrue(is_subclass_of(ValidationConfiguration::class, \Serializable::class, true));
|
||||||
|
$this->assertEquals($config = new ValidationConfiguration(array('Default', 'Extra')), unserialize(serialize($config)));
|
||||||
|
$this->assertEquals($config = new ValidationConfiguration(new GroupSequence(array('Default', 'Then'))), unserialize(serialize($config)));
|
||||||
|
}
|
||||||
|
}
|
@ -12,6 +12,8 @@
|
|||||||
namespace Symfony\Component\Messenger\Tests\Middleware;
|
namespace Symfony\Component\Messenger\Tests\Middleware;
|
||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration;
|
||||||
use Symfony\Component\Messenger\Middleware\ValidationMiddleware;
|
use Symfony\Component\Messenger\Middleware\ValidationMiddleware;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
use Symfony\Component\Validator\ConstraintViolationListInterface;
|
use Symfony\Component\Validator\ConstraintViolationListInterface;
|
||||||
@ -43,6 +45,30 @@ class ValidationMiddlewareTest extends TestCase
|
|||||||
$this->assertSame('Hello', $result);
|
$this->assertSame('Hello', $result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testValidateWithConfigurationAndNextMiddleware()
|
||||||
|
{
|
||||||
|
$envelope = Envelope::wrap($message = new DummyMessage('Hey'))->with(new ValidationConfiguration($groups = array('Default', 'Extra')));
|
||||||
|
|
||||||
|
$validator = $this->createMock(ValidatorInterface::class);
|
||||||
|
$validator
|
||||||
|
->expects($this->once())
|
||||||
|
->method('validate')
|
||||||
|
->with($message, null, $groups)
|
||||||
|
->willReturn($this->createMock(ConstraintViolationListInterface::class))
|
||||||
|
;
|
||||||
|
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
|
||||||
|
$next
|
||||||
|
->expects($this->once())
|
||||||
|
->method('__invoke')
|
||||||
|
->with($envelope)
|
||||||
|
->willReturn('Hello')
|
||||||
|
;
|
||||||
|
|
||||||
|
$result = (new ValidationMiddleware($validator))->handle($envelope, $next);
|
||||||
|
|
||||||
|
$this->assertSame('Hello', $result);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @expectedException \Symfony\Component\Messenger\Exception\ValidationFailedException
|
* @expectedException \Symfony\Component\Messenger\Exception\ValidationFailedException
|
||||||
* @expectedExceptionMessage Message of type "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage" failed validation.
|
* @expectedExceptionMessage Message of type "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage" failed validation.
|
||||||
|
@ -12,6 +12,7 @@
|
|||||||
namespace Symfony\Component\Messenger\Tests;
|
namespace Symfony\Component\Messenger\Tests;
|
||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\MessageBusInterface;
|
use Symfony\Component\Messenger\MessageBusInterface;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
use Symfony\Component\Messenger\TraceableMessageBus;
|
use Symfony\Component\Messenger\TraceableMessageBus;
|
||||||
@ -30,6 +31,18 @@ class TraceableMessageBusTest extends TestCase
|
|||||||
$this->assertSame(array(array('message' => $message, 'result' => $result)), $traceableBus->getDispatchedMessages());
|
$this->assertSame(array(array('message' => $message, 'result' => $result)), $traceableBus->getDispatchedMessages());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testItTracesResultWithEnvelope()
|
||||||
|
{
|
||||||
|
$envelope = Envelope::wrap($message = new DummyMessage('Hello'));
|
||||||
|
|
||||||
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
|
$bus->expects($this->once())->method('dispatch')->with($envelope)->willReturn($result = array('foo' => 'bar'));
|
||||||
|
|
||||||
|
$traceableBus = new TraceableMessageBus($bus);
|
||||||
|
$this->assertSame($result, $traceableBus->dispatch($envelope));
|
||||||
|
$this->assertSame(array(array('message' => $message, 'result' => $result)), $traceableBus->getDispatchedMessages());
|
||||||
|
}
|
||||||
|
|
||||||
public function testItTracesExceptions()
|
public function testItTracesExceptions()
|
||||||
{
|
{
|
||||||
$message = new DummyMessage('Hello');
|
$message = new DummyMessage('Hello');
|
||||||
|
@ -12,6 +12,7 @@
|
|||||||
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
||||||
@ -50,12 +51,12 @@ class AmqpExtIntegrationTest extends TestCase
|
|||||||
$sender = new AmqpSender($serializer, $connection);
|
$sender = new AmqpSender($serializer, $connection);
|
||||||
$receiver = new AmqpReceiver($serializer, $connection);
|
$receiver = new AmqpReceiver($serializer, $connection);
|
||||||
|
|
||||||
$sender->send($firstMessage = new DummyMessage('First'));
|
$sender->send($first = Envelope::wrap(new DummyMessage('First')));
|
||||||
$sender->send($secondMessage = new DummyMessage('Second'));
|
$sender->send($second = Envelope::wrap(new DummyMessage('Second')));
|
||||||
|
|
||||||
$receivedMessages = 0;
|
$receivedMessages = 0;
|
||||||
$receiver->receive(function ($message) use ($receiver, &$receivedMessages, $firstMessage, $secondMessage) {
|
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
|
||||||
$this->assertEquals(0 == $receivedMessages ? $firstMessage : $secondMessage, $message);
|
$this->assertEquals(0 == $receivedMessages ? $first : $second, $envelope);
|
||||||
|
|
||||||
if (2 === ++$receivedMessages) {
|
if (2 === ++$receivedMessages) {
|
||||||
$receiver->stop();
|
$receiver->stop();
|
||||||
@ -74,7 +75,7 @@ class AmqpExtIntegrationTest extends TestCase
|
|||||||
$connection->queue()->purge();
|
$connection->queue()->purge();
|
||||||
|
|
||||||
$sender = new AmqpSender($serializer, $connection);
|
$sender = new AmqpSender($serializer, $connection);
|
||||||
$sender->send(new DummyMessage('Hello'));
|
$sender->send(Envelope::wrap(new DummyMessage('Hello')));
|
||||||
|
|
||||||
$amqpReadTimeout = 30;
|
$amqpReadTimeout = 30;
|
||||||
$dsn = getenv('MESSENGER_AMQP_DSN').'?read_timeout='.$amqpReadTimeout;
|
$dsn = getenv('MESSENGER_AMQP_DSN').'?read_timeout='.$amqpReadTimeout;
|
||||||
@ -98,7 +99,15 @@ class AmqpExtIntegrationTest extends TestCase
|
|||||||
|
|
||||||
$this->assertFalse($process->isRunning());
|
$this->assertFalse($process->isRunning());
|
||||||
$this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);
|
$this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);
|
||||||
$this->assertEquals($expectedOutput."Get message: Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage\nDone.\n", $process->getOutput());
|
$this->assertSame($expectedOutput.<<<'TXT'
|
||||||
|
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
|
||||||
|
with items: [
|
||||||
|
"Symfony\\Component\\Messenger\\Asynchronous\\Transport\\ReceivedMessage"
|
||||||
|
]
|
||||||
|
Done.
|
||||||
|
|
||||||
|
TXT
|
||||||
|
, $process->getOutput());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -114,12 +123,11 @@ class AmqpExtIntegrationTest extends TestCase
|
|||||||
$connection->setup();
|
$connection->setup();
|
||||||
$connection->queue()->purge();
|
$connection->queue()->purge();
|
||||||
|
|
||||||
$sender = new AmqpSender($serializer, $connection);
|
|
||||||
$receiver = new AmqpReceiver($serializer, $connection);
|
$receiver = new AmqpReceiver($serializer, $connection);
|
||||||
|
|
||||||
$receivedMessages = 0;
|
$receivedMessages = 0;
|
||||||
$receiver->receive(function ($message) use ($receiver, &$receivedMessages) {
|
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
|
||||||
$this->assertNull($message);
|
$this->assertNull($envelope);
|
||||||
|
|
||||||
if (2 === ++$receivedMessages) {
|
if (2 === ++$receivedMessages) {
|
||||||
$receiver->stop();
|
$receiver->stop();
|
||||||
|
@ -12,6 +12,7 @@
|
|||||||
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
|
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
|
||||||
@ -44,8 +45,8 @@ class AmqpReceiverTest extends TestCase
|
|||||||
$connection->expects($this->once())->method('ack')->with($envelope);
|
$connection->expects($this->once())->method('ack')->with($envelope);
|
||||||
|
|
||||||
$receiver = new AmqpReceiver($serializer, $connection);
|
$receiver = new AmqpReceiver($serializer, $connection);
|
||||||
$receiver->receive(function ($message) use ($receiver) {
|
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
|
||||||
$this->assertEquals(new DummyMessage('Hi'), $message);
|
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
|
||||||
$receiver->stop();
|
$receiver->stop();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -11,6 +11,7 @@
|
|||||||
|
|
||||||
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
@ -24,16 +25,16 @@ class AmqpSenderTest extends TestCase
|
|||||||
{
|
{
|
||||||
public function testItSendsTheEncodedMessage()
|
public function testItSendsTheEncodedMessage()
|
||||||
{
|
{
|
||||||
$message = new DummyMessage('Oy');
|
$envelope = Envelope::wrap(new DummyMessage('Oy'));
|
||||||
$encoded = array('body' => '...', 'headers' => array('type' => DummyMessage::class));
|
$encoded = array('body' => '...', 'headers' => array('type' => DummyMessage::class));
|
||||||
|
|
||||||
$encoder = $this->getMockBuilder(EncoderInterface::class)->getMock();
|
$encoder = $this->getMockBuilder(EncoderInterface::class)->getMock();
|
||||||
$encoder->method('encode')->with($message)->willReturnOnConsecutiveCalls($encoded);
|
$encoder->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
|
||||||
|
|
||||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||||
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers']);
|
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers']);
|
||||||
|
|
||||||
$sender = new AmqpSender($encoder, $connection);
|
$sender = new AmqpSender($encoder, $connection);
|
||||||
$sender->send($message);
|
$sender->send($envelope);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -12,10 +12,9 @@ if (!file_exists($autoload)) {
|
|||||||
|
|
||||||
require_once $autoload;
|
require_once $autoload;
|
||||||
|
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
|
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
|
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
|
||||||
use Symfony\Component\Messenger\MessageBusInterface;
|
use Symfony\Component\Messenger\MessageBusInterface;
|
||||||
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
|
||||||
|
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||||
use Symfony\Component\Messenger\Worker;
|
use Symfony\Component\Messenger\Worker;
|
||||||
use Symfony\Component\Serializer as SerializerComponent;
|
use Symfony\Component\Serializer as SerializerComponent;
|
||||||
@ -27,13 +26,14 @@ $serializer = new Serializer(
|
|||||||
);
|
);
|
||||||
|
|
||||||
$connection = Connection::fromDsn(getenv('DSN'));
|
$connection = Connection::fromDsn(getenv('DSN'));
|
||||||
$sender = new AmqpSender($serializer, $connection);
|
|
||||||
$receiver = new AmqpReceiver($serializer, $connection);
|
$receiver = new AmqpReceiver($serializer, $connection);
|
||||||
|
|
||||||
$worker = new Worker($receiver, new class() implements MessageBusInterface {
|
$worker = new Worker($receiver, new class() implements MessageBusInterface {
|
||||||
public function dispatch($message)
|
public function dispatch($envelope)
|
||||||
{
|
{
|
||||||
echo 'Get message: '.get_class($message)."\n";
|
echo 'Get envelope with message: '.get_class($envelope->getMessage())."\n";
|
||||||
|
echo sprintf("with items: %s\n", json_encode(array_keys($envelope->all()), JSON_PRETTY_PRINT));
|
||||||
|
|
||||||
sleep(30);
|
sleep(30);
|
||||||
echo "Done.\n";
|
echo "Done.\n";
|
||||||
}
|
}
|
||||||
|
@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\Enhancers;
|
|||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
use Psr\Log\LoggerInterface;
|
use Psr\Log\LoggerInterface;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
|
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver;
|
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver;
|
||||||
@ -25,7 +26,7 @@ class StopWhenMemoryUsageIsExceededReceiverTest extends TestCase
|
|||||||
public function testReceiverStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop)
|
public function testReceiverStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop)
|
||||||
{
|
{
|
||||||
$callable = function ($handler) {
|
$callable = function ($handler) {
|
||||||
$handler(new DummyMessage('API'));
|
$handler(Envelope::wrap(new DummyMessage('API')));
|
||||||
};
|
};
|
||||||
|
|
||||||
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
|
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
|
||||||
@ -58,7 +59,7 @@ class StopWhenMemoryUsageIsExceededReceiverTest extends TestCase
|
|||||||
public function testReceiverLogsMemoryExceededWhenLoggerIsGiven()
|
public function testReceiverLogsMemoryExceededWhenLoggerIsGiven()
|
||||||
{
|
{
|
||||||
$callable = function ($handler) {
|
$callable = function ($handler) {
|
||||||
$handler(new DummyMessage('API'));
|
$handler(Envelope::wrap(new DummyMessage('API')));
|
||||||
};
|
};
|
||||||
|
|
||||||
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
|
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
|
||||||
|
@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\Enhancers;
|
|||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
use Psr\Log\LoggerInterface;
|
use Psr\Log\LoggerInterface;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
|
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
|
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
|
||||||
@ -25,9 +26,9 @@ class StopWhenMessageCountIsExceededReceiverTest extends TestCase
|
|||||||
public function testReceiverStopsWhenMaximumCountExceeded($max, $shouldStop)
|
public function testReceiverStopsWhenMaximumCountExceeded($max, $shouldStop)
|
||||||
{
|
{
|
||||||
$callable = function ($handler) {
|
$callable = function ($handler) {
|
||||||
$handler(new DummyMessage('First message'));
|
$handler(Envelope::wrap(new DummyMessage('First message')));
|
||||||
$handler(new DummyMessage('Second message'));
|
$handler(Envelope::wrap(new DummyMessage('Second message')));
|
||||||
$handler(new DummyMessage('Third message'));
|
$handler(Envelope::wrap(new DummyMessage('Third message')));
|
||||||
};
|
};
|
||||||
|
|
||||||
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
|
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
|
||||||
@ -78,7 +79,7 @@ class StopWhenMessageCountIsExceededReceiverTest extends TestCase
|
|||||||
public function testReceiverLogsMaximumCountExceededWhenLoggerIsGiven()
|
public function testReceiverLogsMaximumCountExceededWhenLoggerIsGiven()
|
||||||
{
|
{
|
||||||
$callable = function ($handler) {
|
$callable = function ($handler) {
|
||||||
$handler(new DummyMessage('First message'));
|
$handler(Envelope::wrap(new DummyMessage('First message')));
|
||||||
};
|
};
|
||||||
|
|
||||||
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
|
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
|
||||||
|
@ -12,8 +12,11 @@
|
|||||||
namespace Symfony\Component\Messenger\Tests\Transport\Serialization;
|
namespace Symfony\Component\Messenger\Tests\Transport\Serialization;
|
||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||||
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerConfiguration;
|
||||||
use Symfony\Component\Serializer as SerializerComponent;
|
use Symfony\Component\Serializer as SerializerComponent;
|
||||||
use Symfony\Component\Serializer\Encoder\JsonEncoder;
|
use Symfony\Component\Serializer\Encoder\JsonEncoder;
|
||||||
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
|
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
|
||||||
@ -26,9 +29,23 @@ class SerializerTest extends TestCase
|
|||||||
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
|
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
|
||||||
);
|
);
|
||||||
|
|
||||||
$message = new DummyMessage('Hello');
|
$envelope = Envelope::wrap(new DummyMessage('Hello'));
|
||||||
|
|
||||||
$this->assertEquals($message, $serializer->decode($serializer->encode($message)));
|
$this->assertEquals($envelope, $serializer->decode($serializer->encode($envelope)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testEncodedWithConfigurationIsDecodable()
|
||||||
|
{
|
||||||
|
$serializer = new Serializer(
|
||||||
|
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
|
||||||
|
);
|
||||||
|
|
||||||
|
$envelope = Envelope::wrap(new DummyMessage('Hello'))
|
||||||
|
->with(new SerializerConfiguration(array(ObjectNormalizer::GROUPS => array('foo'))))
|
||||||
|
->with(new ValidationConfiguration(array('foo', 'bar')))
|
||||||
|
;
|
||||||
|
|
||||||
|
$this->assertEquals($envelope, $serializer->decode($serializer->encode($envelope)));
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testEncodedIsHavingTheBodyAndTypeHeader()
|
public function testEncodedIsHavingTheBodyAndTypeHeader()
|
||||||
@ -37,11 +54,12 @@ class SerializerTest extends TestCase
|
|||||||
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
|
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
|
||||||
);
|
);
|
||||||
|
|
||||||
$encoded = $serializer->encode(new DummyMessage('Hello'));
|
$encoded = $serializer->encode(Envelope::wrap(new DummyMessage('Hello')));
|
||||||
|
|
||||||
$this->assertArrayHasKey('body', $encoded);
|
$this->assertArrayHasKey('body', $encoded);
|
||||||
$this->assertArrayHasKey('headers', $encoded);
|
$this->assertArrayHasKey('headers', $encoded);
|
||||||
$this->assertArrayHasKey('type', $encoded['headers']);
|
$this->assertArrayHasKey('type', $encoded['headers']);
|
||||||
|
$this->assertArrayNotHasKey('X-Message-Envelope-Items', $encoded['headers']);
|
||||||
$this->assertEquals(DummyMessage::class, $encoded['headers']['type']);
|
$this->assertEquals(DummyMessage::class, $encoded['headers']['type']);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -55,10 +73,31 @@ class SerializerTest extends TestCase
|
|||||||
|
|
||||||
$encoder = new Serializer($serializer, 'csv', array('foo' => 'bar'));
|
$encoder = new Serializer($serializer, 'csv', array('foo' => 'bar'));
|
||||||
|
|
||||||
$encoded = $encoder->encode($message);
|
$encoded = $encoder->encode(Envelope::wrap($message));
|
||||||
$decoded = $encoder->decode($encoded);
|
$decoded = $encoder->decode($encoded);
|
||||||
|
|
||||||
$this->assertSame('Yay', $encoded['body']);
|
$this->assertSame('Yay', $encoded['body']);
|
||||||
$this->assertSame($message, $decoded);
|
$this->assertSame($message, $decoded->getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testEncodedWithSerializationConfiguration()
|
||||||
|
{
|
||||||
|
$serializer = new Serializer(
|
||||||
|
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
|
||||||
|
);
|
||||||
|
|
||||||
|
$envelope = Envelope::wrap(new DummyMessage('Hello'))
|
||||||
|
->with(new SerializerConfiguration(array(ObjectNormalizer::GROUPS => array('foo'))))
|
||||||
|
->with(new ValidationConfiguration(array('foo', 'bar')))
|
||||||
|
;
|
||||||
|
|
||||||
|
$encoded = $serializer->encode($envelope);
|
||||||
|
|
||||||
|
$this->assertArrayHasKey('body', $encoded);
|
||||||
|
$this->assertArrayHasKey('headers', $encoded);
|
||||||
|
$this->assertArrayHasKey('type', $encoded['headers']);
|
||||||
|
$this->assertEquals(DummyMessage::class, $encoded['headers']['type']);
|
||||||
|
$this->assertArrayHasKey('X-Message-Envelope-Items', $encoded['headers']);
|
||||||
|
$this->assertSame('a:2:{s:75:"Symfony\Component\Messenger\Transport\Serialization\SerializerConfiguration";C:75:"Symfony\Component\Messenger\Transport\Serialization\SerializerConfiguration":59:{a:1:{s:7:"context";a:1:{s:6:"groups";a:1:{i:0;s:3:"foo";}}}}s:76:"Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration";C:76:"Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration":82:{a:2:{s:6:"groups";a:2:{i:0;s:3:"foo";i:1;s:3:"bar";}s:17:"is_group_sequence";b:0;}}}', $encoded['headers']['X-Message-Envelope-Items']);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests;
|
|||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
|
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\MessageBusInterface;
|
use Symfony\Component\Messenger\MessageBusInterface;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
|
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
@ -22,29 +23,33 @@ class WorkerTest extends TestCase
|
|||||||
{
|
{
|
||||||
public function testWorkerDispatchTheReceivedMessage()
|
public function testWorkerDispatchTheReceivedMessage()
|
||||||
{
|
{
|
||||||
$receiver = new CallbackReceiver(function ($handler) {
|
$apiMessage = new DummyMessage('API');
|
||||||
$handler(new DummyMessage('API'));
|
$ipaMessage = new DummyMessage('IPA');
|
||||||
$handler(new DummyMessage('IPA'));
|
|
||||||
|
$receiver = new CallbackReceiver(function ($handler) use ($apiMessage, $ipaMessage) {
|
||||||
|
$handler(Envelope::wrap($apiMessage));
|
||||||
|
$handler(Envelope::wrap($ipaMessage));
|
||||||
});
|
});
|
||||||
|
|
||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
|
|
||||||
$bus->expects($this->at(0))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('API')));
|
$bus->expects($this->at(0))->method('dispatch')->with(Envelope::wrap($apiMessage)->with(new ReceivedMessage()));
|
||||||
$bus->expects($this->at(1))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('IPA')));
|
$bus->expects($this->at(1))->method('dispatch')->with(Envelope::wrap($ipaMessage)->with(new ReceivedMessage()));
|
||||||
|
|
||||||
$worker = new Worker($receiver, $bus);
|
$worker = new Worker($receiver, $bus);
|
||||||
$worker->run();
|
$worker->run();
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testWorkerDoesNotWrapMessagesAlreadyWrappedInReceivedMessages()
|
public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage()
|
||||||
{
|
{
|
||||||
$receiver = new CallbackReceiver(function ($handler) {
|
$envelop = Envelope::wrap(new DummyMessage('API'))->with(new ReceivedMessage());
|
||||||
$handler(new ReceivedMessage(new DummyMessage('API')));
|
$receiver = new CallbackReceiver(function ($handler) use ($envelop) {
|
||||||
|
$handler($envelop);
|
||||||
});
|
});
|
||||||
|
|
||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
|
|
||||||
$bus->expects($this->at(0))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('API')));
|
$bus->expects($this->at(0))->method('dispatch')->with($envelop);
|
||||||
|
|
||||||
$worker = new Worker($receiver, $bus);
|
$worker = new Worker($receiver, $bus);
|
||||||
$worker->run();
|
$worker->run();
|
||||||
@ -54,7 +59,7 @@ class WorkerTest extends TestCase
|
|||||||
{
|
{
|
||||||
$receiver = new CallbackReceiver(function ($handler) {
|
$receiver = new CallbackReceiver(function ($handler) {
|
||||||
try {
|
try {
|
||||||
$handler(new DummyMessage('Hello'));
|
$handler(Envelope::wrap(new DummyMessage('Hello')));
|
||||||
|
|
||||||
$this->assertTrue(false, 'This should not be called because the exception is sent back to the generator.');
|
$this->assertTrue(false, 'This should not be called because the exception is sent back to the generator.');
|
||||||
} catch (\InvalidArgumentException $e) {
|
} catch (\InvalidArgumentException $e) {
|
||||||
|
@ -29,18 +29,20 @@ class TraceableMessageBus implements MessageBusInterface
|
|||||||
*/
|
*/
|
||||||
public function dispatch($message)
|
public function dispatch($message)
|
||||||
{
|
{
|
||||||
|
$messageToTrace = $message instanceof Envelope ? $message->getMessage() : $message;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
$result = $this->decoratedBus->dispatch($message);
|
$result = $this->decoratedBus->dispatch($message);
|
||||||
|
|
||||||
$this->dispatchedMessages[] = array(
|
$this->dispatchedMessages[] = array(
|
||||||
'message' => $message,
|
'message' => $messageToTrace,
|
||||||
'result' => $result,
|
'result' => $result,
|
||||||
);
|
);
|
||||||
|
|
||||||
return $result;
|
return $result;
|
||||||
} catch (\Throwable $e) {
|
} catch (\Throwable $e) {
|
||||||
$this->dispatchedMessages[] = array(
|
$this->dispatchedMessages[] = array(
|
||||||
'message' => $message,
|
'message' => $messageToTrace,
|
||||||
'exception' => $e,
|
'exception' => $e,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -22,13 +22,13 @@ use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
|
|||||||
*/
|
*/
|
||||||
class AmqpReceiver implements ReceiverInterface
|
class AmqpReceiver implements ReceiverInterface
|
||||||
{
|
{
|
||||||
private $messageDecoder;
|
private $decoder;
|
||||||
private $connection;
|
private $connection;
|
||||||
private $shouldStop;
|
private $shouldStop;
|
||||||
|
|
||||||
public function __construct(DecoderInterface $messageDecoder, Connection $connection)
|
public function __construct(DecoderInterface $decoder, Connection $connection)
|
||||||
{
|
{
|
||||||
$this->messageDecoder = $messageDecoder;
|
$this->decoder = $decoder;
|
||||||
$this->connection = $connection;
|
$this->connection = $connection;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -38,8 +38,8 @@ class AmqpReceiver implements ReceiverInterface
|
|||||||
public function receive(callable $handler): void
|
public function receive(callable $handler): void
|
||||||
{
|
{
|
||||||
while (!$this->shouldStop) {
|
while (!$this->shouldStop) {
|
||||||
$message = $this->connection->get();
|
$AMQPEnvelope = $this->connection->get();
|
||||||
if (null === $message) {
|
if (null === $AMQPEnvelope) {
|
||||||
$handler(null);
|
$handler(null);
|
||||||
|
|
||||||
usleep($this->connection->getConnectionCredentials()['loop_sleep'] ?? 200000);
|
usleep($this->connection->getConnectionCredentials()['loop_sleep'] ?? 200000);
|
||||||
@ -51,18 +51,18 @@ class AmqpReceiver implements ReceiverInterface
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
$handler($this->messageDecoder->decode(array(
|
$handler($this->decoder->decode(array(
|
||||||
'body' => $message->getBody(),
|
'body' => $AMQPEnvelope->getBody(),
|
||||||
'headers' => $message->getHeaders(),
|
'headers' => $AMQPEnvelope->getHeaders(),
|
||||||
)));
|
)));
|
||||||
|
|
||||||
$this->connection->ack($message);
|
$this->connection->ack($AMQPEnvelope);
|
||||||
} catch (RejectMessageExceptionInterface $e) {
|
} catch (RejectMessageExceptionInterface $e) {
|
||||||
$this->connection->reject($message);
|
$this->connection->reject($AMQPEnvelope);
|
||||||
|
|
||||||
throw $e;
|
throw $e;
|
||||||
} catch (\Throwable $e) {
|
} catch (\Throwable $e) {
|
||||||
$this->connection->nack($message, AMQP_REQUEUE);
|
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE);
|
||||||
|
|
||||||
throw $e;
|
throw $e;
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -11,6 +11,7 @@
|
|||||||
|
|
||||||
namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Transport\SenderInterface;
|
use Symfony\Component\Messenger\Transport\SenderInterface;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
|
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
|
||||||
|
|
||||||
@ -21,21 +22,21 @@ use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
|
|||||||
*/
|
*/
|
||||||
class AmqpSender implements SenderInterface
|
class AmqpSender implements SenderInterface
|
||||||
{
|
{
|
||||||
private $messageEncoder;
|
private $encoder;
|
||||||
private $connection;
|
private $connection;
|
||||||
|
|
||||||
public function __construct(EncoderInterface $messageEncoder, Connection $connection)
|
public function __construct(EncoderInterface $encoder, Connection $connection)
|
||||||
{
|
{
|
||||||
$this->messageEncoder = $messageEncoder;
|
$this->encoder = $encoder;
|
||||||
$this->connection = $connection;
|
$this->connection = $connection;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@inheritdoc}
|
* {@inheritdoc}
|
||||||
*/
|
*/
|
||||||
public function send($message): void
|
public function send(Envelope $envelope)
|
||||||
{
|
{
|
||||||
$encodedMessage = $this->messageEncoder->encode($message);
|
$encodedMessage = $this->encoder->encode($envelope);
|
||||||
|
|
||||||
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers']);
|
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers']);
|
||||||
}
|
}
|
||||||
|
@ -12,6 +12,7 @@
|
|||||||
namespace Symfony\Component\Messenger\Transport\Enhancers;
|
namespace Symfony\Component\Messenger\Transport\Enhancers;
|
||||||
|
|
||||||
use Psr\Log\LoggerInterface;
|
use Psr\Log\LoggerInterface;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -36,8 +37,8 @@ class StopWhenMemoryUsageIsExceededReceiver implements ReceiverInterface
|
|||||||
|
|
||||||
public function receive(callable $handler): void
|
public function receive(callable $handler): void
|
||||||
{
|
{
|
||||||
$this->decoratedReceiver->receive(function ($message) use ($handler) {
|
$this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler) {
|
||||||
$handler($message);
|
$handler($envelope);
|
||||||
|
|
||||||
$memoryResolver = $this->memoryResolver;
|
$memoryResolver = $this->memoryResolver;
|
||||||
if ($memoryResolver() > $this->memoryLimit) {
|
if ($memoryResolver() > $this->memoryLimit) {
|
||||||
|
@ -12,6 +12,7 @@
|
|||||||
namespace Symfony\Component\Messenger\Transport\Enhancers;
|
namespace Symfony\Component\Messenger\Transport\Enhancers;
|
||||||
|
|
||||||
use Psr\Log\LoggerInterface;
|
use Psr\Log\LoggerInterface;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -34,10 +35,10 @@ class StopWhenMessageCountIsExceededReceiver implements ReceiverInterface
|
|||||||
{
|
{
|
||||||
$receivedMessages = 0;
|
$receivedMessages = 0;
|
||||||
|
|
||||||
$this->decoratedReceiver->receive(function ($message) use ($handler, &$receivedMessages) {
|
$this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler, &$receivedMessages) {
|
||||||
$handler($message);
|
$handler($envelope);
|
||||||
|
|
||||||
if (null !== $message && ++$receivedMessages >= $this->maximumNumberOfMessages) {
|
if (null !== $envelope && ++$receivedMessages >= $this->maximumNumberOfMessages) {
|
||||||
$this->stop();
|
$this->stop();
|
||||||
if (null !== $this->logger) {
|
if (null !== $this->logger) {
|
||||||
$this->logger->info('Receiver stopped due to maximum count of {count} exceeded', array('count' => $this->maximumNumberOfMessages));
|
$this->logger->info('Receiver stopped due to maximum count of {count} exceeded', array('count' => $this->maximumNumberOfMessages));
|
||||||
|
@ -21,8 +21,8 @@ interface ReceiverInterface
|
|||||||
/**
|
/**
|
||||||
* Receive some messages to the given handler.
|
* Receive some messages to the given handler.
|
||||||
*
|
*
|
||||||
* The handler will have, as argument, the received message. Note that this message
|
* The handler will have, as argument, the received {@link \Symfony\Component\Messenger\Envelope} containing the message.
|
||||||
* can be `null` if the timeout to receive something has expired.
|
* Note that this envelope can be `null` if the timeout to receive something has expired.
|
||||||
*/
|
*/
|
||||||
public function receive(callable $handler): void;
|
public function receive(callable $handler): void;
|
||||||
|
|
||||||
|
@ -11,6 +11,8 @@
|
|||||||
|
|
||||||
namespace Symfony\Component\Messenger\Transport;
|
namespace Symfony\Component\Messenger\Transport;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||||
*
|
*
|
||||||
@ -19,9 +21,9 @@ namespace Symfony\Component\Messenger\Transport;
|
|||||||
interface SenderInterface
|
interface SenderInterface
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* Sends the given message.
|
* Sends the given envelope.
|
||||||
*
|
*
|
||||||
* @param object $message
|
* @param Envelope $envelope
|
||||||
*/
|
*/
|
||||||
public function send($message): void;
|
public function send(Envelope $envelope);
|
||||||
}
|
}
|
||||||
|
@ -11,6 +11,8 @@
|
|||||||
|
|
||||||
namespace Symfony\Component\Messenger\Transport\Serialization;
|
namespace Symfony\Component\Messenger\Transport\Serialization;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||||
*
|
*
|
||||||
@ -19,16 +21,16 @@ namespace Symfony\Component\Messenger\Transport\Serialization;
|
|||||||
interface DecoderInterface
|
interface DecoderInterface
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* Decodes the message from an encoded-form.
|
* Decodes an envelope and its message from an encoded-form.
|
||||||
*
|
*
|
||||||
* The `$encodedMessage` parameter is a key-value array that
|
* The `$encodedEnvelope` parameter is a key-value array that
|
||||||
* describes the message, that will be used by the different transports.
|
* describes the envelope and its content, that will be used by the different transports.
|
||||||
*
|
*
|
||||||
* The most common keys are:
|
* The most common keys are:
|
||||||
* - `body` (string) - the message body
|
* - `body` (string) - the message body
|
||||||
* - `headers` (string<string>) - a key/value pair of headers
|
* - `headers` (string<string>) - a key/value pair of headers
|
||||||
*
|
*
|
||||||
* @return object
|
* @return Envelope
|
||||||
*/
|
*/
|
||||||
public function decode(array $encodedMessage);
|
public function decode(array $encodedEnvelope): Envelope;
|
||||||
}
|
}
|
||||||
|
@ -10,6 +10,7 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
namespace Symfony\Component\Messenger\Transport\Serialization;
|
namespace Symfony\Component\Messenger\Transport\Serialization;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||||
@ -19,14 +20,14 @@ namespace Symfony\Component\Messenger\Transport\Serialization;
|
|||||||
interface EncoderInterface
|
interface EncoderInterface
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* Encodes a message to a common format understandable by transports. The encoded array should only
|
* Encodes an envelope content (message & items) to a common format understandable by transports.
|
||||||
* contain scalar and arrays.
|
* The encoded array should only contain scalar and arrays.
|
||||||
*
|
*
|
||||||
* The most common keys of the encoded array are:
|
* The most common keys of the encoded array are:
|
||||||
* - `body` (string) - the message body
|
* - `body` (string) - the message body
|
||||||
* - `headers` (string<string>) - a key/value pair of headers
|
* - `headers` (string<string>) - a key/value pair of headers
|
||||||
*
|
*
|
||||||
* @param object $message The object that is put on the MessageBus by the user
|
* @param Envelope $envelope The envelop containing the message put on the MessageBus by the user
|
||||||
*/
|
*/
|
||||||
public function encode($message): array;
|
public function encode(Envelope $envelope): array;
|
||||||
}
|
}
|
||||||
|
@ -11,6 +11,7 @@
|
|||||||
|
|
||||||
namespace Symfony\Component\Messenger\Transport\Serialization;
|
namespace Symfony\Component\Messenger\Transport\Serialization;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Serializer\SerializerInterface;
|
use Symfony\Component\Serializer\SerializerInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -32,27 +33,48 @@ class Serializer implements DecoderInterface, EncoderInterface
|
|||||||
/**
|
/**
|
||||||
* {@inheritdoc}
|
* {@inheritdoc}
|
||||||
*/
|
*/
|
||||||
public function decode(array $encodedMessage)
|
public function decode(array $encodedEnvelope): Envelope
|
||||||
{
|
{
|
||||||
if (empty($encodedMessage['body']) || empty($encodedMessage['headers'])) {
|
if (empty($encodedEnvelope['body']) || empty($encodedEnvelope['headers'])) {
|
||||||
throw new \InvalidArgumentException('Encoded message should have at least a `body` and some `headers`.');
|
throw new \InvalidArgumentException('Encoded envelope should have at least a `body` and some `headers`.');
|
||||||
}
|
}
|
||||||
|
|
||||||
if (empty($encodedMessage['headers']['type'])) {
|
if (empty($encodedEnvelope['headers']['type'])) {
|
||||||
throw new \InvalidArgumentException('Encoded message does not have a `type` header.');
|
throw new \InvalidArgumentException('Encoded envelope does not have a `type` header.');
|
||||||
}
|
}
|
||||||
|
|
||||||
return $this->serializer->deserialize($encodedMessage['body'], $encodedMessage['headers']['type'], $this->format, $this->context);
|
$envelopeItems = isset($encodedEnvelope['headers']['X-Message-Envelope-Items']) ? unserialize($encodedEnvelope['headers']['X-Message-Envelope-Items']) : array();
|
||||||
|
|
||||||
|
$context = $this->context;
|
||||||
|
/** @var SerializerConfiguration|null $serializerConfig */
|
||||||
|
if ($serializerConfig = $envelopeItems[SerializerConfiguration::class] ?? null) {
|
||||||
|
$context = $serializerConfig->getContext() + $context;
|
||||||
|
}
|
||||||
|
|
||||||
|
$message = $this->serializer->deserialize($encodedEnvelope['body'], $encodedEnvelope['headers']['type'], $this->format, $context);
|
||||||
|
|
||||||
|
return new Envelope($message, $envelopeItems);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@inheritdoc}
|
* {@inheritdoc}
|
||||||
*/
|
*/
|
||||||
public function encode($message): array
|
public function encode(Envelope $envelope): array
|
||||||
{
|
{
|
||||||
|
$context = $this->context;
|
||||||
|
/** @var SerializerConfiguration|null $serializerConfig */
|
||||||
|
if ($serializerConfig = $envelope->get(SerializerConfiguration::class)) {
|
||||||
|
$context = $serializerConfig->getContext() + $context;
|
||||||
|
}
|
||||||
|
|
||||||
|
$headers = array('type' => \get_class($envelope->getMessage()));
|
||||||
|
if ($configurations = $envelope->all()) {
|
||||||
|
$headers['X-Message-Envelope-Items'] = serialize($configurations);
|
||||||
|
}
|
||||||
|
|
||||||
return array(
|
return array(
|
||||||
'body' => $this->serializer->serialize($message, $this->format, $this->context),
|
'body' => $this->serializer->serialize($envelope->getMessage(), $this->format, $context),
|
||||||
'headers' => array('type' => \get_class($message)),
|
'headers' => $headers,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,46 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Transport\Serialization;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\EnvelopeItemInterface;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
|
||||||
|
*
|
||||||
|
* @experimental in 4.1
|
||||||
|
*/
|
||||||
|
final class SerializerConfiguration implements EnvelopeItemInterface
|
||||||
|
{
|
||||||
|
private $context;
|
||||||
|
|
||||||
|
public function __construct(array $context)
|
||||||
|
{
|
||||||
|
$this->context = $context;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getContext(): array
|
||||||
|
{
|
||||||
|
return $this->context;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function serialize()
|
||||||
|
{
|
||||||
|
return serialize(array('context' => $this->context));
|
||||||
|
}
|
||||||
|
|
||||||
|
public function unserialize($serialized)
|
||||||
|
{
|
||||||
|
list('context' => $context) = unserialize($serialized, array('allowed_classes' => false));
|
||||||
|
|
||||||
|
$this->__construct($context);
|
||||||
|
}
|
||||||
|
}
|
@ -41,16 +41,12 @@ class Worker
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->receiver->receive(function ($message) {
|
$this->receiver->receive(function (?Envelope $envelope) {
|
||||||
if (null === $message) {
|
if (null === $envelope) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!$message instanceof ReceivedMessage) {
|
$this->bus->dispatch($envelope->with(new ReceivedMessage()));
|
||||||
$message = new ReceivedMessage($message);
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->bus->dispatch($message);
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user