File indexing completed on 2024-11-10 04:40:41
0001 /* 0002 SPDX-FileCopyrightText: 2007 Tobias Koenig <tokoe@kde.org> 0003 SPDX-FileCopyrightText: 2007 Volker Krause <vkrause@kde.org> 0004 SPDX-FileCopyrightText: 2014 Christian Mollekopf <mollekopf@kolabsys.com> 0005 0006 SPDX-License-Identifier: LGPL-2.0-or-later 0007 */ 0008 0009 #include "itemsync.h" 0010 0011 #include "collection.h" 0012 #include "item_p.h" 0013 #include "itemcreatejob.h" 0014 #include "itemdeletejob.h" 0015 #include "itemfetchjob.h" 0016 #include "itemfetchscope.h" 0017 #include "job_p.h" 0018 #include "protocol_p.h" 0019 #include "transactionsequence.h" 0020 0021 #include "akonadicore_debug.h" 0022 0023 using namespace Akonadi; 0024 0025 /** 0026 * @internal 0027 */ 0028 class Akonadi::ItemSyncPrivate : public JobPrivate 0029 { 0030 public: 0031 explicit ItemSyncPrivate(ItemSync *parent) 0032 : JobPrivate(parent) 0033 , mTransactionMode(ItemSync::SingleTransaction) 0034 , mCurrentTransaction(nullptr) 0035 , mTransactionJobs(0) 0036 , mPendingJobs(0) 0037 , mProgress(0) 0038 , mTotalItems(-1) 0039 , mTotalItemsProcessed(0) 0040 , mStreaming(false) 0041 , mIncremental(false) 0042 , mDeliveryDone(false) 0043 , mFinished(false) 0044 , mFullListingDone(false) 0045 , mProcessingBatch(false) 0046 , mDisableAutomaticDeliveryDone(false) 0047 , mBatchSize(10) 0048 , mMergeMode(Akonadi::ItemSync::RIDMerge) 0049 { 0050 } 0051 0052 void createOrMerge(const Item &item); 0053 void checkDone(); 0054 void slotItemsReceived(const Item::List &items); 0055 void slotLocalListDone(KJob *job); 0056 void slotLocalDeleteDone(KJob *job); 0057 void slotLocalChangeDone(KJob *job); 0058 void execute(); 0059 void processItems(); 0060 void processBatch(); 0061 void deleteItems(const Item::List &items); 0062 void slotTransactionResult(KJob *job); 0063 void requestTransaction(); 0064 Job *subjobParent() const; 0065 void fetchLocalItemsToDelete(); 0066 QString jobDebuggingString() const override; 0067 bool allProcessed() const; 0068 0069 Q_DECLARE_PUBLIC(ItemSync) 0070 Collection mSyncCollection; 0071 QSet<QString> mListedItems; 0072 0073 ItemSync::TransactionMode mTransactionMode; 0074 TransactionSequence *mCurrentTransaction = nullptr; 0075 int mTransactionJobs; 0076 0077 Akonadi::Item::List mRemoteItemQueue; 0078 Akonadi::Item::List mRemovedRemoteItemQueue; 0079 Akonadi::Item::List mCurrentBatchRemoteItems; 0080 Akonadi::Item::List mCurrentBatchRemovedRemoteItems; 0081 Akonadi::Item::List mItemsToDelete; 0082 0083 QDateTime mItemSyncStart; 0084 0085 // create counter 0086 int mPendingJobs; 0087 int mProgress; 0088 int mTotalItems; 0089 int mTotalItemsProcessed; 0090 0091 bool mStreaming; 0092 bool mIncremental; 0093 bool mDeliveryDone; 0094 bool mFinished; 0095 bool mFullListingDone; 0096 bool mProcessingBatch; 0097 bool mDisableAutomaticDeliveryDone; 0098 0099 int mBatchSize; 0100 Akonadi::ItemSync::MergeMode mMergeMode; 0101 }; 0102 0103 void ItemSyncPrivate::createOrMerge(const Item &item) 0104 { 0105 Q_Q(ItemSync); 0106 // don't try to do anything in error state 0107 if (q->error()) { 0108 return; 0109 } 0110 mPendingJobs++; 0111 Item modifiedItem = item; 0112 if (mItemSyncStart.isValid()) { 0113 modifiedItem.setModificationTime(mItemSyncStart); 0114 } 0115 auto create = new ItemCreateJob(modifiedItem, mSyncCollection, subjobParent()); 0116 ItemCreateJob::MergeOptions merge = ItemCreateJob::Silent; 0117 if (mMergeMode == ItemSync::GIDMerge && !item.gid().isEmpty()) { 0118 merge |= ItemCreateJob::GID; 0119 } else { 0120 merge |= ItemCreateJob::RID; 0121 } 0122 create->setMerge(merge); 0123 q->connect(create, &ItemCreateJob::result, q, [this](KJob *job) { 0124 slotLocalChangeDone(job); 0125 }); 0126 } 0127 0128 bool ItemSyncPrivate::allProcessed() const 0129 { 0130 return mDeliveryDone && mCurrentBatchRemoteItems.isEmpty() && mRemoteItemQueue.isEmpty() && mRemovedRemoteItemQueue.isEmpty() 0131 && mCurrentBatchRemovedRemoteItems.isEmpty(); 0132 } 0133 0134 void ItemSyncPrivate::checkDone() 0135 { 0136 Q_Q(ItemSync); 0137 q->setProcessedAmount(KJob::Bytes, mProgress); 0138 if (mPendingJobs > 0) { 0139 return; 0140 } 0141 0142 if (mTransactionJobs > 0) { 0143 // Commit the current transaction if we're in batch processing mode or done 0144 // and wait until the transaction is committed to process the next batch 0145 if (mTransactionMode == ItemSync::MultipleTransactions || (mDeliveryDone && mRemoteItemQueue.isEmpty())) { 0146 if (mCurrentTransaction) { 0147 // Note that mCurrentTransaction->commit() is a no-op if we're already rolling back 0148 // so this signal is a bit misleading (but it's only used by unittests it seems) 0149 Q_EMIT q->transactionCommitted(); 0150 mCurrentTransaction->commit(); 0151 mCurrentTransaction = nullptr; 0152 } 0153 return; 0154 } 0155 } 0156 mProcessingBatch = false; 0157 0158 if (q->error() == Job::UserCanceled && mTransactionJobs == 0 && !mFinished) { 0159 qCDebug(AKONADICORE_LOG) << "ItemSync of collection" << mSyncCollection.id() << "finished due to user cancelling"; 0160 mFinished = true; 0161 q->emitResult(); 0162 return; 0163 } 0164 0165 if (!mRemoteItemQueue.isEmpty()) { 0166 execute(); 0167 // We don't have enough items, request more 0168 if (!mProcessingBatch) { 0169 Q_EMIT q->readyForNextBatch(mBatchSize - mRemoteItemQueue.size()); 0170 } 0171 return; 0172 } 0173 Q_EMIT q->readyForNextBatch(mBatchSize); 0174 0175 if (allProcessed() && !mFinished) { 0176 // prevent double result emission, can happen since checkDone() is called from all over the place 0177 qCDebug(AKONADICORE_LOG) << "ItemSync of collection" << mSyncCollection.id() << "finished"; 0178 mFinished = true; 0179 q->emitResult(); 0180 } 0181 } 0182 0183 ItemSync::ItemSync(const Collection &collection, const QDateTime ×tamp, QObject *parent) 0184 : Job(new ItemSyncPrivate(this), parent) 0185 { 0186 qCDebug(AKONADICORE_LOG) << "Created ItemSync(colId=" << collection.id() << ", timestamp=" << timestamp << ")"; 0187 Q_D(ItemSync); 0188 d->mSyncCollection = collection; 0189 if (timestamp.isValid()) { 0190 d->mItemSyncStart = timestamp; 0191 } 0192 } 0193 0194 ItemSync::~ItemSync() 0195 { 0196 } 0197 0198 void ItemSync::setFullSyncItems(const Item::List &items) 0199 { 0200 /* 0201 * We received a list of items from the server: 0202 * * fetch all local id's + rid's only 0203 * * check each full sync item whether it's locally available 0204 * * if it is modify the item 0205 * * if it's not create it 0206 * * delete all superfluous items 0207 */ 0208 Q_D(ItemSync); 0209 Q_ASSERT(!d->mIncremental); 0210 if (!d->mStreaming) { 0211 d->mDeliveryDone = true; 0212 } 0213 d->mRemoteItemQueue += items; 0214 d->mTotalItemsProcessed += items.count(); 0215 qCDebug(AKONADICORE_LOG) << "Received batch: " << items.count() << "Already processed: " << d->mTotalItemsProcessed 0216 << "Expected total amount: " << d->mTotalItems; 0217 if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItemsProcessed == d->mTotalItems)) { 0218 d->mDeliveryDone = true; 0219 } 0220 d->execute(); 0221 } 0222 0223 void ItemSync::setTotalItems(int amount) 0224 { 0225 Q_D(ItemSync); 0226 Q_ASSERT(!d->mIncremental); 0227 Q_ASSERT(amount >= 0); 0228 setStreamingEnabled(true); 0229 qCDebug(AKONADICORE_LOG) << "Expected total amount:" << amount; 0230 d->mTotalItems = amount; 0231 setTotalAmount(KJob::Bytes, amount); 0232 if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItems == 0)) { 0233 d->mDeliveryDone = true; 0234 d->execute(); 0235 } 0236 } 0237 0238 void ItemSync::setDisableAutomaticDeliveryDone(bool disable) 0239 { 0240 Q_D(ItemSync); 0241 d->mDisableAutomaticDeliveryDone = disable; 0242 } 0243 0244 void ItemSync::setIncrementalSyncItems(const Item::List &changedItems, const Item::List &removedItems) 0245 { 0246 /* 0247 * We received an incremental listing of items: 0248 * * for each changed item: 0249 * ** If locally available => modify 0250 * ** else => create 0251 * * removed items can be removed right away 0252 */ 0253 Q_D(ItemSync); 0254 d->mIncremental = true; 0255 if (!d->mStreaming) { 0256 d->mDeliveryDone = true; 0257 } 0258 d->mRemoteItemQueue += changedItems; 0259 d->mRemovedRemoteItemQueue += removedItems; 0260 d->mTotalItemsProcessed += changedItems.count() + removedItems.count(); 0261 qCDebug(AKONADICORE_LOG) << "Received: " << changedItems.count() << "Removed: " << removedItems.count() << "In total: " << d->mTotalItemsProcessed 0262 << " Wanted: " << d->mTotalItems; 0263 if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItemsProcessed == d->mTotalItems)) { 0264 d->mDeliveryDone = true; 0265 } 0266 d->execute(); 0267 } 0268 0269 void ItemSync::doStart() 0270 { 0271 } 0272 0273 void ItemSyncPrivate::fetchLocalItemsToDelete() 0274 { 0275 Q_Q(ItemSync); 0276 if (mIncremental) { 0277 qFatal("This must not be called while in incremental mode"); 0278 return; 0279 } 0280 auto job = new ItemFetchJob(mSyncCollection, subjobParent()); 0281 job->fetchScope().setFetchRemoteIdentification(true); 0282 job->fetchScope().setFetchModificationTime(false); 0283 job->setDeliveryOption(ItemFetchJob::EmitItemsIndividually); 0284 // we only can fetch parts already in the cache, otherwise this will deadlock 0285 job->fetchScope().setCacheOnly(true); 0286 0287 QObject::connect(job, &ItemFetchJob::itemsReceived, q, [this](const Akonadi::Item::List &lst) { 0288 slotItemsReceived(lst); 0289 }); 0290 QObject::connect(job, &ItemFetchJob::result, q, [this](KJob *job) { 0291 slotLocalListDone(job); 0292 }); 0293 mPendingJobs++; 0294 } 0295 0296 void ItemSyncPrivate::slotItemsReceived(const Item::List &items) 0297 { 0298 for (const Akonadi::Item &item : items) { 0299 // Don't delete items that have not yet been synchronized 0300 if (item.remoteId().isEmpty()) { 0301 continue; 0302 } 0303 if (!mListedItems.contains(item.remoteId())) { 0304 mItemsToDelete << Item(item.id()); 0305 } 0306 } 0307 } 0308 0309 void ItemSyncPrivate::slotLocalListDone(KJob *job) 0310 { 0311 mPendingJobs--; 0312 if (job->error()) { 0313 qCWarning(AKONADICORE_LOG) << job->errorString(); 0314 } 0315 deleteItems(mItemsToDelete); 0316 checkDone(); 0317 } 0318 0319 QString ItemSyncPrivate::jobDebuggingString() const 0320 { 0321 // TODO: also print out mIncremental and mTotalItemsProcessed, but they are set after the job 0322 // started, so this requires passing jobDebuggingString to jobEnded(). 0323 return QStringLiteral("Collection %1 (%2)").arg(mSyncCollection.id()).arg(mSyncCollection.name()); 0324 } 0325 0326 void ItemSyncPrivate::execute() 0327 { 0328 // shouldn't happen 0329 if (mFinished) { 0330 qCWarning(AKONADICORE_LOG) << "Call to execute() on finished job."; 0331 Q_ASSERT(false); 0332 return; 0333 } 0334 // not doing anything, start processing 0335 if (!mProcessingBatch) { 0336 if (mRemoteItemQueue.size() >= mBatchSize || mDeliveryDone) { 0337 // we have a new batch to process 0338 const int num = qMin(mBatchSize, mRemoteItemQueue.size()); 0339 mCurrentBatchRemoteItems.reserve(mBatchSize); 0340 std::move(mRemoteItemQueue.begin(), mRemoteItemQueue.begin() + num, std::back_inserter(mCurrentBatchRemoteItems)); 0341 mRemoteItemQueue.erase(mRemoteItemQueue.begin(), mRemoteItemQueue.begin() + num); 0342 0343 mCurrentBatchRemovedRemoteItems += mRemovedRemoteItemQueue; 0344 mRemovedRemoteItemQueue.clear(); 0345 } else { 0346 // nothing to do, let's wait for more data 0347 return; 0348 } 0349 mProcessingBatch = true; 0350 processBatch(); 0351 return; 0352 } 0353 checkDone(); 0354 } 0355 0356 // process the current batch of items 0357 void ItemSyncPrivate::processBatch() 0358 { 0359 Q_Q(ItemSync); 0360 if (mCurrentBatchRemoteItems.isEmpty() && !mDeliveryDone) { 0361 return; 0362 } 0363 if (q->error() == Job::UserCanceled) { 0364 checkDone(); 0365 return; 0366 } 0367 0368 // request a transaction, there are items that require processing 0369 requestTransaction(); 0370 0371 processItems(); 0372 0373 // removed 0374 if (!mIncremental && allProcessed()) { 0375 // the full listing is done and we know which items to remove 0376 fetchLocalItemsToDelete(); 0377 } else { 0378 deleteItems(mCurrentBatchRemovedRemoteItems); 0379 mCurrentBatchRemovedRemoteItems.clear(); 0380 } 0381 0382 checkDone(); 0383 } 0384 0385 void ItemSyncPrivate::processItems() 0386 { 0387 // added / updated 0388 for (const Item &remoteItem : std::as_const(mCurrentBatchRemoteItems)) { 0389 if (remoteItem.remoteId().isEmpty()) { 0390 qCWarning(AKONADICORE_LOG) << "Item " << remoteItem.id() << " does not have a remote identifier"; 0391 continue; 0392 } 0393 if (!mIncremental) { 0394 mListedItems << remoteItem.remoteId(); 0395 } 0396 createOrMerge(remoteItem); 0397 } 0398 mCurrentBatchRemoteItems.clear(); 0399 } 0400 0401 void ItemSyncPrivate::deleteItems(const Item::List &itemsToDelete) 0402 { 0403 Q_Q(ItemSync); 0404 // if in error state, better not change anything anymore 0405 if (q->error()) { 0406 return; 0407 } 0408 0409 if (itemsToDelete.isEmpty()) { 0410 return; 0411 } 0412 0413 mPendingJobs++; 0414 auto job = new ItemDeleteJob(itemsToDelete, subjobParent()); 0415 q->connect(job, &ItemDeleteJob::result, q, [this](KJob *job) { 0416 slotLocalDeleteDone(job); 0417 }); 0418 0419 // It can happen that the groupware servers report us deleted items 0420 // twice, in this case this item delete job will fail on the second try. 0421 // To avoid a rollback of the complete transaction we gracefully allow the job 0422 // to fail :) 0423 auto transaction = qobject_cast<TransactionSequence *>(subjobParent()); 0424 if (transaction) { 0425 transaction->setIgnoreJobFailure(job); 0426 } 0427 } 0428 0429 void ItemSyncPrivate::slotLocalDeleteDone(KJob *job) 0430 { 0431 if (job->error()) { 0432 qCWarning(AKONADICORE_LOG) << "Deleting items from the akonadi database failed:" << job->errorString(); 0433 } 0434 mPendingJobs--; 0435 mProgress++; 0436 0437 checkDone(); 0438 } 0439 0440 void ItemSyncPrivate::slotLocalChangeDone(KJob *job) 0441 { 0442 if (job->error() && job->error() != Job::KilledJobError) { 0443 qCWarning(AKONADICORE_LOG) << "Creating/updating items from the akonadi database failed:" << job->errorString(); 0444 mRemoteItemQueue.clear(); // don't try to process any more items after a rollback 0445 } 0446 mPendingJobs--; 0447 mProgress++; 0448 0449 checkDone(); 0450 } 0451 0452 void ItemSyncPrivate::slotTransactionResult(KJob *job) 0453 { 0454 --mTransactionJobs; 0455 if (mCurrentTransaction == job) { 0456 mCurrentTransaction = nullptr; 0457 } 0458 0459 checkDone(); 0460 } 0461 0462 void ItemSyncPrivate::requestTransaction() 0463 { 0464 Q_Q(ItemSync); 0465 // we never want parallel transactions, single transaction just makes one big transaction, and multi transaction uses multiple transaction sequentially 0466 if (!mCurrentTransaction) { 0467 ++mTransactionJobs; 0468 mCurrentTransaction = new TransactionSequence(q); 0469 mCurrentTransaction->setAutomaticCommittingEnabled(false); 0470 QObject::connect(mCurrentTransaction, &TransactionSequence::result, q, [this](KJob *job) { 0471 slotTransactionResult(job); 0472 }); 0473 } 0474 } 0475 0476 Job *ItemSyncPrivate::subjobParent() const 0477 { 0478 Q_Q(const ItemSync); 0479 if (mCurrentTransaction && mTransactionMode != ItemSync::NoTransaction) { 0480 return mCurrentTransaction; 0481 } 0482 return const_cast<ItemSync *>(q); 0483 } 0484 0485 void ItemSync::setStreamingEnabled(bool enable) 0486 { 0487 Q_D(ItemSync); 0488 d->mStreaming = enable; 0489 } 0490 0491 void ItemSync::deliveryDone() 0492 { 0493 Q_D(ItemSync); 0494 Q_ASSERT(d->mStreaming); 0495 d->mDeliveryDone = true; 0496 d->execute(); 0497 } 0498 0499 void ItemSync::slotResult(KJob *job) 0500 { 0501 if (job->error()) { 0502 qCWarning(AKONADICORE_LOG) << "Error during ItemSync: " << job->errorString(); 0503 // pretend there were no errors 0504 Akonadi::Job::removeSubjob(job); 0505 // propagate the first error we got but continue, we might still be fed with stuff from a resource 0506 if (!error()) { 0507 setError(job->error()); 0508 setErrorText(job->errorText()); 0509 } 0510 } else { 0511 Akonadi::Job::slotResult(job); 0512 } 0513 } 0514 0515 void ItemSync::rollback() 0516 { 0517 Q_D(ItemSync); 0518 qCDebug(AKONADICORE_LOG) << "The item sync is being rolled-back."; 0519 setError(UserCanceled); 0520 if (d->mCurrentTransaction) { 0521 d->mCurrentTransaction->rollback(); 0522 } 0523 d->mDeliveryDone = true; // user won't deliver more data 0524 d->execute(); // end this in an ordered way, since we have an error set no real change will be done 0525 } 0526 0527 void ItemSync::setTransactionMode(ItemSync::TransactionMode mode) 0528 { 0529 Q_D(ItemSync); 0530 d->mTransactionMode = mode; 0531 } 0532 0533 int ItemSync::batchSize() const 0534 { 0535 Q_D(const ItemSync); 0536 return d->mBatchSize; 0537 } 0538 0539 void ItemSync::setBatchSize(int size) 0540 { 0541 Q_D(ItemSync); 0542 d->mBatchSize = size; 0543 } 0544 0545 ItemSync::MergeMode ItemSync::mergeMode() const 0546 { 0547 Q_D(const ItemSync); 0548 return d->mMergeMode; 0549 } 0550 0551 void ItemSync::setMergeMode(MergeMode mergeMode) 0552 { 0553 Q_D(ItemSync); 0554 d->mMergeMode = mergeMode; 0555 } 0556 0557 #include "moc_itemsync.cpp"