[HttpClient] Async HTTPlug client

This commit is contained in:
Nyholm 2019-09-26 11:41:29 +02:00 committed by Nicolas Grekas
parent 732c0344ea
commit 4fd593f869
6 changed files with 357 additions and 34 deletions

View File

@ -108,6 +108,7 @@
"doctrine/orm": "~2.4,>=2.4.5",
"doctrine/reflection": "~1.0",
"doctrine/doctrine-bundle": "~1.4",
"guzzlehttp/promises": "^1.3.1",
"masterminds/html5": "^2.6",
"monolog/monolog": "^1.25.1",
"nyholm/psr7": "^1.0",

View File

@ -5,7 +5,7 @@ CHANGELOG
-----
* added `StreamWrapper`
* added `HttplugClient`
* added `HttplugClient` with support for sync and async requests
* added `max_duration` option
* added support for NTLM authentication
* added `$response->toStream()` to cast responses to regular PHP streams

View File

@ -11,31 +11,38 @@
namespace Symfony\Component\HttpClient;
use GuzzleHttp\Promise\Promise as GuzzlePromise;
use Http\Client\Exception\NetworkException;
use Http\Client\Exception\RequestException;
use Http\Client\HttpClient;
use Http\Client\HttpAsyncClient;
use Http\Client\HttpClient as HttplugInterface;
use Http\Message\RequestFactory;
use Http\Message\StreamFactory;
use Http\Message\UriFactory;
use Psr\Http\Client\ClientInterface;
use Psr\Http\Client\NetworkExceptionInterface;
use Psr\Http\Client\RequestExceptionInterface;
use Http\Promise\Promise;
use Http\Promise\RejectedPromise;
use Nyholm\Psr7\Factory\Psr17Factory;
use Nyholm\Psr7\Request;
use Nyholm\Psr7\Uri;
use Psr\Http\Message\RequestFactoryInterface;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseFactoryInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ResponseInterface as Psr7ResponseInterface;
use Psr\Http\Message\StreamFactoryInterface;
use Psr\Http\Message\StreamInterface;
use Psr\Http\Message\UriFactoryInterface;
use Psr\Http\Message\UriInterface;
use Symfony\Component\HttpClient\Response\HttplugPromise;
use Symfony\Component\HttpClient\Response\ResponseTrait;
use Symfony\Component\HttpClient\Response\StreamWrapper;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface;
if (!interface_exists(HttpClient::class)) {
if (!interface_exists(HttplugInterface::class)) {
throw new \LogicException('You cannot use "Symfony\Component\HttpClient\HttplugClient" as the "php-http/httplug" package is not installed. Try running "composer require php-http/httplug".');
}
if (!interface_exists(ClientInterface::class)) {
throw new \LogicException('You cannot use "Symfony\Component\HttpClient\HttplugClient" as the "psr/http-client" package is not installed. Try running "composer require psr/http-client".');
}
if (!interface_exists(RequestFactory::class)) {
throw new \LogicException('You cannot use "Symfony\Component\HttpClient\HttplugClient" as the "php-http/message-factory" package is not installed. Try running "composer require nyholm/psr7".');
}
@ -43,42 +50,166 @@ if (!interface_exists(RequestFactory::class)) {
/**
* An adapter to turn a Symfony HttpClientInterface into an Httplug client.
*
* Run "composer require psr/http-client" to install the base ClientInterface. Run
* "composer require nyholm/psr7" to install an efficient implementation of response
* Run "composer require nyholm/psr7" to install an efficient implementation of response
* and stream factories with flex-provided autowiring aliases.
*
* @author Nicolas Grekas <p@tchwork.com>
*/
final class HttplugClient implements HttpClient, RequestFactory, StreamFactory, UriFactory
final class HttplugClient implements HttplugInterface, HttpAsyncClient, RequestFactory, StreamFactory, UriFactory
{
private $client;
private $responseFactory;
private $streamFactory;
private $promisePool = [];
private $pendingResponse;
public function __construct(HttpClientInterface $client = null, ResponseFactoryInterface $responseFactory = null, StreamFactoryInterface $streamFactory = null)
{
$this->client = new Psr18Client($client, $responseFactory, $streamFactory);
$this->client = $client ?? HttpClient::create();
$this->responseFactory = $responseFactory;
$this->streamFactory = $streamFactory ?? ($responseFactory instanceof StreamFactoryInterface ? $responseFactory : null);
$this->promisePool = new \SplObjectStorage();
if (null !== $this->responseFactory && null !== $this->streamFactory) {
return;
}
if (!class_exists(Psr17Factory::class)) {
throw new \LogicException('You cannot use the "Symfony\Component\HttpClient\HttplugClient" as no PSR-17 factories have been provided. Try running "composer require nyholm/psr7".');
}
$psr17Factory = new Psr17Factory();
$this->responseFactory = $this->responseFactory ?? $psr17Factory;
$this->streamFactory = $this->streamFactory ?? $psr17Factory;
}
/**
* {@inheritdoc}
*/
public function sendRequest(RequestInterface $request): ResponseInterface
public function sendRequest(RequestInterface $request): Psr7ResponseInterface
{
try {
return $this->client->sendRequest($request);
} catch (RequestExceptionInterface $e) {
throw new RequestException($e->getMessage(), $request, $e);
} catch (NetworkExceptionInterface $e) {
return $this->createPsr7Response($this->sendPsr7Request($request));
} catch (TransportExceptionInterface $e) {
throw new NetworkException($e->getMessage(), $request, $e);
}
}
/**
* {@inheritdoc}
*
* @return HttplugPromise
*/
public function sendAsyncRequest(RequestInterface $request): Promise
{
if (!class_exists(GuzzlePromise::class)) {
throw new \LogicException(sprintf('You cannot use "%s()" as the "guzzlehttp/promises" package is not installed. Try running "composer require guzzlehttp/promises".', __METHOD__));
}
try {
$response = $this->sendPsr7Request($request, true);
} catch (NetworkException $e) {
return new RejectedPromise($e);
}
$cancel = function () use ($response) {
$response->cancel();
unset($this->promisePool[$response]);
};
$promise = new GuzzlePromise(function () use ($response) {
$this->pendingResponse = $response;
$this->wait();
}, $cancel);
$this->promisePool[$response] = [$request, $promise];
return new HttplugPromise($promise, $cancel);
}
/**
* Resolve pending promises that complete before the timeouts are reached.
*
* When $maxDuration is null and $idleTimeout is reached, promises are rejected.
*
* @return int The number of remaining pending promises
*/
public function wait(float $maxDuration = null, float $idleTimeout = null): int
{
$pendingResponse = $this->pendingResponse;
$this->pendingResponse = null;
if (null !== $maxDuration) {
$startTime = microtime(true);
$idleTimeout = max(0.0, min($maxDuration / 5, $idleTimeout ?? $maxDuration));
$remainingDuration = $maxDuration;
}
do {
foreach ($this->client->stream($this->promisePool, $idleTimeout) as $response => $chunk) {
try {
if (null !== $maxDuration && $chunk->isTimeout()) {
goto check_duration;
}
if ($chunk->isFirst()) {
// Deactivate throwing on 3/4/5xx
$response->getStatusCode();
}
if (!$chunk->isLast()) {
goto check_duration;
}
if ([$request, $promise] = $this->promisePool[$response] ?? null) {
unset($this->promisePool[$response]);
$promise->resolve($this->createPsr7Response($response, true));
}
} catch (\Exception $e) {
if ([$request, $promise] = $this->promisePool[$response] ?? null) {
unset($this->promisePool[$response]);
if ($e instanceof TransportExceptionInterface) {
$e = new NetworkException($e->getMessage(), $request, $e);
}
$promise->reject($e);
}
}
if ($pendingResponse === $response) {
return \count($this->promisePool);
}
check_duration:
if (null !== $maxDuration && $idleTimeout && $idleTimeout > $remainingDuration = max(0.0, $maxDuration - microtime(true) + $startTime)) {
$idleTimeout = $remainingDuration / 5;
break;
}
}
if (!$count = \count($this->promisePool)) {
return 0;
}
} while (null !== $maxDuration && 0 < $remainingDuration);
return $count;
}
/**
* {@inheritdoc}
*/
public function createRequest($method, $uri, array $headers = [], $body = null, $protocolVersion = '1.1'): RequestInterface
{
$request = $this->client
->createRequest($method, $uri)
if ($this->responseFactory instanceof RequestFactoryInterface) {
$request = $this->responseFactory->createRequest($method, $uri);
} elseif (!class_exists(Request::class)) {
throw new \LogicException(sprintf('You cannot use "%s()" as the "nyholm/psr7" package is not installed. Try running "composer require nyholm/psr7".', __METHOD__));
} else {
$request = new Request($method, $uri);
}
$request = $request
->withProtocolVersion($protocolVersion)
->withBody($this->createStream($body))
;
@ -100,27 +231,84 @@ final class HttplugClient implements HttpClient, RequestFactory, StreamFactory,
}
if (\is_string($body ?? '')) {
$body = $this->client->createStream($body ?? '');
if ($body->isSeekable()) {
$body->seek(0);
}
return $body;
$stream = $this->streamFactory->createStream($body ?? '');
} elseif (\is_resource($body)) {
$stream = $this->streamFactory->createStreamFromResource($body);
} else {
throw new \InvalidArgumentException(sprintf('%s() expects string, resource or StreamInterface, %s given.', __METHOD__, \gettype($body)));
}
if (\is_resource($body)) {
return $this->client->createStreamFromResource($body);
if ($stream->isSeekable()) {
$stream->seek(0);
}
throw new \InvalidArgumentException(sprintf('%s() expects string, resource or StreamInterface, %s given.', __METHOD__, \gettype($body)));
return $stream;
}
/**
* {@inheritdoc}
*/
public function createUri($uri = ''): UriInterface
public function createUri($uri): UriInterface
{
return $uri instanceof UriInterface ? $uri : $this->client->createUri($uri);
if ($uri instanceof UriInterface) {
return $uri;
}
if ($this->responseFactory instanceof UriFactoryInterface) {
return $this->responseFactory->createUri($uri);
}
if (!class_exists(Uri::class)) {
throw new \LogicException(sprintf('You cannot use "%s()" as the "nyholm/psr7" package is not installed. Try running "composer require nyholm/psr7".', __METHOD__));
}
return new Uri($uri);
}
private function sendPsr7Request(RequestInterface $request, bool $buffer = null): ResponseInterface
{
try {
$body = $request->getBody();
if ($body->isSeekable()) {
$body->seek(0);
}
return $this->client->request($request->getMethod(), (string) $request->getUri(), [
'headers' => $request->getHeaders(),
'body' => $body->getContents(),
'http_version' => '1.0' === $request->getProtocolVersion() ? '1.0' : null,
'buffer' => $buffer,
]);
} catch (\InvalidArgumentException $e) {
throw new RequestException($e->getMessage(), $request, $e);
} catch (TransportExceptionInterface $e) {
throw new NetworkException($e->getMessage(), $request, $e);
}
}
private function createPsr7Response(ResponseInterface $response, bool $buffer = false): Psr7ResponseInterface
{
$psrResponse = $this->responseFactory->createResponse($response->getStatusCode());
foreach ($response->getHeaders(false) as $name => $values) {
foreach ($values as $value) {
$psrResponse = $psrResponse->withAddedHeader($name, $value);
}
}
if (isset(class_uses($response)[ResponseTrait::class])) {
$body = $this->streamFactory->createStreamFromResource($response->toStream(false));
} elseif (!$buffer) {
$body = $this->streamFactory->createStreamFromResource(StreamWrapper::createResource($response, $this->client));
} else {
$body = $this->streamFactory->createStream($response->getContent(false));
}
if ($body->isSeekable()) {
$body->seek(0);
}
return $psrResponse->withBody($body);
}
}

View File

@ -0,0 +1,73 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\HttpClient\Response;
use GuzzleHttp\Promise\PromiseInterface as GuzzlePromiseInterface;
use Http\Promise\Promise as HttplugPromiseInterface;
use Psr\Http\Message\ResponseInterface as Psr7ResponseInterface;
/**
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
*
* @internal
*/
final class HttplugPromise implements HttplugPromiseInterface
{
private $promise;
private $cancel;
public function __construct(GuzzlePromiseInterface $promise, callable $cancel = null)
{
$this->promise = $promise;
$this->cancel = $cancel;
}
public function then(callable $onFulfilled = null, callable $onRejected = null): self
{
return new self($this->promise->then($onFulfilled, $onRejected));
}
public function cancel(): void
{
$this->promise->cancel();
}
/**
* {@inheritdoc}
*/
public function getState(): string
{
return $this->promise->getState();
}
/**
* {@inheritdoc}
*
* @return Psr7ResponseInterface|mixed
*/
public function wait($unwrap = true)
{
return $this->promise->wait($unwrap);
}
public function __destruct()
{
if ($this->cancel) {
($this->cancel)();
}
}
public function __wakeup()
{
throw new \BadMethodCallException('Cannot unserialize '.__CLASS__);
}
}

View File

@ -13,7 +13,9 @@ namespace Symfony\Component\HttpClient\Tests;
use Http\Client\Exception\NetworkException;
use Http\Client\Exception\RequestException;
use Http\Promise\Promise;
use PHPUnit\Framework\TestCase;
use Psr\Http\Message\ResponseInterface;
use Symfony\Component\HttpClient\HttplugClient;
use Symfony\Component\HttpClient\NativeHttpClient;
use Symfony\Contracts\HttpClient\Test\TestHttpServer;
@ -41,6 +43,38 @@ class HttplugClientTest extends TestCase
$this->assertSame('HTTP/1.1', $body['SERVER_PROTOCOL']);
}
public function testSendAsyncRequest()
{
$client = new HttplugClient(new NativeHttpClient());
$promise = $client->sendAsyncRequest($client->createRequest('GET', 'http://localhost:8057'));
$successCallableCalled = false;
$failureCallableCalled = false;
$promise->then(function (ResponseInterface $response) use (&$successCallableCalled) {
$successCallableCalled = true;
return $response;
}, function (\Exception $exception) use (&$failureCallableCalled) {
$failureCallableCalled = true;
throw $exception;
});
$this->assertEquals(Promise::PENDING, $promise->getState());
$response = $promise->wait(true);
$this->assertTrue($successCallableCalled, '$promise->then() was never called.');
$this->assertFalse($failureCallableCalled, 'Failure callable should not be called when request is successful.');
$this->assertEquals(Promise::FULFILLED, $promise->getState());
$this->assertSame(200, $response->getStatusCode());
$this->assertSame('application/json', $response->getHeaderLine('content-type'));
$body = json_decode((string) $response->getBody(), true);
$this->assertSame('HTTP/1.1', $body['SERVER_PROTOCOL']);
}
public function testPostRequest()
{
$client = new HttplugClient(new NativeHttpClient());
@ -62,6 +96,32 @@ class HttplugClientTest extends TestCase
$client->sendRequest($client->createRequest('GET', 'http://localhost:8058'));
}
public function testAsyncNetworkException()
{
$client = new HttplugClient(new NativeHttpClient());
$promise = $client->sendAsyncRequest($client->createRequest('GET', 'http://localhost:8058'));
$successCallableCalled = false;
$failureCallableCalled = false;
$promise->then(function (ResponseInterface $response) use (&$successCallableCalled) {
$successCallableCalled = true;
return $response;
}, function (\Exception $exception) use (&$failureCallableCalled) {
$failureCallableCalled = true;
throw $exception;
});
$promise->wait(false);
$this->assertFalse($successCallableCalled, 'Success callable should not be called when request fails.');
$this->assertTrue($failureCallableCalled, 'Failure callable was never called.');
$this->assertEquals(Promise::REJECTED, $promise->getState());
$this->expectException(NetworkException::class);
$promise->wait(true);
}
public function testRequestException()
{
$client = new HttplugClient(new NativeHttpClient());

View File

@ -26,6 +26,7 @@
"symfony/polyfill-php73": "^1.11"
},
"require-dev": {
"guzzlehttp/promises": "^1.3.1",
"nyholm/psr7": "^1.0",
"php-http/httplug": "^1.0|^2.0",
"psr/http-client": "^1.0",