[HttpClient] Don't read from the network faster than the CPU can deal with

This commit is contained in:
Nicolas Grekas 2020-01-05 14:56:11 +01:00
parent ccfc4b67b3
commit ac3d77a76a
6 changed files with 77 additions and 100 deletions

View File

@ -113,7 +113,7 @@ final class CurlHttpClient implements HttpClientInterface, LoggerAwareInterface,
$url = implode('', $url);
if (!isset($options['normalized_headers']['user-agent'])) {
$options['normalized_headers']['user-agent'][] = $options['headers'][] = 'User-Agent: Symfony HttpClient/Curl';
$options['headers'][] = 'User-Agent: Symfony HttpClient/Curl';
}
$curlopts = [
@ -194,8 +194,8 @@ final class CurlHttpClient implements HttpClientInterface, LoggerAwareInterface,
$curlopts[CURLOPT_NOSIGNAL] = true;
}
if (!isset($options['normalized_headers']['accept-encoding']) && CURL_VERSION_LIBZ & self::$curlVersion['features']) {
$curlopts[CURLOPT_ENCODING] = 'gzip'; // Expose only one encoding, some servers mess up when more are provided
if (\extension_loaded('zlib') && !isset($options['normalized_headers']['accept-encoding'])) {
$options['headers'][] = 'Accept-Encoding: gzip'; // Expose only one encoding, some servers mess up when more are provided
}
foreach ($options['headers'] as $header) {

View File

@ -77,7 +77,7 @@ final class NativeHttpClient implements HttpClientInterface, LoggerAwareInterfac
$options['headers'][] = 'Content-Type: application/x-www-form-urlencoded';
}
if ($gzipEnabled = \extension_loaded('zlib') && !isset($options['normalized_headers']['accept-encoding'])) {
if (\extension_loaded('zlib') && !isset($options['normalized_headers']['accept-encoding'])) {
// gzip is the most widely available algo, no need to deal with deflate
$options['headers'][] = 'Accept-Encoding: gzip';
}
@ -210,7 +210,7 @@ final class NativeHttpClient implements HttpClientInterface, LoggerAwareInterfac
$context = stream_context_create($context, ['notification' => $notification]);
self::configureHeadersAndProxy($context, $host, $options['headers'], $proxy, $noProxy);
return new NativeResponse($this->multi, $context, implode('', $url), $options, $gzipEnabled, $info, $resolveRedirect, $onProgress, $this->logger);
return new NativeResponse($this->multi, $context, implode('', $url), $options, $info, $resolveRedirect, $onProgress, $this->logger);
}
/**

View File

@ -52,6 +52,7 @@ final class CurlResponse implements ResponseInterface
$this->id = $id = (int) $ch;
$this->logger = $logger;
$this->shouldBuffer = $options['buffer'] ?? true;
$this->timeout = $options['timeout'] ?? null;
$this->info['http_method'] = $method;
$this->info['user_data'] = $options['user_data'] ?? null;
@ -65,30 +66,25 @@ final class CurlResponse implements ResponseInterface
curl_setopt($ch, CURLOPT_PRIVATE, \in_array($method, ['GET', 'HEAD', 'OPTIONS', 'TRACE'], true) && 1.0 < (float) ($options['http_version'] ?? 1.1) ? 'H2' : 'H0'); // H = headers + retry counter
}
if (null === $content = &$this->content) {
$content = ($options['buffer'] ?? true) ? fopen('php://temp', 'w+') : null;
} else {
// Move the pushed response to the activity list
if (ftell($content)) {
rewind($content);
$multi->handlesActivity[$id][] = stream_get_contents($content);
}
$content = ($options['buffer'] ?? true) ? $content : null;
}
curl_setopt($ch, CURLOPT_HEADERFUNCTION, static function ($ch, string $data) use (&$info, &$headers, $options, $multi, $id, &$location, $resolveRedirect, $logger): int {
return self::parseHeaderLine($ch, $data, $info, $headers, $options, $multi, $id, $location, $resolveRedirect, $logger);
});
if (null === $options) {
// Pushed response: buffer until requested
curl_setopt($ch, CURLOPT_WRITEFUNCTION, static function ($ch, string $data) use (&$content): int {
return fwrite($content, $data);
curl_setopt($ch, CURLOPT_WRITEFUNCTION, static function ($ch, string $data) use ($multi, $id): int {
$multi->handlesActivity[$id][] = $data;
curl_pause($ch, CURLPAUSE_RECV);
return \strlen($data);
});
return;
}
$this->inflate = !isset($options['normalized_headers']['accept-encoding']);
curl_pause($ch, CURLPAUSE_CONT);
if ($onProgress = $options['on_progress']) {
$url = isset($info['url']) ? ['url' => $info['url']] : [];
curl_setopt($ch, CURLOPT_NOPROGRESS, false);
@ -108,33 +104,16 @@ final class CurlResponse implements ResponseInterface
});
}
curl_setopt($ch, CURLOPT_WRITEFUNCTION, static function ($ch, string $data) use (&$content, $multi, $id): int {
curl_setopt($ch, CURLOPT_WRITEFUNCTION, static function ($ch, string $data) use ($multi, $id): int {
$multi->handlesActivity[$id][] = $data;
return null !== $content ? fwrite($content, $data) : \strlen($data);
return \strlen($data);
});
$this->initializer = static function (self $response) {
if (null !== $response->info['error']) {
throw new TransportException($response->info['error']);
}
$waitFor = curl_getinfo($ch = $response->handle, CURLINFO_PRIVATE);
if ('H' === $waitFor[0] || 'D' === $waitFor[0]) {
try {
foreach (self::stream([$response]) as $chunk) {
if ($chunk->isFirst()) {
break;
}
}
} catch (\Throwable $e) {
// Persist timeouts thrown during initialization
$response->info['error'] = $e->getMessage();
$response->close();
throw $e;
}
}
return 'H' === $waitFor[0] || 'D' === $waitFor[0];
};
// Schedule the request in a non-blocking way
@ -221,6 +200,7 @@ final class CurlResponse implements ResponseInterface
*/
private function close(): void
{
$this->inflate = null;
unset($this->multi->openHandles[$this->id], $this->multi->handlesActivity[$this->id]);
curl_multi_remove_handle($this->multi->handle, $this->handle);
curl_setopt_array($this->handle, [

View File

@ -93,6 +93,7 @@ class MockResponse implements ResponseInterface
*/
protected function close(): void
{
$this->inflate = null;
$this->body = [];
}
@ -104,16 +105,9 @@ class MockResponse implements ResponseInterface
$response = new self([]);
$response->requestOptions = $options;
$response->id = ++self::$idSequence;
$response->content = ($options['buffer'] ?? true) ? fopen('php://temp', 'w+') : null;
$response->shouldBuffer = $options['buffer'] ?? true;
$response->initializer = static function (self $response) {
if (null !== $response->info['error']) {
throw new TransportException($response->info['error']);
}
if (\is_array($response->body[0] ?? null)) {
// Consume the first chunk if it's not yielded yet
self::stream([$response])->current();
}
return \is_array($response->body[0] ?? null);
};
$response->info['redirect_count'] = 0;
@ -186,11 +180,6 @@ class MockResponse implements ResponseInterface
} else {
// Data or timeout chunk
$multi->handlesActivity[$id][] = $chunk;
if (\is_string($chunk) && null !== $response->content) {
// Buffer response body
fwrite($response->content, $chunk);
}
}
}
}

View File

@ -32,14 +32,13 @@ final class NativeResponse implements ResponseInterface
private $onProgress;
private $remaining;
private $buffer;
private $inflate;
private $multi;
private $debugBuffer;
/**
* @internal
*/
public function __construct(NativeClientState $multi, $context, string $url, $options, bool $gzipEnabled, array &$info, callable $resolveRedirect, ?callable $onProgress, ?LoggerInterface $logger)
public function __construct(NativeClientState $multi, $context, string $url, array $options, array &$info, callable $resolveRedirect, ?callable $onProgress, ?LoggerInterface $logger)
{
$this->multi = $multi;
$this->id = (int) $context;
@ -50,27 +49,17 @@ final class NativeResponse implements ResponseInterface
$this->info = &$info;
$this->resolveRedirect = $resolveRedirect;
$this->onProgress = $onProgress;
$this->content = $options['buffer'] ? fopen('php://temp', 'w+') : null;
$this->inflate = !isset($options['normalized_headers']['accept-encoding']);
$this->shouldBuffer = $options['buffer'] ?? true;
// Temporary resources to dechunk/inflate the response stream
// Temporary resource to dechunk the response stream
$this->buffer = fopen('php://temp', 'w+');
$this->inflate = $gzipEnabled ? inflate_init(ZLIB_ENCODING_GZIP) : null;
$info['user_data'] = $options['user_data'];
++$multi->responseCount;
$this->initializer = static function (self $response) {
if (null !== $response->info['error']) {
throw new TransportException($response->info['error']);
}
if (null === $response->remaining) {
foreach (self::stream([$response]) as $chunk) {
if ($chunk->isFirst()) {
break;
}
}
}
return null === $response->remaining;
};
}
@ -165,7 +154,7 @@ final class NativeResponse implements ResponseInterface
stream_set_blocking($h, false);
$this->context = $this->resolveRedirect = null;
// Create dechunk and inflate buffers
// Create dechunk buffers
if (isset($this->headers['content-length'])) {
$this->remaining = (int) $this->headers['content-length'][0];
} elseif ('chunked' === ($this->headers['transfer-encoding'][0] ?? null)) {
@ -175,10 +164,6 @@ final class NativeResponse implements ResponseInterface
$this->remaining = -2;
}
if ($this->inflate && 'gzip' !== ($this->headers['content-encoding'][0] ?? null)) {
$this->inflate = null;
}
$this->multi->handlesActivity[$this->id] = [new FirstChunk()];
if ('HEAD' === $context['http']['method'] || \in_array($this->info['http_code'], [204, 304], true)) {
@ -188,7 +173,7 @@ final class NativeResponse implements ResponseInterface
return;
}
$this->multi->openHandles[$this->id] = [$h, $this->buffer, $this->inflate, $this->content, $this->onProgress, &$this->remaining, &$this->info];
$this->multi->openHandles[$this->id] = [$h, $this->buffer, $this->onProgress, &$this->remaining, &$this->info];
}
/**
@ -228,15 +213,15 @@ final class NativeResponse implements ResponseInterface
$multi->handles = [];
}
foreach ($multi->openHandles as $i => [$h, $buffer, $inflate, $content, $onProgress]) {
foreach ($multi->openHandles as $i => [$h, $buffer, $onProgress]) {
$hasActivity = false;
$remaining = &$multi->openHandles[$i][5];
$info = &$multi->openHandles[$i][6];
$remaining = &$multi->openHandles[$i][3];
$info = &$multi->openHandles[$i][4];
$e = null;
// Read incoming buffer and write it to the dechunk one
try {
while ($remaining && '' !== $data = (string) fread($h, 0 > $remaining ? 16372 : $remaining)) {
if ($remaining && '' !== $data = (string) fread($h, 0 > $remaining ? 16372 : $remaining)) {
fwrite($buffer, $data);
$hasActivity = true;
$multi->sleep = false;
@ -264,16 +249,8 @@ final class NativeResponse implements ResponseInterface
rewind($buffer);
ftruncate($buffer, 0);
if (null !== $inflate && false === $data = @inflate_add($inflate, $data)) {
$e = new TransportException('Error while processing content unencoding.');
}
if ('' !== $data && null === $e) {
if (null === $e) {
$multi->handlesActivity[$i][] = $data;
if (null !== $content && \strlen($data) !== fwrite($content, $data)) {
$e = new TransportException(sprintf('Failed writing %d bytes to the response buffer.', \strlen($data)));
}
}
}

View File

@ -39,11 +39,6 @@ trait ResponseTrait
*/
private $initializer;
/**
* @var resource A php://temp stream typically
*/
private $content;
private $info = [
'response_headers' => [],
'http_code' => 0,
@ -54,6 +49,9 @@ trait ResponseTrait
private $handle;
private $id;
private $timeout;
private $inflate;
private $shouldBuffer;
private $content;
private $finalInfo;
private $offset = 0;
private $jsonData;
@ -64,8 +62,7 @@ trait ResponseTrait
public function getStatusCode(): int
{
if ($this->initializer) {
($this->initializer)($this);
$this->initializer = null;
self::initialize($this);
}
return $this->info['http_code'];
@ -77,8 +74,7 @@ trait ResponseTrait
public function getHeaders(bool $throw = true): array
{
if ($this->initializer) {
($this->initializer)($this);
$this->initializer = null;
self::initialize($this);
}
if ($throw) {
@ -94,8 +90,7 @@ trait ResponseTrait
public function getContent(bool $throw = true): string
{
if ($this->initializer) {
($this->initializer)($this);
$this->initializer = null;
self::initialize($this);
}
if ($throw) {
@ -201,6 +196,30 @@ trait ResponseTrait
*/
abstract protected static function select(ClientState $multi, float $timeout): int;
private static function initialize(self $response): void
{
if (null !== $response->info['error']) {
throw new TransportException($response->info['error']);
}
try {
if (($response->initializer)($response)) {
foreach (self::stream([$response]) as $chunk) {
if ($chunk->isFirst()) {
break;
}
}
}
} catch (\Throwable $e) {
// Persist timeouts thrown during initialization
$response->info['error'] = $e->getMessage();
$response->close();
throw $e;
}
$response->initializer = null;
}
private static function addResponseHeaders(array $responseHeaders, array &$info, array &$headers, string &$debug = ''): void
{
foreach ($responseHeaders as $h) {
@ -246,8 +265,7 @@ trait ResponseTrait
private function doDestruct()
{
if ($this->initializer && null === $this->info['error']) {
($this->initializer)($this);
$this->initializer = null;
self::initialize($this);
$this->checkStatusCode();
}
}
@ -299,6 +317,16 @@ trait ResponseTrait
$isTimeout = false;
if (\is_string($chunk = array_shift($multi->handlesActivity[$j]))) {
if (null !== $response->inflate && false === $chunk = @inflate_add($response->inflate, $chunk)) {
$multi->handlesActivity[$j] = [null, new TransportException('Error while processing content unencoding.')];
continue;
}
if ('' !== $chunk && null !== $response->content && \strlen($chunk) !== fwrite($response->content, $chunk)) {
$multi->handlesActivity[$j] = [null, new TransportException('Failed writing %d bytes to the response buffer.', \strlen($chunk))];
continue;
}
$response->offset += \strlen($chunk);
$chunk = new DataChunk($response->offset, $chunk);
} elseif (null === $chunk) {
@ -326,6 +354,9 @@ trait ResponseTrait
$response->logger->info(sprintf('Response: "%s %s"', $info['http_code'], $info['url']));
}
$response->inflate = \extension_loaded('zlib') && $response->inflate && 'gzip' === ($response->headers['content-encoding'][0] ?? null) ? inflate_init(ZLIB_ENCODING_GZIP) : null;
$response->content = $response->shouldBuffer ? fopen('php://temp', 'w+') : null;
yield $response => $chunk;
if ($response->initializer && null === $response->info['error']) {