File indexing completed on 2024-04-28 04:01:22

0001 /* -*- C++ -*-
0002     This file is part of ThreadWeaver.
0003 
0004     SPDX-FileCopyrightText: 2004-2013 Mirko Boehm <mirko@kde.org>
0005 
0006     SPDX-License-Identifier: LGPL-2.0-or-later
0007 */
0008 
0009 #include "collection_p.h"
0010 #include "collection.h"
0011 #include "debuggingaids.h"
0012 #include "managedjobpointer.h"
0013 #include "queueapi.h"
0014 
0015 namespace ThreadWeaver
0016 {
0017 namespace Private
0018 {
0019 Collection_Private::Collection_Private()
0020     : api(nullptr)
0021     , jobCounter(0)
0022     , selfIsExecuting(false)
0023 {
0024 }
0025 
0026 Collection_Private::~Collection_Private()
0027 {
0028 }
0029 
0030 void Collection_Private::finalCleanup(Collection *collection)
0031 {
0032     Q_ASSERT(!self.isNull());
0033     Q_ASSERT(!mutex.tryLock());
0034     if (collection->shouldAbort()) {
0035         collection->setStatus(Job::Status_Aborted);
0036     } else if (collection->status() < Job::Status_Success) {
0037         collection->setStatus(Job::Status_Success);
0038     } else {
0039         // At this point we either should have been running
0040         // in which case above would mark Success
0041         // or otherwise we already should be in Failed or Aborted state
0042         Q_ASSERT(collection->status() == Job::Status_Failed || collection->status() == Job::Status_Aborted);
0043     }
0044     freeQueuePolicyResources(self);
0045     api = nullptr;
0046 }
0047 
0048 void Collection_Private::enqueueElements()
0049 {
0050     Q_ASSERT(!mutex.tryLock());
0051     prepareToEnqueueElements();
0052     jobCounter.fetchAndStoreOrdered(elements.count() + 1); // including self
0053     api->enqueue(elements);
0054 }
0055 
0056 void Collection_Private::elementStarted(Collection *, JobPointer job, Thread *)
0057 {
0058     QMutexLocker l(&mutex);
0059     Q_UNUSED(l);
0060     Q_UNUSED(job) // except in Q_ASSERT
0061     Q_ASSERT(!self.isNull());
0062     if (jobsStarted.fetchAndAddOrdered(1) == 0) {
0063         // emit started() signal on beginning of first job execution
0064         selfExecuteWrapper.callBegin();
0065     }
0066 }
0067 
0068 namespace
0069 {
0070 struct MutexUnlocker {
0071     QMutexLocker<QMutex> *locker;
0072 
0073     MutexUnlocker(QMutexLocker<QMutex> *l)
0074         : locker(l)
0075     {
0076         locker->unlock();
0077     }
0078     ~MutexUnlocker()
0079     {
0080         locker->relock();
0081     }
0082     MutexUnlocker(const MutexUnlocker &) = delete;
0083     MutexUnlocker &operator=(const MutexUnlocker &) = delete;
0084 };
0085 }
0086 
0087 void Collection_Private::elementFinished(Collection *collection, JobPointer job, Thread *thread)
0088 {
0089     JobPointer saveYourSelf = self;
0090     Q_UNUSED(saveYourSelf);
0091     QMutexLocker l(&mutex);
0092     Q_UNUSED(l);
0093     Q_ASSERT(!self.isNull());
0094     Q_UNUSED(job) // except in Q_ASSERT
0095     if (selfIsExecuting) {
0096         // the element that is finished is the collection itself
0097         // the collection is always executed first
0098         // No need to queue elements if we were aborted
0099         if (!collection->shouldAbort()) {
0100             // queue the collection elements:
0101             enqueueElements();
0102         }
0103         selfIsExecuting = false;
0104     }
0105     const int started = jobsStarted.loadAcquire();
0106     Q_ASSERT(started >= 0);
0107     Q_UNUSED(started);
0108     processCompletedElement(collection, job, thread);
0109     const int remainingJobs = jobCounter.fetchAndAddOrdered(-1) - 1;
0110     TWDEBUG(4, "Collection_Private::elementFinished: %i\n", remainingJobs);
0111     if (remainingJobs <= -1) {
0112         // its no use to count, the elements have been dequeued, now the threads call back that have been processing jobs in the meantime
0113     } else {
0114         Q_ASSERT(remainingJobs >= 0);
0115         if (remainingJobs == 0) {
0116             // all elements can only be done if self has been executed:
0117             // there is a small chance that (this) has been dequeued in the
0118             // meantime, in this case, there is nothing left to clean up
0119             finalCleanup(collection);
0120             {
0121                 MutexUnlocker u(&l);
0122                 Q_UNUSED(u);
0123                 selfExecuteWrapper.callEnd();
0124             }
0125             self.clear();
0126         }
0127     }
0128 }
0129 
0130 void Collection_Private::prepareToEnqueueElements()
0131 {
0132     // empty in Collection
0133 }
0134 
0135 JobInterface::Status Collection_Private::updateStatus(Collection *collection, JobPointer job)
0136 {
0137     // Keep our collection status in running until all jobs have finished
0138     // but make failures sticky so on first failed job we keep it counting as failure
0139     auto newStatus = Job::Status_Running;
0140     const auto jobStatus = job->status();
0141     if (jobStatus != JobInterface::Status_Success) {
0142         newStatus = jobStatus;
0143     }
0144     collection->setStatus(newStatus);
0145     return newStatus;
0146 }
0147 
0148 void Collection_Private::processCompletedElement(Collection *collection, JobPointer job, Thread *)
0149 {
0150     updateStatus(collection, job);
0151 }
0152 
0153 void Collection_Private::stop(Collection *collection)
0154 {
0155     QMutexLocker l(&mutex);
0156     if (api != nullptr) {
0157         // We can't dequeue ourselves while locked because we will
0158         // get deadlock when our own aboutToBeDequeued will be invoked
0159         // which will try to acquire this same lock
0160         // and we need our own `api` because `finalCleanup` can be invoked in between
0161         auto currentApi = api;
0162         l.unlock();
0163         TWDEBUG(4, "Collection::stop: dequeueing %p.\n", collection);
0164         if (!currentApi->dequeue(ManagedJobPointer<Collection>(collection))) {
0165             l.relock();
0166             dequeueElements(collection, false);
0167         }
0168     }
0169 }
0170 
0171 void Collection_Private::dequeueElements(Collection *collection, bool queueApiIsLocked)
0172 {
0173     // dequeue everything:
0174     Q_ASSERT(!mutex.tryLock());
0175     if (api == nullptr) {
0176         return; // not queued
0177     }
0178 
0179     for (const auto &job : elements) {
0180         bool result;
0181         if (queueApiIsLocked) {
0182             result = api->dequeue_p(job);
0183         } else {
0184             result = api->dequeue(job);
0185         }
0186         if (result) {
0187             jobCounter.fetchAndAddOrdered(-1);
0188         }
0189         TWDEBUG(3,
0190                 "Collection::Private::dequeueElements: dequeueing %p (%s, %i jobs left).\n",
0191                 (void *)job.data(),
0192                 result ? "found" : "not found",
0193                 jobCounter.loadAcquire());
0194         elementDequeued(job);
0195     }
0196 
0197     if (jobCounter.loadAcquire() == 1) {
0198         finalCleanup(collection);
0199     }
0200 }
0201 
0202 void Collection_Private::requestAbort(Collection *collection)
0203 {
0204     stop(collection);
0205     QMutexLocker l(&mutex);
0206     for (auto job = elements.begin(); job != elements.end(); ++job) {
0207         if ((*job)->status() <= JobInterface::Status_Running) {
0208             (*job)->requestAbort();
0209         }
0210     }
0211 }
0212 
0213 void CollectionSelfExecuteWrapper::begin(const JobPointer &job, Thread *thread)
0214 {
0215     job_ = job;
0216     thread_ = thread;
0217 }
0218 
0219 void CollectionSelfExecuteWrapper::end(const JobPointer &job, Thread *thread)
0220 {
0221     Q_ASSERT(job_ == job && thread_ == thread);
0222     Q_UNUSED(job);
0223     Q_UNUSED(thread); // except in assert
0224 }
0225 
0226 void CollectionSelfExecuteWrapper::callBegin()
0227 {
0228     ExecuteWrapper::begin(job_, thread_);
0229 }
0230 
0231 void CollectionSelfExecuteWrapper::callEnd()
0232 {
0233     ExecuteWrapper::end(job_, thread_);
0234     job_.clear();
0235 }
0236 
0237 }
0238 
0239 }