File indexing completed on 2025-01-05 04:46:59

0001 /*
0002     SPDX-FileCopyrightText: 2006-2007 Volker Krause <vkrause@kde.org>
0003 
0004     SPDX-License-Identifier: LGPL-2.0-or-later
0005 */
0006 
0007 #include "notificationcollector.h"
0008 #include "aggregatedfetchscope.h"
0009 #include "akonadi.h"
0010 #include "cachecleaner.h"
0011 #include "connection.h"
0012 #include "handler/itemfetchhelper.h"
0013 #include "handlerhelper.h"
0014 #include "intervalcheck.h"
0015 #include "notificationmanager.h"
0016 #include "search/searchmanager.h"
0017 #include "selectquerybuilder.h"
0018 #include "shared/akranges.h"
0019 #include "storage/collectionstatistics.h"
0020 #include "storage/datastore.h"
0021 #include "storage/entity.h"
0022 
0023 #include "akonadiserver_debug.h"
0024 
0025 #include <QScopedValueRollback>
0026 
0027 using namespace Akonadi;
0028 using namespace Akonadi::Server;
0029 
0030 NotificationCollector::NotificationCollector(AkonadiServer &akonadi, DataStore *db)
0031     : mDb(db)
0032     , mAkonadi(akonadi)
0033 {
0034     QObject::connect(db, &DataStore::transactionCommitted, db, [this]() {
0035         if (!mIgnoreTransactions) {
0036             dispatchNotifications();
0037         }
0038     });
0039     QObject::connect(db, &DataStore::transactionRolledBack, db, [this]() {
0040         if (!mIgnoreTransactions) {
0041             clear();
0042         }
0043     });
0044 }
0045 
0046 void NotificationCollector::itemAdded(const PimItem &item, bool seen, const Collection &collection, const QByteArray &resource)
0047 {
0048     mAkonadi.searchManager().scheduleSearchUpdate();
0049     mAkonadi.collectionStatistics().itemAdded(collection, item.size(), seen);
0050     itemNotification(Protocol::ItemChangeNotification::Add, item, collection, Collection(), resource);
0051 }
0052 
0053 void NotificationCollector::itemChanged(const PimItem &item, const QSet<QByteArray> &changedParts, const Collection &collection, const QByteArray &resource)
0054 {
0055     mAkonadi.searchManager().scheduleSearchUpdate();
0056     itemNotification(Protocol::ItemChangeNotification::Modify, item, collection, Collection(), resource, changedParts);
0057 }
0058 
0059 void NotificationCollector::itemsFlagsChanged(const PimItem::List &items,
0060                                               const QSet<QByteArray> &addedFlags,
0061                                               const QSet<QByteArray> &removedFlags,
0062                                               const Collection &collection,
0063                                               const QByteArray &resource)
0064 {
0065     int seenCount = (addedFlags.contains(AKONADI_FLAG_SEEN) || addedFlags.contains(AKONADI_FLAG_IGNORED) ? items.count() : 0);
0066     seenCount -= (removedFlags.contains(AKONADI_FLAG_SEEN) || removedFlags.contains(AKONADI_FLAG_IGNORED) ? items.count() : 0);
0067 
0068     mAkonadi.collectionStatistics().itemsSeenChanged(collection, seenCount);
0069     itemNotification(Protocol::ItemChangeNotification::ModifyFlags, items, collection, Collection(), resource, QSet<QByteArray>(), addedFlags, removedFlags);
0070 }
0071 
0072 void NotificationCollector::itemsTagsChanged(const PimItem::List &items,
0073                                              const QSet<qint64> &addedTags,
0074                                              const QSet<qint64> &removedTags,
0075                                              const Collection &collection,
0076                                              const QByteArray &resource)
0077 {
0078     itemNotification(Protocol::ItemChangeNotification::ModifyTags,
0079                      items,
0080                      collection,
0081                      Collection(),
0082                      resource,
0083                      QSet<QByteArray>(),
0084                      QSet<QByteArray>(),
0085                      QSet<QByteArray>(),
0086                      addedTags,
0087                      removedTags);
0088 }
0089 
0090 void NotificationCollector::itemsRelationsChanged(const PimItem::List &items,
0091                                                   const Relation::List &addedRelations,
0092                                                   const Relation::List &removedRelations,
0093                                                   const Collection &collection,
0094                                                   const QByteArray &resource)
0095 {
0096     itemNotification(Protocol::ItemChangeNotification::ModifyRelations,
0097                      items,
0098                      collection,
0099                      Collection(),
0100                      resource,
0101                      QSet<QByteArray>(),
0102                      QSet<QByteArray>(),
0103                      QSet<QByteArray>(),
0104                      QSet<qint64>(),
0105                      QSet<qint64>(),
0106                      addedRelations,
0107                      removedRelations);
0108 }
0109 
0110 void NotificationCollector::itemsMoved(const PimItem::List &items,
0111                                        const Collection &collectionSrc,
0112                                        const Collection &collectionDest,
0113                                        const QByteArray &sourceResource)
0114 {
0115     mAkonadi.searchManager().scheduleSearchUpdate();
0116     itemNotification(Protocol::ItemChangeNotification::Move, items, collectionSrc, collectionDest, sourceResource);
0117 }
0118 
0119 void NotificationCollector::itemsRemoved(const PimItem::List &items, const Collection &collection, const QByteArray &resource)
0120 {
0121     itemNotification(Protocol::ItemChangeNotification::Remove, items, collection, Collection(), resource);
0122 }
0123 
0124 void NotificationCollector::itemsLinked(const PimItem::List &items, const Collection &collection)
0125 {
0126     itemNotification(Protocol::ItemChangeNotification::Link, items, collection, Collection(), QByteArray());
0127 }
0128 
0129 void NotificationCollector::itemsUnlinked(const PimItem::List &items, const Collection &collection)
0130 {
0131     itemNotification(Protocol::ItemChangeNotification::Unlink, items, collection, Collection(), QByteArray());
0132 }
0133 
0134 void NotificationCollector::collectionAdded(const Collection &collection, const QByteArray &resource)
0135 {
0136     if (auto cleaner = mAkonadi.cacheCleaner()) {
0137         cleaner->collectionAdded(collection.id());
0138     }
0139     mAkonadi.intervalChecker().collectionAdded(collection.id());
0140     collectionNotification(Protocol::CollectionChangeNotification::Add, collection, collection.parentId(), -1, resource);
0141 }
0142 
0143 void NotificationCollector::collectionChanged(const Collection &collection, const QList<QByteArray> &changes, const QByteArray &resource)
0144 {
0145     if (auto cleaner = mAkonadi.cacheCleaner()) {
0146         cleaner->collectionChanged(collection.id());
0147     }
0148     mAkonadi.intervalChecker().collectionChanged(collection.id());
0149     if (changes.contains(AKONADI_PARAM_ENABLED)) {
0150         mAkonadi.collectionStatistics().invalidateCollection(collection);
0151     }
0152     collectionNotification(Protocol::CollectionChangeNotification::Modify,
0153                            collection,
0154                            collection.parentId(),
0155                            -1,
0156                            resource,
0157                            changes | AkRanges::Actions::toQSet);
0158 }
0159 
0160 void NotificationCollector::collectionMoved(const Collection &collection, const Collection &source, const QByteArray &resource, const QByteArray &destResource)
0161 {
0162     if (auto cleaner = mAkonadi.cacheCleaner()) {
0163         cleaner->collectionChanged(collection.id());
0164     }
0165     mAkonadi.intervalChecker().collectionChanged(collection.id());
0166     collectionNotification(Protocol::CollectionChangeNotification::Move,
0167                            collection,
0168                            source.id(),
0169                            collection.parentId(),
0170                            resource,
0171                            QSet<QByteArray>(),
0172                            destResource);
0173 }
0174 
0175 void NotificationCollector::collectionRemoved(const Collection &collection, const QByteArray &resource)
0176 {
0177     if (auto cleaner = mAkonadi.cacheCleaner()) {
0178         cleaner->collectionRemoved(collection.id());
0179     }
0180     mAkonadi.intervalChecker().collectionRemoved(collection.id());
0181     mAkonadi.collectionStatistics().invalidateCollection(collection);
0182     collectionNotification(Protocol::CollectionChangeNotification::Remove, collection, collection.parentId(), -1, resource);
0183 }
0184 
0185 void NotificationCollector::collectionSubscribed(const Collection &collection, const QByteArray &resource)
0186 {
0187     if (auto cleaner = mAkonadi.cacheCleaner()) {
0188         cleaner->collectionAdded(collection.id());
0189     }
0190     mAkonadi.intervalChecker().collectionAdded(collection.id());
0191     collectionNotification(Protocol::CollectionChangeNotification::Subscribe, collection, collection.parentId(), -1, resource, QSet<QByteArray>());
0192 }
0193 
0194 void NotificationCollector::collectionUnsubscribed(const Collection &collection, const QByteArray &resource)
0195 {
0196     if (auto cleaner = mAkonadi.cacheCleaner()) {
0197         cleaner->collectionRemoved(collection.id());
0198     }
0199     mAkonadi.intervalChecker().collectionRemoved(collection.id());
0200     mAkonadi.collectionStatistics().invalidateCollection(collection);
0201     collectionNotification(Protocol::CollectionChangeNotification::Unsubscribe, collection, collection.parentId(), -1, resource, QSet<QByteArray>());
0202 }
0203 
0204 void NotificationCollector::tagAdded(const Tag &tag)
0205 {
0206     tagNotification(Protocol::TagChangeNotification::Add, tag);
0207 }
0208 
0209 void NotificationCollector::tagChanged(const Tag &tag)
0210 {
0211     tagNotification(Protocol::TagChangeNotification::Modify, tag);
0212 }
0213 
0214 void NotificationCollector::tagRemoved(const Tag &tag, const QByteArray &resource, const QString &remoteId)
0215 {
0216     tagNotification(Protocol::TagChangeNotification::Remove, tag, resource, remoteId);
0217 }
0218 
0219 void NotificationCollector::relationAdded(const Relation &relation)
0220 {
0221     relationNotification(Protocol::RelationChangeNotification::Add, relation);
0222 }
0223 
0224 void NotificationCollector::relationRemoved(const Relation &relation)
0225 {
0226     relationNotification(Protocol::RelationChangeNotification::Remove, relation);
0227 }
0228 
0229 void NotificationCollector::clear()
0230 {
0231     mNotifications.clear();
0232 }
0233 
0234 void NotificationCollector::setConnection(Connection *connection)
0235 {
0236     mConnection = connection;
0237 }
0238 
0239 void NotificationCollector::itemNotification(Protocol::ItemChangeNotification::Operation op,
0240                                              const PimItem &item,
0241                                              const Collection &collection,
0242                                              const Collection &collectionDest,
0243                                              const QByteArray &resource,
0244                                              const QSet<QByteArray> &parts)
0245 {
0246     PimItem::List items;
0247     items << item;
0248     itemNotification(op, items, collection, collectionDest, resource, parts);
0249 }
0250 
0251 void NotificationCollector::itemNotification(Protocol::ItemChangeNotification::Operation op,
0252                                              const PimItem::List &items,
0253                                              const Collection &collection,
0254                                              const Collection &collectionDest,
0255                                              const QByteArray &resource,
0256                                              const QSet<QByteArray> &parts,
0257                                              const QSet<QByteArray> &addedFlags,
0258                                              const QSet<QByteArray> &removedFlags,
0259                                              const QSet<qint64> &addedTags,
0260                                              const QSet<qint64> &removedTags,
0261                                              const Relation::List &addedRelations,
0262                                              const Relation::List &removedRelations)
0263 {
0264     QMap<Entity::Id, QList<PimItem>> vCollections;
0265 
0266     if ((op == Protocol::ItemChangeNotification::Modify) || (op == Protocol::ItemChangeNotification::ModifyFlags)
0267         || (op == Protocol::ItemChangeNotification::ModifyTags) || (op == Protocol::ItemChangeNotification::ModifyRelations)) {
0268         vCollections = DataStore::self()->virtualCollections(items);
0269     }
0270 
0271     auto msg = Protocol::ItemChangeNotificationPtr::create();
0272     if (mConnection) {
0273         msg->setSessionId(mConnection->sessionId());
0274     }
0275     msg->setOperation(op);
0276 
0277     msg->setItemParts(parts);
0278     msg->setAddedFlags(addedFlags);
0279     msg->setRemovedFlags(removedFlags);
0280     msg->setAddedTags(addedTags);
0281     msg->setRemovedTags(removedTags);
0282     if (!addedRelations.isEmpty()) {
0283         QSet<Protocol::ItemChangeNotification::Relation> rels;
0284         for (const Relation &rel : addedRelations) {
0285             rels.insert(Protocol::ItemChangeNotification::Relation(rel.leftId(), rel.rightId(), rel.relationType().name()));
0286         }
0287         msg->setAddedRelations(rels);
0288     }
0289     if (!removedRelations.isEmpty()) {
0290         QSet<Protocol::ItemChangeNotification::Relation> rels;
0291         for (const Relation &rel : removedRelations) {
0292             rels.insert(Protocol::ItemChangeNotification::Relation(rel.leftId(), rel.rightId(), rel.relationType().name()));
0293         }
0294         msg->setRemovedRelations(rels);
0295     }
0296 
0297     if (collectionDest.isValid()) {
0298         QByteArray destResourceName;
0299         destResourceName = collectionDest.resource().name().toLatin1();
0300         msg->setDestinationResource(destResourceName);
0301     }
0302 
0303     msg->setParentDestCollection(collectionDest.id());
0304 
0305     QList<Protocol::FetchItemsResponse> ntfItems;
0306     for (const PimItem &item : items) {
0307         Protocol::FetchItemsResponse i;
0308         i.setId(item.id());
0309         i.setRemoteId(item.remoteId());
0310         i.setRemoteRevision(item.remoteRevision());
0311         i.setMimeType(item.mimeType().name());
0312         ntfItems.push_back(std::move(i));
0313     }
0314 
0315     /* Notify all virtual collections the items are linked to. */
0316     QHash<qint64, Protocol::FetchItemsResponse> virtItems;
0317     for (const auto &ntfItem : ntfItems) {
0318         virtItems.insert(ntfItem.id(), ntfItem);
0319     }
0320     for (auto iter = vCollections.cbegin(), end = vCollections.constEnd(); iter != end; ++iter) {
0321         auto copy = Protocol::ItemChangeNotificationPtr::create(*msg);
0322         QList<Protocol::FetchItemsResponse> items;
0323         items.reserve(iter->size());
0324         for (const auto &item : std::as_const(*iter)) {
0325             items.append(virtItems.value(item.id()));
0326         }
0327         copy->setItems(items);
0328         copy->setParentCollection(iter.key());
0329         copy->setResource(resource);
0330 
0331         mAkonadi.collectionStatistics().invalidateCollection(Collection::retrieveById(iter.key()));
0332         dispatchNotification(copy);
0333     }
0334 
0335     msg->setItems(ntfItems);
0336 
0337     Collection col;
0338     if (!collection.isValid()) {
0339         msg->setParentCollection(items.first().collection().id());
0340         col = items.first().collection();
0341     } else {
0342         msg->setParentCollection(collection.id());
0343         col = collection;
0344     }
0345 
0346     QByteArray res = resource;
0347     if (res.isEmpty()) {
0348         if (col.resourceId() <= 0) {
0349             col = Collection::retrieveById(col.id());
0350         }
0351         res = col.resource().name().toLatin1();
0352     }
0353     msg->setResource(res);
0354 
0355     // Add and ModifyFlags are handled incrementally
0356     // (see itemAdded() and itemsFlagsChanged())
0357     if (msg->operation() != Protocol::ItemChangeNotification::Add && msg->operation() != Protocol::ItemChangeNotification::ModifyFlags) {
0358         mAkonadi.collectionStatistics().invalidateCollection(col);
0359     }
0360     dispatchNotification(msg);
0361 }
0362 
0363 void NotificationCollector::collectionNotification(Protocol::CollectionChangeNotification::Operation op,
0364                                                    const Collection &collection,
0365                                                    Collection::Id source,
0366                                                    Collection::Id destination,
0367                                                    const QByteArray &resource,
0368                                                    const QSet<QByteArray> &changes,
0369                                                    const QByteArray &destResource)
0370 {
0371     auto msg = Protocol::CollectionChangeNotificationPtr::create();
0372     msg->setOperation(op);
0373     if (mConnection) {
0374         msg->setSessionId(mConnection->sessionId());
0375     }
0376     msg->setParentCollection(source);
0377     msg->setParentDestCollection(destination);
0378     msg->setDestinationResource(destResource);
0379     msg->setChangedParts(changes);
0380 
0381     auto msgCollection = HandlerHelper::fetchCollectionsResponse(mAkonadi, collection);
0382     if (auto mgr = mAkonadi.notificationManager()) {
0383         auto fetchScope = mgr->collectionFetchScope();
0384         // Make sure we have all the data
0385         if (!fetchScope->fetchIdOnly() && msgCollection.name().isEmpty()) {
0386             const auto col = Collection::retrieveById(msgCollection.id());
0387             const auto mts = col.mimeTypes();
0388             QStringList mimeTypes;
0389             mimeTypes.reserve(mts.size());
0390             for (const auto &mt : mts) {
0391                 mimeTypes.push_back(mt.name());
0392             }
0393             msgCollection = HandlerHelper::fetchCollectionsResponse(mAkonadi, col, {}, false, 0, {}, {}, mimeTypes);
0394         }
0395         // Get up-to-date statistics
0396         if (fetchScope->fetchStatistics()) {
0397             Collection col;
0398             col.setId(msgCollection.id());
0399             const auto stats = mAkonadi.collectionStatistics().statistics(col);
0400             msgCollection.setStatistics(Protocol::FetchCollectionStatsResponse(stats.count, stats.count - stats.read, stats.size));
0401         }
0402         // Get attributes
0403         const auto requestedAttrs = fetchScope->attributes();
0404         auto msgColAttrs = msgCollection.attributes();
0405         // TODO: This assumes that we have either none or all attributes in msgCollection
0406         if (msgColAttrs.isEmpty() && !requestedAttrs.isEmpty()) {
0407             SelectQueryBuilder<CollectionAttribute> qb;
0408             qb.addColumn(CollectionAttribute::typeFullColumnName());
0409             qb.addColumn(CollectionAttribute::valueFullColumnName());
0410             qb.addValueCondition(CollectionAttribute::collectionIdFullColumnName(), Query::Equals, msgCollection.id());
0411             Query::Condition cond(Query::Or);
0412             for (const auto &attr : requestedAttrs) {
0413                 cond.addValueCondition(CollectionAttribute::typeFullColumnName(), Query::Equals, attr);
0414             }
0415             qb.addCondition(cond);
0416             if (!qb.exec()) {
0417                 qCWarning(AKONADISERVER_LOG) << "NotificationCollector failed to query attributes for Collection" << collection.name() << "(ID"
0418                                              << collection.id() << ")";
0419             }
0420             const auto attrs = qb.result();
0421             for (const auto &attr : attrs) {
0422                 msgColAttrs.insert(attr.type(), attr.value());
0423             }
0424             msgCollection.setAttributes(msgColAttrs);
0425         }
0426     }
0427     msg->setCollection(std::move(msgCollection));
0428 
0429     if (!collection.enabled()) {
0430         msg->addMetadata("DISABLED");
0431     }
0432 
0433     QByteArray res = resource;
0434     if (res.isEmpty()) {
0435         res = collection.resource().name().toLatin1();
0436     }
0437     msg->setResource(res);
0438 
0439     dispatchNotification(msg);
0440 }
0441 
0442 void NotificationCollector::tagNotification(Protocol::TagChangeNotification::Operation op, const Tag &tag, const QByteArray &resource, const QString &remoteId)
0443 {
0444     auto msg = Protocol::TagChangeNotificationPtr::create();
0445     msg->setOperation(op);
0446     if (mConnection) {
0447         msg->setSessionId(mConnection->sessionId());
0448     }
0449     msg->setResource(resource);
0450     Protocol::FetchTagsResponse msgTag;
0451     msgTag.setId(tag.id());
0452     msgTag.setRemoteId(remoteId.toUtf8());
0453     msgTag.setParentId(tag.parentId());
0454     if (auto mgr = mAkonadi.notificationManager()) {
0455         auto fetchScope = mgr->tagFetchScope();
0456         if (!fetchScope->fetchIdOnly() && msgTag.gid().isEmpty()) {
0457             msgTag = HandlerHelper::fetchTagsResponse(Tag::retrieveById(msgTag.id()), fetchScope->toFetchScope(), mConnection);
0458         }
0459 
0460         const auto requestedAttrs = fetchScope->attributes();
0461         auto msgTagAttrs = msgTag.attributes();
0462         if (msgTagAttrs.isEmpty() && !requestedAttrs.isEmpty()) {
0463             SelectQueryBuilder<TagAttribute> qb;
0464             qb.addColumn(TagAttribute::typeFullColumnName());
0465             qb.addColumn(TagAttribute::valueFullColumnName());
0466             qb.addValueCondition(TagAttribute::tagIdFullColumnName(), Query::Equals, msgTag.id());
0467             Query::Condition cond(Query::Or);
0468             for (const auto &attr : requestedAttrs) {
0469                 cond.addValueCondition(TagAttribute::typeFullColumnName(), Query::Equals, attr);
0470             }
0471             qb.addCondition(cond);
0472             if (!qb.exec()) {
0473                 qCWarning(AKONADISERVER_LOG) << "NotificationCollection failed to query attributes for Tag" << tag.id();
0474             }
0475             const auto attrs = qb.result();
0476             for (const auto &attr : attrs) {
0477                 msgTagAttrs.insert(attr.type(), attr.value());
0478             }
0479             msgTag.setAttributes(msgTagAttrs);
0480         }
0481     }
0482     msg->setTag(std::move(msgTag));
0483 
0484     dispatchNotification(msg);
0485 }
0486 
0487 void NotificationCollector::relationNotification(Protocol::RelationChangeNotification::Operation op, const Relation &relation)
0488 {
0489     auto msg = Protocol::RelationChangeNotificationPtr::create();
0490     msg->setOperation(op);
0491     if (mConnection) {
0492         msg->setSessionId(mConnection->sessionId());
0493     }
0494     msg->setRelation(HandlerHelper::fetchRelationsResponse(relation));
0495 
0496     dispatchNotification(msg);
0497 }
0498 
0499 void NotificationCollector::completeNotification(const Protocol::ChangeNotificationPtr &changeMsg)
0500 {
0501     if (changeMsg->type() == Protocol::Command::ItemChangeNotification) {
0502         const auto msg = changeMsg.staticCast<Protocol::ItemChangeNotification>();
0503         auto const mgr = mAkonadi.notificationManager();
0504         if (mgr && msg->operation() != Protocol::ItemChangeNotification::Remove) {
0505             if (mDb->inTransaction()) {
0506                 qCWarning(AKONADISERVER_LOG) << "NotificationCollector requested FetchHelper from within a transaction."
0507                                              << "Aborting since this would deadlock!";
0508                 return;
0509             }
0510             auto fetchScope = mgr->itemFetchScope();
0511             // NOTE: Checking and retrieving missing elements for each Item manually
0512             // here would require a complex code (and I'm too lazy), so instead we simply
0513             // feed the Items to FetchHelper and retrieve them all with the setup from
0514             // the aggregated fetch scope. The worst case is that we re-fetch everything
0515             // we already have, but that's still better than the pre-ntf-payload situation
0516             QList<qint64> ids;
0517             const auto items = msg->items();
0518             ids.reserve(items.size());
0519             bool allHaveRID = true;
0520             for (const auto &item : items) {
0521                 ids.push_back(item.id());
0522                 allHaveRID &= !item.remoteId().isEmpty();
0523             }
0524 
0525             // FetchHelper may trigger ItemRetriever, which needs RemoteID. If we
0526             // don't have one (maybe because the Resource has not stored it yet,
0527             // we emit a notification without it and leave it up to the Monitor
0528             // to retrieve the Item on demand - we should have a RID stored in
0529             // Akonadi by then.
0530             if (mConnection && (allHaveRID || msg->operation() != Protocol::ItemChangeNotification::Add)) {
0531                 // Prevent transactions inside FetchHelper to recursively call our slot
0532                 QScopedValueRollback<bool> ignoreTransactions(mIgnoreTransactions);
0533                 mIgnoreTransactions = true;
0534                 CommandContext context;
0535                 auto itemFetchScope = fetchScope->toFetchScope();
0536                 auto tagFetchScope = mgr->tagFetchScope()->toFetchScope();
0537                 itemFetchScope.setFetch(Protocol::ItemFetchScope::CacheOnly);
0538                 ItemFetchHelper helper(mConnection, context, Scope(ids), itemFetchScope, tagFetchScope, mAkonadi);
0539                 // The Item was just changed, which means the atime was
0540                 // updated, no need to do it again a couple milliseconds later.
0541                 helper.disableATimeUpdates();
0542                 QList<Protocol::FetchItemsResponse> fetchedItems;
0543                 auto callback = [&fetchedItems](Protocol::FetchItemsResponse &&cmd) {
0544                     fetchedItems.push_back(std::move(cmd));
0545                 };
0546                 if (helper.fetchItems(std::move(callback))) {
0547                     msg->setItems(fetchedItems);
0548                 } else {
0549                     qCWarning(AKONADISERVER_LOG) << "NotificationCollector railed to retrieve Items for notification!";
0550                 }
0551             } else {
0552                 QList<Protocol::FetchItemsResponse> fetchedItems;
0553                 for (const auto &item : items) {
0554                     Protocol::FetchItemsResponse resp;
0555                     resp.setId(item.id());
0556                     resp.setRevision(item.revision());
0557                     resp.setMimeType(item.mimeType());
0558                     resp.setParentId(item.parentId());
0559                     resp.setGid(item.gid());
0560                     resp.setSize(item.size());
0561                     resp.setMTime(item.mTime());
0562                     resp.setFlags(item.flags());
0563                     fetchedItems.push_back(std::move(resp));
0564                 }
0565                 msg->setItems(fetchedItems);
0566                 msg->setMustRetrieve(true);
0567             }
0568         }
0569     }
0570 }
0571 
0572 void NotificationCollector::dispatchNotification(const Protocol::ChangeNotificationPtr &msg)
0573 {
0574     if (!mDb || mDb->inTransaction()) {
0575         if (msg->type() == Protocol::Command::CollectionChangeNotification) {
0576             Protocol::CollectionChangeNotification::appendAndCompress(mNotifications, msg);
0577         } else {
0578             mNotifications.append(msg);
0579         }
0580     } else {
0581         completeNotification(msg);
0582         notify({msg});
0583     }
0584 }
0585 
0586 bool NotificationCollector::dispatchNotifications()
0587 {
0588     if (!mNotifications.isEmpty()) {
0589         for (auto &ntf : mNotifications) {
0590             completeNotification(ntf);
0591         }
0592         notify(std::move(mNotifications));
0593         clear();
0594         return true;
0595     }
0596 
0597     return false;
0598 }
0599 
0600 void NotificationCollector::notify(Protocol::ChangeNotificationList &&msgs)
0601 {
0602     if (auto mgr = mAkonadi.notificationManager()) {
0603         QMetaObject::invokeMethod(mgr, "slotNotify", Qt::QueuedConnection, Q_ARG(Akonadi::Protocol::ChangeNotificationList, msgs));
0604     }
0605 }