[Messenger] set amqp content_type based on serialization format

This commit is contained in:
Tobias Schultze 2019-06-02 04:44:45 +02:00
parent d8224b87c8
commit dee077016d
4 changed files with 75 additions and 3 deletions

View File

@ -69,6 +69,39 @@ class AmqpSenderTest extends TestCase
$sender->send($envelope);
}
public function testContentTypeHeaderIsMovedToAttribute()
{
$envelope = new Envelope(new DummyMessage('Oy'));
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class, 'Content-Type' => 'application/json']];
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
unset($encoded['headers']['Content-Type']);
$stamp = new AmqpStamp(null, AMQP_NOPARAM, ['content_type' => 'application/json']);
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $stamp);
$sender = new AmqpSender($connection, $serializer);
$sender->send($envelope);
}
public function testContentTypeHeaderDoesNotOverwriteAttribute()
{
$envelope = (new Envelope(new DummyMessage('Oy')))->with($stamp = new AmqpStamp('rk', AMQP_NOPARAM, ['content_type' => 'custom']));
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class, 'Content-Type' => 'application/json']];
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
unset($encoded['headers']['Content-Type']);
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $stamp);
$sender = new AmqpSender($connection, $serializer);
$sender->send($envelope);
}
/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
*/

View File

@ -55,7 +55,8 @@ class SerializerTest extends TestCase
$this->assertArrayHasKey('body', $encoded);
$this->assertArrayHasKey('headers', $encoded);
$this->assertArrayHasKey('type', $encoded['headers']);
$this->assertEquals(DummyMessage::class, $encoded['headers']['type']);
$this->assertSame(DummyMessage::class, $encoded['headers']['type']);
$this->assertSame('application/json', $encoded['headers']['Content-Type']);
}
public function testUsesTheCustomFormatAndContext()

View File

@ -50,12 +50,26 @@ class AmqpSender implements SenderInterface
$delay = $delayStamp->getDelay();
}
$amqpStamp = $envelope->last(AmqpStamp::class);
if (isset($encodedMessage['headers']['Content-Type'])) {
$contentType = $encodedMessage['headers']['Content-Type'];
unset($encodedMessage['headers']['Content-Type']);
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
if (!isset($attributes['content_type'])) {
$attributes['content_type'] = $contentType;
$amqpStamp = new AmqpStamp($amqpStamp ? $amqpStamp->getRoutingKey() : null, $amqpStamp ? $amqpStamp->getFlags() : AMQP_NOPARAM, $attributes);
}
}
try {
$this->connection->publish(
$encodedMessage['body'],
$encodedMessage['headers'] ?? [],
$delay,
$envelope->last(AmqpStamp::class)
$amqpStamp
);
} catch (\AMQPException $e) {
throw new TransportException($e->getMessage(), 0, $e);

View File

@ -101,7 +101,7 @@ class Serializer implements SerializerInterface
$envelope = $envelope->withoutStampsOfType(NonSendableStampInterface::class);
$headers = ['type' => \get_class($envelope->getMessage())] + $this->encodeStamps($envelope);
$headers = ['type' => \get_class($envelope->getMessage())] + $this->encodeStamps($envelope) + $this->getContentTypeHeader();
return [
'body' => $this->serializer->serialize($envelope->getMessage(), $this->format, $context),
@ -157,4 +157,28 @@ class Serializer implements SerializerInterface
return null;
}
private function getContentTypeHeader(): array
{
$mimeType = $this->getMimeTypeForFormat();
return null === $mimeType ? [] : ['Content-Type' => $mimeType];
}
private function getMimeTypeForFormat(): ?string
{
switch ($this->format) {
case 'json':
return 'application/json';
case 'xml':
return 'application/xml';
case 'yml':
case 'yaml':
return 'application/x-yaml';
case 'csv':
return 'text/csv';
}
return null;
}
}