File indexing completed on 2024-12-08 12:44:38

0001 /*
0002     SPDX-FileCopyrightText: 2014-2015 Daniel Vrátil <dvratil@redhat.com>
0003     SPDX-FileCopyrightText: 2015-2019 Daniel Vrátil <dvratil@kde.org>
0004     SPDX-FileCopyrightText: 2016 Christian Mollekopf <mollekopf@kolabsystems.com>
0005 
0006     SPDX-License-Identifier: LGPL-2.0-or-later
0007 */
0008 
0009 #ifndef KASYNC_EXECUTOR_P_H
0010 #define KASYNC_EXECUTOR_P_H
0011 
0012 #include "execution_p.h"
0013 #include "continuations_p.h"
0014 #include "debug.h"
0015 
0016 namespace KAsync {
0017 
0018 template<typename T>
0019 class Future;
0020 
0021 template<typename T>
0022 class FutureWatcher;
0023 
0024 template<typename Out, typename ... In>
0025 class ContinuationHolder;
0026 
0027 template<typename Out, typename ... In>
0028 class Job;
0029 
0030 class Tracer;
0031 
0032 namespace Private {
0033 
0034 class ExecutorBase;
0035 using ExecutorBasePtr = QSharedPointer<ExecutorBase>;
0036 
0037 class ExecutorBase
0038 {
0039     template<typename Out, typename ... In>
0040     friend class Executor;
0041 
0042     template<typename Out, typename ... In>
0043     friend class KAsync::Job;
0044 
0045     friend struct Execution;
0046     friend class KAsync::Tracer;
0047 
0048 public:
0049     virtual ~ExecutorBase() = default;
0050 
0051     virtual ExecutionPtr exec(const ExecutorBasePtr &self, QSharedPointer<Private::ExecutionContext> context) = 0;
0052 
0053 protected:
0054     ExecutorBase(const ExecutorBasePtr &parent)
0055         : mPrev(parent)
0056     {}
0057 
0058     template<typename T>
0059     KAsync::Future<T>* createFuture(const ExecutionPtr &execution) const
0060     {
0061         return new KAsync::Future<T>(execution);
0062     }
0063 
0064     void prepend(const ExecutorBasePtr &e)
0065     {
0066         if (mPrev) {
0067             mPrev->prepend(e);
0068         } else {
0069             mPrev = e;
0070         }
0071     }
0072 
0073     void addToContext(const QVariant &entry)
0074     {
0075         mContext.push_back(entry);
0076     }
0077 
0078     void guard(const QObject *o)
0079     {
0080         mGuards.push_back(QPointer<const QObject>{o});
0081     }
0082 
0083     QString mExecutorName;
0084     QVector<QVariant> mContext;
0085     QVector<QPointer<const QObject>> mGuards;
0086     ExecutorBasePtr mPrev;
0087 };
0088 
0089 template<typename Out, typename ... In>
0090 class Executor : public ExecutorBase
0091 {
0092     using PrevOut = std::tuple_element_t<0, std::tuple<In ..., void>>;
0093 
0094 public:
0095     explicit Executor(ContinuationHolder<Out, In ...> &&workerHelper, const ExecutorBasePtr &parent = {},
0096                       ExecutionFlag executionFlag = ExecutionFlag::GoodCase)
0097         : ExecutorBase(parent)
0098         , mContinuationHolder(std::move(workerHelper))
0099         , executionFlag(executionFlag)
0100     {
0101         STORE_EXECUTOR_NAME("Executor", Out, In ...);
0102     }
0103 
0104     virtual ~Executor() = default;
0105 
0106     void run(const ExecutionPtr &execution)
0107     {
0108         KAsync::Future<PrevOut> *prevFuture = nullptr;
0109         if (execution->prevExecution) {
0110             prevFuture = execution->prevExecution->result<PrevOut>();
0111             assert(prevFuture->isFinished());
0112         }
0113 
0114         //Execute one of the available workers
0115         KAsync::Future<Out> *future = execution->result<Out>();
0116 
0117         const auto &continuation = Executor<Out, In ...>::mContinuationHolder;
0118         if (continuationIs<AsyncContinuation<Out, In ...>>(continuation)) {
0119             continuationGet<AsyncContinuation<Out, In ...>>(continuation)(prevFuture ? prevFuture->value() : In() ..., *future);
0120         } else if (continuationIs<AsyncErrorContinuation<Out, In ...>>(continuation)) {
0121             continuationGet<AsyncErrorContinuation<Out, In ...>>(continuation)(
0122                     prevFuture->hasError() ? prevFuture->errors().first() : Error(),
0123                     prevFuture ? prevFuture->value() : In() ..., *future);
0124         } else if (continuationIs<SyncContinuation<Out, In ...>>(continuation)) {
0125             callAndApply(prevFuture ? prevFuture->value() : In() ...,
0126                          continuationGet<SyncContinuation<Out, In ...>>(continuation), *future, std::is_void<Out>());
0127             future->setFinished();
0128         } else if (continuationIs<SyncErrorContinuation<Out, In ...>>(continuation)) {
0129             assert(prevFuture);
0130             callAndApply(prevFuture->hasError() ? prevFuture->errors().first() : Error(),
0131                          prevFuture ? prevFuture->value() : In() ...,
0132                          continuationGet<SyncErrorContinuation<Out, In ...>>(continuation), *future, std::is_void<Out>());
0133             future->setFinished();
0134         } else if (continuationIs<JobContinuation<Out, In ...>>(continuation)) {
0135             executeJobAndApply(prevFuture ? prevFuture->value() : In() ...,
0136                                continuationGet<JobContinuation<Out, In ...>>(continuation), *future, std::is_void<Out>());
0137         } else if (continuationIs<JobErrorContinuation<Out, In ...>>(continuation)) {
0138             executeJobAndApply(prevFuture->hasError() ? prevFuture->errors().first() : Error(),
0139                                prevFuture ? prevFuture->value() : In() ...,
0140                                continuationGet<JobErrorContinuation<Out, In ...>>(continuation), *future, std::is_void<Out>());
0141         }
0142 
0143     }
0144 
0145     ExecutionPtr exec(const ExecutorBasePtr &self, QSharedPointer<Private::ExecutionContext> context) override
0146     {
0147         /*
0148          * One executor per job, created with the construction of the Job object.
0149          * One execution per job per exec(), created only once exec() is called.
0150          *
0151          * The executors make up the linked list that makes up the complete execution chain.
0152          *
0153          * The execution then tracks the execution of each executor.
0154          */
0155 
0156         // Passing 'self' to execution ensures that the Executor chain remains
0157         // valid until the entire execution is finished
0158         ExecutionPtr execution = ExecutionPtr::create(self);
0159 #ifndef QT_NO_DEBUG
0160         execution->tracer = std::make_unique<Tracer>(execution.data()); // owned by execution
0161 #endif
0162 
0163         context->guards += mGuards;
0164 
0165         // chainup
0166         execution->prevExecution = mPrev ? mPrev->exec(mPrev, context) : ExecutionPtr();
0167 
0168         execution->resultBase = ExecutorBase::createFuture<Out>(execution);
0169         //We watch our own future to finish the execution once we're done
0170         auto fw = new KAsync::FutureWatcher<Out>();
0171         QObject::connect(fw, &KAsync::FutureWatcher<Out>::futureReady,
0172                          [fw, execution]() {
0173                              execution->setFinished();
0174                              delete fw;
0175                          });
0176         fw->setFuture(*execution->result<Out>());
0177 
0178         KAsync::Future<PrevOut> *prevFuture = execution->prevExecution ? execution->prevExecution->result<PrevOut>()
0179                                                                        : nullptr;
0180         if (!prevFuture || prevFuture->isFinished()) { //The previous job is already done
0181             runExecution(prevFuture, execution, context->guardIsBroken());
0182         } else { //The previous job is still running and we have to wait for it's completion
0183             auto prevFutureWatcher = new KAsync::FutureWatcher<PrevOut>();
0184             QObject::connect(prevFutureWatcher, &KAsync::FutureWatcher<PrevOut>::futureReady,
0185                              [prevFutureWatcher, execution, this, context]() {
0186                                  auto prevFuture = prevFutureWatcher->future();
0187                                  assert(prevFuture.isFinished());
0188                                  delete prevFutureWatcher;
0189                                  runExecution(&prevFuture, execution, context->guardIsBroken());
0190                              });
0191 
0192             prevFutureWatcher->setFuture(*static_cast<KAsync::Future<PrevOut>*>(prevFuture));
0193         }
0194 
0195         return execution;
0196     }
0197 
0198 private:
0199     void runExecution(const KAsync::Future<PrevOut> *prevFuture, const ExecutionPtr &execution, bool guardIsBroken)
0200     {
0201         if (guardIsBroken) {
0202             execution->resultBase->setFinished();
0203             return;
0204         }
0205         if (prevFuture) {
0206             if (prevFuture->hasError() && executionFlag == ExecutionFlag::GoodCase) {
0207                 //Propagate the error to the outer Future
0208                 Q_ASSERT(prevFuture->errors().size() == 1);
0209                 execution->resultBase->setError(prevFuture->errors().first());
0210                 return;
0211             }
0212             if (!prevFuture->hasError() && executionFlag == ExecutionFlag::ErrorCase) {
0213                 //Propagate the value to the outer Future
0214                 copyFutureValue<PrevOut>(*prevFuture, *execution->result<PrevOut>());
0215                 execution->resultBase->setFinished();
0216                 return;
0217             }
0218         }
0219         run(execution);
0220     }
0221 
0222     void executeJobAndApply(In && ... input, const JobContinuation<Out, In ...> &func,
0223                             Future<Out> &future, std::false_type)
0224     {
0225         func(std::forward<In>(input) ...)
0226             .template then<void, Out>([&future](const KAsync::Error &error, const Out &v,
0227                                                 KAsync::Future<void> &f) {
0228                 if (error) {
0229                     future.setError(error);
0230                 } else {
0231                     future.setResult(v);
0232                 }
0233                 f.setFinished();
0234             }).exec();
0235     }
0236 
0237     void executeJobAndApply(In && ... input, const JobContinuation<Out, In ...> &func,
0238                             Future<Out> &future, std::true_type)
0239     {
0240         func(std::forward<In>(input) ...)
0241             .template then<void>([&future](const KAsync::Error &error, KAsync::Future<void> &f) {
0242                 if (error) {
0243                     future.setError(error);
0244                 } else {
0245                     future.setFinished();
0246                 }
0247                 f.setFinished();
0248             }).exec();
0249     }
0250 
0251     void executeJobAndApply(const Error &error, In && ... input, const JobErrorContinuation<Out, In ...> &func,
0252                             Future<Out> &future, std::false_type)
0253     {
0254         func(error, std::forward<In>(input) ...)
0255             .template then<void, Out>([&future](const KAsync::Error &error, const Out &v,
0256                                                 KAsync::Future<void> &f) {
0257                 if (error) {
0258                     future.setError(error);
0259                 } else {
0260                     future.setResult(v);
0261                 }
0262                 f.setFinished();
0263             }).exec();
0264     }
0265 
0266     void executeJobAndApply(const Error &error, In && ... input, const JobErrorContinuation<Out, In ...> &func,
0267                             Future<Out> &future, std::true_type)
0268     {
0269         func(error, std::forward<In>(input) ...)
0270             .template then<void>([&future](const KAsync::Error &error, KAsync::Future<void> &f) {
0271                 if (error) {
0272                     future.setError(error);
0273                 } else {
0274                     future.setFinished();
0275                 }
0276                 f.setFinished();
0277             }).exec();
0278     }
0279 
0280     void callAndApply(In && ... input, const SyncContinuation<Out, In ...> &func, Future<Out> &future, std::false_type)
0281     {
0282         future.setValue(func(std::forward<In>(input) ...));
0283     }
0284 
0285     void callAndApply(In && ... input, const SyncContinuation<Out, In ...> &func, Future<Out> &, std::true_type)
0286     {
0287         func(std::forward<In>(input) ...);
0288     }
0289 
0290     void callAndApply(const Error &error, In && ... input, const SyncErrorContinuation<Out, In ...> &func, Future<Out> &future, std::false_type)
0291     {
0292         future.setValue(func(error, std::forward<In>(input) ...));
0293     }
0294 
0295     void callAndApply(const Error &error, In && ... input, const SyncErrorContinuation<Out, In ...> &func, Future<Out> &, std::true_type)
0296     {
0297         func(error, std::forward<In>(input) ...);
0298     }
0299 
0300     template<typename T>
0301     std::enable_if_t<!std::is_void<T>::value>
0302     copyFutureValue(const KAsync::Future<T> &in, KAsync::Future<T> &out)
0303     {
0304         out.setValue(in.value());
0305     }
0306 
0307     template<typename T>
0308     std::enable_if_t<std::is_void<T>::value>
0309     copyFutureValue(const KAsync::Future<T> &, KAsync::Future<T> &)
0310     {
0311         //noop
0312     }
0313 private:
0314     ContinuationHolder<Out, In ...> mContinuationHolder;
0315     const ExecutionFlag executionFlag;
0316 };
0317 
0318 } // namespace Private
0319 } // namespace KAsync
0320 
0321 #endif
0322