[HttpClient] Don't read from the network faster than the CPU can deal with

This commit is contained in:
Nicolas Grekas 2020-01-05 14:56:11 +01:00
parent ccfc4b67b3
commit ac3d77a76a
6 changed files with 77 additions and 100 deletions

View File

@ -113,7 +113,7 @@ final class CurlHttpClient implements HttpClientInterface, LoggerAwareInterface,
$url = implode('', $url); $url = implode('', $url);
if (!isset($options['normalized_headers']['user-agent'])) { if (!isset($options['normalized_headers']['user-agent'])) {
$options['normalized_headers']['user-agent'][] = $options['headers'][] = 'User-Agent: Symfony HttpClient/Curl'; $options['headers'][] = 'User-Agent: Symfony HttpClient/Curl';
} }
$curlopts = [ $curlopts = [
@ -194,8 +194,8 @@ final class CurlHttpClient implements HttpClientInterface, LoggerAwareInterface,
$curlopts[CURLOPT_NOSIGNAL] = true; $curlopts[CURLOPT_NOSIGNAL] = true;
} }
if (!isset($options['normalized_headers']['accept-encoding']) && CURL_VERSION_LIBZ & self::$curlVersion['features']) { if (\extension_loaded('zlib') && !isset($options['normalized_headers']['accept-encoding'])) {
$curlopts[CURLOPT_ENCODING] = 'gzip'; // Expose only one encoding, some servers mess up when more are provided $options['headers'][] = 'Accept-Encoding: gzip'; // Expose only one encoding, some servers mess up when more are provided
} }
foreach ($options['headers'] as $header) { foreach ($options['headers'] as $header) {

View File

@ -77,7 +77,7 @@ final class NativeHttpClient implements HttpClientInterface, LoggerAwareInterfac
$options['headers'][] = 'Content-Type: application/x-www-form-urlencoded'; $options['headers'][] = 'Content-Type: application/x-www-form-urlencoded';
} }
if ($gzipEnabled = \extension_loaded('zlib') && !isset($options['normalized_headers']['accept-encoding'])) { if (\extension_loaded('zlib') && !isset($options['normalized_headers']['accept-encoding'])) {
// gzip is the most widely available algo, no need to deal with deflate // gzip is the most widely available algo, no need to deal with deflate
$options['headers'][] = 'Accept-Encoding: gzip'; $options['headers'][] = 'Accept-Encoding: gzip';
} }
@ -210,7 +210,7 @@ final class NativeHttpClient implements HttpClientInterface, LoggerAwareInterfac
$context = stream_context_create($context, ['notification' => $notification]); $context = stream_context_create($context, ['notification' => $notification]);
self::configureHeadersAndProxy($context, $host, $options['headers'], $proxy, $noProxy); self::configureHeadersAndProxy($context, $host, $options['headers'], $proxy, $noProxy);
return new NativeResponse($this->multi, $context, implode('', $url), $options, $gzipEnabled, $info, $resolveRedirect, $onProgress, $this->logger); return new NativeResponse($this->multi, $context, implode('', $url), $options, $info, $resolveRedirect, $onProgress, $this->logger);
} }
/** /**

View File

@ -52,6 +52,7 @@ final class CurlResponse implements ResponseInterface
$this->id = $id = (int) $ch; $this->id = $id = (int) $ch;
$this->logger = $logger; $this->logger = $logger;
$this->shouldBuffer = $options['buffer'] ?? true;
$this->timeout = $options['timeout'] ?? null; $this->timeout = $options['timeout'] ?? null;
$this->info['http_method'] = $method; $this->info['http_method'] = $method;
$this->info['user_data'] = $options['user_data'] ?? null; $this->info['user_data'] = $options['user_data'] ?? null;
@ -65,30 +66,25 @@ final class CurlResponse implements ResponseInterface
curl_setopt($ch, CURLOPT_PRIVATE, \in_array($method, ['GET', 'HEAD', 'OPTIONS', 'TRACE'], true) && 1.0 < (float) ($options['http_version'] ?? 1.1) ? 'H2' : 'H0'); // H = headers + retry counter curl_setopt($ch, CURLOPT_PRIVATE, \in_array($method, ['GET', 'HEAD', 'OPTIONS', 'TRACE'], true) && 1.0 < (float) ($options['http_version'] ?? 1.1) ? 'H2' : 'H0'); // H = headers + retry counter
} }
if (null === $content = &$this->content) {
$content = ($options['buffer'] ?? true) ? fopen('php://temp', 'w+') : null;
} else {
// Move the pushed response to the activity list
if (ftell($content)) {
rewind($content);
$multi->handlesActivity[$id][] = stream_get_contents($content);
}
$content = ($options['buffer'] ?? true) ? $content : null;
}
curl_setopt($ch, CURLOPT_HEADERFUNCTION, static function ($ch, string $data) use (&$info, &$headers, $options, $multi, $id, &$location, $resolveRedirect, $logger): int { curl_setopt($ch, CURLOPT_HEADERFUNCTION, static function ($ch, string $data) use (&$info, &$headers, $options, $multi, $id, &$location, $resolveRedirect, $logger): int {
return self::parseHeaderLine($ch, $data, $info, $headers, $options, $multi, $id, $location, $resolveRedirect, $logger); return self::parseHeaderLine($ch, $data, $info, $headers, $options, $multi, $id, $location, $resolveRedirect, $logger);
}); });
if (null === $options) { if (null === $options) {
// Pushed response: buffer until requested // Pushed response: buffer until requested
curl_setopt($ch, CURLOPT_WRITEFUNCTION, static function ($ch, string $data) use (&$content): int { curl_setopt($ch, CURLOPT_WRITEFUNCTION, static function ($ch, string $data) use ($multi, $id): int {
return fwrite($content, $data); $multi->handlesActivity[$id][] = $data;
curl_pause($ch, CURLPAUSE_RECV);
return \strlen($data);
}); });
return; return;
} }
$this->inflate = !isset($options['normalized_headers']['accept-encoding']);
curl_pause($ch, CURLPAUSE_CONT);
if ($onProgress = $options['on_progress']) { if ($onProgress = $options['on_progress']) {
$url = isset($info['url']) ? ['url' => $info['url']] : []; $url = isset($info['url']) ? ['url' => $info['url']] : [];
curl_setopt($ch, CURLOPT_NOPROGRESS, false); curl_setopt($ch, CURLOPT_NOPROGRESS, false);
@ -108,33 +104,16 @@ final class CurlResponse implements ResponseInterface
}); });
} }
curl_setopt($ch, CURLOPT_WRITEFUNCTION, static function ($ch, string $data) use (&$content, $multi, $id): int { curl_setopt($ch, CURLOPT_WRITEFUNCTION, static function ($ch, string $data) use ($multi, $id): int {
$multi->handlesActivity[$id][] = $data; $multi->handlesActivity[$id][] = $data;
return null !== $content ? fwrite($content, $data) : \strlen($data); return \strlen($data);
}); });
$this->initializer = static function (self $response) { $this->initializer = static function (self $response) {
if (null !== $response->info['error']) {
throw new TransportException($response->info['error']);
}
$waitFor = curl_getinfo($ch = $response->handle, CURLINFO_PRIVATE); $waitFor = curl_getinfo($ch = $response->handle, CURLINFO_PRIVATE);
if ('H' === $waitFor[0] || 'D' === $waitFor[0]) { return 'H' === $waitFor[0] || 'D' === $waitFor[0];
try {
foreach (self::stream([$response]) as $chunk) {
if ($chunk->isFirst()) {
break;
}
}
} catch (\Throwable $e) {
// Persist timeouts thrown during initialization
$response->info['error'] = $e->getMessage();
$response->close();
throw $e;
}
}
}; };
// Schedule the request in a non-blocking way // Schedule the request in a non-blocking way
@ -221,6 +200,7 @@ final class CurlResponse implements ResponseInterface
*/ */
private function close(): void private function close(): void
{ {
$this->inflate = null;
unset($this->multi->openHandles[$this->id], $this->multi->handlesActivity[$this->id]); unset($this->multi->openHandles[$this->id], $this->multi->handlesActivity[$this->id]);
curl_multi_remove_handle($this->multi->handle, $this->handle); curl_multi_remove_handle($this->multi->handle, $this->handle);
curl_setopt_array($this->handle, [ curl_setopt_array($this->handle, [

View File

@ -93,6 +93,7 @@ class MockResponse implements ResponseInterface
*/ */
protected function close(): void protected function close(): void
{ {
$this->inflate = null;
$this->body = []; $this->body = [];
} }
@ -104,16 +105,9 @@ class MockResponse implements ResponseInterface
$response = new self([]); $response = new self([]);
$response->requestOptions = $options; $response->requestOptions = $options;
$response->id = ++self::$idSequence; $response->id = ++self::$idSequence;
$response->content = ($options['buffer'] ?? true) ? fopen('php://temp', 'w+') : null; $response->shouldBuffer = $options['buffer'] ?? true;
$response->initializer = static function (self $response) { $response->initializer = static function (self $response) {
if (null !== $response->info['error']) { return \is_array($response->body[0] ?? null);
throw new TransportException($response->info['error']);
}
if (\is_array($response->body[0] ?? null)) {
// Consume the first chunk if it's not yielded yet
self::stream([$response])->current();
}
}; };
$response->info['redirect_count'] = 0; $response->info['redirect_count'] = 0;
@ -186,11 +180,6 @@ class MockResponse implements ResponseInterface
} else { } else {
// Data or timeout chunk // Data or timeout chunk
$multi->handlesActivity[$id][] = $chunk; $multi->handlesActivity[$id][] = $chunk;
if (\is_string($chunk) && null !== $response->content) {
// Buffer response body
fwrite($response->content, $chunk);
}
} }
} }
} }

View File

@ -32,14 +32,13 @@ final class NativeResponse implements ResponseInterface
private $onProgress; private $onProgress;
private $remaining; private $remaining;
private $buffer; private $buffer;
private $inflate;
private $multi; private $multi;
private $debugBuffer; private $debugBuffer;
/** /**
* @internal * @internal
*/ */
public function __construct(NativeClientState $multi, $context, string $url, $options, bool $gzipEnabled, array &$info, callable $resolveRedirect, ?callable $onProgress, ?LoggerInterface $logger) public function __construct(NativeClientState $multi, $context, string $url, array $options, array &$info, callable $resolveRedirect, ?callable $onProgress, ?LoggerInterface $logger)
{ {
$this->multi = $multi; $this->multi = $multi;
$this->id = (int) $context; $this->id = (int) $context;
@ -50,27 +49,17 @@ final class NativeResponse implements ResponseInterface
$this->info = &$info; $this->info = &$info;
$this->resolveRedirect = $resolveRedirect; $this->resolveRedirect = $resolveRedirect;
$this->onProgress = $onProgress; $this->onProgress = $onProgress;
$this->content = $options['buffer'] ? fopen('php://temp', 'w+') : null; $this->inflate = !isset($options['normalized_headers']['accept-encoding']);
$this->shouldBuffer = $options['buffer'] ?? true;
// Temporary resources to dechunk/inflate the response stream // Temporary resource to dechunk the response stream
$this->buffer = fopen('php://temp', 'w+'); $this->buffer = fopen('php://temp', 'w+');
$this->inflate = $gzipEnabled ? inflate_init(ZLIB_ENCODING_GZIP) : null;
$info['user_data'] = $options['user_data']; $info['user_data'] = $options['user_data'];
++$multi->responseCount; ++$multi->responseCount;
$this->initializer = static function (self $response) { $this->initializer = static function (self $response) {
if (null !== $response->info['error']) { return null === $response->remaining;
throw new TransportException($response->info['error']);
}
if (null === $response->remaining) {
foreach (self::stream([$response]) as $chunk) {
if ($chunk->isFirst()) {
break;
}
}
}
}; };
} }
@ -165,7 +154,7 @@ final class NativeResponse implements ResponseInterface
stream_set_blocking($h, false); stream_set_blocking($h, false);
$this->context = $this->resolveRedirect = null; $this->context = $this->resolveRedirect = null;
// Create dechunk and inflate buffers // Create dechunk buffers
if (isset($this->headers['content-length'])) { if (isset($this->headers['content-length'])) {
$this->remaining = (int) $this->headers['content-length'][0]; $this->remaining = (int) $this->headers['content-length'][0];
} elseif ('chunked' === ($this->headers['transfer-encoding'][0] ?? null)) { } elseif ('chunked' === ($this->headers['transfer-encoding'][0] ?? null)) {
@ -175,10 +164,6 @@ final class NativeResponse implements ResponseInterface
$this->remaining = -2; $this->remaining = -2;
} }
if ($this->inflate && 'gzip' !== ($this->headers['content-encoding'][0] ?? null)) {
$this->inflate = null;
}
$this->multi->handlesActivity[$this->id] = [new FirstChunk()]; $this->multi->handlesActivity[$this->id] = [new FirstChunk()];
if ('HEAD' === $context['http']['method'] || \in_array($this->info['http_code'], [204, 304], true)) { if ('HEAD' === $context['http']['method'] || \in_array($this->info['http_code'], [204, 304], true)) {
@ -188,7 +173,7 @@ final class NativeResponse implements ResponseInterface
return; return;
} }
$this->multi->openHandles[$this->id] = [$h, $this->buffer, $this->inflate, $this->content, $this->onProgress, &$this->remaining, &$this->info]; $this->multi->openHandles[$this->id] = [$h, $this->buffer, $this->onProgress, &$this->remaining, &$this->info];
} }
/** /**
@ -228,15 +213,15 @@ final class NativeResponse implements ResponseInterface
$multi->handles = []; $multi->handles = [];
} }
foreach ($multi->openHandles as $i => [$h, $buffer, $inflate, $content, $onProgress]) { foreach ($multi->openHandles as $i => [$h, $buffer, $onProgress]) {
$hasActivity = false; $hasActivity = false;
$remaining = &$multi->openHandles[$i][5]; $remaining = &$multi->openHandles[$i][3];
$info = &$multi->openHandles[$i][6]; $info = &$multi->openHandles[$i][4];
$e = null; $e = null;
// Read incoming buffer and write it to the dechunk one // Read incoming buffer and write it to the dechunk one
try { try {
while ($remaining && '' !== $data = (string) fread($h, 0 > $remaining ? 16372 : $remaining)) { if ($remaining && '' !== $data = (string) fread($h, 0 > $remaining ? 16372 : $remaining)) {
fwrite($buffer, $data); fwrite($buffer, $data);
$hasActivity = true; $hasActivity = true;
$multi->sleep = false; $multi->sleep = false;
@ -264,16 +249,8 @@ final class NativeResponse implements ResponseInterface
rewind($buffer); rewind($buffer);
ftruncate($buffer, 0); ftruncate($buffer, 0);
if (null !== $inflate && false === $data = @inflate_add($inflate, $data)) { if (null === $e) {
$e = new TransportException('Error while processing content unencoding.');
}
if ('' !== $data && null === $e) {
$multi->handlesActivity[$i][] = $data; $multi->handlesActivity[$i][] = $data;
if (null !== $content && \strlen($data) !== fwrite($content, $data)) {
$e = new TransportException(sprintf('Failed writing %d bytes to the response buffer.', \strlen($data)));
}
} }
} }

View File

@ -39,11 +39,6 @@ trait ResponseTrait
*/ */
private $initializer; private $initializer;
/**
* @var resource A php://temp stream typically
*/
private $content;
private $info = [ private $info = [
'response_headers' => [], 'response_headers' => [],
'http_code' => 0, 'http_code' => 0,
@ -54,6 +49,9 @@ trait ResponseTrait
private $handle; private $handle;
private $id; private $id;
private $timeout; private $timeout;
private $inflate;
private $shouldBuffer;
private $content;
private $finalInfo; private $finalInfo;
private $offset = 0; private $offset = 0;
private $jsonData; private $jsonData;
@ -64,8 +62,7 @@ trait ResponseTrait
public function getStatusCode(): int public function getStatusCode(): int
{ {
if ($this->initializer) { if ($this->initializer) {
($this->initializer)($this); self::initialize($this);
$this->initializer = null;
} }
return $this->info['http_code']; return $this->info['http_code'];
@ -77,8 +74,7 @@ trait ResponseTrait
public function getHeaders(bool $throw = true): array public function getHeaders(bool $throw = true): array
{ {
if ($this->initializer) { if ($this->initializer) {
($this->initializer)($this); self::initialize($this);
$this->initializer = null;
} }
if ($throw) { if ($throw) {
@ -94,8 +90,7 @@ trait ResponseTrait
public function getContent(bool $throw = true): string public function getContent(bool $throw = true): string
{ {
if ($this->initializer) { if ($this->initializer) {
($this->initializer)($this); self::initialize($this);
$this->initializer = null;
} }
if ($throw) { if ($throw) {
@ -201,6 +196,30 @@ trait ResponseTrait
*/ */
abstract protected static function select(ClientState $multi, float $timeout): int; abstract protected static function select(ClientState $multi, float $timeout): int;
private static function initialize(self $response): void
{
if (null !== $response->info['error']) {
throw new TransportException($response->info['error']);
}
try {
if (($response->initializer)($response)) {
foreach (self::stream([$response]) as $chunk) {
if ($chunk->isFirst()) {
break;
}
}
}
} catch (\Throwable $e) {
// Persist timeouts thrown during initialization
$response->info['error'] = $e->getMessage();
$response->close();
throw $e;
}
$response->initializer = null;
}
private static function addResponseHeaders(array $responseHeaders, array &$info, array &$headers, string &$debug = ''): void private static function addResponseHeaders(array $responseHeaders, array &$info, array &$headers, string &$debug = ''): void
{ {
foreach ($responseHeaders as $h) { foreach ($responseHeaders as $h) {
@ -246,8 +265,7 @@ trait ResponseTrait
private function doDestruct() private function doDestruct()
{ {
if ($this->initializer && null === $this->info['error']) { if ($this->initializer && null === $this->info['error']) {
($this->initializer)($this); self::initialize($this);
$this->initializer = null;
$this->checkStatusCode(); $this->checkStatusCode();
} }
} }
@ -299,6 +317,16 @@ trait ResponseTrait
$isTimeout = false; $isTimeout = false;
if (\is_string($chunk = array_shift($multi->handlesActivity[$j]))) { if (\is_string($chunk = array_shift($multi->handlesActivity[$j]))) {
if (null !== $response->inflate && false === $chunk = @inflate_add($response->inflate, $chunk)) {
$multi->handlesActivity[$j] = [null, new TransportException('Error while processing content unencoding.')];
continue;
}
if ('' !== $chunk && null !== $response->content && \strlen($chunk) !== fwrite($response->content, $chunk)) {
$multi->handlesActivity[$j] = [null, new TransportException('Failed writing %d bytes to the response buffer.', \strlen($chunk))];
continue;
}
$response->offset += \strlen($chunk); $response->offset += \strlen($chunk);
$chunk = new DataChunk($response->offset, $chunk); $chunk = new DataChunk($response->offset, $chunk);
} elseif (null === $chunk) { } elseif (null === $chunk) {
@ -326,6 +354,9 @@ trait ResponseTrait
$response->logger->info(sprintf('Response: "%s %s"', $info['http_code'], $info['url'])); $response->logger->info(sprintf('Response: "%s %s"', $info['http_code'], $info['url']));
} }
$response->inflate = \extension_loaded('zlib') && $response->inflate && 'gzip' === ($response->headers['content-encoding'][0] ?? null) ? inflate_init(ZLIB_ENCODING_GZIP) : null;
$response->content = $response->shouldBuffer ? fopen('php://temp', 'w+') : null;
yield $response => $chunk; yield $response => $chunk;
if ($response->initializer && null === $response->info['error']) { if ($response->initializer && null === $response->info['error']) {