File indexing completed on 2024-11-10 04:40:39
0001 /* 0002 * SPDX-FileCopyrightText: 2015 Daniel Vrátil <dvratil@redhat.com> 0003 * 0004 * SPDX-License-Identifier: LGPL-2.0-or-later 0005 */ 0006 0007 #include "akonadicore_debug.h" 0008 #include "commandbuffer_p.h" 0009 #include "connection_p.h" 0010 #include "private/instance_p.h" 0011 #include "servermanager_p.h" 0012 #include "session_p.h" 0013 0014 #include <QAbstractEventDispatcher> 0015 #include <QApplication> 0016 #include <QDateTime> 0017 #include <QFile> 0018 #include <QFileInfo> 0019 #include <QSettings> 0020 #include <QTimer> 0021 0022 #include "private/datastream_p_p.h" 0023 #include "private/protocol_exception_p.h" 0024 #include "private/standarddirs_p.h" 0025 0026 using namespace Akonadi; 0027 0028 Connection::Connection(ConnectionType connType, const QByteArray &sessionId, CommandBuffer *commandBuffer, QObject *parent) 0029 : QObject(parent) 0030 , mConnectionType(connType) 0031 , mSessionId(sessionId) 0032 , mCommandBuffer(commandBuffer) 0033 { 0034 qRegisterMetaType<Protocol::CommandPtr>(); 0035 qRegisterMetaType<QAbstractSocket::SocketState>(); 0036 0037 const QByteArray sessionLogFile = qgetenv("AKONADI_SESSION_LOGFILE"); 0038 if (!sessionLogFile.isEmpty()) { 0039 mLogFile = new QFile(QStringLiteral("%1.%2.%3.%4-%5") 0040 .arg(QString::fromLatin1(sessionLogFile)) 0041 .arg(QApplication::applicationPid()) 0042 .arg(QString::number(reinterpret_cast<qulonglong>(this), 16), 0043 QString::fromLatin1(mSessionId.replace('/', '_')), 0044 connType == CommandConnection ? QStringLiteral("Cmd") : QStringLiteral("Ntf"))); 0045 if (!mLogFile->open(QIODevice::WriteOnly | QIODevice::Truncate)) { 0046 qCWarning(AKONADICORE_LOG) << "Failed to open Akonadi Session log file" << mLogFile->fileName(); 0047 delete mLogFile; 0048 mLogFile = nullptr; 0049 } 0050 } 0051 } 0052 0053 Connection::~Connection() 0054 { 0055 delete mLogFile; 0056 if (mSocket) { 0057 mSocket->disconnect(); 0058 mSocket->disconnectFromServer(); 0059 mSocket->close(); 0060 mSocket.reset(); 0061 } 0062 } 0063 0064 void Connection::reconnect() 0065 { 0066 const bool ok = QMetaObject::invokeMethod(this, &Connection::doReconnect, Qt::QueuedConnection); 0067 Q_ASSERT(ok); 0068 Q_UNUSED(ok) 0069 } 0070 0071 QString Connection::defaultAddressForTypeAndMethod(ConnectionType type, const QString &method) 0072 { 0073 if (method == QLatin1StringView("UnixPath")) { 0074 const QString defaultSocketDir = StandardDirs::saveDir("data"); 0075 if (type == CommandConnection) { 0076 return defaultSocketDir % QStringLiteral("akonadiserver-cmd.socket"); 0077 } else if (type == NotificationConnection) { 0078 return defaultSocketDir % QStringLiteral("akonadiserver-ntf.socket"); 0079 } 0080 } else if (method == QLatin1StringView("NamedPipe")) { 0081 QString suffix; 0082 if (Instance::hasIdentifier()) { 0083 suffix += QStringLiteral("%1-").arg(Instance::identifier()); 0084 } 0085 suffix += QString::fromUtf8(QUrl::toPercentEncoding(qApp->applicationDirPath())); 0086 if (type == CommandConnection) { 0087 return QStringLiteral("Akonadi-Cmd-") % suffix; 0088 } else if (type == NotificationConnection) { 0089 return QStringLiteral("Akonadi-Ntf-") % suffix; 0090 } 0091 } 0092 0093 Q_UNREACHABLE(); 0094 } 0095 0096 void Connection::doReconnect() 0097 { 0098 Q_ASSERT(QThread::currentThread() == thread()); 0099 0100 if (mSocket && (mSocket->state() == QLocalSocket::ConnectedState || mSocket->state() == QLocalSocket::ConnectingState)) { 0101 // nothing to do, we are still/already connected 0102 return; 0103 } 0104 0105 if (ServerManager::self()->state() != ServerManager::Running) { 0106 return; 0107 } 0108 0109 // try to figure out where to connect to 0110 QString serverAddress; 0111 0112 // env var has precedence 0113 const QByteArray serverAddressEnvVar = qgetenv("AKONADI_SERVER_ADDRESS"); 0114 if (!serverAddressEnvVar.isEmpty()) { 0115 const int pos = serverAddressEnvVar.indexOf(':'); 0116 const QByteArray protocol = serverAddressEnvVar.left(pos); 0117 QMap<QString, QString> options; 0118 const QStringList lst = QString::fromLatin1(serverAddressEnvVar.mid(pos + 1)).split(QLatin1Char(',')); 0119 for (const QString &entry : lst) { 0120 const QStringList pair = entry.split(QLatin1Char('=')); 0121 if (pair.size() != 2) { 0122 continue; 0123 } 0124 options.insert(pair.first(), pair.last()); 0125 } 0126 0127 if (protocol == "unix") { 0128 serverAddress = options.value(QStringLiteral("path")); 0129 } else if (protocol == "pipe") { 0130 serverAddress = options.value(QStringLiteral("name")); 0131 } 0132 } 0133 0134 // try config file next, fall back to defaults if that fails as well 0135 if (serverAddress.isEmpty()) { 0136 const QString connectionConfigFile = StandardDirs::connectionConfigFile(); 0137 const QFileInfo fileInfo(connectionConfigFile); 0138 if (!fileInfo.exists()) { 0139 qCWarning(AKONADICORE_LOG) << "Akonadi Client Session: connection config file '" 0140 "akonadi/akonadiconnectionrc' can not be found!"; 0141 } 0142 0143 QSettings connectionSettings(connectionConfigFile, QSettings::IniFormat); 0144 0145 QString connectionType; 0146 if (mConnectionType == CommandConnection) { 0147 connectionType = QStringLiteral("Data"); 0148 } else if (mConnectionType == NotificationConnection) { 0149 connectionType = QStringLiteral("Notifications"); 0150 } 0151 0152 connectionSettings.beginGroup(connectionType); 0153 const auto method = connectionSettings.value(QStringLiteral("Method"), QStringLiteral("UnixPath")).toString(); 0154 serverAddress = connectionSettings.value(method, defaultAddressForTypeAndMethod(mConnectionType, method)).toString(); 0155 } 0156 0157 mSocket.reset(new QLocalSocket(this)); 0158 connect(mSocket.data(), &QLocalSocket::errorOccurred, this, [this](QLocalSocket::LocalSocketError /*unused*/) { 0159 qCWarning(AKONADICORE_LOG) << mSocket->errorString() << mSocket->serverName(); 0160 Q_EMIT socketError(mSocket->errorString()); 0161 Q_EMIT socketDisconnected(); 0162 }); 0163 connect(mSocket.data(), &QLocalSocket::disconnected, this, &Connection::socketDisconnected); 0164 // note: we temporarily disconnect from readyRead-signal inside handleIncomingData() 0165 connect(mSocket.data(), &QLocalSocket::readyRead, this, &Connection::handleIncomingData); 0166 0167 // actually do connect 0168 qCDebug(AKONADICORE_LOG) << "connectToServer" << serverAddress; 0169 mSocket->connectToServer(serverAddress); 0170 if (!mSocket->waitForConnected()) { 0171 qCWarning(AKONADICORE_LOG) << "Failed to connect to server!"; 0172 Q_EMIT socketError(tr("Failed to connect to server!")); 0173 mSocket.reset(); 0174 return; 0175 } 0176 0177 QTimer::singleShot(0, this, &Connection::handleIncomingData); 0178 0179 Q_EMIT reconnected(); 0180 } 0181 0182 void Connection::forceReconnect() 0183 { 0184 const bool ok = QMetaObject::invokeMethod(this, &Connection::doForceReconnect, Qt::QueuedConnection); 0185 0186 Q_ASSERT(ok); 0187 Q_UNUSED(ok) 0188 } 0189 0190 void Connection::doForceReconnect() 0191 { 0192 Q_ASSERT(QThread::currentThread() == thread()); 0193 0194 if (mSocket) { 0195 disconnect(mSocket.get(), &QLocalSocket::disconnected, this, &Connection::socketDisconnected); 0196 mSocket->disconnectFromServer(); 0197 mSocket.reset(); 0198 } 0199 } 0200 0201 void Connection::closeConnection() 0202 { 0203 const bool ok = QMetaObject::invokeMethod(this, &Connection::doCloseConnection, Qt::QueuedConnection); 0204 Q_ASSERT(ok); 0205 Q_UNUSED(ok) 0206 } 0207 0208 void Connection::doCloseConnection() 0209 { 0210 Q_ASSERT(QThread::currentThread() == thread()); 0211 0212 if (mSocket) { 0213 mSocket->close(); 0214 mSocket.reset(); 0215 } 0216 } 0217 0218 QLocalSocket *Connection::socket() const 0219 { 0220 return mSocket.data(); 0221 } 0222 0223 void Connection::handleIncomingData() 0224 { 0225 Q_ASSERT(QThread::currentThread() == thread()); 0226 0227 if (!mSocket) { // not connected yet 0228 return; 0229 } 0230 0231 while (mSocket->bytesAvailable() >= int(sizeof(qint64))) { 0232 Protocol::DataStream stream(mSocket.data()); 0233 qint64 tag; 0234 stream >> tag; 0235 0236 // temporarily disconnect from readyRead-signal to avoid re-entering this function when we 0237 // call waitForData() deep inside Protocol::deserialize 0238 disconnect(mSocket.data(), &QLocalSocket::readyRead, this, &Connection::handleIncomingData); 0239 0240 Protocol::CommandPtr cmd; 0241 try { 0242 cmd = Protocol::deserialize(mSocket.data()); 0243 } catch (const Akonadi::ProtocolException &e) { 0244 qCWarning(AKONADICORE_LOG) << "Protocol exception:" << e.what(); 0245 // cmd's type will be Invalid by default, so fall-through 0246 } 0247 0248 // reconnect to the signal again 0249 connect(mSocket.data(), &QLocalSocket::readyRead, this, &Connection::handleIncomingData); 0250 0251 if (!cmd || (cmd->type() == Protocol::Command::Invalid)) { 0252 qCWarning(AKONADICORE_LOG) << "Invalid command, the world is going to end!"; 0253 mSocket->close(); 0254 reconnect(); 0255 return; 0256 } 0257 0258 if (mLogFile) { 0259 mLogFile->write("S: "); 0260 mLogFile->write(QDateTime::currentDateTime().toString(QStringLiteral("yyyy-MM-dd hh:mm:ss.zzz ")).toUtf8()); 0261 mLogFile->write(QByteArray::number(tag)); 0262 mLogFile->write(" "); 0263 mLogFile->write(Protocol::debugString(cmd).toUtf8()); 0264 mLogFile->write("\n\n"); 0265 mLogFile->flush(); 0266 } 0267 0268 if (cmd->type() == Protocol::Command::Hello) { 0269 Q_ASSERT(cmd->isResponse()); 0270 } 0271 0272 { 0273 CommandBufferLocker locker(mCommandBuffer); 0274 mCommandBuffer->enqueue(tag, cmd); 0275 } 0276 } 0277 } 0278 0279 void Connection::sendCommand(qint64 tag, const Protocol::CommandPtr &cmd) 0280 { 0281 const bool ok = QMetaObject::invokeMethod(this, "doSendCommand", Qt::QueuedConnection, Q_ARG(qint64, tag), Q_ARG(Akonadi::Protocol::CommandPtr, cmd)); 0282 Q_ASSERT(ok); 0283 Q_UNUSED(ok) 0284 } 0285 0286 void Connection::doSendCommand(qint64 tag, const Protocol::CommandPtr &cmd) 0287 { 0288 Q_ASSERT(QThread::currentThread() == thread()); 0289 0290 if (mLogFile) { 0291 mLogFile->write("C: "); 0292 mLogFile->write(QDateTime::currentDateTime().toString(QStringLiteral("yyyy-MM-dd hh:mm:ss.zzz ")).toUtf8()); 0293 mLogFile->write(QByteArray::number(tag)); 0294 mLogFile->write(" "); 0295 mLogFile->write(Protocol::debugString(cmd).toUtf8()); 0296 mLogFile->write("\n\n"); 0297 mLogFile->flush(); 0298 } 0299 0300 if (mSocket && mSocket->isOpen()) { 0301 Protocol::DataStream stream(mSocket.data()); 0302 try { 0303 stream << tag; 0304 Protocol::serialize(stream, cmd); 0305 stream.flush(); 0306 } catch (const Akonadi::ProtocolException &e) { 0307 qCWarning(AKONADICORE_LOG) << "Protocol Exception:" << QString::fromUtf8(e.what()); 0308 mSocket->close(); 0309 reconnect(); 0310 return; 0311 } 0312 if (!mSocket->waitForBytesWritten()) { 0313 qCWarning(AKONADICORE_LOG) << "Socket write timeout"; 0314 mSocket->close(); 0315 reconnect(); 0316 return; 0317 } 0318 } else { 0319 // TODO: Queue the commands and resend on reconnect? 0320 } 0321 } 0322 0323 #include "moc_connection_p.cpp"