[QUEUE] Add queueing wrapper, default configuration and example usage

This commit is contained in:
Hugo Sales
2020-07-09 15:00:58 +00:00
committed by Hugo Sales
parent 69341880d3
commit 47ab835549
10 changed files with 217 additions and 2 deletions

7
.env
View File

@@ -34,3 +34,10 @@ DATABASE_URL=postgresql://postgres:foobar@postgres:5432/social
###< doctrine/doctrine-bundle ###
SHELL_VERBOSITY=3
###> symfony/messenger ###
# Choose one of the transports below
# MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
MESSENGER_TRANSPORT_DSN_HIGH=doctrine://default?queue_name=high
MESSENGER_TRANSPORT_DSN_LOW=doctrine://default?queue_name=low
# MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
###< symfony/messenger ###

View File

@@ -0,0 +1,11 @@
framework:
messenger:
failure_transport: failed
transports:
failed: 'doctrine://default?queue_name=failed'
high: '%env(MESSENGER_TRANSPORT_DSN_HIGH)%'
low: '%env(MESSENGER_TRANSPORT_DSN_LOW)%'
routing:
'App\Core\Queue\MessageHigh': high
'App\Core\Queue\MessageLow': low

View File

@@ -36,3 +36,6 @@ services:
App\Core\I18n\TransExtractor:
tags:
- { name: translation.extractor, alias: social }
App\Core\Queue\MessageHandler:
tags: [messenger.message_handler]

View File

@@ -31,6 +31,7 @@
namespace App\Controller;
use App\Core\Controller;
use App\Core\Queue\Queue;
class NetworkPublic extends Controller
{
@@ -41,6 +42,8 @@ class NetworkPublic extends Controller
public function handle()
{
Queue::enqueue('Yo, test', 'network_public');
return [
'_template' => 'network/public.html.twig',
'notices' => ['some notice', 'some other notice', 'some other more diferent notice'],

View File

@@ -43,6 +43,7 @@ namespace App\Core;
use App\Core\DB\DB;
use App\Core\DB\DefaultSettings;
use App\Core\I18n\I18nHelper;
use App\Core\Queue\Queue;
use App\Core\Router\Router;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
@@ -52,6 +53,7 @@ use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Form\FormFactoryInterface;
use Symfony\Component\HttpKernel\Event\RequestEvent;
use Symfony\Component\HttpKernel\KernelEvents;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\RouterInterface;
use Symfony\Contracts\Translation\TranslatorInterface;
@@ -62,6 +64,7 @@ class GNUsocial implements EventSubscriberInterface
protected EntityManagerInterface $entity_manager;
protected RouterInterface $router;
protected FormFactoryInterface $form_factory;
protected MessageBusInterface $message_bus;
/**
* Symfony dependency injection gives us access to these services
@@ -70,13 +73,15 @@ class GNUsocial implements EventSubscriberInterface
TranslatorInterface $translator,
EntityManagerInterface $em,
RouterInterface $router,
FormFactoryInterface $ff)
FormFactoryInterface $ff,
MessageBusInterface $message_bus)
{
$this->logger = $logger;
$this->translator = $translator;
$this->entity_manager = $em;
$this->router = $router;
$this->form_factory = $ff;
$this->message_bus = $message_bus;
}
/**
@@ -92,6 +97,7 @@ class GNUsocial implements EventSubscriberInterface
DB::setManager($this->entity_manager);
Router::setRouter($this->router);
Form::setFactory($this->form_factory);
Queue::setMessageBus($this->message_bus);
DefaultSettings::setDefaults();
ModulesManager::loadModules();

View File

@@ -0,0 +1,36 @@
<?php
// {{{ License
// This file is part of GNU social - https://www.gnu.org/software/social
//
// GNU social is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// GNU social is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with GNU social. If not, see <http://www.gnu.org/licenses/>.
// }}}
/**
* Generic message wrapper
*/
namespace App\Core\Queue;
class Message
{
public $content;
public string $queue;
public function __construct($content, string $queue)
{
$this->queue = $queue;
$this->content = $content;
}
}

View File

@@ -0,0 +1,35 @@
<?php
// {{{ License
// This file is part of GNU social - https://www.gnu.org/software/social
//
// GNU social is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// GNU social is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with GNU social. If not, see <http://www.gnu.org/licenses/>.
// }}}
/**
* Generic handler, distributes work based o
*/
namespace App\Core\Queue;
use App\Core\Event;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
class MessageHandler implements MessageHandlerInterface
{
public function __invoke(Message $message)
{
Event::handle($message->queue, [$message->content]);
}
}

View File

@@ -0,0 +1,28 @@
<?php
// {{{ License
// This file is part of GNU social - https://www.gnu.org/software/social
//
// GNU social is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// GNU social is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with GNU social. If not, see <http://www.gnu.org/licenses/>.
// }}}
/**
* Generic high priority message wrapper
*/
namespace App\Core\Queue;
class MessageHigh extends Message
{
}

View File

@@ -0,0 +1,28 @@
<?php
// {{{ License
// This file is part of GNU social - https://www.gnu.org/software/social
//
// GNU social is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// GNU social is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with GNU social. If not, see <http://www.gnu.org/licenses/>.
// }}}
/**
* Generic low priority message wrapper
*/
namespace App\Core\Queue;
class MessageLow extends Message
{
}

58
src/Core/Queue/Queue.php Normal file
View File

@@ -0,0 +1,58 @@
<?php
// {{{ License
// This file is part of GNU social - https://www.gnu.org/software/social
//
// GNU social is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// GNU social is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with GNU social. If not, see <http://www.gnu.org/licenses/>.
// }}}
/**
* Queue wrapper
*
* @package GNUsocial
* @category Wrapper
*
* @author Hugo Sales <hugo@fc.up.pt>
* @copyright 2020 Free Software Foundation, Inc http://www.fsf.org
* @license https://www.gnu.org/licenses/agpl.html GNU AGPL v3 or later
*/
namespace App\Core\Queue;
use Symfony\Component\Messenger\MessageBusInterface;
abstract class Queue
{
private static ?MessageBusInterface $message_bus;
public static function setMessageBus($mb): void
{
self::$message_bus = $mb;
}
/**
* Enqueue a $message in a configured trasnport, to be handled by the $queue handler
*
* @param object|string
* @param mixed $message
*/
public static function enqueue($message, string $queue, bool $high = false, array $stamps = [])
{
if ($high) {
self::$message_bus->dispatch(new MessageHigh($message, $queue), $stamps);
} else {
self::$message_bus->dispatch(new MessageLow($message, $queue), $stamps);
}
}
}