File indexing completed on 2023-09-24 04:04:37
0001 /* -*- C++ -*- 0002 * Copyright (C) 2003-2005 Thiago Macieira <thiago@kde.org> 0003 * 0004 * 0005 * Permission is hereby granted, free of charge, to any person obtaining 0006 * a copy of this software and associated documentation files (the 0007 * "Software"), to deal in the Software without restriction, including 0008 * without limitation the rights to use, copy, modify, merge, publish, 0009 * distribute, sublicense, and/or sell copies of the Software, and to 0010 * permit persons to whom the Software is furnished to do so, subject to 0011 * the following conditions: 0012 * 0013 * The above copyright notice and this permission notice shall be included 0014 * in all copies or substantial portions of the Software. 0015 * 0016 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 0017 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 0018 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 0019 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE 0020 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION 0021 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 0022 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 0023 */ 0024 0025 #include "k3bufferedsocket.h" 0026 0027 #include <config-network.h> 0028 0029 #include <QMutex> 0030 #include <QTimer> 0031 0032 #include "k3socketdevice.h" 0033 #include "k3socketaddress.h" 0034 #include "k3socketbuffer_p.h" 0035 0036 using namespace KNetwork; 0037 using namespace KNetwork::Internal; 0038 0039 class KNetwork::KBufferedSocketPrivate 0040 { 0041 public: 0042 mutable KSocketBuffer *input, *output; 0043 0044 KBufferedSocketPrivate() 0045 { 0046 input = nullptr; 0047 output = nullptr; 0048 } 0049 }; 0050 0051 KBufferedSocket::KBufferedSocket(const QString &host, const QString &service, 0052 QObject *parent) 0053 : KStreamSocket(host, service, parent), 0054 d(new KBufferedSocketPrivate) 0055 { 0056 setInputBuffering(true); 0057 setOutputBuffering(true); 0058 } 0059 0060 KBufferedSocket::~KBufferedSocket() 0061 { 0062 closeNow(); 0063 delete d->input; 0064 delete d->output; 0065 delete d; 0066 } 0067 0068 void KBufferedSocket::setSocketDevice(KSocketDevice *device) 0069 { 0070 KStreamSocket::setSocketDevice(device); 0071 device->setBlocking(false); 0072 KActiveSocketBase::open(openMode() & ~Unbuffered); 0073 } 0074 0075 bool KBufferedSocket::setSocketOptions(int opts) 0076 { 0077 if (opts == Blocking) { 0078 return false; 0079 } 0080 0081 opts &= ~Blocking; 0082 return KStreamSocket::setSocketOptions(opts); 0083 } 0084 0085 void KBufferedSocket::close() 0086 { 0087 if (!d->output || d->output->isEmpty()) { 0088 closeNow(); 0089 } else { 0090 setState(Closing); 0091 QSocketNotifier *n = socketDevice()->readNotifier(); 0092 if (n) { 0093 n->setEnabled(false); 0094 } 0095 emit stateChanged(Closing); 0096 } 0097 } 0098 0099 qint64 KBufferedSocket::bytesAvailable() const 0100 { 0101 if (!d->input) { 0102 return KStreamSocket::bytesAvailable(); 0103 } 0104 0105 return d->input->length(); 0106 } 0107 0108 qint64 KBufferedSocket::waitForMore(int msecs, bool *timeout) 0109 { 0110 qint64 retval = KStreamSocket::waitForMore(msecs, timeout); 0111 if (d->input) { 0112 resetError(); 0113 slotReadActivity(); 0114 return bytesAvailable(); 0115 } 0116 return retval; 0117 } 0118 0119 qint64 KBufferedSocket::readData(char *data, qint64 maxlen, KSocketAddress *from) 0120 { 0121 if (from) { 0122 *from = peerAddress(); 0123 } 0124 if (d->input) { 0125 if (d->input->isEmpty()) { 0126 return 0; 0127 } 0128 resetError(); 0129 return d->input->consumeBuffer(data, maxlen); 0130 } 0131 return KStreamSocket::readData(data, maxlen, nullptr); 0132 } 0133 0134 qint64 KBufferedSocket::peekData(char *data, qint64 maxlen, KSocketAddress *from) 0135 { 0136 if (from) { 0137 *from = peerAddress(); 0138 } 0139 if (d->input) { 0140 if (d->input->isEmpty()) { 0141 return 0; 0142 } 0143 resetError(); 0144 return d->input->consumeBuffer(data, maxlen, false); 0145 } 0146 return KStreamSocket::peekData(data, maxlen, nullptr); 0147 } 0148 0149 qint64 KBufferedSocket::writeData(const char *data, qint64 len, 0150 const KSocketAddress *) 0151 { 0152 // ignore the third parameter 0153 if (state() != Connected) { 0154 // cannot write now! 0155 setError(NotConnected); 0156 return -1; 0157 } 0158 0159 if (d->output) { 0160 if (d->output->isFull()) { 0161 setError(WouldBlock); 0162 emit gotError(WouldBlock); 0163 return -1; 0164 } 0165 resetError(); 0166 0167 // enable notifier to send data 0168 QSocketNotifier *n = socketDevice()->writeNotifier(); 0169 if (n) { 0170 n->setEnabled(true); 0171 } 0172 0173 return d->output->feedBuffer(data, len); 0174 } 0175 0176 return KStreamSocket::writeData(data, len, nullptr); 0177 } 0178 0179 void KBufferedSocket::enableRead(bool enable) 0180 { 0181 KStreamSocket::enableRead(enable); 0182 if (!enable && d->input) { 0183 // reenable it 0184 QSocketNotifier *n = socketDevice()->readNotifier(); 0185 if (n) { 0186 n->setEnabled(true); 0187 } 0188 } 0189 0190 if (enable && state() != Connected && d->input && !d->input->isEmpty()) 0191 // this means the buffer is still dirty 0192 // allow the signal to be emitted 0193 { 0194 QTimer::singleShot(0, this, SLOT(slotReadActivity())); 0195 } 0196 } 0197 0198 void KBufferedSocket::enableWrite(bool enable) 0199 { 0200 KStreamSocket::enableWrite(enable); 0201 if (!enable && d->output && !d->output->isEmpty()) { 0202 // reenable it 0203 QSocketNotifier *n = socketDevice()->writeNotifier(); 0204 if (n) { 0205 n->setEnabled(true); 0206 } 0207 } 0208 } 0209 0210 void KBufferedSocket::stateChanging(SocketState newState) 0211 { 0212 if (newState == Connecting || newState == Connected) { 0213 // we're going to connect 0214 // make sure the buffers are clean 0215 if (d->input) { 0216 d->input->clear(); 0217 } 0218 if (d->output) { 0219 d->output->clear(); 0220 } 0221 0222 // also, turn on notifiers 0223 enableRead(emitsReadyRead()); 0224 enableWrite(emitsReadyWrite()); 0225 } 0226 KStreamSocket::stateChanging(newState); 0227 } 0228 0229 void KBufferedSocket::setInputBuffering(bool enable) 0230 { 0231 QMutexLocker locker(mutex()); 0232 if (!enable) { 0233 delete d->input; 0234 d->input = nullptr; 0235 } else if (d->input == nullptr) { 0236 d->input = new KSocketBuffer; 0237 } 0238 } 0239 0240 void KBufferedSocket::setOutputBuffering(bool enable) 0241 { 0242 QMutexLocker locker(mutex()); 0243 if (!enable) { 0244 delete d->output; 0245 d->output = nullptr; 0246 } else if (d->output == nullptr) { 0247 d->output = new KSocketBuffer; 0248 } 0249 } 0250 0251 qint64 KBufferedSocket::bytesToWrite() const 0252 { 0253 if (!d->output) { 0254 return 0; 0255 } 0256 0257 return d->output->length(); 0258 } 0259 0260 void KBufferedSocket::closeNow() 0261 { 0262 KStreamSocket::close(); 0263 if (d->output) { 0264 d->output->clear(); 0265 } 0266 } 0267 0268 bool KBufferedSocket::canReadLine() const 0269 { 0270 if (!d->input) { 0271 return false; 0272 } 0273 0274 return d->input->canReadLine(); 0275 } 0276 0277 qint64 KBufferedSocket::readLineData(char *data, qint64 maxSize) 0278 { 0279 return d->input->readLine(data, maxSize); 0280 } 0281 0282 void KBufferedSocket::waitForConnect() 0283 { 0284 if (state() != Connecting) { 0285 return; // nothing to be waited on 0286 } 0287 0288 KStreamSocket::setSocketOptions(socketOptions() | Blocking); 0289 connectionEvent(); 0290 KStreamSocket::setSocketOptions(socketOptions() & ~Blocking); 0291 } 0292 0293 void KBufferedSocket::slotReadActivity() 0294 { 0295 if (d->input && state() == Connected) { 0296 mutex()->lock(); 0297 qint64 len = d->input->receiveFrom(socketDevice()); 0298 0299 if (len == -1) { 0300 if (socketDevice()->error() != WouldBlock) { 0301 // nope, another error! 0302 copyError(); 0303 mutex()->unlock(); 0304 emit gotError(error()); 0305 closeNow(); // emits closed 0306 return; 0307 } 0308 } else if (len == 0) { 0309 // remotely closed 0310 setError(RemotelyDisconnected); 0311 mutex()->unlock(); 0312 emit gotError(error()); 0313 closeNow(); // emits closed 0314 return; 0315 } 0316 0317 // no error 0318 mutex()->unlock(); 0319 } 0320 0321 if (state() == Connected) { 0322 KStreamSocket::slotReadActivity(); // this emits readyRead 0323 } else if (emitsReadyRead()) { // state() != Connected 0324 if (d->input && !d->input->isEmpty()) { 0325 // buffer isn't empty 0326 // keep emitting signals till it is 0327 QTimer::singleShot(0, this, SLOT(slotReadActivity())); 0328 emit readyRead(); 0329 } 0330 } 0331 } 0332 0333 void KBufferedSocket::slotWriteActivity() 0334 { 0335 if (d->output && !d->output->isEmpty() && 0336 (state() == Connected || state() == Closing)) { 0337 mutex()->lock(); 0338 qint64 len = d->output->sendTo(socketDevice()); 0339 0340 if (len == -1) { 0341 if (socketDevice()->error() != WouldBlock) { 0342 // nope, another error! 0343 copyError(); 0344 mutex()->unlock(); 0345 emit gotError(error()); 0346 closeNow(); 0347 return; 0348 } 0349 } else if (len == 0) { 0350 // remotely closed 0351 setError(RemotelyDisconnected); 0352 mutex()->unlock(); 0353 emit gotError(error()); 0354 closeNow(); 0355 return; 0356 } 0357 0358 if (d->output->isEmpty()) 0359 // deactivate the notifier until we have something to send 0360 // writeNotifier can't return NULL here 0361 { 0362 socketDevice()->writeNotifier()->setEnabled(false); 0363 } 0364 0365 mutex()->unlock(); 0366 emit bytesWritten(len); 0367 } 0368 0369 if (state() != Closing) { 0370 KStreamSocket::slotWriteActivity(); 0371 } else if (d->output && d->output->isEmpty() && state() == Closing) { 0372 KStreamSocket::close(); // finished sending data 0373 } 0374 } 0375 0376 #include "moc_k3bufferedsocket.cpp"