File indexing completed on 2024-04-28 15:34:50

0001 /* -*- C++ -*-
0002     This file is part of ThreadWeaver.
0003 
0004     SPDX-FileCopyrightText: 2004-2013 Mirko Boehm <mirko@kde.org>
0005 
0006     SPDX-License-Identifier: LGPL-2.0-or-later
0007 */
0008 
0009 #include "collection_p.h"
0010 #include "collection.h"
0011 #include "debuggingaids.h"
0012 #include "managedjobpointer.h"
0013 #include "queueapi.h"
0014 
0015 namespace ThreadWeaver
0016 {
0017 namespace Private
0018 {
0019 Collection_Private::Collection_Private()
0020     : api(nullptr)
0021     , jobCounter(0)
0022     , selfIsExecuting(false)
0023 {
0024 }
0025 
0026 Collection_Private::~Collection_Private()
0027 {
0028 }
0029 
0030 void Collection_Private::finalCleanup(Collection *collection)
0031 {
0032     Q_ASSERT(!self.isNull());
0033     Q_ASSERT(!mutex.tryLock());
0034     freeQueuePolicyResources(self);
0035     if (collection->status() < Job::Status_Success) {
0036         collection->setStatus(Job::Status_Success);
0037     }
0038     api = nullptr;
0039 }
0040 
0041 void Collection_Private::enqueueElements()
0042 {
0043     Q_ASSERT(!mutex.tryLock());
0044     prepareToEnqueueElements();
0045     jobCounter.fetchAndStoreOrdered(elements.count() + 1); // including self
0046     api->enqueue(elements);
0047 }
0048 
0049 void Collection_Private::elementStarted(Collection *, JobPointer job, Thread *)
0050 {
0051     QMutexLocker l(&mutex);
0052     Q_UNUSED(l);
0053     Q_UNUSED(job) // except in Q_ASSERT
0054     Q_ASSERT(!self.isNull());
0055     if (jobsStarted.fetchAndAddOrdered(1) == 0) {
0056         // emit started() signal on beginning of first job execution
0057         selfExecuteWrapper.callBegin();
0058     }
0059 }
0060 
0061 namespace
0062 {
0063 struct MutexUnlocker {
0064 #if QT_VERSION >= QT_VERSION_CHECK(6, 0, 0)
0065     QMutexLocker<QMutex> *locker;
0066 #else
0067     QMutexLocker *locker;
0068 #endif
0069 
0070 #if QT_VERSION >= QT_VERSION_CHECK(6, 0, 0)
0071     MutexUnlocker(QMutexLocker<QMutex> *l)
0072 #else
0073     MutexUnlocker(QMutexLocker *l)
0074 #endif
0075         : locker(l)
0076     {
0077         locker->unlock();
0078     }
0079     ~MutexUnlocker()
0080     {
0081         locker->relock();
0082     }
0083     MutexUnlocker(const MutexUnlocker &) = delete;
0084     MutexUnlocker &operator=(const MutexUnlocker &) = delete;
0085 };
0086 }
0087 
0088 void Collection_Private::elementFinished(Collection *collection, JobPointer job, Thread *thread)
0089 {
0090     JobPointer saveYourSelf = self;
0091     Q_UNUSED(saveYourSelf);
0092     QMutexLocker l(&mutex);
0093     Q_UNUSED(l);
0094     Q_ASSERT(!self.isNull());
0095     Q_UNUSED(job) // except in Q_ASSERT
0096     if (selfIsExecuting) {
0097         // the element that is finished is the collection itself
0098         // the collection is always executed first
0099         // queue the collection elements:
0100         enqueueElements();
0101         selfIsExecuting = false;
0102     }
0103     const int started = jobsStarted.loadAcquire();
0104     Q_ASSERT(started >= 0);
0105     Q_UNUSED(started);
0106     processCompletedElement(collection, job, thread);
0107     const int remainingJobs = jobCounter.fetchAndAddOrdered(-1) - 1;
0108     TWDEBUG(4, "Collection_Private::elementFinished: %i\n", remainingJobs);
0109     if (remainingJobs <= -1) {
0110         // its no use to count, the elements have been dequeued, now the threads call back that have been processing jobs in the meantime
0111     } else {
0112         Q_ASSERT(remainingJobs >= 0);
0113         if (remainingJobs == 0) {
0114             // all elements can only be done if self has been executed:
0115             // there is a small chance that (this) has been dequeued in the
0116             // meantime, in this case, there is nothing left to clean up
0117             finalCleanup(collection);
0118             {
0119                 MutexUnlocker u(&l);
0120                 Q_UNUSED(u);
0121                 selfExecuteWrapper.callEnd();
0122             }
0123             self.clear();
0124         }
0125     }
0126 }
0127 
0128 void Collection_Private::prepareToEnqueueElements()
0129 {
0130     // empty in Collection
0131 }
0132 
0133 void Collection_Private::processCompletedElement(Collection *, JobPointer, Thread *)
0134 {
0135     // empty in Collection
0136 }
0137 
0138 void Collection_Private::stop_locked(Collection *collection)
0139 {
0140     Q_ASSERT(!mutex.tryLock());
0141     if (api != nullptr) {
0142         TWDEBUG(4, "Collection::stop: dequeueing %p.\n", collection);
0143         if (!api->dequeue(ManagedJobPointer<Collection>(collection))) {
0144             dequeueElements(collection, false);
0145         }
0146     }
0147 }
0148 
0149 void Collection_Private::dequeueElements(Collection *collection, bool queueApiIsLocked)
0150 {
0151     // dequeue everything:
0152     Q_ASSERT(!mutex.tryLock());
0153     if (api == nullptr) {
0154         return; // not queued
0155     }
0156 
0157     for (int index = 0; index < elements.size(); ++index) {
0158         bool result;
0159         if (queueApiIsLocked) {
0160             result = api->dequeue_p(elements.at(index));
0161         } else {
0162             result = api->dequeue(elements.at(index));
0163         }
0164         if (result) {
0165             jobCounter.fetchAndAddOrdered(-1);
0166         }
0167         TWDEBUG(3,
0168                 "Collection::Private::dequeueElements: dequeueing %p (%s, %i jobs left).\n",
0169                 (void *)elements.at(index).data(),
0170                 result ? "found" : "not found",
0171                 jobCounter.loadAcquire());
0172         elementDequeued(elements.at(index));
0173     }
0174 
0175     if (jobCounter.loadAcquire() == 1) {
0176         finalCleanup(collection);
0177     }
0178 }
0179 
0180 void CollectionSelfExecuteWrapper::begin(const JobPointer &job, Thread *thread)
0181 {
0182     job_ = job;
0183     thread_ = thread;
0184 }
0185 
0186 void CollectionSelfExecuteWrapper::end(const JobPointer &job, Thread *thread)
0187 {
0188     Q_ASSERT(job_ == job && thread_ == thread);
0189     Q_UNUSED(job);
0190     Q_UNUSED(thread); // except in assert
0191 }
0192 
0193 void CollectionSelfExecuteWrapper::callBegin()
0194 {
0195     ExecuteWrapper::begin(job_, thread_);
0196 }
0197 
0198 void CollectionSelfExecuteWrapper::callEnd()
0199 {
0200     ExecuteWrapper::end(job_, thread_);
0201     job_.clear();
0202 }
0203 
0204 }
0205 
0206 }