Improve SQS interoperability

This commit is contained in:
Jérémy Derussé 2020-04-22 13:43:49 +02:00 committed by Robin Chalas
parent 829566cdea
commit 00d84c125e
3 changed files with 26 additions and 10 deletions

View File

@ -21,8 +21,8 @@ env:
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
- MESSENGER_REDIS_DSN=redis://127.0.0.1:7006/messages
- MESSENGER_SQS_DSN=sqs://localhost:9494/messages?sslmode=disable
- MESSENGER_SQS_FIFO_QUEUE_DSN=sqs://localhost:9494/messages.fifo?sslmode=disable
- MESSENGER_SQS_DSN="sqs://localhost:9494/messages?sslmode=disable&poll_timeout=0.01"
- MESSENGER_SQS_FIFO_QUEUE_DSN="sqs://localhost:9494/messages.fifo?sslmode=disable&poll_timeout=0.01"
- SYMFONY_PHPUNIT_DISABLE_RESULT_CACHE=1
matrix:
@ -73,8 +73,8 @@ before_install:
- |
# Start Sqs server
docker pull feathj/fake-sqs
docker run -d -p 9494:9494 --name sqs feathj/fake-sqs
docker pull asyncaws/testing-sqs
docker run -d -p 9494:9494 --name sqs asyncaws/testing-sqs
- |
# Start Kafka and install an up-to-date librdkafka

View File

@ -109,7 +109,7 @@ class ConnectionTest extends TestCase
$queueUrl = $this->handleGetQueueUrl(0, $httpClient);
$httpClient->expects($this->at(1))->method('request')
->with('POST', $queueUrl, ['body' => ['Action' => 'ReceiveMessage', 'VisibilityTimeout' => null, 'MaxNumberOfMessages' => 9, 'WaitTimeSeconds' => 20]])
->with('POST', $queueUrl, ['body' => ['Action' => 'ReceiveMessage', 'VisibilityTimeout' => null, 'MaxNumberOfMessages' => 9, 'WaitTimeSeconds' => 20, 'MessageAttributeName.1' => 'All']])
->willReturn($response);
$response->expects($this->once())->method('getContent')->willReturn('<ReceiveMessageResponse>
<ReceiveMessageResult>

View File

@ -176,6 +176,7 @@ class Connection
'Action' => 'ReceiveMessage',
'VisibilityTimeout' => $this->configuration['visibility_timeout'],
'MaxNumberOfMessages' => $this->configuration['buffer_size'],
'MessageAttributeName.1' => 'All',
'WaitTimeSeconds' => $this->configuration['wait_time'],
]);
}
@ -186,9 +187,18 @@ class Connection
$xml = new \SimpleXMLElement($this->currentResponse->getContent());
foreach ($xml->ReceiveMessageResult->Message as $xmlMessage) {
$headers = [];
foreach ($xmlMessage->MessageAttribute as $item) {
if ('String' !== (string) $item->Value->DataType) {
continue;
}
$headers[(string) $item->Name] = (string) $item->Value->StringValue;
}
$this->buffer[] = [
'id' => (string) $xmlMessage->ReceiptHandle,
] + json_decode($xmlMessage->Body, true);
'body' => (string) $xmlMessage->Body,
'headers' => $headers,
];
}
$this->currentResponse = null;
@ -246,17 +256,23 @@ class Connection
$this->setup();
}
$messageBody = json_encode(['body' => $body, 'headers' => $headers]);
$parameters = [
'Action' => 'SendMessage',
'MessageBody' => $messageBody,
'MessageBody' => $body,
'DelaySeconds' => $delay,
];
$index = 0;
foreach ($headers as $name => $value) {
++$index;
$parameters["MessageAttribute.$index.Name"] = $name;
$parameters["MessageAttribute.$index.Value.DataType"] = 'String';
$parameters["MessageAttribute.$index.Value.StringValue"] = $value;
}
if ($this->isFifoQueue($this->configuration['queue_name'])) {
$parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__;
$parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1($messageBody);
$parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1(json_encode(['body' => $body, 'headers' => $headers]));
}
$this->call($this->getQueueUrl(), $parameters);