File indexing completed on 2024-05-12 05:26:07
0001 /* 0002 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> 0003 * 0004 * This library is free software; you can redistribute it and/or 0005 * modify it under the terms of the GNU Lesser General Public 0006 * License as published by the Free Software Foundation; either 0007 * version 2.1 of the License, or (at your option) version 3, or any 0008 * later version accepted by the membership of KDE e.V. (or its 0009 * successor approved by the membership of KDE e.V.), which shall 0010 * act as a proxy defined in Section 6 of version 3 of the license. 0011 * 0012 * This library is distributed in the hope that it will be useful, 0013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 0014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 0015 * Lesser General Public License for more details. 0016 * 0017 * You should have received a copy of the GNU Lesser General Public 0018 * License along with this library. If not, see <http://www.gnu.org/licenses/>. 0019 */ 0020 #pragma once 0021 0022 #include "sink_export.h" 0023 #include <QObject> 0024 #include <QStack> 0025 #include <QTime> 0026 #include <KAsync/Async> 0027 #include <domainadaptor.h> 0028 #include <query.h> 0029 #include <messagequeue.h> 0030 #include <storage.h> 0031 #include <storage/entitystore.h> 0032 #include "changereplay.h" 0033 #include "synchronizerstore.h" 0034 0035 namespace Sink { 0036 class SynchronizerStore; 0037 0038 /** 0039 * Synchronize and add what we don't already have to local queue 0040 */ 0041 class SINK_EXPORT Synchronizer : public ChangeReplay 0042 { 0043 Q_OBJECT 0044 public: 0045 Synchronizer(const Sink::ResourceContext &resourceContext); 0046 virtual ~Synchronizer() Q_DECL_OVERRIDE; 0047 0048 void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback, MessageQueue &messageQueue); 0049 void synchronize(const Sink::QueryBase &query); 0050 void flush(int commandId, const QByteArray &flushId); 0051 0052 //Read only access to main storage 0053 Storage::EntityStore &store(); 0054 0055 //Read/Write access to sync storage 0056 SynchronizerStore &syncStore(); 0057 0058 void commit(); 0059 Sink::Storage::DataStore::Transaction &syncTransaction(); 0060 0061 bool allChangesReplayed() Q_DECL_OVERRIDE; 0062 void flushComplete(const QByteArray &flushId); 0063 0064 void setSecret(const QString &s); 0065 0066 //Abort all running synchronization requests 0067 void abort(); 0068 0069 KAsync::Job<void> processSyncQueue(); 0070 0071 signals: 0072 void notify(Notification); 0073 0074 public slots: 0075 virtual void revisionChanged() Q_DECL_OVERRIDE; 0076 0077 protected: 0078 ///Base implementation calls the replay$Type calls 0079 KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) override; 0080 virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) override; 0081 virtual void notReplaying(const QByteArray &type, const QByteArray &key, const QByteArray &value) override; 0082 0083 protected: 0084 ///Implement to write back changes to the server 0085 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Contact &, Sink::Operation, const QByteArray &oldRemoteId, const QList<QByteArray> &); 0086 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Addressbook &, Sink::Operation, const QByteArray &oldRemoteId, const QList<QByteArray> &); 0087 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId, const QList<QByteArray> &); 0088 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId, const QList<QByteArray> &); 0089 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Event &, Sink::Operation, const QByteArray &oldRemoteId, const QList<QByteArray> &); 0090 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Todo &, Sink::Operation, const QByteArray &oldRemoteId, const QList<QByteArray> &); 0091 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Calendar &, Sink::Operation, const QByteArray &oldRemoteId, const QList<QByteArray> &); 0092 protected: 0093 QString secret() const; 0094 0095 ///Calls the callback to enqueue the command 0096 void enqueueCommand(int commandId, const QByteArray &data); 0097 0098 void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject); 0099 void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, const QByteArray &newResource = QByteArray(), bool remove = false); 0100 void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType); 0101 0102 /** 0103 * A synchronous algorithm to remove entities that are no longer existing. 0104 * 0105 * A list of entities is generated by @param entryGenerator. 0106 * The entiry Generator typically iterates over an index to produce all existing entries. 0107 * This algorithm calls @param exists for every entity of type @param type, with its remoteId. For every entity where @param exists returns false, 0108 * an entity delete command is enqueued. 0109 * 0110 * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. 0111 */ 0112 int scanForRemovals(const QByteArray &bufferType, 0113 const std::function<void(const std::function<void(const QByteArray &sinkId)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists); 0114 int scanForRemovals(const QByteArray &bufferType, std::function<bool(const QByteArray &remoteId)> exists); 0115 0116 /** 0117 * An algorithm to create or modify the entity. 0118 * 0119 * Depending on whether the entity is locally available, or has changed. 0120 */ 0121 void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); 0122 template <typename DomainType> 0123 void SINK_EXPORT createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria); 0124 void modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); 0125 0126 // template <typename DomainType> 0127 // void create(const DomainType &entity); 0128 template <typename DomainType> 0129 void SINK_EXPORT modify(const DomainType &entity, const QByteArray &newResource = QByteArray(), bool remove = false); 0130 // template <typename DomainType> 0131 // void remove(const DomainType &entity); 0132 0133 QByteArrayList resolveQuery(const QueryBase &query); 0134 QByteArrayList resolveFilter(const QueryBase::Comparator &filter); 0135 0136 virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) = 0; 0137 0138 public: 0139 struct SyncRequest { 0140 enum RequestType { 0141 Synchronization, 0142 ChangeReplay, 0143 Flush 0144 }; 0145 0146 enum RequestOptions { 0147 NoOptions, 0148 RequestFlush 0149 }; 0150 0151 SyncRequest() = default; 0152 0153 SyncRequest(const Sink::QueryBase &q, const QByteArray &requestId_ = QByteArray(), RequestOptions o = NoOptions) 0154 : requestId(requestId_), 0155 requestType(Synchronization), 0156 options(o), 0157 query(q), 0158 applicableEntitiesType(q.type()), 0159 applicableEntities(q.ids()) 0160 { 0161 } 0162 0163 SyncRequest(RequestType type) 0164 : requestType(type) 0165 { 0166 } 0167 0168 SyncRequest(RequestType type, const QByteArray &requestId_) 0169 : requestId(requestId_), 0170 requestType(type) 0171 { 0172 } 0173 0174 SyncRequest(RequestType type, int flushType_, const QByteArray &requestId_) 0175 : flushType(flushType_), 0176 requestId(requestId_), 0177 requestType(type) 0178 { 0179 } 0180 0181 int flushType = 0; 0182 QByteArray requestId; 0183 RequestType requestType; 0184 RequestOptions options = NoOptions; 0185 Sink::QueryBase query; 0186 QByteArray applicableEntitiesType; 0187 QByteArrayList applicableEntities; 0188 }; 0189 0190 protected: 0191 /** 0192 * This allows the synchronizer to turn a single query into multiple synchronization requests. 0193 * 0194 * The idea is the following; 0195 * The input query is a specification by the application of what data needs to be made available. 0196 * Requests could be: 0197 * * Give me everything (signified by the default constructed/empty query) 0198 * * Give me all mails of folder X 0199 * * Give me all mails of folders matching some constraints 0200 * 0201 * getSyncRequests allows the resource implementation to apply it's own defaults to that request; 0202 * * While a maildir resource might give you always all emails of a folder, an IMAP resource might have a date limit, to i.e. only retrieve the last 14 days worth of data. 0203 * * A resource get's to define what "give me everything" means. For email that may be turned into first a requests for folders, and then a request for all emails in those folders. 0204 * 0205 * This will allow synchronizeWithSource to focus on just getting to the content. 0206 */ 0207 virtual QList<Synchronizer::SyncRequest> getSyncRequests(const Sink::QueryBase &query); 0208 0209 /** 0210 * This allows the synchronizer to merge new requests with existing requests in the queue. 0211 */ 0212 virtual void mergeIntoQueue(const Synchronizer::SyncRequest &request, QList<Synchronizer::SyncRequest> &queue); 0213 void addToQueue(const Synchronizer::SyncRequest &request); 0214 0215 void emitNotification(Notification::NoticationType type, int code, const QString &message, const QByteArray &id = {}, const QByteArray &applicableEntitiesType = {}, const QByteArrayList &entities = {}); 0216 void emitProgressNotification(Notification::NoticationType type, int progress, int total, const QByteArray &id, const QByteArray &entitiesType, const QByteArrayList &entities); 0217 0218 /** 0219 * Report progress for current task 0220 */ 0221 virtual void reportProgress(int progress, int total, const QByteArrayList &entities = {}) Q_DECL_OVERRIDE; 0222 0223 Sink::Log::Context mLogCtx; 0224 0225 /** 0226 * True while aborting. 0227 * 0228 * Stop the synchronization as soon as possible. 0229 */ 0230 bool aborting() const; 0231 0232 private: 0233 QStack<ApplicationDomain::Status> mCurrentState; 0234 void setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId); 0235 void setStatus(ApplicationDomain::Status busy, const QString &reason, const QByteArray requestId); 0236 void resetStatus(const QByteArray requestId); 0237 void setBusy(bool busy, const QString &reason, const QByteArray requestId); 0238 void clearQueue(); 0239 0240 void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity); 0241 KAsync::Job<void> processRequest(const SyncRequest &request); 0242 0243 Sink::ResourceContext mResourceContext; 0244 Sink::Storage::EntityStore::Ptr mEntityStore; 0245 QSharedPointer<SynchronizerStore> mSyncStore; 0246 Sink::Storage::DataStore mSyncStorage; 0247 Sink::Storage::DataStore::Transaction mSyncTransaction; 0248 std::function<void(int commandId, const QByteArray &data)> mEnqueue; 0249 QList<SyncRequest> mSyncRequestQueue; 0250 SyncRequest mCurrentRequest; 0251 MessageQueue *mMessageQueue; 0252 bool mSyncInProgress; 0253 bool mAbort; 0254 QMultiHash<QByteArray, SyncRequest> mPendingSyncRequests; 0255 QString mSecret; 0256 QTime mTime; 0257 }; 0258 0259 } 0260