Allows to register handlers on a specific transport (and get rid of this handler alias)

This commit is contained in:
Samuel ROZE 2019-04-07 10:12:18 +02:00
parent 96a7907979
commit f0b2acd67d
23 changed files with 426 additions and 209 deletions

View File

@ -81,6 +81,12 @@ CHANGELOG
* Added a Doctrine transport. For example, use the `doctrine://default` DSN (this uses the `default` Doctrine entity manager)
* [BC BREAK] The `getConnectionConfiguration` method on Amqp's `Connection` has been removed.
* [BC BREAK] A `HandlerFailedException` exception will be thrown if one or more handler fails.
* [BC BREAK] The `HandlersLocationInterface::getHandlers` method needs to return `HandlerDescriptor`
instances instead of callables.
* [BC BREAK] The `HandledStamp` stamp has changed: `handlerAlias` has been renamed to `handlerName`,
`getCallableName` has been removed and its constructor only has 2 arguments now.
* [BC BREAK] The `ReceivedStamp` needs to exposes the name of the transport from which the message
has been received.
4.2.0
-----

View File

@ -84,7 +84,9 @@ EOF
foreach ($handlersByMessage as $message => $handlers) {
$tableRows[] = [sprintf('<fg=cyan>%s</fg=cyan>', $message)];
foreach ($handlers as $handler) {
$tableRows[] = [sprintf(' handled by <info>%s</>', $handler)];
$tableRows[] = [
sprintf(' handled by <info>%s</>', $handler[0]).$this->formatConditions($handler[1]),
];
}
}
@ -97,4 +99,18 @@ EOF
}
}
}
private function formatConditions(array $options): string
{
if (!$options) {
return '';
}
$optionsMapping = [];
foreach ($options as $key => $value) {
$optionsMapping[] = ' '.$key.'='.$value;
}
return ' (when'.implode(', ', $optionsMapping).')';
}
}

View File

@ -19,6 +19,7 @@ use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Definition;
use Symfony\Component\DependencyInjection\Exception\RuntimeException;
use Symfony\Component\DependencyInjection\Reference;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
use Symfony\Component\Messenger\TraceableMessageBus;
@ -94,32 +95,33 @@ class MessengerPass implements CompilerPassInterface
$message = null;
$handlerBuses = (array) ($tag['bus'] ?? $busIds);
foreach ($handles as $message => $method) {
foreach ($handles as $message => $options) {
$buses = $handlerBuses;
if (\is_int($message)) {
$message = $method;
$method = '__invoke';
if (\is_string($options)) {
$message = $options;
$options = [];
} else {
throw new RuntimeException(sprintf('The handler configuration needs to return an array of messages or an associated array of message and configuration. Found value of type "%s" at position "%d" for service "%s".', \gettype($options), $message, $serviceId));
}
}
if (\is_array($message)) {
list($message, $priority) = $message;
} else {
$priority = $tag['priority'] ?? 0;
if (\is_string($options)) {
$options = ['method' => $options];
}
if (\is_array($method)) {
if (isset($method['bus'])) {
if (!\in_array($method['bus'], $busIds)) {
$messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : ($r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method));
$priority = $tag['priority'] ?? $options['priority'] ?? 0;
$method = $options['method'] ?? '__invoke';
throw new RuntimeException(sprintf('Invalid configuration %s for message "%s": bus "%s" does not exist.', $messageLocation, $message, $method['bus']));
}
if (isset($options['bus'])) {
if (!\in_array($options['bus'], $busIds)) {
$messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : ($r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method));
$buses = [$method['bus']];
throw new RuntimeException(sprintf('Invalid configuration %s for message "%s": bus "%s" does not exist.', $messageLocation, $message, $options['bus']));
}
$priority = $method['priority'] ?? $priority;
$method = $method['method'] ?? '__invoke';
$buses = [$options['bus']];
}
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
@ -141,7 +143,7 @@ class MessengerPass implements CompilerPassInterface
}
foreach ($buses as $handlerBus) {
$handlersByBusAndMessage[$handlerBus][$message][$priority][] = $definitionId;
$handlersByBusAndMessage[$handlerBus][$message][$priority][] = [$definitionId, $options];
}
}
@ -154,15 +156,20 @@ class MessengerPass implements CompilerPassInterface
foreach ($handlersByBusAndMessage as $bus => $handlersByMessage) {
foreach ($handlersByMessage as $message => $handlersByPriority) {
krsort($handlersByPriority);
$handlersByBusAndMessage[$bus][$message] = array_unique(array_merge(...$handlersByPriority));
$handlersByBusAndMessage[$bus][$message] = array_merge(...$handlersByPriority);
}
}
$handlersLocatorMappingByBus = [];
foreach ($handlersByBusAndMessage as $bus => $handlersByMessage) {
foreach ($handlersByMessage as $message => $handlerIds) {
$handlers = array_map(function (string $handlerId) { return new Reference($handlerId); }, $handlerIds);
$handlersLocatorMappingByBus[$bus][$message] = new IteratorArgument($handlers);
foreach ($handlersByMessage as $message => $handlers) {
$handlerDescriptors = [];
foreach ($handlers as $handler) {
$definitions[$definitionId = '.messenger.handler_descriptor.'.ContainerBuilder::hash($bus.':'.$message.':'.$handler[0])] = (new Definition(HandlerDescriptor::class))->setArguments([new Reference($handler[0]), $handler[1]]);
$handlerDescriptors[] = new Reference($definitionId);
}
$handlersLocatorMappingByBus[$bus][$message] = new IteratorArgument($handlerDescriptors);
}
}
$container->addDefinitions($definitions);

View File

@ -52,7 +52,7 @@ trait HandleTrait
if (\count($handledStamps) > 1) {
$handlers = implode(', ', array_map(function (HandledStamp $stamp): string {
return sprintf('"%s"', $stamp->getHandlerAlias() ?? $stamp->getCallableName());
return sprintf('"%s"', $stamp->getHandlerName());
}, $handledStamps));
throw new LogicException(sprintf('Message of type "%s" was handled multiple times. Only one handler is expected when using "%s::%s()", got %d: %s.', \get_class($envelope->getMessage()), \get_class($this), __FUNCTION__, \count($handledStamps), $handlers));

View File

@ -0,0 +1,82 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Handler;
/**
* Describes a handler and the possible associated options, such as `from_transport`, `bus`, etc.
*
* @author Samuel Roze <samuel.roze@gmail.com>
*
* @experimental in 4.3
*/
final class HandlerDescriptor
{
private $handler;
private $options;
public function __construct(callable $handler, array $options = [])
{
$this->handler = $handler;
$this->options = $options;
}
public function getHandler(): callable
{
return $this->handler;
}
public function getName(): string
{
$name = $this->callableName($this->handler);
$alias = $this->options['alias'] ?? null;
if (null !== $alias) {
$name .= '@'.$alias;
}
return $name;
}
public function getOption(string $option)
{
return $this->options[$option] ?? null;
}
private function callableName(callable $handler)
{
if (\is_array($handler)) {
if (\is_object($handler[0])) {
return \get_class($handler[0]).'::'.$handler[1];
}
return $handler[0].'::'.$handler[1];
}
if (\is_string($handler)) {
return $handler;
}
if ($handler instanceof \Closure) {
$r = new \ReflectionFunction($handler);
if (false !== strpos($r->name, '{closure}')) {
return 'Closure';
}
if ($class = $r->getClosureScopeClass()) {
return $class->name.'::'.$r->name;
}
return $r->name;
}
return \get_class($handler).'::__invoke';
}
}

View File

@ -12,11 +12,13 @@
namespace Symfony\Component\Messenger\Handler;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
/**
* Maps a message to a list of handlers.
*
* @author Nicolas Grekas <p@tchwork.com>
* @author Samuel Roze <samuel.roze@gmail.com>
*
* @experimental in 4.2
*/
@ -25,7 +27,7 @@ class HandlersLocator implements HandlersLocatorInterface
private $handlers;
/**
* @param callable[][] $handlers
* @param HandlerDescriptor[][]|callable[][] $handlers
*/
public function __construct(array $handlers)
{
@ -40,10 +42,23 @@ class HandlersLocator implements HandlersLocatorInterface
$seen = [];
foreach (self::listTypes($envelope) as $type) {
foreach ($this->handlers[$type] ?? [] as $alias => $handler) {
if (!\in_array($handler, $seen, true)) {
yield $alias => $seen[] = $handler;
foreach ($this->handlers[$type] ?? [] as $handlerDescriptor) {
if (\is_callable($handlerDescriptor)) {
$handlerDescriptor = new HandlerDescriptor($handlerDescriptor);
}
if (!$this->shouldHandle($envelope, $handlerDescriptor)) {
continue;
}
$name = $handlerDescriptor->getName();
if (\in_array($name, $seen)) {
continue;
}
$seen[] = $name;
yield $handlerDescriptor;
}
}
}
@ -60,4 +75,17 @@ class HandlersLocator implements HandlersLocatorInterface
+ class_implements($class)
+ ['*' => '*'];
}
private function shouldHandle(Envelope $envelope, HandlerDescriptor $handlerDescriptor)
{
if (null === $received = $envelope->last(ReceivedStamp::class)) {
return true;
}
if (null === $expectedTransport = $handlerDescriptor->getOption('from_transport')) {
return true;
}
return $received->getTransportName() === $expectedTransport;
}
}

View File

@ -25,7 +25,7 @@ interface HandlersLocatorInterface
/**
* Returns the handlers for the given message name.
*
* @return iterable|callable[] Indexed by handler alias if available
* @return iterable|HandlerDescriptor[] Indexed by handler alias if available
*/
public function getHandlers(Envelope $envelope): iterable;
}

View File

@ -16,6 +16,7 @@ use Psr\Log\NullLogger;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;
@ -54,17 +55,16 @@ class HandleMessageMiddleware implements MiddlewareInterface
];
$exceptions = [];
foreach ($this->handlersLocator->getHandlers($envelope) as $alias => $handler) {
$alias = \is_string($alias) ? $alias : null;
if ($this->messageHasAlreadyBeenHandled($envelope, $handler, $alias)) {
foreach ($this->handlersLocator->getHandlers($envelope) as $handlerDescriptor) {
if ($this->messageHasAlreadyBeenHandled($envelope, $handlerDescriptor)) {
continue;
}
try {
$handledStamp = HandledStamp::fromCallable($handler, $handler($message), $alias);
$handler = $handlerDescriptor->getHandler();
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message));
$envelope = $envelope->with($handledStamp);
$this->logger->info('Message "{class}" handled by "{handler}"', $context + ['handler' => $handledStamp->getCallableName()]);
$this->logger->info('Message "{class}" handled by "{handler}"', $context + ['handler' => $handledStamp->getHandlerName()]);
} catch (\Throwable $e) {
$exceptions[] = $e;
}
@ -85,12 +85,11 @@ class HandleMessageMiddleware implements MiddlewareInterface
return $stack->next()->handle($envelope, $stack);
}
private function messageHasAlreadyBeenHandled(Envelope $envelope, callable $handler, ?string $alias): bool
private function messageHasAlreadyBeenHandled(Envelope $envelope, HandlerDescriptor $handlerDescriptor): bool
{
$some = array_filter($envelope
->all(HandledStamp::class), function (HandledStamp $stamp) use ($handler, $alias) {
return $stamp->getCallableName() === HandledStamp::getNameFromCallable($handler) &&
$stamp->getHandlerAlias() === $alias;
->all(HandledStamp::class), function (HandledStamp $stamp) use ($handlerDescriptor) {
return $stamp->getHandlerName() === $handlerDescriptor->getName();
});
return \count($some) > 0;

View File

@ -11,6 +11,8 @@
namespace Symfony\Component\Messenger\Stamp;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
/**
* Stamp identifying a message handled by the `HandleMessageMiddleware` middleware
* and storing the handler returned value.
@ -24,54 +26,23 @@ namespace Symfony\Component\Messenger\Stamp;
final class HandledStamp implements StampInterface
{
private $result;
private $callableName;
private $handlerAlias;
private $handlerName;
/**
* @param mixed $result The returned value of the message handler
*/
public function __construct($result, string $callableName, string $handlerAlias = null)
public function __construct($result, string $handlerName)
{
$this->result = $result;
$this->callableName = $callableName;
$this->handlerAlias = $handlerAlias;
$this->handlerName = $handlerName;
}
/**
* @param mixed $result The returned value of the message handler
*/
public static function fromCallable(callable $handler, $result, ?string $handlerAlias = null): self
public static function fromDescriptor(HandlerDescriptor $handler, $result): self
{
return new self($result, self::getNameFromCallable($handler), $handlerAlias);
}
public static function getNameFromCallable(callable $handler): string
{
if (\is_array($handler)) {
if (\is_object($handler[0])) {
return \get_class($handler[0]).'::'.$handler[1];
}
return $handler[0].'::'.$handler[1];
}
if (\is_string($handler)) {
return $handler;
}
if ($handler instanceof \Closure) {
$r = new \ReflectionFunction($handler);
if (false !== strpos($r->name, '{closure}')) {
return 'Closure';
}
if ($class = $r->getClosureScopeClass()) {
return $class->name.'::'.$r->name;
}
return $r->name;
}
return \get_class($handler).'::__invoke';
return new self($result, $handler->getName());
}
/**
@ -82,13 +53,8 @@ final class HandledStamp implements StampInterface
return $this->result;
}
public function getCallableName(): string
public function getHandlerName(): string
{
return $this->callableName;
}
public function getHandlerAlias(): ?string
{
return $this->handlerAlias;
return $this->handlerName;
}
}

View File

@ -27,4 +27,15 @@ use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
*/
final class ReceivedStamp implements StampInterface
{
private $transportName;
public function __construct(string $transportName)
{
$this->transportName = $transportName;
}
public function getTransportName(): string
{
return $this->transportName;
}
}

View File

@ -40,12 +40,12 @@ class DebugCommandTest extends TestCase
{
$command = new DebugCommand([
'command_bus' => [
DummyCommand::class => [DummyCommandHandler::class],
MultipleBusesMessage::class => [MultipleBusesMessageHandler::class],
DummyCommand::class => [[DummyCommandHandler::class, []]],
MultipleBusesMessage::class => [[MultipleBusesMessageHandler::class, []]],
],
'query_bus' => [
DummyQuery::class => [DummyQueryHandler::class],
MultipleBusesMessage::class => [MultipleBusesMessageHandler::class],
DummyQuery::class => [[DummyQueryHandler::class, []]],
MultipleBusesMessage::class => [[MultipleBusesMessageHandler::class, []]],
],
]);

View File

@ -12,7 +12,6 @@
namespace Symfony\Component\Messenger\Tests\DependencyInjection;
use PHPUnit\Framework\TestCase;
use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
use Symfony\Component\DependencyInjection\Compiler\ResolveChildDefinitionsPass;
use Symfony\Component\DependencyInjection\Compiler\ResolveClassPass;
use Symfony\Component\DependencyInjection\ContainerBuilder;
@ -66,13 +65,12 @@ class MessengerPassTest extends TestCase
$handlersLocatorDefinition = $container->getDefinition($busId.'.messenger.handlers_locator');
$this->assertSame(HandlersLocator::class, $handlersLocatorDefinition->getClass());
$this->assertEquals(
[
DummyMessage::class => new IteratorArgument([new Reference(DummyHandler::class)]),
SecondMessage::class => new IteratorArgument([new Reference(MissingArgumentTypeHandler::class)]),
],
$handlersLocatorDefinition->getArgument(0)
);
$handlerDescriptionMapping = $handlersLocatorDefinition->getArgument(0);
$this->assertCount(2, $handlerDescriptionMapping);
$this->assertHandlerDescriptor($container, $handlerDescriptionMapping, DummyMessage::class, [DummyHandler::class]);
$this->assertHandlerDescriptor($container, $handlerDescriptionMapping, SecondMessage::class, [MissingArgumentTypeHandler::class]);
$this->assertEquals(
[DummyReceiver::class => new Reference(DummyReceiver::class)],
@ -106,22 +104,33 @@ class MessengerPassTest extends TestCase
$commandBusHandlersLocatorDefinition = $container->getDefinition($commandBusId.'.messenger.handlers_locator');
$this->assertSame(HandlersLocator::class, $commandBusHandlersLocatorDefinition->getClass());
$this->assertEquals(
[
MultipleBusesMessage::class => new IteratorArgument([new Reference(MultipleBusesMessageHandler::class)]),
DummyCommand::class => new IteratorArgument([new Reference(DummyCommandHandler::class)]),
],
$commandBusHandlersLocatorDefinition->getArgument(0)
$this->assertHandlerDescriptor(
$container,
$commandBusHandlersLocatorDefinition->getArgument(0),
MultipleBusesMessage::class,
[MultipleBusesMessageHandler::class]
);
$this->assertHandlerDescriptor(
$container,
$commandBusHandlersLocatorDefinition->getArgument(0),
DummyCommand::class,
[DummyCommandHandler::class]
);
$queryBusHandlersLocatorDefinition = $container->getDefinition($queryBusId.'.messenger.handlers_locator');
$this->assertSame(HandlersLocator::class, $queryBusHandlersLocatorDefinition->getClass());
$this->assertEquals(
[
DummyQuery::class => new IteratorArgument([new Reference(DummyQueryHandler::class)]),
MultipleBusesMessage::class => new IteratorArgument([new Reference(MultipleBusesMessageHandler::class)]),
],
$queryBusHandlersLocatorDefinition->getArgument(0)
$this->assertHandlerDescriptor(
$container,
$queryBusHandlersLocatorDefinition->getArgument(0),
DummyQuery::class,
[DummyQueryHandler::class]
);
$this->assertHandlerDescriptor(
$container,
$queryBusHandlersLocatorDefinition->getArgument(0),
MultipleBusesMessage::class,
[MultipleBusesMessageHandler::class]
);
}
@ -156,10 +165,10 @@ class MessengerPassTest extends TestCase
$handlersMapping = $container->getDefinition($busId.'.messenger.handlers_locator')->getArgument(0);
$this->assertArrayHasKey(DummyMessage::class, $handlersMapping);
$this->assertEquals(new IteratorArgument([new Reference(HandlerWithMultipleMessages::class)]), $handlersMapping[DummyMessage::class]);
$this->assertHandlerDescriptor($container, $handlersMapping, DummyMessage::class, [HandlerWithMultipleMessages::class]);
$this->assertArrayHasKey(SecondMessage::class, $handlersMapping);
$this->assertEquals(new IteratorArgument([new Reference(PrioritizedHandler::class), new Reference(HandlerWithMultipleMessages::class)]), $handlersMapping[SecondMessage::class]);
$this->assertHandlerDescriptor($container, $handlersMapping, SecondMessage::class, [PrioritizedHandler::class, HandlerWithMultipleMessages::class], [['priority' => 10]]);
}
public function testGetClassesAndMethodsAndPrioritiesFromTheSubscriber()
@ -181,13 +190,20 @@ class MessengerPassTest extends TestCase
$this->assertArrayHasKey(DummyMessage::class, $handlersMapping);
$this->assertArrayHasKey(SecondMessage::class, $handlersMapping);
$dummyHandlerReference = $handlersMapping[DummyMessage::class]->getValues()[0];
$dummyHandlerDescriptorReference = $handlersMapping[DummyMessage::class]->getValues()[0];
$dummyHandlerDescriptorDefinition = $container->getDefinition($dummyHandlerDescriptorReference);
$dummyHandlerReference = $dummyHandlerDescriptorDefinition->getArgument(0);
$dummyHandlerDefinition = $container->getDefinition($dummyHandlerReference);
$this->assertSame('callable', $dummyHandlerDefinition->getClass());
$this->assertEquals([new Reference(HandlerMappingMethods::class), 'dummyMethod'], $dummyHandlerDefinition->getArgument(0));
$this->assertSame(['Closure', 'fromCallable'], $dummyHandlerDefinition->getFactory());
$secondHandlerReference = $handlersMapping[SecondMessage::class]->getValues()[1];
$secondHandlerDescriptorReference = $handlersMapping[SecondMessage::class]->getValues()[1];
$secondHandlerDescriptorDefinition = $container->getDefinition($secondHandlerDescriptorReference);
$secondHandlerReference = $secondHandlerDescriptorDefinition->getArgument(0);
$secondHandlerDefinition = $container->getDefinition($secondHandlerReference);
$this->assertSame(PrioritizedHandler::class, $secondHandlerDefinition->getClass());
}
@ -294,13 +310,19 @@ class MessengerPassTest extends TestCase
$handlersMapping = $container->getDefinition($busId.'.messenger.handlers_locator')->getArgument(0);
$this->assertArrayHasKey(DummyMessage::class, $handlersMapping);
$firstReference = $handlersMapping[DummyMessage::class]->getValues()[0];
$this->assertEquals([new Reference(HandlerWithGenerators::class), 'dummyMethod'], $container->getDefinition($firstReference)->getArgument(0));
$this->assertHandlerDescriptor(
$container,
$handlersMapping,
DummyMessage::class,
[[HandlerWithGenerators::class, 'dummyMethod']]
);
$this->assertArrayHasKey(SecondMessage::class, $handlersMapping);
$secondReference = $handlersMapping[SecondMessage::class]->getValues()[0];
$this->assertEquals([new Reference(HandlerWithGenerators::class), 'secondMessage'], $container->getDefinition($secondReference)->getArgument(0));
$this->assertHandlerDescriptor(
$container,
$handlersMapping,
SecondMessage::class,
[[HandlerWithGenerators::class, 'secondMessage']]
);
}
public function testItRegistersHandlersOnDifferentBuses()
@ -310,22 +332,29 @@ class MessengerPassTest extends TestCase
$container
->register(HandlerOnSpecificBuses::class, HandlerOnSpecificBuses::class)
->addTag('messenger.message_handler')
;
->addTag('messenger.message_handler');
(new MessengerPass())->process($container);
$eventsHandlerMapping = $container->getDefinition($eventsBusId.'.messenger.handlers_locator')->getArgument(0);
$this->assertEquals([DummyMessage::class], array_keys($eventsHandlerMapping));
$firstReference = $eventsHandlerMapping[DummyMessage::class]->getValues()[0];
$this->assertEquals([new Reference(HandlerOnSpecificBuses::class), 'dummyMethodForEvents'], $container->getDefinition($firstReference)->getArgument(0));
$this->assertHandlerDescriptor(
$container,
$eventsHandlerMapping,
DummyMessage::class,
[[HandlerOnSpecificBuses::class, 'dummyMethodForEvents']],
[['bus' => 'event_bus']]
);
$commandsHandlerMapping = $container->getDefinition($commandsBusId.'.messenger.handlers_locator')->getArgument(0);
$this->assertEquals([DummyMessage::class], array_keys($commandsHandlerMapping));
$firstReference = $commandsHandlerMapping[DummyMessage::class]->getValues()[0];
$this->assertEquals([new Reference(HandlerOnSpecificBuses::class), 'dummyMethodForCommands'], $container->getDefinition($firstReference)->getArgument(0));
$this->assertHandlerDescriptor(
$container,
$commandsHandlerMapping,
DummyMessage::class,
[[HandlerOnSpecificBuses::class, 'dummyMethodForCommands']],
[['bus' => 'command_bus']]
);
}
/**
@ -574,12 +603,12 @@ class MessengerPassTest extends TestCase
$this->assertEquals([
$commandBusId => [
DummyCommand::class => [DummyCommandHandler::class],
MultipleBusesMessage::class => [MultipleBusesMessageHandler::class],
DummyCommand::class => [[DummyCommandHandler::class, []]],
MultipleBusesMessage::class => [[MultipleBusesMessageHandler::class, []]],
],
$queryBusId => [
DummyQuery::class => [DummyQueryHandler::class],
MultipleBusesMessage::class => [MultipleBusesMessageHandler::class],
DummyQuery::class => [[DummyQueryHandler::class, []]],
MultipleBusesMessage::class => [[MultipleBusesMessageHandler::class, []]],
],
$emptyBus => [],
], $container->getDefinition('console.command.messenger_debug')->getArgument(0));
@ -601,6 +630,36 @@ class MessengerPassTest extends TestCase
return $container;
}
private function assertHandlerDescriptor(ContainerBuilder $container, array $mapping, string $message, array $handlerClasses, array $options = [])
{
$this->assertArrayHasKey($message, $mapping);
$this->assertCount(\count($handlerClasses), $mapping[$message]->getValues());
foreach ($handlerClasses as $index => $class) {
$handlerReference = $mapping[$message]->getValues()[$index];
if (\is_array($class)) {
$reference = [new Reference($class[0]), $class[1]];
$options[$index] = array_merge(['method' => $class[1]], $options[$index] ?? []);
} else {
$reference = new Reference($class);
}
$definitionArguments = $container->getDefinition($handlerReference)->getArguments();
if (\is_array($class)) {
$methodDefinition = $container->getDefinition($definitionArguments[0]);
$this->assertEquals(['Closure', 'fromCallable'], $methodDefinition->getFactory());
$this->assertEquals([$reference], $methodDefinition->getArguments());
} else {
$this->assertEquals($reference, $definitionArguments[0]);
}
$this->assertEquals($options[$index] ?? [], $definitionArguments[1]);
}
}
}
class DummyHandler

View File

@ -25,7 +25,7 @@ class EnvelopeTest extends TestCase
{
public function testConstruct()
{
$receivedStamp = new ReceivedStamp();
$receivedStamp = new ReceivedStamp('transport');
$envelope = new Envelope($dummy = new DummyMessage('dummy'), [$receivedStamp]);
$this->assertSame($dummy, $envelope->getMessage());
@ -37,12 +37,12 @@ class EnvelopeTest extends TestCase
{
$envelope = new Envelope(new DummyMessage('dummy'));
$this->assertNotSame($envelope, $envelope->with(new ReceivedStamp()));
$this->assertNotSame($envelope, $envelope->with(new ReceivedStamp('transport')));
}
public function testWithoutAll()
{
$envelope = new Envelope(new DummyMessage('dummy'), [new ReceivedStamp(), new ReceivedStamp(), new DelayStamp(5000)]);
$envelope = new Envelope(new DummyMessage('dummy'), [new ReceivedStamp('transport1'), new ReceivedStamp('transport2'), new DelayStamp(5000)]);
$envelope = $envelope->withoutAll(ReceivedStamp::class);
@ -52,7 +52,7 @@ class EnvelopeTest extends TestCase
public function testLast()
{
$receivedStamp = new ReceivedStamp();
$receivedStamp = new ReceivedStamp('transport');
$envelope = new Envelope($dummy = new DummyMessage('dummy'), [$receivedStamp]);
$this->assertSame($receivedStamp, $envelope->last(ReceivedStamp::class));
@ -62,7 +62,7 @@ class EnvelopeTest extends TestCase
public function testAll()
{
$envelope = (new Envelope($dummy = new DummyMessage('dummy')))
->with($receivedStamp = new ReceivedStamp())
->with($receivedStamp = new ReceivedStamp('transport'))
->with($validationStamp = new ValidationStamp(['foo']))
;
@ -76,7 +76,7 @@ class EnvelopeTest extends TestCase
public function testWrapWithMessage()
{
$message = new \stdClass();
$stamp = new ReceivedStamp();
$stamp = new ReceivedStamp('transport');
$envelope = Envelope::wrap($message, [$stamp]);
$this->assertSame($message, $envelope->getMessage());
@ -86,7 +86,7 @@ class EnvelopeTest extends TestCase
public function testWrapWithEnvelope()
{
$envelope = new Envelope(new \stdClass(), [new DelayStamp(5)]);
$envelope = Envelope::wrap($envelope, [new ReceivedStamp()]);
$envelope = Envelope::wrap($envelope, [new ReceivedStamp('transport')]);
$this->assertCount(1, $envelope->all(DelayStamp::class));
$this->assertCount(1, $envelope->all(ReceivedStamp::class));

View File

@ -65,7 +65,7 @@ class HandleTraitTest extends TestCase
/**
* @expectedException \Symfony\Component\Messenger\Exception\LogicException
* @expectedExceptionMessage Message of type "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage" was handled multiple times. Only one handler is expected when using "Symfony\Component\Messenger\Tests\TestQueryBus::handle()", got 2: "FirstDummyHandler::__invoke", "dummy_2".
* @expectedExceptionMessage Message of type "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage" was handled multiple times. Only one handler is expected when using "Symfony\Component\Messenger\Tests\TestQueryBus::handle()", got 2: "FirstDummyHandler::__invoke", "SecondDummyHandler::__invoke".
*/
public function testHandleThrowsOnMultipleHandledStamps()
{
@ -74,7 +74,7 @@ class HandleTraitTest extends TestCase
$query = new DummyMessage('Hello');
$bus->expects($this->once())->method('dispatch')->willReturn(
new Envelope($query, [new HandledStamp('first_result', 'FirstDummyHandler::__invoke'), new HandledStamp('second_result', 'SecondDummyHandler::__invoke', 'dummy_2')])
new Envelope($query, [new HandledStamp('first_result', 'FirstDummyHandler::__invoke'), new HandledStamp('second_result', 'SecondDummyHandler::__invoke')])
);
$queryBus->query($query);

View File

@ -0,0 +1,46 @@
<?php
namespace Symfony\Component\Messenger\Tests\Handler;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Tests\Fixtures\DummyCommandHandler;
class HandleDescriptorTest extends TestCase
{
/**
* @dataProvider provideHandlers
*/
public function testDescriptorNames(callable $handler, ?string $expectedHandlerString)
{
$descriptor = new HandlerDescriptor($handler);
$this->assertStringMatchesFormat($expectedHandlerString, $descriptor->getName());
}
public function provideHandlers()
{
yield [function () {}, 'Closure'];
yield ['var_dump', 'var_dump'];
yield [new DummyCommandHandler(), DummyCommandHandler::class.'::__invoke'];
yield [
[new DummyCommandHandlerWithSpecificMethod(), 'handle'],
DummyCommandHandlerWithSpecificMethod::class.'::handle',
];
yield [\Closure::fromCallable(function () {}), 'Closure'];
yield [\Closure::fromCallable(new DummyCommandHandler()), DummyCommandHandler::class.'::__invoke'];
yield [\Closure::bind(\Closure::fromCallable(function () {}), new \stdClass()), 'Closure'];
yield [new class() {
public function __invoke()
{
}
}, 'class@anonymous%sHandleDescriptorTest.php%s::__invoke'];
}
}
class DummyCommandHandlerWithSpecificMethod
{
public function handle(): void
{
}
}

View File

@ -13,18 +13,41 @@ namespace Symfony\Component\Messenger\Tests\Handler;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
class HandlersLocatorTest extends TestCase
{
public function testItYieldsProvidedAliasAsKey()
public function testItYieldsHandlerDescriptors()
{
$handler = $this->createPartialMock(\stdClass::class, ['__invoke']);
$locator = new HandlersLocator([
DummyMessage::class => ['dummy' => $handler],
DummyMessage::class => [$handler],
]);
$this->assertSame(['dummy' => $handler], iterator_to_array($locator->getHandlers(new Envelope(new DummyMessage('a')))));
$this->assertEquals([new HandlerDescriptor($handler)], iterator_to_array($locator->getHandlers(new Envelope(new DummyMessage('a')))));
}
public function testItReturnsOnlyHandlersMatchingTransport()
{
$firstHandler = $this->createPartialMock(\stdClass::class, ['__invoke']);
$secondHandler = $this->createPartialMock(\stdClass::class, ['__invoke']);
$locator = new HandlersLocator([
DummyMessage::class => [
$first = new HandlerDescriptor($firstHandler, ['alias' => 'one']),
new HandlerDescriptor($this->createPartialMock(\stdClass::class, ['__invoke']), ['from_transport' => 'ignored', 'alias' => 'two']),
$second = new HandlerDescriptor($secondHandler, ['from_transport' => 'transportName', 'alias' => 'three']),
],
]);
$this->assertEquals([
$first,
$second,
], iterator_to_array($locator->getHandlers(
new Envelope(new DummyMessage('Body'), [new ReceivedStamp('transportName')])
)));
}
}

View File

@ -71,7 +71,7 @@ class MessageBusTest extends TestCase
public function testThatAMiddlewareCanAddSomeStampsToTheEnvelope()
{
$message = new DummyMessage('Hello');
$envelope = new Envelope($message, [new ReceivedStamp()]);
$envelope = new Envelope($message, [new ReceivedStamp('transport')]);
$envelopeWithAnotherStamp = $envelope->with(new AnEnvelopeStamp());
$firstMiddleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
@ -109,7 +109,7 @@ class MessageBusTest extends TestCase
public function testThatAMiddlewareCanUpdateTheMessageWhileKeepingTheEnvelopeStamps()
{
$message = new DummyMessage('Hello');
$envelope = new Envelope($message, $stamps = [new ReceivedStamp()]);
$envelope = new Envelope($message, $stamps = [new ReceivedStamp('transport')]);
$changedMessage = new DummyMessage('Changed');
$expectedEnvelope = new Envelope($changedMessage, $stamps);

View File

@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests\Middleware;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Middleware\StackMiddleware;
@ -79,28 +80,26 @@ class HandleMessageMiddlewareTest extends MiddlewareTestCase
];
yield 'A stamp is added per handler' => [
['first' => $first, 'second' => $second],
[
new HandledStamp('first result', $firstClass.'::__invoke', 'first'),
new HandledStamp(null, $secondClass.'::__invoke', 'second'),
new HandlerDescriptor($first, ['alias' => 'first']),
new HandlerDescriptor($second, ['alias' => 'second']),
],
true,
];
yield 'Yielded locator alias is used' => [
['first_alias' => $first, $second],
[
new HandledStamp('first result', $firstClass.'::__invoke', 'first_alias'),
new HandledStamp(null, $secondClass.'::__invoke'),
new HandledStamp('first result', $firstClass.'::__invoke@first'),
new HandledStamp(null, $secondClass.'::__invoke@second'),
],
true,
];
yield 'It tries all handlers' => [
['first' => $first, 'failing' => $failing, 'second' => $second],
[
new HandledStamp('first result', $firstClass.'::__invoke', 'first'),
new HandledStamp(null, $secondClass.'::__invoke', 'second'),
new HandlerDescriptor($first, ['alias' => 'first']),
new HandlerDescriptor($failing, ['alias' => 'failing']),
new HandlerDescriptor($second, ['alias' => 'second']),
],
[
new HandledStamp('first result', $firstClass.'::__invoke@first'),
new HandledStamp(null, $secondClass.'::__invoke@second'),
],
false,
];

View File

@ -195,7 +195,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
public function testItSkipsReceivedMessages()
{
$envelope = (new Envelope(new DummyMessage('Hey')))->with(new ReceivedStamp());
$envelope = (new Envelope(new DummyMessage('Hey')))->with(new ReceivedStamp('transport'));
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();

View File

@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
@ -45,8 +46,8 @@ class RetryIntegrationTest extends TestCase
$throwingHandler = new DummyMessageHandlerFailingFirstTimes(1);
$handlerLocator = new HandlersLocator([
DummyMessage::class => [
'handler' => $handler,
'throwing' => $throwingHandler,
new HandlerDescriptor($handler, ['alias' => 'first']),
new HandlerDescriptor($throwingHandler, ['alias' => 'throwing']),
],
]);

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Tests\Stamp;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Stamp\HandledStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyCommandHandler;
@ -19,54 +20,22 @@ class HandledStampTest extends TestCase
{
public function testConstruct()
{
$stamp = new HandledStamp('some result', 'FooHandler::__invoke()', 'foo');
$stamp = new HandledStamp('some result', 'FooHandler::__invoke()');
$this->assertSame('some result', $stamp->getResult());
$this->assertSame('FooHandler::__invoke()', $stamp->getCallableName());
$this->assertSame('foo', $stamp->getHandlerAlias());
$this->assertSame('FooHandler::__invoke()', $stamp->getHandlerName());
$stamp = new HandledStamp('some result', 'FooHandler::__invoke()');
$this->assertSame('some result', $stamp->getResult());
$this->assertSame('FooHandler::__invoke()', $stamp->getCallableName());
$this->assertNull($stamp->getHandlerAlias());
$this->assertSame('FooHandler::__invoke()', $stamp->getHandlerName());
}
/**
* @dataProvider provideCallables
*/
public function testFromCallable(callable $handler, ?string $expectedHandlerString)
public function testFromDescriptor()
{
/** @var HandledStamp $stamp */
$stamp = HandledStamp::fromCallable($handler, 'some_result', 'alias');
$this->assertStringMatchesFormat($expectedHandlerString, $stamp->getCallableName());
$this->assertSame('alias', $stamp->getHandlerAlias(), 'alias is forwarded to construct');
$stamp = HandledStamp::fromDescriptor(new HandlerDescriptor(new DummyCommandHandler()), 'some_result');
$this->assertEquals(DummyCommandHandler::class.'::__invoke', $stamp->getHandlerName());
$this->assertSame('some_result', $stamp->getResult(), 'result is forwarded to construct');
}
public function provideCallables()
{
yield [function () {}, 'Closure'];
yield ['var_dump', 'var_dump'];
yield [new DummyCommandHandler(), DummyCommandHandler::class.'::__invoke'];
yield [
[new DummyCommandHandlerWithSpecificMethod(), 'handle'],
DummyCommandHandlerWithSpecificMethod::class.'::handle',
];
yield [\Closure::fromCallable(function () {}), 'Closure'];
yield [\Closure::fromCallable(new DummyCommandHandler()), DummyCommandHandler::class.'::__invoke'];
yield [\Closure::bind(\Closure::fromCallable(function () {}), new \stdClass()), 'Closure'];
yield [new class() {
public function __invoke()
{
}
}, 'class@anonymous%sHandledStampTest.php%s::__invoke'];
}
}
class DummyCommandHandlerWithSpecificMethod
{
public function handle(): void
{
}
}

View File

@ -44,10 +44,15 @@ class WorkerTest extends TestCase
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with($envelope = new Envelope($apiMessage, [new ReceivedStamp()]))->willReturn($envelope);
$bus->expects($this->at(1))->method('dispatch')->with($envelope = new Envelope($ipaMessage, [new ReceivedStamp()]))->willReturn($envelope);
$bus->expects($this->at(0))->method('dispatch')->with(
$envelope = new Envelope($apiMessage, [new ReceivedStamp('transport')])
)->willReturn($envelope);
$worker = new Worker([$receiver], $bus);
$bus->expects($this->at(1))->method('dispatch')->with(
$envelope = new Envelope($ipaMessage, [new ReceivedStamp('transport')])
)->willReturn($envelope);
$worker = new Worker(['transport' => $receiver], $bus);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
@ -62,12 +67,12 @@ class WorkerTest extends TestCase
{
$envelope = new Envelope(new DummyMessage('API'));
$receiver = new DummyReceiver([[$envelope]]);
$envelope = $envelope->with(new ReceivedStamp());
$envelope = $envelope->with(new ReceivedStamp('transport'));
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with($envelope)->willReturn($envelope);
$worker = new Worker([$receiver], $bus, []);
$worker = new Worker(['transport' => $receiver], $bus, []);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {

View File

@ -44,7 +44,7 @@ class Worker implements WorkerInterface
private $shouldStop = false;
/**
* @param ReceiverInterface[] $receivers Where the key will be used as the string "identifier"
* @param ReceiverInterface[] $receivers Where the key is the transport name
* @param RetryStrategyInterface[] $retryStrategies Retry strategies for each receiver (array keys must match)
*/
public function __construct(array $receivers, MessageBusInterface $bus, $retryStrategies = [], EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
@ -86,13 +86,13 @@ class Worker implements WorkerInterface
while (false === $this->shouldStop) {
$envelopeHandled = false;
foreach ($this->receivers as $receiverName => $receiver) {
foreach ($this->receivers as $transportName => $receiver) {
$envelopes = $receiver->get();
foreach ($envelopes as $envelope) {
$envelopeHandled = true;
$this->handleMessage($envelope, $receiver, $receiverName, $this->retryStrategies[$receiverName] ?? null);
$this->handleMessage($envelope, $receiver, $transportName, $this->retryStrategies[$transportName] ?? null);
$onHandled($envelope);
}
@ -114,9 +114,9 @@ class Worker implements WorkerInterface
$this->dispatchEvent(new WorkerStoppedEvent());
}
private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $receiverName, ?RetryStrategyInterface $retryStrategy)
private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName, ?RetryStrategyInterface $retryStrategy)
{
$this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $receiverName));
$this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $transportName));
$message = $envelope->getMessage();
$context = [
@ -125,7 +125,7 @@ class Worker implements WorkerInterface
];
try {
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp()));
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName)));
} catch (\Throwable $throwable) {
if ($throwable instanceof HandlerFailedException) {
$envelope = $throwable->getEnvelope();
@ -133,7 +133,7 @@ class Worker implements WorkerInterface
$shouldRetry = $this->shouldRetry($throwable, $envelope, $retryStrategy);
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $receiverName, $throwable, $shouldRetry));
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable, $shouldRetry));
if ($shouldRetry) {
if (null === $retryStrategy) {
@ -166,7 +166,7 @@ class Worker implements WorkerInterface
return;
}
$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $receiverName));
$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $transportName));
if (null !== $this->logger) {
$this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);