feature #30913 [Messenger] Uses an AmqpStamp to provide flags and attributes (sroze)

This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Uses an `AmqpStamp` to provide flags and attributes

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #28885
| License       | MIT
| Doc PR        | ø

Using the `AmqpStamp` you can configure the flags and any attribute (such as `delivery_mode`).

Commits
-------

56fa574023 Uses an `AmqpStamp` to provide flags and attributes
This commit is contained in:
Fabien Potencier 2019-04-06 21:18:37 +02:00
commit 3de3e4e8d9
8 changed files with 113 additions and 50 deletions

View File

@ -18,9 +18,9 @@ CHANGELOG
changed: a required 3rd `SerializerInterface` argument was added.
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
explicitly handle messages synchronously.
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
* Added `AmqpStamp` allowing to provide a routing key, flags and attributes on message publishing.
* [BC BREAK] Removed publishing with a `routing_key` option from queue configuration, for
AMQP. Use exchange `default_publish_routing_key` or `AmqpRoutingKeyStamp` instead.
AMQP. Use exchange `default_publish_routing_key` or `AmqpStamp` instead.
* [BC BREAK] Changed the `queue` option in the AMQP transport DSN to be `queues[name]`. You can
therefore name the queue but also configure `binding_keys`, `flags` and `arguments`.
* [BC BREAK] The methods `get`, `ack`, `nack` and `queue` of the AMQP `Connection`

View File

@ -1,24 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;
class AmqpRoutingKeyStampTest extends TestCase
{
public function testStamp()
{
$stamp = new AmqpRoutingKeyStamp('routing_key');
$this->assertSame('routing_key', $stamp->getRoutingKey());
}
}

View File

@ -14,8 +14,8 @@ namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@ -41,14 +41,14 @@ class AmqpSenderTest extends TestCase
public function testItSendsTheEncodedMessageUsingARoutingKey()
{
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new AmqpRoutingKeyStamp('rk'));
$envelope = (new Envelope(new DummyMessage('Oy')))->with($stamp = new AmqpStamp('rk'));
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
$serializer = $this->createMock(SerializerInterface::class);
$serializer->method('encode')->with($envelope)->willReturn($encoded);
$connection = $this->createMock(Connection::class);
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, 'rk');
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $stamp);
$sender = new AmqpSender($connection, $serializer);
$sender->send($envelope);

View File

@ -0,0 +1,37 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
/**
* @requires extension amqp
*/
class AmqpStampTest extends TestCase
{
public function testRoutingKeyOnly()
{
$stamp = new AmqpStamp('routing_key');
$this->assertSame('routing_key', $stamp->getRoutingKey());
$this->assertSame(AMQP_NOPARAM, $stamp->getFlags());
$this->assertSame([], $stamp->getAttributes());
}
public function testFlagsAndAttributes()
{
$stamp = new AmqpStamp(null, AMQP_DURABLE, ['delivery_mode' => 'unknown']);
$this->assertNull($stamp->getRoutingKey());
$this->assertSame(AMQP_DURABLE, $stamp->getFlags());
$this->assertSame(['delivery_mode' => 'unknown'], $stamp->getAttributes());
}
}

View File

@ -13,7 +13,9 @@ namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpFactory;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
/**
@ -430,7 +432,7 @@ class ConnectionTest extends TestCase
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=default_routing_key', [], $factory);
$connection->publish('body', [], 0, 'routing_key');
$connection->publish('body', [], 0, new AmqpStamp('routing_key'));
}
public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument()
@ -477,7 +479,27 @@ class ConnectionTest extends TestCase
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_120000');
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_120000', AMQP_NOPARAM, ['headers' => []]);
$connection->publish('{}', [], 120000, 'routing_key');
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
}
public function testItCanPublishWithCustomFlagsAndAttributes()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->createMock(\AMQPConnection::class),
$amqpChannel = $this->createMock(\AMQPChannel::class),
$amqpQueue = $this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);
$amqpExchange->expects($this->once())->method('publish')->with(
'body',
'routing_key',
AMQP_IMMEDIATE,
['delivery_mode' => 2, 'headers' => ['type' => DummyMessage::class]]
);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
$connection->publish('body', ['type' => DummyMessage::class], 0, new AmqpStamp('routing_key', AMQP_IMMEDIATE, ['delivery_mode' => 2]));
}
}

View File

@ -51,11 +51,12 @@ class AmqpSender implements SenderInterface
}
try {
/** @var $routingKeyStamp AmqpRoutingKeyStamp */
$routingKeyStamp = $envelope->last(AmqpRoutingKeyStamp::class);
$routingKey = $routingKeyStamp ? $routingKeyStamp->getRoutingKey() : null;
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay, $routingKey);
$this->connection->publish(
$encodedMessage['body'],
$encodedMessage['headers'] ?? [],
$delay,
$envelope->last(AmqpStamp::class)
);
} catch (\AMQPException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}

View File

@ -15,20 +15,35 @@ use Symfony\Component\Messenger\Stamp\StampInterface;
/**
* @author Guillaume Gammelin <ggammelin@gmail.com>
* @author Samuel Roze <samuel.roze@gmail.com>
*
* @experimental in 4.3
*/
final class AmqpRoutingKeyStamp implements StampInterface
final class AmqpStamp implements StampInterface
{
private $routingKey;
private $flags;
private $attributes;
public function __construct(string $routingKey)
public function __construct(string $routingKey = null, int $flags = AMQP_NOPARAM, array $attributes = [])
{
$this->routingKey = $routingKey;
$this->flags = $flags;
$this->attributes = $attributes;
}
public function getRoutingKey(): string
public function getRoutingKey(): ?string
{
return $this->routingKey;
}
public function getFlags(): int
{
return $this->flags;
}
public function getAttributes(): array
{
return $this->attributes;
}
}

View File

@ -171,10 +171,10 @@ class Connection
*
* @throws \AMQPException
*/
public function publish(string $body, array $headers = [], int $delay = 0, string $routingKey = null): void
public function publish(string $body, array $headers = [], int $delay = 0, AmqpStamp $amqpStamp = null): void
{
if (0 !== $delay) {
$this->publishWithDelay($body, $headers, $delay, $routingKey);
$this->publishWithDelay($body, $headers, $delay, $amqpStamp);
return;
}
@ -183,13 +183,14 @@ class Connection
$this->setup();
}
$this->exchange()->publish(
$this->publishOnExchange(
$this->exchange(),
$body,
$routingKey ?? $this->getDefaultPublishRoutingKey(),
AMQP_NOPARAM,
(null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null) ?? $this->getDefaultPublishRoutingKey(),
[
'headers' => $headers,
]
],
$amqpStamp
);
}
@ -206,19 +207,30 @@ class Connection
/**
* @throws \AMQPException
*/
private function publishWithDelay(string $body, array $headers, int $delay, ?string $exchangeRoutingKey)
private function publishWithDelay(string $body, array $headers, int $delay, AmqpStamp $amqpStamp = null)
{
if ($this->shouldSetup()) {
$this->setupDelay($delay, $exchangeRoutingKey);
$this->setupDelay($delay, null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null);
}
$this->getDelayExchange()->publish(
$this->publishOnExchange(
$this->getDelayExchange(),
$body,
$this->getRoutingKeyForDelay($delay),
AMQP_NOPARAM,
[
'headers' => $headers,
]
],
$amqpStamp
);
}
private function publishOnExchange(\AMQPExchange $exchange, string $body, string $routingKey = null, array $attributes = [], AmqpStamp $amqpStamp = null)
{
$exchange->publish(
$body,
$routingKey,
$amqpStamp ? $amqpStamp->getFlags() : AMQP_NOPARAM,
array_merge($amqpStamp ? $amqpStamp->getAttributes() : [], $attributes)
);
}