File indexing completed on 2024-05-12 15:58:47
0001 /* 0002 * SPDX-FileCopyrightText: 2011 Dmitry Kazakov <dimula73@gmail.com> 0003 * 0004 * SPDX-License-Identifier: GPL-2.0-or-later 0005 */ 0006 0007 #ifndef __KIS_UPDATE_JOB_ITEM_H 0008 #define __KIS_UPDATE_JOB_ITEM_H 0009 0010 #include <atomic> 0011 0012 #include <QRunnable> 0013 #include <QReadWriteLock> 0014 0015 #include "kis_stroke_job.h" 0016 #include "kis_spontaneous_job.h" 0017 #include "kis_base_rects_walker.h" 0018 #include "kis_async_merger.h" 0019 #include "kis_updater_context.h" 0020 #include <KoAlwaysInline.h> 0021 0022 //#define DEBUG_JOBS_SEQUENCE 0023 0024 class KRITAIMAGE_EXPORT KisUpdateJobItem : public QObject, public QRunnable 0025 { 0026 Q_OBJECT 0027 public: 0028 enum class Type : int { 0029 EMPTY = 0, 0030 WAITING, 0031 MERGE, 0032 STROKE, 0033 SPONTANEOUS 0034 }; 0035 0036 public: 0037 KisUpdateJobItem(KisUpdaterContext *updaterContext) 0038 : m_updaterContext(updaterContext) 0039 { 0040 setAutoDelete(false); 0041 KIS_SAFE_ASSERT_RECOVER_NOOP(m_atomicType.is_lock_free()); 0042 } 0043 ~KisUpdateJobItem() override 0044 { 0045 delete m_runnableJob; 0046 } 0047 0048 void run() override { 0049 runImpl(); 0050 0051 // notify that the job is exiting and wake everybody 0052 // waiting on wakeForDone() 0053 m_updaterContext->jobThreadExited(); 0054 } 0055 0056 private: 0057 0058 ALWAYS_INLINE void runImpl() { 0059 if (!isRunning()) return; 0060 0061 /** 0062 * Here we break the idea of QThreadPool a bit. Ideally, we should split the 0063 * jobs into distinct QRunnable objects and pass all of them to QThreadPool. 0064 * That is a nice idea, but it doesn't work well when the jobs are small enough 0065 * and the number of available cores is high (>4 cores). It this case the 0066 * threads just tend to execute the job very quickly and go to sleep, which is 0067 * an expensive operation. 0068 * 0069 * To overcome this problem we try to bulk-process the jobs. In sigJobFinished() 0070 * signal (which is DirectConnection), the context may add the job to ourselves(!!!), 0071 * so we switch from "done" state into "running" again. 0072 */ 0073 0074 while (1) { 0075 KIS_SAFE_ASSERT_RECOVER_RETURN(isRunning()); 0076 0077 if(m_exclusive) { 0078 m_updaterContext->m_exclusiveJobLock.lockForWrite(); 0079 } else { 0080 m_updaterContext->m_exclusiveJobLock.lockForRead(); 0081 } 0082 0083 if(m_atomicType == Type::MERGE) { 0084 runMergeJob(); 0085 } else { 0086 KIS_ASSERT(m_atomicType == Type::STROKE || 0087 m_atomicType == Type::SPONTANEOUS); 0088 0089 if (m_runnableJob) { 0090 #ifdef DEBUG_JOBS_SEQUENCE 0091 if (m_atomicType == Type::STROKE) { 0092 qDebug() << "running: stroke" << m_runnableJob->debugName(); 0093 } else if (m_atomicType == Type::SPONTANEOUS) { 0094 qDebug() << "running: spont " << m_runnableJob->debugName(); 0095 } else { 0096 qDebug() << "running: unkn. " << m_runnableJob->debugName(); 0097 } 0098 #endif 0099 0100 m_runnableJob->run(); 0101 } 0102 } 0103 0104 setDone(); 0105 0106 m_updaterContext->doSomeUsefulWork(); 0107 0108 // may flip the current state from Waiting -> Running again 0109 m_updaterContext->jobFinished(); 0110 0111 m_updaterContext->m_exclusiveJobLock.unlock(); 0112 0113 // try to exit the loop. Please note, that no one can flip the state from 0114 // WAITING to EMPTY except ourselves! 0115 Type expectedValue = Type::WAITING; 0116 if (m_atomicType.compare_exchange_strong(expectedValue, Type::EMPTY)) { 0117 break; 0118 } 0119 } 0120 } 0121 0122 public: 0123 0124 inline void runMergeJob() { 0125 KIS_SAFE_ASSERT_RECOVER_RETURN(m_atomicType == Type::MERGE); 0126 KIS_SAFE_ASSERT_RECOVER_RETURN(m_walker); 0127 // dbgKrita << "Executing merge job" << m_walker->changeRect() 0128 // << "on thread" << QThread::currentThreadId(); 0129 0130 #ifdef DEBUG_JOBS_SEQUENCE 0131 qDebug() << "running: merge " << m_walker->startNode() << m_walker->changeRect(); 0132 0133 #endif 0134 0135 m_merger.startMerge(*m_walker); 0136 0137 QRect changeRect = m_walker->changeRect(); 0138 m_updaterContext->continueUpdate(changeRect); 0139 } 0140 0141 // return true if the thread should actually be started 0142 inline bool setWalker(KisBaseRectsWalkerSP walker) { 0143 KIS_ASSERT(m_atomicType <= Type::WAITING); 0144 0145 m_accessRect = walker->accessRect(); 0146 m_changeRect = walker->changeRect(); 0147 m_walker = walker; 0148 0149 m_exclusive = false; 0150 m_runnableJob = 0; 0151 0152 const Type oldState = m_atomicType.exchange(Type::MERGE); 0153 return oldState == Type::EMPTY; 0154 } 0155 0156 // return true if the thread should actually be started 0157 inline bool setStrokeJob(KisStrokeJob *strokeJob) { 0158 KIS_ASSERT(m_atomicType <= Type::WAITING); 0159 0160 m_runnableJob = strokeJob; 0161 m_strokeJobSequentiality = strokeJob->sequentiality(); 0162 0163 m_exclusive = strokeJob->isExclusive(); 0164 m_walker = 0; 0165 m_accessRect = m_changeRect = QRect(); 0166 0167 const Type oldState = m_atomicType.exchange(Type::STROKE); 0168 return oldState == Type::EMPTY; 0169 } 0170 0171 // return true if the thread should actually be started 0172 inline bool setSpontaneousJob(KisSpontaneousJob *spontaneousJob) { 0173 KIS_ASSERT(m_atomicType <= Type::WAITING); 0174 0175 m_runnableJob = spontaneousJob; 0176 0177 m_exclusive = spontaneousJob->isExclusive(); 0178 m_walker = 0; 0179 m_accessRect = m_changeRect = QRect(); 0180 0181 const Type oldState = m_atomicType.exchange(Type::SPONTANEOUS); 0182 return oldState == Type::EMPTY; 0183 } 0184 0185 inline void setDone() { 0186 m_walker = 0; 0187 delete m_runnableJob; 0188 m_runnableJob = 0; 0189 m_atomicType = Type::WAITING; 0190 } 0191 0192 inline bool isRunning() const { 0193 return m_atomicType >= Type::MERGE; 0194 } 0195 0196 inline Type type() const { 0197 return m_atomicType; 0198 } 0199 0200 inline const QRect& accessRect() const { 0201 return m_accessRect; 0202 } 0203 0204 inline const QRect& changeRect() const { 0205 return m_changeRect; 0206 } 0207 0208 inline KisStrokeJobData::Sequentiality strokeJobSequentiality() const { 0209 return m_strokeJobSequentiality; 0210 } 0211 0212 private: 0213 /** 0214 * Open walker and stroke job for the testing suite. 0215 * Please, do not use it in production code. 0216 */ 0217 friend class KisTestableUpdaterContext; 0218 friend class KisSimpleUpdateQueueTest; 0219 friend class KisStrokesQueueTest; 0220 friend class KisUpdateSchedulerTest; 0221 friend class KisUpdaterContext; 0222 0223 inline KisBaseRectsWalkerSP walker() const { 0224 return m_walker; 0225 } 0226 0227 inline KisStrokeJob* strokeJob() const { 0228 KisStrokeJob *job = dynamic_cast<KisStrokeJob*>(m_runnableJob); 0229 Q_ASSERT(job); 0230 0231 return job; 0232 } 0233 0234 inline void testingSetDone() { 0235 setDone(); 0236 } 0237 0238 private: 0239 KisUpdaterContext *m_updaterContext {0}; 0240 bool m_exclusive {false}; 0241 std::atomic<Type> m_atomicType {Type::EMPTY}; 0242 volatile KisStrokeJobData::Sequentiality m_strokeJobSequentiality; 0243 0244 /** 0245 * Runnable jobs part 0246 * The job is owned by the context and deleted after completion 0247 */ 0248 KisRunnableWithDebugName *m_runnableJob {0}; 0249 0250 /** 0251 * Merge jobs part 0252 */ 0253 KisBaseRectsWalkerSP m_walker; 0254 KisAsyncMerger m_merger; 0255 0256 /** 0257 * These rects cache actual values from the walker 0258 * to eliminate concurrent access to a walker structure 0259 */ 0260 QRect m_accessRect; 0261 QRect m_changeRect; 0262 }; 0263 0264 0265 #endif /* __KIS_UPDATE_JOB_ITEM_H */