Uses an AmqpStamp
to provide flags and attributes
This commit is contained in:
parent
09dee1737d
commit
56fa574023
@ -18,9 +18,9 @@ CHANGELOG
|
|||||||
changed: a required 3rd `SerializerInterface` argument was added.
|
changed: a required 3rd `SerializerInterface` argument was added.
|
||||||
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
|
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
|
||||||
explicitly handle messages synchronously.
|
explicitly handle messages synchronously.
|
||||||
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
|
* Added `AmqpStamp` allowing to provide a routing key, flags and attributes on message publishing.
|
||||||
* [BC BREAK] Removed publishing with a `routing_key` option from queue configuration, for
|
* [BC BREAK] Removed publishing with a `routing_key` option from queue configuration, for
|
||||||
AMQP. Use exchange `default_publish_routing_key` or `AmqpRoutingKeyStamp` instead.
|
AMQP. Use exchange `default_publish_routing_key` or `AmqpStamp` instead.
|
||||||
* [BC BREAK] Changed the `queue` option in the AMQP transport DSN to be `queues[name]`. You can
|
* [BC BREAK] Changed the `queue` option in the AMQP transport DSN to be `queues[name]`. You can
|
||||||
therefore name the queue but also configure `binding_keys`, `flags` and `arguments`.
|
therefore name the queue but also configure `binding_keys`, `flags` and `arguments`.
|
||||||
* [BC BREAK] The methods `get`, `ack`, `nack` and `queue` of the AMQP `Connection`
|
* [BC BREAK] The methods `get`, `ack`, `nack` and `queue` of the AMQP `Connection`
|
||||||
|
@ -1,24 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This file is part of the Symfony package.
|
|
||||||
*
|
|
||||||
* (c) Fabien Potencier <fabien@symfony.com>
|
|
||||||
*
|
|
||||||
* For the full copyright and license information, please view the LICENSE
|
|
||||||
* file that was distributed with this source code.
|
|
||||||
*/
|
|
||||||
|
|
||||||
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
|
||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;
|
|
||||||
|
|
||||||
class AmqpRoutingKeyStampTest extends TestCase
|
|
||||||
{
|
|
||||||
public function testStamp()
|
|
||||||
{
|
|
||||||
$stamp = new AmqpRoutingKeyStamp('routing_key');
|
|
||||||
$this->assertSame('routing_key', $stamp->getRoutingKey());
|
|
||||||
}
|
|
||||||
}
|
|
@ -14,8 +14,8 @@ namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
|||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;
|
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
|
||||||
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||||
|
|
||||||
@ -41,14 +41,14 @@ class AmqpSenderTest extends TestCase
|
|||||||
|
|
||||||
public function testItSendsTheEncodedMessageUsingARoutingKey()
|
public function testItSendsTheEncodedMessageUsingARoutingKey()
|
||||||
{
|
{
|
||||||
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new AmqpRoutingKeyStamp('rk'));
|
$envelope = (new Envelope(new DummyMessage('Oy')))->with($stamp = new AmqpStamp('rk'));
|
||||||
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
|
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
|
||||||
|
|
||||||
$serializer = $this->createMock(SerializerInterface::class);
|
$serializer = $this->createMock(SerializerInterface::class);
|
||||||
$serializer->method('encode')->with($envelope)->willReturn($encoded);
|
$serializer->method('encode')->with($envelope)->willReturn($encoded);
|
||||||
|
|
||||||
$connection = $this->createMock(Connection::class);
|
$connection = $this->createMock(Connection::class);
|
||||||
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, 'rk');
|
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $stamp);
|
||||||
|
|
||||||
$sender = new AmqpSender($connection, $serializer);
|
$sender = new AmqpSender($connection, $serializer);
|
||||||
$sender->send($envelope);
|
$sender->send($envelope);
|
||||||
|
@ -0,0 +1,37 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
||||||
|
|
||||||
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @requires extension amqp
|
||||||
|
*/
|
||||||
|
class AmqpStampTest extends TestCase
|
||||||
|
{
|
||||||
|
public function testRoutingKeyOnly()
|
||||||
|
{
|
||||||
|
$stamp = new AmqpStamp('routing_key');
|
||||||
|
$this->assertSame('routing_key', $stamp->getRoutingKey());
|
||||||
|
$this->assertSame(AMQP_NOPARAM, $stamp->getFlags());
|
||||||
|
$this->assertSame([], $stamp->getAttributes());
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testFlagsAndAttributes()
|
||||||
|
{
|
||||||
|
$stamp = new AmqpStamp(null, AMQP_DURABLE, ['delivery_mode' => 'unknown']);
|
||||||
|
$this->assertNull($stamp->getRoutingKey());
|
||||||
|
$this->assertSame(AMQP_DURABLE, $stamp->getFlags());
|
||||||
|
$this->assertSame(['delivery_mode' => 'unknown'], $stamp->getAttributes());
|
||||||
|
}
|
||||||
|
}
|
@ -13,7 +13,9 @@ namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
|||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||||
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpFactory;
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpFactory;
|
||||||
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -430,7 +432,7 @@ class ConnectionTest extends TestCase
|
|||||||
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key');
|
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key');
|
||||||
|
|
||||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=default_routing_key', [], $factory);
|
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=default_routing_key', [], $factory);
|
||||||
$connection->publish('body', [], 0, 'routing_key');
|
$connection->publish('body', [], 0, new AmqpStamp('routing_key'));
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument()
|
public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument()
|
||||||
@ -477,7 +479,27 @@ class ConnectionTest extends TestCase
|
|||||||
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_120000');
|
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_120000');
|
||||||
|
|
||||||
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_120000', AMQP_NOPARAM, ['headers' => []]);
|
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_120000', AMQP_NOPARAM, ['headers' => []]);
|
||||||
$connection->publish('{}', [], 120000, 'routing_key');
|
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testItCanPublishWithCustomFlagsAndAttributes()
|
||||||
|
{
|
||||||
|
$factory = new TestAmqpFactory(
|
||||||
|
$amqpConnection = $this->createMock(\AMQPConnection::class),
|
||||||
|
$amqpChannel = $this->createMock(\AMQPChannel::class),
|
||||||
|
$amqpQueue = $this->createMock(\AMQPQueue::class),
|
||||||
|
$amqpExchange = $this->createMock(\AMQPExchange::class)
|
||||||
|
);
|
||||||
|
|
||||||
|
$amqpExchange->expects($this->once())->method('publish')->with(
|
||||||
|
'body',
|
||||||
|
'routing_key',
|
||||||
|
AMQP_IMMEDIATE,
|
||||||
|
['delivery_mode' => 2, 'headers' => ['type' => DummyMessage::class]]
|
||||||
|
);
|
||||||
|
|
||||||
|
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
|
||||||
|
$connection->publish('body', ['type' => DummyMessage::class], 0, new AmqpStamp('routing_key', AMQP_IMMEDIATE, ['delivery_mode' => 2]));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -51,11 +51,12 @@ class AmqpSender implements SenderInterface
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
/** @var $routingKeyStamp AmqpRoutingKeyStamp */
|
$this->connection->publish(
|
||||||
$routingKeyStamp = $envelope->last(AmqpRoutingKeyStamp::class);
|
$encodedMessage['body'],
|
||||||
$routingKey = $routingKeyStamp ? $routingKeyStamp->getRoutingKey() : null;
|
$encodedMessage['headers'] ?? [],
|
||||||
|
$delay,
|
||||||
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay, $routingKey);
|
$envelope->last(AmqpStamp::class)
|
||||||
|
);
|
||||||
} catch (\AMQPException $e) {
|
} catch (\AMQPException $e) {
|
||||||
throw new TransportException($e->getMessage(), 0, $e);
|
throw new TransportException($e->getMessage(), 0, $e);
|
||||||
}
|
}
|
||||||
|
@ -15,20 +15,35 @@ use Symfony\Component\Messenger\Stamp\StampInterface;
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Guillaume Gammelin <ggammelin@gmail.com>
|
* @author Guillaume Gammelin <ggammelin@gmail.com>
|
||||||
|
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||||
*
|
*
|
||||||
* @experimental in 4.3
|
* @experimental in 4.3
|
||||||
*/
|
*/
|
||||||
final class AmqpRoutingKeyStamp implements StampInterface
|
final class AmqpStamp implements StampInterface
|
||||||
{
|
{
|
||||||
private $routingKey;
|
private $routingKey;
|
||||||
|
private $flags;
|
||||||
|
private $attributes;
|
||||||
|
|
||||||
public function __construct(string $routingKey)
|
public function __construct(string $routingKey = null, int $flags = AMQP_NOPARAM, array $attributes = [])
|
||||||
{
|
{
|
||||||
$this->routingKey = $routingKey;
|
$this->routingKey = $routingKey;
|
||||||
|
$this->flags = $flags;
|
||||||
|
$this->attributes = $attributes;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function getRoutingKey(): string
|
public function getRoutingKey(): ?string
|
||||||
{
|
{
|
||||||
return $this->routingKey;
|
return $this->routingKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function getFlags(): int
|
||||||
|
{
|
||||||
|
return $this->flags;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getAttributes(): array
|
||||||
|
{
|
||||||
|
return $this->attributes;
|
||||||
|
}
|
||||||
}
|
}
|
@ -171,10 +171,10 @@ class Connection
|
|||||||
*
|
*
|
||||||
* @throws \AMQPException
|
* @throws \AMQPException
|
||||||
*/
|
*/
|
||||||
public function publish(string $body, array $headers = [], int $delay = 0, string $routingKey = null): void
|
public function publish(string $body, array $headers = [], int $delay = 0, AmqpStamp $amqpStamp = null): void
|
||||||
{
|
{
|
||||||
if (0 !== $delay) {
|
if (0 !== $delay) {
|
||||||
$this->publishWithDelay($body, $headers, $delay, $routingKey);
|
$this->publishWithDelay($body, $headers, $delay, $amqpStamp);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -183,13 +183,14 @@ class Connection
|
|||||||
$this->setup();
|
$this->setup();
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->exchange()->publish(
|
$this->publishOnExchange(
|
||||||
|
$this->exchange(),
|
||||||
$body,
|
$body,
|
||||||
$routingKey ?? $this->getDefaultPublishRoutingKey(),
|
(null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null) ?? $this->getDefaultPublishRoutingKey(),
|
||||||
AMQP_NOPARAM,
|
|
||||||
[
|
[
|
||||||
'headers' => $headers,
|
'headers' => $headers,
|
||||||
]
|
],
|
||||||
|
$amqpStamp
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -206,19 +207,30 @@ class Connection
|
|||||||
/**
|
/**
|
||||||
* @throws \AMQPException
|
* @throws \AMQPException
|
||||||
*/
|
*/
|
||||||
private function publishWithDelay(string $body, array $headers, int $delay, ?string $exchangeRoutingKey)
|
private function publishWithDelay(string $body, array $headers, int $delay, AmqpStamp $amqpStamp = null)
|
||||||
{
|
{
|
||||||
if ($this->shouldSetup()) {
|
if ($this->shouldSetup()) {
|
||||||
$this->setupDelay($delay, $exchangeRoutingKey);
|
$this->setupDelay($delay, null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null);
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->getDelayExchange()->publish(
|
$this->publishOnExchange(
|
||||||
|
$this->getDelayExchange(),
|
||||||
$body,
|
$body,
|
||||||
$this->getRoutingKeyForDelay($delay),
|
$this->getRoutingKeyForDelay($delay),
|
||||||
AMQP_NOPARAM,
|
|
||||||
[
|
[
|
||||||
'headers' => $headers,
|
'headers' => $headers,
|
||||||
]
|
],
|
||||||
|
$amqpStamp
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private function publishOnExchange(\AMQPExchange $exchange, string $body, string $routingKey = null, array $attributes = [], AmqpStamp $amqpStamp = null)
|
||||||
|
{
|
||||||
|
$exchange->publish(
|
||||||
|
$body,
|
||||||
|
$routingKey,
|
||||||
|
$amqpStamp ? $amqpStamp->getFlags() : AMQP_NOPARAM,
|
||||||
|
array_merge($amqpStamp ? $amqpStamp->getAttributes() : [], $attributes)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user