File indexing completed on 2024-12-22 04:57:41
0001 /* This file is part of the KDE project 0002 SPDX-FileCopyrightText: 2011 Kevin Krammer <kevin.krammer@gmx.at> 0003 0004 SPDX-License-Identifier: LGPL-2.0-or-later 0005 */ 0006 0007 #include "retrieveitemsjob.h" 0008 0009 #include "mixedmaildir_debug.h" 0010 #include "mixedmaildirstore.h" 0011 0012 #include "filestore/itemfetchjob.h" 0013 0014 #include <Akonadi/MessageParts> 0015 #include <Akonadi/MessageStatus> 0016 0017 #include <Akonadi/Collection> 0018 #include <Akonadi/CollectionModifyJob> 0019 #include <Akonadi/ItemCreateJob> 0020 #include <Akonadi/ItemDeleteJob> 0021 #include <Akonadi/ItemFetchJob> 0022 #include <Akonadi/ItemFetchScope> 0023 #include <Akonadi/ItemModifyJob> 0024 #include <Akonadi/TransactionSequence> 0025 #include <Akonadi/VectorHelper> 0026 0027 #include "mixedmaildirresource_debug.h" 0028 0029 #include <QDateTime> 0030 #include <QQueue> 0031 #include <QVariant> 0032 0033 using namespace Akonadi; 0034 0035 enum { 0036 MaxItemCreateJobs = 100, 0037 MaxItemModifyJobs = 100, 0038 }; 0039 0040 class RetrieveItemsJobPrivate 0041 { 0042 RetrieveItemsJob *const q; 0043 0044 public: 0045 RetrieveItemsJobPrivate(RetrieveItemsJob *parent, const Collection &collection, MixedMaildirStore *store) 0046 : q(parent) 0047 , mCollection(collection) 0048 , mStore(store) 0049 { 0050 } 0051 0052 TransactionSequence *transaction() 0053 { 0054 if (!mTransaction) { 0055 mTransaction = new TransactionSequence(q); 0056 mTransaction->setAutomaticCommittingEnabled(false); 0057 QObject::connect(mTransaction, &TransactionSequence::result, q, [this](KJob *job) { 0058 transactionResult(job); 0059 }); 0060 } 0061 return mTransaction; 0062 } 0063 0064 public: 0065 const Collection mCollection; 0066 MixedMaildirStore *const mStore; 0067 TransactionSequence *mTransaction = nullptr; 0068 0069 QHash<QString, Item> mServerItemsByRemoteId; 0070 0071 QQueue<Item> mNewItems; 0072 QQueue<Item> mChangedItems; 0073 Item::List mAvailableItems; 0074 Item::List mItemsMarkedAsDeleted; 0075 qint64 mHighestModTime = -1; 0076 int mNumItemCreateJobs = 0; 0077 int mNumItemModifyJobs = 0; 0078 0079 public: // slots 0080 void akonadiFetchResult(KJob *job); 0081 void transactionResult(KJob *job); 0082 void storeListResult(KJob *); 0083 void processNewItem(); 0084 void fetchNewResult(KJob *); 0085 void processChangedItem(); 0086 void fetchChangedResult(KJob *); 0087 void itemCreateJobResult(KJob *); 0088 void itemModifyJobResult(KJob *); 0089 }; 0090 0091 void RetrieveItemsJobPrivate::itemCreateJobResult(KJob *job) 0092 { 0093 if (job->error()) { 0094 qCCritical(MIXEDMAILDIR_LOG) << "Error running ItemCreateJob: " << job->errorText(); 0095 } 0096 0097 mNumItemCreateJobs--; 0098 QMetaObject::invokeMethod(q, "processNewItem", Qt::QueuedConnection); 0099 } 0100 0101 void RetrieveItemsJobPrivate::itemModifyJobResult(KJob *job) 0102 { 0103 if (job->error()) { 0104 qCCritical(MIXEDMAILDIR_LOG) << "Error running ItemModifyJob: " << job->errorText(); 0105 } 0106 0107 mNumItemModifyJobs--; 0108 QMetaObject::invokeMethod(q, "processChangedItem", Qt::QueuedConnection); 0109 } 0110 0111 void RetrieveItemsJobPrivate::akonadiFetchResult(KJob *job) 0112 { 0113 if (job->error() != 0) { 0114 return; // handled by base class 0115 } 0116 0117 auto itemFetch = qobject_cast<ItemFetchJob *>(job); 0118 Q_ASSERT(itemFetch != nullptr); 0119 0120 Item::List items = itemFetch->items(); 0121 itemFetch->clearItems(); // save memory 0122 qCDebug(MIXEDMAILDIR_LOG) << "Akonadi fetch got" << items.count() << "items"; 0123 0124 mServerItemsByRemoteId.reserve(items.size()); 0125 for (int i = 0; i < items.count(); ++i) { 0126 Item &item = items[i]; 0127 // items without remoteId have not been written to the resource yet 0128 if (!item.remoteId().isEmpty()) { 0129 // set the parent collection (with all ancestors) in every item 0130 item.setParentCollection(mCollection); 0131 mServerItemsByRemoteId.insert(item.remoteId(), item); 0132 } 0133 } 0134 0135 qCDebug(MIXEDMAILDIR_LOG) << "of which" << mServerItemsByRemoteId.count() << "have remoteId"; 0136 0137 FileStore::ItemFetchJob *storeFetch = mStore->fetchItems(mCollection); 0138 // just basic items, no data 0139 0140 QObject::connect(storeFetch, &FileStore::ItemFetchJob::result, q, [this](KJob *job) { 0141 storeListResult(job); 0142 }); 0143 } 0144 0145 void RetrieveItemsJobPrivate::storeListResult(KJob *job) 0146 { 0147 qCDebug(MIXEDMAILDIRRESOURCE_LOG) << "storeList->error=" << job->error(); 0148 auto storeList = qobject_cast<FileStore::ItemFetchJob *>(job); 0149 Q_ASSERT(storeList != nullptr); 0150 0151 if (storeList->error() != 0) { 0152 q->setError(storeList->error()); 0153 q->setErrorText(storeList->errorText()); 0154 q->emitResult(); 0155 return; 0156 } 0157 0158 // if some items have tags, we need to complete the retrieval and schedule tagging 0159 // to a later time so we can then fetch the items to get their Akonadi URLs 0160 // forward the property to this instance so the resource can take care of that 0161 const QVariant var = storeList->property("remoteIdToTagList"); 0162 if (var.isValid()) { 0163 q->setProperty("remoteIdToTagList", var); 0164 } 0165 0166 const qint64 collectionTimestamp = mCollection.remoteRevision().toLongLong(); 0167 0168 const Item::List storedItems = storeList->items(); 0169 for (const Item &item : storedItems) { 0170 // messages marked as deleted have been deleted from mbox files but never got purged 0171 Akonadi::MessageStatus status; 0172 status.setStatusFromFlags(item.flags()); 0173 if (status.isDeleted()) { 0174 mItemsMarkedAsDeleted << item; 0175 continue; 0176 } 0177 0178 mAvailableItems << item; 0179 0180 const QHash<QString, Item>::iterator it = mServerItemsByRemoteId.find(item.remoteId()); 0181 if (it == mServerItemsByRemoteId.end()) { 0182 // item not in server items -> new 0183 mNewItems << item; 0184 } else { 0185 // item both on server and in store, check modification time 0186 const QDateTime modTime = item.modificationTime(); 0187 if (!modTime.isValid() || modTime.toMSecsSinceEpoch() > collectionTimestamp) { 0188 mChangedItems << it.value(); 0189 } 0190 0191 // remove from hash so only no longer existing items remain 0192 mServerItemsByRemoteId.erase(it); 0193 } 0194 } 0195 0196 qCDebug(MIXEDMAILDIR_LOG) << "Store fetch got" << storedItems.count() << "items" 0197 << "of which" << mNewItems.count() << "are new and" << mChangedItems.count() << "are changed and" 0198 << mServerItemsByRemoteId.count() << "need to be removed"; 0199 0200 // all items remaining in mServerItemsByRemoteId are no longer in the store 0201 0202 if (!mServerItemsByRemoteId.isEmpty()) { 0203 auto deleteJob = new ItemDeleteJob(Akonadi::valuesToVector(mServerItemsByRemoteId), transaction()); 0204 transaction()->setIgnoreJobFailure(deleteJob); 0205 } 0206 0207 processNewItem(); 0208 } 0209 0210 void RetrieveItemsJobPrivate::processNewItem() 0211 { 0212 if (mNewItems.isEmpty()) { 0213 processChangedItem(); 0214 return; 0215 } 0216 0217 const Item item = mNewItems.dequeue(); 0218 FileStore::ItemFetchJob *storeFetch = mStore->fetchItem(item); 0219 storeFetch->fetchScope().fetchPayloadPart(MessagePart::Envelope); 0220 0221 QObject::connect(storeFetch, &FileStore::ItemFetchJob::result, q, [this](KJob *job) { 0222 fetchNewResult(job); 0223 }); 0224 } 0225 0226 void RetrieveItemsJobPrivate::fetchNewResult(KJob *job) 0227 { 0228 auto fetchJob = qobject_cast<FileStore::ItemFetchJob *>(job); 0229 Q_ASSERT(fetchJob != nullptr); 0230 0231 if (fetchJob->items().count() != 1) { 0232 const Item item = fetchJob->item(); 0233 qCWarning(MIXEDMAILDIRRESOURCE_LOG) << "Store fetch for new item" << item.remoteId() << "in collection" << item.parentCollection().id() << "," 0234 << item.parentCollection().remoteId() << "did not return the expected item. error=" << fetchJob->error() << "," 0235 << fetchJob->errorText(); 0236 processNewItem(); 0237 return; 0238 } 0239 0240 const Item item = fetchJob->items().at(0); 0241 const QDateTime modTime = item.modificationTime(); 0242 if (modTime.isValid()) { 0243 mHighestModTime = qMax(modTime.toMSecsSinceEpoch(), mHighestModTime); 0244 } 0245 0246 auto itemCreate = new ItemCreateJob(item, mCollection, transaction()); 0247 mNumItemCreateJobs++; 0248 QObject::connect(itemCreate, &ItemCreateJob::result, q, [this](KJob *job) { 0249 itemCreateJobResult(job); 0250 }); 0251 0252 if (mNumItemCreateJobs < MaxItemCreateJobs) { 0253 QMetaObject::invokeMethod(q, "processNewItem", Qt::QueuedConnection); 0254 } 0255 } 0256 0257 void RetrieveItemsJobPrivate::processChangedItem() 0258 { 0259 if (mChangedItems.isEmpty()) { 0260 if (!mTransaction) { 0261 // no jobs created here -> done 0262 q->emitResult(); 0263 return; 0264 } 0265 0266 if (mHighestModTime > -1) { 0267 Collection collection(mCollection); 0268 collection.setRemoteRevision(QString::number(mHighestModTime)); 0269 auto job = new CollectionModifyJob(collection, transaction()); 0270 transaction()->setIgnoreJobFailure(job); 0271 } 0272 transaction()->commit(); 0273 return; 0274 } 0275 0276 const Item item = mChangedItems.dequeue(); 0277 FileStore::ItemFetchJob *storeFetch = mStore->fetchItem(item); 0278 storeFetch->fetchScope().fetchPayloadPart(MessagePart::Envelope); 0279 0280 QObject::connect(storeFetch, &FileStore::ItemFetchJob::result, q, [this](KJob *job) { 0281 fetchChangedResult(job); 0282 }); 0283 } 0284 0285 void RetrieveItemsJobPrivate::fetchChangedResult(KJob *job) 0286 { 0287 auto fetchJob = qobject_cast<FileStore::ItemFetchJob *>(job); 0288 Q_ASSERT(fetchJob != nullptr); 0289 0290 if (fetchJob->items().count() != 1) { 0291 const Item item = fetchJob->item(); 0292 qCWarning(MIXEDMAILDIRRESOURCE_LOG) << "Store fetch for changed item" << item.remoteId() << "in collection" << item.parentCollection().id() << "," 0293 << item.parentCollection().remoteId() << "did not return the expected item. error=" << fetchJob->error() << "," 0294 << fetchJob->errorText(); 0295 processChangedItem(); 0296 return; 0297 } 0298 0299 const Item item = fetchJob->items().at(0); 0300 const QDateTime modTime = item.modificationTime(); 0301 if (modTime.isValid()) { 0302 mHighestModTime = qMax(modTime.toMSecsSinceEpoch(), mHighestModTime); 0303 } 0304 0305 auto itemModify = new ItemModifyJob(item, transaction()); 0306 QObject::connect(itemModify, &ItemModifyJob::result, q, [this](KJob *job) { 0307 itemModifyJobResult(job); 0308 }); 0309 mNumItemModifyJobs++; 0310 if (mNumItemModifyJobs < MaxItemModifyJobs) { 0311 QMetaObject::invokeMethod(q, "processChangedItem", Qt::QueuedConnection); 0312 } 0313 } 0314 0315 void RetrieveItemsJobPrivate::transactionResult(KJob *job) 0316 { 0317 if (job->error() != 0) { 0318 return; // handled by base class 0319 } 0320 0321 q->emitResult(); 0322 } 0323 0324 RetrieveItemsJob::RetrieveItemsJob(const Akonadi::Collection &collection, MixedMaildirStore *store, QObject *parent) 0325 : Job(parent) 0326 , d(new RetrieveItemsJobPrivate(this, collection, store)) 0327 { 0328 Q_ASSERT(d->mCollection.isValid()); 0329 Q_ASSERT(!d->mCollection.remoteId().isEmpty()); 0330 Q_ASSERT(d->mStore != nullptr); 0331 } 0332 0333 RetrieveItemsJob::~RetrieveItemsJob() = default; 0334 0335 Collection RetrieveItemsJob::collection() const 0336 { 0337 return d->mCollection; 0338 } 0339 0340 Item::List RetrieveItemsJob::availableItems() const 0341 { 0342 return d->mAvailableItems; 0343 } 0344 0345 Item::List RetrieveItemsJob::itemsMarkedAsDeleted() const 0346 { 0347 return d->mItemsMarkedAsDeleted; 0348 } 0349 0350 void RetrieveItemsJob::doStart() 0351 { 0352 auto job = new Akonadi::ItemFetchJob(d->mCollection, this); 0353 connect(job, &ItemFetchJob::result, this, [this](KJob *job) { 0354 d->akonadiFetchResult(job); 0355 }); 0356 } 0357 0358 #include "moc_retrieveitemsjob.cpp"