File indexing completed on 2024-06-23 05:14:17
0001 /* 0002 SPDX-FileCopyrightText: 2007 Klarälvdalens Datakonsult AB 0003 0004 SPDX-License-Identifier: LGPL-2.0-or-later 0005 */ 0006 0007 #include <config-kleopatra.h> 0008 0009 #include "kdpipeiodevice.h" 0010 0011 #include "kleopatra_debug.h" 0012 #include <QDebug> 0013 #include <QMutex> 0014 #include <QPointer> 0015 #include <QThread> 0016 #include <QWaitCondition> 0017 0018 #include <algorithm> 0019 #include <cstring> 0020 #include <memory> 0021 0022 #ifdef Q_OS_WIN 0023 #ifndef NOMINMAX 0024 #define NOMINMAX 0025 #endif 0026 #include <io.h> 0027 #include <windows.h> 0028 #else 0029 #include <errno.h> 0030 #include <unistd.h> 0031 #endif 0032 0033 #ifndef KDAB_CHECK_THIS 0034 #define KDAB_CHECK_CTOR (void)1 0035 #define KDAB_CHECK_DTOR KDAB_CHECK_CTOR 0036 #define KDAB_CHECK_THIS KDAB_CHECK_CTOR 0037 #endif 0038 0039 #define LOCKED(d) const QMutexLocker locker(&d->mutex) 0040 #define synchronized(d) \ 0041 if (int i = 0) { \ 0042 } else \ 0043 for (const QMutexLocker locker(&d->mutex); !i; ++i) 0044 0045 const unsigned int BUFFER_SIZE = 4096; 0046 const bool ALLOW_QIODEVICE_BUFFERING = true; 0047 0048 namespace 0049 { 0050 KDPipeIODevice::DebugLevel s_debugLevel = KDPipeIODevice::NoDebug; 0051 } 0052 0053 #define QDebug \ 0054 if (s_debugLevel == KDPipeIODevice::NoDebug) { \ 0055 } else \ 0056 qDebug 0057 0058 namespace 0059 { 0060 0061 class Reader : public QThread 0062 { 0063 Q_OBJECT 0064 public: 0065 Reader(int fd, Qt::HANDLE handle); 0066 ~Reader() override; 0067 0068 qint64 readData(char *data, qint64 maxSize); 0069 0070 unsigned int bytesInBuffer() const 0071 { 0072 return (wptr + sizeof buffer - rptr) % sizeof buffer; 0073 } 0074 0075 bool bufferFull() const 0076 { 0077 return bytesInBuffer() == sizeof buffer - 1; 0078 } 0079 0080 bool bufferEmpty() const 0081 { 0082 return bytesInBuffer() == 0; 0083 } 0084 0085 bool bufferContains(char ch) 0086 { 0087 const unsigned int bib = bytesInBuffer(); 0088 for (unsigned int i = rptr; i < rptr + bib; ++i) 0089 if (buffer[i % sizeof buffer] == ch) { 0090 return true; 0091 } 0092 return false; 0093 } 0094 0095 void notifyReadyRead(); 0096 0097 Q_SIGNALS: 0098 void readyRead(); 0099 0100 protected: 0101 void run() override; 0102 0103 private: 0104 int fd; 0105 [[maybe_unused]] Qt::HANDLE handle; 0106 0107 public: 0108 QMutex mutex; 0109 QWaitCondition waitForCancelCondition; 0110 QWaitCondition bufferNotFullCondition; 0111 QWaitCondition bufferNotEmptyCondition; 0112 QWaitCondition hasStarted; 0113 QWaitCondition readyReadSentCondition; 0114 QWaitCondition blockedConsumerIsDoneCondition; 0115 bool cancel; 0116 bool eof; 0117 bool error; 0118 bool eofShortCut; 0119 int errorCode; 0120 bool isReading; 0121 bool consumerBlocksOnUs; 0122 0123 private: 0124 unsigned int rptr, wptr; 0125 char buffer[BUFFER_SIZE + 1]; // need to keep one byte free to detect empty state 0126 }; 0127 0128 Reader::Reader(int fd_, Qt::HANDLE handle_) 0129 : QThread() 0130 , fd(fd_) 0131 , handle(handle_) 0132 , mutex() 0133 , bufferNotFullCondition() 0134 , bufferNotEmptyCondition() 0135 , hasStarted() 0136 , cancel(false) 0137 , eof(false) 0138 , error(false) 0139 , eofShortCut(false) 0140 , errorCode(0) 0141 , isReading(false) 0142 , consumerBlocksOnUs(false) 0143 , rptr(0) 0144 , wptr(0) 0145 { 0146 } 0147 0148 Reader::~Reader() 0149 { 0150 } 0151 0152 class Writer : public QThread 0153 { 0154 Q_OBJECT 0155 public: 0156 Writer(int fd, Qt::HANDLE handle); 0157 ~Writer() override; 0158 0159 qint64 writeData(const char *data, qint64 size); 0160 0161 unsigned int bytesInBuffer() const 0162 { 0163 return numBytesInBuffer; 0164 } 0165 0166 bool bufferFull() const 0167 { 0168 return numBytesInBuffer == sizeof buffer; 0169 } 0170 0171 bool bufferEmpty() const 0172 { 0173 return numBytesInBuffer == 0; 0174 } 0175 0176 Q_SIGNALS: 0177 void bytesWritten(qint64); 0178 0179 protected: 0180 void run() override; 0181 0182 private: 0183 int fd; 0184 [[maybe_unused]] Qt::HANDLE handle; 0185 0186 public: 0187 QMutex mutex; 0188 QWaitCondition bufferEmptyCondition; 0189 QWaitCondition bufferNotEmptyCondition; 0190 QWaitCondition hasStarted; 0191 bool cancel; 0192 bool error; 0193 int errorCode; 0194 0195 private: 0196 unsigned int numBytesInBuffer; 0197 char buffer[BUFFER_SIZE]; 0198 }; 0199 } 0200 0201 Writer::Writer(int fd_, Qt::HANDLE handle_) 0202 : QThread() 0203 , fd(fd_) 0204 , handle(handle_) 0205 , mutex() 0206 , bufferEmptyCondition() 0207 , bufferNotEmptyCondition() 0208 , hasStarted() 0209 , cancel(false) 0210 , error(false) 0211 , errorCode(0) 0212 , numBytesInBuffer(0) 0213 { 0214 } 0215 0216 Writer::~Writer() 0217 { 0218 } 0219 0220 class KDPipeIODevice::Private : public QObject 0221 { 0222 Q_OBJECT 0223 friend class ::KDPipeIODevice; 0224 KDPipeIODevice *const q; 0225 0226 public: 0227 explicit Private(KDPipeIODevice *qq); 0228 ~Private() override; 0229 0230 bool doOpen(int, Qt::HANDLE, OpenMode); 0231 bool startReaderThread(); 0232 bool startWriterThread(); 0233 void stopThreads(); 0234 0235 public Q_SLOTS: 0236 void emitReadyRead(); 0237 0238 private: 0239 int fd; 0240 Qt::HANDLE handle; 0241 Reader *reader; 0242 Writer *writer; 0243 bool triedToStartReader; 0244 bool triedToStartWriter; 0245 }; 0246 0247 KDPipeIODevice::DebugLevel KDPipeIODevice::debugLevel() 0248 { 0249 return s_debugLevel; 0250 } 0251 0252 void KDPipeIODevice::setDebugLevel(KDPipeIODevice::DebugLevel level) 0253 { 0254 s_debugLevel = level; 0255 } 0256 0257 KDPipeIODevice::Private::Private(KDPipeIODevice *qq) 0258 : QObject(qq) 0259 , q(qq) 0260 , fd(-1) 0261 , handle(nullptr) 0262 , reader(nullptr) 0263 , writer(nullptr) 0264 , triedToStartReader(false) 0265 , triedToStartWriter(false) 0266 { 0267 } 0268 0269 KDPipeIODevice::Private::~Private() 0270 { 0271 QDebug("KDPipeIODevice::~Private(): Destroying %p", (void *)q); 0272 } 0273 0274 KDPipeIODevice::KDPipeIODevice(QObject *p) 0275 : QIODevice(p) 0276 , d(new Private(this)) 0277 { 0278 KDAB_CHECK_CTOR; 0279 } 0280 0281 KDPipeIODevice::KDPipeIODevice(int fd, OpenMode mode, QObject *p) 0282 : QIODevice(p) 0283 , d(new Private(this)) 0284 { 0285 KDAB_CHECK_CTOR; 0286 open(fd, mode); 0287 } 0288 0289 KDPipeIODevice::KDPipeIODevice(Qt::HANDLE handle, OpenMode mode, QObject *p) 0290 : QIODevice(p) 0291 , d(new Private(this)) 0292 { 0293 KDAB_CHECK_CTOR; 0294 open(handle, mode); 0295 } 0296 0297 KDPipeIODevice::~KDPipeIODevice() 0298 { 0299 KDAB_CHECK_DTOR; 0300 if (isOpen()) { 0301 close(); 0302 } 0303 delete d; 0304 d = nullptr; 0305 } 0306 0307 bool KDPipeIODevice::open(int fd, OpenMode mode) 0308 { 0309 KDAB_CHECK_THIS; 0310 #ifdef Q_OS_WIN 0311 return d->doOpen(fd, (HANDLE)_get_osfhandle(fd), mode); 0312 #else 0313 return d->doOpen(fd, nullptr, mode); 0314 #endif 0315 } 0316 0317 bool KDPipeIODevice::open(Qt::HANDLE h, OpenMode mode) 0318 { 0319 KDAB_CHECK_THIS; 0320 #ifdef Q_OS_WIN 0321 return d->doOpen(-1, h, mode); 0322 #else 0323 Q_UNUSED(h) 0324 Q_UNUSED(mode) 0325 Q_ASSERT(!"KDPipeIODevice::open( Qt::HANDLE, OpenMode ) should never be called except on Windows."); 0326 return false; 0327 #endif 0328 } 0329 0330 bool KDPipeIODevice::Private::startReaderThread() 0331 { 0332 if (triedToStartReader) { 0333 return true; 0334 } 0335 triedToStartReader = true; 0336 if (reader && !reader->isRunning() && !reader->isFinished()) { 0337 QDebug("KDPipeIODevice::Private::startReaderThread(): locking reader (CONSUMER THREAD)"); 0338 LOCKED(reader); 0339 QDebug("KDPipeIODevice::Private::startReaderThread(): locked reader (CONSUMER THREAD)"); 0340 reader->start(QThread::HighestPriority); 0341 QDebug("KDPipeIODevice::Private::startReaderThread(): waiting for hasStarted (CONSUMER THREAD)"); 0342 const bool hasStarted = reader->hasStarted.wait(&reader->mutex, 1000); 0343 QDebug("KDPipeIODevice::Private::startReaderThread(): returned from hasStarted (CONSUMER THREAD)"); 0344 0345 return hasStarted; 0346 } 0347 return true; 0348 } 0349 0350 bool KDPipeIODevice::Private::startWriterThread() 0351 { 0352 if (triedToStartWriter) { 0353 return true; 0354 } 0355 triedToStartWriter = true; 0356 if (writer && !writer->isRunning() && !writer->isFinished()) { 0357 LOCKED(writer); 0358 0359 writer->start(QThread::HighestPriority); 0360 if (!writer->hasStarted.wait(&writer->mutex, 1000)) { 0361 return false; 0362 } 0363 } 0364 return true; 0365 } 0366 0367 void KDPipeIODevice::Private::emitReadyRead() 0368 { 0369 QPointer<Private> thisPointer(this); 0370 QDebug("KDPipeIODevice::Private::emitReadyRead %p", (void *)this); 0371 0372 Q_EMIT q->readyRead(); 0373 0374 if (!thisPointer) { 0375 return; 0376 } 0377 if (reader) { 0378 QDebug("KDPipeIODevice::Private::emitReadyRead %p: locking reader (CONSUMER THREAD)", (void *)this); 0379 synchronized(reader) 0380 { 0381 QDebug("KDPipeIODevice::Private::emitReadyRead %p: locked reader (CONSUMER THREAD)", (void *)this); 0382 reader->readyReadSentCondition.wakeAll(); 0383 QDebug("KDPipeIODevice::Private::emitReadyRead %p: buffer empty: %d reader in ReadFile: %d", 0384 (void *)this, 0385 reader->bufferEmpty(), 0386 reader->isReading); 0387 } 0388 } 0389 QDebug("KDPipeIODevice::Private::emitReadyRead %p leaving", (void *)this); 0390 } 0391 0392 bool KDPipeIODevice::Private::doOpen(int fd_, Qt::HANDLE handle_, OpenMode mode_) 0393 { 0394 if (q->isOpen()) { 0395 return false; 0396 } 0397 0398 #ifdef Q_OS_WIN 0399 if (!handle_) { 0400 return false; 0401 } 0402 #else 0403 if (fd_ < 0) { 0404 return false; 0405 } 0406 #endif 0407 0408 if (!(mode_ & ReadWrite)) { 0409 return false; // need to have at least read -or- write 0410 } 0411 0412 std::unique_ptr<Reader> reader_; 0413 std::unique_ptr<Writer> writer_; 0414 0415 if (mode_ & ReadOnly) { 0416 reader_ = std::make_unique<Reader>(fd_, handle_); 0417 QDebug("KDPipeIODevice::doOpen (%p): created reader (%p) for fd %d", (void *)this, (void *)reader_.get(), fd_); 0418 connect(reader_.get(), &Reader::readyRead, this, &Private::emitReadyRead, Qt::QueuedConnection); 0419 } 0420 if (mode_ & WriteOnly) { 0421 writer_ = std::make_unique<Writer>(fd_, handle_); 0422 QDebug("KDPipeIODevice::doOpen (%p): created writer (%p) for fd %d", (void *)this, (void *)writer_.get(), fd_); 0423 connect(writer_.get(), &Writer::bytesWritten, q, &QIODevice::bytesWritten, Qt::QueuedConnection); 0424 } 0425 0426 // commit to *this: 0427 fd = fd_; 0428 handle = handle_; 0429 reader = reader_.release(); 0430 writer = writer_.release(); 0431 0432 q->setOpenMode(mode_ | Unbuffered); 0433 return true; 0434 } 0435 0436 int KDPipeIODevice::descriptor() const 0437 { 0438 KDAB_CHECK_THIS; 0439 return d->fd; 0440 } 0441 0442 Qt::HANDLE KDPipeIODevice::handle() const 0443 { 0444 KDAB_CHECK_THIS; 0445 return d->handle; 0446 } 0447 0448 qint64 KDPipeIODevice::bytesAvailable() const 0449 { 0450 KDAB_CHECK_THIS; 0451 const qint64 base = QIODevice::bytesAvailable(); 0452 if (!d->triedToStartReader) { 0453 d->startReaderThread(); 0454 return base; 0455 } 0456 if (d->reader) { 0457 synchronized(d->reader) 0458 { 0459 const qint64 inBuffer = d->reader->bytesInBuffer(); 0460 return base + inBuffer; 0461 } 0462 } 0463 return base; 0464 } 0465 0466 qint64 KDPipeIODevice::bytesToWrite() const 0467 { 0468 KDAB_CHECK_THIS; 0469 d->startWriterThread(); 0470 const qint64 base = QIODevice::bytesToWrite(); 0471 if (d->writer) { 0472 synchronized(d->writer) return base + d->writer->bytesInBuffer(); 0473 } 0474 return base; 0475 } 0476 0477 bool KDPipeIODevice::canReadLine() const 0478 { 0479 KDAB_CHECK_THIS; 0480 d->startReaderThread(); 0481 if (QIODevice::canReadLine()) { 0482 return true; 0483 } 0484 if (d->reader) { 0485 synchronized(d->reader) return d->reader->bufferContains('\n'); 0486 } 0487 return true; 0488 } 0489 0490 bool KDPipeIODevice::isSequential() const 0491 { 0492 return true; 0493 } 0494 0495 bool KDPipeIODevice::atEnd() const 0496 { 0497 KDAB_CHECK_THIS; 0498 d->startReaderThread(); 0499 if (!QIODevice::atEnd()) { 0500 QDebug("%p: KDPipeIODevice::atEnd returns false since QIODevice::atEnd does (with bytesAvailable=%ld)", 0501 (void *)this, 0502 static_cast<long>(bytesAvailable())); 0503 return false; 0504 } 0505 if (!isOpen()) { 0506 return true; 0507 } 0508 if (d->reader->eofShortCut) { 0509 return true; 0510 } 0511 LOCKED(d->reader); 0512 const bool eof = (d->reader->error || d->reader->eof) && d->reader->bufferEmpty(); 0513 if (!eof) { 0514 if (!d->reader->error && !d->reader->eof) { 0515 QDebug("%p: KDPipeIODevice::atEnd returns false since !reader->error && !reader->eof", (void *)(this)); 0516 } 0517 if (!d->reader->bufferEmpty()) { 0518 QDebug("%p: KDPipeIODevice::atEnd returns false since !reader->bufferEmpty()", (void *)this); 0519 } 0520 } 0521 return eof; 0522 } 0523 0524 bool KDPipeIODevice::waitForBytesWritten(int msecs) 0525 { 0526 KDAB_CHECK_THIS; 0527 d->startWriterThread(); 0528 Writer *const w = d->writer; 0529 if (!w) { 0530 return true; 0531 } 0532 LOCKED(w); 0533 QDebug("KDPipeIODevice::waitForBytesWritten (%p,w=%p): entered locked area", (void *)this, (void *)w); 0534 return w->bufferEmpty() || w->error || w->bufferEmptyCondition.wait(&w->mutex, msecs); 0535 } 0536 0537 bool KDPipeIODevice::waitForReadyRead(int msecs) 0538 { 0539 KDAB_CHECK_THIS; 0540 QDebug("KDPipeIODEvice::waitForReadyRead()(%p)", (void *)this); 0541 d->startReaderThread(); 0542 if (ALLOW_QIODEVICE_BUFFERING) { 0543 if (bytesAvailable() > 0) { 0544 return true; 0545 } 0546 } 0547 Reader *const r = d->reader; 0548 if (!r || r->eofShortCut) { 0549 return true; 0550 } 0551 LOCKED(r); 0552 if (r->bytesInBuffer() != 0 || r->eof || r->error) { 0553 return true; 0554 } 0555 Q_ASSERT(false); // ### wtf? 0556 return r->bufferNotEmptyCondition.wait(&r->mutex, msecs); 0557 } 0558 0559 template<typename T> 0560 class TemporaryValue 0561 { 0562 public: 0563 TemporaryValue(T &var_, const T &tv) 0564 : var(var_) 0565 , oldValue(var_) 0566 { 0567 var = tv; 0568 } 0569 ~TemporaryValue() 0570 { 0571 var = oldValue; 0572 } 0573 0574 private: 0575 T &var; 0576 const T oldValue; 0577 }; 0578 0579 bool KDPipeIODevice::readWouldBlock() const 0580 { 0581 d->startReaderThread(); 0582 LOCKED(d->reader); 0583 return d->reader->bufferEmpty() && !d->reader->eof && !d->reader->error; 0584 } 0585 0586 bool KDPipeIODevice::writeWouldBlock() const 0587 { 0588 d->startWriterThread(); 0589 LOCKED(d->writer); 0590 return !d->writer->bufferEmpty() && !d->writer->error; 0591 } 0592 0593 qint64 KDPipeIODevice::readData(char *data, qint64 maxSize) 0594 { 0595 KDAB_CHECK_THIS; 0596 QDebug("%p: KDPipeIODevice::readData: data=%s, maxSize=%lld", (void *)this, data, maxSize); 0597 d->startReaderThread(); 0598 Reader *const r = d->reader; 0599 0600 Q_ASSERT(r); 0601 0602 // assert( r->isRunning() ); // wrong (might be eof, error) 0603 Q_ASSERT(data || maxSize == 0); 0604 Q_ASSERT(maxSize >= 0); 0605 0606 if (r->eofShortCut) { 0607 QDebug("%p: KDPipeIODevice::readData: hit eofShortCut, returning 0", (void *)this); 0608 return 0; 0609 } 0610 0611 if (maxSize < 0) { 0612 maxSize = 0; 0613 } 0614 0615 if (ALLOW_QIODEVICE_BUFFERING) { 0616 if (bytesAvailable() > 0) { 0617 maxSize = std::min(maxSize, bytesAvailable()); // don't block 0618 } 0619 } 0620 QDebug("%p: KDPipeIODevice::readData: try to lock reader (CONSUMER THREAD)", (void *)this); 0621 LOCKED(r); 0622 QDebug("%p: KDPipeIODevice::readData: locked reader (CONSUMER THREAD)", (void *)this); 0623 0624 r->readyReadSentCondition.wakeAll(); 0625 if (/* maxSize > 0 && */ r->bufferEmpty() && !r->error && !r->eof) { // ### block on maxSize == 0? 0626 QDebug("%p: KDPipeIODevice::readData: waiting for bufferNotEmptyCondition (CONSUMER THREAD)", (void *)this); 0627 const TemporaryValue<bool> tmp(d->reader->consumerBlocksOnUs, true); 0628 r->bufferNotEmptyCondition.wait(&r->mutex); 0629 r->blockedConsumerIsDoneCondition.wakeAll(); 0630 QDebug("%p: KDPipeIODevice::readData: woke up from bufferNotEmptyCondition (CONSUMER THREAD)", (void *)this); 0631 } 0632 0633 if (r->bufferEmpty()) { 0634 QDebug("%p: KDPipeIODevice::readData: got empty buffer, signal eof", (void *)this); 0635 // woken with an empty buffer must mean either EOF or error: 0636 Q_ASSERT(r->eof || r->error); 0637 r->eofShortCut = true; 0638 return r->eof ? 0 : -1; 0639 } 0640 0641 QDebug("%p: KDPipeIODevice::readData: got bufferNotEmptyCondition, trying to read %lld bytes", (void *)this, maxSize); 0642 const qint64 bytesRead = r->readData(data, maxSize); 0643 QDebug("%p: KDPipeIODevice::readData: read %lld bytes", (void *)this, bytesRead); 0644 QDebug("%p (fd=%d): KDPipeIODevice::readData: %s", (void *)this, d->fd, data); 0645 0646 return bytesRead; 0647 } 0648 0649 qint64 Reader::readData(char *data, qint64 maxSize) 0650 { 0651 qint64 numRead = rptr < wptr ? wptr - rptr : sizeof buffer - rptr; 0652 if (numRead > maxSize) { 0653 numRead = maxSize; 0654 } 0655 0656 QDebug("%p: KDPipeIODevice::readData: data=%s, maxSize=%lld; rptr=%u, wptr=%u (bytesInBuffer=%u); -> numRead=%lld", 0657 (void *)this, 0658 data, 0659 maxSize, 0660 rptr, 0661 wptr, 0662 bytesInBuffer(), 0663 numRead); 0664 0665 memcpy(data, buffer + rptr, numRead); 0666 0667 rptr = (rptr + numRead) % sizeof buffer; 0668 0669 if (!bufferFull()) { 0670 QDebug("%p: KDPipeIODevice::readData: signal bufferNotFullCondition", (void *)this); 0671 bufferNotFullCondition.wakeAll(); 0672 } 0673 0674 return numRead; 0675 } 0676 0677 qint64 KDPipeIODevice::writeData(const char *data, qint64 size) 0678 { 0679 KDAB_CHECK_THIS; 0680 d->startWriterThread(); 0681 Writer *const w = d->writer; 0682 0683 Q_ASSERT(w); 0684 Q_ASSERT(w->error || w->isRunning()); 0685 Q_ASSERT(data || size == 0); 0686 Q_ASSERT(size >= 0); 0687 0688 LOCKED(w); 0689 0690 while (!w->error && !w->bufferEmpty()) { 0691 QDebug("%p: KDPipeIODevice::writeData: wait for empty buffer", (void *)this); 0692 w->bufferEmptyCondition.wait(&w->mutex); 0693 QDebug("%p: KDPipeIODevice::writeData: empty buffer signaled", (void *)this); 0694 } 0695 if (w->error) { 0696 return -1; 0697 } 0698 0699 Q_ASSERT(w->bufferEmpty()); 0700 0701 return w->writeData(data, size); 0702 } 0703 0704 qint64 Writer::writeData(const char *data, qint64 size) 0705 { 0706 Q_ASSERT(bufferEmpty()); 0707 0708 if (size > static_cast<qint64>(sizeof buffer)) { 0709 size = sizeof buffer; 0710 } 0711 0712 memcpy(buffer, data, size); 0713 0714 numBytesInBuffer = size; 0715 0716 if (!bufferEmpty()) { 0717 bufferNotEmptyCondition.wakeAll(); 0718 } 0719 return size; 0720 } 0721 0722 void KDPipeIODevice::Private::stopThreads() 0723 { 0724 if (triedToStartWriter) { 0725 if (writer && q->bytesToWrite() > 0) { 0726 q->waitForBytesWritten(-1); 0727 } 0728 0729 Q_ASSERT(q->bytesToWrite() == 0); 0730 } 0731 if (Reader *&r = reader) { 0732 disconnect(r, &Reader::readyRead, this, &Private::emitReadyRead); 0733 synchronized(r) 0734 { 0735 // tell thread to cancel: 0736 r->cancel = true; 0737 // and wake it, so it can terminate: 0738 r->waitForCancelCondition.wakeAll(); 0739 r->bufferNotFullCondition.wakeAll(); 0740 r->readyReadSentCondition.wakeAll(); 0741 } 0742 } 0743 if (Writer *&w = writer) { 0744 synchronized(w) 0745 { 0746 // tell thread to cancel: 0747 w->cancel = true; 0748 // and wake it, so it can terminate: 0749 w->bufferNotEmptyCondition.wakeAll(); 0750 } 0751 } 0752 } 0753 0754 void KDPipeIODevice::close() 0755 { 0756 KDAB_CHECK_THIS; 0757 QDebug("KDPipeIODevice::close(%p)", (void *)this); 0758 if (!isOpen()) { 0759 return; 0760 } 0761 0762 // tell clients we're about to close: 0763 Q_EMIT aboutToClose(); 0764 d->stopThreads(); 0765 0766 #define waitAndDelete(t) \ 0767 if (t) { \ 0768 t->wait(); \ 0769 QThread *const t2 = t; \ 0770 t = nullptr; \ 0771 delete t2; \ 0772 } 0773 QDebug("KPipeIODevice::close(%p): wait and closing writer %p", (void *)this, (void *)d->writer); 0774 waitAndDelete(d->writer); 0775 QDebug("KPipeIODevice::close(%p): wait and closing reader %p", (void *)this, (void *)d->reader); 0776 if (d->reader) { 0777 LOCKED(d->reader); 0778 d->reader->readyReadSentCondition.wakeAll(); 0779 } 0780 waitAndDelete(d->reader); 0781 #undef waitAndDelete 0782 #ifdef Q_OS_WIN 0783 if (d->fd != -1) { 0784 _close(d->fd); 0785 } else { 0786 CloseHandle(d->handle); 0787 } 0788 #else 0789 ::close(d->fd); 0790 #endif 0791 0792 setOpenMode(NotOpen); 0793 d->fd = -1; 0794 d->handle = nullptr; 0795 } 0796 0797 void Reader::run() 0798 { 0799 LOCKED(this); 0800 0801 // too bad QThread doesn't have that itself; a signal isn't enough 0802 hasStarted.wakeAll(); 0803 0804 QDebug("%p: Reader::run: started", (void *)this); 0805 0806 while (true) { 0807 if (!cancel && (eof || error)) { 0808 // notify the client until the buffer is empty and then once 0809 // again so he receives eof/error. After that, wait for him 0810 // to cancel 0811 const bool wasEmpty = bufferEmpty(); 0812 QDebug("%p: Reader::run: received eof(%d) or error(%d), waking everyone", (void *)this, eof, error); 0813 notifyReadyRead(); 0814 if (!cancel && wasEmpty) { 0815 waitForCancelCondition.wait(&mutex); 0816 } 0817 } else if (!cancel && !bufferFull() && !bufferEmpty()) { 0818 QDebug("%p: Reader::run: buffer no longer empty, waking everyone", (void *)this); 0819 notifyReadyRead(); 0820 } 0821 0822 while (!cancel && !error && bufferFull()) { 0823 notifyReadyRead(); 0824 if (!cancel && bufferFull()) { 0825 QDebug("%p: Reader::run: buffer is full, going to sleep", (void *)this); 0826 bufferNotFullCondition.wait(&mutex); 0827 } 0828 } 0829 0830 if (cancel) { 0831 QDebug("%p: Reader::run: detected cancel", (void *)this); 0832 goto leave; 0833 } 0834 0835 if (!eof && !error) { 0836 if (rptr == wptr) { // optimize for larger chunks in case the buffer is empty 0837 rptr = wptr = 0; 0838 } 0839 0840 unsigned int numBytes = (rptr + sizeof buffer - wptr - 1) % sizeof buffer; 0841 if (numBytes > sizeof buffer - wptr) { 0842 numBytes = sizeof buffer - wptr; 0843 } 0844 0845 QDebug("%p: Reader::run: rptr=%d, wptr=%d -> numBytes=%d", (void *)this, rptr, wptr, numBytes); 0846 0847 Q_ASSERT(numBytes > 0); 0848 0849 QDebug("%p: Reader::run: trying to read %d bytes from fd %d", (void *)this, numBytes, fd); 0850 #ifdef Q_OS_WIN 0851 isReading = true; 0852 mutex.unlock(); 0853 DWORD numRead; 0854 const bool ok = ReadFile(handle, buffer + wptr, numBytes, &numRead, nullptr); 0855 mutex.lock(); 0856 isReading = false; 0857 if (ok) { 0858 if (numRead == 0) { 0859 QDebug("%p: Reader::run: got eof (numRead==0)", (void *)this); 0860 eof = true; 0861 } 0862 } else { // !ok 0863 errorCode = static_cast<int>(GetLastError()); 0864 if (errorCode == ERROR_BROKEN_PIPE) { 0865 Q_ASSERT(numRead == 0); 0866 QDebug("%p: Reader::run: got eof (broken pipe)", (void *)this); 0867 eof = true; 0868 } else { 0869 Q_ASSERT(numRead == 0); 0870 QDebug("%p: Reader::run: got error: %s (%d)", (void *)this, strerror(errorCode), errorCode); 0871 error = true; 0872 } 0873 } 0874 #else 0875 qint64 numRead; 0876 mutex.unlock(); 0877 do { 0878 numRead = ::read(fd, buffer + wptr, numBytes); 0879 } while (numRead == -1 && errno == EINTR); 0880 mutex.lock(); 0881 0882 if (numRead < 0) { 0883 errorCode = errno; 0884 error = true; 0885 QDebug("%p: Reader::run: got error: %d", (void *)this, errorCode); 0886 } else if (numRead == 0) { 0887 QDebug("%p: Reader::run: eof detected", (void *)this); 0888 eof = true; 0889 } 0890 #endif 0891 QDebug("%p (fd=%d): Reader::run: read %ld bytes", (void *)this, fd, static_cast<long>(numRead)); 0892 QDebug("%p (fd=%d): Reader::run: %s", (void *)this, fd, buffer); 0893 0894 if (numRead > 0) { 0895 QDebug("%p: Reader::run: buffer before: rptr=%4d, wptr=%4d", (void *)this, rptr, wptr); 0896 wptr = (wptr + numRead) % sizeof buffer; 0897 QDebug("%p: Reader::run: buffer after: rptr=%4d, wptr=%4d", (void *)this, rptr, wptr); 0898 } 0899 } 0900 } 0901 leave: 0902 QDebug("%p: Reader::run: terminated", (void *)this); 0903 } 0904 0905 void Reader::notifyReadyRead() 0906 { 0907 QDebug("notifyReadyRead: %d bytes available", bytesInBuffer()); 0908 Q_ASSERT(!cancel); 0909 0910 if (consumerBlocksOnUs) { 0911 bufferNotEmptyCondition.wakeAll(); 0912 blockedConsumerIsDoneCondition.wait(&mutex); 0913 return; 0914 } 0915 QDebug("notifyReadyRead: Q_EMIT signal"); 0916 Q_EMIT readyRead(); 0917 readyReadSentCondition.wait(&mutex); 0918 QDebug("notifyReadyRead: returning from waiting, leave"); 0919 } 0920 0921 void Writer::run() 0922 { 0923 LOCKED(this); 0924 0925 // too bad QThread doesn't have that itself; a signal isn't enough 0926 hasStarted.wakeAll(); 0927 0928 qCDebug(KLEOPATRA_LOG) << this << "Writer::run: started"; 0929 0930 while (true) { 0931 while (!cancel && bufferEmpty()) { 0932 qCDebug(KLEOPATRA_LOG) << this << "Writer::run: buffer is empty, wake bufferEmptyCond listeners"; 0933 bufferEmptyCondition.wakeAll(); 0934 Q_EMIT bytesWritten(0); 0935 qCDebug(KLEOPATRA_LOG) << this << "Writer::run: buffer is empty, going to sleep"; 0936 bufferNotEmptyCondition.wait(&mutex); 0937 qCDebug(KLEOPATRA_LOG) << this << "Writer::run: woke up"; 0938 } 0939 0940 if (cancel) { 0941 qCDebug(KLEOPATRA_LOG) << this << "Writer::run: detected cancel"; 0942 goto leave; 0943 } 0944 0945 Q_ASSERT(numBytesInBuffer > 0); 0946 0947 qCDebug(KLEOPATRA_LOG) << this << "Writer::run: Trying to write " << numBytesInBuffer << "bytes"; 0948 qint64 totalWritten = 0; 0949 do { 0950 mutex.unlock(); 0951 #ifdef Q_OS_WIN 0952 DWORD numWritten; 0953 QDebug("%p (fd=%d): Writer::run: buffer before WriteFile (numBytes=%u): %s:", (void *)this, fd, numBytesInBuffer, buffer); 0954 QDebug("%p (fd=%d): Writer::run: Going into WriteFile", (void *)this, fd); 0955 if (!WriteFile(handle, buffer + totalWritten, numBytesInBuffer - totalWritten, &numWritten, nullptr)) { 0956 mutex.lock(); 0957 errorCode = static_cast<int>(GetLastError()); 0958 QDebug("%p: Writer::run: got error code: %d", (void *)this, errorCode); 0959 error = true; 0960 goto leave; 0961 } 0962 #else 0963 qint64 numWritten; 0964 do { 0965 numWritten = ::write(fd, buffer + totalWritten, numBytesInBuffer - totalWritten); 0966 } while (numWritten == -1 && errno == EINTR); 0967 0968 if (numWritten < 0) { 0969 mutex.lock(); 0970 errorCode = errno; 0971 QDebug("%p: Writer::run: got error code: %s (%d)", (void *)this, strerror(errorCode), errorCode); 0972 error = true; 0973 goto leave; 0974 } 0975 #endif 0976 QDebug("%p (fd=%d): Writer::run: buffer after WriteFile (numBytes=%u): %s:", (void *)this, fd, numBytesInBuffer, buffer); 0977 totalWritten += numWritten; 0978 mutex.lock(); 0979 } while (totalWritten < numBytesInBuffer); 0980 0981 qCDebug(KLEOPATRA_LOG) << this << "Writer::run: wrote " << totalWritten << "bytes"; 0982 numBytesInBuffer = 0; 0983 qCDebug(KLEOPATRA_LOG) << this << "Writer::run: buffer is empty, wake bufferEmptyCond listeners"; 0984 bufferEmptyCondition.wakeAll(); 0985 Q_EMIT bytesWritten(totalWritten); 0986 } 0987 leave: 0988 qCDebug(KLEOPATRA_LOG) << this << "Writer::run: terminating"; 0989 numBytesInBuffer = 0; 0990 qCDebug(KLEOPATRA_LOG) << this << "Writer::run: buffer is empty, wake bufferEmptyCond listeners"; 0991 bufferEmptyCondition.wakeAll(); 0992 Q_EMIT bytesWritten(0); 0993 } 0994 0995 // static 0996 std::pair<KDPipeIODevice *, KDPipeIODevice *> KDPipeIODevice::makePairOfConnectedPipes() 0997 { 0998 KDPipeIODevice *read = nullptr; 0999 KDPipeIODevice *write = nullptr; 1000 #ifdef Q_OS_WIN 1001 HANDLE rh; 1002 HANDLE wh; 1003 SECURITY_ATTRIBUTES sa; 1004 memset(&sa, 0, sizeof(sa)); 1005 sa.nLength = sizeof(sa); 1006 sa.bInheritHandle = TRUE; 1007 if (CreatePipe(&rh, &wh, &sa, BUFFER_SIZE)) { 1008 read = new KDPipeIODevice; 1009 read->open(rh, ReadOnly); 1010 write = new KDPipeIODevice; 1011 write->open(wh, WriteOnly); 1012 } 1013 #else 1014 int fds[2]; 1015 if (pipe(fds) == 0) { 1016 read = new KDPipeIODevice; 1017 read->open(fds[0], ReadOnly); 1018 write = new KDPipeIODevice; 1019 write->open(fds[1], WriteOnly); 1020 } 1021 #endif 1022 return std::make_pair(read, write); 1023 } 1024 1025 #ifdef KDAB_DEFINE_CHECKS 1026 KDAB_DEFINE_CHECKS(KDPipeIODevice) 1027 { 1028 if (!isOpen()) { 1029 Q_ASSERT(openMode() == NotOpen); 1030 Q_ASSERT(!d->reader); 1031 Q_ASSERT(!d->writer); 1032 #ifdef Q_OS_WIN 1033 Q_ASSERT(!d->handle); 1034 #else 1035 Q_ASSERT(d->fd < 0); 1036 #endif 1037 } else { 1038 Q_ASSERT(openMode() != NotOpen); 1039 Q_ASSERT(openMode() & ReadWrite); 1040 if (openMode() & ReadOnly) { 1041 Q_ASSERT(d->reader); 1042 synchronized(d->reader) Q_ASSERT(d->reader->eof || d->reader->error || d->reader->isRunning()); 1043 } 1044 if (openMode() & WriteOnly) { 1045 Q_ASSERT(d->writer); 1046 synchronized(d->writer) Q_ASSERT(d->writer->error || d->writer->isRunning()); 1047 } 1048 #ifdef Q_OS_WIN 1049 Q_ASSERT(d->handle); 1050 #else 1051 Q_ASSERT(d->fd >= 0); 1052 #endif 1053 } 1054 } 1055 #endif // KDAB_DEFINE_CHECKS 1056 1057 #include "kdpipeiodevice.moc" 1058 1059 #include "moc_kdpipeiodevice.cpp"