.editorconfig 0000644 00000000203 13755316542 0007227 0 ustar 00 root = true
[*]
end_of_line = lf
insert_final_newline = true
trim_trailing_whitespace = true
indent_style = space
charset = utf-8
.gitmodules 0000644 00000000137 13755316542 0006735 0 ustar 00 [submodule "docs/shared"]
path = docs/.shared
url = https://github.com/amphp/amphp.github.io
.php_cs.dist 0000644 00000002641 13755316542 0007001 0 ustar 00 setRiskyAllowed(true)
->setRules([
"@PSR1" => true,
"@PSR2" => true,
"braces" => [
"allow_single_line_closure" => true,
"position_after_functions_and_oop_constructs" => "same",
],
"array_syntax" => ["syntax" => "short"],
"cast_spaces" => true,
"combine_consecutive_unsets" => true,
"function_to_constant" => true,
"native_function_invocation" => true,
"no_multiline_whitespace_before_semicolons" => true,
"no_unused_imports" => true,
"no_useless_else" => true,
"no_useless_return" => true,
"no_whitespace_before_comma_in_array" => true,
"no_whitespace_in_blank_line" => true,
"non_printable_character" => true,
"normalize_index_brace" => true,
"ordered_imports" => true,
"php_unit_construct" => true,
"php_unit_dedicate_assert" => true,
"php_unit_fqcn_annotation" => true,
"phpdoc_summary" => true,
"phpdoc_types" => true,
"psr4" => true,
"return_type_declaration" => ["space_before" => "none"],
"short_scalar_cast" => true,
"single_blank_line_before_namespace" => true,
])
->setFinder(
PhpCsFixer\Finder::create()
->in(__DIR__ . "/examples")
->in(__DIR__ . "/lib")
->in(__DIR__ . "/test")
);
CHANGELOG.md 0000644 00000013221 13755316542 0006367 0 ustar 00 ### 2.0.7
- Fixed recording of stack traces for double resolution in case `AMP_DEBUG` was not set as environment variable, defaulting to `0` now. (#217)
- `Loop::unreference()` is ignored on invalid watchers now.
- Invoke UV watchers in case no events or `UV_DISCONNECT` is indicated.
- Ignore `Loop::$driver` not being set during shutdown, which might happen due to the unreliable shutdown order. (#212)
### 2.0.6
- Resolution methods are now hidden in the `Promise` returned from `Deferred::promise()`. Their exposure was an implementation detail and never promised by the return type of `Deferred::promise()`.
- A named class is now used for promises, so you don't end up with an anonymous class in `var_dump()` and stack traces, which can be confusing.
### 2.0.5
- Fixed possible notices in `formatStacktrace()`. This is an internal API, but the notices might appear on certain stack traces when running with `AMP_DEBUG=true`.
- Fixed segfault with `ev`, see amphp/parallel-functions#3.
- Dropped multiple definitions of `Deferred` and `Emitter`. These were introduced as a performance hack, but relied on `zend.assertions` being disabled to change behavior, which is bad. The performance impact is believed to be rather low, because coroutines are used a lot more than `Deferred` in Amp v2.
### 2.0.4
- Allow `AMP_DEBUG` to be defined via the environment.
- Fix formatting of stack traces used for double resolution debugging.
### 2.0.3
- `Loop::set()` replaces the current driver with a dummy driver for the time of `gc_collect_cycles()` now. This allows cyclic references to be cleaned up properly before the new driver is set. Without such a fix, cyclic references might have been cleaned up later, e.g. cancelling their watcher IDs on the new loop, thereby cancelling the wrong watchers.
- Promise combinator functions (`all()`, `any()`, `first()`, `some()`) now preserve order of the given `$promises` array argument.
### 2.0.2
- Fixed warnings and timers in `EventDriver`.
- No longer hides warnings from `stream_select`.
### 2.0.1
- Fixed an issue where the loop blocks even though all watchers are unreferenced.
2.0.0
-----
* `Amp\reactor()` has been replaced with `Amp\Loop::set()` and `Amp\Loop::get()`.
* `Amp\driver()` has been replaced with `Amp\Loop\Factory::create()`.
* `Amp\tick()` no longer exists and doesn't have a replacement. Ticks are an internal detail.
* Functions for creating and managing watchers are now static methods of `Amp\Loop` instead of functions in the `Amp` namespace.
* `once()` is now `delay()` and `immediately()` is `defer()`.
* Parameter order for `delay()` and `repeat()` has been changed.
* `reference()` and `unreference()` have been added.
* `Amp\Pause` has been renamed to `Amp\Delayed` and accepts an optional resolution value now. Additionally `reference()` and `unreference()` methods have been added.
* Promise accepting functions have been moved to the `Amp\Promise` namespace.
* `Amp\Promise\some()` accepts an additional `$required` parameter.
* `Amp\call()`, `Amp\asyncCall()`, `Amp\coroutine()` and `Amp\asyncCoroutine()` have been added.
* `Amp\resolve()` has been removed, use `Amp\call()` instead.
* `Promise::when()` has been renamed to `Promise::onResolve()`
* `Promise::watch()` has been removed, use `Amp\Iterator`, [`amphp/byte-stream`](https://github.com/amphp/byte-stream) or a custom implementation that implements `Amp\Promise` instead and provides dedicated APIs to access the data previously shared via the `watch()` mechanism.
* `Amp\Iterator`, `Amp\Emitter` and `Amp\Producer` have been added with several functions in the `Amp\Iterator` namespace.
* Various other changes.
### 1.2.2
- Fix notice in `NativeReactor` when removing a handle while
an event is waiting for it. (Regression fix from 1.1.1)
### 1.2.1
- Fix `uv_run()` potentially exiting earlier than intended,
leading to an infinite loop in `UvReactor::run()`.
1.2.0
-----
- `resolve()` now also accepts callables returning generators.
### 1.1.1
- Fix memory leak in `NativeReactor`, retaining an empty array
for each stream.
- Remove circular references in `UvReactor` to avoid garbage
collector calls.
1.1.0
-----
- Add `getExceptions()` method to `CombinatorException` to get an
array of all the exceptions (affecting `some()` and `first()`).
- Fix `NativeReactor` not ending up in stopped state if primary
callback didn't install any events.
### 1.0.8
- Fix `NativeReactor` running a busy loop if no timers are active.
Properly block now in NativeReactor inside `stream_select()`.
### 1.0.7
- Several combinator functions could result in a promise already
resolved exception in case some values of the array weren't
promises.
### 1.0.6
- Fix issue in `NativeReactor` causing `stop()` to be delayed by
one second.
### 1.0.5
- Convert general `RuntimeException` to more specific
`Amp\CombinatorException`.
### 1.0.4
- Repeat watchers in `LibeventReactor` internally were handled in
microsecond intervals instead of milliseconds.
### 1.0.3
- Fix issue in `NativeReactor` capable of causing keep alive
counter corruption when a watcher was cancelled inside its
own callback.
- Fix issue in `UvReactor` with `libuv` >= 1.1.0 causing busy loop
with immediates present, but no watchers being triggered.
### 1.0.2
- Fix PHP 7 issue in which top-level `Throwable`s weren't caught
in certain coroutine contexts.
- Remove error suppression operator on optionally `null` option
assignment to avoid spurious `E_NOTICE` output when custom
error handlers are used.
### 1.0.1
- Fix bug preventing `UvReactor::tick()` from returning when no
events are ready for a single active IO watcher.
1.0.0
-----
- Initial stable API release
CONTRIBUTING.md 0000644 00000002661 13755316542 0007015 0 ustar 00 ## Submitting useful bug reports
Please search existing issues first to make sure this is not a duplicate.
Every issue report has a cost for the developers required to field it; be
respectful of others' time and ensure your report isn't spurious prior to
submission. Please adhere to [sound bug reporting principles](http://www.chiark.greenend.org.uk/~sgtatham/bugs.html).
## Development ideology
Truths which we believe to be self-evident:
- **It's an asynchronous world.** Be wary of anything that undermines
async principles.
- **The answer is not more options.** If you feel compelled to expose
new preferences to the user it's very possible you've made a wrong
turn somewhere.
- **There are no power users.** The idea that some users "understand"
concepts better than others has proven to be, for the most part, false.
If anything, "power users" are more dangerous than the rest, and we
should avoid exposing dangerous functionality to them.
## Code style
The amphp project adheres to the PSR-2 style guide with the exception that
opening braces for classes and methods must appear on the same line as
the declaration:
https://github.com/php-fig/fig-standards/blob/master/accepted/PSR-2-coding-style-guide.md
To apply amphp code standards you can run `php-cs-fixer` with the following Composer command:
```bash
composer code-style
```
## Running the tests
Run the test suite from root directory:
```bash
composer test
```
LICENSE 0000644 00000002161 13755316542 0005564 0 ustar 00
The MIT License (MIT)
Copyright (c) 2015-2017 amphp
Copyright (c) 2016 PHP Asynchronous Interoperability Group
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Makefile 0000644 00000001517 13755316542 0006223 0 ustar 00 PHP_BIN := php
COMPOSER_BIN := composer
COVERAGE = coverage
SRCS = lib test
find_php_files = $(shell find $(1) -type f -name "*.php")
src = $(foreach d,$(SRCS),$(call find_php_files,$(d)))
.PHONY: test
test: setup phpunit code-style
.PHONY: clean
clean: clean-coverage clean-vendor
.PHONY: clean-coverage
clean-coverage:
test ! -e coverage || rm -r coverage
.PHONY: clean-vendor
clean-vendor:
test ! -e vendor || rm -r vendor
.PHONY: setup
setup: vendor/autoload.php
.PHONY: deps-update
deps-update:
$(COMPOSER_BIN) update
.PHONY: phpunit
phpunit: setup
$(PHP_BIN) vendor/bin/phpunit
.PHONY: code-style
code-style: setup
PHP_CS_FIXER_IGNORE_ENV=1 $(PHP_BIN) vendor/bin/php-cs-fixer --diff -v fix
composer.lock: composer.json
$(COMPOSER_BIN) install
touch $@
vendor/autoload.php: composer.lock
$(COMPOSER_BIN) install
touch $@
README.md 0000644 00000007271 13755316542 0006045 0 ustar 00
Amp is a non-blocking concurrency framework for PHP. It provides an event loop, promises and streams as a base for asynchronous programming.
Promises in combination with generators are used to build coroutines, which allow writing asynchronous code just like synchronous code, without any callbacks.
## Installation
This package can be installed as a [Composer](https://getcomposer.org/) dependency.
```bash
composer require amphp/amp
```
## Documentation
Documentation can be found on [amphp.org](https://amphp.org/) as well as in the [`./docs`](./docs) directory.
## Requirements
- PHP 7.0+
##### Optional Extension Backends
Extensions are only needed if your app necessitates a high number of concurrent socket connections.
- [ev](https://pecl.php.net/package/ev)
- [event](https://pecl.php.net/package/event)
- [php-uv](https://github.com/bwoebi/php-uv) (experimental fork)
## Examples
This simple example uses our [Artax](https://amphp.org/artax/) HTTP client to fetch multiple HTTP resources concurrently.
```php
setOption(Amp\Artax\Client::OP_DISCARD_BODY, true);
try {
foreach ($uris as $uri) {
$promises[$uri] = $client->request($uri);
}
$responses = yield $promises;
foreach ($responses as $uri => $response) {
print $uri . " - " . $response->getStatus() . $response->getReason() . PHP_EOL;
}
} catch (Amp\Artax\HttpException $error) {
// If something goes wrong Amp will throw the exception where the promise was yielded.
// The Client::request() method itself will never throw directly, but returns a promise.
print $error->getMessage() . PHP_EOL;
}
});
```
Further examples can be found in the [`./examples`](./examples) directory of this repository as well as in the `./examples` directory of [our other libraries](https://github.com/amphp?utf8=%E2%9C%93&q=&type=public&language=php)
## Versioning
`amphp/amp` follows the [semver](http://semver.org/) semantic versioning specification like all other `amphp` packages.
| Version | Bug Fixes Until | Security Fixes Until |
| ------- | --------------- | -------------------- |
| 2.x | TBA | TBA |
| 1.x | 2017-12-31 | 2018-12-31 |
## Compatible Packages
Compatible packages should use the [`amphp`](https://github.com/search?utf8=%E2%9C%93&q=topic%3Aamphp) topic on GitHub.
## Security
If you discover any security related issues, please email [`bobwei9@hotmail.com`](mailto:bobwei9@hotmail.com) or [`me@kelunik.com`](mailto:me@kelunik.com) instead of using the issue tracker.
## License
The MIT License (MIT). Please see [`LICENSE`](./LICENSE) for more information.
composer.json 0000644 00000003417 13755316542 0007306 0 ustar 00 {
"name": "amphp/amp",
"homepage": "http://amphp.org/amp",
"description": "A non-blocking concurrency framework for PHP applications.",
"keywords": [
"async",
"asynchronous",
"concurrency",
"promise",
"awaitable",
"future",
"non-blocking",
"event",
"event-loop"
],
"license": "MIT",
"authors": [
{
"name": "Daniel Lowrey",
"email": "rdlowrey@php.net"
},
{
"name": "Aaron Piotrowski",
"email": "aaron@trowski.com"
},
{
"name": "Bob Weinand",
"email": "bobwei9@hotmail.com"
},
{
"name": "Niklas Keller",
"email": "me@kelunik.com"
}
],
"require": {
"php": ">=7"
},
"require-dev": {
"amphp/phpunit-util": "^1",
"react/promise": "^2",
"friendsofphp/php-cs-fixer": "^2.3",
"phpunit/phpunit": "^6.0.9",
"phpstan/phpstan": "^0.8.5"
},
"autoload": {
"psr-4": {
"Amp\\": "lib"
},
"files": [
"lib/functions.php",
"lib/Internal/functions.php"
]
},
"autoload-dev": {
"psr-4": {
"Amp\\Test\\": "test"
}
},
"support": {
"issues": "https://github.com/amphp/amp/issues",
"irc": "irc://irc.freenode.org/amphp"
},
"extra": {
"branch-alias": {
"dev-master": "2.0.x-dev"
}
},
"config": {
"platform": {
"php": "7.0.0"
}
},
"scripts": {
"test": "@php -dzend.assertions=1 -dassert.exception=1 ./vendor/bin/phpunit",
"code-style": "@php ./vendor/bin/php-cs-fixer fix"
}
}
docs/Gemfile 0000644 00000000171 13755316542 0007001 0 ustar 00 source "https://rubygems.org"
gem "github-pages"
gem "kramdown"
gem "jekyll-github-metadata"
gem "jekyll-relative-links"
docs/Gemfile.lock 0000644 00000015306 13755316542 0007736 0 ustar 00 GEM
remote: https://rubygems.org/
specs:
activesupport (4.2.9)
i18n (~> 0.7)
minitest (~> 5.1)
thread_safe (~> 0.3, >= 0.3.4)
tzinfo (~> 1.1)
addressable (2.5.2)
public_suffix (>= 2.0.2, < 4.0)
coffee-script (2.4.1)
coffee-script-source
execjs
coffee-script-source (1.11.1)
colorator (1.1.0)
commonmarker (0.17.9)
ruby-enum (~> 0.5)
concurrent-ruby (1.0.5)
dnsruby (1.60.2)
em-websocket (0.5.1)
eventmachine (>= 0.12.9)
http_parser.rb (~> 0.6.0)
ethon (0.11.0)
ffi (>= 1.3.0)
eventmachine (1.2.5)
execjs (2.7.0)
faraday (0.15.0)
multipart-post (>= 1.2, < 3)
ffi (1.9.23)
forwardable-extended (2.6.0)
gemoji (3.0.0)
github-pages (183)
activesupport (= 4.2.9)
github-pages-health-check (= 1.7.3)
jekyll (= 3.7.3)
jekyll-avatar (= 0.5.0)
jekyll-coffeescript (= 1.1.1)
jekyll-commonmark-ghpages (= 0.1.5)
jekyll-default-layout (= 0.1.4)
jekyll-feed (= 0.9.3)
jekyll-gist (= 1.5.0)
jekyll-github-metadata (= 2.9.4)
jekyll-mentions (= 1.3.0)
jekyll-optional-front-matter (= 0.3.0)
jekyll-paginate (= 1.1.0)
jekyll-readme-index (= 0.2.0)
jekyll-redirect-from (= 0.13.0)
jekyll-relative-links (= 0.5.3)
jekyll-remote-theme (= 0.2.3)
jekyll-sass-converter (= 1.5.2)
jekyll-seo-tag (= 2.4.0)
jekyll-sitemap (= 1.2.0)
jekyll-swiss (= 0.4.0)
jekyll-theme-architect (= 0.1.1)
jekyll-theme-cayman (= 0.1.1)
jekyll-theme-dinky (= 0.1.1)
jekyll-theme-hacker (= 0.1.1)
jekyll-theme-leap-day (= 0.1.1)
jekyll-theme-merlot (= 0.1.1)
jekyll-theme-midnight (= 0.1.1)
jekyll-theme-minimal (= 0.1.1)
jekyll-theme-modernist (= 0.1.1)
jekyll-theme-primer (= 0.5.3)
jekyll-theme-slate (= 0.1.1)
jekyll-theme-tactile (= 0.1.1)
jekyll-theme-time-machine (= 0.1.1)
jekyll-titles-from-headings (= 0.5.1)
jemoji (= 0.9.0)
kramdown (= 1.16.2)
liquid (= 4.0.0)
listen (= 3.1.5)
mercenary (~> 0.3)
minima (= 2.4.1)
nokogiri (>= 1.8.1, < 2.0)
rouge (= 2.2.1)
terminal-table (~> 1.4)
github-pages-health-check (1.7.3)
addressable (~> 2.3)
dnsruby (~> 1.60)
octokit (~> 4.0)
public_suffix (~> 2.0)
typhoeus (~> 1.3)
html-pipeline (2.7.2)
activesupport (>= 2)
nokogiri (>= 1.4)
http_parser.rb (0.6.0)
i18n (0.9.5)
concurrent-ruby (~> 1.0)
jekyll (3.7.3)
addressable (~> 2.4)
colorator (~> 1.0)
em-websocket (~> 0.5)
i18n (~> 0.7)
jekyll-sass-converter (~> 1.0)
jekyll-watch (~> 2.0)
kramdown (~> 1.14)
liquid (~> 4.0)
mercenary (~> 0.3.3)
pathutil (~> 0.9)
rouge (>= 1.7, < 4)
safe_yaml (~> 1.0)
jekyll-avatar (0.5.0)
jekyll (~> 3.0)
jekyll-coffeescript (1.1.1)
coffee-script (~> 2.2)
coffee-script-source (~> 1.11.1)
jekyll-commonmark (1.2.0)
commonmarker (~> 0.14)
jekyll (>= 3.0, < 4.0)
jekyll-commonmark-ghpages (0.1.5)
commonmarker (~> 0.17.6)
jekyll-commonmark (~> 1)
rouge (~> 2)
jekyll-default-layout (0.1.4)
jekyll (~> 3.0)
jekyll-feed (0.9.3)
jekyll (~> 3.3)
jekyll-gist (1.5.0)
octokit (~> 4.2)
jekyll-github-metadata (2.9.4)
jekyll (~> 3.1)
octokit (~> 4.0, != 4.4.0)
jekyll-mentions (1.3.0)
activesupport (~> 4.0)
html-pipeline (~> 2.3)
jekyll (~> 3.0)
jekyll-optional-front-matter (0.3.0)
jekyll (~> 3.0)
jekyll-paginate (1.1.0)
jekyll-readme-index (0.2.0)
jekyll (~> 3.0)
jekyll-redirect-from (0.13.0)
jekyll (~> 3.3)
jekyll-relative-links (0.5.3)
jekyll (~> 3.3)
jekyll-remote-theme (0.2.3)
jekyll (~> 3.5)
rubyzip (>= 1.2.1, < 3.0)
typhoeus (>= 0.7, < 2.0)
jekyll-sass-converter (1.5.2)
sass (~> 3.4)
jekyll-seo-tag (2.4.0)
jekyll (~> 3.3)
jekyll-sitemap (1.2.0)
jekyll (~> 3.3)
jekyll-swiss (0.4.0)
jekyll-theme-architect (0.1.1)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-theme-cayman (0.1.1)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-theme-dinky (0.1.1)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-theme-hacker (0.1.1)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-theme-leap-day (0.1.1)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-theme-merlot (0.1.1)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-theme-midnight (0.1.1)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-theme-minimal (0.1.1)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-theme-modernist (0.1.1)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-theme-primer (0.5.3)
jekyll (~> 3.5)
jekyll-github-metadata (~> 2.9)
jekyll-seo-tag (~> 2.0)
jekyll-theme-slate (0.1.1)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-theme-tactile (0.1.1)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-theme-time-machine (0.1.1)
jekyll (~> 3.5)
jekyll-seo-tag (~> 2.0)
jekyll-titles-from-headings (0.5.1)
jekyll (~> 3.3)
jekyll-watch (2.0.0)
listen (~> 3.0)
jemoji (0.9.0)
activesupport (~> 4.0, >= 4.2.9)
gemoji (~> 3.0)
html-pipeline (~> 2.2)
jekyll (~> 3.0)
kramdown (1.16.2)
liquid (4.0.0)
listen (3.1.5)
rb-fsevent (~> 0.9, >= 0.9.4)
rb-inotify (~> 0.9, >= 0.9.7)
ruby_dep (~> 1.2)
mercenary (0.3.6)
mini_portile2 (2.3.0)
minima (2.4.1)
jekyll (~> 3.5)
jekyll-feed (~> 0.9)
jekyll-seo-tag (~> 2.1)
minitest (5.11.3)
multipart-post (2.0.0)
nokogiri (1.8.2)
mini_portile2 (~> 2.3.0)
octokit (4.8.0)
sawyer (~> 0.8.0, >= 0.5.3)
pathutil (0.16.1)
forwardable-extended (~> 2.6)
public_suffix (2.0.5)
rb-fsevent (0.10.3)
rb-inotify (0.9.10)
ffi (>= 0.5.0, < 2)
rouge (2.2.1)
ruby-enum (0.7.2)
i18n
ruby_dep (1.5.0)
rubyzip (1.2.1)
safe_yaml (1.0.4)
sass (3.5.6)
sass-listen (~> 4.0.0)
sass-listen (4.0.0)
rb-fsevent (~> 0.9, >= 0.9.4)
rb-inotify (~> 0.9, >= 0.9.7)
sawyer (0.8.1)
addressable (>= 2.3.5, < 2.6)
faraday (~> 0.8, < 1.0)
terminal-table (1.8.0)
unicode-display_width (~> 1.1, >= 1.1.1)
thread_safe (0.3.6)
typhoeus (1.3.0)
ethon (>= 0.9.0)
tzinfo (1.2.5)
thread_safe (~> 0.1)
unicode-display_width (1.3.2)
PLATFORMS
ruby
DEPENDENCIES
github-pages
jekyll-github-metadata
jekyll-relative-links
kramdown
BUNDLED WITH
1.15.1
docs/README.md 0000644 00000001120 13755316542 0006760 0 ustar 00 # Documentation
This directory contains the documentation for `amphp/amp`. Documentation and code are bundled within a single repository for easier maintenance. Additionally, this preserves the documentation for older versions.
## Reading
You can read this documentation either directly on GitHub or on our website. While the website will always contain the latest version, viewing on GitHub also works with older versions.
## Writing
Our documentation is built using Jekyll.
```
sudo gem install bundler jekyll
```
```
bundle install --path vendor/bundle
bundle exec jekyll serve
```
docs/_config.yml 0000644 00000000747 13755316542 0007646 0 ustar 00 kramdown:
input: GFM
toc_levels: 2..3
baseurl: "/amp"
layouts_dir: ".shared/layout"
includes_dir: ".shared/includes"
exclude: ["Gemfile", "Gemfile.lock", "README.md", "vendor"]
safe: true
repository: amphp/amp
gems:
- "jekyll-github-metadata"
- "jekyll-relative-links"
defaults:
- scope:
path: ""
type: "pages"
values:
layout: "docs"
shared_asset_path: "/amp/asset"
navigation:
- event-loop
- promises
- coroutines
- iterators
- cancellation
- utils
docs/cancellation/README.md 0000644 00000004072 13755316542 0011425 0 ustar 00 ---
layout: docs
title: Cancellation
permalink: /cancellation/
---
Amp provides primitives to allow the cancellation of operations, namely `CancellationTokenSource` and `CancellationToken`.
```php
$tokenSource = new CancellationTokenSource;
$promise = asyncRequest("...", $tokenSource->getToken());
Loop::delay(1000, function () use ($tokenSource) {
$tokenSource->cancel();
});
$result = yield $promise;
```
Every operation that supports cancellation accepts an instance of `CancellationToken` as (optional) argument. Within a coroutine, `$token->throwIfRequested()` can be used to fail the operation with a `CancelledException`. As `$token` is often an optional parameter and might be `null`, these calls need to be guarded with an `if ($token)` or similar check. Instead of doing so, it's often easier to simply set the token to `$token = $token ?? new NullCancellationToken` at the beginning of the method.
While `throwIfRequested()` works well within coroutines, some operations might want to subscribe with a callback instead. They can do so using `CancellationToken::subscribe()` to subscribe to any cancellation requests that might happen.
If the operation consists of any sub-operations that support cancellation, it passes that same `CancellationToken` instance down to these sub-operations.
The original caller creates a `CancellationToken` by creating an instance of `CancellationTokenSource` and passing `$cancellationTokenSource->getToken()` to the operation as shown in the above example. Only the original caller has access to the `CancellationTokenSource` and can cancel the operation using `CancellationTokenSource::cancel()`, similar to the way it works with `Deferred` and `Promise`.
{:.note}
> Cancellations are advisory only. A DNS resolver might ignore cancellation requests after the query has been sent as the response has to be processed anyway and can still be cached. An HTTP client might continue a nearly finished HTTP request to reuse the connection, but might abort a chunked encoding response as it cannot know whether continuing is actually cheaper than aborting.
docs/coroutines/README.md 0000644 00000006177 13755316542 0011173 0 ustar 00 ---
layout: docs
title: Coroutines
permalink: /coroutines/
---
Coroutines are interruptible functions. In PHP they can be implemented using [generators](http://php.net/manual/en/language.generators.overview.php).
While generators are usually used to implement simple iterators and yielding elements using the `yield` keyword, Amp uses `yield` as interruption points. When a coroutine yields a value, execution of the coroutine is temporarily interrupted, allowing other tasks to be run, such as I/O handlers, timers, or other coroutines.
```php
// Fetches a resource with Artax and returns its body.
$promise = Amp\call(function () use ($http) {
try {
// Yield control until the promise resolves
// and return its eventual result.
$response = yield $http->request("https://example.com/");
$body = yield $response->getBody();
return $body;
} catch (HttpException $e) {
// If promise resolution fails the exception is
// thrown back to us and we handle it as needed.
}
});
```
Every time a promise is `yield`ed, the coroutine subscribes to the promise and automatically continues once the promise has resolved.
On successful resolution the coroutine will send the resolution value into the generator using [`Generator::send()`](https://secure.php.net/generator.send).
On failure it will throw the exception into the generator using [`Generator::throw()`](https://secure.php.net/generator.throw).
This allows writing asynchronous code almost like synchronous code.
Note that no callbacks need to be registered to consume promises and errors can be handled with ordinary `catch` clauses, which will bubble up to the calling context if uncaught in the same way exceptions bubble up in synchronous code.
{:.note}
> Use `Amp\call()` to always return a promise instead of a `\Generator` from your public APIs. Generators are an implementation detail that shouldn't be leaked to API consumers.
## Yield Behavior
All `yield`s in a coroutine must be one of the following three types:
| Yieldable | Description |
| --------------| ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `Amp\Promise` | Any promise instance may be yielded and control will be returned to the coroutine once the promise resolves. If resolution fails the relevant exception is thrown into the generator and must be handled by the application or it will bubble up. If resolution succeeds the promise's resolved value is sent back into the generator. |
| `React\Promise\PromiseInterface` | Same as `Amp\Promise`. Any React promise will automatically be adapted to an Amp promise. |
| `array` | Yielding an array of promises combines them implicitly using `Amp\Promise\all()`. An array with elements not being promises will result in an `Amp\InvalidYieldError`. |
docs/coroutines/helpers.md 0000644 00000002733 13755316542 0011672 0 ustar 00 ---
layout: docs
title: Coroutine Helpers
permalink: /coroutines/helpers
---
`Amp\Coroutine` requires an already instantiated `Generator` to be passed to its constructor. Always calling a callable before passing the `Generator` to `Amp\Coroutine` is unnecessary boilerplate.
## `coroutine()`
Returns a new function that wraps `$callback` in a promise/coroutine-aware function that automatically runs generators as coroutines. The returned function always returns a promise when invoked. Errors have to be handled by the callback caller or they will go unnoticed.
```php
function coroutine(callable $callback): callable { ... }
```
Use this function to create a coroutine-aware callable for a promise-aware callback caller.
## `asyncCoroutine()`
Same as `coroutine()` but doesn't return a `Promise` when the returned callback is called. Instead, promises are passed to `Amp\Promise\rethrow()` to handle errors automatically.
## `call()`
```php
function call(callable $callback, ...$args): Promise { ... }
```
Calls the given function, always returning a promise. If the function returns a `Generator`, it will be run as a coroutine. If the function throws, a failed promise will be returned.
`call($callable, ...$args)` is equivalent to `coroutine($callable)(...$args)`.
## `asyncCall()`
```php
function asyncCall(callable $callback, ...$args) { ... }
```
Same as `call()`, but doesn't return the `Promise`. Promises are automatically passed to `Amp\Promise\rethrow` for error handling.
docs/event-loop/README.md 0000644 00000012515 13755316542 0011062 0 ustar 00 ---
layout: docs
title: Event Loop
permalink: /event-loop/
---
It may surprise people to learn that the PHP standard library already has everything we need to write event-driven and non-blocking applications. We only reach the limits of native PHP's functionality in this area when we ask it to poll thousands of file descriptors for IO activity at the same time. Even in this case, though, the fault is not with PHP but the underlying system `select()` call which is linear in its performance degradation as load increases.
For performance that scales out to high volume we require more advanced capabilities currently found only in extensions. If you wish to, for example, service 10,000 simultaneous clients in an Amp-backed socket server, you should use one of the event loop implementations based on a PHP extension. However, if you're using Amp in a strictly local program for non-blocking concurrency or you don't need to handle more than a few hundred simultaneous clients in a server application the native PHP functionality should be adequate.
## Global Accessor
Amp uses a global accessor for the event loop as there's only one event loop for each application. It doesn't make sense to have two loops running at the same time, as they would just have to schedule each other in a busy waiting manner to operate correctly.
The event loop should be accessed through the methods provided by `Amp\Loop`. On the first use of the accessor, Amp will automatically set up the best available driver (see the next section).
`Amp\Loop::set()` can be used to set a custom driver or to reset the driver in tests, as each test should run with a fresh driver instance to achieve test isolation. In case of PHPUnit, you can use a [`TestListener` to reset the event loop](https://github.com/amphp/phpunit-util) automatically after each test.
## Implementations
Amp offers different event loop implementations based on various backends. All implementations extend `Amp\Loop\Driver`. Each behaves exactly the same way from an external API perspective. The main differences have to do with underlying performance characteristics. The current implementations are listed here:
| Class | Extension |
| ------------------------- | ------------------------------------------------------ |
| `Amp\Loop\NativeDriver` | – |
| `Amp\Loop\EvDriver` | [`pecl/ev`](https://pecl.php.net/package/ev) |
| `Amp\Loop\EventDriver` | [`pecl/event`](https://pecl.php.net/package/event) |
| `Amp\Loop\UvDriver` | [`php-uv`](https://github.com/bwoebi/php-uv) |
It's not important to choose one implementation for your application. Amp will automatically select the best available driver. It's perfectly fine to have one of the extensions in production while relying on the `NativeDriver` locally for development.
If you want to quickly switch implementations during development, e.g. for comparison or testing, you can set the `AMP_LOOP_DRIVER` environment variable to one of the classes. If you use a custom implementation, this only works if the implementation doesn't take any arguments.
## Event Loop as Task Scheduler
The first thing we need to understand to program effectively using an event loop is this:
**The event loop is our task scheduler.**
The event loop controls the program flow as long as it runs. Once we tell the event loop to run it will maintain control until the application errors out, has nothing left to do, or is explicitly stopped.
Consider this very simple example:
```php
This is an advanced low-level API. Most users should use [`amphp/byte-stream`](https://github.com/amphp/byte-stream) instead.
Watchers registered via `Loop::onReadable()` trigger their callbacks in the following situations:
- When data is available to read on the stream under observation
- When the stream is at EOF (for sockets, this means the connection is broken)
A common usage pattern for reacting to readable data looks something like this example:
```php
You should always read a multiple of the configured chunk size (default: 8192), otherwise your code might not work as expected with loop backends other than `stream_select()`, see [amphp/amp#65](https://github.com/amphp/amp/issues/65) for more information.
### `onWritable()`
{:.note}
> This is an advanced low-level API. Most users should use [`amphp/byte-stream`](https://github.com/amphp/byte-stream) instead.
- Streams are essentially *"always"* writable. The only time they aren't is when their respective write buffers are full.
A common usage pattern for reacting to writability involves initializing a writability watcher without enabling it when a client first connects to a server. Once incomplete writes occur we're then able to "unpause" the write watcher using `Loop::enable()` until data is fully sent without having to create and cancel new watcher resources on the same stream multiple times.
## Pausing, Resuming and Canceling Watchers
All watchers, regardless of type, can be temporarily disabled and enabled in addition to being cleared via `Loop::cancel()`. This allows for advanced capabilities such as disabling the acceptance of new socket clients in server applications when simultaneity limits are reached. In general, the performance characteristics of watcher reuse via pause/resume are favorable by comparison to repeatedly canceling and re-registering watchers.
### `disable()`
A simple disable example:
```php
socket = $sock;
$readWatcher = Loop::onReadable($sock, function () use ($client) {
$this->onReadable($client);
});
$writeWatcher = Loop::onWritable($sock, function () use ($client) {
$this->doWrite($client);
});
Loop::disable($writeWatcher); // <-- let's initialize the watcher as "disabled"
$client->readWatcher = $readWatcher;
$client->writeWatcher = $writeWatcher;
$this->clients[$socketId] = $client;
}
// ... other class implementation details here ...
private function writeToClient($client, $data) {
$client->writeBuffer .= $data;
$this->doWrite($client);
}
private function doWrite(ClientStruct $client) {
$bytesToWrite = strlen($client->writeBuffer);
$bytesWritten = @fwrite($client->socket, $client->writeBuffer);
if ($bytesToWrite === $bytesWritten) {
Loop::disable($client->writeWatcher);
} elseif ($bytesWritten >= 0) {
$client->writeBuffer = substr($client->writeBuffer, $bytesWritten);
Loop::enable($client->writeWatcher);
} elseif ($this->isSocketDead($client->socket)) {
$this->unloadClient($client);
}
}
// ... other class implementation details here ...
}
```
### `cancel()`
It's important to *always* cancel persistent watchers once you're finished with them or you'll create memory leaks in your application. This functionality works in exactly the same way as the above `enable` / `disable` examples:
```php
= 3) {
Loop::cancel($watcherId); // <-- cancel myself!
}
});
```
It is also always safe to cancel a watcher from multiple places. A double-cancel will simply be ignored.
### An Important Note on Writability
Because streams are essentially *"always"* writable you should only enable writability watchers while you have data to send. If you leave these watchers enabled when your application doesn't have anything to write the watcher will trigger endlessly until disabled or canceled. This will max out your CPU. If you're seeing inexplicably high CPU usage in your application it's a good bet you've got a writability watcher that you failed to disable or cancel after you were finished with it.
A standard pattern in this area is to initialize writability watchers in a disabled state before subsequently enabling them at a later time as shown here:
```php
advance()) {
$element = $iterator->getCurrent();
// do something with $element
}
```
## Iterator Creation
### Emitter
`Emitter` is to iterators what `Deferred` is to promises. A library that returns an `Iterator` for asynchronous consumption of an iterable result creates an `Amp\Emitter` and returns the `Iterator` using `iterate()`. This ensures a consumer can only consume the iterator, but not emit values or complete the iterator.
#### `emit()`
`emit()` emits a new value to the `Iterator`, which can be consumed by a consumer. The emitted value is passed as the first argument to `emit()`. `emit()` returns a `Promise` that can be waited on before emitting new values. This allows emitting values just as fast as the consumer can consume them.
#### `complete()`
`complete()` marks the `Emitter` / linked `Iterator` as complete. No further emits are allowed after completing an `Emitter` / `Iterator`.
### Producer
`Producer` is a simplified form of `Emitter` that can be used when a single coroutine can emit all values.
`Producer` accepts a `callable` as first constructor parameter that gets run as a coroutine and passed an `$emit` callable that can be used to emit values just like the `emit()` method does in `Emitter`.
#### Example
```php
$iterator = new Producer(function (callable $emit) {
yield $emit(1);
yield $emit(new Delayed(500, 2));
yield $emit(3);
yield $emit(4);
});
```
### `fromIterable`
Iterators can also be created from ordinary PHP arrays or `Traversable` instances, which is mainly useful in tests, but might also be used for the same reasons as `Success` and `Failure`.
```php
function fromIterable($iterable, int $delay = 0) { ... }
```
`$delay` allows adding a delay between each emission.
docs/iterators/combinators.md 0000644 00000001305 13755316542 0012364 0 ustar 00 ---
layout: docs
title: Iterator Combination
permalink: /iterators/combinators
---
Amp provides two common combination helpers for iterators: `concat` and `merge`.
## `concat()`
`concat()` accepts an array of `Iterator` instances and concatenates the given iterators into a single iterator, emitting values from a single iterator at a time. The prior iterator must complete before values are emitted from any subsequent iterators. Iterators are concatenated in the order given (iteration order of the array).
## `merge()`
`merge()` accepts an array of `Iterator` instances and creates an `Iterator` that emits the values emitted by any iterator in the array, completing once all of the given iterators have completed.
docs/iterators/transformation.md 0000644 00000001107 13755316542 0013112 0 ustar 00 ---
layout: docs
title: Iterator Transformation
permalink: /iterators/transformation
---
Amp provides two common transformation helpers for iterators: `map` and `filter`.
Further primitives are very easy to implement using `Producer` with those two as examples.
## `map()`
`map()` accepts an `Iterator` and a `callable` `$onEmit` that can transform each value into another value.
## `filter()`
`filter()` accepts an `Iterator` and a `callable` `$filter`. If `$filter($value)` returns `false` the value gets filtered, otherwise the value is retained in the resulting `Iterator`.
docs/promises/README.md 0000644 00000016641 13755316542 0010637 0 ustar 00 ---
layout: docs
title: Promises
permalink: /promises/
---
A `Promise` is an object representing the eventual result of an asynchronous operation.
There are three states:
- **Success**: The promise resolved successfully.
- **Failure**: The promise failed.
- **Pending**: The promise has not been resolved yet.
A successful resolution is like returning a value in synchronous code while failing a promise is like throwing an exception.
Promises are the basic unit of concurrency in asynchronous applications.
In Amp they implement the `Amp\Promise` interface.
These objects should be thought of as placeholders for values or tasks that might not be complete immediately.
Another way to approach asynchronous APIs is using callbacks that are passed when the operation is started.
```php
doSomething(function ($error, $value) {
if ($error) {
/* ... */
} else {
/* ... */
}
});
```
The callback approach has several drawbacks.
- Passing callbacks and doing further actions in them that depend on the result of the first action gets messy really quickly.
- An explicit callback is required as input parameter to the function, and the return value is simply unused. There's no way to use this API without involving a callback.
That's where promises come into play.
They're simple placeholders that are returned and allow a callback (or several callbacks) to be registered.
```php
doSomething()->onResolve(function ($error, $value) {
if ($error) {
/* ... */
} else {
/* ... */
}
});
```
This doesn't seem a lot better at first sight, we have just moved the callback.
But in fact this enabled a lot.
We can now write helper functions like [`Amp\Promise\all()`](https://amphp.org/amp/promises/combinators#all) which subscribe to several of those placeholders and combine them. We don't have to write any complicated code to combine the results of several callbacks.
But the most important improvement of promises is that they allow writing [coroutines](https://amphp.org/amp/coroutines/), which completely eliminate the need for _any_ callbacks.
Coroutines make use of PHP's generators.
Every time a promise is `yield`ed, the coroutine subscribes to the promise and automatically continues once the promise has resolved.
On successful resolution the coroutine will send the resolution value into the generator using [`Generator::send()`](https://secure.php.net/generator.send).
On failure it will throw the exception into the generator using [`Generator::throw()`](https://secure.php.net/generator.throw).
This allows writing asynchronous code almost like synchronous code.
{:.note}
> Amp's `Promise` interface **does not** conform to the "Thenables" abstraction common in JavaScript promise implementations. Chaining `.then()` calls is a suboptimal method for avoiding callback hell in a world with generator coroutines. Instead, Amp utilizes PHP generators as described above.
>
> However, as ReactPHP is another wide-spread implementation, we also accept any `React\Promise\PromiseInterface` where we accept instances of `Amp\Promise`. In case of custom implementations not implementing `React\Promise\PromiseInterface`, `Amp\Promise\adapt()` can be used to adapt any object having a `then` or `done` method.
## Promise Consumption
```php
interface Promise {
public function onResolve(callable $onResolve);
}
```
In its simplest form the `Amp\Promise` aggregates callbacks for dealing with results once they eventually resolve. While most code will not interact with this API directly thanks to [coroutines](../coroutines/), let's take a quick look at the one simple API method exposed on `Amp\Promise` implementations:
| Parameter | Callback Signature |
| ------------ | ------------------------------------------ |
| `$onResolve` | `function ($error = null, $result = null)` |
`Amp\Promise::onResolve()` accepts an error-first callback. This callback is responsible for reacting to the eventual result represented by the promise placeholder. For example:
```php
onResolve(function (Throwable $error = null, $result = null) {
if ($error) {
printf(
"Something went wrong:\n%s\n",
$error->getMessage()
);
} else {
printf(
"Hurray! Our result is:\n%s\n",
print_r($result, true)
);
}
});
```
Those familiar with JavaScript code generally reflect that the above interface quickly devolves into ["callback hell"](http://callbackhell.com/), and they're correct. We will shortly see how to avoid this problem in the [coroutines](../coroutines/README.md) section.
## Promise Creation
Promises can be created in several different ways. Most code will use [`Amp\call()`](https://amphp.org/amp/coroutines/helpers#call) which takes a function and runs it as coroutine if it returns a `Generator`.
### Success and Failure
Sometimes values are immediately available. This might be due to them being cached, but can also be the case if an interface mandates a promise to be returned to allow for async I/O but the specific implementation always having the result directly available. In these cases `Amp\Success` and `Amp\Failure` can be used to construct an immediately resolved promise. `Amp\Success` accepts a resolution value. `Amp\Failure` accepts an exception as failure reason.
### Deferred
{:.note}
> The `Deferred` API described below is an advanced API that many applications probably don't need. Use [`Amp\call()`](https://amphp.org/amp/coroutines/helpers#call) or [promise combinators](https://amphp.org/amp/promises/combinators) instead where possible.
`Amp\Deferred` is the abstraction responsible for resolving future values once they become available. A library that resolves values asynchronously creates an `Amp\Deferred` and uses it to return an `Amp\Promise` to API consumers. Once the async library determines that the value is ready it resolves the promise held by the API consumer using methods on the linked promisor.
```php
final class Deferred {
public function promise(): Promise;
public function resolve($result = null);
public function fail(Throwable $error);
}
```
#### `promise()`
Returns the corresponding `Promise` instance. `Deferred` and `Promise` are separated, so the consumer of the promise can't fulfill it. You should always return `$deferred->promise()` to API consumers. If you're passing `Deferred` objects around, you're probably doing something wrong.
#### `resolve()`
Resolves the promise with the first parameter as value, otherwise `null`. If an `Amp\Promise` is passed, the resolution will wait until the passed promise has been resolved. Invokes all registered `Promise::onResolve()` callbacks.
#### `fail()`
Makes the promise fail. Invokes all registered `Promise::onResolve()` callbacks with the passed `Throwable` as `$error` argument.
Here's a simple example of an async value producer `asyncMultiply()` creating a deferred and returning the associated promise to its API consumer.
```php
resolve($x * $y);
});
return $deferred->promise();
}
$promise = asyncMultiply(6, 7);
$result = Amp\Promise\wait($promise);
var_dump($result); // int(42)
```
docs/promises/combinators.md 0000644 00000005207 13755316542 0012216 0 ustar 00 ---
layout: docs
title: Promise Combinators
permalink: /promises/combinators
---
Multiple promises can be combined into a single promise using different functions.
## `all()`
`Amp\Promise\all()` combines an array of promise objects into a single promise that will resolve
when all promises in the group resolve. If any one of the `Amp\Promise` instances fails the
combinator's `Promise` will fail. Otherwise the resulting `Promise` succeeds with an array matching
keys from the input array to their resolved values.
The `all()` combinator is extremely powerful because it allows us to concurrently execute many
asynchronous operations at the same time. Let's look at a simple example using the Amp HTTP client
([Artax](https://github.com/amphp/artax)) to retrieve multiple HTTP resources concurrently:
```php
"http://www.google.com",
"news" => "http://news.google.com",
"bing" => "http://www.bing.com",
"yahoo" => "https://www.yahoo.com",
];
try {
// magic combinator sauce to flatten the promise
// array into a single promise.
// yielding an array is an implicit "yield Amp\Promise\all($array)".
$responses = yield array_map(function ($uri) use ($httpClient) {
return $httpClient->request($uri);
}, $uris);
foreach ($responses as $key => $response) {
printf(
"%s | HTTP/%s %d %s\n",
$key,
$response->getProtocolVersion(),
$response->getStatus(),
$response->getReason()
);
}
} catch (Amp\MultiReasonException $e) {
// If any one of the requests fails the combo will fail and
// be thrown back into our generator.
echo $e->getMessage(), "\n";
}
Loop::stop();
});
```
## `some()`
`Amp\Promise\some()` is the same as `all()` except that it tolerates individual failures. As long
as at least one promise in the passed array succeeds, the combined promise will succeed. The successful
resolution value is an array of the form `[$arrayOfErrors, $arrayOfValues]`. The individual keys
in the component arrays are preserved from the promise array passed to the functor for evaluation.
## `any()`
`Amp\Promise\any()` is the same as `some()` except that it tolerates all failures. It will succeed even if all promises failed.
## `first()`
`Amp\Promise\first()` resolves with the first successful result. The resulting promise will only fail if all
promises in the group fail or if the promise array is empty.
docs/promises/miscellaneous.md 0000644 00000002631 13755316542 0012537 0 ustar 00 ---
layout: docs
title: Promise Helpers
permalink: /promises/miscellaneous
---
Amp offers some small promise helpers, namely
* `Amp\Promise\rethrow()`
* `Amp\Promise\timeout()`
* `Amp\Promise\wait()`
## `rethrow()`
`rethrow(Amp\Promise|React\Promise\PromiseInterface): void` subscribes to the passed `Promise` and forwards all errors to the event loop. That handler can log these failures or the event loop will stop if no such handler exists.
`rethrow()` is useful whenever you want to fire and forget, but still care about any errors that happen.
## `timeout()`
`timeout(Amp\Promise|React\Promise\PromiseInterface, int $timeout): Amp\Promise` applies a timeout to the passed promise, either resolving with the original value or error reason in case the promise resolves within the timeout period, or otherwise fails the returned promise with an `Amp\TimeoutException`.
Note that `timeout()` does not cancel any operation or free any resources. If available, use dedicated API options instead, e.g. for socket connect timeouts.
## `wait()`
`wait(Amp\Promise|React\Promise\PromiseInterface): mixed` can be used to synchronously wait for a promise to resolve. It returns the result value or throws an exception in case of an error. `wait()` blocks and calls `Loop::run()` internally. It SHOULD NOT be used in fully asynchronous applications, but only when integrating async APIs into an otherwise synchronous application.
docs/utils/README.md 0000644 00000000505 13755316542 0010126 0 ustar 00 ---
layout: docs
title: Utils
permalink: /utils/
---
This documentation section deals with helpers that are not async specific, but generic helpers.
* [`CallableMaker`](./callable-maker.md)
* [`Struct`](./struct.md)
Further utils for PHPUnit are provided in [`amphp/phpunit-util`](https://github.com/amphp/phpunit-util).
docs/utils/callable-maker.md 0000644 00000001511 13755316542 0012023 0 ustar 00 ---
layout: docs
title: CallableMaker
permalink: /utils/callable-maker
---
`Amp\CallableMaker` is a helper trait that allows creating closures from private / protected static and instance methods in an easy way. Creating such callables might be necessary to register private / protected methods as callbacks in an efficient manner without making those methods public.
This trait should only be used in projects with a PHP 7.0 minimum requirement. If PHP 7.1 or later is the minimum requirement, `Closure::fromCallable` should be used directly.
## `callableFromInstanceMethod()`
Creates a `Closure` from an instance method with the given name and returns it. The closure can be passed around without worrying about the method's visibility.
## `callableFromStaticMethod()`
Same as `callableFromInstanceMethod()`, but for static methods.
docs/utils/struct.md 0000644 00000003662 13755316542 0010524 0 ustar 00 ---
layout: docs
title: Struct
permalink: /utils/struct
---
A struct is a generic computer science term for an object composed of public properties. The `\Amp\Struct` trait
is intended to make using public properties a little safer by throwing an `\Error` when undefined properties
are attempted to be written or read.
PHP allows for dynamic creation of public properties. This can lead to insidious bugs created by typos related to
writing to and reading from public properties. One common solution to this problem is to make all properties private and
provide public setter and getter methods which control access to the underlying properties. However effective this
solution may be, it requires that additional code be written and subsequently tested for the setter and getter methods.
Let's try some examples with anonymous classes to demonstrate the advantages of using the `\Amp\Struct` trait. Running
the following code will not raise an error, although the typo will likely cause some unexpected behavior:
```php
$obj = new class {
public $foo = null;
};
$obj->fooo = "bar";
```
If you were to access the `$foo` property of the `$obj` object after the above code, you might expect the value
to be `"bar"` when it would actually be `NULL`.
When a class uses the `\Amp\Struct` trait, an `\Error` will be thrown when attempting to access a property not defined
in the class definition. For example, the code below will throw an `\Error` with some context that attempts to help
diagnose the issue.
```php
$obj = new class {
use Amp\Struct;
public $foo = null;
};
$obj->fooo = "bar";
```
The message for the thrown `\Error` will be similar to:
*Uncaught Error: class@anonymous@php shell code0x10ee8005b property "fooo" does not exist ... did you mean "foo?"*
Although an `\Error` being thrown in your code may cause some havoc, it prevents the unpredictable
behavior caused by the use of properties which are not part of the class definition.
lib/CallableMaker.php 0000644 00000004634 13755316542 0010524 0 ustar 00 getMethod($method);
}
return self::$__reflectionMethods[$method]->getClosure($this);
}
/**
 * Wraps a protected or private static method in a closure, so it can be handed to code that requires a publicly
 * invokable callback.
 *
 * The reflection class and the reflected methods are cached in static properties, so a method is only looked up
 * the first time a closure is requested for it.
 *
 * @param string $method Static method name.
 *
 * @return callable
 */
private static function callableFromStaticMethod(string $method): callable {
    // Fast path: the method has been reflected before.
    if (isset(self::$__reflectionMethods[$method])) {
        return self::$__reflectionMethods[$method]->getClosure();
    }

    if (self::$__reflectionClass === null) {
        self::$__reflectionClass = new \ReflectionClass(self::class);
    }

    $reflectionMethod = self::$__reflectionClass->getMethod($method);
    self::$__reflectionMethods[$method] = $reflectionMethod;

    return $reflectionMethod->getClosure();
}
}
} else {
trait CallableMaker {
    /**
     * Returns a closure invoking the named method on this instance, regardless of the method's visibility.
     *
     * @deprecated Use \Closure::fromCallable() instead of this method in PHP 7.1.
     *
     * @param string $method Instance method name.
     *
     * @return callable
     */
    private function callableFromInstanceMethod(string $method): callable {
        $target = [$this, $method];

        return \Closure::fromCallable($target);
    }

    /**
     * Returns a closure invoking the named static method of the class using this trait, regardless of the
     * method's visibility.
     *
     * @deprecated Use \Closure::fromCallable() instead of this method in PHP 7.1.
     *
     * @param string $method Static method name.
     *
     * @return callable
     */
    private static function callableFromStaticMethod(string $method): callable {
        $target = [self::class, $method];

        return \Closure::fromCallable($target);
    }
}
} // @codeCoverageIgnoreEnd
lib/CancellationToken.php 0000644 00000002662 13755316542 0011441 0 ustar 00 getToken();
*
* $response = yield $httpClient->request("https://example.com/stream", $token);
* $responseBody = $response->getBody();
*
* while (($chunk = yield $response->read()) !== null) {
* // consume $chunk
*
* if ($noLongerInterested) {
* $cancellationTokenSource->cancel();
* break;
* }
* }
* ```
*
* @see CancellationToken
* @see CancelledException
*/
final class CancellationTokenSource {
/** @var CancellationToken Token returned by getToken(). */
private $token;
/** @var callable|null Closure that cancels the token; reset to null by the first cancel() call. */
private $onCancel;
public function __construct() {
// $this->onCancel is passed by reference: the token's constructor stores the closure that cancels the token in
// it, so the cancellation trigger is held by this source rather than exposed on the token.
$this->token = new class($this->onCancel) implements CancellationToken {
/** @var string Identifier handed out by the next subscribe() call. */
private $nextId = "a";
/** @var callable[] Subscribed callbacks, indexed by their subscription identifier. */
private $callbacks = [];
/** @var \Throwable|null Exception the token was cancelled with, null while not cancelled. */
private $exception = null;
/**
* @param callable|null $onCancel Reference that is assigned the closure cancelling this token.
*/
public function __construct(&$onCancel) {
$onCancel = function (\Throwable $exception) {
$this->exception = $exception;
// Empty the subscriber list before any callback is invoked.
$callbacks = $this->callbacks;
$this->callbacks = [];
foreach ($callbacks as $callback) {
$this->invokeCallback($callback);
}
};
}
/**
* Invokes a subscriber with the cancellation exception. A returned generator is run as a coroutine and a
* returned promise is passed to rethrow(). Anything thrown in the process is re-thrown from a callback
* deferred on the event loop instead of propagating to the caller.
*
* @param callable $callback
*/
private function invokeCallback($callback) {
// No type declaration to prevent exception outside the try!
try {
$result = $callback($this->exception);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
Loop::defer(static function () use ($exception) {
throw $exception;
});
}
}
/**
* Registers a callback to be invoked with the cancellation exception. If the token has already been
* cancelled, the callback is invoked immediately and not stored.
*
* @param callable $callback
*
* @return string Identifier that can be passed to unsubscribe().
*/
public function subscribe(callable $callback): string {
$id = $this->nextId++;
if ($this->exception) {
$this->invokeCallback($callback);
} else {
$this->callbacks[$id] = $callback;
}
return $id;
}
/**
* Removes a previously subscribed callback. Unknown identifiers are ignored.
*
* @param string $id Identifier returned by subscribe().
*/
public function unsubscribe(string $id) {
unset($this->callbacks[$id]);
}
/**
* @return bool True once the token has been cancelled.
*/
public function isRequested(): bool {
return isset($this->exception);
}
/**
* @throws \Throwable The exception the token was cancelled with, if it has been cancelled.
*/
public function throwIfRequested() {
if (isset($this->exception)) {
throw $this->exception;
}
}
};
}
/**
* @return CancellationToken The token controlled by this source; the same instance on every call.
*/
public function getToken(): CancellationToken {
return $this->token;
}
/**
* Cancels the token with a new CancelledException. Only the first call has an effect; later calls return
* without doing anything.
*
* @param \Throwable|null $previous Exception to be used as the previous exception to CancelledException.
*/
public function cancel(\Throwable $previous = null) {
if ($this->onCancel === null) {
return;
}
// Clear the property before invoking the closure so that cancel() cannot trigger the cancellation twice.
$onCancel = $this->onCancel;
$this->onCancel = null;
$onCancel(new CancelledException($previous));
}
}
lib/CancelledException.php 0000644 00000000520 13755316542 0011564 0 ustar 00 generator = $generator;
try {
$yielded = $this->generator->current();
if (!$yielded instanceof Promise) {
if (!$this->generator->valid()) {
$this->resolve($this->generator->getReturn());
return;
}
$yielded = $this->transform($yielded);
}
} catch (\Throwable $exception) {
$this->fail($exception);
return;
}
/**
* @param \Throwable|null $exception Exception to be thrown into the generator.
* @param mixed $value Value to be sent into the generator.
*/
$this->onResolve = function ($exception, $value) {
$this->exception = $exception;
$this->value = $value;
if (!$this->immediate) {
$this->immediate = true;
return;
}
try {
do {
if ($this->exception) {
// Throw exception at current execution point.
$yielded = $this->generator->throw($this->exception);
} else {
// Send the new value and execute to next yield statement.
$yielded = $this->generator->send($this->value);
}
if (!$yielded instanceof Promise) {
if (!$this->generator->valid()) {
$this->resolve($this->generator->getReturn());
$this->onResolve = null;
return;
}
$yielded = $this->transform($yielded);
}
$this->immediate = false;
$yielded->onResolve($this->onResolve);
} while ($this->immediate);
$this->immediate = true;
} catch (\Throwable $exception) {
$this->fail($exception);
$this->onResolve = null;
} finally {
$this->exception = null;
$this->value = null;
}
};
$yielded->onResolve($this->onResolve);
}
/**
 * Converts a non-promise value yielded from the generator into a promise.
 *
 * Arrays are combined with `Amp\Promise\all()` and React promises are adapted. Any other value, or a failed
 * conversion, results in an `Amp\Failure` holding an `Amp\InvalidYieldError`; an exception thrown during the
 * conversion is attached to that error as previous exception.
 *
 * @param mixed $yielded Non-promise yielded from generator.
 *
 * @return \Amp\Promise
 */
private function transform($yielded): Promise {
    $previous = null;

    try {
        if ($yielded instanceof ReactPromise) {
            return Promise\adapt($yielded);
        }

        if (\is_array($yielded)) {
            return Promise\all($yielded);
        }
    } catch (\Throwable $conversionError) {
        // Keep the conversion error so it can be reported as the cause below.
        $previous = $conversionError;
    }

    $message = \sprintf(
        "Unexpected yield; Expected an instance of %s or %s or an array of such instances",
        Promise::class,
        ReactPromise::class
    );

    return new Failure(new InvalidYieldError($this->generator, $message, $previous));
}
}
lib/Deferred.php 0000644 00000002447 13755316542 0007565 0 ustar 00 resolver = new class implements Promise {
use Internal\Placeholder {
resolve as public;
fail as public;
}
};
$this->promise = new Internal\PrivatePromise($this->resolver);
}
/**
* Returns the promise controlled by this deferred. The instance is created once in the constructor, so every call
* returns the same object.
*
* @return \Amp\Promise
*/
public function promise(): Promise {
return $this->promise;
}
/**
* Fulfill the promise with the given value. The call is forwarded to the internal resolver object.
*
* @param mixed $value
*
* @throws \Error If the promise has already been resolved.
*/
public function resolve($value = null) {
$this->resolver->resolve($value);
}
/**
* Fails the promise with the given reason. The call is forwarded to the internal resolver object.
*
* @param \Throwable $reason
*/
public function fail(\Throwable $reason) {
$this->resolver->fail($reason);
}
}
lib/Delayed.php 0000644 00000002103 13755316542 0007401 0 ustar 00 watcher = Loop::delay($time, function () use ($value) {
$this->resolve($value);
});
}
/**
* References the internal watcher in the event loop, keeping the loop running while this promise is pending.
* The watcher is the one registered with Loop::delay() in the constructor.
*/
public function reference() {
Loop::reference($this->watcher);
}
/**
* Unreferences the internal watcher in the event loop, allowing the loop to stop while this promise is pending if
* no other events are pending in the loop. The watcher is the one registered with Loop::delay() in the
* constructor.
*/
public function unreference() {
Loop::unreference($this->watcher);
}
}
lib/Emitter.php 0000644 00000003035 13755316542 0007450 0 ustar 00 emitter = new class implements Iterator {
use Internal\Producer {
emit as public;
complete as public;
fail as public;
}
};
$this->iterator = new Internal\PrivateIterator($this->emitter);
}
/**
* Returns the iterator fed by this emitter. The instance is created once in the constructor, so every call
* returns the same object.
*
* @return \Amp\Iterator
*/
public function iterate(): Iterator {
return $this->iterator;
}
/**
* Emits a value to the iterator. The call is forwarded to the internal emitter object.
*
* @param mixed $value
*
* @return \Amp\Promise
*
* @throws \Error If the iterator has completed.
*/
public function emit($value): Promise {
return $this->emitter->emit($value);
}
/**
* Completes the iterator. The call is forwarded to the internal emitter object.
*
* @throws \Error If the iterator has already been completed.
*/
public function complete() {
$this->emitter->complete();
}
/**
* Fails the iterator with the given reason. The call is forwarded to the internal emitter object.
*
* @param \Throwable $reason
*/
public function fail(\Throwable $reason) {
$this->emitter->fail($reason);
}
}
lib/Failure.php 0000644 00000002067 13755316542 0007432 0 ustar 00 exception = $exception;
}
/**
 * Invokes the callback right away, passing the failure reason as error argument and null as result.
 *
 * A generator returned by the callback is run as a coroutine, and a promise returned (directly or through that
 * coroutine) is passed to rethrow(). Anything thrown while doing so is re-thrown from a callback deferred on the
 * event loop rather than propagated to the caller.
 *
 * @param callable $onResolved
 */
public function onResolve(callable $onResolved) {
    try {
        $outcome = $onResolved($this->exception, null);

        if ($outcome !== null) {
            if ($outcome instanceof \Generator) {
                $outcome = new Coroutine($outcome);
            }

            if ($outcome instanceof Promise || $outcome instanceof ReactPromise) {
                Promise\rethrow($outcome);
            }
        }
    } catch (\Throwable $thrown) {
        Loop::defer(static function () use ($thrown) {
            throw $thrown;
        });
    }
}
}
lib/Internal/Placeholder.php 0000644 00000010234 13755316542 0012034 0 ustar 00 resolved) {
if ($this->result instanceof Promise) {
$this->result->onResolve($onResolved);
return;
}
try {
$result = $onResolved(null, $this->result);
if ($result === null) {
return;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise || $result instanceof ReactPromise) {
Promise\rethrow($result);
}
} catch (\Throwable $exception) {
Loop::defer(static function () use ($exception) {
throw $exception;
});
}
return;
}
if (null === $this->onResolved) {
$this->onResolved = $onResolved;
return;
}
if (!$this->onResolved instanceof ResolutionQueue) {
$this->onResolved = new ResolutionQueue($this->onResolved);
}
$this->onResolved->push($onResolved);
}
/**
* Resolves the placeholder with the given value and notifies the callbacks registered so far.
*
* A React promise is adapted to an Amp promise first. If the value is an Amp promise, the registered callbacks are
* subscribed to that promise instead of being invoked directly.
*
* @param mixed $value
*
* @throws \Error Thrown if the promise has already been resolved.
*/
private function resolve($value = null) {
if ($this->resolved) {
$message = "Promise has already been resolved";
if (isset($this->resolutionTrace)) {
$trace = formatStacktrace($this->resolutionTrace);
$message .= ". Previous resolution trace:\n\n{$trace}\n\n";
} else {
// @codeCoverageIgnoreStart
$message .= ", define environment variable AMP_DEBUG or const AMP_DEBUG = true and enable assertions "
. "for a stacktrace of the previous resolution.";
// @codeCoverageIgnoreEnd
}
throw new \Error($message);
}
// Only evaluated when assertions are enabled: if AMP_DEBUG is set (environment variable or constant), remember
// where this resolution happened so that a later double resolution can report it.
\assert((function () {
$env = \getenv("AMP_DEBUG") ?: "0";
if (($env !== "0" && $env !== "false") || (\defined("AMP_DEBUG") && \AMP_DEBUG)) {
$trace = \debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS);
\array_shift($trace); // remove current closure
$this->resolutionTrace = $trace;
}
return true;
})());
if ($value instanceof ReactPromise) {
$value = Promise\adapt($value);
}
$this->resolved = true;
$this->result = $value;
// Nothing has subscribed yet; later onResolve() calls see the stored result.
if ($this->onResolved === null) {
return;
}
// Release the stored callback(s) before notifying them.
$onResolved = $this->onResolved;
$this->onResolved = null;
if ($this->result instanceof Promise) {
$this->result->onResolve($onResolved);
return;
}
// Run returned generators as coroutines and pass returned promises to rethrow(); anything thrown is re-thrown
// from a callback deferred on the event loop.
try {
$result = $onResolved(null, $this->result);
if ($result === null) {
return;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise || $result instanceof ReactPromise) {
Promise\rethrow($result);
}
} catch (\Throwable $exception) {
Loop::defer(static function () use ($exception) {
throw $exception;
});
}
}
/**
* Fails the placeholder by resolving it with an Amp\Failure wrapping the given reason.
*
* @param \Throwable $reason Failure reason.
*
* @throws \Error Thrown if the promise has already been resolved.
*/
private function fail(\Throwable $reason) {
$this->resolve(new Failure($reason));
}
}
lib/Internal/PrivateIterator.php 0000644 00000001133 13755316542 0012734 0 ustar 00 iterator = $iterator;
}
/**
* Forwards to advance() of the wrapped iterator.
*
* @return \Amp\Promise
*/
public function advance(): Promise {
return $this->iterator->advance();
}
/**
* Forwards to getCurrent() of the wrapped iterator.
*
* @return mixed
*/
public function getCurrent() {
return $this->iterator->getCurrent();
}
}
lib/Internal/PrivatePromise.php 0000644 00000000775 13755316542 0012574 0 ustar 00 promise = $promise;
}
/**
* Forwards the callback to onResolve() of the wrapped promise.
*
* @param callable $onResolved
*/
public function onResolve(callable $onResolved) {
$this->promise->onResolve($onResolved);
}
}
lib/Internal/Producer.php 0000644 00000012325 13755316542 0011400 0 ustar 00 waiting !== null) {
throw new \Error("The prior promise returned must resolve before invoking this method again");
}
if (isset($this->backPressure[$this->position])) {
$future = $this->backPressure[$this->position];
unset($this->values[$this->position], $this->backPressure[$this->position]);
$future->resolve();
}
++$this->position;
if (\array_key_exists($this->position, $this->values)) {
return new Success(true);
}
if ($this->complete) {
return $this->complete;
}
$this->waiting = new Deferred;
return $this->waiting->promise();
}
/**
 * Returns the value at the current position of the iterator.
 *
 * @return mixed
 *
 * @throws \Error If the iterator has completed and holds no values, or if no value is available at the current
 *     position yet.
 */
public function getCurrent() {
    $isExhausted = $this->complete && empty($this->values);

    if ($isExhausted) {
        throw new \Error("The iterator has completed");
    }

    $cursor = $this->position;

    if (\array_key_exists($cursor, $this->values)) {
        return $this->values[$cursor];
    }

    throw new \Error("Promise returned from advance() must resolve before calling this method");
}
/**
* Emits a value from the iterator.
*
* Plain values are queued together with a back-pressure deferred, whose promise is returned; advance() resolves
* that deferred (without a value) when the consumer moves on from the value. If the value is a promise (React
* promises are adapted first), its result is emitted once it resolves and the returned promise follows the promise
* of that inner emit; if it fails, the iterator and the returned promise fail with the same reason.
*
* @param mixed $value
*
* @return \Amp\Promise
*
* @throws \Error If the iterator has completed.
*/
private function emit($value): Promise {
if ($this->complete) {
throw new \Error("Iterators cannot emit values after calling complete");
}
if ($value instanceof ReactPromise) {
$value = Promise\adapt($value);
}
if ($value instanceof Promise) {
$deferred = new Deferred;
$value->onResolve(function ($e, $v) use ($deferred) {
// The iterator may have been completed or failed while the promise was pending.
if ($this->complete) {
$deferred->fail(
new \Error("The iterator was completed before the promise result could be emitted")
);
return;
}
if ($e) {
$this->fail($e);
$deferred->fail($e);
return;
}
// Emit the resolved value and chain the returned promise to that emit.
$deferred->resolve($this->emit($v));
});
return $deferred->promise();
}
// Values and their back-pressure deferreds are appended in lockstep.
$this->values[] = $value;
$this->backPressure[] = $pressure = new Deferred;
// Wake up a consumer that is waiting on advance(), if any.
if ($this->waiting !== null) {
$waiting = $this->waiting;
$this->waiting = null;
$waiting->resolve(true);
}
return $pressure->promise();
}
/**
* Completes the iterator.
*
* Marks the iterator as complete with a Success(false) promise and resolves a pending advance() promise, if any,
* with it. When assertions are enabled and AMP_DEBUG is set (environment variable other than "0"/"false", or a
* truthy constant), the backtrace of this call is recorded so that a second completion attempt can report where
* the first one happened.
*
* @throws \Error If the iterator has already been completed.
*/
private function complete() {
if ($this->complete) {
$message = "Iterator has already been completed";
if (isset($this->resolutionTrace)) {
// A trace was recorded by the first completion; include it in the error message.
$trace = formatStacktrace($this->resolutionTrace);
$message .= ". Previous completion trace:\n\n{$trace}\n\n";
} else {
// @codeCoverageIgnoreStart
$message .= ", define environment variable AMP_DEBUG or const AMP_DEBUG = true and enable assertions "
. "for a stacktrace of the previous resolution.";
// @codeCoverageIgnoreEnd
}
throw new \Error($message);
}
// Wrapped in assert() so the trace is only collected when assertions are evaluated; the closure always
// returns true, so the assertion itself never fails.
\assert((function () {
$env = \getenv("AMP_DEBUG") ?: "0";
if (($env !== "0" && $env !== "false") || (\defined("AMP_DEBUG") && \AMP_DEBUG)) {
$trace = \debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS);
\array_shift($trace); // remove current closure
$this->resolutionTrace = $trace;
}
return true;
})());
$this->complete = new Success(false);
if ($this->waiting !== null) {
// Clear the waiting Deferred before resolving it with the completion promise.
$waiting = $this->waiting;
$this->waiting = null;
$waiting->resolve($this->complete);
}
}
/**
 * Marks the iterator as complete with a Failure for the given exception and, if an advance() promise is
 * pending, resolves it with that Failure.
 *
 * @param \Throwable $exception Failure reason.
 */
private function fail(\Throwable $exception) {
    $this->complete = new Failure($exception);

    if ($this->waiting === null) {
        return;
    }

    // Clear the pending Deferred before resolving it.
    $pending = $this->waiting;
    $this->waiting = null;
    $pending->resolve($this->complete);
}
}
lib/Internal/ResolutionQueue.php 0000644 00000003463 13755316542 0012770 0 ustar 00 push($callback);
}
}
/**
 * Appends a callback to the queue.
 *
 * Instances of this class are unrolled: their callbacks are merged into this queue instead of nesting the
 * queue object, to avoid blowing up the call stack on resolution.
 *
 * @param callable $callback
 */
public function push(callable $callback) {
    if (!($callback instanceof self)) {
        $this->queue[] = $callback;
        return;
    }

    $this->queue = \array_merge($this->queue, $callback->queue);
}
/**
 * Calls each callback in the queue, passing the provided values to the function.
 *
 * A Generator returned by a callback is wrapped in a Coroutine, and a returned promise (Amp or React) is passed
 * to Promise\rethrow(). An exception thrown by a callback is rethrown from a deferred loop callback, so the
 * remaining callbacks are still invoked with the original arguments.
 *
 * @param \Throwable|null $exception
 * @param mixed $value
 */
public function __invoke($exception, $value) {
    foreach ($this->queue as $callback) {
        try {
            $result = $callback($exception, $value);

            if ($result === null) {
                continue;
            }

            if ($result instanceof \Generator) {
                $result = new Coroutine($result);
            }

            if ($result instanceof Promise || $result instanceof ReactPromise) {
                Promise\rethrow($result);
            }
        } catch (\Throwable $thrown) {
            // Catch into a separate variable: catching into $exception would overwrite the argument, and every
            // later callback in the queue would then receive this callback's error instead of the original
            // resolution result.
            Loop::defer(static function () use ($thrown) {
                throw $thrown;
            });
        }
    }
}
}
lib/Internal/functions.php 0000644 00000002430 13755316542 0011621 0 ustar 00 current();
$prefix .= \sprintf(
"; %s yielded at key %s",
\is_object($yielded) ? \get_class($yielded) : \gettype($yielded),
\var_export($generator->key(), true)
);
if (!$generator->valid()) {
parent::__construct($prefix, 0, $previous);
return;
}
$reflGen = new \ReflectionGenerator($generator);
$exeGen = $reflGen->getExecutingGenerator();
if ($isSubgenerator = ($exeGen !== $generator)) {
$reflGen = new \ReflectionGenerator($exeGen);
}
parent::__construct(\sprintf(
"%s on line %s in %s",
$prefix,
$reflGen->getExecutingLine(),
$reflGen->getExecutingFile()
), 0, $previous);
}
}
lib/Iterator.php 0000644 00000001757 13755316542 0007641 0 ustar 00
*
* @throws \Error If the prior promise returned from this method has not resolved.
* @throws \Throwable The exception used to fail the iterator.
*/
public function advance(): Promise;
/**
* Gets the last emitted value or throws an exception if the iterator has completed.
*
* Call this only after the promise returned from advance() has resolved.
*
* @return mixed Value emitted from the iterator.
*
* @throws \Error If the iterator has completed or advance() was not called before calling this method.
* @throws \Throwable The exception used to fail the iterator.
*/
public function getCurrent();
}
lib/LazyPromise.php 0000644 00000002123 13755316542 0010312 0 ustar 00 promisor = $promisor;
}
/**
 * {@inheritdoc}
 *
 * The promisor is invoked on the first call only; the promise it produces is kept and reused afterwards.
 */
public function onResolve(callable $onResolved) {
    $promise = $this->promise;

    if ($promise === null) {
        // Drop the stored promisor before invoking it, then remember the resulting promise.
        $factory = $this->promisor;
        $this->promisor = null;
        $promise = $this->promise = call($factory);
    }

    $promise->onResolve($onResolved);
}
}
lib/Loop.php 0000644 00000041055 13755316542 0006754 0 ustar 00 defer($callback);
}
self::$driver->run();
}
/**
* Stop the event loop.
*
* When an event loop is stopped, it continues with its current tick and exits the loop afterwards. Multiple calls
* to stop MUST be ignored and MUST NOT raise an exception.
*
* @return void
*/
public static function stop() {
    // Forward the request to the driver currently in scope.
    $driver = self::$driver;
    $driver->stop();
}
/**
* Defer the execution of a callback.
*
* The deferred callable MUST be executed before any other type of watcher in a tick. Order of enabling MUST be
* preserved when executing the callbacks.
*
* The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled.
*
* @param callable(string $watcherId, mixed $data) $callback The callback to defer. The `$watcherId` will be
* invalidated before the callback call.
* @param mixed $data Arbitrary data given to the callback function as the `$data` parameter.
*
* @return string An unique identifier that can be used to cancel, enable or disable the watcher.
*/
public static function defer(callable $callback, $data = null): string {
    // The driver creates the watcher; its identifier is passed straight back.
    $watcherId = self::$driver->defer($callback, $data);

    return $watcherId;
}
/**
* Delay the execution of a callback.
*
* The delay is a minimum and approximate, accuracy is not guaranteed. Order of calls MUST be determined by which
* timers expire first, but timers with the same expiration time MAY be executed in any order.
*
* The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled.
*
* @param int $delay The amount of time, in milliseconds, to delay the execution for.
* @param callable(string $watcherId, mixed $data) $callback The callback to delay. The `$watcherId` will be
* invalidated before the callback call.
* @param mixed $data Arbitrary data given to the callback function as the `$data` parameter.
*
* @return string An unique identifier that can be used to cancel, enable or disable the watcher.
*/
public static function delay(int $delay, callable $callback, $data = null): string {
    // The driver creates the timer watcher; its identifier is passed straight back.
    $watcherId = self::$driver->delay($delay, $callback, $data);

    return $watcherId;
}
/**
* Repeatedly execute a callback.
*
* The interval between executions is a minimum and approximate, accuracy is not guaranteed. Order of calls MUST be
* determined by which timers expire first, but timers with the same expiration time MAY be executed in any order.
* The first execution is scheduled after the first interval period.
*
* The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled.
*
* @param int $interval The time interval, in milliseconds, to wait between executions.
* @param callable(string $watcherId, mixed $data) $callback The callback to repeat.
* @param mixed $data Arbitrary data given to the callback function as the `$data` parameter.
*
* @return string An unique identifier that can be used to cancel, enable or disable the watcher.
*/
public static function repeat(int $interval, callable $callback, $data = null): string {
    // The driver creates the repeating watcher; its identifier is passed straight back.
    $watcherId = self::$driver->repeat($interval, $callback, $data);

    return $watcherId;
}
/**
* Execute a callback when a stream resource becomes readable or is closed for reading.
*
* Warning: Closing resources locally, e.g. with `fclose`, might not invoke the callback. Be sure to `cancel` the
* watcher when closing the resource locally. Drivers MAY choose to notify the user if there are watchers on invalid
* resources, but are not required to, due to the high performance impact. Watchers on closed resources are
* therefore undefined behavior.
*
* Multiple watchers on the same stream MAY be executed in any order.
*
* The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled.
*
* @param resource $stream The stream to monitor.
* @param callable(string $watcherId, resource $stream, mixed $data) $callback The callback to execute.
* @param mixed $data Arbitrary data given to the callback function as the `$data` parameter.
*
* @return string An unique identifier that can be used to cancel, enable or disable the watcher.
*/
public static function onReadable($stream, callable $callback, $data = null): string {
    // The driver registers the readability watcher; its identifier is passed straight back.
    $watcherId = self::$driver->onReadable($stream, $callback, $data);

    return $watcherId;
}
/**
* Execute a callback when a stream resource becomes writable or is closed for writing.
*
* Warning: Closing resources locally, e.g. with `fclose`, might not invoke the callback. Be sure to `cancel` the
* watcher when closing the resource locally. Drivers MAY choose to notify the user if there are watchers on invalid
* resources, but are not required to, due to the high performance impact. Watchers on closed resources are
* therefore undefined behavior.
*
* Multiple watchers on the same stream MAY be executed in any order.
*
* The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled.
*
* @param resource $stream The stream to monitor.
* @param callable(string $watcherId, resource $stream, mixed $data) $callback The callback to execute.
* @param mixed $data Arbitrary data given to the callback function as the `$data` parameter.
*
* @return string An unique identifier that can be used to cancel, enable or disable the watcher.
*/
public static function onWritable($stream, callable $callback, $data = null): string {
    // The driver registers the writability watcher; its identifier is passed straight back.
    $watcherId = self::$driver->onWritable($stream, $callback, $data);

    return $watcherId;
}
/**
* Execute a callback when a signal is received.
*
* Warning: Installing the same signal on different instances of this interface is deemed undefined behavior.
* Implementations MAY try to detect this, if possible, but are not required to. This is due to technical
* limitations of the signals being registered globally per process.
*
* Multiple watchers on the same signal MAY be executed in any order.
*
* The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled.
*
* @param int $signo The signal number to monitor.
* @param callable(string $watcherId, int $signo, mixed $data) $callback The callback to execute.
* @param mixed $data Arbitrary data given to the callback function as the $data parameter.
*
* @return string An unique identifier that can be used to cancel, enable or disable the watcher.
*
* @throws UnsupportedFeatureException If signal handling is not supported.
*/
public static function onSignal(int $signo, callable $callback, $data = null): string {
    // The driver registers the signal watcher; its identifier is passed straight back.
    $watcherId = self::$driver->onSignal($signo, $callback, $data);

    return $watcherId;
}
/**
* Enable a watcher to be active starting in the next tick.
*
* Watchers MUST immediately be marked as enabled, but only be activated (i.e. callbacks can be called) right before
* the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled.
*
* @param string $watcherId The watcher identifier.
*
* @return void
*
* @throws InvalidWatcherError If the watcher identifier is invalid.
*/
public static function enable(string $watcherId) {
    // Forward to the driver currently in scope; its errors propagate to the caller.
    $driver = self::$driver;
    $driver->enable($watcherId);
}
/**
* Disable a watcher immediately.
*
* A watcher MUST be disabled immediately, e.g. if a defer watcher disables a later defer watcher, the second defer
* watcher isn't executed in this tick.
*
* Disabling a watcher MUST NOT invalidate the watcher. Calling this function MUST NOT fail, even if passed an
* invalid watcher.
*
* @param string $watcherId The watcher identifier.
*
* @return void
*/
public static function disable(string $watcherId) {
    // Prior to PHP 7.2, self::$driver may be unset during destruct; the call is skipped in that case.
    // See https://github.com/amphp/amp/issues/212.
    if (\PHP_VERSION_ID >= 70200 || isset(self::$driver)) {
        self::$driver->disable($watcherId);
    }
}
/**
 * Cancel a watcher.
 *
 * This will detach the event loop from all resources that are associated to the watcher. After this operation the
 * watcher is permanently invalid. Calling this function MUST NOT fail, even if passed an invalid watcher.
 *
 * @param string $watcherId The watcher identifier.
 *
 * @return void
 */
public static function cancel(string $watcherId) {
    // Prior to PHP 7.2, self::$driver may be unset during destruct; the call is skipped in that case.
    // See https://github.com/amphp/amp/issues/212.
    if (\PHP_VERSION_ID >= 70200 || isset(self::$driver)) {
        self::$driver->cancel($watcherId);
    }
}
/**
* Reference a watcher.
*
* This will keep the event loop alive whilst the watcher is still being monitored. Watchers have this state by
* default.
*
* @param string $watcherId The watcher identifier.
*
* @return void
*
* @throws InvalidWatcherError If the watcher identifier is invalid.
*/
public static function reference(string $watcherId) {
    // Forward to the driver currently in scope; its errors propagate to the caller.
    $driver = self::$driver;
    $driver->reference($watcherId);
}
/**
* Unreference a watcher.
*
* The event loop should exit the run method when only unreferenced watchers are still being monitored. Watchers
* are all referenced by default.
*
* @param string $watcherId The watcher identifier.
*
* @return void
*/
public static function unreference(string $watcherId) {
    // Prior to PHP 7.2, self::$driver may be unset during destruct; the call is skipped in that case.
    // See https://github.com/amphp/amp/issues/212.
    if (\PHP_VERSION_ID >= 70200 || isset(self::$driver)) {
        self::$driver->unreference($watcherId);
    }
}
/**
* Stores information in the loop bound registry.
*
* Stored information is package private. Packages MUST NOT retrieve the stored state of other packages. Packages
* MUST use their namespace as prefix for keys. They may do so by using `SomeClass::class` as key.
*
* If packages want to expose loop bound state to consumers other than the package, they SHOULD provide a dedicated
* interface for that purpose instead of sharing the storage key.
*
* @param string $key The namespaced storage key.
* @param mixed $value The value to be stored.
*
* @return void
*/
public static function setState(string $key, $value) {
    // The registry lives on the driver, so the value is bound to the driver currently in scope.
    $driver = self::$driver;
    $driver->setState($key, $value);
}
/**
* Gets information stored bound to the loop.
*
* Stored information is package private. Packages MUST NOT retrieve the stored state of other packages. Packages
* MUST use their namespace as prefix for keys. They may do so by using `SomeClass::class` as key.
*
* If packages want to expose loop bound state to consumers other than the package, they SHOULD provide a dedicated
* interface for that purpose instead of sharing the storage key.
*
* @param string $key The namespaced storage key.
*
* @return mixed The previously stored value or `null` if it doesn't exist.
*/
public static function getState(string $key) {
    // Read from the registry of the driver currently in scope.
    $state = self::$driver->getState($key);

    return $state;
}
/**
* Set a callback to be executed when an error occurs.
*
* The callback receives the error as the first and only parameter. The return value of the callback gets ignored.
* If it can't handle the error, it MUST throw the error. Errors thrown by the callback or during its invocation
* MUST be thrown into the `run` loop and stop the driver.
*
* Subsequent calls to this method will overwrite the previous handler.
*
* @param callable(\Throwable $error)|null $callback The callback to execute. `null` will clear the
* current handler.
*
* @return callable(\Throwable $error)|null The previous handler, `null` if there was none.
*/
public static function setErrorHandler(callable $callback = null) {
    // The driver swaps the handler; whatever it returns is handed back to the caller.
    $previous = self::$driver->setErrorHandler($callback);

    return $previous;
}
/**
* Retrieve an associative array of information about the event loop driver.
*
* The returned array MUST contain the following data describing the driver's currently registered watchers:
*
* [
* "defer" => ["enabled" => int, "disabled" => int],
* "delay" => ["enabled" => int, "disabled" => int],
* "repeat" => ["enabled" => int, "disabled" => int],
* "on_readable" => ["enabled" => int, "disabled" => int],
* "on_writable" => ["enabled" => int, "disabled" => int],
* "on_signal" => ["enabled" => int, "disabled" => int],
* "enabled_watchers" => ["referenced" => int, "unreferenced" => int],
* "running" => bool
* ];
*
* Implementations MAY optionally add more information in the array but at minimum the above `key => value` format
* MUST always be provided.
*
* @return array Statistics about the loop in the described format.
*/
public static function getInfo(): array {
    // Statistics are produced by the driver currently in scope.
    $info = self::$driver->getInfo();

    return $info;
}
/**
* Retrieve the event loop driver that is in scope.
*
* @return Driver
*/
public static function get(): Driver {
    $driver = self::$driver;

    return $driver;
}
}
// Default driver setup: as soon as this file is loaded, the driver created by DriverFactory is installed as the
// loop driver. Don't move this to a file loaded by the composer "files" autoload mechanism, otherwise custom
// implementations might have issues setting a default loop, because it's overridden by us then.
// @codeCoverageIgnoreStart
Loop::set((new DriverFactory)->create());
// @codeCoverageIgnoreEnd
lib/Loop/Driver.php 0000644 00000060743 13755316542 0010214 0 ustar 00 running = true;
try {
while ($this->running) {
if ($this->isEmpty()) {
return;
}
$this->tick();
}
} finally {
$this->stop();
}
}
/**
 * Checks whether anything is left that should keep the loop running.
 *
 * @return bool True if no enabled and referenced watchers remain in the loop.
 */
private function isEmpty() {
    foreach ($this->watchers as $candidate) {
        if (!$candidate->enabled || !$candidate->referenced) {
            continue;
        }

        return false; // At least one enabled, referenced watcher remains.
    }

    return true;
}
/**
* Executes a single tick of the event loop.
*
* A tick moves the watchers queued for the next tick into the defer queue, activates the watchers waiting in the
* enable queue, runs every queued defer callback, and finally calls dispatch() for the remaining event types.
*/
private function tick() {
// Promote defer watchers queued since the previous tick, keeping any that are still pending.
if (empty($this->deferQueue)) {
$this->deferQueue = $this->nextTickQueue;
} else {
$this->deferQueue = \array_merge($this->deferQueue, $this->nextTickQueue);
}
$this->nextTickQueue = [];
// Hand the watchers enabled since the previous tick to the concrete driver.
$this->activate($this->enableQueue);
$this->enableQueue = [];
foreach ($this->deferQueue as $watcher) {
// Entries removed from the queue while this loop runs (see disable()) are skipped.
if (!isset($this->deferQueue[$watcher->id])) {
continue; // Watcher disabled by another defer watcher.
}
// Defer watchers run once: the identifier is invalidated before the callback is called.
unset($this->watchers[$watcher->id], $this->deferQueue[$watcher->id]);
try {
$result = ($watcher->callback)($watcher->id, $watcher->data);
if ($result === null) {
continue;
}
// A returned generator is run as a coroutine.
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
// A returned promise (Amp or React) is passed to rethrow().
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
// Exceptions thrown by the callback are handed to $this->error().
$this->error($exception);
}
}
// The blocking flag is true only if nothing is queued for the next tick or for enabling, the loop is still
// running, and at least one enabled, referenced watcher remains.
$this->dispatch(empty($this->nextTickQueue) && empty($this->enableQueue) && $this->running && !$this->isEmpty());
}
/**
* Activates (enables) all the given watchers.
*
* Called by tick() with the contents of the enable queue before any defer callback of that tick is run.
*
* @param \Amp\Loop\Watcher[] $watchers
*/
abstract protected function activate(array $watchers);
/**
* Dispatches any pending read/write, timer, and signal events.
*
* Called by tick() after the defer callbacks have run.
*
* @param bool $blocking tick() passes true only when nothing is queued for the next tick or for enabling, the
* loop is still running, and enabled, referenced watchers remain. Presumably this tells the implementation that
* it may wait for events - confirm against the concrete drivers.
*/
abstract protected function dispatch(bool $blocking);
/**
* Stop the event loop.
*
* When an event loop is stopped, it continues with its current tick and exits the loop afterwards. Multiple calls
* to stop MUST be ignored and MUST NOT raise an exception.
*
* This implementation only clears the running flag, so repeated calls have no further effect.
*
* @return void
*/
public function stop() {
// run() checks this flag before starting each tick.
$this->running = false;
}
/**
* Defer the execution of a callback.
*
* The deferred callable MUST be executed before any other type of watcher in a tick. Order of enabling MUST be
* preserved when executing the callbacks.
*
* The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled.
*
* @param callable (string $watcherId, mixed $data) $callback The callback to defer. The `$watcherId` will be
* invalidated before the callback call.
* @param mixed $data Arbitrary data given to the callback function as the `$data` parameter.
*
* @return string An unique identifier that can be used to cancel, enable or disable the watcher.
*/
public function defer(callable $callback, $data = null): string {
    $watcher = new Watcher;
    $id = $this->nextId++;

    $watcher->type = Watcher::DEFER;
    $watcher->id = $id;
    $watcher->callback = $callback;
    $watcher->data = $data;

    // Defer watchers go straight to the next-tick queue instead of the enable queue.
    $this->watchers[$id] = $watcher;
    $this->nextTickQueue[$id] = $watcher;

    return $id;
}
/**
* Delay the execution of a callback.
*
* The delay is a minimum and approximate, accuracy is not guaranteed. Order of calls MUST be determined by which
* timers expire first, but timers with the same expiration time MAY be executed in any order.
*
* The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled.
*
* @param int $delay The amount of time, in milliseconds, to delay the execution for.
* @param callable (string $watcherId, mixed $data) $callback The callback to delay. The `$watcherId` will be
* invalidated before the callback call.
* @param mixed $data Arbitrary data given to the callback function as the `$data` parameter.
*
* @return string An unique identifier that can be used to cancel, enable or disable the watcher.
*/
public function delay(int $delay, callable $callback, $data = null): string {
    if ($delay < 0) {
        throw new \Error("Delay must be greater than or equal to zero");
    }

    $watcher = new Watcher;
    $id = $this->nextId++;

    $watcher->type = Watcher::DELAY;
    $watcher->id = $id;
    $watcher->callback = $callback;
    $watcher->value = $delay; // Delay in milliseconds.
    $watcher->data = $data;

    // Registered now, activated by the driver at the start of the next tick.
    $this->watchers[$id] = $watcher;
    $this->enableQueue[$id] = $watcher;

    return $id;
}
/**
* Repeatedly execute a callback.
*
* The interval between executions is a minimum and approximate, accuracy is not guaranteed. Order of calls MUST be
* determined by which timers expire first, but timers with the same expiration time MAY be executed in any order.
* The first execution is scheduled after the first interval period.
*
* The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled.
*
* @param int $interval The time interval, in milliseconds, to wait between executions.
* @param callable (string $watcherId, mixed $data) $callback The callback to repeat.
* @param mixed $data Arbitrary data given to the callback function as the `$data` parameter.
*
* @return string An unique identifier that can be used to cancel, enable or disable the watcher.
*/
public function repeat(int $interval, callable $callback, $data = null): string {
    if ($interval < 0) {
        throw new \Error("Interval must be greater than or equal to zero");
    }

    $watcher = new Watcher;
    $id = $this->nextId++;

    $watcher->type = Watcher::REPEAT;
    $watcher->id = $id;
    $watcher->callback = $callback;
    $watcher->value = $interval; // Interval in milliseconds.
    $watcher->data = $data;

    // Registered now, activated by the driver at the start of the next tick.
    $this->watchers[$id] = $watcher;
    $this->enableQueue[$id] = $watcher;

    return $id;
}
/**
* Execute a callback when a stream resource becomes readable or is closed for reading.
*
* Warning: Closing resources locally, e.g. with `fclose`, might not invoke the callback. Be sure to `cancel` the
* watcher when closing the resource locally. Drivers MAY choose to notify the user if there are watchers on invalid
* resources, but are not required to, due to the high performance impact. Watchers on closed resources are
* therefore undefined behavior.
*
* Multiple watchers on the same stream MAY be executed in any order.
*
* The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled.
*
* @param resource $stream The stream to monitor.
* @param callable (string $watcherId, resource $stream, mixed $data) $callback The callback to execute.
* @param mixed $data Arbitrary data given to the callback function as the `$data` parameter.
*
* @return string An unique identifier that can be used to cancel, enable or disable the watcher.
*/
public function onReadable($stream, callable $callback, $data = null): string {
    $watcher = new Watcher;
    $id = $this->nextId++;

    $watcher->type = Watcher::READABLE;
    $watcher->id = $id;
    $watcher->callback = $callback;
    $watcher->value = $stream; // The monitored stream.
    $watcher->data = $data;

    // Registered now, activated by the driver at the start of the next tick.
    $this->watchers[$id] = $watcher;
    $this->enableQueue[$id] = $watcher;

    return $id;
}
/**
* Execute a callback when a stream resource becomes writable or is closed for writing.
*
* Warning: Closing resources locally, e.g. with `fclose`, might not invoke the callback. Be sure to `cancel` the
* watcher when closing the resource locally. Drivers MAY choose to notify the user if there are watchers on invalid
* resources, but are not required to, due to the high performance impact. Watchers on closed resources are
* therefore undefined behavior.
*
* Multiple watchers on the same stream MAY be executed in any order.
*
* The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled.
*
* @param resource $stream The stream to monitor.
* @param callable (string $watcherId, resource $stream, mixed $data) $callback The callback to execute.
* @param mixed $data Arbitrary data given to the callback function as the `$data` parameter.
*
* @return string An unique identifier that can be used to cancel, enable or disable the watcher.
*/
public function onWritable($stream, callable $callback, $data = null): string {
    $watcher = new Watcher;
    $id = $this->nextId++;

    $watcher->type = Watcher::WRITABLE;
    $watcher->id = $id;
    $watcher->callback = $callback;
    $watcher->value = $stream; // The monitored stream.
    $watcher->data = $data;

    // Registered now, activated by the driver at the start of the next tick.
    $this->watchers[$id] = $watcher;
    $this->enableQueue[$id] = $watcher;

    return $id;
}
/**
* Execute a callback when a signal is received.
*
* Warning: Installing the same signal on different instances of this interface is deemed undefined behavior.
* Implementations MAY try to detect this, if possible, but are not required to. This is due to technical
* limitations of the signals being registered globally per process.
*
* Multiple watchers on the same signal MAY be executed in any order.
*
* The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled.
*
* @param int $signo The signal number to monitor.
* @param callable (string $watcherId, int $signo, mixed $data) $callback The callback to execute.
* @param mixed $data Arbitrary data given to the callback function as the $data parameter.
*
* @return string An unique identifier that can be used to cancel, enable or disable the watcher.
*
* @throws UnsupportedFeatureException If signal handling is not supported.
*/
public function onSignal(int $signo, callable $callback, $data = null): string {
    $watcher = new Watcher;
    $id = $this->nextId++;

    $watcher->type = Watcher::SIGNAL;
    $watcher->id = $id;
    $watcher->callback = $callback;
    $watcher->value = $signo; // The monitored signal number.
    $watcher->data = $data;

    // Registered now, activated by the driver at the start of the next tick.
    $this->watchers[$id] = $watcher;
    $this->enableQueue[$id] = $watcher;

    return $id;
}
/**
* Enable a watcher to be active starting in the next tick.
*
* Watchers MUST immediately be marked as enabled, but only be activated (i.e. callbacks can be called) right before
* the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled.
*
* @param string $watcherId The watcher identifier.
*
* @return void
*
* @throws InvalidWatcherError If the watcher identifier is invalid.
*/
public function enable(string $watcherId) {
    $watcher = $this->watchers[$watcherId] ?? null;

    if ($watcher === null) {
        throw new InvalidWatcherError($watcherId, "Cannot enable an invalid watcher identifier: '{$watcherId}'");
    }

    if ($watcher->enabled) {
        return; // Watcher already enabled.
    }

    $watcher->enabled = true;

    // Defer watchers are queued for the next tick; every other type waits in the enable queue.
    // The loose comparison is intentional: it is the matching rule a switch statement applies.
    if ($watcher->type == Watcher::DEFER) {
        $this->nextTickQueue[$watcher->id] = $watcher;
    } else {
        $this->enableQueue[$watcher->id] = $watcher;
    }
}
/**
* Cancel a watcher.
*
* This will detach the event loop from all resources that are associated to the watcher. After this operation the
* watcher is permanently invalid. Calling this function MUST NOT fail, even if passed an invalid watcher.
*
* @param string $watcherId The watcher identifier.
*
* @return void
*/
public function cancel(string $watcherId) {
// Disable first so the watcher is taken out of its queue (or deactivated) while it is still registered;
// disable() returns early for unknown identifiers.
$this->disable($watcherId);
// Removing the registry entry is what makes the identifier invalid from here on.
unset($this->watchers[$watcherId]);
}
/**
* Disable a watcher immediately.
*
* A watcher MUST be disabled immediately, e.g. if a defer watcher disables a later defer watcher, the second defer
* watcher isn't executed in this tick.
*
* Disabling a watcher MUST NOT invalidate the watcher. Calling this function MUST NOT fail, even if passed an
* invalid watcher.
*
* @param string $watcherId The watcher identifier.
*
* @return void
*/
public function disable(string $watcherId) {
    $watcher = $this->watchers[$watcherId] ?? null;

    if ($watcher === null) {
        return; // Unknown identifiers are ignored.
    }

    if (!$watcher->enabled) {
        return; // Watcher already disabled.
    }

    $watcher->enabled = false;
    $id = $watcher->id;

    // The loose comparison is intentional: it is the matching rule a switch statement applies.
    if ($watcher->type == Watcher::DEFER) {
        if (isset($this->nextTickQueue[$id])) {
            // Watcher was only queued to be enabled.
            unset($this->nextTickQueue[$id]);
        } else {
            unset($this->deferQueue[$id]);
        }

        return;
    }

    if (isset($this->enableQueue[$id])) {
        // Watcher was only queued to be enabled.
        unset($this->enableQueue[$id]);

        return;
    }

    // The watcher has already been handed to the concrete driver, which has to deactivate it.
    $this->deactivate($watcher);
}
/**
* Deactivates (disables) the given watcher.
*
* Called by disable() for watchers of any type other than defer that are no longer waiting in the enable queue.
*
* @param \Amp\Loop\Watcher $watcher
*/
abstract protected function deactivate(Watcher $watcher);
/**
* Reference a watcher.
*
* This will keep the event loop alive whilst the watcher is still being monitored. Watchers have this state by
* default.
*
* @param string $watcherId The watcher identifier.
*
* @return void
*
* @throws InvalidWatcherError If the watcher identifier is invalid.
*/
public function reference(string $watcherId) {
    $watcher = $this->watchers[$watcherId] ?? null;

    if ($watcher === null) {
        throw new InvalidWatcherError($watcherId, "Cannot reference an invalid watcher identifier: '{$watcherId}'");
    }

    $watcher->referenced = true;
}
/**
 * Unreference a watcher.
 *
 * The event loop should exit the run method when only unreferenced watchers are still being monitored. Watchers
 * are all referenced by default. Unlike reference(), an invalid watcher identifier is ignored instead of raising
 * an error.
 *
 * @param string $watcherId The watcher identifier.
 *
 * @return void
 */
public function unreference(string $watcherId) {
    if (isset($this->watchers[$watcherId])) {
        $this->watchers[$watcherId]->referenced = false;
    }
}
/**
 * Stores information in the loop bound registry.
 *
 * Stored information is package private. Packages MUST NOT retrieve the stored state of other packages. Packages
 * MUST use their namespace as prefix for keys. They may do so by using `SomeClass::class` as key.
 *
 * If packages want to expose loop bound state to consumers other than the package, they SHOULD provide a dedicated
 * interface for that purpose instead of sharing the storage key.
 *
 * Storing `null` removes the entry for the given key.
 *
 * @param string $key The namespaced storage key.
 * @param mixed $value The value to be stored, or `null` to remove the entry.
 *
 * @return void
 */
final public function setState(string $key, $value) {
    if ($value !== null) {
        $this->registry[$key] = $value;

        return;
    }

    unset($this->registry[$key]);
}
/**
 * Gets information stored bound to the loop.
 *
 * Stored information is package private. Packages MUST NOT retrieve the stored state of other packages. Packages
 * MUST use their namespace as prefix for keys. They may do so by using `SomeClass::class` as key.
 *
 * If packages want to expose loop bound state to consumers other than the package, they SHOULD provide a dedicated
 * interface for that purpose instead of sharing the storage key.
 *
 * @param string $key The namespaced storage key.
 *
 * @return mixed The previously stored value or `null` if it doesn't exist.
 */
final public function getState(string $key) {
    return $this->registry[$key] ?? null;
}
/**
 * Set a callback to be executed when an error occurs.
 *
 * The callback receives the error as the first and only parameter. The return value of the callback gets ignored.
 * If it can't handle the error, it MUST throw the error. Errors thrown by the callback or during its invocation
 * MUST be thrown into the `run` loop and stop the driver.
 *
 * Each call replaces the handler installed by the previous call.
 *
 * @param callable(\Throwable|\Exception $error)|null $callback The callback to execute. `null` will clear the
 *     current handler.
 *
 * @return callable(\Throwable|\Exception $error)|null The previous handler, `null` if there was none.
 */
public function setErrorHandler(callable $callback = null) {
    $replaced = $this->errorHandler;

    $this->errorHandler = $callback;

    return $replaced;
}
/**
 * Invokes the error handler with the given exception.
 *
 * Without a registered handler the exception is rethrown as-is. Anything the handler itself throws propagates to
 * the caller.
 *
 * @param \Throwable $exception The exception thrown from a watcher callback.
 *
 * @throws \Throwable If no error handler has been set.
 */
protected function error(\Throwable $exception) {
    $handler = $this->errorHandler;

    if ($handler !== null) {
        $handler($exception);

        return;
    }

    throw $exception;
}
/**
* Get the underlying loop handle.
*
* Example: the `uv_loop` resource for `libuv`, the `EvLoop` object for `libev`, the `EventBase` object for
* `libevent`, or `null` for a native driver.
*
* Note: This function is *not* exposed in the `Loop` class. Users shall access it directly on the respective loop
* instance.
*
* @return null|object|resource The loop handle the event loop operates on. `null` if there is none.
*/
abstract public function getHandle();
/**
* Returns the same array of data as getInfo().
*
* PHP calls this magic method when the object is dumped (e.g. by var_dump()), so a dump shows the watcher
* statistics produced by getInfo().
*
* @return array
*/
public function __debugInfo() {
// @codeCoverageIgnoreStart
return $this->getInfo();
// @codeCoverageIgnoreEnd
}
/**
 * Retrieve an associative array of information about the event loop driver.
 *
 * The returned array MUST contain the following data describing the driver's currently registered watchers:
 *
 *     [
 *         "defer"            => ["enabled" => int, "disabled" => int],
 *         "delay"            => ["enabled" => int, "disabled" => int],
 *         "repeat"           => ["enabled" => int, "disabled" => int],
 *         "on_readable"      => ["enabled" => int, "disabled" => int],
 *         "on_writable"      => ["enabled" => int, "disabled" => int],
 *         "on_signal"        => ["enabled" => int, "disabled" => int],
 *         "enabled_watchers" => ["referenced" => int, "unreferenced" => int],
 *         "running"          => bool
 *     ];
 *
 * Implementations MAY optionally add more information in the array but at minimum the above `key => value` format
 * MUST always be provided.
 *
 * @return array Statistics about the loop in the described format.
 *
 * @throws \Error If a registered watcher has an unknown type.
 */
public function getInfo(): array {
    $empty = [
        "enabled" => 0,
        "disabled" => 0,
    ];

    // One enabled/disabled counter pair per watcher category.
    $counts = [
        "defer" => $empty,
        "delay" => $empty,
        "repeat" => $empty,
        "on_readable" => $empty,
        "on_writable" => $empty,
        "on_signal" => $empty,
    ];

    $referenced = 0;
    $unreferenced = 0;

    foreach ($this->watchers as $watcher) {
        switch ($watcher->type) {
            case Watcher::READABLE:
                $category = "on_readable";
                break;

            case Watcher::WRITABLE:
                $category = "on_writable";
                break;

            case Watcher::SIGNAL:
                $category = "on_signal";
                break;

            case Watcher::DEFER:
                $category = "defer";
                break;

            case Watcher::DELAY:
                $category = "delay";
                break;

            case Watcher::REPEAT:
                $category = "repeat";
                break;

            default:
                // @codeCoverageIgnoreStart
                throw new \Error("Unknown watcher type");
                // @codeCoverageIgnoreEnd
        }

        if (!$watcher->enabled) {
            ++$counts[$category]["disabled"];
            continue;
        }

        ++$counts[$category]["enabled"];

        // Only enabled watchers take part in the referenced/unreferenced tally.
        if ($watcher->referenced) {
            ++$referenced;
        } else {
            ++$unreferenced;
        }
    }

    return [
        "enabled_watchers" => [
            "referenced" => $referenced,
            "unreferenced" => $unreferenced,
        ],
        "defer" => $counts["defer"],
        "delay" => $counts["delay"],
        "repeat" => $counts["repeat"],
        "on_readable" => $counts["on_readable"],
        "on_writable" => $counts["on_writable"],
        "on_signal" => $counts["on_signal"],
        "running" => (bool) $this->running,
    ];
}
}
lib/Loop/DriverFactory.php 0000644 00000002471 13755316542 0011536 0 ustar 00 createDriverFromEnv()) {
return $driver;
}
if (UvDriver::isSupported()) {
return new UvDriver;
}
if (EvDriver::isSupported()) {
return new EvDriver;
}
if (EventDriver::isSupported()) {
return new EventDriver;
}
return new NativeDriver;
}
/**
 * Instantiates the driver class named by the AMP_LOOP_DRIVER environment variable, if any.
 *
 * @return Driver|null A new instance of the configured class, or `null` if the variable is unset or empty.
 *
 * @throws \Error If the configured class does not exist or is not a subclass of Driver.
 */
private function createDriverFromEnv() {
    $class = \getenv("AMP_LOOP_DRIVER");

    if (!$class) {
        return null;
    }

    if (!\class_exists($class)) {
        $message = \sprintf("Driver '%s' does not exist.", $class);

        throw new \Error($message);
    }

    if (!\is_subclass_of($class, Driver::class)) {
        $message = \sprintf("Driver '%s' is not a subclass of '%s'.", $class, Driver::class);

        throw new \Error($message);
    }

    return new $class;
}
}
// @codeCoverageIgnoreEnd
lib/Loop/EvDriver.php 0000644 00000015751 13755316542 0010506 0 ustar 00 handle = new \EvLoop;
if (self::$activeSignals === null) {
self::$activeSignals = &$this->signals;
}
$this->ioCallback = function (\EvIO $event) {
/** @var \Amp\Loop\Watcher $watcher */
$watcher = $event->data;
try {
$result = ($watcher->callback)($watcher->id, $watcher->value, $watcher->data);
if ($result === null) {
return;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
};
$this->timerCallback = function (\EvTimer $event) {
/** @var \Amp\Loop\Watcher $watcher */
$watcher = $event->data;
if ($watcher->type & Watcher::DELAY) {
$this->cancel($watcher->id);
} elseif ($watcher->value === 0) {
// Disable and re-enable so it's not executed repeatedly in the same tick
// See https://github.com/amphp/amp/issues/131
$this->disable($watcher->id);
$this->enable($watcher->id);
}
try {
$result = ($watcher->callback)($watcher->id, $watcher->data);
if ($result === null) {
return;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
};
$this->signalCallback = function (\EvSignal $event) {
/** @var \Amp\Loop\Watcher $watcher */
$watcher = $event->data;
try {
$result = ($watcher->callback)($watcher->id, $watcher->value, $watcher->data);
if ($result === null) {
return;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
};
}
/**
* {@inheritdoc}
*
* After the parent implementation has processed the cancellation, the Ev event object cached for this watcher is
* dropped from $this->events.
*/
public function cancel(string $watcherId) {
parent::cancel($watcherId);
unset($this->events[$watcherId]);
}
/**
* Checks whether this driver can be used in the current PHP process.
*
* @return bool `true` if the "ev" extension is loaded.
*/
public static function isSupported(): bool {
return \extension_loaded("ev");
}
/**
 * Stops every cached event and releases all references to them.
 */
public function __destruct() {
    foreach ($this->events as $event) {
        // Events may have been nulled in extension depending on destruct order.
        if ($event === null) {
            continue;
        }

        $event->stop();
    }

    // We need to clear all references to events manually, see
    // https://bitbucket.org/osmanov/pecl-ev/issues/31/segfault-in-ev_timer_stop
    $this->events = [];
}
/**
* {@inheritdoc}
*
* For the duration of the run, this instance's signal events replace the signal events that were active before
* (tracked through the static self::$activeSignals). The previous set is restarted when the run ends, including
* when it ends with an exception.
*/
public function run() {
// Take a copy of the currently active signal events and stop them.
$active = self::$activeSignals;
foreach ($active as $event) {
$event->stop();
}
// Bind the static to this instance's signal list and start its events.
self::$activeSignals = &$this->signals;
foreach ($this->signals as $event) {
$event->start();
}
try {
parent::run();
} finally {
// Stop our own signal events and hand control back to the previously active set.
foreach ($this->signals as $event) {
$event->stop();
}
self::$activeSignals = &$active;
foreach ($active as $event) {
$event->start();
}
}
}
/**
* {@inheritdoc}
*
* Stops the underlying EvLoop handle before delegating to the parent implementation.
*/
public function stop() {
$this->handle->stop();
parent::stop();
}
/**
* {@inheritdoc}
*
* @return \EvLoop The loop object this driver operates on.
*/
public function getHandle(): \EvLoop {
return $this->handle;
}
/**
 * {@inheritdoc}
 *
 * Runs a single iteration of the Ev loop; the non-blocking variant adds the NOWAIT flag.
 */
protected function dispatch(bool $blocking) {
    $flags = \Ev::RUN_ONCE;

    if (!$blocking) {
        $flags |= \Ev::RUN_NOWAIT;
    }

    $this->handle->run($flags);
}
/**
 * {@inheritdoc}
 *
 * Creates an Ev event object for each watcher that has none cached yet, or restarts the cached one. Signal
 * watchers are additionally recorded in $this->signals.
 *
 * @throws \Error If a watcher has an unknown type.
 */
protected function activate(array $watchers) {
    foreach ($watchers as $watcher) {
        $id = $watcher->id;

        if (isset($this->events[$id])) {
            // An event object is already cached for this watcher: restart it.
            $this->events[$id]->start();
        } else {
            switch ($watcher->type) {
                case Watcher::READABLE:
                    $event = $this->handle->io($watcher->value, \Ev::READ, $this->ioCallback, $watcher);
                    break;

                case Watcher::WRITABLE:
                    $event = $this->handle->io($watcher->value, \Ev::WRITE, $this->ioCallback, $watcher);
                    break;

                case Watcher::DELAY:
                case Watcher::REPEAT:
                    // Watcher values are divided by MILLISEC_PER_SEC before being handed to Ev.
                    $after = $watcher->value / self::MILLISEC_PER_SEC;
                    $repeat = ($watcher->type & Watcher::REPEAT) ? $after : 0;
                    $event = $this->handle->timer($after, $repeat, $this->timerCallback, $watcher);
                    break;

                case Watcher::SIGNAL:
                    $event = $this->handle->signal($watcher->value, $this->signalCallback, $watcher);
                    break;

                default:
                    // @codeCoverageIgnoreStart
                    throw new \Error("Unknown watcher type");
                    // @codeCoverageIgnoreEnd
            }

            $this->events[$id] = $event;
        }

        if ($watcher->type === Watcher::SIGNAL) {
            $this->signals[$id] = $this->events[$id];
        }
    }
}
/**
 * {@inheritdoc}
 *
 * Stops the cached event of the watcher, if there is one, and removes signal watchers from $this->signals.
 */
protected function deactivate(Watcher $watcher) {
    $id = $watcher->id;

    if (!isset($this->events[$id])) {
        return;
    }

    $this->events[$id]->stop();

    if ($watcher->type === Watcher::SIGNAL) {
        unset($this->signals[$id]);
    }
}
}
lib/Loop/EventDriver.php 0000644 00000020235 13755316542 0011206 0 ustar 00 handle = new \EventBase;
$this->now = (int) (\microtime(true) * self::MILLISEC_PER_SEC);
if (self::$activeSignals === null) {
self::$activeSignals = &$this->signals;
}
$this->ioCallback = function ($resource, $what, Watcher $watcher) {
try {
$result = ($watcher->callback)($watcher->id, $watcher->value, $watcher->data);
if ($result === null) {
return;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
};
$this->timerCallback = function ($resource, $what, Watcher $watcher) {
if ($watcher->type & Watcher::DELAY) {
$this->cancel($watcher->id);
} else {
$this->events[$watcher->id]->add($watcher->value / self::MILLISEC_PER_SEC);
}
try {
$result = ($watcher->callback)($watcher->id, $watcher->data);
if ($result === null) {
return;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
};
$this->signalCallback = function ($signum, $what, Watcher $watcher) {
try {
$result = ($watcher->callback)($watcher->id, $watcher->value, $watcher->data);
if ($result === null) {
return;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
};
}
/**
 * {@inheritdoc}
 *
 * After the parent implementation has processed the cancellation, the cached Event object (if any) is freed and
 * forgotten.
 */
public function cancel(string $watcherId) {
    parent::cancel($watcherId);

    if (!isset($this->events[$watcherId])) {
        return;
    }

    $this->events[$watcherId]->free();
    unset($this->events[$watcherId]);
}
/**
* Checks whether this driver can be used in the current PHP process.
*
* @return bool `true` if the "event" extension is loaded.
*/
public static function isSupported(): bool {
return \extension_loaded("event");
}
/**
 * Frees every cached event and then the loop handle itself.
 *
 * @codeCoverageIgnore
 */
public function __destruct() {
    foreach ($this->events as $event) {
        // Events may have been nulled in extension depending on destruct order.
        if ($event === null) {
            continue;
        }

        $event->free();
    }

    // Unset here, otherwise $event->del() fails with a warning, because __destruct order isn't defined.
    // See https://github.com/amphp/amp/issues/159.
    $this->events = [];

    // Manually free the loop handle to fully release loop resources.
    // See https://github.com/amphp/amp/issues/177.
    if ($this->handle === null) {
        return;
    }

    $this->handle->free();
    $this->handle = null;
}
/**
* {@inheritdoc}
*
* For the duration of the run, this instance's signal events replace the signal events that were active before
* (tracked through the static self::$activeSignals). The previous set is re-added when the run ends, including
* when it ends with an exception.
*/
public function run() {
// Take a copy of the currently active signal events and remove them from their loop.
$active = self::$activeSignals;
foreach ($active as $event) {
$event->del();
}
// Bind the static to this instance's signal list and add its events.
self::$activeSignals = &$this->signals;
foreach ($this->signals as $event) {
$event->add();
}
try {
parent::run();
} finally {
// Remove our own signal events and hand control back to the previously active set.
foreach ($this->signals as $event) {
$event->del();
}
self::$activeSignals = &$active;
foreach ($active as $event) {
$event->add();
}
}
}
/**
* {@inheritdoc}
*
* Stops the underlying EventBase handle before delegating to the parent implementation.
*/
public function stop() {
$this->handle->stop();
parent::stop();
}
/**
* {@inheritdoc}
*
* @return \EventBase The event base this driver operates on.
*/
public function getHandle(): \EventBase {
return $this->handle;
}
/**
 * {@inheritdoc}
 *
 * Runs a single iteration of the event base (non-blocking when requested) and then refreshes the cached
 * millisecond timestamp in $this->now.
 */
protected function dispatch(bool $blocking) {
    $flags = \EventBase::LOOP_ONCE;

    if (!$blocking) {
        $flags |= \EventBase::LOOP_NONBLOCK;
    }

    $this->handle->loop($flags);

    $this->now = (int) (\microtime(true) * self::MILLISEC_PER_SEC);
}
/**
 * {@inheritdoc}
 *
 * Creates an Event object for each watcher that has none cached yet and adds the event to the loop. Timer
 * intervals are shortened by the time elapsed since $this->now was last refreshed. Signal watchers are
 * additionally recorded in $this->signals.
 *
 * @throws \Error If a watcher has an unknown type.
 */
protected function activate(array $watchers) {
    $now = (int) (\microtime(true) * self::MILLISEC_PER_SEC);

    foreach ($watchers as $watcher) {
        $id = $watcher->id;

        if (!isset($this->events[$id])) {
            // Work out the constructor arguments per type, then build the event in one place.
            switch ($watcher->type) {
                case Watcher::READABLE:
                    $target = $watcher->value;
                    $what = \Event::READ | \Event::PERSIST;
                    $callback = $this->ioCallback;
                    break;

                case Watcher::WRITABLE:
                    $target = $watcher->value;
                    $what = \Event::WRITE | \Event::PERSIST;
                    $callback = $this->ioCallback;
                    break;

                case Watcher::DELAY:
                case Watcher::REPEAT:
                    $target = -1;
                    $what = \Event::TIMEOUT;
                    $callback = $this->timerCallback;
                    break;

                case Watcher::SIGNAL:
                    $target = $watcher->value;
                    $what = \Event::SIGNAL | \Event::PERSIST;
                    $callback = $this->signalCallback;
                    break;

                default:
                    // @codeCoverageIgnoreStart
                    throw new \Error("Unknown watcher type");
                    // @codeCoverageIgnoreEnd
            }

            $this->events[$id] = new \Event($this->handle, $target, $what, $callback, $watcher);
        }

        $event = $this->events[$id];

        switch ($watcher->type) {
            case Watcher::DELAY:
            case Watcher::REPEAT:
                $remaining = $watcher->value - ($now - $this->now);
                $event->add($remaining > 0 ? $remaining / self::MILLISEC_PER_SEC : 0);
                break;

            case Watcher::SIGNAL:
                $this->signals[$id] = $event;
                $event->add();
                break;

            default:
                $event->add();
                break;
        }
    }
}
/**
 * {@inheritdoc}
 *
 * Removes the cached event of the watcher from the loop, if there is one, and removes signal watchers from
 * $this->signals.
 */
protected function deactivate(Watcher $watcher) {
    $id = $watcher->id;

    if (!isset($this->events[$id])) {
        return;
    }

    $this->events[$id]->del();

    if ($watcher->type === Watcher::SIGNAL) {
        unset($this->signals[$id]);
    }
}
}
lib/Loop/InvalidWatcherError.php 0000644 00000001427 13755316542 0012671 0 ustar 00 watcherId = $watcherId;
parent::__construct($message);
}
/**
* Returns the watcher identifier this error was constructed with.
*
* @return string The watcher identifier.
*/
public function getWatcherId() {
return $this->watcherId;
}
}
lib/Loop/NativeDriver.php 0000644 00000030606 13755316542 0011356 0 ustar 00 timerQueue = new \SplPriorityQueue();
$this->signalHandling = \extension_loaded("pcntl");
$this->now = (int) (\microtime(true) * self::MILLISEC_PER_SEC);
}
/**
 * {@inheritdoc}
 *
 * Signal watchers can only be registered when the pcntl extension is loaded.
 *
 * @throws \Amp\Loop\UnsupportedFeatureException If the pcntl extension is not available.
 */
public function onSignal(int $signo, callable $callback, $data = null): string {
    if ($this->signalHandling) {
        return parent::onSignal($signo, $callback, $data);
    }

    throw new UnsupportedFeatureException("Signal handling requires the pcntl extension");
}
/**
* {@inheritdoc}
*
* @return null Always `null`: this driver has no underlying loop handle.
*/
public function getHandle() {
return null;
}
/**
* Performs one loop iteration: polls the watched streams, runs the timers that are due, and dispatches pending
* signals.
*
* @param bool $blocking If true, stream polling may wait until the next timer is due (see getTimeout());
* otherwise a zero timeout is used.
*/
protected function dispatch(bool $blocking) {
$this->selectStreams(
$this->readStreams,
$this->writeStreams,
$blocking ? $this->getTimeout() : 0
);
if (!empty($this->timerExpires)) {
// Repeat timers that fired are collected here and re-inserted only after the loop below has finished,
// so a rescheduled timer is not seen again by this same loop.
$scheduleQueue = [];
try {
while (!$this->timerQueue->isEmpty()) {
list($watcher, $expiration) = $this->timerQueue->top();
$id = $watcher->id;
// A queue entry is stale if the timer was disabled or re-armed with a different expiration.
if (!isset($this->timerExpires[$id]) || $expiration !== $this->timerExpires[$id]) {
$this->timerQueue->extract(); // Timer was removed from queue.
continue;
}
if ($this->timerExpires[$id] > $this->now) { // Timer at top of queue has not expired.
break;
}
$this->timerQueue->extract();
if ($watcher->type & Watcher::REPEAT) {
$expiration = $this->now + $watcher->value;
$this->timerExpires[$watcher->id] = $expiration;
$scheduleQueue[] = [$watcher, $expiration];
} else {
// Non-repeating timers are cancelled before their callback runs.
$this->cancel($id);
}
try {
// Execute the timer.
$result = ($watcher->callback)($id, $watcher->data);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
}
} finally {
// Runs even if error() rethrows, so fired repeat timers are always put back into the queue.
// The negated expiration is used as priority, matching the inserts done in activate().
foreach ($scheduleQueue as $item) {
$this->timerQueue->insert($item, -$item[1]);
}
}
}
if ($this->signalHandling) {
\pcntl_signal_dispatch();
}
}
/**
 * Waits for activity on the given streams and invokes the callbacks of the matching read/write watchers.
 *
 * If at least one stream is given, stream_select() is used; a negative timeout makes it wait without a time
 * limit. If no streams are given, the method only sleeps for a positive timeout.
 *
 * @param resource[] $read Streams to check for readability.
 * @param resource[] $write Streams to check for writability.
 * @param int $timeout Timeout in milliseconds; a negative value means no time limit.
 *
 * @return void
 */
private function selectStreams(array $read, array $write, $timeout) {
    $timeout /= self::MILLISEC_PER_SEC;

    if (!empty($read) || !empty($write)) { // Use stream_select() if there are any streams in the loop.
        if ($timeout >= 0) {
            $seconds = (int) $timeout;
            $microseconds = (int) (($timeout - $seconds) * self::MICROSEC_PER_SEC);
        } else {
            $seconds = null;
            $microseconds = null;
        }

        $except = null;

        // Error reporting suppressed since stream_select() emits an E_WARNING if it is interrupted by a signal.
        if (!($result = @\stream_select($read, $write, $except, $seconds, $microseconds))) {
            if ($result === 0) {
                return;
            }

            // error_get_last() returns null when no error has been recorded; fall back to an empty message
            // instead of indexing null and handing a non-string to strpos().
            $error = \error_get_last();
            $message = $error["message"] ?? "";

            if (\strpos($message, "unable to select") !== 0) {
                return;
            }

            $this->error(new \Exception($message));
        }

        foreach ($read as $stream) {
            $streamId = (int) $stream;

            if (!isset($this->readWatchers[$streamId])) {
                continue; // All read watchers disabled.
            }

            foreach ($this->readWatchers[$streamId] as $watcher) {
                if (!isset($this->readWatchers[$streamId][$watcher->id])) {
                    continue; // Watcher disabled by another IO watcher.
                }

                try {
                    $result = ($watcher->callback)($watcher->id, $stream, $watcher->data);

                    if ($result === null) {
                        continue;
                    }

                    if ($result instanceof \Generator) {
                        $result = new Coroutine($result);
                    }

                    if ($result instanceof Promise || $result instanceof ReactPromise) {
                        rethrow($result);
                    }
                } catch (\Throwable $exception) {
                    $this->error($exception);
                }
            }
        }

        foreach ($write as $stream) {
            $streamId = (int) $stream;

            if (!isset($this->writeWatchers[$streamId])) {
                continue; // All write watchers disabled.
            }

            foreach ($this->writeWatchers[$streamId] as $watcher) {
                if (!isset($this->writeWatchers[$streamId][$watcher->id])) {
                    continue; // Watcher disabled by another IO watcher.
                }

                try {
                    $result = ($watcher->callback)($watcher->id, $stream, $watcher->data);

                    if ($result === null) {
                        continue;
                    }

                    if ($result instanceof \Generator) {
                        $result = new Coroutine($result);
                    }

                    if ($result instanceof Promise || $result instanceof ReactPromise) {
                        rethrow($result);
                    }
                } catch (\Throwable $exception) {
                    $this->error($exception);
                }
            }
        }

        return;
    }

    if ($timeout > 0) { // Otherwise sleep with usleep() if $timeout > 0.
        // usleep() expects an integer number of microseconds; cast explicitly rather than passing a float.
        \usleep((int) ($timeout * self::MICROSEC_PER_SEC));
    }
}
/**
 * Determines how long the loop may block before the next timer is due.
 *
 * Stale entries found at the top of the timer queue are discarded along the way.
 *
 * @return int Milliseconds until next timer expires or -1 if there are no pending timers.
 */
private function getTimeout() {
    while (!$this->timerQueue->isEmpty()) {
        list($watcher, $expiration) = $this->timerQueue->top();

        $id = $watcher->id;

        if (isset($this->timerExpires[$id]) && $expiration === $this->timerExpires[$id]) {
            $remaining = $expiration - (int) (\microtime(true) * self::MILLISEC_PER_SEC);

            // A timer that is already overdue yields a zero timeout.
            return $remaining < 0 ? 0 : $remaining;
        }

        $this->timerQueue->extract(); // Timer was removed from queue.
    }

    return -1;
}
/**
 * {@inheritdoc}
 *
 * Registers stream watchers in the read/write tables, queues timers by expiration, and installs a pcntl signal
 * handler the first time a signal number gets a watcher. Finally refreshes the cached timestamp in $this->now.
 *
 * @throws \Error If the signal handler cannot be registered or a watcher has an unknown type.
 */
protected function activate(array $watchers) {
    $now = (int) (\microtime(true) * self::MILLISEC_PER_SEC);

    foreach ($watchers as $watcher) {
        $id = $watcher->id;

        switch ($watcher->type) {
            case Watcher::READABLE:
                $stream = $watcher->value;
                $streamId = (int) $stream;
                $this->readWatchers[$streamId][$id] = $watcher;
                $this->readStreams[$streamId] = $stream;
                break;

            case Watcher::WRITABLE:
                $stream = $watcher->value;
                $streamId = (int) $stream;
                $this->writeWatchers[$streamId][$id] = $watcher;
                $this->writeStreams[$streamId] = $stream;
                break;

            case Watcher::DELAY:
            case Watcher::REPEAT:
                // Expiration is based on the timestamp cached before this call, not on $now.
                $expiration = $this->now + $watcher->value;
                $this->timerExpires[$id] = $expiration;
                $this->timerQueue->insert([$watcher, $expiration], -$expiration);
                break;

            case Watcher::SIGNAL:
                $signo = $watcher->value;

                // Only the first watcher of a signal number installs the handler.
                if (!isset($this->signalWatchers[$signo]) && !@\pcntl_signal($signo, [$this, 'handleSignal'])) {
                    $message = "Failed to register signal handler";
                    $error = \error_get_last();

                    if ($error) {
                        $message .= \sprintf("; Errno: %d; %s", $error["type"], $error["message"]);
                    }

                    throw new \Error($message);
                }

                $this->signalWatchers[$signo][$id] = $watcher;
                break;

            default:
                // @codeCoverageIgnoreStart
                throw new \Error("Unknown watcher type");
                // @codeCoverageIgnoreEnd
        }
    }

    $this->now = $now;
}
/**
 * {@inheritdoc}
 *
 * Removes the watcher from the table matching its type. A stream is dropped from the select list once its last
 * watcher is gone, and a signal is reset to its default handler once its last watcher is gone.
 *
 * @throws \Error If the watcher has an unknown type.
 */
protected function deactivate(Watcher $watcher) {
    $id = $watcher->id;

    switch ($watcher->type) {
        case Watcher::READABLE:
            $streamId = (int) $watcher->value;
            unset($this->readWatchers[$streamId][$id]);

            if (empty($this->readWatchers[$streamId])) {
                unset($this->readWatchers[$streamId], $this->readStreams[$streamId]);
            }
            break;

        case Watcher::WRITABLE:
            $streamId = (int) $watcher->value;
            unset($this->writeWatchers[$streamId][$id]);

            if (empty($this->writeWatchers[$streamId])) {
                unset($this->writeWatchers[$streamId], $this->writeStreams[$streamId]);
            }
            break;

        case Watcher::DELAY:
        case Watcher::REPEAT:
            // The queue entry itself stays behind; it is skipped as stale when it reaches the top.
            unset($this->timerExpires[$id]);
            break;

        case Watcher::SIGNAL:
            $signo = $watcher->value;

            if (!isset($this->signalWatchers[$signo])) {
                break;
            }

            unset($this->signalWatchers[$signo][$id]);

            if (empty($this->signalWatchers[$signo])) {
                unset($this->signalWatchers[$signo]);
                @\pcntl_signal($signo, \SIG_DFL);
            }
            break;

        default:
            // @codeCoverageIgnoreStart
            throw new \Error("Unknown watcher type");
            // @codeCoverageIgnoreEnd
    }
}
/**
 * Signal handler registered through pcntl_signal(): invokes every watcher registered for the signal.
 *
 * Watchers removed by an earlier callback of the same signal are skipped. Exceptions thrown by a callback are
 * passed to error().
 *
 * @param int $signo The signal number that was received.
 */
private function handleSignal(int $signo) {
    foreach ($this->signalWatchers[$signo] as $watcher) {
        $stillRegistered = isset($this->signalWatchers[$signo][$watcher->id]);

        if (!$stillRegistered) {
            continue;
        }

        try {
            $returned = ($watcher->callback)($watcher->id, $signo, $watcher->data);

            if ($returned instanceof \Generator) {
                $returned = new Coroutine($returned);
            }

            if ($returned instanceof Promise || $returned instanceof ReactPromise) {
                rethrow($returned);
            }
        } catch (\Throwable $exception) {
            $this->error($exception);
        }
    }
}
}
lib/Loop/UnsupportedFeatureException.php 0000644 00000000421 13755316542 0014467 0 ustar 00 handle = \uv_loop_new();
$this->ioCallback = function ($event, $status, $events, $resource) {
$watchers = $this->watchers[(int) $event];
switch ($status) {
case 0: // OK
break;
default: // Invoke the callback on errors, as this matches behavior with other loop back-ends.
// Re-enable watcher as libuv disables the watcher on non-zero status.
$flags = 0;
foreach ($this->watchers[(int) $event] as $watcher) {
$flags |= $watcher->enabled ? $watcher->type : 0;
}
\uv_poll_start($event, $flags, $this->ioCallback);
break;
}
foreach ($watchers as $watcher) {
// $events is OR'ed with 4 to trigger watcher if no events are indicated (0) or on UV_DISCONNECT (4).
// http://docs.libuv.org/en/v1.x/poll.html
if (!($watcher->enabled && ($watcher->type & $events || ($events | 4) === 4))) {
continue;
}
try {
$result = ($watcher->callback)($watcher->id, $resource, $watcher->data);
if ($result === null) {
continue;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
}
};
$this->timerCallback = function ($event) {
$watcher = $this->watchers[(int) $event];
if ($watcher->type & Watcher::DELAY) {
unset($this->events[$watcher->id], $this->watchers[(int) $event]); // Avoid call to uv_is_active().
$this->cancel($watcher->id); // Remove reference to watcher in parent.
} elseif ($watcher->value === 0) {
// Disable and re-enable so it's not executed repeatedly in the same tick
// See https://github.com/amphp/amp/issues/131
$this->disable($watcher->id);
$this->enable($watcher->id);
}
try {
$result = ($watcher->callback)($watcher->id, $watcher->data);
if ($result === null) {
return;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
};
$this->signalCallback = function ($event, $signo) {
$watcher = $this->watchers[(int) $event];
try {
$result = ($watcher->callback)($watcher->id, $signo, $watcher->data);
if ($result === null) {
return;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
};
}
/**
 * {@inheritdoc}
 *
 * After the parent implementation has processed the cancellation, the bookkeeping that maps the libuv handle to
 * the watcher is removed. For IO watchers the handle entry and its stream entry are dropped once no watcher is
 * left on the handle.
 */
public function cancel(string $watcherId) {
    parent::cancel($watcherId);

    if (!isset($this->events[$watcherId])) {
        return;
    }

    $eventId = (int) $this->events[$watcherId];

    if (!($this->watchers[$eventId] instanceof Watcher)) {
        // IO handles map to an array of watchers keyed by watcher id.
        $watcher = $this->watchers[$eventId][$watcherId];
        unset($this->watchers[$eventId][$watcherId]);

        if (empty($this->watchers[$eventId])) {
            unset($this->watchers[$eventId], $this->streams[(int) $watcher->value]);
        }
    } else {
        // All except IO watchers.
        unset($this->watchers[$eventId]);
    }

    unset($this->events[$watcherId]);
}
/**
* Checks whether this driver can be used in the current PHP process.
*
* @return bool `true` if the "uv" extension is loaded.
*/
public static function isSupported(): bool {
return \extension_loaded("uv");
}
/**
* {@inheritdoc}
*
* @return resource The loop handle stored in $this->handle (created with uv_loop_new()).
*/
public function getHandle() {
return $this->handle;
}
/**
 * {@inheritdoc}
 *
 * Runs the libuv loop in RUN_ONCE mode when blocking is allowed and in RUN_NOWAIT mode otherwise.
 */
protected function dispatch(bool $blocking) {
    if ($blocking) {
        $mode = \UV::RUN_ONCE;
    } else {
        $mode = \UV::RUN_NOWAIT;
    }

    \uv_run($this->handle, $mode);
}
/**
 * {@inheritdoc}
 *
 * IO watchers on the same stream share a single poll handle, which is (re)started with the combined flags of all
 * enabled watchers on it. Timer and signal watchers each get their own handle, reused across activations.
 *
 * @throws \Error If a watcher has an unknown type.
 */
protected function activate(array $watchers) {
    foreach ($watchers as $watcher) {
        $id = $watcher->id;

        switch ($watcher->type) {
            case Watcher::READABLE:
            case Watcher::WRITABLE:
                $streamId = (int) $watcher->value;

                if (!isset($this->streams[$streamId])) {
                    // Reuse the handle cached for this watcher, or create a new poll handle for the stream.
                    $this->streams[$streamId] = $this->events[$id]
                        ?? \uv_poll_init_socket($this->handle, $watcher->value);
                }

                $event = $this->streams[$streamId];
                $eventId = (int) $event;

                $this->events[$id] = $event;
                $this->watchers[$eventId][$id] = $watcher;

                // Combine the types of every enabled watcher sharing this poll handle.
                $flags = 0;
                foreach ($this->watchers[$eventId] as $sharing) {
                    if ($sharing->enabled) {
                        $flags |= $sharing->type;
                    }
                }

                \uv_poll_start($event, $flags, $this->ioCallback);
                break;

            case Watcher::DELAY:
            case Watcher::REPEAT:
                if (!isset($this->events[$id])) {
                    $this->events[$id] = \uv_timer_init($this->handle);
                }

                $event = $this->events[$id];
                $this->watchers[(int) $event] = $watcher;

                $repeat = ($watcher->type & Watcher::REPEAT) ? $watcher->value : 0;

                \uv_timer_start($event, $watcher->value, $repeat, $this->timerCallback);
                break;

            case Watcher::SIGNAL:
                if (!isset($this->events[$id])) {
                    $this->events[$id] = \uv_signal_init($this->handle);
                }

                $event = $this->events[$id];
                $this->watchers[(int) $event] = $watcher;

                \uv_signal_start($event, $this->signalCallback, $watcher->value);
                break;

            default:
                // @codeCoverageIgnoreStart
                throw new \Error("Unknown watcher type");
                // @codeCoverageIgnoreEnd
        }
    }
}
/**
 * {@inheritdoc}
 *
 * Does nothing if no libuv handle is cached for the watcher or if the handle is not active. A shared poll handle
 * keeps running with the flags of the remaining enabled watchers and is stopped only when none are left.
 *
 * @throws \Error If the watcher has an unknown type.
 */
protected function deactivate(Watcher $watcher) {
    $id = $watcher->id;

    if (!isset($this->events[$id])) {
        return;
    }

    $event = $this->events[$id];

    if (!\uv_is_active($event)) {
        return;
    }

    switch ($watcher->type) {
        case Watcher::READABLE:
        case Watcher::WRITABLE:
            // Combine the types of every watcher on this handle that is still enabled.
            $flags = 0;
            foreach ($this->watchers[(int) $event] as $sharing) {
                if ($sharing->enabled) {
                    $flags |= $sharing->type;
                }
            }

            if (!$flags) {
                \uv_poll_stop($event);
            } else {
                \uv_poll_start($event, $flags, $this->ioCallback);
            }
            break;

        case Watcher::DELAY:
        case Watcher::REPEAT:
            \uv_timer_stop($event);
            break;

        case Watcher::SIGNAL:
            \uv_signal_stop($event);
            break;

        default:
            // @codeCoverageIgnoreStart
            throw new \Error("Unknown watcher type");
            // @codeCoverageIgnoreEnd
    }
}
}
lib/Loop/Watcher.php 0000644 00000001512 13755316542 0010343 0 ustar 00 reasons = $reasons;
}
/**
* Returns the reasons stored on this exception.
*
* @return \Throwable[]
*/
public function getReasons(): array {
return $this->reasons;
}
}
lib/NullCancellationToken.php 0000644 00000001601 13755316542 0012264 0 ustar 00 throwIfRequested();
* }
* ```
*
* potentially multiple times, it allows writing
*
* ```php
* $token = $token ?? new NullCancellationToken;
*
* // ...
*
* $token->throwIfRequested();
* ```
*
* instead.
*/
final class NullCancellationToken implements CancellationToken {
/**
* Ignores the callback and returns a fixed identifier; this class never stores or invokes the callback.
*
* @inheritdoc
*/
public function subscribe(callable $callback): string {
return "null-token";
}
/**
* Does nothing, since subscribe() never registers anything.
*
* @inheritdoc
*/
public function unsubscribe(string $id) {
// nothing to do
}
/**
* Always reports that cancellation has not been requested.
*
* @inheritdoc
*/
public function isRequested(): bool {
return false;
}
/**
* Never throws, since this token is never cancelled.
*
* @inheritdoc
*/
public function throwIfRequested() {
// nothing to do
}
}
lib/Producer.php 0000644 00000001561 13755316542 0007624 0 ustar 00 callableFromInstanceMethod("emit"));
if (!$result instanceof \Generator) {
throw new \Error("The callable did not return a Generator");
}
$coroutine = new Coroutine($result);
$coroutine->onResolve(function ($exception) {
if ($this->complete) {
return;
}
if ($exception) {
$this->fail($exception);
return;
}
$this->complete();
});
}
}
lib/Promise.php 0000644 00000002113 13755316542 0007451 0 ustar 00 generateStructPropertyError($property)
);
}
/**
 * Rejects writes to properties that are not declared.
 *
 * @param string $property Name of the property that was written to.
 * @param mixed $value The value that was assigned (unused).
 *
 * @throws \Error Always; the message may contain a suggestion for a similarly named property.
 */
public function __set(string $property, $value) {
    $message = $this->generateStructPropertyError($property);

    throw new \Error($message);
}
/**
 * Builds the error message for an access to a non-existent property.
 *
 * @param string $property Name of the property that was accessed.
 *
 * @return string The message, including a "did you mean" hint when a similar property name was found.
 */
private function generateStructPropertyError(string $property): string {
    $suggestion = $this->suggestPropertyName($property);

    $hint = "";
    if ($suggestion != "") {
        $hint = " ... did you mean \"{$suggestion}?\"";
    }

    // Handle anonymous class names.
    $className = \str_replace("\0", "@", \get_class($this));

    return \sprintf("%s property \"%s\" does not exist%s", $className, $property, $hint);
}
/**
 * Finds the declared property whose name is most similar to the given one.
 *
 * Names are compared case-insensitively with similar_text(). Properties starting with an underscore are never
 * suggested.
 *
 * @param string $badProperty The property name that does not exist.
 *
 * @return string The best match, or an empty string if no candidate reaches the suggestion threshold.
 */
private function suggestPropertyName(string $badProperty): string {
    $needle = \strtolower($badProperty);
    $candidate = "";
    $highest = 0;

    foreach ($this as $name => $unused) {
        // Never suggest properties that begin with an underscore
        if ($name[0] !== "_") {
            \similar_text($needle, \strtolower($name), $percentage);

            if ($percentage > $highest) {
                $highest = $percentage;
                $candidate = $name;
            }
        }
    }

    if ($highest >= $this->__propertySuggestThreshold) {
        return $candidate;
    }

    return "";
}
}
lib/Success.php 0000644 00000002544 13755316542 0007453 0 ustar 00 value = $value;
}
/**
 * {@inheritdoc}
 *
 * The callback is invoked immediately with `null` as the error and the stored value as the result. A returned
 * generator is run as a coroutine, and a returned promise is passed to rethrow(). Anything thrown along the way
 * is rethrown from a deferred loop callback instead of propagating to the caller.
 */
public function onResolve(callable $onResolved) {
    try {
        $returned = $onResolved(null, $this->value);

        if ($returned instanceof \Generator) {
            $returned = new Coroutine($returned);
        }

        if ($returned instanceof Promise || $returned instanceof ReactPromise) {
            Promise\rethrow($returned);
        }
    } catch (\Throwable $error) {
        Loop::defer(static function () use ($error) {
            throw $error;
        });
    }
}
}
lib/TimeoutCancellationToken.php 0000644 00000002567 13755316542 0013014 0 ustar 00 token = $source->getToken();
$this->watcher = Loop::delay($timeout, static function () use ($source) {
$source->cancel(new TimeoutException);
});
Loop::unreference($this->watcher);
}
/**
* Cancels the delay watcher stored in $this->watcher when the token is destroyed.
*/
public function __destruct() {
Loop::cancel($this->watcher);
}
/**
 * {@inheritdoc}
 *
 * Forwards the subscription to the wrapped token held in $this->token.
 *
 * @param callable $callback Callback passed through unchanged to the wrapped token.
 *
 * @return string Identifier returned by the wrapped token's subscribe().
 */
public function subscribe(callable $callback): string {
return $this->token->subscribe($callback);
}
/**
 * {@inheritdoc}
 *
 * Forwards the call to the wrapped token held in $this->token.
 *
 * @param string $id Subscription identifier, passed through unchanged to the wrapped token.
 */
public function unsubscribe(string $id) {
$this->token->unsubscribe($id);
}
/**
 * {@inheritdoc}
 *
 * Reports the state of the wrapped token held in $this->token.
 *
 * @return bool Whatever the wrapped token's isRequested() returns.
 */
public function isRequested(): bool {
return $this->token->isRequested();
}
/**
 * {@inheritdoc}
 *
 * Forwards the call to the wrapped token held in $this->token; anything it throws propagates to the caller.
 */
public function throwIfRequested() {
$this->token->throwIfRequested();
}
}
lib/TimeoutException.php 0000644 00000000560 13755316542 0011344 0 ustar 00 onResolve(function ($exception) {
if ($exception) {
throw $exception;
}
});
}
/**
 * Runs the event loop until the given promise is resolved and returns its result synchronously.
 *
 * Intended for synchronous contexts only and should not be called within a running event loop. In a fully
 * asynchronous application, use coroutines and yield to await promise resolution instead.
 *
 * @param \Amp\Promise|\React\Promise\PromiseInterface $promise Promise to wait for.
 *
 * @return mixed Promise success value.
 *
 * @throws \TypeError If $promise is not an instance of \Amp\Promise or \React\Promise\PromiseInterface.
 * @throws \Error If the event loop stopped without the $promise being resolved, or if running the loop threw
 *                (the original throwable is attached as the previous exception).
 * @throws \Throwable Promise failure reason.
 */
function wait($promise) {
    if (!$promise instanceof Promise) {
        if (!$promise instanceof ReactPromise) {
            throw createTypeError([Promise::class, ReactPromise::class], $promise);
        }

        $promise = adapt($promise);
    }

    $resolved = false;
    $value = null;
    $exception = null;

    // Records the outcome and stops the loop as soon as the promise resolves.
    $onResolved = function ($e, $v) use (&$resolved, &$value, &$exception) {
        Loop::stop();
        $resolved = true;
        $exception = $e;
        $value = $v;
    };

    try {
        Loop::run(function () use ($promise, $onResolved) {
            $promise->onResolve($onResolved);
        });
    } catch (\Throwable $throwable) {
        throw new \Error("Loop exceptionally stopped without resolving the promise", 0, $throwable);
    }

    if (!$resolved) {
        throw new \Error("Loop stopped without resolving the promise");
    }

    if ($exception) {
        throw $exception;
    }

    return $value;
}
/**
 * Wraps a promise so that it fails if it does not resolve in time.
 *
 * If the delay watcher fires before the given promise resolves, the returned promise fails with an instance of
 * `Amp\TimeoutException`. Otherwise the watcher is cancelled and the returned promise is resolved with the
 * given promise.
 *
 * @param \Amp\Promise|\React\Promise\PromiseInterface $promise Promise to which the timeout is applied.
 * @param int $timeout Timeout in milliseconds.
 *
 * @return \Amp\Promise
 *
 * @throws \TypeError If $promise is not an instance of \Amp\Promise or \React\Promise\PromiseInterface.
 */
function timeout($promise, int $timeout): Promise {
    if (!$promise instanceof Promise) {
        if (!$promise instanceof ReactPromise) {
            throw createTypeError([Promise::class, ReactPromise::class], $promise);
        }

        $promise = adapt($promise);
    }

    $deferred = new Deferred;

    $watcher = Loop::delay($timeout, function () use (&$deferred) {
        // Detach the deferred before failing it so the resolution callback below becomes a no-op
        // (prevents a double resolve).
        $expired = $deferred;
        $deferred = null;
        $expired->fail(new TimeoutException);
    });
    Loop::unreference($watcher);

    $promise->onResolve(function () use (&$deferred, $promise, $watcher) {
        if ($deferred === null) {
            return; // The timeout already fired.
        }

        Loop::cancel($watcher);
        $deferred->resolve($promise);
    });

    return $deferred->promise();
}
/**
 * Adapts any object with a done(callable $onFulfilled, callable $onRejected) or then(callable $onFulfilled,
 * callable $onRejected) method to an \Amp\Promise.
 *
 * done() is preferred when both methods exist. The fulfilment callback resolves, and the rejection callback
 * fails, the Deferred backing the returned promise.
 *
 * @param object $promise Object with a done() or then() method.
 *
 * @return \Amp\Promise Promise resolved by the $thenable object.
 *
 * @throws \Error If the provided object has neither a done() nor a then() method.
 */
function adapt($promise): Promise {
    $deferred = new Deferred;
    $onFulfilled = [$deferred, 'resolve'];
    $onRejected = [$deferred, 'fail'];

    if (\method_exists($promise, 'done')) {
        $promise->done($onFulfilled, $onRejected);

        return $deferred->promise();
    }

    if (\method_exists($promise, 'then')) {
        $promise->then($onFulfilled, $onRejected);

        return $deferred->promise();
    }

    throw new \Error("Object must have a 'then' or 'done' method");
}
/**
 * Returns a promise that is resolved when all promises are resolved. The returned promise will not fail.
 *
 * This is a shorthand for some($promises, 0): because zero successes are required, the returned promise
 * succeeds once every given promise has resolved, even if all of them resolved unsuccessfully. It succeeds
 * with the two-item array produced by some(): failure reasons first, success values second, each with keys
 * identical and corresponding to the original given array.
 *
 * @param \Amp\Promise[] $promises Array of only promises.
 *
 * @return \Amp\Promise
 *
 * @throws \Error If a non-Promise is in the array.
 */
function any(array $promises): Promise {
return some($promises, 0);
}
/**
 * Combines an array of promises into one promise that succeeds only if every promise succeeds.
 *
 * The returned promise succeeds with an array of the success values, keyed and ordered like the given array.
 * It fails with the reason of the first promise that fails; later resolutions are then ignored. An empty
 * array yields an already-succeeded promise with an empty array as value.
 *
 * @param \Amp\Promise[] $promises Array of only promises.
 *
 * @return \Amp\Promise
 *
 * @throws \Error If a non-Promise is in the array.
 */
function all(array $promises): Promise {
    if (empty($promises)) {
        return new Success([]);
    }

    $deferred = new Deferred;
    $result = $deferred->promise();

    $remaining = \count($promises);
    $collected = [];

    foreach ($promises as $key => $promise) {
        $isReact = $promise instanceof ReactPromise;

        if (!$isReact && !$promise instanceof Promise) {
            throw createTypeError([Promise::class, ReactPromise::class], $promise);
        }

        if ($isReact) {
            $promise = adapt($promise);
        }

        $collected[$key] = null; // Reserve the slot now so the result keeps the input order.

        $promise->onResolve(function ($exception, $value) use (&$deferred, &$collected, &$remaining, $key) {
            if ($remaining === 0) {
                return; // Already failed; ignore further resolutions.
            }

            if (!$exception) {
                $collected[$key] = $value;

                if (--$remaining === 0) {
                    $deferred->resolve($collected);
                }

                return;
            }

            $remaining = 0;
            $deferred->fail($exception);
            $deferred = null;
        });
    }

    return $result;
}
/**
 * Returns a promise that succeeds with the value of the first promise to succeed.
 *
 * The returned promise fails only if every given promise fails; the failure is a MultiReasonException built
 * from the individual reasons, keyed and ordered like the given array.
 *
 * @param \Amp\Promise[] $promises Array of only promises.
 *
 * @return \Amp\Promise
 *
 * @throws \Error If the array is empty or a non-Promise is in the array.
 */
function first(array $promises): Promise {
    if (empty($promises)) {
        throw new \Error("No promises provided");
    }

    $deferred = new Deferred;
    $result = $deferred->promise();

    $remaining = \count($promises);
    $reasons = [];

    foreach ($promises as $key => $promise) {
        $isReact = $promise instanceof ReactPromise;

        if (!$isReact && !$promise instanceof Promise) {
            throw createTypeError([Promise::class, ReactPromise::class], $promise);
        }

        if ($isReact) {
            $promise = adapt($promise);
        }

        $reasons[$key] = null; // Reserve the slot now so the reasons keep the input order.

        $promise->onResolve(function ($error, $value) use (&$deferred, &$reasons, &$remaining, $key) {
            if ($remaining === 0) {
                return; // Already succeeded; ignore further resolutions.
            }

            if ($error) {
                $reasons[$key] = $error;

                if (--$remaining === 0) {
                    $deferred->fail(new MultiReasonException($reasons));
                }

                return;
            }

            $remaining = 0;
            $deferred->resolve($value);
            $deferred = null;
        });
    }

    return $result;
}
/**
 * Waits for every given promise and reports both the failures and the successes.
 *
 * Once all promises have resolved, the returned promise succeeds with a two-item array: the failure reasons
 * first and the success values second, each keyed like the given array. If fewer than $required promises
 * succeeded, the returned promise instead fails with a MultiReasonException built from the failure reasons.
 *
 * @param \Amp\Promise[] $promises Array of only promises.
 * @param int $required Number of promises that must succeed for the returned promise to succeed.
 *
 * @return \Amp\Promise
 *
 * @throws \Error If $required is negative or greater than the number of promises, or if a non-Promise is in
 *                the array.
 */
function some(array $promises, int $required = 1): Promise {
    if ($required < 0) {
        throw new \Error("Number of promises required must be non-negative");
    }

    $pending = \count($promises);

    if ($required > $pending) {
        throw new \Error("Too few promises provided");
    }

    if ($pending === 0) {
        return new Success([[], []]);
    }

    $deferred = new Deferred;
    $result = $deferred->promise();

    $values = [];
    $exceptions = [];

    foreach ($promises as $key => $promise) {
        $isReact = $promise instanceof ReactPromise;

        if (!$isReact && !$promise instanceof Promise) {
            throw createTypeError([Promise::class, ReactPromise::class], $promise);
        }

        if ($isReact) {
            $promise = adapt($promise);
        }

        // Reserve a slot in both arrays so each keeps the input order; the unused one is removed on resolution.
        $values[$key] = $exceptions[$key] = null;

        $promise->onResolve(function ($exception, $value) use (
            &$values, &$exceptions, &$pending, $key, $required, $deferred
        ) {
            if ($exception) {
                $exceptions[$key] = $exception;
                unset($values[$key]);
            } else {
                $values[$key] = $value;
                unset($exceptions[$key]);
            }

            if (0 !== --$pending) {
                return; // Still waiting for other promises.
            }

            if (\count($values) >= $required) {
                $deferred->resolve([$exceptions, $values]);
            } else {
                $deferred->fail(new MultiReasonException($exceptions));
            }
        });
    }

    return $result;
}
}
namespace Amp\Iterator {
use Amp\Delayed;
use Amp\Emitter;
use Amp\Iterator;
use Amp\Producer;
use Amp\Promise;
use function Amp\coroutine;
use function Amp\Internal\createTypeError;
/**
 * Creates an iterator from the given iterable, emitting each of its values in turn.
 *
 * The result of emitting each value is yielded before the next value is taken. When $delay is non-zero, a
 * Delayed of that many milliseconds is yielded before each emission.
 *
 * @param array|\Traversable $iterable Elements to emit.
 * @param int $delay Delay between element emissions in milliseconds.
 *
 * @return \Amp\Iterator
 *
 * @throws \TypeError If the argument is not an array or instance of \Traversable.
 */
function fromIterable(/* iterable */ $iterable, int $delay = 0): Iterator {
    $isIterable = $iterable instanceof \Traversable || \is_array($iterable);

    if (!$isIterable) {
        throw createTypeError(["array", "Traversable"], $iterable);
    }

    $producer = function (callable $emit) use ($iterable, $delay) {
        foreach ($iterable as $element) {
            if ($delay) {
                yield new Delayed($delay);
            }

            yield $emit($element);
        }
    };

    return new Producer($producer);
}
/**
 * Creates an iterator that emits the result of applying $onEmit to each value of the given iterator.
 *
 * @param \Amp\Iterator $iterator Iterator to read values from.
 * @param callable (mixed $value): mixed $onEmit Invoked with each value; its return value is emitted.
 *
 * @return \Amp\Iterator
 */
function map(Iterator $iterator, callable $onEmit): Iterator {
    $producer = function (callable $emit) use ($iterator, $onEmit) {
        while (yield $iterator->advance()) {
            $mapped = $onEmit($iterator->getCurrent());

            yield $emit($mapped);
        }
    };

    return new Producer($producer);
}
/**
 * Creates an iterator that emits only those values of the given iterator for which $filter returns a truthy
 * value.
 *
 * @param \Amp\Iterator $iterator Iterator to read values from.
 * @param callable (mixed $value): bool $filter Invoked with each value to decide whether it is emitted.
 *
 * @return \Amp\Iterator
 */
function filter(Iterator $iterator, callable $filter): Iterator {
    $producer = function (callable $emit) use ($iterator, $filter) {
        while (yield $iterator->advance()) {
            if (!$filter($iterator->getCurrent())) {
                continue; // Rejected by the filter.
            }

            yield $emit($iterator->getCurrent());
        }
    };

    return new Producer($producer);
}
/**
 * Creates an iterator that emits values emitted from any iterator in the array of iterators.
 *
 * One coroutine per given iterator forwards that iterator's values to a shared emitter. The returned
 * iterator completes once all of those coroutines have finished, and fails with the first failure reason
 * if any of them fails.
 *
 * @param \Amp\Iterator[] $iterators
 *
 * @return \Amp\Iterator
 *
 * @throws \TypeError If a non-Iterator is in the array.
 */
function merge(array $iterators): Iterator {
    // Validate up front, as concat() does, so that no coroutine is created for earlier iterators when a
    // later entry turns out to be invalid.
    foreach ($iterators as $iterator) {
        if (!$iterator instanceof Iterator) {
            throw createTypeError([Iterator::class], $iterator);
        }
    }

    $emitter = new Emitter;
    $result = $emitter->iterate();

    $coroutine = coroutine(function (Iterator $iterator) use (&$emitter) {
        // $emitter is set to null once the merged iterator has failed; stop forwarding values then.
        while ((yield $iterator->advance()) && $emitter !== null) {
            yield $emitter->emit($iterator->getCurrent());
        }
    });

    $coroutines = [];
    foreach ($iterators as $iterator) {
        $coroutines[] = $coroutine($iterator);
    }

    Promise\all($coroutines)->onResolve(function ($exception) use (&$emitter) {
        if ($exception) {
            $emitter->fail($exception);
            $emitter = null;
        } else {
            $emitter->complete();
        }
    });

    return $result;
}
/**
 * Concatenates the given iterators into a single iterator, emitting values from a single iterator at a time. The
 * prior iterator must complete before values are emitted from any subsequent iterators. Iterators are concatenated
 * in the order given (iteration order of the array).
 *
 * @param \Amp\Iterator[] $iterators Iterators to concatenate.
 *
 * @return \Amp\Iterator
 *
 * @throws \TypeError If a non-Iterator is in the array.
 */
function concat(array $iterators): Iterator {
// Validate every entry before any coroutine is created.
foreach ($iterators as $iterator) {
if (!$iterator instanceof Iterator) {
throw createTypeError([Iterator::class], $iterator);
}
}
$emitter = new Emitter;
// $previous collects the coroutine promise of each iterator handled so far; $promise is all() of them,
// i.e. it resolves once every prior iterator is done. For the first iterator the list is empty.
$previous = [];
$promise = Promise\all($previous);
// Forwards every value of one iterator to the given emit callback.
$coroutine = coroutine(function (Iterator $iterator, callable $emit) {
while (yield $iterator->advance()) {
yield $emit($iterator->getCurrent());
}
});
foreach ($iterators as $iterator) {
// Emit callback for this iterator: holds back values until the prior iterators ($promise as captured
// here) have completed, and drops values if they failed.
$emit = coroutine(function ($value) use ($emitter, $promise) {
// NOTE(review): these statics act as per-iterator state, which relies on each closure created in this
// loop getting its own copy of them — confirm.
static $pending = true, $failed = false;
if ($failed) {
return;
}
if ($pending) {
try {
yield $promise;
$pending = false;
} catch (\Throwable $exception) {
$failed = true;
return; // Prior iterator failed.
}
}
yield $emitter->emit($value);
});
$previous[] = $coroutine($iterator, $emit);
$promise = Promise\all($previous);
}
// At this point $promise covers every iterator: complete or fail the emitter accordingly.
$promise->onResolve(function ($exception) use ($emitter) {
if ($exception) {
$emitter->fail($exception);
return;
}
$emitter->complete();
});
return $emitter->iterate();
}
}
travis/install-ev.sh 0000755 00000000304 13755316542 0010501 0 ustar 00 #!/usr/bin/env bash
# Download the ev extension tarball from PECL (no version pinned) and unpack it into the current directory.
curl -LS https://pecl.php.net/get/ev | tar -xz;
# Build and install the extension from inside the unpacked ev-* directory, then return.
pushd ev-*;
phpize;
./configure;
make;
make install;
popd;
echo "extension=ev.so" >> "$(php -r 'echo php_ini_loaded_file();')"; travis/install-event.sh 0000755 00000000462 13755316542 0011215 0 ustar 00 #!/usr/bin/env bash
# Download event 2.3.0 from PECL, build it with the core, extra and pthreads options, install it, and append
# the extension line to the php.ini file reported by php_ini_loaded_file(). The steps are chained with &&,
# so a failing step skips everything after it.
curl -LS https://pecl.php.net/get/event-2.3.0 | tar -xz \
&& pushd event-* \
&& phpize \
&& ./configure --with-event-core --with-event-extra --with-event-pthreads \
&& make \
&& make install \
&& popd \
&& echo "extension=event.so" >> "$(php -r 'echo php_ini_loaded_file();')";
travis/install-uv.sh 0000755 00000001141 13755316542 0010521 0 ustar 00 #!/usr/bin/env bash
# Fetch the libuv and php-uv source tarballs in parallel, then wait for both downloads to finish.
wget https://github.com/libuv/libuv/archive/v1.x.tar.gz -O /tmp/libuv.tar.gz -q &
wget https://github.com/bwoebi/php-uv/archive/master.tar.gz -O /tmp/php-uv.tar.gz -q &
wait

# Unpack each tarball into its own directory, dropping the archive's top-level folder.
mkdir libuv && tar -xf /tmp/libuv.tar.gz -C libuv --strip-components=1
mkdir php-uv && tar -xf /tmp/php-uv.tar.gz -C php-uv --strip-components=1

# Build libuv and install it into a libuv-install directory next to the source directories.
pushd libuv;
./autogen.sh
# The command substitution is quoted so a working directory containing whitespace is not word-split.
./configure --prefix="$(dirname "$(pwd)")/libuv-install"
make
make install
popd

# Build php-uv against the libuv installation created above and install it.
pushd php-uv
phpize
./configure --with-uv="$(dirname "$(pwd)")/libuv-install"
make
make install
popd

# Enable the extension in the php.ini file reported by php_ini_loaded_file().
echo "extension=uv.so" >> "$(php -r 'echo php_ini_loaded_file();')"
docs/.shared 0000777 00000000000 13755316542 0006760 5 ustar 00