File indexing completed on 2024-05-12 04:41:22
0001 /* 0002 SPDX-FileCopyrightText: 2014-2015 Daniel Vrátil <dvratil@redhat.com> 0003 SPDX-FileCopyrightText: 2015-2016 Daniel Vrátil <dvratil@kde.org> 0004 SPDX-FileCopyrightText: 2016 Christian Mollekopf <mollekopf@kolabsystems.com> 0005 0006 SPDX-License-Identifier: LGPL-2.0-or-later 0007 */ 0008 0009 #ifndef KASYNC_JOB_IMPL_H 0010 #define KASYNC_JOB_IMPL_H 0011 0012 #include "async.h" 0013 #include "traits_p.h" 0014 0015 #include <QTimer> 0016 0017 //@cond PRIVATE 0018 0019 namespace KAsync 0020 { 0021 0022 template<typename Out, typename ... In> 0023 template<typename ... InOther> 0024 Job<Out, In ...>::operator std::conditional_t<std::is_void<OutType>::value, IncompleteType, Job<void>> () 0025 { 0026 return thenImpl<void, InOther ...>({JobContinuation<void, InOther ...>([](InOther ...){ return KAsync::null<void>(); })}, {}); 0027 } 0028 0029 template<typename Out, typename ... In> 0030 template<typename OutOther, typename ... InOther> 0031 Job<OutOther, In ...> Job<Out, In ...>::thenImpl(Private::ContinuationHolder<OutOther, InOther ...> workHelper, 0032 Private::ExecutionFlag execFlag) const 0033 { 0034 thenInvariants<InOther ...>(); 0035 return Job<OutOther, In ...>(QSharedPointer<Private::Executor<OutOther, InOther ...>>::create( 0036 std::forward<Private::ContinuationHolder<OutOther, InOther ...>>(workHelper), mExecutor, execFlag)); 0037 } 0038 0039 template<typename Out, typename ... In> 0040 template<typename OutOther, typename ... InOther> 0041 Job<OutOther, In ...> Job<Out, In ...>::then(const Job<OutOther, InOther ...> &job) const 0042 { 0043 thenInvariants<InOther ...>(); 0044 auto executor = job.mExecutor; 0045 executor->prepend(mExecutor); 0046 return Job<OutOther, In ...>(executor); 0047 } 0048 0049 template<typename Out, typename ... In> 0050 Job<Out, In ...> Job<Out, In ...>::onError(SyncErrorContinuation<void> &&errorFunc) const 0051 { 0052 return Job<Out, In...>(QSharedPointer<Private::Executor<Out, Out>>::create( 0053 // Extra indirection to allow propagating the result of a previous future when no 0054 // error occurs 0055 Private::ContinuationHolder<Out, Out>([errorFunc = std::move(errorFunc)](const Error &error, const Out &val) { 0056 errorFunc(error); 0057 return val; 0058 }), mExecutor, Private::ExecutionFlag::ErrorCase)); 0059 } 0060 0061 template<> // Specialize for void jobs 0062 inline Job<void> Job<void>::onError(SyncErrorContinuation<void> &&errorFunc) const 0063 { 0064 return Job<void>(QSharedPointer<Private::Executor<void>>::create( 0065 Private::ContinuationHolder<void>(std::forward<SyncErrorContinuation<void>>(errorFunc)), 0066 mExecutor, Private::ExecutionFlag::ErrorCase)); 0067 } 0068 0069 template<typename Out, typename ... In> 0070 template<typename FirstIn> 0071 KAsync::Future<Out> Job<Out, In ...>::exec(FirstIn in) 0072 { 0073 // Inject a fake sync executor that will return the initial value 0074 Private::ExecutorBasePtr first = mExecutor; 0075 while (first->mPrev) { 0076 first = first->mPrev; 0077 } 0078 0079 first->mPrev = QSharedPointer<Private::Executor<FirstIn>>::create( 0080 Private::ContinuationHolder<FirstIn>([val = std::move(in)](Future<FirstIn> &future) { 0081 future.setResult(val); 0082 })); 0083 0084 auto result = exec(); 0085 // Remove the injected executor 0086 first->mPrev.reset(); 0087 return result; 0088 } 0089 0090 template<typename Out, typename ... In> 0091 KAsync::Future<Out> Job<Out, In ...>::exec() 0092 { 0093 Private::ExecutionPtr execution = mExecutor->exec(mExecutor, Private::ExecutionContext::Ptr::create()); 0094 KAsync::Future<Out> result = *execution->result<Out>(); 0095 0096 return result; 0097 } 0098 0099 template<typename Out, typename ... In> 0100 Job<Out, In ...>::Job(Private::ExecutorBasePtr executor) 0101 : JobBase(executor) 0102 {} 0103 0104 template<typename Out, typename ... In> 0105 Job<Out, In ...>::Job(JobContinuation<Out, In ...> &&func) 0106 : JobBase(new Private::Executor<Out, In ...>(std::forward<JobContinuation<Out, In ...>>(func), {})) 0107 { 0108 qWarning() << "Creating job job"; 0109 static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed."); 0110 } 0111 0112 template<typename Out, typename ... In> 0113 template<typename OutOther> 0114 void Job<Out, In ...>::eachInvariants() const 0115 { 0116 static_assert(traits::isContainer<Out>::value, 0117 "The 'Each' task can only be connected to a job that returns a list or an array."); 0118 static_assert(std::is_void<OutOther>::value || traits::isContainer<OutOther>::value, 0119 "The result type of 'Each' task must be void, a list or an array."); 0120 } 0121 0122 template<typename Out, typename ... In> 0123 template<typename InOtherFirst, typename ... InOtherTail> 0124 void Job<Out, In ...>::thenInvariants() const 0125 { 0126 static_assert(!std::is_void<Out>::value && (std::is_convertible<Out, InOtherFirst>::value || std::is_base_of<Out, InOtherFirst>::value), 0127 "The return type of previous task must be compatible with input type of this task"); 0128 } 0129 0130 template<typename Out, typename ... In> 0131 template<typename ... InOther> 0132 auto Job<Out, In ...>::thenInvariants() const -> std::enable_if_t<(sizeof...(InOther) == 0)> 0133 { 0134 } 0135 0136 template<template<typename> class Container> 0137 KAsync::Job<void> waitForCompletion(Container<KAsync::Future<void>> &futures) 0138 { 0139 struct Context { 0140 void removeWatcher(KAsync::FutureWatcher<void> *w) 0141 { 0142 pending.erase(std::remove_if(pending.begin(), pending.end(), [w](const auto &watcher) { 0143 return w == watcher.get(); 0144 })); 0145 } 0146 0147 std::vector<std::unique_ptr<KAsync::FutureWatcher<void>>> pending; 0148 }; 0149 0150 return start<Context *>([]() { 0151 return new Context(); 0152 }) 0153 .template then<Context*, Context*>([futures](Context *context, KAsync::Future<Context *> &future) { 0154 for (KAsync::Future<void> subFuture : futures) { 0155 if (subFuture.isFinished()) { 0156 continue; 0157 } 0158 // FIXME bind lifetime all watcher to future (respectively the main job) 0159 auto watcher = std::make_unique<KAsync::FutureWatcher<void>>(); 0160 QObject::connect(watcher.get(), &KAsync::FutureWatcher<void>::futureReady, 0161 [&future, watcher = watcher.get(), context]() { 0162 context->removeWatcher(watcher); 0163 if (context->pending.empty()) { 0164 future.setResult(context); 0165 } 0166 }); 0167 watcher->setFuture(subFuture); 0168 context->pending.push_back(std::move(watcher)); 0169 } 0170 if (context->pending.empty()) { 0171 future.setResult(context); 0172 } 0173 }) 0174 .template then<void, Context*>([](Context *context) { 0175 delete context; 0176 }); 0177 // .finally<void>([context]() { delete context; }); 0178 } 0179 0180 template<typename List, typename ValueType> 0181 Job<void, List> forEach(KAsync::Job<void, ValueType> job) 0182 { 0183 auto cont = [job] (const List &values) mutable { 0184 auto error = QSharedPointer<KAsync::Error>::create(); 0185 QVector<KAsync::Future<void>> list; 0186 for (const auto &v : values) { 0187 auto future = job 0188 .template then<void>([error] (const KAsync::Error &e) { 0189 if (e && !*error) { 0190 //TODO ideally we would aggregate the errors instead of just using the first one 0191 *error = e; 0192 } 0193 }) 0194 .exec(v); 0195 list.push_back(future); 0196 } 0197 return waitForCompletion(list) 0198 .then<void>([error](KAsync::Future<void> &future) { 0199 if (*error) { 0200 future.setError(*error); 0201 } else { 0202 future.setFinished(); 0203 } 0204 }); 0205 }; 0206 return Job<void, List>(QSharedPointer<Private::Executor<void, List>>::create( 0207 Private::ContinuationHolder<void, List>(JobContinuation<void, List>(std::move(cont))), nullptr, Private::ExecutionFlag::GoodCase)); 0208 } 0209 0210 0211 template<typename List, typename ValueType> 0212 Job<void, List> serialForEach(KAsync::Job<void, ValueType> job) 0213 { 0214 auto cont = [job] (const List &values) mutable { 0215 auto error = QSharedPointer<KAsync::Error>::create(); 0216 auto serialJob = KAsync::null<void>(); 0217 for (const auto &value : values) { 0218 serialJob = serialJob.then<void>([value, job, error](KAsync::Future<void> &future) { 0219 job.template then<void>([&future, error] (const KAsync::Error &e) { 0220 if (e && !*error) { 0221 //TODO ideally we would aggregate the errors instead of just using the first one 0222 *error = e; 0223 } 0224 future.setFinished(); 0225 }) 0226 .exec(value); 0227 }); 0228 } 0229 return serialJob 0230 .then<void>([error](KAsync::Future<void> &future) { 0231 if (*error) { 0232 future.setError(*error); 0233 } else { 0234 future.setFinished(); 0235 } 0236 }); 0237 }; 0238 return Job<void, List>(QSharedPointer<Private::Executor<void, List>>::create( 0239 Private::ContinuationHolder<void, List>(JobContinuation<void, List>(std::move(cont))), nullptr, Private::ExecutionFlag::GoodCase)); 0240 } 0241 0242 template<typename List, typename ValueType> 0243 Job<void, List> forEach(JobContinuation<void, ValueType> &&func) 0244 { 0245 return forEach<List, ValueType>(KAsync::start<void, ValueType>(std::forward<JobContinuation<void, ValueType>>(func))); 0246 } 0247 0248 template<typename List, typename ValueType> 0249 Job<void, List> serialForEach(JobContinuation<void, ValueType> &&func) 0250 { 0251 return serialForEach<List, ValueType>(KAsync::start<void, ValueType>(std::forward<JobContinuation<void, ValueType>>(func))); 0252 } 0253 0254 template<typename Out> 0255 Job<Out> null() 0256 { 0257 return KAsync::start<Out>( 0258 [](KAsync::Future<Out> &future) { 0259 future.setFinished(); 0260 }); 0261 } 0262 0263 template<typename Out> 0264 Job<Out> value(Out v) 0265 { 0266 return KAsync::start<Out>( 0267 [val = std::move(v)](KAsync::Future<Out> &future) { 0268 future.setResult(val); 0269 }); 0270 } 0271 0272 template<typename Out> 0273 Job<Out> error(int errorCode, const QString &errorMessage) 0274 { 0275 return error<Out>({errorCode, errorMessage}); 0276 } 0277 0278 template<typename Out> 0279 Job<Out> error(const char *message) 0280 { 0281 return error<Out>(Error(message)); 0282 } 0283 0284 template<typename Out> 0285 Job<Out> error(const Error &error) 0286 { 0287 return KAsync::start<Out>( 0288 [error](KAsync::Future<Out> &future) { 0289 future.setError(error); 0290 }); 0291 } 0292 0293 inline Job<void> doWhile(const Job<ControlFlowFlag> &body) 0294 { 0295 return KAsync::start<void>([body] (KAsync::Future<void> &future) { 0296 auto job = body.then<void, ControlFlowFlag>([&future, body](const KAsync::Error &error, ControlFlowFlag flag) { 0297 if (error) { 0298 future.setError(error); 0299 future.setFinished(); 0300 } else if (flag == ControlFlowFlag::Continue) { 0301 doWhile(body).then<void>([&future](const KAsync::Error &error) { 0302 if (error) { 0303 future.setError(error); 0304 } 0305 future.setFinished(); 0306 }).exec(); 0307 } else { 0308 future.setFinished(); 0309 } 0310 }).exec(); 0311 }); 0312 } 0313 0314 inline Job<void> doWhile(const JobContinuation<ControlFlowFlag> &body) 0315 { 0316 return doWhile(KAsync::start<ControlFlowFlag>([body] { 0317 return body(); 0318 })); 0319 } 0320 0321 inline Job<void> wait(int delay) 0322 { 0323 return KAsync::start<void>([delay](KAsync::Future<void> &future) { 0324 QTimer::singleShot(delay, [&future]() { 0325 future.setFinished(); 0326 }); 0327 }); 0328 } 0329 } // namespace KAsync 0330 0331 //@endcond 0332 0333 #endif // KASYNC_JOB_IMPL_H