[Messenger][AMQP] Use delivery_mode=2 by default
This commit is contained in:
parent
96f04d9b32
commit
be2eb6fcc7
@ -227,7 +227,7 @@ class ConnectionTest extends TestCase
|
|||||||
);
|
);
|
||||||
|
|
||||||
$amqpExchange->expects($this->once())->method('declareExchange');
|
$amqpExchange->expects($this->once())->method('declareExchange');
|
||||||
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => []]);
|
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
|
||||||
$amqpQueue->expects($this->once())->method('declareQueue');
|
$amqpQueue->expects($this->once())->method('declareQueue');
|
||||||
$amqpQueue->expects($this->once())->method('bind')->with(self::DEFAULT_EXCHANGE_NAME, null);
|
$amqpQueue->expects($this->once())->method('bind')->with(self::DEFAULT_EXCHANGE_NAME, null);
|
||||||
|
|
||||||
@ -250,7 +250,7 @@ class ConnectionTest extends TestCase
|
|||||||
$factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1));
|
$factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1));
|
||||||
|
|
||||||
$amqpExchange->expects($this->once())->method('declareExchange');
|
$amqpExchange->expects($this->once())->method('declareExchange');
|
||||||
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => []]);
|
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
|
||||||
$amqpQueue0->expects($this->once())->method('declareQueue');
|
$amqpQueue0->expects($this->once())->method('declareQueue');
|
||||||
$amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive(
|
$amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive(
|
||||||
[self::DEFAULT_EXCHANGE_NAME, 'binding_key0'],
|
[self::DEFAULT_EXCHANGE_NAME, 'binding_key0'],
|
||||||
@ -373,7 +373,7 @@ class ConnectionTest extends TestCase
|
|||||||
$delayQueue->expects($this->once())->method('declareQueue');
|
$delayQueue->expects($this->once())->method('declareQueue');
|
||||||
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__5000');
|
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__5000');
|
||||||
|
|
||||||
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]);
|
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo'], 'delivery_mode' => 2]);
|
||||||
|
|
||||||
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
|
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
|
||||||
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
|
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
|
||||||
@ -415,7 +415,7 @@ class ConnectionTest extends TestCase
|
|||||||
$delayQueue->expects($this->once())->method('declareQueue');
|
$delayQueue->expects($this->once())->method('declareQueue');
|
||||||
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000');
|
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000');
|
||||||
|
|
||||||
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => []]);
|
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
|
||||||
$connection->publish('{}', [], 120000);
|
$connection->publish('{}', [], 120000);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -447,12 +447,27 @@ class ConnectionTest extends TestCase
|
|||||||
$amqpExchange = $this->createMock(\AMQPExchange::class)
|
$amqpExchange = $this->createMock(\AMQPExchange::class)
|
||||||
);
|
);
|
||||||
|
|
||||||
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y']]);
|
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y'], 'delivery_mode' => 2]);
|
||||||
|
|
||||||
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
|
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
|
||||||
$connection->publish('body', ['Foo' => 'X'], 0, new AmqpStamp(null, AMQP_NOPARAM, ['headers' => ['Bar' => 'Y']]));
|
$connection->publish('body', ['Foo' => 'X'], 0, new AmqpStamp(null, AMQP_NOPARAM, ['headers' => ['Bar' => 'Y']]));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testAmqpStampDelireryModeIsUsed()
|
||||||
|
{
|
||||||
|
$factory = new TestAmqpFactory(
|
||||||
|
$this->createMock(\AMQPConnection::class),
|
||||||
|
$this->createMock(\AMQPChannel::class),
|
||||||
|
$this->createMock(\AMQPQueue::class),
|
||||||
|
$amqpExchange = $this->createMock(\AMQPExchange::class)
|
||||||
|
);
|
||||||
|
|
||||||
|
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 1]);
|
||||||
|
|
||||||
|
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
|
||||||
|
$connection->publish('body', [], 0, new AmqpStamp(null, AMQP_NOPARAM, ['delivery_mode' => 1]));
|
||||||
|
}
|
||||||
|
|
||||||
public function testItCanPublishWithTheDefaultRoutingKey()
|
public function testItCanPublishWithTheDefaultRoutingKey()
|
||||||
{
|
{
|
||||||
$factory = new TestAmqpFactory(
|
$factory = new TestAmqpFactory(
|
||||||
@ -519,7 +534,7 @@ class ConnectionTest extends TestCase
|
|||||||
$delayQueue->expects($this->once())->method('declareQueue');
|
$delayQueue->expects($this->once())->method('declareQueue');
|
||||||
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000');
|
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000');
|
||||||
|
|
||||||
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => []]);
|
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
|
||||||
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
|
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -231,6 +231,7 @@ class Connection
|
|||||||
{
|
{
|
||||||
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
|
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
|
||||||
$attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers);
|
$attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers);
|
||||||
|
$attributes['delivery_mode'] = $attributes['delivery_mode'] ?? 2;
|
||||||
|
|
||||||
$exchange->publish(
|
$exchange->publish(
|
||||||
$body,
|
$body,
|
||||||
|
Reference in New Issue
Block a user