[HttpClient] fix monitoring timeouts when other streams are active

This commit is contained in:
Nicolas Grekas 2020-05-26 08:30:07 +02:00
parent d87b6665dd
commit d2a53f0bda
8 changed files with 64 additions and 31 deletions

View File

@ -28,8 +28,6 @@ final class NativeClientState extends ClientState
public $responseCount = 0; public $responseCount = 0;
/** @var string[] */ /** @var string[] */
public $dnsCache = []; public $dnsCache = [];
/** @var resource[] */
public $handles = [];
/** @var bool */ /** @var bool */
public $sleep = false; public $sleep = false;

View File

@ -220,11 +220,6 @@ final class NativeResponse implements ResponseInterface
*/ */
private static function perform(ClientState $multi, array &$responses = null): void private static function perform(ClientState $multi, array &$responses = null): void
{ {
// List of native handles for stream_select()
if (null !== $responses) {
$multi->handles = [];
}
foreach ($multi->openHandles as $i => [$h, $buffer, $onProgress]) { foreach ($multi->openHandles as $i => [$h, $buffer, $onProgress]) {
$hasActivity = false; $hasActivity = false;
$remaining = &$multi->openHandles[$i][3]; $remaining = &$multi->openHandles[$i][3];
@ -291,8 +286,6 @@ final class NativeResponse implements ResponseInterface
$multi->handlesActivity[$i][] = $e; $multi->handlesActivity[$i][] = $e;
unset($multi->openHandles[$i]); unset($multi->openHandles[$i]);
$multi->sleep = false; $multi->sleep = false;
} elseif (null !== $responses) {
$multi->handles[] = $h;
} }
} }
@ -307,7 +300,7 @@ final class NativeResponse implements ResponseInterface
} }
} }
if (\count($multi->handles) >= $multi->maxHostConnections) { if (\count($multi->openHandles) >= $multi->maxHostConnections) {
return; return;
} }
@ -318,10 +311,6 @@ final class NativeResponse implements ResponseInterface
$multi->sleep = false; $multi->sleep = false;
self::perform($multi); self::perform($multi);
if (null !== $response->handle) {
$multi->handles[] = $response->handle;
}
break; break;
} }
} }
@ -335,7 +324,8 @@ final class NativeResponse implements ResponseInterface
private static function select(ClientState $multi, float $timeout): int private static function select(ClientState $multi, float $timeout): int
{ {
$_ = []; $_ = [];
$handles = array_column($multi->openHandles, 0);
return (!$multi->sleep = !$multi->sleep) ? -1 : stream_select($multi->handles, $_, $_, (int) $timeout, (int) (1E6 * ($timeout - (int) $timeout))); return (!$multi->sleep = !$multi->sleep) ? -1 : stream_select($handles, $_, $_, (int) $timeout, (int) (1E6 * ($timeout - (int) $timeout)));
} }
} }

View File

@ -316,7 +316,7 @@ trait ResponseTrait
} }
$lastActivity = microtime(true); $lastActivity = microtime(true);
$isTimeout = false; $enlapsedTimeout = 0;
while (true) { while (true) {
$hasActivity = false; $hasActivity = false;
@ -338,7 +338,7 @@ trait ResponseTrait
} elseif (!isset($multi->openHandles[$j])) { } elseif (!isset($multi->openHandles[$j])) {
unset($responses[$j]); unset($responses[$j]);
continue; continue;
} elseif ($isTimeout) { } elseif ($enlapsedTimeout >= $timeoutMax) {
$multi->handlesActivity[$j] = [new ErrorChunk($response->offset, sprintf('Idle timeout reached for "%s".', $response->getInfo('url')))]; $multi->handlesActivity[$j] = [new ErrorChunk($response->offset, sprintf('Idle timeout reached for "%s".', $response->getInfo('url')))];
} else { } else {
continue; continue;
@ -346,7 +346,7 @@ trait ResponseTrait
while ($multi->handlesActivity[$j] ?? false) { while ($multi->handlesActivity[$j] ?? false) {
$hasActivity = true; $hasActivity = true;
$isTimeout = false; $enlapsedTimeout = 0;
if (\is_string($chunk = array_shift($multi->handlesActivity[$j]))) { if (\is_string($chunk = array_shift($multi->handlesActivity[$j]))) {
if (null !== $response->inflate && false === $chunk = @inflate_add($response->inflate, $chunk)) { if (null !== $response->inflate && false === $chunk = @inflate_add($response->inflate, $chunk)) {
@ -379,7 +379,7 @@ trait ResponseTrait
} }
} elseif ($chunk instanceof ErrorChunk) { } elseif ($chunk instanceof ErrorChunk) {
unset($responses[$j]); unset($responses[$j]);
$isTimeout = true; $enlapsedTimeout = $timeoutMax;
} elseif ($chunk instanceof FirstChunk) { } elseif ($chunk instanceof FirstChunk) {
if ($response->logger) { if ($response->logger) {
$info = $response->getInfo(); $info = $response->getInfo();
@ -447,10 +447,11 @@ trait ResponseTrait
continue; continue;
} }
switch (self::select($multi, $timeoutMin)) { if (-1 === self::select($multi, min($timeoutMin, $timeoutMax - $enlapsedTimeout))) {
case -1: usleep(min(500, 1E6 * $timeoutMin)); break; usleep(min(500, 1E6 * $timeoutMin));
case 0: $isTimeout = microtime(true) - $lastActivity > $timeoutMax; break;
} }
$enlapsedTimeout = microtime(true) - $lastActivity;
} }
} }
} }

View File

@ -112,6 +112,15 @@ class CurlHttpClientTest extends HttpClientTestCase
$this->assertSame($expected, $logger->logs); $this->assertSame($expected, $logger->logs);
} }
public function testTimeoutIsNotAFatalError()
{
if ('\\' === \DIRECTORY_SEPARATOR) {
$this->markTestSkipped('Too transient on Windows');
}
parent::testTimeoutIsNotAFatalError();
}
private function getVulcainClient(): CurlHttpClient private function getVulcainClient(): CurlHttpClient
{ {
if (\PHP_VERSION_ID >= 70300 && \PHP_VERSION_ID < 70304) { if (\PHP_VERSION_ID >= 70300 && \PHP_VERSION_ID < 70304) {

View File

@ -72,9 +72,9 @@ abstract class HttpClientTestCase extends BaseHttpClientTestCase
$this->assertSame($response, stream_get_meta_data($stream)['wrapper_data']->getResponse()); $this->assertSame($response, stream_get_meta_data($stream)['wrapper_data']->getResponse());
$this->assertSame(404, $response->getStatusCode()); $this->assertSame(404, $response->getStatusCode());
$this->expectException(ClientException::class);
$response = $client->request('GET', 'http://localhost:8057/404'); $response = $client->request('GET', 'http://localhost:8057/404');
$stream = $response->toStream(); $this->expectException(ClientException::class);
$response->toStream();
} }
public function testNonBlockingStream() public function testNonBlockingStream()
@ -82,6 +82,7 @@ abstract class HttpClientTestCase extends BaseHttpClientTestCase
$client = $this->getHttpClient(__FUNCTION__); $client = $this->getHttpClient(__FUNCTION__);
$response = $client->request('GET', 'http://localhost:8057/timeout-body'); $response = $client->request('GET', 'http://localhost:8057/timeout-body');
$stream = $response->toStream(); $stream = $response->toStream();
usleep(10000);
$this->assertTrue(stream_set_blocking($stream, false)); $this->assertTrue(stream_set_blocking($stream, false));
$this->assertSame('<1>', fread($stream, 8192)); $this->assertSame('<1>', fread($stream, 8192));
@ -99,6 +100,7 @@ abstract class HttpClientTestCase extends BaseHttpClientTestCase
$response = $client->request('GET', 'http://localhost:8057/timeout-body', [ $response = $client->request('GET', 'http://localhost:8057/timeout-body', [
'timeout' => 0.25, 'timeout' => 0.25,
]); ]);
$this->assertSame(200, $response->getStatusCode());
try { try {
$response->getContent(); $response->getContent();

View File

@ -69,6 +69,10 @@ class MockHttpClientTest extends HttpClientTestCase
$this->markTestSkipped("MockHttpClient doesn't unzip"); $this->markTestSkipped("MockHttpClient doesn't unzip");
break; break;
case 'testTimeoutWithActiveConcurrentStream':
$this->markTestSkipped('Real transport required');
break;
case 'testDestruct': case 'testDestruct':
$this->markTestSkipped("MockHttpClient doesn't timeout on destruct"); $this->markTestSkipped("MockHttpClient doesn't timeout on destruct");
break; break;

View File

@ -786,6 +786,30 @@ abstract class HttpClientTestCase extends TestCase
} }
} }
public function testTimeoutWithActiveConcurrentStream()
{
$p1 = TestHttpServer::start(8067);
$p2 = TestHttpServer::start(8077);
$client = $this->getHttpClient(__FUNCTION__);
$streamingResponse = $client->request('GET', 'http://localhost:8067/max-duration');
$blockingResponse = $client->request('GET', 'http://localhost:8077/timeout-body', [
'timeout' => 0.25,
]);
$this->assertSame(200, $streamingResponse->getStatusCode());
$this->assertSame(200, $blockingResponse->getStatusCode());
$this->expectException(TransportExceptionInterface::class);
try {
$blockingResponse->getContent();
} finally {
$p1->stop();
$p2->stop();
}
}
public function testDestruct() public function testDestruct()
{ {
$client = $this->getHttpClient(__FUNCTION__); $client = $this->getHttpClient(__FUNCTION__);

View File

@ -19,23 +19,28 @@ use Symfony\Component\Process\Process;
*/ */
class TestHttpServer class TestHttpServer
{ {
private static $process; private static $process = [];
public static function start() public static function start(int $port = 8057)
{ {
if (self::$process) { if (isset(self::$process[$port])) {
self::$process->stop(); self::$process[$port]->stop();
} else {
register_shutdown_function(static function () use ($port) {
self::$process[$port]->stop();
});
} }
$finder = new PhpExecutableFinder(); $finder = new PhpExecutableFinder();
$process = new Process(array_merge([$finder->find(false)], $finder->findArguments(), ['-dopcache.enable=0', '-dvariables_order=EGPCS', '-S', '127.0.0.1:8057'])); $process = new Process(array_merge([$finder->find(false)], $finder->findArguments(), ['-dopcache.enable=0', '-dvariables_order=EGPCS', '-S', '127.0.0.1:'.$port]));
$process->setWorkingDirectory(__DIR__.'/Fixtures/web'); $process->setWorkingDirectory(__DIR__.'/Fixtures/web');
$process->start(); $process->start();
self::$process[$port] = $process;
do { do {
usleep(50000); usleep(50000);
} while (!@fopen('http://127.0.0.1:8057/', 'r')); } while (!@fopen('http://127.0.0.1:'.$port, 'r'));
self::$process = $process; return $process;
} }
} }