File indexing completed on 2024-11-10 04:40:45
0001 /* 0002 SPDX-FileCopyrightText: 2007 Volker Krause <vkrause@kde.org> 0003 0004 SPDX-License-Identifier: LGPL-2.0-or-later 0005 */ 0006 0007 #include "session.h" 0008 #include "session_p.h" 0009 0010 #include "job.h" 0011 #include "job_p.h" 0012 #include "private/protocol_p.h" 0013 #include "protocolhelper_p.h" 0014 #include "servermanager.h" 0015 #include "servermanager_p.h" 0016 #include "sessionthread_p.h" 0017 0018 #include "akonadicore_debug.h" 0019 0020 #include <KLocalizedString> 0021 0022 #include <QCoreApplication> 0023 #include <QPointer> 0024 #include <QRandomGenerator> 0025 #include <QThread> 0026 #include <QThreadStorage> 0027 #include <QTimer> 0028 0029 #include <QApplication> 0030 #include <QHostAddress> 0031 0032 // ### FIXME pipelining got broken by switching result emission in JobPrivate::handleResponse to delayed emission 0033 // in order to work around exec() deadlocks. As a result of that Session knows to late about a finished job and still 0034 // sends responses for the next one to the already finished one 0035 #define PIPELINE_LENGTH 0 0036 //#define PIPELINE_LENGTH 2 0037 0038 using namespace Akonadi; 0039 using namespace std::chrono_literals; 0040 /// @cond PRIVATE 0041 0042 void SessionPrivate::startNext() 0043 { 0044 QTimer::singleShot(0, mParent, [this]() { 0045 doStartNext(); 0046 }); 0047 } 0048 0049 void SessionPrivate::reconnect() 0050 { 0051 if (!connection) { 0052 connection = new Connection(Connection::CommandConnection, sessionId, &mCommandBuffer); 0053 sessionThread()->addConnection(connection); 0054 mParent->connect(connection, &Connection::reconnected, mParent, &Session::reconnected, Qt::QueuedConnection); 0055 mParent->connect( 0056 connection, 0057 &Connection::socketDisconnected, 0058 mParent, 0059 [this]() { 0060 socketDisconnected(); 0061 }, 0062 Qt::QueuedConnection); 0063 mParent->connect( 0064 connection, 0065 &Connection::socketError, 0066 mParent, 0067 [this](const QString &error) { 0068 socketError(error); 0069 }, 0070 Qt::QueuedConnection); 0071 } 0072 0073 connection->reconnect(); 0074 } 0075 0076 void SessionPrivate::socketError(const QString &error) 0077 { 0078 qCWarning(AKONADICORE_LOG) << "Socket error occurred:" << error; 0079 socketDisconnected(); 0080 } 0081 0082 void SessionPrivate::socketDisconnected() 0083 { 0084 if (currentJob) { 0085 currentJob->d_ptr->lostConnection(); 0086 } 0087 connected = false; 0088 } 0089 0090 bool SessionPrivate::handleCommands() 0091 { 0092 CommandBufferLocker lock(&mCommandBuffer); 0093 CommandBufferNotifyBlocker notify(&mCommandBuffer); 0094 while (!mCommandBuffer.isEmpty()) { 0095 const auto command = mCommandBuffer.dequeue(); 0096 lock.unlock(); 0097 const auto cmd = command.command; 0098 const auto tag = command.tag; 0099 0100 // Handle Hello response -> send Login 0101 if (cmd->type() == Protocol::Command::Hello) { 0102 const auto &hello = Protocol::cmdCast<Protocol::HelloResponse>(cmd); 0103 if (hello.isError()) { 0104 qCWarning(AKONADICORE_LOG) << "Error when establishing connection with Akonadi server:" << hello.errorMessage(); 0105 connection->closeConnection(); 0106 QTimer::singleShot(1s, connection, &Connection::reconnect); 0107 return false; 0108 } 0109 0110 qCDebug(AKONADICORE_LOG) << "Connected to" << hello.serverName() << ", using protocol version" << hello.protocolVersion(); 0111 qCDebug(AKONADICORE_LOG) << "Server generation:" << hello.generation(); 0112 qCDebug(AKONADICORE_LOG) << "Server says:" << hello.message(); 0113 // Version mismatch is handled in SessionPrivate::startJob() so that 0114 // we can report the error out via KJob API 0115 protocolVersion = hello.protocolVersion(); 0116 Internal::setServerProtocolVersion(protocolVersion); 0117 Internal::setGeneration(hello.generation()); 0118 0119 sendCommand(nextTag(), Protocol::LoginCommandPtr::create(sessionId)); 0120 } else if (cmd->type() == Protocol::Command::Login) { 0121 const auto &login = Protocol::cmdCast<Protocol::LoginResponse>(cmd); 0122 if (login.isError()) { 0123 qCWarning(AKONADICORE_LOG) << "Unable to login to Akonadi server:" << login.errorMessage(); 0124 connection->closeConnection(); 0125 QTimer::singleShot(1s, mParent, [this]() { 0126 reconnect(); 0127 }); 0128 return false; 0129 } 0130 0131 connected = true; 0132 startNext(); 0133 } else if (currentJob) { 0134 currentJob->d_ptr->handleResponse(tag, cmd); 0135 } 0136 0137 lock.relock(); 0138 } 0139 0140 return true; 0141 } 0142 0143 bool SessionPrivate::canPipelineNext() 0144 { 0145 if (queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH) { 0146 return false; 0147 } 0148 if (pipeline.isEmpty() && currentJob) { 0149 return currentJob->d_ptr->mWriteFinished; 0150 } 0151 if (!pipeline.isEmpty()) { 0152 return pipeline.last()->d_ptr->mWriteFinished; 0153 } 0154 return false; 0155 } 0156 0157 void SessionPrivate::doStartNext() 0158 { 0159 if (!connected || (queue.isEmpty() && pipeline.isEmpty())) { 0160 return; 0161 } 0162 if (canPipelineNext()) { 0163 Akonadi::Job *nextJob = queue.dequeue(); 0164 pipeline.enqueue(nextJob); 0165 startJob(nextJob); 0166 } 0167 if (jobRunning) { 0168 return; 0169 } 0170 jobRunning = true; 0171 if (!pipeline.isEmpty()) { 0172 currentJob = pipeline.dequeue(); 0173 } else { 0174 currentJob = queue.dequeue(); 0175 startJob(currentJob); 0176 } 0177 } 0178 0179 void SessionPrivate::startJob(Job *job) 0180 { 0181 if (protocolVersion != Protocol::version()) { 0182 job->setError(Job::ProtocolVersionMismatch); 0183 if (protocolVersion < Protocol::version()) { 0184 job->setErrorText( 0185 i18n("Protocol version mismatch. Server version is older (%1) than ours (%2). " 0186 "If you updated your system recently please restart the Akonadi server.", 0187 protocolVersion, 0188 Protocol::version())); 0189 qCWarning(AKONADICORE_LOG) << "Protocol version mismatch. Server version is older (" << protocolVersion << ") than ours (" << Protocol::version() 0190 << "). " 0191 "If you updated your system recently please restart the Akonadi server."; 0192 } else { 0193 job->setErrorText( 0194 i18n("Protocol version mismatch. Server version is newer (%1) than ours (%2). " 0195 "If you updated your system recently please restart all KDE PIM applications.", 0196 protocolVersion, 0197 Protocol::version())); 0198 qCWarning(AKONADICORE_LOG) << "Protocol version mismatch. Server version is newer (" << protocolVersion << ") than ours (" << Protocol::version() 0199 << "). " 0200 "If you updated your system recently please restart all KDE PIM applications."; 0201 } 0202 job->emitResult(); 0203 } else { 0204 job->d_ptr->startQueued(); 0205 } 0206 } 0207 0208 void SessionPrivate::endJob(Job *job) 0209 { 0210 job->emitResult(); 0211 } 0212 0213 void SessionPrivate::jobDone(KJob *job) 0214 { 0215 // ### careful, this method can be called from the QObject dtor of job (see jobDestroyed() below) 0216 // so don't call any methods on job itself 0217 if (job == currentJob) { 0218 if (pipeline.isEmpty()) { 0219 jobRunning = false; 0220 currentJob = nullptr; 0221 } else { 0222 currentJob = pipeline.dequeue(); 0223 } 0224 startNext(); 0225 } else { 0226 // non-current job finished, likely canceled while still in the queue 0227 queue.removeAll(static_cast<Akonadi::Job *>(job)); 0228 // ### likely not enough to really cancel already running jobs 0229 pipeline.removeAll(static_cast<Akonadi::Job *>(job)); 0230 } 0231 } 0232 0233 void SessionPrivate::jobWriteFinished(Akonadi::Job *job) 0234 { 0235 Q_ASSERT((job == currentJob && pipeline.isEmpty()) || (job = pipeline.last())); 0236 Q_UNUSED(job) 0237 0238 startNext(); 0239 } 0240 0241 void SessionPrivate::jobDestroyed(QObject *job) 0242 { 0243 // careful, accessing non-QObject methods of job will fail here already 0244 jobDone(static_cast<KJob *>(job)); 0245 } 0246 0247 void SessionPrivate::addJob(Job *job) 0248 { 0249 queue.append(job); 0250 QObject::connect(job, &KJob::result, mParent, [this](KJob *job) { 0251 jobDone(job); 0252 }); 0253 QObject::connect(job, &Job::writeFinished, mParent, [this](Job *job) { 0254 jobWriteFinished(job); 0255 }); 0256 QObject::connect(job, &QObject::destroyed, mParent, [this](QObject *o) { 0257 jobDestroyed(o); 0258 }); 0259 startNext(); 0260 } 0261 0262 void SessionPrivate::publishOtherJobs(Job *thanThisJob) 0263 { 0264 int count = 0; 0265 for (const auto &job : std::as_const(queue)) { 0266 if (job != thanThisJob) { 0267 job->d_ptr->publishJob(); 0268 ++count; 0269 } 0270 } 0271 if (count > 0) { 0272 qCDebug(AKONADICORE_LOG) << "published" << count << "pending jobs to the job tracker"; 0273 } 0274 if (currentJob && currentJob != thanThisJob) { 0275 currentJob->d_ptr->signalStartedToJobTracker(); 0276 } 0277 } 0278 0279 qint64 SessionPrivate::nextTag() 0280 { 0281 return theNextTag++; 0282 } 0283 0284 void SessionPrivate::sendCommand(qint64 tag, const Protocol::CommandPtr &command) 0285 { 0286 connection->sendCommand(tag, command); 0287 } 0288 0289 void SessionPrivate::serverStateChanged(ServerManager::State state) 0290 { 0291 if (state == ServerManager::Running && !connected) { 0292 reconnect(); 0293 } else if (!connected && state == ServerManager::Broken) { 0294 // If the server is broken, cancel all pending jobs, otherwise they will be 0295 // blocked forever and applications waiting for them to finish would be stuck 0296 auto q = queue; 0297 for (Job *job : q) { 0298 job->setError(Job::ConnectionFailed); 0299 job->kill(KJob::EmitResult); 0300 } 0301 } else if (state == ServerManager::Stopping) { 0302 sessionThread()->destroyConnection(connection); 0303 connection = nullptr; 0304 } 0305 } 0306 0307 void SessionPrivate::itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision) 0308 { 0309 // only deal with the queue, for the guys in the pipeline it's too late already anyway 0310 // and they shouldn't have gotten there if they depend on a preceding job anyway. 0311 for (Job *job : std::as_const(queue)) { 0312 job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision); 0313 } 0314 } 0315 0316 /// @endcond 0317 0318 SessionPrivate::SessionPrivate(Session *parent) 0319 : mParent(parent) 0320 , mSessionThread(new SessionThread) 0321 , connection(nullptr) 0322 , protocolVersion(0) 0323 , mCommandBuffer(parent, "handleCommands") 0324 , currentJob(nullptr) 0325 { 0326 // Shutdown the thread before QApplication event loop quits - the 0327 // thread()->wait() mechanism in Connection dtor crashes sometimes 0328 // when called from QApplication destructor 0329 connThreadCleanUp = QObject::connect(qApp, &QCoreApplication::aboutToQuit, qApp, [this]() { 0330 delete mSessionThread; 0331 mSessionThread = nullptr; 0332 }); 0333 } 0334 0335 SessionPrivate::~SessionPrivate() 0336 { 0337 QObject::disconnect(connThreadCleanUp); 0338 delete mSessionThread; 0339 } 0340 0341 void SessionPrivate::init(const QByteArray &id) 0342 { 0343 if (!id.isEmpty()) { 0344 sessionId = id; 0345 } else { 0346 sessionId = QCoreApplication::instance()->applicationName().toUtf8() + '-' + QByteArray::number(QRandomGenerator::global()->generate()); 0347 } 0348 0349 qCDebug(AKONADICORE_LOG) << "Initializing session with ID" << id; 0350 0351 connected = false; 0352 theNextTag = 2; 0353 jobRunning = false; 0354 0355 if (ServerManager::state() == ServerManager::NotRunning) { 0356 ServerManager::start(); 0357 } 0358 QObject::connect(ServerManager::self(), &ServerManager::stateChanged, mParent, [this](ServerManager::State state) { 0359 serverStateChanged(state); 0360 }); 0361 reconnect(); 0362 } 0363 0364 void SessionPrivate::forceReconnect() 0365 { 0366 jobRunning = false; 0367 connected = false; 0368 if (connection) { 0369 connection->forceReconnect(); 0370 } 0371 QMetaObject::invokeMethod( 0372 mParent, 0373 [this]() { 0374 reconnect(); 0375 }, 0376 Qt::QueuedConnection); 0377 } 0378 0379 Session::Session(const QByteArray &sessionId, QObject *parent) 0380 : QObject(parent) 0381 , d(new SessionPrivate(this)) 0382 { 0383 d->init(sessionId); 0384 } 0385 0386 Session::Session(SessionPrivate *dd, const QByteArray &sessionId, QObject *parent) 0387 : QObject(parent) 0388 , d(dd) 0389 { 0390 d->mParent = this; 0391 d->init(sessionId); 0392 } 0393 0394 Session::~Session() 0395 { 0396 d->clear(false); 0397 } 0398 0399 QByteArray Session::sessionId() const 0400 { 0401 return d->sessionId; 0402 } 0403 0404 Q_GLOBAL_STATIC(QThreadStorage<QPointer<Session>>, instances) // NOLINT(readability-redundant-member-init) 0405 0406 void SessionPrivate::createDefaultSession(const QByteArray &sessionId) 0407 { 0408 Q_ASSERT_X(!sessionId.isEmpty(), "SessionPrivate::createDefaultSession", "You tried to create a default session with empty session id!"); 0409 Q_ASSERT_X(!instances()->hasLocalData(), "SessionPrivate::createDefaultSession", "You tried to create a default session twice!"); 0410 0411 auto session = new Session(sessionId); 0412 setDefaultSession(session); 0413 } 0414 0415 void SessionPrivate::setDefaultSession(Session *session) 0416 { 0417 instances()->setLocalData({session}); 0418 QObject::connect(qApp, &QCoreApplication::aboutToQuit, qApp, []() { 0419 instances()->setLocalData({}); 0420 }); 0421 } 0422 0423 Session *Session::defaultSession() 0424 { 0425 if (!instances()->hasLocalData()) { 0426 auto session = new Session(); 0427 SessionPrivate::setDefaultSession(session); 0428 } 0429 return instances()->localData().data(); 0430 } 0431 0432 void Session::clear() 0433 { 0434 d->clear(true); 0435 } 0436 0437 void SessionPrivate::clear(bool forceReconnect) 0438 { 0439 auto q = queue; 0440 for (Job *job : q) { 0441 job->kill(KJob::EmitResult); // safe, not started yet 0442 } 0443 queue.clear(); 0444 auto p = pipeline; 0445 for (Job *job : p) { 0446 job->d_ptr->mStarted = false; // avoid killing/reconnect loops 0447 job->kill(KJob::EmitResult); 0448 } 0449 pipeline.clear(); 0450 if (currentJob) { 0451 currentJob->d_ptr->mStarted = false; // avoid killing/reconnect loops 0452 currentJob->kill(KJob::EmitResult); 0453 } 0454 0455 if (forceReconnect) { 0456 this->forceReconnect(); 0457 } 0458 } 0459 0460 #include "moc_session.cpp"