// File indexing completed on 2024-11-10 04:40:43
0001 /* 0002 SPDX-FileCopyrightText: 2007 Tobias Koenig <tokoe@kde.org> 0003 0004 SPDX-License-Identifier: LGPL-2.0-or-later 0005 */ 0006 0007 /// @cond PRIVATE 0008 0009 #include "monitor_p.h" 0010 0011 #include "akonadicore_debug.h" 0012 #include "changemediator_p.h" 0013 #include "changenotification.h" 0014 #include "collectionfetchjob.h" 0015 #include "collectionstatistics.h" 0016 #include "itemfetchjob.h" 0017 #include "notificationmanagerinterface.h" 0018 #include "notificationsubscriber.h" 0019 #include "protocolhelper_p.h" 0020 #include "session.h" 0021 #include "vectorhelper.h" 0022 0023 #include "shared/akranges.h" 0024 0025 #include <utility> 0026 0027 using namespace Akonadi; 0028 using namespace AkRanges; 0029 0030 class operation; 0031 0032 static const int PipelineSize = 5; 0033 0034 MonitorPrivate::MonitorPrivate(ChangeNotificationDependenciesFactory *dependenciesFactory_, Monitor *parent) 0035 : q_ptr(parent) 0036 , dependenciesFactory(dependenciesFactory_ ? dependenciesFactory_ : new ChangeNotificationDependenciesFactory) 0037 , ntfConnection(nullptr) 0038 , monitorAll(false) 0039 , exclusive(false) 0040 , mFetchChangedOnly(false) 0041 , session(Session::defaultSession()) 0042 , collectionCache(nullptr) 0043 , mCommandBuffer(parent, "handleCommands") 0044 , pendingModificationChanges(Protocol::ModifySubscriptionCommand::None) 0045 , monitorReady(false) 0046 , fetchCollection(false) 0047 , fetchCollectionStatistics(false) 0048 , collectionMoveTranslationEnabled(true) 0049 , useRefCounting(false) 0050 { 0051 } 0052 0053 MonitorPrivate::~MonitorPrivate() 0054 { 0055 disconnectFromNotificationManager(); 0056 delete dependenciesFactory; 0057 delete collectionCache; 0058 delete itemCache; 0059 delete tagCache; 0060 } 0061 0062 void MonitorPrivate::init() 0063 { 0064 // needs to be at least 3x pipeline size for the collection move case 0065 collectionCache = dependenciesFactory->createCollectionCache(3 * PipelineSize, session); 0066 // needs to be at 
least 1x pipeline size 0067 itemCache = dependenciesFactory->createItemListCache(PipelineSize, session); 0068 // 20 tags looks like a reasonable amount to keep around 0069 tagCache = dependenciesFactory->createTagListCache(20, session); 0070 0071 QObject::connect(collectionCache, &CollectionCache::dataAvailable, q_ptr, [this]() { 0072 dataAvailable(); 0073 }); 0074 QObject::connect(itemCache, &ItemCache::dataAvailable, q_ptr, [this]() { 0075 dataAvailable(); 0076 }); 0077 QObject::connect(tagCache, &TagCache::dataAvailable, q_ptr, [this]() { 0078 dataAvailable(); 0079 }); 0080 QObject::connect(ServerManager::self(), &ServerManager::stateChanged, q_ptr, [this](auto state) { 0081 serverStateChanged(state); 0082 }); 0083 0084 statisticsCompressionTimer.setSingleShot(true); 0085 statisticsCompressionTimer.setInterval(500); 0086 QObject::connect(&statisticsCompressionTimer, &QTimer::timeout, q_ptr, [this]() { 0087 slotFlushRecentlyChangedCollections(); 0088 }); 0089 } 0090 0091 bool MonitorPrivate::connectToNotificationManager() 0092 { 0093 if (ntfConnection) { 0094 ntfConnection->deleteLater(); 0095 ntfConnection = nullptr; 0096 } 0097 0098 if (!session) { 0099 return false; 0100 } 0101 0102 ntfConnection = dependenciesFactory->createNotificationConnection(session, &mCommandBuffer); 0103 if (!ntfConnection) { 0104 return false; 0105 } 0106 0107 slotUpdateSubscription(); 0108 0109 ntfConnection->reconnect(); 0110 0111 return true; 0112 } 0113 0114 void MonitorPrivate::disconnectFromNotificationManager() 0115 { 0116 if (ntfConnection) { 0117 ntfConnection->disconnect(q_ptr); 0118 dependenciesFactory->destroyNotificationConnection(session, ntfConnection.data()); 0119 } 0120 } 0121 0122 void MonitorPrivate::serverStateChanged(ServerManager::State state) 0123 { 0124 if (state == ServerManager::Running) { 0125 connectToNotificationManager(); 0126 } 0127 } 0128 0129 void MonitorPrivate::invalidateCollectionCache(qint64 id) 0130 { 0131 collectionCache->update(id, 
mCollectionFetchScope); 0132 } 0133 0134 void MonitorPrivate::invalidateItemCache(qint64 id) 0135 { 0136 itemCache->update({id}, mItemFetchScope); 0137 // Also invalidate content of all any pending notification for given item 0138 for (auto it = pendingNotifications.begin(), end = pendingNotifications.end(); it != end; ++it) { 0139 if ((*it)->type() == Protocol::Command::ItemChangeNotification) { 0140 auto &ntf = Protocol::cmdCast<Protocol::ItemChangeNotification>(*it); 0141 const auto items = ntf.items(); 0142 if (std::any_of(items.cbegin(), items.cend(), [id](const Protocol::FetchItemsResponse &r) { 0143 return r.id() == id; 0144 })) { 0145 ntf.setMustRetrieve(true); 0146 } 0147 } 0148 } 0149 } 0150 0151 void MonitorPrivate::invalidateTagCache(qint64 id) 0152 { 0153 tagCache->update({id}, mTagFetchScope); 0154 } 0155 0156 int MonitorPrivate::pipelineSize() const 0157 { 0158 return PipelineSize; 0159 } 0160 0161 void MonitorPrivate::scheduleSubscriptionUpdate() 0162 { 0163 if (pendingModificationTimer || !monitorReady) { 0164 return; 0165 } 0166 0167 pendingModificationTimer = new QTimer(q_ptr); 0168 pendingModificationTimer->setSingleShot(true); 0169 pendingModificationTimer->setInterval(0); 0170 pendingModificationTimer->start(); 0171 q_ptr->connect(pendingModificationTimer, &QTimer::timeout, q_ptr, [this]() { 0172 slotUpdateSubscription(); 0173 }); 0174 } 0175 0176 void MonitorPrivate::slotUpdateSubscription() 0177 { 0178 if (pendingModificationTimer) { 0179 pendingModificationTimer->stop(); 0180 std::exchange(pendingModificationTimer, nullptr)->deleteLater(); 0181 } 0182 0183 if (pendingModificationChanges & Protocol::ModifySubscriptionCommand::ItemFetchScope) { 0184 pendingModification.setItemFetchScope(ProtocolHelper::itemFetchScopeToProtocol(mItemFetchScope)); 0185 } 0186 if (pendingModificationChanges & Protocol::ModifySubscriptionCommand::CollectionFetchScope) { 0187 
pendingModification.setCollectionFetchScope(ProtocolHelper::collectionFetchScopeToProtocol(mCollectionFetchScope)); 0188 } 0189 if (pendingModificationChanges & Protocol::ModifySubscriptionCommand::TagFetchScope) { 0190 pendingModification.setTagFetchScope(ProtocolHelper::tagFetchScopeToProtocol(mTagFetchScope)); 0191 } 0192 pendingModificationChanges = Protocol::ModifySubscriptionCommand::None; 0193 0194 if (ntfConnection) { 0195 ntfConnection->sendCommand(3, Protocol::ModifySubscriptionCommandPtr::create(pendingModification)); 0196 pendingModification = Protocol::ModifySubscriptionCommand(); 0197 } 0198 } 0199 0200 bool MonitorPrivate::isLazilyIgnored(const Protocol::ChangeNotificationPtr &msg, bool allowModifyFlagsConversion) const 0201 { 0202 if (msg->type() == Protocol::Command::CollectionChangeNotification) { 0203 // Lazy fetching can only affects items. 0204 return false; 0205 } 0206 0207 if (msg->type() == Protocol::Command::TagChangeNotification) { 0208 const auto op = Protocol::cmdCast<Protocol::TagChangeNotification>(msg).operation(); 0209 return ((op == Protocol::TagChangeNotification::Add && !hasListeners(&Monitor::tagAdded)) 0210 || (op == Protocol::TagChangeNotification::Modify && !hasListeners(&Monitor::tagChanged)) 0211 || (op == Protocol::TagChangeNotification::Remove && !hasListeners(&Monitor::tagRemoved))); 0212 } 0213 0214 if (!fetchCollectionStatistics && msg->type() == Protocol::Command::ItemChangeNotification) { 0215 const auto &itemNtf = Protocol::cmdCast<Protocol::ItemChangeNotification>(msg); 0216 const auto op = itemNtf.operation(); 0217 if ((op == Protocol::ItemChangeNotification::Add && !hasListeners(&Monitor::itemAdded)) 0218 || (op == Protocol::ItemChangeNotification::Remove && !hasListeners(&Monitor::itemRemoved) && !hasListeners(&Monitor::itemsRemoved)) 0219 || (op == Protocol::ItemChangeNotification::Modify && !hasListeners(&Monitor::itemChanged)) 0220 || (op == Protocol::ItemChangeNotification::ModifyFlags 0221 && 
!hasListeners(&Monitor::itemsFlagsChanged) 0222 // Newly delivered ModifyFlags notifications will be converted to 0223 // itemChanged(item, "FLAGS") for legacy clients. 0224 && (!allowModifyFlagsConversion || !hasListeners(&Monitor::itemChanged))) 0225 || (op == Protocol::ItemChangeNotification::ModifyTags && !hasListeners(&Monitor::itemsTagsChanged)) 0226 || (op == Protocol::ItemChangeNotification::Move && !hasListeners(&Monitor::itemMoved) && !hasListeners(&Monitor::itemsMoved)) 0227 || (op == Protocol::ItemChangeNotification::Link && !hasListeners(&Monitor::itemLinked) && !hasListeners(&Monitor::itemsLinked)) 0228 || (op == Protocol::ItemChangeNotification::Unlink && !hasListeners(&Monitor::itemUnlinked) && !hasListeners(&Monitor::itemsUnlinked))) { 0229 return true; 0230 } 0231 0232 if (!useRefCounting) { 0233 return false; 0234 } 0235 0236 const Collection::Id parentCollectionId = itemNtf.parentCollection(); 0237 0238 if ((op == Protocol::ItemChangeNotification::Add) || (op == Protocol::ItemChangeNotification::Remove) 0239 || (op == Protocol::ItemChangeNotification::Modify) || (op == Protocol::ItemChangeNotification::ModifyFlags) 0240 || (op == Protocol::ItemChangeNotification::ModifyTags) || (op == Protocol::ItemChangeNotification::Link) 0241 || (op == Protocol::ItemChangeNotification::Unlink)) { 0242 if (isMonitored(parentCollectionId)) { 0243 return false; 0244 } 0245 } 0246 0247 if (op == Protocol::ItemChangeNotification::Move) { 0248 // We can't ignore the move. It must be transformed later into a removal or insertion. 
0249 return !isMonitored(parentCollectionId) && !isMonitored(itemNtf.parentDestCollection()); 0250 } 0251 return true; 0252 } 0253 0254 return false; 0255 } 0256 0257 void MonitorPrivate::checkBatchSupport(const Protocol::ChangeNotificationPtr &msg, bool &needsSplit, bool &batchSupported) const 0258 { 0259 if (msg->type() != Protocol::Command::ItemChangeNotification) { 0260 needsSplit = false; 0261 batchSupported = false; 0262 return; 0263 } 0264 0265 const auto &itemNtf = Protocol::cmdCast<Protocol::ItemChangeNotification>(msg); 0266 const bool isBatch = (itemNtf.items().count() > 1); 0267 0268 switch (itemNtf.operation()) { 0269 case Protocol::ItemChangeNotification::Add: 0270 case Protocol::ItemChangeNotification::Modify: 0271 needsSplit = isBatch; 0272 batchSupported = false; 0273 return; 0274 needsSplit = isBatch; 0275 batchSupported = false; 0276 return; 0277 case Protocol::ItemChangeNotification::ModifyFlags: 0278 batchSupported = hasListeners(&Monitor::itemsFlagsChanged); 0279 needsSplit = isBatch && !batchSupported && hasListeners(&Monitor::itemChanged); 0280 return; 0281 case Protocol::ItemChangeNotification::ModifyTags: 0282 case Protocol::ItemChangeNotification::ModifyRelations: 0283 // Tags and relations were added after batch notifications, so they are always supported 0284 batchSupported = true; 0285 needsSplit = false; 0286 return; 0287 case Protocol::ItemChangeNotification::Move: 0288 needsSplit = isBatch && hasListeners(&Monitor::itemMoved); 0289 batchSupported = hasListeners(&Monitor::itemsMoved); 0290 return; 0291 case Protocol::ItemChangeNotification::Remove: 0292 needsSplit = isBatch && hasListeners(&Monitor::itemRemoved); 0293 batchSupported = hasListeners(&Monitor::itemsRemoved); 0294 return; 0295 case Protocol::ItemChangeNotification::Link: 0296 needsSplit = isBatch && hasListeners(&Monitor::itemLinked); 0297 batchSupported = hasListeners(&Monitor::itemsLinked); 0298 return; 0299 case Protocol::ItemChangeNotification::Unlink: 0300 
needsSplit = isBatch && hasListeners(&Monitor::itemUnlinked); 0301 batchSupported = hasListeners(&Monitor::itemsUnlinked); 0302 return; 0303 default: 0304 needsSplit = isBatch; 0305 batchSupported = false; 0306 qCDebug(AKONADICORE_LOG) << "Unknown operation type" << itemNtf.operation() << "in item change notification"; 0307 return; 0308 } 0309 } 0310 0311 Protocol::ChangeNotificationList MonitorPrivate::splitMessage(const Protocol::ItemChangeNotification &msg, bool legacy) const 0312 { 0313 Protocol::ChangeNotificationList list; 0314 0315 Protocol::ItemChangeNotification baseMsg; 0316 baseMsg.setSessionId(msg.sessionId()); 0317 if (legacy && msg.operation() == Protocol::ItemChangeNotification::ModifyFlags) { 0318 baseMsg.setOperation(Protocol::ItemChangeNotification::Modify); 0319 baseMsg.setItemParts(QSet<QByteArray>() << "FLAGS"); 0320 } else { 0321 baseMsg.setOperation(msg.operation()); 0322 baseMsg.setItemParts(msg.itemParts()); 0323 } 0324 baseMsg.setParentCollection(msg.parentCollection()); 0325 baseMsg.setParentDestCollection(msg.parentDestCollection()); 0326 baseMsg.setResource(msg.resource()); 0327 baseMsg.setDestinationResource(msg.destinationResource()); 0328 baseMsg.setAddedFlags(msg.addedFlags()); 0329 baseMsg.setRemovedFlags(msg.removedFlags()); 0330 baseMsg.setAddedTags(msg.addedTags()); 0331 baseMsg.setRemovedTags(msg.removedTags()); 0332 0333 const auto &items = msg.items(); 0334 list.reserve(items.count()); 0335 for (const auto &item : items) { 0336 auto copy = Protocol::ItemChangeNotificationPtr::create(baseMsg); 0337 copy->setItems({Protocol::FetchItemsResponse(item)}); 0338 list.push_back(std::move(copy)); 0339 } 0340 0341 return list; 0342 } 0343 0344 bool MonitorPrivate::fetchCollections() const 0345 { 0346 return fetchCollection; 0347 } 0348 0349 bool MonitorPrivate::fetchItems() const 0350 { 0351 return !mItemFetchScope.isEmpty(); 0352 } 0353 0354 bool MonitorPrivate::ensureDataAvailable(const Protocol::ChangeNotificationPtr &msg) 0355 { 
0356 if (msg->type() == Protocol::Command::TagChangeNotification) { 0357 const auto &tagMsg = Protocol::cmdCast<Protocol::TagChangeNotification>(msg); 0358 if (tagMsg.metadata().contains("FETCH_TAG")) { 0359 if (!tagCache->ensureCached({tagMsg.tag().id()}, mTagFetchScope)) { 0360 return false; 0361 } 0362 } 0363 return true; 0364 } 0365 0366 if (msg->type() == Protocol::Command::RelationChangeNotification) { 0367 return true; 0368 } 0369 0370 if (msg->type() == Protocol::Command::SubscriptionChangeNotification) { 0371 return true; 0372 } 0373 0374 if (msg->type() == Protocol::Command::DebugChangeNotification) { 0375 return true; 0376 } 0377 0378 if (msg->type() == Protocol::Command::CollectionChangeNotification 0379 && Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg).operation() == Protocol::CollectionChangeNotification::Remove) { 0380 // For collection removals the collection is gone already, so we can't fetch it, 0381 // but we have to at least obtain the ancestor chain. 0382 const qint64 parentCollection = Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg).parentCollection(); 0383 return parentCollection <= -1 || collectionCache->ensureCached(parentCollection, mCollectionFetchScope); 0384 } 0385 0386 bool allCached = true; 0387 if (fetchCollections()) { 0388 const qint64 parentCollection = (msg->type() == Protocol::Command::ItemChangeNotification) 0389 ? Protocol::cmdCast<Protocol::ItemChangeNotification>(msg).parentCollection() 0390 : (msg->type() == Protocol::Command::CollectionChangeNotification) 0391 ? 
Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg).parentCollection() 0392 : -1; 0393 if (parentCollection > -1 && !collectionCache->ensureCached(parentCollection, mCollectionFetchScope)) { 0394 allCached = false; 0395 } 0396 0397 qint64 parentDestCollection = -1; 0398 0399 if ((msg->type() == Protocol::Command::ItemChangeNotification) 0400 && (Protocol::cmdCast<Protocol::ItemChangeNotification>(msg).operation() == Protocol::ItemChangeNotification::Move)) { 0401 parentDestCollection = Protocol::cmdCast<Protocol::ItemChangeNotification>(msg).parentDestCollection(); 0402 } else if ((msg->type() == Protocol::Command::CollectionChangeNotification) 0403 && (Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg).operation() == Protocol::CollectionChangeNotification::Move)) { 0404 parentDestCollection = Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg).parentDestCollection(); 0405 } 0406 if (parentDestCollection > -1 && !collectionCache->ensureCached(parentDestCollection, mCollectionFetchScope)) { 0407 allCached = false; 0408 } 0409 } 0410 0411 if (msg->isRemove()) { 0412 return allCached; 0413 } 0414 0415 if (msg->type() == Protocol::Command::ItemChangeNotification && fetchItems()) { 0416 const auto &itemNtf = Protocol::cmdCast<Protocol::ItemChangeNotification>(msg); 0417 if (mFetchChangedOnly 0418 && (itemNtf.operation() == Protocol::ItemChangeNotification::Modify || itemNtf.operation() == Protocol::ItemChangeNotification::ModifyFlags)) { 0419 const auto changedParts = itemNtf.itemParts(); 0420 const auto requestedParts = mItemFetchScope.payloadParts(); 0421 const auto requestedAttrs = mItemFetchScope.attributes(); 0422 QSet<QByteArray> missingParts; 0423 QSet<QByteArray> missingAttributes; 0424 for (const QByteArray &part : changedParts) { 0425 const auto partName = part.mid(4); 0426 if (part.startsWith("PLD:") && // krazy:exclude=strings since QByteArray 0427 (!mItemFetchScope.fullPayload() || !requestedParts.contains(partName))) { 
0428 missingParts.insert(partName); 0429 } else if (part.startsWith("ATR:") && // krazy:exclude=strings since QByteArray 0430 (!mItemFetchScope.allAttributes() || !requestedAttrs.contains(partName))) { 0431 missingAttributes.insert(partName); 0432 } 0433 } 0434 0435 if (!missingParts.isEmpty() || !missingAttributes.isEmpty()) { 0436 ItemFetchScope scope(mItemFetchScope); 0437 scope.fetchFullPayload(false); 0438 for (const auto &part : requestedParts) { 0439 scope.fetchPayloadPart(part, false); 0440 } 0441 for (const auto &attr : requestedAttrs) { 0442 scope.fetchAttribute(attr, false); 0443 } 0444 for (const auto &part : missingParts) { 0445 scope.fetchPayloadPart(part, true); 0446 } 0447 for (const auto &attr : missingAttributes) { 0448 scope.fetchAttribute(attr, true); 0449 } 0450 0451 if (!itemCache->ensureCached(Protocol::ChangeNotification::itemsToUids(itemNtf.items()), scope)) { 0452 return false; 0453 } 0454 } 0455 0456 return allCached; 0457 } 0458 0459 // Make sure all tags for ModifyTags operation are in cache too 0460 if (itemNtf.operation() == Protocol::ItemChangeNotification::ModifyTags) { 0461 if (!tagCache->ensureCached((itemNtf.addedTags() + itemNtf.removedTags()) | Actions::toQList, mTagFetchScope)) { 0462 return false; 0463 } 0464 } 0465 0466 if (itemNtf.metadata().contains("FETCH_ITEM") || itemNtf.mustRetrieve()) { 0467 if (!itemCache->ensureCached(Protocol::ChangeNotification::itemsToUids(itemNtf.items()), mItemFetchScope)) { 0468 return false; 0469 } 0470 } 0471 0472 return allCached; 0473 0474 } else if (msg->type() == Protocol::Command::CollectionChangeNotification && fetchCollections()) { 0475 const auto &colMsg = Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg); 0476 if (colMsg.metadata().contains("FETCH_COLLECTION")) { 0477 if (!collectionCache->ensureCached(colMsg.collection().id(), mCollectionFetchScope)) { 0478 return false; 0479 } 0480 } 0481 0482 return allCached; 0483 } 0484 0485 return allCached; 0486 } 0487 0488 bool 
MonitorPrivate::emitNotification(const Protocol::ChangeNotificationPtr &msg) 0489 { 0490 bool someoneWasListening = false; 0491 if (msg->type() == Protocol::Command::TagChangeNotification) { 0492 const auto &tagNtf = Protocol::cmdCast<Protocol::TagChangeNotification>(msg); 0493 const bool fetched = tagNtf.metadata().contains("FETCH_TAG"); 0494 Tag tag; 0495 if (fetched) { 0496 const auto tags = tagCache->retrieve({tagNtf.tag().id()}); 0497 tag = tags.isEmpty() ? Tag() : tags.at(0); 0498 } else { 0499 tag = ProtocolHelper::parseTag(tagNtf.tag()); 0500 } 0501 someoneWasListening = emitTagNotification(tagNtf, tag); 0502 } else if (msg->type() == Protocol::Command::RelationChangeNotification) { 0503 const auto &relNtf = Protocol::cmdCast<Protocol::RelationChangeNotification>(msg); 0504 const Relation rel = ProtocolHelper::parseRelationFetchResult(relNtf.relation()); 0505 someoneWasListening = emitRelationNotification(relNtf, rel); 0506 } else if (msg->type() == Protocol::Command::CollectionChangeNotification) { 0507 const auto &colNtf = Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg); 0508 const Collection parent = collectionCache->retrieve(colNtf.parentCollection()); 0509 Collection destParent; 0510 if (colNtf.operation() == Protocol::CollectionChangeNotification::Move) { 0511 destParent = collectionCache->retrieve(colNtf.parentDestCollection()); 0512 } 0513 0514 const bool fetched = colNtf.metadata().contains("FETCH_COLLECTION"); 0515 // For removals this will retrieve an invalid collection. We'll deal with that in emitCollectionNotification 0516 const Collection col = fetched ? collectionCache->retrieve(colNtf.collection().id()) : ProtocolHelper::parseCollection(colNtf.collection(), true); 0517 // It is possible that the retrieval fails also in the non-removal case (e.g. because the item was meanwhile removed while 0518 // the changerecorder stored the notification or the notification was in the queue). 
In order to drop such invalid notifications we have to ignore them. 0519 if (col.isValid() || colNtf.operation() == Protocol::CollectionChangeNotification::Remove || !fetchCollections()) { 0520 someoneWasListening = emitCollectionNotification(colNtf, col, parent, destParent); 0521 } 0522 } else if (msg->type() == Protocol::Command::ItemChangeNotification) { 0523 const auto &itemNtf = Protocol::cmdCast<Protocol::ItemChangeNotification>(msg); 0524 const Collection parent = collectionCache->retrieve(itemNtf.parentCollection()); 0525 Collection destParent; 0526 if (itemNtf.operation() == Protocol::ItemChangeNotification::Move) { 0527 destParent = collectionCache->retrieve(itemNtf.parentDestCollection()); 0528 } 0529 const bool fetched = itemNtf.metadata().contains("FETCH_ITEM") || itemNtf.mustRetrieve(); 0530 // For removals this will retrieve an empty set. We'll deal with that in emitItemNotification 0531 Item::List items; 0532 if (fetched && fetchItems()) { 0533 items = itemCache->retrieve(Protocol::ChangeNotification::itemsToUids(itemNtf.items())); 0534 } else { 0535 const auto &ntfItems = itemNtf.items(); 0536 items.reserve(ntfItems.size()); 0537 for (const auto &ntfItem : ntfItems) { 0538 items.push_back(ProtocolHelper::parseItemFetchResult(ntfItem, &mItemFetchScope)); 0539 } 0540 } 0541 // It is possible that the retrieval fails also in the non-removal case (e.g. because the item was meanwhile removed while 0542 // the changerecorder stored the notification or the notification was in the queue). In order to drop such invalid notifications we have to ignore them. 
0543 if (!items.isEmpty() || itemNtf.operation() == Protocol::ItemChangeNotification::Remove || !fetchItems()) { 0544 someoneWasListening = emitItemsNotification(itemNtf, items, parent, destParent); 0545 } 0546 } else if (msg->type() == Protocol::Command::SubscriptionChangeNotification) { 0547 const auto &subNtf = Protocol::cmdCast<Protocol::SubscriptionChangeNotification>(msg); 0548 NotificationSubscriber subscriber; 0549 subscriber.setSubscriber(subNtf.subscriber()); 0550 subscriber.setSessionId(subNtf.sessionId()); 0551 subscriber.setMonitoredCollections(subNtf.collections()); 0552 subscriber.setMonitoredItems(subNtf.items()); 0553 subscriber.setMonitoredTags(subNtf.tags()); 0554 QSet<Monitor::Type> monitorTypes; 0555 const auto types = subNtf.types(); 0556 for (auto type : types) { 0557 if (type == Protocol::ModifySubscriptionCommand::NoType) { 0558 continue; 0559 } 0560 monitorTypes.insert([](Protocol::ModifySubscriptionCommand::ChangeType type) { 0561 switch (type) { 0562 case Protocol::ModifySubscriptionCommand::ItemChanges: 0563 return Monitor::Items; 0564 case Protocol::ModifySubscriptionCommand::CollectionChanges: 0565 return Monitor::Collections; 0566 case Protocol::ModifySubscriptionCommand::TagChanges: 0567 return Monitor::Tags; 0568 case Protocol::ModifySubscriptionCommand::RelationChanges: 0569 return Monitor::Relations; 0570 case Protocol::ModifySubscriptionCommand::SubscriptionChanges: 0571 return Monitor::Subscribers; 0572 case Protocol::ModifySubscriptionCommand::ChangeNotifications: 0573 return Monitor::Notifications; 0574 default: 0575 Q_ASSERT(false); 0576 return Monitor::Items; // unreachable 0577 } 0578 }(type)); 0579 } 0580 subscriber.setMonitoredTypes(monitorTypes); 0581 subscriber.setMonitoredMimeTypes(subNtf.mimeTypes()); 0582 subscriber.setMonitoredResources(subNtf.resources()); 0583 subscriber.setIgnoredSessions(subNtf.ignoredSessions()); 0584 subscriber.setIsAllMonitored(subNtf.allMonitored()); 0585 
subscriber.setIsExclusive(subNtf.exclusive()); 0586 subscriber.setItemFetchScope(ProtocolHelper::parseItemFetchScope(subNtf.itemFetchScope())); 0587 subscriber.setCollectionFetchScope(ProtocolHelper::parseCollectionFetchScope(subNtf.collectionFetchScope())); 0588 someoneWasListening = emitSubscriptionChangeNotification(subNtf, subscriber); 0589 } else if (msg->type() == Protocol::Command::DebugChangeNotification) { 0590 const auto &changeNtf = Protocol::cmdCast<Protocol::DebugChangeNotification>(msg); 0591 ChangeNotification notification; 0592 notification.setListeners(changeNtf.listeners()); 0593 notification.setTimestamp(QDateTime::fromMSecsSinceEpoch(changeNtf.timestamp())); 0594 notification.setNotification(changeNtf.notification()); 0595 switch (changeNtf.notification()->type()) { 0596 case Protocol::Command::ItemChangeNotification: 0597 notification.setType(ChangeNotification::Items); 0598 break; 0599 case Protocol::Command::CollectionChangeNotification: 0600 notification.setType(ChangeNotification::Collection); 0601 break; 0602 case Protocol::Command::TagChangeNotification: 0603 notification.setType(ChangeNotification::Tag); 0604 break; 0605 case Protocol::Command::RelationChangeNotification: 0606 notification.setType(ChangeNotification::Relation); 0607 break; 0608 case Protocol::Command::SubscriptionChangeNotification: 0609 notification.setType(ChangeNotification::Subscription); 0610 break; 0611 default: 0612 Q_ASSERT(false); // huh? 
0613 return false; 0614 } 0615 0616 someoneWasListening = emitDebugChangeNotification(changeNtf, notification); 0617 } 0618 0619 return someoneWasListening; 0620 } 0621 0622 void MonitorPrivate::updatePendingStatistics(const Protocol::ChangeNotificationPtr &msg) 0623 { 0624 if (msg->type() == Protocol::Command::ItemChangeNotification) { 0625 const auto &itemNtf = Protocol::cmdCast<Protocol::ItemChangeNotification>(msg); 0626 notifyCollectionStatisticsWatchers(itemNtf.parentCollection(), itemNtf.resource()); 0627 // FIXME use the proper resource of the target collection, for cross resource moves 0628 notifyCollectionStatisticsWatchers(itemNtf.parentDestCollection(), itemNtf.destinationResource()); 0629 } else if (msg->type() == Protocol::Command::CollectionChangeNotification) { 0630 const auto &colNtf = Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg); 0631 if (colNtf.operation() == Protocol::CollectionChangeNotification::Remove) { 0632 // no need for statistics updates anymore 0633 recentlyChangedCollections.remove(colNtf.collection().id()); 0634 } 0635 } 0636 } 0637 0638 void MonitorPrivate::slotSessionDestroyed(QObject *object) 0639 { 0640 auto objectSession = qobject_cast<Session *>(object); 0641 if (objectSession) { 0642 sessions.removeAll(objectSession->sessionId()); 0643 pendingModification.stopIgnoringSession(objectSession->sessionId()); 0644 scheduleSubscriptionUpdate(); 0645 } 0646 } 0647 0648 void MonitorPrivate::slotStatisticsChangedFinished(KJob *job) 0649 { 0650 if (job->error()) { 0651 qCWarning(AKONADICORE_LOG) << "Error on fetching collection statistics: " << job->errorText(); 0652 } else { 0653 auto statisticsJob = static_cast<CollectionStatisticsJob *>(job); 0654 Q_ASSERT(statisticsJob->collection().isValid()); 0655 Q_EMIT q_ptr->collectionStatisticsChanged(statisticsJob->collection().id(), statisticsJob->statistics()); 0656 } 0657 } 0658 0659 void MonitorPrivate::slotFlushRecentlyChangedCollections() 0660 { 0661 for (Collection::Id 
collection : std::as_const(recentlyChangedCollections)) { 0662 Q_ASSERT(collection >= 0); 0663 if (fetchCollectionStatistics) { 0664 fetchStatistics(collection); 0665 } else { 0666 static const CollectionStatistics dummyStatistics; 0667 Q_EMIT q_ptr->collectionStatisticsChanged(collection, dummyStatistics); 0668 } 0669 } 0670 recentlyChangedCollections.clear(); 0671 } 0672 0673 int MonitorPrivate::translateAndCompress(QQueue<Protocol::ChangeNotificationPtr> ¬ificationQueue, const Protocol::ChangeNotificationPtr &msg) 0674 { 0675 // Always handle tags and relations 0676 if (msg->type() == Protocol::Command::TagChangeNotification || msg->type() == Protocol::Command::RelationChangeNotification) { 0677 notificationQueue.enqueue(msg); 0678 return 1; 0679 } 0680 0681 // We have to split moves into insert or remove if the source or destination 0682 // is not monitored. 0683 if (!msg->isMove()) { 0684 notificationQueue.enqueue(msg); 0685 return 1; 0686 } 0687 0688 bool sourceWatched = false; 0689 bool destWatched = false; 0690 0691 if (msg->type() == Protocol::Command::ItemChangeNotification) { 0692 const auto &itemNtf = Protocol::cmdCast<Protocol::ItemChangeNotification>(msg); 0693 if (useRefCounting) { 0694 sourceWatched = isMonitored(itemNtf.parentCollection()); 0695 destWatched = isMonitored(itemNtf.parentDestCollection()); 0696 } else { 0697 if (!resources.isEmpty()) { 0698 sourceWatched = resources.contains(itemNtf.resource()); 0699 destWatched = isMoveDestinationResourceMonitored(itemNtf); 0700 } 0701 if (!sourceWatched) { 0702 sourceWatched = isCollectionMonitored(itemNtf.parentCollection()); 0703 } 0704 if (!destWatched) { 0705 destWatched = isCollectionMonitored(itemNtf.parentDestCollection()); 0706 } 0707 } 0708 } else if (msg->type() == Protocol::Command::CollectionChangeNotification) { 0709 const auto &colNtf = Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg); 0710 if (!resources.isEmpty()) { 0711 sourceWatched = 
resources.contains(colNtf.resource()); 0712 destWatched = isMoveDestinationResourceMonitored(colNtf); 0713 } 0714 if (!sourceWatched) { 0715 sourceWatched = isCollectionMonitored(colNtf.parentCollection()); 0716 } 0717 if (!destWatched) { 0718 destWatched = isCollectionMonitored(colNtf.parentDestCollection()); 0719 } 0720 } else { 0721 Q_ASSERT(false); 0722 return 0; 0723 } 0724 0725 if (!sourceWatched && !destWatched) { 0726 return 0; 0727 } 0728 0729 if ((sourceWatched && destWatched) || (!collectionMoveTranslationEnabled && msg->type() == Protocol::Command::CollectionChangeNotification)) { 0730 notificationQueue.enqueue(msg); 0731 return 1; 0732 } 0733 0734 if (sourceWatched) { 0735 if (msg->type() == Protocol::Command::ItemChangeNotification) { 0736 auto removalMessage = Protocol::ItemChangeNotificationPtr::create(Protocol::cmdCast<Protocol::ItemChangeNotification>(msg)); 0737 removalMessage->setOperation(Protocol::ItemChangeNotification::Remove); 0738 removalMessage->setParentDestCollection(-1); 0739 notificationQueue.enqueue(removalMessage); 0740 return 1; 0741 } else { 0742 auto removalMessage = Protocol::CollectionChangeNotificationPtr::create(Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg)); 0743 removalMessage->setOperation(Protocol::CollectionChangeNotification::Remove); 0744 removalMessage->setParentDestCollection(-1); 0745 notificationQueue.enqueue(removalMessage); 0746 return 1; 0747 } 0748 } 0749 0750 // Transform into an insertion 0751 if (msg->type() == Protocol::Command::ItemChangeNotification) { 0752 auto insertionMessage = Protocol::ItemChangeNotificationPtr::create(Protocol::cmdCast<Protocol::ItemChangeNotification>(msg)); 0753 insertionMessage->setOperation(Protocol::ItemChangeNotification::Add); 0754 insertionMessage->setParentCollection(insertionMessage->parentDestCollection()); 0755 insertionMessage->setParentDestCollection(-1); 0756 // We don't support batch insertion, so we have to do it one by one 0757 const auto split = 
splitMessage(*insertionMessage, false); 0758 for (const Protocol::ChangeNotificationPtr &insertion : split) { 0759 notificationQueue.enqueue(insertion); 0760 } 0761 return split.count(); 0762 } else if (msg->type() == Protocol::Command::CollectionChangeNotification) { 0763 auto insertionMessage = Protocol::CollectionChangeNotificationPtr::create(Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg)); 0764 insertionMessage->setOperation(Protocol::CollectionChangeNotification::Add); 0765 insertionMessage->setParentCollection(insertionMessage->parentDestCollection()); 0766 insertionMessage->setParentDestCollection(-1); 0767 notificationQueue.enqueue(insertionMessage); 0768 return 1; 0769 } 0770 0771 Q_ASSERT(false); 0772 return 0; 0773 } 0774 0775 void MonitorPrivate::handleCommands() 0776 { 0777 Q_Q(Monitor); 0778 0779 CommandBufferLocker lock(&mCommandBuffer); 0780 CommandBufferNotifyBlocker notify(&mCommandBuffer); 0781 while (!mCommandBuffer.isEmpty()) { 0782 const auto cmd = mCommandBuffer.dequeue(); 0783 lock.unlock(); 0784 const auto command = cmd.command; 0785 0786 if (command->isResponse()) { 0787 switch (command->type()) { 0788 case Protocol::Command::Hello: { 0789 qCDebug(AKONADICORE_LOG) << q_ptr << "Connected to notification bus"; 0790 QByteArray subname; 0791 if (!q->objectName().isEmpty()) { 0792 subname = q->objectName().toLatin1(); 0793 } else { 0794 subname = session->sessionId(); 0795 } 0796 subname += " - " + QByteArray::number(quintptr(q)); 0797 qCDebug(AKONADICORE_LOG) << q_ptr << "Subscribing as \"" << subname << "\""; 0798 auto subCmd = Protocol::CreateSubscriptionCommandPtr::create(subname, session->sessionId()); 0799 ntfConnection->sendCommand(2, subCmd); 0800 break; 0801 } 0802 0803 case Protocol::Command::CreateSubscription: { 0804 auto msubCmd = Protocol::ModifySubscriptionCommandPtr::create(); 0805 for (const auto &col : std::as_const(collections)) { 0806 msubCmd->startMonitoringCollection(col.id()); 0807 } 0808 for (const auto 
&res : std::as_const(resources)) { 0809 msubCmd->startMonitoringResource(res); 0810 } 0811 for (auto itemId : std::as_const(items)) { 0812 msubCmd->startMonitoringItem(itemId); 0813 } 0814 for (auto tagId : std::as_const(tags)) { 0815 msubCmd->startMonitoringTag(tagId); 0816 } 0817 for (auto type : std::as_const(types)) { 0818 msubCmd->startMonitoringType(monitorTypeToProtocol(type)); 0819 } 0820 for (const auto &mimetype : std::as_const(mimetypes)) { 0821 msubCmd->startMonitoringMimeType(mimetype); 0822 } 0823 for (const auto &session : std::as_const(sessions)) { 0824 msubCmd->startIgnoringSession(session); 0825 } 0826 msubCmd->setAllMonitored(monitorAll); 0827 msubCmd->setIsExclusive(exclusive); 0828 msubCmd->setItemFetchScope(ProtocolHelper::itemFetchScopeToProtocol(mItemFetchScope)); 0829 msubCmd->setCollectionFetchScope(ProtocolHelper::collectionFetchScopeToProtocol(mCollectionFetchScope)); 0830 msubCmd->setTagFetchScope(ProtocolHelper::tagFetchScopeToProtocol(mTagFetchScope)); 0831 pendingModification = Protocol::ModifySubscriptionCommand(); 0832 ntfConnection->sendCommand(3, msubCmd); 0833 break; 0834 } 0835 0836 case Protocol::Command::ModifySubscription: 0837 // TODO: Handle errors 0838 if (!monitorReady) { 0839 monitorReady = true; 0840 Q_EMIT q_ptr->monitorReady(); 0841 } 0842 break; 0843 0844 default: 0845 qCWarning(AKONADICORE_LOG) << "Received an unexpected response on Notification stream: " << Protocol::debugString(command); 0846 break; 0847 } 0848 } else { 0849 switch (command->type()) { 0850 case Protocol::Command::ItemChangeNotification: 0851 case Protocol::Command::CollectionChangeNotification: 0852 case Protocol::Command::TagChangeNotification: 0853 case Protocol::Command::RelationChangeNotification: 0854 case Protocol::Command::SubscriptionChangeNotification: 0855 case Protocol::Command::DebugChangeNotification: 0856 slotNotify(command.staticCast<Protocol::ChangeNotification>()); 0857 break; 0858 default: 0859 qCWarning(AKONADICORE_LOG) << 
"Received an unexpected message on Notification stream:" << Protocol::debugString(command); 0860 break; 0861 } 0862 } 0863 0864 lock.relock(); 0865 } 0866 notify.unblock(); 0867 lock.unlock(); 0868 } 0869 0870 /* 0871 0872 server notification --> ?accepted --> pendingNotifications --> ?dataAvailable --> emit 0873 | | 0874 x --> discard x --> pipeline 0875 0876 fetchJobDone --> pipeline ?dataAvailable --> emit 0877 */ 0878 0879 void MonitorPrivate::slotNotify(const Protocol::ChangeNotificationPtr &msg) 0880 { 0881 int appendedMessages = 0; 0882 int modifiedMessages = 0; 0883 int erasedMessages = 0; 0884 0885 invalidateCaches(msg); 0886 updatePendingStatistics(msg); 0887 bool needsSplit = true; 0888 bool supportsBatch = false; 0889 0890 if (isLazilyIgnored(msg, true)) { 0891 return; 0892 } 0893 0894 checkBatchSupport(msg, needsSplit, supportsBatch); 0895 0896 const bool isModifyFlags = (msg->type() == Protocol::Command::ItemChangeNotification 0897 && Protocol::cmdCast<Protocol::ItemChangeNotification>(msg).operation() == Protocol::ItemChangeNotification::ModifyFlags); 0898 if (supportsBatch || (!needsSplit && !supportsBatch && !isModifyFlags) || msg->type() == Protocol::Command::CollectionChangeNotification) { 0899 // Make sure the batch msg is always queued before the split notifications 0900 const int oldSize = pendingNotifications.size(); 0901 const int appended = translateAndCompress(pendingNotifications, msg); 0902 if (appended > 0) { 0903 appendedMessages += appended; 0904 } else { 0905 ++modifiedMessages; 0906 } 0907 // translateAndCompress can remove an existing "modify" when msg is a "delete". 0908 // Or it can merge two ModifyFlags and return false. 0909 // We need to detect such removals, for ChangeRecorder. 
0910 if (pendingNotifications.count() != oldSize + appended) { 0911 ++erasedMessages; // this count isn't exact, but it doesn't matter 0912 } 0913 } else if (needsSplit) { 0914 // If it's not queued at least make sure we fetch all the items from split 0915 // notifications in one go. 0916 if (msg->type() == Protocol::Command::ItemChangeNotification) { 0917 const auto items = Protocol::cmdCast<Protocol::ItemChangeNotification>(msg).items(); 0918 itemCache->ensureCached(Protocol::ChangeNotification::itemsToUids(items), mItemFetchScope); 0919 } 0920 } 0921 0922 // if the message contains more items, but we need to emit single-item notification, 0923 // split the message into one message per item and queue them 0924 // if the message contains only one item, but batches are not supported 0925 // (and thus neither is flagsModified), splitMessage() will convert the 0926 // notification to regular Modify with "FLAGS" part changed 0927 if (needsSplit || (!needsSplit && !supportsBatch && isModifyFlags)) { 0928 // Make sure inter-resource move notifications are translated into 0929 // Add/Remove notifications 0930 if (msg->type() == Protocol::Command::ItemChangeNotification) { 0931 const auto &itemNtf = Protocol::cmdCast<Protocol::ItemChangeNotification>(msg); 0932 if (itemNtf.operation() == Protocol::ItemChangeNotification::Move && itemNtf.resource() != itemNtf.destinationResource()) { 0933 if (needsSplit) { 0934 const Protocol::ChangeNotificationList split = splitMessage(itemNtf, !supportsBatch); 0935 for (const auto &splitMsg : split) { 0936 appendedMessages += translateAndCompress(pendingNotifications, splitMsg); 0937 } 0938 } else { 0939 appendedMessages += translateAndCompress(pendingNotifications, msg); 0940 } 0941 } else { 0942 const Protocol::ChangeNotificationList split = splitMessage(itemNtf, !supportsBatch); 0943 pendingNotifications << (split | Actions::toQList); 0944 appendedMessages += split.count(); 0945 } 0946 } 0947 } 0948 0949 // tell ChangeRecorder (even 
if 0 appended, the compression could have made changes to existing messages) 0950 if (appendedMessages > 0 || modifiedMessages > 0 || erasedMessages > 0) { 0951 if (erasedMessages > 0) { 0952 notificationsErased(); 0953 } else { 0954 notificationsEnqueued(appendedMessages); 0955 } 0956 } 0957 0958 dispatchNotifications(); 0959 } 0960 0961 void MonitorPrivate::flushPipeline() 0962 { 0963 while (!pipeline.isEmpty()) { 0964 const auto msg = pipeline.head(); 0965 if (ensureDataAvailable(msg)) { 0966 // dequeue should be before emit, otherwise stuff might happen (like dataAvailable 0967 // being called again) and we end up dequeuing an empty pipeline 0968 pipeline.dequeue(); 0969 emitNotification(msg); 0970 } else { 0971 break; 0972 } 0973 } 0974 } 0975 0976 void MonitorPrivate::dataAvailable() 0977 { 0978 flushPipeline(); 0979 dispatchNotifications(); 0980 } 0981 0982 void MonitorPrivate::dispatchNotifications() 0983 { 0984 // Note that this code is not used in a ChangeRecorder (pipelineSize==0) 0985 while (pipeline.size() < pipelineSize() && !pendingNotifications.isEmpty()) { 0986 const auto msg = pendingNotifications.dequeue(); 0987 const bool avail = ensureDataAvailable(msg); 0988 if (avail && pipeline.isEmpty()) { 0989 emitNotification(msg); 0990 } else { 0991 pipeline.enqueue(msg); 0992 } 0993 } 0994 } 0995 0996 static Relation::List extractRelations(const QSet<Protocol::ItemChangeNotification::Relation> &rels) 0997 { 0998 Relation::List relations; 0999 if (rels.isEmpty()) { 1000 return relations; 1001 } 1002 1003 relations.reserve(rels.size()); 1004 for (const auto &rel : rels) { 1005 relations.push_back(Relation(rel.type.toLatin1(), Akonadi::Item(rel.leftId), Akonadi::Item(rel.rightId))); 1006 } 1007 return relations; 1008 } 1009 1010 bool MonitorPrivate::emitItemsNotification(const Protocol::ItemChangeNotification &msg, 1011 const Item::List &items, 1012 const Collection &collection, 1013 const Collection &collectionDest) 1014 { 1015 Collection col = 
collection; 1016 Collection colDest = collectionDest; 1017 if (!col.isValid()) { 1018 col = Collection(msg.parentCollection()); 1019 col.setResource(QString::fromUtf8(msg.resource())); 1020 } 1021 if (!colDest.isValid()) { 1022 colDest = Collection(msg.parentDestCollection()); 1023 // HACK: destination resource is delivered in the parts field... 1024 if (!msg.itemParts().isEmpty()) { 1025 colDest.setResource(QString::fromLatin1(*(msg.itemParts().cbegin()))); 1026 } 1027 } 1028 1029 Relation::List addedRelations; 1030 Relation::List removedRelations; 1031 if (msg.operation() == Protocol::ItemChangeNotification::ModifyRelations) { 1032 addedRelations = extractRelations(msg.addedRelations()); 1033 removedRelations = extractRelations(msg.removedRelations()); 1034 } 1035 1036 Tag::List addedTags; 1037 Tag::List removedTags; 1038 if (msg.operation() == Protocol::ItemChangeNotification::ModifyTags) { 1039 addedTags = tagCache->retrieve(msg.addedTags() | Actions::toQList); 1040 removedTags = tagCache->retrieve(msg.removedTags() | Actions::toQList); 1041 } 1042 1043 Item::List its = items; 1044 for (auto it = its.begin(), end = its.end(); it != end; ++it) { 1045 if (msg.operation() == Protocol::ItemChangeNotification::Move) { 1046 it->setParentCollection(colDest); 1047 } else { 1048 it->setParentCollection(col); 1049 } 1050 } 1051 bool handled = false; 1052 switch (msg.operation()) { 1053 case Protocol::ItemChangeNotification::Add: 1054 return emitToListeners(&Monitor::itemAdded, its.first(), col); 1055 case Protocol::ItemChangeNotification::Modify: 1056 return emitToListeners(&Monitor::itemChanged, its.first(), msg.itemParts()); 1057 case Protocol::ItemChangeNotification::ModifyFlags: 1058 return emitToListeners(&Monitor::itemsFlagsChanged, its, msg.addedFlags(), msg.removedFlags()); 1059 case Protocol::ItemChangeNotification::Move: 1060 handled |= emitToListeners(&Monitor::itemMoved, its.first(), col, colDest); 1061 handled |= emitToListeners(&Monitor::itemsMoved, its, 
col, colDest); 1062 return handled; 1063 case Protocol::ItemChangeNotification::Remove: 1064 handled |= emitToListeners(&Monitor::itemRemoved, its.first()); 1065 handled |= emitToListeners(&Monitor::itemsRemoved, its); 1066 return handled; 1067 case Protocol::ItemChangeNotification::Link: 1068 handled |= emitToListeners(&Monitor::itemLinked, its.first(), col); 1069 handled |= emitToListeners(&Monitor::itemsLinked, its, col); 1070 return handled; 1071 case Protocol::ItemChangeNotification::Unlink: 1072 handled |= emitToListeners(&Monitor::itemUnlinked, its.first(), col); 1073 handled |= emitToListeners(&Monitor::itemsUnlinked, its, col); 1074 return handled; 1075 case Protocol::ItemChangeNotification::ModifyTags: 1076 return emitToListeners(&Monitor::itemsTagsChanged, its, addedTags | Actions::toQSet, removedTags | Actions::toQSet); 1077 case Protocol::ItemChangeNotification::ModifyRelations: 1078 return emitToListeners(&Monitor::itemsRelationsChanged, its, addedRelations, removedRelations); 1079 default: 1080 qCDebug(AKONADICORE_LOG) << "Unknown operation type" << msg.operation() << "in item change notification"; 1081 return false; 1082 } 1083 } 1084 1085 bool MonitorPrivate::emitCollectionNotification(const Protocol::CollectionChangeNotification &msg, 1086 const Collection &col, 1087 const Collection &par, 1088 const Collection &dest) 1089 { 1090 Collection parent = par; 1091 if (!parent.isValid()) { 1092 parent = Collection(msg.parentCollection()); 1093 } 1094 Collection destination = dest; 1095 if (!destination.isValid()) { 1096 destination = Collection(msg.parentDestCollection()); 1097 } 1098 1099 Collection collection = col; 1100 Q_ASSERT(collection.isValid()); 1101 if (!collection.isValid()) { 1102 qCWarning(AKONADICORE_LOG) << "Failed to get valid Collection for a Collection change!"; 1103 return true; // prevent Monitor disconnecting from a signal 1104 } 1105 1106 if (msg.operation() == Protocol::CollectionChangeNotification::Move) { 1107 
collection.setParentCollection(destination); 1108 } else { 1109 collection.setParentCollection(parent); 1110 } 1111 1112 bool handled = false; 1113 switch (msg.operation()) { 1114 case Protocol::CollectionChangeNotification::Add: 1115 return emitToListeners(&Monitor::collectionAdded, collection, parent); 1116 case Protocol::CollectionChangeNotification::Modify: 1117 handled |= emitToListeners(qOverload<const Akonadi::Collection &>(&Monitor::collectionChanged), collection); 1118 handled |= 1119 emitToListeners(qOverload<const Akonadi::Collection &, const QSet<QByteArray> &>(&Monitor::collectionChanged), collection, msg.changedParts()); 1120 return handled; 1121 case Protocol::CollectionChangeNotification::Move: 1122 return emitToListeners(&Monitor::collectionMoved, collection, parent, destination); 1123 case Protocol::CollectionChangeNotification::Remove: 1124 return emitToListeners(&Monitor::collectionRemoved, collection); 1125 case Protocol::CollectionChangeNotification::Subscribe: 1126 return emitToListeners(&Monitor::collectionSubscribed, collection, parent); 1127 case Protocol::CollectionChangeNotification::Unsubscribe: 1128 return emitToListeners(&Monitor::collectionUnsubscribed, collection); 1129 default: 1130 qCDebug(AKONADICORE_LOG) << "Unknown operation type" << msg.operation() << "in collection change notification"; 1131 return false; 1132 } 1133 } 1134 1135 bool MonitorPrivate::emitTagNotification(const Protocol::TagChangeNotification &msg, const Tag &tag) 1136 { 1137 Q_UNUSED(msg) 1138 switch (msg.operation()) { 1139 case Protocol::TagChangeNotification::Add: 1140 return emitToListeners(&Monitor::tagAdded, tag); 1141 case Protocol::TagChangeNotification::Modify: 1142 return emitToListeners(&Monitor::tagChanged, tag); 1143 case Protocol::TagChangeNotification::Remove: 1144 return emitToListeners(&Monitor::tagRemoved, tag); 1145 default: 1146 qCDebug(AKONADICORE_LOG) << "Unknown operation type" << msg.operation() << "in tag change notification"; 1147 
return false; 1148 } 1149 } 1150 1151 bool MonitorPrivate::emitRelationNotification(const Protocol::RelationChangeNotification &msg, const Relation &relation) 1152 { 1153 if (!relation.isValid()) { 1154 return false; 1155 } 1156 1157 switch (msg.operation()) { 1158 case Protocol::RelationChangeNotification::Add: 1159 return emitToListeners(&Monitor::relationAdded, relation); 1160 case Protocol::RelationChangeNotification::Remove: 1161 return emitToListeners(&Monitor::relationRemoved, relation); 1162 default: 1163 qCDebug(AKONADICORE_LOG) << "Unknown operation type" << msg.operation() << "in tag change notification"; 1164 return false; 1165 } 1166 } 1167 1168 bool MonitorPrivate::emitSubscriptionChangeNotification(const Protocol::SubscriptionChangeNotification &msg, const Akonadi::NotificationSubscriber &subscriber) 1169 { 1170 if (!subscriber.isValid()) { 1171 return false; 1172 } 1173 1174 switch (msg.operation()) { 1175 case Protocol::SubscriptionChangeNotification::Add: 1176 return emitToListeners(&Monitor::notificationSubscriberAdded, subscriber); 1177 case Protocol::SubscriptionChangeNotification::Modify: 1178 return emitToListeners(&Monitor::notificationSubscriberChanged, subscriber); 1179 case Protocol::SubscriptionChangeNotification::Remove: 1180 return emitToListeners(&Monitor::notificationSubscriberRemoved, subscriber); 1181 default: 1182 qCDebug(AKONADICORE_LOG) << "Unknown operation type" << msg.operation() << "in subscription change notification"; 1183 return false; 1184 } 1185 } 1186 1187 bool MonitorPrivate::emitDebugChangeNotification(const Protocol::DebugChangeNotification &msg, const ChangeNotification &ntf) 1188 { 1189 Q_UNUSED(msg) 1190 1191 if (!ntf.isValid()) { 1192 return false; 1193 } 1194 1195 return emitToListeners(&Monitor::debugNotification, ntf); 1196 } 1197 1198 void MonitorPrivate::invalidateCaches(const Protocol::ChangeNotificationPtr &msg) 1199 { 1200 // remove invalidates 1201 // modify removes the cache entry, as we need to 
re-fetch 1202 // And subscription modify the visibility of the collection by the collectionFetchScope. 1203 switch (msg->type()) { 1204 case Protocol::Command::CollectionChangeNotification: { 1205 const auto &colNtf = Protocol::cmdCast<Protocol::CollectionChangeNotification>(msg); 1206 switch (colNtf.operation()) { 1207 case Protocol::CollectionChangeNotification::Modify: 1208 case Protocol::CollectionChangeNotification::Move: 1209 case Protocol::CollectionChangeNotification::Subscribe: 1210 collectionCache->update(colNtf.collection().id(), mCollectionFetchScope); 1211 break; 1212 case Protocol::CollectionChangeNotification::Remove: 1213 collectionCache->invalidate(colNtf.collection().id()); 1214 break; 1215 default: 1216 break; 1217 } 1218 } break; 1219 case Protocol::Command::ItemChangeNotification: { 1220 const auto &itemNtf = Protocol::cmdCast<Protocol::ItemChangeNotification>(msg); 1221 switch (itemNtf.operation()) { 1222 case Protocol::ItemChangeNotification::Modify: 1223 case Protocol::ItemChangeNotification::ModifyFlags: 1224 case Protocol::ItemChangeNotification::ModifyTags: 1225 case Protocol::ItemChangeNotification::ModifyRelations: 1226 case Protocol::ItemChangeNotification::Move: 1227 itemCache->update(Protocol::ChangeNotification::itemsToUids(itemNtf.items()), mItemFetchScope); 1228 break; 1229 case Protocol::ItemChangeNotification::Remove: 1230 itemCache->invalidate(Protocol::ChangeNotification::itemsToUids(itemNtf.items())); 1231 break; 1232 default: 1233 break; 1234 } 1235 } break; 1236 case Protocol::Command::TagChangeNotification: { 1237 const auto &tagNtf = Protocol::cmdCast<Protocol::TagChangeNotification>(msg); 1238 switch (tagNtf.operation()) { 1239 case Protocol::TagChangeNotification::Modify: 1240 tagCache->update({tagNtf.tag().id()}, mTagFetchScope); 1241 break; 1242 case Protocol::TagChangeNotification::Remove: 1243 tagCache->invalidate({tagNtf.tag().id()}); 1244 break; 1245 default: 1246 break; 1247 } 1248 } break; 1249 default: 1250 
break; 1251 } 1252 } 1253 1254 void MonitorPrivate::invalidateCache(const Collection &col) 1255 { 1256 collectionCache->update(col.id(), mCollectionFetchScope); 1257 } 1258 1259 void MonitorPrivate::ref(Collection::Id id) 1260 { 1261 if (!refCountMap.contains(id)) { 1262 refCountMap.insert(id, 0); 1263 } 1264 ++refCountMap[id]; 1265 1266 if (m_buffer.isBuffered(id)) { 1267 m_buffer.purge(id); 1268 } 1269 } 1270 1271 Akonadi::Collection::Id MonitorPrivate::deref(Collection::Id id) 1272 { 1273 Q_ASSERT(refCountMap.contains(id)); 1274 if (--refCountMap[id] == 0) { 1275 refCountMap.remove(id); 1276 return m_buffer.buffer(id); 1277 } 1278 return -1; 1279 } 1280 1281 void MonitorPrivate::PurgeBuffer::purge(Collection::Id id) 1282 { 1283 m_buffer.removeOne(id); 1284 } 1285 1286 Akonadi::Collection::Id MonitorPrivate::PurgeBuffer::buffer(Collection::Id id) 1287 { 1288 // Ensure that we don't put a duplicate @p id into the buffer. 1289 purge(id); 1290 1291 Collection::Id bumpedId = -1; 1292 if (m_buffer.size() == MAXBUFFERSIZE) { 1293 bumpedId = m_buffer.dequeue(); 1294 purge(bumpedId); 1295 } 1296 1297 m_buffer.enqueue(id); 1298 1299 return bumpedId; 1300 } 1301 1302 int MonitorPrivate::PurgeBuffer::buffersize() 1303 { 1304 return MAXBUFFERSIZE; 1305 } 1306 1307 bool MonitorPrivate::isMonitored(Collection::Id colId) const 1308 { 1309 if (!useRefCounting) { 1310 return true; 1311 } 1312 return refCountMap.contains(colId) || m_buffer.isBuffered(colId); 1313 } 1314 1315 void MonitorPrivate::notifyCollectionStatisticsWatchers(Collection::Id collection, const QByteArray &resource) 1316 { 1317 if (collection > 0 && (monitorAll || isCollectionMonitored(collection) || resources.contains(resource))) { 1318 recentlyChangedCollections.insert(collection); 1319 if (!statisticsCompressionTimer.isActive()) { 1320 statisticsCompressionTimer.start(); 1321 } 1322 } 1323 } 1324 1325 Protocol::ModifySubscriptionCommand::ChangeType MonitorPrivate::monitorTypeToProtocol(Monitor::Type type) 1326 
{ 1327 switch (type) { 1328 case Monitor::Collections: 1329 return Protocol::ModifySubscriptionCommand::CollectionChanges; 1330 case Monitor::Items: 1331 return Protocol::ModifySubscriptionCommand::ItemChanges; 1332 case Monitor::Tags: 1333 return Protocol::ModifySubscriptionCommand::TagChanges; 1334 case Monitor::Relations: 1335 return Protocol::ModifySubscriptionCommand::RelationChanges; 1336 case Monitor::Subscribers: 1337 return Protocol::ModifySubscriptionCommand::SubscriptionChanges; 1338 case Monitor::Notifications: 1339 return Protocol::ModifySubscriptionCommand::ChangeNotifications; 1340 default: 1341 Q_ASSERT(false); 1342 return Protocol::ModifySubscriptionCommand::NoType; 1343 } 1344 } 1345 1346 void MonitorPrivate::updateListeners(QMetaMethod signal, ListenerAction action) 1347 { 1348 #define UPDATE_LISTENERS(sig) \ 1349 if (signal == QMetaMethod::fromSignal(sig)) { \ 1350 updateListener(sig, action); \ 1351 return; \ 1352 } 1353 1354 UPDATE_LISTENERS(&Monitor::itemChanged) 1355 UPDATE_LISTENERS(&Monitor::itemChanged) 1356 UPDATE_LISTENERS(&Monitor::itemsFlagsChanged) 1357 UPDATE_LISTENERS(&Monitor::itemsTagsChanged) 1358 UPDATE_LISTENERS(&Monitor::itemsRelationsChanged) 1359 UPDATE_LISTENERS(&Monitor::itemMoved) 1360 UPDATE_LISTENERS(&Monitor::itemsMoved) 1361 UPDATE_LISTENERS(&Monitor::itemAdded) 1362 UPDATE_LISTENERS(&Monitor::itemRemoved) 1363 UPDATE_LISTENERS(&Monitor::itemsRemoved) 1364 UPDATE_LISTENERS(&Monitor::itemLinked) 1365 UPDATE_LISTENERS(&Monitor::itemsLinked) 1366 UPDATE_LISTENERS(&Monitor::itemUnlinked) 1367 UPDATE_LISTENERS(&Monitor::itemsUnlinked) 1368 UPDATE_LISTENERS(&Monitor::collectionAdded) 1369 1370 UPDATE_LISTENERS(qOverload<const Akonadi::Collection &>(&Monitor::collectionChanged)) 1371 UPDATE_LISTENERS((qOverload<const Akonadi::Collection &, const QSet<QByteArray> &>(&Monitor::collectionChanged))) 1372 UPDATE_LISTENERS(&Monitor::collectionMoved) 1373 UPDATE_LISTENERS(&Monitor::collectionRemoved) 1374 
UPDATE_LISTENERS(&Monitor::collectionSubscribed) 1375 UPDATE_LISTENERS(&Monitor::collectionUnsubscribed) 1376 UPDATE_LISTENERS(&Monitor::collectionStatisticsChanged) 1377 1378 UPDATE_LISTENERS(&Monitor::tagAdded) 1379 UPDATE_LISTENERS(&Monitor::tagChanged) 1380 UPDATE_LISTENERS(&Monitor::tagRemoved) 1381 1382 UPDATE_LISTENERS(&Monitor::relationAdded) 1383 UPDATE_LISTENERS(&Monitor::relationRemoved) 1384 1385 UPDATE_LISTENERS(&Monitor::notificationSubscriberAdded) 1386 UPDATE_LISTENERS(&Monitor::notificationSubscriberChanged) 1387 UPDATE_LISTENERS(&Monitor::notificationSubscriberRemoved) 1388 UPDATE_LISTENERS(&Monitor::debugNotification) 1389 1390 #undef UPDATE_LISTENERS 1391 } 1392 1393 /// @endcond