File indexing completed on 2024-05-12 04:41:22

0001 /*
0002     SPDX-FileCopyrightText: 2014-2015 Daniel Vrátil <dvratil@redhat.com>
0003     SPDX-FileCopyrightText: 2015-2016 Daniel Vrátil <dvratil@kde.org>
0004     SPDX-FileCopyrightText: 2016 Christian Mollekopf <mollekopf@kolabsystems.com>
0005 
0006     SPDX-License-Identifier: LGPL-2.0-or-later
0007 */
0008 
0009 #ifndef KASYNC_JOB_IMPL_H
0010 #define KASYNC_JOB_IMPL_H
0011 
0012 #include "async.h"
0013 #include "traits_p.h"
0014 
0015 #include <QTimer>
0016 
0017 //@cond PRIVATE
0018 
0019 namespace KAsync
0020 {
0021 
0022 template<typename Out, typename ... In>
0023 template<typename ... InOther>
0024 Job<Out, In ...>::operator std::conditional_t<std::is_void<OutType>::value, IncompleteType, Job<void>> ()
0025 {
0026     return thenImpl<void, InOther ...>({JobContinuation<void, InOther ...>([](InOther ...){ return KAsync::null<void>(); })}, {});
0027 }
0028 
0029 template<typename Out, typename ... In>
0030 template<typename OutOther, typename ... InOther>
0031 Job<OutOther, In ...> Job<Out, In ...>::thenImpl(Private::ContinuationHolder<OutOther, InOther ...> workHelper,
0032                                                  Private::ExecutionFlag execFlag) const
0033 {
0034     thenInvariants<InOther ...>();
0035     return Job<OutOther, In ...>(QSharedPointer<Private::Executor<OutOther, InOther ...>>::create(
0036                 std::forward<Private::ContinuationHolder<OutOther, InOther ...>>(workHelper), mExecutor, execFlag));
0037 }
0038 
0039 template<typename Out, typename ... In>
0040 template<typename OutOther, typename ... InOther>
0041 Job<OutOther, In ...> Job<Out, In ...>::then(const Job<OutOther, InOther ...> &job) const
0042 {
0043     thenInvariants<InOther ...>();
0044     auto executor = job.mExecutor;
0045     executor->prepend(mExecutor);
0046     return Job<OutOther, In ...>(executor);
0047 }
0048 
0049 template<typename Out, typename ... In>
0050 Job<Out, In ...> Job<Out, In ...>::onError(SyncErrorContinuation<void> &&errorFunc) const
0051 {
0052     return Job<Out, In...>(QSharedPointer<Private::Executor<Out, Out>>::create(
0053                 // Extra indirection to allow propagating the result of a previous future when no
0054                 // error occurs
0055                 Private::ContinuationHolder<Out, Out>([errorFunc = std::move(errorFunc)](const Error &error, const Out &val) {
0056                     errorFunc(error);
0057                     return val;
0058                 }), mExecutor, Private::ExecutionFlag::ErrorCase));
0059 }
0060 
0061 template<> // Specialize for void jobs
0062 inline Job<void> Job<void>::onError(SyncErrorContinuation<void> &&errorFunc) const
0063 {
0064     return Job<void>(QSharedPointer<Private::Executor<void>>::create(
0065                 Private::ContinuationHolder<void>(std::forward<SyncErrorContinuation<void>>(errorFunc)),
0066                 mExecutor, Private::ExecutionFlag::ErrorCase));
0067 }
0068 
0069 template<typename Out, typename ... In>
0070 template<typename FirstIn>
0071 KAsync::Future<Out> Job<Out, In ...>::exec(FirstIn in)
0072 {
0073     // Inject a fake sync executor that will return the initial value
0074     Private::ExecutorBasePtr first = mExecutor;
0075     while (first->mPrev) {
0076         first = first->mPrev;
0077     }
0078 
0079     first->mPrev = QSharedPointer<Private::Executor<FirstIn>>::create(
0080             Private::ContinuationHolder<FirstIn>([val = std::move(in)](Future<FirstIn> &future) {
0081                  future.setResult(val);
0082             }));
0083 
0084     auto result = exec();
0085     // Remove the injected executor
0086     first->mPrev.reset();
0087     return result;
0088 }
0089 
0090 template<typename Out, typename ... In>
0091 KAsync::Future<Out> Job<Out, In ...>::exec()
0092 {
0093     Private::ExecutionPtr execution = mExecutor->exec(mExecutor, Private::ExecutionContext::Ptr::create());
0094     KAsync::Future<Out> result = *execution->result<Out>();
0095 
0096     return result;
0097 }
0098 
0099 template<typename Out, typename ... In>
0100 Job<Out, In ...>::Job(Private::ExecutorBasePtr executor)
0101     : JobBase(executor)
0102 {}
0103 
0104 template<typename Out, typename ... In>
0105 Job<Out, In ...>::Job(JobContinuation<Out, In ...> &&func)
0106     : JobBase(new Private::Executor<Out, In ...>(std::forward<JobContinuation<Out, In ...>>(func), {}))
0107 {
0108     qWarning() << "Creating job job";
0109     static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed.");
0110 }
0111 
0112 template<typename Out, typename ... In>
0113 template<typename OutOther>
0114 void Job<Out, In ...>::eachInvariants() const
0115 {
0116     static_assert(traits::isContainer<Out>::value,
0117                     "The 'Each' task can only be connected to a job that returns a list or an array.");
0118     static_assert(std::is_void<OutOther>::value || traits::isContainer<OutOther>::value,
0119                     "The result type of 'Each' task must be void, a list or an array.");
0120 }
0121 
0122 template<typename Out, typename ... In>
0123 template<typename InOtherFirst, typename ... InOtherTail>
0124 void Job<Out, In ...>::thenInvariants() const
0125 {
0126     static_assert(!std::is_void<Out>::value && (std::is_convertible<Out, InOtherFirst>::value || std::is_base_of<Out, InOtherFirst>::value),
0127                     "The return type of previous task must be compatible with input type of this task");
0128 }
0129 
0130 template<typename Out, typename ... In>
0131 template<typename ... InOther>
0132 auto Job<Out, In ...>::thenInvariants() const -> std::enable_if_t<(sizeof...(InOther) == 0)>
0133 {
0134 }
0135 
0136 template<template<typename> class Container>
0137 KAsync::Job<void> waitForCompletion(Container<KAsync::Future<void>> &futures)
0138 {
0139     struct Context {
0140         void removeWatcher(KAsync::FutureWatcher<void> *w)
0141         {
0142             pending.erase(std::remove_if(pending.begin(), pending.end(), [w](const auto &watcher) {
0143                 return w == watcher.get();
0144             }));
0145         }
0146 
0147         std::vector<std::unique_ptr<KAsync::FutureWatcher<void>>> pending;
0148     };
0149 
0150     return start<Context *>([]() {
0151             return new Context();
0152         })
0153         .template then<Context*, Context*>([futures](Context *context, KAsync::Future<Context *> &future) {
0154             for (KAsync::Future<void> subFuture : futures) {
0155                 if (subFuture.isFinished()) {
0156                     continue;
0157                 }
0158                 // FIXME bind lifetime all watcher to future (respectively the main job)
0159                 auto watcher = std::make_unique<KAsync::FutureWatcher<void>>();
0160                 QObject::connect(watcher.get(), &KAsync::FutureWatcher<void>::futureReady,
0161                                  [&future, watcher = watcher.get(), context]() {
0162                                     context->removeWatcher(watcher);
0163                                     if (context->pending.empty()) {
0164                                         future.setResult(context);
0165                                     }
0166                                 });
0167                 watcher->setFuture(subFuture);
0168                 context->pending.push_back(std::move(watcher));
0169             }
0170             if (context->pending.empty()) {
0171                 future.setResult(context);
0172             }
0173         })
0174         .template then<void, Context*>([](Context *context) {
0175             delete context;
0176         });
0177         // .finally<void>([context]() { delete context; });
0178 }
0179 
0180 template<typename List, typename ValueType>
0181 Job<void, List> forEach(KAsync::Job<void, ValueType> job)
0182 {
0183     auto cont = [job] (const List &values) mutable {
0184             auto error = QSharedPointer<KAsync::Error>::create();
0185             QVector<KAsync::Future<void>> list;
0186             for (const auto &v : values) {
0187                 auto future = job
0188                     .template then<void>([error] (const KAsync::Error &e) {
0189                         if (e && !*error) {
0190                             //TODO ideally we would aggregate the errors instead of just using the first one
0191                             *error = e;
0192                         }
0193                     })
0194                     .exec(v);
0195                 list.push_back(future);
0196             }
0197             return waitForCompletion(list)
0198                 .then<void>([error](KAsync::Future<void> &future) {
0199                     if (*error) {
0200                         future.setError(*error);
0201                     } else {
0202                         future.setFinished();
0203                     }
0204                 });
0205         };
0206     return Job<void, List>(QSharedPointer<Private::Executor<void, List>>::create(
0207                 Private::ContinuationHolder<void, List>(JobContinuation<void, List>(std::move(cont))), nullptr, Private::ExecutionFlag::GoodCase));
0208 }
0209 
0210 
0211 template<typename List, typename ValueType>
0212 Job<void, List> serialForEach(KAsync::Job<void, ValueType> job)
0213 {
0214     auto cont = [job] (const List &values) mutable {
0215             auto error = QSharedPointer<KAsync::Error>::create();
0216             auto serialJob = KAsync::null<void>();
0217             for (const auto &value : values) {
0218                 serialJob = serialJob.then<void>([value, job, error](KAsync::Future<void> &future) {
0219                     job.template then<void>([&future, error] (const KAsync::Error &e) {
0220                         if (e && !*error) {
0221                             //TODO ideally we would aggregate the errors instead of just using the first one
0222                             *error = e;
0223                         }
0224                         future.setFinished();
0225                     })
0226                     .exec(value);
0227                 });
0228             }
0229             return serialJob
0230                 .then<void>([error](KAsync::Future<void> &future) {
0231                     if (*error) {
0232                         future.setError(*error);
0233                     } else {
0234                         future.setFinished();
0235                     }
0236                 });
0237         };
0238     return Job<void, List>(QSharedPointer<Private::Executor<void, List>>::create(
0239             Private::ContinuationHolder<void, List>(JobContinuation<void, List>(std::move(cont))), nullptr, Private::ExecutionFlag::GoodCase));
0240 }
0241 
0242 template<typename List, typename ValueType>
0243 Job<void, List> forEach(JobContinuation<void, ValueType> &&func)
0244 {
0245     return forEach<List, ValueType>(KAsync::start<void, ValueType>(std::forward<JobContinuation<void, ValueType>>(func)));
0246 }
0247 
0248 template<typename List, typename ValueType>
0249 Job<void, List> serialForEach(JobContinuation<void, ValueType> &&func)
0250 {
0251     return serialForEach<List, ValueType>(KAsync::start<void, ValueType>(std::forward<JobContinuation<void, ValueType>>(func)));
0252 }
0253 
0254 template<typename Out>
0255 Job<Out> null()
0256 {
0257     return KAsync::start<Out>(
0258         [](KAsync::Future<Out> &future) {
0259             future.setFinished();
0260         });
0261 }
0262 
0263 template<typename Out>
0264 Job<Out> value(Out v)
0265 {
0266     return KAsync::start<Out>(
0267         [val = std::move(v)](KAsync::Future<Out> &future) {
0268             future.setResult(val);
0269         });
0270 }
0271 
0272 template<typename Out>
0273 Job<Out> error(int errorCode, const QString &errorMessage)
0274 {
0275     return error<Out>({errorCode, errorMessage});
0276 }
0277 
0278 template<typename Out>
0279 Job<Out> error(const char *message)
0280 {
0281     return error<Out>(Error(message));
0282 }
0283 
0284 template<typename Out>
0285 Job<Out> error(const Error &error)
0286 {
0287     return KAsync::start<Out>(
0288         [error](KAsync::Future<Out> &future) {
0289             future.setError(error);
0290         });
0291 }
0292 
0293 inline Job<void> doWhile(const Job<ControlFlowFlag> &body)
0294 {
0295     return KAsync::start<void>([body] (KAsync::Future<void> &future) {
0296         auto job = body.then<void, ControlFlowFlag>([&future, body](const KAsync::Error &error, ControlFlowFlag flag) {
0297             if (error) {
0298                 future.setError(error);
0299                 future.setFinished();
0300             } else if (flag == ControlFlowFlag::Continue) {
0301                 doWhile(body).then<void>([&future](const KAsync::Error &error) {
0302                     if (error) {
0303                         future.setError(error);
0304                     }
0305                     future.setFinished();
0306                 }).exec();
0307             } else {
0308                 future.setFinished();
0309             }
0310         }).exec();
0311     });
0312 }
0313 
0314 inline Job<void> doWhile(const JobContinuation<ControlFlowFlag> &body)
0315 {
0316     return doWhile(KAsync::start<ControlFlowFlag>([body] {
0317         return body();
0318     }));
0319 }
0320 
0321 inline Job<void> wait(int delay)
0322 {
0323     return KAsync::start<void>([delay](KAsync::Future<void> &future) {
0324         QTimer::singleShot(delay, [&future]() {
0325             future.setFinished();
0326         });
0327     });
0328 }
0329 } // namespace KAsync
0330 
0331 //@endcond
0332 
0333 #endif // KASYNC_JOB_IMPL_H