.github/workflows/ci.yml 0000644 00000006615 14135605015 0011267 0 ustar 00 name: Continuous Integration on: - push - pull_request jobs: tests: strategy: matrix: include: - operating-system: 'ubuntu-latest' php-version: '7.1' composer-require-checker-version: none - operating-system: 'ubuntu-latest' php-version: '7.2' composer-require-checker-version: 2.1.0 - operating-system: 'ubuntu-latest' php-version: '7.3' composer-require-checker-version: 2.1.0 - operating-system: 'ubuntu-latest' php-version: '7.4' composer-require-checker-version: 3.3.0 - operating-system: 'ubuntu-latest' php-version: '8.0' composer-flags: '--ignore-platform-req=php' composer-require-checker-version: 3.3.0 - operating-system: 'windows-latest' php-version: '7.4' composer-require-checker-version: 3.3.0 job-description: 'on Windows' - operating-system: 'macos-latest' php-version: '7.4' composer-require-checker-version: 3.3.0 job-description: 'on macOS' name: PHP ${{ matrix.php-version }} ${{ matrix.job-description }} runs-on: ${{ matrix.operating-system }} steps: - name: Set git to use LF run: | git config --global core.autocrlf false git config --global core.eol lf - name: Checkout code uses: actions/checkout@v2 - name: Setup PHP uses: shivammathur/setup-php@v2 with: php-version: ${{ matrix.php-version }} - name: Get Composer cache directory id: composer-cache run: echo "::set-output name=dir::$(composer config cache-dir)" - name: Cache dependencies uses: actions/cache@v2 with: path: ${{ steps.composer-cache.outputs.dir }} key: composer-${{ runner.os }}-${{ matrix.php-version }}-${{ hashFiles('**/composer.*') }}-${{ matrix.composer-flags }} restore-keys: | composer-${{ runner.os }}-${{ matrix.php-version }}-${{ hashFiles('**/composer.*') }}- composer-${{ runner.os }}-${{ matrix.php-version }}- composer-${{ runner.os }}- composer- - name: Install dependencies uses: nick-invision/retry@v2 with: timeout_minutes: 5 max_attempts: 5 retry_wait_seconds: 30 command: | composer update 
--optimize-autoloader --no-interaction --no-progress ${{ matrix.composer-flags }} composer info -D - name: Run tests run: vendor/bin/phpunit ${{ matrix.phpunit-flags }} - name: Run style fixer run: vendor/bin/php-cs-fixer --diff --dry-run -v fix env: PHP_CS_FIXER_IGNORE_ENV: 1 - name: Install composer-require-checker run: php -r 'file_put_contents("composer-require-checker.phar", file_get_contents("https://github.com/maglnet/ComposerRequireChecker/releases/download/${{ matrix.composer-require-checker-version }}/composer-require-checker.phar"));' if: runner.os != 'Windows' && matrix.composer-require-checker-version != 'none' - name: Run composer-require-checker run: php composer-require-checker.phar check composer.json --config-file $PWD/composer-require-check.json if: runner.os != 'Windows' && matrix.composer-require-checker-version != 'none' .gitmodules 0000644 00000000140 14135605015 0006714 0 ustar 00 [submodule "docs/.shared"] path = docs/.shared url = https://github.com/amphp/amphp.github.io .php_cs.dist 0000644 00000000414 14135605015 0006762 0 ustar 00 getFinder() ->in(__DIR__ . '/src') ->in(__DIR__ . '/test'); $cacheDir = getenv('TRAVIS') ? getenv('HOME') . '/.php-cs-fixer' : __DIR__; $config->setCacheFile($cacheDir . '/.php_cs.cache'); return $config; LICENSE 0000644 00000002065 14135605015 0005554 0 ustar 00 The MIT License (MIT) Copyright (c) 2015-2021 amphp Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. 
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. README.md 0000644 00000003074 14135605015 0006027 0 ustar 00
Synchronization primitives for PHP applications and libraries using Amp.
## Installation This package can be installed as a [Composer](https://getcomposer.org/) dependency. ```bash composer require amphp/sync ``` ## Documentation Documentation can be found on [amphp.org/sync](https://amphp.org/sync/) as well as in the [`./docs`](./docs) directory. ## Versioning `amphp/sync` follows the [semver](http://semver.org/) semantic versioning specification like all other `amphp` packages. ## Security If you discover any security related issues, please email [`me@kelunik.com`](mailto:me@kelunik.com) instead of using the issue tracker. ## License The MIT License (MIT). Please see [`LICENSE`](./LICENSE) for more information. composer-require-check.json 0000644 00000001373 14135605015 0012017 0 ustar 00 { "symbol-whitelist": [ "null", "true", "false", "static", "self", "parent", "array", "string", "int", "float", "bool", "iterable", "callable", "void", "object", "MSG_EAGAIN", "MSG_ENOMSG", "msg_get_queue", "MSG_IPC_NOWAIT", "msg_queue_exists", "msg_receive", "msg_remove_queue", "msg_send", "msg_set_queue", "msg_stat_queue", "Threaded", "transform", "SysvMessageQueue" ], "php-core-extensions": [ "Core", "date", "pcre", "Phar", "Reflection", "SPL", "standard" ] } composer.json 0000755 00000002065 14135605015 0007274 0 ustar 00 { "name": "amphp/sync", "description": "Mutex, Semaphore, and other synchronization tools for Amp.", "keywords": [ "asynchronous", "async", "mutex", "semaphore", "synchronization" ], "homepage": "https://github.com/amphp/sync", "license": "MIT", "authors": [ { "name": "Aaron Piotrowski", "email": "aaron@trowski.com" }, { "name": "Stephen Coakley", "email": "me@stephencoakley.com" } ], "require": { "php": ">=7.1", "amphp/amp": "^2.2" }, "require-dev": { "phpunit/phpunit": "^9 || ^8 || ^7", "amphp/phpunit-util": "^1.1", "amphp/php-cs-fixer-config": "dev-master" }, "autoload": { "psr-4": { "Amp\\Sync\\": "src" }, "files": [ "src/functions.php", "src/ConcurrentIterator/functions.php" ] }, "autoload-dev": { "psr-4": { 
"Amp\\Sync\\Test\\": "test" } } } docs/Gemfile 0000644 00000000171 14135605015 0006766 0 ustar 00 source "https://rubygems.org" gem "github-pages" gem "kramdown" gem "jekyll-github-metadata" gem "jekyll-relative-links" docs/_config.yml 0000644 00000000710 14135605015 0007621 0 ustar 00 kramdown: input: GFM toc_levels: 2..3 baseurl: "/sync" layouts_dir: ".shared/layout" includes_dir: ".shared/includes" exclude: ["Gemfile", "Gemfile.lock", "README.md", "vendor"] safe: true repository: amphp/sync gems: - "jekyll-github-metadata" - "jekyll-relative-links" defaults: - scope: path: "" type: "pages" values: layout: "docs" shared_asset_path: "/sync/asset" navigation: - mutex - semaphore - concurrent-iterator docs/concurrent-iterator.md 0000644 00000011515 14135605015 0012032 0 ustar 00 --- title: Concurrent Iterators permalink: /concurrent-iterator --- As already stated in the [preamble of our documentation](https://amphp.org/amp/), the weak link when managing concurrency is humans; so `amphp/sync` provides abstractions to hide the complexity of concurrent iteration. ## Concurrency Approaches Given you have a list of URLs you want to crawl, let's discuss a few possible approaches. For simplicity, we will assume a `fetch` function already exists, which takes a URL and returns the HTTP status code (which is everything we want to know for these examples). ### Approach 1: Sequential Simple loop using non-blocking I/O, but no concurrency while fetching the individual URLs; starts the second request as soon as the first completed. ```php $urls = [...]; Amp\call(function () use ($urls) { $results = []; foreach ($urls as $url) { // always wait for the promise to resolve before fetching the next one $statusCode = yield fetch($url); $results[$url] = $statusCode; } return $results; }); ``` ### Approach 2: Everything Concurrently Almost the same loop, but awaiting all promises at once; starts all requests immediately. Might not be feasible with too many URLs. 
```php $urls = [...]; Amp\call(function () use ($urls) { $results = []; foreach ($urls as $url) { // note the missing yield, we're adding the promises to the array $statusCodePromise = fetch($url); $results[$url] = $statusCodePromise; } // yielding an array of promises awaits them all at once $results = yield $results; return $results; }); ``` ### Approach 3: Concurrent Chunks Splits the jobs into chunks of ten; all requests within a chunk are made concurrently, but the chunks themselves are processed sequentially, so the timing for each chunk depends on its slowest response; starts the eleventh request only after all of the first ten requests have completed. ```php $urls = [...]; Amp\call(function () use ($urls) { $results = []; foreach (\array_chunk($urls, 10) as $chunk) { $promises = []; foreach ($chunk as $url) { // note the missing yield, we're adding the promises to the array $statusCodePromise = fetch($url); $promises[$url] = $statusCodePromise; } // yielding an array of promises awaits them all at once $results = \array_merge($results, yield $promises); } return $results; }); ``` ### Approach 4: Concurrent Iterator Concurrent iteration, keeping the concurrency at a maximum of ten; starts the eleventh request as soon as any of the first ten requests completes. ```php $urls = [...]; Amp\call(function () use ($urls) { $results = []; yield Amp\Sync\ConcurrentIterator\each( Amp\Iterator\fromIterable($urls), new Amp\Sync\LocalSemaphore(10), function (string $url) use (&$results) { $statusCode = yield fetch($url); $results[$url] = $statusCode; } ); return $results; }); ``` ## Provided APIs ### `Amp\Sync\ConcurrentIterator\each` Calls `$processor` for each item in the iterator while acquiring a lock from `$semaphore` during each operation. The returned `Promise` resolves as soon as the iterator is empty and all operations are completed. Use `LocalSemaphore` if you don't need to synchronize beyond a single process. 
```php function each(Iterator $iterator, Semaphore $semaphore, callable $processor): Promise { // ... } ``` ### `Amp\Sync\ConcurrentIterator\map` Calls `$processor` for each item in the iterator while acquiring a lock from `$semaphore` during each operation. Returns a new `Iterator` instance with the return values of `$processor`. Use `LocalSemaphore` if you don't need to synchronize beyond a single process. ```php function map(Iterator $iterator, Semaphore $semaphore, callable $processor): Iterator { // ... } ``` ### `Amp\Sync\ConcurrentIterator\filter` Calls `$filter` for each item in the iterator while acquiring a lock from `$semaphore` during each operation. Returns a new `Iterator` instance with the original values where `$filter` resolves to `true`. Use `LocalSemaphore` if you don't need to synchronize beyond a single process. ```php function filter(Iterator $iterator, Semaphore $semaphore, callable $filter): Iterator { // ... } ``` ### `Amp\Sync\ConcurrentIterator\transform` Calls `$processor` for each item in the iterator while acquiring a lock from `$semaphore` during each operation. `$processor` receives the current element and an `$emit` callable as arguments. This function can be used to implement additional concurrent iteration functions and is the base for `map`, `filter`, and `each`. Use `LocalSemaphore` if you don't need to synchronize beyond a single process. ```php function transform(Iterator $iterator, Semaphore $semaphore, callable $processor): Iterator { // ... } ``` docs/index.md 0000644 00000000740 14135605015 0007126 0 ustar 00 --- title: Synchronization Tools permalink: / --- This package defines synchronization primitives for PHP applications and libraries using Amp, such as locks, semaphores and concurrency limited iterator operations. ## Installation This package can be installed as a [Composer](https://getcomposer.org/) dependency. 
```bash composer require amphp/sync ``` ## Usage See [`Mutex`](./mutex.md), [`Semaphore`](./semaphore.md), and [concurrent iteration](./concurrent-iterator.md). docs/mutex.md 0000644 00000002534 14135605015 0007164 0 ustar 00 --- title: Mutex permalink: /mutex --- [Mutual exclusion](https://en.wikipedia.org/wiki/Mutual_exclusion) can be achieved using `Amp\Sync\synchronized()` and any `Mutex` implementation, or by manually using the `Mutex` instance to acquire a lock. Locks are acquired using `Mutex::acquire()`, which returns a `Promise` that resolves to an instance of `Lock` once the lock has been successfully acquired. Until the resulting `Lock` object is released, either by calling `Lock::release()` or by being garbage collected, the holder of the lock can run code exclusively, provided that all other parties running the same code also acquire a lock before doing so. ### Examples ```php function writeExclusively(Amp\Sync\Mutex $mutex, string $filePath, string $data) { return Amp\call(function () use ($mutex, $filePath, $data) { /** @var Amp\Sync\Lock $lock */ $lock = yield $mutex->acquire(); $this->fileHandle = yield Amp\File\open($filePath, 'w'); yield $this->fileHandle->write($data); $lock->release(); }); } ``` ```php function writeExclusively(Amp\Sync\Mutex $mutex, string $filePath, string $data) { return Amp\Sync\synchronized($mutex, function () use ($filePath, $data) { $this->fileHandle = yield Amp\File\open($filePath, 'w'); yield $this->fileHandle->write($data); }); } ``` docs/semaphore.md 0000644 00000001573 14135605015 0010007 0 ustar 00 --- title: Semaphore permalink: /semaphore --- [Semaphores](https://en.wikipedia.org/wiki/Semaphore_%28programming%29) are another synchronization primitive in addition to [mutual exclusion](./mutex.md). Instead of providing exclusive access to a single party, they provide access to a limited set of N parties at the same time. This makes them great to control concurrency, e.g. 
limiting an HTTP client to X concurrent requests, so the HTTP server doesn't get overwhelmed. Similar to [`Mutex`](./mutex.md), `Lock` instances can be acquired using `Semaphore::acquire()`. Please refer to the `Mutex` documentation for additional usage documentation, as they're basically equivalent except for the fact that `Mutex` is always a `Semaphore` with a count of exactly one party. In many cases you can use [concurrent iterators](./concurrent-iterator.md) instead of directly using a `Semaphore`. src/Barrier.php 0000644 00000003624 14135605015 0007437 0 ustar 00 arrive(); * $barrier->arrive(); // promise returned from Barrier::await() is now resolved * * yield $barrier->await(); * ``` */ final class Barrier { /** @var int */ private $count; /** @var Deferred */ private $deferred; public function __construct(int $count) { if ($count < 1) { throw new \Error('Count must be positive, got ' . $count); } $this->count = $count; $this->deferred = new Deferred(); } public function getCount(): int { return $this->count; } public function arrive(int $count = 1): void { if ($count < 1) { throw new \Error('Count must be at least 1, got ' . $count); } if ($count > $this->count) { throw new \Error('Count cannot be greater than remaining count: ' . $count . ' > ' . $this->count); } $this->count -= $count; if ($this->count === 0) { $this->deferred->resolve(); } } public function register(int $count = 1): void { if ($count < 1) { throw new \Error('Count must be at least 1, got ' . 
$count); } if ($this->count === 0) { throw new \Error('Can\'t increase count, because the barrier already broke'); } $this->count += $count; } public function await(): Promise { return $this->deferred->promise(); } } src/ConcurrentIterator/functions.php 0000644 00000012636 14135605015 0013720 0 ustar 00 getId()]); $lock->release(); $barrier->arrive(); } }; while (yield $iterator->advance()) { if ($error) { break; } /** @var Lock $lock */ $lock = yield $semaphore->acquire(); if ($gc || isset($locks[$lock->getId()])) { // Throwing here causes a segfault on PHP 7.3 return; // throw new CancelledException; // producer and locks have been GCed } $locks[$lock->getId()] = true; $barrier->register(); asyncCall($processor, $lock, $iterator->getCurrent()); } $barrier->arrive(); // remove dummy item yield $barrier->await(); if ($error) { throw $error; } }); } /** * Concurrently map all iterator values using {@code $processor}. * * The order of the items in the resulting iterator is not guaranteed in any way. * * @param Iterator $iterator Values to map. * @param Semaphore $semaphore Semaphore limiting the concurrency, e.g. {@code LocalSemaphore} * @param callable $processor Processing callable, which is run as coroutine. It should not throw any errors, * otherwise the entire operation is aborted. * * @return Iterator Mapped values. */ function map(Iterator $iterator, Semaphore $semaphore, callable $processor): Iterator { $processor = coroutine($processor); return transform($iterator, $semaphore, static function ($value, callable $emit) use ($processor) { $value = yield $processor($value); yield $emit($value); }); } /** * Concurrently filter all iterator values using {@code $filter}. * * The order of the items in the resulting iterator is not guaranteed in any way. * * @param Iterator $iterator Values to map. * @param Semaphore $semaphore Semaphore limiting the concurrency, e.g. {@code LocalSemaphore} * @param callable $filter Processing callable, which is run as coroutine. 
It should not throw any errors, * otherwise the entire operation is aborted. Must resolve to a boolean, true to keep values in the resulting * iterator. * * @return Iterator Values, where {@code $filter} resolved to {@code true}. */ function filter(Iterator $iterator, Semaphore $semaphore, callable $filter): Iterator { $filter = coroutine($filter); return transform($iterator, $semaphore, static function ($value, callable $emit) use ($filter) { $keep = yield $filter($value); if (!\is_bool($keep)) { throw new \TypeError(__NAMESPACE__ . '\filter\'s callable must resolve to a boolean value, got ' . \gettype($keep)); } if ($keep) { yield $emit($value); } }); } /** * Concurrently invoke a callback on all iterator values using {@code $processor}. * * @param Iterator $iterator Values to act on. * @param Semaphore $semaphore Semaphore limiting the concurrency, e.g. {@code LocalSemaphore} * @param callable $processor Processing callable, which is run as coroutine. It should not throw any errors, * otherwise the entire operation is aborted. * * @return Promise */ function each(Iterator $iterator, Semaphore $semaphore, callable $processor): Promise { $processor = coroutine($processor); $iterator = transform( $iterator, $semaphore, static function ($value, callable $emit) use ($processor) { yield $processor($value); yield $emit(null); } ); // Use Amp\Iterator\discard in the future return call(static function () use ($iterator) { $count = 0; while (yield $iterator->advance()) { $count++; } return $count; }); } src/FileMutex.php 0000644 00000004263 14135605015 0007753 0 ustar 00 fileName = $fileName; } /** * {@inheritdoc} */ public function acquire(): Promise { return new Coroutine($this->doAcquire()); } /** * @coroutine * * @return \Generator */ private function doAcquire(): \Generator { // Try to create the lock file. If the file already exists, someone else // has the lock, so set an asynchronous timer and try again. 
while (($handle = @\fopen($this->fileName, 'x')) === false) { yield new Delayed(self::LATENCY_TIMEOUT); } // Return a lock object that can be used to release the lock on the mutex. $lock = new Lock(0, function (): void { $this->release(); }); \fclose($handle); return $lock; } /** * Releases the lock on the mutex. * * @throws SyncException If the unlock operation failed. */ protected function release() { $success = @\unlink($this->fileName); if (!$success) { throw new SyncException('Failed to unlock the mutex file.'); } } } src/Internal/MutexStorage.php 0000644 00000001503 14135605015 0012246 0 ustar 00 locked) { return true; } $this->locked = true; return false; }; while ($this->locked || $this->synchronized($tsl)) { yield new Delayed(self::LATENCY_TIMEOUT); } return new Lock(0, function (): void { $this->locked = false; }); }); } } src/Internal/SemaphoreStorage.php 0000644 00000003505 14135605015 0013073 0 ustar 00 count()) { return null; } return $this->shift(); }; while (!$this->count() || ($id = $this->synchronized($tsl)) === null) { yield new Delayed(self::LATENCY_TIMEOUT); } return new Lock($id, function (Lock $lock): void { $id = $lock->getId(); $this->synchronized(function () use ($id) { $this[] = $id; }); }); }); } } src/KeyedMutex.php 0000644 00000001373 14135605015 0010134 0 ustar 00 Resolves with a lock object with an ID of 0. May fail with a SyncException * if an error occurs when attempting to obtain the lock (e.g. a shared memory segment closed). */ public function acquire(string $key): Promise; } src/KeyedSemaphore.php 0000644 00000001460 14135605015 0010752 0 ustar 00 Resolves with an integer keyed lock object. Identifiers returned by the * locks should be 0-indexed. Releasing an identifier MUST make that same identifier available. May fail with * a SyncException if an error occurs when attempting to obtain the lock (e.g. a shared memory segment closed). 
*/ public function acquire(string $key): Promise; } src/LocalKeyedMutex.php 0000644 00000001534 14135605015 0011106 0 ustar 00 mutex[$key])) { $this->mutex[$key] = new LocalMutex; $this->locks[$key] = 0; } return call(function () use ($key) { $this->locks[$key]++; /** @var Lock $lock */ $lock = yield $this->mutex[$key]->acquire(); return new Lock(0, function () use ($lock, $key) { if (--$this->locks[$key] === 0) { unset($this->mutex[$key], $this->locks[$key]); } $lock->release(); }); }); } } src/LocalKeyedSemaphore.php 0000644 00000002036 14135605015 0011725 0 ustar 00 maxLocks = $maxLocks; } public function acquire(string $key): Promise { if (!isset($this->semaphore[$key])) { $this->semaphore[$key] = new LocalSemaphore($this->maxLocks); $this->locks[$key] = 0; } return call(function () use ($key) { $this->locks[$key]++; /** @var Lock $lock */ $lock = yield $this->semaphore[$key]->acquire(); return new Lock(0, function () use ($lock, $key) { if (--$this->locks[$key] === 0) { unset($this->semaphore[$key], $this->locks[$key]); } $lock->release(); }); }); } } src/LocalMutex.php 0000644 00000001613 14135605015 0010122 0 ustar 00 locked) { $this->locked = true; return new Success(new Lock(0, \Closure::fromCallable([$this, 'release']))); } $this->queue[] = $deferred = new Deferred; return $deferred->promise(); } private function release(): void { if (!empty($this->queue)) { $deferred = \array_shift($this->queue); $deferred->resolve(new Lock(0, \Closure::fromCallable([$this, 'release']))); return; } $this->locked = false; } } src/LocalSemaphore.php 0000644 00000002223 14135605015 0010741 0 ustar 00 locks = \range(0, $maxLocks - 1); } /** {@inheritdoc} */ public function acquire(): Promise { if (!empty($this->locks)) { return new Success(new Lock(\array_shift($this->locks), \Closure::fromCallable([$this, 'release']))); } $this->queue[] = $deferred = new Deferred; return $deferred->promise(); } private function release(Lock $lock): void { $id = $lock->getId(); if 
(!empty($this->queue)) { $deferred = \array_shift($this->queue); $deferred->resolve(new Lock($id, \Closure::fromCallable([$this, 'release']))); return; } $this->locks[] = $id; } } src/Lock.php 0000644 00000003324 14135605015 0006736 0 ustar 00 id = $id; $this->releaser = $releaser; } /** * Checks if the lock has already been released. * * @return bool True if the lock has already been released, otherwise false. */ public function isReleased(): bool { return !$this->releaser; } /** * @return int Lock identifier. */ public function getId(): int { return $this->id; } /** * Releases the lock. No-op if the lock has already been released. */ public function release() { if (!$this->releaser) { return; } // Invoke the releaser function given to us by the synchronization source // to release the lock. $releaser = $this->releaser; $this->releaser = null; $releaser($this); } /** * Releases the lock when there are no more references to it. */ public function __destruct() { if (!$this->isReleased()) { $this->release(); } } } src/Mutex.php 0000644 00000001256 14135605015 0007152 0 ustar 00 Resolves with a lock object with an ID of 0. May fail with a SyncException * if an error occurs when attempting to obtain the lock (e.g. a shared memory segment closed). */ public function acquire(): Promise; } src/PosixSemaphore.php 0000644 00000015452 14135605015 0011021 0 ustar 00 init($maxLocks, $permissions); return $semaphore; } /** * @param string $id The unique name of the semaphore to use. * * @return \Amp\Sync\PosixSemaphore */ public static function use(string $id): self { $semaphore = new self($id); $semaphore->open(); return $semaphore; } /** * @param string $id * * @throws \Error If the sysvmsg extension is not loaded. */ private function __construct(string $id) { if (!\extension_loaded("sysvmsg")) { throw new \Error(__CLASS__ . " requires the sysvmsg extension."); } $this->id = $id; $this->key = self::makeKey($this->id); } /** * Private method to prevent cloning. 
*/ private function __clone() { } /** * Prevent serialization. */ public function __sleep() { throw new \Error('A semaphore cannot be serialized!'); } public function getId(): string { return $this->id; } private function open(): void { if (!\msg_queue_exists($this->key)) { throw new SyncException('No semaphore with that ID found'); } $this->queue = \msg_get_queue($this->key); if (!$this->queue) { throw new SyncException('Failed to open the semaphore.'); } } /** * @param int $maxLocks The maximum number of locks that can be acquired from the semaphore. * @param int $permissions Permissions to access the semaphore. * * @throws SyncException If the semaphore could not be created due to an internal error. */ private function init(int $maxLocks, int $permissions): void { if (\msg_queue_exists($this->key)) { throw new SyncException('A semaphore with that ID already exists'); } $this->queue = \msg_get_queue($this->key, $permissions); if (!$this->queue) { throw new SyncException('Failed to create the semaphore.'); } $this->initializer = \getmypid(); // Fill the semaphore with locks. while (--$maxLocks >= 0) { $this->release($maxLocks); } } /** * Gets the access permissions of the semaphore. * * @return int A permissions mode. */ public function getPermissions(): int { $stat = \msg_stat_queue($this->queue); return $stat['msg_perm.mode']; } /** * Sets the access permissions of the semaphore. * * The current user must have access to the semaphore in order to change the permissions. * * @param int $mode A permissions mode to set. * * @throws SyncException If the operation failed. */ public function setPermissions(int $mode) { if (!\msg_set_queue($this->queue, ['msg_perm.mode' => $mode])) { throw new SyncException('Failed to change the semaphore permissions.'); } } public function acquire(): Promise { return new Coroutine($this->doAcquire()); } /** * {@inheritdoc} */ private function doAcquire(): \Generator { do { // Attempt to acquire a lock from the semaphore. 
if (@\msg_receive($this->queue, 0, $type, 1, $id, false, \MSG_IPC_NOWAIT, $errno)) { // A free lock was found, so resolve with a lock object that can // be used to release the lock. return new Lock(\unpack("C", $id)[1], function (Lock $lock) { $this->release($lock->getId()); }); } // Check for unusual errors. if ($errno !== \MSG_ENOMSG) { throw new SyncException(\sprintf('Failed to acquire a lock; errno: %d', $errno)); } } while (yield new Delayed(self::LATENCY_TIMEOUT, true)); } /** * Removes the semaphore if it still exists. * * @throws SyncException If the operation failed. */ public function __destruct() { if ($this->initializer === 0 || $this->initializer !== \getmypid()) { return; } if (\PHP_VERSION_ID < 80000 && (!\is_resource($this->queue) || !\msg_queue_exists($this->key))) { return; } if (\PHP_VERSION_ID >= 80000 && (!$this->queue instanceof \SysvMessageQueue || !\msg_queue_exists($this->key))) { return; } \msg_remove_queue($this->queue); } /** * Releases a lock from the semaphore. * * @param int $id Lock identifier. * * @throws SyncException If the operation failed. */ protected function release(int $id) { if (!$this->queue) { return; // Queue already destroyed. } // Call send in non-blocking mode. If the call fails because the queue // is full, then the number of locks configured is too large. if (!@\msg_send($this->queue, 1, \pack("C", $id), false, false, $errno)) { if ($errno === \MSG_EAGAIN) { throw new SyncException('The semaphore size is larger than the system allows.'); } throw new SyncException('Failed to release the lock.'); } } private static function makeKey(string $id): int { return \abs(\unpack("l", \md5($id, true))[1]); } } src/PrefixedKeyedMutex.php 0000644 00000000716 14135605015 0011623 0 ustar 00 mutex = $mutex; $this->prefix = $prefix; } public function acquire(string $key): Promise { return $this->mutex->acquire($this->prefix . 
$key); } } src/PrefixedKeyedSemaphore.php 0000644 00000000762 14135605015 0012445 0 ustar 00 semaphore = $semaphore; $this->prefix = $prefix; } public function acquire(string $key): Promise { return $this->semaphore->acquire($this->prefix . $key); } } src/Semaphore.php 0000644 00000001350 14135605015 0007766 0 ustar 00 Resolves with an integer keyed lock object. Identifiers returned by the * locks should be 0-indexed. Releasing an identifier MUST make that same identifier available. May fail with * a SyncException if an error occurs when attempting to obtain the lock (e.g. a shared memory segment closed). */ public function acquire(): Promise; } src/SemaphoreMutex.php 0000644 00000001424 14135605015 0011013 0 ustar 00 semaphore = $semaphore; } /** {@inheritdoc} */ public function acquire(): Promise { return call(function (): \Generator { /** @var \Amp\Sync\Lock $lock */ $lock = yield $this->semaphore->acquire(); if ($lock->getId() !== 0) { $lock->release(); throw new \Error("Cannot use a semaphore with more than a single lock"); } return $lock; }); } } src/StaticKeyMutex.php 0000644 00000000644 14135605015 0010773 0 ustar 00 mutex = $mutex; $this->key = $key; } public function acquire(): Promise { return $this->mutex->acquire($this->key); } } src/SyncException.php 0000644 00000000107 14135605015 0010635 0 ustar 00 mutex = new Internal\MutexStorage; } /** * {@inheritdoc} */ public function acquire(): Promise { return $this->mutex->acquire(); } } src/ThreadedSemaphore.php 0000644 00000002255 14135605015 0011434 0 ustar 00 semaphore = new Internal\SemaphoreStorage($locks); } /** * {@inheritdoc} */ public function acquire(): Promise { return $this->semaphore->acquire(); } } src/functions.php 0000644 00000001572 14135605015 0010061 0 ustar 00 acquire(); try { return yield call($callback, ...$args); } finally { $lock->release(); } }); } docs/.shared 0000777 00000000000 14135605015 0006745 5 ustar 00