[Messenger] fix publishing headers set on AmqpStamp

This commit is contained in:
Tobias Schultze 2019-07-07 01:09:30 +02:00
parent d27c5307f0
commit 50b3ec4dc5
3 changed files with 24 additions and 10 deletions

View File

@ -418,6 +418,21 @@ class ConnectionTest extends TestCase
$connection->channel();
}
public function testAmqpStampHeadersAreUsed()
{
$factory = new TestAmqpFactory(
$this->createMock(\AMQPConnection::class),
$this->createMock(\AMQPChannel::class),
$this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y']]);
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('body', ['Foo' => 'X'], 0, new AmqpStamp(null, AMQP_NOPARAM, ['headers' => ['Bar' => 'Y']]));
}
public function testItCanPublishWithTheDefaultRoutingKey()
{
$factory = new TestAmqpFactory(

View File

@ -11,7 +11,7 @@
namespace Symfony\Component\Messenger\Transport\AmqpExt;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
/**
* @author Guillaume Gammelin <ggammelin@gmail.com>
@ -19,7 +19,7 @@ use Symfony\Component\Messenger\Stamp\StampInterface;
*
* @experimental in 4.3
*/
final class AmqpStamp implements StampInterface
final class AmqpStamp implements NonSendableStampInterface
{
private $routingKey;
private $flags;

View File

@ -191,9 +191,7 @@ class Connection
$this->exchange(),
$body,
$this->getRoutingKeyForMessage($amqpStamp),
[
'headers' => $headers,
],
$headers,
$amqpStamp
);
}
@ -223,20 +221,21 @@ class Connection
$this->getDelayExchange(),
$body,
$this->getRoutingKeyForDelay($delay, $routingKey),
[
'headers' => $headers,
],
$headers,
$amqpStamp
);
}
private function publishOnExchange(\AMQPExchange $exchange, string $body, string $routingKey = null, array $attributes = [], AmqpStamp $amqpStamp = null)
private function publishOnExchange(\AMQPExchange $exchange, string $body, string $routingKey = null, array $headers = [], AmqpStamp $amqpStamp = null)
{
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
$attributes['headers'] = array_merge($headers, $attributes['headers'] ?? []);
$exchange->publish(
$body,
$routingKey,
$amqpStamp ? $amqpStamp->getFlags() : AMQP_NOPARAM,
array_merge($amqpStamp ? $amqpStamp->getAttributes() : [], $attributes)
$attributes
);
}