diff --git a/src/Symfony/Component/HttpClient/Response/AsyncResponse.php b/src/Symfony/Component/HttpClient/Response/AsyncResponse.php index 0eb1355efd..481d760a59 100644 --- a/src/Symfony/Component/HttpClient/Response/AsyncResponse.php +++ b/src/Symfony/Component/HttpClient/Response/AsyncResponse.php @@ -12,6 +12,7 @@ namespace Symfony\Component\HttpClient\Response; use Symfony\Component\HttpClient\Chunk\ErrorChunk; +use Symfony\Component\HttpClient\Chunk\FirstChunk; use Symfony\Component\HttpClient\Chunk\LastChunk; use Symfony\Component\HttpClient\Exception\TransportException; use Symfony\Contracts\HttpClient\ChunkInterface; @@ -34,6 +35,7 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface private $response; private $info = ['canceled' => false]; private $passthru; + private $stream; private $lastYielded = false; /** @@ -226,6 +228,19 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface $asyncMap[$r->response] = $r; $wrappedResponses[] = $r->response; + + if ($r->stream) { + yield from self::passthruStream($response = $r->response, $r, new FirstChunk(), $asyncMap); + + if (!isset($asyncMap[$response])) { + array_pop($wrappedResponses); + } + + if ($r->response !== $response && !isset($asyncMap[$r->response])) { + $asyncMap[$r->response] = $r; + $wrappedResponses[] = $r->response; + } + } } if (!$client) { @@ -286,6 +301,7 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface private static function passthru(HttpClientInterface $client, self $r, ChunkInterface $chunk, \SplObjectStorage $asyncMap = null): \Generator { + $r->stream = null; $response = $r->response; $context = new AsyncContext($r->passthru, $client, $r->response, $r->info, $r->content, $r->offset); if (null === $stream = ($r->passthru)($chunk, $context)) { @@ -295,32 +311,39 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface return; } - $chunk = null; if (!$stream instanceof \Iterator) { throw new \LogicException(sprintf('A chunk passthru must return an "Iterator", "%s" returned.', get_debug_type($stream))); } + $r->stream = $stream; + yield from self::passthruStream($response, $r, null, $asyncMap); + } + + private static function passthruStream(ResponseInterface $response, self $r, ?ChunkInterface $chunk, ?\SplObjectStorage $asyncMap): \Generator + { while (true) { try { - if (null !== $chunk) { - $stream->next(); + if (null !== $chunk && $r->stream) { + $r->stream->next(); } - if (!$stream->valid()) { + if (!$r->stream || !$r->stream->valid() || !$r->stream) { + $r->stream = null; break; } } catch (\Throwable $e) { + unset($asyncMap[$response]); + $r->stream = null; $r->info['error'] = $e->getMessage(); $r->response->cancel(); yield $r => $chunk = new ErrorChunk($r->offset, $e); $chunk->didThrow() ?: $chunk->getContent(); - unset($asyncMap[$response]); break; } - $chunk = $stream->current(); + $chunk = $r->stream->current(); if (!$chunk instanceof ChunkInterface) { throw new \LogicException(sprintf('A chunk passthru must yield instances of "%s", "%s" yielded.', ChunkInterface::class, get_debug_type($chunk))); @@ -356,6 +379,12 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface } } + if (null !== $chunk->getError() || $chunk->isLast()) { + $stream = $r->stream; + $r->stream = null; + unset($asyncMap[$response]); + } + if (null === $chunk->getError()) { $r->offset += \strlen($content); @@ -387,7 +416,6 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface $chunk->didThrow() ?: $chunk->getContent(); } - unset($asyncMap[$response]); break; } } diff --git a/src/Symfony/Component/HttpClient/RetryableHttpClient.php b/src/Symfony/Component/HttpClient/RetryableHttpClient.php index 387a33fc9b..8eec943e2d 100644 --- a/src/Symfony/Component/HttpClient/RetryableHttpClient.php +++ b/src/Symfony/Component/HttpClient/RetryableHttpClient.php @@ -127,7 +127,7 @@ class RetryableHttpClient implements HttpClientInterface $context->getResponse()->cancel(); - $delay = $this->getDelayFromHeader($context->getHeaders()) ?? $this->strategy->getDelay($context, $chunk instanceof LastChunk ? $content : null, $exception); + $delay = $this->getDelayFromHeader($context->getHeaders()) ?? $this->strategy->getDelay($context, $chunk->isLast() ? $content : null, $exception); ++$retryCount; $this->logger->info('Try #{count} after {delay}ms'.($exception ? ': '.$exception->getMessage() : ', status code: '.$context->getStatusCode()), [ diff --git a/src/Symfony/Component/HttpClient/Tests/AsyncDecoratorTraitTest.php b/src/Symfony/Component/HttpClient/Tests/AsyncDecoratorTraitTest.php index 90872ece75..499cc1ef30 100644 --- a/src/Symfony/Component/HttpClient/Tests/AsyncDecoratorTraitTest.php +++ b/src/Symfony/Component/HttpClient/Tests/AsyncDecoratorTraitTest.php @@ -282,4 +282,25 @@ class AsyncDecoratorTraitTest extends NativeHttpClientTest $this->assertSame('test', $lastInfo['foo']); $this->assertArrayHasKey('previous_info', $lastInfo); } + + public function testMultipleYieldInInitializer() + { + $first = null; + $client = $this->getHttpClient(__FUNCTION__, function (ChunkInterface $chunk, AsyncContext $context) use (&$first) { + if ($chunk->isFirst()) { + $first = $chunk; + + return; + } + $context->passthru(); + yield $first; + yield $context->createChunk('injectedFoo'); + yield $chunk; + }); + + $response = $client->request('GET', 'http://localhost:8057/404', ['timeout' => 0.1]); + + $this->assertSame(404, $response->getStatusCode()); + $this->assertStringContainsString('injectedFoo', $response->getContent(false)); + } } diff --git a/src/Symfony/Component/HttpClient/Tests/RetryableHttpClientTest.php b/src/Symfony/Component/HttpClient/Tests/RetryableHttpClientTest.php index d4649e5665..2929418fd6 100644 --- a/src/Symfony/Component/HttpClient/Tests/RetryableHttpClientTest.php +++ b/src/Symfony/Component/HttpClient/Tests/RetryableHttpClientTest.php @@ -68,6 +68,31 @@ class RetryableHttpClientTest extends TestCase self::assertSame(200, $response->getStatusCode()); } + public function testRetryWithBodyKeepContent() + { + $client = new RetryableHttpClient( + new MockHttpClient([ + new MockResponse('my bad', ['http_code' => 400]), + ]), + new class([400], 0) extends GenericRetryStrategy { + public function shouldRetry(AsyncContext $context, ?string $responseContent, ?TransportExceptionInterface $exception): ?bool + { + if (null === $responseContent) { + return null; + } + + return 'my bad' !== $responseContent; + } + }, + 1 + ); + + $response = $client->request('GET', 'http://example.com/foo-bar'); + + self::assertSame(400, $response->getStatusCode()); + self::assertSame('my bad', $response->getContent(false)); + } + public function testRetryWithBodyInvalid() { $client = new RetryableHttpClient(