// File indexing completed on 2025-01-05 04:46:59
0001 /* 0002 SPDX-FileCopyrightText: 2006-2007 Volker Krause <vkrause@kde.org> 0003 0004 SPDX-License-Identifier: LGPL-2.0-or-later 0005 */ 0006 0007 #include "notificationcollector.h" 0008 #include "aggregatedfetchscope.h" 0009 #include "akonadi.h" 0010 #include "cachecleaner.h" 0011 #include "connection.h" 0012 #include "handler/itemfetchhelper.h" 0013 #include "handlerhelper.h" 0014 #include "intervalcheck.h" 0015 #include "notificationmanager.h" 0016 #include "search/searchmanager.h" 0017 #include "selectquerybuilder.h" 0018 #include "shared/akranges.h" 0019 #include "storage/collectionstatistics.h" 0020 #include "storage/datastore.h" 0021 #include "storage/entity.h" 0022 0023 #include "akonadiserver_debug.h" 0024 0025 #include <QScopedValueRollback> 0026 0027 using namespace Akonadi; 0028 using namespace Akonadi::Server; 0029 0030 NotificationCollector::NotificationCollector(AkonadiServer &akonadi, DataStore *db) 0031 : mDb(db) 0032 , mAkonadi(akonadi) 0033 { 0034 QObject::connect(db, &DataStore::transactionCommitted, db, [this]() { 0035 if (!mIgnoreTransactions) { 0036 dispatchNotifications(); 0037 } 0038 }); 0039 QObject::connect(db, &DataStore::transactionRolledBack, db, [this]() { 0040 if (!mIgnoreTransactions) { 0041 clear(); 0042 } 0043 }); 0044 } 0045 0046 void NotificationCollector::itemAdded(const PimItem &item, bool seen, const Collection &collection, const QByteArray &resource) 0047 { 0048 mAkonadi.searchManager().scheduleSearchUpdate(); 0049 mAkonadi.collectionStatistics().itemAdded(collection, item.size(), seen); 0050 itemNotification(Protocol::ItemChangeNotification::Add, item, collection, Collection(), resource); 0051 } 0052 0053 void NotificationCollector::itemChanged(const PimItem &item, const QSet<QByteArray> &changedParts, const Collection &collection, const QByteArray &resource) 0054 { 0055 mAkonadi.searchManager().scheduleSearchUpdate(); 0056 itemNotification(Protocol::ItemChangeNotification::Modify, item, collection, Collection(), 
resource, changedParts); 0057 } 0058 0059 void NotificationCollector::itemsFlagsChanged(const PimItem::List &items, 0060 const QSet<QByteArray> &addedFlags, 0061 const QSet<QByteArray> &removedFlags, 0062 const Collection &collection, 0063 const QByteArray &resource) 0064 { 0065 int seenCount = (addedFlags.contains(AKONADI_FLAG_SEEN) || addedFlags.contains(AKONADI_FLAG_IGNORED) ? items.count() : 0); 0066 seenCount -= (removedFlags.contains(AKONADI_FLAG_SEEN) || removedFlags.contains(AKONADI_FLAG_IGNORED) ? items.count() : 0); 0067 0068 mAkonadi.collectionStatistics().itemsSeenChanged(collection, seenCount); 0069 itemNotification(Protocol::ItemChangeNotification::ModifyFlags, items, collection, Collection(), resource, QSet<QByteArray>(), addedFlags, removedFlags); 0070 } 0071 0072 void NotificationCollector::itemsTagsChanged(const PimItem::List &items, 0073 const QSet<qint64> &addedTags, 0074 const QSet<qint64> &removedTags, 0075 const Collection &collection, 0076 const QByteArray &resource) 0077 { 0078 itemNotification(Protocol::ItemChangeNotification::ModifyTags, 0079 items, 0080 collection, 0081 Collection(), 0082 resource, 0083 QSet<QByteArray>(), 0084 QSet<QByteArray>(), 0085 QSet<QByteArray>(), 0086 addedTags, 0087 removedTags); 0088 } 0089 0090 void NotificationCollector::itemsRelationsChanged(const PimItem::List &items, 0091 const Relation::List &addedRelations, 0092 const Relation::List &removedRelations, 0093 const Collection &collection, 0094 const QByteArray &resource) 0095 { 0096 itemNotification(Protocol::ItemChangeNotification::ModifyRelations, 0097 items, 0098 collection, 0099 Collection(), 0100 resource, 0101 QSet<QByteArray>(), 0102 QSet<QByteArray>(), 0103 QSet<QByteArray>(), 0104 QSet<qint64>(), 0105 QSet<qint64>(), 0106 addedRelations, 0107 removedRelations); 0108 } 0109 0110 void NotificationCollector::itemsMoved(const PimItem::List &items, 0111 const Collection &collectionSrc, 0112 const Collection &collectionDest, 0113 const QByteArray 
&sourceResource) 0114 { 0115 mAkonadi.searchManager().scheduleSearchUpdate(); 0116 itemNotification(Protocol::ItemChangeNotification::Move, items, collectionSrc, collectionDest, sourceResource); 0117 } 0118 0119 void NotificationCollector::itemsRemoved(const PimItem::List &items, const Collection &collection, const QByteArray &resource) 0120 { 0121 itemNotification(Protocol::ItemChangeNotification::Remove, items, collection, Collection(), resource); 0122 } 0123 0124 void NotificationCollector::itemsLinked(const PimItem::List &items, const Collection &collection) 0125 { 0126 itemNotification(Protocol::ItemChangeNotification::Link, items, collection, Collection(), QByteArray()); 0127 } 0128 0129 void NotificationCollector::itemsUnlinked(const PimItem::List &items, const Collection &collection) 0130 { 0131 itemNotification(Protocol::ItemChangeNotification::Unlink, items, collection, Collection(), QByteArray()); 0132 } 0133 0134 void NotificationCollector::collectionAdded(const Collection &collection, const QByteArray &resource) 0135 { 0136 if (auto cleaner = mAkonadi.cacheCleaner()) { 0137 cleaner->collectionAdded(collection.id()); 0138 } 0139 mAkonadi.intervalChecker().collectionAdded(collection.id()); 0140 collectionNotification(Protocol::CollectionChangeNotification::Add, collection, collection.parentId(), -1, resource); 0141 } 0142 0143 void NotificationCollector::collectionChanged(const Collection &collection, const QList<QByteArray> &changes, const QByteArray &resource) 0144 { 0145 if (auto cleaner = mAkonadi.cacheCleaner()) { 0146 cleaner->collectionChanged(collection.id()); 0147 } 0148 mAkonadi.intervalChecker().collectionChanged(collection.id()); 0149 if (changes.contains(AKONADI_PARAM_ENABLED)) { 0150 mAkonadi.collectionStatistics().invalidateCollection(collection); 0151 } 0152 collectionNotification(Protocol::CollectionChangeNotification::Modify, 0153 collection, 0154 collection.parentId(), 0155 -1, 0156 resource, 0157 changes | AkRanges::Actions::toQSet); 
0158 } 0159 0160 void NotificationCollector::collectionMoved(const Collection &collection, const Collection &source, const QByteArray &resource, const QByteArray &destResource) 0161 { 0162 if (auto cleaner = mAkonadi.cacheCleaner()) { 0163 cleaner->collectionChanged(collection.id()); 0164 } 0165 mAkonadi.intervalChecker().collectionChanged(collection.id()); 0166 collectionNotification(Protocol::CollectionChangeNotification::Move, 0167 collection, 0168 source.id(), 0169 collection.parentId(), 0170 resource, 0171 QSet<QByteArray>(), 0172 destResource); 0173 } 0174 0175 void NotificationCollector::collectionRemoved(const Collection &collection, const QByteArray &resource) 0176 { 0177 if (auto cleaner = mAkonadi.cacheCleaner()) { 0178 cleaner->collectionRemoved(collection.id()); 0179 } 0180 mAkonadi.intervalChecker().collectionRemoved(collection.id()); 0181 mAkonadi.collectionStatistics().invalidateCollection(collection); 0182 collectionNotification(Protocol::CollectionChangeNotification::Remove, collection, collection.parentId(), -1, resource); 0183 } 0184 0185 void NotificationCollector::collectionSubscribed(const Collection &collection, const QByteArray &resource) 0186 { 0187 if (auto cleaner = mAkonadi.cacheCleaner()) { 0188 cleaner->collectionAdded(collection.id()); 0189 } 0190 mAkonadi.intervalChecker().collectionAdded(collection.id()); 0191 collectionNotification(Protocol::CollectionChangeNotification::Subscribe, collection, collection.parentId(), -1, resource, QSet<QByteArray>()); 0192 } 0193 0194 void NotificationCollector::collectionUnsubscribed(const Collection &collection, const QByteArray &resource) 0195 { 0196 if (auto cleaner = mAkonadi.cacheCleaner()) { 0197 cleaner->collectionRemoved(collection.id()); 0198 } 0199 mAkonadi.intervalChecker().collectionRemoved(collection.id()); 0200 mAkonadi.collectionStatistics().invalidateCollection(collection); 0201 collectionNotification(Protocol::CollectionChangeNotification::Unsubscribe, collection, 
collection.parentId(), -1, resource, QSet<QByteArray>()); 0202 } 0203 0204 void NotificationCollector::tagAdded(const Tag &tag) 0205 { 0206 tagNotification(Protocol::TagChangeNotification::Add, tag); 0207 } 0208 0209 void NotificationCollector::tagChanged(const Tag &tag) 0210 { 0211 tagNotification(Protocol::TagChangeNotification::Modify, tag); 0212 } 0213 0214 void NotificationCollector::tagRemoved(const Tag &tag, const QByteArray &resource, const QString &remoteId) 0215 { 0216 tagNotification(Protocol::TagChangeNotification::Remove, tag, resource, remoteId); 0217 } 0218 0219 void NotificationCollector::relationAdded(const Relation &relation) 0220 { 0221 relationNotification(Protocol::RelationChangeNotification::Add, relation); 0222 } 0223 0224 void NotificationCollector::relationRemoved(const Relation &relation) 0225 { 0226 relationNotification(Protocol::RelationChangeNotification::Remove, relation); 0227 } 0228 0229 void NotificationCollector::clear() 0230 { 0231 mNotifications.clear(); 0232 } 0233 0234 void NotificationCollector::setConnection(Connection *connection) 0235 { 0236 mConnection = connection; 0237 } 0238 0239 void NotificationCollector::itemNotification(Protocol::ItemChangeNotification::Operation op, 0240 const PimItem &item, 0241 const Collection &collection, 0242 const Collection &collectionDest, 0243 const QByteArray &resource, 0244 const QSet<QByteArray> &parts) 0245 { 0246 PimItem::List items; 0247 items << item; 0248 itemNotification(op, items, collection, collectionDest, resource, parts); 0249 } 0250 0251 void NotificationCollector::itemNotification(Protocol::ItemChangeNotification::Operation op, 0252 const PimItem::List &items, 0253 const Collection &collection, 0254 const Collection &collectionDest, 0255 const QByteArray &resource, 0256 const QSet<QByteArray> &parts, 0257 const QSet<QByteArray> &addedFlags, 0258 const QSet<QByteArray> &removedFlags, 0259 const QSet<qint64> &addedTags, 0260 const QSet<qint64> &removedTags, 0261 const 
Relation::List &addedRelations, 0262 const Relation::List &removedRelations) 0263 { 0264 QMap<Entity::Id, QList<PimItem>> vCollections; 0265 0266 if ((op == Protocol::ItemChangeNotification::Modify) || (op == Protocol::ItemChangeNotification::ModifyFlags) 0267 || (op == Protocol::ItemChangeNotification::ModifyTags) || (op == Protocol::ItemChangeNotification::ModifyRelations)) { 0268 vCollections = DataStore::self()->virtualCollections(items); 0269 } 0270 0271 auto msg = Protocol::ItemChangeNotificationPtr::create(); 0272 if (mConnection) { 0273 msg->setSessionId(mConnection->sessionId()); 0274 } 0275 msg->setOperation(op); 0276 0277 msg->setItemParts(parts); 0278 msg->setAddedFlags(addedFlags); 0279 msg->setRemovedFlags(removedFlags); 0280 msg->setAddedTags(addedTags); 0281 msg->setRemovedTags(removedTags); 0282 if (!addedRelations.isEmpty()) { 0283 QSet<Protocol::ItemChangeNotification::Relation> rels; 0284 for (const Relation &rel : addedRelations) { 0285 rels.insert(Protocol::ItemChangeNotification::Relation(rel.leftId(), rel.rightId(), rel.relationType().name())); 0286 } 0287 msg->setAddedRelations(rels); 0288 } 0289 if (!removedRelations.isEmpty()) { 0290 QSet<Protocol::ItemChangeNotification::Relation> rels; 0291 for (const Relation &rel : removedRelations) { 0292 rels.insert(Protocol::ItemChangeNotification::Relation(rel.leftId(), rel.rightId(), rel.relationType().name())); 0293 } 0294 msg->setRemovedRelations(rels); 0295 } 0296 0297 if (collectionDest.isValid()) { 0298 QByteArray destResourceName; 0299 destResourceName = collectionDest.resource().name().toLatin1(); 0300 msg->setDestinationResource(destResourceName); 0301 } 0302 0303 msg->setParentDestCollection(collectionDest.id()); 0304 0305 QList<Protocol::FetchItemsResponse> ntfItems; 0306 for (const PimItem &item : items) { 0307 Protocol::FetchItemsResponse i; 0308 i.setId(item.id()); 0309 i.setRemoteId(item.remoteId()); 0310 i.setRemoteRevision(item.remoteRevision()); 0311 
i.setMimeType(item.mimeType().name()); 0312 ntfItems.push_back(std::move(i)); 0313 } 0314 0315 /* Notify all virtual collections the items are linked to. */ 0316 QHash<qint64, Protocol::FetchItemsResponse> virtItems; 0317 for (const auto &ntfItem : ntfItems) { 0318 virtItems.insert(ntfItem.id(), ntfItem); 0319 } 0320 for (auto iter = vCollections.cbegin(), end = vCollections.constEnd(); iter != end; ++iter) { 0321 auto copy = Protocol::ItemChangeNotificationPtr::create(*msg); 0322 QList<Protocol::FetchItemsResponse> items; 0323 items.reserve(iter->size()); 0324 for (const auto &item : std::as_const(*iter)) { 0325 items.append(virtItems.value(item.id())); 0326 } 0327 copy->setItems(items); 0328 copy->setParentCollection(iter.key()); 0329 copy->setResource(resource); 0330 0331 mAkonadi.collectionStatistics().invalidateCollection(Collection::retrieveById(iter.key())); 0332 dispatchNotification(copy); 0333 } 0334 0335 msg->setItems(ntfItems); 0336 0337 Collection col; 0338 if (!collection.isValid()) { 0339 msg->setParentCollection(items.first().collection().id()); 0340 col = items.first().collection(); 0341 } else { 0342 msg->setParentCollection(collection.id()); 0343 col = collection; 0344 } 0345 0346 QByteArray res = resource; 0347 if (res.isEmpty()) { 0348 if (col.resourceId() <= 0) { 0349 col = Collection::retrieveById(col.id()); 0350 } 0351 res = col.resource().name().toLatin1(); 0352 } 0353 msg->setResource(res); 0354 0355 // Add and ModifyFlags are handled incrementally 0356 // (see itemAdded() and itemsFlagsChanged()) 0357 if (msg->operation() != Protocol::ItemChangeNotification::Add && msg->operation() != Protocol::ItemChangeNotification::ModifyFlags) { 0358 mAkonadi.collectionStatistics().invalidateCollection(col); 0359 } 0360 dispatchNotification(msg); 0361 } 0362 0363 void NotificationCollector::collectionNotification(Protocol::CollectionChangeNotification::Operation op, 0364 const Collection &collection, 0365 Collection::Id source, 0366 Collection::Id 
destination, 0367 const QByteArray &resource, 0368 const QSet<QByteArray> &changes, 0369 const QByteArray &destResource) 0370 { 0371 auto msg = Protocol::CollectionChangeNotificationPtr::create(); 0372 msg->setOperation(op); 0373 if (mConnection) { 0374 msg->setSessionId(mConnection->sessionId()); 0375 } 0376 msg->setParentCollection(source); 0377 msg->setParentDestCollection(destination); 0378 msg->setDestinationResource(destResource); 0379 msg->setChangedParts(changes); 0380 0381 auto msgCollection = HandlerHelper::fetchCollectionsResponse(mAkonadi, collection); 0382 if (auto mgr = mAkonadi.notificationManager()) { 0383 auto fetchScope = mgr->collectionFetchScope(); 0384 // Make sure we have all the data 0385 if (!fetchScope->fetchIdOnly() && msgCollection.name().isEmpty()) { 0386 const auto col = Collection::retrieveById(msgCollection.id()); 0387 const auto mts = col.mimeTypes(); 0388 QStringList mimeTypes; 0389 mimeTypes.reserve(mts.size()); 0390 for (const auto &mt : mts) { 0391 mimeTypes.push_back(mt.name()); 0392 } 0393 msgCollection = HandlerHelper::fetchCollectionsResponse(mAkonadi, col, {}, false, 0, {}, {}, mimeTypes); 0394 } 0395 // Get up-to-date statistics 0396 if (fetchScope->fetchStatistics()) { 0397 Collection col; 0398 col.setId(msgCollection.id()); 0399 const auto stats = mAkonadi.collectionStatistics().statistics(col); 0400 msgCollection.setStatistics(Protocol::FetchCollectionStatsResponse(stats.count, stats.count - stats.read, stats.size)); 0401 } 0402 // Get attributes 0403 const auto requestedAttrs = fetchScope->attributes(); 0404 auto msgColAttrs = msgCollection.attributes(); 0405 // TODO: This assumes that we have either none or all attributes in msgCollection 0406 if (msgColAttrs.isEmpty() && !requestedAttrs.isEmpty()) { 0407 SelectQueryBuilder<CollectionAttribute> qb; 0408 qb.addColumn(CollectionAttribute::typeFullColumnName()); 0409 qb.addColumn(CollectionAttribute::valueFullColumnName()); 0410 
qb.addValueCondition(CollectionAttribute::collectionIdFullColumnName(), Query::Equals, msgCollection.id()); 0411 Query::Condition cond(Query::Or); 0412 for (const auto &attr : requestedAttrs) { 0413 cond.addValueCondition(CollectionAttribute::typeFullColumnName(), Query::Equals, attr); 0414 } 0415 qb.addCondition(cond); 0416 if (!qb.exec()) { 0417 qCWarning(AKONADISERVER_LOG) << "NotificationCollector failed to query attributes for Collection" << collection.name() << "(ID" 0418 << collection.id() << ")"; 0419 } 0420 const auto attrs = qb.result(); 0421 for (const auto &attr : attrs) { 0422 msgColAttrs.insert(attr.type(), attr.value()); 0423 } 0424 msgCollection.setAttributes(msgColAttrs); 0425 } 0426 } 0427 msg->setCollection(std::move(msgCollection)); 0428 0429 if (!collection.enabled()) { 0430 msg->addMetadata("DISABLED"); 0431 } 0432 0433 QByteArray res = resource; 0434 if (res.isEmpty()) { 0435 res = collection.resource().name().toLatin1(); 0436 } 0437 msg->setResource(res); 0438 0439 dispatchNotification(msg); 0440 } 0441 0442 void NotificationCollector::tagNotification(Protocol::TagChangeNotification::Operation op, const Tag &tag, const QByteArray &resource, const QString &remoteId) 0443 { 0444 auto msg = Protocol::TagChangeNotificationPtr::create(); 0445 msg->setOperation(op); 0446 if (mConnection) { 0447 msg->setSessionId(mConnection->sessionId()); 0448 } 0449 msg->setResource(resource); 0450 Protocol::FetchTagsResponse msgTag; 0451 msgTag.setId(tag.id()); 0452 msgTag.setRemoteId(remoteId.toUtf8()); 0453 msgTag.setParentId(tag.parentId()); 0454 if (auto mgr = mAkonadi.notificationManager()) { 0455 auto fetchScope = mgr->tagFetchScope(); 0456 if (!fetchScope->fetchIdOnly() && msgTag.gid().isEmpty()) { 0457 msgTag = HandlerHelper::fetchTagsResponse(Tag::retrieveById(msgTag.id()), fetchScope->toFetchScope(), mConnection); 0458 } 0459 0460 const auto requestedAttrs = fetchScope->attributes(); 0461 auto msgTagAttrs = msgTag.attributes(); 0462 if 
(msgTagAttrs.isEmpty() && !requestedAttrs.isEmpty()) { 0463 SelectQueryBuilder<TagAttribute> qb; 0464 qb.addColumn(TagAttribute::typeFullColumnName()); 0465 qb.addColumn(TagAttribute::valueFullColumnName()); 0466 qb.addValueCondition(TagAttribute::tagIdFullColumnName(), Query::Equals, msgTag.id()); 0467 Query::Condition cond(Query::Or); 0468 for (const auto &attr : requestedAttrs) { 0469 cond.addValueCondition(TagAttribute::typeFullColumnName(), Query::Equals, attr); 0470 } 0471 qb.addCondition(cond); 0472 if (!qb.exec()) { 0473 qCWarning(AKONADISERVER_LOG) << "NotificationCollection failed to query attributes for Tag" << tag.id(); 0474 } 0475 const auto attrs = qb.result(); 0476 for (const auto &attr : attrs) { 0477 msgTagAttrs.insert(attr.type(), attr.value()); 0478 } 0479 msgTag.setAttributes(msgTagAttrs); 0480 } 0481 } 0482 msg->setTag(std::move(msgTag)); 0483 0484 dispatchNotification(msg); 0485 } 0486 0487 void NotificationCollector::relationNotification(Protocol::RelationChangeNotification::Operation op, const Relation &relation) 0488 { 0489 auto msg = Protocol::RelationChangeNotificationPtr::create(); 0490 msg->setOperation(op); 0491 if (mConnection) { 0492 msg->setSessionId(mConnection->sessionId()); 0493 } 0494 msg->setRelation(HandlerHelper::fetchRelationsResponse(relation)); 0495 0496 dispatchNotification(msg); 0497 } 0498 0499 void NotificationCollector::completeNotification(const Protocol::ChangeNotificationPtr &changeMsg) 0500 { 0501 if (changeMsg->type() == Protocol::Command::ItemChangeNotification) { 0502 const auto msg = changeMsg.staticCast<Protocol::ItemChangeNotification>(); 0503 auto const mgr = mAkonadi.notificationManager(); 0504 if (mgr && msg->operation() != Protocol::ItemChangeNotification::Remove) { 0505 if (mDb->inTransaction()) { 0506 qCWarning(AKONADISERVER_LOG) << "NotificationCollector requested FetchHelper from within a transaction." 
0507 << "Aborting since this would deadlock!"; 0508 return; 0509 } 0510 auto fetchScope = mgr->itemFetchScope(); 0511 // NOTE: Checking and retrieving missing elements for each Item manually 0512 // here would require a complex code (and I'm too lazy), so instead we simply 0513 // feed the Items to FetchHelper and retrieve them all with the setup from 0514 // the aggregated fetch scope. The worst case is that we re-fetch everything 0515 // we already have, but that's still better than the pre-ntf-payload situation 0516 QList<qint64> ids; 0517 const auto items = msg->items(); 0518 ids.reserve(items.size()); 0519 bool allHaveRID = true; 0520 for (const auto &item : items) { 0521 ids.push_back(item.id()); 0522 allHaveRID &= !item.remoteId().isEmpty(); 0523 } 0524 0525 // FetchHelper may trigger ItemRetriever, which needs RemoteID. If we 0526 // don't have one (maybe because the Resource has not stored it yet, 0527 // we emit a notification without it and leave it up to the Monitor 0528 // to retrieve the Item on demand - we should have a RID stored in 0529 // Akonadi by then. 0530 if (mConnection && (allHaveRID || msg->operation() != Protocol::ItemChangeNotification::Add)) { 0531 // Prevent transactions inside FetchHelper to recursively call our slot 0532 QScopedValueRollback<bool> ignoreTransactions(mIgnoreTransactions); 0533 mIgnoreTransactions = true; 0534 CommandContext context; 0535 auto itemFetchScope = fetchScope->toFetchScope(); 0536 auto tagFetchScope = mgr->tagFetchScope()->toFetchScope(); 0537 itemFetchScope.setFetch(Protocol::ItemFetchScope::CacheOnly); 0538 ItemFetchHelper helper(mConnection, context, Scope(ids), itemFetchScope, tagFetchScope, mAkonadi); 0539 // The Item was just changed, which means the atime was 0540 // updated, no need to do it again a couple milliseconds later. 
0541 helper.disableATimeUpdates(); 0542 QList<Protocol::FetchItemsResponse> fetchedItems; 0543 auto callback = [&fetchedItems](Protocol::FetchItemsResponse &&cmd) { 0544 fetchedItems.push_back(std::move(cmd)); 0545 }; 0546 if (helper.fetchItems(std::move(callback))) { 0547 msg->setItems(fetchedItems); 0548 } else { 0549 qCWarning(AKONADISERVER_LOG) << "NotificationCollector railed to retrieve Items for notification!"; 0550 } 0551 } else { 0552 QList<Protocol::FetchItemsResponse> fetchedItems; 0553 for (const auto &item : items) { 0554 Protocol::FetchItemsResponse resp; 0555 resp.setId(item.id()); 0556 resp.setRevision(item.revision()); 0557 resp.setMimeType(item.mimeType()); 0558 resp.setParentId(item.parentId()); 0559 resp.setGid(item.gid()); 0560 resp.setSize(item.size()); 0561 resp.setMTime(item.mTime()); 0562 resp.setFlags(item.flags()); 0563 fetchedItems.push_back(std::move(resp)); 0564 } 0565 msg->setItems(fetchedItems); 0566 msg->setMustRetrieve(true); 0567 } 0568 } 0569 } 0570 } 0571 0572 void NotificationCollector::dispatchNotification(const Protocol::ChangeNotificationPtr &msg) 0573 { 0574 if (!mDb || mDb->inTransaction()) { 0575 if (msg->type() == Protocol::Command::CollectionChangeNotification) { 0576 Protocol::CollectionChangeNotification::appendAndCompress(mNotifications, msg); 0577 } else { 0578 mNotifications.append(msg); 0579 } 0580 } else { 0581 completeNotification(msg); 0582 notify({msg}); 0583 } 0584 } 0585 0586 bool NotificationCollector::dispatchNotifications() 0587 { 0588 if (!mNotifications.isEmpty()) { 0589 for (auto &ntf : mNotifications) { 0590 completeNotification(ntf); 0591 } 0592 notify(std::move(mNotifications)); 0593 clear(); 0594 return true; 0595 } 0596 0597 return false; 0598 } 0599 0600 void NotificationCollector::notify(Protocol::ChangeNotificationList &&msgs) 0601 { 0602 if (auto mgr = mAkonadi.notificationManager()) { 0603 QMetaObject::invokeMethod(mgr, "slotNotify", Qt::QueuedConnection, 
Q_ARG(Akonadi::Protocol::ChangeNotificationList, msgs)); 0604 } 0605 }