File indexing completed on 2024-05-12 05:25:58

0001 /*
0002  *   Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm>
0003  *
0004  *   This program is free software; you can redistribute it and/or modify
0005  *   it under the terms of the GNU General Public License as published by
0006  *   the Free Software Foundation; either version 2 of the License, or
0007  *   (at your option) any later version.
0008  *
0009  *   This program is distributed in the hope that it will be useful,
0010  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
0011  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
0012  *   GNU General Public License for more details.
0013  *
0014  *   You should have received a copy of the GNU General Public License
0015  *   along with this program; if not, write to the
0016  *   Free Software Foundation, Inc.,
0017  *   51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
0018  */
0019 #include "datastorequery.h"
0020 
0021 #include <QElapsedTimer>
0022 
0023 #include "log.h"
0024 #include "applicationdomaintype.h"
0025 
0026 using namespace Sink;
0027 using namespace Sink::Storage;
0028 
0029 static QByteArray operationName(const Sink::Operation op)
0030 {
0031     switch(op) {
0032         case Sink::Operation_Creation:
0033             return "Creation";
0034         case Sink::Operation_Modification:
0035             return "Modification";
0036         case Sink::Operation_Removal:
0037             return "Removal";
0038     }
0039     return "";
0040 }
0041 
0042 static bool compare(const QVariant &left, const QVariant &right, QueryBase::Reduce::Selector::Comparator comparator) {
0043     if (comparator == QueryBase::Reduce::Selector::Max) {
0044         return left > right;
0045     }
0046     if (comparator == QueryBase::Reduce::Selector::Min) {
0047         return left < right;
0048     }
0049     return false;
0050 }
0051 
0052 
0053 class Source : public FilterBase {
0054     public:
0055     typedef QSharedPointer<Source> Ptr;
0056 
0057     QVector<Identifier> mIds;
0058     QVector<Identifier>::ConstIterator mIt;
0059     QVector<Identifier> mIncrementalIds;
0060     QVector<Identifier>::ConstIterator mIncrementalIt{};
0061     bool mHaveIncrementalChanges{false};
0062     bool mIdsAreFinal{false};
0063 
0064     Source (const QVector<Identifier> &ids, DataStoreQuery *store, bool idsAreFinal = false)
0065         : FilterBase(store),
0066         mIds(ids),
0067         mIdsAreFinal(idsAreFinal)
0068     {
0069         mIt = mIds.constBegin();
0070     }
0071 
0072     ~Source() override = default;
0073 
0074     void skip() override
0075     {
0076         if (mIt != mIds.constEnd()) {
0077             mIt++;
0078         }
0079     };
0080 
0081     void add(const QVector<Key> &keys)
0082     {
0083         mIncrementalIds.clear();
0084         mIncrementalIds.reserve(keys.size());
0085         for (const auto &key : keys) {
0086             //Pre-filter by uid if a uid-filter is set.
0087             if (!mIdsAreFinal || mIds.contains(key.identifier())) {
0088                 mIncrementalIds.append(key.identifier());
0089             }
0090         }
0091         mIncrementalIt = mIncrementalIds.constBegin();
0092         mHaveIncrementalChanges = true;
0093     }
0094 
0095     bool next(const std::function<void(const ResultSet::Result &result)> &callback) override
0096     {
0097         if (mHaveIncrementalChanges) {
0098             if (mIncrementalIt == mIncrementalIds.constEnd()) {
0099                 return false;
0100             }
0101             readEntity(*mIncrementalIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
0102                 SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation);
0103                 callback({entity, operation});
0104             });
0105             mIncrementalIt++;
0106 
0107             return mIncrementalIt != mIncrementalIds.constEnd();
0108         }
0109         if (mIt == mIds.constEnd()) {
0110             return false;
0111         }
0112         readEntity(*mIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
0113             SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation);
0114             callback({entity, operation});
0115         });
0116         mIt++;
0117         return mIt != mIds.constEnd();
0118     }
0119 };
0120 
0121 class Collector : public FilterBase {
0122 public:
0123     typedef QSharedPointer<Collector> Ptr;
0124 
0125     Collector(FilterBase::Ptr source, DataStoreQuery *store)
0126         : FilterBase(source, store)
0127     {
0128 
0129     }
0130     ~Collector() override = default;
0131 
0132     bool next(const std::function<void(const ResultSet::Result &result)> &callback) override
0133     {
0134         return mSource->next(callback);
0135     }
0136 };
0137 
0138 class Filter : public FilterBase {
0139 public:
0140     typedef QSharedPointer<Filter> Ptr;
0141 
0142     QHash<QByteArrayList, Sink::QueryBase::Comparator> propertyFilter;
0143     std::function<bool(const ApplicationDomain::ApplicationDomainType &)> filterFunction;
0144 
0145     Filter(FilterBase::Ptr source, DataStoreQuery *store)
0146         : FilterBase(source, store)
0147     {
0148 
0149     }
0150 
0151     ~Filter() override{}
0152 
0153     bool next(const std::function<void(const ResultSet::Result &result)> &callback) override {
0154         bool foundValue = false;
0155         while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) {
0156                 SinkTraceCtx(mDatastore->mLogCtx) << "Filter: " << result.entity.identifier() << operationName(result.operation);
0157 
0158                 //Always accept removals. They can't match the filter since the data is gone.
0159                 if (result.operation == Sink::Operation_Removal) {
0160                     SinkTraceCtx(mDatastore->mLogCtx) << "Removal: " << result.entity.identifier() << operationName(result.operation);
0161                     callback(result);
0162                     foundValue = true;
0163                 } else if (matchesFilter(result.entity)) {
0164                     SinkTraceCtx(mDatastore->mLogCtx) << "Accepted: " << result.entity.identifier() << operationName(result.operation);
0165                     callback(result);
0166                     foundValue = true;
0167                     //TODO if something did not match the filter so far but does now, turn into an add operation.
0168                 } else {
0169                     SinkTraceCtx(mDatastore->mLogCtx) << "Rejected: " << result.entity.identifier() << operationName(result.operation);
0170                     //TODO emit a removal if we had the uid in the result set and this is a modification.
0171                     //We don't know if this results in a removal from the dataset, so we emit a removal notification anyways
0172                     callback({result.entity, Sink::Operation_Removal, result.aggregateValues});
0173                 }
0174                 return false;
0175             }))
0176         {}
0177         return foundValue;
0178     }
0179 
0180     bool matchesFilter(const ApplicationDomain::ApplicationDomainType &entity) {
0181         if (filterFunction) {
0182             return filterFunction(entity);
0183         }
0184         for (auto it =  propertyFilter.begin(); it !=  propertyFilter.end(); it++) {
0185             const auto filterProperty = it.key();
0186             const QVariant property = [&] () -> QVariant {
0187                 if (filterProperty.size() == 1) {
0188                     return entity.getProperty(filterProperty[0]);
0189                 } else {
0190                     QVariantList propList;
0191                     for (const auto &propName : filterProperty) {
0192                         propList.push_back(entity.getProperty(propName));
0193                     }
0194                     return propList;
0195                 }
0196             }();
0197             const auto comparator = it.value();
0198             //Reevaluate the fulltext filter during incremental queries.
0199             if (comparator.comparator == QueryBase::Comparator::Fulltext) {
0200                 //Don't apply it for initial results, since the fulltext index is always the source set.
0201                 if (mIncremental) {
0202                     const auto entityId = Identifier::fromDisplayByteArray(entity.identifier());
0203                     //We filter the potentially expensive query by the identifier that we actually require.
0204                     const auto matches = indexLookup("fulltext", comparator.value.toString(), {entityId});
0205                     if (!matches.contains(entityId)) {
0206                         SinkTraceCtx(mDatastore->mLogCtx) << "Filtering entity due to mismatch on fulltext filter: " << entity.identifier() << "Property: " << filterProperty << property << " Filter:" << comparator.value;
0207                         return false;
0208                     }
0209                 }
0210             } else if (!comparator.matches(property)) {
0211                 SinkTraceCtx(mDatastore->mLogCtx) << "Filtering entity due to property mismatch on filter: " << entity.identifier() << "Property: " << filterProperty << property << " Filter:" << comparator.value;
0212                 return false;
0213             }
0214         }
0215         return true;
0216     }
0217 };
0218 
0219 
0220 struct Aggregator {
0221     QueryBase::Aggregator::Operation operation;
0222     QByteArray property;
0223     QByteArray resultProperty;
0224 
0225     Aggregator(QueryBase::Aggregator::Operation o, const QByteArray &property_, const QByteArray &resultProperty_)
0226         : operation(o), property(property_), resultProperty(resultProperty_)
0227     {
0228 
0229     }
0230 
0231     void process(const QVariant &value) {
0232         if (operation == QueryBase::Aggregator::Collect) {
0233             mResult = mResult.toList() << value;
0234         } else if (operation == QueryBase::Aggregator::Count) {
0235             mResult = mResult.toInt() + 1;
0236         } else {
0237             Q_ASSERT(false);
0238         }
0239     }
0240 
0241     void process(const Sink::ApplicationDomain::ApplicationDomainType &entity) {
0242         if (!property.isEmpty()) {
0243             process(entity.getProperty(property));
0244         } else {
0245             process(QVariant{});
0246         }
0247     }
0248 
0249     void reset()
0250     {
0251         mResult.clear();
0252     }
0253 
0254     QVariant result() const
0255     {
0256         return mResult;
0257     }
0258 private:
0259     QVariant mResult;
0260 };
0261 
0262 class Reduce : public Filter {
0263 public:
0264     typedef QSharedPointer<Reduce> Ptr;
0265 
0266     struct PropertySelector {
0267         QueryBase::Reduce::Selector selector;
0268         QByteArray resultProperty;
0269 
0270         PropertySelector(QueryBase::Reduce::Selector s, const QByteArray &resultProperty_)
0271             : selector(s), resultProperty(resultProperty_)
0272         {
0273 
0274         }
0275 
0276         void process(const QVariant &value, const QVariant &selectionValue) {
0277             if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, selector.comparator)) {
0278                 selectionResultValue = selectionValue;
0279                 mResult = value;
0280             }
0281         }
0282 
0283         void reset()
0284         {
0285             selectionResultValue.clear();
0286             mResult.clear();
0287         }
0288 
0289         QVariant result() const
0290         {
0291             return mResult;
0292         }
0293     private:
0294 
0295         QVariant selectionResultValue;
0296         QVariant mResult;
0297     };
0298 
0299     QSet<QByteArray> mReducedValues;
0300     QSet<QByteArray> mIncrementallyReducedValues;
0301     QHash<QByteArray, Identifier> mSelectedValues;
0302     QByteArray mReductionProperty;
0303     QByteArray mSelectionProperty;
0304     QueryBase::Reduce::Selector::Comparator mSelectionComparator;
0305     QList<Aggregator> mAggregators;
0306     QList<PropertySelector> mSelectors;
0307 
0308     Reduce(const QByteArray &reductionProperty, const QByteArray &selectionProperty, QueryBase::Reduce::Selector::Comparator comparator, FilterBase::Ptr source, DataStoreQuery *store)
0309         : Filter(source, store),
0310         mReductionProperty(reductionProperty),
0311         mSelectionProperty(selectionProperty),
0312         mSelectionComparator(comparator)
0313     {
0314 
0315     }
0316 
0317     ~Reduce() override{}
0318 
0319     void updateComplete() override
0320     {
0321         SinkTraceCtx(mDatastore->mLogCtx) << "Reduction update is complete.";
0322         mIncrementallyReducedValues.clear();
0323     }
0324 
0325     static QByteArray getByteArray(const QVariant &value) {
0326         if (value.type() == QVariant::DateTime) {
0327             return value.toDateTime().toString().toLatin1();
0328         }
0329         if (value.isValid() && !value.toByteArray().isEmpty()) {
0330             return value.toByteArray();
0331         }
0332         return QByteArray();
0333     }
0334 
0335     struct ReductionResult {
0336         Identifier selection;
0337         QVector<Identifier> aggregateIds;
0338         QMap<QByteArray, QVariant> aggregateValues;
0339     };
0340 
0341     ReductionResult reduceOnValue(const QVariant &reductionValue)
0342     {
0343         QMap<QByteArray, QVariant> aggregateValues;
0344         QVariant selectionResultValue;
0345         Identifier selectionResult;
0346         const auto results = indexLookup(mReductionProperty, reductionValue);
0347         for (auto &aggregator : mAggregators) {
0348             aggregator.reset();
0349         }
0350         for (auto &selector : mSelectors) {
0351             selector.reset();
0352         }
0353         QVector<Identifier> reducedAndFilteredResults;
0354         for (const auto &r : results) {
0355             readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
0356                 //We need to apply all property filters that we have until the reduction, because the index lookup was unfiltered.
0357                 if (!matchesFilter(entity)) {
0358                     return;
0359                 }
0360                 reducedAndFilteredResults << r;
0361                 Q_ASSERT(operation != Sink::Operation_Removal);
0362 
0363                 for (auto &aggregator : mAggregators) {
0364                     aggregator.process(entity);
0365                 }
0366 
0367                 const auto selectionValue = entity.getProperty(mSelectionProperty);
0368 
0369                 for (auto &selector : mSelectors) {
0370                     if (!selector.selector.property.isEmpty()) {
0371                         selector.process(entity.getProperty(selector.selector.property), selectionValue);
0372                     }
0373                 }
0374                 if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) {
0375                     selectionResultValue = selectionValue;
0376                     selectionResult = Identifier::fromDisplayByteArray(entity.identifier());
0377                 }
0378             });
0379         }
0380 
0381         for (const auto &aggregator : mAggregators) {
0382             aggregateValues.insert(aggregator.resultProperty, aggregator.result());
0383         }
0384         for (const auto &selector : mSelectors) {
0385             aggregateValues.insert(selector.resultProperty, selector.result());
0386         }
0387         return {selectionResult, reducedAndFilteredResults, aggregateValues};
0388     }
0389 
0390     bool next(const std::function<void(const ResultSet::Result &)> &callback) override {
0391         bool foundValue = false;
0392         while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) {
0393                 const auto reductionValue = [&] {
0394                     const auto v = result.entity.getProperty(mReductionProperty);
0395                     //Because we also get Operation_Removal for filtered entities. We use the fact that actually removed entites
0396                     //won't have the property to reduce on.
0397                     //TODO: Perhaps find a cleaner solutoin than abusing Operation::Removed for filtered properties.
0398                     if (v.isNull() && result.operation == Sink::Operation_Removal) {
0399                         //For removals we have to read the last revision to get a value, and thus be able to find the correct thread.
0400                         QVariant reductionValue;
0401                         const auto id = Identifier::fromDisplayByteArray(result.entity.identifier());
0402                         readPrevious(id, [&] (const ApplicationDomain::ApplicationDomainType &prev) {
0403                             Q_ASSERT(result.entity.identifier() == prev.identifier());
0404                             reductionValue = prev.getProperty(mReductionProperty);
0405                         });
0406                         return reductionValue;
0407                     } else {
0408                         return v;
0409                     }
0410                 }();
0411                 if (reductionValue.isNull()) {
0412                     SinkTraceCtx(mDatastore->mLogCtx) << "No reduction value: " << result.entity.identifier();
0413                     //We failed to find a value to reduce on, so ignore this entity.
0414                     //Can happen if the entity was already removed and we have no previous revision.
0415                     return;
0416                 }
0417                 const auto reductionValueBa = getByteArray(reductionValue);
0418                 if (!mReducedValues.contains(reductionValueBa)) {
0419                     SinkTraceCtx(mDatastore->mLogCtx) << "Reducing new value: " << result.entity.identifier() << reductionValueBa;
0420                     //Only reduce every value once.
0421                     mReducedValues.insert(reductionValueBa);
0422                     auto reductionResult = reduceOnValue(reductionValue);
0423 
0424                     //This can happen if we get a removal message from a filtered entity and all entites of the reduction are filtered.
0425                     if (reductionResult.selection.isNull()) {
0426                         return;
0427                     }
0428                     mSelectedValues.insert(reductionValueBa, reductionResult.selection);
0429                     readEntity(reductionResult.selection, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
0430                         callback({entity, operation, reductionResult.aggregateValues, reductionResult.aggregateIds});
0431                         foundValue = true;
0432                     });
0433                 } else {
0434                     //During initial query, do nothing. The lookup above will take care of it.
0435                     //During updates adjust the reduction according to the modification/addition or removal
0436                     //We have to redo the reduction for every element, because of the aggregation values.
0437                     if (mIncremental && !mIncrementallyReducedValues.contains(reductionValueBa)) {
0438                         SinkTraceCtx(mDatastore->mLogCtx) << "Incremental reduction update: " << result.entity.identifier() << reductionValueBa;
0439                         mIncrementallyReducedValues.insert(reductionValueBa);
0440                         //Redo the reduction to find new aggregated values
0441                         const auto selectionResult = reduceOnValue(reductionValue);
0442 
0443                         //If mSelectedValues did not contain the value, oldSelectionResult will be empty.(Happens if entites have been filtered)
0444                         const auto oldSelectionResult = mSelectedValues.take(reductionValueBa);
0445                         SinkTraceCtx(mDatastore->mLogCtx) << "Old selection result: " << oldSelectionResult << " New selection result: " << selectionResult.selection;
0446                         if (selectionResult.selection.isNull() && oldSelectionResult.isNull()) {
0447                             //Nothing to do, the item was filtered before, and still is.
0448                         } else if (oldSelectionResult == selectionResult.selection) {
0449                             mSelectedValues.insert(reductionValueBa, selectionResult.selection);
0450                             Q_ASSERT(!selectionResult.selection.isNull());
0451                             readEntity(selectionResult.selection, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) {
0452                                 callback({entity, Sink::Operation_Modification, selectionResult.aggregateValues, selectionResult.aggregateIds});
0453                             });
0454                         } else {
0455                             //remove old result
0456                             if (!oldSelectionResult.isNull()) {
0457                                 readEntity(oldSelectionResult, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) {
0458                                     callback({entity, Sink::Operation_Removal});
0459                                 });
0460                             }
0461 
0462                             //If the last item has been removed, then there's nothing to add
0463                             if (!selectionResult.selection.isNull()) {
0464                                 //add new result
0465                                 mSelectedValues.insert(reductionValueBa, selectionResult.selection);
0466                                 Q_ASSERT(!selectionResult.selection.isNull());
0467                                 readEntity(selectionResult.selection, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) {
0468                                     callback({entity, Sink::Operation_Creation, selectionResult.aggregateValues, selectionResult.aggregateIds});
0469                                 });
0470                             }
0471                         }
0472                     }
0473                 }
0474             }))
0475         {}
0476         return foundValue;
0477     }
0478 };
0479 
0480 class Bloom : public Filter {
0481 public:
0482     typedef QSharedPointer<Bloom> Ptr;
0483 
0484     QByteArray mBloomProperty;
0485 
0486     Bloom(const QByteArray &bloomProperty, FilterBase::Ptr source, DataStoreQuery *store)
0487         : Filter(source, store),
0488         mBloomProperty(bloomProperty)
0489     {
0490 
0491     }
0492 
0493     ~Bloom() override{}
0494 
0495     bool next(const std::function<void(const ResultSet::Result &result)> &callback) override {
0496         if (!mBloomed) {
0497             //Initially we bloom on the first value that matches.
0498             //From there on we just filter.
0499             bool foundValue = false;
0500             while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) {
0501                     mBloomValue = result.entity.getProperty(mBloomProperty);
0502                     const auto results = indexLookup(mBloomProperty, mBloomValue);
0503                     for (const auto &r : results) {
0504                         readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
0505                             callback({entity, Sink::Operation_Creation});
0506                             SinkTraceCtx(mDatastore->mLogCtx) << "Bloom result: " << entity.identifier() << operationName(operation);
0507                             foundValue = true;
0508                         });
0509                     }
0510                     return false;
0511                 }))
0512             {}
0513             mBloomed = true;
0514             propertyFilter.insert({mBloomProperty}, mBloomValue);
0515             return foundValue;
0516         } else {
0517             //Filter on bloom value
0518             return Filter::next(callback);
0519         }
0520     }
0521     QVariant mBloomValue;
0522     bool mBloomed = false;
0523 };
0524 
0525 
0526 class ReferenceResolver : public Filter {
0527 public:
0528     typedef QSharedPointer<ReferenceResolver> Ptr;
0529 
0530     QByteArray mReferenceProperty;
0531     QList<Aggregator> mAggregators;
0532 
0533     ReferenceResolver(const QByteArray &referenceProperty, FilterBase::Ptr source, DataStoreQuery *store)
0534         : Filter(source, store),
0535         mReferenceProperty(referenceProperty)
0536     {
0537 
0538     }
0539 
0540     ~ReferenceResolver() override{}
0541 
0542     void resolveReference(const ApplicationDomain::ApplicationDomainType &entity) {
0543         auto parentFolder = entity.getProperty(mReferenceProperty).toByteArray();
0544         while (!parentFolder.isEmpty()) {
0545             //TODO abort on error
0546             readEntity(Identifier::fromDisplayByteArray(parentFolder), [&](const Sink::ApplicationDomain::ApplicationDomainType &e, Sink::Operation operation) {
0547                 for (auto &aggregator : mAggregators) {
0548                     aggregator.process(e);
0549                 }
0550 
0551                 parentFolder = e.getProperty(mReferenceProperty).toByteArray();
0552             });
0553         }
0554     }
0555 
0556     bool next(const std::function<void(const ResultSet::Result &result)> &callback) override {
0557         return mSource->next([this, callback](const ResultSet::Result &result) {
0558             for (auto &aggregator : mAggregators) {
0559                 aggregator.reset();
0560             }
0561             resolveReference(result.entity);
0562             QMap<QByteArray, QVariant> aggregateValues = result.aggregateValues;
0563             for (const auto &aggregator : mAggregators) {
0564                 aggregateValues.insert(aggregator.resultProperty, aggregator.result());
0565             }
0566             callback(ResultSet::Result{result.entity, result.operation, aggregateValues, result.aggregateIds});
0567         });
0568     }
0569 };
0570 
0571 DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store)
0572     : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery"))
0573 {
0574     //This is what we use during a new query
0575     setupQuery(query);
0576 }
0577 
0578 DataStoreQuery::DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store, bool incremental)
0579     : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery"))
0580 {
0581     //This is what we use when fetching more data, without having a new revision with incremental=false
0582     //And this is what we use when the data changed and we want to update with incremental = true
0583     mCollector = state.mCollector;
0584     mSource = state.mSource;
0585 
0586     auto source = mCollector;
0587     while (source) {
0588         source->mDatastore = this;
0589         source->mIncremental = incremental;
0590         source = source->mSource;
0591     }
0592 }
0593 
0594 DataStoreQuery::~DataStoreQuery()
0595 {
0596 
0597 }
0598 
0599 DataStoreQuery::State::Ptr DataStoreQuery::getState()
0600 {
0601     auto state = State::Ptr::create();
0602     state->mSource = mSource;
0603     state->mCollector = mCollector;
0604     return state;
0605 }
0606 
0607 void DataStoreQuery::readEntity(const Identifier &id, const BufferCallback &resultCallback)
0608 {
0609     mStore.readLatest(mType, id, resultCallback);
0610 }
0611 
0612 void DataStoreQuery::readPrevious(const Identifier &id, const std::function<void (const ApplicationDomain::ApplicationDomainType &)> &callback)
0613 {
0614     mStore.readPrevious(mType, id, mStore.maxRevision(), callback);
0615 }
0616 
0617 QVector<Identifier> DataStoreQuery::indexLookup(const QByteArray &property, const QVariant &value, const QVector<Sink::Storage::Identifier> &filter)
0618 {
0619     QElapsedTimer timer;
0620     timer.start();
0621     const auto result =  mStore.indexLookup(mType, property, value, filter);
0622     if (timer.elapsed() > 2) {
0623         SinkLogCtx(mLogCtx) << "Index lookup returned " << result.size() << "results, in " << Sink::Log::TraceTime(timer.elapsed());
0624     }
0625     return result;
0626 }
0627 
0628 /* ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty) */
0629 /* { */
0630 /*     const bool sortingRequired = !sortProperty.isEmpty(); */
0631 /*     if (mInitialQuery && sortingRequired) { */
0632 /*         SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; */
0633 /*         // Sort the complete set by reading the sort property and filling into a sorted map */
0634 /*         auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); */
0635 /*         while (resultSet.next()) { */
0636 /*             // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) */
0637 /*             readEntity(resultSet.id(), */
0638 /*                 [this, filter, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) { */
0639 
0640 /*                     const auto operation = buffer.operation(); */
0641 
0642 /*                     // We're not interested in removals during the initial query */
0643 /*                     if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { */
0644 /*                         if (!sortProperty.isEmpty()) { */
0645 /*                             const auto sortValue = getProperty(buffer.entity(), sortProperty); */
0646 /*                             if (sortValue.type() == QVariant::DateTime) { */
0647 /*                                 sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), uid); */
0648 /*                             } else { */
0649 /*                                 sortedMap->insert(sortValue.toString().toLatin1(), uid); */
0650 /*                             } */
0651 /*                         } else { */
0652 /*                             sortedMap->insert(uid, uid); */
0653 /*                         } */
0654 /*                     } */
0655 /*                 }); */
0656 /*         } */
0657 
0658 /*         SinkTrace() << "Sorted " << sortedMap->size() << " values."; */
0659 /*         auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap); */
0660 /*         ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter]( */
0661 /*             std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entity, Sink::Operation)> callback) -> bool { */
0662 /*             if (iterator->hasNext()) { */
0663 /*                 readEntity(iterator->next().value(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) { */
0664 /*                         callback(uid, buffer, Sink::Operation_Creation); */
0665 /*                     }); */
0666 /*                 return true; */
0667 /*             } */
0668 /*             return false; */
0669 /*         }; */
0670 
0671 /*         auto skip = [iterator]() { */
0672 /*             if (iterator->hasNext()) { */
0673 /*                 iterator->next(); */
0674 /*             } */
0675 /*         }; */
0676 /*         return ResultSet(generator, skip); */
0677 /*     } else { */
0678 /*         auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); */
0679 /*         ResultSet::ValueGenerator generator = [this, resultSetPtr, filter](const ResultSet::Callback &callback) -> bool { */
0680 /*             if (resultSetPtr->next()) { */
0681 /*                 SinkTrace() << "Reading the next value: " << resultSetPtr->id(); */
0682 /*                 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) */
0683 /*                 readEntity(resultSetPtr->id(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) { */
0684 /*                     const auto operation = buffer.operation(); */
0685 /*                     if (mInitialQuery) { */
0686 /*                         // We're not interested in removals during the initial query */
0687 /*                         if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { */
0688 /*                             // In the initial set every entity is new */
0689 /*                             callback(uid, buffer, Sink::Operation_Creation); */
0690 /*                         } */
0691 /*                     } else { */
0692 /*                         // Always remove removals, they probably don't match due to non-available properties */
0693 /*                         if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) { */
0694 /*                             // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) */
0695 /*                             callback(uid, buffer, operation); */
0696 /*                         } */
0697 /*                     } */
0698 /*                 }); */
0699 /*                 return true; */
0700 /*             } */
0701 /*             return false; */
0702 /*         }; */
0703 /*         auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; */
0704 /*         return ResultSet(generator, skip); */
0705 /*     } */
0706 /* } */
0707 
0708 QByteArrayList DataStoreQuery::executeSubquery(const QueryBase &subquery)
0709 {
0710     Q_ASSERT(!subquery.type().isEmpty());
0711     auto sub = DataStoreQuery(subquery, subquery.type(), mStore);
0712     auto result = sub.execute();
0713     QByteArrayList ids;
0714     while (result.next([&ids](const ResultSet::Result &result) {
0715             ids << result.entity.identifier();
0716         }))
0717     {}
0718     return ids;
0719 }
0720 
0721 void DataStoreQuery::setupQuery(const Sink::QueryBase &query_)
0722 {
0723     auto query = query_;
0724     auto baseFilters = query.getBaseFilters();
0725     //Resolve any subqueries we have
0726     for (const auto &k : baseFilters.keys()) {
0727         const auto comparator = baseFilters.value(k);
0728         if (comparator.value.canConvert<Query>()) {
0729             SinkTraceCtx(mLogCtx) << "Executing subquery for property: " << k;
0730             const auto result = executeSubquery(comparator.value.value<Query>());
0731             baseFilters.insert(k, Query::Comparator(QVariant::fromValue(result), Query::Comparator::In));
0732         }
0733     }
0734     query.setBaseFilters(baseFilters);
0735 
0736     QByteArray appliedSorting;
0737 
0738     //Determine initial set
0739     mSource = [&]() {
0740         if (!query.ids().isEmpty()) {
0741             //We have a set of ids as a starting point
0742             QVector<Identifier> ids;
0743             for (const auto & id: query.ids()) {
0744                 ids.append(Identifier::fromDisplayByteArray(id));
0745             }
0746 
0747             //If there is no bloom or reduction filter the result set is final (so we must reject new ids as we are not filtering them later on)
0748             const auto fs = query.getFilterStages();
0749             const bool resultSetIsFinal = !std::any_of(fs.cbegin(), fs.cend(), [&] (const auto &stage) {
0750                 return stage.template dynamicCast<Query::Reduce>() || stage.template dynamicCast<Query::Bloom>();
0751             });
0752 
0753             return Source::Ptr::create(ids, this, resultSetIsFinal);
0754         } else {
0755             QSet<QByteArrayList> appliedFilters;
0756             QElapsedTimer timer;
0757             timer.start();
0758             auto resultSet = mStore.indexLookup(mType, query, appliedFilters, appliedSorting);
0759             if (timer.elapsed() > 2) {
0760                 SinkLogCtx(mLogCtx) << "Index lookup returned " << resultSet.size() << "results, in " << Sink::Log::TraceTime(timer.elapsed());
0761             }
0762             if (!appliedFilters.isEmpty() || !appliedSorting.isEmpty()) {
0763                 //We have an index lookup as starting point
0764                 return Source::Ptr::create(resultSet, this);
0765             }
0766             // We do a full scan if there were no indexes available to create the initial set (this is going to be expensive for large sets).
0767             return Source::Ptr::create(mStore.fullScan(mType), this);
0768         }
0769     }();
0770 
0771     FilterBase::Ptr baseSet = mSource;
0772     if (!query.getBaseFilters().isEmpty()) {
0773         auto filter = Filter::Ptr::create(baseSet, this);
0774         //For incremental queries the remaining filters are not sufficient,
0775         //we have to check the properties that we used during the index lookup since we are not re-executing the index lookup.
0776         for (const auto &f : query.getBaseFilters().keys()) {
0777             filter->propertyFilter.insert(f, query.getFilter(f));
0778         }
0779         baseSet = filter;
0780     }
0781     /* if (appliedSorting.isEmpty() && !query.sortProperty.isEmpty()) { */
0782     /*     //Apply manual sorting */
0783     /*     baseSet = Sort::Ptr::create(baseSet, query.sortProperty); */
0784     /* } */
0785 
0786     //Setup the rest of the filter stages on top of the base set
0787     for (const auto &stage : query.getFilterStages()) {
0788         if (auto filter = stage.dynamicCast<Query::Filter>()) {
0789             auto f = Filter::Ptr::create(baseSet, this);
0790             f->propertyFilter = filter->propertyFilter;
0791             baseSet = f;
0792         } else if (auto filter = stage.dynamicCast<Query::Reduce>()) {
0793             auto reduction = ::Reduce::Ptr::create(filter->property, filter->selector.property, filter->selector.comparator, baseSet, this);
0794             for (const auto &aggregator : qAsConst(filter->aggregators)) {
0795                 reduction->mAggregators << ::Aggregator(aggregator.operation, aggregator.propertyToCollect, aggregator.resultProperty);
0796             }
0797             for (const auto &propertySelector : qAsConst(filter->propertySelectors)) {
0798                 reduction->mSelectors << ::Reduce::PropertySelector(propertySelector.selector, propertySelector.resultProperty);
0799             }
0800             reduction->propertyFilter = query.getBaseFilters();
0801             baseSet = reduction;
0802         } else if (auto filter = stage.dynamicCast<Query::ReferenceResolver>()) {
0803             auto reduction = ::ReferenceResolver::Ptr::create(filter->referenceProperty, baseSet, this);
0804             for (const auto &aggregator : qAsConst(filter->aggregators)) {
0805                 reduction->mAggregators << ::Aggregator(aggregator.operation, aggregator.propertyToCollect, aggregator.resultProperty);
0806             }
0807             baseSet = reduction;
0808         } else if (auto filter = stage.dynamicCast<Query::Bloom>()) {
0809             baseSet = Bloom::Ptr::create(filter->property, baseSet, this);
0810         }
0811     }
0812 
0813     if (query.getPostQueryFilter()) {
0814         auto f = Filter::Ptr::create(baseSet, this);
0815         f->filterFunction = query.getPostQueryFilter();
0816         baseSet = f;
0817     }
0818 
0819     mCollector = Collector::Ptr::create(baseSet, this);
0820 }
0821 
0822 QVector<Key> DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision)
0823 {
0824     QVector<Key> changedKeys;
0825     mStore.readRevisions(baseRevision, mType, [&](const Key &key) {
0826         changedKeys << key;
0827     });
0828     return changedKeys;
0829 }
0830 
0831 ResultSet DataStoreQuery::update(qint64 baseRevision)
0832 {
0833     SinkTraceCtx(mLogCtx) << "Executing query update from revision " << baseRevision << " to revision " << mStore.maxRevision();
0834     auto incrementalResultSet = loadIncrementalResultSet(baseRevision);
0835     SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet;
0836     mSource->add(incrementalResultSet);
0837     ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool {
0838         if (mCollector->next([this, callback](const ResultSet::Result &result) {
0839                 SinkTraceCtx(mLogCtx) << "Got incremental result: " << result.entity.identifier() << operationName(result.operation);
0840                 callback(result);
0841             }))
0842         {
0843             return true;
0844         }
0845         return false;
0846     };
0847     return ResultSet(generator, [this]() { mCollector->skip(); });
0848 }
0849 
0850 void DataStoreQuery::updateComplete()
0851 {
0852     mSource->mIncrementalIds.clear();
0853     mSource->mHaveIncrementalChanges = false;
0854     auto source = mCollector;
0855     while (source) {
0856         source->updateComplete();
0857         source = source->mSource;
0858     }
0859 }
0860 
0861 ResultSet DataStoreQuery::execute()
0862 {
0863     SinkTraceCtx(mLogCtx) << "Executing query";
0864 
0865     Q_ASSERT(mCollector);
0866     ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool {
0867         return mCollector->next([this, callback](const ResultSet::Result &result) {
0868                 if (result.operation != Sink::Operation_Removal) {
0869                     SinkTraceCtx(mLogCtx) << "Got initial result: " << result.entity.identifier() << result.operation;
0870                     callback(ResultSet::Result{result.entity, Sink::Operation_Creation, result.aggregateValues, result.aggregateIds});
0871                 }
0872             });
0873     };
0874     return ResultSet(generator, [this]() { mCollector->skip(); });
0875 }