File indexing completed on 2024-11-10 04:40:30

0001 /*
0002     SPDX-FileCopyrightText: 2006 Tobias Koenig <tokoe@kde.org>
0003                   2006 Marc Mutz <mutz@kde.org>
0004                   2006 - 2007 Volker Krause <vkrause@kde.org>
0005 
0006     SPDX-License-Identifier: LGPL-2.0-or-later
0007 */
0008 
0009 #include "job.h"
0010 #include "akonadicore_debug.h"
0011 #include "job_p.h"
0012 #include "private/instance_p.h"
0013 #include "private/protocol_p.h"
0014 #include "session.h"
0015 #include "session_p.h"
0016 #include <QDBusConnection>
0017 #include <QTime>
0018 
0019 #include <KLocalizedString>
0020 
0021 #include <QDBusConnectionInterface>
0022 #include <QDBusInterface>
0023 #include <QElapsedTimer>
0024 #include <QTimer>
0025 
0026 using namespace Akonadi;
0027 
// D-Bus proxy for the Akonadi Console job tracker, shared by all jobs in this file.
// It stays nullptr until JobPrivate::publishJob() finds the console service on the
// session bus, and is reset to nullptr again by signalCreationToJobTracker() when a
// "jobCreated" call reports an error.
static QDBusAbstractInterface *s_jobtracker = nullptr;
0029 
0030 /// @cond PRIVATE
0031 void JobPrivate::handleResponse(qint64 tag, const Protocol::CommandPtr &response)
0032 {
0033     Q_Q(Job);
0034 
0035     if (mCurrentSubJob) {
0036         mCurrentSubJob->d_ptr->handleResponse(tag, response);
0037         return;
0038     }
0039 
0040     if (tag == mTag) {
0041         if (response->isResponse()) {
0042             const auto &resp = Protocol::cmdCast<Protocol::Response>(response);
0043             if (resp.isError()) {
0044                 q->setError(Job::Unknown);
0045                 q->setErrorText(resp.errorMessage());
0046                 q->emitResult();
0047                 return;
0048             }
0049         }
0050     }
0051 
0052     if (mTag != tag) {
0053         qCWarning(AKONADICORE_LOG) << "Received response with a different tag!";
0054         qCDebug(AKONADICORE_LOG) << "Response tag:" << tag << ", response type:" << response->type();
0055         qCDebug(AKONADICORE_LOG) << "Job tag:" << mTag << ", job:" << q;
0056         return;
0057     }
0058 
0059     if (mStarted) {
0060         if (mReadingFinished) {
0061             qCWarning(AKONADICORE_LOG) << "Received response for a job that does not expect any more data, ignoring";
0062             qCDebug(AKONADICORE_LOG) << "Response tag:" << tag << ", response type:" << response->type();
0063             qCDebug(AKONADICORE_LOG) << "Job tag:" << mTag << ", job:" << q;
0064             Q_ASSERT(!mReadingFinished);
0065             return;
0066         }
0067 
0068         if (q->doHandleResponse(tag, response)) {
0069             mReadingFinished = true;
0070             QTimer::singleShot(0, q, [this]() {
0071                 delayedEmitResult();
0072             });
0073         }
0074     }
0075 }
0076 
0077 void JobPrivate::init(QObject *parent)
0078 {
0079     Q_Q(Job);
0080 
0081     mParentJob = qobject_cast<Job *>(parent);
0082     mSession = qobject_cast<Session *>(parent);
0083 
0084     if (!mSession) {
0085         if (!mParentJob) {
0086             mSession = Session::defaultSession();
0087         } else {
0088             mSession = mParentJob->d_ptr->mSession;
0089         }
0090     }
0091 
0092     if (!mParentJob) {
0093         mSession->d->addJob(q);
0094     } else {
0095         mParentJob->addSubjob(q);
0096     }
0097     publishJob();
0098 }
0099 
0100 void JobPrivate::publishJob()
0101 {
0102     Q_Q(Job);
0103     // if there's a job tracker running, tell it about the new job
0104     if (!s_jobtracker) {
0105         // Let's only check for the debugging console every 3 seconds, otherwise every single job
0106         // makes a dbus call to the dbus daemon, doesn't help performance.
0107         static QElapsedTimer s_lastTime;
0108         if (!s_lastTime.isValid() || s_lastTime.elapsed() > 3000) {
0109             if (!s_lastTime.isValid()) {
0110                 s_lastTime.start();
0111             }
0112             const QString suffix = Akonadi::Instance::identifier().isEmpty() ? QString() : QLatin1Char('-') + Akonadi::Instance::identifier();
0113             if (QDBusConnection::sessionBus().interface()->isServiceRegistered(QStringLiteral("org.kde.akonadiconsole") + suffix)) {
0114                 s_jobtracker = new QDBusInterface(QLatin1StringView("org.kde.akonadiconsole") + suffix,
0115                                                   QStringLiteral("/jobtracker"),
0116                                                   QStringLiteral("org.freedesktop.Akonadi.JobTracker"),
0117                                                   QDBusConnection::sessionBus(),
0118                                                   nullptr);
0119                 mSession->d->publishOtherJobs(q);
0120             } else {
0121                 s_lastTime.restart();
0122             }
0123         }
0124         // Note: we never reset s_jobtracker to 0 when a call fails; but if we did
0125         // then we should restart s_lastTime.
0126     }
0127     QMetaObject::invokeMethod(q, "signalCreationToJobTracker", Qt::QueuedConnection);
0128 }
0129 
0130 void JobPrivate::signalCreationToJobTracker()
0131 {
0132     Q_Q(Job);
0133     if (s_jobtracker) {
0134         // We do these dbus calls manually, so as to avoid having to install (or copy) the console's
0135         // xml interface document. Since this is purely a debugging aid, that seems preferable to
0136         // publishing something not intended for public consumption.
0137         // WARNING: for any signature change here, apply it to resourcescheduler.cpp too
0138         const QList<QVariant> argumentList = QList<QVariant>() << QLatin1StringView(mSession->sessionId()) << QString::number(reinterpret_cast<quintptr>(q), 16)
0139                                                                << (mParentJob ? QString::number(reinterpret_cast<quintptr>(mParentJob), 16) : QString())
0140                                                                << QString::fromLatin1(q->metaObject()->className()) << jobDebuggingString();
0141         QDBusPendingCall call = s_jobtracker->asyncCallWithArgumentList(QStringLiteral("jobCreated"), argumentList);
0142 
0143         auto watcher = new QDBusPendingCallWatcher(call, s_jobtracker);
0144         QObject::connect(watcher, &QDBusPendingCallWatcher::finished, s_jobtracker, [](QDBusPendingCallWatcher *w) {
0145             QDBusPendingReply<void> reply = *w;
0146             if (reply.isError() && s_jobtracker) {
0147                 qDebug() << reply.error().name() << reply.error().message();
0148                 s_jobtracker->deleteLater();
0149                 s_jobtracker = nullptr;
0150             }
0151             w->deleteLater();
0152         });
0153     }
0154 }
0155 
0156 void JobPrivate::signalStartedToJobTracker()
0157 {
0158     Q_Q(Job);
0159     if (s_jobtracker) {
0160         // if there's a job tracker running, tell it a job started
0161         const QList<QVariant> argumentList = {QString::number(reinterpret_cast<quintptr>(q), 16)};
0162         s_jobtracker->callWithArgumentList(QDBus::NoBlock, QStringLiteral("jobStarted"), argumentList);
0163     }
0164 }
0165 
0166 void JobPrivate::aboutToFinish()
0167 {
0168     // Dummy
0169 }
0170 
0171 void JobPrivate::delayedEmitResult()
0172 {
0173     Q_Q(Job);
0174     if (q->hasSubjobs()) {
0175         // We still have subjobs, wait for them to finish
0176         mFinishPending = true;
0177     } else {
0178         aboutToFinish();
0179         q->emitResult();
0180     }
0181 }
0182 
0183 void JobPrivate::startQueued()
0184 {
0185     Q_Q(Job);
0186     mStarted = true;
0187 
0188     Q_EMIT q->aboutToStart(q);
0189     q->doStart();
0190     QTimer::singleShot(0, q, [this]() {
0191         startNext();
0192     });
0193     QMetaObject::invokeMethod(q, "signalStartedToJobTracker", Qt::QueuedConnection);
0194 }
0195 
0196 void JobPrivate::lostConnection()
0197 {
0198     Q_Q(Job);
0199 
0200     if (mCurrentSubJob) {
0201         mCurrentSubJob->d_ptr->lostConnection();
0202     } else {
0203         q->setError(Job::ConnectionFailed);
0204         q->emitResult();
0205     }
0206 }
0207 
0208 void JobPrivate::slotSubJobAboutToStart(Job *job)
0209 {
0210     Q_ASSERT(mCurrentSubJob == nullptr);
0211     mCurrentSubJob = job;
0212 }
0213 
0214 void JobPrivate::startNext()
0215 {
0216     Q_Q(Job);
0217 
0218     if (mStarted && !mCurrentSubJob && q->hasSubjobs()) {
0219         Job *job = qobject_cast<Akonadi::Job *>(q->subjobs().at(0));
0220         Q_ASSERT(job);
0221         job->d_ptr->startQueued();
0222     } else if (mFinishPending && !q->hasSubjobs()) {
0223         // The last subjob we've been waiting for has finished, emitResult() finally
0224         QTimer::singleShot(0, q, [this]() {
0225             delayedEmitResult();
0226         });
0227     }
0228 }
0229 
0230 qint64 JobPrivate::newTag()
0231 {
0232     if (mParentJob) {
0233         mTag = mParentJob->d_ptr->newTag();
0234     } else {
0235         mTag = mSession->d->nextTag();
0236     }
0237     return mTag;
0238 }
0239 
0240 qint64 JobPrivate::tag() const
0241 {
0242     return mTag;
0243 }
0244 
0245 void JobPrivate::sendCommand(qint64 tag, const Protocol::CommandPtr &cmd)
0246 {
0247     if (mParentJob) {
0248         mParentJob->d_ptr->sendCommand(tag, cmd);
0249     } else {
0250         mSession->d->sendCommand(tag, cmd);
0251     }
0252 }
0253 
0254 void JobPrivate::sendCommand(const Protocol::CommandPtr &cmd)
0255 {
0256     sendCommand(newTag(), cmd);
0257 }
0258 
0259 void JobPrivate::itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
0260 {
0261     mSession->d->itemRevisionChanged(itemId, oldRevision, newRevision);
0262 }
0263 
0264 void JobPrivate::updateItemRevision(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
0265 {
0266     Q_Q(Job);
0267     const auto &subjobs = q->subjobs();
0268     for (KJob *j : subjobs) {
0269         auto job = qobject_cast<Akonadi::Job *>(j);
0270         if (job) {
0271             job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision);
0272         }
0273     }
0274     doUpdateItemRevision(itemId, oldRevision, newRevision);
0275 }
0276 
0277 void JobPrivate::doUpdateItemRevision(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
0278 {
0279     Q_UNUSED(itemId)
0280     Q_UNUSED(oldRevision)
0281     Q_UNUSED(newRevision)
0282 }
0283 
0284 int JobPrivate::protocolVersion() const
0285 {
0286     return mSession->d->protocolVersion;
0287 }
0288 /// @endcond
0289 
0290 Job::Job(QObject *parent)
0291     : KCompositeJob(parent)
0292     , d_ptr(new JobPrivate(this))
0293 {
0294     d_ptr->init(parent);
0295 }
0296 
0297 Job::Job(JobPrivate *dd, QObject *parent)
0298     : KCompositeJob(parent)
0299     , d_ptr(dd)
0300 {
0301     d_ptr->init(parent);
0302 }
0303 
0304 Job::~Job()
0305 {
0306     // if there is a job tracer listening, tell it the job is done now
0307     if (s_jobtracker) {
0308         const QList<QVariant> argumentList = {QString::number(reinterpret_cast<quintptr>(this), 16), errorString()};
0309         s_jobtracker->callWithArgumentList(QDBus::NoBlock, QStringLiteral("jobEnded"), argumentList);
0310     }
0311 }
0312 
0313 void Job::start()
0314 {
0315 }
0316 
0317 bool Job::doKill()
0318 {
0319     Q_D(Job);
0320     if (d->mStarted) {
0321         // the only way to cancel an already started job is reconnecting to the server
0322         d->mSession->d->forceReconnect();
0323     }
0324     d->mStarted = false;
0325     return true;
0326 }
0327 
0328 QString Job::errorString() const
0329 {
0330     QString str;
0331     switch (error()) {
0332     case NoError:
0333         break;
0334     case ConnectionFailed:
0335         str = i18n("Cannot connect to the Akonadi service.");
0336         break;
0337     case ProtocolVersionMismatch:
0338         str = i18n("The protocol version of the Akonadi server is incompatible. Make sure you have a compatible version installed.");
0339         break;
0340     case UserCanceled:
0341         str = i18n("User canceled operation.");
0342         break;
0343     case Unknown:
0344         return errorText();
0345     case UserError:
0346         str = i18n("Unknown error.");
0347         break;
0348     }
0349     if (!errorText().isEmpty()) {
0350         str += QStringLiteral(" (%1)").arg(errorText());
0351     }
0352     return str;
0353 }
0354 
0355 bool Job::addSubjob(KJob *job)
0356 {
0357     bool rv = KCompositeJob::addSubjob(job);
0358     if (rv) {
0359         connect(qobject_cast<Job *>(job), &Job::aboutToStart, this, [this](Job *job) {
0360             d_ptr->slotSubJobAboutToStart(job);
0361         });
0362         QTimer::singleShot(0, this, [this]() {
0363             d_ptr->startNext();
0364         });
0365     }
0366     return rv;
0367 }
0368 
0369 bool Job::removeSubjob(KJob *job)
0370 {
0371     bool rv = KCompositeJob::removeSubjob(job);
0372     if (job == d_ptr->mCurrentSubJob) {
0373         d_ptr->mCurrentSubJob = nullptr;
0374         QTimer::singleShot(0, this, [this]() {
0375             d_ptr->startNext();
0376         });
0377     }
0378     return rv;
0379 }
0380 
0381 bool Akonadi::Job::doHandleResponse(qint64 tag, const Akonadi::Protocol::CommandPtr &response)
0382 {
0383     qCDebug(AKONADICORE_LOG) << this << "Unhandled response: " << tag << Protocol::debugString(response);
0384     setError(Unknown);
0385     setErrorText(i18n("Unexpected response"));
0386     emitResult();
0387     return true;
0388 }
0389 
0390 void Job::slotResult(KJob *job)
0391 {
0392     if (d_ptr->mCurrentSubJob == job) {
0393         // current job finished, start the next one
0394         d_ptr->mCurrentSubJob = nullptr;
0395         KCompositeJob::slotResult(job);
0396         if (!job->error()) {
0397             QTimer::singleShot(0, this, [this]() {
0398                 d_ptr->startNext();
0399             });
0400         }
0401     } else {
0402         // job that was still waiting for execution finished, probably canceled,
0403         // so just remove it from the queue and move on without caring about
0404         // its error code
0405         KCompositeJob::removeSubjob(job);
0406     }
0407 }
0408 
0409 void Job::emitWriteFinished()
0410 {
0411     d_ptr->mWriteFinished = true;
0412     Q_EMIT writeFinished(this);
0413 }
0414 
0415 #include "moc_job.cpp"