[Messenger] internal cleanups

This commit is contained in:
Nicolas Grekas 2018-10-17 17:45:44 +02:00
parent f24794da31
commit 4a3edd0b37
26 changed files with 133 additions and 142 deletions

View File

@ -44,21 +44,17 @@ class SendMessageMiddleware implements MiddlewareInterface
return;
}
$sender = $this->senderLocator->getSenderForMessage($envelope->getMessage());
$sender = $this->senderLocator->getSender($envelope);
if ($sender) {
$sender->send($envelope);
if (!$this->mustSendAndHandle($envelope->getMessage())) {
if (!AbstractSenderLocator::getValueFromMessageRouting($this->messagesToSendAndHandleMapping, $envelope)) {
// message has no corresponding handler
return;
}
}
$next($envelope);
}
private function mustSendAndHandle($message): bool
{
return (bool) AbstractSenderLocator::getValueFromMessageRouting($this->messagesToSendAndHandleMapping, $message);
}
}

View File

@ -11,6 +11,8 @@
namespace Symfony\Component\Messenger\Asynchronous\Routing;
use Symfony\Component\Messenger\Envelope;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
*
@ -18,21 +20,24 @@ namespace Symfony\Component\Messenger\Asynchronous\Routing;
*/
abstract class AbstractSenderLocator implements SenderLocatorInterface
{
public static function getValueFromMessageRouting(array $mapping, $message)
public static function getValueFromMessageRouting(array $mapping, Envelope $envelope)
{
if (isset($mapping[\get_class($message)])) {
return $mapping[\get_class($message)];
}
if ($parentsMapping = array_intersect_key($mapping, class_parents($message))) {
return current($parentsMapping);
}
if ($interfaceMapping = array_intersect_key($mapping, class_implements($message))) {
return current($interfaceMapping);
}
if (isset($mapping['*'])) {
return $mapping['*'];
if (isset($mapping[$class = \get_class($envelope->getMessage())])) {
return $mapping[$class];
}
return null;
foreach (class_parents($class) as $name) {
if (isset($mapping[$name])) {
return $mapping[$name];
}
}
foreach (class_implements($class) as $name) {
if (isset($mapping[$name])) {
return $mapping[$name];
}
}
return $mapping['*'] ?? null;
}
}

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Asynchronous\Routing;
use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\SenderInterface;
/**
@ -31,9 +32,9 @@ class ContainerSenderLocator extends AbstractSenderLocator
/**
* {@inheritdoc}
*/
public function getSenderForMessage($message): ?SenderInterface
public function getSender(Envelope $envelope): ?SenderInterface
{
$senderId = self::getValueFromMessageRouting($this->messageToSenderIdMapping, $message);
$senderId = self::getValueFromMessageRouting($this->messageToSenderIdMapping, $envelope);
return $senderId ? $this->senderServiceLocator->get($senderId) : null;
}

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Asynchronous\Routing;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\RuntimeException;
use Symfony\Component\Messenger\Transport\SenderInterface;
@ -29,15 +30,15 @@ class SenderLocator extends AbstractSenderLocator
/**
* {@inheritdoc}
*/
public function getSenderForMessage($message): ?SenderInterface
public function getSender(Envelope $envelope): ?SenderInterface
{
$sender = self::getValueFromMessageRouting($this->messageToSenderMapping, $message);
$sender = self::getValueFromMessageRouting($this->messageToSenderMapping, $envelope);
if (null === $sender) {
return null;
}
if (!$sender instanceof SenderInterface) {
throw new RuntimeException(sprintf('The sender instance provided for message "%s" should be of type "%s" but got "%s".', \get_class($message), SenderInterface::class, \is_object($sender) ? \get_class($sender) : \gettype($sender)));
throw new RuntimeException(sprintf('The sender instance provided for message "%s" should be of type "%s" but got "%s".', \get_class($envelope->getMessage()), SenderInterface::class, \is_object($sender) ? \get_class($sender) : \gettype($sender)));
}
return $sender;

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Asynchronous\Routing;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\SenderInterface;
/**
@ -21,10 +22,6 @@ interface SenderLocatorInterface
{
/**
* Gets the sender (if applicable) for the given message object.
*
* @param object $message
*
* @return SenderInterface|null
*/
public function getSenderForMessage($message): ?SenderInterface;
public function getSender(Envelope $envelope): ?SenderInterface;
}

View File

@ -27,6 +27,12 @@ CHANGELOG
* `Envelope`'s constructor and `with()` method now accept `StampInterface` objects as variadic parameters
* Renamed and moved `ReceivedMessage`, `ValidationConfiguration` and `SerializerConfiguration` in the `Stamp` namespace
* Removed the `WrapIntoReceivedMessage`
* `SenderLocatorInterface::getSenderForMessage()` has been replaced by `getSender(Envelope $envelope)`
* `MessengerDataCollector::getMessages()` returns an iterable, not just an array anymore
* `AbstractHandlerLocator` is now internal
* `HandlerLocatorInterface::resolve()` has been replaced by `getHandler(Envelope $envelope)`
* `SenderLocatorInterface::getSenderForMessage()` has been replaced by `getSender(Envelope $envelope)`
* `SenderInterface::send()` returns `void`
4.1.0
-----

View File

@ -17,7 +17,6 @@ use Symfony\Component\HttpKernel\DataCollector\DataCollector;
use Symfony\Component\HttpKernel\DataCollector\LateDataCollectorInterface;
use Symfony\Component\Messenger\TraceableMessageBus;
use Symfony\Component\VarDumper\Caster\ClassStub;
use Symfony\Component\VarDumper\Cloner\Data;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
@ -55,14 +54,10 @@ class MessengerDataCollector extends DataCollector implements LateDataCollectorI
}
// Order by call time
usort($messages, function (array $a, array $b): int {
return $a[1] > $b[1] ? 1 : -1;
});
usort($messages, function ($a, $b) { return $a[1] <=> $b[1]; });
// Keep the messages clones only
$this->data['messages'] = array_map(function (array $item): Data {
return $item[0];
}, $messages);
$this->data['messages'] = array_column($messages, 0);
}
/**
@ -112,18 +107,21 @@ class MessengerDataCollector extends DataCollector implements LateDataCollectorI
public function getExceptionsCount(string $bus = null): int
{
return array_reduce($this->getMessages($bus), function (int $carry, Data $message) {
return $carry += isset($message['exception']) ? 1 : 0;
}, 0);
$count = 0;
foreach ($this->getMessages($bus) as $message) {
$count += (int) isset($message['exception']);
}
return $count;
}
public function getMessages(string $bus = null): array
public function getMessages(string $bus = null): iterable
{
$messages = $this->data['messages'] ?? array();
return $bus ? array_filter($messages, function (Data $message) use ($bus): bool {
return $bus === $message['bus'];
}) : $messages;
foreach ($this->data['messages'] ?? array() as $message) {
if (null === $bus || $bus === $message['bus']) {
yield $message;
}
}
}
public function getBuses(): array

View File

@ -28,6 +28,9 @@ final class Envelope
*/
public function __construct($message, StampInterface ...$stamps)
{
if (!\is_object($message)) {
throw new \TypeError(sprintf('Invalid argument provided to "%s()": expected object but got %s.', __METHOD__, \gettype($message)));
}
$this->message = $message;
foreach ($stamps as $stamp) {

View File

@ -30,7 +30,7 @@ class ChainHandler
*/
public function __construct(array $handlers)
{
if (empty($handlers)) {
if (!$handlers) {
throw new InvalidArgumentException('A collection of message handlers requires at least one handler.');
}

View File

@ -11,30 +11,33 @@
namespace Symfony\Component\Messenger\Handler\Locator;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
/**
* @author Miha Vrhovnik <miha.vrhovnik@gmail.com>
* @author Samuel Roze <samuel.roze@gmail.com>
*
* @internal
*/
abstract class AbstractHandlerLocator implements HandlerLocatorInterface
{
public function resolve($message): callable
public function getHandler(Envelope $envelope): callable
{
$class = \get_class($message);
$class = \get_class($envelope->getMessage());
if ($handler = $this->getHandler($class)) {
if ($handler = $this->getHandlerByName($class)) {
return $handler;
}
foreach (class_implements($class, false) as $interface) {
if ($handler = $this->getHandler($interface)) {
foreach (class_parents($class) as $name) {
if ($handler = $this->getHandlerByName($name)) {
return $handler;
}
}
foreach (class_parents($class, false) as $parent) {
if ($handler = $this->getHandler($parent)) {
foreach (class_implements($class) as $name) {
if ($handler = $this->getHandlerByName($name)) {
return $handler;
}
}
@ -42,5 +45,5 @@ abstract class AbstractHandlerLocator implements HandlerLocatorInterface
throw new NoHandlerForMessageException(sprintf('No handler for message "%s".', $class));
}
abstract protected function getHandler(string $class): ?callable;
abstract protected function getHandlerByName(string $name): ?callable;
}

View File

@ -29,8 +29,8 @@ class ContainerHandlerLocator extends AbstractHandlerLocator
/**
* {@inheritdoc}
*/
protected function getHandler(string $class): ?callable
protected function getHandlerByName(string $name): ?callable
{
return $this->container->has($class) ? $this->container->get($class) : null;
return $this->container->has($name) ? $this->container->get($name) : null;
}
}

View File

@ -29,8 +29,8 @@ class HandlerLocator extends AbstractHandlerLocator
/**
* {@inheritdoc}
*/
protected function getHandler(string $class): ?callable
protected function getHandlerByName(string $name): ?callable
{
return $this->messageToHandlerMapping[$class] ?? null;
return $this->messageToHandlerMapping[$name] ?? null;
}
}

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Handler\Locator;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
/**
@ -21,11 +22,7 @@ interface HandlerLocatorInterface
/**
* Returns the handler for the given message.
*
* @param object $message
*
* @throws NoHandlerForMessageException
*
* @return callable
* @throws NoHandlerForMessageException When no handler is found
*/
public function resolve($message): callable;
public function getHandler(Envelope $envelope): callable;
}

View File

@ -19,11 +19,11 @@ use Symfony\Component\Messenger\Handler\Locator\HandlerLocatorInterface;
*/
class HandleMessageMiddleware implements MiddlewareInterface
{
private $messageHandlerResolver;
private $messageHandlerLocator;
public function __construct(HandlerLocatorInterface $messageHandlerResolver)
public function __construct(HandlerLocatorInterface $messageHandlerLocator)
{
$this->messageHandlerResolver = $messageHandlerResolver;
$this->messageHandlerLocator = $messageHandlerLocator;
}
/**
@ -31,10 +31,8 @@ class HandleMessageMiddleware implements MiddlewareInterface
*/
public function handle(Envelope $envelope, callable $next): void
{
$message = $envelope->getMessage();
$handler = $this->messageHandlerResolver->resolve($message);
$handler($message);
$handler = $this->messageHandlerLocator->getHandler($envelope);
$handler($envelope->getMessage());
$next($envelope);
}
}

View File

@ -32,27 +32,21 @@ class LoggingMiddleware implements MiddlewareInterface
public function handle(Envelope $envelope, callable $next): void
{
$message = $envelope->getMessage();
$this->logger->debug('Starting handling message {class}', $this->createContext($message));
$context = array(
'message' => $message,
'name' => \get_class($message),
);
$this->logger->debug('Starting handling message {name}', $context);
try {
$next($envelope);
} catch (\Throwable $e) {
$this->logger->warning('An exception occurred while handling message {class}', array_merge(
$this->createContext($message),
array('exception' => $e)
));
$context['exception'] = $e;
$this->logger->warning('An exception occurred while handling message {name}', $context);
throw $e;
}
$this->logger->debug('Finished handling message {class}', $this->createContext($message));
}
private function createContext($message): array
{
return array(
'message' => $message,
'class' => \get_class($message),
);
$this->logger->debug('Finished handling message {name}', $context);
}
}

View File

@ -158,7 +158,7 @@ class InMemorySenderLocator implements SenderLocatorInterface
$this->sender = $sender;
}
public function getSenderForMessage($message): ?SenderInterface
public function getSender(Envelope $envelope): ?SenderInterface
{
return $this->sender;
}

View File

@ -14,6 +14,7 @@ namespace Symfony\Component\Messenger\Tests\Asynchronous\Routing;
use PHPUnit\Framework\TestCase;
use Symfony\Component\DependencyInjection\Container;
use Symfony\Component\Messenger\Asynchronous\Routing\ContainerSenderLocator;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\ChildDummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessageInterface;
@ -32,8 +33,8 @@ class ContainerSenderLocatorTest extends TestCase
DummyMessage::class => 'my_amqp_sender',
));
$this->assertSame($sender, $locator->getSenderForMessage(new DummyMessage('Hello')));
$this->assertNull($locator->getSenderForMessage(new SecondMessage()));
$this->assertSame($sender, $locator->getSender(new Envelope(new DummyMessage('Hello'))));
$this->assertNull($locator->getSender(new Envelope(new SecondMessage())));
}
public function testItReturnsTheSenderBasedOnTheMessageParentClass()
@ -51,8 +52,8 @@ class ContainerSenderLocatorTest extends TestCase
DummyMessage::class => 'my_amqp_sender',
));
$this->assertSame($sender, $locator->getSenderForMessage(new ChildDummyMessage('Hello')));
$this->assertNull($locator->getSenderForMessage(new SecondMessage()));
$this->assertSame($sender, $locator->getSender(new Envelope(new ChildDummyMessage('Hello'))));
$this->assertNull($locator->getSender(new Envelope(new SecondMessage())));
}
public function testItReturnsTheSenderBasedOnTheMessageInterface()
@ -66,8 +67,8 @@ class ContainerSenderLocatorTest extends TestCase
DummyMessageInterface::class => 'my_amqp_sender',
));
$this->assertSame($sender, $locator->getSenderForMessage(new DummyMessage('Hello')));
$this->assertNull($locator->getSenderForMessage(new SecondMessage()));
$this->assertSame($sender, $locator->getSender(new Envelope(new DummyMessage('Hello'))));
$this->assertNull($locator->getSender(new Envelope(new SecondMessage())));
}
public function testItSupportsAWildcardInsteadOfTheMessageClass()
@ -85,7 +86,7 @@ class ContainerSenderLocatorTest extends TestCase
'*' => 'my_api_sender',
));
$this->assertSame($sender, $locator->getSenderForMessage(new DummyMessage('Hello')));
$this->assertSame($apiSender, $locator->getSenderForMessage(new SecondMessage()));
$this->assertSame($sender, $locator->getSender(new Envelope(new DummyMessage('Hello'))));
$this->assertSame($apiSender, $locator->getSender(new Envelope(new SecondMessage())));
}
}

View File

@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests\Asynchronous\Routing;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocator;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\RuntimeException;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage;
@ -27,8 +28,8 @@ class SenderLocatorTest extends TestCase
DummyMessage::class => $sender,
));
$this->assertSame($sender, $locator->getSenderForMessage(new DummyMessage('Hello')));
$this->assertNull($locator->getSenderForMessage(new SecondMessage()));
$this->assertSame($sender, $locator->getSender(new Envelope(new DummyMessage('Hello'))));
$this->assertNull($locator->getSender(new Envelope(new SecondMessage())));
}
public function testItThrowsExceptionIfConfigurationIsWrong()
@ -38,6 +39,6 @@ class SenderLocatorTest extends TestCase
));
$this->expectException(RuntimeException::class);
$locator->getSenderForMessage(new DummyMessage('Hello'));
$locator->getSender(new Envelope(new DummyMessage('Hello')));
}
}

View File

@ -48,14 +48,14 @@ class MessengerDataCollectorTest extends TestCase
$collector->lateCollect();
$messages = $collector->getMessages();
$messages = iterator_to_array($collector->getMessages());
$this->assertCount(1, $messages);
$file = __FILE__;
$expected = <<<DUMP
array:4 [
"bus" => "default"
"stamps" => null
"stamps" => []
"message" => array:2 [
"type" => "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage"
"value" => Symfony\Component\Messenger\Tests\Fixtures\DummyMessage %A
@ -93,14 +93,14 @@ DUMP;
$collector->lateCollect();
$messages = $collector->getMessages();
$messages = iterator_to_array($collector->getMessages());
$this->assertCount(1, $messages);
$file = __FILE__;
$this->assertStringMatchesFormat(<<<DUMP
array:5 [
"bus" => "default"
"stamps" => null
"stamps" => []
"message" => array:2 [
"type" => "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage"
"value" => Symfony\Component\Messenger\Tests\Fixtures\DummyMessage %A
@ -141,7 +141,7 @@ DUMP
$collector->lateCollect();
$messages = $collector->getMessages();
$messages = iterator_to_array($collector->getMessages());
$this->assertCount(5, $messages);
$this->assertSame('#1', $messages[0]['message']['value']['message']);

View File

@ -4,6 +4,7 @@ namespace Symfony\Component\Messenger\Tests\Handler\Locator;
use PHPUnit\Framework\TestCase;
use Symfony\Component\DependencyInjection\Container;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Handler\Locator\ContainerHandlerLocator;
use Symfony\Component\Messenger\Tests\Fixtures\ChildDummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
@ -19,7 +20,7 @@ class ContainerHandlerLocatorTest extends TestCase
$container->set(DummyMessage::class, $handler);
$locator = new ContainerHandlerLocator($container);
$resolvedHandler = $locator->resolve(new DummyMessage('Hey'));
$resolvedHandler = $locator->getHandler(new Envelope(new DummyMessage('Hey')));
$this->assertSame($handler, $resolvedHandler);
}
@ -31,10 +32,10 @@ class ContainerHandlerLocatorTest extends TestCase
public function testThrowsNoHandlerException()
{
$locator = new ContainerHandlerLocator(new Container());
$locator->resolve(new DummyMessage('Hey'));
$locator->getHandler(new Envelope(new DummyMessage('Hey')));
}
public function testResolveMessageViaTheirInterface()
public function testGetHandlerViaInterface()
{
$handler = function () {};
@ -42,12 +43,12 @@ class ContainerHandlerLocatorTest extends TestCase
$container->set(DummyMessageInterface::class, $handler);
$locator = new ContainerHandlerLocator($container);
$resolvedHandler = $locator->resolve(new DummyMessage('Hey'));
$resolvedHandler = $locator->getHandler(new Envelope(new DummyMessage('Hey')));
$this->assertSame($handler, $resolvedHandler);
}
public function testResolveMessageViaTheirParentClass()
public function testGetHandlerViaParentClass()
{
$handler = function () {};
@ -55,7 +56,7 @@ class ContainerHandlerLocatorTest extends TestCase
$container->set(DummyMessage::class, $handler);
$locator = new ContainerHandlerLocator($container);
$resolvedHandler = $locator->resolve(new ChildDummyMessage('Hey'));
$resolvedHandler = $locator->getHandler(new Envelope(new ChildDummyMessage('Hey')));
$this->assertSame($handler, $resolvedHandler);
}

View File

@ -32,7 +32,7 @@ class TraceableMessageBusTest extends TestCase
$this->assertCount(1, $tracedMessages = $traceableBus->getDispatchedMessages());
$this->assertArraySubset(array(
'message' => $message,
'stamps' => null,
'stamps' => array(),
'caller' => array(
'name' => 'TraceableMessageBusTest.php',
'file' => __FILE__,
@ -83,7 +83,7 @@ class TraceableMessageBusTest extends TestCase
$this->assertArraySubset(array(
'message' => $message,
'exception' => $exception,
'stamps' => null,
'stamps' => array(),
'caller' => array(
'name' => 'TraceableMessageBusTest.php',
'file' => __FILE__,

View File

@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\Enhancers;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenTimeLimitIsReachedReceiver;
@ -25,7 +26,7 @@ class StopWhenTimeLimitIsReachedReceiverTest extends TestCase
public function testReceiverStopsWhenTimeLimitIsReached()
{
$callable = function ($handler) {
$handler(new DummyMessage('API'));
$handler(new Envelope(new DummyMessage('API')));
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)

View File

@ -29,30 +29,22 @@ class TraceableMessageBus implements MessageBusInterface
*/
public function dispatch($message): void
{
$caller = $this->getCaller();
$callTime = microtime(true);
$messageToTrace = $message instanceof Envelope ? $message->getMessage() : $message;
$stamps = $message instanceof Envelope ? array_values($message->all()) : null;
$envelope = $message instanceof Envelope ? $message : new Envelope($message);
$context = array(
'stamps' => array_values($envelope->all()),
'message' => $envelope->getMessage(),
'caller' => $this->getCaller(),
'callTime' => microtime(true),
);
try {
$this->decoratedBus->dispatch($message);
$this->dispatchedMessages[] = array(
'stamps' => $stamps,
'message' => $messageToTrace,
'callTime' => $callTime,
'caller' => $caller,
);
} catch (\Throwable $e) {
$this->dispatchedMessages[] = array(
'stamps' => $stamps,
'message' => $messageToTrace,
'exception' => $e,
'callTime' => $callTime,
'caller' => $caller,
);
$context['exception'] = $e;
throw $e;
} finally {
$this->dispatchedMessages[] = $context;
}
}
@ -82,10 +74,7 @@ class TraceableMessageBus implements MessageBusInterface
$line = $trace[$i]['line'];
while (++$i < 8) {
if (isset($trace[$i]['function'], $trace[$i]['file']) && empty($trace[$i]['class']) && 0 !== strpos(
$trace[$i]['function'],
'call_user_func'
)) {
if (isset($trace[$i]['function'], $trace[$i]['file']) && empty($trace[$i]['class']) && 0 !== strpos($trace[$i]['function'], 'call_user_func')) {
$file = $trace[$i]['file'];
$line = $trace[$i]['line'];

View File

@ -35,7 +35,7 @@ class AmqpSender implements SenderInterface
/**
* {@inheritdoc}
*/
public function send(Envelope $envelope)
public function send(Envelope $envelope): void
{
$encodedMessage = $this->serializer->encode($envelope);

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport\Enhancers;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
/**
@ -32,13 +33,13 @@ class StopWhenTimeLimitIsReachedReceiver implements ReceiverInterface
public function receive(callable $handler): void
{
$startTime = time();
$startTime = microtime(true);
$endTime = $startTime + $this->timeLimitInSeconds;
$this->decoratedReceiver->receive(function ($message) use ($handler, $endTime) {
$handler($message);
$this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler, $endTime) {
$handler($envelope);
if ($endTime < time()) {
if ($endTime < microtime(true)) {
$this->stop();
if (null !== $this->logger) {
$this->logger->info('Receiver stopped due to time limit of {timeLimit}s reached', array('timeLimit' => $this->timeLimitInSeconds));

View File

@ -20,8 +20,6 @@ interface SenderInterface
{
/**
* Sends the given envelope.
*
* @param Envelope $envelope
*/
public function send(Envelope $envelope);
public function send(Envelope $envelope): void;
}