File indexing completed on 2025-01-19 05:21:24
0001 <?php 0002 /** 0003 * Zend Framework 0004 * 0005 * LICENSE 0006 * 0007 * This source file is subject to the new BSD license that is bundled 0008 * with this package in the file LICENSE.txt. 0009 * It is also available through the world-wide-web at this URL: 0010 * http://framework.zend.com/license/new-bsd 0011 * If you did not receive a copy of the license and are unable to 0012 * obtain it through the world-wide-web, please send an email 0013 * to license@zend.com so we can send you a copy immediately. 0014 * 0015 * @category Zend 0016 * @package Zend_Queue 0017 * @subpackage Stomp 0018 * @copyright Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com) 0019 * @license http://framework.zend.com/license/new-bsd New BSD License 0020 * @version $Id$ 0021 */ 0022 0023 /** 0024 * @see Zend_Queue_Stomp_Client_ConnectionInterface 0025 */ 0026 // require_once 'Zend/Queue/Stomp/Client/ConnectionInterface.php'; 0027 0028 /** 0029 * The Stomp client interacts with a Stomp server. 0030 * 0031 * @category Zend 0032 * @package Zend_Queue 0033 * @subpackage Stomp 0034 * @copyright Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com) 0035 * @license http://framework.zend.com/license/new-bsd New BSD License 0036 */ 0037 class Zend_Queue_Stomp_Client_Connection 0038 implements Zend_Queue_Stomp_Client_ConnectionInterface 0039 { 0040 const READ_TIMEOUT_DEFAULT_USEC = 0; // 0 microseconds 0041 const READ_TIMEOUT_DEFAULT_SEC = 5; // 5 seconds 0042 0043 /** 0044 * Connection options 0045 * @var array 0046 */ 0047 protected $_options; 0048 0049 /** 0050 * tcp/udp socket 0051 * 0052 * @var resource 0053 */ 0054 protected $_socket = false; 0055 0056 /** 0057 * open() opens a socket to the Stomp server 0058 * 0059 * @param array $options ('scheme', 'host', 'port') 0060 * @param string $scheme 0061 * @param string $host 0062 * @param int $port 0063 * @param array $options Accepts "timeout_sec" and "timeout_usec" keys 0064 * @return true; 0065 * @throws Zend_Queue_Exception 0066 */ 0067 public function open($scheme, $host, $port, array $options = array()) 0068 { 0069 $str = $scheme . '://' . $host; 0070 $this->_socket = fsockopen($str, $port, $errno, $errstr); 0071 0072 if ($this->_socket === false) { 0073 // aparently there is some reason that fsockopen will return false 0074 // but it normally throws an error. 0075 // require_once 'Zend/Queue/Exception.php'; 0076 throw new Zend_Queue_Exception("Unable to connect to $str; error = $errstr ( errno = $errno )"); 0077 } 0078 0079 stream_set_blocking($this->_socket, 0); // non blocking 0080 0081 if (!isset($options['timeout_sec'])) { 0082 $options['timeout_sec'] = self::READ_TIMEOUT_DEFAULT_SEC; 0083 } 0084 if (! isset($options['timeout_usec'])) { 0085 $options['timeout_usec'] = self::READ_TIMEOUT_DEFAULT_USEC; 0086 } 0087 0088 $this->_options = $options; 0089 0090 return true; 0091 } 0092 0093 /** 0094 * Close the socket explicitly when destructed 0095 * 0096 * @return void 0097 */ 0098 public function __destruct() 0099 { 0100 } 0101 0102 /** 0103 * Close connection 0104 * 0105 * @param boolean $destructor 0106 * @return void 0107 */ 0108 public function close($destructor = false) 0109 { 0110 // Gracefully disconnect 0111 if (!$destructor) { 0112 $frame = $this->createFrame(); 0113 $frame->setCommand('DISCONNECT'); 0114 $this->write($frame); 0115 } 0116 0117 // @todo: Should be fixed. 0118 // When the socket is "closed", it will trigger the below error when php exits 0119 // Fatal error: Exception thrown without a stack frame in Unknown on line 0 0120 0121 // Danlo: I suspect this is because this has already been claimed by the interpeter 0122 // thus trying to shutdown this resources, which is already shutdown is a problem. 0123 if (is_resource($this->_socket)) { 0124 // fclose($this->_socket); 0125 } 0126 0127 // $this->_socket = null; 0128 } 0129 0130 /** 0131 * Check whether we are connected to the server 0132 * 0133 * @return true 0134 * @throws Zend_Queue_Exception 0135 */ 0136 public function ping() 0137 { 0138 if (!is_resource($this->_socket)) { 0139 // require_once 'Zend/Queue/Exception.php'; 0140 throw new Zend_Queue_Exception('Not connected to Stomp server'); 0141 } 0142 return true; 0143 } 0144 0145 /** 0146 * Write a frame to the stomp server 0147 * 0148 * example: $response = $client->write($frame)->read(); 0149 * 0150 * @param Zend_Queue_Stom_FrameInterface $frame 0151 * @return $this 0152 */ 0153 public function write(Zend_Queue_Stomp_FrameInterface $frame) 0154 { 0155 $this->ping(); 0156 $output = $frame->toFrame(); 0157 0158 $bytes = fwrite($this->_socket, $output, strlen($output)); 0159 if ($bytes === false || $bytes == 0) { 0160 // require_once 'Zend/Queue/Exception.php'; 0161 throw new Zend_Queue_Exception('No bytes written'); 0162 } 0163 0164 return $this; 0165 } 0166 0167 /** 0168 * Tests the socket to see if there is data for us 0169 * 0170 * @return boolean 0171 */ 0172 public function canRead() 0173 { 0174 $read = array($this->_socket); 0175 $write = null; 0176 $except = null; 0177 0178 return stream_select( 0179 $read, 0180 $write, 0181 $except, 0182 $this->_options['timeout_sec'], 0183 $this->_options['timeout_usec'] 0184 ) == 1; 0185 // see http://us.php.net/manual/en/function.stream-select.php 0186 } 0187 0188 /** 0189 * Reads in a frame from the socket or returns false. 0190 * 0191 * @return Zend_Queue_Stomp_FrameInterface|false 0192 * @throws Zend_Queue_Exception 0193 */ 0194 public function read() 0195 { 0196 $this->ping(); 0197 0198 $response = ''; 0199 $prev = ''; 0200 0201 // while not end of file. 0202 while (!feof($this->_socket)) { 0203 // read in one character until "\0\n" is found 0204 $data = fread($this->_socket, 1); 0205 0206 // check to make sure that the connection is not lost. 0207 if ($data === false) { 0208 // require_once 'Zend/Queue/Exception.php'; 0209 throw new Zend_Queue_Exception('Connection lost'); 0210 } 0211 0212 // append last character read to $response 0213 $response .= $data; 0214 0215 // is this \0 (prev) \n (data)? END_OF_FRAME 0216 if (ord($data) == 10 && ord($prev) == 0) { 0217 break; 0218 } 0219 $prev = $data; 0220 } 0221 0222 if ($response === '') { 0223 return false; 0224 } 0225 0226 $frame = $this->createFrame(); 0227 $frame->fromFrame($response); 0228 return $frame; 0229 } 0230 0231 /** 0232 * Set the frameClass to be used 0233 * 0234 * This must be a Zend_Queue_Stomp_FrameInterface. 0235 * 0236 * @param string $classname - class is an instance of Zend_Queue_Stomp_FrameInterface 0237 * @return $this; 0238 */ 0239 public function setFrameClass($classname) 0240 { 0241 $this->_options['frameClass'] = $classname; 0242 return $this; 0243 } 0244 0245 /** 0246 * Get the frameClass 0247 * 0248 * @return string 0249 */ 0250 public function getFrameClass() 0251 { 0252 return isset($this->_options['frameClass']) 0253 ? $this->_options['frameClass'] 0254 : 'Zend_Queue_Stomp_Frame'; 0255 } 0256 0257 /** 0258 * Create an empty frame 0259 * 0260 * @return Zend_Queue_Stomp_FrameInterface 0261 */ 0262 public function createFrame() 0263 { 0264 $class = $this->getFrameClass(); 0265 0266 if (!class_exists($class)) { 0267 // require_once 'Zend/Loader.php'; 0268 Zend_Loader::loadClass($class); 0269 } 0270 0271 $frame = new $class(); 0272 0273 if (!$frame instanceof Zend_Queue_Stomp_FrameInterface) { 0274 // require_once 'Zend/Queue/Exception.php'; 0275 throw new Zend_Queue_Exception('Invalid Frame class provided; must implement Zend_Queue_Stomp_FrameInterface'); 0276 } 0277 0278 return $frame; 0279 } 0280 }