diff --git a/composer.json b/composer.json index afe59f04f7..1de26a2e5b 100644 --- a/composer.json +++ b/composer.json @@ -105,6 +105,7 @@ "amphp/http-client": "^4.2", "amphp/http-tunnel": "^1.0", "async-aws/ses": "^1.0", + "async-aws/sqs": "^1.0", "cache/integration-tests": "dev-master", "doctrine/annotations": "~1.0", "doctrine/cache": "~1.6", diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php index 645175b402..251c821a07 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php @@ -11,6 +11,7 @@ namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport; +use AsyncAws\Sqs\SqsClient; use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection; @@ -39,7 +40,7 @@ class AmazonSqsIntegrationTest extends TestCase { $connection = Connection::fromDsn($dsn, []); $connection->setup(); - $this->clearSqs($connection); + $this->clearSqs($dsn); $connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]); $this->assertSame(1, $connection->getMessageCount()); @@ -53,15 +54,12 @@ class AmazonSqsIntegrationTest extends TestCase $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']); } - private function clearSqs(Connection $connection): void + private function clearSqs(string $dsn): void { - $wait = 0; - while ($wait++ < 50) { - if (null === $message = $connection->get()) { - usleep(5000); - continue; - } - $connection->delete($message['id']); - } + $url = parse_url($dsn); + $client = new SqsClient(['endpoint' => "http://{$url['host']}:{$url['port']}"]); + $client->purgeQueue([ + 'QueueUrl' => $client->getQueueUrl(['QueueName' => ltrim($url['path'], '/')])->getQueueUrl(), + ]); } } diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php index f6d295c275..b1bc86cc86 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php @@ -11,11 +11,15 @@ namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport; +use AsyncAws\Core\Exception\Http\HttpException; +use AsyncAws\Core\Test\ResultMockFactory; +use AsyncAws\Sqs\Result\GetQueueUrlResult; +use AsyncAws\Sqs\Result\ReceiveMessageResult; +use AsyncAws\Sqs\SqsClient; +use AsyncAws\Sqs\ValueObject\Message; use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection; -use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Contracts\HttpClient\HttpClientInterface; -use Symfony\Contracts\HttpClient\ResponseInterface; class ConnectionTest extends TestCase { @@ -31,7 +35,7 @@ class ConnectionTest extends TestCase { $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); $this->assertEquals( - new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'queue_name' => 'queue'], $httpClient), + new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)), Connection::fromDsn('sqs://default/queue', [], $httpClient) ); } @@ -40,8 +44,8 @@ class ConnectionTest extends TestCase { $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); $this->assertEquals( - new Connection(['endpoint' => 'https://sqs.us-east-1.amazonaws.com', 'queue_name' => 'queue', 'region' => 'us-east-1'], $httpClient), - Connection::fromDsn('sqs://default/queue?region=us-east-1', [], $httpClient) + new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'us-west-2', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)), + Connection::fromDsn('sqs://default/queue?region=us-west-2', [], $httpClient) ); } @@ -49,7 +53,7 @@ class ConnectionTest extends TestCase { $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); $this->assertEquals( - new Connection(['endpoint' => 'https://localhost', 'queue_name' => 'queue'], $httpClient), + new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'endpoint' => 'https://localhost', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)), Connection::fromDsn('sqs://localhost/queue', [], $httpClient) ); } @@ -58,7 +62,7 @@ class ConnectionTest extends TestCase { $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); $this->assertEquals( - new Connection(['endpoint' => 'https://localhost:1234', 'queue_name' => 'queue'], $httpClient), + new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'endpoint' => 'https://localhost:1234', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)), Connection::fromDsn('sqs://localhost:1234/queue', [], $httpClient) ); } @@ -67,7 +71,7 @@ class ConnectionTest extends TestCase { $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); $this->assertEquals( - new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient), + new Connection(['account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)), Connection::fromDsn('sqs://default/213/queue', ['buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient) ); } @@ -76,153 +80,63 @@ class ConnectionTest extends TestCase { $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); $this->assertEquals( - new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient), + new Connection(['account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)), Connection::fromDsn('sqs://default/213/queue?buffer_size=1&wait_time=5&auto_setup=0', [], $httpClient) ); } - private function handleGetQueueUrl(int $index, $mock): string - { - $response = $this->getMockBuilder(ResponseInterface::class)->getMock(); - - $mock->expects($this->at($index))->method('request') - ->with('POST', 'https://localhost', ['body' => ['Action' => 'GetQueueUrl', 'QueueName' => 'queue']]) - ->willReturn($response); - $response->expects($this->once())->method('getStatusCode')->willReturn(200); - $response->expects($this->once())->method('getContent')->willReturn(' - - https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue - - - 470a6f13-2ed9-4181-ad8a-2fdea142988e - - '); - - return 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue'; - } - public function testKeepGettingPendingMessages() { - $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); - $response = $this->getMockBuilder(ResponseInterface::class)->getMock(); + $client = $this->createMock(SqsClient::class); + $client->expects($this->any()) + ->method('getQueueUrl') + ->with(['QueueName' => 'queue', 'QueueOwnerAWSAccountId' => 123]) + ->willReturn(ResultMockFactory::create(GetQueueUrlResult::class, ['QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue'])); + $client->expects($this->at(1)) + ->method('receiveMessage') + ->with([ + 'QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue', + 'MaxNumberOfMessages' => 9, + 'WaitTimeSeconds' => 20, + 'MessageAttributeNames' => ['All'], + 'VisibilityTimeout' => null, + ]) + ->willReturn(ResultMockFactory::create(ReceiveMessageResult::class, ['Messages' => [ + new Message(['MessageId' => 1, 'Body' => 'this is a test']), + new Message(['MessageId' => 2, 'Body' => 'this is a test']), + new Message(['MessageId' => 3, 'Body' => 'this is a test']), + ]])); + $client->expects($this->at(2)) + ->method('receiveMessage') + ->with([ + 'QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue', + 'MaxNumberOfMessages' => 9, + 'WaitTimeSeconds' => 20, + 'MessageAttributeNames' => ['All'], + 'VisibilityTimeout' => null, + ]) + ->willReturn(ResultMockFactory::create(ReceiveMessageResult::class, ['Messages' => [ + ]])); - $queueUrl = $this->handleGetQueueUrl(0, $httpClient); - - $httpClient->expects($this->at(1))->method('request') - ->with('POST', $queueUrl, ['body' => ['Action' => 'ReceiveMessage', 'VisibilityTimeout' => null, 'MaxNumberOfMessages' => 9, 'WaitTimeSeconds' => 20, 'MessageAttributeName.1' => 'All']]) - ->willReturn($response); - $response->expects($this->once())->method('getContent')->willReturn(' - - - 5fea7756-0ea4-451a-a703-a558b933e274 - - MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw - Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE - auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0= - - fafb00f5732ab283681e124bf8747ed1 - {"body":"this is a test","headers":{}} - - SenderId - 195004372649 - - - SentTimestamp - 1238099229000 - - - ApproximateReceiveCount - 5 - - - ApproximateFirstReceiveTimestamp - 1250700979248 - - - - 5fea7756-0ea4-451a-a703-a558b933e274 - - MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw - Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE - auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0= - - fafb00f5732ab283681e124bf8747ed1 - {"body":"this is a test","headers":{}} - - SenderId - 195004372649 - - - SentTimestamp - 1238099229000 - - - ApproximateReceiveCount - 5 - - - ApproximateFirstReceiveTimestamp - 1250700979248 - - - - 5fea7756-0ea4-451a-a703-a558b933e274 - - MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw - Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE - auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0= - - fafb00f5732ab283681e124bf8747ed1 - {"body":"this is a test","headers":{}} - - SenderId - 195004372649 - - - SentTimestamp - 1238099229000 - - - ApproximateReceiveCount - 5 - - - ApproximateFirstReceiveTimestamp - 1250700979248 - - - - - b6633655-283d-45b4-aee4-4e84e0ae6afa - - '); - - $connection = Connection::fromDsn('sqs://localhost/queue', ['auto_setup' => false], $httpClient); + $connection = new Connection(['queue_name' => 'queue', 'account' => 123, 'auto_setup' => false], $client); $this->assertNotNull($connection->get()); $this->assertNotNull($connection->get()); $this->assertNotNull($connection->get()); + $this->assertNull($connection->get()); } public function testUnexpectedSqsError() { - $this->expectException(TransportException::class); + $this->expectException(HttpException::class); $this->expectExceptionMessage('SQS error happens'); - $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); - $response = $this->getMockBuilder(ResponseInterface::class)->getMock(); + $client = $this->createMock(SqsClient::class); + $client->expects($this->any()) + ->method('getQueueUrl') + ->with(['QueueName' => 'queue', 'QueueOwnerAWSAccountId' => 123]) + ->willReturn(ResultMockFactory::createFailing(GetQueueUrlResult::class, 400, 'SQS error happens')); - $httpClient->expects($this->once())->method('request')->willReturn($response); - $response->expects($this->once())->method('getStatusCode')->willReturn(400); - $response->expects($this->once())->method('getContent')->willReturn(' - - Sender - boom - SQS error happens - - - 30441e49-5246-5231-9c87-4bd704b81ce9 - '); - $connection = Connection::fromDsn('sqs://localhost/queue', [], $httpClient); + $connection = new Connection(['queue_name' => 'queue', 'account' => 123, 'auto_setup' => false], $client); $connection->get(); } } diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php index f8ac81034f..89dcf0627c 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php @@ -11,6 +11,7 @@ namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport; +use AsyncAws\Core\Exception\Http\HttpException; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\LogicException; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; @@ -19,7 +20,6 @@ use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; -use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface; /** * @author Jérémy Derussé @@ -42,7 +42,7 @@ class AmazonSqsReceiver implements ReceiverInterface, MessageCountAwareInterface { try { $sqsEnvelope = $this->connection->get(); - } catch (HttpExceptionInterface $e) { + } catch (HttpException $e) { throw new TransportException($e->getMessage(), 0, $e); } if (null === $sqsEnvelope) { @@ -70,7 +70,7 @@ class AmazonSqsReceiver implements ReceiverInterface, MessageCountAwareInterface { try { $this->connection->delete($this->findSqsReceivedStamp($envelope)->getId()); - } catch (HttpExceptionInterface $e) { + } catch (HttpException $e) { throw new TransportException($e->getMessage(), 0, $e); } } @@ -82,7 +82,7 @@ class AmazonSqsReceiver implements ReceiverInterface, MessageCountAwareInterface { try { $this->connection->delete($this->findSqsReceivedStamp($envelope)->getId()); - } catch (HttpExceptionInterface $e) { + } catch (HttpException $e) { throw new TransportException($e->getMessage(), 0, $e); } } @@ -94,7 +94,7 @@ class AmazonSqsReceiver implements ReceiverInterface, MessageCountAwareInterface { try { return $this->connection->getMessageCount(); - } catch (HttpExceptionInterface $e) { + } catch (HttpException $e) { throw new TransportException($e->getMessage(), 0, $e); } } diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php index 12b8a369cc..5bb584dd07 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php @@ -11,12 +11,12 @@ namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport; +use AsyncAws\Core\Exception\Http\HttpException; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; -use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface; /** * @author Jérémy Derussé @@ -61,7 +61,7 @@ class AmazonSqsSender implements SenderInterface $messageGroupId, $messageDeduplicationId ); - } catch (HttpExceptionInterface $e) { + } catch (HttpException $e) { throw new TransportException($e->getMessage(), 0, $e); } diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransport.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransport.php index 6560b937f1..ebca00d3bd 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransport.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransport.php @@ -11,7 +11,9 @@ namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport; +use AsyncAws\Core\Exception\Http\HttpException; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\SetupableTransportInterface; @@ -71,12 +73,20 @@ class AmazonSqsTransport implements TransportInterface, SetupableTransportInterf */ public function setup(): void { - $this->connection->setup(); + try { + $this->connection->setup(); + } catch (HttpException $e) { + throw new TransportException($e->getMessage(), 0, $e); + } } public function reset() { - $this->connection->reset(); + try { + $this->connection->reset(); + } catch (HttpException $e) { + throw new TransportException($e->getMessage(), 0, $e); + } } private function getReceiver(): AmazonSqsReceiver diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php index fe13dba98b..5e551c0fdf 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php @@ -11,11 +11,13 @@ namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport; -use Symfony\Component\HttpClient\HttpClient; +use AsyncAws\Sqs\Enum\QueueAttributeName; +use AsyncAws\Sqs\Result\ReceiveMessageResult; +use AsyncAws\Sqs\SqsClient; +use AsyncAws\Sqs\ValueObject\MessageAttributeValue; use Symfony\Component\Messenger\Exception\InvalidArgumentException; use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Contracts\HttpClient\HttpClientInterface; -use Symfony\Contracts\HttpClient\ResponseInterface; /** * A SQS connection. @@ -46,17 +48,17 @@ class Connection private $configuration; private $client; - /** @var ResponseInterface */ + /** @var ReceiveMessageResult */ private $currentResponse; /** @var array[] */ private $buffer = []; /** @var string|null */ private $queueUrl; - public function __construct(array $configuration, HttpClientInterface $client = null) + public function __construct(array $configuration, SqsClient $client = null) { $this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration); - $this->client = $client ?? HttpClient::create(); + $this->client = $client ?? new SqsClient([]); } public function __destruct() @@ -93,22 +95,24 @@ class Connection } $configuration = [ - 'region' => $options['region'] ?? ($query['region'] ?? self::DEFAULT_OPTIONS['region']), 'buffer_size' => $options['buffer_size'] ?? (int) ($query['buffer_size'] ?? self::DEFAULT_OPTIONS['buffer_size']), 'wait_time' => $options['wait_time'] ?? (int) ($query['wait_time'] ?? self::DEFAULT_OPTIONS['wait_time']), 'poll_timeout' => $options['poll_timeout'] ?? ($query['poll_timeout'] ?? self::DEFAULT_OPTIONS['poll_timeout']), 'visibility_timeout' => $options['visibility_timeout'] ?? ($query['visibility_timeout'] ?? self::DEFAULT_OPTIONS['visibility_timeout']), 'auto_setup' => $options['auto_setup'] ?? (bool) ($query['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup']), - 'access_key' => $options['access_key'] ?? (urldecode($parsedUrl['user'] ?? '') ?: self::DEFAULT_OPTIONS['access_key']), - 'secret_key' => $options['secret_key'] ?? (urldecode($parsedUrl['pass'] ?? '') ?: self::DEFAULT_OPTIONS['secret_key']), ]; - if ('default' === ($parsedUrl['host'] ?? 'default')) { - $configuration['endpoint'] = sprintf('https://sqs.%s.amazonaws.com', $configuration['region']); - } else { - $configuration['endpoint'] = sprintf('%s://%s%s', ($query['sslmode'] ?? null) === 'disable' ? 'http' : 'https', $parsedUrl['host'], ($parsedUrl['port'] ?? null) ? ':'.$parsedUrl['port'] : ''); - if (preg_match(';sqs.(.+).amazonaws.com;', $parsedUrl['host'], $matches)) { - $configuration['region'] = $matches[1]; + $clientConfiguration = [ + 'region' => $options['region'] ?? ($query['region'] ?? self::DEFAULT_OPTIONS['region']), + 'accessKeyId' => $options['access_key'] ?? (urldecode($parsedUrl['user'] ?? '') ?: self::DEFAULT_OPTIONS['access_key']), + 'accessKeySecret' => $options['secret_key'] ?? (urldecode($parsedUrl['pass'] ?? '') ?: self::DEFAULT_OPTIONS['secret_key']), + ]; + unset($query['region']); + + if ('default' !== ($parsedUrl['host'] ?? 'default')) { + $clientConfiguration['endpoint'] = sprintf('%s://%s%s', ($query['sslmode'] ?? null) === 'disable' ? 'http' : 'https', $parsedUrl['host'], ($parsedUrl['port'] ?? null) ? ':'.$parsedUrl['port'] : ''); + if (preg_match(';^sqs\.([^\.]++)\.amazonaws\.com$;', $parsedUrl['host'], $matches)) { + $clientConfiguration['region'] = $matches[1]; } unset($query['sslmode']); } @@ -131,7 +135,7 @@ class Connection throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s].', implode(', ', $queryExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS)))); } - return new self($configuration, $client); + return new self($configuration, new SqsClient($clientConfiguration, null, $client)); } public function get(): ?array @@ -172,82 +176,99 @@ class Connection private function getNewMessages(): \Generator { if (null === $this->currentResponse) { - $this->currentResponse = $this->request($this->getQueueUrl(), [ - 'Action' => 'ReceiveMessage', + $this->currentResponse = $this->client->receiveMessage([ + 'QueueUrl' => $this->getQueueUrl(), 'VisibilityTimeout' => $this->configuration['visibility_timeout'], 'MaxNumberOfMessages' => $this->configuration['buffer_size'], - 'MessageAttributeName.1' => 'All', + 'MessageAttributeNames' => ['All'], 'WaitTimeSeconds' => $this->configuration['wait_time'], ]); } - if ($this->client->stream($this->currentResponse, $this->configuration['poll_timeout'])->current()->isTimeout()) { + if (!$this->fetchMessage()) { return; } - $xml = new \SimpleXMLElement($this->currentResponse->getContent()); - foreach ($xml->ReceiveMessageResult->Message as $xmlMessage) { + yield from $this->getPendingMessages(); + } + + private function fetchMessage(): bool + { + if (!$this->currentResponse->resolve($this->configuration['poll_timeout'])) { + return false; + } + + foreach ($this->currentResponse->getMessages() as $message) { $headers = []; - foreach ($xmlMessage->MessageAttribute as $item) { - if ('String' !== (string) $item->Value->DataType) { + foreach ($message->getMessageAttributes() as $name => $attribute) { + if ('String' !== $attribute->getDataType()) { continue; } - $headers[(string) $item->Name] = (string) $item->Value->StringValue; + + $headers[$name] = $attribute->getStringValue(); } + $this->buffer[] = [ - 'id' => (string) $xmlMessage->ReceiptHandle, - 'body' => (string) $xmlMessage->Body, + 'id' => $message->getReceiptHandle(), + 'body' => $message->getBody(), 'headers' => $headers, ]; } $this->currentResponse = null; - yield from $this->getPendingMessages(); + return true; } public function setup(): void { - $parameters = [ - 'Action' => 'CreateQueue', + // Set to false to disable setup more than once + $this->configuration['auto_setup'] = false; + if ($this->client->queueExists([ 'QueueName' => $this->configuration['queue_name'], - ]; + 'QueueOwnerAWSAccountId' => $this->configuration['account'], + ])->isSuccess()) { + return; + } - if ($this->isFifoQueue($this->configuration['queue_name'])) { + if (null !== $this->configuration['account']) { + throw new InvalidArgumentException(sprintf('The Amazon SQS queue "%s" does not exists (or you don\'t have permissions on it), and can\'t be created when an account is provided.', $this->configuration['queue_name'])); + } + + $parameters = ['QueueName' => $this->configuration['queue_name']]; + + if (self::isFifoQueue($this->configuration['queue_name'])) { $parameters['FifoQueue'] = true; } - $this->call($this->configuration['endpoint'], $parameters); + $this->client->createQueue($parameters); + $exists = $this->client->queueExists(['QueueName' => $this->configuration['queue_name']]); + // Blocking call to wait for the queue to be created + $exists->wait(); + if (!$exists->isSuccess()) { + throw new TransportException(sprintf('Failed to crate the Amazon SQS queue "%s".', $this->configuration['queue_name'])); + } $this->queueUrl = null; - - $this->configuration['auto_setup'] = false; } public function delete(string $id): void { - $this->call($this->getQueueUrl(), [ - 'Action' => 'DeleteMessage', + $this->client->deleteMessage([ + 'QueueUrl' => $this->getQueueUrl(), 'ReceiptHandle' => $id, ]); } public function getMessageCount(): int { - $response = $this->request($this->getQueueUrl(), [ - 'Action' => 'GetQueueAttributes', - 'AttributeNames' => ['ApproximateNumberOfMessages'], + $response = $this->client->getQueueAttributes([ + 'QueueUrl' => $this->getQueueUrl(), + 'AttributeNames' => [QueueAttributeName::APPROXIMATE_NUMBER_OF_MESSAGES], ]); - $this->checkResponse($response); - $xml = new \SimpleXMLElement($response->getContent()); - foreach ($xml->GetQueueAttributesResult->Attribute as $attribute) { - if ('ApproximateNumberOfMessages' !== (string) $attribute->Name) { - continue; - } - return (int) $attribute->Value; - } + $attributes = $response->getAttributes(); - return 0; + return (int) ($attributes[QueueAttributeName::APPROXIMATE_NUMBER_OF_MESSAGES] ?? 0); } public function send(string $body, array $headers, int $delay = 0, ?string $messageGroupId = null, ?string $messageDeduplicationId = null): void @@ -257,36 +278,40 @@ class Connection } $parameters = [ - 'Action' => 'SendMessage', + 'QueueUrl' => $this->getQueueUrl(), 'MessageBody' => $body, 'DelaySeconds' => $delay, + 'MessageAttributes' => [], ]; - $index = 0; foreach ($headers as $name => $value) { - ++$index; - $parameters["MessageAttribute.$index.Name"] = $name; - $parameters["MessageAttribute.$index.Value.DataType"] = 'String'; - $parameters["MessageAttribute.$index.Value.StringValue"] = $value; + $parameters['MessageAttributes'][$name] = new MessageAttributeValue([ + 'DataType' => 'String', + 'StringValue' => $value, + ]); } - if ($this->isFifoQueue($this->configuration['queue_name'])) { + if (self::isFifoQueue($this->configuration['queue_name'])) { $parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__; $parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1(json_encode(['body' => $body, 'headers' => $headers])); } - $this->call($this->getQueueUrl(), $parameters); + $this->client->sendMessage($parameters); } public function reset(): void { if (null !== $this->currentResponse) { - $this->currentResponse->cancel(); + // fetch current response in order to requeue in transit messages + if (!$this->fetchMessage()) { + $this->currentResponse->cancel(); + $this->currentResponse = null; + } } foreach ($this->getPendingMessages() as $message) { - $this->call($this->getQueueUrl(), [ - 'Action' => 'ChangeMessageVisibility', + $this->client->changeMessageVisibility([ + 'QueueUrl' => $this->getQueueUrl(), 'ReceiptHandle' => $message['id'], 'VisibilityTimeout' => 0, ]); @@ -295,108 +320,17 @@ class Connection private function getQueueUrl(): string { - if (null === $this->queueUrl) { - $parameters = [ - 'Action' => 'GetQueueUrl', - 'QueueName' => $this->configuration['queue_name'], - ]; - if (isset($this->configuration['account'])) { - $parameters['QueueOwnerAWSAccountId'] = $this->configuration['account']; - } - - $response = $this->request($this->configuration['endpoint'], $parameters); - $this->checkResponse($response); - $xml = new \SimpleXMLElement($response->getContent()); - - $this->queueUrl = (string) $xml->GetQueueUrlResult->QueueUrl; + if (null !== $this->queueUrl) { + return $this->queueUrl; } - return $this->queueUrl; + return $this->queueUrl = $this->client->getQueueUrl([ + 'QueueName' => $this->configuration['queue_name'], + 'QueueOwnerAWSAccountId' => $this->configuration['account'], + ])->getQueueUrl(); } - private function call(string $endpoint, array $body): void - { - $this->checkResponse($this->request($endpoint, $body)); - } - - private function request(string $endpoint, array $body): ResponseInterface - { - if (!$this->configuration['access_key']) { - return $this->client->request('POST', $endpoint, ['body' => $body]); - } - - $region = $this->configuration['region']; - $service = 'sqs'; - - $method = 'POST'; - $requestParameters = http_build_query($body, '', '&', PHP_QUERY_RFC1738); - $amzDate = gmdate('Ymd\THis\Z'); - $parsedUrl = parse_url($endpoint); - - $headers = [ - 'host' => $parsedUrl['host'].($parsedUrl['port'] ? ':'.$parsedUrl['port'] : ''), - 'x-amz-date' => $amzDate, - 'content-type' => 'application/x-www-form-urlencoded', - ]; - - $signedHeaders = ['host', 'x-amz-date']; - $canonicalHeaders = implode("\n", array_map(function ($headerName) use ($headers): string { - return sprintf('%s:%s', $headerName, $headers[$headerName]); - }, $signedHeaders))."\n"; - - $canonicalRequest = implode("\n", [ - $method, - $parsedUrl['path'] ?? '/', - '', - $canonicalHeaders, - implode(';', $signedHeaders), - hash('sha256', $requestParameters), - ]); - - $algorithm = 'AWS4-HMAC-SHA256'; - $credentialScope = [gmdate('Ymd'), $region, $service, 'aws4_request']; - - $signingKey = 'AWS4'.$this->configuration['secret_key']; - foreach ($credentialScope as $credential) { - $signingKey = hash_hmac('sha256', $credential, $signingKey, true); - } - - $stringToSign = implode("\n", [ - $algorithm, - $amzDate, - implode('/', $credentialScope), - hash('sha256', $canonicalRequest), - ]); - - $authorizationHeader = sprintf( - '%s Credential=%s/%s, SignedHeaders=%s, Signature=%s', - $algorithm, - $this->configuration['access_key'], - implode('/', $credentialScope), - implode(';', $signedHeaders), - hash_hmac('sha256', $stringToSign, $signingKey) - ); - - $options = [ - 'headers' => $headers + [ - 'authorization' => $authorizationHeader, - ], - 'body' => $requestParameters, - ]; - - return $this->client->request($method, $endpoint, $options); - } - - private function checkResponse(ResponseInterface $response): void - { - if (200 !== $response->getStatusCode()) { - $error = new \SimpleXMLElement($response->getContent(false)); - - throw new TransportException($error->Error->Message); - } - } - - private function isFifoQueue(string $queueName): bool + private static function isFifoQueue(string $queueName): bool { return self::AWS_SQS_FIFO_SUFFIX === substr($queueName, -\strlen(self::AWS_SQS_FIFO_SUFFIX)); } diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/composer.json b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/composer.json index a6758c1300..9e678cf6a8 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/composer.json +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/composer.json @@ -17,7 +17,7 @@ ], "require": { "php": "^7.2.5", - "symfony/http-client": "^4.3|5.0", + "async-aws/sqs": "^1.0", "symfony/messenger": "^4.3|^5.0", "symfony/service-contracts": "^1.1|^2" },