[Messenger] set amqp content_type based on serialization format
This commit is contained in:
parent
d8224b87c8
commit
dee077016d
@ -69,6 +69,39 @@ class AmqpSenderTest extends TestCase
|
|||||||
$sender->send($envelope);
|
$sender->send($envelope);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testContentTypeHeaderIsMovedToAttribute()
|
||||||
|
{
|
||||||
|
$envelope = new Envelope(new DummyMessage('Oy'));
|
||||||
|
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class, 'Content-Type' => 'application/json']];
|
||||||
|
|
||||||
|
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
|
||||||
|
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
|
||||||
|
|
||||||
|
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||||
|
unset($encoded['headers']['Content-Type']);
|
||||||
|
$stamp = new AmqpStamp(null, AMQP_NOPARAM, ['content_type' => 'application/json']);
|
||||||
|
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $stamp);
|
||||||
|
|
||||||
|
$sender = new AmqpSender($connection, $serializer);
|
||||||
|
$sender->send($envelope);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testContentTypeHeaderDoesNotOverwriteAttribute()
|
||||||
|
{
|
||||||
|
$envelope = (new Envelope(new DummyMessage('Oy')))->with($stamp = new AmqpStamp('rk', AMQP_NOPARAM, ['content_type' => 'custom']));
|
||||||
|
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class, 'Content-Type' => 'application/json']];
|
||||||
|
|
||||||
|
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
|
||||||
|
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
|
||||||
|
|
||||||
|
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||||
|
unset($encoded['headers']['Content-Type']);
|
||||||
|
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $stamp);
|
||||||
|
|
||||||
|
$sender = new AmqpSender($connection, $serializer);
|
||||||
|
$sender->send($envelope);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
|
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
|
||||||
*/
|
*/
|
||||||
|
@ -55,7 +55,8 @@ class SerializerTest extends TestCase
|
|||||||
$this->assertArrayHasKey('body', $encoded);
|
$this->assertArrayHasKey('body', $encoded);
|
||||||
$this->assertArrayHasKey('headers', $encoded);
|
$this->assertArrayHasKey('headers', $encoded);
|
||||||
$this->assertArrayHasKey('type', $encoded['headers']);
|
$this->assertArrayHasKey('type', $encoded['headers']);
|
||||||
$this->assertEquals(DummyMessage::class, $encoded['headers']['type']);
|
$this->assertSame(DummyMessage::class, $encoded['headers']['type']);
|
||||||
|
$this->assertSame('application/json', $encoded['headers']['Content-Type']);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testUsesTheCustomFormatAndContext()
|
public function testUsesTheCustomFormatAndContext()
|
||||||
|
@ -50,12 +50,26 @@ class AmqpSender implements SenderInterface
|
|||||||
$delay = $delayStamp->getDelay();
|
$delay = $delayStamp->getDelay();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$amqpStamp = $envelope->last(AmqpStamp::class);
|
||||||
|
if (isset($encodedMessage['headers']['Content-Type'])) {
|
||||||
|
$contentType = $encodedMessage['headers']['Content-Type'];
|
||||||
|
unset($encodedMessage['headers']['Content-Type']);
|
||||||
|
|
||||||
|
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
|
||||||
|
|
||||||
|
if (!isset($attributes['content_type'])) {
|
||||||
|
$attributes['content_type'] = $contentType;
|
||||||
|
|
||||||
|
$amqpStamp = new AmqpStamp($amqpStamp ? $amqpStamp->getRoutingKey() : null, $amqpStamp ? $amqpStamp->getFlags() : AMQP_NOPARAM, $attributes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
$this->connection->publish(
|
$this->connection->publish(
|
||||||
$encodedMessage['body'],
|
$encodedMessage['body'],
|
||||||
$encodedMessage['headers'] ?? [],
|
$encodedMessage['headers'] ?? [],
|
||||||
$delay,
|
$delay,
|
||||||
$envelope->last(AmqpStamp::class)
|
$amqpStamp
|
||||||
);
|
);
|
||||||
} catch (\AMQPException $e) {
|
} catch (\AMQPException $e) {
|
||||||
throw new TransportException($e->getMessage(), 0, $e);
|
throw new TransportException($e->getMessage(), 0, $e);
|
||||||
|
@ -101,7 +101,7 @@ class Serializer implements SerializerInterface
|
|||||||
|
|
||||||
$envelope = $envelope->withoutStampsOfType(NonSendableStampInterface::class);
|
$envelope = $envelope->withoutStampsOfType(NonSendableStampInterface::class);
|
||||||
|
|
||||||
$headers = ['type' => \get_class($envelope->getMessage())] + $this->encodeStamps($envelope);
|
$headers = ['type' => \get_class($envelope->getMessage())] + $this->encodeStamps($envelope) + $this->getContentTypeHeader();
|
||||||
|
|
||||||
return [
|
return [
|
||||||
'body' => $this->serializer->serialize($envelope->getMessage(), $this->format, $context),
|
'body' => $this->serializer->serialize($envelope->getMessage(), $this->format, $context),
|
||||||
@ -157,4 +157,28 @@ class Serializer implements SerializerInterface
|
|||||||
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private function getContentTypeHeader(): array
|
||||||
|
{
|
||||||
|
$mimeType = $this->getMimeTypeForFormat();
|
||||||
|
|
||||||
|
return null === $mimeType ? [] : ['Content-Type' => $mimeType];
|
||||||
|
}
|
||||||
|
|
||||||
|
private function getMimeTypeForFormat(): ?string
|
||||||
|
{
|
||||||
|
switch ($this->format) {
|
||||||
|
case 'json':
|
||||||
|
return 'application/json';
|
||||||
|
case 'xml':
|
||||||
|
return 'application/xml';
|
||||||
|
case 'yml':
|
||||||
|
case 'yaml':
|
||||||
|
return 'application/x-yaml';
|
||||||
|
case 'csv':
|
||||||
|
return 'text/csv';
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user