File indexing completed on 2024-12-08 12:44:38
0001 /* 0002 SPDX-FileCopyrightText: 2014-2015 Daniel Vrátil <dvratil@redhat.com> 0003 SPDX-FileCopyrightText: 2015-2019 Daniel Vrátil <dvratil@kde.org> 0004 SPDX-FileCopyrightText: 2016 Christian Mollekopf <mollekopf@kolabsystems.com> 0005 0006 SPDX-License-Identifier: LGPL-2.0-or-later 0007 */ 0008 0009 #ifndef KASYNC_EXECUTOR_P_H 0010 #define KASYNC_EXECUTOR_P_H 0011 0012 #include "execution_p.h" 0013 #include "continuations_p.h" 0014 #include "debug.h" 0015 0016 namespace KAsync { 0017 0018 template<typename T> 0019 class Future; 0020 0021 template<typename T> 0022 class FutureWatcher; 0023 0024 template<typename Out, typename ... In> 0025 class ContinuationHolder; 0026 0027 template<typename Out, typename ... In> 0028 class Job; 0029 0030 class Tracer; 0031 0032 namespace Private { 0033 0034 class ExecutorBase; 0035 using ExecutorBasePtr = QSharedPointer<ExecutorBase>; 0036 0037 class ExecutorBase 0038 { 0039 template<typename Out, typename ... In> 0040 friend class Executor; 0041 0042 template<typename Out, typename ... In> 0043 friend class KAsync::Job; 0044 0045 friend struct Execution; 0046 friend class KAsync::Tracer; 0047 0048 public: 0049 virtual ~ExecutorBase() = default; 0050 0051 virtual ExecutionPtr exec(const ExecutorBasePtr &self, QSharedPointer<Private::ExecutionContext> context) = 0; 0052 0053 protected: 0054 ExecutorBase(const ExecutorBasePtr &parent) 0055 : mPrev(parent) 0056 {} 0057 0058 template<typename T> 0059 KAsync::Future<T>* createFuture(const ExecutionPtr &execution) const 0060 { 0061 return new KAsync::Future<T>(execution); 0062 } 0063 0064 void prepend(const ExecutorBasePtr &e) 0065 { 0066 if (mPrev) { 0067 mPrev->prepend(e); 0068 } else { 0069 mPrev = e; 0070 } 0071 } 0072 0073 void addToContext(const QVariant &entry) 0074 { 0075 mContext.push_back(entry); 0076 } 0077 0078 void guard(const QObject *o) 0079 { 0080 mGuards.push_back(QPointer<const QObject>{o}); 0081 } 0082 0083 QString mExecutorName; 0084 QVector<QVariant> mContext; 0085 QVector<QPointer<const QObject>> mGuards; 0086 ExecutorBasePtr mPrev; 0087 }; 0088 0089 template<typename Out, typename ... In> 0090 class Executor : public ExecutorBase 0091 { 0092 using PrevOut = std::tuple_element_t<0, std::tuple<In ..., void>>; 0093 0094 public: 0095 explicit Executor(ContinuationHolder<Out, In ...> &&workerHelper, const ExecutorBasePtr &parent = {}, 0096 ExecutionFlag executionFlag = ExecutionFlag::GoodCase) 0097 : ExecutorBase(parent) 0098 , mContinuationHolder(std::move(workerHelper)) 0099 , executionFlag(executionFlag) 0100 { 0101 STORE_EXECUTOR_NAME("Executor", Out, In ...); 0102 } 0103 0104 virtual ~Executor() = default; 0105 0106 void run(const ExecutionPtr &execution) 0107 { 0108 KAsync::Future<PrevOut> *prevFuture = nullptr; 0109 if (execution->prevExecution) { 0110 prevFuture = execution->prevExecution->result<PrevOut>(); 0111 assert(prevFuture->isFinished()); 0112 } 0113 0114 //Execute one of the available workers 0115 KAsync::Future<Out> *future = execution->result<Out>(); 0116 0117 const auto &continuation = Executor<Out, In ...>::mContinuationHolder; 0118 if (continuationIs<AsyncContinuation<Out, In ...>>(continuation)) { 0119 continuationGet<AsyncContinuation<Out, In ...>>(continuation)(prevFuture ? prevFuture->value() : In() ..., *future); 0120 } else if (continuationIs<AsyncErrorContinuation<Out, In ...>>(continuation)) { 0121 continuationGet<AsyncErrorContinuation<Out, In ...>>(continuation)( 0122 prevFuture->hasError() ? prevFuture->errors().first() : Error(), 0123 prevFuture ? prevFuture->value() : In() ..., *future); 0124 } else if (continuationIs<SyncContinuation<Out, In ...>>(continuation)) { 0125 callAndApply(prevFuture ? prevFuture->value() : In() ..., 0126 continuationGet<SyncContinuation<Out, In ...>>(continuation), *future, std::is_void<Out>()); 0127 future->setFinished(); 0128 } else if (continuationIs<SyncErrorContinuation<Out, In ...>>(continuation)) { 0129 assert(prevFuture); 0130 callAndApply(prevFuture->hasError() ? prevFuture->errors().first() : Error(), 0131 prevFuture ? prevFuture->value() : In() ..., 0132 continuationGet<SyncErrorContinuation<Out, In ...>>(continuation), *future, std::is_void<Out>()); 0133 future->setFinished(); 0134 } else if (continuationIs<JobContinuation<Out, In ...>>(continuation)) { 0135 executeJobAndApply(prevFuture ? prevFuture->value() : In() ..., 0136 continuationGet<JobContinuation<Out, In ...>>(continuation), *future, std::is_void<Out>()); 0137 } else if (continuationIs<JobErrorContinuation<Out, In ...>>(continuation)) { 0138 executeJobAndApply(prevFuture->hasError() ? prevFuture->errors().first() : Error(), 0139 prevFuture ? prevFuture->value() : In() ..., 0140 continuationGet<JobErrorContinuation<Out, In ...>>(continuation), *future, std::is_void<Out>()); 0141 } 0142 0143 } 0144 0145 ExecutionPtr exec(const ExecutorBasePtr &self, QSharedPointer<Private::ExecutionContext> context) override 0146 { 0147 /* 0148 * One executor per job, created with the construction of the Job object. 0149 * One execution per job per exec(), created only once exec() is called. 0150 * 0151 * The executors make up the linked list that makes up the complete execution chain. 0152 * 0153 * The execution then tracks the execution of each executor. 0154 */ 0155 0156 // Passing 'self' to execution ensures that the Executor chain remains 0157 // valid until the entire execution is finished 0158 ExecutionPtr execution = ExecutionPtr::create(self); 0159 #ifndef QT_NO_DEBUG 0160 execution->tracer = std::make_unique<Tracer>(execution.data()); // owned by execution 0161 #endif 0162 0163 context->guards += mGuards; 0164 0165 // chainup 0166 execution->prevExecution = mPrev ? mPrev->exec(mPrev, context) : ExecutionPtr(); 0167 0168 execution->resultBase = ExecutorBase::createFuture<Out>(execution); 0169 //We watch our own future to finish the execution once we're done 0170 auto fw = new KAsync::FutureWatcher<Out>(); 0171 QObject::connect(fw, &KAsync::FutureWatcher<Out>::futureReady, 0172 [fw, execution]() { 0173 execution->setFinished(); 0174 delete fw; 0175 }); 0176 fw->setFuture(*execution->result<Out>()); 0177 0178 KAsync::Future<PrevOut> *prevFuture = execution->prevExecution ? execution->prevExecution->result<PrevOut>() 0179 : nullptr; 0180 if (!prevFuture || prevFuture->isFinished()) { //The previous job is already done 0181 runExecution(prevFuture, execution, context->guardIsBroken()); 0182 } else { //The previous job is still running and we have to wait for it's completion 0183 auto prevFutureWatcher = new KAsync::FutureWatcher<PrevOut>(); 0184 QObject::connect(prevFutureWatcher, &KAsync::FutureWatcher<PrevOut>::futureReady, 0185 [prevFutureWatcher, execution, this, context]() { 0186 auto prevFuture = prevFutureWatcher->future(); 0187 assert(prevFuture.isFinished()); 0188 delete prevFutureWatcher; 0189 runExecution(&prevFuture, execution, context->guardIsBroken()); 0190 }); 0191 0192 prevFutureWatcher->setFuture(*static_cast<KAsync::Future<PrevOut>*>(prevFuture)); 0193 } 0194 0195 return execution; 0196 } 0197 0198 private: 0199 void runExecution(const KAsync::Future<PrevOut> *prevFuture, const ExecutionPtr &execution, bool guardIsBroken) 0200 { 0201 if (guardIsBroken) { 0202 execution->resultBase->setFinished(); 0203 return; 0204 } 0205 if (prevFuture) { 0206 if (prevFuture->hasError() && executionFlag == ExecutionFlag::GoodCase) { 0207 //Propagate the error to the outer Future 0208 Q_ASSERT(prevFuture->errors().size() == 1); 0209 execution->resultBase->setError(prevFuture->errors().first()); 0210 return; 0211 } 0212 if (!prevFuture->hasError() && executionFlag == ExecutionFlag::ErrorCase) { 0213 //Propagate the value to the outer Future 0214 copyFutureValue<PrevOut>(*prevFuture, *execution->result<PrevOut>()); 0215 execution->resultBase->setFinished(); 0216 return; 0217 } 0218 } 0219 run(execution); 0220 } 0221 0222 void executeJobAndApply(In && ... input, const JobContinuation<Out, In ...> &func, 0223 Future<Out> &future, std::false_type) 0224 { 0225 func(std::forward<In>(input) ...) 0226 .template then<void, Out>([&future](const KAsync::Error &error, const Out &v, 0227 KAsync::Future<void> &f) { 0228 if (error) { 0229 future.setError(error); 0230 } else { 0231 future.setResult(v); 0232 } 0233 f.setFinished(); 0234 }).exec(); 0235 } 0236 0237 void executeJobAndApply(In && ... input, const JobContinuation<Out, In ...> &func, 0238 Future<Out> &future, std::true_type) 0239 { 0240 func(std::forward<In>(input) ...) 0241 .template then<void>([&future](const KAsync::Error &error, KAsync::Future<void> &f) { 0242 if (error) { 0243 future.setError(error); 0244 } else { 0245 future.setFinished(); 0246 } 0247 f.setFinished(); 0248 }).exec(); 0249 } 0250 0251 void executeJobAndApply(const Error &error, In && ... input, const JobErrorContinuation<Out, In ...> &func, 0252 Future<Out> &future, std::false_type) 0253 { 0254 func(error, std::forward<In>(input) ...) 0255 .template then<void, Out>([&future](const KAsync::Error &error, const Out &v, 0256 KAsync::Future<void> &f) { 0257 if (error) { 0258 future.setError(error); 0259 } else { 0260 future.setResult(v); 0261 } 0262 f.setFinished(); 0263 }).exec(); 0264 } 0265 0266 void executeJobAndApply(const Error &error, In && ... input, const JobErrorContinuation<Out, In ...> &func, 0267 Future<Out> &future, std::true_type) 0268 { 0269 func(error, std::forward<In>(input) ...) 0270 .template then<void>([&future](const KAsync::Error &error, KAsync::Future<void> &f) { 0271 if (error) { 0272 future.setError(error); 0273 } else { 0274 future.setFinished(); 0275 } 0276 f.setFinished(); 0277 }).exec(); 0278 } 0279 0280 void callAndApply(In && ... input, const SyncContinuation<Out, In ...> &func, Future<Out> &future, std::false_type) 0281 { 0282 future.setValue(func(std::forward<In>(input) ...)); 0283 } 0284 0285 void callAndApply(In && ... input, const SyncContinuation<Out, In ...> &func, Future<Out> &, std::true_type) 0286 { 0287 func(std::forward<In>(input) ...); 0288 } 0289 0290 void callAndApply(const Error &error, In && ... input, const SyncErrorContinuation<Out, In ...> &func, Future<Out> &future, std::false_type) 0291 { 0292 future.setValue(func(error, std::forward<In>(input) ...)); 0293 } 0294 0295 void callAndApply(const Error &error, In && ... input, const SyncErrorContinuation<Out, In ...> &func, Future<Out> &, std::true_type) 0296 { 0297 func(error, std::forward<In>(input) ...); 0298 } 0299 0300 template<typename T> 0301 std::enable_if_t<!std::is_void<T>::value> 0302 copyFutureValue(const KAsync::Future<T> &in, KAsync::Future<T> &out) 0303 { 0304 out.setValue(in.value()); 0305 } 0306 0307 template<typename T> 0308 std::enable_if_t<std::is_void<T>::value> 0309 copyFutureValue(const KAsync::Future<T> &, KAsync::Future<T> &) 0310 { 0311 //noop 0312 } 0313 private: 0314 ContinuationHolder<Out, In ...> mContinuationHolder; 0315 const ExecutionFlag executionFlag; 0316 }; 0317 0318 } // namespace Private 0319 } // nameapce KAsync 0320 0321 #endif 0322