feature #36525 Improve SQS interoperability (jderusse)
This PR was squashed before being merged into the 5.1-dev branch (closes #36525).
Discussion
----------
Improve SQS interoperability
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | no
| New feature? | yes
| Deprecations? | no
| Tickets | NA
| License | MIT
| Doc PR | NA
The Symfony Messenger component provides a SerializerInterface to encode/decode the `Envelope`, this can be used to improve the Interoperability (see [article from jolicode](https://jolicode.com/blog/symfony-messenger-et-linteroperabilite) (french))
Sadly, the current implementation of SQS adapter json_encode the elements of the `Envelope` (`string body` + `string[] headers`) and store everything in the SQS message `Body`. That partially defect the interoperability: 3rd party have to also wrap (unwrap) message form json_encoded Body.
This PR leverage the AWS SQS `Body` and `MessageAttribute` properties to store message information:
```yaml
# before
SQS Message:
Body: {"body": "hello world", "headers": {"foo": "bar"}}
MessageAttributes: {}
# after
SQS Message:
Body: hello world
MessageAttributes:
foor: bar
```
Commits
-------
00d84c125e
Improve SQS interoperability
This commit is contained in:
commit
83b37e8010
@ -21,8 +21,8 @@ env:
|
|||||||
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
|
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
|
||||||
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
|
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
|
||||||
- MESSENGER_REDIS_DSN=redis://127.0.0.1:7006/messages
|
- MESSENGER_REDIS_DSN=redis://127.0.0.1:7006/messages
|
||||||
- MESSENGER_SQS_DSN=sqs://localhost:9494/messages?sslmode=disable
|
- MESSENGER_SQS_DSN="sqs://localhost:9494/messages?sslmode=disable&poll_timeout=0.01"
|
||||||
- MESSENGER_SQS_FIFO_QUEUE_DSN=sqs://localhost:9494/messages.fifo?sslmode=disable
|
- MESSENGER_SQS_FIFO_QUEUE_DSN="sqs://localhost:9494/messages.fifo?sslmode=disable&poll_timeout=0.01"
|
||||||
- SYMFONY_PHPUNIT_DISABLE_RESULT_CACHE=1
|
- SYMFONY_PHPUNIT_DISABLE_RESULT_CACHE=1
|
||||||
|
|
||||||
matrix:
|
matrix:
|
||||||
@ -73,8 +73,8 @@ before_install:
|
|||||||
|
|
||||||
- |
|
- |
|
||||||
# Start Sqs server
|
# Start Sqs server
|
||||||
docker pull feathj/fake-sqs
|
docker pull asyncaws/testing-sqs
|
||||||
docker run -d -p 9494:9494 --name sqs feathj/fake-sqs
|
docker run -d -p 9494:9494 --name sqs asyncaws/testing-sqs
|
||||||
|
|
||||||
- |
|
- |
|
||||||
# Start Kafka and install an up-to-date librdkafka
|
# Start Kafka and install an up-to-date librdkafka
|
||||||
|
@ -109,7 +109,7 @@ class ConnectionTest extends TestCase
|
|||||||
$queueUrl = $this->handleGetQueueUrl(0, $httpClient);
|
$queueUrl = $this->handleGetQueueUrl(0, $httpClient);
|
||||||
|
|
||||||
$httpClient->expects($this->at(1))->method('request')
|
$httpClient->expects($this->at(1))->method('request')
|
||||||
->with('POST', $queueUrl, ['body' => ['Action' => 'ReceiveMessage', 'VisibilityTimeout' => null, 'MaxNumberOfMessages' => 9, 'WaitTimeSeconds' => 20]])
|
->with('POST', $queueUrl, ['body' => ['Action' => 'ReceiveMessage', 'VisibilityTimeout' => null, 'MaxNumberOfMessages' => 9, 'WaitTimeSeconds' => 20, 'MessageAttributeName.1' => 'All']])
|
||||||
->willReturn($response);
|
->willReturn($response);
|
||||||
$response->expects($this->once())->method('getContent')->willReturn('<ReceiveMessageResponse>
|
$response->expects($this->once())->method('getContent')->willReturn('<ReceiveMessageResponse>
|
||||||
<ReceiveMessageResult>
|
<ReceiveMessageResult>
|
||||||
|
@ -176,6 +176,7 @@ class Connection
|
|||||||
'Action' => 'ReceiveMessage',
|
'Action' => 'ReceiveMessage',
|
||||||
'VisibilityTimeout' => $this->configuration['visibility_timeout'],
|
'VisibilityTimeout' => $this->configuration['visibility_timeout'],
|
||||||
'MaxNumberOfMessages' => $this->configuration['buffer_size'],
|
'MaxNumberOfMessages' => $this->configuration['buffer_size'],
|
||||||
|
'MessageAttributeName.1' => 'All',
|
||||||
'WaitTimeSeconds' => $this->configuration['wait_time'],
|
'WaitTimeSeconds' => $this->configuration['wait_time'],
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
@ -186,9 +187,18 @@ class Connection
|
|||||||
|
|
||||||
$xml = new \SimpleXMLElement($this->currentResponse->getContent());
|
$xml = new \SimpleXMLElement($this->currentResponse->getContent());
|
||||||
foreach ($xml->ReceiveMessageResult->Message as $xmlMessage) {
|
foreach ($xml->ReceiveMessageResult->Message as $xmlMessage) {
|
||||||
|
$headers = [];
|
||||||
|
foreach ($xmlMessage->MessageAttribute as $item) {
|
||||||
|
if ('String' !== (string) $item->Value->DataType) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
$headers[(string) $item->Name] = (string) $item->Value->StringValue;
|
||||||
|
}
|
||||||
$this->buffer[] = [
|
$this->buffer[] = [
|
||||||
'id' => (string) $xmlMessage->ReceiptHandle,
|
'id' => (string) $xmlMessage->ReceiptHandle,
|
||||||
] + json_decode($xmlMessage->Body, true);
|
'body' => (string) $xmlMessage->Body,
|
||||||
|
'headers' => $headers,
|
||||||
|
];
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->currentResponse = null;
|
$this->currentResponse = null;
|
||||||
@ -246,17 +256,23 @@ class Connection
|
|||||||
$this->setup();
|
$this->setup();
|
||||||
}
|
}
|
||||||
|
|
||||||
$messageBody = json_encode(['body' => $body, 'headers' => $headers]);
|
|
||||||
|
|
||||||
$parameters = [
|
$parameters = [
|
||||||
'Action' => 'SendMessage',
|
'Action' => 'SendMessage',
|
||||||
'MessageBody' => $messageBody,
|
'MessageBody' => $body,
|
||||||
'DelaySeconds' => $delay,
|
'DelaySeconds' => $delay,
|
||||||
];
|
];
|
||||||
|
|
||||||
|
$index = 0;
|
||||||
|
foreach ($headers as $name => $value) {
|
||||||
|
++$index;
|
||||||
|
$parameters["MessageAttribute.$index.Name"] = $name;
|
||||||
|
$parameters["MessageAttribute.$index.Value.DataType"] = 'String';
|
||||||
|
$parameters["MessageAttribute.$index.Value.StringValue"] = $value;
|
||||||
|
}
|
||||||
|
|
||||||
if ($this->isFifoQueue($this->configuration['queue_name'])) {
|
if ($this->isFifoQueue($this->configuration['queue_name'])) {
|
||||||
$parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__;
|
$parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__;
|
||||||
$parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1($messageBody);
|
$parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1(json_encode(['body' => $body, 'headers' => $headers]));
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->call($this->getQueueUrl(), $parameters);
|
$this->call($this->getQueueUrl(), $parameters);
|
||||||
|
Reference in New Issue
Block a user