File indexing completed on 2024-04-14 14:19:48

0001 /*  -*- C++ -*-
0002  *  Copyright (C) 2003-2005 Thiago Macieira <thiago@kde.org>
0003  *
0004  *
0005  *  Permission is hereby granted, free of charge, to any person obtaining
0006  *  a copy of this software and associated documentation files (the
0007  *  "Software"), to deal in the Software without restriction, including
0008  *  without limitation the rights to use, copy, modify, merge, publish,
0009  *  distribute, sublicense, and/or sell copies of the Software, and to
0010  *  permit persons to whom the Software is furnished to do so, subject to
0011  *  the following conditions:
0012  *
0013  *  The above copyright notice and this permission notice shall be included
0014  *  in all copies or substantial portions of the Software.
0015  *
0016  *  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
0017  *  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
0018  *  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
0019  *  NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
0020  *  LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
0021  *  OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
0022  *  WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
0023  */
0024 
0025 #include "k3bufferedsocket.h"
0026 
0027 #include <config-network.h>
0028 
0029 #include <QMutex>
0030 #include <QTimer>
0031 
0032 #include "k3socketdevice.h"
0033 #include "k3socketaddress.h"
0034 #include "k3socketbuffer_p.h"
0035 
0036 using namespace KNetwork;
0037 using namespace KNetwork::Internal;
0038 
0039 class KNetwork::KBufferedSocketPrivate
0040 {
0041 public:
0042     mutable KSocketBuffer *input, *output;
0043 
0044     KBufferedSocketPrivate()
0045     {
0046         input = nullptr;
0047         output = nullptr;
0048     }
0049 };
0050 
0051 KBufferedSocket::KBufferedSocket(const QString &host, const QString &service,
0052                                  QObject *parent)
0053     : KStreamSocket(host, service, parent),
0054       d(new KBufferedSocketPrivate)
0055 {
0056     setInputBuffering(true);
0057     setOutputBuffering(true);
0058 }
0059 
0060 KBufferedSocket::~KBufferedSocket()
0061 {
0062     closeNow();
0063     delete d->input;
0064     delete d->output;
0065     delete d;
0066 }
0067 
0068 void KBufferedSocket::setSocketDevice(KSocketDevice *device)
0069 {
0070     KStreamSocket::setSocketDevice(device);
0071     device->setBlocking(false);
0072     KActiveSocketBase::open(openMode() & ~Unbuffered);
0073 }
0074 
0075 bool KBufferedSocket::setSocketOptions(int opts)
0076 {
0077     if (opts == Blocking) {
0078         return false;
0079     }
0080 
0081     opts &= ~Blocking;
0082     return KStreamSocket::setSocketOptions(opts);
0083 }
0084 
0085 void KBufferedSocket::close()
0086 {
0087     if (!d->output || d->output->isEmpty()) {
0088         closeNow();
0089     } else {
0090         setState(Closing);
0091         QSocketNotifier *n = socketDevice()->readNotifier();
0092         if (n) {
0093             n->setEnabled(false);
0094         }
0095         emit stateChanged(Closing);
0096     }
0097 }
0098 
0099 qint64 KBufferedSocket::bytesAvailable() const
0100 {
0101     if (!d->input) {
0102         return KStreamSocket::bytesAvailable();
0103     }
0104 
0105     return d->input->length();
0106 }
0107 
0108 qint64 KBufferedSocket::waitForMore(int msecs, bool *timeout)
0109 {
0110     qint64 retval = KStreamSocket::waitForMore(msecs, timeout);
0111     if (d->input) {
0112         resetError();
0113         slotReadActivity();
0114         return bytesAvailable();
0115     }
0116     return retval;
0117 }
0118 
0119 qint64 KBufferedSocket::readData(char *data, qint64 maxlen, KSocketAddress *from)
0120 {
0121     if (from) {
0122         *from = peerAddress();
0123     }
0124     if (d->input) {
0125         if (d->input->isEmpty()) {
0126             return 0;
0127         }
0128         resetError();
0129         return d->input->consumeBuffer(data, maxlen);
0130     }
0131     return KStreamSocket::readData(data, maxlen, nullptr);
0132 }
0133 
0134 qint64 KBufferedSocket::peekData(char *data, qint64 maxlen, KSocketAddress *from)
0135 {
0136     if (from) {
0137         *from = peerAddress();
0138     }
0139     if (d->input) {
0140         if (d->input->isEmpty()) {
0141             return 0;
0142         }
0143         resetError();
0144         return d->input->consumeBuffer(data, maxlen, false);
0145     }
0146     return KStreamSocket::peekData(data, maxlen, nullptr);
0147 }
0148 
0149 qint64 KBufferedSocket::writeData(const char *data, qint64 len,
0150                                   const KSocketAddress *)
0151 {
0152     // ignore the third parameter
0153     if (state() != Connected) {
0154         // cannot write now!
0155         setError(NotConnected);
0156         return -1;
0157     }
0158 
0159     if (d->output) {
0160         if (d->output->isFull()) {
0161             setError(WouldBlock);
0162             emit gotError(WouldBlock);
0163             return -1;
0164         }
0165         resetError();
0166 
0167         // enable notifier to send data
0168         QSocketNotifier *n = socketDevice()->writeNotifier();
0169         if (n) {
0170             n->setEnabled(true);
0171         }
0172 
0173         return d->output->feedBuffer(data, len);
0174     }
0175 
0176     return KStreamSocket::writeData(data, len, nullptr);
0177 }
0178 
0179 void KBufferedSocket::enableRead(bool enable)
0180 {
0181     KStreamSocket::enableRead(enable);
0182     if (!enable && d->input) {
0183         // reenable it
0184         QSocketNotifier *n = socketDevice()->readNotifier();
0185         if (n) {
0186             n->setEnabled(true);
0187         }
0188     }
0189 
0190     if (enable && state() != Connected && d->input && !d->input->isEmpty())
0191         // this means the buffer is still dirty
0192         // allow the signal to be emitted
0193     {
0194         QTimer::singleShot(0, this, SLOT(slotReadActivity()));
0195     }
0196 }
0197 
0198 void KBufferedSocket::enableWrite(bool enable)
0199 {
0200     KStreamSocket::enableWrite(enable);
0201     if (!enable && d->output && !d->output->isEmpty()) {
0202         // reenable it
0203         QSocketNotifier *n = socketDevice()->writeNotifier();
0204         if (n) {
0205             n->setEnabled(true);
0206         }
0207     }
0208 }
0209 
0210 void KBufferedSocket::stateChanging(SocketState newState)
0211 {
0212     if (newState == Connecting || newState == Connected) {
0213         // we're going to connect
0214         // make sure the buffers are clean
0215         if (d->input) {
0216             d->input->clear();
0217         }
0218         if (d->output) {
0219             d->output->clear();
0220         }
0221 
0222         // also, turn on notifiers
0223         enableRead(emitsReadyRead());
0224         enableWrite(emitsReadyWrite());
0225     }
0226     KStreamSocket::stateChanging(newState);
0227 }
0228 
0229 void KBufferedSocket::setInputBuffering(bool enable)
0230 {
0231     QMutexLocker locker(mutex());
0232     if (!enable) {
0233         delete d->input;
0234         d->input = nullptr;
0235     } else if (d->input == nullptr) {
0236         d->input = new KSocketBuffer;
0237     }
0238 }
0239 
0240 void KBufferedSocket::setOutputBuffering(bool enable)
0241 {
0242     QMutexLocker locker(mutex());
0243     if (!enable) {
0244         delete d->output;
0245         d->output = nullptr;
0246     } else if (d->output == nullptr) {
0247         d->output = new KSocketBuffer;
0248     }
0249 }
0250 
0251 qint64 KBufferedSocket::bytesToWrite() const
0252 {
0253     if (!d->output) {
0254         return 0;
0255     }
0256 
0257     return d->output->length();
0258 }
0259 
0260 void KBufferedSocket::closeNow()
0261 {
0262     KStreamSocket::close();
0263     if (d->output) {
0264         d->output->clear();
0265     }
0266 }
0267 
0268 bool KBufferedSocket::canReadLine() const
0269 {
0270     if (!d->input) {
0271         return false;
0272     }
0273 
0274     return d->input->canReadLine();
0275 }
0276 
0277 qint64 KBufferedSocket::readLineData(char *data, qint64 maxSize)
0278 {
0279     return d->input->readLine(data, maxSize);
0280 }
0281 
0282 void KBufferedSocket::waitForConnect()
0283 {
0284     if (state() != Connecting) {
0285         return;    // nothing to be waited on
0286     }
0287 
0288     KStreamSocket::setSocketOptions(socketOptions() | Blocking);
0289     connectionEvent();
0290     KStreamSocket::setSocketOptions(socketOptions() & ~Blocking);
0291 }
0292 
0293 void KBufferedSocket::slotReadActivity()
0294 {
0295     if (d->input && state() == Connected) {
0296         mutex()->lock();
0297         qint64 len = d->input->receiveFrom(socketDevice());
0298 
0299         if (len == -1) {
0300             if (socketDevice()->error() != WouldBlock) {
0301                 // nope, another error!
0302                 copyError();
0303                 mutex()->unlock();
0304                 emit gotError(error());
0305                 closeNow();   // emits closed
0306                 return;
0307             }
0308         } else if (len == 0) {
0309             // remotely closed
0310             setError(RemotelyDisconnected);
0311             mutex()->unlock();
0312             emit gotError(error());
0313             closeNow();       // emits closed
0314             return;
0315         }
0316 
0317         // no error
0318         mutex()->unlock();
0319     }
0320 
0321     if (state() == Connected) {
0322         KStreamSocket::slotReadActivity();    // this emits readyRead
0323     } else if (emitsReadyRead()) { // state() != Connected
0324         if (d->input && !d->input->isEmpty()) {
0325             // buffer isn't empty
0326             // keep emitting signals till it is
0327             QTimer::singleShot(0, this, SLOT(slotReadActivity()));
0328             emit readyRead();
0329         }
0330     }
0331 }
0332 
0333 void KBufferedSocket::slotWriteActivity()
0334 {
0335     if (d->output && !d->output->isEmpty() &&
0336             (state() == Connected || state() == Closing)) {
0337         mutex()->lock();
0338         qint64 len = d->output->sendTo(socketDevice());
0339 
0340         if (len == -1) {
0341             if (socketDevice()->error() != WouldBlock) {
0342                 // nope, another error!
0343                 copyError();
0344                 mutex()->unlock();
0345                 emit gotError(error());
0346                 closeNow();
0347                 return;
0348             }
0349         } else if (len == 0) {
0350             // remotely closed
0351             setError(RemotelyDisconnected);
0352             mutex()->unlock();
0353             emit gotError(error());
0354             closeNow();
0355             return;
0356         }
0357 
0358         if (d->output->isEmpty())
0359             // deactivate the notifier until we have something to send
0360             // writeNotifier can't return NULL here
0361         {
0362             socketDevice()->writeNotifier()->setEnabled(false);
0363         }
0364 
0365         mutex()->unlock();
0366         emit bytesWritten(len);
0367     }
0368 
0369     if (state() != Closing) {
0370         KStreamSocket::slotWriteActivity();
0371     } else if (d->output && d->output->isEmpty() && state() == Closing) {
0372         KStreamSocket::close();   // finished sending data
0373     }
0374 }
0375 
0376 #include "moc_k3bufferedsocket.cpp"