// File indexing completed on 2024-05-19 04:26:39
0001 /* 0002 * SPDX-FileCopyrightText: 2010 Dmitry Kazakov <dimula73@gmail.com> 0003 * 0004 * SPDX-License-Identifier: GPL-2.0-or-later 0005 */ 0006 0007 #include "kis_updater_context.h" 0008 0009 #include <QThread> 0010 #include <QThreadPool> 0011 0012 #include "kis_update_job_item.h" 0013 #include "kis_stroke_job.h" 0014 0015 const int KisUpdaterContext::useIdealThreadCountTag = -1; 0016 0017 KisUpdaterContext::KisUpdaterContext(qint32 threadCount, KisUpdateScheduler *parent) 0018 : m_scheduler(parent) 0019 { 0020 if(threadCount <= 0) { 0021 threadCount = QThread::idealThreadCount(); 0022 threadCount = threadCount > 0 ? threadCount : 1; 0023 } 0024 0025 setThreadsLimit(threadCount); 0026 } 0027 0028 KisUpdaterContext::~KisUpdaterContext() 0029 { 0030 m_threadPool.waitForDone(); 0031 0032 if (m_testingMode) { 0033 clear(); 0034 } 0035 0036 qDeleteAll(m_jobs); 0037 } 0038 0039 void KisUpdaterContext::getJobsSnapshot(qint32 &numMergeJobs, 0040 qint32 &numStrokeJobs) 0041 { 0042 numMergeJobs = 0; 0043 numStrokeJobs = 0; 0044 0045 /** 0046 * We cannot use Q_FOREACH here since the function may 0047 * be called concurrently without any locks, causing detaching 0048 * of the vector and causing a crash. Only read-only accesses 0049 * are allowed in such environment 0050 */ 0051 for (const KisUpdateJobItem *item : std::as_const(m_jobs)) { 0052 0053 if(item->type() == KisUpdateJobItem::Type::MERGE || 0054 item->type() == KisUpdateJobItem::Type::SPONTANEOUS) { 0055 numMergeJobs++; 0056 } 0057 else if(item->type() == KisUpdateJobItem::Type::STROKE) { 0058 numStrokeJobs++; 0059 } 0060 } 0061 } 0062 0063 KisUpdaterContextSnapshotEx KisUpdaterContext::getContextSnapshotEx() const 0064 { 0065 KisUpdaterContextSnapshotEx state = ContextEmpty; 0066 0067 /** 0068 * We cannot use Q_FOREACH here since the function may 0069 * be called concurrently without any locks, causing detaching 0070 * of the vector and causing a crash. 
Only read-only accesses 0071 * are allowed in such environment 0072 */ 0073 for (const KisUpdateJobItem *item : std::as_const(m_jobs)) { 0074 if (item->type() == KisUpdateJobItem::Type::MERGE || 0075 item->type() == KisUpdateJobItem::Type::SPONTANEOUS) { 0076 state |= HasMergeJob; 0077 } else if(item->type() == KisUpdateJobItem::Type::STROKE) { 0078 switch (item->strokeJobSequentiality()) { 0079 case KisStrokeJobData::SEQUENTIAL: 0080 state |= HasSequentialJob; 0081 break; 0082 case KisStrokeJobData::CONCURRENT: 0083 state |= HasConcurrentJob; 0084 break; 0085 case KisStrokeJobData::BARRIER: 0086 state |= HasBarrierJob; 0087 break; 0088 case KisStrokeJobData::UNIQUELY_CONCURRENT: 0089 state |= HasUniquelyConcurrentJob; 0090 break; 0091 } 0092 } 0093 } 0094 0095 return state; 0096 } 0097 0098 int KisUpdaterContext::currentLevelOfDetail() const 0099 { 0100 return m_lodCounter.readLod(); 0101 } 0102 0103 bool KisUpdaterContext::hasSpareThread() 0104 { 0105 bool found = false; 0106 0107 /** 0108 * We cannot use Q_FOREACH here since the function may 0109 * be called concurrently without any locks, causing detaching 0110 * of the vector and causing a crash. Only read-only accesses 0111 * are allowed in such environment 0112 */ 0113 for (const KisUpdateJobItem *item : std::as_const(m_jobs)) { 0114 if(!item->isRunning()) { 0115 found = true; 0116 break; 0117 } 0118 } 0119 return found; 0120 } 0121 0122 bool KisUpdaterContext::isJobAllowed(KisBaseRectsWalkerSP walker) 0123 { 0124 int lod = this->currentLevelOfDetail(); 0125 if (lod >= 0 && walker->levelOfDetail() != lod) return false; 0126 0127 bool intersects = false; 0128 0129 /** 0130 * We cannot use Q_FOREACH here since the function may 0131 * be called concurrently without any locks, causing detaching 0132 * of the vector and causing a crash. 
Only read-only accesses 0133 * are allowed in such environment 0134 */ 0135 for (const KisUpdateJobItem *item : std::as_const(m_jobs)) { 0136 if(item->isRunning() && walkerIntersectsJob(walker, item)) { 0137 intersects = true; 0138 break; 0139 } 0140 } 0141 0142 return !intersects; 0143 } 0144 0145 void KisUpdaterContext::startThread(int index) 0146 { 0147 { 0148 QMutexLocker l(&m_runningThreadsMutex); 0149 m_numRunningThreads++; 0150 } 0151 0152 m_threadPool.start(m_jobs[index]); 0153 } 0154 0155 /** 0156 * NOTE: In theory, isJobAllowed() and addMergeJob() should be merged into 0157 * one atomic method like `bool push()`, because this implementation 0158 * of KisUpdaterContext will not work in case of multiple 0159 * producers. But currently we have only one producer (one thread 0160 * in a time), that is guaranteed by the lock()/unlock() pair in 0161 * KisAbstractUpdateQueue::processQueue. 0162 */ 0163 void KisUpdaterContext::addMergeJob(KisBaseRectsWalkerSP walker) 0164 { 0165 m_lodCounter.addLod(walker->levelOfDetail()); 0166 qint32 jobIndex = findSpareThread(); 0167 Q_ASSERT(jobIndex >= 0); 0168 0169 const bool shouldStartThread = m_jobs[jobIndex]->setWalker(walker); 0170 0171 // it might happen that we call this function from within 0172 // the thread itself, right when it finished its work 0173 if (shouldStartThread && !m_testingMode) { 0174 startThread(jobIndex); 0175 } 0176 } 0177 0178 void KisUpdaterContext::addStrokeJob(KisStrokeJob *strokeJob) 0179 { 0180 m_lodCounter.addLod(strokeJob->levelOfDetail()); 0181 qint32 jobIndex = findSpareThread(); 0182 Q_ASSERT(jobIndex >= 0); 0183 0184 const bool shouldStartThread = m_jobs[jobIndex]->setStrokeJob(strokeJob); 0185 0186 // it might happen that we call this function from within 0187 // the thread itself, right when it finished its work 0188 if (shouldStartThread && !m_testingMode) { 0189 startThread(jobIndex); 0190 } 0191 } 0192 0193 void KisUpdaterContext::addSpontaneousJob(KisSpontaneousJob 
*spontaneousJob) 0194 { 0195 m_lodCounter.addLod(spontaneousJob->levelOfDetail()); 0196 qint32 jobIndex = findSpareThread(); 0197 Q_ASSERT(jobIndex >= 0); 0198 0199 const bool shouldStartThread = m_jobs[jobIndex]->setSpontaneousJob(spontaneousJob); 0200 0201 // it might happen that we call this function from within 0202 // the thread itself, right when it finished its work 0203 if (shouldStartThread && !m_testingMode) { 0204 startThread(jobIndex); 0205 } 0206 } 0207 0208 void KisUpdaterContext::waitForDone() 0209 { 0210 QMutexLocker l(&m_runningThreadsMutex); 0211 0212 while(m_numRunningThreads > 0) { 0213 m_waitForDoneCondition.wait(l.mutex()); 0214 } 0215 } 0216 0217 bool KisUpdaterContext::walkerIntersectsJob(KisBaseRectsWalkerSP walker, 0218 const KisUpdateJobItem* job) 0219 { 0220 /** 0221 * TODO: theoretically, we should compare rects intersection 0222 * on a per-layer basis. The current solution is too rough and 0223 * basically makes updates single-threaded in some cases (e.g. 0224 * when a transform mask is present in the stack) 0225 */ 0226 return walker->accessRect().intersects(job->accessRect()); 0227 } 0228 0229 qint32 KisUpdaterContext::findSpareThread() 0230 { 0231 for(qint32 i=0; i < m_jobs.size(); i++) 0232 if(!m_jobs[i]->isRunning()) 0233 return i; 0234 0235 return -1; 0236 } 0237 0238 void KisUpdaterContext::lock() 0239 { 0240 m_lock.lock(); 0241 } 0242 0243 void KisUpdaterContext::unlock() 0244 { 0245 m_lock.unlock(); 0246 } 0247 0248 void KisUpdaterContext::setThreadsLimit(int value) 0249 { 0250 m_threadPool.setMaxThreadCount(value); 0251 0252 for (int i = 0; i < m_jobs.size(); i++) { 0253 KIS_SAFE_ASSERT_RECOVER_RETURN(!m_jobs[i]->isRunning()); 0254 // don't delete the jobs until all of them are checked! 
0255 } 0256 0257 for (int i = 0; i < m_jobs.size(); i++) { 0258 delete m_jobs[i]; 0259 } 0260 0261 m_jobs.resize(value); 0262 0263 for(qint32 i = 0; i < m_jobs.size(); i++) { 0264 m_jobs[i] = new KisUpdateJobItem(this); 0265 } 0266 } 0267 0268 int KisUpdaterContext::threadsLimit() const 0269 { 0270 KIS_SAFE_ASSERT_RECOVER_NOOP(m_jobs.size() == m_threadPool.maxThreadCount()); 0271 return m_jobs.size(); 0272 } 0273 0274 void KisUpdaterContext::continueUpdate(const QRect& rc) 0275 { 0276 if (m_scheduler) m_scheduler->continueUpdate(rc); 0277 } 0278 0279 void KisUpdaterContext::doSomeUsefulWork() 0280 { 0281 if (m_scheduler) m_scheduler->doSomeUsefulWork(); 0282 } 0283 0284 void KisUpdaterContext::jobFinished() 0285 { 0286 m_lodCounter.removeLod(); 0287 if (m_scheduler) m_scheduler->spareThreadAppeared(); 0288 } 0289 0290 void KisUpdaterContext::jobThreadExited() 0291 { 0292 QMutexLocker l(&m_runningThreadsMutex); 0293 m_numRunningThreads--; 0294 KIS_SAFE_ASSERT_RECOVER_NOOP(m_numRunningThreads >= 0); 0295 0296 if (m_numRunningThreads <= 0) { 0297 m_waitForDoneCondition.wakeAll(); 0298 } 0299 } 0300 0301 void KisUpdaterContext::setTestingMode(bool value) 0302 { 0303 m_testingMode = value; 0304 } 0305 0306 const QVector<KisUpdateJobItem*> KisUpdaterContext::getJobs() 0307 { 0308 return m_jobs; 0309 } 0310 0311 void KisUpdaterContext::clear() 0312 { 0313 Q_FOREACH (KisUpdateJobItem *item, m_jobs) { 0314 item->testingSetDone(); 0315 } 0316 0317 m_lodCounter.testingClear(); 0318 } 0319 0320 0321 KisTestableUpdaterContext::KisTestableUpdaterContext(qint32 threadCount) 0322 : KisUpdaterContext(threadCount) 0323 { 0324 setTestingMode(true); 0325 } 0326