File indexing completed on 2024-11-10 04:40:30
0001 /* 0002 SPDX-FileCopyrightText: 2006 Tobias Koenig <tokoe@kde.org> 0003 2006 Marc Mutz <mutz@kde.org> 0004 2006 - 2007 Volker Krause <vkrause@kde.org> 0005 0006 SPDX-License-Identifier: LGPL-2.0-or-later 0007 */ 0008 0009 #include "job.h" 0010 #include "akonadicore_debug.h" 0011 #include "job_p.h" 0012 #include "private/instance_p.h" 0013 #include "private/protocol_p.h" 0014 #include "session.h" 0015 #include "session_p.h" 0016 #include <QDBusConnection> 0017 #include <QTime> 0018 0019 #include <KLocalizedString> 0020 0021 #include <QDBusConnectionInterface> 0022 #include <QDBusInterface> 0023 #include <QElapsedTimer> 0024 #include <QTimer> 0025 0026 using namespace Akonadi; 0027 0028 static QDBusAbstractInterface *s_jobtracker = nullptr; 0029 0030 /// @cond PRIVATE 0031 void JobPrivate::handleResponse(qint64 tag, const Protocol::CommandPtr &response) 0032 { 0033 Q_Q(Job); 0034 0035 if (mCurrentSubJob) { 0036 mCurrentSubJob->d_ptr->handleResponse(tag, response); 0037 return; 0038 } 0039 0040 if (tag == mTag) { 0041 if (response->isResponse()) { 0042 const auto &resp = Protocol::cmdCast<Protocol::Response>(response); 0043 if (resp.isError()) { 0044 q->setError(Job::Unknown); 0045 q->setErrorText(resp.errorMessage()); 0046 q->emitResult(); 0047 return; 0048 } 0049 } 0050 } 0051 0052 if (mTag != tag) { 0053 qCWarning(AKONADICORE_LOG) << "Received response with a different tag!"; 0054 qCDebug(AKONADICORE_LOG) << "Response tag:" << tag << ", response type:" << response->type(); 0055 qCDebug(AKONADICORE_LOG) << "Job tag:" << mTag << ", job:" << q; 0056 return; 0057 } 0058 0059 if (mStarted) { 0060 if (mReadingFinished) { 0061 qCWarning(AKONADICORE_LOG) << "Received response for a job that does not expect any more data, ignoring"; 0062 qCDebug(AKONADICORE_LOG) << "Response tag:" << tag << ", response type:" << response->type(); 0063 qCDebug(AKONADICORE_LOG) << "Job tag:" << mTag << ", job:" << q; 0064 Q_ASSERT(!mReadingFinished); 0065 return; 0066 } 0067 0068 if (q->doHandleResponse(tag, response)) { 0069 mReadingFinished = true; 0070 QTimer::singleShot(0, q, [this]() { 0071 delayedEmitResult(); 0072 }); 0073 } 0074 } 0075 } 0076 0077 void JobPrivate::init(QObject *parent) 0078 { 0079 Q_Q(Job); 0080 0081 mParentJob = qobject_cast<Job *>(parent); 0082 mSession = qobject_cast<Session *>(parent); 0083 0084 if (!mSession) { 0085 if (!mParentJob) { 0086 mSession = Session::defaultSession(); 0087 } else { 0088 mSession = mParentJob->d_ptr->mSession; 0089 } 0090 } 0091 0092 if (!mParentJob) { 0093 mSession->d->addJob(q); 0094 } else { 0095 mParentJob->addSubjob(q); 0096 } 0097 publishJob(); 0098 } 0099 0100 void JobPrivate::publishJob() 0101 { 0102 Q_Q(Job); 0103 // if there's a job tracker running, tell it about the new job 0104 if (!s_jobtracker) { 0105 // Let's only check for the debugging console every 3 seconds, otherwise every single job 0106 // makes a dbus call to the dbus daemon, doesn't help performance. 0107 static QElapsedTimer s_lastTime; 0108 if (!s_lastTime.isValid() || s_lastTime.elapsed() > 3000) { 0109 if (!s_lastTime.isValid()) { 0110 s_lastTime.start(); 0111 } 0112 const QString suffix = Akonadi::Instance::identifier().isEmpty() ? QString() : QLatin1Char('-') + Akonadi::Instance::identifier(); 0113 if (QDBusConnection::sessionBus().interface()->isServiceRegistered(QStringLiteral("org.kde.akonadiconsole") + suffix)) { 0114 s_jobtracker = new QDBusInterface(QLatin1StringView("org.kde.akonadiconsole") + suffix, 0115 QStringLiteral("/jobtracker"), 0116 QStringLiteral("org.freedesktop.Akonadi.JobTracker"), 0117 QDBusConnection::sessionBus(), 0118 nullptr); 0119 mSession->d->publishOtherJobs(q); 0120 } else { 0121 s_lastTime.restart(); 0122 } 0123 } 0124 // Note: we never reset s_jobtracker to 0 when a call fails; but if we did 0125 // then we should restart s_lastTime. 0126 } 0127 QMetaObject::invokeMethod(q, "signalCreationToJobTracker", Qt::QueuedConnection); 0128 } 0129 0130 void JobPrivate::signalCreationToJobTracker() 0131 { 0132 Q_Q(Job); 0133 if (s_jobtracker) { 0134 // We do these dbus calls manually, so as to avoid having to install (or copy) the console's 0135 // xml interface document. Since this is purely a debugging aid, that seems preferable to 0136 // publishing something not intended for public consumption. 0137 // WARNING: for any signature change here, apply it to resourcescheduler.cpp too 0138 const QList<QVariant> argumentList = QList<QVariant>() << QLatin1StringView(mSession->sessionId()) << QString::number(reinterpret_cast<quintptr>(q), 16) 0139 << (mParentJob ? QString::number(reinterpret_cast<quintptr>(mParentJob), 16) : QString()) 0140 << QString::fromLatin1(q->metaObject()->className()) << jobDebuggingString(); 0141 QDBusPendingCall call = s_jobtracker->asyncCallWithArgumentList(QStringLiteral("jobCreated"), argumentList); 0142 0143 auto watcher = new QDBusPendingCallWatcher(call, s_jobtracker); 0144 QObject::connect(watcher, &QDBusPendingCallWatcher::finished, s_jobtracker, [](QDBusPendingCallWatcher *w) { 0145 QDBusPendingReply<void> reply = *w; 0146 if (reply.isError() && s_jobtracker) { 0147 qDebug() << reply.error().name() << reply.error().message(); 0148 s_jobtracker->deleteLater(); 0149 s_jobtracker = nullptr; 0150 } 0151 w->deleteLater(); 0152 }); 0153 } 0154 } 0155 0156 void JobPrivate::signalStartedToJobTracker() 0157 { 0158 Q_Q(Job); 0159 if (s_jobtracker) { 0160 // if there's a job tracker running, tell it a job started 0161 const QList<QVariant> argumentList = {QString::number(reinterpret_cast<quintptr>(q), 16)}; 0162 s_jobtracker->callWithArgumentList(QDBus::NoBlock, QStringLiteral("jobStarted"), argumentList); 0163 } 0164 } 0165 0166 void JobPrivate::aboutToFinish() 0167 { 0168 // Dummy 0169 } 0170 0171 void JobPrivate::delayedEmitResult() 0172 { 0173 Q_Q(Job); 0174 if (q->hasSubjobs()) { 0175 // We still have subjobs, wait for them to finish 0176 mFinishPending = true; 0177 } else { 0178 aboutToFinish(); 0179 q->emitResult(); 0180 } 0181 } 0182 0183 void JobPrivate::startQueued() 0184 { 0185 Q_Q(Job); 0186 mStarted = true; 0187 0188 Q_EMIT q->aboutToStart(q); 0189 q->doStart(); 0190 QTimer::singleShot(0, q, [this]() { 0191 startNext(); 0192 }); 0193 QMetaObject::invokeMethod(q, "signalStartedToJobTracker", Qt::QueuedConnection); 0194 } 0195 0196 void JobPrivate::lostConnection() 0197 { 0198 Q_Q(Job); 0199 0200 if (mCurrentSubJob) { 0201 mCurrentSubJob->d_ptr->lostConnection(); 0202 } else { 0203 q->setError(Job::ConnectionFailed); 0204 q->emitResult(); 0205 } 0206 } 0207 0208 void JobPrivate::slotSubJobAboutToStart(Job *job) 0209 { 0210 Q_ASSERT(mCurrentSubJob == nullptr); 0211 mCurrentSubJob = job; 0212 } 0213 0214 void JobPrivate::startNext() 0215 { 0216 Q_Q(Job); 0217 0218 if (mStarted && !mCurrentSubJob && q->hasSubjobs()) { 0219 Job *job = qobject_cast<Akonadi::Job *>(q->subjobs().at(0)); 0220 Q_ASSERT(job); 0221 job->d_ptr->startQueued(); 0222 } else if (mFinishPending && !q->hasSubjobs()) { 0223 // The last subjob we've been waiting for has finished, emitResult() finally 0224 QTimer::singleShot(0, q, [this]() { 0225 delayedEmitResult(); 0226 }); 0227 } 0228 } 0229 0230 qint64 JobPrivate::newTag() 0231 { 0232 if (mParentJob) { 0233 mTag = mParentJob->d_ptr->newTag(); 0234 } else { 0235 mTag = mSession->d->nextTag(); 0236 } 0237 return mTag; 0238 } 0239 0240 qint64 JobPrivate::tag() const 0241 { 0242 return mTag; 0243 } 0244 0245 void JobPrivate::sendCommand(qint64 tag, const Protocol::CommandPtr &cmd) 0246 { 0247 if (mParentJob) { 0248 mParentJob->d_ptr->sendCommand(tag, cmd); 0249 } else { 0250 mSession->d->sendCommand(tag, cmd); 0251 } 0252 } 0253 0254 void JobPrivate::sendCommand(const Protocol::CommandPtr &cmd) 0255 { 0256 sendCommand(newTag(), cmd); 0257 } 0258 0259 void JobPrivate::itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision) 0260 { 0261 mSession->d->itemRevisionChanged(itemId, oldRevision, newRevision); 0262 } 0263 0264 void JobPrivate::updateItemRevision(Akonadi::Item::Id itemId, int oldRevision, int newRevision) 0265 { 0266 Q_Q(Job); 0267 const auto &subjobs = q->subjobs(); 0268 for (KJob *j : subjobs) { 0269 auto job = qobject_cast<Akonadi::Job *>(j); 0270 if (job) { 0271 job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision); 0272 } 0273 } 0274 doUpdateItemRevision(itemId, oldRevision, newRevision); 0275 } 0276 0277 void JobPrivate::doUpdateItemRevision(Akonadi::Item::Id itemId, int oldRevision, int newRevision) 0278 { 0279 Q_UNUSED(itemId) 0280 Q_UNUSED(oldRevision) 0281 Q_UNUSED(newRevision) 0282 } 0283 0284 int JobPrivate::protocolVersion() const 0285 { 0286 return mSession->d->protocolVersion; 0287 } 0288 /// @endcond 0289 0290 Job::Job(QObject *parent) 0291 : KCompositeJob(parent) 0292 , d_ptr(new JobPrivate(this)) 0293 { 0294 d_ptr->init(parent); 0295 } 0296 0297 Job::Job(JobPrivate *dd, QObject *parent) 0298 : KCompositeJob(parent) 0299 , d_ptr(dd) 0300 { 0301 d_ptr->init(parent); 0302 } 0303 0304 Job::~Job() 0305 { 0306 // if there is a job tracer listening, tell it the job is done now 0307 if (s_jobtracker) { 0308 const QList<QVariant> argumentList = {QString::number(reinterpret_cast<quintptr>(this), 16), errorString()}; 0309 s_jobtracker->callWithArgumentList(QDBus::NoBlock, QStringLiteral("jobEnded"), argumentList); 0310 } 0311 } 0312 0313 void Job::start() 0314 { 0315 } 0316 0317 bool Job::doKill() 0318 { 0319 Q_D(Job); 0320 if (d->mStarted) { 0321 // the only way to cancel an already started job is reconnecting to the server 0322 d->mSession->d->forceReconnect(); 0323 } 0324 d->mStarted = false; 0325 return true; 0326 } 0327 0328 QString Job::errorString() const 0329 { 0330 QString str; 0331 switch (error()) { 0332 case NoError: 0333 break; 0334 case ConnectionFailed: 0335 str = i18n("Cannot connect to the Akonadi service."); 0336 break; 0337 case ProtocolVersionMismatch: 0338 str = i18n("The protocol version of the Akonadi server is incompatible. Make sure you have a compatible version installed."); 0339 break; 0340 case UserCanceled: 0341 str = i18n("User canceled operation."); 0342 break; 0343 case Unknown: 0344 return errorText(); 0345 case UserError: 0346 str = i18n("Unknown error."); 0347 break; 0348 } 0349 if (!errorText().isEmpty()) { 0350 str += QStringLiteral(" (%1)").arg(errorText()); 0351 } 0352 return str; 0353 } 0354 0355 bool Job::addSubjob(KJob *job) 0356 { 0357 bool rv = KCompositeJob::addSubjob(job); 0358 if (rv) { 0359 connect(qobject_cast<Job *>(job), &Job::aboutToStart, this, [this](Job *job) { 0360 d_ptr->slotSubJobAboutToStart(job); 0361 }); 0362 QTimer::singleShot(0, this, [this]() { 0363 d_ptr->startNext(); 0364 }); 0365 } 0366 return rv; 0367 } 0368 0369 bool Job::removeSubjob(KJob *job) 0370 { 0371 bool rv = KCompositeJob::removeSubjob(job); 0372 if (job == d_ptr->mCurrentSubJob) { 0373 d_ptr->mCurrentSubJob = nullptr; 0374 QTimer::singleShot(0, this, [this]() { 0375 d_ptr->startNext(); 0376 }); 0377 } 0378 return rv; 0379 } 0380 0381 bool Akonadi::Job::doHandleResponse(qint64 tag, const Akonadi::Protocol::CommandPtr &response) 0382 { 0383 qCDebug(AKONADICORE_LOG) << this << "Unhandled response: " << tag << Protocol::debugString(response); 0384 setError(Unknown); 0385 setErrorText(i18n("Unexpected response")); 0386 emitResult(); 0387 return true; 0388 } 0389 0390 void Job::slotResult(KJob *job) 0391 { 0392 if (d_ptr->mCurrentSubJob == job) { 0393 // current job finished, start the next one 0394 d_ptr->mCurrentSubJob = nullptr; 0395 KCompositeJob::slotResult(job); 0396 if (!job->error()) { 0397 QTimer::singleShot(0, this, [this]() { 0398 d_ptr->startNext(); 0399 }); 0400 } 0401 } else { 0402 // job that was still waiting for execution finished, probably canceled, 0403 // so just remove it from the queue and move on without caring about 0404 // its error code 0405 KCompositeJob::removeSubjob(job); 0406 } 0407 } 0408 0409 void Job::emitWriteFinished() 0410 { 0411 d_ptr->mWriteFinished = true; 0412 Q_EMIT writeFinished(this); 0413 } 0414 0415 #include "moc_job.cpp"