[Messenger] fix publishing headers set on AmqpStamp
This commit is contained in:
parent
d27c5307f0
commit
50b3ec4dc5
@ -418,6 +418,21 @@ class ConnectionTest extends TestCase
|
||||
$connection->channel();
|
||||
}
|
||||
|
||||
public function testAmqpStampHeadersAreUsed()
|
||||
{
|
||||
$factory = new TestAmqpFactory(
|
||||
$this->createMock(\AMQPConnection::class),
|
||||
$this->createMock(\AMQPChannel::class),
|
||||
$this->createMock(\AMQPQueue::class),
|
||||
$amqpExchange = $this->createMock(\AMQPExchange::class)
|
||||
);
|
||||
|
||||
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y']]);
|
||||
|
||||
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
|
||||
$connection->publish('body', ['Foo' => 'X'], 0, new AmqpStamp(null, AMQP_NOPARAM, ['headers' => ['Bar' => 'Y']]));
|
||||
}
|
||||
|
||||
public function testItCanPublishWithTheDefaultRoutingKey()
|
||||
{
|
||||
$factory = new TestAmqpFactory(
|
||||
|
@ -11,7 +11,7 @@
|
||||
|
||||
namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
||||
|
||||
use Symfony\Component\Messenger\Stamp\StampInterface;
|
||||
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
|
||||
|
||||
/**
|
||||
* @author Guillaume Gammelin <ggammelin@gmail.com>
|
||||
@ -19,7 +19,7 @@ use Symfony\Component\Messenger\Stamp\StampInterface;
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*/
|
||||
final class AmqpStamp implements StampInterface
|
||||
final class AmqpStamp implements NonSendableStampInterface
|
||||
{
|
||||
private $routingKey;
|
||||
private $flags;
|
||||
|
@ -191,9 +191,7 @@ class Connection
|
||||
$this->exchange(),
|
||||
$body,
|
||||
$this->getRoutingKeyForMessage($amqpStamp),
|
||||
[
|
||||
'headers' => $headers,
|
||||
],
|
||||
$headers,
|
||||
$amqpStamp
|
||||
);
|
||||
}
|
||||
@ -223,20 +221,21 @@ class Connection
|
||||
$this->getDelayExchange(),
|
||||
$body,
|
||||
$this->getRoutingKeyForDelay($delay, $routingKey),
|
||||
[
|
||||
'headers' => $headers,
|
||||
],
|
||||
$headers,
|
||||
$amqpStamp
|
||||
);
|
||||
}
|
||||
|
||||
private function publishOnExchange(\AMQPExchange $exchange, string $body, string $routingKey = null, array $attributes = [], AmqpStamp $amqpStamp = null)
|
||||
private function publishOnExchange(\AMQPExchange $exchange, string $body, string $routingKey = null, array $headers = [], AmqpStamp $amqpStamp = null)
|
||||
{
|
||||
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
|
||||
$attributes['headers'] = array_merge($headers, $attributes['headers'] ?? []);
|
||||
|
||||
$exchange->publish(
|
||||
$body,
|
||||
$routingKey,
|
||||
$amqpStamp ? $amqpStamp->getFlags() : AMQP_NOPARAM,
|
||||
array_merge($amqpStamp ? $amqpStamp->getAttributes() : [], $attributes)
|
||||
$attributes
|
||||
);
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user