Skip to content

Commit

Permalink
Merge pull request #18 from php-lock/mysql-lock
Browse files Browse the repository at this point in the history
Add locking backend for MySQL GET_LOCK() function
  • Loading branch information
Willem Stuursma-Ruwen authored May 21, 2018
2 parents 741eb27 + 941e058 commit ba72c91
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 21 deletions.
75 changes: 75 additions & 0 deletions classes/mutex/MySQLMutex.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
<?php

namespace malkusch\lock\mutex;

use malkusch\lock\exception\LockAcquireException;
use malkusch\lock\exception\TimeoutException;

class MySQLMutex extends LockMutex
{
/**
* @var \PDO
*/
private $pdo;

/**
* @var string
*/
private $name;
/**
* @var int
*/
private $timeout;

public function __construct(\PDO $PDO, $name, $timeout = 0)
{
$this->pdo = $PDO;

if (\strlen($name) > 64) {
throw new \InvalidArgumentException("The maximum length of the lock name is 64 characters.");
}

$this->name = $name;
$this->timeout = $timeout;
}

/**
* @throws LockAcquireException
*/
public function lock()
{
$statement = $this->pdo->prepare("SELECT GET_LOCK(?,?)");

$statement->execute([
$this->name,
$this->timeout,
]);

$statement->setFetchMode(\PDO::FETCH_NUM);
$row = $statement->fetch();

if ($row[0] == 1) {
/*
* Returns 1 if the lock was obtained successfully.
*/
return;
}

if ($row[0] === null) {
/*
* NULL if an error occurred (such as running out of memory or the thread was killed with mysqladmin kill).
*/
throw new LockAcquireException("An error occurred while acquiring the lock");
}

throw new TimeoutException("Timeout when acquiring lock.");
}

public function unlock()
{
$statement = $this->pdo->prepare("DO RELEASE_LOCK(?)");
$statement->execute([
$this->name
]);
}
}
6 changes: 3 additions & 3 deletions classes/util/Loop.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public function end()
}

/**
* Repeats executing a code until it was succesful.
* Repeats executing a code until it was successful.
*
* The code has to be designed in a way that it can be repeated without any
* side effects. When execution was successful it should notify that event
Expand Down Expand Up @@ -84,7 +84,7 @@ public function execute(callable $code)
/*
* Calculate max time remaining, don't sleep any longer than that.
*/
$usecRemaining = intval(($timeout - microtime(true)) * 1e6);
$usecRemaining = \intval(($timeout - microtime(true)) * 1e6);

if ($usecRemaining <= 0) {
/*
Expand All @@ -93,7 +93,7 @@ public function execute(callable $code)
throw new TimeoutException("Timeout of $this->timeout seconds exceeded.");
}

$usleep = min($usecRemaining, \random_int($min, $max));
$usleep = \min($usecRemaining, \random_int($min, $max));

usleep($usleep);
}
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
},
"require-dev": {
"ext-memcached": "*",
"ext-redis": "^2.2.4|^3.0|^4.0",
"ext-redis": "*",
"ext-pcntl": "*",
"ext-pdo_mysql": "*",
"ext-pdo_sqlite": "*",
"kriswallsmith/spork": "^0.3",
Expand Down
2 changes: 1 addition & 1 deletion tests/mutex/MemcachedMutexTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public function testFailAcquireLock()
{
$mutex = new MemcachedMutex("testFailAcquireLock", $this->memcached, 1);

$this->memcached->add(MemcachedMutex::PREFIX."testFailAcquireLock", true, 2);
$this->memcached->add(MemcachedMutex::PREFIX."testFailAcquireLock", "xxx", 999);

$mutex->synchronized(function () {
$this->fail("execution is not expected");
Expand Down
36 changes: 25 additions & 11 deletions tests/mutex/MutexConcurrencyTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@
*/
class MutexConcurrencyTest extends \PHPUnit_Framework_TestCase
{

/**
* @var \PDO The pdo instance.
*/
private $pdo;

/**
* Gets a PDO instance.
*
Expand All @@ -57,9 +56,13 @@ private function getPDO($dsn, $user)
private function fork($concurrency, callable $code)
{
$manager = new ProcessManager();
$manager->setDebug(true);

for ($i = 0; $i < $concurrency; $i++) {
$manager->fork($code);
}

$manager->check();
}

/**
Expand Down Expand Up @@ -99,21 +102,22 @@ public function provideTestHighContention()
{
$cases = array_map(function (array $mutexFactory) {
$file = tmpfile();
fputs($file, pack("i", 0));
fflush($file);
fwrite($file, pack("i", 0));

return [
function ($increment) use ($file) {
fseek($file, 0);
rewind($file);
flock($file, LOCK_EX);
$data = fread($file, 4);
$counter = unpack("i", $data)[1];

$counter += $increment;

fseek($file, 0);
rewind($file);
fwrite($file, pack("i", $counter));
fflush($file);


flock($file, LOCK_UN);

return $counter;
},
$mutexFactory[0]
Expand All @@ -122,15 +126,16 @@ function ($increment) use ($file) {

$addPDO = function ($dsn, $user, $vendor) use (&$cases) {
$pdo = $this->getPDO($dsn, $user);
$pdo->beginTransaction();


$options = ["mysql" => "engine=InnoDB"];
$option = isset($options[$vendor]) ? $options[$vendor] : "";
$pdo->exec("CREATE TABLE IF NOT EXISTS counter(id INT PRIMARY KEY, counter INT) $option");


$pdo->beginTransaction();
$pdo->exec("DELETE FROM counter");
$pdo->exec("INSERT INTO counter VALUES (1, 0)");
$pdo->commit();

$this->pdo = null;

$cases[$vendor] = [
Expand Down Expand Up @@ -259,6 +264,15 @@ function ($uri) {
return new PHPRedisMutex($apis, "test", $timeout);
}];
}

if (getenv("MYSQL_DSN")) {
$cases["MySQLMutex"] = [function ($timeout = 3) {
$pdo = new \PDO(getenv("MYSQL_DSN"), getenv("MYSQL_USER"));
$pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION);

return new MySQLMutex($pdo, "test", $timeout);
}];
}

return $cases;
}
Expand Down
18 changes: 14 additions & 4 deletions tests/mutex/MutexTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
*/
class MutexTest extends \PHPUnit_Framework_TestCase
{

const TIMEOUT = 3;
const TIMEOUT = 4;

/**
* Provides Mutex factories.
Expand Down Expand Up @@ -109,6 +108,15 @@ function ($uri) {
}];
}

if (getenv("MYSQL_DSN")) {
$cases["MySQLMutex"] = [function () {
$pdo = new \PDO(getenv("MYSQL_DSN"), getenv("MYSQL_USER"));
$pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION);

return new MySQLMutex($pdo, "test", self::TIMEOUT);
}];
}

return $cases;
}

Expand Down Expand Up @@ -150,11 +158,12 @@ public function testRelease(callable $mutexFactory)
* @param callable $mutexFactory The Mutex factory.
* @test
* @dataProvider provideMutexFactories
* @requires PHP 7.0
*/
public function testLiveness(callable $mutexFactory)
{
$manager = new ProcessManager();
$manager->setDebug(true);

$manager->fork(function () use ($mutexFactory) {
$mutex = call_user_func($mutexFactory);
$mutex->synchronized(function () {
Expand All @@ -168,6 +177,8 @@ public function testLiveness(callable $mutexFactory)
$mutex = call_user_func($mutexFactory);
$mutex->synchronized(function () {
});

$manager->check();
}

/**
Expand All @@ -177,7 +188,6 @@ public function testLiveness(callable $mutexFactory)
* @test
* @dataProvider provideMutexFactories
* @expectedException \DomainException
* @requires PHP 5.6
*/
public function testSynchronizedPassesExceptionThrough(callable $mutexFactory)
{
Expand Down
5 changes: 4 additions & 1 deletion tests/mutex/PHPRedisMutexTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
namespace malkusch\lock\mutex;

use Redis;
use RedisException;

/**
* Tests for PHPRedisMutex.
Expand Down Expand Up @@ -93,6 +92,10 @@ public function testSyncronizedWorks($serialization)

public function dpSerializationModes()
{
if (!class_exists(Redis::class)) {
return [];
}

$serializers = [
[Redis::SERIALIZER_NONE],
[Redis::SERIALIZER_PHP],
Expand Down

0 comments on commit ba72c91

Please sign in to comment.