From 766a1c62874cd02b8ae19c671bde43639d40f013 Mon Sep 17 00:00:00 2001 From: Nicolas Grekas Date: Sun, 10 May 2020 09:15:36 +0200 Subject: [PATCH] [HttpClient] add AsyncDecoratorTrait to ease processing responses without breaking async --- .../HttpClient/AsyncDecoratorTrait.php | 54 +++ src/Symfony/Component/HttpClient/CHANGELOG.md | 5 + .../Component/HttpClient/Chunk/ErrorChunk.php | 6 +- .../HttpClient/Internal/HttplugWaitLoop.php | 4 +- .../Component/HttpClient/Psr18Client.php | 4 +- .../HttpClient/Response/AmpResponse.php | 3 +- .../HttpClient/Response/AsyncContext.php | 175 +++++++++ .../HttpClient/Response/AsyncResponse.php | 359 ++++++++++++++++++ .../Response/CommonResponseTrait.php | 194 ++++++++++ .../HttpClient/Response/CurlResponse.php | 3 +- .../HttpClient/Response/MockResponse.php | 3 +- .../HttpClient/Response/NativeResponse.php | 3 +- .../HttpClient/Response/StreamWrapper.php | 10 +- ...seTrait.php => TransportResponseTrait.php} | 175 +-------- .../Tests/AsyncDecoratorTraitTest.php | 166 ++++++++ .../Tests/Response/MockResponseTest.php | 2 +- 16 files changed, 979 insertions(+), 187 deletions(-) create mode 100644 src/Symfony/Component/HttpClient/AsyncDecoratorTrait.php create mode 100644 src/Symfony/Component/HttpClient/Response/AsyncContext.php create mode 100644 src/Symfony/Component/HttpClient/Response/AsyncResponse.php create mode 100644 src/Symfony/Component/HttpClient/Response/CommonResponseTrait.php rename src/Symfony/Component/HttpClient/Response/{ResponseTrait.php => TransportResponseTrait.php} (64%) create mode 100644 src/Symfony/Component/HttpClient/Tests/AsyncDecoratorTraitTest.php diff --git a/src/Symfony/Component/HttpClient/AsyncDecoratorTrait.php b/src/Symfony/Component/HttpClient/AsyncDecoratorTrait.php new file mode 100644 index 0000000000..c5d40a251d --- /dev/null +++ b/src/Symfony/Component/HttpClient/AsyncDecoratorTrait.php @@ -0,0 +1,54 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\HttpClient; + +use Symfony\Component\HttpClient\Response\AsyncResponse; +use Symfony\Component\HttpClient\Response\ResponseStream; +use Symfony\Contracts\HttpClient\HttpClientInterface; +use Symfony\Contracts\HttpClient\ResponseInterface; +use Symfony\Contracts\HttpClient\ResponseStreamInterface; + +/** + * Eases with processing responses while streaming them. + * + * @author Nicolas Grekas + */ +trait AsyncDecoratorTrait +{ + private $client; + + public function __construct(HttpClientInterface $client = null) + { + $this->client = $client ?? HttpClient::create(); + } + + /** + * {@inheritdoc} + * + * @return AsyncResponse + */ + abstract public function request(string $method, string $url, array $options = []): ResponseInterface; + + /** + * {@inheritdoc} + */ + public function stream($responses, float $timeout = null): ResponseStreamInterface + { + if ($responses instanceof AsyncResponse) { + $responses = [$responses]; + } elseif (!is_iterable($responses)) { + throw new \TypeError(sprintf('"%s()" expects parameter 1 to be an iterable of AsyncResponse objects, "%s" given.', __METHOD__, get_debug_type($responses))); + } + + return new ResponseStream(AsyncResponse::stream($responses, $timeout, static::class)); + } +} diff --git a/src/Symfony/Component/HttpClient/CHANGELOG.md b/src/Symfony/Component/HttpClient/CHANGELOG.md index 8b8bbdf8cd..e4812500c7 100644 --- a/src/Symfony/Component/HttpClient/CHANGELOG.md +++ b/src/Symfony/Component/HttpClient/CHANGELOG.md @@ -1,6 +1,11 @@ CHANGELOG ========= +5.2.0 +----- + + * added `AsyncDecoratorTrait` to ease processing responses without breaking async + 5.1.0 ----- diff --git a/src/Symfony/Component/HttpClient/Chunk/ErrorChunk.php b/src/Symfony/Component/HttpClient/Chunk/ErrorChunk.php index f91f2bdf52..7c36afa42a 100644 --- a/src/Symfony/Component/HttpClient/Chunk/ErrorChunk.php +++ b/src/Symfony/Component/HttpClient/Chunk/ErrorChunk.php @@ -111,8 +111,12 @@ class ErrorChunk implements ChunkInterface /** * @return bool Whether the wrapped error has been thrown or not */ - public function didThrow(): bool + public function didThrow(bool $didThrow = null): bool { + if (null !== $didThrow && $this->didThrow !== $didThrow) { + return !$this->didThrow = $didThrow; + } + return $this->didThrow; } diff --git a/src/Symfony/Component/HttpClient/Internal/HttplugWaitLoop.php b/src/Symfony/Component/HttpClient/Internal/HttplugWaitLoop.php index f17a4a7850..2feafdb0bd 100644 --- a/src/Symfony/Component/HttpClient/Internal/HttplugWaitLoop.php +++ b/src/Symfony/Component/HttpClient/Internal/HttplugWaitLoop.php @@ -15,7 +15,7 @@ use Http\Client\Exception\NetworkException; use Psr\Http\Message\ResponseFactoryInterface; use Psr\Http\Message\ResponseInterface as Psr7ResponseInterface; use Psr\Http\Message\StreamFactoryInterface; -use Symfony\Component\HttpClient\Response\ResponseTrait; +use Symfony\Component\HttpClient\Response\CommonResponseTrait; use Symfony\Component\HttpClient\Response\StreamWrapper; use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface; use Symfony\Contracts\HttpClient\HttpClientInterface; @@ -119,7 +119,7 @@ final class HttplugWaitLoop } } - if (isset(class_uses($response)[ResponseTrait::class])) { + if (isset(class_uses($response)[CommonResponseTrait::class])) { $body = $this->streamFactory->createStreamFromResource($response->toStream(false)); } elseif (!$buffer) { $body = $this->streamFactory->createStreamFromResource(StreamWrapper::createResource($response, $this->client)); diff --git a/src/Symfony/Component/HttpClient/Psr18Client.php b/src/Symfony/Component/HttpClient/Psr18Client.php index 67c2fdb8f0..2e952b06ed 100644 --- a/src/Symfony/Component/HttpClient/Psr18Client.php +++ b/src/Symfony/Component/HttpClient/Psr18Client.php @@ -27,7 +27,7 @@ use Psr\Http\Message\StreamFactoryInterface; use Psr\Http\Message\StreamInterface; use Psr\Http\Message\UriFactoryInterface; use Psr\Http\Message\UriInterface; -use Symfony\Component\HttpClient\Response\ResponseTrait; +use Symfony\Component\HttpClient\Response\CommonResponseTrait; use Symfony\Component\HttpClient\Response\StreamWrapper; use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface; use Symfony\Contracts\HttpClient\HttpClientInterface; @@ -104,7 +104,7 @@ final class Psr18Client implements ClientInterface, RequestFactoryInterface, Str } } - $body = isset(class_uses($response)[ResponseTrait::class]) ? $response->toStream(false) : StreamWrapper::createResource($response, $this->client); + $body = isset(class_uses($response)[CommonResponseTrait::class]) ? $response->toStream(false) : StreamWrapper::createResource($response, $this->client); $body = $this->streamFactory->createStreamFromResource($body); if ($body->isSeekable()) { diff --git a/src/Symfony/Component/HttpClient/Response/AmpResponse.php b/src/Symfony/Component/HttpClient/Response/AmpResponse.php index a406585b9d..aea245d77d 100644 --- a/src/Symfony/Component/HttpClient/Response/AmpResponse.php +++ b/src/Symfony/Component/HttpClient/Response/AmpResponse.php @@ -35,7 +35,8 @@ use Symfony\Contracts\HttpClient\ResponseInterface; */ final class AmpResponse implements ResponseInterface { - use ResponseTrait; + use CommonResponseTrait; + use TransportResponseTrait; private $multi; private $options; diff --git a/src/Symfony/Component/HttpClient/Response/AsyncContext.php b/src/Symfony/Component/HttpClient/Response/AsyncContext.php new file mode 100644 index 0000000000..b0138968ff --- /dev/null +++ b/src/Symfony/Component/HttpClient/Response/AsyncContext.php @@ -0,0 +1,175 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\HttpClient\Response; + +use Symfony\Component\HttpClient\Chunk\DataChunk; +use Symfony\Component\HttpClient\Chunk\LastChunk; +use Symfony\Contracts\HttpClient\ChunkInterface; +use Symfony\Contracts\HttpClient\HttpClientInterface; +use Symfony\Contracts\HttpClient\ResponseInterface; + +/** + * A DTO to work with AsyncResponse. + * + * @author Nicolas Grekas + */ +final class AsyncContext +{ + private $passthru; + private $client; + private $response; + private $info = []; + private $content; + private $offset; + + public function __construct(&$passthru, HttpClientInterface $client, ResponseInterface &$response, array &$info, $content, int $offset) + { + $this->passthru = &$passthru; + $this->client = $client; + $this->response = &$response; + $this->info = &$info; + $this->content = $content; + $this->offset = $offset; + } + + /** + * Returns the HTTP status without consuming the response. + */ + public function getStatusCode(): int + { + return $this->response->getInfo('http_code'); + } + + /** + * Returns the headers without consuming the response. + */ + public function getHeaders(): array + { + $headers = []; + + foreach ($this->response->getInfo('response_headers') as $h) { + if (11 <= \strlen($h) && '/' === $h[4] && preg_match('#^HTTP/\d+(?:\.\d+)? ([123456789]\d\d)(?: |$)#', $h, $m)) { + $headers = []; + } elseif (2 === \count($m = explode(':', $h, 2))) { + $headers[strtolower($m[0])][] = ltrim($m[1]); + } + } + + return $headers; + } + + /** + * @return resource|null The PHP stream resource where the content is buffered, if it is + */ + public function getContent() + { + return $this->content; + } + + /** + * Creates a new chunk of content. + */ + public function createChunk(string $data): ChunkInterface + { + return new DataChunk($this->offset, $data); + } + + /** + * Pauses the request for the given number of seconds. + */ + public function pause(float $duration): void + { + if (\is_callable($pause = $this->response->getInfo('pause_handler'))) { + $pause($duration); + } elseif (0 < $duration) { + usleep(1E6 * $duration); + } + } + + /** + * Cancels the request and returns the last chunk to yield. + */ + public function cancel(): ChunkInterface + { + $this->info['canceled'] = true; + $this->info['error'] = 'Response has been canceled.'; + $this->response->cancel(); + + return new LastChunk(); + } + + /** + * Returns the current info of the response. + */ + public function getInfo(string $type = null) + { + if (null !== $type) { + return $this->info[$type] ?? $this->response->getInfo($type); + } + + return $this->info + $this->response->getInfo(); + } + + /** + * Attaches an info to the response. + */ + public function setInfo(string $type, $value): self + { + if ('canceled' === $type && $value !== $this->info['canceled']) { + throw new \LogicException('You cannot set the "canceled" info directly.'); + } + + if (null === $value) { + unset($this->info[$type]); + } else { + $this->info[$type] = $value; + } + + return $this; + } + + /** + * Returns the currently processed response. + */ + public function getResponse(): ResponseInterface + { + return $this->response; + } + + /** + * Replaces the currently processed response by doing a new request. + */ + public function replaceRequest(string $method, string $url, array $options = []): ResponseInterface + { + $this->info['previous_info'][] = $this->response->getInfo(); + + return $this->response = $this->client->request($method, $url, ['buffer' => false] + $options); + } + + /** + * Replaces the currently processed response by another one. + */ + public function replaceResponse(ResponseInterface $response): ResponseInterface + { + $this->info['previous_info'][] = $this->response->getInfo(); + + return $this->response = $response; + } + + /** + * Replaces or removes the chunk filter iterator. + */ + public function passthru(callable $passthru = null): void + { + $this->passthru = $passthru; + } +} diff --git a/src/Symfony/Component/HttpClient/Response/AsyncResponse.php b/src/Symfony/Component/HttpClient/Response/AsyncResponse.php new file mode 100644 index 0000000000..a0001b5e45 --- /dev/null +++ b/src/Symfony/Component/HttpClient/Response/AsyncResponse.php @@ -0,0 +1,359 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\HttpClient\Response; + +use Symfony\Component\HttpClient\Chunk\ErrorChunk; +use Symfony\Component\HttpClient\Chunk\LastChunk; +use Symfony\Component\HttpClient\Exception\TransportException; +use Symfony\Contracts\HttpClient\ChunkInterface; +use Symfony\Contracts\HttpClient\Exception\ExceptionInterface; +use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface; +use Symfony\Contracts\HttpClient\HttpClientInterface; +use Symfony\Contracts\HttpClient\ResponseInterface; + +/** + * Provides a single extension point to process a response's content stream. + * + * @author Nicolas Grekas + */ +final class AsyncResponse implements ResponseInterface +{ + use CommonResponseTrait; + + private $client; + private $response; + private $info = ['canceled' => false]; + private $passthru; + + /** + * @param callable(ChunkInterface, AsyncContext): ?\Iterator $passthru + */ + public function __construct(HttpClientInterface $client, string $method, string $url, array $options, callable $passthru) + { + $this->client = $client; + $this->shouldBuffer = $options['buffer'] ?? true; + $this->response = $client->request($method, $url, ['buffer' => false] + $options); + $this->passthru = $passthru; + $this->initializer = static function (self $response) { + return null !== $response->shouldBuffer; + }; + if (\array_key_exists('user_data', $options)) { + $this->info['user_data'] = $options['user_data']; + } + } + + public function getStatusCode(): int + { + if ($this->initializer) { + self::initialize($this); + } + + return $this->response->getStatusCode(); + } + + public function getHeaders(bool $throw = true): array + { + if ($this->initializer) { + self::initialize($this); + } + + $headers = $this->response->getHeaders(false); + + if ($throw) { + $this->checkStatusCode($this->getInfo('http_code')); + } + + return $headers; + } + + public function getInfo(string $type = null) + { + if (null !== $type) { + return $this->info[$type] ?? $this->response->getInfo($type); + } + + return $this->info + $this->response->getInfo(); + } + + /** + * {@inheritdoc} + */ + public function toStream(bool $throw = true) + { + if ($throw) { + // Ensure headers arrived + $this->getHeaders(true); + } + + $handle = function () { + $stream = StreamWrapper::createResource($this->response); + + return stream_get_meta_data($stream)['wrapper_data']->stream_cast(STREAM_CAST_FOR_SELECT); + }; + + $stream = StreamWrapper::createResource($this); + stream_get_meta_data($stream)['wrapper_data'] + ->bindHandles($handle, $this->content); + + return $stream; + } + + /** + * {@inheritdoc} + */ + public function cancel(): void + { + if ($this->info['canceled']) { + return; + } + + $this->info['canceled'] = true; + $this->info['error'] = 'Response has been canceled.'; + $this->close(); + $client = $this->client; + $this->client = null; + + if (!$this->passthru) { + return; + } + + $context = new AsyncContext($this->passthru, $client, $this->response, $this->info, $this->content, $this->offset); + if (null === $stream = ($this->passthru)(new LastChunk(), $context)) { + return; + } + + if (!$stream instanceof \Iterator) { + throw new \LogicException(sprintf('A chunk passthru must return an "Iterator", "%s" returned.', get_debug_type($stream))); + } + + try { + foreach ($stream as $chunk) { + if ($chunk->isLast()) { + break; + } + } + + $stream->next(); + + if ($stream->valid()) { + throw new \LogicException('A chunk passthru cannot yield after the last chunk.'); + } + + $stream = $this->passthru = null; + } catch (ExceptionInterface $e) { + // ignore any errors when canceling + } + } + + /** + * @internal + */ + public static function stream(iterable $responses, float $timeout = null, string $class = null): \Generator + { + while ($responses) { + $wrappedResponses = []; + $asyncMap = new \SplObjectStorage(); + $client = null; + + foreach ($responses as $r) { + if (!$r instanceof self) { + throw new \TypeError(sprintf('"%s::stream()" expects parameter 1 to be an iterable of AsyncResponse objects, "%s" given.', $class ?? static::class, get_debug_type($r))); + } + + if (null !== $e = $r->info['error'] ?? null) { + yield $r => $chunk = new ErrorChunk($r->offset, new TransportException($e)); + $chunk->didThrow() ?: $chunk->getContent(); + continue; + } + + if (null === $client) { + $client = $r->client; + } elseif ($r->client !== $client) { + throw new TransportException('Cannot stream AsyncResponse objects with many clients.'); + } + + $asyncMap[$r->response] = $r; + $wrappedResponses[] = $r->response; + } + + if (!$client) { + return; + } + + foreach ($client->stream($wrappedResponses, $timeout) as $response => $chunk) { + $r = $asyncMap[$response]; + + if (!$r->passthru) { + if (null !== $chunk->getError() || $chunk->isLast()) { + unset($asyncMap[$response]); + } + + yield $r => $chunk; + continue; + } + + $context = new AsyncContext($r->passthru, $r->client, $r->response, $r->info, $r->content, $r->offset); + if (null === $stream = ($r->passthru)($chunk, $context)) { + if ($r->response === $response && (null !== $chunk->getError() || $chunk->isLast())) { + throw new \LogicException('A chunk passthru cannot swallow the last chunk.'); + } + + continue; + } + $chunk = null; + + if (!$stream instanceof \Iterator) { + throw new \LogicException(sprintf('A chunk passthru must return an "Iterator", "%s" returned.', get_debug_type($stream))); + } + + while (true) { + try { + if (null !== $chunk) { + $stream->next(); + } + + if (!$stream->valid()) { + break; + } + } catch (\Throwable $e) { + $r->info['error'] = $e->getMessage(); + $r->response->cancel(); + + yield $r => $chunk = new ErrorChunk($r->offset, $e); + $chunk->didThrow() ?: $chunk->getContent(); + unset($asyncMap[$response]); + break; + } + + $chunk = $stream->current(); + + if (!$chunk instanceof ChunkInterface) { + throw new \LogicException(sprintf('A chunk passthru must yield instances of "%s", "%s" yielded.', ChunkInterface::class, get_debug_type($chunk))); + } + + if (null !== $chunk->getError()) { + // no-op + } elseif ($chunk->isFirst()) { + $e = $r->openBuffer(); + + yield $r => $chunk; + + if (null === $e) { + continue; + } + + $r->response->cancel(); + $chunk = new ErrorChunk($r->offset, $e); + } elseif ('' !== $content = $chunk->getContent()) { + if (null !== $r->shouldBuffer) { + throw new \LogicException('A chunk passthru must yield an "isFirst()" chunk before any content chunk.'); + } + + if (null !== $r->content && \strlen($content) !== fwrite($r->content, $content)) { + $chunk = new ErrorChunk($r->offset, new TransportException(sprintf('Failed writing %d bytes to the response buffer.', \strlen($content)))); + $r->info['error'] = $chunk->getError(); + $r->response->cancel(); + } + } + + if (null === $chunk->getError()) { + $r->offset += \strlen($content); + + yield $r => $chunk; + + if (!$chunk->isLast()) { + continue; + } + + $stream->next(); + + if ($stream->valid()) { + throw new \LogicException('A chunk passthru cannot yield after an "isLast()" chunk.'); + } + + $r->passthru = null; + } else { + if ($chunk instanceof ErrorChunk) { + $chunk->didThrow(false); + } else { + try { + $chunk = new ErrorChunk($chunk->getOffset(), !$chunk->isTimeout() ?: $chunk->getError()); + } catch (TransportExceptionInterface $e) { + $chunk = new ErrorChunk($chunk->getOffset(), $e); + } + } + + yield $r => $chunk; + $chunk->didThrow() ?: $chunk->getContent(); + } + + unset($asyncMap[$response]); + break; + } + + $stream = $context = null; + + if ($r->response !== $response && isset($asyncMap[$response])) { + break; + } + } + + if (null === $chunk->getError() && !$chunk->isLast() && $r->response === $response && null !== $r->client) { + throw new \LogicException('A chunk passthru must yield an "isLast()" chunk before ending a stream.'); + } + + $responses = []; + foreach ($asyncMap as $response) { + $r = $asyncMap[$response]; + + if (null !== $r->client) { + $responses[] = $asyncMap[$response]; + } + } + } + } + + private function openBuffer(): ?\Throwable + { + if (null === $shouldBuffer = $this->shouldBuffer) { + throw new \LogicException('A chunk passthru cannot yield more than one "isFirst()" chunk.'); + } + + $e = $this->shouldBuffer = null; + + if ($shouldBuffer instanceof \Closure) { + try { + $shouldBuffer = $shouldBuffer($this->getHeaders(false)); + + if (null !== $e = $this->response->getInfo('error')) { + throw new TransportException($e); + } + } catch (\Throwable $e) { + $this->info['error'] = $e->getMessage(); + $this->response->cancel(); + } + } + + if (true === $shouldBuffer) { + $this->content = fopen('php://temp', 'w+'); + } elseif (\is_resource($shouldBuffer)) { + $this->content = $shouldBuffer; + } + + return $e; + } + + private function close(): void + { + $this->response->cancel(); + } +} diff --git a/src/Symfony/Component/HttpClient/Response/CommonResponseTrait.php b/src/Symfony/Component/HttpClient/Response/CommonResponseTrait.php new file mode 100644 index 0000000000..4b610b015b --- /dev/null +++ b/src/Symfony/Component/HttpClient/Response/CommonResponseTrait.php @@ -0,0 +1,194 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\HttpClient\Response; + +use Symfony\Component\HttpClient\Exception\ClientException; +use Symfony\Component\HttpClient\Exception\JsonException; +use Symfony\Component\HttpClient\Exception\RedirectionException; +use Symfony\Component\HttpClient\Exception\ServerException; +use Symfony\Component\HttpClient\Exception\TransportException; +use Symfony\Contracts\HttpClient\Exception\ClientExceptionInterface; +use Symfony\Contracts\HttpClient\Exception\RedirectionExceptionInterface; +use Symfony\Contracts\HttpClient\Exception\ServerExceptionInterface; +use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface; + +/** + * Implements common logic for response classes. + * + * @author Nicolas Grekas + * + * @internal + */ +trait CommonResponseTrait +{ + /** + * @var callable|null A callback that tells whether we're waiting for response headers + */ + private $initializer; + private $shouldBuffer; + private $content; + private $offset = 0; + private $jsonData; + + /** + * {@inheritdoc} + */ + public function getContent(bool $throw = true): string + { + if ($this->initializer) { + self::initialize($this); + } + + if ($throw) { + $this->checkStatusCode(); + } + + if (null === $this->content) { + $content = null; + + foreach (self::stream([$this]) as $chunk) { + if (!$chunk->isLast()) { + $content .= $chunk->getContent(); + } + } + + if (null !== $content) { + return $content; + } + + if ('HEAD' === $this->getInfo('http_method') || \in_array($this->getInfo('http_code'), [204, 304], true)) { + return ''; + } + + throw new TransportException('Cannot get the content of the response twice: buffering is disabled.'); + } + + foreach (self::stream([$this]) as $chunk) { + // Chunks are buffered in $this->content already + } + + rewind($this->content); + + return stream_get_contents($this->content); + } + + /** + * {@inheritdoc} + */ + public function toArray(bool $throw = true): array + { + if ('' === $content = $this->getContent($throw)) { + throw new JsonException('Response body is empty.'); + } + + if (null !== $this->jsonData) { + return $this->jsonData; + } + + $contentType = $this->headers['content-type'][0] ?? 'application/json'; + + if (!preg_match('/\bjson\b/i', $contentType)) { + throw new JsonException(sprintf('Response content-type is "%s" while a JSON-compatible one was expected for "%s".', $contentType, $this->getInfo('url'))); + } + + try { + $content = json_decode($content, true, 512, JSON_BIGINT_AS_STRING | (\PHP_VERSION_ID >= 70300 ? JSON_THROW_ON_ERROR : 0)); + } catch (\JsonException $e) { + throw new JsonException($e->getMessage().sprintf(' for "%s".', $this->getInfo('url')), $e->getCode()); + } + + if (\PHP_VERSION_ID < 70300 && JSON_ERROR_NONE !== json_last_error()) { + throw new JsonException(json_last_error_msg().sprintf(' for "%s".', $this->getInfo('url')), json_last_error()); + } + + if (!\is_array($content)) { + throw new JsonException(sprintf('JSON content was expected to decode to an array, "%s" returned for "%s".', get_debug_type($content), $this->getInfo('url'))); + } + + if (null !== $this->content) { + // Option "buffer" is true + return $this->jsonData = $content; + } + + return $content; + } + + /** + * Casts the response to a PHP stream resource. + * + * @return resource + * + * @throws TransportExceptionInterface When a network error occurs + * @throws RedirectionExceptionInterface On a 3xx when $throw is true and the "max_redirects" option has been reached + * @throws ClientExceptionInterface On a 4xx when $throw is true + * @throws ServerExceptionInterface On a 5xx when $throw is true + */ + public function toStream(bool $throw = true) + { + if ($throw) { + // Ensure headers arrived + $this->getHeaders($throw); + } + + $stream = StreamWrapper::createResource($this); + stream_get_meta_data($stream)['wrapper_data'] + ->bindHandles($this->handle, $this->content); + + return $stream; + } + + /** + * Closes the response and all its network handles. + */ + abstract protected function close(): void; + + private static function initialize(self $response): void + { + if (null !== $response->getInfo('error')) { + throw new TransportException($response->getInfo('error')); + } + + try { + if (($response->initializer)($response)) { + foreach (self::stream([$response]) as $chunk) { + if ($chunk->isFirst()) { + break; + } + } + } + } catch (\Throwable $e) { + // Persist timeouts thrown during initialization + $response->info['error'] = $e->getMessage(); + $response->close(); + throw $e; + } + + $response->initializer = null; + } + + private function checkStatusCode() + { + $code = $this->getInfo('http_code'); + + if (500 <= $code) { + throw new ServerException($this); + } + + if (400 <= $code) { + throw new ClientException($this); + } + + if (300 <= $code) { + throw new RedirectionException($this); + } + } +} diff --git a/src/Symfony/Component/HttpClient/Response/CurlResponse.php b/src/Symfony/Component/HttpClient/Response/CurlResponse.php index e7de360a8e..ff273de995 100644 --- a/src/Symfony/Component/HttpClient/Response/CurlResponse.php +++ b/src/Symfony/Component/HttpClient/Response/CurlResponse.php @@ -27,9 +27,10 @@ use Symfony\Contracts\HttpClient\ResponseInterface; */ final class CurlResponse implements ResponseInterface { - use ResponseTrait { + use CommonResponseTrait { getContent as private doGetContent; } + use TransportResponseTrait; private static $performing = false; private $multi; diff --git a/src/Symfony/Component/HttpClient/Response/MockResponse.php b/src/Symfony/Component/HttpClient/Response/MockResponse.php index e2ae37ece1..969185dd59 100644 --- a/src/Symfony/Component/HttpClient/Response/MockResponse.php +++ b/src/Symfony/Component/HttpClient/Response/MockResponse.php @@ -25,7 +25,8 @@ use Symfony\Contracts\HttpClient\ResponseInterface; */ class MockResponse implements ResponseInterface { - use ResponseTrait { + use CommonResponseTrait; + use TransportResponseTrait { doDestruct as public __destruct; } diff --git a/src/Symfony/Component/HttpClient/Response/NativeResponse.php b/src/Symfony/Component/HttpClient/Response/NativeResponse.php index 2ba1b010e4..8c98a2ffbb 100644 --- a/src/Symfony/Component/HttpClient/Response/NativeResponse.php +++ b/src/Symfony/Component/HttpClient/Response/NativeResponse.php @@ -26,7 +26,8 @@ use Symfony\Contracts\HttpClient\ResponseInterface; */ final class NativeResponse implements ResponseInterface { - use ResponseTrait; + use CommonResponseTrait; + use TransportResponseTrait; private $context; private $url; diff --git a/src/Symfony/Component/HttpClient/Response/StreamWrapper.php b/src/Symfony/Component/HttpClient/Response/StreamWrapper.php index 0755c22876..210fcf6c4d 100644 --- a/src/Symfony/Component/HttpClient/Response/StreamWrapper.php +++ b/src/Symfony/Component/HttpClient/Response/StreamWrapper.php @@ -49,7 +49,7 @@ class StreamWrapper */ public static function createResource(ResponseInterface $response, HttpClientInterface $client = null) { - if (\is_callable([$response, 'toStream']) && isset(class_uses($response)[ResponseTrait::class])) { + if (\is_callable([$response, 'toStream']) && isset(class_uses($response)[CommonResponseTrait::class])) { $stack = debug_backtrace(DEBUG_BACKTRACE_PROVIDE_OBJECT | DEBUG_BACKTRACE_IGNORE_ARGS, 2); if ($response !== ($stack[1]['object'] ?? null)) { @@ -83,9 +83,9 @@ class StreamWrapper } /** - * @param resource|null $handle The resource handle that should be monitored when - * stream_select() is used on the created stream - * @param resource|null $content The seekable resource where the response body is buffered + * @param resource|callable|null $handle The resource handle that should be monitored when + * stream_select() is used on the created stream + * @param resource|null $content The seekable resource where the response body is buffered */ public function bindHandles(&$handle, &$content): void { @@ -266,7 +266,7 @@ class StreamWrapper if (STREAM_CAST_FOR_SELECT === $castAs) { $this->response->getHeaders(false); - return $this->handle ?? false; + return (\is_callable($this->handle) ? ($this->handle)() : $this->handle) ?? false; } return false; diff --git a/src/Symfony/Component/HttpClient/Response/ResponseTrait.php b/src/Symfony/Component/HttpClient/Response/TransportResponseTrait.php similarity index 64% rename from src/Symfony/Component/HttpClient/Response/ResponseTrait.php rename to src/Symfony/Component/HttpClient/Response/TransportResponseTrait.php index b0fa7eceb0..9e3faf1b55 100644 --- a/src/Symfony/Component/HttpClient/Response/ResponseTrait.php +++ b/src/Symfony/Component/HttpClient/Response/TransportResponseTrait.php @@ -15,34 +15,19 @@ use Symfony\Component\HttpClient\Chunk\DataChunk; use Symfony\Component\HttpClient\Chunk\ErrorChunk; use Symfony\Component\HttpClient\Chunk\FirstChunk; use Symfony\Component\HttpClient\Chunk\LastChunk; -use Symfony\Component\HttpClient\Exception\ClientException; -use Symfony\Component\HttpClient\Exception\JsonException; -use Symfony\Component\HttpClient\Exception\RedirectionException; -use Symfony\Component\HttpClient\Exception\ServerException; use Symfony\Component\HttpClient\Exception\TransportException; use Symfony\Component\HttpClient\Internal\ClientState; -use Symfony\Contracts\HttpClient\Exception\ClientExceptionInterface; -use Symfony\Contracts\HttpClient\Exception\RedirectionExceptionInterface; -use Symfony\Contracts\HttpClient\Exception\ServerExceptionInterface; -use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface; /** - * Implements the common logic for response classes. + * Implements common logic for transport-level response classes. * * @author Nicolas Grekas * * @internal */ -trait ResponseTrait +trait TransportResponseTrait { - private $logger; private $headers = []; - - /** - * @var callable|null A callback that initializes the two previous properties - */ - private $initializer; - private $info = [ 'response_headers' => [], 'http_code' => 0, @@ -55,11 +40,8 @@ trait ResponseTrait private $id; private $timeout = 0; private $inflate; - private $shouldBuffer; - private $content; private $finalInfo; - private $offset = 0; - private $jsonData; + private $logger; /** * {@inheritdoc} @@ -89,89 +71,6 @@ trait ResponseTrait return $this->headers; } - /** - * {@inheritdoc} - */ - public function getContent(bool $throw = true): string - { - if ($this->initializer) { - self::initialize($this); - } - - if ($throw) { - $this->checkStatusCode(); - } - - if (null === $this->content) { - $content = null; - - foreach (self::stream([$this]) as $chunk) { - if (!$chunk->isLast()) { - $content .= $chunk->getContent(); - } - } - - if (null !== $content) { - return $content; - } - - if ('HEAD' === $this->info['http_method'] || \in_array($this->info['http_code'], [204, 304], true)) { - return ''; - } - - throw new TransportException('Cannot get the content of the response twice: buffering is disabled.'); - } - - foreach (self::stream([$this]) as $chunk) { - // Chunks are buffered in $this->content already - } - - rewind($this->content); - - return stream_get_contents($this->content); - } - - /** - * {@inheritdoc} - */ - public function toArray(bool $throw = true): array - { - if ('' === $content = $this->getContent($throw)) { - throw new JsonException('Response body is empty.'); - } - - if (null !== $this->jsonData) { - return $this->jsonData; - } - - $contentType = $this->headers['content-type'][0] ?? 'application/json'; - - if (!preg_match('/\bjson\b/i', $contentType)) { - throw new JsonException(sprintf('Response content-type is "%s" while a JSON-compatible one was expected for "%s".', $contentType, $this->getInfo('url'))); - } - - try { - $content = json_decode($content, true, 512, JSON_BIGINT_AS_STRING | (\PHP_VERSION_ID >= 70300 ? JSON_THROW_ON_ERROR : 0)); - } catch (\JsonException $e) { - throw new JsonException($e->getMessage().sprintf(' for "%s".', $this->getInfo('url')), $e->getCode()); - } - - if (\PHP_VERSION_ID < 70300 && JSON_ERROR_NONE !== json_last_error()) { - throw new JsonException(json_last_error_msg().sprintf(' for "%s".', $this->getInfo('url')), json_last_error()); - } - - if (!\is_array($content)) { - throw new JsonException(sprintf('JSON content was expected to decode to an array, "%s" returned for "%s".', get_debug_type($content), $this->getInfo('url'))); - } - - if (null !== $this->content) { - // Option "buffer" is true - return $this->jsonData = $content; - } - - return $content; - } - /** * {@inheritdoc} */ @@ -182,35 +81,6 @@ trait ResponseTrait $this->close(); } - /** - * Casts the response to a PHP stream resource. - * - * @return resource - * - * @throws TransportExceptionInterface When a network error occurs - * @throws RedirectionExceptionInterface On a 3xx when $throw is true and the "max_redirects" option has been reached - * @throws ClientExceptionInterface On a 4xx when $throw is true - * @throws ServerExceptionInterface On a 5xx when $throw is true - */ - public function toStream(bool $throw = true) - { - if ($throw) { - // Ensure headers arrived - $this->getHeaders($throw); - } - - $stream = StreamWrapper::createResource($this); - stream_get_meta_data($stream)['wrapper_data'] - ->bindHandles($this->handle, $this->content); - - return $stream; - } - - /** - * Closes the response and all its network handles. - */ - abstract protected function close(): void; - /** * Adds pending responses to the activity list. */ @@ -226,30 +96,6 @@ trait ResponseTrait */ abstract protected static function select(ClientState $multi, float $timeout): int; - private static function initialize(self $response): void - { - if (null !== $response->info['error']) { - throw new TransportException($response->info['error']); - } - - try { - if (($response->initializer)($response)) { - foreach (self::stream([$response]) as $chunk) { - if ($chunk->isFirst()) { - break; - } - } - } - } catch (\Throwable $e) { - // Persist timeouts thrown during initialization - $response->info['error'] = $e->getMessage(); - $response->close(); - throw $e; - } - - $response->initializer = null; - } - private static function addResponseHeaders(array $responseHeaders, array &$info, array &$headers, string &$debug = ''): void { foreach ($responseHeaders as $h) { @@ -274,21 +120,6 @@ trait ResponseTrait } } - private function checkStatusCode() - { - if (500 <= $this->info['http_code']) { - throw new ServerException($this); - } - - if (400 <= $this->info['http_code']) { - throw new ClientException($this); - } - - if (300 <= $this->info['http_code']) { - throw new RedirectionException($this); - } - } - /** * Ensures the request is always sent and that the response code was checked. */ diff --git a/src/Symfony/Component/HttpClient/Tests/AsyncDecoratorTraitTest.php b/src/Symfony/Component/HttpClient/Tests/AsyncDecoratorTraitTest.php new file mode 100644 index 0000000000..874a5505b3 --- /dev/null +++ b/src/Symfony/Component/HttpClient/Tests/AsyncDecoratorTraitTest.php @@ -0,0 +1,166 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\HttpClient\Tests; + +use Symfony\Component\HttpClient\AsyncDecoratorTrait; +use Symfony\Component\HttpClient\Response\AsyncContext; +use Symfony\Component\HttpClient\Response\AsyncResponse; +use Symfony\Contracts\HttpClient\ChunkInterface; +use Symfony\Contracts\HttpClient\HttpClientInterface; +use Symfony\Contracts\HttpClient\ResponseInterface; + +class AsyncDecoratorTraitTest extends NativeHttpClientTest +{ + protected function getHttpClient(string $testCase, \Closure $chunkFilter = null): HttpClientInterface + { + $chunkFilter = $chunkFilter ?? static function (ChunkInterface $chunk, AsyncContext $context) { yield $chunk; }; + + return new class(parent::getHttpClient($testCase), $chunkFilter) implements HttpClientInterface { + use AsyncDecoratorTrait; + + private $chunkFilter; + + public function __construct(HttpClientInterface $client, \Closure $chunkFilter = null) + { + $this->chunkFilter = $chunkFilter; + $this->client = $client; + } + + public function request(string $method, string $url, array $options = []): ResponseInterface + { + return new AsyncResponse($this->client, $method, $url, $options, $this->chunkFilter); + } + }; + } + + public function testRetry404() + { + $client = $this->getHttpClient(__FUNCTION__, function (ChunkInterface $chunk, AsyncContext $context) { + $this->assertTrue($chunk->isFirst()); + $this->assertSame(404, $context->getStatusCode()); + $context->getResponse()->cancel(); + $context->replaceRequest('GET', 'http://localhost:8057/'); + $context->passthru(); + }); + + $response = $client->request('GET', 'http://localhost:8057/404'); + + foreach ($client->stream($response) as $chunk) { + } + $this->assertTrue($chunk->isLast()); + $this->assertSame(200, $response->getStatusCode()); + } + + public function testRetryTransportError() + { + $client = $this->getHttpClient(__FUNCTION__, function (ChunkInterface $chunk, AsyncContext $context) { + try { + if ($chunk->isFirst()) { + $this->assertSame(200, $context->getStatusCode()); + } + + yield $chunk; + } catch (TransportExceptionInterface $e) { + $context->getResponse()->cancel(); + $context->replaceRequest('GET', 'http://localhost:8057/'); + } + }); + + $response = $client->request('GET', 'http://localhost:8057/chunked-broken'); + + $this->assertSame(200, $response->getStatusCode()); + } + + public function testJsonTransclusion() + { + $client = $this->getHttpClient(__FUNCTION__, function (ChunkInterface $chunk, AsyncContext $context) { + if ('' === $content = $chunk->getContent()) { + yield $chunk; + + return; + } + + $this->assertSame('{"documents":[{"id":"\/json\/1"},{"id":"\/json\/2"},{"id":"\/json\/3"}]}', $content); + + $steps = preg_split('{\{"id":"\\\/json\\\/(\d)"\}}', $content, -1, PREG_SPLIT_DELIM_CAPTURE); + $steps[7] = $context->getResponse(); + $steps[1] = $context->replaceRequest('GET', 'http://localhost:8057/json/1'); + $steps[3] = $context->replaceRequest('GET', 'http://localhost:8057/json/2'); + $steps[5] = $context->replaceRequest('GET', 'http://localhost:8057/json/3'); + + yield $context->createChunk(array_shift($steps)); + + $context->replaceResponse(array_shift($steps)); + $context->passthru(static function (ChunkInterface $chunk, AsyncContext $context) use (&$steps) { + if ($chunk->isFirst()) { + return; + } + + if ($steps && $chunk->isLast()) { + $chunk = $context->createChunk(array_shift($steps)); + $context->replaceResponse(array_shift($steps)); + } + + yield $chunk; + }); + }); + + $response = $client->request('GET', 'http://localhost:8057/json'); + + $this->assertSame('{"documents":[{"title":"\/json\/1"},{"title":"\/json\/2"},{"title":"\/json\/3"}]}', $response->getContent()); + } + + public function testPreflightRequest() + { + $client = new class(parent::getHttpClient(__FUNCTION__)) implements HttpClientInterface { + use AsyncDecoratorTrait; + + public function request(string $method, string $url, array $options = []): ResponseInterface + { + $chunkFilter = static function (ChunkInterface $chunk, AsyncContext $context) use ($method, $url, $options) { + $context->replaceRequest($method, $url, $options); + $context->passthru(); + }; + + return new AsyncResponse($this->client, 'GET', 'http://localhost:8057', $options, $chunkFilter); + } + }; + + $response = $client->request('GET', 'http://localhost:8057/json'); + + $this->assertSame('{"documents":[{"id":"\/json\/1"},{"id":"\/json\/2"},{"id":"\/json\/3"}]}', $response->getContent()); + $this->assertSame('http://localhost:8057/', $response->getInfo('previous_info')[0]['url']); + } + + public function testProcessingHappensOnce() + { + $lastChunks = 0; + $client = $this->getHttpClient(__FUNCTION__, function (ChunkInterface $chunk, AsyncContext $context) use (&$lastChunks) { + $lastChunks += $chunk->isLast(); + + yield $chunk; + }); + + $response = $client->request('GET', 'http://localhost:8057/'); + + foreach ($client->stream($response) as $chunk) { + } + $this->assertTrue($chunk->isLast()); + $this->assertSame(1, $lastChunks); + + $chunk = null; + foreach ($client->stream($response) as $chunk) { + } + $this->assertTrue($chunk->isLast()); + $this->assertSame(1, $lastChunks); + } +} diff --git a/src/Symfony/Component/HttpClient/Tests/Response/MockResponseTest.php b/src/Symfony/Component/HttpClient/Tests/Response/MockResponseTest.php index b1cb16c1d9..dcc256e960 100644 --- a/src/Symfony/Component/HttpClient/Tests/Response/MockResponseTest.php +++ b/src/Symfony/Component/HttpClient/Tests/Response/MockResponseTest.php @@ -7,7 +7,7 @@ use Symfony\Component\HttpClient\Exception\JsonException; use Symfony\Component\HttpClient\Response\MockResponse; /** - * Test methods from Symfony\Component\HttpClient\Response\ResponseTrait. + * Test methods from Symfony\Component\HttpClient\Response\*ResponseTrait. */ class MockResponseTest extends TestCase {