File indexing completed on 2024-05-19 04:26:39

0001 /*
0002  *  SPDX-FileCopyrightText: 2010 Dmitry Kazakov <dimula73@gmail.com>
0003  *
0004  *  SPDX-License-Identifier: GPL-2.0-or-later
0005  */
0006 
0007 #include "kis_updater_context.h"
0008 
0009 #include <QThread>
0010 #include <QThreadPool>
0011 
0012 #include "kis_update_job_item.h"
0013 #include "kis_stroke_job.h"
0014 
0015 const int KisUpdaterContext::useIdealThreadCountTag = -1;
0016 
0017 KisUpdaterContext::KisUpdaterContext(qint32 threadCount, KisUpdateScheduler *parent)
0018     : m_scheduler(parent)
0019 {
0020     if(threadCount <= 0) {
0021         threadCount = QThread::idealThreadCount();
0022         threadCount = threadCount > 0 ? threadCount : 1;
0023     }
0024 
0025     setThreadsLimit(threadCount);
0026 }
0027 
0028 KisUpdaterContext::~KisUpdaterContext()
0029 {
0030     m_threadPool.waitForDone();
0031 
0032     if (m_testingMode) {
0033         clear();
0034     }
0035 
0036     qDeleteAll(m_jobs);
0037 }
0038 
0039 void KisUpdaterContext::getJobsSnapshot(qint32 &numMergeJobs,
0040                                         qint32 &numStrokeJobs)
0041 {
0042     numMergeJobs = 0;
0043     numStrokeJobs = 0;
0044 
0045     /**
0046      * We cannot use Q_FOREACH here since the function may
0047      * be called concurrently without any locks, causing detaching
0048      * of the vector and causing a crash. Only read-only accesses
0049      * are allowed in such environment
0050      */
0051     for (const KisUpdateJobItem *item : std::as_const(m_jobs)) {
0052 
0053         if(item->type() == KisUpdateJobItem::Type::MERGE ||
0054            item->type() == KisUpdateJobItem::Type::SPONTANEOUS) {
0055             numMergeJobs++;
0056         }
0057         else if(item->type() == KisUpdateJobItem::Type::STROKE) {
0058             numStrokeJobs++;
0059         }
0060     }
0061 }
0062 
0063 KisUpdaterContextSnapshotEx KisUpdaterContext::getContextSnapshotEx() const
0064 {
0065     KisUpdaterContextSnapshotEx state = ContextEmpty;
0066 
0067     /**
0068      * We cannot use Q_FOREACH here since the function may
0069      * be called concurrently without any locks, causing detaching
0070      * of the vector and causing a crash. Only read-only accesses
0071      * are allowed in such environment
0072      */
0073     for (const KisUpdateJobItem *item : std::as_const(m_jobs)) {
0074         if (item->type() == KisUpdateJobItem::Type::MERGE ||
0075             item->type() == KisUpdateJobItem::Type::SPONTANEOUS) {
0076             state |= HasMergeJob;
0077         } else if(item->type() == KisUpdateJobItem::Type::STROKE) {
0078             switch (item->strokeJobSequentiality()) {
0079             case KisStrokeJobData::SEQUENTIAL:
0080                 state |= HasSequentialJob;
0081                 break;
0082             case KisStrokeJobData::CONCURRENT:
0083                 state |= HasConcurrentJob;
0084                 break;
0085             case KisStrokeJobData::BARRIER:
0086                 state |= HasBarrierJob;
0087                 break;
0088             case KisStrokeJobData::UNIQUELY_CONCURRENT:
0089                 state |= HasUniquelyConcurrentJob;
0090                 break;
0091             }
0092         }
0093     }
0094 
0095     return state;
0096 }
0097 
0098 int KisUpdaterContext::currentLevelOfDetail() const
0099 {
0100     return m_lodCounter.readLod();
0101 }
0102 
0103 bool KisUpdaterContext::hasSpareThread()
0104 {
0105     bool found = false;
0106 
0107     /**
0108      * We cannot use Q_FOREACH here since the function may
0109      * be called concurrently without any locks, causing detaching
0110      * of the vector and causing a crash. Only read-only accesses
0111      * are allowed in such environment
0112      */
0113     for (const KisUpdateJobItem *item : std::as_const(m_jobs)) {
0114         if(!item->isRunning()) {
0115             found = true;
0116             break;
0117         }
0118     }
0119     return found;
0120 }
0121 
0122 bool KisUpdaterContext::isJobAllowed(KisBaseRectsWalkerSP walker)
0123 {
0124     int lod = this->currentLevelOfDetail();
0125     if (lod >= 0 && walker->levelOfDetail() != lod) return false;
0126 
0127     bool intersects = false;
0128 
0129     /**
0130      * We cannot use Q_FOREACH here since the function may
0131      * be called concurrently without any locks, causing detaching
0132      * of the vector and causing a crash. Only read-only accesses
0133      * are allowed in such environment
0134      */
0135     for (const KisUpdateJobItem *item : std::as_const(m_jobs)) {
0136         if(item->isRunning() && walkerIntersectsJob(walker, item)) {
0137             intersects = true;
0138             break;
0139         }
0140     }
0141 
0142     return !intersects;
0143 }
0144 
0145 void KisUpdaterContext::startThread(int index)
0146 {
0147     {
0148         QMutexLocker l(&m_runningThreadsMutex);
0149         m_numRunningThreads++;
0150     }
0151 
0152     m_threadPool.start(m_jobs[index]);
0153 }
0154 
0155 /**
0156  * NOTE: In theory, isJobAllowed() and addMergeJob() should be merged into
0157  * one atomic method like `bool push()`, because this implementation
0158  * of KisUpdaterContext will not work in case of multiple
0159  * producers. But currently we have only one producer (one thread
0160  * in a time), that is guaranteed by the lock()/unlock() pair in
0161  * KisAbstractUpdateQueue::processQueue.
0162  */
0163 void KisUpdaterContext::addMergeJob(KisBaseRectsWalkerSP walker)
0164 {
0165     m_lodCounter.addLod(walker->levelOfDetail());
0166     qint32 jobIndex = findSpareThread();
0167     Q_ASSERT(jobIndex >= 0);
0168 
0169     const bool shouldStartThread = m_jobs[jobIndex]->setWalker(walker);
0170 
0171     // it might happen that we call this function from within
0172     // the thread itself, right when it finished its work
0173     if (shouldStartThread && !m_testingMode) {
0174         startThread(jobIndex);
0175     }
0176 }
0177 
0178 void KisUpdaterContext::addStrokeJob(KisStrokeJob *strokeJob)
0179 {
0180     m_lodCounter.addLod(strokeJob->levelOfDetail());
0181     qint32 jobIndex = findSpareThread();
0182     Q_ASSERT(jobIndex >= 0);
0183 
0184     const bool shouldStartThread = m_jobs[jobIndex]->setStrokeJob(strokeJob);
0185 
0186     // it might happen that we call this function from within
0187     // the thread itself, right when it finished its work
0188     if (shouldStartThread && !m_testingMode) {
0189         startThread(jobIndex);
0190     }
0191 }
0192 
0193 void KisUpdaterContext::addSpontaneousJob(KisSpontaneousJob *spontaneousJob)
0194 {
0195     m_lodCounter.addLod(spontaneousJob->levelOfDetail());
0196     qint32 jobIndex = findSpareThread();
0197     Q_ASSERT(jobIndex >= 0);
0198 
0199     const bool shouldStartThread = m_jobs[jobIndex]->setSpontaneousJob(spontaneousJob);
0200 
0201     // it might happen that we call this function from within
0202     // the thread itself, right when it finished its work
0203     if (shouldStartThread && !m_testingMode) {
0204         startThread(jobIndex);
0205     }
0206 }
0207 
0208 void KisUpdaterContext::waitForDone()
0209 {
0210     QMutexLocker l(&m_runningThreadsMutex);
0211 
0212     while(m_numRunningThreads > 0) {
0213         m_waitForDoneCondition.wait(l.mutex());
0214     }
0215 }
0216 
0217 bool KisUpdaterContext::walkerIntersectsJob(KisBaseRectsWalkerSP walker,
0218                                             const KisUpdateJobItem* job)
0219 {
0220     /**
0221      * TODO: theoretically, we should compare rects intersection
0222      * on a per-layer basis. The current solution is too rough and
0223      * basically makes updates single-threaded in some cases (e.g.
0224      * when a transform mask is present in the stack)
0225      */
0226     return walker->accessRect().intersects(job->accessRect());
0227 }
0228 
0229 qint32 KisUpdaterContext::findSpareThread()
0230 {
0231     for(qint32 i=0; i < m_jobs.size(); i++)
0232         if(!m_jobs[i]->isRunning())
0233             return i;
0234 
0235     return -1;
0236 }
0237 
0238 void KisUpdaterContext::lock()
0239 {
0240     m_lock.lock();
0241 }
0242 
0243 void KisUpdaterContext::unlock()
0244 {
0245     m_lock.unlock();
0246 }
0247 
0248 void KisUpdaterContext::setThreadsLimit(int value)
0249 {
0250     m_threadPool.setMaxThreadCount(value);
0251 
0252     for (int i = 0; i < m_jobs.size(); i++) {
0253         KIS_SAFE_ASSERT_RECOVER_RETURN(!m_jobs[i]->isRunning());
0254         // don't delete the jobs until all of them are checked!
0255     }
0256 
0257     for (int i = 0; i < m_jobs.size(); i++) {
0258         delete m_jobs[i];
0259     }
0260 
0261     m_jobs.resize(value);
0262 
0263     for(qint32 i = 0; i < m_jobs.size(); i++) {
0264         m_jobs[i] = new KisUpdateJobItem(this);
0265     }
0266 }
0267 
0268 int KisUpdaterContext::threadsLimit() const
0269 {
0270     KIS_SAFE_ASSERT_RECOVER_NOOP(m_jobs.size() == m_threadPool.maxThreadCount());
0271     return m_jobs.size();
0272 }
0273 
0274 void KisUpdaterContext::continueUpdate(const QRect& rc)
0275 {
0276     if (m_scheduler) m_scheduler->continueUpdate(rc);
0277 }
0278 
0279 void KisUpdaterContext::doSomeUsefulWork()
0280 {
0281     if (m_scheduler) m_scheduler->doSomeUsefulWork();
0282 }
0283 
0284 void KisUpdaterContext::jobFinished()
0285 {
0286     m_lodCounter.removeLod();
0287     if (m_scheduler) m_scheduler->spareThreadAppeared();
0288 }
0289 
0290 void KisUpdaterContext::jobThreadExited()
0291 {
0292     QMutexLocker l(&m_runningThreadsMutex);
0293     m_numRunningThreads--;
0294     KIS_SAFE_ASSERT_RECOVER_NOOP(m_numRunningThreads >= 0);
0295 
0296     if (m_numRunningThreads <= 0) {
0297         m_waitForDoneCondition.wakeAll();
0298     }
0299 }
0300 
0301 void KisUpdaterContext::setTestingMode(bool value)
0302 {
0303     m_testingMode = value;
0304 }
0305 
0306 const QVector<KisUpdateJobItem*> KisUpdaterContext::getJobs()
0307 {
0308     return m_jobs;
0309 }
0310 
0311 void KisUpdaterContext::clear()
0312 {
0313     Q_FOREACH (KisUpdateJobItem *item, m_jobs) {
0314         item->testingSetDone();
0315     }
0316 
0317     m_lodCounter.testingClear();
0318 }
0319 
0320 
0321 KisTestableUpdaterContext::KisTestableUpdaterContext(qint32 threadCount)
0322     : KisUpdaterContext(threadCount)
0323 {
0324     setTestingMode(true);
0325 }
0326