diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..09f37b8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.idea +.DS_Store +._.* +vendor +composer.lock +composer.phar \ No newline at end of file diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..78aae6f --- /dev/null +++ b/.travis.yml @@ -0,0 +1,18 @@ +language: php + +php: + - 5.3 + - 5.4 + - 5.5 + - 5.6 + - hhvm + +before_script: + - composer self-update + - composer install --no-interaction --prefer-source --dev + +script: + - ./vendor/bin/phpunit + +notifications: + email: false \ No newline at end of file diff --git a/Queue/Exception.php b/Queue/Exception.php new file mode 100644 index 0000000..7ac9189 --- /dev/null +++ b/Queue/Exception.php @@ -0,0 +1,18 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Queue; + +/** + * Exception class for Queue Namespace. + * + * @author Andrius Putna + */ +class Exception extends \Exception {} \ No newline at end of file diff --git a/Queue/Manager.php b/Queue/Manager.php new file mode 100644 index 0000000..5d7abb7 --- /dev/null +++ b/Queue/Manager.php @@ -0,0 +1,397 @@ + + */ +class Manager +{ + /** + * @var $_pdo PDO + */ + protected $_pdo; + + public function setPdo(\PDO $pdo) + { + $this->_pdo = $pdo; + } + + /** + * Configure database + * + * @return \PDO + */ + public function getDb() + { + if(!is_object($this->_pdo)) { + throw new Exception('Database connection was not set'); + } + return $this->_pdo; + } + + /** + * Return Queue summary + * + * @param string $name + * @return array + */ + public function queueSummary($name) + { + $qid = $this->getQueueId($name); + + $db = $this->getDb(); + $list = array(); + $sql = 'SELECT * FROM message WHERE queue_id = :queue_id'; + $sth = $db->prepare($sql); + $sth->execute(array('queue_id'=>$qid)); + foreach($sth->fetchAll() as $msg) { + $o = unserialize(base64_decode($msg['body'])); + $list[] = array( + 'queue_name' => $name, + 'message_id' => $msg['message_id'], + 'message_class' => get_class($o), + 'handle' => $msg['handle'], + 'log' => $msg['log'], + 'created' => $msg['created'], + 'params' => $o->toArray(), + 'timeout' => $msg['timeout'], + ); + } + return $list; + } + + /** + * Return all Queues summary + * + * @return array + */ + public function summary() + { + $list = array(); + foreach($this->getQueues() as $queue) { + $list = array_merge($list, $this->queueSummary($queue->getName())); + } + return $list; + } + + /** + * Select unselected messages from queue + * + * @param Queue $queue + * @param int $max + * @param int $timeout + * @return \ArrayObject + */ + private function receiveQueueMessages(Queue $queue, $max, $timeout) + { + $msgs = array(); + $microtime = microtime(true); // cache microtime + $db = $this->getDb(); + $qid = $this->getQueueId($queue->getName()); + + // start transaction handling + try { + if ( $max > 0 ) { + $db->beginTransaction(); + + $sql = "SELECT * + FROM message + WHERE queue_id = ? + AND handle IS NULL OR timeout+" . (int)$timeout . " < " . (int)$microtime ." + LIMIT ?"; + $stmt = $db->prepare($sql); + $stmt->execute(array($qid, $max)); + + foreach ($stmt->fetchAll() as $data) { + $data['handle'] = md5(uniqid(rand(), true)); + + $sql = "UPDATE message + SET + handle = :handle, + timeout = :timeout + WHERE + message_id = :id + AND + handle IS NULL OR timeout+" . (int)$timeout . " < " . (int)$microtime; + + $stmt = $db->prepare($sql); + $stmt->bindParam(':handle', $data['handle'], \PDO::PARAM_STR); + $stmt->bindParam(':id', $data['message_id'], \PDO::PARAM_STR); + $stmt->bindValue(':timeout', $microtime); + $updated = $stmt->execute(); + + if ($updated) { + $msgs[] = $data; + } + } + $db->commit(); + } + } catch (\Exception $e) { + $db->rollBack(); + throw new Exception($e->getMessage(), $e->getCode(), $e); + } + + $m = array(); + foreach($msgs as $msg) { + $message = unserialize(base64_decode($msg['body'])); + if($message instanceof Message) { + $message->message_id = $msg['message_id']; + $m[] = $message; + } + } + + return new \ArrayObject($m); + } + + /** + * Execute messages in queue + * + * @param Queue $queue + * @param int $max + * @param int $timeout + */ + public function executeQueue(Queue $queue, $max, $timeout) + { + if(null == $timeout) { + $timeout = self::RECEIVE_TIMEOUT_DEFAULT; + } + + foreach($this->receiveQueueMessages($queue, $max, $timeout) as $message) { + try { + $message->execute(); + $this->deleteMessage($message); + } catch(\Exception $e) { + $this->log($message, $e); + } + } + } + + /** + * Execute all queues + * + * @param int $max + */ + public function executeAll($max = 50) + { + $sth = $this->getDb()->prepare('SELECT * FROM queue WHERE 1'); + $sth->execute(); + foreach($sth->fetchAll() as $queue) { + $q = new Queue($queue['queue_name'], $queue['timeout']); + $q->setManager($this); + $q->execute($max); + } + } + + /** + * Add new message to queue + * + * @param Message $message + * @param type $name + * @return type + */ + public function addMessageToQueue(Message $message, $name) + { + $q = $this->getQueue($name); + $qid = $this->getQueueId($q->getName()); + $body = base64_encode(serialize($message)); + $md5 = md5($body); + + $sql = 'INSERT INTO message + (queue_id, body, created, timeout, md5) + VALUES + (:queue_id, :body, :created, :timeout, :md5) + '; + $stmt = $this->getDb()->prepare($sql); + $stmt->bindParam(':queue_id', $qid, \PDO::PARAM_INT); + $stmt->bindParam(':body', $body, \PDO::PARAM_STR); + $stmt->bindParam(':md5', $md5, \PDO::PARAM_STR); + $stmt->bindValue(':created', time(), \PDO::PARAM_INT); + $stmt->bindValue(':timeout', 30, \PDO::PARAM_INT); + $stmt->execute(); + return true; + } + + /** + * Return new queue instance + * + * @param type $name + * @param type $timeout + * @return Queue + */ + public function getQueue($name, $timeout = null) + { + if (!is_string($name)) { + throw new Exception('$name is not a string'); + } + + if ((null !== $timeout) && !is_integer($timeout)) { + throw new Exception('$timeout must be an integer'); + } + + if(!$this->getQueueId($name)) { + $this->create($name, $timeout); + } + + $q = new Queue($name, $timeout); + $q->setManager($this); + return $q; + } + + /** + * Returns queue id or bool false + * @param string $name + * @return int + */ + public function getQueueId($name) + { + $sql = 'SELECT queue_id FROM queue WHERE queue_name = ? LIMIT 1'; + $stmt = $this->getDb()->prepare($sql); + $stmt->execute(array($name)); + return $stmt->fetchColumn(); + } + + /** + * Creates new Queue + * + * @param string $name + * @param int $timeout + * @return bool + */ + private function create($name, $timeout = null) + { + if(null === $timeout) { + $timeout = 10000; + } + + $sql = 'INSERT INTO queue + (queue_name, timeout) + VALUES + (:queue_name, :timeout) + '; + $stmt = $this->getDb()->prepare($sql); + $stmt->bindParam(':queue_name', $name, \PDO::PARAM_STR); + $stmt->bindParam(':timeout', $timeout, \PDO::PARAM_INT); + $stmt->execute(); + return true; + } + + public function getQueues() + { + $sql = 'SELECT * FROM queue WHERE 1'; + $sth = $this->getDb()->prepare($sql); + $sth->execute(); + $list = array(); + foreach($sth->fetchAll() as $q) { + $list[] = new Queue($q['queue_name'], $q['timeout']); + } + return new \ArrayObject($list); + } + + public function clearQueues() + { + $sth = $this->getDb()->prepare('DELETE FROM message WHERE 1'); + $sth->execute(); + $sth = $this->getDb()->prepare('DELETE FROM queue WHERE 1'); + $sth->execute(); + } + + public function clearQueue($name) + { + $qid = $this->getQueueId($name); + $sth = $this->getDb()->prepare('DELETE FROM message WHERE queue_id = ?'); + $sth->execute(array($qid)); + $sth = $this->getDb()->prepare('DELETE FROM queue WHERE queue_id = ?'); + $sth->execute(array($qid)); + } + + public function getWaitingMessages() + { + $db = $this->getDb(); + $sql = 'SELECT * FROM message WHERE handle IS NULL'; + $sth = $db->prepare($sql); + $sth->execute(); + $list = array(); + foreach($sth->fetchAll() as $msg) { + $list[] = unserialize(base64_decode($msg['body'])); + } + return $list; + } + + public function getStuckMessages() + { + $db = $this->getDb(); + $sql = 'SELECT * FROM message WHERE handle IS NOT NULL'; + $sth = $db->prepare($sql); + $sth->execute(); + $list = array(); + foreach($sth->fetchAll() as $msg) { + $list[] = unserialize(base64_decode($msg['body'])); + } + return $list; + } + + /** + * Remove message from queue + * + * @param Message $message + * @return bool + */ + public function deleteMessage(Message $message) + { + return $this->deleteMessageById($message->message_id); + } + + /** + * Remove message from queue by id + * + * @param int $id + * @return bool + */ + public function deleteMessageById($id) + { + $sql = 'DELETE FROM message WHERE message_id = ?'; + $stmt = $this->getDb()->prepare($sql); + $stmt->execute(array($id)); + return true; + } + + /** + * Saves error message to message on execution failure + * + * @param Message $message + * @param \Exception $e + * @return void + */ + public function log(Message $message, \Exception $e) + { + $sql = "UPDATE message + SET log = :log + WHERE message_id = :id + "; + $stmt = $this->getDb()->prepare($sql); + $stmt->bindValue(':log', $e->getMessage(), \PDO::PARAM_STR); + $stmt->bindValue(':id', $message->message_id, \PDO::PARAM_INT); + $stmt->execute(); + } + + /** + * Get total messages in all queues + * + * @return int + */ + public function getTotalMessages() + { + $sql = 'SELECT COUNT(message_id) FROM message WHERE 1'; + $stmt = $this->getDb()->prepare($sql); + $stmt->execute(); + return $stmt->fetchColumn(); + } +} \ No newline at end of file diff --git a/Queue/Message.php b/Queue/Message.php new file mode 100644 index 0000000..20b3d75 --- /dev/null +++ b/Queue/Message.php @@ -0,0 +1,61 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Queue; + +use Queue\Exception; + +/** + * Message abstract class + * + * This class should be extended by your custom Message. + * Every message has its own execute method. + * + * @author Andrius Putna + */ +abstract class Message +{ + protected $_data = array(); + + public function __construct(array $options = array()) + { + $this->_data = $options; + } + + abstract public function execute(); + + public function toArray() + { + return $this->_data; + } + + public function __get($key) + { + if (!array_key_exists($key, $this->_data)) { + throw new Exception("Specified field \"$key\" is not in the message"); + } + return $this->_data[$key]; + } + + public function __set($key, $value) + { + $this->_data[$key] = $value; + } + + public function __sleep() + { + return array('_data'); + } + + public function __toString() + { + return sprintf('Message %s with params %s'.PHP_EOL, get_class($this), print_r($this->_data, true)); + } +} \ No newline at end of file diff --git a/Queue/Queue.php b/Queue/Queue.php new file mode 100644 index 0000000..2fb6c6d --- /dev/null +++ b/Queue/Queue.php @@ -0,0 +1,114 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Queue; + +use Queue\Message; +use Queue\Manager; + +/** + * Queue class + * + * Main Queue class reprents One Queue and is a Proxy for all queues. + * + * @author Andrius Putna + */ +class Queue +{ + /** + * Message will be selected + * once again after 30s for execution. + * Message must be removed from queue in order to not select it + */ + const RECEIVE_TIMEOUT_DEFAULT = 30; + + private $name; + private $timeout; + + /** + * @var $manager Manager + */ + protected $manager; + + public function __construct($name = 'UNIVERSAL', $timeout = 10000) + { + $this->name = $name; + $this->timeout = $timeout; + } + + public function setManager(Manager $manager) + { + $this->manager = $manager; + } + + /** + * Return Queue manager + * + * @return Manager + */ + private function getManager() + { + if(!$this->manager) { + throw new \Exception('Queue manager was not set'); + } + return $this->manager; + } + + /** + * Return queue identificator + * + * @param string + */ + public function getName() + { + return $this->name; + } + + /** + * Return queue timeout + * @return int + */ + public function getTimeout() + { + return $this->timeout; + } + + /** + * Send message to Queue + * + * @param Message $message + */ + public function send(Message $message) + { + return $this->getManager()->addMessageToQueue($message, $this->name); + } + + /** + * Executes $max messages from queue + * + * @param int $max + * @param int $timeout + * @return void + */ + public function execute($max = 50, $timeout = self::RECEIVE_TIMEOUT_DEFAULT) + { + return $this->getManager()->executeQueue($this, $max, $timeout); + } + + /** + * Prints class + * + * @return string + */ + public function __toString() + { + return sprintf('Queue: %s Timeout: %s'.PHP_EOL,$this->name, $this->timeout); + } +} \ No newline at end of file diff --git a/QueueExample/Dummy.php b/QueueExample/Dummy.php new file mode 100644 index 0000000..d4190d5 --- /dev/null +++ b/QueueExample/Dummy.php @@ -0,0 +1,33 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace QueueExample; + +use Queue\Message; + +/** + * A Dummy Queue Message. + * + * Acts as an example message. It can be safely removed or replaced. + * + * @author Andrius Putna + */ +class Dummy extends Message +{ + /** + * This Message purpose is to return true + * + * @return bool + */ + public function execute() + { + return true; + } +} \ No newline at end of file diff --git a/QueueExample/Error.php b/QueueExample/Error.php new file mode 100644 index 0000000..9d00552 --- /dev/null +++ b/QueueExample/Error.php @@ -0,0 +1,34 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace QueueExample; + +use Queue\Message; + +/** + * An example message with error + * + * Acts as an example message. It can be safely removed or replaced. + * + * @author Andrius Putna + */ +class Error extends Message +{ + /** + * This Message purpose is to throw an Exception + * + * @return bool + * @throws \LogicException + */ + public function execute() + { + throw new \LogicException('This message should be logged to queue', 123); + } +} \ No newline at end of file diff --git a/QueueExample/SendMail.php b/QueueExample/SendMail.php new file mode 100644 index 0000000..c466a4d --- /dev/null +++ b/QueueExample/SendMail.php @@ -0,0 +1,29 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace QueueExample; + +use Queue\Message; + +/** + * An email sending message + * + * Message contains enough parameters to send an email + * + * @author Andrius Putna + */ +class SendMail extends Message +{ + public function execute() + { + $message = wordwrap($this->message, 70); + mail($this->to_email, $this->subject, $message); + } +} \ No newline at end of file diff --git a/README.md b/README.md index e2c4ba7..86e781b 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,58 @@ -php-queue-manager +PHP Queue Manager and Task Executor ================= -PHP Queue Manager and Task Executor +Queue manager - A simple, fast queue manager and tasks executor. +Stack your tasks to queue and execute them later. + +Requirements +================= + +* PHP >5.3 +* PDO +* Database backend (sqlite, sqlite memory, mysql, postgres, any PDO supported database) + + +Install with Composer +================= + +* Install dependency + +```json +{ + "require": { + "fordnox/php-queue-manager": "0.1" + } +} +``` + +* Use included sqlite file (extra/queue.sqlite) for basic backend system to connect to via PDO + +```php +$dbh = new PDO('sqlite:queue.sqlite'); +$dbh->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION ); +$dbh->setAttribute(\PDO::ATTR_DEFAULT_FETCH_MODE, \PDO::FETCH_ASSOC); +$dbh->exec($structure); +``` + +Example Queue +================= + +```php + +$message = new \QueueExample\Dummy(); + +$manager = new \Queue\Manager(); +$manager->setPdo($dbh); +$manager->addMessageToQueue($message, 'AmazonEmailsQueue'); + +// Usually this call should be executed via cron or some other worker +$amazonEmails = $manager->getQueue('AmazonEmailsQueue'); +$amazonEmails->execute(); + +``` + +Todo +================= + +- [ ] Write tests for all SQL backends +- [ ] Create simple Web Interface \ No newline at end of file diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..82e9be4 --- /dev/null +++ b/composer.json @@ -0,0 +1,29 @@ +{ + "name": "fordnox/php-queue-manager", + "description": "PHP Queue with database backend. Use sqlite, mysql, postgres database", + "keywords": [ + "queue", + "executor", + "worker", + "task", + "manager" + ], + "homepage": "http://github.com/fordnox/php-queue-manager", + "type": "library", + "author": "Andrius Putna", + "license": "MIT", + "version": "0.1", + "require": { + "php": ">=5.3.0", + "ext-PDO": "*" + }, + "require-dev": { + "phpunit/phpunit": "4.1.1" + }, + "autoload": { + "psr-4": { + "Queue\\": "./Queue", + "QueueExample\\": "./QueueExample" + } + } +} \ No newline at end of file diff --git a/extra/queue.sqlite b/extra/queue.sqlite new file mode 100755 index 0000000..1b755ce Binary files /dev/null and b/extra/queue.sqlite differ diff --git a/extra/structure.sqlite.sql b/extra/structure.sqlite.sql new file mode 100644 index 0000000..f7ecb00 --- /dev/null +++ b/extra/structure.sqlite.sql @@ -0,0 +1,18 @@ +BEGIN TRANSACTION; +CREATE TABLE queue +( + queue_id INTEGER PRIMARY KEY AUTOINCREMENT, + queue_name VARCHAR(100) NOT NULL, + timeout INTEGER NOT NULL DEFAULT 30 +); +CREATE TABLE message +( + message_id INTEGER PRIMARY KEY AUTOINCREMENT, + queue_id INTEGER, + handle CHAR(32), + body VARCHAR(8192) NOT NULL, + md5 CHAR(32) NOT NULL, + timeout REAL, + created INTEGER +, "log" TEXT); +COMMIT; diff --git a/phpunit.xml.dist b/phpunit.xml.dist new file mode 100644 index 0000000..ea7d73f --- /dev/null +++ b/phpunit.xml.dist @@ -0,0 +1,18 @@ + + + + + ./tests/ + + + \ No newline at end of file diff --git a/tests/ManagerTest.php b/tests/ManagerTest.php new file mode 100644 index 0000000..fd5e83c --- /dev/null +++ b/tests/ManagerTest.php @@ -0,0 +1,80 @@ +m = new \Queue\Manager(); + $this->m->setPdo($dbh); + } + + public function tearDown() + { + $this->m->clearQueues(); + } + + public function testManager() + { + $this->assertInstanceOf('\Queue\Manager', $this->m); + $this->assertInstanceOf('PDO', $this->m->getDb()); + } + + public function testDefaultFlow() + { + $q = $this->m->getQueue('default'); + $bool = $q->send(new \QueueExample\Dummy()); + $this->assertTrue($bool); + + $bool = $this->m->addMessageToQueue(new \QueueExample\Dummy(), 'other'); + $this->assertTrue($bool); + } + + public function testCreateQueue() + { + $queue = $this->m->getQueue('UNIVERSAL'); + $this->assertInstanceOf('\Queue\Queue', $queue); + $this->assertEquals('UNIVERSAL', $queue->getName()); + + $queues = $this->m->getQueues(); + $this->assertInstanceOf('ArrayObject', $queues); + $this->assertInstanceOf('\Queue\Queue', $queues[0]); + $this->assertEquals(1, count($queues)); + + $this->m->executeAll(50); + $this->m->getStuckMessages(); + $this->m->getWaitingMessages(); + + $this->m->clearQueue('UNIVERSAL'); + $queues = $this->m->getQueues(); + $this->assertEquals(0, count($queues)); + } + + public function testCreateQueueInstance() + { + $q = new \Queue\Queue('test'); + $q->setManager($this->m); + $queues = $this->m->getQueues(); + $this->assertEquals(0, count($queues)); + + $m = new \QueueExample\Dummy(); + $q->send($m); + + $msgs = $this->m->getTotalMessages(); + $this->assertEquals(1, $msgs); + + $queues = $this->m->getQueues(); + $this->assertEquals(1, count($queues)); + } + + public function testSummaries() + { + $queue = $this->m->getQueue('UNIVERSAL'); + + $list = $this->m->summary(); + $this->assertInternalType('array', $list); + + $list = $this->m->queueSummary($queue->getName()); + $this->assertInternalType('array', $list); + } +} diff --git a/tests/MessageTest.php b/tests/MessageTest.php new file mode 100644 index 0000000..95b9945 --- /dev/null +++ b/tests/MessageTest.php @@ -0,0 +1,27 @@ +m = new \QueueExample\Dummy(); + } + + public function testQueue() + { + $this->assertInstanceOf('\Queue\Message', $this->m); + + $this->m->variable = 'value'; + + try { + $this->m->fake; + } catch(Exception $e) { + + } + + $data = $this->m->toArray(); + $this->assertInternalType('array', $data); + $str = $this->m->__toString(); + $this->assertInternalType('string', $str); + } +} diff --git a/tests/QueueTest.php b/tests/QueueTest.php new file mode 100644 index 0000000..353e88a --- /dev/null +++ b/tests/QueueTest.php @@ -0,0 +1,43 @@ +setPdo($dbh); + + $q = new \Queue\Queue('PHPUNIT', 5000); + $q->setManager($m); + + $this->m = $m; + $this->q = $q; + } + + public function tearDown() + { + $this->m->clearQueues(); + } + + public function testQueue() + { + $this->assertInstanceOf('\Queue\Queue', $this->q); + + $name = $this->q->getName(); + $this->assertEquals('PHPUNIT', $name); + + $timeout = $this->q->getTimeout(); + $this->assertEquals(5000, $timeout); + + $m = new \QueueExample\Dummy(); + $bool = $this->q->send($m); + $this->assertTrue($bool); + + $this->q->execute(1); + + $total = $this->m->getTotalMessages(); + $this->assertEquals(0, $total); + $str = $this->q->__toString(); + } +} diff --git a/tests/bootstrap.php b/tests/bootstrap.php new file mode 100644 index 0000000..06fb584 --- /dev/null +++ b/tests/bootstrap.php @@ -0,0 +1,13 @@ +setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION ); + $dbh->setAttribute(\PDO::ATTR_DEFAULT_FETCH_MODE, \PDO::FETCH_ASSOC); + $dbh->exec($structure); + return $dbh; +} \ No newline at end of file