Browse Source

[QUEUE] Add queueing wrapper, default configuration and example usage

remotes/upstream/experimental
Hugo Sales 2 years ago
parent
commit
d48cb3f0b8
Signed by: someonewithpc <hugo@hsal.es> GPG Key ID: 7D0C7EAFC9D835A0
10 changed files with 217 additions and 2 deletions
  1. +8
    -1
      .env
  2. +11
    -0
      config/packages/messenger.yaml
  3. +3
    -0
      config/services.yaml
  4. +3
    -0
      src/Controller/NetworkPublic.php
  5. +7
    -1
      src/Core/GNUsocial.php
  6. +36
    -0
      src/Core/Queue/Message.php
  7. +35
    -0
      src/Core/Queue/MessageHandler.php
  8. +28
    -0
      src/Core/Queue/MessageHigh.php
  9. +28
    -0
      src/Core/Queue/MessageLow.php
  10. +58
    -0
      src/Core/Queue/Queue.php

+ 8
- 1
.env View File

@@ -33,4 +33,11 @@ DATABASE_URL=postgresql://postgres:foobar@postgres:5432/social
#?serverVersion=11&charset=utf8
###< doctrine/doctrine-bundle ###

SHELL_VERBOSITY=3
SHELL_VERBOSITY=3
###> symfony/messenger ###
# Choose one of the transports below
# MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
MESSENGER_TRANSPORT_DSN_HIGH=doctrine://default?queue_name=high
MESSENGER_TRANSPORT_DSN_LOW=doctrine://default?queue_name=low
# MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
###< symfony/messenger ###

+ 11
- 0
config/packages/messenger.yaml View File

@@ -0,0 +1,11 @@
framework:
messenger:
failure_transport: failed
transports:
failed: 'doctrine://default?queue_name=failed'
high: '%env(MESSENGER_TRANSPORT_DSN_HIGH)%'
low: '%env(MESSENGER_TRANSPORT_DSN_LOW)%'

routing:
'App\Core\Queue\MessageHigh': high
'App\Core\Queue\MessageLow': low

+ 3
- 0
config/services.yaml View File

@@ -36,3 +36,6 @@ services:
App\Core\I18n\TransExtractor:
tags:
- { name: translation.extractor, alias: social }

App\Core\Queue\MessageHandler:
tags: [messenger.message_handler]

+ 3
- 0
src/Controller/NetworkPublic.php View File

@@ -31,6 +31,7 @@
namespace App\Controller;

use App\Core\Controller;
use App\Core\Queue\Queue;

class NetworkPublic extends Controller
{
@@ -41,6 +42,8 @@ class NetworkPublic extends Controller

public function handle()
{
Queue::enqueue('Yo, test', 'network_public');

return [
'_template' => 'network/public.html.twig',
'notices' => ['some notice', 'some other notice', 'some other more diferent notice'],


+ 7
- 1
src/Core/GNUsocial.php View File

@@ -43,6 +43,7 @@ namespace App\Core;
use App\Core\DB\DB;
use App\Core\DB\DefaultSettings;
use App\Core\I18n\I18nHelper;
use App\Core\Queue\Queue;
use App\Core\Router\Router;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
@@ -52,6 +53,7 @@ use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Form\FormFactoryInterface;
use Symfony\Component\HttpKernel\Event\RequestEvent;
use Symfony\Component\HttpKernel\KernelEvents;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\RouterInterface;
use Symfony\Contracts\Translation\TranslatorInterface;

@@ -62,6 +64,7 @@ class GNUsocial implements EventSubscriberInterface
protected EntityManagerInterface $entity_manager;
protected RouterInterface $router;
protected FormFactoryInterface $form_factory;
protected MessageBusInterface $message_bus;

/**
* Symfony dependency injection gives us access to these services
@@ -70,13 +73,15 @@ class GNUsocial implements EventSubscriberInterface
TranslatorInterface $translator,
EntityManagerInterface $em,
RouterInterface $router,
FormFactoryInterface $ff)
FormFactoryInterface $ff,
MessageBusInterface $message_bus)
{
$this->logger = $logger;
$this->translator = $translator;
$this->entity_manager = $em;
$this->router = $router;
$this->form_factory = $ff;
$this->message_bus = $message_bus;
}

/**
@@ -92,6 +97,7 @@ class GNUsocial implements EventSubscriberInterface
DB::setManager($this->entity_manager);
Router::setRouter($this->router);
Form::setFactory($this->form_factory);
Queue::setMessageBus($this->message_bus);

DefaultSettings::setDefaults();
ModulesManager::loadModules();


+ 36
- 0
src/Core/Queue/Message.php View File

@@ -0,0 +1,36 @@
<?php

// {{{ License
// This file is part of GNU social - https://www.gnu.org/software/social
//
// GNU social is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// GNU social is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with GNU social. If not, see <http://www.gnu.org/licenses/>.
// }}}

/**
* Generic message wrapper
*/

namespace App\Core\Queue;

class Message
{
public $content;
public string $queue;

public function __construct($content, string $queue)
{
$this->queue = $queue;
$this->content = $content;
}
}

+ 35
- 0
src/Core/Queue/MessageHandler.php View File

@@ -0,0 +1,35 @@
<?php

// {{{ License
// This file is part of GNU social - https://www.gnu.org/software/social
//
// GNU social is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// GNU social is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with GNU social. If not, see <http://www.gnu.org/licenses/>.
// }}}

/**
* Generic handler, distributes work based o
*/

namespace App\Core\Queue;

use App\Core\Event;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class MessageHandler implements MessageHandlerInterface
{
public function __invoke(Message $message)
{
Event::handle($message->queue, [$message->content]);
}
}

+ 28
- 0
src/Core/Queue/MessageHigh.php View File

@@ -0,0 +1,28 @@
<?php

// {{{ License
// This file is part of GNU social - https://www.gnu.org/software/social
//
// GNU social is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// GNU social is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with GNU social. If not, see <http://www.gnu.org/licenses/>.
// }}}

/**
* Generic high priority message wrapper
*/

namespace App\Core\Queue;

class MessageHigh extends Message
{
}

+ 28
- 0
src/Core/Queue/MessageLow.php View File

@@ -0,0 +1,28 @@
<?php

// {{{ License
// This file is part of GNU social - https://www.gnu.org/software/social
//
// GNU social is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// GNU social is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with GNU social. If not, see <http://www.gnu.org/licenses/>.
// }}}

/**
* Generic low priority message wrapper
*/

namespace App\Core\Queue;

class MessageLow extends Message
{
}

+ 58
- 0
src/Core/Queue/Queue.php View File

@@ -0,0 +1,58 @@
<?php

// {{{ License
// This file is part of GNU social - https://www.gnu.org/software/social
//
// GNU social is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// GNU social is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with GNU social. If not, see <http://www.gnu.org/licenses/>.
// }}}

/**
* Queue wrapper
*
* @package GNUsocial
* @category Wrapper
*
* @author Hugo Sales <hugo@fc.up.pt>
* @copyright 2020 Free Software Foundation, Inc http://www.fsf.org
* @license https://www.gnu.org/licenses/agpl.html GNU AGPL v3 or later
*/

namespace App\Core\Queue;

use Symfony\Component\Messenger\MessageBusInterface;

abstract class Queue
{
private static ?MessageBusInterface $message_bus;

public static function setMessageBus($mb): void
{
self::$message_bus = $mb;
}

/**
* Enqueue a $message in a configured trasnport, to be handled by the $queue handler
*
* @param object|string
* @param mixed $message
*/
public static function enqueue($message, string $queue, bool $high = false, array $stamps = [])
{
if ($high) {
self::$message_bus->dispatch(new MessageHigh($message, $queue), $stamps);
} else {
self::$message_bus->dispatch(new MessageLow($message, $queue), $stamps);
}
}
}

Loading…
Cancel
Save