From dee077016df2b0bbc9701ab128f380f6dcbb2059 Mon Sep 17 00:00:00 2001 From: Tobias Schultze Date: Sun, 2 Jun 2019 04:44:45 +0200 Subject: [PATCH] [Messenger] set amqp content_type based on serialization format --- .../Transport/AmqpExt/AmqpSenderTest.php | 33 +++++++++++++++++++ .../Serialization/SerializerTest.php | 3 +- .../Transport/AmqpExt/AmqpSender.php | 16 ++++++++- .../Transport/Serialization/Serializer.php | 26 ++++++++++++++- 4 files changed, 75 insertions(+), 3 deletions(-) diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php index 95380a9e55..c13345f033 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php @@ -69,6 +69,39 @@ class AmqpSenderTest extends TestCase $sender->send($envelope); } + public function testContentTypeHeaderIsMovedToAttribute() + { + $envelope = new Envelope(new DummyMessage('Oy')); + $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class, 'Content-Type' => 'application/json']]; + + $serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(); + $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + unset($encoded['headers']['Content-Type']); + $stamp = new AmqpStamp(null, AMQP_NOPARAM, ['content_type' => 'application/json']); + $connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $stamp); + + $sender = new AmqpSender($connection, $serializer); + $sender->send($envelope); + } + + public function testContentTypeHeaderDoesNotOverwriteAttribute() + { + $envelope = (new Envelope(new DummyMessage('Oy')))->with($stamp = new AmqpStamp('rk', AMQP_NOPARAM, ['content_type' => 'custom'])); + $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class, 'Content-Type' => 'application/json']]; + + $serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(); + $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + unset($encoded['headers']['Content-Type']); + $connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $stamp); + + $sender = new AmqpSender($connection, $serializer); + $sender->send($envelope); + } + /** * @expectedException \Symfony\Component\Messenger\Exception\TransportException */ diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Serialization/SerializerTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Serialization/SerializerTest.php index f43ad2efcb..897e4b10e0 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/Serialization/SerializerTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/Serialization/SerializerTest.php @@ -55,7 +55,8 @@ class SerializerTest extends TestCase $this->assertArrayHasKey('body', $encoded); $this->assertArrayHasKey('headers', $encoded); $this->assertArrayHasKey('type', $encoded['headers']); - $this->assertEquals(DummyMessage::class, $encoded['headers']['type']); + $this->assertSame(DummyMessage::class, $encoded['headers']['type']); + $this->assertSame('application/json', $encoded['headers']['Content-Type']); } public function testUsesTheCustomFormatAndContext() diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php index c570c4cb33..cc97af135e 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php @@ -50,12 +50,26 @@ class AmqpSender implements SenderInterface $delay = $delayStamp->getDelay(); } + $amqpStamp = $envelope->last(AmqpStamp::class); + if (isset($encodedMessage['headers']['Content-Type'])) { + $contentType = $encodedMessage['headers']['Content-Type']; + unset($encodedMessage['headers']['Content-Type']); + + $attributes = $amqpStamp ? $amqpStamp->getAttributes() : []; + + if (!isset($attributes['content_type'])) { + $attributes['content_type'] = $contentType; + + $amqpStamp = new AmqpStamp($amqpStamp ? $amqpStamp->getRoutingKey() : null, $amqpStamp ? $amqpStamp->getFlags() : AMQP_NOPARAM, $attributes); + } + } + try { $this->connection->publish( $encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay, - $envelope->last(AmqpStamp::class) + $amqpStamp ); } catch (\AMQPException $e) { throw new TransportException($e->getMessage(), 0, $e); diff --git a/src/Symfony/Component/Messenger/Transport/Serialization/Serializer.php b/src/Symfony/Component/Messenger/Transport/Serialization/Serializer.php index c591acd050..3c13cb6e90 100644 --- a/src/Symfony/Component/Messenger/Transport/Serialization/Serializer.php +++ b/src/Symfony/Component/Messenger/Transport/Serialization/Serializer.php @@ -101,7 +101,7 @@ class Serializer implements SerializerInterface $envelope = $envelope->withoutStampsOfType(NonSendableStampInterface::class); - $headers = ['type' => \get_class($envelope->getMessage())] + $this->encodeStamps($envelope); + $headers = ['type' => \get_class($envelope->getMessage())] + $this->encodeStamps($envelope) + $this->getContentTypeHeader(); return [ 'body' => $this->serializer->serialize($envelope->getMessage(), $this->format, $context), @@ -157,4 +157,28 @@ class Serializer implements SerializerInterface return null; } + + private function getContentTypeHeader(): array + { + $mimeType = $this->getMimeTypeForFormat(); + + return null === $mimeType ? [] : ['Content-Type' => $mimeType]; + } + + private function getMimeTypeForFormat(): ?string + { + switch ($this->format) { + case 'json': + return 'application/json'; + case 'xml': + return 'application/xml'; + case 'yml': + case 'yaml': + return 'application/x-yaml'; + case 'csv': + return 'text/csv'; + } + + return null; + } }