[AmazonSqsMessenger] Use AsyncAws to handle SQS communication

This commit is contained in:
Jérémy Derussé 2020-03-16 13:18:25 +01:00 committed by Fabien Potencier
parent eebb3efa2c
commit 7c4888eed1
8 changed files with 172 additions and 315 deletions

View File

@ -105,6 +105,7 @@
"amphp/http-client": "^4.2",
"amphp/http-tunnel": "^1.0",
"async-aws/ses": "^1.0",
"async-aws/sqs": "^1.0",
"cache/integration-tests": "dev-master",
"doctrine/annotations": "~1.0",
"doctrine/cache": "~1.6",

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
use AsyncAws\Sqs\SqsClient;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
@ -39,7 +40,7 @@ class AmazonSqsIntegrationTest extends TestCase
{
$connection = Connection::fromDsn($dsn, []);
$connection->setup();
$this->clearSqs($connection);
$this->clearSqs($dsn);
$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
$this->assertSame(1, $connection->getMessageCount());
@ -53,15 +54,12 @@ class AmazonSqsIntegrationTest extends TestCase
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
}
private function clearSqs(Connection $connection): void
private function clearSqs(string $dsn): void
{
$wait = 0;
while ($wait++ < 50) {
if (null === $message = $connection->get()) {
usleep(5000);
continue;
}
$connection->delete($message['id']);
}
$url = parse_url($dsn);
$client = new SqsClient(['endpoint' => "http://{$url['host']}:{$url['port']}"]);
$client->purgeQueue([
'QueueUrl' => $client->getQueueUrl(['QueueName' => ltrim($url['path'], '/')])->getQueueUrl(),
]);
}
}

View File

@ -11,11 +11,15 @@
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
use AsyncAws\Core\Exception\Http\HttpException;
use AsyncAws\Core\Test\ResultMockFactory;
use AsyncAws\Sqs\Result\GetQueueUrlResult;
use AsyncAws\Sqs\Result\ReceiveMessageResult;
use AsyncAws\Sqs\SqsClient;
use AsyncAws\Sqs\ValueObject\Message;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface;
class ConnectionTest extends TestCase
{
@ -31,7 +35,7 @@ class ConnectionTest extends TestCase
{
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals(
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'queue_name' => 'queue'], $httpClient),
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
Connection::fromDsn('sqs://default/queue', [], $httpClient)
);
}
@ -40,8 +44,8 @@ class ConnectionTest extends TestCase
{
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals(
new Connection(['endpoint' => 'https://sqs.us-east-1.amazonaws.com', 'queue_name' => 'queue', 'region' => 'us-east-1'], $httpClient),
Connection::fromDsn('sqs://default/queue?region=us-east-1', [], $httpClient)
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'us-west-2', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
Connection::fromDsn('sqs://default/queue?region=us-west-2', [], $httpClient)
);
}
@ -49,7 +53,7 @@ class ConnectionTest extends TestCase
{
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals(
new Connection(['endpoint' => 'https://localhost', 'queue_name' => 'queue'], $httpClient),
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'endpoint' => 'https://localhost', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
Connection::fromDsn('sqs://localhost/queue', [], $httpClient)
);
}
@ -58,7 +62,7 @@ class ConnectionTest extends TestCase
{
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals(
new Connection(['endpoint' => 'https://localhost:1234', 'queue_name' => 'queue'], $httpClient),
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'endpoint' => 'https://localhost:1234', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
Connection::fromDsn('sqs://localhost:1234/queue', [], $httpClient)
);
}
@ -67,7 +71,7 @@ class ConnectionTest extends TestCase
{
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals(
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient),
new Connection(['account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
Connection::fromDsn('sqs://default/213/queue', ['buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient)
);
}
@ -76,153 +80,63 @@ class ConnectionTest extends TestCase
{
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals(
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient),
new Connection(['account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
Connection::fromDsn('sqs://default/213/queue?buffer_size=1&wait_time=5&auto_setup=0', [], $httpClient)
);
}
private function handleGetQueueUrl(int $index, $mock): string
{
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();
$mock->expects($this->at($index))->method('request')
->with('POST', 'https://localhost', ['body' => ['Action' => 'GetQueueUrl', 'QueueName' => 'queue']])
->willReturn($response);
$response->expects($this->once())->method('getStatusCode')->willReturn(200);
$response->expects($this->once())->method('getContent')->willReturn('<GetQueueUrlResponse>
<GetQueueUrlResult>
<QueueUrl>https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue</QueueUrl>
</GetQueueUrlResult>
<ResponseMetadata>
<RequestId>470a6f13-2ed9-4181-ad8a-2fdea142988e</RequestId>
</ResponseMetadata>
</GetQueueUrlResponse>');
return 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue';
}
public function testKeepGettingPendingMessages()
{
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();
$client = $this->createMock(SqsClient::class);
$client->expects($this->any())
->method('getQueueUrl')
->with(['QueueName' => 'queue', 'QueueOwnerAWSAccountId' => 123])
->willReturn(ResultMockFactory::create(GetQueueUrlResult::class, ['QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue']));
$client->expects($this->at(1))
->method('receiveMessage')
->with([
'QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
'MaxNumberOfMessages' => 9,
'WaitTimeSeconds' => 20,
'MessageAttributeNames' => ['All'],
'VisibilityTimeout' => null,
])
->willReturn(ResultMockFactory::create(ReceiveMessageResult::class, ['Messages' => [
new Message(['MessageId' => 1, 'Body' => 'this is a test']),
new Message(['MessageId' => 2, 'Body' => 'this is a test']),
new Message(['MessageId' => 3, 'Body' => 'this is a test']),
]]));
$client->expects($this->at(2))
->method('receiveMessage')
->with([
'QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
'MaxNumberOfMessages' => 9,
'WaitTimeSeconds' => 20,
'MessageAttributeNames' => ['All'],
'VisibilityTimeout' => null,
])
->willReturn(ResultMockFactory::create(ReceiveMessageResult::class, ['Messages' => [
]]));
$queueUrl = $this->handleGetQueueUrl(0, $httpClient);
$httpClient->expects($this->at(1))->method('request')
->with('POST', $queueUrl, ['body' => ['Action' => 'ReceiveMessage', 'VisibilityTimeout' => null, 'MaxNumberOfMessages' => 9, 'WaitTimeSeconds' => 20, 'MessageAttributeName.1' => 'All']])
->willReturn($response);
$response->expects($this->once())->method('getContent')->willReturn('<ReceiveMessageResponse>
<ReceiveMessageResult>
<Message>
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
<ReceiptHandle>
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
</ReceiptHandle>
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
<Body>{"body":"this is a test","headers":{}}</Body>
<Attribute>
<Name>SenderId</Name>
<Value>195004372649</Value>
</Attribute>
<Attribute>
<Name>SentTimestamp</Name>
<Value>1238099229000</Value>
</Attribute>
<Attribute>
<Name>ApproximateReceiveCount</Name>
<Value>5</Value>
</Attribute>
<Attribute>
<Name>ApproximateFirstReceiveTimestamp</Name>
<Value>1250700979248</Value>
</Attribute>
</Message>
<Message>
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
<ReceiptHandle>
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
</ReceiptHandle>
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
<Body>{"body":"this is a test","headers":{}}</Body>
<Attribute>
<Name>SenderId</Name>
<Value>195004372649</Value>
</Attribute>
<Attribute>
<Name>SentTimestamp</Name>
<Value>1238099229000</Value>
</Attribute>
<Attribute>
<Name>ApproximateReceiveCount</Name>
<Value>5</Value>
</Attribute>
<Attribute>
<Name>ApproximateFirstReceiveTimestamp</Name>
<Value>1250700979248</Value>
</Attribute>
</Message>
<Message>
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
<ReceiptHandle>
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
</ReceiptHandle>
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
<Body>{"body":"this is a test","headers":{}}</Body>
<Attribute>
<Name>SenderId</Name>
<Value>195004372649</Value>
</Attribute>
<Attribute>
<Name>SentTimestamp</Name>
<Value>1238099229000</Value>
</Attribute>
<Attribute>
<Name>ApproximateReceiveCount</Name>
<Value>5</Value>
</Attribute>
<Attribute>
<Name>ApproximateFirstReceiveTimestamp</Name>
<Value>1250700979248</Value>
</Attribute>
</Message>
</ReceiveMessageResult>
<ResponseMetadata>
<RequestId>b6633655-283d-45b4-aee4-4e84e0ae6afa</RequestId>
</ResponseMetadata>
</ReceiveMessageResponse>');
$connection = Connection::fromDsn('sqs://localhost/queue', ['auto_setup' => false], $httpClient);
$connection = new Connection(['queue_name' => 'queue', 'account' => 123, 'auto_setup' => false], $client);
$this->assertNotNull($connection->get());
$this->assertNotNull($connection->get());
$this->assertNotNull($connection->get());
$this->assertNull($connection->get());
}
public function testUnexpectedSqsError()
{
$this->expectException(TransportException::class);
$this->expectException(HttpException::class);
$this->expectExceptionMessage('SQS error happens');
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();
$client = $this->createMock(SqsClient::class);
$client->expects($this->any())
->method('getQueueUrl')
->with(['QueueName' => 'queue', 'QueueOwnerAWSAccountId' => 123])
->willReturn(ResultMockFactory::createFailing(GetQueueUrlResult::class, 400, 'SQS error happens'));
$httpClient->expects($this->once())->method('request')->willReturn($response);
$response->expects($this->once())->method('getStatusCode')->willReturn(400);
$response->expects($this->once())->method('getContent')->willReturn('<ErrorResponse xmlns="http://queue.amazonaws.com/doc/2012-11-05/">
<Error>
<Type>Sender</Type>
<Code>boom</Code>
<Message>SQS error happens</Message>
<Detail/>
</Error>
<RequestId>30441e49-5246-5231-9c87-4bd704b81ce9</RequestId>
</ErrorResponse>');
$connection = Connection::fromDsn('sqs://localhost/queue', [], $httpClient);
$connection = new Connection(['queue_name' => 'queue', 'account' => 123, 'auto_setup' => false], $client);
$connection->get();
}
}

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
use AsyncAws\Core\Exception\Http\HttpException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
@ -19,7 +20,6 @@ use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
/**
* @author Jérémy Derussé <jeremy@derusse.com>
@ -42,7 +42,7 @@ class AmazonSqsReceiver implements ReceiverInterface, MessageCountAwareInterface
{
try {
$sqsEnvelope = $this->connection->get();
} catch (HttpExceptionInterface $e) {
} catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
if (null === $sqsEnvelope) {
@ -70,7 +70,7 @@ class AmazonSqsReceiver implements ReceiverInterface, MessageCountAwareInterface
{
try {
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
} catch (HttpExceptionInterface $e) {
} catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
}
@ -82,7 +82,7 @@ class AmazonSqsReceiver implements ReceiverInterface, MessageCountAwareInterface
{
try {
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
} catch (HttpExceptionInterface $e) {
} catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
}
@ -94,7 +94,7 @@ class AmazonSqsReceiver implements ReceiverInterface, MessageCountAwareInterface
{
try {
return $this->connection->getMessageCount();
} catch (HttpExceptionInterface $e) {
} catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
}

View File

@ -11,12 +11,12 @@
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
use AsyncAws\Core\Exception\Http\HttpException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
/**
* @author Jérémy Derussé <jeremy@derusse.com>
@ -61,7 +61,7 @@ class AmazonSqsSender implements SenderInterface
$messageGroupId,
$messageDeduplicationId
);
} catch (HttpExceptionInterface $e) {
} catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}

View File

@ -11,7 +11,9 @@
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
use AsyncAws\Core\Exception\Http\HttpException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
@ -71,12 +73,20 @@ class AmazonSqsTransport implements TransportInterface, SetupableTransportInterf
*/
public function setup(): void
{
$this->connection->setup();
try {
$this->connection->setup();
} catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
}
public function reset()
{
$this->connection->reset();
try {
$this->connection->reset();
} catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
}
private function getReceiver(): AmazonSqsReceiver

View File

@ -11,11 +11,13 @@
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
use Symfony\Component\HttpClient\HttpClient;
use AsyncAws\Sqs\Enum\QueueAttributeName;
use AsyncAws\Sqs\Result\ReceiveMessageResult;
use AsyncAws\Sqs\SqsClient;
use AsyncAws\Sqs\ValueObject\MessageAttributeValue;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface;
/**
* A SQS connection.
@ -46,17 +48,17 @@ class Connection
private $configuration;
private $client;
/** @var ResponseInterface */
/** @var ReceiveMessageResult */
private $currentResponse;
/** @var array[] */
private $buffer = [];
/** @var string|null */
private $queueUrl;
public function __construct(array $configuration, HttpClientInterface $client = null)
public function __construct(array $configuration, SqsClient $client = null)
{
$this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration);
$this->client = $client ?? HttpClient::create();
$this->client = $client ?? new SqsClient([]);
}
public function __destruct()
@ -93,22 +95,24 @@ class Connection
}
$configuration = [
'region' => $options['region'] ?? ($query['region'] ?? self::DEFAULT_OPTIONS['region']),
'buffer_size' => $options['buffer_size'] ?? (int) ($query['buffer_size'] ?? self::DEFAULT_OPTIONS['buffer_size']),
'wait_time' => $options['wait_time'] ?? (int) ($query['wait_time'] ?? self::DEFAULT_OPTIONS['wait_time']),
'poll_timeout' => $options['poll_timeout'] ?? ($query['poll_timeout'] ?? self::DEFAULT_OPTIONS['poll_timeout']),
'visibility_timeout' => $options['visibility_timeout'] ?? ($query['visibility_timeout'] ?? self::DEFAULT_OPTIONS['visibility_timeout']),
'auto_setup' => $options['auto_setup'] ?? (bool) ($query['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup']),
'access_key' => $options['access_key'] ?? (urldecode($parsedUrl['user'] ?? '') ?: self::DEFAULT_OPTIONS['access_key']),
'secret_key' => $options['secret_key'] ?? (urldecode($parsedUrl['pass'] ?? '') ?: self::DEFAULT_OPTIONS['secret_key']),
];
if ('default' === ($parsedUrl['host'] ?? 'default')) {
$configuration['endpoint'] = sprintf('https://sqs.%s.amazonaws.com', $configuration['region']);
} else {
$configuration['endpoint'] = sprintf('%s://%s%s', ($query['sslmode'] ?? null) === 'disable' ? 'http' : 'https', $parsedUrl['host'], ($parsedUrl['port'] ?? null) ? ':'.$parsedUrl['port'] : '');
if (preg_match(';sqs.(.+).amazonaws.com;', $parsedUrl['host'], $matches)) {
$configuration['region'] = $matches[1];
$clientConfiguration = [
'region' => $options['region'] ?? ($query['region'] ?? self::DEFAULT_OPTIONS['region']),
'accessKeyId' => $options['access_key'] ?? (urldecode($parsedUrl['user'] ?? '') ?: self::DEFAULT_OPTIONS['access_key']),
'accessKeySecret' => $options['secret_key'] ?? (urldecode($parsedUrl['pass'] ?? '') ?: self::DEFAULT_OPTIONS['secret_key']),
];
unset($query['region']);
if ('default' !== ($parsedUrl['host'] ?? 'default')) {
$clientConfiguration['endpoint'] = sprintf('%s://%s%s', ($query['sslmode'] ?? null) === 'disable' ? 'http' : 'https', $parsedUrl['host'], ($parsedUrl['port'] ?? null) ? ':'.$parsedUrl['port'] : '');
if (preg_match(';^sqs\.([^\.]++)\.amazonaws\.com$;', $parsedUrl['host'], $matches)) {
$clientConfiguration['region'] = $matches[1];
}
unset($query['sslmode']);
}
@ -131,7 +135,7 @@ class Connection
throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s].', implode(', ', $queryExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS))));
}
return new self($configuration, $client);
return new self($configuration, new SqsClient($clientConfiguration, null, $client));
}
public function get(): ?array
@ -172,82 +176,99 @@ class Connection
private function getNewMessages(): \Generator
{
if (null === $this->currentResponse) {
$this->currentResponse = $this->request($this->getQueueUrl(), [
'Action' => 'ReceiveMessage',
$this->currentResponse = $this->client->receiveMessage([
'QueueUrl' => $this->getQueueUrl(),
'VisibilityTimeout' => $this->configuration['visibility_timeout'],
'MaxNumberOfMessages' => $this->configuration['buffer_size'],
'MessageAttributeName.1' => 'All',
'MessageAttributeNames' => ['All'],
'WaitTimeSeconds' => $this->configuration['wait_time'],
]);
}
if ($this->client->stream($this->currentResponse, $this->configuration['poll_timeout'])->current()->isTimeout()) {
if (!$this->fetchMessage()) {
return;
}
$xml = new \SimpleXMLElement($this->currentResponse->getContent());
foreach ($xml->ReceiveMessageResult->Message as $xmlMessage) {
yield from $this->getPendingMessages();
}
private function fetchMessage(): bool
{
if (!$this->currentResponse->resolve($this->configuration['poll_timeout'])) {
return false;
}
foreach ($this->currentResponse->getMessages() as $message) {
$headers = [];
foreach ($xmlMessage->MessageAttribute as $item) {
if ('String' !== (string) $item->Value->DataType) {
foreach ($message->getMessageAttributes() as $name => $attribute) {
if ('String' !== $attribute->getDataType()) {
continue;
}
$headers[(string) $item->Name] = (string) $item->Value->StringValue;
$headers[$name] = $attribute->getStringValue();
}
$this->buffer[] = [
'id' => (string) $xmlMessage->ReceiptHandle,
'body' => (string) $xmlMessage->Body,
'id' => $message->getReceiptHandle(),
'body' => $message->getBody(),
'headers' => $headers,
];
}
$this->currentResponse = null;
yield from $this->getPendingMessages();
return true;
}
public function setup(): void
{
$parameters = [
'Action' => 'CreateQueue',
// Set to false to disable setup more than once
$this->configuration['auto_setup'] = false;
if ($this->client->queueExists([
'QueueName' => $this->configuration['queue_name'],
];
'QueueOwnerAWSAccountId' => $this->configuration['account'],
])->isSuccess()) {
return;
}
if ($this->isFifoQueue($this->configuration['queue_name'])) {
if (null !== $this->configuration['account']) {
throw new InvalidArgumentException(sprintf('The Amazon SQS queue "%s" does not exists (or you don\'t have permissions on it), and can\'t be created when an account is provided.', $this->configuration['queue_name']));
}
$parameters = ['QueueName' => $this->configuration['queue_name']];
if (self::isFifoQueue($this->configuration['queue_name'])) {
$parameters['FifoQueue'] = true;
}
$this->call($this->configuration['endpoint'], $parameters);
$this->client->createQueue($parameters);
$exists = $this->client->queueExists(['QueueName' => $this->configuration['queue_name']]);
// Blocking call to wait for the queue to be created
$exists->wait();
if (!$exists->isSuccess()) {
throw new TransportException(sprintf('Failed to crate the Amazon SQS queue "%s".', $this->configuration['queue_name']));
}
$this->queueUrl = null;
$this->configuration['auto_setup'] = false;
}
public function delete(string $id): void
{
$this->call($this->getQueueUrl(), [
'Action' => 'DeleteMessage',
$this->client->deleteMessage([
'QueueUrl' => $this->getQueueUrl(),
'ReceiptHandle' => $id,
]);
}
public function getMessageCount(): int
{
$response = $this->request($this->getQueueUrl(), [
'Action' => 'GetQueueAttributes',
'AttributeNames' => ['ApproximateNumberOfMessages'],
$response = $this->client->getQueueAttributes([
'QueueUrl' => $this->getQueueUrl(),
'AttributeNames' => [QueueAttributeName::APPROXIMATE_NUMBER_OF_MESSAGES],
]);
$this->checkResponse($response);
$xml = new \SimpleXMLElement($response->getContent());
foreach ($xml->GetQueueAttributesResult->Attribute as $attribute) {
if ('ApproximateNumberOfMessages' !== (string) $attribute->Name) {
continue;
}
return (int) $attribute->Value;
}
$attributes = $response->getAttributes();
return 0;
return (int) ($attributes[QueueAttributeName::APPROXIMATE_NUMBER_OF_MESSAGES] ?? 0);
}
public function send(string $body, array $headers, int $delay = 0, ?string $messageGroupId = null, ?string $messageDeduplicationId = null): void
@ -257,36 +278,40 @@ class Connection
}
$parameters = [
'Action' => 'SendMessage',
'QueueUrl' => $this->getQueueUrl(),
'MessageBody' => $body,
'DelaySeconds' => $delay,
'MessageAttributes' => [],
];
$index = 0;
foreach ($headers as $name => $value) {
++$index;
$parameters["MessageAttribute.$index.Name"] = $name;
$parameters["MessageAttribute.$index.Value.DataType"] = 'String';
$parameters["MessageAttribute.$index.Value.StringValue"] = $value;
$parameters['MessageAttributes'][$name] = new MessageAttributeValue([
'DataType' => 'String',
'StringValue' => $value,
]);
}
if ($this->isFifoQueue($this->configuration['queue_name'])) {
if (self::isFifoQueue($this->configuration['queue_name'])) {
$parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__;
$parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1(json_encode(['body' => $body, 'headers' => $headers]));
}
$this->call($this->getQueueUrl(), $parameters);
$this->client->sendMessage($parameters);
}
public function reset(): void
{
if (null !== $this->currentResponse) {
$this->currentResponse->cancel();
// fetch current response in order to requeue in transit messages
if (!$this->fetchMessage()) {
$this->currentResponse->cancel();
$this->currentResponse = null;
}
}
foreach ($this->getPendingMessages() as $message) {
$this->call($this->getQueueUrl(), [
'Action' => 'ChangeMessageVisibility',
$this->client->changeMessageVisibility([
'QueueUrl' => $this->getQueueUrl(),
'ReceiptHandle' => $message['id'],
'VisibilityTimeout' => 0,
]);
@ -295,108 +320,17 @@ class Connection
private function getQueueUrl(): string
{
if (null === $this->queueUrl) {
$parameters = [
'Action' => 'GetQueueUrl',
'QueueName' => $this->configuration['queue_name'],
];
if (isset($this->configuration['account'])) {
$parameters['QueueOwnerAWSAccountId'] = $this->configuration['account'];
}
$response = $this->request($this->configuration['endpoint'], $parameters);
$this->checkResponse($response);
$xml = new \SimpleXMLElement($response->getContent());
$this->queueUrl = (string) $xml->GetQueueUrlResult->QueueUrl;
if (null !== $this->queueUrl) {
return $this->queueUrl;
}
return $this->queueUrl;
return $this->queueUrl = $this->client->getQueueUrl([
'QueueName' => $this->configuration['queue_name'],
'QueueOwnerAWSAccountId' => $this->configuration['account'],
])->getQueueUrl();
}
private function call(string $endpoint, array $body): void
{
$this->checkResponse($this->request($endpoint, $body));
}
private function request(string $endpoint, array $body): ResponseInterface
{
if (!$this->configuration['access_key']) {
return $this->client->request('POST', $endpoint, ['body' => $body]);
}
$region = $this->configuration['region'];
$service = 'sqs';
$method = 'POST';
$requestParameters = http_build_query($body, '', '&', PHP_QUERY_RFC1738);
$amzDate = gmdate('Ymd\THis\Z');
$parsedUrl = parse_url($endpoint);
$headers = [
'host' => $parsedUrl['host'].($parsedUrl['port'] ? ':'.$parsedUrl['port'] : ''),
'x-amz-date' => $amzDate,
'content-type' => 'application/x-www-form-urlencoded',
];
$signedHeaders = ['host', 'x-amz-date'];
$canonicalHeaders = implode("\n", array_map(function ($headerName) use ($headers): string {
return sprintf('%s:%s', $headerName, $headers[$headerName]);
}, $signedHeaders))."\n";
$canonicalRequest = implode("\n", [
$method,
$parsedUrl['path'] ?? '/',
'',
$canonicalHeaders,
implode(';', $signedHeaders),
hash('sha256', $requestParameters),
]);
$algorithm = 'AWS4-HMAC-SHA256';
$credentialScope = [gmdate('Ymd'), $region, $service, 'aws4_request'];
$signingKey = 'AWS4'.$this->configuration['secret_key'];
foreach ($credentialScope as $credential) {
$signingKey = hash_hmac('sha256', $credential, $signingKey, true);
}
$stringToSign = implode("\n", [
$algorithm,
$amzDate,
implode('/', $credentialScope),
hash('sha256', $canonicalRequest),
]);
$authorizationHeader = sprintf(
'%s Credential=%s/%s, SignedHeaders=%s, Signature=%s',
$algorithm,
$this->configuration['access_key'],
implode('/', $credentialScope),
implode(';', $signedHeaders),
hash_hmac('sha256', $stringToSign, $signingKey)
);
$options = [
'headers' => $headers + [
'authorization' => $authorizationHeader,
],
'body' => $requestParameters,
];
return $this->client->request($method, $endpoint, $options);
}
private function checkResponse(ResponseInterface $response): void
{
if (200 !== $response->getStatusCode()) {
$error = new \SimpleXMLElement($response->getContent(false));
throw new TransportException($error->Error->Message);
}
}
private function isFifoQueue(string $queueName): bool
private static function isFifoQueue(string $queueName): bool
{
return self::AWS_SQS_FIFO_SUFFIX === substr($queueName, -\strlen(self::AWS_SQS_FIFO_SUFFIX));
}

View File

@ -17,7 +17,7 @@
],
"require": {
"php": "^7.2.5",
"symfony/http-client": "^4.3|5.0",
"async-aws/sqs": "^1.0",
"symfony/messenger": "^4.3|^5.0",
"symfony/service-contracts": "^1.1|^2"
},