diff --git a/classes/mutex/MySQLMutex.php b/classes/mutex/MySQLMutex.php new file mode 100644 index 00000000..0ed9e4d5 --- /dev/null +++ b/classes/mutex/MySQLMutex.php @@ -0,0 +1,75 @@ +pdo = $PDO; + + if (\strlen($name) > 64) { + throw new \InvalidArgumentException("The maximum length of the lock name is 64 characters."); + } + + $this->name = $name; + $this->timeout = $timeout; + } + + /** + * @throws LockAcquireException + */ + public function lock() + { + $statement = $this->pdo->prepare("SELECT GET_LOCK(?,?)"); + + $statement->execute([ + $this->name, + $this->timeout, + ]); + + $statement->setFetchMode(\PDO::FETCH_NUM); + $row = $statement->fetch(); + + if ($row[0] == 1) { + /* + * Returns 1 if the lock was obtained successfully. + */ + return; + } + + if ($row[0] === null) { + /* + * NULL if an error occurred (such as running out of memory or the thread was killed with mysqladmin kill). + */ + throw new LockAcquireException("An error occurred while acquiring the lock"); + } + + throw new TimeoutException("Timeout when acquiring lock."); + } + + public function unlock() + { + $statement = $this->pdo->prepare("DO RELEASE_LOCK(?)"); + $statement->execute([ + $this->name + ]); + } +} diff --git a/classes/util/Loop.php b/classes/util/Loop.php index 667f3452..db33adae 100644 --- a/classes/util/Loop.php +++ b/classes/util/Loop.php @@ -50,7 +50,7 @@ public function end() } /** - * Repeats executing a code until it was succesful. + * Repeats executing a code until it was successful. * * The code has to be designed in a way that it can be repeated without any * side effects. When execution was successful it should notify that event @@ -84,7 +84,7 @@ public function execute(callable $code) /* * Calculate max time remaining, don't sleep any longer than that. */ - $usecRemaining = intval(($timeout - microtime(true)) * 1e6); + $usecRemaining = \intval(($timeout - microtime(true)) * 1e6); if ($usecRemaining <= 0) { /* @@ -93,7 +93,7 @@ public function execute(callable $code) throw new TimeoutException("Timeout of $this->timeout seconds exceeded."); } - $usleep = min($usecRemaining, \random_int($min, $max)); + $usleep = \min($usecRemaining, \random_int($min, $max)); usleep($usleep); } diff --git a/composer.json b/composer.json index d793c603..fdc3fedb 100644 --- a/composer.json +++ b/composer.json @@ -28,7 +28,8 @@ }, "require-dev": { "ext-memcached": "*", - "ext-redis": "^2.2.4|^3.0|^4.0", + "ext-redis": "*", + "ext-pcntl": "*", "ext-pdo_mysql": "*", "ext-pdo_sqlite": "*", "kriswallsmith/spork": "^0.3", diff --git a/tests/mutex/MemcachedMutexTest.php b/tests/mutex/MemcachedMutexTest.php index e6b1455e..985b0ca5 100644 --- a/tests/mutex/MemcachedMutexTest.php +++ b/tests/mutex/MemcachedMutexTest.php @@ -37,7 +37,7 @@ public function testFailAcquireLock() { $mutex = new MemcachedMutex("testFailAcquireLock", $this->memcached, 1); - $this->memcached->add(MemcachedMutex::PREFIX."testFailAcquireLock", true, 2); + $this->memcached->add(MemcachedMutex::PREFIX."testFailAcquireLock", "xxx", 999); $mutex->synchronized(function () { $this->fail("execution is not expected"); diff --git a/tests/mutex/MutexConcurrencyTest.php b/tests/mutex/MutexConcurrencyTest.php index ebbde3c8..3b9e32a2 100644 --- a/tests/mutex/MutexConcurrencyTest.php +++ b/tests/mutex/MutexConcurrencyTest.php @@ -25,12 +25,11 @@ */ class MutexConcurrencyTest extends \PHPUnit_Framework_TestCase { - /** * @var \PDO The pdo instance. */ private $pdo; - + /** * Gets a PDO instance. * @@ -57,9 +56,13 @@ private function getPDO($dsn, $user) private function fork($concurrency, callable $code) { $manager = new ProcessManager(); + $manager->setDebug(true); + for ($i = 0; $i < $concurrency; $i++) { $manager->fork($code); } + + $manager->check(); } /** @@ -99,21 +102,22 @@ public function provideTestHighContention() { $cases = array_map(function (array $mutexFactory) { $file = tmpfile(); - fputs($file, pack("i", 0)); - fflush($file); + fwrite($file, pack("i", 0)); return [ function ($increment) use ($file) { - fseek($file, 0); + rewind($file); + flock($file, LOCK_EX); $data = fread($file, 4); $counter = unpack("i", $data)[1]; $counter += $increment; - fseek($file, 0); + rewind($file); fwrite($file, pack("i", $counter)); - fflush($file); - + + flock($file, LOCK_UN); + return $counter; }, $mutexFactory[0] @@ -122,15 +126,16 @@ function ($increment) use ($file) { $addPDO = function ($dsn, $user, $vendor) use (&$cases) { $pdo = $this->getPDO($dsn, $user); - $pdo->beginTransaction(); - + $options = ["mysql" => "engine=InnoDB"]; $option = isset($options[$vendor]) ? $options[$vendor] : ""; $pdo->exec("CREATE TABLE IF NOT EXISTS counter(id INT PRIMARY KEY, counter INT) $option"); - + + $pdo->beginTransaction(); $pdo->exec("DELETE FROM counter"); $pdo->exec("INSERT INTO counter VALUES (1, 0)"); $pdo->commit(); + $this->pdo = null; $cases[$vendor] = [ @@ -259,6 +264,15 @@ function ($uri) { return new PHPRedisMutex($apis, "test", $timeout); }]; } + + if (getenv("MYSQL_DSN")) { + $cases["MySQLMutex"] = [function ($timeout = 3) { + $pdo = new \PDO(getenv("MYSQL_DSN"), getenv("MYSQL_USER")); + $pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION); + + return new MySQLMutex($pdo, "test", $timeout); + }]; + } return $cases; } diff --git a/tests/mutex/MutexTest.php b/tests/mutex/MutexTest.php index e10c4d83..1c378115 100644 --- a/tests/mutex/MutexTest.php +++ b/tests/mutex/MutexTest.php @@ -23,8 +23,7 @@ */ class MutexTest extends \PHPUnit_Framework_TestCase { - - const TIMEOUT = 3; + const TIMEOUT = 4; /** * Provides Mutex factories. @@ -109,6 +108,15 @@ function ($uri) { }]; } + if (getenv("MYSQL_DSN")) { + $cases["MySQLMutex"] = [function () { + $pdo = new \PDO(getenv("MYSQL_DSN"), getenv("MYSQL_USER")); + $pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION); + + return new MySQLMutex($pdo, "test", self::TIMEOUT); + }]; + } + return $cases; } @@ -150,11 +158,12 @@ public function testRelease(callable $mutexFactory) * @param callable $mutexFactory The Mutex factory. * @test * @dataProvider provideMutexFactories - * @requires PHP 7.0 */ public function testLiveness(callable $mutexFactory) { $manager = new ProcessManager(); + $manager->setDebug(true); + $manager->fork(function () use ($mutexFactory) { $mutex = call_user_func($mutexFactory); $mutex->synchronized(function () { @@ -168,6 +177,8 @@ public function testLiveness(callable $mutexFactory) $mutex = call_user_func($mutexFactory); $mutex->synchronized(function () { }); + + $manager->check(); } /** @@ -177,7 +188,6 @@ public function testLiveness(callable $mutexFactory) * @test * @dataProvider provideMutexFactories * @expectedException \DomainException - * @requires PHP 5.6 */ public function testSynchronizedPassesExceptionThrough(callable $mutexFactory) { diff --git a/tests/mutex/PHPRedisMutexTest.php b/tests/mutex/PHPRedisMutexTest.php index 7e2ed56e..3fd31a19 100644 --- a/tests/mutex/PHPRedisMutexTest.php +++ b/tests/mutex/PHPRedisMutexTest.php @@ -3,7 +3,6 @@ namespace malkusch\lock\mutex; use Redis; -use RedisException; /** * Tests for PHPRedisMutex. @@ -93,6 +92,10 @@ public function testSyncronizedWorks($serialization) public function dpSerializationModes() { + if (!class_exists(Redis::class)) { + return []; + } + $serializers = [ [Redis::SERIALIZER_NONE], [Redis::SERIALIZER_PHP],