File indexing completed on 2024-11-10 04:40:43

0001 /*
0002     SPDX-FileCopyrightText: 2007 Tobias Koenig <tokoe@kde.org>
0003 
0004     SPDX-License-Identifier: LGPL-2.0-or-later
0005 */
0006 
0007 /// @cond PRIVATE
0008 
0009 #include "monitor_p.h"
0010 
0011 #include "akonadicore_debug.h"
0012 #include "changemediator_p.h"
0013 #include "changenotification.h"
0014 #include "collectionfetchjob.h"
0015 #include "collectionstatistics.h"
0016 #include "itemfetchjob.h"
0017 #include "notificationmanagerinterface.h"
0018 #include "notificationsubscriber.h"
0019 #include "protocolhelper_p.h"
0020 #include "session.h"
0021 #include "vectorhelper.h"
0022 
0023 #include "shared/akranges.h"
0024 
0025 #include <utility>
0026 
0027 using namespace Akonadi;
0028 using namespace AkRanges;
0029 
// Forward declaration.
// NOTE(review): not referenced in the visible part of this file - confirm whether it is still needed.
class operation;

// Base size used to dimension the collection and item caches in init();
// also the value reported by MonitorPrivate::pipelineSize().
static const int PipelineSize = 5;
0033 
/**
 * Constructs the private part of @p parent.
 *
 * @param dependenciesFactory_ factory used to create the caches and the notification
 *        connection. When null, a default ChangeNotificationDependenciesFactory is
 *        allocated instead. In both cases the factory is deleted by the destructor.
 * @param parent the Monitor this private object belongs to; stored as q_ptr.
 *
 * No cache is created and no connection is opened here: collectionCache starts
 * out null and is only created by init().
 */
MonitorPrivate::MonitorPrivate(ChangeNotificationDependenciesFactory *dependenciesFactory_, Monitor *parent)
    : q_ptr(parent)
    , dependenciesFactory(dependenciesFactory_ ? dependenciesFactory_ : new ChangeNotificationDependenciesFactory)
    , ntfConnection(nullptr)
    , monitorAll(false)
    , exclusive(false)
    , mFetchChangedOnly(false)
    , session(Session::defaultSession())
    , collectionCache(nullptr)
    // The command buffer is bound to the parent Monitor and the "handleCommands" name
    , mCommandBuffer(parent, "handleCommands")
    , pendingModificationChanges(Protocol::ModifySubscriptionCommand::None)
    , monitorReady(false)
    , fetchCollection(false)
    , fetchCollectionStatistics(false)
    , collectionMoveTranslationEnabled(true)
    , useRefCounting(false)
{
}
0052 
/**
 * Tears down the notification connection and releases the dependencies
 * factory together with the three caches.
 */
MonitorPrivate::~MonitorPrivate()
{
    // Must run while dependenciesFactory is still alive: the factory is asked
    // to destroy the notification connection.
    disconnectFromNotificationManager();
    delete dependenciesFactory;
    delete collectionCache;
    delete itemCache;
    delete tagCache;
}
0061 
0062 void MonitorPrivate::init()
0063 {
0064     // needs to be at least 3x pipeline size for the collection move case
0065     collectionCache = dependenciesFactory->createCollectionCache(3 * PipelineSize, session);
0066     // needs to be at least 1x pipeline size
0067     itemCache = dependenciesFactory->createItemListCache(PipelineSize, session);
0068     // 20 tags looks like a reasonable amount to keep around
0069     tagCache = dependenciesFactory->createTagListCache(20, session);
0070 
0071     QObject::connect(collectionCache, &CollectionCache::dataAvailable, q_ptr, [this]() {
0072         dataAvailable();
0073     });
0074     QObject::connect(itemCache, &ItemCache::dataAvailable, q_ptr, [this]() {
0075         dataAvailable();
0076     });
0077     QObject::connect(tagCache, &TagCache::dataAvailable, q_ptr, [this]() {
0078         dataAvailable();
0079     });
0080     QObject::connect(ServerManager::self(), &ServerManager::stateChanged, q_ptr, [this](auto state) {
0081         serverStateChanged(state);
0082     });
0083 
0084     statisticsCompressionTimer.setSingleShot(true);
0085     statisticsCompressionTimer.setInterval(500);
0086     QObject::connect(&statisticsCompressionTimer, &QTimer::timeout, q_ptr, [this]() {
0087         slotFlushRecentlyChangedCollections();
0088     });
0089 }
0090 
0091 bool MonitorPrivate::connectToNotificationManager()
0092 {
0093     if (ntfConnection) {
0094         ntfConnection->deleteLater();
0095         ntfConnection = nullptr;
0096     }
0097 
0098     if (!session) {
0099         return false;
0100     }
0101 
0102     ntfConnection = dependenciesFactory->createNotificationConnection(session, &mCommandBuffer);
0103     if (!ntfConnection) {
0104         return false;
0105     }
0106 
0107     slotUpdateSubscription();
0108 
0109     ntfConnection->reconnect();
0110 
0111     return true;
0112 }
0113 
0114 void MonitorPrivate::disconnectFromNotificationManager()
0115 {
0116     if (ntfConnection) {
0117         ntfConnection->disconnect(q_ptr);
0118         dependenciesFactory->destroyNotificationConnection(session, ntfConnection.data());
0119     }
0120 }
0121 
0122 void MonitorPrivate::serverStateChanged(ServerManager::State state)
0123 {
0124     if (state == ServerManager::Running) {
0125         connectToNotificationManager();
0126     }
0127 }
0128 
/**
 * Asks the collection cache to update its entry for collection @p id using
 * the monitor's current collection fetch scope.
 */
void MonitorPrivate::invalidateCollectionCache(qint64 id)
{
    collectionCache->update(id, mCollectionFetchScope);
}
0133 
0134 void MonitorPrivate::invalidateItemCache(qint64 id)
0135 {
0136     itemCache->update({id}, mItemFetchScope);
0137     // Also invalidate content of all any pending notification for given item
0138     for (auto it = pendingNotifications.begin(), end = pendingNotifications.end(); it != end; ++it) {
0139         if ((*it)->type() == Protocol::Command::ItemChangeNotification) {
0140             auto &ntf = Protocol::cmdCast<Protocol::ItemChangeNotification>(*it);
0141             const auto items = ntf.items();
0142             if (std::any_of(items.cbegin(), items.cend(), [id](const Protocol::FetchItemsResponse &r) {
0143                     return r.id() == id;
0144                 })) {
0145                 ntf.setMustRetrieve(true);
0146             }
0147         }
0148     }
0149 }
0150 
/**
 * Asks the tag cache to update its entry for tag @p id using the monitor's
 * current tag fetch scope.
 */
void MonitorPrivate::invalidateTagCache(qint64 id)
{
    tagCache->update({id}, mTagFetchScope);
}
0155 
/**
 * @return the file-level PipelineSize constant.
 */
int MonitorPrivate::pipelineSize() const
{
    return PipelineSize;
}
0160 
0161 void MonitorPrivate::scheduleSubscriptionUpdate()
0162 {
0163     if (pendingModificationTimer || !monitorReady) {
0164         return;
0165     }
0166 
0167     pendingModificationTimer = new QTimer(q_ptr);
0168     pendingModificationTimer->setSingleShot(true);
0169     pendingModificationTimer->setInterval(0);
0170     pendingModificationTimer->start();
0171     q_ptr->connect(pendingModificationTimer, &QTimer::timeout, q_ptr, [this]() {
0172         slotUpdateSubscription();
0173     });
0174 }
0175 
0176 void MonitorPrivate::slotUpdateSubscription()
0177 {
0178     if (pendingModificationTimer) {
0179         pendingModificationTimer->stop();
0180         std::exchange(pendingModificationTimer, nullptr)->deleteLater();
0181     }
0182 
0183     if (pendingModificationChanges & Protocol::ModifySubscriptionCommand::ItemFetchScope) {
0184         pendingModification.setItemFetchScope(ProtocolHelper::itemFetchScopeToProtocol(mItemFetchScope));
0185     }
0186     if (pendingModificationChanges & Protocol::ModifySubscriptionCommand::CollectionFetchScope) {
0187         pendingModification.setCollectionFetchScope(ProtocolHelper::collectionFetchScopeToProtocol(mCollectionFetchScope));
0188     }
0189     if (pendingModificationChanges & Protocol::ModifySubscriptionCommand::TagFetchScope) {
0190         pendingModification.setTagFetchScope(ProtocolHelper::tagFetchScopeToProtocol(mTagFetchScope));
0191     }
0192     pendingModificationChanges = Protocol::ModifySubscriptionCommand::None;
0193 
0194     if (ntfConnection) {
0195         ntfConnection->sendCommand(3, Protocol::ModifySubscriptionCommandPtr::create(pendingModification));
0196         pendingModification = Protocol::ModifySubscriptionCommand();
0197     }
0198 }
0199 
/**
 * Decides whether @p msg can be dropped without any further processing
 * because nobody would receive the resulting signal.
 *
 * Collection change notifications are never ignored here. Tag change
 * notifications are ignored when the signal matching their operation has no
 * listeners. Item change notifications are only considered while collection
 * statistics fetching is disabled: they are ignored when the signals matching
 * their operation have no listeners or, with reference counting enabled, when
 * the affected collection(s) are not monitored. Every other notification type
 * is kept.
 *
 * @param msg the notification to check.
 * @param allowModifyFlagsConversion when true, a ModifyFlags notification is
 *        also kept if Monitor::itemChanged has listeners, since it can be
 *        delivered as itemChanged(item, "FLAGS").
 * @return true if the notification can be ignored, false otherwise.
 */
bool MonitorPrivate::isLazilyIgnored(const Protocol::ChangeNotificationPtr &msg, bool allowModifyFlagsConversion) const
{
    if (msg->type() == Protocol::Command::CollectionChangeNotification) {
        // Lazy fetching can only affect items.
        return false;
    }

    if (msg->type() == Protocol::Command::TagChangeNotification) {
        // Ignore the tag notification iff the signal for its operation has no listener
        const auto op = Protocol::cmdCast<Protocol::TagChangeNotification>(msg).operation();
        return ((op == Protocol::TagChangeNotification::Add && !hasListeners(&Monitor::tagAdded))
                || (op == Protocol::TagChangeNotification::Modify && !hasListeners(&Monitor::tagChanged))
                || (op == Protocol::TagChangeNotification::Remove && !hasListeners(&Monitor::tagRemoved)));
    }

    if (!fetchCollectionStatistics && msg->type() == Protocol::Command::ItemChangeNotification) {
        const auto &itemNtf = Protocol::cmdCast<Protocol::ItemChangeNotification>(msg);
        const auto op = itemNtf.operation();
        // Step 1: nobody listens to the signal(s) this operation would trigger
        if ((op == Protocol::ItemChangeNotification::Add && !hasListeners(&Monitor::itemAdded))
            || (op == Protocol::ItemChangeNotification::Remove && !hasListeners(&Monitor::itemRemoved) && !hasListeners(&Monitor::itemsRemoved))
            || (op == Protocol::ItemChangeNotification::Modify && !hasListeners(&Monitor::itemChanged))
            || (op == Protocol::ItemChangeNotification::ModifyFlags
                && !hasListeners(&Monitor::itemsFlagsChanged)
                // Newly delivered ModifyFlags notifications will be converted to
                // itemChanged(item, "FLAGS") for legacy clients.
                && (!allowModifyFlagsConversion || !hasListeners(&Monitor::itemChanged)))
            || (op == Protocol::ItemChangeNotification::ModifyTags && !hasListeners(&Monitor::itemsTagsChanged))
            || (op == Protocol::ItemChangeNotification::Move && !hasListeners(&Monitor::itemMoved) && !hasListeners(&Monitor::itemsMoved))
            || (op == Protocol::ItemChangeNotification::Link && !hasListeners(&Monitor::itemLinked) && !hasListeners(&Monitor::itemsLinked))
            || (op == Protocol::ItemChangeNotification::Unlink && !hasListeners(&Monitor::itemUnlinked) && !hasListeners(&Monitor::itemsUnlinked))) {
            return true;
        }

        // Step 2: the collection-based filtering below only applies with reference counting
        if (!useRefCounting) {
            return false;
        }

        const Collection::Id parentCollectionId = itemNtf.parentCollection();

        // Non-move operations are kept as soon as their parent collection is monitored
        if ((op == Protocol::ItemChangeNotification::Add) || (op == Protocol::ItemChangeNotification::Remove)
            || (op == Protocol::ItemChangeNotification::Modify) || (op == Protocol::ItemChangeNotification::ModifyFlags)
            || (op == Protocol::ItemChangeNotification::ModifyTags) || (op == Protocol::ItemChangeNotification::Link)
            || (op == Protocol::ItemChangeNotification::Unlink)) {
            if (isMonitored(parentCollectionId)) {
                return false;
            }
        }

        if (op == Protocol::ItemChangeNotification::Move) {
            // We can't ignore the move if either end is monitored. It must be transformed later
            // into a removal or insertion.
            return !isMonitored(parentCollectionId) && !isMonitored(itemNtf.parentDestCollection());
        }
        // Reference counting is on and the notification was not kept above
        return true;
    }

    return false;
}
0256 
0257 void MonitorPrivate::checkBatchSupport(const Protocol::ChangeNotificationPtr &msg, bool &needsSplit, bool &batchSupported) const
0258 {
0259     if (msg->type() != Protocol::Command::ItemChangeNotification) {
0260         needsSplit = false;
0261         batchSupported = false;
0262         return;
0263     }
0264 
0265     const auto &itemNtf = Protocol::cmdCast<Protocol::ItemChangeNotification>(msg);
0266     const bool isBatch = (itemNtf.items().count() > 1);
0267 
0268     switch (itemNtf.operation()) {
0269     case Protocol::ItemChangeNotification::Add:
0270     case Protocol::ItemChangeNotification::Modify:
0271         needsSplit = isBatch;
0272         batchSupported = false;
0273         return;
0274         needsSplit = isBatch;
0275         batchSupported = false;
0276         return;
0277     case Protocol::ItemChangeNotification::ModifyFlags:
0278         batchSupported = hasListeners(&Monitor::itemsFlagsChanged);
0279         needsSplit = isBatch && !batchSupported && hasListeners(&Monitor::itemChanged);
0280         return;
0281     case Protocol::ItemChangeNotification::ModifyTags:
0282     case Protocol::ItemChangeNotification::ModifyRelations:
0283         // Tags and relations were added after batch notifications, so they are always supported
0284         batchSupported = true;
0285         needsSplit = false;
0286         return;
0287     case Protocol::ItemChangeNotification::Move:
0288         needsSplit = isBatch && hasListeners(&Monitor::itemMoved);
0289         batchSupported = hasListeners(&Monitor::itemsMoved);
0290         return;
0291     case Protocol::ItemChangeNotification::Remove:
0292         needsSplit = isBatch && hasListeners(&Monitor::itemRemoved);
0293         batchSupported = hasListeners(&Monitor::itemsRemoved);
0294         return;
0295     case Protocol::ItemChangeNotification::Link:
0296         needsSplit = isBatch && hasListeners(&Monitor::itemLinked);
0297         batchSupported = hasListeners(&Monitor::itemsLinked);
0298         return;
0299     case Protocol::ItemChangeNotification::Unlink:
0300         needsSplit = isBatch && hasListeners(&Monitor::itemUnlinked);
0301         batchSupported = hasListeners(&Monitor::itemsUnlinked);
0302         return;
0303     default:
0304         needsSplit = isBatch;
0305         batchSupported = false;
0306         qCDebug(AKONADICORE_LOG) << "Unknown operation type" << itemNtf.operation() << "in item change notification";
0307         return;
0308     }
0309 }
0310 
0311 Protocol::ChangeNotificationList MonitorPrivate::splitMessage(const Protocol::ItemChangeNotification &msg, bool legacy) const
0312 {
0313     Protocol::ChangeNotificationList list;
0314 
0315     Protocol::ItemChangeNotification baseMsg;
0316     baseMsg.setSessionId(msg.sessionId());
0317     if (legacy && msg.operation() == Protocol::ItemChangeNotification::ModifyFlags) {
0318         baseMsg.setOperation(Protocol::ItemChangeNotification::Modify);
0319         baseMsg.setItemParts(QSet<QByteArray>() << "FLAGS");
0320     } else {
0321         baseMsg.setOperation(msg.operation());
0322         baseMsg.setItemParts(msg.itemParts());
0323     }
0324     baseMsg.setParentCollection(msg.parentCollection());
0325     baseMsg.setParentDestCollection(msg.parentDestCollection());
0326     baseMsg.setResource(msg.resource());
0327     baseMsg.setDestinationResource(msg.destinationResource());
0328     baseMsg.setAddedFlags(msg.addedFlags());
0329     baseMsg.setRemovedFlags(msg.removedFlags());
0330     baseMsg.setAddedTags(msg.addedTags());
0331     baseMsg.setRemovedTags(msg.removedTags());
0332 
0333     const auto &items = msg.items();
0334     list.reserve(items.count());
0335     for (const auto &item : items) {
0336         auto copy = Protocol::ItemChangeNotificationPtr::create(baseMsg);
0337         copy->setItems({Protocol::FetchItemsResponse(item)});
0338         list.push_back(std::move(copy));
0339     }
0340 
0341     return list;
0342 }
0343 
/**
 * @return the value of the fetchCollection flag.
 */
bool MonitorPrivate::fetchCollections() const
{
    return fetchCollection;
}
0348 
/**
 * @return true when the item fetch scope is not empty.
 */
bool MonitorPrivate::fetchItems() const
{
    return !mItemFetchScope.isEmpty();
}
0353 
/**
 * Makes sure the tags, collections and items needed to emit @p msg are
 * present in the respective caches.
 *
 * @param msg the notification about to be emitted.
 * @return true when every ensureCached() call made for @p msg succeeded (or
 *         none was needed), false when at least one of them reported that the
 *         data is not cached.
 *         NOTE(review): presumably a failed ensureCached() starts a fetch and the
 *         cache's dataAvailable signal fires once it is done - confirm against the
 *         cache implementation.
 */
bool MonitorPrivate::ensureDataAvailable(const Protocol::ChangeNotificationPtr &msg)
{
    if (msg->type() == Protocol::Command::TagChangeNotification) {
        const auto &tagMsg = Protocol::cmdCast<Protocol::TagChangeNotification>(msg);
        // The tag is only looked up in the cache when the notification asks for it
        if (tagMsg.metadata().contains("FETCH_TAG")) {
            if (!tagCache->ensureCached({tagMsg.tag().id()}, mTagFetchScope)) {
                return false;
            }
        }
        return true;
    }

    // Relation, subscription and debug notifications need nothing from the caches
    if (msg->type() == Protocol::Command::RelationChangeNotification) {
        return true;
    }

    if (msg->type() == Protocol::Command::SubscriptionChangeNotification) {
        return true;
    }

    if (msg->type() == Protocol::Command::DebugChangeNotification) {
        return true;
    }

    if (msg->type() == Protocol::Command::CollectionChangeNotification
        && Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg).operation() == Protocol::CollectionChangeNotification::Remove) {
        // For collection removals the collection is gone already, so we can't fetch it,
        // but we have to at least obtain the ancestor chain.
        const qint64 parentCollection = Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg).parentCollection();
        return parentCollection <= -1 || collectionCache->ensureCached(parentCollection, mCollectionFetchScope);
    }

    // From here on a cache miss does not abort right away; the remaining
    // collection lookups are still issued and the result is tracked here.
    bool allCached = true;
    if (fetchCollections()) {
        // Source parent collection of the item or collection (-1 for other types)
        const qint64 parentCollection = (msg->type() == Protocol::Command::ItemChangeNotification)
            ? Protocol::cmdCast<Protocol::ItemChangeNotification>(msg).parentCollection()
            : (msg->type() == Protocol::Command::CollectionChangeNotification)
            ? Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg).parentCollection()
            : -1;
        if (parentCollection > -1 && !collectionCache->ensureCached(parentCollection, mCollectionFetchScope)) {
            allCached = false;
        }

        // Destination parent collection, only relevant for move operations
        qint64 parentDestCollection = -1;

        if ((msg->type() == Protocol::Command::ItemChangeNotification)
            && (Protocol::cmdCast<Protocol::ItemChangeNotification>(msg).operation() == Protocol::ItemChangeNotification::Move)) {
            parentDestCollection = Protocol::cmdCast<Protocol::ItemChangeNotification>(msg).parentDestCollection();
        } else if ((msg->type() == Protocol::Command::CollectionChangeNotification)
                   && (Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg).operation() == Protocol::CollectionChangeNotification::Move)) {
            parentDestCollection = Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg).parentDestCollection();
        }
        if (parentDestCollection > -1 && !collectionCache->ensureCached(parentDestCollection, mCollectionFetchScope)) {
            allCached = false;
        }
    }

    // For removals only the parent collections are looked up
    if (msg->isRemove()) {
        return allCached;
    }

    if (msg->type() == Protocol::Command::ItemChangeNotification && fetchItems()) {
        const auto &itemNtf = Protocol::cmdCast<Protocol::ItemChangeNotification>(msg);
        if (mFetchChangedOnly
            && (itemNtf.operation() == Protocol::ItemChangeNotification::Modify || itemNtf.operation() == Protocol::ItemChangeNotification::ModifyFlags)) {
            // "Fetch changed only" mode: build a reduced fetch scope from the changed parts
            const auto changedParts = itemNtf.itemParts();
            const auto requestedParts = mItemFetchScope.payloadParts();
            const auto requestedAttrs = mItemFetchScope.attributes();
            QSet<QByteArray> missingParts;
            QSet<QByteArray> missingAttributes;
            for (const QByteArray &part : changedParts) {
                // Strip the 4 character "PLD:" / "ATR:" prefix
                const auto partName = part.mid(4);
                if (part.startsWith("PLD:") && // krazy:exclude=strings since QByteArray
                    (!mItemFetchScope.fullPayload() || !requestedParts.contains(partName))) {
                    missingParts.insert(partName);
                } else if (part.startsWith("ATR:") && // krazy:exclude=strings since QByteArray
                           (!mItemFetchScope.allAttributes() || !requestedAttrs.contains(partName))) {
                    missingAttributes.insert(partName);
                }
            }

            if (!missingParts.isEmpty() || !missingAttributes.isEmpty()) {
                // Start from the monitor's scope, switch off everything that was
                // requested and switch on only the missing parts and attributes
                ItemFetchScope scope(mItemFetchScope);
                scope.fetchFullPayload(false);
                for (const auto &part : requestedParts) {
                    scope.fetchPayloadPart(part, false);
                }
                for (const auto &attr : requestedAttrs) {
                    scope.fetchAttribute(attr, false);
                }
                for (const auto &part : missingParts) {
                    scope.fetchPayloadPart(part, true);
                }
                for (const auto &attr : missingAttributes) {
                    scope.fetchAttribute(attr, true);
                }

                if (!itemCache->ensureCached(Protocol::ChangeNotification::itemsToUids(itemNtf.items()), scope)) {
                    return false;
                }
            }

            return allCached;
        }

        // Make sure all tags for ModifyTags operation are in cache too
        if (itemNtf.operation() == Protocol::ItemChangeNotification::ModifyTags) {
            if (!tagCache->ensureCached((itemNtf.addedTags() + itemNtf.removedTags()) | Actions::toQList, mTagFetchScope)) {
                return false;
            }
        }

        // Items are only looked up when the notification asks for it or was invalidated
        if (itemNtf.metadata().contains("FETCH_ITEM") || itemNtf.mustRetrieve()) {
            if (!itemCache->ensureCached(Protocol::ChangeNotification::itemsToUids(itemNtf.items()), mItemFetchScope)) {
                return false;
            }
        }

        return allCached;

    } else if (msg->type() == Protocol::Command::CollectionChangeNotification && fetchCollections()) {
        const auto &colMsg = Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg);
        // The changed collection itself is only looked up when the notification asks for it
        if (colMsg.metadata().contains("FETCH_COLLECTION")) {
            if (!collectionCache->ensureCached(colMsg.collection().id(), mCollectionFetchScope)) {
                return false;
            }
        }

        return allCached;
    }

    return allCached;
}
0487 
/**
 * Translates @p msg into the corresponding client-side objects and passes it
 * on to the emit helper matching its type (tag, relation, collection, item,
 * subscription or debug change notification).
 *
 * Collection and item notifications are dropped when the affected entity
 * could not be obtained, unless the operation is a removal or fetching of
 * that entity type is disabled.
 *
 * @param msg the notification to emit.
 * @return the value returned by the type-specific emit helper; false when the
 *         notification was dropped or its type is not handled here.
 */
bool MonitorPrivate::emitNotification(const Protocol::ChangeNotificationPtr &msg)
{
    bool someoneWasListening = false;
    if (msg->type() == Protocol::Command::TagChangeNotification) {
        const auto &tagNtf = Protocol::cmdCast<Protocol::TagChangeNotification>(msg);
        const bool fetched = tagNtf.metadata().contains("FETCH_TAG");
        Tag tag;
        if (fetched) {
            // Take the tag from the cache; fall back to a default Tag when it is not there
            const auto tags = tagCache->retrieve({tagNtf.tag().id()});
            tag = tags.isEmpty() ? Tag() : tags.at(0);
        } else {
            // Use the tag data carried by the notification itself
            tag = ProtocolHelper::parseTag(tagNtf.tag());
        }
        someoneWasListening = emitTagNotification(tagNtf, tag);
    } else if (msg->type() == Protocol::Command::RelationChangeNotification) {
        const auto &relNtf = Protocol::cmdCast<Protocol::RelationChangeNotification>(msg);
        const Relation rel = ProtocolHelper::parseRelationFetchResult(relNtf.relation());
        someoneWasListening = emitRelationNotification(relNtf, rel);
    } else if (msg->type() == Protocol::Command::CollectionChangeNotification) {
        const auto &colNtf = Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg);
        const Collection parent = collectionCache->retrieve(colNtf.parentCollection());
        // The destination parent is only looked up for moves
        Collection destParent;
        if (colNtf.operation() == Protocol::CollectionChangeNotification::Move) {
            destParent = collectionCache->retrieve(colNtf.parentDestCollection());
        }

        const bool fetched = colNtf.metadata().contains("FETCH_COLLECTION");
        // For removals this will retrieve an invalid collection. We'll deal with that in emitCollectionNotification
        const Collection col = fetched ? collectionCache->retrieve(colNtf.collection().id()) : ProtocolHelper::parseCollection(colNtf.collection(), true);
        // It is possible that the retrieval fails also in the non-removal case (e.g. because the item was meanwhile removed while
        // the changerecorder stored the notification or the notification was in the queue). In order to drop such invalid notifications we have to ignore them.
        if (col.isValid() || colNtf.operation() == Protocol::CollectionChangeNotification::Remove || !fetchCollections()) {
            someoneWasListening = emitCollectionNotification(colNtf, col, parent, destParent);
        }
    } else if (msg->type() == Protocol::Command::ItemChangeNotification) {
        const auto &itemNtf = Protocol::cmdCast<Protocol::ItemChangeNotification>(msg);
        const Collection parent = collectionCache->retrieve(itemNtf.parentCollection());
        // The destination parent is only looked up for moves
        Collection destParent;
        if (itemNtf.operation() == Protocol::ItemChangeNotification::Move) {
            destParent = collectionCache->retrieve(itemNtf.parentDestCollection());
        }
        const bool fetched = itemNtf.metadata().contains("FETCH_ITEM") || itemNtf.mustRetrieve();
        // For removals this will retrieve an empty set. We'll deal with that in emitItemNotification
        Item::List items;
        if (fetched && fetchItems()) {
            items = itemCache->retrieve(Protocol::ChangeNotification::itemsToUids(itemNtf.items()));
        } else {
            // Build the items from the data carried by the notification itself
            const auto &ntfItems = itemNtf.items();
            items.reserve(ntfItems.size());
            for (const auto &ntfItem : ntfItems) {
                items.push_back(ProtocolHelper::parseItemFetchResult(ntfItem, &mItemFetchScope));
            }
        }
        // It is possible that the retrieval fails also in the non-removal case (e.g. because the item was meanwhile removed while
        // the changerecorder stored the notification or the notification was in the queue). In order to drop such invalid notifications we have to ignore them.
        if (!items.isEmpty() || itemNtf.operation() == Protocol::ItemChangeNotification::Remove || !fetchItems()) {
            someoneWasListening = emitItemsNotification(itemNtf, items, parent, destParent);
        }
    } else if (msg->type() == Protocol::Command::SubscriptionChangeNotification) {
        // Copy the subscriber description from the protocol message into a NotificationSubscriber
        const auto &subNtf = Protocol::cmdCast<Protocol::SubscriptionChangeNotification>(msg);
        NotificationSubscriber subscriber;
        subscriber.setSubscriber(subNtf.subscriber());
        subscriber.setSessionId(subNtf.sessionId());
        subscriber.setMonitoredCollections(subNtf.collections());
        subscriber.setMonitoredItems(subNtf.items());
        subscriber.setMonitoredTags(subNtf.tags());
        QSet<Monitor::Type> monitorTypes;
        const auto types = subNtf.types();
        for (auto type : types) {
            if (type == Protocol::ModifySubscriptionCommand::NoType) {
                continue;
            }
            // Map the protocol-level change type to the public Monitor::Type
            monitorTypes.insert([](Protocol::ModifySubscriptionCommand::ChangeType type) {
                switch (type) {
                case Protocol::ModifySubscriptionCommand::ItemChanges:
                    return Monitor::Items;
                case Protocol::ModifySubscriptionCommand::CollectionChanges:
                    return Monitor::Collections;
                case Protocol::ModifySubscriptionCommand::TagChanges:
                    return Monitor::Tags;
                case Protocol::ModifySubscriptionCommand::RelationChanges:
                    return Monitor::Relations;
                case Protocol::ModifySubscriptionCommand::SubscriptionChanges:
                    return Monitor::Subscribers;
                case Protocol::ModifySubscriptionCommand::ChangeNotifications:
                    return Monitor::Notifications;
                default:
                    Q_ASSERT(false);
                    return Monitor::Items; // unreachable
                }
            }(type));
        }
        subscriber.setMonitoredTypes(monitorTypes);
        subscriber.setMonitoredMimeTypes(subNtf.mimeTypes());
        subscriber.setMonitoredResources(subNtf.resources());
        subscriber.setIgnoredSessions(subNtf.ignoredSessions());
        subscriber.setIsAllMonitored(subNtf.allMonitored());
        subscriber.setIsExclusive(subNtf.exclusive());
        subscriber.setItemFetchScope(ProtocolHelper::parseItemFetchScope(subNtf.itemFetchScope()));
        subscriber.setCollectionFetchScope(ProtocolHelper::parseCollectionFetchScope(subNtf.collectionFetchScope()));
        someoneWasListening = emitSubscriptionChangeNotification(subNtf, subscriber);
    } else if (msg->type() == Protocol::Command::DebugChangeNotification) {
        // Wrap the embedded notification together with its listeners and timestamp
        const auto &changeNtf = Protocol::cmdCast<Protocol::DebugChangeNotification>(msg);
        ChangeNotification notification;
        notification.setListeners(changeNtf.listeners());
        notification.setTimestamp(QDateTime::fromMSecsSinceEpoch(changeNtf.timestamp()));
        notification.setNotification(changeNtf.notification());
        switch (changeNtf.notification()->type()) {
        case Protocol::Command::ItemChangeNotification:
            notification.setType(ChangeNotification::Items);
            break;
        case Protocol::Command::CollectionChangeNotification:
            notification.setType(ChangeNotification::Collection);
            break;
        case Protocol::Command::TagChangeNotification:
            notification.setType(ChangeNotification::Tag);
            break;
        case Protocol::Command::RelationChangeNotification:
            notification.setType(ChangeNotification::Relation);
            break;
        case Protocol::Command::SubscriptionChangeNotification:
            notification.setType(ChangeNotification::Subscription);
            break;
        default:
            Q_ASSERT(false); // huh?
            return false;
        }

        someoneWasListening = emitDebugChangeNotification(changeNtf, notification);
    }

    return someoneWasListening;
}
0621 
0622 void MonitorPrivate::updatePendingStatistics(const Protocol::ChangeNotificationPtr &msg)
0623 {
0624     if (msg->type() == Protocol::Command::ItemChangeNotification) {
0625         const auto &itemNtf = Protocol::cmdCast<Protocol::ItemChangeNotification>(msg);
0626         notifyCollectionStatisticsWatchers(itemNtf.parentCollection(), itemNtf.resource());
0627         // FIXME use the proper resource of the target collection, for cross resource moves
0628         notifyCollectionStatisticsWatchers(itemNtf.parentDestCollection(), itemNtf.destinationResource());
0629     } else if (msg->type() == Protocol::Command::CollectionChangeNotification) {
0630         const auto &colNtf = Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg);
0631         if (colNtf.operation() == Protocol::CollectionChangeNotification::Remove) {
0632             // no need for statistics updates anymore
0633             recentlyChangedCollections.remove(colNtf.collection().id());
0634         }
0635     }
0636 }
0637 
0638 void MonitorPrivate::slotSessionDestroyed(QObject *object)
0639 {
0640     auto objectSession = qobject_cast<Session *>(object);
0641     if (objectSession) {
0642         sessions.removeAll(objectSession->sessionId());
0643         pendingModification.stopIgnoringSession(objectSession->sessionId());
0644         scheduleSubscriptionUpdate();
0645     }
0646 }
0647 
0648 void MonitorPrivate::slotStatisticsChangedFinished(KJob *job)
0649 {
0650     if (job->error()) {
0651         qCWarning(AKONADICORE_LOG) << "Error on fetching collection statistics: " << job->errorText();
0652     } else {
0653         auto statisticsJob = static_cast<CollectionStatisticsJob *>(job);
0654         Q_ASSERT(statisticsJob->collection().isValid());
0655         Q_EMIT q_ptr->collectionStatisticsChanged(statisticsJob->collection().id(), statisticsJob->statistics());
0656     }
0657 }
0658 
0659 void MonitorPrivate::slotFlushRecentlyChangedCollections()
0660 {
0661     for (Collection::Id collection : std::as_const(recentlyChangedCollections)) {
0662         Q_ASSERT(collection >= 0);
0663         if (fetchCollectionStatistics) {
0664             fetchStatistics(collection);
0665         } else {
0666             static const CollectionStatistics dummyStatistics;
0667             Q_EMIT q_ptr->collectionStatisticsChanged(collection, dummyStatistics);
0668         }
0669     }
0670     recentlyChangedCollections.clear();
0671 }
0672 
0673 int MonitorPrivate::translateAndCompress(QQueue<Protocol::ChangeNotificationPtr> &notificationQueue, const Protocol::ChangeNotificationPtr &msg)
0674 {
0675     // Always handle tags and relations
0676     if (msg->type() == Protocol::Command::TagChangeNotification || msg->type() == Protocol::Command::RelationChangeNotification) {
0677         notificationQueue.enqueue(msg);
0678         return 1;
0679     }
0680 
0681     // We have to split moves into insert or remove if the source or destination
0682     // is not monitored.
0683     if (!msg->isMove()) {
0684         notificationQueue.enqueue(msg);
0685         return 1;
0686     }
0687 
0688     bool sourceWatched = false;
0689     bool destWatched = false;
0690 
0691     if (msg->type() == Protocol::Command::ItemChangeNotification) {
0692         const auto &itemNtf = Protocol::cmdCast<Protocol::ItemChangeNotification>(msg);
0693         if (useRefCounting) {
0694             sourceWatched = isMonitored(itemNtf.parentCollection());
0695             destWatched = isMonitored(itemNtf.parentDestCollection());
0696         } else {
0697             if (!resources.isEmpty()) {
0698                 sourceWatched = resources.contains(itemNtf.resource());
0699                 destWatched = isMoveDestinationResourceMonitored(itemNtf);
0700             }
0701             if (!sourceWatched) {
0702                 sourceWatched = isCollectionMonitored(itemNtf.parentCollection());
0703             }
0704             if (!destWatched) {
0705                 destWatched = isCollectionMonitored(itemNtf.parentDestCollection());
0706             }
0707         }
0708     } else if (msg->type() == Protocol::Command::CollectionChangeNotification) {
0709         const auto &colNtf = Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg);
0710         if (!resources.isEmpty()) {
0711             sourceWatched = resources.contains(colNtf.resource());
0712             destWatched = isMoveDestinationResourceMonitored(colNtf);
0713         }
0714         if (!sourceWatched) {
0715             sourceWatched = isCollectionMonitored(colNtf.parentCollection());
0716         }
0717         if (!destWatched) {
0718             destWatched = isCollectionMonitored(colNtf.parentDestCollection());
0719         }
0720     } else {
0721         Q_ASSERT(false);
0722         return 0;
0723     }
0724 
0725     if (!sourceWatched && !destWatched) {
0726         return 0;
0727     }
0728 
0729     if ((sourceWatched && destWatched) || (!collectionMoveTranslationEnabled && msg->type() == Protocol::Command::CollectionChangeNotification)) {
0730         notificationQueue.enqueue(msg);
0731         return 1;
0732     }
0733 
0734     if (sourceWatched) {
0735         if (msg->type() == Protocol::Command::ItemChangeNotification) {
0736             auto removalMessage = Protocol::ItemChangeNotificationPtr::create(Protocol::cmdCast<Protocol::ItemChangeNotification>(msg));
0737             removalMessage->setOperation(Protocol::ItemChangeNotification::Remove);
0738             removalMessage->setParentDestCollection(-1);
0739             notificationQueue.enqueue(removalMessage);
0740             return 1;
0741         } else {
0742             auto removalMessage = Protocol::CollectionChangeNotificationPtr::create(Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg));
0743             removalMessage->setOperation(Protocol::CollectionChangeNotification::Remove);
0744             removalMessage->setParentDestCollection(-1);
0745             notificationQueue.enqueue(removalMessage);
0746             return 1;
0747         }
0748     }
0749 
0750     // Transform into an insertion
0751     if (msg->type() == Protocol::Command::ItemChangeNotification) {
0752         auto insertionMessage = Protocol::ItemChangeNotificationPtr::create(Protocol::cmdCast<Protocol::ItemChangeNotification>(msg));
0753         insertionMessage->setOperation(Protocol::ItemChangeNotification::Add);
0754         insertionMessage->setParentCollection(insertionMessage->parentDestCollection());
0755         insertionMessage->setParentDestCollection(-1);
0756         // We don't support batch insertion, so we have to do it one by one
0757         const auto split = splitMessage(*insertionMessage, false);
0758         for (const Protocol::ChangeNotificationPtr &insertion : split) {
0759             notificationQueue.enqueue(insertion);
0760         }
0761         return split.count();
0762     } else if (msg->type() == Protocol::Command::CollectionChangeNotification) {
0763         auto insertionMessage = Protocol::CollectionChangeNotificationPtr::create(Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg));
0764         insertionMessage->setOperation(Protocol::CollectionChangeNotification::Add);
0765         insertionMessage->setParentCollection(insertionMessage->parentDestCollection());
0766         insertionMessage->setParentDestCollection(-1);
0767         notificationQueue.enqueue(insertionMessage);
0768         return 1;
0769     }
0770 
0771     Q_ASSERT(false);
0772     return 0;
0773 }
0774 
0775 void MonitorPrivate::handleCommands()
0776 {
0777     Q_Q(Monitor);
0778 
0779     CommandBufferLocker lock(&mCommandBuffer);
0780     CommandBufferNotifyBlocker notify(&mCommandBuffer);
0781     while (!mCommandBuffer.isEmpty()) {
0782         const auto cmd = mCommandBuffer.dequeue();
0783         lock.unlock();
0784         const auto command = cmd.command;
0785 
0786         if (command->isResponse()) {
0787             switch (command->type()) {
0788             case Protocol::Command::Hello: {
0789                 qCDebug(AKONADICORE_LOG) << q_ptr << "Connected to notification bus";
0790                 QByteArray subname;
0791                 if (!q->objectName().isEmpty()) {
0792                     subname = q->objectName().toLatin1();
0793                 } else {
0794                     subname = session->sessionId();
0795                 }
0796                 subname += " - " + QByteArray::number(quintptr(q));
0797                 qCDebug(AKONADICORE_LOG) << q_ptr << "Subscribing as \"" << subname << "\"";
0798                 auto subCmd = Protocol::CreateSubscriptionCommandPtr::create(subname, session->sessionId());
0799                 ntfConnection->sendCommand(2, subCmd);
0800                 break;
0801             }
0802 
0803             case Protocol::Command::CreateSubscription: {
0804                 auto msubCmd = Protocol::ModifySubscriptionCommandPtr::create();
0805                 for (const auto &col : std::as_const(collections)) {
0806                     msubCmd->startMonitoringCollection(col.id());
0807                 }
0808                 for (const auto &res : std::as_const(resources)) {
0809                     msubCmd->startMonitoringResource(res);
0810                 }
0811                 for (auto itemId : std::as_const(items)) {
0812                     msubCmd->startMonitoringItem(itemId);
0813                 }
0814                 for (auto tagId : std::as_const(tags)) {
0815                     msubCmd->startMonitoringTag(tagId);
0816                 }
0817                 for (auto type : std::as_const(types)) {
0818                     msubCmd->startMonitoringType(monitorTypeToProtocol(type));
0819                 }
0820                 for (const auto &mimetype : std::as_const(mimetypes)) {
0821                     msubCmd->startMonitoringMimeType(mimetype);
0822                 }
0823                 for (const auto &session : std::as_const(sessions)) {
0824                     msubCmd->startIgnoringSession(session);
0825                 }
0826                 msubCmd->setAllMonitored(monitorAll);
0827                 msubCmd->setIsExclusive(exclusive);
0828                 msubCmd->setItemFetchScope(ProtocolHelper::itemFetchScopeToProtocol(mItemFetchScope));
0829                 msubCmd->setCollectionFetchScope(ProtocolHelper::collectionFetchScopeToProtocol(mCollectionFetchScope));
0830                 msubCmd->setTagFetchScope(ProtocolHelper::tagFetchScopeToProtocol(mTagFetchScope));
0831                 pendingModification = Protocol::ModifySubscriptionCommand();
0832                 ntfConnection->sendCommand(3, msubCmd);
0833                 break;
0834             }
0835 
0836             case Protocol::Command::ModifySubscription:
0837                 // TODO: Handle errors
0838                 if (!monitorReady) {
0839                     monitorReady = true;
0840                     Q_EMIT q_ptr->monitorReady();
0841                 }
0842                 break;
0843 
0844             default:
0845                 qCWarning(AKONADICORE_LOG) << "Received an unexpected response on Notification stream: " << Protocol::debugString(command);
0846                 break;
0847             }
0848         } else {
0849             switch (command->type()) {
0850             case Protocol::Command::ItemChangeNotification:
0851             case Protocol::Command::CollectionChangeNotification:
0852             case Protocol::Command::TagChangeNotification:
0853             case Protocol::Command::RelationChangeNotification:
0854             case Protocol::Command::SubscriptionChangeNotification:
0855             case Protocol::Command::DebugChangeNotification:
0856                 slotNotify(command.staticCast<Protocol::ChangeNotification>());
0857                 break;
0858             default:
0859                 qCWarning(AKONADICORE_LOG) << "Received an unexpected message on Notification stream:" << Protocol::debugString(command);
0860                 break;
0861             }
0862         }
0863 
0864         lock.relock();
0865     }
0866     notify.unblock();
0867     lock.unlock();
0868 }
0869 
0870 /*
0871 
0872   server notification --> ?accepted --> pendingNotifications --> ?dataAvailable --> emit
0873                                   |                                           |
0874                                   x --> discard                               x --> pipeline
0875 
0876   fetchJobDone --> pipeline ?dataAvailable --> emit
0877  */
0878 
0879 void MonitorPrivate::slotNotify(const Protocol::ChangeNotificationPtr &msg)
0880 {
0881     int appendedMessages = 0;
0882     int modifiedMessages = 0;
0883     int erasedMessages = 0;
0884 
0885     invalidateCaches(msg);
0886     updatePendingStatistics(msg);
0887     bool needsSplit = true;
0888     bool supportsBatch = false;
0889 
0890     if (isLazilyIgnored(msg, true)) {
0891         return;
0892     }
0893 
0894     checkBatchSupport(msg, needsSplit, supportsBatch);
0895 
0896     const bool isModifyFlags = (msg->type() == Protocol::Command::ItemChangeNotification
0897                                 && Protocol::cmdCast<Protocol::ItemChangeNotification>(msg).operation() == Protocol::ItemChangeNotification::ModifyFlags);
0898     if (supportsBatch || (!needsSplit && !supportsBatch && !isModifyFlags) || msg->type() == Protocol::Command::CollectionChangeNotification) {
0899         // Make sure the batch msg is always queued before the split notifications
0900         const int oldSize = pendingNotifications.size();
0901         const int appended = translateAndCompress(pendingNotifications, msg);
0902         if (appended > 0) {
0903             appendedMessages += appended;
0904         } else {
0905             ++modifiedMessages;
0906         }
0907         // translateAndCompress can remove an existing "modify" when msg is a "delete".
0908         // Or it can merge two ModifyFlags and return false.
0909         // We need to detect such removals, for ChangeRecorder.
0910         if (pendingNotifications.count() != oldSize + appended) {
0911             ++erasedMessages; // this count isn't exact, but it doesn't matter
0912         }
0913     } else if (needsSplit) {
0914         // If it's not queued at least make sure we fetch all the items from split
0915         // notifications in one go.
0916         if (msg->type() == Protocol::Command::ItemChangeNotification) {
0917             const auto items = Protocol::cmdCast<Protocol::ItemChangeNotification>(msg).items();
0918             itemCache->ensureCached(Protocol::ChangeNotification::itemsToUids(items), mItemFetchScope);
0919         }
0920     }
0921 
0922     // if the message contains more items, but we need to emit single-item notification,
0923     // split the message into one message per item and queue them
0924     // if the message contains only one item, but batches are not supported
0925     // (and thus neither is flagsModified), splitMessage() will convert the
0926     // notification to regular Modify with "FLAGS" part changed
0927     if (needsSplit || (!needsSplit && !supportsBatch && isModifyFlags)) {
0928         // Make sure inter-resource move notifications are translated into
0929         // Add/Remove notifications
0930         if (msg->type() == Protocol::Command::ItemChangeNotification) {
0931             const auto &itemNtf = Protocol::cmdCast<Protocol::ItemChangeNotification>(msg);
0932             if (itemNtf.operation() == Protocol::ItemChangeNotification::Move && itemNtf.resource() != itemNtf.destinationResource()) {
0933                 if (needsSplit) {
0934                     const Protocol::ChangeNotificationList split = splitMessage(itemNtf, !supportsBatch);
0935                     for (const auto &splitMsg : split) {
0936                         appendedMessages += translateAndCompress(pendingNotifications, splitMsg);
0937                     }
0938                 } else {
0939                     appendedMessages += translateAndCompress(pendingNotifications, msg);
0940                 }
0941             } else {
0942                 const Protocol::ChangeNotificationList split = splitMessage(itemNtf, !supportsBatch);
0943                 pendingNotifications << (split | Actions::toQList);
0944                 appendedMessages += split.count();
0945             }
0946         }
0947     }
0948 
0949     // tell ChangeRecorder (even if 0 appended, the compression could have made changes to existing messages)
0950     if (appendedMessages > 0 || modifiedMessages > 0 || erasedMessages > 0) {
0951         if (erasedMessages > 0) {
0952             notificationsErased();
0953         } else {
0954             notificationsEnqueued(appendedMessages);
0955         }
0956     }
0957 
0958     dispatchNotifications();
0959 }
0960 
0961 void MonitorPrivate::flushPipeline()
0962 {
0963     while (!pipeline.isEmpty()) {
0964         const auto msg = pipeline.head();
0965         if (ensureDataAvailable(msg)) {
0966             // dequeue should be before emit, otherwise stuff might happen (like dataAvailable
0967             // being called again) and we end up dequeuing an empty pipeline
0968             pipeline.dequeue();
0969             emitNotification(msg);
0970         } else {
0971             break;
0972         }
0973     }
0974 }
0975 
0976 void MonitorPrivate::dataAvailable()
0977 {
0978     flushPipeline();
0979     dispatchNotifications();
0980 }
0981 
0982 void MonitorPrivate::dispatchNotifications()
0983 {
0984     // Note that this code is not used in a ChangeRecorder (pipelineSize==0)
0985     while (pipeline.size() < pipelineSize() && !pendingNotifications.isEmpty()) {
0986         const auto msg = pendingNotifications.dequeue();
0987         const bool avail = ensureDataAvailable(msg);
0988         if (avail && pipeline.isEmpty()) {
0989             emitNotification(msg);
0990         } else {
0991             pipeline.enqueue(msg);
0992         }
0993     }
0994 }
0995 
0996 static Relation::List extractRelations(const QSet<Protocol::ItemChangeNotification::Relation> &rels)
0997 {
0998     Relation::List relations;
0999     if (rels.isEmpty()) {
1000         return relations;
1001     }
1002 
1003     relations.reserve(rels.size());
1004     for (const auto &rel : rels) {
1005         relations.push_back(Relation(rel.type.toLatin1(), Akonadi::Item(rel.leftId), Akonadi::Item(rel.rightId)));
1006     }
1007     return relations;
1008 }
1009 
1010 bool MonitorPrivate::emitItemsNotification(const Protocol::ItemChangeNotification &msg,
1011                                            const Item::List &items,
1012                                            const Collection &collection,
1013                                            const Collection &collectionDest)
1014 {
1015     Collection col = collection;
1016     Collection colDest = collectionDest;
1017     if (!col.isValid()) {
1018         col = Collection(msg.parentCollection());
1019         col.setResource(QString::fromUtf8(msg.resource()));
1020     }
1021     if (!colDest.isValid()) {
1022         colDest = Collection(msg.parentDestCollection());
1023         // HACK: destination resource is delivered in the parts field...
1024         if (!msg.itemParts().isEmpty()) {
1025             colDest.setResource(QString::fromLatin1(*(msg.itemParts().cbegin())));
1026         }
1027     }
1028 
1029     Relation::List addedRelations;
1030     Relation::List removedRelations;
1031     if (msg.operation() == Protocol::ItemChangeNotification::ModifyRelations) {
1032         addedRelations = extractRelations(msg.addedRelations());
1033         removedRelations = extractRelations(msg.removedRelations());
1034     }
1035 
1036     Tag::List addedTags;
1037     Tag::List removedTags;
1038     if (msg.operation() == Protocol::ItemChangeNotification::ModifyTags) {
1039         addedTags = tagCache->retrieve(msg.addedTags() | Actions::toQList);
1040         removedTags = tagCache->retrieve(msg.removedTags() | Actions::toQList);
1041     }
1042 
1043     Item::List its = items;
1044     for (auto it = its.begin(), end = its.end(); it != end; ++it) {
1045         if (msg.operation() == Protocol::ItemChangeNotification::Move) {
1046             it->setParentCollection(colDest);
1047         } else {
1048             it->setParentCollection(col);
1049         }
1050     }
1051     bool handled = false;
1052     switch (msg.operation()) {
1053     case Protocol::ItemChangeNotification::Add:
1054         return emitToListeners(&Monitor::itemAdded, its.first(), col);
1055     case Protocol::ItemChangeNotification::Modify:
1056         return emitToListeners(&Monitor::itemChanged, its.first(), msg.itemParts());
1057     case Protocol::ItemChangeNotification::ModifyFlags:
1058         return emitToListeners(&Monitor::itemsFlagsChanged, its, msg.addedFlags(), msg.removedFlags());
1059     case Protocol::ItemChangeNotification::Move:
1060         handled |= emitToListeners(&Monitor::itemMoved, its.first(), col, colDest);
1061         handled |= emitToListeners(&Monitor::itemsMoved, its, col, colDest);
1062         return handled;
1063     case Protocol::ItemChangeNotification::Remove:
1064         handled |= emitToListeners(&Monitor::itemRemoved, its.first());
1065         handled |= emitToListeners(&Monitor::itemsRemoved, its);
1066         return handled;
1067     case Protocol::ItemChangeNotification::Link:
1068         handled |= emitToListeners(&Monitor::itemLinked, its.first(), col);
1069         handled |= emitToListeners(&Monitor::itemsLinked, its, col);
1070         return handled;
1071     case Protocol::ItemChangeNotification::Unlink:
1072         handled |= emitToListeners(&Monitor::itemUnlinked, its.first(), col);
1073         handled |= emitToListeners(&Monitor::itemsUnlinked, its, col);
1074         return handled;
1075     case Protocol::ItemChangeNotification::ModifyTags:
1076         return emitToListeners(&Monitor::itemsTagsChanged, its, addedTags | Actions::toQSet, removedTags | Actions::toQSet);
1077     case Protocol::ItemChangeNotification::ModifyRelations:
1078         return emitToListeners(&Monitor::itemsRelationsChanged, its, addedRelations, removedRelations);
1079     default:
1080         qCDebug(AKONADICORE_LOG) << "Unknown operation type" << msg.operation() << "in item change notification";
1081         return false;
1082     }
1083 }
1084 
1085 bool MonitorPrivate::emitCollectionNotification(const Protocol::CollectionChangeNotification &msg,
1086                                                 const Collection &col,
1087                                                 const Collection &par,
1088                                                 const Collection &dest)
1089 {
1090     Collection parent = par;
1091     if (!parent.isValid()) {
1092         parent = Collection(msg.parentCollection());
1093     }
1094     Collection destination = dest;
1095     if (!destination.isValid()) {
1096         destination = Collection(msg.parentDestCollection());
1097     }
1098 
1099     Collection collection = col;
1100     Q_ASSERT(collection.isValid());
1101     if (!collection.isValid()) {
1102         qCWarning(AKONADICORE_LOG) << "Failed to get valid Collection for a Collection change!";
1103         return true; // prevent Monitor disconnecting from a signal
1104     }
1105 
1106     if (msg.operation() == Protocol::CollectionChangeNotification::Move) {
1107         collection.setParentCollection(destination);
1108     } else {
1109         collection.setParentCollection(parent);
1110     }
1111 
1112     bool handled = false;
1113     switch (msg.operation()) {
1114     case Protocol::CollectionChangeNotification::Add:
1115         return emitToListeners(&Monitor::collectionAdded, collection, parent);
1116     case Protocol::CollectionChangeNotification::Modify:
1117         handled |= emitToListeners(qOverload<const Akonadi::Collection &>(&Monitor::collectionChanged), collection);
1118         handled |=
1119             emitToListeners(qOverload<const Akonadi::Collection &, const QSet<QByteArray> &>(&Monitor::collectionChanged), collection, msg.changedParts());
1120         return handled;
1121     case Protocol::CollectionChangeNotification::Move:
1122         return emitToListeners(&Monitor::collectionMoved, collection, parent, destination);
1123     case Protocol::CollectionChangeNotification::Remove:
1124         return emitToListeners(&Monitor::collectionRemoved, collection);
1125     case Protocol::CollectionChangeNotification::Subscribe:
1126         return emitToListeners(&Monitor::collectionSubscribed, collection, parent);
1127     case Protocol::CollectionChangeNotification::Unsubscribe:
1128         return emitToListeners(&Monitor::collectionUnsubscribed, collection);
1129     default:
1130         qCDebug(AKONADICORE_LOG) << "Unknown operation type" << msg.operation() << "in collection change notification";
1131         return false;
1132     }
1133 }
1134 
1135 bool MonitorPrivate::emitTagNotification(const Protocol::TagChangeNotification &msg, const Tag &tag)
1136 {
1137     Q_UNUSED(msg)
1138     switch (msg.operation()) {
1139     case Protocol::TagChangeNotification::Add:
1140         return emitToListeners(&Monitor::tagAdded, tag);
1141     case Protocol::TagChangeNotification::Modify:
1142         return emitToListeners(&Monitor::tagChanged, tag);
1143     case Protocol::TagChangeNotification::Remove:
1144         return emitToListeners(&Monitor::tagRemoved, tag);
1145     default:
1146         qCDebug(AKONADICORE_LOG) << "Unknown operation type" << msg.operation() << "in tag change notification";
1147         return false;
1148     }
1149 }
1150 
1151 bool MonitorPrivate::emitRelationNotification(const Protocol::RelationChangeNotification &msg, const Relation &relation)
1152 {
1153     if (!relation.isValid()) {
1154         return false;
1155     }
1156 
1157     switch (msg.operation()) {
1158     case Protocol::RelationChangeNotification::Add:
1159         return emitToListeners(&Monitor::relationAdded, relation);
1160     case Protocol::RelationChangeNotification::Remove:
1161         return emitToListeners(&Monitor::relationRemoved, relation);
1162     default:
1163         qCDebug(AKONADICORE_LOG) << "Unknown operation type" << msg.operation() << "in tag change notification";
1164         return false;
1165     }
1166 }
1167 
1168 bool MonitorPrivate::emitSubscriptionChangeNotification(const Protocol::SubscriptionChangeNotification &msg, const Akonadi::NotificationSubscriber &subscriber)
1169 {
1170     if (!subscriber.isValid()) {
1171         return false;
1172     }
1173 
1174     switch (msg.operation()) {
1175     case Protocol::SubscriptionChangeNotification::Add:
1176         return emitToListeners(&Monitor::notificationSubscriberAdded, subscriber);
1177     case Protocol::SubscriptionChangeNotification::Modify:
1178         return emitToListeners(&Monitor::notificationSubscriberChanged, subscriber);
1179     case Protocol::SubscriptionChangeNotification::Remove:
1180         return emitToListeners(&Monitor::notificationSubscriberRemoved, subscriber);
1181     default:
1182         qCDebug(AKONADICORE_LOG) << "Unknown operation type" << msg.operation() << "in subscription change notification";
1183         return false;
1184     }
1185 }
1186 
1187 bool MonitorPrivate::emitDebugChangeNotification(const Protocol::DebugChangeNotification &msg, const ChangeNotification &ntf)
1188 {
1189     Q_UNUSED(msg)
1190 
1191     if (!ntf.isValid()) {
1192         return false;
1193     }
1194 
1195     return emitToListeners(&Monitor::debugNotification, ntf);
1196 }
1197 
1198 void MonitorPrivate::invalidateCaches(const Protocol::ChangeNotificationPtr &msg)
1199 {
1200     // remove invalidates
1201     // modify removes the cache entry, as we need to re-fetch
1202     // And subscription modify the visibility of the collection by the collectionFetchScope.
1203     switch (msg->type()) {
1204     case Protocol::Command::CollectionChangeNotification: {
1205         const auto &colNtf = Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg);
1206         switch (colNtf.operation()) {
1207         case Protocol::CollectionChangeNotification::Modify:
1208         case Protocol::CollectionChangeNotification::Move:
1209         case Protocol::CollectionChangeNotification::Subscribe:
1210             collectionCache->update(colNtf.collection().id(), mCollectionFetchScope);
1211             break;
1212         case Protocol::CollectionChangeNotification::Remove:
1213             collectionCache->invalidate(colNtf.collection().id());
1214             break;
1215         default:
1216             break;
1217         }
1218     } break;
1219     case Protocol::Command::ItemChangeNotification: {
1220         const auto &itemNtf = Protocol::cmdCast<Protocol::ItemChangeNotification>(msg);
1221         switch (itemNtf.operation()) {
1222         case Protocol::ItemChangeNotification::Modify:
1223         case Protocol::ItemChangeNotification::ModifyFlags:
1224         case Protocol::ItemChangeNotification::ModifyTags:
1225         case Protocol::ItemChangeNotification::ModifyRelations:
1226         case Protocol::ItemChangeNotification::Move:
1227             itemCache->update(Protocol::ChangeNotification::itemsToUids(itemNtf.items()), mItemFetchScope);
1228             break;
1229         case Protocol::ItemChangeNotification::Remove:
1230             itemCache->invalidate(Protocol::ChangeNotification::itemsToUids(itemNtf.items()));
1231             break;
1232         default:
1233             break;
1234         }
1235     } break;
1236     case Protocol::Command::TagChangeNotification: {
1237         const auto &tagNtf = Protocol::cmdCast<Protocol::TagChangeNotification>(msg);
1238         switch (tagNtf.operation()) {
1239         case Protocol::TagChangeNotification::Modify:
1240             tagCache->update({tagNtf.tag().id()}, mTagFetchScope);
1241             break;
1242         case Protocol::TagChangeNotification::Remove:
1243             tagCache->invalidate({tagNtf.tag().id()});
1244             break;
1245         default:
1246             break;
1247         }
1248     } break;
1249     default:
1250         break;
1251     }
1252 }
1253 
1254 void MonitorPrivate::invalidateCache(const Collection &col)
1255 {
1256     collectionCache->update(col.id(), mCollectionFetchScope);
1257 }
1258 
1259 void MonitorPrivate::ref(Collection::Id id)
1260 {
1261     if (!refCountMap.contains(id)) {
1262         refCountMap.insert(id, 0);
1263     }
1264     ++refCountMap[id];
1265 
1266     if (m_buffer.isBuffered(id)) {
1267         m_buffer.purge(id);
1268     }
1269 }
1270 
1271 Akonadi::Collection::Id MonitorPrivate::deref(Collection::Id id)
1272 {
1273     Q_ASSERT(refCountMap.contains(id));
1274     if (--refCountMap[id] == 0) {
1275         refCountMap.remove(id);
1276         return m_buffer.buffer(id);
1277     }
1278     return -1;
1279 }
1280 
1281 void MonitorPrivate::PurgeBuffer::purge(Collection::Id id)
1282 {
1283     m_buffer.removeOne(id);
1284 }
1285 
1286 Akonadi::Collection::Id MonitorPrivate::PurgeBuffer::buffer(Collection::Id id)
1287 {
1288     // Ensure that we don't put a duplicate @p id into the buffer.
1289     purge(id);
1290 
1291     Collection::Id bumpedId = -1;
1292     if (m_buffer.size() == MAXBUFFERSIZE) {
1293         bumpedId = m_buffer.dequeue();
1294         purge(bumpedId);
1295     }
1296 
1297     m_buffer.enqueue(id);
1298 
1299     return bumpedId;
1300 }
1301 
1302 int MonitorPrivate::PurgeBuffer::buffersize()
1303 {
1304     return MAXBUFFERSIZE;
1305 }
1306 
1307 bool MonitorPrivate::isMonitored(Collection::Id colId) const
1308 {
1309     if (!useRefCounting) {
1310         return true;
1311     }
1312     return refCountMap.contains(colId) || m_buffer.isBuffered(colId);
1313 }
1314 
1315 void MonitorPrivate::notifyCollectionStatisticsWatchers(Collection::Id collection, const QByteArray &resource)
1316 {
1317     if (collection > 0 && (monitorAll || isCollectionMonitored(collection) || resources.contains(resource))) {
1318         recentlyChangedCollections.insert(collection);
1319         if (!statisticsCompressionTimer.isActive()) {
1320             statisticsCompressionTimer.start();
1321         }
1322     }
1323 }
1324 
1325 Protocol::ModifySubscriptionCommand::ChangeType MonitorPrivate::monitorTypeToProtocol(Monitor::Type type)
1326 {
1327     switch (type) {
1328     case Monitor::Collections:
1329         return Protocol::ModifySubscriptionCommand::CollectionChanges;
1330     case Monitor::Items:
1331         return Protocol::ModifySubscriptionCommand::ItemChanges;
1332     case Monitor::Tags:
1333         return Protocol::ModifySubscriptionCommand::TagChanges;
1334     case Monitor::Relations:
1335         return Protocol::ModifySubscriptionCommand::RelationChanges;
1336     case Monitor::Subscribers:
1337         return Protocol::ModifySubscriptionCommand::SubscriptionChanges;
1338     case Monitor::Notifications:
1339         return Protocol::ModifySubscriptionCommand::ChangeNotifications;
1340     default:
1341         Q_ASSERT(false);
1342         return Protocol::ModifySubscriptionCommand::NoType;
1343     }
1344 }
1345 
1346 void MonitorPrivate::updateListeners(QMetaMethod signal, ListenerAction action)
1347 {
1348 #define UPDATE_LISTENERS(sig)                                                                                                                                  \
1349     if (signal == QMetaMethod::fromSignal(sig)) {                                                                                                              \
1350         updateListener(sig, action);                                                                                                                           \
1351         return;                                                                                                                                                \
1352     }
1353 
1354     UPDATE_LISTENERS(&Monitor::itemChanged)
1355     UPDATE_LISTENERS(&Monitor::itemChanged)
1356     UPDATE_LISTENERS(&Monitor::itemsFlagsChanged)
1357     UPDATE_LISTENERS(&Monitor::itemsTagsChanged)
1358     UPDATE_LISTENERS(&Monitor::itemsRelationsChanged)
1359     UPDATE_LISTENERS(&Monitor::itemMoved)
1360     UPDATE_LISTENERS(&Monitor::itemsMoved)
1361     UPDATE_LISTENERS(&Monitor::itemAdded)
1362     UPDATE_LISTENERS(&Monitor::itemRemoved)
1363     UPDATE_LISTENERS(&Monitor::itemsRemoved)
1364     UPDATE_LISTENERS(&Monitor::itemLinked)
1365     UPDATE_LISTENERS(&Monitor::itemsLinked)
1366     UPDATE_LISTENERS(&Monitor::itemUnlinked)
1367     UPDATE_LISTENERS(&Monitor::itemsUnlinked)
1368     UPDATE_LISTENERS(&Monitor::collectionAdded)
1369 
1370     UPDATE_LISTENERS(qOverload<const Akonadi::Collection &>(&Monitor::collectionChanged))
1371     UPDATE_LISTENERS((qOverload<const Akonadi::Collection &, const QSet<QByteArray> &>(&Monitor::collectionChanged)))
1372     UPDATE_LISTENERS(&Monitor::collectionMoved)
1373     UPDATE_LISTENERS(&Monitor::collectionRemoved)
1374     UPDATE_LISTENERS(&Monitor::collectionSubscribed)
1375     UPDATE_LISTENERS(&Monitor::collectionUnsubscribed)
1376     UPDATE_LISTENERS(&Monitor::collectionStatisticsChanged)
1377 
1378     UPDATE_LISTENERS(&Monitor::tagAdded)
1379     UPDATE_LISTENERS(&Monitor::tagChanged)
1380     UPDATE_LISTENERS(&Monitor::tagRemoved)
1381 
1382     UPDATE_LISTENERS(&Monitor::relationAdded)
1383     UPDATE_LISTENERS(&Monitor::relationRemoved)
1384 
1385     UPDATE_LISTENERS(&Monitor::notificationSubscriberAdded)
1386     UPDATE_LISTENERS(&Monitor::notificationSubscriberChanged)
1387     UPDATE_LISTENERS(&Monitor::notificationSubscriberRemoved)
1388     UPDATE_LISTENERS(&Monitor::debugNotification)
1389 
1390 #undef UPDATE_LISTENERS
1391 }
1392 
1393 /// @endcond