File indexing completed on 2025-01-05 04:46:59

0001 /*
0002     SPDX-FileCopyrightText: 2009 Volker Krause <vkrause@kde.org>
0003 
0004     SPDX-License-Identifier: LGPL-2.0-or-later
0005 */
0006 
0007 #include "itemretrievalmanager.h"
0008 #include "akonadiserver_debug.h"
0009 #include "itemretrievaljob.h"
0010 
0011 #include "resourceinterface.h"
0012 
0013 #include "private/dbus_p.h"
0014 
0015 #include <QDBusConnection>
0016 #include <QDBusConnectionInterface>
0017 #include <QScopedPointer>
0018 
0019 using namespace Akonadi;
0020 using namespace Akonadi::Server;
0021 
0022 Q_DECLARE_METATYPE(Akonadi::Server::ItemRetrievalResult)
0023 
0024 class ItemRetrievalJobFactory : public AbstractItemRetrievalJobFactory
0025 {
0026     AbstractItemRetrievalJob *retrievalJob(ItemRetrievalRequest request, QObject *parent) override
0027     {
0028         return new ItemRetrievalJob(std::move(request), parent);
0029     }
0030 };
0031 
0032 ItemRetrievalManager::ItemRetrievalManager(QObject *parent)
0033     : ItemRetrievalManager(std::make_unique<ItemRetrievalJobFactory>(), parent)
0034 {
0035 }
0036 
0037 ItemRetrievalManager::ItemRetrievalManager(std::unique_ptr<AbstractItemRetrievalJobFactory> factory, QObject *parent)
0038     : AkThread(QStringLiteral("ItemRetrievalManager"), QThread::HighPriority, parent)
0039     , mJobFactory(std::move(factory))
0040 {
0041     qRegisterMetaType<ItemRetrievalResult>("Akonadi::Server::ItemRetrievalResult");
0042     qDBusRegisterMetaType<QByteArrayList>();
0043 }
0044 
0045 ItemRetrievalManager::~ItemRetrievalManager()
0046 {
0047     quitThread();
0048 }
0049 
0050 void ItemRetrievalManager::init()
0051 {
0052     AkThread::init();
0053 
0054     QDBusConnection conn = QDBusConnection::sessionBus();
0055     connect(conn.interface(), &QDBusConnectionInterface::serviceOwnerChanged, this, &ItemRetrievalManager::serviceOwnerChanged);
0056     connect(this, &ItemRetrievalManager::requestAdded, this, &ItemRetrievalManager::processRequest, Qt::QueuedConnection);
0057 }
0058 
0059 // called within the retrieval thread
0060 void ItemRetrievalManager::serviceOwnerChanged(const QString &serviceName, const QString &oldOwner, const QString &newOwner)
0061 {
0062     Q_UNUSED(newOwner)
0063     if (oldOwner.isEmpty()) {
0064         return;
0065     }
0066     const auto service = DBus::parseAgentServiceName(serviceName);
0067     if (!service.has_value() || service->agentType != DBus::Resource) {
0068         return;
0069     }
0070     qCDebug(AKONADISERVER_LOG) << "ItemRetrievalManager lost connection to resource" << serviceName << ", discarding cached interface";
0071     mResourceInterfaces.erase(service->identifier);
0072 }
0073 
0074 // called within the retrieval thread
0075 org::freedesktop::Akonadi::Resource *ItemRetrievalManager::resourceInterface(const QString &id)
0076 {
0077     if (id.isEmpty()) {
0078         return nullptr;
0079     }
0080 
0081     auto ifaceIt = mResourceInterfaces.find(id);
0082     if (ifaceIt != mResourceInterfaces.cend() && ifaceIt->second->isValid()) {
0083         return ifaceIt->second.get();
0084     }
0085 
0086     auto iface =
0087         std::make_unique<org::freedesktop::Akonadi::Resource>(DBus::agentServiceName(id, DBus::Resource), QStringLiteral("/"), QDBusConnection::sessionBus());
0088     if (!iface->isValid()) {
0089         qCCritical(AKONADISERVER_LOG,
0090                    "Cannot connect to agent instance with identifier '%s', error message: '%s'",
0091                    qUtf8Printable(id),
0092                    qUtf8Printable(iface ? iface->lastError().message() : QString()));
0093         return nullptr;
0094     }
0095     // DBus calls can take some time to reply -- e.g. if a huge local mbox has to be parsed first.
0096     iface->setTimeout(5 * 60 * 1000); // 5 minutes, rather than 25 seconds
0097     std::tie(ifaceIt, std::ignore) = mResourceInterfaces.emplace(id, std::move(iface));
0098     return ifaceIt->second.get();
0099 }
0100 
0101 // called from any thread
0102 void ItemRetrievalManager::requestItemDelivery(ItemRetrievalRequest req)
0103 {
0104     QWriteLocker locker(&mLock);
0105     qCDebug(AKONADISERVER_LOG) << "ItemRetrievalManager posting retrieval request for items" << req.ids << "to" << req.resourceId << ". There are"
0106                                << mPendingRequests.size() << "request queues and" << mPendingRequests[req.resourceId].size() << "items mine";
0107     mPendingRequests[req.resourceId].emplace_back(std::move(req));
0108     locker.unlock();
0109 
0110     Q_EMIT requestAdded();
0111 }
0112 
0113 QList<AbstractItemRetrievalJob *> ItemRetrievalManager::scheduleJobsForIdleResourcesLocked()
0114 {
0115     QList<AbstractItemRetrievalJob *> newJobs;
0116     for (auto it = mPendingRequests.begin(); it != mPendingRequests.end();) {
0117         if (it->second.empty()) {
0118             it = mPendingRequests.erase(it);
0119             continue;
0120         }
0121 
0122         if (!mCurrentJobs.contains(it->first) || mCurrentJobs.value(it->first) == nullptr) {
0123             // TODO: check if there is another one for the same uid with more parts requested
0124             auto req = std::move(it->second.front());
0125             it->second.pop_front();
0126             Q_ASSERT(req.resourceId == it->first);
0127             auto job = mJobFactory->retrievalJob(std::move(req), this);
0128             connect(job, &AbstractItemRetrievalJob::requestCompleted, this, &ItemRetrievalManager::retrievalJobFinished);
0129             mCurrentJobs.insert(job->request().resourceId, job);
0130             // delay job execution until after we unlocked the mutex, since the job can emit the finished signal immediately in some cases
0131             newJobs.append(job);
0132             qCDebug(AKONADISERVER_LOG) << "ItemRetrievalJob" << job << "started for request" << job->request().id;
0133         }
0134         ++it;
0135     }
0136 
0137     return newJobs;
0138 }
0139 
0140 // called within the retrieval thread
0141 void ItemRetrievalManager::processRequest()
0142 {
0143     QWriteLocker locker(&mLock);
0144     // look for idle resources
0145     auto newJobs = scheduleJobsForIdleResourcesLocked();
0146     // someone asked as to process requests although everything is done already, he might still be waiting
0147     if (mPendingRequests.empty() && mCurrentJobs.isEmpty() && newJobs.isEmpty()) {
0148         return;
0149     }
0150     locker.unlock();
0151 
0152     // Start the jobs
0153     for (auto job : newJobs) {
0154         if (auto j = qobject_cast<ItemRetrievalJob *>(job)) {
0155             j->setInterface(resourceInterface(j->request().resourceId));
0156         }
0157         job->start();
0158     }
0159 }
0160 
0161 namespace
0162 {
0163 bool isSubsetOf(const QByteArrayList &superset, const QByteArrayList &subset)
0164 {
0165     // For very small lists like these, this is faster than copy, sort and std::include
0166     return std::all_of(subset.cbegin(), subset.cend(), [&superset](const auto &val) {
0167         return superset.contains(val);
0168     });
0169 }
0170 
0171 }
0172 
0173 void ItemRetrievalManager::retrievalJobFinished(AbstractItemRetrievalJob *job)
0174 {
0175     const auto &request = job->request();
0176     const auto &result = job->result();
0177 
0178     if (result.errorMsg.has_value()) {
0179         qCWarning(AKONADISERVER_LOG) << "ItemRetrievalJob for request" << request.id << "finished with error:" << *result.errorMsg;
0180     } else {
0181         qCInfo(AKONADISERVER_LOG) << "ItemRetrievalJob for request" << request.id << "finished";
0182     }
0183 
0184     QWriteLocker locker(&mLock);
0185     Q_ASSERT(mCurrentJobs.contains(request.resourceId));
0186     mCurrentJobs.remove(request.resourceId);
0187     // Check if there are any pending requests that are satisfied by this retrieval job
0188     auto &requests = mPendingRequests[request.resourceId];
0189     for (auto it = requests.begin(); it != requests.end();) {
0190         // TODO: also complete requests that are subset of the completed one
0191         if (it->ids == request.ids && isSubsetOf(request.parts, it->parts)) {
0192             qCDebug(AKONADISERVER_LOG) << "Someone else requested items " << request.ids << "as well, marking as processed.";
0193             ItemRetrievalResult otherResult{std::move(*it)};
0194             otherResult.errorMsg = result.errorMsg;
0195             Q_EMIT requestFinished(otherResult);
0196             it = requests.erase(it);
0197         } else {
0198             ++it;
0199         }
0200     }
0201     locker.unlock();
0202 
0203     Q_EMIT requestFinished(result);
0204     Q_EMIT requestAdded(); // trigger processRequest() again, in case there is more in the queues
0205 }
0206 
0207 // Can be called from any thread
0208 void ItemRetrievalManager::triggerCollectionSync(const QString &resource, qint64 colId)
0209 {
0210     QTimer::singleShot(0, this, [this, resource, colId]() {
0211         if (auto interface = resourceInterface(resource)) {
0212             interface->synchronizeCollection(colId);
0213         }
0214     });
0215 }
0216 
0217 void ItemRetrievalManager::triggerCollectionTreeSync(const QString &resource)
0218 {
0219     QTimer::singleShot(0, this, [this, resource]() {
0220         if (auto interface = resourceInterface(resource)) {
0221             interface->synchronizeCollectionTree();
0222         }
0223     });
0224 }
0225 
0226 #include "moc_itemretrievalmanager.cpp"