File indexing completed on 2024-05-12 05:26:06
0001 /* 0002 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> 0003 * 0004 * This library is free software; you can redistribute it and/or 0005 * modify it under the terms of the GNU Lesser General Public 0006 * License as published by the Free Software Foundation; either 0007 * version 2.1 of the License, or (at your option) version 3, or any 0008 * later version accepted by the membership of KDE e.V. (or its 0009 * successor approved by the membership of KDE e.V.), which shall 0010 * act as a proxy defined in Section 6 of version 3 of the license. 0011 * 0012 * This library is distributed in the hope that it will be useful, 0013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 0014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 0015 * Lesser General Public License for more details. 0016 * 0017 * You should have received a copy of the GNU Lesser General Public 0018 * License along with this library. If not, see <http://www.gnu.org/licenses/>. 0019 */ 0020 #include "synchronizer.h" 0021 0022 #include <QCoreApplication> 0023 0024 #include "definitions.h" 0025 #include "commands.h" 0026 #include "bufferutils.h" 0027 #include "synchronizerstore.h" 0028 #include "datastorequery.h" 0029 #include "createentity_generated.h" 0030 #include "modifyentity_generated.h" 0031 #include "deleteentity_generated.h" 0032 #include "flush_generated.h" 0033 #include "notification_generated.h" 0034 #include "utils.h" 0035 0036 using namespace Sink; 0037 0038 bool operator==(const Synchronizer::SyncRequest &left, const Synchronizer::SyncRequest &right) 0039 { 0040 return left.flushType == right.flushType 0041 && left.requestId == right.requestId 0042 && left.requestType == right.requestType 0043 && left.options == right.options 0044 && left.query == right.query 0045 && left.applicableEntities == right.applicableEntities; 0046 } 0047 0048 Synchronizer::Synchronizer(const Sink::ResourceContext &context) 0049 : ChangeReplay(context, {"synchronizer"}), 0050 mLogCtx{"synchronizer"}, 0051 mResourceContext(context), 0052 mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext, mLogCtx)), 0053 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), 0054 mSyncInProgress(false), 0055 mAbort(false) 0056 { 0057 mCurrentState.push(ApplicationDomain::Status::NoStatus); 0058 SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); 0059 } 0060 0061 Synchronizer::~Synchronizer() 0062 { 0063 0064 } 0065 0066 void Synchronizer::setSecret(const QString &s) 0067 { 0068 mSecret = s; 0069 0070 if (!mSyncRequestQueue.isEmpty()) { 0071 processSyncQueue().exec(); 0072 } 0073 } 0074 0075 QString Synchronizer::secret() const 0076 { 0077 return mSecret; 0078 } 0079 0080 void Synchronizer::setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback, MessageQueue &mq) 0081 { 0082 mEnqueue = enqueueCommandCallback; 0083 mMessageQueue = &mq; 0084 } 0085 0086 void Synchronizer::enqueueCommand(int commandId, const QByteArray &data) 0087 { 0088 Q_ASSERT(mEnqueue); 0089 mEnqueue(commandId, data); 0090 } 0091 0092 Storage::EntityStore &Synchronizer::store() 0093 { 0094 Q_ASSERT(mEntityStore->hasTransaction()); 0095 return *mEntityStore; 0096 } 0097 0098 SynchronizerStore &Synchronizer::syncStore() 0099 { 0100 if (!mSyncStore) { 0101 mSyncStore = QSharedPointer<SynchronizerStore>::create(syncTransaction()); 0102 } 0103 return *mSyncStore; 0104 } 0105 0106 void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject) 0107 { 0108 // These changes are coming from the source 0109 const auto replayToSource = false; 0110 flatbuffers::FlatBufferBuilder entityFbb; 0111 mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb); 0112 flatbuffers::FlatBufferBuilder fbb; 0113 auto entityId = fbb.CreateString(sinkId.toStdString()); 0114 auto type = fbb.CreateString(bufferType.toStdString()); 0115 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); 0116 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); 0117 Sink::Commands::FinishCreateEntityBuffer(fbb, location); 0118 enqueueCommand(Sink::Commands::CreateEntityCommand, BufferUtils::extractBuffer(fbb)); 0119 } 0120 0121 void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, const QByteArray &newResource, bool remove) 0122 { 0123 // FIXME removals 0124 QByteArrayList deletedProperties; 0125 // These changes are coming from the source 0126 const auto replayToSource = false; 0127 flatbuffers::FlatBufferBuilder entityFbb; 0128 mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb); 0129 flatbuffers::FlatBufferBuilder fbb; 0130 auto entityId = fbb.CreateString(sinkId.toStdString()); 0131 auto modifiedProperties = BufferUtils::toVector(fbb, domainObject.changedProperties()); 0132 auto deletions = BufferUtils::toVector(fbb, deletedProperties); 0133 auto type = fbb.CreateString(bufferType.toStdString()); 0134 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); 0135 auto resource = newResource.isEmpty() ? 0 : fbb.CreateString(newResource.constData()); 0136 auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, replayToSource, modifiedProperties, resource, remove); 0137 Sink::Commands::FinishModifyEntityBuffer(fbb, location); 0138 enqueueCommand(Sink::Commands::ModifyEntityCommand, BufferUtils::extractBuffer(fbb)); 0139 } 0140 0141 void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType) 0142 { 0143 // These changes are coming from the source 0144 const auto replayToSource = false; 0145 flatbuffers::FlatBufferBuilder fbb; 0146 auto entityId = fbb.CreateString(sinkId.toStdString()); 0147 // This is the resource type and not the domain type 0148 auto type = fbb.CreateString(bufferType.toStdString()); 0149 auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); 0150 Sink::Commands::FinishDeleteEntityBuffer(fbb, location); 0151 enqueueCommand(Sink::Commands::DeleteEntityCommand, BufferUtils::extractBuffer(fbb)); 0152 } 0153 0154 int Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) 0155 { 0156 int count = 0; 0157 entryGenerator([this, bufferType, &exists, &count](const QByteArray &sinkId) { 0158 const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); 0159 SinkTraceCtx(mLogCtx) << "Checking for removal " << sinkId << remoteId; 0160 // If we have no remoteId, the entity hasn't been replayed to the source yet 0161 if (!remoteId.isEmpty()) { 0162 if (!exists(remoteId)) { 0163 SinkTraceCtx(mLogCtx) << "Found a removed entity: " << sinkId; 0164 count++; 0165 deleteEntity(sinkId, mEntityStore->maxRevision(), bufferType); 0166 } 0167 } 0168 }); 0169 return count; 0170 } 0171 0172 int Synchronizer::scanForRemovals(const QByteArray &bufferType, std::function<bool(const QByteArray &remoteId)> exists) 0173 { 0174 return scanForRemovals(bufferType, 0175 [this, &bufferType](const std::function<void(const QByteArray &)> &callback) { 0176 store().readAllUids(bufferType, [callback](const QByteArray &uid) { 0177 callback(uid); 0178 }); 0179 }, 0180 exists 0181 ); 0182 } 0183 0184 void Synchronizer::modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity) 0185 { 0186 store.readLatest(bufferType, sinkId, [&, this](const Sink::ApplicationDomain::ApplicationDomainType ¤t) { 0187 const bool changed = [&] { 0188 for (const auto &property : entity.changedProperties()) { 0189 if (entity.getProperty(property) != current.getProperty(property)) { 0190 SinkTraceCtx(mLogCtx) << "Property changed " << sinkId << property; 0191 return true; 0192 } 0193 } 0194 return false; 0195 }(); 0196 if (changed) { 0197 SinkTraceCtx(mLogCtx) << "Found a modified entity: " << sinkId; 0198 modifyEntity(sinkId, store.maxRevision(), bufferType, entity); 0199 } else { 0200 SinkTraceCtx(mLogCtx) << "Entity was not modified: " << sinkId; 0201 } 0202 }); 0203 } 0204 0205 void Synchronizer::modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) 0206 { 0207 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId, false); 0208 if (sinkId.isEmpty()) { 0209 SinkWarningCtx(mLogCtx) << "Failed to find the local id for " << remoteId; 0210 return; 0211 } 0212 Storage::EntityStore store(mResourceContext, mLogCtx); 0213 modifyIfChanged(store, bufferType, sinkId, entity); 0214 } 0215 0216 void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) 0217 { 0218 SinkTraceCtx(mLogCtx) << "Create or modify" << bufferType << remoteId; 0219 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); 0220 if (sinkId.isEmpty()) { 0221 SinkWarningCtx(mLogCtx) << "Failed to create a local id for " << remoteId; 0222 Q_ASSERT(false); 0223 return; 0224 } 0225 Storage::EntityStore store(mResourceContext, mLogCtx); 0226 if (!store.contains(bufferType, sinkId)) { 0227 SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId; 0228 createEntity(sinkId, bufferType, entity); 0229 } else { // modification 0230 modifyIfChanged(store, bufferType, sinkId, entity); 0231 } 0232 } 0233 0234 template<typename DomainType> 0235 void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria) 0236 { 0237 SinkTraceCtx(mLogCtx) << "Create or modify" << bufferType << remoteId; 0238 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); 0239 if (sinkId.isEmpty()) { 0240 SinkWarningCtx(mLogCtx) << "Failed to create a local id for " << remoteId; 0241 Q_ASSERT(false); 0242 return; 0243 } 0244 Storage::EntityStore store(mResourceContext, mLogCtx); 0245 if (!store.contains(bufferType, sinkId)) { 0246 if (!mergeCriteria.isEmpty()) { 0247 Sink::Query query; 0248 for (auto it = mergeCriteria.constBegin(); it != mergeCriteria.constEnd(); it++) { 0249 query.filter(it.key(), it.value()); 0250 } 0251 bool merge = false; 0252 DataStoreQuery dataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), store}; 0253 auto resultSet = dataStoreQuery.execute(); 0254 resultSet.replaySet(0, 1, [this, &merge, bufferType, remoteId](const ResultSet::Result &r) { 0255 merge = true; 0256 SinkTraceCtx(mLogCtx) << "Merging local entity with remote entity: " << r.entity.identifier() << remoteId; 0257 syncStore().recordRemoteId(bufferType, r.entity.identifier(), remoteId); 0258 }); 0259 0260 if (!merge) { 0261 SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId; 0262 createEntity(sinkId, bufferType, entity); 0263 } 0264 } else { 0265 SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId; 0266 createEntity(sinkId, bufferType, entity); 0267 } 0268 } else { // modification 0269 modifyIfChanged(store, bufferType, sinkId, entity); 0270 } 0271 } 0272 0273 QByteArrayList Synchronizer::resolveQuery(const QueryBase &query) 0274 { 0275 if (query.type().isEmpty()) { 0276 SinkWarningCtx(mLogCtx) << "Can't resolve a query without a type" << query; 0277 return {}; 0278 } 0279 QByteArrayList result; 0280 Storage::EntityStore store{mResourceContext, mLogCtx}; 0281 DataStoreQuery dataStoreQuery{query, query.type(), store}; 0282 auto resultSet = dataStoreQuery.execute(); 0283 resultSet.replaySet(0, 0, [&](const ResultSet::Result &r) { 0284 result << r.entity.identifier(); 0285 }); 0286 return result; 0287 } 0288 0289 QByteArrayList Synchronizer::resolveFilter(const QueryBase::Comparator &filter) 0290 { 0291 if (filter.value.canConvert<QByteArray>()) { 0292 const auto value = filter.value.value<QByteArray>(); 0293 if (value.isEmpty()) { 0294 SinkErrorCtx(mLogCtx) << "Tried to filter for an empty value: " << filter; 0295 } else { 0296 return {filter.value.value<QByteArray>()}; 0297 } 0298 } else if (filter.value.canConvert<QueryBase>()) { 0299 return resolveQuery(filter.value.value<QueryBase>()); 0300 } else if (filter.value.canConvert<Query>()) { 0301 return resolveQuery(filter.value.value<Query>()); 0302 } else if (filter.value.canConvert<SyncScope>()) { 0303 return resolveQuery(filter.value.value<SyncScope>()); 0304 } else { 0305 SinkWarningCtx(mLogCtx) << "unknown filter type: " << filter; 0306 Q_ASSERT(false); 0307 } 0308 return {}; 0309 } 0310 0311 template<typename DomainType> 0312 void Synchronizer::modify(const DomainType &entity, const QByteArray &newResource, bool remove) 0313 { 0314 modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName<DomainType>(), entity, newResource, remove); 0315 } 0316 0317 QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::QueryBase &query) 0318 { 0319 return {Synchronizer::SyncRequest{query, "sync"}}; 0320 } 0321 0322 void Synchronizer::mergeIntoQueue(const Synchronizer::SyncRequest &request, QList<Synchronizer::SyncRequest> &queue) 0323 { 0324 queue << request; 0325 } 0326 0327 void Synchronizer::addToQueue(const Synchronizer::SyncRequest &request) 0328 { 0329 mergeIntoQueue(request, mSyncRequestQueue); 0330 } 0331 0332 void Synchronizer::synchronize(const Sink::QueryBase &query) 0333 { 0334 SinkTraceCtx(mLogCtx) << "Synchronizing" << query; 0335 auto newRequests = getSyncRequests(query); 0336 for (const auto &request: newRequests) { 0337 auto shouldSkip = [&] { 0338 for (auto &r : mSyncRequestQueue) { 0339 if (r == request) { 0340 //Merge 0341 SinkTraceCtx(mLogCtx) << "Merging equal request " << request.query << "\n to" << r.query; 0342 return true; 0343 } 0344 } 0345 return false; 0346 }; 0347 0348 if (shouldSkip()) { 0349 continue; 0350 } 0351 mergeIntoQueue(request, mSyncRequestQueue); 0352 } 0353 processSyncQueue().exec(); 0354 } 0355 0356 void Synchronizer::clearQueue() 0357 { 0358 //Complete all pending flushes. Without this pending flushes would get stuck indefinitely when we clear the queue on failure. 0359 //TODO we should probably fail them instead 0360 for (const auto &request : mSyncRequestQueue) { 0361 if (request.requestType == Synchronizer::SyncRequest::Flush) { 0362 SinkTraceCtx(mLogCtx) << "Emitting flush completion: " << request.requestId; 0363 emitNotification(Notification::FlushCompletion, 0, "", request.requestId); 0364 } 0365 } 0366 mSyncRequestQueue.clear(); 0367 } 0368 0369 void Synchronizer::abort() 0370 { 0371 SinkLogCtx(mLogCtx) << "Aborting all running synchronization requests"; 0372 clearQueue(); 0373 mAbort = true; 0374 } 0375 0376 void Synchronizer::flush(int commandId, const QByteArray &flushId) 0377 { 0378 Q_ASSERT(!flushId.isEmpty()); 0379 SinkTraceCtx(mLogCtx) << "Flushing the synchronization queue " << flushId; 0380 mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, commandId, flushId}; 0381 processSyncQueue().exec(); 0382 } 0383 0384 void Synchronizer::flushComplete(const QByteArray &flushId) 0385 { 0386 SinkTraceCtx(mLogCtx) << "Flush complete: " << flushId; 0387 if (mPendingSyncRequests.contains(flushId)) { 0388 const auto requests = mPendingSyncRequests.values(flushId); 0389 for (const auto &r : requests) { 0390 //We want to process the pending request before any others in the queue 0391 mSyncRequestQueue.prepend(r); 0392 } 0393 mPendingSyncRequests.remove(flushId); 0394 processSyncQueue().exec(); 0395 } 0396 } 0397 0398 void Synchronizer::emitNotification(Notification::NoticationType type, int code, const QString &message, const QByteArray &id, const QByteArray &applicableEntitiesType, const QByteArrayList &entities) 0399 { 0400 Sink::Notification n; 0401 n.id = id; 0402 n.type = type; 0403 n.message = message; 0404 n.code = code; 0405 n.entitiesType = applicableEntitiesType; 0406 n.entities = entities; 0407 emit notify(n); 0408 } 0409 0410 void Synchronizer::emitProgressNotification(Notification::NoticationType type, int progress, int total, const QByteArray &id, const QByteArray &entitiesType, const QByteArrayList &entities) 0411 { 0412 Sink::Notification n; 0413 n.id = id; 0414 n.type = type; 0415 n.progress = progress; 0416 n.total = total; 0417 n.entitiesType = entitiesType; 0418 n.entities = entities; 0419 emit notify(n); 0420 } 0421 0422 void Synchronizer::reportProgress(int progress, int total, const QByteArrayList &entities) 0423 { 0424 if (progress > 0 && total > 0) { 0425 //Limit progress updates for large amounts 0426 if (total >= 1000 && progress % 100 != 0) { 0427 return; 0428 } else if (total >= 100 && progress % 10 != 0) { 0429 return; 0430 } 0431 SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total << mCurrentRequest.requestId << mCurrentRequest.applicableEntities; 0432 const auto applicableEntities = [&] { 0433 if (entities.isEmpty()) { 0434 return mCurrentRequest.applicableEntities; 0435 } 0436 return entities; 0437 }(); 0438 emitProgressNotification(Notification::Progress, progress, total, mCurrentRequest.requestId, mCurrentRequest.query.type(), applicableEntities); 0439 } 0440 } 0441 0442 void Synchronizer::setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId) 0443 { 0444 if (error) { 0445 if (error.errorCode == ApplicationDomain::ConnectionError) { 0446 //Couldn't connect, so we assume we don't have a network connection. 0447 setStatus(ApplicationDomain::OfflineStatus, s, requestId); 0448 } else if (error.errorCode == ApplicationDomain::NoServerError) { 0449 //Failed to contact the server. 0450 setStatus(ApplicationDomain::OfflineStatus, s, requestId); 0451 } else if (error.errorCode == ApplicationDomain::ConfigurationError) { 0452 //There is an error with the configuration. 0453 setStatus(ApplicationDomain::ErrorStatus, s, requestId); 0454 } else if (error.errorCode == ApplicationDomain::LoginError) { 0455 //If we failed to login altough we could connect that indicates a problem with our setup. 0456 setStatus(ApplicationDomain::ErrorStatus, s, requestId); 0457 } else if (error.errorCode == ApplicationDomain::ConnectionLostError) { 0458 //We've lost the connection so we assume the connection to the server broke. 0459 setStatus(ApplicationDomain::OfflineStatus, s, requestId); 0460 } 0461 //We don't know what kind of error this was, so we assume it's transient and don't change our status. 0462 } else { 0463 //An operation against the server worked, so we're probably online. 0464 setStatus(ApplicationDomain::ConnectedStatus, s, requestId); 0465 } 0466 } 0467 0468 KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) 0469 { 0470 if (request.options & SyncRequest::RequestFlush) { 0471 return KAsync::start([=] { 0472 //Trigger a flush and record original request without flush option 0473 auto modifiedRequest = request; 0474 modifiedRequest.options = SyncRequest::NoOptions; 0475 //Normally we won't have a requestId here 0476 if (modifiedRequest.requestId.isEmpty()) { 0477 modifiedRequest.requestId = createUuid(); 0478 } 0479 SinkTraceCtx(mLogCtx) << "Enqueuing flush request " << modifiedRequest.requestId; 0480 0481 //The sync request will be executed once the flush has completed 0482 mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest); 0483 0484 flatbuffers::FlatBufferBuilder fbb; 0485 auto flushId = fbb.CreateString(modifiedRequest.requestId.toStdString()); 0486 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); 0487 Sink::Commands::FinishFlushBuffer(fbb, location); 0488 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); 0489 }); 0490 } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { 0491 return KAsync::start([this, request] { 0492 SinkLogCtx(mLogCtx) << "Synchronizing:" << request.query; 0493 setBusy(true, "Synchronization has started.", request.requestId); 0494 emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntitiesType, request.applicableEntities); 0495 }).then(synchronizeWithSource(request.query)).then([this] { 0496 //Commit after every request, so implementations only have to commit more if they add a lot of data. 0497 commit(); 0498 }).then<void>([this, request](const KAsync::Error &error) { 0499 setStatusFromResult(error, "Synchronization has ended.", request.requestId); 0500 if (error) { 0501 //Emit notification with error 0502 SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error; 0503 emitNotification(Notification::Warning, ApplicationDomain::SyncError, {}, {}, request.applicableEntitiesType, request.applicableEntities); 0504 return KAsync::error(error); 0505 } else { 0506 SinkLogCtx(mLogCtx) << "Done Synchronizing"; 0507 emitNotification(Notification::Info, ApplicationDomain::SyncSuccess, {}, {}, request.applicableEntitiesType, request.applicableEntities); 0508 return KAsync::null(); 0509 } 0510 }); 0511 } else if (request.requestType == Synchronizer::SyncRequest::Flush) { 0512 return KAsync::start([=] { 0513 Q_ASSERT(!request.requestId.isEmpty()); 0514 //FIXME it looks like this is emitted before the replay actually finishes 0515 if (request.flushType == Flush::FlushReplayQueue) { 0516 SinkTraceCtx(mLogCtx) << "Emitting flush completion: " << request.requestId; 0517 emitNotification(Notification::FlushCompletion, 0, "", request.requestId); 0518 } else { 0519 flatbuffers::FlatBufferBuilder fbb; 0520 auto flushId = fbb.CreateString(request.requestId.toStdString()); 0521 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); 0522 Sink::Commands::FinishFlushBuffer(fbb, location); 0523 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); 0524 } 0525 }); 0526 } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { 0527 if (ChangeReplay::allChangesReplayed()) { 0528 return KAsync::null(); 0529 } else { 0530 return KAsync::start([this, request] { 0531 setBusy(true, "ChangeReplay has started.", request.requestId); 0532 SinkLogCtx(mLogCtx) << "Replaying changes."; 0533 }) 0534 .then(replayNextRevision()) 0535 .then<void>([this, request](const KAsync::Error &error) { 0536 setStatusFromResult(error, "Changereplay has ended.", request.requestId); 0537 if (error) { 0538 SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error; 0539 return KAsync::error(error); 0540 } else { 0541 SinkLogCtx(mLogCtx) << "Done replaying changes"; 0542 return KAsync::null(); 0543 } 0544 }); 0545 } 0546 } else { 0547 SinkWarningCtx(mLogCtx) << "Unknown request type: " << request.requestType; 0548 return KAsync::error(KAsync::Error{"Unknown request type."}); 0549 } 0550 0551 } 0552 0553 /* 0554 * We're using a stack so we can go back to whatever we had after the temporary busy status. 0555 * Whenever we do change the status we emit a status notification. 0556 */ 0557 void Synchronizer::setStatus(ApplicationDomain::Status state, const QString &reason, const QByteArray requestId) 0558 { 0559 //We won't be able to execute any of the coming requests, so clear them 0560 if (state == ApplicationDomain::OfflineStatus || state == ApplicationDomain::ErrorStatus) { 0561 clearQueue(); 0562 } 0563 if (state != mCurrentState.top()) { 0564 //The busy state is transient and we want to override it. 0565 if (mCurrentState.top() == ApplicationDomain::BusyStatus) { 0566 mCurrentState.pop(); 0567 } 0568 if (state != mCurrentState.top()) { 0569 //Always leave the first state intact 0570 if (mCurrentState.count() > 1 && state != ApplicationDomain::BusyStatus) { 0571 mCurrentState.pop(); 0572 } 0573 mCurrentState.push(state); 0574 } 0575 //We should never have more than: (NoStatus, $SOMESTATUS, BusyStatus) 0576 if (mCurrentState.count() > 3) { 0577 qWarning() << mCurrentState; 0578 Q_ASSERT(false); 0579 } 0580 emitNotification(Notification::Status, state, reason, requestId); 0581 } 0582 } 0583 0584 void Synchronizer::resetStatus(const QByteArray requestId) 0585 { 0586 mCurrentState.pop(); 0587 emitNotification(Notification::Status, mCurrentState.top(), {}, requestId); 0588 } 0589 0590 void Synchronizer::setBusy(bool busy, const QString &reason, const QByteArray requestId) 0591 { 0592 if (busy) { 0593 setStatus(ApplicationDomain::BusyStatus, reason, requestId); 0594 } else { 0595 if (mCurrentState.top() == ApplicationDomain::BusyStatus) { 0596 resetStatus(requestId); 0597 } 0598 } 0599 } 0600 0601 KAsync::Job<void> Synchronizer::processSyncQueue() 0602 { 0603 if (secret().isEmpty()) { 0604 SinkTraceCtx(mLogCtx) << "Secret not available but required."; 0605 emitNotification(Notification::Warning, ApplicationDomain::SyncError, "Secret is not available.", {}, {}); 0606 return KAsync::null<void>(); 0607 } 0608 if (mSyncRequestQueue.isEmpty()) { 0609 SinkLogCtx(mLogCtx) << "All requests processed."; 0610 return KAsync::null<void>(); 0611 } 0612 if (mSyncInProgress) { 0613 SinkTraceCtx(mLogCtx) << "Sync still in progress."; 0614 return KAsync::null<void>(); 0615 } 0616 //Don't process any new requests until we're done with the pending ones. 0617 //Otherwise we might process a flush before the previous request actually completed. 0618 if (!mPendingSyncRequests.isEmpty()) { 0619 SinkTraceCtx(mLogCtx) << "We still have pending sync requests. Not executing next request."; 0620 return KAsync::null<void>(); 0621 } 0622 0623 const auto request = mSyncRequestQueue.takeFirst(); 0624 return KAsync::start([=] { 0625 SinkTraceCtx(mLogCtx) << "Start processing request " << request.requestType; 0626 mTime.start(); 0627 mMessageQueue->startTransaction(); 0628 mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); 0629 mSyncInProgress = true; 0630 mCurrentRequest = request; 0631 }) 0632 .then(processRequest(request)) 0633 .then<void>([this, request](const KAsync::Error &error) { 0634 SinkTraceCtx(mLogCtx) << "Sync request processed " << Sink::Log::TraceTime(mTime.elapsed()); 0635 setBusy(false, {}, request.requestId); 0636 mCurrentRequest = {}; 0637 mEntityStore->abortTransaction(); 0638 mSyncTransaction.abort(); 0639 mMessageQueue->commit(); 0640 mSyncStore.clear(); 0641 mSyncInProgress = false; 0642 mAbort = false; 0643 if (allChangesReplayed()) { 0644 emit changesReplayed(); 0645 } 0646 if (error) { 0647 SinkWarningCtx(mLogCtx) << "Error during sync: " << error; 0648 emitNotification(Notification::Error, error.errorCode, error.errorMessage, request.requestId); 0649 } 0650 //In case we got more requests meanwhile. 0651 return processSyncQueue(); 0652 }); 0653 } 0654 0655 bool Synchronizer::aborting() const 0656 { 0657 return mAbort; 0658 } 0659 0660 void Synchronizer::commit() 0661 { 0662 SinkTraceCtx(mLogCtx) << "Commit." << Sink::Log::TraceTime(mTime.elapsed()); 0663 mMessageQueue->commit(); 0664 mSyncTransaction.commit(); 0665 mSyncStore.clear(); 0666 0667 //Avoid accumulating free pages at the cost of not executing a full sync on a consistent db view 0668 if (mEntityStore->hasTransaction()) { 0669 mEntityStore->abortTransaction(); 0670 mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); 0671 } 0672 0673 QCoreApplication::processEvents(QEventLoop::AllEvents, 10); 0674 0675 if (mSyncInProgress) { 0676 mMessageQueue->startTransaction(); 0677 } 0678 } 0679 0680 Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction() 0681 { 0682 if (!mSyncTransaction) { 0683 SinkTraceCtx(mLogCtx) << "Starting transaction on sync store."; 0684 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::DataStore::ReadWrite); 0685 } 0686 return mSyncTransaction; 0687 } 0688 0689 void Synchronizer::revisionChanged() 0690 { 0691 //One replay request is enough 0692 for (const auto &r : mSyncRequestQueue) { 0693 if (r.requestType == Synchronizer::SyncRequest::ChangeReplay) { 0694 return; 0695 } 0696 } 0697 mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::ChangeReplay, "changereplay"}; 0698 processSyncQueue().exec(); 0699 } 0700 0701 bool Synchronizer::canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) 0702 { 0703 Sink::EntityBuffer buffer(value); 0704 const Sink::Entity &entity = buffer.entity(); 0705 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); 0706 Q_ASSERT(metadataBuffer); 0707 if (!metadataBuffer->replayToSource()) { 0708 SinkTraceCtx(mLogCtx) << "Change is coming from the source"; 0709 } 0710 return metadataBuffer->replayToSource(); 0711 } 0712 0713 KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) 0714 { 0715 SinkTraceCtx(mLogCtx) << "Replaying" << type << key; 0716 0717 Sink::EntityBuffer buffer(value); 0718 const Sink::Entity &entity = buffer.entity(); 0719 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); 0720 if (!metadataBuffer) { 0721 SinkErrorCtx(mLogCtx) << "No metadata buffer available."; 0722 return KAsync::error("No metadata buffer"); 0723 } 0724 if (mSyncTransaction) { 0725 SinkErrorCtx(mLogCtx) << "Leftover sync transaction."; 0726 mSyncTransaction.abort(); 0727 } 0728 if (mSyncStore) { 0729 SinkErrorCtx(mLogCtx) << "Leftover sync store."; 0730 mSyncStore.clear(); 0731 } 0732 Q_ASSERT(metadataBuffer); 0733 Q_ASSERT(!mSyncStore); 0734 Q_ASSERT(!mSyncTransaction); 0735 //The entitystore transaction is handled by processSyncQueue 0736 Q_ASSERT(mEntityStore->hasTransaction()); 0737 0738 const auto operation = metadataBuffer->operation(); 0739 // TODO: should not use internal representations 0740 const auto uid = Sink::Storage::Key::fromDisplayByteArray(key).identifier().toDisplayByteArray(); 0741 const auto modifiedProperties = metadataBuffer->modifiedProperties() ? BufferUtils::fromVector(*metadataBuffer->modifiedProperties()) : QByteArrayList(); 0742 QByteArray oldRemoteId; 0743 0744 if (operation != Sink::Operation_Creation) { 0745 oldRemoteId = syncStore().resolveLocalId(type, uid); 0746 //oldRemoteId can be empty if the resource implementation didn't return a remoteid 0747 } 0748 SinkLogCtx(mLogCtx) << "Replaying: " << key << "Type: " << type << "Uid: " << uid << "Rid: " << oldRemoteId << "Revision: " << metadataBuffer->revision() << "Modified properties" << modifiedProperties; 0749 0750 //If the entity has been removed already and this is not the removal, skip over. 0751 //This is important so we can unblock changereplay by removing entities. 0752 bool skipOver = false; 0753 store().readLatest(type, uid, [&](const ApplicationDomain::ApplicationDomainType &, Sink::Operation latestOperation) { 0754 if (latestOperation == Sink::Operation_Removal && operation != Sink::Operation_Removal) { 0755 skipOver = true; 0756 } 0757 }); 0758 if (skipOver) { 0759 SinkLogCtx(mLogCtx) << "Skipping over already removed entity"; 0760 return KAsync::null(); 0761 } 0762 0763 KAsync::Job<QByteArray> job = KAsync::null<QByteArray>(); 0764 //TODO This requires supporting every domain type here as well. Can we solve this better so we can do the dispatch somewhere centrally? 0765 if (type == ApplicationDomain::getTypeName<ApplicationDomain::Folder>()) { 0766 job = replay(store().readEntity<ApplicationDomain::Folder>(key), operation, oldRemoteId, modifiedProperties); 0767 } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Mail>()) { 0768 job = replay(store().readEntity<ApplicationDomain::Mail>(key), operation, oldRemoteId, modifiedProperties); 0769 } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Contact>()) { 0770 job = replay(store().readEntity<ApplicationDomain::Contact>(key), operation, oldRemoteId, modifiedProperties); 0771 } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Addressbook>()) { 0772 job = replay(store().readEntity<ApplicationDomain::Addressbook>(key), operation, oldRemoteId, modifiedProperties); 0773 } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Event>()) { 0774 job = replay(store().readEntity<ApplicationDomain::Event>(key), operation, oldRemoteId, modifiedProperties); 0775 } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Todo>()) { 0776 job = replay(store().readEntity<ApplicationDomain::Todo>(key), operation, oldRemoteId, modifiedProperties); 0777 } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Calendar>()) { 0778 job = replay(store().readEntity<ApplicationDomain::Calendar>(key), operation, oldRemoteId, modifiedProperties); 0779 } else { 0780 SinkErrorCtx(mLogCtx) << "Replayed unknown type: " << type; 0781 } 0782 0783 return job.then([=](const KAsync::Error &error, const QByteArray &remoteId) { 0784 0785 //Returning an error here means we stop replaying, so we only to that for known-to-be-transient errors. 0786 if (error) { 0787 switch (error.errorCode) { 0788 case ApplicationDomain::ConnectionError: 0789 case ApplicationDomain::NoServerError: 0790 case ApplicationDomain::ConfigurationError: 0791 case ApplicationDomain::LoginError: 0792 case ApplicationDomain::ConnectionLostError: 0793 SinkTraceCtx(mLogCtx) << "Error during changereplay (aborting):" << error; 0794 return KAsync::error(error); 0795 default: 0796 SinkErrorCtx(mLogCtx) << "Error during changereplay (continuing):" << error; 0797 break; 0798 0799 } 0800 } 0801 0802 switch (operation) { 0803 case Sink::Operation_Creation: { 0804 SinkTraceCtx(mLogCtx) << "Replayed creation with remote id: " << remoteId; 0805 if (!remoteId.isEmpty()) { 0806 syncStore().recordRemoteId(type, uid, remoteId); 0807 } 0808 } 0809 break; 0810 case Sink::Operation_Modification: { 0811 SinkTraceCtx(mLogCtx) << "Replayed modification with remote id: " << remoteId; 0812 if (!remoteId.isEmpty()) { 0813 syncStore().updateRemoteId(type, uid, remoteId); 0814 } 0815 } 0816 break; 0817 case Sink::Operation_Removal: { 0818 SinkTraceCtx(mLogCtx) << "Replayed removal with remote id: " << oldRemoteId; 0819 if (!oldRemoteId.isEmpty()) { 0820 syncStore().removeRemoteId(type, uid, oldRemoteId); 0821 } 0822 } 0823 break; 0824 default: 0825 SinkErrorCtx(mLogCtx) << "Unkown operation" << operation; 0826 } 0827 0828 //We need to commit here otherwise the next change-replay step will abort the transaction 0829 mSyncStore.clear(); 0830 mSyncTransaction.commit(); 0831 0832 //Ignore errors if not caught above 0833 return KAsync::null(); 0834 }); 0835 } 0836 0837 void Synchronizer::notReplaying(const QByteArray &type, const QByteArray &key, const QByteArray &value) 0838 { 0839 0840 Sink::EntityBuffer buffer(value); 0841 const Sink::Entity &entity = buffer.entity(); 0842 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); 0843 if (!metadataBuffer) { 0844 SinkErrorCtx(mLogCtx) << "No metadata buffer available."; 0845 Q_ASSERT(false); 0846 return; 0847 } 0848 if (metadataBuffer->operation() == Sink::Operation_Removal) { 0849 const auto uid = Sink::Storage::Key::fromDisplayByteArray(key).identifier().toDisplayByteArray(); 0850 const auto oldRemoteId = syncStore().resolveLocalId(type, uid); 0851 SinkLogCtx(mLogCtx) << "Cleaning up removal with remote id: " << oldRemoteId; 0852 if (!oldRemoteId.isEmpty()) { 0853 syncStore().removeRemoteId(type, uid, oldRemoteId); 0854 } 0855 } 0856 mSyncStore.clear(); 0857 mSyncTransaction.commit(); 0858 } 0859 0860 KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Contact &, Sink::Operation, const QByteArray &, const QList<QByteArray> &) 0861 { 0862 return KAsync::null<QByteArray>(); 0863 } 0864 0865 KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Addressbook &, Sink::Operation, const QByteArray &, const QList<QByteArray> &) 0866 { 0867 return KAsync::null<QByteArray>(); 0868 } 0869 0870 KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &, const QList<QByteArray> &) 0871 { 0872 return KAsync::null<QByteArray>(); 0873 } 0874 0875 KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &, const QList<QByteArray> &) 0876 { 0877 return KAsync::null<QByteArray>(); 0878 } 0879 0880 KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Event &, Sink::Operation, const QByteArray &, const QList<QByteArray> &) 0881 { 0882 return KAsync::null<QByteArray>(); 0883 } 0884 0885 KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Todo &, Sink::Operation, const QByteArray &, const QList<QByteArray> &) 0886 { 0887 return KAsync::null<QByteArray>(); 0888 } 0889 0890 KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Calendar &, Sink::Operation, const QByteArray &, const QList<QByteArray> &) 0891 { 0892 return KAsync::null<QByteArray>(); 0893 } 0894 0895 bool Synchronizer::allChangesReplayed() 0896 { 0897 if (!mSyncRequestQueue.isEmpty()) { 0898 SinkTraceCtx(mLogCtx) << "Queue is not empty"; 0899 return false; 0900 } 0901 return ChangeReplay::allChangesReplayed(); 0902 } 0903 0904 #define REGISTER_TYPE(T) \ 0905 template void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const T &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria); \ 0906 template void Synchronizer::modify(const T &entity, const QByteArray &newResource, bool remove); 0907 0908 SINK_REGISTER_TYPES() 0909