File indexing completed on 2024-05-12 15:58:47
0001 /* 0002 * SPDX-FileCopyrightText: 2010 Dmitry Kazakov <dimula73@gmail.com> 0003 * 0004 * SPDX-License-Identifier: GPL-2.0-or-later 0005 */ 0006 0007 #include "kis_update_scheduler.h" 0008 0009 #include "klocalizedstring.h" 0010 #include "kis_image_config.h" 0011 #include "kis_merge_walker.h" 0012 #include "kis_full_refresh_walker.h" 0013 0014 #include "kis_updater_context.h" 0015 #include "kis_simple_update_queue.h" 0016 #include "kis_strokes_queue.h" 0017 0018 #include "kis_queues_progress_updater.h" 0019 #include "KisImageConfigNotifier.h" 0020 0021 #include <QReadWriteLock> 0022 #include "kis_lazy_wait_condition.h" 0023 #include <mutex> 0024 0025 //#define DEBUG_BALANCING 0026 0027 #ifdef DEBUG_BALANCING 0028 #define DEBUG_BALANCING_METRICS(decidedFirst, excl) \ 0029 dbgKrita << "Balance decision:" << decidedFirst \ 0030 << "(" << excl << ")" \ 0031 << "updates:" << m_d->updatesQueue.sizeMetric() \ 0032 << "strokes:" << m_d->strokesQueue.sizeMetric() 0033 #else 0034 #define DEBUG_BALANCING_METRICS(decidedFirst, excl) 0035 #endif 0036 0037 0038 struct Q_DECL_HIDDEN KisUpdateScheduler::Private { 0039 Private(KisUpdateScheduler *_q, KisProjectionUpdateListener *p) 0040 : q(_q) 0041 , updaterContext(KisImageConfig(true).maxNumberOfThreads(), q) 0042 , projectionUpdateListener(p) 0043 {} 0044 0045 KisUpdateScheduler *q; 0046 0047 KisSimpleUpdateQueue updatesQueue; 0048 KisStrokesQueue strokesQueue; 0049 KisUpdaterContext updaterContext; 0050 bool processingBlocked = false; 0051 qreal defaultBalancingRatio = 1.0; // desired strokes-queue-size / updates-queue-size 0052 KisProjectionUpdateListener *projectionUpdateListener; 0053 KisQueuesProgressUpdater *progressUpdater = 0; 0054 0055 QAtomicInt updatesLockCounter; 0056 QReadWriteLock updatesStartLock; 0057 KisLazyWaitCondition updatesFinishedCondition; 0058 0059 qreal balancingRatio() const { 0060 const qreal strokeRatioOverride = strokesQueue.balancingRatioOverride(); 0061 return strokeRatioOverride > 0 ? strokeRatioOverride : defaultBalancingRatio; 0062 } 0063 }; 0064 0065 KisUpdateScheduler::KisUpdateScheduler(KisProjectionUpdateListener *projectionUpdateListener, QObject *parent) 0066 : QObject(parent), 0067 m_d(new Private(this, projectionUpdateListener)) 0068 { 0069 updateSettings(); 0070 connectSignals(); 0071 } 0072 0073 KisUpdateScheduler::KisUpdateScheduler() 0074 : m_d(new Private(this, 0)) 0075 { 0076 } 0077 0078 KisUpdateScheduler::~KisUpdateScheduler() 0079 { 0080 delete m_d->progressUpdater; 0081 delete m_d; 0082 } 0083 0084 void KisUpdateScheduler::setThreadsLimit(int value) 0085 { 0086 KIS_SAFE_ASSERT_RECOVER_RETURN(!m_d->processingBlocked); 0087 0088 /** 0089 * Thread limit can be changed without the full-featured barrier 0090 * lock, we can avoid waiting for all the jobs to complete. We 0091 * should just ensure there is no more jobs in the updater context. 0092 */ 0093 immediateLockForReadOnly(); 0094 m_d->updaterContext.lock(); 0095 m_d->updaterContext.setThreadsLimit(value); 0096 m_d->updaterContext.unlock(); 0097 unlock(false); 0098 } 0099 0100 int KisUpdateScheduler::threadsLimit() const 0101 { 0102 std::lock_guard<KisUpdaterContext> l(m_d->updaterContext); 0103 return m_d->updaterContext.threadsLimit(); 0104 } 0105 0106 void KisUpdateScheduler::connectSignals() 0107 { 0108 connect(KisImageConfigNotifier::instance(), SIGNAL(configChanged()), 0109 SLOT(updateSettings())); 0110 } 0111 0112 void KisUpdateScheduler::setProgressProxy(KoProgressProxy *progressProxy) 0113 { 0114 delete m_d->progressUpdater; 0115 m_d->progressUpdater = progressProxy ? 0116 new KisQueuesProgressUpdater(progressProxy, this) : 0; 0117 } 0118 0119 void KisUpdateScheduler::progressUpdate() 0120 { 0121 if (!m_d->progressUpdater) return; 0122 0123 if(!m_d->strokesQueue.hasOpenedStrokes()) { 0124 QString jobName = m_d->strokesQueue.currentStrokeName().toString(); 0125 if(jobName.isEmpty()) { 0126 jobName = i18n("Updating..."); 0127 } 0128 0129 int sizeMetric = m_d->strokesQueue.sizeMetric(); 0130 if (!sizeMetric) { 0131 sizeMetric = m_d->updatesQueue.sizeMetric(); 0132 } 0133 0134 m_d->progressUpdater->updateProgress(sizeMetric, jobName); 0135 } 0136 else { 0137 m_d->progressUpdater->hide(); 0138 } 0139 } 0140 0141 void KisUpdateScheduler::updateProjection(KisNodeSP node, const QVector<QRect> &rects, const QRect &cropRect) 0142 { 0143 m_d->updatesQueue.addUpdateJob(node, rects, cropRect, currentLevelOfDetail()); 0144 processQueues(); 0145 } 0146 0147 void KisUpdateScheduler::updateProjection(KisNodeSP node, const QRect &rc, const QRect &cropRect) 0148 { 0149 m_d->updatesQueue.addUpdateJob(node, rc, cropRect, currentLevelOfDetail()); 0150 processQueues(); 0151 } 0152 0153 void KisUpdateScheduler::updateProjectionNoFilthy(KisNodeSP node, const QVector<QRect>& rects, const QRect &cropRect) 0154 { 0155 m_d->updatesQueue.addUpdateNoFilthyJob(node, rects, cropRect, currentLevelOfDetail()); 0156 processQueues(); 0157 } 0158 0159 void KisUpdateScheduler::updateProjectionNoFilthy(KisNodeSP node, const QRect& rc, const QRect &cropRect) 0160 { 0161 m_d->updatesQueue.addUpdateNoFilthyJob(node, rc, cropRect, currentLevelOfDetail()); 0162 processQueues(); 0163 } 0164 0165 void KisUpdateScheduler::fullRefreshAsync(KisNodeSP root, const QVector<QRect>& rects, const QRect &cropRect) 0166 { 0167 m_d->updatesQueue.addFullRefreshJob(root, rects, cropRect, currentLevelOfDetail()); 0168 processQueues(); 0169 } 0170 0171 void KisUpdateScheduler::fullRefreshAsyncNoFilthy(KisNodeSP root, const QVector<QRect> &rects, const QRect &cropRect) 0172 { 0173 m_d->updatesQueue.addFullRefreshNoFilthyJob(root, rects, cropRect, currentLevelOfDetail()); 0174 processQueues(); 0175 } 0176 0177 void KisUpdateScheduler::fullRefresh(KisNodeSP root, const QRect& rc, const QRect &cropRect) 0178 { 0179 KisBaseRectsWalkerSP walker = new KisFullRefreshWalker(cropRect); 0180 walker->collectRects(root, rc); 0181 0182 bool needLock = true; 0183 0184 if(m_d->processingBlocked) { 0185 warnImage << "WARNING: Calling synchronous fullRefresh under a scheduler lock held"; 0186 warnImage << "We will not assert for now, but please port caller's to strokes"; 0187 warnImage << "to avoid this warning"; 0188 needLock = false; 0189 } 0190 0191 if(needLock) immediateLockForReadOnly(); 0192 m_d->updaterContext.lock(); 0193 0194 Q_ASSERT(m_d->updaterContext.isJobAllowed(walker)); 0195 m_d->updaterContext.addMergeJob(walker); 0196 m_d->updaterContext.unlock(); 0197 0198 m_d->updaterContext.waitForDone(); 0199 0200 if(needLock) unlock(true); 0201 } 0202 0203 void KisUpdateScheduler::addSpontaneousJob(KisSpontaneousJob *spontaneousJob) 0204 { 0205 m_d->updatesQueue.addSpontaneousJob(spontaneousJob); 0206 processQueues(); 0207 } 0208 0209 bool KisUpdateScheduler::hasUpdatesRunning() const 0210 { 0211 return !m_d->updatesQueue.isEmpty(); 0212 } 0213 0214 KisStrokeId KisUpdateScheduler::startStroke(KisStrokeStrategy *strokeStrategy) 0215 { 0216 KisStrokeId id = m_d->strokesQueue.startStroke(strokeStrategy); 0217 processQueues(); 0218 return id; 0219 } 0220 0221 void KisUpdateScheduler::addJob(KisStrokeId id, KisStrokeJobData *data) 0222 { 0223 m_d->strokesQueue.addJob(id, data); 0224 processQueues(); 0225 } 0226 0227 void KisUpdateScheduler::endStroke(KisStrokeId id) 0228 { 0229 m_d->strokesQueue.endStroke(id); 0230 processQueues(); 0231 } 0232 0233 bool KisUpdateScheduler::cancelStroke(KisStrokeId id) 0234 { 0235 bool result = m_d->strokesQueue.cancelStroke(id); 0236 processQueues(); 0237 return result; 0238 } 0239 0240 bool KisUpdateScheduler::tryCancelCurrentStrokeAsync() 0241 { 0242 return m_d->strokesQueue.tryCancelCurrentStrokeAsync(); 0243 } 0244 0245 UndoResult KisUpdateScheduler::tryUndoLastStrokeAsync() 0246 { 0247 return m_d->strokesQueue.tryUndoLastStrokeAsync(); 0248 } 0249 0250 bool KisUpdateScheduler::wrapAroundModeSupported() const 0251 { 0252 return m_d->strokesQueue.wrapAroundModeSupported(); 0253 } 0254 0255 void KisUpdateScheduler::setLodPreferences(const KisLodPreferences &value) 0256 { 0257 m_d->strokesQueue.setLodPreferences(value); 0258 0259 /** 0260 * The queue might have started an internal stroke for 0261 * cache synchronization. Process the queues to execute 0262 * it if needed. 0263 */ 0264 processQueues(); 0265 } 0266 0267 KisLodPreferences KisUpdateScheduler::lodPreferences() const 0268 { 0269 return m_d->strokesQueue.lodPreferences(); 0270 } 0271 0272 void KisUpdateScheduler::explicitRegenerateLevelOfDetail() 0273 { 0274 m_d->strokesQueue.explicitRegenerateLevelOfDetail(); 0275 0276 // \see a comment in setDesiredLevelOfDetail() 0277 processQueues(); 0278 } 0279 0280 int KisUpdateScheduler::currentLevelOfDetail() const 0281 { 0282 int levelOfDetail = m_d->updaterContext.currentLevelOfDetail(); 0283 0284 if (levelOfDetail < 0) { 0285 // it is safe, because is called iff updaterContext has no running jobs at all 0286 levelOfDetail = m_d->updatesQueue.overrideLevelOfDetail(); 0287 } 0288 0289 if (levelOfDetail < 0) { 0290 levelOfDetail = 0; 0291 } 0292 0293 return levelOfDetail; 0294 } 0295 0296 void KisUpdateScheduler::setLod0ToNStrokeStrategyFactory(const KisLodSyncStrokeStrategyFactory &factory) 0297 { 0298 m_d->strokesQueue.setLod0ToNStrokeStrategyFactory(factory); 0299 } 0300 0301 void KisUpdateScheduler::setSuspendResumeUpdatesStrokeStrategyFactory(const KisSuspendResumeStrategyPairFactory &factory) 0302 { 0303 m_d->strokesQueue.setSuspendResumeUpdatesStrokeStrategyFactory(factory); 0304 } 0305 0306 void KisUpdateScheduler::setPurgeRedoStateCallback(const std::function<void ()> &callback) 0307 { 0308 m_d->strokesQueue.setPurgeRedoStateCallback(callback); 0309 } 0310 0311 KisPostExecutionUndoAdapter *KisUpdateScheduler::lodNPostExecutionUndoAdapter() const 0312 { 0313 return m_d->strokesQueue.lodNPostExecutionUndoAdapter(); 0314 } 0315 0316 void KisUpdateScheduler::updateSettings() 0317 { 0318 m_d->updatesQueue.updateSettings(); 0319 KisImageConfig config(true); 0320 m_d->defaultBalancingRatio = config.schedulerBalancingRatio(); 0321 setThreadsLimit(config.maxNumberOfThreads()); 0322 } 0323 0324 void KisUpdateScheduler::immediateLockForReadOnly() 0325 { 0326 m_d->processingBlocked = true; 0327 m_d->updaterContext.waitForDone(); 0328 } 0329 0330 void KisUpdateScheduler::unlock(bool resetLodLevels) 0331 { 0332 if (resetLodLevels) { 0333 /** 0334 * Legacy strokes may have changed the image while we didn't 0335 * control it. Notify the queue to take it into account. 0336 */ 0337 m_d->strokesQueue.notifyUFOChangedImage(); 0338 } 0339 0340 m_d->processingBlocked = false; 0341 processQueues(); 0342 } 0343 0344 bool KisUpdateScheduler::isIdle() 0345 { 0346 bool result = false; 0347 0348 if (tryBarrierLock()) { 0349 result = true; 0350 unlock(false); 0351 } 0352 0353 return result; 0354 } 0355 0356 void KisUpdateScheduler::waitForDone() 0357 { 0358 do { 0359 processQueues(); 0360 m_d->updaterContext.waitForDone(); 0361 } while(!m_d->updatesQueue.isEmpty() || !m_d->strokesQueue.isEmpty()); 0362 } 0363 0364 bool KisUpdateScheduler::tryBarrierLock() 0365 { 0366 if(!m_d->updatesQueue.isEmpty() || !m_d->strokesQueue.isEmpty()) { 0367 return false; 0368 } 0369 0370 m_d->processingBlocked = true; 0371 m_d->updaterContext.waitForDone(); 0372 if(!m_d->updatesQueue.isEmpty() || !m_d->strokesQueue.isEmpty()) { 0373 m_d->processingBlocked = false; 0374 processQueues(); 0375 return false; 0376 } 0377 0378 return true; 0379 } 0380 0381 void KisUpdateScheduler::barrierLock() 0382 { 0383 do { 0384 m_d->processingBlocked = false; 0385 processQueues(); 0386 m_d->processingBlocked = true; 0387 m_d->updaterContext.waitForDone(); 0388 } while(!m_d->updatesQueue.isEmpty() || !m_d->strokesQueue.isEmpty()); 0389 } 0390 0391 void KisUpdateScheduler::processQueues() 0392 { 0393 wakeUpWaitingThreads(); 0394 0395 if(m_d->processingBlocked) return; 0396 0397 if(m_d->strokesQueue.needsExclusiveAccess()) { 0398 DEBUG_BALANCING_METRICS("STROKES", "X"); 0399 m_d->strokesQueue.processQueue(m_d->updaterContext, 0400 !m_d->updatesQueue.isEmpty()); 0401 0402 if(!m_d->strokesQueue.needsExclusiveAccess()) { 0403 tryProcessUpdatesQueue(); 0404 } 0405 } 0406 else if(m_d->balancingRatio() * m_d->strokesQueue.sizeMetric() > m_d->updatesQueue.sizeMetric()) { 0407 DEBUG_BALANCING_METRICS("STROKES", "N"); 0408 m_d->strokesQueue.processQueue(m_d->updaterContext, 0409 !m_d->updatesQueue.isEmpty()); 0410 tryProcessUpdatesQueue(); 0411 } 0412 else { 0413 DEBUG_BALANCING_METRICS("UPDATES", "N"); 0414 tryProcessUpdatesQueue(); 0415 m_d->strokesQueue.processQueue(m_d->updaterContext, 0416 !m_d->updatesQueue.isEmpty()); 0417 0418 } 0419 0420 progressUpdate(); 0421 } 0422 0423 void KisUpdateScheduler::blockUpdates() 0424 { 0425 m_d->updatesFinishedCondition.initWaiting(); 0426 0427 m_d->updatesLockCounter.ref(); 0428 while(haveUpdatesRunning()) { 0429 m_d->updatesFinishedCondition.wait(); 0430 } 0431 0432 m_d->updatesFinishedCondition.endWaiting(); 0433 } 0434 0435 void KisUpdateScheduler::unblockUpdates() 0436 { 0437 m_d->updatesLockCounter.deref(); 0438 processQueues(); 0439 } 0440 0441 void KisUpdateScheduler::wakeUpWaitingThreads() 0442 { 0443 if(m_d->updatesLockCounter && !haveUpdatesRunning()) { 0444 m_d->updatesFinishedCondition.wakeAll(); 0445 } 0446 } 0447 0448 void KisUpdateScheduler::tryProcessUpdatesQueue() 0449 { 0450 QReadLocker locker(&m_d->updatesStartLock); 0451 if(m_d->updatesLockCounter) return; 0452 0453 m_d->updatesQueue.processQueue(m_d->updaterContext); 0454 } 0455 0456 bool KisUpdateScheduler::haveUpdatesRunning() 0457 { 0458 QWriteLocker locker(&m_d->updatesStartLock); 0459 0460 qint32 numMergeJobs, numStrokeJobs; 0461 m_d->updaterContext.getJobsSnapshot(numMergeJobs, numStrokeJobs); 0462 0463 return numMergeJobs; 0464 } 0465 0466 void KisUpdateScheduler::continueUpdate(const QRect &rect) 0467 { 0468 Q_ASSERT(m_d->projectionUpdateListener); 0469 m_d->projectionUpdateListener->notifyProjectionUpdated(rect); 0470 } 0471 0472 void KisUpdateScheduler::doSomeUsefulWork() 0473 { 0474 m_d->updatesQueue.optimize(); 0475 } 0476 0477 void KisUpdateScheduler::spareThreadAppeared() 0478 { 0479 processQueues(); 0480 } 0481 0482 KisTestableUpdateScheduler::KisTestableUpdateScheduler(KisProjectionUpdateListener *projectionUpdateListener, 0483 qint32 threadCount) 0484 { 0485 updateSettings(); 0486 m_d->projectionUpdateListener = projectionUpdateListener; 0487 0488 setThreadsLimit(threadCount); 0489 m_d->updaterContext.setTestingMode(true); 0490 0491 connectSignals(); 0492 } 0493 0494 KisUpdaterContext *KisTestableUpdateScheduler::updaterContext() 0495 { 0496 return &m_d->updaterContext; 0497 }