File indexing completed on 2025-01-19 05:21:24
0001 <?php 0002 /** 0003 * Zend Framework 0004 * 0005 * LICENSE 0006 * 0007 * This source file is subject to the new BSD license that is bundled 0008 * with this package in the file LICENSE.txt. 0009 * It is also available through the world-wide-web at this URL: 0010 * http://framework.zend.com/license/new-bsd 0011 * If you did not receive a copy of the license and are unable to 0012 * obtain it through the world-wide-web, please send an email 0013 * to license@zend.com so we can send you a copy immediately. 0014 * 0015 * @category Zend 0016 * @package Zend_Queue 0017 * @subpackage Adapter 0018 * @copyright Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com) 0019 * @license http://framework.zend.com/license/new-bsd New BSD License 0020 * @version $Id$ 0021 */ 0022 0023 /** 0024 * @see Zend_Queue_Adapter_AdapterAbstract 0025 */ 0026 // require_once 'Zend/Queue/Adapter/AdapterAbstract.php'; 0027 0028 /** 0029 * @see Zend_Db_Select 0030 */ 0031 // require_once 'Zend/Db/Select.php'; 0032 0033 /** 0034 * @see Zend_Db 0035 */ 0036 // require_once 'Zend/Db.php'; 0037 0038 /** 0039 * @see Zend_Queue_Adapter_Db_Queue 0040 */ 0041 // require_once 'Zend/Queue/Adapter/Db/Queue.php'; 0042 0043 /** 0044 * @see Zend_Queue_Adapter_Db_Message 0045 */ 0046 // require_once 'Zend/Queue/Adapter/Db/Message.php'; 0047 0048 /** 0049 * Class for using connecting to a Zend_Db-based queuing system 0050 * 0051 * @category Zend 0052 * @package Zend_Queue 0053 * @subpackage Adapter 0054 * @copyright Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com) 0055 * @license http://framework.zend.com/license/new-bsd New BSD License 0056 */ 0057 class Zend_Queue_Adapter_Db extends Zend_Queue_Adapter_AdapterAbstract 0058 { 0059 /** 0060 * @var Zend_Queue_Adapter_Db_Queue 0061 */ 0062 protected $_queueTable = null; 0063 0064 /** 0065 * @var Zend_Queue_Adapter_Db_Message 0066 */ 0067 protected $_messageTable = null; 0068 0069 /** 0070 * @var Zend_Db_Table_Row_Abstract 0071 */ 0072 protected $_messageRow = null; 0073 0074 /** 0075 * Constructor 0076 * 0077 * @param array|Zend_Config $options 0078 * @param Zend_Queue|null $queue 0079 * @return void 0080 */ 0081 public function __construct($options, Zend_Queue $queue = null) 0082 { 0083 parent::__construct($options, $queue); 0084 0085 if (!isset($this->_options['options'][Zend_Db_Select::FOR_UPDATE])) { 0086 // turn off auto update by default 0087 $this->_options['options'][Zend_Db_Select::FOR_UPDATE] = false; 0088 } 0089 0090 if (!is_bool($this->_options['options'][Zend_Db_Select::FOR_UPDATE])) { 0091 // require_once 'Zend/Queue/Exception.php'; 0092 throw new Zend_Queue_Exception('Options array item: Zend_Db_Select::FOR_UPDATE must be boolean'); 0093 } 0094 0095 if (isset($this->_options['dbAdapter']) 0096 && $this->_options['dbAdapter'] instanceof Zend_Db_Adapter_Abstract) { 0097 $db = $this->_options['dbAdapter']; 0098 } else { 0099 $db = $this->_initDbAdapter(); 0100 } 0101 0102 $this->_queueTable = new Zend_Queue_Adapter_Db_Queue(array( 0103 'db' => $db, 0104 )); 0105 0106 $this->_messageTable = new Zend_Queue_Adapter_Db_Message(array( 0107 'db' => $db, 0108 )); 0109 0110 } 0111 0112 /** 0113 * Initialize Db adapter using 'driverOptions' section of the _options array 0114 * 0115 * Throws an exception if the adapter cannot connect to DB. 0116 * 0117 * @return Zend_Db_Adapter_Abstract 0118 * @throws Zend_Queue_Exception 0119 */ 0120 protected function _initDbAdapter() 0121 { 0122 $options = &$this->_options['driverOptions']; 0123 if (!array_key_exists('type', $options)) { 0124 // require_once 'Zend/Queue/Exception.php'; 0125 throw new Zend_Queue_Exception("Configuration array must have a key for 'type' for the database type to use"); 0126 } 0127 0128 if (!array_key_exists('host', $options)) { 0129 // require_once 'Zend/Queue/Exception.php'; 0130 throw new Zend_Queue_Exception("Configuration array must have a key for 'host' for the host to use"); 0131 } 0132 0133 if (!array_key_exists('username', $options)) { 0134 // require_once 'Zend/Queue/Exception.php'; 0135 throw new Zend_Queue_Exception("Configuration array must have a key for 'username' for the username to use"); 0136 } 0137 0138 if (!array_key_exists('password', $options)) { 0139 // require_once 'Zend/Queue/Exception.php'; 0140 throw new Zend_Queue_Exception("Configuration array must have a key for 'password' for the password to use"); 0141 } 0142 0143 if (!array_key_exists('dbname', $options)) { 0144 // require_once 'Zend/Queue/Exception.php'; 0145 throw new Zend_Queue_Exception("Configuration array must have a key for 'dbname' for the database to use"); 0146 } 0147 0148 $type = $options['type']; 0149 unset($options['type']); 0150 0151 try { 0152 $db = Zend_Db::factory($type, $options); 0153 } catch (Zend_Db_Exception $e) { 0154 // require_once 'Zend/Queue/Exception.php'; 0155 throw new Zend_Queue_Exception('Error connecting to database: ' . $e->getMessage(), $e->getCode(), $e); 0156 } 0157 0158 return $db; 0159 } 0160 0161 /******************************************************************** 0162 * Queue management functions 0163 *********************************************************************/ 0164 0165 /** 0166 * Does a queue already exist? 0167 * 0168 * Throws an exception if the adapter cannot determine if a queue exists. 0169 * use isSupported('isExists') to determine if an adapter can test for 0170 * queue existance. 0171 * 0172 * @param string $name 0173 * @return boolean 0174 * @throws Zend_Queue_Exception 0175 */ 0176 public function isExists($name) 0177 { 0178 $id = 0; 0179 0180 try { 0181 $id = $this->getQueueId($name); 0182 } catch (Zend_Queue_Exception $e) { 0183 return false; 0184 } 0185 0186 return ($id > 0); 0187 } 0188 0189 /** 0190 * Create a new queue 0191 * 0192 * Visibility timeout is how long a message is left in the queue "invisible" 0193 * to other readers. If the message is acknowleged (deleted) before the 0194 * timeout, then the message is deleted. However, if the timeout expires 0195 * then the message will be made available to other queue readers. 0196 * 0197 * @param string $name queue name 0198 * @param integer $timeout default visibility timeout 0199 * @return boolean 0200 * @throws Zend_Queue_Exception - database error 0201 */ 0202 public function create($name, $timeout = null) 0203 { 0204 if ($this->isExists($name)) { 0205 return false; 0206 } 0207 0208 $queue = $this->_queueTable->createRow(); 0209 $queue->queue_name = $name; 0210 $queue->timeout = ($timeout === null) ? self::CREATE_TIMEOUT_DEFAULT : (int)$timeout; 0211 0212 try { 0213 if ($queue->save()) { 0214 return true; 0215 } 0216 } catch (Exception $e) { 0217 // require_once 'Zend/Queue/Exception.php'; 0218 throw new Zend_Queue_Exception($e->getMessage(), $e->getCode(), $e); 0219 } 0220 0221 return false; 0222 } 0223 0224 /** 0225 * Delete a queue and all of it's messages 0226 * 0227 * Returns false if the queue is not found, true if the queue exists 0228 * 0229 * @param string $name queue name 0230 * @return boolean 0231 * @throws Zend_Queue_Exception - database error 0232 */ 0233 public function delete($name) 0234 { 0235 $id = $this->getQueueId($name); // get primary key 0236 0237 // if the queue does not exist then it must already be deleted. 0238 $list = $this->_queueTable->find($id); 0239 if (count($list) === 0) { 0240 return false; 0241 } 0242 $queue = $list->current(); 0243 0244 if ($queue instanceof Zend_Db_Table_Row_Abstract) { 0245 try { 0246 $queue->delete(); 0247 } catch (Exception $e) { 0248 // require_once 'Zend/Queue/Exception.php'; 0249 throw new Zend_Queue_Exception($e->getMessage(), $e->getCode(), $e); 0250 } 0251 } 0252 0253 if (array_key_exists($name, $this->_queues)) { 0254 unset($this->_queues[$name]); 0255 } 0256 0257 return true; 0258 } 0259 0260 /* 0261 * Get an array of all available queues 0262 * 0263 * Not all adapters support getQueues(), use isSupported('getQueues') 0264 * to determine if the adapter supports this feature. 0265 * 0266 * @return array 0267 * @throws Zend_Queue_Exception - database error 0268 */ 0269 public function getQueues() 0270 { 0271 $query = $this->_queueTable->select(); 0272 $query->from($this->_queueTable, array('queue_id', 'queue_name')); 0273 0274 $this->_queues = array(); 0275 foreach ($this->_queueTable->fetchAll($query) as $queue) { 0276 $this->_queues[$queue->queue_name] = (int)$queue->queue_id; 0277 } 0278 0279 $list = array_keys($this->_queues); 0280 0281 return $list; 0282 } 0283 0284 /** 0285 * Return the approximate number of messages in the queue 0286 * 0287 * @param Zend_Queue $queue 0288 * @return integer 0289 * @throws Zend_Queue_Exception 0290 */ 0291 public function count(Zend_Queue $queue = null) 0292 { 0293 if ($queue === null) { 0294 $queue = $this->_queue; 0295 } 0296 0297 $info = $this->_messageTable->info(); 0298 $db = $this->_messageTable->getAdapter(); 0299 $query = $db->select(); 0300 $query->from($info['name'], array(new Zend_Db_Expr('COUNT(1)'))) 0301 ->where('queue_id=?', $this->getQueueId($queue->getName())); 0302 0303 // return count results 0304 return (int) $db->fetchOne($query); 0305 } 0306 0307 /******************************************************************** 0308 * Messsage management functions 0309 *********************************************************************/ 0310 0311 /** 0312 * Send a message to the queue 0313 * 0314 * @param string $message Message to send to the active queue 0315 * @param Zend_Queue $queue 0316 * @return Zend_Queue_Message 0317 * @throws Zend_Queue_Exception - database error 0318 */ 0319 public function send($message, Zend_Queue $queue = null) 0320 { 0321 if ($this->_messageRow === null) { 0322 $this->_messageRow = $this->_messageTable->createRow(); 0323 } 0324 0325 if ($queue === null) { 0326 $queue = $this->_queue; 0327 } 0328 0329 if (is_scalar($message)) { 0330 $message = (string) $message; 0331 } 0332 if (is_string($message)) { 0333 $message = trim($message); 0334 } 0335 0336 if (!$this->isExists($queue->getName())) { 0337 // require_once 'Zend/Queue/Exception.php'; 0338 throw new Zend_Queue_Exception('Queue does not exist:' . $queue->getName()); 0339 } 0340 0341 $msg = clone $this->_messageRow; 0342 $msg->queue_id = $this->getQueueId($queue->getName()); 0343 $msg->created = time(); 0344 $msg->body = $message; 0345 $msg->md5 = md5($message); 0346 // $msg->timeout = ??? @TODO 0347 0348 try { 0349 $msg->save(); 0350 } catch (Exception $e) { 0351 // require_once 'Zend/Queue/Exception.php'; 0352 throw new Zend_Queue_Exception($e->getMessage(), $e->getCode(), $e); 0353 } 0354 0355 $options = array( 0356 'queue' => $queue, 0357 'data' => $msg->toArray(), 0358 ); 0359 0360 $classname = $queue->getMessageClass(); 0361 if (!class_exists($classname)) { 0362 // require_once 'Zend/Loader.php'; 0363 Zend_Loader::loadClass($classname); 0364 } 0365 return new $classname($options); 0366 } 0367 0368 /** 0369 * Get messages in the queue 0370 * 0371 * @param integer $maxMessages Maximum number of messages to return 0372 * @param integer $timeout Visibility timeout for these messages 0373 * @param Zend_Queue $queue 0374 * @return Zend_Queue_Message_Iterator 0375 * @throws Zend_Queue_Exception - database error 0376 */ 0377 public function receive($maxMessages = null, $timeout = null, Zend_Queue $queue = null) 0378 { 0379 if ($maxMessages === null) { 0380 $maxMessages = 1; 0381 } 0382 if ($timeout === null) { 0383 $timeout = self::RECEIVE_TIMEOUT_DEFAULT; 0384 } 0385 if ($queue === null) { 0386 $queue = $this->_queue; 0387 } 0388 0389 $msgs = array(); 0390 $info = $this->_messageTable->info(); 0391 $microtime = microtime(true); // cache microtime 0392 $db = $this->_messageTable->getAdapter(); 0393 0394 // start transaction handling 0395 try { 0396 if ( $maxMessages > 0 ) { // ZF-7666 LIMIT 0 clause not included. 0397 $db->beginTransaction(); 0398 0399 $query = $db->select(); 0400 if ($this->_options['options'][Zend_Db_Select::FOR_UPDATE]) { 0401 // turn on forUpdate 0402 $query->forUpdate(); 0403 } 0404 $query->from($info['name'], array('*')) 0405 ->where('queue_id=?', $this->getQueueId($queue->getName())) 0406 ->where('handle IS NULL OR timeout+' . (int)$timeout . ' < ' . (int)$microtime) 0407 ->limit($maxMessages); 0408 0409 foreach ($db->fetchAll($query) as $data) { 0410 // setup our changes to the message 0411 $data['handle'] = md5(uniqid(rand(), true)); 0412 0413 $update = array( 0414 'handle' => $data['handle'], 0415 'timeout' => $microtime, 0416 ); 0417 0418 // update the database 0419 $where = array(); 0420 $where[] = $db->quoteInto('message_id=?', $data['message_id']); 0421 $where[] = 'handle IS NULL OR timeout+' . (int)$timeout . ' < ' . (int)$microtime; 0422 0423 $count = $db->update($info['name'], $update, $where); 0424 0425 // we check count to make sure no other thread has gotten 0426 // the rows after our select, but before our update. 0427 if ($count > 0) { 0428 $msgs[] = $data; 0429 } 0430 } 0431 $db->commit(); 0432 } 0433 } catch (Exception $e) { 0434 $db->rollBack(); 0435 0436 // require_once 'Zend/Queue/Exception.php'; 0437 throw new Zend_Queue_Exception($e->getMessage(), $e->getCode(), $e); 0438 } 0439 0440 $options = array( 0441 'queue' => $queue, 0442 'data' => $msgs, 0443 'messageClass' => $queue->getMessageClass(), 0444 ); 0445 0446 $classname = $queue->getMessageSetClass(); 0447 if (!class_exists($classname)) { 0448 // require_once 'Zend/Loader.php'; 0449 Zend_Loader::loadClass($classname); 0450 } 0451 return new $classname($options); 0452 } 0453 0454 /** 0455 * Delete a message from the queue 0456 * 0457 * Returns true if the message is deleted, false if the deletion is 0458 * unsuccessful. 0459 * 0460 * @param Zend_Queue_Message $message 0461 * @return boolean 0462 * @throws Zend_Queue_Exception - database error 0463 */ 0464 public function deleteMessage(Zend_Queue_Message $message) 0465 { 0466 $db = $this->_messageTable->getAdapter(); 0467 $where = $db->quoteInto('handle=?', $message->handle); 0468 0469 if ($this->_messageTable->delete($where)) { 0470 return true; 0471 } 0472 0473 return false; 0474 } 0475 0476 /******************************************************************** 0477 * Supporting functions 0478 *********************************************************************/ 0479 0480 /** 0481 * Return a list of queue capabilities functions 0482 * 0483 * $array['function name'] = true or false 0484 * true is supported, false is not supported. 0485 * 0486 * @param string $name 0487 * @return array 0488 */ 0489 public function getCapabilities() 0490 { 0491 return array( 0492 'create' => true, 0493 'delete' => true, 0494 'send' => true, 0495 'receive' => true, 0496 'deleteMessage' => true, 0497 'getQueues' => true, 0498 'count' => true, 0499 'isExists' => true, 0500 ); 0501 } 0502 0503 /******************************************************************** 0504 * Functions that are not part of the Zend_Queue_Adapter_Abstract 0505 *********************************************************************/ 0506 /** 0507 * Get the queue ID 0508 * 0509 * Returns the queue's row identifier. 0510 * 0511 * @param string $name 0512 * @return integer|null 0513 * @throws Zend_Queue_Exception 0514 */ 0515 protected function getQueueId($name) 0516 { 0517 if (array_key_exists($name, $this->_queues)) { 0518 return $this->_queues[$name]; 0519 } 0520 0521 $query = $this->_queueTable->select(); 0522 $query->from($this->_queueTable, array('queue_id')) 0523 ->where('queue_name=?', $name); 0524 0525 $queue = $this->_queueTable->fetchRow($query); 0526 0527 if ($queue === null) { 0528 // require_once 'Zend/Queue/Exception.php'; 0529 throw new Zend_Queue_Exception('Queue does not exist: ' . $name); 0530 } 0531 0532 $this->_queues[$name] = (int)$queue->queue_id; 0533 0534 return $this->_queues[$name]; 0535 } 0536 }