File indexing completed on 2024-06-02 05:21:32

0001 /*
0002     SPDX-FileCopyrightText: 2011 Volker Krause <vkrause@kde.org>
0003 
0004     SPDX-License-Identifier: LGPL-2.0-or-later
0005 */
0006 
0007 #include "retrieveitemsjob.h"
0008 #include "maildirresource_debug.h"
0009 #include <Akonadi/CollectionModifyJob>
0010 #include <Akonadi/ItemCreateJob>
0011 #include <Akonadi/ItemDeleteJob>
0012 #include <Akonadi/ItemFetchJob>
0013 #include <Akonadi/ItemModifyJob>
0014 #include <Akonadi/MessageFlags>
0015 #include <Akonadi/TransactionSequence>
0016 #include <Akonadi/VectorHelper>
0017 #include <KMime/Message>
0018 #include <QDirIterator>
0019 
0020 RetrieveItemsJob::RetrieveItemsJob(const Akonadi::Collection &collection, const KPIM::Maildir &md, QObject *parent)
0021     : Job(parent)
0022     , m_collection(collection)
0023     , m_maildir(md)
0024     , m_mimeType(KMime::Message::mimeType())
0025 {
0026     Q_ASSERT(m_collection.isValid());
0027     Q_ASSERT(m_maildir.isValid());
0028 }
0029 
0030 void RetrieveItemsJob::setMimeType(const QString &mimeType)
0031 {
0032     m_mimeType = mimeType;
0033 }
0034 
0035 void RetrieveItemsJob::doStart()
0036 {
0037     Q_ASSERT(!m_mimeType.isEmpty());
0038     auto job = new Akonadi::ItemFetchJob(m_collection, this);
0039     connect(job, &Akonadi::ItemFetchJob::result, this, &RetrieveItemsJob::localListDone);
0040 }
0041 
0042 void RetrieveItemsJob::localListDone(KJob *job)
0043 {
0044     if (job->error()) {
0045         qCDebug(MAILDIRRESOURCE_LOG) << "Error during RetrieveItemsJob::localListDone " << job->errorString();
0046         return; // handled by base class
0047     }
0048 
0049     const Akonadi::Item::List items = qobject_cast<Akonadi::ItemFetchJob *>(job)->items();
0050     m_localItems.reserve(items.size());
0051     for (const Akonadi::Item &item : items) {
0052         if (!item.remoteId().isEmpty()) {
0053             m_localItems.insert(item.remoteId(), item);
0054         }
0055     }
0056 
0057     m_listingPath = m_maildir.path() + QLatin1StringView("/new/");
0058     delete m_entryIterator;
0059     m_entryIterator = new QDirIterator(m_maildir.pathToNew(), QDir::Files);
0060     m_previousMtime = m_collection.remoteRevision().toLongLong();
0061     m_highestMtime = 0;
0062     QMetaObject::invokeMethod(this, &RetrieveItemsJob::processEntry, Qt::QueuedConnection);
0063 }
0064 
0065 void RetrieveItemsJob::processEntry()
0066 {
0067     Akonadi::TransactionSequence *lastTrx = nullptr;
0068 
0069     while (m_entryIterator->hasNext() || m_listingPath.endsWith(QLatin1StringView("/new/"))) {
0070         if (!m_entryIterator->hasNext()) {
0071             m_listingPath = m_maildir.path() + QLatin1StringView("/cur/");
0072             delete m_entryIterator;
0073             m_entryIterator = new QDirIterator(m_maildir.pathToCurrent(), QDir::Files);
0074             if (!m_entryIterator->hasNext()) {
0075                 break;
0076             }
0077         }
0078         m_entryIterator->next();
0079 
0080         const QFileInfo entryInfo = m_entryIterator->fileInfo();
0081         const QString fileName = entryInfo.fileName();
0082         const qint64 currentMtime = entryInfo.lastModified().toMSecsSinceEpoch();
0083         m_highestMtime = qMax(m_highestMtime, currentMtime);
0084         if (currentMtime <= m_previousMtime) {
0085             auto localItemIter = m_localItems.find(fileName);
0086             if (localItemIter != m_localItems.end()) { // old, we got this one already
0087                 m_localItems.erase(localItemIter);
0088                 continue;
0089             }
0090         }
0091 
0092         Akonadi::Item item;
0093         item.setRemoteId(fileName);
0094         item.setMimeType(m_mimeType);
0095         const qint64 entrySize = entryInfo.size();
0096         if (entrySize >= 0) {
0097             item.setSize(entrySize);
0098         }
0099 
0100         auto msg = new KMime::Message;
0101         msg->setHead(KMime::CRLFtoLF(m_maildir.readEntryHeadersFromFile(m_listingPath + fileName)));
0102         msg->parse();
0103 
0104         const Akonadi::Item::Flags flags = m_maildir.readEntryFlags(fileName);
0105         for (const Akonadi::Item::Flag &flag : flags) {
0106             item.setFlag(flag);
0107         }
0108 
0109         item.setPayload(KMime::Message::Ptr(msg));
0110         Akonadi::MessageFlags::copyMessageFlags(*msg, item);
0111         auto localItemIter = m_localItems.find(fileName);
0112         Akonadi::TransactionSequence *trx = transaction();
0113         if (localItemIter == m_localItems.end()) { // new item
0114             new Akonadi::ItemCreateJob(item, m_collection, trx);
0115         } else { // modification
0116             item.setId((*localItemIter).id());
0117             new Akonadi::ItemModifyJob(item, trx);
0118             m_localItems.erase(localItemIter);
0119         }
0120         if (trx != lastTrx) {
0121             lastTrx = trx;
0122             QMetaObject::invokeMethod(this, &RetrieveItemsJob::processEntry, Qt::QueuedConnection);
0123             return;
0124         }
0125     }
0126 
0127     entriesProcessed();
0128     // connect(job, &Akonadi::ItemCreateJob::result, this, &RetrieveItemsJob::processEntryDone);
0129 }
0130 
0131 void RetrieveItemsJob::processEntryDone(KJob *)
0132 {
0133     processEntry();
0134 }
0135 
0136 void RetrieveItemsJob::entriesProcessed()
0137 {
0138     delete m_entryIterator;
0139     m_entryIterator = nullptr;
0140     if (!m_localItems.isEmpty()) {
0141         auto job = new Akonadi::ItemDeleteJob(Akonadi::valuesToVector(m_localItems), transaction());
0142         m_maildir.removeCachedKeys(m_localItems.keys());
0143         // We ensure m_transaction is valid by calling transaction() above,
0144         // however calling it again here could cause it to give us another transaction
0145         // object (see transaction() comment for details)
0146         m_transaction->setIgnoreJobFailure(job);
0147     }
0148 
0149     // update mtime
0150     if (m_highestMtime != m_previousMtime) {
0151         Akonadi::Collection newCol(m_collection);
0152         newCol.setRemoteRevision(QString::number(m_highestMtime));
0153         auto job = new Akonadi::CollectionModifyJob(newCol, transaction());
0154         m_transaction->setIgnoreJobFailure(job);
0155     }
0156 
0157     if (!m_transaction) { // no jobs created here -> done
0158         emitResult();
0159     } else {
0160         connect(m_transaction, &Akonadi::TransactionSequence::result, this, &RetrieveItemsJob::transactionDone);
0161         m_transaction->commit();
0162     }
0163 }
0164 
0165 Akonadi::TransactionSequence *RetrieveItemsJob::transaction()
0166 {
0167     // Commit transaction every 100 items, otherwise we are forcing server to
0168     // hold the database transaction opened for potentially massive amount of
0169     // operations, which slowly overloads the database journal causing simple
0170     // INSERT to take several seconds
0171     if (++m_transactionSize >= 100) {
0172         qCDebug(MAILDIRRESOURCE_LOG) << "Commit!";
0173         m_transaction->commit();
0174         m_transaction = nullptr;
0175         m_transactionSize = 0;
0176     }
0177 
0178     if (!m_transaction) {
0179         m_transaction = new Akonadi::TransactionSequence(this);
0180         m_transaction->setAutomaticCommittingEnabled(false);
0181     }
0182     return m_transaction;
0183 }
0184 
0185 void RetrieveItemsJob::transactionDone(KJob *job)
0186 {
0187     if (job->error()) {
0188         qCDebug(MAILDIRRESOURCE_LOG) << "Error during transaction " << job->errorString();
0189         return; // handled by base class
0190     }
0191     emitResult();
0192 }
0193 
0194 #include "moc_retrieveitemsjob.cpp"