Merge branch '5.1'
* 5.1: Fix extra SQL support in Doctrine migrations [HttpClient] fix management of shorter-than-requested timeouts with AmpHttpClient
This commit is contained in:
commit
b40c1abf9a
@ -64,8 +64,7 @@ final class MessengerTransportDoctrineSchemaSubscriber implements EventSubscribe
|
||||
continue;
|
||||
}
|
||||
|
||||
$extraSql = $transport->getExtraSetupSqlForTable($table);
|
||||
if (null === $extraSql) {
|
||||
if (!$extraSql = $transport->getExtraSetupSqlForTable($table)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -79,7 +78,9 @@ final class MessengerTransportDoctrineSchemaSubscriber implements EventSubscribe
|
||||
* the only way to inject some extra SQL.
|
||||
*/
|
||||
$event->addSql($createTableSql);
|
||||
$event->addSql($extraSql);
|
||||
foreach ($extraSql as $sql) {
|
||||
$event->addSql($sql);
|
||||
}
|
||||
$event->preventDefault();
|
||||
|
||||
return;
|
||||
|
@ -61,7 +61,7 @@ class MessengerTransportDoctrineSchemaSubscriberTest extends TestCase
|
||||
$doctrineTransport->expects($this->once())
|
||||
->method('getExtraSetupSqlForTable')
|
||||
->with($table)
|
||||
->willReturn('ALTER TABLE pizza ADD COLUMN extra_cheese boolean');
|
||||
->willReturn(['ALTER TABLE pizza ADD COLUMN extra_cheese boolean']);
|
||||
|
||||
// we use the platform to generate the full create table sql
|
||||
$platform->expects($this->once())
|
||||
@ -87,7 +87,7 @@ class MessengerTransportDoctrineSchemaSubscriberTest extends TestCase
|
||||
$doctrineTransport = $this->createMock(DoctrineTransport::class);
|
||||
$doctrineTransport->expects($this->once())
|
||||
->method('getExtraSetupSqlForTable')
|
||||
->willReturn(null);
|
||||
->willReturn([]);
|
||||
|
||||
$platform->expects($this->never())
|
||||
->method('getCreateTableSQL');
|
||||
|
@ -42,6 +42,8 @@ final class AmpResponse implements ResponseInterface
|
||||
private $canceller;
|
||||
private $onProgress;
|
||||
|
||||
private static $delay;
|
||||
|
||||
/**
|
||||
* @internal
|
||||
*/
|
||||
@ -171,18 +173,21 @@ final class AmpResponse implements ResponseInterface
|
||||
*/
|
||||
private static function select(ClientState $multi, float $timeout): int
|
||||
{
|
||||
$selected = 1;
|
||||
$delay = Loop::delay(1000 * $timeout, static function () use (&$selected) {
|
||||
$selected = 0;
|
||||
Loop::stop();
|
||||
});
|
||||
Loop::run();
|
||||
$start = microtime(true);
|
||||
$remaining = $timeout;
|
||||
|
||||
if ($selected) {
|
||||
Loop::cancel($delay);
|
||||
while (true) {
|
||||
self::$delay = Loop::delay(1000 * $remaining, [Loop::class, 'stop']);
|
||||
Loop::run();
|
||||
|
||||
if (null === self::$delay) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (0 >= $remaining = $timeout - microtime(true) + $start) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
return $selected;
|
||||
}
|
||||
|
||||
private static function generateResponse(Request $request, AmpClientState $multi, string $id, array &$info, array &$headers, CancellationTokenSource $canceller, array &$options, \Closure $onProgress, &$handle, ?LoggerInterface $logger)
|
||||
@ -192,7 +197,7 @@ final class AmpResponse implements ResponseInterface
|
||||
$request->setInformationalResponseHandler(static function (Response $response) use (&$activity, $id, &$info, &$headers) {
|
||||
self::addResponseHeaders($response, $info, $headers);
|
||||
$activity[$id][] = new InformationalChunk($response->getStatus(), $response->getHeaders());
|
||||
Loop::defer([Loop::class, 'stop']);
|
||||
self::stopLoop();
|
||||
});
|
||||
|
||||
try {
|
||||
@ -210,7 +215,7 @@ final class AmpResponse implements ResponseInterface
|
||||
if ('HEAD' === $response->getRequest()->getMethod() || \in_array($info['http_code'], [204, 304], true)) {
|
||||
$activity[$id][] = null;
|
||||
$activity[$id][] = null;
|
||||
Loop::defer([Loop::class, 'stop']);
|
||||
self::stopLoop();
|
||||
|
||||
return;
|
||||
}
|
||||
@ -222,7 +227,7 @@ final class AmpResponse implements ResponseInterface
|
||||
$body = $response->getBody();
|
||||
|
||||
while (true) {
|
||||
Loop::defer([Loop::class, 'stop']);
|
||||
self::stopLoop();
|
||||
|
||||
if (null === $data = yield $body->read()) {
|
||||
break;
|
||||
@ -241,7 +246,7 @@ final class AmpResponse implements ResponseInterface
|
||||
$info['download_content_length'] = $info['size_download'];
|
||||
}
|
||||
|
||||
Loop::defer([Loop::class, 'stop']);
|
||||
self::stopLoop();
|
||||
}
|
||||
|
||||
private static function followRedirects(Request $originRequest, AmpClientState $multi, array &$info, array &$headers, CancellationTokenSource $canceller, array $options, \Closure $onProgress, &$handle, ?LoggerInterface $logger)
|
||||
@ -402,4 +407,14 @@ final class AmpResponse implements ResponseInterface
|
||||
return $response;
|
||||
}
|
||||
}
|
||||
|
||||
private static function stopLoop(): void
|
||||
{
|
||||
if (null !== self::$delay) {
|
||||
Loop::cancel(self::$delay);
|
||||
self::$delay = null;
|
||||
}
|
||||
|
||||
Loop::defer([Loop::class, 'stop']);
|
||||
}
|
||||
}
|
||||
|
@ -12,18 +12,15 @@
|
||||
namespace Symfony\Component\HttpClient\Tests;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\HttpClient\AmpHttpClient;
|
||||
use Symfony\Component\HttpClient\CurlHttpClient;
|
||||
use Symfony\Component\HttpClient\HttpClient;
|
||||
use Symfony\Component\HttpClient\NativeHttpClient;
|
||||
use Symfony\Contracts\HttpClient\HttpClientInterface;
|
||||
|
||||
class HttpClientTest extends TestCase
|
||||
{
|
||||
public function testCreateClient()
|
||||
{
|
||||
if (\extension_loaded('curl') && ('\\' !== \DIRECTORY_SEPARATOR || ini_get('curl.cainfo') || ini_get('openssl.cafile') || ini_get('openssl.capath')) && 0x073d00 <= curl_version()['version_number']) {
|
||||
$this->assertInstanceOf(CurlHttpClient::class, HttpClient::create());
|
||||
} else {
|
||||
$this->assertInstanceOf(AmpHttpClient::class, HttpClient::create());
|
||||
}
|
||||
$this->assertInstanceOf(HttpClientInterface::class, HttpClient::create());
|
||||
$this->assertNotInstanceOf(NativeHttpClient::class, HttpClient::create());
|
||||
}
|
||||
}
|
||||
|
@ -52,7 +52,7 @@ class PostgreSqlConnectionTest extends TestCase
|
||||
|
||||
$table = new Table('queue_table');
|
||||
$table->addOption('_symfony_messenger_table_name', 'queue_table');
|
||||
$this->assertStringContainsString('CREATE TRIGGER', $connection->getExtraSetupSqlForTable($table));
|
||||
$this->assertStringContainsString('CREATE TRIGGER', implode("\n", $connection->getExtraSetupSqlForTable($table)));
|
||||
}
|
||||
|
||||
public function testGetExtraSetupSqlWrongTable()
|
||||
@ -62,6 +62,6 @@ class PostgreSqlConnectionTest extends TestCase
|
||||
|
||||
$table = new Table('queue_table');
|
||||
// don't set the _symfony_messenger_table_name option
|
||||
$this->assertNull($connection->getExtraSetupSqlForTable($table));
|
||||
$this->assertSame([], $connection->getExtraSetupSqlForTable($table));
|
||||
}
|
||||
}
|
||||
|
@ -285,9 +285,9 @@ class Connection implements ResetInterface
|
||||
/**
|
||||
* @internal
|
||||
*/
|
||||
public function getExtraSetupSqlForTable(Table $createdTable): ?string
|
||||
public function getExtraSetupSqlForTable(Table $createdTable): array
|
||||
{
|
||||
return null;
|
||||
return [];
|
||||
}
|
||||
|
||||
private function createAvailableMessagesQueryBuilder(): QueryBuilder
|
||||
|
@ -111,8 +111,10 @@ class DoctrineTransport implements TransportInterface, SetupableTransportInterfa
|
||||
|
||||
/**
|
||||
* Adds extra SQL if the given table was created by the Connection.
|
||||
*
|
||||
* @return string[]
|
||||
*/
|
||||
public function getExtraSetupSqlForTable(Table $createdTable): ?string
|
||||
public function getExtraSetupSqlForTable(Table $createdTable): array
|
||||
{
|
||||
return $this->connection->getExtraSetupSqlForTable($createdTable);
|
||||
}
|
||||
|
@ -85,43 +85,43 @@ final class PostgreSqlConnection extends Connection
|
||||
{
|
||||
parent::setup();
|
||||
|
||||
$this->driverConnection->exec($this->getTriggerSql());
|
||||
$this->driverConnection->exec(implode("\n", $this->getTriggerSql()));
|
||||
}
|
||||
|
||||
public function getExtraSetupSqlForTable(Table $createdTable): ?string
|
||||
/**
|
||||
* @return string[]
|
||||
*/
|
||||
public function getExtraSetupSqlForTable(Table $createdTable): array
|
||||
{
|
||||
if (!$createdTable->hasOption(self::TABLE_OPTION_NAME)) {
|
||||
return null;
|
||||
return [];
|
||||
}
|
||||
|
||||
if ($createdTable->getOption(self::TABLE_OPTION_NAME) !== $this->configuration['table_name']) {
|
||||
return null;
|
||||
return [];
|
||||
}
|
||||
|
||||
return $this->getTriggerSql();
|
||||
}
|
||||
|
||||
private function getTriggerSql(): string
|
||||
private function getTriggerSql(): array
|
||||
{
|
||||
return sprintf(<<<'SQL'
|
||||
LOCK TABLE %1$s;
|
||||
-- create trigger function
|
||||
return [
|
||||
sprintf('LOCK TABLE %s;', $this->configuration['table_name']),
|
||||
// create trigger function
|
||||
sprintf(<<<'SQL'
|
||||
CREATE OR REPLACE FUNCTION notify_%1$s() RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
PERFORM pg_notify('%1$s', NEW.queue_name::text);
|
||||
RETURN NEW;
|
||||
BEGIN
|
||||
PERFORM pg_notify('%1$s', NEW.queue_name::text);
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- register trigger
|
||||
DROP TRIGGER IF EXISTS notify_trigger ON %1$s;
|
||||
|
||||
CREATE TRIGGER notify_trigger
|
||||
AFTER INSERT
|
||||
ON %1$s
|
||||
FOR EACH ROW EXECUTE PROCEDURE notify_%1$s();
|
||||
SQL
|
||||
, $this->configuration['table_name']);
|
||||
, $this->configuration['table_name']),
|
||||
// register trigger
|
||||
sprintf('DROP TRIGGER IF EXISTS notify_trigger ON %s;', $this->configuration['table_name']),
|
||||
sprintf('CREATE TRIGGER notify_trigger AFTER INSERT ON %1$s FOR EACH ROW EXECUTE PROCEDURE notify_%1$s();', $this->configuration['table_name']),
|
||||
];
|
||||
}
|
||||
|
||||
private function unlisten()
|
||||
|
@ -116,7 +116,7 @@ switch ($vars['REQUEST_URI']) {
|
||||
echo '<1>';
|
||||
@ob_flush();
|
||||
flush();
|
||||
usleep(600000);
|
||||
usleep(500000);
|
||||
echo '<2>';
|
||||
exit;
|
||||
|
||||
|
@ -745,7 +745,7 @@ abstract class HttpClientTestCase extends TestCase
|
||||
usleep(300000); // wait for the previous test to release the server
|
||||
$client = $this->getHttpClient(__FUNCTION__);
|
||||
$response = $client->request('GET', 'http://localhost:8057/timeout-body', [
|
||||
'timeout' => 0.3,
|
||||
'timeout' => 0.25,
|
||||
]);
|
||||
|
||||
try {
|
||||
|
Reference in New Issue
Block a user