[Messenger][AMQP] Use delivery_mode=2 by default

This commit is contained in:
Grégoire Pineau 2019-12-12 16:55:12 +01:00
parent 96f04d9b32
commit be2eb6fcc7
2 changed files with 22 additions and 6 deletions

View File

@ -227,7 +227,7 @@ class ConnectionTest extends TestCase
); );
$amqpExchange->expects($this->once())->method('declareExchange'); $amqpExchange->expects($this->once())->method('declareExchange');
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => []]); $amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
$amqpQueue->expects($this->once())->method('declareQueue'); $amqpQueue->expects($this->once())->method('declareQueue');
$amqpQueue->expects($this->once())->method('bind')->with(self::DEFAULT_EXCHANGE_NAME, null); $amqpQueue->expects($this->once())->method('bind')->with(self::DEFAULT_EXCHANGE_NAME, null);
@ -250,7 +250,7 @@ class ConnectionTest extends TestCase
$factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1)); $factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1));
$amqpExchange->expects($this->once())->method('declareExchange'); $amqpExchange->expects($this->once())->method('declareExchange');
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => []]); $amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
$amqpQueue0->expects($this->once())->method('declareQueue'); $amqpQueue0->expects($this->once())->method('declareQueue');
$amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive( $amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive(
[self::DEFAULT_EXCHANGE_NAME, 'binding_key0'], [self::DEFAULT_EXCHANGE_NAME, 'binding_key0'],
@ -373,7 +373,7 @@ class ConnectionTest extends TestCase
$delayQueue->expects($this->once())->method('declareQueue'); $delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__5000'); $delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__5000');
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]); $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo'], 'delivery_mode' => 2]);
$connection = Connection::fromDsn('amqp://localhost', [], $factory); $connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000); $connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
@ -415,7 +415,7 @@ class ConnectionTest extends TestCase
$delayQueue->expects($this->once())->method('declareQueue'); $delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000'); $delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000');
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => []]); $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
$connection->publish('{}', [], 120000); $connection->publish('{}', [], 120000);
} }
@ -447,12 +447,27 @@ class ConnectionTest extends TestCase
$amqpExchange = $this->createMock(\AMQPExchange::class) $amqpExchange = $this->createMock(\AMQPExchange::class)
); );
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y']]); $amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y'], 'delivery_mode' => 2]);
$connection = Connection::fromDsn('amqp://localhost', [], $factory); $connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('body', ['Foo' => 'X'], 0, new AmqpStamp(null, AMQP_NOPARAM, ['headers' => ['Bar' => 'Y']])); $connection->publish('body', ['Foo' => 'X'], 0, new AmqpStamp(null, AMQP_NOPARAM, ['headers' => ['Bar' => 'Y']]));
} }
public function testAmqpStampDelireryModeIsUsed()
{
$factory = new TestAmqpFactory(
$this->createMock(\AMQPConnection::class),
$this->createMock(\AMQPChannel::class),
$this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 1]);
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('body', [], 0, new AmqpStamp(null, AMQP_NOPARAM, ['delivery_mode' => 1]));
}
public function testItCanPublishWithTheDefaultRoutingKey() public function testItCanPublishWithTheDefaultRoutingKey()
{ {
$factory = new TestAmqpFactory( $factory = new TestAmqpFactory(
@ -519,7 +534,7 @@ class ConnectionTest extends TestCase
$delayQueue->expects($this->once())->method('declareQueue'); $delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000'); $delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000');
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => []]); $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key')); $connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
} }

View File

@ -231,6 +231,7 @@ class Connection
{ {
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : []; $attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
$attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers); $attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers);
$attributes['delivery_mode'] = $attributes['delivery_mode'] ?? 2;
$exchange->publish( $exchange->publish(
$body, $body,