File indexing completed on 2025-01-19 05:21:24

0001 <?php
0002 /**
0003  * Zend Framework
0004  *
0005  * LICENSE
0006  *
0007  * This source file is subject to the new BSD license that is bundled
0008  * with this package in the file LICENSE.txt.
0009  * It is also available through the world-wide-web at this URL:
0010  * http://framework.zend.com/license/new-bsd
0011  * If you did not receive a copy of the license and are unable to
0012  * obtain it through the world-wide-web, please send an email
0013  * to license@zend.com so we can send you a copy immediately.
0014  *
0015  * @category   Zend
0016  * @package    Zend_Queue
0017  * @subpackage Adapter
0018  * @copyright  Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com)
0019  * @license    http://framework.zend.com/license/new-bsd     New BSD License
0020  * @version    $Id$
0021  */
0022 
0023 /**
0024  * @see Zend_Queue_Adapter_AdapterAbstract
0025  */
0026 // require_once 'Zend/Queue/Adapter/AdapterAbstract.php';
0027 
0028 /**
0029  * @see Zend_Queue_Adapter_Stomp_Client
0030  */
0031 // require_once 'Zend/Queue/Stomp/Client.php';
0032 
0033 /**
0034  * @see Zend_Queue_Adapter_Stomp_Frame
0035  */
0036 // require_once 'Zend/Queue/Stomp/Frame.php';
0037 
0038 /**
0039  * Class for using Stomp to talk to an Stomp compliant server
0040  *
0041  * @category   Zend
0042  * @package    Zend_Queue
0043  * @subpackage Adapter
0044  * @copyright  Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com)
0045  * @license    http://framework.zend.com/license/new-bsd     New BSD License
0046  */
0047 class Zend_Queue_Adapter_Activemq extends Zend_Queue_Adapter_AdapterAbstract
0048 {
0049     const DEFAULT_SCHEME = 'tcp';
0050     const DEFAULT_HOST   = '127.0.0.1';
0051     const DEFAULT_PORT   = 61613;
0052 
0053     /**
0054      * @var Zend_Queue_Adapter_Stomp_client
0055      */
0056     private $_client = null;
0057 
0058     /**
0059      * @var array
0060      */
0061     private $_subscribed = array();
0062 
0063     /**
0064      * Constructor
0065      *
0066      * @param  array|Zend_Config $config An array having configuration data
0067      * @param  Zend_Queue The Zend_Queue object that created this class
0068      * @return void
0069      */
0070     public function __construct($options, Zend_Queue $queue = null)
0071     {
0072         parent::__construct($options);
0073 
0074         $options = &$this->_options['driverOptions'];
0075         if (!array_key_exists('scheme', $options)) {
0076             $options['scheme'] = self::DEFAULT_SCHEME;
0077         }
0078         if (!array_key_exists('host', $options)) {
0079             $options['host'] = self::DEFAULT_HOST;
0080         }
0081         if (!array_key_exists('port', $options)) {
0082             $options['port'] = self::DEFAULT_PORT;
0083         }
0084 
0085         if (array_key_exists('stompClient', $options)) {
0086             $this->_client = $options['stompClient'];
0087         } else {
0088             $this->_client = new Zend_Queue_Stomp_Client($options['scheme'], $options['host'], $options['port']);
0089         }
0090 
0091         $connect = $this->_client->createFrame();
0092 
0093         // Username and password are optional on some messaging servers
0094         // such as Apache's ActiveMQ
0095         $connect->setCommand('CONNECT');
0096         if (isset($options['username'])) {
0097             $connect->setHeader('login', $options['username']);
0098             $connect->setHeader('passcode', $options['password']);
0099         }
0100 
0101         $response = $this->_client->send($connect)->receive();
0102 
0103         if ((false !== $response)
0104             && ($response->getCommand() != 'CONNECTED')
0105         ) {
0106             // require_once 'Zend/Queue/Exception.php';
0107             throw new Zend_Queue_Exception("Unable to authenticate to '".$options['scheme'].'://'.$options['host'].':'.$options['port']."'");
0108         }
0109     }
0110 
0111     /**
0112      * Close the socket explicitly when destructed
0113      *
0114      * @return void
0115      */
0116     public function __destruct()
0117     {
0118         // Gracefully disconnect
0119         $frame = $this->_client->createFrame();
0120         $frame->setCommand('DISCONNECT');
0121         $this->_client->send($frame);
0122         unset($this->_client);
0123     }
0124 
0125     /**
0126      * Create a new queue
0127      *
0128      * @param  string  $name    queue name
0129      * @param  integer $timeout default visibility timeout
0130      * @return void
0131      * @throws Zend_Queue_Exception
0132      */
0133     public function create($name, $timeout=null)
0134     {
0135         // require_once 'Zend/Queue/Exception.php';
0136         throw new Zend_Queue_Exception('create() is not supported in ' . get_class($this));
0137     }
0138 
0139     /**
0140      * Delete a queue and all of its messages
0141      *
0142      * @param  string $name queue name
0143      * @return void
0144      * @throws Zend_Queue_Exception
0145      */
0146     public function delete($name)
0147     {
0148         // require_once 'Zend/Queue/Exception.php';
0149         throw new Zend_Queue_Exception('delete() is not supported in ' . get_class($this));
0150     }
0151 
0152     /**
0153      * Delete a message from the queue
0154      *
0155      * Returns true if the message is deleted, false if the deletion is
0156      * unsuccessful.
0157      *
0158      * @param  Zend_Queue_Message $message
0159      * @return boolean
0160      */
0161     public function deleteMessage(Zend_Queue_Message $message)
0162     {
0163         $frame = $this->_client->createFrame();
0164         $frame->setCommand('ACK');
0165         $frame->setHeader('message-id', $message->handle);
0166 
0167         $this->_client->send($frame);
0168 
0169         return true;
0170     }
0171 
0172     /**
0173      * Get an array of all available queues
0174      *
0175      * @return void
0176      * @throws Zend_Queue_Exception
0177      */
0178     public function getQueues()
0179     {
0180         // require_once 'Zend/Queue/Exception.php';
0181         throw new Zend_Queue_Exception('getQueues() is not supported in this adapter');
0182     }
0183 
0184     /**
0185      * Checks if the client is subscribed to the queue
0186      *
0187      * @param  Zend_Queue $queue
0188      * @return boolean
0189      */
0190     protected function _isSubscribed(Zend_Queue $queue)
0191     {
0192         return isset($this->_subscribed[$queue->getName()]);
0193     }
0194 
0195     /**
0196       * Subscribes the client to the queue.
0197       *
0198       * @param  Zend_Queue $queue
0199       * @return void
0200       */
0201     protected function _subscribe(Zend_Queue $queue)
0202     {
0203         $frame = $this->_client->createFrame();
0204         $frame->setCommand('SUBSCRIBE');
0205         $frame->setHeader('destination', $queue->getName());
0206         $frame->setHeader('ack', 'client');
0207         $this->_client->send($frame);
0208         $this->_subscribed[$queue->getName()] = true;
0209     }
0210 
0211     /**
0212      * Return the first element in the queue
0213      *
0214      * @param  integer    $maxMessages
0215      * @param  integer    $timeout
0216      * @param  Zend_Queue $queue
0217      * @return Zend_Queue_Message_Iterator
0218      */
0219     public function receive($maxMessages=null, $timeout=null, Zend_Queue $queue=null)
0220     {
0221         if ($maxMessages === null) {
0222             $maxMessages = 1;
0223         }
0224         if ($timeout === null) {
0225             $timeout = self::RECEIVE_TIMEOUT_DEFAULT;
0226         }
0227         if ($queue === null) {
0228             $queue = $this->_queue;
0229         }
0230 
0231         // read
0232         $data = array();
0233 
0234         // signal that we are reading
0235         if (!$this->_isSubscribed($queue)){
0236             $this->_subscribe($queue);
0237         }
0238 
0239         if ($maxMessages > 0) {
0240             if ($this->_client->canRead()) {
0241                 for ($i = 0; $i < $maxMessages; $i++) {
0242                     $response = $this->_client->receive();
0243 
0244                     switch ($response->getCommand()) {
0245                         case 'MESSAGE':
0246                             $datum = array(
0247                                 'message_id' => $response->getHeader('message-id'),
0248                                 'handle'     => $response->getHeader('message-id'),
0249                                 'body'       => $response->getBody(),
0250                                 'md5'        => md5($response->getBody())
0251                             );
0252                             $data[] = $datum;
0253                             break;
0254                         default:
0255                             $block = print_r($response, true);
0256                             // require_once 'Zend/Queue/Exception.php';
0257                             throw new Zend_Queue_Exception('Invalid response received: ' . $block);
0258                     }
0259                 }
0260             }
0261         }
0262 
0263         $options = array(
0264             'queue'        => $queue,
0265             'data'         => $data,
0266             'messageClass' => $queue->getMessageClass()
0267         );
0268 
0269         $classname = $queue->getMessageSetClass();
0270 
0271         if (!class_exists($classname)) {
0272             // require_once 'Zend/Loader.php';
0273             Zend_Loader::loadClass($classname);
0274         }
0275         return new $classname($options);
0276     }
0277 
0278     /**
0279      * Push an element onto the end of the queue
0280      *
0281      * @param  string     $message message to send to the queue
0282      * @param  Zend_Queue $queue
0283      * @return Zend_Queue_Message
0284      */
0285     public function send($message, Zend_Queue $queue=null)
0286     {
0287         if ($queue === null) {
0288             $queue = $this->_queue;
0289         }
0290 
0291         $frame = $this->_client->createFrame();
0292         $frame->setCommand('SEND');
0293         $frame->setHeader('destination', $queue->getName());
0294         $frame->setHeader('content-length', strlen($message));
0295         $frame->setBody((string) $message);
0296         $this->_client->send($frame);
0297 
0298         $data = array(
0299             'message_id' => null,
0300             'body'       => $message,
0301             'md5'        => md5($message),
0302             'handle'     => null
0303         );
0304 
0305         $options = array(
0306             'queue' => $queue,
0307             'data'  => $data
0308         );
0309 
0310         $classname = $queue->getMessageClass();
0311         if (!class_exists($classname)) {
0312             // require_once 'Zend/Loader.php';
0313             Zend_Loader::loadClass($classname);
0314         }
0315         return new $classname($options);
0316     }
0317 
0318     /**
0319      * Returns the length of the queue
0320      *
0321      * @param  Zend_Queue $queue
0322      * @return integer
0323      * @throws Zend_Queue_Exception (not supported)
0324      */
0325     public function count(Zend_Queue $queue=null)
0326     {
0327         // require_once 'Zend/Queue/Exception.php';
0328         throw new Zend_Queue_Exception('count() is not supported in this adapter');
0329     }
0330 
0331     /**
0332      * Does a queue already exist?
0333      *
0334      * @param  string $name
0335      * @return boolean
0336      * @throws Zend_Queue_Exception (not supported)
0337      */
0338     public function isExists($name)
0339     {
0340         // require_once 'Zend/Queue/Exception.php';
0341         throw new Zend_Queue_Exception('isExists() is not supported in this adapter');
0342     }
0343 
0344     /**
0345      * Return a list of queue capabilities functions
0346      *
0347      * $array['function name'] = true or false
0348      * true is supported, false is not supported.
0349      *
0350      * @param  string $name
0351      * @return array
0352      */
0353     public function getCapabilities()
0354     {
0355         return array(
0356             'create'        => false,
0357             'delete'        => false,
0358             'send'          => true,
0359             'receive'       => true,
0360             'deleteMessage' => true,
0361             'getQueues'     => false,
0362             'count'         => false,
0363             'isExists'      => false,
0364         );
0365     }
0366 }