From 56fa5740239329e94f1e07a191e9f31c11596df0 Mon Sep 17 00:00:00 2001 From: Samuel ROZE Date: Sat, 6 Apr 2019 17:20:27 +0200 Subject: [PATCH] Uses an `AmqpStamp` to provide flags and attributes --- src/Symfony/Component/Messenger/CHANGELOG.md | 4 +- .../AmqpExt/AmqpRoutingKeyStampTest.php | 24 ------------ .../Transport/AmqpExt/AmqpSenderTest.php | 6 +-- .../Tests/Transport/AmqpExt/AmqpStampTest.php | 37 +++++++++++++++++++ .../Transport/AmqpExt/ConnectionTest.php | 26 ++++++++++++- .../Transport/AmqpExt/AmqpSender.php | 11 +++--- ...{AmqpRoutingKeyStamp.php => AmqpStamp.php} | 21 +++++++++-- .../Transport/AmqpExt/Connection.php | 34 +++++++++++------ 8 files changed, 113 insertions(+), 50 deletions(-) delete mode 100644 src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpRoutingKeyStampTest.php create mode 100644 src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpStampTest.php rename src/Symfony/Component/Messenger/Transport/AmqpExt/{AmqpRoutingKeyStamp.php => AmqpStamp.php} (51%) diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 03b4864520..b16eac2d86 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -18,9 +18,9 @@ CHANGELOG changed: a required 3rd `SerializerInterface` argument was added. * Added a new `SyncTransport` along with `ForceCallHandlersStamp` to explicitly handle messages synchronously. - * Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing. + * Added `AmqpStamp` allowing to provide a routing key, flags and attributes on message publishing. * [BC BREAK] Removed publishing with a `routing_key` option from queue configuration, for - AMQP. Use exchange `default_publish_routing_key` or `AmqpRoutingKeyStamp` instead. + AMQP. Use exchange `default_publish_routing_key` or `AmqpStamp` instead. * [BC BREAK] Changed the `queue` option in the AMQP transport DSN to be `queues[name]`. You can therefore name the queue but also configure `binding_keys`, `flags` and `arguments`. * [BC BREAK] The methods `get`, `ack`, `nack` and `queue` of the AMQP `Connection` diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpRoutingKeyStampTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpRoutingKeyStampTest.php deleted file mode 100644 index 895e41b262..0000000000 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpRoutingKeyStampTest.php +++ /dev/null @@ -1,24 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt; - -use PHPUnit\Framework\TestCase; -use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp; - -class AmqpRoutingKeyStampTest extends TestCase -{ - public function testStamp() - { - $stamp = new AmqpRoutingKeyStamp('routing_key'); - $this->assertSame('routing_key', $stamp->getRoutingKey()); - } -} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php index 178d86a516..95380a9e55 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php @@ -14,8 +14,8 @@ namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt; use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; -use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp; use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender; +use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp; use Symfony\Component\Messenger\Transport\AmqpExt\Connection; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -41,14 +41,14 @@ class AmqpSenderTest extends TestCase public function testItSendsTheEncodedMessageUsingARoutingKey() { - $envelope = (new Envelope(new DummyMessage('Oy')))->with(new AmqpRoutingKeyStamp('rk')); + $envelope = (new Envelope(new DummyMessage('Oy')))->with($stamp = new AmqpStamp('rk')); $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]]; $serializer = $this->createMock(SerializerInterface::class); $serializer->method('encode')->with($envelope)->willReturn($encoded); $connection = $this->createMock(Connection::class); - $connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, 'rk'); + $connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $stamp); $sender = new AmqpSender($connection, $serializer); $sender->send($envelope); diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpStampTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpStampTest.php new file mode 100644 index 0000000000..d9605808f6 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpStampTest.php @@ -0,0 +1,37 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp; + +/** + * @requires extension amqp + */ +class AmqpStampTest extends TestCase +{ + public function testRoutingKeyOnly() + { + $stamp = new AmqpStamp('routing_key'); + $this->assertSame('routing_key', $stamp->getRoutingKey()); + $this->assertSame(AMQP_NOPARAM, $stamp->getFlags()); + $this->assertSame([], $stamp->getAttributes()); + } + + public function testFlagsAndAttributes() + { + $stamp = new AmqpStamp(null, AMQP_DURABLE, ['delivery_mode' => 'unknown']); + $this->assertNull($stamp->getRoutingKey()); + $this->assertSame(AMQP_DURABLE, $stamp->getFlags()); + $this->assertSame(['delivery_mode' => 'unknown'], $stamp->getAttributes()); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php index a2de20b620..e6bd2059e2 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php @@ -13,7 +13,9 @@ namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt; use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Exception\InvalidArgumentException; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Transport\AmqpExt\AmqpFactory; +use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp; use Symfony\Component\Messenger\Transport\AmqpExt\Connection; /** @@ -430,7 +432,7 @@ class ConnectionTest extends TestCase $amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key'); $connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=default_routing_key', [], $factory); - $connection->publish('body', [], 0, 'routing_key'); + $connection->publish('body', [], 0, new AmqpStamp('routing_key')); } public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument() @@ -477,7 +479,27 @@ class ConnectionTest extends TestCase $delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_120000'); $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_120000', AMQP_NOPARAM, ['headers' => []]); - $connection->publish('{}', [], 120000, 'routing_key'); + $connection->publish('{}', [], 120000, new AmqpStamp('routing_key')); + } + + public function testItCanPublishWithCustomFlagsAndAttributes() + { + $factory = new TestAmqpFactory( + $amqpConnection = $this->createMock(\AMQPConnection::class), + $amqpChannel = $this->createMock(\AMQPChannel::class), + $amqpQueue = $this->createMock(\AMQPQueue::class), + $amqpExchange = $this->createMock(\AMQPExchange::class) + ); + + $amqpExchange->expects($this->once())->method('publish')->with( + 'body', + 'routing_key', + AMQP_IMMEDIATE, + ['delivery_mode' => 2, 'headers' => ['type' => DummyMessage::class]] + ); + + $connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory); + $connection->publish('body', ['type' => DummyMessage::class], 0, new AmqpStamp('routing_key', AMQP_IMMEDIATE, ['delivery_mode' => 2])); } } diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php index d53ecb3b89..b873312a67 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php @@ -51,11 +51,12 @@ class AmqpSender implements SenderInterface } try { - /** @var $routingKeyStamp AmqpRoutingKeyStamp */ - $routingKeyStamp = $envelope->last(AmqpRoutingKeyStamp::class); - $routingKey = $routingKeyStamp ? $routingKeyStamp->getRoutingKey() : null; - - $this->connection->publish($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay, $routingKey); + $this->connection->publish( + $encodedMessage['body'], + $encodedMessage['headers'] ?? [], + $delay, + $envelope->last(AmqpStamp::class) + ); } catch (\AMQPException $e) { throw new TransportException($e->getMessage(), 0, $e); } diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpRoutingKeyStamp.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpStamp.php similarity index 51% rename from src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpRoutingKeyStamp.php rename to src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpStamp.php index b9f2aa2d81..d7a00c09b4 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpRoutingKeyStamp.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpStamp.php @@ -15,20 +15,35 @@ use Symfony\Component\Messenger\Stamp\StampInterface; /** * @author Guillaume Gammelin + * @author Samuel Roze * * @experimental in 4.3 */ -final class AmqpRoutingKeyStamp implements StampInterface +final class AmqpStamp implements StampInterface { private $routingKey; + private $flags; + private $attributes; - public function __construct(string $routingKey) + public function __construct(string $routingKey = null, int $flags = AMQP_NOPARAM, array $attributes = []) { $this->routingKey = $routingKey; + $this->flags = $flags; + $this->attributes = $attributes; } - public function getRoutingKey(): string + public function getRoutingKey(): ?string { return $this->routingKey; } + + public function getFlags(): int + { + return $this->flags; + } + + public function getAttributes(): array + { + return $this->attributes; + } } diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php index 1c7a33430b..ea9ba1f02a 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php @@ -171,10 +171,10 @@ class Connection * * @throws \AMQPException */ - public function publish(string $body, array $headers = [], int $delay = 0, string $routingKey = null): void + public function publish(string $body, array $headers = [], int $delay = 0, AmqpStamp $amqpStamp = null): void { if (0 !== $delay) { - $this->publishWithDelay($body, $headers, $delay, $routingKey); + $this->publishWithDelay($body, $headers, $delay, $amqpStamp); return; } @@ -183,13 +183,14 @@ class Connection $this->setup(); } - $this->exchange()->publish( + $this->publishOnExchange( + $this->exchange(), $body, - $routingKey ?? $this->getDefaultPublishRoutingKey(), - AMQP_NOPARAM, + (null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null) ?? $this->getDefaultPublishRoutingKey(), [ 'headers' => $headers, - ] + ], + $amqpStamp ); } @@ -206,19 +207,30 @@ class Connection /** * @throws \AMQPException */ - private function publishWithDelay(string $body, array $headers, int $delay, ?string $exchangeRoutingKey) + private function publishWithDelay(string $body, array $headers, int $delay, AmqpStamp $amqpStamp = null) { if ($this->shouldSetup()) { - $this->setupDelay($delay, $exchangeRoutingKey); + $this->setupDelay($delay, null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null); } - $this->getDelayExchange()->publish( + $this->publishOnExchange( + $this->getDelayExchange(), $body, $this->getRoutingKeyForDelay($delay), - AMQP_NOPARAM, [ 'headers' => $headers, - ] + ], + $amqpStamp + ); + } + + private function publishOnExchange(\AMQPExchange $exchange, string $body, string $routingKey = null, array $attributes = [], AmqpStamp $amqpStamp = null) + { + $exchange->publish( + $body, + $routingKey, + $amqpStamp ? $amqpStamp->getFlags() : AMQP_NOPARAM, + array_merge($amqpStamp ? $amqpStamp->getAttributes() : [], $attributes) ); }