File indexing completed on 2024-05-12 05:26:02
0001 /* 0002 Copyright (c) 2015 Christian Mollekopf <mollekopf@kolabsys.com> 0003 0004 This library is free software; you can redistribute it and/or modify it 0005 under the terms of the GNU Library General Public License as published by 0006 the Free Software Foundation; either version 2 of the License, or (at your 0007 option) any later version. 0008 0009 This library is distributed in the hope that it will be useful, but WITHOUT 0010 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 0011 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public 0012 License for more details. 0013 0014 You should have received a copy of the GNU Library General Public License 0015 along with this library; see the file COPYING.LIB. If not, write to the 0016 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 0017 02110-1301, USA. 0018 */ 0019 #include "queryrunner.h" 0020 0021 #include <limits> 0022 #include <QTime> 0023 #include <QPointer> 0024 #include <thread> 0025 #include <chrono> 0026 0027 #include "commands.h" 0028 #include "asyncutils.h" 0029 #include "datastorequery.h" 0030 0031 using namespace Sink; 0032 using namespace Sink::Storage; 0033 0034 struct ReplayResult { 0035 qint64 newRevision; 0036 qint64 replayedEntities; 0037 bool replayedAll; 0038 DataStoreQuery::State::Ptr queryState; 0039 }; 0040 0041 /* 0042 * This class wraps the actual query implementation. 0043 * 0044 * This is a worker object that can be moved to a thread to execute the query. 0045 * The only interaction point is the ResultProvider, which handles the threadsafe reporting of the result. 0046 */ 0047 template <typename DomainType> 0048 class QueryWorker : public QObject 0049 { 0050 public: 0051 QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx); 0052 ~QueryWorker() override; 0053 0054 ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state); 0055 ReplayResult executeInitialQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state); 0056 0057 private: 0058 void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result); 0059 0060 QueryRunnerBase::ResultTransformation mResultTransformation; 0061 ResourceContext mResourceContext; 0062 Sink::Log::Context mLogCtx; 0063 }; 0064 0065 template <class DomainType> 0066 QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceContext &context, const QByteArray &bufferType, const Sink::Log::Context &logCtx) 0067 : QueryRunnerBase(), mResourceContext(context), mResourceAccess(mResourceContext.resourceAccess()), mResultProvider(new ResultProvider<typename DomainType::Ptr>), mBatchSize(query.limit()), mLogCtx(logCtx.subContext("queryrunner")) 0068 { 0069 SinkTraceCtx(mLogCtx) << "Starting query. Is live:" << query.liveQuery() << " Limit: " << query.limit(); 0070 if (query.limit() && query.sortProperty().isEmpty()) { 0071 SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; 0072 } 0073 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. 0074 mResultProvider->setFetcher([this, query, bufferType] { fetch(query, bufferType); }); 0075 0076 // In case of a live query we keep the runner for as long alive as the result provider exists 0077 if (query.liveQuery()) { 0078 Q_ASSERT(!query.synchronousQuery()); 0079 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting 0080 setQuery([=]() { return incrementalFetch(query, bufferType); }); 0081 // Ensure the connection is open, if it wasn't already opened 0082 mResourceAccess->open(); 0083 QObject::connect(mResourceAccess.data(), &Sink::ResourceAccess::revisionChanged, this, &QueryRunner::revisionChanged); 0084 // open is not synchronous, so from the time when the initial query is started until we have started and connected to the resource, it's possible to miss updates. We therefore unconditionally try to fetch new entities once we are connected. 0085 QObject::connect(mResourceAccess.data(), &Sink::ResourceAccess::ready, this, [this] (bool ready) { 0086 if (ready) { 0087 revisionChanged(); 0088 } 0089 }); 0090 } 0091 mResultProvider->onDone([this]() { 0092 delete this; 0093 }); 0094 } 0095 0096 template <class DomainType> 0097 QueryRunner<DomainType>::~QueryRunner() 0098 { 0099 SinkTraceCtx(mLogCtx) << "Stopped query"; 0100 } 0101 0102 0103 template <class DomainType> 0104 void QueryRunner<DomainType>::delayNextQuery() 0105 { 0106 mDelayNextQuery = true; 0107 } 0108 0109 //This function triggers the initial fetch, and then subsequent calls will simply fetch more data of mBatchSize. 0110 template <class DomainType> 0111 void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &bufferType) 0112 { 0113 SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize; 0114 if (mQueryInProgress) { 0115 SinkTraceCtx(mLogCtx) << "Query is already in progress, postponing: " << mBatchSize; 0116 mRequestFetchMore = true; 0117 return; 0118 } 0119 mQueryInProgress = true; 0120 0121 // Immediately protect from cleanup for live queries (there's not need to start the resource otherwise). 0122 // Once the initial query is done we can relax the protection up until the revision we have read. 0123 // Ideally we would only start the query once we have succesfully protected the revision we're about to read. 0124 if (query.liveQuery()) { 0125 mResourceAccess->sendRevisionReplayedCommand(0).exec(); 0126 } 0127 0128 bool addDelay = mDelayNextQuery; 0129 mDelayNextQuery = false; 0130 const bool runAsync = !query.synchronousQuery(); 0131 //The lambda will be executed in a separate thread, so copy all arguments 0132 async::run<ReplayResult>([query, 0133 bufferType, 0134 resultProvider = mResultProvider, 0135 resourceContext = mResourceContext, 0136 logCtx = mLogCtx, 0137 state = mQueryState, 0138 resultTransformation = mResultTransformation, 0139 batchSize = mBatchSize, 0140 addDelay]() { 0141 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); 0142 const auto result = worker.executeInitialQuery(query, *resultProvider, batchSize, state); 0143 0144 //For testing only 0145 if (addDelay) { 0146 std::this_thread::sleep_for(std::chrono::seconds(1)); 0147 } 0148 0149 return result; 0150 }, runAsync) 0151 .then([this, query, bufferType, guardPtr = QPointer<QObject>(&guard)](const ReplayResult &result) { 0152 if (!guardPtr) { 0153 //Not an error, the query can vanish at any time. 0154 return; 0155 } 0156 mInitialQueryComplete = true; 0157 mQueryInProgress = false; 0158 mQueryState = result.queryState; 0159 if (query.liveQuery()) { 0160 // Relax the lower bound protection to the latest read revision. 0161 mResourceAccess->sendRevisionReplayedCommand(result.newRevision).exec(); 0162 } 0163 //Initial queries do not fetch updates, so avoid updating the revision when fetching more content. 0164 //Otherwise we end up breaking incremental updates. 0165 if (!mResultProvider->revision()) { 0166 mResultProvider->setRevision(result.newRevision); 0167 } 0168 mResultProvider->initialResultSetComplete(result.replayedAll); 0169 if (mRequestFetchMore) { 0170 mRequestFetchMore = false; 0171 //This code exists for incemental fetches, so we don't skip loading another set. 0172 fetch(query, bufferType); 0173 return; 0174 } 0175 if (mRevisionChangedMeanwhile) { 0176 incrementalFetch(query, bufferType).exec(); 0177 } 0178 }) 0179 .exec(); 0180 } 0181 0182 template <class DomainType> 0183 KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &query, const QByteArray &bufferType) 0184 { 0185 if (!mInitialQueryComplete && !mQueryInProgress) { 0186 //We rely on this codepath in the case of newly added resources to trigger the initial fetch. 0187 fetch(query, bufferType); 0188 return KAsync::null(); 0189 } 0190 if (mQueryInProgress) { 0191 //If a query is already in progress we just remember to fetch again once the current query is done. 0192 mRevisionChangedMeanwhile = true; 0193 return KAsync::null(); 0194 } 0195 mRevisionChangedMeanwhile = false; 0196 Q_ASSERT(!mQueryInProgress); 0197 bool addDelay = mDelayNextQuery; 0198 mDelayNextQuery = false; 0199 return KAsync::start([&] { 0200 mQueryInProgress = true; 0201 }) 0202 //The lambda will be executed in a separate thread, so copy all arguments 0203 .then(async::run<ReplayResult>([query, 0204 bufferType, 0205 resultProvider = mResultProvider, 0206 resourceContext = mResourceContext, 0207 logCtx = mLogCtx, 0208 state = mQueryState, 0209 resultTransformation = mResultTransformation, 0210 addDelay]() { 0211 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); 0212 const auto result = worker.executeIncrementalQuery(query, *resultProvider, state); 0213 ////For testing only 0214 if (addDelay) { 0215 SinkWarning() << "Sleeping in incremental query"; 0216 std::this_thread::sleep_for(std::chrono::seconds(1)); 0217 } 0218 0219 return result; 0220 })) 0221 .then([this, query, bufferType, guardPtr = QPointer<QObject>(&guard)](const ReplayResult &newRevisionAndReplayedEntities) { 0222 if (!guardPtr) { 0223 //Not an error, the query can vanish at any time. 0224 return KAsync::null(); 0225 } 0226 mQueryInProgress = false; 0227 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision).exec(); 0228 mResultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); 0229 if (mRevisionChangedMeanwhile) { 0230 return incrementalFetch(query, bufferType); 0231 } 0232 return KAsync::null(); 0233 }); 0234 } 0235 0236 template <class DomainType> 0237 void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation) 0238 { 0239 mResultTransformation = transformation; 0240 } 0241 0242 template <class DomainType> 0243 typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainType>::emitter() 0244 { 0245 return mResultProvider->emitter(); 0246 } 0247 0248 template <class DomainType> 0249 QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const Sink::ResourceContext &resourceContext, 0250 const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx) 0251 : QObject(), mResultTransformation(transformation), mResourceContext(resourceContext), mLogCtx(logCtx.subContext("worker")) 0252 { 0253 SinkTraceCtx(mLogCtx) << "Starting query worker"; 0254 } 0255 0256 template <class DomainType> 0257 QueryWorker<DomainType>::~QueryWorker() 0258 { 0259 SinkTraceCtx(mLogCtx) << "Stopped query worker"; 0260 } 0261 0262 static QString operationName(Sink::Operation operation) 0263 { 0264 switch (operation) { 0265 case Sink::Operation_Creation: 0266 return "Creation"; 0267 case Sink::Operation_Modification: 0268 return "Modification"; 0269 case Sink::Operation_Removal: 0270 return "Removal"; 0271 } 0272 return "Unknown Operation"; 0273 } 0274 0275 template <class DomainType> 0276 void QueryWorker<DomainType>::resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result) 0277 { 0278 auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(result.entity, query.requestedProperties).template staticCast<DomainType>(); 0279 for (auto it = result.aggregateValues.constBegin(); it != result.aggregateValues.constEnd(); it++) { 0280 valueCopy->setProperty(it.key(), it.value()); 0281 } 0282 valueCopy->aggregatedIds() = [&] { 0283 QVector<QByteArray> aggregateIdsBA; 0284 aggregateIdsBA.reserve(result.aggregateIds.size()); 0285 for (const auto &id : result.aggregateIds) { 0286 aggregateIdsBA << id.toDisplayByteArray(); 0287 } 0288 return aggregateIdsBA; 0289 }(); 0290 if (mResultTransformation) { 0291 mResultTransformation(*valueCopy); 0292 } 0293 SinkTraceCtx(mLogCtx) << "Replaying: " << operationName(result.operation) << "\n" <<*valueCopy; 0294 switch (result.operation) { 0295 case Sink::Operation_Creation: 0296 //SinkTraceCtx(mLogCtx) << "Got creation: " << valueCopy->identifier(); 0297 resultProvider.add(valueCopy); 0298 break; 0299 case Sink::Operation_Modification: 0300 //SinkTraceCtx(mLogCtx) << "Got modification: " << valueCopy->identifier(); 0301 resultProvider.modify(valueCopy); 0302 break; 0303 case Sink::Operation_Removal: 0304 //SinkTraceCtx(mLogCtx) << "Got removal: " << valueCopy->identifier(); 0305 resultProvider.remove(valueCopy); 0306 break; 0307 } 0308 } 0309 0310 template <class DomainType> 0311 ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state) 0312 { 0313 QTime time; 0314 time.start(); 0315 0316 const qint64 baseRevision = resultProvider.revision() + 1; 0317 0318 auto entityStore = EntityStore{mResourceContext, mLogCtx}; 0319 const qint64 topRevision = entityStore.maxRevision(); 0320 SinkTraceCtx(mLogCtx) << "Running query update from revision: " << baseRevision << " to revision " << topRevision; 0321 if (entityStore.lastCleanRevision() >= baseRevision) { 0322 //This is a situation we should never end up in. In case of removals some revisions may be gone entirely, which will result in failures later on. 0323 SinkErrorCtx(mLogCtx) << "Revision from which we want to replay is no longer available" << entityStore.lastCleanRevision(); 0324 Q_ASSERT(false); 0325 return {0, 0, false, DataStoreQuery::State::Ptr{}}; 0326 } 0327 if (!state) { 0328 SinkWarningCtx(mLogCtx) << "No previous query state."; 0329 return {0, 0, false, DataStoreQuery::State::Ptr{}}; 0330 } 0331 auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, true}; 0332 auto resultSet = preparedQuery.update(baseRevision); 0333 SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 0334 auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { 0335 resultProviderCallback(query, resultProvider, result); 0336 }); 0337 preparedQuery.updateComplete(); 0338 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results until revision: " << topRevision << "\n" 0339 << (replayResult.replayedAll ? "Replayed all available results.\n" : "") 0340 << "Incremental query took: " << Log::TraceTime(time.elapsed()); 0341 return {topRevision, replayResult.replayedEntities, false, preparedQuery.getState()}; 0342 } 0343 0344 template <class DomainType> 0345 ReplayResult QueryWorker<DomainType>::executeInitialQuery( 0346 const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state) 0347 { 0348 QTime time; 0349 time.start(); 0350 0351 auto entityStore = EntityStore{mResourceContext, mLogCtx}; 0352 const qint64 topRevision = entityStore.maxRevision(); 0353 SinkTraceCtx(mLogCtx) << "Running query from revision: " << topRevision; 0354 auto preparedQuery = [&] { 0355 if (state) { 0356 return DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, false}; 0357 } else { 0358 return DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore}; 0359 } 0360 }(); 0361 auto resultSet = preparedQuery.execute(); 0362 0363 SinkTraceCtx(mLogCtx) << "Filtered set retrieved." << Log::TraceTime(time.elapsed()); 0364 auto replayResult = resultSet.replaySet(0, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { 0365 resultProviderCallback(query, resultProvider, result); 0366 }); 0367 0368 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" 0369 << (replayResult.replayedAll ? "Replayed all available results.\n" : "") 0370 << "Initial query took: " << Log::TraceTime(time.elapsed()); 0371 0372 return {topRevision, replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()}; 0373 } 0374 0375 #define REGISTER_TYPE(T) \ 0376 template class QueryRunner<T>; \ 0377 template class QueryWorker<T>; \ 0378 0379 SINK_REGISTER_TYPES()