diff --git a/composer.json b/composer.json index 61b64b53d1..ef3906cb32 100644 --- a/composer.json +++ b/composer.json @@ -108,6 +108,7 @@ "doctrine/orm": "~2.4,>=2.4.5", "doctrine/reflection": "~1.0", "doctrine/doctrine-bundle": "~1.4", + "guzzlehttp/promises": "^1.3.1", "masterminds/html5": "^2.6", "monolog/monolog": "^1.25.1", "nyholm/psr7": "^1.0", diff --git a/src/Symfony/Component/HttpClient/CHANGELOG.md b/src/Symfony/Component/HttpClient/CHANGELOG.md index 973bb6107b..c36aeb65ce 100644 --- a/src/Symfony/Component/HttpClient/CHANGELOG.md +++ b/src/Symfony/Component/HttpClient/CHANGELOG.md @@ -5,7 +5,7 @@ CHANGELOG ----- * added `StreamWrapper` - * added `HttplugClient` + * added `HttplugClient` with support for sync and async requests * added `max_duration` option * added support for NTLM authentication * added `$response->toStream()` to cast responses to regular PHP streams diff --git a/src/Symfony/Component/HttpClient/HttplugClient.php b/src/Symfony/Component/HttpClient/HttplugClient.php index 71eb5200ce..6ba3cca620 100644 --- a/src/Symfony/Component/HttpClient/HttplugClient.php +++ b/src/Symfony/Component/HttpClient/HttplugClient.php @@ -11,31 +11,38 @@ namespace Symfony\Component\HttpClient; +use GuzzleHttp\Promise\Promise as GuzzlePromise; use Http\Client\Exception\NetworkException; use Http\Client\Exception\RequestException; -use Http\Client\HttpClient; +use Http\Client\HttpAsyncClient; +use Http\Client\HttpClient as HttplugInterface; use Http\Message\RequestFactory; use Http\Message\StreamFactory; use Http\Message\UriFactory; -use Psr\Http\Client\ClientInterface; -use Psr\Http\Client\NetworkExceptionInterface; -use Psr\Http\Client\RequestExceptionInterface; +use Http\Promise\Promise; +use Http\Promise\RejectedPromise; +use Nyholm\Psr7\Factory\Psr17Factory; +use Nyholm\Psr7\Request; +use Nyholm\Psr7\Uri; +use Psr\Http\Message\RequestFactoryInterface; use Psr\Http\Message\RequestInterface; use Psr\Http\Message\ResponseFactoryInterface; -use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\ResponseInterface as Psr7ResponseInterface; use Psr\Http\Message\StreamFactoryInterface; use Psr\Http\Message\StreamInterface; +use Psr\Http\Message\UriFactoryInterface; use Psr\Http\Message\UriInterface; +use Symfony\Component\HttpClient\Response\HttplugPromise; +use Symfony\Component\HttpClient\Response\ResponseTrait; +use Symfony\Component\HttpClient\Response\StreamWrapper; +use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface; use Symfony\Contracts\HttpClient\HttpClientInterface; +use Symfony\Contracts\HttpClient\ResponseInterface; -if (!interface_exists(HttpClient::class)) { +if (!interface_exists(HttplugInterface::class)) { throw new \LogicException('You cannot use "Symfony\Component\HttpClient\HttplugClient" as the "php-http/httplug" package is not installed. Try running "composer require php-http/httplug".'); } -if (!interface_exists(ClientInterface::class)) { - throw new \LogicException('You cannot use "Symfony\Component\HttpClient\HttplugClient" as the "psr/http-client" package is not installed. Try running "composer require psr/http-client".'); -} - if (!interface_exists(RequestFactory::class)) { throw new \LogicException('You cannot use "Symfony\Component\HttpClient\HttplugClient" as the "php-http/message-factory" package is not installed. Try running "composer require nyholm/psr7".'); } @@ -43,42 +50,166 @@ if (!interface_exists(RequestFactory::class)) { /** * An adapter to turn a Symfony HttpClientInterface into an Httplug client. * - * Run "composer require psr/http-client" to install the base ClientInterface. Run - * "composer require nyholm/psr7" to install an efficient implementation of response + * Run "composer require nyholm/psr7" to install an efficient implementation of response * and stream factories with flex-provided autowiring aliases. * * @author Nicolas Grekas
*/
-final class HttplugClient implements HttpClient, RequestFactory, StreamFactory, UriFactory
+final class HttplugClient implements HttplugInterface, HttpAsyncClient, RequestFactory, StreamFactory, UriFactory
{
private $client;
+ private $responseFactory;
+ private $streamFactory;
+ private $promisePool = [];
+ private $pendingResponse;
public function __construct(HttpClientInterface $client = null, ResponseFactoryInterface $responseFactory = null, StreamFactoryInterface $streamFactory = null)
{
- $this->client = new Psr18Client($client, $responseFactory, $streamFactory);
+ $this->client = $client ?? HttpClient::create();
+ $this->responseFactory = $responseFactory;
+ $this->streamFactory = $streamFactory ?? ($responseFactory instanceof StreamFactoryInterface ? $responseFactory : null);
+ $this->promisePool = new \SplObjectStorage();
+
+ if (null !== $this->responseFactory && null !== $this->streamFactory) {
+ return;
+ }
+
+ if (!class_exists(Psr17Factory::class)) {
+ throw new \LogicException('You cannot use the "Symfony\Component\HttpClient\HttplugClient" as no PSR-17 factories have been provided. Try running "composer require nyholm/psr7".');
+ }
+
+ $psr17Factory = new Psr17Factory();
+ $this->responseFactory = $this->responseFactory ?? $psr17Factory;
+ $this->streamFactory = $this->streamFactory ?? $psr17Factory;
}
/**
* {@inheritdoc}
*/
- public function sendRequest(RequestInterface $request): ResponseInterface
+ public function sendRequest(RequestInterface $request): Psr7ResponseInterface
{
try {
- return $this->client->sendRequest($request);
- } catch (RequestExceptionInterface $e) {
- throw new RequestException($e->getMessage(), $request, $e);
- } catch (NetworkExceptionInterface $e) {
+ return $this->createPsr7Response($this->sendPsr7Request($request));
+ } catch (TransportExceptionInterface $e) {
throw new NetworkException($e->getMessage(), $request, $e);
}
}
+ /**
+ * {@inheritdoc}
+ *
+ * @return HttplugPromise
+ */
+ public function sendAsyncRequest(RequestInterface $request): Promise
+ {
+ if (!class_exists(GuzzlePromise::class)) {
+ throw new \LogicException(sprintf('You cannot use "%s()" as the "guzzlehttp/promises" package is not installed. Try running "composer require guzzlehttp/promises".', __METHOD__));
+ }
+
+ try {
+ $response = $this->sendPsr7Request($request, true);
+ } catch (NetworkException $e) {
+ return new RejectedPromise($e);
+ }
+
+ $cancel = function () use ($response) {
+ $response->cancel();
+ unset($this->promisePool[$response]);
+ };
+
+ $promise = new GuzzlePromise(function () use ($response) {
+ $this->pendingResponse = $response;
+ $this->wait();
+ }, $cancel);
+
+ $this->promisePool[$response] = [$request, $promise];
+
+ return new HttplugPromise($promise, $cancel);
+ }
+
+ /**
+ * Resolve pending promises that complete before the timeouts are reached.
+ *
+ * When $maxDuration is null and $idleTimeout is reached, promises are rejected.
+ *
+ * @return int The number of remaining pending promises
+ */
+ public function wait(float $maxDuration = null, float $idleTimeout = null): int
+ {
+ $pendingResponse = $this->pendingResponse;
+ $this->pendingResponse = null;
+
+ if (null !== $maxDuration) {
+ $startTime = microtime(true);
+ $idleTimeout = max(0.0, min($maxDuration / 5, $idleTimeout ?? $maxDuration));
+ $remainingDuration = $maxDuration;
+ }
+
+ do {
+ foreach ($this->client->stream($this->promisePool, $idleTimeout) as $response => $chunk) {
+ try {
+ if (null !== $maxDuration && $chunk->isTimeout()) {
+ goto check_duration;
+ }
+
+ if ($chunk->isFirst()) {
+ // Deactivate throwing on 3/4/5xx
+ $response->getStatusCode();
+ }
+
+ if (!$chunk->isLast()) {
+ goto check_duration;
+ }
+
+ if ([$request, $promise] = $this->promisePool[$response] ?? null) {
+ unset($this->promisePool[$response]);
+ $promise->resolve($this->createPsr7Response($response, true));
+ }
+ } catch (\Exception $e) {
+ if ([$request, $promise] = $this->promisePool[$response] ?? null) {
+ unset($this->promisePool[$response]);
+
+ if ($e instanceof TransportExceptionInterface) {
+ $e = new NetworkException($e->getMessage(), $request, $e);
+ }
+
+ $promise->reject($e);
+ }
+ }
+
+ if ($pendingResponse === $response) {
+ return \count($this->promisePool);
+ }
+
+ check_duration:
+ if (null !== $maxDuration && $idleTimeout && $idleTimeout > $remainingDuration = max(0.0, $maxDuration - microtime(true) + $startTime)) {
+ $idleTimeout = $remainingDuration / 5;
+ break;
+ }
+ }
+
+ if (!$count = \count($this->promisePool)) {
+ return 0;
+ }
+ } while (null !== $maxDuration && 0 < $remainingDuration);
+
+ return $count;
+ }
+
/**
* {@inheritdoc}
*/
public function createRequest($method, $uri, array $headers = [], $body = null, $protocolVersion = '1.1'): RequestInterface
{
- $request = $this->client
- ->createRequest($method, $uri)
+ if ($this->responseFactory instanceof RequestFactoryInterface) {
+ $request = $this->responseFactory->createRequest($method, $uri);
+ } elseif (!class_exists(Request::class)) {
+ throw new \LogicException(sprintf('You cannot use "%s()" as the "nyholm/psr7" package is not installed. Try running "composer require nyholm/psr7".', __METHOD__));
+ } else {
+ $request = new Request($method, $uri);
+ }
+
+ $request = $request
->withProtocolVersion($protocolVersion)
->withBody($this->createStream($body))
;
@@ -100,27 +231,84 @@ final class HttplugClient implements HttpClient, RequestFactory, StreamFactory,
}
if (\is_string($body ?? '')) {
- $body = $this->client->createStream($body ?? '');
-
- if ($body->isSeekable()) {
- $body->seek(0);
- }
-
- return $body;
+ $stream = $this->streamFactory->createStream($body ?? '');
+ } elseif (\is_resource($body)) {
+ $stream = $this->streamFactory->createStreamFromResource($body);
+ } else {
+ throw new \InvalidArgumentException(sprintf('%s() expects string, resource or StreamInterface, %s given.', __METHOD__, \gettype($body)));
}
- if (\is_resource($body)) {
- return $this->client->createStreamFromResource($body);
+ if ($stream->isSeekable()) {
+ $stream->seek(0);
}
- throw new \InvalidArgumentException(sprintf('%s() expects string, resource or StreamInterface, %s given.', __METHOD__, \gettype($body)));
+ return $stream;
}
/**
* {@inheritdoc}
*/
- public function createUri($uri = ''): UriInterface
+ public function createUri($uri): UriInterface
{
- return $uri instanceof UriInterface ? $uri : $this->client->createUri($uri);
+ if ($uri instanceof UriInterface) {
+ return $uri;
+ }
+
+ if ($this->responseFactory instanceof UriFactoryInterface) {
+ return $this->responseFactory->createUri($uri);
+ }
+
+ if (!class_exists(Uri::class)) {
+ throw new \LogicException(sprintf('You cannot use "%s()" as the "nyholm/psr7" package is not installed. Try running "composer require nyholm/psr7".', __METHOD__));
+ }
+
+ return new Uri($uri);
+ }
+
+ private function sendPsr7Request(RequestInterface $request, bool $buffer = null): ResponseInterface
+ {
+ try {
+ $body = $request->getBody();
+
+ if ($body->isSeekable()) {
+ $body->seek(0);
+ }
+
+ return $this->client->request($request->getMethod(), (string) $request->getUri(), [
+ 'headers' => $request->getHeaders(),
+ 'body' => $body->getContents(),
+ 'http_version' => '1.0' === $request->getProtocolVersion() ? '1.0' : null,
+ 'buffer' => $buffer,
+ ]);
+ } catch (\InvalidArgumentException $e) {
+ throw new RequestException($e->getMessage(), $request, $e);
+ } catch (TransportExceptionInterface $e) {
+ throw new NetworkException($e->getMessage(), $request, $e);
+ }
+ }
+
+ private function createPsr7Response(ResponseInterface $response, bool $buffer = false): Psr7ResponseInterface
+ {
+ $psrResponse = $this->responseFactory->createResponse($response->getStatusCode());
+
+ foreach ($response->getHeaders(false) as $name => $values) {
+ foreach ($values as $value) {
+ $psrResponse = $psrResponse->withAddedHeader($name, $value);
+ }
+ }
+
+ if (isset(class_uses($response)[ResponseTrait::class])) {
+ $body = $this->streamFactory->createStreamFromResource($response->toStream(false));
+ } elseif (!$buffer) {
+ $body = $this->streamFactory->createStreamFromResource(StreamWrapper::createResource($response, $this->client));
+ } else {
+ $body = $this->streamFactory->createStream($response->getContent(false));
+ }
+
+ if ($body->isSeekable()) {
+ $body->seek(0);
+ }
+
+ return $psrResponse->withBody($body);
}
}
diff --git a/src/Symfony/Component/HttpClient/Response/HttplugPromise.php b/src/Symfony/Component/HttpClient/Response/HttplugPromise.php
new file mode 100644
index 0000000000..73637b38f7
--- /dev/null
+++ b/src/Symfony/Component/HttpClient/Response/HttplugPromise.php
@@ -0,0 +1,73 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\HttpClient\Response;
+
+use GuzzleHttp\Promise\PromiseInterface as GuzzlePromiseInterface;
+use Http\Promise\Promise as HttplugPromiseInterface;
+use Psr\Http\Message\ResponseInterface as Psr7ResponseInterface;
+
+/**
+ * @author Tobias Nyholm