Allows to register handlers on a specific transport (and get rid of this handler alias)
This commit is contained in:
parent
96a7907979
commit
f0b2acd67d
@ -81,6 +81,12 @@ CHANGELOG
|
||||
* Added a Doctrine transport. For example, use the `doctrine://default` DSN (this uses the `default` Doctrine entity manager)
|
||||
* [BC BREAK] The `getConnectionConfiguration` method on Amqp's `Connection` has been removed.
|
||||
* [BC BREAK] A `HandlerFailedException` exception will be thrown if one or more handler fails.
|
||||
* [BC BREAK] The `HandlersLocationInterface::getHandlers` method needs to return `HandlerDescriptor`
|
||||
instances instead of callables.
|
||||
* [BC BREAK] The `HandledStamp` stamp has changed: `handlerAlias` has been renamed to `handlerName`,
|
||||
`getCallableName` has been removed and its constructor only has 2 arguments now.
|
||||
* [BC BREAK] The `ReceivedStamp` needs to exposes the name of the transport from which the message
|
||||
has been received.
|
||||
|
||||
4.2.0
|
||||
-----
|
||||
|
@ -84,7 +84,9 @@ EOF
|
||||
foreach ($handlersByMessage as $message => $handlers) {
|
||||
$tableRows[] = [sprintf('<fg=cyan>%s</fg=cyan>', $message)];
|
||||
foreach ($handlers as $handler) {
|
||||
$tableRows[] = [sprintf(' handled by <info>%s</>', $handler)];
|
||||
$tableRows[] = [
|
||||
sprintf(' handled by <info>%s</>', $handler[0]).$this->formatConditions($handler[1]),
|
||||
];
|
||||
}
|
||||
}
|
||||
|
||||
@ -97,4 +99,18 @@ EOF
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private function formatConditions(array $options): string
|
||||
{
|
||||
if (!$options) {
|
||||
return '';
|
||||
}
|
||||
|
||||
$optionsMapping = [];
|
||||
foreach ($options as $key => $value) {
|
||||
$optionsMapping[] = ' '.$key.'='.$value;
|
||||
}
|
||||
|
||||
return ' (when'.implode(', ', $optionsMapping).')';
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ use Symfony\Component\DependencyInjection\ContainerBuilder;
|
||||
use Symfony\Component\DependencyInjection\Definition;
|
||||
use Symfony\Component\DependencyInjection\Exception\RuntimeException;
|
||||
use Symfony\Component\DependencyInjection\Reference;
|
||||
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
|
||||
use Symfony\Component\Messenger\Handler\HandlersLocator;
|
||||
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
|
||||
use Symfony\Component\Messenger\TraceableMessageBus;
|
||||
@ -94,32 +95,33 @@ class MessengerPass implements CompilerPassInterface
|
||||
$message = null;
|
||||
$handlerBuses = (array) ($tag['bus'] ?? $busIds);
|
||||
|
||||
foreach ($handles as $message => $method) {
|
||||
foreach ($handles as $message => $options) {
|
||||
$buses = $handlerBuses;
|
||||
|
||||
if (\is_int($message)) {
|
||||
$message = $method;
|
||||
$method = '__invoke';
|
||||
if (\is_string($options)) {
|
||||
$message = $options;
|
||||
$options = [];
|
||||
} else {
|
||||
throw new RuntimeException(sprintf('The handler configuration needs to return an array of messages or an associated array of message and configuration. Found value of type "%s" at position "%d" for service "%s".', \gettype($options), $message, $serviceId));
|
||||
}
|
||||
}
|
||||
|
||||
if (\is_array($message)) {
|
||||
list($message, $priority) = $message;
|
||||
} else {
|
||||
$priority = $tag['priority'] ?? 0;
|
||||
if (\is_string($options)) {
|
||||
$options = ['method' => $options];
|
||||
}
|
||||
|
||||
if (\is_array($method)) {
|
||||
if (isset($method['bus'])) {
|
||||
if (!\in_array($method['bus'], $busIds)) {
|
||||
$messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : ($r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method));
|
||||
$priority = $tag['priority'] ?? $options['priority'] ?? 0;
|
||||
$method = $options['method'] ?? '__invoke';
|
||||
|
||||
throw new RuntimeException(sprintf('Invalid configuration %s for message "%s": bus "%s" does not exist.', $messageLocation, $message, $method['bus']));
|
||||
}
|
||||
if (isset($options['bus'])) {
|
||||
if (!\in_array($options['bus'], $busIds)) {
|
||||
$messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : ($r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method));
|
||||
|
||||
$buses = [$method['bus']];
|
||||
throw new RuntimeException(sprintf('Invalid configuration %s for message "%s": bus "%s" does not exist.', $messageLocation, $message, $options['bus']));
|
||||
}
|
||||
|
||||
$priority = $method['priority'] ?? $priority;
|
||||
$method = $method['method'] ?? '__invoke';
|
||||
$buses = [$options['bus']];
|
||||
}
|
||||
|
||||
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
|
||||
@ -141,7 +143,7 @@ class MessengerPass implements CompilerPassInterface
|
||||
}
|
||||
|
||||
foreach ($buses as $handlerBus) {
|
||||
$handlersByBusAndMessage[$handlerBus][$message][$priority][] = $definitionId;
|
||||
$handlersByBusAndMessage[$handlerBus][$message][$priority][] = [$definitionId, $options];
|
||||
}
|
||||
}
|
||||
|
||||
@ -154,15 +156,20 @@ class MessengerPass implements CompilerPassInterface
|
||||
foreach ($handlersByBusAndMessage as $bus => $handlersByMessage) {
|
||||
foreach ($handlersByMessage as $message => $handlersByPriority) {
|
||||
krsort($handlersByPriority);
|
||||
$handlersByBusAndMessage[$bus][$message] = array_unique(array_merge(...$handlersByPriority));
|
||||
$handlersByBusAndMessage[$bus][$message] = array_merge(...$handlersByPriority);
|
||||
}
|
||||
}
|
||||
|
||||
$handlersLocatorMappingByBus = [];
|
||||
foreach ($handlersByBusAndMessage as $bus => $handlersByMessage) {
|
||||
foreach ($handlersByMessage as $message => $handlerIds) {
|
||||
$handlers = array_map(function (string $handlerId) { return new Reference($handlerId); }, $handlerIds);
|
||||
$handlersLocatorMappingByBus[$bus][$message] = new IteratorArgument($handlers);
|
||||
foreach ($handlersByMessage as $message => $handlers) {
|
||||
$handlerDescriptors = [];
|
||||
foreach ($handlers as $handler) {
|
||||
$definitions[$definitionId = '.messenger.handler_descriptor.'.ContainerBuilder::hash($bus.':'.$message.':'.$handler[0])] = (new Definition(HandlerDescriptor::class))->setArguments([new Reference($handler[0]), $handler[1]]);
|
||||
$handlerDescriptors[] = new Reference($definitionId);
|
||||
}
|
||||
|
||||
$handlersLocatorMappingByBus[$bus][$message] = new IteratorArgument($handlerDescriptors);
|
||||
}
|
||||
}
|
||||
$container->addDefinitions($definitions);
|
||||
|
@ -52,7 +52,7 @@ trait HandleTrait
|
||||
|
||||
if (\count($handledStamps) > 1) {
|
||||
$handlers = implode(', ', array_map(function (HandledStamp $stamp): string {
|
||||
return sprintf('"%s"', $stamp->getHandlerAlias() ?? $stamp->getCallableName());
|
||||
return sprintf('"%s"', $stamp->getHandlerName());
|
||||
}, $handledStamps));
|
||||
|
||||
throw new LogicException(sprintf('Message of type "%s" was handled multiple times. Only one handler is expected when using "%s::%s()", got %d: %s.', \get_class($envelope->getMessage()), \get_class($this), __FUNCTION__, \count($handledStamps), $handlers));
|
||||
|
@ -0,0 +1,82 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Handler;
|
||||
|
||||
/**
|
||||
* Describes a handler and the possible associated options, such as `from_transport`, `bus`, etc.
|
||||
*
|
||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*/
|
||||
final class HandlerDescriptor
|
||||
{
|
||||
private $handler;
|
||||
private $options;
|
||||
|
||||
public function __construct(callable $handler, array $options = [])
|
||||
{
|
||||
$this->handler = $handler;
|
||||
$this->options = $options;
|
||||
}
|
||||
|
||||
public function getHandler(): callable
|
||||
{
|
||||
return $this->handler;
|
||||
}
|
||||
|
||||
public function getName(): string
|
||||
{
|
||||
$name = $this->callableName($this->handler);
|
||||
$alias = $this->options['alias'] ?? null;
|
||||
|
||||
if (null !== $alias) {
|
||||
$name .= '@'.$alias;
|
||||
}
|
||||
|
||||
return $name;
|
||||
}
|
||||
|
||||
public function getOption(string $option)
|
||||
{
|
||||
return $this->options[$option] ?? null;
|
||||
}
|
||||
|
||||
private function callableName(callable $handler)
|
||||
{
|
||||
if (\is_array($handler)) {
|
||||
if (\is_object($handler[0])) {
|
||||
return \get_class($handler[0]).'::'.$handler[1];
|
||||
}
|
||||
|
||||
return $handler[0].'::'.$handler[1];
|
||||
}
|
||||
|
||||
if (\is_string($handler)) {
|
||||
return $handler;
|
||||
}
|
||||
|
||||
if ($handler instanceof \Closure) {
|
||||
$r = new \ReflectionFunction($handler);
|
||||
if (false !== strpos($r->name, '{closure}')) {
|
||||
return 'Closure';
|
||||
}
|
||||
if ($class = $r->getClosureScopeClass()) {
|
||||
return $class->name.'::'.$r->name;
|
||||
}
|
||||
|
||||
return $r->name;
|
||||
}
|
||||
|
||||
return \get_class($handler).'::__invoke';
|
||||
}
|
||||
}
|
@ -12,11 +12,13 @@
|
||||
namespace Symfony\Component\Messenger\Handler;
|
||||
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||
|
||||
/**
|
||||
* Maps a message to a list of handlers.
|
||||
*
|
||||
* @author Nicolas Grekas <p@tchwork.com>
|
||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||
*
|
||||
* @experimental in 4.2
|
||||
*/
|
||||
@ -25,7 +27,7 @@ class HandlersLocator implements HandlersLocatorInterface
|
||||
private $handlers;
|
||||
|
||||
/**
|
||||
* @param callable[][] $handlers
|
||||
* @param HandlerDescriptor[][]|callable[][] $handlers
|
||||
*/
|
||||
public function __construct(array $handlers)
|
||||
{
|
||||
@ -40,10 +42,23 @@ class HandlersLocator implements HandlersLocatorInterface
|
||||
$seen = [];
|
||||
|
||||
foreach (self::listTypes($envelope) as $type) {
|
||||
foreach ($this->handlers[$type] ?? [] as $alias => $handler) {
|
||||
if (!\in_array($handler, $seen, true)) {
|
||||
yield $alias => $seen[] = $handler;
|
||||
foreach ($this->handlers[$type] ?? [] as $handlerDescriptor) {
|
||||
if (\is_callable($handlerDescriptor)) {
|
||||
$handlerDescriptor = new HandlerDescriptor($handlerDescriptor);
|
||||
}
|
||||
|
||||
if (!$this->shouldHandle($envelope, $handlerDescriptor)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$name = $handlerDescriptor->getName();
|
||||
if (\in_array($name, $seen)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$seen[] = $name;
|
||||
|
||||
yield $handlerDescriptor;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -60,4 +75,17 @@ class HandlersLocator implements HandlersLocatorInterface
|
||||
+ class_implements($class)
|
||||
+ ['*' => '*'];
|
||||
}
|
||||
|
||||
private function shouldHandle(Envelope $envelope, HandlerDescriptor $handlerDescriptor)
|
||||
{
|
||||
if (null === $received = $envelope->last(ReceivedStamp::class)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (null === $expectedTransport = $handlerDescriptor->getOption('from_transport')) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return $received->getTransportName() === $expectedTransport;
|
||||
}
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ interface HandlersLocatorInterface
|
||||
/**
|
||||
* Returns the handlers for the given message name.
|
||||
*
|
||||
* @return iterable|callable[] Indexed by handler alias if available
|
||||
* @return iterable|HandlerDescriptor[] Indexed by handler alias if available
|
||||
*/
|
||||
public function getHandlers(Envelope $envelope): iterable;
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ use Psr\Log\NullLogger;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
||||
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
|
||||
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
|
||||
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
|
||||
use Symfony\Component\Messenger\Stamp\HandledStamp;
|
||||
|
||||
@ -54,17 +55,16 @@ class HandleMessageMiddleware implements MiddlewareInterface
|
||||
];
|
||||
|
||||
$exceptions = [];
|
||||
foreach ($this->handlersLocator->getHandlers($envelope) as $alias => $handler) {
|
||||
$alias = \is_string($alias) ? $alias : null;
|
||||
|
||||
if ($this->messageHasAlreadyBeenHandled($envelope, $handler, $alias)) {
|
||||
foreach ($this->handlersLocator->getHandlers($envelope) as $handlerDescriptor) {
|
||||
if ($this->messageHasAlreadyBeenHandled($envelope, $handlerDescriptor)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
$handledStamp = HandledStamp::fromCallable($handler, $handler($message), $alias);
|
||||
$handler = $handlerDescriptor->getHandler();
|
||||
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message));
|
||||
$envelope = $envelope->with($handledStamp);
|
||||
$this->logger->info('Message "{class}" handled by "{handler}"', $context + ['handler' => $handledStamp->getCallableName()]);
|
||||
$this->logger->info('Message "{class}" handled by "{handler}"', $context + ['handler' => $handledStamp->getHandlerName()]);
|
||||
} catch (\Throwable $e) {
|
||||
$exceptions[] = $e;
|
||||
}
|
||||
@ -85,12 +85,11 @@ class HandleMessageMiddleware implements MiddlewareInterface
|
||||
return $stack->next()->handle($envelope, $stack);
|
||||
}
|
||||
|
||||
private function messageHasAlreadyBeenHandled(Envelope $envelope, callable $handler, ?string $alias): bool
|
||||
private function messageHasAlreadyBeenHandled(Envelope $envelope, HandlerDescriptor $handlerDescriptor): bool
|
||||
{
|
||||
$some = array_filter($envelope
|
||||
->all(HandledStamp::class), function (HandledStamp $stamp) use ($handler, $alias) {
|
||||
return $stamp->getCallableName() === HandledStamp::getNameFromCallable($handler) &&
|
||||
$stamp->getHandlerAlias() === $alias;
|
||||
->all(HandledStamp::class), function (HandledStamp $stamp) use ($handlerDescriptor) {
|
||||
return $stamp->getHandlerName() === $handlerDescriptor->getName();
|
||||
});
|
||||
|
||||
return \count($some) > 0;
|
||||
|
@ -11,6 +11,8 @@
|
||||
|
||||
namespace Symfony\Component\Messenger\Stamp;
|
||||
|
||||
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
|
||||
|
||||
/**
|
||||
* Stamp identifying a message handled by the `HandleMessageMiddleware` middleware
|
||||
* and storing the handler returned value.
|
||||
@ -24,54 +26,23 @@ namespace Symfony\Component\Messenger\Stamp;
|
||||
final class HandledStamp implements StampInterface
|
||||
{
|
||||
private $result;
|
||||
private $callableName;
|
||||
private $handlerAlias;
|
||||
private $handlerName;
|
||||
|
||||
/**
|
||||
* @param mixed $result The returned value of the message handler
|
||||
*/
|
||||
public function __construct($result, string $callableName, string $handlerAlias = null)
|
||||
public function __construct($result, string $handlerName)
|
||||
{
|
||||
$this->result = $result;
|
||||
$this->callableName = $callableName;
|
||||
$this->handlerAlias = $handlerAlias;
|
||||
$this->handlerName = $handlerName;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param mixed $result The returned value of the message handler
|
||||
*/
|
||||
public static function fromCallable(callable $handler, $result, ?string $handlerAlias = null): self
|
||||
public static function fromDescriptor(HandlerDescriptor $handler, $result): self
|
||||
{
|
||||
return new self($result, self::getNameFromCallable($handler), $handlerAlias);
|
||||
}
|
||||
|
||||
public static function getNameFromCallable(callable $handler): string
|
||||
{
|
||||
if (\is_array($handler)) {
|
||||
if (\is_object($handler[0])) {
|
||||
return \get_class($handler[0]).'::'.$handler[1];
|
||||
}
|
||||
|
||||
return $handler[0].'::'.$handler[1];
|
||||
}
|
||||
|
||||
if (\is_string($handler)) {
|
||||
return $handler;
|
||||
}
|
||||
|
||||
if ($handler instanceof \Closure) {
|
||||
$r = new \ReflectionFunction($handler);
|
||||
if (false !== strpos($r->name, '{closure}')) {
|
||||
return 'Closure';
|
||||
}
|
||||
if ($class = $r->getClosureScopeClass()) {
|
||||
return $class->name.'::'.$r->name;
|
||||
}
|
||||
|
||||
return $r->name;
|
||||
}
|
||||
|
||||
return \get_class($handler).'::__invoke';
|
||||
return new self($result, $handler->getName());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -82,13 +53,8 @@ final class HandledStamp implements StampInterface
|
||||
return $this->result;
|
||||
}
|
||||
|
||||
public function getCallableName(): string
|
||||
public function getHandlerName(): string
|
||||
{
|
||||
return $this->callableName;
|
||||
}
|
||||
|
||||
public function getHandlerAlias(): ?string
|
||||
{
|
||||
return $this->handlerAlias;
|
||||
return $this->handlerName;
|
||||
}
|
||||
}
|
||||
|
@ -27,4 +27,15 @@ use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
|
||||
*/
|
||||
final class ReceivedStamp implements StampInterface
|
||||
{
|
||||
private $transportName;
|
||||
|
||||
public function __construct(string $transportName)
|
||||
{
|
||||
$this->transportName = $transportName;
|
||||
}
|
||||
|
||||
public function getTransportName(): string
|
||||
{
|
||||
return $this->transportName;
|
||||
}
|
||||
}
|
||||
|
@ -40,12 +40,12 @@ class DebugCommandTest extends TestCase
|
||||
{
|
||||
$command = new DebugCommand([
|
||||
'command_bus' => [
|
||||
DummyCommand::class => [DummyCommandHandler::class],
|
||||
MultipleBusesMessage::class => [MultipleBusesMessageHandler::class],
|
||||
DummyCommand::class => [[DummyCommandHandler::class, []]],
|
||||
MultipleBusesMessage::class => [[MultipleBusesMessageHandler::class, []]],
|
||||
],
|
||||
'query_bus' => [
|
||||
DummyQuery::class => [DummyQueryHandler::class],
|
||||
MultipleBusesMessage::class => [MultipleBusesMessageHandler::class],
|
||||
DummyQuery::class => [[DummyQueryHandler::class, []]],
|
||||
MultipleBusesMessage::class => [[MultipleBusesMessageHandler::class, []]],
|
||||
],
|
||||
]);
|
||||
|
||||
|
@ -12,7 +12,6 @@
|
||||
namespace Symfony\Component\Messenger\Tests\DependencyInjection;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
|
||||
use Symfony\Component\DependencyInjection\Compiler\ResolveChildDefinitionsPass;
|
||||
use Symfony\Component\DependencyInjection\Compiler\ResolveClassPass;
|
||||
use Symfony\Component\DependencyInjection\ContainerBuilder;
|
||||
@ -66,13 +65,12 @@ class MessengerPassTest extends TestCase
|
||||
|
||||
$handlersLocatorDefinition = $container->getDefinition($busId.'.messenger.handlers_locator');
|
||||
$this->assertSame(HandlersLocator::class, $handlersLocatorDefinition->getClass());
|
||||
$this->assertEquals(
|
||||
[
|
||||
DummyMessage::class => new IteratorArgument([new Reference(DummyHandler::class)]),
|
||||
SecondMessage::class => new IteratorArgument([new Reference(MissingArgumentTypeHandler::class)]),
|
||||
],
|
||||
$handlersLocatorDefinition->getArgument(0)
|
||||
);
|
||||
|
||||
$handlerDescriptionMapping = $handlersLocatorDefinition->getArgument(0);
|
||||
$this->assertCount(2, $handlerDescriptionMapping);
|
||||
|
||||
$this->assertHandlerDescriptor($container, $handlerDescriptionMapping, DummyMessage::class, [DummyHandler::class]);
|
||||
$this->assertHandlerDescriptor($container, $handlerDescriptionMapping, SecondMessage::class, [MissingArgumentTypeHandler::class]);
|
||||
|
||||
$this->assertEquals(
|
||||
[DummyReceiver::class => new Reference(DummyReceiver::class)],
|
||||
@ -106,22 +104,33 @@ class MessengerPassTest extends TestCase
|
||||
|
||||
$commandBusHandlersLocatorDefinition = $container->getDefinition($commandBusId.'.messenger.handlers_locator');
|
||||
$this->assertSame(HandlersLocator::class, $commandBusHandlersLocatorDefinition->getClass());
|
||||
$this->assertEquals(
|
||||
[
|
||||
MultipleBusesMessage::class => new IteratorArgument([new Reference(MultipleBusesMessageHandler::class)]),
|
||||
DummyCommand::class => new IteratorArgument([new Reference(DummyCommandHandler::class)]),
|
||||
],
|
||||
$commandBusHandlersLocatorDefinition->getArgument(0)
|
||||
|
||||
$this->assertHandlerDescriptor(
|
||||
$container,
|
||||
$commandBusHandlersLocatorDefinition->getArgument(0),
|
||||
MultipleBusesMessage::class,
|
||||
[MultipleBusesMessageHandler::class]
|
||||
);
|
||||
$this->assertHandlerDescriptor(
|
||||
$container,
|
||||
$commandBusHandlersLocatorDefinition->getArgument(0),
|
||||
DummyCommand::class,
|
||||
[DummyCommandHandler::class]
|
||||
);
|
||||
|
||||
$queryBusHandlersLocatorDefinition = $container->getDefinition($queryBusId.'.messenger.handlers_locator');
|
||||
$this->assertSame(HandlersLocator::class, $queryBusHandlersLocatorDefinition->getClass());
|
||||
$this->assertEquals(
|
||||
[
|
||||
DummyQuery::class => new IteratorArgument([new Reference(DummyQueryHandler::class)]),
|
||||
MultipleBusesMessage::class => new IteratorArgument([new Reference(MultipleBusesMessageHandler::class)]),
|
||||
],
|
||||
$queryBusHandlersLocatorDefinition->getArgument(0)
|
||||
$this->assertHandlerDescriptor(
|
||||
$container,
|
||||
$queryBusHandlersLocatorDefinition->getArgument(0),
|
||||
DummyQuery::class,
|
||||
[DummyQueryHandler::class]
|
||||
);
|
||||
$this->assertHandlerDescriptor(
|
||||
$container,
|
||||
$queryBusHandlersLocatorDefinition->getArgument(0),
|
||||
MultipleBusesMessage::class,
|
||||
[MultipleBusesMessageHandler::class]
|
||||
);
|
||||
}
|
||||
|
||||
@ -156,10 +165,10 @@ class MessengerPassTest extends TestCase
|
||||
$handlersMapping = $container->getDefinition($busId.'.messenger.handlers_locator')->getArgument(0);
|
||||
|
||||
$this->assertArrayHasKey(DummyMessage::class, $handlersMapping);
|
||||
$this->assertEquals(new IteratorArgument([new Reference(HandlerWithMultipleMessages::class)]), $handlersMapping[DummyMessage::class]);
|
||||
$this->assertHandlerDescriptor($container, $handlersMapping, DummyMessage::class, [HandlerWithMultipleMessages::class]);
|
||||
|
||||
$this->assertArrayHasKey(SecondMessage::class, $handlersMapping);
|
||||
$this->assertEquals(new IteratorArgument([new Reference(PrioritizedHandler::class), new Reference(HandlerWithMultipleMessages::class)]), $handlersMapping[SecondMessage::class]);
|
||||
$this->assertHandlerDescriptor($container, $handlersMapping, SecondMessage::class, [PrioritizedHandler::class, HandlerWithMultipleMessages::class], [['priority' => 10]]);
|
||||
}
|
||||
|
||||
public function testGetClassesAndMethodsAndPrioritiesFromTheSubscriber()
|
||||
@ -181,13 +190,20 @@ class MessengerPassTest extends TestCase
|
||||
$this->assertArrayHasKey(DummyMessage::class, $handlersMapping);
|
||||
$this->assertArrayHasKey(SecondMessage::class, $handlersMapping);
|
||||
|
||||
$dummyHandlerReference = $handlersMapping[DummyMessage::class]->getValues()[0];
|
||||
$dummyHandlerDescriptorReference = $handlersMapping[DummyMessage::class]->getValues()[0];
|
||||
$dummyHandlerDescriptorDefinition = $container->getDefinition($dummyHandlerDescriptorReference);
|
||||
|
||||
$dummyHandlerReference = $dummyHandlerDescriptorDefinition->getArgument(0);
|
||||
$dummyHandlerDefinition = $container->getDefinition($dummyHandlerReference);
|
||||
|
||||
$this->assertSame('callable', $dummyHandlerDefinition->getClass());
|
||||
$this->assertEquals([new Reference(HandlerMappingMethods::class), 'dummyMethod'], $dummyHandlerDefinition->getArgument(0));
|
||||
$this->assertSame(['Closure', 'fromCallable'], $dummyHandlerDefinition->getFactory());
|
||||
|
||||
$secondHandlerReference = $handlersMapping[SecondMessage::class]->getValues()[1];
|
||||
$secondHandlerDescriptorReference = $handlersMapping[SecondMessage::class]->getValues()[1];
|
||||
$secondHandlerDescriptorDefinition = $container->getDefinition($secondHandlerDescriptorReference);
|
||||
|
||||
$secondHandlerReference = $secondHandlerDescriptorDefinition->getArgument(0);
|
||||
$secondHandlerDefinition = $container->getDefinition($secondHandlerReference);
|
||||
$this->assertSame(PrioritizedHandler::class, $secondHandlerDefinition->getClass());
|
||||
}
|
||||
@ -294,13 +310,19 @@ class MessengerPassTest extends TestCase
|
||||
|
||||
$handlersMapping = $container->getDefinition($busId.'.messenger.handlers_locator')->getArgument(0);
|
||||
|
||||
$this->assertArrayHasKey(DummyMessage::class, $handlersMapping);
|
||||
$firstReference = $handlersMapping[DummyMessage::class]->getValues()[0];
|
||||
$this->assertEquals([new Reference(HandlerWithGenerators::class), 'dummyMethod'], $container->getDefinition($firstReference)->getArgument(0));
|
||||
$this->assertHandlerDescriptor(
|
||||
$container,
|
||||
$handlersMapping,
|
||||
DummyMessage::class,
|
||||
[[HandlerWithGenerators::class, 'dummyMethod']]
|
||||
);
|
||||
|
||||
$this->assertArrayHasKey(SecondMessage::class, $handlersMapping);
|
||||
$secondReference = $handlersMapping[SecondMessage::class]->getValues()[0];
|
||||
$this->assertEquals([new Reference(HandlerWithGenerators::class), 'secondMessage'], $container->getDefinition($secondReference)->getArgument(0));
|
||||
$this->assertHandlerDescriptor(
|
||||
$container,
|
||||
$handlersMapping,
|
||||
SecondMessage::class,
|
||||
[[HandlerWithGenerators::class, 'secondMessage']]
|
||||
);
|
||||
}
|
||||
|
||||
public function testItRegistersHandlersOnDifferentBuses()
|
||||
@ -310,22 +332,29 @@ class MessengerPassTest extends TestCase
|
||||
|
||||
$container
|
||||
->register(HandlerOnSpecificBuses::class, HandlerOnSpecificBuses::class)
|
||||
->addTag('messenger.message_handler')
|
||||
;
|
||||
->addTag('messenger.message_handler');
|
||||
|
||||
(new MessengerPass())->process($container);
|
||||
|
||||
$eventsHandlerMapping = $container->getDefinition($eventsBusId.'.messenger.handlers_locator')->getArgument(0);
|
||||
|
||||
$this->assertEquals([DummyMessage::class], array_keys($eventsHandlerMapping));
|
||||
$firstReference = $eventsHandlerMapping[DummyMessage::class]->getValues()[0];
|
||||
$this->assertEquals([new Reference(HandlerOnSpecificBuses::class), 'dummyMethodForEvents'], $container->getDefinition($firstReference)->getArgument(0));
|
||||
$this->assertHandlerDescriptor(
|
||||
$container,
|
||||
$eventsHandlerMapping,
|
||||
DummyMessage::class,
|
||||
[[HandlerOnSpecificBuses::class, 'dummyMethodForEvents']],
|
||||
[['bus' => 'event_bus']]
|
||||
);
|
||||
|
||||
$commandsHandlerMapping = $container->getDefinition($commandsBusId.'.messenger.handlers_locator')->getArgument(0);
|
||||
|
||||
$this->assertEquals([DummyMessage::class], array_keys($commandsHandlerMapping));
|
||||
$firstReference = $commandsHandlerMapping[DummyMessage::class]->getValues()[0];
|
||||
$this->assertEquals([new Reference(HandlerOnSpecificBuses::class), 'dummyMethodForCommands'], $container->getDefinition($firstReference)->getArgument(0));
|
||||
$this->assertHandlerDescriptor(
|
||||
$container,
|
||||
$commandsHandlerMapping,
|
||||
DummyMessage::class,
|
||||
[[HandlerOnSpecificBuses::class, 'dummyMethodForCommands']],
|
||||
[['bus' => 'command_bus']]
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -574,12 +603,12 @@ class MessengerPassTest extends TestCase
|
||||
|
||||
$this->assertEquals([
|
||||
$commandBusId => [
|
||||
DummyCommand::class => [DummyCommandHandler::class],
|
||||
MultipleBusesMessage::class => [MultipleBusesMessageHandler::class],
|
||||
DummyCommand::class => [[DummyCommandHandler::class, []]],
|
||||
MultipleBusesMessage::class => [[MultipleBusesMessageHandler::class, []]],
|
||||
],
|
||||
$queryBusId => [
|
||||
DummyQuery::class => [DummyQueryHandler::class],
|
||||
MultipleBusesMessage::class => [MultipleBusesMessageHandler::class],
|
||||
DummyQuery::class => [[DummyQueryHandler::class, []]],
|
||||
MultipleBusesMessage::class => [[MultipleBusesMessageHandler::class, []]],
|
||||
],
|
||||
$emptyBus => [],
|
||||
], $container->getDefinition('console.command.messenger_debug')->getArgument(0));
|
||||
@ -601,6 +630,36 @@ class MessengerPassTest extends TestCase
|
||||
|
||||
return $container;
|
||||
}
|
||||
|
||||
private function assertHandlerDescriptor(ContainerBuilder $container, array $mapping, string $message, array $handlerClasses, array $options = [])
|
||||
{
|
||||
$this->assertArrayHasKey($message, $mapping);
|
||||
$this->assertCount(\count($handlerClasses), $mapping[$message]->getValues());
|
||||
|
||||
foreach ($handlerClasses as $index => $class) {
|
||||
$handlerReference = $mapping[$message]->getValues()[$index];
|
||||
|
||||
if (\is_array($class)) {
|
||||
$reference = [new Reference($class[0]), $class[1]];
|
||||
$options[$index] = array_merge(['method' => $class[1]], $options[$index] ?? []);
|
||||
} else {
|
||||
$reference = new Reference($class);
|
||||
}
|
||||
|
||||
$definitionArguments = $container->getDefinition($handlerReference)->getArguments();
|
||||
|
||||
if (\is_array($class)) {
|
||||
$methodDefinition = $container->getDefinition($definitionArguments[0]);
|
||||
|
||||
$this->assertEquals(['Closure', 'fromCallable'], $methodDefinition->getFactory());
|
||||
$this->assertEquals([$reference], $methodDefinition->getArguments());
|
||||
} else {
|
||||
$this->assertEquals($reference, $definitionArguments[0]);
|
||||
}
|
||||
|
||||
$this->assertEquals($options[$index] ?? [], $definitionArguments[1]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class DummyHandler
|
||||
|
@ -25,7 +25,7 @@ class EnvelopeTest extends TestCase
|
||||
{
|
||||
public function testConstruct()
|
||||
{
|
||||
$receivedStamp = new ReceivedStamp();
|
||||
$receivedStamp = new ReceivedStamp('transport');
|
||||
$envelope = new Envelope($dummy = new DummyMessage('dummy'), [$receivedStamp]);
|
||||
|
||||
$this->assertSame($dummy, $envelope->getMessage());
|
||||
@ -37,12 +37,12 @@ class EnvelopeTest extends TestCase
|
||||
{
|
||||
$envelope = new Envelope(new DummyMessage('dummy'));
|
||||
|
||||
$this->assertNotSame($envelope, $envelope->with(new ReceivedStamp()));
|
||||
$this->assertNotSame($envelope, $envelope->with(new ReceivedStamp('transport')));
|
||||
}
|
||||
|
||||
public function testWithoutAll()
|
||||
{
|
||||
$envelope = new Envelope(new DummyMessage('dummy'), [new ReceivedStamp(), new ReceivedStamp(), new DelayStamp(5000)]);
|
||||
$envelope = new Envelope(new DummyMessage('dummy'), [new ReceivedStamp('transport1'), new ReceivedStamp('transport2'), new DelayStamp(5000)]);
|
||||
|
||||
$envelope = $envelope->withoutAll(ReceivedStamp::class);
|
||||
|
||||
@ -52,7 +52,7 @@ class EnvelopeTest extends TestCase
|
||||
|
||||
public function testLast()
|
||||
{
|
||||
$receivedStamp = new ReceivedStamp();
|
||||
$receivedStamp = new ReceivedStamp('transport');
|
||||
$envelope = new Envelope($dummy = new DummyMessage('dummy'), [$receivedStamp]);
|
||||
|
||||
$this->assertSame($receivedStamp, $envelope->last(ReceivedStamp::class));
|
||||
@ -62,7 +62,7 @@ class EnvelopeTest extends TestCase
|
||||
public function testAll()
|
||||
{
|
||||
$envelope = (new Envelope($dummy = new DummyMessage('dummy')))
|
||||
->with($receivedStamp = new ReceivedStamp())
|
||||
->with($receivedStamp = new ReceivedStamp('transport'))
|
||||
->with($validationStamp = new ValidationStamp(['foo']))
|
||||
;
|
||||
|
||||
@ -76,7 +76,7 @@ class EnvelopeTest extends TestCase
|
||||
public function testWrapWithMessage()
|
||||
{
|
||||
$message = new \stdClass();
|
||||
$stamp = new ReceivedStamp();
|
||||
$stamp = new ReceivedStamp('transport');
|
||||
$envelope = Envelope::wrap($message, [$stamp]);
|
||||
|
||||
$this->assertSame($message, $envelope->getMessage());
|
||||
@ -86,7 +86,7 @@ class EnvelopeTest extends TestCase
|
||||
public function testWrapWithEnvelope()
|
||||
{
|
||||
$envelope = new Envelope(new \stdClass(), [new DelayStamp(5)]);
|
||||
$envelope = Envelope::wrap($envelope, [new ReceivedStamp()]);
|
||||
$envelope = Envelope::wrap($envelope, [new ReceivedStamp('transport')]);
|
||||
|
||||
$this->assertCount(1, $envelope->all(DelayStamp::class));
|
||||
$this->assertCount(1, $envelope->all(ReceivedStamp::class));
|
||||
|
@ -65,7 +65,7 @@ class HandleTraitTest extends TestCase
|
||||
|
||||
/**
|
||||
* @expectedException \Symfony\Component\Messenger\Exception\LogicException
|
||||
* @expectedExceptionMessage Message of type "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage" was handled multiple times. Only one handler is expected when using "Symfony\Component\Messenger\Tests\TestQueryBus::handle()", got 2: "FirstDummyHandler::__invoke", "dummy_2".
|
||||
* @expectedExceptionMessage Message of type "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage" was handled multiple times. Only one handler is expected when using "Symfony\Component\Messenger\Tests\TestQueryBus::handle()", got 2: "FirstDummyHandler::__invoke", "SecondDummyHandler::__invoke".
|
||||
*/
|
||||
public function testHandleThrowsOnMultipleHandledStamps()
|
||||
{
|
||||
@ -74,7 +74,7 @@ class HandleTraitTest extends TestCase
|
||||
|
||||
$query = new DummyMessage('Hello');
|
||||
$bus->expects($this->once())->method('dispatch')->willReturn(
|
||||
new Envelope($query, [new HandledStamp('first_result', 'FirstDummyHandler::__invoke'), new HandledStamp('second_result', 'SecondDummyHandler::__invoke', 'dummy_2')])
|
||||
new Envelope($query, [new HandledStamp('first_result', 'FirstDummyHandler::__invoke'), new HandledStamp('second_result', 'SecondDummyHandler::__invoke')])
|
||||
);
|
||||
|
||||
$queryBus->query($query);
|
||||
|
@ -0,0 +1,46 @@
|
||||
<?php
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Handler;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyCommandHandler;
|
||||
|
||||
class HandleDescriptorTest extends TestCase
|
||||
{
|
||||
/**
|
||||
* @dataProvider provideHandlers
|
||||
*/
|
||||
public function testDescriptorNames(callable $handler, ?string $expectedHandlerString)
|
||||
{
|
||||
$descriptor = new HandlerDescriptor($handler);
|
||||
|
||||
$this->assertStringMatchesFormat($expectedHandlerString, $descriptor->getName());
|
||||
}
|
||||
|
||||
public function provideHandlers()
|
||||
{
|
||||
yield [function () {}, 'Closure'];
|
||||
yield ['var_dump', 'var_dump'];
|
||||
yield [new DummyCommandHandler(), DummyCommandHandler::class.'::__invoke'];
|
||||
yield [
|
||||
[new DummyCommandHandlerWithSpecificMethod(), 'handle'],
|
||||
DummyCommandHandlerWithSpecificMethod::class.'::handle',
|
||||
];
|
||||
yield [\Closure::fromCallable(function () {}), 'Closure'];
|
||||
yield [\Closure::fromCallable(new DummyCommandHandler()), DummyCommandHandler::class.'::__invoke'];
|
||||
yield [\Closure::bind(\Closure::fromCallable(function () {}), new \stdClass()), 'Closure'];
|
||||
yield [new class() {
|
||||
public function __invoke()
|
||||
{
|
||||
}
|
||||
}, 'class@anonymous%sHandleDescriptorTest.php%s::__invoke'];
|
||||
}
|
||||
}
|
||||
|
||||
class DummyCommandHandlerWithSpecificMethod
|
||||
{
|
||||
public function handle(): void
|
||||
{
|
||||
}
|
||||
}
|
@ -13,18 +13,41 @@ namespace Symfony\Component\Messenger\Tests\Handler;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
|
||||
use Symfony\Component\Messenger\Handler\HandlersLocator;
|
||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||
|
||||
class HandlersLocatorTest extends TestCase
|
||||
{
|
||||
public function testItYieldsProvidedAliasAsKey()
|
||||
public function testItYieldsHandlerDescriptors()
|
||||
{
|
||||
$handler = $this->createPartialMock(\stdClass::class, ['__invoke']);
|
||||
$locator = new HandlersLocator([
|
||||
DummyMessage::class => ['dummy' => $handler],
|
||||
DummyMessage::class => [$handler],
|
||||
]);
|
||||
|
||||
$this->assertSame(['dummy' => $handler], iterator_to_array($locator->getHandlers(new Envelope(new DummyMessage('a')))));
|
||||
$this->assertEquals([new HandlerDescriptor($handler)], iterator_to_array($locator->getHandlers(new Envelope(new DummyMessage('a')))));
|
||||
}
|
||||
|
||||
public function testItReturnsOnlyHandlersMatchingTransport()
|
||||
{
|
||||
$firstHandler = $this->createPartialMock(\stdClass::class, ['__invoke']);
|
||||
$secondHandler = $this->createPartialMock(\stdClass::class, ['__invoke']);
|
||||
|
||||
$locator = new HandlersLocator([
|
||||
DummyMessage::class => [
|
||||
$first = new HandlerDescriptor($firstHandler, ['alias' => 'one']),
|
||||
new HandlerDescriptor($this->createPartialMock(\stdClass::class, ['__invoke']), ['from_transport' => 'ignored', 'alias' => 'two']),
|
||||
$second = new HandlerDescriptor($secondHandler, ['from_transport' => 'transportName', 'alias' => 'three']),
|
||||
],
|
||||
]);
|
||||
|
||||
$this->assertEquals([
|
||||
$first,
|
||||
$second,
|
||||
], iterator_to_array($locator->getHandlers(
|
||||
new Envelope(new DummyMessage('Body'), [new ReceivedStamp('transportName')])
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
@ -71,7 +71,7 @@ class MessageBusTest extends TestCase
|
||||
public function testThatAMiddlewareCanAddSomeStampsToTheEnvelope()
|
||||
{
|
||||
$message = new DummyMessage('Hello');
|
||||
$envelope = new Envelope($message, [new ReceivedStamp()]);
|
||||
$envelope = new Envelope($message, [new ReceivedStamp('transport')]);
|
||||
$envelopeWithAnotherStamp = $envelope->with(new AnEnvelopeStamp());
|
||||
|
||||
$firstMiddleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
|
||||
@ -109,7 +109,7 @@ class MessageBusTest extends TestCase
|
||||
public function testThatAMiddlewareCanUpdateTheMessageWhileKeepingTheEnvelopeStamps()
|
||||
{
|
||||
$message = new DummyMessage('Hello');
|
||||
$envelope = new Envelope($message, $stamps = [new ReceivedStamp()]);
|
||||
$envelope = new Envelope($message, $stamps = [new ReceivedStamp('transport')]);
|
||||
|
||||
$changedMessage = new DummyMessage('Changed');
|
||||
$expectedEnvelope = new Envelope($changedMessage, $stamps);
|
||||
|
@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests\Middleware;
|
||||
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
||||
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
|
||||
use Symfony\Component\Messenger\Handler\HandlersLocator;
|
||||
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
|
||||
use Symfony\Component\Messenger\Middleware\StackMiddleware;
|
||||
@ -79,28 +80,26 @@ class HandleMessageMiddlewareTest extends MiddlewareTestCase
|
||||
];
|
||||
|
||||
yield 'A stamp is added per handler' => [
|
||||
['first' => $first, 'second' => $second],
|
||||
[
|
||||
new HandledStamp('first result', $firstClass.'::__invoke', 'first'),
|
||||
new HandledStamp(null, $secondClass.'::__invoke', 'second'),
|
||||
new HandlerDescriptor($first, ['alias' => 'first']),
|
||||
new HandlerDescriptor($second, ['alias' => 'second']),
|
||||
],
|
||||
true,
|
||||
];
|
||||
|
||||
yield 'Yielded locator alias is used' => [
|
||||
['first_alias' => $first, $second],
|
||||
[
|
||||
new HandledStamp('first result', $firstClass.'::__invoke', 'first_alias'),
|
||||
new HandledStamp(null, $secondClass.'::__invoke'),
|
||||
new HandledStamp('first result', $firstClass.'::__invoke@first'),
|
||||
new HandledStamp(null, $secondClass.'::__invoke@second'),
|
||||
],
|
||||
true,
|
||||
];
|
||||
|
||||
yield 'It tries all handlers' => [
|
||||
['first' => $first, 'failing' => $failing, 'second' => $second],
|
||||
[
|
||||
new HandledStamp('first result', $firstClass.'::__invoke', 'first'),
|
||||
new HandledStamp(null, $secondClass.'::__invoke', 'second'),
|
||||
new HandlerDescriptor($first, ['alias' => 'first']),
|
||||
new HandlerDescriptor($failing, ['alias' => 'failing']),
|
||||
new HandlerDescriptor($second, ['alias' => 'second']),
|
||||
],
|
||||
[
|
||||
new HandledStamp('first result', $firstClass.'::__invoke@first'),
|
||||
new HandledStamp(null, $secondClass.'::__invoke@second'),
|
||||
],
|
||||
false,
|
||||
];
|
||||
|
@ -195,7 +195,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
|
||||
public function testItSkipsReceivedMessages()
|
||||
{
|
||||
$envelope = (new Envelope(new DummyMessage('Hey')))->with(new ReceivedStamp());
|
||||
$envelope = (new Envelope(new DummyMessage('Hey')))->with(new ReceivedStamp('transport'));
|
||||
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
|
||||
|
@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
|
||||
use Symfony\Component\Messenger\Handler\HandlersLocator;
|
||||
use Symfony\Component\Messenger\MessageBus;
|
||||
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
|
||||
@ -45,8 +46,8 @@ class RetryIntegrationTest extends TestCase
|
||||
$throwingHandler = new DummyMessageHandlerFailingFirstTimes(1);
|
||||
$handlerLocator = new HandlersLocator([
|
||||
DummyMessage::class => [
|
||||
'handler' => $handler,
|
||||
'throwing' => $throwingHandler,
|
||||
new HandlerDescriptor($handler, ['alias' => 'first']),
|
||||
new HandlerDescriptor($throwingHandler, ['alias' => 'throwing']),
|
||||
],
|
||||
]);
|
||||
|
||||
|
@ -12,6 +12,7 @@
|
||||
namespace Symfony\Component\Messenger\Tests\Stamp;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
|
||||
use Symfony\Component\Messenger\Stamp\HandledStamp;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyCommandHandler;
|
||||
|
||||
@ -19,54 +20,22 @@ class HandledStampTest extends TestCase
|
||||
{
|
||||
public function testConstruct()
|
||||
{
|
||||
$stamp = new HandledStamp('some result', 'FooHandler::__invoke()', 'foo');
|
||||
$stamp = new HandledStamp('some result', 'FooHandler::__invoke()');
|
||||
|
||||
$this->assertSame('some result', $stamp->getResult());
|
||||
$this->assertSame('FooHandler::__invoke()', $stamp->getCallableName());
|
||||
$this->assertSame('foo', $stamp->getHandlerAlias());
|
||||
$this->assertSame('FooHandler::__invoke()', $stamp->getHandlerName());
|
||||
|
||||
$stamp = new HandledStamp('some result', 'FooHandler::__invoke()');
|
||||
|
||||
$this->assertSame('some result', $stamp->getResult());
|
||||
$this->assertSame('FooHandler::__invoke()', $stamp->getCallableName());
|
||||
$this->assertNull($stamp->getHandlerAlias());
|
||||
$this->assertSame('FooHandler::__invoke()', $stamp->getHandlerName());
|
||||
}
|
||||
|
||||
/**
|
||||
* @dataProvider provideCallables
|
||||
*/
|
||||
public function testFromCallable(callable $handler, ?string $expectedHandlerString)
|
||||
public function testFromDescriptor()
|
||||
{
|
||||
/** @var HandledStamp $stamp */
|
||||
$stamp = HandledStamp::fromCallable($handler, 'some_result', 'alias');
|
||||
$this->assertStringMatchesFormat($expectedHandlerString, $stamp->getCallableName());
|
||||
$this->assertSame('alias', $stamp->getHandlerAlias(), 'alias is forwarded to construct');
|
||||
$stamp = HandledStamp::fromDescriptor(new HandlerDescriptor(new DummyCommandHandler()), 'some_result');
|
||||
|
||||
$this->assertEquals(DummyCommandHandler::class.'::__invoke', $stamp->getHandlerName());
|
||||
$this->assertSame('some_result', $stamp->getResult(), 'result is forwarded to construct');
|
||||
}
|
||||
|
||||
public function provideCallables()
|
||||
{
|
||||
yield [function () {}, 'Closure'];
|
||||
yield ['var_dump', 'var_dump'];
|
||||
yield [new DummyCommandHandler(), DummyCommandHandler::class.'::__invoke'];
|
||||
yield [
|
||||
[new DummyCommandHandlerWithSpecificMethod(), 'handle'],
|
||||
DummyCommandHandlerWithSpecificMethod::class.'::handle',
|
||||
];
|
||||
yield [\Closure::fromCallable(function () {}), 'Closure'];
|
||||
yield [\Closure::fromCallable(new DummyCommandHandler()), DummyCommandHandler::class.'::__invoke'];
|
||||
yield [\Closure::bind(\Closure::fromCallable(function () {}), new \stdClass()), 'Closure'];
|
||||
yield [new class() {
|
||||
public function __invoke()
|
||||
{
|
||||
}
|
||||
}, 'class@anonymous%sHandledStampTest.php%s::__invoke'];
|
||||
}
|
||||
}
|
||||
|
||||
class DummyCommandHandlerWithSpecificMethod
|
||||
{
|
||||
public function handle(): void
|
||||
{
|
||||
}
|
||||
}
|
||||
|
@ -44,10 +44,15 @@ class WorkerTest extends TestCase
|
||||
|
||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||
|
||||
$bus->expects($this->at(0))->method('dispatch')->with($envelope = new Envelope($apiMessage, [new ReceivedStamp()]))->willReturn($envelope);
|
||||
$bus->expects($this->at(1))->method('dispatch')->with($envelope = new Envelope($ipaMessage, [new ReceivedStamp()]))->willReturn($envelope);
|
||||
$bus->expects($this->at(0))->method('dispatch')->with(
|
||||
$envelope = new Envelope($apiMessage, [new ReceivedStamp('transport')])
|
||||
)->willReturn($envelope);
|
||||
|
||||
$worker = new Worker([$receiver], $bus);
|
||||
$bus->expects($this->at(1))->method('dispatch')->with(
|
||||
$envelope = new Envelope($ipaMessage, [new ReceivedStamp('transport')])
|
||||
)->willReturn($envelope);
|
||||
|
||||
$worker = new Worker(['transport' => $receiver], $bus);
|
||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||
// stop after the messages finish
|
||||
if (null === $envelope) {
|
||||
@ -62,12 +67,12 @@ class WorkerTest extends TestCase
|
||||
{
|
||||
$envelope = new Envelope(new DummyMessage('API'));
|
||||
$receiver = new DummyReceiver([[$envelope]]);
|
||||
$envelope = $envelope->with(new ReceivedStamp());
|
||||
$envelope = $envelope->with(new ReceivedStamp('transport'));
|
||||
|
||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||
$bus->expects($this->at(0))->method('dispatch')->with($envelope)->willReturn($envelope);
|
||||
|
||||
$worker = new Worker([$receiver], $bus, []);
|
||||
$worker = new Worker(['transport' => $receiver], $bus, []);
|
||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||
// stop after the messages finish
|
||||
if (null === $envelope) {
|
||||
|
@ -44,7 +44,7 @@ class Worker implements WorkerInterface
|
||||
private $shouldStop = false;
|
||||
|
||||
/**
|
||||
* @param ReceiverInterface[] $receivers Where the key will be used as the string "identifier"
|
||||
* @param ReceiverInterface[] $receivers Where the key is the transport name
|
||||
* @param RetryStrategyInterface[] $retryStrategies Retry strategies for each receiver (array keys must match)
|
||||
*/
|
||||
public function __construct(array $receivers, MessageBusInterface $bus, $retryStrategies = [], EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
|
||||
@ -86,13 +86,13 @@ class Worker implements WorkerInterface
|
||||
|
||||
while (false === $this->shouldStop) {
|
||||
$envelopeHandled = false;
|
||||
foreach ($this->receivers as $receiverName => $receiver) {
|
||||
foreach ($this->receivers as $transportName => $receiver) {
|
||||
$envelopes = $receiver->get();
|
||||
|
||||
foreach ($envelopes as $envelope) {
|
||||
$envelopeHandled = true;
|
||||
|
||||
$this->handleMessage($envelope, $receiver, $receiverName, $this->retryStrategies[$receiverName] ?? null);
|
||||
$this->handleMessage($envelope, $receiver, $transportName, $this->retryStrategies[$transportName] ?? null);
|
||||
$onHandled($envelope);
|
||||
}
|
||||
|
||||
@ -114,9 +114,9 @@ class Worker implements WorkerInterface
|
||||
$this->dispatchEvent(new WorkerStoppedEvent());
|
||||
}
|
||||
|
||||
private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $receiverName, ?RetryStrategyInterface $retryStrategy)
|
||||
private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName, ?RetryStrategyInterface $retryStrategy)
|
||||
{
|
||||
$this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $receiverName));
|
||||
$this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $transportName));
|
||||
|
||||
$message = $envelope->getMessage();
|
||||
$context = [
|
||||
@ -125,7 +125,7 @@ class Worker implements WorkerInterface
|
||||
];
|
||||
|
||||
try {
|
||||
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp()));
|
||||
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName)));
|
||||
} catch (\Throwable $throwable) {
|
||||
if ($throwable instanceof HandlerFailedException) {
|
||||
$envelope = $throwable->getEnvelope();
|
||||
@ -133,7 +133,7 @@ class Worker implements WorkerInterface
|
||||
|
||||
$shouldRetry = $this->shouldRetry($throwable, $envelope, $retryStrategy);
|
||||
|
||||
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $receiverName, $throwable, $shouldRetry));
|
||||
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable, $shouldRetry));
|
||||
|
||||
if ($shouldRetry) {
|
||||
if (null === $retryStrategy) {
|
||||
@ -166,7 +166,7 @@ class Worker implements WorkerInterface
|
||||
return;
|
||||
}
|
||||
|
||||
$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $receiverName));
|
||||
$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $transportName));
|
||||
|
||||
if (null !== $this->logger) {
|
||||
$this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
|
||||
|
Reference in New Issue
Block a user