File indexing completed on 2024-05-12 05:11:16
0001 /* 0002 * SPDX-FileCopyrightText: 2014 Christian Mollekopf <mollekopf@kolabsys.com> 0003 * 0004 * SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL 0005 * 0006 */ 0007 0008 #include "scheduler.h" 0009 0010 #include "akonadi_indexer_agent_debug.h" 0011 #include "collectionindexingjob.h" 0012 0013 #include <Akonadi/AgentBase> 0014 #include <Akonadi/CollectionFetchJob> 0015 #include <Akonadi/CollectionFetchScope> 0016 #include <Akonadi/IndexPolicyAttribute> 0017 #include <Akonadi/ServerManager> 0018 0019 #include <KConfigGroup> 0020 #include <KLocalizedString> 0021 0022 #include <QTimer> 0023 #include <chrono> 0024 0025 using namespace std::chrono_literals; 0026 0027 JobFactory::~JobFactory() = default; 0028 0029 CollectionIndexingJob * 0030 JobFactory::createCollectionIndexingJob(Index &index, const Akonadi::Collection &col, const QList<Akonadi::Item::Id> &pending, bool fullSync, QObject *parent) 0031 { 0032 auto job = new CollectionIndexingJob(index, col, pending, parent); 0033 job->setFullSync(fullSync); 0034 return job; 0035 } 0036 0037 Scheduler::Scheduler(Index &index, const KSharedConfigPtr &config, const QSharedPointer<JobFactory> &jobFactory, QObject *parent) 0038 : QObject(parent) 0039 , m_config(config) 0040 , m_index(index) 0041 , m_jobFactory(jobFactory) 0042 , m_busyTimeout(5000) 0043 { 0044 if (!m_jobFactory) { 0045 m_jobFactory = QSharedPointer<JobFactory>(new JobFactory); 0046 } 0047 m_processTimer.setSingleShot(true); 0048 m_processTimer.setInterval(100ms); 0049 connect(&m_processTimer, &QTimer::timeout, this, &Scheduler::processNext); 0050 0051 KConfigGroup cfg = m_config->group(QStringLiteral("General")); 0052 const auto dirtyCollectionsResult2 = cfg.readEntry("dirtyCollections", QList<Akonadi::Collection::Id>()); 0053 m_dirtyCollections = QSet<Akonadi::Collection::Id>(dirtyCollectionsResult2.begin(), dirtyCollectionsResult2.end()); 0054 0055 qCDebug(AKONADI_INDEXER_AGENT_LOG) << "Dirty collections " << m_dirtyCollections; 0056 for (Akonadi::Collection::Id col : std::as_const(m_dirtyCollections)) { 0057 scheduleCollection(Akonadi::Collection(col), true); 0058 } 0059 0060 bool initialIndexingDone = cfg.readEntry("initialIndexingDone", false); 0061 // Trigger a full sync initially 0062 if (!initialIndexingDone) { 0063 qCDebug(AKONADI_INDEXER_AGENT_LOG) << "initial indexing"; 0064 QMetaObject::invokeMethod(this, &Scheduler::scheduleCompleteSync, Qt::QueuedConnection); 0065 } 0066 cfg.writeEntry("initialIndexingDone", true); 0067 cfg.sync(); 0068 } 0069 0070 Scheduler::~Scheduler() 0071 { 0072 collectDirtyCollections(); 0073 } 0074 0075 void Scheduler::setBusyTimeout(int timeout) 0076 { 0077 m_busyTimeout = timeout; 0078 } 0079 0080 int Scheduler::numberOfCollectionQueued() const 0081 { 0082 return m_collectionQueue.count(); 0083 } 0084 0085 void Scheduler::collectDirtyCollections() 0086 { 0087 KConfigGroup cfg = m_config->group(QStringLiteral("General")); 0088 // Store collections where we did not manage to index all, we'll need to do a full sync for them the next time 0089 QHash<Akonadi::Collection::Id, QQueue<Akonadi::Item::Id>>::ConstIterator it = m_queues.constBegin(); 0090 QHash<Akonadi::Collection::Id, QQueue<Akonadi::Item::Id>>::ConstIterator end = m_queues.constEnd(); 0091 for (; it != end; ++it) { 0092 if (!it.value().isEmpty()) { 0093 m_dirtyCollections.insert(it.key()); 0094 } 0095 } 0096 qCDebug(AKONADI_INDEXER_AGENT_LOG) << m_dirtyCollections; 0097 cfg.writeEntry("dirtyCollections", m_dirtyCollections.values()); 0098 cfg.sync(); 0099 } 0100 0101 void Scheduler::scheduleCollection(const Akonadi::Collection &col, bool fullSync) 0102 { 0103 if (!m_collectionQueue.contains(col.id())) { 0104 m_collectionQueue.enqueue(col.id()); 0105 } 0106 if (fullSync) { 0107 m_dirtyCollections.insert(col.id()); 0108 } 0109 processNext(); 0110 } 0111 0112 void Scheduler::addItem(const Akonadi::Item &item) 0113 { 0114 Q_ASSERT(item.parentCollection().isValid()); 0115 m_lastModifiedTimestamps.insert(item.parentCollection().id(), QDateTime::currentMSecsSinceEpoch()); 0116 m_queues[item.parentCollection().id()].append(item.id()); 0117 // Move to the back 0118 m_collectionQueue.removeOne(item.parentCollection().id()); 0119 m_collectionQueue.enqueue(item.parentCollection().id()); 0120 if (!m_processTimer.isActive()) { 0121 m_processTimer.start(); 0122 } 0123 } 0124 0125 void Scheduler::scheduleCompleteSync() 0126 { 0127 qCDebug(AKONADI_INDEXER_AGENT_LOG); 0128 { 0129 auto job = new Akonadi::CollectionFetchJob(Akonadi::Collection::root(), Akonadi::CollectionFetchJob::Recursive); 0130 job->fetchScope().setAncestorRetrieval(Akonadi::CollectionFetchScope::All); 0131 job->fetchScope().setListFilter(Akonadi::CollectionFetchScope::Index); 0132 job->fetchScope().fetchAttribute<Akonadi::IndexPolicyAttribute>(); 0133 connect(job, &KJob::finished, this, &Scheduler::slotRootCollectionsFetched); 0134 job->start(); 0135 } 0136 0137 // We want to index all collections, even if we don't index their content 0138 { 0139 auto job = new Akonadi::CollectionFetchJob(Akonadi::Collection::root(), Akonadi::CollectionFetchJob::Recursive); 0140 job->fetchScope().setAncestorRetrieval(Akonadi::CollectionFetchScope::All); 0141 job->fetchScope().setListFilter(Akonadi::CollectionFetchScope::NoFilter); 0142 job->fetchScope().setListFilter(Akonadi::CollectionFetchScope::Index); 0143 connect(job, &KJob::finished, this, &Scheduler::slotCollectionsToIndexFetched); 0144 job->start(); 0145 } 0146 } 0147 0148 void Scheduler::slotRootCollectionsFetched(KJob *kjob) 0149 { 0150 auto cjob = static_cast<Akonadi::CollectionFetchJob *>(kjob); 0151 const Akonadi::Collection::List lstCols = cjob->collections(); 0152 for (const Akonadi::Collection &c : lstCols) { 0153 // For skipping search collections 0154 if (c.isVirtual()) { 0155 continue; 0156 } 0157 if (c == Akonadi::Collection::root()) { 0158 continue; 0159 } 0160 if (c.hasAttribute<Akonadi::IndexPolicyAttribute>() && !c.attribute<Akonadi::IndexPolicyAttribute>()->indexingEnabled()) { 0161 continue; 0162 } 0163 scheduleCollection(c, true); 0164 } 0165 0166 // If we did not schedule any collection 0167 if (m_collectionQueue.isEmpty()) { 0168 qCDebug(AKONADI_INDEXER_AGENT_LOG) << "No collections scheduled"; 0169 Q_EMIT status(Akonadi::AgentBase::Idle, i18n("Ready")); 0170 } 0171 } 0172 0173 void Scheduler::slotCollectionsToIndexFetched(KJob *kjob) 0174 { 0175 auto cjob = static_cast<Akonadi::CollectionFetchJob *>(kjob); 0176 const Akonadi::Collection::List lstCols = cjob->collections(); 0177 for (const Akonadi::Collection &c : lstCols) { 0178 // For skipping search collections 0179 if (c.isVirtual()) { 0180 continue; 0181 } 0182 if (c == Akonadi::Collection::root()) { 0183 continue; 0184 } 0185 if (c.hasAttribute<Akonadi::IndexPolicyAttribute>() && !c.attribute<Akonadi::IndexPolicyAttribute>()->indexingEnabled()) { 0186 continue; 0187 } 0188 m_index.index(c); 0189 } 0190 } 0191 0192 void Scheduler::abort() 0193 { 0194 if (m_currentJob) { 0195 m_currentJob->kill(KJob::Quietly); 0196 } 0197 m_currentJob = nullptr; 0198 collectDirtyCollections(); 0199 m_collectionQueue.clear(); 0200 Q_EMIT status(Akonadi::AgentBase::Idle, i18n("Ready")); 0201 } 0202 0203 void Scheduler::processNext() 0204 { 0205 m_processTimer.stop(); 0206 if (m_currentJob) { 0207 return; 0208 } 0209 if (m_collectionQueue.isEmpty()) { 0210 qCDebug(AKONADI_INDEXER_AGENT_LOG) << "Processing done"; 0211 Q_EMIT status(Akonadi::AgentBase::Idle, i18n("Ready")); 0212 return; 0213 } 0214 0215 // An item was queued within the last 5 seconds, we're probably in the middle of a sync 0216 const bool collectionIsChanging = (QDateTime::currentMSecsSinceEpoch() - m_lastModifiedTimestamps.value(m_collectionQueue.head())) < m_busyTimeout; 0217 if (collectionIsChanging) { 0218 // We're in the middle of something, wait with indexing 0219 m_processTimer.start(); 0220 return; 0221 } 0222 0223 const Akonadi::Collection col(m_collectionQueue.takeFirst()); 0224 qCDebug(AKONADI_INDEXER_AGENT_LOG) << "Processing collection: " << col.id(); 0225 QQueue<Akonadi::Item::Id> &itemQueue = m_queues[col.id()]; 0226 const bool fullSync = m_dirtyCollections.contains(col.id()); 0227 CollectionIndexingJob *job = m_jobFactory->createCollectionIndexingJob(m_index, col, itemQueue, fullSync, this); 0228 itemQueue.clear(); 0229 job->setProperty("collection", col.id()); 0230 connect(job, &KJob::result, this, &Scheduler::slotIndexingFinished); 0231 connect(job, &CollectionIndexingJob::status, this, &Scheduler::status); 0232 connect(job, SIGNAL(percent(int)), this, SIGNAL(percent(int))); 0233 m_currentJob = job; 0234 job->start(); 0235 } 0236 0237 void Scheduler::slotIndexingFinished(KJob *job) 0238 { 0239 if (job->error()) { 0240 qCWarning(AKONADI_INDEXER_AGENT_LOG) << "Indexing failed: " << job->errorString(); 0241 } else { 0242 const auto collectionId = job->property("collection").value<Akonadi::Collection::Id>(); 0243 m_dirtyCollections.remove(collectionId); 0244 Q_EMIT status(Akonadi::AgentBase::Idle, i18n("Collection \"%1\" indexed", collectionId)); 0245 Q_EMIT collectionIndexingFinished(collectionId); 0246 } 0247 m_currentJob = nullptr; 0248 m_processTimer.start(); 0249 } 0250 0251 #include "moc_scheduler.cpp"