.editorconfig 0000644 00000000203 13755316336 0007230 0 ustar 00 root = true [*] end_of_line = lf insert_final_newline = true trim_trailing_whitespace = true indent_style = space charset = utf-8 .gitmodules 0000644 00000000137 13755316336 0006736 0 ustar 00 [submodule "docs/shared"] path = docs/.shared url = https://github.com/amphp/amphp.github.io .php_cs.dist 0000644 00000002563 13755316336 0007005 0 ustar 00 setRiskyAllowed(true) ->setRules([ "@PSR1" => true, "@PSR2" => true, "braces" => [ "allow_single_line_closure" => true, "position_after_functions_and_oop_constructs" => "same", ], "array_syntax" => ["syntax" => "short"], "cast_spaces" => true, "combine_consecutive_unsets" => true, "function_to_constant" => true, "no_multiline_whitespace_before_semicolons" => true, "no_unused_imports" => true, "no_useless_else" => true, "no_useless_return" => true, "no_whitespace_before_comma_in_array" => true, "no_whitespace_in_blank_line" => true, "non_printable_character" => true, "normalize_index_brace" => true, "ordered_imports" => true, "php_unit_construct" => true, "php_unit_dedicate_assert" => true, "php_unit_fqcn_annotation" => true, "phpdoc_summary" => true, "phpdoc_types" => true, "psr4" => true, "return_type_declaration" => ["space_before" => "none"], "short_scalar_cast" => true, "single_blank_line_before_namespace" => true, ]) ->setFinder( PhpCsFixer\Finder::create() ->in(__DIR__ . "/examples") ->in(__DIR__ . "/lib") ->in(__DIR__ . "/test") ); CHANGELOG.md 0000644 00000010350 13755316336 0006370 0 ustar 00 ### 2.0.3 - `Loop::set()` replaces the current driver with a dummy driver for the time of `gc_collect_cycles()` now. This allows cyclic references to be cleaned up properly before the new driver is set. Without such a fix, cyclic references might have been cleaned up later, e.g. cancelling their watcher IDs on the new loop, thereby cancelling the wrong watchers. 
- Promise combinator functions (`all()`, `any()`, `first()`, `some()`) now preserve order of the given `$promises` array argument. ### 2.0.2 - Fixed warnings and timers in `EventDriver`. - No longer hides warnings from `stream_select`. ### 2.0.1 - Fixed an issue where the loop blocks even though all watchers are unreferenced. 2.0.0 ----- * `Amp\reactor()` has been replaced with `Amp\Loop::set()` and `Amp\Loop::get()`. * `Amp\driver()` has been replaced with `Amp\Loop\Factory::create()`. * `Amp\tick()` no longer exists and doesn't have a replacement. Ticks are an internal detail. * Functions for creating and managing watchers are now static methods of `Amp\Loop` instead of functions in the `Amp` namespace. * `once()` is now `delay()` and `immediately()` is `defer()`. * Parameter order for `delay()` and `repeat()` has been changed. * `reference()` and `unreference()` have been added. * `Amp\Pause` has been renamed to `Amp\Delayed` and accepts an optional resolution value now. Additionally `reference()` and `unreference()` methods have been added. * Promise accepting functions have been moved to the `Amp\Promise` namespace. * `Amp\Promise\some()` accepts an additional `$required` parameter. * `Amp\call()`, `Amp\asyncCall()`, `Amp\coroutine()` and `Amp\asyncCoroutine()` have been added. * `Amp\resolve()` has been removed, use `Amp\call()` instead. * `Promise::when()` has been renamed to `Promise::onResolve()` * `Promise::watch()` has been removed, use `Amp\Iterator`, [`amphp/byte-stream`](https://github.com/amphp/byte-stream) or a custom implementation that implements `Amp\Promise` instead and provides dedicated APIs to access the data previously shared via the `watch()` mechanism. * `Amp\Iterator`, `Amp\Emitter` and `Amp\Producer` have been added with several functions in the `Amp\Iterator` namespace. * Various other changes. ### 1.2.2 - Fix notice in `NativeReactor` when removing a handle while an event is waiting for it. 
(Regression fix from 1.1.1) ### 1.2.1 - Fix `uv_run()` potentially exiting earlier than intended, leading to an infinite loop in `UvReactor::run()`. 1.2.0 ----- - `resolve()` now also accepts callables returning generators. ### 1.1.1 - Fix memory leak in `NativeReactor`, retaining an empty array for each stream. - Remove circular references in `UvReactor` to avoid garbage collector calls. 1.1.0 ----- - Add `getExceptions()` method to `CombinatorException` to get an array of all the exceptions (affecting `some()` and `first()`). - Fix `NativeReactor` not ending up in stopped state if primary callback didn't install any events. ### 1.0.8 - Fix `NativeReactor` running a busy loop if no timers are active. Properly block now in NativeReactor inside `stream_select()`. ### 1.0.7 - Several combinator functions could result in a promise already resolved exception in case some values of the array weren't promises. ### 1.0.6 - Fix issue in `NativeReactor` causing `stop()` to be delayed by one second. ### 1.0.5 - Convert general `RuntimeException` to more specific `Amp\CombinatorException`. ### 1.0.4 - Repeat watchers in `LibeventReactor` internally were handled in microsecond intervals instead of milliseconds. ### 1.0.3 - Fix issue in `NativeReactor` capable of causing keep alive counter corruption when a watcher was cancelled inside its own callback. - Fix issue in `UvReactor` with `libuv` >= 1.1.0 causing busy loop with immediates present, but no watchers being triggered. ### 1.0.2 - Fix PHP 7 issue in which top-level `Throwable`s weren't caught in certain coroutine contexts. - Remove error suppression operator on optionally `null` option assignment to avoid spurious `E_NOTICE` output when custom error handlers are used. ### 1.0.1 - Fix bug preventing `UvReactor::tick()` from returning when no events are ready for a single active IO watcher. 
1.0.0 ----- - Initial stable API release CONTRIBUTING.md 0000644 00000002332 13755316336 0007011 0 ustar 00 ## Submitting useful bug reports Please search existing issues first to make sure this is not a duplicate. Every issue report has a cost for the developers required to field it; be respectful of others' time and ensure your report isn't spurious prior to submission. Please adhere to [sound bug reporting principles](http://www.chiark.greenend.org.uk/~sgtatham/bugs.html). ## Development ideology Truths which we believe to be self-evident: - **It's an asynchronous world.** Be wary of anything that undermines async principles. - **The answer is not more options.** If you feel compelled to expose new preferences to the user it's very possible you've made a wrong turn somewhere. - **There are no power users.** The idea that some users "understand" concepts better than others has proven to be, for the most part, false. If anything, "power users" are more dangerous than the rest, and we should avoid exposing dangerous functionality to them. 
## Code style The amphp project adheres to the PSR-2 style guide with the exception that opening braces for classes and methods must appear on the same line as the declaration: https://github.com/php-fig/fig-standards/blob/master/accepted/PSR-2-coding-style-guide.md LICENSE 0000644 00000002161 13755316336 0005565 0 ustar 00 The MIT License (MIT) Copyright (c) 2015-2017 amphp Copyright (c) 2016 PHP Asynchronous Interoperability Group Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. Makefile 0000644 00000001517 13755316336 0006224 0 ustar 00 PHP_BIN := php COMPOSER_BIN := composer COVERAGE = coverage SRCS = lib test find_php_files = $(shell find $(1) -type f -name "*.php") src = $(foreach d,$(SRCS),$(call find_php_files,$(d))) .PHONY: test test: setup phpunit code-style .PHONY: clean clean: clean-coverage clean-vendor .PHONY: clean-coverage clean-coverage: test ! -e coverage || rm -r coverage .PHONY: clean-vendor clean-vendor: test ! 
-e vendor || rm -r vendor .PHONY: setup setup: vendor/autoload.php .PHONY: deps-update deps-update: $(COMPOSER_BIN) update .PHONY: phpunit phpunit: setup $(PHP_BIN) vendor/bin/phpunit .PHONY: code-style code-style: setup PHP_CS_FIXER_IGNORE_ENV=1 $(PHP_BIN) vendor/bin/php-cs-fixer --diff -v fix composer.lock: composer.json $(COMPOSER_BIN) install touch $@ vendor/autoload.php: composer.lock $(COMPOSER_BIN) install touch $@ README.md 0000644 00000004215 13755316336 0006041 0 ustar 00 [![Amp](https://raw.githubusercontent.com/amphp/logo/master/repos/amp.png?v=16-07-2017-18-21-00)](http://amphp.org)
Amp is a non-blocking concurrency framework for PHP. It provides an event loop, promises and streams as a base for asynchronous programming. Promises in combination with generators are used to build coroutines, which allow writing asynchronous code just like synchronous code, without any callbacks. ## Installation This package can be installed as a [Composer](https://getcomposer.org/) dependency. ```bash composer require amphp/amp ``` ## Requirements - PHP 7.0+ ##### Optional Extension Backends Extensions are only needed if your app necessitates a high number of concurrent socket connections. - [ev](https://pecl.php.net/package/ev) - [event](https://pecl.php.net/package/event) - [php-uv](https://github.com/bwoebi/php-uv) (experimental fork) ## Documentation Documentation is bundled within this repository in the [`./docs`](./docs) directory. ## Versioning `amphp/amp` follows the [semver](http://semver.org/) semantic versioning specification like all other `amphp` packages. | Version | Bug Fixes Until | Security Fixes Until | | ------- | --------------- | -------------------- | | 2.x | TBA | TBA | | 1.x | 2017-12-31 | 2018-12-31 | ## Compatible Packages Compatible packages should use the [`amphp`](https://github.com/search?utf8=%E2%9C%93&q=topic%3Aamphp) topic on GitHub. ## Security If you discover any security-related issues, please email [`bobwei9@hotmail.com`](mailto:bobwei9@hotmail.com) or [`me@kelunik.com`](mailto:me@kelunik.com) instead of using the issue tracker. ## License The MIT License (MIT). Please see [`LICENSE`](./LICENSE) for more information. 
composer.json 0000644 00000002757 13755316336 0007315 0 ustar 00 { "name": "amphp/amp", "homepage": "http://amphp.org/amp", "description": "A non-blocking concurrency framework for PHP applications.", "keywords": [ "async", "asynchronous", "concurrency", "promise", "awaitable", "future", "non-blocking", "event", "event-loop" ], "license": "MIT", "authors": [ { "name": "Daniel Lowrey", "email": "rdlowrey@php.net" }, { "name": "Aaron Piotrowski", "email": "aaron@trowski.com" }, { "name": "Bob Weinand", "email": "bobwei9@hotmail.com" }, { "name": "Niklas Keller", "email": "me@kelunik.com" } ], "require": { "php": ">=7" }, "require-dev": { "amphp/phpunit-util": "^1", "react/promise": "^2", "friendsofphp/php-cs-fixer": "^2.3", "phpunit/phpunit": "^6.0.9" }, "autoload": { "psr-4": { "Amp\\": "lib" }, "files": [ "lib/functions.php", "lib/Internal/functions.php" ] }, "autoload-dev": { "psr-4": { "Amp\\Test\\": "test" } }, "support": { "issues": "https://github.com/amphp/amp/issues", "irc": "irc://irc.freenode.org/amphp" }, "extra": { "branch-alias": { "dev-master": "2.0.x-dev" } } } docs/Gemfile 0000644 00000000171 13755316336 0007002 0 ustar 00 source "https://rubygems.org" gem "github-pages" gem "kramdown" gem "jekyll-github-metadata" gem "jekyll-relative-links" docs/Gemfile.lock 0000644 00000012340 13755316336 0007732 0 ustar 00 GEM remote: https://rubygems.org/ specs: activesupport (4.2.8) i18n (~> 0.7) minitest (~> 5.1) thread_safe (~> 0.3, >= 0.3.4) tzinfo (~> 1.1) addressable (2.5.1) public_suffix (~> 2.0, >= 2.0.2) coffee-script (2.4.1) coffee-script-source execjs coffee-script-source (1.12.2) colorator (1.1.0) ethon (0.10.1) ffi (>= 1.3.0) execjs (2.7.0) faraday (0.12.1) multipart-post (>= 1.2, < 3) ffi (1.9.18) forwardable-extended (2.6.0) gemoji (3.0.0) github-pages (138) activesupport (= 4.2.8) github-pages-health-check (= 1.3.3) jekyll (= 3.4.3) jekyll-avatar (= 0.4.2) jekyll-coffeescript (= 1.0.1) jekyll-default-layout (= 0.1.4) jekyll-feed (= 0.9.2) 
jekyll-gist (= 1.4.0) jekyll-github-metadata (= 2.3.1) jekyll-mentions (= 1.2.0) jekyll-optional-front-matter (= 0.1.2) jekyll-paginate (= 1.1.0) jekyll-readme-index (= 0.1.0) jekyll-redirect-from (= 0.12.1) jekyll-relative-links (= 0.4.0) jekyll-sass-converter (= 1.5.0) jekyll-seo-tag (= 2.2.3) jekyll-sitemap (= 1.0.0) jekyll-swiss (= 0.4.0) jekyll-theme-architect (= 0.0.4) jekyll-theme-cayman (= 0.0.4) jekyll-theme-dinky (= 0.0.4) jekyll-theme-hacker (= 0.0.4) jekyll-theme-leap-day (= 0.0.4) jekyll-theme-merlot (= 0.0.4) jekyll-theme-midnight (= 0.0.4) jekyll-theme-minimal (= 0.0.4) jekyll-theme-modernist (= 0.0.4) jekyll-theme-primer (= 0.1.8) jekyll-theme-slate (= 0.0.4) jekyll-theme-tactile (= 0.0.4) jekyll-theme-time-machine (= 0.0.4) jekyll-titles-from-headings (= 0.1.5) jemoji (= 0.8.0) kramdown (= 1.13.2) liquid (= 3.0.6) listen (= 3.0.6) mercenary (~> 0.3) minima (= 2.1.1) rouge (= 1.11.1) terminal-table (~> 1.4) github-pages-health-check (1.3.3) addressable (~> 2.3) net-dns (~> 0.8) octokit (~> 4.0) public_suffix (~> 2.0) typhoeus (~> 0.7) html-pipeline (2.6.0) activesupport (>= 2) nokogiri (>= 1.4) i18n (0.8.1) jekyll (3.4.3) addressable (~> 2.4) colorator (~> 1.0) jekyll-sass-converter (~> 1.0) jekyll-watch (~> 1.1) kramdown (~> 1.3) liquid (~> 3.0) mercenary (~> 0.3.3) pathutil (~> 0.9) rouge (~> 1.7) safe_yaml (~> 1.0) jekyll-avatar (0.4.2) jekyll (~> 3.0) jekyll-coffeescript (1.0.1) coffee-script (~> 2.2) jekyll-default-layout (0.1.4) jekyll (~> 3.0) jekyll-feed (0.9.2) jekyll (~> 3.3) jekyll-gist (1.4.0) octokit (~> 4.2) jekyll-github-metadata (2.3.1) jekyll (~> 3.1) octokit (~> 4.0, != 4.4.0) jekyll-mentions (1.2.0) activesupport (~> 4.0) html-pipeline (~> 2.3) jekyll (~> 3.0) jekyll-optional-front-matter (0.1.2) jekyll (~> 3.0) jekyll-paginate (1.1.0) jekyll-readme-index (0.1.0) jekyll (~> 3.0) jekyll-redirect-from (0.12.1) jekyll (~> 3.3) jekyll-relative-links (0.4.0) jekyll (~> 3.3) jekyll-sass-converter (1.5.0) sass (~> 3.4) jekyll-seo-tag 
(2.2.3) jekyll (~> 3.3) jekyll-sitemap (1.0.0) jekyll (~> 3.3) jekyll-swiss (0.4.0) jekyll-theme-architect (0.0.4) jekyll (~> 3.3) jekyll-theme-cayman (0.0.4) jekyll (~> 3.3) jekyll-theme-dinky (0.0.4) jekyll (~> 3.3) jekyll-theme-hacker (0.0.4) jekyll (~> 3.3) jekyll-theme-leap-day (0.0.4) jekyll (~> 3.3) jekyll-theme-merlot (0.0.4) jekyll (~> 3.3) jekyll-theme-midnight (0.0.4) jekyll (~> 3.3) jekyll-theme-minimal (0.0.4) jekyll (~> 3.3) jekyll-theme-modernist (0.0.4) jekyll (~> 3.3) jekyll-theme-primer (0.1.8) jekyll (~> 3.3) jekyll-theme-slate (0.0.4) jekyll (~> 3.3) jekyll-theme-tactile (0.0.4) jekyll (~> 3.3) jekyll-theme-time-machine (0.0.4) jekyll (~> 3.3) jekyll-titles-from-headings (0.1.5) jekyll (~> 3.3) jekyll-watch (1.5.0) listen (~> 3.0, < 3.1) jemoji (0.8.0) activesupport (~> 4.0) gemoji (~> 3.0) html-pipeline (~> 2.2) jekyll (>= 3.0) kramdown (1.13.2) liquid (3.0.6) listen (3.0.6) rb-fsevent (>= 0.9.3) rb-inotify (>= 0.9.7) mercenary (0.3.6) mini_portile2 (2.1.0) minima (2.1.1) jekyll (~> 3.3) minitest (5.10.2) multipart-post (2.0.0) net-dns (0.8.0) nokogiri (1.7.2) mini_portile2 (~> 2.1.0) octokit (4.7.0) sawyer (~> 0.8.0, >= 0.5.3) pathutil (0.14.0) forwardable-extended (~> 2.6) public_suffix (2.0.5) rb-fsevent (0.9.8) rb-inotify (0.9.8) ffi (>= 0.5.0) rouge (1.11.1) safe_yaml (1.0.4) sass (3.4.24) sawyer (0.8.1) addressable (>= 2.3.5, < 2.6) faraday (~> 0.8, < 1.0) terminal-table (1.8.0) unicode-display_width (~> 1.1, >= 1.1.1) thread_safe (0.3.6) typhoeus (0.8.0) ethon (>= 0.8.0) tzinfo (1.2.3) thread_safe (~> 0.1) unicode-display_width (1.2.1) PLATFORMS ruby DEPENDENCIES github-pages jekyll-github-metadata jekyll-relative-links kramdown BUNDLED WITH 1.15.0 docs/README.md 0000644 00000001120 13755316336 0006761 0 ustar 00 # Documentation This directory contains the documentation for `amphp/amp`. Documentation and code are bundled within a single repository for easier maintenance. Additionally, this preserves the documentation for older versions. 
## Reading You can read this documentation either directly on GitHub or on our website. While the website will always contain the latest version, viewing on GitHub also works with older versions. ## Writing Our documentation is built using Jekyll. ``` sudo gem install bundler jekyll ``` ``` bundle install --path vendor/bundle bundle exec jekyll serve ``` docs/_config.yml 0000644 00000000747 13755316336 0007647 0 ustar 00 kramdown: input: GFM toc_levels: 2..3 baseurl: "/amp" layouts_dir: ".shared/layout" includes_dir: ".shared/includes" exclude: ["Gemfile", "Gemfile.lock", "README.md", "vendor"] safe: true repository: amphp/amp gems: - "jekyll-github-metadata" - "jekyll-relative-links" defaults: - scope: path: "" type: "pages" values: layout: "docs" shared_asset_path: "/amp/asset" navigation: - event-loop - promises - coroutines - iterators - cancellation - utils docs/cancellation/README.md 0000644 00000004072 13755316336 0011426 0 ustar 00 --- layout: docs title: Cancellation permalink: /cancellation/ --- Amp provides primitives to allow the cancellation of operations, namely `CancellationTokenSource` and `CancellationToken`. ```php $tokenSource = new CancellationTokenSource; $promise = asyncRequest("...", $tokenSource->getToken()); Loop::delay(1000, function () use ($tokenSource) { $tokenSource->cancel(); }); $result = yield $promise; ``` Every operation that supports cancellation accepts an instance of `CancellationToken` as an (optional) argument. Within a coroutine, `$token->throwIfRequested()` can be used to fail the operation with a `CancelledException`. As `$token` is often an optional parameter and might be `null`, these calls need to be guarded with an `if ($token)` or similar check. Instead of doing so, it's often easier to simply set the token to `$token = $token ?? new NullCancellationToken` at the beginning of the method. While `throwIfRequested()` works well within coroutines, some operations might want to subscribe with a callback instead. 
They can do so using `CancellationToken::subscribe()` to subscribe to any cancellation requests that might happen. If the operation consists of any sub-operations that support cancellation, it passes that same `CancellationToken` instance down to these sub-operations. The original caller creates a `CancellationToken` by creating an instance of `CancellationTokenSource` and passing `$cancellationTokenSource->getToken()` to the operation as shown in the above example. Only the original caller has access to the `CancellationTokenSource` and can cancel the operation using `CancellationTokenSource::cancel()`, similar to the way it works with `Deferred` and `Promise`. {:.note} > Cancellations are advisory only. A DNS resolver might ignore cancellation requests after the query has been sent as the response has to be processed anyway and can still be cached. An HTTP client might continue a nearly finished HTTP request to reuse the connection, but might abort a chunked encoding response as it cannot know whether continuing is actually cheaper than aborting. docs/coroutines/README.md 0000644 00000005272 13755316336 0011167 0 ustar 00 --- layout: docs title: Coroutines permalink: /coroutines/ --- Coroutines are interruptible functions. In PHP they can be implemented using [generators](http://php.net/manual/en/language.generators.overview.php). While generators are usually used to implement simple iterators and yielding elements using the `yield` keyword, Amp uses `yield` as interruption points. When a coroutine yields a value, execution of the coroutine is temporarily interrupted, allowing other tasks to be run, such as I/O handlers, timers, or other coroutines. 
## Yield Behavior All yields must be one of the following three types: | Yieldable | Description | | --------------| ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | `Amp\Promise` | Any promise instance may be yielded and control will be returned to the coroutine once the promise resolves. If resolution fails the relevant exception is thrown into the generator and must be handled by the application or it will bubble up. If resolution succeeds the promise's resolved value is sent back into the generator. | | `React\Promise\PromiseInterface` | Same as `Amp\Promise`. Any React promise will automatically be adapted to an Amp promise. | | `array` | Yielding an array of promises combines them implicitly using `Amp\Promise\all()`. An array with elements not being promises will result in an `Amp\InvalidYieldError`. | ## Example ```php socket = $sock; $readWatcher = Loop::onReadable($sock, function () use ($client) { $this->onReadable($client); }); $writeWatcher = Loop::onWritable($sock, function () use ($client) { $this->doWrite($client); }); Loop::disable($writeWatcher); // <-- let's initialize the watcher as "disabled" $client->readWatcher = $readWatcher; $client->writeWatcher = $writeWatcher; $this->clients[$socketId] = $client; } // ... other class implementation details here ... 
private function writeToClient($client, $data) { $client->writeBuffer .= $data; $this->doWrite($client); } private function doWrite(ClientStruct $client) { $bytesToWrite = strlen($client->writeBuffer); $bytesWritten = @fwrite($client->socket, $client->writeBuffer); if ($bytesToWrite === $bytesWritten) { Loop::disable($client->writeWatcher); } elseif ($bytesWritten >= 0) { $client->writeBuffer = substr($client->writeBuffer, $bytesWritten); Loop::enable($client->writeWatcher); } elseif ($this->isSocketDead($client->socket)) { $this->unloadClient($client); } } // ... other class implementation details here ... } ``` ### `cancel()` It's important to *always* cancel persistent watchers once you're finished with them or you'll create memory leaks in your application. This functionality works in exactly the same way as the above `enable` / `disable` examples: ```php = 3) { Loop::cancel($watcherId); // <-- cancel myself! } }); ``` It is also always safe to cancel a watcher from multiple places. A double-cancel will simply be ignored. ### An Important Note on Writability Because streams are essentially *"always"* writable you should only enable writability watchers while you have data to send. If you leave these watchers enabled when your application doesn't have anything to write the watcher will trigger endlessly until disabled or canceled. This will max out your CPU. If you're seeing inexplicably high CPU usage in your application it's a good bet you've got a writability watcher that you failed to disable or cancel after you were finished with it. A standard pattern in this area is to initialize writability watchers in a disabled state before subsequently enabling them at a later time as shown here: ```php advance()) { $element = $iterator->getCurrent(); // do something with $element } ``` ## Iterator Creation ### Emitter What `Deferred` is for promises, is `Emitter` for iterators. 
A library that returns an `Iterator` for asynchronous consumption of an iterable result creates an `Amp\Emitter` and returns the `Iterator` using `iterate()`. This ensures a consumer can only consume the iterator, but not emit values or complete the iterator. #### `emit()` `emit()` emits a new value to the `Iterator`, which can be consumed by a consumer. The emitted value is passed as first argument to `emit()`. `emit()` returns a `Promise` that can be waited on before emitting new values. This allows emitting values just as fast as the consumer can consume them. #### `complete()` `complete()` marks the `Emitter` / linked `Iterator` as complete. No further emits are allowed after completing an `Emitter` / `Iterator`. ### Producer `Producer` is a simplified form of `Emitter` that can be used when a single coroutine can emit all values. `Producer` accepts a `callable` as first constructor parameter that gets run as a coroutine and passed an `$emit` callable that can be used to emit values just like the `emit()` method does in `Emitter`. #### Example ```php $iterator = new Producer(function (callable $emit) { yield $emit(1); yield $emit(new Delayed(500, 2)); yield $emit(3); yield $emit(4); }); ``` ### `fromIterable` Iterators can also be created from ordinary PHP arrays or `Traversable` instances, which is mainly useful in tests, but might also be used for the same reasons as `Success` and `Failure`. ```php function fromIterable($iterable, int $delay = 0) { ... } ``` `$delay` allows adding a delay between each emission. docs/iterators/combinators.md 0000644 00000001305 13755316336 0012365 0 ustar 00 --- layout: docs title: Iterator Combination permalink: /iterators/combinators --- Amp provides two common combination helpers for iterators: `concat` and `merge`. ## `concat()` `concat()` accepts an array of `Iterator` instances and concatenates the given iterators into a single iterator, emitting values from a single iterator at a time. 
The prior iterator must complete before values are emitted from any subsequent iterators. Iterators are concatenated in the order given (iteration order of the array). ## `merge()` `merge()` accepts an array of `Iterator` instances and creates an `Iterator` that emits values emitted from any iterator in the array of iterators ending once all emitters completed. docs/iterators/transformation.md 0000644 00000001107 13755316336 0013113 0 ustar 00 --- layout: docs title: Iterator Transformation permalink: /iterators/transformation --- Amp provides two common transformation helpers for iterators: `map` and `filter`. Further primitives are very easy to implement using `Producer` with those two as examples. ## `map()` `map()` accepts an `Iterator` and a `callable` `$onEmit` that can transform each value into another value. ## `filter()` `filter()` accepts an `Iterator` and a `callable` `$filter`. If `$filter($value)` returns `false` the value gets filtered, otherwise the value is retained in the resulting `Iterator`. docs/promises/README.md 0000644 00000012125 13755316336 0010631 0 ustar 00 --- layout: docs title: Promises permalink: /promises/ --- The basic unit of concurrency in Amp applications is the `Amp\Promise`. These objects should be thought of as placeholders for values or tasks that aren't yet complete. By using placeholders we're able to reason about the results of concurrent operations as if they were already complete variables. {:.note} > Amp's `Promise` interface **does not** conform to the "Thenables" abstraction common in JavaScript promise implementations. Chaining `.then()` calls is a suboptimal method for avoiding callback hell in a world with generator coroutines. Instead, Amp utilizes PHP generators to "synchronize" concurrent task execution. > > However, as ReactPHP is another wide-spread implementation, we also accept any `React\Promise\PromiseInterface` where we accept instances of `Amp\Promise`. 
In case of custom implementations not implementing `React\Promise\PromiseInterface`, `Amp\Promise\adapt` can be used to adapt any object having a `then` or `done` method. ## Promise Consumption ```php interface Promise { public function onResolve(callable $onResolve); } ``` In its simplest form the `Amp\Promise` aggregates callbacks for dealing with results once they eventually resolve. While most code will not interact with this API directly thanks to [coroutines](../coroutines/), let's take a quick look at the one simple API method exposed on `Amp\Promise` implementations: | Parameter | Callback Signature | | ------------ | ------------------------------------------ | | `$onResolve` | `function ($error = null, $result = null)` | `Amp\Promise::onResolve()` accepts an error-first callback. This callback is responsible for reacting to the eventual result represented by the promise placeholder. For example: ```php onResolve(function (Throwable $error = null, $result = null) { if ($error) { printf( "Something went wrong:\n%s\n", $error->getMessage() ); } else { printf( "Hurray! Our result is:\n%s\n", print_r($result, true) ); } }); ``` Those familiar with JavaScript code generally reflect that the above interface quickly devolves into ["callback hell"](http://callbackhell.com/), and they're correct. We will shortly see how to avoid this problem in the [coroutines](../coroutines/README.md) section. ## Promise Creation `Amp\Deferred` is the abstraction responsible for resolving future values once they become available. A library that resolves values asynchronously creates an `Amp\Deferred` and uses it to return an `Amp\Promise` to API consumers. Once the async library determines that the value is ready it resolves the promise held by the API consumer using methods on the linked promisor. 
```php final class Deferred { public function promise(): Promise; public function resolve($result = null); public function fail(Throwable $error); } ``` #### `promise()` Returns the corresponding `Promise` instance. `Deferred` and `Promise` are separated, so the consumer of the promise can't fulfill it. You should always return `$deferred->promise()` to API consumers. If you're passing `Deferred` objects around, you're probably doing something wrong. {:.note} > This separation of concerns is generally a good thing. However, creating two objects instead of one for each fundamental placeholder is a measurable performance penalty. For that reason, this separation only exists if assertions are enabled to ensure the code does what it's expected to do. `Deferred` directly implements `Promise` if assertions are disabled. #### `resolve()` Resolves the promise with the first parameter as value, otherwise `null`. If a `Amp\Promise` is passed, the resolution will wait until the passed promise has been resolved. Invokes all registered `Promise::onResolve()` callbacks. #### `fail()` Makes the promise fail. Invokes all registered `Promise::onResolve()` callbacks with the passed `Throwable` as `$error` argument. Here's a simple example of an async value producer `asyncMultiply()` creating a deferred and returning the associated promise to its API consumer. ```php resolve($x * $y); }); return $deferred->promise(); } $promise = asyncMultiply(6, 7); $result = Amp\Promise\wait($promise); var_dump($result); // int(42) ``` ### Success and Failure Sometimes values are immediately available. This might be due to them being cached, but can also be the case if an interface mandates a promise to be returned to allow for async I/O but the specific implementation always having the result directly available. In these cases `Amp\Success` and `Amp\Failure` can be used to construct an immediately resolved promise. `Amp\Success` accepts a resolution value. 
`Amp\Failure` accepts an exception as failure reason. docs/promises/combinators.md 0000644 00000005212 13755316336 0012213 0 ustar 00 --- layout: docs title: Promise Combinators permalink: /promises/combinators --- Multiple promises can be combined into a single promise using different functions. ## `all()` `Amp\Promise\all()` combines an array of promise objects into a single promise that will resolve when all promises in the group resolve. If any one of the `Amp\Promise` instances fails the combinator's `Promise` will fail. Otherwise the resulting `Promise` succeeds with an array matching keys from the input array to their resolved values. The `all()` combinator is extremely powerful because it allows us to concurrently execute many asynchronous operations at the same time. Let's look at a simple example using the Amp HTTP client ([Artax](https://github.com/amphp/artax)) to retrieve multiple HTTP resources concurrently: ```php "http://www.google.com", "news" => "http://news.google.com", "bing" => "http://www.bing.com", "yahoo" => "https://www.yahoo.com", ]; try { // magic combinator sauce to flatten the promise // array into a single promise. // yielding an array is an implicit "yield Amp\Promise\all($array)". $responses = yield array_map(function ($uri) use ($httpClient) { return $httpClient->request($uri); }, $uris); foreach ($responses as $key => $response) { printf( "%s | HTTP/%s %d %s\n", $key, $response->getProtocolVersion(), $response->getStatus(), $response->getReason() ); } } catch (Amp\MultiReasonException $e) { // If any one of the requests fails the combo will fail and // be thrown back into our generator. echo $e->getMessage(), "\n"; } Loop::stop(); }); ``` ### `some()` `Amp\Promise\some()` is the same as `all()` except that it tolerates individual failures. As long as at least one promise in the passed array succeeds, the combined promise will succeed. The successful resolution value is an array of the form `[$arrayOfErrors, $arrayOfValues]`. 
The individual keys in the component arrays are preserved from the promise array passed to the functor for evaluation. ### `any()` `Amp\Promise\any()` is the same as `some()` except that it tolerates all failures. It will succeed even if all promises failed. ### `first()` `Amp\Promise\first()` resolves with the first successful result. The resulting promise will only fail if all promises in the group fail or if the promise array is empty. docs/promises/miscellaneous.md 0000644 00000002363 13755316336 0012542 0 ustar 00 --- layout: docs title: Promise Helpers permalink: /promises/miscellaneous --- Amp offers some small promise helpers, namely * `Amp\Promise\rethrow()` * `Amp\Promise\timeout()` * `Amp\Promise\wait()` ## `rethrow()` `rethrow()` subscribes to the passed `Promise` and forwards all errors to the event loop's error handler. That handler can log these failures; if no such handler exists, the event loop will stop. `rethrow()` is useful whenever you want to fire and forget, but still care about any errors that happen. ## `timeout()` `timeout()` applies a timeout to the passed promise, either resolving with the original value or error reason in case the promise resolves within the timeout period, or otherwise failing the returned promise with an `Amp\TimeoutException`. Note that `timeout()` does not cancel any operation or free any resources. If available, use dedicated API options instead, e.g. for socket connect timeouts. ## `wait()` `wait()` can be used to synchronously wait for a promise to resolve. It returns the result value or throws an exception in case of an error. `wait()` blocks and calls `Loop::run()` internally. It SHOULD NOT be used in fully asynchronous applications, but only when integrating async APIs into an otherwise synchronous application. 
docs/utils/README.md 0000644 00000000505 13755316336 0010127 0 ustar 00 --- layout: docs title: Utils permalink: /utils/ --- This documentation section deals with generic helpers that are not async-specific. * [`CallableMaker`](./callable-maker.md) * [`Struct`](./struct.md) Further utils for PHPUnit are provided in [`amphp/phpunit-util`](https://github.com/amphp/phpunit-util). docs/utils/callable-maker.md 0000644 00000001511 13755316336 0012024 0 ustar 00 --- layout: docs title: CallableMaker permalink: /utils/callable-maker --- `Amp\CallableMaker` is a helper trait that allows creating closures from private / protected static and instance methods in an easy way. Creating such callables might be necessary to register private / protected methods as callbacks in an efficient manner without making those methods public. This trait should only be used in projects with a PHP 7.0 minimum requirement. If PHP 7.1 or later is the minimum requirement, `Closure::fromCallable` should be used directly. ## `callableFromInstanceMethod()` Creates a `Closure` from an instance method with the given name and returns it. The closure can be passed around without worrying about the method's visibility. ## `callableFromStaticMethod()` Same as `callableFromInstanceMethod()`, but for static methods. docs/utils/struct.md 0000644 00000003662 13755316336 0010525 0 ustar 00 --- layout: docs title: Struct permalink: /utils/struct --- A struct is a generic computer science term for an object composed of public properties. The `\Amp\Struct` trait is intended to make using public properties a little safer by throwing an `\Error` when undefined properties are attempted to be written or read. PHP allows for dynamic creation of public properties. This can lead to insidious bugs created by typos related to writing to and reading from public properties. 
One common solution to this problem is to make all properties private and provide public setter and getter methods which control access to the underlying properties. However effective this solution may be, it requires that additional code be written and subsequently tested for the setter and getter methods. Let's try some examples with anonymous classes to demonstrate the advantages of using the `\Amp\Struct` trait. Running the following code will not error; although, the typo will likely create some unexpected behavior: ```php $obj = new class { public $foo = null; }; $obj->fooo = "bar"; ``` If you were to access the `$foo` property of the `$obj` object after the above code, you might expect the value to be `"bar"` when it would actually be `NULL`. When a class uses the `\Amp\Struct` trait, an `\Error` will be thrown when attempting to access a property not defined in the class definition. For example, the code below will throw an `\Error` with some context that attempts to help diagnose the issue. ```php $obj = new class { use Amp\Struct; public $foo = null; }; $obj->fooo = "bar"; ``` The message for the thrown `\Error` will be similar to: *Uncaught Error: class@anonymous@php shell code0x10ee8005b property "fooo" does not exist ... did you mean "foo?"* Although, an `\Error` being thrown in your code may cause some havoc, it will not allow for unpredictable behavior caused by the use of properties which are not part of the class definition. lib/CallableMaker.php 0000644 00000004634 13755316336 0010525 0 ustar 00 getMethod($method); } return self::$__reflectionMethods[$method]->getClosure($this); } /** * Creates a callable from a protected or private static method that may be invoked by methods requiring a * publicly invokable callback. * * @param string $method Static method name. 
* * @return callable */ private static function callableFromStaticMethod(string $method): callable { if (!isset(self::$__reflectionMethods[$method])) { if (self::$__reflectionClass === null) { self::$__reflectionClass = new \ReflectionClass(self::class); } self::$__reflectionMethods[$method] = self::$__reflectionClass->getMethod($method); } return self::$__reflectionMethods[$method]->getClosure(); } } } else { trait CallableMaker { /** * @deprecated Use \Closure::fromCallable() instead of this method in PHP 7.1. */ private function callableFromInstanceMethod(string $method): callable { return \Closure::fromCallable([$this, $method]); } /** * @deprecated Use \Closure::fromCallable() instead of this method in PHP 7.1. */ private static function callableFromStaticMethod(string $method): callable { return \Closure::fromCallable([self::class, $method]); } } } // @codeCoverageIgnoreEnd lib/CancellationToken.php 0000644 00000002662 13755316336 0011442 0 ustar 00 getToken(); * * $response = yield $httpClient->request("https://example.com/stream", $token); * $responseBody = $response->getBody(); * * while (($chunk = yield $response->read()) !== null) { * // consume $chunk * * if ($noLongerInterested) { * $cancellationTokenSource->cancel(); * break; * } * } * ``` * * @see CancellationToken * @see CancelledException */ final class CancellationTokenSource { private $token; private $onCancel; public function __construct() { $this->token = new class($this->onCancel) implements CancellationToken { /** @var string */ private $nextId = "a"; /** @var callable[] */ private $callbacks = []; /** @var \Throwable|null */ private $exception = null; public function __construct(&$onCancel) { $onCancel = function (\Throwable $exception) { $this->exception = $exception; $callbacks = $this->callbacks; $this->callbacks = []; foreach ($callbacks as $callback) { $this->invokeCallback($callback); } }; } private function invokeCallback($callback) { // No type declaration to prevent exception 
outside the try! try { $result = $callback($this->exception); if ($result instanceof \Generator) { $result = new Coroutine($result); } if ($result instanceof Promise || $result instanceof ReactPromise) { rethrow($result); } } catch (\Throwable $exception) { Loop::defer(static function () use ($exception) { throw $exception; }); } } public function subscribe(callable $callback): string { $id = $this->nextId++; if ($this->exception) { $this->invokeCallback($callback); } else { $this->callbacks[$id] = $callback; } return $id; } public function unsubscribe(string $id) { unset($this->callbacks[$id]); } public function isRequested(): bool { return isset($this->exception); } public function throwIfRequested() { if (isset($this->exception)) { throw $this->exception; } } }; } public function getToken(): CancellationToken { return $this->token; } /** * @param \Throwable|null $previous Exception to be used as the previous exception to CancelledException. */ public function cancel(\Throwable $previous = null) { if ($this->onCancel === null) { return; } $onCancel = $this->onCancel; $this->onCancel = null; $onCancel(new CancelledException($previous)); } } lib/CancelledException.php 0000644 00000000520 13755316336 0011565 0 ustar 00 generator = $generator; try { $yielded = $this->generator->current(); if (!$yielded instanceof Promise) { if (!$this->generator->valid()) { $this->resolve($this->generator->getReturn()); return; } $yielded = $this->transform($yielded); } } catch (\Throwable $exception) { $this->fail($exception); return; } /** * @param \Throwable|null $exception Exception to be thrown into the generator. * @param mixed $value Value to be sent into the generator. */ $this->onResolve = function ($exception, $value) { $this->exception = $exception; $this->value = $value; if (!$this->immediate) { $this->immediate = true; return; } try { do { if ($this->exception) { // Throw exception at current execution point. 
$yielded = $this->generator->throw($this->exception); } else { // Send the new value and execute to next yield statement. $yielded = $this->generator->send($this->value); } if (!$yielded instanceof Promise) { if (!$this->generator->valid()) { $this->resolve($this->generator->getReturn()); $this->onResolve = null; return; } $yielded = $this->transform($yielded); } $this->immediate = false; $yielded->onResolve($this->onResolve); } while ($this->immediate); $this->immediate = true; } catch (\Throwable $exception) { $this->fail($exception); $this->onResolve = null; } finally { $this->exception = null; $this->value = null; } }; $yielded->onResolve($this->onResolve); } /** * Attempts to transform the non-promise yielded from the generator into a promise, otherwise returns an instance * `Amp\Failure` failed with an instance of `Amp\InvalidYieldError`. * * @param mixed $yielded Non-promise yielded from generator. * * @return \Amp\Promise */ private function transform($yielded): Promise { try { if (\is_array($yielded)) { return Promise\all($yielded); } if ($yielded instanceof ReactPromise) { return Promise\adapt($yielded); } // No match, continue to returning Failure below. } catch (\Throwable $exception) { // Conversion to promise failed, fall-through to returning Failure below. } return new Failure(new InvalidYieldError( $this->generator, \sprintf( "Unexpected yield; Expected an instance of %s or %s or an array of such instances", Promise::class, ReactPromise::class ), $exception ?? null )); } } lib/Deferred.php 0000644 00000005012 13755316336 0007555 0 ustar 00 promise = new class($this->resolve, $this->fail) implements Promise { use CallableMaker, Internal\Placeholder; public function __construct(&$resolve, &$fail) { $resolve = $this->callableFromInstanceMethod("resolve"); $fail = $this->callableFromInstanceMethod("fail"); } }; } /** * @return \Amp\Promise */ public function promise(): Promise { return $this->promise; } /** * Fulfill the promise with the given value. 
* * @param mixed $value */ public function resolve($value = null) { ($this->resolve)($value); } /** * Fails the promise the the given reason. * * @param \Throwable $reason */ public function fail(\Throwable $reason) { ($this->fail)($reason); } } } else { production: // PHP 7 production environment (zend.assertions=0) /** * An optimized version of Deferred for production environments that is itself the promise. Eval is used to * prevent IDEs and other tools from reporting multiple definitions. */ eval('namespace Amp; final class Deferred implements Promise { use Internal\Placeholder { resolve as public; fail as public; } public function promise(): Promise { return $this; } }'); } } catch (\AssertionError $exception) { goto development; // zend.assertions=1 and assert.exception=1, use development definition. } // @codeCoverageIgnoreEnd lib/Delayed.php 0000644 00000002103 13755316336 0007402 0 ustar 00 watcher = Loop::delay($time, function () use ($value) { $this->resolve($value); }); } /** * References the internal watcher in the event loop, keeping the loop running while this promise is pending. */ public function reference() { Loop::reference($this->watcher); } /** * Unreferences the internal watcher in the event loop, allowing the loop to stop while this promise is pending if * no other events are pending in the loop. */ public function unreference() { Loop::unreference($this->watcher); } } lib/Emitter.php 0000644 00000005721 13755316336 0007455 0 ustar 00 iterator = new class($this->emit, $this->complete, $this->fail) implements Iterator { use CallableMaker, Internal\Producer; public function __construct(&$emit, &$complete, &$fail) { $emit = $this->callableFromInstanceMethod("emit"); $complete = $this->callableFromInstanceMethod("complete"); $fail = $this->callableFromInstanceMethod("fail"); } }; } /** * @return \Amp\Iterator */ public function iterate(): Iterator { return $this->iterator; } /** * Emits a value to the iterator. 
* * @param mixed $value * * @return \Amp\Promise */ public function emit($value): Promise { return ($this->emit)($value); } /** * Completes the iterator. */ public function complete() { ($this->complete)(); } /** * Fails the iterator with the given reason. * * @param \Throwable $reason */ public function fail(\Throwable $reason) { ($this->fail)($reason); } } } else { production: // PHP 7 production environment (zend.assertions=0) /** * An optimized version of Emitter for production environments that is itself the iterator. Eval is used to * prevent IDEs and other tools from reporting multiple definitions. */ eval('namespace Amp; final class Emitter implements Iterator { use Internal\Producer { emit as public; complete as public; fail as public; } public function iterate(): Iterator { return $this; } }'); } } catch (\AssertionError $exception) { goto development; // zend.assertions=1 and assert.exception=1, use development definition. } // @codeCoverageIgnoreEnd lib/Failure.php 0000644 00000002067 13755316336 0007433 0 ustar 00 exception = $exception; } /** * {@inheritdoc} */ public function onResolve(callable $onResolved) { try { $result = $onResolved($this->exception, null); if ($result === null) { return; } if ($result instanceof \Generator) { $result = new Coroutine($result); } if ($result instanceof Promise || $result instanceof ReactPromise) { Promise\rethrow($result); } } catch (\Throwable $exception) { Loop::defer(static function () use ($exception) { throw $exception; }); } } } lib/Internal/Placeholder.php 0000644 00000007662 13755316336 0012050 0 ustar 00 resolved) { if ($this->result instanceof Promise) { $this->result->onResolve($onResolved); return; } try { $result = $onResolved(null, $this->result); if ($result === null) { return; } if ($result instanceof \Generator) { $result = new Coroutine($result); } if ($result instanceof Promise || $result instanceof ReactPromise) { Promise\rethrow($result); } } catch (\Throwable $exception) { Loop::defer(static 
function () use ($exception) { throw $exception; }); } return; } if (null === $this->onResolved) { $this->onResolved = $onResolved; return; } if (!$this->onResolved instanceof ResolutionQueue) { $this->onResolved = new ResolutionQueue($this->onResolved); } $this->onResolved->push($onResolved); } /** * @param mixed $value * * @throws \Error Thrown if the promise has already been resolved. */ private function resolve($value = null) { if ($this->resolved) { $message = "Promise has already been resolved"; if (isset($this->resolutionTrace)) { $trace = formatStacktrace($this->resolutionTrace); $message .= ". Previous resolution trace:\n\n{$trace}\n\n"; } else { $message .= ", define const AMP_DEBUG = true and enable assertions for a stacktrace of the previous resolution."; } throw new \Error($message); } \assert((function () { if (\defined("AMP_DEBUG") && \AMP_DEBUG) { $trace = \debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS); \array_shift($trace); // remove current closure $this->resolutionTrace = $trace; } return true; })()); if ($value instanceof ReactPromise) { $value = Promise\adapt($value); } $this->resolved = true; $this->result = $value; if ($this->onResolved === null) { return; } $onResolved = $this->onResolved; $this->onResolved = null; if ($this->result instanceof Promise) { $this->result->onResolve($onResolved); return; } try { $result = $onResolved(null, $this->result); if ($result === null) { return; } if ($result instanceof \Generator) { $result = new Coroutine($result); } if ($result instanceof Promise || $result instanceof ReactPromise) { Promise\rethrow($result); } } catch (\Throwable $exception) { Loop::defer(static function () use ($exception) { throw $exception; }); } } /** * @param \Throwable $reason Failure reason. 
*/ private function fail(\Throwable $reason) { $this->resolve(new Failure($reason)); } } lib/Internal/Producer.php 0000644 00000012227 13755316336 0011402 0 ustar 00 waiting !== null) { throw new \Error("The prior promise returned must resolve before invoking this method again"); } if (isset($this->backPressure[$this->position])) { $future = $this->backPressure[$this->position]; unset($this->values[$this->position], $this->backPressure[$this->position]); $future->resolve(); } ++$this->position; if (\array_key_exists($this->position, $this->values)) { return new Success(true); } if ($this->complete) { return $this->complete; } $this->waiting = new Deferred; return $this->waiting->promise(); } /** * {@inheritdoc} */ public function getCurrent() { if (empty($this->values) && $this->complete) { throw new \Error("The iterator has completed"); } if (!\array_key_exists($this->position, $this->values)) { throw new \Error("Promise returned from advance() must resolve before calling this method"); } return $this->values[$this->position]; } /** * Emits a value from the iterator. The returned promise is resolved with the emitted value once all listeners * have been invoked. * * @param mixed $value * * @return \Amp\Promise * * @throws \Error If the iterator has completed. 
*/ private function emit($value): Promise { if ($this->complete) { throw new \Error("Iterators cannot emit values after calling complete"); } if ($value instanceof ReactPromise) { $value = Promise\adapt($value); } if ($value instanceof Promise) { $deferred = new Deferred; $value->onResolve(function ($e, $v) use ($deferred) { if ($this->complete) { $deferred->fail( new \Error("The iterator was completed before the promise result could be emitted") ); return; } if ($e) { $this->fail($e); $deferred->fail($e); return; } $deferred->resolve($this->emit($v)); }); return $deferred->promise(); } $this->values[] = $value; $this->backPressure[] = $pressure = new Deferred; if ($this->waiting !== null) { $waiting = $this->waiting; $this->waiting = null; $waiting->resolve(true); } return $pressure->promise(); } /** * Completes the iterator. * * @throws \Error If the iterator has already been completed. */ private function complete() { if ($this->complete) { $message = "Iterator has already been completed"; if (isset($this->resolutionTrace)) { // @codeCoverageIgnoreStart $trace = formatStacktrace($this->resolutionTrace); $message .= ". 
Previous completion trace:\n\n{$trace}\n\n"; // @codeCoverageIgnoreEnd } else { $message .= ", define const AMP_DEBUG = true and enable assertions for a stacktrace of the previous completion."; } throw new \Error($message); } \assert((function () { if (\defined("AMP_DEBUG") && \AMP_DEBUG) { // @codeCoverageIgnoreStart $trace = \debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS); \array_shift($trace); // remove current closure $this->resolutionTrace = $trace; // @codeCoverageIgnoreEnd } return true; })()); $this->complete = new Success(false); if ($this->waiting !== null) { $waiting = $this->waiting; $this->waiting = null; $waiting->resolve($this->complete); } } private function fail(\Throwable $exception) { $this->complete = new Failure($exception); if ($this->waiting !== null) { $waiting = $this->waiting; $this->waiting = null; $waiting->resolve($this->complete); } } } lib/Internal/ResolutionQueue.php 0000644 00000003463 13755316336 0012771 0 ustar 00 push($callback); } } /** * Unrolls instances of self to avoid blowing up the call stack on resolution. * * @param callable $callback */ public function push(callable $callback) { if ($callback instanceof self) { $this->queue = \array_merge($this->queue, $callback->queue); return; } $this->queue[] = $callback; } /** * Calls each callback in the queue, passing the provided values to the function. 
* * @param \Throwable|null $exception * @param mixed $value */ public function __invoke($exception, $value) { foreach ($this->queue as $callback) { try { $result = $callback($exception, $value); if ($result === null) { continue; } if ($result instanceof \Generator) { $result = new Coroutine($result); } if ($result instanceof Promise || $result instanceof ReactPromise) { Promise\rethrow($result); } } catch (\Throwable $exception) { Loop::defer(static function () use ($exception) { throw $exception; }); } } } } lib/Internal/functions.php 0000644 00000002311 13755316336 0011620 0 ustar 00 current(); $prefix .= \sprintf( "; %s yielded at key %s", \is_object($yielded) ? \get_class($yielded) : \gettype($yielded), \var_export($generator->key(), true) ); if (!$generator->valid()) { parent::__construct($prefix, 0, $previous); return; } $reflGen = new \ReflectionGenerator($generator); $exeGen = $reflGen->getExecutingGenerator(); if ($isSubgenerator = ($exeGen !== $generator)) { $reflGen = new \ReflectionGenerator($exeGen); } parent::__construct(\sprintf( "%s on line %s in %s", $prefix, $reflGen->getExecutingLine(), $reflGen->getExecutingFile() ), 0, $previous); } } lib/Iterator.php 0000644 00000001757 13755316336 0007642 0 ustar 00 * * @throws \Error If the prior promise returned from this method has not resolved. * @throws \Throwable The exception used to fail the iterator. */ public function advance(): Promise; /** * Gets the last emitted value or throws an exception if the iterator has completed. * * @return mixed Value emitted from the iterator. * * @throws \Error If the iterator has resolved or advance() was not called before calling this method. * @throws \Throwable The exception used to fail the iterator. 
*/ public function getCurrent(); } lib/LazyPromise.php 0000644 00000002123 13755316336 0010313 0 ustar 00 promisor = $promisor; } /** * {@inheritdoc} */ public function onResolve(callable $onResolved) { if ($this->promise === null) { $provider = $this->promisor; $this->promisor = null; $this->promise = call($provider); } $this->promise->onResolve($onResolved); } } lib/Loop.php 0000644 00000037705 13755316336 0006764 0 ustar 00 defer($callback); } self::$driver->run(); } /** * Stop the event loop. * * When an event loop is stopped, it continues with its current tick and exits the loop afterwards. Multiple calls * to stop MUST be ignored and MUST NOT raise an exception. * * @return void */ public static function stop() { self::$driver->stop(); } /** * Defer the execution of a callback. * * The deferred callable MUST be executed before any other type of watcher in a tick. Order of enabling MUST be * preserved when executing the callbacks. * * The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called) * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled. * * @param callable(string $watcherId, mixed $data) $callback The callback to defer. The `$watcherId` will be * invalidated before the callback call. * @param mixed $data Arbitrary data given to the callback function as the `$data` parameter. * * @return string An unique identifier that can be used to cancel, enable or disable the watcher. */ public static function defer(callable $callback, $data = null): string { return self::$driver->defer($callback, $data); } /** * Delay the execution of a callback. * * The delay is a minimum and approximate, accuracy is not guaranteed. Order of calls MUST be determined by which * timers expire first, but timers with the same expiration time MAY be executed in any order. * * The created watcher MUST immediately be marked as enabled, but only be activated (i.e. 
callback can be called) * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled. * * @param int $delay The amount of time, in milliseconds, to delay the execution for. * @param callable(string $watcherId, mixed $data) $callback The callback to delay. The `$watcherId` will be * invalidated before the callback call. * @param mixed $data Arbitrary data given to the callback function as the `$data` parameter. * * @return string An unique identifier that can be used to cancel, enable or disable the watcher. */ public static function delay(int $delay, callable $callback, $data = null): string { return self::$driver->delay($delay, $callback, $data); } /** * Repeatedly execute a callback. * * The interval between executions is a minimum and approximate, accuracy is not guaranteed. Order of calls MUST be * determined by which timers expire first, but timers with the same expiration time MAY be executed in any order. * The first execution is scheduled after the first interval period. * * The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called) * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled. * * @param int $interval The time interval, in milliseconds, to wait between executions. * @param callable(string $watcherId, mixed $data) $callback The callback to repeat. * @param mixed $data Arbitrary data given to the callback function as the `$data` parameter. * * @return string An unique identifier that can be used to cancel, enable or disable the watcher. */ public static function repeat(int $interval, callable $callback, $data = null): string { return self::$driver->repeat($interval, $callback, $data); } /** * Execute a callback when a stream resource becomes readable or is closed for reading. * * Warning: Closing resources locally, e.g. with `fclose`, might not invoke the callback. 
Be sure to `cancel` the * watcher when closing the resource locally. Drivers MAY choose to notify the user if there are watchers on invalid * resources, but are not required to, due to the high performance impact. Watchers on closed resources are * therefore undefined behavior. * * Multiple watchers on the same stream MAY be executed in any order. * * The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called) * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled. * * @param resource $stream The stream to monitor. * @param callable(string $watcherId, resource $stream, mixed $data) $callback The callback to execute. * @param mixed $data Arbitrary data given to the callback function as the `$data` parameter. * * @return string An unique identifier that can be used to cancel, enable or disable the watcher. */ public static function onReadable($stream, callable $callback, $data = null): string { return self::$driver->onReadable($stream, $callback, $data); } /** * Execute a callback when a stream resource becomes writable or is closed for writing. * * Warning: Closing resources locally, e.g. with `fclose`, might not invoke the callback. Be sure to `cancel` the * watcher when closing the resource locally. Drivers MAY choose to notify the user if there are watchers on invalid * resources, but are not required to, due to the high performance impact. Watchers on closed resources are * therefore undefined behavior. * * Multiple watchers on the same stream MAY be executed in any order. * * The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called) * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled. * * @param resource $stream The stream to monitor. * @param callable(string $watcherId, resource $stream, mixed $data) $callback The callback to execute. 
* @param mixed $data Arbitrary data given to the callback function as the `$data` parameter. * * @return string An unique identifier that can be used to cancel, enable or disable the watcher. */ public static function onWritable($stream, callable $callback, $data = null): string { return self::$driver->onWritable($stream, $callback, $data); } /** * Execute a callback when a signal is received. * * Warning: Installing the same signal on different instances of this interface is deemed undefined behavior. * Implementations MAY try to detect this, if possible, but are not required to. This is due to technical * limitations of the signals being registered globally per process. * * Multiple watchers on the same signal MAY be executed in any order. * * The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called) * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled. * * @param int $signo The signal number to monitor. * @param callable(string $watcherId, int $signo, mixed $data) $callback The callback to execute. * @param mixed $data Arbitrary data given to the callback function as the $data parameter. * * @return string An unique identifier that can be used to cancel, enable or disable the watcher. * * @throws UnsupportedFeatureException If signal handling is not supported. */ public static function onSignal(int $signo, callable $callback, $data = null): string { return self::$driver->onSignal($signo, $callback, $data); } /** * Enable a watcher to be active starting in the next tick. * * Watchers MUST immediately be marked as enabled, but only be activated (i.e. callbacks can be called) right before * the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled. * * @param string $watcherId The watcher identifier. * * @return void * * @throws InvalidWatcherError If the watcher identifier is invalid. 
*/ public static function enable(string $watcherId) { self::$driver->enable($watcherId); } /** * Disable a watcher immediately. * * A watcher MUST be disabled immediately, e.g. if a defer watcher disables a later defer watcher, the second defer * watcher isn't executed in this tick. * * Disabling a watcher MUST NOT invalidate the watcher. Calling this function MUST NOT fail, even if passed an * invalid watcher. * * @param string $watcherId The watcher identifier. * * @return void */ public static function disable(string $watcherId) { self::$driver->disable($watcherId); } /** * Cancel a watcher. * * This will detatch the event loop from all resources that are associated to the watcher. After this operation the * watcher is permanently invalid. Calling this function MUST NOT fail, even if passed an invalid watcher. * * @param string $watcherId The watcher identifier. * * @return void */ public static function cancel(string $watcherId) { self::$driver->cancel($watcherId); } /** * Reference a watcher. * * This will keep the event loop alive whilst the watcher is still being monitored. Watchers have this state by * default. * * @param string $watcherId The watcher identifier. * * @return void * * @throws InvalidWatcherError If the watcher identifier is invalid. */ public static function reference(string $watcherId) { self::$driver->reference($watcherId); } /** * Unreference a watcher. * * The event loop should exit the run method when only unreferenced watchers are still being monitored. Watchers * are all referenced by default. * * @param string $watcherId The watcher identifier. * * @return void * * @throws InvalidWatcherError If the watcher identifier is invalid. */ public static function unreference(string $watcherId) { self::$driver->unreference($watcherId); } /** * Stores information in the loop bound registry. * * Stored information is package private. Packages MUST NOT retrieve the stored state of other packages. 
Packages
     * MUST use their namespace as prefix for keys. They may do so by using `SomeClass::class` as key.
     *
     * If packages want to expose loop bound state to consumers other than the package, they SHOULD provide a dedicated
     * interface for that purpose instead of sharing the storage key.
     *
     * @param string $key The namespaced storage key.
     * @param mixed  $value The value to be stored.
     *
     * @return void
     */
    public static function setState(string $key, $value)
    {
        // Forwarded to the driver held in self::$driver.
        self::$driver->setState($key, $value);
    }

    /**
     * Gets information stored bound to the loop.
     *
     * Stored information is package private. Packages MUST NOT retrieve the stored state of other packages. Packages
     * MUST use their namespace as prefix for keys. They may do so by using `SomeClass::class` as key.
     *
     * If packages want to expose loop bound state to consumers other than the package, they SHOULD provide a dedicated
     * interface for that purpose instead of sharing the storage key.
     *
     * @param string $key The namespaced storage key.
     *
     * @return mixed The previously stored value or `null` if it doesn't exist.
     */
    public static function getState(string $key)
    {
        // Forwarded to the driver held in self::$driver.
        return self::$driver->getState($key);
    }

    /**
     * Set a callback to be executed when an error occurs.
     *
     * The callback receives the error as the first and only parameter. The return value of the callback gets ignored.
     * If it can't handle the error, it MUST throw the error. Errors thrown by the callback or during its invocation
     * MUST be thrown into the `run` loop and stop the driver.
     *
     * Subsequent calls to this method will overwrite the previous handler.
     *
     * @param callable(\Throwable $error)|null $callback The callback to execute. `null` will clear the
     *     current handler.
     *
     * @return callable(\Throwable $error)|null The previous handler, `null` if there was none.
     */
    public static function setErrorHandler(callable $callback = null)
    {
        // Forwarded to the driver held in self::$driver; its return value is passed straight back.
        return self::$driver->setErrorHandler($callback);
    }

    /**
     * Retrieve an associative array of information about the event loop driver.
*
     * The returned array MUST contain the following data describing the driver's currently registered watchers:
     *
     *     [
     *         "defer"            => ["enabled" => int, "disabled" => int],
     *         "delay"            => ["enabled" => int, "disabled" => int],
     *         "repeat"           => ["enabled" => int, "disabled" => int],
     *         "on_readable"      => ["enabled" => int, "disabled" => int],
     *         "on_writable"      => ["enabled" => int, "disabled" => int],
     *         "on_signal"        => ["enabled" => int, "disabled" => int],
     *         "enabled_watchers" => ["referenced" => int, "unreferenced" => int],
     *         "running"          => bool
     *     ];
     *
     * Implementations MAY optionally add more information in the array but at minimum the above `key => value` format
     * MUST always be provided.
     *
     * @return array Statistics about the loop in the described format.
     */
    public static function getInfo(): array
    {
        // Forwarded to the driver held in self::$driver.
        return self::$driver->getInfo();
    }

    /**
     * Retrieve the event loop driver that is in scope.
     *
     * @return Driver The driver instance currently stored in self::$driver.
     */
    public static function get(): Driver
    {
        return self::$driver;
    }
}

// Default factory, don't move this to a file loaded by the composer "files" autoload mechanism, otherwise custom
// implementations might have issues setting a default loop, because it's overridden by us then.

// @codeCoverageIgnoreStart
Loop::set((new DriverFactory)->create());
// @codeCoverageIgnoreEnd
// NOTE(review): the next line is an archive member header followed by a truncated statement; the beginning of
// lib/Loop/Driver.php (everything up to this assignment) is missing from this extract.
lib/Loop/Driver.php 0000644 00000061232 13755316336 0010207 0 ustar 00 running = true;

        try {
            // Tick until stop() clears the flag or no enabled, referenced watcher is left.
            while ($this->running) {
                if ($this->isEmpty()) {
                    return;
                }

                $this->tick();
            }
        } finally {
            // Runs on every exit path, including exceptions thrown out of tick().
            $this->stop();
        }
    }

    /**
     * Checks whether any watcher is both enabled and referenced.
     *
     * @return bool True if no enabled and referenced watchers remain in the loop.
     */
    private function isEmpty()
    {
        foreach ($this->watchers as $watcher) {
            // A single enabled and referenced watcher is enough to report "not empty".
            if ($watcher->enabled && $watcher->referenced) {
                return false;
            }
        }

        return true;
    }

    /**
     * Executes a single tick of the event loop.
*/
    private function tick()
    {
        // Deferrals queued since the previous tick join whatever is still pending from it.
        $this->deferQueue = empty($this->deferQueue)
            ? $this->nextTickQueue
            : \array_merge($this->deferQueue, $this->nextTickQueue);
        $this->nextTickQueue = [];

        $this->activate($this->enableQueue);
        $this->enableQueue = [];

        foreach ($this->deferQueue as $deferred) {
            $id = $deferred->id;

            if (!isset($this->deferQueue[$id])) {
                continue; // Watcher disabled by another defer watcher.
            }

            // The watcher is removed before its callback runs, so the ID is already invalid inside the callback.
            unset($this->watchers[$id], $this->deferQueue[$id]);

            try {
                $outcome = ($deferred->callback)($id, $deferred->data);

                // A null (or any other non-awaitable) return value needs no further handling.
                if ($outcome instanceof \Generator) {
                    $outcome = new Coroutine($outcome);
                }

                if ($outcome instanceof Promise || $outcome instanceof ReactPromise) {
                    rethrow($outcome);
                }
            } catch (\Throwable $failure) {
                $this->error($failure);
            }
        }

        // Only block in dispatch() when nothing is queued for the next tick, the loop is still running,
        // and at least one enabled, referenced watcher remains.
        $mayBlock = empty($this->nextTickQueue)
            && empty($this->enableQueue)
            && $this->running
            && !$this->isEmpty();

        $this->dispatch($mayBlock);
    }

    /**
     * Activates (enables) all the given watchers.
     *
     * @param \Amp\Loop\Watcher[] $watchers Watchers taken from the enable queue.
     */
    abstract protected function activate(array $watchers);

    /**
     * Dispatches any pending read/write, timer, and signal events.
     *
     * @param bool $blocking Whether the implementation may wait for events.
     */
    abstract protected function dispatch(bool $blocking);

    /**
     * Stop the event loop.
     *
     * When an event loop is stopped, it continues with its current tick and exits the loop afterwards. Multiple calls
     * to stop MUST be ignored and MUST NOT raise an exception.
     *
     * @return void
     */
    public function stop()
    {
        $this->running = false;
    }

    /**
     * Defer the execution of a callback.
     *
     * The deferred callable MUST be executed before any other type of watcher in a tick. Order of enabling MUST be
     * preserved when executing the callbacks.
     *
     * The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
     * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled.
*
     * @param callable(string $watcherId, mixed $data) $callback The callback to defer. The `$watcherId` will be
     *     invalidated before the callback call.
     * @param mixed $data Arbitrary data given to the callback function as the `$data` parameter.
     *
     * @return string A unique identifier that can be used to cancel, enable or disable the watcher.
     */
    public function defer(callable $callback, $data = null): string
    {
        $deferred = new Watcher;
        $id = $deferred->id = $this->nextId++;
        $deferred->type = Watcher::DEFER;
        $deferred->callback = $callback;
        $deferred->data = $data;

        // Defer watchers go to the next-tick queue rather than the generic enable queue.
        $this->watchers[$id] = $deferred;
        $this->nextTickQueue[$id] = $deferred;

        return $id;
    }

    /**
     * Delay the execution of a callback.
     *
     * The delay is a minimum and approximate, accuracy is not guaranteed. Order of calls MUST be determined by which
     * timers expire first, but timers with the same expiration time MAY be executed in any order.
     *
     * The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
     * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled.
     *
     * @param int   $delay The amount of time, in milliseconds, to delay the execution for.
     * @param callable(string $watcherId, mixed $data) $callback The callback to delay. The `$watcherId` will be
     *     invalidated before the callback call.
     * @param mixed $data Arbitrary data given to the callback function as the `$data` parameter.
     *
     * @return string A unique identifier that can be used to cancel, enable or disable the watcher.
*/
    public function delay(int $delay, callable $callback, $data = null): string
    {
        if ($delay < 0) {
            throw new \Error("Delay must be greater than or equal to zero");
        }

        $timer = new Watcher;
        $id = $timer->id = $this->nextId++;
        $timer->type = Watcher::DELAY;
        $timer->callback = $callback;
        $timer->value = $delay;
        $timer->data = $data;

        // Registered now, activated by the driver when the enable queue is processed.
        $this->watchers[$id] = $timer;
        $this->enableQueue[$id] = $timer;

        return $id;
    }

    /**
     * Repeatedly execute a callback.
     *
     * The interval between executions is a minimum and approximate, accuracy is not guaranteed. Order of calls MUST be
     * determined by which timers expire first, but timers with the same expiration time MAY be executed in any order.
     * The first execution is scheduled after the first interval period.
     *
     * The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
     * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled.
     *
     * @param int   $interval The time interval, in milliseconds, to wait between executions.
     * @param callable(string $watcherId, mixed $data) $callback The callback to repeat.
     * @param mixed $data Arbitrary data given to the callback function as the `$data` parameter.
     *
     * @return string A unique identifier that can be used to cancel, enable or disable the watcher.
     *
     * @throws \Error If the interval is negative.
     */
    public function repeat(int $interval, callable $callback, $data = null): string
    {
        if ($interval < 0) {
            throw new \Error("Interval must be greater than or equal to zero");
        }

        $timer = new Watcher;
        $id = $timer->id = $this->nextId++;
        $timer->type = Watcher::REPEAT;
        $timer->callback = $callback;
        $timer->value = $interval;
        $timer->data = $data;

        // Registered now, activated by the driver when the enable queue is processed.
        $this->watchers[$id] = $timer;
        $this->enableQueue[$id] = $timer;

        return $id;
    }

    /**
     * Execute a callback when a stream resource becomes readable or is closed for reading.
     *
     * Warning: Closing resources locally, e.g.
with `fclose`, might not invoke the callback. Be sure to `cancel` the
     * watcher when closing the resource locally. Drivers MAY choose to notify the user if there are watchers on invalid
     * resources, but are not required to, due to the high performance impact. Watchers on closed resources are
     * therefore undefined behavior.
     *
     * Multiple watchers on the same stream MAY be executed in any order.
     *
     * The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
     * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled.
     *
     * @param resource $stream The stream to monitor.
     * @param callable(string $watcherId, resource $stream, mixed $data) $callback The callback to execute.
     * @param mixed    $data Arbitrary data given to the callback function as the `$data` parameter.
     *
     * @return string A unique identifier that can be used to cancel, enable or disable the watcher.
     */
    public function onReadable($stream, callable $callback, $data = null): string
    {
        $reader = new Watcher;
        $id = $reader->id = $this->nextId++;
        $reader->type = Watcher::READABLE;
        $reader->callback = $callback;
        $reader->value = $stream; // The monitored stream travels in the generic `value` slot.
        $reader->data = $data;

        $this->watchers[$id] = $reader;
        $this->enableQueue[$id] = $reader;

        return $id;
    }

    /**
     * Execute a callback when a stream resource becomes writable or is closed for writing.
     *
     * Warning: Closing resources locally, e.g. with `fclose`, might not invoke the callback. Be sure to `cancel` the
     * watcher when closing the resource locally. Drivers MAY choose to notify the user if there are watchers on invalid
     * resources, but are not required to, due to the high performance impact. Watchers on closed resources are
     * therefore undefined behavior.
     *
     * Multiple watchers on the same stream MAY be executed in any order.
     *
     * The created watcher MUST immediately be marked as enabled, but only be activated (i.e.
callback can be called)
     * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled.
     *
     * @param resource $stream The stream to monitor.
     * @param callable(string $watcherId, resource $stream, mixed $data) $callback The callback to execute.
     * @param mixed    $data Arbitrary data given to the callback function as the `$data` parameter.
     *
     * @return string A unique identifier that can be used to cancel, enable or disable the watcher.
     */
    public function onWritable($stream, callable $callback, $data = null): string
    {
        $writer = new Watcher;
        $id = $writer->id = $this->nextId++;
        $writer->type = Watcher::WRITABLE;
        $writer->callback = $callback;
        $writer->value = $stream; // The monitored stream travels in the generic `value` slot.
        $writer->data = $data;

        $this->watchers[$id] = $writer;
        $this->enableQueue[$id] = $writer;

        return $id;
    }

    /**
     * Execute a callback when a signal is received.
     *
     * Warning: Installing the same signal on different instances of this interface is deemed undefined behavior.
     * Implementations MAY try to detect this, if possible, but are not required to. This is due to technical
     * limitations of the signals being registered globally per process.
     *
     * Multiple watchers on the same signal MAY be executed in any order.
     *
     * The created watcher MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
     * right before the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled.
     *
     * @param int   $signo The signal number to monitor.
     * @param callable(string $watcherId, int $signo, mixed $data) $callback The callback to execute.
     * @param mixed $data Arbitrary data given to the callback function as the `$data` parameter.
     *
     * @return string A unique identifier that can be used to cancel, enable or disable the watcher.
     *
     * @throws UnsupportedFeatureException If signal handling is not supported.
*/
    public function onSignal(int $signo, callable $callback, $data = null): string
    {
        $listener = new Watcher;
        $id = $listener->id = $this->nextId++;
        $listener->type = Watcher::SIGNAL;
        $listener->callback = $callback;
        $listener->value = $signo; // The signal number travels in the generic `value` slot.
        $listener->data = $data;

        $this->watchers[$id] = $listener;
        $this->enableQueue[$id] = $listener;

        return $id;
    }

    /**
     * Enable a watcher to be active starting in the next tick.
     *
     * Watchers MUST immediately be marked as enabled, but only be activated (i.e. callbacks can be called) right before
     * the next tick. Callbacks of watchers MUST NOT be called in the tick they were enabled.
     *
     * @param string $watcherId The watcher identifier.
     *
     * @return void
     *
     * @throws InvalidWatcherError If the watcher identifier is invalid.
     */
    public function enable(string $watcherId)
    {
        $watcher = $this->watchers[$watcherId] ?? null;

        if ($watcher === null) {
            throw new InvalidWatcherError($watcherId, "Cannot enable an invalid watcher identifier: '{$watcherId}'");
        }

        if ($watcher->enabled) {
            return; // Watcher already enabled.
        }

        $watcher->enabled = true;

        // Defer watchers are queued for the next tick; every other type waits in the enable queue.
        if ($watcher->type == Watcher::DEFER) {
            $this->nextTickQueue[$watcher->id] = $watcher;
        } else {
            $this->enableQueue[$watcher->id] = $watcher;
        }
    }

    /**
     * Cancel a watcher.
     *
     * This will detach the event loop from all resources that are associated to the watcher. After this operation the
     * watcher is permanently invalid. Calling this function MUST NOT fail, even if passed an invalid watcher.
     *
     * @param string $watcherId The watcher identifier.
     *
     * @return void
     */
    public function cancel(string $watcherId)
    {
        // Disable first so the watcher leaves every queue, then forget it entirely.
        $this->disable($watcherId);
        unset($this->watchers[$watcherId]);
    }

    /**
     * Disable a watcher immediately.
     *
     * A watcher MUST be disabled immediately, e.g. if a defer watcher disables a later defer watcher, the second defer
     * watcher isn't executed in this tick.
     *
     * Disabling a watcher MUST NOT invalidate the watcher.
Calling this function MUST NOT fail, even if passed an
     * invalid watcher.
     *
     * @param string $watcherId The watcher identifier.
     *
     * @return void
     */
    public function disable(string $watcherId)
    {
        $watcher = $this->watchers[$watcherId] ?? null;

        // Unknown and already-disabled watchers are silently ignored.
        if ($watcher === null || !$watcher->enabled) {
            return;
        }

        $watcher->enabled = false;
        $id = $watcher->id;

        if ($watcher->type == Watcher::DEFER) {
            if (isset($this->nextTickQueue[$id])) {
                // Watcher was only queued to be enabled.
                unset($this->nextTickQueue[$id]);
            } else {
                unset($this->deferQueue[$id]);
            }

            return;
        }

        if (isset($this->enableQueue[$id])) {
            // Watcher was only queued to be enabled.
            unset($this->enableQueue[$id]);

            return;
        }

        // Already handed to the concrete driver: let it deactivate the watcher.
        $this->deactivate($watcher);
    }

    /**
     * Deactivates (disables) the given watcher.
     *
     * @param \Amp\Loop\Watcher $watcher The watcher to deactivate.
     */
    abstract protected function deactivate(Watcher $watcher);

    /**
     * Reference a watcher.
     *
     * This will keep the event loop alive whilst the watcher is still being monitored. Watchers have this state by
     * default.
     *
     * @param string $watcherId The watcher identifier.
     *
     * @return void
     *
     * @throws InvalidWatcherError If the watcher identifier is invalid.
     */
    public function reference(string $watcherId)
    {
        if (isset($this->watchers[$watcherId])) {
            $this->watchers[$watcherId]->referenced = true;

            return;
        }

        throw new InvalidWatcherError($watcherId, "Cannot reference an invalid watcher identifier: '{$watcherId}'");
    }

    /**
     * Unreference a watcher.
     *
     * The event loop should exit the run method when only unreferenced watchers are still being monitored. Watchers
     * are all referenced by default.
     *
     * @param string $watcherId The watcher identifier.
     *
     * @return void
     *
     * @throws InvalidWatcherError If the watcher identifier is invalid.
*/
    public function unreference(string $watcherId)
    {
        if (!isset($this->watchers[$watcherId])) {
            throw new InvalidWatcherError($watcherId, "Cannot unreference an invalid watcher identifier: '{$watcherId}'");
        }

        $this->watchers[$watcherId]->referenced = false;
    }

    /**
     * Stores information in the loop bound registry.
     *
     * Stored information is package private. Packages MUST NOT retrieve the stored state of other packages. Packages
     * MUST use their namespace as prefix for keys. They may do so by using `SomeClass::class` as key.
     *
     * If packages want to expose loop bound state to consumers other than the package, they SHOULD provide a dedicated
     * interface for that purpose instead of sharing the storage key.
     *
     * @param string $key The namespaced storage key.
     * @param mixed  $value The value to be stored. Passing `null` removes the key from the registry.
     *
     * @return void
     */
    final public function setState(string $key, $value)
    {
        if ($value === null) {
            // Storing null is treated as deletion, so the registry never holds null entries.
            unset($this->registry[$key]);
        } else {
            $this->registry[$key] = $value;
        }
    }

    /**
     * Gets information stored bound to the loop.
     *
     * Stored information is package private. Packages MUST NOT retrieve the stored state of other packages. Packages
     * MUST use their namespace as prefix for keys. They may do so by using `SomeClass::class` as key.
     *
     * If packages want to expose loop bound state to consumers other than the package, they SHOULD provide a dedicated
     * interface for that purpose instead of sharing the storage key.
     *
     * @param string $key The namespaced storage key.
     *
     * @return mixed The previously stored value or `null` if it doesn't exist.
     */
    final public function getState(string $key)
    {
        // Null coalescing is the direct equivalent of `isset($x) ? $x : null` without naming the lookup twice.
        return $this->registry[$key] ?? null;
    }

    /**
     * Set a callback to be executed when an error occurs.
     *
     * The callback receives the error as the first and only parameter. The return value of the callback gets ignored.
     * If it can't handle the error, it MUST throw the error.
Errors thrown by the callback or during its invocation
     * MUST be thrown into the `run` loop and stop the driver.
     *
     * Subsequent calls to this method will overwrite the previous handler.
     *
     * @param callable(\Throwable|\Exception $error)|null $callback The callback to execute. `null` will clear the
     *     current handler.
     *
     * @return callable(\Throwable|\Exception $error)|null The previous handler, `null` if there was none.
     */
    public function setErrorHandler(callable $callback = null)
    {
        $replaced = $this->errorHandler;
        $this->errorHandler = $callback;

        return $replaced;
    }

    /**
     * Invokes the error handler with the given exception.
     *
     * @param \Throwable $exception The exception thrown from a watcher callback.
     *
     * @throws \Throwable If no error handler has been set.
     */
    protected function error(\Throwable $exception)
    {
        $handler = $this->errorHandler;

        if ($handler === null) {
            // Nobody registered a handler: let the exception propagate to the caller.
            throw $exception;
        }

        $handler($exception);
    }

    /**
     * Get the underlying loop handle.
     *
     * Example: the `uv_loop` resource for `libuv` or the `EvLoop` object for `libev` or `null` for a native driver.
     *
     * Note: This function is *not* exposed in the `Loop` class. Users shall access it directly on the respective loop
     * instance.
     *
     * @return null|object|resource The loop handle the event loop operates on. `null` if there is none.
     */
    abstract public function getHandle();

    /**
     * Returns the same array of data as getInfo().
     *
     * @return array
     */
    public function __debugInfo()
    {
        // @codeCoverageIgnoreStart
        return $this->getInfo();
        // @codeCoverageIgnoreEnd
    }

    /**
     * Retrieve an associative array of information about the event loop driver.
*
     * The returned array MUST contain the following data describing the driver's currently registered watchers:
     *
     *     [
     *         "defer"            => ["enabled" => int, "disabled" => int],
     *         "delay"            => ["enabled" => int, "disabled" => int],
     *         "repeat"           => ["enabled" => int, "disabled" => int],
     *         "on_readable"      => ["enabled" => int, "disabled" => int],
     *         "on_writable"      => ["enabled" => int, "disabled" => int],
     *         "on_signal"        => ["enabled" => int, "disabled" => int],
     *         "enabled_watchers" => ["referenced" => int, "unreferenced" => int],
     *         "running"          => bool
     *     ];
     *
     * Implementations MAY optionally add more information in the array but at minimum the above `key => value` format
     * MUST always be provided.
     *
     * @return array Statistics about the loop in the described format.
     *
     * @throws \Error If a registered watcher has a type not listed in the switch below.
     */
    public function getInfo(): array
    {
        $watchers = [
            "referenced" => 0,
            "unreferenced" => 0,
        ];

        // Array assignment copies, so each per-type counter starts from its own zeroed pair.
        $defer = $delay = $repeat = $onReadable = $onWritable = $onSignal = [
            "enabled" => 0,
            "disabled" => 0,
        ];

        foreach ($this->watchers as $watcher) {
            // Rebind $array to the counter pair matching this watcher's type.
            switch ($watcher->type) {
                case Watcher::READABLE:
                    $array = &$onReadable;
                    break;
                case Watcher::WRITABLE:
                    $array = &$onWritable;
                    break;
                case Watcher::SIGNAL:
                    $array = &$onSignal;
                    break;
                case Watcher::DEFER:
                    $array = &$defer;
                    break;
                case Watcher::DELAY:
                    $array = &$delay;
                    break;
                case Watcher::REPEAT:
                    $array = &$repeat;
                    break;
                default:
                    // @codeCoverageIgnoreStart
                    throw new \Error("Unknown watcher type");
                    // @codeCoverageIgnoreEnd
            }

            if ($watcher->enabled) {
                ++$array["enabled"];

                // Referenced/unreferenced totals are only counted for enabled watchers.
                if ($watcher->referenced) {
                    ++$watchers["referenced"];
                } else {
                    ++$watchers["unreferenced"];
                }
            } else {
                ++$array["disabled"];
            }
        }

        return [
            "enabled_watchers" => $watchers,
            "defer" => $defer,
            "delay" => $delay,
            "repeat" => $repeat,
            "on_readable" => $onReadable,
            "on_writable" => $onWritable,
            "on_signal" => $onSignal,
            "running" => (bool) $this->running,
        ];
    }
}
// NOTE(review): the next line is an archive member header followed by a truncated statement; the beginning of
// lib/Loop/DriverFactory.php is missing from this extract.
lib/Loop/DriverFactory.php 0000644 00000002471 13755316336 0011537 0 ustar 00 createDriverFromEnv()) {
            return $driver;
        }

        if (UvDriver::isSupported()) {
            return new
UvDriver;
        }

        if (EvDriver::isSupported()) {
            return new EvDriver;
        }

        if (EventDriver::isSupported()) {
            return new EventDriver;
        }

        // Fallback when none of the drivers checked above reports support.
        return new NativeDriver;
    }

    /**
     * Instantiates the driver class named by the `AMP_LOOP_DRIVER` environment variable, if one is set.
     *
     * @return Driver|null A new instance of the named class, or `null` when the variable is unset or empty.
     *
     * @throws \Error If the named class does not exist or is not a subclass of `Driver`.
     */
    private function createDriverFromEnv()
    {
        $driver = \getenv("AMP_LOOP_DRIVER");

        // getenv() yields false when the variable is absent; an empty string is treated the same way.
        if (!$driver) {
            return null;
        }

        if (!\class_exists($driver)) {
            throw new \Error(\sprintf(
                "Driver '%s' does not exist.",
                $driver
            ));
        }

        if (!\is_subclass_of($driver, Driver::class)) {
            throw new \Error(\sprintf(
                "Driver '%s' is not a subclass of '%s'.",
                $driver,
                Driver::class
            ));
        }

        return new $driver;
    }
}
// @codeCoverageIgnoreEnd
// NOTE(review): the next line is an archive member header followed by a truncated statement; the beginning of
// lib/Loop/EvDriver.php (class header, properties, constructor signature) is missing from this extract.
lib/Loop/EvDriver.php 0000644 00000015265 13755316336 0010507 0 ustar 00 handle = new \EvLoop;

        // When no signal list is active yet, alias the static to this instance's list.
        if (self::$activeSignals === null) {
            self::$activeSignals = &$this->signals;
        }

        // Shared callback for readable and writable events; the Watcher is carried in the event's data slot.
        $this->ioCallback = function (\EvIO $event) {
            /** @var \Amp\Loop\Watcher $watcher */
            $watcher = $event->data;

            try {
                $result = ($watcher->callback)($watcher->id, $watcher->value, $watcher->data);

                if ($result === null) {
                    return;
                }

                if ($result instanceof \Generator) {
                    $result = new Coroutine($result);
                }

                if ($result instanceof Promise || $result instanceof ReactPromise) {
                    rethrow($result);
                }
            } catch (\Throwable $exception) {
                $this->error($exception);
            }
        };

        // Shared callback for delay and repeat timers.
        $this->timerCallback = function (\EvTimer $event) {
            /** @var \Amp\Loop\Watcher $watcher */
            $watcher = $event->data;

            if ($watcher->type & Watcher::DELAY) {
                // Delay watchers are cancelled before their callback is invoked.
                $this->cancel($watcher->id);
            } elseif ($watcher->value === 0) {
                // Disable and re-enable so it's not executed repeatedly in the same tick
                // See https://github.com/amphp/amp/issues/131
                $this->disable($watcher->id);
                $this->enable($watcher->id);
            }

            try {
                $result = ($watcher->callback)($watcher->id, $watcher->data);

                if ($result === null) {
                    return;
                }

                if ($result instanceof \Generator) {
                    $result = new Coroutine($result);
                }

                if ($result instanceof Promise || $result instanceof ReactPromise) {
                    rethrow($result);
                }
            } catch (\Throwable $exception) {
                $this->error($exception);
            }
        };

        // Callback for signal events.
        $this->signalCallback = function (\EvSignal $event) {
            /** @var
\Amp\Loop\Watcher $watcher */
            $watcher = $event->data;

            try {
                $result = ($watcher->callback)($watcher->id, $watcher->value, $watcher->data);

                if ($result === null) {
                    return;
                }

                if ($result instanceof \Generator) {
                    $result = new Coroutine($result);
                }

                if ($result instanceof Promise || $result instanceof ReactPromise) {
                    rethrow($result);
                }
            } catch (\Throwable $exception) {
                $this->error($exception);
            }
        };
    }

    /**
     * {@inheritdoc}
     */
    public function cancel(string $watcherId)
    {
        parent::cancel($watcherId);
        // Also drop the event object this driver keeps for the watcher.
        unset($this->events[$watcherId]);
    }

    /**
     * Reports whether this driver can be used.
     *
     * @return bool True if the `ev` extension is loaded.
     */
    public static function isSupported(): bool
    {
        return \extension_loaded("ev");
    }

    /**
     * Stops every event object still held by this driver.
     */
    public function __destruct()
    {
        foreach ($this->events as $event) {
            $event->stop();
        }
    }

    /**
     * {@inheritdoc}
     */
    public function run()
    {
        // Stop the signal events recorded in the static list, then make this instance's list the active one.
        $active = self::$activeSignals;

        foreach ($active as $event) {
            $event->stop();
        }

        self::$activeSignals = &$this->signals;

        foreach ($this->signals as $event) {
            $event->start();
        }

        try {
            parent::run();
        } finally {
            // Undo the swap: stop this instance's signal events and restart the previously active ones.
            foreach ($this->signals as $event) {
                $event->stop();
            }

            self::$activeSignals = &$active;

            foreach ($active as $event) {
                $event->start();
            }
        }
    }

    /**
     * {@inheritdoc}
     */
    public function stop()
    {
        // Stop the underlying EvLoop as well as clearing the base driver's state.
        $this->handle->stop();
        parent::stop();
    }

    /**
     * {@inheritdoc}
     */
    public function getHandle(): \EvLoop
    {
        return $this->handle;
    }

    /**
     * {@inheritdoc}
     */
    protected function dispatch(bool $blocking)
    {
        $this->handle->run($blocking ?
\Ev::RUN_ONCE : \Ev::RUN_ONCE | \Ev::RUN_NOWAIT);
    }

    /**
     * {@inheritdoc}
     */
    protected function activate(array $watchers)
    {
        foreach ($watchers as $watcher) {
            if (!isset($this->events[$id = $watcher->id])) {
                // First activation: create the event object matching the watcher type.
                switch ($watcher->type) {
                    case Watcher::READABLE:
                        $this->events[$id] = $this->handle->io($watcher->value, \Ev::READ, $this->ioCallback, $watcher);
                        break;

                    case Watcher::WRITABLE:
                        $this->events[$id] = $this->handle->io($watcher->value, \Ev::WRITE, $this->ioCallback, $watcher);
                        break;

                    case Watcher::DELAY:
                    case Watcher::REPEAT:
                        // Watcher values are divided by MILLISEC_PER_SEC before being handed to the timer.
                        $interval = $watcher->value / self::MILLISEC_PER_SEC;
                        $this->events[$id] = $this->handle->timer(
                            $interval,
                            $watcher->type & Watcher::REPEAT ? $interval : 0,
                            $this->timerCallback,
                            $watcher
                        );
                        break;

                    case Watcher::SIGNAL:
                        $this->events[$id] = $this->handle->signal($watcher->value, $this->signalCallback, $watcher);
                        break;

                    default:
                        // @codeCoverageIgnoreStart
                        throw new \Error("Unknown watcher type");
                        // @codeCoverageIgnoreEnd
                }
            } else {
                // An event object already exists for this watcher: start it again.
                $this->events[$id]->start();
            }

            // Signal events are additionally tracked in $this->signals.
            if ($watcher->type === Watcher::SIGNAL) {
                $this->signals[$id] = $this->events[$id];
            }
        }
    }

    /**
     * {@inheritdoc}
     */
    protected function deactivate(Watcher $watcher)
    {
        if (isset($this->events[$id = $watcher->id])) {
            // The event object is stopped but kept, so a later activate() can start it again.
            $this->events[$id]->stop();

            if ($watcher->type === Watcher::SIGNAL) {
                unset($this->signals[$id]);
            }
        }
    }
}
// NOTE(review): the next line is an archive member header followed by a truncated statement; the beginning of
// lib/Loop/EventDriver.php (class header, properties, constructor signature) is missing from this extract.
lib/Loop/EventDriver.php 0000644 00000017447 13755316336 0011222 0 ustar 00 handle = new \EventBase;
        // Record the construction time as an integer scaled by MILLISEC_PER_SEC.
        $this->now = (int) (\microtime(true) * self::MILLISEC_PER_SEC);

        // When no signal list is active yet, alias the static to this instance's list.
        if (self::$activeSignals === null) {
            self::$activeSignals = &$this->signals;
        }

        // Shared callback for readable and writable events.
        $this->ioCallback = function ($resource, $what, Watcher $watcher) {
            try {
                $result = ($watcher->callback)($watcher->id, $watcher->value, $watcher->data);

                if ($result === null) {
                    return;
                }

                if ($result instanceof \Generator) {
                    $result = new Coroutine($result);
                }

                if ($result instanceof Promise || $result instanceof ReactPromise) {
                    rethrow($result);
                }
            } catch (\Throwable $exception) {
                $this->error($exception);
            }
        };

        $this->timerCallback =
function ($resource, $what, Watcher $watcher) { if ($watcher->type & Watcher::DELAY) { $this->cancel($watcher->id); } else { $this->events[$watcher->id]->add($watcher->value / self::MILLISEC_PER_SEC); } try { $result = ($watcher->callback)($watcher->id, $watcher->data); if ($result === null) { return; } if ($result instanceof \Generator) { $result = new Coroutine($result); } if ($result instanceof Promise || $result instanceof ReactPromise) { rethrow($result); } } catch (\Throwable $exception) { $this->error($exception); } }; $this->signalCallback = function ($signum, $what, Watcher $watcher) { try { $result = ($watcher->callback)($watcher->id, $watcher->value, $watcher->data); if ($result === null) { return; } if ($result instanceof \Generator) { $result = new Coroutine($result); } if ($result instanceof Promise || $result instanceof ReactPromise) { rethrow($result); } } catch (\Throwable $exception) { $this->error($exception); } }; } /** * {@inheritdoc} */ public function cancel(string $watcherId) { parent::cancel($watcherId); if (isset($this->events[$watcherId])) { $this->events[$watcherId]->free(); unset($this->events[$watcherId]); } } public static function isSupported(): bool { return \extension_loaded("event"); } /** * @codeCoverageIgnore */ public function __destruct() { foreach ($this->events as $event) { $event->free(); } // Unset here, otherwise $event->del() fails with a warning, because __destruct order isn't defined. // See https://github.com/amphp/amp/issues/159. 
$this->events = []; } /** * {@inheritdoc} */ public function run() { $active = self::$activeSignals; foreach ($active as $event) { $event->del(); } self::$activeSignals = &$this->signals; foreach ($this->signals as $event) { $event->add(); } try { parent::run(); } finally { foreach ($this->signals as $event) { $event->del(); } self::$activeSignals = &$active; foreach ($active as $event) { $event->add(); } } } /** * {@inheritdoc} */ public function stop() { $this->handle->stop(); parent::stop(); } /** * {@inheritdoc} */ public function getHandle(): \EventBase { return $this->handle; } /** * {@inheritdoc} */ protected function dispatch(bool $blocking) { $this->handle->loop($blocking ? \EventBase::LOOP_ONCE : \EventBase::LOOP_ONCE | \EventBase::LOOP_NONBLOCK); $this->now = (int) (\microtime(true) * self::MILLISEC_PER_SEC); } /** * {@inheritdoc} */ protected function activate(array $watchers) { $now = (int) (\microtime(true) * self::MILLISEC_PER_SEC); foreach ($watchers as $watcher) { if (!isset($this->events[$id = $watcher->id])) { switch ($watcher->type) { case Watcher::READABLE: $this->events[$id] = new \Event( $this->handle, $watcher->value, \Event::READ | \Event::PERSIST, $this->ioCallback, $watcher ); break; case Watcher::WRITABLE: $this->events[$id] = new \Event( $this->handle, $watcher->value, \Event::WRITE | \Event::PERSIST, $this->ioCallback, $watcher ); break; case Watcher::DELAY: case Watcher::REPEAT: $this->events[$id] = new \Event( $this->handle, -1, \Event::TIMEOUT, $this->timerCallback, $watcher ); break; case Watcher::SIGNAL: $this->events[$id] = new \Event( $this->handle, $watcher->value, \Event::SIGNAL | \Event::PERSIST, $this->signalCallback, $watcher ); break; default: // @codeCoverageIgnoreStart throw new \Error("Unknown watcher type"); // @codeCoverageIgnoreEnd } } switch ($watcher->type) { case Watcher::DELAY: case Watcher::REPEAT: $interval = $watcher->value - ($now - $this->now); $this->events[$id]->add($interval > 0 ? 
$interval / self::MILLISEC_PER_SEC : 0); break; case Watcher::SIGNAL: $this->signals[$id] = $this->events[$id]; // no break default: $this->events[$id]->add(); break; } } } /** * {@inheritdoc} */ protected function deactivate(Watcher $watcher) { if (isset($this->events[$id = $watcher->id])) { $this->events[$id]->del(); if ($watcher->type === Watcher::SIGNAL) { unset($this->signals[$id]); } } } } lib/Loop/InvalidWatcherError.php 0000644 00000001427 13755316336 0012672 0 ustar 00 watcherId = $watcherId; parent::__construct($message); } /** * @return string The watcher identifier. */ public function getWatcherId() { return $this->watcherId; } } lib/Loop/NativeDriver.php 0000644 00000030606 13755316336 0011357 0 ustar 00 timerQueue = new \SplPriorityQueue(); $this->signalHandling = \extension_loaded("pcntl"); $this->now = (int) (\microtime(true) * self::MILLISEC_PER_SEC); } /** * {@inheritdoc} * * @throws \Amp\Loop\UnsupportedFeatureException If the pcntl extension is not available. */ public function onSignal(int $signo, callable $callback, $data = null): string { if (!$this->signalHandling) { throw new UnsupportedFeatureException("Signal handling requires the pcntl extension"); } return parent::onSignal($signo, $callback, $data); } /** * {@inheritdoc} */ public function getHandle() { return null; } protected function dispatch(bool $blocking) { $this->selectStreams( $this->readStreams, $this->writeStreams, $blocking ? $this->getTimeout() : 0 ); if (!empty($this->timerExpires)) { $scheduleQueue = []; try { while (!$this->timerQueue->isEmpty()) { list($watcher, $expiration) = $this->timerQueue->top(); $id = $watcher->id; if (!isset($this->timerExpires[$id]) || $expiration !== $this->timerExpires[$id]) { $this->timerQueue->extract(); // Timer was removed from queue. continue; } if ($this->timerExpires[$id] > $this->now) { // Timer at top of queue has not expired. 
break;
                    }

                    $this->timerQueue->extract();

                    if ($watcher->type & Watcher::REPEAT) {
                        $expiration = $this->now + $watcher->value;
                        $this->timerExpires[$watcher->id] = $expiration;
                        $scheduleQueue[] = [$watcher, $expiration];
                    } else {
                        $this->cancel($id);
                    }

                    try {
                        // Execute the timer.
                        $result = ($watcher->callback)($id, $watcher->data);

                        if ($result instanceof \Generator) {
                            $result = new Coroutine($result);
                        }

                        if ($result instanceof Promise || $result instanceof ReactPromise) {
                            rethrow($result);
                        }
                    } catch (\Throwable $exception) {
                        $this->error($exception);
                    }
                }
            } finally {
                foreach ($scheduleQueue as $item) {
                    $this->timerQueue->insert($item, -$item[1]);
                }
            }
        }

        if ($this->signalHandling) {
            \pcntl_signal_dispatch();
        }
    }

    /**
     * Waits up to $timeout milliseconds for stream activity and invokes the callback of every still-enabled
     * watcher whose stream was reported readable or writable. Without any streams it just sleeps for the timeout.
     *
     * @param resource[] $read Streams to watch for readability.
     * @param resource[] $write Streams to watch for writability.
     * @param int $timeout Maximum time to wait in milliseconds. A negative value makes stream_select() block
     *     until a stream is ready; without streams a non-positive value returns immediately.
     */
    private function selectStreams(array $read, array $write, $timeout) {
        $timeout /= self::MILLISEC_PER_SEC;

        if (!empty($read) || !empty($write)) { // Use stream_select() if there are any streams in the loop.
            if ($timeout >= 0) {
                $seconds = (int) $timeout;
                $microseconds = (int) (($timeout - $seconds) * self::MICROSEC_PER_SEC);
            } else {
                $seconds = null;
                $microseconds = null;
            }

            $except = null;

            // Error reporting suppressed since stream_select() emits an E_WARNING if it is interrupted by a signal.
            if (!($result = @\stream_select($read, $write, $except, $seconds, $microseconds))) {
                if ($result === 0) {
                    return;
                }

                // error_get_last() returns null when no error was recorded, so never index it blindly.
                $error = \error_get_last();
                $message = $error["message"] ?? "";

                if (\strpos($message, "unable to select") !== 0) {
                    return;
                }

                $this->error(new \Exception($message));

                // The select call failed, so $read and $write do not describe ready streams; dispatching
                // them would invoke watchers for streams that never became ready.
                return;
            }

            foreach ($read as $stream) {
                $streamId = (int) $stream;
                if (!isset($this->readWatchers[$streamId])) {
                    continue; // All read watchers disabled.
                }

                foreach ($this->readWatchers[$streamId] as $watcher) {
                    if (!isset($this->readWatchers[$streamId][$watcher->id])) {
                        continue; // Watcher disabled by another IO watcher.
                    }

                    try {
                        $result = ($watcher->callback)($watcher->id, $stream, $watcher->data);

                        if ($result === null) {
                            continue;
                        }

                        if ($result instanceof \Generator) {
                            $result = new Coroutine($result);
                        }

                        if ($result instanceof Promise || $result instanceof ReactPromise) {
                            rethrow($result);
                        }
                    } catch (\Throwable $exception) {
                        $this->error($exception);
                    }
                }
            }

            foreach ($write as $stream) {
                $streamId = (int) $stream;
                if (!isset($this->writeWatchers[$streamId])) {
                    continue; // All write watchers disabled.
                }

                foreach ($this->writeWatchers[$streamId] as $watcher) {
                    if (!isset($this->writeWatchers[$streamId][$watcher->id])) {
                        continue; // Watcher disabled by another IO watcher.
                    }

                    try {
                        $result = ($watcher->callback)($watcher->id, $stream, $watcher->data);

                        if ($result === null) {
                            continue;
                        }

                        if ($result instanceof \Generator) {
                            $result = new Coroutine($result);
                        }

                        if ($result instanceof Promise || $result instanceof ReactPromise) {
                            rethrow($result);
                        }
                    } catch (\Throwable $exception) {
                        $this->error($exception);
                    }
                }
            }

            return;
        }

        if ($timeout > 0) { // Otherwise sleep with usleep() if $timeout > 0.
            // usleep() takes an integer number of microseconds; cast explicitly instead of passing a float.
            \usleep((int) ($timeout * self::MICROSEC_PER_SEC));
        }
    }

    /**
     * @return int Milliseconds until next timer expires or -1 if there are no pending timers.
     */
    private function getTimeout() {
        while (!$this->timerQueue->isEmpty()) {
            list($watcher, $expiration) = $this->timerQueue->top();

            $id = $watcher->id;

            if (!isset($this->timerExpires[$id]) || $expiration !== $this->timerExpires[$id]) {
                $this->timerQueue->extract(); // Timer was removed from queue.
continue; } $expiration -= (int) (\microtime(true) * self::MILLISEC_PER_SEC); if ($expiration < 0) { return 0; } return $expiration; } return -1; } /** * {@inheritdoc} */ protected function activate(array $watchers) { $now = (int) (\microtime(true) * self::MILLISEC_PER_SEC); foreach ($watchers as $watcher) { switch ($watcher->type) { case Watcher::READABLE: $streamId = (int) $watcher->value; $this->readWatchers[$streamId][$watcher->id] = $watcher; $this->readStreams[$streamId] = $watcher->value; break; case Watcher::WRITABLE: $streamId = (int) $watcher->value; $this->writeWatchers[$streamId][$watcher->id] = $watcher; $this->writeStreams[$streamId] = $watcher->value; break; case Watcher::DELAY: case Watcher::REPEAT: $expiration = $this->now + $watcher->value; $this->timerExpires[$watcher->id] = $expiration; $this->timerQueue->insert([$watcher, $expiration], -$expiration); break; case Watcher::SIGNAL: if (!isset($this->signalWatchers[$watcher->value])) { if (!@\pcntl_signal($watcher->value, [$this, 'handleSignal'])) { $message = "Failed to register signal handler"; if ($error = \error_get_last()) { $message .= \sprintf("; Errno: %d; %s", $error["type"], $error["message"]); } throw new \Error($message); } } $this->signalWatchers[$watcher->value][$watcher->id] = $watcher; break; default: // @codeCoverageIgnoreStart throw new \Error("Unknown watcher type"); // @codeCoverageIgnoreEnd } } $this->now = $now; } /** * {@inheritdoc} */ protected function deactivate(Watcher $watcher) { switch ($watcher->type) { case Watcher::READABLE: $streamId = (int) $watcher->value; unset($this->readWatchers[$streamId][$watcher->id]); if (empty($this->readWatchers[$streamId])) { unset($this->readWatchers[$streamId], $this->readStreams[$streamId]); } break; case Watcher::WRITABLE: $streamId = (int) $watcher->value; unset($this->writeWatchers[$streamId][$watcher->id]); if (empty($this->writeWatchers[$streamId])) { unset($this->writeWatchers[$streamId], $this->writeStreams[$streamId]); } 
break; case Watcher::DELAY: case Watcher::REPEAT: unset($this->timerExpires[$watcher->id]); break; case Watcher::SIGNAL: if (isset($this->signalWatchers[$watcher->value])) { unset($this->signalWatchers[$watcher->value][$watcher->id]); if (empty($this->signalWatchers[$watcher->value])) { unset($this->signalWatchers[$watcher->value]); @\pcntl_signal($watcher->value, \SIG_DFL); } } break; default: // @codeCoverageIgnoreStart throw new \Error("Unknown watcher type"); // @codeCoverageIgnoreEnd } } /** * @param int $signo */ private function handleSignal(int $signo) { foreach ($this->signalWatchers[$signo] as $watcher) { if (!isset($this->signalWatchers[$signo][$watcher->id])) { continue; } try { $result = ($watcher->callback)($watcher->id, $signo, $watcher->data); if ($result === null) { continue; } if ($result instanceof \Generator) { $result = new Coroutine($result); } if ($result instanceof Promise || $result instanceof ReactPromise) { rethrow($result); } } catch (\Throwable $exception) { $this->error($exception); } } } } lib/Loop/UnsupportedFeatureException.php 0000644 00000000421 13755316336 0014470 0 ustar 00 handle = \uv_loop_new(); $this->ioCallback = function ($event, $status, $events, $resource) { $watchers = $this->watchers[(int) $event]; switch ($status) { case 0: // OK break; default: // Invoke the callback on errors, as this matches behavior with other loop back-ends. // Re-enable watcher as libuv disables the watcher on non-zero status. $flags = 0; foreach ($this->watchers[(int) $event] as $watcher) { $flags |= $watcher->enabled ? 
$watcher->type : 0; } \uv_poll_start($event, $flags, $this->ioCallback); break; } foreach ($watchers as $watcher) { if (!($watcher->enabled && $watcher->type & $events)) { continue; } try { $result = ($watcher->callback)($watcher->id, $resource, $watcher->data); if ($result === null) { continue; } if ($result instanceof \Generator) { $result = new Coroutine($result); } if ($result instanceof Promise || $result instanceof ReactPromise) { rethrow($result); } } catch (\Throwable $exception) { $this->error($exception); } } }; $this->timerCallback = function ($event) { $watcher = $this->watchers[(int) $event]; if ($watcher->type & Watcher::DELAY) { unset($this->events[$watcher->id], $this->watchers[(int) $event]); // Avoid call to uv_is_active(). $this->cancel($watcher->id); // Remove reference to watcher in parent. } elseif ($watcher->value === 0) { // Disable and re-enable so it's not executed repeatedly in the same tick // See https://github.com/amphp/amp/issues/131 $this->disable($watcher->id); $this->enable($watcher->id); } try { $result = ($watcher->callback)($watcher->id, $watcher->data); if ($result === null) { return; } if ($result instanceof \Generator) { $result = new Coroutine($result); } if ($result instanceof Promise || $result instanceof ReactPromise) { rethrow($result); } } catch (\Throwable $exception) { $this->error($exception); } }; $this->signalCallback = function ($event, $signo) { $watcher = $this->watchers[(int) $event]; try { $result = ($watcher->callback)($watcher->id, $signo, $watcher->data); if ($result === null) { return; } if ($result instanceof \Generator) { $result = new Coroutine($result); } if ($result instanceof Promise || $result instanceof ReactPromise) { rethrow($result); } } catch (\Throwable $exception) { $this->error($exception); } }; } /** * {@inheritdoc} */ public function cancel(string $watcherId) { parent::cancel($watcherId); if (!isset($this->events[$watcherId])) { return; } $event = $this->events[$watcherId]; $eventId = 
(int) $event; if ($this->watchers[$eventId] instanceof Watcher) { // All except IO watchers. unset($this->watchers[$eventId]); } else { $watcher = $this->watchers[$eventId][$watcherId]; unset($this->watchers[$eventId][$watcherId]); if (empty($this->watchers[$eventId])) { unset($this->watchers[$eventId], $this->streams[(int) $watcher->value]); } } unset($this->events[$watcherId]); } public static function isSupported(): bool { return \extension_loaded("uv"); } /** * {@inheritdoc} */ public function getHandle() { return $this->handle; } /** * {@inheritdoc} */ protected function dispatch(bool $blocking) { \uv_run($this->handle, $blocking ? \UV::RUN_ONCE : \UV::RUN_NOWAIT); } /** * {@inheritdoc} */ protected function activate(array $watchers) { foreach ($watchers as $watcher) { $id = $watcher->id; switch ($watcher->type) { case Watcher::READABLE: case Watcher::WRITABLE: $streamId = (int) $watcher->value; if (isset($this->streams[$streamId])) { $event = $this->streams[$streamId]; } elseif (isset($this->events[$id])) { $event = $this->streams[$streamId] = $this->events[$id]; } else { $event = $this->streams[$streamId] = \uv_poll_init_socket($this->handle, $watcher->value); } $eventId = (int) $event; $this->events[$id] = $event; $this->watchers[$eventId][$id] = $watcher; $flags = 0; foreach ($this->watchers[$eventId] as $watcher) { $flags |= $watcher->enabled ? $watcher->type : 0; } \uv_poll_start($event, $flags, $this->ioCallback); break; case Watcher::DELAY: case Watcher::REPEAT: if (isset($this->events[$id])) { $event = $this->events[$id]; } else { $event = $this->events[$id] = \uv_timer_init($this->handle); } $this->watchers[(int) $event] = $watcher; \uv_timer_start( $event, $watcher->value, $watcher->type & Watcher::REPEAT ? 
$watcher->value : 0, $this->timerCallback ); break; case Watcher::SIGNAL: if (isset($this->events[$id])) { $event = $this->events[$id]; } else { $event = $this->events[$id] = \uv_signal_init($this->handle); } $this->watchers[(int) $event] = $watcher; \uv_signal_start($event, $this->signalCallback, $watcher->value); break; default: // @codeCoverageIgnoreStart throw new \Error("Unknown watcher type"); // @codeCoverageIgnoreEnd } } } /** * {@inheritdoc} */ protected function deactivate(Watcher $watcher) { $id = $watcher->id; if (!isset($this->events[$id])) { return; } $event = $this->events[$id]; if (!\uv_is_active($event)) { return; } switch ($watcher->type) { case Watcher::READABLE: case Watcher::WRITABLE: $flags = 0; foreach ($this->watchers[(int) $event] as $watcher) { $flags |= $watcher->enabled ? $watcher->type : 0; } if ($flags) { \uv_poll_start($event, $flags, $this->ioCallback); } else { \uv_poll_stop($event); } break; case Watcher::DELAY: case Watcher::REPEAT: \uv_timer_stop($event); break; case Watcher::SIGNAL: \uv_signal_stop($event); break; default: // @codeCoverageIgnoreStart throw new \Error("Unknown watcher type"); // @codeCoverageIgnoreEnd } } } lib/Loop/Watcher.php 0000644 00000001512 13755316336 0010344 0 ustar 00 reasons = $reasons; } /** * @return \Throwable[] */ public function getReasons(): array { return $this->reasons; } } lib/NullCancellationToken.php 0000644 00000001601 13755316336 0012265 0 ustar 00 throwIfRequested(); * } * ``` * * potentially multiple times, it allows writing * * ```php * $token = $token ?? new NullCancellationToken; * * // ... * * $token->throwIfRequested(); * ``` * * instead. 
*/ final class NullCancellationToken implements CancellationToken { /** @inheritdoc */ public function subscribe(callable $callback): string { return "null-token"; } /** @inheritdoc */ public function unsubscribe(string $id) { // nothing to do } /** @inheritdoc */ public function isRequested(): bool { return false; } /** @inheritdoc */ public function throwIfRequested() { // nothing to do } } lib/Producer.php 0000644 00000001561 13755316336 0007625 0 ustar 00 callableFromInstanceMethod("emit")); if (!$result instanceof \Generator) { throw new \Error("The callable did not return a Generator"); } $coroutine = new Coroutine($result); $coroutine->onResolve(function ($exception) { if ($this->complete) { return; } if ($exception) { $this->fail($exception); return; } $this->complete(); }); } } lib/Promise.php 0000644 00000001304 13755316336 0007453 0 ustar 00 generateStructPropertyError($property) ); } public function __set(string $property, $value) { throw new \Error( $this->generateStructPropertyError($property) ); } private function generateStructPropertyError(string $property): string { $suggestion = $this->suggestPropertyName($property); $suggestStr = ($suggestion == "") ? "" : " ... did you mean \"{$suggestion}?\""; return \sprintf( "%s property \"%s\" does not exist%s", \str_replace("\0", "@", \get_class($this)), // Handle anonymous class names. $property, $suggestStr ); } private function suggestPropertyName(string $badProperty): string { $badProperty = \strtolower($badProperty); $bestMatch = ""; $bestMatchPercentage = 0; foreach ($this as $property => $value) { // Never suggest properties that begin with an underscore if ($property[0] === "_") { continue; } \similar_text($badProperty, \strtolower($property), $byRefPercentage); if ($byRefPercentage > $bestMatchPercentage) { $bestMatchPercentage = $byRefPercentage; $bestMatch = $property; } } return ($bestMatchPercentage >= $this->__propertySuggestThreshold) ? 
$bestMatch : ""; } } lib/Success.php 0000644 00000002544 13755316336 0007454 0 ustar 00 value = $value; } /** * {@inheritdoc} */ public function onResolve(callable $onResolved) { try { $result = $onResolved(null, $this->value); if ($result === null) { return; } if ($result instanceof \Generator) { $result = new Coroutine($result); } if ($result instanceof Promise || $result instanceof ReactPromise) { Promise\rethrow($result); } } catch (\Throwable $exception) { Loop::defer(static function () use ($exception) { throw $exception; }); } } } lib/TimeoutCancellationToken.php 0000644 00000002567 13755316336 0013015 0 ustar 00 token = $source->getToken(); $this->watcher = Loop::delay($timeout, static function () use ($source) { $source->cancel(new TimeoutException); }); Loop::unreference($this->watcher); } /** * Cancels the delay watcher. */ public function __destruct() { Loop::cancel($this->watcher); } /** * {@inheritdoc} */ public function subscribe(callable $callback): string { return $this->token->subscribe($callback); } /** * {@inheritdoc} */ public function unsubscribe(string $id) { $this->token->unsubscribe($id); } /** * {@inheritdoc} */ public function isRequested(): bool { return $this->token->isRequested(); } /** * {@inheritdoc} */ public function throwIfRequested() { $this->token->throwIfRequested(); } } lib/TimeoutException.php 0000644 00000000560 13755316336 0011345 0 ustar 00 onResolve(function ($exception) { if ($exception) { throw $exception; } }); } /** * Runs the event loop until the promise is resolved. Should not be called within a running event loop. * * Use this function only in synchronous contexts to wait for an asynchronous operation. Use coroutines and yield to * await promise resolution in a fully asynchronous application instead. * * @param \Amp\Promise|\React\Promise\PromiseInterface $promise Promise to wait for. * * @return mixed Promise success value. 
*
     * @throws \TypeError If $promise is not an instance of \Amp\Promise or \React\Promise\PromiseInterface.
     * @throws \Error If the event loop stopped without the $promise being resolved. An exception thrown out of
     *     the loop is attached as the previous exception.
     * @throws \Throwable Promise failure reason.
     */
    function wait($promise) {
        if (!$promise instanceof Promise) {
            if (!$promise instanceof ReactPromise) {
                throw createTypeError([Promise::class, ReactPromise::class], $promise);
            }

            $promise = adapt($promise);
        }

        $resolved = false;

        // Stops the loop and records the outcome as soon as the promise settles.
        $onSettled = function ($error, $result) use (&$resolved, &$value, &$exception) {
            Loop::stop();
            $resolved = true;
            $exception = $error;
            $value = $result;
        };

        try {
            Loop::run(function () use ($promise, $onSettled) {
                $promise->onResolve($onSettled);
            });
        } catch (\Throwable $throwable) {
            throw new \Error("Loop exceptionally stopped without resolving the promise", 0, $throwable);
        }

        if (!$resolved) {
            throw new \Error("Loop stopped without resolving the promise");
        }

        if ($exception) {
            throw $exception;
        }

        return $value;
    }

    /**
     * Creates an artificial timeout for any `Promise`.
     *
     * If the timeout expires before the promise is resolved, the returned promise fails with an instance of
     * `Amp\TimeoutException`.
     *
     * @param \Amp\Promise|\React\Promise\PromiseInterface $promise Promise to which the timeout is applied.
     * @param int $timeout Timeout in milliseconds.
     *
     * @return \Amp\Promise
     *
     * @throws \TypeError If $promise is not an instance of \Amp\Promise or \React\Promise\PromiseInterface.
*/
    function timeout($promise, int $timeout): Promise {
        if (!$promise instanceof Promise) {
            if ($promise instanceof ReactPromise) {
                $promise = adapt($promise);
            } else {
                throw createTypeError([Promise::class, ReactPromise::class], $promise);
            }
        }

        $deferred = new Deferred;

        // $deferred is shared by reference between both callbacks: whichever runs first settles it, and the
        // timer callback nulls it so the promise callback becomes a no-op afterwards.
        $watcher = Loop::delay($timeout, function () use (&$deferred) {
            $temp = $deferred; // prevent double resolve
            $deferred = null;
            $temp->fail(new TimeoutException);
        });
        Loop::unreference($watcher);

        $promise->onResolve(function () use (&$deferred, $promise, $watcher) {
            if ($deferred !== null) {
                Loop::cancel($watcher);
                $deferred->resolve($promise);
            }
        });

        return $deferred->promise();
    }

    /**
     * Adapts any object with a done(callable $onFulfilled, callable $onRejected) or then(callable $onFulfilled,
     * callable $onRejected) method to a promise usable by components depending on \Amp\Promise.
     *
     * done() is preferred over then() when the object provides both.
     *
     * @param object $promise Object with a done() or then() method.
     *
     * @return \Amp\Promise Promise resolved by the given thenable object.
     *
     * @throws \Error If $promise is not an object or has neither a done() nor a then() method.
     */
    function adapt($promise): Promise {
        // method_exists() also accepts class-name strings, which would pass the checks below and then fail on
        // the method call; reject anything that is not an object up front.
        if (!\is_object($promise)) {
            throw new \Error("Object must be provided");
        }

        $deferred = new Deferred;

        if (\method_exists($promise, 'done')) {
            $promise->done([$deferred, 'resolve'], [$deferred, 'fail']);
        } elseif (\method_exists($promise, 'then')) {
            $promise->then([$deferred, 'resolve'], [$deferred, 'fail']);
        } else {
            throw new \Error("Object must have a 'then' or 'done' method");
        }

        return $deferred->promise();
    }

    /**
     * Returns a promise that is resolved when all promises are resolved. The returned promise will not fail.
     * Returned promise succeeds with a two-item array delineating successful and failed promise results,
     * with keys identical and corresponding to the original given array.
     *
     * This function is the same as some() with the notable exception that it will never fail even
     * if all promises in the array resolve unsuccessfully.
*
     * @param Promise[] $promises
     *
     * @return \Amp\Promise
     *
     * @throws \Error If a non-Promise is in the array.
     */
    function any(array $promises): Promise {
        // Requiring zero successes means some() can never fail the returned promise.
        return some($promises, 0);
    }

    /**
     * Returns a promise that succeeds when all promises succeed, and fails if any promise fails. Returned
     * promise succeeds with an array of values used to succeed each contained promise, with keys corresponding to
     * the array of promises.
     *
     * An empty array yields an already succeeded promise with an empty array as its value.
     *
     * @param \Amp\Promise[] $promises Array of only promises.
     *
     * @return \Amp\Promise
     *
     * @throws \Error If a non-Promise is in the array.
     */
    function all(array $promises): Promise {
        if (!$promises) {
            return new Success([]);
        }

        $deferred = new Deferred;
        $result = $deferred->promise();

        $remaining = \count($promises);
        $collected = [];

        foreach ($promises as $key => $promise) {
            $isReact = $promise instanceof ReactPromise;

            if (!$isReact && !$promise instanceof Promise) {
                throw createTypeError([Promise::class, ReactPromise::class], $promise);
            }

            if ($isReact) {
                $promise = adapt($promise);
            }

            // Reserve the slot now so the result keeps the key order of the input array.
            $collected[$key] = null;

            $promise->onResolve(function ($exception, $value) use (&$deferred, &$collected, &$remaining, $key) {
                if ($remaining === 0) {
                    return; // Already settled, either by a failure or by the last success.
                }

                if (!$exception) {
                    $collected[$key] = $value;

                    if (--$remaining === 0) {
                        $deferred->resolve($collected);
                    }

                    return;
                }

                // First failure wins: fail immediately and ignore every later resolution.
                $remaining = 0;
                $deferred->fail($exception);
                $deferred = null;
            });
        }

        return $result;
    }

    /**
     * Returns a promise that succeeds when the first promise succeeds, and fails only if all promises fail.
     *
     * @param \Amp\Promise[] $promises Array of only promises.
     *
     * @return \Amp\Promise
     *
     * @throws \Error If the array is empty or a non-Promise is in the array.
*/
    function first(array $promises): Promise {
        if (empty($promises)) {
            throw new \Error("No promises provided");
        }

        $deferred = new Deferred;
        $result = $deferred->promise();

        $pending = \count($promises);
        $exceptions = [];

        foreach ($promises as $key => $promise) {
            if ($promise instanceof ReactPromise) {
                $promise = adapt($promise);
            } elseif (!$promise instanceof Promise) {
                throw createTypeError([Promise::class, ReactPromise::class], $promise);
            }

            $exceptions[$key] = null; // add entry to array to preserve order

            $promise->onResolve(function ($error, $value) use (&$deferred, &$exceptions, &$pending, $key) {
                if ($pending === 0) {
                    return; // Already settled by an earlier success or by the final failure.
                }

                if (!$error) {
                    // First success wins; zeroing $pending makes every later callback a no-op.
                    $pending = 0;
                    $deferred->resolve($value);
                    $deferred = null;
                    return;
                }

                $exceptions[$key] = $error;

                if (0 === --$pending) {
                    // Every promise failed: report all reasons, keyed like the input array.
                    $deferred->fail(new MultiReasonException($exceptions));
                }
            });
        }

        return $result;
    }

    /**
     * Resolves with a two-item array delineating failed and successful Promise results: index 0 holds the
     * failure reasons and index 1 the success values, both keyed like the given array.
     *
     * The returned promise fails only if fewer than $required promises succeed.
     *
     * @param \Amp\Promise[] $promises Array of only promises.
     * @param int $required Number of promises that must succeed for the returned promise to succeed.
     *
     * @return \Amp\Promise
     *
     * @throws \Error If $required is negative or exceeds the number of promises, or if a non-Promise is in the array.
*/
    function some(array $promises, int $required = 1): Promise {
        if ($required < 0) {
            throw new \Error("Number of promises required must be non-negative");
        }

        $pending = \count($promises);

        if ($pending < $required) {
            throw new \Error("Too few promises provided");
        }

        if (!$promises) {
            return new Success([[], []]);
        }

        $deferred = new Deferred;
        $result = $deferred->promise();
        $values = [];
        $exceptions = [];

        foreach ($promises as $key => $promise) {
            $isReact = $promise instanceof ReactPromise;

            if (!$isReact && !$promise instanceof Promise) {
                throw createTypeError([Promise::class, ReactPromise::class], $promise);
            }

            if ($isReact) {
                $promise = adapt($promise);
            }

            // Reserve the key in both arrays so each keeps the input order; the entry that does not apply
            // is removed once the promise settles.
            $exceptions[$key] = null;
            $values[$key] = null;

            $promise->onResolve(function ($exception, $value) use (
                &$values,
                &$exceptions,
                &$pending,
                $key,
                $required,
                $deferred
            ) {
                if (!$exception) {
                    $values[$key] = $value;
                    unset($exceptions[$key]);
                } else {
                    $exceptions[$key] = $exception;
                    unset($values[$key]);
                }

                if (--$pending !== 0) {
                    return; // Still waiting for other promises.
                }

                if (\count($values) >= $required) {
                    $deferred->resolve([$exceptions, $values]);
                    return;
                }

                $deferred->fail(new MultiReasonException($exceptions));
            });
        }

        return $result;
    }
}

namespace Amp\Iterator
{
    use Amp\Delayed;
    use Amp\Emitter;
    use Amp\Iterator;
    use Amp\Producer;
    use Amp\Promise;
    use function Amp\coroutine;
    use function Amp\Internal\createTypeError;

    /**
     * Creates an iterator from the given iterable, emitting each value. The iterable may contain promises. If any
     * promise fails, the iterator will fail with the same reason.
     *
     * @param array|\Traversable $iterable Elements to emit.
     * @param int $delay Delay between element emissions in milliseconds.
     *
     * @return \Amp\Iterator
     *
     * @throws \TypeError If the argument is not an array or instance of \Traversable.
*/
    function fromIterable(/* iterable */ $iterable, int $delay = 0): Iterator {
        if (!($iterable instanceof \Traversable || \is_array($iterable))) {
            throw createTypeError(["array", "Traversable"], $iterable);
        }

        $generator = function (callable $emit) use ($iterable, $delay) {
            foreach ($iterable as $element) {
                if ($delay) {
                    yield new Delayed($delay); // Pause before each emission, including the first one.
                }

                yield $emit($element);
            }
        };

        return new Producer($generator);
    }

    /**
     * Creates an iterator that emits the result of calling $onEmit with each value emitted by $iterator.
     *
     * @param \Amp\Iterator $iterator
     * @param callable (mixed $value): mixed $onEmit
     *
     * @return \Amp\Iterator
     */
    function map(Iterator $iterator, callable $onEmit): Iterator {
        $generator = function (callable $emit) use ($iterator, $onEmit) {
            while (yield $iterator->advance()) {
                $mapped = $onEmit($iterator->getCurrent());
                yield $emit($mapped);
            }
        };

        return new Producer($generator);
    }

    /**
     * Creates an iterator that emits only those values of $iterator for which $filter returns a truthy value.
     *
     * @param \Amp\Iterator $iterator
     * @param callable (mixed $value): bool $filter
     *
     * @return \Amp\Iterator
     */
    function filter(Iterator $iterator, callable $filter): Iterator {
        $generator = function (callable $emit) use ($iterator, $filter) {
            while (yield $iterator->advance()) {
                if (!$filter($iterator->getCurrent())) {
                    continue; // Rejected by the filter.
                }

                yield $emit($iterator->getCurrent());
            }
        };

        return new Producer($generator);
    }

    /**
     * Creates an iterator that emits values emitted from any iterator in the array of iterators.
*
     * All elements are validated before any of the iterators is advanced.
     *
     * @param \Amp\Iterator[] $iterators
     *
     * @return \Amp\Iterator
     *
     * @throws \TypeError If a non-Iterator is in the array.
     */
    function merge(array $iterators): Iterator {
        // Validate everything up front (as concat() does): starting a coroutine immediately advances its
        // iterator, so throwing half-way through would leave earlier iterators already being consumed.
        foreach ($iterators as $iterator) {
            if (!$iterator instanceof Iterator) {
                throw createTypeError([Iterator::class], $iterator);
            }
        }

        $emitter = new Emitter;
        $result = $emitter->iterate();

        // $emitter is shared by reference and set to null on failure so the remaining coroutines stop emitting.
        $coroutine = coroutine(function (Iterator $iterator) use (&$emitter) {
            while ((yield $iterator->advance()) && $emitter !== null) {
                yield $emitter->emit($iterator->getCurrent());
            }
        });

        $coroutines = [];
        foreach ($iterators as $iterator) {
            $coroutines[] = $coroutine($iterator);
        }

        Promise\all($coroutines)->onResolve(function ($exception) use (&$emitter) {
            if ($exception) {
                $emitter->fail($exception);
                $emitter = null;
            } else {
                $emitter->complete();
            }
        });

        return $result;
    }

    /**
     * Concatenates the given iterators into a single iterator, emitting values from a single iterator at a time. The
     * prior iterator must complete before values are emitted from any subsequent iterators. Iterators are concatenated
     * in the order given (iteration order of the array).
     *
     * @param array $iterators
     *
     * @return \Amp\Iterator
     */
    function concat(array $iterators): Iterator {
        foreach ($iterators as $iterator) {
            if (!$iterator instanceof Iterator) {
                throw createTypeError([Iterator::class], $iterator);
            }
        }

        $emitter = new Emitter;
        $previous = [];
        $promise = Promise\all($previous);

        $coroutine = coroutine(function (Iterator $iterator, callable $emit) {
            while (yield $iterator->advance()) {
                yield $emit($iterator->getCurrent());
            }
        });

        foreach ($iterators as $iterator) {
            $emit = coroutine(function ($value) use ($emitter, $promise) {
                static $pending = true, $failed = false;

                if ($failed) {
                    return;
                }

                if ($pending) {
                    try {
                        yield $promise;
                        $pending = false;
                    } catch (\Throwable $exception) {
                        $failed = true;
                        return; // Prior iterator failed.
} } yield $emitter->emit($value); }); $previous[] = $coroutine($iterator, $emit); $promise = Promise\all($previous); } $promise->onResolve(function ($exception) use ($emitter) { if ($exception) { $emitter->fail($exception); return; } $emitter->complete(); }); return $emitter->iterate(); } } travis/install-ev.sh 0000755 00000000304 13755316336 0010502 0 ustar 00 #!/usr/bin/env bash curl -LS https://pecl.php.net/get/ev | tar -xz; pushd ev-*; phpize; ./configure; make; make install; popd; echo "extension=ev.so" >> "$(php -r 'echo php_ini_loaded_file();')"; travis/install-event.sh 0000755 00000000410 13755316336 0011207 0 ustar 00 #!/usr/bin/env bash curl -LS https://pecl.php.net/get/event | tar -xz; pushd event-*; phpize; ./configure --with-event-core --with-event-extra --with-event-pthreads; make; make install; popd; echo "extension=event.so" >> "$(php -r 'echo php_ini_loaded_file();')"; travis/install-uv.sh 0000755 00000001141 13755316336 0010522 0 ustar 00 #!/usr/bin/env bash wget https://github.com/libuv/libuv/archive/v1.x.tar.gz -O /tmp/libuv.tar.gz -q & wget https://github.com/bwoebi/php-uv/archive/master.tar.gz -O /tmp/php-uv.tar.gz -q & wait mkdir libuv && tar -xf /tmp/libuv.tar.gz -C libuv --strip-components=1 mkdir php-uv && tar -xf /tmp/php-uv.tar.gz -C php-uv --strip-components=1 pushd libuv; ./autogen.sh ./configure --prefix=$(dirname `pwd`)/libuv-install make make install popd pushd php-uv phpize ./configure --with-uv=$(dirname `pwd`)/libuv-install make make install popd echo "extension=uv.so" >> "$(php -r 'echo php_ini_loaded_file();')" docs/.shared 0000777 00000000000 13755316336 0006761 5 ustar 00