diff --git a/src/Symfony/Component/HttpClient/HttplugClient.php b/src/Symfony/Component/HttpClient/HttplugClient.php index 6ba3cca620..8f7fe51465 100644 --- a/src/Symfony/Component/HttpClient/HttplugClient.php +++ b/src/Symfony/Component/HttpClient/HttplugClient.php @@ -32,9 +32,8 @@ use Psr\Http\Message\StreamFactoryInterface; use Psr\Http\Message\StreamInterface; use Psr\Http\Message\UriFactoryInterface; use Psr\Http\Message\UriInterface; +use Symfony\Component\HttpClient\Internal\HttplugWaitLoop; use Symfony\Component\HttpClient\Response\HttplugPromise; -use Symfony\Component\HttpClient\Response\ResponseTrait; -use Symfony\Component\HttpClient\Response\StreamWrapper; use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface; use Symfony\Contracts\HttpClient\HttpClientInterface; use Symfony\Contracts\HttpClient\ResponseInterface; @@ -60,27 +59,27 @@ final class HttplugClient implements HttplugInterface, HttpAsyncClient, RequestF private $client; private $responseFactory; private $streamFactory; - private $promisePool = []; - private $pendingResponse; + private $promisePool; + private $waitLoop; public function __construct(HttpClientInterface $client = null, ResponseFactoryInterface $responseFactory = null, StreamFactoryInterface $streamFactory = null) { $this->client = $client ?? HttpClient::create(); $this->responseFactory = $responseFactory; $this->streamFactory = $streamFactory ?? ($responseFactory instanceof StreamFactoryInterface ? $responseFactory : null); - $this->promisePool = new \SplObjectStorage(); + $this->promisePool = \function_exists('GuzzleHttp\Promise\queue') ? new \SplObjectStorage() : null; - if (null !== $this->responseFactory && null !== $this->streamFactory) { - return; + if (null === $this->responseFactory || null === $this->streamFactory) { + if (!class_exists(Psr17Factory::class)) { + throw new \LogicException('You cannot use the "Symfony\Component\HttpClient\HttplugClient" as no PSR-17 factories have been provided. Try running "composer require nyholm/psr7".'); + } + + $psr17Factory = new Psr17Factory(); + $this->responseFactory = $this->responseFactory ?? $psr17Factory; + $this->streamFactory = $this->streamFactory ?? $psr17Factory; } - if (!class_exists(Psr17Factory::class)) { - throw new \LogicException('You cannot use the "Symfony\Component\HttpClient\HttplugClient" as no PSR-17 factories have been provided. Try running "composer require nyholm/psr7".'); - } - - $psr17Factory = new Psr17Factory(); - $this->responseFactory = $this->responseFactory ?? $psr17Factory; - $this->streamFactory = $this->streamFactory ?? $psr17Factory; + $this->waitLoop = new HttplugWaitLoop($this->client, $this->promisePool, $this->responseFactory, $this->streamFactory); } /** @@ -89,7 +88,7 @@ final class HttplugClient implements HttplugInterface, HttpAsyncClient, RequestF public function sendRequest(RequestInterface $request): Psr7ResponseInterface { try { - return $this->createPsr7Response($this->sendPsr7Request($request)); + return $this->waitLoop->createPsr7Response($this->sendPsr7Request($request)); } catch (TransportExceptionInterface $e) { throw new NetworkException($e->getMessage(), $request, $e); } @@ -102,7 +101,7 @@ final class HttplugClient implements HttplugInterface, HttpAsyncClient, RequestF */ public function sendAsyncRequest(RequestInterface $request): Promise { - if (!class_exists(GuzzlePromise::class)) { + if (!$promisePool = $this->promisePool) { throw new \LogicException(sprintf('You cannot use "%s()" as the "guzzlehttp/promises" package is not installed. Try running "composer require guzzlehttp/promises".', __METHOD__)); } @@ -112,23 +111,22 @@ final class HttplugClient implements HttplugInterface, HttpAsyncClient, RequestF return new RejectedPromise($e); } - $cancel = function () use ($response) { + $waitLoop = $this->waitLoop; + + $promise = new GuzzlePromise(static function () use ($response, $waitLoop) { + $waitLoop->wait($response); + }, static function () use ($response, $promisePool) { $response->cancel(); - unset($this->promisePool[$response]); - }; + unset($promisePool[$response]); + }); - $promise = new GuzzlePromise(function () use ($response) { - $this->pendingResponse = $response; - $this->wait(); - }, $cancel); + $promisePool[$response] = [$request, $promise]; - $this->promisePool[$response] = [$request, $promise]; - - return new HttplugPromise($promise, $cancel); + return new HttplugPromise($promise); } /** - * Resolve pending promises that complete before the timeouts are reached. + * Resolves pending promises that complete before the timeouts are reached. * * When $maxDuration is null and $idleTimeout is reached, promises are rejected. * @@ -136,64 +134,7 @@ final class HttplugClient implements HttplugInterface, HttpAsyncClient, RequestF */ public function wait(float $maxDuration = null, float $idleTimeout = null): int { - $pendingResponse = $this->pendingResponse; - $this->pendingResponse = null; - - if (null !== $maxDuration) { - $startTime = microtime(true); - $idleTimeout = max(0.0, min($maxDuration / 5, $idleTimeout ?? $maxDuration)); - $remainingDuration = $maxDuration; - } - - do { - foreach ($this->client->stream($this->promisePool, $idleTimeout) as $response => $chunk) { - try { - if (null !== $maxDuration && $chunk->isTimeout()) { - goto check_duration; - } - - if ($chunk->isFirst()) { - // Deactivate throwing on 3/4/5xx - $response->getStatusCode(); - } - - if (!$chunk->isLast()) { - goto check_duration; - } - - if ([$request, $promise] = $this->promisePool[$response] ?? null) { - unset($this->promisePool[$response]); - $promise->resolve($this->createPsr7Response($response, true)); - } - } catch (\Exception $e) { - if ([$request, $promise] = $this->promisePool[$response] ?? null) { - unset($this->promisePool[$response]); - - if ($e instanceof TransportExceptionInterface) { - $e = new NetworkException($e->getMessage(), $request, $e); - } - - $promise->reject($e); - } - } - - if ($pendingResponse === $response) { - return \count($this->promisePool); - } - - check_duration: - if (null !== $maxDuration && $idleTimeout && $idleTimeout > $remainingDuration = max(0.0, $maxDuration - microtime(true) + $startTime)) { - $idleTimeout = $remainingDuration / 5; - break; - } - } - - if (!$count = \count($this->promisePool)) { - return 0; - } - } while (null !== $maxDuration && 0 < $remainingDuration); - - return $count; + return $this->waitLoop->wait(null, $maxDuration, $idleTimeout); } /** @@ -265,6 +206,11 @@ final class HttplugClient implements HttplugInterface, HttpAsyncClient, RequestF return new Uri($uri); } + public function __destruct() + { + $this->wait(); + } + private function sendPsr7Request(RequestInterface $request, bool $buffer = null): ResponseInterface { try { @@ -286,29 +232,4 @@ final class HttplugClient implements HttplugInterface, HttpAsyncClient, RequestF throw new NetworkException($e->getMessage(), $request, $e); } } - - private function createPsr7Response(ResponseInterface $response, bool $buffer = false): Psr7ResponseInterface - { - $psrResponse = $this->responseFactory->createResponse($response->getStatusCode()); - - foreach ($response->getHeaders(false) as $name => $values) { - foreach ($values as $value) { - $psrResponse = $psrResponse->withAddedHeader($name, $value); - } - } - - if (isset(class_uses($response)[ResponseTrait::class])) { - $body = $this->streamFactory->createStreamFromResource($response->toStream(false)); - } elseif (!$buffer) { - $body = $this->streamFactory->createStreamFromResource(StreamWrapper::createResource($response, $this->client)); - } else { - $body = $this->streamFactory->createStream($response->getContent(false)); - } - - if ($body->isSeekable()) { - $body->seek(0); - } - - return $psrResponse->withBody($body); - } } diff --git a/src/Symfony/Component/HttpClient/Internal/HttplugWaitLoop.php b/src/Symfony/Component/HttpClient/Internal/HttplugWaitLoop.php new file mode 100644 index 0000000000..3f287feb6b --- /dev/null +++ b/src/Symfony/Component/HttpClient/Internal/HttplugWaitLoop.php @@ -0,0 +1,136 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\HttpClient\Internal; + +use Http\Client\Exception\NetworkException; +use Psr\Http\Message\ResponseFactoryInterface; +use Psr\Http\Message\ResponseInterface as Psr7ResponseInterface; +use Psr\Http\Message\StreamFactoryInterface; +use Symfony\Component\HttpClient\Response\ResponseTrait; +use Symfony\Component\HttpClient\Response\StreamWrapper; +use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface; +use Symfony\Contracts\HttpClient\HttpClientInterface; +use Symfony\Contracts\HttpClient\ResponseInterface; + +/** + * @author Nicolas Grekas + * + * @internal + */ +final class HttplugWaitLoop +{ + private $client; + private $promisePool; + private $responseFactory; + private $streamFactory; + + public function __construct(HttpClientInterface $client, ?\SplObjectStorage $promisePool, ResponseFactoryInterface $responseFactory, StreamFactoryInterface $streamFactory) + { + $this->client = $client; + $this->promisePool = $promisePool; + $this->responseFactory = $responseFactory; + $this->streamFactory = $streamFactory; + } + + public function wait(?ResponseInterface $pendingResponse, float $maxDuration = null, float $idleTimeout = null): int + { + if (!$this->promisePool) { + return 0; + } + + $guzzleQueue = \GuzzleHttp\Promise\queue(); + + if (0.0 === $remainingDuration = $maxDuration) { + $idleTimeout = 0.0; + } elseif (null !== $maxDuration) { + $startTime = microtime(true); + $idleTimeout = max(0.0, min($maxDuration / 5, $idleTimeout ?? $maxDuration)); + } + + do { + foreach ($this->client->stream($this->promisePool, $idleTimeout) as $response => $chunk) { + try { + if (null !== $maxDuration && $chunk->isTimeout()) { + goto check_duration; + } + + if ($chunk->isFirst()) { + // Deactivate throwing on 3/4/5xx + $response->getStatusCode(); + } + + if (!$chunk->isLast()) { + goto check_duration; + } + + if ([$request, $promise] = $this->promisePool[$response] ?? null) { + unset($this->promisePool[$response]); + $promise->resolve($this->createPsr7Response($response, true)); + } + } catch (\Exception $e) { + if ([$request, $promise] = $this->promisePool[$response] ?? null) { + unset($this->promisePool[$response]); + + if ($e instanceof TransportExceptionInterface) { + $e = new NetworkException($e->getMessage(), $request, $e); + } + + $promise->reject($e); + } + } + + $guzzleQueue->run(); + + if ($pendingResponse === $response) { + return $this->promisePool->count(); + } + + check_duration: + if (null !== $maxDuration && $idleTimeout && $idleTimeout > $remainingDuration = max(0.0, $maxDuration - microtime(true) + $startTime)) { + $idleTimeout = $remainingDuration / 5; + break; + } + } + + if (!$count = $this->promisePool->count()) { + return 0; + } + } while (null === $maxDuration || 0 < $remainingDuration); + + return $count; + } + + public function createPsr7Response(ResponseInterface $response, bool $buffer = false): Psr7ResponseInterface + { + $psrResponse = $this->responseFactory->createResponse($response->getStatusCode()); + + foreach ($response->getHeaders(false) as $name => $values) { + foreach ($values as $value) { + $psrResponse = $psrResponse->withAddedHeader($name, $value); + } + } + + if (isset(class_uses($response)[ResponseTrait::class])) { + $body = $this->streamFactory->createStreamFromResource($response->toStream(false)); + } elseif (!$buffer) { + $body = $this->streamFactory->createStreamFromResource(StreamWrapper::createResource($response, $this->client)); + } else { + $body = $this->streamFactory->createStream($response->getContent(false)); + } + + if ($body->isSeekable()) { + $body->seek(0); + } + + return $psrResponse->withBody($body); + } +} diff --git a/src/Symfony/Component/HttpClient/Response/HttplugPromise.php b/src/Symfony/Component/HttpClient/Response/HttplugPromise.php index 73637b38f7..2f98d6e0b9 100644 --- a/src/Symfony/Component/HttpClient/Response/HttplugPromise.php +++ b/src/Symfony/Component/HttpClient/Response/HttplugPromise.php @@ -23,12 +23,10 @@ use Psr\Http\Message\ResponseInterface as Psr7ResponseInterface; final class HttplugPromise implements HttplugPromiseInterface { private $promise; - private $cancel; - public function __construct(GuzzlePromiseInterface $promise, callable $cancel = null) + public function __construct(GuzzlePromiseInterface $promise) { $this->promise = $promise; - $this->cancel = $cancel; } public function then(callable $onFulfilled = null, callable $onRejected = null): self @@ -58,16 +56,4 @@ final class HttplugPromise implements HttplugPromiseInterface { return $this->promise->wait($unwrap); } - - public function __destruct() - { - if ($this->cancel) { - ($this->cancel)(); - } - } - - public function __wakeup() - { - throw new \BadMethodCallException('Cannot unserialize '.__CLASS__); - } } diff --git a/src/Symfony/Component/HttpClient/Tests/HttplugClientTest.php b/src/Symfony/Component/HttpClient/Tests/HttplugClientTest.php index 8cd339f2f8..66a8cef6e7 100644 --- a/src/Symfony/Component/HttpClient/Tests/HttplugClientTest.php +++ b/src/Symfony/Component/HttpClient/Tests/HttplugClientTest.php @@ -75,6 +75,31 @@ class HttplugClientTest extends TestCase $this->assertSame('HTTP/1.1', $body['SERVER_PROTOCOL']); } + public function testWait() + { + $client = new HttplugClient(new NativeHttpClient()); + + $successCallableCalled = false; + $failureCallableCalled = false; + $client->sendAsyncRequest($client->createRequest('GET', 'http://localhost:8057/timeout-body')) + ->then(function (ResponseInterface $response) use (&$successCallableCalled) { + $successCallableCalled = true; + + return $response; + }, function (\Exception $exception) use (&$failureCallableCalled) { + $failureCallableCalled = true; + + throw $exception; + }); + + $client->wait(0); + $this->assertFalse($successCallableCalled, '$promise->then() should not be called yet.'); + + $client->wait(); + $this->assertTrue($successCallableCalled, '$promise->then() should have been called.'); + $this->assertFalse($failureCallableCalled, 'Failure callable should not be called when request is successful.'); + } + public function testPostRequest() { $client = new HttplugClient(new NativeHttpClient());