Improve pause handler
This commit is contained in:
parent
ba4d57bb5f
commit
bf2d1cf6e7
|
@ -102,7 +102,8 @@
|
|||
"symfony/yaml": "self.version"
|
||||
},
|
||||
"require-dev": {
|
||||
"amphp/http-client": "^4.2",
|
||||
"amphp/amp": "^2.5",
|
||||
"amphp/http-client": "^4.2.1",
|
||||
"amphp/http-tunnel": "^1.0",
|
||||
"async-aws/ses": "^1.0",
|
||||
"async-aws/sqs": "^1.0",
|
||||
|
|
|
@ -13,10 +13,14 @@ namespace Symfony\Component\HttpClient\Response;
|
|||
|
||||
use Amp\ByteStream\StreamException;
|
||||
use Amp\CancellationTokenSource;
|
||||
use Amp\Coroutine;
|
||||
use Amp\Deferred;
|
||||
use Amp\Http\Client\HttpException;
|
||||
use Amp\Http\Client\Request;
|
||||
use Amp\Http\Client\Response;
|
||||
use Amp\Loop;
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\HttpClient\Chunk\FirstChunk;
|
||||
use Symfony\Component\HttpClient\Chunk\InformationalChunk;
|
||||
|
@ -38,6 +42,8 @@ final class AmpResponse implements ResponseInterface, StreamableInterface
|
|||
use CommonResponseTrait;
|
||||
use TransportResponseTrait;
|
||||
|
||||
private static $nextId = 'a';
|
||||
|
||||
private $multi;
|
||||
private $options;
|
||||
private $canceller;
|
||||
|
@ -88,29 +94,33 @@ final class AmpResponse implements ResponseInterface, StreamableInterface
|
|||
$onProgress((int) $info['size_download'], ((int) (1 + $info['download_content_length']) ?: 1) - 1, (array) $info);
|
||||
};
|
||||
|
||||
$this->id = $id = Loop::defer(static function () use ($request, $multi, &$id, &$info, &$headers, $canceller, &$options, $onProgress, &$handle, $logger) {
|
||||
return self::generateResponse($request, $multi, $id, $info, $headers, $canceller, $options, $onProgress, $handle, $logger);
|
||||
$pauseDeferred = new Deferred();
|
||||
$pause = new Success();
|
||||
|
||||
$throttleWatcher = null;
|
||||
|
||||
$this->id = $id = self::$nextId++;
|
||||
Loop::defer(static function () use ($request, $multi, &$id, &$info, &$headers, $canceller, &$options, $onProgress, &$handle, $logger, &$pause) {
|
||||
return new Coroutine(self::generateResponse($request, $multi, $id, $info, $headers, $canceller, $options, $onProgress, $handle, $logger, $pause));
|
||||
});
|
||||
|
||||
$info['pause_handler'] = static function (float $duration) use ($id, &$delay) {
|
||||
if (null !== $delay) {
|
||||
Loop::cancel($delay);
|
||||
$delay = null;
|
||||
$info['pause_handler'] = static function (float $duration) use (&$throttleWatcher, &$pauseDeferred, &$pause) {
|
||||
if (null !== $throttleWatcher) {
|
||||
Loop::cancel($throttleWatcher);
|
||||
}
|
||||
|
||||
if (0 < $duration) {
|
||||
$duration += microtime(true);
|
||||
Loop::disable($id);
|
||||
$delay = Loop::defer(static function () use ($duration, $id, &$delay) {
|
||||
if (0 < $duration -= microtime(true)) {
|
||||
$delay = Loop::delay(ceil(1000 * $duration), static function () use ($id) { Loop::enable($id); });
|
||||
} else {
|
||||
$delay = null;
|
||||
Loop::enable($id);
|
||||
}
|
||||
});
|
||||
$pause = $pauseDeferred->promise();
|
||||
|
||||
if ($duration <= 0) {
|
||||
$deferred = $pauseDeferred;
|
||||
$pauseDeferred = new Deferred();
|
||||
$deferred->resolve();
|
||||
} else {
|
||||
Loop::enable($id);
|
||||
$throttleWatcher = Loop::delay(ceil(1000 * $duration), static function () use (&$pauseDeferred) {
|
||||
$deferred = $pauseDeferred;
|
||||
$pauseDeferred = new Deferred();
|
||||
$deferred->resolve();
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -210,7 +220,7 @@ final class AmpResponse implements ResponseInterface, StreamableInterface
|
|||
return null === self::$delay ? 1 : 0;
|
||||
}
|
||||
|
||||
private static function generateResponse(Request $request, AmpClientState $multi, string $id, array &$info, array &$headers, CancellationTokenSource $canceller, array &$options, \Closure $onProgress, &$handle, ?LoggerInterface $logger)
|
||||
private static function generateResponse(Request $request, AmpClientState $multi, string $id, array &$info, array &$headers, CancellationTokenSource $canceller, array &$options, \Closure $onProgress, &$handle, ?LoggerInterface $logger, Promise &$pause)
|
||||
{
|
||||
$activity = &$multi->handlesActivity;
|
||||
|
||||
|
@ -225,7 +235,7 @@ final class AmpResponse implements ResponseInterface, StreamableInterface
|
|||
if (null === $response = yield from self::getPushedResponse($request, $multi, $info, $headers, $options, $logger)) {
|
||||
$logger && $logger->info(sprintf('Request: "%s %s"', $info['http_method'], $info['url']));
|
||||
|
||||
$response = yield from self::followRedirects($request, $multi, $info, $headers, $canceller, $options, $onProgress, $handle, $logger);
|
||||
$response = yield from self::followRedirects($request, $multi, $info, $headers, $canceller, $options, $onProgress, $handle, $logger, $pause);
|
||||
}
|
||||
|
||||
$options = null;
|
||||
|
@ -249,6 +259,8 @@ final class AmpResponse implements ResponseInterface, StreamableInterface
|
|||
while (true) {
|
||||
self::stopLoop();
|
||||
|
||||
yield $pause;
|
||||
|
||||
if (null === $data = yield $body->read()) {
|
||||
break;
|
||||
}
|
||||
|
@ -269,8 +281,10 @@ final class AmpResponse implements ResponseInterface, StreamableInterface
|
|||
self::stopLoop();
|
||||
}
|
||||
|
||||
private static function followRedirects(Request $originRequest, AmpClientState $multi, array &$info, array &$headers, CancellationTokenSource $canceller, array $options, \Closure $onProgress, &$handle, ?LoggerInterface $logger)
|
||||
private static function followRedirects(Request $originRequest, AmpClientState $multi, array &$info, array &$headers, CancellationTokenSource $canceller, array $options, \Closure $onProgress, &$handle, ?LoggerInterface $logger, Promise &$pause)
|
||||
{
|
||||
yield $pause;
|
||||
|
||||
$originRequest->setBody(new AmpBody($options['body'], $info, $onProgress));
|
||||
$response = yield $multi->request($options, $originRequest, $canceller->getToken(), $info, $onProgress, $handle);
|
||||
$previousUrl = null;
|
||||
|
@ -348,6 +362,8 @@ final class AmpResponse implements ResponseInterface, StreamableInterface
|
|||
$request->removeHeader('host');
|
||||
}
|
||||
|
||||
yield $pause;
|
||||
|
||||
$response = yield $multi->request($options, $request, $canceller->getToken(), $info, $onProgress, $handle);
|
||||
$info['redirect_time'] = microtime(true) - $info['start_time'];
|
||||
}
|
||||
|
|
|
@ -201,6 +201,31 @@ abstract class HttpClientTestCase extends BaseHttpClientTestCase
|
|||
$this->assertTrue(0.5 <= microtime(true) - $time);
|
||||
}
|
||||
|
||||
public function testPauseReplace()
|
||||
{
|
||||
$client = $this->getHttpClient(__FUNCTION__);
|
||||
$response = $client->request('GET', 'http://localhost:8057/');
|
||||
|
||||
$time = microtime(true);
|
||||
$response->getInfo('pause_handler')(10);
|
||||
$response->getInfo('pause_handler')(0.5);
|
||||
$this->assertSame(200, $response->getStatusCode());
|
||||
$this->assertGreaterThanOrEqual(0.5, microtime(true) - $time);
|
||||
$this->assertLessThanOrEqual(5, microtime(true) - $time);
|
||||
}
|
||||
|
||||
public function testPauseDuringBody()
|
||||
{
|
||||
$client = $this->getHttpClient(__FUNCTION__);
|
||||
$response = $client->request('GET', 'http://localhost:8057/timeout-body');
|
||||
|
||||
$time = microtime(true);
|
||||
$this->assertSame(200, $response->getStatusCode());
|
||||
$response->getInfo('pause_handler')(1);
|
||||
$response->getContent();
|
||||
$this->assertGreaterThanOrEqual(1, microtime(true) - $time);
|
||||
}
|
||||
|
||||
public function testHttp2PushVulcainWithUnusedResponse()
|
||||
{
|
||||
$client = $this->getHttpClient(__FUNCTION__);
|
||||
|
|
|
@ -199,6 +199,8 @@ class MockHttpClientTest extends HttpClientTestCase
|
|||
break;
|
||||
|
||||
case 'testPause':
|
||||
case 'testPauseReplace':
|
||||
case 'testPauseDuringBody':
|
||||
$this->markTestSkipped("MockHttpClient doesn't support pauses by default");
|
||||
break;
|
||||
|
||||
|
|
|
@ -23,12 +23,13 @@
|
|||
"require": {
|
||||
"php": ">=7.2.5",
|
||||
"psr/log": "^1.0",
|
||||
"symfony/http-client-contracts": "^2.1.1",
|
||||
"symfony/http-client-contracts": "^2.1.3",
|
||||
"symfony/polyfill-php73": "^1.11",
|
||||
"symfony/polyfill-php80": "^1.15",
|
||||
"symfony/service-contracts": "^1.0|^2"
|
||||
},
|
||||
"require-dev": {
|
||||
"amphp/amp": "^2.5",
|
||||
"amphp/http-client": "^4.2.1",
|
||||
"amphp/http-tunnel": "^1.0",
|
||||
"amphp/socket": "^1.1",
|
||||
|
|
Reference in New Issue