[HttpClient] add AsyncDecoratorTrait to ease processing responses without breaking async

This commit is contained in:
Nicolas Grekas 2020-05-10 09:15:36 +02:00
parent bf53b26662
commit 766a1c6287
16 changed files with 979 additions and 187 deletions

View File

@ -0,0 +1,54 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\HttpClient;
use Symfony\Component\HttpClient\Response\AsyncResponse;
use Symfony\Component\HttpClient\Response\ResponseStream;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface;
use Symfony\Contracts\HttpClient\ResponseStreamInterface;
/**
* Eases with processing responses while streaming them.
*
* @author Nicolas Grekas <p@tchwork.com>
*/
trait AsyncDecoratorTrait
{
private $client;
public function __construct(HttpClientInterface $client = null)
{
$this->client = $client ?? HttpClient::create();
}
/**
* {@inheritdoc}
*
* @return AsyncResponse
*/
abstract public function request(string $method, string $url, array $options = []): ResponseInterface;
/**
* {@inheritdoc}
*/
public function stream($responses, float $timeout = null): ResponseStreamInterface
{
if ($responses instanceof AsyncResponse) {
$responses = [$responses];
} elseif (!is_iterable($responses)) {
throw new \TypeError(sprintf('"%s()" expects parameter 1 to be an iterable of AsyncResponse objects, "%s" given.', __METHOD__, get_debug_type($responses)));
}
return new ResponseStream(AsyncResponse::stream($responses, $timeout, static::class));
}
}

View File

@ -1,6 +1,11 @@
CHANGELOG
=========
5.2.0
-----
* added `AsyncDecoratorTrait` to ease processing responses without breaking async
5.1.0
-----

View File

@ -111,8 +111,12 @@ class ErrorChunk implements ChunkInterface
/**
* @return bool Whether the wrapped error has been thrown or not
*/
public function didThrow(): bool
public function didThrow(bool $didThrow = null): bool
{
if (null !== $didThrow && $this->didThrow !== $didThrow) {
return !$this->didThrow = $didThrow;
}
return $this->didThrow;
}

View File

@ -15,7 +15,7 @@ use Http\Client\Exception\NetworkException;
use Psr\Http\Message\ResponseFactoryInterface;
use Psr\Http\Message\ResponseInterface as Psr7ResponseInterface;
use Psr\Http\Message\StreamFactoryInterface;
use Symfony\Component\HttpClient\Response\ResponseTrait;
use Symfony\Component\HttpClient\Response\CommonResponseTrait;
use Symfony\Component\HttpClient\Response\StreamWrapper;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface;
@ -119,7 +119,7 @@ final class HttplugWaitLoop
}
}
if (isset(class_uses($response)[ResponseTrait::class])) {
if (isset(class_uses($response)[CommonResponseTrait::class])) {
$body = $this->streamFactory->createStreamFromResource($response->toStream(false));
} elseif (!$buffer) {
$body = $this->streamFactory->createStreamFromResource(StreamWrapper::createResource($response, $this->client));

View File

@ -27,7 +27,7 @@ use Psr\Http\Message\StreamFactoryInterface;
use Psr\Http\Message\StreamInterface;
use Psr\Http\Message\UriFactoryInterface;
use Psr\Http\Message\UriInterface;
use Symfony\Component\HttpClient\Response\ResponseTrait;
use Symfony\Component\HttpClient\Response\CommonResponseTrait;
use Symfony\Component\HttpClient\Response\StreamWrapper;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface;
@ -104,7 +104,7 @@ final class Psr18Client implements ClientInterface, RequestFactoryInterface, Str
}
}
$body = isset(class_uses($response)[ResponseTrait::class]) ? $response->toStream(false) : StreamWrapper::createResource($response, $this->client);
$body = isset(class_uses($response)[CommonResponseTrait::class]) ? $response->toStream(false) : StreamWrapper::createResource($response, $this->client);
$body = $this->streamFactory->createStreamFromResource($body);
if ($body->isSeekable()) {

View File

@ -35,7 +35,8 @@ use Symfony\Contracts\HttpClient\ResponseInterface;
*/
final class AmpResponse implements ResponseInterface
{
use ResponseTrait;
use CommonResponseTrait;
use TransportResponseTrait;
private $multi;
private $options;

View File

@ -0,0 +1,175 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\HttpClient\Response;
use Symfony\Component\HttpClient\Chunk\DataChunk;
use Symfony\Component\HttpClient\Chunk\LastChunk;
use Symfony\Contracts\HttpClient\ChunkInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface;
/**
* A DTO to work with AsyncResponse.
*
* @author Nicolas Grekas <p@tchwork.com>
*/
final class AsyncContext
{
private $passthru;
private $client;
private $response;
private $info = [];
private $content;
private $offset;
public function __construct(&$passthru, HttpClientInterface $client, ResponseInterface &$response, array &$info, $content, int $offset)
{
$this->passthru = &$passthru;
$this->client = $client;
$this->response = &$response;
$this->info = &$info;
$this->content = $content;
$this->offset = $offset;
}
/**
* Returns the HTTP status without consuming the response.
*/
public function getStatusCode(): int
{
return $this->response->getInfo('http_code');
}
/**
* Returns the headers without consuming the response.
*/
public function getHeaders(): array
{
$headers = [];
foreach ($this->response->getInfo('response_headers') as $h) {
if (11 <= \strlen($h) && '/' === $h[4] && preg_match('#^HTTP/\d+(?:\.\d+)? ([123456789]\d\d)(?: |$)#', $h, $m)) {
$headers = [];
} elseif (2 === \count($m = explode(':', $h, 2))) {
$headers[strtolower($m[0])][] = ltrim($m[1]);
}
}
return $headers;
}
/**
* @return resource|null The PHP stream resource where the content is buffered, if it is
*/
public function getContent()
{
return $this->content;
}
/**
* Creates a new chunk of content.
*/
public function createChunk(string $data): ChunkInterface
{
return new DataChunk($this->offset, $data);
}
/**
* Pauses the request for the given number of seconds.
*/
public function pause(float $duration): void
{
if (\is_callable($pause = $this->response->getInfo('pause_handler'))) {
$pause($duration);
} elseif (0 < $duration) {
usleep(1E6 * $duration);
}
}
/**
* Cancels the request and returns the last chunk to yield.
*/
public function cancel(): ChunkInterface
{
$this->info['canceled'] = true;
$this->info['error'] = 'Response has been canceled.';
$this->response->cancel();
return new LastChunk();
}
/**
* Returns the current info of the response.
*/
public function getInfo(string $type = null)
{
if (null !== $type) {
return $this->info[$type] ?? $this->response->getInfo($type);
}
return $this->info + $this->response->getInfo();
}
/**
* Attaches an info to the response.
*/
public function setInfo(string $type, $value): self
{
if ('canceled' === $type && $value !== $this->info['canceled']) {
throw new \LogicException('You cannot set the "canceled" info directly.');
}
if (null === $value) {
unset($this->info[$type]);
} else {
$this->info[$type] = $value;
}
return $this;
}
/**
* Returns the currently processed response.
*/
public function getResponse(): ResponseInterface
{
return $this->response;
}
/**
* Replaces the currently processed response by doing a new request.
*/
public function replaceRequest(string $method, string $url, array $options = []): ResponseInterface
{
$this->info['previous_info'][] = $this->response->getInfo();
return $this->response = $this->client->request($method, $url, ['buffer' => false] + $options);
}
/**
* Replaces the currently processed response by another one.
*/
public function replaceResponse(ResponseInterface $response): ResponseInterface
{
$this->info['previous_info'][] = $this->response->getInfo();
return $this->response = $response;
}
/**
* Replaces or removes the chunk filter iterator.
*/
public function passthru(callable $passthru = null): void
{
$this->passthru = $passthru;
}
}

View File

@ -0,0 +1,359 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\HttpClient\Response;
use Symfony\Component\HttpClient\Chunk\ErrorChunk;
use Symfony\Component\HttpClient\Chunk\LastChunk;
use Symfony\Component\HttpClient\Exception\TransportException;
use Symfony\Contracts\HttpClient\ChunkInterface;
use Symfony\Contracts\HttpClient\Exception\ExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface;
/**
* Provides a single extension point to process a response's content stream.
*
* @author Nicolas Grekas <p@tchwork.com>
*/
final class AsyncResponse implements ResponseInterface
{
use CommonResponseTrait;
private $client;
private $response;
private $info = ['canceled' => false];
private $passthru;
/**
* @param callable(ChunkInterface, AsyncContext): ?\Iterator $passthru
*/
public function __construct(HttpClientInterface $client, string $method, string $url, array $options, callable $passthru)
{
$this->client = $client;
$this->shouldBuffer = $options['buffer'] ?? true;
$this->response = $client->request($method, $url, ['buffer' => false] + $options);
$this->passthru = $passthru;
$this->initializer = static function (self $response) {
return null !== $response->shouldBuffer;
};
if (\array_key_exists('user_data', $options)) {
$this->info['user_data'] = $options['user_data'];
}
}
public function getStatusCode(): int
{
if ($this->initializer) {
self::initialize($this);
}
return $this->response->getStatusCode();
}
public function getHeaders(bool $throw = true): array
{
if ($this->initializer) {
self::initialize($this);
}
$headers = $this->response->getHeaders(false);
if ($throw) {
$this->checkStatusCode($this->getInfo('http_code'));
}
return $headers;
}
public function getInfo(string $type = null)
{
if (null !== $type) {
return $this->info[$type] ?? $this->response->getInfo($type);
}
return $this->info + $this->response->getInfo();
}
/**
* {@inheritdoc}
*/
public function toStream(bool $throw = true)
{
if ($throw) {
// Ensure headers arrived
$this->getHeaders(true);
}
$handle = function () {
$stream = StreamWrapper::createResource($this->response);
return stream_get_meta_data($stream)['wrapper_data']->stream_cast(STREAM_CAST_FOR_SELECT);
};
$stream = StreamWrapper::createResource($this);
stream_get_meta_data($stream)['wrapper_data']
->bindHandles($handle, $this->content);
return $stream;
}
/**
* {@inheritdoc}
*/
public function cancel(): void
{
if ($this->info['canceled']) {
return;
}
$this->info['canceled'] = true;
$this->info['error'] = 'Response has been canceled.';
$this->close();
$client = $this->client;
$this->client = null;
if (!$this->passthru) {
return;
}
$context = new AsyncContext($this->passthru, $client, $this->response, $this->info, $this->content, $this->offset);
if (null === $stream = ($this->passthru)(new LastChunk(), $context)) {
return;
}
if (!$stream instanceof \Iterator) {
throw new \LogicException(sprintf('A chunk passthru must return an "Iterator", "%s" returned.', get_debug_type($stream)));
}
try {
foreach ($stream as $chunk) {
if ($chunk->isLast()) {
break;
}
}
$stream->next();
if ($stream->valid()) {
throw new \LogicException('A chunk passthru cannot yield after the last chunk.');
}
$stream = $this->passthru = null;
} catch (ExceptionInterface $e) {
// ignore any errors when canceling
}
}
/**
* @internal
*/
public static function stream(iterable $responses, float $timeout = null, string $class = null): \Generator
{
while ($responses) {
$wrappedResponses = [];
$asyncMap = new \SplObjectStorage();
$client = null;
foreach ($responses as $r) {
if (!$r instanceof self) {
throw new \TypeError(sprintf('"%s::stream()" expects parameter 1 to be an iterable of AsyncResponse objects, "%s" given.', $class ?? static::class, get_debug_type($r)));
}
if (null !== $e = $r->info['error'] ?? null) {
yield $r => $chunk = new ErrorChunk($r->offset, new TransportException($e));
$chunk->didThrow() ?: $chunk->getContent();
continue;
}
if (null === $client) {
$client = $r->client;
} elseif ($r->client !== $client) {
throw new TransportException('Cannot stream AsyncResponse objects with many clients.');
}
$asyncMap[$r->response] = $r;
$wrappedResponses[] = $r->response;
}
if (!$client) {
return;
}
foreach ($client->stream($wrappedResponses, $timeout) as $response => $chunk) {
$r = $asyncMap[$response];
if (!$r->passthru) {
if (null !== $chunk->getError() || $chunk->isLast()) {
unset($asyncMap[$response]);
}
yield $r => $chunk;
continue;
}
$context = new AsyncContext($r->passthru, $r->client, $r->response, $r->info, $r->content, $r->offset);
if (null === $stream = ($r->passthru)($chunk, $context)) {
if ($r->response === $response && (null !== $chunk->getError() || $chunk->isLast())) {
throw new \LogicException('A chunk passthru cannot swallow the last chunk.');
}
continue;
}
$chunk = null;
if (!$stream instanceof \Iterator) {
throw new \LogicException(sprintf('A chunk passthru must return an "Iterator", "%s" returned.', get_debug_type($stream)));
}
while (true) {
try {
if (null !== $chunk) {
$stream->next();
}
if (!$stream->valid()) {
break;
}
} catch (\Throwable $e) {
$r->info['error'] = $e->getMessage();
$r->response->cancel();
yield $r => $chunk = new ErrorChunk($r->offset, $e);
$chunk->didThrow() ?: $chunk->getContent();
unset($asyncMap[$response]);
break;
}
$chunk = $stream->current();
if (!$chunk instanceof ChunkInterface) {
throw new \LogicException(sprintf('A chunk passthru must yield instances of "%s", "%s" yielded.', ChunkInterface::class, get_debug_type($chunk)));
}
if (null !== $chunk->getError()) {
// no-op
} elseif ($chunk->isFirst()) {
$e = $r->openBuffer();
yield $r => $chunk;
if (null === $e) {
continue;
}
$r->response->cancel();
$chunk = new ErrorChunk($r->offset, $e);
} elseif ('' !== $content = $chunk->getContent()) {
if (null !== $r->shouldBuffer) {
throw new \LogicException('A chunk passthru must yield an "isFirst()" chunk before any content chunk.');
}
if (null !== $r->content && \strlen($content) !== fwrite($r->content, $content)) {
$chunk = new ErrorChunk($r->offset, new TransportException(sprintf('Failed writing %d bytes to the response buffer.', \strlen($content))));
$r->info['error'] = $chunk->getError();
$r->response->cancel();
}
}
if (null === $chunk->getError()) {
$r->offset += \strlen($content);
yield $r => $chunk;
if (!$chunk->isLast()) {
continue;
}
$stream->next();
if ($stream->valid()) {
throw new \LogicException('A chunk passthru cannot yield after an "isLast()" chunk.');
}
$r->passthru = null;
} else {
if ($chunk instanceof ErrorChunk) {
$chunk->didThrow(false);
} else {
try {
$chunk = new ErrorChunk($chunk->getOffset(), !$chunk->isTimeout() ?: $chunk->getError());
} catch (TransportExceptionInterface $e) {
$chunk = new ErrorChunk($chunk->getOffset(), $e);
}
}
yield $r => $chunk;
$chunk->didThrow() ?: $chunk->getContent();
}
unset($asyncMap[$response]);
break;
}
$stream = $context = null;
if ($r->response !== $response && isset($asyncMap[$response])) {
break;
}
}
if (null === $chunk->getError() && !$chunk->isLast() && $r->response === $response && null !== $r->client) {
throw new \LogicException('A chunk passthru must yield an "isLast()" chunk before ending a stream.');
}
$responses = [];
foreach ($asyncMap as $response) {
$r = $asyncMap[$response];
if (null !== $r->client) {
$responses[] = $asyncMap[$response];
}
}
}
}
private function openBuffer(): ?\Throwable
{
if (null === $shouldBuffer = $this->shouldBuffer) {
throw new \LogicException('A chunk passthru cannot yield more than one "isFirst()" chunk.');
}
$e = $this->shouldBuffer = null;
if ($shouldBuffer instanceof \Closure) {
try {
$shouldBuffer = $shouldBuffer($this->getHeaders(false));
if (null !== $e = $this->response->getInfo('error')) {
throw new TransportException($e);
}
} catch (\Throwable $e) {
$this->info['error'] = $e->getMessage();
$this->response->cancel();
}
}
if (true === $shouldBuffer) {
$this->content = fopen('php://temp', 'w+');
} elseif (\is_resource($shouldBuffer)) {
$this->content = $shouldBuffer;
}
return $e;
}
private function close(): void
{
$this->response->cancel();
}
}

View File

@ -0,0 +1,194 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\HttpClient\Response;
use Symfony\Component\HttpClient\Exception\ClientException;
use Symfony\Component\HttpClient\Exception\JsonException;
use Symfony\Component\HttpClient\Exception\RedirectionException;
use Symfony\Component\HttpClient\Exception\ServerException;
use Symfony\Component\HttpClient\Exception\TransportException;
use Symfony\Contracts\HttpClient\Exception\ClientExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\RedirectionExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\ServerExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
/**
* Implements common logic for response classes.
*
* @author Nicolas Grekas <p@tchwork.com>
*
* @internal
*/
trait CommonResponseTrait
{
/**
* @var callable|null A callback that tells whether we're waiting for response headers
*/
private $initializer;
private $shouldBuffer;
private $content;
private $offset = 0;
private $jsonData;
/**
* {@inheritdoc}
*/
public function getContent(bool $throw = true): string
{
if ($this->initializer) {
self::initialize($this);
}
if ($throw) {
$this->checkStatusCode();
}
if (null === $this->content) {
$content = null;
foreach (self::stream([$this]) as $chunk) {
if (!$chunk->isLast()) {
$content .= $chunk->getContent();
}
}
if (null !== $content) {
return $content;
}
if ('HEAD' === $this->getInfo('http_method') || \in_array($this->getInfo('http_code'), [204, 304], true)) {
return '';
}
throw new TransportException('Cannot get the content of the response twice: buffering is disabled.');
}
foreach (self::stream([$this]) as $chunk) {
// Chunks are buffered in $this->content already
}
rewind($this->content);
return stream_get_contents($this->content);
}
/**
* {@inheritdoc}
*/
public function toArray(bool $throw = true): array
{
if ('' === $content = $this->getContent($throw)) {
throw new JsonException('Response body is empty.');
}
if (null !== $this->jsonData) {
return $this->jsonData;
}
$contentType = $this->headers['content-type'][0] ?? 'application/json';
if (!preg_match('/\bjson\b/i', $contentType)) {
throw new JsonException(sprintf('Response content-type is "%s" while a JSON-compatible one was expected for "%s".', $contentType, $this->getInfo('url')));
}
try {
$content = json_decode($content, true, 512, JSON_BIGINT_AS_STRING | (\PHP_VERSION_ID >= 70300 ? JSON_THROW_ON_ERROR : 0));
} catch (\JsonException $e) {
throw new JsonException($e->getMessage().sprintf(' for "%s".', $this->getInfo('url')), $e->getCode());
}
if (\PHP_VERSION_ID < 70300 && JSON_ERROR_NONE !== json_last_error()) {
throw new JsonException(json_last_error_msg().sprintf(' for "%s".', $this->getInfo('url')), json_last_error());
}
if (!\is_array($content)) {
throw new JsonException(sprintf('JSON content was expected to decode to an array, "%s" returned for "%s".', get_debug_type($content), $this->getInfo('url')));
}
if (null !== $this->content) {
// Option "buffer" is true
return $this->jsonData = $content;
}
return $content;
}
/**
* Casts the response to a PHP stream resource.
*
* @return resource
*
* @throws TransportExceptionInterface When a network error occurs
* @throws RedirectionExceptionInterface On a 3xx when $throw is true and the "max_redirects" option has been reached
* @throws ClientExceptionInterface On a 4xx when $throw is true
* @throws ServerExceptionInterface On a 5xx when $throw is true
*/
public function toStream(bool $throw = true)
{
if ($throw) {
// Ensure headers arrived
$this->getHeaders($throw);
}
$stream = StreamWrapper::createResource($this);
stream_get_meta_data($stream)['wrapper_data']
->bindHandles($this->handle, $this->content);
return $stream;
}
/**
* Closes the response and all its network handles.
*/
abstract protected function close(): void;
private static function initialize(self $response): void
{
if (null !== $response->getInfo('error')) {
throw new TransportException($response->getInfo('error'));
}
try {
if (($response->initializer)($response)) {
foreach (self::stream([$response]) as $chunk) {
if ($chunk->isFirst()) {
break;
}
}
}
} catch (\Throwable $e) {
// Persist timeouts thrown during initialization
$response->info['error'] = $e->getMessage();
$response->close();
throw $e;
}
$response->initializer = null;
}
private function checkStatusCode()
{
$code = $this->getInfo('http_code');
if (500 <= $code) {
throw new ServerException($this);
}
if (400 <= $code) {
throw new ClientException($this);
}
if (300 <= $code) {
throw new RedirectionException($this);
}
}
}

View File

@ -27,9 +27,10 @@ use Symfony\Contracts\HttpClient\ResponseInterface;
*/
final class CurlResponse implements ResponseInterface
{
use ResponseTrait {
use CommonResponseTrait {
getContent as private doGetContent;
}
use TransportResponseTrait;
private static $performing = false;
private $multi;

View File

@ -25,7 +25,8 @@ use Symfony\Contracts\HttpClient\ResponseInterface;
*/
class MockResponse implements ResponseInterface
{
use ResponseTrait {
use CommonResponseTrait;
use TransportResponseTrait {
doDestruct as public __destruct;
}

View File

@ -26,7 +26,8 @@ use Symfony\Contracts\HttpClient\ResponseInterface;
*/
final class NativeResponse implements ResponseInterface
{
use ResponseTrait;
use CommonResponseTrait;
use TransportResponseTrait;
private $context;
private $url;

View File

@ -49,7 +49,7 @@ class StreamWrapper
*/
public static function createResource(ResponseInterface $response, HttpClientInterface $client = null)
{
if (\is_callable([$response, 'toStream']) && isset(class_uses($response)[ResponseTrait::class])) {
if (\is_callable([$response, 'toStream']) && isset(class_uses($response)[CommonResponseTrait::class])) {
$stack = debug_backtrace(DEBUG_BACKTRACE_PROVIDE_OBJECT | DEBUG_BACKTRACE_IGNORE_ARGS, 2);
if ($response !== ($stack[1]['object'] ?? null)) {
@ -83,9 +83,9 @@ class StreamWrapper
}
/**
* @param resource|null $handle The resource handle that should be monitored when
* stream_select() is used on the created stream
* @param resource|null $content The seekable resource where the response body is buffered
* @param resource|callable|null $handle The resource handle that should be monitored when
* stream_select() is used on the created stream
* @param resource|null $content The seekable resource where the response body is buffered
*/
public function bindHandles(&$handle, &$content): void
{
@ -266,7 +266,7 @@ class StreamWrapper
if (STREAM_CAST_FOR_SELECT === $castAs) {
$this->response->getHeaders(false);
return $this->handle ?? false;
return (\is_callable($this->handle) ? ($this->handle)() : $this->handle) ?? false;
}
return false;

View File

@ -15,34 +15,19 @@ use Symfony\Component\HttpClient\Chunk\DataChunk;
use Symfony\Component\HttpClient\Chunk\ErrorChunk;
use Symfony\Component\HttpClient\Chunk\FirstChunk;
use Symfony\Component\HttpClient\Chunk\LastChunk;
use Symfony\Component\HttpClient\Exception\ClientException;
use Symfony\Component\HttpClient\Exception\JsonException;
use Symfony\Component\HttpClient\Exception\RedirectionException;
use Symfony\Component\HttpClient\Exception\ServerException;
use Symfony\Component\HttpClient\Exception\TransportException;
use Symfony\Component\HttpClient\Internal\ClientState;
use Symfony\Contracts\HttpClient\Exception\ClientExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\RedirectionExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\ServerExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
/**
* Implements the common logic for response classes.
* Implements common logic for transport-level response classes.
*
* @author Nicolas Grekas <p@tchwork.com>
*
* @internal
*/
trait ResponseTrait
trait TransportResponseTrait
{
private $logger;
private $headers = [];
/**
* @var callable|null A callback that initializes the two previous properties
*/
private $initializer;
private $info = [
'response_headers' => [],
'http_code' => 0,
@ -55,11 +40,8 @@ trait ResponseTrait
private $id;
private $timeout = 0;
private $inflate;
private $shouldBuffer;
private $content;
private $finalInfo;
private $offset = 0;
private $jsonData;
private $logger;
/**
* {@inheritdoc}
@ -89,89 +71,6 @@ trait ResponseTrait
return $this->headers;
}
/**
* {@inheritdoc}
*/
public function getContent(bool $throw = true): string
{
if ($this->initializer) {
self::initialize($this);
}
if ($throw) {
$this->checkStatusCode();
}
if (null === $this->content) {
$content = null;
foreach (self::stream([$this]) as $chunk) {
if (!$chunk->isLast()) {
$content .= $chunk->getContent();
}
}
if (null !== $content) {
return $content;
}
if ('HEAD' === $this->info['http_method'] || \in_array($this->info['http_code'], [204, 304], true)) {
return '';
}
throw new TransportException('Cannot get the content of the response twice: buffering is disabled.');
}
foreach (self::stream([$this]) as $chunk) {
// Chunks are buffered in $this->content already
}
rewind($this->content);
return stream_get_contents($this->content);
}
/**
* {@inheritdoc}
*/
public function toArray(bool $throw = true): array
{
if ('' === $content = $this->getContent($throw)) {
throw new JsonException('Response body is empty.');
}
if (null !== $this->jsonData) {
return $this->jsonData;
}
$contentType = $this->headers['content-type'][0] ?? 'application/json';
if (!preg_match('/\bjson\b/i', $contentType)) {
throw new JsonException(sprintf('Response content-type is "%s" while a JSON-compatible one was expected for "%s".', $contentType, $this->getInfo('url')));
}
try {
$content = json_decode($content, true, 512, JSON_BIGINT_AS_STRING | (\PHP_VERSION_ID >= 70300 ? JSON_THROW_ON_ERROR : 0));
} catch (\JsonException $e) {
throw new JsonException($e->getMessage().sprintf(' for "%s".', $this->getInfo('url')), $e->getCode());
}
if (\PHP_VERSION_ID < 70300 && JSON_ERROR_NONE !== json_last_error()) {
throw new JsonException(json_last_error_msg().sprintf(' for "%s".', $this->getInfo('url')), json_last_error());
}
if (!\is_array($content)) {
throw new JsonException(sprintf('JSON content was expected to decode to an array, "%s" returned for "%s".', get_debug_type($content), $this->getInfo('url')));
}
if (null !== $this->content) {
// Option "buffer" is true
return $this->jsonData = $content;
}
return $content;
}
/**
* {@inheritdoc}
*/
@ -182,35 +81,6 @@ trait ResponseTrait
$this->close();
}
/**
* Casts the response to a PHP stream resource.
*
* @return resource
*
* @throws TransportExceptionInterface When a network error occurs
* @throws RedirectionExceptionInterface On a 3xx when $throw is true and the "max_redirects" option has been reached
* @throws ClientExceptionInterface On a 4xx when $throw is true
* @throws ServerExceptionInterface On a 5xx when $throw is true
*/
public function toStream(bool $throw = true)
{
if ($throw) {
// Ensure headers arrived
$this->getHeaders($throw);
}
$stream = StreamWrapper::createResource($this);
stream_get_meta_data($stream)['wrapper_data']
->bindHandles($this->handle, $this->content);
return $stream;
}
/**
* Closes the response and all its network handles.
*/
abstract protected function close(): void;
/**
* Adds pending responses to the activity list.
*/
@ -226,30 +96,6 @@ trait ResponseTrait
*/
abstract protected static function select(ClientState $multi, float $timeout): int;
private static function initialize(self $response): void
{
if (null !== $response->info['error']) {
throw new TransportException($response->info['error']);
}
try {
if (($response->initializer)($response)) {
foreach (self::stream([$response]) as $chunk) {
if ($chunk->isFirst()) {
break;
}
}
}
} catch (\Throwable $e) {
// Persist timeouts thrown during initialization
$response->info['error'] = $e->getMessage();
$response->close();
throw $e;
}
$response->initializer = null;
}
private static function addResponseHeaders(array $responseHeaders, array &$info, array &$headers, string &$debug = ''): void
{
foreach ($responseHeaders as $h) {
@ -274,21 +120,6 @@ trait ResponseTrait
}
}
private function checkStatusCode()
{
if (500 <= $this->info['http_code']) {
throw new ServerException($this);
}
if (400 <= $this->info['http_code']) {
throw new ClientException($this);
}
if (300 <= $this->info['http_code']) {
throw new RedirectionException($this);
}
}
/**
* Ensures the request is always sent and that the response code was checked.
*/

View File

@ -0,0 +1,166 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\HttpClient\Tests;
use Symfony\Component\HttpClient\AsyncDecoratorTrait;
use Symfony\Component\HttpClient\Response\AsyncContext;
use Symfony\Component\HttpClient\Response\AsyncResponse;
use Symfony\Contracts\HttpClient\ChunkInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface;
class AsyncDecoratorTraitTest extends NativeHttpClientTest
{
protected function getHttpClient(string $testCase, \Closure $chunkFilter = null): HttpClientInterface
{
$chunkFilter = $chunkFilter ?? static function (ChunkInterface $chunk, AsyncContext $context) { yield $chunk; };
return new class(parent::getHttpClient($testCase), $chunkFilter) implements HttpClientInterface {
use AsyncDecoratorTrait;
private $chunkFilter;
public function __construct(HttpClientInterface $client, \Closure $chunkFilter = null)
{
$this->chunkFilter = $chunkFilter;
$this->client = $client;
}
public function request(string $method, string $url, array $options = []): ResponseInterface
{
return new AsyncResponse($this->client, $method, $url, $options, $this->chunkFilter);
}
};
}
public function testRetry404()
{
$client = $this->getHttpClient(__FUNCTION__, function (ChunkInterface $chunk, AsyncContext $context) {
$this->assertTrue($chunk->isFirst());
$this->assertSame(404, $context->getStatusCode());
$context->getResponse()->cancel();
$context->replaceRequest('GET', 'http://localhost:8057/');
$context->passthru();
});
$response = $client->request('GET', 'http://localhost:8057/404');
foreach ($client->stream($response) as $chunk) {
}
$this->assertTrue($chunk->isLast());
$this->assertSame(200, $response->getStatusCode());
}
public function testRetryTransportError()
{
$client = $this->getHttpClient(__FUNCTION__, function (ChunkInterface $chunk, AsyncContext $context) {
try {
if ($chunk->isFirst()) {
$this->assertSame(200, $context->getStatusCode());
}
yield $chunk;
} catch (TransportExceptionInterface $e) {
$context->getResponse()->cancel();
$context->replaceRequest('GET', 'http://localhost:8057/');
}
});
$response = $client->request('GET', 'http://localhost:8057/chunked-broken');
$this->assertSame(200, $response->getStatusCode());
}
public function testJsonTransclusion()
{
$client = $this->getHttpClient(__FUNCTION__, function (ChunkInterface $chunk, AsyncContext $context) {
if ('' === $content = $chunk->getContent()) {
yield $chunk;
return;
}
$this->assertSame('{"documents":[{"id":"\/json\/1"},{"id":"\/json\/2"},{"id":"\/json\/3"}]}', $content);
$steps = preg_split('{\{"id":"\\\/json\\\/(\d)"\}}', $content, -1, PREG_SPLIT_DELIM_CAPTURE);
$steps[7] = $context->getResponse();
$steps[1] = $context->replaceRequest('GET', 'http://localhost:8057/json/1');
$steps[3] = $context->replaceRequest('GET', 'http://localhost:8057/json/2');
$steps[5] = $context->replaceRequest('GET', 'http://localhost:8057/json/3');
yield $context->createChunk(array_shift($steps));
$context->replaceResponse(array_shift($steps));
$context->passthru(static function (ChunkInterface $chunk, AsyncContext $context) use (&$steps) {
if ($chunk->isFirst()) {
return;
}
if ($steps && $chunk->isLast()) {
$chunk = $context->createChunk(array_shift($steps));
$context->replaceResponse(array_shift($steps));
}
yield $chunk;
});
});
$response = $client->request('GET', 'http://localhost:8057/json');
$this->assertSame('{"documents":[{"title":"\/json\/1"},{"title":"\/json\/2"},{"title":"\/json\/3"}]}', $response->getContent());
}
public function testPreflightRequest()
{
$client = new class(parent::getHttpClient(__FUNCTION__)) implements HttpClientInterface {
use AsyncDecoratorTrait;
public function request(string $method, string $url, array $options = []): ResponseInterface
{
$chunkFilter = static function (ChunkInterface $chunk, AsyncContext $context) use ($method, $url, $options) {
$context->replaceRequest($method, $url, $options);
$context->passthru();
};
return new AsyncResponse($this->client, 'GET', 'http://localhost:8057', $options, $chunkFilter);
}
};
$response = $client->request('GET', 'http://localhost:8057/json');
$this->assertSame('{"documents":[{"id":"\/json\/1"},{"id":"\/json\/2"},{"id":"\/json\/3"}]}', $response->getContent());
$this->assertSame('http://localhost:8057/', $response->getInfo('previous_info')[0]['url']);
}
public function testProcessingHappensOnce()
{
$lastChunks = 0;
$client = $this->getHttpClient(__FUNCTION__, function (ChunkInterface $chunk, AsyncContext $context) use (&$lastChunks) {
$lastChunks += $chunk->isLast();
yield $chunk;
});
$response = $client->request('GET', 'http://localhost:8057/');
foreach ($client->stream($response) as $chunk) {
}
$this->assertTrue($chunk->isLast());
$this->assertSame(1, $lastChunks);
$chunk = null;
foreach ($client->stream($response) as $chunk) {
}
$this->assertTrue($chunk->isLast());
$this->assertSame(1, $lastChunks);
}
}

View File

@ -7,7 +7,7 @@ use Symfony\Component\HttpClient\Exception\JsonException;
use Symfony\Component\HttpClient\Response\MockResponse;
/**
* Test methods from Symfony\Component\HttpClient\Response\ResponseTrait.
* Test methods from Symfony\Component\HttpClient\Response\*ResponseTrait.
*/
class MockResponseTest extends TestCase
{