File indexing completed on 2025-01-19 05:21:24

0001 <?php
0002 /**
0003  * Zend Framework
0004  *
0005  * LICENSE
0006  *
0007  * This source file is subject to the new BSD license that is bundled
0008  * with this package in the file LICENSE.txt.
0009  * It is also available through the world-wide-web at this URL:
0010  * http://framework.zend.com/license/new-bsd
0011  * If you did not receive a copy of the license and are unable to
0012  * obtain it through the world-wide-web, please send an email
0013  * to license@zend.com so we can send you a copy immediately.
0014  *
0015  * @category   Zend
0016  * @package    Zend_Queue
0017  * @subpackage Adapter
0018  * @copyright  Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com)
0019  * @license    http://framework.zend.com/license/new-bsd     New BSD License
0020  * @version    $Id$
0021  */
0022 
0023 /**
0024  * @see Zend_Queue_Adapter_AdapterAbstract
0025  */
0026 // require_once 'Zend/Queue/Adapter/AdapterAbstract.php';
0027 
0028 /**
0029  * Class for using connecting to a Zend_Cache-based queuing system
0030  *
0031  * @category   Zend
0032  * @package    Zend_Queue
0033  * @subpackage Adapter
0034  * @copyright  Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com)
0035  * @license    http://framework.zend.com/license/new-bsd     New BSD License
0036  */
0037 class Zend_Queue_Adapter_Memcacheq extends Zend_Queue_Adapter_AdapterAbstract
0038 {
0039     const DEFAULT_HOST = '127.0.0.1';
0040     const DEFAULT_PORT = 22201;
0041     const EOL          = "\r\n";
0042 
0043     /**
0044      * @var Memcache
0045      */
0046     protected $_cache = null;
0047 
0048     /**
0049      * @var string
0050      */
0051     protected $_host = null;
0052 
0053     /**
0054      * @var integer
0055      */
0056     protected $_port = null;
0057 
0058     /**
0059      * @var resource
0060      */
0061     protected $_socket = null;
0062 
0063     /********************************************************************
0064     * Constructor / Destructor
0065      *********************************************************************/
0066 
0067     /**
0068      * Constructor
0069      *
0070      * @param  array|Zend_Config $options
0071      * @param  null|Zend_Queue $queue
0072      * @return void
0073      */
0074     public function __construct($options, Zend_Queue $queue = null)
0075     {
0076         if (!extension_loaded('memcache')) {
0077             // require_once 'Zend/Queue/Exception.php';
0078             throw new Zend_Queue_Exception('Memcache extension does not appear to be loaded');
0079         }
0080 
0081         parent::__construct($options, $queue);
0082 
0083         $options = &$this->_options['driverOptions'];
0084 
0085         if (!array_key_exists('host', $options)) {
0086             $options['host'] = self::DEFAULT_HOST;
0087         }
0088         if (!array_key_exists('port', $options)) {
0089             $options['port'] = self::DEFAULT_PORT;
0090         }
0091 
0092         $this->_cache = new Memcache();
0093 
0094         $result = $this->_cache->connect($options['host'], $options['port']);
0095 
0096         if ($result === false) {
0097             // require_once 'Zend/Queue/Exception.php';
0098             throw new Zend_Queue_Exception('Could not connect to MemcacheQ');
0099         }
0100 
0101         $this->_host = $options['host'];
0102         $this->_port = (int)$options['port'];
0103     }
0104 
0105     /**
0106      * Destructor
0107      *
0108      * @return void
0109      */
0110     public function __destruct()
0111     {
0112         if ($this->_cache instanceof Memcache) {
0113             $this->_cache->close();
0114         }
0115         if (is_resource($this->_socket)) {
0116             $cmd = 'quit' . self::EOL;
0117             fwrite($this->_socket, $cmd);
0118             fclose($this->_socket);
0119         }
0120     }
0121 
0122     /********************************************************************
0123      * Queue management functions
0124      *********************************************************************/
0125 
0126     /**
0127      * Does a queue already exist?
0128      *
0129      * Throws an exception if the adapter cannot determine if a queue exists.
0130      * use isSupported('isExists') to determine if an adapter can test for
0131      * queue existance.
0132      *
0133      * @param  string $name
0134      * @return boolean
0135      * @throws Zend_Queue_Exception
0136      */
0137     public function isExists($name)
0138     {
0139         if (empty($this->_queues)) {
0140             $this->getQueues();
0141         }
0142 
0143         return in_array($name, $this->_queues);
0144     }
0145 
0146     /**
0147      * Create a new queue
0148      *
0149      * Visibility timeout is how long a message is left in the queue "invisible"
0150      * to other readers.  If the message is acknowleged (deleted) before the
0151      * timeout, then the message is deleted.  However, if the timeout expires
0152      * then the message will be made available to other queue readers.
0153      *
0154      * @param  string  $name    queue name
0155      * @param  integer $timeout default visibility timeout
0156      * @return boolean
0157      * @throws Zend_Queue_Exception
0158      */
0159     public function create($name, $timeout=null)
0160     {
0161         if ($this->isExists($name)) {
0162             return false;
0163         }
0164         if ($timeout === null) {
0165             $timeout = self::CREATE_TIMEOUT_DEFAULT;
0166         }
0167 
0168         // MemcacheQ does not have a method to "create" a queue
0169         // queues are created upon sending a packet.
0170         // We cannot use the send() and receive() functions because those
0171         // depend on the current name.
0172         $result = $this->_cache->set($name, 'creating queue', 0, 15);
0173         $result = $this->_cache->get($name);
0174 
0175         $this->_queues[] = $name;
0176 
0177         return true;
0178     }
0179 
0180     /**
0181      * Delete a queue and all of it's messages
0182      *
0183      * Returns false if the queue is not found, true if the queue exists
0184      *
0185      * @param  string  $name queue name
0186      * @return boolean
0187      * @throws Zend_Queue_Exception
0188      */
0189     public function delete($name)
0190     {
0191         $response = $this->_sendCommand('delete ' . $name, array('DELETED', 'NOT_FOUND'), true);
0192 
0193         if (in_array('DELETED', $response)) {
0194             $key = array_search($name, $this->_queues);
0195 
0196             if ($key !== false) {
0197                 unset($this->_queues[$key]);
0198             }
0199             return true;
0200         }
0201 
0202         return false;
0203     }
0204 
0205     /**
0206      * Get an array of all available queues
0207      *
0208      * Not all adapters support getQueues(), use isSupported('getQueues')
0209      * to determine if the adapter supports this feature.
0210      *
0211      * @return array
0212      * @throws Zend_Queue_Exception
0213      */
0214     public function getQueues()
0215     {
0216         $this->_queues = array();
0217 
0218         $response = $this->_sendCommand('stats queue', array('END'));
0219 
0220         foreach ($response as $i => $line) {
0221             $this->_queues[] = str_replace('STAT ', '', $line);
0222         }
0223 
0224         return $this->_queues;
0225     }
0226 
0227     /**
0228      * Return the approximate number of messages in the queue
0229      *
0230      * @param  Zend_Queue $queue
0231      * @return integer
0232      * @throws Zend_Queue_Exception (not supported)
0233      */
0234     public function count(Zend_Queue $queue=null)
0235     {
0236         // require_once 'Zend/Queue/Exception.php';
0237         throw new Zend_Queue_Exception('count() is not supported in this adapter');
0238     }
0239 
0240     /********************************************************************
0241      * Messsage management functions
0242      *********************************************************************/
0243 
0244     /**
0245      * Send a message to the queue
0246      *
0247      * @param  string     $message Message to send to the active queue
0248      * @param  Zend_Queue $queue
0249      * @return Zend_Queue_Message
0250      * @throws Zend_Queue_Exception
0251      */
0252     public function send($message, Zend_Queue $queue=null)
0253     {
0254         if ($queue === null) {
0255             $queue = $this->_queue;
0256         }
0257 
0258         if (!$this->isExists($queue->getName())) {
0259             // require_once 'Zend/Queue/Exception.php';
0260             throw new Zend_Queue_Exception('Queue does not exist:' . $queue->getName());
0261         }
0262 
0263         $message = (string) $message;
0264         $data    = array(
0265             'message_id' => md5(uniqid(rand(), true)),
0266             'handle'     => null,
0267             'body'       => $message,
0268             'md5'        => md5($message),
0269         );
0270 
0271         $result = $this->_cache->set($queue->getName(), $message, 0, 0);
0272         if ($result === false) {
0273             // require_once 'Zend/Queue/Exception.php';
0274             throw new Zend_Queue_Exception('failed to insert message into queue:' . $queue->getName());
0275         }
0276 
0277         $options = array(
0278             'queue' => $queue,
0279             'data'  => $data,
0280         );
0281 
0282         $classname = $queue->getMessageClass();
0283         if (!class_exists($classname)) {
0284             // require_once 'Zend/Loader.php';
0285             Zend_Loader::loadClass($classname);
0286         }
0287         return new $classname($options);
0288     }
0289 
0290     /**
0291      * Get messages in the queue
0292      *
0293      * @param  integer    $maxMessages  Maximum number of messages to return
0294      * @param  integer    $timeout      Visibility timeout for these messages
0295      * @param  Zend_Queue $queue
0296      * @return Zend_Queue_Message_Iterator
0297      * @throws Zend_Queue_Exception
0298      */
0299     public function receive($maxMessages=null, $timeout=null, Zend_Queue $queue=null)
0300     {
0301         if ($maxMessages === null) {
0302             $maxMessages = 1;
0303         }
0304 
0305         if ($timeout === null) {
0306             $timeout = self::RECEIVE_TIMEOUT_DEFAULT;
0307         }
0308         if ($queue === null) {
0309             $queue = $this->_queue;
0310         }
0311 
0312         $msgs = array();
0313         if ($maxMessages > 0 ) {
0314             for ($i = 0; $i < $maxMessages; $i++) {
0315                 $data = array(
0316                     'handle' => md5(uniqid(rand(), true)),
0317                     'body'   => $this->_cache->get($queue->getName()),
0318                 );
0319 
0320                 $msgs[] = $data;
0321             }
0322         }
0323 
0324         $options = array(
0325             'queue'        => $queue,
0326             'data'         => $msgs,
0327             'messageClass' => $queue->getMessageClass(),
0328         );
0329 
0330         $classname = $queue->getMessageSetClass();
0331         if (!class_exists($classname)) {
0332             // require_once 'Zend/Loader.php';
0333             Zend_Loader::loadClass($classname);
0334         }
0335         return new $classname($options);
0336     }
0337 
0338     /**
0339      * Delete a message from the queue
0340      *
0341      * Returns true if the message is deleted, false if the deletion is
0342      * unsuccessful.
0343      *
0344      * @param  Zend_Queue_Message $message
0345      * @return boolean
0346      * @throws Zend_Queue_Exception (unsupported)
0347      */
0348     public function deleteMessage(Zend_Queue_Message $message)
0349     {
0350         // require_once 'Zend/Queue/Exception.php';
0351         throw new Zend_Queue_Exception('deleteMessage() is not supported in  ' . get_class($this));
0352     }
0353 
0354     /********************************************************************
0355      * Supporting functions
0356      *********************************************************************/
0357 
0358     /**
0359      * Return a list of queue capabilities functions
0360      *
0361      * $array['function name'] = true or false
0362      * true is supported, false is not supported.
0363      *
0364      * @param  string $name
0365      * @return array
0366      */
0367     public function getCapabilities()
0368     {
0369         return array(
0370             'create'        => true,
0371             'delete'        => true,
0372             'send'          => true,
0373             'receive'       => true,
0374             'deleteMessage' => false,
0375             'getQueues'     => true,
0376             'count'         => false,
0377             'isExists'      => true,
0378         );
0379     }
0380 
0381     /********************************************************************
0382      * Functions that are not part of the Zend_Queue_Adapter_Abstract
0383      *********************************************************************/
0384 
0385     /**
0386      * sends a command to MemcacheQ
0387      *
0388      * The memcache functions by php cannot handle all types of requests
0389      * supported by MemcacheQ
0390      * Non-standard requests are handled by this function.
0391      *
0392      * @param  string  $command - command to send to memcacheQ
0393      * @param  array   $terminator - strings to indicate end of memcacheQ response
0394      * @param  boolean $include_term - include terminator in response
0395      * @return array
0396      * @throws Zend_Queue_Exception if connection cannot be opened
0397      */
0398     protected function _sendCommand($command, array $terminator, $include_term=false)
0399     {
0400         if (!is_resource($this->_socket)) {
0401             $this->_socket = fsockopen($this->_host, $this->_port, $errno, $errstr, 10);
0402         }
0403         if ($this->_socket === false) {
0404             // require_once 'Zend/Queue/Exception.php';
0405             throw new Zend_Queue_Exception("Could not open a connection to $this->_host:$this->_port errno=$errno : $errstr");
0406         }
0407 
0408         $response = array();
0409 
0410         $cmd = $command . self::EOL;
0411         fwrite($this->_socket, $cmd);
0412 
0413         $continue_reading = true;
0414         while (!feof($this->_socket) && $continue_reading) {
0415             $resp = trim(fgets($this->_socket, 1024));
0416             if (in_array($resp, $terminator)) {
0417                 if ($include_term) {
0418                     $response[] = $resp;
0419                 }
0420                 $continue_reading = false;
0421             } else {
0422                 $response[] = $resp;
0423             }
0424         }
0425 
0426         return $response;
0427     }
0428 }