[HttpClient] added support for pausing responses with a new pause_handler
callable exposed as an info item
This commit is contained in:
parent
d75ef185d1
commit
f3cc7c1bad
@ -5,6 +5,7 @@ CHANGELOG
|
|||||||
-----
|
-----
|
||||||
|
|
||||||
* added `AsyncDecoratorTrait` to ease processing responses without breaking async
|
* added `AsyncDecoratorTrait` to ease processing responses without breaking async
|
||||||
|
* added support for pausing responses with a new `pause_handler` callable exposed as an info item
|
||||||
|
|
||||||
5.1.0
|
5.1.0
|
||||||
-----
|
-----
|
||||||
|
@ -26,6 +26,9 @@ final class CurlClientState extends ClientState
|
|||||||
public $pushedResponses = [];
|
public $pushedResponses = [];
|
||||||
/** @var DnsCache */
|
/** @var DnsCache */
|
||||||
public $dnsCache;
|
public $dnsCache;
|
||||||
|
/** @var float[] */
|
||||||
|
public $pauseExpiries = [];
|
||||||
|
public $execCounter = PHP_INT_MIN;
|
||||||
|
|
||||||
public function __construct()
|
public function __construct()
|
||||||
{
|
{
|
||||||
|
@ -30,6 +30,8 @@ final class NativeClientState extends ClientState
|
|||||||
public $dnsCache = [];
|
public $dnsCache = [];
|
||||||
/** @var bool */
|
/** @var bool */
|
||||||
public $sleep = false;
|
public $sleep = false;
|
||||||
|
/** @var int[] */
|
||||||
|
public $hosts = [];
|
||||||
|
|
||||||
public function __construct()
|
public function __construct()
|
||||||
{
|
{
|
||||||
|
@ -92,6 +92,28 @@ final class AmpResponse implements ResponseInterface
|
|||||||
return self::generateResponse($request, $multi, $id, $info, $headers, $canceller, $options, $onProgress, $handle, $logger);
|
return self::generateResponse($request, $multi, $id, $info, $headers, $canceller, $options, $onProgress, $handle, $logger);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
$info['pause_handler'] = static function (float $duration) use ($id, &$delay) {
|
||||||
|
if (null !== $delay) {
|
||||||
|
Loop::cancel($delay);
|
||||||
|
$delay = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (0 < $duration) {
|
||||||
|
$duration += microtime(true);
|
||||||
|
Loop::disable($id);
|
||||||
|
$delay = Loop::defer(static function () use ($duration, $id, &$delay) {
|
||||||
|
if (0 < $duration -= microtime(true)) {
|
||||||
|
$delay = Loop::delay(ceil(1000 * $duration), static function () use ($id) { Loop::enable($id); });
|
||||||
|
} else {
|
||||||
|
$delay = null;
|
||||||
|
Loop::enable($id);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
Loop::enable($id);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
$multi->openHandles[$id] = $id;
|
$multi->openHandles[$id] = $id;
|
||||||
++$multi->responseCount;
|
++$multi->responseCount;
|
||||||
}
|
}
|
||||||
|
@ -89,6 +89,26 @@ final class CurlResponse implements ResponseInterface
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$execCounter = $multi->execCounter;
|
||||||
|
$this->info['pause_handler'] = static function (float $duration) use ($ch, $multi, $execCounter) {
|
||||||
|
if (0 < $duration) {
|
||||||
|
if ($execCounter === $multi->execCounter) {
|
||||||
|
$multi->execCounter = !\is_float($execCounter) ? 1 + $execCounter : PHP_INT_MIN;
|
||||||
|
curl_multi_exec($multi->handle, $execCounter);
|
||||||
|
}
|
||||||
|
|
||||||
|
$lastExpiry = end($multi->pauseExpiries);
|
||||||
|
$multi->pauseExpiries[(int) $ch] = $duration += microtime(true);
|
||||||
|
if (false !== $lastExpiry && $lastExpiry > $duration) {
|
||||||
|
asort($multi->pauseExpiries);
|
||||||
|
}
|
||||||
|
curl_pause($ch, CURLPAUSE_ALL);
|
||||||
|
} else {
|
||||||
|
unset($multi->pauseExpiries[(int) $ch]);
|
||||||
|
curl_pause($ch, CURLPAUSE_CONT);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
$this->inflate = !isset($options['normalized_headers']['accept-encoding']);
|
$this->inflate = !isset($options['normalized_headers']['accept-encoding']);
|
||||||
curl_pause($ch, CURLPAUSE_CONT);
|
curl_pause($ch, CURLPAUSE_CONT);
|
||||||
|
|
||||||
@ -206,7 +226,7 @@ final class CurlResponse implements ResponseInterface
|
|||||||
private function close(): void
|
private function close(): void
|
||||||
{
|
{
|
||||||
$this->inflate = null;
|
$this->inflate = null;
|
||||||
unset($this->multi->openHandles[$this->id], $this->multi->handlesActivity[$this->id]);
|
unset($this->multi->pauseExpiries[$this->id], $this->multi->openHandles[$this->id], $this->multi->handlesActivity[$this->id]);
|
||||||
curl_setopt($this->handle, CURLOPT_PRIVATE, '_0');
|
curl_setopt($this->handle, CURLOPT_PRIVATE, '_0');
|
||||||
|
|
||||||
if (self::$performing) {
|
if (self::$performing) {
|
||||||
@ -261,6 +281,7 @@ final class CurlResponse implements ResponseInterface
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
self::$performing = true;
|
self::$performing = true;
|
||||||
|
++$multi->execCounter;
|
||||||
$active = 0;
|
$active = 0;
|
||||||
while (CURLM_CALL_MULTI_PERFORM === curl_multi_exec($multi->handle, $active));
|
while (CURLM_CALL_MULTI_PERFORM === curl_multi_exec($multi->handle, $active));
|
||||||
|
|
||||||
@ -303,7 +324,29 @@ final class CurlResponse implements ResponseInterface
|
|||||||
$timeout = min($timeout, 0.01);
|
$timeout = min($timeout, 0.01);
|
||||||
}
|
}
|
||||||
|
|
||||||
return curl_multi_select($multi->handle, $timeout);
|
if ($multi->pauseExpiries) {
|
||||||
|
$now = microtime(true);
|
||||||
|
|
||||||
|
foreach ($multi->pauseExpiries as $id => $pauseExpiry) {
|
||||||
|
if ($now < $pauseExpiry) {
|
||||||
|
$timeout = min($timeout, $pauseExpiry - $now);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
unset($multi->pauseExpiries[$id]);
|
||||||
|
curl_pause($multi->openHandles[$id][0], CURLPAUSE_CONT);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (0 !== $selected = curl_multi_select($multi->handle, $timeout)) {
|
||||||
|
return $selected;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($multi->pauseExpiries && 0 < $timeout -= microtime(true) - $now) {
|
||||||
|
usleep(1E6 * $timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -38,6 +38,7 @@ final class NativeResponse implements ResponseInterface
|
|||||||
private $multi;
|
private $multi;
|
||||||
private $debugBuffer;
|
private $debugBuffer;
|
||||||
private $shouldBuffer;
|
private $shouldBuffer;
|
||||||
|
private $pauseExpiry = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @internal
|
* @internal
|
||||||
@ -65,6 +66,11 @@ final class NativeResponse implements ResponseInterface
|
|||||||
$this->initializer = static function (self $response) {
|
$this->initializer = static function (self $response) {
|
||||||
return null === $response->remaining;
|
return null === $response->remaining;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
$pauseExpiry = &$this->pauseExpiry;
|
||||||
|
$info['pause_handler'] = static function (float $duration) use (&$pauseExpiry) {
|
||||||
|
$pauseExpiry = 0 < $duration ? microtime(true) + $duration : 0;
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -184,7 +190,9 @@ final class NativeResponse implements ResponseInterface
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->multi->openHandles[$this->id] = [$h, $this->buffer, $this->onProgress, &$this->remaining, &$this->info];
|
$host = parse_url($this->info['redirect_url'] ?? $this->url, PHP_URL_HOST);
|
||||||
|
$this->multi->openHandles[$this->id] = [&$this->pauseExpiry, $h, $this->buffer, $this->onProgress, &$this->remaining, &$this->info, $host];
|
||||||
|
$this->multi->hosts[$host] = 1 + ($this->multi->hosts[$host] ?? 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -192,6 +200,9 @@ final class NativeResponse implements ResponseInterface
|
|||||||
*/
|
*/
|
||||||
private function close(): void
|
private function close(): void
|
||||||
{
|
{
|
||||||
|
if (null !== ($host = $this->multi->openHandles[$this->id][6] ?? null) && 0 >= --$this->multi->hosts[$host]) {
|
||||||
|
unset($this->multi->hosts[$host]);
|
||||||
|
}
|
||||||
unset($this->multi->openHandles[$this->id], $this->multi->handlesActivity[$this->id]);
|
unset($this->multi->openHandles[$this->id], $this->multi->handlesActivity[$this->id]);
|
||||||
$this->handle = $this->buffer = $this->inflate = $this->onProgress = null;
|
$this->handle = $this->buffer = $this->inflate = $this->onProgress = null;
|
||||||
}
|
}
|
||||||
@ -221,10 +232,18 @@ final class NativeResponse implements ResponseInterface
|
|||||||
*/
|
*/
|
||||||
private static function perform(ClientState $multi, array &$responses = null): void
|
private static function perform(ClientState $multi, array &$responses = null): void
|
||||||
{
|
{
|
||||||
foreach ($multi->openHandles as $i => [$h, $buffer, $onProgress]) {
|
foreach ($multi->openHandles as $i => [$pauseExpiry, $h, $buffer, $onProgress]) {
|
||||||
|
if ($pauseExpiry) {
|
||||||
|
if (microtime(true) < $pauseExpiry) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
$multi->openHandles[$i][0] = 0;
|
||||||
|
}
|
||||||
|
|
||||||
$hasActivity = false;
|
$hasActivity = false;
|
||||||
$remaining = &$multi->openHandles[$i][3];
|
$remaining = &$multi->openHandles[$i][4];
|
||||||
$info = &$multi->openHandles[$i][4];
|
$info = &$multi->openHandles[$i][5];
|
||||||
$e = null;
|
$e = null;
|
||||||
|
|
||||||
// Read incoming buffer and write it to the dechunk one
|
// Read incoming buffer and write it to the dechunk one
|
||||||
@ -285,6 +304,9 @@ final class NativeResponse implements ResponseInterface
|
|||||||
|
|
||||||
$multi->handlesActivity[$i][] = null;
|
$multi->handlesActivity[$i][] = null;
|
||||||
$multi->handlesActivity[$i][] = $e;
|
$multi->handlesActivity[$i][] = $e;
|
||||||
|
if (null !== ($host = $multi->openHandles[$i][6] ?? null) && 0 >= --$multi->hosts[$host]) {
|
||||||
|
unset($multi->hosts[$host]);
|
||||||
|
}
|
||||||
unset($multi->openHandles[$i]);
|
unset($multi->openHandles[$i]);
|
||||||
$multi->sleep = false;
|
$multi->sleep = false;
|
||||||
}
|
}
|
||||||
@ -294,25 +316,22 @@ final class NativeResponse implements ResponseInterface
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create empty activity lists to tell ResponseTrait::stream() we still have pending requests
|
$maxHosts = $multi->maxHostConnections;
|
||||||
|
|
||||||
foreach ($responses as $i => $response) {
|
foreach ($responses as $i => $response) {
|
||||||
if (null === $response->remaining && null !== $response->buffer) {
|
if (null !== $response->remaining || null === $response->buffer) {
|
||||||
$multi->handlesActivity[$i] = [];
|
continue;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (\count($multi->openHandles) >= $multi->maxHostConnections) {
|
if ($response->pauseExpiry && microtime(true) < $response->pauseExpiry) {
|
||||||
return;
|
// Create empty open handles to tell we still have pending requests
|
||||||
}
|
$multi->openHandles[$i] = [INF, null, null, null];
|
||||||
|
} elseif ($maxHosts && $maxHosts > ($multi->hosts[parse_url($response->url, PHP_URL_HOST)] ?? 0)) {
|
||||||
// Open the next pending request - this is a blocking operation so we do only one of them
|
// Open the next pending request - this is a blocking operation so we do only one of them
|
||||||
foreach ($responses as $i => $response) {
|
|
||||||
if (null === $response->remaining && null !== $response->buffer) {
|
|
||||||
$response->open();
|
$response->open();
|
||||||
$multi->sleep = false;
|
$multi->sleep = false;
|
||||||
self::perform($multi);
|
self::perform($multi);
|
||||||
|
$maxHosts = 0;
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -324,9 +343,32 @@ final class NativeResponse implements ResponseInterface
|
|||||||
*/
|
*/
|
||||||
private static function select(ClientState $multi, float $timeout): int
|
private static function select(ClientState $multi, float $timeout): int
|
||||||
{
|
{
|
||||||
$_ = [];
|
if (!$multi->sleep = !$multi->sleep) {
|
||||||
$handles = array_column($multi->openHandles, 0);
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
return (!$multi->sleep = !$multi->sleep) ? -1 : stream_select($handles, $_, $_, (int) $timeout, (int) (1E6 * ($timeout - (int) $timeout)));
|
$_ = $handles = [];
|
||||||
|
$now = null;
|
||||||
|
|
||||||
|
foreach ($multi->openHandles as [$pauseExpiry, $h]) {
|
||||||
|
if (null === $h) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($pauseExpiry && ($now ?? $now = microtime(true)) < $pauseExpiry) {
|
||||||
|
$timeout = min($timeout, $pauseExpiry - $now);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
$handles[] = $h;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!$handles) {
|
||||||
|
usleep(1E6 * $timeout);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return stream_select($handles, $_, $_, (int) $timeout, (int) (1E6 * ($timeout - (int) $timeout)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -176,6 +176,30 @@ abstract class HttpClientTestCase extends BaseHttpClientTestCase
|
|||||||
$this->assertSame($expected, $logger->logs);
|
$this->assertSame($expected, $logger->logs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testPause()
|
||||||
|
{
|
||||||
|
$client = $this->getHttpClient(__FUNCTION__);
|
||||||
|
$response = $client->request('GET', 'http://localhost:8057/');
|
||||||
|
|
||||||
|
$time = microtime(true);
|
||||||
|
$response->getInfo('pause_handler')(0.5);
|
||||||
|
$this->assertSame(200, $response->getStatusCode());
|
||||||
|
$this->assertTrue(0.5 <= microtime(true) - $time);
|
||||||
|
|
||||||
|
$response = $client->request('GET', 'http://localhost:8057/');
|
||||||
|
|
||||||
|
$time = microtime(true);
|
||||||
|
$response->getInfo('pause_handler')(1);
|
||||||
|
|
||||||
|
foreach ($client->stream($response, 0.5) as $chunk) {
|
||||||
|
$this->assertTrue($chunk->isTimeout());
|
||||||
|
$response->cancel();
|
||||||
|
}
|
||||||
|
$response = null;
|
||||||
|
$this->assertTrue(1.0 > microtime(true) - $time);
|
||||||
|
$this->assertTrue(0.5 <= microtime(true) - $time);
|
||||||
|
}
|
||||||
|
|
||||||
public function testHttp2PushVulcainWithUnusedResponse()
|
public function testHttp2PushVulcainWithUnusedResponse()
|
||||||
{
|
{
|
||||||
$client = $this->getHttpClient(__FUNCTION__);
|
$client = $this->getHttpClient(__FUNCTION__);
|
||||||
|
@ -198,6 +198,10 @@ class MockHttpClientTest extends HttpClientTestCase
|
|||||||
$this->markTestSkipped("MockHttpClient doesn't timeout on destruct");
|
$this->markTestSkipped("MockHttpClient doesn't timeout on destruct");
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case 'testPause':
|
||||||
|
$this->markTestSkipped("MockHttpClient doesn't support pauses by default");
|
||||||
|
break;
|
||||||
|
|
||||||
case 'testGetRequest':
|
case 'testGetRequest':
|
||||||
array_unshift($headers, 'HTTP/1.1 200 OK');
|
array_unshift($headers, 'HTTP/1.1 200 OK');
|
||||||
$responses[] = new MockResponse($body, ['response_headers' => $headers]);
|
$responses[] = new MockResponse($body, ['response_headers' => $headers]);
|
||||||
|
Reference in New Issue
Block a user