File indexing completed on 2024-11-10 04:40:53

0001 /*
0002     SPDX-FileCopyrightText: 2013, 2014 Daniel Vrátil <dvratil@redhat.com>
0003 
0004     SPDX-License-Identifier: LGPL-2.0-or-later
0005 */
0006 
0007 #include "searchtaskmanager.h"
0008 #include "agentsearchinstance.h"
0009 #include "akonadiserver_search_debug.h"
0010 #include "connection.h"
0011 #include "entities.h"
0012 #include "storage/selectquerybuilder.h"
0013 
0014 #include "private/dbus_p.h"
0015 
0016 #include <QDBusConnection>
0017 #include <QDeadlineTimer>
0018 #include <QSqlError>
0019 #include <QTime>
0020 #include <QTimer>
0021 using namespace Akonadi;
0022 using namespace Akonadi::Server;
0023 
0024 SearchTaskManager::SearchTaskManager()
0025     : AkThread(QStringLiteral("SearchTaskManager"))
0026     , mShouldStop(false)
0027 {
0028     QTimer::singleShot(0, this, &SearchTaskManager::searchLoop);
0029 }
0030 
0031 SearchTaskManager::~SearchTaskManager()
0032 {
0033     QMutexLocker locker(&mLock);
0034     mShouldStop = true;
0035     mWait.wakeAll();
0036     locker.unlock();
0037 
0038     quitThread();
0039 
0040     mInstancesLock.lock();
0041     qDeleteAll(mInstances);
0042     mInstancesLock.unlock();
0043 }
0044 
0045 void SearchTaskManager::registerInstance(const QString &id)
0046 {
0047     QMutexLocker locker(&mInstancesLock);
0048 
0049     qCDebug(AKONADISERVER_SEARCH_LOG) << "SearchManager::registerInstance(" << id << ")";
0050 
0051     AgentSearchInstance *instance = mInstances.value(id);
0052     if (instance) {
0053         return; // already registered
0054     }
0055 
0056     instance = new AgentSearchInstance(id, *this);
0057     if (!instance->init()) {
0058         qCDebug(AKONADISERVER_SEARCH_LOG) << "Failed to initialize Search agent";
0059         delete instance;
0060         return;
0061     }
0062 
0063     qCDebug(AKONADISERVER_SEARCH_LOG) << "Registering search instance " << id;
0064     mInstances.insert(id, instance);
0065 }
0066 
0067 void SearchTaskManager::unregisterInstance(const QString &id)
0068 {
0069     QMutexLocker locker(&mInstancesLock);
0070 
0071     QMap<QString, AgentSearchInstance *>::Iterator it = mInstances.find(id);
0072     if (it != mInstances.end()) {
0073         qCDebug(AKONADISERVER_SEARCH_LOG) << "Unregistering search instance" << id;
0074         it.value()->deleteLater();
0075         mInstances.erase(it);
0076     }
0077 }
0078 
0079 void SearchTaskManager::addTask(SearchTask *task)
0080 {
0081     QueryBuilder qb(Collection::tableName());
0082     qb.addJoin(QueryBuilder::InnerJoin, Resource::tableName(), Collection::resourceIdFullColumnName(), Resource::idFullColumnName());
0083     qb.addColumn(Collection::idFullColumnName());
0084     qb.addColumn(Resource::nameFullColumnName());
0085 
0086     Q_ASSERT(!task->collections.isEmpty());
0087     QVariantList list;
0088     list.reserve(task->collections.size());
0089     for (qint64 collection : std::as_const(task->collections)) {
0090         list << collection;
0091     }
0092     qb.addValueCondition(Collection::idFullColumnName(), Query::In, list);
0093 
0094     if (!qb.exec()) {
0095         throw SearchException(qb.query().lastError().text());
0096     }
0097 
0098     QSqlQuery query = qb.query();
0099     if (!query.next()) {
0100         return;
0101     }
0102 
0103     mInstancesLock.lock();
0104 
0105     org::freedesktop::Akonadi::AgentManager agentManager(DBus::serviceName(DBus::Control), QStringLiteral("/AgentManager"), QDBusConnection::sessionBus());
0106     do {
0107         const QString resourceId = query.value(1).toString();
0108         if (!mInstances.contains(resourceId)) {
0109             qCDebug(AKONADISERVER_SEARCH_LOG) << "Resource" << resourceId << "does not implement Search interface, skipping";
0110         } else if (!agentManager.agentInstanceOnline(resourceId)) {
0111             qCDebug(AKONADISERVER_SEARCH_LOG) << "Agent" << resourceId << "is offline, skipping";
0112         } else if (agentManager.agentInstanceStatus(resourceId) > 2) { // 2 == Broken, 3 == Not Configured
0113             qCDebug(AKONADISERVER_SEARCH_LOG) << "Agent" << resourceId << "is broken or not configured";
0114         } else {
0115             const qint64 collectionId = query.value(0).toLongLong();
0116             qCDebug(AKONADISERVER_SEARCH_LOG) << "Enqueued search query (" << resourceId << ", " << collectionId << ")";
0117             task->queries << qMakePair(resourceId, collectionId);
0118         }
0119     } while (query.next());
0120     mInstancesLock.unlock();
0121 
0122     QMutexLocker locker(&mLock);
0123     mTasklist.append(task);
0124     mWait.wakeAll();
0125 }
0126 
0127 void SearchTaskManager::pushResults(const QByteArray &searchId, const QSet<qint64> &ids, Connection *connection)
0128 {
0129     Q_UNUSED(searchId)
0130 
0131     const auto resourceName = connection->context().resource().name();
0132     qCDebug(AKONADISERVER_SEARCH_LOG) << ids.count() << "results for search" << searchId << "pushed from" << resourceName;
0133 
0134     QMutexLocker locker(&mLock);
0135     ResourceTask *task = mRunningTasks.take(resourceName);
0136     if (!task) {
0137         qCDebug(AKONADISERVER_SEARCH_LOG) << "No running task for" << resourceName << " - maybe it has timed out?";
0138         return;
0139     }
0140 
0141     if (task->parentTask->id != searchId) {
0142         qCDebug(AKONADISERVER_SEARCH_LOG) << "Received results for different search - maybe the original task has timed out?";
0143         qCDebug(AKONADISERVER_SEARCH_LOG) << "Search is" << searchId << ", but task is" << task->parentTask->id;
0144         return;
0145     }
0146 
0147     task->results = ids;
0148     mPendingResults.append(task);
0149 
0150     mWait.wakeAll();
0151 }
0152 
0153 bool SearchTaskManager::allResourceTasksCompleted(SearchTask *agentSearchTask) const
0154 {
0155     // Check for queries pending to be dispatched
0156     if (!agentSearchTask->queries.isEmpty()) {
0157         return false;
0158     }
0159 
0160     // Check for running queries
0161     QMap<QString, ResourceTask *>::const_iterator it = mRunningTasks.begin();
0162     QMap<QString, ResourceTask *>::const_iterator end = mRunningTasks.end();
0163     for (; it != end; ++it) {
0164         if (it.value()->parentTask == agentSearchTask) {
0165             return false;
0166         }
0167     }
0168 
0169     return true;
0170 }
0171 
0172 SearchTaskManager::TasksMap::Iterator SearchTaskManager::cancelRunningTask(TasksMap::Iterator &iter)
0173 {
0174     ResourceTask *task = iter.value();
0175     SearchTask *parentTask = task->parentTask;
0176     QMutexLocker locker(&parentTask->sharedLock);
0177     // erase the task before allResourceTasksCompleted
0178     SearchTaskManager::TasksMap::Iterator it = mRunningTasks.erase(iter);
0179     // We're not clearing the results since we don't want to clear successful results from other resources
0180     parentTask->complete = allResourceTasksCompleted(parentTask);
0181     parentTask->notifier.wakeAll();
0182     delete task;
0183 
0184     return it;
0185 }
0186 
0187 void SearchTaskManager::searchLoop()
0188 {
0189     qint64 timeout = ULONG_MAX;
0190 
0191     QMutexLocker locker(&mLock);
0192 
0193     for (;;) {
0194         qCDebug(AKONADISERVER_SEARCH_LOG) << "Search loop is waiting, will wake again in" << timeout << "ms";
0195         mWait.wait(&mLock, QDeadlineTimer(QDeadlineTimer::Forever));
0196         if (mShouldStop) {
0197             for (SearchTask *task : std::as_const(mTasklist)) {
0198                 QMutexLocker locker(&task->sharedLock);
0199                 task->queries.clear();
0200                 task->notifier.wakeAll();
0201             }
0202 
0203             QMap<QString, ResourceTask *>::Iterator it = mRunningTasks.begin();
0204             for (; it != mRunningTasks.end();) {
0205                 if (mTasklist.contains(it.value()->parentTask)) {
0206                     delete it.value();
0207                     it = mRunningTasks.erase(it);
0208                     continue;
0209                 }
0210                 it = cancelRunningTask(it);
0211             }
0212 
0213             break;
0214         }
0215 
0216         // First notify about available results
0217         while (!mPendingResults.isEmpty()) {
0218             ResourceTask *finishedTask = mPendingResults.first();
0219             mPendingResults.remove(0);
0220             qCDebug(AKONADISERVER_SEARCH_LOG) << "Pending results from" << finishedTask->resourceId << "for collection" << finishedTask->collectionId
0221                                               << "for search" << finishedTask->parentTask->id << "available!";
0222             SearchTask *parentTask = finishedTask->parentTask;
0223             QMutexLocker locker(&parentTask->sharedLock);
0224             // We need to append, this agent search task is shared
0225             parentTask->pendingResults += finishedTask->results;
0226             parentTask->complete = allResourceTasksCompleted(parentTask);
0227             parentTask->notifier.wakeAll();
0228             delete finishedTask;
0229         }
0230 
0231         // No check whether there are any tasks running longer than 1 minute and kill them
0232         QMap<QString, ResourceTask *>::Iterator it = mRunningTasks.begin();
0233         const qint64 now = QDateTime::currentMSecsSinceEpoch();
0234         for (; it != mRunningTasks.end();) {
0235             ResourceTask *task = it.value();
0236             if (now - task->timestamp > 60 * 1000) {
0237                 // Remove the task - and signal to parent task that it has "finished" without results
0238                 qCDebug(AKONADISERVER_SEARCH_LOG) << "Resource task" << task->resourceId << "for search" << task->parentTask->id << "timed out!";
0239                 it = cancelRunningTask(it);
0240             } else {
0241                 ++it;
0242             }
0243         }
0244 
0245         if (!mTasklist.isEmpty()) {
0246             SearchTask *task = mTasklist.first();
0247             qCDebug(AKONADISERVER_SEARCH_LOG) << "Search task" << task->id << "available!";
0248             if (task->queries.isEmpty()) {
0249                 qCDebug(AKONADISERVER_SEARCH_LOG) << "nothing to do for task";
0250                 QMutexLocker locker(&task->sharedLock);
0251                 // After this the AgentSearchTask will be destroyed
0252                 task->complete = true;
0253                 task->notifier.wakeAll();
0254                 mTasklist.remove(0);
0255                 continue;
0256             }
0257 
0258             for (auto it = task->queries.begin(); it != task->queries.end();) {
0259                 if (!mRunningTasks.contains(it->first)) {
0260                     const auto &[resource, colId] = *it;
0261                     qCDebug(AKONADISERVER_SEARCH_LOG) << "\t Sending query for collection" << colId << "to resource" << resource;
0262                     auto rTask = new ResourceTask;
0263                     rTask->resourceId = resource;
0264                     rTask->collectionId = colId;
0265                     rTask->parentTask = task;
0266                     rTask->timestamp = QDateTime::currentMSecsSinceEpoch();
0267                     mRunningTasks.insert(resource, rTask);
0268 
0269                     mInstancesLock.lock();
0270                     AgentSearchInstance *instance = mInstances.value(resource);
0271                     if (!instance) {
0272                         mInstancesLock.unlock();
0273                         // Resource disappeared in the meanwhile
0274                         continue;
0275                     }
0276 
0277                     instance->search(task->id, task->query, colId);
0278                     mInstancesLock.unlock();
0279 
0280                     task->sharedLock.lock();
0281                     it = task->queries.erase(it);
0282                     task->sharedLock.unlock();
0283                 } else {
0284                     ++it;
0285                 }
0286             }
0287             // Yay! We managed to dispatch all requests!
0288             if (task->queries.isEmpty()) {
0289                 qCDebug(AKONADISERVER_SEARCH_LOG) << "All queries from task" << task->id << "dispatched!";
0290                 mTasklist.remove(0);
0291             }
0292 
0293             timeout = 60 * 1000; // check whether all tasks have finished within a minute
0294         } else {
0295             if (mRunningTasks.isEmpty()) {
0296                 timeout = ULONG_MAX;
0297             }
0298         }
0299     }
0300 }
0301 
0302 #include "moc_searchtaskmanager.cpp"