[Messenger] fix retry of messages losing the routing key and properties

This commit is contained in:
Tobias Schultze 2019-10-26 20:08:06 +02:00
parent 591ad2203c
commit 75c674debc
4 changed files with 74 additions and 7 deletions

View File

@ -34,4 +34,40 @@ class AmqpStampTest extends TestCase
$this->assertSame(AMQP_DURABLE, $stamp->getFlags());
$this->assertSame(['delivery_mode' => 'unknown'], $stamp->getAttributes());
}
public function testCreateFromAmqpEnvelope()
{
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
$amqpEnvelope->method('getRoutingKey')->willReturn('routingkey');
$amqpEnvelope->method('getDeliveryMode')->willReturn(2);
$amqpEnvelope->method('getPriority')->willReturn(5);
$amqpEnvelope->method('getAppId')->willReturn('appid');
$stamp = AmqpStamp::createFromAmqpEnvelope($amqpEnvelope);
$this->assertSame($amqpEnvelope->getRoutingKey(), $stamp->getRoutingKey());
$this->assertSame($amqpEnvelope->getDeliveryMode(), $stamp->getAttributes()['delivery_mode']);
$this->assertSame($amqpEnvelope->getPriority(), $stamp->getAttributes()['priority']);
$this->assertSame($amqpEnvelope->getAppId(), $stamp->getAttributes()['app_id']);
$this->assertSame(AMQP_NOPARAM, $stamp->getFlags());
}
public function testCreateFromAmqpEnvelopeWithPreviousStamp()
{
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
$amqpEnvelope->method('getRoutingKey')->willReturn('routingkey');
$amqpEnvelope->method('getDeliveryMode')->willReturn(2);
$amqpEnvelope->method('getPriority')->willReturn(5);
$amqpEnvelope->method('getAppId')->willReturn('appid');
$previousStamp = new AmqpStamp('otherroutingkey', AMQP_MANDATORY, ['priority' => 8]);
$stamp = AmqpStamp::createFromAmqpEnvelope($amqpEnvelope, $previousStamp);
$this->assertSame('otherroutingkey', $stamp->getRoutingKey());
$this->assertSame($amqpEnvelope->getDeliveryMode(), $stamp->getAttributes()['delivery_mode']);
$this->assertSame(8, $stamp->getAttributes()['priority']);
$this->assertSame($amqpEnvelope->getAppId(), $stamp->getAttributes()['app_id']);
$this->assertSame(AMQP_MANDATORY, $stamp->getFlags());
}
}

View File

@ -47,20 +47,22 @@ class AmqpSender implements SenderInterface
$delayStamp = $envelope->last(DelayStamp::class);
$delay = $delayStamp ? $delayStamp->getDelay() : 0;
/** @var AmqpStamp|null $amqpStamp */
$amqpStamp = $envelope->last(AmqpStamp::class);
if (isset($encodedMessage['headers']['Content-Type'])) {
$contentType = $encodedMessage['headers']['Content-Type'];
unset($encodedMessage['headers']['Content-Type']);
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
if (!isset($attributes['content_type'])) {
$attributes['content_type'] = $contentType;
$amqpStamp = new AmqpStamp($amqpStamp ? $amqpStamp->getRoutingKey() : null, $amqpStamp ? $amqpStamp->getFlags() : AMQP_NOPARAM, $attributes);
if (!$amqpStamp || !isset($amqpStamp->getAttributes()['content_type'])) {
$amqpStamp = AmqpStamp::createWithAttributes(['content_type' => $contentType], $amqpStamp);
}
}
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
if ($amqpReceivedStamp instanceof AmqpReceivedStamp) {
$amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpReceivedStamp->getAmqpEnvelope(), $amqpStamp);
}
try {
$this->connection->publish(
$encodedMessage['body'],

View File

@ -46,4 +46,33 @@ final class AmqpStamp implements NonSendableStampInterface
{
return $this->attributes;
}
public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null): self
{
$attr = $previousStamp->attributes ?? [];
$attr['headers'] = $attr['headers'] ?? $amqpEnvelope->getHeaders();
$attr['content_type'] = $attr['content_type'] ?? $amqpEnvelope->getContentType();
$attr['content_encoding'] = $attr['content_encoding'] ?? $amqpEnvelope->getContentEncoding();
$attr['delivery_mode'] = $attr['delivery_mode'] ?? $amqpEnvelope->getDeliveryMode();
$attr['priority'] = $attr['priority'] ?? $amqpEnvelope->getPriority();
$attr['timestamp'] = $attr['timestamp'] ?? $amqpEnvelope->getTimestamp();
$attr['app_id'] = $attr['app_id'] ?? $amqpEnvelope->getAppId();
$attr['message_id'] = $attr['message_id'] ?? $amqpEnvelope->getMessageId();
$attr['user_id'] = $attr['user_id'] ?? $amqpEnvelope->getUserId();
$attr['expiration'] = $attr['expiration'] ?? $amqpEnvelope->getExpiration();
$attr['type'] = $attr['type'] ?? $amqpEnvelope->getType();
$attr['reply_to'] = $attr['reply_to'] ?? $amqpEnvelope->getReplyTo();
return new self($previousStamp->routingKey ?? $amqpEnvelope->getRoutingKey(), $previousStamp->flags ?? AMQP_NOPARAM, $attr);
}
public static function createWithAttributes(array $attributes, self $previousStamp = null): self
{
return new self(
$previousStamp->routingKey ?? null,
$previousStamp->flags ?? AMQP_NOPARAM,
array_merge($previousStamp->attributes ?? [], $attributes)
);
}
}

View File

@ -225,7 +225,7 @@ class Connection
private function publishOnExchange(\AMQPExchange $exchange, string $body, string $routingKey = null, array $headers = [], AmqpStamp $amqpStamp = null)
{
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
$attributes['headers'] = array_merge($headers, $attributes['headers'] ?? []);
$attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers);
$exchange->publish(
$body,