[Messenger] Add SQS transport

This commit is contained in:
Jérémy Derussé 2019-07-08 23:33:42 +02:00
parent ef30ef55d0
commit c226479d5f
No known key found for this signature in database
GPG Key ID: 2083FA5758C473D2
22 changed files with 1317 additions and 0 deletions

View File

@ -21,6 +21,7 @@ env:
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
- MESSENGER_REDIS_DSN=redis://127.0.0.1:7006/messages
- MESSENGER_SQS_DSN=sqs://localhost:9494/messages?sslmode=disable
- SYMFONY_PHPUNIT_DISABLE_RESULT_CACHE=1
matrix:
@ -69,6 +70,11 @@ before_install:
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 -p 7006:7006 -p 7007:7007 -e "STANDALONE=true" --name redis-cluster grokzen/redis-cluster:5.0.4
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
- |
# Start Sqs server
docker pull feathj/fake-sqs
docker run -d -p 9494:9494 --name sqs feathj/fake-sqs
- |
# Start Kafka and install an up-to-date librdkafka
docker network create kafka_network

View File

@ -85,6 +85,10 @@
<tag name="kernel.reset" method="reset" />
</service>
<service id="messenger.transport.sqs.factory" class="Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsTransportFactory">
<tag name="messenger.transport_factory" />
</service>
<!-- retry -->
<service id="messenger.retry_strategy_locator">
<tag name="container.service_locator" />

View File

@ -0,0 +1,3 @@
/Tests export-ignore
/phpunit.xml.dist export-ignore
/.gitignore export-ignore

View File

@ -0,0 +1,3 @@
vendor/
composer.lock
phpunit.xml

View File

@ -0,0 +1,8 @@
CHANGELOG
=========
5.1.0
-----
* Introduced the Amazon SQS bridge.

View File

@ -0,0 +1,19 @@
Copyright (c) 2020 Fabien Potencier
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is furnished
to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@ -0,0 +1,12 @@
Amazon SQS Messenger
====================
Provides Amazon SQS integration for Symfony Messenger.
Resources
---------
* [Contributing](https://symfony.com/doc/current/contributing/index.html)
* [Report issues](https://github.com/symfony/symfony/issues) and
[send Pull Requests](https://github.com/symfony/symfony/pulls)
in the [main Symfony repository](https://github.com/symfony/symfony)

View File

@ -0,0 +1,18 @@
<?php
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures;
class DummyMessage
{
private $message;
public function __construct(string $message)
{
$this->message = $message;
}
public function getMessage(): string
{
return $this->message;
}
}

View File

@ -0,0 +1,58 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
class AmazonSqsIntegrationTest extends TestCase
{
private $connection;
protected function setUp(): void
{
if (!getenv('MESSENGER_SQS_DSN')) {
$this->markTestSkipped('The "MESSENGER_SQS_DSN" environment variable is required.');
}
$this->connection = Connection::fromDsn(getenv('MESSENGER_SQS_DSN'), []);
$this->connection->setup();
$this->clearSqs();
}
public function testConnectionSendAndGet()
{
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
$this->assertSame(1, $this->connection->getMessageCount());
$wait = 0;
while ((null === $encoded = $this->connection->get()) && $wait++ < 200) {
usleep(5000);
}
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
}
private function clearSqs()
{
$wait = 0;
while ($wait++ < 50) {
if (null === $message = $this->connection->get()) {
usleep(5000);
continue;
}
$this->connection->delete($message['id']);
}
}
}

View File

@ -0,0 +1,76 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceiver;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
class AmazonSqsReceiverTest extends TestCase
{
public function testItReturnsTheDecodedMessageToTheHandler()
{
$serializer = $this->createSerializer();
$sqsEnvelop = $this->createSqsEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($sqsEnvelop);
$receiver = new AmazonSqsReceiver($connection, $serializer);
$actualEnvelopes = iterator_to_array($receiver->get());
$this->assertCount(1, $actualEnvelopes);
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
}
public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
{
$this->expectException(MessageDecodingFailedException::class);
$serializer = $this->createMock(PhpSerializer::class);
$serializer->method('decode')->willThrowException(new MessageDecodingFailedException());
$sqsEnvelop = $this->createSqsEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($sqsEnvelop);
$connection->expects($this->once())->method('delete');
$receiver = new AmazonSqsReceiver($connection, $serializer);
iterator_to_array($receiver->get());
}
private function createSqsEnvelope()
{
return [
'id' => 1,
'body' => '{"message": "Hi"}',
'headers' => [
'type' => DummyMessage::class,
],
];
}
private function createSerializer(): Serializer
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);
return $serializer;
}
}

View File

@ -0,0 +1,39 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsSender;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
class AmazonSqsSenderTest extends TestCase
{
public function testSend()
{
$envelope = new Envelope(new DummyMessage('Oy'));
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
$connection = $this->getMockBuilder(Connection::class)
->disableOriginalConstructor()
->getMock();
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers']);
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
$sender = new AmazonSqsSender($connection, $serializer);
$sender->send($envelope);
}
}

View File

@ -0,0 +1,27 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsTransportFactory;
class AmazonSqsTransportFactoryTest extends TestCase
{
public function testSupportsOnlySqsTransports()
{
$factory = new AmazonSqsTransportFactory();
$this->assertTrue($factory->supports('sqs://localhost', []));
$this->assertFalse($factory->supports('redis://localhost', []));
$this->assertFalse($factory->supports('invalid-dsn', []));
}
}

View File

@ -0,0 +1,60 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsTransport;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
class AmazonSqsTransportTest extends TestCase
{
public function testItIsATransport()
{
$transport = $this->getTransport();
$this->assertInstanceOf(TransportInterface::class, $transport);
}
public function testReceivesMessages()
{
$transport = $this->getTransport(
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(),
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock()
);
$decodedMessage = new DummyMessage('Decoded.');
$sqsEnvelope = [
'id' => '5',
'body' => 'body',
'headers' => ['my' => 'header'],
];
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
$connection->method('get')->willReturn($sqsEnvelope);
$envelopes = iterator_to_array($transport->get());
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
}
private function getTransport(SerializerInterface $serializer = null, Connection $connection = null)
{
$serializer = $serializer ?: $this->getMockBuilder(SerializerInterface::class)->getMock();
$connection = $connection ?: $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
return new AmazonSqsTransport($connection, $serializer);
}
}

View File

@ -0,0 +1,228 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface;
class ConnectionTest extends TestCase
{
public function testFromInvalidDsn()
{
$this->expectException(\InvalidArgumentException::class);
$this->expectExceptionMessage('The given Amazon SQS DSN "sqs://" is invalid.');
Connection::fromDsn('sqs://');
}
public function testFromDsn()
{
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals(
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'queue_name' => 'queue'], $httpClient),
Connection::fromDsn('sqs://default/queue', [], $httpClient)
);
}
public function testFromDsnWithRegion()
{
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals(
new Connection(['endpoint' => 'https://sqs.us-east-1.amazonaws.com', 'queue_name' => 'queue', 'region' => 'us-east-1'], $httpClient),
Connection::fromDsn('sqs://default/queue?region=us-east-1', [], $httpClient)
);
}
public function testFromDsnWithCustomEndpoint()
{
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals(
new Connection(['endpoint' => 'https://localhost', 'queue_name' => 'queue'], $httpClient),
Connection::fromDsn('sqs://localhost/queue', [], $httpClient)
);
}
public function testFromDsnWithCustomEndpointAndPort()
{
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals(
new Connection(['endpoint' => 'https://localhost:1234', 'queue_name' => 'queue'], $httpClient),
Connection::fromDsn('sqs://localhost:1234/queue', [], $httpClient)
);
}
public function testFromDsnWithOptions()
{
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals(
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient),
Connection::fromDsn('sqs://default/213/queue', ['buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient)
);
}
public function testFromDsnWithQueryOptions()
{
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals(
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient),
Connection::fromDsn('sqs://default/213/queue?buffer_size=1&wait_time=5&auto_setup=0', [], $httpClient)
);
}
private function handleGetQueueUrl(int $index, $mock): string
{
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();
$mock->expects($this->at($index))->method('request')
->with('POST', 'https://localhost', ['body' => ['Action' => 'GetQueueUrl', 'QueueName' => 'queue']])
->willReturn($response);
$response->expects($this->once())->method('getStatusCode')->willReturn(200);
$response->expects($this->once())->method('getContent')->willReturn('<GetQueueUrlResponse>
<GetQueueUrlResult>
<QueueUrl>https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue</QueueUrl>
</GetQueueUrlResult>
<ResponseMetadata>
<RequestId>470a6f13-2ed9-4181-ad8a-2fdea142988e</RequestId>
</ResponseMetadata>
</GetQueueUrlResponse>');
return 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue';
}
public function testKeepGettingPendingMessages()
{
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();
$queueUrl = $this->handleGetQueueUrl(0, $httpClient);
$httpClient->expects($this->at(1))->method('request')
->with('POST', $queueUrl, ['body' => ['Action' => 'ReceiveMessage', 'VisibilityTimeout' => null, 'MaxNumberOfMessages' => 9, 'WaitTimeSeconds' => 20]])
->willReturn($response);
$response->expects($this->once())->method('getContent')->willReturn('<ReceiveMessageResponse>
<ReceiveMessageResult>
<Message>
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
<ReceiptHandle>
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
</ReceiptHandle>
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
<Body>{"body":"this is a test","headers":{}}</Body>
<Attribute>
<Name>SenderId</Name>
<Value>195004372649</Value>
</Attribute>
<Attribute>
<Name>SentTimestamp</Name>
<Value>1238099229000</Value>
</Attribute>
<Attribute>
<Name>ApproximateReceiveCount</Name>
<Value>5</Value>
</Attribute>
<Attribute>
<Name>ApproximateFirstReceiveTimestamp</Name>
<Value>1250700979248</Value>
</Attribute>
</Message>
<Message>
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
<ReceiptHandle>
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
</ReceiptHandle>
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
<Body>{"body":"this is a test","headers":{}}</Body>
<Attribute>
<Name>SenderId</Name>
<Value>195004372649</Value>
</Attribute>
<Attribute>
<Name>SentTimestamp</Name>
<Value>1238099229000</Value>
</Attribute>
<Attribute>
<Name>ApproximateReceiveCount</Name>
<Value>5</Value>
</Attribute>
<Attribute>
<Name>ApproximateFirstReceiveTimestamp</Name>
<Value>1250700979248</Value>
</Attribute>
</Message>
<Message>
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
<ReceiptHandle>
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
</ReceiptHandle>
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
<Body>{"body":"this is a test","headers":{}}</Body>
<Attribute>
<Name>SenderId</Name>
<Value>195004372649</Value>
</Attribute>
<Attribute>
<Name>SentTimestamp</Name>
<Value>1238099229000</Value>
</Attribute>
<Attribute>
<Name>ApproximateReceiveCount</Name>
<Value>5</Value>
</Attribute>
<Attribute>
<Name>ApproximateFirstReceiveTimestamp</Name>
<Value>1250700979248</Value>
</Attribute>
</Message>
</ReceiveMessageResult>
<ResponseMetadata>
<RequestId>b6633655-283d-45b4-aee4-4e84e0ae6afa</RequestId>
</ResponseMetadata>
</ReceiveMessageResponse>');
$connection = Connection::fromDsn('sqs://localhost/queue', ['auto_setup' => false], $httpClient);
$this->assertNotNull($connection->get());
$this->assertNotNull($connection->get());
$this->assertNotNull($connection->get());
}
public function testUnexpectedSqsError()
{
$this->expectException(TransportException::class);
$this->expectExceptionMessage('SQS error happens');
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();
$httpClient->expects($this->once())->method('request')->willReturn($response);
$response->expects($this->once())->method('getStatusCode')->willReturn(400);
$response->expects($this->once())->method('getContent')->willReturn('<ErrorResponse xmlns="http://queue.amazonaws.com/doc/2012-11-05/">
<Error>
<Type>Sender</Type>
<Code>boom</Code>
<Message>SQS error happens</Message>
<Detail/>
</Error>
<RequestId>30441e49-5246-5231-9c87-4bd704b81ce9</RequestId>
</ErrorResponse>');
$connection = Connection::fromDsn('sqs://localhost/queue', [], $httpClient);
$connection->get();
}
}

View File

@ -0,0 +1,32 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
/**
* @author Jérémy Derussé <jeremy@derusse.com>
*/
class AmazonSqsReceivedStamp implements NonSendableStampInterface
{
private $id;
public function __construct(string $id)
{
$this->id = $id;
}
public function getId(): string
{
return $this->id;
}
}

View File

@ -0,0 +1,113 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
/**
* @author Jérémy Derussé <jeremy@derusse.com>
*/
class AmazonSqsReceiver implements ReceiverInterface, MessageCountAwareInterface
{
private $connection;
private $serializer;
public function __construct(Connection $connection, SerializerInterface $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? new PhpSerializer();
}
/**
* {@inheritdoc}
*/
public function get(): iterable
{
try {
$sqsEnvelope = $this->connection->get();
} catch (HttpExceptionInterface $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
if (null === $sqsEnvelope) {
return;
}
try {
$envelope = $this->serializer->decode([
'body' => $sqsEnvelope['body'],
'headers' => $sqsEnvelope['headers'],
]);
} catch (MessageDecodingFailedException $exception) {
$this->connection->delete($sqsEnvelope['id']);
throw $exception;
}
yield $envelope->with(new AmazonSqsReceivedStamp($sqsEnvelope['id']));
}
/**
* {@inheritdoc}
*/
public function ack(Envelope $envelope): void
{
try {
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
} catch (HttpExceptionInterface $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
}
/**
* {@inheritdoc}
*/
public function reject(Envelope $envelope): void
{
try {
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
} catch (HttpExceptionInterface $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
}
/**
* {@inheritdoc}
*/
public function getMessageCount(): int
{
try {
$this->connection->getMessageCount();
} catch (HttpExceptionInterface $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
}
private function findSqsReceivedStamp(Envelope $envelope): AmazonSqsReceivedStamp
{
/** @var AmazonSqsReceivedStamp|null $sqsReceivedStamp */
$sqsReceivedStamp = $envelope->last(AmazonSqsReceivedStamp::class);
if (null === $sqsReceivedStamp) {
throw new LogicException('No AmazonSqsReceivedStamp found on the Envelope.');
}
return $sqsReceivedStamp;
}
}

View File

@ -0,0 +1,54 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
/**
* @author Jérémy Derussé <jeremy@derusse.com>
*/
class AmazonSqsSender implements SenderInterface
{
private $connection;
private $serializer;
public function __construct(Connection $connection, SerializerInterface $serializer)
{
$this->connection = $connection;
$this->serializer = $serializer;
}
/**
* {@inheritdoc}
*/
public function send(Envelope $envelope): Envelope
{
$encodedMessage = $this->serializer->encode($envelope);
/** @var DelayStamp|null $delayStamp */
$delayStamp = $envelope->last(DelayStamp::class);
$delay = null !== $delayStamp ? (int) ceil($delayStamp->getDelay() / 1000) : 0;
try {
$this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay);
} catch (HttpExceptionInterface $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
return $envelope;
}
}

View File

@ -0,0 +1,91 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Contracts\Service\ResetInterface;
/**
* @author Jérémy Derussé <jeremy@derusse.com>
*/
class AmazonSqsTransport implements TransportInterface, SetupableTransportInterface, ResetInterface
{
private $serializer;
private $connection;
private $receiver;
private $sender;
public function __construct(Connection $connection, SerializerInterface $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? new PhpSerializer();
}
/**
* {@inheritdoc}
*/
public function get(): iterable
{
return ($this->receiver ?? $this->getReceiver())->get();
}
/**
* {@inheritdoc}
*/
public function ack(Envelope $envelope): void
{
($this->receiver ?? $this->getReceiver())->ack($envelope);
}
/**
* {@inheritdoc}
*/
public function reject(Envelope $envelope): void
{
($this->receiver ?? $this->getReceiver())->reject($envelope);
}
/**
* {@inheritdoc}
*/
public function send(Envelope $envelope): Envelope
{
return ($this->sender ?? $this->getSender())->send($envelope);
}
/**
* {@inheritdoc}
*/
public function setup(): void
{
$this->connection->setup();
}
public function reset()
{
$this->connection->reset();
}
private function getReceiver(): AmazonSqsReceiver
{
return $this->receiver = new AmazonSqsReceiver($this->connection, $this->serializer);
}
private function getSender(): AmazonSqsSender
{
return $this->sender = new AmazonSqsSender($this->connection, $this->serializer);
}
}

View File

@ -0,0 +1,34 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
/**
* @author Jérémy Derussé <jeremy@derusse.com>
*/
class AmazonSqsTransportFactory implements TransportFactoryInterface
{
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
{
unset($options['transport_name']);
return new AmazonSqsTransport(Connection::fromDsn($dsn, $options), $serializer);
}
public function supports(string $dsn, array $options): bool
{
return 0 === strpos($dsn, 'sqs://');
}
}

View File

@ -0,0 +1,362 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
use Symfony\Component\HttpClient\HttpClient;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface;
/**
* A SQS connection.
*
* @author Jérémy Derussé <jeremy@derusse.com>
*
* @internal
* @final
*/
class Connection
{
private const DEFAULT_OPTIONS = [
'buffer_size' => 9,
'wait_time' => 20,
'poll_timeout' => 0.1,
'visibility_timeout' => null,
'auto_setup' => true,
'access_key' => null,
'secret_key' => null,
'endpoint' => 'https://sqs.eu-west-1.amazonaws.com',
'region' => 'eu-west-1',
'queue_name' => 'messages',
'account' => null,
];
private $configuration;
private $client;
/** @var ResponseInterface */
private $currentResponse;
/** @var array[] */
private $buffer = [];
/** @var string|null */
private $queueUrl;
public function __construct(array $configuration, HttpClientInterface $client = null)
{
$this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration);
$this->client = $client ?? HttpClient::create();
}
public function __destruct()
{
$this->reset();
}
/**
* Creates a connection based on the DSN and options.
*
* Available options:
*
* * endpoint: absolute URL to the SQS service (Default: https://sqs.eu-west-1.amazonaws.com)
* * region: name of the AWS region (Default: eu-west-1)
* * queue_name: name of the queue (Default: messages)
* * account: identifier of the AWS account
* * access_key: AWS access key
* * secret_key: AWS secret key
* * buffer_size: number of messages to prefetch (Default: 9)
* * wait_time: long polling duration in seconds (Default: 20)
* * poll_timeout: amount of seconds the transport should wait for new message
* * visibility_timeout: amount of seconds the message won't be visible
* * auto_setup: Whether the queue should be created automatically during send / get (Default: true)
*/
public static function fromDsn(string $dsn, array $options = [], HttpClientInterface $client = null): self
{
if (false === $parsedUrl = parse_url($dsn)) {
throw new InvalidArgumentException(sprintf('The given Amazon SQS DSN "%s" is invalid.', $dsn));
}
$query = [];
if (isset($parsedUrl['query'])) {
parse_str($parsedUrl['query'], $query);
}
$configuration = [
'region' => $options['region'] ?? ($query['region'] ?? self::DEFAULT_OPTIONS['region']),
'buffer_size' => $options['buffer_size'] ?? (int) ($query['buffer_size'] ?? self::DEFAULT_OPTIONS['buffer_size']),
'wait_time' => $options['wait_time'] ?? (int) ($query['wait_time'] ?? self::DEFAULT_OPTIONS['wait_time']),
'poll_timeout' => $options['poll_timeout'] ?? ($query['poll_timeout'] ?? self::DEFAULT_OPTIONS['poll_timeout']),
'visibility_timeout' => $options['visibility_timeout'] ?? ($query['visibility_timeout'] ?? self::DEFAULT_OPTIONS['visibility_timeout']),
'auto_setup' => $options['auto_setup'] ?? (bool) ($query['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup']),
'access_key' => $options['access_key'] ?? (urldecode($parsedUrl['user'] ?? '') ?: self::DEFAULT_OPTIONS['access_key']),
'secret_key' => $options['secret_key'] ?? (urldecode($parsedUrl['pass'] ?? '') ?: self::DEFAULT_OPTIONS['secret_key']),
];
if ('default' === ($parsedUrl['host'] ?? 'default')) {
$configuration['endpoint'] = sprintf('https://sqs.%s.amazonaws.com', $configuration['region']);
} else {
$configuration['endpoint'] = sprintf('%s://%s%s', ($query['sslmode'] ?? null) === 'disable' ? 'http' : 'https', $parsedUrl['host'], ($parsedUrl['port'] ?? null) ? ':'.$parsedUrl['port'] : '');
unset($query['sslmode']);
}
$parsedPath = explode('/', ltrim($parsedUrl['path'] ?? '/', '/'));
if (\count($parsedPath) > 0) {
$configuration['queue_name'] = end($parsedPath);
}
$configuration['account'] = 2 === \count($parsedPath) ? $parsedPath[0] : null;
// check for extra keys in options
$optionsExtraKeys = array_diff(array_keys($options), array_keys($configuration));
if (0 < \count($optionsExtraKeys)) {
throw new InvalidArgumentException(sprintf('Unknown option found : [%s]. Allowed options are [%s]', implode(', ', $optionsExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS))));
}
// check for extra keys in options
$queryExtraKeys = array_diff(array_keys($query), array_keys($configuration));
if (0 < \count($queryExtraKeys)) {
throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s]', implode(', ', $queryExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS))));
}
return new self($configuration, $client);
}
public function get(): ?array
{
if ($this->configuration['auto_setup']) {
$this->setup();
}
foreach ($this->getNextMessages() as $message) {
return $message;
}
return null;
}
/**
* @return array[]
*/
private function getNextMessages(): \Generator
{
yield from $this->getPendingMessages();
yield from $this->getNewMessages();
}
/**
* @return array[]
*/
private function getPendingMessages(): \Generator
{
while (!empty($this->buffer)) {
yield array_shift($this->buffer);
}
}
/**
* @return array[]
*/
private function getNewMessages(): \Generator
{
if (null === $this->currentResponse) {
$this->currentResponse = $this->request($this->getQueueUrl(), [
'Action' => 'ReceiveMessage',
'VisibilityTimeout' => $this->configuration['visibility_timeout'],
'MaxNumberOfMessages' => $this->configuration['buffer_size'],
'WaitTimeSeconds' => $this->configuration['wait_time'],
]);
}
if ($this->client->stream($this->currentResponse, $this->configuration['poll_timeout'])->current()->isTimeout()) {
return;
}
$xml = new \SimpleXMLElement($this->currentResponse->getContent());
foreach ($xml->ReceiveMessageResult->Message as $xmlMessage) {
$this->buffer[] = [
'id' => (string) $xmlMessage->ReceiptHandle,
] + json_decode($xmlMessage->Body, true);
}
$this->currentResponse = null;
yield from $this->getPendingMessages();
}
public function setup(): void
{
$this->call($this->configuration['endpoint'], [
'Action' => 'CreateQueue',
'QueueName' => $this->configuration['queue_name'],
]);
$this->queueUrl = null;
$this->configuration['auto_setup'] = false;
}
public function delete(string $id): void
{
$this->call($this->getQueueUrl(), [
'Action' => 'DeleteMessage',
'ReceiptHandle' => $id,
]);
}
public function getMessageCount(): int
{
$response = $this->request($this->getQueueUrl(), [
'Action' => 'GetQueueAttributes',
'AttributeNames' => ['ApproximateNumberOfMessages'],
]);
$this->checkResponse($response);
$xml = new \SimpleXMLElement($response->getContent());
foreach ($xml->GetQueueAttributesResult->Attribute as $attribute) {
if ('ApproximateNumberOfMessages' !== (string) $attribute->Name) {
continue;
}
return (int) $attribute->Value;
}
return 0;
}
public function send(string $body, array $headers, int $delay = 0): void
{
if ($this->configuration['auto_setup']) {
$this->setup();
}
$this->call($this->getQueueUrl(), [
'Action' => 'SendMessage',
'MessageBody' => json_encode(['body' => $body, 'headers' => $headers]),
'DelaySeconds' => $delay,
]);
}
public function reset(): void
{
if (null !== $this->currentResponse) {
$this->currentResponse->cancel();
}
foreach ($this->getPendingMessages() as $message) {
$this->call($this->getQueueUrl(), [
'Action' => 'ChangeMessageVisibility',
'ReceiptHandle' => $message['id'],
'VisibilityTimeout' => 0,
]);
}
}
private function getQueueUrl(): string
{
if (null === $this->queueUrl) {
$parameters = [
'Action' => 'GetQueueUrl',
'QueueName' => $this->configuration['queue_name'],
];
if (isset($this->configuration['account'])) {
$parameters['QueueOwnerAWSAccountId'] = $this->configuration['account'];
}
$response = $this->request($this->configuration['endpoint'], $parameters);
$this->checkResponse($response);
$xml = new \SimpleXMLElement($response->getContent());
$this->queueUrl = (string) $xml->GetQueueUrlResult->QueueUrl;
}
return $this->queueUrl;
}
private function call(string $endpoint, array $body): void
{
$this->checkResponse($this->request($endpoint, $body));
}
private function request(string $endpoint, array $body): ResponseInterface
{
if (!$this->configuration['access_key']) {
return $this->client->request('POST', $endpoint, ['body' => $body]);
}
$region = $this->configuration['region'];
$service = 'sqs';
$method = 'POST';
$requestParameters = http_build_query($body, '', '&', PHP_QUERY_RFC1738);
$amzDate = gmdate('Ymd\THis\Z');
$parsedUrl = parse_url($endpoint);
$headers = [
'host' => $parsedUrl['host'],
'x-amz-date' => $amzDate,
'content-type' => 'application/x-www-form-urlencoded',
];
$signedHeaders = ['host', 'x-amz-date'];
$canonicalHeaders = implode("\n", array_map(function ($headerName) use ($headers): string {
return sprintf('%s:%s', $headerName, $headers[$headerName]);
}, $signedHeaders))."\n";
$canonicalRequest = implode("\n", [
$method,
$parsedUrl['path'] ?? '/',
'',
$canonicalHeaders,
implode(';', $signedHeaders),
hash('sha256', $requestParameters),
]);
$algorithm = 'AWS4-HMAC-SHA256';
$credentialScope = [gmdate('Ymd'), $region, $service, 'aws4_request'];
$signingKey = 'AWS4'.$this->configuration['secret_key'];
foreach ($credentialScope as $credential) {
$signingKey = hash_hmac('sha256', $credential, $signingKey, true);
}
$stringToSign = implode("\n", [
$algorithm,
$amzDate,
implode('/', $credentialScope),
hash('sha256', $canonicalRequest),
]);
$authorizationHeader = sprintf(
'%s Credential=%s/%s, SignedHeaders=%s, Signature=%s',
$algorithm,
$this->configuration['access_key'],
implode('/', $credentialScope),
implode(';', $signedHeaders),
hash_hmac('sha256', $stringToSign, $signingKey)
);
$options = [
'headers' => $headers + [
'authorization' => $authorizationHeader,
],
'body' => $requestParameters,
];
return $this->client->request($method, $endpoint, $options);
}
private function checkResponse(ResponseInterface $response): void
{
if (200 !== $response->getStatusCode()) {
$error = new \SimpleXMLElement($response->getContent(false));
throw new TransportException($error->Error->Message);
}
}
}

View File

@ -0,0 +1,40 @@
{
"name": "symfony/amazon-sqs-messenger",
"type": "symfony-bridge",
"description": "Symfony Amazon SQS extension Messenger Bridge",
"keywords": [],
"homepage": "https://symfony.com",
"license": "MIT",
"authors": [
{
"name": "Fabien Potencier",
"email": "fabien@symfony.com"
},
{
"name": "Symfony Community",
"homepage": "https://symfony.com/contributors"
}
],
"require": {
"php": "^7.2.5",
"symfony/http-client": "^4.3|5.0",
"symfony/messenger": "^4.3|^5.0"
},
"require-dev": {
"symfony/http-client-contracts": "^1.0|^2.0",
"symfony/property-access": "^4.4|^5.0",
"symfony/serializer": "^4.4|^5.0"
},
"autoload": {
"psr-4": { "Symfony\\Component\\Messenger\\Bridge\\AmazonSqs\\": "" },
"exclude-from-classmap": [
"/Tests/"
]
},
"minimum-stability": "dev",
"extra": {
"branch-alias": {
"dev-master": "5.1-dev"
}
}
}

View File

@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="http://schema.phpunit.de/5.2/phpunit.xsd"
backupGlobals="false"
colors="true"
bootstrap="vendor/autoload.php"
failOnRisky="true"
failOnWarning="true"
>
<php>
<ini name="error_reporting" value="-1" />
</php>
<testsuites>
<testsuite name="Symfony Amazon SQS Messenger Component Test Suite">
<directory>./Tests/</directory>
</testsuite>
</testsuites>
<filter>
<whitelist>
<directory>./</directory>
<exclude>
<directory>./Tests</directory>
<directory>./vendor</directory>
</exclude>
</whitelist>
</filter>
</phpunit>