Simplifying SyncTransport and fixing bug with handlers transport

This commit is contained in:
Ryan Weaver 2019-05-06 20:56:51 -04:00
parent 0462e1124c
commit 8a49eb8660
9 changed files with 46 additions and 62 deletions

View File

@ -73,6 +73,7 @@
<service id="messenger.transport.sync.factory" class="Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory"> <service id="messenger.transport.sync.factory" class="Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory">
<tag name="messenger.transport_factory" /> <tag name="messenger.transport_factory" />
<argument type="service" id="messenger.routable_message_bus" />
</service> </service>
<service id="messenger.transport.in_memory.factory" class="Symfony\Component\Messenger\Transport\InMemoryTransportFactory"> <service id="messenger.transport.in_memory.factory" class="Symfony\Component\Messenger\Transport\InMemoryTransportFactory">

View File

@ -24,8 +24,7 @@ CHANGELOG
to stop all `messenger:consume` workers. to stop all `messenger:consume` workers.
* [BC BREAK] The `TransportFactoryInterface::createTransport()` signature * [BC BREAK] The `TransportFactoryInterface::createTransport()` signature
changed: a required 3rd `SerializerInterface` argument was added. changed: a required 3rd `SerializerInterface` argument was added.
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to * Added a new `SyncTransport` to explicitly handle messages synchronously.
explicitly handle messages synchronously.
* Added `AmqpStamp` allowing to provide a routing key, flags and attributes on message publishing. * Added `AmqpStamp` allowing to provide a routing key, flags and attributes on message publishing.
* [BC BREAK] Removed publishing with a `routing_key` option from queue configuration, for * [BC BREAK] Removed publishing with a `routing_key` option from queue configuration, for
AMQP. Use exchange `default_publish_routing_key` or `AmqpStamp` instead. AMQP. Use exchange `default_publish_routing_key` or `AmqpStamp` instead.

View File

@ -15,7 +15,6 @@ use Psr\Log\LoggerAwareTrait;
use Psr\Log\NullLogger; use Psr\Log\NullLogger;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent; use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp; use Symfony\Component\Messenger\Stamp\SentStamp;
@ -78,12 +77,6 @@ class SendMessageMiddleware implements MiddlewareInterface
$envelope = $sender->send($envelope->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null))); $envelope = $sender->send($envelope->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null)));
} }
// if the message was marked (usually by SyncTransport) that it handlers
// MUST be called, mark them to be handled.
if (null !== $envelope->last(ForceCallHandlersStamp::class)) {
$handle = true;
}
// on a redelivery, only send back to queue: never call local handlers // on a redelivery, only send back to queue: never call local handlers
if (null !== $redeliveryStamp) { if (null !== $redeliveryStamp) {
$handle = false; $handle = false;

View File

@ -1,27 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Stamp;
/**
* Stamp marks that the handlers *should* be called immediately.
*
* This is used by the SyncTransport to indicate to the
* SendMessageMiddleware that handlers *should* be called
* immediately, even though a transport was set.
*
* @experimental in 4.3
*
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
class ForceCallHandlersStamp implements StampInterface
{
}

View File

@ -15,7 +15,6 @@ use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent; use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware; use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp; use Symfony\Component\Messenger\Stamp\SentStamp;
@ -88,8 +87,6 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
public function testItSendsToOnlyOneSenderOnRedelivery() public function testItSendsToOnlyOneSenderOnRedelivery()
{ {
$envelope = new Envelope(new DummyMessage('Hey'), [new RedeliveryStamp(5, 'bar')]); $envelope = new Envelope(new DummyMessage('Hey'), [new RedeliveryStamp(5, 'bar')]);
// even with a ForceCallHandlersStamp, the next middleware won't be called
$envelope = $envelope->with(new ForceCallHandlersStamp());
$sender = $this->getMockBuilder(SenderInterface::class)->getMock(); $sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock(); $sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
@ -270,21 +267,6 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$middleware->handle($envelope, $this->getStackMock(false)); $middleware->handle($envelope, $this->getStackMock(false));
} }
public function testItHandlesWithForceCallHandlersStamp()
{
$envelope = new Envelope(new DummyMessage('original envelope'));
$envelope = $envelope->with(new ForceCallHandlersStamp());
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$sender->expects($this->once())->method('send')->willReturn($envelope);
$sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo']], ['foo' => $sender]);
$middleware = new SendMessageMiddleware($sendersLocator);
// next handler *should* be called
$middleware->handle($envelope, $this->getStackMock(true));
}
private function createSendersLocator(array $sendersMap, array $senders, array $sendAndHandle = []) private function createSendersLocator(array $sendersMap, array $senders, array $sendAndHandle = [])
{ {
$container = $this->createMock(ContainerInterface::class); $container = $this->createMock(ContainerInterface::class);

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt; namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\Sync\SyncTransport; use Symfony\Component\Messenger\Transport\Sync\SyncTransport;
use Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory; use Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory;
@ -21,7 +22,8 @@ class SyncTransportFactoryTest extends TestCase
public function testCreateTransport() public function testCreateTransport()
{ {
$serializer = $this->createMock(SerializerInterface::class); $serializer = $this->createMock(SerializerInterface::class);
$factory = new SyncTransportFactory(); $bus = $this->createMock(MessageBusInterface::class);
$factory = new SyncTransportFactory($bus);
$transport = $factory->createTransport('sync://', [], $serializer); $transport = $factory->createTransport('sync://', [], $serializer);
$this->assertInstanceOf(SyncTransport::class, $transport); $this->assertInstanceOf(SyncTransport::class, $transport);
} }

View File

@ -13,18 +13,29 @@ namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp; use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Transport\Sync\SyncTransport; use Symfony\Component\Messenger\Transport\Sync\SyncTransport;
class SyncTransportTest extends TestCase class SyncTransportTest extends TestCase
{ {
public function testSend() public function testSend()
{ {
$bus = $this->createMock(MessageBusInterface::class);
$bus->expects($this->once())
->method('dispatch')
->with($this->callback(function ($arg) {
$this->assertInstanceOf(Envelope::class, $arg);
return true;
}))
->willReturnArgument(0);
$message = new \stdClass(); $message = new \stdClass();
$envelope = new Envelope($message); $envelope = new Envelope($message);
$transport = new SyncTransport(); $transport = new SyncTransport($bus);
$envelope = $transport->send($envelope); $envelope = $transport->send($envelope);
$this->assertSame($message, $envelope->getMessage()); $this->assertSame($message, $envelope->getMessage());
$this->assertNotNull($envelope->last(ForceCallHandlersStamp::class)); $this->assertNotNull($envelope->last(ReceivedStamp::class));
} }
} }

View File

@ -13,11 +13,13 @@ namespace Symfony\Component\Messenger\Transport\Sync;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\InvalidArgumentException; use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp; use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Transport\TransportInterface; use Symfony\Component\Messenger\Transport\TransportInterface;
/** /**
* A "fake" transport that marks messages to be handled immediately. * Transport that immediately marks messages as received and dispatches for handling.
* *
* @experimental in 4.3 * @experimental in 4.3
* *
@ -25,6 +27,13 @@ use Symfony\Component\Messenger\Transport\TransportInterface;
*/ */
class SyncTransport implements TransportInterface class SyncTransport implements TransportInterface
{ {
private $messageBus;
public function __construct(MessageBusInterface $messageBus)
{
$this->messageBus = $messageBus;
}
public function get(): iterable public function get(): iterable
{ {
throw new InvalidArgumentException('You cannot receive messages from the Messenger SyncTransport.'); throw new InvalidArgumentException('You cannot receive messages from the Messenger SyncTransport.');
@ -47,6 +56,12 @@ class SyncTransport implements TransportInterface
public function send(Envelope $envelope): Envelope public function send(Envelope $envelope): Envelope
{ {
return $envelope->with(new ForceCallHandlersStamp()); /** @var SentStamp|null $sentStamp */
$sentStamp = $envelope->last(SentStamp::class);
$alias = null === $sentStamp ? 'sync' : $sentStamp->getSenderAlias() ?: $sentStamp->getSenderClass();
$envelope = $envelope->with(new ReceivedStamp($alias));
return $this->messageBus->dispatch($envelope);
} }
} }

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Transport\Sync; namespace Symfony\Component\Messenger\Transport\Sync;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface; use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface; use Symfony\Component\Messenger\Transport\TransportInterface;
@ -22,9 +23,16 @@ use Symfony\Component\Messenger\Transport\TransportInterface;
*/ */
class SyncTransportFactory implements TransportFactoryInterface class SyncTransportFactory implements TransportFactoryInterface
{ {
private $messageBus;
public function __construct(MessageBusInterface $messageBus)
{
$this->messageBus = $messageBus;
}
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
{ {
return new SyncTransport(); return new SyncTransport($this->messageBus);
} }
public function supports(string $dsn, array $options): bool public function supports(string $dsn, array $options): bool