File indexing completed on 2025-01-05 04:46:59
0001 /* 0002 SPDX-FileCopyrightText: 2009 Volker Krause <vkrause@kde.org> 0003 0004 SPDX-License-Identifier: LGPL-2.0-or-later 0005 */ 0006 0007 #include "itemretrievalmanager.h" 0008 #include "akonadiserver_debug.h" 0009 #include "itemretrievaljob.h" 0010 0011 #include "resourceinterface.h" 0012 0013 #include "private/dbus_p.h" 0014 0015 #include <QDBusConnection> 0016 #include <QDBusConnectionInterface> 0017 #include <QScopedPointer> 0018 0019 using namespace Akonadi; 0020 using namespace Akonadi::Server; 0021 0022 Q_DECLARE_METATYPE(Akonadi::Server::ItemRetrievalResult) 0023 0024 class ItemRetrievalJobFactory : public AbstractItemRetrievalJobFactory 0025 { 0026 AbstractItemRetrievalJob *retrievalJob(ItemRetrievalRequest request, QObject *parent) override 0027 { 0028 return new ItemRetrievalJob(std::move(request), parent); 0029 } 0030 }; 0031 0032 ItemRetrievalManager::ItemRetrievalManager(QObject *parent) 0033 : ItemRetrievalManager(std::make_unique<ItemRetrievalJobFactory>(), parent) 0034 { 0035 } 0036 0037 ItemRetrievalManager::ItemRetrievalManager(std::unique_ptr<AbstractItemRetrievalJobFactory> factory, QObject *parent) 0038 : AkThread(QStringLiteral("ItemRetrievalManager"), QThread::HighPriority, parent) 0039 , mJobFactory(std::move(factory)) 0040 { 0041 qRegisterMetaType<ItemRetrievalResult>("Akonadi::Server::ItemRetrievalResult"); 0042 qDBusRegisterMetaType<QByteArrayList>(); 0043 } 0044 0045 ItemRetrievalManager::~ItemRetrievalManager() 0046 { 0047 quitThread(); 0048 } 0049 0050 void ItemRetrievalManager::init() 0051 { 0052 AkThread::init(); 0053 0054 QDBusConnection conn = QDBusConnection::sessionBus(); 0055 connect(conn.interface(), &QDBusConnectionInterface::serviceOwnerChanged, this, &ItemRetrievalManager::serviceOwnerChanged); 0056 connect(this, &ItemRetrievalManager::requestAdded, this, &ItemRetrievalManager::processRequest, Qt::QueuedConnection); 0057 } 0058 0059 // called within the retrieval thread 0060 void ItemRetrievalManager::serviceOwnerChanged(const QString &serviceName, const QString &oldOwner, const QString &newOwner) 0061 { 0062 Q_UNUSED(newOwner) 0063 if (oldOwner.isEmpty()) { 0064 return; 0065 } 0066 const auto service = DBus::parseAgentServiceName(serviceName); 0067 if (!service.has_value() || service->agentType != DBus::Resource) { 0068 return; 0069 } 0070 qCDebug(AKONADISERVER_LOG) << "ItemRetrievalManager lost connection to resource" << serviceName << ", discarding cached interface"; 0071 mResourceInterfaces.erase(service->identifier); 0072 } 0073 0074 // called within the retrieval thread 0075 org::freedesktop::Akonadi::Resource *ItemRetrievalManager::resourceInterface(const QString &id) 0076 { 0077 if (id.isEmpty()) { 0078 return nullptr; 0079 } 0080 0081 auto ifaceIt = mResourceInterfaces.find(id); 0082 if (ifaceIt != mResourceInterfaces.cend() && ifaceIt->second->isValid()) { 0083 return ifaceIt->second.get(); 0084 } 0085 0086 auto iface = 0087 std::make_unique<org::freedesktop::Akonadi::Resource>(DBus::agentServiceName(id, DBus::Resource), QStringLiteral("/"), QDBusConnection::sessionBus()); 0088 if (!iface->isValid()) { 0089 qCCritical(AKONADISERVER_LOG, 0090 "Cannot connect to agent instance with identifier '%s', error message: '%s'", 0091 qUtf8Printable(id), 0092 qUtf8Printable(iface ? iface->lastError().message() : QString())); 0093 return nullptr; 0094 } 0095 // DBus calls can take some time to reply -- e.g. if a huge local mbox has to be parsed first. 0096 iface->setTimeout(5 * 60 * 1000); // 5 minutes, rather than 25 seconds 0097 std::tie(ifaceIt, std::ignore) = mResourceInterfaces.emplace(id, std::move(iface)); 0098 return ifaceIt->second.get(); 0099 } 0100 0101 // called from any thread 0102 void ItemRetrievalManager::requestItemDelivery(ItemRetrievalRequest req) 0103 { 0104 QWriteLocker locker(&mLock); 0105 qCDebug(AKONADISERVER_LOG) << "ItemRetrievalManager posting retrieval request for items" << req.ids << "to" << req.resourceId << ". There are" 0106 << mPendingRequests.size() << "request queues and" << mPendingRequests[req.resourceId].size() << "items mine"; 0107 mPendingRequests[req.resourceId].emplace_back(std::move(req)); 0108 locker.unlock(); 0109 0110 Q_EMIT requestAdded(); 0111 } 0112 0113 QList<AbstractItemRetrievalJob *> ItemRetrievalManager::scheduleJobsForIdleResourcesLocked() 0114 { 0115 QList<AbstractItemRetrievalJob *> newJobs; 0116 for (auto it = mPendingRequests.begin(); it != mPendingRequests.end();) { 0117 if (it->second.empty()) { 0118 it = mPendingRequests.erase(it); 0119 continue; 0120 } 0121 0122 if (!mCurrentJobs.contains(it->first) || mCurrentJobs.value(it->first) == nullptr) { 0123 // TODO: check if there is another one for the same uid with more parts requested 0124 auto req = std::move(it->second.front()); 0125 it->second.pop_front(); 0126 Q_ASSERT(req.resourceId == it->first); 0127 auto job = mJobFactory->retrievalJob(std::move(req), this); 0128 connect(job, &AbstractItemRetrievalJob::requestCompleted, this, &ItemRetrievalManager::retrievalJobFinished); 0129 mCurrentJobs.insert(job->request().resourceId, job); 0130 // delay job execution until after we unlocked the mutex, since the job can emit the finished signal immediately in some cases 0131 newJobs.append(job); 0132 qCDebug(AKONADISERVER_LOG) << "ItemRetrievalJob" << job << "started for request" << job->request().id; 0133 } 0134 ++it; 0135 } 0136 0137 return newJobs; 0138 } 0139 0140 // called within the retrieval thread 0141 void ItemRetrievalManager::processRequest() 0142 { 0143 QWriteLocker locker(&mLock); 0144 // look for idle resources 0145 auto newJobs = scheduleJobsForIdleResourcesLocked(); 0146 // someone asked as to process requests although everything is done already, he might still be waiting 0147 if (mPendingRequests.empty() && mCurrentJobs.isEmpty() && newJobs.isEmpty()) { 0148 return; 0149 } 0150 locker.unlock(); 0151 0152 // Start the jobs 0153 for (auto job : newJobs) { 0154 if (auto j = qobject_cast<ItemRetrievalJob *>(job)) { 0155 j->setInterface(resourceInterface(j->request().resourceId)); 0156 } 0157 job->start(); 0158 } 0159 } 0160 0161 namespace 0162 { 0163 bool isSubsetOf(const QByteArrayList &superset, const QByteArrayList &subset) 0164 { 0165 // For very small lists like these, this is faster than copy, sort and std::include 0166 return std::all_of(subset.cbegin(), subset.cend(), [&superset](const auto &val) { 0167 return superset.contains(val); 0168 }); 0169 } 0170 0171 } 0172 0173 void ItemRetrievalManager::retrievalJobFinished(AbstractItemRetrievalJob *job) 0174 { 0175 const auto &request = job->request(); 0176 const auto &result = job->result(); 0177 0178 if (result.errorMsg.has_value()) { 0179 qCWarning(AKONADISERVER_LOG) << "ItemRetrievalJob for request" << request.id << "finished with error:" << *result.errorMsg; 0180 } else { 0181 qCInfo(AKONADISERVER_LOG) << "ItemRetrievalJob for request" << request.id << "finished"; 0182 } 0183 0184 QWriteLocker locker(&mLock); 0185 Q_ASSERT(mCurrentJobs.contains(request.resourceId)); 0186 mCurrentJobs.remove(request.resourceId); 0187 // Check if there are any pending requests that are satisfied by this retrieval job 0188 auto &requests = mPendingRequests[request.resourceId]; 0189 for (auto it = requests.begin(); it != requests.end();) { 0190 // TODO: also complete requests that are subset of the completed one 0191 if (it->ids == request.ids && isSubsetOf(request.parts, it->parts)) { 0192 qCDebug(AKONADISERVER_LOG) << "Someone else requested items " << request.ids << "as well, marking as processed."; 0193 ItemRetrievalResult otherResult{std::move(*it)}; 0194 otherResult.errorMsg = result.errorMsg; 0195 Q_EMIT requestFinished(otherResult); 0196 it = requests.erase(it); 0197 } else { 0198 ++it; 0199 } 0200 } 0201 locker.unlock(); 0202 0203 Q_EMIT requestFinished(result); 0204 Q_EMIT requestAdded(); // trigger processRequest() again, in case there is more in the queues 0205 } 0206 0207 // Can be called from any thread 0208 void ItemRetrievalManager::triggerCollectionSync(const QString &resource, qint64 colId) 0209 { 0210 QTimer::singleShot(0, this, [this, resource, colId]() { 0211 if (auto interface = resourceInterface(resource)) { 0212 interface->synchronizeCollection(colId); 0213 } 0214 }); 0215 } 0216 0217 void ItemRetrievalManager::triggerCollectionTreeSync(const QString &resource) 0218 { 0219 QTimer::singleShot(0, this, [this, resource]() { 0220 if (auto interface = resourceInterface(resource)) { 0221 interface->synchronizeCollectionTree(); 0222 } 0223 }); 0224 } 0225 0226 #include "moc_itemretrievalmanager.cpp"