minor #31401 [Messenger] Simplifying SyncTransport and fixing bug with handlers transport (weaverryan)
This PR was merged into the 4.3 branch.
Discussion
----------
[Messenger] Simplifying SyncTransport and fixing bug with handlers transport
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | yes
| New feature? | no
| BC breaks? | no
| Deprecations? | no
| Tests pass? | yes
| Fixed tickets | none
| License | MIT
| Doc PR | not needed
This is still a WIP, because it's not quite working and tests are a TODO. However, the basic idea is there. This makes SyncTransport less "weird". It acts more like a real transport... except that it "receives" and re-dispatches its message immediately.
The bug I'm trying to fix is related to the transport-based handling config that @sroze introduced. It doesn't currently play nice with the sync transport due to the unnatural way that I made it originally.
Cheers!
Commits
-------
8a49eb8660
Simplifying SyncTransport and fixing bug with handlers transport
This commit is contained in:
commit
04f117f8e2
@ -73,6 +73,7 @@
|
|||||||
|
|
||||||
<service id="messenger.transport.sync.factory" class="Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory">
|
<service id="messenger.transport.sync.factory" class="Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory">
|
||||||
<tag name="messenger.transport_factory" />
|
<tag name="messenger.transport_factory" />
|
||||||
|
<argument type="service" id="messenger.routable_message_bus" />
|
||||||
</service>
|
</service>
|
||||||
|
|
||||||
<service id="messenger.transport.in_memory.factory" class="Symfony\Component\Messenger\Transport\InMemoryTransportFactory">
|
<service id="messenger.transport.in_memory.factory" class="Symfony\Component\Messenger\Transport\InMemoryTransportFactory">
|
||||||
|
@ -24,8 +24,7 @@ CHANGELOG
|
|||||||
to stop all `messenger:consume` workers.
|
to stop all `messenger:consume` workers.
|
||||||
* [BC BREAK] The `TransportFactoryInterface::createTransport()` signature
|
* [BC BREAK] The `TransportFactoryInterface::createTransport()` signature
|
||||||
changed: a required 3rd `SerializerInterface` argument was added.
|
changed: a required 3rd `SerializerInterface` argument was added.
|
||||||
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
|
* Added a new `SyncTransport` to explicitly handle messages synchronously.
|
||||||
explicitly handle messages synchronously.
|
|
||||||
* Added `AmqpStamp` allowing to provide a routing key, flags and attributes on message publishing.
|
* Added `AmqpStamp` allowing to provide a routing key, flags and attributes on message publishing.
|
||||||
* [BC BREAK] Removed publishing with a `routing_key` option from queue configuration, for
|
* [BC BREAK] Removed publishing with a `routing_key` option from queue configuration, for
|
||||||
AMQP. Use exchange `default_publish_routing_key` or `AmqpStamp` instead.
|
AMQP. Use exchange `default_publish_routing_key` or `AmqpStamp` instead.
|
||||||
|
@ -15,7 +15,6 @@ use Psr\Log\LoggerAwareTrait;
|
|||||||
use Psr\Log\NullLogger;
|
use Psr\Log\NullLogger;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
|
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
|
||||||
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
|
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\SentStamp;
|
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||||
@ -78,12 +77,6 @@ class SendMessageMiddleware implements MiddlewareInterface
|
|||||||
$envelope = $sender->send($envelope->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null)));
|
$envelope = $sender->send($envelope->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null)));
|
||||||
}
|
}
|
||||||
|
|
||||||
// if the message was marked (usually by SyncTransport) that it handlers
|
|
||||||
// MUST be called, mark them to be handled.
|
|
||||||
if (null !== $envelope->last(ForceCallHandlersStamp::class)) {
|
|
||||||
$handle = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// on a redelivery, only send back to queue: never call local handlers
|
// on a redelivery, only send back to queue: never call local handlers
|
||||||
if (null !== $redeliveryStamp) {
|
if (null !== $redeliveryStamp) {
|
||||||
$handle = false;
|
$handle = false;
|
||||||
|
@ -1,27 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This file is part of the Symfony package.
|
|
||||||
*
|
|
||||||
* (c) Fabien Potencier <fabien@symfony.com>
|
|
||||||
*
|
|
||||||
* For the full copyright and license information, please view the LICENSE
|
|
||||||
* file that was distributed with this source code.
|
|
||||||
*/
|
|
||||||
|
|
||||||
namespace Symfony\Component\Messenger\Stamp;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Stamp marks that the handlers *should* be called immediately.
|
|
||||||
*
|
|
||||||
* This is used by the SyncTransport to indicate to the
|
|
||||||
* SendMessageMiddleware that handlers *should* be called
|
|
||||||
* immediately, even though a transport was set.
|
|
||||||
*
|
|
||||||
* @experimental in 4.3
|
|
||||||
*
|
|
||||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
|
||||||
*/
|
|
||||||
class ForceCallHandlersStamp implements StampInterface
|
|
||||||
{
|
|
||||||
}
|
|
@ -15,7 +15,6 @@ use Psr\Container\ContainerInterface;
|
|||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
|
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
|
||||||
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
|
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
|
||||||
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
|
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\SentStamp;
|
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||||
@ -88,8 +87,6 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
|||||||
public function testItSendsToOnlyOneSenderOnRedelivery()
|
public function testItSendsToOnlyOneSenderOnRedelivery()
|
||||||
{
|
{
|
||||||
$envelope = new Envelope(new DummyMessage('Hey'), [new RedeliveryStamp(5, 'bar')]);
|
$envelope = new Envelope(new DummyMessage('Hey'), [new RedeliveryStamp(5, 'bar')]);
|
||||||
// even with a ForceCallHandlersStamp, the next middleware won't be called
|
|
||||||
$envelope = $envelope->with(new ForceCallHandlersStamp());
|
|
||||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||||
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
|
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||||
|
|
||||||
@ -270,21 +267,6 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
|||||||
$middleware->handle($envelope, $this->getStackMock(false));
|
$middleware->handle($envelope, $this->getStackMock(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testItHandlesWithForceCallHandlersStamp()
|
|
||||||
{
|
|
||||||
$envelope = new Envelope(new DummyMessage('original envelope'));
|
|
||||||
$envelope = $envelope->with(new ForceCallHandlersStamp());
|
|
||||||
|
|
||||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
|
||||||
$sender->expects($this->once())->method('send')->willReturn($envelope);
|
|
||||||
|
|
||||||
$sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo']], ['foo' => $sender]);
|
|
||||||
$middleware = new SendMessageMiddleware($sendersLocator);
|
|
||||||
|
|
||||||
// next handler *should* be called
|
|
||||||
$middleware->handle($envelope, $this->getStackMock(true));
|
|
||||||
}
|
|
||||||
|
|
||||||
private function createSendersLocator(array $sendersMap, array $senders, array $sendAndHandle = [])
|
private function createSendersLocator(array $sendersMap, array $senders, array $sendAndHandle = [])
|
||||||
{
|
{
|
||||||
$container = $this->createMock(ContainerInterface::class);
|
$container = $this->createMock(ContainerInterface::class);
|
||||||
|
@ -12,6 +12,7 @@
|
|||||||
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Symfony\Component\Messenger\MessageBusInterface;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||||
use Symfony\Component\Messenger\Transport\Sync\SyncTransport;
|
use Symfony\Component\Messenger\Transport\Sync\SyncTransport;
|
||||||
use Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory;
|
use Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory;
|
||||||
@ -21,7 +22,8 @@ class SyncTransportFactoryTest extends TestCase
|
|||||||
public function testCreateTransport()
|
public function testCreateTransport()
|
||||||
{
|
{
|
||||||
$serializer = $this->createMock(SerializerInterface::class);
|
$serializer = $this->createMock(SerializerInterface::class);
|
||||||
$factory = new SyncTransportFactory();
|
$bus = $this->createMock(MessageBusInterface::class);
|
||||||
|
$factory = new SyncTransportFactory($bus);
|
||||||
$transport = $factory->createTransport('sync://', [], $serializer);
|
$transport = $factory->createTransport('sync://', [], $serializer);
|
||||||
$this->assertInstanceOf(SyncTransport::class, $transport);
|
$this->assertInstanceOf(SyncTransport::class, $transport);
|
||||||
}
|
}
|
||||||
|
@ -13,18 +13,29 @@ namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
|||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
|
use Symfony\Component\Messenger\MessageBusInterface;
|
||||||
|
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||||
use Symfony\Component\Messenger\Transport\Sync\SyncTransport;
|
use Symfony\Component\Messenger\Transport\Sync\SyncTransport;
|
||||||
|
|
||||||
class SyncTransportTest extends TestCase
|
class SyncTransportTest extends TestCase
|
||||||
{
|
{
|
||||||
public function testSend()
|
public function testSend()
|
||||||
{
|
{
|
||||||
|
$bus = $this->createMock(MessageBusInterface::class);
|
||||||
|
$bus->expects($this->once())
|
||||||
|
->method('dispatch')
|
||||||
|
->with($this->callback(function ($arg) {
|
||||||
|
$this->assertInstanceOf(Envelope::class, $arg);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}))
|
||||||
|
->willReturnArgument(0);
|
||||||
$message = new \stdClass();
|
$message = new \stdClass();
|
||||||
$envelope = new Envelope($message);
|
$envelope = new Envelope($message);
|
||||||
$transport = new SyncTransport();
|
$transport = new SyncTransport($bus);
|
||||||
$envelope = $transport->send($envelope);
|
$envelope = $transport->send($envelope);
|
||||||
|
|
||||||
$this->assertSame($message, $envelope->getMessage());
|
$this->assertSame($message, $envelope->getMessage());
|
||||||
$this->assertNotNull($envelope->last(ForceCallHandlersStamp::class));
|
$this->assertNotNull($envelope->last(ReceivedStamp::class));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -13,11 +13,13 @@ namespace Symfony\Component\Messenger\Transport\Sync;
|
|||||||
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||||
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
|
use Symfony\Component\Messenger\MessageBusInterface;
|
||||||
|
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||||
|
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A "fake" transport that marks messages to be handled immediately.
|
* Transport that immediately marks messages as received and dispatches for handling.
|
||||||
*
|
*
|
||||||
* @experimental in 4.3
|
* @experimental in 4.3
|
||||||
*
|
*
|
||||||
@ -25,6 +27,13 @@ use Symfony\Component\Messenger\Transport\TransportInterface;
|
|||||||
*/
|
*/
|
||||||
class SyncTransport implements TransportInterface
|
class SyncTransport implements TransportInterface
|
||||||
{
|
{
|
||||||
|
private $messageBus;
|
||||||
|
|
||||||
|
public function __construct(MessageBusInterface $messageBus)
|
||||||
|
{
|
||||||
|
$this->messageBus = $messageBus;
|
||||||
|
}
|
||||||
|
|
||||||
public function get(): iterable
|
public function get(): iterable
|
||||||
{
|
{
|
||||||
throw new InvalidArgumentException('You cannot receive messages from the Messenger SyncTransport.');
|
throw new InvalidArgumentException('You cannot receive messages from the Messenger SyncTransport.');
|
||||||
@ -47,6 +56,12 @@ class SyncTransport implements TransportInterface
|
|||||||
|
|
||||||
public function send(Envelope $envelope): Envelope
|
public function send(Envelope $envelope): Envelope
|
||||||
{
|
{
|
||||||
return $envelope->with(new ForceCallHandlersStamp());
|
/** @var SentStamp|null $sentStamp */
|
||||||
|
$sentStamp = $envelope->last(SentStamp::class);
|
||||||
|
$alias = null === $sentStamp ? 'sync' : $sentStamp->getSenderAlias() ?: $sentStamp->getSenderClass();
|
||||||
|
|
||||||
|
$envelope = $envelope->with(new ReceivedStamp($alias));
|
||||||
|
|
||||||
|
return $this->messageBus->dispatch($envelope);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -11,6 +11,7 @@
|
|||||||
|
|
||||||
namespace Symfony\Component\Messenger\Transport\Sync;
|
namespace Symfony\Component\Messenger\Transport\Sync;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\MessageBusInterface;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||||
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
|
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
|
||||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||||
@ -22,9 +23,16 @@ use Symfony\Component\Messenger\Transport\TransportInterface;
|
|||||||
*/
|
*/
|
||||||
class SyncTransportFactory implements TransportFactoryInterface
|
class SyncTransportFactory implements TransportFactoryInterface
|
||||||
{
|
{
|
||||||
|
private $messageBus;
|
||||||
|
|
||||||
|
public function __construct(MessageBusInterface $messageBus)
|
||||||
|
{
|
||||||
|
$this->messageBus = $messageBus;
|
||||||
|
}
|
||||||
|
|
||||||
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
|
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
|
||||||
{
|
{
|
||||||
return new SyncTransport();
|
return new SyncTransport($this->messageBus);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function supports(string $dsn, array $options): bool
|
public function supports(string $dsn, array $options): bool
|
||||||
|
Reference in New Issue
Block a user