2016-05-01 22:03:21 -04:00

437 lines
9.6 KiB
PHP

<?php
namespace Pheanstalk;
/**
* Pheanstalk is a PHP client for the beanstalkd workqueue.
* The Pheanstalk class is a simple facade for the various underlying components.
*
* @see http://github.com/kr/beanstalkd
* @see http://xph.us/software/beanstalkd/
*
* @author Paul Annesley
* @package Pheanstalk
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Pheanstalk implements PheanstalkInterface
{
const VERSION = '3.0.2';
private $_connection;
private $_using = PheanstalkInterface::DEFAULT_TUBE;
private $_watching = array(PheanstalkInterface::DEFAULT_TUBE => true);
/**
* @param string $host
* @param int $port
* @param int $connectTimeout
* @param bool $connectPersistent
*/
public function __construct($host, $port = PheanstalkInterface::DEFAULT_PORT, $connectTimeout = null, $connectPersistent = false)
{
$this->setConnection(new Connection($host, $port, $connectTimeout, $connectPersistent));
}
/**
* {@inheritdoc}
*/
public function setConnection(Connection $connection)
{
$this->_connection = $connection;
return $this;
}
/**
* {@inheritdoc}
*/
public function getConnection()
{
return $this->_connection;
}
// ----------------------------------------
/**
* {@inheritdoc}
*/
public function bury($job, $priority = PheanstalkInterface::DEFAULT_PRIORITY)
{
$this->_dispatch(new Command\BuryCommand($job, $priority));
}
/**
* {@inheritdoc}
*/
public function delete($job)
{
$this->_dispatch(new Command\DeleteCommand($job));
return $this;
}
/**
* {@inheritdoc}
*/
public function ignore($tube)
{
if (isset($this->_watching[$tube])) {
$this->_dispatch(new Command\IgnoreCommand($tube));
unset($this->_watching[$tube]);
}
return $this;
}
/**
* {@inheritdoc}
*/
public function kick($max)
{
$response = $this->_dispatch(new Command\KickCommand($max));
return $response['kicked'];
}
/**
* {@inheritdoc}
*/
public function kickJob($job)
{
$this->_dispatch(new Command\KickJobCommand($job));
return $this;
}
/**
* {@inheritdoc}
*/
public function listTubes()
{
return (array) $this->_dispatch(
new Command\ListTubesCommand()
);
}
/**
* {@inheritdoc}
*/
public function listTubesWatched($askServer = false)
{
if ($askServer) {
$response = (array) $this->_dispatch(
new Command\ListTubesWatchedCommand()
);
$this->_watching = array_fill_keys($response, true);
}
return array_keys($this->_watching);
}
/**
* {@inheritdoc}
*/
public function listTubeUsed($askServer = false)
{
if ($askServer) {
$response = $this->_dispatch(
new Command\ListTubeUsedCommand()
);
$this->_using = $response['tube'];
}
return $this->_using;
}
/**
* {@inheritdoc}
*/
public function pauseTube($tube, $delay)
{
$this->_dispatch(new Command\PauseTubeCommand($tube, $delay));
return $this;
}
/**
* {@inheritdoc}
*/
public function resumeTube($tube)
{
// Pause a tube with zero delay will resume the tube
$this->pauseTube($tube, 0);
return $this;
}
/**
* {@inheritdoc}
*/
public function peek($jobId)
{
$response = $this->_dispatch(
new Command\PeekCommand($jobId)
);
return new Job($response['id'], $response['jobdata']);
}
/**
* {@inheritdoc}
*/
public function peekReady($tube = null)
{
if ($tube !== null) {
$this->useTube($tube);
}
$response = $this->_dispatch(
new Command\PeekCommand(Command\PeekCommand::TYPE_READY)
);
return new Job($response['id'], $response['jobdata']);
}
/**
* {@inheritdoc}
*/
public function peekDelayed($tube = null)
{
if ($tube !== null) {
$this->useTube($tube);
}
$response = $this->_dispatch(
new Command\PeekCommand(Command\PeekCommand::TYPE_DELAYED)
);
return new Job($response['id'], $response['jobdata']);
}
/**
* {@inheritdoc}
*/
public function peekBuried($tube = null)
{
if ($tube !== null) {
$this->useTube($tube);
}
$response = $this->_dispatch(
new Command\PeekCommand(Command\PeekCommand::TYPE_BURIED)
);
return new Job($response['id'], $response['jobdata']);
}
/**
* {@inheritdoc}
*/
public function put(
$data,
$priority = PheanstalkInterface::DEFAULT_PRIORITY,
$delay = PheanstalkInterface::DEFAULT_DELAY,
$ttr = PheanstalkInterface::DEFAULT_TTR
)
{
$response = $this->_dispatch(
new Command\PutCommand($data, $priority, $delay, $ttr)
);
return $response['id'];
}
/**
* {@inheritdoc}
*/
public function putInTube(
$tube,
$data,
$priority = PheanstalkInterface::DEFAULT_PRIORITY,
$delay = PheanstalkInterface::DEFAULT_DELAY,
$ttr = PheanstalkInterface::DEFAULT_TTR
)
{
$this->useTube($tube);
return $this->put($data, $priority, $delay, $ttr);
}
/**
* {@inheritdoc}
*/
public function release(
$job,
$priority = PheanstalkInterface::DEFAULT_PRIORITY,
$delay = PheanstalkInterface::DEFAULT_DELAY
)
{
$this->_dispatch(
new Command\ReleaseCommand($job, $priority, $delay)
);
return $this;
}
/**
* {@inheritdoc}
*/
public function reserve($timeout = null)
{
$response = $this->_dispatch(
new Command\ReserveCommand($timeout)
);
$falseResponses = array(
Response::RESPONSE_DEADLINE_SOON,
Response::RESPONSE_TIMED_OUT,
);
if (in_array($response->getResponseName(), $falseResponses)) {
return false;
} else {
return new Job($response['id'], $response['jobdata']);
}
}
/**
* {@inheritdoc}
*/
public function reserveFromTube($tube, $timeout = null)
{
$this->watchOnly($tube);
return $this->reserve($timeout);
}
/**
* {@inheritdoc}
*/
public function statsJob($job)
{
return $this->_dispatch(new Command\StatsJobCommand($job));
}
/**
* {@inheritdoc}
*/
public function statsTube($tube)
{
return $this->_dispatch(new Command\StatsTubeCommand($tube));
}
/**
* {@inheritdoc}
*/
public function stats()
{
return $this->_dispatch(new Command\StatsCommand());
}
/**
* {@inheritdoc}
*/
public function touch($job)
{
$this->_dispatch(new Command\TouchCommand($job));
return $this;
}
/**
* {@inheritdoc}
*/
public function useTube($tube)
{
if ($this->_using != $tube) {
$this->_dispatch(new Command\UseCommand($tube));
$this->_using = $tube;
}
return $this;
}
/**
* {@inheritdoc}
*/
public function watch($tube)
{
if (!isset($this->_watching[$tube])) {
$this->_dispatch(new Command\WatchCommand($tube));
$this->_watching[$tube] = true;
}
return $this;
}
/**
* {@inheritdoc}
*/
public function watchOnly($tube)
{
$this->watch($tube);
$ignoreTubes = array_diff_key($this->_watching, array($tube => true));
foreach ($ignoreTubes as $ignoreTube => $true) {
$this->ignore($ignoreTube);
}
return $this;
}
// ----------------------------------------
/**
* Dispatches the specified command to the connection object.
*
* If a SocketException occurs, the connection is reset, and the command is
* re-attempted once.
*
* @param Command $command
* @return Response
*/
private function _dispatch($command)
{
try {
$response = $this->_connection->dispatchCommand($command);
} catch (Exception\SocketException $e) {
$this->_reconnect();
$response = $this->_connection->dispatchCommand($command);
}
return $response;
}
/**
* Creates a new connection object, based on the existing connection object,
* and re-establishes the used tube and watchlist.
*/
private function _reconnect()
{
$new_connection = new Connection(
$this->_connection->getHost(),
$this->_connection->getPort(),
$this->_connection->getConnectTimeout()
);
$this->setConnection($new_connection);
if ($this->_using != PheanstalkInterface::DEFAULT_TUBE) {
$tube = $this->_using;
$this->_using = null;
$this->useTube($tube);
}
foreach ($this->_watching as $tube => $true) {
if ($tube != PheanstalkInterface::DEFAULT_TUBE) {
unset($this->_watching[$tube]);
$this->watch($tube);
}
}
if (!isset($this->_watching[PheanstalkInterface::DEFAULT_TUBE])) {
$this->ignore(PheanstalkInterface::DEFAULT_TUBE);
}
}
}