File indexing completed on 2024-04-21 04:01:27
0001 /* -*- C++ -*- 0002 This file implements the Job class. 0003 0004 SPDX-FileCopyrightText: 2004-2013 Mirko Boehm <mirko@kde.org> 0005 0006 SPDX-License-Identifier: LGPL-2.0-or-later 0007 0008 $Id: Job.cpp 20 2005-08-08 21:02:51Z mirko $ 0009 */ 0010 0011 #include "job.h" 0012 #include "job_p.h" 0013 0014 #include "debuggingaids.h" 0015 #include "thread.h" 0016 #include <QAtomicInt> 0017 #include <QAtomicPointer> 0018 #include <QList> 0019 #include <QMutex> 0020 0021 #include "dependencypolicy.h" 0022 #include "exception.h" 0023 #include "executewrapper_p.h" 0024 #include "executor_p.h" 0025 #include "managedjobpointer.h" 0026 #include "queuepolicy.h" 0027 0028 namespace ThreadWeaver 0029 { 0030 Job::Job() 0031 : d_(new Private::Job_Private()) 0032 { 0033 #if !defined(NDEBUG) 0034 d()->debugExecuteWrapper.wrap(setExecutor(&(d()->debugExecuteWrapper))); 0035 #endif 0036 d()->status.storeRelease(Status_New); 0037 } 0038 0039 Job::Job(Private::Job_Private *d__) 0040 : d_(d__) 0041 { 0042 #if !defined(NDEBUG) 0043 d()->debugExecuteWrapper.wrap(setExecutor(&(d()->debugExecuteWrapper))); 0044 #endif 0045 d()->status.storeRelease(Status_New); 0046 } 0047 0048 Job::~Job() 0049 { 0050 for (int index = 0; index < d()->queuePolicies.size(); ++index) { 0051 d()->queuePolicies.at(index)->destructed(this); 0052 } 0053 delete d_; 0054 } 0055 0056 void Job::execute(const JobPointer &self, Thread *th) 0057 { 0058 Executor *executor = d()->executor.loadAcquire(); 0059 Q_ASSERT(executor); // may never be unset! 0060 Q_ASSERT(self); 0061 executor->begin(self, th); 0062 self->setStatus(Status_Running); 0063 try { 0064 executor->execute(self, th); 0065 if (self->status() == Status_Running) { 0066 self->setStatus(Status_Success); 0067 } 0068 } catch (JobAborted &) { 0069 self->setStatus(Status_Aborted); 0070 } catch (JobFailed &) { 0071 self->setStatus(Status_Failed); 0072 } catch (AbortThread &) { 0073 throw; 0074 } catch (...) { 0075 TWDEBUG(0, "Uncaught exception in Job %p, aborting.", self.data()); 0076 throw; 0077 } 0078 Q_ASSERT(self->status() > Status_Running); 0079 executor->end(self, th); 0080 } 0081 0082 void Job::blockingExecute() 0083 { 0084 execute(ManagedJobPointer<Job>(this), nullptr); 0085 } 0086 0087 Executor *Job::setExecutor(Executor *executor) 0088 { 0089 return d()->executor.fetchAndStoreOrdered(executor == nullptr ? &Private::defaultExecutor : executor); 0090 } 0091 0092 Executor *Job::executor() const 0093 { 0094 return d()->executor.loadAcquire(); 0095 } 0096 0097 int Job::priority() const 0098 { 0099 return 0; 0100 } 0101 0102 void Job::setStatus(JobInterface::Status status) 0103 { 0104 d()->status.storeRelease(status); 0105 } 0106 0107 JobInterface::Status Job::status() const 0108 { 0109 // since status is set only through setStatus, this should be safe: 0110 return static_cast<Status>(d()->status.loadAcquire()); 0111 } 0112 0113 bool Job::success() const 0114 { 0115 return d()->status.loadAcquire() == Status_Success; 0116 } 0117 0118 void Job::requestAbort() 0119 { 0120 d()->shouldAbort = true; 0121 } 0122 0123 void Job::defaultBegin(const JobPointer &, Thread *) 0124 { 0125 } 0126 0127 void Job::defaultEnd(const JobPointer &job, Thread *) 0128 { 0129 d()->handleFinish(job); 0130 d()->freeQueuePolicyResources(job); 0131 } 0132 0133 void Job::aboutToBeQueued(QueueAPI *api) 0134 { 0135 QMutexLocker l(mutex()); 0136 Q_UNUSED(l); 0137 aboutToBeQueued_locked(api); 0138 } 0139 0140 void Job::aboutToBeQueued_locked(QueueAPI *) 0141 { 0142 } 0143 0144 void Job::aboutToBeDequeued(QueueAPI *api) 0145 { 0146 QMutexLocker l(mutex()); 0147 Q_UNUSED(l); 0148 aboutToBeDequeued_locked(api); 0149 } 0150 0151 void Job::aboutToBeDequeued_locked(QueueAPI *) 0152 { 0153 } 0154 0155 void Job::assignQueuePolicy(QueuePolicy *policy) 0156 { 0157 Q_ASSERT(!mutex()->tryLock()); 0158 if (!d()->queuePolicies.contains(policy)) { 0159 d()->queuePolicies.append(policy); 0160 } 0161 } 0162 0163 void Job::removeQueuePolicy(QueuePolicy *policy) 0164 { 0165 Q_ASSERT(!mutex()->tryLock()); 0166 int index = d()->queuePolicies.indexOf(policy); 0167 if (index != -1) { 0168 d()->queuePolicies.removeAt(index); 0169 } 0170 } 0171 0172 QList<QueuePolicy *> Job::queuePolicies() const 0173 { 0174 Q_ASSERT(!mutex()->tryLock()); 0175 return d()->queuePolicies; 0176 } 0177 0178 Private::Job_Private *Job::d() 0179 { 0180 return d_; 0181 } 0182 0183 const Private::Job_Private *Job::d() const 0184 { 0185 return d_; 0186 } 0187 0188 bool Job::isFinished() const 0189 { 0190 const Status s = status(); 0191 return s == Status_Success || s == Status_Failed || s == Status_Aborted; 0192 } 0193 0194 QMutex *Job::mutex() const 0195 { 0196 return &(d()->mutex); 0197 } 0198 0199 bool Job::shouldAbort() const 0200 { 0201 return d()->shouldAbort; 0202 } 0203 0204 void Job::onFinish(const std::function<void(const JobInterface &job)> &lambda) 0205 { 0206 QMutexLocker l(mutex()); 0207 d()->finishHandlers << lambda; 0208 } 0209 0210 } 0211 0212 #include "managedjobpointer.h"