[Messenger][Profiler] Trace middleware execution

This commit is contained in:
Maxime Steinhausser 2018-05-19 20:30:28 +02:00
parent 55def78682
commit e974f67b1f
6 changed files with 230 additions and 2 deletions

View File

@ -9,6 +9,7 @@
'event_listener': '#00B8F5',
'template': '#66CC00',
'doctrine': '#FF6633',
'messenger.middleware': '#BDB81E',
} %}
{% endif %}

View File

@ -22,6 +22,7 @@ use Symfony\Component\DependencyInjection\Reference;
use Symfony\Component\Messenger\Handler\ChainHandler;
use Symfony\Component\Messenger\Handler\Locator\ContainerHandlerLocator;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
use Symfony\Component\Messenger\Middleware\Enhancers\TraceableMiddleware;
use Symfony\Component\Messenger\TraceableMessageBus;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;
@ -37,13 +38,15 @@ class MessengerPass implements CompilerPassInterface
private $busTag;
private $senderTag;
private $receiverTag;
private $debugStopwatchId;
public function __construct(string $handlerTag = 'messenger.message_handler', string $busTag = 'messenger.bus', string $senderTag = 'messenger.sender', string $receiverTag = 'messenger.receiver')
public function __construct(string $handlerTag = 'messenger.message_handler', string $busTag = 'messenger.bus', string $senderTag = 'messenger.sender', string $receiverTag = 'messenger.receiver', string $debugStopwatchId = 'debug.stopwatch')
{
$this->handlerTag = $handlerTag;
$this->busTag = $busTag;
$this->senderTag = $senderTag;
$this->receiverTag = $receiverTag;
$this->debugStopwatchId = $debugStopwatchId;
}
/**
@ -303,6 +306,7 @@ class MessengerPass implements CompilerPassInterface
private function registerBusMiddleware(ContainerBuilder $container, string $busId, array $middlewareCollection)
{
$debug = $container->getParameter('kernel.debug') && $container->has($this->debugStopwatchId);
$middlewareReferences = array();
foreach ($middlewareCollection as $middlewareItem) {
$id = $middlewareItem['id'];
@ -315,7 +319,7 @@ class MessengerPass implements CompilerPassInterface
throw new RuntimeException(sprintf('Invalid middleware "%s": define such service to be able to use it.', $id));
}
if (($definition = $container->findDefinition($messengerMiddlewareId))->isAbstract()) {
if ($isDefinitionAbstract = ($definition = $container->findDefinition($messengerMiddlewareId))->isAbstract()) {
$childDefinition = new ChildDefinition($messengerMiddlewareId);
$count = \count($definition->getArguments());
foreach (array_values($arguments ?? array()) as $key => $argument) {
@ -329,6 +333,21 @@ class MessengerPass implements CompilerPassInterface
throw new RuntimeException(sprintf('Invalid middleware factory "%s": a middleware factory must be an abstract definition.', $id));
}
if ($debug) {
$container->register($debugMiddlewareId = '.messenger.debug.traced.'.$messengerMiddlewareId, TraceableMiddleware::class)
// Decorates with a high priority so it's applied the earliest:
->setDecoratedService($messengerMiddlewareId, null, 100)
->setArguments(array(
new Reference($debugMiddlewareId.'.inner'),
new Reference($this->debugStopwatchId),
// In case the definition isn't abstract,
// we cannot be sure the service instance is used by one bus only.
// So we only inject the bus name when the original definition is abstract.
$isDefinitionAbstract ? $busId : null,
))
;
}
$middlewareReferences[] = new Reference($messengerMiddlewareId);
}

View File

@ -0,0 +1,69 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Middleware\Enhancers;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\EnvelopeAwareInterface;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Stopwatch\Stopwatch;
/**
* Collects some data about a middleware.
*
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*/
class TraceableMiddleware implements MiddlewareInterface, EnvelopeAwareInterface
{
private $inner;
private $stopwatch;
private $busName;
private $eventCategory;
public function __construct(MiddlewareInterface $inner, Stopwatch $stopwatch, string $busName = null, string $eventCategory = 'messenger.middleware')
{
$this->inner = $inner;
$this->stopwatch = $stopwatch;
$this->busName = $busName;
$this->eventCategory = $eventCategory;
}
/**
* @param Envelope $envelope
*/
public function handle($envelope, callable $next)
{
$class = \get_class($this->inner);
$eventName = 'c' === $class[0] && 0 === strpos($class, "class@anonymous\0") ? get_parent_class($class).'@anonymous' : $class;
if ($this->busName) {
$eventName .= " (bus: {$this->busName})";
}
$this->stopwatch->start($eventName, $this->eventCategory);
try {
$result = $this->inner->handle($envelope->getMessageFor($this->inner), function ($message) use ($next, $eventName) {
$this->stopwatch->stop($eventName);
$result = $next($message);
$this->stopwatch->start($eventName, $this->eventCategory);
return $result;
});
} finally {
if ($this->stopwatch->isStarted($eventName)) {
$this->stopwatch->stop($eventName);
}
}
return $result;
}
}

View File

@ -41,6 +41,7 @@ use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Stopwatch\Stopwatch;
class MessengerPassTest extends TestCase
{
@ -561,6 +562,39 @@ class MessengerPassTest extends TestCase
(new MessengerPass())->process($container);
}
public function testDecoratesWithTraceableMiddlewareOnDebug()
{
$container = $this->getContainerBuilder();
$container->register($busId = 'message_bus', MessageBusInterface::class)->setArgument(0, array())->addTag('messenger.bus');
$container->register('abstract_middleware', UselessMiddleware::class)->setAbstract(true);
$container->register('concrete_middleware', UselessMiddleware::class);
$container->setParameter($middlewareParameter = $busId.'.middleware', array(
array('id' => 'abstract_middleware'),
array('id' => 'concrete_middleware'),
));
$container->setParameter('kernel.debug', true);
$container->register('debug.stopwatch', Stopwatch::class);
(new MessengerPass())->process($container);
$this->assertNotNull($concreteDef = $container->getDefinition('.messenger.debug.traced.concrete_middleware'));
$this->assertEquals(array(
new Reference('.messenger.debug.traced.concrete_middleware.inner'),
new Reference('debug.stopwatch'),
null,
), $concreteDef->getArguments());
$this->assertNotNull($abstractDef = $container->getDefinition(".messenger.debug.traced.$busId.middleware.abstract_middleware"));
$this->assertEquals(array(
new Reference(".messenger.debug.traced.$busId.middleware.abstract_middleware.inner"),
new Reference('debug.stopwatch'),
$busId,
), $abstractDef->getArguments());
}
public function testItRegistersTheDebugCommand()
{
$container = $this->getContainerBuilder($commandBusId = 'command_bus');

View File

@ -0,0 +1,104 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Middleware\Enhancers;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\Enhancers\TraceableMiddleware;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Stopwatch\Stopwatch;
/**
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*/
class TraceableMiddlewareTest extends TestCase
{
public function testHandle()
{
$busId = 'command_bus';
$envelope = Envelope::wrap($message = new DummyMessage('Hello'));
$middleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
$middleware->expects($this->once())
->method('handle')
->with($message, $this->anything())
->will($this->returnCallback(function ($message, callable $next) {
return $next($message);
}))
;
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$next
->expects($this->once())
->method('__invoke')
->with($message)
->willReturn($expectedReturnedValue = 'Hello')
;
$stopwatch = $this->createMock(Stopwatch::class);
$stopwatch->expects($this->once())->method('isStarted')->willReturn(true);
$stopwatch->expects($this->exactly(2))
->method('start')
->with($this->matches('%sMiddlewareInterface%s (bus: command_bus)'), 'messenger.middleware')
;
$stopwatch->expects($this->exactly(2))
->method('stop')
->with($this->matches('%sMiddlewareInterface%s (bus: command_bus)'))
;
$traced = new TraceableMiddleware($middleware, $stopwatch, $busId);
$this->assertSame($expectedReturnedValue, $traced->handle($envelope, $next));
}
/**
* @expectedException \RuntimeException
* @expectedExceptionMessage Foo exception from next callable
*/
public function testHandleWithException()
{
$busId = 'command_bus';
$envelope = Envelope::wrap($message = new DummyMessage('Hello'));
$middleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
$middleware->expects($this->once())
->method('handle')
->with($message, $this->anything())
->will($this->returnCallback(function ($message, callable $next) {
return $next($message);
}))
;
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$next
->expects($this->once())
->method('__invoke')
->willThrowException(new \RuntimeException('Foo exception from next callable'))
;
$stopwatch = $this->createMock(Stopwatch::class);
$stopwatch->expects($this->once())->method('isStarted')->willReturn(true);
// Start is only expected to be called once, as an exception is thrown by the next callable:
$stopwatch->expects($this->exactly(1))
->method('start')
->with($this->matches('%sMiddlewareInterface%s (bus: command_bus)'), 'messenger.middleware')
;
$stopwatch->expects($this->exactly(2))
->method('stop')
->with($this->matches('%sMiddlewareInterface%s (bus: command_bus)'))
;
$traced = new TraceableMiddleware($middleware, $stopwatch, $busId);
$traced->handle($envelope, $next);
}
}

View File

@ -26,6 +26,7 @@
"symfony/process": "~3.4|~4.0",
"symfony/property-access": "~3.4|~4.0",
"symfony/serializer": "~3.4|~4.0",
"symfony/stopwatch": "~3.4|~4.0",
"symfony/validator": "~3.4|~4.0",
"symfony/var-dumper": "~3.4|~4.0"
},