// File indexing completed on 2024-11-10 04:40:53
0001 /* 0002 SPDX-FileCopyrightText: 2013, 2014 Daniel Vrátil <dvratil@redhat.com> 0003 0004 SPDX-License-Identifier: LGPL-2.0-or-later 0005 */ 0006 0007 #include "searchtaskmanager.h" 0008 #include "agentsearchinstance.h" 0009 #include "akonadiserver_search_debug.h" 0010 #include "connection.h" 0011 #include "entities.h" 0012 #include "storage/selectquerybuilder.h" 0013 0014 #include "private/dbus_p.h" 0015 0016 #include <QDBusConnection> 0017 #include <QDeadlineTimer> 0018 #include <QSqlError> 0019 #include <QTime> 0020 #include <QTimer> 0021 using namespace Akonadi; 0022 using namespace Akonadi::Server; 0023 0024 SearchTaskManager::SearchTaskManager() 0025 : AkThread(QStringLiteral("SearchTaskManager")) 0026 , mShouldStop(false) 0027 { 0028 QTimer::singleShot(0, this, &SearchTaskManager::searchLoop); 0029 } 0030 0031 SearchTaskManager::~SearchTaskManager() 0032 { 0033 QMutexLocker locker(&mLock); 0034 mShouldStop = true; 0035 mWait.wakeAll(); 0036 locker.unlock(); 0037 0038 quitThread(); 0039 0040 mInstancesLock.lock(); 0041 qDeleteAll(mInstances); 0042 mInstancesLock.unlock(); 0043 } 0044 0045 void SearchTaskManager::registerInstance(const QString &id) 0046 { 0047 QMutexLocker locker(&mInstancesLock); 0048 0049 qCDebug(AKONADISERVER_SEARCH_LOG) << "SearchManager::registerInstance(" << id << ")"; 0050 0051 AgentSearchInstance *instance = mInstances.value(id); 0052 if (instance) { 0053 return; // already registered 0054 } 0055 0056 instance = new AgentSearchInstance(id, *this); 0057 if (!instance->init()) { 0058 qCDebug(AKONADISERVER_SEARCH_LOG) << "Failed to initialize Search agent"; 0059 delete instance; 0060 return; 0061 } 0062 0063 qCDebug(AKONADISERVER_SEARCH_LOG) << "Registering search instance " << id; 0064 mInstances.insert(id, instance); 0065 } 0066 0067 void SearchTaskManager::unregisterInstance(const QString &id) 0068 { 0069 QMutexLocker locker(&mInstancesLock); 0070 0071 QMap<QString, AgentSearchInstance *>::Iterator it = mInstances.find(id); 0072 if 
(it != mInstances.end()) { 0073 qCDebug(AKONADISERVER_SEARCH_LOG) << "Unregistering search instance" << id; 0074 it.value()->deleteLater(); 0075 mInstances.erase(it); 0076 } 0077 } 0078 0079 void SearchTaskManager::addTask(SearchTask *task) 0080 { 0081 QueryBuilder qb(Collection::tableName()); 0082 qb.addJoin(QueryBuilder::InnerJoin, Resource::tableName(), Collection::resourceIdFullColumnName(), Resource::idFullColumnName()); 0083 qb.addColumn(Collection::idFullColumnName()); 0084 qb.addColumn(Resource::nameFullColumnName()); 0085 0086 Q_ASSERT(!task->collections.isEmpty()); 0087 QVariantList list; 0088 list.reserve(task->collections.size()); 0089 for (qint64 collection : std::as_const(task->collections)) { 0090 list << collection; 0091 } 0092 qb.addValueCondition(Collection::idFullColumnName(), Query::In, list); 0093 0094 if (!qb.exec()) { 0095 throw SearchException(qb.query().lastError().text()); 0096 } 0097 0098 QSqlQuery query = qb.query(); 0099 if (!query.next()) { 0100 return; 0101 } 0102 0103 mInstancesLock.lock(); 0104 0105 org::freedesktop::Akonadi::AgentManager agentManager(DBus::serviceName(DBus::Control), QStringLiteral("/AgentManager"), QDBusConnection::sessionBus()); 0106 do { 0107 const QString resourceId = query.value(1).toString(); 0108 if (!mInstances.contains(resourceId)) { 0109 qCDebug(AKONADISERVER_SEARCH_LOG) << "Resource" << resourceId << "does not implement Search interface, skipping"; 0110 } else if (!agentManager.agentInstanceOnline(resourceId)) { 0111 qCDebug(AKONADISERVER_SEARCH_LOG) << "Agent" << resourceId << "is offline, skipping"; 0112 } else if (agentManager.agentInstanceStatus(resourceId) > 2) { // 2 == Broken, 3 == Not Configured 0113 qCDebug(AKONADISERVER_SEARCH_LOG) << "Agent" << resourceId << "is broken or not configured"; 0114 } else { 0115 const qint64 collectionId = query.value(0).toLongLong(); 0116 qCDebug(AKONADISERVER_SEARCH_LOG) << "Enqueued search query (" << resourceId << ", " << collectionId << ")"; 0117 task->queries 
<< qMakePair(resourceId, collectionId); 0118 } 0119 } while (query.next()); 0120 mInstancesLock.unlock(); 0121 0122 QMutexLocker locker(&mLock); 0123 mTasklist.append(task); 0124 mWait.wakeAll(); 0125 } 0126 0127 void SearchTaskManager::pushResults(const QByteArray &searchId, const QSet<qint64> &ids, Connection *connection) 0128 { 0129 Q_UNUSED(searchId) 0130 0131 const auto resourceName = connection->context().resource().name(); 0132 qCDebug(AKONADISERVER_SEARCH_LOG) << ids.count() << "results for search" << searchId << "pushed from" << resourceName; 0133 0134 QMutexLocker locker(&mLock); 0135 ResourceTask *task = mRunningTasks.take(resourceName); 0136 if (!task) { 0137 qCDebug(AKONADISERVER_SEARCH_LOG) << "No running task for" << resourceName << " - maybe it has timed out?"; 0138 return; 0139 } 0140 0141 if (task->parentTask->id != searchId) { 0142 qCDebug(AKONADISERVER_SEARCH_LOG) << "Received results for different search - maybe the original task has timed out?"; 0143 qCDebug(AKONADISERVER_SEARCH_LOG) << "Search is" << searchId << ", but task is" << task->parentTask->id; 0144 return; 0145 } 0146 0147 task->results = ids; 0148 mPendingResults.append(task); 0149 0150 mWait.wakeAll(); 0151 } 0152 0153 bool SearchTaskManager::allResourceTasksCompleted(SearchTask *agentSearchTask) const 0154 { 0155 // Check for queries pending to be dispatched 0156 if (!agentSearchTask->queries.isEmpty()) { 0157 return false; 0158 } 0159 0160 // Check for running queries 0161 QMap<QString, ResourceTask *>::const_iterator it = mRunningTasks.begin(); 0162 QMap<QString, ResourceTask *>::const_iterator end = mRunningTasks.end(); 0163 for (; it != end; ++it) { 0164 if (it.value()->parentTask == agentSearchTask) { 0165 return false; 0166 } 0167 } 0168 0169 return true; 0170 } 0171 0172 SearchTaskManager::TasksMap::Iterator SearchTaskManager::cancelRunningTask(TasksMap::Iterator &iter) 0173 { 0174 ResourceTask *task = iter.value(); 0175 SearchTask *parentTask = task->parentTask; 0176 
QMutexLocker locker(&parentTask->sharedLock); 0177 // erase the task before allResourceTasksCompleted 0178 SearchTaskManager::TasksMap::Iterator it = mRunningTasks.erase(iter); 0179 // We're not clearing the results since we don't want to clear successful results from other resources 0180 parentTask->complete = allResourceTasksCompleted(parentTask); 0181 parentTask->notifier.wakeAll(); 0182 delete task; 0183 0184 return it; 0185 } 0186 0187 void SearchTaskManager::searchLoop() 0188 { 0189 qint64 timeout = ULONG_MAX; 0190 0191 QMutexLocker locker(&mLock); 0192 0193 for (;;) { 0194 qCDebug(AKONADISERVER_SEARCH_LOG) << "Search loop is waiting, will wake again in" << timeout << "ms"; 0195 mWait.wait(&mLock, QDeadlineTimer(QDeadlineTimer::Forever)); 0196 if (mShouldStop) { 0197 for (SearchTask *task : std::as_const(mTasklist)) { 0198 QMutexLocker locker(&task->sharedLock); 0199 task->queries.clear(); 0200 task->notifier.wakeAll(); 0201 } 0202 0203 QMap<QString, ResourceTask *>::Iterator it = mRunningTasks.begin(); 0204 for (; it != mRunningTasks.end();) { 0205 if (mTasklist.contains(it.value()->parentTask)) { 0206 delete it.value(); 0207 it = mRunningTasks.erase(it); 0208 continue; 0209 } 0210 it = cancelRunningTask(it); 0211 } 0212 0213 break; 0214 } 0215 0216 // First notify about available results 0217 while (!mPendingResults.isEmpty()) { 0218 ResourceTask *finishedTask = mPendingResults.first(); 0219 mPendingResults.remove(0); 0220 qCDebug(AKONADISERVER_SEARCH_LOG) << "Pending results from" << finishedTask->resourceId << "for collection" << finishedTask->collectionId 0221 << "for search" << finishedTask->parentTask->id << "available!"; 0222 SearchTask *parentTask = finishedTask->parentTask; 0223 QMutexLocker locker(&parentTask->sharedLock); 0224 // We need to append, this agent search task is shared 0225 parentTask->pendingResults += finishedTask->results; 0226 parentTask->complete = allResourceTasksCompleted(parentTask); 0227 parentTask->notifier.wakeAll(); 0228 
delete finishedTask; 0229 } 0230 0231 // No check whether there are any tasks running longer than 1 minute and kill them 0232 QMap<QString, ResourceTask *>::Iterator it = mRunningTasks.begin(); 0233 const qint64 now = QDateTime::currentMSecsSinceEpoch(); 0234 for (; it != mRunningTasks.end();) { 0235 ResourceTask *task = it.value(); 0236 if (now - task->timestamp > 60 * 1000) { 0237 // Remove the task - and signal to parent task that it has "finished" without results 0238 qCDebug(AKONADISERVER_SEARCH_LOG) << "Resource task" << task->resourceId << "for search" << task->parentTask->id << "timed out!"; 0239 it = cancelRunningTask(it); 0240 } else { 0241 ++it; 0242 } 0243 } 0244 0245 if (!mTasklist.isEmpty()) { 0246 SearchTask *task = mTasklist.first(); 0247 qCDebug(AKONADISERVER_SEARCH_LOG) << "Search task" << task->id << "available!"; 0248 if (task->queries.isEmpty()) { 0249 qCDebug(AKONADISERVER_SEARCH_LOG) << "nothing to do for task"; 0250 QMutexLocker locker(&task->sharedLock); 0251 // After this the AgentSearchTask will be destroyed 0252 task->complete = true; 0253 task->notifier.wakeAll(); 0254 mTasklist.remove(0); 0255 continue; 0256 } 0257 0258 for (auto it = task->queries.begin(); it != task->queries.end();) { 0259 if (!mRunningTasks.contains(it->first)) { 0260 const auto &[resource, colId] = *it; 0261 qCDebug(AKONADISERVER_SEARCH_LOG) << "\t Sending query for collection" << colId << "to resource" << resource; 0262 auto rTask = new ResourceTask; 0263 rTask->resourceId = resource; 0264 rTask->collectionId = colId; 0265 rTask->parentTask = task; 0266 rTask->timestamp = QDateTime::currentMSecsSinceEpoch(); 0267 mRunningTasks.insert(resource, rTask); 0268 0269 mInstancesLock.lock(); 0270 AgentSearchInstance *instance = mInstances.value(resource); 0271 if (!instance) { 0272 mInstancesLock.unlock(); 0273 // Resource disappeared in the meanwhile 0274 continue; 0275 } 0276 0277 instance->search(task->id, task->query, colId); 0278 mInstancesLock.unlock(); 0279 0280 
task->sharedLock.lock(); 0281 it = task->queries.erase(it); 0282 task->sharedLock.unlock(); 0283 } else { 0284 ++it; 0285 } 0286 } 0287 // Yay! We managed to dispatch all requests! 0288 if (task->queries.isEmpty()) { 0289 qCDebug(AKONADISERVER_SEARCH_LOG) << "All queries from task" << task->id << "dispatched!"; 0290 mTasklist.remove(0); 0291 } 0292 0293 timeout = 60 * 1000; // check whether all tasks have finished within a minute 0294 } else { 0295 if (mRunningTasks.isEmpty()) { 0296 timeout = ULONG_MAX; 0297 } 0298 } 0299 } 0300 } 0301 0302 #include "moc_searchtaskmanager.cpp"