File indexing completed on 2024-05-26 05:14:35

0001 /***************************************************************************
0002  *   SPDX-FileCopyrightText: 2006 Till Adam <adam@kde.org>                 *
0003  *   SPDX-FileCopyrightText: 2013 Volker Krause <vkrause@kde.org>          *
0004  *                                                                         *
0005  *   SPDX-License-Identifier: LGPL-2.0-or-later                            *
0006  ***************************************************************************/
0007 #include "connection.h"
0008 #include "akonadiserver_debug.h"
0009 
0010 #include <QEventLoop>
0011 #include <QSettings>
0012 #include <QThreadStorage>
0013 
0014 #include "handler.h"
0015 #include "notificationmanager.h"
0016 #include "storage/datastore.h"
0017 #include "storage/dbdeadlockcatcher.h"
0018 
0019 #include <cassert>
0020 
0021 #ifndef Q_OS_WIN
0022 #include <cxxabi.h>
0023 #endif
0024 
0025 #include "private/standarddirs_p.h"
0026 
0027 using namespace Akonadi;
0028 using namespace Akonadi::Server;
0029 
0030 #define IDLE_TIMER_TIMEOUT 180000 // 3 min
0031 
0032 static QString connectionIdentifier(Connection *c)
0033 {
0034     const QString id = QString::asprintf("%p", static_cast<void *>(c));
0035     return id;
0036 }
0037 
0038 Connection::Connection(AkonadiServer &akonadi)
0039     : AkThread(connectionIdentifier(this), QThread::InheritPriority)
0040     , m_akonadi(akonadi)
0041 {
0042 }
0043 
0044 Connection::Connection(quintptr socketDescriptor, AkonadiServer &akonadi)
0045     : AkThread(connectionIdentifier(this), QThread::InheritPriority)
0046     , m_akonadi(akonadi)
0047 {
0048     m_socketDescriptor = socketDescriptor;
0049     m_identifier = connectionIdentifier(this); // same as objectName()
0050 
0051     const QSettings settings(Akonadi::StandardDirs::serverConfigFile(), QSettings::IniFormat);
0052     m_verifyCacheOnRetrieval = settings.value(QStringLiteral("Cache/VerifyOnRetrieval"), m_verifyCacheOnRetrieval).toBool();
0053 }
0054 
0055 void Connection::init()
0056 {
0057     AkThread::init();
0058 
0059     auto socket = std::make_unique<QLocalSocket>();
0060     if (!socket->setSocketDescriptor(m_socketDescriptor)) {
0061         qCWarning(AKONADISERVER_LOG) << "Connection(" << m_identifier << ")::run: failed to set socket descriptor: " << socket->error() << "("
0062                                      << socket->errorString() << ")";
0063         return;
0064     }
0065 
0066     m_socket = std::move(socket);
0067     connect(m_socket.get(), &QLocalSocket::disconnected, this, &Connection::slotSocketDisconnected);
0068 
0069     m_idleTimer = std::make_unique<QTimer>();
0070     connect(m_idleTimer.get(), &QTimer::timeout, this, &Connection::slotConnectionIdle);
0071 
0072     storageBackend()->notificationCollector()->setConnection(this);
0073 
0074     if (m_socket->state() == QLocalSocket::ConnectedState) {
0075         QTimer::singleShot(0, this, &Connection::handleIncomingData);
0076     } else {
0077         connect(m_socket.get(), &QLocalSocket::connected, this, &Connection::handleIncomingData, Qt::QueuedConnection);
0078     }
0079 
0080     try {
0081         slotSendHello();
0082     } catch (const ProtocolException &e) {
0083         qCWarning(AKONADISERVER_LOG) << "Protocol Exception sending \"hello\" on connection" << m_identifier << ":" << e.what();
0084         m_socket->disconnectFromServer();
0085     }
0086 }
0087 
0088 void Connection::quit()
0089 {
0090     if (QThread::currentThread()->loopLevel() > 1) {
0091         m_connectionClosing = true;
0092         Q_EMIT connectionClosing();
0093         return;
0094     }
0095 
0096     m_akonadi.tracer().endConnection(m_identifier, QString());
0097 
0098     m_socket.reset();
0099     m_idleTimer.reset();
0100 
0101     AkThread::quit();
0102 }
0103 
0104 void Connection::slotSendHello()
0105 {
0106     SchemaVersion version = SchemaVersion::retrieveAll().at(0);
0107 
0108     Protocol::HelloResponse hello;
0109     hello.setServerName(QStringLiteral("Akonadi"));
0110     hello.setMessage(QStringLiteral("Not Really IMAP server"));
0111     hello.setProtocolVersion(Protocol::version());
0112     hello.setGeneration(version.generation());
0113     sendResponse(0, std::move(hello));
0114 }
0115 
0116 DataStore *Connection::storageBackend()
0117 {
0118     if (!m_backend) {
0119         m_backend = DataStore::self();
0120     }
0121     return m_backend;
0122 }
0123 
0124 Connection::~Connection()
0125 {
0126     quitThread();
0127 
0128     if (m_reportTime) {
0129         reportTime();
0130     }
0131 }
0132 
0133 void Connection::slotConnectionIdle()
0134 {
0135     Q_ASSERT(m_currentHandler == nullptr);
0136     if (m_backend && m_backend->isOpened()) {
0137         if (m_backend->inTransaction()) {
0138             // This is a programming error, the timer should not have fired.
0139             // But it is safer to abort and leave the connection open, until
0140             // a later operation causes the idle timer to fire (than crash
0141             // the akonadi server).
0142             qCInfo(AKONADISERVER_LOG) << m_sessionId << "NOT Closing idle db connection; we are in transaction";
0143             return;
0144         }
0145         m_backend->close();
0146     }
0147 }
0148 
0149 void Connection::slotSocketDisconnected()
0150 {
0151     // If we have active handler, wait for it to finish, then we emit the signal
0152     // from slotNewDate()
0153     if (m_currentHandler) {
0154         return;
0155     }
0156 
0157     Q_EMIT disconnected();
0158 }
0159 
0160 void Connection::parseStream(const Protocol::CommandPtr &cmd)
0161 {
0162     if (!m_currentHandler->parseStream()) {
0163         try {
0164             m_currentHandler->failureResponse("Error while handling a command");
0165         } catch (...) {
0166             m_connectionClosing = true;
0167         }
0168         qCWarning(AKONADISERVER_LOG) << "Error while handling command" << cmd->type() << "on connection" << m_identifier;
0169     }
0170 }
0171 
0172 void Connection::handleIncomingData()
0173 {
0174     for (;;) {
0175         if (m_connectionClosing || !m_socket || m_socket->state() != QLocalSocket::ConnectedState) {
0176             break;
0177         }
0178 
0179         // Blocks with event loop until some data arrive, allows us to still use QTimers
0180         // and similar while waiting for some data to arrive
0181         if (m_socket->bytesAvailable() < int(sizeof(qint64))) {
0182             QEventLoop loop;
0183             connect(m_socket.get(), &QLocalSocket::readyRead, &loop, &QEventLoop::quit);
0184             connect(m_socket.get(), &QLocalSocket::stateChanged, &loop, &QEventLoop::quit);
0185             connect(this, &Connection::connectionClosing, &loop, &QEventLoop::quit);
0186             loop.exec();
0187 
0188             // RAII fails to clean QT's slot/signal connections above, leaking memory. Manually disconnect.
0189             disconnect(m_socket.get(), &QLocalSocket::readyRead, &loop, &QEventLoop::quit);
0190             disconnect(m_socket.get(), &QLocalSocket::stateChanged, &loop, &QEventLoop::quit);
0191             disconnect(this, &Connection::connectionClosing, &loop, &QEventLoop::quit);
0192         }
0193 
0194         if (m_connectionClosing || !m_socket || m_socket->state() != QLocalSocket::ConnectedState) {
0195             break;
0196         }
0197 
0198         m_idleTimer->stop();
0199 
0200         // will only open() a previously idle backend.
0201         // Otherwise, a new backend could lazily be constructed by later calls.
0202         if (!storageBackend()->isOpened()) {
0203             m_backend->open();
0204         }
0205 
0206         QString currentCommand;
0207         while (m_socket->bytesAvailable() >= int(sizeof(qint64))) {
0208             Protocol::DataStream stream(m_socket.get());
0209             qint64 tag = -1;
0210             stream >> tag;
0211             // TODO: Check tag is incremental sequence
0212 
0213             Protocol::CommandPtr cmd;
0214             try {
0215                 cmd = Protocol::deserialize(m_socket.get());
0216             } catch (const Akonadi::ProtocolException &e) {
0217                 qCWarning(AKONADISERVER_LOG) << "ProtocolException while deserializing incoming data on connection" << m_identifier << ":" << e.what();
0218                 setState(Server::LoggingOut);
0219                 return;
0220             } catch (const std::exception &e) {
0221                 qCWarning(AKONADISERVER_LOG) << "Unknown exception while deserializing incoming data on connection" << m_identifier << ":" << e.what();
0222                 setState(Server::LoggingOut);
0223                 return;
0224             }
0225             if (cmd->type() == Protocol::Command::Invalid) {
0226                 qCWarning(AKONADISERVER_LOG) << "Received an invalid command on connection" << m_identifier << ": resetting connection";
0227                 setState(Server::LoggingOut);
0228                 return;
0229             }
0230 
0231             // Tag context and collection context is not persistent.
0232             m_context.setTag(std::nullopt);
0233             m_context.setCollection({});
0234             if (m_akonadi.tracer().currentTracer() != QLatin1StringView("null")) {
0235                 m_akonadi.tracer().connectionInput(m_identifier, tag, cmd);
0236             }
0237 
0238             m_currentHandler = findHandlerForCommand(cmd->type());
0239             if (!m_currentHandler) {
0240                 qCWarning(AKONADISERVER_LOG) << "Invalid command: no such handler for" << cmd->type() << "on connection" << m_identifier;
0241                 setState(Server::LoggingOut);
0242                 return;
0243             }
0244             if (m_reportTime) {
0245                 startTime();
0246             }
0247 
0248             m_currentHandler->setConnection(this);
0249             m_currentHandler->setTag(tag);
0250             m_currentHandler->setCommand(cmd);
0251             try {
0252                 DbDeadlockCatcher catcher([this, &cmd]() {
0253                     parseStream(cmd);
0254                 });
0255             } catch (const Akonadi::Server::HandlerException &e) {
0256                 if (m_currentHandler) {
0257                     try {
0258                         m_currentHandler->failureResponse(e.what());
0259                     } catch (...) {
0260                         m_connectionClosing = true;
0261                     }
0262                     qCWarning(AKONADISERVER_LOG) << "Handler exception when handling command" << cmd->type() << "on connection" << m_identifier << ":"
0263                                                  << e.what();
0264                 }
0265             } catch (const Akonadi::Server::Exception &e) {
0266                 if (m_currentHandler) {
0267                     try {
0268                         m_currentHandler->failureResponse(QString::fromUtf8(e.type()) + QLatin1StringView(": ") + QString::fromUtf8(e.what()));
0269                     } catch (...) {
0270                         m_connectionClosing = true;
0271                     }
0272                     qCWarning(AKONADISERVER_LOG) << "General exception when handling command" << cmd->type() << "on connection" << m_identifier << ":"
0273                                                  << e.what();
0274                 }
0275             } catch (const Akonadi::ProtocolException &e) {
0276                 // No point trying to send anything back to client, the connection is
0277                 // already messed up
0278                 qCWarning(AKONADISERVER_LOG) << "Protocol exception when handling command" << cmd->type() << "on connection" << m_identifier << ":" << e.what();
0279                 m_connectionClosing = true;
0280 #if defined(Q_OS_LINUX) && !defined(_LIBCPP_VERSION)
0281             } catch (abi::__forced_unwind &) {
0282                 // HACK: NPTL throws __forced_unwind during thread cancellation and
0283                 // we *must* rethrow it otherwise the program aborts. Due to the issue
0284                 // described in #376385 we might end up destroying (cancelling) the
0285                 // thread from a nested loop executed inside parseStream() above,
0286                 // so the exception raised in there gets caught by this try..catch
0287                 // statement and it must be rethrown at all cost. Remove this hack
0288                 // once the root problem is fixed.
0289                 throw;
0290 #endif
0291             } catch (...) {
0292                 qCCritical(AKONADISERVER_LOG) << "Unknown exception while handling command" << cmd->type() << "on connection" << m_identifier;
0293                 if (m_currentHandler) {
0294                     try {
0295                         m_currentHandler->failureResponse("Unknown exception caught");
0296                     } catch (...) {
0297                         m_connectionClosing = true;
0298                     }
0299                 }
0300             }
0301             if (m_reportTime) {
0302                 stopTime(currentCommand);
0303             }
0304             m_currentHandler.reset();
0305 
0306             if (!m_socket || m_socket->state() != QLocalSocket::ConnectedState) {
0307                 Q_EMIT disconnected();
0308                 return;
0309             }
0310 
0311             if (m_connectionClosing) {
0312                 break;
0313             }
0314         }
0315 
0316         // reset, arm the timer
0317         m_idleTimer->start(IDLE_TIMER_TIMEOUT);
0318 
0319         if (m_connectionClosing) {
0320             break;
0321         }
0322     }
0323 
0324     if (m_connectionClosing) {
0325         m_socket->disconnect(this);
0326         m_socket->close();
0327         QTimer::singleShot(0, this, &Connection::quit);
0328     }
0329 }
0330 
0331 const CommandContext &Connection::context() const
0332 {
0333     return m_context;
0334 }
0335 
0336 void Connection::setContext(const CommandContext &context)
0337 {
0338     m_context = context;
0339 }
0340 
0341 std::unique_ptr<Handler> Connection::findHandlerForCommand(Protocol::Command::Type command)
0342 {
0343     auto handler = Handler::findHandlerForCommandAlwaysAllowed(command, m_akonadi);
0344     if (handler) {
0345         return handler;
0346     }
0347 
0348     switch (m_connectionState) {
0349     case NonAuthenticated:
0350         handler = Handler::findHandlerForCommandNonAuthenticated(command, m_akonadi);
0351         break;
0352     case Authenticated:
0353         handler = Handler::findHandlerForCommandAuthenticated(command, m_akonadi);
0354         break;
0355     case LoggingOut:
0356         break;
0357     }
0358 
0359     return handler;
0360 }
0361 
0362 qint64 Connection::currentTag() const
0363 {
0364     return m_currentHandler->tag();
0365 }
0366 
0367 void Connection::setState(ConnectionState state)
0368 {
0369     if (state == m_connectionState) {
0370         return;
0371     }
0372     m_connectionState = state;
0373     switch (m_connectionState) {
0374     case NonAuthenticated:
0375         assert(0); // can't happen, it's only the initial state, we can't go back to it
0376         break;
0377     case Authenticated:
0378         break;
0379     case LoggingOut:
0380         m_socket->disconnectFromServer();
0381         break;
0382     }
0383 }
0384 
0385 void Connection::setSessionId(const QByteArray &id)
0386 {
0387     m_identifier = QString::asprintf("%s (%p)", id.data(), static_cast<void *>(this));
0388     m_akonadi.tracer().beginConnection(m_identifier, QString());
0389     // m_streamParser->setTracerIdentifier(m_identifier);
0390 
0391     m_sessionId = id;
0392     setObjectName(QString::fromLatin1(id));
0393     // this races with the use of objectName() in QThreadPrivate::start
0394     // thread()->setObjectName(objectName() + QStringLiteral("-Thread"));
0395     storageBackend()->setSessionId(id);
0396 }
0397 
0398 QByteArray Connection::sessionId() const
0399 {
0400     return m_sessionId;
0401 }
0402 
0403 bool Connection::isOwnerResource(const PimItem &item) const
0404 {
0405     if (context().resource().isValid() && item.collection().resourceId() == context().resource().id()) {
0406         return true;
0407     }
0408     // fallback for older resources
0409     if (sessionId() == item.collection().resource().name().toUtf8()) {
0410         return true;
0411     }
0412     return false;
0413 }
0414 
0415 bool Connection::isOwnerResource(const Collection &collection) const
0416 {
0417     if (context().resource().isValid() && collection.resourceId() == context().resource().id()) {
0418         return true;
0419     }
0420     if (sessionId() == collection.resource().name().toUtf8()) {
0421         return true;
0422     }
0423     return false;
0424 }
0425 
0426 bool Connection::verifyCacheOnRetrieval() const
0427 {
0428     return m_verifyCacheOnRetrieval;
0429 }
0430 
0431 void Connection::startTime()
0432 {
0433     m_time.start();
0434 }
0435 
0436 void Connection::stopTime(const QString &identifier)
0437 {
0438     int elapsed = m_time.elapsed();
0439     m_totalTime += elapsed;
0440     m_totalTimeByHandler[identifier] += elapsed;
0441     m_executionsByHandler[identifier]++;
0442     qCDebug(AKONADISERVER_LOG) << identifier << " time : " << elapsed << " total: " << m_totalTime;
0443 }
0444 
0445 void Connection::reportTime() const
0446 {
0447     qCDebug(AKONADISERVER_LOG) << "===== Time report for " << m_identifier << " =====";
0448     qCDebug(AKONADISERVER_LOG) << " total: " << m_totalTime;
0449     for (auto it = m_totalTimeByHandler.cbegin(), end = m_totalTimeByHandler.cend(); it != end; ++it) {
0450         const QString &handler = it.key();
0451         qCDebug(AKONADISERVER_LOG) << "handler : " << handler << " time: " << m_totalTimeByHandler.value(handler) << " executions "
0452                                    << m_executionsByHandler.value(handler)
0453                                    << " avg: " << m_totalTimeByHandler.value(handler) / m_executionsByHandler.value(handler);
0454     }
0455 }
0456 
0457 void Connection::sendResponse(qint64 tag, const Protocol::CommandPtr &response)
0458 {
0459     if (m_akonadi.tracer().currentTracer() != QLatin1StringView("null")) {
0460         m_akonadi.tracer().connectionOutput(m_identifier, tag, response);
0461     }
0462     Protocol::DataStream stream(m_socket.get());
0463     stream << tag;
0464     Protocol::serialize(stream, response);
0465     stream.flush();
0466     if (!m_socket->waitForBytesWritten()) {
0467         if (m_socket->state() == QLocalSocket::ConnectedState) {
0468             throw ProtocolException("Server write timeout");
0469         } else {
0470             // The client has disconnected before we managed to send our response,
0471             // which is not an error
0472         }
0473     }
0474 }
0475 
0476 Protocol::CommandPtr Connection::readCommand()
0477 {
0478     while (m_socket->bytesAvailable() < static_cast<int>(sizeof(qint64))) {
0479         Protocol::DataStream::waitForData(m_socket.get(), 30000); // 30 seconds, just in case client is busy
0480     }
0481 
0482     Protocol::DataStream stream(m_socket.get());
0483     qint64 tag;
0484     stream >> tag;
0485 
0486     // TODO: compare tag with m_currentHandler->tag() ?
0487     return Protocol::deserialize(m_socket.get());
0488 }
0489 
0490 #include "moc_connection.cpp"