diff --git a/.travis.yml b/.travis.yml index 64c8c343e5..3bf1dd5967 100644 --- a/.travis.yml +++ b/.travis.yml @@ -22,6 +22,7 @@ env: - MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages - MESSENGER_REDIS_DSN=redis://127.0.0.1:7006/messages - MESSENGER_SQS_DSN=sqs://localhost:9494/messages?sslmode=disable + - MESSENGER_SQS_FIFO_QUEUE_DSN=sqs://localhost:9494/messages.fifo?sslmode=disable - SYMFONY_PHPUNIT_DISABLE_RESULT_CACHE=1 matrix: diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md index cf996bb4f6..61d7a11dc2 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md @@ -5,4 +5,4 @@ CHANGELOG ----- * Introduced the Amazon SQS bridge. - + * Added FIFO support to the SQS transport diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php index cd398edcbf..645175b402 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php @@ -17,26 +17,35 @@ use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection; class AmazonSqsIntegrationTest extends TestCase { - private $connection; + public function testConnectionSendToFifoQueueAndGet(): void + { + if (!getenv('MESSENGER_SQS_FIFO_QUEUE_DSN')) { + $this->markTestSkipped('The "MESSENGER_SQS_FIFO_QUEUE_DSN" environment variable is required.'); + } - protected function setUp(): void + $this->execute(getenv('MESSENGER_SQS_FIFO_QUEUE_DSN')); + } + + public function testConnectionSendAndGet(): void { if (!getenv('MESSENGER_SQS_DSN')) { $this->markTestSkipped('The "MESSENGER_SQS_DSN" environment variable is required.'); } - $this->connection = Connection::fromDsn(getenv('MESSENGER_SQS_DSN'), []); - $this->connection->setup(); - $this->clearSqs(); + $this->execute(getenv('MESSENGER_SQS_DSN')); } - public function testConnectionSendAndGet() + private function execute(string $dsn): void { - $this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]); - $this->assertSame(1, $this->connection->getMessageCount()); + $connection = Connection::fromDsn($dsn, []); + $connection->setup(); + $this->clearSqs($connection); + + $connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]); + $this->assertSame(1, $connection->getMessageCount()); $wait = 0; - while ((null === $encoded = $this->connection->get()) && $wait++ < 200) { + while ((null === $encoded = $connection->get()) && $wait++ < 200) { usleep(5000); } @@ -44,15 +53,15 @@ class AmazonSqsIntegrationTest extends TestCase $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']); } - private function clearSqs() + private function clearSqs(Connection $connection): void { $wait = 0; while ($wait++ < 50) { - if (null === $message = $this->connection->get()) { + if (null === $message = $connection->get()) { usleep(5000); continue; } - $this->connection->delete($message['id']); + $connection->delete($message['id']); } } } diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsSenderTest.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsSenderTest.php index f0a1178d41..412faf0807 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsSenderTest.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsSenderTest.php @@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport; use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsFifoStamp; use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsSender; use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection; use Symfony\Component\Messenger\Envelope; @@ -20,7 +21,7 @@ use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; class AmazonSqsSenderTest extends TestCase { - public function testSend() + public function testSend(): void { $envelope = new Envelope(new DummyMessage('Oy')); $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]]; @@ -36,4 +37,24 @@ class AmazonSqsSenderTest extends TestCase $sender = new AmazonSqsSender($connection, $serializer); $sender->send($envelope); } + + public function testSendWithAmazonSqsFifoStamp(): void + { + $envelope = (new Envelope(new DummyMessage('Oy'))) + ->with($stamp = new AmazonSqsFifoStamp('testGroup', 'testDeduplicationId')); + + $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]]; + + $connection = $this->getMockBuilder(Connection::class) + ->disableOriginalConstructor() + ->getMock(); + $connection->expects($this->once())->method('send') + ->with($encoded['body'], $encoded['headers'], 0, $stamp->getMessageGroupId(), $stamp->getMessageDeduplicationId()); + + $serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(); + $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); + + $sender = new AmazonSqsSender($connection, $serializer); + $sender->send($envelope); + } } diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsFifoStamp.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsFifoStamp.php new file mode 100644 index 0000000000..f904a0ed0b --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsFifoStamp.php @@ -0,0 +1,37 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport; + +use Symfony\Component\Messenger\Stamp\NonSendableStampInterface; + +final class AmazonSqsFifoStamp implements NonSendableStampInterface +{ + private $messageGroupId; + + private $messageDeduplicationId; + + public function __construct(?string $messageGroupId = null, ?string $messageDeduplicationId = null) + { + $this->messageGroupId = $messageGroupId; + $this->messageDeduplicationId = $messageDeduplicationId; + } + + public function getMessageGroupId(): ?string + { + return $this->messageGroupId; + } + + public function getMessageDeduplicationId(): ?string + { + return $this->messageDeduplicationId; + } +} diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php index 146cacf6d0..12b8a369cc 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php @@ -43,8 +43,24 @@ class AmazonSqsSender implements SenderInterface $delayStamp = $envelope->last(DelayStamp::class); $delay = null !== $delayStamp ? (int) ceil($delayStamp->getDelay() / 1000) : 0; + $messageGroupId = null; + $messageDeduplicationId = null; + + /** @var AmazonSqsFifoStamp|null $amazonSqsFifoStamp */ + $amazonSqsFifoStamp = $envelope->last(AmazonSqsFifoStamp::class); + if (null !== $amazonSqsFifoStamp) { + $messageGroupId = $amazonSqsFifoStamp->getMessageGroupId(); + $messageDeduplicationId = $amazonSqsFifoStamp->getMessageDeduplicationId(); + } + try { - $this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay); + $this->connection->send( + $encodedMessage['body'], + $encodedMessage['headers'] ?? [], + $delay, + $messageGroupId, + $messageDeduplicationId + ); } catch (HttpExceptionInterface $e) { throw new TransportException($e->getMessage(), 0, $e); } diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php index abd636e5c0..f65d28d148 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php @@ -27,6 +27,8 @@ use Symfony\Contracts\HttpClient\ResponseInterface; */ class Connection { + private const AWS_SQS_FIFO_SUFFIX = '.fifo'; + private const DEFAULT_OPTIONS = [ 'buffer_size' => 9, 'wait_time' => 20, @@ -196,10 +198,16 @@ class Connection public function setup(): void { - $this->call($this->configuration['endpoint'], [ + $parameters = [ 'Action' => 'CreateQueue', 'QueueName' => $this->configuration['queue_name'], - ]); + ]; + + if ($this->isFifoQueue($this->configuration['queue_name'])) { + $parameters['FifoQueue'] = true; + } + + $this->call($this->configuration['endpoint'], $parameters); $this->queueUrl = null; $this->configuration['auto_setup'] = false; @@ -232,17 +240,26 @@ class Connection return 0; } - public function send(string $body, array $headers, int $delay = 0): void + public function send(string $body, array $headers, int $delay = 0, ?string $messageGroupId = null, ?string $messageDeduplicationId = null): void { if ($this->configuration['auto_setup']) { $this->setup(); } - $this->call($this->getQueueUrl(), [ + $messageBody = json_encode(['body' => $body, 'headers' => $headers]); + + $parameters = [ 'Action' => 'SendMessage', - 'MessageBody' => json_encode(['body' => $body, 'headers' => $headers]), + 'MessageBody' => $messageBody, 'DelaySeconds' => $delay, - ]); + ]; + + if ($this->isFifoQueue($this->configuration['queue_name'])) { + $parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__; + $parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1($messageBody); + } + + $this->call($this->getQueueUrl(), $parameters); } public function reset(): void @@ -362,4 +379,9 @@ class Connection throw new TransportException($error->Error->Message); } } + + private function isFifoQueue(string $queueName): bool + { + return self::AWS_SQS_FIFO_SUFFIX === substr($queueName, -\strlen(self::AWS_SQS_FIFO_SUFFIX)); + } }