File indexing completed on 2024-05-12 05:26:02

0001 /*
0002     Copyright (c) 2015 Christian Mollekopf <mollekopf@kolabsys.com>
0003 
0004     This library is free software; you can redistribute it and/or modify it
0005     under the terms of the GNU Library General Public License as published by
0006     the Free Software Foundation; either version 2 of the License, or (at your
0007     option) any later version.
0008 
0009     This library is distributed in the hope that it will be useful, but WITHOUT
0010     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
0011     FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
0012     License for more details.
0013 
0014     You should have received a copy of the GNU Library General Public License
0015     along with this library; see the file COPYING.LIB.  If not, write to the
0016     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
0017     02110-1301, USA.
0018 */
0019 #include "queryrunner.h"
0020 
0021 #include <limits>
0022 #include <QTime>
0023 #include <QPointer>
0024 #include <thread>
0025 #include <chrono>
0026 
0027 #include "commands.h"
0028 #include "asyncutils.h"
0029 #include "datastorequery.h"
0030 
0031 using namespace Sink;
0032 using namespace Sink::Storage;
0033 
0034 struct ReplayResult {
0035     qint64 newRevision;
0036     qint64 replayedEntities;
0037     bool replayedAll;
0038     DataStoreQuery::State::Ptr queryState;
0039 };
0040 
0041 /*
0042  * This class wraps the actual query implementation.
0043  *
0044  * This is a worker object that can be moved to a thread to execute the query.
0045  * The only interaction point is the ResultProvider, which handles the threadsafe reporting of the result.
0046  */
0047 template <typename DomainType>
0048 class QueryWorker : public QObject
0049 {
0050 public:
0051     QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx);
0052     ~QueryWorker() override;
0053 
0054     ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state);
0055     ReplayResult executeInitialQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state);
0056 
0057 private:
0058     void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result);
0059 
0060     QueryRunnerBase::ResultTransformation mResultTransformation;
0061     ResourceContext mResourceContext;
0062     Sink::Log::Context mLogCtx;
0063 };
0064 
0065 template <class DomainType>
0066 QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceContext &context, const QByteArray &bufferType, const Sink::Log::Context &logCtx)
0067     : QueryRunnerBase(), mResourceContext(context), mResourceAccess(mResourceContext.resourceAccess()), mResultProvider(new ResultProvider<typename DomainType::Ptr>), mBatchSize(query.limit()), mLogCtx(logCtx.subContext("queryrunner"))
0068 {
0069     SinkTraceCtx(mLogCtx) << "Starting query. Is live:" << query.liveQuery() << " Limit: " << query.limit();
0070     if (query.limit() && query.sortProperty().isEmpty()) {
0071         SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get.";
0072     }
0073     // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load.
0074     mResultProvider->setFetcher([this, query, bufferType] { fetch(query, bufferType); });
0075 
0076     // In case of a live query we keep the runner for as long alive as the result provider exists
0077     if (query.liveQuery()) {
0078         Q_ASSERT(!query.synchronousQuery());
0079         // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
0080         setQuery([=]() { return incrementalFetch(query, bufferType); });
0081         // Ensure the connection is open, if it wasn't already opened
0082         mResourceAccess->open();
0083         QObject::connect(mResourceAccess.data(), &Sink::ResourceAccess::revisionChanged, this, &QueryRunner::revisionChanged);
0084         // open is not synchronous, so from the time when the initial query is started until we have started and connected to the resource, it's possible to miss updates. We therefore unconditionally try to fetch new entities once we are connected.
0085         QObject::connect(mResourceAccess.data(), &Sink::ResourceAccess::ready, this, [this] (bool ready) {
0086             if (ready) {
0087                 revisionChanged();
0088             }
0089         });
0090     }
0091     mResultProvider->onDone([this]() {
0092         delete this;
0093     });
0094 }
0095 
0096 template <class DomainType>
0097 QueryRunner<DomainType>::~QueryRunner()
0098 {
0099     SinkTraceCtx(mLogCtx) << "Stopped query";
0100 }
0101 
0102 
0103 template <class DomainType>
0104 void QueryRunner<DomainType>::delayNextQuery()
0105 {
0106     mDelayNextQuery = true;
0107 }
0108 
0109 //This function triggers the initial fetch, and then subsequent calls will simply fetch more data of mBatchSize.
0110 template <class DomainType>
0111 void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &bufferType)
0112 {
0113     SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize;
0114     if (mQueryInProgress) {
0115         SinkTraceCtx(mLogCtx) << "Query is already in progress, postponing: " << mBatchSize;
0116         mRequestFetchMore = true;
0117         return;
0118     }
0119     mQueryInProgress = true;
0120 
0121     // Immediately protect from cleanup for live queries (there's not need to start the resource otherwise).
0122     // Once the initial query is done we can relax the protection up until the revision we have read.
0123     // Ideally we would only start the query once we have succesfully protected the revision we're about to read.
0124     if (query.liveQuery()) {
0125         mResourceAccess->sendRevisionReplayedCommand(0).exec();
0126     }
0127 
0128     bool addDelay = mDelayNextQuery;
0129     mDelayNextQuery = false;
0130     const bool runAsync = !query.synchronousQuery();
0131     //The lambda will be executed in a separate thread, so copy all arguments
0132     async::run<ReplayResult>([query,
0133                               bufferType,
0134                               resultProvider = mResultProvider,
0135                               resourceContext = mResourceContext,
0136                               logCtx = mLogCtx,
0137                               state = mQueryState,
0138                               resultTransformation = mResultTransformation,
0139                               batchSize = mBatchSize,
0140                               addDelay]() {
0141         QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
0142         const auto result =  worker.executeInitialQuery(query, *resultProvider, batchSize, state);
0143 
0144         //For testing only
0145         if (addDelay) {
0146             std::this_thread::sleep_for(std::chrono::seconds(1));
0147         }
0148 
0149         return result;
0150     }, runAsync)
0151         .then([this, query, bufferType, guardPtr = QPointer<QObject>(&guard)](const ReplayResult &result) {
0152             if (!guardPtr) {
0153                 //Not an error, the query can vanish at any time.
0154                 return;
0155             }
0156             mInitialQueryComplete = true;
0157             mQueryInProgress = false;
0158             mQueryState = result.queryState;
0159             if (query.liveQuery()) {
0160                 // Relax the lower bound protection to the latest read revision.
0161                 mResourceAccess->sendRevisionReplayedCommand(result.newRevision).exec();
0162             }
0163             //Initial queries do not fetch updates, so avoid updating the revision when fetching more content.
0164             //Otherwise we end up breaking incremental updates.
0165             if (!mResultProvider->revision()) {
0166                 mResultProvider->setRevision(result.newRevision);
0167             }
0168             mResultProvider->initialResultSetComplete(result.replayedAll);
0169             if (mRequestFetchMore) {
0170                 mRequestFetchMore = false;
0171                 //This code exists for incemental fetches, so we don't skip loading another set.
0172                 fetch(query, bufferType);
0173                 return;
0174             }
0175             if (mRevisionChangedMeanwhile) {
0176                 incrementalFetch(query, bufferType).exec();
0177             }
0178         })
0179         .exec();
0180 }
0181 
0182 template <class DomainType>
0183 KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &query, const QByteArray &bufferType)
0184 {
0185     if (!mInitialQueryComplete && !mQueryInProgress) {
0186         //We rely on this codepath in the case of newly added resources to trigger the initial fetch.
0187         fetch(query, bufferType);
0188         return KAsync::null();
0189     }
0190     if (mQueryInProgress) {
0191         //If a query is already in progress we just remember to fetch again once the current query is done.
0192         mRevisionChangedMeanwhile = true;
0193         return KAsync::null();
0194     }
0195     mRevisionChangedMeanwhile = false;
0196     Q_ASSERT(!mQueryInProgress);
0197     bool addDelay = mDelayNextQuery;
0198     mDelayNextQuery = false;
0199     return KAsync::start([&] {
0200             mQueryInProgress = true;
0201         })
0202         //The lambda will be executed in a separate thread, so copy all arguments
0203         .then(async::run<ReplayResult>([query,
0204                                         bufferType,
0205                                         resultProvider = mResultProvider,
0206                                         resourceContext = mResourceContext,
0207                                         logCtx = mLogCtx,
0208                                         state = mQueryState,
0209                                         resultTransformation = mResultTransformation,
0210                                         addDelay]() {
0211                 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
0212                 const auto result = worker.executeIncrementalQuery(query, *resultProvider, state);
0213                 ////For testing only
0214                 if (addDelay) {
0215                     SinkWarning() << "Sleeping in incremental query";
0216                     std::this_thread::sleep_for(std::chrono::seconds(1));
0217                 }
0218 
0219                 return result;
0220             }))
0221         .then([this, query, bufferType, guardPtr = QPointer<QObject>(&guard)](const ReplayResult &newRevisionAndReplayedEntities) {
0222             if (!guardPtr) {
0223                 //Not an error, the query can vanish at any time.
0224                 return KAsync::null();
0225             }
0226             mQueryInProgress = false;
0227             mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision).exec();
0228             mResultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
0229             if (mRevisionChangedMeanwhile) {
0230                 return incrementalFetch(query, bufferType);
0231             }
0232             return KAsync::null();
0233         });
0234 }
0235 
0236 template <class DomainType>
0237 void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation)
0238 {
0239     mResultTransformation = transformation;
0240 }
0241 
0242 template <class DomainType>
0243 typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainType>::emitter()
0244 {
0245     return mResultProvider->emitter();
0246 }
0247 
0248 template <class DomainType>
0249 QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const Sink::ResourceContext &resourceContext,
0250     const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx)
0251     : QObject(), mResultTransformation(transformation), mResourceContext(resourceContext), mLogCtx(logCtx.subContext("worker"))
0252 {
0253     SinkTraceCtx(mLogCtx) << "Starting query worker";
0254 }
0255 
0256 template <class DomainType>
0257 QueryWorker<DomainType>::~QueryWorker()
0258 {
0259     SinkTraceCtx(mLogCtx) << "Stopped query worker";
0260 }
0261 
0262 static QString operationName(Sink::Operation operation)
0263 {
0264     switch (operation) {
0265         case Sink::Operation_Creation:
0266             return "Creation";
0267         case Sink::Operation_Modification:
0268             return "Modification";
0269         case Sink::Operation_Removal:
0270             return "Removal";
0271     }
0272     return "Unknown Operation";
0273 }
0274 
0275 template <class DomainType>
0276 void QueryWorker<DomainType>::resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result)
0277 {
0278     auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(result.entity, query.requestedProperties).template staticCast<DomainType>();
0279     for (auto it = result.aggregateValues.constBegin(); it != result.aggregateValues.constEnd(); it++) {
0280         valueCopy->setProperty(it.key(), it.value());
0281     }
0282     valueCopy->aggregatedIds() = [&] {
0283         QVector<QByteArray> aggregateIdsBA;
0284         aggregateIdsBA.reserve(result.aggregateIds.size());
0285         for (const auto &id : result.aggregateIds) {
0286             aggregateIdsBA << id.toDisplayByteArray();
0287         }
0288         return aggregateIdsBA;
0289     }();
0290     if (mResultTransformation) {
0291         mResultTransformation(*valueCopy);
0292     }
0293     SinkTraceCtx(mLogCtx) << "Replaying: " << operationName(result.operation) << "\n" <<*valueCopy;
0294     switch (result.operation) {
0295         case Sink::Operation_Creation:
0296             //SinkTraceCtx(mLogCtx) << "Got creation: " << valueCopy->identifier();
0297             resultProvider.add(valueCopy);
0298             break;
0299         case Sink::Operation_Modification:
0300             //SinkTraceCtx(mLogCtx) << "Got modification: " << valueCopy->identifier();
0301             resultProvider.modify(valueCopy);
0302             break;
0303         case Sink::Operation_Removal:
0304             //SinkTraceCtx(mLogCtx) << "Got removal: " << valueCopy->identifier();
0305             resultProvider.remove(valueCopy);
0306             break;
0307     }
0308 }
0309 
0310 template <class DomainType>
0311 ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state)
0312 {
0313     QTime time;
0314     time.start();
0315 
0316     const qint64 baseRevision = resultProvider.revision() + 1;
0317 
0318     auto entityStore = EntityStore{mResourceContext, mLogCtx};
0319     const qint64 topRevision = entityStore.maxRevision();
0320     SinkTraceCtx(mLogCtx) << "Running query update from revision: " << baseRevision << " to revision " << topRevision;
0321     if (entityStore.lastCleanRevision() >= baseRevision) {
0322         //This is a situation we should never end up in. In case of removals some revisions may be gone entirely, which will result in failures later on.
0323         SinkErrorCtx(mLogCtx) << "Revision from which we want to replay is no longer available" << entityStore.lastCleanRevision();
0324         Q_ASSERT(false);
0325         return {0, 0, false, DataStoreQuery::State::Ptr{}};
0326     }
0327     if (!state) {
0328         SinkWarningCtx(mLogCtx) << "No previous query state.";
0329         return {0, 0, false, DataStoreQuery::State::Ptr{}};
0330     }
0331     auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, true};
0332     auto resultSet = preparedQuery.update(baseRevision);
0333     SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
0334     auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) {
0335         resultProviderCallback(query, resultProvider, result);
0336     });
0337     preparedQuery.updateComplete();
0338     SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results until revision: " << topRevision << "\n"
0339         << (replayResult.replayedAll ? "Replayed all available results.\n" : "")
0340         << "Incremental query took: " << Log::TraceTime(time.elapsed());
0341     return {topRevision, replayResult.replayedEntities, false, preparedQuery.getState()};
0342 }
0343 
0344 template <class DomainType>
0345 ReplayResult QueryWorker<DomainType>::executeInitialQuery(
0346     const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state)
0347 {
0348     QTime time;
0349     time.start();
0350 
0351     auto entityStore = EntityStore{mResourceContext, mLogCtx};
0352     const qint64 topRevision = entityStore.maxRevision();
0353     SinkTraceCtx(mLogCtx) << "Running query from revision: " << topRevision;
0354     auto preparedQuery = [&] {
0355         if (state) {
0356             return DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, false};
0357         } else {
0358             return DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore};
0359         }
0360     }();
0361     auto resultSet = preparedQuery.execute();
0362 
0363     SinkTraceCtx(mLogCtx) << "Filtered set retrieved." << Log::TraceTime(time.elapsed());
0364     auto replayResult = resultSet.replaySet(0, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) {
0365         resultProviderCallback(query, resultProvider, result);
0366     });
0367 
0368     SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n"
0369         << (replayResult.replayedAll ? "Replayed all available results.\n" : "")
0370         << "Initial query took: " << Log::TraceTime(time.elapsed());
0371 
0372     return {topRevision, replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()};
0373 }
0374 
0375 #define REGISTER_TYPE(T) \
0376     template class QueryRunner<T>; \
0377     template class QueryWorker<T>; \
0378 
0379 SINK_REGISTER_TYPES()