File indexing completed on 2024-05-12 05:25:58
0001 /* 0002 * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm> 0003 * 0004 * This program is free software; you can redistribute it and/or modify 0005 * it under the terms of the GNU General Public License as published by 0006 * the Free Software Foundation; either version 2 of the License, or 0007 * (at your option) any later version. 0008 * 0009 * This program is distributed in the hope that it will be useful, 0010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 0011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 0012 * GNU General Public License for more details. 0013 * 0014 * You should have received a copy of the GNU General Public License 0015 * along with this program; if not, write to the 0016 * Free Software Foundation, Inc., 0017 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 0018 */ 0019 #include "datastorequery.h" 0020 0021 #include <QElapsedTimer> 0022 0023 #include "log.h" 0024 #include "applicationdomaintype.h" 0025 0026 using namespace Sink; 0027 using namespace Sink::Storage; 0028 0029 static QByteArray operationName(const Sink::Operation op) 0030 { 0031 switch(op) { 0032 case Sink::Operation_Creation: 0033 return "Creation"; 0034 case Sink::Operation_Modification: 0035 return "Modification"; 0036 case Sink::Operation_Removal: 0037 return "Removal"; 0038 } 0039 return ""; 0040 } 0041 0042 static bool compare(const QVariant &left, const QVariant &right, QueryBase::Reduce::Selector::Comparator comparator) { 0043 if (comparator == QueryBase::Reduce::Selector::Max) { 0044 return left > right; 0045 } 0046 if (comparator == QueryBase::Reduce::Selector::Min) { 0047 return left < right; 0048 } 0049 return false; 0050 } 0051 0052 0053 class Source : public FilterBase { 0054 public: 0055 typedef QSharedPointer<Source> Ptr; 0056 0057 QVector<Identifier> mIds; 0058 QVector<Identifier>::ConstIterator mIt; 0059 QVector<Identifier> mIncrementalIds; 0060 QVector<Identifier>::ConstIterator mIncrementalIt{}; 0061 bool mHaveIncrementalChanges{false}; 0062 bool mIdsAreFinal{false}; 0063 0064 Source (const QVector<Identifier> &ids, DataStoreQuery *store, bool idsAreFinal = false) 0065 : FilterBase(store), 0066 mIds(ids), 0067 mIdsAreFinal(idsAreFinal) 0068 { 0069 mIt = mIds.constBegin(); 0070 } 0071 0072 ~Source() override = default; 0073 0074 void skip() override 0075 { 0076 if (mIt != mIds.constEnd()) { 0077 mIt++; 0078 } 0079 }; 0080 0081 void add(const QVector<Key> &keys) 0082 { 0083 mIncrementalIds.clear(); 0084 mIncrementalIds.reserve(keys.size()); 0085 for (const auto &key : keys) { 0086 //Pre-filter by uid if a uid-filter is set. 0087 if (!mIdsAreFinal || mIds.contains(key.identifier())) { 0088 mIncrementalIds.append(key.identifier()); 0089 } 0090 } 0091 mIncrementalIt = mIncrementalIds.constBegin(); 0092 mHaveIncrementalChanges = true; 0093 } 0094 0095 bool next(const std::function<void(const ResultSet::Result &result)> &callback) override 0096 { 0097 if (mHaveIncrementalChanges) { 0098 if (mIncrementalIt == mIncrementalIds.constEnd()) { 0099 return false; 0100 } 0101 readEntity(*mIncrementalIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { 0102 SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); 0103 callback({entity, operation}); 0104 }); 0105 mIncrementalIt++; 0106 0107 return mIncrementalIt != mIncrementalIds.constEnd(); 0108 } 0109 if (mIt == mIds.constEnd()) { 0110 return false; 0111 } 0112 readEntity(*mIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { 0113 SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); 0114 callback({entity, operation}); 0115 }); 0116 mIt++; 0117 return mIt != mIds.constEnd(); 0118 } 0119 }; 0120 0121 class Collector : public FilterBase { 0122 public: 0123 typedef QSharedPointer<Collector> Ptr; 0124 0125 Collector(FilterBase::Ptr source, DataStoreQuery *store) 0126 : FilterBase(source, store) 0127 { 0128 0129 } 0130 ~Collector() override = default; 0131 0132 bool next(const std::function<void(const ResultSet::Result &result)> &callback) override 0133 { 0134 return mSource->next(callback); 0135 } 0136 }; 0137 0138 class Filter : public FilterBase { 0139 public: 0140 typedef QSharedPointer<Filter> Ptr; 0141 0142 QHash<QByteArrayList, Sink::QueryBase::Comparator> propertyFilter; 0143 std::function<bool(const ApplicationDomain::ApplicationDomainType &)> filterFunction; 0144 0145 Filter(FilterBase::Ptr source, DataStoreQuery *store) 0146 : FilterBase(source, store) 0147 { 0148 0149 } 0150 0151 ~Filter() override{} 0152 0153 bool next(const std::function<void(const ResultSet::Result &result)> &callback) override { 0154 bool foundValue = false; 0155 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { 0156 SinkTraceCtx(mDatastore->mLogCtx) << "Filter: " << result.entity.identifier() << operationName(result.operation); 0157 0158 //Always accept removals. They can't match the filter since the data is gone. 0159 if (result.operation == Sink::Operation_Removal) { 0160 SinkTraceCtx(mDatastore->mLogCtx) << "Removal: " << result.entity.identifier() << operationName(result.operation); 0161 callback(result); 0162 foundValue = true; 0163 } else if (matchesFilter(result.entity)) { 0164 SinkTraceCtx(mDatastore->mLogCtx) << "Accepted: " << result.entity.identifier() << operationName(result.operation); 0165 callback(result); 0166 foundValue = true; 0167 //TODO if something did not match the filter so far but does now, turn into an add operation. 0168 } else { 0169 SinkTraceCtx(mDatastore->mLogCtx) << "Rejected: " << result.entity.identifier() << operationName(result.operation); 0170 //TODO emit a removal if we had the uid in the result set and this is a modification. 0171 //We don't know if this results in a removal from the dataset, so we emit a removal notification anyways 0172 callback({result.entity, Sink::Operation_Removal, result.aggregateValues}); 0173 } 0174 return false; 0175 })) 0176 {} 0177 return foundValue; 0178 } 0179 0180 bool matchesFilter(const ApplicationDomain::ApplicationDomainType &entity) { 0181 if (filterFunction) { 0182 return filterFunction(entity); 0183 } 0184 for (auto it = propertyFilter.begin(); it != propertyFilter.end(); it++) { 0185 const auto filterProperty = it.key(); 0186 const QVariant property = [&] () -> QVariant { 0187 if (filterProperty.size() == 1) { 0188 return entity.getProperty(filterProperty[0]); 0189 } else { 0190 QVariantList propList; 0191 for (const auto &propName : filterProperty) { 0192 propList.push_back(entity.getProperty(propName)); 0193 } 0194 return propList; 0195 } 0196 }(); 0197 const auto comparator = it.value(); 0198 //Reevaluate the fulltext filter during incremental queries. 0199 if (comparator.comparator == QueryBase::Comparator::Fulltext) { 0200 //Don't apply it for initial results, since the fulltext index is always the source set. 0201 if (mIncremental) { 0202 const auto entityId = Identifier::fromDisplayByteArray(entity.identifier()); 0203 //We filter the potentially expensive query by the identifier that we actually require. 0204 const auto matches = indexLookup("fulltext", comparator.value.toString(), {entityId}); 0205 if (!matches.contains(entityId)) { 0206 SinkTraceCtx(mDatastore->mLogCtx) << "Filtering entity due to mismatch on fulltext filter: " << entity.identifier() << "Property: " << filterProperty << property << " Filter:" << comparator.value; 0207 return false; 0208 } 0209 } 0210 } else if (!comparator.matches(property)) { 0211 SinkTraceCtx(mDatastore->mLogCtx) << "Filtering entity due to property mismatch on filter: " << entity.identifier() << "Property: " << filterProperty << property << " Filter:" << comparator.value; 0212 return false; 0213 } 0214 } 0215 return true; 0216 } 0217 }; 0218 0219 0220 struct Aggregator { 0221 QueryBase::Aggregator::Operation operation; 0222 QByteArray property; 0223 QByteArray resultProperty; 0224 0225 Aggregator(QueryBase::Aggregator::Operation o, const QByteArray &property_, const QByteArray &resultProperty_) 0226 : operation(o), property(property_), resultProperty(resultProperty_) 0227 { 0228 0229 } 0230 0231 void process(const QVariant &value) { 0232 if (operation == QueryBase::Aggregator::Collect) { 0233 mResult = mResult.toList() << value; 0234 } else if (operation == QueryBase::Aggregator::Count) { 0235 mResult = mResult.toInt() + 1; 0236 } else { 0237 Q_ASSERT(false); 0238 } 0239 } 0240 0241 void process(const Sink::ApplicationDomain::ApplicationDomainType &entity) { 0242 if (!property.isEmpty()) { 0243 process(entity.getProperty(property)); 0244 } else { 0245 process(QVariant{}); 0246 } 0247 } 0248 0249 void reset() 0250 { 0251 mResult.clear(); 0252 } 0253 0254 QVariant result() const 0255 { 0256 return mResult; 0257 } 0258 private: 0259 QVariant mResult; 0260 }; 0261 0262 class Reduce : public Filter { 0263 public: 0264 typedef QSharedPointer<Reduce> Ptr; 0265 0266 struct PropertySelector { 0267 QueryBase::Reduce::Selector selector; 0268 QByteArray resultProperty; 0269 0270 PropertySelector(QueryBase::Reduce::Selector s, const QByteArray &resultProperty_) 0271 : selector(s), resultProperty(resultProperty_) 0272 { 0273 0274 } 0275 0276 void process(const QVariant &value, const QVariant &selectionValue) { 0277 if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, selector.comparator)) { 0278 selectionResultValue = selectionValue; 0279 mResult = value; 0280 } 0281 } 0282 0283 void reset() 0284 { 0285 selectionResultValue.clear(); 0286 mResult.clear(); 0287 } 0288 0289 QVariant result() const 0290 { 0291 return mResult; 0292 } 0293 private: 0294 0295 QVariant selectionResultValue; 0296 QVariant mResult; 0297 }; 0298 0299 QSet<QByteArray> mReducedValues; 0300 QSet<QByteArray> mIncrementallyReducedValues; 0301 QHash<QByteArray, Identifier> mSelectedValues; 0302 QByteArray mReductionProperty; 0303 QByteArray mSelectionProperty; 0304 QueryBase::Reduce::Selector::Comparator mSelectionComparator; 0305 QList<Aggregator> mAggregators; 0306 QList<PropertySelector> mSelectors; 0307 0308 Reduce(const QByteArray &reductionProperty, const QByteArray &selectionProperty, QueryBase::Reduce::Selector::Comparator comparator, FilterBase::Ptr source, DataStoreQuery *store) 0309 : Filter(source, store), 0310 mReductionProperty(reductionProperty), 0311 mSelectionProperty(selectionProperty), 0312 mSelectionComparator(comparator) 0313 { 0314 0315 } 0316 0317 ~Reduce() override{} 0318 0319 void updateComplete() override 0320 { 0321 SinkTraceCtx(mDatastore->mLogCtx) << "Reduction update is complete."; 0322 mIncrementallyReducedValues.clear(); 0323 } 0324 0325 static QByteArray getByteArray(const QVariant &value) { 0326 if (value.type() == QVariant::DateTime) { 0327 return value.toDateTime().toString().toLatin1(); 0328 } 0329 if (value.isValid() && !value.toByteArray().isEmpty()) { 0330 return value.toByteArray(); 0331 } 0332 return QByteArray(); 0333 } 0334 0335 struct ReductionResult { 0336 Identifier selection; 0337 QVector<Identifier> aggregateIds; 0338 QMap<QByteArray, QVariant> aggregateValues; 0339 }; 0340 0341 ReductionResult reduceOnValue(const QVariant &reductionValue) 0342 { 0343 QMap<QByteArray, QVariant> aggregateValues; 0344 QVariant selectionResultValue; 0345 Identifier selectionResult; 0346 const auto results = indexLookup(mReductionProperty, reductionValue); 0347 for (auto &aggregator : mAggregators) { 0348 aggregator.reset(); 0349 } 0350 for (auto &selector : mSelectors) { 0351 selector.reset(); 0352 } 0353 QVector<Identifier> reducedAndFilteredResults; 0354 for (const auto &r : results) { 0355 readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { 0356 //We need to apply all property filters that we have until the reduction, because the index lookup was unfiltered. 0357 if (!matchesFilter(entity)) { 0358 return; 0359 } 0360 reducedAndFilteredResults << r; 0361 Q_ASSERT(operation != Sink::Operation_Removal); 0362 0363 for (auto &aggregator : mAggregators) { 0364 aggregator.process(entity); 0365 } 0366 0367 const auto selectionValue = entity.getProperty(mSelectionProperty); 0368 0369 for (auto &selector : mSelectors) { 0370 if (!selector.selector.property.isEmpty()) { 0371 selector.process(entity.getProperty(selector.selector.property), selectionValue); 0372 } 0373 } 0374 if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) { 0375 selectionResultValue = selectionValue; 0376 selectionResult = Identifier::fromDisplayByteArray(entity.identifier()); 0377 } 0378 }); 0379 } 0380 0381 for (const auto &aggregator : mAggregators) { 0382 aggregateValues.insert(aggregator.resultProperty, aggregator.result()); 0383 } 0384 for (const auto &selector : mSelectors) { 0385 aggregateValues.insert(selector.resultProperty, selector.result()); 0386 } 0387 return {selectionResult, reducedAndFilteredResults, aggregateValues}; 0388 } 0389 0390 bool next(const std::function<void(const ResultSet::Result &)> &callback) override { 0391 bool foundValue = false; 0392 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { 0393 const auto reductionValue = [&] { 0394 const auto v = result.entity.getProperty(mReductionProperty); 0395 //Because we also get Operation_Removal for filtered entities. We use the fact that actually removed entites 0396 //won't have the property to reduce on. 0397 //TODO: Perhaps find a cleaner solutoin than abusing Operation::Removed for filtered properties. 0398 if (v.isNull() && result.operation == Sink::Operation_Removal) { 0399 //For removals we have to read the last revision to get a value, and thus be able to find the correct thread. 0400 QVariant reductionValue; 0401 const auto id = Identifier::fromDisplayByteArray(result.entity.identifier()); 0402 readPrevious(id, [&] (const ApplicationDomain::ApplicationDomainType &prev) { 0403 Q_ASSERT(result.entity.identifier() == prev.identifier()); 0404 reductionValue = prev.getProperty(mReductionProperty); 0405 }); 0406 return reductionValue; 0407 } else { 0408 return v; 0409 } 0410 }(); 0411 if (reductionValue.isNull()) { 0412 SinkTraceCtx(mDatastore->mLogCtx) << "No reduction value: " << result.entity.identifier(); 0413 //We failed to find a value to reduce on, so ignore this entity. 0414 //Can happen if the entity was already removed and we have no previous revision. 0415 return; 0416 } 0417 const auto reductionValueBa = getByteArray(reductionValue); 0418 if (!mReducedValues.contains(reductionValueBa)) { 0419 SinkTraceCtx(mDatastore->mLogCtx) << "Reducing new value: " << result.entity.identifier() << reductionValueBa; 0420 //Only reduce every value once. 0421 mReducedValues.insert(reductionValueBa); 0422 auto reductionResult = reduceOnValue(reductionValue); 0423 0424 //This can happen if we get a removal message from a filtered entity and all entites of the reduction are filtered. 0425 if (reductionResult.selection.isNull()) { 0426 return; 0427 } 0428 mSelectedValues.insert(reductionValueBa, reductionResult.selection); 0429 readEntity(reductionResult.selection, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { 0430 callback({entity, operation, reductionResult.aggregateValues, reductionResult.aggregateIds}); 0431 foundValue = true; 0432 }); 0433 } else { 0434 //During initial query, do nothing. The lookup above will take care of it. 0435 //During updates adjust the reduction according to the modification/addition or removal 0436 //We have to redo the reduction for every element, because of the aggregation values. 0437 if (mIncremental && !mIncrementallyReducedValues.contains(reductionValueBa)) { 0438 SinkTraceCtx(mDatastore->mLogCtx) << "Incremental reduction update: " << result.entity.identifier() << reductionValueBa; 0439 mIncrementallyReducedValues.insert(reductionValueBa); 0440 //Redo the reduction to find new aggregated values 0441 const auto selectionResult = reduceOnValue(reductionValue); 0442 0443 //If mSelectedValues did not contain the value, oldSelectionResult will be empty.(Happens if entites have been filtered) 0444 const auto oldSelectionResult = mSelectedValues.take(reductionValueBa); 0445 SinkTraceCtx(mDatastore->mLogCtx) << "Old selection result: " << oldSelectionResult << " New selection result: " << selectionResult.selection; 0446 if (selectionResult.selection.isNull() && oldSelectionResult.isNull()) { 0447 //Nothing to do, the item was filtered before, and still is. 0448 } else if (oldSelectionResult == selectionResult.selection) { 0449 mSelectedValues.insert(reductionValueBa, selectionResult.selection); 0450 Q_ASSERT(!selectionResult.selection.isNull()); 0451 readEntity(selectionResult.selection, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) { 0452 callback({entity, Sink::Operation_Modification, selectionResult.aggregateValues, selectionResult.aggregateIds}); 0453 }); 0454 } else { 0455 //remove old result 0456 if (!oldSelectionResult.isNull()) { 0457 readEntity(oldSelectionResult, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) { 0458 callback({entity, Sink::Operation_Removal}); 0459 }); 0460 } 0461 0462 //If the last item has been removed, then there's nothing to add 0463 if (!selectionResult.selection.isNull()) { 0464 //add new result 0465 mSelectedValues.insert(reductionValueBa, selectionResult.selection); 0466 Q_ASSERT(!selectionResult.selection.isNull()); 0467 readEntity(selectionResult.selection, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) { 0468 callback({entity, Sink::Operation_Creation, selectionResult.aggregateValues, selectionResult.aggregateIds}); 0469 }); 0470 } 0471 } 0472 } 0473 } 0474 })) 0475 {} 0476 return foundValue; 0477 } 0478 }; 0479 0480 class Bloom : public Filter { 0481 public: 0482 typedef QSharedPointer<Bloom> Ptr; 0483 0484 QByteArray mBloomProperty; 0485 0486 Bloom(const QByteArray &bloomProperty, FilterBase::Ptr source, DataStoreQuery *store) 0487 : Filter(source, store), 0488 mBloomProperty(bloomProperty) 0489 { 0490 0491 } 0492 0493 ~Bloom() override{} 0494 0495 bool next(const std::function<void(const ResultSet::Result &result)> &callback) override { 0496 if (!mBloomed) { 0497 //Initially we bloom on the first value that matches. 0498 //From there on we just filter. 0499 bool foundValue = false; 0500 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { 0501 mBloomValue = result.entity.getProperty(mBloomProperty); 0502 const auto results = indexLookup(mBloomProperty, mBloomValue); 0503 for (const auto &r : results) { 0504 readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { 0505 callback({entity, Sink::Operation_Creation}); 0506 SinkTraceCtx(mDatastore->mLogCtx) << "Bloom result: " << entity.identifier() << operationName(operation); 0507 foundValue = true; 0508 }); 0509 } 0510 return false; 0511 })) 0512 {} 0513 mBloomed = true; 0514 propertyFilter.insert({mBloomProperty}, mBloomValue); 0515 return foundValue; 0516 } else { 0517 //Filter on bloom value 0518 return Filter::next(callback); 0519 } 0520 } 0521 QVariant mBloomValue; 0522 bool mBloomed = false; 0523 }; 0524 0525 0526 class ReferenceResolver : public Filter { 0527 public: 0528 typedef QSharedPointer<ReferenceResolver> Ptr; 0529 0530 QByteArray mReferenceProperty; 0531 QList<Aggregator> mAggregators; 0532 0533 ReferenceResolver(const QByteArray &referenceProperty, FilterBase::Ptr source, DataStoreQuery *store) 0534 : Filter(source, store), 0535 mReferenceProperty(referenceProperty) 0536 { 0537 0538 } 0539 0540 ~ReferenceResolver() override{} 0541 0542 void resolveReference(const ApplicationDomain::ApplicationDomainType &entity) { 0543 auto parentFolder = entity.getProperty(mReferenceProperty).toByteArray(); 0544 while (!parentFolder.isEmpty()) { 0545 //TODO abort on error 0546 readEntity(Identifier::fromDisplayByteArray(parentFolder), [&](const Sink::ApplicationDomain::ApplicationDomainType &e, Sink::Operation operation) { 0547 for (auto &aggregator : mAggregators) { 0548 aggregator.process(e); 0549 } 0550 0551 parentFolder = e.getProperty(mReferenceProperty).toByteArray(); 0552 }); 0553 } 0554 } 0555 0556 bool next(const std::function<void(const ResultSet::Result &result)> &callback) override { 0557 return mSource->next([this, callback](const ResultSet::Result &result) { 0558 for (auto &aggregator : mAggregators) { 0559 aggregator.reset(); 0560 } 0561 resolveReference(result.entity); 0562 QMap<QByteArray, QVariant> aggregateValues = result.aggregateValues; 0563 for (const auto &aggregator : mAggregators) { 0564 aggregateValues.insert(aggregator.resultProperty, aggregator.result()); 0565 } 0566 callback(ResultSet::Result{result.entity, result.operation, aggregateValues, result.aggregateIds}); 0567 }); 0568 } 0569 }; 0570 0571 DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store) 0572 : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) 0573 { 0574 //This is what we use during a new query 0575 setupQuery(query); 0576 } 0577 0578 DataStoreQuery::DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store, bool incremental) 0579 : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) 0580 { 0581 //This is what we use when fetching more data, without having a new revision with incremental=false 0582 //And this is what we use when the data changed and we want to update with incremental = true 0583 mCollector = state.mCollector; 0584 mSource = state.mSource; 0585 0586 auto source = mCollector; 0587 while (source) { 0588 source->mDatastore = this; 0589 source->mIncremental = incremental; 0590 source = source->mSource; 0591 } 0592 } 0593 0594 DataStoreQuery::~DataStoreQuery() 0595 { 0596 0597 } 0598 0599 DataStoreQuery::State::Ptr DataStoreQuery::getState() 0600 { 0601 auto state = State::Ptr::create(); 0602 state->mSource = mSource; 0603 state->mCollector = mCollector; 0604 return state; 0605 } 0606 0607 void DataStoreQuery::readEntity(const Identifier &id, const BufferCallback &resultCallback) 0608 { 0609 mStore.readLatest(mType, id, resultCallback); 0610 } 0611 0612 void DataStoreQuery::readPrevious(const Identifier &id, const std::function<void (const ApplicationDomain::ApplicationDomainType &)> &callback) 0613 { 0614 mStore.readPrevious(mType, id, mStore.maxRevision(), callback); 0615 } 0616 0617 QVector<Identifier> DataStoreQuery::indexLookup(const QByteArray &property, const QVariant &value, const QVector<Sink::Storage::Identifier> &filter) 0618 { 0619 QElapsedTimer timer; 0620 timer.start(); 0621 const auto result = mStore.indexLookup(mType, property, value, filter); 0622 if (timer.elapsed() > 2) { 0623 SinkLogCtx(mLogCtx) << "Index lookup returned " << result.size() << "results, in " << Sink::Log::TraceTime(timer.elapsed()); 0624 } 0625 return result; 0626 } 0627 0628 /* ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty) */ 0629 /* { */ 0630 /* const bool sortingRequired = !sortProperty.isEmpty(); */ 0631 /* if (mInitialQuery && sortingRequired) { */ 0632 /* SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; */ 0633 /* // Sort the complete set by reading the sort property and filling into a sorted map */ 0634 /* auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); */ 0635 /* while (resultSet.next()) { */ 0636 /* // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) */ 0637 /* readEntity(resultSet.id(), */ 0638 /* [this, filter, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) { */ 0639 0640 /* const auto operation = buffer.operation(); */ 0641 0642 /* // We're not interested in removals during the initial query */ 0643 /* if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { */ 0644 /* if (!sortProperty.isEmpty()) { */ 0645 /* const auto sortValue = getProperty(buffer.entity(), sortProperty); */ 0646 /* if (sortValue.type() == QVariant::DateTime) { */ 0647 /* sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), uid); */ 0648 /* } else { */ 0649 /* sortedMap->insert(sortValue.toString().toLatin1(), uid); */ 0650 /* } */ 0651 /* } else { */ 0652 /* sortedMap->insert(uid, uid); */ 0653 /* } */ 0654 /* } */ 0655 /* }); */ 0656 /* } */ 0657 0658 /* SinkTrace() << "Sorted " << sortedMap->size() << " values."; */ 0659 /* auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap); */ 0660 /* ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter]( */ 0661 /* std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entity, Sink::Operation)> callback) -> bool { */ 0662 /* if (iterator->hasNext()) { */ 0663 /* readEntity(iterator->next().value(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) { */ 0664 /* callback(uid, buffer, Sink::Operation_Creation); */ 0665 /* }); */ 0666 /* return true; */ 0667 /* } */ 0668 /* return false; */ 0669 /* }; */ 0670 0671 /* auto skip = [iterator]() { */ 0672 /* if (iterator->hasNext()) { */ 0673 /* iterator->next(); */ 0674 /* } */ 0675 /* }; */ 0676 /* return ResultSet(generator, skip); */ 0677 /* } else { */ 0678 /* auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); */ 0679 /* ResultSet::ValueGenerator generator = [this, resultSetPtr, filter](const ResultSet::Callback &callback) -> bool { */ 0680 /* if (resultSetPtr->next()) { */ 0681 /* SinkTrace() << "Reading the next value: " << resultSetPtr->id(); */ 0682 /* // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) */ 0683 /* readEntity(resultSetPtr->id(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) { */ 0684 /* const auto operation = buffer.operation(); */ 0685 /* if (mInitialQuery) { */ 0686 /* // We're not interested in removals during the initial query */ 0687 /* if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { */ 0688 /* // In the initial set every entity is new */ 0689 /* callback(uid, buffer, Sink::Operation_Creation); */ 0690 /* } */ 0691 /* } else { */ 0692 /* // Always remove removals, they probably don't match due to non-available properties */ 0693 /* if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) { */ 0694 /* // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) */ 0695 /* callback(uid, buffer, operation); */ 0696 /* } */ 0697 /* } */ 0698 /* }); */ 0699 /* return true; */ 0700 /* } */ 0701 /* return false; */ 0702 /* }; */ 0703 /* auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; */ 0704 /* return ResultSet(generator, skip); */ 0705 /* } */ 0706 /* } */ 0707 0708 QByteArrayList DataStoreQuery::executeSubquery(const QueryBase &subquery) 0709 { 0710 Q_ASSERT(!subquery.type().isEmpty()); 0711 auto sub = DataStoreQuery(subquery, subquery.type(), mStore); 0712 auto result = sub.execute(); 0713 QByteArrayList ids; 0714 while (result.next([&ids](const ResultSet::Result &result) { 0715 ids << result.entity.identifier(); 0716 })) 0717 {} 0718 return ids; 0719 } 0720 0721 void DataStoreQuery::setupQuery(const Sink::QueryBase &query_) 0722 { 0723 auto query = query_; 0724 auto baseFilters = query.getBaseFilters(); 0725 //Resolve any subqueries we have 0726 for (const auto &k : baseFilters.keys()) { 0727 const auto comparator = baseFilters.value(k); 0728 if (comparator.value.canConvert<Query>()) { 0729 SinkTraceCtx(mLogCtx) << "Executing subquery for property: " << k; 0730 const auto result = executeSubquery(comparator.value.value<Query>()); 0731 baseFilters.insert(k, Query::Comparator(QVariant::fromValue(result), Query::Comparator::In)); 0732 } 0733 } 0734 query.setBaseFilters(baseFilters); 0735 0736 QByteArray appliedSorting; 0737 0738 //Determine initial set 0739 mSource = [&]() { 0740 if (!query.ids().isEmpty()) { 0741 //We have a set of ids as a starting point 0742 QVector<Identifier> ids; 0743 for (const auto & id: query.ids()) { 0744 ids.append(Identifier::fromDisplayByteArray(id)); 0745 } 0746 0747 //If there is no bloom or reduction filter the result set is final (so we must reject new ids as we are not filtering them later on) 0748 const auto fs = query.getFilterStages(); 0749 const bool resultSetIsFinal = !std::any_of(fs.cbegin(), fs.cend(), [&] (const auto &stage) { 0750 return stage.template dynamicCast<Query::Reduce>() || stage.template dynamicCast<Query::Bloom>(); 0751 }); 0752 0753 return Source::Ptr::create(ids, this, resultSetIsFinal); 0754 } else { 0755 QSet<QByteArrayList> appliedFilters; 0756 QElapsedTimer timer; 0757 timer.start(); 0758 auto resultSet = mStore.indexLookup(mType, query, appliedFilters, appliedSorting); 0759 if (timer.elapsed() > 2) { 0760 SinkLogCtx(mLogCtx) << "Index lookup returned " << resultSet.size() << "results, in " << Sink::Log::TraceTime(timer.elapsed()); 0761 } 0762 if (!appliedFilters.isEmpty() || !appliedSorting.isEmpty()) { 0763 //We have an index lookup as starting point 0764 return Source::Ptr::create(resultSet, this); 0765 } 0766 // We do a full scan if there were no indexes available to create the initial set (this is going to be expensive for large sets). 0767 return Source::Ptr::create(mStore.fullScan(mType), this); 0768 } 0769 }(); 0770 0771 FilterBase::Ptr baseSet = mSource; 0772 if (!query.getBaseFilters().isEmpty()) { 0773 auto filter = Filter::Ptr::create(baseSet, this); 0774 //For incremental queries the remaining filters are not sufficient, 0775 //we have to check the properties that we used during the index lookup since we are not re-executing the index lookup. 0776 for (const auto &f : query.getBaseFilters().keys()) { 0777 filter->propertyFilter.insert(f, query.getFilter(f)); 0778 } 0779 baseSet = filter; 0780 } 0781 /* if (appliedSorting.isEmpty() && !query.sortProperty.isEmpty()) { */ 0782 /* //Apply manual sorting */ 0783 /* baseSet = Sort::Ptr::create(baseSet, query.sortProperty); */ 0784 /* } */ 0785 0786 //Setup the rest of the filter stages on top of the base set 0787 for (const auto &stage : query.getFilterStages()) { 0788 if (auto filter = stage.dynamicCast<Query::Filter>()) { 0789 auto f = Filter::Ptr::create(baseSet, this); 0790 f->propertyFilter = filter->propertyFilter; 0791 baseSet = f; 0792 } else if (auto filter = stage.dynamicCast<Query::Reduce>()) { 0793 auto reduction = ::Reduce::Ptr::create(filter->property, filter->selector.property, filter->selector.comparator, baseSet, this); 0794 for (const auto &aggregator : qAsConst(filter->aggregators)) { 0795 reduction->mAggregators << ::Aggregator(aggregator.operation, aggregator.propertyToCollect, aggregator.resultProperty); 0796 } 0797 for (const auto &propertySelector : qAsConst(filter->propertySelectors)) { 0798 reduction->mSelectors << ::Reduce::PropertySelector(propertySelector.selector, propertySelector.resultProperty); 0799 } 0800 reduction->propertyFilter = query.getBaseFilters(); 0801 baseSet = reduction; 0802 } else if (auto filter = stage.dynamicCast<Query::ReferenceResolver>()) { 0803 auto reduction = ::ReferenceResolver::Ptr::create(filter->referenceProperty, baseSet, this); 0804 for (const auto &aggregator : qAsConst(filter->aggregators)) { 0805 reduction->mAggregators << ::Aggregator(aggregator.operation, aggregator.propertyToCollect, aggregator.resultProperty); 0806 } 0807 baseSet = reduction; 0808 } else if (auto filter = stage.dynamicCast<Query::Bloom>()) { 0809 baseSet = Bloom::Ptr::create(filter->property, baseSet, this); 0810 } 0811 } 0812 0813 if (query.getPostQueryFilter()) { 0814 auto f = Filter::Ptr::create(baseSet, this); 0815 f->filterFunction = query.getPostQueryFilter(); 0816 baseSet = f; 0817 } 0818 0819 mCollector = Collector::Ptr::create(baseSet, this); 0820 } 0821 0822 QVector<Key> DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision) 0823 { 0824 QVector<Key> changedKeys; 0825 mStore.readRevisions(baseRevision, mType, [&](const Key &key) { 0826 changedKeys << key; 0827 }); 0828 return changedKeys; 0829 } 0830 0831 ResultSet DataStoreQuery::update(qint64 baseRevision) 0832 { 0833 SinkTraceCtx(mLogCtx) << "Executing query update from revision " << baseRevision << " to revision " << mStore.maxRevision(); 0834 auto incrementalResultSet = loadIncrementalResultSet(baseRevision); 0835 SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet; 0836 mSource->add(incrementalResultSet); 0837 ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool { 0838 if (mCollector->next([this, callback](const ResultSet::Result &result) { 0839 SinkTraceCtx(mLogCtx) << "Got incremental result: " << result.entity.identifier() << operationName(result.operation); 0840 callback(result); 0841 })) 0842 { 0843 return true; 0844 } 0845 return false; 0846 }; 0847 return ResultSet(generator, [this]() { mCollector->skip(); }); 0848 } 0849 0850 void DataStoreQuery::updateComplete() 0851 { 0852 mSource->mIncrementalIds.clear(); 0853 mSource->mHaveIncrementalChanges = false; 0854 auto source = mCollector; 0855 while (source) { 0856 source->updateComplete(); 0857 source = source->mSource; 0858 } 0859 } 0860 0861 ResultSet DataStoreQuery::execute() 0862 { 0863 SinkTraceCtx(mLogCtx) << "Executing query"; 0864 0865 Q_ASSERT(mCollector); 0866 ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool { 0867 return mCollector->next([this, callback](const ResultSet::Result &result) { 0868 if (result.operation != Sink::Operation_Removal) { 0869 SinkTraceCtx(mLogCtx) << "Got initial result: " << result.entity.identifier() << result.operation; 0870 callback(ResultSet::Result{result.entity, Sink::Operation_Creation, result.aggregateValues, result.aggregateIds}); 0871 } 0872 }); 0873 }; 0874 return ResultSet(generator, [this]() { mCollector->skip(); }); 0875 }