From 1a0c9e720f959d4567e3a87e67936bb7b21e328f Mon Sep 17 00:00:00 2001 From: Diogo Peralta Cordeiro Date: Sat, 5 Mar 2022 14:23:08 +0000 Subject: [PATCH] [COMPONENT][FreeNetwork] Start using queues [COMPONENT][Notification] Start using queues [PLUGIN][ActivityPub] Start using queues --- components/FreeNetwork/FreeNetwork.php | 5 +- components/Notification/Notification.php | 34 +++++- plugins/ActivityPub/ActivityPub.php | 125 +++++++++++++++-------- plugins/ActivityPub/Controller/Inbox.php | 30 ++---- 4 files changed, 121 insertions(+), 73 deletions(-) diff --git a/components/FreeNetwork/FreeNetwork.php b/components/FreeNetwork/FreeNetwork.php index 2733123f26..11119f33bf 100644 --- a/components/FreeNetwork/FreeNetwork.php +++ b/components/FreeNetwork/FreeNetwork.php @@ -498,12 +498,9 @@ class FreeNetwork extends Component public static function notify(Actor $sender, Activity $activity, array $targets, ?string $reason = null): bool { - $delivered = []; foreach (self::$protocols as $protocol) { - $protocol::freeNetworkDistribute($sender, $activity, $targets, $reason, $delivered); + $protocol::freeNetworkDistribute($sender, $activity, $targets, $reason); } - $failed_targets = array_udiff($targets, $delivered, fn (Actor $a, Actor $b): int => $a->getId() <=> $b->getId()); - // TODO: Implement failed queues return false; } diff --git a/components/Notification/Notification.php b/components/Notification/Notification.php index 533d087945..a9ac55a782 100644 --- a/components/Notification/Notification.php +++ b/components/Notification/Notification.php @@ -26,6 +26,7 @@ use App\Core\Event; use function App\Core\I18n\_m; use App\Core\Log; use App\Core\Modules\Component; +use App\Core\Queue\Queue; use App\Core\Router\RouteLoader; use App\Core\Router\Router; use App\Entity\Activity; @@ -74,8 +75,25 @@ class Notification extends Component return Event::next; } + public function onQueueNotificationLocal(Actor $sender, Activity $activity, Actor $target, ?string $reason, array &$retry_args): bool + { + // TODO: use https://symfony.com/doc/current/notifier.html + return Event::stop; + } + + public function onQueueNotificationRemote(Actor $sender, Activity $activity, array $targets, ?string $reason, array &$retry_args): bool + { + if (FreeNetwork::notify($sender, $activity, $targets, $reason)) { + return Event::stop; + } else { + return Event::next; + } + } + /** * Bring given Activity to Targets's attention + * + * @return true if successful, false otherwise */ public static function notify(Actor $sender, Activity $activity, array $targets, ?string $reason = null): bool { @@ -92,8 +110,12 @@ class Notification extends Component // The target already knows about this, no need to bother with a notification continue; } - // TODO: use https://symfony.com/doc/current/notifier.html } + Queue::enqueue( + payload: [$sender, $activity, $target, $reason], + queue: 'notification_local', + priority: true, + ); } else { // We have no authority nor responsibility of notifying remote actors of a remote actor's doing if ($sender->getIsLocal()) { @@ -115,8 +137,14 @@ class Notification extends Component } } - FreeNetwork::notify($sender, $activity, $remote_targets, $reason); + if ($remote_targets !== []) { + Queue::enqueue( + payload: [$sender, $activity, $remote_targets, $reason], + queue: 'notification_remote', + priority: false, + ); + } - return Event::next; + return true; } } diff --git a/plugins/ActivityPub/ActivityPub.php b/plugins/ActivityPub/ActivityPub.php index acd415bc3d..0430126903 100644 --- a/plugins/ActivityPub/ActivityPub.php +++ b/plugins/ActivityPub/ActivityPub.php @@ -33,11 +33,14 @@ declare(strict_types = 1); namespace Plugin\ActivityPub; use ActivityPhp\Type; +use ActivityPhp\Type\AbstractObject; use App\Core\DB\DB; use App\Core\Event; use App\Core\HTTPClient; +use function App\Core\I18n\_m; use App\Core\Log; use App\Core\Modules\Plugin; +use App\Core\Queue\Queue; use App\Core\Router\RouteLoader; use App\Core\Router\Router; use App\Entity\Activity; @@ -127,6 +130,30 @@ class ActivityPub extends Plugin return Event::next; } + public function onQueueActivitypubInbox(ActivitypubActor $ap_actor, Actor $actor, string|AbstractObject $type): bool + { + // TODO: Check if Actor has authority over payload + + // Store Activity + $ap_act = Model\Activity::fromJson($type, ['source' => 'ActivityPub']); + FreeNetworkActorProtocol::protocolSucceeded( + 'activitypub', + $ap_actor->getActorId(), + Discovery::normalize($actor->getNickname() . '@' . parse_url($ap_actor->getInboxUri(), \PHP_URL_HOST)), + ); + $already_known_ids = []; + if (!empty($ap_act->_object_mention_ids)) { + $already_known_ids = $ap_act->_object_mention_ids; + } + + DB::flush(); + if (Event::handle('ActivityPubNewNotification', [$actor, $ap_act->getActivity(), $already_known_ids, _m('{nickname} attentioned you.', ['{nickname}' => $actor->getNickname()])]) === Event::next) { + Event::handle('NewNotification', [$actor, $ap_act->getActivity(), $already_known_ids, _m('{nickname} attentioned you.', ['{nickname}' => $actor->getNickname()])]); + } + + return Event::stop; + } + /** * This code executes when GNU social creates the page routing, and we hook * on this event to add our Inbox and Outbox handler for ActivityPub. @@ -291,6 +318,51 @@ class ActivityPub extends Plugin return false; } + public function onQueueActivitypubPostman( + Actor $sender, + Activity $activity, + string $inbox, + array $to_actors, + array &$retry_args, + ): bool + { + try { + $data = Model::toJson($activity); + if ($sender->isGroup()) { + // When the sender is a group, we have to wrap it in an Announce activity + $data = Type::create('Announce', ['object' => $data])->toJson(); + } + $res = self::postman($sender, $data, $inbox); + + // accumulate errors for later use, if needed + $status_code = $res->getStatusCode(); + if (!($status_code === 200 || $status_code === 202 || $status_code === 409)) { + $res_body = json_decode($res->getContent(), true); + $retry_args['reason'] ??= []; + $retry_args['reason'][] = $res_body['error'] ?? 'An unknown error occurred.'; + return Event::next; + } else { + foreach ($to_actors as $actor) { + if ($actor->isPerson()) { + FreeNetworkActorProtocol::protocolSucceeded( + 'activitypub', + $actor, + Discovery::normalize($actor->getNickname() . '@' . parse_url($inbox, \PHP_URL_HOST)), + ); + } + } + } + return Event::stop; + } catch (Exception $e) { + Log::error('ActivityPub @ freeNetworkDistribute: ' . $e->getMessage(), [$e]); + $retry_args['reason'] ??= []; + $retry_args['reason'][] = "Got an exception: {$e->getMessage()}"; + $retry_args['exception'] ??= []; + $retry_args['exception'][] = $e; + return Event::next; + } + } + /** * The FreeNetwork component will call this function to distribute this instance's activities * @@ -299,61 +371,30 @@ class ActivityPub extends Plugin * @throws ServerExceptionInterface * @throws TransportExceptionInterface */ - public static function freeNetworkDistribute(Actor $sender, Activity $activity, array $targets, ?string $reason = null, array &$delivered = []): bool + public static function freeNetworkDistribute(Actor $sender, Activity $activity, array $targets, ?string $reason = null): void { $to_addr = []; foreach ($targets as $actor) { if (FreeNetworkActorProtocol::canIActor('activitypub', $actor->getId())) { + // Sometimes FreeNetwork can allow us to actor even though we don't have an internal representation of + // the actor, that could for example mean that OStatus handled this actor while we were deactivated + // On next interaction this should be resolved, for now continue if (\is_null($ap_target = DB::findOneBy(ActivitypubActor::class, ['actor_id' => $actor->getId()], return_null: true))) { continue; } $to_addr[$ap_target->getInboxSharedUri() ?? $ap_target->getInboxUri()][] = $actor; } else { - return Event::next; + continue; } } - $errors = []; - //$to_failed = []; - foreach ($to_addr as $inbox => $dummy) { - try { - $data = Model::toJson($activity); - if ($sender->isGroup()) { - // When the sender is a group, we have to wrap it in an Announce activity - $data = Type::create('Announce', ['object' => $data])->toJson(); - } - $res = self::postman($sender, $data, $inbox); - - // accumulate errors for later use, if needed - $status_code = $res->getStatusCode(); - if (!($status_code === 200 || $status_code === 202 || $status_code === 409)) { - $res_body = json_decode($res->getContent(), true); - $errors[] = $res_body['error'] ?? 'An unknown error occurred.'; - //$to_failed[$inbox] = $activity; - } else { - array_push($delivered, ...$dummy); - foreach ($dummy as $actor) { - if ($actor->isPerson()) { - FreeNetworkActorProtocol::protocolSucceeded( - 'activitypub', - $actor, - Discovery::normalize($actor->getNickname() . '@' . parse_url($inbox, \PHP_URL_HOST)), - ); - } - } - } - } catch (Exception $e) { - Log::error('ActivityPub @ freeNetworkDistribute: ' . $e->getMessage(), [$e]); - //$to_failed[$inbox] = $activity; - } + foreach ($to_addr as $inbox => $to_actors) { + Queue::enqueue( + payload: [$sender, $activity, $inbox, $to_actors], + queue: 'activitypub_postman', + priority: false, + ); } - - if (!empty($errors)) { - Log::error(sizeof($errors) . ' instance/s failed to handle our activity!'); - return false; - } - - return true; } /** diff --git a/plugins/ActivityPub/Controller/Inbox.php b/plugins/ActivityPub/Controller/Inbox.php index 79b6a1846c..6026620bf5 100644 --- a/plugins/ActivityPub/Controller/Inbox.php +++ b/plugins/ActivityPub/Controller/Inbox.php @@ -34,17 +34,14 @@ namespace Plugin\ActivityPub\Controller; use App\Core\Controller; use App\Core\DB\DB; -use App\Core\Event; use function App\Core\I18n\_m; use App\Core\Log; +use App\Core\Queue\Queue; use App\Core\Router\Router; use App\Entity\Actor; use App\Util\Common; use App\Util\Exception\ClientException; -use Component\FreeNetwork\Entity\FreeNetworkActorProtocol; -use Component\FreeNetwork\Util\Discovery; use Exception; -use const PHP_URL_HOST; use Plugin\ActivityPub\Entity\ActivitypubActor; use Plugin\ActivityPub\Entity\ActivitypubRsa; use Plugin\ActivityPub\Util\Explorer; @@ -150,7 +147,7 @@ class Inbox extends Controller try { ActivitypubActor::update_profile($ap_actor, $actor, $activitypub_rsa, $res); } catch (Exception $e) { - return $error('Failed to updated remote actor information.', $e); + return $error('Failed to update remote actor information.', $e); } [$verified, /*$headers*/] = HTTPSignature::verify($actor_public_key, $signatureData, $headers, $path, $body); @@ -165,26 +162,11 @@ class Inbox extends Controller // HTTP signature checked out, make sure the "actor" of the activity matches that of the signature Log::debug('ActivityPub Inbox: HTTP Signature: Authorised request. Will now start the inbox handler.'); - // TODO: Check if Actor has authority over payload - - // Store Activity - $ap_act = Model\Activity::fromJson($type, ['source' => 'ActivityPub']); - FreeNetworkActorProtocol::protocolSucceeded( - 'activitypub', - $ap_actor->getActorId(), - Discovery::normalize($actor->getNickname() . '@' . parse_url($ap_actor->getInboxUri(), PHP_URL_HOST)), + Queue::enqueue( + payload: [$ap_actor, $actor, $type], + queue: 'activitypub_inbox', + priority: false, ); - $already_known_ids = []; - if (!empty($ap_act->_object_mention_ids)) { - $already_known_ids = $ap_act->_object_mention_ids; - } - - DB::flush(); - if (Event::handle('ActivityPubNewNotification', [$actor, $ap_act->getActivity(), $already_known_ids, _m('{nickname} attentioned you.', ['{nickname}' => $actor->getNickname()])]) === Event::next) { - Event::handle('NewNotification', [$actor, $ap_act->getActivity(), $already_known_ids, _m('{nickname} attentioned you.', ['{nickname}' => $actor->getNickname()])]); - } - - dd($ap_act, $act = $ap_act->getActivity(), $act->getActor(), $act->getObject()); return new TypeResponse($type, status: 202); }