File indexing completed on 2024-12-29 05:27:29
<?php
/**
 * LICENSE
 *
 * This source file is subject to the new BSD license that is bundled
 * with this package in the file LICENSE.txt.
 * It is also available through the world-wide-web at this URL:
 * http://framework.zend.com/license/new-bsd
 * If you did not receive a copy of the license and are unable to
 * obtain it through the world-wide-web, please send an email
 * to license@zend.com so we can send you a copy immediately.
 *
 * @category   Zend
 * @package    Zend_Cloud
 * @subpackage QueueService
 * @copyright  Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com)
 * @license    http://framework.zend.com/license/new-bsd     New BSD License
 */

// require_once 'Zend/Cloud/QueueService/Adapter/AbstractAdapter.php';
// require_once 'Zend/Cloud/QueueService/Exception.php';
// require_once 'Zend/Queue.php';

/**
 * Zend_Queue adapter for simple queue service.
 *
 * Wraps a Zend_Queue instance and keeps a per-instance registry of the queues
 * created through {@link createQueue()}. Every queue-specific operation looks
 * the queue up in that registry, so only queues created through this adapter
 * instance can be addressed by ID.
 *
 * @category   Zend
 * @package    Zend_Cloud
 * @subpackage QueueService
 * @copyright  Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com)
 * @license    http://framework.zend.com/license/new-bsd     New BSD License
 */
class Zend_Cloud_QueueService_Adapter_ZendQueue
    extends Zend_Cloud_QueueService_Adapter_AbstractAdapter
{
    /**
     * Options array keys for the Zend_Queue adapter.
     */
    const ADAPTER = 'adapter';

    /**
     * Underlying queue client used to create and list queues
     *
     * @var Zend_Queue
     */
    protected $_queue = null;

    /**
     * Queues created through this adapter instance, keyed by queue name
     *
     * @var array
     */
    protected $_queues = array();

    /**
     * Constructor
     *
     * Recognised option keys: self::MESSAGE_CLASS, self::MESSAGESET_CLASS and
     * self::ADAPTER (required). The adapter entry is removed from the options
     * and the remaining options are handed to the Zend_Queue constructor.
     *
     * @param  array|Zend_Config $options
     * @return void
     * @throws Zend_Cloud_QueueService_Exception If the options are not an array,
     *         no adapter is given, or Zend_Queue cannot be instantiated
     */
    public function __construct($options = array())
    {
        if ($options instanceof Zend_Config) {
            $options = $options->toArray();
        }

        if (!is_array($options)) {
            throw new Zend_Cloud_QueueService_Exception('Invalid options provided');
        }

        if (isset($options[self::MESSAGE_CLASS])) {
            $this->setMessageClass($options[self::MESSAGE_CLASS]);
        }

        if (isset($options[self::MESSAGESET_CLASS])) {
            $this->setMessageSetClass($options[self::MESSAGESET_CLASS]);
        }

        // Build the Zend_Queue instance; the adapter option is mandatory
        if (!isset($options[self::ADAPTER])) {
            throw new Zend_Cloud_QueueService_Exception('No Zend_Queue adapter provided');
        }
        $adapter = $options[self::ADAPTER];
        unset($options[self::ADAPTER]);

        try {
            $this->_queue = new Zend_Queue($adapter, $options);
        } catch (Zend_Queue_Exception $e) {
            throw new Zend_Cloud_QueueService_Exception('Error on create: '.$e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * Create a queue. Returns the ID of the created queue, which for this
     * adapter is the queue name itself.
     *
     * The created queue object is remembered in the internal registry so that
     * the other methods of this adapter can address it by ID.
     *
     * @param  string $name
     * @param  array  $options Optional; Zend_Queue::TIMEOUT is honoured
     * @return string Queue ID (the queue name)
     * @throws Zend_Cloud_QueueService_Exception If the queue cannot be created
     */
    public function createQueue($name, $options = null)
    {
        $timeout = isset($options[Zend_Queue::TIMEOUT])
                 ? $options[Zend_Queue::TIMEOUT]
                 : null;

        try {
            $queue = $this->_queue->createQueue($name, $timeout);
        } catch (Zend_Queue_Exception $e) {
            throw new Zend_Cloud_QueueService_Exception('Error on queue creation: '.$e->getMessage(), $e->getCode(), $e);
        }

        // Never register a non-queue result (e.g. a false failure value):
        // it would pass the isset() checks elsewhere in this class and then
        // fail fatally on the first method call.
        if (!($queue instanceof Zend_Queue)) {
            throw new Zend_Cloud_QueueService_Exception("Error on queue creation: unable to create queue $name");
        }

        $this->_queues[$name] = $queue;
        return $name;
    }

    /**
     * Delete a queue. All messages in the queue will also be deleted.
     *
     * @param  string $queueId
     * @param  array  $options Unused by this adapter
     * @return boolean true if successful, false if the queue is unknown to
     *                 this adapter instance or the deletion was refused
     * @throws Zend_Cloud_QueueService_Exception If Zend_Queue reports an error
     */
    public function deleteQueue($queueId, $options = null)
    {
        if (!isset($this->_queues[$queueId])) {
            return false;
        }
        try {
            if ($this->_queues[$queueId]->deleteQueue()) {
                unset($this->_queues[$queueId]);
                return true;
            }
        } catch (Zend_Queue_Exception $e) {
            throw new Zend_Cloud_QueueService_Exception('Error on queue deletion: '.$e->getMessage(), $e->getCode(), $e);
        }

        // Deletion was reported as unsuccessful; keep the queue registered
        return false;
    }

    /**
     * List all queues.
     *
     * The list comes from the underlying Zend_Queue client, not from the
     * internal registry of queues created through this adapter instance.
     *
     * @param  array $options Unused by this adapter
     * @return array Queue IDs
     * @throws Zend_Cloud_QueueService_Exception If Zend_Queue reports an error
     */
    public function listQueues($options = null)
    {
        try {
            return $this->_queue->getQueues();
        } catch (Zend_Queue_Exception $e) {
            throw new Zend_Cloud_QueueService_Exception('Error on listing queues: '.$e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * Get a key/value array of metadata for the given queue.
     *
     * The metadata is the option set of the registered Zend_Queue object.
     *
     * @param  string $queueId
     * @param  array  $options Unused by this adapter
     * @return array|false Queue options, or false if the queue is unknown to
     *                     this adapter instance
     * @throws Zend_Cloud_QueueService_Exception If Zend_Queue reports an error
     */
    public function fetchQueueMetadata($queueId, $options = null)
    {
        if (!isset($this->_queues[$queueId])) {
            return false;
        }
        try {
            return $this->_queues[$queueId]->getOptions();
        } catch (Zend_Queue_Exception $e) {
            throw new Zend_Cloud_QueueService_Exception('Error on fetching queue metadata: '.$e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * Store a key/value array of metadata for the specified queue.
     *
     * The metadata is passed to the registered queue's setOptions().
     * NOTE(review): whether existing options are merged or replaced is decided
     * by Zend_Queue::setOptions() - confirm before relying on either.
     *
     * @param  string $queueId
     * @param  array  $metadata
     * @param  array  $options Unused by this adapter
     * @return mixed Result of the queue's setOptions() call
     * @throws Zend_Cloud_QueueService_Exception If the queue is unknown to this
     *         adapter instance or Zend_Queue reports an error
     */
    public function storeQueueMetadata($queueId, $metadata, $options = null)
    {
        if (!isset($this->_queues[$queueId])) {
            throw new Zend_Cloud_QueueService_Exception("No such queue: $queueId");
        }
        try {
            return $this->_queues[$queueId]->setOptions($metadata);
        } catch (Zend_Queue_Exception $e) {
            throw new Zend_Cloud_QueueService_Exception('Error on setting queue metadata: '.$e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * Send a message to the specified queue.
     *
     * @param  string $queueId
     * @param  string $message
     * @param  array  $options Unused by this adapter
     * @return mixed Result of the queue's send() call. NOTE(review): Zend_Queue
     *               documents this as a Zend_Queue_Message rather than a plain
     *               message ID string - confirm against callers.
     * @throws Zend_Cloud_QueueService_Exception If the queue is unknown to this
     *         adapter instance or Zend_Queue reports an error
     */
    public function sendMessage($queueId, $message, $options = null)
    {
        if (!isset($this->_queues[$queueId])) {
            throw new Zend_Cloud_QueueService_Exception("No such queue: $queueId");
        }
        try {
            return $this->_queues[$queueId]->send($message);
        } catch (Zend_Queue_Exception $e) {
            throw new Zend_Cloud_QueueService_Exception('Error on sending message: '.$e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * Receive at most $max messages from the specified queue and return them
     * wrapped in the configured message set class.
     *
     * @param  string $queueId
     * @param  int    $max
     * @param  array  $options Optional; Zend_Queue::TIMEOUT is honoured
     * @return object Instance of the configured message set class
     * @throws Zend_Cloud_QueueService_Exception If the queue is unknown to this
     *         adapter instance or Zend_Queue reports an error
     */
    public function receiveMessages($queueId, $max = 1, $options = null)
    {
        if (!isset($this->_queues[$queueId])) {
            throw new Zend_Cloud_QueueService_Exception("No such queue: $queueId");
        }
        try {
            $timeout = isset($options[Zend_Queue::TIMEOUT])
                     ? $options[Zend_Queue::TIMEOUT]
                     : null;
            $res = $this->_queues[$queueId]->receive($max, $timeout);

            // A single, non-iterable result is normalised to a one-element list
            if (!($res instanceof Iterator)) {
                $res = array($res);
            }
            return $this->_makeMessages($res);
        } catch (Zend_Queue_Exception $e) {
            throw new Zend_Cloud_QueueService_Exception('Error on recieving messages: '.$e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * Wrap Zend_Queue messages in the configured message class and collect
     * them in an instance of the configured message set class.
     *
     * Each wrapper is constructed with the message's body and the original
     * message object.
     *
     * @param  array|Iterator $messages Messages exposing a "body" property
     * @return object Instance of the configured message set class
     */
    protected function _makeMessages($messages)
    {
        $messageClass = $this->getMessageClass();
        $setClass     = $this->getMessageSetClass();
        $result       = array();
        foreach ($messages as $message) {
            $result[] = new $messageClass($message->body, $message);
        }
        return new $setClass($result);
    }

    /**
     * Delete the specified message from the specified queue.
     *
     * A Zend_Cloud_QueueService_Message is unwrapped to the underlying
     * Zend_Queue_Message before deletion.
     *
     * @param  string $queueId
     * @param  Zend_Cloud_QueueService_Message|Zend_Queue_Message $message
     * @param  array  $options Unused by this adapter
     * @return mixed Result of the queue's deleteMessage() call
     * @throws Zend_Cloud_QueueService_Exception If the queue is unknown to this
     *         adapter instance, the message is not a Zend_Queue_Message, or
     *         Zend_Queue reports an error
     */
    public function deleteMessage($queueId, $message, $options = null)
    {
        if (!isset($this->_queues[$queueId])) {
            throw new Zend_Cloud_QueueService_Exception("No such queue: $queueId");
        }
        try {
            if ($message instanceof Zend_Cloud_QueueService_Message) {
                $message = $message->getMessage();
            }
            if (!($message instanceof Zend_Queue_Message)) {
                throw new Zend_Cloud_QueueService_Exception('Cannot delete the message: Zend_Queue_Message object required');
            }

            return $this->_queues[$queueId]->deleteMessage($message);
        } catch (Zend_Queue_Exception $e) {
            throw new Zend_Cloud_QueueService_Exception('Error on deleting a message: '.$e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * Peek at the messages from the specified queue without removing them.
     *
     * Not supported by this adapter: the method always throws.
     *
     * @param  string $queueId
     * @param  int    $num How many messages
     * @param  array  $options
     * @return Zend_Cloud_QueueService_Message[]
     * @throws Zend_Cloud_OperationNotAvailableException Always
     */
    public function peekMessages($queueId, $num = 1, $options = null)
    {
        // require_once 'Zend/Cloud/OperationNotAvailableException.php';
        throw new Zend_Cloud_OperationNotAvailableException('ZendQueue doesn\'t currently support message peeking');
    }

    /**
     * Get the underlying Zend_Queue client
     *
     * @return Zend_Queue
     */
    public function getClient()
    {
        return $this->_queue;
    }
}