File indexing completed on 2024-05-12 15:58:48

0001 /*
0002  *  SPDX-FileCopyrightText: 2010 Dmitry Kazakov <dimula73@gmail.com>
0003  *
0004  *  SPDX-License-Identifier: GPL-2.0-or-later
0005  */
0006 
0007 #include "kis_updater_context.h"
0008 
0009 #include <QThread>
0010 #include <QThreadPool>
0011 
0012 #include "kis_update_job_item.h"
0013 #include "kis_stroke_job.h"
0014 
/**
 * Tag value (-1) for the thread count. The constructor replaces any
 * thread count <= 0 with QThread::idealThreadCount().
 */
const int KisUpdaterContext::useIdealThreadCountTag = -1;
0016 
0017 KisUpdaterContext::KisUpdaterContext(qint32 threadCount, KisUpdateScheduler *parent)
0018     : m_scheduler(parent)
0019 {
0020     if(threadCount <= 0) {
0021         threadCount = QThread::idealThreadCount();
0022         threadCount = threadCount > 0 ? threadCount : 1;
0023     }
0024 
0025     setThreadsLimit(threadCount);
0026 }
0027 
0028 KisUpdaterContext::~KisUpdaterContext()
0029 {
0030     m_threadPool.waitForDone();
0031 
0032     if (m_testingMode) {
0033         clear();
0034     }
0035 
0036     qDeleteAll(m_jobs);
0037 }
0038 
0039 void KisUpdaterContext::getJobsSnapshot(qint32 &numMergeJobs,
0040                                         qint32 &numStrokeJobs)
0041 {
0042     numMergeJobs = 0;
0043     numStrokeJobs = 0;
0044 
0045     Q_FOREACH (const KisUpdateJobItem *item, m_jobs) {
0046         if(item->type() == KisUpdateJobItem::Type::MERGE ||
0047            item->type() == KisUpdateJobItem::Type::SPONTANEOUS) {
0048             numMergeJobs++;
0049         }
0050         else if(item->type() == KisUpdateJobItem::Type::STROKE) {
0051             numStrokeJobs++;
0052         }
0053     }
0054 }
0055 
0056 KisUpdaterContextSnapshotEx KisUpdaterContext::getContextSnapshotEx() const
0057 {
0058     KisUpdaterContextSnapshotEx state = ContextEmpty;
0059 
0060     Q_FOREACH (const KisUpdateJobItem *item, m_jobs) {
0061         if (item->type() == KisUpdateJobItem::Type::MERGE ||
0062             item->type() == KisUpdateJobItem::Type::SPONTANEOUS) {
0063             state |= HasMergeJob;
0064         } else if(item->type() == KisUpdateJobItem::Type::STROKE) {
0065             switch (item->strokeJobSequentiality()) {
0066             case KisStrokeJobData::SEQUENTIAL:
0067                 state |= HasSequentialJob;
0068                 break;
0069             case KisStrokeJobData::CONCURRENT:
0070                 state |= HasConcurrentJob;
0071                 break;
0072             case KisStrokeJobData::BARRIER:
0073                 state |= HasBarrierJob;
0074                 break;
0075             case KisStrokeJobData::UNIQUELY_CONCURRENT:
0076                 state |= HasUniquelyConcurrentJob;
0077                 break;
0078             }
0079         }
0080     }
0081 
0082     return state;
0083 }
0084 
/**
 * \return the level of detail reported by the internal LoD counter.
 *         isJobAllowed() only enforces this value when it is >= 0;
 *         presumably a negative value means that no job is registered
 *         -- TODO confirm against the counter implementation.
 */
int KisUpdaterContext::currentLevelOfDetail() const
{
    return m_lodCounter.readLod();
}
0089 
0090 bool KisUpdaterContext::hasSpareThread()
0091 {
0092     bool found = false;
0093 
0094     Q_FOREACH (const KisUpdateJobItem *item, m_jobs) {
0095         if(!item->isRunning()) {
0096             found = true;
0097             break;
0098         }
0099     }
0100     return found;
0101 }
0102 
0103 bool KisUpdaterContext::isJobAllowed(KisBaseRectsWalkerSP walker)
0104 {
0105     int lod = this->currentLevelOfDetail();
0106     if (lod >= 0 && walker->levelOfDetail() != lod) return false;
0107 
0108     bool intersects = false;
0109 
0110     Q_FOREACH (const KisUpdateJobItem *item, m_jobs) {
0111         if(item->isRunning() && walkerIntersectsJob(walker, item)) {
0112             intersects = true;
0113             break;
0114         }
0115     }
0116 
0117     return !intersects;
0118 }
0119 
0120 void KisUpdaterContext::startThread(int index)
0121 {
0122     {
0123         QMutexLocker l(&m_runningThreadsMutex);
0124         m_numRunningThreads++;
0125     }
0126 
0127     m_threadPool.start(m_jobs[index]);
0128 }
0129 
0130 /**
0131  * NOTE: In theory, isJobAllowed() and addMergeJob() should be merged into
0132  * one atomic method like `bool push()`, because this implementation
0133  * of KisUpdaterContext will not work in case of multiple
0134  * producers. But currently we have only one producer (one thread
0135  * in a time), that is guaranteed by the lock()/unlock() pair in
0136  * KisAbstractUpdateQueue::processQueue.
0137  */
0138 void KisUpdaterContext::addMergeJob(KisBaseRectsWalkerSP walker)
0139 {
0140     m_lodCounter.addLod(walker->levelOfDetail());
0141     qint32 jobIndex = findSpareThread();
0142     Q_ASSERT(jobIndex >= 0);
0143 
0144     const bool shouldStartThread = m_jobs[jobIndex]->setWalker(walker);
0145 
0146     // it might happen that we call this function from within
0147     // the thread itself, right when it finished its work
0148     if (shouldStartThread && !m_testingMode) {
0149         startThread(jobIndex);
0150     }
0151 }
0152 
0153 void KisUpdaterContext::addStrokeJob(KisStrokeJob *strokeJob)
0154 {
0155     m_lodCounter.addLod(strokeJob->levelOfDetail());
0156     qint32 jobIndex = findSpareThread();
0157     Q_ASSERT(jobIndex >= 0);
0158 
0159     const bool shouldStartThread = m_jobs[jobIndex]->setStrokeJob(strokeJob);
0160 
0161     // it might happen that we call this function from within
0162     // the thread itself, right when it finished its work
0163     if (shouldStartThread && !m_testingMode) {
0164         startThread(jobIndex);
0165     }
0166 }
0167 
0168 void KisUpdaterContext::addSpontaneousJob(KisSpontaneousJob *spontaneousJob)
0169 {
0170     m_lodCounter.addLod(spontaneousJob->levelOfDetail());
0171     qint32 jobIndex = findSpareThread();
0172     Q_ASSERT(jobIndex >= 0);
0173 
0174     const bool shouldStartThread = m_jobs[jobIndex]->setSpontaneousJob(spontaneousJob);
0175 
0176     // it might happen that we call this function from within
0177     // the thread itself, right when it finished its work
0178     if (shouldStartThread && !m_testingMode) {
0179         startThread(jobIndex);
0180     }
0181 }
0182 
0183 void KisUpdaterContext::waitForDone()
0184 {
0185     QMutexLocker l(&m_runningThreadsMutex);
0186 
0187     while(m_numRunningThreads > 0) {
0188         m_waitForDoneCondition.wait(l.mutex());
0189     }
0190 }
0191 
0192 bool KisUpdaterContext::walkerIntersectsJob(KisBaseRectsWalkerSP walker,
0193                                             const KisUpdateJobItem* job)
0194 {
0195     /**
0196      * TODO: theoretically, we should compare rects intersection
0197      * on a per-layer basis. The current solution is too rough and
0198      * basically makes updates single-threaded in some cases (e.g.
0199      * when a transform mask is present in the stack)
0200      */
0201     return walker->accessRect().intersects(job->accessRect());
0202 }
0203 
0204 qint32 KisUpdaterContext::findSpareThread()
0205 {
0206     for(qint32 i=0; i < m_jobs.size(); i++)
0207         if(!m_jobs[i]->isRunning())
0208             return i;
0209 
0210     return -1;
0211 }
0212 
/**
 * Locks the context's internal lock. Must be paired with unlock().
 */
void KisUpdaterContext::lock()
{
    m_lock.lock();
}
0217 
/**
 * Releases the context's internal lock taken by lock().
 */
void KisUpdaterContext::unlock()
{
    m_lock.unlock();
}
0222 
0223 void KisUpdaterContext::setThreadsLimit(int value)
0224 {
0225     m_threadPool.setMaxThreadCount(value);
0226 
0227     for (int i = 0; i < m_jobs.size(); i++) {
0228         KIS_SAFE_ASSERT_RECOVER_RETURN(!m_jobs[i]->isRunning());
0229         // don't delete the jobs until all of them are checked!
0230     }
0231 
0232     for (int i = 0; i < m_jobs.size(); i++) {
0233         delete m_jobs[i];
0234     }
0235 
0236     m_jobs.resize(value);
0237 
0238     for(qint32 i = 0; i < m_jobs.size(); i++) {
0239         m_jobs[i] = new KisUpdateJobItem(this);
0240     }
0241 }
0242 
0243 int KisUpdaterContext::threadsLimit() const
0244 {
0245     KIS_SAFE_ASSERT_RECOVER_NOOP(m_jobs.size() == m_threadPool.maxThreadCount());
0246     return m_jobs.size();
0247 }
0248 
0249 void KisUpdaterContext::continueUpdate(const QRect& rc)
0250 {
0251     if (m_scheduler) m_scheduler->continueUpdate(rc);
0252 }
0253 
0254 void KisUpdaterContext::doSomeUsefulWork()
0255 {
0256     if (m_scheduler) m_scheduler->doSomeUsefulWork();
0257 }
0258 
0259 void KisUpdaterContext::jobFinished()
0260 {
0261     m_lodCounter.removeLod();
0262     if (m_scheduler) m_scheduler->spareThreadAppeared();
0263 }
0264 
0265 void KisUpdaterContext::jobThreadExited()
0266 {
0267     QMutexLocker l(&m_runningThreadsMutex);
0268     m_numRunningThreads--;
0269     KIS_SAFE_ASSERT_RECOVER_NOOP(m_numRunningThreads >= 0);
0270 
0271     if (m_numRunningThreads <= 0) {
0272         m_waitForDoneCondition.wakeAll();
0273     }
0274 }
0275 
/**
 * Switches the testing mode on or off.
 *
 * In testing mode the add*Job() methods assign jobs to the job items
 * but never start a thread for them, and the destructor calls clear()
 * before deleting the job items.
 */
void KisUpdaterContext::setTestingMode(bool value)
{
    m_testingMode = value;
}
0280 
/**
 * \return a copy of the vector of job item pointers. The items
 *         themselves are not copied; they are still deleted by the
 *         context (see the destructor and setThreadsLimit()).
 */
const QVector<KisUpdateJobItem*> KisUpdaterContext::getJobs()
{
    return m_jobs;
}
0285 
0286 void KisUpdaterContext::clear()
0287 {
0288     Q_FOREACH (KisUpdateJobItem *item, m_jobs) {
0289         item->testingSetDone();
0290     }
0291 
0292     m_lodCounter.testingClear();
0293 }
0294 
0295 
/**
 * Creates a KisUpdaterContext with \p threadCount threads (no scheduler
 * is passed to the base class) and switches it into testing mode.
 */
KisTestableUpdaterContext::KisTestableUpdaterContext(qint32 threadCount)
    : KisUpdaterContext(threadCount)
{
    setTestingMode(true);
}
0301