File indexing completed on 2024-06-16 05:00:54
0001 /* 0002 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> 0003 * 0004 * This library is free software; you can redistribute it and/or 0005 * modify it under the terms of the GNU Lesser General Public 0006 * License as published by the Free Software Foundation; either 0007 * version 2.1 of the License, or (at your option) version 3, or any 0008 * later version accepted by the membership of KDE e.V. (or its 0009 * successor approved by the membership of KDE e.V.), which shall 0010 * act as a proxy defined in Section 6 of version 3 of the license. 0011 * 0012 * This library is distributed in the hope that it will be useful, 0013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 0014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 0015 * Lesser General Public License for more details. 0016 * 0017 * You should have received a copy of the GNU Lesser General Public 0018 * License along with this library. If not, see <http://www.gnu.org/licenses/>. 0019 */ 0020 #include "entitystore.h" 0021 0022 #include <QDir> 0023 #include <QFile> 0024 0025 #include "entitybuffer.h" 0026 #include "log.h" 0027 #include "typeindex.h" 0028 #include "definitions.h" 0029 #include "resourcecontext.h" 0030 #include "index.h" 0031 #include "bufferutils.h" 0032 #include "entity_generated.h" 0033 #include "typeimplementations.h" 0034 0035 using namespace Sink; 0036 using namespace Sink::Storage; 0037 0038 template <typename T, typename First> 0039 void mergeImpl(T &map, First f) 0040 { 0041 for (auto it = f.constBegin(); it != f.constEnd(); it++) { 0042 map.insert(it.key(), it.value()); 0043 } 0044 } 0045 0046 template <typename T, typename First, typename ... Tail> 0047 void mergeImpl(T &map, First f, Tail ...maps) 0048 { 0049 for (auto it = f.constBegin(); it != f.constEnd(); it++) { 0050 map.insert(it.key(), it.value()); 0051 } 0052 mergeImpl<T, Tail...>(map, maps...); 0053 } 0054 0055 template <typename First, typename ... Tail> 0056 First merge(First f, Tail ...maps) 0057 { 0058 First map; 0059 mergeImpl(map, f, maps...); 0060 return map; 0061 } 0062 0063 template <class T> 0064 struct DbLayoutHelper { 0065 void operator()(QMap<QByteArray, int> map) const { 0066 mergeImpl(map, ApplicationDomain::TypeImplementation<T>::typeDatabases()); 0067 } 0068 }; 0069 0070 static Sink::Storage::DbLayout dbLayout(const QByteArray &instanceId) 0071 { 0072 static auto databases = [] { 0073 QMap<QByteArray, int> map; 0074 mergeImpl(map, ApplicationDomain::TypeImplementation<ApplicationDomain::Mail>::typeDatabases()); 0075 mergeImpl(map, ApplicationDomain::TypeImplementation<ApplicationDomain::Folder>::typeDatabases()); 0076 mergeImpl(map, ApplicationDomain::TypeImplementation<ApplicationDomain::Contact>::typeDatabases()); 0077 mergeImpl(map, ApplicationDomain::TypeImplementation<ApplicationDomain::Addressbook>::typeDatabases()); 0078 mergeImpl(map, ApplicationDomain::TypeImplementation<ApplicationDomain::Calendar>::typeDatabases()); 0079 mergeImpl(map, ApplicationDomain::TypeImplementation<ApplicationDomain::Event>::typeDatabases()); 0080 mergeImpl(map, ApplicationDomain::TypeImplementation<ApplicationDomain::Todo>::typeDatabases()); 0081 return merge(Storage::DataStore::baseDbs(), map); 0082 }(); 0083 return {instanceId, databases}; 0084 } 0085 0086 0087 class EntityStore::Private { 0088 public: 0089 Private(const ResourceContext &context, const Sink::Log::Context &ctx) : resourceContext(context), logCtx(ctx.subContext("entitystore")) 0090 { 0091 } 0092 0093 ResourceContext resourceContext; 0094 DataStore::Transaction transaction; 0095 QHash<QByteArray, QSharedPointer<TypeIndex> > indexByType; 0096 Sink::Log::Context logCtx; 0097 0098 bool exists() 0099 { 0100 return Storage::DataStore::exists(Sink::storageLocation(), resourceContext.instanceId()); 0101 } 0102 0103 DataStore::Transaction &getTransaction() 0104 { 0105 if (transaction) { 0106 return transaction; 0107 } 0108 0109 DataStore store(Sink::storageLocation(), dbLayout(resourceContext.instanceId()), DataStore::ReadOnly); 0110 transaction = store.createTransaction(DataStore::ReadOnly); 0111 return transaction; 0112 } 0113 0114 template <class T> 0115 struct ConfigureHelper { 0116 void operator()(TypeIndex &arg) const { 0117 ApplicationDomain::TypeImplementation<T>::configure(arg); 0118 } 0119 }; 0120 0121 TypeIndex &cachedIndex(const QByteArray &type) 0122 { 0123 if (indexByType.contains(type)) { 0124 return *indexByType.value(type); 0125 } 0126 auto index = QSharedPointer<TypeIndex>::create(type, logCtx); 0127 TypeHelper<ConfigureHelper>{type}.template operator()<void>(*index); 0128 indexByType.insert(type, index); 0129 return *index; 0130 0131 } 0132 0133 TypeIndex &typeIndex(const QByteArray &type) 0134 { 0135 auto &index = cachedIndex(type); 0136 index.mTransaction = &transaction; 0137 return index; 0138 } 0139 0140 ApplicationDomainType createApplicationDomainType(const QByteArray &type, const QByteArray &uid, qint64 revision, const EntityBuffer &buffer) 0141 { 0142 auto adaptor = resourceContext.adaptorFactory(type).createAdaptor(buffer.entity(), &typeIndex(type)); 0143 return ApplicationDomainType{resourceContext.instanceId(), uid, revision, adaptor}; 0144 } 0145 }; 0146 0147 EntityStore::EntityStore(const ResourceContext &context, const Log::Context &ctx) 0148 : d(new EntityStore::Private{context, ctx}) 0149 { 0150 0151 } 0152 0153 void EntityStore::initialize() 0154 { 0155 //This function is only called in the resource code where we want to be able to write to the databse. 0156 0157 //Check for the existience of the db without creating it or the envrionment. 0158 //This is required to be able to set the database version only in the case where we create a new database. 0159 if (!Storage::DataStore::exists(Sink::storageLocation(), d->resourceContext.instanceId())) { 0160 //The first time we open the environment we always want it to be read/write. Otherwise subsequent tries to open a write transaction will fail. 0161 startTransaction(DataStore::ReadWrite); 0162 //Create the database with the correct version if it wasn't existing before 0163 SinkLogCtx(d->logCtx) << "Creating resource database."; 0164 Storage::DataStore::setDatabaseVersion(d->transaction, Sink::latestDatabaseVersion()); 0165 } else { 0166 //The first time we open the environment we always want it to be read/write. Otherwise subsequent tries to open a write transaction will fail. 0167 startTransaction(DataStore::ReadWrite); 0168 } 0169 commitTransaction(); 0170 } 0171 0172 void EntityStore::startTransaction(DataStore::AccessMode accessMode) 0173 { 0174 SinkTraceCtx(d->logCtx) << "Starting transaction: " << accessMode; 0175 Q_ASSERT(!d->transaction); 0176 d->transaction = DataStore(Sink::storageLocation(), dbLayout(d->resourceContext.instanceId()), accessMode).createTransaction(accessMode); 0177 } 0178 0179 void EntityStore::commitTransaction() 0180 { 0181 SinkTraceCtx(d->logCtx) << "Committing transaction"; 0182 0183 for (const auto &type : d->indexByType.keys()) { 0184 d->typeIndex(type).commitTransaction(); 0185 } 0186 0187 Q_ASSERT(d->transaction); 0188 d->transaction.commit(); 0189 d->transaction = {}; 0190 } 0191 0192 void EntityStore::abortTransaction() 0193 { 0194 SinkTraceCtx(d->logCtx) << "Aborting transaction"; 0195 d->transaction.abort(); 0196 d->transaction = {}; 0197 } 0198 0199 bool EntityStore::hasTransaction() const 0200 { 0201 return d->transaction; 0202 } 0203 0204 bool EntityStore::add(const QByteArray &type, ApplicationDomainType entity, bool replayToSource) 0205 { 0206 if (entity.identifier().isEmpty()) { 0207 SinkWarningCtx(d->logCtx) << "Can't write entity with an empty identifier"; 0208 return false; 0209 } 0210 0211 SinkTraceCtx(d->logCtx) << "New entity " << entity; 0212 0213 const auto identifier = Identifier::fromDisplayByteArray(entity.identifier()); 0214 0215 d->typeIndex(type).add(identifier, entity, d->transaction, d->resourceContext.instanceId()); 0216 0217 //The maxRevision may have changed meanwhile if the entity created sub-entities 0218 const qint64 newRevision = maxRevision() + 1; 0219 0220 // Add metadata buffer 0221 flatbuffers::FlatBufferBuilder metadataFbb; 0222 auto metadataBuilder = MetadataBuilder(metadataFbb); 0223 metadataBuilder.add_revision(newRevision); 0224 metadataBuilder.add_operation(Operation_Creation); 0225 metadataBuilder.add_replayToSource(replayToSource); 0226 auto metadataBuffer = metadataBuilder.Finish(); 0227 FinishMetadataBuffer(metadataFbb, metadataBuffer); 0228 0229 flatbuffers::FlatBufferBuilder fbb; 0230 d->resourceContext.adaptorFactory(type).createBuffer(entity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 0231 0232 const auto key = Key(identifier, newRevision); 0233 0234 DataStore::mainDatabase(d->transaction, type) 0235 .write(newRevision, BufferUtils::extractBuffer(fbb), 0236 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << entity.identifier() << newRevision; }); 0237 0238 DataStore::setMaxRevision(d->transaction, newRevision); 0239 DataStore::recordRevision(d->transaction, newRevision, identifier, type); 0240 DataStore::recordUid(d->transaction, identifier, type); 0241 SinkTraceCtx(d->logCtx) << "Wrote entity: " << key << "of type:" << type; 0242 return true; 0243 } 0244 0245 ApplicationDomain::ApplicationDomainType EntityStore::applyDiff(const QByteArray &type, const ApplicationDomainType ¤t, const ApplicationDomainType &diff, const QByteArrayList &deletions, const QSet<QByteArray> &excludeProperties) const 0246 { 0247 SinkTraceCtx(d->logCtx) << "Applying diff: " << current.availableProperties() << "Deletions: " << deletions << "Changeset: " << diff.changedProperties() << "Excluded: " << excludeProperties; 0248 auto newEntity = *ApplicationDomainType::getInMemoryRepresentation<ApplicationDomainType>(current, current.availableProperties()); 0249 0250 // Apply diff 0251 for (const auto &property : diff.changedProperties()) { 0252 if (!excludeProperties.contains(property)) { 0253 const auto value = diff.getProperty(property); 0254 if (value.isValid()) { 0255 newEntity.setProperty(property, value); 0256 } 0257 } 0258 } 0259 0260 // Remove deletions 0261 for (const auto &property : deletions) { 0262 if (!excludeProperties.contains(property)) { 0263 newEntity.setProperty(property, QVariant()); 0264 } 0265 } 0266 return newEntity; 0267 } 0268 0269 bool EntityStore::modify(const QByteArray &type, const ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource) 0270 { 0271 const auto current = readLatest(type, diff.identifier()); 0272 if (current.identifier().isEmpty()) { 0273 SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); 0274 return false; 0275 } 0276 0277 auto newEntity = applyDiff(type, current, diff, deletions); 0278 return modify(type, current, newEntity, replayToSource); 0279 } 0280 0281 bool EntityStore::modify(const QByteArray &type, const ApplicationDomainType ¤t, ApplicationDomainType newEntity, bool replayToSource) 0282 { 0283 SinkTraceCtx(d->logCtx) << "Modified entity: " << newEntity; 0284 //This should not normally happen and is possibly a client defect 0285 if (newEntity.changedProperties().isEmpty()) { 0286 SinkWarningCtx(d->logCtx) << "Attempted an empty modification: " << newEntity; 0287 return false; 0288 } 0289 0290 const auto identifier = Identifier::fromDisplayByteArray(newEntity.identifier()); 0291 d->typeIndex(type).modify(identifier, current, newEntity, d->transaction, d->resourceContext.instanceId()); 0292 0293 const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; 0294 0295 // Add metadata buffer 0296 flatbuffers::FlatBufferBuilder metadataFbb; 0297 { 0298 //We add availableProperties to account for the properties that have been changed by the preprocessors 0299 auto modifiedProperties = BufferUtils::toVector(metadataFbb, newEntity.changedProperties()); 0300 auto metadataBuilder = MetadataBuilder(metadataFbb); 0301 metadataBuilder.add_revision(newRevision); 0302 metadataBuilder.add_operation(Operation_Modification); 0303 metadataBuilder.add_replayToSource(replayToSource); 0304 metadataBuilder.add_modifiedProperties(modifiedProperties); 0305 auto metadataBuffer = metadataBuilder.Finish(); 0306 FinishMetadataBuffer(metadataFbb, metadataBuffer); 0307 } 0308 SinkTraceCtx(d->logCtx) << "Changed properties: " << newEntity.changedProperties(); 0309 0310 newEntity.setChangedProperties(newEntity.availableProperties().toSet()); 0311 0312 flatbuffers::FlatBufferBuilder fbb; 0313 d->resourceContext.adaptorFactory(type).createBuffer(newEntity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 0314 0315 DataStore::mainDatabase(d->transaction, type) 0316 .write(newRevision, BufferUtils::extractBuffer(fbb), 0317 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << newEntity.identifier() << newRevision; }); 0318 0319 DataStore::setMaxRevision(d->transaction, newRevision); 0320 DataStore::recordRevision(d->transaction, newRevision, identifier, type); 0321 SinkTraceCtx(d->logCtx) << "Wrote modified entity: " << newEntity.identifier() << type << newRevision; 0322 return true; 0323 } 0324 0325 bool EntityStore::remove(const QByteArray &type, const ApplicationDomainType ¤t, bool replayToSource) 0326 { 0327 const auto uid = current.identifier(); 0328 if (!exists(type, uid)) { 0329 SinkWarningCtx(d->logCtx) << "Remove: Entity is already removed " << uid; 0330 return false; 0331 } 0332 const auto identifier = Identifier::fromDisplayByteArray(uid); 0333 d->typeIndex(type).remove(identifier, current, d->transaction, d->resourceContext.instanceId()); 0334 0335 SinkTraceCtx(d->logCtx) << "Removed entity " << current; 0336 0337 const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; 0338 0339 // Add metadata buffer 0340 flatbuffers::FlatBufferBuilder metadataFbb; 0341 auto metadataBuilder = MetadataBuilder(metadataFbb); 0342 metadataBuilder.add_revision(newRevision); 0343 metadataBuilder.add_operation(Operation_Removal); 0344 metadataBuilder.add_replayToSource(replayToSource); 0345 auto metadataBuffer = metadataBuilder.Finish(); 0346 FinishMetadataBuffer(metadataFbb, metadataBuffer); 0347 0348 flatbuffers::FlatBufferBuilder fbb; 0349 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); 0350 0351 DataStore::mainDatabase(d->transaction, type) 0352 .write(newRevision, BufferUtils::extractBuffer(fbb), 0353 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << uid << newRevision; }); 0354 0355 DataStore::setMaxRevision(d->transaction, newRevision); 0356 DataStore::recordRevision(d->transaction, newRevision, identifier, type); 0357 DataStore::removeUid(d->transaction, identifier, type); 0358 return true; 0359 } 0360 0361 void EntityStore::cleanupEntityRevisionsUntil(qint64 revision) 0362 { 0363 const auto internalUid = DataStore::getUidFromRevision(d->transaction, revision); 0364 const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision); 0365 if (bufferType.isEmpty() || internalUid.isNull()) { 0366 SinkErrorCtx(d->logCtx) << "Failed to find revision during cleanup: " << revision; 0367 Q_ASSERT(false); 0368 return; 0369 } 0370 SinkTraceCtx(d->logCtx) << "Cleaning up revision " << revision << internalUid << bufferType; 0371 0372 // Remove old revisions 0373 const auto revisionsToRemove = DataStore::getRevisionsUntilFromUid(d->transaction, internalUid, revision); 0374 0375 for (const auto &revisionToRemove : revisionsToRemove) { 0376 DataStore::removeRevision(d->transaction, revisionToRemove); 0377 DataStore::mainDatabase(d->transaction, bufferType).remove(revisionToRemove); 0378 } 0379 0380 // And remove the specified revision only if marked for removal 0381 DataStore::mainDatabase(d->transaction, bufferType).scan(revision, [&](size_t, const QByteArray &data) { 0382 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 0383 if (!buffer.isValid()) { 0384 SinkWarningCtx(d->logCtx) << "Read invalid buffer from disk"; 0385 return false; 0386 } 0387 0388 const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); 0389 if (metadata->operation() == Operation_Removal) { 0390 DataStore::removeRevision(d->transaction, revision); 0391 DataStore::mainDatabase(d->transaction, bufferType).remove(revision); 0392 } 0393 0394 return false; 0395 }); 0396 0397 DataStore::setCleanedUpRevision(d->transaction, revision); 0398 } 0399 0400 bool EntityStore::cleanupRevisions(qint64 revision) 0401 { 0402 Q_ASSERT(d->exists()); 0403 bool implicitTransaction = false; 0404 if (!d->transaction) { 0405 startTransaction(DataStore::ReadWrite); 0406 Q_ASSERT(d->transaction); 0407 implicitTransaction = true; 0408 } 0409 const auto lastCleanRevision = DataStore::cleanedUpRevision(d->transaction); 0410 const auto firstRevisionToCleanup = lastCleanRevision + 1; 0411 bool cleanupIsNecessary = firstRevisionToCleanup <= revision; 0412 if (cleanupIsNecessary) { 0413 SinkTraceCtx(d->logCtx) << "Cleaning up from " << firstRevisionToCleanup << " to " << revision; 0414 for (qint64 rev = firstRevisionToCleanup; rev <= revision; rev++) { 0415 cleanupEntityRevisionsUntil(rev); 0416 } 0417 } 0418 if (implicitTransaction) { 0419 commitTransaction(); 0420 } 0421 return cleanupIsNecessary; 0422 } 0423 0424 qint64 EntityStore::lastCleanRevision() 0425 { 0426 if (!d->exists()) { 0427 return 0; 0428 } 0429 bool implicitTransaction = false; 0430 if (!d->transaction) { 0431 startTransaction(DataStore::ReadOnly); 0432 Q_ASSERT(d->transaction); 0433 implicitTransaction = true; 0434 } 0435 const auto lastCleanRevision = DataStore::cleanedUpRevision(d->transaction); 0436 if (implicitTransaction) { 0437 abortTransaction(); 0438 } 0439 return lastCleanRevision; 0440 } 0441 0442 0443 QVector<Identifier> EntityStore::fullScan(const QByteArray &type) 0444 { 0445 SinkTraceCtx(d->logCtx) << "Looking for : " << type; 0446 if (!d->exists()) { 0447 SinkTraceCtx(d->logCtx) << "Database is not existing: " << type; 0448 return {}; 0449 } 0450 0451 QSet<Identifier> keys; 0452 0453 DataStore::getUids(type, d->getTransaction(), [&keys] (const Identifier &id) { 0454 keys << id; 0455 }); 0456 0457 SinkTraceCtx(d->logCtx) << "Full scan retrieved " << keys.size() << " results."; 0458 return keys.toList().toVector(); 0459 } 0460 0461 QVector<Identifier> EntityStore::indexLookup(const QByteArray &type, const QueryBase &query, QSet<QByteArrayList> &appliedFilters, QByteArray &appliedSorting) 0462 { 0463 if (!d->exists()) { 0464 SinkTraceCtx(d->logCtx) << "Database is not existing: " << type; 0465 return {}; 0466 } 0467 return d->typeIndex(type).query(query, appliedFilters, appliedSorting, d->getTransaction(), d->resourceContext.instanceId()); 0468 } 0469 0470 QVector<Identifier> EntityStore::indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value, const QVector<Sink::Storage::Identifier> &filter) 0471 { 0472 if (!d->exists()) { 0473 SinkTraceCtx(d->logCtx) << "Database is not existing: " << type; 0474 return {}; 0475 } 0476 return d->typeIndex(type).lookup(property, value, d->getTransaction(), d->resourceContext.instanceId(), filter); 0477 } 0478 0479 void EntityStore::indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value, const std::function<void(const QByteArray &uid)> &callback) 0480 { 0481 if (!d->exists()) { 0482 SinkTraceCtx(d->logCtx) << "Database is not existing: " << type; 0483 return; 0484 } 0485 const auto list = indexLookup(type, property, value, QVector<Sink::Storage::Identifier>{}); 0486 for (const auto &id : list) { 0487 callback(id.toDisplayByteArray()); 0488 } 0489 /* Index index(type + ".index." + property, d->transaction); */ 0490 /* index.lookup(value, [&](const QByteArray &sinkId) { */ 0491 /* callback(sinkId); */ 0492 /* }, */ 0493 /* [&](const Index::Error &error) { */ 0494 /* SinkWarningCtx(d->logCtx) << "Error in index: " << error.message << property; */ 0495 /* }); */ 0496 } 0497 0498 void EntityStore::readLatest(const QByteArray &type, const Identifier &id, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> &callback) 0499 { 0500 Q_ASSERT(d); 0501 const size_t revision = DataStore::getLatestRevisionFromUid(d->getTransaction(), id); 0502 if (!revision) { 0503 //This is not an error. We rely on this when looking for an id in resource that don't have it. 0504 SinkTraceCtx(d->logCtx) << "Failed to readLatest: " << type << id; 0505 return; 0506 } 0507 auto db = DataStore::mainDatabase(d->getTransaction(), type); 0508 db.scan(revision, 0509 [=](size_t, const QByteArray &value) { 0510 callback(id.toDisplayByteArray(), Sink::EntityBuffer(value.data(), value.size())); 0511 return false; 0512 }, 0513 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during readLatest query: " << error.message << id; }); 0514 } 0515 0516 void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> &callback) 0517 { 0518 readLatest(type, Identifier::fromDisplayByteArray(uid), callback); 0519 } 0520 0521 void EntityStore::readLatest(const QByteArray &type, const Identifier &uid, const std::function<void(const ApplicationDomainType &)> &callback) 0522 { 0523 readLatest(type, uid, [&](const QByteArray &uid, const EntityBuffer &buffer) { 0524 //TODO cache max revision for the duration of the transaction. 0525 callback(d->createApplicationDomainType(type, uid, DataStore::maxRevision(d->getTransaction()), buffer)); 0526 }); 0527 } 0528 0529 void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const ApplicationDomainType &)> &callback) 0530 { 0531 readLatest(type, Identifier::fromDisplayByteArray(uid), callback); 0532 } 0533 0534 void EntityStore::readLatest(const QByteArray &type, const Identifier &uid, const std::function<void(const ApplicationDomainType &, Sink::Operation)> &callback) 0535 { 0536 readLatest(type, uid, [&](const QByteArray &uid, const EntityBuffer &buffer) { 0537 //TODO cache max revision for the duration of the transaction. 0538 callback(d->createApplicationDomainType(type, uid, DataStore::maxRevision(d->getTransaction()), buffer), buffer.operation()); 0539 }); 0540 } 0541 0542 void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const ApplicationDomainType &, Sink::Operation)> &callback) 0543 { 0544 readLatest(type, Identifier::fromDisplayByteArray(uid), callback); 0545 } 0546 0547 ApplicationDomain::ApplicationDomainType EntityStore::readLatest(const QByteArray &type, const QByteArray &uid) 0548 { 0549 ApplicationDomainType dt; 0550 readLatest(type, uid, [&](const ApplicationDomainType &entity) { 0551 dt = *ApplicationDomainType::getInMemoryRepresentation<ApplicationDomainType>(entity, entity.availableProperties()); 0552 }); 0553 return dt; 0554 } 0555 0556 void EntityStore::readEntity(const QByteArray &type, const QByteArray &displayKey, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> &callback) 0557 { 0558 const auto key = Key::fromDisplayByteArray(displayKey); 0559 auto db = DataStore::mainDatabase(d->getTransaction(), type); 0560 db.scan(key.revision().toSizeT(), 0561 [=](size_t rev, const QByteArray &value) -> bool { 0562 const auto uid = DataStore::getUidFromRevision(d->transaction, rev); 0563 callback(uid.toDisplayByteArray(), Sink::EntityBuffer(value.data(), value.size())); 0564 return false; 0565 }, 0566 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during readEntity query: " << error.message << key; }); 0567 } 0568 0569 void EntityStore::readEntity(const QByteArray &type, const QByteArray &uid, const std::function<void(const ApplicationDomainType &)> &callback) 0570 { 0571 readEntity(type, uid, [&](const QByteArray &uid, const EntityBuffer &buffer) { 0572 callback(d->createApplicationDomainType(type, uid, DataStore::maxRevision(d->getTransaction()), buffer)); 0573 }); 0574 } 0575 0576 ApplicationDomain::ApplicationDomainType EntityStore::readEntity(const QByteArray &type, const QByteArray &uid) 0577 { 0578 ApplicationDomainType dt; 0579 readEntity(type, uid, [&](const ApplicationDomainType &entity) { 0580 dt = *ApplicationDomainType::getInMemoryRepresentation<ApplicationDomainType>(entity, entity.availableProperties()); 0581 }); 0582 return dt; 0583 } 0584 0585 0586 void EntityStore::readAll(const QByteArray &type, const std::function<void(const ApplicationDomainType &entity)> &callback) 0587 { 0588 readAllUids(type, [&] (const QByteArray &uid) { 0589 readLatest(type, Identifier::fromDisplayByteArray(uid), callback); 0590 }); 0591 } 0592 0593 void EntityStore::readRevisions(qint64 baseRevision, const QByteArray &expectedType, const std::function<void(const Key &key)> &callback) 0594 { 0595 qint64 revisionCounter = baseRevision; 0596 const qint64 topRevision = DataStore::maxRevision(d->getTransaction()); 0597 // Spit out the revision keys one by one. 0598 while (revisionCounter <= topRevision) { 0599 const auto uid = DataStore::getUidFromRevision(d->getTransaction(), revisionCounter); 0600 const auto type = DataStore::getTypeFromRevision(d->getTransaction(), revisionCounter); 0601 // SinkTrace() << "Revision" << *revisionCounter << type << uid; 0602 Q_ASSERT(!uid.isNull()); 0603 Q_ASSERT(!type.isEmpty()); 0604 if (type != expectedType) { 0605 // Skip revision 0606 revisionCounter++; 0607 continue; 0608 } 0609 const auto key = Key(uid, revisionCounter); 0610 revisionCounter++; 0611 callback(key); 0612 } 0613 } 0614 0615 void EntityStore::readPrevious(const QByteArray &type, const Identifier &id, qint64 revision, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> &callback) 0616 { 0617 const auto previousRevisions = DataStore::getRevisionsUntilFromUid(d->getTransaction(), id, revision); 0618 const size_t latestRevision = previousRevisions[previousRevisions.size() - 1]; 0619 const auto key = Key(id, latestRevision); 0620 readEntity(type, key.toDisplayByteArray(), callback); 0621 } 0622 0623 void EntityStore::readPrevious(const QByteArray &type, const Identifier &id, qint64 revision, const std::function<void(const ApplicationDomainType &)> &callback) 0624 { 0625 readPrevious(type, id, revision, [&](const QByteArray &uid, const EntityBuffer &buffer) { 0626 callback(d->createApplicationDomainType(type, uid, DataStore::maxRevision(d->getTransaction()), buffer)); 0627 }); 0628 } 0629 0630 ApplicationDomain::ApplicationDomainType EntityStore::readPrevious(const QByteArray &type, const Identifier &id, qint64 revision) 0631 { 0632 ApplicationDomainType dt; 0633 readPrevious(type, id, revision, [&](const ApplicationDomainType &entity) { 0634 dt = *ApplicationDomainType::getInMemoryRepresentation<ApplicationDomainType>(entity, entity.availableProperties()); 0635 }); 0636 return dt; 0637 } 0638 0639 void EntityStore::readAllUids(const QByteArray &type, const std::function<void(const QByteArray &uid)> &callback) 0640 { 0641 DataStore::getUids(type, d->getTransaction(), [&] (const Identifier &uid) { callback(uid.toDisplayByteArray()); }); 0642 } 0643 0644 bool EntityStore::contains(const QByteArray & /* type */, const QByteArray &uid) 0645 { 0646 Q_ASSERT(!uid.isEmpty()); 0647 return !DataStore::getRevisionsFromUid(d->getTransaction(), Identifier::fromDisplayByteArray(uid)).isEmpty(); 0648 } 0649 0650 bool EntityStore::exists(const QByteArray &type, const QByteArray &uid) 0651 { 0652 bool found = false; 0653 bool alreadyRemoved = false; 0654 const size_t revision = DataStore::getLatestRevisionFromUid(d->getTransaction(), Identifier::fromDisplayByteArray(uid)); 0655 DataStore::mainDatabase(d->transaction, type) 0656 .scan(revision, 0657 [&found, &alreadyRemoved](size_t, const QByteArray &data) { 0658 auto entity = GetEntity(data.data()); 0659 if (entity && entity->metadata()) { 0660 auto metadata = GetMetadata(entity->metadata()->Data()); 0661 found = true; 0662 if (metadata->operation() == Operation_Removal) { 0663 alreadyRemoved = true; 0664 } 0665 } 0666 return true; 0667 }, 0668 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; }); 0669 if (!found) { 0670 SinkTraceCtx(d->logCtx) << "Remove: Failed to find entity " << uid; 0671 return false; 0672 } 0673 if (alreadyRemoved) { 0674 SinkTraceCtx(d->logCtx) << "Remove: Entity is already removed " << uid; 0675 return false; 0676 } 0677 return true; 0678 } 0679 0680 void EntityStore::readRevisions(const QByteArray &type, const QByteArray &uid, size_t startingRevision, 0681 const std::function<void(const QByteArray &uid, qint64 revision, const EntityBuffer &entity)> &callback) 0682 { 0683 Q_ASSERT(d); 0684 Q_ASSERT(!uid.isEmpty()); 0685 0686 const auto revisions = DataStore::getRevisionsFromUid(d->transaction, Identifier::fromDisplayByteArray(uid)); 0687 0688 const auto db = DataStore::mainDatabase(d->transaction, type); 0689 0690 for (const auto revision : revisions) { 0691 if (revision < static_cast<size_t>(startingRevision)) { 0692 continue; 0693 } 0694 0695 db.scan(revision, 0696 [&](size_t rev, const QByteArray &value) { 0697 Q_ASSERT(rev == revision); 0698 callback(uid, revision, Sink::EntityBuffer(value.data(), value.size())); 0699 return false; 0700 }, 0701 [&](const DataStore::Error &error) { 0702 SinkWarningCtx(d->logCtx) << "Error while reading: " << error.message; 0703 }); 0704 } 0705 } 0706 0707 qint64 EntityStore::maxRevision() 0708 { 0709 if (!d->exists()) { 0710 SinkTraceCtx(d->logCtx) << "Database is not existing."; 0711 return 0; 0712 } 0713 return DataStore::maxRevision(d->getTransaction()); 0714 } 0715 0716 Sink::Log::Context EntityStore::logContext() const 0717 { 0718 return d->logCtx; 0719 }