File indexing completed on 2024-06-23 05:14:17

0001 /*
0002   SPDX-FileCopyrightText: 2007 Klarälvdalens Datakonsult AB
0003 
0004   SPDX-License-Identifier: LGPL-2.0-or-later
0005  */
0006 
0007 #include <config-kleopatra.h>
0008 
0009 #include "kdpipeiodevice.h"
0010 
0011 #include "kleopatra_debug.h"
0012 #include <QDebug>
0013 #include <QMutex>
0014 #include <QPointer>
0015 #include <QThread>
0016 #include <QWaitCondition>
0017 
0018 #include <algorithm>
0019 #include <cstring>
0020 #include <memory>
0021 
0022 #ifdef Q_OS_WIN
0023 #ifndef NOMINMAX
0024 #define NOMINMAX
0025 #endif
0026 #include <io.h>
0027 #include <windows.h>
0028 #else
0029 #include <errno.h>
0030 #include <unistd.h>
0031 #endif
0032 
0033 #ifndef KDAB_CHECK_THIS
0034 #define KDAB_CHECK_CTOR (void)1
0035 #define KDAB_CHECK_DTOR KDAB_CHECK_CTOR
0036 #define KDAB_CHECK_THIS KDAB_CHECK_CTOR
0037 #endif
0038 
0039 #define LOCKED(d) const QMutexLocker locker(&d->mutex)
0040 #define synchronized(d)                                                                                                                                        \
0041     if (int i = 0) {                                                                                                                                           \
0042     } else                                                                                                                                                     \
0043         for (const QMutexLocker locker(&d->mutex); !i; ++i)
0044 
0045 const unsigned int BUFFER_SIZE = 4096;
0046 const bool ALLOW_QIODEVICE_BUFFERING = true;
0047 
0048 namespace
0049 {
0050 KDPipeIODevice::DebugLevel s_debugLevel = KDPipeIODevice::NoDebug;
0051 }
0052 
0053 #define QDebug                                                                                                                                                 \
0054     if (s_debugLevel == KDPipeIODevice::NoDebug) {                                                                                                             \
0055     } else                                                                                                                                                     \
0056         qDebug
0057 
0058 namespace
0059 {
0060 
0061 class Reader : public QThread
0062 {
0063     Q_OBJECT
0064 public:
0065     Reader(int fd, Qt::HANDLE handle);
0066     ~Reader() override;
0067 
0068     qint64 readData(char *data, qint64 maxSize);
0069 
0070     unsigned int bytesInBuffer() const
0071     {
0072         return (wptr + sizeof buffer - rptr) % sizeof buffer;
0073     }
0074 
0075     bool bufferFull() const
0076     {
0077         return bytesInBuffer() == sizeof buffer - 1;
0078     }
0079 
0080     bool bufferEmpty() const
0081     {
0082         return bytesInBuffer() == 0;
0083     }
0084 
0085     bool bufferContains(char ch)
0086     {
0087         const unsigned int bib = bytesInBuffer();
0088         for (unsigned int i = rptr; i < rptr + bib; ++i)
0089             if (buffer[i % sizeof buffer] == ch) {
0090                 return true;
0091             }
0092         return false;
0093     }
0094 
0095     void notifyReadyRead();
0096 
0097 Q_SIGNALS:
0098     void readyRead();
0099 
0100 protected:
0101     void run() override;
0102 
0103 private:
0104     int fd;
0105     [[maybe_unused]] Qt::HANDLE handle;
0106 
0107 public:
0108     QMutex mutex;
0109     QWaitCondition waitForCancelCondition;
0110     QWaitCondition bufferNotFullCondition;
0111     QWaitCondition bufferNotEmptyCondition;
0112     QWaitCondition hasStarted;
0113     QWaitCondition readyReadSentCondition;
0114     QWaitCondition blockedConsumerIsDoneCondition;
0115     bool cancel;
0116     bool eof;
0117     bool error;
0118     bool eofShortCut;
0119     int errorCode;
0120     bool isReading;
0121     bool consumerBlocksOnUs;
0122 
0123 private:
0124     unsigned int rptr, wptr;
0125     char buffer[BUFFER_SIZE + 1]; // need to keep one byte free to detect empty state
0126 };
0127 
0128 Reader::Reader(int fd_, Qt::HANDLE handle_)
0129     : QThread()
0130     , fd(fd_)
0131     , handle(handle_)
0132     , mutex()
0133     , bufferNotFullCondition()
0134     , bufferNotEmptyCondition()
0135     , hasStarted()
0136     , cancel(false)
0137     , eof(false)
0138     , error(false)
0139     , eofShortCut(false)
0140     , errorCode(0)
0141     , isReading(false)
0142     , consumerBlocksOnUs(false)
0143     , rptr(0)
0144     , wptr(0)
0145 {
0146 }
0147 
0148 Reader::~Reader()
0149 {
0150 }
0151 
0152 class Writer : public QThread
0153 {
0154     Q_OBJECT
0155 public:
0156     Writer(int fd, Qt::HANDLE handle);
0157     ~Writer() override;
0158 
0159     qint64 writeData(const char *data, qint64 size);
0160 
0161     unsigned int bytesInBuffer() const
0162     {
0163         return numBytesInBuffer;
0164     }
0165 
0166     bool bufferFull() const
0167     {
0168         return numBytesInBuffer == sizeof buffer;
0169     }
0170 
0171     bool bufferEmpty() const
0172     {
0173         return numBytesInBuffer == 0;
0174     }
0175 
0176 Q_SIGNALS:
0177     void bytesWritten(qint64);
0178 
0179 protected:
0180     void run() override;
0181 
0182 private:
0183     int fd;
0184     [[maybe_unused]] Qt::HANDLE handle;
0185 
0186 public:
0187     QMutex mutex;
0188     QWaitCondition bufferEmptyCondition;
0189     QWaitCondition bufferNotEmptyCondition;
0190     QWaitCondition hasStarted;
0191     bool cancel;
0192     bool error;
0193     int errorCode;
0194 
0195 private:
0196     unsigned int numBytesInBuffer;
0197     char buffer[BUFFER_SIZE];
0198 };
0199 }
0200 
0201 Writer::Writer(int fd_, Qt::HANDLE handle_)
0202     : QThread()
0203     , fd(fd_)
0204     , handle(handle_)
0205     , mutex()
0206     , bufferEmptyCondition()
0207     , bufferNotEmptyCondition()
0208     , hasStarted()
0209     , cancel(false)
0210     , error(false)
0211     , errorCode(0)
0212     , numBytesInBuffer(0)
0213 {
0214 }
0215 
0216 Writer::~Writer()
0217 {
0218 }
0219 
0220 class KDPipeIODevice::Private : public QObject
0221 {
0222     Q_OBJECT
0223     friend class ::KDPipeIODevice;
0224     KDPipeIODevice *const q;
0225 
0226 public:
0227     explicit Private(KDPipeIODevice *qq);
0228     ~Private() override;
0229 
0230     bool doOpen(int, Qt::HANDLE, OpenMode);
0231     bool startReaderThread();
0232     bool startWriterThread();
0233     void stopThreads();
0234 
0235 public Q_SLOTS:
0236     void emitReadyRead();
0237 
0238 private:
0239     int fd;
0240     Qt::HANDLE handle;
0241     Reader *reader;
0242     Writer *writer;
0243     bool triedToStartReader;
0244     bool triedToStartWriter;
0245 };
0246 
0247 KDPipeIODevice::DebugLevel KDPipeIODevice::debugLevel()
0248 {
0249     return s_debugLevel;
0250 }
0251 
0252 void KDPipeIODevice::setDebugLevel(KDPipeIODevice::DebugLevel level)
0253 {
0254     s_debugLevel = level;
0255 }
0256 
0257 KDPipeIODevice::Private::Private(KDPipeIODevice *qq)
0258     : QObject(qq)
0259     , q(qq)
0260     , fd(-1)
0261     , handle(nullptr)
0262     , reader(nullptr)
0263     , writer(nullptr)
0264     , triedToStartReader(false)
0265     , triedToStartWriter(false)
0266 {
0267 }
0268 
0269 KDPipeIODevice::Private::~Private()
0270 {
0271     QDebug("KDPipeIODevice::~Private(): Destroying %p", (void *)q);
0272 }
0273 
0274 KDPipeIODevice::KDPipeIODevice(QObject *p)
0275     : QIODevice(p)
0276     , d(new Private(this))
0277 {
0278     KDAB_CHECK_CTOR;
0279 }
0280 
0281 KDPipeIODevice::KDPipeIODevice(int fd, OpenMode mode, QObject *p)
0282     : QIODevice(p)
0283     , d(new Private(this))
0284 {
0285     KDAB_CHECK_CTOR;
0286     open(fd, mode);
0287 }
0288 
0289 KDPipeIODevice::KDPipeIODevice(Qt::HANDLE handle, OpenMode mode, QObject *p)
0290     : QIODevice(p)
0291     , d(new Private(this))
0292 {
0293     KDAB_CHECK_CTOR;
0294     open(handle, mode);
0295 }
0296 
0297 KDPipeIODevice::~KDPipeIODevice()
0298 {
0299     KDAB_CHECK_DTOR;
0300     if (isOpen()) {
0301         close();
0302     }
0303     delete d;
0304     d = nullptr;
0305 }
0306 
0307 bool KDPipeIODevice::open(int fd, OpenMode mode)
0308 {
0309     KDAB_CHECK_THIS;
0310 #ifdef Q_OS_WIN
0311     return d->doOpen(fd, (HANDLE)_get_osfhandle(fd), mode);
0312 #else
0313     return d->doOpen(fd, nullptr, mode);
0314 #endif
0315 }
0316 
0317 bool KDPipeIODevice::open(Qt::HANDLE h, OpenMode mode)
0318 {
0319     KDAB_CHECK_THIS;
0320 #ifdef Q_OS_WIN
0321     return d->doOpen(-1, h, mode);
0322 #else
0323     Q_UNUSED(h)
0324     Q_UNUSED(mode)
0325     Q_ASSERT(!"KDPipeIODevice::open( Qt::HANDLE, OpenMode ) should never be called except on Windows.");
0326     return false;
0327 #endif
0328 }
0329 
0330 bool KDPipeIODevice::Private::startReaderThread()
0331 {
0332     if (triedToStartReader) {
0333         return true;
0334     }
0335     triedToStartReader = true;
0336     if (reader && !reader->isRunning() && !reader->isFinished()) {
0337         QDebug("KDPipeIODevice::Private::startReaderThread(): locking reader (CONSUMER THREAD)");
0338         LOCKED(reader);
0339         QDebug("KDPipeIODevice::Private::startReaderThread(): locked reader (CONSUMER THREAD)");
0340         reader->start(QThread::HighestPriority);
0341         QDebug("KDPipeIODevice::Private::startReaderThread(): waiting for hasStarted (CONSUMER THREAD)");
0342         const bool hasStarted = reader->hasStarted.wait(&reader->mutex, 1000);
0343         QDebug("KDPipeIODevice::Private::startReaderThread(): returned from hasStarted (CONSUMER THREAD)");
0344 
0345         return hasStarted;
0346     }
0347     return true;
0348 }
0349 
0350 bool KDPipeIODevice::Private::startWriterThread()
0351 {
0352     if (triedToStartWriter) {
0353         return true;
0354     }
0355     triedToStartWriter = true;
0356     if (writer && !writer->isRunning() && !writer->isFinished()) {
0357         LOCKED(writer);
0358 
0359         writer->start(QThread::HighestPriority);
0360         if (!writer->hasStarted.wait(&writer->mutex, 1000)) {
0361             return false;
0362         }
0363     }
0364     return true;
0365 }
0366 
0367 void KDPipeIODevice::Private::emitReadyRead()
0368 {
0369     QPointer<Private> thisPointer(this);
0370     QDebug("KDPipeIODevice::Private::emitReadyRead %p", (void *)this);
0371 
0372     Q_EMIT q->readyRead();
0373 
0374     if (!thisPointer) {
0375         return;
0376     }
0377     if (reader) {
0378         QDebug("KDPipeIODevice::Private::emitReadyRead %p: locking reader (CONSUMER THREAD)", (void *)this);
0379         synchronized(reader)
0380         {
0381             QDebug("KDPipeIODevice::Private::emitReadyRead %p: locked reader (CONSUMER THREAD)", (void *)this);
0382             reader->readyReadSentCondition.wakeAll();
0383             QDebug("KDPipeIODevice::Private::emitReadyRead %p: buffer empty: %d reader in ReadFile: %d",
0384                    (void *)this,
0385                    reader->bufferEmpty(),
0386                    reader->isReading);
0387         }
0388     }
0389     QDebug("KDPipeIODevice::Private::emitReadyRead %p leaving", (void *)this);
0390 }
0391 
0392 bool KDPipeIODevice::Private::doOpen(int fd_, Qt::HANDLE handle_, OpenMode mode_)
0393 {
0394     if (q->isOpen()) {
0395         return false;
0396     }
0397 
0398 #ifdef Q_OS_WIN
0399     if (!handle_) {
0400         return false;
0401     }
0402 #else
0403     if (fd_ < 0) {
0404         return false;
0405     }
0406 #endif
0407 
0408     if (!(mode_ & ReadWrite)) {
0409         return false; // need to have at least read -or- write
0410     }
0411 
0412     std::unique_ptr<Reader> reader_;
0413     std::unique_ptr<Writer> writer_;
0414 
0415     if (mode_ & ReadOnly) {
0416         reader_ = std::make_unique<Reader>(fd_, handle_);
0417         QDebug("KDPipeIODevice::doOpen (%p): created reader (%p) for fd %d", (void *)this, (void *)reader_.get(), fd_);
0418         connect(reader_.get(), &Reader::readyRead, this, &Private::emitReadyRead, Qt::QueuedConnection);
0419     }
0420     if (mode_ & WriteOnly) {
0421         writer_ = std::make_unique<Writer>(fd_, handle_);
0422         QDebug("KDPipeIODevice::doOpen (%p): created writer (%p) for fd %d", (void *)this, (void *)writer_.get(), fd_);
0423         connect(writer_.get(), &Writer::bytesWritten, q, &QIODevice::bytesWritten, Qt::QueuedConnection);
0424     }
0425 
0426     // commit to *this:
0427     fd = fd_;
0428     handle = handle_;
0429     reader = reader_.release();
0430     writer = writer_.release();
0431 
0432     q->setOpenMode(mode_ | Unbuffered);
0433     return true;
0434 }
0435 
0436 int KDPipeIODevice::descriptor() const
0437 {
0438     KDAB_CHECK_THIS;
0439     return d->fd;
0440 }
0441 
0442 Qt::HANDLE KDPipeIODevice::handle() const
0443 {
0444     KDAB_CHECK_THIS;
0445     return d->handle;
0446 }
0447 
0448 qint64 KDPipeIODevice::bytesAvailable() const
0449 {
0450     KDAB_CHECK_THIS;
0451     const qint64 base = QIODevice::bytesAvailable();
0452     if (!d->triedToStartReader) {
0453         d->startReaderThread();
0454         return base;
0455     }
0456     if (d->reader) {
0457         synchronized(d->reader)
0458         {
0459             const qint64 inBuffer = d->reader->bytesInBuffer();
0460             return base + inBuffer;
0461         }
0462     }
0463     return base;
0464 }
0465 
0466 qint64 KDPipeIODevice::bytesToWrite() const
0467 {
0468     KDAB_CHECK_THIS;
0469     d->startWriterThread();
0470     const qint64 base = QIODevice::bytesToWrite();
0471     if (d->writer) {
0472         synchronized(d->writer) return base + d->writer->bytesInBuffer();
0473     }
0474     return base;
0475 }
0476 
0477 bool KDPipeIODevice::canReadLine() const
0478 {
0479     KDAB_CHECK_THIS;
0480     d->startReaderThread();
0481     if (QIODevice::canReadLine()) {
0482         return true;
0483     }
0484     if (d->reader) {
0485         synchronized(d->reader) return d->reader->bufferContains('\n');
0486     }
0487     return true;
0488 }
0489 
0490 bool KDPipeIODevice::isSequential() const
0491 {
0492     return true;
0493 }
0494 
0495 bool KDPipeIODevice::atEnd() const
0496 {
0497     KDAB_CHECK_THIS;
0498     d->startReaderThread();
0499     if (!QIODevice::atEnd()) {
0500         QDebug("%p: KDPipeIODevice::atEnd returns false since QIODevice::atEnd does (with bytesAvailable=%ld)",
0501                (void *)this,
0502                static_cast<long>(bytesAvailable()));
0503         return false;
0504     }
0505     if (!isOpen()) {
0506         return true;
0507     }
0508     if (d->reader->eofShortCut) {
0509         return true;
0510     }
0511     LOCKED(d->reader);
0512     const bool eof = (d->reader->error || d->reader->eof) && d->reader->bufferEmpty();
0513     if (!eof) {
0514         if (!d->reader->error && !d->reader->eof) {
0515             QDebug("%p: KDPipeIODevice::atEnd returns false since !reader->error && !reader->eof", (void *)(this));
0516         }
0517         if (!d->reader->bufferEmpty()) {
0518             QDebug("%p: KDPipeIODevice::atEnd returns false since !reader->bufferEmpty()", (void *)this);
0519         }
0520     }
0521     return eof;
0522 }
0523 
0524 bool KDPipeIODevice::waitForBytesWritten(int msecs)
0525 {
0526     KDAB_CHECK_THIS;
0527     d->startWriterThread();
0528     Writer *const w = d->writer;
0529     if (!w) {
0530         return true;
0531     }
0532     LOCKED(w);
0533     QDebug("KDPipeIODevice::waitForBytesWritten (%p,w=%p): entered locked area", (void *)this, (void *)w);
0534     return w->bufferEmpty() || w->error || w->bufferEmptyCondition.wait(&w->mutex, msecs);
0535 }
0536 
0537 bool KDPipeIODevice::waitForReadyRead(int msecs)
0538 {
0539     KDAB_CHECK_THIS;
0540     QDebug("KDPipeIODEvice::waitForReadyRead()(%p)", (void *)this);
0541     d->startReaderThread();
0542     if (ALLOW_QIODEVICE_BUFFERING) {
0543         if (bytesAvailable() > 0) {
0544             return true;
0545         }
0546     }
0547     Reader *const r = d->reader;
0548     if (!r || r->eofShortCut) {
0549         return true;
0550     }
0551     LOCKED(r);
0552     if (r->bytesInBuffer() != 0 || r->eof || r->error) {
0553         return true;
0554     }
0555     Q_ASSERT(false); // ### wtf?
0556     return r->bufferNotEmptyCondition.wait(&r->mutex, msecs);
0557 }
0558 
0559 template<typename T>
0560 class TemporaryValue
0561 {
0562 public:
0563     TemporaryValue(T &var_, const T &tv)
0564         : var(var_)
0565         , oldValue(var_)
0566     {
0567         var = tv;
0568     }
0569     ~TemporaryValue()
0570     {
0571         var = oldValue;
0572     }
0573 
0574 private:
0575     T &var;
0576     const T oldValue;
0577 };
0578 
0579 bool KDPipeIODevice::readWouldBlock() const
0580 {
0581     d->startReaderThread();
0582     LOCKED(d->reader);
0583     return d->reader->bufferEmpty() && !d->reader->eof && !d->reader->error;
0584 }
0585 
0586 bool KDPipeIODevice::writeWouldBlock() const
0587 {
0588     d->startWriterThread();
0589     LOCKED(d->writer);
0590     return !d->writer->bufferEmpty() && !d->writer->error;
0591 }
0592 
0593 qint64 KDPipeIODevice::readData(char *data, qint64 maxSize)
0594 {
0595     KDAB_CHECK_THIS;
0596     QDebug("%p: KDPipeIODevice::readData: data=%s, maxSize=%lld", (void *)this, data, maxSize);
0597     d->startReaderThread();
0598     Reader *const r = d->reader;
0599 
0600     Q_ASSERT(r);
0601 
0602     // assert( r->isRunning() ); // wrong (might be eof, error)
0603     Q_ASSERT(data || maxSize == 0);
0604     Q_ASSERT(maxSize >= 0);
0605 
0606     if (r->eofShortCut) {
0607         QDebug("%p: KDPipeIODevice::readData: hit eofShortCut, returning 0", (void *)this);
0608         return 0;
0609     }
0610 
0611     if (maxSize < 0) {
0612         maxSize = 0;
0613     }
0614 
0615     if (ALLOW_QIODEVICE_BUFFERING) {
0616         if (bytesAvailable() > 0) {
0617             maxSize = std::min(maxSize, bytesAvailable()); // don't block
0618         }
0619     }
0620     QDebug("%p: KDPipeIODevice::readData: try to lock reader (CONSUMER THREAD)", (void *)this);
0621     LOCKED(r);
0622     QDebug("%p: KDPipeIODevice::readData: locked reader (CONSUMER THREAD)", (void *)this);
0623 
0624     r->readyReadSentCondition.wakeAll();
0625     if (/* maxSize > 0 && */ r->bufferEmpty() && !r->error && !r->eof) { // ### block on maxSize == 0?
0626         QDebug("%p: KDPipeIODevice::readData: waiting for bufferNotEmptyCondition (CONSUMER THREAD)", (void *)this);
0627         const TemporaryValue<bool> tmp(d->reader->consumerBlocksOnUs, true);
0628         r->bufferNotEmptyCondition.wait(&r->mutex);
0629         r->blockedConsumerIsDoneCondition.wakeAll();
0630         QDebug("%p: KDPipeIODevice::readData: woke up from bufferNotEmptyCondition (CONSUMER THREAD)", (void *)this);
0631     }
0632 
0633     if (r->bufferEmpty()) {
0634         QDebug("%p: KDPipeIODevice::readData: got empty buffer, signal eof", (void *)this);
0635         // woken with an empty buffer must mean either EOF or error:
0636         Q_ASSERT(r->eof || r->error);
0637         r->eofShortCut = true;
0638         return r->eof ? 0 : -1;
0639     }
0640 
0641     QDebug("%p: KDPipeIODevice::readData: got bufferNotEmptyCondition, trying to read %lld bytes", (void *)this, maxSize);
0642     const qint64 bytesRead = r->readData(data, maxSize);
0643     QDebug("%p: KDPipeIODevice::readData: read %lld bytes", (void *)this, bytesRead);
0644     QDebug("%p (fd=%d): KDPipeIODevice::readData: %s", (void *)this, d->fd, data);
0645 
0646     return bytesRead;
0647 }
0648 
0649 qint64 Reader::readData(char *data, qint64 maxSize)
0650 {
0651     qint64 numRead = rptr < wptr ? wptr - rptr : sizeof buffer - rptr;
0652     if (numRead > maxSize) {
0653         numRead = maxSize;
0654     }
0655 
0656     QDebug("%p: KDPipeIODevice::readData: data=%s, maxSize=%lld; rptr=%u, wptr=%u (bytesInBuffer=%u); -> numRead=%lld",
0657            (void *)this,
0658            data,
0659            maxSize,
0660            rptr,
0661            wptr,
0662            bytesInBuffer(),
0663            numRead);
0664 
0665     memcpy(data, buffer + rptr, numRead);
0666 
0667     rptr = (rptr + numRead) % sizeof buffer;
0668 
0669     if (!bufferFull()) {
0670         QDebug("%p: KDPipeIODevice::readData: signal bufferNotFullCondition", (void *)this);
0671         bufferNotFullCondition.wakeAll();
0672     }
0673 
0674     return numRead;
0675 }
0676 
0677 qint64 KDPipeIODevice::writeData(const char *data, qint64 size)
0678 {
0679     KDAB_CHECK_THIS;
0680     d->startWriterThread();
0681     Writer *const w = d->writer;
0682 
0683     Q_ASSERT(w);
0684     Q_ASSERT(w->error || w->isRunning());
0685     Q_ASSERT(data || size == 0);
0686     Q_ASSERT(size >= 0);
0687 
0688     LOCKED(w);
0689 
0690     while (!w->error && !w->bufferEmpty()) {
0691         QDebug("%p: KDPipeIODevice::writeData: wait for empty buffer", (void *)this);
0692         w->bufferEmptyCondition.wait(&w->mutex);
0693         QDebug("%p: KDPipeIODevice::writeData: empty buffer signaled", (void *)this);
0694     }
0695     if (w->error) {
0696         return -1;
0697     }
0698 
0699     Q_ASSERT(w->bufferEmpty());
0700 
0701     return w->writeData(data, size);
0702 }
0703 
0704 qint64 Writer::writeData(const char *data, qint64 size)
0705 {
0706     Q_ASSERT(bufferEmpty());
0707 
0708     if (size > static_cast<qint64>(sizeof buffer)) {
0709         size = sizeof buffer;
0710     }
0711 
0712     memcpy(buffer, data, size);
0713 
0714     numBytesInBuffer = size;
0715 
0716     if (!bufferEmpty()) {
0717         bufferNotEmptyCondition.wakeAll();
0718     }
0719     return size;
0720 }
0721 
0722 void KDPipeIODevice::Private::stopThreads()
0723 {
0724     if (triedToStartWriter) {
0725         if (writer && q->bytesToWrite() > 0) {
0726             q->waitForBytesWritten(-1);
0727         }
0728 
0729         Q_ASSERT(q->bytesToWrite() == 0);
0730     }
0731     if (Reader *&r = reader) {
0732         disconnect(r, &Reader::readyRead, this, &Private::emitReadyRead);
0733         synchronized(r)
0734         {
0735             // tell thread to cancel:
0736             r->cancel = true;
0737             // and wake it, so it can terminate:
0738             r->waitForCancelCondition.wakeAll();
0739             r->bufferNotFullCondition.wakeAll();
0740             r->readyReadSentCondition.wakeAll();
0741         }
0742     }
0743     if (Writer *&w = writer) {
0744         synchronized(w)
0745         {
0746             // tell thread to cancel:
0747             w->cancel = true;
0748             // and wake it, so it can terminate:
0749             w->bufferNotEmptyCondition.wakeAll();
0750         }
0751     }
0752 }
0753 
0754 void KDPipeIODevice::close()
0755 {
0756     KDAB_CHECK_THIS;
0757     QDebug("KDPipeIODevice::close(%p)", (void *)this);
0758     if (!isOpen()) {
0759         return;
0760     }
0761 
0762     // tell clients we're about to close:
0763     Q_EMIT aboutToClose();
0764     d->stopThreads();
0765 
0766 #define waitAndDelete(t)                                                                                                                                       \
0767     if (t) {                                                                                                                                                   \
0768         t->wait();                                                                                                                                             \
0769         QThread *const t2 = t;                                                                                                                                 \
0770         t = nullptr;                                                                                                                                           \
0771         delete t2;                                                                                                                                             \
0772     }
0773     QDebug("KPipeIODevice::close(%p): wait and closing writer %p", (void *)this, (void *)d->writer);
0774     waitAndDelete(d->writer);
0775     QDebug("KPipeIODevice::close(%p): wait and closing reader %p", (void *)this, (void *)d->reader);
0776     if (d->reader) {
0777         LOCKED(d->reader);
0778         d->reader->readyReadSentCondition.wakeAll();
0779     }
0780     waitAndDelete(d->reader);
0781 #undef waitAndDelete
0782 #ifdef Q_OS_WIN
0783     if (d->fd != -1) {
0784         _close(d->fd);
0785     } else {
0786         CloseHandle(d->handle);
0787     }
0788 #else
0789     ::close(d->fd);
0790 #endif
0791 
0792     setOpenMode(NotOpen);
0793     d->fd = -1;
0794     d->handle = nullptr;
0795 }
0796 
0797 void Reader::run()
0798 {
0799     LOCKED(this);
0800 
0801     // too bad QThread doesn't have that itself; a signal isn't enough
0802     hasStarted.wakeAll();
0803 
0804     QDebug("%p: Reader::run: started", (void *)this);
0805 
0806     while (true) {
0807         if (!cancel && (eof || error)) {
0808             // notify the client until the buffer is empty and then once
0809             // again so he receives eof/error. After that, wait for him
0810             // to cancel
0811             const bool wasEmpty = bufferEmpty();
0812             QDebug("%p: Reader::run: received eof(%d) or error(%d), waking everyone", (void *)this, eof, error);
0813             notifyReadyRead();
0814             if (!cancel && wasEmpty) {
0815                 waitForCancelCondition.wait(&mutex);
0816             }
0817         } else if (!cancel && !bufferFull() && !bufferEmpty()) {
0818             QDebug("%p: Reader::run: buffer no longer empty, waking everyone", (void *)this);
0819             notifyReadyRead();
0820         }
0821 
0822         while (!cancel && !error && bufferFull()) {
0823             notifyReadyRead();
0824             if (!cancel && bufferFull()) {
0825                 QDebug("%p: Reader::run: buffer is full, going to sleep", (void *)this);
0826                 bufferNotFullCondition.wait(&mutex);
0827             }
0828         }
0829 
0830         if (cancel) {
0831             QDebug("%p: Reader::run: detected cancel", (void *)this);
0832             goto leave;
0833         }
0834 
0835         if (!eof && !error) {
0836             if (rptr == wptr) { // optimize for larger chunks in case the buffer is empty
0837                 rptr = wptr = 0;
0838             }
0839 
0840             unsigned int numBytes = (rptr + sizeof buffer - wptr - 1) % sizeof buffer;
0841             if (numBytes > sizeof buffer - wptr) {
0842                 numBytes = sizeof buffer - wptr;
0843             }
0844 
0845             QDebug("%p: Reader::run: rptr=%d, wptr=%d -> numBytes=%d", (void *)this, rptr, wptr, numBytes);
0846 
0847             Q_ASSERT(numBytes > 0);
0848 
0849             QDebug("%p: Reader::run: trying to read %d bytes from fd %d", (void *)this, numBytes, fd);
0850 #ifdef Q_OS_WIN
0851             isReading = true;
0852             mutex.unlock();
0853             DWORD numRead;
0854             const bool ok = ReadFile(handle, buffer + wptr, numBytes, &numRead, nullptr);
0855             mutex.lock();
0856             isReading = false;
0857             if (ok) {
0858                 if (numRead == 0) {
0859                     QDebug("%p: Reader::run: got eof (numRead==0)", (void *)this);
0860                     eof = true;
0861                 }
0862             } else { // !ok
0863                 errorCode = static_cast<int>(GetLastError());
0864                 if (errorCode == ERROR_BROKEN_PIPE) {
0865                     Q_ASSERT(numRead == 0);
0866                     QDebug("%p: Reader::run: got eof (broken pipe)", (void *)this);
0867                     eof = true;
0868                 } else {
0869                     Q_ASSERT(numRead == 0);
0870                     QDebug("%p: Reader::run: got error: %s (%d)", (void *)this, strerror(errorCode), errorCode);
0871                     error = true;
0872                 }
0873             }
0874 #else
0875             qint64 numRead;
0876             mutex.unlock();
0877             do {
0878                 numRead = ::read(fd, buffer + wptr, numBytes);
0879             } while (numRead == -1 && errno == EINTR);
0880             mutex.lock();
0881 
0882             if (numRead < 0) {
0883                 errorCode = errno;
0884                 error = true;
0885                 QDebug("%p: Reader::run: got error: %d", (void *)this, errorCode);
0886             } else if (numRead == 0) {
0887                 QDebug("%p: Reader::run: eof detected", (void *)this);
0888                 eof = true;
0889             }
0890 #endif
0891             QDebug("%p (fd=%d): Reader::run: read %ld bytes", (void *)this, fd, static_cast<long>(numRead));
0892             QDebug("%p (fd=%d): Reader::run: %s", (void *)this, fd, buffer);
0893 
0894             if (numRead > 0) {
0895                 QDebug("%p: Reader::run: buffer before: rptr=%4d, wptr=%4d", (void *)this, rptr, wptr);
0896                 wptr = (wptr + numRead) % sizeof buffer;
0897                 QDebug("%p: Reader::run: buffer after:  rptr=%4d, wptr=%4d", (void *)this, rptr, wptr);
0898             }
0899         }
0900     }
0901 leave:
0902     QDebug("%p: Reader::run: terminated", (void *)this);
0903 }
0904 
0905 void Reader::notifyReadyRead()
0906 {
0907     QDebug("notifyReadyRead: %d bytes available", bytesInBuffer());
0908     Q_ASSERT(!cancel);
0909 
0910     if (consumerBlocksOnUs) {
0911         bufferNotEmptyCondition.wakeAll();
0912         blockedConsumerIsDoneCondition.wait(&mutex);
0913         return;
0914     }
0915     QDebug("notifyReadyRead: Q_EMIT signal");
0916     Q_EMIT readyRead();
0917     readyReadSentCondition.wait(&mutex);
0918     QDebug("notifyReadyRead: returning from waiting, leave");
0919 }
0920 
0921 void Writer::run()
0922 {
0923     LOCKED(this);
0924 
0925     // too bad QThread doesn't have that itself; a signal isn't enough
0926     hasStarted.wakeAll();
0927 
0928     qCDebug(KLEOPATRA_LOG) << this << "Writer::run: started";
0929 
0930     while (true) {
0931         while (!cancel && bufferEmpty()) {
0932             qCDebug(KLEOPATRA_LOG) << this << "Writer::run: buffer is empty, wake bufferEmptyCond listeners";
0933             bufferEmptyCondition.wakeAll();
0934             Q_EMIT bytesWritten(0);
0935             qCDebug(KLEOPATRA_LOG) << this << "Writer::run: buffer is empty, going to sleep";
0936             bufferNotEmptyCondition.wait(&mutex);
0937             qCDebug(KLEOPATRA_LOG) << this << "Writer::run: woke up";
0938         }
0939 
0940         if (cancel) {
0941             qCDebug(KLEOPATRA_LOG) << this << "Writer::run: detected cancel";
0942             goto leave;
0943         }
0944 
0945         Q_ASSERT(numBytesInBuffer > 0);
0946 
0947         qCDebug(KLEOPATRA_LOG) << this << "Writer::run: Trying to write " << numBytesInBuffer << "bytes";
0948         qint64 totalWritten = 0;
0949         do {
0950             mutex.unlock();
0951 #ifdef Q_OS_WIN
0952             DWORD numWritten;
0953             QDebug("%p (fd=%d): Writer::run: buffer before WriteFile (numBytes=%u): %s:", (void *)this, fd, numBytesInBuffer, buffer);
0954             QDebug("%p (fd=%d): Writer::run: Going into WriteFile", (void *)this, fd);
0955             if (!WriteFile(handle, buffer + totalWritten, numBytesInBuffer - totalWritten, &numWritten, nullptr)) {
0956                 mutex.lock();
0957                 errorCode = static_cast<int>(GetLastError());
0958                 QDebug("%p: Writer::run: got error code: %d", (void *)this, errorCode);
0959                 error = true;
0960                 goto leave;
0961             }
0962 #else
0963             qint64 numWritten;
0964             do {
0965                 numWritten = ::write(fd, buffer + totalWritten, numBytesInBuffer - totalWritten);
0966             } while (numWritten == -1 && errno == EINTR);
0967 
0968             if (numWritten < 0) {
0969                 mutex.lock();
0970                 errorCode = errno;
0971                 QDebug("%p: Writer::run: got error code: %s (%d)", (void *)this, strerror(errorCode), errorCode);
0972                 error = true;
0973                 goto leave;
0974             }
0975 #endif
0976             QDebug("%p (fd=%d): Writer::run: buffer after WriteFile (numBytes=%u): %s:", (void *)this, fd, numBytesInBuffer, buffer);
0977             totalWritten += numWritten;
0978             mutex.lock();
0979         } while (totalWritten < numBytesInBuffer);
0980 
0981         qCDebug(KLEOPATRA_LOG) << this << "Writer::run: wrote " << totalWritten << "bytes";
0982         numBytesInBuffer = 0;
0983         qCDebug(KLEOPATRA_LOG) << this << "Writer::run: buffer is empty, wake bufferEmptyCond listeners";
0984         bufferEmptyCondition.wakeAll();
0985         Q_EMIT bytesWritten(totalWritten);
0986     }
0987 leave:
0988     qCDebug(KLEOPATRA_LOG) << this << "Writer::run: terminating";
0989     numBytesInBuffer = 0;
0990     qCDebug(KLEOPATRA_LOG) << this << "Writer::run: buffer is empty, wake bufferEmptyCond listeners";
0991     bufferEmptyCondition.wakeAll();
0992     Q_EMIT bytesWritten(0);
0993 }
0994 
0995 // static
0996 std::pair<KDPipeIODevice *, KDPipeIODevice *> KDPipeIODevice::makePairOfConnectedPipes()
0997 {
0998     KDPipeIODevice *read = nullptr;
0999     KDPipeIODevice *write = nullptr;
1000 #ifdef Q_OS_WIN
1001     HANDLE rh;
1002     HANDLE wh;
1003     SECURITY_ATTRIBUTES sa;
1004     memset(&sa, 0, sizeof(sa));
1005     sa.nLength = sizeof(sa);
1006     sa.bInheritHandle = TRUE;
1007     if (CreatePipe(&rh, &wh, &sa, BUFFER_SIZE)) {
1008         read = new KDPipeIODevice;
1009         read->open(rh, ReadOnly);
1010         write = new KDPipeIODevice;
1011         write->open(wh, WriteOnly);
1012     }
1013 #else
1014     int fds[2];
1015     if (pipe(fds) == 0) {
1016         read = new KDPipeIODevice;
1017         read->open(fds[0], ReadOnly);
1018         write = new KDPipeIODevice;
1019         write->open(fds[1], WriteOnly);
1020     }
1021 #endif
1022     return std::make_pair(read, write);
1023 }
1024 
1025 #ifdef KDAB_DEFINE_CHECKS
1026 KDAB_DEFINE_CHECKS(KDPipeIODevice)
1027 {
1028     if (!isOpen()) {
1029         Q_ASSERT(openMode() == NotOpen);
1030         Q_ASSERT(!d->reader);
1031         Q_ASSERT(!d->writer);
1032 #ifdef Q_OS_WIN
1033         Q_ASSERT(!d->handle);
1034 #else
1035         Q_ASSERT(d->fd < 0);
1036 #endif
1037     } else {
1038         Q_ASSERT(openMode() != NotOpen);
1039         Q_ASSERT(openMode() & ReadWrite);
1040         if (openMode() & ReadOnly) {
1041             Q_ASSERT(d->reader);
1042             synchronized(d->reader) Q_ASSERT(d->reader->eof || d->reader->error || d->reader->isRunning());
1043         }
1044         if (openMode() & WriteOnly) {
1045             Q_ASSERT(d->writer);
1046             synchronized(d->writer) Q_ASSERT(d->writer->error || d->writer->isRunning());
1047         }
1048 #ifdef Q_OS_WIN
1049         Q_ASSERT(d->handle);
1050 #else
1051         Q_ASSERT(d->fd >= 0);
1052 #endif
1053     }
1054 }
1055 #endif // KDAB_DEFINE_CHECKS
1056 
1057 #include "kdpipeiodevice.moc"
1058 
1059 #include "moc_kdpipeiodevice.cpp"