File indexing completed on 2024-05-26 06:03:20

0001 <?php
0002 /**
0003  * Zend Framework
0004  *
0005  * LICENSE
0006  *
0007  * This source file is subject to the new BSD license that is bundled
0008  * with this package in the file LICENSE.txt.
0009  * It is also available through the world-wide-web at this URL:
0010  * http://framework.zend.com/license/new-bsd
0011  * If you did not receive a copy of the license and are unable to
0012  * obtain it through the world-wide-web, please send an email
0013  * to license@zend.com so we can send you a copy immediately.
0014  *
0015  * @category   Zend
0016  * @package    Zend_Queue
0017  * @subpackage Adapter
0018  * @copyright  Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com)
0019  * @license    http://framework.zend.com/license/new-bsd     New BSD License
0020  * @version    $Id$
0021  */
0022 
0023 /**
0024  * @see Zend_Queue_Adapter_AdapterAbstract
0025  */
0026 // require_once 'Zend/Queue/Adapter/AdapterAbstract.php';
0027 
0028 /**
0029  * Class for using a standard PHP array as a queue
0030  *
0031  * @category   Zend
0032  * @package    Zend_Queue
0033  * @subpackage Adapter
0034  * @copyright  Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com)
0035  * @license    http://framework.zend.com/license/new-bsd     New BSD License
0036  */
0037 class Zend_Queue_Adapter_Array extends Zend_Queue_Adapter_AdapterAbstract
0038 {
0039     /**
0040      * @var array
0041      */
0042     protected $_data = array();
0043 
0044     /**
0045      * Constructor
0046      *
0047      * @param  array|Zend_Config $options
0048      * @param  Zend_Queue|null $queue
0049      * @return void
0050      */
0051     public function __construct($options, Zend_Queue $queue = null)
0052     {
0053         parent::__construct($options, $queue);
0054     }
0055 
0056     /********************************************************************
0057     * Queue management functions
0058      *********************************************************************/
0059 
0060     /**
0061      * Does a queue already exist?
0062      *
0063      * Throws an exception if the adapter cannot determine if a queue exists.
0064      * use isSupported('isExists') to determine if an adapter can test for
0065      * queue existance.
0066      *
0067      * @param  string $name
0068      * @return boolean
0069      */
0070     public function isExists($name)
0071     {
0072         return array_key_exists($name, $this->_data);
0073     }
0074 
0075     /**
0076      * Create a new queue
0077      *
0078      * Visibility timeout is how long a message is left in the queue "invisible"
0079      * to other readers.  If the message is acknowleged (deleted) before the
0080      * timeout, then the message is deleted.  However, if the timeout expires
0081      * then the message will be made available to other queue readers.
0082      *
0083      * @param  string  $name    queue name
0084      * @param  integer $timeout default visibility timeout
0085      * @return boolean
0086      */
0087     public function create($name, $timeout=null)
0088     {
0089         if ($this->isExists($name)) {
0090             return false;
0091         }
0092         if ($timeout === null) {
0093             $timeout = self::CREATE_TIMEOUT_DEFAULT;
0094         }
0095         $this->_data[$name] = array();
0096 
0097         return true;
0098     }
0099 
0100     /**
0101      * Delete a queue and all of it's messages
0102      *
0103      * Returns false if the queue is not found, true if the queue exists
0104      *
0105      * @param  string  $name queue name
0106      * @return boolean
0107      */
0108     public function delete($name)
0109     {
0110         $found = isset($this->_data[$name]);
0111 
0112         if ($found) {
0113             unset($this->_data[$name]);
0114         }
0115 
0116         return $found;
0117     }
0118 
0119     /**
0120      * Get an array of all available queues
0121      *
0122      * Not all adapters support getQueues(), use isSupported('getQueues')
0123      * to determine if the adapter supports this feature.
0124      *
0125      * @return array
0126      */
0127     public function getQueues()
0128     {
0129         return array_keys($this->_data);
0130     }
0131 
0132     /**
0133      * Return the approximate number of messages in the queue
0134      *
0135      * @param  Zend_Queue $queue
0136      * @return integer
0137      * @throws Zend_Queue_Exception
0138      */
0139     public function count(Zend_Queue $queue=null)
0140     {
0141         if ($queue === null) {
0142             $queue = $this->_queue;
0143         }
0144 
0145         if (!isset($this->_data[$queue->getName()])) {
0146             /**
0147              * @see Zend_Queue_Exception
0148              */
0149             // require_once 'Zend/Queue/Exception.php';
0150             throw new Zend_Queue_Exception('Queue does not exist');
0151         }
0152 
0153         return count($this->_data[$queue->getName()]);
0154     }
0155 
0156     /********************************************************************
0157     * Messsage management functions
0158      *********************************************************************/
0159 
0160     /**
0161      * Send a message to the queue
0162      *
0163      * @param  string     $message Message to send to the active queue
0164      * @param  Zend_Queue $queue
0165      * @return Zend_Queue_Message
0166      * @throws Zend_Queue_Exception
0167      */
0168     public function send($message, Zend_Queue $queue=null)
0169     {
0170         if ($queue === null) {
0171             $queue = $this->_queue;
0172         }
0173 
0174         if (!$this->isExists($queue->getName())) {
0175             // require_once 'Zend/Queue/Exception.php';
0176             throw new Zend_Queue_Exception('Queue does not exist:' . $queue->getName());
0177         }
0178 
0179         // create the message
0180         $data = array(
0181             'message_id' => md5(uniqid(rand(), true)),
0182             'body'       => $message,
0183             'md5'        => md5($message),
0184             'handle'     => null,
0185             'created'    => time(),
0186             'queue_name' => $queue->getName(),
0187         );
0188 
0189         // add $data to the "queue"
0190         $this->_data[$queue->getName()][] = $data;
0191 
0192         $options = array(
0193             'queue' => $queue,
0194             'data'  => $data,
0195         );
0196 
0197         $classname = $queue->getMessageClass();
0198         if (!class_exists($classname)) {
0199             // require_once 'Zend/Loader.php';
0200             Zend_Loader::loadClass($classname);
0201         }
0202         return new $classname($options);
0203     }
0204 
0205     /**
0206      * Get messages in the queue
0207      *
0208      * @param  integer    $maxMessages  Maximum number of messages to return
0209      * @param  integer    $timeout      Visibility timeout for these messages
0210      * @param  Zend_Queue $queue
0211      * @return Zend_Queue_Message_Iterator
0212      */
0213     public function receive($maxMessages = null, $timeout = null, Zend_Queue $queue = null)
0214     {
0215         if ($maxMessages === null) {
0216             $maxMessages = 1;
0217         }
0218         if ($timeout === null) {
0219             $timeout = self::RECEIVE_TIMEOUT_DEFAULT;
0220         }
0221         if ($queue === null) {
0222             $queue = $this->_queue;
0223         }
0224 
0225         $data       = array();
0226         if ($maxMessages > 0) {
0227             $start_time = microtime(true);
0228 
0229             $count = 0;
0230             $temp = &$this->_data[$queue->getName()];
0231             foreach ($temp as $key=>&$msg) {
0232                 // count check has to be first, as someone can ask for 0 messages
0233                 // ZF-7650
0234                 if ($count >= $maxMessages) {
0235                     break;
0236                 }
0237 
0238                 if (($msg['handle'] === null)
0239                     || ($msg['timeout'] + $timeout < $start_time)
0240                 ) {
0241                     $msg['handle']  = md5(uniqid(rand(), true));
0242                     $msg['timeout'] = microtime(true);
0243                     $data[] = $msg;
0244                     $count++;
0245                 }
0246 
0247             }
0248         }
0249 
0250         $options = array(
0251             'queue'        => $queue,
0252             'data'         => $data,
0253             'messageClass' => $queue->getMessageClass(),
0254         );
0255 
0256         $classname = $queue->getMessageSetClass();
0257         if (!class_exists($classname)) {
0258             // require_once 'Zend/Loader.php';
0259             Zend_Loader::loadClass($classname);
0260         }
0261         return new $classname($options);
0262     }
0263 
0264     /**
0265      * Delete a message from the queue
0266      *
0267      * Returns true if the message is deleted, false if the deletion is
0268      * unsuccessful.
0269      *
0270      * @param  Zend_Queue_Message $message
0271      * @return boolean
0272      * @throws Zend_Queue_Exception
0273      */
0274     public function deleteMessage(Zend_Queue_Message $message)
0275     {
0276         // load the queue
0277         $queue = &$this->_data[$message->queue_name];
0278 
0279         foreach ($queue as $key => &$msg) {
0280             if ($msg['handle'] == $message->handle) {
0281                 unset($queue[$key]);
0282                 return true;
0283             }
0284         }
0285 
0286         return false;
0287     }
0288 
0289     /********************************************************************
0290      * Supporting functions
0291      *********************************************************************/
0292 
0293     /**
0294      * Return a list of queue capabilities functions
0295      *
0296      * $array['function name'] = true or false
0297      * true is supported, false is not supported.
0298      *
0299      * @param  string $name
0300      * @return array
0301      */
0302     public function getCapabilities()
0303     {
0304         return array(
0305             'create'        => true,
0306             'delete'        => true,
0307             'send'          => true,
0308             'receive'       => true,
0309             'deleteMessage' => true,
0310             'getQueues'     => true,
0311             'count'         => true,
0312             'isExists'      => true,
0313         );
0314     }
0315 
0316     /********************************************************************
0317     * Functions that are not part of the Zend_Queue_Adapter_Abstract
0318      *********************************************************************/
0319 
0320     /**
0321      * serialize
0322      */
0323     public function __sleep()
0324     {
0325         return array('_data');
0326     }
0327 
0328     /*
0329      * These functions are debug helpers.
0330      */
0331 
0332     /**
0333      * returns underlying _data array
0334      * $queue->getAdapter()->getData();
0335      *
0336      * @return $this;
0337      */
0338     public function getData()
0339     {
0340         return $this->_data;
0341     }
0342 
0343     /**
0344      * sets the underlying _data array
0345      * $queue->getAdapter()->setData($data);
0346      *
0347      * @param array $data
0348      * @return $this;
0349      */
0350     public function setData($data)
0351     {
0352         $this->_data = $data;
0353         return $this;
0354     }
0355 }