File indexing completed on 2024-05-12 05:11:16

0001 /*
0002  * SPDX-FileCopyrightText: 2014 Christian Mollekopf <mollekopf@kolabsys.com>
0003  *
0004  * SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
0005  *
0006  */
0007 
0008 #include "scheduler.h"
0009 
0010 #include "akonadi_indexer_agent_debug.h"
0011 #include "collectionindexingjob.h"
0012 
0013 #include <Akonadi/AgentBase>
0014 #include <Akonadi/CollectionFetchJob>
0015 #include <Akonadi/CollectionFetchScope>
0016 #include <Akonadi/IndexPolicyAttribute>
0017 #include <Akonadi/ServerManager>
0018 
0019 #include <KConfigGroup>
0020 #include <KLocalizedString>
0021 
0022 #include <QTimer>
0023 #include <chrono>
0024 
0025 using namespace std::chrono_literals;
0026 
// Out-of-line definition of the JobFactory destructor; nothing beyond the
// compiler-generated cleanup is performed.
JobFactory::~JobFactory() = default;
0028 
0029 CollectionIndexingJob *
0030 JobFactory::createCollectionIndexingJob(Index &index, const Akonadi::Collection &col, const QList<Akonadi::Item::Id> &pending, bool fullSync, QObject *parent)
0031 {
0032     auto job = new CollectionIndexingJob(index, col, pending, parent);
0033     job->setFullSync(fullSync);
0034     return job;
0035 }
0036 
0037 Scheduler::Scheduler(Index &index, const KSharedConfigPtr &config, const QSharedPointer<JobFactory> &jobFactory, QObject *parent)
0038     : QObject(parent)
0039     , m_config(config)
0040     , m_index(index)
0041     , m_jobFactory(jobFactory)
0042     , m_busyTimeout(5000)
0043 {
0044     if (!m_jobFactory) {
0045         m_jobFactory = QSharedPointer<JobFactory>(new JobFactory);
0046     }
0047     m_processTimer.setSingleShot(true);
0048     m_processTimer.setInterval(100ms);
0049     connect(&m_processTimer, &QTimer::timeout, this, &Scheduler::processNext);
0050 
0051     KConfigGroup cfg = m_config->group(QStringLiteral("General"));
0052     const auto dirtyCollectionsResult2 = cfg.readEntry("dirtyCollections", QList<Akonadi::Collection::Id>());
0053     m_dirtyCollections = QSet<Akonadi::Collection::Id>(dirtyCollectionsResult2.begin(), dirtyCollectionsResult2.end());
0054 
0055     qCDebug(AKONADI_INDEXER_AGENT_LOG) << "Dirty collections " << m_dirtyCollections;
0056     for (Akonadi::Collection::Id col : std::as_const(m_dirtyCollections)) {
0057         scheduleCollection(Akonadi::Collection(col), true);
0058     }
0059 
0060     bool initialIndexingDone = cfg.readEntry("initialIndexingDone", false);
0061     // Trigger a full sync initially
0062     if (!initialIndexingDone) {
0063         qCDebug(AKONADI_INDEXER_AGENT_LOG) << "initial indexing";
0064         QMetaObject::invokeMethod(this, &Scheduler::scheduleCompleteSync, Qt::QueuedConnection);
0065     }
0066     cfg.writeEntry("initialIndexingDone", true);
0067     cfg.sync();
0068 }
0069 
// On destruction, persist the dirty-collection set (which collectDirtyCollections()
// extends with every collection that still has queued items) to the configuration.
Scheduler::~Scheduler()
{
    collectDirtyCollections();
}
0074 
// Sets the busy timeout. processNext() compares this value against a difference
// of QDateTime::currentMSecsSinceEpoch() timestamps, so it is expressed in
// milliseconds; the constructor initialises it to 5000.
void Scheduler::setBusyTimeout(int timeout)
{
    m_busyTimeout = timeout;
}
0079 
0080 int Scheduler::numberOfCollectionQueued() const
0081 {
0082     return m_collectionQueue.count();
0083 }
0084 
0085 void Scheduler::collectDirtyCollections()
0086 {
0087     KConfigGroup cfg = m_config->group(QStringLiteral("General"));
0088     // Store collections where we did not manage to index all, we'll need to do a full sync for them the next time
0089     QHash<Akonadi::Collection::Id, QQueue<Akonadi::Item::Id>>::ConstIterator it = m_queues.constBegin();
0090     QHash<Akonadi::Collection::Id, QQueue<Akonadi::Item::Id>>::ConstIterator end = m_queues.constEnd();
0091     for (; it != end; ++it) {
0092         if (!it.value().isEmpty()) {
0093             m_dirtyCollections.insert(it.key());
0094         }
0095     }
0096     qCDebug(AKONADI_INDEXER_AGENT_LOG) << m_dirtyCollections;
0097     cfg.writeEntry("dirtyCollections", m_dirtyCollections.values());
0098     cfg.sync();
0099 }
0100 
0101 void Scheduler::scheduleCollection(const Akonadi::Collection &col, bool fullSync)
0102 {
0103     if (!m_collectionQueue.contains(col.id())) {
0104         m_collectionQueue.enqueue(col.id());
0105     }
0106     if (fullSync) {
0107         m_dirtyCollections.insert(col.id());
0108     }
0109     processNext();
0110 }
0111 
0112 void Scheduler::addItem(const Akonadi::Item &item)
0113 {
0114     Q_ASSERT(item.parentCollection().isValid());
0115     m_lastModifiedTimestamps.insert(item.parentCollection().id(), QDateTime::currentMSecsSinceEpoch());
0116     m_queues[item.parentCollection().id()].append(item.id());
0117     // Move to the back
0118     m_collectionQueue.removeOne(item.parentCollection().id());
0119     m_collectionQueue.enqueue(item.parentCollection().id());
0120     if (!m_processTimer.isActive()) {
0121         m_processTimer.start();
0122     }
0123 }
0124 
0125 void Scheduler::scheduleCompleteSync()
0126 {
0127     qCDebug(AKONADI_INDEXER_AGENT_LOG);
0128     {
0129         auto job = new Akonadi::CollectionFetchJob(Akonadi::Collection::root(), Akonadi::CollectionFetchJob::Recursive);
0130         job->fetchScope().setAncestorRetrieval(Akonadi::CollectionFetchScope::All);
0131         job->fetchScope().setListFilter(Akonadi::CollectionFetchScope::Index);
0132         job->fetchScope().fetchAttribute<Akonadi::IndexPolicyAttribute>();
0133         connect(job, &KJob::finished, this, &Scheduler::slotRootCollectionsFetched);
0134         job->start();
0135     }
0136 
0137     // We want to index all collections, even if we don't index their content
0138     {
0139         auto job = new Akonadi::CollectionFetchJob(Akonadi::Collection::root(), Akonadi::CollectionFetchJob::Recursive);
0140         job->fetchScope().setAncestorRetrieval(Akonadi::CollectionFetchScope::All);
0141         job->fetchScope().setListFilter(Akonadi::CollectionFetchScope::NoFilter);
0142         job->fetchScope().setListFilter(Akonadi::CollectionFetchScope::Index);
0143         connect(job, &KJob::finished, this, &Scheduler::slotCollectionsToIndexFetched);
0144         job->start();
0145     }
0146 }
0147 
0148 void Scheduler::slotRootCollectionsFetched(KJob *kjob)
0149 {
0150     auto cjob = static_cast<Akonadi::CollectionFetchJob *>(kjob);
0151     const Akonadi::Collection::List lstCols = cjob->collections();
0152     for (const Akonadi::Collection &c : lstCols) {
0153         // For skipping search collections
0154         if (c.isVirtual()) {
0155             continue;
0156         }
0157         if (c == Akonadi::Collection::root()) {
0158             continue;
0159         }
0160         if (c.hasAttribute<Akonadi::IndexPolicyAttribute>() && !c.attribute<Akonadi::IndexPolicyAttribute>()->indexingEnabled()) {
0161             continue;
0162         }
0163         scheduleCollection(c, true);
0164     }
0165 
0166     // If we did not schedule any collection
0167     if (m_collectionQueue.isEmpty()) {
0168         qCDebug(AKONADI_INDEXER_AGENT_LOG) << "No collections scheduled";
0169         Q_EMIT status(Akonadi::AgentBase::Idle, i18n("Ready"));
0170     }
0171 }
0172 
0173 void Scheduler::slotCollectionsToIndexFetched(KJob *kjob)
0174 {
0175     auto cjob = static_cast<Akonadi::CollectionFetchJob *>(kjob);
0176     const Akonadi::Collection::List lstCols = cjob->collections();
0177     for (const Akonadi::Collection &c : lstCols) {
0178         // For skipping search collections
0179         if (c.isVirtual()) {
0180             continue;
0181         }
0182         if (c == Akonadi::Collection::root()) {
0183             continue;
0184         }
0185         if (c.hasAttribute<Akonadi::IndexPolicyAttribute>() && !c.attribute<Akonadi::IndexPolicyAttribute>()->indexingEnabled()) {
0186             continue;
0187         }
0188         m_index.index(c);
0189     }
0190 }
0191 
0192 void Scheduler::abort()
0193 {
0194     if (m_currentJob) {
0195         m_currentJob->kill(KJob::Quietly);
0196     }
0197     m_currentJob = nullptr;
0198     collectDirtyCollections();
0199     m_collectionQueue.clear();
0200     Q_EMIT status(Akonadi::AgentBase::Idle, i18n("Ready"));
0201 }
0202 
0203 void Scheduler::processNext()
0204 {
0205     m_processTimer.stop();
0206     if (m_currentJob) {
0207         return;
0208     }
0209     if (m_collectionQueue.isEmpty()) {
0210         qCDebug(AKONADI_INDEXER_AGENT_LOG) << "Processing done";
0211         Q_EMIT status(Akonadi::AgentBase::Idle, i18n("Ready"));
0212         return;
0213     }
0214 
0215     // An item was queued within the last 5 seconds, we're probably in the middle of a sync
0216     const bool collectionIsChanging = (QDateTime::currentMSecsSinceEpoch() - m_lastModifiedTimestamps.value(m_collectionQueue.head())) < m_busyTimeout;
0217     if (collectionIsChanging) {
0218         // We're in the middle of something, wait with indexing
0219         m_processTimer.start();
0220         return;
0221     }
0222 
0223     const Akonadi::Collection col(m_collectionQueue.takeFirst());
0224     qCDebug(AKONADI_INDEXER_AGENT_LOG) << "Processing collection: " << col.id();
0225     QQueue<Akonadi::Item::Id> &itemQueue = m_queues[col.id()];
0226     const bool fullSync = m_dirtyCollections.contains(col.id());
0227     CollectionIndexingJob *job = m_jobFactory->createCollectionIndexingJob(m_index, col, itemQueue, fullSync, this);
0228     itemQueue.clear();
0229     job->setProperty("collection", col.id());
0230     connect(job, &KJob::result, this, &Scheduler::slotIndexingFinished);
0231     connect(job, &CollectionIndexingJob::status, this, &Scheduler::status);
0232     connect(job, SIGNAL(percent(int)), this, SIGNAL(percent(int)));
0233     m_currentJob = job;
0234     job->start();
0235 }
0236 
0237 void Scheduler::slotIndexingFinished(KJob *job)
0238 {
0239     if (job->error()) {
0240         qCWarning(AKONADI_INDEXER_AGENT_LOG) << "Indexing failed: " << job->errorString();
0241     } else {
0242         const auto collectionId = job->property("collection").value<Akonadi::Collection::Id>();
0243         m_dirtyCollections.remove(collectionId);
0244         Q_EMIT status(Akonadi::AgentBase::Idle, i18n("Collection \"%1\" indexed", collectionId));
0245         Q_EMIT collectionIndexingFinished(collectionId);
0246     }
0247     m_currentJob = nullptr;
0248     m_processTimer.start();
0249 }
0250 
0251 #include "moc_scheduler.cpp"