File indexing completed on 2024-05-12 15:58:47

0001 /*
0002  *  SPDX-FileCopyrightText: 2010 Dmitry Kazakov <dimula73@gmail.com>
0003  *
0004  *  SPDX-License-Identifier: GPL-2.0-or-later
0005  */
0006 
0007 #include "kis_update_scheduler.h"
0008 
0009 #include "klocalizedstring.h"
0010 #include "kis_image_config.h"
0011 #include "kis_merge_walker.h"
0012 #include "kis_full_refresh_walker.h"
0013 
0014 #include "kis_updater_context.h"
0015 #include "kis_simple_update_queue.h"
0016 #include "kis_strokes_queue.h"
0017 
0018 #include "kis_queues_progress_updater.h"
0019 #include "KisImageConfigNotifier.h"
0020 
0021 #include <QReadWriteLock>
0022 #include "kis_lazy_wait_condition.h"
0023 #include <mutex>
0024 
0025 //#define DEBUG_BALANCING
0026 
0027 #ifdef DEBUG_BALANCING
0028 #define DEBUG_BALANCING_METRICS(decidedFirst, excl)                     \
0029     dbgKrita << "Balance decision:" << decidedFirst                     \
0030     << "(" << excl << ")"                                               \
0031     << "updates:" << m_d->updatesQueue.sizeMetric()                    \
0032     << "strokes:" << m_d->strokesQueue.sizeMetric()
0033 #else
0034 #define DEBUG_BALANCING_METRICS(decidedFirst, excl)
0035 #endif
0036 
0037 
0038 struct Q_DECL_HIDDEN KisUpdateScheduler::Private {
0039     Private(KisUpdateScheduler *_q, KisProjectionUpdateListener *p)
0040         : q(_q)
0041         , updaterContext(KisImageConfig(true).maxNumberOfThreads(), q)
0042         , projectionUpdateListener(p)
0043     {}
0044 
0045     KisUpdateScheduler *q;
0046 
0047     KisSimpleUpdateQueue updatesQueue;
0048     KisStrokesQueue strokesQueue;
0049     KisUpdaterContext updaterContext;
0050     bool processingBlocked = false;
0051     qreal defaultBalancingRatio = 1.0; // desired strokes-queue-size / updates-queue-size
0052     KisProjectionUpdateListener *projectionUpdateListener;
0053     KisQueuesProgressUpdater *progressUpdater = 0;
0054 
0055     QAtomicInt updatesLockCounter;
0056     QReadWriteLock updatesStartLock;
0057     KisLazyWaitCondition updatesFinishedCondition;
0058 
0059     qreal balancingRatio() const {
0060         const qreal strokeRatioOverride = strokesQueue.balancingRatioOverride();
0061         return strokeRatioOverride > 0 ? strokeRatioOverride : defaultBalancingRatio;
0062     }
0063 };
0064 
0065 KisUpdateScheduler::KisUpdateScheduler(KisProjectionUpdateListener *projectionUpdateListener, QObject *parent)
0066     : QObject(parent),
0067       m_d(new Private(this, projectionUpdateListener))
0068 {
0069     updateSettings();
0070     connectSignals();
0071 }
0072 
0073 KisUpdateScheduler::KisUpdateScheduler()
0074     : m_d(new Private(this, 0))
0075 {
0076 }
0077 
0078 KisUpdateScheduler::~KisUpdateScheduler()
0079 {
0080     delete m_d->progressUpdater;
0081     delete m_d;
0082 }
0083 
0084 void KisUpdateScheduler::setThreadsLimit(int value)
0085 {
0086     KIS_SAFE_ASSERT_RECOVER_RETURN(!m_d->processingBlocked);
0087 
0088     /**
0089      * Thread limit can be changed without the full-featured barrier
0090      * lock, we can avoid waiting for all the jobs to complete. We
0091      * should just ensure there is no more jobs in the updater context.
0092      */
0093     immediateLockForReadOnly();
0094     m_d->updaterContext.lock();
0095     m_d->updaterContext.setThreadsLimit(value);
0096     m_d->updaterContext.unlock();
0097     unlock(false);
0098 }
0099 
0100 int KisUpdateScheduler::threadsLimit() const
0101 {
0102     std::lock_guard<KisUpdaterContext> l(m_d->updaterContext);
0103     return m_d->updaterContext.threadsLimit();
0104 }
0105 
0106 void KisUpdateScheduler::connectSignals()
0107 {
0108     connect(KisImageConfigNotifier::instance(), SIGNAL(configChanged()),
0109             SLOT(updateSettings()));
0110 }
0111 
0112 void KisUpdateScheduler::setProgressProxy(KoProgressProxy *progressProxy)
0113 {
0114     delete m_d->progressUpdater;
0115     m_d->progressUpdater = progressProxy ?
0116         new KisQueuesProgressUpdater(progressProxy, this) : 0;
0117 }
0118 
0119 void KisUpdateScheduler::progressUpdate()
0120 {
0121     if (!m_d->progressUpdater) return;
0122 
0123     if(!m_d->strokesQueue.hasOpenedStrokes()) {
0124         QString jobName = m_d->strokesQueue.currentStrokeName().toString();
0125         if(jobName.isEmpty()) {
0126             jobName = i18n("Updating...");
0127         }
0128 
0129         int sizeMetric = m_d->strokesQueue.sizeMetric();
0130         if (!sizeMetric) {
0131             sizeMetric = m_d->updatesQueue.sizeMetric();
0132         }
0133 
0134         m_d->progressUpdater->updateProgress(sizeMetric, jobName);
0135     }
0136     else {
0137         m_d->progressUpdater->hide();
0138     }
0139 }
0140 
0141 void KisUpdateScheduler::updateProjection(KisNodeSP node, const QVector<QRect> &rects, const QRect &cropRect)
0142 {
0143     m_d->updatesQueue.addUpdateJob(node, rects, cropRect, currentLevelOfDetail());
0144     processQueues();
0145 }
0146 
0147 void KisUpdateScheduler::updateProjection(KisNodeSP node, const QRect &rc, const QRect &cropRect)
0148 {
0149     m_d->updatesQueue.addUpdateJob(node, rc, cropRect, currentLevelOfDetail());
0150     processQueues();
0151 }
0152 
0153 void KisUpdateScheduler::updateProjectionNoFilthy(KisNodeSP node, const QVector<QRect>& rects, const QRect &cropRect)
0154 {
0155     m_d->updatesQueue.addUpdateNoFilthyJob(node, rects, cropRect, currentLevelOfDetail());
0156     processQueues();
0157 }
0158 
0159 void KisUpdateScheduler::updateProjectionNoFilthy(KisNodeSP node, const QRect& rc, const QRect &cropRect)
0160 {
0161     m_d->updatesQueue.addUpdateNoFilthyJob(node, rc, cropRect, currentLevelOfDetail());
0162     processQueues();
0163 }
0164 
0165 void KisUpdateScheduler::fullRefreshAsync(KisNodeSP root, const QVector<QRect>& rects, const QRect &cropRect)
0166 {
0167     m_d->updatesQueue.addFullRefreshJob(root, rects, cropRect, currentLevelOfDetail());
0168     processQueues();
0169 }
0170 
0171 void KisUpdateScheduler::fullRefreshAsyncNoFilthy(KisNodeSP root, const QVector<QRect> &rects, const QRect &cropRect)
0172 {
0173     m_d->updatesQueue.addFullRefreshNoFilthyJob(root, rects, cropRect, currentLevelOfDetail());
0174     processQueues();
0175 }
0176 
0177 void KisUpdateScheduler::fullRefresh(KisNodeSP root, const QRect& rc, const QRect &cropRect)
0178 {
0179     KisBaseRectsWalkerSP walker = new KisFullRefreshWalker(cropRect);
0180     walker->collectRects(root, rc);
0181 
0182     bool needLock = true;
0183 
0184     if(m_d->processingBlocked) {
0185         warnImage << "WARNING: Calling synchronous fullRefresh under a scheduler lock held";
0186         warnImage << "We will not assert for now, but please port caller's to strokes";
0187         warnImage << "to avoid this warning";
0188         needLock = false;
0189     }
0190 
0191     if(needLock) immediateLockForReadOnly();
0192     m_d->updaterContext.lock();
0193 
0194     Q_ASSERT(m_d->updaterContext.isJobAllowed(walker));
0195     m_d->updaterContext.addMergeJob(walker);
0196     m_d->updaterContext.unlock();
0197 
0198     m_d->updaterContext.waitForDone();
0199 
0200     if(needLock) unlock(true);
0201 }
0202 
0203 void KisUpdateScheduler::addSpontaneousJob(KisSpontaneousJob *spontaneousJob)
0204 {
0205     m_d->updatesQueue.addSpontaneousJob(spontaneousJob);
0206     processQueues();
0207 }
0208 
0209 bool KisUpdateScheduler::hasUpdatesRunning() const
0210 {
0211     return !m_d->updatesQueue.isEmpty();
0212 }
0213 
0214 KisStrokeId KisUpdateScheduler::startStroke(KisStrokeStrategy *strokeStrategy)
0215 {
0216     KisStrokeId id  = m_d->strokesQueue.startStroke(strokeStrategy);
0217     processQueues();
0218     return id;
0219 }
0220 
0221 void KisUpdateScheduler::addJob(KisStrokeId id, KisStrokeJobData *data)
0222 {
0223     m_d->strokesQueue.addJob(id, data);
0224     processQueues();
0225 }
0226 
0227 void KisUpdateScheduler::endStroke(KisStrokeId id)
0228 {
0229     m_d->strokesQueue.endStroke(id);
0230     processQueues();
0231 }
0232 
0233 bool KisUpdateScheduler::cancelStroke(KisStrokeId id)
0234 {
0235     bool result = m_d->strokesQueue.cancelStroke(id);
0236     processQueues();
0237     return result;
0238 }
0239 
0240 bool KisUpdateScheduler::tryCancelCurrentStrokeAsync()
0241 {
0242     return m_d->strokesQueue.tryCancelCurrentStrokeAsync();
0243 }
0244 
0245 UndoResult KisUpdateScheduler::tryUndoLastStrokeAsync()
0246 {
0247     return m_d->strokesQueue.tryUndoLastStrokeAsync();
0248 }
0249 
0250 bool KisUpdateScheduler::wrapAroundModeSupported() const
0251 {
0252     return m_d->strokesQueue.wrapAroundModeSupported();
0253 }
0254 
0255 void KisUpdateScheduler::setLodPreferences(const KisLodPreferences &value)
0256 {
0257     m_d->strokesQueue.setLodPreferences(value);
0258 
0259     /**
0260      * The queue might have started an internal stroke for
0261      * cache synchronization. Process the queues to execute
0262      * it if needed.
0263      */
0264     processQueues();
0265 }
0266 
0267 KisLodPreferences KisUpdateScheduler::lodPreferences() const
0268 {
0269     return m_d->strokesQueue.lodPreferences();
0270 }
0271 
0272 void KisUpdateScheduler::explicitRegenerateLevelOfDetail()
0273 {
0274     m_d->strokesQueue.explicitRegenerateLevelOfDetail();
0275 
0276     // \see a comment in setDesiredLevelOfDetail()
0277     processQueues();
0278 }
0279 
0280 int KisUpdateScheduler::currentLevelOfDetail() const
0281 {
0282     int levelOfDetail = m_d->updaterContext.currentLevelOfDetail();
0283 
0284     if (levelOfDetail < 0) {
0285         // it is safe, because is called iff updaterContext has no running jobs at all
0286         levelOfDetail = m_d->updatesQueue.overrideLevelOfDetail();
0287     }
0288 
0289     if (levelOfDetail < 0) {
0290         levelOfDetail = 0;
0291     }
0292 
0293     return levelOfDetail;
0294 }
0295 
0296 void KisUpdateScheduler::setLod0ToNStrokeStrategyFactory(const KisLodSyncStrokeStrategyFactory &factory)
0297 {
0298     m_d->strokesQueue.setLod0ToNStrokeStrategyFactory(factory);
0299 }
0300 
0301 void KisUpdateScheduler::setSuspendResumeUpdatesStrokeStrategyFactory(const KisSuspendResumeStrategyPairFactory &factory)
0302 {
0303     m_d->strokesQueue.setSuspendResumeUpdatesStrokeStrategyFactory(factory);
0304 }
0305 
0306 void KisUpdateScheduler::setPurgeRedoStateCallback(const std::function<void ()> &callback)
0307 {
0308     m_d->strokesQueue.setPurgeRedoStateCallback(callback);
0309 }
0310 
0311 KisPostExecutionUndoAdapter *KisUpdateScheduler::lodNPostExecutionUndoAdapter() const
0312 {
0313     return m_d->strokesQueue.lodNPostExecutionUndoAdapter();
0314 }
0315 
0316 void KisUpdateScheduler::updateSettings()
0317 {
0318     m_d->updatesQueue.updateSettings();
0319     KisImageConfig config(true);
0320     m_d->defaultBalancingRatio = config.schedulerBalancingRatio();
0321     setThreadsLimit(config.maxNumberOfThreads());
0322 }
0323 
0324 void KisUpdateScheduler::immediateLockForReadOnly()
0325 {
0326     m_d->processingBlocked = true;
0327     m_d->updaterContext.waitForDone();
0328 }
0329 
0330 void KisUpdateScheduler::unlock(bool resetLodLevels)
0331 {
0332     if (resetLodLevels) {
0333         /**
0334          * Legacy strokes may have changed the image while we didn't
0335          * control it. Notify the queue to take it into account.
0336          */
0337         m_d->strokesQueue.notifyUFOChangedImage();
0338     }
0339 
0340     m_d->processingBlocked = false;
0341     processQueues();
0342 }
0343 
0344 bool KisUpdateScheduler::isIdle()
0345 {
0346     bool result = false;
0347 
0348     if (tryBarrierLock()) {
0349         result = true;
0350         unlock(false);
0351     }
0352 
0353     return result;
0354 }
0355 
0356 void KisUpdateScheduler::waitForDone()
0357 {
0358     do {
0359         processQueues();
0360         m_d->updaterContext.waitForDone();
0361     } while(!m_d->updatesQueue.isEmpty() || !m_d->strokesQueue.isEmpty());
0362 }
0363 
0364 bool KisUpdateScheduler::tryBarrierLock()
0365 {
0366     if(!m_d->updatesQueue.isEmpty() || !m_d->strokesQueue.isEmpty()) {
0367         return false;
0368     }
0369 
0370     m_d->processingBlocked = true;
0371     m_d->updaterContext.waitForDone();
0372     if(!m_d->updatesQueue.isEmpty() || !m_d->strokesQueue.isEmpty()) {
0373         m_d->processingBlocked = false;
0374         processQueues();
0375         return false;
0376     }
0377 
0378     return true;
0379 }
0380 
0381 void KisUpdateScheduler::barrierLock()
0382 {
0383     do {
0384         m_d->processingBlocked = false;
0385         processQueues();
0386         m_d->processingBlocked = true;
0387         m_d->updaterContext.waitForDone();
0388     } while(!m_d->updatesQueue.isEmpty() || !m_d->strokesQueue.isEmpty());
0389 }
0390 
0391 void KisUpdateScheduler::processQueues()
0392 {
0393     wakeUpWaitingThreads();
0394 
0395     if(m_d->processingBlocked) return;
0396 
0397     if(m_d->strokesQueue.needsExclusiveAccess()) {
0398         DEBUG_BALANCING_METRICS("STROKES", "X");
0399         m_d->strokesQueue.processQueue(m_d->updaterContext,
0400                                         !m_d->updatesQueue.isEmpty());
0401 
0402         if(!m_d->strokesQueue.needsExclusiveAccess()) {
0403             tryProcessUpdatesQueue();
0404         }
0405     }
0406     else if(m_d->balancingRatio() * m_d->strokesQueue.sizeMetric() > m_d->updatesQueue.sizeMetric()) {
0407         DEBUG_BALANCING_METRICS("STROKES", "N");
0408         m_d->strokesQueue.processQueue(m_d->updaterContext,
0409                                         !m_d->updatesQueue.isEmpty());
0410         tryProcessUpdatesQueue();
0411     }
0412     else {
0413         DEBUG_BALANCING_METRICS("UPDATES", "N");
0414         tryProcessUpdatesQueue();
0415         m_d->strokesQueue.processQueue(m_d->updaterContext,
0416                                         !m_d->updatesQueue.isEmpty());
0417 
0418     }
0419 
0420     progressUpdate();
0421 }
0422 
0423 void KisUpdateScheduler::blockUpdates()
0424 {
0425     m_d->updatesFinishedCondition.initWaiting();
0426 
0427     m_d->updatesLockCounter.ref();
0428     while(haveUpdatesRunning()) {
0429         m_d->updatesFinishedCondition.wait();
0430     }
0431 
0432     m_d->updatesFinishedCondition.endWaiting();
0433 }
0434 
0435 void KisUpdateScheduler::unblockUpdates()
0436 {
0437     m_d->updatesLockCounter.deref();
0438     processQueues();
0439 }
0440 
0441 void KisUpdateScheduler::wakeUpWaitingThreads()
0442 {
0443     if(m_d->updatesLockCounter && !haveUpdatesRunning()) {
0444         m_d->updatesFinishedCondition.wakeAll();
0445     }
0446 }
0447 
0448 void KisUpdateScheduler::tryProcessUpdatesQueue()
0449 {
0450     QReadLocker locker(&m_d->updatesStartLock);
0451     if(m_d->updatesLockCounter) return;
0452 
0453     m_d->updatesQueue.processQueue(m_d->updaterContext);
0454 }
0455 
0456 bool KisUpdateScheduler::haveUpdatesRunning()
0457 {
0458     QWriteLocker locker(&m_d->updatesStartLock);
0459 
0460     qint32 numMergeJobs, numStrokeJobs;
0461     m_d->updaterContext.getJobsSnapshot(numMergeJobs, numStrokeJobs);
0462 
0463     return numMergeJobs;
0464 }
0465 
0466 void KisUpdateScheduler::continueUpdate(const QRect &rect)
0467 {
0468     Q_ASSERT(m_d->projectionUpdateListener);
0469     m_d->projectionUpdateListener->notifyProjectionUpdated(rect);
0470 }
0471 
0472 void KisUpdateScheduler::doSomeUsefulWork()
0473 {
0474     m_d->updatesQueue.optimize();
0475 }
0476 
0477 void KisUpdateScheduler::spareThreadAppeared()
0478 {
0479     processQueues();
0480 }
0481 
0482 KisTestableUpdateScheduler::KisTestableUpdateScheduler(KisProjectionUpdateListener *projectionUpdateListener,
0483                                                        qint32 threadCount)
0484 {
0485     updateSettings();
0486     m_d->projectionUpdateListener = projectionUpdateListener;
0487 
0488     setThreadsLimit(threadCount);
0489     m_d->updaterContext.setTestingMode(true);
0490 
0491     connectSignals();
0492 }
0493 
0494 KisUpdaterContext *KisTestableUpdateScheduler::updaterContext()
0495 {
0496     return &m_d->updaterContext;
0497 }