bug #35223 [HttpClient] Don't read from the network faster than the CPU can deal with (nicolas-grekas)

This PR was merged into the 4.3 branch.

Discussion
----------

[HttpClient] Don't read from the network faster than the CPU can deal with

| Q             | A
| ------------- | ---
| Branch?       | 4.3
| Bug fix?      | yes
| New feature?  | no
| Deprecations? | no
| Tickets       | -
| License       | MIT
| Doc PR        | -

Something I spotted while working on #35115: both the curl and native clients don't play well with heavily compressed HTTP streams: they decompress faster than userland can process chunks.

The attached patch moves the decompression logic to the chunk generator. This means internally we only deal with raw compressed chunks, and they are decompressed only when passing the value to userland.

Commits
-------

ac3d77a76a [HttpClient] Don't read from the network faster than the CPU can deal with
This commit is contained in:
Nicolas Grekas 2020-01-06 13:56:11 +01:00
commit f9b36c754d
6 changed files with 77 additions and 100 deletions

View File

@ -113,7 +113,7 @@ final class CurlHttpClient implements HttpClientInterface, LoggerAwareInterface,
$url = implode('', $url);
if (!isset($options['normalized_headers']['user-agent'])) {
$options['normalized_headers']['user-agent'][] = $options['headers'][] = 'User-Agent: Symfony HttpClient/Curl';
$options['headers'][] = 'User-Agent: Symfony HttpClient/Curl';
}
$curlopts = [
@ -194,8 +194,8 @@ final class CurlHttpClient implements HttpClientInterface, LoggerAwareInterface,
$curlopts[CURLOPT_NOSIGNAL] = true;
}
if (!isset($options['normalized_headers']['accept-encoding']) && CURL_VERSION_LIBZ & self::$curlVersion['features']) {
$curlopts[CURLOPT_ENCODING] = 'gzip'; // Expose only one encoding, some servers mess up when more are provided
if (\extension_loaded('zlib') && !isset($options['normalized_headers']['accept-encoding'])) {
$options['headers'][] = 'Accept-Encoding: gzip'; // Expose only one encoding, some servers mess up when more are provided
}
foreach ($options['headers'] as $header) {

View File

@ -77,7 +77,7 @@ final class NativeHttpClient implements HttpClientInterface, LoggerAwareInterfac
$options['headers'][] = 'Content-Type: application/x-www-form-urlencoded';
}
if ($gzipEnabled = \extension_loaded('zlib') && !isset($options['normalized_headers']['accept-encoding'])) {
if (\extension_loaded('zlib') && !isset($options['normalized_headers']['accept-encoding'])) {
// gzip is the most widely available algo, no need to deal with deflate
$options['headers'][] = 'Accept-Encoding: gzip';
}
@ -210,7 +210,7 @@ final class NativeHttpClient implements HttpClientInterface, LoggerAwareInterfac
$context = stream_context_create($context, ['notification' => $notification]);
self::configureHeadersAndProxy($context, $host, $options['headers'], $proxy, $noProxy);
return new NativeResponse($this->multi, $context, implode('', $url), $options, $gzipEnabled, $info, $resolveRedirect, $onProgress, $this->logger);
return new NativeResponse($this->multi, $context, implode('', $url), $options, $info, $resolveRedirect, $onProgress, $this->logger);
}
/**

View File

@ -52,6 +52,7 @@ final class CurlResponse implements ResponseInterface
$this->id = $id = (int) $ch;
$this->logger = $logger;
$this->shouldBuffer = $options['buffer'] ?? true;
$this->timeout = $options['timeout'] ?? null;
$this->info['http_method'] = $method;
$this->info['user_data'] = $options['user_data'] ?? null;
@ -65,30 +66,25 @@ final class CurlResponse implements ResponseInterface
curl_setopt($ch, CURLOPT_PRIVATE, \in_array($method, ['GET', 'HEAD', 'OPTIONS', 'TRACE'], true) && 1.0 < (float) ($options['http_version'] ?? 1.1) ? 'H2' : 'H0'); // H = headers + retry counter
}
if (null === $content = &$this->content) {
$content = ($options['buffer'] ?? true) ? fopen('php://temp', 'w+') : null;
} else {
// Move the pushed response to the activity list
if (ftell($content)) {
rewind($content);
$multi->handlesActivity[$id][] = stream_get_contents($content);
}
$content = ($options['buffer'] ?? true) ? $content : null;
}
curl_setopt($ch, CURLOPT_HEADERFUNCTION, static function ($ch, string $data) use (&$info, &$headers, $options, $multi, $id, &$location, $resolveRedirect, $logger): int {
return self::parseHeaderLine($ch, $data, $info, $headers, $options, $multi, $id, $location, $resolveRedirect, $logger);
});
if (null === $options) {
// Pushed response: buffer until requested
curl_setopt($ch, CURLOPT_WRITEFUNCTION, static function ($ch, string $data) use (&$content): int {
return fwrite($content, $data);
curl_setopt($ch, CURLOPT_WRITEFUNCTION, static function ($ch, string $data) use ($multi, $id): int {
$multi->handlesActivity[$id][] = $data;
curl_pause($ch, CURLPAUSE_RECV);
return \strlen($data);
});
return;
}
$this->inflate = !isset($options['normalized_headers']['accept-encoding']);
curl_pause($ch, CURLPAUSE_CONT);
if ($onProgress = $options['on_progress']) {
$url = isset($info['url']) ? ['url' => $info['url']] : [];
curl_setopt($ch, CURLOPT_NOPROGRESS, false);
@ -108,33 +104,16 @@ final class CurlResponse implements ResponseInterface
});
}
curl_setopt($ch, CURLOPT_WRITEFUNCTION, static function ($ch, string $data) use (&$content, $multi, $id): int {
curl_setopt($ch, CURLOPT_WRITEFUNCTION, static function ($ch, string $data) use ($multi, $id): int {
$multi->handlesActivity[$id][] = $data;
return null !== $content ? fwrite($content, $data) : \strlen($data);
return \strlen($data);
});
$this->initializer = static function (self $response) {
if (null !== $response->info['error']) {
throw new TransportException($response->info['error']);
}
$waitFor = curl_getinfo($ch = $response->handle, CURLINFO_PRIVATE);
if ('H' === $waitFor[0] || 'D' === $waitFor[0]) {
try {
foreach (self::stream([$response]) as $chunk) {
if ($chunk->isFirst()) {
break;
}
}
} catch (\Throwable $e) {
// Persist timeouts thrown during initialization
$response->info['error'] = $e->getMessage();
$response->close();
throw $e;
}
}
return 'H' === $waitFor[0] || 'D' === $waitFor[0];
};
// Schedule the request in a non-blocking way
@ -221,6 +200,7 @@ final class CurlResponse implements ResponseInterface
*/
private function close(): void
{
$this->inflate = null;
unset($this->multi->openHandles[$this->id], $this->multi->handlesActivity[$this->id]);
curl_multi_remove_handle($this->multi->handle, $this->handle);
curl_setopt_array($this->handle, [

View File

@ -93,6 +93,7 @@ class MockResponse implements ResponseInterface
*/
protected function close(): void
{
$this->inflate = null;
$this->body = [];
}
@ -104,16 +105,9 @@ class MockResponse implements ResponseInterface
$response = new self([]);
$response->requestOptions = $options;
$response->id = ++self::$idSequence;
$response->content = ($options['buffer'] ?? true) ? fopen('php://temp', 'w+') : null;
$response->shouldBuffer = $options['buffer'] ?? true;
$response->initializer = static function (self $response) {
if (null !== $response->info['error']) {
throw new TransportException($response->info['error']);
}
if (\is_array($response->body[0] ?? null)) {
// Consume the first chunk if it's not yielded yet
self::stream([$response])->current();
}
return \is_array($response->body[0] ?? null);
};
$response->info['redirect_count'] = 0;
@ -186,11 +180,6 @@ class MockResponse implements ResponseInterface
} else {
// Data or timeout chunk
$multi->handlesActivity[$id][] = $chunk;
if (\is_string($chunk) && null !== $response->content) {
// Buffer response body
fwrite($response->content, $chunk);
}
}
}
}

View File

@ -32,14 +32,13 @@ final class NativeResponse implements ResponseInterface
private $onProgress;
private $remaining;
private $buffer;
private $inflate;
private $multi;
private $debugBuffer;
/**
* @internal
*/
public function __construct(NativeClientState $multi, $context, string $url, $options, bool $gzipEnabled, array &$info, callable $resolveRedirect, ?callable $onProgress, ?LoggerInterface $logger)
public function __construct(NativeClientState $multi, $context, string $url, array $options, array &$info, callable $resolveRedirect, ?callable $onProgress, ?LoggerInterface $logger)
{
$this->multi = $multi;
$this->id = (int) $context;
@ -50,27 +49,17 @@ final class NativeResponse implements ResponseInterface
$this->info = &$info;
$this->resolveRedirect = $resolveRedirect;
$this->onProgress = $onProgress;
$this->content = $options['buffer'] ? fopen('php://temp', 'w+') : null;
$this->inflate = !isset($options['normalized_headers']['accept-encoding']);
$this->shouldBuffer = $options['buffer'] ?? true;
// Temporary resources to dechunk/inflate the response stream
// Temporary resource to dechunk the response stream
$this->buffer = fopen('php://temp', 'w+');
$this->inflate = $gzipEnabled ? inflate_init(ZLIB_ENCODING_GZIP) : null;
$info['user_data'] = $options['user_data'];
++$multi->responseCount;
$this->initializer = static function (self $response) {
if (null !== $response->info['error']) {
throw new TransportException($response->info['error']);
}
if (null === $response->remaining) {
foreach (self::stream([$response]) as $chunk) {
if ($chunk->isFirst()) {
break;
}
}
}
return null === $response->remaining;
};
}
@ -165,7 +154,7 @@ final class NativeResponse implements ResponseInterface
stream_set_blocking($h, false);
$this->context = $this->resolveRedirect = null;
// Create dechunk and inflate buffers
// Create dechunk buffers
if (isset($this->headers['content-length'])) {
$this->remaining = (int) $this->headers['content-length'][0];
} elseif ('chunked' === ($this->headers['transfer-encoding'][0] ?? null)) {
@ -175,10 +164,6 @@ final class NativeResponse implements ResponseInterface
$this->remaining = -2;
}
if ($this->inflate && 'gzip' !== ($this->headers['content-encoding'][0] ?? null)) {
$this->inflate = null;
}
$this->multi->handlesActivity[$this->id] = [new FirstChunk()];
if ('HEAD' === $context['http']['method'] || \in_array($this->info['http_code'], [204, 304], true)) {
@ -188,7 +173,7 @@ final class NativeResponse implements ResponseInterface
return;
}
$this->multi->openHandles[$this->id] = [$h, $this->buffer, $this->inflate, $this->content, $this->onProgress, &$this->remaining, &$this->info];
$this->multi->openHandles[$this->id] = [$h, $this->buffer, $this->onProgress, &$this->remaining, &$this->info];
}
/**
@ -228,15 +213,15 @@ final class NativeResponse implements ResponseInterface
$multi->handles = [];
}
foreach ($multi->openHandles as $i => [$h, $buffer, $inflate, $content, $onProgress]) {
foreach ($multi->openHandles as $i => [$h, $buffer, $onProgress]) {
$hasActivity = false;
$remaining = &$multi->openHandles[$i][5];
$info = &$multi->openHandles[$i][6];
$remaining = &$multi->openHandles[$i][3];
$info = &$multi->openHandles[$i][4];
$e = null;
// Read incoming buffer and write it to the dechunk one
try {
while ($remaining && '' !== $data = (string) fread($h, 0 > $remaining ? 16372 : $remaining)) {
if ($remaining && '' !== $data = (string) fread($h, 0 > $remaining ? 16372 : $remaining)) {
fwrite($buffer, $data);
$hasActivity = true;
$multi->sleep = false;
@ -264,16 +249,8 @@ final class NativeResponse implements ResponseInterface
rewind($buffer);
ftruncate($buffer, 0);
if (null !== $inflate && false === $data = @inflate_add($inflate, $data)) {
$e = new TransportException('Error while processing content unencoding.');
}
if ('' !== $data && null === $e) {
if (null === $e) {
$multi->handlesActivity[$i][] = $data;
if (null !== $content && \strlen($data) !== fwrite($content, $data)) {
$e = new TransportException(sprintf('Failed writing %d bytes to the response buffer.', \strlen($data)));
}
}
}

View File

@ -39,11 +39,6 @@ trait ResponseTrait
*/
private $initializer;
/**
* @var resource A php://temp stream typically
*/
private $content;
private $info = [
'response_headers' => [],
'http_code' => 0,
@ -54,6 +49,9 @@ trait ResponseTrait
private $handle;
private $id;
private $timeout;
private $inflate;
private $shouldBuffer;
private $content;
private $finalInfo;
private $offset = 0;
private $jsonData;
@ -64,8 +62,7 @@ trait ResponseTrait
public function getStatusCode(): int
{
if ($this->initializer) {
($this->initializer)($this);
$this->initializer = null;
self::initialize($this);
}
return $this->info['http_code'];
@ -77,8 +74,7 @@ trait ResponseTrait
public function getHeaders(bool $throw = true): array
{
if ($this->initializer) {
($this->initializer)($this);
$this->initializer = null;
self::initialize($this);
}
if ($throw) {
@ -94,8 +90,7 @@ trait ResponseTrait
public function getContent(bool $throw = true): string
{
if ($this->initializer) {
($this->initializer)($this);
$this->initializer = null;
self::initialize($this);
}
if ($throw) {
@ -201,6 +196,30 @@ trait ResponseTrait
*/
abstract protected static function select(ClientState $multi, float $timeout): int;
private static function initialize(self $response): void
{
if (null !== $response->info['error']) {
throw new TransportException($response->info['error']);
}
try {
if (($response->initializer)($response)) {
foreach (self::stream([$response]) as $chunk) {
if ($chunk->isFirst()) {
break;
}
}
}
} catch (\Throwable $e) {
// Persist timeouts thrown during initialization
$response->info['error'] = $e->getMessage();
$response->close();
throw $e;
}
$response->initializer = null;
}
private static function addResponseHeaders(array $responseHeaders, array &$info, array &$headers, string &$debug = ''): void
{
foreach ($responseHeaders as $h) {
@ -246,8 +265,7 @@ trait ResponseTrait
private function doDestruct()
{
if ($this->initializer && null === $this->info['error']) {
($this->initializer)($this);
$this->initializer = null;
self::initialize($this);
$this->checkStatusCode();
}
}
@ -299,6 +317,16 @@ trait ResponseTrait
$isTimeout = false;
if (\is_string($chunk = array_shift($multi->handlesActivity[$j]))) {
if (null !== $response->inflate && false === $chunk = @inflate_add($response->inflate, $chunk)) {
$multi->handlesActivity[$j] = [null, new TransportException('Error while processing content unencoding.')];
continue;
}
if ('' !== $chunk && null !== $response->content && \strlen($chunk) !== fwrite($response->content, $chunk)) {
$multi->handlesActivity[$j] = [null, new TransportException('Failed writing %d bytes to the response buffer.', \strlen($chunk))];
continue;
}
$response->offset += \strlen($chunk);
$chunk = new DataChunk($response->offset, $chunk);
} elseif (null === $chunk) {
@ -326,6 +354,9 @@ trait ResponseTrait
$response->logger->info(sprintf('Response: "%s %s"', $info['http_code'], $info['url']));
}
$response->inflate = \extension_loaded('zlib') && $response->inflate && 'gzip' === ($response->headers['content-encoding'][0] ?? null) ? inflate_init(ZLIB_ENCODING_GZIP) : null;
$response->content = $response->shouldBuffer ? fopen('php://temp', 'w+') : null;
yield $response => $chunk;
if ($response->initializer && null === $response->info['error']) {