File indexing completed on 2024-05-12 05:26:06

0001 /*
0002  * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
0003  *
0004  * This library is free software; you can redistribute it and/or
0005  * modify it under the terms of the GNU Lesser General Public
0006  * License as published by the Free Software Foundation; either
0007  * version 2.1 of the License, or (at your option) version 3, or any
0008  * later version accepted by the membership of KDE e.V. (or its
0009  * successor approved by the membership of KDE e.V.), which shall
0010  * act as a proxy defined in Section 6 of version 3 of the license.
0011  *
0012  * This library is distributed in the hope that it will be useful,
0013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0015  * Lesser General Public License for more details.
0016  *
0017  * You should have received a copy of the GNU Lesser General Public
0018  * License along with this library.  If not, see <http://www.gnu.org/licenses/>.
0019  */
0020 #include "synchronizer.h"
0021 
0022 #include <QCoreApplication>
0023 
0024 #include "definitions.h"
0025 #include "commands.h"
0026 #include "bufferutils.h"
0027 #include "synchronizerstore.h"
0028 #include "datastorequery.h"
0029 #include "createentity_generated.h"
0030 #include "modifyentity_generated.h"
0031 #include "deleteentity_generated.h"
0032 #include "flush_generated.h"
0033 #include "notification_generated.h"
0034 #include "utils.h"
0035 
0036 using namespace Sink;
0037 
0038 bool operator==(const Synchronizer::SyncRequest &left, const Synchronizer::SyncRequest &right)
0039 {
0040     return left.flushType == right.flushType
0041         && left.requestId == right.requestId
0042         && left.requestType == right.requestType
0043         && left.options == right.options
0044         && left.query == right.query
0045         && left.applicableEntities == right.applicableEntities;
0046 }
0047 
0048 Synchronizer::Synchronizer(const Sink::ResourceContext &context)
0049     : ChangeReplay(context, {"synchronizer"}),
0050     mLogCtx{"synchronizer"},
0051     mResourceContext(context),
0052     mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext, mLogCtx)),
0053     mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite),
0054     mSyncInProgress(false),
0055     mAbort(false)
0056 {
0057     mCurrentState.push(ApplicationDomain::Status::NoStatus);
0058     SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId();
0059 }
0060 
0061 Synchronizer::~Synchronizer()
0062 {
0063 
0064 }
0065 
0066 void Synchronizer::setSecret(const QString &s)
0067 {
0068     mSecret = s;
0069 
0070     if (!mSyncRequestQueue.isEmpty()) {
0071         processSyncQueue().exec();
0072     }
0073 }
0074 
0075 QString Synchronizer::secret() const
0076 {
0077     return mSecret;
0078 }
0079 
0080 void Synchronizer::setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback, MessageQueue &mq)
0081 {
0082     mEnqueue = enqueueCommandCallback;
0083     mMessageQueue = &mq;
0084 }
0085 
0086 void Synchronizer::enqueueCommand(int commandId, const QByteArray &data)
0087 {
0088     Q_ASSERT(mEnqueue);
0089     mEnqueue(commandId, data);
0090 }
0091 
0092 Storage::EntityStore &Synchronizer::store()
0093 {
0094     Q_ASSERT(mEntityStore->hasTransaction());
0095     return *mEntityStore;
0096 }
0097 
0098 SynchronizerStore &Synchronizer::syncStore()
0099 {
0100     if (!mSyncStore) {
0101         mSyncStore = QSharedPointer<SynchronizerStore>::create(syncTransaction());
0102     }
0103     return *mSyncStore;
0104 }
0105 
0106 void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject)
0107 {
0108     // These changes are coming from the source
0109     const auto replayToSource = false;
0110     flatbuffers::FlatBufferBuilder entityFbb;
0111     mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb);
0112     flatbuffers::FlatBufferBuilder fbb;
0113     auto entityId = fbb.CreateString(sinkId.toStdString());
0114     auto type = fbb.CreateString(bufferType.toStdString());
0115     auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
0116     auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource);
0117     Sink::Commands::FinishCreateEntityBuffer(fbb, location);
0118     enqueueCommand(Sink::Commands::CreateEntityCommand, BufferUtils::extractBuffer(fbb));
0119 }
0120 
0121 void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, const QByteArray &newResource, bool remove)
0122 {
0123     // FIXME removals
0124     QByteArrayList deletedProperties;
0125     // These changes are coming from the source
0126     const auto replayToSource = false;
0127     flatbuffers::FlatBufferBuilder entityFbb;
0128     mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb);
0129     flatbuffers::FlatBufferBuilder fbb;
0130     auto entityId = fbb.CreateString(sinkId.toStdString());
0131     auto modifiedProperties = BufferUtils::toVector(fbb, domainObject.changedProperties());
0132     auto deletions = BufferUtils::toVector(fbb, deletedProperties);
0133     auto type = fbb.CreateString(bufferType.toStdString());
0134     auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
0135     auto resource = newResource.isEmpty() ? 0 : fbb.CreateString(newResource.constData());
0136     auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, replayToSource, modifiedProperties, resource, remove);
0137     Sink::Commands::FinishModifyEntityBuffer(fbb, location);
0138     enqueueCommand(Sink::Commands::ModifyEntityCommand, BufferUtils::extractBuffer(fbb));
0139 }
0140 
0141 void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType)
0142 {
0143     // These changes are coming from the source
0144     const auto replayToSource = false;
0145     flatbuffers::FlatBufferBuilder fbb;
0146     auto entityId = fbb.CreateString(sinkId.toStdString());
0147     // This is the resource type and not the domain type
0148     auto type = fbb.CreateString(bufferType.toStdString());
0149     auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource);
0150     Sink::Commands::FinishDeleteEntityBuffer(fbb, location);
0151     enqueueCommand(Sink::Commands::DeleteEntityCommand, BufferUtils::extractBuffer(fbb));
0152 }
0153 
0154 int Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists)
0155 {
0156     int count = 0;
0157     entryGenerator([this, bufferType, &exists, &count](const QByteArray &sinkId) {
0158         const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId);
0159         SinkTraceCtx(mLogCtx) << "Checking for removal " << sinkId << remoteId;
0160         // If we have no remoteId, the entity hasn't been replayed to the source yet
0161         if (!remoteId.isEmpty()) {
0162             if (!exists(remoteId)) {
0163                 SinkTraceCtx(mLogCtx) << "Found a removed entity: " << sinkId;
0164                 count++;
0165                 deleteEntity(sinkId, mEntityStore->maxRevision(), bufferType);
0166             }
0167         }
0168     });
0169     return count;
0170 }
0171 
0172 int Synchronizer::scanForRemovals(const QByteArray &bufferType, std::function<bool(const QByteArray &remoteId)> exists)
0173 {
0174     return scanForRemovals(bufferType,
0175         [this, &bufferType](const std::function<void(const QByteArray &)> &callback) {
0176             store().readAllUids(bufferType, [callback](const QByteArray &uid) {
0177                 callback(uid);
0178             });
0179         },
0180         exists
0181     );
0182 }
0183 
0184 void Synchronizer::modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
0185 {
0186     store.readLatest(bufferType, sinkId, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &current) {
0187         const bool changed = [&] {
0188             for (const auto &property : entity.changedProperties()) {
0189                 if (entity.getProperty(property) != current.getProperty(property)) {
0190                     SinkTraceCtx(mLogCtx) << "Property changed " << sinkId << property;
0191                     return true;
0192                 }
0193             }
0194             return false;
0195         }();
0196         if (changed) {
0197             SinkTraceCtx(mLogCtx) << "Found a modified entity: " << sinkId;
0198             modifyEntity(sinkId, store.maxRevision(), bufferType, entity);
0199         } else {
0200             SinkTraceCtx(mLogCtx) << "Entity was not modified: " << sinkId;
0201         }
0202     });
0203 }
0204 
0205 void Synchronizer::modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
0206 {
0207     const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId, false);
0208     if (sinkId.isEmpty()) {
0209         SinkWarningCtx(mLogCtx) << "Failed to find the local id for " << remoteId;
0210         return;
0211     }
0212     Storage::EntityStore store(mResourceContext, mLogCtx);
0213     modifyIfChanged(store, bufferType, sinkId, entity);
0214 }
0215 
0216 void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
0217 {
0218     SinkTraceCtx(mLogCtx) << "Create or modify" << bufferType << remoteId;
0219     const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
0220     if (sinkId.isEmpty()) {
0221         SinkWarningCtx(mLogCtx) << "Failed to create a local id for " << remoteId;
0222         Q_ASSERT(false);
0223         return;
0224     }
0225     Storage::EntityStore store(mResourceContext, mLogCtx);
0226     if (!store.contains(bufferType, sinkId)) {
0227         SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId;
0228         createEntity(sinkId, bufferType, entity);
0229     } else { // modification
0230         modifyIfChanged(store, bufferType, sinkId, entity);
0231     }
0232 }
0233 
0234 template<typename DomainType>
0235 void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria)
0236 {
0237     SinkTraceCtx(mLogCtx) << "Create or modify" << bufferType << remoteId;
0238     const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
0239     if (sinkId.isEmpty()) {
0240         SinkWarningCtx(mLogCtx) << "Failed to create a local id for " << remoteId;
0241         Q_ASSERT(false);
0242         return;
0243     }
0244     Storage::EntityStore store(mResourceContext, mLogCtx);
0245     if (!store.contains(bufferType, sinkId)) {
0246         if (!mergeCriteria.isEmpty()) {
0247             Sink::Query query;
0248             for (auto it = mergeCriteria.constBegin(); it != mergeCriteria.constEnd(); it++) {
0249                 query.filter(it.key(), it.value());
0250             }
0251             bool merge = false;
0252             DataStoreQuery dataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), store};
0253             auto resultSet = dataStoreQuery.execute();
0254             resultSet.replaySet(0, 1, [this, &merge, bufferType, remoteId](const ResultSet::Result &r) {
0255                 merge = true;
0256                 SinkTraceCtx(mLogCtx) << "Merging local entity with remote entity: " << r.entity.identifier() << remoteId;
0257                 syncStore().recordRemoteId(bufferType, r.entity.identifier(), remoteId);
0258             });
0259 
0260             if (!merge) {
0261                 SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId;
0262                 createEntity(sinkId, bufferType, entity);
0263             }
0264         } else {
0265             SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId;
0266             createEntity(sinkId, bufferType, entity);
0267         }
0268     } else { // modification
0269         modifyIfChanged(store, bufferType, sinkId, entity);
0270     }
0271 }
0272 
0273 QByteArrayList Synchronizer::resolveQuery(const QueryBase &query)
0274 {
0275     if (query.type().isEmpty()) {
0276         SinkWarningCtx(mLogCtx) << "Can't resolve a query without a type" << query;
0277         return {};
0278     }
0279     QByteArrayList result;
0280     Storage::EntityStore store{mResourceContext, mLogCtx};
0281     DataStoreQuery dataStoreQuery{query, query.type(), store};
0282     auto resultSet = dataStoreQuery.execute();
0283     resultSet.replaySet(0, 0, [&](const ResultSet::Result &r) {
0284         result << r.entity.identifier();
0285     });
0286     return result;
0287 }
0288 
0289 QByteArrayList Synchronizer::resolveFilter(const QueryBase::Comparator &filter)
0290 {
0291     if (filter.value.canConvert<QByteArray>()) {
0292         const auto value = filter.value.value<QByteArray>();
0293         if (value.isEmpty()) {
0294             SinkErrorCtx(mLogCtx) << "Tried to filter for an empty value: " << filter;
0295         } else {
0296             return {filter.value.value<QByteArray>()};
0297         }
0298     } else if (filter.value.canConvert<QueryBase>()) {
0299         return resolveQuery(filter.value.value<QueryBase>());
0300     } else if (filter.value.canConvert<Query>()) {
0301         return resolveQuery(filter.value.value<Query>());
0302     } else if (filter.value.canConvert<SyncScope>()) {
0303         return resolveQuery(filter.value.value<SyncScope>());
0304     } else {
0305         SinkWarningCtx(mLogCtx) << "unknown filter type: " << filter;
0306         Q_ASSERT(false);
0307     }
0308     return {};
0309 }
0310 
0311 template<typename DomainType>
0312 void Synchronizer::modify(const DomainType &entity, const QByteArray &newResource, bool remove)
0313 {
0314     modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName<DomainType>(), entity, newResource, remove);
0315 }
0316 
0317 QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::QueryBase &query)
0318 {
0319     return {Synchronizer::SyncRequest{query, "sync"}};
0320 }
0321 
0322 void Synchronizer::mergeIntoQueue(const Synchronizer::SyncRequest &request, QList<Synchronizer::SyncRequest> &queue)
0323 {
0324     queue << request;
0325 }
0326 
0327 void Synchronizer::addToQueue(const Synchronizer::SyncRequest &request)
0328 {
0329     mergeIntoQueue(request, mSyncRequestQueue);
0330 }
0331 
0332 void Synchronizer::synchronize(const Sink::QueryBase &query)
0333 {
0334     SinkTraceCtx(mLogCtx) << "Synchronizing" << query;
0335     auto newRequests = getSyncRequests(query);
0336     for (const auto &request: newRequests) {
0337         auto shouldSkip = [&] {
0338             for (auto &r : mSyncRequestQueue) {
0339                 if (r == request) {
0340                     //Merge
0341                     SinkTraceCtx(mLogCtx) << "Merging equal request " << request.query << "\n to" << r.query;
0342                     return true;
0343                 }
0344             }
0345             return false;
0346         };
0347 
0348         if (shouldSkip()) {
0349             continue;
0350         }
0351         mergeIntoQueue(request, mSyncRequestQueue);
0352     }
0353     processSyncQueue().exec();
0354 }
0355 
0356 void Synchronizer::clearQueue()
0357 {
0358     //Complete all pending flushes. Without this pending flushes would get stuck indefinitely when we clear the queue on failure.
0359     //TODO we should probably fail them instead
0360     for (const auto &request : mSyncRequestQueue) {
0361         if (request.requestType == Synchronizer::SyncRequest::Flush) {
0362             SinkTraceCtx(mLogCtx) << "Emitting flush completion: " << request.requestId;
0363             emitNotification(Notification::FlushCompletion, 0, "", request.requestId);
0364         }
0365     }
0366     mSyncRequestQueue.clear();
0367 }
0368 
0369 void Synchronizer::abort()
0370 {
0371     SinkLogCtx(mLogCtx) << "Aborting all running synchronization requests";
0372     clearQueue();
0373     mAbort = true;
0374 }
0375 
0376 void Synchronizer::flush(int commandId, const QByteArray &flushId)
0377 {
0378     Q_ASSERT(!flushId.isEmpty());
0379     SinkTraceCtx(mLogCtx) << "Flushing the synchronization queue " << flushId;
0380     mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, commandId, flushId};
0381     processSyncQueue().exec();
0382 }
0383 
0384 void Synchronizer::flushComplete(const QByteArray &flushId)
0385 {
0386     SinkTraceCtx(mLogCtx) << "Flush complete: " << flushId;
0387     if (mPendingSyncRequests.contains(flushId)) {
0388         const auto requests = mPendingSyncRequests.values(flushId);
0389         for (const auto &r : requests) {
0390             //We want to process the pending request before any others in the queue
0391             mSyncRequestQueue.prepend(r);
0392         }
0393         mPendingSyncRequests.remove(flushId);
0394         processSyncQueue().exec();
0395     }
0396 }
0397 
0398 void Synchronizer::emitNotification(Notification::NoticationType type, int code, const QString &message, const QByteArray &id, const QByteArray &applicableEntitiesType, const QByteArrayList &entities)
0399 {
0400     Sink::Notification n;
0401     n.id = id;
0402     n.type = type;
0403     n.message = message;
0404     n.code = code;
0405     n.entitiesType = applicableEntitiesType;
0406     n.entities = entities;
0407     emit notify(n);
0408 }
0409 
0410 void Synchronizer::emitProgressNotification(Notification::NoticationType type, int progress, int total, const QByteArray &id, const QByteArray &entitiesType, const QByteArrayList &entities)
0411 {
0412     Sink::Notification n;
0413     n.id = id;
0414     n.type = type;
0415     n.progress = progress;
0416     n.total = total;
0417     n.entitiesType = entitiesType;
0418     n.entities = entities;
0419     emit notify(n);
0420 }
0421 
0422 void Synchronizer::reportProgress(int progress, int total, const QByteArrayList &entities)
0423 {
0424     if (progress > 0 && total > 0) {
0425         //Limit progress updates for large amounts
0426         if (total >= 1000 && progress % 100 != 0) {
0427             return;
0428         } else if (total >= 100 && progress % 10 != 0) {
0429             return;
0430         }
0431         SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total << mCurrentRequest.requestId << mCurrentRequest.applicableEntities;
0432         const auto applicableEntities = [&] {
0433             if (entities.isEmpty()) {
0434                 return mCurrentRequest.applicableEntities;
0435             }
0436             return entities;
0437         }();
0438         emitProgressNotification(Notification::Progress, progress, total, mCurrentRequest.requestId, mCurrentRequest.query.type(), applicableEntities);
0439     }
0440 }
0441 
0442 void Synchronizer::setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId)
0443 {
0444     if (error) {
0445         if (error.errorCode == ApplicationDomain::ConnectionError) {
0446             //Couldn't connect, so we assume we don't have a network connection.
0447             setStatus(ApplicationDomain::OfflineStatus, s, requestId);
0448         } else if (error.errorCode == ApplicationDomain::NoServerError) {
0449             //Failed to contact the server.
0450             setStatus(ApplicationDomain::OfflineStatus, s, requestId);
0451         } else if (error.errorCode == ApplicationDomain::ConfigurationError) {
0452             //There is an error with the configuration.
0453             setStatus(ApplicationDomain::ErrorStatus, s, requestId);
0454         } else if (error.errorCode == ApplicationDomain::LoginError) {
0455             //If we failed to login altough we could connect that indicates a problem with our setup.
0456             setStatus(ApplicationDomain::ErrorStatus, s, requestId);
0457         } else if (error.errorCode == ApplicationDomain::ConnectionLostError) {
0458             //We've lost the connection so we assume the connection to the server broke.
0459             setStatus(ApplicationDomain::OfflineStatus, s, requestId);
0460         }
0461         //We don't know what kind of error this was, so we assume it's transient and don't change our status.
0462     } else {
0463         //An operation against the server worked, so we're probably online.
0464         setStatus(ApplicationDomain::ConnectedStatus, s, requestId);
0465     }
0466 }
0467 
0468 KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
0469 {
0470     if (request.options & SyncRequest::RequestFlush) {
0471         return KAsync::start([=] {
0472             //Trigger a flush and record original request without flush option
0473             auto modifiedRequest = request;
0474             modifiedRequest.options = SyncRequest::NoOptions;
0475             //Normally we won't have a requestId here
0476             if (modifiedRequest.requestId.isEmpty()) {
0477                 modifiedRequest.requestId = createUuid();
0478             }
0479             SinkTraceCtx(mLogCtx) << "Enqueuing flush request " << modifiedRequest.requestId;
0480 
0481             //The sync request will be executed once the flush has completed
0482             mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest);
0483 
0484             flatbuffers::FlatBufferBuilder fbb;
0485             auto flushId = fbb.CreateString(modifiedRequest.requestId.toStdString());
0486             auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization));
0487             Sink::Commands::FinishFlushBuffer(fbb, location);
0488             enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb));
0489         });
0490     } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
0491         return KAsync::start([this, request] {
0492             SinkLogCtx(mLogCtx) << "Synchronizing:" << request.query;
0493             setBusy(true, "Synchronization has started.", request.requestId);
0494             emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntitiesType, request.applicableEntities);
0495         }).then(synchronizeWithSource(request.query)).then([this] {
0496             //Commit after every request, so implementations only have to commit more if they add a lot of data.
0497             commit();
0498         }).then<void>([this, request](const KAsync::Error &error) {
0499             setStatusFromResult(error, "Synchronization has ended.", request.requestId);
0500             if (error) {
0501                 //Emit notification with error
0502                 SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error;
0503                 emitNotification(Notification::Warning, ApplicationDomain::SyncError, {}, {}, request.applicableEntitiesType, request.applicableEntities);
0504                 return KAsync::error(error);
0505             } else {
0506                 SinkLogCtx(mLogCtx) << "Done Synchronizing";
0507                 emitNotification(Notification::Info, ApplicationDomain::SyncSuccess, {}, {}, request.applicableEntitiesType, request.applicableEntities);
0508                 return KAsync::null();
0509             }
0510         });
0511     } else if (request.requestType == Synchronizer::SyncRequest::Flush) {
0512         return KAsync::start([=] {
0513             Q_ASSERT(!request.requestId.isEmpty());
0514             //FIXME it looks like this is emitted before the replay actually finishes
0515             if (request.flushType == Flush::FlushReplayQueue) {
0516                 SinkTraceCtx(mLogCtx) << "Emitting flush completion: " << request.requestId;
0517                 emitNotification(Notification::FlushCompletion, 0, "", request.requestId);
0518             } else {
0519                 flatbuffers::FlatBufferBuilder fbb;
0520                 auto flushId = fbb.CreateString(request.requestId.toStdString());
0521                 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization));
0522                 Sink::Commands::FinishFlushBuffer(fbb, location);
0523                 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb));
0524             }
0525         });
0526     } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) {
0527         if (ChangeReplay::allChangesReplayed()) {
0528             return KAsync::null();
0529         } else {
0530             return KAsync::start([this, request] {
0531                 setBusy(true, "ChangeReplay has started.", request.requestId);
0532                 SinkLogCtx(mLogCtx) << "Replaying changes.";
0533             })
0534             .then(replayNextRevision())
0535             .then<void>([this, request](const KAsync::Error &error) {
0536                 setStatusFromResult(error, "Changereplay has ended.", request.requestId);
0537                 if (error) {
0538                     SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error;
0539                     return KAsync::error(error);
0540                 } else {
0541                     SinkLogCtx(mLogCtx) << "Done replaying changes";
0542                     return KAsync::null();
0543                 }
0544             });
0545         }
0546     } else {
0547         SinkWarningCtx(mLogCtx) << "Unknown request type: " << request.requestType;
0548         return KAsync::error(KAsync::Error{"Unknown request type."});
0549     }
0550 
0551 }
0552 
0553 /*
0554  * We're using a stack so we can go back to whatever we had after the temporary busy status.
0555  * Whenever we do change the status we emit a status notification.
0556  */
0557 void Synchronizer::setStatus(ApplicationDomain::Status state, const QString &reason, const QByteArray requestId)
0558 {
0559     //We won't be able to execute any of the coming requests, so clear them
0560     if (state == ApplicationDomain::OfflineStatus || state == ApplicationDomain::ErrorStatus) {
0561         clearQueue();
0562     }
0563     if (state != mCurrentState.top()) {
0564         //The busy state is transient and we want to override it.
0565         if (mCurrentState.top() == ApplicationDomain::BusyStatus) {
0566             mCurrentState.pop();
0567         }
0568         if (state != mCurrentState.top()) {
0569             //Always leave the first state intact
0570             if (mCurrentState.count() > 1 && state != ApplicationDomain::BusyStatus) {
0571                 mCurrentState.pop();
0572             }
0573             mCurrentState.push(state);
0574         }
0575         //We should never have more than: (NoStatus, $SOMESTATUS, BusyStatus)
0576         if (mCurrentState.count() > 3) {
0577             qWarning() << mCurrentState;
0578             Q_ASSERT(false);
0579         }
0580         emitNotification(Notification::Status, state, reason, requestId);
0581     }
0582 }
0583 
0584 void Synchronizer::resetStatus(const QByteArray requestId)
0585 {
0586     mCurrentState.pop();
0587     emitNotification(Notification::Status, mCurrentState.top(), {}, requestId);
0588 }
0589 
0590 void Synchronizer::setBusy(bool busy, const QString &reason, const QByteArray requestId)
0591 {
0592     if (busy) {
0593         setStatus(ApplicationDomain::BusyStatus, reason, requestId);
0594     } else {
0595         if (mCurrentState.top() == ApplicationDomain::BusyStatus) {
0596             resetStatus(requestId);
0597         }
0598     }
0599 }
0600 
0601 KAsync::Job<void> Synchronizer::processSyncQueue()
0602 {
0603     if (secret().isEmpty()) {
0604         SinkTraceCtx(mLogCtx) << "Secret not available but required.";
0605         emitNotification(Notification::Warning, ApplicationDomain::SyncError, "Secret is not available.", {}, {});
0606         return KAsync::null<void>();
0607     }
0608     if (mSyncRequestQueue.isEmpty()) {
0609         SinkLogCtx(mLogCtx) << "All requests processed.";
0610         return KAsync::null<void>();
0611     }
0612     if (mSyncInProgress) {
0613         SinkTraceCtx(mLogCtx) << "Sync still in progress.";
0614         return KAsync::null<void>();
0615     }
0616     //Don't process any new requests until we're done with the pending ones.
0617     //Otherwise we might process a flush before the previous request actually completed.
0618     if (!mPendingSyncRequests.isEmpty()) {
0619         SinkTraceCtx(mLogCtx) << "We still have pending sync requests. Not executing next request.";
0620         return KAsync::null<void>();
0621     }
0622 
0623     const auto request = mSyncRequestQueue.takeFirst();
0624     return KAsync::start([=] {
0625         SinkTraceCtx(mLogCtx) << "Start processing request " << request.requestType;
0626         mTime.start();
0627         mMessageQueue->startTransaction();
0628         mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly);
0629         mSyncInProgress = true;
0630         mCurrentRequest = request;
0631     })
0632     .then(processRequest(request))
0633     .then<void>([this, request](const KAsync::Error &error) {
0634         SinkTraceCtx(mLogCtx) << "Sync request processed " << Sink::Log::TraceTime(mTime.elapsed());
0635         setBusy(false, {}, request.requestId);
0636         mCurrentRequest = {};
0637         mEntityStore->abortTransaction();
0638         mSyncTransaction.abort();
0639         mMessageQueue->commit();
0640         mSyncStore.clear();
0641         mSyncInProgress = false;
0642         mAbort = false;
0643         if (allChangesReplayed()) {
0644             emit changesReplayed();
0645         }
0646         if (error) {
0647             SinkWarningCtx(mLogCtx) << "Error during sync: " << error;
0648             emitNotification(Notification::Error, error.errorCode, error.errorMessage, request.requestId);
0649         }
0650         //In case we got more requests meanwhile.
0651         return processSyncQueue();
0652     });
0653 }
0654 
0655 bool Synchronizer::aborting() const
0656 {
0657     return mAbort;
0658 }
0659 
0660 void Synchronizer::commit()
0661 {
0662     SinkTraceCtx(mLogCtx) << "Commit." << Sink::Log::TraceTime(mTime.elapsed());
0663     mMessageQueue->commit();
0664     mSyncTransaction.commit();
0665     mSyncStore.clear();
0666 
0667     //Avoid accumulating free pages at the cost of not executing a full sync on a consistent db view
0668     if (mEntityStore->hasTransaction()) {
0669         mEntityStore->abortTransaction();
0670         mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly);
0671     }
0672 
0673     QCoreApplication::processEvents(QEventLoop::AllEvents, 10);
0674 
0675     if (mSyncInProgress) {
0676         mMessageQueue->startTransaction();
0677     }
0678 }
0679 
0680 Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction()
0681 {
0682     if (!mSyncTransaction) {
0683         SinkTraceCtx(mLogCtx) << "Starting transaction on sync store.";
0684         mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::DataStore::ReadWrite);
0685     }
0686     return mSyncTransaction;
0687 }
0688 
0689 void Synchronizer::revisionChanged()
0690 {
0691     //One replay request is enough
0692     for (const auto &r : mSyncRequestQueue) {
0693         if (r.requestType == Synchronizer::SyncRequest::ChangeReplay) {
0694             return;
0695         }
0696     }
0697     mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::ChangeReplay, "changereplay"};
0698     processSyncQueue().exec();
0699 }
0700 
0701 bool Synchronizer::canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value)
0702 {
0703     Sink::EntityBuffer buffer(value);
0704     const Sink::Entity &entity = buffer.entity();
0705     const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
0706     Q_ASSERT(metadataBuffer);
0707     if (!metadataBuffer->replayToSource()) {
0708         SinkTraceCtx(mLogCtx) << "Change is coming from the source";
0709     }
0710     return metadataBuffer->replayToSource();
0711 }
0712 
0713 KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value)
0714 {
0715     SinkTraceCtx(mLogCtx) << "Replaying" << type << key;
0716 
0717     Sink::EntityBuffer buffer(value);
0718     const Sink::Entity &entity = buffer.entity();
0719     const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
0720     if (!metadataBuffer) {
0721         SinkErrorCtx(mLogCtx) << "No metadata buffer available.";
0722         return KAsync::error("No metadata buffer");
0723     }
0724     if (mSyncTransaction) {
0725         SinkErrorCtx(mLogCtx) << "Leftover sync transaction.";
0726         mSyncTransaction.abort();
0727     }
0728     if (mSyncStore) {
0729         SinkErrorCtx(mLogCtx) << "Leftover sync store.";
0730         mSyncStore.clear();
0731     }
0732     Q_ASSERT(metadataBuffer);
0733     Q_ASSERT(!mSyncStore);
0734     Q_ASSERT(!mSyncTransaction);
0735     //The entitystore transaction is handled by processSyncQueue
0736     Q_ASSERT(mEntityStore->hasTransaction());
0737 
0738     const auto operation = metadataBuffer->operation();
0739     // TODO: should not use internal representations
0740     const auto uid = Sink::Storage::Key::fromDisplayByteArray(key).identifier().toDisplayByteArray();
0741     const auto modifiedProperties = metadataBuffer->modifiedProperties() ? BufferUtils::fromVector(*metadataBuffer->modifiedProperties()) : QByteArrayList();
0742     QByteArray oldRemoteId;
0743 
0744     if (operation != Sink::Operation_Creation) {
0745         oldRemoteId = syncStore().resolveLocalId(type, uid);
0746         //oldRemoteId can be empty if the resource implementation didn't return a remoteid
0747     }
0748     SinkLogCtx(mLogCtx) << "Replaying: " << key << "Type: " << type << "Uid: " << uid << "Rid: " << oldRemoteId << "Revision: " << metadataBuffer->revision() << "Modified properties" << modifiedProperties;
0749 
0750     //If the entity has been removed already and this is not the removal, skip over.
0751     //This is important so we can unblock changereplay by removing entities.
0752     bool skipOver = false;
0753     store().readLatest(type, uid, [&](const ApplicationDomain::ApplicationDomainType &, Sink::Operation latestOperation) {
0754         if (latestOperation == Sink::Operation_Removal && operation != Sink::Operation_Removal) {
0755             skipOver = true;
0756         }
0757     });
0758     if (skipOver) {
0759         SinkLogCtx(mLogCtx) << "Skipping over already removed entity";
0760         return KAsync::null();
0761     }
0762 
0763     KAsync::Job<QByteArray> job = KAsync::null<QByteArray>();
0764     //TODO This requires supporting every domain type here as well. Can we solve this better so we can do the dispatch somewhere centrally?
0765     if (type == ApplicationDomain::getTypeName<ApplicationDomain::Folder>()) {
0766         job = replay(store().readEntity<ApplicationDomain::Folder>(key), operation, oldRemoteId, modifiedProperties);
0767     } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Mail>()) {
0768         job = replay(store().readEntity<ApplicationDomain::Mail>(key), operation, oldRemoteId, modifiedProperties);
0769     } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Contact>()) {
0770         job = replay(store().readEntity<ApplicationDomain::Contact>(key), operation, oldRemoteId, modifiedProperties);
0771     } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Addressbook>()) {
0772         job = replay(store().readEntity<ApplicationDomain::Addressbook>(key), operation, oldRemoteId, modifiedProperties);
0773     } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Event>()) {
0774         job = replay(store().readEntity<ApplicationDomain::Event>(key), operation, oldRemoteId, modifiedProperties);
0775     } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Todo>()) {
0776         job = replay(store().readEntity<ApplicationDomain::Todo>(key), operation, oldRemoteId, modifiedProperties);
0777     } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Calendar>()) {
0778         job = replay(store().readEntity<ApplicationDomain::Calendar>(key), operation, oldRemoteId, modifiedProperties);
0779     } else {
0780         SinkErrorCtx(mLogCtx) << "Replayed unknown type: " << type;
0781     }
0782 
0783     return job.then([=](const KAsync::Error &error, const QByteArray &remoteId) {
0784 
0785         //Returning an error here means we stop replaying, so we only to that for known-to-be-transient errors.
0786         if (error) {
0787             switch (error.errorCode) {
0788                 case ApplicationDomain::ConnectionError:
0789                 case ApplicationDomain::NoServerError:
0790                 case ApplicationDomain::ConfigurationError:
0791                 case ApplicationDomain::LoginError:
0792                 case ApplicationDomain::ConnectionLostError:
0793                     SinkTraceCtx(mLogCtx) << "Error during changereplay (aborting):" << error;
0794                     return KAsync::error(error);
0795                 default:
0796                     SinkErrorCtx(mLogCtx) << "Error during changereplay (continuing):" << error;
0797                     break;
0798 
0799             }
0800         }
0801 
0802         switch (operation) {
0803             case Sink::Operation_Creation: {
0804                 SinkTraceCtx(mLogCtx) << "Replayed creation with remote id: " << remoteId;
0805                 if (!remoteId.isEmpty()) {
0806                     syncStore().recordRemoteId(type, uid, remoteId);
0807                 }
0808             }
0809             break;
0810             case Sink::Operation_Modification: {
0811                 SinkTraceCtx(mLogCtx) << "Replayed modification with remote id: " << remoteId;
0812                 if (!remoteId.isEmpty()) {
0813                     syncStore().updateRemoteId(type, uid, remoteId);
0814                 }
0815             }
0816             break;
0817             case Sink::Operation_Removal: {
0818                 SinkTraceCtx(mLogCtx) << "Replayed removal with remote id: " << oldRemoteId;
0819                 if (!oldRemoteId.isEmpty()) {
0820                     syncStore().removeRemoteId(type, uid, oldRemoteId);
0821                 }
0822             }
0823             break;
0824             default:
0825                 SinkErrorCtx(mLogCtx) << "Unkown operation" << operation;
0826         }
0827 
0828         //We need to commit here otherwise the next change-replay step will abort the transaction
0829         mSyncStore.clear();
0830         mSyncTransaction.commit();
0831 
0832         //Ignore errors if not caught above
0833         return KAsync::null();
0834     });
0835 }
0836 
0837 void Synchronizer::notReplaying(const QByteArray &type, const QByteArray &key, const QByteArray &value)
0838 {
0839 
0840     Sink::EntityBuffer buffer(value);
0841     const Sink::Entity &entity = buffer.entity();
0842     const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
0843     if (!metadataBuffer) {
0844         SinkErrorCtx(mLogCtx) << "No metadata buffer available.";
0845         Q_ASSERT(false);
0846         return;
0847     }
0848     if (metadataBuffer->operation() == Sink::Operation_Removal) {
0849         const auto uid = Sink::Storage::Key::fromDisplayByteArray(key).identifier().toDisplayByteArray();
0850         const auto oldRemoteId = syncStore().resolveLocalId(type, uid);
0851         SinkLogCtx(mLogCtx) << "Cleaning up removal with remote id: " << oldRemoteId;
0852         if (!oldRemoteId.isEmpty()) {
0853             syncStore().removeRemoteId(type, uid, oldRemoteId);
0854         }
0855     }
0856     mSyncStore.clear();
0857     mSyncTransaction.commit();
0858 }
0859 
0860 KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Contact &, Sink::Operation, const QByteArray &, const QList<QByteArray> &)
0861 {
0862     return KAsync::null<QByteArray>();
0863 }
0864 
0865 KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Addressbook &, Sink::Operation, const QByteArray &, const QList<QByteArray> &)
0866 {
0867     return KAsync::null<QByteArray>();
0868 }
0869 
0870 KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &, const QList<QByteArray> &)
0871 {
0872     return KAsync::null<QByteArray>();
0873 }
0874 
0875 KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &, const QList<QByteArray> &)
0876 {
0877     return KAsync::null<QByteArray>();
0878 }
0879 
0880 KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Event &, Sink::Operation, const QByteArray &, const QList<QByteArray> &)
0881 {
0882     return KAsync::null<QByteArray>();
0883 }
0884 
0885 KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Todo &, Sink::Operation, const QByteArray &, const QList<QByteArray> &)
0886 {
0887     return KAsync::null<QByteArray>();
0888 }
0889 
0890 KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Calendar &, Sink::Operation, const QByteArray &, const QList<QByteArray> &)
0891 {
0892     return KAsync::null<QByteArray>();
0893 }
0894 
0895 bool Synchronizer::allChangesReplayed()
0896 {
0897     if (!mSyncRequestQueue.isEmpty()) {
0898         SinkTraceCtx(mLogCtx) << "Queue is not empty";
0899         return false;
0900     }
0901     return ChangeReplay::allChangesReplayed();
0902 }
0903 
0904 #define REGISTER_TYPE(T)                                                          \
0905     template void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const T &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria); \
0906     template void Synchronizer::modify(const T &entity, const QByteArray &newResource, bool remove);
0907 
0908 SINK_REGISTER_TYPES()
0909