.editorconfig 0000644 00000000203 13755316603 0007225 0 ustar 00 root = true [*] end_of_line = lf insert_final_newline = true trim_trailing_whitespace = true indent_style = space charset = utf-8 .gitmodules 0000644 00000000140 13755316603 0006725 0 ustar 00 [submodule "docs/.shared"] path = docs/.shared url = https://github.com/amphp/amphp.github.io .php_cs.dist 0000644 00000002563 13755316603 0007002 0 ustar 00 setRiskyAllowed(true) ->setRules([ "@PSR1" => true, "@PSR2" => true, "braces" => [ "allow_single_line_closure" => true, "position_after_functions_and_oop_constructs" => "same", ], "array_syntax" => ["syntax" => "short"], "cast_spaces" => true, "combine_consecutive_unsets" => true, "function_to_constant" => true, "no_multiline_whitespace_before_semicolons" => true, "no_unused_imports" => true, "no_useless_else" => true, "no_useless_return" => true, "no_whitespace_before_comma_in_array" => true, "no_whitespace_in_blank_line" => true, "non_printable_character" => true, "normalize_index_brace" => true, "ordered_imports" => true, "php_unit_construct" => true, "php_unit_dedicate_assert" => true, "php_unit_fqcn_annotation" => true, "phpdoc_summary" => true, "phpdoc_types" => true, "psr4" => true, "return_type_declaration" => ["space_before" => "none"], "short_scalar_cast" => true, "single_blank_line_before_namespace" => true, ]) ->setFinder( PhpCsFixer\Finder::create() ->in(__DIR__ . "/examples") ->in(__DIR__ . "/lib") ->in(__DIR__ . 
"/test") ); LICENSE 0000644 00000002066 13755316603 0005566 0 ustar 00 The MIT License (MIT) Copyright (c) 2016-2017 amphp Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. Makefile 0000644 00000001517 13755316603 0006221 0 ustar 00 PHP_BIN := php COMPOSER_BIN := composer COVERAGE = coverage SRCS = lib test find_php_files = $(shell find $(1) -type f -name "*.php") src = $(foreach d,$(SRCS),$(call find_php_files,$(d))) .PHONY: test test: setup phpunit code-style .PHONY: clean clean: clean-coverage clean-vendor .PHONY: clean-coverage clean-coverage: test ! -e coverage || rm -r coverage .PHONY: clean-vendor clean-vendor: test ! 
-e vendor || rm -r vendor .PHONY: setup setup: vendor/autoload.php .PHONY: deps-update deps-update: $(COMPOSER_BIN) update .PHONY: phpunit phpunit: setup $(PHP_BIN) vendor/bin/phpunit .PHONY: code-style code-style: setup PHP_CS_FIXER_IGNORE_ENV=1 $(PHP_BIN) vendor/bin/php-cs-fixer --diff -v fix composer.lock: composer.json $(COMPOSER_BIN) install touch $@ vendor/autoload.php: composer.lock $(COMPOSER_BIN) install touch $@ README.md 0000644 00000002501 13755316603 0006032 0 ustar 00 [![byte-stream](https://raw.githubusercontent.com/amphp/logo/master/repos/byte-stream.png?v=07-09-2017)](https://amphp.org/byte-stream/)
`amphp/byte-stream` is a stream abstraction to make working with non-blocking I/O simple. ## Installation This package can be installed as a [Composer](https://getcomposer.org/) dependency. ```bash composer require amphp/byte-stream ``` ## Requirements - PHP 7.0+ ## Documentation Documentation is bundled within this repository in the [`./docs`](./docs) directory. ## Versioning `amphp/byte-stream` follows the [semver](http://semver.org/) semantic versioning specification like all other `amphp` packages. ## Security If you discover any security related issues, please email [`me@kelunik.com`](mailto:me@kelunik.com) instead of using the issue tracker. ## License The MIT License (MIT). Please see [`LICENSE`](./LICENSE) for more information. composer.json 0000644 00000002274 13755316603 0007304 0 ustar 00 { "name": "amphp/byte-stream", "homepage": "http://amphp.org/byte-stream", "description": "A stream abstraction to make working with non-blocking I/O simple.", "support": { "issues": "https://github.com/amphp/byte-stream/issues", "irc": "irc://irc.freenode.org/amphp" }, "keywords": [ "stream", "async", "non-blocking", "amp", "amphp", "io" ], "license": "MIT", "authors": [ { "name": "Aaron Piotrowski", "email": "aaron@trowski.com" }, { "name": "Niklas Keller", "email": "me@kelunik.com" } ], "require": { "amphp/amp": "^2" }, "require-dev": { "amphp/phpunit-util": "^1", "phpunit/phpunit": "^6", "friendsofphp/php-cs-fixer": "^2.3" }, "autoload": { "psr-4": { "Amp\\ByteStream\\": "lib" }, "files": [ "lib/functions.php" ] }, "autoload-dev": { "psr-4": { "Amp\\ByteStream\\Test\\": "test" } }, "config": { "platform": { "php": "7.0.0" } } } docs/Gemfile 0000644 00000000171 13755316603 0006777 0 ustar 00 source "https://rubygems.org" gem "github-pages" gem "kramdown" gem "jekyll-github-metadata" gem "jekyll-relative-links" docs/Gemfile.lock 0000644 00000014777 13755316603 0007747 0 ustar 00 GEM remote: https://rubygems.org/ specs: activesupport (4.2.9) i18n (~> 0.7) minitest (~> 
5.1) thread_safe (~> 0.3, >= 0.3.4) tzinfo (~> 1.1) addressable (2.5.2) public_suffix (>= 2.0.2, < 4.0) coffee-script (2.4.1) coffee-script-source execjs coffee-script-source (1.11.1) colorator (1.1.0) commonmarker (0.17.7.1) ruby-enum (~> 0.5) concurrent-ruby (1.0.5) em-websocket (0.5.1) eventmachine (>= 0.12.9) http_parser.rb (~> 0.6.0) ethon (0.11.0) ffi (>= 1.3.0) eventmachine (1.2.5) execjs (2.7.0) faraday (0.14.0) multipart-post (>= 1.2, < 3) ffi (1.9.21) forwardable-extended (2.6.0) gemoji (3.0.0) github-pages (175) activesupport (= 4.2.9) github-pages-health-check (= 1.3.5) jekyll (= 3.6.2) jekyll-avatar (= 0.5.0) jekyll-coffeescript (= 1.0.2) jekyll-commonmark-ghpages (= 0.1.3) jekyll-default-layout (= 0.1.4) jekyll-feed (= 0.9.2) jekyll-gist (= 1.4.1) jekyll-github-metadata (= 2.9.3) jekyll-mentions (= 1.2.0) jekyll-optional-front-matter (= 0.3.0) jekyll-paginate (= 1.1.0) jekyll-readme-index (= 0.2.0) jekyll-redirect-from (= 0.12.1) jekyll-relative-links (= 0.5.2) jekyll-remote-theme (= 0.2.3) jekyll-sass-converter (= 1.5.0) jekyll-seo-tag (= 2.3.0) jekyll-sitemap (= 1.1.1) jekyll-swiss (= 0.4.0) jekyll-theme-architect (= 0.1.0) jekyll-theme-cayman (= 0.1.0) jekyll-theme-dinky (= 0.1.0) jekyll-theme-hacker (= 0.1.0) jekyll-theme-leap-day (= 0.1.0) jekyll-theme-merlot (= 0.1.0) jekyll-theme-midnight (= 0.1.0) jekyll-theme-minimal (= 0.1.0) jekyll-theme-modernist (= 0.1.0) jekyll-theme-primer (= 0.5.2) jekyll-theme-slate (= 0.1.0) jekyll-theme-tactile (= 0.1.0) jekyll-theme-time-machine (= 0.1.0) jekyll-titles-from-headings (= 0.5.0) jemoji (= 0.8.1) kramdown (= 1.16.2) liquid (= 4.0.0) listen (= 3.0.6) mercenary (~> 0.3) minima (= 2.1.1) rouge (= 2.2.1) terminal-table (~> 1.4) github-pages-health-check (1.3.5) addressable (~> 2.3) net-dns (~> 0.8) octokit (~> 4.0) public_suffix (~> 2.0) typhoeus (~> 0.7) html-pipeline (2.7.1) activesupport (>= 2) nokogiri (>= 1.4) http_parser.rb (0.6.0) i18n (0.9.3) concurrent-ruby (~> 1.0) jekyll (3.6.2) addressable (~> 
2.4) colorator (~> 1.0) jekyll-sass-converter (~> 1.0) jekyll-watch (~> 1.1) kramdown (~> 1.14) liquid (~> 4.0) mercenary (~> 0.3.3) pathutil (~> 0.9) rouge (>= 1.7, < 3) safe_yaml (~> 1.0) jekyll-avatar (0.5.0) jekyll (~> 3.0) jekyll-coffeescript (1.0.2) coffee-script (~> 2.2) coffee-script-source (~> 1.11.1) jekyll-commonmark (1.1.0) commonmarker (~> 0.14) jekyll (>= 3.0, < 4.0) jekyll-commonmark-ghpages (0.1.3) commonmarker (~> 0.17.6) jekyll-commonmark (~> 1) rouge (~> 2) jekyll-default-layout (0.1.4) jekyll (~> 3.0) jekyll-feed (0.9.2) jekyll (~> 3.3) jekyll-gist (1.4.1) octokit (~> 4.2) jekyll-github-metadata (2.9.3) jekyll (~> 3.1) octokit (~> 4.0, != 4.4.0) jekyll-mentions (1.2.0) activesupport (~> 4.0) html-pipeline (~> 2.3) jekyll (~> 3.0) jekyll-optional-front-matter (0.3.0) jekyll (~> 3.0) jekyll-paginate (1.1.0) jekyll-readme-index (0.2.0) jekyll (~> 3.0) jekyll-redirect-from (0.12.1) jekyll (~> 3.3) jekyll-relative-links (0.5.2) jekyll (~> 3.3) jekyll-remote-theme (0.2.3) jekyll (~> 3.5) rubyzip (>= 1.2.1, < 3.0) typhoeus (>= 0.7, < 2.0) jekyll-sass-converter (1.5.0) sass (~> 3.4) jekyll-seo-tag (2.3.0) jekyll (~> 3.3) jekyll-sitemap (1.1.1) jekyll (~> 3.3) jekyll-swiss (0.4.0) jekyll-theme-architect (0.1.0) jekyll (~> 3.5) jekyll-seo-tag (~> 2.0) jekyll-theme-cayman (0.1.0) jekyll (~> 3.5) jekyll-seo-tag (~> 2.0) jekyll-theme-dinky (0.1.0) jekyll (~> 3.5) jekyll-seo-tag (~> 2.0) jekyll-theme-hacker (0.1.0) jekyll (~> 3.5) jekyll-seo-tag (~> 2.0) jekyll-theme-leap-day (0.1.0) jekyll (~> 3.5) jekyll-seo-tag (~> 2.0) jekyll-theme-merlot (0.1.0) jekyll (~> 3.5) jekyll-seo-tag (~> 2.0) jekyll-theme-midnight (0.1.0) jekyll (~> 3.5) jekyll-seo-tag (~> 2.0) jekyll-theme-minimal (0.1.0) jekyll (~> 3.5) jekyll-seo-tag (~> 2.0) jekyll-theme-modernist (0.1.0) jekyll (~> 3.5) jekyll-seo-tag (~> 2.0) jekyll-theme-primer (0.5.2) jekyll (~> 3.5) jekyll-github-metadata (~> 2.9) jekyll-seo-tag (~> 2.2) jekyll-theme-slate (0.1.0) jekyll (~> 3.5) jekyll-seo-tag (~> 2.0) 
jekyll-theme-tactile (0.1.0) jekyll (~> 3.5) jekyll-seo-tag (~> 2.0) jekyll-theme-time-machine (0.1.0) jekyll (~> 3.5) jekyll-seo-tag (~> 2.0) jekyll-titles-from-headings (0.5.0) jekyll (~> 3.3) jekyll-watch (1.5.1) listen (~> 3.0) jemoji (0.8.1) activesupport (~> 4.0, >= 4.2.9) gemoji (~> 3.0) html-pipeline (~> 2.2) jekyll (>= 3.0) kramdown (1.16.2) liquid (4.0.0) listen (3.0.6) rb-fsevent (>= 0.9.3) rb-inotify (>= 0.9.7) mercenary (0.3.6) mini_portile2 (2.3.0) minima (2.1.1) jekyll (~> 3.3) minitest (5.11.3) multipart-post (2.0.0) net-dns (0.8.0) nokogiri (1.8.2) mini_portile2 (~> 2.3.0) octokit (4.8.0) sawyer (~> 0.8.0, >= 0.5.3) pathutil (0.16.1) forwardable-extended (~> 2.6) public_suffix (2.0.5) rb-fsevent (0.10.2) rb-inotify (0.9.10) ffi (>= 0.5.0, < 2) rouge (2.2.1) ruby-enum (0.7.1) i18n rubyzip (1.2.1) safe_yaml (1.0.4) sass (3.5.5) sass-listen (~> 4.0.0) sass-listen (4.0.0) rb-fsevent (~> 0.9, >= 0.9.4) rb-inotify (~> 0.9, >= 0.9.7) sawyer (0.8.1) addressable (>= 2.3.5, < 2.6) faraday (~> 0.8, < 1.0) terminal-table (1.8.0) unicode-display_width (~> 1.1, >= 1.1.1) thread_safe (0.3.6) typhoeus (0.8.0) ethon (>= 0.8.0) tzinfo (1.2.5) thread_safe (~> 0.1) unicode-display_width (1.3.0) PLATFORMS ruby DEPENDENCIES github-pages jekyll-github-metadata jekyll-relative-links kramdown BUNDLED WITH 1.15.1 docs/README.md 0000644 00000001212 13755316603 0006760 0 ustar 00 # Documentation This directory contains the documentation for `amphp/byte-stream`. Documentation and code are bundled within a single repository for easier maintenance. Additionally, this preserves the documentation for older versions. ## Reading You can read this documentation either directly on GitHub or on our website. While the website will always contain the latest version, viewing on GitHub also works with older versions. ## Writing Our documentation is built using Jekyll. 
``` sudo gem install bundler jekyll ``` ``` git submodule init git submodule update cd docs bundle install --path vendor/bundle bundle exec jekyll serve ``` docs/_config.yml 0000644 00000001371 13755316603 0007636 0 ustar 00 kramdown: input: GFM toc_levels: 2..3 baseurl: "/byte-stream" layouts_dir: ".shared/layout" includes_dir: ".shared/includes" exclude: ["Gemfile", "Gemfile.lock", "README.md", "vendor"] safe: true repository: amphp/byte-stream gems: - "jekyll-github-metadata" - "jekyll-relative-links" defaults: - scope: path: "" type: "pages" values: layout: "docs" description: "amphp/byte-stream is a byte stream abstraction for Amp providing interfaces for InputStream and OutputStream." keywords: ['amphp', 'amp', 'byte-stream', 'non-blocking', 'io', 'stream', 'compression'] shared_asset_path: "/byte-stream/asset" navigation: - resource-streams - in-memory-stream - iterator-stream - compression-streams - message - payload docs/compression-streams.md 0000644 00000002112 13755316603 0012040 0 ustar 00 --- title: Compression Streams permalink: /compression-streams --- This package implements compression and decompression streams based on Zlib. `ZlibOutputStream` can be used for compression, while `ZlibInputStream` can be used for decompression. Both can simply wrap an existing stream to apply them. Both accept an `$encoding` and `$options` parameter in their constructor. 
## Examples ```php $inputStream = new ResourceInputStream(STDIN); $gzInputStream = new ZlibInputStream($inputStream, \ZLIB_ENCODING_GZIP); while (null !== $chunk = yield $gzInputStream->read()) { print $chunk; } ``` ```php $outputStream = new ResourceOutputStream(STDOUT); $gzOutputStream = new ZlibOutputStream($outputStream, \ZLIB_ENCODING_GZIP); for ($i = 0; $i < 100; $i++) { yield $gzOutputStream->write(bin2hex(random_bytes(32))); } yield $gzOutputStream->end(); ``` ## See also * [`./examples/gzip-compress.php`](https://github.com/amphp/byte-stream/blob/master/examples/gzip-compress.php) * [`./examples/gzip-decompress.php`](https://github.com/amphp/byte-stream/blob/master/examples/gzip-decompress.php) docs/in-memory-stream.md 0000644 00000000733 13755316603 0011237 0 ustar 00 --- title: InMemoryStream permalink: /in-memory-stream --- An `InMemoryStream` allows creating an `InputStream` from a single known string chunk. This is helpful if the complete stream contents are already known. ```php $inputStream = new InMemoryStream("foobar"); ``` It also allows creating a stream without any chunks by passing `null` as chunk. ```php $inputStream = new InMemoryStream; // The stream ends immediately assert(null === yield $inputStream->read()); ``` docs/index.md 0000644 00000005444 13755316603 0007145 0 ustar 00 --- title: Overview permalink: / --- Streams are an abstraction over ordered sequences of bytes. This package provides the fundamental interfaces `InputStream` and `OutputStream`. ## InputStream `InputStream` offers a single method: `read()`. It returns a promise that is resolved with either a `string` or `null`. `null` indicates that the stream has ended. ### Example This example shows a simple `InputStream` consumption that buffers the complete stream contents inside a coroutine. 
```php $inputStream = ...; $buffer = ""; while (($chunk = yield $inputStream->read()) !== null) { $buffer .= $chunk; } // do something with $buffer ``` {:.note} > While buffering a stream that way is relatively straightforward, you might want to use `yield (new Payload($inputStream))->buffer()` to buffer a complete `InputStream`, making it even easier. ### Implementations This package offers some basic implementations; other libraries might provide even more implementations, such as [`amphp/socket`](https://github.com/amphp/socket). * [`InMemoryStream`](./in-memory-stream.md) * [`IteratorStream`](./iterator-stream.md) * [`Message`](./message.md) * [`Payload`](./payload.md) * [`ResourceInputStream`](./resource-streams.md) * [`ZlibInputStream`](./compression-streams.md) ## OutputStream `OutputStream` offers two methods: `write()` and `end()`. ### `write()` `write()` writes the given string to the stream. The returned `Promise` might be used to wait for completion. Waiting for completion allows writing only as fast as the underlying stream can write and potentially send over a network. TCP streams will resolve the returned `Promise` immediately as long as the write buffer isn't full. The write order is always ensured, even if the writer doesn't wait on the promise. {:.note} > If you do not wait on the returned `Promise`, pass it to `Amp\Promise\rethrow` so that write errors are reported instead of being silently ignored. ### `end()` `end()` marks the stream as ended, optionally writing a final data chunk first. TCP streams might close the underlying stream for writing, but MUST NOT close it completely. Instead, all resources should be freed and actual resource handles be closed by PHP's garbage collection process. ## Example This example uses the previous example to read from a stream and simply writes all data to an `OutputStream`. 
```php $inputStream = ...; $outputStream = ...; while (($chunk = yield $inputStream->read()) !== null) { yield $outputStream->write($chunk); } yield $outputStream->end(); ``` ### Implementations This package offers some basic implementations; other libraries might provide even more implementations, such as [`amphp/socket`](https://github.com/amphp/socket). * [`ResourceOutputStream`](./resource-streams.md) * [`ZlibOutputStream`](./compression-streams.md) docs/iterator-stream.md 0000644 00000000530 13755316603 0011147 0 ustar 00 --- title: IteratorStream permalink: /iterator-stream --- `IteratorStream` allows converting an `Amp\Iterator` that yields strings into an `InputStream`. ```php $inputStream = new IteratorStream(new Producer(function (callable $emit) { for ($i = 0; $i < 10; $i++) { yield new Delayed(1000); yield $emit("."); } })); ``` docs/message.md 0000644 00000003004 13755316603 0007450 0 ustar 00 --- title: Message permalink: /message --- {:.note} > `Message` has been deprecated. Use [`Payload`](./payload.md) instead. `Message` implements both `InputStream` _and_ `Promise`. This allows consuming a message either in chunks (streaming) or consuming everything at once (buffering). ## Buffering Buffering a complete input stream is quite easy; you can simply `yield` a `Message` just like any other `Promise`. If you have an `InputStream` that's not a `Message`, simply create a new instance from it using `new Message($inputStream)`. ```php $message = new Message($inputStream); $content = yield $message; ``` ## Streaming Sometimes it's useful / possible to consume a message in chunks rather than first buffering it completely. An example might be streaming a large HTTP response body directly to disk. 
```php while (($chunk = yield $message->read()) !== null) { // Use $chunk here, works just like any other InputStream } ``` ## Unwrapping In some cases you might want to resolve a promise with an `InputStream` or your method explicitly declares `InputStream` as a return type. In these cases you should use `Message::getInputStream` to return the raw input stream. This makes it possible to resolve promises with the `InputStream` and not run into unexpected issues. Only return a `Message` if you declare that as a type; otherwise, an API assumes `InputStream` and might try to resolve a promise with that, resulting in buffering the message's content instead of resolving the promise with an `InputStream` instance. docs/payload.md 0000644 00000002235 13755316603 0007462 0 ustar 00 --- title: Payload permalink: /payload --- `Payload` implements `InputStream` while also providing a method `buffer()` for buffering the entire contents. This allows consuming a message either in chunks (streaming) or consuming everything at once (buffering). When the object is destructed, any remaining data in the stream is automatically consumed and discarded. This class is useful for small payloads or when the entire contents of a stream are needed before any processing can be done. ## Buffering Buffering a complete input stream is quite easy; you can simply `yield` the promise returned from `buffer()` just like any other `Promise`. If you have an `InputStream` that's not a `Payload`, simply create a new instance from it using `new Payload($inputStream)`. ```php $payload = new Payload($inputStream); $content = yield $payload->buffer(); ``` ## Streaming Sometimes it's useful / possible to consume a payload in chunks rather than first buffering it completely. An example might be streaming a large HTTP response body directly to disk. 
```php while (($chunk = yield $payload->read()) !== null) { // Use $chunk here, works just like any other InputStream } ``` docs/resource-streams.md 0000644 00000001007 13755316603 0011330 0 ustar 00 --- title: Resource Streams permalink: /resource-streams --- This package abstracts PHP's stream resources with `ResourceInputStream` and `ResourceOutputStream`. They automatically set the passed resource to non-blocking and allow reading and writing like any other `InputStream` / `OutputStream`. They also handle backpressure automatically by disabling the read watcher in case there's no read request and only activate a write watcher if the underlying write buffer is already full, which makes them very efficient. examples/benchmark-throughput.php 0000644 00000003764 13755316603 0013257 0 ustar 00 write('NOTICE: The "xdebug" extension is loaded, this has a major impact on performance.' . PHP_EOL); } try { if (!@\assert(false)) { $stderr->write("NOTICE: Assertions are enabled, this has a major impact on performance." . PHP_EOL); } } catch (AssertionError $exception) { $stderr->write("NOTICE: Assertions are enabled, this has a major impact on performance." . PHP_EOL); } $stderr->write('piping from ' . $if . ' to ' . $of . ' (for max ' . $t . ' second(s)) ...'. PHP_EOL); Loop::delay($t * 1000, [$in, "close"]); Loop::run(function () use ($stderr, $in, $out) { $start = microtime(true); while (($chunk = yield $in->read()) !== null) { yield $out->write($chunk); } $t = microtime(true) - $start; $bytes = ftell($out->getResource()); $stderr->write('read ' . $bytes . ' byte(s) in ' . round($t, 3) . ' second(s) => ' . round($bytes / 1024 / 1024 / $t, 1) . ' MiB/s' . PHP_EOL); $stderr->write('peak memory usage of ' . round(memory_get_peak_usage(true) / 1024 / 1024, 1) . ' MiB' . 
PHP_EOL); }); examples/gzip-compress.php 0000644 00000000727 13755316603 0011714 0 ustar 00 read()) !== null) { yield $gzout->write($chunk); } }); examples/gzip-decompress.php 0000644 00000000723 13755316603 0012221 0 ustar 00 read()) !== null) { yield $stdout->write($chunk); } }); lib/ClosedException.php 0000644 00000000132 13755316603 0011120 0 ustar 00 contents = $contents; } /** * Reads data from the stream. * * @return Promise Resolves with the full contents or `null` if the stream has closed / already been consumed. */ public function read(): Promise { if ($this->contents === null) { return new Success; } $promise = new Success($this->contents); $this->contents = null; return $promise; } } lib/InputStream.php 0000644 00000001360 13755316603 0010307 0 ustar 00 read()) !== null) { * $buffer .= $chunk; * } * * return $buffer; * }); * } * ``` */ interface InputStream { /** * Reads data from the stream. * * @return Promise Resolves with a string when new data is available or `null` if the stream has closed. * * @throws PendingReadError Thrown if another read operation is still pending. */ public function read(): Promise; } lib/IteratorStream.php 0000644 00000003027 13755316603 0011003 0 ustar 00 iterator = $iterator; } /** @inheritdoc */ public function read(): Promise { if ($this->exception) { return new Failure($this->exception); } if ($this->pending) { throw new PendingReadError; } $this->pending = true; $deferred = new Deferred; $this->iterator->advance()->onResolve(function ($error, $hasNextElement) use ($deferred) { $this->pending = false; if ($error) { $this->exception = $error; $deferred->fail($error); } elseif ($hasNextElement) { $chunk = $this->iterator->getCurrent(); if (!\is_string($chunk)) { $this->exception = new StreamException(\sprintf( "Unexpected iterator value of type '%s', expected string", \is_object($chunk) ? 
\get_class($chunk) : \gettype($chunk) )); $deferred->fail($this->exception); return; } $deferred->resolve($chunk); } else { $deferred->resolve(); } }); return $deferred->promise(); } } lib/Message.php 0000644 00000011260 13755316603 0007420 0 ustar 00 read()) !== null) { * // Immediately use $chunk, reducing memory consumption since the entire message is never buffered. * } * * @deprecated Use Amp\ByteStream\Payload instead. */ class Message implements InputStream, Promise { /** @var InputStream */ private $source; /** @var string */ private $buffer = ""; /** @var \Amp\Deferred|null */ private $pendingRead; /** @var \Amp\Coroutine */ private $coroutine; /** @var bool True if onResolve() has been called. */ private $buffering = false; /** @var \Amp\Deferred|null */ private $backpressure; /** @var bool True if the iterator has completed. */ private $complete = false; /** @var \Throwable Used to fail future reads on failure. */ private $error; /** * @param InputStream $source An iterator that only emits strings. */ public function __construct(InputStream $source) { $this->source = $source; } private function consume(): \Generator { while (($chunk = yield $this->source->read()) !== null) { $buffer = $this->buffer .= $chunk; if ($buffer === "") { continue; // Do not succeed reads with empty string. } elseif ($this->pendingRead) { $deferred = $this->pendingRead; $this->pendingRead = null; $this->buffer = ""; $deferred->resolve($buffer); $buffer = ""; // Destroy last emitted chunk to free memory. } elseif (!$this->buffering) { $buffer = ""; // Destroy last emitted chunk to free memory. $this->backpressure = new Deferred; yield $this->backpressure->promise(); } } $this->complete = true; if ($this->pendingRead) { $deferred = $this->pendingRead; $this->pendingRead = null; $deferred->resolve($this->buffer !== "" ? 
$this->buffer : null); $this->buffer = ""; } return $this->buffer; } /** @inheritdoc */ final public function read(): Promise { if ($this->pendingRead) { throw new PendingReadError; } if ($this->coroutine === null) { $this->coroutine = new Coroutine($this->consume()); $this->coroutine->onResolve(function ($error) { if ($error) { $this->error = $error; } if ($this->pendingRead) { $deferred = $this->pendingRead; $this->pendingRead = null; $deferred->fail($error); } }); } if ($this->error) { return new Failure($this->error); } if ($this->buffer !== "") { $buffer = $this->buffer; $this->buffer = ""; if ($this->backpressure) { $backpressure = $this->backpressure; $this->backpressure = null; $backpressure->resolve(); } return new Success($buffer); } if ($this->complete) { return new Success; } $this->pendingRead = new Deferred; return $this->pendingRead->promise(); } /** @inheritdoc */ final public function onResolve(callable $onResolved) { $this->buffering = true; if ($this->coroutine === null) { $this->coroutine = new Coroutine($this->consume()); } if ($this->backpressure) { $backpressure = $this->backpressure; $this->backpressure = null; $backpressure->resolve(); } $this->coroutine->onResolve($onResolved); } /** * Exposes the source input stream. * * This might be required to resolve a promise with an InputStream, because promises in Amp can't be resolved with * other promises. 
* * @return InputStream */ final public function getInputStream(): InputStream { return $this->source; } } lib/OutputBuffer.php 0000644 00000002142 13755316603 0010465 0 ustar 00 deferred = new Deferred; } public function write(string $data): Promise { if ($this->closed) { throw new ClosedException("The stream has already been closed."); } $this->contents .= $data; return new Success(\strlen($data)); } public function end(string $finalData = ""): Promise { if ($this->closed) { throw new ClosedException("The stream has already been closed."); } $this->contents .= $finalData; $this->closed = true; $this->deferred->resolve($this->contents); $this->contents = ""; return new Success(\strlen($finalData)); } public function onResolve(callable $onResolved) { $this->deferred->promise()->onResolve($onResolved); } } lib/OutputStream.php 0000644 00000002236 13755316603 0010513 0 ustar 00 stream = $stream; } public function __destruct() { if (!$this->promise) { Promise\rethrow(new Coroutine($this->consume())); } } private function consume(): \Generator { try { if ($this->lastRead && null === yield $this->lastRead) { return; } while (null !== yield $this->stream->read()) { // Discard unread bytes from message. } } catch (\Throwable $exception) { // If exception is thrown here the connection closed anyway. } } /** * @inheritdoc * * @throws \Error If a buffered message was requested by calling buffer(). */ final public function read(): Promise { if ($this->promise) { throw new \Error("Cannot stream message data once a buffered message has been requested"); } return $this->lastRead = $this->stream->read(); } /** * Buffers the entire message and resolves the returned promise then. * * @return Promise