File indexing completed on 2024-04-28 04:01:24

0001 /* -*- C++ -*-
0002     This file implements the WeaverImpl class.
0003 
0004     SPDX-FileCopyrightText: 2005-2013 Mirko Boehm <mirko@kde.org>
0005 
0006     SPDX-License-Identifier: LGPL-2.0-or-later
0007 
0008     $Id: WeaverImpl.cpp 30 2005-08-16 16:16:04Z mirko $
0009 */
0010 
0011 #include "weaver.h"
0012 
0013 #include <QCoreApplication>
0014 #include <QDebug>
0015 #include <QMutex>
0016 #include <QDeadlineTimer>
0017 #include "debuggingaids.h"
0018 #include "destructedstate.h"
0019 #include "exception.h"
0020 #include "inconstructionstate.h"
0021 #include "job.h"
0022 #include "managedjobpointer.h"
0023 #include "queuepolicy.h"
0024 #include "shuttingdownstate.h"
0025 #include "state.h"
0026 #include "suspendedstate.h"
0027 #include "suspendingstate.h"
0028 #include "thread.h"
0029 #include "threadweaver.h"
0030 #include "weaver_p.h"
0031 #include "workinghardstate.h"
0032 
0033 using namespace ThreadWeaver;
0034 
0035 /** @brief Constructs a Weaver object. */
0036 Weaver::Weaver(QObject *parent)
0037     : QueueAPI(new Private::Weaver_Private(), parent)
0038 {
0039     qRegisterMetaType<ThreadWeaver::JobPointer>("ThreadWeaver::JobPointer");
0040     QMutexLocker l(d()->mutex);
0041     Q_UNUSED(l);
0042     // initialize state objects:
0043     d()->states[InConstruction] = QSharedPointer<State>(new InConstructionState(this));
0044     setState_p(InConstruction);
0045     d()->states[WorkingHard] = QSharedPointer<State>(new WorkingHardState(this));
0046     d()->states[Suspending] = QSharedPointer<State>(new SuspendingState(this));
0047     d()->states[Suspended] = QSharedPointer<State>(new SuspendedState(this));
0048     d()->states[ShuttingDown] = QSharedPointer<State>(new ShuttingDownState(this));
0049     d()->states[Destructed] = QSharedPointer<State>(new DestructedState(this));
0050 
0051     setState_p(WorkingHard);
0052 }
0053 
0054 /** @brief Destructs a Weaver object. */
0055 Weaver::~Weaver()
0056 {
0057     Q_ASSERT_X(state()->stateId() == Destructed, Q_FUNC_INFO, "shutDown() method was not called before Weaver destructor!");
0058 }
0059 
0060 /** @brief Enter Destructed state.
0061  *
0062  * Once this method returns, it is save to delete this object.
0063  */
0064 void Weaver::shutDown()
0065 {
0066     state()->shutDown();
0067 }
0068 
/** @brief Perform the actual shutdown: drain the queue, stop and delete all threads.
 *
 * The sequence is: wait on the semaphore for as many permits as threads have
 * been created, finish() the queue, suspend(), enter ShuttingDown, wake all
 * waiters, then take the threads out of the inventory one by one, wait for
 * each to exit, emit threadExited() and delete it. Finally the Destructed
 * state is entered.
 */
void Weaver::shutDown_p()
{
    // shutDown may only be called from the thread that owns this
    // object (everything else would be what we professionals call "insane")

    REQUIRE(QThread::currentThread() == thread());
    TWDEBUG(3, "WeaverImpl::shutDown: destroying inventory.\n");
    // threadEnteredRun() releases one permit per thread, so this blocks until
    // as many threads as were created have entered their run method:
    d()->semaphore.acquire(d()->createdThreads.loadAcquire());
    finish();
    suspend();
    setState(ShuttingDown);
    // wake everything that may be blocked on either wait condition:
    reschedule();
    d()->jobFinished.wakeAll();

    // problem: Some threads might not be asleep yet, just finding
    // out if a job is available. Those threads will suspend
    // waiting for their next job (a rare case, but not impossible).
    // Therefore, if we encounter a thread that has not exited, we
    // have to wake it again (which we do in the following for
    // loop).

    for (;;) {
        Thread *th = nullptr;
        {
            // the mutex is only held while the inventory is modified,
            // not while waiting for the thread to exit
            QMutexLocker l(d()->mutex);
            Q_UNUSED(l);
            if (d()->inventory.isEmpty()) {
                break;
            }
            th = d()->inventory.takeFirst();
        }
        if (!th->isFinished()) {
            // keep waking the thread until it has exited, polling every 100 ms
            for (;;) {
                Q_ASSERT(state()->stateId() == ShuttingDown);
                reschedule();
                if (th->wait(100)) {
                    break;
                }
                TWDEBUG(1,
                        "WeaverImpl::shutDown: thread %i did not exit as expected, "
                        "retrying.\n",
                        th->id());
            }
        }
        Q_EMIT(threadExited(th));
        delete th;
    }
    Q_ASSERT(d()->inventory.isEmpty());
    TWDEBUG(3, "WeaverImpl::shutDown: done\n");
    setState(Destructed); // Destructed ignores all calls into the queue API
}
0120 
0121 /** @brief Set the Weaver state.
0122  * @see StateId
0123  * @see WeaverImplState
0124  * @see State
0125  */
0126 void Weaver::setState(StateId id)
0127 {
0128     QMutexLocker l(d()->mutex);
0129     Q_UNUSED(l);
0130     setState_p(id);
0131 }
0132 
0133 void Weaver::setState_p(StateId id)
0134 {
0135     Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
0136     State *newState = d()->states[id].data();
0137     State *previous = d()->state.fetchAndStoreOrdered(newState);
0138     if (previous == nullptr || previous->stateId() != id) {
0139         newState->activated();
0140         TWDEBUG(2, "WeaverImpl::setState: state changed to \"%s\".\n", newState->stateName().toLatin1().constData());
0141         if (id == Suspended) {
0142             Q_EMIT(suspended());
0143         }
0144         Q_EMIT(stateChanged(newState));
0145     }
0146 }
0147 
0148 const State *Weaver::state() const
0149 {
0150     return d()->state.loadAcquire();
0151 }
0152 
0153 State *Weaver::state()
0154 {
0155     return d()->state.loadAcquire();
0156 }
0157 
0158 void Weaver::setMaximumNumberOfThreads(int cap)
0159 {
0160     Q_ASSERT_X(cap >= 0, "Weaver Impl", "Thread inventory size has to be larger than or equal to zero.");
0161     QMutexLocker l(d()->mutex);
0162     Q_UNUSED(l);
0163     state()->setMaximumNumberOfThreads(cap);
0164     reschedule();
0165 }
0166 
0167 void Weaver::setMaximumNumberOfThreads_p(int cap)
0168 {
0169     Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
0170     const bool createInitialThread = (d()->inventoryMax == 0 && cap > 0);
0171     d()->inventoryMax = cap;
0172     if (createInitialThread) {
0173         adjustInventory(1);
0174     }
0175 }
0176 
0177 int Weaver::maximumNumberOfThreads() const
0178 {
0179     QMutexLocker l(d()->mutex);
0180     Q_UNUSED(l);
0181     return state()->maximumNumberOfThreads();
0182 }
0183 
0184 int Weaver::maximumNumberOfThreads_p() const
0185 {
0186     Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
0187     return d()->inventoryMax;
0188 }
0189 
0190 int Weaver::currentNumberOfThreads() const
0191 {
0192     QMutexLocker l(d()->mutex);
0193     Q_UNUSED(l);
0194     return state()->currentNumberOfThreads();
0195 }
0196 
0197 int Weaver::currentNumberOfThreads_p() const
0198 {
0199     Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
0200     return d()->inventory.count();
0201 }
0202 
0203 void Weaver::enqueue(const QList<JobPointer> &jobs)
0204 {
0205     QMutexLocker l(d()->mutex);
0206     Q_UNUSED(l);
0207     state()->enqueue(jobs);
0208 }
0209 
0210 void Weaver::enqueue_p(const QList<JobPointer> &jobs)
0211 {
0212     Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
0213     if (jobs.isEmpty()) {
0214         return;
0215     }
0216     for (const JobPointer &job : jobs) {
0217         if (job) {
0218             Q_ASSERT(job->status() == Job::Status_New);
0219             adjustInventory(jobs.size());
0220             TWDEBUG(3, "WeaverImpl::enqueue: queueing job %p.\n", (void *)job.data());
0221             job->aboutToBeQueued(this);
0222             // find position for insertion:
0223             int i = d()->assignments.size();
0224             if (i > 0) {
0225                 while (i > 0 && d()->assignments.at(i - 1)->priority() < job->priority()) {
0226                     --i;
0227                 }
0228                 d()->assignments.insert(i, job);
0229             } else {
0230                 d()->assignments.append(job);
0231             }
0232             job->setStatus(Job::Status_Queued);
0233             reschedule();
0234         }
0235     }
0236 }
0237 
0238 bool Weaver::dequeue(const JobPointer &job)
0239 {
0240     QMutexLocker l(d()->mutex);
0241     Q_UNUSED(l);
0242     return state()->dequeue(job);
0243 }
0244 
0245 bool Weaver::dequeue_p(JobPointer job)
0246 {
0247     Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
0248     int position = d()->assignments.indexOf(job);
0249     if (position != -1) {
0250         job->aboutToBeDequeued(this);
0251         int newPosition = d()->assignments.indexOf(job);
0252         JobPointer job = d()->assignments.takeAt(newPosition);
0253         job->setStatus(Job::Status_New);
0254         Q_ASSERT(!d()->assignments.contains(job));
0255         TWDEBUG(3, "WeaverImpl::dequeue: job %p dequeued, %i jobs left.\n", (void *)job.data(), queueLength_p());
0256         // from the queues point of view, a job is just as finished if it gets dequeued:
0257         d()->jobFinished.wakeAll();
0258         Q_ASSERT(!d()->assignments.contains(job));
0259         return true;
0260     } else {
0261         TWDEBUG(3, "WeaverImpl::dequeue: job %p not found in queue.\n", (void *)job.data());
0262         return false;
0263     }
0264 }
0265 
0266 void Weaver::dequeue()
0267 {
0268     QMutexLocker l(d()->mutex);
0269     Q_UNUSED(l);
0270     state()->dequeue();
0271 }
0272 
0273 void Weaver::dequeue_p()
0274 {
0275     Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
0276     TWDEBUG(3, "WeaverImpl::dequeue: dequeueing all jobs.\n");
0277     for (int index = 0; index < d()->assignments.size(); ++index) {
0278         d()->assignments.at(index)->aboutToBeDequeued(this);
0279     }
0280     d()->assignments.clear();
0281     ENSURE(d()->assignments.isEmpty());
0282 }
0283 
0284 void Weaver::finish()
0285 {
0286     QMutexLocker l(d()->mutex);
0287     Q_UNUSED(l);
0288     state()->finish();
0289 }
0290 
0291 void Weaver::finish_p()
0292 {
0293     Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
0294 #ifdef QT_NO_DEBUG
0295     const int MaxWaitMilliSeconds = 50;
0296 #else
0297     const int MaxWaitMilliSeconds = 500;
0298 #endif
0299     while (!isIdle_p()) {
0300         Q_ASSERT_X(state()->stateId() == WorkingHard, Q_FUNC_INFO, qPrintable(state()->stateName()));
0301         TWDEBUG(2, "WeaverImpl::finish: not done, waiting.\n");
0302         if (d()->jobFinished.wait(d()->mutex, QDeadlineTimer(MaxWaitMilliSeconds)) == false) {
0303             TWDEBUG(2, "WeaverImpl::finish: wait timed out, %i jobs left, waking threads.\n", queueLength_p());
0304             reschedule();
0305         }
0306     }
0307     TWDEBUG(2, "WeaverImpl::finish: done.\n\n\n");
0308 }
0309 
0310 void Weaver::suspend()
0311 {
0312     // FIXME?
0313     // QMutexLocker l(m_mutex); Q_UNUSED(l);
0314     state()->suspend();
0315 }
0316 
0317 void Weaver::suspend_p()
0318 {
0319     // FIXME ?
0320 }
0321 
0322 void Weaver::resume()
0323 {
0324     // FIXME?
0325     // QMutexLocker l(m_mutex); Q_UNUSED(l);
0326     state()->resume();
0327 }
0328 
0329 void Weaver::resume_p()
0330 {
0331     // FIXME ?
0332 }
0333 
0334 bool Weaver::isEmpty() const
0335 {
0336     QMutexLocker l(d()->mutex);
0337     Q_UNUSED(l);
0338     return state()->isEmpty();
0339 }
0340 
0341 bool Weaver::isEmpty_p() const
0342 {
0343     Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
0344     return d()->assignments.isEmpty();
0345 }
0346 
0347 bool Weaver::isIdle() const
0348 {
0349     QMutexLocker l(d()->mutex);
0350     Q_UNUSED(l);
0351     return state()->isIdle();
0352 }
0353 
0354 bool Weaver::isIdle_p() const
0355 {
0356     Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
0357     return isEmpty_p() && d()->active == 0;
0358 }
0359 
0360 int Weaver::queueLength() const
0361 {
0362     QMutexLocker l(d()->mutex);
0363     Q_UNUSED(l);
0364     return state()->queueLength();
0365 }
0366 
0367 int Weaver::queueLength_p() const
0368 {
0369     Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
0370     return d()->assignments.count();
0371 }
0372 
0373 void Weaver::requestAbort()
0374 {
0375     QMutexLocker l(d()->mutex);
0376     Q_UNUSED(l);
0377     return state()->requestAbort();
0378 }
0379 
0380 void Weaver::reschedule()
0381 {
0382     d()->jobAvailable.wakeAll();
0383 }
0384 
0385 void Weaver::requestAbort_p()
0386 {
0387     Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
0388     for (int i = 0; i < d()->inventory.size(); ++i) {
0389         d()->inventory[i]->requestAbort();
0390     }
0391 }
0392 
0393 /** @brief Adjust the inventory size.
0394  *
0395  * Requires that the mutex is being held when called.
0396  *
0397  * This method creates threads on demand. Threads in the inventory
0398  * are not created upon construction of the WeaverImpl object, but
0399  * when jobs are queued. This avoids costly delays on the application
0400  * startup time. Threads are created when the inventory size is under
0401  * inventoryMin and new jobs are queued.
0402  */
0403 void Weaver::adjustInventory(int numberOfNewJobs)
0404 {
0405     Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
0406     // number of threads that can be created:
0407     const int reserve = d()->inventoryMax - d()->inventory.count();
0408 
0409     if (reserve > 0) {
0410         for (int i = 0; i < qMin(reserve, numberOfNewJobs); ++i) {
0411             Thread *th = createThread();
0412             th->moveToThread(th); // be sane from the start
0413             d()->inventory.append(th);
0414             th->start();
0415             d()->createdThreads.ref();
0416             TWDEBUG(2,
0417                     "WeaverImpl::adjustInventory: thread created, "
0418                     "%i threads in inventory.\n",
0419                     currentNumberOfThreads_p());
0420         }
0421     }
0422 }
0423 
0424 Private::Weaver_Private *Weaver::d()
0425 {
0426     return reinterpret_cast<Private::Weaver_Private *>(QueueSignals::d());
0427 }
0428 
0429 const Private::Weaver_Private *Weaver::d() const
0430 {
0431     return reinterpret_cast<const Private::Weaver_Private *>(QueueSignals::d());
0432 }
0433 
0434 /** @brief Factory method to create the threads.
0435  *
0436  * Overload in adapted Weaver implementations.
0437  */
0438 Thread *Weaver::createThread()
0439 {
0440     return new Thread(this);
0441 }
0442 
0443 /** @brief Increment the count of active threads. */
0444 void Weaver::incActiveThreadCount()
0445 {
0446     adjustActiveThreadCount(1);
0447 }
0448 
0449 /** brief Decrement the count of active threads. */
0450 void Weaver::decActiveThreadCount()
0451 {
0452     adjustActiveThreadCount(-1);
0453     // the done job could have freed another set of jobs, and we do not know how
0454     // many - therefore we need to wake all threads:
0455     d()->jobFinished.wakeAll();
0456 }
0457 
0458 /** @brief Adjust active thread count.
0459  *
0460  * This is a helper function for incActiveThreadCount and decActiveThreadCount.
0461  */
0462 void Weaver::adjustActiveThreadCount(int diff)
0463 {
0464     Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
0465     d()->active += diff;
0466     TWDEBUG(4,
0467             "WeaverImpl::adjustActiveThreadCount: %i active threads (%i jobs"
0468             " in queue).\n",
0469             d()->active,
0470             queueLength_p());
0471 
0472     if (d()->assignments.isEmpty() && d()->active == 0) {
0473         P_ASSERT(diff < 0); // cannot reach zero otherwise
0474         Q_EMIT(finished());
0475     }
0476 }
0477 
0478 /** @brief Returns the number of active threads.
0479  *
0480  * Threads are active if they process a job. Requires that the mutex is being held when called.
0481  */
0482 int Weaver::activeThreadCount()
0483 {
0484     Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
0485     return d()->active;
0486 }
0487 
0488 /** @brief Called from a new thread when entering the run method. */
0489 void Weaver::threadEnteredRun(Thread *thread)
0490 {
0491     d()->semaphore.release(1);
0492     Q_EMIT threadStarted(thread);
0493 }
0494 
/** @brief Take the first available job out of the queue and return it.
 *
 * The job will be removed from the queue (therefore, take). Only jobs for which canBeExecuted() returns true
 * are considered available. If no such job is in the queue, this method blocks on the jobAvailable wait
 * condition and then returns a null JobPointer.
 *
 * This method will enter suspended state if the active thread count is now zero, the current state is
 * Suspending and suspendIfInactive is true.
 * If justReturning is true, do not assign a new job, just process the completed previous one.
 *
 * @param th the calling thread
 * @param threadWasBusy true if the thread returns from processing a job; the active thread count is
 *        decremented in that case
 * @param suspendIfInactive true if the Suspended state may be entered from here
 * @param justReturning true if no new job is to be assigned
 * @return the job assigned to the thread, or a null JobPointer
 * @throws AbortThread if the inventory is larger than the configured maximum; the calling thread is moved
 *         to the list of expired threads before
 */
JobPointer Weaver::takeFirstAvailableJobOrSuspendOrWait(Thread *th, bool threadWasBusy, bool suspendIfInactive, bool justReturning)
{
    QMutexLocker l(d()->mutex);
    Q_UNUSED(l);
    Q_ASSERT(threadWasBusy == false || (threadWasBusy == true && d()->active > 0));
    TWDEBUG(3, "WeaverImpl::takeFirstAvailableJobOrWait: trying to assign new job to thread %i (%s state).\n", th->id(), qPrintable(state()->stateName()));
    TWDEBUG(5,
            "WeaverImpl::takeFirstAvailableJobOrWait: %i active threads, was busy: %s, suspend: %s, assign new job: %s.\n",
            activeThreadCount(),
            threadWasBusy ? "yes" : "no",
            suspendIfInactive ? "yes" : "no",
            !justReturning ? "yes" : "no");
    d()->deleteExpiredThreads();
    // every request for work is also an opportunity to add one more thread:
    adjustInventory(1);

    if (threadWasBusy) {
        // cleanup and send events:
        decActiveThreadCount();
    }
    Q_ASSERT(d()->active >= 0);

    // the last active thread completes the transition from Suspending to Suspended:
    if (suspendIfInactive && d()->active == 0 && state()->stateId() == Suspending) {
        setState_p(Suspended);
        return JobPointer();
    }

    // jobs are only handed out in WorkingHard state:
    if (state()->stateId() != WorkingHard || justReturning) {
        return JobPointer();
    }

    // the inventory is larger than allowed: retire the calling thread
    if (state()->stateId() == WorkingHard && d()->inventory.size() > d()->inventoryMax) {
        const int count = d()->inventory.removeAll(th);
        Q_ASSERT(count == 1);
        d()->expiredThreads.append(th);
        throw AbortThread(QStringLiteral("Inventory size exceeded"));
    }

    // pick the first queued job that can be executed right now:
    JobPointer next;
    for (int index = 0; index < d()->assignments.size(); ++index) {
        const JobPointer &candidate = d()->assignments.at(index);
        if (d()->canBeExecuted(candidate)) {
            // copy the pointer before the list entry the reference points to is removed
            next = candidate;
            d()->assignments.removeAt(index);
            break;
        }
    }
    if (next) {
        incActiveThreadCount();
        TWDEBUG(3,
                "WeaverImpl::takeFirstAvailableJobOrWait: job %p assigned to thread %i (%s state).\n",
                next.data(),
                th->id(),
                qPrintable(state()->stateName()));
        return next;
    }

    // nothing to do at the moment: wait to be woken, then report "no job"
    blockThreadUntilJobsAreBeingAssigned_locked(th);
    return JobPointer();
}
0564 
0565 /** @brief Assign a job to the calling thread.
0566  *
0567  * This is supposed to be called from the Thread objects in the inventory. Do not call this method from
0568  * your code.
0569  * Returns 0 if the weaver is shutting down, telling the calling thread to finish and exit. If no jobs are
0570  * available and shut down is not in progress, the calling thread is suspended until either condition is met.
0571  * @param wasBusy True if the thread is returning from processing a job
0572  */
0573 JobPointer Weaver::applyForWork(Thread *th, bool wasBusy)
0574 {
0575     return state()->applyForWork(th, wasBusy);
0576 }
0577 
0578 /** @brief Wait for a job to become available. */
0579 void Weaver::waitForAvailableJob(Thread *th)
0580 {
0581     state()->waitForAvailableJob(th);
0582 }
0583 
0584 /** @brief Blocks the calling thread until jobs can be assigned. */
0585 void Weaver::blockThreadUntilJobsAreBeingAssigned(Thread *th)
0586 {
0587     QMutexLocker l(d()->mutex);
0588     Q_UNUSED(l);
0589     blockThreadUntilJobsAreBeingAssigned_locked(th);
0590 }
0591 
0592 /** @brief Blocks the calling thread until jobs can be assigned.
0593  *
0594  * The mutex must be held when calling this method.
0595  */
0596 void Weaver::blockThreadUntilJobsAreBeingAssigned_locked(Thread *th)
0597 {
0598     Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
0599     TWDEBUG(4, "WeaverImpl::blockThreadUntilJobsAreBeingAssigned_locked: thread %i blocked (%s state).\n", th->id(), qPrintable(state()->stateName()));
0600     Q_EMIT threadSuspended(th);
0601     d()->jobAvailable.wait(d()->mutex);
0602     TWDEBUG(4, "WeaverImpl::blockThreadUntilJobsAreBeingAssigned_locked: thread %i resumed  (%s state).\n", th->id(), qPrintable(state()->stateName()));
0603 }
0604 
0605 #include "moc_weaver.cpp"