// File indexing completed on 2025-10-26 03:48:26
0001 /* -*- C++ -*- 0002 This file implements the WeaverImpl class. 0003 0004 SPDX-FileCopyrightText: 2005-2013 Mirko Boehm <mirko@kde.org> 0005 0006 SPDX-License-Identifier: LGPL-2.0-or-later 0007 0008 $Id: WeaverImpl.cpp 30 2005-08-16 16:16:04Z mirko $ 0009 */ 0010 0011 #include "weaver.h" 0012 0013 #include <QCoreApplication> 0014 #include <QDebug> 0015 #include <QMutex> 0016 #include <QDeadlineTimer> 0017 #include "debuggingaids.h" 0018 #include "destructedstate.h" 0019 #include "exception.h" 0020 #include "inconstructionstate.h" 0021 #include "job.h" 0022 #include "managedjobpointer.h" 0023 #include "queuepolicy.h" 0024 #include "shuttingdownstate.h" 0025 #include "state.h" 0026 #include "suspendedstate.h" 0027 #include "suspendingstate.h" 0028 #include "thread.h" 0029 #include "threadweaver.h" 0030 #include "weaver_p.h" 0031 #include "workinghardstate.h" 0032 0033 using namespace ThreadWeaver; 0034 0035 /** @brief Constructs a Weaver object. */ 0036 Weaver::Weaver(QObject *parent) 0037 : QueueAPI(new Private::Weaver_Private(), parent) 0038 { 0039 qRegisterMetaType<ThreadWeaver::JobPointer>("ThreadWeaver::JobPointer"); 0040 QMutexLocker l(d()->mutex); 0041 Q_UNUSED(l); 0042 // initialize state objects: 0043 d()->states[InConstruction] = QSharedPointer<State>(new InConstructionState(this)); 0044 setState_p(InConstruction); 0045 d()->states[WorkingHard] = QSharedPointer<State>(new WorkingHardState(this)); 0046 d()->states[Suspending] = QSharedPointer<State>(new SuspendingState(this)); 0047 d()->states[Suspended] = QSharedPointer<State>(new SuspendedState(this)); 0048 d()->states[ShuttingDown] = QSharedPointer<State>(new ShuttingDownState(this)); 0049 d()->states[Destructed] = QSharedPointer<State>(new DestructedState(this)); 0050 0051 setState_p(WorkingHard); 0052 } 0053 0054 /** @brief Destructs a Weaver object. 
*/ 0055 Weaver::~Weaver() 0056 { 0057 Q_ASSERT_X(state()->stateId() == Destructed, Q_FUNC_INFO, "shutDown() method was not called before Weaver destructor!"); 0058 } 0059 0060 /** @brief Enter Destructed state. 0061 * 0062 * Once this method returns, it is save to delete this object. 0063 */ 0064 void Weaver::shutDown() 0065 { 0066 state()->shutDown(); 0067 } 0068 0069 void Weaver::shutDown_p() 0070 { 0071 // the constructor may only be called from the thread that owns this 0072 // object (everything else would be what we professionals call "insane") 0073 0074 REQUIRE(QThread::currentThread() == thread()); 0075 TWDEBUG(3, "WeaverImpl::shutDown: destroying inventory.\n"); 0076 d()->semaphore.acquire(d()->createdThreads.loadAcquire()); 0077 finish(); 0078 suspend(); 0079 setState(ShuttingDown); 0080 reschedule(); 0081 d()->jobFinished.wakeAll(); 0082 0083 // problem: Some threads might not be asleep yet, just finding 0084 // out if a job is available. Those threads will suspend 0085 // waiting for their next job (a rare case, but not impossible). 0086 // Therefore, if we encounter a thread that has not exited, we 0087 // have to wake it again (which we do in the following for 0088 // loop). 0089 0090 for (;;) { 0091 Thread *th = nullptr; 0092 { 0093 QMutexLocker l(d()->mutex); 0094 Q_UNUSED(l); 0095 if (d()->inventory.isEmpty()) { 0096 break; 0097 } 0098 th = d()->inventory.takeFirst(); 0099 } 0100 if (!th->isFinished()) { 0101 for (;;) { 0102 Q_ASSERT(state()->stateId() == ShuttingDown); 0103 reschedule(); 0104 if (th->wait(100)) { 0105 break; 0106 } 0107 TWDEBUG(1, 0108 "WeaverImpl::shutDown: thread %i did not exit as expected, " 0109 "retrying.\n", 0110 th->id()); 0111 } 0112 } 0113 Q_EMIT(threadExited(th)); 0114 delete th; 0115 } 0116 Q_ASSERT(d()->inventory.isEmpty()); 0117 TWDEBUG(3, "WeaverImpl::shutDown: done\n"); 0118 setState(Destructed); // Destructed ignores all calls into the queue API 0119 } 0120 0121 /** @brief Set the Weaver state. 
0122 * @see StateId 0123 * @see WeaverImplState 0124 * @see State 0125 */ 0126 void Weaver::setState(StateId id) 0127 { 0128 QMutexLocker l(d()->mutex); 0129 Q_UNUSED(l); 0130 setState_p(id); 0131 } 0132 0133 void Weaver::setState_p(StateId id) 0134 { 0135 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called 0136 State *newState = d()->states[id].data(); 0137 State *previous = d()->state.fetchAndStoreOrdered(newState); 0138 if (previous == nullptr || previous->stateId() != id) { 0139 newState->activated(); 0140 TWDEBUG(2, "WeaverImpl::setState: state changed to \"%s\".\n", newState->stateName().toLatin1().constData()); 0141 if (id == Suspended) { 0142 Q_EMIT(suspended()); 0143 } 0144 Q_EMIT(stateChanged(newState)); 0145 } 0146 } 0147 0148 const State *Weaver::state() const 0149 { 0150 return d()->state.loadAcquire(); 0151 } 0152 0153 State *Weaver::state() 0154 { 0155 return d()->state.loadAcquire(); 0156 } 0157 0158 void Weaver::setMaximumNumberOfThreads(int cap) 0159 { 0160 Q_ASSERT_X(cap >= 0, "Weaver Impl", "Thread inventory size has to be larger than or equal to zero."); 0161 QMutexLocker l(d()->mutex); 0162 Q_UNUSED(l); 0163 state()->setMaximumNumberOfThreads(cap); 0164 reschedule(); 0165 } 0166 0167 void Weaver::setMaximumNumberOfThreads_p(int cap) 0168 { 0169 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called 0170 const bool createInitialThread = (d()->inventoryMax == 0 && cap > 0); 0171 d()->inventoryMax = cap; 0172 if (createInitialThread) { 0173 adjustInventory(1); 0174 } 0175 } 0176 0177 int Weaver::maximumNumberOfThreads() const 0178 { 0179 QMutexLocker l(d()->mutex); 0180 Q_UNUSED(l); 0181 return state()->maximumNumberOfThreads(); 0182 } 0183 0184 int Weaver::maximumNumberOfThreads_p() const 0185 { 0186 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called 0187 return d()->inventoryMax; 0188 } 0189 0190 int Weaver::currentNumberOfThreads() const 0191 { 
0192 QMutexLocker l(d()->mutex); 0193 Q_UNUSED(l); 0194 return state()->currentNumberOfThreads(); 0195 } 0196 0197 int Weaver::currentNumberOfThreads_p() const 0198 { 0199 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called 0200 return d()->inventory.count(); 0201 } 0202 0203 void Weaver::enqueue(const QList<JobPointer> &jobs) 0204 { 0205 QMutexLocker l(d()->mutex); 0206 Q_UNUSED(l); 0207 state()->enqueue(jobs); 0208 } 0209 0210 void Weaver::enqueue_p(const QList<JobPointer> &jobs) 0211 { 0212 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called 0213 if (jobs.isEmpty()) { 0214 return; 0215 } 0216 for (const JobPointer &job : jobs) { 0217 if (job) { 0218 Q_ASSERT(job->status() == Job::Status_New); 0219 adjustInventory(jobs.size()); 0220 TWDEBUG(3, "WeaverImpl::enqueue: queueing job %p.\n", (void *)job.data()); 0221 job->aboutToBeQueued(this); 0222 // find position for insertion: 0223 int i = d()->assignments.size(); 0224 if (i > 0) { 0225 while (i > 0 && d()->assignments.at(i - 1)->priority() < job->priority()) { 0226 --i; 0227 } 0228 d()->assignments.insert(i, job); 0229 } else { 0230 d()->assignments.append(job); 0231 } 0232 job->setStatus(Job::Status_Queued); 0233 reschedule(); 0234 } 0235 } 0236 } 0237 0238 bool Weaver::dequeue(const JobPointer &job) 0239 { 0240 QMutexLocker l(d()->mutex); 0241 Q_UNUSED(l); 0242 return state()->dequeue(job); 0243 } 0244 0245 bool Weaver::dequeue_p(JobPointer job) 0246 { 0247 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called 0248 int position = d()->assignments.indexOf(job); 0249 if (position != -1) { 0250 job->aboutToBeDequeued(this); 0251 int newPosition = d()->assignments.indexOf(job); 0252 JobPointer job = d()->assignments.takeAt(newPosition); 0253 job->setStatus(Job::Status_New); 0254 Q_ASSERT(!d()->assignments.contains(job)); 0255 TWDEBUG(3, "WeaverImpl::dequeue: job %p dequeued, %i jobs left.\n", (void *)job.data(), 
queueLength_p()); 0256 // from the queues point of view, a job is just as finished if it gets dequeued: 0257 d()->jobFinished.wakeAll(); 0258 Q_ASSERT(!d()->assignments.contains(job)); 0259 return true; 0260 } else { 0261 TWDEBUG(3, "WeaverImpl::dequeue: job %p not found in queue.\n", (void *)job.data()); 0262 return false; 0263 } 0264 } 0265 0266 void Weaver::dequeue() 0267 { 0268 QMutexLocker l(d()->mutex); 0269 Q_UNUSED(l); 0270 state()->dequeue(); 0271 } 0272 0273 void Weaver::dequeue_p() 0274 { 0275 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called 0276 TWDEBUG(3, "WeaverImpl::dequeue: dequeueing all jobs.\n"); 0277 for (int index = 0; index < d()->assignments.size(); ++index) { 0278 d()->assignments.at(index)->aboutToBeDequeued(this); 0279 } 0280 d()->assignments.clear(); 0281 ENSURE(d()->assignments.isEmpty()); 0282 } 0283 0284 void Weaver::finish() 0285 { 0286 QMutexLocker l(d()->mutex); 0287 Q_UNUSED(l); 0288 state()->finish(); 0289 } 0290 0291 void Weaver::finish_p() 0292 { 0293 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called 0294 #ifdef QT_NO_DEBUG 0295 const int MaxWaitMilliSeconds = 50; 0296 #else 0297 const int MaxWaitMilliSeconds = 500; 0298 #endif 0299 while (!isIdle_p()) { 0300 Q_ASSERT_X(state()->stateId() == WorkingHard, Q_FUNC_INFO, qPrintable(state()->stateName())); 0301 TWDEBUG(2, "WeaverImpl::finish: not done, waiting.\n"); 0302 if (d()->jobFinished.wait(d()->mutex, QDeadlineTimer(MaxWaitMilliSeconds)) == false) { 0303 TWDEBUG(2, "WeaverImpl::finish: wait timed out, %i jobs left, waking threads.\n", queueLength_p()); 0304 reschedule(); 0305 } 0306 } 0307 TWDEBUG(2, "WeaverImpl::finish: done.\n\n\n"); 0308 } 0309 0310 void Weaver::suspend() 0311 { 0312 // FIXME? 0313 // QMutexLocker l(m_mutex); Q_UNUSED(l); 0314 state()->suspend(); 0315 } 0316 0317 void Weaver::suspend_p() 0318 { 0319 // FIXME ? 0320 } 0321 0322 void Weaver::resume() 0323 { 0324 // FIXME? 
0325 // QMutexLocker l(m_mutex); Q_UNUSED(l); 0326 state()->resume(); 0327 } 0328 0329 void Weaver::resume_p() 0330 { 0331 // FIXME ? 0332 } 0333 0334 bool Weaver::isEmpty() const 0335 { 0336 QMutexLocker l(d()->mutex); 0337 Q_UNUSED(l); 0338 return state()->isEmpty(); 0339 } 0340 0341 bool Weaver::isEmpty_p() const 0342 { 0343 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called 0344 return d()->assignments.isEmpty(); 0345 } 0346 0347 bool Weaver::isIdle() const 0348 { 0349 QMutexLocker l(d()->mutex); 0350 Q_UNUSED(l); 0351 return state()->isIdle(); 0352 } 0353 0354 bool Weaver::isIdle_p() const 0355 { 0356 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called 0357 return isEmpty_p() && d()->active == 0; 0358 } 0359 0360 int Weaver::queueLength() const 0361 { 0362 QMutexLocker l(d()->mutex); 0363 Q_UNUSED(l); 0364 return state()->queueLength(); 0365 } 0366 0367 int Weaver::queueLength_p() const 0368 { 0369 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called 0370 return d()->assignments.count(); 0371 } 0372 0373 void Weaver::requestAbort() 0374 { 0375 QMutexLocker l(d()->mutex); 0376 Q_UNUSED(l); 0377 return state()->requestAbort(); 0378 } 0379 0380 void Weaver::reschedule() 0381 { 0382 d()->jobAvailable.wakeAll(); 0383 } 0384 0385 void Weaver::requestAbort_p() 0386 { 0387 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called 0388 for (int i = 0; i < d()->inventory.size(); ++i) { 0389 d()->inventory[i]->requestAbort(); 0390 } 0391 } 0392 0393 /** @brief Adjust the inventory size. 0394 * 0395 * Requires that the mutex is being held when called. 0396 * 0397 * This method creates threads on demand. Threads in the inventory 0398 * are not created upon construction of the WeaverImpl object, but 0399 * when jobs are queued. This avoids costly delays on the application 0400 * startup time. 
Threads are created when the inventory size is under 0401 * inventoryMin and new jobs are queued. 0402 */ 0403 void Weaver::adjustInventory(int numberOfNewJobs) 0404 { 0405 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called 0406 // number of threads that can be created: 0407 const int reserve = d()->inventoryMax - d()->inventory.count(); 0408 0409 if (reserve > 0) { 0410 for (int i = 0; i < qMin(reserve, numberOfNewJobs); ++i) { 0411 Thread *th = createThread(); 0412 th->moveToThread(th); // be sane from the start 0413 d()->inventory.append(th); 0414 th->start(); 0415 d()->createdThreads.ref(); 0416 TWDEBUG(2, 0417 "WeaverImpl::adjustInventory: thread created, " 0418 "%i threads in inventory.\n", 0419 currentNumberOfThreads_p()); 0420 } 0421 } 0422 } 0423 0424 Private::Weaver_Private *Weaver::d() 0425 { 0426 return reinterpret_cast<Private::Weaver_Private *>(QueueSignals::d()); 0427 } 0428 0429 const Private::Weaver_Private *Weaver::d() const 0430 { 0431 return reinterpret_cast<const Private::Weaver_Private *>(QueueSignals::d()); 0432 } 0433 0434 /** @brief Factory method to create the threads. 0435 * 0436 * Overload in adapted Weaver implementations. 0437 */ 0438 Thread *Weaver::createThread() 0439 { 0440 return new Thread(this); 0441 } 0442 0443 /** @brief Increment the count of active threads. */ 0444 void Weaver::incActiveThreadCount() 0445 { 0446 adjustActiveThreadCount(1); 0447 } 0448 0449 /** brief Decrement the count of active threads. */ 0450 void Weaver::decActiveThreadCount() 0451 { 0452 adjustActiveThreadCount(-1); 0453 // the done job could have freed another set of jobs, and we do not know how 0454 // many - therefore we need to wake all threads: 0455 d()->jobFinished.wakeAll(); 0456 } 0457 0458 /** @brief Adjust active thread count. 0459 * 0460 * This is a helper function for incActiveThreadCount and decActiveThreadCount. 
0461 */ 0462 void Weaver::adjustActiveThreadCount(int diff) 0463 { 0464 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called 0465 d()->active += diff; 0466 TWDEBUG(4, 0467 "WeaverImpl::adjustActiveThreadCount: %i active threads (%i jobs" 0468 " in queue).\n", 0469 d()->active, 0470 queueLength_p()); 0471 0472 if (d()->assignments.isEmpty() && d()->active == 0) { 0473 P_ASSERT(diff < 0); // cannot reach zero otherwise 0474 Q_EMIT(finished()); 0475 } 0476 } 0477 0478 /** @brief Returns the number of active threads. 0479 * 0480 * Threads are active if they process a job. Requires that the mutex is being held when called. 0481 */ 0482 int Weaver::activeThreadCount() 0483 { 0484 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called 0485 return d()->active; 0486 } 0487 0488 /** @brief Called from a new thread when entering the run method. */ 0489 void Weaver::threadEnteredRun(Thread *thread) 0490 { 0491 d()->semaphore.release(1); 0492 Q_EMIT threadStarted(thread); 0493 } 0494 0495 /** @brief Take the first available job out of the queue and return it. 0496 * 0497 * The job will be removed from the queue (therefore, take). Only jobs that have no unresolved dependencies 0498 * are considered available. If only jobs that depend on other unfinished jobs are in the queue, this method 0499 * blocks on m_jobAvailable. 0500 * 0501 * This method will enter suspended state if the active thread count is now zero and 0502 * suspendIfAllThreadsInactive is true. 0503 * If justReturning is true, do not assign a new job, just process the completed previous one. 
0504 */ 0505 JobPointer Weaver::takeFirstAvailableJobOrSuspendOrWait(Thread *th, bool threadWasBusy, bool suspendIfInactive, bool justReturning) 0506 { 0507 QMutexLocker l(d()->mutex); 0508 Q_UNUSED(l); 0509 Q_ASSERT(threadWasBusy == false || (threadWasBusy == true && d()->active > 0)); 0510 TWDEBUG(3, "WeaverImpl::takeFirstAvailableJobOrWait: trying to assign new job to thread %i (%s state).\n", th->id(), qPrintable(state()->stateName())); 0511 TWDEBUG(5, 0512 "WeaverImpl::takeFirstAvailableJobOrWait: %i active threads, was busy: %s, suspend: %s, assign new job: %s.\n", 0513 activeThreadCount(), 0514 threadWasBusy ? "yes" : "no", 0515 suspendIfInactive ? "yes" : "no", 0516 !justReturning ? "yes" : "no"); 0517 d()->deleteExpiredThreads(); 0518 adjustInventory(1); 0519 0520 if (threadWasBusy) { 0521 // cleanup and send events: 0522 decActiveThreadCount(); 0523 } 0524 Q_ASSERT(d()->active >= 0); 0525 0526 if (suspendIfInactive && d()->active == 0 && state()->stateId() == Suspending) { 0527 setState_p(Suspended); 0528 return JobPointer(); 0529 } 0530 0531 if (state()->stateId() != WorkingHard || justReturning) { 0532 return JobPointer(); 0533 } 0534 0535 if (state()->stateId() == WorkingHard && d()->inventory.size() > d()->inventoryMax) { 0536 const int count = d()->inventory.removeAll(th); 0537 Q_ASSERT(count == 1); 0538 d()->expiredThreads.append(th); 0539 throw AbortThread(QStringLiteral("Inventory size exceeded")); 0540 } 0541 0542 JobPointer next; 0543 for (int index = 0; index < d()->assignments.size(); ++index) { 0544 const JobPointer &candidate = d()->assignments.at(index); 0545 if (d()->canBeExecuted(candidate)) { 0546 next = candidate; 0547 d()->assignments.removeAt(index); 0548 break; 0549 } 0550 } 0551 if (next) { 0552 incActiveThreadCount(); 0553 TWDEBUG(3, 0554 "WeaverImpl::takeFirstAvailableJobOrWait: job %p assigned to thread %i (%s state).\n", 0555 next.data(), 0556 th->id(), 0557 qPrintable(state()->stateName())); 0558 return next; 0559 } 0560 0561 
blockThreadUntilJobsAreBeingAssigned_locked(th); 0562 return JobPointer(); 0563 } 0564 0565 /** @brief Assign a job to the calling thread. 0566 * 0567 * This is supposed to be called from the Thread objects in the inventory. Do not call this method from 0568 * your code. 0569 * Returns 0 if the weaver is shutting down, telling the calling thread to finish and exit. If no jobs are 0570 * available and shut down is not in progress, the calling thread is suspended until either condition is met. 0571 * @param wasBusy True if the thread is returning from processing a job 0572 */ 0573 JobPointer Weaver::applyForWork(Thread *th, bool wasBusy) 0574 { 0575 return state()->applyForWork(th, wasBusy); 0576 } 0577 0578 /** @brief Wait for a job to become available. */ 0579 void Weaver::waitForAvailableJob(Thread *th) 0580 { 0581 state()->waitForAvailableJob(th); 0582 } 0583 0584 /** @brief Blocks the calling thread until jobs can be assigned. */ 0585 void Weaver::blockThreadUntilJobsAreBeingAssigned(Thread *th) 0586 { 0587 QMutexLocker l(d()->mutex); 0588 Q_UNUSED(l); 0589 blockThreadUntilJobsAreBeingAssigned_locked(th); 0590 } 0591 0592 /** @brief Blocks the calling thread until jobs can be assigned. 0593 * 0594 * The mutex must be held when calling this method. 0595 */ 0596 void Weaver::blockThreadUntilJobsAreBeingAssigned_locked(Thread *th) 0597 { 0598 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called 0599 TWDEBUG(4, "WeaverImpl::blockThreadUntilJobsAreBeingAssigned_locked: thread %i blocked (%s state).\n", th->id(), qPrintable(state()->stateName())); 0600 Q_EMIT threadSuspended(th); 0601 d()->jobAvailable.wait(d()->mutex); 0602 TWDEBUG(4, "WeaverImpl::blockThreadUntilJobsAreBeingAssigned_locked: thread %i resumed (%s state).\n", th->id(), qPrintable(state()->stateName())); 0603 } 0604 0605 #include "moc_weaver.cpp"