diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index 4b3e4427a0..5b89dd4f3e 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -95,6 +95,7 @@ use Symfony\Component\Mailer\Bridge\Postmark\Transport\PostmarkTransportFactory; use Symfony\Component\Mailer\Bridge\Sendgrid\Transport\SendgridTransportFactory; use Symfony\Component\Mailer\Bridge\Sendinblue\Transport\SendinblueTransportFactory; use Symfony\Component\Mailer\Mailer; +use Symfony\Component\Mercure\HubRegistry; use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsTransportFactory; use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransportFactory; use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdTransportFactory; @@ -2368,11 +2369,17 @@ class FrameworkExtension extends Extension } } - if (ContainerBuilder::willBeAvailable('symfony/mercure-notifier', MercureTransportFactory::class, $parentPackages) && ContainerBuilder::willBeAvailable('symfony/mercure-bundle', MercureBundle::class, $parentPackages)) { - $container->getDefinition($classToServices[MercureTransportFactory::class]) - ->replaceArgument('$publisherLocator', new ServiceLocatorArgument(new TaggedIteratorArgument('mercure.publisher', null, null, true))); - } elseif (ContainerBuilder::willBeAvailable('symfony/mercure-notifier', MercureTransportFactory::class, $parentPackages)) { - $container->removeDefinition($classToServices[MercureTransportFactory::class]); + if (ContainerBuilder::willBeAvailable('symfony/mercure-notifier', MercureTransportFactory::class, $parentPackages)) { + if (ContainerBuilder::willBeAvailable('symfony/mercure-bundle', MercureBundle::class, $parentPackages)) { + $definition = $container->getDefinition($classToServices[MercureTransportFactory::class]); + if (ContainerBuilder::willBeAvailable('symfony/mercure', HubRegistry::class, $parentPackages)) { + $definition->replaceArgument('$registry', new Reference(HubRegistry::class)); + } else { + $definition->replaceArgument('$registry', new ServiceLocatorArgument(new TaggedIteratorArgument('mercure.publisher', null, null, true))); + } + } else { + $container->removeDefinition($classToServices[MercureTransportFactory::class]); + } } if (isset($config['admin_recipients'])) { diff --git a/src/Symfony/Component/Notifier/Bridge/Mercure/MercureTransport.php b/src/Symfony/Component/Notifier/Bridge/Mercure/MercureTransport.php index bbdb663f4a..2c9bd5c42d 100644 --- a/src/Symfony/Component/Notifier/Bridge/Mercure/MercureTransport.php +++ b/src/Symfony/Component/Notifier/Bridge/Mercure/MercureTransport.php @@ -11,6 +11,10 @@ namespace Symfony\Component\Notifier\Bridge\Mercure; +use Symfony\Component\Mercure\Exception\InvalidArgumentException as MercureInvalidArgumentException; +use Symfony\Component\Mercure\Exception\RuntimeException as MercureRuntimeException; +use Symfony\Component\Mercure\HubInterface; +use Symfony\Component\Mercure\HubRegistry; use Symfony\Component\Mercure\PublisherInterface; use Symfony\Component\Mercure\Update; use Symfony\Component\Notifier\Exception\InvalidArgumentException; @@ -32,21 +36,22 @@ use Symfony\Contracts\HttpClient\HttpClientInterface; */ final class MercureTransport extends AbstractTransport { - private $publisher; - private $publisherId; + private $hub; + private $hubId; private $topics; /** + * @param HubInterface $hub * @param string|string[]|null $topics */ - public function __construct(PublisherInterface $publisher, string $publisherId, $topics = null, ?HttpClientInterface $client = null, ?EventDispatcherInterface $dispatcher = null) + public function __construct($hub, string $hubId, $topics = null, ?HttpClientInterface $client = null, ?EventDispatcherInterface $dispatcher = null) { if (null !== $topics && !\is_array($topics) && !\is_string($topics)) { throw new \TypeError(sprintf('"%s()" expects parameter 3 to be an array of strings, a string or null, "%s" given.', __METHOD__, get_debug_type($topics))); } - $this->publisher = $publisher; - $this->publisherId = $publisherId; + $this->hub = $hub; + $this->hubId = $hubId; $this->topics = $topics ?? 'https://symfony.com/notifier'; parent::__construct($client, $dispatcher); @@ -54,7 +59,7 @@ final class MercureTransport extends AbstractTransport public function __toString(): string { - return sprintf('mercure://%s?%s', $this->publisherId, http_build_query(['topic' => $this->topics])); + return sprintf('mercure://%s?%s', $this->hubId, http_build_query(['topic' => $this->topics])); } public function supports(MessageInterface $message): bool @@ -87,7 +92,11 @@ final class MercureTransport extends AbstractTransport ]), $options->isPrivate(), $options->getId(), $options->getType(), $options->getRetry()); try { - $messageId = ($this->publisher)($update); + if ($this->hub instanceof HubInterface) { + $messageId = $this->hub->publish($update); + } else { + $messageId = ($this->hub)($update); + } $sentMessage = new SentMessage($message, (string) $this); $sentMessage->setMessageId($messageId); @@ -95,9 +104,9 @@ final class MercureTransport extends AbstractTransport return $sentMessage; } catch (HttpExceptionInterface $e) { throw new TransportException('Unable to post the Mercure message: '.$e->getResponse()->getContent(false), $e->getResponse(), $e->getCode(), $e); - } catch (ExceptionInterface $e) { + } catch (ExceptionInterface | MercureRuntimeException $e) { throw new RuntimeException('Unable to post the Mercure message: '.$e->getMessage(), $e->getCode(), $e); - } catch (\InvalidArgumentException $e) { + } catch (\InvalidArgumentException | MercureInvalidArgumentException $e) { throw new InvalidArgumentException('Unable to post the Mercure message: '.$e->getMessage(), $e->getCode(), $e); } } diff --git a/src/Symfony/Component/Notifier/Bridge/Mercure/MercureTransportFactory.php b/src/Symfony/Component/Notifier/Bridge/Mercure/MercureTransportFactory.php index a7411479d1..13fd1aba82 100644 --- a/src/Symfony/Component/Notifier/Bridge/Mercure/MercureTransportFactory.php +++ b/src/Symfony/Component/Notifier/Bridge/Mercure/MercureTransportFactory.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Notifier\Bridge\Mercure; use Symfony\Bundle\MercureBundle\MercureBundle; +use Symfony\Component\Mercure\HubRegistry; use Symfony\Component\Mercure\PublisherInterface; use Symfony\Component\Notifier\Exception\LogicException; use Symfony\Component\Notifier\Exception\UnsupportedSchemeException; @@ -25,16 +26,16 @@ use Symfony\Contracts\Service\ServiceProviderInterface; */ final class MercureTransportFactory extends AbstractTransportFactory { - private $publisherLocator; + private $registry; /** - * @param ServiceProviderInterface $publisherLocator A container that holds {@see PublisherInterface} instances + * @param HubRegistry $registry */ - public function __construct(ServiceProviderInterface $publisherLocator) + public function __construct(HubRegistry $registry) { parent::__construct(); - $this->publisherLocator = $publisherLocator; + $this->registry = $registry; } /** @@ -46,18 +47,24 @@ final class MercureTransportFactory extends AbstractTransportFactory throw new UnsupportedSchemeException($dsn, 'mercure', $this->getSupportedSchemes()); } - $publisherId = $dsn->getHost(); - if (!$this->publisherLocator->has($publisherId)) { + $hubId = $dsn->getHost(); + $topic = $dsn->getOption('topic'); + + if ($this->registry instanceof HubRegistry) { + $hub = $this->registry->getHub($hubId); + + return new MercureTransport($hub, $hubId, $topic); + } + + if (!$this->publisherLocator->has($hubId)) { if (!class_exists(MercureBundle::class) && !$this->publisherLocator->getProvidedServices()) { throw new LogicException('No publishers found. Did you forget to install the MercureBundle? Try running "composer require symfony/mercure-bundle".'); } - throw new LogicException(sprintf('"%s" not found. Did you mean one of: %s?', $publisherId, implode(', ', array_keys($this->publisherLocator->getProvidedServices())))); + throw new LogicException(sprintf('"%s" not found. Did you mean one of: %s?', $hubId, implode(', ', array_keys($this->publisherLocator->getProvidedServices())))); } - $topic = $dsn->getOption('topic'); - - return new MercureTransport($this->publisherLocator->get($publisherId), $publisherId, $topic); + return new MercureTransport($this->publisherLocator->get($hubId), $hubId, $topic); } protected function getSupportedSchemes(): array diff --git a/src/Symfony/Component/Notifier/Bridge/Mercure/Tests/MercureTransportTest.php b/src/Symfony/Component/Notifier/Bridge/Mercure/Tests/MercureTransportTest.php index 320b422fc4..79e4a80678 100644 --- a/src/Symfony/Component/Notifier/Bridge/Mercure/Tests/MercureTransportTest.php +++ b/src/Symfony/Component/Notifier/Bridge/Mercure/Tests/MercureTransportTest.php @@ -12,6 +12,10 @@ namespace Symfony\Component\Notifier\Bridge\Mercure\Tests; use Symfony\Component\HttpClient\Exception\TransportException as HttpClientTransportException; +use Symfony\Component\Mercure\Exception\RuntimeException as MercureRuntimeException; +use Symfony\Component\Mercure\HubInterface; +use Symfony\Component\Mercure\Jwt\StaticTokenProvider; +use Symfony\Component\Mercure\MockHub; use Symfony\Component\Mercure\PublisherInterface; use Symfony\Component\Mercure\Update; use Symfony\Component\Notifier\Bridge\Mercure\MercureOptions; @@ -36,11 +40,11 @@ use TypeError; */ final class MercureTransportTest extends TransportTestCase { - public function createTransport(?HttpClientInterface $client = null, ?PublisherInterface $publisher = null, string $publisherId = 'publisherId', $topics = null): TransportInterface + public function createTransport(?HttpClientInterface $client = null, ?HubInterface $hub = null, string $hubId = 'publisherId', $topics = null): TransportInterface { - $publisher = $publisher ?? $this->createMock(PublisherInterface::class); + $hub = $hub ?? $this->createMock(HubInterface::class); - return new MercureTransport($publisher, $publisherId, $topics); + return new MercureTransport($hub, $hubId, $topics); } public function toStringProvider(): iterable @@ -90,87 +94,102 @@ final class MercureTransportTest extends TransportTestCase public function testSendWithTransportFailureThrows() { - $publisher = $this->createMock(PublisherInterface::class); - $publisher->method('__invoke')->willThrowException(new HttpClientTransportException('Cannot connect to mercure')); + $hub = new MockHub('default', 'https://foo.com/.well-known/mercure', new StaticTokenProvider('foo'), function(): void { + throw new MercureRuntimeException('Cannot connect to mercure'); + }); $this->expectException(RuntimeException::class); $this->expectExceptionMessage('Unable to post the Mercure message: Cannot connect to mercure'); - $this->createTransport(null, $publisher)->send(new ChatMessage('subject')); + $this->createTransport(null, $hub)->send(new ChatMessage('subject')); } public function testSendWithWrongResponseThrows() { - $response = $this->createMock(ResponseInterface::class); - $response->method('getContent')->willReturn('Service Unavailable'); + $hub = new MockHub('default', 'https://foo.com/.well-known/mercure', new StaticTokenProvider('foo'), function(): void { + $response = $this->createMock(ResponseInterface::class); + $response->method('getContent')->willReturn('Service Unavailable'); - $httpException = $this->createMock(ServerExceptionInterface::class); - $httpException->method('getResponse')->willReturn($response); + $httpException = $this->createMock(ServerExceptionInterface::class); + $httpException->method('getResponse')->willReturn($response); - $publisher = $this->createMock(PublisherInterface::class); - $publisher->method('__invoke')->willThrowException($httpException); + throw $httpException; + }); $this->expectException(TransportException::class); $this->expectExceptionMessage('Unable to post the Mercure message: Service Unavailable'); - $this->createTransport(null, $publisher)->send(new ChatMessage('subject')); + $this->createTransport(null, $hub)->send(new ChatMessage('subject')); } public function testSendWithWrongTokenThrows() { - $publisher = $this->createMock(PublisherInterface::class); - $publisher->method('__invoke')->willThrowException(new \InvalidArgumentException('The provided JWT is not valid')); + $hub = new MockHub('default', 'https://foo.com/.well-known/mercure', new StaticTokenProvider('foo'), function(): void { + throw new \InvalidArgumentException('The provided JWT is not valid'); + }); $this->expectException(InvalidArgumentException::class); $this->expectExceptionMessage('Unable to post the Mercure message: The provided JWT is not valid'); - $this->createTransport(null, $publisher)->send(new ChatMessage('subject')); + $this->createTransport(null, $hub)->send(new ChatMessage('subject')); } public function testSendWithMercureOptions() { - $publisher = $this->createMock(PublisherInterface::class); - $publisher - ->expects($this->once()) - ->method('__invoke') - ->with(new Update(['/topic/1', '/topic/2'], '{"@context":"https:\/\/www.w3.org\/ns\/activitystreams","type":"Announce","summary":"subject"}', true, 'id', 'type', 1)) - ; + $hub = new MockHub('default', 'https://foo.com/.well-known/mercure', new StaticTokenProvider('foo'), function(Update $update): string { + $this->assertSame(['/topic/1', '/topic/2'], $update->getTopics()); + $this->assertSame('{"@context":"https:\/\/www.w3.org\/ns\/activitystreams","type":"Announce","summary":"subject"}', $update->getData()); + $this->assertSame('id', $update->getId()); + $this->assertSame('type', $update->getType()); + $this->assertSame('1', $update->getRetry()); + $this->assertTrue($update->isPrivate()); - $this->createTransport(null, $publisher)->send(new ChatMessage('subject', new MercureOptions(['/topic/1', '/topic/2'], true, 'id', 'type', 1))); + return 'id'; + }); + + $this->createTransport(null, $hub)->send(new ChatMessage('subject', new MercureOptions(['/topic/1', '/topic/2'], true, 'id', 'type', 1))); } public function testSendWithMercureOptionsButWithoutOptionTopic() { - $publisher = $this->createMock(PublisherInterface::class); - $publisher - ->expects($this->once()) - ->method('__invoke') - ->with(new Update(['https://symfony.com/notifier'], '{"@context":"https:\/\/www.w3.org\/ns\/activitystreams","type":"Announce","summary":"subject"}', true, 'id', 'type', 1)) - ; + $hub = new MockHub('default', 'https://foo.com/.well-known/mercure', new StaticTokenProvider('foo'), function(Update $update): string { + $this->assertSame(['https://symfony.com/notifier'], $update->getTopics()); + $this->assertSame('{"@context":"https:\/\/www.w3.org\/ns\/activitystreams","type":"Announce","summary":"subject"}', $update->getData()); + $this->assertSame('id', $update->getId()); + $this->assertSame('type', $update->getType()); + $this->assertSame('1', $update->getRetry()); + $this->assertTrue($update->isPrivate()); - $this->createTransport(null, $publisher)->send(new ChatMessage('subject', new MercureOptions(null, true, 'id', 'type', 1))); + return 'id'; + }); + + $this->createTransport(null, $hub)->send(new ChatMessage('subject', new MercureOptions(null, true, 'id', 'type', 1))); } public function testSendWithoutMercureOptions() { - $publisher = $this->createMock(PublisherInterface::class); - $publisher - ->expects($this->once()) - ->method('__invoke') - ->with(new Update(['https://symfony.com/notifier'], '{"@context":"https:\/\/www.w3.org\/ns\/activitystreams","type":"Announce","summary":"subject"}')) - ; + $hub = new MockHub('default', 'https://foo.com/.well-known/mercure', new StaticTokenProvider('foo'), function(Update $update): string { + $this->assertSame(['https://symfony.com/notifier'], $update->getTopics()); + $this->assertSame('{"@context":"https:\/\/www.w3.org\/ns\/activitystreams","type":"Announce","summary":"subject"}', $update->getData()); - $this->createTransport(null, $publisher)->send(new ChatMessage('subject')); + return 'id'; + }); + + $this->createTransport(null, $hub)->send(new ChatMessage('subject')); } public function testSendSuccessfully() { $messageId = 'urn:uuid:a7045be0-a75d-4d40-8bd2-29fa4e5dd10b'; - $publisher = $this->createMock(PublisherInterface::class); - $publisher->method('__invoke')->willReturn($messageId); + $hub = new MockHub('default', 'https://foo.com/.well-known/mercure', new StaticTokenProvider('foo'), function(Update $update) use($messageId): string { + $this->assertSame(['https://symfony.com/notifier'], $update->getTopics()); + $this->assertSame('{"@context":"https:\/\/www.w3.org\/ns\/activitystreams","type":"Announce","summary":"subject"}', $update->getData()); - $sentMessage = $this->createTransport(null, $publisher)->send(new ChatMessage('subject')); + return $messageId; + }); + + $sentMessage = $this->createTransport(null, $hub)->send(new ChatMessage('subject')); $this->assertSame($messageId, $sentMessage->getMessageId()); } } diff --git a/src/Symfony/Component/Notifier/Bridge/Mercure/composer.json b/src/Symfony/Component/Notifier/Bridge/Mercure/composer.json index da85334a00..330e4d5d00 100644 --- a/src/Symfony/Component/Notifier/Bridge/Mercure/composer.json +++ b/src/Symfony/Component/Notifier/Bridge/Mercure/composer.json @@ -18,7 +18,7 @@ "require": { "php": ">=7.2.5", "ext-json": "*", - "symfony/mercure": "^0.4", + "symfony/mercure": "^0.4|^0.5", "symfony/notifier": "^5.3", "symfony/service-contracts": "^1.10|^2" },