File indexing completed on 2024-05-12 05:26:04
0001 /* 0002 * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm> 0003 * 0004 * This library is free software; you can redistribute it and/or 0005 * modify it under the terms of the GNU Lesser General Public 0006 * License as published by the Free Software Foundation; either 0007 * version 2.1 of the License, or (at your option) version 3, or any 0008 * later version accepted by the membership of KDE e.V. (or its 0009 * successor approved by the membership of KDE e.V.), which shall 0010 * act as a proxy defined in Section 6 of version 3 of the license. 0011 * 0012 * This library is distributed in the hope that it will be useful, 0013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 0014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 0015 * Lesser General Public License for more details. 0016 * 0017 * You should have received a copy of the GNU Lesser General Public 0018 * License along with this library. If not, see <http://www.gnu.org/licenses/>. 0019 */ 0020 0021 #pragma once 0022 0023 #include <functional> 0024 #include <memory> 0025 #include <QMutexLocker> 0026 #include <QPointer> 0027 0028 namespace Sink { 0029 0030 /** 0031 * Query result set 0032 */ 0033 template <class T> 0034 class ResultEmitter; 0035 0036 template <class T> 0037 class ResultProviderInterface 0038 { 0039 public: 0040 ResultProviderInterface() : mRevision(0) 0041 { 0042 } 0043 0044 virtual ~ResultProviderInterface() 0045 { 0046 } 0047 0048 virtual void add(const T &value) = 0; 0049 virtual void modify(const T &value) = 0; 0050 virtual void remove(const T &value) = 0; 0051 virtual void initialResultSetComplete(bool) = 0; 0052 virtual void complete() = 0; 0053 virtual void clear() = 0; 0054 virtual void setFetcher(const std::function<void()> &fetcher) = 0; 0055 0056 void setRevision(qint64 revision) 0057 { 0058 mRevision = revision; 0059 } 0060 0061 qint64 revision() const 0062 { 0063 return mRevision; 0064 } 0065 0066 private: 0067 qint64 mRevision; 0068 }; 0069 0070 /* 0071 * The promise side for the result emitter 0072 */ 0073 template <class T> 0074 class ResultProvider : public ResultProviderInterface<T> 0075 { 0076 public: 0077 typedef QSharedPointer<ResultProvider<T>> Ptr; 0078 0079 virtual ~ResultProvider() Q_DECL_OVERRIDE 0080 { 0081 } 0082 0083 // Called from worker thread 0084 void add(const T &value) Q_DECL_OVERRIDE 0085 { 0086 if (auto strongRef = mResultEmitter.toStrongRef()) { 0087 strongRef->add(value); 0088 } 0089 } 0090 0091 void modify(const T &value) Q_DECL_OVERRIDE 0092 { 0093 if (auto strongRef = mResultEmitter.toStrongRef()) { 0094 strongRef->modify(value); 0095 } 0096 } 0097 0098 void remove(const T &value) Q_DECL_OVERRIDE 0099 { 0100 if (auto strongRef = mResultEmitter.toStrongRef()) { 0101 strongRef->remove(value); 0102 } 0103 } 0104 0105 void initialResultSetComplete(bool replayedAll) Q_DECL_OVERRIDE 0106 { 0107 if (auto strongRef = mResultEmitter.toStrongRef()) { 0108 strongRef->initialResultSetComplete(replayedAll); 0109 } 0110 } 0111 0112 // Called from worker thread 0113 void complete() Q_DECL_OVERRIDE 0114 { 0115 if (auto strongRef = mResultEmitter.toStrongRef()) { 0116 strongRef->complete(); 0117 } 0118 } 0119 0120 void clear() Q_DECL_OVERRIDE 0121 { 0122 if (auto strongRef = mResultEmitter.toStrongRef()) { 0123 strongRef->clear(); 0124 } 0125 } 0126 0127 0128 QSharedPointer<ResultEmitter<T>> emitter() 0129 { 0130 if (!mResultEmitter) { 0131 // We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again 0132 auto sharedPtr = QSharedPointer<ResultEmitter<T>>(new ResultEmitter<T>, [this](ResultEmitter<T> *emitter) { 0133 done(); 0134 delete emitter; 0135 }); 0136 mResultEmitter = sharedPtr; 0137 sharedPtr->setFetcher([this]() { 0138 Q_ASSERT(mFetcher); 0139 mFetcher(); 0140 }); 0141 return sharedPtr; 0142 } 0143 0144 return mResultEmitter.toStrongRef(); 0145 } 0146 0147 void onDone(const std::function<void()> &callback) 0148 { 0149 mOnDoneCallback = callback; 0150 } 0151 0152 bool isDone() const 0153 { 0154 // The existance of the emitter currently defines wether we're done or not. 0155 return mResultEmitter.toStrongRef().isNull(); 0156 } 0157 0158 void setFetcher(const std::function<void()> &fetcher) Q_DECL_OVERRIDE 0159 { 0160 mFetcher = fetcher; 0161 } 0162 0163 private: 0164 void done() 0165 { 0166 if (mOnDoneCallback) { 0167 auto callback = mOnDoneCallback; 0168 mOnDoneCallback = std::function<void()>(); 0169 // This may delete this object 0170 callback(); 0171 } 0172 } 0173 0174 QWeakPointer<ResultEmitter<T>> mResultEmitter; 0175 std::function<void()> mOnDoneCallback; 0176 std::function<void()> mFetcher; 0177 }; 0178 0179 /* 0180 * The future side for the client. 0181 * 0182 * It does not directly hold the state. 0183 * 0184 * The advantage of this is that we can specialize it to: 0185 * * do inline transformations to the data 0186 * * directly store the state in a suitable datastructure: QList, QSet, std::list, QVector, ... 0187 * * build async interfaces with signals 0188 * * build sync interfaces that block when accessing the value 0189 * 0190 */ 0191 template <class DomainType> 0192 class ResultEmitter 0193 { 0194 public: 0195 typedef QSharedPointer<ResultEmitter<DomainType>> Ptr; 0196 0197 virtual ~ResultEmitter() 0198 { 0199 //Try locking in case we're in the middle of an execution in another thread 0200 QMutexLocker locker{&mMutex}; 0201 } 0202 0203 virtual void waitForMethodExecutionEnd() 0204 { 0205 //If we're in the middle of a method execution, this will block until the method is done. 0206 QMutexLocker locker{&mMutex}; 0207 mDone = true; 0208 } 0209 0210 void onAdded(const std::function<void(const DomainType &)> &handler) 0211 { 0212 addHandler = handler; 0213 } 0214 0215 void onModified(const std::function<void(const DomainType &)> &handler) 0216 { 0217 modifyHandler = handler; 0218 } 0219 0220 void onRemoved(const std::function<void(const DomainType &)> &handler) 0221 { 0222 removeHandler = handler; 0223 } 0224 0225 void onInitialResultSetComplete(const std::function<void(bool)> &handler) 0226 { 0227 initialResultSetCompleteHandler = handler; 0228 } 0229 0230 void onComplete(const std::function<void(void)> &handler) 0231 { 0232 completeHandler = handler; 0233 } 0234 0235 void onClear(const std::function<void(void)> &handler) 0236 { 0237 clearHandler = handler; 0238 } 0239 0240 bool guardOk() 0241 { 0242 return !mDone; 0243 } 0244 0245 void add(const DomainType &value) 0246 { 0247 QMutexLocker locker{&mMutex}; 0248 if (guardOk()) { 0249 if (addHandler) { 0250 addHandler(value); 0251 } 0252 } 0253 } 0254 0255 void modify(const DomainType &value) 0256 { 0257 QMutexLocker locker{&mMutex}; 0258 if (guardOk()) { 0259 if (modifyHandler) { 0260 modifyHandler(value); 0261 } 0262 } 0263 } 0264 0265 void remove(const DomainType &value) 0266 { 0267 QMutexLocker locker{&mMutex}; 0268 if (guardOk()) { 0269 if (removeHandler) { 0270 removeHandler(value); 0271 } 0272 } 0273 } 0274 0275 void initialResultSetComplete(bool replayedAll) 0276 { 0277 //This callback is only ever called from the main thread, so we don't do any locking 0278 if (initialResultSetCompleteHandler && guardOk()) { 0279 if (initialResultSetCompleteHandler) { 0280 //This can directly lead to our destruction and thus waitForMethodExecutionEnd 0281 initialResultSetCompleteHandler(replayedAll); 0282 } 0283 } 0284 } 0285 0286 void complete() 0287 { 0288 QMutexLocker locker{&mMutex}; 0289 if (completeHandler && guardOk()) { 0290 if (completeHandler) { 0291 completeHandler(); 0292 } 0293 } 0294 } 0295 0296 void clear() 0297 { 0298 QMutexLocker locker{&mMutex}; 0299 if (clearHandler && guardOk()) { 0300 if (clearHandler) { 0301 clearHandler(); 0302 } 0303 } 0304 } 0305 0306 void setFetcher(const std::function<void()> &fetcher) 0307 { 0308 mFetcher = fetcher; 0309 } 0310 0311 virtual void fetch() 0312 { 0313 if (mFetcher) { 0314 mFetcher(); 0315 } 0316 } 0317 0318 private: 0319 friend class ResultProvider<DomainType>; 0320 0321 std::function<void(const DomainType &)> addHandler; 0322 std::function<void(const DomainType &)> modifyHandler; 0323 std::function<void(const DomainType &)> removeHandler; 0324 std::function<void(bool)> initialResultSetCompleteHandler; 0325 std::function<void(void)> completeHandler; 0326 std::function<void(void)> clearHandler; 0327 0328 std::function<void()> mFetcher; 0329 /* 0330 * This mutex is here to protect the emitter from getting destroyed while the producer-thread (ResultProvider) is calling into it, 0331 * and vice-verca, to protect the producer thread from calling into a destroyed emitter. 0332 * 0333 * This is necessary because Emitter and ResultProvider have lifetimes managed by two different threads. 0334 * The emitter lives in the application thread, and the resultprovider in the query thread. 0335 */ 0336 QMutex mMutex; 0337 bool mDone = false; 0338 }; 0339 0340 template <class DomainType> 0341 class AggregatingResultEmitter : public ResultEmitter<DomainType> 0342 { 0343 public: 0344 typedef QSharedPointer<AggregatingResultEmitter<DomainType>> Ptr; 0345 0346 ~AggregatingResultEmitter() Q_DECL_OVERRIDE 0347 { 0348 } 0349 0350 virtual void waitForMethodExecutionEnd() Q_DECL_OVERRIDE 0351 { 0352 for (const auto &emitter : mEmitter) { 0353 emitter->waitForMethodExecutionEnd(); 0354 } 0355 ResultEmitter<DomainType>::waitForMethodExecutionEnd(); 0356 } 0357 0358 void addEmitter(const typename ResultEmitter<DomainType>::Ptr &emitter) 0359 { 0360 Q_ASSERT(emitter); 0361 emitter->onAdded([this](const DomainType &value) { this->add(value); }); 0362 emitter->onModified([this](const DomainType &value) { this->modify(value); }); 0363 emitter->onRemoved([this](const DomainType &value) { this->remove(value); }); 0364 auto ptr = emitter.data(); 0365 emitter->onInitialResultSetComplete([this, ptr](bool replayedAll) { 0366 if (replayedAll) { 0367 mAllResultsReplayed.remove(ptr); 0368 } 0369 mInitialResultSetInProgress.remove(ptr); 0370 callInitialResultCompleteIfDone(); 0371 }); 0372 emitter->onComplete([this]() { this->complete(); }); 0373 emitter->onClear([this]() { this->clear(); }); 0374 mEmitter << emitter; 0375 } 0376 0377 void callInitialResultCompleteIfDone() 0378 { 0379 // Normally a parent is only in a single resource, except the toplevel (invalid) parent 0380 if (mInitialResultSetInProgress.isEmpty() && mAllResultsFetched && !mResultEmitted) { 0381 mResultEmitted = true; 0382 this->initialResultSetComplete(mAllResultsReplayed.isEmpty()); 0383 } 0384 } 0385 0386 void fetch() Q_DECL_OVERRIDE 0387 { 0388 if (mEmitter.isEmpty()) { 0389 this->initialResultSetComplete(true); 0390 } else { 0391 mResultEmitted = false; 0392 mAllResultsFetched = false; 0393 mInitialResultSetInProgress.clear(); 0394 mAllResultsReplayed.clear(); 0395 for (const auto &emitter : mEmitter) { 0396 mInitialResultSetInProgress.insert(emitter.data()); 0397 mAllResultsReplayed.insert(emitter.data()); 0398 emitter->fetch(); 0399 } 0400 mAllResultsFetched = true; 0401 callInitialResultCompleteIfDone(); 0402 } 0403 } 0404 0405 private: 0406 QList<typename ResultEmitter<DomainType>::Ptr> mEmitter; 0407 QSet<ResultEmitter<DomainType>*> mInitialResultSetInProgress; 0408 QSet<ResultEmitter<DomainType>*> mAllResultsReplayed; 0409 bool mAllResultsFetched; 0410 bool mResultEmitted; 0411 }; 0412 }