File indexing completed on 2024-05-12 15:58:48
0001 /* 0002 * SPDX-FileCopyrightText: 2010 Dmitry Kazakov <dimula73@gmail.com> 0003 * 0004 * SPDX-License-Identifier: GPL-2.0-or-later 0005 */ 0006 0007 #include "kis_updater_context.h" 0008 0009 #include <QThread> 0010 #include <QThreadPool> 0011 0012 #include "kis_update_job_item.h" 0013 #include "kis_stroke_job.h" 0014 0015 const int KisUpdaterContext::useIdealThreadCountTag = -1; 0016 0017 KisUpdaterContext::KisUpdaterContext(qint32 threadCount, KisUpdateScheduler *parent) 0018 : m_scheduler(parent) 0019 { 0020 if(threadCount <= 0) { 0021 threadCount = QThread::idealThreadCount(); 0022 threadCount = threadCount > 0 ? threadCount : 1; 0023 } 0024 0025 setThreadsLimit(threadCount); 0026 } 0027 0028 KisUpdaterContext::~KisUpdaterContext() 0029 { 0030 m_threadPool.waitForDone(); 0031 0032 if (m_testingMode) { 0033 clear(); 0034 } 0035 0036 qDeleteAll(m_jobs); 0037 } 0038 0039 void KisUpdaterContext::getJobsSnapshot(qint32 &numMergeJobs, 0040 qint32 &numStrokeJobs) 0041 { 0042 numMergeJobs = 0; 0043 numStrokeJobs = 0; 0044 0045 Q_FOREACH (const KisUpdateJobItem *item, m_jobs) { 0046 if(item->type() == KisUpdateJobItem::Type::MERGE || 0047 item->type() == KisUpdateJobItem::Type::SPONTANEOUS) { 0048 numMergeJobs++; 0049 } 0050 else if(item->type() == KisUpdateJobItem::Type::STROKE) { 0051 numStrokeJobs++; 0052 } 0053 } 0054 } 0055 0056 KisUpdaterContextSnapshotEx KisUpdaterContext::getContextSnapshotEx() const 0057 { 0058 KisUpdaterContextSnapshotEx state = ContextEmpty; 0059 0060 Q_FOREACH (const KisUpdateJobItem *item, m_jobs) { 0061 if (item->type() == KisUpdateJobItem::Type::MERGE || 0062 item->type() == KisUpdateJobItem::Type::SPONTANEOUS) { 0063 state |= HasMergeJob; 0064 } else if(item->type() == KisUpdateJobItem::Type::STROKE) { 0065 switch (item->strokeJobSequentiality()) { 0066 case KisStrokeJobData::SEQUENTIAL: 0067 state |= HasSequentialJob; 0068 break; 0069 case KisStrokeJobData::CONCURRENT: 0070 state |= HasConcurrentJob; 0071 break; 0072 case KisStrokeJobData::BARRIER: 0073 state |= HasBarrierJob; 0074 break; 0075 case KisStrokeJobData::UNIQUELY_CONCURRENT: 0076 state |= HasUniquelyConcurrentJob; 0077 break; 0078 } 0079 } 0080 } 0081 0082 return state; 0083 } 0084 0085 int KisUpdaterContext::currentLevelOfDetail() const 0086 { 0087 return m_lodCounter.readLod(); 0088 } 0089 0090 bool KisUpdaterContext::hasSpareThread() 0091 { 0092 bool found = false; 0093 0094 Q_FOREACH (const KisUpdateJobItem *item, m_jobs) { 0095 if(!item->isRunning()) { 0096 found = true; 0097 break; 0098 } 0099 } 0100 return found; 0101 } 0102 0103 bool KisUpdaterContext::isJobAllowed(KisBaseRectsWalkerSP walker) 0104 { 0105 int lod = this->currentLevelOfDetail(); 0106 if (lod >= 0 && walker->levelOfDetail() != lod) return false; 0107 0108 bool intersects = false; 0109 0110 Q_FOREACH (const KisUpdateJobItem *item, m_jobs) { 0111 if(item->isRunning() && walkerIntersectsJob(walker, item)) { 0112 intersects = true; 0113 break; 0114 } 0115 } 0116 0117 return !intersects; 0118 } 0119 0120 void KisUpdaterContext::startThread(int index) 0121 { 0122 { 0123 QMutexLocker l(&m_runningThreadsMutex); 0124 m_numRunningThreads++; 0125 } 0126 0127 m_threadPool.start(m_jobs[index]); 0128 } 0129 0130 /** 0131 * NOTE: In theory, isJobAllowed() and addMergeJob() should be merged into 0132 * one atomic method like `bool push()`, because this implementation 0133 * of KisUpdaterContext will not work in case of multiple 0134 * producers. But currently we have only one producer (one thread 0135 * in a time), that is guaranteed by the lock()/unlock() pair in 0136 * KisAbstractUpdateQueue::processQueue. 0137 */ 0138 void KisUpdaterContext::addMergeJob(KisBaseRectsWalkerSP walker) 0139 { 0140 m_lodCounter.addLod(walker->levelOfDetail()); 0141 qint32 jobIndex = findSpareThread(); 0142 Q_ASSERT(jobIndex >= 0); 0143 0144 const bool shouldStartThread = m_jobs[jobIndex]->setWalker(walker); 0145 0146 // it might happen that we call this function from within 0147 // the thread itself, right when it finished its work 0148 if (shouldStartThread && !m_testingMode) { 0149 startThread(jobIndex); 0150 } 0151 } 0152 0153 void KisUpdaterContext::addStrokeJob(KisStrokeJob *strokeJob) 0154 { 0155 m_lodCounter.addLod(strokeJob->levelOfDetail()); 0156 qint32 jobIndex = findSpareThread(); 0157 Q_ASSERT(jobIndex >= 0); 0158 0159 const bool shouldStartThread = m_jobs[jobIndex]->setStrokeJob(strokeJob); 0160 0161 // it might happen that we call this function from within 0162 // the thread itself, right when it finished its work 0163 if (shouldStartThread && !m_testingMode) { 0164 startThread(jobIndex); 0165 } 0166 } 0167 0168 void KisUpdaterContext::addSpontaneousJob(KisSpontaneousJob *spontaneousJob) 0169 { 0170 m_lodCounter.addLod(spontaneousJob->levelOfDetail()); 0171 qint32 jobIndex = findSpareThread(); 0172 Q_ASSERT(jobIndex >= 0); 0173 0174 const bool shouldStartThread = m_jobs[jobIndex]->setSpontaneousJob(spontaneousJob); 0175 0176 // it might happen that we call this function from within 0177 // the thread itself, right when it finished its work 0178 if (shouldStartThread && !m_testingMode) { 0179 startThread(jobIndex); 0180 } 0181 } 0182 0183 void KisUpdaterContext::waitForDone() 0184 { 0185 QMutexLocker l(&m_runningThreadsMutex); 0186 0187 while(m_numRunningThreads > 0) { 0188 m_waitForDoneCondition.wait(l.mutex()); 0189 } 0190 } 0191 0192 bool KisUpdaterContext::walkerIntersectsJob(KisBaseRectsWalkerSP walker, 0193 const KisUpdateJobItem* job) 0194 { 0195 /** 0196 * TODO: theoretically, we should compare rects intersection 0197 * on a per-layer basis. The current solution is too rough and 0198 * basically makes updates single-threaded in some cases (e.g. 0199 * when a transform mask is present in the stack) 0200 */ 0201 return walker->accessRect().intersects(job->accessRect()); 0202 } 0203 0204 qint32 KisUpdaterContext::findSpareThread() 0205 { 0206 for(qint32 i=0; i < m_jobs.size(); i++) 0207 if(!m_jobs[i]->isRunning()) 0208 return i; 0209 0210 return -1; 0211 } 0212 0213 void KisUpdaterContext::lock() 0214 { 0215 m_lock.lock(); 0216 } 0217 0218 void KisUpdaterContext::unlock() 0219 { 0220 m_lock.unlock(); 0221 } 0222 0223 void KisUpdaterContext::setThreadsLimit(int value) 0224 { 0225 m_threadPool.setMaxThreadCount(value); 0226 0227 for (int i = 0; i < m_jobs.size(); i++) { 0228 KIS_SAFE_ASSERT_RECOVER_RETURN(!m_jobs[i]->isRunning()); 0229 // don't delete the jobs until all of them are checked! 0230 } 0231 0232 for (int i = 0; i < m_jobs.size(); i++) { 0233 delete m_jobs[i]; 0234 } 0235 0236 m_jobs.resize(value); 0237 0238 for(qint32 i = 0; i < m_jobs.size(); i++) { 0239 m_jobs[i] = new KisUpdateJobItem(this); 0240 } 0241 } 0242 0243 int KisUpdaterContext::threadsLimit() const 0244 { 0245 KIS_SAFE_ASSERT_RECOVER_NOOP(m_jobs.size() == m_threadPool.maxThreadCount()); 0246 return m_jobs.size(); 0247 } 0248 0249 void KisUpdaterContext::continueUpdate(const QRect& rc) 0250 { 0251 if (m_scheduler) m_scheduler->continueUpdate(rc); 0252 } 0253 0254 void KisUpdaterContext::doSomeUsefulWork() 0255 { 0256 if (m_scheduler) m_scheduler->doSomeUsefulWork(); 0257 } 0258 0259 void KisUpdaterContext::jobFinished() 0260 { 0261 m_lodCounter.removeLod(); 0262 if (m_scheduler) m_scheduler->spareThreadAppeared(); 0263 } 0264 0265 void KisUpdaterContext::jobThreadExited() 0266 { 0267 QMutexLocker l(&m_runningThreadsMutex); 0268 m_numRunningThreads--; 0269 KIS_SAFE_ASSERT_RECOVER_NOOP(m_numRunningThreads >= 0); 0270 0271 if (m_numRunningThreads <= 0) { 0272 m_waitForDoneCondition.wakeAll(); 0273 } 0274 } 0275 0276 void KisUpdaterContext::setTestingMode(bool value) 0277 { 0278 m_testingMode = value; 0279 } 0280 0281 const QVector<KisUpdateJobItem*> KisUpdaterContext::getJobs() 0282 { 0283 return m_jobs; 0284 } 0285 0286 void KisUpdaterContext::clear() 0287 { 0288 Q_FOREACH (KisUpdateJobItem *item, m_jobs) { 0289 item->testingSetDone(); 0290 } 0291 0292 m_lodCounter.testingClear(); 0293 } 0294 0295 0296 KisTestableUpdaterContext::KisTestableUpdaterContext(qint32 threadCount) 0297 : KisUpdaterContext(threadCount) 0298 { 0299 setTestingMode(true); 0300 } 0301