.gitmodules 0000644 00000000140 13755316427 0006731 0 ustar 00 [submodule "docs/.shared"]
path = docs/.shared
url = https://github.com/amphp/amphp.github.io
.php_cs.dist 0000644 00000002513 13755316427 0007001 0 ustar 00 setRiskyAllowed(true)
->setRules([
"@PSR1" => true,
"@PSR2" => true,
"braces" => [
"allow_single_line_closure" => true,
"position_after_functions_and_oop_constructs" => "same",
],
"array_syntax" => ["syntax" => "short"],
"cast_spaces" => true,
"combine_consecutive_unsets" => true,
"function_to_constant" => true,
"no_multiline_whitespace_before_semicolons" => true,
"no_unused_imports" => true,
"no_useless_else" => true,
"no_useless_return" => true,
"no_whitespace_before_comma_in_array" => true,
"no_whitespace_in_blank_line" => true,
"non_printable_character" => true,
"normalize_index_brace" => true,
"ordered_imports" => true,
"php_unit_construct" => true,
"php_unit_dedicate_assert" => true,
"php_unit_fqcn_annotation" => true,
"phpdoc_summary" => true,
"phpdoc_types" => true,
"psr4" => true,
"return_type_declaration" => ["space_before" => "none"],
"short_scalar_cast" => true,
"single_blank_line_before_namespace" => true,
])
->setFinder(
PhpCsFixer\Finder::create()
->in(__DIR__ . "/src")
->in(__DIR__ . "/test")
);
CONTRIBUTING.md 0000644 00000002333 13755316427 0007013 0 ustar 00 ## Submitting useful bug reports
Please search existing issues first to make sure this is not a duplicate.
Every issue report has a cost for the developers required to field it; be
respectful of others' time and ensure your report isn't spurious prior to
submission. Please adhere to [sound bug reporting principles](http://www.chiark.greenend.org.uk/~sgtatham/bugs.html).
## Development ideology
Truths which we believe to be self-evident:
- **It's an asynchronous world.** Be wary of anything that undermines
async principles.
- **The answer is not more options.** If you feel compelled to expose
new preferences to the user it's very possible you've made a wrong
turn somewhere.
- **There are no power users.** The idea that some users "understand"
concepts better than others has proven to be, for the most part, false.
If anything, "power users" are more dangerous than the rest, and we
should avoid exposing dangerous functionality to them.
## Code style
The amphp project adheres to the PSR-2 style guide with the exception that
opening braces for classes and methods must appear on the same line as
the declaration:
https://github.com/php-fig/fig-standards/blob/master/accepted/PSR-2-coding-style-guide.md
LICENSE 0000644 00000002065 13755316427 0005571 0 ustar 00 The MIT License (MIT)
Copyright (c) 2016-2018 amphp
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Makefile 0000644 00000001517 13755316427 0006225 0 ustar 00 PHP_BIN := php
COMPOSER_BIN := composer
COVERAGE = coverage
SRCS = lib test
find_php_files = $(shell find $(1) -type f -name "*.php")
src = $(foreach d,$(SRCS),$(call find_php_files,$(d)))
.PHONY: test
test: setup phpunit code-style
.PHONY: clean
clean: clean-coverage clean-vendor
.PHONY: clean-coverage
clean-coverage:
test ! -e coverage || rm -r coverage
.PHONY: clean-vendor
clean-vendor:
test ! -e vendor || rm -r vendor
.PHONY: setup
setup: vendor/autoload.php
.PHONY: deps-update
deps-update:
$(COMPOSER_BIN) update
.PHONY: phpunit
phpunit: setup
$(PHP_BIN) vendor/bin/phpunit
.PHONY: code-style
code-style: setup
PHP_CS_FIXER_IGNORE_ENV=1 $(PHP_BIN) vendor/bin/php-cs-fixer --diff -v fix
composer.lock: composer.json
$(COMPOSER_BIN) install
touch $@
vendor/autoload.php: composer.lock
$(COMPOSER_BIN) install
touch $@
README.md 0000644 00000002344 13755316427 0006043 0 ustar 00 # beanstalk
[![Build Status](https://img.shields.io/travis/amphp/beanstalk/master.svg?style=flat-square)](https://travis-ci.org/amphp/beanstalk)
[![CoverageStatus](https://img.shields.io/coveralls/amphp/beanstalk/master.svg?style=flat-square)](https://coveralls.io/github/amphp/beanstalk?branch=master)
![License](https://img.shields.io/badge/license-MIT-blue.svg?style=flat-square)
`amphp/beanstalk` is an asynchronous [Beanstalk](http://kr.github.io/beanstalkd/) client for PHP based on Amp.
## Installation
```
composer require amphp/beanstalk
```
## Examples
More extensive code examples reside in the [`examples`](./examples) directory.
```php
use('sometube');
$payload = json_encode([
"job" => bin2hex(random_bytes(16)),
"type" => "compress-image",
"path" => "/path/to/image.png"
]);
$jobId = yield $beanstalk->put($payload);
echo "Inserted job id: $jobId\n";
$beanstalk->quit();
});
```
## License
The MIT License (MIT). Please see [`LICENSE`](./LICENSE) for more information. composer.json 0000644 00000001257 13755316427 0007310 0 ustar 00 {
"name": "amphp/beanstalk",
"description": "Async beanstalk driver for Amp.",
"license": "MIT",
"authors": [
{
"name": "Niklas Keller",
"email": "me@kelunik.com"
}
],
"autoload": {
"psr-4": {
"Amp\\Beanstalk\\": "src"
}
},
"require": {
"amphp/amp": "^2",
"amphp/socket": "^1.0",
"amphp/uri": "^0.1",
"symfony/yaml": "^3.3"
},
"require-dev": {
"amphp/phpunit-util": "^1",
"phpunit/phpunit": "^6",
"friendsofphp/php-cs-fixer": "^2.3"
},
"config": {
"platform": {
"php": "7.1.0"
}
}
}
docs/.gitignore 0000644 00000000042 13755316427 0007475 0 ustar 00 .bundle
_site
vendor
Gemfile.lock
docs/Gemfile 0000644 00000000171 13755316427 0007003 0 ustar 00 source "https://rubygems.org"
gem "github-pages"
gem "kramdown"
gem "jekyll-github-metadata"
gem "jekyll-relative-links"
docs/_config.yml 0000644 00000001216 13755316427 0007640 0 ustar 00 kramdown:
input: GFM
toc_levels: 2..3
baseurl: "/beanstalk"
layouts_dir: ".shared/layout"
includes_dir: ".shared/includes"
exclude: ["Gemfile", "Gemfile.lock", "README.md", "vendor"]
safe: true
repository: amphp/beanstalk
gems:
- "jekyll-github-metadata"
- "jekyll-relative-links"
defaults:
- scope:
path: ""
type: "pages"
values:
layout: "docs"
description: "amphp/beanstalk is an asynchronous Beanstalk client for Amp."
keywords: ['amphp', 'amp', 'beanstalk', 'beanstalkd', 'client', 'php', 'queue', 'tube', 'jobs', 'async']
shared_asset_path: "/beanstalk/asset"
navigation:
- classes
- jobs
- tubes
- misc
docs/classes/index.md 0000644 00000000363 13755316427 0010601 0 ustar 00 ---
title: Classes
permalink: /classes/
---
- [`System`](system.md) — Value object representing system stats
- [`Tube`](tube.md) — Value object representing tube stats
- [`Job`](job.md) — Value object representing job stats
docs/classes/job.md 0000644 00000003212 13755316427 0010240 0 ustar 00 ---
title: Job
permalink: /classes/job
---
The `Job` class exposes no methods, just properties. These properties represent various details about a job within [Beanstalk](http://kr.github.io/beanstalkd/)
## Properties
| Property | Type | Description |
| -------- | ---- | ----------- |
| `$id` | int | Beanstalk's internal identifier for the job |
| `$tube` | string | The name of the tube that contains this job |
| `$state` | string | The state of the job, can be "ready", "delayed", "reserved" or "buried" |
| `$priority` | int | The priority of the job set by the put, release, or bury commands |
| `$age` | int | The time in seconds since the put command that created this job |
| `$delay` | int | The number of seconds to wait before putting this job in the ready queue. |
| `$ttr` | int | The number of seconds a worker is allowed to run this job (time to run) |
| `$timeLeft` | int | The number of seconds left until the server puts this job into the ready queue.
This number is only meaningful if the job is reserved or delayed.
If the job is reserved and this amount of time elapses before its state changes, it is considered to have timed out. |
| `$file` | string | The number of the earliest binlog file containing this job.
If -b wasn't used, this will be 0. |
| `$reserves` | int | The number of times this job has been reserved |
| `$timeouts` | int | The number of times this job has timed out during a reservation |
| `$releases` | int | The number of times a client has released this job from a reservation |
| `$buries` | int | The number of times this job has been buried |
| `$kicks` | int | The number of times this job has been kicked |
docs/classes/system.md 0000644 00000007446 13755316427 0011027 0 ustar 00 ---
title: System
permalink: /classes/system
---
The `System` class exposes no methods, just properties. These properties represent various details about a [Beanstalk](http://kr.github.io/beanstalkd/) server
## Properties
| Property | Type | Description |
| -------- | ---- | ----------- |
| `$currentJobsUrgent` | int | The number of ready jobs with priority < 1024 |
| `$currentJobsReady` | int | The number of jobs in the ready queue |
| `$currentJobsReserved` | int | The number of jobs reserved by all clients |
| `$currentJobsDelayed` | int | The number of delayed jobs |
| `$currentJobsBuried` | int | The number of buried jobs |
| `$cmdPut` | int | The cumulative number of put commands |
| `$cmdPeek` | int | The cumulative number of peek commands |
| `$cmdPeekReady` | int | The cumulative number of peek-ready commands |
| `$cmdPeekDelayed` | int | The cumulative number of peek-delayed commands |
| `$cmdPeekBuried` | int | The cumulative number of peek-buried commands |
| `$cmdReserve` | int | The cumulative number of reserve commands |
| `$cmdUse` | int | The cumulative number of use commands |
| `$cmdWatch` | int | The cumulative number of watch commands |
| `$cmdIgnore` | int | The cumulative number of ignore commands |
| `$cmdDelete` | int | The cumulative number of delete commands |
| `$cmdRelease` | int | The cumulative number of release commands |
| `$cmdBury` | int | The cumulative number of bury commands |
| `$cmdKick` | int | The cumulative number of kick commands |
| `$cmdStats` | int | The cumulative number of stats commands |
| `$cmdStatsJob` | int | The cumulative number of stats-job commands |
| `$cmdStatsTube` | int | The cumulative number of stats-tube commands |
| `$cmdListTubes` | int | The cumulative number of list-tubes commands |
| `$cmdListTubeUsed` | int | The cumulative number of list-tubes-used commands |
| `$cmdListTubesWatched` | int | The cumulative number of list-tubes-watched commands |
| `$cmdPauseTube` | int | The cumulative number of pause-tube commands |
| `jobTimeouts` | int | The cumulative count of jobs that timed out |
| `$totalJobs` | int | The cumulative count of jobs created |
| `$maxJobSize` | int | The maximum number of bytes in a job |
| `$currentTubes` | int | The number of currently existing tubes |
| `$currentConnections` | int | The number of currently open connections |
| `$currentProducers` | int | The number of currently open connections that have issued at least one put command |
| `$currentWorkers` | int | The number of currently open connections that have issued at least one reserve command |
| `$currentWaiting` | int | The number of currently open connections that have issued a reserve command but haven't received a response yet |
| `$totalConnections` | int | The cumulative number of connections |
| `$pid` | int | The process id of the server |
| `$version` | string | The version of the server |
| `$rusageUtime` | float | The cumulative user CPU time of this process in seconds and microseconds |
| `$rusageStime` | float | The cumulative system CPU time of this process in seconds and microseconds |
| `$uptime` | int | The number of seconds since this server started running |
| `$binlogOldestIndex` | int | The index of the oldest binlog file needed to store the current jobs |
| `$binlogCurrentIndex` | int | The index of the current binlog file being written to. If binlog is not active this value will be 0 |
| `$binlogMaxSize` | int | The maximum size in bytes a binlog file is allowed to get before a new binlog file is opened |
| `$binlogRecordsWritten` | int | The cumulative number of records written to the binlog |
| `$binlogRecordsMigrated` | int | The cumulative number of records written as part of compaction |
| `$id` | string | Random id string for the server process |
| `$hostname` | string | The hostname of the machine as determined by uname |
docs/classes/tube.md 0000644 00000002653 13755316427 0010435 0 ustar 00 ---
title: Tube
permalink: /classes/tube
---
The `Tube` class exposes no methods, just properties. These properties represent various details about a tube within [Beanstalk](http://kr.github.io/beanstalkd/)
## Properties
| Property | Type | Description |
| -------- | ---- | ----------- |
| `$name` | string | The tubes name |
| `$currentJobsUrgent` | int | The number of ready jobs with priority < 1024 in this tube |
| `$currentJobsReady` | int | The number of jobs in the ready queue in this tube. |
| `$currentJobsReserved` | int | The number of jobs reserved by all clients in this tube |
| `$currentJobsDelayed` | int | The number of delayed jobs in this tube |
| `$currentJobsBuried` | int | The number of buried jobs in this tube |
| `$totalJobs` | int | The cumulative count of jobs created in this tube |
| `$currentUsing` | int | The number of open connections that are currently using this tube |
| `$currentWaiting` | int | The number of open connections that have issued a reserve command while watching this tube |
| `$currentWatching` | int | The number of open connections that are currently watching this tube |
| `$pause` | int | The number of seconds the tube has been paused for |
| `$cmdDelete` | int | The cumulative number of delete commands for this tube |
| `$cmdPauseTube` | int | The cumulative number of pause-tube commands for this tube |
| `$pauseTimeLeft` | int | The number of seconds until the tube is un-paused |
docs/index.md 0000644 00000001274 13755316427 0007146 0 ustar 00 ---
title: Introduction
permalink: /
---
`amphp/beanstalk` is an asynchronous client for [Beanstalk][beanstalk].
## Installation
```
composer require amphp/beanstalk
```
## Usage
Connecting to a [Beanstalk][beanstalk] server.
```php
$beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300");
// If you already know the tube you need to connect to, or have a single tube, you
// can connect to the server with an additional tube query parameter.
// $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300?tube=foobar");
$systemStats = yield $beanstalk->getSystemStats();
$readyJobs = $systemStats->currentJobsReady;
```
[beanstalk]: http://kr.github.io/beanstalkd/
docs/jobs.md 0000644 00000003050 13755316427 0006766 0 ustar 00 ---
title: Working with Jobs
permalink: /jobs
---
* Table of Contents
{:toc}
## Pushing Jobs onto a Queue
```php
$beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300");
// This step not required if you included a tube query parameter when creating the client
$beanstalk->use('foobar');
$payload = json_encode([
"job" => bin2hex(random_bytes(16)),
"type" => "compress-image"
"path" => "/path/to/image.png"
]);
$jobId = yield $beanstalk->put($payload);
```
## Pulling Jobs off a Queue
```php
$beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300");
$beanstalk->watch('foobar');
while([$jobId, $jobData] = yield $beanstalk->reserve()) {
// Work the job using $jobData
// Once you're finished, delete the job
yield $beanstalk->delete($jobId);
// If there was an error, you can bury the job for inspection later
yield $beanstalk->bury($jobId);
// Of you can release the job, to be picked up by a new worker
yield $beanstalk->release($jobId);
}
```
## Working a Long Running Job
```php
$beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300");
$beanstalk->watch('foobar');
while([$jobId, $jobData] = yield $beanstalk->reserve()) {
// Work the job
// If you still need time to work the job, you can utilize the touch command
yield $beantstalk->touch($jobId);
}
```
## Getting a Jobs Stats
```php
$beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300");
$jobStats = yield $beanstalk->getJobStats($jobId = 42);
$jobStats->state; // ready
```
docs/misc.md 0000644 00000000771 13755316427 0006773 0 ustar 00 ---
title: Miscellaneous
permalink: /misc
---
* Table of Contents
{:toc}
## Get System Stats
To see what stats are available for the system, checkout the [System](classes/system) class page.
```php
$beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300");
$stats = yield $beanstalk->getSystemStats();
```
## Close the Connection
To manually close the connection to the server.
```php
$beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300");
$beanstalk->quit();
```
docs/tubes.md 0000644 00000005153 13755316427 0007161 0 ustar 00 ---
title: Working with Tubes
permalink: /tubes
---
* Table of Contents
{:toc}
## Using a Different Tube
By default Beanstalk will use the default tube for reserving and storing new jobs. To work with a different tube, you can use `use`:
```php
$beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300");
// This will store the job on the "default" tube.
$jobId = yield $beanstalk->put($payload = json_encode([
"job" => bin2hex(random_bytes(16)),
"type" => "compress-image"
"path" => "/path/to/image.png"
]););
$beanstalk->use('foobar');
// This will store the job on the "foobar" tube.
$jobId = yield $beanstalk->put($payload = json_encode([
"job" => bin2hex(random_bytes(16)),
"type" => "compress-image"
"path" => "/path/to/image.png"
]));
```
## Pausing a Tube
If you need to pause a tube, preventing any new jobs from being reserved, you can do the following:
```php
$beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300");
yield $beanstalk->pause($tube = 'foobar');
```
## Watching and Ignoring Tubes
By default when you reserve a job you'll either pull from the `default` tube, or the tube you `use`ed. If you'd like to reserve jobs from other tubes, you can use `watch` to pull from multiple tubes. If you need to remove a job from the watch list, you can use `ignore`.
```php
$beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300");
yield $beanstalk->watch($tube = 'foobar');
yield $beanstalk->watch($tube = 'barbaz');
yield $beanstalk->ignore($tube = 'default');
// Watchlist will contain "foobar" and "barbaz"
```
### Getting the Watched Tubes
To find out which tubes your connection is currently watching.
```php
$beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300");
yield $beanstalk->watch($tube = 'foobar');
yield $beanstalk->watch($tube = 'barbaz');
yield $beanstalk->ignore($tube = 'default');
$watchlist = $beanstalk->listWatchedTubes();
```
## Get a List of All Existing Tubes
If you need to see a list of all the tubes that exist on the server.
```php
$beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300");
$tubes = yield $beanstalk->listTubes();
```
## Get the Tube Being Used
To determine which tube your client is currently using.
```php
$beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300");
$tube = yield $beanstalk->getUsedTube();
```
## Get Tube Stats
To see what stats are available for a tube, checkout the [Tube](classes/tube) class page.
```php
$beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300");
$stats = yield $beanstalk->getTubeStats($tube = 'default');
```
phpunit.xml.dist 0000644 00000000603 13755316427 0007733 0 ustar 00
./test
./src
src/BadFormatException.php 0000644 00000000131 13755316427 0011572 0 ustar 00 applyUri($uri);
$this->deferreds = [];
$this->connection = new Connection($uri);
$this->connection->addEventHandler("response", function ($response) {
/** @var Deferred $deferred */
$deferred = array_shift($this->deferreds);
if ($response instanceof Throwable) {
$deferred->fail($response);
} else {
$deferred->resolve($response);
}
});
$this->connection->addEventHandler(["close", "error"], function (Throwable $error = null) {
if ($error) {
// Fail any outstanding promises
while ($this->deferreds) {
/** @var Deferred $deferred */
$deferred = array_shift($this->deferreds);
$deferred->fail($error);
}
}
});
if ($this->tube) {
$this->connection->addEventHandler("connect", function () {
array_unshift($this->deferreds, new Deferred);
return "use $this->tube\r\n";
});
}
}
private function applyUri(string $uri) {
$this->tube = (new Uri($uri))->getQueryParameter("tube");
}
private function send(string $message, callable $transform = null): Promise {
return call(function () use ($message, $transform) {
$this->deferreds[] = $deferred = new Deferred;
$promise = $deferred->promise();
yield $this->connection->send($message);
$response = yield $promise;
return $transform ? $transform($response) : $response;
});
}
public function use(string $tube) {
return $this->send("use " . $tube . "\r\n", function () use ($tube) {
$this->tube = $tube;
return null;
});
}
public function pause(string $tube, int $delay): Promise {
$payload = "pause-tube $tube $delay\r\n";
return $this->send($payload, function (array $response) use ($tube) {
list($type) = $response;
switch ($type) {
case "PAUSED":
return null;
case "NOT_FOUND":
throw new NotFoundException("Tube with name $tube is not found");
default:
throw new BeanstalkException("Unknown response: " . $type);
}
});
}
public function put(string $payload, int $timeout = 60, int $delay = 0, $priority = 0): Promise {
$payload = "put $priority $delay $timeout " . strlen($payload) . "\r\n$payload\r\n";
return $this->send($payload, function (array $response): int {
list($type) = $response;
switch ($type) {
case "INSERTED":
case "BURIED":
return (int) $response[1];
case "EXPECTED_CRLF":
throw new ExpectedCrlfException;
case "JOB_TOO_BIG":
throw new JobTooBigException;
case "DRAINING":
throw new DrainingException;
default:
throw new BeanstalkException("Unknown response: " . $type);
}
});
}
public function reserve(int $timeout = null): Promise {
$payload = $timeout === null ? "reserve\r\n" : "reserve-with-timeout $timeout\r\n";
return $this->send($payload, function (array $response): array {
list($type) = $response;
switch ($type) {
case "DEADLINE_SOON":
throw new DeadlineSoonException;
case "TIMED_OUT":
throw new TimedOutException;
case "RESERVED":
return [$response[1], $response[2]];
default:
throw new BeanstalkException("Unknown response: " . $type);
}
});
}
public function delete(int $id): Promise {
$payload = "delete $id\r\n";
return $this->send($payload, function (array $response): int {
list($type) = $response;
switch ($type) {
case "DELETED":
return true;
case "NOT_FOUND":
return false;
default:
throw new BeanstalkException("Unknown response: " . $type);
}
});
}
public function release(int $id, int $delay = 0, int $priority = 0): Promise {
$payload = "release $id $priority $delay\r\n";
return $this->send($payload, function (array $response): string {
list($type) = $response;
switch ($type) {
case "BURIED":
case "RELEASED":
case "NOT_FOUND":
return $type;
default:
throw new BeanstalkException("Unknown response: " . $type);
}
});
}
public function bury(int $id, int $priority = 0): Promise {
$payload = "bury $id $priority\r\n";
return $this->send($payload, function (array $response): int {
list($type) = $response;
switch ($type) {
case "BURIED":
return true;
case "NOT_FOUND":
return false;
default:
throw new BeanstalkException("Unknown response: " . $type);
}
});
}
public function kickJob(int $id): Promise {
$payload = "kick-job $id\r\n";
return $this->send($payload, function (array $response): bool {
list($type) = $response;
switch ($type) {
case "KICKED":
return true;
case "NOT_FOUND":
return false;
default:
throw new BeanstalkException("Unknown response: $type");
}
});
}
public function kick(int $count): Promise {
$payload = "kick $count\r\n";
return $this->send($payload, function (array $response): int {
list($type) = $response;
switch ($type) {
case "KICKED":
return (int) $response[1];
default:
throw new BeanstalkException("Unknown response: $type");
}
});
}
public function touch(int $id): Promise {
$payload = "touch $id\r\n";
return $this->send($payload, function (array $response): int {
list($type) = $response;
switch ($type) {
case "TOUCHED":
return true;
case "NOT_FOUND":
return false;
default:
throw new BeanstalkException("Unknown response: " . $type);
}
});
}
public function watch(string $tube): Promise {
$payload = "watch $tube\r\n";
return $this->send($payload, function (array $response): int {
if ($response[0] !== "WATCHING") {
throw new BeanstalkException("Unknown response: " . $response[0]);
}
return (int) $response[1];
});
}
public function ignore(string $tube): Promise {
$payload = "ignore $tube\r\n";
return $this->send($payload, function (array $response): int {
list($type) = $response;
switch ($type) {
case "WATCHING":
return (int) $response[1];
case "NOT_IGNORED":
throw new NotIgnoredException;
default:
throw new BeanstalkException("Unknown response: " . $type);
}
});
}
public function quit() {
$this->send("quit\r\n");
}
public function getJobStats(int $id): Promise {
$payload = "stats-job $id\r\n";
return $this->send($payload, function (array $response) use ($id): Job {
list($type) = $response;
switch ($type) {
case "OK":
return new Job(Yaml::parse($response[1]));
case "NOT_FOUND":
throw new NotFoundException("Job with $id is not found");
default:
throw new BeanstalkException("Unknown response: " . $type);
}
});
}
public function getTubeStats(string $tube): Promise {
$payload = "stats-tube $tube\r\n";
return $this->send($payload, function (array $response) use ($tube): Tube {
list($type) = $response;
switch ($type) {
case "OK":
return new Tube(Yaml::parse($response[1]));
case "NOT_FOUND":
throw new NotFoundException("Tube $tube is not found");
default:
throw new BeanstalkException("Unknown response: " . $type);
}
});
}
public function getSystemStats(): Promise {
$payload = "stats\r\n";
return $this->send($payload, function (array $response): System {
if ($response[0] !== "OK") {
throw new BeanstalkException("Unknown response: " . $response[0]);
}
return new System(Yaml::parse($response[1]));
});
}
public function listTubes(): Promise {
$payload = "list-tubes\r\n";
return $this->send($payload, function (array $response): array {
list($type) = $response;
switch ($type) {
case "OK":
return Yaml::parse($response[1]);
default:
throw new BeanstalkException("Unknown response: " . $type);
}
});
}
public function listWatchedTubes(): Promise {
$payload = "list-tubes-watched\r\n";
return $this->send($payload, function (array $response): array {
list($type) = $response;
switch ($type) {
case "OK":
return Yaml::parse($response[1]);
default:
throw new BeanstalkException("Unknown response: " . $type);
}
});
}
public function getUsedTube(): Promise {
$payload = "list-tube-used\r\n";
return $this->send($payload, function (array $response): string {
list($type) = $response;
switch ($type) {
case "USING":
return $response[1];
default:
throw new BeanstalkException("Unknown response: " . $type);
}
});
}
public function peek(int $id): Promise {
$payload = "peek $id\r\n";
return $this->send($payload, function (array $response) use ($id): string {
list($type) = $response;
switch ($type) {
case "FOUND":
return $response[2];
case "NOT_FOUND":
throw new NotFoundException("Job with $id is not found");
default:
throw new BeanstalkException("Unknown response: " . $type);
}
});
}
public function peekReady(): Promise {
return $this->peekInState('ready');
}
public function peekDelayed(): Promise {
return $this->peekInState('delayed');
}
public function peekBuried(): Promise {
return $this->peekInState('buried');
}
private function peekInState(string $state): Promise {
$payload = "peek-$state\r\n";
return $this->send(
$payload,
function (array $response) use ($state): string {
list($type) = $response;
switch ($type) {
case "FOUND":
return $response[2];
case "NOT_FOUND":
throw new NotFoundException("No Job in $state state");
default:
throw new BeanstalkException("Unknown response: " . $type);
}
}
);
}
}
src/BeanstalkException.php 0000644 00000000121 13755316427 0011636 0 ustar 00 applyUri($uri);
$this->handlers = [
"connect" => [],
"response" => [],
"error" => [],
"close" => [],
];
$this->parser = new Parser(function ($response) {
foreach ($this->handlers["response"] as $handler) {
$handler($response);
}
if ($response instanceof BadFormatException) {
$this->onError($response);
}
});
}
private function applyUri($uri) {
$uri = new Uri($uri);
$this->timeout = (int) ($uri->getQueryParameter("timeout") ?? $this->timeout);
$this->uri = $uri->getScheme() . "://" . $uri->getHost() . ":" . $uri->getPort();
}
public function addEventHandler($events, callable $callback) {
$events = (array) $events;
foreach ($events as $event) {
if (!isset($this->handlers[$event])) {
throw new \Error("Unknown event: " . $event);
}
$this->handlers[$event][] = $callback;
}
}
public function send(string $payload) {
return call(function () use ($payload) {
yield $this->connect();
yield $this->socket->write($payload);
});
}
private function connect() {
// If we're in the process of connecting already return that same promise
if ($this->connectPromisor) {
return $this->connectPromisor->promise();
}
// If a read watcher exists we know we're already connected
if ($this->socket) {
return new Success;
}
$this->connectPromisor = new Deferred;
$socketPromise = connect($this->uri, (new ConnectContext)->withConnectTimeout($this->timeout));
$socketPromise->onResolve(function ($error, $socket) {
$connectPromisor = $this->connectPromisor;
$this->connectPromisor = null;
if ($error) {
$connectPromisor->fail(new ConnectException(
"Connection attempt failed",
$code = 0,
$error
));
return;
}
$this->socket = $socket;
foreach ($this->handlers["connect"] as $handler) {
$pipelinedCommand = $handler();
if (!empty($pipelinedCommand)) {
$this->socket->write($pipelinedCommand);
}
}
asyncCall(function () {
while (null !== $chunk = yield $this->socket->read()) {
$this->parser->send($chunk);
}
$this->close();
});
$connectPromisor->resolve();
});
return $this->connectPromisor->promise();
}
private function onError(\Throwable $exception) {
foreach ($this->handlers["error"] as $handler) {
$handler($exception);
}
$this->close();
}
public function close() {
$this->parser->reset();
if ($this->socket) {
$this->socket->close();
$this->socket = null;
}
foreach ($this->handlers["close"] as $handler) {
$handler();
}
}
public function __destruct() {
$this->close();
}
}
src/DeadlineSoonException.php 0000644 00000000134 13755316427 0012302 0 ustar 00 responseCallback = $responseCallback;
}
public function send(string $bytes) {
$this->buffer .= $bytes;
do {
$pos = strpos($this->buffer, self::CRLF);
if ($pos === false) {
return;
}
$line = substr($this->buffer, 0, $pos);
$args = explode(" ", $line);
$callback = $this->responseCallback;
switch ($args[0]) {
case self::ERROR_OUT_OF_MEMORY:
$this->buffer = substr($this->buffer, strlen($line) + 2);
$callback(new OutOfMemoryException);
break;
case self::ERROR_INTERNAL_ERROR:
$this->buffer = substr($this->buffer, strlen($line) + 2);
$callback(new InternalErrorException);
break;
case self::ERROR_BAD_FORMAT:
$this->buffer = substr($this->buffer, strlen($line) + 2);
$callback(new BadFormatException);
break;
case self::ERROR_UNKNOWN_COMMAND:
$this->buffer = substr($this->buffer, strlen($line) + 2);
$callback(new UnknownCommandException);
break;
case "OK":
$size = (int) $args[1];
if (strlen($line) + $size + 4 > strlen($this->buffer)) {
return;
}
$data = substr($this->buffer, strlen($line) + 2, $size);
$this->buffer = substr($this->buffer, strlen($line) + $size + 4);
$callback(["OK", $data]);
break;
case "FOUND":
case "RESERVED":
$size = (int) $args[2];
if (strlen($line) + $size + 4 > strlen($this->buffer)) {
return;
}
$data = substr($this->buffer, strlen($line) + 2, $size);
$this->buffer = substr($this->buffer, strlen($line) + $size + 4);
$callback([$args[0], (int) $args[1], $data]);
break;
default:
$this->buffer = substr($this->buffer, strlen($line) + 2);
$callback($args);
break;
}
} while (isset($this->buffer[0]));
}
public function reset() {
$this->buffer = "";
}
}
src/Stats/Job.php 0000644 00000002273 13755316427 0007675 0 ustar 00 id = (int) $struct["id"];
$this->tube = $struct["tube"];
$this->state = $struct["state"];
$this->priority = (int) $struct["pri"];
$this->age = (int) $struct["age"];
$this->delay = (int) $struct["delay"];
$this->ttr = (int) $struct["ttr"];
$this->timeLeft = (int) $struct["time-left"];
$this->file = $struct["file"];
$this->reserves = (int) $struct["reserves"];
$this->timeouts = (int) $struct["timeouts"];
$this->releases = (int) $struct["releases"];
$this->buries = (int) $struct["buries"];
$this->kicks = (int) $struct["kicks"];
}
public $id;
public $tube;
public $state;
public $priority;
public $age;
public $delay;
public $ttr;
public $timeLeft;
public $file;
public $reserves;
public $timeouts;
public $releases;
public $buries;
public $kicks;
}
src/Stats/System.php 0000644 00000010142 13755316427 0010441 0 ustar 00 currentJobsUrgent = (int) $struct["current-jobs-urgent"];
$this->currentJobsReady = (int) $struct["current-jobs-ready"];
$this->currentJobsReserved = (int) $struct["current-jobs-reserved"];
$this->currentJobsDelayed = (int) $struct["current-jobs-delayed"];
$this->currentJobsBuried = (int) $struct["current-jobs-buried"];
$this->cmdPut = (int) $struct["cmd-put"];
$this->cmdPeek = (int) $struct["cmd-peek"];
$this->cmdPeekReady = (int) $struct["cmd-peek-ready"];
$this->cmdPeekDelayed = (int) $struct["cmd-peek-delayed"];
$this->cmdPeekBuried = (int) $struct["cmd-peek-buried"];
$this->cmdReserve = (int) $struct["cmd-reserve"];
$this->cmdUse = (int) $struct["cmd-use"];
$this->cmdWatch = (int) $struct["cmd-watch"];
$this->cmdIgnore = (int) $struct["cmd-ignore"];
$this->cmdDelete = (int) $struct["cmd-delete"];
$this->cmdRelease = (int) $struct["cmd-release"];
$this->cmdBury = (int) $struct["cmd-bury"];
$this->cmdKick = (int) $struct["cmd-kick"];
$this->cmdStats = (int) $struct["cmd-stats"];
$this->cmdStatsJob = (int) $struct["cmd-stats-job"];
$this->cmdStatsTube = (int) $struct["cmd-stats-tube"];
$this->cmdListTubes = (int) $struct["cmd-list-tubes"];
$this->cmdListTubeUsed = (int) $struct["cmd-list-tube-used"];
$this->cmdListTubesWatched = (int) $struct["cmd-list-tubes-watched"];
$this->cmdPauseTube = (int) $struct["cmd-pause-tube"];
$this->jobTimeouts = (int) $struct["job-timeouts"];
$this->totalJobs = (int) $struct["total-jobs"];
$this->maxJobSize = (int) $struct["max-job-size"];
$this->currentTubes = (int) $struct["current-tubes"];
$this->currentConnections = (int) $struct["current-connections"];
$this->currentProducers = (int) $struct["current-producers"];
$this->currentWorkers = (int) $struct["current-workers"];
$this->currentWaiting = (int) $struct["current-waiting"];
$this->totalConnections = (int) $struct["total-connections"];
$this->pid = (int) $struct["pid"];
$this->version = $struct["version"];
$this->rusageUtime = (float) $struct["rusage-utime"];
$this->rusageStime = (float) $struct["rusage-stime"];
$this->uptime = (int) $struct["uptime"];
$this->binlogOldestIndex = (int) $struct["binlog-oldest-index"];
$this->binlogCurrentIndex = (int) $struct["binlog-current-index"];
$this->binlogMaxSize = (int) $struct["binlog-max-size"];
$this->binlogRecordsWritten = (int) $struct["binlog-records-written"];
$this->binlogRecordsMigrated = (int) $struct["binlog-records-migrated"];
$this->id = $struct["id"];
$this->hostname = $struct["hostname"];
}
public $currentJobsUrgent;
public $currentJobsReady;
public $currentJobsReserved;
public $currentJobsDelayed;
public $currentJobsBuried;
public $cmdPut;
public $cmdPeek;
public $cmdPeekReady;
public $cmdPeekDelayed;
public $cmdPeekBuried;
public $cmdReserve;
public $cmdUse;
public $cmdWatch;
public $cmdIgnore;
public $cmdDelete;
public $cmdRelease;
public $cmdBury;
public $cmdKick;
public $cmdStats;
public $cmdStatsJob;
public $cmdStatsTube;
public $cmdListTubes;
public $cmdListTubeUsed;
public $cmdListTubesWatched;
public $cmdPauseTube;
public $jobTimeouts;
public $totalJobs;
public $maxJobSize;
public $currentTubes;
public $currentConnections;
public $currentProducers;
public $currentWorkers;
public $currentWaiting;
public $totalConnections;
public $pid;
public $version;
public $rusageUtime;
public $rusageStime;
public $uptime;
public $binlogOldestIndex;
public $binlogCurrentIndex;
public $binlogMaxSize;
public $binlogRecordsWritten;
public $binlogRecordsMigrated;
public $id;
public $hostname;
}
src/Stats/Tube.php 0000644 00000002603 13755316427 0010057 0 ustar 00 name = $struct["name"];
$this->currentJobsUrgent = (int) $struct["current-jobs-urgent"];
$this->currentJobsReady = (int) $struct["current-jobs-ready"];
$this->currentJobsReserved = (int) $struct["current-jobs-reserved"];
$this->currentJobsDelayed = (int) $struct["current-jobs-delayed"];
$this->currentJobsBuried = (int) $struct["current-jobs-buried"];
$this->totalJobs = (int) $struct["total-jobs"];
$this->currentUsing = (int) $struct["current-using"];
$this->currentWaiting = (int) $struct["current-waiting"];
$this->currentWatching = (int) $struct["current-watching"];
$this->pause = (int) $struct["pause"];
$this->cmdDelete = (int) $struct["cmd-delete"];
$this->cmdPauseTube = (int) $struct["cmd-pause-tube"];
$this->pauseTimeLeft = (int) $struct["pause-time-left"];
}
public $name;
public $currentJobsUrgent;
public $currentJobsReady;
public $currentJobsReserved;
public $currentJobsDelayed;
public $currentJobsBuried;
public $totalJobs;
public $currentUsing;
public $currentWaiting;
public $currentWatching;
public $pause;
public $cmdDelete;
public $cmdPauseTube;
public $pauseTimeLeft;
}
src/TimedOutException.php 0000644 00000000130 13755316427 0011464 0 ustar 00