[Messenger] fix retry of messages losing the routing key and properties
This commit is contained in:
parent
591ad2203c
commit
75c674debc
@ -34,4 +34,40 @@ class AmqpStampTest extends TestCase
|
||||
$this->assertSame(AMQP_DURABLE, $stamp->getFlags());
|
||||
$this->assertSame(['delivery_mode' => 'unknown'], $stamp->getAttributes());
|
||||
}
|
||||
|
||||
public function testCreateFromAmqpEnvelope()
|
||||
{
|
||||
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
|
||||
$amqpEnvelope->method('getRoutingKey')->willReturn('routingkey');
|
||||
$amqpEnvelope->method('getDeliveryMode')->willReturn(2);
|
||||
$amqpEnvelope->method('getPriority')->willReturn(5);
|
||||
$amqpEnvelope->method('getAppId')->willReturn('appid');
|
||||
|
||||
$stamp = AmqpStamp::createFromAmqpEnvelope($amqpEnvelope);
|
||||
|
||||
$this->assertSame($amqpEnvelope->getRoutingKey(), $stamp->getRoutingKey());
|
||||
$this->assertSame($amqpEnvelope->getDeliveryMode(), $stamp->getAttributes()['delivery_mode']);
|
||||
$this->assertSame($amqpEnvelope->getPriority(), $stamp->getAttributes()['priority']);
|
||||
$this->assertSame($amqpEnvelope->getAppId(), $stamp->getAttributes()['app_id']);
|
||||
$this->assertSame(AMQP_NOPARAM, $stamp->getFlags());
|
||||
}
|
||||
|
||||
public function testCreateFromAmqpEnvelopeWithPreviousStamp()
|
||||
{
|
||||
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
|
||||
$amqpEnvelope->method('getRoutingKey')->willReturn('routingkey');
|
||||
$amqpEnvelope->method('getDeliveryMode')->willReturn(2);
|
||||
$amqpEnvelope->method('getPriority')->willReturn(5);
|
||||
$amqpEnvelope->method('getAppId')->willReturn('appid');
|
||||
|
||||
$previousStamp = new AmqpStamp('otherroutingkey', AMQP_MANDATORY, ['priority' => 8]);
|
||||
|
||||
$stamp = AmqpStamp::createFromAmqpEnvelope($amqpEnvelope, $previousStamp);
|
||||
|
||||
$this->assertSame('otherroutingkey', $stamp->getRoutingKey());
|
||||
$this->assertSame($amqpEnvelope->getDeliveryMode(), $stamp->getAttributes()['delivery_mode']);
|
||||
$this->assertSame(8, $stamp->getAttributes()['priority']);
|
||||
$this->assertSame($amqpEnvelope->getAppId(), $stamp->getAttributes()['app_id']);
|
||||
$this->assertSame(AMQP_MANDATORY, $stamp->getFlags());
|
||||
}
|
||||
}
|
||||
|
@ -47,20 +47,22 @@ class AmqpSender implements SenderInterface
|
||||
$delayStamp = $envelope->last(DelayStamp::class);
|
||||
$delay = $delayStamp ? $delayStamp->getDelay() : 0;
|
||||
|
||||
/** @var AmqpStamp|null $amqpStamp */
|
||||
$amqpStamp = $envelope->last(AmqpStamp::class);
|
||||
if (isset($encodedMessage['headers']['Content-Type'])) {
|
||||
$contentType = $encodedMessage['headers']['Content-Type'];
|
||||
unset($encodedMessage['headers']['Content-Type']);
|
||||
|
||||
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
|
||||
|
||||
if (!isset($attributes['content_type'])) {
|
||||
$attributes['content_type'] = $contentType;
|
||||
|
||||
$amqpStamp = new AmqpStamp($amqpStamp ? $amqpStamp->getRoutingKey() : null, $amqpStamp ? $amqpStamp->getFlags() : AMQP_NOPARAM, $attributes);
|
||||
if (!$amqpStamp || !isset($amqpStamp->getAttributes()['content_type'])) {
|
||||
$amqpStamp = AmqpStamp::createWithAttributes(['content_type' => $contentType], $amqpStamp);
|
||||
}
|
||||
}
|
||||
|
||||
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
|
||||
if ($amqpReceivedStamp instanceof AmqpReceivedStamp) {
|
||||
$amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpReceivedStamp->getAmqpEnvelope(), $amqpStamp);
|
||||
}
|
||||
|
||||
try {
|
||||
$this->connection->publish(
|
||||
$encodedMessage['body'],
|
||||
|
@ -46,4 +46,33 @@ final class AmqpStamp implements NonSendableStampInterface
|
||||
{
|
||||
return $this->attributes;
|
||||
}
|
||||
|
||||
public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null): self
|
||||
{
|
||||
$attr = $previousStamp->attributes ?? [];
|
||||
|
||||
$attr['headers'] = $attr['headers'] ?? $amqpEnvelope->getHeaders();
|
||||
$attr['content_type'] = $attr['content_type'] ?? $amqpEnvelope->getContentType();
|
||||
$attr['content_encoding'] = $attr['content_encoding'] ?? $amqpEnvelope->getContentEncoding();
|
||||
$attr['delivery_mode'] = $attr['delivery_mode'] ?? $amqpEnvelope->getDeliveryMode();
|
||||
$attr['priority'] = $attr['priority'] ?? $amqpEnvelope->getPriority();
|
||||
$attr['timestamp'] = $attr['timestamp'] ?? $amqpEnvelope->getTimestamp();
|
||||
$attr['app_id'] = $attr['app_id'] ?? $amqpEnvelope->getAppId();
|
||||
$attr['message_id'] = $attr['message_id'] ?? $amqpEnvelope->getMessageId();
|
||||
$attr['user_id'] = $attr['user_id'] ?? $amqpEnvelope->getUserId();
|
||||
$attr['expiration'] = $attr['expiration'] ?? $amqpEnvelope->getExpiration();
|
||||
$attr['type'] = $attr['type'] ?? $amqpEnvelope->getType();
|
||||
$attr['reply_to'] = $attr['reply_to'] ?? $amqpEnvelope->getReplyTo();
|
||||
|
||||
return new self($previousStamp->routingKey ?? $amqpEnvelope->getRoutingKey(), $previousStamp->flags ?? AMQP_NOPARAM, $attr);
|
||||
}
|
||||
|
||||
public static function createWithAttributes(array $attributes, self $previousStamp = null): self
|
||||
{
|
||||
return new self(
|
||||
$previousStamp->routingKey ?? null,
|
||||
$previousStamp->flags ?? AMQP_NOPARAM,
|
||||
array_merge($previousStamp->attributes ?? [], $attributes)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -225,7 +225,7 @@ class Connection
|
||||
private function publishOnExchange(\AMQPExchange $exchange, string $body, string $routingKey = null, array $headers = [], AmqpStamp $amqpStamp = null)
|
||||
{
|
||||
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
|
||||
$attributes['headers'] = array_merge($headers, $attributes['headers'] ?? []);
|
||||
$attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers);
|
||||
|
||||
$exchange->publish(
|
||||
$body,
|
||||
|
Reference in New Issue
Block a user