// File indexing completed on 2024-06-02 05:21:32
0001 /* 0002 SPDX-FileCopyrightText: 2011 Volker Krause <vkrause@kde.org> 0003 0004 SPDX-License-Identifier: LGPL-2.0-or-later 0005 */ 0006 0007 #include "retrieveitemsjob.h" 0008 #include "maildirresource_debug.h" 0009 #include <Akonadi/CollectionModifyJob> 0010 #include <Akonadi/ItemCreateJob> 0011 #include <Akonadi/ItemDeleteJob> 0012 #include <Akonadi/ItemFetchJob> 0013 #include <Akonadi/ItemModifyJob> 0014 #include <Akonadi/MessageFlags> 0015 #include <Akonadi/TransactionSequence> 0016 #include <Akonadi/VectorHelper> 0017 #include <KMime/Message> 0018 #include <QDirIterator> 0019 0020 RetrieveItemsJob::RetrieveItemsJob(const Akonadi::Collection &collection, const KPIM::Maildir &md, QObject *parent) 0021 : Job(parent) 0022 , m_collection(collection) 0023 , m_maildir(md) 0024 , m_mimeType(KMime::Message::mimeType()) 0025 { 0026 Q_ASSERT(m_collection.isValid()); 0027 Q_ASSERT(m_maildir.isValid()); 0028 } 0029 0030 void RetrieveItemsJob::setMimeType(const QString &mimeType) 0031 { 0032 m_mimeType = mimeType; 0033 } 0034 0035 void RetrieveItemsJob::doStart() 0036 { 0037 Q_ASSERT(!m_mimeType.isEmpty()); 0038 auto job = new Akonadi::ItemFetchJob(m_collection, this); 0039 connect(job, &Akonadi::ItemFetchJob::result, this, &RetrieveItemsJob::localListDone); 0040 } 0041 0042 void RetrieveItemsJob::localListDone(KJob *job) 0043 { 0044 if (job->error()) { 0045 qCDebug(MAILDIRRESOURCE_LOG) << "Error during RetrieveItemsJob::localListDone " << job->errorString(); 0046 return; // handled by base class 0047 } 0048 0049 const Akonadi::Item::List items = qobject_cast<Akonadi::ItemFetchJob *>(job)->items(); 0050 m_localItems.reserve(items.size()); 0051 for (const Akonadi::Item &item : items) { 0052 if (!item.remoteId().isEmpty()) { 0053 m_localItems.insert(item.remoteId(), item); 0054 } 0055 } 0056 0057 m_listingPath = m_maildir.path() + QLatin1StringView("/new/"); 0058 delete m_entryIterator; 0059 m_entryIterator = new QDirIterator(m_maildir.pathToNew(), QDir::Files); 0060 
m_previousMtime = m_collection.remoteRevision().toLongLong(); 0061 m_highestMtime = 0; 0062 QMetaObject::invokeMethod(this, &RetrieveItemsJob::processEntry, Qt::QueuedConnection); 0063 } 0064 0065 void RetrieveItemsJob::processEntry() 0066 { 0067 Akonadi::TransactionSequence *lastTrx = nullptr; 0068 0069 while (m_entryIterator->hasNext() || m_listingPath.endsWith(QLatin1StringView("/new/"))) { 0070 if (!m_entryIterator->hasNext()) { 0071 m_listingPath = m_maildir.path() + QLatin1StringView("/cur/"); 0072 delete m_entryIterator; 0073 m_entryIterator = new QDirIterator(m_maildir.pathToCurrent(), QDir::Files); 0074 if (!m_entryIterator->hasNext()) { 0075 break; 0076 } 0077 } 0078 m_entryIterator->next(); 0079 0080 const QFileInfo entryInfo = m_entryIterator->fileInfo(); 0081 const QString fileName = entryInfo.fileName(); 0082 const qint64 currentMtime = entryInfo.lastModified().toMSecsSinceEpoch(); 0083 m_highestMtime = qMax(m_highestMtime, currentMtime); 0084 if (currentMtime <= m_previousMtime) { 0085 auto localItemIter = m_localItems.find(fileName); 0086 if (localItemIter != m_localItems.end()) { // old, we got this one already 0087 m_localItems.erase(localItemIter); 0088 continue; 0089 } 0090 } 0091 0092 Akonadi::Item item; 0093 item.setRemoteId(fileName); 0094 item.setMimeType(m_mimeType); 0095 const qint64 entrySize = entryInfo.size(); 0096 if (entrySize >= 0) { 0097 item.setSize(entrySize); 0098 } 0099 0100 auto msg = new KMime::Message; 0101 msg->setHead(KMime::CRLFtoLF(m_maildir.readEntryHeadersFromFile(m_listingPath + fileName))); 0102 msg->parse(); 0103 0104 const Akonadi::Item::Flags flags = m_maildir.readEntryFlags(fileName); 0105 for (const Akonadi::Item::Flag &flag : flags) { 0106 item.setFlag(flag); 0107 } 0108 0109 item.setPayload(KMime::Message::Ptr(msg)); 0110 Akonadi::MessageFlags::copyMessageFlags(*msg, item); 0111 auto localItemIter = m_localItems.find(fileName); 0112 Akonadi::TransactionSequence *trx = transaction(); 0113 if (localItemIter == 
m_localItems.end()) { // new item 0114 new Akonadi::ItemCreateJob(item, m_collection, trx); 0115 } else { // modification 0116 item.setId((*localItemIter).id()); 0117 new Akonadi::ItemModifyJob(item, trx); 0118 m_localItems.erase(localItemIter); 0119 } 0120 if (trx != lastTrx) { 0121 lastTrx = trx; 0122 QMetaObject::invokeMethod(this, &RetrieveItemsJob::processEntry, Qt::QueuedConnection); 0123 return; 0124 } 0125 } 0126 0127 entriesProcessed(); 0128 // connect(job, &Akonadi::ItemCreateJob::result, this, &RetrieveItemsJob::processEntryDone); 0129 } 0130 0131 void RetrieveItemsJob::processEntryDone(KJob *) 0132 { 0133 processEntry(); 0134 } 0135 0136 void RetrieveItemsJob::entriesProcessed() 0137 { 0138 delete m_entryIterator; 0139 m_entryIterator = nullptr; 0140 if (!m_localItems.isEmpty()) { 0141 auto job = new Akonadi::ItemDeleteJob(Akonadi::valuesToVector(m_localItems), transaction()); 0142 m_maildir.removeCachedKeys(m_localItems.keys()); 0143 // We ensure m_transaction is valid by calling transaction() above, 0144 // however calling it again here could cause it to give us another transaction 0145 // object (see transaction() comment for details) 0146 m_transaction->setIgnoreJobFailure(job); 0147 } 0148 0149 // update mtime 0150 if (m_highestMtime != m_previousMtime) { 0151 Akonadi::Collection newCol(m_collection); 0152 newCol.setRemoteRevision(QString::number(m_highestMtime)); 0153 auto job = new Akonadi::CollectionModifyJob(newCol, transaction()); 0154 m_transaction->setIgnoreJobFailure(job); 0155 } 0156 0157 if (!m_transaction) { // no jobs created here -> done 0158 emitResult(); 0159 } else { 0160 connect(m_transaction, &Akonadi::TransactionSequence::result, this, &RetrieveItemsJob::transactionDone); 0161 m_transaction->commit(); 0162 } 0163 } 0164 0165 Akonadi::TransactionSequence *RetrieveItemsJob::transaction() 0166 { 0167 // Commit transaction every 100 items, otherwise we are forcing server to 0168 // hold the database transaction opened for potentially 
massive amount of 0169 // operations, which slowly overloads the database journal causing simple 0170 // INSERT to take several seconds 0171 if (++m_transactionSize >= 100) { 0172 qCDebug(MAILDIRRESOURCE_LOG) << "Commit!"; 0173 m_transaction->commit(); 0174 m_transaction = nullptr; 0175 m_transactionSize = 0; 0176 } 0177 0178 if (!m_transaction) { 0179 m_transaction = new Akonadi::TransactionSequence(this); 0180 m_transaction->setAutomaticCommittingEnabled(false); 0181 } 0182 return m_transaction; 0183 } 0184 0185 void RetrieveItemsJob::transactionDone(KJob *job) 0186 { 0187 if (job->error()) { 0188 qCDebug(MAILDIRRESOURCE_LOG) << "Error during transaction " << job->errorString(); 0189 return; // handled by base class 0190 } 0191 emitResult(); 0192 } 0193 0194 #include "moc_retrieveitemsjob.cpp"