Skip to content
This repository has been archived by the owner on Nov 26, 2023. It is now read-only.

Commit

Permalink
Merge pull request #3 from brutto/patch-1
Browse files Browse the repository at this point in the history
Set table names as Manager properties
  • Loading branch information
fordnox committed Nov 4, 2015
2 parents 4d67da1 + 43f47ba commit 68f8918
Showing 1 changed file with 23 additions and 19 deletions.
42 changes: 23 additions & 19 deletions Queue/Manager.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ class Manager
* @var $_pdo \PDO
*/
protected $_pdo;


/* tables for MQ */
protected $queueTable = 'queue'; // list of allowed queues
protected $messageTable = 'message'; // list of current messages in allowed queues

public function setPdo(\PDO $pdo)
{
$this->_pdo = $pdo;
Expand Down Expand Up @@ -55,7 +59,7 @@ public function queueSummary($name)

$db = $this->getDb();
$list = array();
$sql = 'SELECT * FROM message WHERE queue_id = :queue_id';
$sql = 'SELECT * FROM ' . $this->messageTable . ' WHERE queue_id = :queue_id';
$sth = $db->prepare($sql);
$sth->execute(array('queue_id'=>$qid));
foreach($sth->fetchAll() as $msg) {
Expand Down Expand Up @@ -110,7 +114,7 @@ private function receiveQueueMessages(Queue $queue, $max, $timeout)
$db->beginTransaction();

$sql = "SELECT *
FROM message
FROM " . $this->messageTable . "
WHERE queue_id = :queue_id
AND (handle IS NULL OR timeout+" . (int)$timeout . " < " . (int)$microtime .")
LIMIT ".$max;
Expand All @@ -120,7 +124,7 @@ private function receiveQueueMessages(Queue $queue, $max, $timeout)
foreach ($stmt->fetchAll() as $data) {
$data['handle'] = md5(uniqid(rand(), true));

$sql = "UPDATE message
$sql = "UPDATE " . $this->messageTable . "
SET
handle = :handle,
timeout = :timeout
Expand Down Expand Up @@ -183,7 +187,7 @@ public function executeQueue(Queue $queue, $max, $timeout = 30)
*/
public function executeAll($max = 50)
{
$sth = $this->getDb()->prepare('SELECT * FROM queue WHERE 1');
$sth = $this->getDb()->prepare('SELECT * FROM ' . $this->queueTable . ' WHERE 1');
$sth->execute();
foreach($sth->fetchAll() as $queue) {
$q = new Queue($queue['queue_name'], $queue['timeout']);
Expand All @@ -206,7 +210,7 @@ public function addMessageToQueue(Message $message, $name)
$body = base64_encode(serialize($message));
$md5 = md5($body);

$sql = 'INSERT INTO message
$sql = 'INSERT INTO ' . $this->messageTable . '
(queue_id, body, created, timeout, md5)
VALUES
(:queue_id, :body, :created, :timeout, :md5)
Expand Down Expand Up @@ -255,7 +259,7 @@ public function getQueue($name, $timeout = null)
*/
public function getQueueId($name)
{
$sql = 'SELECT queue_id FROM queue WHERE queue_name = ? LIMIT 1';
$sql = 'SELECT queue_id FROM ' . $this->queueTable . ' WHERE queue_name = ? LIMIT 1';
$stmt = $this->getDb()->prepare($sql);
$stmt->execute(array($name));
return $stmt->fetchColumn();
Expand All @@ -274,7 +278,7 @@ private function create($name, $timeout = null)
$timeout = 10000;
}

$sql = 'INSERT INTO queue
$sql = 'INSERT INTO ' . $this->queueTable . '
(queue_name, timeout)
VALUES
(:queue_name, :timeout)
Expand All @@ -288,7 +292,7 @@ private function create($name, $timeout = null)

public function getQueues()
{
$sql = 'SELECT * FROM queue WHERE 1';
$sql = 'SELECT * FROM ' . $this->queueTable . ' WHERE 1';
$sth = $this->getDb()->prepare($sql);
$sth->execute();
$list = array();
Expand All @@ -300,25 +304,25 @@ public function getQueues()

public function clearQueues()
{
$sth = $this->getDb()->prepare('DELETE FROM message WHERE 1');
$sth = $this->getDb()->prepare('DELETE FROM ' . $this->messageTable . ' WHERE 1');
$sth->execute();
$sth = $this->getDb()->prepare('DELETE FROM queue WHERE 1');
$sth = $this->getDb()->prepare('DELETE FROM ' . $this->queueTable . ' WHERE 1');
$sth->execute();
}

public function clearQueue($name)
{
$qid = $this->getQueueId($name);
$sth = $this->getDb()->prepare('DELETE FROM message WHERE queue_id = ?');
$sth = $this->getDb()->prepare('DELETE FROM ' . $this->messageTable . ' WHERE queue_id = ?');
$sth->execute(array($qid));
$sth = $this->getDb()->prepare('DELETE FROM queue WHERE queue_id = ?');
$sth = $this->getDb()->prepare('DELETE FROM ' . $this->queueTable . ' WHERE queue_id = ?');
$sth->execute(array($qid));
}

public function getWaitingMessages()
{
$db = $this->getDb();
$sql = 'SELECT * FROM message WHERE handle IS NULL';
$sql = 'SELECT * FROM ' . $this->messageTable . ' WHERE handle IS NULL';
$sth = $db->prepare($sql);
$sth->execute();
$list = array();
Expand All @@ -331,7 +335,7 @@ public function getWaitingMessages()
public function getStuckMessages()
{
$db = $this->getDb();
$sql = 'SELECT * FROM message WHERE handle IS NOT NULL';
$sql = 'SELECT * FROM ' . $this->messageTable . ' WHERE handle IS NOT NULL';
$sth = $db->prepare($sql);
$sth->execute();
$list = array();
Expand Down Expand Up @@ -360,7 +364,7 @@ public function deleteMessage(Message $message)
*/
public function deleteMessageById($id)
{
$sql = 'DELETE FROM message WHERE message_id = ?';
$sql = 'DELETE FROM ' . $this->messageTable . ' WHERE message_id = ?';
$stmt = $this->getDb()->prepare($sql);
$stmt->execute(array($id));
return true;
Expand All @@ -375,7 +379,7 @@ public function deleteMessageById($id)
*/
public function log(Message $message, \Exception $e)
{
$sql = "UPDATE message
$sql = "UPDATE " . $this->messageTable . "
SET log = :log
WHERE message_id = :id
";
Expand All @@ -392,9 +396,9 @@ public function log(Message $message, \Exception $e)
*/
public function getTotalMessages()
{
$sql = 'SELECT COUNT(message_id) FROM message WHERE 1';
$sql = 'SELECT COUNT(message_id) FROM ' . $this->messageTable . ' WHERE 1';
$stmt = $this->getDb()->prepare($sql);
$stmt->execute();
return $stmt->fetchColumn();
}
}
}

0 comments on commit 68f8918

Please sign in to comment.