feature #30375 [Messenger] Added transport agnostic exception (nikossvnk, lolmx)
This PR was merged into the 4.3-dev branch. Discussion ---------- [Messenger] Added transport agnostic exception | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | yes | Deprecations? | no | Tests pass? | yes | Fixed tickets | #30346 | License | MIT | Doc PR | TODO As described in #30346, client code shouldn't care about which transport is currently used by the message bus. This pr adds a new generic exception that is thrown by the `AmqpSender` if the message couldn't be delivered. Commits -------7d6a3fa487
Updated changelog to document changes in AmqpReceiver62a08eeea0
Updated exception message in AmqpSender, updated AmqpReceiver to throw new TransportExceptionb2b0640d80
Chain new exception with previous one06c84040c4
forgot one backslash, my bad93c10013fa
[Messenger] Added new TransportException which is thrown if transport could not send a message
This commit is contained in:
commit
b727f59e01
|
@ -50,6 +50,11 @@ HttpFoundation
|
|||
* The `FileinfoMimeTypeGuesser` class has been deprecated,
|
||||
use `Symfony\Component\Mime\FileinfoMimeTypeGuesser` instead.
|
||||
|
||||
Messenger
|
||||
---------
|
||||
|
||||
* `Amqp` transport does not throw `\AMQPException` anymore, catch `TransportException` instead.
|
||||
|
||||
Routing
|
||||
-------
|
||||
|
||||
|
|
|
@ -11,6 +11,11 @@ CHANGELOG
|
|||
changed from `Serializer` to `PhpSerializer` inside `AmqpReceiver`,
|
||||
`AmqpSender`, `AmqpTransport` and `AmqpTransportFactory`.
|
||||
|
||||
* Added `TransportException` to mark an exception transport-related
|
||||
|
||||
* [BC BREAK] If listening to exceptions while using `AmqpSender` or `AmqpReceiver`, `\AMQPException` is
|
||||
no longer thrown in favor of `TransportException`.
|
||||
|
||||
4.2.0
|
||||
-----
|
||||
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Exception;
|
||||
|
||||
/**
|
||||
* @author Eric Masoero <em@studeal.fr>
|
||||
*
|
||||
* @experimental in 4.2
|
||||
*/
|
||||
class TransportException extends RuntimeException
|
||||
{
|
||||
}
|
|
@ -101,6 +101,83 @@ class AmqpReceiverTest extends TestCase
|
|||
throw new WillNeverWorkException('Well...');
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
|
||||
*/
|
||||
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
|
||||
{
|
||||
$serializer = new Serializer(
|
||||
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
|
||||
);
|
||||
|
||||
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
|
||||
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
|
||||
$envelope->method('getHeaders')->willReturn([
|
||||
'type' => DummyMessage::class,
|
||||
]);
|
||||
|
||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||
$connection->method('get')->willReturn($envelope);
|
||||
|
||||
$connection->method('ack')->with($envelope)->willThrowException(new \AMQPException());
|
||||
|
||||
$receiver = new AmqpReceiver($connection, $serializer);
|
||||
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
|
||||
$receiver->stop();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
|
||||
*/
|
||||
public function testItThrowsATransportExceptionIfItCannotRejectMessage()
|
||||
{
|
||||
$serializer = new Serializer(
|
||||
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
|
||||
);
|
||||
|
||||
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
|
||||
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
|
||||
$envelope->method('getHeaders')->willReturn([
|
||||
'type' => DummyMessage::class,
|
||||
]);
|
||||
|
||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||
$connection->method('get')->willReturn($envelope);
|
||||
$connection->method('reject')->with($envelope)->willThrowException(new \AMQPException());
|
||||
|
||||
$receiver = new AmqpReceiver($connection, $serializer);
|
||||
$receiver->receive(function () {
|
||||
throw new WillNeverWorkException('Well...');
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
|
||||
*/
|
||||
public function testItThrowsATransportExceptionIfItCannotNonAcknowledgeMessage()
|
||||
{
|
||||
$serializer = new Serializer(
|
||||
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
|
||||
);
|
||||
|
||||
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
|
||||
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
|
||||
$envelope->method('getHeaders')->willReturn([
|
||||
'type' => DummyMessage::class,
|
||||
]);
|
||||
|
||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||
$connection->method('get')->willReturn($envelope);
|
||||
|
||||
$connection->method('nack')->with($envelope)->willThrowException(new \AMQPException());
|
||||
|
||||
$receiver = new AmqpReceiver($connection, $serializer);
|
||||
$receiver->receive(function () {
|
||||
throw new InterruptException('Well...');
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
class InterruptException extends \Exception
|
||||
|
|
|
@ -37,4 +37,22 @@ class AmqpSenderTest extends TestCase
|
|||
$sender = new AmqpSender($connection, $serializer);
|
||||
$sender->send($envelope);
|
||||
}
|
||||
|
||||
/**
|
||||
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
|
||||
*/
|
||||
public function testItThrowsATransportExceptionIfItCannotSendTheMessage()
|
||||
{
|
||||
$envelope = new Envelope(new DummyMessage('Oy'));
|
||||
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
|
||||
|
||||
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
|
||||
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
|
||||
|
||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||
$connection->method('publish')->with($encoded['body'], $encoded['headers'])->willThrowException(new \AMQPException());
|
||||
|
||||
$sender = new AmqpSender($connection, $serializer);
|
||||
$sender->send($envelope);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
|
||||
namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
||||
|
||||
use Symfony\Component\Messenger\Exception\TransportException;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
|
||||
|
@ -61,11 +62,21 @@ class AmqpReceiver implements ReceiverInterface
|
|||
|
||||
$this->connection->ack($AMQPEnvelope);
|
||||
} catch (RejectMessageExceptionInterface $e) {
|
||||
try {
|
||||
$this->connection->reject($AMQPEnvelope);
|
||||
} catch (\AMQPException $exception) {
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
|
||||
throw $e;
|
||||
} catch (\AMQPException $e) {
|
||||
throw new TransportException($e->getMessage(), 0, $e);
|
||||
} catch (\Throwable $e) {
|
||||
try {
|
||||
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE);
|
||||
} catch (\AMQPException $exception) {
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
|
||||
throw $e;
|
||||
} finally {
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
||||
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Exception\TransportException;
|
||||
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
|
@ -41,7 +42,11 @@ class AmqpSender implements SenderInterface
|
|||
{
|
||||
$encodedMessage = $this->serializer->encode($envelope);
|
||||
|
||||
try {
|
||||
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers']);
|
||||
} catch (\AMQPException $e) {
|
||||
throw new TransportException($e->getMessage(), 0, $e);
|
||||
}
|
||||
|
||||
return $envelope;
|
||||
}
|
||||
|
|
Reference in New Issue