From bf2d1cf6e78ee2fe46241ca52afc125474d6576d Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Tue, 14 Jul 2020 23:03:35 +0200 Subject: [PATCH] Improve pause handler --- composer.json | 3 +- .../HttpClient/Response/AmpResponse.php | 58 ++++++++++++------- .../HttpClient/Tests/HttpClientTestCase.php | 25 ++++++++ .../HttpClient/Tests/MockHttpClientTest.php | 2 + .../Component/HttpClient/composer.json | 3 +- 5 files changed, 68 insertions(+), 23 deletions(-) diff --git a/composer.json b/composer.json index b1d530e233..8cd09d1878 100644 --- a/composer.json +++ b/composer.json @@ -102,7 +102,8 @@ "symfony/yaml": "self.version" }, "require-dev": { - "amphp/http-client": "^4.2", + "amphp/amp": "^2.5", + "amphp/http-client": "^4.2.1", "amphp/http-tunnel": "^1.0", "async-aws/ses": "^1.0", "async-aws/sqs": "^1.0", diff --git a/src/Symfony/Component/HttpClient/Response/AmpResponse.php b/src/Symfony/Component/HttpClient/Response/AmpResponse.php index 13b7ae35c1..ca13475000 100644 --- a/src/Symfony/Component/HttpClient/Response/AmpResponse.php +++ b/src/Symfony/Component/HttpClient/Response/AmpResponse.php @@ -13,10 +13,14 @@ namespace Symfony\Component\HttpClient\Response; use Amp\ByteStream\StreamException; use Amp\CancellationTokenSource; +use Amp\Coroutine; +use Amp\Deferred; use Amp\Http\Client\HttpException; use Amp\Http\Client\Request; use Amp\Http\Client\Response; use Amp\Loop; +use Amp\Promise; +use Amp\Success; use Psr\Log\LoggerInterface; use Symfony\Component\HttpClient\Chunk\FirstChunk; use Symfony\Component\HttpClient\Chunk\InformationalChunk; @@ -38,6 +42,8 @@ final class AmpResponse implements ResponseInterface, StreamableInterface use CommonResponseTrait; use TransportResponseTrait; + private static $nextId = 'a'; + private $multi; private $options; private $canceller; @@ -88,29 +94,33 @@ final class AmpResponse implements ResponseInterface, StreamableInterface $onProgress((int) $info['size_download'], ((int) (1 + $info['download_content_length']) ?: 1) - 1, (array) $info); }; - $this->id = $id = Loop::defer(static function () use ($request, $multi, &$id, &$info, &$headers, $canceller, &$options, $onProgress, &$handle, $logger) { - return self::generateResponse($request, $multi, $id, $info, $headers, $canceller, $options, $onProgress, $handle, $logger); + $pauseDeferred = new Deferred(); + $pause = new Success(); + + $throttleWatcher = null; + + $this->id = $id = self::$nextId++; + Loop::defer(static function () use ($request, $multi, &$id, &$info, &$headers, $canceller, &$options, $onProgress, &$handle, $logger, &$pause) { + return new Coroutine(self::generateResponse($request, $multi, $id, $info, $headers, $canceller, $options, $onProgress, $handle, $logger, $pause)); }); - $info['pause_handler'] = static function (float $duration) use ($id, &$delay) { - if (null !== $delay) { - Loop::cancel($delay); - $delay = null; + $info['pause_handler'] = static function (float $duration) use (&$throttleWatcher, &$pauseDeferred, &$pause) { + if (null !== $throttleWatcher) { + Loop::cancel($throttleWatcher); } - if (0 < $duration) { - $duration += microtime(true); - Loop::disable($id); - $delay = Loop::defer(static function () use ($duration, $id, &$delay) { - if (0 < $duration -= microtime(true)) { - $delay = Loop::delay(ceil(1000 * $duration), static function () use ($id) { Loop::enable($id); }); - } else { - $delay = null; - Loop::enable($id); - } - }); + $pause = $pauseDeferred->promise(); + + if ($duration <= 0) { + $deferred = $pauseDeferred; + $pauseDeferred = new Deferred(); + $deferred->resolve(); } else { - Loop::enable($id); + $throttleWatcher = Loop::delay(ceil(1000 * $duration), static function () use (&$pauseDeferred) { + $deferred = $pauseDeferred; + $pauseDeferred = new Deferred(); + $deferred->resolve(); + }); } }; @@ -210,7 +220,7 @@ final class AmpResponse implements ResponseInterface, StreamableInterface return null === self::$delay ? 1 : 0; } - private static function generateResponse(Request $request, AmpClientState $multi, string $id, array &$info, array &$headers, CancellationTokenSource $canceller, array &$options, \Closure $onProgress, &$handle, ?LoggerInterface $logger) + private static function generateResponse(Request $request, AmpClientState $multi, string $id, array &$info, array &$headers, CancellationTokenSource $canceller, array &$options, \Closure $onProgress, &$handle, ?LoggerInterface $logger, Promise &$pause) { $activity = &$multi->handlesActivity; @@ -225,7 +235,7 @@ final class AmpResponse implements ResponseInterface, StreamableInterface if (null === $response = yield from self::getPushedResponse($request, $multi, $info, $headers, $options, $logger)) { $logger && $logger->info(sprintf('Request: "%s %s"', $info['http_method'], $info['url'])); - $response = yield from self::followRedirects($request, $multi, $info, $headers, $canceller, $options, $onProgress, $handle, $logger); + $response = yield from self::followRedirects($request, $multi, $info, $headers, $canceller, $options, $onProgress, $handle, $logger, $pause); } $options = null; @@ -249,6 +259,8 @@ final class AmpResponse implements ResponseInterface, StreamableInterface while (true) { self::stopLoop(); + yield $pause; + if (null === $data = yield $body->read()) { break; } @@ -269,8 +281,10 @@ final class AmpResponse implements ResponseInterface, StreamableInterface self::stopLoop(); } - private static function followRedirects(Request $originRequest, AmpClientState $multi, array &$info, array &$headers, CancellationTokenSource $canceller, array $options, \Closure $onProgress, &$handle, ?LoggerInterface $logger) + private static function followRedirects(Request $originRequest, AmpClientState $multi, array &$info, array &$headers, CancellationTokenSource $canceller, array $options, \Closure $onProgress, &$handle, ?LoggerInterface $logger, Promise &$pause) { + yield $pause; + $originRequest->setBody(new AmpBody($options['body'], $info, $onProgress)); $response = yield $multi->request($options, $originRequest, $canceller->getToken(), $info, $onProgress, $handle); $previousUrl = null; @@ -348,6 +362,8 @@ final class AmpResponse implements ResponseInterface, StreamableInterface $request->removeHeader('host'); } + yield $pause; + $response = yield $multi->request($options, $request, $canceller->getToken(), $info, $onProgress, $handle); $info['redirect_time'] = microtime(true) - $info['start_time']; } diff --git a/src/Symfony/Component/HttpClient/Tests/HttpClientTestCase.php b/src/Symfony/Component/HttpClient/Tests/HttpClientTestCase.php index 3c67b4bdbd..fe67477c5e 100644 --- a/src/Symfony/Component/HttpClient/Tests/HttpClientTestCase.php +++ b/src/Symfony/Component/HttpClient/Tests/HttpClientTestCase.php @@ -201,6 +201,31 @@ abstract class HttpClientTestCase extends BaseHttpClientTestCase $this->assertTrue(0.5 <= microtime(true) - $time); } + public function testPauseReplace() + { + $client = $this->getHttpClient(__FUNCTION__); + $response = $client->request('GET', 'http://localhost:8057/'); + + $time = microtime(true); + $response->getInfo('pause_handler')(10); + $response->getInfo('pause_handler')(0.5); + $this->assertSame(200, $response->getStatusCode()); + $this->assertGreaterThanOrEqual(0.5, microtime(true) - $time); + $this->assertLessThanOrEqual(5, microtime(true) - $time); + } + + public function testPauseDuringBody() + { + $client = $this->getHttpClient(__FUNCTION__); + $response = $client->request('GET', 'http://localhost:8057/timeout-body'); + + $time = microtime(true); + $this->assertSame(200, $response->getStatusCode()); + $response->getInfo('pause_handler')(1); + $response->getContent(); + $this->assertGreaterThanOrEqual(1, microtime(true) - $time); + } + public function testHttp2PushVulcainWithUnusedResponse() { $client = $this->getHttpClient(__FUNCTION__); diff --git a/src/Symfony/Component/HttpClient/Tests/MockHttpClientTest.php b/src/Symfony/Component/HttpClient/Tests/MockHttpClientTest.php index 663b2439ae..033113d92f 100644 --- a/src/Symfony/Component/HttpClient/Tests/MockHttpClientTest.php +++ b/src/Symfony/Component/HttpClient/Tests/MockHttpClientTest.php @@ -199,6 +199,8 @@ class MockHttpClientTest extends HttpClientTestCase break; case 'testPause': + case 'testPauseReplace': + case 'testPauseDuringBody': $this->markTestSkipped("MockHttpClient doesn't support pauses by default"); break; diff --git a/src/Symfony/Component/HttpClient/composer.json b/src/Symfony/Component/HttpClient/composer.json index e37c53313e..c593567d10 100644 --- a/src/Symfony/Component/HttpClient/composer.json +++ b/src/Symfony/Component/HttpClient/composer.json @@ -23,12 +23,13 @@ "require": { "php": ">=7.2.5", "psr/log": "^1.0", - "symfony/http-client-contracts": "^2.1.1", + "symfony/http-client-contracts": "^2.1.3", "symfony/polyfill-php73": "^1.11", "symfony/polyfill-php80": "^1.15", "symfony/service-contracts": "^1.0|^2" }, "require-dev": { + "amphp/amp": "^2.5", "amphp/http-client": "^4.2.1", "amphp/http-tunnel": "^1.0", "amphp/socket": "^1.1",