From 5fa7ff95413ef18df5ed484a4aa961dbee086cae Mon Sep 17 00:00:00 2001 From: Pavel Kirpitsov Date: Tue, 6 Apr 2021 12:45:46 +0300 Subject: [PATCH] [Messenger] Added X-Ray trace header support to the SQS transport --- .../Messenger/Bridge/AmazonSqs/CHANGELOG.md | 1 + .../Tests/Transport/AmazonSqsSenderTest.php | 19 ++++++++++++ .../AmazonSqs/Transport/AmazonSqsSender.php | 10 ++++++- .../AmazonSqsXrayTraceHeaderStamp.php | 29 +++++++++++++++++++ .../Bridge/AmazonSqs/Transport/Connection.php | 12 +++++++- 5 files changed, 69 insertions(+), 2 deletions(-) create mode 100644 src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsXrayTraceHeaderStamp.php diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md index fbe4fce5d0..506449162c 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md @@ -6,6 +6,7 @@ CHANGELOG * Added new `debug` option to log HTTP requests and responses. * Allowed for receiver & sender injection into AmazonSqsTransport + * Add X-Ray trace header support to the SQS transport 5.2.0 ----- diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsSenderTest.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsSenderTest.php index 10ebae60f6..a3269841e4 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsSenderTest.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsSenderTest.php @@ -15,6 +15,7 @@ use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsFifoStamp; use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsSender; +use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsXrayTraceHeaderStamp; use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -53,4 +54,22 @@ class AmazonSqsSenderTest extends TestCase $sender = new AmazonSqsSender($connection, $serializer); $sender->send($envelope); } + + public function testSendWithAmazonSqsXrayTraceHeaderStamp() + { + $envelope = (new Envelope(new DummyMessage('Oy'))) + ->with($stamp = new AmazonSqsXrayTraceHeaderStamp('traceHeader')); + + $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]]; + + $connection = $this->createMock(Connection::class); + $connection->expects($this->once())->method('send') + ->with($encoded['body'], $encoded['headers'], 0, null, null, $stamp->getTraceId()); + + $serializer = $this->createMock(SerializerInterface::class); + $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); + + $sender = new AmazonSqsSender($connection, $serializer); + $sender->send($envelope); + } } diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php index 5bb584dd07..1994313720 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php @@ -45,6 +45,7 @@ class AmazonSqsSender implements SenderInterface $messageGroupId = null; $messageDeduplicationId = null; + $xrayTraceId = null; /** @var AmazonSqsFifoStamp|null $amazonSqsFifoStamp */ $amazonSqsFifoStamp = $envelope->last(AmazonSqsFifoStamp::class); @@ -53,13 +54,20 @@ class AmazonSqsSender implements SenderInterface $messageDeduplicationId = $amazonSqsFifoStamp->getMessageDeduplicationId(); } + /** @var AmazonSqsXrayTraceHeaderStamp|null $amazonSqsXrayTraceHeaderStamp */ + $amazonSqsXrayTraceHeaderStamp = $envelope->last(AmazonSqsXrayTraceHeaderStamp::class); + if (null !== $amazonSqsXrayTraceHeaderStamp) { + $xrayTraceId = $amazonSqsXrayTraceHeaderStamp->getTraceId(); + } + try { $this->connection->send( $encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay, $messageGroupId, - $messageDeduplicationId + $messageDeduplicationId, + $xrayTraceId ); } catch (HttpException $e) { throw new TransportException($e->getMessage(), 0, $e); diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsXrayTraceHeaderStamp.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsXrayTraceHeaderStamp.php new file mode 100644 index 0000000000..fb5e33ca67 --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsXrayTraceHeaderStamp.php @@ -0,0 +1,29 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport; + +use Symfony\Component\Messenger\Stamp\NonSendableStampInterface; + +final class AmazonSqsXrayTraceHeaderStamp implements NonSendableStampInterface +{ + private $traceId; + + public function __construct(string $traceId) + { + $this->traceId = $traceId; + } + + public function getTraceId(): string + { + return $this->traceId; + } +} diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php index b6bc8306c9..3872d39d53 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php @@ -11,10 +11,12 @@ namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport; +use AsyncAws\Sqs\Enum\MessageSystemAttributeName; use AsyncAws\Sqs\Enum\QueueAttributeName; use AsyncAws\Sqs\Result\ReceiveMessageResult; use AsyncAws\Sqs\SqsClient; use AsyncAws\Sqs\ValueObject\MessageAttributeValue; +use AsyncAws\Sqs\ValueObject\MessageSystemAttributeValue; use Psr\Log\LoggerInterface; use Symfony\Component\Messenger\Exception\InvalidArgumentException; use Symfony\Component\Messenger\Exception\TransportException; @@ -312,7 +314,7 @@ class Connection return (int) ($attributes[QueueAttributeName::APPROXIMATE_NUMBER_OF_MESSAGES] ?? 0); } - public function send(string $body, array $headers, int $delay = 0, ?string $messageGroupId = null, ?string $messageDeduplicationId = null): void + public function send(string $body, array $headers, int $delay = 0, ?string $messageGroupId = null, ?string $messageDeduplicationId = null, ?string $xrayTraceId = null): void { if ($this->configuration['auto_setup']) { $this->setup(); @@ -323,6 +325,7 @@ class Connection 'MessageBody' => $body, 'DelaySeconds' => $delay, 'MessageAttributes' => [], + 'MessageSystemAttributes' => [], ]; $specialHeaders = []; @@ -346,6 +349,13 @@ class Connection ]); } + if (null !== $xrayTraceId) { + $parameters['MessageSystemAttributes'][MessageSystemAttributeName::AWSTRACE_HEADER] = new MessageSystemAttributeValue([ + 'DataType' => 'String', + 'StringValue' => $xrayTraceId, + ]); + } + if (self::isFifoQueue($this->configuration['queue_name'])) { $parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__; $parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1(json_encode(['body' => $body, 'headers' => $headers]));