File indexing completed on 2024-05-12 05:26:01

0001 /*
0002  * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org>
0003  * Copyright (C) 2015 Christian Mollekopf <mollekopf@kolabsys.com>
0004  *
0005  * This library is free software; you can redistribute it and/or
0006  * modify it under the terms of the GNU Lesser General Public
0007  * License as published by the Free Software Foundation; either
0008  * version 2.1 of the License, or (at your option) version 3, or any
0009  * later version accepted by the membership of KDE e.V. (or its
0010  * successor approved by the membership of KDE e.V.), which shall
0011  * act as a proxy defined in Section 6 of version 3 of the license.
0012  *
0013  * This library is distributed in the hope that it will be useful,
0014  * but WITHOUT ANY WARRANTY; without even the implied warranty of
0015  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0016  * Lesser General Public License for more details.
0017  *
0018  * You should have received a copy of the GNU Lesser General Public
0019  * License along with this library.  If not, see <http://www.gnu.org/licenses/>.
0020  */
0021 
0022 #include "pipeline.h"
0023 
0024 #include <QByteArray>
0025 #include <QVector>
0026 #include <QDebug>
0027 #include <QTime>
0028 #include "entity_generated.h"
0029 #include "metadata_generated.h"
0030 #include "createentity_generated.h"
0031 #include "modifyentity_generated.h"
0032 #include "deleteentity_generated.h"
0033 #include "entitybuffer.h"
0034 #include "log.h"
0035 #include "domain/applicationdomaintype.h"
0036 #include "adaptorfactoryregistry.h"
0037 #include "definitions.h"
0038 #include "bufferutils.h"
0039 #include "storage/entitystore.h"
0040 #include "store.h"
0041 
0042 using namespace Sink;
0043 using namespace Sink::Storage;
0044 
0045 class Pipeline::Private
0046 {
0047 public:
0048     Private(const ResourceContext &context, const Sink::Log::Context &ctx) : logCtx{ctx.subContext("pipeline")}, resourceContext(context), entityStore(context, ctx), revisionChanged(false)
0049     {
0050     }
0051 
0052     Sink::Log::Context logCtx;
0053     ResourceContext resourceContext;
0054     Storage::EntityStore entityStore;
0055     QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors;
0056     bool revisionChanged;
0057     QTime transactionTime;
0058     int transactionItemCount;
0059 };
0060 
0061 
0062 Pipeline::Pipeline(const ResourceContext &context, const Sink::Log::Context &ctx) : QObject(nullptr), d(new Private(context, ctx))
0063 {
0064     //Create main store immediately on first start
0065     d->entityStore.initialize();
0066 }
0067 
0068 Pipeline::~Pipeline()
0069 {
0070 }
0071 
0072 void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors)
0073 {
0074     auto &list = d->processors[entityType];
0075     list.clear();
0076     for (auto p : processors) {
0077         p->setup(d->resourceContext.resourceType, d->resourceContext.instanceId(), this, &d->entityStore);
0078         list.append(QSharedPointer<Preprocessor>(p));
0079     }
0080 }
0081 
0082 void Pipeline::startTransaction()
0083 {
0084     // TODO call for all types
0085     // But avoid doing it during cleanup
0086     // for (auto processor : d->processors[bufferType]) {
0087     //     processor->startBatch();
0088     // }
0089     SinkTraceCtx(d->logCtx) << "Starting transaction.";
0090     d->transactionTime.start();
0091     d->transactionItemCount = 0;
0092     d->entityStore.startTransaction(DataStore::ReadWrite);
0093 }
0094 
0095 void Pipeline::commit()
0096 {
0097     // TODO call for all types
0098     // But avoid doing it during cleanup
0099     // for (auto processor : d->processors[bufferType]) {
0100     //     processor->finalize();
0101     // }
0102     if (!d->revisionChanged) {
0103         d->entityStore.abortTransaction();
0104         return;
0105     }
0106     const auto revision = d->entityStore.maxRevision();
0107     d->entityStore.commitTransaction();
0108     const auto elapsed = d->transactionTime.elapsed();
0109     SinkTraceCtx(d->logCtx) << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " "
0110             << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]";
0111     if (d->revisionChanged) {
0112         d->revisionChanged = false;
0113         emit revisionUpdated(revision);
0114     }
0115 }
0116 
0117 KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
0118 {
0119     d->transactionItemCount++;
0120 
0121     {
0122         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
0123         if (!Commands::VerifyCreateEntityBuffer(verifyer)) {
0124             SinkWarningCtx(d->logCtx) << "invalid buffer, not a create entity buffer";
0125             return KAsync::error<qint64>();
0126         }
0127     }
0128     auto createEntity = Commands::GetCreateEntity(command);
0129 
0130     const bool replayToSource = createEntity->replayToSource();
0131     const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(createEntity->domainType()->Data()), createEntity->domainType()->size());
0132     QByteArray key;
0133     if (createEntity->entityId()) {
0134         key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size());
0135         if (!key.isEmpty() && d->entityStore.contains(bufferType, key)) {
0136             SinkErrorCtx(d->logCtx) << "An entity with this id already exists: " << key;
0137             return KAsync::error<qint64>();
0138         }
0139     }
0140 
0141     if (key.isEmpty()) {
0142         key = DataStore::generateUid();
0143     }
0144     SinkTraceCtx(d->logCtx) << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
0145     Q_ASSERT(!key.isEmpty());
0146 
0147     {
0148         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size());
0149         if (!VerifyEntityBuffer(verifyer)) {
0150             SinkWarningCtx(d->logCtx) << "invalid buffer, not an entity buffer";
0151             return KAsync::error<qint64>();
0152         }
0153     }
0154     auto entity = GetEntity(createEntity->delta()->Data());
0155     if (!entity->resource()->size() && !entity->local()->size()) {
0156         SinkWarningCtx(d->logCtx) << "No local and no resource buffer while trying to create entity.";
0157         return KAsync::error<qint64>();
0158     }
0159 
0160     auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType);
0161     if (!adaptorFactory) {
0162         SinkWarningCtx(d->logCtx) << "no adaptor factory for type " << bufferType << d->resourceContext.resourceType;
0163         return KAsync::error<qint64>();
0164     }
0165 
0166     auto adaptor = adaptorFactory->createAdaptor(*entity);
0167     auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create();
0168     Sink::ApplicationDomain::copyBuffer(*adaptor, *memoryAdaptor);
0169 
0170     d->revisionChanged = true;
0171     auto revision = d->entityStore.maxRevision();
0172     auto o = Sink::ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), key, revision, memoryAdaptor};
0173     o.setChangedProperties(o.availableProperties().toSet());
0174 
0175     auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(o, o.availableProperties());
0176     newEntity.setChangedProperties(newEntity.availableProperties().toSet());
0177 
0178     foreach (const auto &processor, d->processors[bufferType]) {
0179         processor->newEntity(newEntity);
0180     }
0181 
0182     if (!d->entityStore.add(bufferType, newEntity, replayToSource)) {
0183         return KAsync::error<qint64>();
0184     }
0185 
0186     return KAsync::value(d->entityStore.maxRevision());
0187 }
0188 
0189 template <class T>
0190 struct CreateHelper {
0191     KAsync::Job<void> operator()(const ApplicationDomain::ApplicationDomainType &arg) const {
0192         return Sink::Store::create<T>(T{arg});
0193     }
0194 };
0195 
0196 static KAsync::Job<void> create(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &newEntity)
0197 {
0198     return TypeHelper<CreateHelper>{type}.operator()<KAsync::Job<void>, const ApplicationDomain::ApplicationDomainType&>(newEntity);
0199 }
0200 
0201 KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
0202 {
0203     d->transactionItemCount++;
0204 
0205     {
0206         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
0207         if (!Commands::VerifyModifyEntityBuffer(verifyer)) {
0208             SinkWarningCtx(d->logCtx) << "invalid buffer, not a modify entity buffer";
0209             return KAsync::error<qint64>();
0210         }
0211     }
0212     auto modifyEntity = Commands::GetModifyEntity(command);
0213     Q_ASSERT(modifyEntity);
0214     QList<QByteArray> changeset;
0215     if (modifyEntity->modifiedProperties()) {
0216         changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties());
0217     } else {
0218         SinkWarningCtx(d->logCtx) << "No changeset available";
0219     }
0220     const qint64 baseRevision = modifyEntity->revision();
0221     const bool replayToSource = modifyEntity->replayToSource();
0222 
0223     const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size());
0224     const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
0225     SinkTraceCtx(d->logCtx) << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
0226     if (bufferType.isEmpty() || key.isEmpty()) {
0227         SinkWarningCtx(d->logCtx) << "entity type or key " << bufferType << key;
0228         return KAsync::error<qint64>();
0229     }
0230     {
0231         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size());
0232         if (!VerifyEntityBuffer(verifyer)) {
0233             SinkWarningCtx(d->logCtx) << "invalid buffer, not an entity buffer";
0234             return KAsync::error<qint64>();
0235         }
0236     }
0237 
0238     auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType);
0239     if (!adaptorFactory) {
0240         SinkWarningCtx(d->logCtx) << "no adaptor factory for type " << bufferType;
0241         return KAsync::error<qint64>();
0242     }
0243 
0244     auto diffEntity = GetEntity(modifyEntity->delta()->Data());
0245     Q_ASSERT(diffEntity);
0246     Sink::ApplicationDomain::ApplicationDomainType diff{d->resourceContext.instanceId(), key, baseRevision, adaptorFactory->createAdaptor(*diffEntity)};
0247     diff.setChangedProperties(changeset.toSet());
0248 
0249     QByteArrayList deletions;
0250     if (modifyEntity->deletions()) {
0251         deletions = BufferUtils::fromVector(*modifyEntity->deletions());
0252     }
0253 
0254     Sink::ApplicationDomain::ApplicationDomainType current;
0255     bool alreadyRemoved = false;
0256 
0257     d->entityStore.readLatest(bufferType, diff.identifier(), [&](const QByteArray &uid, const EntityBuffer &buffer) {
0258         if (buffer.operation() == Sink::Operation_Removal) {
0259             alreadyRemoved = true;
0260         } else {
0261             auto entity = Sink::ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), key, baseRevision, adaptorFactory->createAdaptor(buffer.entity())};
0262             current = *Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<Sink::ApplicationDomain::ApplicationDomainType>(entity, entity.availableProperties());
0263         }
0264     });
0265 
0266     if (alreadyRemoved) {
0267         SinkWarningCtx(d->logCtx) << "Tried to modify a removed entity: " << diff.identifier();
0268         return KAsync::error<qint64>();
0269     }
0270 
0271     if (current.identifier().isEmpty()) {
0272         SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier();
0273         return KAsync::error<qint64>();
0274     }
0275 
0276     //We avoid overwriting local changes that haven't been played back yet with remote modifications
0277     QSet<QByteArray> excludeProperties;
0278     if (!replayToSource) { //We assume this means the change is coming from the source already
0279         d->entityStore.readRevisions(bufferType, diff.identifier(), baseRevision, [&] (const QByteArray &uid, qint64 revision, const Sink::EntityBuffer &entity) {
0280             if (entity.metadataBuffer()) {
0281                 if (auto metadata = GetMetadata(entity.metadataBuffer())) {
0282                     if (metadata->operation() == Operation_Modification && metadata->modifiedProperties()) {
0283                         const auto locallyModifiedProperties = BufferUtils::fromVector(*metadata->modifiedProperties()).toSet();
0284                         SinkTraceCtx(d->logCtx) << "Protecting locally modified properties: " << locallyModifiedProperties;
0285                         excludeProperties += locallyModifiedProperties;
0286                     }
0287                 }
0288             }
0289         });
0290     }
0291 
0292     auto newEntity = d->entityStore.applyDiff(bufferType, current, diff, deletions, excludeProperties);
0293 
0294     bool isMove = false;
0295     if (modifyEntity->targetResource()) {
0296         isMove = modifyEntity->removeEntity();
0297         newEntity.setResource(BufferUtils::extractBuffer(modifyEntity->targetResource()));
0298     }
0299 
0300     foreach (const auto &processor, d->processors[bufferType]) {
0301         bool exitLoop = false;
0302         const auto result = processor->process(Preprocessor::Modification, current, newEntity);
0303         switch (result.action) {
0304             case Preprocessor::MoveToResource:
0305                 isMove = true;
0306                 exitLoop = true;
0307                 break;
0308             case Preprocessor::CopyToResource:
0309                 isMove = true;
0310                 exitLoop = true;
0311                 break;
0312             case Preprocessor::DropModification:
0313                 SinkTraceCtx(d->logCtx) << "Dropping modification";
0314                 return KAsync::error<qint64>();
0315             case Preprocessor::NoAction:
0316             case Preprocessor::DeleteEntity:
0317             default:
0318                 break;
0319         }
0320         if (exitLoop) {
0321             break;
0322         }
0323     }
0324 
0325     //The entity is either being copied or moved
0326     if (newEntity.resourceInstanceIdentifier() != d->resourceContext.resourceInstanceIdentifier) {
0327         auto copy = *ApplicationDomain::ApplicationDomainType::getInMemoryCopy<ApplicationDomain::ApplicationDomainType>(newEntity, newEntity.availableProperties());
0328         copy.setResource(newEntity.resourceInstanceIdentifier());
0329         copy.setChangedProperties(copy.availableProperties().toSet());
0330         SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << copy.identifier() << copy.resourceInstanceIdentifier();
0331         return create(bufferType, copy)
0332             .then([=](const KAsync::Error &error) {
0333                 if (!error) {
0334                     SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull";
0335                     if (isMove) {
0336                         flatbuffers::FlatBufferBuilder fbb;
0337                         auto entityId = fbb.CreateString(current.identifier().toStdString());
0338                         auto type = fbb.CreateString(bufferType.toStdString());
0339                         auto location = Sink::Commands::CreateDeleteEntity(fbb, current.revision(), entityId, type, true);
0340                         Sink::Commands::FinishDeleteEntityBuffer(fbb, location);
0341                         const auto data = BufferUtils::extractBuffer(fbb);
0342                         deletedEntity(data, data.size()).exec();
0343                     }
0344                 } else {
0345                     SinkErrorCtx(d->logCtx) << "Failed to move entity " << newEntity.identifier() << " to resource " << newEntity.resourceInstanceIdentifier();
0346                 }
0347             })
0348             .then([this] {
0349                 return d->entityStore.maxRevision();
0350             });
0351     }
0352 
0353     d->revisionChanged = true;
0354     if (!d->entityStore.modify(bufferType, current, newEntity, replayToSource)) {
0355         return KAsync::error<qint64>();
0356     }
0357 
0358     return KAsync::value(d->entityStore.maxRevision());
0359 }
0360 
0361 KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
0362 {
0363     d->transactionItemCount++;
0364 
0365     {
0366         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
0367         if (!Commands::VerifyDeleteEntityBuffer(verifyer)) {
0368             SinkWarningCtx(d->logCtx) << "invalid buffer, not a delete entity buffer";
0369             return KAsync::error<qint64>();
0370         }
0371     }
0372     auto deleteEntity = Commands::GetDeleteEntity(command);
0373 
0374     const bool replayToSource = deleteEntity->replayToSource();
0375     const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size());
0376     const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
0377     SinkTraceCtx(d->logCtx) << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
0378 
0379     const auto current = d->entityStore.readLatest(bufferType, key);
0380 
0381     foreach (const auto &processor, d->processors[bufferType]) {
0382         processor->deletedEntity(current);
0383     }
0384 
0385     d->revisionChanged = true;
0386     if (!d->entityStore.remove(bufferType, current, replayToSource)) {
0387         return KAsync::error<qint64>();
0388     }
0389 
0390     return KAsync::value(d->entityStore.maxRevision());
0391 }
0392 
0393 void Pipeline::cleanupRevisions(qint64 revision)
0394 {
0395     //We have to set revisionChanged, otherwise a call to commit might abort
0396     //the transaction when not using the implicit internal transaction
0397     d->revisionChanged = d->entityStore.cleanupRevisions(revision);
0398 }
0399 
0400 
0401 class Preprocessor::Private {
0402 public:
0403     QByteArray resourceType;
0404     QByteArray resourceInstanceIdentifier;
0405     Pipeline *pipeline;
0406     Storage::EntityStore *entityStore;
0407 };
0408 
0409 Preprocessor::Preprocessor() : d(new Preprocessor::Private)
0410 {
0411 }
0412 
0413 Preprocessor::~Preprocessor()
0414 {
0415 }
0416 
0417 void Preprocessor::setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *pipeline, Storage::EntityStore *entityStore)
0418 {
0419     d->resourceType = resourceType;
0420     d->resourceInstanceIdentifier = resourceInstanceIdentifier;
0421     d->pipeline = pipeline;
0422     d->entityStore = entityStore;
0423 }
0424 
0425 void Preprocessor::startBatch()
0426 {
0427 }
0428 
0429 void Preprocessor::finalizeBatch()
0430 {
0431 }
0432 
0433 void Preprocessor::newEntity(ApplicationDomain::ApplicationDomainType &newEntity)
0434 {
0435 
0436 }
0437 
0438 void Preprocessor::modifiedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity)
0439 {
0440 
0441 }
0442 
0443 void Preprocessor::deletedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity)
0444 {
0445 
0446 }
0447 
0448 Preprocessor::Result Preprocessor::process(Type type, const ApplicationDomain::ApplicationDomainType &current, ApplicationDomain::ApplicationDomainType &diff)
0449 {
0450     switch(type) {
0451         case Creation:
0452             newEntity(diff);
0453             break;
0454         case Modification:
0455             modifiedEntity(current, diff);
0456             break;
0457         case Deletion:
0458             deletedEntity(current);
0459             break;
0460         default:
0461             break;
0462     }
0463     return {NoAction};
0464 }
0465 
0466 QByteArray Preprocessor::resourceInstanceIdentifier() const
0467 {
0468     return d->resourceInstanceIdentifier;
0469 }
0470 
0471 Storage::EntityStore &Preprocessor::entityStore() const
0472 {
0473     return *d->entityStore;
0474 }
0475 
0476 void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomainType &entity, const QByteArray &typeName, bool replayToSource)
0477 {
0478     flatbuffers::FlatBufferBuilder entityFbb;
0479     auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, typeName);
0480     adaptorFactory->createBuffer(entity, entityFbb);
0481     const auto entityBuffer = BufferUtils::extractBuffer(entityFbb);
0482 
0483     flatbuffers::FlatBufferBuilder fbb;
0484     auto entityId = fbb.CreateString(entity.identifier().toStdString());
0485     auto type = fbb.CreateString(typeName.toStdString());
0486     auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityBuffer.constData(), entityBuffer.size());
0487     auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource);
0488     Sink::Commands::FinishCreateEntityBuffer(fbb, location);
0489 
0490     const auto data = BufferUtils::extractBuffer(fbb);
0491     d->pipeline->newEntity(data, data.size()).exec();
0492 }
0493 
0494 void Preprocessor::deleteEntity(const Sink::ApplicationDomain::ApplicationDomainType &entity, const QByteArray &typeName, bool replayToSource)
0495 {
0496     flatbuffers::FlatBufferBuilder fbb;
0497     auto entityId = fbb.CreateString(entity.identifier().toStdString());
0498     auto type = fbb.CreateString(typeName.toStdString());
0499     auto location = Sink::Commands::CreateDeleteEntity(fbb, entity.revision(), entityId, type, replayToSource);
0500     Sink::Commands::FinishDeleteEntityBuffer(fbb, location);
0501     const auto data = BufferUtils::extractBuffer(fbb);
0502     d->pipeline->deletedEntity(data, data.size()).exec();
0503 }
0504 
0505 #pragma clang diagnostic push
0506 #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast"
0507 #include "moc_pipeline.cpp"
0508 #pragma clang diagnostic pop