[lock] Add store dedicated to postgresql

This commit is contained in:
Jérémy Derussé 2020-09-29 16:39:47 +02:00 committed by Fabien Potencier
parent b0ae228152
commit 3d114be680
12 changed files with 562 additions and 4 deletions

View File

@ -15,6 +15,12 @@ jobs:
php: ['7.2', '7.4']
services:
postgres:
image: postgres:9.6-alpine
ports:
- 5432:5432
env:
POSTGRES_PASSWORD: 'password'
redis:
image: redis:6.0.0
ports:
@ -144,6 +150,7 @@ jobs:
MEMCACHED_HOST: localhost
MONGODB_HOST: localhost
KAFKA_BROKER: localhost:9092
POSTGRES_HOST: localhost
- name: Run HTTP push tests
if: matrix.php == '7.4'

View File

@ -9,6 +9,8 @@ CHANGELOG
* added `NoLock`
* deprecated `NotSupportedException`, it shouldn't be thrown anymore.
* deprecated `RetryTillSaveStore`, logic has been moved in `Lock` and is not needed anymore.
* added `InMemoryStore`
* added `PostgreSqlStore`
5.1.0
-----

View File

@ -0,0 +1,114 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Lock\Store;
use Symfony\Component\Lock\Exception\LockConflictedException;
use Symfony\Component\Lock\Key;
use Symfony\Component\Lock\SharedLockStoreInterface;
/**
* InMemoryStore is a PersistingStoreInterface implementation using
* php-array to manage locks.
*
* @author Jérémy Derussé <jeremy@derusse.com>
*/
class InMemoryStore implements SharedLockStoreInterface
{
private $locks = [];
private $readLocks = [];
public function save(Key $key)
{
$hashKey = (string) $key;
$token = $this->getUniqueToken($key);
if (isset($this->locks[$hashKey])) {
// already acquired
if ($this->locks[$hashKey] === $token) {
return;
}
throw new LockConflictedException();
}
// check for promotion
if (isset($this->readLocks[$hashKey][$token]) && 1 === \count($this->readLocks[$hashKey])) {
unset($this->readLocks[$hashKey]);
$this->locks[$hashKey] = $token;
return;
}
if (\count($this->readLocks[$hashKey] ?? []) > 0) {
throw new LockConflictedException();
}
$this->locks[$hashKey] = $token;
}
public function saveRead(Key $key)
{
$hashKey = (string) $key;
$token = $this->getUniqueToken($key);
// check if lock is already acquired in read mode
if (isset($this->readLocks[$hashKey])) {
$this->readLocks[$hashKey][$token] = true;
return;
}
// check for demotion
if (isset($this->locks[$hashKey])) {
if ($this->locks[$hashKey] !== $token) {
throw new LockConflictedException();
}
unset($this->locks[$hashKey]);
}
$this->readLocks[$hashKey][$token] = true;
}
public function putOffExpiration(Key $key, float $ttl)
{
// do nothing, memory locks forever.
}
public function delete(Key $key)
{
$hashKey = (string) $key;
$token = $this->getUniqueToken($key);
unset($this->readLocks[$hashKey][$token]);
if (($this->locks[$hashKey] ?? null) === $token) {
unset($this->locks[$hashKey]);
}
}
public function exists(Key $key)
{
$hashKey = (string) $key;
$token = $this->getUniqueToken($key);
return isset($this->readLocks[$hashKey][$token]) || ($this->locks[$hashKey] ?? null) === $token;
}
private function getUniqueToken(Key $key): string
{
if (!$key->hasState(__CLASS__)) {
$token = base64_encode(random_bytes(32));
$key->setState(__CLASS__, $token);
}
return $key->getState(__CLASS__);
}
}

View File

@ -74,7 +74,6 @@ class PdoStore implements PersistingStoreInterface
*
* @throws InvalidArgumentException When first argument is not PDO nor Connection nor string
* @throws InvalidArgumentException When PDO error mode is not PDO::ERRMODE_EXCEPTION
* @throws InvalidArgumentException When namespace contains invalid characters
* @throws InvalidArgumentException When the initial ttl is not valid
*/
public function __construct($connOrDsn, array $options = [], float $gcProbability = 0.01, int $initialTtl = 300)

View File

@ -0,0 +1,283 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Lock\Store;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\DriverManager;
use Symfony\Component\Lock\BlockingSharedLockStoreInterface;
use Symfony\Component\Lock\BlockingStoreInterface;
use Symfony\Component\Lock\Exception\InvalidArgumentException;
use Symfony\Component\Lock\Exception\LockConflictedException;
use Symfony\Component\Lock\Key;
use Symfony\Component\Lock\PersistingStoreInterface;
/**
* PostgreSqlStore is a PersistingStoreInterface implementation using
* PostgreSql advisory locks.
*
* @author Jérémy Derussé <jeremy@derusse.com>
*/
class PostgreSqlStore implements BlockingSharedLockStoreInterface, BlockingStoreInterface
{
private $conn;
private $dsn;
private $username = '';
private $password = '';
private $connectionOptions = [];
private static $storeRegistry = [];
/**
* You can either pass an existing database connection as PDO instance or
* a Doctrine DBAL Connection or a DSN string that will be used to
* lazy-connect to the database when the lock is actually used.
*
* List of available options:
* * db_username: The username when lazy-connect [default: '']
* * db_password: The password when lazy-connect [default: '']
* * db_connection_options: An array of driver-specific connection options [default: []]
*
* @param \PDO|Connection|string $connOrDsn A \PDO or Connection instance or DSN string or null
* @param array $options An associative array of options
*
* @throws InvalidArgumentException When first argument is not PDO nor Connection nor string
* @throws InvalidArgumentException When PDO error mode is not PDO::ERRMODE_EXCEPTION
* @throws InvalidArgumentException When namespace contains invalid characters
*/
public function __construct($connOrDsn, array $options = [])
{
if ($connOrDsn instanceof \PDO) {
if (\PDO::ERRMODE_EXCEPTION !== $connOrDsn->getAttribute(\PDO::ATTR_ERRMODE)) {
throw new InvalidArgumentException(sprintf('"%s" requires PDO error mode attribute be set to throw Exceptions (i.e. $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION)).', __METHOD__));
}
$this->conn = $connOrDsn;
$this->checkDriver();
} elseif ($connOrDsn instanceof Connection) {
$this->conn = $connOrDsn;
$this->checkDriver();
} elseif (\is_string($connOrDsn)) {
$this->dsn = $connOrDsn;
} else {
throw new InvalidArgumentException(sprintf('"%s" requires PDO or Doctrine\DBAL\Connection instance or DSN string as first argument, "%s" given.', __CLASS__, get_debug_type($connOrDsn)));
}
$this->username = $options['db_username'] ?? $this->username;
$this->password = $options['db_password'] ?? $this->password;
$this->connectionOptions = $options['db_connection_options'] ?? $this->connectionOptions;
}
public function save(Key $key)
{
// prevent concurrency within the same connection
$this->getInternalStore()->save($key);
$sql = 'SELECT pg_try_advisory_lock(:key)';
$stmt = $this->getConnection()->prepare($sql);
$stmt->bindValue(':key', $this->getHashedKey($key));
$result = $stmt->execute();
// Check if lock is acquired
if (true === (\is_object($result) ? $result->fetchOne() : $stmt->fetchColumn())) {
// release sharedLock in case of promotion
$this->unlockShared($key);
return;
}
throw new LockConflictedException();
}
public function saveRead(Key $key)
{
// prevent concurrency within the same connection
$this->getInternalStore()->saveRead($key);
$sql = 'SELECT pg_try_advisory_lock_shared(:key)';
$stmt = $this->getConnection()->prepare($sql);
$stmt->bindValue(':key', $this->getHashedKey($key));
$result = $stmt->execute();
// Check if lock is acquired
if (true === (\is_object($result) ? $result->fetchOne() : $stmt->fetchColumn())) {
// release lock in case of demotion
$this->unlock($key);
return;
}
throw new LockConflictedException();
}
public function putOffExpiration(Key $key, float $ttl)
{
// postgresql locks forever.
// check if lock still exists
if (!$this->exists($key)) {
throw new LockConflictedException();
}
}
public function delete(Key $key)
{
// Prevent deleting locks own by an other key in the same connection
if (!$this->exists($key)) {
return;
}
$this->unlock($key);
// Prevent deleting Readlocks own by current key AND an other key in the same connection
$store = $this->getInternalStore();
try {
// If lock acquired = there is no other ReadLock
$store->save($key);
$this->unlockShared($key);
} catch (LockConflictedException $e) {
// an other key exists in this ReadLock
}
$store->delete($key);
}
public function exists(Key $key)
{
$sql = "SELECT count(*) FROM pg_locks WHERE locktype='advisory' AND objid=:key AND pid=pg_backend_pid()";
$stmt = $this->getConnection()->prepare($sql);
$stmt->bindValue(':key', $this->getHashedKey($key));
$result = $stmt->execute();
if ((\is_object($result) ? $result->fetchOne() : $stmt->fetchColumn()) > 0) {
// connection is locked, check for lock in internal store
return $this->getInternalStore()->exists($key);
}
return false;
}
public function waitAndSave(Key $key)
{
// prevent concurrency within the same connection
// Internal store does not allow blocking mode, because there is no way to acquire one in a single process
$this->getInternalStore()->save($key);
$sql = 'SELECT pg_advisory_lock(:key)';
$stmt = $this->getConnection()->prepare($sql);
$stmt->bindValue(':key', $this->getHashedKey($key));
$stmt->execute();
// release lock in case of promotion
$this->unlockShared($key);
}
public function waitAndSaveRead(Key $key)
{
// prevent concurrency within the same connection
// Internal store does not allow blocking mode, because there is no way to acquire one in a single process
$this->getInternalStore()->saveRead($key);
$sql = 'SELECT pg_advisory_lock_shared(:key)';
$stmt = $this->getConnection()->prepare($sql);
$stmt->bindValue(':key', $this->getHashedKey($key));
$stmt->execute();
// release lock in case of demotion
$this->unlock($key);
}
/**
* Returns a hashed version of the key.
*/
private function getHashedKey(Key $key): int
{
return crc32((string) $key);
}
private function unlock(Key $key): void
{
while (true) {
$sql = "SELECT pg_advisory_unlock(objid::bigint) FROM pg_locks WHERE locktype='advisory' AND mode='ExclusiveLock' AND objid=:key AND pid=pg_backend_pid()";
$stmt = $this->getConnection()->prepare($sql);
$stmt->bindValue(':key', $this->getHashedKey($key));
$result = $stmt->execute();
if (0 === (\is_object($result) ? $result : $stmt)->rowCount()) {
break;
}
}
}
private function unlockShared(Key $key): void
{
while (true) {
$sql = "SELECT pg_advisory_unlock_shared(objid::bigint) FROM pg_locks WHERE locktype='advisory' AND mode='ShareLock' AND objid=:key AND pid=pg_backend_pid()";
$stmt = $this->getConnection()->prepare($sql);
$stmt->bindValue(':key', $this->getHashedKey($key));
$result = $stmt->execute();
if (0 === (\is_object($result) ? $result : $stmt)->rowCount()) {
break;
}
}
}
/**
* @return \PDO|Connection
*/
private function getConnection(): object
{
if (null === $this->conn) {
if (strpos($this->dsn, '://')) {
if (!class_exists(DriverManager::class)) {
throw new InvalidArgumentException(sprintf('Failed to parse the DSN "%s". Try running "composer require doctrine/dbal".', $this->dsn));
}
$this->conn = DriverManager::getConnection(['url' => $this->dsn]);
} else {
$this->conn = new \PDO($this->dsn, $this->username, $this->password, $this->connectionOptions);
$this->conn->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION);
}
$this->checkDriver();
}
return $this->conn;
}
private function checkDriver(): void
{
if ($this->conn instanceof \PDO) {
if ('pgsql' !== $driver = $this->conn->getAttribute(\PDO::ATTR_DRIVER_NAME)) {
throw new InvalidArgumentException(sprintf('The adapter "%s" does not support the "%s" driver.', __CLASS__, $driver));
}
} else {
$driver = $this->conn->getDriver();
switch (true) {
case $driver instanceof \Doctrine\DBAL\Driver\PDOPgSql\Driver:
case $driver instanceof \Doctrine\DBAL\Driver\PDO\PgSQL\Driver:
break;
default:
throw new InvalidArgumentException(sprintf('The adapter "%s" does not support the "%s" driver.', __CLASS__, \get_class($driver)));
}
}
}
private function getInternalStore(): PersistingStoreInterface
{
$namespace = spl_object_hash($this->getConnection());
return self::$storeRegistry[$namespace] ?? self::$storeRegistry[$namespace] = new InMemoryStore();
}
}

View File

@ -102,6 +102,9 @@ class RedisStore implements SharedLockStoreInterface
$this->checkNotExpired($key);
}
/**
* {@inheritdoc}
*/
public function saveRead(Key $key)
{
$script = '

View File

@ -97,8 +97,16 @@ class StoreFactory
case 0 === strpos($connection, 'sqlite3://'):
return new PdoStore($connection);
case 0 === strpos($connection, 'pgsql+advisory:'):
case 0 === strpos($connection, 'postgres+advisory://'):
case 0 === strpos($connection, 'postgresql+advisory://'):
return new PostgreSqlStore(preg_replace('/^([^:+]+)\+advisory/', '$1', $connection));
case 0 === strpos($connection, 'zookeeper://'):
return new ZookeeperStore(ZookeeperStore::createConnection($connection));
case 'in-memory' === $connection:
return new InMemoryStore();
}
throw new InvalidArgumentException(sprintf('Unsupported Connection: "%s".', $connection));

View File

@ -61,11 +61,11 @@ trait BlockingStoreTestTrait
$store->save($key);
$this->fail('The store saves a locked key.');
} catch (LockConflictedException $e) {
} finally {
// send the ready signal to the child
posix_kill($childPID, \SIGHUP);
}
// send the ready signal to the child
posix_kill($childPID, \SIGHUP);
// This call should be blocked by the child #1
$store->waitAndSave($key);
$this->assertTrue($store->exists($key));

View File

@ -0,0 +1,31 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Lock\Tests\Store;
use Symfony\Component\Lock\PersistingStoreInterface;
use Symfony\Component\Lock\Store\InMemoryStore;
/**
* @author Jérémy Derussé <jeremy@derusse.com>
*/
class InMemoryStoreTest extends AbstractStoreTest
{
use SharedLockStoreTestTrait;
/**
* {@inheritdoc}
*/
public function getStore(): PersistingStoreInterface
{
return new InMemoryStore();
}
}

View File

@ -0,0 +1,50 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Lock\Tests\Store;
use Symfony\Component\Lock\Exception\InvalidArgumentException;
use Symfony\Component\Lock\Key;
use Symfony\Component\Lock\PersistingStoreInterface;
use Symfony\Component\Lock\Store\PostgreSqlStore;
/**
* @author Jérémy Derussé <jeremy@derusse.com>
*
* @requires extension pdo_pgsql
* @group integration
*/
class PostgreSqlDbalStoreTest extends AbstractStoreTest
{
use SharedLockStoreTestTrait;
use BlockingStoreTestTrait;
/**
* {@inheritdoc}
*/
public function getStore(): PersistingStoreInterface
{
if (!getenv('POSTGRES_HOST')) {
$this->markTestSkipped('Missing POSTGRES_HOST env variable');
}
return new PostgreSqlStore('pgsql://postgres:password@'.getenv('POSTGRES_HOST'));
}
public function testInvalidDriver()
{
$store = new PostgreSqlStore('sqlite:///foo.db');
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('The adapter "Symfony\Component\Lock\Store\PostgreSqlStore" does not support');
$store->exists(new Key('foo'));
}
}

View File

@ -0,0 +1,53 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Lock\Tests\Store;
use Symfony\Component\Lock\Exception\InvalidArgumentException;
use Symfony\Component\Lock\Key;
use Symfony\Component\Lock\PersistingStoreInterface;
use Symfony\Component\Lock\Store\PostgreSqlStore;
/**
* @author Jérémy Derussé <jeremy@derusse.com>
*
* @requires extension pdo_pgsql
* @group integration
*/
class PostgreSqlStoreTest extends AbstractStoreTest
{
use SharedLockStoreTestTrait;
use BlockingStoreTestTrait;
/**
* {@inheritdoc}
*/
public function getStore(): PersistingStoreInterface
{
if (!getenv('POSTGRES_HOST')) {
$this->markTestSkipped('Missing POSTGRES_HOST env variable');
}
return new PostgreSqlStore('pgsql:host='.getenv('POSTGRES_HOST'), ['db_username' => 'postgres', 'db_password' => 'password']);
}
/**
* @requires extension pdo_sqlite
*/
public function testInvalidDriver()
{
$store = new PostgreSqlStore('sqlite:/tmp/foo.db');
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('The adapter "Symfony\Component\Lock\Store\PostgreSqlStore" does not support');
$store->exists(new Key('foo'));
}
}

View File

@ -15,9 +15,11 @@ use PHPUnit\Framework\TestCase;
use Symfony\Component\Cache\Adapter\AbstractAdapter;
use Symfony\Component\Cache\Traits\RedisProxy;
use Symfony\Component\Lock\Store\FlockStore;
use Symfony\Component\Lock\Store\InMemoryStore;
use Symfony\Component\Lock\Store\MemcachedStore;
use Symfony\Component\Lock\Store\MongoDbStore;
use Symfony\Component\Lock\Store\PdoStore;
use Symfony\Component\Lock\Store\PostgreSqlStore;
use Symfony\Component\Lock\Store\RedisStore;
use Symfony\Component\Lock\Store\SemaphoreStore;
use Symfony\Component\Lock\Store\StoreFactory;
@ -77,19 +79,25 @@ class StoreFactoryTest extends TestCase
yield ['sqlite::memory:', PdoStore::class];
yield ['mysql:host=localhost;dbname=test;', PdoStore::class];
yield ['pgsql:host=localhost;dbname=test;', PdoStore::class];
yield ['pgsql+advisory:host=localhost;dbname=test;', PostgreSqlStore::class];
yield ['oci:host=localhost;dbname=test;', PdoStore::class];
yield ['sqlsrv:server=localhost;Database=test', PdoStore::class];
yield ['mysql://server.com/test', PdoStore::class];
yield ['mysql2://server.com/test', PdoStore::class];
yield ['pgsql://server.com/test', PdoStore::class];
yield ['pgsql+advisory://server.com/test', PostgreSqlStore::class];
yield ['postgres://server.com/test', PdoStore::class];
yield ['postgres+advisory://server.com/test', PostgreSqlStore::class];
yield ['postgresql://server.com/test', PdoStore::class];
yield ['postgresql+advisory://server.com/test', PostgreSqlStore::class];
yield ['sqlite:///tmp/test', PdoStore::class];
yield ['sqlite3:///tmp/test', PdoStore::class];
yield ['oci:///server.com/test', PdoStore::class];
yield ['mssql:///server.com/test', PdoStore::class];
}
yield ['in-memory', InMemoryStore::class];
yield ['flock', FlockStore::class];
yield ['flock://'.sys_get_temp_dir(), FlockStore::class];
}