File indexing completed on 2024-04-21 04:01:27

0001 /* -*- C++ -*-
0002     This file implements the Job class.
0003 
0004     SPDX-FileCopyrightText: 2004-2013 Mirko Boehm <mirko@kde.org>
0005 
0006     SPDX-License-Identifier: LGPL-2.0-or-later
0007 
0008     $Id: Job.cpp 20 2005-08-08 21:02:51Z mirko $
0009 */
0010 
0011 #include "job.h"
0012 #include "job_p.h"
0013 
0014 #include "debuggingaids.h"
0015 #include "thread.h"
0016 #include <QAtomicInt>
0017 #include <QAtomicPointer>
0018 #include <QList>
0019 #include <QMutex>
0020 
0021 #include "dependencypolicy.h"
0022 #include "exception.h"
0023 #include "executewrapper_p.h"
0024 #include "executor_p.h"
0025 #include "managedjobpointer.h"
0026 #include "queuepolicy.h"
0027 
0028 namespace ThreadWeaver
0029 {
0030 Job::Job()
0031     : d_(new Private::Job_Private())
0032 {
0033 #if !defined(NDEBUG)
0034     d()->debugExecuteWrapper.wrap(setExecutor(&(d()->debugExecuteWrapper)));
0035 #endif
0036     d()->status.storeRelease(Status_New);
0037 }
0038 
0039 Job::Job(Private::Job_Private *d__)
0040     : d_(d__)
0041 {
0042 #if !defined(NDEBUG)
0043     d()->debugExecuteWrapper.wrap(setExecutor(&(d()->debugExecuteWrapper)));
0044 #endif
0045     d()->status.storeRelease(Status_New);
0046 }
0047 
0048 Job::~Job()
0049 {
0050     for (int index = 0; index < d()->queuePolicies.size(); ++index) {
0051         d()->queuePolicies.at(index)->destructed(this);
0052     }
0053     delete d_;
0054 }
0055 
0056 void Job::execute(const JobPointer &self, Thread *th)
0057 {
0058     Executor *executor = d()->executor.loadAcquire();
0059     Q_ASSERT(executor); // may never be unset!
0060     Q_ASSERT(self);
0061     executor->begin(self, th);
0062     self->setStatus(Status_Running);
0063     try {
0064         executor->execute(self, th);
0065         if (self->status() == Status_Running) {
0066             self->setStatus(Status_Success);
0067         }
0068     } catch (JobAborted &) {
0069         self->setStatus(Status_Aborted);
0070     } catch (JobFailed &) {
0071         self->setStatus(Status_Failed);
0072     } catch (AbortThread &) {
0073         throw;
0074     } catch (...) {
0075         TWDEBUG(0, "Uncaught exception in Job %p, aborting.", self.data());
0076         throw;
0077     }
0078     Q_ASSERT(self->status() > Status_Running);
0079     executor->end(self, th);
0080 }
0081 
0082 void Job::blockingExecute()
0083 {
0084     execute(ManagedJobPointer<Job>(this), nullptr);
0085 }
0086 
0087 Executor *Job::setExecutor(Executor *executor)
0088 {
0089     return d()->executor.fetchAndStoreOrdered(executor == nullptr ? &Private::defaultExecutor : executor);
0090 }
0091 
0092 Executor *Job::executor() const
0093 {
0094     return d()->executor.loadAcquire();
0095 }
0096 
0097 int Job::priority() const
0098 {
0099     return 0;
0100 }
0101 
0102 void Job::setStatus(JobInterface::Status status)
0103 {
0104     d()->status.storeRelease(status);
0105 }
0106 
0107 JobInterface::Status Job::status() const
0108 {
0109     // since status is set only through setStatus, this should be safe:
0110     return static_cast<Status>(d()->status.loadAcquire());
0111 }
0112 
0113 bool Job::success() const
0114 {
0115     return d()->status.loadAcquire() == Status_Success;
0116 }
0117 
0118 void Job::requestAbort()
0119 {
0120     d()->shouldAbort = true;
0121 }
0122 
0123 void Job::defaultBegin(const JobPointer &, Thread *)
0124 {
0125 }
0126 
0127 void Job::defaultEnd(const JobPointer &job, Thread *)
0128 {
0129     d()->handleFinish(job);
0130     d()->freeQueuePolicyResources(job);
0131 }
0132 
0133 void Job::aboutToBeQueued(QueueAPI *api)
0134 {
0135     QMutexLocker l(mutex());
0136     Q_UNUSED(l);
0137     aboutToBeQueued_locked(api);
0138 }
0139 
0140 void Job::aboutToBeQueued_locked(QueueAPI *)
0141 {
0142 }
0143 
0144 void Job::aboutToBeDequeued(QueueAPI *api)
0145 {
0146     QMutexLocker l(mutex());
0147     Q_UNUSED(l);
0148     aboutToBeDequeued_locked(api);
0149 }
0150 
0151 void Job::aboutToBeDequeued_locked(QueueAPI *)
0152 {
0153 }
0154 
0155 void Job::assignQueuePolicy(QueuePolicy *policy)
0156 {
0157     Q_ASSERT(!mutex()->tryLock());
0158     if (!d()->queuePolicies.contains(policy)) {
0159         d()->queuePolicies.append(policy);
0160     }
0161 }
0162 
0163 void Job::removeQueuePolicy(QueuePolicy *policy)
0164 {
0165     Q_ASSERT(!mutex()->tryLock());
0166     int index = d()->queuePolicies.indexOf(policy);
0167     if (index != -1) {
0168         d()->queuePolicies.removeAt(index);
0169     }
0170 }
0171 
0172 QList<QueuePolicy *> Job::queuePolicies() const
0173 {
0174     Q_ASSERT(!mutex()->tryLock());
0175     return d()->queuePolicies;
0176 }
0177 
0178 Private::Job_Private *Job::d()
0179 {
0180     return d_;
0181 }
0182 
0183 const Private::Job_Private *Job::d() const
0184 {
0185     return d_;
0186 }
0187 
0188 bool Job::isFinished() const
0189 {
0190     const Status s = status();
0191     return s == Status_Success || s == Status_Failed || s == Status_Aborted;
0192 }
0193 
0194 QMutex *Job::mutex() const
0195 {
0196     return &(d()->mutex);
0197 }
0198 
0199 bool Job::shouldAbort() const
0200 {
0201     return d()->shouldAbort;
0202 }
0203 
0204 void Job::onFinish(const std::function<void(const JobInterface &job)> &lambda)
0205 {
0206     QMutexLocker l(mutex());
0207     d()->finishHandlers << lambda;
0208 }
0209 
0210 }
0211 
0212 #include "managedjobpointer.h"