File indexing completed on 2024-05-12 05:25:58

0001 /*
0002  *   Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm>
0003  *
0004  *   This program is free software; you can redistribute it and/or modify
0005  *   it under the terms of the GNU General Public License as published by
0006  *   the Free Software Foundation; either version 2 of the License, or
0007  *   (at your option) any later version.
0008  *
0009  *   This program is distributed in the hope that it will be useful,
0010  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
0011  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
0012  *   GNU General Public License for more details.
0013  *
0014  *   You should have received a copy of the GNU General Public License
0015  *   along with this program; if not, write to the
0016  *   Free Software Foundation, Inc.,
0017  *   51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
0018  */
0019 #pragma once
0020 
0021 #include "sink_export.h"
0022 
0023 #include "query.h"
0024 #include "resultset.h"
0025 #include "log.h"
0026 #include "storage/entitystore.h"
0027 #include "storage/key.h"
0028 
0029 class Source;
0030 class Bloom;
0031 class Reduce;
0032 class Filter;
0033 class FilterBase;
0034 
0035 class SINK_EXPORT DataStoreQuery {
0036     friend class FilterBase;
0037     friend class Source;
0038     friend class Bloom;
0039     friend class Reduce;
0040     friend class Filter;
0041 public:
0042     typedef QSharedPointer<DataStoreQuery> Ptr;
0043 
0044     struct State {
0045         typedef QSharedPointer<State> Ptr;
0046         QSharedPointer<FilterBase> mCollector;
0047         QSharedPointer<Source> mSource;
0048     };
0049 
0050     DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, Sink::Storage::EntityStore &store);
0051     DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store, bool incremental);
0052     ~DataStoreQuery();
0053     ResultSet execute();
0054     ResultSet update(qint64 baseRevision);
0055     void updateComplete();
0056 
0057     State::Ptr getState();
0058 
0059 private:
0060 
0061     typedef std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation)> FilterFunction;
0062     typedef std::function<void(const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation)> BufferCallback;
0063 
0064     QVector<Sink::Storage::Identifier> indexLookup(const QByteArray &property, const QVariant &value, const QVector<Sink::Storage::Identifier> &filter = {});
0065 
0066     void readEntity(const Sink::Storage::Identifier &id, const BufferCallback &resultCallback);
0067     void readPrevious(const Sink::Storage::Identifier &id, const std::function<void (const Sink::ApplicationDomain::ApplicationDomainType &)> &callback);
0068 
0069     ResultSet createFilteredSet(ResultSet &resultSet, const FilterFunction &);
0070     QVector<Sink::Storage::Key> loadIncrementalResultSet(qint64 baseRevision);
0071 
0072     void setupQuery(const Sink::QueryBase &query_);
0073     QByteArrayList executeSubquery(const Sink::QueryBase &subquery);
0074 
0075     const QByteArray mType;
0076     QSharedPointer<FilterBase> mCollector;
0077     QSharedPointer<Source> mSource;
0078 
0079     Sink::Storage::EntityStore &mStore;
0080     Sink::Log::Context mLogCtx;
0081 };
0082 
0083 
0084 class FilterBase {
0085 public:
0086     typedef QSharedPointer<FilterBase> Ptr;
0087     FilterBase(DataStoreQuery *store)
0088         : mDatastore(store)
0089     {
0090 
0091     }
0092 
0093     FilterBase(FilterBase::Ptr source, DataStoreQuery *store)
0094         : mSource(source),
0095         mDatastore(store)
0096     {
0097     }
0098 
0099     virtual ~FilterBase(){}
0100 
0101     void readEntity(const Sink::Storage::Identifier &id, const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation)> &callback)
0102     {
0103         Q_ASSERT(mDatastore);
0104         mDatastore->readEntity(id, callback);
0105     }
0106 
0107     QVector<Sink::Storage::Identifier> indexLookup(const QByteArray &property, const QVariant &value, const QVector<Sink::Storage::Identifier> &filter = {})
0108     {
0109         Q_ASSERT(mDatastore);
0110         return mDatastore->indexLookup(property, value, filter);
0111     }
0112 
0113     void readPrevious(const Sink::Storage::Identifier &id, const std::function<void (const Sink::ApplicationDomain::ApplicationDomainType &)> &callback)
0114     {
0115         Q_ASSERT(mDatastore);
0116         mDatastore->readPrevious(id, callback);
0117     }
0118 
0119     virtual void skip() { mSource->skip(); }
0120 
0121     //Returns true for as long as a result is available
0122     virtual bool next(const std::function<void(const ResultSet::Result &)> &callback) = 0;
0123 
0124     virtual void updateComplete() { }
0125 
0126     FilterBase::Ptr mSource;
0127     DataStoreQuery *mDatastore{nullptr};
0128     bool mIncremental = false;
0129 };
0130