Adding the "sync" transport to call handlers synchronously

This commit is contained in:
Ryan Weaver 2019-03-28 14:48:35 -04:00
parent 162d5a8272
commit 3da5a438aa
7 changed files with 144 additions and 2 deletions

View File

@ -67,6 +67,10 @@
<argument type="service" id="messenger.transport.serializer" />
</service>
<service id="messenger.transport.sync.factory" class="Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory">
<tag name="messenger.transport_factory" />
</service>
<!-- retry -->
<service id="messenger.retry_strategy_locator">
<tag name="container.service_locator" />

View File

@ -4,6 +4,8 @@ CHANGELOG
4.3.0
-----
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
explicitly handle messages asynchronously.
* Added optional parameter `prefetch_count` in connection configuration,
to setup channel prefetch count
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`

View File

@ -15,6 +15,7 @@ use Psr\Log\LoggerAwareTrait;
use Psr\Log\NullLogger;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
@ -81,7 +82,13 @@ class SendMessageMiddleware implements MiddlewareInterface
$envelope = $sender->send($envelope->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null)));
}
// on a redelivery, never call local handlers
// if the message was marked (usually by SyncTransport) that it handlers
// MUST be called, mark them to be handled.
if (null !== $envelope->last(ForceCallHandlersStamp::class)) {
$handle = true;
}
// on a redelivery, only send back to queue: never call local handlers
if (null !== $redeliveryStamp) {
$handle = false;
}

View File

@ -0,0 +1,27 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Stamp;
/**
* Stamp marks that the handlers *should* be called immediately.
*
* This is used by the SyncTransport to indicate to the
* SendMessageMiddleware that handlers *should* be called
* immediately, even though a transport was set.
*
* @experimental in 4.3
*
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
class ForceCallHandlersStamp implements StampInterface
{
}

View File

@ -14,6 +14,7 @@ namespace Symfony\Component\Messenger\Tests\Middleware;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
@ -86,6 +87,8 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
public function testItSendsToOnlyOneSenderOnRedelivery()
{
$envelope = new Envelope(new DummyMessage('Hey'), new RedeliveryStamp(5, 'bar'));
// even with a ForceCallHandlersStamp, the next middleware won't be called
$envelope = $envelope->with(new ForceCallHandlersStamp());
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
@ -237,7 +240,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$middleware->handle($envelope, $this->getStackMock());
}
public function testItDoesNotDispatchOnRetry()
public function testItDoesNotDispatchOnRedeliver()
{
$envelope = new Envelope(new DummyMessage('original envelope'));
$envelope = $envelope->with(new RedeliveryStamp(3, 'foo_sender'));
@ -251,4 +254,18 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$middleware->handle($envelope, $this->getStackMock(false));
}
public function testItHandlesWithForceCallHandlersStamp()
{
$envelope = new Envelope(new DummyMessage('original envelope'));
$envelope = $envelope->with(new ForceCallHandlersStamp());
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$sender->expects($this->once())->method('send')->willReturn($envelope);
$middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]));
// next handler *should* be called
$middleware->handle($envelope, $this->getStackMock(true));
}
}

View File

@ -0,0 +1,52 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\Sync;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
use Symfony\Component\Messenger\Transport\TransportInterface;
/**
* A "fake" transport that marks messages to be handled immediately.
*
* @experimental in 4.3
*
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
class SyncTransport implements TransportInterface
{
public function receive(callable $handler): void
{
throw new InvalidArgumentException('You cannot receive messages from the SyncTransport.');
}
public function stop(): void
{
throw new InvalidArgumentException('You cannot call stop() on the SyncTransport.');
}
public function ack(Envelope $envelope): void
{
throw new InvalidArgumentException('You cannot call ack() on the SyncTransport.');
}
public function reject(Envelope $envelope): void
{
throw new InvalidArgumentException('You cannot call reject() on the SyncTransport.');
}
public function send(Envelope $envelope): Envelope
{
return $envelope->with(new ForceCallHandlersStamp());
}
}

View File

@ -0,0 +1,33 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\Sync;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
/**
* @experimental in 4.3
*
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
class SyncTransportFactory implements TransportFactoryInterface
{
public function createTransport(string $dsn, array $options): TransportInterface
{
return new SyncTransport();
}
public function supports(string $dsn, array $options): bool
{
return 0 === strpos($dsn, 'sync://');
}
}