diff --git a/Queue/Manager.php b/Queue/Manager.php index 2960adb..1975b93 100644 --- a/Queue/Manager.php +++ b/Queue/Manager.php @@ -23,7 +23,11 @@ class Manager * @var $_pdo \PDO */ protected $_pdo; - + + /* tables for MQ */ + protected $queueTable = 'queue'; // list of allowed queues + protected $messageTable = 'message'; // list of current messages in allowed queues + public function setPdo(\PDO $pdo) { $this->_pdo = $pdo; @@ -55,7 +59,7 @@ public function queueSummary($name) $db = $this->getDb(); $list = array(); - $sql = 'SELECT * FROM message WHERE queue_id = :queue_id'; + $sql = 'SELECT * FROM ' . $this->messageTable . ' WHERE queue_id = :queue_id'; $sth = $db->prepare($sql); $sth->execute(array('queue_id'=>$qid)); foreach($sth->fetchAll() as $msg) { @@ -110,7 +114,7 @@ private function receiveQueueMessages(Queue $queue, $max, $timeout) $db->beginTransaction(); $sql = "SELECT * - FROM message + FROM " . $this->messageTable . " WHERE queue_id = :queue_id AND (handle IS NULL OR timeout+" . (int)$timeout . " < " . (int)$microtime .") LIMIT ".$max; @@ -120,7 +124,7 @@ private function receiveQueueMessages(Queue $queue, $max, $timeout) foreach ($stmt->fetchAll() as $data) { $data['handle'] = md5(uniqid(rand(), true)); - $sql = "UPDATE message + $sql = "UPDATE " . $this->messageTable . " SET handle = :handle, timeout = :timeout @@ -183,7 +187,7 @@ public function executeQueue(Queue $queue, $max, $timeout = 30) */ public function executeAll($max = 50) { - $sth = $this->getDb()->prepare('SELECT * FROM queue WHERE 1'); + $sth = $this->getDb()->prepare('SELECT * FROM ' . $this->queueTable . ' WHERE 1'); $sth->execute(); foreach($sth->fetchAll() as $queue) { $q = new Queue($queue['queue_name'], $queue['timeout']); @@ -206,7 +210,7 @@ public function addMessageToQueue(Message $message, $name) $body = base64_encode(serialize($message)); $md5 = md5($body); - $sql = 'INSERT INTO message + $sql = 'INSERT INTO ' . $this->messageTable . ' (queue_id, body, created, timeout, md5) VALUES (:queue_id, :body, :created, :timeout, :md5) @@ -255,7 +259,7 @@ public function getQueue($name, $timeout = null) */ public function getQueueId($name) { - $sql = 'SELECT queue_id FROM queue WHERE queue_name = ? LIMIT 1'; + $sql = 'SELECT queue_id FROM ' . $this->queueTable . ' WHERE queue_name = ? LIMIT 1'; $stmt = $this->getDb()->prepare($sql); $stmt->execute(array($name)); return $stmt->fetchColumn(); @@ -274,7 +278,7 @@ private function create($name, $timeout = null) $timeout = 10000; } - $sql = 'INSERT INTO queue + $sql = 'INSERT INTO ' . $this->queueTable . ' (queue_name, timeout) VALUES (:queue_name, :timeout) @@ -288,7 +292,7 @@ private function create($name, $timeout = null) public function getQueues() { - $sql = 'SELECT * FROM queue WHERE 1'; + $sql = 'SELECT * FROM ' . $this->queueTable . ' WHERE 1'; $sth = $this->getDb()->prepare($sql); $sth->execute(); $list = array(); @@ -300,25 +304,25 @@ public function getQueues() public function clearQueues() { - $sth = $this->getDb()->prepare('DELETE FROM message WHERE 1'); + $sth = $this->getDb()->prepare('DELETE FROM ' . $this->messageTable . ' WHERE 1'); $sth->execute(); - $sth = $this->getDb()->prepare('DELETE FROM queue WHERE 1'); + $sth = $this->getDb()->prepare('DELETE FROM ' . $this->queueTable . ' WHERE 1'); $sth->execute(); } public function clearQueue($name) { $qid = $this->getQueueId($name); - $sth = $this->getDb()->prepare('DELETE FROM message WHERE queue_id = ?'); + $sth = $this->getDb()->prepare('DELETE FROM ' . $this->messageTable . ' WHERE queue_id = ?'); $sth->execute(array($qid)); - $sth = $this->getDb()->prepare('DELETE FROM queue WHERE queue_id = ?'); + $sth = $this->getDb()->prepare('DELETE FROM ' . $this->queueTable . ' WHERE queue_id = ?'); $sth->execute(array($qid)); } public function getWaitingMessages() { $db = $this->getDb(); - $sql = 'SELECT * FROM message WHERE handle IS NULL'; + $sql = 'SELECT * FROM ' . $this->messageTable . ' WHERE handle IS NULL'; $sth = $db->prepare($sql); $sth->execute(); $list = array(); @@ -331,7 +335,7 @@ public function getWaitingMessages() public function getStuckMessages() { $db = $this->getDb(); - $sql = 'SELECT * FROM message WHERE handle IS NOT NULL'; + $sql = 'SELECT * FROM ' . $this->messageTable . ' WHERE handle IS NOT NULL'; $sth = $db->prepare($sql); $sth->execute(); $list = array(); @@ -360,7 +364,7 @@ public function deleteMessage(Message $message) */ public function deleteMessageById($id) { - $sql = 'DELETE FROM message WHERE message_id = ?'; + $sql = 'DELETE FROM ' . $this->messageTable . ' WHERE message_id = ?'; $stmt = $this->getDb()->prepare($sql); $stmt->execute(array($id)); return true; @@ -375,7 +379,7 @@ public function deleteMessageById($id) */ public function log(Message $message, \Exception $e) { - $sql = "UPDATE message + $sql = "UPDATE " . $this->messageTable . " SET log = :log WHERE message_id = :id "; @@ -392,9 +396,9 @@ public function log(Message $message, \Exception $e) */ public function getTotalMessages() { - $sql = 'SELECT COUNT(message_id) FROM message WHERE 1'; + $sql = 'SELECT COUNT(message_id) FROM ' . $this->messageTable . ' WHERE 1'; $stmt = $this->getDb()->prepare($sql); $stmt->execute(); return $stmt->fetchColumn(); } -} \ No newline at end of file +}