diff --git a/src/Symfony/Component/HttpClient/CHANGELOG.md b/src/Symfony/Component/HttpClient/CHANGELOG.md index e4812500c7..1251c607ac 100644 --- a/src/Symfony/Component/HttpClient/CHANGELOG.md +++ b/src/Symfony/Component/HttpClient/CHANGELOG.md @@ -5,6 +5,7 @@ CHANGELOG ----- * added `AsyncDecoratorTrait` to ease processing responses without breaking async + * added support for pausing responses with a new `pause_handler` callable exposed as an info item 5.1.0 ----- diff --git a/src/Symfony/Component/HttpClient/Internal/CurlClientState.php b/src/Symfony/Component/HttpClient/Internal/CurlClientState.php index 1c2e6c8eed..7e612da96c 100644 --- a/src/Symfony/Component/HttpClient/Internal/CurlClientState.php +++ b/src/Symfony/Component/HttpClient/Internal/CurlClientState.php @@ -26,6 +26,9 @@ final class CurlClientState extends ClientState public $pushedResponses = []; /** @var DnsCache */ public $dnsCache; + /** @var float[] */ + public $pauseExpiries = []; + public $execCounter = PHP_INT_MIN; public function __construct() { diff --git a/src/Symfony/Component/HttpClient/Internal/NativeClientState.php b/src/Symfony/Component/HttpClient/Internal/NativeClientState.php index 2a47dbcca0..658fdcd97e 100644 --- a/src/Symfony/Component/HttpClient/Internal/NativeClientState.php +++ b/src/Symfony/Component/HttpClient/Internal/NativeClientState.php @@ -30,6 +30,8 @@ final class NativeClientState extends ClientState public $dnsCache = []; /** @var bool */ public $sleep = false; + /** @var int[] */ + public $hosts = []; public function __construct() { diff --git a/src/Symfony/Component/HttpClient/Response/AmpResponse.php b/src/Symfony/Component/HttpClient/Response/AmpResponse.php index aea245d77d..6ee3d47d5b 100644 --- a/src/Symfony/Component/HttpClient/Response/AmpResponse.php +++ b/src/Symfony/Component/HttpClient/Response/AmpResponse.php @@ -92,6 +92,28 @@ final class AmpResponse implements ResponseInterface return self::generateResponse($request, $multi, $id, $info, $headers, $canceller, $options, $onProgress, $handle, $logger); }); + $info['pause_handler'] = static function (float $duration) use ($id, &$delay) { + if (null !== $delay) { + Loop::cancel($delay); + $delay = null; + } + + if (0 < $duration) { + $duration += microtime(true); + Loop::disable($id); + $delay = Loop::defer(static function () use ($duration, $id, &$delay) { + if (0 < $duration -= microtime(true)) { + $delay = Loop::delay(ceil(1000 * $duration), static function () use ($id) { Loop::enable($id); }); + } else { + $delay = null; + Loop::enable($id); + } + }); + } else { + Loop::enable($id); + } + }; + $multi->openHandles[$id] = $id; ++$multi->responseCount; } diff --git a/src/Symfony/Component/HttpClient/Response/CurlResponse.php b/src/Symfony/Component/HttpClient/Response/CurlResponse.php index ff273de995..a4cbc5d2d3 100644 --- a/src/Symfony/Component/HttpClient/Response/CurlResponse.php +++ b/src/Symfony/Component/HttpClient/Response/CurlResponse.php @@ -89,6 +89,26 @@ final class CurlResponse implements ResponseInterface return; } + $execCounter = $multi->execCounter; + $this->info['pause_handler'] = static function (float $duration) use ($ch, $multi, $execCounter) { + if (0 < $duration) { + if ($execCounter === $multi->execCounter) { + $multi->execCounter = !\is_float($execCounter) ? 1 + $execCounter : PHP_INT_MIN; + curl_multi_exec($multi->handle, $execCounter); + } + + $lastExpiry = end($multi->pauseExpiries); + $multi->pauseExpiries[(int) $ch] = $duration += microtime(true); + if (false !== $lastExpiry && $lastExpiry > $duration) { + asort($multi->pauseExpiries); + } + curl_pause($ch, CURLPAUSE_ALL); + } else { + unset($multi->pauseExpiries[(int) $ch]); + curl_pause($ch, CURLPAUSE_CONT); + } + }; + $this->inflate = !isset($options['normalized_headers']['accept-encoding']); curl_pause($ch, CURLPAUSE_CONT); @@ -206,7 +226,7 @@ final class CurlResponse implements ResponseInterface private function close(): void { $this->inflate = null; - unset($this->multi->openHandles[$this->id], $this->multi->handlesActivity[$this->id]); + unset($this->multi->pauseExpiries[$this->id], $this->multi->openHandles[$this->id], $this->multi->handlesActivity[$this->id]); curl_setopt($this->handle, CURLOPT_PRIVATE, '_0'); if (self::$performing) { @@ -261,6 +281,7 @@ final class CurlResponse implements ResponseInterface try { self::$performing = true; + ++$multi->execCounter; $active = 0; while (CURLM_CALL_MULTI_PERFORM === curl_multi_exec($multi->handle, $active)); @@ -303,7 +324,29 @@ final class CurlResponse implements ResponseInterface $timeout = min($timeout, 0.01); } - return curl_multi_select($multi->handle, $timeout); + if ($multi->pauseExpiries) { + $now = microtime(true); + + foreach ($multi->pauseExpiries as $id => $pauseExpiry) { + if ($now < $pauseExpiry) { + $timeout = min($timeout, $pauseExpiry - $now); + break; + } + + unset($multi->pauseExpiries[$id]); + curl_pause($multi->openHandles[$id][0], CURLPAUSE_CONT); + } + } + + if (0 !== $selected = curl_multi_select($multi->handle, $timeout)) { + return $selected; + } + + if ($multi->pauseExpiries && 0 < $timeout -= microtime(true) - $now) { + usleep(1E6 * $timeout); + } + + return 0; } /** diff --git a/src/Symfony/Component/HttpClient/Response/NativeResponse.php b/src/Symfony/Component/HttpClient/Response/NativeResponse.php index 8c98a2ffbb..226274d135 100644 --- a/src/Symfony/Component/HttpClient/Response/NativeResponse.php +++ b/src/Symfony/Component/HttpClient/Response/NativeResponse.php @@ -38,6 +38,7 @@ final class NativeResponse implements ResponseInterface private $multi; private $debugBuffer; private $shouldBuffer; + private $pauseExpiry = 0; /** * @internal @@ -65,6 +66,11 @@ final class NativeResponse implements ResponseInterface $this->initializer = static function (self $response) { return null === $response->remaining; }; + + $pauseExpiry = &$this->pauseExpiry; + $info['pause_handler'] = static function (float $duration) use (&$pauseExpiry) { + $pauseExpiry = 0 < $duration ? microtime(true) + $duration : 0; + }; } /** @@ -184,7 +190,9 @@ final class NativeResponse implements ResponseInterface return; } - $this->multi->openHandles[$this->id] = [$h, $this->buffer, $this->onProgress, &$this->remaining, &$this->info]; + $host = parse_url($this->info['redirect_url'] ?? $this->url, PHP_URL_HOST); + $this->multi->openHandles[$this->id] = [&$this->pauseExpiry, $h, $this->buffer, $this->onProgress, &$this->remaining, &$this->info, $host]; + $this->multi->hosts[$host] = 1 + ($this->multi->hosts[$host] ?? 0); } /** @@ -192,6 +200,9 @@ final class NativeResponse implements ResponseInterface */ private function close(): void { + if (null !== ($host = $this->multi->openHandles[$this->id][6] ?? null) && 0 >= --$this->multi->hosts[$host]) { + unset($this->multi->hosts[$host]); + } unset($this->multi->openHandles[$this->id], $this->multi->handlesActivity[$this->id]); $this->handle = $this->buffer = $this->inflate = $this->onProgress = null; } @@ -221,10 +232,18 @@ final class NativeResponse implements ResponseInterface */ private static function perform(ClientState $multi, array &$responses = null): void { - foreach ($multi->openHandles as $i => [$h, $buffer, $onProgress]) { + foreach ($multi->openHandles as $i => [$pauseExpiry, $h, $buffer, $onProgress]) { + if ($pauseExpiry) { + if (microtime(true) < $pauseExpiry) { + continue; + } + + $multi->openHandles[$i][0] = 0; + } + $hasActivity = false; - $remaining = &$multi->openHandles[$i][3]; - $info = &$multi->openHandles[$i][4]; + $remaining = &$multi->openHandles[$i][4]; + $info = &$multi->openHandles[$i][5]; $e = null; // Read incoming buffer and write it to the dechunk one @@ -285,6 +304,9 @@ final class NativeResponse implements ResponseInterface $multi->handlesActivity[$i][] = null; $multi->handlesActivity[$i][] = $e; + if (null !== ($host = $multi->openHandles[$i][6] ?? null) && 0 >= --$multi->hosts[$host]) { + unset($multi->hosts[$host]); + } unset($multi->openHandles[$i]); $multi->sleep = false; } @@ -294,25 +316,22 @@ final class NativeResponse implements ResponseInterface return; } - // Create empty activity lists to tell ResponseTrait::stream() we still have pending requests + $maxHosts = $multi->maxHostConnections; + foreach ($responses as $i => $response) { - if (null === $response->remaining && null !== $response->buffer) { - $multi->handlesActivity[$i] = []; + if (null !== $response->remaining || null === $response->buffer) { + continue; } - } - if (\count($multi->openHandles) >= $multi->maxHostConnections) { - return; - } - - // Open the next pending request - this is a blocking operation so we do only one of them - foreach ($responses as $i => $response) { - if (null === $response->remaining && null !== $response->buffer) { + if ($response->pauseExpiry && microtime(true) < $response->pauseExpiry) { + // Create empty open handles to tell we still have pending requests + $multi->openHandles[$i] = [INF, null, null, null]; + } elseif ($maxHosts && $maxHosts > ($multi->hosts[parse_url($response->url, PHP_URL_HOST)] ?? 0)) { + // Open the next pending request - this is a blocking operation so we do only one of them $response->open(); $multi->sleep = false; self::perform($multi); - - break; + $maxHosts = 0; } } } @@ -324,9 +343,32 @@ final class NativeResponse implements ResponseInterface */ private static function select(ClientState $multi, float $timeout): int { - $_ = []; - $handles = array_column($multi->openHandles, 0); + if (!$multi->sleep = !$multi->sleep) { + return -1; + } - return (!$multi->sleep = !$multi->sleep) ? -1 : stream_select($handles, $_, $_, (int) $timeout, (int) (1E6 * ($timeout - (int) $timeout))); + $_ = $handles = []; + $now = null; + + foreach ($multi->openHandles as [$pauseExpiry, $h]) { + if (null === $h) { + continue; + } + + if ($pauseExpiry && ($now ?? $now = microtime(true)) < $pauseExpiry) { + $timeout = min($timeout, $pauseExpiry - $now); + continue; + } + + $handles[] = $h; + } + + if (!$handles) { + usleep(1E6 * $timeout); + + return 0; + } + + return stream_select($handles, $_, $_, (int) $timeout, (int) (1E6 * ($timeout - (int) $timeout))); } } diff --git a/src/Symfony/Component/HttpClient/Tests/HttpClientTestCase.php b/src/Symfony/Component/HttpClient/Tests/HttpClientTestCase.php index f8a84ea4bc..bc3fe67662 100644 --- a/src/Symfony/Component/HttpClient/Tests/HttpClientTestCase.php +++ b/src/Symfony/Component/HttpClient/Tests/HttpClientTestCase.php @@ -176,6 +176,30 @@ abstract class HttpClientTestCase extends BaseHttpClientTestCase $this->assertSame($expected, $logger->logs); } + public function testPause() + { + $client = $this->getHttpClient(__FUNCTION__); + $response = $client->request('GET', 'http://localhost:8057/'); + + $time = microtime(true); + $response->getInfo('pause_handler')(0.5); + $this->assertSame(200, $response->getStatusCode()); + $this->assertTrue(0.5 <= microtime(true) - $time); + + $response = $client->request('GET', 'http://localhost:8057/'); + + $time = microtime(true); + $response->getInfo('pause_handler')(1); + + foreach ($client->stream($response, 0.5) as $chunk) { + $this->assertTrue($chunk->isTimeout()); + $response->cancel(); + } + $response = null; + $this->assertTrue(1.0 > microtime(true) - $time); + $this->assertTrue(0.5 <= microtime(true) - $time); + } + public function testHttp2PushVulcainWithUnusedResponse() { $client = $this->getHttpClient(__FUNCTION__); diff --git a/src/Symfony/Component/HttpClient/Tests/MockHttpClientTest.php b/src/Symfony/Component/HttpClient/Tests/MockHttpClientTest.php index 56b7232bc4..8717d33f9a 100644 --- a/src/Symfony/Component/HttpClient/Tests/MockHttpClientTest.php +++ b/src/Symfony/Component/HttpClient/Tests/MockHttpClientTest.php @@ -198,6 +198,10 @@ class MockHttpClientTest extends HttpClientTestCase $this->markTestSkipped("MockHttpClient doesn't timeout on destruct"); break; + case 'testPause': + $this->markTestSkipped("MockHttpClient doesn't support pauses by default"); + break; + case 'testGetRequest': array_unshift($headers, 'HTTP/1.1 200 OK'); $responses[] = new MockResponse($body, ['response_headers' => $headers]);