Improve pause handler

This commit is contained in:
Niklas Keller 2020-07-14 23:03:35 +02:00 committed by Nicolas Grekas
parent ba4d57bb5f
commit bf2d1cf6e7
5 changed files with 68 additions and 23 deletions

View File

@ -102,7 +102,8 @@
"symfony/yaml": "self.version"
},
"require-dev": {
"amphp/http-client": "^4.2",
"amphp/amp": "^2.5",
"amphp/http-client": "^4.2.1",
"amphp/http-tunnel": "^1.0",
"async-aws/ses": "^1.0",
"async-aws/sqs": "^1.0",

View File

@ -13,10 +13,14 @@ namespace Symfony\Component\HttpClient\Response;
use Amp\ByteStream\StreamException;
use Amp\CancellationTokenSource;
use Amp\Coroutine;
use Amp\Deferred;
use Amp\Http\Client\HttpException;
use Amp\Http\Client\Request;
use Amp\Http\Client\Response;
use Amp\Loop;
use Amp\Promise;
use Amp\Success;
use Psr\Log\LoggerInterface;
use Symfony\Component\HttpClient\Chunk\FirstChunk;
use Symfony\Component\HttpClient\Chunk\InformationalChunk;
@ -38,6 +42,8 @@ final class AmpResponse implements ResponseInterface, StreamableInterface
use CommonResponseTrait;
use TransportResponseTrait;
private static $nextId = 'a';
private $multi;
private $options;
private $canceller;
@ -88,29 +94,33 @@ final class AmpResponse implements ResponseInterface, StreamableInterface
$onProgress((int) $info['size_download'], ((int) (1 + $info['download_content_length']) ?: 1) - 1, (array) $info);
};
$this->id = $id = Loop::defer(static function () use ($request, $multi, &$id, &$info, &$headers, $canceller, &$options, $onProgress, &$handle, $logger) {
return self::generateResponse($request, $multi, $id, $info, $headers, $canceller, $options, $onProgress, $handle, $logger);
$pauseDeferred = new Deferred();
$pause = new Success();
$throttleWatcher = null;
$this->id = $id = self::$nextId++;
Loop::defer(static function () use ($request, $multi, &$id, &$info, &$headers, $canceller, &$options, $onProgress, &$handle, $logger, &$pause) {
return new Coroutine(self::generateResponse($request, $multi, $id, $info, $headers, $canceller, $options, $onProgress, $handle, $logger, $pause));
});
$info['pause_handler'] = static function (float $duration) use ($id, &$delay) {
if (null !== $delay) {
Loop::cancel($delay);
$delay = null;
$info['pause_handler'] = static function (float $duration) use (&$throttleWatcher, &$pauseDeferred, &$pause) {
if (null !== $throttleWatcher) {
Loop::cancel($throttleWatcher);
}
if (0 < $duration) {
$duration += microtime(true);
Loop::disable($id);
$delay = Loop::defer(static function () use ($duration, $id, &$delay) {
if (0 < $duration -= microtime(true)) {
$delay = Loop::delay(ceil(1000 * $duration), static function () use ($id) { Loop::enable($id); });
} else {
$delay = null;
Loop::enable($id);
}
});
$pause = $pauseDeferred->promise();
if ($duration <= 0) {
$deferred = $pauseDeferred;
$pauseDeferred = new Deferred();
$deferred->resolve();
} else {
Loop::enable($id);
$throttleWatcher = Loop::delay(ceil(1000 * $duration), static function () use (&$pauseDeferred) {
$deferred = $pauseDeferred;
$pauseDeferred = new Deferred();
$deferred->resolve();
});
}
};
@ -210,7 +220,7 @@ final class AmpResponse implements ResponseInterface, StreamableInterface
return null === self::$delay ? 1 : 0;
}
private static function generateResponse(Request $request, AmpClientState $multi, string $id, array &$info, array &$headers, CancellationTokenSource $canceller, array &$options, \Closure $onProgress, &$handle, ?LoggerInterface $logger)
private static function generateResponse(Request $request, AmpClientState $multi, string $id, array &$info, array &$headers, CancellationTokenSource $canceller, array &$options, \Closure $onProgress, &$handle, ?LoggerInterface $logger, Promise &$pause)
{
$activity = &$multi->handlesActivity;
@ -225,7 +235,7 @@ final class AmpResponse implements ResponseInterface, StreamableInterface
if (null === $response = yield from self::getPushedResponse($request, $multi, $info, $headers, $options, $logger)) {
$logger && $logger->info(sprintf('Request: "%s %s"', $info['http_method'], $info['url']));
$response = yield from self::followRedirects($request, $multi, $info, $headers, $canceller, $options, $onProgress, $handle, $logger);
$response = yield from self::followRedirects($request, $multi, $info, $headers, $canceller, $options, $onProgress, $handle, $logger, $pause);
}
$options = null;
@ -249,6 +259,8 @@ final class AmpResponse implements ResponseInterface, StreamableInterface
while (true) {
self::stopLoop();
yield $pause;
if (null === $data = yield $body->read()) {
break;
}
@ -269,8 +281,10 @@ final class AmpResponse implements ResponseInterface, StreamableInterface
self::stopLoop();
}
private static function followRedirects(Request $originRequest, AmpClientState $multi, array &$info, array &$headers, CancellationTokenSource $canceller, array $options, \Closure $onProgress, &$handle, ?LoggerInterface $logger)
private static function followRedirects(Request $originRequest, AmpClientState $multi, array &$info, array &$headers, CancellationTokenSource $canceller, array $options, \Closure $onProgress, &$handle, ?LoggerInterface $logger, Promise &$pause)
{
yield $pause;
$originRequest->setBody(new AmpBody($options['body'], $info, $onProgress));
$response = yield $multi->request($options, $originRequest, $canceller->getToken(), $info, $onProgress, $handle);
$previousUrl = null;
@ -348,6 +362,8 @@ final class AmpResponse implements ResponseInterface, StreamableInterface
$request->removeHeader('host');
}
yield $pause;
$response = yield $multi->request($options, $request, $canceller->getToken(), $info, $onProgress, $handle);
$info['redirect_time'] = microtime(true) - $info['start_time'];
}

View File

@ -201,6 +201,31 @@ abstract class HttpClientTestCase extends BaseHttpClientTestCase
$this->assertTrue(0.5 <= microtime(true) - $time);
}
public function testPauseReplace()
{
$client = $this->getHttpClient(__FUNCTION__);
$response = $client->request('GET', 'http://localhost:8057/');
$time = microtime(true);
$response->getInfo('pause_handler')(10);
$response->getInfo('pause_handler')(0.5);
$this->assertSame(200, $response->getStatusCode());
$this->assertGreaterThanOrEqual(0.5, microtime(true) - $time);
$this->assertLessThanOrEqual(5, microtime(true) - $time);
}
public function testPauseDuringBody()
{
$client = $this->getHttpClient(__FUNCTION__);
$response = $client->request('GET', 'http://localhost:8057/timeout-body');
$time = microtime(true);
$this->assertSame(200, $response->getStatusCode());
$response->getInfo('pause_handler')(1);
$response->getContent();
$this->assertGreaterThanOrEqual(1, microtime(true) - $time);
}
public function testHttp2PushVulcainWithUnusedResponse()
{
$client = $this->getHttpClient(__FUNCTION__);

View File

@ -199,6 +199,8 @@ class MockHttpClientTest extends HttpClientTestCase
break;
case 'testPause':
case 'testPauseReplace':
case 'testPauseDuringBody':
$this->markTestSkipped("MockHttpClient doesn't support pauses by default");
break;

View File

@ -23,12 +23,13 @@
"require": {
"php": ">=7.2.5",
"psr/log": "^1.0",
"symfony/http-client-contracts": "^2.1.1",
"symfony/http-client-contracts": "^2.1.3",
"symfony/polyfill-php73": "^1.11",
"symfony/polyfill-php80": "^1.15",
"symfony/service-contracts": "^1.0|^2"
},
"require-dev": {
"amphp/amp": "^2.5",
"amphp/http-client": "^4.2.1",
"amphp/http-tunnel": "^1.0",
"amphp/socket": "^1.1",