Allow stamps to be passed directly to MessageBusInterface::dispatch()

And changing signature of Envelope::__construct() to accept an array of envelopes
This commit is contained in:
Ryan Weaver 2019-03-26 08:34:20 -04:00
parent 88042a317a
commit e861de7e61
18 changed files with 115 additions and 52 deletions

View File

@ -4,6 +4,12 @@ CHANGELOG
4.3.0
-----
* [BC BREAK] The `Envelope::__construct()` signature changed:
you can no longer pass an unlimited number of stamps as the second,
third, fourth, arguments etc: stamps are now an array passed to the
second argument.
* [BC BREAK] The `MessageBusInterface::dispatch()` signature changed:
a second argument `array $stamps = []` was added.
* [BC BREAK] The `TransportFactoryInterface::createTransport()` signature
changed: a required 3rd `SerializerInterface` argument was added.
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to

View File

@ -26,9 +26,10 @@ final class Envelope
private $message;
/**
* @param object $message
* @param object $message
* @param StampInterface[] $stamps
*/
public function __construct($message, StampInterface ...$stamps)
public function __construct($message, array $stamps = [])
{
if (!\is_object($message)) {
throw new \TypeError(sprintf('Invalid argument provided to "%s()": expected object but got %s.', __METHOD__, \gettype($message)));
@ -40,6 +41,19 @@ final class Envelope
}
}
/**
* Makes sure the message is in an Envelope and adds the given stamps.
*
* @param object|Envelope $message
* @param StampInterface[] $stamps
*/
public static function wrap($message, array $stamps = []): self
{
$envelope = $message instanceof self ? $message : new self($message);
return $envelope->with(...$stamps);
}
/**
* @return Envelope a new Envelope instance with additional stamp
*/

View File

@ -52,12 +52,12 @@ class MessageBus implements MessageBusInterface
/**
* {@inheritdoc}
*/
public function dispatch($message): Envelope
public function dispatch($message, array $stamps = []): Envelope
{
if (!\is_object($message)) {
throw new \TypeError(sprintf('Invalid argument provided to "%s()": expected object, but got %s.', __METHOD__, \gettype($message)));
}
$envelope = $message instanceof Envelope ? $message : new Envelope($message);
$envelope = Envelope::wrap($message, $stamps);
$middlewareIterator = $this->middlewareAggregate->getIterator();
while ($middlewareIterator instanceof \IteratorAggregate) {

View File

@ -11,6 +11,8 @@
namespace Symfony\Component\Messenger;
use Symfony\Component\Messenger\Stamp\StampInterface;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
*
@ -21,7 +23,8 @@ interface MessageBusInterface
/**
* Dispatches the given message.
*
* @param object|Envelope $message The message or the message pre-wrapped in an envelope
* @param object|Envelope $message The message or the message pre-wrapped in an envelope
* @param StampInterface[] $stamps
*/
public function dispatch($message): Envelope;
public function dispatch($message, array $stamps = []): Envelope;
}

View File

@ -37,7 +37,7 @@ class RoutableMessageBus implements MessageBusInterface
$this->busLocator = $busLocator;
}
public function dispatch($envelope): Envelope
public function dispatch($envelope, array $stamps = []): Envelope
{
if (!$envelope instanceof Envelope) {
throw new InvalidArgumentException('Messages passed to RoutableMessageBus::dispatch() must be inside an Envelope');
@ -53,6 +53,6 @@ class RoutableMessageBus implements MessageBusInterface
throw new InvalidArgumentException(sprintf('Invalid bus name "%s" on BusNameStamp.', $busNameStamp->getBusName()));
}
return $this->busLocator->get($busNameStamp->getBusName())->dispatch($envelope);
return $this->busLocator->get($busNameStamp->getBusName())->dispatch($envelope, $stamps);
}
}

View File

@ -26,7 +26,7 @@ class EnvelopeTest extends TestCase
public function testConstruct()
{
$receivedStamp = new ReceivedStamp();
$envelope = new Envelope($dummy = new DummyMessage('dummy'), $receivedStamp);
$envelope = new Envelope($dummy = new DummyMessage('dummy'), [$receivedStamp]);
$this->assertSame($dummy, $envelope->getMessage());
$this->assertArrayHasKey(ReceivedStamp::class, $stamps = $envelope->all());
@ -42,7 +42,7 @@ class EnvelopeTest extends TestCase
public function testWithoutAll()
{
$envelope = new Envelope(new DummyMessage('dummy'), new ReceivedStamp(), new ReceivedStamp(), new DelayStamp(5000));
$envelope = new Envelope(new DummyMessage('dummy'), [new ReceivedStamp(), new ReceivedStamp(), new DelayStamp(5000)]);
$envelope = $envelope->withoutAll(ReceivedStamp::class);
@ -53,7 +53,7 @@ class EnvelopeTest extends TestCase
public function testLast()
{
$receivedStamp = new ReceivedStamp();
$envelope = new Envelope($dummy = new DummyMessage('dummy'), $receivedStamp);
$envelope = new Envelope($dummy = new DummyMessage('dummy'), [$receivedStamp]);
$this->assertSame($receivedStamp, $envelope->last(ReceivedStamp::class));
$this->assertNull($envelope->last(ValidationStamp::class));
@ -72,4 +72,23 @@ class EnvelopeTest extends TestCase
$this->assertArrayHasKey(ValidationStamp::class, $stamps);
$this->assertSame($validationStamp, $stamps[ValidationStamp::class][0]);
}
public function testWrapWithMessage()
{
$message = new \stdClass();
$stamp = new ReceivedStamp();
$envelope = Envelope::wrap($message, [$stamp]);
$this->assertSame($message, $envelope->getMessage());
$this->assertSame([ReceivedStamp::class => [$stamp]], $envelope->all());
}
public function testWrapWithEnvelope()
{
$envelope = new Envelope(new \stdClass(), [new DelayStamp(5)]);
$envelope = Envelope::wrap($envelope, [new ReceivedStamp()]);
$this->assertCount(1, $envelope->all(DelayStamp::class));
$this->assertCount(1, $envelope->all(ReceivedStamp::class));
}
}

View File

@ -31,7 +31,7 @@ class HandleTraitTest extends TestCase
$query = new DummyMessage('Hello');
$bus->expects($this->once())->method('dispatch')->willReturn(
new Envelope($query, new HandledStamp('result', 'DummyHandler::__invoke'))
new Envelope($query, [new HandledStamp('result', 'DummyHandler::__invoke')])
);
$this->assertSame('result', $queryBus->query($query));
@ -42,7 +42,7 @@ class HandleTraitTest extends TestCase
$bus = $this->createMock(MessageBus::class);
$queryBus = new TestQueryBus($bus);
$envelope = new Envelope(new DummyMessage('Hello'), new HandledStamp('result', 'DummyHandler::__invoke'));
$envelope = new Envelope(new DummyMessage('Hello'), [new HandledStamp('result', 'DummyHandler::__invoke')]);
$bus->expects($this->once())->method('dispatch')->willReturn($envelope);
$this->assertSame('result', $queryBus->query($envelope));
@ -74,7 +74,7 @@ class HandleTraitTest extends TestCase
$query = new DummyMessage('Hello');
$bus->expects($this->once())->method('dispatch')->willReturn(
new Envelope($query, new HandledStamp('first_result', 'FirstDummyHandler::__invoke'), new HandledStamp('second_result', 'SecondDummyHandler::__invoke', 'dummy_2'))
new Envelope($query, [new HandledStamp('first_result', 'FirstDummyHandler::__invoke'), new HandledStamp('second_result', 'SecondDummyHandler::__invoke', 'dummy_2')])
);
$queryBus->query($query);

View File

@ -16,6 +16,8 @@ use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Stamp\BusNameStamp;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Tests\Fixtures\AnEnvelopeStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
@ -69,7 +71,7 @@ class MessageBusTest extends TestCase
public function testThatAMiddlewareCanAddSomeStampsToTheEnvelope()
{
$message = new DummyMessage('Hello');
$envelope = new Envelope($message, new ReceivedStamp());
$envelope = new Envelope($message, [new ReceivedStamp()]);
$envelopeWithAnotherStamp = $envelope->with(new AnEnvelopeStamp());
$firstMiddleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
@ -107,10 +109,10 @@ class MessageBusTest extends TestCase
public function testThatAMiddlewareCanUpdateTheMessageWhileKeepingTheEnvelopeStamps()
{
$message = new DummyMessage('Hello');
$envelope = new Envelope($message, ...$stamps = [new ReceivedStamp()]);
$envelope = new Envelope($message, $stamps = [new ReceivedStamp()]);
$changedMessage = new DummyMessage('Changed');
$expectedEnvelope = new Envelope($changedMessage, ...$stamps);
$expectedEnvelope = new Envelope($changedMessage, $stamps);
$firstMiddleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
$firstMiddleware->expects($this->once())
@ -134,4 +136,16 @@ class MessageBusTest extends TestCase
$bus->dispatch($envelope);
}
public function testItAddsTheStamps()
{
$finalEnvelope = (new MessageBus())->dispatch(new \stdClass(), [new DelayStamp(5), new BusNameStamp('bar')]);
$this->assertCount(2, $finalEnvelope->all());
}
public function testItAddsTheStampsToEnvelope()
{
$finalEnvelope = (new MessageBus())->dispatch(new Envelope(new \stdClass()), [new DelayStamp(5), new BusNameStamp('bar')]);
$this->assertCount(2, $finalEnvelope->all());
}
}

View File

@ -44,8 +44,8 @@ class DispatchAfterCurrentBusMiddlewareTest extends TestCase
$messageBus = new MessageBus([
$middleware,
new DispatchingMiddleware($eventBus, [
new Envelope($firstEvent, new DispatchAfterCurrentBusStamp()),
new Envelope($secondEvent, new DispatchAfterCurrentBusStamp()),
new Envelope($firstEvent, [new DispatchAfterCurrentBusStamp()]),
new Envelope($secondEvent, [new DispatchAfterCurrentBusStamp()]),
$thirdEvent, // Not in a new transaction
]),
$handlingMiddleware,
@ -80,8 +80,8 @@ class DispatchAfterCurrentBusMiddlewareTest extends TestCase
$messageBus = new MessageBus([
$middleware,
new DispatchingMiddleware($eventBus, [
new Envelope($firstEvent, new DispatchAfterCurrentBusStamp()),
new Envelope($secondEvent, new DispatchAfterCurrentBusStamp()),
new Envelope($firstEvent, [new DispatchAfterCurrentBusStamp()]),
new Envelope($secondEvent, [new DispatchAfterCurrentBusStamp()]),
]),
$handlingMiddleware,
]);

View File

@ -86,7 +86,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
public function testItSendsToOnlyOneSenderOnRedelivery()
{
$envelope = new Envelope(new DummyMessage('Hey'), new RedeliveryStamp(5, 'bar'));
$envelope = new Envelope(new DummyMessage('Hey'), [new RedeliveryStamp(5, 'bar')]);
// even with a ForceCallHandlersStamp, the next middleware won't be called
$envelope = $envelope->with(new ForceCallHandlersStamp());
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();

View File

@ -21,7 +21,7 @@ class MultiplierRetryStrategyTest extends TestCase
public function testIsRetryable()
{
$strategy = new MultiplierRetryStrategy(3);
$envelope = new Envelope(new \stdClass(), new RedeliveryStamp(0, 'sender_alias'));
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0, 'sender_alias')]);
$this->assertTrue($strategy->isRetryable($envelope));
}
@ -29,7 +29,7 @@ class MultiplierRetryStrategyTest extends TestCase
public function testIsNotRetryable()
{
$strategy = new MultiplierRetryStrategy(3);
$envelope = new Envelope(new \stdClass(), new RedeliveryStamp(3, 'sender_alias'));
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(3, 'sender_alias')]);
$this->assertFalse($strategy->isRetryable($envelope));
}
@ -48,7 +48,7 @@ class MultiplierRetryStrategyTest extends TestCase
public function testGetWaitTime(int $delay, int $multiplier, int $maxDelay, int $previousRetries, int $expectedDelay)
{
$strategy = new MultiplierRetryStrategy(10, $delay, $multiplier, $maxDelay);
$envelope = new Envelope(new \stdClass(), new RedeliveryStamp($previousRetries, 'sender_alias'));
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp($previousRetries, 'sender_alias')]);
$this->assertSame($expectedDelay, $strategy->getWaitingTime($envelope));
}

View File

@ -18,12 +18,13 @@ use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Stamp\BusNameStamp;
use Symfony\Component\Messenger\Stamp\DelayStamp;
class RoutableMessageBusTest extends TestCase
{
public function testItRoutesToTheCorrectBus()
{
$envelope = new Envelope(new \stdClass(), new BusNameStamp('foo_bus'));
$envelope = new Envelope(new \stdClass(), [new BusNameStamp('foo_bus')]);
$bus1 = $this->createMock(MessageBusInterface::class);
$bus2 = $this->createMock(MessageBusInterface::class);
@ -32,11 +33,12 @@ class RoutableMessageBusTest extends TestCase
$container->expects($this->once())->method('has')->with('foo_bus')->willReturn(true);
$container->expects($this->once())->method('get')->will($this->returnValue($bus2));
$stamp = new DelayStamp(5);
$bus1->expects($this->never())->method('dispatch');
$bus2->expects($this->once())->method('dispatch')->with($envelope)->willReturn($envelope);
$bus2->expects($this->once())->method('dispatch')->with($envelope, [$stamp])->willReturn($envelope);
$routableBus = new RoutableMessageBus($container);
$this->assertSame($envelope, $routableBus->dispatch($envelope));
$this->assertSame($envelope, $routableBus->dispatch($envelope, [$stamp]));
}
public function testItExceptionOnMissingStamp()
@ -58,7 +60,7 @@ class RoutableMessageBusTest extends TestCase
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('Invalid bus name');
$envelope = new Envelope(new \stdClass(), new BusNameStamp('foo_bus'));
$envelope = new Envelope(new \stdClass(), [new BusNameStamp('foo_bus')]);
$container = $this->createMock(ContainerInterface::class);
$container->expects($this->once())->method('has')->willReturn(false);

View File

@ -14,6 +14,7 @@ namespace Symfony\Component\Messenger\Tests;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Tests\Fixtures\AnEnvelopeStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\TraceableMessageBus;
@ -24,22 +25,25 @@ class TraceableMessageBusTest extends TestCase
{
$message = new DummyMessage('Hello');
$stamp = new DelayStamp(5);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->once())->method('dispatch')->with($message)->willReturn(new Envelope($message));
$bus->expects($this->once())->method('dispatch')->with($message, [$stamp])->willReturn(new Envelope($message));
$traceableBus = new TraceableMessageBus($bus);
$line = __LINE__ + 1;
$traceableBus->dispatch($message);
$traceableBus->dispatch($message, [$stamp]);
$this->assertCount(1, $tracedMessages = $traceableBus->getDispatchedMessages());
$this->assertArraySubset([
$actualTracedMessage = $tracedMessages[0];
unset($actualTracedMessage['callTime']); // don't check, too variable
$this->assertEquals([
'message' => $message,
'stamps' => [],
'stamps' => [[$stamp]],
'caller' => [
'name' => 'TraceableMessageBusTest.php',
'file' => __FILE__,
'line' => $line,
],
], $tracedMessages[0], true);
], $actualTracedMessage);
}
public function testItTracesDispatchWithEnvelope()
@ -54,7 +58,9 @@ class TraceableMessageBusTest extends TestCase
$line = __LINE__ + 1;
$traceableBus->dispatch($envelope);
$this->assertCount(1, $tracedMessages = $traceableBus->getDispatchedMessages());
$this->assertArraySubset([
$actualTracedMessage = $tracedMessages[0];
unset($actualTracedMessage['callTime']); // don't check, too variable
$this->assertEquals([
'message' => $message,
'stamps' => [[$stamp]],
'caller' => [
@ -62,7 +68,7 @@ class TraceableMessageBusTest extends TestCase
'file' => __FILE__,
'line' => $line,
],
], $tracedMessages[0], true);
], $actualTracedMessage);
}
public function testItTracesExceptions()
@ -82,7 +88,9 @@ class TraceableMessageBusTest extends TestCase
}
$this->assertCount(1, $tracedMessages = $traceableBus->getDispatchedMessages());
$this->assertArraySubset([
$actualTracedMessage = $tracedMessages[0];
unset($actualTracedMessage['callTime']); // don't check, too variable
$this->assertEquals([
'message' => $message,
'exception' => $exception,
'stamps' => [],
@ -91,6 +99,6 @@ class TraceableMessageBusTest extends TestCase
'file' => __FILE__,
'line' => $line,
],
], $tracedMessages[0], true);
], $actualTracedMessage);
}
}

View File

@ -56,7 +56,7 @@ class AmqpReceiverTest extends TestCase
$connection->method('ack')->with($amqpEnvelope)->willThrowException(new \AMQPException());
$receiver = new AmqpReceiver($connection, $serializer);
$receiver->ack(new Envelope(new \stdClass(), new AmqpReceivedStamp($amqpEnvelope)));
$receiver->ack(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope)]));
}
/**
@ -71,7 +71,7 @@ class AmqpReceiverTest extends TestCase
$connection->method('nack')->with($amqpEnvelope, AMQP_NOPARAM)->willThrowException(new \AMQPException());
$receiver = new AmqpReceiver($connection, $serializer);
$receiver->reject(new Envelope(new \stdClass(), new AmqpReceivedStamp($amqpEnvelope)));
$receiver->reject(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope)]));
}
private function createAMQPEnvelope()

View File

@ -12,13 +12,10 @@ if (!file_exists($autoload)) {
require_once $autoload;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer;
@ -33,7 +30,7 @@ $receiver = new AmqpReceiver($connection, $serializer);
$retryStrategy = new MultiplierRetryStrategy(3, 0);
$worker = new Worker(['the_receiver' => $receiver], new class() implements MessageBusInterface {
public function dispatch($envelope): Envelope
public function dispatch($envelope, array $stamps = []): Envelope
{
echo 'Get envelope with message: '.\get_class($envelope->getMessage())."\n";
echo sprintf("with stamps: %s\n", json_encode(array_keys($envelope->all()), JSON_PRETTY_PRINT));

View File

@ -43,8 +43,8 @@ class WorkerTest extends TestCase
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with($envelope = new Envelope($apiMessage, new ReceivedStamp()))->willReturn($envelope);
$bus->expects($this->at(1))->method('dispatch')->with($envelope = new Envelope($ipaMessage, new ReceivedStamp()))->willReturn($envelope);
$bus->expects($this->at(0))->method('dispatch')->with($envelope = new Envelope($apiMessage, [new ReceivedStamp()]))->willReturn($envelope);
$bus->expects($this->at(1))->method('dispatch')->with($envelope = new Envelope($ipaMessage, [new ReceivedStamp()]))->willReturn($envelope);
$worker = new Worker([$receiver], $bus);
$worker->run([], function (?Envelope $envelope) use ($worker) {
@ -78,7 +78,7 @@ class WorkerTest extends TestCase
public function testDispatchCausesRetry()
{
$receiver = new DummyReceiver([
[new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias'))],
[new Envelope(new DummyMessage('Hello'), [new SentStamp('Some\Sender', 'sender_alias')])],
]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
@ -117,7 +117,7 @@ class WorkerTest extends TestCase
public function testDispatchCausesRejectWhenNoRetry()
{
$receiver = new DummyReceiver([
[new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias'))],
[new Envelope(new DummyMessage('Hello'), [new SentStamp('Some\Sender', 'sender_alias')])],
]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();

View File

@ -29,9 +29,9 @@ class TraceableMessageBus implements MessageBusInterface
/**
* {@inheritdoc}
*/
public function dispatch($message): Envelope
public function dispatch($message, array $stamps = []): Envelope
{
$envelope = $message instanceof Envelope ? $message : new Envelope($message);
$envelope = Envelope::wrap($message, $stamps);
$context = [
'stamps' => array_values($envelope->all()),
'message' => $envelope->getMessage(),
@ -40,7 +40,7 @@ class TraceableMessageBus implements MessageBusInterface
];
try {
return $this->decoratedBus->dispatch($message);
return $this->decoratedBus->dispatch($message, $stamps);
} catch (\Throwable $e) {
$context['exception'] = $e;

View File

@ -83,7 +83,7 @@ class Serializer implements SerializerInterface
throw new MessageDecodingFailedException(sprintf('Could not decode message: %s.', $e->getMessage()), $e->getCode(), $e);
}
return new Envelope($message, ...$stamps);
return new Envelope($message, $stamps);
}
/**