File indexing completed on 2024-06-02 05:20:58
0001 /* 0002 SPDX-FileCopyrightText: 2015-2020 Krzysztof Nowicki <krissn@op.pl> 0003 0004 SPDX-License-Identifier: LGPL-2.0-or-later 0005 */ 0006 0007 #include "ewsfetchitemsjob.h" 0008 0009 #include <KLocalizedString> 0010 0011 #include <Akonadi/AgentBase> 0012 #include <Akonadi/ItemFetchScope> 0013 0014 #include "ewsclient.h" 0015 #include "ewsfetchitemdetailjob.h" 0016 #include "ewsitemhandler.h" 0017 #include "ewsresource.h" 0018 #include "ewsresource_debug.h" 0019 #include "ewssyncfolderitemsrequest.h" 0020 #include "tags/ewsakonaditagssyncjob.h" 0021 #include "tags/ewstagstore.h" 0022 0023 using namespace Akonadi; 0024 0025 static constexpr int listBatchSize = 100; 0026 static constexpr int fetchBatchSize = 50; 0027 0028 /** 0029 * The fetch items job is processed in two stages. 0030 * 0031 * The first stage is to query the list of messages on the remote and local sides. For this purpose 0032 * an EwsSyncFolderItemsRequest is started to retrieve remote items (list of ids only) and an ItemFetchJob 0033 * is started to fetch local items (from cache only). Both of these jobs are started simultaneously. 0034 * 0035 * The second stage begins when both item list query jobs have finished. The goal of this stage is 0036 * to determine a list of items to fetch more details for. Since the EwsSyncFolderItemsRequest can 0037 * retrieve incremental information further processing depends on whether the sync request was a 0038 * full sync (no sync state) or an incremental sync. 0039 * 0040 * In case of a full sync both item lists are compared to determine lists of new/changed items and 0041 * deleted items. For an incremental sync there is no need to compare as the lists of 0042 * added/changed/deleted items are already given. 'IsRead' flag changes changes are treated 0043 * specially - the modification is trivial and is performed straight away without fetching item 0044 * details. 0045 * 0046 * The list of new/changed items is then used to perform a second remote request in order to 0047 * retrieve the details of these items. For e-mail items the second fetch only retrieves the 0048 * item headers. For other items the full MIME content is fetched. 0049 * 0050 * In case of an incremental sync the compare code checks if items marked as 'changed' or 'deleted' 0051 * exist in Akonadi database. If not an error is raised. This serves as an information to the 0052 * resource class that an incremental sync has failed due to an out-of-sync state and a full sync 0053 * is required to bring thing back to order. 0054 * 0055 * In addition to a regular sync it is sometimes necessary to check for existence of some specific 0056 * items. This happens when some operation failed and the resource tries to work its way around to 0057 * get Akonadi into a synchronous state after the failure. For this purpose the caller can provide 0058 * a list of item identifiers to look for besides the regular sync. If this list contains elements 0059 * another request (EwsGetItemRequest) is issued in parallel for the selected items. Once this 0060 * request returns the list is cross-checked with the server responses. If an item is found on the 0061 * server side it is added to the "new items" list. Otherwise it is added to the "deleted items" 0062 * list. In the latter case the code for determining if the deleted item is in the Akonadi database 0063 * will not raise an error for such deleted item. This helps to avoid unnecessary full syncs. 0064 * 0065 * The fetch item job also allows providing a list of expected item changes. This is necessary 0066 * because the incremental sync will return events for operations made by the resource itself. This 0067 * can lead to false failures when for example an item was deleted (i.e. does not exist in Akonadi 0068 * any more) and the incremental sync returns a delete event for this item. Typically this would 0069 * result in an error and force a full sync. Providing this list allows for this particular error 0070 * to be safely ignored. 0071 */ 0072 0073 EwsFetchItemsJob::EwsFetchItemsJob(const Collection &collection, 0074 EwsClient &client, 0075 const QString &syncState, 0076 const EwsId::List &itemsToCheck, 0077 EwsTagStore *tagStore, 0078 EwsResource *parent) 0079 : EwsJob(parent) 0080 , mCollection(collection) 0081 , mClient(client) 0082 , mItemsToCheck(itemsToCheck) 0083 , mPendingJobs(0) 0084 , mTotalItemsToFetch(0) 0085 , mTotalItemsFetched(0) 0086 , mSyncState(syncState) 0087 , mFullSync(syncState.isNull()) 0088 , mTagStore(tagStore) 0089 , mTagsSynced(false) 0090 { 0091 qRegisterMetaType<EwsId::List>(); 0092 } 0093 0094 EwsFetchItemsJob::~EwsFetchItemsJob() = default; 0095 0096 void EwsFetchItemsJob::start() 0097 { 0098 Q_EMIT reportStatus(AgentBase::Running, i18nc("@info:status", "Retrieving %1 item list", mCollection.name())); 0099 Q_EMIT reportPercent(0); 0100 0101 /* Begin stage 1 - query item list from local and remote side. */ 0102 auto syncItemsReq = new EwsSyncFolderItemsRequest(mClient, this); 0103 syncItemsReq->setFolderId(EwsId(mCollection.remoteId(), mCollection.remoteRevision())); 0104 EwsItemShape shape(EwsShapeIdOnly); 0105 shape << EwsResource::tagsProperty; 0106 syncItemsReq->setItemShape(shape); 0107 if (!mSyncState.isNull()) { 0108 syncItemsReq->setSyncState(mSyncState); 0109 } 0110 syncItemsReq->setMaxChanges(listBatchSize); 0111 connect(syncItemsReq, &EwsSyncFolderItemsRequest::result, this, &EwsFetchItemsJob::remoteItemFetchDone); 0112 addSubjob(syncItemsReq); 0113 0114 auto itemJob = new ItemFetchJob(mCollection); 0115 ItemFetchScope itemScope; 0116 itemScope.setCacheOnly(true); 0117 itemScope.fetchFullPayload(false); 0118 itemJob->setFetchScope(itemScope); 0119 connect(itemJob, &ItemFetchJob::result, this, &EwsFetchItemsJob::localItemFetchDone); 0120 addSubjob(itemJob); 0121 0122 mPendingJobs = 2; 0123 syncItemsReq->start(); 0124 itemJob->start(); 0125 0126 if (!mItemsToCheck.isEmpty()) { 0127 auto getItemReq = new EwsGetItemRequest(mClient, this); 0128 getItemReq->setItemIds(mItemsToCheck); 0129 getItemReq->setItemShape(EwsItemShape(EwsShapeIdOnly)); 0130 connect(getItemReq, &EwsGetItemRequest::result, this, &EwsFetchItemsJob::checkedItemsFetchFinished); 0131 ++mPendingJobs; 0132 getItemReq->start(); 0133 } 0134 } 0135 0136 void EwsFetchItemsJob::localItemFetchDone(KJob *job) 0137 { 0138 auto fetchJob = qobject_cast<ItemFetchJob *>(job); 0139 0140 qCDebug(EWSRES_LOG) << "EwsFetchItemsJob::localItemFetchDone"; 0141 if (!fetchJob) { 0142 setErrorMsg(QStringLiteral("Invalid item fetch job pointer.")); 0143 doKill(); 0144 emitResult(); 0145 return; 0146 } 0147 0148 if (!fetchJob->error()) { 0149 removeSubjob(job); 0150 mLocalItems = fetchJob->items(); 0151 --mPendingJobs; 0152 if (mPendingJobs == 0) { 0153 compareItemLists(); 0154 } 0155 } 0156 } 0157 0158 void EwsFetchItemsJob::remoteItemFetchDone(KJob *job) 0159 { 0160 auto itemReq = qobject_cast<EwsSyncFolderItemsRequest *>(job); 0161 0162 qCDebug(EWSRES_LOG) << "EwsFetchItemsJob::remoteItemFetchDone"; 0163 if (!itemReq) { 0164 setErrorMsg(QStringLiteral("Invalid find item request pointer.")); 0165 doKill(); 0166 emitResult(); 0167 return; 0168 } 0169 0170 if (!itemReq->error()) { 0171 removeSubjob(job); 0172 const auto reqChanges{itemReq->changes()}; 0173 for (const EwsSyncFolderItemsRequest::Change &change : reqChanges) { 0174 switch (change.type()) { 0175 case EwsSyncFolderItemsRequest::Create: 0176 mRemoteAddedItems.append(change.item()); 0177 break; 0178 case EwsSyncFolderItemsRequest::Update: 0179 mRemoteChangedItems.append(change.item()); 0180 break; 0181 case EwsSyncFolderItemsRequest::Delete: 0182 mRemoteDeletedIds.append(change.itemId()); 0183 break; 0184 case EwsSyncFolderItemsRequest::ReadFlagChange: 0185 mRemoteFlagChangedIds.insert(change.itemId(), change.isRead()); 0186 break; 0187 default: 0188 break; 0189 } 0190 } 0191 0192 if (!itemReq->includesLastItem()) { 0193 auto syncItemsReq = new EwsSyncFolderItemsRequest(mClient, this); 0194 syncItemsReq->setFolderId(EwsId(mCollection.remoteId(), mCollection.remoteRevision())); 0195 EwsItemShape shape(EwsShapeIdOnly); 0196 shape << EwsResource::tagsProperty; 0197 syncItemsReq->setItemShape(shape); 0198 syncItemsReq->setSyncState(itemReq->syncState()); 0199 syncItemsReq->setMaxChanges(listBatchSize); 0200 connect(syncItemsReq, &KJob::result, this, &EwsFetchItemsJob::remoteItemFetchDone); 0201 addSubjob(syncItemsReq); 0202 syncItemsReq->start(); 0203 } else { 0204 mSyncState = itemReq->syncState(); 0205 --mPendingJobs; 0206 if (mPendingJobs == 0) { 0207 compareItemLists(); 0208 } 0209 } 0210 const auto totalItems = mRemoteAddedItems.size() + mRemoteChangedItems.size() + mRemoteDeletedIds.size() + mRemoteFlagChangedIds.size(); 0211 if (!mLocalItems.empty()) { 0212 Q_EMIT reportPercent(qMin(totalItems * 50 / mLocalItems.size(), 50)); 0213 } 0214 Q_EMIT reportStatus(AgentBase::Running, i18nc("@info:status", "Retrieving %1 item list (%2 items)", mCollection.name(), totalItems)); 0215 } else { 0216 setEwsResponseCode(itemReq->ewsResponseCode()); 0217 qCWarningNC(EWSRES_LOG) << QStringLiteral("EwsFetchItemsJob: Failed remote item sync"); 0218 } 0219 } 0220 0221 void EwsFetchItemsJob::checkedItemsFetchFinished(KJob *job) 0222 { 0223 auto req = qobject_cast<EwsGetItemRequest *>(job); 0224 0225 if (!req) { 0226 setErrorMsg(QStringLiteral("Invalid item fetch job pointer.")); 0227 doKill(); 0228 emitResult(); 0229 return; 0230 } 0231 0232 if (!req->error()) { 0233 removeSubjob(job); 0234 0235 Q_ASSERT(mItemsToCheck.size() == req->responses().size()); 0236 0237 EwsId::List::const_iterator it = mItemsToCheck.cbegin(); 0238 const auto reqResponses{req->responses()}; 0239 for (const EwsGetItemRequest::Response &resp : reqResponses) { 0240 if (resp.isSuccess()) { 0241 qCDebugNC(EWSRES_LOG) << QStringLiteral("Checked item %1 found - readding").arg(ewsHash(it->id())); 0242 mRemoteAddedItems.append(resp.item()); 0243 } else { 0244 qCDebugNC(EWSRES_LOG) << QStringLiteral("Checked item %1 not found - removing").arg(ewsHash(it->id())); 0245 mRemoteDeletedIds.append(*it); 0246 } 0247 ++it; 0248 } 0249 --mPendingJobs; 0250 if (mPendingJobs == 0) { 0251 compareItemLists(); 0252 } 0253 } 0254 } 0255 0256 bool EwsFetchItemsJob::processIncrementalRemoteItemUpdates(const EwsItem::List &items, 0257 QHash<QString, Item> &itemHash, 0258 QHash<EwsItemType, Item::List> &toFetchItems) 0259 { 0260 for (const EwsItem &ewsItem : items) { 0261 auto id(ewsItem[EwsItemFieldItemId].value<EwsId>()); 0262 auto it = itemHash.find(id.id()); 0263 if (it == itemHash.end()) { 0264 setErrorMsg(QStringLiteral("Got update for item %1, but item not found in local store.").arg(ewsHash(id.id()))); 0265 emitResult(); 0266 return false; 0267 } 0268 const auto qitup = mQueuedUpdates[EwsModifiedEvent].find(id.id()); 0269 if (qitup != mQueuedUpdates[EwsModifiedEvent].end() && *qitup == id.changeKey()) { 0270 qCDebugNC(EWSRES_LOG) << QStringLiteral("Match for queued modification of item %1").arg(ewsHash(id.id())); 0271 continue; 0272 } 0273 Item &item = *it; 0274 if (item.remoteRevision() == id.changeKey()) { 0275 qCDebugNC(EWSRES_LOG) << QStringLiteral("Matching change key for item %1 - not syncing").arg(ewsHash(id.id())); 0276 continue; 0277 } 0278 item.clearPayload(); 0279 item.setRemoteRevision(id.changeKey()); 0280 if (!mTagStore->readEwsProperties(item, ewsItem, mTagsSynced)) { 0281 qCDebugNC(EWSRES_LOG) << QStringLiteral("Missing tags encountered - forcing sync"); 0282 syncTags(); 0283 return false; 0284 } 0285 EwsItemType type = ewsItem.internalType(); 0286 toFetchItems[type].append(item); 0287 ++mTotalItemsToFetch; 0288 } 0289 0290 return true; 0291 } 0292 0293 void EwsFetchItemsJob::compareItemLists() 0294 { 0295 /* Begin stage 2 - determine list of new/changed items and fetch details about them. */ 0296 0297 QHash<EwsItemType, Item::List> toFetchItems; 0298 0299 QHash<QString, Item> itemHash; 0300 for (const Item &item : std::as_const(mLocalItems)) { 0301 itemHash.insert(item.remoteId(), item); 0302 } 0303 0304 for (const EwsItem &ewsItem : std::as_const(mRemoteAddedItems)) { 0305 /* In case of a full sync all existing items appear as added on the remote side. Therefore 0306 * look for the item in the local list before creating a new copy. */ 0307 auto id(ewsItem[EwsItemFieldItemId].value<EwsId>()); 0308 QHash<QString, Item>::iterator it = itemHash.find(id.id()); 0309 EwsItemType type = ewsItem.internalType(); 0310 if (type == EwsItemTypeUnknown) { 0311 /* Ignore unknown items. */ 0312 continue; 0313 } 0314 auto handler(EwsItemHandler::itemHandler(type)); 0315 if (!handler) { 0316 /* Ignore items where no handler exists. */ 0317 continue; 0318 } 0319 QString mimeType = handler->mimeType(); 0320 if (it == itemHash.end()) { 0321 Item item(mimeType); 0322 item.setParentCollection(mCollection); 0323 auto id = ewsItem[EwsItemFieldItemId].value<EwsId>(); 0324 item.setRemoteId(id.id()); 0325 item.setRemoteRevision(id.changeKey()); 0326 if (!mTagStore->readEwsProperties(item, ewsItem, mTagsSynced)) { 0327 qCDebugNC(EWSRES_LOG) << QStringLiteral("Missing tags encountered - forcing sync"); 0328 syncTags(); 0329 return; 0330 } 0331 toFetchItems[type].append(item); 0332 ++mTotalItemsToFetch; 0333 } else { 0334 Item &item = *it; 0335 /* In case of a full sync even unchanged items appear as new. Compare the change keys 0336 * to determine if a fetch is needed. */ 0337 if (item.remoteRevision() != id.changeKey()) { 0338 item.clearPayload(); 0339 item.setRemoteRevision(id.changeKey()); 0340 if (!mTagStore->readEwsProperties(item, ewsItem, mTagsSynced)) { 0341 qCDebugNC(EWSRES_LOG) << QStringLiteral("Missing tags encountered - forcing sync"); 0342 syncTags(); 0343 return; 0344 } 0345 toFetchItems[type].append(item); 0346 ++mTotalItemsToFetch; 0347 } else { 0348 qCDebugNC(EWSRES_LOG) << QStringLiteral("Matching change key for item %1 - not syncing").arg(ewsHash(id.id())); 0349 } 0350 itemHash.erase(it); 0351 } 0352 } 0353 0354 if (mFullSync) { 0355 /* In case of a full sync all items that are still on the local item list do not exist 0356 * remotely and need to be deleted locally. */ 0357 const QHash<QString, Item>::iterator end(itemHash.end()); 0358 for (QHash<QString, Item>::iterator it = itemHash.begin(); it != end; ++it) { 0359 mDeletedItems.append(it.value()); 0360 } 0361 } else { 0362 if (!processIncrementalRemoteItemUpdates(mRemoteChangedItems, itemHash, toFetchItems)) { 0363 return; 0364 } 0365 0366 // In case of an incremental sync deleted items will be given explicitly. */ 0367 for (const EwsId &id : std::as_const(mRemoteDeletedIds)) { 0368 QHash<QString, Item>::iterator it = itemHash.find(id.id()); 0369 if (it == itemHash.end()) { 0370 /* If an item is not found locally, it can mean two things: 0371 * 1. The item got deleted earlier without the resource being told about it. 0372 * 2. The item was never known by Akonadi due to a sync problem. 0373 * Either way the item doesn't exist any more and there is no point crying about it. */ 0374 qCDebugNC(EWSRES_LOG) << QStringLiteral("Got delete for item %1, but item not found in local store.").arg(ewsHash(id.id())); 0375 } else { 0376 mDeletedItems.append(*it); 0377 } 0378 } 0379 0380 QHash<EwsId, bool>::const_iterator it; 0381 EwsItemHandler *handler = EwsItemHandler::itemHandler(EwsItemTypeMessage); 0382 for (it = mRemoteFlagChangedIds.cbegin(); it != mRemoteFlagChangedIds.cend(); ++it) { 0383 QHash<QString, Item>::iterator iit = itemHash.find(it.key().id()); 0384 if (iit == itemHash.end()) { 0385 setErrorMsg(QStringLiteral("Got read flag change for item %1, but item not found in local store.").arg(it.key().id())); 0386 emitResult(); 0387 return; 0388 } 0389 Item &item = *iit; 0390 handler->setSeenFlag(item, it.value()); 0391 mChangedItems.append(item); 0392 itemHash.erase(iit); 0393 } 0394 } 0395 0396 qCDebugNC(EWSRES_LOG) 0397 << QStringLiteral("Changed %2, deleted %3, new %4").arg(mRemoteChangedItems.size()).arg(mDeletedItems.size()).arg(mRemoteAddedItems.size()); 0398 0399 Q_EMIT reportStatus(AgentBase::Running, i18nc("@info:status", "Retrieving %1 items", mCollection.name())); 0400 0401 bool fetch = false; 0402 for (const auto &iType : toFetchItems.keys()) { 0403 for (int i = 0; i < toFetchItems[iType].size(); i += fetchBatchSize) { 0404 EwsItemHandler *handler = EwsItemHandler::itemHandler(static_cast<EwsItemType>(iType)); 0405 if (!handler) { 0406 // TODO: Temporarily ignore unsupported item types. 0407 qCWarning(EWSRES_LOG) << QStringLiteral("Unable to initialize fetch for item type %1").arg(iType); 0408 /*setErrorMsg(QStringLiteral("Unable to initialize fetch for item type %1").arg(iType)); 0409 emitResult(); 0410 return;*/ 0411 } else { 0412 EwsFetchItemDetailJob *job = handler->fetchItemDetailJob(mClient, this, mCollection); 0413 Item::List itemList = toFetchItems[iType].mid(i, fetchBatchSize); 0414 job->setItemLists(itemList, &mDeletedItems); 0415 connect(job, &KJob::result, this, &EwsFetchItemsJob::itemDetailFetchDone); 0416 addSubjob(job); 0417 fetch = true; 0418 } 0419 } 0420 } 0421 if (!fetch) { 0422 // Nothing to fetch - we're done here. 0423 emitResult(); 0424 } else { 0425 subjobs().first()->start(); 0426 } 0427 } 0428 0429 void EwsFetchItemsJob::itemDetailFetchDone(KJob *job) 0430 { 0431 const auto detailJob = qobject_cast<EwsFetchItemDetailJob *>(job); 0432 if (detailJob) { 0433 qCWarningNC(EWSRES_LOG) << QStringLiteral("itemDetailFetchDone: ") << detailJob->error(); 0434 if (!detailJob->error()) { 0435 removeSubjob(job); 0436 0437 const auto changedItems = detailJob->changedItems(); 0438 for (const auto &item : changedItems) { 0439 if (item.isValid()) { 0440 mChangedItems.append(item); 0441 } else { 0442 mNewItems.append(item); 0443 } 0444 } 0445 0446 mTotalItemsFetched = mChangedItems.size(); 0447 Q_EMIT reportPercent(50 + (mTotalItemsFetched * 50) / mTotalItemsToFetch); 0448 0449 if (subjobs().isEmpty()) { 0450 emitResult(); 0451 } else { 0452 subjobs().first()->start(); 0453 } 0454 } else { 0455 setEwsResponseCode(detailJob->ewsResponseCode()); 0456 } 0457 } 0458 } 0459 0460 void EwsFetchItemsJob::setQueuedUpdates(const QueuedUpdateList &updates) 0461 { 0462 mQueuedUpdates.clear(); 0463 for (const QueuedUpdate &upd : updates) { 0464 mQueuedUpdates[upd.type].insert(upd.id, upd.changeKey); 0465 qCDebugNC(EWSRES_LOG) << QStringLiteral("Queued update %1 for item %2").arg(upd.type).arg(ewsHash(upd.id)); 0466 } 0467 } 0468 0469 void EwsFetchItemsJob::syncTags() 0470 { 0471 if (mTagsSynced) { 0472 setErrorMsg(QStringLiteral("Missing tags encountered despite previous sync.")); 0473 emitResult(); 0474 } else { 0475 auto job = new EwsAkonadiTagsSyncJob(mTagStore, mClient, qobject_cast<EwsResource *>(parent())->rootCollection(), this); 0476 connect(job, &EwsAkonadiTagsSyncJob::result, this, &EwsFetchItemsJob::tagSyncFinished); 0477 job->start(); 0478 mTagsSynced = true; 0479 } 0480 } 0481 0482 void EwsFetchItemsJob::tagSyncFinished(KJob *job) 0483 { 0484 if (job->error()) { 0485 setErrorMsg(job->errorText()); 0486 emitResult(); 0487 } else { 0488 compareItemLists(); 0489 } 0490 } 0491 0492 #include "moc_ewsfetchitemsjob.cpp"