File indexing completed on 2025-01-19 05:21:24

0001 <?php
0002 /**
0003  * Zend Framework
0004  *
0005  * LICENSE
0006  *
0007  * This source file is subject to the new BSD license that is bundled
0008  * with this package in the file LICENSE.txt.
0009  * It is also available through the world-wide-web at this URL:
0010  * http://framework.zend.com/license/new-bsd
0011  * If you did not receive a copy of the license and are unable to
0012  * obtain it through the world-wide-web, please send an email
0013  * to license@zend.com so we can send you a copy immediately.
0014  *
0015  * @category   Zend
0016  * @package    Zend_Queue
0017  * @subpackage Adapter
0018  * @copyright  Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com)
0019  * @license    http://framework.zend.com/license/new-bsd     New BSD License
0020  * @version    $Id$
0021  */
0022 
0023 /**
0024  * @see Zend_Queue_Adapter_AdapterAbstract
0025  */
0026 // require_once 'Zend/Queue/Adapter/AdapterAbstract.php';
0027 
0028 /**
0029  * @see Zend_Db_Select
0030  */
0031 // require_once 'Zend/Db/Select.php';
0032 
0033 /**
0034  * @see Zend_Db
0035  */
0036 // require_once 'Zend/Db.php';
0037 
0038 /**
0039  * @see Zend_Queue_Adapter_Db_Queue
0040  */
0041 // require_once 'Zend/Queue/Adapter/Db/Queue.php';
0042 
0043 /**
0044  * @see Zend_Queue_Adapter_Db_Message
0045  */
0046 // require_once 'Zend/Queue/Adapter/Db/Message.php';
0047 
0048 /**
0049  * Class for using connecting to a Zend_Db-based queuing system
0050  *
0051  * @category   Zend
0052  * @package    Zend_Queue
0053  * @subpackage Adapter
0054  * @copyright  Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com)
0055  * @license    http://framework.zend.com/license/new-bsd     New BSD License
0056  */
0057 class Zend_Queue_Adapter_Db extends Zend_Queue_Adapter_AdapterAbstract
0058 {
0059     /**
0060      * @var Zend_Queue_Adapter_Db_Queue
0061      */
0062     protected $_queueTable = null;
0063 
0064     /**
0065      * @var Zend_Queue_Adapter_Db_Message
0066      */
0067     protected $_messageTable = null;
0068 
0069     /**
0070      * @var Zend_Db_Table_Row_Abstract
0071      */
0072     protected $_messageRow = null;
0073 
0074     /**
0075      * Constructor
0076      *
0077      * @param  array|Zend_Config $options
0078      * @param  Zend_Queue|null $queue
0079      * @return void
0080      */
0081     public function __construct($options, Zend_Queue $queue = null)
0082     {
0083         parent::__construct($options, $queue);
0084 
0085         if (!isset($this->_options['options'][Zend_Db_Select::FOR_UPDATE])) {
0086             // turn off auto update by default
0087             $this->_options['options'][Zend_Db_Select::FOR_UPDATE] = false;
0088         }
0089 
0090         if (!is_bool($this->_options['options'][Zend_Db_Select::FOR_UPDATE])) {
0091             // require_once 'Zend/Queue/Exception.php';
0092             throw new Zend_Queue_Exception('Options array item: Zend_Db_Select::FOR_UPDATE must be boolean');
0093         }
0094 
0095         if (isset($this->_options['dbAdapter'])
0096             && $this->_options['dbAdapter'] instanceof Zend_Db_Adapter_Abstract) {
0097             $db = $this->_options['dbAdapter'];
0098         } else {
0099             $db = $this->_initDbAdapter();
0100         }
0101 
0102         $this->_queueTable = new Zend_Queue_Adapter_Db_Queue(array(
0103             'db' => $db,
0104         ));
0105 
0106         $this->_messageTable = new Zend_Queue_Adapter_Db_Message(array(
0107             'db' => $db,
0108         ));
0109 
0110     }
0111 
0112     /**
0113      * Initialize Db adapter using 'driverOptions' section of the _options array
0114      *
0115      * Throws an exception if the adapter cannot connect to DB.
0116      *
0117      * @return Zend_Db_Adapter_Abstract
0118      * @throws Zend_Queue_Exception
0119      */
0120     protected function _initDbAdapter()
0121     {
0122         $options = &$this->_options['driverOptions'];
0123         if (!array_key_exists('type', $options)) {
0124             // require_once 'Zend/Queue/Exception.php';
0125             throw new Zend_Queue_Exception("Configuration array must have a key for 'type' for the database type to use");
0126         }
0127 
0128         if (!array_key_exists('host', $options)) {
0129             // require_once 'Zend/Queue/Exception.php';
0130             throw new Zend_Queue_Exception("Configuration array must have a key for 'host' for the host to use");
0131         }
0132 
0133         if (!array_key_exists('username', $options)) {
0134             // require_once 'Zend/Queue/Exception.php';
0135             throw new Zend_Queue_Exception("Configuration array must have a key for 'username' for the username to use");
0136         }
0137 
0138         if (!array_key_exists('password', $options)) {
0139             // require_once 'Zend/Queue/Exception.php';
0140             throw new Zend_Queue_Exception("Configuration array must have a key for 'password' for the password to use");
0141         }
0142 
0143         if (!array_key_exists('dbname', $options)) {
0144             // require_once 'Zend/Queue/Exception.php';
0145             throw new Zend_Queue_Exception("Configuration array must have a key for 'dbname' for the database to use");
0146         }
0147 
0148         $type = $options['type'];
0149         unset($options['type']);
0150 
0151         try {
0152             $db = Zend_Db::factory($type, $options);
0153         } catch (Zend_Db_Exception $e) {
0154             // require_once 'Zend/Queue/Exception.php';
0155             throw new Zend_Queue_Exception('Error connecting to database: ' . $e->getMessage(), $e->getCode(), $e);
0156         }
0157 
0158         return $db;
0159     }
0160 
0161     /********************************************************************
0162      * Queue management functions
0163      *********************************************************************/
0164 
0165     /**
0166      * Does a queue already exist?
0167      *
0168      * Throws an exception if the adapter cannot determine if a queue exists.
0169      * use isSupported('isExists') to determine if an adapter can test for
0170      * queue existance.
0171      *
0172      * @param  string $name
0173      * @return boolean
0174      * @throws Zend_Queue_Exception
0175      */
0176     public function isExists($name)
0177     {
0178         $id = 0;
0179 
0180         try {
0181             $id = $this->getQueueId($name);
0182         } catch (Zend_Queue_Exception $e) {
0183             return false;
0184         }
0185 
0186         return ($id > 0);
0187     }
0188 
0189     /**
0190      * Create a new queue
0191      *
0192      * Visibility timeout is how long a message is left in the queue "invisible"
0193      * to other readers.  If the message is acknowleged (deleted) before the
0194      * timeout, then the message is deleted.  However, if the timeout expires
0195      * then the message will be made available to other queue readers.
0196      *
0197      * @param  string  $name    queue name
0198      * @param  integer $timeout default visibility timeout
0199      * @return boolean
0200      * @throws Zend_Queue_Exception - database error
0201      */
0202     public function create($name, $timeout = null)
0203     {
0204         if ($this->isExists($name)) {
0205             return false;
0206         }
0207 
0208         $queue = $this->_queueTable->createRow();
0209         $queue->queue_name = $name;
0210         $queue->timeout    = ($timeout === null) ? self::CREATE_TIMEOUT_DEFAULT : (int)$timeout;
0211 
0212         try {
0213             if ($queue->save()) {
0214                 return true;
0215             }
0216         } catch (Exception $e) {
0217             // require_once 'Zend/Queue/Exception.php';
0218             throw new Zend_Queue_Exception($e->getMessage(), $e->getCode(), $e);
0219         }
0220 
0221         return false;
0222     }
0223 
0224     /**
0225      * Delete a queue and all of it's messages
0226      *
0227      * Returns false if the queue is not found, true if the queue exists
0228      *
0229      * @param  string  $name queue name
0230      * @return boolean
0231      * @throws Zend_Queue_Exception - database error
0232      */
0233     public function delete($name)
0234     {
0235         $id = $this->getQueueId($name); // get primary key
0236 
0237         // if the queue does not exist then it must already be deleted.
0238         $list = $this->_queueTable->find($id);
0239         if (count($list) === 0) {
0240             return false;
0241         }
0242         $queue = $list->current();
0243 
0244         if ($queue instanceof Zend_Db_Table_Row_Abstract) {
0245             try {
0246                 $queue->delete();
0247             } catch (Exception $e) {
0248                 // require_once 'Zend/Queue/Exception.php';
0249                 throw new Zend_Queue_Exception($e->getMessage(), $e->getCode(), $e);
0250             }
0251         }
0252 
0253         if (array_key_exists($name, $this->_queues)) {
0254             unset($this->_queues[$name]);
0255         }
0256 
0257         return true;
0258     }
0259 
0260     /*
0261      * Get an array of all available queues
0262      *
0263      * Not all adapters support getQueues(), use isSupported('getQueues')
0264      * to determine if the adapter supports this feature.
0265      *
0266      * @return array
0267      * @throws Zend_Queue_Exception - database error
0268      */
0269     public function getQueues()
0270     {
0271         $query = $this->_queueTable->select();
0272         $query->from($this->_queueTable, array('queue_id', 'queue_name'));
0273 
0274         $this->_queues = array();
0275         foreach ($this->_queueTable->fetchAll($query) as $queue) {
0276             $this->_queues[$queue->queue_name] = (int)$queue->queue_id;
0277         }
0278 
0279         $list = array_keys($this->_queues);
0280 
0281         return $list;
0282     }
0283 
0284     /**
0285      * Return the approximate number of messages in the queue
0286      *
0287      * @param  Zend_Queue $queue
0288      * @return integer
0289      * @throws Zend_Queue_Exception
0290      */
0291     public function count(Zend_Queue $queue = null)
0292     {
0293         if ($queue === null) {
0294             $queue = $this->_queue;
0295         }
0296 
0297         $info  = $this->_messageTable->info();
0298         $db    = $this->_messageTable->getAdapter();
0299         $query = $db->select();
0300         $query->from($info['name'], array(new Zend_Db_Expr('COUNT(1)')))
0301               ->where('queue_id=?', $this->getQueueId($queue->getName()));
0302 
0303         // return count results
0304         return (int) $db->fetchOne($query);
0305     }
0306 
0307     /********************************************************************
0308     * Messsage management functions
0309      *********************************************************************/
0310 
0311     /**
0312      * Send a message to the queue
0313      *
0314      * @param  string     $message Message to send to the active queue
0315      * @param  Zend_Queue $queue
0316      * @return Zend_Queue_Message
0317      * @throws Zend_Queue_Exception - database error
0318      */
0319     public function send($message, Zend_Queue $queue = null)
0320     {
0321         if ($this->_messageRow === null) {
0322             $this->_messageRow = $this->_messageTable->createRow();
0323         }
0324 
0325         if ($queue === null) {
0326             $queue = $this->_queue;
0327         }
0328 
0329         if (is_scalar($message)) {
0330             $message = (string) $message;
0331         }
0332         if (is_string($message)) {
0333             $message = trim($message);
0334         }
0335 
0336         if (!$this->isExists($queue->getName())) {
0337             // require_once 'Zend/Queue/Exception.php';
0338             throw new Zend_Queue_Exception('Queue does not exist:' . $queue->getName());
0339         }
0340 
0341         $msg           = clone $this->_messageRow;
0342         $msg->queue_id = $this->getQueueId($queue->getName());
0343         $msg->created  = time();
0344         $msg->body     = $message;
0345         $msg->md5      = md5($message);
0346         // $msg->timeout = ??? @TODO
0347 
0348         try {
0349             $msg->save();
0350         } catch (Exception $e) {
0351             // require_once 'Zend/Queue/Exception.php';
0352             throw new Zend_Queue_Exception($e->getMessage(), $e->getCode(), $e);
0353         }
0354 
0355         $options = array(
0356             'queue' => $queue,
0357             'data'  => $msg->toArray(),
0358         );
0359 
0360         $classname = $queue->getMessageClass();
0361         if (!class_exists($classname)) {
0362             // require_once 'Zend/Loader.php';
0363             Zend_Loader::loadClass($classname);
0364         }
0365         return new $classname($options);
0366     }
0367 
0368     /**
0369      * Get messages in the queue
0370      *
0371      * @param  integer    $maxMessages  Maximum number of messages to return
0372      * @param  integer    $timeout      Visibility timeout for these messages
0373      * @param  Zend_Queue $queue
0374      * @return Zend_Queue_Message_Iterator
0375      * @throws Zend_Queue_Exception - database error
0376      */
0377     public function receive($maxMessages = null, $timeout = null, Zend_Queue $queue = null)
0378     {
0379         if ($maxMessages === null) {
0380             $maxMessages = 1;
0381         }
0382         if ($timeout === null) {
0383             $timeout = self::RECEIVE_TIMEOUT_DEFAULT;
0384         }
0385         if ($queue === null) {
0386             $queue = $this->_queue;
0387         }
0388 
0389         $msgs      = array();
0390         $info      = $this->_messageTable->info();
0391         $microtime = microtime(true); // cache microtime
0392         $db        = $this->_messageTable->getAdapter();
0393 
0394         // start transaction handling
0395         try {
0396             if ( $maxMessages > 0 ) { // ZF-7666 LIMIT 0 clause not included.
0397                 $db->beginTransaction();
0398 
0399                 $query = $db->select();
0400                 if ($this->_options['options'][Zend_Db_Select::FOR_UPDATE]) {
0401                     // turn on forUpdate
0402                     $query->forUpdate();
0403                 }
0404                 $query->from($info['name'], array('*'))
0405                       ->where('queue_id=?', $this->getQueueId($queue->getName()))
0406                       ->where('handle IS NULL OR timeout+' . (int)$timeout . ' < ' . (int)$microtime)
0407                       ->limit($maxMessages);
0408 
0409                 foreach ($db->fetchAll($query) as $data) {
0410                     // setup our changes to the message
0411                     $data['handle'] = md5(uniqid(rand(), true));
0412 
0413                     $update = array(
0414                         'handle'  => $data['handle'],
0415                         'timeout' => $microtime,
0416                     );
0417 
0418                     // update the database
0419                     $where   = array();
0420                     $where[] = $db->quoteInto('message_id=?', $data['message_id']);
0421                     $where[] = 'handle IS NULL OR timeout+' . (int)$timeout . ' < ' . (int)$microtime;
0422 
0423                     $count = $db->update($info['name'], $update, $where);
0424 
0425                     // we check count to make sure no other thread has gotten
0426                     // the rows after our select, but before our update.
0427                     if ($count > 0) {
0428                         $msgs[] = $data;
0429                     }
0430                 }
0431                 $db->commit();
0432             }
0433         } catch (Exception $e) {
0434             $db->rollBack();
0435 
0436             // require_once 'Zend/Queue/Exception.php';
0437             throw new Zend_Queue_Exception($e->getMessage(), $e->getCode(), $e);
0438         }
0439 
0440         $options = array(
0441             'queue'        => $queue,
0442             'data'         => $msgs,
0443             'messageClass' => $queue->getMessageClass(),
0444         );
0445 
0446         $classname = $queue->getMessageSetClass();
0447         if (!class_exists($classname)) {
0448             // require_once 'Zend/Loader.php';
0449             Zend_Loader::loadClass($classname);
0450         }
0451         return new $classname($options);
0452     }
0453 
0454     /**
0455      * Delete a message from the queue
0456      *
0457      * Returns true if the message is deleted, false if the deletion is
0458      * unsuccessful.
0459      *
0460      * @param  Zend_Queue_Message $message
0461      * @return boolean
0462      * @throws Zend_Queue_Exception - database error
0463      */
0464     public function deleteMessage(Zend_Queue_Message $message)
0465     {
0466         $db    = $this->_messageTable->getAdapter();
0467         $where = $db->quoteInto('handle=?', $message->handle);
0468 
0469         if ($this->_messageTable->delete($where)) {
0470             return true;
0471         }
0472 
0473         return false;
0474     }
0475 
0476     /********************************************************************
0477      * Supporting functions
0478      *********************************************************************/
0479 
0480     /**
0481      * Return a list of queue capabilities functions
0482      *
0483      * $array['function name'] = true or false
0484      * true is supported, false is not supported.
0485      *
0486      * @param  string $name
0487      * @return array
0488      */
0489     public function getCapabilities()
0490     {
0491         return array(
0492             'create'        => true,
0493             'delete'        => true,
0494             'send'          => true,
0495             'receive'       => true,
0496             'deleteMessage' => true,
0497             'getQueues'     => true,
0498             'count'         => true,
0499             'isExists'      => true,
0500         );
0501     }
0502 
0503     /********************************************************************
0504      * Functions that are not part of the Zend_Queue_Adapter_Abstract
0505      *********************************************************************/
0506     /**
0507      * Get the queue ID
0508      *
0509      * Returns the queue's row identifier.
0510      *
0511      * @param  string       $name
0512      * @return integer|null
0513      * @throws Zend_Queue_Exception
0514      */
0515     protected function getQueueId($name)
0516     {
0517         if (array_key_exists($name, $this->_queues)) {
0518             return $this->_queues[$name];
0519         }
0520 
0521         $query = $this->_queueTable->select();
0522         $query->from($this->_queueTable, array('queue_id'))
0523               ->where('queue_name=?', $name);
0524 
0525         $queue = $this->_queueTable->fetchRow($query);
0526 
0527         if ($queue === null) {
0528             // require_once 'Zend/Queue/Exception.php';
0529             throw new Zend_Queue_Exception('Queue does not exist: ' . $name);
0530         }
0531 
0532         $this->_queues[$name] = (int)$queue->queue_id;
0533 
0534         return $this->_queues[$name];
0535     }
0536 }