.gitmodules 0000644 00000000140 13755316600 0006722 0 ustar 00 [submodule "docs/.shared"]
path = docs/.shared
url = https://github.com/amphp/amphp.github.io
.php_cs.dist 0000644 00000002563 13755316600 0006777 0 ustar 00 setRiskyAllowed(true)
->setRules([
"@PSR1" => true,
"@PSR2" => true,
"braces" => [
"allow_single_line_closure" => true,
"position_after_functions_and_oop_constructs" => "same",
],
"array_syntax" => ["syntax" => "short"],
"cast_spaces" => true,
"combine_consecutive_unsets" => true,
"function_to_constant" => true,
"no_multiline_whitespace_before_semicolons" => true,
"no_unused_imports" => true,
"no_useless_else" => true,
"no_useless_return" => true,
"no_whitespace_before_comma_in_array" => true,
"no_whitespace_in_blank_line" => true,
"non_printable_character" => true,
"normalize_index_brace" => true,
"ordered_imports" => true,
"php_unit_construct" => true,
"php_unit_dedicate_assert" => true,
"php_unit_fqcn_annotation" => true,
"phpdoc_summary" => true,
"phpdoc_types" => true,
"psr4" => true,
"return_type_declaration" => ["space_before" => "none"],
"short_scalar_cast" => true,
"single_blank_line_before_namespace" => true,
])
->setFinder(
PhpCsFixer\Finder::create()
->in(__DIR__ . "/examples")
->in(__DIR__ . "/lib")
->in(__DIR__ . "/test")
);
.valgrindrc 0000644 00000000116 13755316600 0006704 0 ustar 00 --error-limit=no
--trace-children=yes
--track-fds=yes
--undef-value-errors=no
LICENSE 0000644 00000002065 13755316600 0005562 0 ustar 00 The MIT License (MIT)
Copyright (c) 2015-2017 amphp
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Makefile 0000644 00000001517 13755316600 0006216 0 ustar 00 PHP_BIN := php
COMPOSER_BIN := composer

# Directory deleted by `clean-coverage`.
COVERAGE = coverage
# Directories scanned for PHP sources.
SRCS = lib test

# $(call find_php_files,DIR): every *.php file below DIR.
find_php_files = $(shell find $(1) -type f -name "*.php")
# Every PHP file under $(SRCS). Deliberately recursive (=) so that `find`
# only runs when the variable is actually expanded.
src = $(foreach d,$(SRCS),$(call find_php_files,$(d)))

# Default goal: install dependencies, run the unit tests, run the code-style fixer.
.PHONY: test
test: setup phpunit code-style

# Remove everything the other targets generate.
.PHONY: clean
clean: clean-coverage clean-vendor

# Honours $(COVERAGE) instead of a hard-coded path. The quotes keep an empty
# value from degenerating into a bare `rm -r`.
.PHONY: clean-coverage
clean-coverage:
	test ! -e "$(COVERAGE)" || rm -r "$(COVERAGE)"

.PHONY: clean-vendor
clean-vendor:
	test ! -e vendor || rm -r vendor

# Make sure Composer dependencies are installed.
.PHONY: setup
setup: vendor/autoload.php

.PHONY: deps-update
deps-update:
	$(COMPOSER_BIN) update

.PHONY: phpunit
phpunit: setup
	$(PHP_BIN) vendor/bin/phpunit

# Runs `php-cs-fixer fix`, which rewrites files in place; --diff -v prints what changed.
.PHONY: code-style
code-style: setup
	PHP_CS_FIXER_IGNORE_ENV=1 $(PHP_BIN) vendor/bin/php-cs-fixer --diff -v fix

# `touch` bumps the target's mtime even when Composer leaves the file untouched,
# so the rule is not re-run on every invocation.
composer.lock: composer.json
	$(COMPOSER_BIN) install
	touch $@

vendor/autoload.php: composer.lock
	$(COMPOSER_BIN) install
	touch $@
README.md 0000644 00000006142 13755316600 0006034 0 ustar 00
True parallel processing using multiple processes or native threads for concurrent PHP code execution, without blocking, no extensions required.
`amphp/parallel` is a component for [Amp](https://amphp.org) that provides native threading, multiprocessing, process synchronization, shared memory, and task workers for concurrently executing PHP code. Like other Amp components, this library uses [Coroutines](http://amphp.org/amp/coroutines/) built from [Promises](http://amphp.org/amp/promises/) and [Generators](http://www.php.net/manual/en/language.generators.overview.php) to make writing asynchronous code more like writing synchronous code.
To be as flexible as possible, this library comes with a collection of non-blocking concurrency tools that can be used independently as needed, as well as an "opinionated" worker API that allows you to assign units of work to a pool of worker threads or processes.
## Installation
This package can be installed as a [Composer](https://getcomposer.org/) dependency.
```bash
composer require amphp/parallel
```
## Requirements
- PHP 7.0+ (no extensions required)
## Documentation
Documentation can be found on [amphp.org/parallel](https://amphp.org/parallel/) as well as in the [`./docs`](./docs) directory.
## Versioning
`amphp/parallel` follows the [semver](http://semver.org/) semantic versioning specification like all other `amphp` packages.
## Security
If you discover any security related issues, please email [`me@kelunik.com`](mailto:me@kelunik.com) instead of using the issue tracker.
## License
The MIT License (MIT). Please see [`LICENSE`](./LICENSE) for more information.
## Development and Contributing
Want to hack on the source? A [Vagrant](http://vagrantup.com) box is provided with the repository to give a common development environment for running concurrent threads and processes, and comes with a bunch of handy tools and scripts for testing and experimentation.
Starting up and logging into the virtual machine is as simple as
vagrant up && vagrant ssh
Once inside the VM, you can install PHP extensions with [Pickle](https://github.com/FriendsOfPHP/pickle), switch versions with `newphp VERSION`, and test for memory leaks with [Valgrind](http://valgrind.org).
Vagrantfile 0000644 00000000635 13755316600 0006743 0 ustar 00 Vagrant.configure(2) do |config|
config.vm.box = "rasmus/php7dev"
config.vm.provision "shell", inline: <<-SHELL
newphp 7 zts
# Install pthreads from master
git clone https://github.com/krakjoe/pthreads
cd pthreads
git checkout master
phpize
./configure
make
sudo make install
echo 'extension=pthreads.so' >> `php -i | grep php-cli.ini | awk '{print $5}'`
SHELL
end
appveyor.yml 0000644 00000002455 13755316600 0007150 0 ustar 00 build: false
shallow_clone: false
platform:
- x86
- x64
clone_folder: c:\projects\amphp
cache:
- c:\tools\php72 -> appveyor.yml
init:
- SET PATH=C:\Program Files\OpenSSL;c:\tools\php72;%PATH%
- SET COMPOSER_NO_INTERACTION=1
- SET PHP=1
- SET ANSICON=121x90 (121x90)
install:
- IF EXIST c:\tools\php72 (SET PHP=0)
- IF %PHP%==1 sc config wuauserv start= auto
- IF %PHP%==1 net start wuauserv
- IF %PHP%==1 cinst -y OpenSSL.Light
- IF %PHP%==1 cinst -y php
- cd c:\tools\php72
- IF %PHP%==1 copy php.ini-production php.ini /Y
- IF %PHP%==1 echo date.timezone="UTC" >> php.ini
- IF %PHP%==1 echo extension_dir=ext >> php.ini
- IF %PHP%==1 echo extension=php_openssl.dll >> php.ini
- IF %PHP%==1 echo extension=php_mbstring.dll >> php.ini
- IF %PHP%==1 echo extension=php_fileinfo.dll >> php.ini
- cd c:\projects\amphp
- appveyor DownloadFile https://getcomposer.org/composer.phar
- php composer.phar install --prefer-dist --no-progress
test_script:
- cd c:\projects\amphp
- phpdbg -qrr vendor/phpunit/phpunit/phpunit --colors=always --coverage-text --coverage-clover build/logs/clover.xml
# Disable for now, because it can't be combined and files can't be shown on coveralls.io
# https://github.com/php-coveralls/php-coveralls/issues/234
# - vendor/bin/coveralls -v
composer.json 0000755 00000002503 13755316600 0007277 0 ustar 00 {
"name": "amphp/parallel",
"description": "Parallel processing component for Amp.",
"keywords": [
"asynchronous",
"async",
"concurrent",
"multi-threading",
"multi-processing"
],
"homepage": "https://github.com/amphp/parallel",
"license": "MIT",
"authors": [
{
"name": "Aaron Piotrowski",
"email": "aaron@trowski.com"
},
{
"name": "Stephen Coakley",
"email": "me@stephencoakley.com"
}
],
"require": {
"amphp/amp": "^2",
"amphp/byte-stream": "^1.2",
"amphp/parser": "^1",
"amphp/process": "^0.2 || ^0.3",
"amphp/sync": "^1.0.1"
},
"require-dev": {
"phpunit/phpunit": "^6",
"amphp/phpunit-util": "^1",
"friendsofphp/php-cs-fixer": "^2.3"
},
"suggest": {
"ext-pthreads": "Required for thread contexts"
},
"autoload": {
"psr-4": {
"Amp\\Parallel\\": "lib"
},
"files": [
"lib/Worker/functions.php"
]
},
"autoload-dev": {
"psr-4": {
"Amp\\Parallel\\Example\\": "examples",
"Amp\\Parallel\\Test\\": "test"
}
},
"config": {
"platform": {
"php": "7.0.0"
}
}
}
docs/Gemfile 0000644 00000000171 13755316600 0006774 0 ustar 00 source "https://rubygems.org"
gem "github-pages"
gem "kramdown"
gem "jekyll-github-metadata"
gem "jekyll-relative-links"
docs/Gemfile.lock 0000644 00000014570 13755316600 0007733 0 ustar 00 GEM
remote: https://rubygems.org/
specs:
activesupport (4.2.9)
i18n (~> 0.7)
minitest (~> 5.1)
thread_safe (~> 0.3, >= 0.3.4)
tzinfo (~> 1.1)
addressable (2.5.2)
public_suffix (>= 2.0.2, < 4.0)
coffee-script (2.4.1)
coffee-script-source
execjs
coffee-script-source (1.11.1)
colorator (1.1.0)
commonmarker (0.17.6)
ruby-enum (~> 0.5)
concurrent-ruby (1.0.5)
ethon (0.11.0)
ffi (>= 1.3.0)
execjs (2.7.0)
faraday (0.13.1)
multipart-post (>= 1.2, < 3)
ffi (1.9.18)
forwardable-extended (2.6.0)
gemoji (3.0.0)
github-pages (172)
activesupport (= 4.2.9)
github-pages-health-check (= 1.3.5)
jekyll (= 3.6.2)
jekyll-avatar (= 0.5.0)
jekyll-coffeescript (= 1.0.2)
jekyll-commonmark-ghpages (= 0.1.3)
jekyll-default-layout (= 0.1.4)
jekyll-feed (= 0.9.2)
jekyll-gist (= 1.4.1)
jekyll-github-metadata (= 2.9.3)
jekyll-mentions (= 1.2.0)
jekyll-optional-front-matter (= 0.3.0)
jekyll-paginate (= 1.1.0)
jekyll-readme-index (= 0.2.0)
jekyll-redirect-from (= 0.12.1)
jekyll-relative-links (= 0.5.2)
jekyll-remote-theme (= 0.2.3)
jekyll-sass-converter (= 1.5.0)
jekyll-seo-tag (= 2.3.0)
jekyll-sitemap (= 1.1.1)
jekyll-swiss (= 0.4.0)
jekyll-theme-architect (= 0.1.0)
jekyll-theme-cayman (= 0.1.0)
jekyll-theme-dinky (= 0.1.0)
jekyll-theme-hacker (= 0.1.0)
jekyll-theme-leap-day (= 0.1.0)
jekyll-theme-merlot (= 0.1.0)
jekyll-theme-midnight (= 0.1.0)
jekyll-theme-minimal (= 0.1.0)
jekyll-theme-modernist (= 0.1.0)
jekyll-theme-primer (= 0.5.2)
jekyll-theme-slate (= 0.1.0)
jekyll-theme-tactile (= 0.1.0)
jekyll-theme-time-machine (= 0.1.0)
jekyll-titles-from-headings (= 0.5.0)
jemoji (= 0.8.1)
kramdown (= 1.14.0)
liquid (= 4.0.0)
listen (= 3.0.6)
mercenary (~> 0.3)
minima (= 2.1.1)
rouge (= 2.2.1)
terminal-table (~> 1.4)
github-pages-health-check (1.3.5)
addressable (~> 2.3)
net-dns (~> 0.8)
octokit (~> 4.0)
public_suffix (~> 2.0)
typhoeus (~> 0.7)
html-pipeline (2.7.1)
activesupport (>= 2)
nokogiri (>= 1.4)
i18n (0.9.1)
concurrent-ruby (~> 1.0)
jekyll (3.6.2)
addressable (~> 2.4)
colorator (~> 1.0)
jekyll-sass-converter (~> 1.0)
jekyll-watch (~> 1.1)
kramdown (~> 1.14)
liquid (~> 4.0)
mercenary (~> 0.3.3)
pathutil (~> 0.9)
rouge (>= 1.7, < 3)
safe_yaml (~> 1.0)
jekyll-avatar (0.5.0)
jekyll (~> 3.0)
jekyll-coffeescript (1.0.2)
coffee-script (~> 2.2)
coffee-script-source (~> 1.11.1)
jekyll-commonmark (1.1.0)
commonmarker (~> 0.14)
jekyll (>= 3.0, < 4.0)
jekyll-commonmark-ghpages (0.1.3)
commonmarker (~> 0.17.6)
jekyll-commonmark (~> 1)
rouge (~> 2)
jekyll-default-layout (0.1.4)
jekyll (~> 3.0)
jekyll-feed (0.9.2)
jekyll (~> 3.3)
jekyll-gist (1.4.1)
octokit (~> 4.2)
jekyll-github-metadata (2.9.3)
jekyll (~> 3.1)
octokit (~> 4.0, != 4.4.0)
jekyll-mentions (1.2.0)
activesupport (~> 4.0)
html-pipeline (~> 2.3)
jekyll (~> 3.0)
jekyll-optional-front-matter (0.3.0)
jekyll (~> 3.0)
jekyll-paginate (1.1.0)
jekyll-readme-index (0.2.0)
jekyll (~> 3.0)
jekyll-redirect-from (0.12.1)
jekyll (~> 3.3)
jekyll-relative-links (0.5.2)
jekyll (~> 3.3)
jekyll-remote-theme (0.2.3)
jekyll (~> 3.5)
rubyzip (>= 1.2.1, < 3.0)
typhoeus (>= 0.7, < 2.0)
jekyll-sass-converter (1.5.0)
sass (~> 3.4)
jekyll-seo-tag (2.3.0)
jekyll (~> 3.3)
jekyll-sitemap (1.1.1)
jekyll (~> 3.3)
jekyll-swiss (0.4.0)
jekyll-theme-architect (0.1.0)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-theme-cayman (0.1.0)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-theme-dinky (0.1.0)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-theme-hacker (0.1.0)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-theme-leap-day (0.1.0)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-theme-merlot (0.1.0)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-theme-midnight (0.1.0)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-theme-minimal (0.1.0)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-theme-modernist (0.1.0)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-theme-primer (0.5.2)
jekyll (~> 3.5)
jekyll-github-metadata (~> 2.9)
jekyll-seo-tag (~> 2.2)
jekyll-theme-slate (0.1.0)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-theme-tactile (0.1.0)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-theme-time-machine (0.1.0)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-titles-from-headings (0.5.0)
jekyll (~> 3.3)
jekyll-watch (1.5.0)
listen (~> 3.0, < 3.1)
jemoji (0.8.1)
activesupport (~> 4.0, >= 4.2.9)
gemoji (~> 3.0)
html-pipeline (~> 2.2)
jekyll (>= 3.0)
kramdown (1.14.0)
liquid (4.0.0)
listen (3.0.6)
rb-fsevent (>= 0.9.3)
rb-inotify (>= 0.9.7)
mercenary (0.3.6)
mini_portile2 (2.3.0)
minima (2.1.1)
jekyll (~> 3.3)
minitest (5.10.3)
multipart-post (2.0.0)
net-dns (0.8.0)
nokogiri (1.8.1)
mini_portile2 (~> 2.3.0)
octokit (4.7.0)
sawyer (~> 0.8.0, >= 0.5.3)
pathutil (0.16.0)
forwardable-extended (~> 2.6)
public_suffix (2.0.5)
rb-fsevent (0.10.2)
rb-inotify (0.9.10)
ffi (>= 0.5.0, < 2)
rouge (2.2.1)
ruby-enum (0.7.1)
i18n
rubyzip (1.2.1)
safe_yaml (1.0.4)
sass (3.5.3)
sass-listen (~> 4.0.0)
sass-listen (4.0.0)
rb-fsevent (~> 0.9, >= 0.9.4)
rb-inotify (~> 0.9, >= 0.9.7)
sawyer (0.8.1)
addressable (>= 2.3.5, < 2.6)
faraday (~> 0.8, < 1.0)
terminal-table (1.8.0)
unicode-display_width (~> 1.1, >= 1.1.1)
thread_safe (0.3.6)
typhoeus (0.8.0)
ethon (>= 0.8.0)
tzinfo (1.2.4)
thread_safe (~> 0.1)
unicode-display_width (1.3.0)
PLATFORMS
ruby
DEPENDENCIES
github-pages
jekyll-github-metadata
jekyll-relative-links
kramdown
BUNDLED WITH
1.15.1
docs/_config.yml 0000644 00000000705 13755316600 0007633 0 ustar 00 kramdown:
input: GFM
toc_levels: 2..3
baseurl: "/parallel"
layouts_dir: ".shared/layout"
includes_dir: ".shared/includes"
exclude: ["Gemfile", "Gemfile.lock", "README.md", "vendor"]
safe: true
repository: amphp/parallel
gems:
- "jekyll-github-metadata"
- "jekyll-relative-links"
defaults:
- scope:
path: ""
type: "pages"
values:
layout: "docs"
shared_asset_path: "/parallel/asset"
navigation:
- context
- sync
- worker
docs/index.md 0000644 00000001370 13755316600 0007134 0 ustar 00 ---
title: Parallel
permalink: /
---
**True parallel processing using multiple processes or native threads for concurrent PHP code execution, *without* blocking, no extensions required.**
## Installation
This package can be installed as a [Composer](https://getcomposer.org/) dependency.
```bash
composer require amphp/parallel
```
## Usage
This package provides native threading, multiprocessing, process synchronization, shared memory, and task workers for concurrently executing PHP code. To be as flexible as possible, this package includes a collection of non-blocking concurrency tools that can be used independently as needed, as well as an "opinionated" worker API that allows you to assign units of work to a pool of worker threads or processes.
lib/Context/Context.php 0000644 00000001257 13755316600 0011106 0 ustar 00 Resolves with the returned from the context.
*
* @throws \Amp\Parallel\Context\ContextException If the context dies unexpectedly.
* @throws \Amp\Parallel\Sync\PanicError If the context throws an uncaught exception.
*/
public function join(): Promise;
}
lib/Context/ContextException.php 0000644 00000000126 13755316600 0012757 0 ustar 00 function = $function;
$this->args = $args;
$this->socket = $socket;
}
/**
 * Thread entry point: loads the Composer autoloader, then runs an event loop that executes the
 * configured function until it finishes or the kill flag is set.
 *
 * @throws \Error If no autoload.php can be found in either candidate location.
 *
 * @codeCoverageIgnore Only executed in thread.
 */
public function run() {
/* First thing we need to do is re-initialize the class autoloader. If
 * we don't do this first, any object of a class that was loaded after
 * the thread started will just be garbage data and unserializable
 * values (like resources) will be lost. This happens even with
 * thread-safe objects.
 */
// Protect scope by using an unbound closure (protects static access as well).
(static function () {
// Candidate autoloader locations, tried in order:
//  1. <three directories above this one>/vendor/autoload.php
//  2. <five directories above this one>/autoload.php
// NOTE(review): presumably (1) is the package checked out on its own and (2) the package
// installed as a dependency inside another project's vendor directory - confirm.
$paths = [
\dirname(__DIR__, 3) . \DIRECTORY_SEPARATOR . "vendor" . \DIRECTORY_SEPARATOR . "autoload.php",
\dirname(__DIR__, 5) . \DIRECTORY_SEPARATOR . "autoload.php",
];
foreach ($paths as $path) {
if (\file_exists($path)) {
$autoloadPath = $path;
break;
}
}
// $autoloadPath is only assigned inside the loop, so isset() doubles as the "found" flag.
if (!isset($autoloadPath)) {
throw new \Error("Could not locate autoload.php");
}
require $autoloadPath;
})->bindTo(null, null)();
// At this point, the thread environment has been prepared so begin using the thread.
if ($this->killed) {
return; // Thread killed while requiring autoloader, simply exit.
}
Loop::run(function () {
// Poll the kill flag periodically and stop the loop once it has been set.
$watcher = Loop::repeat(self::KILL_CHECK_FREQUENCY, function () {
if ($this->killed) {
Loop::stop();
}
});
// Unreferenced so that this polling watcher alone does not keep the loop alive.
Loop::unreference($watcher);
try {
// The same socket resource is used for both reading and writing.
$channel = new ChannelledSocket($this->socket, $this->socket);
yield from $this->execute($channel);
} catch (\Throwable $exception) {
return; // Parent context exited or destroyed thread, no need to continue.
} finally {
Loop::cancel($watcher);
}
});
}
/**
 * Raises the kill flag that the event loop started by run() polls in order to stop itself.
 *
 * @return bool Always true.
 */
public function kill() {
    $this->killed = true;

    return true;
}
/**
 * Invokes the configured function with the channel and the stored arguments, wraps the outcome in
 * an exit result and attempts to send that result over the channel.
 *
 * @param \Amp\Parallel\Sync\Channel $channel
 *
 * @return \Generator
 *
 * @codeCoverageIgnore Only executed in thread.
 */
private function execute(Channel $channel): \Generator {
try {
$result = new ExitSuccess(yield call($this->function, $channel, ...$this->args));
} catch (\Throwable $exception) {
// Anything thrown by the function is reported as a failure result rather than propagated.
$result = new ExitFailure($exception);
}
if ($this->killed) {
return; // Parent is not listening for a result.
}
// Attempt to return the result.
try {
try {
yield $channel->send($result);
} catch (SerializationException $exception) {
// Serializing the result failed. Send the reason why.
yield $channel->send(new ExitFailure($exception));
}
} catch (ChannelException $exception) {
// The result was not sendable! The parent context must have died or killed the context.
}
}
}
lib/Context/Internal/process-runner.php 0000644 00000004536 13755316600 0014226 0 ustar 00 send($result);
} catch (Sync\SerializationException $exception) {
// Serializing the result failed. Send the reason why.
yield $channel->send(new Sync\ExitFailure($exception));
}
} catch (\Throwable $exception) {
exit(1); // Parent context died, simply exit.
}
});
lib/Context/Process.php 0000644 00000017311 13755316600 0011076 0 ustar 00 start();
return $process;
}
/**
 * Builds the shell command that runs the given script through the process runner, without
 * starting it.
 *
 * @param string|array $script Path to PHP script or array with first element as path and following elements options
 *     to the PHP script (e.g.: ['bin/worker', 'Option1Value', 'Option2Value']).
 * @param string|null $cwd Working directory.
 * @param mixed[] $env Array of environment variables.
 * @param string|null $binary Path to PHP binary. Null will attempt to automatically locate the binary.
 *
 * @throws \Error If the PHP binary path given cannot be found or is not executable.
 */
public function __construct($script, string $cwd = null, array $env = [], string $binary = null) {
// ini settings handed to the child PHP binary as -d switches (see formatOptions()).
$options = [
"html_errors" => "0",
"display_errors" => "0",
"log_errors" => "1",
];
// Every element (script path and each argument) is shell-escaped individually.
if (\is_array($script)) {
$script = \implode(" ", \array_map("escapeshellarg", $script));
} else {
$script = \escapeshellarg($script);
}
if ($binary === null) {
if (\PHP_SAPI === "cli") {
$binary = \PHP_BINARY;
} else {
// Outside the CLI SAPI: reuse a previously located binary, otherwise search for one.
$binary = self::$binaryPath ?? self::locateBinary();
}
} elseif (!\is_executable($binary)) {
throw new \Error(\sprintf("The PHP binary path '%s' was not found or is not executable", $binary));
}
// Write process runner to external file if inside a PHAR,
// because PHP can't open files inside a PHAR directly except for the stub.
if (\strpos(self::SCRIPT_PATH, "phar://") === 0) {
if (self::$pharScriptPath) {
$scriptPath = self::$pharScriptPath;
} else {
$contents = \file_get_contents(self::SCRIPT_PATH);
// The copy lives in the temp directory, so __DIR__ is replaced with the original directory as a literal.
$contents = \str_replace("__DIR__", \var_export(\dirname(self::SCRIPT_PATH), true), $contents);
self::$pharScriptPath = $scriptPath = \tempnam(\sys_get_temp_dir(), "amp-process-runner-");
\file_put_contents($scriptPath, $contents);
// Remove the temporary copy at shutdown; errors from unlink are suppressed.
\register_shutdown_function(static function () {
@\unlink(self::$pharScriptPath);
});
}
} else {
$scriptPath = self::SCRIPT_PATH;
}
// Final command: <php binary> <-d options> <runner script> <user script and arguments>
$command = \implode(" ", [
\escapeshellarg($binary),
$this->formatOptions($options),
\escapeshellarg($scriptPath),
$script,
]);
$this->process = new BaseProcess($command, $cwd, $env);
}
/**
 * Looks for an executable PHP binary in every PATH directory and then in PHP_BINDIR, caching the
 * first hit in self::$binaryPath.
 *
 * @return string Path to the PHP executable.
 *
 * @throws \Error If none of the candidate locations holds an executable binary.
 */
private static function locateBinary(): string {
    if (\strncasecmp(\PHP_OS, "WIN", 3) === 0) {
        $binaryName = "php.exe";
    } else {
        $binaryName = "php";
    }

    // array_filter() drops empty PATH entries.
    $directories = \array_filter(\explode(\PATH_SEPARATOR, \getenv("PATH")));
    $directories[] = \PHP_BINDIR;

    foreach (\array_unique($directories) as $directory) {
        $candidate = $directory . \DIRECTORY_SEPARATOR . $binaryName;

        if (!\is_executable($candidate)) {
            continue;
        }

        self::$binaryPath = $candidate;

        return $candidate;
    }

    throw new \Error("Could not locate PHP executable binary");
}
/**
 * Renders ini settings as space-separated "-dname=value" command line switches.
 *
 * @param array $options Map of ini setting name to value.
 *
 * @return string
 */
private function formatOptions(array $options) {
    $switches = \array_map(
        static function ($name, $value) {
            return \sprintf("-d%s=%s", $name, $value);
        },
        \array_keys($options),
        $options
    );

    return \implode(" ", $switches);
}
/**
 * Private method to prevent cloning: because it is private, `clone` cannot be used on a process
 * context from outside this class.
 */
private function __clone() {
}
/**
 * {@inheritdoc}
 *
 * Starts the child process, creates the channel from the child's stdout (read side) and stdin
 * (write side), and forwards the child's stderr to this process's STDERR.
 */
public function start() {
$this->process->start();
$this->channel = new ChannelledStream($this->process->getStdout(), $this->process->getStdin());
/** @var ByteStream\ResourceInputStream $childStderr */
$childStderr = $this->process->getStderr();
// Unreferenced so that the stderr forwarding alone does not keep the event loop running.
$childStderr->unreference();
// Pipe the child's stderr into our own STDERR in the background.
asyncCall(static function () use ($childStderr) {
$stderr = new ByteStream\ResourceOutputStream(\STDERR);
yield ByteStream\pipe($childStderr, $stderr);
});
}
/**
 * {@inheritdoc}
 *
 * Delegates to the wrapped process object.
 */
public function isRunning(): bool {
    $running = $this->process->isRunning();

    return $running;
}
/**
 * {@inheritdoc}
 *
 * A channel failure is reported as a ContextException. Receiving an exit result here is a
 * synchronization error, because exit results are only expected by join().
 */
public function receive(): Promise {
    if ($this->channel === null) {
        throw new StatusError("The process has not been started");
    }

    return call(function () {
        try {
            $received = yield $this->channel->receive();
        } catch (ChannelException $channelException) {
            throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $channelException);
        }

        if (!$received instanceof ExitResult) {
            return $received;
        }

        // The process finished while the caller was still expecting regular data.
        $result = $received->getResult();
        $type = \is_object($result) ? \get_class($result) : \gettype($result);

        throw new SynchronizationError(\sprintf(
            'Process unexpectedly exited with result of type: %s',
            $type
        ));
    });
}
/**
 * {@inheritdoc}
 *
 * Both checks run synchronously, before any promise is created. The "not started" check comes
 * first, so it wins when both conditions apply.
 */
public function send($data): Promise {
if ($this->channel === null) {
throw new StatusError("The process has not been started");
}
// Exit results are rejected here; join() is the method that expects to receive one.
if ($data instanceof ExitResult) {
throw new \Error("Cannot send exit result objects");
}
return $this->channel->send($data);
}
/**
 * {@inheritdoc}
 *
 * Waits for the exit result on the channel, then for the process itself to end. The returned
 * promise resolves with the value carried by the exit result.
 */
public function join(): Promise {
if ($this->channel === null) {
throw new StatusError("The process has not been started");
}
return call(function () {
try {
$data = yield $this->channel->receive();
if (!$data instanceof ExitResult) {
throw new SynchronizationError("Did not receive an exit result from process");
}
} catch (\Throwable $exception) {
// Any failure while waiting for the exit result (including the one thrown just above)
// kills the process before the exception is re-thrown.
$this->kill();
throw $exception;
}
$code = yield $this->process->join();
if ($code !== 0) {
throw new ContextException(\sprintf("Process exited with code %d", $code));
}
// The exit code is checked first; only then is the result unwrapped.
return $data->getResult();
});
}
/**
 * {@inheritdoc}
 *
 * Delegates to the wrapped process object.
 */
public function kill() {
$this->process->kill();
}
}
lib/Context/StatusError.php 0000644 00000000115 13755316600 0011747 0 ustar 00 start();
return $thread;
}
/**
 * Creates a new thread context. The thread is not started here; call start() to spawn it.
 *
 * @param callable $function The callable to invoke in the thread. First argument is an instance of
 *     \Amp\Parallel\Sync\Channel.
 * @param mixed ...$args Additional arguments to pass to the given callable.
 *
 * @throws \Error Thrown if the pthreads extension is not available.
 */
public function __construct(callable $function, ...$args) {
if (!self::supported()) {
throw new \Error("The pthreads extension is required to create threads.");
}
$this->function = $function;
$this->args = $args;
}
/**
 * Resets the copy to its not-yet-started state, so it can be started and run independently of the
 * thread it was cloned from.
 */
public function __clone() {
    $this->oid = 0;
    $this->thread = $this->socket = $this->channel = null;
}
/**
 * Kills the thread if it is still running, but only when the destructor runs in the process that
 * started the thread (its PID was recorded by start()).
 *
 * @throws \Amp\Parallel\Context\ContextException
 */
public function __destruct() {
    if (\getmypid() !== $this->oid) {
        return;
    }

    $this->kill();
}
/**
 * Checks if the context is running. The context counts as running for as long as its channel is
 * open.
 *
 * @return bool True if the context is running, otherwise false.
 */
public function isRunning(): bool {
    if ($this->channel === null) {
        return false;
    }

    return true;
}
/**
 * Spawns the thread and begins the thread's execution.
 *
 * Creates a socket pair (one end kept for this context's channel, the other handed to the thread),
 * starts the thread, and installs a disabled watcher that polls for the thread's exit.
 *
 * @throws \Amp\Parallel\Context\StatusError If the thread has already been started.
 * @throws \Amp\Parallel\Context\ContextException If starting the thread was unsuccessful.
 */
public function start() {
if ($this->oid !== 0) {
throw new StatusError('The thread has already been started.');
}
// Record the starting process so __destruct() only kills the thread from that same process.
$this->oid = \getmypid();
// Windows gets an INET socket pair, every other platform a UNIX one.
$sockets = @\stream_socket_pair(
\stripos(\PHP_OS, "win") === 0 ? STREAM_PF_INET : STREAM_PF_UNIX,
STREAM_SOCK_STREAM,
STREAM_IPPROTO_IP
);
if ($sockets === false) {
$message = "Failed to create socket pair";
// The warning was suppressed with @ above; recover its details for the exception message.
if ($error = \error_get_last()) {
$message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
}
throw new ContextException($message);
}
list($channel, $this->socket) = $sockets;
$thread = $this->thread = new Internal\Thread($this->socket, $this->function, $this->args);
if (!$this->thread->start(\PTHREADS_INHERIT_INI)) {
throw new ContextException('Failed to start the thread.');
}
$channel = $this->channel = new ChannelledSocket($channel, $channel);
// Static closure capturing $thread and $channel by value, so the watcher holds no reference to $this.
$this->watcher = Loop::repeat(self::EXIT_CHECK_FREQUENCY, static function ($watcher) use ($thread, $channel) {
if (!$thread->isRunning()) {
// Delay closing to avoid race condition between thread exiting and data becoming available.
Loop::delay(self::EXIT_CHECK_FREQUENCY, [$channel, "close"]);
Loop::cancel($watcher);
}
});
// Kept disabled by default; join(), receive() and send() enable it only while they are waiting.
Loop::disable($this->watcher);
}
/**
 * Immediately kills the context. Does nothing if no thread has been created. Otherwise the channel
 * is always closed, even when killing the thread fails.
 *
 * @throws ContextException If killing the thread was unsuccessful.
 */
public function kill() {
    if ($this->thread === null) {
        return;
    }

    try {
        $stillRunning = $this->thread->isRunning();

        if ($stillRunning && !$this->thread->kill()) {
            throw new ContextException('Could not kill thread.');
        }
    } finally {
        $this->close();
    }
}
/**
 * Closes the channel if it is still open, clears the reference to it and cancels the exit-check
 * watcher.
 */
private function close() {
if ($this->channel !== null) {
$this->channel->close();
}
// A null channel is what isRunning() and the public methods treat as "not running".
$this->channel = null;
Loop::cancel($this->watcher);
}
/**
 * Gets a promise that resolves when the context ends and joins with the
 * parent context.
 *
 * The exit-check watcher is enabled only while waiting for the exit result. The channel is
 * closed once the wait is over, whatever the outcome.
 *
 * @return \Amp\Promise Resolves with the value carried by the thread's exit result.
 *
 * @throws StatusError Thrown if the context has not been started.
 * @throws SynchronizationError Thrown if an exit status object is not received.
 * @throws ContextException If the context stops responding.
 */
public function join(): Promise {
    // Strict comparison, consistent with every other null check in this class.
    if ($this->channel === null || $this->thread === null) {
        throw new StatusError('The thread has not been started or has already finished.');
    }

    return call(function () {
        Loop::enable($this->watcher);

        try {
            $response = yield $this->channel->receive();

            if (!$response instanceof ExitResult) {
                throw new SynchronizationError('Did not receive an exit result from thread.');
            }
        } catch (ChannelException $exception) {
            $this->kill();
            throw new ContextException(
                "The context stopped responding, potentially due to a fatal error or calling exit",
                0,
                $exception
            );
        } catch (\Throwable $exception) {
            // Covers the SynchronizationError thrown above as well as any other failure.
            $this->kill();
            throw $exception;
        } finally {
            Loop::disable($this->watcher);
            $this->close();
        }

        return $response->getResult();
    });
}
/**
 * {@inheritdoc}
 *
 * The exit-check watcher is enabled only while waiting for data. Receiving an exit result here is
 * a synchronization error, because exit results are only expected by join().
 */
public function receive(): Promise {
    if ($this->channel === null) {
        // This is the thread context, so the message names the thread and matches send().
        throw new StatusError('The thread has not been started or has already finished.');
    }

    return call(function () {
        Loop::enable($this->watcher);

        try {
            $data = yield $this->channel->receive();
        } finally {
            Loop::disable($this->watcher);
        }

        if ($data instanceof ExitResult) {
            $data = $data->getResult();
            throw new SynchronizationError(\sprintf(
                'Thread process unexpectedly exited with result of type: %s',
                \is_object($data) ? \get_class($data) : \gettype($data)
            ));
        }

        return $data;
    });
}
/**
 * {@inheritdoc}
 *
 * The exit-check watcher is enabled for the duration of the send and disabled again afterwards,
 * whether or not the send succeeds.
 */
public function send($data): Promise {
    if ($this->channel === null) {
        throw new StatusError('The thread has not been started or has already finished.');
    }

    if ($data instanceof ExitResult) {
        throw new \Error('Cannot send exit result objects.');
    }

    return call(function () use ($data) {
        Loop::enable($this->watcher);

        try {
            return yield $this->channel->send($data);
        } finally {
            Loop::disable($this->watcher);
        }
    });
}
}
lib/Sync/Channel.php 0000644 00000002420 13755316600 0010313 0 ustar 00
*
* @throws \Amp\Parallel\Context\StatusError Thrown if the context has not been started.
* @throws \Amp\Parallel\Sync\SynchronizationError If the context has not been started or the context
* unexpectedly ends.
* @throws \Amp\Parallel\Sync\ChannelException If receiving from the channel fails.
* @throws \Amp\Parallel\Sync\SerializationException If unserializing the data fails.
*/
public function receive(): Promise;
/**
 * Sends the given data over the channel.
 *
 * @param mixed $data
 *
 * @return \Amp\Promise Resolves with the number of bytes sent on the channel.
 *
 * @throws \Amp\Parallel\Context\StatusError Thrown if the context has not been started.
 * @throws \Amp\Parallel\Sync\SynchronizationError If the context has not been started or the context
 *     unexpectedly ends.
 * @throws \Amp\Parallel\Sync\ChannelException If sending on the channel fails.
 * @throws \Error If an ExitResult object is given.
 * @throws \Amp\Parallel\Sync\SerializationException If serializing the data fails.
 */
public function send($data): Promise;
}
lib/Sync/ChannelException.php 0000644 00000000336 13755316600 0012176 0 ustar 00 read = new ResourceInputStream($read),
$this->write = new ResourceOutputStream($write)
);
}
/**
 * Closes the read stream first, then the write stream.
 */
public function close() {
    foreach ([$this->read, $this->write] as $stream) {
        $stream->close();
    }
}
}
lib/Sync/ChannelledStream.php 0000644 00000004237 13755316600 0012164 0 ustar 00 read = $read;
$this->write = $write;
$this->received = new \SplQueue;
$this->parser = new ChannelParser([$this->received, 'push']);
}
/**
 * {@inheritdoc}
 *
 * Encodes the data with the channel parser and writes it to the output stream. Stream failures are
 * reported as ChannelException.
 */
public function send($data): Promise {
    return call(function () use ($data) {
        try {
            $payload = $this->parser->encode($data);
            $written = yield $this->write->write($payload);
        } catch (StreamException $streamException) {
            throw new ChannelException("Sending on the channel failed. Did the context die?", $streamException);
        }

        return $written;
    });
}
/**
 * {@inheritdoc}
 *
 * Returns the oldest queued value. While the queue is empty, chunks are read from the input stream
 * and pushed into the parser, which fills the queue.
 */
public function receive(): Promise {
    return call(function () {
        for (;;) {
            if (!$this->received->isEmpty()) {
                return $this->received->shift();
            }

            try {
                $chunk = yield $this->read->read();
            } catch (StreamException $streamException) {
                throw new ChannelException("Reading from the channel failed. Did the context die?", $streamException);
            }

            // A null chunk means the stream ended before a complete value arrived.
            if ($chunk === null) {
                throw new ChannelException("The channel closed unexpectedly. Did the context die?");
            }

            $this->parser->push($chunk);
        }
    });
}
}
lib/Sync/ExitFailure.php 0000644 00000001603 13755316600 0011166 0 ustar 00 type = \get_class($exception);
$this->message = $exception->getMessage();
$this->code = $exception->getCode();
$this->trace = $exception->getTraceAsString();
}
/**
 * {@inheritdoc}
 *
 * Never returns: always throws a PanicError built from the recorded exception class, message,
 * code and trace.
 */
public function getResult() {
    $summary = \sprintf(
        'Uncaught %s in execution context with message "%s" and code "%s"',
        $this->type,
        $this->message,
        $this->code
    );

    throw new PanicError($this->type, $summary, $this->trace);
}
}
lib/Sync/ExitResult.php 0000644 00000000447 13755316600 0011062 0 ustar 00 result = $result;
}
/**
 * {@inheritdoc}
 *
 * Returns the stored result value unchanged.
 */
public function getResult() {
return $this->result;
}
}
lib/Sync/Internal/ParcelStorage.php 0000644 00000000725 13755316600 0013260 0 ustar 00 value = $value;
}
/**
 * Returns the value currently held by this storage object.
 *
 * @return mixed
 */
public function get() {
return $this->value;
}
/**
 * Replaces the value held by this storage object.
 *
 * @param mixed $value
 */
public function set($value) {
$this->value = $value;
}
}
lib/Sync/PanicError.php 0000644 00000001743 13755316600 0011016 0 ustar 00 name = $name;
$this->trace = $trace;
}
/**
 * Returns the class name of the uncaught exception.
 *
 * @return string The value held in the $name property.
 */
public function getName(): string {
return $this->name;
}
/**
 * Gets the stack trace at the point the panic occurred.
 *
 * @return string The value held in the $trace property.
 */
public function getPanicTrace(): string {
return $this->trace;
}
}
lib/Sync/Parcel.php 0000644 00000002700 13755316600 0010152 0 ustar 00 Resolves with the return value of $callback or fails if $callback
* throws an exception.
*/
public function synchronized(callable $callback): Promise;
/**
* @return \Amp\Promise A promise for the value inside the parcel.
*/
public function unwrap(): Promise;
}
lib/Sync/SerializationException.php 0000644 00000000137 13755316600 0013442 0 ustar 00 init($value, $size, $permissions);
return $parcel;
}
/**
 * Attaches to a parcel identified by $id by constructing a handle and calling open() on it.
 *
 * @param string $id Identifier of the parcel to attach to.
 *
 * @return \Amp\Parallel\Sync\SharedMemoryParcel
 */
public static function use(string $id): self {
    $instance = new self($id);
    $instance->open();

    return $instance;
}
/**
 * Initializes the handle for the given identifier. No semaphore or shared memory segment is
 * opened here; that is done by init() or open().
 *
 * @param string $id Identifier of the parcel; the shared memory key is derived from it via makeKey().
 *
 * @throws \Error If the shmop extension is not loaded.
 */
private function __construct(string $id) {
if (!\extension_loaded("shmop")) {
throw new \Error(__CLASS__ . " requires the shmop extension.");
}
$this->id = $id;
$this->key = self::makeKey($this->id);
}
/**
 * Creates the semaphore and the shared memory segment for a new parcel and stores the initial value.
 *
 * @param mixed $value The initial value; it is written with wrap().
 * @param int $size Number of bytes to reserve for the value; the header offset is added on top.
 * @param int $permissions Permissions used to create the segment and recorded in its header.
 */
private function init($value, int $size = 8192, int $permissions = 0600) {
$this->semaphore = PosixSemaphore::create($this->id, 1);
// Remember the creating process: __destruct() only frees the segment from this process.
$this->initializer = \getmypid();
$this->memOpen($this->key, 'n', $permissions, $size + self::MEM_DATA_OFFSET);
// Write an allocated header with size 0 first; wrap() then rewrites it with the real size.
$this->setHeader(self::STATE_ALLOCATED, 0, $permissions);
$this->wrap($value);
}
/**
 * Attaches to an existing parcel: obtains the semaphore for this ID and opens the
 * shared memory segment for the current key in 'w' mode.
 */
private function open() {
$this->semaphore = PosixSemaphore::use($this->id);
$this->memOpen($this->key, 'w', 0, 0);
}
/**
 * Checks if the object has been freed.
 *
 * A handle without an open memory segment counts as freed. Otherwise any pending segment
 * moves are followed first and the state stored in the segment header is inspected.
 *
 * @return bool True if the object is freed, otherwise false.
 */
private function isFreed(): bool {
    // No open segment: this handle has already released its reference.
    if ($this->handle === null) {
        return true;
    }

    $this->handleMovedMemory();

    return $this->getHeader()['state'] === static::STATE_FREED;
}
/**
 * {@inheritdoc}
 *
 * Reads the segment header, validates it, and unserializes the stored value.
 *
 * @return \Amp\Promise Resolves with the unserialized value. Fails with a SharedMemoryException if this
 *     handle has been freed or if the header is not in the allocated state with a positive size.
 */
public function unwrap(): Promise {
    if ($this->isFreed()) {
        return new Failure(new SharedMemoryException('The object has already been freed.'));
    }

    $header = $this->getHeader();

    // Make sure the header is in a valid state and format. The failure has to be returned;
    // otherwise the corrupt segment would still be read and unserialized below.
    if ($header['state'] !== self::STATE_ALLOCATED || $header['size'] <= 0) {
        return new Failure(new SharedMemoryException('Shared object memory is corrupt.'));
    }

    // Read the actual value data from memory and unserialize it.
    $data = $this->memGet(self::MEM_DATA_OFFSET, $header['size']);

    return new Success(\unserialize($data));
}
/**
 * Serializes the value and writes it to shared memory.
 *
 * If the value requires more memory to store than currently allocated, a
 * new shared memory segment will be allocated with a larger size to store
 * the value in. The previous memory segment will be cleaned up and marked
 * for deletion. Other processes and threads will be notified of the new
 * memory segment on the next read attempt. Once all running processes and
 * threads disconnect from the old segment, it will be freed by the OS.
 *
 * @param mixed $value The value to store.
 *
 * @throws SharedMemoryException If the object has already been freed or a memory operation fails.
 */
protected function wrap($value) {
    if ($this->isFreed()) {
        throw new SharedMemoryException('The object has already been freed.');
    }

    $serialized = \serialize($value);
    $size = \strlen($serialized);
    $header = $this->getHeader();

    /* If we run out of space, we need to allocate a new shared memory
       segment that is larger than the current one. To coordinate with other
       processes, we will leave a message in the old segment that the segment
       has moved and along with the new key. The old segment will be discarded
       automatically after all other processes notice the change and close
       the old handle.
    */
    if (\shmop_size($this->handle) < $size + self::MEM_DATA_OFFSET) {
        $this->key = $this->key < 0xffffffff ? $this->key + 1 : \mt_rand(0x10, 0xfffffffe);
        $this->setHeader(self::STATE_MOVED, $this->key, 0);
        $this->memDelete();
        \shmop_close($this->handle);

        // Twice the payload size leaves room to grow, but the segment also has to hold the
        // header. Without the offset, a payload smaller than the header would not fit.
        $this->memOpen($this->key, 'n', $header['permissions'], $size * 2 + self::MEM_DATA_OFFSET);
    }

    // Rewrite the header and the serialized value to memory.
    $this->setHeader(self::STATE_ALLOCATED, $size, $header['permissions']);
    $this->memSet(self::MEM_DATA_OFFSET, $serialized);
}
/**
 * {@inheritdoc}
 *
 * Acquires the semaphore, passes the current value to $callback and, when the callback
 * produces a non-null result, stores that result. The lock is released in every case.
 */
public function synchronized(callable $callback): Promise {
    return call(function () use ($callback) {
        /** @var \Amp\Sync\Lock $lock */
        $lock = yield $this->semaphore->acquire();

        try {
            $current = yield $this->unwrap();
            $result = yield call($callback, $current);

            if (null !== $result) {
                $this->wrap($result);
            }

            return $result;
        } finally {
            $lock->release();
        }
    });
}
/**
 * Frees the shared object from memory.
 *
 * Only the process that initialized the parcel performs the cleanup. The header is marked
 * as freed, the segment is requested to be deleted, and the local handle is closed.
 */
public function __destruct() {
    $ownsParcel = $this->initializer !== 0 && $this->initializer === \getmypid();

    if (!$ownsParcel || $this->isFreed()) {
        return;
    }

    // Invalidate the memory block by setting its state to FREED.
    $this->setHeader(static::STATE_FREED, 0, 0);

    // Request the block to be deleted, then close our local handle.
    $this->memDelete();
    \shmop_close($this->handle);

    $this->handle = null;
    $this->semaphore = null;
}
/**
 * Private, empty method to prevent cloning from outside the class.
 */
private function __clone() {
}
/**
 * Private, empty method to prevent serialization of the parcel handle.
 */
private function __sleep() {
}
/**
 * Follows segment moves until the handle points at a segment that is not marked as moved.
 *
 * A moved segment stores the key of its successor in the header's size field.
 */
private function handleMovedMemory() {
    for (
        $header = $this->getHeader();
        $header['state'] === self::STATE_MOVED;
        $header = $this->getHeader()
    ) {
        // The memory is stale: drop the old handle and open the segment it points to.
        \shmop_close($this->handle);
        $this->key = $header['size'];
        $this->memOpen($this->key, 'w', 0, 0);
    }
}
/**
 * Reads the header bytes at the start of the current segment and unpacks them.
 *
 * @return array Associative array with the keys "state", "size" and "permissions".
 */
private function getHeader(): array {
    return \unpack(
        'Cstate/Lsize/Spermissions',
        $this->memGet(0, self::MEM_DATA_OFFSET)
    );
}
/**
 * Packs the header fields and writes them at the start of the current segment.
 *
 * @param int $state An object state.
 * @param int $size The size of the stored data, or other value.
 * @param int $permissions The permissions mask on the memory segment.
 */
private function setHeader(int $state, int $size, int $permissions) {
    $this->memSet(0, \pack('CLS', $state, $size, $permissions));
}
/**
 * Opens a shared memory handle and stores it in $this->handle.
 *
 * @param int $key The shared memory key.
 * @param string $mode The mode to open the shared memory in.
 * @param int $permissions Process permissions on the shared memory.
 * @param int $size The size to create the shared memory in bytes.
 *
 * @throws SharedMemoryException If shmop_open() fails.
 */
private function memOpen(int $key, string $mode, int $permissions, int $size) {
    // Warnings are suppressed; failure is reported through the exception below.
    $opened = @\shmop_open($key, $mode, $permissions, $size);
    $this->handle = $opened;

    if (false === $opened) {
        throw new SharedMemoryException('Failed to create shared memory block.');
    }
}
/**
 * Reads binary data from the current shared memory segment.
 *
 * @param int $offset The offset to read from.
 * @param int $size The number of bytes to read.
 *
 * @return string The binary data at the given offset.
 *
 * @throws SharedMemoryException If shmop_read() fails.
 */
private function memGet(int $offset, int $size): string {
    $bytes = \shmop_read($this->handle, $offset, $size);

    if (false !== $bytes) {
        return $bytes;
    }

    throw new SharedMemoryException('Failed to read from shared memory block.');
}
/**
 * Writes binary data to shared memory.
 *
 * @param int $offset The offset to write to.
 * @param string $data The binary data to write.
 *
 * @throws SharedMemoryException If the write fails or fewer bytes than requested were written.
 */
private function memSet(int $offset, string $data) {
    // shmop_write() returns the number of bytes written, which can be less than requested
    // when the data does not fit. A plain truthiness check would accept such a short write.
    $written = \shmop_write($this->handle, $data, $offset);

    if ($written !== \strlen($data)) {
        throw new SharedMemoryException('Failed to write to shared memory block.');
    }
}
/**
 * Requests the current shared memory segment to be deleted.
 *
 * @throws SharedMemoryException If shmop_delete() reports failure.
 */
private function memDelete() {
    if (\shmop_delete($this->handle)) {
        return;
    }

    throw new SharedMemoryException('Failed to discard shared memory block.');
}
/**
 * Derives an integer key from an identifier: the absolute value of the first
 * signed long unpacked from the raw MD5 digest of $id.
 *
 * @param string $id
 *
 * @return int
 */
private static function makeKey(string $id): int {
    $digest = \md5($id, true);
    $unpacked = \unpack("l", $digest);

    return \abs($unpacked[1]);
}
}
lib/Sync/SynchronizationError.php 0000644 00000000123 13755316600 0013154 0 ustar 00 mutex = new ThreadedMutex;
$this->storage = new Internal\ParcelStorage($value);
}
/**
 * {@inheritdoc}
 *
 * @return \Amp\Promise An already-resolved promise holding the value currently in storage.
 */
public function unwrap(): Promise {
return new Success($this->storage->get());
}
/**
 * Acquires the mutex, passes the stored value to $callback and, when the callback
 * produces a non-null result, stores that result. The lock is released in every case.
 *
 * @param callable $callback Invoked with the current value.
 *
 * @return \Amp\Promise Resolves with the callback's result.
 */
public function synchronized(callable $callback): Promise {
    return call(function () use ($callback) {
        /** @var \Amp\Sync\Lock $lock */
        $lock = yield $this->mutex->acquire();

        try {
            $current = $this->storage->get();
            $result = yield call($callback, $current);

            if (null !== $result) {
                $this->storage->set($result);
            }

            return $result;
        } finally {
            $lock->release();
        }
    });
}
}
lib/Worker/AbstractWorker.php 0000644 00000007053 13755316600 0012244 0 ustar 00 isRunning()) {
throw new \Error("The context was already running");
}
$this->context = $context;
}
/**
 * {@inheritdoc}
 *
 * Reports the running state of the underlying context.
 */
public function isRunning(): bool {
return $this->context->isRunning();
}
/**
 * {@inheritdoc}
 *
 * The worker is idle when no pending task promise is set.
 */
public function isIdle(): bool {
return $this->pending === null;
}
/**
 * {@inheritdoc}
 *
 * Starts the context on first use, waits for any previously enqueued task, then sends the
 * job to the context and waits for its result.
 *
 * @throws StatusError If the worker has been shut down.
 *
 * The returned promise fails with a WorkerException if the context is no longer running,
 * returns something other than a task result, or returns a result for a different job.
 */
public function enqueue(Task $task): Promise {
    if ($this->shutdown) {
        throw new StatusError("The worker has been shut down");
    }

    if (!$this->context->isRunning()) {
        $this->context->start();
    }

    $job = new Internal\Job($task);
    $id = $job->getId();

    $promise = $this->pending = call(function () use ($job, $id) {
        // At this point $this->pending still refers to the previously enqueued task (if any).
        if ($this->pending) {
            try {
                yield $this->pending;
            } catch (\Throwable $exception) {
                // Ignore error from prior job.
            }
        }

        if (!$this->context->isRunning()) {
            throw new WorkerException("The worker was shutdown");
        }

        yield $this->context->send($job);

        $result = yield $this->context->receive();

        // cancel() only kills the context. The failure must also be thrown, otherwise execution
        // would continue and call getId() on a value that is not a task result.
        if (!$result instanceof TaskResult) {
            $failure = new WorkerException("Context did not return a task result");
            $this->cancel($failure);
            throw $failure;
        }

        // Likewise, never hand a result belonging to another job back to the caller.
        if ($result->getId() !== $id) {
            $failure = new WorkerException("Task results returned out of order");
            $this->cancel($failure);
            throw $failure;
        }

        return $result->promise();
    });

    $promise->onResolve(function () use ($promise) {
        // Only clear the marker if no later task has replaced it in the meantime.
        if ($this->pending === $promise) {
            $this->pending = null;
        }
    });

    return $promise;
}
/**
 * {@inheritdoc}
 *
 * Marks the worker as shut down. If the context is running, waits for any pending task,
 * sends 0 to the context and resolves with the result of joining it; otherwise resolves with 0.
 *
 * @throws StatusError If shutdown() was already called.
 */
public function shutdown(): Promise {
    if ($this->shutdown) {
        throw new StatusError("The worker is not running");
    }

    $this->shutdown = true;

    if ($this->context->isRunning()) {
        return call(function () {
            if ($this->pending) {
                // If a task is currently running, wait for it to finish.
                yield Promise\any([$this->pending]);
            }

            yield $this->context->send(0);
            $exit = yield $this->context->join();

            return $exit;
        });
    }

    return new Success(0);
}
/**
 * {@inheritdoc}
 *
 * Delegates to cancel() without an exception.
 */
public function kill() {
$this->cancel();
}
/**
 * Kills the context if it is currently running.
 *
 * @TODO Parameter kept for BC, remove in future version.
 *
 * @param \Throwable|null $exception Not used by this implementation; accepted for backward compatibility.
 */
protected function cancel(\Throwable $exception = null) {
if ($this->context->isRunning()) {
$this->context->kill();
}
}
}
lib/Worker/BasicEnvironment.php 0000644 00000010557 13755316600 0012560 0 ustar 00 queue = new \SplPriorityQueue;
$this->timer = Loop::repeat(1000, function () {
$time = \time();
while (!$this->queue->isEmpty()) {
list($key, $expiration) = $this->queue->top();
if (!isset($this->data[$key])) {
// Item removed.
$this->queue->extract();
continue;
}
$struct = $this->data[$key];
if ($struct->expire === 0) {
// Item was set again without a TTL.
$this->queue->extract();
continue;
}
if ($struct->expire !== $expiration) {
// Expiration changed or TTL updated.
$this->queue->extract();
continue;
}
if ($time < $struct->expire) {
// Item at top has not expired, break out of loop.
break;
}
unset($this->data[$key]);
$this->queue->extract();
}
if ($this->queue->isEmpty()) {
Loop::disable($this->timer);
}
});
Loop::disable($this->timer);
Loop::unreference($this->timer);
}
/**
 * Checks whether a value is stored under the given key.
 *
 * @param string $key
 *
 * @return bool True if an entry exists for $key, otherwise false.
 */
public function exists(string $key): bool {
return isset($this->data[$key]);
}
/**
 * Returns the value stored under $key. For an entry that has a TTL, reading it pushes
 * the expiration forward to "now + TTL" when that is later than the current expiration.
 *
 * @param string $key
 *
 * @return mixed|null Returns null if the key does not exist.
 */
public function get(string $key) {
    $entry = $this->data[$key] ?? null;

    if ($entry === null) {
        return null;
    }

    if ($entry->ttl === null) {
        return $entry->data;
    }

    $refreshed = \time() + $entry->ttl;

    if ($refreshed > $entry->expire) {
        $entry->expire = $refreshed;
        // Earlier expirations get a higher priority through the negated timestamp.
        $this->queue->insert([$key, $entry->expire], -$entry->expire);
    }

    return $entry->data;
}
/**
 * Stores a value under $key, optionally with a time-to-live.
 *
 * @param string $key
 * @param mixed $value Using null for the value deletes the key.
 * @param int|null $ttl Number of seconds until data is automatically deleted. Use null for unlimited TTL.
 *
 * @throws \Error If the time-to-live is not a positive integer.
 */
public function set(string $key, $value, int $ttl = null) {
    if (null === $value) {
        $this->delete($key);
        return;
    }

    if (null !== $ttl && $ttl <= 0) {
        throw new \Error("The time-to-live must be a positive integer or null");
    }

    $entry = new class {
        use Struct;
        public $data;
        public $expire = 0;
        public $ttl;
    };

    $entry->data = $value;

    if (null === $ttl) {
        // No TTL: the entry keeps expire = 0 and is not scheduled for removal.
        $this->data[$key] = $entry;
        return;
    }

    $entry->ttl = $ttl;
    $entry->expire = \time() + $ttl;

    // Schedule the expiration and make sure the cleanup timer is active.
    $this->queue->insert([$key, $entry->expire], -$entry->expire);
    Loop::enable($this->timer);

    $this->data[$key] = $entry;
}
/**
 * Removes the entry stored under the given key, if any.
 *
 * @param string $key
 */
public function delete(string $key) {
unset($this->data[$key]);
}
/**
 * Alias of exists().
 *
 * @param string $key
 *
 * @return bool
 */
public function offsetExists($key) {
return $this->exists($key);
}
/**
 * Alias of get().
 *
 * @param string $key
 *
 * @return mixed|null Whatever get() returns for $key.
 */
public function offsetGet($key) {
return $this->get($key);
}
/**
 * Alias of set() with $ttl = null.
 *
 * @param string $key
 * @param mixed $value Forwarded to set() unchanged.
 */
public function offsetSet($key, $value) {
$this->set($key, $value);
}
/**
 * Alias of delete().
 *
 * @param string $key Key of the entry to remove.
 */
public function offsetUnset($key) {
$this->delete($key);
}
/**
 * Removes all values, replaces the expiration queue with an empty one and disables the cleanup timer.
 */
public function clear() {
    $this->queue = new \SplPriorityQueue;
    $this->data = [];

    Loop::disable($this->timer);
}
}
lib/Worker/DefaultPool.php 0000644 00000015677 13755316600 0011540 0 ustar 00 maxSize = $maxSize;
// Use the global factory if none is given.
$this->factory = $factory ?: factory();
$this->workers = new \SplObjectStorage;
$this->idleWorkers = new \SplQueue;
$this->busyQueue = new \SplQueue;
$workers = $this->workers;
$idleWorkers = $this->idleWorkers;
$busyQueue = $this->busyQueue;
$this->push = static function (Worker $worker) use ($workers, $idleWorkers, $busyQueue) {
\assert($workers->contains($worker), "The provided worker was not part of this queue");
if (($workers[$worker] -= 1) === 0) {
// Worker is completely idle, remove from busy queue and add to idle queue.
foreach ($busyQueue as $key => $busy) {
if ($busy === $worker) {
unset($busyQueue[$key]);
break;
}
}
$idleWorkers->push($worker);
}
};
}
/**
 * Checks if the pool is running.
 *
 * @return bool True if the pool is running, otherwise false (set by shutdown() and kill()).
 */
public function isRunning(): bool {
return $this->running;
}
/**
 * Checks if the pool has any idle workers.
 *
 * @return bool True if the pool has at least one idle worker or has no workers at all, otherwise false.
 */
public function isIdle(): bool {
return $this->idleWorkers->count() > 0 || $this->workers->count() === 0;
}
/**
 * {@inheritdoc}
 *
 * @return int The maximum size the pool was configured with.
 */
public function getMaxSize(): int {
return $this->maxSize;
}
/**
 * {@inheritdoc}
 *
 * @return int Number of workers currently attached to the pool.
 */
public function getWorkerCount(): int {
return $this->workers->count();
}
/**
 * {@inheritdoc}
 *
 * @return int Number of workers currently in the idle queue.
 */
public function getIdleWorkerCount(): int {
return $this->idleWorkers->count();
}
/**
 * Enqueues a task to be executed by the worker pool.
 *
 * A worker is pulled from the pool for the task and handed back to the pool once the
 * task's promise resolves.
 *
 * @param Task $task The task to enqueue.
 *
 * @return \Amp\Promise The return value of Task::run().
 *
 * @throws \Amp\Parallel\Context\StatusError If the pool has been shutdown.
 * @throws \Amp\Parallel\Worker\TaskException If the task throws an exception.
 */
public function enqueue(Task $task): Promise {
    $worker = $this->pull();

    $release = function () use ($worker) {
        ($this->push)($worker);
    };

    $promise = $worker->enqueue($task);
    $promise->onResolve($release);

    return $promise;
}
/**
 * Shuts down the pool and all running workers in it.
 *
 * @return \Amp\Promise Array of exit status from all workers that were running.
 *
 * @throws \Amp\Parallel\Context\StatusError If the pool is not running.
 */
public function shutdown(): Promise {
    if (!$this->isRunning()) {
        throw new StatusError("The pool was shutdown");
    }

    $this->running = false;

    $pending = [];

    foreach ($this->workers as $worker) {
        if (!$worker->isRunning()) {
            continue;
        }

        $pending[] = $worker->shutdown();
    }

    return Promise\all($pending);
}
/**
 * Kills all workers in the pool and halts the worker pool.
 *
 * The pool is marked as not running before each attached worker is killed.
 */
public function kill() {
$this->running = false;
foreach ($this->workers as $worker) {
$worker->kill();
}
}
/**
 * Creates a worker through the factory and attaches it to the pool.
 *
 * @return Worker The worker created.
 */
private function createWorker() {
$worker = $this->factory->create();
// The attached data is the worker's counter; pull() increments it and the push callback decrements it.
$this->workers->attach($worker, 0);
return $worker;
}
/**
 * {@inheritdoc}
 *
 * Pulls a worker from the pool and wraps it in a PooledWorker together with the push callback.
 */
public function get(): Worker {
return new Internal\PooledWorker($this->pull(), $this->push);
}
/**
 * Pulls a worker from the pool. The worker should be put back into the pool with push() to be marked as idle.
 *
 * Idle workers are preferred. If there are none, a new worker is created while the pool is below
 * its maximum size; otherwise the worker at the head of the busy queue is reused. Workers found
 * to be no longer running are detached from the pool and the selection is repeated.
 *
 * @return \Amp\Parallel\Worker\Worker
 * @throws \Amp\Parallel\Context\StatusError If the pool is not running.
 */
protected function pull(): Worker {
    if (!$this->isRunning()) {
        throw new StatusError("The pool was shutdown");
    }

    while (true) {
        if (!$this->idleWorkers->isEmpty()) {
            // Shift a worker off the idle queue.
            $worker = $this->idleWorkers->shift();
        } elseif ($this->getWorkerCount() < $this->maxSize) {
            // Max worker count has not been reached, so create another worker.
            $worker = $this->createWorker();
            break;
        } else {
            // All possible workers busy, so shift from head (will be pushed back onto tail below).
            $worker = $this->busyQueue->shift();
        }

        if ($worker->isRunning()) {
            break;
        }

        // The selected worker is no longer running: drop it and pick again.
        $this->workers->detach($worker);
    }

    $this->busyQueue->push($worker);
    $this->workers[$worker] += 1;

    return $worker;
}
/**
 * Pushes the worker back into the queue by invoking the push callback stored in $this->push.
 *
 * @param \Amp\Parallel\Worker\Worker $worker
 *
 * @throws \Error If the worker was not part of this queue.
 */
protected function push(Worker $worker) {
($this->push)($worker); // Kept for BC
}
}
lib/Worker/DefaultWorkerFactory.php 0000644 00000003201 13755316600 0013404 0 ustar 00 className = $envClassName;
}
/**
 * {@inheritdoc}
 *
 * The type of worker created depends on the extensions available. If multi-threading is enabled, a WorkerThread
 * will be created. If threads are not available a WorkerProcess will be created, using the binary named by the
 * AMP_PHP_BINARY environment variable or constant when one of them is set.
 */
public function create(): Worker {
    if (!Thread::supported()) {
        $binary = \getenv("AMP_PHP_BINARY");

        if (!$binary) {
            $binary = \defined("AMP_PHP_BINARY") ? \AMP_PHP_BINARY : null;
        }

        return new WorkerProcess($this->className, [], $binary);
    }

    return new WorkerThread($this->className);
}
}
lib/Worker/Environment.php 0000644 00000001421 13755316600 0011604 0 ustar 00 task = $task;
$this->id = $id++;
}
/**
 * @return string The identifier assigned to this job in the constructor.
 */
public function getId(): string {
return $this->id;
}
/**
 * Returns the task carried by this job.
 *
 * @return Task
 *
 * @throws \Error If the task was unserialized as an instance of __PHP_Incomplete_Class.
 */
public function getTask(): Task {
    if (!$this->task instanceof \__PHP_Incomplete_Class) {
        return $this->task;
    }

    // Classes that cannot be autoloaded will be unserialized as an instance of __PHP_Incomplete_Class.
    $message = \sprintf("Classes implementing %s must be autoloadable by the Composer autoloader", Task::class);

    throw new \Error($message);
}
}
lib/Worker/Internal/PooledWorker.php 0000644 00000002510 13755316600 0013470 0 ustar 00 worker = $worker;
$this->push = $push;
}
/**
 * Automatically pushes the worker back into the queue by invoking the
 * push callback with the wrapped worker when this object is destroyed.
 */
public function __destruct() {
($this->push)($this->worker);
}
/**
 * {@inheritdoc}
 *
 * Delegates to the wrapped worker.
 */
public function isRunning(): bool {
return $this->worker->isRunning();
}
/**
 * {@inheritdoc}
 *
 * Delegates to the wrapped worker.
 */
public function isIdle(): bool {
return $this->worker->isIdle();
}
/**
 * {@inheritdoc}
 *
 * Delegates to the wrapped worker.
 */
public function enqueue(Task $task): Promise {
return $this->worker->enqueue($task);
}
/**
 * {@inheritdoc}
 *
 * Delegates to the wrapped worker.
 */
public function shutdown(): Promise {
return $this->worker->shutdown();
}
/**
 * {@inheritdoc}
 *
 * Delegates to the wrapped worker.
 */
public function kill() {
$this->worker->kill();
}
}
lib/Worker/Internal/TaskFailure.php 0000644 00000003517 13755316600 0013276 0 ustar 00 type = \get_class($exception);
$this->parent = $exception instanceof \Error ? self::PARENT_ERROR : self::PARENT_EXCEPTION;
$this->message = $exception->getMessage();
$this->code = $exception->getCode();
$this->trace = $exception->getTraceAsString();
}
/**
 * Builds a failed promise describing the exception captured in the worker.
 *
 * @return Promise A Failure holding a TaskError when the captured throwable was an \Error,
 *     otherwise a TaskException.
 */
public function promise(): Promise {
    $message = sprintf(
        'Uncaught %s in worker with message "%s" and code "%s"',
        $this->type,
        $this->message,
        $this->code
    );

    if ($this->parent == self::PARENT_ERROR) {
        $exception = new TaskError($this->type, $message, $this->trace);
    } else {
        $exception = new TaskException($this->type, $message, $this->trace);
    }

    return new Failure($exception);
}
}
lib/Worker/Internal/TaskResult.php 0000644 00000001106 13755316600 0013155 0 ustar 00 id = $id;
}
/**
 * Returns the identifier this result was constructed with.
 *
 * @return string Task identifier.
 */
public function getId(): string {
return $this->id;
}
/**
* @return \Amp\Promise Resolved with the task result or failure reason.
*/
abstract public function promise(): Promise;
}
lib/Worker/Internal/TaskSuccess.php 0000644 00000000644 13755316600 0013315 0 ustar 00 result = $result;
}
/**
 * @return Promise An already-resolved promise holding the stored task result.
 */
public function promise(): Promise {
return new Success($this->result);
}
}
lib/Worker/Internal/worker-process.php 0000644 00000001570 13755316600 0014046 0 ustar 00 run();
};
lib/Worker/Pool.php 0000644 00000002223 13755316600 0010212 0 ustar 00 name = $name;
$this->trace = $trace;
}
/**
 * Returns the class name of the error thrown from the task.
 *
 * @return string The value held in the $name property.
 */
public function getName(): string {
return $this->name;
}
/**
 * Gets the stack trace at the point the error was thrown in the task.
 *
 * @return string The value held in the $trace property.
 */
public function getWorkerTrace(): string {
return $this->trace;
}
}
lib/Worker/TaskException.php 0000644 00000001773 13755316600 0012073 0 ustar 00 name = $name;
$this->trace = $trace;
}
/**
 * Returns the class name of the exception thrown from the task.
 *
 * @return string The value held in the $name property.
 */
public function getName(): string {
return $this->name;
}
/**
 * Gets the stack trace at the point the exception was thrown in the task.
 *
 * @return string The value held in the $trace property.
 */
public function getWorkerTrace(): string {
return $this->trace;
}
}
lib/Worker/TaskRunner.php 0000644 00000002713 13755316600 0011401 0 ustar 00 channel = $channel;
$this->environment = $environment;
}
/**
 * Runs the task runner, receiving tasks from the parent and sending the result of those tasks.
 *
 * The work is done by the execute() generator, which is wrapped in a Coroutine here.
 *
 * @return \Amp\Promise
 */
public function run(): Promise {
return new Coroutine($this->execute());
}
/**
 * Receives jobs from the channel, runs each job's task and sends back a TaskSuccess or
 * TaskFailure, until something other than a Job is received.
 *
 * @coroutine
 *
 * @return \Generator Returns the first received value that is not a Job.
 */
private function execute(): \Generator {
    while (true) {
        $job = yield $this->channel->receive();

        if (!$job instanceof Internal\Job) {
            return $job;
        }

        try {
            $value = yield call([$job->getTask(), "run"], $this->environment);
            $result = new Internal\TaskSuccess($job->getId(), $value);
        } catch (\Throwable $exception) {
            $result = new Internal\TaskFailure($job->getId(), $exception);
        }

        $value = null;
        $job = null; // Free memory from last job.

        yield $this->channel->send($result);

        $result = null; // Free memory from last result.
    }
}
}
lib/Worker/Worker.php 0000644 00000001607 13755316600 0010557 0 ustar 00 Resolves with the return value of Task::run().
*/
public function enqueue(Task $task): Promise;
/**
* @return \Amp\Promise Exit code.
*/
public function shutdown(): Promise;
/**
* Immediately kills the context.
*/
public function kill();
}
lib/Worker/WorkerException.php 0000644 00000000465 13755316600 0012437 0 ustar 00 run();
}, $envClassName));
}
}
lib/Worker/functions.php 0000644 00000003341 13755316600 0011313 0 ustar 00
*/
function enqueue(Task $task): Promise {
// Delegates to the pool returned by pool().
return pool()->enqueue($task);
}
/**
 * Gets a worker from the global worker pool by delegating to the get() method of the pool returned by pool().
 *
 * @return \Amp\Parallel\Worker\Worker
 */
function get(): Worker {
return pool()->get();
}
/**
 * Creates a worker using the global worker factory. The worker is automatically started.
 *
 * NOTE(review): the automatic start is not visible in this function; it only calls
 * create() on the factory returned by factory(). Confirm against the worker implementations.
 *
 * @return \Amp\Parallel\Worker\Worker
 */
function create(): Worker {
return factory()->create();
}
/**
 * Gets or sets the global worker factory.
 *
 * When a factory is given it is stored in the loop state and returned. Without an argument the
 * stored factory is returned; if none is stored yet, a DefaultWorkerFactory is created and stored.
 *
 * @param \Amp\Parallel\Worker\WorkerFactory|null $factory
 *
 * @return \Amp\Parallel\Worker\WorkerFactory
 */
function factory(WorkerFactory $factory = null): WorkerFactory {
    if ($factory !== null) {
        Loop::setState(LOOP_FACTORY_IDENTIFIER, $factory);
        return $factory;
    }

    $current = Loop::getState(LOOP_FACTORY_IDENTIFIER);

    if ($current) {
        return $current;
    }

    $default = new DefaultWorkerFactory;
    Loop::setState(LOOP_FACTORY_IDENTIFIER, $default);

    return $default;
}
phpdoc.dist.xml 0000644 00000001035 13755316600 0007512 0 ustar 00
Amp
build/docs
utf8
build/docs
warn
build/log/docs/{DATE}.log
src
docs/.shared 0000777 00000000000 13755316600 0006753 5 ustar 00