File indexing completed on 2024-04-28 15:34:51

0001 /* -*- C++ -*-
0002     This file implements the Job class.
0003 
0004     SPDX-FileCopyrightText: 2004-2013 Mirko Boehm <mirko@kde.org>
0005 
0006     SPDX-License-Identifier: LGPL-2.0-or-later
0007 
0008     $Id: Job.cpp 20 2005-08-08 21:02:51Z mirko $
0009 */
0010 
0011 #include "job.h"
0012 #include "job_p.h"
0013 
0014 #include "debuggingaids.h"
0015 #include "thread.h"
0016 #include <QAtomicInt>
0017 #include <QAtomicPointer>
0018 #include <QList>
0019 #include <QMutex>
0020 
0021 #include "dependencypolicy.h"
0022 #include "exception.h"
0023 #include "executewrapper_p.h"
0024 #include "executor_p.h"
0025 #include "managedjobpointer.h"
0026 #include "queuepolicy.h"
0027 
0028 namespace ThreadWeaver
0029 {
0030 Job::Job()
0031     : d_(new Private::Job_Private())
0032 {
0033 #if !defined(NDEBUG)
0034     d()->debugExecuteWrapper.wrap(setExecutor(&(d()->debugExecuteWrapper)));
0035 #endif
0036     d()->status.storeRelease(Status_New);
0037 }
0038 
0039 Job::Job(Private::Job_Private *d__)
0040     : d_(d__)
0041 {
0042 #if !defined(NDEBUG)
0043     d()->debugExecuteWrapper.wrap(setExecutor(&(d()->debugExecuteWrapper)));
0044 #endif
0045     d()->status.storeRelease(Status_New);
0046 }
0047 
0048 Job::~Job()
0049 {
0050     for (int index = 0; index < d()->queuePolicies.size(); ++index) {
0051         d()->queuePolicies.at(index)->destructed(this);
0052     }
0053     delete d_;
0054 }
0055 
0056 void Job::execute(const JobPointer &self, Thread *th)
0057 {
0058     Executor *executor = d()->executor.loadAcquire();
0059     Q_ASSERT(executor); // may never be unset!
0060     Q_ASSERT(self);
0061     executor->begin(self, th);
0062     self->setStatus(Status_Running);
0063     try {
0064         executor->execute(self, th);
0065         if (self->status() == Status_Running) {
0066             self->setStatus(Status_Success);
0067         }
0068     } catch (JobAborted &) {
0069         self->setStatus(Status_Aborted);
0070     } catch (JobFailed &) {
0071         self->setStatus(Status_Failed);
0072     } catch (AbortThread &) {
0073         throw;
0074     } catch (...) {
0075         TWDEBUG(0, "Uncaught exception in Job %p, aborting.", self.data());
0076         throw;
0077     }
0078     Q_ASSERT(self->status() > Status_Running);
0079     executor->end(self, th);
0080     executor->cleanup(self, th);
0081 }
0082 
0083 void Job::blockingExecute()
0084 {
0085     execute(ManagedJobPointer<Job>(this), nullptr);
0086 }
0087 
0088 Executor *Job::setExecutor(Executor *executor)
0089 {
0090     return d()->executor.fetchAndStoreOrdered(executor == nullptr ? &Private::defaultExecutor : executor);
0091 }
0092 
0093 Executor *Job::executor() const
0094 {
0095     return d()->executor.loadAcquire();
0096 }
0097 
0098 int Job::priority() const
0099 {
0100     return 0;
0101 }
0102 
0103 void Job::setStatus(JobInterface::Status status)
0104 {
0105     d()->status.storeRelease(status);
0106 }
0107 
0108 JobInterface::Status Job::status() const
0109 {
0110     // since status is set only through setStatus, this should be safe:
0111     return static_cast<Status>(d()->status.loadAcquire());
0112 }
0113 
0114 bool Job::success() const
0115 {
0116     return d()->status.loadAcquire() == Status_Success;
0117 }
0118 
0119 void Job::defaultBegin(const JobPointer &, Thread *)
0120 {
0121 }
0122 
0123 void Job::defaultEnd(const JobPointer &job, Thread *)
0124 {
0125     d()->freeQueuePolicyResources(job);
0126 }
0127 
0128 void Job::aboutToBeQueued(QueueAPI *api)
0129 {
0130     QMutexLocker l(mutex());
0131     Q_UNUSED(l);
0132     aboutToBeQueued_locked(api);
0133 }
0134 
0135 void Job::aboutToBeQueued_locked(QueueAPI *)
0136 {
0137 }
0138 
0139 void Job::aboutToBeDequeued(QueueAPI *api)
0140 {
0141     QMutexLocker l(mutex());
0142     Q_UNUSED(l);
0143     aboutToBeDequeued_locked(api);
0144 }
0145 
0146 void Job::aboutToBeDequeued_locked(QueueAPI *)
0147 {
0148 }
0149 
0150 void Job::assignQueuePolicy(QueuePolicy *policy)
0151 {
0152     Q_ASSERT(!mutex()->tryLock());
0153     if (!d()->queuePolicies.contains(policy)) {
0154         d()->queuePolicies.append(policy);
0155     }
0156 }
0157 
0158 void Job::removeQueuePolicy(QueuePolicy *policy)
0159 {
0160     Q_ASSERT(!mutex()->tryLock());
0161     int index = d()->queuePolicies.indexOf(policy);
0162     if (index != -1) {
0163         d()->queuePolicies.removeAt(index);
0164     }
0165 }
0166 
0167 QList<QueuePolicy *> Job::queuePolicies() const
0168 {
0169     Q_ASSERT(!mutex()->tryLock());
0170     return d()->queuePolicies;
0171 }
0172 
0173 Private::Job_Private *Job::d()
0174 {
0175     return d_;
0176 }
0177 
0178 const Private::Job_Private *Job::d() const
0179 {
0180     return d_;
0181 }
0182 
0183 bool Job::isFinished() const
0184 {
0185     const Status s = status();
0186     return s == Status_Success || s == Status_Failed || s == Status_Aborted;
0187 }
0188 
0189 QMutex *Job::mutex() const
0190 {
0191     return &(d()->mutex);
0192 }
0193 
0194 }
0195 
0196 #include "managedjobpointer.h"