File indexing completed on 2025-03-09 03:56:11

0001 /* ============================================================
0002  *
0003  * This file is a part of digiKam project
0004  * https://www.digikam.org
0005  *
0006  * Date        : 2011-12-28
0007  * Description : Low level threads management for batch processing on multi-core
0008  *
0009  * SPDX-FileCopyrightText: 2011-2024 by Gilles Caulier <caulier dot gilles at gmail dot com>
0010  * SPDX-FileCopyrightText:      2014 by Veaceslav Munteanu <veaceslav dot munteanu90 at gmail dot com>
0011  * SPDX-FileCopyrightText: 2011-2012 by Janardhan Reddy <annapareddyjanardhanreddy at gmail dot com>
0012  *
0013  * SPDX-License-Identifier: GPL-2.0-or-later
0014  *
0015  * ============================================================ */
0016 
#include "actionthreadbase.h"

// C++ includes

#include <atomic>

// Qt includes

#include <QMutexLocker>
#include <QWaitCondition>
#include <QMutex>
#include <QThreadPool>

// Local includes

#include "digikam_debug.h"
0029 
0030 namespace Digikam
0031 {
0032 
0033 ActionJob::ActionJob(QObject* const parent)
0034     : QObject  (parent),
0035       QRunnable(),
0036       m_cancel (false)
0037 {
0038     setAutoDelete(false);
0039 }
0040 
0041 ActionJob::~ActionJob()
0042 {
0043     cancel();
0044 }
0045 
0046 void ActionJob::cancel()
0047 {
0048     m_cancel = true;
0049 }
0050 
0051 // -----------------------------------------------------------------
0052 
0053 class Q_DECL_HIDDEN ActionThreadBase::Private
0054 {
0055 public:
0056 
0057     explicit Private()
0058       : running(false),
0059         pool   (nullptr)
0060     {
0061     }
0062 
0063     volatile bool       running;
0064 
0065     QWaitCondition      condVarJobs;
0066     QMutex              mutex;
0067 
0068     ActionJobCollection todo;
0069     ActionJobCollection pending;
0070     ActionJobCollection processed;
0071 
0072     QThreadPool*        pool;
0073 };
0074 
0075 ActionThreadBase::ActionThreadBase(QObject* const parent)
0076     : QThread(parent),
0077       d      (new Private)
0078 {
0079     d->pool = new QThreadPool(this);
0080 
0081     setDefaultMaximumNumberOfThreads();
0082 }
0083 
0084 ActionThreadBase::~ActionThreadBase()
0085 {
0086     // Cancel the thread
0087 
0088     cancel(false);
0089 
0090     // Wait for the thread to finish
0091 
0092     wait();
0093 
0094     // Wait for the jobs to finish
0095 
0096     d->pool->waitForDone();
0097 
0098     // Cleanup all jobs from memory
0099 
0100     Q_FOREACH (ActionJob* const job, d->processed.keys())
0101     {
0102         delete job;
0103     }
0104 
0105     delete d;
0106 }
0107 
0108 void ActionThreadBase::setMaximumNumberOfThreads(int n)
0109 {
0110     d->pool->setMaxThreadCount(n);
0111 
0112     qCDebug(DIGIKAM_GENERAL_LOG) << "Using " << n << " CPU core to run threads";
0113 }
0114 
0115 int ActionThreadBase::maximumNumberOfThreads() const
0116 {
0117     return d->pool->maxThreadCount();
0118 }
0119 
0120 void ActionThreadBase::setDefaultMaximumNumberOfThreads()
0121 {
0122     setMaximumNumberOfThreads(QThread::idealThreadCount());
0123 }
0124 
0125 void ActionThreadBase::slotJobFinished()
0126 {
0127     ActionJob* const job = dynamic_cast<ActionJob*>(sender());
0128 
0129     if (!job)
0130     {
0131         return;
0132     }
0133 
0134     qCDebug(DIGIKAM_GENERAL_LOG) << "One job is done " << job
0135                                  << " time:" << job->m_timer.elapsed();
0136 
0137     QMutexLocker lock(&d->mutex);
0138 
0139     d->processed.insert(job, 0);
0140     d->pending.remove(job);
0141 
0142     if (isEmpty())
0143     {
0144         d->running = false;
0145     }
0146 
0147     d->condVarJobs.wakeAll();
0148 }
0149 
0150 void ActionThreadBase::cancel(bool isCancel)
0151 {
0152     if (isCancel)
0153     {
0154         qCDebug(DIGIKAM_GENERAL_LOG) << "Cancel Main Thread";
0155     }
0156     else
0157     {
0158         qCDebug(DIGIKAM_GENERAL_LOG) << "Finish Main Thread";
0159     }
0160 
0161     QMutexLocker lock(&d->mutex);
0162 
0163     Q_FOREACH (ActionJob* const job, d->todo.keys())
0164     {
0165         delete job;
0166     }
0167 
0168     Q_FOREACH (ActionJob* const job, d->pending.keys())
0169     {
0170         job->cancel();
0171         d->processed.insert(job, 0);
0172     }
0173 
0174     d->todo.clear();
0175     d->pending.clear();
0176     d->running = false;
0177 
0178     d->condVarJobs.wakeAll();
0179 }
0180 
0181 bool ActionThreadBase::isEmpty() const
0182 {
0183     return (d->pending.isEmpty());
0184 }
0185 
0186 int ActionThreadBase::pendingCount() const
0187 {
0188     return (d->pending.count());
0189 }
0190 
0191 void ActionThreadBase::appendJobs(const ActionJobCollection& jobs)
0192 {
0193     QMutexLocker lock(&d->mutex);
0194 
0195     for (ActionJobCollection::const_iterator it = jobs.begin() ; it != jobs.end() ; ++it)
0196     {
0197         d->todo.insert(it.key(), it.value());
0198     }
0199 
0200     d->condVarJobs.wakeAll();
0201 }
0202 
0203 void ActionThreadBase::run()
0204 {
0205     d->running = true;
0206 
0207     while (d->running)
0208     {
0209         QMutexLocker lock(&d->mutex);
0210 
0211         if (!d->todo.isEmpty())
0212         {
0213             qCDebug(DIGIKAM_GENERAL_LOG) << "Action Thread run" << d->todo.count() << "new jobs";
0214 
0215             for (ActionJobCollection::iterator it = d->todo.begin() ; it != d->todo.end() ; ++it)
0216             {
0217                 ActionJob* const job = it.key();
0218                 int priority         = it.value();
0219 
0220                 connect(job, SIGNAL(signalDone()),
0221                         this, SLOT(slotJobFinished()));
0222 
0223                 job->m_timer.start();
0224                 d->pool->start(job, priority);
0225                 d->pending.insert(job, priority);
0226             }
0227 
0228             d->todo.clear();
0229         }
0230         else
0231         {
0232             d->condVarJobs.wait(&d->mutex);
0233         }
0234     }
0235 }
0236 
0237 } // namespace Digikam
0238 
0239 #include "moc_actionthreadbase.cpp"