File indexing completed on 2024-11-10 04:40:45

0001 /*
0002     SPDX-FileCopyrightText: 2007 Volker Krause <vkrause@kde.org>
0003 
0004     SPDX-License-Identifier: LGPL-2.0-or-later
0005 */
0006 
0007 #include "session.h"
0008 #include "session_p.h"
0009 
0010 #include "job.h"
0011 #include "job_p.h"
0012 #include "private/protocol_p.h"
0013 #include "protocolhelper_p.h"
0014 #include "servermanager.h"
0015 #include "servermanager_p.h"
0016 #include "sessionthread_p.h"
0017 
0018 #include "akonadicore_debug.h"
0019 
0020 #include <KLocalizedString>
0021 
0022 #include <QCoreApplication>
0023 #include <QPointer>
0024 #include <QRandomGenerator>
0025 #include <QThread>
0026 #include <QThreadStorage>
0027 #include <QTimer>
0028 
0029 #include <QApplication>
0030 #include <QHostAddress>
0031 
0032 // ### FIXME pipelining got broken by switching result emission in JobPrivate::handleResponse to delayed emission
0033 // in order to work around exec() deadlocks. As a result of that Session knows to late about a finished job and still
0034 // sends responses for the next one to the already finished one
0035 #define PIPELINE_LENGTH 0
0036 //#define PIPELINE_LENGTH 2
0037 
0038 using namespace Akonadi;
0039 using namespace std::chrono_literals;
0040 /// @cond PRIVATE
0041 
0042 void SessionPrivate::startNext()
0043 {
0044     QTimer::singleShot(0, mParent, [this]() {
0045         doStartNext();
0046     });
0047 }
0048 
0049 void SessionPrivate::reconnect()
0050 {
0051     if (!connection) {
0052         connection = new Connection(Connection::CommandConnection, sessionId, &mCommandBuffer);
0053         sessionThread()->addConnection(connection);
0054         mParent->connect(connection, &Connection::reconnected, mParent, &Session::reconnected, Qt::QueuedConnection);
0055         mParent->connect(
0056             connection,
0057             &Connection::socketDisconnected,
0058             mParent,
0059             [this]() {
0060                 socketDisconnected();
0061             },
0062             Qt::QueuedConnection);
0063         mParent->connect(
0064             connection,
0065             &Connection::socketError,
0066             mParent,
0067             [this](const QString &error) {
0068                 socketError(error);
0069             },
0070             Qt::QueuedConnection);
0071     }
0072 
0073     connection->reconnect();
0074 }
0075 
0076 void SessionPrivate::socketError(const QString &error)
0077 {
0078     qCWarning(AKONADICORE_LOG) << "Socket error occurred:" << error;
0079     socketDisconnected();
0080 }
0081 
0082 void SessionPrivate::socketDisconnected()
0083 {
0084     if (currentJob) {
0085         currentJob->d_ptr->lostConnection();
0086     }
0087     connected = false;
0088 }
0089 
0090 bool SessionPrivate::handleCommands()
0091 {
0092     CommandBufferLocker lock(&mCommandBuffer);
0093     CommandBufferNotifyBlocker notify(&mCommandBuffer);
0094     while (!mCommandBuffer.isEmpty()) {
0095         const auto command = mCommandBuffer.dequeue();
0096         lock.unlock();
0097         const auto cmd = command.command;
0098         const auto tag = command.tag;
0099 
0100         // Handle Hello response -> send Login
0101         if (cmd->type() == Protocol::Command::Hello) {
0102             const auto &hello = Protocol::cmdCast<Protocol::HelloResponse>(cmd);
0103             if (hello.isError()) {
0104                 qCWarning(AKONADICORE_LOG) << "Error when establishing connection with Akonadi server:" << hello.errorMessage();
0105                 connection->closeConnection();
0106                 QTimer::singleShot(1s, connection, &Connection::reconnect);
0107                 return false;
0108             }
0109 
0110             qCDebug(AKONADICORE_LOG) << "Connected to" << hello.serverName() << ", using protocol version" << hello.protocolVersion();
0111             qCDebug(AKONADICORE_LOG) << "Server generation:" << hello.generation();
0112             qCDebug(AKONADICORE_LOG) << "Server says:" << hello.message();
0113             // Version mismatch is handled in SessionPrivate::startJob() so that
0114             // we can report the error out via KJob API
0115             protocolVersion = hello.protocolVersion();
0116             Internal::setServerProtocolVersion(protocolVersion);
0117             Internal::setGeneration(hello.generation());
0118 
0119             sendCommand(nextTag(), Protocol::LoginCommandPtr::create(sessionId));
0120         } else if (cmd->type() == Protocol::Command::Login) {
0121             const auto &login = Protocol::cmdCast<Protocol::LoginResponse>(cmd);
0122             if (login.isError()) {
0123                 qCWarning(AKONADICORE_LOG) << "Unable to login to Akonadi server:" << login.errorMessage();
0124                 connection->closeConnection();
0125                 QTimer::singleShot(1s, mParent, [this]() {
0126                     reconnect();
0127                 });
0128                 return false;
0129             }
0130 
0131             connected = true;
0132             startNext();
0133         } else if (currentJob) {
0134             currentJob->d_ptr->handleResponse(tag, cmd);
0135         }
0136 
0137         lock.relock();
0138     }
0139 
0140     return true;
0141 }
0142 
0143 bool SessionPrivate::canPipelineNext()
0144 {
0145     if (queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH) {
0146         return false;
0147     }
0148     if (pipeline.isEmpty() && currentJob) {
0149         return currentJob->d_ptr->mWriteFinished;
0150     }
0151     if (!pipeline.isEmpty()) {
0152         return pipeline.last()->d_ptr->mWriteFinished;
0153     }
0154     return false;
0155 }
0156 
0157 void SessionPrivate::doStartNext()
0158 {
0159     if (!connected || (queue.isEmpty() && pipeline.isEmpty())) {
0160         return;
0161     }
0162     if (canPipelineNext()) {
0163         Akonadi::Job *nextJob = queue.dequeue();
0164         pipeline.enqueue(nextJob);
0165         startJob(nextJob);
0166     }
0167     if (jobRunning) {
0168         return;
0169     }
0170     jobRunning = true;
0171     if (!pipeline.isEmpty()) {
0172         currentJob = pipeline.dequeue();
0173     } else {
0174         currentJob = queue.dequeue();
0175         startJob(currentJob);
0176     }
0177 }
0178 
0179 void SessionPrivate::startJob(Job *job)
0180 {
0181     if (protocolVersion != Protocol::version()) {
0182         job->setError(Job::ProtocolVersionMismatch);
0183         if (protocolVersion < Protocol::version()) {
0184             job->setErrorText(
0185                 i18n("Protocol version mismatch. Server version is older (%1) than ours (%2). "
0186                      "If you updated your system recently please restart the Akonadi server.",
0187                      protocolVersion,
0188                      Protocol::version()));
0189             qCWarning(AKONADICORE_LOG) << "Protocol version mismatch. Server version is older (" << protocolVersion << ") than ours (" << Protocol::version()
0190                                        << "). "
0191                                           "If you updated your system recently please restart the Akonadi server.";
0192         } else {
0193             job->setErrorText(
0194                 i18n("Protocol version mismatch. Server version is newer (%1) than ours (%2). "
0195                      "If you updated your system recently please restart all KDE PIM applications.",
0196                      protocolVersion,
0197                      Protocol::version()));
0198             qCWarning(AKONADICORE_LOG) << "Protocol version mismatch. Server version is newer (" << protocolVersion << ") than ours (" << Protocol::version()
0199                                        << "). "
0200                                           "If you updated your system recently please restart all KDE PIM applications.";
0201         }
0202         job->emitResult();
0203     } else {
0204         job->d_ptr->startQueued();
0205     }
0206 }
0207 
0208 void SessionPrivate::endJob(Job *job)
0209 {
0210     job->emitResult();
0211 }
0212 
0213 void SessionPrivate::jobDone(KJob *job)
0214 {
0215     // ### careful, this method can be called from the QObject dtor of job (see jobDestroyed() below)
0216     // so don't call any methods on job itself
0217     if (job == currentJob) {
0218         if (pipeline.isEmpty()) {
0219             jobRunning = false;
0220             currentJob = nullptr;
0221         } else {
0222             currentJob = pipeline.dequeue();
0223         }
0224         startNext();
0225     } else {
0226         // non-current job finished, likely canceled while still in the queue
0227         queue.removeAll(static_cast<Akonadi::Job *>(job));
0228         // ### likely not enough to really cancel already running jobs
0229         pipeline.removeAll(static_cast<Akonadi::Job *>(job));
0230     }
0231 }
0232 
0233 void SessionPrivate::jobWriteFinished(Akonadi::Job *job)
0234 {
0235     Q_ASSERT((job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()));
0236     Q_UNUSED(job)
0237 
0238     startNext();
0239 }
0240 
0241 void SessionPrivate::jobDestroyed(QObject *job)
0242 {
0243     // careful, accessing non-QObject methods of job will fail here already
0244     jobDone(static_cast<KJob *>(job));
0245 }
0246 
0247 void SessionPrivate::addJob(Job *job)
0248 {
0249     queue.append(job);
0250     QObject::connect(job, &KJob::result, mParent, [this](KJob *job) {
0251         jobDone(job);
0252     });
0253     QObject::connect(job, &Job::writeFinished, mParent, [this](Job *job) {
0254         jobWriteFinished(job);
0255     });
0256     QObject::connect(job, &QObject::destroyed, mParent, [this](QObject *o) {
0257         jobDestroyed(o);
0258     });
0259     startNext();
0260 }
0261 
0262 void SessionPrivate::publishOtherJobs(Job *thanThisJob)
0263 {
0264     int count = 0;
0265     for (const auto &job : std::as_const(queue)) {
0266         if (job != thanThisJob) {
0267             job->d_ptr->publishJob();
0268             ++count;
0269         }
0270     }
0271     if (count > 0) {
0272         qCDebug(AKONADICORE_LOG) << "published" << count << "pending jobs to the job tracker";
0273     }
0274     if (currentJob && currentJob != thanThisJob) {
0275         currentJob->d_ptr->signalStartedToJobTracker();
0276     }
0277 }
0278 
0279 qint64 SessionPrivate::nextTag()
0280 {
0281     return theNextTag++;
0282 }
0283 
0284 void SessionPrivate::sendCommand(qint64 tag, const Protocol::CommandPtr &command)
0285 {
0286     connection->sendCommand(tag, command);
0287 }
0288 
0289 void SessionPrivate::serverStateChanged(ServerManager::State state)
0290 {
0291     if (state == ServerManager::Running && !connected) {
0292         reconnect();
0293     } else if (!connected && state == ServerManager::Broken) {
0294         // If the server is broken, cancel all pending jobs, otherwise they will be
0295         // blocked forever and applications waiting for them to finish would be stuck
0296         auto q = queue;
0297         for (Job *job : q) {
0298             job->setError(Job::ConnectionFailed);
0299             job->kill(KJob::EmitResult);
0300         }
0301     } else if (state == ServerManager::Stopping) {
0302         sessionThread()->destroyConnection(connection);
0303         connection = nullptr;
0304     }
0305 }
0306 
0307 void SessionPrivate::itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
0308 {
0309     // only deal with the queue, for the guys in the pipeline it's too late already anyway
0310     // and they shouldn't have gotten there if they depend on a preceding job anyway.
0311     for (Job *job : std::as_const(queue)) {
0312         job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision);
0313     }
0314 }
0315 
0316 /// @endcond
0317 
0318 SessionPrivate::SessionPrivate(Session *parent)
0319     : mParent(parent)
0320     , mSessionThread(new SessionThread)
0321     , connection(nullptr)
0322     , protocolVersion(0)
0323     , mCommandBuffer(parent, "handleCommands")
0324     , currentJob(nullptr)
0325 {
0326     // Shutdown the thread before QApplication event loop quits - the
0327     // thread()->wait() mechanism in Connection dtor crashes sometimes
0328     // when called from QApplication destructor
0329     connThreadCleanUp = QObject::connect(qApp, &QCoreApplication::aboutToQuit, qApp, [this]() {
0330         delete mSessionThread;
0331         mSessionThread = nullptr;
0332     });
0333 }
0334 
0335 SessionPrivate::~SessionPrivate()
0336 {
0337     QObject::disconnect(connThreadCleanUp);
0338     delete mSessionThread;
0339 }
0340 
0341 void SessionPrivate::init(const QByteArray &id)
0342 {
0343     if (!id.isEmpty()) {
0344         sessionId = id;
0345     } else {
0346         sessionId = QCoreApplication::instance()->applicationName().toUtf8() + '-' + QByteArray::number(QRandomGenerator::global()->generate());
0347     }
0348 
0349     qCDebug(AKONADICORE_LOG) << "Initializing session with ID" << id;
0350 
0351     connected = false;
0352     theNextTag = 2;
0353     jobRunning = false;
0354 
0355     if (ServerManager::state() == ServerManager::NotRunning) {
0356         ServerManager::start();
0357     }
0358     QObject::connect(ServerManager::self(), &ServerManager::stateChanged, mParent, [this](ServerManager::State state) {
0359         serverStateChanged(state);
0360     });
0361     reconnect();
0362 }
0363 
0364 void SessionPrivate::forceReconnect()
0365 {
0366     jobRunning = false;
0367     connected = false;
0368     if (connection) {
0369         connection->forceReconnect();
0370     }
0371     QMetaObject::invokeMethod(
0372         mParent,
0373         [this]() {
0374             reconnect();
0375         },
0376         Qt::QueuedConnection);
0377 }
0378 
0379 Session::Session(const QByteArray &sessionId, QObject *parent)
0380     : QObject(parent)
0381     , d(new SessionPrivate(this))
0382 {
0383     d->init(sessionId);
0384 }
0385 
0386 Session::Session(SessionPrivate *dd, const QByteArray &sessionId, QObject *parent)
0387     : QObject(parent)
0388     , d(dd)
0389 {
0390     d->mParent = this;
0391     d->init(sessionId);
0392 }
0393 
0394 Session::~Session()
0395 {
0396     d->clear(false);
0397 }
0398 
0399 QByteArray Session::sessionId() const
0400 {
0401     return d->sessionId;
0402 }
0403 
0404 Q_GLOBAL_STATIC(QThreadStorage<QPointer<Session>>, instances) // NOLINT(readability-redundant-member-init)
0405 
0406 void SessionPrivate::createDefaultSession(const QByteArray &sessionId)
0407 {
0408     Q_ASSERT_X(!sessionId.isEmpty(), "SessionPrivate::createDefaultSession", "You tried to create a default session with empty session id!");
0409     Q_ASSERT_X(!instances()->hasLocalData(), "SessionPrivate::createDefaultSession", "You tried to create a default session twice!");
0410 
0411     auto session = new Session(sessionId);
0412     setDefaultSession(session);
0413 }
0414 
0415 void SessionPrivate::setDefaultSession(Session *session)
0416 {
0417     instances()->setLocalData({session});
0418     QObject::connect(qApp, &QCoreApplication::aboutToQuit, qApp, []() {
0419         instances()->setLocalData({});
0420     });
0421 }
0422 
0423 Session *Session::defaultSession()
0424 {
0425     if (!instances()->hasLocalData()) {
0426         auto session = new Session();
0427         SessionPrivate::setDefaultSession(session);
0428     }
0429     return instances()->localData().data();
0430 }
0431 
0432 void Session::clear()
0433 {
0434     d->clear(true);
0435 }
0436 
0437 void SessionPrivate::clear(bool forceReconnect)
0438 {
0439     auto q = queue;
0440     for (Job *job : q) {
0441         job->kill(KJob::EmitResult); // safe, not started yet
0442     }
0443     queue.clear();
0444     auto p = pipeline;
0445     for (Job *job : p) {
0446         job->d_ptr->mStarted = false; // avoid killing/reconnect loops
0447         job->kill(KJob::EmitResult);
0448     }
0449     pipeline.clear();
0450     if (currentJob) {
0451         currentJob->d_ptr->mStarted = false; // avoid killing/reconnect loops
0452         currentJob->kill(KJob::EmitResult);
0453     }
0454 
0455     if (forceReconnect) {
0456         this->forceReconnect();
0457     }
0458 }
0459 
0460 #include "moc_session.cpp"