bug #32413 [Messenger] fix publishing headers set on AmqpStamp (Tobion)

This PR was merged into the 4.3 branch.

Discussion
----------

[Messenger] fix publishing headers set on AmqpStamp

| Q             | A
| ------------- | ---
| Branch?       | 4.3
| Bug fix?      | yes
| New feature?  | no <!-- please update src/**/CHANGELOG.md files -->
| BC breaks?    | no     <!-- see https://symfony.com/bc -->
| Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tests pass?   | yes    <!-- please add some, will be required by reviewers -->
| Fixed tickets |
| License       | MIT
| Doc PR        |

Dispatching a message using an AmqpStamp and setting extra headers with it didn't work. We need to merge the `$attribute['headers']`, not the attributes itself as that will ignore the stamp headers because it's not recursive.
I also made the AmqpStamp a NonSendableStampInterface because it's pointless to serialize the stamp because the stamp already set's the attributes for publishing.

Commits
-------

50b3ec4dc5 [Messenger] fix publishing headers set on AmqpStamp
This commit is contained in:
Fabien Potencier 2019-07-08 08:20:10 +02:00
commit bb9251a42a
3 changed files with 24 additions and 10 deletions

View File

@ -418,6 +418,21 @@ class ConnectionTest extends TestCase
$connection->channel();
}
public function testAmqpStampHeadersAreUsed()
{
$factory = new TestAmqpFactory(
$this->createMock(\AMQPConnection::class),
$this->createMock(\AMQPChannel::class),
$this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y']]);
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('body', ['Foo' => 'X'], 0, new AmqpStamp(null, AMQP_NOPARAM, ['headers' => ['Bar' => 'Y']]));
}
public function testItCanPublishWithTheDefaultRoutingKey()
{
$factory = new TestAmqpFactory(

View File

@ -11,7 +11,7 @@
namespace Symfony\Component\Messenger\Transport\AmqpExt;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
/**
* @author Guillaume Gammelin <ggammelin@gmail.com>
@ -19,7 +19,7 @@ use Symfony\Component\Messenger\Stamp\StampInterface;
*
* @experimental in 4.3
*/
final class AmqpStamp implements StampInterface
final class AmqpStamp implements NonSendableStampInterface
{
private $routingKey;
private $flags;

View File

@ -191,9 +191,7 @@ class Connection
$this->exchange(),
$body,
$this->getRoutingKeyForMessage($amqpStamp),
[
'headers' => $headers,
],
$headers,
$amqpStamp
);
}
@ -223,20 +221,21 @@ class Connection
$this->getDelayExchange(),
$body,
$this->getRoutingKeyForDelay($delay, $routingKey),
[
'headers' => $headers,
],
$headers,
$amqpStamp
);
}
private function publishOnExchange(\AMQPExchange $exchange, string $body, string $routingKey = null, array $attributes = [], AmqpStamp $amqpStamp = null)
private function publishOnExchange(\AMQPExchange $exchange, string $body, string $routingKey = null, array $headers = [], AmqpStamp $amqpStamp = null)
{
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
$attributes['headers'] = array_merge($headers, $attributes['headers'] ?? []);
$exchange->publish(
$body,
$routingKey,
$amqpStamp ? $amqpStamp->getFlags() : AMQP_NOPARAM,
array_merge($amqpStamp ? $amqpStamp->getAttributes() : [], $attributes)
$attributes
);
}