File indexing completed on 2024-05-12 05:26:04

0001 /*
0002  * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm>
0003  *
0004  * This library is free software; you can redistribute it and/or
0005  * modify it under the terms of the GNU Lesser General Public
0006  * License as published by the Free Software Foundation; either
0007  * version 2.1 of the License, or (at your option) version 3, or any
0008  * later version accepted by the membership of KDE e.V. (or its
0009  * successor approved by the membership of KDE e.V.), which shall
0010  * act as a proxy defined in Section 6 of version 3 of the license.
0011  *
0012  * This library is distributed in the hope that it will be useful,
0013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0015  * Lesser General Public License for more details.
0016  *
0017  * You should have received a copy of the GNU Lesser General Public
0018  * License along with this library.  If not, see <http://www.gnu.org/licenses/>.
0019  */
0020 
0021 #pragma once
0022 
0023 #include <functional>
0024 #include <memory>
0025 #include <QMutexLocker>
0026 #include <QPointer>
0027 
0028 namespace Sink {
0029 
0030 /**
0031 * Query result set
0032 */
0033 template <class T>
0034 class ResultEmitter;
0035 
0036 template <class T>
0037 class ResultProviderInterface
0038 {
0039 public:
0040     ResultProviderInterface() : mRevision(0)
0041     {
0042     }
0043 
0044     virtual ~ResultProviderInterface()
0045     {
0046     }
0047 
0048     virtual void add(const T &value) = 0;
0049     virtual void modify(const T &value) = 0;
0050     virtual void remove(const T &value) = 0;
0051     virtual void initialResultSetComplete(bool) = 0;
0052     virtual void complete() = 0;
0053     virtual void clear() = 0;
0054     virtual void setFetcher(const std::function<void()> &fetcher) = 0;
0055 
0056     void setRevision(qint64 revision)
0057     {
0058         mRevision = revision;
0059     }
0060 
0061     qint64 revision() const
0062     {
0063         return mRevision;
0064     }
0065 
0066 private:
0067     qint64 mRevision;
0068 };
0069 
0070 /*
0071 * The promise side for the result emitter
0072 */
0073 template <class T>
0074 class ResultProvider : public ResultProviderInterface<T>
0075 {
0076 public:
0077     typedef QSharedPointer<ResultProvider<T>> Ptr;
0078 
0079     virtual ~ResultProvider() Q_DECL_OVERRIDE
0080     {
0081     }
0082 
0083     // Called from worker thread
0084     void add(const T &value) Q_DECL_OVERRIDE
0085     {
0086         if (auto strongRef = mResultEmitter.toStrongRef()) {
0087             strongRef->add(value);
0088         }
0089     }
0090 
0091     void modify(const T &value) Q_DECL_OVERRIDE
0092     {
0093         if (auto strongRef = mResultEmitter.toStrongRef()) {
0094             strongRef->modify(value);
0095         }
0096     }
0097 
0098     void remove(const T &value) Q_DECL_OVERRIDE
0099     {
0100         if (auto strongRef = mResultEmitter.toStrongRef()) {
0101             strongRef->remove(value);
0102         }
0103     }
0104 
0105     void initialResultSetComplete(bool replayedAll) Q_DECL_OVERRIDE
0106     {
0107         if (auto strongRef = mResultEmitter.toStrongRef()) {
0108             strongRef->initialResultSetComplete(replayedAll);
0109         }
0110     }
0111 
0112     // Called from worker thread
0113     void complete() Q_DECL_OVERRIDE
0114     {
0115         if (auto strongRef = mResultEmitter.toStrongRef()) {
0116             strongRef->complete();
0117         }
0118     }
0119 
0120     void clear() Q_DECL_OVERRIDE
0121     {
0122         if (auto strongRef = mResultEmitter.toStrongRef()) {
0123             strongRef->clear();
0124         }
0125     }
0126 
0127 
0128     QSharedPointer<ResultEmitter<T>> emitter()
0129     {
0130         if (!mResultEmitter) {
0131             // We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again
0132             auto sharedPtr = QSharedPointer<ResultEmitter<T>>(new ResultEmitter<T>, [this](ResultEmitter<T> *emitter) {
0133                 done();
0134                 delete emitter;
0135             });
0136             mResultEmitter = sharedPtr;
0137             sharedPtr->setFetcher([this]() {
0138                 Q_ASSERT(mFetcher);
0139                 mFetcher();
0140             });
0141             return sharedPtr;
0142         }
0143 
0144         return mResultEmitter.toStrongRef();
0145     }
0146 
0147     void onDone(const std::function<void()> &callback)
0148     {
0149         mOnDoneCallback = callback;
0150     }
0151 
0152     bool isDone() const
0153     {
0154         // The existance of the emitter currently defines wether we're done or not.
0155         return mResultEmitter.toStrongRef().isNull();
0156     }
0157 
0158     void setFetcher(const std::function<void()> &fetcher) Q_DECL_OVERRIDE
0159     {
0160         mFetcher = fetcher;
0161     }
0162 
0163 private:
0164     void done()
0165     {
0166         if (mOnDoneCallback) {
0167             auto callback = mOnDoneCallback;
0168             mOnDoneCallback = std::function<void()>();
0169             // This may delete this object
0170             callback();
0171         }
0172     }
0173 
0174     QWeakPointer<ResultEmitter<T>> mResultEmitter;
0175     std::function<void()> mOnDoneCallback;
0176     std::function<void()> mFetcher;
0177 };
0178 
0179 /*
0180 * The future side for the client.
0181 *
0182 * It does not directly hold the state.
0183 *
0184 * The advantage of this is that we can specialize it to:
0185 * * do inline transformations to the data
0186 * * directly store the state in a suitable datastructure: QList, QSet, std::list, QVector, ...
0187 * * build async interfaces with signals
0188 * * build sync interfaces that block when accessing the value
0189 *
0190 */
0191 template <class DomainType>
0192 class ResultEmitter
0193 {
0194 public:
0195     typedef QSharedPointer<ResultEmitter<DomainType>> Ptr;
0196 
0197     virtual ~ResultEmitter()
0198     {
0199         //Try locking in case we're in the middle of an execution in another thread
0200         QMutexLocker locker{&mMutex};
0201     }
0202 
0203     virtual void waitForMethodExecutionEnd()
0204     {
0205         //If we're in the middle of a method execution, this will block until the method is done.
0206         QMutexLocker locker{&mMutex};
0207         mDone = true;
0208     }
0209 
0210     void onAdded(const std::function<void(const DomainType &)> &handler)
0211     {
0212         addHandler = handler;
0213     }
0214 
0215     void onModified(const std::function<void(const DomainType &)> &handler)
0216     {
0217         modifyHandler = handler;
0218     }
0219 
0220     void onRemoved(const std::function<void(const DomainType &)> &handler)
0221     {
0222         removeHandler = handler;
0223     }
0224 
0225     void onInitialResultSetComplete(const std::function<void(bool)> &handler)
0226     {
0227         initialResultSetCompleteHandler = handler;
0228     }
0229 
0230     void onComplete(const std::function<void(void)> &handler)
0231     {
0232         completeHandler = handler;
0233     }
0234 
0235     void onClear(const std::function<void(void)> &handler)
0236     {
0237         clearHandler = handler;
0238     }
0239 
0240     bool guardOk()
0241     {
0242         return !mDone;
0243     }
0244 
0245     void add(const DomainType &value)
0246     {
0247         QMutexLocker locker{&mMutex};
0248         if (guardOk()) {
0249             if (addHandler) {
0250                 addHandler(value);
0251             }
0252         }
0253     }
0254 
0255     void modify(const DomainType &value)
0256     {
0257         QMutexLocker locker{&mMutex};
0258         if (guardOk()) {
0259             if (modifyHandler) {
0260                 modifyHandler(value);
0261             }
0262         }
0263     }
0264 
0265     void remove(const DomainType &value)
0266     {
0267         QMutexLocker locker{&mMutex};
0268         if (guardOk()) {
0269             if (removeHandler) {
0270                 removeHandler(value);
0271             }
0272         }
0273     }
0274 
0275     void initialResultSetComplete(bool replayedAll)
0276     {
0277         //This callback is only ever called from the main thread, so we don't do any locking
0278         if (initialResultSetCompleteHandler && guardOk()) {
0279             if (initialResultSetCompleteHandler) {
0280                 //This can directly lead to our destruction and thus waitForMethodExecutionEnd
0281                 initialResultSetCompleteHandler(replayedAll);
0282             }
0283         }
0284     }
0285 
0286     void complete()
0287     {
0288         QMutexLocker locker{&mMutex};
0289         if (completeHandler && guardOk()) {
0290             if (completeHandler) {
0291                 completeHandler();
0292             }
0293         }
0294     }
0295 
0296     void clear()
0297     {
0298         QMutexLocker locker{&mMutex};
0299         if (clearHandler && guardOk()) {
0300             if (clearHandler) {
0301                 clearHandler();
0302             }
0303         }
0304     }
0305 
0306     void setFetcher(const std::function<void()> &fetcher)
0307     {
0308         mFetcher = fetcher;
0309     }
0310 
0311     virtual void fetch()
0312     {
0313         if (mFetcher) {
0314             mFetcher();
0315         }
0316     }
0317 
0318 private:
0319     friend class ResultProvider<DomainType>;
0320 
0321     std::function<void(const DomainType &)> addHandler;
0322     std::function<void(const DomainType &)> modifyHandler;
0323     std::function<void(const DomainType &)> removeHandler;
0324     std::function<void(bool)> initialResultSetCompleteHandler;
0325     std::function<void(void)> completeHandler;
0326     std::function<void(void)> clearHandler;
0327 
0328     std::function<void()> mFetcher;
0329     /*
0330      * This mutex is here to protect the emitter from getting destroyed while the producer-thread (ResultProvider) is calling into it,
0331      * and vice-verca, to protect the producer thread from calling into a destroyed emitter.
0332      *
0333      * This is necessary because Emitter and ResultProvider have lifetimes managed by two different threads.
0334      * The emitter lives in the application thread, and the resultprovider in the query thread.
0335      */
0336     QMutex mMutex;
0337     bool mDone = false;
0338 };
0339 
0340 template <class DomainType>
0341 class AggregatingResultEmitter : public ResultEmitter<DomainType>
0342 {
0343 public:
0344     typedef QSharedPointer<AggregatingResultEmitter<DomainType>> Ptr;
0345 
0346     ~AggregatingResultEmitter() Q_DECL_OVERRIDE
0347     {
0348     }
0349 
0350     virtual void waitForMethodExecutionEnd() Q_DECL_OVERRIDE
0351     {
0352         for (const auto &emitter : mEmitter) {
0353             emitter->waitForMethodExecutionEnd();
0354         }
0355         ResultEmitter<DomainType>::waitForMethodExecutionEnd();
0356     }
0357 
0358     void addEmitter(const typename ResultEmitter<DomainType>::Ptr &emitter)
0359     {
0360         Q_ASSERT(emitter);
0361         emitter->onAdded([this](const DomainType &value) { this->add(value); });
0362         emitter->onModified([this](const DomainType &value) { this->modify(value); });
0363         emitter->onRemoved([this](const DomainType &value) { this->remove(value); });
0364         auto ptr = emitter.data();
0365         emitter->onInitialResultSetComplete([this, ptr](bool replayedAll) {
0366             if (replayedAll) {
0367                 mAllResultsReplayed.remove(ptr);
0368             }
0369             mInitialResultSetInProgress.remove(ptr);
0370             callInitialResultCompleteIfDone();
0371         });
0372         emitter->onComplete([this]() { this->complete(); });
0373         emitter->onClear([this]() { this->clear(); });
0374         mEmitter << emitter;
0375     }
0376 
0377     void callInitialResultCompleteIfDone()
0378     {
0379         // Normally a parent is only in a single resource, except the toplevel (invalid) parent
0380         if (mInitialResultSetInProgress.isEmpty() && mAllResultsFetched && !mResultEmitted) {
0381             mResultEmitted = true;
0382             this->initialResultSetComplete(mAllResultsReplayed.isEmpty());
0383         }
0384     }
0385 
0386     void fetch() Q_DECL_OVERRIDE
0387     {
0388         if (mEmitter.isEmpty()) {
0389             this->initialResultSetComplete(true);
0390         } else {
0391             mResultEmitted = false;
0392             mAllResultsFetched = false;
0393             mInitialResultSetInProgress.clear();
0394             mAllResultsReplayed.clear();
0395             for (const auto &emitter : mEmitter) {
0396                 mInitialResultSetInProgress.insert(emitter.data());
0397                 mAllResultsReplayed.insert(emitter.data());
0398                 emitter->fetch();
0399             }
0400             mAllResultsFetched = true;
0401             callInitialResultCompleteIfDone();
0402         }
0403     }
0404 
0405 private:
0406     QList<typename ResultEmitter<DomainType>::Ptr> mEmitter;
0407     QSet<ResultEmitter<DomainType>*> mInitialResultSetInProgress;
0408     QSet<ResultEmitter<DomainType>*> mAllResultsReplayed;
0409     bool mAllResultsFetched;
0410     bool mResultEmitted;
0411 };
0412 }