File indexing completed on 2024-06-02 05:20:58

0001 /*
0002     SPDX-FileCopyrightText: 2015-2020 Krzysztof Nowicki <krissn@op.pl>
0003 
0004     SPDX-License-Identifier: LGPL-2.0-or-later
0005 */
0006 
0007 #include "ewsfetchitemsjob.h"
0008 
0009 #include <KLocalizedString>
0010 
0011 #include <Akonadi/AgentBase>
0012 #include <Akonadi/ItemFetchScope>
0013 
0014 #include "ewsclient.h"
0015 #include "ewsfetchitemdetailjob.h"
0016 #include "ewsitemhandler.h"
0017 #include "ewsresource.h"
0018 #include "ewsresource_debug.h"
0019 #include "ewssyncfolderitemsrequest.h"
0020 #include "tags/ewsakonaditagssyncjob.h"
0021 #include "tags/ewstagstore.h"
0022 
0023 using namespace Akonadi;
0024 
namespace
{
/* Maximum number of changes requested from the server by one sync request. */
constexpr int listBatchSize = 100;
/* Maximum number of items handed over to a single item detail job. */
constexpr int fetchBatchSize = 50;
}
0027 
0028 /**
0029  * The fetch items job is processed in two stages.
0030  *
0031  * The first stage is to query the list of messages on the remote and local sides. For this purpose
0032  * an EwsSyncFolderItemsRequest is started to retrieve remote items (list of ids only) and an ItemFetchJob
0033  * is started to fetch local items (from cache only). Both of these jobs are started simultaneously.
0034  *
0035  * The second stage begins when both item list query jobs have finished. The goal of this stage is
0036  * to determine a list of items to fetch more details for. Since the EwsSyncFolderItemsRequest can
0037  * retrieve incremental information further processing depends on whether the sync request was a
0038  * full sync (no sync state) or an incremental sync.
0039  *
0040  * In case of a full sync both item lists are compared to determine lists of new/changed items and
0041  * deleted items. For an incremental sync there is no need to compare as the lists of
 * added/changed/deleted items are already given. 'IsRead' flag changes are treated
0043  * specially - the modification is trivial and is performed straight away without fetching item
0044  * details.
0045  *
0046  * The list of new/changed items is then used to perform a second remote request in order to
0047  * retrieve the details of these items. For e-mail items the second fetch only retrieves the
0048  * item headers. For other items the full MIME content is fetched.
0049  *
0050  * In case of an incremental sync the compare code checks if items marked as 'changed' or 'deleted'
 * exist in the Akonadi database. If not, an error is raised. This tells the resource class that
 * the incremental sync has failed due to an out-of-sync state and that a full sync is required
 * to bring things back in order.
0054  *
0055  * In addition to a regular sync it is sometimes necessary to check for existence of some specific
0056  * items. This happens when some operation failed and the resource tries to work its way around to
0057  * get Akonadi into a synchronous state after the failure. For this purpose the caller can provide
0058  * a list of item identifiers to look for besides the regular sync. If this list contains elements
0059  * another request (EwsGetItemRequest) is issued in parallel for the selected items. Once this
0060  * request returns the list is cross-checked with the server responses. If an item is found on the
0061  * server side it is added to the "new items" list. Otherwise it is added to the "deleted items"
0062  * list. In the latter case the code for determining if the deleted item is in the Akonadi database
0063  * will not raise an error for such deleted item. This helps to avoid unnecessary full syncs.
0064  *
0065  * The fetch item job also allows providing a list of expected item changes. This is necessary
0066  * because the incremental sync will return events for operations made by the resource itself. This
0067  * can lead to false failures when for example an item was deleted (i.e. does not exist in Akonadi
0068  * any more) and the incremental sync returns a delete event for this item. Typically this would
0069  * result in an error and force a full sync. Providing this list allows for this particular error
0070  * to be safely ignored.
0071  */
0072 
0073 EwsFetchItemsJob::EwsFetchItemsJob(const Collection &collection,
0074                                    EwsClient &client,
0075                                    const QString &syncState,
0076                                    const EwsId::List &itemsToCheck,
0077                                    EwsTagStore *tagStore,
0078                                    EwsResource *parent)
0079     : EwsJob(parent)
0080     , mCollection(collection)
0081     , mClient(client)
0082     , mItemsToCheck(itemsToCheck)
0083     , mPendingJobs(0)
0084     , mTotalItemsToFetch(0)
0085     , mTotalItemsFetched(0)
0086     , mSyncState(syncState)
0087     , mFullSync(syncState.isNull())
0088     , mTagStore(tagStore)
0089     , mTagsSynced(false)
0090 {
0091     qRegisterMetaType<EwsId::List>();
0092 }
0093 
0094 EwsFetchItemsJob::~EwsFetchItemsJob() = default;
0095 
0096 void EwsFetchItemsJob::start()
0097 {
0098     Q_EMIT reportStatus(AgentBase::Running, i18nc("@info:status", "Retrieving %1 item list", mCollection.name()));
0099     Q_EMIT reportPercent(0);
0100 
0101     /* Begin stage 1 - query item list from local and remote side. */
0102     auto syncItemsReq = new EwsSyncFolderItemsRequest(mClient, this);
0103     syncItemsReq->setFolderId(EwsId(mCollection.remoteId(), mCollection.remoteRevision()));
0104     EwsItemShape shape(EwsShapeIdOnly);
0105     shape << EwsResource::tagsProperty;
0106     syncItemsReq->setItemShape(shape);
0107     if (!mSyncState.isNull()) {
0108         syncItemsReq->setSyncState(mSyncState);
0109     }
0110     syncItemsReq->setMaxChanges(listBatchSize);
0111     connect(syncItemsReq, &EwsSyncFolderItemsRequest::result, this, &EwsFetchItemsJob::remoteItemFetchDone);
0112     addSubjob(syncItemsReq);
0113 
0114     auto itemJob = new ItemFetchJob(mCollection);
0115     ItemFetchScope itemScope;
0116     itemScope.setCacheOnly(true);
0117     itemScope.fetchFullPayload(false);
0118     itemJob->setFetchScope(itemScope);
0119     connect(itemJob, &ItemFetchJob::result, this, &EwsFetchItemsJob::localItemFetchDone);
0120     addSubjob(itemJob);
0121 
0122     mPendingJobs = 2;
0123     syncItemsReq->start();
0124     itemJob->start();
0125 
0126     if (!mItemsToCheck.isEmpty()) {
0127         auto getItemReq = new EwsGetItemRequest(mClient, this);
0128         getItemReq->setItemIds(mItemsToCheck);
0129         getItemReq->setItemShape(EwsItemShape(EwsShapeIdOnly));
0130         connect(getItemReq, &EwsGetItemRequest::result, this, &EwsFetchItemsJob::checkedItemsFetchFinished);
0131         ++mPendingJobs;
0132         getItemReq->start();
0133     }
0134 }
0135 
0136 void EwsFetchItemsJob::localItemFetchDone(KJob *job)
0137 {
0138     auto fetchJob = qobject_cast<ItemFetchJob *>(job);
0139 
0140     qCDebug(EWSRES_LOG) << "EwsFetchItemsJob::localItemFetchDone";
0141     if (!fetchJob) {
0142         setErrorMsg(QStringLiteral("Invalid item fetch job pointer."));
0143         doKill();
0144         emitResult();
0145         return;
0146     }
0147 
0148     if (!fetchJob->error()) {
0149         removeSubjob(job);
0150         mLocalItems = fetchJob->items();
0151         --mPendingJobs;
0152         if (mPendingJobs == 0) {
0153             compareItemLists();
0154         }
0155     }
0156 }
0157 
0158 void EwsFetchItemsJob::remoteItemFetchDone(KJob *job)
0159 {
0160     auto itemReq = qobject_cast<EwsSyncFolderItemsRequest *>(job);
0161 
0162     qCDebug(EWSRES_LOG) << "EwsFetchItemsJob::remoteItemFetchDone";
0163     if (!itemReq) {
0164         setErrorMsg(QStringLiteral("Invalid find item request pointer."));
0165         doKill();
0166         emitResult();
0167         return;
0168     }
0169 
0170     if (!itemReq->error()) {
0171         removeSubjob(job);
0172         const auto reqChanges{itemReq->changes()};
0173         for (const EwsSyncFolderItemsRequest::Change &change : reqChanges) {
0174             switch (change.type()) {
0175             case EwsSyncFolderItemsRequest::Create:
0176                 mRemoteAddedItems.append(change.item());
0177                 break;
0178             case EwsSyncFolderItemsRequest::Update:
0179                 mRemoteChangedItems.append(change.item());
0180                 break;
0181             case EwsSyncFolderItemsRequest::Delete:
0182                 mRemoteDeletedIds.append(change.itemId());
0183                 break;
0184             case EwsSyncFolderItemsRequest::ReadFlagChange:
0185                 mRemoteFlagChangedIds.insert(change.itemId(), change.isRead());
0186                 break;
0187             default:
0188                 break;
0189             }
0190         }
0191 
0192         if (!itemReq->includesLastItem()) {
0193             auto syncItemsReq = new EwsSyncFolderItemsRequest(mClient, this);
0194             syncItemsReq->setFolderId(EwsId(mCollection.remoteId(), mCollection.remoteRevision()));
0195             EwsItemShape shape(EwsShapeIdOnly);
0196             shape << EwsResource::tagsProperty;
0197             syncItemsReq->setItemShape(shape);
0198             syncItemsReq->setSyncState(itemReq->syncState());
0199             syncItemsReq->setMaxChanges(listBatchSize);
0200             connect(syncItemsReq, &KJob::result, this, &EwsFetchItemsJob::remoteItemFetchDone);
0201             addSubjob(syncItemsReq);
0202             syncItemsReq->start();
0203         } else {
0204             mSyncState = itemReq->syncState();
0205             --mPendingJobs;
0206             if (mPendingJobs == 0) {
0207                 compareItemLists();
0208             }
0209         }
0210         const auto totalItems = mRemoteAddedItems.size() + mRemoteChangedItems.size() + mRemoteDeletedIds.size() + mRemoteFlagChangedIds.size();
0211         if (!mLocalItems.empty()) {
0212             Q_EMIT reportPercent(qMin(totalItems * 50 / mLocalItems.size(), 50));
0213         }
0214         Q_EMIT reportStatus(AgentBase::Running, i18nc("@info:status", "Retrieving %1 item list (%2 items)", mCollection.name(), totalItems));
0215     } else {
0216         setEwsResponseCode(itemReq->ewsResponseCode());
0217         qCWarningNC(EWSRES_LOG) << QStringLiteral("EwsFetchItemsJob: Failed remote item sync");
0218     }
0219 }
0220 
0221 void EwsFetchItemsJob::checkedItemsFetchFinished(KJob *job)
0222 {
0223     auto req = qobject_cast<EwsGetItemRequest *>(job);
0224 
0225     if (!req) {
0226         setErrorMsg(QStringLiteral("Invalid item fetch job pointer."));
0227         doKill();
0228         emitResult();
0229         return;
0230     }
0231 
0232     if (!req->error()) {
0233         removeSubjob(job);
0234 
0235         Q_ASSERT(mItemsToCheck.size() == req->responses().size());
0236 
0237         EwsId::List::const_iterator it = mItemsToCheck.cbegin();
0238         const auto reqResponses{req->responses()};
0239         for (const EwsGetItemRequest::Response &resp : reqResponses) {
0240             if (resp.isSuccess()) {
0241                 qCDebugNC(EWSRES_LOG) << QStringLiteral("Checked item %1 found - readding").arg(ewsHash(it->id()));
0242                 mRemoteAddedItems.append(resp.item());
0243             } else {
0244                 qCDebugNC(EWSRES_LOG) << QStringLiteral("Checked item %1 not found - removing").arg(ewsHash(it->id()));
0245                 mRemoteDeletedIds.append(*it);
0246             }
0247             ++it;
0248         }
0249         --mPendingJobs;
0250         if (mPendingJobs == 0) {
0251             compareItemLists();
0252         }
0253     }
0254 }
0255 
0256 bool EwsFetchItemsJob::processIncrementalRemoteItemUpdates(const EwsItem::List &items,
0257                                                            QHash<QString, Item> &itemHash,
0258                                                            QHash<EwsItemType, Item::List> &toFetchItems)
0259 {
0260     for (const EwsItem &ewsItem : items) {
0261         auto id(ewsItem[EwsItemFieldItemId].value<EwsId>());
0262         auto it = itemHash.find(id.id());
0263         if (it == itemHash.end()) {
0264             setErrorMsg(QStringLiteral("Got update for item %1, but item not found in local store.").arg(ewsHash(id.id())));
0265             emitResult();
0266             return false;
0267         }
0268         const auto qitup = mQueuedUpdates[EwsModifiedEvent].find(id.id());
0269         if (qitup != mQueuedUpdates[EwsModifiedEvent].end() && *qitup == id.changeKey()) {
0270             qCDebugNC(EWSRES_LOG) << QStringLiteral("Match for queued modification of item %1").arg(ewsHash(id.id()));
0271             continue;
0272         }
0273         Item &item = *it;
0274         if (item.remoteRevision() == id.changeKey()) {
0275             qCDebugNC(EWSRES_LOG) << QStringLiteral("Matching change key for item %1 - not syncing").arg(ewsHash(id.id()));
0276             continue;
0277         }
0278         item.clearPayload();
0279         item.setRemoteRevision(id.changeKey());
0280         if (!mTagStore->readEwsProperties(item, ewsItem, mTagsSynced)) {
0281             qCDebugNC(EWSRES_LOG) << QStringLiteral("Missing tags encountered - forcing sync");
0282             syncTags();
0283             return false;
0284         }
0285         EwsItemType type = ewsItem.internalType();
0286         toFetchItems[type].append(item);
0287         ++mTotalItemsToFetch;
0288     }
0289 
0290     return true;
0291 }
0292 
0293 void EwsFetchItemsJob::compareItemLists()
0294 {
0295     /* Begin stage 2 - determine list of new/changed items and fetch details about them. */
0296 
0297     QHash<EwsItemType, Item::List> toFetchItems;
0298 
0299     QHash<QString, Item> itemHash;
0300     for (const Item &item : std::as_const(mLocalItems)) {
0301         itemHash.insert(item.remoteId(), item);
0302     }
0303 
0304     for (const EwsItem &ewsItem : std::as_const(mRemoteAddedItems)) {
0305         /* In case of a full sync all existing items appear as added on the remote side. Therefore
0306          * look for the item in the local list before creating a new copy. */
0307         auto id(ewsItem[EwsItemFieldItemId].value<EwsId>());
0308         QHash<QString, Item>::iterator it = itemHash.find(id.id());
0309         EwsItemType type = ewsItem.internalType();
0310         if (type == EwsItemTypeUnknown) {
0311             /* Ignore unknown items. */
0312             continue;
0313         }
0314         auto handler(EwsItemHandler::itemHandler(type));
0315         if (!handler) {
0316             /* Ignore items where no handler exists. */
0317             continue;
0318         }
0319         QString mimeType = handler->mimeType();
0320         if (it == itemHash.end()) {
0321             Item item(mimeType);
0322             item.setParentCollection(mCollection);
0323             auto id = ewsItem[EwsItemFieldItemId].value<EwsId>();
0324             item.setRemoteId(id.id());
0325             item.setRemoteRevision(id.changeKey());
0326             if (!mTagStore->readEwsProperties(item, ewsItem, mTagsSynced)) {
0327                 qCDebugNC(EWSRES_LOG) << QStringLiteral("Missing tags encountered - forcing sync");
0328                 syncTags();
0329                 return;
0330             }
0331             toFetchItems[type].append(item);
0332             ++mTotalItemsToFetch;
0333         } else {
0334             Item &item = *it;
0335             /* In case of a full sync even unchanged items appear as new. Compare the change keys
0336              * to determine if a fetch is needed. */
0337             if (item.remoteRevision() != id.changeKey()) {
0338                 item.clearPayload();
0339                 item.setRemoteRevision(id.changeKey());
0340                 if (!mTagStore->readEwsProperties(item, ewsItem, mTagsSynced)) {
0341                     qCDebugNC(EWSRES_LOG) << QStringLiteral("Missing tags encountered - forcing sync");
0342                     syncTags();
0343                     return;
0344                 }
0345                 toFetchItems[type].append(item);
0346                 ++mTotalItemsToFetch;
0347             } else {
0348                 qCDebugNC(EWSRES_LOG) << QStringLiteral("Matching change key for item %1 - not syncing").arg(ewsHash(id.id()));
0349             }
0350             itemHash.erase(it);
0351         }
0352     }
0353 
0354     if (mFullSync) {
0355         /* In case of a full sync all items that are still on the local item list do not exist
0356          * remotely and need to be deleted locally. */
0357         const QHash<QString, Item>::iterator end(itemHash.end());
0358         for (QHash<QString, Item>::iterator it = itemHash.begin(); it != end; ++it) {
0359             mDeletedItems.append(it.value());
0360         }
0361     } else {
0362         if (!processIncrementalRemoteItemUpdates(mRemoteChangedItems, itemHash, toFetchItems)) {
0363             return;
0364         }
0365 
0366         // In case of an incremental sync deleted items will be given explicitly. */
0367         for (const EwsId &id : std::as_const(mRemoteDeletedIds)) {
0368             QHash<QString, Item>::iterator it = itemHash.find(id.id());
0369             if (it == itemHash.end()) {
0370                 /* If an item is not found locally, it can mean two things:
0371                  *  1. The item got deleted earlier without the resource being told about it.
0372                  *  2. The item was never known by Akonadi due to a sync problem.
0373                  * Either way the item doesn't exist any more and there is no point crying about it. */
0374                 qCDebugNC(EWSRES_LOG) << QStringLiteral("Got delete for item %1, but item not found in local store.").arg(ewsHash(id.id()));
0375             } else {
0376                 mDeletedItems.append(*it);
0377             }
0378         }
0379 
0380         QHash<EwsId, bool>::const_iterator it;
0381         EwsItemHandler *handler = EwsItemHandler::itemHandler(EwsItemTypeMessage);
0382         for (it = mRemoteFlagChangedIds.cbegin(); it != mRemoteFlagChangedIds.cend(); ++it) {
0383             QHash<QString, Item>::iterator iit = itemHash.find(it.key().id());
0384             if (iit == itemHash.end()) {
0385                 setErrorMsg(QStringLiteral("Got read flag change for item %1, but item not found in local store.").arg(it.key().id()));
0386                 emitResult();
0387                 return;
0388             }
0389             Item &item = *iit;
0390             handler->setSeenFlag(item, it.value());
0391             mChangedItems.append(item);
0392             itemHash.erase(iit);
0393         }
0394     }
0395 
0396     qCDebugNC(EWSRES_LOG)
0397         << QStringLiteral("Changed %2, deleted %3, new %4").arg(mRemoteChangedItems.size()).arg(mDeletedItems.size()).arg(mRemoteAddedItems.size());
0398 
0399     Q_EMIT reportStatus(AgentBase::Running, i18nc("@info:status", "Retrieving %1 items", mCollection.name()));
0400 
0401     bool fetch = false;
0402     for (const auto &iType : toFetchItems.keys()) {
0403         for (int i = 0; i < toFetchItems[iType].size(); i += fetchBatchSize) {
0404             EwsItemHandler *handler = EwsItemHandler::itemHandler(static_cast<EwsItemType>(iType));
0405             if (!handler) {
0406                 // TODO: Temporarily ignore unsupported item types.
0407                 qCWarning(EWSRES_LOG) << QStringLiteral("Unable to initialize fetch for item type %1").arg(iType);
0408                 /*setErrorMsg(QStringLiteral("Unable to initialize fetch for item type %1").arg(iType));
0409                 emitResult();
0410                 return;*/
0411             } else {
0412                 EwsFetchItemDetailJob *job = handler->fetchItemDetailJob(mClient, this, mCollection);
0413                 Item::List itemList = toFetchItems[iType].mid(i, fetchBatchSize);
0414                 job->setItemLists(itemList, &mDeletedItems);
0415                 connect(job, &KJob::result, this, &EwsFetchItemsJob::itemDetailFetchDone);
0416                 addSubjob(job);
0417                 fetch = true;
0418             }
0419         }
0420     }
0421     if (!fetch) {
0422         // Nothing to fetch - we're done here.
0423         emitResult();
0424     } else {
0425         subjobs().first()->start();
0426     }
0427 }
0428 
0429 void EwsFetchItemsJob::itemDetailFetchDone(KJob *job)
0430 {
0431     const auto detailJob = qobject_cast<EwsFetchItemDetailJob *>(job);
0432     if (detailJob) {
0433         qCWarningNC(EWSRES_LOG) << QStringLiteral("itemDetailFetchDone: ") << detailJob->error();
0434         if (!detailJob->error()) {
0435             removeSubjob(job);
0436 
0437             const auto changedItems = detailJob->changedItems();
0438             for (const auto &item : changedItems) {
0439                 if (item.isValid()) {
0440                     mChangedItems.append(item);
0441                 } else {
0442                     mNewItems.append(item);
0443                 }
0444             }
0445 
0446             mTotalItemsFetched = mChangedItems.size();
0447             Q_EMIT reportPercent(50 + (mTotalItemsFetched * 50) / mTotalItemsToFetch);
0448 
0449             if (subjobs().isEmpty()) {
0450                 emitResult();
0451             } else {
0452                 subjobs().first()->start();
0453             }
0454         } else {
0455             setEwsResponseCode(detailJob->ewsResponseCode());
0456         }
0457     }
0458 }
0459 
0460 void EwsFetchItemsJob::setQueuedUpdates(const QueuedUpdateList &updates)
0461 {
0462     mQueuedUpdates.clear();
0463     for (const QueuedUpdate &upd : updates) {
0464         mQueuedUpdates[upd.type].insert(upd.id, upd.changeKey);
0465         qCDebugNC(EWSRES_LOG) << QStringLiteral("Queued update %1 for item %2").arg(upd.type).arg(ewsHash(upd.id));
0466     }
0467 }
0468 
0469 void EwsFetchItemsJob::syncTags()
0470 {
0471     if (mTagsSynced) {
0472         setErrorMsg(QStringLiteral("Missing tags encountered despite previous sync."));
0473         emitResult();
0474     } else {
0475         auto job = new EwsAkonadiTagsSyncJob(mTagStore, mClient, qobject_cast<EwsResource *>(parent())->rootCollection(), this);
0476         connect(job, &EwsAkonadiTagsSyncJob::result, this, &EwsFetchItemsJob::tagSyncFinished);
0477         job->start();
0478         mTagsSynced = true;
0479     }
0480 }
0481 
0482 void EwsFetchItemsJob::tagSyncFinished(KJob *job)
0483 {
0484     if (job->error()) {
0485         setErrorMsg(job->errorText());
0486         emitResult();
0487     } else {
0488         compareItemLists();
0489     }
0490 }
0491 
0492 #include "moc_ewsfetchitemsjob.cpp"