File indexing completed on 2024-11-10 04:40:41

0001 /*
0002     SPDX-FileCopyrightText: 2007 Tobias Koenig <tokoe@kde.org>
0003     SPDX-FileCopyrightText: 2007 Volker Krause <vkrause@kde.org>
0004     SPDX-FileCopyrightText: 2014 Christian Mollekopf <mollekopf@kolabsys.com>
0005 
0006     SPDX-License-Identifier: LGPL-2.0-or-later
0007 */
0008 
0009 #include "itemsync.h"
0010 
0011 #include "collection.h"
0012 #include "item_p.h"
0013 #include "itemcreatejob.h"
0014 #include "itemdeletejob.h"
0015 #include "itemfetchjob.h"
0016 #include "itemfetchscope.h"
0017 #include "job_p.h"
0018 #include "protocol_p.h"
0019 #include "transactionsequence.h"
0020 
0021 #include "akonadicore_debug.h"
0022 
0023 using namespace Akonadi;
0024 
0025 /**
0026  * @internal
0027  */
0028 class Akonadi::ItemSyncPrivate : public JobPrivate
0029 {
0030 public:
0031     explicit ItemSyncPrivate(ItemSync *parent)
0032         : JobPrivate(parent)
0033         , mTransactionMode(ItemSync::SingleTransaction)
0034         , mCurrentTransaction(nullptr)
0035         , mTransactionJobs(0)
0036         , mPendingJobs(0)
0037         , mProgress(0)
0038         , mTotalItems(-1)
0039         , mTotalItemsProcessed(0)
0040         , mStreaming(false)
0041         , mIncremental(false)
0042         , mDeliveryDone(false)
0043         , mFinished(false)
0044         , mFullListingDone(false)
0045         , mProcessingBatch(false)
0046         , mDisableAutomaticDeliveryDone(false)
0047         , mBatchSize(10)
0048         , mMergeMode(Akonadi::ItemSync::RIDMerge)
0049     {
0050     }
0051 
0052     void createOrMerge(const Item &item);
0053     void checkDone();
0054     void slotItemsReceived(const Item::List &items);
0055     void slotLocalListDone(KJob *job);
0056     void slotLocalDeleteDone(KJob *job);
0057     void slotLocalChangeDone(KJob *job);
0058     void execute();
0059     void processItems();
0060     void processBatch();
0061     void deleteItems(const Item::List &items);
0062     void slotTransactionResult(KJob *job);
0063     void requestTransaction();
0064     Job *subjobParent() const;
0065     void fetchLocalItemsToDelete();
0066     QString jobDebuggingString() const override;
0067     bool allProcessed() const;
0068 
0069     Q_DECLARE_PUBLIC(ItemSync)
0070     Collection mSyncCollection;
0071     QSet<QString> mListedItems;
0072 
0073     ItemSync::TransactionMode mTransactionMode;
0074     TransactionSequence *mCurrentTransaction = nullptr;
0075     int mTransactionJobs;
0076 
0077     Akonadi::Item::List mRemoteItemQueue;
0078     Akonadi::Item::List mRemovedRemoteItemQueue;
0079     Akonadi::Item::List mCurrentBatchRemoteItems;
0080     Akonadi::Item::List mCurrentBatchRemovedRemoteItems;
0081     Akonadi::Item::List mItemsToDelete;
0082 
0083     QDateTime mItemSyncStart;
0084 
0085     // create counter
0086     int mPendingJobs;
0087     int mProgress;
0088     int mTotalItems;
0089     int mTotalItemsProcessed;
0090 
0091     bool mStreaming;
0092     bool mIncremental;
0093     bool mDeliveryDone;
0094     bool mFinished;
0095     bool mFullListingDone;
0096     bool mProcessingBatch;
0097     bool mDisableAutomaticDeliveryDone;
0098 
0099     int mBatchSize;
0100     Akonadi::ItemSync::MergeMode mMergeMode;
0101 };
0102 
0103 void ItemSyncPrivate::createOrMerge(const Item &item)
0104 {
0105     Q_Q(ItemSync);
0106     // don't try to do anything in error state
0107     if (q->error()) {
0108         return;
0109     }
0110     mPendingJobs++;
0111     Item modifiedItem = item;
0112     if (mItemSyncStart.isValid()) {
0113         modifiedItem.setModificationTime(mItemSyncStart);
0114     }
0115     auto create = new ItemCreateJob(modifiedItem, mSyncCollection, subjobParent());
0116     ItemCreateJob::MergeOptions merge = ItemCreateJob::Silent;
0117     if (mMergeMode == ItemSync::GIDMerge && !item.gid().isEmpty()) {
0118         merge |= ItemCreateJob::GID;
0119     } else {
0120         merge |= ItemCreateJob::RID;
0121     }
0122     create->setMerge(merge);
0123     q->connect(create, &ItemCreateJob::result, q, [this](KJob *job) {
0124         slotLocalChangeDone(job);
0125     });
0126 }
0127 
0128 bool ItemSyncPrivate::allProcessed() const
0129 {
0130     return mDeliveryDone && mCurrentBatchRemoteItems.isEmpty() && mRemoteItemQueue.isEmpty() && mRemovedRemoteItemQueue.isEmpty()
0131         && mCurrentBatchRemovedRemoteItems.isEmpty();
0132 }
0133 
0134 void ItemSyncPrivate::checkDone()
0135 {
0136     Q_Q(ItemSync);
0137     q->setProcessedAmount(KJob::Bytes, mProgress);
0138     if (mPendingJobs > 0) {
0139         return;
0140     }
0141 
0142     if (mTransactionJobs > 0) {
0143         // Commit the current transaction if we're in batch processing mode or done
0144         // and wait until the transaction is committed to process the next batch
0145         if (mTransactionMode == ItemSync::MultipleTransactions || (mDeliveryDone && mRemoteItemQueue.isEmpty())) {
0146             if (mCurrentTransaction) {
0147                 // Note that mCurrentTransaction->commit() is a no-op if we're already rolling back
0148                 // so this signal is a bit misleading (but it's only used by unittests it seems)
0149                 Q_EMIT q->transactionCommitted();
0150                 mCurrentTransaction->commit();
0151                 mCurrentTransaction = nullptr;
0152             }
0153             return;
0154         }
0155     }
0156     mProcessingBatch = false;
0157 
0158     if (q->error() == Job::UserCanceled && mTransactionJobs == 0 && !mFinished) {
0159         qCDebug(AKONADICORE_LOG) << "ItemSync of collection" << mSyncCollection.id() << "finished due to user cancelling";
0160         mFinished = true;
0161         q->emitResult();
0162         return;
0163     }
0164 
0165     if (!mRemoteItemQueue.isEmpty()) {
0166         execute();
0167         // We don't have enough items, request more
0168         if (!mProcessingBatch) {
0169             Q_EMIT q->readyForNextBatch(mBatchSize - mRemoteItemQueue.size());
0170         }
0171         return;
0172     }
0173     Q_EMIT q->readyForNextBatch(mBatchSize);
0174 
0175     if (allProcessed() && !mFinished) {
0176         // prevent double result emission, can happen since checkDone() is called from all over the place
0177         qCDebug(AKONADICORE_LOG) << "ItemSync of collection" << mSyncCollection.id() << "finished";
0178         mFinished = true;
0179         q->emitResult();
0180     }
0181 }
0182 
0183 ItemSync::ItemSync(const Collection &collection, const QDateTime &timestamp, QObject *parent)
0184     : Job(new ItemSyncPrivate(this), parent)
0185 {
0186     qCDebug(AKONADICORE_LOG) << "Created ItemSync(colId=" << collection.id() << ", timestamp=" << timestamp << ")";
0187     Q_D(ItemSync);
0188     d->mSyncCollection = collection;
0189     if (timestamp.isValid()) {
0190         d->mItemSyncStart = timestamp;
0191     }
0192 }
0193 
0194 ItemSync::~ItemSync()
0195 {
0196 }
0197 
0198 void ItemSync::setFullSyncItems(const Item::List &items)
0199 {
0200     /*
0201      * We received a list of items from the server:
0202      * * fetch all local id's + rid's only
0203      * * check each full sync item whether it's locally available
0204      * * if it is modify the item
0205      * * if it's not create it
0206      * * delete all superfluous items
0207      */
0208     Q_D(ItemSync);
0209     Q_ASSERT(!d->mIncremental);
0210     if (!d->mStreaming) {
0211         d->mDeliveryDone = true;
0212     }
0213     d->mRemoteItemQueue += items;
0214     d->mTotalItemsProcessed += items.count();
0215     qCDebug(AKONADICORE_LOG) << "Received batch: " << items.count() << "Already processed: " << d->mTotalItemsProcessed
0216                              << "Expected total amount: " << d->mTotalItems;
0217     if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItemsProcessed == d->mTotalItems)) {
0218         d->mDeliveryDone = true;
0219     }
0220     d->execute();
0221 }
0222 
0223 void ItemSync::setTotalItems(int amount)
0224 {
0225     Q_D(ItemSync);
0226     Q_ASSERT(!d->mIncremental);
0227     Q_ASSERT(amount >= 0);
0228     setStreamingEnabled(true);
0229     qCDebug(AKONADICORE_LOG) << "Expected total amount:" << amount;
0230     d->mTotalItems = amount;
0231     setTotalAmount(KJob::Bytes, amount);
0232     if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItems == 0)) {
0233         d->mDeliveryDone = true;
0234         d->execute();
0235     }
0236 }
0237 
0238 void ItemSync::setDisableAutomaticDeliveryDone(bool disable)
0239 {
0240     Q_D(ItemSync);
0241     d->mDisableAutomaticDeliveryDone = disable;
0242 }
0243 
0244 void ItemSync::setIncrementalSyncItems(const Item::List &changedItems, const Item::List &removedItems)
0245 {
0246     /*
0247      * We received an incremental listing of items:
0248      * * for each changed item:
0249      * ** If locally available => modify
0250      * ** else => create
0251      * * removed items can be removed right away
0252      */
0253     Q_D(ItemSync);
0254     d->mIncremental = true;
0255     if (!d->mStreaming) {
0256         d->mDeliveryDone = true;
0257     }
0258     d->mRemoteItemQueue += changedItems;
0259     d->mRemovedRemoteItemQueue += removedItems;
0260     d->mTotalItemsProcessed += changedItems.count() + removedItems.count();
0261     qCDebug(AKONADICORE_LOG) << "Received: " << changedItems.count() << "Removed: " << removedItems.count() << "In total: " << d->mTotalItemsProcessed
0262                              << " Wanted: " << d->mTotalItems;
0263     if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItemsProcessed == d->mTotalItems)) {
0264         d->mDeliveryDone = true;
0265     }
0266     d->execute();
0267 }
0268 
0269 void ItemSync::doStart()
0270 {
0271 }
0272 
0273 void ItemSyncPrivate::fetchLocalItemsToDelete()
0274 {
0275     Q_Q(ItemSync);
0276     if (mIncremental) {
0277         qFatal("This must not be called while in incremental mode");
0278         return;
0279     }
0280     auto job = new ItemFetchJob(mSyncCollection, subjobParent());
0281     job->fetchScope().setFetchRemoteIdentification(true);
0282     job->fetchScope().setFetchModificationTime(false);
0283     job->setDeliveryOption(ItemFetchJob::EmitItemsIndividually);
0284     // we only can fetch parts already in the cache, otherwise this will deadlock
0285     job->fetchScope().setCacheOnly(true);
0286 
0287     QObject::connect(job, &ItemFetchJob::itemsReceived, q, [this](const Akonadi::Item::List &lst) {
0288         slotItemsReceived(lst);
0289     });
0290     QObject::connect(job, &ItemFetchJob::result, q, [this](KJob *job) {
0291         slotLocalListDone(job);
0292     });
0293     mPendingJobs++;
0294 }
0295 
0296 void ItemSyncPrivate::slotItemsReceived(const Item::List &items)
0297 {
0298     for (const Akonadi::Item &item : items) {
0299         // Don't delete items that have not yet been synchronized
0300         if (item.remoteId().isEmpty()) {
0301             continue;
0302         }
0303         if (!mListedItems.contains(item.remoteId())) {
0304             mItemsToDelete << Item(item.id());
0305         }
0306     }
0307 }
0308 
0309 void ItemSyncPrivate::slotLocalListDone(KJob *job)
0310 {
0311     mPendingJobs--;
0312     if (job->error()) {
0313         qCWarning(AKONADICORE_LOG) << job->errorString();
0314     }
0315     deleteItems(mItemsToDelete);
0316     checkDone();
0317 }
0318 
0319 QString ItemSyncPrivate::jobDebuggingString() const
0320 {
0321     // TODO: also print out mIncremental and mTotalItemsProcessed, but they are set after the job
0322     // started, so this requires passing jobDebuggingString to jobEnded().
0323     return QStringLiteral("Collection %1 (%2)").arg(mSyncCollection.id()).arg(mSyncCollection.name());
0324 }
0325 
0326 void ItemSyncPrivate::execute()
0327 {
0328     // shouldn't happen
0329     if (mFinished) {
0330         qCWarning(AKONADICORE_LOG) << "Call to execute() on finished job.";
0331         Q_ASSERT(false);
0332         return;
0333     }
0334     // not doing anything, start processing
0335     if (!mProcessingBatch) {
0336         if (mRemoteItemQueue.size() >= mBatchSize || mDeliveryDone) {
0337             // we have a new batch to process
0338             const int num = qMin(mBatchSize, mRemoteItemQueue.size());
0339             mCurrentBatchRemoteItems.reserve(mBatchSize);
0340             std::move(mRemoteItemQueue.begin(), mRemoteItemQueue.begin() + num, std::back_inserter(mCurrentBatchRemoteItems));
0341             mRemoteItemQueue.erase(mRemoteItemQueue.begin(), mRemoteItemQueue.begin() + num);
0342 
0343             mCurrentBatchRemovedRemoteItems += mRemovedRemoteItemQueue;
0344             mRemovedRemoteItemQueue.clear();
0345         } else {
0346             // nothing to do, let's wait for more data
0347             return;
0348         }
0349         mProcessingBatch = true;
0350         processBatch();
0351         return;
0352     }
0353     checkDone();
0354 }
0355 
0356 // process the current batch of items
0357 void ItemSyncPrivate::processBatch()
0358 {
0359     Q_Q(ItemSync);
0360     if (mCurrentBatchRemoteItems.isEmpty() && !mDeliveryDone) {
0361         return;
0362     }
0363     if (q->error() == Job::UserCanceled) {
0364         checkDone();
0365         return;
0366     }
0367 
0368     // request a transaction, there are items that require processing
0369     requestTransaction();
0370 
0371     processItems();
0372 
0373     // removed
0374     if (!mIncremental && allProcessed()) {
0375         // the full listing is done and we know which items to remove
0376         fetchLocalItemsToDelete();
0377     } else {
0378         deleteItems(mCurrentBatchRemovedRemoteItems);
0379         mCurrentBatchRemovedRemoteItems.clear();
0380     }
0381 
0382     checkDone();
0383 }
0384 
0385 void ItemSyncPrivate::processItems()
0386 {
0387     // added / updated
0388     for (const Item &remoteItem : std::as_const(mCurrentBatchRemoteItems)) {
0389         if (remoteItem.remoteId().isEmpty()) {
0390             qCWarning(AKONADICORE_LOG) << "Item " << remoteItem.id() << " does not have a remote identifier";
0391             continue;
0392         }
0393         if (!mIncremental) {
0394             mListedItems << remoteItem.remoteId();
0395         }
0396         createOrMerge(remoteItem);
0397     }
0398     mCurrentBatchRemoteItems.clear();
0399 }
0400 
0401 void ItemSyncPrivate::deleteItems(const Item::List &itemsToDelete)
0402 {
0403     Q_Q(ItemSync);
0404     // if in error state, better not change anything anymore
0405     if (q->error()) {
0406         return;
0407     }
0408 
0409     if (itemsToDelete.isEmpty()) {
0410         return;
0411     }
0412 
0413     mPendingJobs++;
0414     auto job = new ItemDeleteJob(itemsToDelete, subjobParent());
0415     q->connect(job, &ItemDeleteJob::result, q, [this](KJob *job) {
0416         slotLocalDeleteDone(job);
0417     });
0418 
0419     // It can happen that the groupware servers report us deleted items
0420     // twice, in this case this item delete job will fail on the second try.
0421     // To avoid a rollback of the complete transaction we gracefully allow the job
0422     // to fail :)
0423     auto transaction = qobject_cast<TransactionSequence *>(subjobParent());
0424     if (transaction) {
0425         transaction->setIgnoreJobFailure(job);
0426     }
0427 }
0428 
0429 void ItemSyncPrivate::slotLocalDeleteDone(KJob *job)
0430 {
0431     if (job->error()) {
0432         qCWarning(AKONADICORE_LOG) << "Deleting items from the akonadi database failed:" << job->errorString();
0433     }
0434     mPendingJobs--;
0435     mProgress++;
0436 
0437     checkDone();
0438 }
0439 
0440 void ItemSyncPrivate::slotLocalChangeDone(KJob *job)
0441 {
0442     if (job->error() && job->error() != Job::KilledJobError) {
0443         qCWarning(AKONADICORE_LOG) << "Creating/updating items from the akonadi database failed:" << job->errorString();
0444         mRemoteItemQueue.clear(); // don't try to process any more items after a rollback
0445     }
0446     mPendingJobs--;
0447     mProgress++;
0448 
0449     checkDone();
0450 }
0451 
0452 void ItemSyncPrivate::slotTransactionResult(KJob *job)
0453 {
0454     --mTransactionJobs;
0455     if (mCurrentTransaction == job) {
0456         mCurrentTransaction = nullptr;
0457     }
0458 
0459     checkDone();
0460 }
0461 
0462 void ItemSyncPrivate::requestTransaction()
0463 {
0464     Q_Q(ItemSync);
0465     // we never want parallel transactions, single transaction just makes one big transaction, and multi transaction uses multiple transaction sequentially
0466     if (!mCurrentTransaction) {
0467         ++mTransactionJobs;
0468         mCurrentTransaction = new TransactionSequence(q);
0469         mCurrentTransaction->setAutomaticCommittingEnabled(false);
0470         QObject::connect(mCurrentTransaction, &TransactionSequence::result, q, [this](KJob *job) {
0471             slotTransactionResult(job);
0472         });
0473     }
0474 }
0475 
0476 Job *ItemSyncPrivate::subjobParent() const
0477 {
0478     Q_Q(const ItemSync);
0479     if (mCurrentTransaction && mTransactionMode != ItemSync::NoTransaction) {
0480         return mCurrentTransaction;
0481     }
0482     return const_cast<ItemSync *>(q);
0483 }
0484 
0485 void ItemSync::setStreamingEnabled(bool enable)
0486 {
0487     Q_D(ItemSync);
0488     d->mStreaming = enable;
0489 }
0490 
0491 void ItemSync::deliveryDone()
0492 {
0493     Q_D(ItemSync);
0494     Q_ASSERT(d->mStreaming);
0495     d->mDeliveryDone = true;
0496     d->execute();
0497 }
0498 
0499 void ItemSync::slotResult(KJob *job)
0500 {
0501     if (job->error()) {
0502         qCWarning(AKONADICORE_LOG) << "Error during ItemSync: " << job->errorString();
0503         // pretend there were no errors
0504         Akonadi::Job::removeSubjob(job);
0505         // propagate the first error we got but continue, we might still be fed with stuff from a resource
0506         if (!error()) {
0507             setError(job->error());
0508             setErrorText(job->errorText());
0509         }
0510     } else {
0511         Akonadi::Job::slotResult(job);
0512     }
0513 }
0514 
0515 void ItemSync::rollback()
0516 {
0517     Q_D(ItemSync);
0518     qCDebug(AKONADICORE_LOG) << "The item sync is being rolled-back.";
0519     setError(UserCanceled);
0520     if (d->mCurrentTransaction) {
0521         d->mCurrentTransaction->rollback();
0522     }
0523     d->mDeliveryDone = true; // user won't deliver more data
0524     d->execute(); // end this in an ordered way, since we have an error set no real change will be done
0525 }
0526 
0527 void ItemSync::setTransactionMode(ItemSync::TransactionMode mode)
0528 {
0529     Q_D(ItemSync);
0530     d->mTransactionMode = mode;
0531 }
0532 
0533 int ItemSync::batchSize() const
0534 {
0535     Q_D(const ItemSync);
0536     return d->mBatchSize;
0537 }
0538 
0539 void ItemSync::setBatchSize(int size)
0540 {
0541     Q_D(ItemSync);
0542     d->mBatchSize = size;
0543 }
0544 
0545 ItemSync::MergeMode ItemSync::mergeMode() const
0546 {
0547     Q_D(const ItemSync);
0548     return d->mMergeMode;
0549 }
0550 
0551 void ItemSync::setMergeMode(MergeMode mergeMode)
0552 {
0553     Q_D(ItemSync);
0554     d->mMergeMode = mergeMode;
0555 }
0556 
0557 #include "moc_itemsync.cpp"