// File indexing completed on 2024-05-12 05:26:01
0001 /* 0002 * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org> 0003 * Copyright (C) 2015 Christian Mollekopf <mollekopf@kolabsys.com> 0004 * 0005 * This library is free software; you can redistribute it and/or 0006 * modify it under the terms of the GNU Lesser General Public 0007 * License as published by the Free Software Foundation; either 0008 * version 2.1 of the License, or (at your option) version 3, or any 0009 * later version accepted by the membership of KDE e.V. (or its 0010 * successor approved by the membership of KDE e.V.), which shall 0011 * act as a proxy defined in Section 6 of version 3 of the license. 0012 * 0013 * This library is distributed in the hope that it will be useful, 0014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 0015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 0016 * Lesser General Public License for more details. 0017 * 0018 * You should have received a copy of the GNU Lesser General Public 0019 * License along with this library. If not, see <http://www.gnu.org/licenses/>. 
0020 */ 0021 0022 #include "pipeline.h" 0023 0024 #include <QByteArray> 0025 #include <QVector> 0026 #include <QDebug> 0027 #include <QTime> 0028 #include "entity_generated.h" 0029 #include "metadata_generated.h" 0030 #include "createentity_generated.h" 0031 #include "modifyentity_generated.h" 0032 #include "deleteentity_generated.h" 0033 #include "entitybuffer.h" 0034 #include "log.h" 0035 #include "domain/applicationdomaintype.h" 0036 #include "adaptorfactoryregistry.h" 0037 #include "definitions.h" 0038 #include "bufferutils.h" 0039 #include "storage/entitystore.h" 0040 #include "store.h" 0041 0042 using namespace Sink; 0043 using namespace Sink::Storage; 0044 0045 class Pipeline::Private 0046 { 0047 public: 0048 Private(const ResourceContext &context, const Sink::Log::Context &ctx) : logCtx{ctx.subContext("pipeline")}, resourceContext(context), entityStore(context, ctx), revisionChanged(false) 0049 { 0050 } 0051 0052 Sink::Log::Context logCtx; 0053 ResourceContext resourceContext; 0054 Storage::EntityStore entityStore; 0055 QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors; 0056 bool revisionChanged; 0057 QTime transactionTime; 0058 int transactionItemCount; 0059 }; 0060 0061 0062 Pipeline::Pipeline(const ResourceContext &context, const Sink::Log::Context &ctx) : QObject(nullptr), d(new Private(context, ctx)) 0063 { 0064 //Create main store immediately on first start 0065 d->entityStore.initialize(); 0066 } 0067 0068 Pipeline::~Pipeline() 0069 { 0070 } 0071 0072 void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) 0073 { 0074 auto &list = d->processors[entityType]; 0075 list.clear(); 0076 for (auto p : processors) { 0077 p->setup(d->resourceContext.resourceType, d->resourceContext.instanceId(), this, &d->entityStore); 0078 list.append(QSharedPointer<Preprocessor>(p)); 0079 } 0080 } 0081 0082 void Pipeline::startTransaction() 0083 { 0084 // TODO call for all types 0085 // But avoid doing it during 
cleanup 0086 // for (auto processor : d->processors[bufferType]) { 0087 // processor->startBatch(); 0088 // } 0089 SinkTraceCtx(d->logCtx) << "Starting transaction."; 0090 d->transactionTime.start(); 0091 d->transactionItemCount = 0; 0092 d->entityStore.startTransaction(DataStore::ReadWrite); 0093 } 0094 0095 void Pipeline::commit() 0096 { 0097 // TODO call for all types 0098 // But avoid doing it during cleanup 0099 // for (auto processor : d->processors[bufferType]) { 0100 // processor->finalize(); 0101 // } 0102 if (!d->revisionChanged) { 0103 d->entityStore.abortTransaction(); 0104 return; 0105 } 0106 const auto revision = d->entityStore.maxRevision(); 0107 d->entityStore.commitTransaction(); 0108 const auto elapsed = d->transactionTime.elapsed(); 0109 SinkTraceCtx(d->logCtx) << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " 0110 << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; 0111 if (d->revisionChanged) { 0112 d->revisionChanged = false; 0113 emit revisionUpdated(revision); 0114 } 0115 } 0116 0117 KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) 0118 { 0119 d->transactionItemCount++; 0120 0121 { 0122 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 0123 if (!Commands::VerifyCreateEntityBuffer(verifyer)) { 0124 SinkWarningCtx(d->logCtx) << "invalid buffer, not a create entity buffer"; 0125 return KAsync::error<qint64>(); 0126 } 0127 } 0128 auto createEntity = Commands::GetCreateEntity(command); 0129 0130 const bool replayToSource = createEntity->replayToSource(); 0131 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(createEntity->domainType()->Data()), createEntity->domainType()->size()); 0132 QByteArray key; 0133 if (createEntity->entityId()) { 0134 key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); 0135 if 
(!key.isEmpty() && d->entityStore.contains(bufferType, key)) { 0136 SinkErrorCtx(d->logCtx) << "An entity with this id already exists: " << key; 0137 return KAsync::error<qint64>(); 0138 } 0139 } 0140 0141 if (key.isEmpty()) { 0142 key = DataStore::generateUid(); 0143 } 0144 SinkTraceCtx(d->logCtx) << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 0145 Q_ASSERT(!key.isEmpty()); 0146 0147 { 0148 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); 0149 if (!VerifyEntityBuffer(verifyer)) { 0150 SinkWarningCtx(d->logCtx) << "invalid buffer, not an entity buffer"; 0151 return KAsync::error<qint64>(); 0152 } 0153 } 0154 auto entity = GetEntity(createEntity->delta()->Data()); 0155 if (!entity->resource()->size() && !entity->local()->size()) { 0156 SinkWarningCtx(d->logCtx) << "No local and no resource buffer while trying to create entity."; 0157 return KAsync::error<qint64>(); 0158 } 0159 0160 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); 0161 if (!adaptorFactory) { 0162 SinkWarningCtx(d->logCtx) << "no adaptor factory for type " << bufferType << d->resourceContext.resourceType; 0163 return KAsync::error<qint64>(); 0164 } 0165 0166 auto adaptor = adaptorFactory->createAdaptor(*entity); 0167 auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(); 0168 Sink::ApplicationDomain::copyBuffer(*adaptor, *memoryAdaptor); 0169 0170 d->revisionChanged = true; 0171 auto revision = d->entityStore.maxRevision(); 0172 auto o = Sink::ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), key, revision, memoryAdaptor}; 0173 o.setChangedProperties(o.availableProperties().toSet()); 0174 0175 auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(o, 
o.availableProperties()); 0176 newEntity.setChangedProperties(newEntity.availableProperties().toSet()); 0177 0178 foreach (const auto &processor, d->processors[bufferType]) { 0179 processor->newEntity(newEntity); 0180 } 0181 0182 if (!d->entityStore.add(bufferType, newEntity, replayToSource)) { 0183 return KAsync::error<qint64>(); 0184 } 0185 0186 return KAsync::value(d->entityStore.maxRevision()); 0187 } 0188 0189 template <class T> 0190 struct CreateHelper { 0191 KAsync::Job<void> operator()(const ApplicationDomain::ApplicationDomainType &arg) const { 0192 return Sink::Store::create<T>(T{arg}); 0193 } 0194 }; 0195 0196 static KAsync::Job<void> create(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &newEntity) 0197 { 0198 return TypeHelper<CreateHelper>{type}.operator()<KAsync::Job<void>, const ApplicationDomain::ApplicationDomainType&>(newEntity); 0199 } 0200 0201 KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) 0202 { 0203 d->transactionItemCount++; 0204 0205 { 0206 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 0207 if (!Commands::VerifyModifyEntityBuffer(verifyer)) { 0208 SinkWarningCtx(d->logCtx) << "invalid buffer, not a modify entity buffer"; 0209 return KAsync::error<qint64>(); 0210 } 0211 } 0212 auto modifyEntity = Commands::GetModifyEntity(command); 0213 Q_ASSERT(modifyEntity); 0214 QList<QByteArray> changeset; 0215 if (modifyEntity->modifiedProperties()) { 0216 changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties()); 0217 } else { 0218 SinkWarningCtx(d->logCtx) << "No changeset available"; 0219 } 0220 const qint64 baseRevision = modifyEntity->revision(); 0221 const bool replayToSource = modifyEntity->replayToSource(); 0222 0223 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); 0224 const QByteArray key = QByteArray(reinterpret_cast<char const 
*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); 0225 SinkTraceCtx(d->logCtx) << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 0226 if (bufferType.isEmpty() || key.isEmpty()) { 0227 SinkWarningCtx(d->logCtx) << "entity type or key " << bufferType << key; 0228 return KAsync::error<qint64>(); 0229 } 0230 { 0231 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); 0232 if (!VerifyEntityBuffer(verifyer)) { 0233 SinkWarningCtx(d->logCtx) << "invalid buffer, not an entity buffer"; 0234 return KAsync::error<qint64>(); 0235 } 0236 } 0237 0238 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); 0239 if (!adaptorFactory) { 0240 SinkWarningCtx(d->logCtx) << "no adaptor factory for type " << bufferType; 0241 return KAsync::error<qint64>(); 0242 } 0243 0244 auto diffEntity = GetEntity(modifyEntity->delta()->Data()); 0245 Q_ASSERT(diffEntity); 0246 Sink::ApplicationDomain::ApplicationDomainType diff{d->resourceContext.instanceId(), key, baseRevision, adaptorFactory->createAdaptor(*diffEntity)}; 0247 diff.setChangedProperties(changeset.toSet()); 0248 0249 QByteArrayList deletions; 0250 if (modifyEntity->deletions()) { 0251 deletions = BufferUtils::fromVector(*modifyEntity->deletions()); 0252 } 0253 0254 Sink::ApplicationDomain::ApplicationDomainType current; 0255 bool alreadyRemoved = false; 0256 0257 d->entityStore.readLatest(bufferType, diff.identifier(), [&](const QByteArray &uid, const EntityBuffer &buffer) { 0258 if (buffer.operation() == Sink::Operation_Removal) { 0259 alreadyRemoved = true; 0260 } else { 0261 auto entity = Sink::ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), key, baseRevision, adaptorFactory->createAdaptor(buffer.entity())}; 0262 current = 
*Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<Sink::ApplicationDomain::ApplicationDomainType>(entity, entity.availableProperties()); 0263 } 0264 }); 0265 0266 if (alreadyRemoved) { 0267 SinkWarningCtx(d->logCtx) << "Tried to modify a removed entity: " << diff.identifier(); 0268 return KAsync::error<qint64>(); 0269 } 0270 0271 if (current.identifier().isEmpty()) { 0272 SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); 0273 return KAsync::error<qint64>(); 0274 } 0275 0276 //We avoid overwriting local changes that haven't been played back yet with remote modifications 0277 QSet<QByteArray> excludeProperties; 0278 if (!replayToSource) { //We assume this means the change is coming from the source already 0279 d->entityStore.readRevisions(bufferType, diff.identifier(), baseRevision, [&] (const QByteArray &uid, qint64 revision, const Sink::EntityBuffer &entity) { 0280 if (entity.metadataBuffer()) { 0281 if (auto metadata = GetMetadata(entity.metadataBuffer())) { 0282 if (metadata->operation() == Operation_Modification && metadata->modifiedProperties()) { 0283 const auto locallyModifiedProperties = BufferUtils::fromVector(*metadata->modifiedProperties()).toSet(); 0284 SinkTraceCtx(d->logCtx) << "Protecting locally modified properties: " << locallyModifiedProperties; 0285 excludeProperties += locallyModifiedProperties; 0286 } 0287 } 0288 } 0289 }); 0290 } 0291 0292 auto newEntity = d->entityStore.applyDiff(bufferType, current, diff, deletions, excludeProperties); 0293 0294 bool isMove = false; 0295 if (modifyEntity->targetResource()) { 0296 isMove = modifyEntity->removeEntity(); 0297 newEntity.setResource(BufferUtils::extractBuffer(modifyEntity->targetResource())); 0298 } 0299 0300 foreach (const auto &processor, d->processors[bufferType]) { 0301 bool exitLoop = false; 0302 const auto result = processor->process(Preprocessor::Modification, current, newEntity); 0303 switch (result.action) { 0304 case 
Preprocessor::MoveToResource: 0305 isMove = true; 0306 exitLoop = true; 0307 break; 0308 case Preprocessor::CopyToResource: 0309 isMove = true; 0310 exitLoop = true; 0311 break; 0312 case Preprocessor::DropModification: 0313 SinkTraceCtx(d->logCtx) << "Dropping modification"; 0314 return KAsync::error<qint64>(); 0315 case Preprocessor::NoAction: 0316 case Preprocessor::DeleteEntity: 0317 default: 0318 break; 0319 } 0320 if (exitLoop) { 0321 break; 0322 } 0323 } 0324 0325 //The entity is either being copied or moved 0326 if (newEntity.resourceInstanceIdentifier() != d->resourceContext.resourceInstanceIdentifier) { 0327 auto copy = *ApplicationDomain::ApplicationDomainType::getInMemoryCopy<ApplicationDomain::ApplicationDomainType>(newEntity, newEntity.availableProperties()); 0328 copy.setResource(newEntity.resourceInstanceIdentifier()); 0329 copy.setChangedProperties(copy.availableProperties().toSet()); 0330 SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << copy.identifier() << copy.resourceInstanceIdentifier(); 0331 return create(bufferType, copy) 0332 .then([=](const KAsync::Error &error) { 0333 if (!error) { 0334 SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull"; 0335 if (isMove) { 0336 flatbuffers::FlatBufferBuilder fbb; 0337 auto entityId = fbb.CreateString(current.identifier().toStdString()); 0338 auto type = fbb.CreateString(bufferType.toStdString()); 0339 auto location = Sink::Commands::CreateDeleteEntity(fbb, current.revision(), entityId, type, true); 0340 Sink::Commands::FinishDeleteEntityBuffer(fbb, location); 0341 const auto data = BufferUtils::extractBuffer(fbb); 0342 deletedEntity(data, data.size()).exec(); 0343 } 0344 } else { 0345 SinkErrorCtx(d->logCtx) << "Failed to move entity " << newEntity.identifier() << " to resource " << newEntity.resourceInstanceIdentifier(); 0346 } 0347 }) 0348 .then([this] { 0349 return d->entityStore.maxRevision(); 0350 }); 0351 } 0352 0353 d->revisionChanged = true; 0354 if 
(!d->entityStore.modify(bufferType, current, newEntity, replayToSource)) { 0355 return KAsync::error<qint64>(); 0356 } 0357 0358 return KAsync::value(d->entityStore.maxRevision()); 0359 } 0360 0361 KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) 0362 { 0363 d->transactionItemCount++; 0364 0365 { 0366 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 0367 if (!Commands::VerifyDeleteEntityBuffer(verifyer)) { 0368 SinkWarningCtx(d->logCtx) << "invalid buffer, not a delete entity buffer"; 0369 return KAsync::error<qint64>(); 0370 } 0371 } 0372 auto deleteEntity = Commands::GetDeleteEntity(command); 0373 0374 const bool replayToSource = deleteEntity->replayToSource(); 0375 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); 0376 const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); 0377 SinkTraceCtx(d->logCtx) << "Deleted Entity. 
Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 0378 0379 const auto current = d->entityStore.readLatest(bufferType, key); 0380 0381 foreach (const auto &processor, d->processors[bufferType]) { 0382 processor->deletedEntity(current); 0383 } 0384 0385 d->revisionChanged = true; 0386 if (!d->entityStore.remove(bufferType, current, replayToSource)) { 0387 return KAsync::error<qint64>(); 0388 } 0389 0390 return KAsync::value(d->entityStore.maxRevision()); 0391 } 0392 0393 void Pipeline::cleanupRevisions(qint64 revision) 0394 { 0395 //We have to set revisionChanged, otherwise a call to commit might abort 0396 //the transaction when not using the implicit internal transaction 0397 d->revisionChanged = d->entityStore.cleanupRevisions(revision); 0398 } 0399 0400 0401 class Preprocessor::Private { 0402 public: 0403 QByteArray resourceType; 0404 QByteArray resourceInstanceIdentifier; 0405 Pipeline *pipeline; 0406 Storage::EntityStore *entityStore; 0407 }; 0408 0409 Preprocessor::Preprocessor() : d(new Preprocessor::Private) 0410 { 0411 } 0412 0413 Preprocessor::~Preprocessor() 0414 { 0415 } 0416 0417 void Preprocessor::setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *pipeline, Storage::EntityStore *entityStore) 0418 { 0419 d->resourceType = resourceType; 0420 d->resourceInstanceIdentifier = resourceInstanceIdentifier; 0421 d->pipeline = pipeline; 0422 d->entityStore = entityStore; 0423 } 0424 0425 void Preprocessor::startBatch() 0426 { 0427 } 0428 0429 void Preprocessor::finalizeBatch() 0430 { 0431 } 0432 0433 void Preprocessor::newEntity(ApplicationDomain::ApplicationDomainType &newEntity) 0434 { 0435 0436 } 0437 0438 void Preprocessor::modifiedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) 0439 { 0440 0441 } 0442 0443 void Preprocessor::deletedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity) 0444 { 0445 0446 
} 0447 0448 Preprocessor::Result Preprocessor::process(Type type, const ApplicationDomain::ApplicationDomainType ¤t, ApplicationDomain::ApplicationDomainType &diff) 0449 { 0450 switch(type) { 0451 case Creation: 0452 newEntity(diff); 0453 break; 0454 case Modification: 0455 modifiedEntity(current, diff); 0456 break; 0457 case Deletion: 0458 deletedEntity(current); 0459 break; 0460 default: 0461 break; 0462 } 0463 return {NoAction}; 0464 } 0465 0466 QByteArray Preprocessor::resourceInstanceIdentifier() const 0467 { 0468 return d->resourceInstanceIdentifier; 0469 } 0470 0471 Storage::EntityStore &Preprocessor::entityStore() const 0472 { 0473 return *d->entityStore; 0474 } 0475 0476 void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomainType &entity, const QByteArray &typeName, bool replayToSource) 0477 { 0478 flatbuffers::FlatBufferBuilder entityFbb; 0479 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, typeName); 0480 adaptorFactory->createBuffer(entity, entityFbb); 0481 const auto entityBuffer = BufferUtils::extractBuffer(entityFbb); 0482 0483 flatbuffers::FlatBufferBuilder fbb; 0484 auto entityId = fbb.CreateString(entity.identifier().toStdString()); 0485 auto type = fbb.CreateString(typeName.toStdString()); 0486 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityBuffer.constData(), entityBuffer.size()); 0487 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); 0488 Sink::Commands::FinishCreateEntityBuffer(fbb, location); 0489 0490 const auto data = BufferUtils::extractBuffer(fbb); 0491 d->pipeline->newEntity(data, data.size()).exec(); 0492 } 0493 0494 void Preprocessor::deleteEntity(const Sink::ApplicationDomain::ApplicationDomainType &entity, const QByteArray &typeName, bool replayToSource) 0495 { 0496 flatbuffers::FlatBufferBuilder fbb; 0497 auto entityId = fbb.CreateString(entity.identifier().toStdString()); 0498 auto type = 
fbb.CreateString(typeName.toStdString()); 0499 auto location = Sink::Commands::CreateDeleteEntity(fbb, entity.revision(), entityId, type, replayToSource); 0500 Sink::Commands::FinishDeleteEntityBuffer(fbb, location); 0501 const auto data = BufferUtils::extractBuffer(fbb); 0502 d->pipeline->deletedEntity(data, data.size()).exec(); 0503 } 0504 0505 #pragma clang diagnostic push 0506 #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" 0507 #include "moc_pipeline.cpp" 0508 #pragma clang diagnostic pop