[HttpClient] added support for pausing responses with a new `pause_handler` callable exposed as an info item

This commit is contained in:
Nicolas Grekas 2020-06-07 17:52:03 +02:00 committed by Fabien Potencier
parent d75ef185d1
commit f3cc7c1bad
8 changed files with 163 additions and 22 deletions

View File

@ -5,6 +5,7 @@ CHANGELOG
-----
* added `AsyncDecoratorTrait` to ease processing responses without breaking async
* added support for pausing responses with a new `pause_handler` callable exposed as an info item
5.1.0
-----

View File

@ -26,6 +26,9 @@ final class CurlClientState extends ClientState
public $pushedResponses = [];
/** @var DnsCache */
public $dnsCache;
/** @var float[] */
public $pauseExpiries = [];
public $execCounter = PHP_INT_MIN;
public function __construct()
{

View File

@ -30,6 +30,8 @@ final class NativeClientState extends ClientState
public $dnsCache = [];
/** @var bool */
public $sleep = false;
/** @var int[] */
public $hosts = [];
public function __construct()
{

View File

@ -92,6 +92,28 @@ final class AmpResponse implements ResponseInterface
return self::generateResponse($request, $multi, $id, $info, $headers, $canceller, $options, $onProgress, $handle, $logger);
});
$info['pause_handler'] = static function (float $duration) use ($id, &$delay) {
if (null !== $delay) {
Loop::cancel($delay);
$delay = null;
}
if (0 < $duration) {
$duration += microtime(true);
Loop::disable($id);
$delay = Loop::defer(static function () use ($duration, $id, &$delay) {
if (0 < $duration -= microtime(true)) {
$delay = Loop::delay(ceil(1000 * $duration), static function () use ($id) { Loop::enable($id); });
} else {
$delay = null;
Loop::enable($id);
}
});
} else {
Loop::enable($id);
}
};
$multi->openHandles[$id] = $id;
++$multi->responseCount;
}

View File

@ -89,6 +89,26 @@ final class CurlResponse implements ResponseInterface
return;
}
$execCounter = $multi->execCounter;
$this->info['pause_handler'] = static function (float $duration) use ($ch, $multi, $execCounter) {
if (0 < $duration) {
if ($execCounter === $multi->execCounter) {
$multi->execCounter = !\is_float($execCounter) ? 1 + $execCounter : PHP_INT_MIN;
curl_multi_exec($multi->handle, $execCounter);
}
$lastExpiry = end($multi->pauseExpiries);
$multi->pauseExpiries[(int) $ch] = $duration += microtime(true);
if (false !== $lastExpiry && $lastExpiry > $duration) {
asort($multi->pauseExpiries);
}
curl_pause($ch, CURLPAUSE_ALL);
} else {
unset($multi->pauseExpiries[(int) $ch]);
curl_pause($ch, CURLPAUSE_CONT);
}
};
$this->inflate = !isset($options['normalized_headers']['accept-encoding']);
curl_pause($ch, CURLPAUSE_CONT);
@ -206,7 +226,7 @@ final class CurlResponse implements ResponseInterface
private function close(): void
{
$this->inflate = null;
unset($this->multi->openHandles[$this->id], $this->multi->handlesActivity[$this->id]);
unset($this->multi->pauseExpiries[$this->id], $this->multi->openHandles[$this->id], $this->multi->handlesActivity[$this->id]);
curl_setopt($this->handle, CURLOPT_PRIVATE, '_0');
if (self::$performing) {
@ -261,6 +281,7 @@ final class CurlResponse implements ResponseInterface
try {
self::$performing = true;
++$multi->execCounter;
$active = 0;
while (CURLM_CALL_MULTI_PERFORM === curl_multi_exec($multi->handle, $active));
@ -303,7 +324,29 @@ final class CurlResponse implements ResponseInterface
$timeout = min($timeout, 0.01);
}
return curl_multi_select($multi->handle, $timeout);
if ($multi->pauseExpiries) {
$now = microtime(true);
foreach ($multi->pauseExpiries as $id => $pauseExpiry) {
if ($now < $pauseExpiry) {
$timeout = min($timeout, $pauseExpiry - $now);
break;
}
unset($multi->pauseExpiries[$id]);
curl_pause($multi->openHandles[$id][0], CURLPAUSE_CONT);
}
}
if (0 !== $selected = curl_multi_select($multi->handle, $timeout)) {
return $selected;
}
if ($multi->pauseExpiries && 0 < $timeout -= microtime(true) - $now) {
usleep(1E6 * $timeout);
}
return 0;
}
/**

View File

@ -38,6 +38,7 @@ final class NativeResponse implements ResponseInterface
private $multi;
private $debugBuffer;
private $shouldBuffer;
private $pauseExpiry = 0;
/**
* @internal
@ -65,6 +66,11 @@ final class NativeResponse implements ResponseInterface
$this->initializer = static function (self $response) {
return null === $response->remaining;
};
$pauseExpiry = &$this->pauseExpiry;
$info['pause_handler'] = static function (float $duration) use (&$pauseExpiry) {
$pauseExpiry = 0 < $duration ? microtime(true) + $duration : 0;
};
}
/**
@ -184,7 +190,9 @@ final class NativeResponse implements ResponseInterface
return;
}
$this->multi->openHandles[$this->id] = [$h, $this->buffer, $this->onProgress, &$this->remaining, &$this->info];
$host = parse_url($this->info['redirect_url'] ?? $this->url, PHP_URL_HOST);
$this->multi->openHandles[$this->id] = [&$this->pauseExpiry, $h, $this->buffer, $this->onProgress, &$this->remaining, &$this->info, $host];
$this->multi->hosts[$host] = 1 + ($this->multi->hosts[$host] ?? 0);
}
/**
@ -192,6 +200,9 @@ final class NativeResponse implements ResponseInterface
*/
private function close(): void
{
if (null !== ($host = $this->multi->openHandles[$this->id][6] ?? null) && 0 >= --$this->multi->hosts[$host]) {
unset($this->multi->hosts[$host]);
}
unset($this->multi->openHandles[$this->id], $this->multi->handlesActivity[$this->id]);
$this->handle = $this->buffer = $this->inflate = $this->onProgress = null;
}
@ -221,10 +232,18 @@ final class NativeResponse implements ResponseInterface
*/
private static function perform(ClientState $multi, array &$responses = null): void
{
foreach ($multi->openHandles as $i => [$h, $buffer, $onProgress]) {
foreach ($multi->openHandles as $i => [$pauseExpiry, $h, $buffer, $onProgress]) {
if ($pauseExpiry) {
if (microtime(true) < $pauseExpiry) {
continue;
}
$multi->openHandles[$i][0] = 0;
}
$hasActivity = false;
$remaining = &$multi->openHandles[$i][3];
$info = &$multi->openHandles[$i][4];
$remaining = &$multi->openHandles[$i][4];
$info = &$multi->openHandles[$i][5];
$e = null;
// Read incoming buffer and write it to the dechunk one
@ -285,6 +304,9 @@ final class NativeResponse implements ResponseInterface
$multi->handlesActivity[$i][] = null;
$multi->handlesActivity[$i][] = $e;
if (null !== ($host = $multi->openHandles[$i][6] ?? null) && 0 >= --$multi->hosts[$host]) {
unset($multi->hosts[$host]);
}
unset($multi->openHandles[$i]);
$multi->sleep = false;
}
@ -294,25 +316,22 @@ final class NativeResponse implements ResponseInterface
return;
}
// Create empty activity lists to tell ResponseTrait::stream() we still have pending requests
$maxHosts = $multi->maxHostConnections;
foreach ($responses as $i => $response) {
if (null === $response->remaining && null !== $response->buffer) {
$multi->handlesActivity[$i] = [];
if (null !== $response->remaining || null === $response->buffer) {
continue;
}
}
if (\count($multi->openHandles) >= $multi->maxHostConnections) {
return;
}
// Open the next pending request - this is a blocking operation so we do only one of them
foreach ($responses as $i => $response) {
if (null === $response->remaining && null !== $response->buffer) {
if ($response->pauseExpiry && microtime(true) < $response->pauseExpiry) {
// Create empty open handles to tell we still have pending requests
$multi->openHandles[$i] = [INF, null, null, null];
} elseif ($maxHosts && $maxHosts > ($multi->hosts[parse_url($response->url, PHP_URL_HOST)] ?? 0)) {
// Open the next pending request - this is a blocking operation so we do only one of them
$response->open();
$multi->sleep = false;
self::perform($multi);
break;
$maxHosts = 0;
}
}
}
@ -324,9 +343,32 @@ final class NativeResponse implements ResponseInterface
*/
private static function select(ClientState $multi, float $timeout): int
{
$_ = [];
$handles = array_column($multi->openHandles, 0);
if (!$multi->sleep = !$multi->sleep) {
return -1;
}
return (!$multi->sleep = !$multi->sleep) ? -1 : stream_select($handles, $_, $_, (int) $timeout, (int) (1E6 * ($timeout - (int) $timeout)));
$_ = $handles = [];
$now = null;
foreach ($multi->openHandles as [$pauseExpiry, $h]) {
if (null === $h) {
continue;
}
if ($pauseExpiry && ($now ?? $now = microtime(true)) < $pauseExpiry) {
$timeout = min($timeout, $pauseExpiry - $now);
continue;
}
$handles[] = $h;
}
if (!$handles) {
usleep(1E6 * $timeout);
return 0;
}
return stream_select($handles, $_, $_, (int) $timeout, (int) (1E6 * ($timeout - (int) $timeout)));
}
}

View File

@ -176,6 +176,30 @@ abstract class HttpClientTestCase extends BaseHttpClientTestCase
$this->assertSame($expected, $logger->logs);
}
public function testPause()
{
$client = $this->getHttpClient(__FUNCTION__);
$response = $client->request('GET', 'http://localhost:8057/');
$time = microtime(true);
$response->getInfo('pause_handler')(0.5);
$this->assertSame(200, $response->getStatusCode());
$this->assertTrue(0.5 <= microtime(true) - $time);
$response = $client->request('GET', 'http://localhost:8057/');
$time = microtime(true);
$response->getInfo('pause_handler')(1);
foreach ($client->stream($response, 0.5) as $chunk) {
$this->assertTrue($chunk->isTimeout());
$response->cancel();
}
$response = null;
$this->assertTrue(1.0 > microtime(true) - $time);
$this->assertTrue(0.5 <= microtime(true) - $time);
}
public function testHttp2PushVulcainWithUnusedResponse()
{
$client = $this->getHttpClient(__FUNCTION__);

View File

@ -198,6 +198,10 @@ class MockHttpClientTest extends HttpClientTestCase
$this->markTestSkipped("MockHttpClient doesn't timeout on destruct");
break;
case 'testPause':
$this->markTestSkipped("MockHttpClient doesn't support pauses by default");
break;
case 'testGetRequest':
array_unshift($headers, 'HTTP/1.1 200 OK');
$responses[] = new MockResponse($body, ['response_headers' => $headers]);