[Messenger][Profiler] Trace middleware execution
This commit is contained in:
parent
55def78682
commit
e974f67b1f
@ -9,6 +9,7 @@
|
|||||||
'event_listener': '#00B8F5',
|
'event_listener': '#00B8F5',
|
||||||
'template': '#66CC00',
|
'template': '#66CC00',
|
||||||
'doctrine': '#FF6633',
|
'doctrine': '#FF6633',
|
||||||
|
'messenger.middleware': '#BDB81E',
|
||||||
} %}
|
} %}
|
||||||
{% endif %}
|
{% endif %}
|
||||||
|
|
||||||
|
@ -22,6 +22,7 @@ use Symfony\Component\DependencyInjection\Reference;
|
|||||||
use Symfony\Component\Messenger\Handler\ChainHandler;
|
use Symfony\Component\Messenger\Handler\ChainHandler;
|
||||||
use Symfony\Component\Messenger\Handler\Locator\ContainerHandlerLocator;
|
use Symfony\Component\Messenger\Handler\Locator\ContainerHandlerLocator;
|
||||||
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
|
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
|
||||||
|
use Symfony\Component\Messenger\Middleware\Enhancers\TraceableMiddleware;
|
||||||
use Symfony\Component\Messenger\TraceableMessageBus;
|
use Symfony\Component\Messenger\TraceableMessageBus;
|
||||||
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
||||||
use Symfony\Component\Messenger\Transport\SenderInterface;
|
use Symfony\Component\Messenger\Transport\SenderInterface;
|
||||||
@ -37,13 +38,15 @@ class MessengerPass implements CompilerPassInterface
|
|||||||
private $busTag;
|
private $busTag;
|
||||||
private $senderTag;
|
private $senderTag;
|
||||||
private $receiverTag;
|
private $receiverTag;
|
||||||
|
private $debugStopwatchId;
|
||||||
|
|
||||||
public function __construct(string $handlerTag = 'messenger.message_handler', string $busTag = 'messenger.bus', string $senderTag = 'messenger.sender', string $receiverTag = 'messenger.receiver')
|
public function __construct(string $handlerTag = 'messenger.message_handler', string $busTag = 'messenger.bus', string $senderTag = 'messenger.sender', string $receiverTag = 'messenger.receiver', string $debugStopwatchId = 'debug.stopwatch')
|
||||||
{
|
{
|
||||||
$this->handlerTag = $handlerTag;
|
$this->handlerTag = $handlerTag;
|
||||||
$this->busTag = $busTag;
|
$this->busTag = $busTag;
|
||||||
$this->senderTag = $senderTag;
|
$this->senderTag = $senderTag;
|
||||||
$this->receiverTag = $receiverTag;
|
$this->receiverTag = $receiverTag;
|
||||||
|
$this->debugStopwatchId = $debugStopwatchId;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -303,6 +306,7 @@ class MessengerPass implements CompilerPassInterface
|
|||||||
|
|
||||||
private function registerBusMiddleware(ContainerBuilder $container, string $busId, array $middlewareCollection)
|
private function registerBusMiddleware(ContainerBuilder $container, string $busId, array $middlewareCollection)
|
||||||
{
|
{
|
||||||
|
$debug = $container->getParameter('kernel.debug') && $container->has($this->debugStopwatchId);
|
||||||
$middlewareReferences = array();
|
$middlewareReferences = array();
|
||||||
foreach ($middlewareCollection as $middlewareItem) {
|
foreach ($middlewareCollection as $middlewareItem) {
|
||||||
$id = $middlewareItem['id'];
|
$id = $middlewareItem['id'];
|
||||||
@ -315,7 +319,7 @@ class MessengerPass implements CompilerPassInterface
|
|||||||
throw new RuntimeException(sprintf('Invalid middleware "%s": define such service to be able to use it.', $id));
|
throw new RuntimeException(sprintf('Invalid middleware "%s": define such service to be able to use it.', $id));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (($definition = $container->findDefinition($messengerMiddlewareId))->isAbstract()) {
|
if ($isDefinitionAbstract = ($definition = $container->findDefinition($messengerMiddlewareId))->isAbstract()) {
|
||||||
$childDefinition = new ChildDefinition($messengerMiddlewareId);
|
$childDefinition = new ChildDefinition($messengerMiddlewareId);
|
||||||
$count = \count($definition->getArguments());
|
$count = \count($definition->getArguments());
|
||||||
foreach (array_values($arguments ?? array()) as $key => $argument) {
|
foreach (array_values($arguments ?? array()) as $key => $argument) {
|
||||||
@ -329,6 +333,21 @@ class MessengerPass implements CompilerPassInterface
|
|||||||
throw new RuntimeException(sprintf('Invalid middleware factory "%s": a middleware factory must be an abstract definition.', $id));
|
throw new RuntimeException(sprintf('Invalid middleware factory "%s": a middleware factory must be an abstract definition.', $id));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ($debug) {
|
||||||
|
$container->register($debugMiddlewareId = '.messenger.debug.traced.'.$messengerMiddlewareId, TraceableMiddleware::class)
|
||||||
|
// Decorates with a high priority so it's applied the earliest:
|
||||||
|
->setDecoratedService($messengerMiddlewareId, null, 100)
|
||||||
|
->setArguments(array(
|
||||||
|
new Reference($debugMiddlewareId.'.inner'),
|
||||||
|
new Reference($this->debugStopwatchId),
|
||||||
|
// In case the definition isn't abstract,
|
||||||
|
// we cannot be sure the service instance is used by one bus only.
|
||||||
|
// So we only inject the bus name when the original definition is abstract.
|
||||||
|
$isDefinitionAbstract ? $busId : null,
|
||||||
|
))
|
||||||
|
;
|
||||||
|
}
|
||||||
|
|
||||||
$middlewareReferences[] = new Reference($messengerMiddlewareId);
|
$middlewareReferences[] = new Reference($messengerMiddlewareId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1,69 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Middleware\Enhancers;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\EnvelopeAwareInterface;
|
||||||
|
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
|
||||||
|
use Symfony\Component\Stopwatch\Stopwatch;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Collects some data about a middleware.
|
||||||
|
*
|
||||||
|
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
|
||||||
|
*/
|
||||||
|
class TraceableMiddleware implements MiddlewareInterface, EnvelopeAwareInterface
|
||||||
|
{
|
||||||
|
private $inner;
|
||||||
|
private $stopwatch;
|
||||||
|
private $busName;
|
||||||
|
private $eventCategory;
|
||||||
|
|
||||||
|
public function __construct(MiddlewareInterface $inner, Stopwatch $stopwatch, string $busName = null, string $eventCategory = 'messenger.middleware')
|
||||||
|
{
|
||||||
|
$this->inner = $inner;
|
||||||
|
$this->stopwatch = $stopwatch;
|
||||||
|
$this->busName = $busName;
|
||||||
|
$this->eventCategory = $eventCategory;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param Envelope $envelope
|
||||||
|
*/
|
||||||
|
public function handle($envelope, callable $next)
|
||||||
|
{
|
||||||
|
$class = \get_class($this->inner);
|
||||||
|
$eventName = 'c' === $class[0] && 0 === strpos($class, "class@anonymous\0") ? get_parent_class($class).'@anonymous' : $class;
|
||||||
|
|
||||||
|
if ($this->busName) {
|
||||||
|
$eventName .= " (bus: {$this->busName})";
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->stopwatch->start($eventName, $this->eventCategory);
|
||||||
|
|
||||||
|
try {
|
||||||
|
$result = $this->inner->handle($envelope->getMessageFor($this->inner), function ($message) use ($next, $eventName) {
|
||||||
|
$this->stopwatch->stop($eventName);
|
||||||
|
$result = $next($message);
|
||||||
|
$this->stopwatch->start($eventName, $this->eventCategory);
|
||||||
|
|
||||||
|
return $result;
|
||||||
|
});
|
||||||
|
} finally {
|
||||||
|
if ($this->stopwatch->isStarted($eventName)) {
|
||||||
|
$this->stopwatch->stop($eventName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return $result;
|
||||||
|
}
|
||||||
|
}
|
@ -41,6 +41,7 @@ use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage;
|
|||||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
|
||||||
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
||||||
|
use Symfony\Component\Stopwatch\Stopwatch;
|
||||||
|
|
||||||
class MessengerPassTest extends TestCase
|
class MessengerPassTest extends TestCase
|
||||||
{
|
{
|
||||||
@ -561,6 +562,39 @@ class MessengerPassTest extends TestCase
|
|||||||
(new MessengerPass())->process($container);
|
(new MessengerPass())->process($container);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testDecoratesWithTraceableMiddlewareOnDebug()
|
||||||
|
{
|
||||||
|
$container = $this->getContainerBuilder();
|
||||||
|
|
||||||
|
$container->register($busId = 'message_bus', MessageBusInterface::class)->setArgument(0, array())->addTag('messenger.bus');
|
||||||
|
$container->register('abstract_middleware', UselessMiddleware::class)->setAbstract(true);
|
||||||
|
$container->register('concrete_middleware', UselessMiddleware::class);
|
||||||
|
|
||||||
|
$container->setParameter($middlewareParameter = $busId.'.middleware', array(
|
||||||
|
array('id' => 'abstract_middleware'),
|
||||||
|
array('id' => 'concrete_middleware'),
|
||||||
|
));
|
||||||
|
|
||||||
|
$container->setParameter('kernel.debug', true);
|
||||||
|
$container->register('debug.stopwatch', Stopwatch::class);
|
||||||
|
|
||||||
|
(new MessengerPass())->process($container);
|
||||||
|
|
||||||
|
$this->assertNotNull($concreteDef = $container->getDefinition('.messenger.debug.traced.concrete_middleware'));
|
||||||
|
$this->assertEquals(array(
|
||||||
|
new Reference('.messenger.debug.traced.concrete_middleware.inner'),
|
||||||
|
new Reference('debug.stopwatch'),
|
||||||
|
null,
|
||||||
|
), $concreteDef->getArguments());
|
||||||
|
|
||||||
|
$this->assertNotNull($abstractDef = $container->getDefinition(".messenger.debug.traced.$busId.middleware.abstract_middleware"));
|
||||||
|
$this->assertEquals(array(
|
||||||
|
new Reference(".messenger.debug.traced.$busId.middleware.abstract_middleware.inner"),
|
||||||
|
new Reference('debug.stopwatch'),
|
||||||
|
$busId,
|
||||||
|
), $abstractDef->getArguments());
|
||||||
|
}
|
||||||
|
|
||||||
public function testItRegistersTheDebugCommand()
|
public function testItRegistersTheDebugCommand()
|
||||||
{
|
{
|
||||||
$container = $this->getContainerBuilder($commandBusId = 'command_bus');
|
$container = $this->getContainerBuilder($commandBusId = 'command_bus');
|
||||||
|
@ -0,0 +1,104 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Tests\Middleware\Enhancers;
|
||||||
|
|
||||||
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Middleware\Enhancers\TraceableMiddleware;
|
||||||
|
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
|
||||||
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
|
use Symfony\Component\Stopwatch\Stopwatch;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
|
||||||
|
*/
|
||||||
|
class TraceableMiddlewareTest extends TestCase
|
||||||
|
{
|
||||||
|
public function testHandle()
|
||||||
|
{
|
||||||
|
$busId = 'command_bus';
|
||||||
|
$envelope = Envelope::wrap($message = new DummyMessage('Hello'));
|
||||||
|
|
||||||
|
$middleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
|
||||||
|
$middleware->expects($this->once())
|
||||||
|
->method('handle')
|
||||||
|
->with($message, $this->anything())
|
||||||
|
->will($this->returnCallback(function ($message, callable $next) {
|
||||||
|
return $next($message);
|
||||||
|
}))
|
||||||
|
;
|
||||||
|
|
||||||
|
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
|
||||||
|
$next
|
||||||
|
->expects($this->once())
|
||||||
|
->method('__invoke')
|
||||||
|
->with($message)
|
||||||
|
->willReturn($expectedReturnedValue = 'Hello')
|
||||||
|
;
|
||||||
|
|
||||||
|
$stopwatch = $this->createMock(Stopwatch::class);
|
||||||
|
$stopwatch->expects($this->once())->method('isStarted')->willReturn(true);
|
||||||
|
$stopwatch->expects($this->exactly(2))
|
||||||
|
->method('start')
|
||||||
|
->with($this->matches('%sMiddlewareInterface%s (bus: command_bus)'), 'messenger.middleware')
|
||||||
|
;
|
||||||
|
$stopwatch->expects($this->exactly(2))
|
||||||
|
->method('stop')
|
||||||
|
->with($this->matches('%sMiddlewareInterface%s (bus: command_bus)'))
|
||||||
|
;
|
||||||
|
|
||||||
|
$traced = new TraceableMiddleware($middleware, $stopwatch, $busId);
|
||||||
|
|
||||||
|
$this->assertSame($expectedReturnedValue, $traced->handle($envelope, $next));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @expectedException \RuntimeException
|
||||||
|
* @expectedExceptionMessage Foo exception from next callable
|
||||||
|
*/
|
||||||
|
public function testHandleWithException()
|
||||||
|
{
|
||||||
|
$busId = 'command_bus';
|
||||||
|
$envelope = Envelope::wrap($message = new DummyMessage('Hello'));
|
||||||
|
|
||||||
|
$middleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
|
||||||
|
$middleware->expects($this->once())
|
||||||
|
->method('handle')
|
||||||
|
->with($message, $this->anything())
|
||||||
|
->will($this->returnCallback(function ($message, callable $next) {
|
||||||
|
return $next($message);
|
||||||
|
}))
|
||||||
|
;
|
||||||
|
|
||||||
|
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
|
||||||
|
$next
|
||||||
|
->expects($this->once())
|
||||||
|
->method('__invoke')
|
||||||
|
->willThrowException(new \RuntimeException('Foo exception from next callable'))
|
||||||
|
;
|
||||||
|
|
||||||
|
$stopwatch = $this->createMock(Stopwatch::class);
|
||||||
|
$stopwatch->expects($this->once())->method('isStarted')->willReturn(true);
|
||||||
|
// Start is only expected to be called once, as an exception is thrown by the next callable:
|
||||||
|
$stopwatch->expects($this->exactly(1))
|
||||||
|
->method('start')
|
||||||
|
->with($this->matches('%sMiddlewareInterface%s (bus: command_bus)'), 'messenger.middleware')
|
||||||
|
;
|
||||||
|
$stopwatch->expects($this->exactly(2))
|
||||||
|
->method('stop')
|
||||||
|
->with($this->matches('%sMiddlewareInterface%s (bus: command_bus)'))
|
||||||
|
;
|
||||||
|
|
||||||
|
$traced = new TraceableMiddleware($middleware, $stopwatch, $busId);
|
||||||
|
$traced->handle($envelope, $next);
|
||||||
|
}
|
||||||
|
}
|
@ -26,6 +26,7 @@
|
|||||||
"symfony/process": "~3.4|~4.0",
|
"symfony/process": "~3.4|~4.0",
|
||||||
"symfony/property-access": "~3.4|~4.0",
|
"symfony/property-access": "~3.4|~4.0",
|
||||||
"symfony/serializer": "~3.4|~4.0",
|
"symfony/serializer": "~3.4|~4.0",
|
||||||
|
"symfony/stopwatch": "~3.4|~4.0",
|
||||||
"symfony/validator": "~3.4|~4.0",
|
"symfony/validator": "~3.4|~4.0",
|
||||||
"symfony/var-dumper": "~3.4|~4.0"
|
"symfony/var-dumper": "~3.4|~4.0"
|
||||||
},
|
},
|
||||||
|
Reference in New Issue
Block a user