Adding the "sync" transport to call handlers synchronously
This commit is contained in:
parent
162d5a8272
commit
3da5a438aa
@ -67,6 +67,10 @@
|
||||
<argument type="service" id="messenger.transport.serializer" />
|
||||
</service>
|
||||
|
||||
<service id="messenger.transport.sync.factory" class="Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory">
|
||||
<tag name="messenger.transport_factory" />
|
||||
</service>
|
||||
|
||||
<!-- retry -->
|
||||
<service id="messenger.retry_strategy_locator">
|
||||
<tag name="container.service_locator" />
|
||||
|
@ -4,6 +4,8 @@ CHANGELOG
|
||||
4.3.0
|
||||
-----
|
||||
|
||||
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
|
||||
explicitly handle messages asynchronously.
|
||||
* Added optional parameter `prefetch_count` in connection configuration,
|
||||
to setup channel prefetch count
|
||||
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
|
||||
|
@ -15,6 +15,7 @@ use Psr\Log\LoggerAwareTrait;
|
||||
use Psr\Log\NullLogger;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
|
||||
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
|
||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||
@ -81,7 +82,13 @@ class SendMessageMiddleware implements MiddlewareInterface
|
||||
$envelope = $sender->send($envelope->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null)));
|
||||
}
|
||||
|
||||
// on a redelivery, never call local handlers
|
||||
// if the message was marked (usually by SyncTransport) that it handlers
|
||||
// MUST be called, mark them to be handled.
|
||||
if (null !== $envelope->last(ForceCallHandlersStamp::class)) {
|
||||
$handle = true;
|
||||
}
|
||||
|
||||
// on a redelivery, only send back to queue: never call local handlers
|
||||
if (null !== $redeliveryStamp) {
|
||||
$handle = false;
|
||||
}
|
||||
|
@ -0,0 +1,27 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Stamp;
|
||||
|
||||
/**
|
||||
* Stamp marks that the handlers *should* be called immediately.
|
||||
*
|
||||
* This is used by the SyncTransport to indicate to the
|
||||
* SendMessageMiddleware that handlers *should* be called
|
||||
* immediately, even though a transport was set.
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*/
|
||||
class ForceCallHandlersStamp implements StampInterface
|
||||
{
|
||||
}
|
@ -14,6 +14,7 @@ namespace Symfony\Component\Messenger\Tests\Middleware;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
|
||||
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
|
||||
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
|
||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||
@ -86,6 +87,8 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
public function testItSendsToOnlyOneSenderOnRedelivery()
|
||||
{
|
||||
$envelope = new Envelope(new DummyMessage('Hey'), new RedeliveryStamp(5, 'bar'));
|
||||
// even with a ForceCallHandlersStamp, the next middleware won't be called
|
||||
$envelope = $envelope->with(new ForceCallHandlersStamp());
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
|
||||
@ -237,7 +240,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
$middleware->handle($envelope, $this->getStackMock());
|
||||
}
|
||||
|
||||
public function testItDoesNotDispatchOnRetry()
|
||||
public function testItDoesNotDispatchOnRedeliver()
|
||||
{
|
||||
$envelope = new Envelope(new DummyMessage('original envelope'));
|
||||
$envelope = $envelope->with(new RedeliveryStamp(3, 'foo_sender'));
|
||||
@ -251,4 +254,18 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
|
||||
$middleware->handle($envelope, $this->getStackMock(false));
|
||||
}
|
||||
|
||||
public function testItHandlesWithForceCallHandlersStamp()
|
||||
{
|
||||
$envelope = new Envelope(new DummyMessage('original envelope'));
|
||||
$envelope = $envelope->with(new ForceCallHandlersStamp());
|
||||
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
$sender->expects($this->once())->method('send')->willReturn($envelope);
|
||||
|
||||
$middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]));
|
||||
|
||||
// next handler *should* be called
|
||||
$middleware->handle($envelope, $this->getStackMock(true));
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,52 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Transport\Sync;
|
||||
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
|
||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
|
||||
/**
|
||||
* A "fake" transport that marks messages to be handled immediately.
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*/
|
||||
class SyncTransport implements TransportInterface
|
||||
{
|
||||
public function receive(callable $handler): void
|
||||
{
|
||||
throw new InvalidArgumentException('You cannot receive messages from the SyncTransport.');
|
||||
}
|
||||
|
||||
public function stop(): void
|
||||
{
|
||||
throw new InvalidArgumentException('You cannot call stop() on the SyncTransport.');
|
||||
}
|
||||
|
||||
public function ack(Envelope $envelope): void
|
||||
{
|
||||
throw new InvalidArgumentException('You cannot call ack() on the SyncTransport.');
|
||||
}
|
||||
|
||||
public function reject(Envelope $envelope): void
|
||||
{
|
||||
throw new InvalidArgumentException('You cannot call reject() on the SyncTransport.');
|
||||
}
|
||||
|
||||
public function send(Envelope $envelope): Envelope
|
||||
{
|
||||
return $envelope->with(new ForceCallHandlersStamp());
|
||||
}
|
||||
}
|
@ -0,0 +1,33 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Transport\Sync;
|
||||
|
||||
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
|
||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
|
||||
/**
|
||||
* @experimental in 4.3
|
||||
*
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*/
|
||||
class SyncTransportFactory implements TransportFactoryInterface
|
||||
{
|
||||
public function createTransport(string $dsn, array $options): TransportInterface
|
||||
{
|
||||
return new SyncTransport();
|
||||
}
|
||||
|
||||
public function supports(string $dsn, array $options): bool
|
||||
{
|
||||
return 0 === strpos($dsn, 'sync://');
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user