File indexing completed on 2024-04-28 15:34:50
0001 /* -*- C++ -*- 0002 This file is part of ThreadWeaver. 0003 0004 SPDX-FileCopyrightText: 2004-2013 Mirko Boehm <mirko@kde.org> 0005 0006 SPDX-License-Identifier: LGPL-2.0-or-later 0007 */ 0008 0009 #include "collection_p.h" 0010 #include "collection.h" 0011 #include "debuggingaids.h" 0012 #include "managedjobpointer.h" 0013 #include "queueapi.h" 0014 0015 namespace ThreadWeaver 0016 { 0017 namespace Private 0018 { 0019 Collection_Private::Collection_Private() 0020 : api(nullptr) 0021 , jobCounter(0) 0022 , selfIsExecuting(false) 0023 { 0024 } 0025 0026 Collection_Private::~Collection_Private() 0027 { 0028 } 0029 0030 void Collection_Private::finalCleanup(Collection *collection) 0031 { 0032 Q_ASSERT(!self.isNull()); 0033 Q_ASSERT(!mutex.tryLock()); 0034 freeQueuePolicyResources(self); 0035 if (collection->status() < Job::Status_Success) { 0036 collection->setStatus(Job::Status_Success); 0037 } 0038 api = nullptr; 0039 } 0040 0041 void Collection_Private::enqueueElements() 0042 { 0043 Q_ASSERT(!mutex.tryLock()); 0044 prepareToEnqueueElements(); 0045 jobCounter.fetchAndStoreOrdered(elements.count() + 1); // including self 0046 api->enqueue(elements); 0047 } 0048 0049 void Collection_Private::elementStarted(Collection *, JobPointer job, Thread *) 0050 { 0051 QMutexLocker l(&mutex); 0052 Q_UNUSED(l); 0053 Q_UNUSED(job) // except in Q_ASSERT 0054 Q_ASSERT(!self.isNull()); 0055 if (jobsStarted.fetchAndAddOrdered(1) == 0) { 0056 // emit started() signal on beginning of first job execution 0057 selfExecuteWrapper.callBegin(); 0058 } 0059 } 0060 0061 namespace 0062 { 0063 struct MutexUnlocker { 0064 #if QT_VERSION >= QT_VERSION_CHECK(6, 0, 0) 0065 QMutexLocker<QMutex> *locker; 0066 #else 0067 QMutexLocker *locker; 0068 #endif 0069 0070 #if QT_VERSION >= QT_VERSION_CHECK(6, 0, 0) 0071 MutexUnlocker(QMutexLocker<QMutex> *l) 0072 #else 0073 MutexUnlocker(QMutexLocker *l) 0074 #endif 0075 : locker(l) 0076 { 0077 locker->unlock(); 0078 } 0079 ~MutexUnlocker() 0080 { 0081 locker->relock(); 0082 } 0083 MutexUnlocker(const MutexUnlocker &) = delete; 0084 MutexUnlocker &operator=(const MutexUnlocker &) = delete; 0085 }; 0086 } 0087 0088 void Collection_Private::elementFinished(Collection *collection, JobPointer job, Thread *thread) 0089 { 0090 JobPointer saveYourSelf = self; 0091 Q_UNUSED(saveYourSelf); 0092 QMutexLocker l(&mutex); 0093 Q_UNUSED(l); 0094 Q_ASSERT(!self.isNull()); 0095 Q_UNUSED(job) // except in Q_ASSERT 0096 if (selfIsExecuting) { 0097 // the element that is finished is the collection itself 0098 // the collection is always executed first 0099 // queue the collection elements: 0100 enqueueElements(); 0101 selfIsExecuting = false; 0102 } 0103 const int started = jobsStarted.loadAcquire(); 0104 Q_ASSERT(started >= 0); 0105 Q_UNUSED(started); 0106 processCompletedElement(collection, job, thread); 0107 const int remainingJobs = jobCounter.fetchAndAddOrdered(-1) - 1; 0108 TWDEBUG(4, "Collection_Private::elementFinished: %i\n", remainingJobs); 0109 if (remainingJobs <= -1) { 0110 // its no use to count, the elements have been dequeued, now the threads call back that have been processing jobs in the meantime 0111 } else { 0112 Q_ASSERT(remainingJobs >= 0); 0113 if (remainingJobs == 0) { 0114 // all elements can only be done if self has been executed: 0115 // there is a small chance that (this) has been dequeued in the 0116 // meantime, in this case, there is nothing left to clean up 0117 finalCleanup(collection); 0118 { 0119 MutexUnlocker u(&l); 0120 Q_UNUSED(u); 0121 selfExecuteWrapper.callEnd(); 0122 } 0123 self.clear(); 0124 } 0125 } 0126 } 0127 0128 void Collection_Private::prepareToEnqueueElements() 0129 { 0130 // empty in Collection 0131 } 0132 0133 void Collection_Private::processCompletedElement(Collection *, JobPointer, Thread *) 0134 { 0135 // empty in Collection 0136 } 0137 0138 void Collection_Private::stop_locked(Collection *collection) 0139 { 0140 Q_ASSERT(!mutex.tryLock()); 0141 if (api != nullptr) { 0142 TWDEBUG(4, "Collection::stop: dequeueing %p.\n", collection); 0143 if (!api->dequeue(ManagedJobPointer<Collection>(collection))) { 0144 dequeueElements(collection, false); 0145 } 0146 } 0147 } 0148 0149 void Collection_Private::dequeueElements(Collection *collection, bool queueApiIsLocked) 0150 { 0151 // dequeue everything: 0152 Q_ASSERT(!mutex.tryLock()); 0153 if (api == nullptr) { 0154 return; // not queued 0155 } 0156 0157 for (int index = 0; index < elements.size(); ++index) { 0158 bool result; 0159 if (queueApiIsLocked) { 0160 result = api->dequeue_p(elements.at(index)); 0161 } else { 0162 result = api->dequeue(elements.at(index)); 0163 } 0164 if (result) { 0165 jobCounter.fetchAndAddOrdered(-1); 0166 } 0167 TWDEBUG(3, 0168 "Collection::Private::dequeueElements: dequeueing %p (%s, %i jobs left).\n", 0169 (void *)elements.at(index).data(), 0170 result ? "found" : "not found", 0171 jobCounter.loadAcquire()); 0172 elementDequeued(elements.at(index)); 0173 } 0174 0175 if (jobCounter.loadAcquire() == 1) { 0176 finalCleanup(collection); 0177 } 0178 } 0179 0180 void CollectionSelfExecuteWrapper::begin(const JobPointer &job, Thread *thread) 0181 { 0182 job_ = job; 0183 thread_ = thread; 0184 } 0185 0186 void CollectionSelfExecuteWrapper::end(const JobPointer &job, Thread *thread) 0187 { 0188 Q_ASSERT(job_ == job && thread_ == thread); 0189 Q_UNUSED(job); 0190 Q_UNUSED(thread); // except in assert 0191 } 0192 0193 void CollectionSelfExecuteWrapper::callBegin() 0194 { 0195 ExecuteWrapper::begin(job_, thread_); 0196 } 0197 0198 void CollectionSelfExecuteWrapper::callEnd() 0199 { 0200 ExecuteWrapper::end(job_, thread_); 0201 job_.clear(); 0202 } 0203 0204 } 0205 0206 }