diff --git a/UPGRADE-4.3.md b/UPGRADE-4.3.md index 6c348a3403..d81e393c44 100644 --- a/UPGRADE-4.3.md +++ b/UPGRADE-4.3.md @@ -50,6 +50,11 @@ HttpFoundation * The `FileinfoMimeTypeGuesser` class has been deprecated, use `Symfony\Component\Mime\FileinfoMimeTypeGuesser` instead. +Messenger +--------- + + * `Amqp` transport does not throw `\AMQPException` anymore, catch `TransportException` instead. + Routing ------- diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index ff901be210..b7f604f41c 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -11,6 +11,11 @@ CHANGELOG changed from `Serializer` to `PhpSerializer` inside `AmqpReceiver`, `AmqpSender`, `AmqpTransport` and `AmqpTransportFactory`. + * Added `TransportException` to mark an exception transport-related + + * [BC BREAK] If listening to exceptions while using `AmqpSender` or `AmqpReceiver`, `\AMQPException` is + no longer thrown in favor of `TransportException`. + 4.2.0 ----- diff --git a/src/Symfony/Component/Messenger/Exception/TransportException.php b/src/Symfony/Component/Messenger/Exception/TransportException.php new file mode 100644 index 0000000000..e94daba209 --- /dev/null +++ b/src/Symfony/Component/Messenger/Exception/TransportException.php @@ -0,0 +1,21 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Exception; + +/** + * @author Eric Masoero + * + * @experimental in 4.2 + */ +class TransportException extends RuntimeException +{ +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php index 23f1c8b2de..8e224e0653 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php @@ -101,6 +101,83 @@ class AmqpReceiverTest extends TestCase throw new WillNeverWorkException('Well...'); }); } + + /** + * @expectedException \Symfony\Component\Messenger\Exception\TransportException + */ + public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage() + { + $serializer = new Serializer( + new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + ); + + $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock(); + $envelope->method('getBody')->willReturn('{"message": "Hi"}'); + $envelope->method('getHeaders')->willReturn([ + 'type' => DummyMessage::class, + ]); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('get')->willReturn($envelope); + + $connection->method('ack')->with($envelope)->willThrowException(new \AMQPException()); + + $receiver = new AmqpReceiver($connection, $serializer); + $receiver->receive(function (?Envelope $envelope) use ($receiver) { + $receiver->stop(); + }); + } + + /** + * @expectedException \Symfony\Component\Messenger\Exception\TransportException + */ + public function testItThrowsATransportExceptionIfItCannotRejectMessage() + { + $serializer = new Serializer( + new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + ); + + $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock(); + $envelope->method('getBody')->willReturn('{"message": "Hi"}'); + $envelope->method('getHeaders')->willReturn([ + 'type' => DummyMessage::class, + ]); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('get')->willReturn($envelope); + $connection->method('reject')->with($envelope)->willThrowException(new \AMQPException()); + + $receiver = new AmqpReceiver($connection, $serializer); + $receiver->receive(function () { + throw new WillNeverWorkException('Well...'); + }); + } + + /** + * @expectedException \Symfony\Component\Messenger\Exception\TransportException + */ + public function testItThrowsATransportExceptionIfItCannotNonAcknowledgeMessage() + { + $serializer = new Serializer( + new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + ); + + $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock(); + $envelope->method('getBody')->willReturn('{"message": "Hi"}'); + $envelope->method('getHeaders')->willReturn([ + 'type' => DummyMessage::class, + ]); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('get')->willReturn($envelope); + + $connection->method('nack')->with($envelope)->willThrowException(new \AMQPException()); + + $receiver = new AmqpReceiver($connection, $serializer); + $receiver->receive(function () { + throw new InterruptException('Well...'); + }); + } } class InterruptException extends \Exception diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php index caef14c9e2..7fcb41c159 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php @@ -37,4 +37,22 @@ class AmqpSenderTest extends TestCase $sender = new AmqpSender($connection, $serializer); $sender->send($envelope); } + + /** + * @expectedException \Symfony\Component\Messenger\Exception\TransportException + */ + public function testItThrowsATransportExceptionIfItCannotSendTheMessage() + { + $envelope = new Envelope(new DummyMessage('Oy')); + $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]]; + + $serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(); + $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('publish')->with($encoded['body'], $encoded['headers'])->willThrowException(new \AMQPException()); + + $sender = new AmqpSender($connection, $serializer); + $sender->send($envelope); + } } diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php index fdd81cac54..cb7a4db013 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php @@ -11,6 +11,7 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt; +use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; @@ -61,11 +62,21 @@ class AmqpReceiver implements ReceiverInterface $this->connection->ack($AMQPEnvelope); } catch (RejectMessageExceptionInterface $e) { - $this->connection->reject($AMQPEnvelope); + try { + $this->connection->reject($AMQPEnvelope); + } catch (\AMQPException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } throw $e; + } catch (\AMQPException $e) { + throw new TransportException($e->getMessage(), 0, $e); } catch (\Throwable $e) { - $this->connection->nack($AMQPEnvelope, AMQP_REQUEUE); + try { + $this->connection->nack($AMQPEnvelope, AMQP_REQUEUE); + } catch (\AMQPException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } throw $e; } finally { diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php index 74ab0dfaa9..e9760ac2eb 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -41,7 +42,11 @@ class AmqpSender implements SenderInterface { $encodedMessage = $this->serializer->encode($envelope); - $this->connection->publish($encodedMessage['body'], $encodedMessage['headers']); + try { + $this->connection->publish($encodedMessage['body'], $encodedMessage['headers']); + } catch (\AMQPException $e) { + throw new TransportException($e->getMessage(), 0, $e); + } return $envelope; }