File indexing completed on 2024-12-08 12:44:37

0001 /*
0002     SPDX-FileCopyrightText: 2014-2015 Daniel Vrátil <dvratil@redhat.com>
0003     SPDX-FileCopyrightText: 2016 Daniel Vrátil <dvratil@kde.org>
0004     SPDX-FileCopyrightText: 2016 Christian Mollekopf <mollekopf@kolabsystems.com>
0005 
0006     SPDX-License-Identifier: LGPL-2.0-or-later
0007 */
0008 
0009 #ifndef KASYNC_H
0010 #define KASYNC_H
0011 
0012 #include "kasync_export.h"
0013 
0014 #include <functional>
0015 #include <type_traits>
0016 #include <cassert>
0017 
0018 #include <QVariant>
0019 
0020 #include "future.h"
0021 #include "debug.h"
0022 
0023 #include "continuations_p.h"
0024 #include "executor_p.h"
0025 
0026 class QObject;
0027 
0028 /**
0029  * @mainpage KAsync
0030  *
0031  * @brief API to help write async code.
0032  *
0033  * This API is based around jobs that take lambdas to execute asynchronous tasks.
0034  * Each async operation can take a continuation that can then be used to execute
0035  * further async operations. That way it is possible to build async chains of
0036  * operations that can be stored and executed later on. Jobs can be composed,
0037  * similarly to functions.
0038  *
0039  * Relations between the components:
0040  * * Job: API wrapper around Executors chain. Can be destroyed while still running,
0041  *        because the actual execution happens in the background
0042  * * Executor: Describes task to execute. Executors form a linked list matching the
0043  *        order in which they will be executed. The Executor chain is destroyed when
0044  *        the parent Job is destroyed. However if the Job is still running it is
0045  *        guaranteed that the Executor chain will not be destroyed until the execution
0046  *        is finished.
0047  * * Execution: The running execution of the task stored in Executor. Each call to
0048  *        Job::exec() instantiates new Execution chain, which makes it possible for
0049  *        the Job to be executed multiple times (even in parallel).
0050  * * Future: Representation of the result that is being calculated
0051  *
0052  *
0053  * TODO: Possibility to abort a job through future (perhaps optional?)
0054  * TODO: Support for timeout, specified during exec call, after which the error
0055  *       handler gets called with a defined errorCode.
0056  */
0057 
0058 
0059 namespace KAsync {
0060 
0061 template<typename PrevOut, typename Out, typename ... In>
0062 class Executor; // forward declaration only; the code visible in this header instantiates Private::Executor instead
0063 
0064 class JobBase; // defined further down; declared early so the start() overloads can test a callable's result type against it
0065 
0066 template<typename Out, typename ... In>
0067 class Job; // defined further down; declared early because the factory functions below return Job<...>
0068 
0069 //@cond PRIVATE
0070 namespace Private {
0071 
     /**
      * Common back end of the public start() overloads.
      *
      * Creates a Private::Executor<Out, In ...> from @p helper and wraps it in the
      * Job that is returned. The Job constructor used here is private; this
      * function is declared a friend of Job for that purpose.
      *
      * @param helper Continuation holder; it is moved into the new executor.
      *
      * NOTE(review): the second constructor argument is nullptr - presumably
      * "no preceding executor"; confirm against executor_p.h.
      */
0072 template<typename Out, typename ... In>
0073 Job<Out, In ...> startImpl(Private::ContinuationHolder<Out, In ...> &&helper)
0074 {
0075     static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed.");
     // helper is a plain (non-deduced) rvalue reference, so std::move is the matching cast.
0076     return Job<Out, In...>(QSharedPointer<Private::Executor<Out, In ...>>::create(
0077                 std::move(helper), nullptr, Private::ExecutionFlag::GoodCase));
0078 }
0079 
0080 } // namespace Private
0081 //@endcond
0082 
0083 
0084 /**
0085  * @relates Job
0086  *
0087  * Start an asynchronous job sequence.
0088  *
0089  * start() is your starting point to build a chain of jobs to be executed
0090  * asynchronously.
0091  *
0092  * @param func A continuation to be executed.
0093  */
0094 
0095 ///Sync continuation without job: [] () -> T { ... }
     ///
     ///Participates in overload resolution only when the result of calling @p func with the
     ///In arguments does not derive from JobBase. The callable is wrapped in a
     ///SyncContinuation<Out, In ...> and passed on to Private::startImpl().
     ///NOTE(review): the declared return type is built from the callable's result type while the
     ///body produces a Job<Out, In ...>; presumably Out must be spelled out whenever @p func
     ///returns non-void - confirm against callers.
0096 template<typename Out = void, typename ... In, typename F>
0097 auto start(F &&func) -> std::enable_if_t<!std::is_base_of<JobBase, decltype(func(std::declval<In>() ...))>::value,
0098                                          Job<decltype(func(std::declval<In>() ...)), In...>>
0099 {
0100     static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed.");
0101     return Private::startImpl<Out, In...>(Private::ContinuationHolder<Out, In ...>(SyncContinuation<Out, In ...>(std::forward<F>(func))));
0102 }
0103 
0104 ///continuation with job: [] () -> KAsync::Job<...> { ... }
     ///
     ///Participates in overload resolution only when the result of calling @p func with the
     ///In arguments derives from JobBase. The callable is wrapped in a
     ///JobContinuation<Out, In ...> and passed on to Private::startImpl().
     ///NOTE(review): the declared return type uses the returned job's OutType while the body
     ///produces a Job<Out, In ...>; presumably Out must match that OutType - confirm against callers.
0105 template<typename Out = void, typename ... In, typename F>
0106 auto start(F &&func) -> std::enable_if_t<std::is_base_of<JobBase, decltype(func(std::declval<In>() ...))>::value,
0107                                          Job<typename decltype(func(std::declval<In>() ...))::OutType, In...>>
0108 {
0109     static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed.");
0110     return Private::startImpl<Out, In...>(Private::ContinuationHolder<Out, In ...>(JobContinuation<Out, In...>(std::forward<F>(func))));
0111 }
0112 
0113 ///Handle continuation: [] (KAsync::Future<T>, ...) { ... }
     ///
     ///Takes an already constructed AsyncContinuation<Out, In ...>, moves it into a
     ///ContinuationHolder and passes that on to Private::startImpl().
     ///At most one input type is accepted (enforced by the static_assert).
0114 template<typename Out = void, typename ... In>
0115 auto start(AsyncContinuation<Out, In ...> &&func) -> Job<Out, In ...>
0116 {
0117     static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed.");
     // func is a plain (non-deduced) rvalue reference, so std::move is the matching cast.
0118     return Private::startImpl<Out, In...>(Private::ContinuationHolder<Out, In ...>(std::move(func)));
0119 }
0120 
0121 enum ControlFlowFlag { // result type of a doWhile() loop body
0122     Break,             // the only value other than Continue, i.e. the loop does not continue
0123     Continue           // doWhile() keeps looping while the body returns this
0124 };
0125 
0126 /**
0127  * @relates Job
0128  *
0129  * Async while loop.
0130  *
0131  * Loop continues while body returns ControlFlowFlag::Continue.
      *
      * @param body Job executed as the loop body; its ControlFlowFlag result
      *             decides whether the loop continues.
0132  */
0133 KASYNC_EXPORT Job<void> doWhile(const Job<ControlFlowFlag> &body);
0134 
0135 /**
0136  * @relates Job
0137  *
0138  * Async while loop.
0139  *
0140  * Shorthand that takes a continuation.
      *
      * @param body Continuation used as the loop body instead of a ready-made Job.
0141  *
0142  * @see doWhile
0143  */
0144 KASYNC_EXPORT Job<void> doWhile(const JobContinuation<ControlFlowFlag> &body);
0145 
0146 
0147 
0148 /**
0149  * @relates Job
0150  *
0151  * Async delay.
      *
      * @param delay Length of the delay. NOTE(review): the unit is not visible
      *              in this header (presumably milliseconds) - confirm.
0152  */
0153 KASYNC_EXPORT Job<void> wait(int delay);
0154 
0155 /**
0156  * @relates Job
0157  *
0158  * A null job.
0159  *
0160  * An async noop.
0161  *
      * @tparam Out Output type of the returned job (defaults to void).
0162  */
0163 template<typename Out = void>
0164 Job<Out> null();
0165 
0166 /**
0167  * @relates Job
0168  *
0169  * Async value.
      *
      * Takes a value of type Out (by value) and returns a Job<Out>; presumably the
      * job yields that value when executed - confirm against the implementation.
0170  */
0171 template<typename Out>
0172 Job<Out> value(Out);
0173 
0174 /**
0175  * @relates Job
0176  *
0177  * Async foreach loop.
0178  *
0179  * This will execute a job for every value in the list.
0180  * Errors will not stop processing of other jobs but set an error on the wrapper job.
      *
      * @tparam List      Container type the returned job takes as input.
      * @tparam ValueType Element type handed to @p job (defaults to List::value_type).
      * @param job Job to run for each element.
0181  */
0182 template<typename List, typename ValueType = typename List::value_type>
0183 Job<void, List> forEach(KAsync::Job<void, ValueType> job);
0184 
0185 /**
0186  * @relates Job
0187  *
0188  * Async foreach loop.
0189  *
0190  * Shorthand that takes a continuation.
0191  *
0192  * @see forEach
0193  */
0194 template<typename List, typename ValueType = typename List::value_type>
0195  Job<void, List> forEach(JobContinuation<void, ValueType> &&);
0196 
0197 
0198 /**
0199  * @relates Job
0200  *
0201  * Serial Async foreach loop.
0202  *
0203  * This will execute a job for every value in the list sequentially.
0204  * Errors will not stop processing of other jobs but set an error on the wrapper job.
      *
      * @tparam List      Container type the returned job takes as input.
      * @tparam ValueType Element type handed to @p job (defaults to List::value_type).
      * @param job Job to run for each element.
0205  */
0206 template<typename List, typename ValueType = typename List::value_type>
0207 Job<void, List> serialForEach(KAsync::Job<void, ValueType> job);
0208 
0209 /**
0210  * @relates Job
0211  *
0212  * Serial Async foreach loop.
0213  *
0214  * Shorthand that takes a continuation.
0215  *
0216  * @see serialForEach
0217  */
0218 template<typename List, typename ValueType = typename List::value_type>
0219 Job<void, List> serialForEach(JobContinuation<void, ValueType> &&);
0220 
0221 /**
0222  * @brief Wait until all given futures are completed.
      *
      * @tparam Container Single-parameter container template holding the futures.
      * @param futures Container of KAsync::Future<void>; note that it is taken by
      *                non-const reference.
0223  */
0224 template<template<typename> class Container>
0225 Job<void> waitForCompletion(Container<KAsync::Future<void>> &futures);
0226 
0227 /**
0228  * @relates Job
0229  *
0230  * An error job.
0231  *
0232  * An async error.
0233  *
      * @param errorCode    Error code for the error job (defaults to 1).
      * @param errorMessage Error message for the error job (defaults to an empty QString).
0234  */
0235 template<typename Out = void>
0236 Job<Out> error(int errorCode = 1, const QString &errorMessage = QString());
0237 
0238 /**
0239  * @relates Job
0240  *
0241  * An error job.
0242  *
0243  * An async error.
0244  *
      * Overload taking a C string; presumably the error message - confirm against
      * the implementation.
0245  */
0246 template<typename Out = void>
0247 Job<Out> error(const char *);
0248 
0249 /**
0250  * @relates Job
0251  *
0252  * An error job.
0253  *
0254  * An async error.
0255  *
      * Overload taking an existing KAsync Error object.
0256  */
0257 template<typename Out = void>
0258 Job<Out> error(const Error &);
0259 
0260 //@cond PRIVATE
     // Non-template base class of every Job<Out, In ...>. It only stores the
     // executor pointer; start() and Job::then() use std::is_base_of<JobBase, T>
     // to tell job-returning callables apart from plain ones.
0261 class KASYNC_EXPORT JobBase
0262 {
     // Every Job instantiation is a friend, regardless of its template arguments.
0263     template<typename Out, typename ... In>
0264     friend class Job;
0265 
0266 public:
     // Stores a copy of the given executor pointer.
0267     explicit JobBase(const Private::ExecutorBasePtr &executor)
0268         : mExecutor(executor)
0269     {}
0270 
0271     virtual ~JobBase() = default;
0272 
0273 protected:
     // Executor used by this job (see Job::addToContext() and Job::guard()).
0274     Private::ExecutorBasePtr mExecutor;
0275 };
0276 //@endcond
0277 
0278 /**
0279  * @brief An Asynchronous job
0280  *
0281  * A single instance of Job represents a single method that will be executed
0282  * asynchronously. The Job is started by exec(), which returns Future
0283  * immediately. The Future will be set to finished state once the asynchronous
0284  * task has finished. You can use Future::waitForFinished() to wait
0285  * for the Future in blocking manner.
0286  *
0287  * It is possible to chain multiple Jobs one after another in different fashion
0288  * (sequential, parallel, etc.). Calling exec() will then return a pending
0289  * Future, and will execute the entire chain of jobs.
0290  *
0291  * @code
0292  * auto job = KAsync::start<QList<int>>(
0293  *     [](KAsync::Future<QList<int>> &future) {
0294  *         MyREST::PendingUsers *pu = MyREST::requestListOfUsers();
0295  *         QObject::connect(pu, &PendingOperation::finished,
0296  *                          [&](PendingOperation *pu) {
0297  *                              future->setValue(dynamic_cast<MyREST::PendingUsers*>(pu)->userIds());
0298  *                              future->setFinished();
0299  *                          });
0300  *      })
0301  * .each<QList<MyREST::User>, int>(
0302  *      [](const int &userId, KAsync::Future<QList<MyREST::User>> &future) {
0303  *          MyREST::PendingUser *pu = MyREST::requestUserDetails(userId);
0304  *          QObject::connect(pu, &PendingOperation::finished,
0305  *                           [&](PendingOperation *pu) {
0306  *                              future->setValue(QList<MyREST::User>() << dynamic_cast<MyREST::PendingUser*>(pu)->user());
0307  *                              future->setFinished();
0308  *                           });
0309  *      });
0310  *
0311  * KAsync::Future<QList<MyREST::User>> usersFuture = job.exec();
0312  * usersFuture.waitForFinished();
0313  * QList<MyREST::User> users = usersFuture.value();
0314  * @endcode
0315  *
0316  * In the example above, calling @p job.exec() will first invoke the first job,
0317  * which will retrieve a list of IDs and then will invoke the second function
0318  * for each single entry in the list returned by the first function.
      *
      * NOTE(review): the snippet is schematic; for instance each() as declared in
      * this class takes a JobContinuation<void, ValueType>, not a two-argument
      * handler.
0319  */
0320 template<typename Out, typename ... In>
0321 class [[nodiscard]] Job : public JobBase
0322 {
0323     //@cond PRIVATE
0324     template<typename OutOther, typename ... InOther>
0325     friend class Job;
0326 
0327     template<typename OutOther, typename ... InOther>
0328     friend Job<OutOther, InOther ...> Private::startImpl(Private::ContinuationHolder<OutOther, InOther ...> &&);
0329 
0330     template<typename List, typename ValueType>
0331     friend  Job<void, List> forEach(KAsync::Job<void, ValueType> job);
0332 
0333     template<typename List, typename ValueType>
0334     friend Job<void, List> serialForEach(KAsync::Job<void, ValueType> job);
0335 
0336     // Used to disable implicit conversion of Job<void> to Job<void> which triggers
0337     // compiler warning.
0338     struct IncompleteType;
0339     //@endcond
0340 
0341 public:
0342     using OutType = Out;
0343 
0344     ///A continuation: chains another job after this one
0345     template<typename OutOther, typename ... InOther>
0346     Job<OutOther, In ...> then(const Job<OutOther, InOther ...> &job) const;
0347 
0348     ///Shorthands for a job that returns another job from its continuation
0349     //
0350     //OutOther and InOther are only there for backwards compatibility, but are otherwise ignored.
0351     //It should never be necessary to specify any template arguments, as they are automatically deduced from the provided argument.
0352     //
0353     //We currently have to write a then overload for:
0354     //* One argument in the continuation
0355     //* No argument in the continuation
0356     //* One argument + error in the continuation
0357     //* No argument + error in the continuation
0358     //This is due to how we extract the return type with "decltype(func(std::declval<Out>()))".
0359     //Ideally we could conflate this into at least fewer overloads, but I didn't manage so far and this at least works as expected.
0360 
0361     ///Continuation returning job: [] (Arg) -> KAsync::Job<...> { ... }
0362     template<typename OutOther = void, typename ... InOther, typename F>
0363     auto then(F &&func) const -> std::enable_if_t<std::is_base_of<JobBase, decltype(func(std::declval<Out>()))>::value,
0364                                                   Job<typename decltype(func(std::declval<Out>()))::OutType, In...>>
0365     {
0366         using ResultJob = decltype(func(std::declval<Out>())); //Job<QString, int>
0367         return thenImpl<typename ResultJob::OutType, Out>(
0368                 {JobContinuation<typename ResultJob::OutType, Out>(std::forward<F>(func))}, Private::ExecutionFlag::GoodCase);
0369     }
0370 
0371     ///Void continuation with job: [] () -> KAsync::Job<...> { ... }
0372     template<typename OutOther = void, typename ... InOther, typename F>
0373     auto then(F &&func) const -> std::enable_if_t<std::is_base_of<JobBase, decltype(func())>::value,
0374                                                   Job<typename decltype(func())::OutType, In...>>
0375     {
0376         using ResultJob = decltype(func()); //Job<QString, void>
0377         return thenImpl<typename ResultJob::OutType>(
0378                 {JobContinuation<typename ResultJob::OutType>(std::forward<F>(func))}, Private::ExecutionFlag::GoodCase);
0379     }
0380 
0381     ///Error continuation returning job: [] (KAsync::Error, Arg) -> KAsync::Job<...> { ... }
0382     template<typename OutOther = void, typename ... InOther, typename F>
0383     auto then(F &&func) const -> std::enable_if_t<std::is_base_of<JobBase, decltype(func(KAsync::Error{}, std::declval<Out>()))>::value,
0384                                                   Job<typename decltype(func(KAsync::Error{}, std::declval<Out>()))::OutType, In...>>
0385     {
0386         using ResultJob = decltype(func(KAsync::Error{}, std::declval<Out>())); //Job<QString, int>
0387         return thenImpl<typename ResultJob::OutType, Out>(
0388                 {JobErrorContinuation<typename ResultJob::OutType, Out>(std::forward<F>(func))}, Private::ExecutionFlag::Always);
0389     }
0390 
0391     ///Error void continuation returning job: [] (KAsync::Error) -> KAsync::Job<...> { ... }
0392     template<typename OutOther = void, typename ... InOther, typename F>
0393     auto then(F &&func) const -> std::enable_if_t<std::is_base_of<JobBase, decltype(func(KAsync::Error{}))>::value,
0394                                                   Job<typename decltype(func(KAsync::Error{}))::OutType, In...>>
0395     {
0396         using ResultJob = decltype(func(KAsync::Error{}));
0397         return thenImpl<typename ResultJob::OutType>(
0398                 {JobErrorContinuation<typename ResultJob::OutType>(std::forward<F>(func))}, Private::ExecutionFlag::Always);
0399     }
0400 
0401     ///Sync continuation: [] (Arg) -> void { ... }
0402     template<typename OutOther = void, typename ... InOther, typename F>
0403     auto then(F &&func) const -> std::enable_if_t<!std::is_base_of<JobBase, decltype(func(std::declval<Out>()))>::value,
0404                                                   Job<decltype(func(std::declval<Out>())), In...>>
0405     {
0406         using ResultType = decltype(func(std::declval<Out>())); //QString
0407         return thenImpl<ResultType, Out>(
0408                 {SyncContinuation<ResultType, Out>(std::forward<F>(func))}, Private::ExecutionFlag::GoodCase);
0409     }
0410 
0411     ///Sync void continuation: [] () -> void { ... }
0412     template<typename OutOther = void, typename ... InOther, typename F>
0413     auto then(F &&func) const -> std::enable_if_t<!std::is_base_of<JobBase, decltype(func())>::value,
0414                                                   Job<decltype(func()), In...>>
0415     {
0416         using ResultType = decltype(func()); //QString
0417         return thenImpl<ResultType>(
0418                 {SyncContinuation<ResultType>(std::forward<F>(func))}, Private::ExecutionFlag::GoodCase);
0419     }
0420 
0421     ///Sync error continuation: [] (KAsync::Error, Arg) -> void { ... }
0422     template<typename OutOther = void, typename ... InOther, typename F>
0423     auto then(F &&func) const -> std::enable_if_t<!std::is_base_of<JobBase, decltype(func(KAsync::Error{}, std::declval<Out>()))>::value,
0424                                                   Job<decltype(func(KAsync::Error{}, std::declval<Out>())),In...>>
0425     {
0426         using ResultType = decltype(func(KAsync::Error{}, std::declval<Out>())); //QString
0427         return thenImpl<ResultType, Out>(
0428                 {SyncErrorContinuation<ResultType, Out>(std::forward<F>(func))}, Private::ExecutionFlag::Always);
0429     }
0430 
0431     ///Sync void error continuation: [] (KAsync::Error) -> void { ... }
0432     template<typename OutOther = void, typename ... InOther, typename F>
0433     auto then(F &&func) const -> std::enable_if_t<!std::is_base_of<JobBase, decltype(func(KAsync::Error{}))>::value,
0434                                                   Job<decltype(func(KAsync::Error{})), In...>>
0435     {
0436         using ResultType = decltype(func(KAsync::Error{}));
0437         return thenImpl<ResultType>(
0438                 {SyncErrorContinuation<ResultType>(std::forward<F>(func))}, Private::ExecutionFlag::Always);
0439     }
0440 
0441     ///Shorthand for a job that receives the error and a handle
0442     template<typename OutOther, typename ... InOther>
0443     Job<OutOther, In ...> then(AsyncContinuation<OutOther, InOther ...> &&func) const
0444     {
         // func is a non-deduced rvalue reference: move it into the holder.
0445         return thenImpl<OutOther, InOther ...>({std::move(func)},
0446                                                Private::ExecutionFlag::GoodCase);
0447     }
0448 
0449     ///Shorthand for a job that receives the error and a handle
0450     template<typename OutOther, typename ... InOther>
0451     Job<OutOther, In ...> then(AsyncErrorContinuation<OutOther, InOther ...> &&func) const
0452     {
         // func is a non-deduced rvalue reference: move it into the holder.
0453         return thenImpl<OutOther, InOther ...>({std::move(func)}, Private::ExecutionFlag::Always);
0454     }
0455 
0456     ///Shorthand for a job that receives the error only
0457     Job<Out, In ...> onError(SyncErrorContinuation<void> &&errorFunc) const;
0458 
0459     /**
0460      * Shorthand for a forEach loop that automatically uses the return type of
0461      * this job to deduce the type expected.
0462      */
0463     template<typename OutOther = void, typename ListType = Out, typename ValueType = typename ListType::value_type, std::enable_if_t<!std::is_void<ListType>::value, int> = 0>
0464     Job<void, In ...> each(JobContinuation<void, ValueType> &&func) const
0465     {
0466         eachInvariants<OutOther>();
         // The chained loop job is a Job<void, Out>, so its input type is Out.
         // Passing this job's own In... instead only deduces when In is empty
         // or happens to equal Out.
0467         return then<void, Out>(forEach<Out, ValueType>(std::move(func)));
0468     }
0469 
0470     /**
0471      * Shorthand for a serialForEach loop that automatically uses the return type
0472      * of this job to deduce the type expected.
0473      */
0474     template<typename OutOther = void, typename ListType = Out, typename ValueType = typename ListType::value_type, std::enable_if_t<!std::is_void<ListType>::value, int> = 0>
0475     Job<void, In ...> serialEach(JobContinuation<void, ValueType> &&func) const
0476     {
0477         eachInvariants<OutOther>();
         // Same reasoning as in each(): the loop job's input type is Out.
0478         return then<void, Out>(serialForEach<Out, ValueType>(std::move(func)));
0479     }
0480 
0481     /**
0482      * Enable implicit conversion to Job<void>.
0483      *
0484      * This is necessary in assignments that only use the return value (which is the normal case).
0485      * This avoids constructs like:
0486      * auto job = KAsync::start<int>( ... )
0487      *  .then<void, int>( ... )
0488      *  .then<void>([](){}); //Necessary for the assignment without the implicit conversion
0489      */
0490     template<typename ... InOther>
0491     operator std::conditional_t<std::is_void<OutType>::value, IncompleteType, Job<void>>();
0492 
0493     /**
0494      * Adds an unnamed value to the context.
0495      * The context is guaranteed to persist until the jobs execution has finished.
0496      *
0497      * Useful for setting smart pointer to manage lifetime of objects required
0498      * during the execution of the job.
0499      */
0500     template<typename T>
0501     Job<Out, In ...> &addToContext(const T &value)
0502     {
0503         assert(mExecutor);
0504         mExecutor->addToContext(QVariant::fromValue<T>(value));
0505         return *this;
0506     }
0507 
0508     /**
0509      * Adds a guard.
0510      * It is guaranteed that no callback is executed after the guard vanishes.
0511      *
0512      * Use this e.g. to ensure you don't call back into an already destroyed object.
0513      */
0514     Job<Out, In ...> &guard(const QObject *o)
0515     {
0516         assert(mExecutor);
0517         mExecutor->guard(o);
0518         return *this;
0519     }
0520 
0521     /**
0522      * @brief Starts execution of the job chain.
0523      *
0524      * This will start the execution of the task chain, starting from the
0525      * first one. It is possible to call this function multiple times, each
0526      * invocation will start a new processing and provide a new Future to
0527      * watch its status.
0528      *
0529      * @param in Argument to be passed to the very first task
0530      * @return Future&lt;Out&gt; object which will contain result of the last
0531      * task once it finishes executing. See Future documentation for more details.
0532      *
0533      * @see exec(), Future
0534      */
0535     template<typename FirstIn>
0536     KAsync::Future<Out> exec(FirstIn in);
0537 
0538     /**
0539      * @brief Starts execution of the job chain.
0540      *
0541      * This will start the execution of the task chain, starting from the
0542      * first one. It is possible to call this function multiple times, each
0543      * invocation will start a new processing and provide a new Future to
0544      * watch its status.
0545      *
0546      * @return Future&lt;Out&gt; object which will contain result of the last
0547      * task once it finishes executing. See Future documentation for more details.
0548      *
0549      * @see exec(FirstIn in), Future
0550      */
0551     KAsync::Future<Out> exec();
0552 
0553     explicit Job(JobContinuation<Out, In ...> &&func);
0554     explicit Job(AsyncContinuation<Out, In ...> &&func);
0555 
0556 private:
0557     //@cond PRIVATE
0558     explicit Job(Private::ExecutorBasePtr executor);
0559 
0560     template<typename OutOther, typename ... InOther>
0561     Job<OutOther, In ...> thenImpl(Private::ContinuationHolder<OutOther, InOther ...> helper,
0562                                    Private::ExecutionFlag execFlag = Private::ExecutionFlag::GoodCase) const;
0563 
0564     template<typename InOther, typename ... InOtherTail>
0565     void thenInvariants() const;
0566 
0567     //Base case for an empty parameter pack
0568     template<typename ... InOther>
0569     auto thenInvariants() const -> std::enable_if_t<(sizeof...(InOther) == 0)>;
0570 
0571     template<typename OutOther>
0572     void eachInvariants() const;
0573     //@endcond
0574 };
0575 
0576 } // namespace KAsync
0577 
0578 
0579 // out-of-line definitions of Job methods
0580 #include "job_impl.h"
0581 
0582 #endif // KASYNC_H