File indexing completed on 2024-12-08 12:44:37
0001 /* 0002 SPDX-FileCopyrightText: 2014-2015 Daniel Vrátil <dvratil@redhat.com> 0003 SPDX-FileCopyrightText: 2016 Daniel Vrátil <dvratil@kde.org> 0004 SPDX-FileCopyrightText: 2016 Christian Mollekopf <mollekopf@kolabsystems.com> 0005 0006 SPDX-License-Identifier: LGPL-2.0-or-later 0007 */ 0008 0009 #ifndef KASYNC_H 0010 #define KASYNC_H 0011 0012 #include "kasync_export.h" 0013 0014 #include <functional> 0015 #include <type_traits> 0016 #include <cassert> 0017 0018 #include <QVariant> 0019 0020 #include "future.h" 0021 #include "debug.h" 0022 0023 #include "continuations_p.h" 0024 #include "executor_p.h" 0025 0026 class QObject; 0027 0028 /** 0029 * @mainpage KAsync 0030 * 0031 * @brief API to help write async code. 0032 * 0033 * This API is based around jobs that take lambdas to execute asynchronous tasks. 0034 * Each async operation can take a continuation that can then be used to execute 0035 * further async operations. That way it is possible to build async chains of 0036 * operations that can be stored and executed later on. Jobs can be composed, 0037 * similarly to functions. 0038 * 0039 * Relations between the components: 0040 * * Job: API wrapper around Executors chain. Can be destroyed while still running, 0041 * because the actual execution happens in the background 0042 * * Executor: Describes task to execute. Executors form a linked list matching the 0043 * order in which they will be executed. The Executor chain is destroyed when 0044 * the parent Job is destroyed. However if the Job is still running it is 0045 * guaranteed that the Executor chain will not be destroyed until the execution 0046 * is finished. 0047 * * Execution: The running execution of the task stored in Executor. Each call to 0048 * Job::exec() instantiates new Execution chain, which makes it possible for 0049 * the Job to be executed multiple times (even in parallel). 0050 * * Future: Representation of the result that is being calculated 0051 * 0052 * 0053 * TODO: Possibility to abort a job through future (perhaps optional?) 0054 * TODO: Support for timeout, specified during exec call, after which the error 0055 * handler gets called with a defined errorCode. 0056 */ 0057 0058 0059 namespace KAsync { 0060 0061 template<typename PrevOut, typename Out, typename ... In> 0062 class Executor; 0063 0064 class JobBase; 0065 0066 template<typename Out, typename ... In> 0067 class Job; 0068 0069 //@cond PRIVATE 0070 namespace Private { 0071 0072 template<typename Out, typename ... In> 0073 Job<Out, In ...> startImpl(Private::ContinuationHolder<Out, In ...> &&helper) 0074 { 0075 static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed."); 0076 return Job<Out, In...>(QSharedPointer<Private::Executor<Out, In ...>>::create( 0077 std::forward<Private::ContinuationHolder<Out, In...>>(helper), nullptr, Private::ExecutionFlag::GoodCase)); 0078 } 0079 0080 } // namespace Private 0081 //@endcond 0082 0083 0084 /** 0085 * @relates Job 0086 * 0087 * Start an asynchronous job sequence. 0088 * 0089 * start() is your starting point to build a chain of jobs to be executed 0090 * asynchronously. 0091 * 0092 * @param func A continuation to be executed. 0093 */ 0094 0095 ///Sync continuation without job: [] () -> T { ... } 0096 template<typename Out = void, typename ... In, typename F> 0097 auto start(F &&func) -> std::enable_if_t<!std::is_base_of<JobBase, decltype(func(std::declval<In>() ...))>::value, 0098 Job<decltype(func(std::declval<In>() ...)), In...>> 0099 { 0100 static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed."); 0101 return Private::startImpl<Out, In...>(Private::ContinuationHolder<Out, In ...>(SyncContinuation<Out, In ...>(std::forward<F>(func)))); 0102 } 0103 0104 ///continuation with job: [] () -> KAsync::Job<...> { ... } 0105 template<typename Out = void, typename ... In, typename F> 0106 auto start(F &&func) -> std::enable_if_t<std::is_base_of<JobBase, decltype(func(std::declval<In>() ...))>::value, 0107 Job<typename decltype(func(std::declval<In>() ...))::OutType, In...>> 0108 { 0109 static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed."); 0110 return Private::startImpl<Out, In...>(Private::ContinuationHolder<Out, In ...>(JobContinuation<Out, In...>(std::forward<F>(func)))); 0111 } 0112 0113 ///Handle continuation: [] (KAsync::Future<T>, ...) { ... } 0114 template<typename Out = void, typename ... In> 0115 auto start(AsyncContinuation<Out, In ...> &&func) -> Job<Out, In ...> 0116 { 0117 static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed."); 0118 return Private::startImpl<Out, In...>(Private::ContinuationHolder<Out, In ...>(std::forward<AsyncContinuation<Out, In ...>>(func))); 0119 } 0120 0121 enum ControlFlowFlag { 0122 Break, 0123 Continue 0124 }; 0125 0126 /** 0127 * @relates Job 0128 * 0129 * Async while loop. 0130 * 0131 * Loop continues while body returns ControlFlowFlag::Continue. 0132 */ 0133 KASYNC_EXPORT Job<void> doWhile(const Job<ControlFlowFlag> &body); 0134 0135 /** 0136 * @relates Job 0137 * 0138 * Async while loop. 0139 * 0140 * Shorthand that takes a continuation. 0141 * 0142 * @see doWhile 0143 */ 0144 KASYNC_EXPORT Job<void> doWhile(const JobContinuation<ControlFlowFlag> &body); 0145 0146 0147 0148 /** 0149 * @relates Job 0150 * 0151 * Async delay. 0152 */ 0153 KASYNC_EXPORT Job<void> wait(int delay); 0154 0155 /** 0156 * @relates Job 0157 * 0158 * A null job. 0159 * 0160 * An async noop. 0161 * 0162 */ 0163 template<typename Out = void> 0164 Job<Out> null(); 0165 0166 /** 0167 * @relates Job 0168 * 0169 * Async value. 0170 */ 0171 template<typename Out> 0172 Job<Out> value(Out); 0173 0174 /** 0175 * @relates Job 0176 * 0177 * Async foreach loop. 0178 * 0179 * This will execute a job for every value in the list. 0180 * Errors while not stop processing of other jobs but set an error on the wrapper job. 0181 */ 0182 template<typename List, typename ValueType = typename List::value_type> 0183 Job<void, List> forEach(KAsync::Job<void, ValueType> job); 0184 0185 /** 0186 * @relates Job 0187 * 0188 * Async foreach loop. 0189 * 0190 * Shorthand that takes a continuation. 0191 * 0192 * @see serialForEach 0193 */ 0194 template<typename List, typename ValueType = typename List::value_type> 0195 Job<void, List> forEach(JobContinuation<void, ValueType> &&); 0196 0197 0198 /** 0199 * @relates Job 0200 * 0201 * Serial Async foreach loop. 0202 * 0203 * This will execute a job for every value in the list sequentially. 0204 * Errors while not stop processing of other jobs but set an error on the wrapper job. 0205 */ 0206 template<typename List, typename ValueType = typename List::value_type> 0207 Job<void, List> serialForEach(KAsync::Job<void, ValueType> job); 0208 0209 /** 0210 * @relates Job 0211 * 0212 * Serial Async foreach loop. 0213 * 0214 * Shorthand that takes a continuation. 0215 * 0216 * @see serialForEach 0217 */ 0218 template<typename List, typename ValueType = typename List::value_type> 0219 Job<void, List> serialForEach(JobContinuation<void, ValueType> &&); 0220 0221 /** 0222 * @brief Wait until all given futures are completed. 0223 */ 0224 template<template<typename> class Container> 0225 Job<void> waitForCompletion(Container<KAsync::Future<void>> &futures); 0226 0227 /** 0228 * @relates Job 0229 * 0230 * An error job. 0231 * 0232 * An async error. 0233 * 0234 */ 0235 template<typename Out = void> 0236 Job<Out> error(int errorCode = 1, const QString &errorMessage = QString()); 0237 0238 /** 0239 * @relates Job 0240 * 0241 * An error job. 0242 * 0243 * An async error. 0244 * 0245 */ 0246 template<typename Out = void> 0247 Job<Out> error(const char *); 0248 0249 /** 0250 * @relates Job 0251 * 0252 * An error job. 0253 * 0254 * An async error. 0255 * 0256 */ 0257 template<typename Out = void> 0258 Job<Out> error(const Error &); 0259 0260 //@cond PRIVATE 0261 class KASYNC_EXPORT JobBase 0262 { 0263 template<typename Out, typename ... In> 0264 friend class Job; 0265 0266 public: 0267 explicit JobBase(const Private::ExecutorBasePtr &executor) 0268 : mExecutor(executor) 0269 {} 0270 0271 virtual ~JobBase() = default; 0272 0273 protected: 0274 Private::ExecutorBasePtr mExecutor; 0275 }; 0276 //@endcond 0277 0278 /** 0279 * @brief An Asynchronous job 0280 * 0281 * A single instance of Job represents a single method that will be executed 0282 * asynchronously. The Job is started by exec(), which returns Future 0283 * immediately. The Future will be set to finished state once the asynchronous 0284 * task has finished. You can use Future::waitForFinished() to wait for 0285 * for the Future in blocking manner. 0286 * 0287 * It is possible to chain multiple Jobs one after another in different fashion 0288 * (sequential, parallel, etc.). Calling exec() will then return a pending 0289 * Future, and will execute the entire chain of jobs. 0290 * 0291 * @code 0292 * auto job = Job::start<QList<int>>( 0293 * [](KAsync::Future<QList<int>> &future) { 0294 * MyREST::PendingUsers *pu = MyREST::requestListOfUsers(); 0295 * QObject::connect(pu, &PendingOperation::finished, 0296 * [&](PendingOperation *pu) { 0297 * future->setValue(dynamic_cast<MyREST::PendingUsers*>(pu)->userIds()); 0298 * future->setFinished(); 0299 * }); 0300 * }) 0301 * .each<QList<MyREST::User>, int>( 0302 * [](const int &userId, KAsync::Future<QList<MyREST::User>> &future) { 0303 * MyREST::PendingUser *pu = MyREST::requestUserDetails(userId); 0304 * QObject::connect(pu, &PendingOperation::finished, 0305 * [&](PendingOperation *pu) { 0306 * future->setValue(Qlist<MyREST::User>() << dynamic_cast<MyREST::PendingUser*>(pu)->user()); 0307 * future->setFinished(); 0308 * }); 0309 * }); 0310 * 0311 * KAsync::Future<QList<MyREST::User>> usersFuture = job.exec(); 0312 * usersFuture.waitForFinished(); 0313 * QList<MyRest::User> users = usersFuture.value(); 0314 * @endcode 0315 * 0316 * In the example above, calling @p job.exec() will first invoke the first job, 0317 * which will retrieve a list of IDs and then will invoke the second function 0318 * for each single entry in the list returned by the first function. 0319 */ 0320 template<typename Out, typename ... In> 0321 class [[nodiscard]] Job : public JobBase 0322 { 0323 //@cond PRIVATE 0324 template<typename OutOther, typename ... InOther> 0325 friend class Job; 0326 0327 template<typename OutOther, typename ... InOther> 0328 friend Job<OutOther, InOther ...> Private::startImpl(Private::ContinuationHolder<OutOther, InOther ...> &&); 0329 0330 template<typename List, typename ValueType> 0331 friend Job<void, List> forEach(KAsync::Job<void, ValueType> job); 0332 0333 template<typename List, typename ValueType> 0334 friend Job<void, List> serialForEach(KAsync::Job<void, ValueType> job); 0335 0336 // Used to disable implicit conversion of Job<void to Job<void> which triggers 0337 // comiler warning. 0338 struct IncompleteType; 0339 //@endcond 0340 0341 public: 0342 typedef Out OutType; 0343 0344 ///A continuation 0345 template<typename OutOther, typename ... InOther> 0346 Job<OutOther, In ...> then(const Job<OutOther, InOther ...> &job) const; 0347 0348 ///Shorthands for a job that returns another job from it's continuation 0349 // 0350 //OutOther and InOther are only there fore backwards compatibility, but are otherwise ignored. 0351 //It should never be necessary to specify any template arguments, as they are automatically deduced from the provided argument. 0352 // 0353 //We currently have to write a then overload for: 0354 //* One argument in the continuation 0355 //* No argument in the continuation 0356 //* One argument + error in the continuation 0357 //* No argument + error in the continuation 0358 //This is due to how we extract the return type with "decltype(func(std::declval<Out>()))". 0359 //Ideally we could conflate this into at least fewer overloads, but I didn't manage so far and this at least works as expected. 0360 0361 ///Continuation returning job: [] (Arg) -> KAsync::Job<...> { ... } 0362 template<typename OutOther = void, typename ... InOther, typename F> 0363 auto then(F &&func) const -> std::enable_if_t<std::is_base_of<JobBase, decltype(func(std::declval<Out>()))>::value, 0364 Job<typename decltype(func(std::declval<Out>()))::OutType, In...>> 0365 { 0366 using ResultJob = decltype(func(std::declval<Out>())); //Job<QString, int> 0367 return thenImpl<typename ResultJob::OutType, Out>( 0368 {JobContinuation<typename ResultJob::OutType, Out>(std::forward<F>(func))}, Private::ExecutionFlag::GoodCase); 0369 } 0370 0371 ///Void continuation with job: [] () -> KAsync::Job<...> { ... } 0372 template<typename OutOther = void, typename ... InOther, typename F> 0373 auto then(F &&func) const -> std::enable_if_t<std::is_base_of<JobBase, decltype(func())>::value, 0374 Job<typename decltype(func())::OutType, In...>> 0375 { 0376 using ResultJob = decltype(func()); //Job<QString, void> 0377 return thenImpl<typename ResultJob::OutType>( 0378 {JobContinuation<typename ResultJob::OutType>(std::forward<F>(func))}, Private::ExecutionFlag::GoodCase); 0379 } 0380 0381 ///Error continuation returning job: [] (KAsync::Error, Arg) -> KAsync::Job<...> { ... } 0382 template<typename OutOther = void, typename ... InOther, typename F> 0383 auto then(F &&func) const -> std::enable_if_t<std::is_base_of<JobBase, decltype(func(KAsync::Error{}, std::declval<Out>()))>::value, 0384 Job<typename decltype(func(KAsync::Error{}, std::declval<Out>()))::OutType, In...>> 0385 { 0386 using ResultJob = decltype(func(KAsync::Error{}, std::declval<Out>())); //Job<QString, int> 0387 return thenImpl<typename ResultJob::OutType, Out>( 0388 {JobErrorContinuation<typename ResultJob::OutType, Out>(std::forward<F>(func))}, Private::ExecutionFlag::Always); 0389 } 0390 0391 ///Error void continuation returning job: [] (KAsync::Error) -> KAsync::Job<...> { ... } 0392 template<typename OutOther = void, typename ... InOther, typename F> 0393 auto then(F &&func) const -> std::enable_if_t<std::is_base_of<JobBase, decltype(func(KAsync::Error{}))>::value, 0394 Job<typename decltype(func(KAsync::Error{}))::OutType, In...>> 0395 { 0396 using ResultJob = decltype(func(KAsync::Error{})); 0397 return thenImpl<typename ResultJob::OutType>( 0398 {JobErrorContinuation<typename ResultJob::OutType>(std::forward<F>(func))}, Private::ExecutionFlag::Always); 0399 } 0400 0401 ///Sync continuation: [] (Arg) -> void { ... } 0402 template<typename OutOther = void, typename ... InOther, typename F> 0403 auto then(F &&func) const -> std::enable_if_t<!std::is_base_of<JobBase, decltype(func(std::declval<Out>()))>::value, 0404 Job<decltype(func(std::declval<Out>())), In...>> 0405 { 0406 using ResultType = decltype(func(std::declval<Out>())); //QString 0407 return thenImpl<ResultType, Out>( 0408 {SyncContinuation<ResultType, Out>(std::forward<F>(func))}, Private::ExecutionFlag::GoodCase); 0409 } 0410 0411 ///Sync void continuation: [] () -> void { ... } 0412 template<typename OutOther = void, typename ... InOther, typename F> 0413 auto then(F &&func) const -> std::enable_if_t<!std::is_base_of<JobBase, decltype(func())>::value, 0414 Job<decltype(func()), In...>> 0415 { 0416 using ResultType = decltype(func()); //QString 0417 return thenImpl<ResultType>( 0418 {SyncContinuation<ResultType>(std::forward<F>(func))}, Private::ExecutionFlag::GoodCase); 0419 } 0420 0421 ///Sync error continuation: [] (KAsync::Error, Arg) -> void { ... } 0422 template<typename OutOther = void, typename ... InOther, typename F> 0423 auto then(F &&func) const -> std::enable_if_t<!std::is_base_of<JobBase, decltype(func(KAsync::Error{}, std::declval<Out>()))>::value, 0424 Job<decltype(func(KAsync::Error{}, std::declval<Out>())),In...>> 0425 { 0426 using ResultType = decltype(func(KAsync::Error{}, std::declval<Out>())); //QString 0427 return thenImpl<ResultType, Out>( 0428 {SyncErrorContinuation<ResultType, Out>(std::forward<F>(func))}, Private::ExecutionFlag::Always); 0429 } 0430 0431 ///Sync void error continuation: [] (KAsync::Error) -> void { ... } 0432 template<typename OutOther = void, typename ... InOther, typename F> 0433 auto then(F &&func) const -> std::enable_if_t<!std::is_base_of<JobBase, decltype(func(KAsync::Error{}))>::value, 0434 Job<decltype(func(KAsync::Error{})), In...>> 0435 { 0436 using ResultType = decltype(func(KAsync::Error{})); 0437 return thenImpl<ResultType>( 0438 {SyncErrorContinuation<ResultType>(std::forward<F>(func))}, Private::ExecutionFlag::Always); 0439 } 0440 0441 ///Shorthand for a job that receives the error and a handle 0442 template<typename OutOther, typename ... InOther> 0443 Job<OutOther, In ...> then(AsyncContinuation<OutOther, InOther ...> &&func) const 0444 { 0445 return thenImpl<OutOther, InOther ...>({std::forward<AsyncContinuation<OutOther, InOther ...>>(func)}, 0446 Private::ExecutionFlag::GoodCase); 0447 } 0448 0449 ///Shorthand for a job that receives the error and a handle 0450 template<typename OutOther, typename ... InOther> 0451 Job<OutOther, In ...> then(AsyncErrorContinuation<OutOther, InOther ...> &&func) const 0452 { 0453 return thenImpl<OutOther, InOther ...>({std::forward<AsyncErrorContinuation<OutOther, InOther ...>>(func)}, Private::ExecutionFlag::Always); 0454 } 0455 0456 ///Shorthand for a job that receives the error only 0457 Job<Out, In ...> onError(SyncErrorContinuation<void> &&errorFunc) const; 0458 0459 /** 0460 * Shorthand for a forEach loop that automatically uses the return type of 0461 * this job to deduce the type expected. 0462 */ 0463 template<typename OutOther = void, typename ListType = Out, typename ValueType = typename ListType::value_type, std::enable_if_t<!std::is_void<ListType>::value, int> = 0> 0464 Job<void, In ...> each(JobContinuation<void, ValueType> &&func) const 0465 { 0466 eachInvariants<OutOther>(); 0467 return then<void, In ...>(forEach<Out, ValueType>(std::forward<JobContinuation<void, ValueType>>(func))); 0468 } 0469 0470 /** 0471 * Shorthand for a serialForEach loop that automatically uses the return type 0472 * of this job to deduce the type expected. 0473 */ 0474 template<typename OutOther = void, typename ListType = Out, typename ValueType = typename ListType::value_type, std::enable_if_t<!std::is_void<ListType>::value, int> = 0> 0475 Job<void, In ...> serialEach(JobContinuation<void, ValueType> &&func) const 0476 { 0477 eachInvariants<OutOther>(); 0478 return then<void, In ...>(serialForEach<Out, ValueType>(std::forward<JobContinuation<void, ValueType>>(func))); 0479 } 0480 0481 /** 0482 * Enable implicit conversion to Job<void>. 0483 * 0484 * This is necessary in assignments that only use the return value (which is the normal case). 0485 * This avoids constructs like: 0486 * auto job = KAsync::start<int>( ... ) 0487 * .then<void, int>( ... ) 0488 * .then<void>([](){}); //Necessary for the assignment without the implicit conversion 0489 */ 0490 template<typename ... InOther> 0491 operator std::conditional_t<std::is_void<OutType>::value, IncompleteType, Job<void>>(); 0492 0493 /** 0494 * Adds an unnamed value to the context. 0495 * The context is guaranteed to persist until the jobs execution has finished. 0496 * 0497 * Useful for setting smart pointer to manage lifetime of objects required 0498 * during the execution of the job. 0499 */ 0500 template<typename T> 0501 Job<Out, In ...> &addToContext(const T &value) 0502 { 0503 assert(mExecutor); 0504 mExecutor->addToContext(QVariant::fromValue<T>(value)); 0505 return *this; 0506 } 0507 0508 /** 0509 * Adds a guard. 0510 * It is guaranteed that no callback is executed after the guard vanishes. 0511 * 0512 * Use this i.e. ensure you don't call-back into an already destroyed object. 0513 */ 0514 Job<Out, In ...> &guard(const QObject *o) 0515 { 0516 assert(mExecutor); 0517 mExecutor->guard(o); 0518 return *this; 0519 } 0520 0521 /** 0522 * @brief Starts execution of the job chain. 0523 * 0524 * This will start the execution of the task chain, starting from the 0525 * first one. It is possible to call this function multiple times, each 0526 * invocation will start a new processing and provide a new Future to 0527 * watch its status. 0528 * 0529 * @param in Argument to be passed to the very first task 0530 * @return Future<Out> object which will contain result of the last 0531 * task once if finishes executing. See Future documentation for more details. 0532 * 0533 * @see exec(), Future 0534 */ 0535 template<typename FirstIn> 0536 KAsync::Future<Out> exec(FirstIn in); 0537 0538 /** 0539 * @brief Starts execution of the job chain. 0540 * 0541 * This will start the execution of the task chain, starting from the 0542 * first one. It is possible to call this function multiple times, each 0543 * invocation will start a new processing and provide a new Future to 0544 * watch its status. 0545 * 0546 * @return Future<Out> object which will contain result of the last 0547 * task once if finishes executing. See Future documentation for more details. 0548 * 0549 * @see exec(FirstIn in), Future 0550 */ 0551 KAsync::Future<Out> exec(); 0552 0553 explicit Job(JobContinuation<Out, In ...> &&func); 0554 explicit Job(AsyncContinuation<Out, In ...> &&func); 0555 0556 private: 0557 //@cond PRIVATE 0558 explicit Job(Private::ExecutorBasePtr executor); 0559 0560 template<typename OutOther, typename ... InOther> 0561 Job<OutOther, In ...> thenImpl(Private::ContinuationHolder<OutOther, InOther ...> helper, 0562 Private::ExecutionFlag execFlag = Private::ExecutionFlag::GoodCase) const; 0563 0564 template<typename InOther, typename ... InOtherTail> 0565 void thenInvariants() const; 0566 0567 //Base case for an empty parameter pack 0568 template<typename ... InOther> 0569 auto thenInvariants() const -> std::enable_if_t<(sizeof...(InOther) == 0)>; 0570 0571 template<typename OutOther> 0572 void eachInvariants() const; 0573 //@endcond 0574 }; 0575 0576 } // namespace KAsync 0577 0578 0579 // out-of-line definitions of Job methods 0580 #include "job_impl.h" 0581 0582 #endif // KASYNC_H