add support for symfony/mercure:^0.5

This commit is contained in:
azjezz 2021-03-25 15:03:15 +01:00
parent 2dcf313a87
commit d3306fdc92
5 changed files with 107 additions and 65 deletions

View File

@ -95,6 +95,7 @@ use Symfony\Component\Mailer\Bridge\Postmark\Transport\PostmarkTransportFactory;
use Symfony\Component\Mailer\Bridge\Sendgrid\Transport\SendgridTransportFactory;
use Symfony\Component\Mailer\Bridge\Sendinblue\Transport\SendinblueTransportFactory;
use Symfony\Component\Mailer\Mailer;
use Symfony\Component\Mercure\HubRegistry;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsTransportFactory;
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransportFactory;
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdTransportFactory;
@ -2368,11 +2369,17 @@ class FrameworkExtension extends Extension
}
}
if (ContainerBuilder::willBeAvailable('symfony/mercure-notifier', MercureTransportFactory::class, $parentPackages) && ContainerBuilder::willBeAvailable('symfony/mercure-bundle', MercureBundle::class, $parentPackages)) {
$container->getDefinition($classToServices[MercureTransportFactory::class])
->replaceArgument('$publisherLocator', new ServiceLocatorArgument(new TaggedIteratorArgument('mercure.publisher', null, null, true)));
} elseif (ContainerBuilder::willBeAvailable('symfony/mercure-notifier', MercureTransportFactory::class, $parentPackages)) {
$container->removeDefinition($classToServices[MercureTransportFactory::class]);
if (ContainerBuilder::willBeAvailable('symfony/mercure-notifier', MercureTransportFactory::class, $parentPackages)) {
if (ContainerBuilder::willBeAvailable('symfony/mercure-bundle', MercureBundle::class, $parentPackages)) {
$definition = $container->getDefinition($classToServices[MercureTransportFactory::class]);
if (ContainerBuilder::willBeAvailable('symfony/mercure', HubRegistry::class, $parentPackages)) {
$definition->replaceArgument('$registry', new Reference(HubRegistry::class));
} else {
$definition->replaceArgument('$registry', new ServiceLocatorArgument(new TaggedIteratorArgument('mercure.publisher', null, null, true)));
}
} else {
$container->removeDefinition($classToServices[MercureTransportFactory::class]);
}
}
if (isset($config['admin_recipients'])) {

View File

@ -11,6 +11,10 @@
namespace Symfony\Component\Notifier\Bridge\Mercure;
use Symfony\Component\Mercure\Exception\InvalidArgumentException as MercureInvalidArgumentException;
use Symfony\Component\Mercure\Exception\RuntimeException as MercureRuntimeException;
use Symfony\Component\Mercure\HubInterface;
use Symfony\Component\Mercure\HubRegistry;
use Symfony\Component\Mercure\PublisherInterface;
use Symfony\Component\Mercure\Update;
use Symfony\Component\Notifier\Exception\InvalidArgumentException;
@ -32,21 +36,22 @@ use Symfony\Contracts\HttpClient\HttpClientInterface;
*/
final class MercureTransport extends AbstractTransport
{
private $publisher;
private $publisherId;
private $hub;
private $hubId;
private $topics;
/**
* @param HubInterface $hub
* @param string|string[]|null $topics
*/
public function __construct(PublisherInterface $publisher, string $publisherId, $topics = null, ?HttpClientInterface $client = null, ?EventDispatcherInterface $dispatcher = null)
public function __construct($hub, string $hubId, $topics = null, ?HttpClientInterface $client = null, ?EventDispatcherInterface $dispatcher = null)
{
if (null !== $topics && !\is_array($topics) && !\is_string($topics)) {
throw new \TypeError(sprintf('"%s()" expects parameter 3 to be an array of strings, a string or null, "%s" given.', __METHOD__, get_debug_type($topics)));
}
$this->publisher = $publisher;
$this->publisherId = $publisherId;
$this->hub = $hub;
$this->hubId = $hubId;
$this->topics = $topics ?? 'https://symfony.com/notifier';
parent::__construct($client, $dispatcher);
@ -54,7 +59,7 @@ final class MercureTransport extends AbstractTransport
public function __toString(): string
{
return sprintf('mercure://%s?%s', $this->publisherId, http_build_query(['topic' => $this->topics]));
return sprintf('mercure://%s?%s', $this->hubId, http_build_query(['topic' => $this->topics]));
}
public function supports(MessageInterface $message): bool
@ -87,7 +92,11 @@ final class MercureTransport extends AbstractTransport
]), $options->isPrivate(), $options->getId(), $options->getType(), $options->getRetry());
try {
$messageId = ($this->publisher)($update);
if ($this->hub instanceof HubInterface) {
$messageId = $this->hub->publish($update);
} else {
$messageId = ($this->hub)($update);
}
$sentMessage = new SentMessage($message, (string) $this);
$sentMessage->setMessageId($messageId);
@ -95,9 +104,9 @@ final class MercureTransport extends AbstractTransport
return $sentMessage;
} catch (HttpExceptionInterface $e) {
throw new TransportException('Unable to post the Mercure message: '.$e->getResponse()->getContent(false), $e->getResponse(), $e->getCode(), $e);
} catch (ExceptionInterface $e) {
} catch (ExceptionInterface | MercureRuntimeException $e) {
throw new RuntimeException('Unable to post the Mercure message: '.$e->getMessage(), $e->getCode(), $e);
} catch (\InvalidArgumentException $e) {
} catch (\InvalidArgumentException | MercureInvalidArgumentException $e) {
throw new InvalidArgumentException('Unable to post the Mercure message: '.$e->getMessage(), $e->getCode(), $e);
}
}

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Notifier\Bridge\Mercure;
use Symfony\Bundle\MercureBundle\MercureBundle;
use Symfony\Component\Mercure\HubRegistry;
use Symfony\Component\Mercure\PublisherInterface;
use Symfony\Component\Notifier\Exception\LogicException;
use Symfony\Component\Notifier\Exception\UnsupportedSchemeException;
@ -25,16 +26,16 @@ use Symfony\Contracts\Service\ServiceProviderInterface;
*/
final class MercureTransportFactory extends AbstractTransportFactory
{
private $publisherLocator;
private $registry;
/**
* @param ServiceProviderInterface $publisherLocator A container that holds {@see PublisherInterface} instances
* @param HubRegistry $registry
*/
public function __construct(ServiceProviderInterface $publisherLocator)
public function __construct(HubRegistry $registry)
{
parent::__construct();
$this->publisherLocator = $publisherLocator;
$this->registry = $registry;
}
/**
@ -46,18 +47,24 @@ final class MercureTransportFactory extends AbstractTransportFactory
throw new UnsupportedSchemeException($dsn, 'mercure', $this->getSupportedSchemes());
}
$publisherId = $dsn->getHost();
if (!$this->publisherLocator->has($publisherId)) {
$hubId = $dsn->getHost();
$topic = $dsn->getOption('topic');
if ($this->registry instanceof HubRegistry) {
$hub = $this->registry->getHub($hubId);
return new MercureTransport($hub, $hubId, $topic);
}
if (!$this->publisherLocator->has($hubId)) {
if (!class_exists(MercureBundle::class) && !$this->publisherLocator->getProvidedServices()) {
throw new LogicException('No publishers found. Did you forget to install the MercureBundle? Try running "composer require symfony/mercure-bundle".');
}
throw new LogicException(sprintf('"%s" not found. Did you mean one of: %s?', $publisherId, implode(', ', array_keys($this->publisherLocator->getProvidedServices()))));
throw new LogicException(sprintf('"%s" not found. Did you mean one of: %s?', $hubId, implode(', ', array_keys($this->publisherLocator->getProvidedServices()))));
}
$topic = $dsn->getOption('topic');
return new MercureTransport($this->publisherLocator->get($publisherId), $publisherId, $topic);
return new MercureTransport($this->publisherLocator->get($hubId), $hubId, $topic);
}
protected function getSupportedSchemes(): array

View File

@ -12,6 +12,10 @@
namespace Symfony\Component\Notifier\Bridge\Mercure\Tests;
use Symfony\Component\HttpClient\Exception\TransportException as HttpClientTransportException;
use Symfony\Component\Mercure\Exception\RuntimeException as MercureRuntimeException;
use Symfony\Component\Mercure\HubInterface;
use Symfony\Component\Mercure\Jwt\StaticTokenProvider;
use Symfony\Component\Mercure\MockHub;
use Symfony\Component\Mercure\PublisherInterface;
use Symfony\Component\Mercure\Update;
use Symfony\Component\Notifier\Bridge\Mercure\MercureOptions;
@ -36,11 +40,11 @@ use TypeError;
*/
final class MercureTransportTest extends TransportTestCase
{
public function createTransport(?HttpClientInterface $client = null, ?PublisherInterface $publisher = null, string $publisherId = 'publisherId', $topics = null): TransportInterface
public function createTransport(?HttpClientInterface $client = null, ?HubInterface $hub = null, string $hubId = 'publisherId', $topics = null): TransportInterface
{
$publisher = $publisher ?? $this->createMock(PublisherInterface::class);
$hub = $hub ?? $this->createMock(HubInterface::class);
return new MercureTransport($publisher, $publisherId, $topics);
return new MercureTransport($hub, $hubId, $topics);
}
public function toStringProvider(): iterable
@ -90,87 +94,102 @@ final class MercureTransportTest extends TransportTestCase
public function testSendWithTransportFailureThrows()
{
$publisher = $this->createMock(PublisherInterface::class);
$publisher->method('__invoke')->willThrowException(new HttpClientTransportException('Cannot connect to mercure'));
$hub = new MockHub('default', 'https://foo.com/.well-known/mercure', new StaticTokenProvider('foo'), function(): void {
throw new MercureRuntimeException('Cannot connect to mercure');
});
$this->expectException(RuntimeException::class);
$this->expectExceptionMessage('Unable to post the Mercure message: Cannot connect to mercure');
$this->createTransport(null, $publisher)->send(new ChatMessage('subject'));
$this->createTransport(null, $hub)->send(new ChatMessage('subject'));
}
public function testSendWithWrongResponseThrows()
{
$response = $this->createMock(ResponseInterface::class);
$response->method('getContent')->willReturn('Service Unavailable');
$hub = new MockHub('default', 'https://foo.com/.well-known/mercure', new StaticTokenProvider('foo'), function(): void {
$response = $this->createMock(ResponseInterface::class);
$response->method('getContent')->willReturn('Service Unavailable');
$httpException = $this->createMock(ServerExceptionInterface::class);
$httpException->method('getResponse')->willReturn($response);
$httpException = $this->createMock(ServerExceptionInterface::class);
$httpException->method('getResponse')->willReturn($response);
$publisher = $this->createMock(PublisherInterface::class);
$publisher->method('__invoke')->willThrowException($httpException);
throw $httpException;
});
$this->expectException(TransportException::class);
$this->expectExceptionMessage('Unable to post the Mercure message: Service Unavailable');
$this->createTransport(null, $publisher)->send(new ChatMessage('subject'));
$this->createTransport(null, $hub)->send(new ChatMessage('subject'));
}
public function testSendWithWrongTokenThrows()
{
$publisher = $this->createMock(PublisherInterface::class);
$publisher->method('__invoke')->willThrowException(new \InvalidArgumentException('The provided JWT is not valid'));
$hub = new MockHub('default', 'https://foo.com/.well-known/mercure', new StaticTokenProvider('foo'), function(): void {
throw new \InvalidArgumentException('The provided JWT is not valid');
});
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('Unable to post the Mercure message: The provided JWT is not valid');
$this->createTransport(null, $publisher)->send(new ChatMessage('subject'));
$this->createTransport(null, $hub)->send(new ChatMessage('subject'));
}
public function testSendWithMercureOptions()
{
$publisher = $this->createMock(PublisherInterface::class);
$publisher
->expects($this->once())
->method('__invoke')
->with(new Update(['/topic/1', '/topic/2'], '{"@context":"https:\/\/www.w3.org\/ns\/activitystreams","type":"Announce","summary":"subject"}', true, 'id', 'type', 1))
;
$hub = new MockHub('default', 'https://foo.com/.well-known/mercure', new StaticTokenProvider('foo'), function(Update $update): string {
$this->assertSame(['/topic/1', '/topic/2'], $update->getTopics());
$this->assertSame('{"@context":"https:\/\/www.w3.org\/ns\/activitystreams","type":"Announce","summary":"subject"}', $update->getData());
$this->assertSame('id', $update->getId());
$this->assertSame('type', $update->getType());
$this->assertSame('1', $update->getRetry());
$this->assertTrue($update->isPrivate());
$this->createTransport(null, $publisher)->send(new ChatMessage('subject', new MercureOptions(['/topic/1', '/topic/2'], true, 'id', 'type', 1)));
return 'id';
});
$this->createTransport(null, $hub)->send(new ChatMessage('subject', new MercureOptions(['/topic/1', '/topic/2'], true, 'id', 'type', 1)));
}
public function testSendWithMercureOptionsButWithoutOptionTopic()
{
$publisher = $this->createMock(PublisherInterface::class);
$publisher
->expects($this->once())
->method('__invoke')
->with(new Update(['https://symfony.com/notifier'], '{"@context":"https:\/\/www.w3.org\/ns\/activitystreams","type":"Announce","summary":"subject"}', true, 'id', 'type', 1))
;
$hub = new MockHub('default', 'https://foo.com/.well-known/mercure', new StaticTokenProvider('foo'), function(Update $update): string {
$this->assertSame(['https://symfony.com/notifier'], $update->getTopics());
$this->assertSame('{"@context":"https:\/\/www.w3.org\/ns\/activitystreams","type":"Announce","summary":"subject"}', $update->getData());
$this->assertSame('id', $update->getId());
$this->assertSame('type', $update->getType());
$this->assertSame('1', $update->getRetry());
$this->assertTrue($update->isPrivate());
$this->createTransport(null, $publisher)->send(new ChatMessage('subject', new MercureOptions(null, true, 'id', 'type', 1)));
return 'id';
});
$this->createTransport(null, $hub)->send(new ChatMessage('subject', new MercureOptions(null, true, 'id', 'type', 1)));
}
public function testSendWithoutMercureOptions()
{
$publisher = $this->createMock(PublisherInterface::class);
$publisher
->expects($this->once())
->method('__invoke')
->with(new Update(['https://symfony.com/notifier'], '{"@context":"https:\/\/www.w3.org\/ns\/activitystreams","type":"Announce","summary":"subject"}'))
;
$hub = new MockHub('default', 'https://foo.com/.well-known/mercure', new StaticTokenProvider('foo'), function(Update $update): string {
$this->assertSame(['https://symfony.com/notifier'], $update->getTopics());
$this->assertSame('{"@context":"https:\/\/www.w3.org\/ns\/activitystreams","type":"Announce","summary":"subject"}', $update->getData());
$this->createTransport(null, $publisher)->send(new ChatMessage('subject'));
return 'id';
});
$this->createTransport(null, $hub)->send(new ChatMessage('subject'));
}
public function testSendSuccessfully()
{
$messageId = 'urn:uuid:a7045be0-a75d-4d40-8bd2-29fa4e5dd10b';
$publisher = $this->createMock(PublisherInterface::class);
$publisher->method('__invoke')->willReturn($messageId);
$hub = new MockHub('default', 'https://foo.com/.well-known/mercure', new StaticTokenProvider('foo'), function(Update $update) use($messageId): string {
$this->assertSame(['https://symfony.com/notifier'], $update->getTopics());
$this->assertSame('{"@context":"https:\/\/www.w3.org\/ns\/activitystreams","type":"Announce","summary":"subject"}', $update->getData());
$sentMessage = $this->createTransport(null, $publisher)->send(new ChatMessage('subject'));
return $messageId;
});
$sentMessage = $this->createTransport(null, $hub)->send(new ChatMessage('subject'));
$this->assertSame($messageId, $sentMessage->getMessageId());
}
}

View File

@ -18,7 +18,7 @@
"require": {
"php": ">=7.2.5",
"ext-json": "*",
"symfony/mercure": "^0.4",
"symfony/mercure": "^0.4|^0.5",
"symfony/notifier": "^5.3",
"symfony/service-contracts": "^1.10|^2"
},