feature #26632 [Messenger] Add AMQP adapter (sroze)

This PR was squashed before being merged into the 4.1-dev branch (closes #26632).

Discussion
----------

[Messenger] Add AMQP adapter

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | ø
| License       | MIT

- [x] Depends on the Messenger component #24411
- [x] Add tests once we are all happy about the structure

---

In order to give a great DX for simple needs such as sending messages through an AMQP broker such as RabbitMq, we should ship an AMQP adapter for the Messenger component within Symfony Core. It should be as simple as this proposal. We don't need to handle more specific use-cases nor brokers as other adapters such as the [enqueue adapter](https://github.com/sroze/enqueue-bridge) can also be used.

Configuring the adapter is as simple as the following configuration:
```yaml
# config/packages/messenger_adapters.yaml
framework:
    messenger:
        adapter: "%env(MESSENGER_DSN)%"
```

With the given `.env` for example:
```
MESSENGER_DSN=amqp://guest:guest@localhost:5672/%2f/messages
```

Keep in mind that after having configured the adapter, developers have to route their messages to the given adapter.

```yaml
# config/packages/messenger_routes.yaml
framework:
    messenger:
        routing:
producer).
            'App\Message\Command\CreateNumber': messenger.default_sender
```

---

Additionally, multiple adapters can be created and messages routed to these ones.

```yaml
# config/packages/messenger_routes.yaml
framework:
    messenger:
        adapters:
            commands: "amqp://guest:guest@localhost:5672/%2f/commands"
            maintenance: "amqp://guest:guest@localhost:5672/%2f/maintenance"
        routing:
producer).
            'App\Message\Command\CreateNumber': messenger.commands_sender
            'App\Message\Command\MaintenanceSpecificCommand': messenger.maintenance_sender
```

Commits
-------

798c230ad5 [Messenger] Add AMQP adapter
This commit is contained in:
Samuel ROZE 2018-04-12 09:16:00 +01:00
commit aa04d06dc4
33 changed files with 1322 additions and 86 deletions

View File

@ -12,11 +12,13 @@ addons:
- language-pack-fr-base
- ldap-utils
- slapd
- librabbitmq-dev
env:
global:
- MIN_PHP=7.1.3
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
matrix:
include:
@ -38,6 +40,7 @@ services:
- memcached
- mongodb
- redis-server
- rabbitmq
before_install:
- |
@ -134,6 +137,11 @@ before_install:
- |
# Install extra PHP extensions
if [[ ! $skip ]]; then
# Install librabbitmq
wget http://ftp.debian.org/debian/pool/main/libr/librabbitmq/librabbitmq-dev_0.5.2-2_amd64.deb
wget http://ftp.debian.org/debian/pool/main/libr/librabbitmq/librabbitmq1_0.5.2-2_amd64.deb
sudo dpkg -i librabbitmq1_0.5.2-2_amd64.deb librabbitmq-dev_0.5.2-2_amd64.deb
# install libsodium
sudo add-apt-repository ppa:ondrej/php -y
sudo apt-get update -q
@ -142,6 +150,7 @@ before_install:
tfold ext.apcu tpecl apcu-5.1.6 apcu.so $INI
tfold ext.libsodium tpecl libsodium sodium.so $INI
tfold ext.mongodb tpecl mongodb-1.4.0RC1 mongodb.so $INI
tfold ext.amqp tpecl amqp-1.9.3 amqp.so $INI
fi
- |

View File

@ -971,12 +971,17 @@ class Configuration implements ConfigurationInterface
->arrayNode('messenger')
->info('Messenger configuration')
->{!class_exists(FullStack::class) && class_exists(MessageBusInterface::class) ? 'canBeDisabled' : 'canBeEnabled'}()
->fixXmlConfig('adapter')
->children()
->arrayNode('routing')
->useAttributeAsKey('message_class')
->beforeNormalization()
->always()
->then(function ($config) {
if (!is_array($config)) {
return array();
}
$newConfig = array();
foreach ($config as $k => $v) {
if (!is_int($k)) {
@ -1011,6 +1016,28 @@ class Configuration implements ConfigurationInterface
->end()
->end()
->end()
->arrayNode('adapters')
->useAttributeAsKey('name')
->arrayPrototype()
->beforeNormalization()
->ifString()
->then(function (string $dsn) {
return array('dsn' => $dsn);
})
->end()
->fixXmlConfig('option')
->children()
->scalarNode('dsn')->end()
->arrayNode('options')
->normalizeKeys(false)
->useAttributeAsKey('name')
->defaultValue(array())
->prototype('variable')
->end()
->end()
->end()
->end()
->end()
->end()
->end()
->end()

View File

@ -1468,6 +1468,24 @@ class FrameworkExtension extends Extension
} else {
$container->removeDefinition('messenger.middleware.validator');
}
foreach ($config['adapters'] as $name => $adapter) {
$container->setDefinition('messenger.sender.'.$name, (new Definition(SenderInterface::class))->setFactory(array(
new Reference('messenger.adapter_factory'),
'createSender',
))->setArguments(array(
$adapter['dsn'],
$adapter['options'],
))->addTag('messenger.sender'));
$container->setDefinition('messenger.receiver.'.$name, (new Definition(ReceiverInterface::class))->setFactory(array(
new Reference('messenger.adapter_factory'),
'createReceiver',
))->setArguments(array(
$adapter['dsn'],
$adapter['options'],
))->addTag('messenger.receiver'));
}
}
private function registerCacheConfiguration(array $config, ContainerBuilder $container)

View File

@ -72,5 +72,18 @@
<tag name="container.service_locator" />
<argument type="collection" />
</service>
<!-- Adapters -->
<service id="messenger.adapter_factory" class="Symfony\Component\Messenger\Adapter\Factory\ChainAdapterFactory">
<argument type="tagged" tag="messenger.adapter_factory" />
</service>
<service id="messenger.adapter.amqp.factory" class="Symfony\Component\Messenger\Adapter\AmqpExt\AmqpAdapterFactory">
<argument type="service" id="messenger.transport.default_encoder" />
<argument type="service" id="messenger.transport.default_decoder" />
<argument>%kernel.debug%</argument>
<tag name="messenger.adapter_factory" />
</service>
</services>
</container>

View File

@ -354,6 +354,7 @@
<xsd:sequence>
<xsd:element name="routing" type="messenger_routing" minOccurs="0" maxOccurs="unbounded" />
<xsd:element name="middlewares" type="messenger_middleware" minOccurs="0" maxOccurs="unbounded" />
<xsd:element name="adapter" type="messenger_adapter" minOccurs="0" maxOccurs="unbounded" />
</xsd:sequence>
</xsd:complexType>
@ -368,6 +369,19 @@
<xsd:attribute name="service" type="xsd:string" use="required"/>
</xsd:complexType>
<xsd:complexType name="messenger_adapter">
<xsd:sequence>
<xsd:element name="option" type="messenger_adapter_option" minOccurs="0" maxOccurs="unbounded" />
</xsd:sequence>
<xsd:attribute name="name" type="xsd:string" />
<xsd:attribute name="dsn" type="xsd:string" />
</xsd:complexType>
<xsd:complexType name="messenger_adapter_option">
<xsd:attribute name="name" type="xsd:string" />
<xsd:attribute name="value" type="xsd:string" />
</xsd:complexType>
<xsd:complexType name="messenger_middleware">
<xsd:sequence>
<xsd:element name="validation" type="messenger_validation" minOccurs="0" maxOccurs="1" />

View File

@ -258,6 +258,7 @@ class ConfigurationTest extends TestCase
'enabled' => !class_exists(FullStack::class),
),
),
'adapters' => array(),
),
);
}

View File

@ -0,0 +1,13 @@
<?php
$container->loadFromExtension('framework', array(
'messenger' => array(
'adapters' => array(
'default' => 'amqp://localhost/%2f/messages',
'customised' => array(
'dsn' => 'amqp://localhost/%2f/messages?exchange_name=exchange_name',
'options' => array('queue_name' => 'Queue'),
),
),
),
));

View File

@ -0,0 +1,16 @@
<?xml version="1.0" encoding="utf-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
<framework:config>
<framework:messenger>
<framework:adapter name="default" dsn="amqp://localhost/%2f/messages" />
<framework:adapter name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name">
<framework:option name="queue_name" value="Queue" />
</framework:adapter>
</framework:messenger>
</framework:config>
</container>

View File

@ -0,0 +1,8 @@
framework:
messenger:
adapters:
default: 'amqp://localhost/%2f/messages'
customised:
dsn: 'amqp://localhost/%2f/messages?exchange_name=exchange_name'
options:
queue_name: Queue

View File

@ -523,6 +523,7 @@ abstract class FrameworkExtensionTest extends TestCase
public function testMessenger()
{
$container = $this->createContainerFromFile('messenger');
$this->assertTrue($container->hasDefinition('message_bus'));
$this->assertFalse($container->hasDefinition('messenger.middleware.doctrine_transaction'));
}
@ -538,6 +539,33 @@ abstract class FrameworkExtensionTest extends TestCase
$this->assertFalse($container->hasDefinition('messenger.middleware.validator'));
}
public function testMessengerAdapter()
{
$container = $this->createContainerFromFile('messenger_adapter');
$this->assertTrue($container->hasDefinition('messenger.sender.default'));
$this->assertTrue($container->getDefinition('messenger.sender.default')->hasTag('messenger.sender'));
$this->assertTrue($container->hasDefinition('messenger.receiver.default'));
$this->assertTrue($container->getDefinition('messenger.receiver.default')->hasTag('messenger.receiver'));
$this->assertTrue($container->hasDefinition('messenger.sender.customised'));
$senderFactory = $container->getDefinition('messenger.sender.customised')->getFactory();
$senderArguments = $container->getDefinition('messenger.sender.customised')->getArguments();
$this->assertEquals(array(new Reference('messenger.adapter_factory'), 'createSender'), $senderFactory);
$this->assertCount(2, $senderArguments);
$this->assertEquals('amqp://localhost/%2f/messages?exchange_name=exchange_name', $senderArguments[0]);
$this->assertEquals(array('queue_name' => 'Queue'), $senderArguments[1]);
$this->assertTrue($container->hasDefinition('messenger.receiver.customised'));
$receiverFactory = $container->getDefinition('messenger.receiver.customised')->getFactory();
$receiverArguments = $container->getDefinition('messenger.receiver.customised')->getArguments();
$this->assertEquals(array(new Reference('messenger.adapter_factory'), 'createReceiver'), $receiverFactory);
$this->assertCount(2, $receiverArguments);
$this->assertEquals('amqp://localhost/%2f/messages?exchange_name=exchange_name', $receiverArguments[0]);
$this->assertEquals(array('queue_name' => 'Queue'), $receiverArguments[1]);
}
public function testTranslator()
{
$container = $this->createContainerFromFile('full');

View File

@ -0,0 +1,50 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Adapter\AmqpExt;
use Symfony\Component\Messenger\Adapter\Factory\AdapterFactoryInterface;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class AmqpAdapterFactory implements AdapterFactoryInterface
{
private $encoder;
private $decoder;
private $debug;
public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, bool $debug)
{
$this->encoder = $encoder;
$this->decoder = $decoder;
$this->debug = $debug;
}
public function createReceiver(string $dsn, array $options): ReceiverInterface
{
return new AmqpReceiver($this->decoder, Connection::fromDsn($dsn, $options, $this->debug));
}
public function createSender(string $dsn, array $options): SenderInterface
{
return new AmqpSender($this->encoder, Connection::fromDsn($dsn, $options, $this->debug));
}
public function supports(string $dsn, array $options): bool
{
return 0 === strpos($dsn, 'amqp://');
}
}

View File

@ -0,0 +1,35 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Adapter\AmqpExt;
class AmqpFactory
{
public function createConnection(array $credentials): \AMQPConnection
{
return new \AMQPConnection($credentials);
}
public function createChannel(\AMQPConnection $connection): \AMQPChannel
{
return new \AMQPChannel($connection);
}
public function createQueue(\AMQPChannel $channel): \AMQPQueue
{
return new \AMQPQueue($channel);
}
public function createExchange(\AMQPChannel $channel): \AMQPExchange
{
return new \AMQPExchange($channel);
}
}

View File

@ -0,0 +1,80 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Adapter\AmqpExt;
use Symfony\Component\Messenger\Adapter\AmqpExt\Exception\RejectMessageExceptionInterface;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
/**
* Symfony Messenger receiver to get messages from AMQP brokers using PHP's AMQP extension.
*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class AmqpReceiver implements ReceiverInterface
{
private $messageDecoder;
private $connection;
private $shouldStop;
public function __construct(DecoderInterface $messageDecoder, Connection $connection)
{
$this->messageDecoder = $messageDecoder;
$this->connection = $connection;
}
/**
* {@inheritdoc}
*/
public function receive(callable $handler): void
{
while (!$this->shouldStop) {
$message = $this->connection->get();
if (null === $message) {
$handler(null);
usleep($this->connection->getConnectionCredentials()['loop_sleep'] ?? 200000);
if (function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
continue;
}
try {
$handler($this->messageDecoder->decode(array(
'body' => $message->getBody(),
'headers' => $message->getHeaders(),
)));
$this->connection->ack($message);
} catch (RejectMessageExceptionInterface $e) {
$this->connection->reject($message);
throw $e;
} catch (\Throwable $e) {
$this->connection->nack($message, AMQP_REQUEUE);
throw $e;
} finally {
if (function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
}
}
}
public function stop(): void
{
$this->shouldStop = true;
}
}

View File

@ -0,0 +1,42 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Adapter\AmqpExt;
use Symfony\Component\Messenger\Transport\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
/**
* Symfony Messenger sender to send messages to AMQP brokers using PHP's AMQP extension.
*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class AmqpSender implements SenderInterface
{
private $messageEncoder;
private $connection;
public function __construct(EncoderInterface $messageEncoder, Connection $connection)
{
$this->messageEncoder = $messageEncoder;
$this->connection = $connection;
}
/**
* {@inheritdoc}
*/
public function send($message)
{
$encodedMessage = $this->messageEncoder->encode($message);
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers']);
}
}

View File

@ -0,0 +1,219 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Adapter\AmqpExt;
/**
* An AMQP connection.
*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class Connection
{
private $connectionCredentials;
private $exchangeConfiguration;
private $queueConfiguration;
private $debug;
private $amqpFactory;
/**
* @var \AMQPChannel|null
*/
private $amqpChannel;
/**
* @var \AMQPExchange|null
*/
private $amqpExchange;
/**
* @var \AMQPQueue|null
*/
private $amqpQueue;
public function __construct(array $connectionCredentials, array $exchangeConfiguration, array $queueConfiguration, bool $debug = false, AmqpFactory $amqpFactory = null)
{
$this->connectionCredentials = $connectionCredentials;
$this->debug = $debug;
$this->exchangeConfiguration = $exchangeConfiguration;
$this->queueConfiguration = $queueConfiguration;
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
}
public static function fromDsn(string $dsn, array $options = array(), bool $debug = false, AmqpFactory $amqpFactory = null): self
{
if (false === $parsedUrl = parse_url($dsn)) {
throw new \InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn));
}
$pathParts = isset($parsedUrl['path']) ? explode('/', trim($parsedUrl['path'], '/')) : array();
$amqpOptions = array_replace_recursive(array(
'host' => $parsedUrl['host'] ?? 'localhost',
'port' => $parsedUrl['port'] ?? 5672,
'vhost' => isset($pathParts[0]) ? urldecode($pathParts[0]) : '/',
'queue' => array(
'name' => $queueName = $pathParts[1] ?? 'messages',
),
'exchange' => array(
'name' => $queueName,
),
), $options);
if (isset($parsedUrl['user'])) {
$amqpOptions['login'] = $parsedUrl['user'];
}
if (isset($parsedUrl['pass'])) {
$amqpOptions['password'] = $parsedUrl['pass'];
}
if (isset($parsedUrl['query'])) {
parse_str($parsedUrl['query'], $parsedQuery);
$amqpOptions = array_replace_recursive($amqpOptions, $parsedQuery);
}
$exchangeOptions = $amqpOptions['exchange'];
$queueOptions = $amqpOptions['queue'];
unset($amqpOptions['queue']);
unset($amqpOptions['exchange']);
return new self($amqpOptions, $exchangeOptions, $queueOptions, $debug, $amqpFactory);
}
/**
* @throws \AMQPException
*/
public function publish(string $body, array $headers = array())
{
if ($this->debug) {
$this->setup();
}
$this->exchange()->publish($body, null, AMQP_NOPARAM, array('headers' => $headers));
}
/**
* Waits and gets a message from the configured queue.
*
* @throws \AMQPException
*/
public function get(): ?\AMQPEnvelope
{
if ($this->debug) {
$this->setup();
}
try {
if (false !== $message = $this->queue()->get()) {
return $message;
}
} catch (\AMQPQueueException $e) {
if (404 === $e->getCode()) {
// If we get a 404 for the queue, it means we need to setup the exchange & queue.
$this->setup();
return $this->get();
} else {
throw $e;
}
}
return null;
}
public function ack(\AMQPEnvelope $message)
{
return $this->queue()->ack($message->getDeliveryTag());
}
public function reject(\AMQPEnvelope $message)
{
return $this->queue()->reject($message->getDeliveryTag());
}
public function nack(\AMQPEnvelope $message, int $flags = AMQP_NOPARAM)
{
return $this->queue()->nack($message->getDeliveryTag(), $flags);
}
public function setup()
{
if (!$this->channel()->isConnected()) {
$this->clear();
}
$this->exchange()->declareExchange();
$this->queue()->declareQueue();
$this->queue()->bind($this->exchange()->getName(), $this->queueConfiguration['routing_key'] ?? null);
}
public function channel(): \AMQPChannel
{
if (null === $this->amqpChannel) {
$connection = $this->amqpFactory->createConnection($this->connectionCredentials);
$connectMethod = 'true' === ($this->connectionCredentials['persistent'] ?? 'false') ? 'pconnect' : 'connect';
if (false === $connection->{$connectMethod}()) {
throw new \AMQPException('Could not connect to the AMQP server. Please verify the provided DSN.');
}
$this->amqpChannel = $this->amqpFactory->createChannel($connection);
}
return $this->amqpChannel;
}
public function queue(): \AMQPQueue
{
if (null === $this->amqpQueue) {
$this->amqpQueue = $this->amqpFactory->createQueue($this->channel());
$this->amqpQueue->setName($this->queueConfiguration['name']);
$this->amqpQueue->setFlags($this->queueConfiguration['flags'] ?? AMQP_DURABLE);
if (isset($this->queueConfiguration['arguments'])) {
$this->amqpQueue->setArguments($this->queueConfiguration['arguments']);
}
}
return $this->amqpQueue;
}
public function exchange(): \AMQPExchange
{
if (null === $this->amqpExchange) {
$this->amqpExchange = $this->amqpFactory->createExchange($this->channel());
$this->amqpExchange->setName($this->exchangeConfiguration['name']);
$this->amqpExchange->setType($this->exchangeConfiguration['type'] ?? AMQP_EX_TYPE_FANOUT);
$this->amqpExchange->setFlags($this->exchangeConfiguration['flags'] ?? AMQP_DURABLE);
if (isset($this->exchangeConfiguration['arguments'])) {
$this->amqpExchange->setArguments($this->exchangeConfiguration['arguments']);
}
}
return $this->amqpExchange;
}
public function getConnectionCredentials(): array
{
return $this->connectionCredentials;
}
private function clear()
{
$this->amqpChannel = null;
$this->amqpQueue = null;
$this->amqpExchange = null;
}
}

View File

@ -0,0 +1,25 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Adapter\AmqpExt\Exception;
/**
* If something goes wrong while consuming and handling a message from the AMQP broker, there are two choices: rejecting
* or re-queuing the message.
*
* If the exception that is thrown by the bus while dispatching the message implements this interface, the message will
* be rejected. Otherwise, it will be re-queued.
*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
interface RejectMessageExceptionInterface extends \Throwable
{
}

View File

@ -0,0 +1,29 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Adapter\Factory;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;
/**
* Creates a Messenger adapter.
*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
interface AdapterFactoryInterface
{
public function createReceiver(string $dsn, array $options): ReceiverInterface;
public function createSender(string $dsn, array $options): SenderInterface;
public function supports(string $dsn, array $options): bool;
}

View File

@ -0,0 +1,64 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Adapter\Factory;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class ChainAdapterFactory implements AdapterFactoryInterface
{
private $factories;
/**
* @param iterable|AdapterFactoryInterface[] $factories
*/
public function __construct(iterable $factories)
{
$this->factories = $factories;
}
public function createReceiver(string $dsn, array $options): ReceiverInterface
{
foreach ($this->factories as $factory) {
if ($factory->supports($dsn, $options)) {
return $factory->createReceiver($dsn, $options);
}
}
throw new \InvalidArgumentException(sprintf('No adapter supports the given DSN "%s".', $dsn));
}
public function createSender(string $dsn, array $options): SenderInterface
{
foreach ($this->factories as $factory) {
if ($factory->supports($dsn, $options)) {
return $factory->createSender($dsn, $options);
}
}
throw new \InvalidArgumentException(sprintf('No adapter supports the given DSN "%s".', $dsn));
}
public function supports(string $dsn, array $options): bool
{
foreach ($this->factories as $factory) {
if ($factory->supports($dsn, $options)) {
return true;
}
}
return false;
}
}

View File

@ -25,20 +25,19 @@ class WrapIntoReceivedMessage implements ReceiverInterface
$this->decoratedReceiver = $decoratedConsumer;
}
public function receive(): iterable
public function receive(callable $handler): void
{
$iterator = $this->decoratedReceiver->receive();
foreach ($iterator as $message) {
try {
yield new ReceivedMessage($message);
} catch (\Throwable $e) {
if (!$iterator instanceof \Generator) {
throw $e;
}
$iterator->throw($e);
$this->decoratedReceiver->receive(function ($message) use ($handler) {
if (null !== $message) {
$message = new ReceivedMessage($message);
}
}
$handler($message);
});
}
public function stop(): void
{
$this->decoratedReceiver->stop();
}
}

View File

@ -0,0 +1,139 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Adapter\AmqpExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Adapter\AmqpExt\Connection;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Enhancers\MaximumCountReceiver;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Process\PhpProcess;
use Symfony\Component\Process\Process;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
/**
* @requires extension amqp
*/
class AmqpExtIntegrationTest extends TestCase
{
protected function setUp()
{
parent::setUp();
if (!getenv('MESSENGER_AMQP_DSN')) {
$this->markTestSkipped('The "MESSENGER_AMQP_DSN" environment variable is required.');
}
}
public function testItSendsAndReceivesMessages()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
$connection->setup();
$connection->queue()->purge();
$sender = new AmqpSender($serializer, $connection);
$receiver = new AmqpReceiver($serializer, $connection);
$sender->send($firstMessage = new DummyMessage('First'));
$sender->send($secondMessage = new DummyMessage('Second'));
$receivedMessages = 0;
$generator = $receiver->receive(function ($message) use ($receiver, &$receivedMessages, $firstMessage, $secondMessage) {
$this->assertEquals(0 == $receivedMessages ? $firstMessage : $secondMessage, $message);
if (2 == ++$receivedMessages) {
$receiver->stop();
}
});
}
public function testItReceivesSignals()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
$connection->setup();
$connection->queue()->purge();
$sender = new AmqpSender($serializer, $connection);
$sender->send(new DummyMessage('Hello'));
$amqpReadTimeout = 30;
$dsn = getenv('MESSENGER_AMQP_DSN').'?read_timeout='.$amqpReadTimeout;
$process = new PhpProcess(file_get_contents(__DIR__.'/Fixtures/long_receiver.php'), null, array(
'COMPONENT_ROOT' => __DIR__.'/../../../',
'DSN' => $dsn,
));
$process->start();
$this->waitForOutput($process, $expectedOutput = "Receiving messages...\n");
$signalTime = microtime(true);
$timedOutTime = time() + 10;
$process->signal(15);
while ($process->isRunning() && time() < $timedOutTime) {
usleep(100 * 1000); // 100ms
}
$this->assertFalse($process->isRunning());
$this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);
$this->assertEquals($expectedOutput."Get message: Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage\nDone.\n", $process->getOutput());
}
/**
* @runInSeparateProcess
*/
public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), array('read_timeout' => '1'));
$connection->setup();
$connection->queue()->purge();
$sender = new AmqpSender($serializer, $connection);
$receiver = new MaximumCountReceiver(new AmqpReceiver($serializer, $connection), 2);
$receiver->receive(function ($message) {
$this->assertNull($message);
});
}
private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
{
$timedOutTime = time() + $timeoutInSeconds;
while (time() < $timedOutTime) {
if (0 === strpos($process->getOutput(), $output)) {
return;
}
usleep(100 * 1000); // 100ms
}
throw new \RuntimeException('Expected output never arrived. Got "'.$process->getOutput().'" instead.');
}
}

View File

@ -0,0 +1,111 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Adapter\AmqpExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Adapter\AmqpExt\Connection;
use Symfony\Component\Messenger\Adapter\AmqpExt\Exception\RejectMessageExceptionInterface;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
/**
* @requires extension amqp
*/
class AmqpReceiverTest extends TestCase
{
public function testItSendTheDecodedMessageToTheHandlerAndAcknowledgeIt()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
$envelope->method('getHeaders')->willReturn(array(
'type' => DummyMessage::class,
));
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($envelope);
$connection->expects($this->once())->method('ack')->with($envelope);
$receiver = new AmqpReceiver($serializer, $connection);
$receiver->receive(function ($message) use ($receiver) {
$this->assertEquals(new DummyMessage('Hi'), $message);
$receiver->stop();
});
}
/**
* @expectedException \Symfony\Component\Messenger\Tests\Adapter\AmqpExt\InterruptException
*/
public function testItNonAcknowledgeTheMessageIfAnExceptionHappened()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
$envelope->method('getHeaders')->willReturn(array(
'type' => DummyMessage::class,
));
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($envelope);
$connection->expects($this->once())->method('nack')->with($envelope);
$receiver = new AmqpReceiver($serializer, $connection);
$receiver->receive(function () {
throw new InterruptException('Well...');
});
}
/**
* @expectedException \Symfony\Component\Messenger\Tests\Adapter\AmqpExt\WillNeverWorkException
*/
public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionInterface()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
$envelope->method('getHeaders')->willReturn(array(
'type' => DummyMessage::class,
));
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($envelope);
$connection->expects($this->once())->method('reject')->with($envelope);
$receiver = new AmqpReceiver($serializer, $connection);
$receiver->receive(function () {
throw new WillNeverWorkException('Well...');
});
}
}
class InterruptException extends \Exception
{
}
class WillNeverWorkException extends \Exception implements RejectMessageExceptionInterface
{
}

View File

@ -0,0 +1,39 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Adapter\AmqpExt;
use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Adapter\AmqpExt\Connection;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
/**
* @requires extension amqp
*/
class AmqpSenderTest extends TestCase
{
public function testItSendsTheEncodedMessage()
{
$message = new DummyMessage('Oy');
$encoded = array('body' => '...', 'headers' => array('type' => DummyMessage::class));
$encoder = $this->getMockBuilder(EncoderInterface::class)->getMock();
$encoder->method('encode')->with($message)->willReturnOnConsecutiveCalls($encoded);
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers']);
$sender = new AmqpSender($encoder, $connection);
$sender->send($message);
}
}

View File

@ -0,0 +1,186 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Adapter\AmqpExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpFactory;
use Symfony\Component\Messenger\Adapter\AmqpExt\Connection;
/**
* @requires extension amqp
*/
class ConnectionTest extends TestCase
{
/**
* @expectedException \InvalidArgumentException
* @expectedExceptionMessage The given AMQP DSN "amqp://" is invalid.
*/
public function testItCannotBeConstructedWithAWrongDsn()
{
Connection::fromDsn('amqp://');
}
public function testItGetsParametersFromTheDsn()
{
$this->assertEquals(
new Connection(array(
'host' => 'localhost',
'port' => 5672,
'vhost' => '/',
), array(
'name' => 'messages',
), array(
'name' => 'messages',
)),
Connection::fromDsn('amqp://localhost/%2f/messages')
);
}
public function testOverrideOptionsViaQueryParameters()
{
$this->assertEquals(
new Connection(array(
'host' => 'redis',
'port' => 1234,
'vhost' => '/',
'login' => 'guest',
'password' => 'password',
), array(
'name' => 'exchangeName',
), array(
'name' => 'queue',
)),
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName')
);
}
public function testOptionsAreTakenIntoAccountAndOverwrittenByDsn()
{
$this->assertEquals(
new Connection(array(
'host' => 'redis',
'port' => 1234,
'vhost' => '/',
'login' => 'guest',
'password' => 'password',
'persistent' => 'true',
), array(
'name' => 'exchangeName',
), array(
'name' => 'queueName',
)),
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queue[name]=queueName', array(
'persistent' => 'true',
'exchange' => array('name' => 'toBeOverwritten'),
))
);
}
public function testSetsParametersOnTheQueueAndExchange()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(),
$amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(),
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
);
$amqpQueue->expects($this->once())->method('setArguments')->with(array(
'x-dead-letter-exchange' => 'dead-exchange',
'x-message-ttl' => '1200',
));
$amqpExchange->expects($this->once())->method('setArguments')->with(array(
'alternate-exchange' => 'alternate',
));
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[arguments][x-dead-letter-exchange]=dead-exchange', array(
'queue' => array(
'arguments' => array(
'x-message-ttl' => '1200',
),
),
'exchange' => array(
'arguments' => array(
'alternate-exchange' => 'alternate',
),
),
), true, $factory);
$connection->publish('body');
}
public function testItUsesANormalConnectionByDefault()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(),
$amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(),
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
);
$amqpConnection->expects($this->once())->method('connect');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', array(), false, $factory);
$connection->publish('body');
}
public function testItAllowsToUseAPersistentConnection()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(),
$amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(),
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
);
$amqpConnection->expects($this->once())->method('pconnect');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?persistent=true', array(), false, $factory);
$connection->publish('body');
}
}
class TestAmqpFactory extends AmqpFactory
{
private $connection;
private $channel;
private $queue;
private $exchange;
public function __construct(\AMQPConnection $connection, \AMQPChannel $channel, \AMQPQueue $queue, \AMQPExchange $exchange)
{
$this->connection = $connection;
$this->channel = $channel;
$this->queue = $queue;
$this->exchange = $exchange;
}
public function createConnection(array $credentials): \AMQPConnection
{
return $this->connection;
}
public function createChannel(\AMQPConnection $connection): \AMQPChannel
{
return $this->channel;
}
public function createQueue(\AMQPChannel $channel): \AMQPQueue
{
return $this->queue;
}
public function createExchange(\AMQPChannel $channel): \AMQPExchange
{
return $this->exchange;
}
}

View File

@ -0,0 +1,43 @@
<?php
$componentRoot = $_SERVER['COMPONENT_ROOT'];
if (!is_file($autoload = $componentRoot.'/vendor/autoload.php')) {
$autoload = $componentRoot.'/../../../../vendor/autoload.php';
}
if (!file_exists($autoload)) {
exit('You should run "composer install --dev" in the component before running this script.');
}
require_once $autoload;
use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Adapter\AmqpExt\Connection;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$connection = Connection::fromDsn(getenv('DSN'));
$sender = new AmqpSender($serializer, $connection);
$receiver = new AmqpReceiver($serializer, $connection);
$worker = new Worker($receiver, new class() implements MessageBusInterface {
public function dispatch($message)
{
echo 'Get message: '.get_class($message)."\n";
sleep(30);
echo "Done.\n";
}
});
echo "Receiving messages...\n";
$worker->run();

View File

@ -23,8 +23,8 @@ class SendMessageMiddlewareTest extends TestCase
public function testItSendsTheMessageToAssignedSender()
{
$message = new DummyMessage('Hey');
$sender = $this->createMock(SenderInterface::class);
$next = $this->createPartialMock(\stdClass::class, ['__invoke']);
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(array(
$sender,
@ -39,8 +39,8 @@ class SendMessageMiddlewareTest extends TestCase
public function testItAlsoCallsTheNextMiddlewareIfASenderIsNull()
{
$message = new DummyMessage('Hey');
$sender = $this->createMock(SenderInterface::class);
$next = $this->createPartialMock(\stdClass::class, ['__invoke']);
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(array(
$sender,
@ -56,7 +56,7 @@ class SendMessageMiddlewareTest extends TestCase
public function testItCallsTheNextMiddlewareWhenNoSenderForThisMessage()
{
$message = new DummyMessage('Hey');
$next = $this->createPartialMock(\stdClass::class, ['__invoke']);
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(array()));
@ -70,8 +70,8 @@ class SendMessageMiddlewareTest extends TestCase
$innerMessage = new DummyMessage('Hey');
$message = new ReceivedMessage($innerMessage);
$sender = $this->createMock(SenderInterface::class);
$next = $this->createPartialMock(\stdClass::class, ['__invoke']);
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(array(
$sender,

View File

@ -22,40 +22,40 @@ class SenderLocatorTest extends TestCase
{
public function testItReturnsTheSenderBasedOnTheMessageClass()
{
$sender = $this->createMock(SenderInterface::class);
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$container = new Container();
$container->set('my_amqp_sender', $sender);
$locator = new SenderLocator($container, [
DummyMessage::class => [
$locator = new SenderLocator($container, array(
DummyMessage::class => array(
'my_amqp_sender',
]
]);
),
));
$this->assertEquals([$sender], $locator->getSendersForMessage(new DummyMessage('Hello')));
$this->assertEquals([], $locator->getSendersForMessage(new SecondMessage()));
$this->assertEquals(array($sender), $locator->getSendersForMessage(new DummyMessage('Hello')));
$this->assertEquals(array(), $locator->getSendersForMessage(new SecondMessage()));
}
public function testItSupportsAWildcardInsteadOfTheMessageClass()
{
$container = new Container();
$sender = $this->createMock(SenderInterface::class);
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$container->set('my_amqp_sender', $sender);
$apiSender = $this->createMock(SenderInterface::class);
$apiSender = $this->getMockBuilder(SenderInterface::class)->getMock();
$container->set('my_api_sender', $apiSender);
$locator = new SenderLocator($container, [
DummyMessage::class => [
$locator = new SenderLocator($container, array(
DummyMessage::class => array(
'my_amqp_sender',
],
'*' => [
'my_api_sender'
]
]);
),
'*' => array(
'my_api_sender',
),
));
$this->assertEquals([$sender], $locator->getSendersForMessage(new DummyMessage('Hello')));
$this->assertEquals([$apiSender], $locator->getSendersForMessage(new SecondMessage()));
$this->assertEquals(array($sender), $locator->getSendersForMessage(new DummyMessage('Hello')));
$this->assertEquals(array($apiSender), $locator->getSendersForMessage(new SecondMessage()));
}
}

View File

@ -221,12 +221,16 @@ class DummyHandler
class DummyReceiver implements ReceiverInterface
{
public function receive(): iterable
public function receive(callable $handler): void
{
for ($i = 0; $i < 3; ++$i) {
yield new DummyMessage("Dummy $i");
$handler(new DummyMessage("Dummy $i"));
}
}
public function stop(): void
{
}
}
class UndefinedMessageHandler

View File

@ -31,24 +31,24 @@ class MessageBusTest extends TestCase
$message = new DummyMessage('Hello');
$responseFromDepthMiddleware = 1234;
$firstMiddleware = $this->createMock(MiddlewareInterface::class);
$firstMiddleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
$firstMiddleware->expects($this->once())
->method('handle')
->with($message, $this->anything())
->will($this->returnCallback(function($message, $next) {
->will($this->returnCallback(function ($message, $next) {
return $next($message);
}));
$secondMiddleware = $this->createMock(MiddlewareInterface::class);
$secondMiddleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
$secondMiddleware->expects($this->once())
->method('handle')
->with($message, $this->anything())
->willReturn($responseFromDepthMiddleware);
$bus = new MessageBus([
$bus = new MessageBus(array(
$firstMiddleware,
$secondMiddleware,
]);
));
$this->assertEquals($responseFromDepthMiddleware, $bus->dispatch($message));
}

View File

@ -22,12 +22,12 @@ class WorkerTest extends TestCase
{
public function testWorkerDispatchTheReceivedMessage()
{
$receiver = new CallbackReceiver(function() {
yield new DummyMessage('API');
yield new DummyMessage('IPA');
$receiver = new CallbackReceiver(function ($handler) {
$handler(new DummyMessage('API'));
$handler(new DummyMessage('IPA'));
});
$bus = $this->createMock(MessageBusInterface::class);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('API')));
$bus->expects($this->at(1))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('IPA')));
@ -38,11 +38,11 @@ class WorkerTest extends TestCase
public function testWorkerDoesNotWrapMessagesAlreadyWrappedInReceivedMessages()
{
$receiver = new CallbackReceiver(function() {
yield new ReceivedMessage(new DummyMessage('API'));
$receiver = new CallbackReceiver(function ($handler) {
$handler(new ReceivedMessage(new DummyMessage('API')));
});
$bus = $this->createMock(MessageBusInterface::class);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('API')));
@ -52,9 +52,9 @@ class WorkerTest extends TestCase
public function testWorkerIsThrowingExceptionsBackToGenerators()
{
$receiver = new CallbackReceiver(function() {
$receiver = new CallbackReceiver(function ($handler) {
try {
yield new DummyMessage('Hello');
$handler(new DummyMessage('Hello'));
$this->assertTrue(false, 'This should not be called because the exception is sent back to the generator.');
} catch (\InvalidArgumentException $e) {
@ -63,13 +63,25 @@ class WorkerTest extends TestCase
}
});
$bus = $this->createMock(MessageBusInterface::class);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not'));
$worker = new Worker($receiver, $bus);
$worker->run();
}
public function testWorkerDoesNotSendNullMessagesToTheBus()
{
$receiver = new CallbackReceiver(function ($handler) {
$handler(null);
});
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->never())->method('dispatch');
$worker = new Worker($receiver, $bus);
$worker->run();
}
}
class CallbackReceiver implements ReceiverInterface
@ -81,10 +93,13 @@ class CallbackReceiver implements ReceiverInterface
$this->callable = $callable;
}
public function receive(): iterable
public function receive(callable $handler): void
{
$callable = $this->callable;
$callable($handler);
}
return $callable();
public function stop(): void
{
}
}

View File

@ -27,25 +27,21 @@ class MaximumCountReceiver implements ReceiverInterface
$this->maximumNumberOfMessages = $maximumNumberOfMessages;
}
public function receive(): iterable
public function receive(callable $handler): void
{
$iterator = $this->decoratedReceiver->receive();
$receivedMessages = 0;
foreach ($iterator as $message) {
try {
yield $message;
} catch (\Throwable $e) {
if (!$iterator instanceof \Generator) {
throw $e;
}
$iterator->throw($e);
}
$this->decoratedReceiver->receive(function ($message) use ($handler, &$receivedMessages) {
$handler($message);
if (++$receivedMessages >= $this->maximumNumberOfMessages) {
break;
$this->stop();
}
}
});
}
public function stop(): void
{
$this->decoratedReceiver->stop();
}
}

View File

@ -18,5 +18,16 @@ namespace Symfony\Component\Messenger\Transport;
*/
interface ReceiverInterface
{
public function receive(): iterable;
/**
* Receive some messages to the given handler.
*
* The handler will have, as argument, the received message. Note that this message
* can be `null` if the timeout to receive something has expired.
*/
public function receive(callable $handler) : void;
/**
* Stop receiving some messages.
*/
public function stop(): void;
}

View File

@ -35,22 +35,22 @@ class Worker
*/
public function run()
{
$iterator = $this->receiver->receive();
if (function_exists('pcntl_signal')) {
pcntl_signal(SIGTERM, function () {
$this->receiver->stop();
});
}
$this->receiver->receive(function($message) {
if (null === $message) {
return;
}
foreach ($iterator as $message) {
if (!$message instanceof ReceivedMessage) {
$message = new ReceivedMessage($message);
}
try {
$this->bus->dispatch($message);
} catch (\Throwable $e) {
if (!$iterator instanceof \Generator) {
throw $e;
}
$iterator->throw($e);
}
}
$this->bus->dispatch($message);
});
}
}

View File

@ -23,7 +23,9 @@
"symfony/dependency-injection": "~3.4.6|~4.0",
"symfony/http-kernel": "~3.4|~4.0",
"symfony/property-access": "~3.4|~4.0",
"symfony/var-dumper": "~3.4|~4.0"
"symfony/var-dumper": "~3.4|~4.0",
"symfony/property-access": "~3.4|~4.0",
"symfony/process": "~4.0"
},
"suggest": {
"sroze/enqueue-bridge": "For using the php-enqueue library as an adapter."