feature #36431 [Messenger] Add FIFO support to the SQS transport (cv65kr)

This PR was squashed before being merged into the 5.1-dev branch.

Discussion
----------

[Messenger] Add FIFO support to the SQS transport

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| Deprecations? | no
| Tickets       | no
| License       | MIT
| Doc PR        | --

https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html

Commits
-------

37601753f1 [Messenger] Add FIFO support to the SQS transport
This commit is contained in:
Fabien Potencier 2020-04-17 05:34:16 +02:00
commit 67948a7ffb
7 changed files with 127 additions and 21 deletions

View File

@ -22,6 +22,7 @@ env:
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
- MESSENGER_REDIS_DSN=redis://127.0.0.1:7006/messages
- MESSENGER_SQS_DSN=sqs://localhost:9494/messages?sslmode=disable
- MESSENGER_SQS_FIFO_QUEUE_DSN=sqs://localhost:9494/messages.fifo?sslmode=disable
- SYMFONY_PHPUNIT_DISABLE_RESULT_CACHE=1
matrix:

View File

@ -5,4 +5,4 @@ CHANGELOG
-----
* Introduced the Amazon SQS bridge.
* Added FIFO support to the SQS transport

View File

@ -17,26 +17,35 @@ use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
class AmazonSqsIntegrationTest extends TestCase
{
private $connection;
public function testConnectionSendToFifoQueueAndGet(): void
{
if (!getenv('MESSENGER_SQS_FIFO_QUEUE_DSN')) {
$this->markTestSkipped('The "MESSENGER_SQS_FIFO_QUEUE_DSN" environment variable is required.');
}
protected function setUp(): void
$this->execute(getenv('MESSENGER_SQS_FIFO_QUEUE_DSN'));
}
public function testConnectionSendAndGet(): void
{
if (!getenv('MESSENGER_SQS_DSN')) {
$this->markTestSkipped('The "MESSENGER_SQS_DSN" environment variable is required.');
}
$this->connection = Connection::fromDsn(getenv('MESSENGER_SQS_DSN'), []);
$this->connection->setup();
$this->clearSqs();
$this->execute(getenv('MESSENGER_SQS_DSN'));
}
public function testConnectionSendAndGet()
private function execute(string $dsn): void
{
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
$this->assertSame(1, $this->connection->getMessageCount());
$connection = Connection::fromDsn($dsn, []);
$connection->setup();
$this->clearSqs($connection);
$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
$this->assertSame(1, $connection->getMessageCount());
$wait = 0;
while ((null === $encoded = $this->connection->get()) && $wait++ < 200) {
while ((null === $encoded = $connection->get()) && $wait++ < 200) {
usleep(5000);
}
@ -44,15 +53,15 @@ class AmazonSqsIntegrationTest extends TestCase
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
}
private function clearSqs()
private function clearSqs(Connection $connection): void
{
$wait = 0;
while ($wait++ < 50) {
if (null === $message = $this->connection->get()) {
if (null === $message = $connection->get()) {
usleep(5000);
continue;
}
$this->connection->delete($message['id']);
$connection->delete($message['id']);
}
}
}

View File

@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsFifoStamp;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsSender;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
use Symfony\Component\Messenger\Envelope;
@ -20,7 +21,7 @@ use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
class AmazonSqsSenderTest extends TestCase
{
public function testSend()
public function testSend(): void
{
$envelope = new Envelope(new DummyMessage('Oy'));
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
@ -36,4 +37,24 @@ class AmazonSqsSenderTest extends TestCase
$sender = new AmazonSqsSender($connection, $serializer);
$sender->send($envelope);
}
public function testSendWithAmazonSqsFifoStamp(): void
{
$envelope = (new Envelope(new DummyMessage('Oy')))
->with($stamp = new AmazonSqsFifoStamp('testGroup', 'testDeduplicationId'));
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
$connection = $this->getMockBuilder(Connection::class)
->disableOriginalConstructor()
->getMock();
$connection->expects($this->once())->method('send')
->with($encoded['body'], $encoded['headers'], 0, $stamp->getMessageGroupId(), $stamp->getMessageDeduplicationId());
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
$sender = new AmazonSqsSender($connection, $serializer);
$sender->send($envelope);
}
}

View File

@ -0,0 +1,37 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
final class AmazonSqsFifoStamp implements NonSendableStampInterface
{
private $messageGroupId;
private $messageDeduplicationId;
public function __construct(?string $messageGroupId = null, ?string $messageDeduplicationId = null)
{
$this->messageGroupId = $messageGroupId;
$this->messageDeduplicationId = $messageDeduplicationId;
}
public function getMessageGroupId(): ?string
{
return $this->messageGroupId;
}
public function getMessageDeduplicationId(): ?string
{
return $this->messageDeduplicationId;
}
}

View File

@ -43,8 +43,24 @@ class AmazonSqsSender implements SenderInterface
$delayStamp = $envelope->last(DelayStamp::class);
$delay = null !== $delayStamp ? (int) ceil($delayStamp->getDelay() / 1000) : 0;
$messageGroupId = null;
$messageDeduplicationId = null;
/** @var AmazonSqsFifoStamp|null $amazonSqsFifoStamp */
$amazonSqsFifoStamp = $envelope->last(AmazonSqsFifoStamp::class);
if (null !== $amazonSqsFifoStamp) {
$messageGroupId = $amazonSqsFifoStamp->getMessageGroupId();
$messageDeduplicationId = $amazonSqsFifoStamp->getMessageDeduplicationId();
}
try {
$this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay);
$this->connection->send(
$encodedMessage['body'],
$encodedMessage['headers'] ?? [],
$delay,
$messageGroupId,
$messageDeduplicationId
);
} catch (HttpExceptionInterface $e) {
throw new TransportException($e->getMessage(), 0, $e);
}

View File

@ -27,6 +27,8 @@ use Symfony\Contracts\HttpClient\ResponseInterface;
*/
class Connection
{
private const AWS_SQS_FIFO_SUFFIX = '.fifo';
private const DEFAULT_OPTIONS = [
'buffer_size' => 9,
'wait_time' => 20,
@ -196,10 +198,16 @@ class Connection
public function setup(): void
{
$this->call($this->configuration['endpoint'], [
$parameters = [
'Action' => 'CreateQueue',
'QueueName' => $this->configuration['queue_name'],
]);
];
if ($this->isFifoQueue($this->configuration['queue_name'])) {
$parameters['FifoQueue'] = true;
}
$this->call($this->configuration['endpoint'], $parameters);
$this->queueUrl = null;
$this->configuration['auto_setup'] = false;
@ -232,17 +240,26 @@ class Connection
return 0;
}
public function send(string $body, array $headers, int $delay = 0): void
public function send(string $body, array $headers, int $delay = 0, ?string $messageGroupId = null, ?string $messageDeduplicationId = null): void
{
if ($this->configuration['auto_setup']) {
$this->setup();
}
$this->call($this->getQueueUrl(), [
$messageBody = json_encode(['body' => $body, 'headers' => $headers]);
$parameters = [
'Action' => 'SendMessage',
'MessageBody' => json_encode(['body' => $body, 'headers' => $headers]),
'MessageBody' => $messageBody,
'DelaySeconds' => $delay,
]);
];
if ($this->isFifoQueue($this->configuration['queue_name'])) {
$parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__;
$parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1($messageBody);
}
$this->call($this->getQueueUrl(), $parameters);
}
public function reset(): void
@ -362,4 +379,9 @@ class Connection
throw new TransportException($error->Error->Message);
}
}
private function isFifoQueue(string $queueName): bool
{
return self::AWS_SQS_FIFO_SUFFIX === substr($queueName, -\strlen(self::AWS_SQS_FIFO_SUFFIX));
}
}