[AmazonSqsMessenger] Use AsyncAws to handle SQS communication
This commit is contained in:
parent
eebb3efa2c
commit
7c4888eed1
@ -105,6 +105,7 @@
|
|||||||
"amphp/http-client": "^4.2",
|
"amphp/http-client": "^4.2",
|
||||||
"amphp/http-tunnel": "^1.0",
|
"amphp/http-tunnel": "^1.0",
|
||||||
"async-aws/ses": "^1.0",
|
"async-aws/ses": "^1.0",
|
||||||
|
"async-aws/sqs": "^1.0",
|
||||||
"cache/integration-tests": "dev-master",
|
"cache/integration-tests": "dev-master",
|
||||||
"doctrine/annotations": "~1.0",
|
"doctrine/annotations": "~1.0",
|
||||||
"doctrine/cache": "~1.6",
|
"doctrine/cache": "~1.6",
|
||||||
|
@ -11,6 +11,7 @@
|
|||||||
|
|
||||||
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
|
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
|
||||||
|
|
||||||
|
use AsyncAws\Sqs\SqsClient;
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
|
||||||
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
|
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
|
||||||
@ -39,7 +40,7 @@ class AmazonSqsIntegrationTest extends TestCase
|
|||||||
{
|
{
|
||||||
$connection = Connection::fromDsn($dsn, []);
|
$connection = Connection::fromDsn($dsn, []);
|
||||||
$connection->setup();
|
$connection->setup();
|
||||||
$this->clearSqs($connection);
|
$this->clearSqs($dsn);
|
||||||
|
|
||||||
$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
|
$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
|
||||||
$this->assertSame(1, $connection->getMessageCount());
|
$this->assertSame(1, $connection->getMessageCount());
|
||||||
@ -53,15 +54,12 @@ class AmazonSqsIntegrationTest extends TestCase
|
|||||||
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
|
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
|
||||||
}
|
}
|
||||||
|
|
||||||
private function clearSqs(Connection $connection): void
|
private function clearSqs(string $dsn): void
|
||||||
{
|
{
|
||||||
$wait = 0;
|
$url = parse_url($dsn);
|
||||||
while ($wait++ < 50) {
|
$client = new SqsClient(['endpoint' => "http://{$url['host']}:{$url['port']}"]);
|
||||||
if (null === $message = $connection->get()) {
|
$client->purgeQueue([
|
||||||
usleep(5000);
|
'QueueUrl' => $client->getQueueUrl(['QueueName' => ltrim($url['path'], '/')])->getQueueUrl(),
|
||||||
continue;
|
]);
|
||||||
}
|
|
||||||
$connection->delete($message['id']);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -11,11 +11,15 @@
|
|||||||
|
|
||||||
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
|
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
|
||||||
|
|
||||||
|
use AsyncAws\Core\Exception\Http\HttpException;
|
||||||
|
use AsyncAws\Core\Test\ResultMockFactory;
|
||||||
|
use AsyncAws\Sqs\Result\GetQueueUrlResult;
|
||||||
|
use AsyncAws\Sqs\Result\ReceiveMessageResult;
|
||||||
|
use AsyncAws\Sqs\SqsClient;
|
||||||
|
use AsyncAws\Sqs\ValueObject\Message;
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
|
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
|
||||||
use Symfony\Component\Messenger\Exception\TransportException;
|
|
||||||
use Symfony\Contracts\HttpClient\HttpClientInterface;
|
use Symfony\Contracts\HttpClient\HttpClientInterface;
|
||||||
use Symfony\Contracts\HttpClient\ResponseInterface;
|
|
||||||
|
|
||||||
class ConnectionTest extends TestCase
|
class ConnectionTest extends TestCase
|
||||||
{
|
{
|
||||||
@ -31,7 +35,7 @@ class ConnectionTest extends TestCase
|
|||||||
{
|
{
|
||||||
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
|
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
|
||||||
$this->assertEquals(
|
$this->assertEquals(
|
||||||
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'queue_name' => 'queue'], $httpClient),
|
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
|
||||||
Connection::fromDsn('sqs://default/queue', [], $httpClient)
|
Connection::fromDsn('sqs://default/queue', [], $httpClient)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@ -40,8 +44,8 @@ class ConnectionTest extends TestCase
|
|||||||
{
|
{
|
||||||
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
|
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
|
||||||
$this->assertEquals(
|
$this->assertEquals(
|
||||||
new Connection(['endpoint' => 'https://sqs.us-east-1.amazonaws.com', 'queue_name' => 'queue', 'region' => 'us-east-1'], $httpClient),
|
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'us-west-2', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
|
||||||
Connection::fromDsn('sqs://default/queue?region=us-east-1', [], $httpClient)
|
Connection::fromDsn('sqs://default/queue?region=us-west-2', [], $httpClient)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -49,7 +53,7 @@ class ConnectionTest extends TestCase
|
|||||||
{
|
{
|
||||||
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
|
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
|
||||||
$this->assertEquals(
|
$this->assertEquals(
|
||||||
new Connection(['endpoint' => 'https://localhost', 'queue_name' => 'queue'], $httpClient),
|
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'endpoint' => 'https://localhost', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
|
||||||
Connection::fromDsn('sqs://localhost/queue', [], $httpClient)
|
Connection::fromDsn('sqs://localhost/queue', [], $httpClient)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@ -58,7 +62,7 @@ class ConnectionTest extends TestCase
|
|||||||
{
|
{
|
||||||
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
|
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
|
||||||
$this->assertEquals(
|
$this->assertEquals(
|
||||||
new Connection(['endpoint' => 'https://localhost:1234', 'queue_name' => 'queue'], $httpClient),
|
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'endpoint' => 'https://localhost:1234', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
|
||||||
Connection::fromDsn('sqs://localhost:1234/queue', [], $httpClient)
|
Connection::fromDsn('sqs://localhost:1234/queue', [], $httpClient)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@ -67,7 +71,7 @@ class ConnectionTest extends TestCase
|
|||||||
{
|
{
|
||||||
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
|
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
|
||||||
$this->assertEquals(
|
$this->assertEquals(
|
||||||
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient),
|
new Connection(['account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
|
||||||
Connection::fromDsn('sqs://default/213/queue', ['buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient)
|
Connection::fromDsn('sqs://default/213/queue', ['buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@ -76,153 +80,63 @@ class ConnectionTest extends TestCase
|
|||||||
{
|
{
|
||||||
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
|
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
|
||||||
$this->assertEquals(
|
$this->assertEquals(
|
||||||
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient),
|
new Connection(['account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
|
||||||
Connection::fromDsn('sqs://default/213/queue?buffer_size=1&wait_time=5&auto_setup=0', [], $httpClient)
|
Connection::fromDsn('sqs://default/213/queue?buffer_size=1&wait_time=5&auto_setup=0', [], $httpClient)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private function handleGetQueueUrl(int $index, $mock): string
|
|
||||||
{
|
|
||||||
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();
|
|
||||||
|
|
||||||
$mock->expects($this->at($index))->method('request')
|
|
||||||
->with('POST', 'https://localhost', ['body' => ['Action' => 'GetQueueUrl', 'QueueName' => 'queue']])
|
|
||||||
->willReturn($response);
|
|
||||||
$response->expects($this->once())->method('getStatusCode')->willReturn(200);
|
|
||||||
$response->expects($this->once())->method('getContent')->willReturn('<GetQueueUrlResponse>
|
|
||||||
<GetQueueUrlResult>
|
|
||||||
<QueueUrl>https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue</QueueUrl>
|
|
||||||
</GetQueueUrlResult>
|
|
||||||
<ResponseMetadata>
|
|
||||||
<RequestId>470a6f13-2ed9-4181-ad8a-2fdea142988e</RequestId>
|
|
||||||
</ResponseMetadata>
|
|
||||||
</GetQueueUrlResponse>');
|
|
||||||
|
|
||||||
return 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue';
|
|
||||||
}
|
|
||||||
|
|
||||||
public function testKeepGettingPendingMessages()
|
public function testKeepGettingPendingMessages()
|
||||||
{
|
{
|
||||||
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
|
$client = $this->createMock(SqsClient::class);
|
||||||
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();
|
$client->expects($this->any())
|
||||||
|
->method('getQueueUrl')
|
||||||
|
->with(['QueueName' => 'queue', 'QueueOwnerAWSAccountId' => 123])
|
||||||
|
->willReturn(ResultMockFactory::create(GetQueueUrlResult::class, ['QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue']));
|
||||||
|
$client->expects($this->at(1))
|
||||||
|
->method('receiveMessage')
|
||||||
|
->with([
|
||||||
|
'QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
|
||||||
|
'MaxNumberOfMessages' => 9,
|
||||||
|
'WaitTimeSeconds' => 20,
|
||||||
|
'MessageAttributeNames' => ['All'],
|
||||||
|
'VisibilityTimeout' => null,
|
||||||
|
])
|
||||||
|
->willReturn(ResultMockFactory::create(ReceiveMessageResult::class, ['Messages' => [
|
||||||
|
new Message(['MessageId' => 1, 'Body' => 'this is a test']),
|
||||||
|
new Message(['MessageId' => 2, 'Body' => 'this is a test']),
|
||||||
|
new Message(['MessageId' => 3, 'Body' => 'this is a test']),
|
||||||
|
]]));
|
||||||
|
$client->expects($this->at(2))
|
||||||
|
->method('receiveMessage')
|
||||||
|
->with([
|
||||||
|
'QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
|
||||||
|
'MaxNumberOfMessages' => 9,
|
||||||
|
'WaitTimeSeconds' => 20,
|
||||||
|
'MessageAttributeNames' => ['All'],
|
||||||
|
'VisibilityTimeout' => null,
|
||||||
|
])
|
||||||
|
->willReturn(ResultMockFactory::create(ReceiveMessageResult::class, ['Messages' => [
|
||||||
|
]]));
|
||||||
|
|
||||||
$queueUrl = $this->handleGetQueueUrl(0, $httpClient);
|
$connection = new Connection(['queue_name' => 'queue', 'account' => 123, 'auto_setup' => false], $client);
|
||||||
|
|
||||||
$httpClient->expects($this->at(1))->method('request')
|
|
||||||
->with('POST', $queueUrl, ['body' => ['Action' => 'ReceiveMessage', 'VisibilityTimeout' => null, 'MaxNumberOfMessages' => 9, 'WaitTimeSeconds' => 20, 'MessageAttributeName.1' => 'All']])
|
|
||||||
->willReturn($response);
|
|
||||||
$response->expects($this->once())->method('getContent')->willReturn('<ReceiveMessageResponse>
|
|
||||||
<ReceiveMessageResult>
|
|
||||||
<Message>
|
|
||||||
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
|
|
||||||
<ReceiptHandle>
|
|
||||||
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
|
|
||||||
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
|
|
||||||
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
|
|
||||||
</ReceiptHandle>
|
|
||||||
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
|
|
||||||
<Body>{"body":"this is a test","headers":{}}</Body>
|
|
||||||
<Attribute>
|
|
||||||
<Name>SenderId</Name>
|
|
||||||
<Value>195004372649</Value>
|
|
||||||
</Attribute>
|
|
||||||
<Attribute>
|
|
||||||
<Name>SentTimestamp</Name>
|
|
||||||
<Value>1238099229000</Value>
|
|
||||||
</Attribute>
|
|
||||||
<Attribute>
|
|
||||||
<Name>ApproximateReceiveCount</Name>
|
|
||||||
<Value>5</Value>
|
|
||||||
</Attribute>
|
|
||||||
<Attribute>
|
|
||||||
<Name>ApproximateFirstReceiveTimestamp</Name>
|
|
||||||
<Value>1250700979248</Value>
|
|
||||||
</Attribute>
|
|
||||||
</Message>
|
|
||||||
<Message>
|
|
||||||
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
|
|
||||||
<ReceiptHandle>
|
|
||||||
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
|
|
||||||
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
|
|
||||||
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
|
|
||||||
</ReceiptHandle>
|
|
||||||
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
|
|
||||||
<Body>{"body":"this is a test","headers":{}}</Body>
|
|
||||||
<Attribute>
|
|
||||||
<Name>SenderId</Name>
|
|
||||||
<Value>195004372649</Value>
|
|
||||||
</Attribute>
|
|
||||||
<Attribute>
|
|
||||||
<Name>SentTimestamp</Name>
|
|
||||||
<Value>1238099229000</Value>
|
|
||||||
</Attribute>
|
|
||||||
<Attribute>
|
|
||||||
<Name>ApproximateReceiveCount</Name>
|
|
||||||
<Value>5</Value>
|
|
||||||
</Attribute>
|
|
||||||
<Attribute>
|
|
||||||
<Name>ApproximateFirstReceiveTimestamp</Name>
|
|
||||||
<Value>1250700979248</Value>
|
|
||||||
</Attribute>
|
|
||||||
</Message>
|
|
||||||
<Message>
|
|
||||||
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
|
|
||||||
<ReceiptHandle>
|
|
||||||
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
|
|
||||||
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
|
|
||||||
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
|
|
||||||
</ReceiptHandle>
|
|
||||||
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
|
|
||||||
<Body>{"body":"this is a test","headers":{}}</Body>
|
|
||||||
<Attribute>
|
|
||||||
<Name>SenderId</Name>
|
|
||||||
<Value>195004372649</Value>
|
|
||||||
</Attribute>
|
|
||||||
<Attribute>
|
|
||||||
<Name>SentTimestamp</Name>
|
|
||||||
<Value>1238099229000</Value>
|
|
||||||
</Attribute>
|
|
||||||
<Attribute>
|
|
||||||
<Name>ApproximateReceiveCount</Name>
|
|
||||||
<Value>5</Value>
|
|
||||||
</Attribute>
|
|
||||||
<Attribute>
|
|
||||||
<Name>ApproximateFirstReceiveTimestamp</Name>
|
|
||||||
<Value>1250700979248</Value>
|
|
||||||
</Attribute>
|
|
||||||
</Message>
|
|
||||||
</ReceiveMessageResult>
|
|
||||||
<ResponseMetadata>
|
|
||||||
<RequestId>b6633655-283d-45b4-aee4-4e84e0ae6afa</RequestId>
|
|
||||||
</ResponseMetadata>
|
|
||||||
</ReceiveMessageResponse>');
|
|
||||||
|
|
||||||
$connection = Connection::fromDsn('sqs://localhost/queue', ['auto_setup' => false], $httpClient);
|
|
||||||
$this->assertNotNull($connection->get());
|
$this->assertNotNull($connection->get());
|
||||||
$this->assertNotNull($connection->get());
|
$this->assertNotNull($connection->get());
|
||||||
$this->assertNotNull($connection->get());
|
$this->assertNotNull($connection->get());
|
||||||
|
$this->assertNull($connection->get());
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testUnexpectedSqsError()
|
public function testUnexpectedSqsError()
|
||||||
{
|
{
|
||||||
$this->expectException(TransportException::class);
|
$this->expectException(HttpException::class);
|
||||||
$this->expectExceptionMessage('SQS error happens');
|
$this->expectExceptionMessage('SQS error happens');
|
||||||
|
|
||||||
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
|
$client = $this->createMock(SqsClient::class);
|
||||||
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();
|
$client->expects($this->any())
|
||||||
|
->method('getQueueUrl')
|
||||||
|
->with(['QueueName' => 'queue', 'QueueOwnerAWSAccountId' => 123])
|
||||||
|
->willReturn(ResultMockFactory::createFailing(GetQueueUrlResult::class, 400, 'SQS error happens'));
|
||||||
|
|
||||||
$httpClient->expects($this->once())->method('request')->willReturn($response);
|
$connection = new Connection(['queue_name' => 'queue', 'account' => 123, 'auto_setup' => false], $client);
|
||||||
$response->expects($this->once())->method('getStatusCode')->willReturn(400);
|
|
||||||
$response->expects($this->once())->method('getContent')->willReturn('<ErrorResponse xmlns="http://queue.amazonaws.com/doc/2012-11-05/">
|
|
||||||
<Error>
|
|
||||||
<Type>Sender</Type>
|
|
||||||
<Code>boom</Code>
|
|
||||||
<Message>SQS error happens</Message>
|
|
||||||
<Detail/>
|
|
||||||
</Error>
|
|
||||||
<RequestId>30441e49-5246-5231-9c87-4bd704b81ce9</RequestId>
|
|
||||||
</ErrorResponse>');
|
|
||||||
$connection = Connection::fromDsn('sqs://localhost/queue', [], $httpClient);
|
|
||||||
$connection->get();
|
$connection->get();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -11,6 +11,7 @@
|
|||||||
|
|
||||||
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
|
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
|
||||||
|
|
||||||
|
use AsyncAws\Core\Exception\Http\HttpException;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Exception\LogicException;
|
use Symfony\Component\Messenger\Exception\LogicException;
|
||||||
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
|
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
|
||||||
@ -19,7 +20,6 @@ use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
|
|||||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
|
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||||
use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Jérémy Derussé <jeremy@derusse.com>
|
* @author Jérémy Derussé <jeremy@derusse.com>
|
||||||
@ -42,7 +42,7 @@ class AmazonSqsReceiver implements ReceiverInterface, MessageCountAwareInterface
|
|||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
$sqsEnvelope = $this->connection->get();
|
$sqsEnvelope = $this->connection->get();
|
||||||
} catch (HttpExceptionInterface $e) {
|
} catch (HttpException $e) {
|
||||||
throw new TransportException($e->getMessage(), 0, $e);
|
throw new TransportException($e->getMessage(), 0, $e);
|
||||||
}
|
}
|
||||||
if (null === $sqsEnvelope) {
|
if (null === $sqsEnvelope) {
|
||||||
@ -70,7 +70,7 @@ class AmazonSqsReceiver implements ReceiverInterface, MessageCountAwareInterface
|
|||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
|
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
|
||||||
} catch (HttpExceptionInterface $e) {
|
} catch (HttpException $e) {
|
||||||
throw new TransportException($e->getMessage(), 0, $e);
|
throw new TransportException($e->getMessage(), 0, $e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -82,7 +82,7 @@ class AmazonSqsReceiver implements ReceiverInterface, MessageCountAwareInterface
|
|||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
|
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
|
||||||
} catch (HttpExceptionInterface $e) {
|
} catch (HttpException $e) {
|
||||||
throw new TransportException($e->getMessage(), 0, $e);
|
throw new TransportException($e->getMessage(), 0, $e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -94,7 +94,7 @@ class AmazonSqsReceiver implements ReceiverInterface, MessageCountAwareInterface
|
|||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
return $this->connection->getMessageCount();
|
return $this->connection->getMessageCount();
|
||||||
} catch (HttpExceptionInterface $e) {
|
} catch (HttpException $e) {
|
||||||
throw new TransportException($e->getMessage(), 0, $e);
|
throw new TransportException($e->getMessage(), 0, $e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -11,12 +11,12 @@
|
|||||||
|
|
||||||
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
|
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
|
||||||
|
|
||||||
|
use AsyncAws\Core\Exception\Http\HttpException;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Exception\TransportException;
|
use Symfony\Component\Messenger\Exception\TransportException;
|
||||||
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||||
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||||
use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Jérémy Derussé <jeremy@derusse.com>
|
* @author Jérémy Derussé <jeremy@derusse.com>
|
||||||
@ -61,7 +61,7 @@ class AmazonSqsSender implements SenderInterface
|
|||||||
$messageGroupId,
|
$messageGroupId,
|
||||||
$messageDeduplicationId
|
$messageDeduplicationId
|
||||||
);
|
);
|
||||||
} catch (HttpExceptionInterface $e) {
|
} catch (HttpException $e) {
|
||||||
throw new TransportException($e->getMessage(), 0, $e);
|
throw new TransportException($e->getMessage(), 0, $e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -11,7 +11,9 @@
|
|||||||
|
|
||||||
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
|
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
|
||||||
|
|
||||||
|
use AsyncAws\Core\Exception\Http\HttpException;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Exception\TransportException;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
|
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||||
use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
|
use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
|
||||||
@ -71,12 +73,20 @@ class AmazonSqsTransport implements TransportInterface, SetupableTransportInterf
|
|||||||
*/
|
*/
|
||||||
public function setup(): void
|
public function setup(): void
|
||||||
{
|
{
|
||||||
|
try {
|
||||||
$this->connection->setup();
|
$this->connection->setup();
|
||||||
|
} catch (HttpException $e) {
|
||||||
|
throw new TransportException($e->getMessage(), 0, $e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public function reset()
|
public function reset()
|
||||||
{
|
{
|
||||||
|
try {
|
||||||
$this->connection->reset();
|
$this->connection->reset();
|
||||||
|
} catch (HttpException $e) {
|
||||||
|
throw new TransportException($e->getMessage(), 0, $e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private function getReceiver(): AmazonSqsReceiver
|
private function getReceiver(): AmazonSqsReceiver
|
||||||
|
@ -11,11 +11,13 @@
|
|||||||
|
|
||||||
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
|
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
|
||||||
|
|
||||||
use Symfony\Component\HttpClient\HttpClient;
|
use AsyncAws\Sqs\Enum\QueueAttributeName;
|
||||||
|
use AsyncAws\Sqs\Result\ReceiveMessageResult;
|
||||||
|
use AsyncAws\Sqs\SqsClient;
|
||||||
|
use AsyncAws\Sqs\ValueObject\MessageAttributeValue;
|
||||||
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||||
use Symfony\Component\Messenger\Exception\TransportException;
|
use Symfony\Component\Messenger\Exception\TransportException;
|
||||||
use Symfony\Contracts\HttpClient\HttpClientInterface;
|
use Symfony\Contracts\HttpClient\HttpClientInterface;
|
||||||
use Symfony\Contracts\HttpClient\ResponseInterface;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A SQS connection.
|
* A SQS connection.
|
||||||
@ -46,17 +48,17 @@ class Connection
|
|||||||
private $configuration;
|
private $configuration;
|
||||||
private $client;
|
private $client;
|
||||||
|
|
||||||
/** @var ResponseInterface */
|
/** @var ReceiveMessageResult */
|
||||||
private $currentResponse;
|
private $currentResponse;
|
||||||
/** @var array[] */
|
/** @var array[] */
|
||||||
private $buffer = [];
|
private $buffer = [];
|
||||||
/** @var string|null */
|
/** @var string|null */
|
||||||
private $queueUrl;
|
private $queueUrl;
|
||||||
|
|
||||||
public function __construct(array $configuration, HttpClientInterface $client = null)
|
public function __construct(array $configuration, SqsClient $client = null)
|
||||||
{
|
{
|
||||||
$this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration);
|
$this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration);
|
||||||
$this->client = $client ?? HttpClient::create();
|
$this->client = $client ?? new SqsClient([]);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function __destruct()
|
public function __destruct()
|
||||||
@ -93,22 +95,24 @@ class Connection
|
|||||||
}
|
}
|
||||||
|
|
||||||
$configuration = [
|
$configuration = [
|
||||||
'region' => $options['region'] ?? ($query['region'] ?? self::DEFAULT_OPTIONS['region']),
|
|
||||||
'buffer_size' => $options['buffer_size'] ?? (int) ($query['buffer_size'] ?? self::DEFAULT_OPTIONS['buffer_size']),
|
'buffer_size' => $options['buffer_size'] ?? (int) ($query['buffer_size'] ?? self::DEFAULT_OPTIONS['buffer_size']),
|
||||||
'wait_time' => $options['wait_time'] ?? (int) ($query['wait_time'] ?? self::DEFAULT_OPTIONS['wait_time']),
|
'wait_time' => $options['wait_time'] ?? (int) ($query['wait_time'] ?? self::DEFAULT_OPTIONS['wait_time']),
|
||||||
'poll_timeout' => $options['poll_timeout'] ?? ($query['poll_timeout'] ?? self::DEFAULT_OPTIONS['poll_timeout']),
|
'poll_timeout' => $options['poll_timeout'] ?? ($query['poll_timeout'] ?? self::DEFAULT_OPTIONS['poll_timeout']),
|
||||||
'visibility_timeout' => $options['visibility_timeout'] ?? ($query['visibility_timeout'] ?? self::DEFAULT_OPTIONS['visibility_timeout']),
|
'visibility_timeout' => $options['visibility_timeout'] ?? ($query['visibility_timeout'] ?? self::DEFAULT_OPTIONS['visibility_timeout']),
|
||||||
'auto_setup' => $options['auto_setup'] ?? (bool) ($query['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup']),
|
'auto_setup' => $options['auto_setup'] ?? (bool) ($query['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup']),
|
||||||
'access_key' => $options['access_key'] ?? (urldecode($parsedUrl['user'] ?? '') ?: self::DEFAULT_OPTIONS['access_key']),
|
|
||||||
'secret_key' => $options['secret_key'] ?? (urldecode($parsedUrl['pass'] ?? '') ?: self::DEFAULT_OPTIONS['secret_key']),
|
|
||||||
];
|
];
|
||||||
|
|
||||||
if ('default' === ($parsedUrl['host'] ?? 'default')) {
|
$clientConfiguration = [
|
||||||
$configuration['endpoint'] = sprintf('https://sqs.%s.amazonaws.com', $configuration['region']);
|
'region' => $options['region'] ?? ($query['region'] ?? self::DEFAULT_OPTIONS['region']),
|
||||||
} else {
|
'accessKeyId' => $options['access_key'] ?? (urldecode($parsedUrl['user'] ?? '') ?: self::DEFAULT_OPTIONS['access_key']),
|
||||||
$configuration['endpoint'] = sprintf('%s://%s%s', ($query['sslmode'] ?? null) === 'disable' ? 'http' : 'https', $parsedUrl['host'], ($parsedUrl['port'] ?? null) ? ':'.$parsedUrl['port'] : '');
|
'accessKeySecret' => $options['secret_key'] ?? (urldecode($parsedUrl['pass'] ?? '') ?: self::DEFAULT_OPTIONS['secret_key']),
|
||||||
if (preg_match(';sqs.(.+).amazonaws.com;', $parsedUrl['host'], $matches)) {
|
];
|
||||||
$configuration['region'] = $matches[1];
|
unset($query['region']);
|
||||||
|
|
||||||
|
if ('default' !== ($parsedUrl['host'] ?? 'default')) {
|
||||||
|
$clientConfiguration['endpoint'] = sprintf('%s://%s%s', ($query['sslmode'] ?? null) === 'disable' ? 'http' : 'https', $parsedUrl['host'], ($parsedUrl['port'] ?? null) ? ':'.$parsedUrl['port'] : '');
|
||||||
|
if (preg_match(';^sqs\.([^\.]++)\.amazonaws\.com$;', $parsedUrl['host'], $matches)) {
|
||||||
|
$clientConfiguration['region'] = $matches[1];
|
||||||
}
|
}
|
||||||
unset($query['sslmode']);
|
unset($query['sslmode']);
|
||||||
}
|
}
|
||||||
@ -131,7 +135,7 @@ class Connection
|
|||||||
throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s].', implode(', ', $queryExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS))));
|
throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s].', implode(', ', $queryExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS))));
|
||||||
}
|
}
|
||||||
|
|
||||||
return new self($configuration, $client);
|
return new self($configuration, new SqsClient($clientConfiguration, null, $client));
|
||||||
}
|
}
|
||||||
|
|
||||||
public function get(): ?array
|
public function get(): ?array
|
||||||
@ -172,82 +176,99 @@ class Connection
|
|||||||
private function getNewMessages(): \Generator
|
private function getNewMessages(): \Generator
|
||||||
{
|
{
|
||||||
if (null === $this->currentResponse) {
|
if (null === $this->currentResponse) {
|
||||||
$this->currentResponse = $this->request($this->getQueueUrl(), [
|
$this->currentResponse = $this->client->receiveMessage([
|
||||||
'Action' => 'ReceiveMessage',
|
'QueueUrl' => $this->getQueueUrl(),
|
||||||
'VisibilityTimeout' => $this->configuration['visibility_timeout'],
|
'VisibilityTimeout' => $this->configuration['visibility_timeout'],
|
||||||
'MaxNumberOfMessages' => $this->configuration['buffer_size'],
|
'MaxNumberOfMessages' => $this->configuration['buffer_size'],
|
||||||
'MessageAttributeName.1' => 'All',
|
'MessageAttributeNames' => ['All'],
|
||||||
'WaitTimeSeconds' => $this->configuration['wait_time'],
|
'WaitTimeSeconds' => $this->configuration['wait_time'],
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($this->client->stream($this->currentResponse, $this->configuration['poll_timeout'])->current()->isTimeout()) {
|
if (!$this->fetchMessage()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
$xml = new \SimpleXMLElement($this->currentResponse->getContent());
|
yield from $this->getPendingMessages();
|
||||||
foreach ($xml->ReceiveMessageResult->Message as $xmlMessage) {
|
}
|
||||||
|
|
||||||
|
private function fetchMessage(): bool
|
||||||
|
{
|
||||||
|
if (!$this->currentResponse->resolve($this->configuration['poll_timeout'])) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach ($this->currentResponse->getMessages() as $message) {
|
||||||
$headers = [];
|
$headers = [];
|
||||||
foreach ($xmlMessage->MessageAttribute as $item) {
|
foreach ($message->getMessageAttributes() as $name => $attribute) {
|
||||||
if ('String' !== (string) $item->Value->DataType) {
|
if ('String' !== $attribute->getDataType()) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
$headers[(string) $item->Name] = (string) $item->Value->StringValue;
|
|
||||||
|
$headers[$name] = $attribute->getStringValue();
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->buffer[] = [
|
$this->buffer[] = [
|
||||||
'id' => (string) $xmlMessage->ReceiptHandle,
|
'id' => $message->getReceiptHandle(),
|
||||||
'body' => (string) $xmlMessage->Body,
|
'body' => $message->getBody(),
|
||||||
'headers' => $headers,
|
'headers' => $headers,
|
||||||
];
|
];
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->currentResponse = null;
|
$this->currentResponse = null;
|
||||||
|
|
||||||
yield from $this->getPendingMessages();
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function setup(): void
|
public function setup(): void
|
||||||
{
|
{
|
||||||
$parameters = [
|
// Set to false to disable setup more than once
|
||||||
'Action' => 'CreateQueue',
|
$this->configuration['auto_setup'] = false;
|
||||||
|
if ($this->client->queueExists([
|
||||||
'QueueName' => $this->configuration['queue_name'],
|
'QueueName' => $this->configuration['queue_name'],
|
||||||
];
|
'QueueOwnerAWSAccountId' => $this->configuration['account'],
|
||||||
|
])->isSuccess()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if ($this->isFifoQueue($this->configuration['queue_name'])) {
|
if (null !== $this->configuration['account']) {
|
||||||
|
throw new InvalidArgumentException(sprintf('The Amazon SQS queue "%s" does not exists (or you don\'t have permissions on it), and can\'t be created when an account is provided.', $this->configuration['queue_name']));
|
||||||
|
}
|
||||||
|
|
||||||
|
$parameters = ['QueueName' => $this->configuration['queue_name']];
|
||||||
|
|
||||||
|
if (self::isFifoQueue($this->configuration['queue_name'])) {
|
||||||
$parameters['FifoQueue'] = true;
|
$parameters['FifoQueue'] = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->call($this->configuration['endpoint'], $parameters);
|
$this->client->createQueue($parameters);
|
||||||
|
$exists = $this->client->queueExists(['QueueName' => $this->configuration['queue_name']]);
|
||||||
|
// Blocking call to wait for the queue to be created
|
||||||
|
$exists->wait();
|
||||||
|
if (!$exists->isSuccess()) {
|
||||||
|
throw new TransportException(sprintf('Failed to crate the Amazon SQS queue "%s".', $this->configuration['queue_name']));
|
||||||
|
}
|
||||||
$this->queueUrl = null;
|
$this->queueUrl = null;
|
||||||
|
|
||||||
$this->configuration['auto_setup'] = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public function delete(string $id): void
|
public function delete(string $id): void
|
||||||
{
|
{
|
||||||
$this->call($this->getQueueUrl(), [
|
$this->client->deleteMessage([
|
||||||
'Action' => 'DeleteMessage',
|
'QueueUrl' => $this->getQueueUrl(),
|
||||||
'ReceiptHandle' => $id,
|
'ReceiptHandle' => $id,
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function getMessageCount(): int
|
public function getMessageCount(): int
|
||||||
{
|
{
|
||||||
$response = $this->request($this->getQueueUrl(), [
|
$response = $this->client->getQueueAttributes([
|
||||||
'Action' => 'GetQueueAttributes',
|
'QueueUrl' => $this->getQueueUrl(),
|
||||||
'AttributeNames' => ['ApproximateNumberOfMessages'],
|
'AttributeNames' => [QueueAttributeName::APPROXIMATE_NUMBER_OF_MESSAGES],
|
||||||
]);
|
]);
|
||||||
$this->checkResponse($response);
|
|
||||||
$xml = new \SimpleXMLElement($response->getContent());
|
|
||||||
foreach ($xml->GetQueueAttributesResult->Attribute as $attribute) {
|
|
||||||
if ('ApproximateNumberOfMessages' !== (string) $attribute->Name) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
return (int) $attribute->Value;
|
$attributes = $response->getAttributes();
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return (int) ($attributes[QueueAttributeName::APPROXIMATE_NUMBER_OF_MESSAGES] ?? 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function send(string $body, array $headers, int $delay = 0, ?string $messageGroupId = null, ?string $messageDeduplicationId = null): void
|
public function send(string $body, array $headers, int $delay = 0, ?string $messageGroupId = null, ?string $messageDeduplicationId = null): void
|
||||||
@ -257,36 +278,40 @@ class Connection
|
|||||||
}
|
}
|
||||||
|
|
||||||
$parameters = [
|
$parameters = [
|
||||||
'Action' => 'SendMessage',
|
'QueueUrl' => $this->getQueueUrl(),
|
||||||
'MessageBody' => $body,
|
'MessageBody' => $body,
|
||||||
'DelaySeconds' => $delay,
|
'DelaySeconds' => $delay,
|
||||||
|
'MessageAttributes' => [],
|
||||||
];
|
];
|
||||||
|
|
||||||
$index = 0;
|
|
||||||
foreach ($headers as $name => $value) {
|
foreach ($headers as $name => $value) {
|
||||||
++$index;
|
$parameters['MessageAttributes'][$name] = new MessageAttributeValue([
|
||||||
$parameters["MessageAttribute.$index.Name"] = $name;
|
'DataType' => 'String',
|
||||||
$parameters["MessageAttribute.$index.Value.DataType"] = 'String';
|
'StringValue' => $value,
|
||||||
$parameters["MessageAttribute.$index.Value.StringValue"] = $value;
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($this->isFifoQueue($this->configuration['queue_name'])) {
|
if (self::isFifoQueue($this->configuration['queue_name'])) {
|
||||||
$parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__;
|
$parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__;
|
||||||
$parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1(json_encode(['body' => $body, 'headers' => $headers]));
|
$parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1(json_encode(['body' => $body, 'headers' => $headers]));
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->call($this->getQueueUrl(), $parameters);
|
$this->client->sendMessage($parameters);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function reset(): void
|
public function reset(): void
|
||||||
{
|
{
|
||||||
if (null !== $this->currentResponse) {
|
if (null !== $this->currentResponse) {
|
||||||
|
// fetch current response in order to requeue in transit messages
|
||||||
|
if (!$this->fetchMessage()) {
|
||||||
$this->currentResponse->cancel();
|
$this->currentResponse->cancel();
|
||||||
|
$this->currentResponse = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
foreach ($this->getPendingMessages() as $message) {
|
foreach ($this->getPendingMessages() as $message) {
|
||||||
$this->call($this->getQueueUrl(), [
|
$this->client->changeMessageVisibility([
|
||||||
'Action' => 'ChangeMessageVisibility',
|
'QueueUrl' => $this->getQueueUrl(),
|
||||||
'ReceiptHandle' => $message['id'],
|
'ReceiptHandle' => $message['id'],
|
||||||
'VisibilityTimeout' => 0,
|
'VisibilityTimeout' => 0,
|
||||||
]);
|
]);
|
||||||
@ -295,108 +320,17 @@ class Connection
|
|||||||
|
|
||||||
private function getQueueUrl(): string
|
private function getQueueUrl(): string
|
||||||
{
|
{
|
||||||
if (null === $this->queueUrl) {
|
if (null !== $this->queueUrl) {
|
||||||
$parameters = [
|
|
||||||
'Action' => 'GetQueueUrl',
|
|
||||||
'QueueName' => $this->configuration['queue_name'],
|
|
||||||
];
|
|
||||||
if (isset($this->configuration['account'])) {
|
|
||||||
$parameters['QueueOwnerAWSAccountId'] = $this->configuration['account'];
|
|
||||||
}
|
|
||||||
|
|
||||||
$response = $this->request($this->configuration['endpoint'], $parameters);
|
|
||||||
$this->checkResponse($response);
|
|
||||||
$xml = new \SimpleXMLElement($response->getContent());
|
|
||||||
|
|
||||||
$this->queueUrl = (string) $xml->GetQueueUrlResult->QueueUrl;
|
|
||||||
}
|
|
||||||
|
|
||||||
return $this->queueUrl;
|
return $this->queueUrl;
|
||||||
}
|
}
|
||||||
|
|
||||||
private function call(string $endpoint, array $body): void
|
return $this->queueUrl = $this->client->getQueueUrl([
|
||||||
{
|
'QueueName' => $this->configuration['queue_name'],
|
||||||
$this->checkResponse($this->request($endpoint, $body));
|
'QueueOwnerAWSAccountId' => $this->configuration['account'],
|
||||||
|
])->getQueueUrl();
|
||||||
}
|
}
|
||||||
|
|
||||||
private function request(string $endpoint, array $body): ResponseInterface
|
private static function isFifoQueue(string $queueName): bool
|
||||||
{
|
|
||||||
if (!$this->configuration['access_key']) {
|
|
||||||
return $this->client->request('POST', $endpoint, ['body' => $body]);
|
|
||||||
}
|
|
||||||
|
|
||||||
$region = $this->configuration['region'];
|
|
||||||
$service = 'sqs';
|
|
||||||
|
|
||||||
$method = 'POST';
|
|
||||||
$requestParameters = http_build_query($body, '', '&', PHP_QUERY_RFC1738);
|
|
||||||
$amzDate = gmdate('Ymd\THis\Z');
|
|
||||||
$parsedUrl = parse_url($endpoint);
|
|
||||||
|
|
||||||
$headers = [
|
|
||||||
'host' => $parsedUrl['host'].($parsedUrl['port'] ? ':'.$parsedUrl['port'] : ''),
|
|
||||||
'x-amz-date' => $amzDate,
|
|
||||||
'content-type' => 'application/x-www-form-urlencoded',
|
|
||||||
];
|
|
||||||
|
|
||||||
$signedHeaders = ['host', 'x-amz-date'];
|
|
||||||
$canonicalHeaders = implode("\n", array_map(function ($headerName) use ($headers): string {
|
|
||||||
return sprintf('%s:%s', $headerName, $headers[$headerName]);
|
|
||||||
}, $signedHeaders))."\n";
|
|
||||||
|
|
||||||
$canonicalRequest = implode("\n", [
|
|
||||||
$method,
|
|
||||||
$parsedUrl['path'] ?? '/',
|
|
||||||
'',
|
|
||||||
$canonicalHeaders,
|
|
||||||
implode(';', $signedHeaders),
|
|
||||||
hash('sha256', $requestParameters),
|
|
||||||
]);
|
|
||||||
|
|
||||||
$algorithm = 'AWS4-HMAC-SHA256';
|
|
||||||
$credentialScope = [gmdate('Ymd'), $region, $service, 'aws4_request'];
|
|
||||||
|
|
||||||
$signingKey = 'AWS4'.$this->configuration['secret_key'];
|
|
||||||
foreach ($credentialScope as $credential) {
|
|
||||||
$signingKey = hash_hmac('sha256', $credential, $signingKey, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
$stringToSign = implode("\n", [
|
|
||||||
$algorithm,
|
|
||||||
$amzDate,
|
|
||||||
implode('/', $credentialScope),
|
|
||||||
hash('sha256', $canonicalRequest),
|
|
||||||
]);
|
|
||||||
|
|
||||||
$authorizationHeader = sprintf(
|
|
||||||
'%s Credential=%s/%s, SignedHeaders=%s, Signature=%s',
|
|
||||||
$algorithm,
|
|
||||||
$this->configuration['access_key'],
|
|
||||||
implode('/', $credentialScope),
|
|
||||||
implode(';', $signedHeaders),
|
|
||||||
hash_hmac('sha256', $stringToSign, $signingKey)
|
|
||||||
);
|
|
||||||
|
|
||||||
$options = [
|
|
||||||
'headers' => $headers + [
|
|
||||||
'authorization' => $authorizationHeader,
|
|
||||||
],
|
|
||||||
'body' => $requestParameters,
|
|
||||||
];
|
|
||||||
|
|
||||||
return $this->client->request($method, $endpoint, $options);
|
|
||||||
}
|
|
||||||
|
|
||||||
private function checkResponse(ResponseInterface $response): void
|
|
||||||
{
|
|
||||||
if (200 !== $response->getStatusCode()) {
|
|
||||||
$error = new \SimpleXMLElement($response->getContent(false));
|
|
||||||
|
|
||||||
throw new TransportException($error->Error->Message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private function isFifoQueue(string $queueName): bool
|
|
||||||
{
|
{
|
||||||
return self::AWS_SQS_FIFO_SUFFIX === substr($queueName, -\strlen(self::AWS_SQS_FIFO_SUFFIX));
|
return self::AWS_SQS_FIFO_SUFFIX === substr($queueName, -\strlen(self::AWS_SQS_FIFO_SUFFIX));
|
||||||
}
|
}
|
||||||
|
@ -17,7 +17,7 @@
|
|||||||
],
|
],
|
||||||
"require": {
|
"require": {
|
||||||
"php": "^7.2.5",
|
"php": "^7.2.5",
|
||||||
"symfony/http-client": "^4.3|5.0",
|
"async-aws/sqs": "^1.0",
|
||||||
"symfony/messenger": "^4.3|^5.0",
|
"symfony/messenger": "^4.3|^5.0",
|
||||||
"symfony/service-contracts": "^1.1|^2"
|
"symfony/service-contracts": "^1.1|^2"
|
||||||
},
|
},
|
||||||
|
Reference in New Issue
Block a user