File indexing completed on 2024-12-29 05:27:29
0001 <?php 0002 /** 0003 * LICENSE 0004 * 0005 * This source file is subject to the new BSD license that is bundled 0006 * with this package in the file LICENSE.txt. 0007 * It is also available through the world-wide-web at this URL: 0008 * http://framework.zend.com/license/new-bsd 0009 * If you did not receive a copy of the license and are unable to 0010 * obtain it through the world-wide-web, please send an email 0011 * to license@zend.com so we can send you a copy immediately. 0012 * 0013 * @category Zend 0014 * @package Zend_Cloud 0015 * @subpackage QueueService 0016 * @copyright Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com) 0017 * @license http://framework.zend.com/license/new-bsd New BSD License 0018 */ 0019 0020 // require_once 'Zend/Service/Amazon/Sqs.php'; 0021 // require_once 'Zend/Cloud/QueueService/Adapter/AbstractAdapter.php'; 0022 // require_once 'Zend/Cloud/QueueService/Exception.php'; 0023 // require_once 'Zend/Cloud/QueueService/Message.php'; 0024 0025 /** 0026 * SQS adapter for simple queue service. 0027 * 0028 * @category Zend 0029 * @package Zend_Cloud 0030 * @subpackage QueueService 0031 * @copyright Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com) 0032 * @license http://framework.zend.com/license/new-bsd New BSD License 0033 */ 0034 class Zend_Cloud_QueueService_Adapter_Sqs 0035 extends Zend_Cloud_QueueService_Adapter_AbstractAdapter 0036 { 0037 /* 0038 * Options array keys for the SQS adapter. 0039 */ 0040 const AWS_ACCESS_KEY = 'aws_accesskey'; 0041 const AWS_SECRET_KEY = 'aws_secretkey'; 0042 0043 /** 0044 * Defaults 0045 */ 0046 const CREATE_TIMEOUT = 30; 0047 0048 /** 0049 * SQS service instance. 0050 * @var Zend_Service_Amazon_Sqs 0051 */ 0052 protected $_sqs; 0053 0054 /** 0055 * Constructor 0056 * 0057 * @param array|Zend_Config $options 0058 * @return void 0059 */ 0060 public function __construct($options = array()) 0061 { 0062 if ($options instanceof Zend_Config) { 0063 $options = $options->toArray(); 0064 } 0065 0066 if (!is_array($options)) { 0067 throw new Zend_Cloud_QueueService_Exception('Invalid options provided'); 0068 } 0069 0070 if (isset($options[self::MESSAGE_CLASS])) { 0071 $this->setMessageClass($options[self::MESSAGE_CLASS]); 0072 } 0073 0074 if (isset($options[self::MESSAGESET_CLASS])) { 0075 $this->setMessageSetClass($options[self::MESSAGESET_CLASS]); 0076 } 0077 0078 try { 0079 $this->_sqs = new Zend_Service_Amazon_Sqs( 0080 $options[self::AWS_ACCESS_KEY], $options[self::AWS_SECRET_KEY] 0081 ); 0082 } catch(Zend_Service_Amazon_Exception $e) { 0083 throw new Zend_Cloud_QueueService_Exception('Error on create: '.$e->getMessage(), $e->getCode(), $e); 0084 } 0085 0086 if(isset($options[self::HTTP_ADAPTER])) { 0087 $this->_sqs->getHttpClient()->setAdapter($options[self::HTTP_ADAPTER]); 0088 } 0089 } 0090 0091 /** 0092 * Create a queue. Returns the ID of the created queue (typically the URL). 0093 * It may take some time to create the queue. Check your vendor's 0094 * documentation for details. 0095 * 0096 * @param string $name 0097 * @param array $options 0098 * @return string Queue ID (typically URL) 0099 */ 0100 public function createQueue($name, $options = null) 0101 { 0102 try { 0103 return $this->_sqs->create($name, $options[self::CREATE_TIMEOUT]); 0104 } catch(Zend_Service_Amazon_Exception $e) { 0105 throw new Zend_Cloud_QueueService_Exception('Error on queue creation: '.$e->getMessage(), $e->getCode(), $e); 0106 } 0107 } 0108 0109 /** 0110 * Delete a queue. All messages in the queue will also be deleted. 0111 * 0112 * @param string $queueId 0113 * @param array $options 0114 * @return boolean true if successful, false otherwise 0115 */ 0116 public function deleteQueue($queueId, $options = null) 0117 { 0118 try { 0119 return $this->_sqs->delete($queueId); 0120 } catch(Zend_Service_Amazon_Exception $e) { 0121 throw new Zend_Cloud_QueueService_Exception('Error on queue deletion: '.$e->getMessage(), $e->getCode(), $e); 0122 } 0123 } 0124 0125 /** 0126 * List all queues. 0127 * 0128 * @param array $options 0129 * @return array Queue IDs 0130 */ 0131 public function listQueues($options = null) 0132 { 0133 try { 0134 return $this->_sqs->getQueues(); 0135 } catch(Zend_Service_Amazon_Exception $e) { 0136 throw new Zend_Cloud_QueueService_Exception('Error on listing queues: '.$e->getMessage(), $e->getCode(), $e); 0137 } 0138 } 0139 0140 /** 0141 * Get a key/value array of metadata for the given queue. 0142 * 0143 * @param string $queueId 0144 * @param array $options 0145 * @return array 0146 */ 0147 public function fetchQueueMetadata($queueId, $options = null) 0148 { 0149 try { 0150 // TODO: ZF-9050 Fix the SQS client library in trunk to return all attribute values 0151 $attributes = $this->_sqs->getAttribute($queueId, 'All'); 0152 if(is_array($attributes)) { 0153 return $attributes; 0154 } else { 0155 return array('All' => $this->_sqs->getAttribute($queueId, 'All')); 0156 } 0157 } catch(Zend_Service_Amazon_Exception $e) { 0158 throw new Zend_Cloud_QueueService_Exception('Error on fetching queue metadata: '.$e->getMessage(), $e->getCode(), $e); 0159 } 0160 } 0161 0162 /** 0163 * Store a key/value array of metadata for the specified queue. 0164 * WARNING: This operation overwrites any metadata that is located at 0165 * $destinationPath. Some adapters may not support this method. 0166 * 0167 * @param array $metadata 0168 * @param string $queueId 0169 * @param array $options 0170 * @return void 0171 */ 0172 public function storeQueueMetadata($queueId, $metadata, $options = null) 0173 { 0174 // TODO Add support for SetQueueAttributes to client library 0175 // require_once 'Zend/Cloud/OperationNotAvailableException.php'; 0176 throw new Zend_Cloud_OperationNotAvailableException('Amazon SQS doesn\'t currently support storing metadata'); 0177 } 0178 0179 /** 0180 * Send a message to the specified queue. 0181 * 0182 * @param string $message 0183 * @param string $queueId 0184 * @param array $options 0185 * @return string Message ID 0186 */ 0187 public function sendMessage($queueId, $message, $options = null) 0188 { 0189 try { 0190 return $this->_sqs->send($queueId, $message); 0191 } catch(Zend_Service_Amazon_Exception $e) { 0192 throw new Zend_Cloud_QueueService_Exception('Error on sending message: '.$e->getMessage(), $e->getCode(), $e); 0193 } 0194 } 0195 0196 /** 0197 * Recieve at most $max messages from the specified queue and return the 0198 * message IDs for messages recieved. 0199 * 0200 * @param string $queueId 0201 * @param int $max 0202 * @param array $options 0203 * @return array 0204 */ 0205 public function receiveMessages($queueId, $max = 1, $options = null) 0206 { 0207 try { 0208 return $this->_makeMessages($this->_sqs->receive($queueId, $max, $options[self::VISIBILITY_TIMEOUT])); 0209 } catch(Zend_Service_Amazon_Exception $e) { 0210 throw new Zend_Cloud_QueueService_Exception('Error on recieving messages: '.$e->getMessage(), $e->getCode(), $e); 0211 } 0212 } 0213 0214 /** 0215 * Create Zend_Cloud_QueueService_Message array for 0216 * Sqs messages. 0217 * 0218 * @param array $messages 0219 * @return Zend_Cloud_QueueService_Message[] 0220 */ 0221 protected function _makeMessages($messages) 0222 { 0223 $messageClass = $this->getMessageClass(); 0224 $setClass = $this->getMessageSetClass(); 0225 $result = array(); 0226 foreach($messages as $message) { 0227 $result[] = new $messageClass($message['body'], $message); 0228 } 0229 return new $setClass($result); 0230 } 0231 0232 /** 0233 * Delete the specified message from the specified queue. 0234 * 0235 * @param string $queueId 0236 * @param Zend_Cloud_QueueService_Message $message 0237 * @param array $options 0238 * @return void 0239 */ 0240 public function deleteMessage($queueId, $message, $options = null) 0241 { 0242 try { 0243 if($message instanceof Zend_Cloud_QueueService_Message) { 0244 $message = $message->getMessage(); 0245 } 0246 $messageId = $message['handle']; 0247 return $this->_sqs->deleteMessage($queueId, $messageId); 0248 } catch(Zend_Service_Amazon_Exception $e) { 0249 throw new Zend_Cloud_QueueService_Exception('Error on deleting a message: '.$e->getMessage(), $e->getCode(), $e); 0250 } 0251 } 0252 0253 /** 0254 * Peek at the messages from the specified queue without removing them. 0255 * 0256 * @param string $queueId 0257 * @param int $num How many messages 0258 * @param array $options 0259 * @return Zend_Cloud_QueueService_Message[] 0260 */ 0261 public function peekMessages($queueId, $num = 1, $options = null) 0262 { 0263 try { 0264 return $this->_makeMessages($this->_sqs->receive($queueId, $num, 0)); 0265 } catch(Zend_Service_Amazon_Exception $e) { 0266 throw new Zend_Cloud_QueueService_Exception('Error on peeking messages: '.$e->getMessage(), $e->getCode(), $e); 0267 } 0268 } 0269 0270 /** 0271 * Get SQS implementation 0272 * @return Zend_Service_Amazon_Sqs 0273 */ 0274 public function getClient() 0275 { 0276 return $this->_sqs; 0277 } 0278 }