Fix content swallowed by AsyncClient initializer

This commit is contained in:
Jérémy Derussé 2020-11-30 11:13:54 +01:00 committed by Nicolas Grekas
parent fd05da651b
commit d324271691
4 changed files with 82 additions and 8 deletions

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\HttpClient\Response;
use Symfony\Component\HttpClient\Chunk\ErrorChunk;
use Symfony\Component\HttpClient\Chunk\FirstChunk;
use Symfony\Component\HttpClient\Chunk\LastChunk;
use Symfony\Component\HttpClient\Exception\TransportException;
use Symfony\Contracts\HttpClient\ChunkInterface;
@ -34,6 +35,7 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface
private $response;
private $info = ['canceled' => false];
private $passthru;
private $stream;
private $lastYielded = false;
/**
@ -226,6 +228,19 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface
$asyncMap[$r->response] = $r;
$wrappedResponses[] = $r->response;
if ($r->stream) {
yield from self::passthruStream($response = $r->response, $r, new FirstChunk(), $asyncMap);
if (!isset($asyncMap[$response])) {
array_pop($wrappedResponses);
}
if ($r->response !== $response && !isset($asyncMap[$r->response])) {
$asyncMap[$r->response] = $r;
$wrappedResponses[] = $r->response;
}
}
}
if (!$client) {
@ -286,6 +301,7 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface
private static function passthru(HttpClientInterface $client, self $r, ChunkInterface $chunk, \SplObjectStorage $asyncMap = null): \Generator
{
$r->stream = null;
$response = $r->response;
$context = new AsyncContext($r->passthru, $client, $r->response, $r->info, $r->content, $r->offset);
if (null === $stream = ($r->passthru)($chunk, $context)) {
@ -295,32 +311,39 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface
return;
}
$chunk = null;
if (!$stream instanceof \Iterator) {
throw new \LogicException(sprintf('A chunk passthru must return an "Iterator", "%s" returned.', get_debug_type($stream)));
}
$r->stream = $stream;
yield from self::passthruStream($response, $r, null, $asyncMap);
}
private static function passthruStream(ResponseInterface $response, self $r, ?ChunkInterface $chunk, ?\SplObjectStorage $asyncMap): \Generator
{
while (true) {
try {
if (null !== $chunk) {
$stream->next();
if (null !== $chunk && $r->stream) {
$r->stream->next();
}
if (!$stream->valid()) {
if (!$r->stream || !$r->stream->valid() || !$r->stream) {
$r->stream = null;
break;
}
} catch (\Throwable $e) {
unset($asyncMap[$response]);
$r->stream = null;
$r->info['error'] = $e->getMessage();
$r->response->cancel();
yield $r => $chunk = new ErrorChunk($r->offset, $e);
$chunk->didThrow() ?: $chunk->getContent();
unset($asyncMap[$response]);
break;
}
$chunk = $stream->current();
$chunk = $r->stream->current();
if (!$chunk instanceof ChunkInterface) {
throw new \LogicException(sprintf('A chunk passthru must yield instances of "%s", "%s" yielded.', ChunkInterface::class, get_debug_type($chunk)));
@ -356,6 +379,12 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface
}
}
if (null !== $chunk->getError() || $chunk->isLast()) {
$stream = $r->stream;
$r->stream = null;
unset($asyncMap[$response]);
}
if (null === $chunk->getError()) {
$r->offset += \strlen($content);
@ -387,7 +416,6 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface
$chunk->didThrow() ?: $chunk->getContent();
}
unset($asyncMap[$response]);
break;
}
}

View File

@ -127,7 +127,7 @@ class RetryableHttpClient implements HttpClientInterface
$context->getResponse()->cancel();
$delay = $this->getDelayFromHeader($context->getHeaders()) ?? $this->strategy->getDelay($context, $chunk instanceof LastChunk ? $content : null, $exception);
$delay = $this->getDelayFromHeader($context->getHeaders()) ?? $this->strategy->getDelay($context, $chunk->isLast() ? $content : null, $exception);
++$retryCount;
$this->logger->info('Try #{count} after {delay}ms'.($exception ? ': '.$exception->getMessage() : ', status code: '.$context->getStatusCode()), [

View File

@ -282,4 +282,25 @@ class AsyncDecoratorTraitTest extends NativeHttpClientTest
$this->assertSame('test', $lastInfo['foo']);
$this->assertArrayHasKey('previous_info', $lastInfo);
}
public function testMultipleYieldInInitializer()
{
$first = null;
$client = $this->getHttpClient(__FUNCTION__, function (ChunkInterface $chunk, AsyncContext $context) use (&$first) {
if ($chunk->isFirst()) {
$first = $chunk;
return;
}
$context->passthru();
yield $first;
yield $context->createChunk('injectedFoo');
yield $chunk;
});
$response = $client->request('GET', 'http://localhost:8057/404', ['timeout' => 0.1]);
$this->assertSame(404, $response->getStatusCode());
$this->assertStringContainsString('injectedFoo', $response->getContent(false));
}
}

View File

@ -68,6 +68,31 @@ class RetryableHttpClientTest extends TestCase
self::assertSame(200, $response->getStatusCode());
}
public function testRetryWithBodyKeepContent()
{
$client = new RetryableHttpClient(
new MockHttpClient([
new MockResponse('my bad', ['http_code' => 400]),
]),
new class([400], 0) extends GenericRetryStrategy {
public function shouldRetry(AsyncContext $context, ?string $responseContent, ?TransportExceptionInterface $exception): ?bool
{
if (null === $responseContent) {
return null;
}
return 'my bad' !== $responseContent;
}
},
1
);
$response = $client->request('GET', 'http://example.com/foo-bar');
self::assertSame(400, $response->getStatusCode());
self::assertSame('my bad', $response->getContent(false));
}
public function testRetryWithBodyInvalid()
{
$client = new RetryableHttpClient(