File indexing completed on 2024-06-16 05:00:54

0001 /*
0002  * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
0003  *
0004  * This library is free software; you can redistribute it and/or
0005  * modify it under the terms of the GNU Lesser General Public
0006  * License as published by the Free Software Foundation; either
0007  * version 2.1 of the License, or (at your option) version 3, or any
0008  * later version accepted by the membership of KDE e.V. (or its
0009  * successor approved by the membership of KDE e.V.), which shall
0010  * act as a proxy defined in Section 6 of version 3 of the license.
0011  *
0012  * This library is distributed in the hope that it will be useful,
0013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0015  * Lesser General Public License for more details.
0016  *
0017  * You should have received a copy of the GNU Lesser General Public
0018  * License along with this library.  If not, see <http://www.gnu.org/licenses/>.
0019  */
0020 #include "entitystore.h"
0021 
0022 #include <QDir>
0023 #include <QFile>
0024 
0025 #include "entitybuffer.h"
0026 #include "log.h"
0027 #include "typeindex.h"
0028 #include "definitions.h"
0029 #include "resourcecontext.h"
0030 #include "index.h"
0031 #include "bufferutils.h"
0032 #include "entity_generated.h"
0033 #include "typeimplementations.h"
0034 
0035 using namespace Sink;
0036 using namespace Sink::Storage;
0037 
0038 template <typename T, typename First>
0039 void mergeImpl(T &map, First f)
0040 {
0041     for (auto it = f.constBegin(); it != f.constEnd(); it++) {
0042         map.insert(it.key(), it.value());
0043     }
0044 }
0045 
0046 template <typename T, typename First, typename ... Tail>
0047 void mergeImpl(T &map, First f, Tail ...maps)
0048 {
0049     for (auto it = f.constBegin(); it != f.constEnd(); it++) {
0050         map.insert(it.key(), it.value());
0051     }
0052     mergeImpl<T, Tail...>(map, maps...);
0053 }
0054 
0055 template <typename First, typename ... Tail>
0056 First merge(First f, Tail ...maps)
0057 {
0058     First map;
0059     mergeImpl(map, f, maps...);
0060     return map;
0061 }
0062 
0063 template <class T>
0064 struct DbLayoutHelper {
0065     void operator()(QMap<QByteArray, int> map) const {
0066         mergeImpl(map, ApplicationDomain::TypeImplementation<T>::typeDatabases());
0067     }
0068 };
0069 
0070 static Sink::Storage::DbLayout dbLayout(const QByteArray &instanceId)
0071 {
0072     static auto databases = [] {
0073         QMap<QByteArray, int> map;
0074         mergeImpl(map, ApplicationDomain::TypeImplementation<ApplicationDomain::Mail>::typeDatabases());
0075         mergeImpl(map, ApplicationDomain::TypeImplementation<ApplicationDomain::Folder>::typeDatabases());
0076         mergeImpl(map, ApplicationDomain::TypeImplementation<ApplicationDomain::Contact>::typeDatabases());
0077         mergeImpl(map, ApplicationDomain::TypeImplementation<ApplicationDomain::Addressbook>::typeDatabases());
0078         mergeImpl(map, ApplicationDomain::TypeImplementation<ApplicationDomain::Calendar>::typeDatabases());
0079         mergeImpl(map, ApplicationDomain::TypeImplementation<ApplicationDomain::Event>::typeDatabases());
0080         mergeImpl(map, ApplicationDomain::TypeImplementation<ApplicationDomain::Todo>::typeDatabases());
0081         return merge(Storage::DataStore::baseDbs(), map);
0082     }();
0083     return {instanceId, databases};
0084 }
0085 
0086 
0087 class EntityStore::Private {
0088 public:
0089     Private(const ResourceContext &context, const Sink::Log::Context &ctx) : resourceContext(context), logCtx(ctx.subContext("entitystore"))
0090     {
0091     }
0092 
0093     ResourceContext resourceContext;
0094     DataStore::Transaction transaction;
0095     QHash<QByteArray, QSharedPointer<TypeIndex> > indexByType;
0096     Sink::Log::Context logCtx;
0097 
0098     bool exists()
0099     {
0100         return Storage::DataStore::exists(Sink::storageLocation(), resourceContext.instanceId());
0101     }
0102 
0103     DataStore::Transaction &getTransaction()
0104     {
0105         if (transaction) {
0106             return transaction;
0107         }
0108 
0109         DataStore store(Sink::storageLocation(), dbLayout(resourceContext.instanceId()), DataStore::ReadOnly);
0110         transaction = store.createTransaction(DataStore::ReadOnly);
0111         return transaction;
0112     }
0113 
0114     template <class T>
0115     struct ConfigureHelper {
0116         void operator()(TypeIndex &arg) const {
0117             ApplicationDomain::TypeImplementation<T>::configure(arg);
0118         }
0119     };
0120 
0121     TypeIndex &cachedIndex(const QByteArray &type)
0122     {
0123         if (indexByType.contains(type)) {
0124             return *indexByType.value(type);
0125         }
0126         auto index = QSharedPointer<TypeIndex>::create(type, logCtx);
0127         TypeHelper<ConfigureHelper>{type}.template operator()<void>(*index);
0128         indexByType.insert(type, index);
0129         return *index;
0130 
0131     }
0132 
0133     TypeIndex &typeIndex(const QByteArray &type)
0134     {
0135         auto &index = cachedIndex(type);
0136         index.mTransaction = &transaction;
0137         return index;
0138     }
0139 
0140     ApplicationDomainType createApplicationDomainType(const QByteArray &type, const QByteArray &uid, qint64 revision, const EntityBuffer &buffer)
0141     {
0142         auto adaptor = resourceContext.adaptorFactory(type).createAdaptor(buffer.entity(), &typeIndex(type));
0143         return ApplicationDomainType{resourceContext.instanceId(), uid, revision, adaptor};
0144     }
0145 };
0146 
0147 EntityStore::EntityStore(const ResourceContext &context, const Log::Context &ctx)
0148     : d(new EntityStore::Private{context, ctx})
0149 {
0150 
0151 }
0152 
0153 void EntityStore::initialize()
0154 {
0155     //This function is only called in the resource code where we want to be able to write to the databse.
0156 
0157     //Check for the existience of the db without creating it or the envrionment.
0158     //This is required to be able to set the database version only in the case where we create a new database.
0159     if (!Storage::DataStore::exists(Sink::storageLocation(), d->resourceContext.instanceId())) {
0160         //The first time we open the environment we always want it to be read/write. Otherwise subsequent tries to open a write transaction will fail.
0161         startTransaction(DataStore::ReadWrite);
0162         //Create the database with the correct version if it wasn't existing before
0163         SinkLogCtx(d->logCtx) << "Creating resource database.";
0164         Storage::DataStore::setDatabaseVersion(d->transaction, Sink::latestDatabaseVersion());
0165     } else {
0166         //The first time we open the environment we always want it to be read/write. Otherwise subsequent tries to open a write transaction will fail.
0167         startTransaction(DataStore::ReadWrite);
0168     }
0169     commitTransaction();
0170 }
0171 
0172 void EntityStore::startTransaction(DataStore::AccessMode accessMode)
0173 {
0174     SinkTraceCtx(d->logCtx) << "Starting transaction: " << accessMode;
0175     Q_ASSERT(!d->transaction);
0176     d->transaction = DataStore(Sink::storageLocation(), dbLayout(d->resourceContext.instanceId()), accessMode).createTransaction(accessMode);
0177 }
0178 
0179 void EntityStore::commitTransaction()
0180 {
0181     SinkTraceCtx(d->logCtx) << "Committing transaction";
0182 
0183     for (const auto &type : d->indexByType.keys()) {
0184         d->typeIndex(type).commitTransaction();
0185     }
0186 
0187     Q_ASSERT(d->transaction);
0188     d->transaction.commit();
0189     d->transaction = {};
0190 }
0191 
0192 void EntityStore::abortTransaction()
0193 {
0194     SinkTraceCtx(d->logCtx) << "Aborting transaction";
0195     d->transaction.abort();
0196     d->transaction = {};
0197 }
0198 
0199 bool EntityStore::hasTransaction() const
0200 {
0201     return d->transaction;
0202 }
0203 
0204 bool EntityStore::add(const QByteArray &type, ApplicationDomainType entity, bool replayToSource)
0205 {
0206     if (entity.identifier().isEmpty()) {
0207         SinkWarningCtx(d->logCtx) << "Can't write entity with an empty identifier";
0208         return false;
0209     }
0210 
0211     SinkTraceCtx(d->logCtx) << "New entity " << entity;
0212 
0213     const auto identifier = Identifier::fromDisplayByteArray(entity.identifier());
0214 
0215     d->typeIndex(type).add(identifier, entity, d->transaction, d->resourceContext.instanceId());
0216 
0217     //The maxRevision may have changed meanwhile if the entity created sub-entities
0218     const qint64 newRevision = maxRevision() + 1;
0219 
0220     // Add metadata buffer
0221     flatbuffers::FlatBufferBuilder metadataFbb;
0222     auto metadataBuilder = MetadataBuilder(metadataFbb);
0223     metadataBuilder.add_revision(newRevision);
0224     metadataBuilder.add_operation(Operation_Creation);
0225     metadataBuilder.add_replayToSource(replayToSource);
0226     auto metadataBuffer = metadataBuilder.Finish();
0227     FinishMetadataBuffer(metadataFbb, metadataBuffer);
0228 
0229     flatbuffers::FlatBufferBuilder fbb;
0230     d->resourceContext.adaptorFactory(type).createBuffer(entity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
0231 
0232     const auto key = Key(identifier, newRevision);
0233 
0234     DataStore::mainDatabase(d->transaction, type)
0235         .write(newRevision, BufferUtils::extractBuffer(fbb),
0236             [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << entity.identifier() << newRevision; });
0237 
0238     DataStore::setMaxRevision(d->transaction, newRevision);
0239     DataStore::recordRevision(d->transaction, newRevision, identifier, type);
0240     DataStore::recordUid(d->transaction, identifier, type);
0241     SinkTraceCtx(d->logCtx) << "Wrote entity: " << key << "of type:" << type;
0242     return true;
0243 }
0244 
0245 ApplicationDomain::ApplicationDomainType EntityStore::applyDiff(const QByteArray &type, const ApplicationDomainType &current, const ApplicationDomainType &diff, const QByteArrayList &deletions, const QSet<QByteArray> &excludeProperties) const
0246 {
0247     SinkTraceCtx(d->logCtx) << "Applying diff: " << current.availableProperties() << "Deletions: " << deletions << "Changeset: " << diff.changedProperties() << "Excluded: " << excludeProperties;
0248     auto newEntity = *ApplicationDomainType::getInMemoryRepresentation<ApplicationDomainType>(current, current.availableProperties());
0249 
0250     // Apply diff
0251     for (const auto &property : diff.changedProperties()) {
0252         if (!excludeProperties.contains(property)) {
0253             const auto value = diff.getProperty(property);
0254             if (value.isValid()) {
0255                 newEntity.setProperty(property, value);
0256             }
0257         }
0258     }
0259 
0260     // Remove deletions
0261     for (const auto &property : deletions) {
0262         if (!excludeProperties.contains(property)) {
0263             newEntity.setProperty(property, QVariant());
0264         }
0265     }
0266     return newEntity;
0267 }
0268 
0269 bool EntityStore::modify(const QByteArray &type, const ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource)
0270 {
0271     const auto current = readLatest(type, diff.identifier());
0272     if (current.identifier().isEmpty()) {
0273         SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier();
0274         return false;
0275     }
0276 
0277     auto newEntity = applyDiff(type, current, diff, deletions);
0278     return modify(type, current, newEntity, replayToSource);
0279 }
0280 
0281 bool EntityStore::modify(const QByteArray &type, const ApplicationDomainType &current, ApplicationDomainType newEntity, bool replayToSource)
0282 {
0283     SinkTraceCtx(d->logCtx) << "Modified entity: " << newEntity;
0284     //This should not normally happen and is possibly a client defect
0285     if (newEntity.changedProperties().isEmpty()) {
0286         SinkWarningCtx(d->logCtx) << "Attempted an empty modification: " << newEntity;
0287         return false;
0288     }
0289 
0290     const auto identifier = Identifier::fromDisplayByteArray(newEntity.identifier());
0291     d->typeIndex(type).modify(identifier, current, newEntity, d->transaction, d->resourceContext.instanceId());
0292 
0293     const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1;
0294 
0295     // Add metadata buffer
0296     flatbuffers::FlatBufferBuilder metadataFbb;
0297     {
0298         //We add availableProperties to account for the properties that have been changed by the preprocessors
0299         auto modifiedProperties = BufferUtils::toVector(metadataFbb, newEntity.changedProperties());
0300         auto metadataBuilder = MetadataBuilder(metadataFbb);
0301         metadataBuilder.add_revision(newRevision);
0302         metadataBuilder.add_operation(Operation_Modification);
0303         metadataBuilder.add_replayToSource(replayToSource);
0304         metadataBuilder.add_modifiedProperties(modifiedProperties);
0305         auto metadataBuffer = metadataBuilder.Finish();
0306         FinishMetadataBuffer(metadataFbb, metadataBuffer);
0307     }
0308     SinkTraceCtx(d->logCtx) << "Changed properties: " << newEntity.changedProperties();
0309 
0310     newEntity.setChangedProperties(newEntity.availableProperties().toSet());
0311 
0312     flatbuffers::FlatBufferBuilder fbb;
0313     d->resourceContext.adaptorFactory(type).createBuffer(newEntity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
0314 
0315     DataStore::mainDatabase(d->transaction, type)
0316         .write(newRevision, BufferUtils::extractBuffer(fbb),
0317             [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << newEntity.identifier() << newRevision; });
0318 
0319     DataStore::setMaxRevision(d->transaction, newRevision);
0320     DataStore::recordRevision(d->transaction, newRevision, identifier, type);
0321     SinkTraceCtx(d->logCtx) << "Wrote modified entity: " << newEntity.identifier() << type << newRevision;
0322     return true;
0323 }
0324 
0325 bool EntityStore::remove(const QByteArray &type, const ApplicationDomainType &current, bool replayToSource)
0326 {
0327     const auto uid = current.identifier();
0328     if (!exists(type, uid)) {
0329         SinkWarningCtx(d->logCtx) << "Remove: Entity is already removed " << uid;
0330         return false;
0331     }
0332     const auto identifier = Identifier::fromDisplayByteArray(uid);
0333     d->typeIndex(type).remove(identifier, current, d->transaction, d->resourceContext.instanceId());
0334 
0335     SinkTraceCtx(d->logCtx) << "Removed entity " << current;
0336 
0337     const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1;
0338 
0339     // Add metadata buffer
0340     flatbuffers::FlatBufferBuilder metadataFbb;
0341     auto metadataBuilder = MetadataBuilder(metadataFbb);
0342     metadataBuilder.add_revision(newRevision);
0343     metadataBuilder.add_operation(Operation_Removal);
0344     metadataBuilder.add_replayToSource(replayToSource);
0345     auto metadataBuffer = metadataBuilder.Finish();
0346     FinishMetadataBuffer(metadataFbb, metadataBuffer);
0347 
0348     flatbuffers::FlatBufferBuilder fbb;
0349     EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0);
0350 
0351     DataStore::mainDatabase(d->transaction, type)
0352         .write(newRevision, BufferUtils::extractBuffer(fbb),
0353             [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << uid << newRevision; });
0354 
0355     DataStore::setMaxRevision(d->transaction, newRevision);
0356     DataStore::recordRevision(d->transaction, newRevision, identifier, type);
0357     DataStore::removeUid(d->transaction, identifier, type);
0358     return true;
0359 }
0360 
0361 void EntityStore::cleanupEntityRevisionsUntil(qint64 revision)
0362 {
0363     const auto internalUid = DataStore::getUidFromRevision(d->transaction, revision);
0364     const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision);
0365     if (bufferType.isEmpty() || internalUid.isNull()) {
0366         SinkErrorCtx(d->logCtx) << "Failed to find revision during cleanup: " << revision;
0367         Q_ASSERT(false);
0368         return;
0369     }
0370     SinkTraceCtx(d->logCtx) << "Cleaning up revision " << revision << internalUid << bufferType;
0371 
0372     // Remove old revisions
0373     const auto revisionsToRemove = DataStore::getRevisionsUntilFromUid(d->transaction, internalUid, revision);
0374 
0375     for (const auto &revisionToRemove : revisionsToRemove) {
0376         DataStore::removeRevision(d->transaction, revisionToRemove);
0377         DataStore::mainDatabase(d->transaction, bufferType).remove(revisionToRemove);
0378     }
0379 
0380     // And remove the specified revision only if marked for removal
0381     DataStore::mainDatabase(d->transaction, bufferType).scan(revision, [&](size_t, const QByteArray &data) {
0382         EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
0383         if (!buffer.isValid()) {
0384             SinkWarningCtx(d->logCtx) << "Read invalid buffer from disk";
0385             return false;
0386         }
0387 
0388         const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer());
0389         if (metadata->operation() == Operation_Removal) {
0390             DataStore::removeRevision(d->transaction, revision);
0391             DataStore::mainDatabase(d->transaction, bufferType).remove(revision);
0392         }
0393 
0394         return false;
0395     });
0396 
0397     DataStore::setCleanedUpRevision(d->transaction, revision);
0398 }
0399 
0400 bool EntityStore::cleanupRevisions(qint64 revision)
0401 {
0402     Q_ASSERT(d->exists());
0403     bool implicitTransaction = false;
0404     if (!d->transaction) {
0405         startTransaction(DataStore::ReadWrite);
0406         Q_ASSERT(d->transaction);
0407         implicitTransaction = true;
0408     }
0409     const auto lastCleanRevision = DataStore::cleanedUpRevision(d->transaction);
0410     const auto firstRevisionToCleanup = lastCleanRevision + 1;
0411     bool cleanupIsNecessary = firstRevisionToCleanup <= revision;
0412     if (cleanupIsNecessary) {
0413         SinkTraceCtx(d->logCtx) << "Cleaning up from " << firstRevisionToCleanup << " to " << revision;
0414         for (qint64 rev = firstRevisionToCleanup; rev <= revision; rev++) {
0415             cleanupEntityRevisionsUntil(rev);
0416         }
0417     }
0418     if (implicitTransaction) {
0419         commitTransaction();
0420     }
0421     return cleanupIsNecessary;
0422 }
0423 
0424 qint64 EntityStore::lastCleanRevision()
0425 {
0426     if (!d->exists()) {
0427         return 0;
0428     }
0429     bool implicitTransaction = false;
0430     if (!d->transaction) {
0431         startTransaction(DataStore::ReadOnly);
0432         Q_ASSERT(d->transaction);
0433         implicitTransaction = true;
0434     }
0435     const auto lastCleanRevision = DataStore::cleanedUpRevision(d->transaction);
0436     if (implicitTransaction) {
0437         abortTransaction();
0438     }
0439     return lastCleanRevision;
0440 }
0441 
0442 
0443 QVector<Identifier> EntityStore::fullScan(const QByteArray &type)
0444 {
0445     SinkTraceCtx(d->logCtx) << "Looking for : " << type;
0446     if (!d->exists()) {
0447         SinkTraceCtx(d->logCtx) << "Database is not existing: " << type;
0448         return {};
0449     }
0450 
0451     QSet<Identifier> keys;
0452 
0453     DataStore::getUids(type, d->getTransaction(), [&keys] (const Identifier &id) {
0454         keys << id;
0455     });
0456 
0457     SinkTraceCtx(d->logCtx) << "Full scan retrieved " << keys.size() << " results.";
0458     return keys.toList().toVector();
0459 }
0460 
0461 QVector<Identifier> EntityStore::indexLookup(const QByteArray &type, const QueryBase &query, QSet<QByteArrayList> &appliedFilters, QByteArray &appliedSorting)
0462 {
0463     if (!d->exists()) {
0464         SinkTraceCtx(d->logCtx) << "Database is not existing: " << type;
0465         return {};
0466     }
0467     return d->typeIndex(type).query(query, appliedFilters, appliedSorting, d->getTransaction(), d->resourceContext.instanceId());
0468 }
0469 
0470 QVector<Identifier> EntityStore::indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value, const QVector<Sink::Storage::Identifier> &filter)
0471 {
0472     if (!d->exists()) {
0473         SinkTraceCtx(d->logCtx) << "Database is not existing: " << type;
0474         return {};
0475     }
0476     return d->typeIndex(type).lookup(property, value, d->getTransaction(), d->resourceContext.instanceId(), filter);
0477 }
0478 
0479 void EntityStore::indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value, const std::function<void(const QByteArray &uid)> &callback)
0480 {
0481     if (!d->exists()) {
0482         SinkTraceCtx(d->logCtx) << "Database is not existing: " << type;
0483         return;
0484     }
0485     const auto list = indexLookup(type, property, value, QVector<Sink::Storage::Identifier>{});
0486     for (const auto &id : list) {
0487         callback(id.toDisplayByteArray());
0488     }
0489     /* Index index(type + ".index." + property, d->transaction); */
0490     /* index.lookup(value, [&](const QByteArray &sinkId) { */
0491     /*     callback(sinkId); */
0492     /* }, */
0493     /* [&](const Index::Error &error) { */
0494     /*     SinkWarningCtx(d->logCtx) << "Error in index: " <<  error.message << property; */
0495     /* }); */
0496 }
0497 
0498 void EntityStore::readLatest(const QByteArray &type, const Identifier &id, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> &callback)
0499 {
0500     Q_ASSERT(d);
0501     const size_t revision = DataStore::getLatestRevisionFromUid(d->getTransaction(), id);
0502     if (!revision) {
0503         //This is not an error. We rely on this when looking for an id in resource that don't have it.
0504         SinkTraceCtx(d->logCtx) << "Failed to readLatest: " << type << id;
0505         return;
0506     }
0507     auto db = DataStore::mainDatabase(d->getTransaction(), type);
0508     db.scan(revision,
0509         [=](size_t, const QByteArray &value) {
0510             callback(id.toDisplayByteArray(), Sink::EntityBuffer(value.data(), value.size()));
0511             return false;
0512         },
0513         [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during readLatest query: " << error.message << id; });
0514 }
0515 
0516 void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> &callback)
0517 {
0518     readLatest(type, Identifier::fromDisplayByteArray(uid), callback);
0519 }
0520 
0521 void EntityStore::readLatest(const QByteArray &type, const Identifier &uid, const std::function<void(const ApplicationDomainType &)> &callback)
0522 {
0523     readLatest(type, uid, [&](const QByteArray &uid, const EntityBuffer &buffer) {
0524         //TODO cache max revision for the duration of the transaction.
0525         callback(d->createApplicationDomainType(type, uid, DataStore::maxRevision(d->getTransaction()), buffer));
0526     });
0527 }
0528 
0529 void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const ApplicationDomainType &)> &callback)
0530 {
0531     readLatest(type, Identifier::fromDisplayByteArray(uid), callback);
0532 }
0533 
0534 void EntityStore::readLatest(const QByteArray &type, const Identifier &uid, const std::function<void(const ApplicationDomainType &, Sink::Operation)> &callback)
0535 {
0536     readLatest(type, uid, [&](const QByteArray &uid, const EntityBuffer &buffer) {
0537         //TODO cache max revision for the duration of the transaction.
0538         callback(d->createApplicationDomainType(type, uid, DataStore::maxRevision(d->getTransaction()), buffer), buffer.operation());
0539     });
0540 }
0541 
0542 void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const ApplicationDomainType &, Sink::Operation)> &callback)
0543 {
0544     readLatest(type, Identifier::fromDisplayByteArray(uid), callback);
0545 }
0546 
0547 ApplicationDomain::ApplicationDomainType EntityStore::readLatest(const QByteArray &type, const QByteArray &uid)
0548 {
0549     ApplicationDomainType dt;
0550     readLatest(type, uid, [&](const ApplicationDomainType &entity) {
0551         dt = *ApplicationDomainType::getInMemoryRepresentation<ApplicationDomainType>(entity, entity.availableProperties());
0552     });
0553     return dt;
0554 }
0555 
0556 void EntityStore::readEntity(const QByteArray &type, const QByteArray &displayKey, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> &callback)
0557 {
0558     const auto key = Key::fromDisplayByteArray(displayKey);
0559     auto db = DataStore::mainDatabase(d->getTransaction(), type);
0560     db.scan(key.revision().toSizeT(),
0561         [=](size_t rev, const QByteArray &value) -> bool {
0562             const auto uid = DataStore::getUidFromRevision(d->transaction, rev);
0563             callback(uid.toDisplayByteArray(), Sink::EntityBuffer(value.data(), value.size()));
0564             return false;
0565         },
0566         [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during readEntity query: " << error.message << key; });
0567 }
0568 
0569 void EntityStore::readEntity(const QByteArray &type, const QByteArray &uid, const std::function<void(const ApplicationDomainType &)> &callback)
0570 {
0571     readEntity(type, uid, [&](const QByteArray &uid, const EntityBuffer &buffer) {
0572         callback(d->createApplicationDomainType(type, uid, DataStore::maxRevision(d->getTransaction()), buffer));
0573     });
0574 }
0575 
0576 ApplicationDomain::ApplicationDomainType EntityStore::readEntity(const QByteArray &type, const QByteArray &uid)
0577 {
0578     ApplicationDomainType dt;
0579     readEntity(type, uid, [&](const ApplicationDomainType &entity) {
0580         dt = *ApplicationDomainType::getInMemoryRepresentation<ApplicationDomainType>(entity, entity.availableProperties());
0581     });
0582     return dt;
0583 }
0584 
0585 
0586 void EntityStore::readAll(const QByteArray &type, const std::function<void(const ApplicationDomainType &entity)> &callback)
0587 {
0588     readAllUids(type, [&] (const QByteArray &uid) {
0589         readLatest(type, Identifier::fromDisplayByteArray(uid), callback);
0590     });
0591 }
0592 
0593 void EntityStore::readRevisions(qint64 baseRevision, const QByteArray &expectedType, const std::function<void(const Key &key)> &callback)
0594 {
0595     qint64 revisionCounter = baseRevision;
0596     const qint64 topRevision = DataStore::maxRevision(d->getTransaction());
0597     // Spit out the revision keys one by one.
0598     while (revisionCounter <= topRevision) {
0599         const auto uid = DataStore::getUidFromRevision(d->getTransaction(), revisionCounter);
0600         const auto type = DataStore::getTypeFromRevision(d->getTransaction(), revisionCounter);
0601         // SinkTrace() << "Revision" << *revisionCounter << type << uid;
0602         Q_ASSERT(!uid.isNull());
0603         Q_ASSERT(!type.isEmpty());
0604         if (type != expectedType) {
0605             // Skip revision
0606             revisionCounter++;
0607             continue;
0608         }
0609         const auto key = Key(uid, revisionCounter);
0610         revisionCounter++;
0611         callback(key);
0612     }
0613 }
0614 
0615 void EntityStore::readPrevious(const QByteArray &type, const Identifier &id, qint64 revision, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> &callback)
0616 {
0617     const auto previousRevisions = DataStore::getRevisionsUntilFromUid(d->getTransaction(), id, revision);
0618     const size_t latestRevision = previousRevisions[previousRevisions.size() - 1];
0619     const auto key = Key(id, latestRevision);
0620     readEntity(type, key.toDisplayByteArray(), callback);
0621 }
0622 
0623 void EntityStore::readPrevious(const QByteArray &type, const Identifier &id, qint64 revision, const std::function<void(const ApplicationDomainType &)> &callback)
0624 {
0625     readPrevious(type, id, revision, [&](const QByteArray &uid, const EntityBuffer &buffer) {
0626         callback(d->createApplicationDomainType(type, uid, DataStore::maxRevision(d->getTransaction()), buffer));
0627     });
0628 }
0629 
0630 ApplicationDomain::ApplicationDomainType EntityStore::readPrevious(const QByteArray &type, const Identifier &id, qint64 revision)
0631 {
0632     ApplicationDomainType dt;
0633     readPrevious(type, id, revision, [&](const ApplicationDomainType &entity) {
0634         dt = *ApplicationDomainType::getInMemoryRepresentation<ApplicationDomainType>(entity, entity.availableProperties());
0635     });
0636     return dt;
0637 }
0638 
0639 void EntityStore::readAllUids(const QByteArray &type, const std::function<void(const QByteArray &uid)> &callback)
0640 {
0641     DataStore::getUids(type, d->getTransaction(), [&] (const Identifier &uid) { callback(uid.toDisplayByteArray()); });
0642 }
0643 
0644 bool EntityStore::contains(const QByteArray & /* type */, const QByteArray &uid)
0645 {
0646     Q_ASSERT(!uid.isEmpty());
0647     return !DataStore::getRevisionsFromUid(d->getTransaction(), Identifier::fromDisplayByteArray(uid)).isEmpty();
0648 }
0649 
0650 bool EntityStore::exists(const QByteArray &type, const QByteArray &uid)
0651 {
0652     bool found = false;
0653     bool alreadyRemoved = false;
0654     const size_t revision = DataStore::getLatestRevisionFromUid(d->getTransaction(), Identifier::fromDisplayByteArray(uid));
0655     DataStore::mainDatabase(d->transaction, type)
0656         .scan(revision,
0657             [&found, &alreadyRemoved](size_t, const QByteArray &data) {
0658                 auto entity = GetEntity(data.data());
0659                 if (entity && entity->metadata()) {
0660                     auto metadata = GetMetadata(entity->metadata()->Data());
0661                     found = true;
0662                     if (metadata->operation() == Operation_Removal) {
0663                         alreadyRemoved = true;
0664                     }
0665                 }
0666                 return true;
0667             },
0668             [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; });
0669     if (!found) {
0670         SinkTraceCtx(d->logCtx) << "Remove: Failed to find entity " << uid;
0671         return false;
0672     }
0673     if (alreadyRemoved) {
0674         SinkTraceCtx(d->logCtx) << "Remove: Entity is already removed " << uid;
0675         return false;
0676     }
0677     return true;
0678 }
0679 
0680 void EntityStore::readRevisions(const QByteArray &type, const QByteArray &uid, size_t startingRevision,
0681     const std::function<void(const QByteArray &uid, qint64 revision, const EntityBuffer &entity)> &callback)
0682 {
0683     Q_ASSERT(d);
0684     Q_ASSERT(!uid.isEmpty());
0685 
0686     const auto revisions = DataStore::getRevisionsFromUid(d->transaction, Identifier::fromDisplayByteArray(uid));
0687 
0688     const auto db = DataStore::mainDatabase(d->transaction, type);
0689 
0690     for (const auto revision : revisions) {
0691         if (revision < static_cast<size_t>(startingRevision)) {
0692             continue;
0693         }
0694 
0695         db.scan(revision,
0696             [&](size_t rev, const QByteArray &value) {
0697                 Q_ASSERT(rev == revision);
0698                 callback(uid, revision, Sink::EntityBuffer(value.data(), value.size()));
0699                 return false;
0700             },
0701             [&](const DataStore::Error &error) {
0702                 SinkWarningCtx(d->logCtx) << "Error while reading: " << error.message;
0703             });
0704     }
0705 }
0706 
0707 qint64 EntityStore::maxRevision()
0708 {
0709     if (!d->exists()) {
0710         SinkTraceCtx(d->logCtx) << "Database is not existing.";
0711         return 0;
0712     }
0713     return DataStore::maxRevision(d->getTransaction());
0714 }
0715 
0716 Sink::Log::Context EntityStore::logContext() const
0717 {
0718     return d->logCtx;
0719 }