File indexing completed on 2024-05-12 05:26:07

0001 /*
0002  * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
0003  *
0004  * This library is free software; you can redistribute it and/or
0005  * modify it under the terms of the GNU Lesser General Public
0006  * License as published by the Free Software Foundation; either
0007  * version 2.1 of the License, or (at your option) version 3, or any
0008  * later version accepted by the membership of KDE e.V. (or its
0009  * successor approved by the membership of KDE e.V.), which shall
0010  * act as a proxy defined in Section 6 of version 3 of the license.
0011  *
0012  * This library is distributed in the hope that it will be useful,
0013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0015  * Lesser General Public License for more details.
0016  *
0017  * You should have received a copy of the GNU Lesser General Public
0018  * License along with this library.  If not, see <http://www.gnu.org/licenses/>.
0019  */
0020 #pragma once
0021 
0022 #include "sink_export.h"
0023 #include <QObject>
0024 #include <QStack>
0025 #include <QTime>
0026 #include <KAsync/Async>
0027 #include <domainadaptor.h>
0028 #include <query.h>
0029 #include <messagequeue.h>
0030 #include <storage.h>
0031 #include <storage/entitystore.h>
0032 #include "changereplay.h"
0033 #include "synchronizerstore.h"
0034 
0035 namespace Sink {
0036 class SynchronizerStore;
0037 
0038 /**
0039  * Synchronize and add what we don't already have to local queue
0040  */
0041 class SINK_EXPORT Synchronizer : public ChangeReplay
0042 {
0043     Q_OBJECT
0044 public:
0045     Synchronizer(const Sink::ResourceContext &resourceContext);
0046     virtual ~Synchronizer() Q_DECL_OVERRIDE;
0047 
0048     void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback, MessageQueue &messageQueue);
0049     void synchronize(const Sink::QueryBase &query);
0050     void flush(int commandId, const QByteArray &flushId);
0051 
0052     //Read only access to main storage
0053     Storage::EntityStore &store();
0054 
0055     //Read/Write access to sync storage
0056     SynchronizerStore &syncStore();
0057 
0058     void commit();
0059     Sink::Storage::DataStore::Transaction &syncTransaction();
0060 
0061     bool allChangesReplayed() Q_DECL_OVERRIDE;
0062     void flushComplete(const QByteArray &flushId);
0063 
0064     void setSecret(const QString &s);
0065 
0066     //Abort all running synchronization requests
0067     void abort();
0068 
0069     KAsync::Job<void> processSyncQueue();
0070 
0071 signals:
0072     void notify(Notification);
0073 
0074 public slots:
0075     virtual void revisionChanged() Q_DECL_OVERRIDE;
0076 
0077 protected:
0078     ///Base implementation calls the replay$Type calls
0079     KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) override;
0080     virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) override;
0081     virtual void notReplaying(const QByteArray &type, const QByteArray &key, const QByteArray &value) override;
0082 
0083 protected:
0084     ///Implement to write back changes to the server
0085     virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Contact &, Sink::Operation, const QByteArray &oldRemoteId, const QList<QByteArray> &);
0086     virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Addressbook &, Sink::Operation, const QByteArray &oldRemoteId, const QList<QByteArray> &);
0087     virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId, const QList<QByteArray> &);
0088     virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId, const QList<QByteArray> &);
0089     virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Event &, Sink::Operation, const QByteArray &oldRemoteId, const QList<QByteArray> &);
0090     virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Todo &, Sink::Operation, const QByteArray &oldRemoteId, const QList<QByteArray> &);
0091     virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Calendar &, Sink::Operation, const QByteArray &oldRemoteId, const QList<QByteArray> &);
0092 protected:
0093     QString secret() const;
0094 
0095     ///Calls the callback to enqueue the command
0096     void enqueueCommand(int commandId, const QByteArray &data);
0097 
0098     void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject);
0099     void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, const QByteArray &newResource = QByteArray(), bool remove = false);
0100     void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType);
0101 
0102     /**
0103     * A synchronous algorithm to remove entities that are no longer existing.
0104     *
0105     * A list of entities is generated by @param entryGenerator.
0106     * The entiry Generator typically iterates over an index to produce all existing entries.
0107     * This algorithm calls @param exists for every entity of type @param type, with its remoteId. For every entity where @param exists returns false,
0108     * an entity delete command is enqueued.
0109     *
0110     * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous.
0111     */
0112     int scanForRemovals(const QByteArray &bufferType,
0113         const std::function<void(const std::function<void(const QByteArray &sinkId)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists);
0114     int scanForRemovals(const QByteArray &bufferType, std::function<bool(const QByteArray &remoteId)> exists);
0115 
0116     /**
0117      * An algorithm to create or modify the entity.
0118      *
0119      * Depending on whether the entity is locally available, or has changed.
0120      */
0121     void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity);
0122     template <typename DomainType>
0123     void SINK_EXPORT createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria);
0124     void modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity);
0125 
0126     // template <typename DomainType>
0127     // void create(const DomainType &entity);
0128     template <typename DomainType>
0129     void SINK_EXPORT modify(const DomainType &entity, const QByteArray &newResource = QByteArray(), bool remove = false);
0130     // template <typename DomainType>
0131     // void remove(const DomainType &entity);
0132  
0133     QByteArrayList resolveQuery(const QueryBase &query);
0134     QByteArrayList resolveFilter(const QueryBase::Comparator &filter);
0135 
0136     virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) = 0;
0137 
0138 public:
0139     struct SyncRequest {
0140         enum RequestType {
0141             Synchronization,
0142             ChangeReplay,
0143             Flush
0144         };
0145 
0146         enum RequestOptions {
0147             NoOptions,
0148             RequestFlush
0149         };
0150 
0151         SyncRequest() = default;
0152 
0153         SyncRequest(const Sink::QueryBase &q, const QByteArray &requestId_ = QByteArray(), RequestOptions o = NoOptions)
0154             : requestId(requestId_),
0155             requestType(Synchronization),
0156             options(o),
0157             query(q),
0158             applicableEntitiesType(q.type()),
0159             applicableEntities(q.ids())
0160         {
0161         }
0162 
0163         SyncRequest(RequestType type)
0164             : requestType(type)
0165         {
0166         }
0167 
0168         SyncRequest(RequestType type, const QByteArray &requestId_)
0169             : requestId(requestId_),
0170             requestType(type)
0171         {
0172         }
0173 
0174         SyncRequest(RequestType type, int flushType_, const QByteArray &requestId_)
0175             : flushType(flushType_),
0176             requestId(requestId_),
0177             requestType(type)
0178         {
0179         }
0180 
0181         int flushType = 0;
0182         QByteArray requestId;
0183         RequestType requestType;
0184         RequestOptions options = NoOptions;
0185         Sink::QueryBase query;
0186         QByteArray applicableEntitiesType;
0187         QByteArrayList applicableEntities;
0188     };
0189 
0190 protected:
0191     /**
0192      * This allows the synchronizer to turn a single query into multiple synchronization requests.
0193      *
0194      * The idea is the following;
0195      * The input query is a specification by the application of what data needs to be made available.
0196      * Requests could be:
0197      * * Give me everything (signified by the default constructed/empty query)
0198      * * Give me all mails of folder X
0199      * * Give me all mails of folders matching some constraints
0200      *
0201      * getSyncRequests allows the resource implementation to apply it's own defaults to that request;
0202      * * While a maildir resource might give you always all emails of a folder, an IMAP resource might have a date limit, to i.e. only retrieve the last 14 days worth of data.
0203      * * A resource get's to define what "give me everything" means. For email that may be turned into first a requests for folders, and then a request for all emails in those folders.
0204      *
0205      * This will allow synchronizeWithSource to focus on just getting to the content.
0206      */
0207     virtual QList<Synchronizer::SyncRequest> getSyncRequests(const Sink::QueryBase &query);
0208 
0209     /**
0210      * This allows the synchronizer to merge new requests with existing requests in the queue.
0211      */
0212     virtual void mergeIntoQueue(const Synchronizer::SyncRequest &request, QList<Synchronizer::SyncRequest> &queue);
0213     void addToQueue(const Synchronizer::SyncRequest &request);
0214 
0215     void emitNotification(Notification::NoticationType type, int code, const QString &message, const QByteArray &id = {}, const QByteArray &applicableEntitiesType = {}, const QByteArrayList &entities = {});
0216     void emitProgressNotification(Notification::NoticationType type, int progress, int total, const QByteArray &id, const QByteArray &entitiesType, const QByteArrayList &entities);
0217 
0218     /**
0219      * Report progress for current task
0220      */
0221     virtual void reportProgress(int progress, int total, const QByteArrayList &entities = {}) Q_DECL_OVERRIDE;
0222 
0223     Sink::Log::Context mLogCtx;
0224 
0225     /**
0226      * True while aborting.
0227      *
0228      * Stop the synchronization as soon as possible.
0229      */
0230     bool aborting() const;
0231 
0232 private:
0233     QStack<ApplicationDomain::Status> mCurrentState;
0234     void setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId);
0235     void setStatus(ApplicationDomain::Status busy, const QString &reason, const QByteArray requestId);
0236     void resetStatus(const QByteArray requestId);
0237     void setBusy(bool busy, const QString &reason, const QByteArray requestId);
0238     void clearQueue();
0239 
0240     void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity);
0241     KAsync::Job<void> processRequest(const SyncRequest &request);
0242 
0243     Sink::ResourceContext mResourceContext;
0244     Sink::Storage::EntityStore::Ptr mEntityStore;
0245     QSharedPointer<SynchronizerStore> mSyncStore;
0246     Sink::Storage::DataStore mSyncStorage;
0247     Sink::Storage::DataStore::Transaction mSyncTransaction;
0248     std::function<void(int commandId, const QByteArray &data)> mEnqueue;
0249     QList<SyncRequest> mSyncRequestQueue;
0250     SyncRequest mCurrentRequest;
0251     MessageQueue *mMessageQueue;
0252     bool mSyncInProgress;
0253     bool mAbort;
0254     QMultiHash<QByteArray, SyncRequest> mPendingSyncRequests;
0255     QString mSecret;
0256     QTime mTime;
0257 };
0258 
0259 }
0260