File indexing completed on 2024-12-29 05:27:29

0001 <?php
0002 /**
0003  * LICENSE
0004  *
0005  * This source file is subject to the new BSD license that is bundled
0006  * with this package in the file LICENSE.txt.
0007  * It is also available through the world-wide-web at this URL:
0008  * http://framework.zend.com/license/new-bsd
0009  * If you did not receive a copy of the license and are unable to
0010  * obtain it through the world-wide-web, please send an email
0011  * to license@zend.com so we can send you a copy immediately.
0012  *
0013  * @category   Zend
0014  * @package    Zend_Cloud
0015  * @subpackage QueueService
0016  * @copyright  Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com)
0017  * @license    http://framework.zend.com/license/new-bsd     New BSD License
0018  */
0019 
0020 // require_once 'Zend/Service/Amazon/Sqs.php';
0021 // require_once 'Zend/Cloud/QueueService/Adapter/AbstractAdapter.php';
0022 // require_once 'Zend/Cloud/QueueService/Exception.php';
0023 // require_once 'Zend/Cloud/QueueService/Message.php';
0024 
0025 /**
0026  * SQS adapter for simple queue service.
0027  *
0028  * @category   Zend
0029  * @package    Zend_Cloud
0030  * @subpackage QueueService
0031  * @copyright  Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com)
0032  * @license    http://framework.zend.com/license/new-bsd     New BSD License
0033  */
0034 class Zend_Cloud_QueueService_Adapter_Sqs
0035     extends Zend_Cloud_QueueService_Adapter_AbstractAdapter
0036 {
0037     /*
0038      * Options array keys for the SQS adapter.
0039      */
0040     const AWS_ACCESS_KEY = 'aws_accesskey';
0041     const AWS_SECRET_KEY = 'aws_secretkey';
0042 
0043     /**
0044      * Defaults
0045      */
0046     const CREATE_TIMEOUT = 30;
0047 
0048     /**
0049      * SQS service instance.
0050      * @var Zend_Service_Amazon_Sqs
0051      */
0052     protected $_sqs;
0053 
0054     /**
0055      * Constructor
0056      *
0057      * @param  array|Zend_Config $options
0058      * @return void
0059      */
0060     public function __construct($options = array())
0061     {
0062         if ($options instanceof Zend_Config) {
0063             $options = $options->toArray();
0064         }
0065 
0066         if (!is_array($options)) {
0067             throw new Zend_Cloud_QueueService_Exception('Invalid options provided');
0068         }
0069 
0070         if (isset($options[self::MESSAGE_CLASS])) {
0071             $this->setMessageClass($options[self::MESSAGE_CLASS]);
0072         }
0073 
0074         if (isset($options[self::MESSAGESET_CLASS])) {
0075             $this->setMessageSetClass($options[self::MESSAGESET_CLASS]);
0076         }
0077 
0078         try {
0079             $this->_sqs = new Zend_Service_Amazon_Sqs(
0080                 $options[self::AWS_ACCESS_KEY], $options[self::AWS_SECRET_KEY]
0081             );
0082         } catch(Zend_Service_Amazon_Exception $e) {
0083             throw new Zend_Cloud_QueueService_Exception('Error on create: '.$e->getMessage(), $e->getCode(), $e);
0084         }
0085 
0086         if(isset($options[self::HTTP_ADAPTER])) {
0087             $this->_sqs->getHttpClient()->setAdapter($options[self::HTTP_ADAPTER]);
0088         }
0089     }
0090 
0091      /**
0092      * Create a queue. Returns the ID of the created queue (typically the URL).
0093      * It may take some time to create the queue. Check your vendor's
0094      * documentation for details.
0095      *
0096      * @param  string $name
0097      * @param  array  $options
0098      * @return string Queue ID (typically URL)
0099      */
0100     public function createQueue($name, $options = null)
0101     {
0102         try {
0103             return $this->_sqs->create($name, $options[self::CREATE_TIMEOUT]);
0104         } catch(Zend_Service_Amazon_Exception $e) {
0105             throw new Zend_Cloud_QueueService_Exception('Error on queue creation: '.$e->getMessage(), $e->getCode(), $e);
0106         }
0107     }
0108 
0109     /**
0110      * Delete a queue. All messages in the queue will also be deleted.
0111      *
0112      * @param  string $queueId
0113      * @param  array  $options
0114      * @return boolean true if successful, false otherwise
0115      */
0116     public function deleteQueue($queueId, $options = null)
0117 {
0118         try {
0119             return $this->_sqs->delete($queueId);
0120         } catch(Zend_Service_Amazon_Exception $e) {
0121             throw new Zend_Cloud_QueueService_Exception('Error on queue deletion: '.$e->getMessage(), $e->getCode(), $e);
0122         }
0123     }
0124 
0125     /**
0126      * List all queues.
0127      *
0128      * @param  array $options
0129      * @return array Queue IDs
0130      */
0131     public function listQueues($options = null)
0132     {
0133         try {
0134             return $this->_sqs->getQueues();
0135         } catch(Zend_Service_Amazon_Exception $e) {
0136             throw new Zend_Cloud_QueueService_Exception('Error on listing queues: '.$e->getMessage(), $e->getCode(), $e);
0137         }
0138     }
0139 
0140     /**
0141      * Get a key/value array of metadata for the given queue.
0142      *
0143      * @param  string $queueId
0144      * @param  array  $options
0145      * @return array
0146      */
0147     public function fetchQueueMetadata($queueId, $options = null)
0148     {
0149         try {
0150             // TODO: ZF-9050 Fix the SQS client library in trunk to return all attribute values
0151             $attributes = $this->_sqs->getAttribute($queueId, 'All');
0152             if(is_array($attributes)) {
0153                 return $attributes;
0154             } else {
0155                 return array('All' => $this->_sqs->getAttribute($queueId, 'All'));
0156             }
0157         } catch(Zend_Service_Amazon_Exception $e) {
0158             throw new Zend_Cloud_QueueService_Exception('Error on fetching queue metadata: '.$e->getMessage(), $e->getCode(), $e);
0159         }
0160     }
0161 
0162     /**
0163      * Store a key/value array of metadata for the specified queue.
0164      * WARNING: This operation overwrites any metadata that is located at
0165      * $destinationPath. Some adapters may not support this method.
0166      *
0167      * @param  array  $metadata
0168      * @param  string $queueId
0169      * @param  array  $options
0170      * @return void
0171      */
0172     public function storeQueueMetadata($queueId, $metadata, $options = null)
0173     {
0174         // TODO Add support for SetQueueAttributes to client library
0175         // require_once 'Zend/Cloud/OperationNotAvailableException.php';
0176         throw new Zend_Cloud_OperationNotAvailableException('Amazon SQS doesn\'t currently support storing metadata');
0177     }
0178 
0179     /**
0180      * Send a message to the specified queue.
0181      *
0182      * @param  string $message
0183      * @param  string $queueId
0184      * @param  array  $options
0185      * @return string Message ID
0186      */
0187     public function sendMessage($queueId, $message, $options = null)
0188     {
0189         try {
0190             return $this->_sqs->send($queueId, $message);
0191         } catch(Zend_Service_Amazon_Exception $e) {
0192             throw new Zend_Cloud_QueueService_Exception('Error on sending message: '.$e->getMessage(), $e->getCode(), $e);
0193         }
0194     }
0195 
0196     /**
0197      * Recieve at most $max messages from the specified queue and return the
0198      * message IDs for messages recieved.
0199      *
0200      * @param  string $queueId
0201      * @param  int    $max
0202      * @param  array  $options
0203      * @return array
0204      */
0205     public function receiveMessages($queueId, $max = 1, $options = null)
0206     {
0207         try {
0208             return $this->_makeMessages($this->_sqs->receive($queueId, $max, $options[self::VISIBILITY_TIMEOUT]));
0209         } catch(Zend_Service_Amazon_Exception $e) {
0210             throw new Zend_Cloud_QueueService_Exception('Error on recieving messages: '.$e->getMessage(), $e->getCode(), $e);
0211         }
0212     }
0213 
0214     /**
0215      * Create Zend_Cloud_QueueService_Message array for
0216      * Sqs messages.
0217      *
0218      * @param array $messages
0219      * @return Zend_Cloud_QueueService_Message[]
0220      */
0221     protected function _makeMessages($messages)
0222     {
0223         $messageClass = $this->getMessageClass();
0224         $setClass     = $this->getMessageSetClass();
0225         $result = array();
0226         foreach($messages as $message) {
0227             $result[] = new $messageClass($message['body'], $message);
0228         }
0229         return new $setClass($result);
0230     }
0231 
0232     /**
0233      * Delete the specified message from the specified queue.
0234      *
0235      * @param  string $queueId
0236      * @param  Zend_Cloud_QueueService_Message $message
0237      * @param  array  $options
0238      * @return void
0239      */
0240     public function deleteMessage($queueId, $message, $options = null)
0241     {
0242         try {
0243             if($message instanceof Zend_Cloud_QueueService_Message) {
0244                 $message = $message->getMessage();
0245             }
0246             $messageId = $message['handle'];
0247             return $this->_sqs->deleteMessage($queueId, $messageId);
0248         } catch(Zend_Service_Amazon_Exception $e) {
0249             throw new Zend_Cloud_QueueService_Exception('Error on deleting a message: '.$e->getMessage(), $e->getCode(), $e);
0250         }
0251     }
0252 
0253     /**
0254      * Peek at the messages from the specified queue without removing them.
0255      *
0256      * @param  string $queueId
0257      * @param  int $num How many messages
0258      * @param  array  $options
0259      * @return Zend_Cloud_QueueService_Message[]
0260      */
0261     public function peekMessages($queueId, $num = 1, $options = null)
0262     {
0263         try {
0264             return $this->_makeMessages($this->_sqs->receive($queueId, $num, 0));
0265         } catch(Zend_Service_Amazon_Exception $e) {
0266             throw new Zend_Cloud_QueueService_Exception('Error on peeking messages: '.$e->getMessage(), $e->getCode(), $e);
0267         }
0268     }
0269 
0270     /**
0271      * Get SQS implementation
0272      * @return Zend_Service_Amazon_Sqs
0273      */
0274     public function getClient()
0275     {
0276         return $this->_sqs;
0277     }
0278 }