forked from GNUsocial/gnu-social
		
	[COMPONENT][FreeNetwork] Start using queues
[COMPONENT][Notification] Start using queues [PLUGIN][ActivityPub] Start using queues
This commit is contained in:
		| @@ -498,12 +498,9 @@ class FreeNetwork extends Component | |||||||
|  |  | ||||||
|     public static function notify(Actor $sender, Activity $activity, array $targets, ?string $reason = null): bool |     public static function notify(Actor $sender, Activity $activity, array $targets, ?string $reason = null): bool | ||||||
|     { |     { | ||||||
|         $delivered = []; |  | ||||||
|         foreach (self::$protocols as $protocol) { |         foreach (self::$protocols as $protocol) { | ||||||
|             $protocol::freeNetworkDistribute($sender, $activity, $targets, $reason, $delivered); |             $protocol::freeNetworkDistribute($sender, $activity, $targets, $reason); | ||||||
|         } |         } | ||||||
|         $failed_targets = array_udiff($targets, $delivered, fn (Actor $a, Actor $b): int => $a->getId() <=> $b->getId()); |  | ||||||
|         // TODO: Implement failed queues |  | ||||||
|         return false; |         return false; | ||||||
|     } |     } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -26,6 +26,7 @@ use App\Core\Event; | |||||||
| use function App\Core\I18n\_m; | use function App\Core\I18n\_m; | ||||||
| use App\Core\Log; | use App\Core\Log; | ||||||
| use App\Core\Modules\Component; | use App\Core\Modules\Component; | ||||||
|  | use App\Core\Queue\Queue; | ||||||
| use App\Core\Router\RouteLoader; | use App\Core\Router\RouteLoader; | ||||||
| use App\Core\Router\Router; | use App\Core\Router\Router; | ||||||
| use App\Entity\Activity; | use App\Entity\Activity; | ||||||
| @@ -74,8 +75,25 @@ class Notification extends Component | |||||||
|         return Event::next; |         return Event::next; | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     public function onQueueNotificationLocal(Actor $sender, Activity $activity, Actor $target, ?string $reason, array &$retry_args): bool | ||||||
|  |     { | ||||||
|  |         // TODO: use https://symfony.com/doc/current/notifier.html | ||||||
|  |         return Event::stop; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     public function onQueueNotificationRemote(Actor $sender, Activity $activity, array $targets, ?string $reason, array &$retry_args): bool | ||||||
|  |     { | ||||||
|  |         if (FreeNetwork::notify($sender, $activity, $targets, $reason)) { | ||||||
|  |             return Event::stop; | ||||||
|  |         } else { | ||||||
|  |             return Event::next; | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|     /** |     /** | ||||||
|      * Bring given Activity to Targets's attention |      * Bring given Activity to Targets's attention | ||||||
|  |      * | ||||||
|  |      * @return true if successful, false otherwise | ||||||
|      */ |      */ | ||||||
|     public static function notify(Actor $sender, Activity $activity, array $targets, ?string $reason = null): bool |     public static function notify(Actor $sender, Activity $activity, array $targets, ?string $reason = null): bool | ||||||
|     { |     { | ||||||
| @@ -92,8 +110,12 @@ class Notification extends Component | |||||||
|                         // The target already knows about this, no need to bother with a notification |                         // The target already knows about this, no need to bother with a notification | ||||||
|                         continue; |                         continue; | ||||||
|                     } |                     } | ||||||
|                     // TODO: use https://symfony.com/doc/current/notifier.html |  | ||||||
|                 } |                 } | ||||||
|  |                 Queue::enqueue( | ||||||
|  |                     payload: [$sender, $activity, $target, $reason], | ||||||
|  |                     queue: 'notification_local', | ||||||
|  |                     priority: true, | ||||||
|  |                 ); | ||||||
|             } else { |             } else { | ||||||
|                 // We have no authority nor responsibility of notifying remote actors of a remote actor's doing |                 // We have no authority nor responsibility of notifying remote actors of a remote actor's doing | ||||||
|                 if ($sender->getIsLocal()) { |                 if ($sender->getIsLocal()) { | ||||||
| @@ -115,8 +137,14 @@ class Notification extends Component | |||||||
|             } |             } | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         FreeNetwork::notify($sender, $activity, $remote_targets, $reason); |         if ($remote_targets !== []) { | ||||||
|  |             Queue::enqueue( | ||||||
|  |                 payload: [$sender, $activity, $remote_targets, $reason], | ||||||
|  |                 queue: 'notification_remote', | ||||||
|  |                 priority: false, | ||||||
|  |             ); | ||||||
|  |         } | ||||||
|  |  | ||||||
|         return Event::next; |         return true; | ||||||
|     } |     } | ||||||
| } | } | ||||||
|   | |||||||
| @@ -33,11 +33,14 @@ declare(strict_types = 1); | |||||||
| namespace Plugin\ActivityPub; | namespace Plugin\ActivityPub; | ||||||
|  |  | ||||||
| use ActivityPhp\Type; | use ActivityPhp\Type; | ||||||
|  | use ActivityPhp\Type\AbstractObject; | ||||||
| use App\Core\DB\DB; | use App\Core\DB\DB; | ||||||
| use App\Core\Event; | use App\Core\Event; | ||||||
| use App\Core\HTTPClient; | use App\Core\HTTPClient; | ||||||
|  | use function App\Core\I18n\_m; | ||||||
| use App\Core\Log; | use App\Core\Log; | ||||||
| use App\Core\Modules\Plugin; | use App\Core\Modules\Plugin; | ||||||
|  | use App\Core\Queue\Queue; | ||||||
| use App\Core\Router\RouteLoader; | use App\Core\Router\RouteLoader; | ||||||
| use App\Core\Router\Router; | use App\Core\Router\Router; | ||||||
| use App\Entity\Activity; | use App\Entity\Activity; | ||||||
| @@ -127,6 +130,30 @@ class ActivityPub extends Plugin | |||||||
|         return Event::next; |         return Event::next; | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     public function onQueueActivitypubInbox(ActivitypubActor $ap_actor, Actor $actor, string|AbstractObject $type): bool | ||||||
|  |     { | ||||||
|  |         // TODO: Check if Actor has authority over payload | ||||||
|  |  | ||||||
|  |         // Store Activity | ||||||
|  |         $ap_act = Model\Activity::fromJson($type, ['source' => 'ActivityPub']); | ||||||
|  |         FreeNetworkActorProtocol::protocolSucceeded( | ||||||
|  |             'activitypub', | ||||||
|  |             $ap_actor->getActorId(), | ||||||
|  |             Discovery::normalize($actor->getNickname() . '@' . parse_url($ap_actor->getInboxUri(), \PHP_URL_HOST)), | ||||||
|  |         ); | ||||||
|  |         $already_known_ids = []; | ||||||
|  |         if (!empty($ap_act->_object_mention_ids)) { | ||||||
|  |             $already_known_ids = $ap_act->_object_mention_ids; | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         DB::flush(); | ||||||
|  |         if (Event::handle('ActivityPubNewNotification', [$actor, $ap_act->getActivity(), $already_known_ids, _m('{nickname} attentioned you.', ['{nickname}' => $actor->getNickname()])]) === Event::next) { | ||||||
|  |             Event::handle('NewNotification', [$actor, $ap_act->getActivity(), $already_known_ids, _m('{nickname} attentioned you.', ['{nickname}' => $actor->getNickname()])]); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         return Event::stop; | ||||||
|  |     } | ||||||
|  |  | ||||||
|     /** |     /** | ||||||
|      * This code executes when GNU social creates the page routing, and we hook |      * This code executes when GNU social creates the page routing, and we hook | ||||||
|      * on this event to add our Inbox and Outbox handler for ActivityPub. |      * on this event to add our Inbox and Outbox handler for ActivityPub. | ||||||
| @@ -291,31 +318,14 @@ class ActivityPub extends Plugin | |||||||
|         return false; |         return false; | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /** |     public function onQueueActivitypubPostman( | ||||||
|      * The FreeNetwork component will call this function to distribute this instance's activities |         Actor $sender, | ||||||
|      * |         Activity $activity, | ||||||
|      * @throws ClientExceptionInterface |         string $inbox, | ||||||
|      * @throws RedirectionExceptionInterface |         array $to_actors, | ||||||
|      * @throws ServerExceptionInterface |         array &$retry_args, | ||||||
|      * @throws TransportExceptionInterface |     ): bool | ||||||
|      */ |  | ||||||
|     public static function freeNetworkDistribute(Actor $sender, Activity $activity, array $targets, ?string $reason = null, array &$delivered = []): bool |  | ||||||
|     { |     { | ||||||
|         $to_addr = []; |  | ||||||
|         foreach ($targets as $actor) { |  | ||||||
|             if (FreeNetworkActorProtocol::canIActor('activitypub', $actor->getId())) { |  | ||||||
|                 if (\is_null($ap_target = DB::findOneBy(ActivitypubActor::class, ['actor_id' => $actor->getId()], return_null: true))) { |  | ||||||
|                     continue; |  | ||||||
|                 } |  | ||||||
|                 $to_addr[$ap_target->getInboxSharedUri() ?? $ap_target->getInboxUri()][] = $actor; |  | ||||||
|             } else { |  | ||||||
|                 return Event::next; |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         $errors = []; |  | ||||||
|         //$to_failed = []; |  | ||||||
|         foreach ($to_addr as $inbox => $dummy) { |  | ||||||
|         try { |         try { | ||||||
|             $data = Model::toJson($activity); |             $data = Model::toJson($activity); | ||||||
|             if ($sender->isGroup()) { |             if ($sender->isGroup()) { | ||||||
| @@ -328,11 +338,11 @@ class ActivityPub extends Plugin | |||||||
|             $status_code = $res->getStatusCode(); |             $status_code = $res->getStatusCode(); | ||||||
|             if (!($status_code === 200 || $status_code === 202 || $status_code === 409)) { |             if (!($status_code === 200 || $status_code === 202 || $status_code === 409)) { | ||||||
|                 $res_body = json_decode($res->getContent(), true); |                 $res_body = json_decode($res->getContent(), true); | ||||||
|                     $errors[] = $res_body['error'] ?? 'An unknown error occurred.'; |                 $retry_args['reason'] ??= []; | ||||||
|                 //$to_failed[$inbox] = $activity; |                 $retry_args['reason'][] = $res_body['error'] ?? 'An unknown error occurred.'; | ||||||
|  |                 return Event::next; | ||||||
|             } else { |             } else { | ||||||
|                     array_push($delivered, ...$dummy); |                 foreach ($to_actors as $actor) { | ||||||
|                     foreach ($dummy as $actor) { |  | ||||||
|                     if ($actor->isPerson()) { |                     if ($actor->isPerson()) { | ||||||
|                         FreeNetworkActorProtocol::protocolSucceeded( |                         FreeNetworkActorProtocol::protocolSucceeded( | ||||||
|                             'activitypub', |                             'activitypub', | ||||||
| @@ -342,18 +352,49 @@ class ActivityPub extends Plugin | |||||||
|                     } |                     } | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|  |             return Event::stop; | ||||||
|         } catch (Exception $e) { |         } catch (Exception $e) { | ||||||
|             Log::error('ActivityPub @ freeNetworkDistribute: ' . $e->getMessage(), [$e]); |             Log::error('ActivityPub @ freeNetworkDistribute: ' . $e->getMessage(), [$e]); | ||||||
|                 //$to_failed[$inbox] = $activity; |             $retry_args['reason'] ??= []; | ||||||
|  |             $retry_args['reason'][] = "Got an exception: {$e->getMessage()}"; | ||||||
|  |             $retry_args['exception'] ??= []; | ||||||
|  |             $retry_args['exception'][] = $e; | ||||||
|  |             return Event::next; | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|         if (!empty($errors)) { |     /** | ||||||
|             Log::error(sizeof($errors) . ' instance/s failed to handle our activity!'); |      * The FreeNetwork component will call this function to distribute this instance's activities | ||||||
|             return false; |      * | ||||||
|  |      * @throws ClientExceptionInterface | ||||||
|  |      * @throws RedirectionExceptionInterface | ||||||
|  |      * @throws ServerExceptionInterface | ||||||
|  |      * @throws TransportExceptionInterface | ||||||
|  |      */ | ||||||
|  |     public static function freeNetworkDistribute(Actor $sender, Activity $activity, array $targets, ?string $reason = null): void | ||||||
|  |     { | ||||||
|  |         $to_addr = []; | ||||||
|  |         foreach ($targets as $actor) { | ||||||
|  |             if (FreeNetworkActorProtocol::canIActor('activitypub', $actor->getId())) { | ||||||
|  |                 // Sometimes FreeNetwork can allow us to actor even though we don't have an internal representation of | ||||||
|  |                 // the actor, that could for example mean that OStatus handled this actor while we were deactivated | ||||||
|  |                 // On next interaction this should be resolved, for now continue | ||||||
|  |                 if (\is_null($ap_target = DB::findOneBy(ActivitypubActor::class, ['actor_id' => $actor->getId()], return_null: true))) { | ||||||
|  |                     continue; | ||||||
|  |                 } | ||||||
|  |                 $to_addr[$ap_target->getInboxSharedUri() ?? $ap_target->getInboxUri()][] = $actor; | ||||||
|  |             } else { | ||||||
|  |                 continue; | ||||||
|  |             } | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         return true; |         foreach ($to_addr as $inbox => $to_actors) { | ||||||
|  |             Queue::enqueue( | ||||||
|  |                 payload: [$sender, $activity, $inbox, $to_actors], | ||||||
|  |                 queue: 'activitypub_postman', | ||||||
|  |                 priority: false, | ||||||
|  |             ); | ||||||
|  |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /** |     /** | ||||||
|   | |||||||
| @@ -34,17 +34,14 @@ namespace Plugin\ActivityPub\Controller; | |||||||
|  |  | ||||||
| use App\Core\Controller; | use App\Core\Controller; | ||||||
| use App\Core\DB\DB; | use App\Core\DB\DB; | ||||||
| use App\Core\Event; |  | ||||||
| use function App\Core\I18n\_m; | use function App\Core\I18n\_m; | ||||||
| use App\Core\Log; | use App\Core\Log; | ||||||
|  | use App\Core\Queue\Queue; | ||||||
| use App\Core\Router\Router; | use App\Core\Router\Router; | ||||||
| use App\Entity\Actor; | use App\Entity\Actor; | ||||||
| use App\Util\Common; | use App\Util\Common; | ||||||
| use App\Util\Exception\ClientException; | use App\Util\Exception\ClientException; | ||||||
| use Component\FreeNetwork\Entity\FreeNetworkActorProtocol; |  | ||||||
| use Component\FreeNetwork\Util\Discovery; |  | ||||||
| use Exception; | use Exception; | ||||||
| use const PHP_URL_HOST; |  | ||||||
| use Plugin\ActivityPub\Entity\ActivitypubActor; | use Plugin\ActivityPub\Entity\ActivitypubActor; | ||||||
| use Plugin\ActivityPub\Entity\ActivitypubRsa; | use Plugin\ActivityPub\Entity\ActivitypubRsa; | ||||||
| use Plugin\ActivityPub\Util\Explorer; | use Plugin\ActivityPub\Util\Explorer; | ||||||
| @@ -150,7 +147,7 @@ class Inbox extends Controller | |||||||
|             try { |             try { | ||||||
|                 ActivitypubActor::update_profile($ap_actor, $actor, $activitypub_rsa, $res); |                 ActivitypubActor::update_profile($ap_actor, $actor, $activitypub_rsa, $res); | ||||||
|             } catch (Exception $e) { |             } catch (Exception $e) { | ||||||
|                 return $error('Failed to updated remote actor information.', $e); |                 return $error('Failed to update remote actor information.', $e); | ||||||
|             } |             } | ||||||
|  |  | ||||||
|             [$verified, /*$headers*/] = HTTPSignature::verify($actor_public_key, $signatureData, $headers, $path, $body); |             [$verified, /*$headers*/] = HTTPSignature::verify($actor_public_key, $signatureData, $headers, $path, $body); | ||||||
| @@ -165,26 +162,11 @@ class Inbox extends Controller | |||||||
|         // HTTP signature checked out, make sure the "actor" of the activity matches that of the signature |         // HTTP signature checked out, make sure the "actor" of the activity matches that of the signature | ||||||
|         Log::debug('ActivityPub Inbox: HTTP Signature: Authorised request. Will now start the inbox handler.'); |         Log::debug('ActivityPub Inbox: HTTP Signature: Authorised request. Will now start the inbox handler.'); | ||||||
|  |  | ||||||
|         // TODO: Check if Actor has authority over payload |         Queue::enqueue( | ||||||
|  |             payload: [$ap_actor, $actor, $type], | ||||||
|         // Store Activity |             queue: 'activitypub_inbox', | ||||||
|         $ap_act = Model\Activity::fromJson($type, ['source' => 'ActivityPub']); |             priority: false, | ||||||
|         FreeNetworkActorProtocol::protocolSucceeded( |  | ||||||
|             'activitypub', |  | ||||||
|             $ap_actor->getActorId(), |  | ||||||
|             Discovery::normalize($actor->getNickname() . '@' . parse_url($ap_actor->getInboxUri(), PHP_URL_HOST)), |  | ||||||
|         ); |         ); | ||||||
|         $already_known_ids = []; |  | ||||||
|         if (!empty($ap_act->_object_mention_ids)) { |  | ||||||
|             $already_known_ids = $ap_act->_object_mention_ids; |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         DB::flush(); |  | ||||||
|         if (Event::handle('ActivityPubNewNotification', [$actor, $ap_act->getActivity(), $already_known_ids, _m('{nickname} attentioned you.', ['{nickname}' => $actor->getNickname()])]) === Event::next) { |  | ||||||
|             Event::handle('NewNotification', [$actor, $ap_act->getActivity(), $already_known_ids, _m('{nickname} attentioned you.', ['{nickname}' => $actor->getNickname()])]); |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         dd($ap_act, $act = $ap_act->getActivity(), $act->getActor(), $act->getObject()); |  | ||||||
|  |  | ||||||
|         return new TypeResponse($type, status: 202); |         return new TypeResponse($type, status: 202); | ||||||
|     } |     } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user