File indexing completed on 2025-03-09 03:56:11
0001 /* ============================================================ 0002 * 0003 * This file is a part of digiKam project 0004 * https://www.digikam.org 0005 * 0006 * Date : 2011-12-28 0007 * Description : Low level threads management for batch processing on multi-core 0008 * 0009 * SPDX-FileCopyrightText: 2011-2024 by Gilles Caulier <caulier dot gilles at gmail dot com> 0010 * SPDX-FileCopyrightText: 2014 by Veaceslav Munteanu <veaceslav dot munteanu90 at gmail dot com> 0011 * SPDX-FileCopyrightText: 2011-2012 by Janardhan Reddy <annapareddyjanardhanreddy at gmail dot com> 0012 * 0013 * SPDX-License-Identifier: GPL-2.0-or-later 0014 * 0015 * ============================================================ */ 0016 0017 #include "actionthreadbase.h" 0018 0019 // Qt includes 0020 0021 #include <QMutexLocker> 0022 #include <QWaitCondition> 0023 #include <QMutex> 0024 #include <QThreadPool> 0025 0026 // Local includes 0027 0028 #include "digikam_debug.h" 0029 0030 namespace Digikam 0031 { 0032 0033 ActionJob::ActionJob(QObject* const parent) 0034 : QObject (parent), 0035 QRunnable(), 0036 m_cancel (false) 0037 { 0038 setAutoDelete(false); 0039 } 0040 0041 ActionJob::~ActionJob() 0042 { 0043 cancel(); 0044 } 0045 0046 void ActionJob::cancel() 0047 { 0048 m_cancel = true; 0049 } 0050 0051 // ----------------------------------------------------------------- 0052 0053 class Q_DECL_HIDDEN ActionThreadBase::Private 0054 { 0055 public: 0056 0057 explicit Private() 0058 : running(false), 0059 pool (nullptr) 0060 { 0061 } 0062 0063 volatile bool running; 0064 0065 QWaitCondition condVarJobs; 0066 QMutex mutex; 0067 0068 ActionJobCollection todo; 0069 ActionJobCollection pending; 0070 ActionJobCollection processed; 0071 0072 QThreadPool* pool; 0073 }; 0074 0075 ActionThreadBase::ActionThreadBase(QObject* const parent) 0076 : QThread(parent), 0077 d (new Private) 0078 { 0079 d->pool = new QThreadPool(this); 0080 0081 setDefaultMaximumNumberOfThreads(); 0082 } 0083 0084 ActionThreadBase::~ActionThreadBase() 0085 { 0086 // Cancel the thread 0087 0088 cancel(false); 0089 0090 // Wait for the thread to finish 0091 0092 wait(); 0093 0094 // Wait for the jobs to finish 0095 0096 d->pool->waitForDone(); 0097 0098 // Cleanup all jobs from memory 0099 0100 Q_FOREACH (ActionJob* const job, d->processed.keys()) 0101 { 0102 delete job; 0103 } 0104 0105 delete d; 0106 } 0107 0108 void ActionThreadBase::setMaximumNumberOfThreads(int n) 0109 { 0110 d->pool->setMaxThreadCount(n); 0111 0112 qCDebug(DIGIKAM_GENERAL_LOG) << "Using " << n << " CPU core to run threads"; 0113 } 0114 0115 int ActionThreadBase::maximumNumberOfThreads() const 0116 { 0117 return d->pool->maxThreadCount(); 0118 } 0119 0120 void ActionThreadBase::setDefaultMaximumNumberOfThreads() 0121 { 0122 setMaximumNumberOfThreads(QThread::idealThreadCount()); 0123 } 0124 0125 void ActionThreadBase::slotJobFinished() 0126 { 0127 ActionJob* const job = dynamic_cast<ActionJob*>(sender()); 0128 0129 if (!job) 0130 { 0131 return; 0132 } 0133 0134 qCDebug(DIGIKAM_GENERAL_LOG) << "One job is done " << job 0135 << " time:" << job->m_timer.elapsed(); 0136 0137 QMutexLocker lock(&d->mutex); 0138 0139 d->processed.insert(job, 0); 0140 d->pending.remove(job); 0141 0142 if (isEmpty()) 0143 { 0144 d->running = false; 0145 } 0146 0147 d->condVarJobs.wakeAll(); 0148 } 0149 0150 void ActionThreadBase::cancel(bool isCancel) 0151 { 0152 if (isCancel) 0153 { 0154 qCDebug(DIGIKAM_GENERAL_LOG) << "Cancel Main Thread"; 0155 } 0156 else 0157 { 0158 qCDebug(DIGIKAM_GENERAL_LOG) << "Finish Main Thread"; 0159 } 0160 0161 QMutexLocker lock(&d->mutex); 0162 0163 Q_FOREACH (ActionJob* const job, d->todo.keys()) 0164 { 0165 delete job; 0166 } 0167 0168 Q_FOREACH (ActionJob* const job, d->pending.keys()) 0169 { 0170 job->cancel(); 0171 d->processed.insert(job, 0); 0172 } 0173 0174 d->todo.clear(); 0175 d->pending.clear(); 0176 d->running = false; 0177 0178 d->condVarJobs.wakeAll(); 0179 } 0180 0181 bool ActionThreadBase::isEmpty() const 0182 { 0183 return (d->pending.isEmpty()); 0184 } 0185 0186 int ActionThreadBase::pendingCount() const 0187 { 0188 return (d->pending.count()); 0189 } 0190 0191 void ActionThreadBase::appendJobs(const ActionJobCollection& jobs) 0192 { 0193 QMutexLocker lock(&d->mutex); 0194 0195 for (ActionJobCollection::const_iterator it = jobs.begin() ; it != jobs.end() ; ++it) 0196 { 0197 d->todo.insert(it.key(), it.value()); 0198 } 0199 0200 d->condVarJobs.wakeAll(); 0201 } 0202 0203 void ActionThreadBase::run() 0204 { 0205 d->running = true; 0206 0207 while (d->running) 0208 { 0209 QMutexLocker lock(&d->mutex); 0210 0211 if (!d->todo.isEmpty()) 0212 { 0213 qCDebug(DIGIKAM_GENERAL_LOG) << "Action Thread run" << d->todo.count() << "new jobs"; 0214 0215 for (ActionJobCollection::iterator it = d->todo.begin() ; it != d->todo.end() ; ++it) 0216 { 0217 ActionJob* const job = it.key(); 0218 int priority = it.value(); 0219 0220 connect(job, SIGNAL(signalDone()), 0221 this, SLOT(slotJobFinished())); 0222 0223 job->m_timer.start(); 0224 d->pool->start(job, priority); 0225 d->pending.insert(job, priority); 0226 } 0227 0228 d->todo.clear(); 0229 } 0230 else 0231 { 0232 d->condVarJobs.wait(&d->mutex); 0233 } 0234 } 0235 } 0236 0237 } // namespace Digikam 0238 0239 #include "moc_actionthreadbase.cpp"