File indexing completed on 2024-05-12 15:58:47

0001 /*
0002  *  SPDX-FileCopyrightText: 2011 Dmitry Kazakov <dimula73@gmail.com>
0003  *
0004  *  SPDX-License-Identifier: GPL-2.0-or-later
0005  */
0006 
0007 #ifndef __KIS_UPDATE_JOB_ITEM_H
0008 #define __KIS_UPDATE_JOB_ITEM_H
0009 
0010 #include <atomic>
0011 
0012 #include <QRunnable>
0013 #include <QReadWriteLock>
0014 
0015 #include "kis_stroke_job.h"
0016 #include "kis_spontaneous_job.h"
0017 #include "kis_base_rects_walker.h"
0018 #include "kis_async_merger.h"
0019 #include "kis_updater_context.h"
0020 #include <KoAlwaysInline.h>
0021 
0022 //#define DEBUG_JOBS_SEQUENCE
0023 
0024 class KRITAIMAGE_EXPORT KisUpdateJobItem : public QObject, public QRunnable
0025 {
0026     Q_OBJECT
0027 public:
0028     enum class Type : int {
0029         EMPTY = 0,
0030         WAITING,
0031         MERGE,
0032         STROKE,
0033         SPONTANEOUS
0034     };
0035 
0036 public:
0037     KisUpdateJobItem(KisUpdaterContext *updaterContext)
0038         : m_updaterContext(updaterContext)
0039     {
0040         setAutoDelete(false);
0041         KIS_SAFE_ASSERT_RECOVER_NOOP(m_atomicType.is_lock_free());
0042     }
0043     ~KisUpdateJobItem() override
0044     {
0045         delete m_runnableJob;
0046     }
0047 
0048     void run() override {
0049         runImpl();
0050 
0051         // notify that the job is exiting and wake everybody
0052         // waiting on wakeForDone()
0053         m_updaterContext->jobThreadExited();
0054     }
0055 
0056 private:
0057 
0058     ALWAYS_INLINE void runImpl() {
0059         if (!isRunning()) return;
0060 
0061         /**
0062          * Here we break the idea of QThreadPool a bit. Ideally, we should split the
0063          * jobs into distinct QRunnable objects and pass all of them to QThreadPool.
0064          * That is a nice idea, but it doesn't work well when the jobs are small enough
0065          * and the number of available cores is high (>4 cores). It this case the
0066          * threads just tend to execute the job very quickly and go to sleep, which is
0067          * an expensive operation.
0068          *
0069          * To overcome this problem we try to bulk-process the jobs. In sigJobFinished()
0070          * signal (which is DirectConnection), the context may add the job to ourselves(!!!),
0071          * so we switch from "done" state into "running" again.
0072          */
0073 
0074         while (1) {
0075             KIS_SAFE_ASSERT_RECOVER_RETURN(isRunning());
0076 
0077             if(m_exclusive) {
0078                 m_updaterContext->m_exclusiveJobLock.lockForWrite();
0079             } else {
0080                 m_updaterContext->m_exclusiveJobLock.lockForRead();
0081             }
0082 
0083             if(m_atomicType == Type::MERGE) {
0084                 runMergeJob();
0085             } else {
0086                 KIS_ASSERT(m_atomicType == Type::STROKE ||
0087                            m_atomicType == Type::SPONTANEOUS);
0088 
0089                 if (m_runnableJob) {
0090 #ifdef DEBUG_JOBS_SEQUENCE
0091                     if (m_atomicType == Type::STROKE) {
0092                         qDebug() << "running: stroke" << m_runnableJob->debugName();
0093                     } else if (m_atomicType == Type::SPONTANEOUS) {
0094                         qDebug() << "running: spont " << m_runnableJob->debugName();
0095                     } else {
0096                         qDebug() << "running: unkn. " << m_runnableJob->debugName();
0097                     }
0098 #endif
0099 
0100                     m_runnableJob->run();
0101                 }
0102             }
0103 
0104             setDone();
0105 
0106             m_updaterContext->doSomeUsefulWork();
0107 
0108             // may flip the current state from Waiting -> Running again
0109             m_updaterContext->jobFinished();
0110 
0111             m_updaterContext->m_exclusiveJobLock.unlock();
0112 
0113             // try to exit the loop. Please note, that no one can flip the state from
0114             // WAITING to EMPTY except ourselves!
0115             Type expectedValue = Type::WAITING;
0116             if (m_atomicType.compare_exchange_strong(expectedValue, Type::EMPTY)) {
0117                 break;
0118             }
0119         }
0120     }
0121 
0122 public:
0123 
0124     inline void runMergeJob() {
0125         KIS_SAFE_ASSERT_RECOVER_RETURN(m_atomicType == Type::MERGE);
0126         KIS_SAFE_ASSERT_RECOVER_RETURN(m_walker);
0127         // dbgKrita << "Executing merge job" << m_walker->changeRect()
0128         //          << "on thread" << QThread::currentThreadId();
0129 
0130 #ifdef DEBUG_JOBS_SEQUENCE
0131         qDebug() << "running: merge " << m_walker->startNode() << m_walker->changeRect();
0132 
0133 #endif
0134 
0135         m_merger.startMerge(*m_walker);
0136 
0137         QRect changeRect = m_walker->changeRect();
0138         m_updaterContext->continueUpdate(changeRect);
0139     }
0140 
0141     // return true if the thread should actually be started
0142     inline bool setWalker(KisBaseRectsWalkerSP walker) {
0143         KIS_ASSERT(m_atomicType <= Type::WAITING);
0144 
0145         m_accessRect = walker->accessRect();
0146         m_changeRect = walker->changeRect();
0147         m_walker = walker;
0148 
0149         m_exclusive = false;
0150         m_runnableJob = 0;
0151 
0152         const Type oldState = m_atomicType.exchange(Type::MERGE);
0153         return oldState == Type::EMPTY;
0154     }
0155 
0156     // return true if the thread should actually be started
0157     inline bool setStrokeJob(KisStrokeJob *strokeJob) {
0158         KIS_ASSERT(m_atomicType <= Type::WAITING);
0159 
0160         m_runnableJob = strokeJob;
0161         m_strokeJobSequentiality = strokeJob->sequentiality();
0162 
0163         m_exclusive = strokeJob->isExclusive();
0164         m_walker = 0;
0165         m_accessRect = m_changeRect = QRect();
0166 
0167         const Type oldState = m_atomicType.exchange(Type::STROKE);
0168         return oldState == Type::EMPTY;
0169     }
0170 
0171     // return true if the thread should actually be started
0172     inline bool setSpontaneousJob(KisSpontaneousJob *spontaneousJob) {
0173         KIS_ASSERT(m_atomicType <= Type::WAITING);
0174 
0175         m_runnableJob = spontaneousJob;
0176 
0177         m_exclusive = spontaneousJob->isExclusive();
0178         m_walker = 0;
0179         m_accessRect = m_changeRect = QRect();
0180 
0181         const Type oldState = m_atomicType.exchange(Type::SPONTANEOUS);
0182         return oldState == Type::EMPTY;
0183     }
0184 
0185     inline void setDone() {
0186         m_walker = 0;
0187         delete m_runnableJob;
0188         m_runnableJob = 0;
0189         m_atomicType = Type::WAITING;
0190     }
0191 
0192     inline bool isRunning() const {
0193         return m_atomicType >= Type::MERGE;
0194     }
0195 
0196     inline Type type() const {
0197         return m_atomicType;
0198     }
0199 
0200     inline const QRect& accessRect() const {
0201         return m_accessRect;
0202     }
0203 
0204     inline const QRect& changeRect() const {
0205         return m_changeRect;
0206     }
0207 
0208     inline KisStrokeJobData::Sequentiality strokeJobSequentiality() const {
0209         return m_strokeJobSequentiality;
0210     }
0211 
0212 private:
0213     /**
0214      * Open walker and stroke job for the testing suite.
0215      * Please, do not use it in production code.
0216      */
0217     friend class KisTestableUpdaterContext;
0218     friend class KisSimpleUpdateQueueTest;
0219     friend class KisStrokesQueueTest;
0220     friend class KisUpdateSchedulerTest;
0221     friend class KisUpdaterContext;
0222 
0223     inline KisBaseRectsWalkerSP walker() const {
0224         return m_walker;
0225     }
0226 
0227     inline KisStrokeJob* strokeJob() const {
0228         KisStrokeJob *job = dynamic_cast<KisStrokeJob*>(m_runnableJob);
0229         Q_ASSERT(job);
0230 
0231         return job;
0232     }
0233 
0234     inline void testingSetDone() {
0235         setDone();
0236     }
0237 
0238 private:
0239     KisUpdaterContext *m_updaterContext {0};
0240     bool m_exclusive {false};
0241     std::atomic<Type> m_atomicType {Type::EMPTY};
0242     volatile KisStrokeJobData::Sequentiality m_strokeJobSequentiality;
0243 
0244     /**
0245      * Runnable jobs part
0246      * The job is owned by the context and deleted after completion
0247      */
0248     KisRunnableWithDebugName *m_runnableJob {0};
0249 
0250     /**
0251      * Merge jobs part
0252      */
0253     KisBaseRectsWalkerSP m_walker;
0254     KisAsyncMerger m_merger;
0255 
0256     /**
0257      * These rects cache actual values from the walker
0258      * to eliminate concurrent access to a walker structure
0259      */
0260     QRect m_accessRect;
0261     QRect m_changeRect;
0262 };
0263 
0264 
0265 #endif /* __KIS_UPDATE_JOB_ITEM_H */