File indexing completed on 2024-05-26 05:14:35
0001 /*************************************************************************** 0002 * SPDX-FileCopyrightText: 2006 Till Adam <adam@kde.org> * 0003 * SPDX-FileCopyrightText: 2013 Volker Krause <vkrause@kde.org> * 0004 * * 0005 * SPDX-License-Identifier: LGPL-2.0-or-later * 0006 ***************************************************************************/ 0007 #include "connection.h" 0008 #include "akonadiserver_debug.h" 0009 0010 #include <QEventLoop> 0011 #include <QSettings> 0012 #include <QThreadStorage> 0013 0014 #include "handler.h" 0015 #include "notificationmanager.h" 0016 #include "storage/datastore.h" 0017 #include "storage/dbdeadlockcatcher.h" 0018 0019 #include <cassert> 0020 0021 #ifndef Q_OS_WIN 0022 #include <cxxabi.h> 0023 #endif 0024 0025 #include "private/standarddirs_p.h" 0026 0027 using namespace Akonadi; 0028 using namespace Akonadi::Server; 0029 0030 #define IDLE_TIMER_TIMEOUT 180000 // 3 min 0031 0032 static QString connectionIdentifier(Connection *c) 0033 { 0034 const QString id = QString::asprintf("%p", static_cast<void *>(c)); 0035 return id; 0036 } 0037 0038 Connection::Connection(AkonadiServer &akonadi) 0039 : AkThread(connectionIdentifier(this), QThread::InheritPriority) 0040 , m_akonadi(akonadi) 0041 { 0042 } 0043 0044 Connection::Connection(quintptr socketDescriptor, AkonadiServer &akonadi) 0045 : AkThread(connectionIdentifier(this), QThread::InheritPriority) 0046 , m_akonadi(akonadi) 0047 { 0048 m_socketDescriptor = socketDescriptor; 0049 m_identifier = connectionIdentifier(this); // same as objectName() 0050 0051 const QSettings settings(Akonadi::StandardDirs::serverConfigFile(), QSettings::IniFormat); 0052 m_verifyCacheOnRetrieval = settings.value(QStringLiteral("Cache/VerifyOnRetrieval"), m_verifyCacheOnRetrieval).toBool(); 0053 } 0054 0055 void Connection::init() 0056 { 0057 AkThread::init(); 0058 0059 auto socket = std::make_unique<QLocalSocket>(); 0060 if (!socket->setSocketDescriptor(m_socketDescriptor)) { 0061 qCWarning(AKONADISERVER_LOG) << "Connection(" << m_identifier << ")::run: failed to set socket descriptor: " << socket->error() << "(" 0062 << socket->errorString() << ")"; 0063 return; 0064 } 0065 0066 m_socket = std::move(socket); 0067 connect(m_socket.get(), &QLocalSocket::disconnected, this, &Connection::slotSocketDisconnected); 0068 0069 m_idleTimer = std::make_unique<QTimer>(); 0070 connect(m_idleTimer.get(), &QTimer::timeout, this, &Connection::slotConnectionIdle); 0071 0072 storageBackend()->notificationCollector()->setConnection(this); 0073 0074 if (m_socket->state() == QLocalSocket::ConnectedState) { 0075 QTimer::singleShot(0, this, &Connection::handleIncomingData); 0076 } else { 0077 connect(m_socket.get(), &QLocalSocket::connected, this, &Connection::handleIncomingData, Qt::QueuedConnection); 0078 } 0079 0080 try { 0081 slotSendHello(); 0082 } catch (const ProtocolException &e) { 0083 qCWarning(AKONADISERVER_LOG) << "Protocol Exception sending \"hello\" on connection" << m_identifier << ":" << e.what(); 0084 m_socket->disconnectFromServer(); 0085 } 0086 } 0087 0088 void Connection::quit() 0089 { 0090 if (QThread::currentThread()->loopLevel() > 1) { 0091 m_connectionClosing = true; 0092 Q_EMIT connectionClosing(); 0093 return; 0094 } 0095 0096 m_akonadi.tracer().endConnection(m_identifier, QString()); 0097 0098 m_socket.reset(); 0099 m_idleTimer.reset(); 0100 0101 AkThread::quit(); 0102 } 0103 0104 void Connection::slotSendHello() 0105 { 0106 SchemaVersion version = SchemaVersion::retrieveAll().at(0); 0107 0108 Protocol::HelloResponse hello; 0109 hello.setServerName(QStringLiteral("Akonadi")); 0110 hello.setMessage(QStringLiteral("Not Really IMAP server")); 0111 hello.setProtocolVersion(Protocol::version()); 0112 hello.setGeneration(version.generation()); 0113 sendResponse(0, std::move(hello)); 0114 } 0115 0116 DataStore *Connection::storageBackend() 0117 { 0118 if (!m_backend) { 0119 m_backend = DataStore::self(); 0120 } 0121 return m_backend; 0122 } 0123 0124 Connection::~Connection() 0125 { 0126 quitThread(); 0127 0128 if (m_reportTime) { 0129 reportTime(); 0130 } 0131 } 0132 0133 void Connection::slotConnectionIdle() 0134 { 0135 Q_ASSERT(m_currentHandler == nullptr); 0136 if (m_backend && m_backend->isOpened()) { 0137 if (m_backend->inTransaction()) { 0138 // This is a programming error, the timer should not have fired. 0139 // But it is safer to abort and leave the connection open, until 0140 // a later operation causes the idle timer to fire (than crash 0141 // the akonadi server). 0142 qCInfo(AKONADISERVER_LOG) << m_sessionId << "NOT Closing idle db connection; we are in transaction"; 0143 return; 0144 } 0145 m_backend->close(); 0146 } 0147 } 0148 0149 void Connection::slotSocketDisconnected() 0150 { 0151 // If we have active handler, wait for it to finish, then we emit the signal 0152 // from slotNewDate() 0153 if (m_currentHandler) { 0154 return; 0155 } 0156 0157 Q_EMIT disconnected(); 0158 } 0159 0160 void Connection::parseStream(const Protocol::CommandPtr &cmd) 0161 { 0162 if (!m_currentHandler->parseStream()) { 0163 try { 0164 m_currentHandler->failureResponse("Error while handling a command"); 0165 } catch (...) { 0166 m_connectionClosing = true; 0167 } 0168 qCWarning(AKONADISERVER_LOG) << "Error while handling command" << cmd->type() << "on connection" << m_identifier; 0169 } 0170 } 0171 0172 void Connection::handleIncomingData() 0173 { 0174 for (;;) { 0175 if (m_connectionClosing || !m_socket || m_socket->state() != QLocalSocket::ConnectedState) { 0176 break; 0177 } 0178 0179 // Blocks with event loop until some data arrive, allows us to still use QTimers 0180 // and similar while waiting for some data to arrive 0181 if (m_socket->bytesAvailable() < int(sizeof(qint64))) { 0182 QEventLoop loop; 0183 connect(m_socket.get(), &QLocalSocket::readyRead, &loop, &QEventLoop::quit); 0184 connect(m_socket.get(), &QLocalSocket::stateChanged, &loop, &QEventLoop::quit); 0185 connect(this, &Connection::connectionClosing, &loop, &QEventLoop::quit); 0186 loop.exec(); 0187 0188 // RAII fails to clean QT's slot/signal connections above, leaking memory. Manually disconnect. 0189 disconnect(m_socket.get(), &QLocalSocket::readyRead, &loop, &QEventLoop::quit); 0190 disconnect(m_socket.get(), &QLocalSocket::stateChanged, &loop, &QEventLoop::quit); 0191 disconnect(this, &Connection::connectionClosing, &loop, &QEventLoop::quit); 0192 } 0193 0194 if (m_connectionClosing || !m_socket || m_socket->state() != QLocalSocket::ConnectedState) { 0195 break; 0196 } 0197 0198 m_idleTimer->stop(); 0199 0200 // will only open() a previously idle backend. 0201 // Otherwise, a new backend could lazily be constructed by later calls. 0202 if (!storageBackend()->isOpened()) { 0203 m_backend->open(); 0204 } 0205 0206 QString currentCommand; 0207 while (m_socket->bytesAvailable() >= int(sizeof(qint64))) { 0208 Protocol::DataStream stream(m_socket.get()); 0209 qint64 tag = -1; 0210 stream >> tag; 0211 // TODO: Check tag is incremental sequence 0212 0213 Protocol::CommandPtr cmd; 0214 try { 0215 cmd = Protocol::deserialize(m_socket.get()); 0216 } catch (const Akonadi::ProtocolException &e) { 0217 qCWarning(AKONADISERVER_LOG) << "ProtocolException while deserializing incoming data on connection" << m_identifier << ":" << e.what(); 0218 setState(Server::LoggingOut); 0219 return; 0220 } catch (const std::exception &e) { 0221 qCWarning(AKONADISERVER_LOG) << "Unknown exception while deserializing incoming data on connection" << m_identifier << ":" << e.what(); 0222 setState(Server::LoggingOut); 0223 return; 0224 } 0225 if (cmd->type() == Protocol::Command::Invalid) { 0226 qCWarning(AKONADISERVER_LOG) << "Received an invalid command on connection" << m_identifier << ": resetting connection"; 0227 setState(Server::LoggingOut); 0228 return; 0229 } 0230 0231 // Tag context and collection context is not persistent. 0232 m_context.setTag(std::nullopt); 0233 m_context.setCollection({}); 0234 if (m_akonadi.tracer().currentTracer() != QLatin1StringView("null")) { 0235 m_akonadi.tracer().connectionInput(m_identifier, tag, cmd); 0236 } 0237 0238 m_currentHandler = findHandlerForCommand(cmd->type()); 0239 if (!m_currentHandler) { 0240 qCWarning(AKONADISERVER_LOG) << "Invalid command: no such handler for" << cmd->type() << "on connection" << m_identifier; 0241 setState(Server::LoggingOut); 0242 return; 0243 } 0244 if (m_reportTime) { 0245 startTime(); 0246 } 0247 0248 m_currentHandler->setConnection(this); 0249 m_currentHandler->setTag(tag); 0250 m_currentHandler->setCommand(cmd); 0251 try { 0252 DbDeadlockCatcher catcher([this, &cmd]() { 0253 parseStream(cmd); 0254 }); 0255 } catch (const Akonadi::Server::HandlerException &e) { 0256 if (m_currentHandler) { 0257 try { 0258 m_currentHandler->failureResponse(e.what()); 0259 } catch (...) { 0260 m_connectionClosing = true; 0261 } 0262 qCWarning(AKONADISERVER_LOG) << "Handler exception when handling command" << cmd->type() << "on connection" << m_identifier << ":" 0263 << e.what(); 0264 } 0265 } catch (const Akonadi::Server::Exception &e) { 0266 if (m_currentHandler) { 0267 try { 0268 m_currentHandler->failureResponse(QString::fromUtf8(e.type()) + QLatin1StringView(": ") + QString::fromUtf8(e.what())); 0269 } catch (...) { 0270 m_connectionClosing = true; 0271 } 0272 qCWarning(AKONADISERVER_LOG) << "General exception when handling command" << cmd->type() << "on connection" << m_identifier << ":" 0273 << e.what(); 0274 } 0275 } catch (const Akonadi::ProtocolException &e) { 0276 // No point trying to send anything back to client, the connection is 0277 // already messed up 0278 qCWarning(AKONADISERVER_LOG) << "Protocol exception when handling command" << cmd->type() << "on connection" << m_identifier << ":" << e.what(); 0279 m_connectionClosing = true; 0280 #if defined(Q_OS_LINUX) && !defined(_LIBCPP_VERSION) 0281 } catch (abi::__forced_unwind &) { 0282 // HACK: NPTL throws __forced_unwind during thread cancellation and 0283 // we *must* rethrow it otherwise the program aborts. Due to the issue 0284 // described in #376385 we might end up destroying (cancelling) the 0285 // thread from a nested loop executed inside parseStream() above, 0286 // so the exception raised in there gets caught by this try..catch 0287 // statement and it must be rethrown at all cost. Remove this hack 0288 // once the root problem is fixed. 0289 throw; 0290 #endif 0291 } catch (...) { 0292 qCCritical(AKONADISERVER_LOG) << "Unknown exception while handling command" << cmd->type() << "on connection" << m_identifier; 0293 if (m_currentHandler) { 0294 try { 0295 m_currentHandler->failureResponse("Unknown exception caught"); 0296 } catch (...) { 0297 m_connectionClosing = true; 0298 } 0299 } 0300 } 0301 if (m_reportTime) { 0302 stopTime(currentCommand); 0303 } 0304 m_currentHandler.reset(); 0305 0306 if (!m_socket || m_socket->state() != QLocalSocket::ConnectedState) { 0307 Q_EMIT disconnected(); 0308 return; 0309 } 0310 0311 if (m_connectionClosing) { 0312 break; 0313 } 0314 } 0315 0316 // reset, arm the timer 0317 m_idleTimer->start(IDLE_TIMER_TIMEOUT); 0318 0319 if (m_connectionClosing) { 0320 break; 0321 } 0322 } 0323 0324 if (m_connectionClosing) { 0325 m_socket->disconnect(this); 0326 m_socket->close(); 0327 QTimer::singleShot(0, this, &Connection::quit); 0328 } 0329 } 0330 0331 const CommandContext &Connection::context() const 0332 { 0333 return m_context; 0334 } 0335 0336 void Connection::setContext(const CommandContext &context) 0337 { 0338 m_context = context; 0339 } 0340 0341 std::unique_ptr<Handler> Connection::findHandlerForCommand(Protocol::Command::Type command) 0342 { 0343 auto handler = Handler::findHandlerForCommandAlwaysAllowed(command, m_akonadi); 0344 if (handler) { 0345 return handler; 0346 } 0347 0348 switch (m_connectionState) { 0349 case NonAuthenticated: 0350 handler = Handler::findHandlerForCommandNonAuthenticated(command, m_akonadi); 0351 break; 0352 case Authenticated: 0353 handler = Handler::findHandlerForCommandAuthenticated(command, m_akonadi); 0354 break; 0355 case LoggingOut: 0356 break; 0357 } 0358 0359 return handler; 0360 } 0361 0362 qint64 Connection::currentTag() const 0363 { 0364 return m_currentHandler->tag(); 0365 } 0366 0367 void Connection::setState(ConnectionState state) 0368 { 0369 if (state == m_connectionState) { 0370 return; 0371 } 0372 m_connectionState = state; 0373 switch (m_connectionState) { 0374 case NonAuthenticated: 0375 assert(0); // can't happen, it's only the initial state, we can't go back to it 0376 break; 0377 case Authenticated: 0378 break; 0379 case LoggingOut: 0380 m_socket->disconnectFromServer(); 0381 break; 0382 } 0383 } 0384 0385 void Connection::setSessionId(const QByteArray &id) 0386 { 0387 m_identifier = QString::asprintf("%s (%p)", id.data(), static_cast<void *>(this)); 0388 m_akonadi.tracer().beginConnection(m_identifier, QString()); 0389 // m_streamParser->setTracerIdentifier(m_identifier); 0390 0391 m_sessionId = id; 0392 setObjectName(QString::fromLatin1(id)); 0393 // this races with the use of objectName() in QThreadPrivate::start 0394 // thread()->setObjectName(objectName() + QStringLiteral("-Thread")); 0395 storageBackend()->setSessionId(id); 0396 } 0397 0398 QByteArray Connection::sessionId() const 0399 { 0400 return m_sessionId; 0401 } 0402 0403 bool Connection::isOwnerResource(const PimItem &item) const 0404 { 0405 if (context().resource().isValid() && item.collection().resourceId() == context().resource().id()) { 0406 return true; 0407 } 0408 // fallback for older resources 0409 if (sessionId() == item.collection().resource().name().toUtf8()) { 0410 return true; 0411 } 0412 return false; 0413 } 0414 0415 bool Connection::isOwnerResource(const Collection &collection) const 0416 { 0417 if (context().resource().isValid() && collection.resourceId() == context().resource().id()) { 0418 return true; 0419 } 0420 if (sessionId() == collection.resource().name().toUtf8()) { 0421 return true; 0422 } 0423 return false; 0424 } 0425 0426 bool Connection::verifyCacheOnRetrieval() const 0427 { 0428 return m_verifyCacheOnRetrieval; 0429 } 0430 0431 void Connection::startTime() 0432 { 0433 m_time.start(); 0434 } 0435 0436 void Connection::stopTime(const QString &identifier) 0437 { 0438 int elapsed = m_time.elapsed(); 0439 m_totalTime += elapsed; 0440 m_totalTimeByHandler[identifier] += elapsed; 0441 m_executionsByHandler[identifier]++; 0442 qCDebug(AKONADISERVER_LOG) << identifier << " time : " << elapsed << " total: " << m_totalTime; 0443 } 0444 0445 void Connection::reportTime() const 0446 { 0447 qCDebug(AKONADISERVER_LOG) << "===== Time report for " << m_identifier << " ====="; 0448 qCDebug(AKONADISERVER_LOG) << " total: " << m_totalTime; 0449 for (auto it = m_totalTimeByHandler.cbegin(), end = m_totalTimeByHandler.cend(); it != end; ++it) { 0450 const QString &handler = it.key(); 0451 qCDebug(AKONADISERVER_LOG) << "handler : " << handler << " time: " << m_totalTimeByHandler.value(handler) << " executions " 0452 << m_executionsByHandler.value(handler) 0453 << " avg: " << m_totalTimeByHandler.value(handler) / m_executionsByHandler.value(handler); 0454 } 0455 } 0456 0457 void Connection::sendResponse(qint64 tag, const Protocol::CommandPtr &response) 0458 { 0459 if (m_akonadi.tracer().currentTracer() != QLatin1StringView("null")) { 0460 m_akonadi.tracer().connectionOutput(m_identifier, tag, response); 0461 } 0462 Protocol::DataStream stream(m_socket.get()); 0463 stream << tag; 0464 Protocol::serialize(stream, response); 0465 stream.flush(); 0466 if (!m_socket->waitForBytesWritten()) { 0467 if (m_socket->state() == QLocalSocket::ConnectedState) { 0468 throw ProtocolException("Server write timeout"); 0469 } else { 0470 // The client has disconnected before we managed to send our response, 0471 // which is not an error 0472 } 0473 } 0474 } 0475 0476 Protocol::CommandPtr Connection::readCommand() 0477 { 0478 while (m_socket->bytesAvailable() < static_cast<int>(sizeof(qint64))) { 0479 Protocol::DataStream::waitForData(m_socket.get(), 30000); // 30 seconds, just in case client is busy 0480 } 0481 0482 Protocol::DataStream stream(m_socket.get()); 0483 qint64 tag; 0484 stream >> tag; 0485 0486 // TODO: compare tag with m_currentHandler->tag() ? 0487 return Protocol::deserialize(m_socket.get()); 0488 } 0489 0490 #include "moc_connection.cpp"