File indexing completed on 2025-01-19 05:21:24
<?php
/**
 * Zend Framework
 *
 * LICENSE
 *
 * This source file is subject to the new BSD license that is bundled
 * with this package in the file LICENSE.txt.
 * It is also available through the world-wide-web at this URL:
 * http://framework.zend.com/license/new-bsd
 * If you did not receive a copy of the license and are unable to
 * obtain it through the world-wide-web, please send an email
 * to license@zend.com so we can send you a copy immediately.
 *
 * @category   Zend
 * @package    Zend_Queue
 * @subpackage Adapter
 * @copyright  Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com)
 * @license    http://framework.zend.com/license/new-bsd     New BSD License
 * @version    $Id$
 */

/**
 * @see Zend_Queue_Adapter_AdapterAbstract
 */
// require_once 'Zend/Queue/Adapter/AdapterAbstract.php';

/**
 * @see Zend_Queue_Stomp_Client
 */
// require_once 'Zend/Queue/Stomp/Client.php';

/**
 * @see Zend_Queue_Stomp_Frame
 */
// require_once 'Zend/Queue/Stomp/Frame.php';

/**
 * Queue adapter that uses Stomp to talk to a Stomp compliant server
 * (for example Apache ActiveMQ).
 *
 * Only send(), receive() and deleteMessage() are implemented; every other
 * queue-management operation throws Zend_Queue_Exception (see
 * getCapabilities()).
 *
 * @category   Zend
 * @package    Zend_Queue
 * @subpackage Adapter
 * @copyright  Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com)
 * @license    http://framework.zend.com/license/new-bsd     New BSD License
 */
class Zend_Queue_Adapter_Activemq extends Zend_Queue_Adapter_AdapterAbstract
{
    const DEFAULT_SCHEME = 'tcp';
    const DEFAULT_HOST   = '127.0.0.1';
    const DEFAULT_PORT   = 61613;

    /**
     * Stomp client used for all communication with the server.
     *
     * @var Zend_Queue_Stomp_Client|null
     */
    private $_client = null;

    /**
     * Names of the queues this client has already subscribed to
     * (queue name => true).
     *
     * @var array
     */
    private $_subscribed = array();

    /**
     * Constructor
     *
     * Fills in default scheme/host/port driver options, creates (or reuses
     * the injected 'stompClient') Stomp client and sends a CONNECT frame.
     *
     * @param  array|Zend_Config $options An array having configuration data
     * @param  Zend_Queue        $queue   The Zend_Queue object that created this class
     * @return void
     * @throws Zend_Queue_Exception when the server answers CONNECT with
     *                              anything other than CONNECTED
     */
    public function __construct($options, Zend_Queue $queue = null)
    {
        // Forward the queue as well: send() and receive() fall back to
        // $this->_queue, which is presumably populated by the parent
        // constructor from this argument.
        parent::__construct($options, $queue);

        $options = &$this->_options['driverOptions'];
        if (!array_key_exists('scheme', $options)) {
            $options['scheme'] = self::DEFAULT_SCHEME;
        }
        if (!array_key_exists('host', $options)) {
            $options['host'] = self::DEFAULT_HOST;
        }
        if (!array_key_exists('port', $options)) {
            $options['port'] = self::DEFAULT_PORT;
        }

        if (array_key_exists('stompClient', $options)) {
            $this->_client = $options['stompClient'];
        } else {
            $this->_client = new Zend_Queue_Stomp_Client($options['scheme'], $options['host'], $options['port']);
        }

        $connect = $this->_client->createFrame();

        // Username and password are optional on some messaging servers
        // such as Apache's ActiveMQ
        $connect->setCommand('CONNECT');
        if (isset($options['username'])) {
            $connect->setHeader('login', $options['username']);
            // A username without a password is sent with an empty passcode
            // instead of reading an undefined index.
            $connect->setHeader(
                'passcode',
                isset($options['password']) ? $options['password'] : ''
            );
        }

        $response = $this->_client->send($connect)->receive();

        // A false response (no frame read) is deliberately not treated as
        // a failure here; only an explicit non-CONNECTED frame is.
        if ((false !== $response)
            && ($response->getCommand() != 'CONNECTED')
        ) {
            // require_once 'Zend/Queue/Exception.php';
            throw new Zend_Queue_Exception("Unable to authenticate to '".$options['scheme'].'://'.$options['host'].':'.$options['port']."'");
        }
    }

    /**
     * Close the socket explicitly when destructed
     *
     * @return void
     */
    public function __destruct()
    {
        // The destructor also runs when the constructor threw before a
        // client was assigned; there is nothing to disconnect in that case.
        if ($this->_client === null) {
            return;
        }

        // Gracefully disconnect
        $frame = $this->_client->createFrame();
        $frame->setCommand('DISCONNECT');
        $this->_client->send($frame);
        $this->_client = null;
    }

    /**
     * Create a new queue (not supported by this adapter)
     *
     * @param  string  $name    queue name
     * @param  integer $timeout default visibility timeout
     * @return void
     * @throws Zend_Queue_Exception always
     */
    public function create($name, $timeout=null)
    {
        // require_once 'Zend/Queue/Exception.php';
        throw new Zend_Queue_Exception('create() is not supported in ' . get_class($this));
    }

    /**
     * Delete a queue and all of its messages (not supported by this adapter)
     *
     * @param  string $name queue name
     * @return void
     * @throws Zend_Queue_Exception always
     */
    public function delete($name)
    {
        // require_once 'Zend/Queue/Exception.php';
        throw new Zend_Queue_Exception('delete() is not supported in ' . get_class($this));
    }

    /**
     * Delete a message from the queue
     *
     * Sends an ACK frame for the message handle. The server's reaction is
     * not inspected, so this method always returns true.
     *
     * @param  Zend_Queue_Message $message
     * @return boolean
     */
    public function deleteMessage(Zend_Queue_Message $message)
    {
        $frame = $this->_client->createFrame();
        $frame->setCommand('ACK');
        $frame->setHeader('message-id', $message->handle);

        $this->_client->send($frame);

        return true;
    }

    /**
     * Get an array of all available queues (not supported by this adapter)
     *
     * @return void
     * @throws Zend_Queue_Exception always
     */
    public function getQueues()
    {
        // require_once 'Zend/Queue/Exception.php';
        throw new Zend_Queue_Exception('getQueues() is not supported in this adapter');
    }

    /**
     * Checks if the client is subscribed to the queue
     *
     * @param  Zend_Queue $queue
     * @return boolean
     */
    protected function _isSubscribed(Zend_Queue $queue)
    {
        return isset($this->_subscribed[$queue->getName()]);
    }

    /**
     * Subscribes the client to the queue.
     *
     * The subscription uses 'client' acknowledgement, so received messages
     * are acknowledged explicitly through deleteMessage().
     *
     * @param  Zend_Queue $queue
     * @return void
     */
    protected function _subscribe(Zend_Queue $queue)
    {
        $frame = $this->_client->createFrame();
        $frame->setCommand('SUBSCRIBE');
        $frame->setHeader('destination', $queue->getName());
        $frame->setHeader('ack', 'client');
        $this->_client->send($frame);
        $this->_subscribed[$queue->getName()] = true;
    }

    /**
     * Return up to $maxMessages messages from the queue
     *
     * Subscribes to the queue on first use, then reads frames for as long
     * as the client reports readable data, stopping at $maxMessages.
     *
     * @param  integer    $maxMessages maximum number of messages (default 1)
     * @param  integer    $timeout     visibility timeout; defaulted but not
     *                                 otherwise used by this adapter
     * @param  Zend_Queue $queue       defaults to the adapter's own queue
     * @return Zend_Queue_Message_Iterator
     * @throws Zend_Queue_Exception when a frame other than MESSAGE is read
     */
    public function receive($maxMessages=null, $timeout=null, Zend_Queue $queue=null)
    {
        if ($maxMessages === null) {
            $maxMessages = 1;
        }
        if ($timeout === null) {
            $timeout = self::RECEIVE_TIMEOUT_DEFAULT;
        }
        if ($queue === null) {
            $queue = $this->_queue;
        }

        // read
        $data = array();

        // signal that we are reading
        if (!$this->_isSubscribed($queue)) {
            $this->_subscribe($queue);
        }

        for ($i = 0; $i < $maxMessages; $i++) {
            // Re-check before every read: fewer than $maxMessages frames
            // may be pending, and reading past them must not be attempted.
            if (!$this->_client->canRead()) {
                break;
            }

            $response = $this->_client->receive();
            if (false === $response) {
                // Nothing could be read after all; return what we have.
                break;
            }

            if ($response->getCommand() !== 'MESSAGE') {
                $block = print_r($response, true);
                // require_once 'Zend/Queue/Exception.php';
                throw new Zend_Queue_Exception('Invalid response received: ' . $block);
            }

            $body   = $response->getBody();
            $data[] = array(
                'message_id' => $response->getHeader('message-id'),
                'handle'     => $response->getHeader('message-id'),
                'body'       => $body,
                'md5'        => md5($body)
            );
        }

        $options = array(
            'queue'        => $queue,
            'data'         => $data,
            'messageClass' => $queue->getMessageClass()
        );

        $classname = $queue->getMessageSetClass();

        if (!class_exists($classname)) {
            // require_once 'Zend/Loader.php';
            Zend_Loader::loadClass($classname);
        }
        return new $classname($options);
    }

    /**
     * Push an element onto the end of the queue
     *
     * @param  string     $message message to send to the queue
     * @param  Zend_Queue $queue   defaults to the adapter's own queue
     * @return Zend_Queue_Message
     */
    public function send($message, Zend_Queue $queue=null)
    {
        if ($queue === null) {
            $queue = $this->_queue;
        }

        // Cast once so that the content-length header, the body and the
        // checksum are all derived from exactly the same string.
        $body = (string) $message;

        $frame = $this->_client->createFrame();
        $frame->setCommand('SEND');
        $frame->setHeader('destination', $queue->getName());
        $frame->setHeader('content-length', strlen($body));
        $frame->setBody($body);
        $this->_client->send($frame);

        $data = array(
            'message_id' => null,
            'body'       => $message,
            'md5'        => md5($body),
            'handle'     => null
        );

        $options = array(
            'queue' => $queue,
            'data'  => $data
        );

        $classname = $queue->getMessageClass();
        if (!class_exists($classname)) {
            // require_once 'Zend/Loader.php';
            Zend_Loader::loadClass($classname);
        }
        return new $classname($options);
    }

    /**
     * Returns the length of the queue (not supported by this adapter)
     *
     * @param  Zend_Queue $queue
     * @return integer
     * @throws Zend_Queue_Exception (not supported)
     */
    public function count(Zend_Queue $queue=null)
    {
        // require_once 'Zend/Queue/Exception.php';
        throw new Zend_Queue_Exception('count() is not supported in this adapter');
    }

    /**
     * Does a queue already exist? (not supported by this adapter)
     *
     * @param  string $name
     * @return boolean
     * @throws Zend_Queue_Exception (not supported)
     */
    public function isExists($name)
    {
        // require_once 'Zend/Queue/Exception.php';
        throw new Zend_Queue_Exception('isExists() is not supported in this adapter');
    }

    /**
     * Return a list of queue capabilities functions
     *
     * $array['function name'] = true or false
     * true is supported, false is not supported.
     *
     * @return array
     */
    public function getCapabilities()
    {
        return array(
            'create'        => false,
            'delete'        => false,
            'send'          => true,
            'receive'       => true,
            'deleteMessage' => true,
            'getQueues'     => false,
            'count'         => false,
            'isExists'      => false,
        );
    }
}