[Messenger] Add message timestamp to amqp connection
This commit is contained in:
parent
e1cfbd20fe
commit
281540e005
@ -21,6 +21,8 @@ use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||
|
||||
/**
|
||||
* @requires extension amqp
|
||||
*
|
||||
* @group time-sensitive
|
||||
*/
|
||||
class ConnectionTest extends TestCase
|
||||
{
|
||||
@ -266,7 +268,7 @@ class ConnectionTest extends TestCase
|
||||
);
|
||||
|
||||
$amqpExchange->expects($this->once())->method('declareExchange');
|
||||
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
|
||||
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
|
||||
$amqpQueue->expects($this->once())->method('declareQueue');
|
||||
$amqpQueue->expects($this->once())->method('bind')->with(self::DEFAULT_EXCHANGE_NAME, null);
|
||||
|
||||
@ -289,7 +291,7 @@ class ConnectionTest extends TestCase
|
||||
$factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1));
|
||||
|
||||
$amqpExchange->expects($this->once())->method('declareExchange');
|
||||
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
|
||||
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
|
||||
$amqpQueue0->expects($this->once())->method('declareQueue');
|
||||
$amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive(
|
||||
[self::DEFAULT_EXCHANGE_NAME, 'binding_key0'],
|
||||
@ -326,7 +328,7 @@ class ConnectionTest extends TestCase
|
||||
$factory->method('createQueue')->willReturn($amqpQueue);
|
||||
|
||||
$amqpExchange->expects($this->once())->method('declareExchange');
|
||||
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
|
||||
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
|
||||
$amqpQueue->expects($this->once())->method('declareQueue');
|
||||
$amqpQueue->expects($this->exactly(1))->method('bind')->withConsecutive(
|
||||
[self::DEFAULT_EXCHANGE_NAME, null, ['x-match' => 'all']]
|
||||
@ -439,7 +441,7 @@ class ConnectionTest extends TestCase
|
||||
$delayQueue->expects($this->once())->method('declareQueue');
|
||||
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__5000');
|
||||
|
||||
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo'], 'delivery_mode' => 2]);
|
||||
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo'], 'delivery_mode' => 2, 'timestamp' => time()]);
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
|
||||
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
|
||||
@ -481,7 +483,7 @@ class ConnectionTest extends TestCase
|
||||
$delayQueue->expects($this->once())->method('declareQueue');
|
||||
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000');
|
||||
|
||||
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
|
||||
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
|
||||
$connection->publish('{}', [], 120000);
|
||||
}
|
||||
|
||||
@ -513,7 +515,7 @@ class ConnectionTest extends TestCase
|
||||
$amqpExchange = $this->createMock(\AMQPExchange::class)
|
||||
);
|
||||
|
||||
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y'], 'delivery_mode' => 2]);
|
||||
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y'], 'delivery_mode' => 2, 'timestamp' => time()]);
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
|
||||
$connection->publish('body', ['Foo' => 'X'], 0, new AmqpStamp(null, AMQP_NOPARAM, ['headers' => ['Bar' => 'Y']]));
|
||||
@ -528,7 +530,7 @@ class ConnectionTest extends TestCase
|
||||
$amqpExchange = $this->createMock(\AMQPExchange::class)
|
||||
);
|
||||
|
||||
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 1]);
|
||||
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 1, 'timestamp' => time()]);
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
|
||||
$connection->publish('body', [], 0, new AmqpStamp(null, AMQP_NOPARAM, ['delivery_mode' => 1]));
|
||||
@ -600,7 +602,7 @@ class ConnectionTest extends TestCase
|
||||
$delayQueue->expects($this->once())->method('declareQueue');
|
||||
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000');
|
||||
|
||||
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
|
||||
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
|
||||
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
|
||||
}
|
||||
|
||||
@ -617,7 +619,7 @@ class ConnectionTest extends TestCase
|
||||
'body',
|
||||
'routing_key',
|
||||
AMQP_IMMEDIATE,
|
||||
['delivery_mode' => 2, 'headers' => ['type' => DummyMessage::class]]
|
||||
['delivery_mode' => 2, 'headers' => ['type' => DummyMessage::class], 'timestamp' => time()]
|
||||
);
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
|
||||
|
@ -317,6 +317,7 @@ class Connection
|
||||
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
|
||||
$attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers);
|
||||
$attributes['delivery_mode'] = $attributes['delivery_mode'] ?? 2;
|
||||
$attributes['timestamp'] = $attributes['timestamp'] ?? time();
|
||||
|
||||
$exchange->publish(
|
||||
$body,
|
||||
|
Reference in New Issue
Block a user