[AmazonSqsMessenger] Use AsyncAws to handle SQS communication

This commit is contained in:
Jérémy Derussé 2020-03-16 13:18:25 +01:00 committed by Fabien Potencier
parent eebb3efa2c
commit 7c4888eed1
8 changed files with 172 additions and 315 deletions

View File

@ -105,6 +105,7 @@
"amphp/http-client": "^4.2", "amphp/http-client": "^4.2",
"amphp/http-tunnel": "^1.0", "amphp/http-tunnel": "^1.0",
"async-aws/ses": "^1.0", "async-aws/ses": "^1.0",
"async-aws/sqs": "^1.0",
"cache/integration-tests": "dev-master", "cache/integration-tests": "dev-master",
"doctrine/annotations": "~1.0", "doctrine/annotations": "~1.0",
"doctrine/cache": "~1.6", "doctrine/cache": "~1.6",

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport; namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
use AsyncAws\Sqs\SqsClient;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection; use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
@ -39,7 +40,7 @@ class AmazonSqsIntegrationTest extends TestCase
{ {
$connection = Connection::fromDsn($dsn, []); $connection = Connection::fromDsn($dsn, []);
$connection->setup(); $connection->setup();
$this->clearSqs($connection); $this->clearSqs($dsn);
$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]); $connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
$this->assertSame(1, $connection->getMessageCount()); $this->assertSame(1, $connection->getMessageCount());
@ -53,15 +54,12 @@ class AmazonSqsIntegrationTest extends TestCase
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']); $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
} }
private function clearSqs(Connection $connection): void private function clearSqs(string $dsn): void
{ {
$wait = 0; $url = parse_url($dsn);
while ($wait++ < 50) { $client = new SqsClient(['endpoint' => "http://{$url['host']}:{$url['port']}"]);
if (null === $message = $connection->get()) { $client->purgeQueue([
usleep(5000); 'QueueUrl' => $client->getQueueUrl(['QueueName' => ltrim($url['path'], '/')])->getQueueUrl(),
continue; ]);
}
$connection->delete($message['id']);
}
} }
} }

View File

@ -11,11 +11,15 @@
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport; namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
use AsyncAws\Core\Exception\Http\HttpException;
use AsyncAws\Core\Test\ResultMockFactory;
use AsyncAws\Sqs\Result\GetQueueUrlResult;
use AsyncAws\Sqs\Result\ReceiveMessageResult;
use AsyncAws\Sqs\SqsClient;
use AsyncAws\Sqs\ValueObject\Message;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection; use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Contracts\HttpClient\HttpClientInterface; use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface;
class ConnectionTest extends TestCase class ConnectionTest extends TestCase
{ {
@ -31,7 +35,7 @@ class ConnectionTest extends TestCase
{ {
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals( $this->assertEquals(
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'queue_name' => 'queue'], $httpClient), new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
Connection::fromDsn('sqs://default/queue', [], $httpClient) Connection::fromDsn('sqs://default/queue', [], $httpClient)
); );
} }
@ -40,8 +44,8 @@ class ConnectionTest extends TestCase
{ {
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals( $this->assertEquals(
new Connection(['endpoint' => 'https://sqs.us-east-1.amazonaws.com', 'queue_name' => 'queue', 'region' => 'us-east-1'], $httpClient), new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'us-west-2', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
Connection::fromDsn('sqs://default/queue?region=us-east-1', [], $httpClient) Connection::fromDsn('sqs://default/queue?region=us-west-2', [], $httpClient)
); );
} }
@ -49,7 +53,7 @@ class ConnectionTest extends TestCase
{ {
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals( $this->assertEquals(
new Connection(['endpoint' => 'https://localhost', 'queue_name' => 'queue'], $httpClient), new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'endpoint' => 'https://localhost', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
Connection::fromDsn('sqs://localhost/queue', [], $httpClient) Connection::fromDsn('sqs://localhost/queue', [], $httpClient)
); );
} }
@ -58,7 +62,7 @@ class ConnectionTest extends TestCase
{ {
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals( $this->assertEquals(
new Connection(['endpoint' => 'https://localhost:1234', 'queue_name' => 'queue'], $httpClient), new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'endpoint' => 'https://localhost:1234', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
Connection::fromDsn('sqs://localhost:1234/queue', [], $httpClient) Connection::fromDsn('sqs://localhost:1234/queue', [], $httpClient)
); );
} }
@ -67,7 +71,7 @@ class ConnectionTest extends TestCase
{ {
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals( $this->assertEquals(
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient), new Connection(['account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
Connection::fromDsn('sqs://default/213/queue', ['buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient) Connection::fromDsn('sqs://default/213/queue', ['buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient)
); );
} }
@ -76,153 +80,63 @@ class ConnectionTest extends TestCase
{ {
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals( $this->assertEquals(
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient), new Connection(['account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
Connection::fromDsn('sqs://default/213/queue?buffer_size=1&wait_time=5&auto_setup=0', [], $httpClient) Connection::fromDsn('sqs://default/213/queue?buffer_size=1&wait_time=5&auto_setup=0', [], $httpClient)
); );
} }
private function handleGetQueueUrl(int $index, $mock): string
{
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();
$mock->expects($this->at($index))->method('request')
->with('POST', 'https://localhost', ['body' => ['Action' => 'GetQueueUrl', 'QueueName' => 'queue']])
->willReturn($response);
$response->expects($this->once())->method('getStatusCode')->willReturn(200);
$response->expects($this->once())->method('getContent')->willReturn('<GetQueueUrlResponse>
<GetQueueUrlResult>
<QueueUrl>https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue</QueueUrl>
</GetQueueUrlResult>
<ResponseMetadata>
<RequestId>470a6f13-2ed9-4181-ad8a-2fdea142988e</RequestId>
</ResponseMetadata>
</GetQueueUrlResponse>');
return 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue';
}
public function testKeepGettingPendingMessages() public function testKeepGettingPendingMessages()
{ {
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); $client = $this->createMock(SqsClient::class);
$response = $this->getMockBuilder(ResponseInterface::class)->getMock(); $client->expects($this->any())
->method('getQueueUrl')
->with(['QueueName' => 'queue', 'QueueOwnerAWSAccountId' => 123])
->willReturn(ResultMockFactory::create(GetQueueUrlResult::class, ['QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue']));
$client->expects($this->at(1))
->method('receiveMessage')
->with([
'QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
'MaxNumberOfMessages' => 9,
'WaitTimeSeconds' => 20,
'MessageAttributeNames' => ['All'],
'VisibilityTimeout' => null,
])
->willReturn(ResultMockFactory::create(ReceiveMessageResult::class, ['Messages' => [
new Message(['MessageId' => 1, 'Body' => 'this is a test']),
new Message(['MessageId' => 2, 'Body' => 'this is a test']),
new Message(['MessageId' => 3, 'Body' => 'this is a test']),
]]));
$client->expects($this->at(2))
->method('receiveMessage')
->with([
'QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
'MaxNumberOfMessages' => 9,
'WaitTimeSeconds' => 20,
'MessageAttributeNames' => ['All'],
'VisibilityTimeout' => null,
])
->willReturn(ResultMockFactory::create(ReceiveMessageResult::class, ['Messages' => [
]]));
$queueUrl = $this->handleGetQueueUrl(0, $httpClient); $connection = new Connection(['queue_name' => 'queue', 'account' => 123, 'auto_setup' => false], $client);
$httpClient->expects($this->at(1))->method('request')
->with('POST', $queueUrl, ['body' => ['Action' => 'ReceiveMessage', 'VisibilityTimeout' => null, 'MaxNumberOfMessages' => 9, 'WaitTimeSeconds' => 20, 'MessageAttributeName.1' => 'All']])
->willReturn($response);
$response->expects($this->once())->method('getContent')->willReturn('<ReceiveMessageResponse>
<ReceiveMessageResult>
<Message>
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
<ReceiptHandle>
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
</ReceiptHandle>
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
<Body>{"body":"this is a test","headers":{}}</Body>
<Attribute>
<Name>SenderId</Name>
<Value>195004372649</Value>
</Attribute>
<Attribute>
<Name>SentTimestamp</Name>
<Value>1238099229000</Value>
</Attribute>
<Attribute>
<Name>ApproximateReceiveCount</Name>
<Value>5</Value>
</Attribute>
<Attribute>
<Name>ApproximateFirstReceiveTimestamp</Name>
<Value>1250700979248</Value>
</Attribute>
</Message>
<Message>
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
<ReceiptHandle>
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
</ReceiptHandle>
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
<Body>{"body":"this is a test","headers":{}}</Body>
<Attribute>
<Name>SenderId</Name>
<Value>195004372649</Value>
</Attribute>
<Attribute>
<Name>SentTimestamp</Name>
<Value>1238099229000</Value>
</Attribute>
<Attribute>
<Name>ApproximateReceiveCount</Name>
<Value>5</Value>
</Attribute>
<Attribute>
<Name>ApproximateFirstReceiveTimestamp</Name>
<Value>1250700979248</Value>
</Attribute>
</Message>
<Message>
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
<ReceiptHandle>
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
</ReceiptHandle>
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
<Body>{"body":"this is a test","headers":{}}</Body>
<Attribute>
<Name>SenderId</Name>
<Value>195004372649</Value>
</Attribute>
<Attribute>
<Name>SentTimestamp</Name>
<Value>1238099229000</Value>
</Attribute>
<Attribute>
<Name>ApproximateReceiveCount</Name>
<Value>5</Value>
</Attribute>
<Attribute>
<Name>ApproximateFirstReceiveTimestamp</Name>
<Value>1250700979248</Value>
</Attribute>
</Message>
</ReceiveMessageResult>
<ResponseMetadata>
<RequestId>b6633655-283d-45b4-aee4-4e84e0ae6afa</RequestId>
</ResponseMetadata>
</ReceiveMessageResponse>');
$connection = Connection::fromDsn('sqs://localhost/queue', ['auto_setup' => false], $httpClient);
$this->assertNotNull($connection->get()); $this->assertNotNull($connection->get());
$this->assertNotNull($connection->get()); $this->assertNotNull($connection->get());
$this->assertNotNull($connection->get()); $this->assertNotNull($connection->get());
$this->assertNull($connection->get());
} }
public function testUnexpectedSqsError() public function testUnexpectedSqsError()
{ {
$this->expectException(TransportException::class); $this->expectException(HttpException::class);
$this->expectExceptionMessage('SQS error happens'); $this->expectExceptionMessage('SQS error happens');
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); $client = $this->createMock(SqsClient::class);
$response = $this->getMockBuilder(ResponseInterface::class)->getMock(); $client->expects($this->any())
->method('getQueueUrl')
->with(['QueueName' => 'queue', 'QueueOwnerAWSAccountId' => 123])
->willReturn(ResultMockFactory::createFailing(GetQueueUrlResult::class, 400, 'SQS error happens'));
$httpClient->expects($this->once())->method('request')->willReturn($response); $connection = new Connection(['queue_name' => 'queue', 'account' => 123, 'auto_setup' => false], $client);
$response->expects($this->once())->method('getStatusCode')->willReturn(400);
$response->expects($this->once())->method('getContent')->willReturn('<ErrorResponse xmlns="http://queue.amazonaws.com/doc/2012-11-05/">
<Error>
<Type>Sender</Type>
<Code>boom</Code>
<Message>SQS error happens</Message>
<Detail/>
</Error>
<RequestId>30441e49-5246-5231-9c87-4bd704b81ce9</RequestId>
</ErrorResponse>');
$connection = Connection::fromDsn('sqs://localhost/queue', [], $httpClient);
$connection->get(); $connection->get();
} }
} }

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport; namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
use AsyncAws\Core\Exception\Http\HttpException;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\LogicException; use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
@ -19,7 +20,6 @@ use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
/** /**
* @author Jérémy Derussé <jeremy@derusse.com> * @author Jérémy Derussé <jeremy@derusse.com>
@ -42,7 +42,7 @@ class AmazonSqsReceiver implements ReceiverInterface, MessageCountAwareInterface
{ {
try { try {
$sqsEnvelope = $this->connection->get(); $sqsEnvelope = $this->connection->get();
} catch (HttpExceptionInterface $e) { } catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e); throw new TransportException($e->getMessage(), 0, $e);
} }
if (null === $sqsEnvelope) { if (null === $sqsEnvelope) {
@ -70,7 +70,7 @@ class AmazonSqsReceiver implements ReceiverInterface, MessageCountAwareInterface
{ {
try { try {
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId()); $this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
} catch (HttpExceptionInterface $e) { } catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e); throw new TransportException($e->getMessage(), 0, $e);
} }
} }
@ -82,7 +82,7 @@ class AmazonSqsReceiver implements ReceiverInterface, MessageCountAwareInterface
{ {
try { try {
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId()); $this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
} catch (HttpExceptionInterface $e) { } catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e); throw new TransportException($e->getMessage(), 0, $e);
} }
} }
@ -94,7 +94,7 @@ class AmazonSqsReceiver implements ReceiverInterface, MessageCountAwareInterface
{ {
try { try {
return $this->connection->getMessageCount(); return $this->connection->getMessageCount();
} catch (HttpExceptionInterface $e) { } catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e); throw new TransportException($e->getMessage(), 0, $e);
} }
} }

View File

@ -11,12 +11,12 @@
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport; namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
use AsyncAws\Core\Exception\Http\HttpException;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface; use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
/** /**
* @author Jérémy Derussé <jeremy@derusse.com> * @author Jérémy Derussé <jeremy@derusse.com>
@ -61,7 +61,7 @@ class AmazonSqsSender implements SenderInterface
$messageGroupId, $messageGroupId,
$messageDeduplicationId $messageDeduplicationId
); );
} catch (HttpExceptionInterface $e) { } catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e); throw new TransportException($e->getMessage(), 0, $e);
} }

View File

@ -11,7 +11,9 @@
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport; namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
use AsyncAws\Core\Exception\Http\HttpException;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\SetupableTransportInterface; use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
@ -71,12 +73,20 @@ class AmazonSqsTransport implements TransportInterface, SetupableTransportInterf
*/ */
public function setup(): void public function setup(): void
{ {
$this->connection->setup(); try {
$this->connection->setup();
} catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
} }
public function reset() public function reset()
{ {
$this->connection->reset(); try {
$this->connection->reset();
} catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
} }
private function getReceiver(): AmazonSqsReceiver private function getReceiver(): AmazonSqsReceiver

View File

@ -11,11 +11,13 @@
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport; namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
use Symfony\Component\HttpClient\HttpClient; use AsyncAws\Sqs\Enum\QueueAttributeName;
use AsyncAws\Sqs\Result\ReceiveMessageResult;
use AsyncAws\Sqs\SqsClient;
use AsyncAws\Sqs\ValueObject\MessageAttributeValue;
use Symfony\Component\Messenger\Exception\InvalidArgumentException; use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Contracts\HttpClient\HttpClientInterface; use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface;
/** /**
* A SQS connection. * A SQS connection.
@ -46,17 +48,17 @@ class Connection
private $configuration; private $configuration;
private $client; private $client;
/** @var ResponseInterface */ /** @var ReceiveMessageResult */
private $currentResponse; private $currentResponse;
/** @var array[] */ /** @var array[] */
private $buffer = []; private $buffer = [];
/** @var string|null */ /** @var string|null */
private $queueUrl; private $queueUrl;
public function __construct(array $configuration, HttpClientInterface $client = null) public function __construct(array $configuration, SqsClient $client = null)
{ {
$this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration); $this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration);
$this->client = $client ?? HttpClient::create(); $this->client = $client ?? new SqsClient([]);
} }
public function __destruct() public function __destruct()
@ -93,22 +95,24 @@ class Connection
} }
$configuration = [ $configuration = [
'region' => $options['region'] ?? ($query['region'] ?? self::DEFAULT_OPTIONS['region']),
'buffer_size' => $options['buffer_size'] ?? (int) ($query['buffer_size'] ?? self::DEFAULT_OPTIONS['buffer_size']), 'buffer_size' => $options['buffer_size'] ?? (int) ($query['buffer_size'] ?? self::DEFAULT_OPTIONS['buffer_size']),
'wait_time' => $options['wait_time'] ?? (int) ($query['wait_time'] ?? self::DEFAULT_OPTIONS['wait_time']), 'wait_time' => $options['wait_time'] ?? (int) ($query['wait_time'] ?? self::DEFAULT_OPTIONS['wait_time']),
'poll_timeout' => $options['poll_timeout'] ?? ($query['poll_timeout'] ?? self::DEFAULT_OPTIONS['poll_timeout']), 'poll_timeout' => $options['poll_timeout'] ?? ($query['poll_timeout'] ?? self::DEFAULT_OPTIONS['poll_timeout']),
'visibility_timeout' => $options['visibility_timeout'] ?? ($query['visibility_timeout'] ?? self::DEFAULT_OPTIONS['visibility_timeout']), 'visibility_timeout' => $options['visibility_timeout'] ?? ($query['visibility_timeout'] ?? self::DEFAULT_OPTIONS['visibility_timeout']),
'auto_setup' => $options['auto_setup'] ?? (bool) ($query['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup']), 'auto_setup' => $options['auto_setup'] ?? (bool) ($query['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup']),
'access_key' => $options['access_key'] ?? (urldecode($parsedUrl['user'] ?? '') ?: self::DEFAULT_OPTIONS['access_key']),
'secret_key' => $options['secret_key'] ?? (urldecode($parsedUrl['pass'] ?? '') ?: self::DEFAULT_OPTIONS['secret_key']),
]; ];
if ('default' === ($parsedUrl['host'] ?? 'default')) { $clientConfiguration = [
$configuration['endpoint'] = sprintf('https://sqs.%s.amazonaws.com', $configuration['region']); 'region' => $options['region'] ?? ($query['region'] ?? self::DEFAULT_OPTIONS['region']),
} else { 'accessKeyId' => $options['access_key'] ?? (urldecode($parsedUrl['user'] ?? '') ?: self::DEFAULT_OPTIONS['access_key']),
$configuration['endpoint'] = sprintf('%s://%s%s', ($query['sslmode'] ?? null) === 'disable' ? 'http' : 'https', $parsedUrl['host'], ($parsedUrl['port'] ?? null) ? ':'.$parsedUrl['port'] : ''); 'accessKeySecret' => $options['secret_key'] ?? (urldecode($parsedUrl['pass'] ?? '') ?: self::DEFAULT_OPTIONS['secret_key']),
if (preg_match(';sqs.(.+).amazonaws.com;', $parsedUrl['host'], $matches)) { ];
$configuration['region'] = $matches[1]; unset($query['region']);
if ('default' !== ($parsedUrl['host'] ?? 'default')) {
$clientConfiguration['endpoint'] = sprintf('%s://%s%s', ($query['sslmode'] ?? null) === 'disable' ? 'http' : 'https', $parsedUrl['host'], ($parsedUrl['port'] ?? null) ? ':'.$parsedUrl['port'] : '');
if (preg_match(';^sqs\.([^\.]++)\.amazonaws\.com$;', $parsedUrl['host'], $matches)) {
$clientConfiguration['region'] = $matches[1];
} }
unset($query['sslmode']); unset($query['sslmode']);
} }
@ -131,7 +135,7 @@ class Connection
throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s].', implode(', ', $queryExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS)))); throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s].', implode(', ', $queryExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS))));
} }
return new self($configuration, $client); return new self($configuration, new SqsClient($clientConfiguration, null, $client));
} }
public function get(): ?array public function get(): ?array
@ -172,82 +176,99 @@ class Connection
private function getNewMessages(): \Generator private function getNewMessages(): \Generator
{ {
if (null === $this->currentResponse) { if (null === $this->currentResponse) {
$this->currentResponse = $this->request($this->getQueueUrl(), [ $this->currentResponse = $this->client->receiveMessage([
'Action' => 'ReceiveMessage', 'QueueUrl' => $this->getQueueUrl(),
'VisibilityTimeout' => $this->configuration['visibility_timeout'], 'VisibilityTimeout' => $this->configuration['visibility_timeout'],
'MaxNumberOfMessages' => $this->configuration['buffer_size'], 'MaxNumberOfMessages' => $this->configuration['buffer_size'],
'MessageAttributeName.1' => 'All', 'MessageAttributeNames' => ['All'],
'WaitTimeSeconds' => $this->configuration['wait_time'], 'WaitTimeSeconds' => $this->configuration['wait_time'],
]); ]);
} }
if ($this->client->stream($this->currentResponse, $this->configuration['poll_timeout'])->current()->isTimeout()) { if (!$this->fetchMessage()) {
return; return;
} }
$xml = new \SimpleXMLElement($this->currentResponse->getContent()); yield from $this->getPendingMessages();
foreach ($xml->ReceiveMessageResult->Message as $xmlMessage) { }
private function fetchMessage(): bool
{
if (!$this->currentResponse->resolve($this->configuration['poll_timeout'])) {
return false;
}
foreach ($this->currentResponse->getMessages() as $message) {
$headers = []; $headers = [];
foreach ($xmlMessage->MessageAttribute as $item) { foreach ($message->getMessageAttributes() as $name => $attribute) {
if ('String' !== (string) $item->Value->DataType) { if ('String' !== $attribute->getDataType()) {
continue; continue;
} }
$headers[(string) $item->Name] = (string) $item->Value->StringValue;
$headers[$name] = $attribute->getStringValue();
} }
$this->buffer[] = [ $this->buffer[] = [
'id' => (string) $xmlMessage->ReceiptHandle, 'id' => $message->getReceiptHandle(),
'body' => (string) $xmlMessage->Body, 'body' => $message->getBody(),
'headers' => $headers, 'headers' => $headers,
]; ];
} }
$this->currentResponse = null; $this->currentResponse = null;
yield from $this->getPendingMessages(); return true;
} }
public function setup(): void public function setup(): void
{ {
$parameters = [ // Set to false to disable setup more than once
'Action' => 'CreateQueue', $this->configuration['auto_setup'] = false;
if ($this->client->queueExists([
'QueueName' => $this->configuration['queue_name'], 'QueueName' => $this->configuration['queue_name'],
]; 'QueueOwnerAWSAccountId' => $this->configuration['account'],
])->isSuccess()) {
return;
}
if ($this->isFifoQueue($this->configuration['queue_name'])) { if (null !== $this->configuration['account']) {
throw new InvalidArgumentException(sprintf('The Amazon SQS queue "%s" does not exists (or you don\'t have permissions on it), and can\'t be created when an account is provided.', $this->configuration['queue_name']));
}
$parameters = ['QueueName' => $this->configuration['queue_name']];
if (self::isFifoQueue($this->configuration['queue_name'])) {
$parameters['FifoQueue'] = true; $parameters['FifoQueue'] = true;
} }
$this->call($this->configuration['endpoint'], $parameters); $this->client->createQueue($parameters);
$exists = $this->client->queueExists(['QueueName' => $this->configuration['queue_name']]);
// Blocking call to wait for the queue to be created
$exists->wait();
if (!$exists->isSuccess()) {
throw new TransportException(sprintf('Failed to crate the Amazon SQS queue "%s".', $this->configuration['queue_name']));
}
$this->queueUrl = null; $this->queueUrl = null;
$this->configuration['auto_setup'] = false;
} }
public function delete(string $id): void public function delete(string $id): void
{ {
$this->call($this->getQueueUrl(), [ $this->client->deleteMessage([
'Action' => 'DeleteMessage', 'QueueUrl' => $this->getQueueUrl(),
'ReceiptHandle' => $id, 'ReceiptHandle' => $id,
]); ]);
} }
public function getMessageCount(): int public function getMessageCount(): int
{ {
$response = $this->request($this->getQueueUrl(), [ $response = $this->client->getQueueAttributes([
'Action' => 'GetQueueAttributes', 'QueueUrl' => $this->getQueueUrl(),
'AttributeNames' => ['ApproximateNumberOfMessages'], 'AttributeNames' => [QueueAttributeName::APPROXIMATE_NUMBER_OF_MESSAGES],
]); ]);
$this->checkResponse($response);
$xml = new \SimpleXMLElement($response->getContent());
foreach ($xml->GetQueueAttributesResult->Attribute as $attribute) {
if ('ApproximateNumberOfMessages' !== (string) $attribute->Name) {
continue;
}
return (int) $attribute->Value; $attributes = $response->getAttributes();
}
return 0; return (int) ($attributes[QueueAttributeName::APPROXIMATE_NUMBER_OF_MESSAGES] ?? 0);
} }
public function send(string $body, array $headers, int $delay = 0, ?string $messageGroupId = null, ?string $messageDeduplicationId = null): void public function send(string $body, array $headers, int $delay = 0, ?string $messageGroupId = null, ?string $messageDeduplicationId = null): void
@ -257,36 +278,40 @@ class Connection
} }
$parameters = [ $parameters = [
'Action' => 'SendMessage', 'QueueUrl' => $this->getQueueUrl(),
'MessageBody' => $body, 'MessageBody' => $body,
'DelaySeconds' => $delay, 'DelaySeconds' => $delay,
'MessageAttributes' => [],
]; ];
$index = 0;
foreach ($headers as $name => $value) { foreach ($headers as $name => $value) {
++$index; $parameters['MessageAttributes'][$name] = new MessageAttributeValue([
$parameters["MessageAttribute.$index.Name"] = $name; 'DataType' => 'String',
$parameters["MessageAttribute.$index.Value.DataType"] = 'String'; 'StringValue' => $value,
$parameters["MessageAttribute.$index.Value.StringValue"] = $value; ]);
} }
if ($this->isFifoQueue($this->configuration['queue_name'])) { if (self::isFifoQueue($this->configuration['queue_name'])) {
$parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__; $parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__;
$parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1(json_encode(['body' => $body, 'headers' => $headers])); $parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1(json_encode(['body' => $body, 'headers' => $headers]));
} }
$this->call($this->getQueueUrl(), $parameters); $this->client->sendMessage($parameters);
} }
public function reset(): void public function reset(): void
{ {
if (null !== $this->currentResponse) { if (null !== $this->currentResponse) {
$this->currentResponse->cancel(); // fetch current response in order to requeue in transit messages
if (!$this->fetchMessage()) {
$this->currentResponse->cancel();
$this->currentResponse = null;
}
} }
foreach ($this->getPendingMessages() as $message) { foreach ($this->getPendingMessages() as $message) {
$this->call($this->getQueueUrl(), [ $this->client->changeMessageVisibility([
'Action' => 'ChangeMessageVisibility', 'QueueUrl' => $this->getQueueUrl(),
'ReceiptHandle' => $message['id'], 'ReceiptHandle' => $message['id'],
'VisibilityTimeout' => 0, 'VisibilityTimeout' => 0,
]); ]);
@ -295,108 +320,17 @@ class Connection
private function getQueueUrl(): string private function getQueueUrl(): string
{ {
if (null === $this->queueUrl) { if (null !== $this->queueUrl) {
$parameters = [ return $this->queueUrl;
'Action' => 'GetQueueUrl',
'QueueName' => $this->configuration['queue_name'],
];
if (isset($this->configuration['account'])) {
$parameters['QueueOwnerAWSAccountId'] = $this->configuration['account'];
}
$response = $this->request($this->configuration['endpoint'], $parameters);
$this->checkResponse($response);
$xml = new \SimpleXMLElement($response->getContent());
$this->queueUrl = (string) $xml->GetQueueUrlResult->QueueUrl;
} }
return $this->queueUrl; return $this->queueUrl = $this->client->getQueueUrl([
'QueueName' => $this->configuration['queue_name'],
'QueueOwnerAWSAccountId' => $this->configuration['account'],
])->getQueueUrl();
} }
private function call(string $endpoint, array $body): void private static function isFifoQueue(string $queueName): bool
{
$this->checkResponse($this->request($endpoint, $body));
}
private function request(string $endpoint, array $body): ResponseInterface
{
if (!$this->configuration['access_key']) {
return $this->client->request('POST', $endpoint, ['body' => $body]);
}
$region = $this->configuration['region'];
$service = 'sqs';
$method = 'POST';
$requestParameters = http_build_query($body, '', '&', PHP_QUERY_RFC1738);
$amzDate = gmdate('Ymd\THis\Z');
$parsedUrl = parse_url($endpoint);
$headers = [
'host' => $parsedUrl['host'].($parsedUrl['port'] ? ':'.$parsedUrl['port'] : ''),
'x-amz-date' => $amzDate,
'content-type' => 'application/x-www-form-urlencoded',
];
$signedHeaders = ['host', 'x-amz-date'];
$canonicalHeaders = implode("\n", array_map(function ($headerName) use ($headers): string {
return sprintf('%s:%s', $headerName, $headers[$headerName]);
}, $signedHeaders))."\n";
$canonicalRequest = implode("\n", [
$method,
$parsedUrl['path'] ?? '/',
'',
$canonicalHeaders,
implode(';', $signedHeaders),
hash('sha256', $requestParameters),
]);
$algorithm = 'AWS4-HMAC-SHA256';
$credentialScope = [gmdate('Ymd'), $region, $service, 'aws4_request'];
$signingKey = 'AWS4'.$this->configuration['secret_key'];
foreach ($credentialScope as $credential) {
$signingKey = hash_hmac('sha256', $credential, $signingKey, true);
}
$stringToSign = implode("\n", [
$algorithm,
$amzDate,
implode('/', $credentialScope),
hash('sha256', $canonicalRequest),
]);
$authorizationHeader = sprintf(
'%s Credential=%s/%s, SignedHeaders=%s, Signature=%s',
$algorithm,
$this->configuration['access_key'],
implode('/', $credentialScope),
implode(';', $signedHeaders),
hash_hmac('sha256', $stringToSign, $signingKey)
);
$options = [
'headers' => $headers + [
'authorization' => $authorizationHeader,
],
'body' => $requestParameters,
];
return $this->client->request($method, $endpoint, $options);
}
private function checkResponse(ResponseInterface $response): void
{
if (200 !== $response->getStatusCode()) {
$error = new \SimpleXMLElement($response->getContent(false));
throw new TransportException($error->Error->Message);
}
}
private function isFifoQueue(string $queueName): bool
{ {
return self::AWS_SQS_FIFO_SUFFIX === substr($queueName, -\strlen(self::AWS_SQS_FIFO_SUFFIX)); return self::AWS_SQS_FIFO_SUFFIX === substr($queueName, -\strlen(self::AWS_SQS_FIFO_SUFFIX));
} }

View File

@ -17,7 +17,7 @@
], ],
"require": { "require": {
"php": "^7.2.5", "php": "^7.2.5",
"symfony/http-client": "^4.3|5.0", "async-aws/sqs": "^1.0",
"symfony/messenger": "^4.3|^5.0", "symfony/messenger": "^4.3|^5.0",
"symfony/service-contracts": "^1.1|^2" "symfony/service-contracts": "^1.1|^2"
}, },