[Messenger] Added X-Ray trace header support to the SQS transport

This commit is contained in:
Pavel Kirpitsov 2021-04-06 12:45:46 +03:00
parent 3ca3de50c3
commit 5fa7ff9541
5 changed files with 69 additions and 2 deletions

View File

@ -6,6 +6,7 @@ CHANGELOG
* Added new `debug` option to log HTTP requests and responses.
* Allowed for receiver & sender injection into AmazonSqsTransport
* Add X-Ray trace header support to the SQS transport
5.2.0
-----

View File

@ -15,6 +15,7 @@ use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsFifoStamp;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsSender;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsXrayTraceHeaderStamp;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@ -53,4 +54,22 @@ class AmazonSqsSenderTest extends TestCase
$sender = new AmazonSqsSender($connection, $serializer);
$sender->send($envelope);
}
public function testSendWithAmazonSqsXrayTraceHeaderStamp()
{
$envelope = (new Envelope(new DummyMessage('Oy')))
->with($stamp = new AmazonSqsXrayTraceHeaderStamp('traceHeader'));
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
$connection = $this->createMock(Connection::class);
$connection->expects($this->once())->method('send')
->with($encoded['body'], $encoded['headers'], 0, null, null, $stamp->getTraceId());
$serializer = $this->createMock(SerializerInterface::class);
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
$sender = new AmazonSqsSender($connection, $serializer);
$sender->send($envelope);
}
}

View File

@ -45,6 +45,7 @@ class AmazonSqsSender implements SenderInterface
$messageGroupId = null;
$messageDeduplicationId = null;
$xrayTraceId = null;
/** @var AmazonSqsFifoStamp|null $amazonSqsFifoStamp */
$amazonSqsFifoStamp = $envelope->last(AmazonSqsFifoStamp::class);
@ -53,13 +54,20 @@ class AmazonSqsSender implements SenderInterface
$messageDeduplicationId = $amazonSqsFifoStamp->getMessageDeduplicationId();
}
/** @var AmazonSqsXrayTraceHeaderStamp|null $amazonSqsXrayTraceHeaderStamp */
$amazonSqsXrayTraceHeaderStamp = $envelope->last(AmazonSqsXrayTraceHeaderStamp::class);
if (null !== $amazonSqsXrayTraceHeaderStamp) {
$xrayTraceId = $amazonSqsXrayTraceHeaderStamp->getTraceId();
}
try {
$this->connection->send(
$encodedMessage['body'],
$encodedMessage['headers'] ?? [],
$delay,
$messageGroupId,
$messageDeduplicationId
$messageDeduplicationId,
$xrayTraceId
);
} catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e);

View File

@ -0,0 +1,29 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
final class AmazonSqsXrayTraceHeaderStamp implements NonSendableStampInterface
{
private $traceId;
public function __construct(string $traceId)
{
$this->traceId = $traceId;
}
public function getTraceId(): string
{
return $this->traceId;
}
}

View File

@ -11,10 +11,12 @@
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
use AsyncAws\Sqs\Enum\MessageSystemAttributeName;
use AsyncAws\Sqs\Enum\QueueAttributeName;
use AsyncAws\Sqs\Result\ReceiveMessageResult;
use AsyncAws\Sqs\SqsClient;
use AsyncAws\Sqs\ValueObject\MessageAttributeValue;
use AsyncAws\Sqs\ValueObject\MessageSystemAttributeValue;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\TransportException;
@ -312,7 +314,7 @@ class Connection
return (int) ($attributes[QueueAttributeName::APPROXIMATE_NUMBER_OF_MESSAGES] ?? 0);
}
public function send(string $body, array $headers, int $delay = 0, ?string $messageGroupId = null, ?string $messageDeduplicationId = null): void
public function send(string $body, array $headers, int $delay = 0, ?string $messageGroupId = null, ?string $messageDeduplicationId = null, ?string $xrayTraceId = null): void
{
if ($this->configuration['auto_setup']) {
$this->setup();
@ -323,6 +325,7 @@ class Connection
'MessageBody' => $body,
'DelaySeconds' => $delay,
'MessageAttributes' => [],
'MessageSystemAttributes' => [],
];
$specialHeaders = [];
@ -346,6 +349,13 @@ class Connection
]);
}
if (null !== $xrayTraceId) {
$parameters['MessageSystemAttributes'][MessageSystemAttributeName::AWSTRACE_HEADER] = new MessageSystemAttributeValue([
'DataType' => 'String',
'StringValue' => $xrayTraceId,
]);
}
if (self::isFifoQueue($this->configuration['queue_name'])) {
$parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__;
$parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1(json_encode(['body' => $body, 'headers' => $headers]));