feature #30707 [Messenger][DX] Allow stamps to be passed directly to MessageBusInterface::dispatch() (weaverryan)
This PR was merged into the 4.3-dev branch.
Discussion
----------
[Messenger][DX] Allow stamps to be passed directly to MessageBusInterface::dispatch()
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | no
| New feature? | yes
| BC breaks? | yes
| Deprecations? | no
| Tests pass? | yes
| Fixed tickets | none
| License | MIT
| Doc PR | TODO
Me again o/!
This proposal is *purely* for DX. With `DelayStamp`, the proposal of QueueNameStamp and future things like `AmqpRoutingKeyStamp`, stamps are becoming more common for end users to use. This changes how it looks to use them:
```php
// before
$bus->dispatch(new Envelope(new SendSmsNotification('Hi!'), new DelayStamp(10), new QueueNameStamp('low')));
// after
$bus->dispatch(new SendSmsNotification('Hi!'), [new DelayStamp(10), new QueueNameStamp('low')]);
```
It's definitely a BC break, which is allowed because the component is experimental, though it should be minimized. This BC break shouldn't be felt by most end users, as creating your own bus is an advanced use-case. Even if you decorated it, you'll get an obvious error.
Commits
-------
e861de7e61
Allow stamps to be passed directly to MessageBusInterface::dispatch()
This commit is contained in:
commit
3abca64c5d
@ -4,6 +4,12 @@ CHANGELOG
|
||||
4.3.0
|
||||
-----
|
||||
|
||||
* [BC BREAK] The `Envelope::__construct()` signature changed:
|
||||
you can no longer pass an unlimited number of stamps as the second,
|
||||
third, fourth, arguments etc: stamps are now an array passed to the
|
||||
second argument.
|
||||
* [BC BREAK] The `MessageBusInterface::dispatch()` signature changed:
|
||||
a second argument `array $stamps = []` was added.
|
||||
* [BC BREAK] The `TransportFactoryInterface::createTransport()` signature
|
||||
changed: a required 3rd `SerializerInterface` argument was added.
|
||||
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
|
||||
|
@ -26,9 +26,10 @@ final class Envelope
|
||||
private $message;
|
||||
|
||||
/**
|
||||
* @param object $message
|
||||
* @param object $message
|
||||
* @param StampInterface[] $stamps
|
||||
*/
|
||||
public function __construct($message, StampInterface ...$stamps)
|
||||
public function __construct($message, array $stamps = [])
|
||||
{
|
||||
if (!\is_object($message)) {
|
||||
throw new \TypeError(sprintf('Invalid argument provided to "%s()": expected object but got %s.', __METHOD__, \gettype($message)));
|
||||
@ -40,6 +41,19 @@ final class Envelope
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Makes sure the message is in an Envelope and adds the given stamps.
|
||||
*
|
||||
* @param object|Envelope $message
|
||||
* @param StampInterface[] $stamps
|
||||
*/
|
||||
public static function wrap($message, array $stamps = []): self
|
||||
{
|
||||
$envelope = $message instanceof self ? $message : new self($message);
|
||||
|
||||
return $envelope->with(...$stamps);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Envelope a new Envelope instance with additional stamp
|
||||
*/
|
||||
|
@ -52,12 +52,12 @@ class MessageBus implements MessageBusInterface
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function dispatch($message): Envelope
|
||||
public function dispatch($message, array $stamps = []): Envelope
|
||||
{
|
||||
if (!\is_object($message)) {
|
||||
throw new \TypeError(sprintf('Invalid argument provided to "%s()": expected object, but got %s.', __METHOD__, \gettype($message)));
|
||||
}
|
||||
$envelope = $message instanceof Envelope ? $message : new Envelope($message);
|
||||
$envelope = Envelope::wrap($message, $stamps);
|
||||
$middlewareIterator = $this->middlewareAggregate->getIterator();
|
||||
|
||||
while ($middlewareIterator instanceof \IteratorAggregate) {
|
||||
|
@ -11,6 +11,8 @@
|
||||
|
||||
namespace Symfony\Component\Messenger;
|
||||
|
||||
use Symfony\Component\Messenger\Stamp\StampInterface;
|
||||
|
||||
/**
|
||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||
*
|
||||
@ -21,7 +23,8 @@ interface MessageBusInterface
|
||||
/**
|
||||
* Dispatches the given message.
|
||||
*
|
||||
* @param object|Envelope $message The message or the message pre-wrapped in an envelope
|
||||
* @param object|Envelope $message The message or the message pre-wrapped in an envelope
|
||||
* @param StampInterface[] $stamps
|
||||
*/
|
||||
public function dispatch($message): Envelope;
|
||||
public function dispatch($message, array $stamps = []): Envelope;
|
||||
}
|
||||
|
@ -37,7 +37,7 @@ class RoutableMessageBus implements MessageBusInterface
|
||||
$this->busLocator = $busLocator;
|
||||
}
|
||||
|
||||
public function dispatch($envelope): Envelope
|
||||
public function dispatch($envelope, array $stamps = []): Envelope
|
||||
{
|
||||
if (!$envelope instanceof Envelope) {
|
||||
throw new InvalidArgumentException('Messages passed to RoutableMessageBus::dispatch() must be inside an Envelope');
|
||||
@ -53,6 +53,6 @@ class RoutableMessageBus implements MessageBusInterface
|
||||
throw new InvalidArgumentException(sprintf('Invalid bus name "%s" on BusNameStamp.', $busNameStamp->getBusName()));
|
||||
}
|
||||
|
||||
return $this->busLocator->get($busNameStamp->getBusName())->dispatch($envelope);
|
||||
return $this->busLocator->get($busNameStamp->getBusName())->dispatch($envelope, $stamps);
|
||||
}
|
||||
}
|
||||
|
@ -26,7 +26,7 @@ class EnvelopeTest extends TestCase
|
||||
public function testConstruct()
|
||||
{
|
||||
$receivedStamp = new ReceivedStamp();
|
||||
$envelope = new Envelope($dummy = new DummyMessage('dummy'), $receivedStamp);
|
||||
$envelope = new Envelope($dummy = new DummyMessage('dummy'), [$receivedStamp]);
|
||||
|
||||
$this->assertSame($dummy, $envelope->getMessage());
|
||||
$this->assertArrayHasKey(ReceivedStamp::class, $stamps = $envelope->all());
|
||||
@ -42,7 +42,7 @@ class EnvelopeTest extends TestCase
|
||||
|
||||
public function testWithoutAll()
|
||||
{
|
||||
$envelope = new Envelope(new DummyMessage('dummy'), new ReceivedStamp(), new ReceivedStamp(), new DelayStamp(5000));
|
||||
$envelope = new Envelope(new DummyMessage('dummy'), [new ReceivedStamp(), new ReceivedStamp(), new DelayStamp(5000)]);
|
||||
|
||||
$envelope = $envelope->withoutAll(ReceivedStamp::class);
|
||||
|
||||
@ -53,7 +53,7 @@ class EnvelopeTest extends TestCase
|
||||
public function testLast()
|
||||
{
|
||||
$receivedStamp = new ReceivedStamp();
|
||||
$envelope = new Envelope($dummy = new DummyMessage('dummy'), $receivedStamp);
|
||||
$envelope = new Envelope($dummy = new DummyMessage('dummy'), [$receivedStamp]);
|
||||
|
||||
$this->assertSame($receivedStamp, $envelope->last(ReceivedStamp::class));
|
||||
$this->assertNull($envelope->last(ValidationStamp::class));
|
||||
@ -72,4 +72,23 @@ class EnvelopeTest extends TestCase
|
||||
$this->assertArrayHasKey(ValidationStamp::class, $stamps);
|
||||
$this->assertSame($validationStamp, $stamps[ValidationStamp::class][0]);
|
||||
}
|
||||
|
||||
public function testWrapWithMessage()
|
||||
{
|
||||
$message = new \stdClass();
|
||||
$stamp = new ReceivedStamp();
|
||||
$envelope = Envelope::wrap($message, [$stamp]);
|
||||
|
||||
$this->assertSame($message, $envelope->getMessage());
|
||||
$this->assertSame([ReceivedStamp::class => [$stamp]], $envelope->all());
|
||||
}
|
||||
|
||||
public function testWrapWithEnvelope()
|
||||
{
|
||||
$envelope = new Envelope(new \stdClass(), [new DelayStamp(5)]);
|
||||
$envelope = Envelope::wrap($envelope, [new ReceivedStamp()]);
|
||||
|
||||
$this->assertCount(1, $envelope->all(DelayStamp::class));
|
||||
$this->assertCount(1, $envelope->all(ReceivedStamp::class));
|
||||
}
|
||||
}
|
||||
|
@ -31,7 +31,7 @@ class HandleTraitTest extends TestCase
|
||||
|
||||
$query = new DummyMessage('Hello');
|
||||
$bus->expects($this->once())->method('dispatch')->willReturn(
|
||||
new Envelope($query, new HandledStamp('result', 'DummyHandler::__invoke'))
|
||||
new Envelope($query, [new HandledStamp('result', 'DummyHandler::__invoke')])
|
||||
);
|
||||
|
||||
$this->assertSame('result', $queryBus->query($query));
|
||||
@ -42,7 +42,7 @@ class HandleTraitTest extends TestCase
|
||||
$bus = $this->createMock(MessageBus::class);
|
||||
$queryBus = new TestQueryBus($bus);
|
||||
|
||||
$envelope = new Envelope(new DummyMessage('Hello'), new HandledStamp('result', 'DummyHandler::__invoke'));
|
||||
$envelope = new Envelope(new DummyMessage('Hello'), [new HandledStamp('result', 'DummyHandler::__invoke')]);
|
||||
$bus->expects($this->once())->method('dispatch')->willReturn($envelope);
|
||||
|
||||
$this->assertSame('result', $queryBus->query($envelope));
|
||||
@ -74,7 +74,7 @@ class HandleTraitTest extends TestCase
|
||||
|
||||
$query = new DummyMessage('Hello');
|
||||
$bus->expects($this->once())->method('dispatch')->willReturn(
|
||||
new Envelope($query, new HandledStamp('first_result', 'FirstDummyHandler::__invoke'), new HandledStamp('second_result', 'SecondDummyHandler::__invoke', 'dummy_2'))
|
||||
new Envelope($query, [new HandledStamp('first_result', 'FirstDummyHandler::__invoke'), new HandledStamp('second_result', 'SecondDummyHandler::__invoke', 'dummy_2')])
|
||||
);
|
||||
|
||||
$queryBus->query($query);
|
||||
|
@ -16,6 +16,8 @@ use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\MessageBus;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
|
||||
use Symfony\Component\Messenger\Stamp\BusNameStamp;
|
||||
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\AnEnvelopeStamp;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||
@ -69,7 +71,7 @@ class MessageBusTest extends TestCase
|
||||
public function testThatAMiddlewareCanAddSomeStampsToTheEnvelope()
|
||||
{
|
||||
$message = new DummyMessage('Hello');
|
||||
$envelope = new Envelope($message, new ReceivedStamp());
|
||||
$envelope = new Envelope($message, [new ReceivedStamp()]);
|
||||
$envelopeWithAnotherStamp = $envelope->with(new AnEnvelopeStamp());
|
||||
|
||||
$firstMiddleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
|
||||
@ -107,10 +109,10 @@ class MessageBusTest extends TestCase
|
||||
public function testThatAMiddlewareCanUpdateTheMessageWhileKeepingTheEnvelopeStamps()
|
||||
{
|
||||
$message = new DummyMessage('Hello');
|
||||
$envelope = new Envelope($message, ...$stamps = [new ReceivedStamp()]);
|
||||
$envelope = new Envelope($message, $stamps = [new ReceivedStamp()]);
|
||||
|
||||
$changedMessage = new DummyMessage('Changed');
|
||||
$expectedEnvelope = new Envelope($changedMessage, ...$stamps);
|
||||
$expectedEnvelope = new Envelope($changedMessage, $stamps);
|
||||
|
||||
$firstMiddleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
|
||||
$firstMiddleware->expects($this->once())
|
||||
@ -134,4 +136,16 @@ class MessageBusTest extends TestCase
|
||||
|
||||
$bus->dispatch($envelope);
|
||||
}
|
||||
|
||||
public function testItAddsTheStamps()
|
||||
{
|
||||
$finalEnvelope = (new MessageBus())->dispatch(new \stdClass(), [new DelayStamp(5), new BusNameStamp('bar')]);
|
||||
$this->assertCount(2, $finalEnvelope->all());
|
||||
}
|
||||
|
||||
public function testItAddsTheStampsToEnvelope()
|
||||
{
|
||||
$finalEnvelope = (new MessageBus())->dispatch(new Envelope(new \stdClass()), [new DelayStamp(5), new BusNameStamp('bar')]);
|
||||
$this->assertCount(2, $finalEnvelope->all());
|
||||
}
|
||||
}
|
||||
|
@ -44,8 +44,8 @@ class DispatchAfterCurrentBusMiddlewareTest extends TestCase
|
||||
$messageBus = new MessageBus([
|
||||
$middleware,
|
||||
new DispatchingMiddleware($eventBus, [
|
||||
new Envelope($firstEvent, new DispatchAfterCurrentBusStamp()),
|
||||
new Envelope($secondEvent, new DispatchAfterCurrentBusStamp()),
|
||||
new Envelope($firstEvent, [new DispatchAfterCurrentBusStamp()]),
|
||||
new Envelope($secondEvent, [new DispatchAfterCurrentBusStamp()]),
|
||||
$thirdEvent, // Not in a new transaction
|
||||
]),
|
||||
$handlingMiddleware,
|
||||
@ -80,8 +80,8 @@ class DispatchAfterCurrentBusMiddlewareTest extends TestCase
|
||||
$messageBus = new MessageBus([
|
||||
$middleware,
|
||||
new DispatchingMiddleware($eventBus, [
|
||||
new Envelope($firstEvent, new DispatchAfterCurrentBusStamp()),
|
||||
new Envelope($secondEvent, new DispatchAfterCurrentBusStamp()),
|
||||
new Envelope($firstEvent, [new DispatchAfterCurrentBusStamp()]),
|
||||
new Envelope($secondEvent, [new DispatchAfterCurrentBusStamp()]),
|
||||
]),
|
||||
$handlingMiddleware,
|
||||
]);
|
||||
|
@ -86,7 +86,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
|
||||
public function testItSendsToOnlyOneSenderOnRedelivery()
|
||||
{
|
||||
$envelope = new Envelope(new DummyMessage('Hey'), new RedeliveryStamp(5, 'bar'));
|
||||
$envelope = new Envelope(new DummyMessage('Hey'), [new RedeliveryStamp(5, 'bar')]);
|
||||
// even with a ForceCallHandlersStamp, the next middleware won't be called
|
||||
$envelope = $envelope->with(new ForceCallHandlersStamp());
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
|
@ -21,7 +21,7 @@ class MultiplierRetryStrategyTest extends TestCase
|
||||
public function testIsRetryable()
|
||||
{
|
||||
$strategy = new MultiplierRetryStrategy(3);
|
||||
$envelope = new Envelope(new \stdClass(), new RedeliveryStamp(0, 'sender_alias'));
|
||||
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0, 'sender_alias')]);
|
||||
|
||||
$this->assertTrue($strategy->isRetryable($envelope));
|
||||
}
|
||||
@ -29,7 +29,7 @@ class MultiplierRetryStrategyTest extends TestCase
|
||||
public function testIsNotRetryable()
|
||||
{
|
||||
$strategy = new MultiplierRetryStrategy(3);
|
||||
$envelope = new Envelope(new \stdClass(), new RedeliveryStamp(3, 'sender_alias'));
|
||||
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(3, 'sender_alias')]);
|
||||
|
||||
$this->assertFalse($strategy->isRetryable($envelope));
|
||||
}
|
||||
@ -48,7 +48,7 @@ class MultiplierRetryStrategyTest extends TestCase
|
||||
public function testGetWaitTime(int $delay, int $multiplier, int $maxDelay, int $previousRetries, int $expectedDelay)
|
||||
{
|
||||
$strategy = new MultiplierRetryStrategy(10, $delay, $multiplier, $maxDelay);
|
||||
$envelope = new Envelope(new \stdClass(), new RedeliveryStamp($previousRetries, 'sender_alias'));
|
||||
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp($previousRetries, 'sender_alias')]);
|
||||
|
||||
$this->assertSame($expectedDelay, $strategy->getWaitingTime($envelope));
|
||||
}
|
||||
|
@ -18,12 +18,13 @@ use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Symfony\Component\Messenger\RoutableMessageBus;
|
||||
use Symfony\Component\Messenger\Stamp\BusNameStamp;
|
||||
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||
|
||||
class RoutableMessageBusTest extends TestCase
|
||||
{
|
||||
public function testItRoutesToTheCorrectBus()
|
||||
{
|
||||
$envelope = new Envelope(new \stdClass(), new BusNameStamp('foo_bus'));
|
||||
$envelope = new Envelope(new \stdClass(), [new BusNameStamp('foo_bus')]);
|
||||
|
||||
$bus1 = $this->createMock(MessageBusInterface::class);
|
||||
$bus2 = $this->createMock(MessageBusInterface::class);
|
||||
@ -32,11 +33,12 @@ class RoutableMessageBusTest extends TestCase
|
||||
$container->expects($this->once())->method('has')->with('foo_bus')->willReturn(true);
|
||||
$container->expects($this->once())->method('get')->will($this->returnValue($bus2));
|
||||
|
||||
$stamp = new DelayStamp(5);
|
||||
$bus1->expects($this->never())->method('dispatch');
|
||||
$bus2->expects($this->once())->method('dispatch')->with($envelope)->willReturn($envelope);
|
||||
$bus2->expects($this->once())->method('dispatch')->with($envelope, [$stamp])->willReturn($envelope);
|
||||
|
||||
$routableBus = new RoutableMessageBus($container);
|
||||
$this->assertSame($envelope, $routableBus->dispatch($envelope));
|
||||
$this->assertSame($envelope, $routableBus->dispatch($envelope, [$stamp]));
|
||||
}
|
||||
|
||||
public function testItExceptionOnMissingStamp()
|
||||
@ -58,7 +60,7 @@ class RoutableMessageBusTest extends TestCase
|
||||
$this->expectException(InvalidArgumentException::class);
|
||||
$this->expectExceptionMessage('Invalid bus name');
|
||||
|
||||
$envelope = new Envelope(new \stdClass(), new BusNameStamp('foo_bus'));
|
||||
$envelope = new Envelope(new \stdClass(), [new BusNameStamp('foo_bus')]);
|
||||
|
||||
$container = $this->createMock(ContainerInterface::class);
|
||||
$container->expects($this->once())->method('has')->willReturn(false);
|
||||
|
@ -14,6 +14,7 @@ namespace Symfony\Component\Messenger\Tests;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\AnEnvelopeStamp;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||
use Symfony\Component\Messenger\TraceableMessageBus;
|
||||
@ -24,22 +25,25 @@ class TraceableMessageBusTest extends TestCase
|
||||
{
|
||||
$message = new DummyMessage('Hello');
|
||||
|
||||
$stamp = new DelayStamp(5);
|
||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||
$bus->expects($this->once())->method('dispatch')->with($message)->willReturn(new Envelope($message));
|
||||
$bus->expects($this->once())->method('dispatch')->with($message, [$stamp])->willReturn(new Envelope($message));
|
||||
|
||||
$traceableBus = new TraceableMessageBus($bus);
|
||||
$line = __LINE__ + 1;
|
||||
$traceableBus->dispatch($message);
|
||||
$traceableBus->dispatch($message, [$stamp]);
|
||||
$this->assertCount(1, $tracedMessages = $traceableBus->getDispatchedMessages());
|
||||
$this->assertArraySubset([
|
||||
$actualTracedMessage = $tracedMessages[0];
|
||||
unset($actualTracedMessage['callTime']); // don't check, too variable
|
||||
$this->assertEquals([
|
||||
'message' => $message,
|
||||
'stamps' => [],
|
||||
'stamps' => [[$stamp]],
|
||||
'caller' => [
|
||||
'name' => 'TraceableMessageBusTest.php',
|
||||
'file' => __FILE__,
|
||||
'line' => $line,
|
||||
],
|
||||
], $tracedMessages[0], true);
|
||||
], $actualTracedMessage);
|
||||
}
|
||||
|
||||
public function testItTracesDispatchWithEnvelope()
|
||||
@ -54,7 +58,9 @@ class TraceableMessageBusTest extends TestCase
|
||||
$line = __LINE__ + 1;
|
||||
$traceableBus->dispatch($envelope);
|
||||
$this->assertCount(1, $tracedMessages = $traceableBus->getDispatchedMessages());
|
||||
$this->assertArraySubset([
|
||||
$actualTracedMessage = $tracedMessages[0];
|
||||
unset($actualTracedMessage['callTime']); // don't check, too variable
|
||||
$this->assertEquals([
|
||||
'message' => $message,
|
||||
'stamps' => [[$stamp]],
|
||||
'caller' => [
|
||||
@ -62,7 +68,7 @@ class TraceableMessageBusTest extends TestCase
|
||||
'file' => __FILE__,
|
||||
'line' => $line,
|
||||
],
|
||||
], $tracedMessages[0], true);
|
||||
], $actualTracedMessage);
|
||||
}
|
||||
|
||||
public function testItTracesExceptions()
|
||||
@ -82,7 +88,9 @@ class TraceableMessageBusTest extends TestCase
|
||||
}
|
||||
|
||||
$this->assertCount(1, $tracedMessages = $traceableBus->getDispatchedMessages());
|
||||
$this->assertArraySubset([
|
||||
$actualTracedMessage = $tracedMessages[0];
|
||||
unset($actualTracedMessage['callTime']); // don't check, too variable
|
||||
$this->assertEquals([
|
||||
'message' => $message,
|
||||
'exception' => $exception,
|
||||
'stamps' => [],
|
||||
@ -91,6 +99,6 @@ class TraceableMessageBusTest extends TestCase
|
||||
'file' => __FILE__,
|
||||
'line' => $line,
|
||||
],
|
||||
], $tracedMessages[0], true);
|
||||
], $actualTracedMessage);
|
||||
}
|
||||
}
|
||||
|
@ -56,7 +56,7 @@ class AmqpReceiverTest extends TestCase
|
||||
$connection->method('ack')->with($amqpEnvelope)->willThrowException(new \AMQPException());
|
||||
|
||||
$receiver = new AmqpReceiver($connection, $serializer);
|
||||
$receiver->ack(new Envelope(new \stdClass(), new AmqpReceivedStamp($amqpEnvelope)));
|
||||
$receiver->ack(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope)]));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -71,7 +71,7 @@ class AmqpReceiverTest extends TestCase
|
||||
$connection->method('nack')->with($amqpEnvelope, AMQP_NOPARAM)->willThrowException(new \AMQPException());
|
||||
|
||||
$receiver = new AmqpReceiver($connection, $serializer);
|
||||
$receiver->reject(new Envelope(new \stdClass(), new AmqpReceivedStamp($amqpEnvelope)));
|
||||
$receiver->reject(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope)]));
|
||||
}
|
||||
|
||||
private function createAMQPEnvelope()
|
||||
|
@ -12,13 +12,10 @@ if (!file_exists($autoload)) {
|
||||
|
||||
require_once $autoload;
|
||||
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||
use Symfony\Component\Messenger\Worker;
|
||||
use Symfony\Component\Serializer as SerializerComponent;
|
||||
use Symfony\Component\Serializer\Encoder\JsonEncoder;
|
||||
use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer;
|
||||
@ -33,7 +30,7 @@ $receiver = new AmqpReceiver($connection, $serializer);
|
||||
$retryStrategy = new MultiplierRetryStrategy(3, 0);
|
||||
|
||||
$worker = new Worker(['the_receiver' => $receiver], new class() implements MessageBusInterface {
|
||||
public function dispatch($envelope): Envelope
|
||||
public function dispatch($envelope, array $stamps = []): Envelope
|
||||
{
|
||||
echo 'Get envelope with message: '.\get_class($envelope->getMessage())."\n";
|
||||
echo sprintf("with stamps: %s\n", json_encode(array_keys($envelope->all()), JSON_PRETTY_PRINT));
|
||||
|
@ -43,8 +43,8 @@ class WorkerTest extends TestCase
|
||||
|
||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||
|
||||
$bus->expects($this->at(0))->method('dispatch')->with($envelope = new Envelope($apiMessage, new ReceivedStamp()))->willReturn($envelope);
|
||||
$bus->expects($this->at(1))->method('dispatch')->with($envelope = new Envelope($ipaMessage, new ReceivedStamp()))->willReturn($envelope);
|
||||
$bus->expects($this->at(0))->method('dispatch')->with($envelope = new Envelope($apiMessage, [new ReceivedStamp()]))->willReturn($envelope);
|
||||
$bus->expects($this->at(1))->method('dispatch')->with($envelope = new Envelope($ipaMessage, [new ReceivedStamp()]))->willReturn($envelope);
|
||||
|
||||
$worker = new Worker([$receiver], $bus);
|
||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||
@ -78,7 +78,7 @@ class WorkerTest extends TestCase
|
||||
public function testDispatchCausesRetry()
|
||||
{
|
||||
$receiver = new DummyReceiver([
|
||||
[new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias'))],
|
||||
[new Envelope(new DummyMessage('Hello'), [new SentStamp('Some\Sender', 'sender_alias')])],
|
||||
]);
|
||||
|
||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||
@ -117,7 +117,7 @@ class WorkerTest extends TestCase
|
||||
public function testDispatchCausesRejectWhenNoRetry()
|
||||
{
|
||||
$receiver = new DummyReceiver([
|
||||
[new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias'))],
|
||||
[new Envelope(new DummyMessage('Hello'), [new SentStamp('Some\Sender', 'sender_alias')])],
|
||||
]);
|
||||
|
||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||
|
@ -29,9 +29,9 @@ class TraceableMessageBus implements MessageBusInterface
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function dispatch($message): Envelope
|
||||
public function dispatch($message, array $stamps = []): Envelope
|
||||
{
|
||||
$envelope = $message instanceof Envelope ? $message : new Envelope($message);
|
||||
$envelope = Envelope::wrap($message, $stamps);
|
||||
$context = [
|
||||
'stamps' => array_values($envelope->all()),
|
||||
'message' => $envelope->getMessage(),
|
||||
@ -40,7 +40,7 @@ class TraceableMessageBus implements MessageBusInterface
|
||||
];
|
||||
|
||||
try {
|
||||
return $this->decoratedBus->dispatch($message);
|
||||
return $this->decoratedBus->dispatch($message, $stamps);
|
||||
} catch (\Throwable $e) {
|
||||
$context['exception'] = $e;
|
||||
|
||||
|
@ -83,7 +83,7 @@ class Serializer implements SerializerInterface
|
||||
throw new MessageDecodingFailedException(sprintf('Could not decode message: %s.', $e->getMessage()), $e->getCode(), $e);
|
||||
}
|
||||
|
||||
return new Envelope($message, ...$stamps);
|
||||
return new Envelope($message, $stamps);
|
||||
}
|
||||
|
||||
/**
|
||||
|
Reference in New Issue
Block a user