File indexing completed on 2024-05-12 07:57:47
0001 /* -*- C++ -*- 0002 This file is part of ThreadWeaver. 0003 0004 SPDX-FileCopyrightText: 2004-2013 Mirko Boehm <mirko@kde.org> 0005 0006 SPDX-License-Identifier: LGPL-2.0-or-later 0007 */ 0008 0009 #include "collection_p.h" 0010 #include "collection.h" 0011 #include "debuggingaids.h" 0012 #include "managedjobpointer.h" 0013 #include "queueapi.h" 0014 0015 namespace ThreadWeaver 0016 { 0017 namespace Private 0018 { 0019 Collection_Private::Collection_Private() 0020 : api(nullptr) 0021 , jobCounter(0) 0022 , selfIsExecuting(false) 0023 { 0024 } 0025 0026 Collection_Private::~Collection_Private() 0027 { 0028 } 0029 0030 void Collection_Private::finalCleanup(Collection *collection) 0031 { 0032 Q_ASSERT(!self.isNull()); 0033 Q_ASSERT(!mutex.tryLock()); 0034 if (collection->shouldAbort()) { 0035 collection->setStatus(Job::Status_Aborted); 0036 } else if (collection->status() < Job::Status_Success) { 0037 collection->setStatus(Job::Status_Success); 0038 } else { 0039 // At this point we either should have been running 0040 // in which case above would mark Success 0041 // or otherwise we already should be in Failed or Aborted state 0042 Q_ASSERT(collection->status() == Job::Status_Failed || collection->status() == Job::Status_Aborted); 0043 } 0044 freeQueuePolicyResources(self); 0045 api = nullptr; 0046 } 0047 0048 void Collection_Private::enqueueElements() 0049 { 0050 Q_ASSERT(!mutex.tryLock()); 0051 prepareToEnqueueElements(); 0052 jobCounter.fetchAndStoreOrdered(elements.count() + 1); // including self 0053 api->enqueue(elements); 0054 } 0055 0056 void Collection_Private::elementStarted(Collection *, JobPointer job, Thread *) 0057 { 0058 QMutexLocker l(&mutex); 0059 Q_UNUSED(l); 0060 Q_UNUSED(job) // except in Q_ASSERT 0061 Q_ASSERT(!self.isNull()); 0062 if (jobsStarted.fetchAndAddOrdered(1) == 0) { 0063 // emit started() signal on beginning of first job execution 0064 selfExecuteWrapper.callBegin(); 0065 } 0066 } 0067 0068 namespace 0069 { 0070 struct MutexUnlocker { 0071 QMutexLocker<QMutex> *locker; 0072 0073 MutexUnlocker(QMutexLocker<QMutex> *l) 0074 : locker(l) 0075 { 0076 locker->unlock(); 0077 } 0078 ~MutexUnlocker() 0079 { 0080 locker->relock(); 0081 } 0082 MutexUnlocker(const MutexUnlocker &) = delete; 0083 MutexUnlocker &operator=(const MutexUnlocker &) = delete; 0084 }; 0085 } 0086 0087 void Collection_Private::elementFinished(Collection *collection, JobPointer job, Thread *thread) 0088 { 0089 JobPointer saveYourSelf = self; 0090 Q_UNUSED(saveYourSelf); 0091 QMutexLocker l(&mutex); 0092 Q_UNUSED(l); 0093 Q_ASSERT(!self.isNull()); 0094 Q_UNUSED(job) // except in Q_ASSERT 0095 if (selfIsExecuting) { 0096 // the element that is finished is the collection itself 0097 // the collection is always executed first 0098 // No need to queue elements if we were aborted 0099 if (!collection->shouldAbort()) { 0100 // queue the collection elements: 0101 enqueueElements(); 0102 } 0103 selfIsExecuting = false; 0104 } 0105 const int started = jobsStarted.loadAcquire(); 0106 Q_ASSERT(started >= 0); 0107 Q_UNUSED(started); 0108 processCompletedElement(collection, job, thread); 0109 const int remainingJobs = jobCounter.fetchAndAddOrdered(-1) - 1; 0110 TWDEBUG(4, "Collection_Private::elementFinished: %i\n", remainingJobs); 0111 if (remainingJobs <= -1) { 0112 // its no use to count, the elements have been dequeued, now the threads call back that have been processing jobs in the meantime 0113 } else { 0114 Q_ASSERT(remainingJobs >= 0); 0115 if (remainingJobs == 0) { 0116 // all elements can only be done if self has been executed: 0117 // there is a small chance that (this) has been dequeued in the 0118 // meantime, in this case, there is nothing left to clean up 0119 finalCleanup(collection); 0120 { 0121 MutexUnlocker u(&l); 0122 Q_UNUSED(u); 0123 selfExecuteWrapper.callEnd(); 0124 } 0125 self.clear(); 0126 } 0127 } 0128 } 0129 0130 void Collection_Private::prepareToEnqueueElements() 0131 { 0132 // empty in Collection 0133 } 0134 0135 JobInterface::Status Collection_Private::updateStatus(Collection *collection, JobPointer job) 0136 { 0137 // Keep our collection status in running until all jobs have finished 0138 // but make failures sticky so on first failed job we keep it counting as failure 0139 auto newStatus = Job::Status_Running; 0140 const auto jobStatus = job->status(); 0141 if (jobStatus != JobInterface::Status_Success) { 0142 newStatus = jobStatus; 0143 } 0144 collection->setStatus(newStatus); 0145 return newStatus; 0146 } 0147 0148 void Collection_Private::processCompletedElement(Collection *collection, JobPointer job, Thread *) 0149 { 0150 updateStatus(collection, job); 0151 } 0152 0153 void Collection_Private::stop(Collection *collection) 0154 { 0155 QMutexLocker l(&mutex); 0156 if (api != nullptr) { 0157 // We can't dequeue ourselves while locked because we will 0158 // get deadlock when our own aboutToBeDequeued will be invoked 0159 // which will try to acquire this same lock 0160 // and we need our own `api` because `finalCleanup` can be invoked in between 0161 auto currentApi = api; 0162 l.unlock(); 0163 TWDEBUG(4, "Collection::stop: dequeueing %p.\n", collection); 0164 if (!currentApi->dequeue(ManagedJobPointer<Collection>(collection))) { 0165 l.relock(); 0166 dequeueElements(collection, false); 0167 } 0168 } 0169 } 0170 0171 void Collection_Private::dequeueElements(Collection *collection, bool queueApiIsLocked) 0172 { 0173 // dequeue everything: 0174 Q_ASSERT(!mutex.tryLock()); 0175 if (api == nullptr) { 0176 return; // not queued 0177 } 0178 0179 for (const auto &job : elements) { 0180 bool result; 0181 if (queueApiIsLocked) { 0182 result = api->dequeue_p(job); 0183 } else { 0184 result = api->dequeue(job); 0185 } 0186 if (result) { 0187 jobCounter.fetchAndAddOrdered(-1); 0188 } 0189 TWDEBUG(3, 0190 "Collection::Private::dequeueElements: dequeueing %p (%s, %i jobs left).\n", 0191 (void *)job.data(), 0192 result ? "found" : "not found", 0193 jobCounter.loadAcquire()); 0194 elementDequeued(job); 0195 } 0196 0197 if (jobCounter.loadAcquire() == 1) { 0198 finalCleanup(collection); 0199 } 0200 } 0201 0202 void Collection_Private::requestAbort(Collection *collection) 0203 { 0204 stop(collection); 0205 QMutexLocker l(&mutex); 0206 for (auto job = elements.begin(); job != elements.end(); ++job) { 0207 if ((*job)->status() <= JobInterface::Status_Running) { 0208 (*job)->requestAbort(); 0209 } 0210 } 0211 } 0212 0213 void CollectionSelfExecuteWrapper::begin(const JobPointer &job, Thread *thread) 0214 { 0215 job_ = job; 0216 thread_ = thread; 0217 } 0218 0219 void CollectionSelfExecuteWrapper::end(const JobPointer &job, Thread *thread) 0220 { 0221 Q_ASSERT(job_ == job && thread_ == thread); 0222 Q_UNUSED(job); 0223 Q_UNUSED(thread); // except in assert 0224 } 0225 0226 void CollectionSelfExecuteWrapper::callBegin() 0227 { 0228 ExecuteWrapper::begin(job_, thread_); 0229 } 0230 0231 void CollectionSelfExecuteWrapper::callEnd() 0232 { 0233 ExecuteWrapper::end(job_, thread_); 0234 job_.clear(); 0235 } 0236 0237 } 0238 0239 }