File indexing completed on 2025-01-19 05:21:24

0001 <?php
0002 /**
0003  * Zend Framework
0004  *
0005  * LICENSE
0006  *
0007  * This source file is subject to the new BSD license that is bundled
0008  * with this package in the file LICENSE.txt.
0009  * It is also available through the world-wide-web at this URL:
0010  * http://framework.zend.com/license/new-bsd
0011  * If you did not receive a copy of the license and are unable to
0012  * obtain it through the world-wide-web, please send an email
0013  * to license@zend.com so we can send you a copy immediately.
0014  *
0015  * @category   Zend
0016  * @package    Zend_Queue
0017  * @subpackage Stomp
0018  * @copyright  Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com)
0019  * @license    http://framework.zend.com/license/new-bsd     New BSD License
0020  * @version    $Id$
0021  */
0022 
0023 /**
0024  * @see Zend_Queue_Stomp_Client_ConnectionInterface
0025  */
0026 // require_once 'Zend/Queue/Stomp/Client/ConnectionInterface.php';
0027 
0028 /**
0029  * The Stomp client interacts with a Stomp server.
0030  *
0031  * @category   Zend
0032  * @package    Zend_Queue
0033  * @subpackage Stomp
0034  * @copyright  Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com)
0035  * @license    http://framework.zend.com/license/new-bsd     New BSD License
0036  */
0037 class Zend_Queue_Stomp_Client_Connection
0038     implements Zend_Queue_Stomp_Client_ConnectionInterface
0039 {
0040     const READ_TIMEOUT_DEFAULT_USEC = 0; // 0 microseconds
0041     const READ_TIMEOUT_DEFAULT_SEC = 5; // 5 seconds
0042 
0043     /**
0044      * Connection options
0045      * @var array
0046      */
0047     protected $_options;
0048 
0049     /**
0050      * tcp/udp socket
0051      *
0052      * @var resource
0053      */
0054     protected $_socket = false;
0055 
0056     /**
0057      * open() opens a socket to the Stomp server
0058      *
0059      * @param  array $options ('scheme', 'host', 'port')
0060      * @param  string $scheme
0061      * @param  string $host
0062      * @param  int $port
0063      * @param  array $options Accepts "timeout_sec" and "timeout_usec" keys
0064      * @return true;
0065      * @throws Zend_Queue_Exception
0066      */
0067     public function open($scheme, $host, $port, array $options = array())
0068     {
0069         $str = $scheme . '://' . $host;
0070         $this->_socket = fsockopen($str, $port, $errno, $errstr);
0071 
0072         if ($this->_socket === false) {
0073             // aparently there is some reason that fsockopen will return false
0074             // but it normally throws an error.
0075             // require_once 'Zend/Queue/Exception.php';
0076             throw new Zend_Queue_Exception("Unable to connect to $str; error = $errstr ( errno = $errno )");
0077         }
0078 
0079         stream_set_blocking($this->_socket, 0); // non blocking
0080 
0081         if (!isset($options['timeout_sec'])) {
0082             $options['timeout_sec'] = self::READ_TIMEOUT_DEFAULT_SEC;
0083         }
0084         if (! isset($options['timeout_usec'])) {
0085             $options['timeout_usec'] = self::READ_TIMEOUT_DEFAULT_USEC;
0086         }
0087 
0088         $this->_options = $options;
0089 
0090         return true;
0091     }
0092 
0093     /**
0094      * Close the socket explicitly when destructed
0095      *
0096      * @return void
0097      */
0098     public function __destruct()
0099     {
0100     }
0101 
0102     /**
0103      * Close connection
0104      *
0105      * @param  boolean $destructor
0106      * @return void
0107      */
0108     public function close($destructor = false)
0109     {
0110         // Gracefully disconnect
0111         if (!$destructor) {
0112             $frame = $this->createFrame();
0113             $frame->setCommand('DISCONNECT');
0114             $this->write($frame);
0115         }
0116 
0117         // @todo: Should be fixed.
0118         // When the socket is "closed", it will trigger the below error when php exits
0119         // Fatal error: Exception thrown without a stack frame in Unknown on line 0
0120 
0121         // Danlo: I suspect this is because this has already been claimed by the interpeter
0122         // thus trying to shutdown this resources, which is already shutdown is a problem.
0123         if (is_resource($this->_socket)) {
0124             // fclose($this->_socket);
0125         }
0126 
0127         // $this->_socket = null;
0128     }
0129 
0130     /**
0131      * Check whether we are connected to the server
0132      *
0133      * @return true
0134      * @throws Zend_Queue_Exception
0135      */
0136     public function ping()
0137     {
0138         if (!is_resource($this->_socket)) {
0139             // require_once 'Zend/Queue/Exception.php';
0140             throw new Zend_Queue_Exception('Not connected to Stomp server');
0141         }
0142         return true;
0143     }
0144 
0145     /**
0146      * Write a frame to the stomp server
0147      *
0148      * example: $response = $client->write($frame)->read();
0149      *
0150      * @param Zend_Queue_Stom_FrameInterface $frame
0151      * @return $this
0152      */
0153     public function write(Zend_Queue_Stomp_FrameInterface $frame)
0154     {
0155         $this->ping();
0156         $output = $frame->toFrame();
0157 
0158         $bytes = fwrite($this->_socket, $output, strlen($output));
0159         if ($bytes === false || $bytes == 0) {
0160             // require_once 'Zend/Queue/Exception.php';
0161             throw new Zend_Queue_Exception('No bytes written');
0162         }
0163 
0164         return $this;
0165     }
0166 
0167     /**
0168      * Tests the socket to see if there is data for us
0169      *
0170      * @return boolean
0171      */
0172     public function canRead()
0173     {
0174         $read   = array($this->_socket);
0175         $write  = null;
0176         $except = null;
0177 
0178         return stream_select(
0179             $read,
0180             $write,
0181             $except,
0182             $this->_options['timeout_sec'],
0183             $this->_options['timeout_usec']
0184         ) == 1;
0185         // see http://us.php.net/manual/en/function.stream-select.php
0186     }
0187 
0188     /**
0189      * Reads in a frame from the socket or returns false.
0190      *
0191      * @return Zend_Queue_Stomp_FrameInterface|false
0192      * @throws Zend_Queue_Exception
0193      */
0194     public function read()
0195     {
0196         $this->ping();
0197 
0198         $response = '';
0199         $prev     = '';
0200 
0201         // while not end of file.
0202         while (!feof($this->_socket)) {
0203             // read in one character until "\0\n" is found
0204             $data = fread($this->_socket, 1);
0205 
0206             // check to make sure that the connection is not lost.
0207             if ($data === false) {
0208                 // require_once 'Zend/Queue/Exception.php';
0209                 throw new Zend_Queue_Exception('Connection lost');
0210             }
0211 
0212             // append last character read to $response
0213             $response .= $data;
0214 
0215             // is this \0 (prev) \n (data)? END_OF_FRAME
0216             if (ord($data) == 10 && ord($prev) == 0) {
0217                 break;
0218             }
0219             $prev = $data;
0220         }
0221 
0222         if ($response === '') {
0223             return false;
0224         }
0225 
0226         $frame = $this->createFrame();
0227         $frame->fromFrame($response);
0228         return $frame;
0229     }
0230 
0231     /**
0232      * Set the frameClass to be used
0233      *
0234      * This must be a Zend_Queue_Stomp_FrameInterface.
0235      *
0236      * @param  string $classname - class is an instance of Zend_Queue_Stomp_FrameInterface
0237      * @return $this;
0238      */
0239     public function setFrameClass($classname)
0240     {
0241         $this->_options['frameClass'] = $classname;
0242         return $this;
0243     }
0244 
0245     /**
0246      * Get the frameClass
0247      *
0248      * @return string
0249      */
0250     public function getFrameClass()
0251     {
0252         return isset($this->_options['frameClass'])
0253             ? $this->_options['frameClass']
0254             : 'Zend_Queue_Stomp_Frame';
0255     }
0256 
0257     /**
0258      * Create an empty frame
0259      *
0260      * @return Zend_Queue_Stomp_FrameInterface
0261      */
0262     public function createFrame()
0263     {
0264         $class = $this->getFrameClass();
0265 
0266         if (!class_exists($class)) {
0267             // require_once 'Zend/Loader.php';
0268             Zend_Loader::loadClass($class);
0269         }
0270 
0271         $frame = new $class();
0272 
0273         if (!$frame instanceof Zend_Queue_Stomp_FrameInterface) {
0274             // require_once 'Zend/Queue/Exception.php';
0275             throw new Zend_Queue_Exception('Invalid Frame class provided; must implement Zend_Queue_Stomp_FrameInterface');
0276         }
0277 
0278         return $frame;
0279     }
0280 }