File indexing completed on 2024-12-22 04:57:41

0001 /*  This file is part of the KDE project
0002     SPDX-FileCopyrightText: 2011 Kevin Krammer <kevin.krammer@gmx.at>
0003 
0004     SPDX-License-Identifier: LGPL-2.0-or-later
0005 */
0006 
0007 #include "retrieveitemsjob.h"
0008 
0009 #include "mixedmaildir_debug.h"
0010 #include "mixedmaildirstore.h"
0011 
0012 #include "filestore/itemfetchjob.h"
0013 
0014 #include <Akonadi/MessageParts>
0015 #include <Akonadi/MessageStatus>
0016 
0017 #include <Akonadi/Collection>
0018 #include <Akonadi/CollectionModifyJob>
0019 #include <Akonadi/ItemCreateJob>
0020 #include <Akonadi/ItemDeleteJob>
0021 #include <Akonadi/ItemFetchJob>
0022 #include <Akonadi/ItemFetchScope>
0023 #include <Akonadi/ItemModifyJob>
0024 #include <Akonadi/TransactionSequence>
0025 #include <Akonadi/VectorHelper>
0026 
0027 #include "mixedmaildirresource_debug.h"
0028 
0029 #include <QDateTime>
0030 #include <QQueue>
0031 #include <QVariant>
0032 
0033 using namespace Akonadi;
0034 
// Upper bounds on the number of ItemCreateJob / ItemModifyJob instances that may be
// pending at once; the fetch*Result() handlers stop scheduling further items once the
// respective counter reaches its bound.
constexpr int MaxItemCreateJobs = 100;
constexpr int MaxItemModifyJobs = 100;
0039 
0040 class RetrieveItemsJobPrivate
0041 {
0042     RetrieveItemsJob *const q;
0043 
0044 public:
0045     RetrieveItemsJobPrivate(RetrieveItemsJob *parent, const Collection &collection, MixedMaildirStore *store)
0046         : q(parent)
0047         , mCollection(collection)
0048         , mStore(store)
0049     {
0050     }
0051 
0052     TransactionSequence *transaction()
0053     {
0054         if (!mTransaction) {
0055             mTransaction = new TransactionSequence(q);
0056             mTransaction->setAutomaticCommittingEnabled(false);
0057             QObject::connect(mTransaction, &TransactionSequence::result, q, [this](KJob *job) {
0058                 transactionResult(job);
0059             });
0060         }
0061         return mTransaction;
0062     }
0063 
0064 public:
0065     const Collection mCollection;
0066     MixedMaildirStore *const mStore;
0067     TransactionSequence *mTransaction = nullptr;
0068 
0069     QHash<QString, Item> mServerItemsByRemoteId;
0070 
0071     QQueue<Item> mNewItems;
0072     QQueue<Item> mChangedItems;
0073     Item::List mAvailableItems;
0074     Item::List mItemsMarkedAsDeleted;
0075     qint64 mHighestModTime = -1;
0076     int mNumItemCreateJobs = 0;
0077     int mNumItemModifyJobs = 0;
0078 
0079 public: // slots
0080     void akonadiFetchResult(KJob *job);
0081     void transactionResult(KJob *job);
0082     void storeListResult(KJob *);
0083     void processNewItem();
0084     void fetchNewResult(KJob *);
0085     void processChangedItem();
0086     void fetchChangedResult(KJob *);
0087     void itemCreateJobResult(KJob *);
0088     void itemModifyJobResult(KJob *);
0089 };
0090 
0091 void RetrieveItemsJobPrivate::itemCreateJobResult(KJob *job)
0092 {
0093     if (job->error()) {
0094         qCCritical(MIXEDMAILDIR_LOG) << "Error running ItemCreateJob: " << job->errorText();
0095     }
0096 
0097     mNumItemCreateJobs--;
0098     QMetaObject::invokeMethod(q, "processNewItem", Qt::QueuedConnection);
0099 }
0100 
0101 void RetrieveItemsJobPrivate::itemModifyJobResult(KJob *job)
0102 {
0103     if (job->error()) {
0104         qCCritical(MIXEDMAILDIR_LOG) << "Error running ItemModifyJob: " << job->errorText();
0105     }
0106 
0107     mNumItemModifyJobs--;
0108     QMetaObject::invokeMethod(q, "processChangedItem", Qt::QueuedConnection);
0109 }
0110 
0111 void RetrieveItemsJobPrivate::akonadiFetchResult(KJob *job)
0112 {
0113     if (job->error() != 0) {
0114         return; // handled by base class
0115     }
0116 
0117     auto itemFetch = qobject_cast<ItemFetchJob *>(job);
0118     Q_ASSERT(itemFetch != nullptr);
0119 
0120     Item::List items = itemFetch->items();
0121     itemFetch->clearItems(); // save memory
0122     qCDebug(MIXEDMAILDIR_LOG) << "Akonadi fetch got" << items.count() << "items";
0123 
0124     mServerItemsByRemoteId.reserve(items.size());
0125     for (int i = 0; i < items.count(); ++i) {
0126         Item &item = items[i];
0127         // items without remoteId have not been written to the resource yet
0128         if (!item.remoteId().isEmpty()) {
0129             // set the parent collection (with all ancestors) in every item
0130             item.setParentCollection(mCollection);
0131             mServerItemsByRemoteId.insert(item.remoteId(), item);
0132         }
0133     }
0134 
0135     qCDebug(MIXEDMAILDIR_LOG) << "of which" << mServerItemsByRemoteId.count() << "have remoteId";
0136 
0137     FileStore::ItemFetchJob *storeFetch = mStore->fetchItems(mCollection);
0138     // just basic items, no data
0139 
0140     QObject::connect(storeFetch, &FileStore::ItemFetchJob::result, q, [this](KJob *job) {
0141         storeListResult(job);
0142     });
0143 }
0144 
0145 void RetrieveItemsJobPrivate::storeListResult(KJob *job)
0146 {
0147     qCDebug(MIXEDMAILDIRRESOURCE_LOG) << "storeList->error=" << job->error();
0148     auto storeList = qobject_cast<FileStore::ItemFetchJob *>(job);
0149     Q_ASSERT(storeList != nullptr);
0150 
0151     if (storeList->error() != 0) {
0152         q->setError(storeList->error());
0153         q->setErrorText(storeList->errorText());
0154         q->emitResult();
0155         return;
0156     }
0157 
0158     // if some items have tags, we need to complete the retrieval and schedule tagging
0159     // to a later time so we can then fetch the items to get their Akonadi URLs
0160     // forward the property to this instance so the resource can take care of that
0161     const QVariant var = storeList->property("remoteIdToTagList");
0162     if (var.isValid()) {
0163         q->setProperty("remoteIdToTagList", var);
0164     }
0165 
0166     const qint64 collectionTimestamp = mCollection.remoteRevision().toLongLong();
0167 
0168     const Item::List storedItems = storeList->items();
0169     for (const Item &item : storedItems) {
0170         // messages marked as deleted have been deleted from mbox files but never got purged
0171         Akonadi::MessageStatus status;
0172         status.setStatusFromFlags(item.flags());
0173         if (status.isDeleted()) {
0174             mItemsMarkedAsDeleted << item;
0175             continue;
0176         }
0177 
0178         mAvailableItems << item;
0179 
0180         const QHash<QString, Item>::iterator it = mServerItemsByRemoteId.find(item.remoteId());
0181         if (it == mServerItemsByRemoteId.end()) {
0182             // item not in server items -> new
0183             mNewItems << item;
0184         } else {
0185             // item both on server and in store, check modification time
0186             const QDateTime modTime = item.modificationTime();
0187             if (!modTime.isValid() || modTime.toMSecsSinceEpoch() > collectionTimestamp) {
0188                 mChangedItems << it.value();
0189             }
0190 
0191             // remove from hash so only no longer existing items remain
0192             mServerItemsByRemoteId.erase(it);
0193         }
0194     }
0195 
0196     qCDebug(MIXEDMAILDIR_LOG) << "Store fetch got" << storedItems.count() << "items"
0197                               << "of which" << mNewItems.count() << "are new and" << mChangedItems.count() << "are changed and"
0198                               << mServerItemsByRemoteId.count() << "need to be removed";
0199 
0200     // all items remaining in mServerItemsByRemoteId are no longer in the store
0201 
0202     if (!mServerItemsByRemoteId.isEmpty()) {
0203         auto deleteJob = new ItemDeleteJob(Akonadi::valuesToVector(mServerItemsByRemoteId), transaction());
0204         transaction()->setIgnoreJobFailure(deleteJob);
0205     }
0206 
0207     processNewItem();
0208 }
0209 
0210 void RetrieveItemsJobPrivate::processNewItem()
0211 {
0212     if (mNewItems.isEmpty()) {
0213         processChangedItem();
0214         return;
0215     }
0216 
0217     const Item item = mNewItems.dequeue();
0218     FileStore::ItemFetchJob *storeFetch = mStore->fetchItem(item);
0219     storeFetch->fetchScope().fetchPayloadPart(MessagePart::Envelope);
0220 
0221     QObject::connect(storeFetch, &FileStore::ItemFetchJob::result, q, [this](KJob *job) {
0222         fetchNewResult(job);
0223     });
0224 }
0225 
0226 void RetrieveItemsJobPrivate::fetchNewResult(KJob *job)
0227 {
0228     auto fetchJob = qobject_cast<FileStore::ItemFetchJob *>(job);
0229     Q_ASSERT(fetchJob != nullptr);
0230 
0231     if (fetchJob->items().count() != 1) {
0232         const Item item = fetchJob->item();
0233         qCWarning(MIXEDMAILDIRRESOURCE_LOG) << "Store fetch for new item" << item.remoteId() << "in collection" << item.parentCollection().id() << ","
0234                                             << item.parentCollection().remoteId() << "did not return the expected item. error=" << fetchJob->error() << ","
0235                                             << fetchJob->errorText();
0236         processNewItem();
0237         return;
0238     }
0239 
0240     const Item item = fetchJob->items().at(0);
0241     const QDateTime modTime = item.modificationTime();
0242     if (modTime.isValid()) {
0243         mHighestModTime = qMax(modTime.toMSecsSinceEpoch(), mHighestModTime);
0244     }
0245 
0246     auto itemCreate = new ItemCreateJob(item, mCollection, transaction());
0247     mNumItemCreateJobs++;
0248     QObject::connect(itemCreate, &ItemCreateJob::result, q, [this](KJob *job) {
0249         itemCreateJobResult(job);
0250     });
0251 
0252     if (mNumItemCreateJobs < MaxItemCreateJobs) {
0253         QMetaObject::invokeMethod(q, "processNewItem", Qt::QueuedConnection);
0254     }
0255 }
0256 
0257 void RetrieveItemsJobPrivate::processChangedItem()
0258 {
0259     if (mChangedItems.isEmpty()) {
0260         if (!mTransaction) {
0261             // no jobs created here -> done
0262             q->emitResult();
0263             return;
0264         }
0265 
0266         if (mHighestModTime > -1) {
0267             Collection collection(mCollection);
0268             collection.setRemoteRevision(QString::number(mHighestModTime));
0269             auto job = new CollectionModifyJob(collection, transaction());
0270             transaction()->setIgnoreJobFailure(job);
0271         }
0272         transaction()->commit();
0273         return;
0274     }
0275 
0276     const Item item = mChangedItems.dequeue();
0277     FileStore::ItemFetchJob *storeFetch = mStore->fetchItem(item);
0278     storeFetch->fetchScope().fetchPayloadPart(MessagePart::Envelope);
0279 
0280     QObject::connect(storeFetch, &FileStore::ItemFetchJob::result, q, [this](KJob *job) {
0281         fetchChangedResult(job);
0282     });
0283 }
0284 
0285 void RetrieveItemsJobPrivate::fetchChangedResult(KJob *job)
0286 {
0287     auto fetchJob = qobject_cast<FileStore::ItemFetchJob *>(job);
0288     Q_ASSERT(fetchJob != nullptr);
0289 
0290     if (fetchJob->items().count() != 1) {
0291         const Item item = fetchJob->item();
0292         qCWarning(MIXEDMAILDIRRESOURCE_LOG) << "Store fetch for changed item" << item.remoteId() << "in collection" << item.parentCollection().id() << ","
0293                                             << item.parentCollection().remoteId() << "did not return the expected item. error=" << fetchJob->error() << ","
0294                                             << fetchJob->errorText();
0295         processChangedItem();
0296         return;
0297     }
0298 
0299     const Item item = fetchJob->items().at(0);
0300     const QDateTime modTime = item.modificationTime();
0301     if (modTime.isValid()) {
0302         mHighestModTime = qMax(modTime.toMSecsSinceEpoch(), mHighestModTime);
0303     }
0304 
0305     auto itemModify = new ItemModifyJob(item, transaction());
0306     QObject::connect(itemModify, &ItemModifyJob::result, q, [this](KJob *job) {
0307         itemModifyJobResult(job);
0308     });
0309     mNumItemModifyJobs++;
0310     if (mNumItemModifyJobs < MaxItemModifyJobs) {
0311         QMetaObject::invokeMethod(q, "processChangedItem", Qt::QueuedConnection);
0312     }
0313 }
0314 
0315 void RetrieveItemsJobPrivate::transactionResult(KJob *job)
0316 {
0317     if (job->error() != 0) {
0318         return; // handled by base class
0319     }
0320 
0321     q->emitResult();
0322 }
0323 
0324 RetrieveItemsJob::RetrieveItemsJob(const Akonadi::Collection &collection, MixedMaildirStore *store, QObject *parent)
0325     : Job(parent)
0326     , d(new RetrieveItemsJobPrivate(this, collection, store))
0327 {
0328     Q_ASSERT(d->mCollection.isValid());
0329     Q_ASSERT(!d->mCollection.remoteId().isEmpty());
0330     Q_ASSERT(d->mStore != nullptr);
0331 }
0332 
0333 RetrieveItemsJob::~RetrieveItemsJob() = default;
0334 
0335 Collection RetrieveItemsJob::collection() const
0336 {
0337     return d->mCollection;
0338 }
0339 
0340 Item::List RetrieveItemsJob::availableItems() const
0341 {
0342     return d->mAvailableItems;
0343 }
0344 
0345 Item::List RetrieveItemsJob::itemsMarkedAsDeleted() const
0346 {
0347     return d->mItemsMarkedAsDeleted;
0348 }
0349 
0350 void RetrieveItemsJob::doStart()
0351 {
0352     auto job = new Akonadi::ItemFetchJob(d->mCollection, this);
0353     connect(job, &ItemFetchJob::result, this, [this](KJob *job) {
0354         d->akonadiFetchResult(job);
0355     });
0356 }
0357 
0358 #include "moc_retrieveitemsjob.cpp"