diff --git a/.env b/.env index 47e1c7d4d8..cb0a1af3ac 100644 --- a/.env +++ b/.env @@ -33,4 +33,11 @@ DATABASE_URL=postgresql://postgres:foobar@postgres:5432/social #?serverVersion=11&charset=utf8 ###< doctrine/doctrine-bundle ### -SHELL_VERBOSITY=3 \ No newline at end of file +SHELL_VERBOSITY=3 +###> symfony/messenger ### +# Choose one of the transports below +# MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages +MESSENGER_TRANSPORT_DSN_HIGH=doctrine://default?queue_name=high +MESSENGER_TRANSPORT_DSN_LOW=doctrine://default?queue_name=low +# MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages +###< symfony/messenger ### diff --git a/config/packages/messenger.yaml b/config/packages/messenger.yaml new file mode 100644 index 0000000000..14608ec2c4 --- /dev/null +++ b/config/packages/messenger.yaml @@ -0,0 +1,11 @@ +framework: + messenger: + failure_transport: failed + transports: + failed: 'doctrine://default?queue_name=failed' + high: '%env(MESSENGER_TRANSPORT_DSN_HIGH)%' + low: '%env(MESSENGER_TRANSPORT_DSN_LOW)%' + + routing: + 'App\Core\Queue\MessageHigh': high + 'App\Core\Queue\MessageLow': low diff --git a/config/services.yaml b/config/services.yaml index 2dad7fc5d0..018ee51514 100644 --- a/config/services.yaml +++ b/config/services.yaml @@ -36,3 +36,6 @@ services: App\Core\I18n\TransExtractor: tags: - { name: translation.extractor, alias: social } + + App\Core\Queue\MessageHandler: + tags: [messenger.message_handler] diff --git a/src/Controller/NetworkPublic.php b/src/Controller/NetworkPublic.php index df70df066d..152e52051d 100644 --- a/src/Controller/NetworkPublic.php +++ b/src/Controller/NetworkPublic.php @@ -31,6 +31,7 @@ namespace App\Controller; use App\Core\Controller; +use App\Core\Queue\Queue; class NetworkPublic extends Controller { @@ -41,6 +42,8 @@ class NetworkPublic extends Controller public function handle() { + Queue::enqueue('Yo, test', 'network_public'); + return [ '_template' => 'network/public.html.twig', 'notices' => ['some notice', 'some other notice', 'some other more diferent notice'], diff --git a/src/Core/GNUsocial.php b/src/Core/GNUsocial.php index cf1d0eedcf..3335d5db28 100644 --- a/src/Core/GNUsocial.php +++ b/src/Core/GNUsocial.php @@ -43,6 +43,7 @@ namespace App\Core; use App\Core\DB\DB; use App\Core\DB\DefaultSettings; use App\Core\I18n\I18nHelper; +use App\Core\Queue\Queue; use App\Core\Router\Router; use Doctrine\ORM\EntityManagerInterface; use Psr\Log\LoggerInterface; @@ -52,6 +53,7 @@ use Symfony\Component\EventDispatcher\EventSubscriberInterface; use Symfony\Component\Form\FormFactoryInterface; use Symfony\Component\HttpKernel\Event\RequestEvent; use Symfony\Component\HttpKernel\KernelEvents; +use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Routing\RouterInterface; use Symfony\Contracts\Translation\TranslatorInterface; @@ -62,6 +64,7 @@ class GNUsocial implements EventSubscriberInterface protected EntityManagerInterface $entity_manager; protected RouterInterface $router; protected FormFactoryInterface $form_factory; + protected MessageBusInterface $message_bus; /** * Symfony dependency injection gives us access to these services @@ -70,13 +73,15 @@ class GNUsocial implements EventSubscriberInterface TranslatorInterface $translator, EntityManagerInterface $em, RouterInterface $router, - FormFactoryInterface $ff) + FormFactoryInterface $ff, + MessageBusInterface $message_bus) { $this->logger = $logger; $this->translator = $translator; $this->entity_manager = $em; $this->router = $router; $this->form_factory = $ff; + $this->message_bus = $message_bus; } /** @@ -92,6 +97,7 @@ class GNUsocial implements EventSubscriberInterface DB::setManager($this->entity_manager); Router::setRouter($this->router); Form::setFactory($this->form_factory); + Queue::setMessageBus($this->message_bus); DefaultSettings::setDefaults(); ModulesManager::loadModules(); diff --git a/src/Core/Queue/Message.php b/src/Core/Queue/Message.php new file mode 100644 index 0000000000..72aa90536c --- /dev/null +++ b/src/Core/Queue/Message.php @@ -0,0 +1,36 @@ +. +// }}} + +/** + * Generic message wrapper + */ + +namespace App\Core\Queue; + +class Message +{ + public $content; + public string $queue; + + public function __construct($content, string $queue) + { + $this->queue = $queue; + $this->content = $content; + } +} diff --git a/src/Core/Queue/MessageHandler.php b/src/Core/Queue/MessageHandler.php new file mode 100644 index 0000000000..1e1d8813e1 --- /dev/null +++ b/src/Core/Queue/MessageHandler.php @@ -0,0 +1,35 @@ +. +// }}} + +/** + * Generic handler, distributes work based o + */ + +namespace App\Core\Queue; + +use App\Core\Event; +use Symfony\Component\Messenger\Handler\MessageHandlerInterface; + +class MessageHandler implements MessageHandlerInterface +{ + public function __invoke(Message $message) + { + Event::handle($message->queue, [$message->content]); + } +} diff --git a/src/Core/Queue/MessageHigh.php b/src/Core/Queue/MessageHigh.php new file mode 100644 index 0000000000..dbc896dd31 --- /dev/null +++ b/src/Core/Queue/MessageHigh.php @@ -0,0 +1,28 @@ +. +// }}} + +/** + * Generic high priority message wrapper + */ + +namespace App\Core\Queue; + +class MessageHigh extends Message +{ +} diff --git a/src/Core/Queue/MessageLow.php b/src/Core/Queue/MessageLow.php new file mode 100644 index 0000000000..c2c043d38e --- /dev/null +++ b/src/Core/Queue/MessageLow.php @@ -0,0 +1,28 @@ +. +// }}} + +/** + * Generic low priority message wrapper + */ + +namespace App\Core\Queue; + +class MessageLow extends Message +{ +} diff --git a/src/Core/Queue/Queue.php b/src/Core/Queue/Queue.php new file mode 100644 index 0000000000..8b057baebe --- /dev/null +++ b/src/Core/Queue/Queue.php @@ -0,0 +1,58 @@ +. +// }}} + +/** + * Queue wrapper + * + * @package GNUsocial + * @category Wrapper + * + * @author Hugo Sales + * @copyright 2020 Free Software Foundation, Inc http://www.fsf.org + * @license https://www.gnu.org/licenses/agpl.html GNU AGPL v3 or later + */ + +namespace App\Core\Queue; + +use Symfony\Component\Messenger\MessageBusInterface; + +abstract class Queue +{ + private static ?MessageBusInterface $message_bus; + + public static function setMessageBus($mb): void + { + self::$message_bus = $mb; + } + + /** + * Enqueue a $message in a configured trasnport, to be handled by the $queue handler + * + * @param object|string + * @param mixed $message + */ + public static function enqueue($message, string $queue, bool $high = false, array $stamps = []) + { + if ($high) { + self::$message_bus->dispatch(new MessageHigh($message, $queue), $stamps); + } else { + self::$message_bus->dispatch(new MessageLow($message, $queue), $stamps); + } + } +}