bug #34134 [Messenger] fix retry of messages losing the routing key and properties (Tobion)

This PR was merged into the 4.3 branch.

Discussion
----------

[Messenger] fix retry of messages losing the routing key and properties

| Q             | A
| ------------- | ---
| Branch?       | 4.3
| Bug fix?      | yes
| New feature?  | no <!-- please update src/**/CHANGELOG.md files -->
| Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tickets       | Fix #32994 <!-- prefix each issue number with "Fix #", if any -->
| License       | MIT
| Doc PR        |

Messages sent for retry in rabbitmq lost the routing key and properties like the priority. Now we read those original properties and sent the retry message with the same properties (unless those properties have already been set manually before).

Commits
-------

75c674debc [Messenger] fix retry of messages losing the routing key and properties
This commit is contained in:
Fabien Potencier 2019-11-04 07:59:16 +01:00
commit e057a9c89f
4 changed files with 74 additions and 7 deletions

View File

@ -34,4 +34,40 @@ class AmqpStampTest extends TestCase
$this->assertSame(AMQP_DURABLE, $stamp->getFlags());
$this->assertSame(['delivery_mode' => 'unknown'], $stamp->getAttributes());
}
public function testCreateFromAmqpEnvelope()
{
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
$amqpEnvelope->method('getRoutingKey')->willReturn('routingkey');
$amqpEnvelope->method('getDeliveryMode')->willReturn(2);
$amqpEnvelope->method('getPriority')->willReturn(5);
$amqpEnvelope->method('getAppId')->willReturn('appid');
$stamp = AmqpStamp::createFromAmqpEnvelope($amqpEnvelope);
$this->assertSame($amqpEnvelope->getRoutingKey(), $stamp->getRoutingKey());
$this->assertSame($amqpEnvelope->getDeliveryMode(), $stamp->getAttributes()['delivery_mode']);
$this->assertSame($amqpEnvelope->getPriority(), $stamp->getAttributes()['priority']);
$this->assertSame($amqpEnvelope->getAppId(), $stamp->getAttributes()['app_id']);
$this->assertSame(AMQP_NOPARAM, $stamp->getFlags());
}
public function testCreateFromAmqpEnvelopeWithPreviousStamp()
{
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
$amqpEnvelope->method('getRoutingKey')->willReturn('routingkey');
$amqpEnvelope->method('getDeliveryMode')->willReturn(2);
$amqpEnvelope->method('getPriority')->willReturn(5);
$amqpEnvelope->method('getAppId')->willReturn('appid');
$previousStamp = new AmqpStamp('otherroutingkey', AMQP_MANDATORY, ['priority' => 8]);
$stamp = AmqpStamp::createFromAmqpEnvelope($amqpEnvelope, $previousStamp);
$this->assertSame('otherroutingkey', $stamp->getRoutingKey());
$this->assertSame($amqpEnvelope->getDeliveryMode(), $stamp->getAttributes()['delivery_mode']);
$this->assertSame(8, $stamp->getAttributes()['priority']);
$this->assertSame($amqpEnvelope->getAppId(), $stamp->getAttributes()['app_id']);
$this->assertSame(AMQP_MANDATORY, $stamp->getFlags());
}
}

View File

@ -47,20 +47,22 @@ class AmqpSender implements SenderInterface
$delayStamp = $envelope->last(DelayStamp::class);
$delay = $delayStamp ? $delayStamp->getDelay() : 0;
/** @var AmqpStamp|null $amqpStamp */
$amqpStamp = $envelope->last(AmqpStamp::class);
if (isset($encodedMessage['headers']['Content-Type'])) {
$contentType = $encodedMessage['headers']['Content-Type'];
unset($encodedMessage['headers']['Content-Type']);
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
if (!isset($attributes['content_type'])) {
$attributes['content_type'] = $contentType;
$amqpStamp = new AmqpStamp($amqpStamp ? $amqpStamp->getRoutingKey() : null, $amqpStamp ? $amqpStamp->getFlags() : AMQP_NOPARAM, $attributes);
if (!$amqpStamp || !isset($amqpStamp->getAttributes()['content_type'])) {
$amqpStamp = AmqpStamp::createWithAttributes(['content_type' => $contentType], $amqpStamp);
}
}
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
if ($amqpReceivedStamp instanceof AmqpReceivedStamp) {
$amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpReceivedStamp->getAmqpEnvelope(), $amqpStamp);
}
try {
$this->connection->publish(
$encodedMessage['body'],

View File

@ -46,4 +46,33 @@ final class AmqpStamp implements NonSendableStampInterface
{
return $this->attributes;
}
public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null): self
{
$attr = $previousStamp->attributes ?? [];
$attr['headers'] = $attr['headers'] ?? $amqpEnvelope->getHeaders();
$attr['content_type'] = $attr['content_type'] ?? $amqpEnvelope->getContentType();
$attr['content_encoding'] = $attr['content_encoding'] ?? $amqpEnvelope->getContentEncoding();
$attr['delivery_mode'] = $attr['delivery_mode'] ?? $amqpEnvelope->getDeliveryMode();
$attr['priority'] = $attr['priority'] ?? $amqpEnvelope->getPriority();
$attr['timestamp'] = $attr['timestamp'] ?? $amqpEnvelope->getTimestamp();
$attr['app_id'] = $attr['app_id'] ?? $amqpEnvelope->getAppId();
$attr['message_id'] = $attr['message_id'] ?? $amqpEnvelope->getMessageId();
$attr['user_id'] = $attr['user_id'] ?? $amqpEnvelope->getUserId();
$attr['expiration'] = $attr['expiration'] ?? $amqpEnvelope->getExpiration();
$attr['type'] = $attr['type'] ?? $amqpEnvelope->getType();
$attr['reply_to'] = $attr['reply_to'] ?? $amqpEnvelope->getReplyTo();
return new self($previousStamp->routingKey ?? $amqpEnvelope->getRoutingKey(), $previousStamp->flags ?? AMQP_NOPARAM, $attr);
}
public static function createWithAttributes(array $attributes, self $previousStamp = null): self
{
return new self(
$previousStamp->routingKey ?? null,
$previousStamp->flags ?? AMQP_NOPARAM,
array_merge($previousStamp->attributes ?? [], $attributes)
);
}
}

View File

@ -225,7 +225,7 @@ class Connection
private function publishOnExchange(\AMQPExchange $exchange, string $body, string $routingKey = null, array $headers = [], AmqpStamp $amqpStamp = null)
{
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
$attributes['headers'] = array_merge($headers, $attributes['headers'] ?? []);
$attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers);
$exchange->publish(
$body,