File indexing completed on 2024-06-23 05:06:52

0001 /*
0002     SPDX-FileCopyrightText: 2007 Tobias Koenig <tokoe@kde.org>
0003 
0004     SPDX-License-Identifier: LGPL-2.0-or-later
0005 */
0006 
0007 #pragma once
0008 
0009 #include "akonadicore_export.h"
0010 #include "changenotificationdependenciesfactory_p.h"
0011 #include "collection.h"
0012 #include "collectionfetchscope.h"
0013 #include "collectionstatisticsjob.h"
0014 #include "commandbuffer_p.h"
0015 #include "connection_p.h"
0016 #include "entitycache_p.h"
0017 #include "item.h"
0018 #include "itemfetchscope.h"
0019 #include "job.h"
0020 #include "monitor.h"
0021 #include "servermanager.h"
0022 #include "tagfetchscope.h"
0023 
0024 #include "private/protocol_p.h"
0025 
0026 #include <QObject>
0027 #include <QTimer>
0028 
0029 #include <QMimeDatabase>
0030 #include <QMimeType>
0031 #include <QPointer>
0032 
0033 namespace Akonadi
0034 {
0035 class Monitor;
0036 class ChangeNotification;
0037 
0038 // A helper struct to wrap pointer to member function (which cannot be contained
0039 // in a regular pointer)
0040 struct SignalId {
0041     constexpr SignalId() = default;
0042 
0043     using Unit = uint;
0044     static constexpr int Size = sizeof(&Monitor::itemAdded) / sizeof(Unit);
0045     Unit data[sizeof(&Monitor::itemAdded) / sizeof(Unit)] = {0};
0046 
0047     inline bool operator==(SignalId other) const
0048     {
0049         for (int i = Size - 1; i >= 0; --i) {
0050             if (data[i] != other.data[i]) {
0051                 return false;
0052             }
0053         }
0054         return true;
0055     }
0056 };
0057 
0058 inline size_t qHash(SignalId sig, size_t seed = 0) noexcept
0059 {
0060     // The 4 LSBs of the address should be enough to give us a good hash
0061     return ::qHashBits(sig.data, SignalId::Size, seed);
0062 }
0063 
0064 /**
0065  * @internal
0066  */
0067 class AKONADICORE_EXPORT MonitorPrivate
0068 {
0069 public:
0070     enum ListenerAction {
0071         AddListener,
0072         RemoveListener,
0073     };
0074 
0075     MonitorPrivate(ChangeNotificationDependenciesFactory *dependenciesFactory_, Monitor *parent);
0076     virtual ~MonitorPrivate();
0077     void init();
0078 
0079     Monitor *q_ptr;
0080     Q_DECLARE_PUBLIC(Monitor)
0081     ChangeNotificationDependenciesFactory *dependenciesFactory = nullptr;
0082     QPointer<Connection> ntfConnection;
0083     Collection::List collections;
0084     QSet<QByteArray> resources;
0085     QSet<Item::Id> items;
0086     QSet<Tag::Id> tags;
0087     QSet<Monitor::Type> types;
0088     QSet<QString> mimetypes;
0089     bool monitorAll;
0090     bool exclusive;
0091     QList<QByteArray> sessions;
0092     ItemFetchScope mItemFetchScope;
0093     TagFetchScope mTagFetchScope;
0094     CollectionFetchScope mCollectionFetchScope;
0095     bool mFetchChangedOnly;
0096     Session *session = nullptr;
0097     CollectionCache *collectionCache = nullptr;
0098     ItemListCache *itemCache = nullptr;
0099     TagListCache *tagCache = nullptr;
0100     QMimeDatabase mimeDatabase;
0101     QHash<SignalId, quint16> listeners;
0102 
0103     CommandBuffer mCommandBuffer;
0104 
0105     Protocol::ModifySubscriptionCommand::ModifiedParts pendingModificationChanges;
0106     Protocol::ModifySubscriptionCommand pendingModification;
0107     QTimer *pendingModificationTimer = nullptr;
0108     bool monitorReady = false;
0109 
0110     // The waiting list
0111     QQueue<Protocol::ChangeNotificationPtr> pendingNotifications;
0112     // The messages for which data is currently being fetched
0113     QQueue<Protocol::ChangeNotificationPtr> pipeline;
0114     // In a pure Monitor, the pipeline contains items that were dequeued from pendingNotifications.
0115     // The ordering [pipeline] [pendingNotifications] is kept at all times.
0116     // [] [A B C]  -> [A B] [C]  -> [B] [C] -> [B C] [] -> [C] [] -> []
0117     // In a ChangeRecorder, the pipeline contains one item only, and not dequeued yet.
0118     // [] [A B C] -> [A] [A B C] -> [] [A B C] -> (changeProcessed) [] [B C] -> [B] [B C] etc...
0119 
0120     bool fetchCollection;
0121     bool fetchCollectionStatistics;
0122     bool collectionMoveTranslationEnabled;
0123 
0124     // Virtual methods for ChangeRecorder
0125     virtual void notificationsEnqueued(int)
0126     {
0127     }
0128     virtual void notificationsErased()
0129     {
0130     }
0131 
0132     // Virtual so it can be overridden in FakeMonitor.
0133     virtual bool connectToNotificationManager();
0134     void disconnectFromNotificationManager();
0135 
0136     void dispatchNotifications();
0137     void flushPipeline();
0138 
0139     bool ensureDataAvailable(const Protocol::ChangeNotificationPtr &msg);
0140     /**
0141      * Sends out the change notification @p msg.
0142      * @param msg the change notification to send
0143      * @return @c true if the notification was actually send to someone, @c false if no one was listening.
0144      */
0145     virtual bool emitNotification(const Protocol::ChangeNotificationPtr &msg);
0146     void updatePendingStatistics(const Protocol::ChangeNotificationPtr &msg);
0147     void invalidateCaches(const Protocol::ChangeNotificationPtr &msg);
0148 
0149     /** Used by ResourceBase to inform us about collection changes before the notifications are emitted,
0150         needed to avoid the missing RID race on change replay.
0151     */
0152     void invalidateCache(const Collection &col);
0153 
0154     /// Virtual so that ChangeRecorder can set it to 0 and handle the pipeline itself
0155     virtual int pipelineSize() const;
0156 
0157     // private Q_SLOTS
0158     void dataAvailable();
0159     void slotSessionDestroyed(QObject *object);
0160     void slotStatisticsChangedFinished(KJob *job);
0161     void slotFlushRecentlyChangedCollections();
0162 
0163     /**
0164       Returns whether a message was appended to @p notificationQueue
0165     */
0166     int translateAndCompress(QQueue<Protocol::ChangeNotificationPtr> &notificationQueue, const Protocol::ChangeNotificationPtr &msg);
0167 
0168     void handleCommands();
0169 
0170     virtual void slotNotify(const Protocol::ChangeNotificationPtr &msg);
0171 
0172     /**
0173      * Sends out a change notification for an item.
0174      * @return @c true if the notification was actually send to someone, @c false if no one was listening.
0175      */
0176     bool emitItemsNotification(const Protocol::ItemChangeNotification &msg,
0177                                const Item::List &items = Item::List(),
0178                                const Collection &collection = Collection(),
0179                                const Collection &collectionDest = Collection());
0180     /**
0181      * Sends out a change notification for a collection.
0182      * @return @c true if the notification was actually send to someone, @c false if no one was listening.
0183      */
0184     bool emitCollectionNotification(const Protocol::CollectionChangeNotification &msg,
0185                                     const Collection &col = Collection(),
0186                                     const Collection &par = Collection(),
0187                                     const Collection &dest = Collection());
0188 
0189     bool emitTagNotification(const Protocol::TagChangeNotification &msg, const Tag &tags);
0190 
0191     bool emitRelationNotification(const Protocol::RelationChangeNotification &msg, const Relation &relation);
0192 
0193     bool emitSubscriptionChangeNotification(const Protocol::SubscriptionChangeNotification &msg, const NotificationSubscriber &subscriber);
0194 
0195     bool emitDebugChangeNotification(const Protocol::DebugChangeNotification &msg, const ChangeNotification &ntf);
0196 
0197     void serverStateChanged(Akonadi::ServerManager::State state);
0198 
0199     /**
0200      * This method is called by the ChangeMediator to enforce an invalidation of the passed collection.
0201      */
0202     void invalidateCollectionCache(qint64 collectionId);
0203 
0204     /**
0205      * This method is called by the ChangeMediator to enforce an invalidation of the passed item.
0206      */
0207     void invalidateItemCache(qint64 itemId);
0208 
0209     /**
0210      * This method is called by the ChangeMediator to enforce an invalidation of the passed tag.
0211      */
0212     void invalidateTagCache(qint64 tagId);
0213 
0214     void scheduleSubscriptionUpdate();
0215     void slotUpdateSubscription();
0216 
0217     void updateListeners(QMetaMethod signal, ListenerAction action);
0218 
0219     template<typename Signal>
0220     void updateListener(Signal signal, ListenerAction action)
0221     {
0222         auto it = listeners.find(signalId(signal));
0223         if (action == AddListener) {
0224             if (it == listeners.end()) {
0225                 it = listeners.insert(signalId(signal), 0);
0226             }
0227             ++(*it);
0228         } else {
0229             if (--(*it) == 0) {
0230                 listeners.erase(it);
0231             }
0232         }
0233     }
0234 
0235     static Protocol::ModifySubscriptionCommand::ChangeType monitorTypeToProtocol(Monitor::Type type);
0236 
0237     /**
0238       @brief Class used to determine when to purge items in a Collection
0239 
0240       The buffer method can be used to buffer a Collection. This may cause another Collection
0241       to be purged if it is removed from the buffer.
0242 
0243       The purge method is used to purge a Collection from the buffer, but not the model.
0244       This is used for example, to not buffer Collections anymore if they get referenced,
0245       and to ensure that one Collection does not appear twice in the buffer.
0246 
0247       Check whether a Collection is buffered using the isBuffered method.
0248     */
0249     class AKONADI_TESTS_EXPORT PurgeBuffer
0250     {
0251         // Buffer the most recent 10 unreferenced Collections
0252         static const int MAXBUFFERSIZE = 10;
0253 
0254     public:
0255         explicit PurgeBuffer()
0256         {
0257         }
0258 
0259         /**
0260           Adds @p id to the Collections to be buffered
0261 
0262           @returns The collection id which was removed form the buffer or -1 if none.
0263         */
0264         Collection::Id buffer(Collection::Id id);
0265 
0266         /**
0267         Removes @p id from the Collections being buffered
0268         */
0269         void purge(Collection::Id id);
0270 
0271         bool isBuffered(Collection::Id id) const
0272         {
0273             return m_buffer.contains(id);
0274         }
0275 
0276         static int buffersize();
0277 
0278     private:
0279         QQueue<Collection::Id> m_buffer;
0280     } m_buffer;
0281 
0282     QHash<Collection::Id, int> refCountMap;
0283     bool useRefCounting;
0284     void ref(Collection::Id id);
0285     Collection::Id deref(Collection::Id id);
0286 
0287     /**
0288      * Returns true if the collection is monitored by monitor.
0289      *
0290      * A collection is always monitored if useRefCounting is false.
0291      * If ref counting is used, the collection is only monitored,
0292      * if the collection is either in refCountMap or m_buffer.
0293      * If ref counting is used and the collection is not in refCountMap or m_buffer,
0294      * no updates for the contained items are emitted, because they are lazily ignored.
0295      */
0296     bool isMonitored(Collection::Id colId) const;
0297 
0298 private:
0299     // collections that need a statistics update
0300     QSet<Collection::Id> recentlyChangedCollections;
0301     QTimer statisticsCompressionTimer;
0302 
0303     /**
0304       @returns True if @p msg should be ignored. Otherwise appropriate signals are emitted for it.
0305     */
0306     bool isLazilyIgnored(const Protocol::ChangeNotificationPtr &msg, bool allowModifyFlagsConversion = false) const;
0307 
0308     /**
0309       Sets @p needsSplit to True when @p msg contains more than one item and there's at least one
0310       listener that does not support batch operations. Sets @p batchSupported to True when
0311       there's at least one listener that supports batch operations.
0312     */
0313     void checkBatchSupport(const Protocol::ChangeNotificationPtr &msg, bool &needsSplit, bool &batchSupported) const;
0314 
0315     Protocol::ChangeNotificationList splitMessage(const Protocol::ItemChangeNotification &msg, bool legacy) const;
0316 
0317     bool isCollectionMonitored(Collection::Id collection) const
0318     {
0319         if (collection < 0) {
0320             return false;
0321         }
0322         if (collections.contains(Collection(collection))) {
0323             return true;
0324         }
0325         if (collections.contains(Collection::root())) {
0326             return true;
0327         }
0328         return false;
0329     }
0330 
0331     bool isMimeTypeMonitored(const QString &mimetype) const
0332     {
0333         if (mimetypes.contains(mimetype)) {
0334             return true;
0335         }
0336 
0337         const QMimeType mimeType = mimeDatabase.mimeTypeForName(mimetype);
0338         if (!mimeType.isValid()) {
0339             return false;
0340         }
0341 
0342         for (const QString &mt : mimetypes) {
0343             if (mimeType.inherits(mt)) {
0344                 return true;
0345             }
0346         }
0347 
0348         return false;
0349     }
0350 
0351     template<typename T>
0352     bool isMoveDestinationResourceMonitored(const T &msg) const
0353     {
0354         if (msg.operation() != T::Move) {
0355             return false;
0356         }
0357         return resources.contains(msg.destinationResource());
0358     }
0359 
0360     void fetchStatistics(Collection::Id colId)
0361     {
0362         auto job = new CollectionStatisticsJob(Collection(colId), session);
0363         QObject::connect(job, &KJob::result, q_ptr, [this](KJob *job) {
0364             slotStatisticsChangedFinished(job);
0365         });
0366     }
0367 
0368     void notifyCollectionStatisticsWatchers(Collection::Id collection, const QByteArray &resource);
0369     bool fetchCollections() const;
0370     bool fetchItems() const;
0371 
0372     // A hack to "cast" pointer to member function to something we can easily
0373     // use as a key in the hashtable
0374     template<typename Signal>
0375     constexpr SignalId signalId(Signal signal) const
0376     {
0377         union {
0378             Signal in;
0379             SignalId out;
0380         } h = {signal};
0381         return h.out;
0382     }
0383 
0384     template<typename Signal>
0385     bool hasListeners(Signal signal) const
0386     {
0387         auto it = listeners.find(signalId(signal));
0388         return it != listeners.end();
0389     }
0390 
0391     template<typename Signal, typename... Args>
0392     bool emitToListeners(Signal signal, Args... args)
0393     {
0394         if (hasListeners(signal)) {
0395             Q_EMIT(q_ptr->*signal)(std::forward<Args>(args)...);
0396             return true;
0397         }
0398         return false;
0399     }
0400 };
0401 
0402 }