File indexing completed on 2024-05-12 05:26:00
0001 /* 0002 * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org> 0003 * 0004 * This program is free software; you can redistribute it and/or modify 0005 * it under the terms of the GNU General Public License as published by 0006 * the Free Software Foundation; either version 2 of the License, or 0007 * (at your option) any later version. 0008 * 0009 * This program is distributed in the hope that it will be useful, 0010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 0011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 0012 * GNU General Public License for more details. 0013 * 0014 * You should have received a copy of the GNU General Public License 0015 * along with this program; if not, write to the 0016 * Free Software Foundation, Inc., 0017 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 0018 */ 0019 0020 #include "listener.h" 0021 0022 #include "common/commands.h" 0023 #include "common/resource.h" 0024 #include "common/log.h" 0025 #include "common/definitions.h" 0026 #include "common/resourcecontext.h" 0027 #include "common/adaptorfactoryregistry.h" 0028 #include "common/bufferutils.h" 0029 0030 // commands 0031 #include "common/commandcompletion_generated.h" 0032 #include "common/handshake_generated.h" 0033 #include "common/revisionupdate_generated.h" 0034 #include "common/notification_generated.h" 0035 #include "common/revisionreplayed_generated.h" 0036 #include "common/secret_generated.h" 0037 0038 #include <QLocalServer> 0039 #include <QLocalSocket> 0040 #include <QTimer> 0041 #include <chrono> 0042 0043 Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType, QObject *parent) 0044 : QObject(parent), 0045 m_server(new QLocalServer(this)), 0046 m_resourceName(resourceType), 0047 m_resourceInstanceIdentifier(resourceInstanceIdentifier), 0048 m_clientBufferProcessesTimer(new QTimer), 0049 m_checkConnectionsTimer(new QTimer), 0050 m_messageId(0), 0051 m_exiting(false) 0052 { 0053 connect(m_server.get(), &QLocalServer::newConnection, this, &Listener::acceptConnection); 0054 SinkTrace() << "Trying to open " << m_resourceInstanceIdentifier; 0055 0056 m_checkConnectionsTimer->setSingleShot(true); 0057 connect(m_checkConnectionsTimer.get(), &QTimer::timeout, [this]() { 0058 if (m_connections.isEmpty()) { 0059 SinkTrace() << QString("No connections, shutting down."); 0060 quit(); 0061 } 0062 }); 0063 //Give plenty of time during the first start. 0064 m_checkConnectionsTimer->start(std::chrono::milliseconds{60000}); 0065 0066 // TODO: experiment with different timeouts 0067 // or even just drop down to invoking the method queued? => invoke queued unless we need throttling 0068 m_clientBufferProcessesTimer->setInterval(0); 0069 m_clientBufferProcessesTimer->setSingleShot(true); 0070 connect(m_clientBufferProcessesTimer.get(), &QTimer::timeout, this, &Listener::processClientBuffers); 0071 0072 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { 0073 m_server->removeServer(m_resourceInstanceIdentifier); 0074 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { 0075 SinkWarning() << "Utter failure to start server"; 0076 exit(-1); 0077 } 0078 } 0079 0080 if (m_server->isListening()) { 0081 SinkTrace() << QString("Listening on %1").arg(m_server->serverName()); 0082 } 0083 } 0084 0085 Listener::~Listener() 0086 { 0087 SinkTrace() << "Shutting down " << m_resourceInstanceIdentifier; 0088 closeAllConnections(); 0089 } 0090 0091 void Listener::checkForUpgrade() 0092 { 0093 if (loadResource().checkForUpgrade()) { 0094 //Close the resource to ensure no transactions are open 0095 m_resource.reset(nullptr); 0096 } 0097 } 0098 0099 void Listener::emergencyAbortAllConnections() 0100 { 0101 for (Client &client : m_connections) { 0102 if (client.socket) { 0103 SinkWarning() << "Sending panic"; 0104 Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::ShutdownCommand, "PANIC", 5); 0105 client.socket->waitForBytesWritten(); 0106 disconnect(client.socket, nullptr, this, nullptr); 0107 client.socket->abort(); 0108 delete client.socket; 0109 client.socket = nullptr; 0110 } 0111 } 0112 0113 m_connections.clear(); 0114 } 0115 0116 void Listener::closeAllConnections() 0117 { 0118 for (Client &client : m_connections) { 0119 if (client.socket) { 0120 disconnect(client.socket, nullptr, this, nullptr); 0121 client.socket->flush(); 0122 client.socket->close(); 0123 delete client.socket; 0124 client.socket = nullptr; 0125 } 0126 } 0127 0128 m_connections.clear(); 0129 } 0130 0131 void Listener::acceptConnection() 0132 { 0133 SinkTrace() << "Accepting connection"; 0134 QLocalSocket *socket = m_server->nextPendingConnection(); 0135 0136 if (!socket) { 0137 SinkWarning() << "Accepted connection but didn't get a socket for it"; 0138 return; 0139 } 0140 0141 m_connections << Client("Unknown Client", socket); 0142 connect(socket, &QIODevice::readyRead, this, &Listener::onDataAvailable); 0143 connect(socket, &QLocalSocket::disconnected, this, &Listener::clientDropped); 0144 m_checkConnectionsTimer->stop(); 0145 0146 if (socket->bytesAvailable()) { 0147 readFromSocket(socket); 0148 } 0149 } 0150 0151 void Listener::clientDropped() 0152 { 0153 QLocalSocket *socket = qobject_cast<QLocalSocket *>(sender()); 0154 if (!socket) { 0155 return; 0156 } 0157 0158 bool dropped = false; 0159 QMutableVectorIterator<Client> it(m_connections); 0160 while (it.hasNext()) { 0161 const Client &client = it.next(); 0162 if (client.socket == socket) { 0163 dropped = true; 0164 SinkLog() << QString("Dropped connection: %1").arg(client.name) << socket; 0165 it.remove(); 0166 break; 0167 } 0168 } 0169 if (!dropped) { 0170 SinkWarning() << "Failed to find connection for disconnected socket: " << socket; 0171 } 0172 0173 checkConnections(); 0174 } 0175 0176 void Listener::checkConnections() 0177 { 0178 // If this was the last client, disengage the lower limit for revision cleanup 0179 if (m_connections.isEmpty()) { 0180 loadResource().setLowerBoundRevision(std::numeric_limits<qint64>::max()); 0181 } 0182 m_checkConnectionsTimer->start(std::chrono::milliseconds{1000}); 0183 } 0184 0185 void Listener::onDataAvailable() 0186 { 0187 QLocalSocket *socket = qobject_cast<QLocalSocket *>(sender()); 0188 if (!socket || m_exiting) { 0189 return; 0190 } 0191 readFromSocket(socket); 0192 } 0193 0194 void Listener::readFromSocket(QLocalSocket *socket) 0195 { 0196 SinkTrace() << "Reading from socket..."; 0197 for (Client &client : m_connections) { 0198 if (client.socket == socket) { 0199 client.commandBuffer += socket->readAll(); 0200 if (!m_clientBufferProcessesTimer->isActive()) { 0201 m_clientBufferProcessesTimer->start(); 0202 } 0203 break; 0204 } 0205 } 0206 } 0207 0208 void Listener::processClientBuffers() 0209 { 0210 // TODO: we should not process all clients, but iterate async over them and process 0211 // one command from each in turn to ensure all clients get fair handling of 0212 // commands? 0213 bool again = false; 0214 for (Client &client : m_connections) { 0215 if (!client.socket || !client.socket->isValid() || client.commandBuffer.isEmpty()) { 0216 continue; 0217 } 0218 0219 if (processClientBuffer(client)) { 0220 again = true; 0221 } 0222 } 0223 0224 if (again) { 0225 m_clientBufferProcessesTimer->start(); 0226 } 0227 } 0228 0229 void Listener::processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function<void(bool)> &callback) 0230 { 0231 bool success = true; 0232 switch (commandId) { 0233 case Sink::Commands::HandshakeCommand: { 0234 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); 0235 if (Sink::Commands::VerifyHandshakeBuffer(verifier)) { 0236 auto buffer = Sink::Commands::GetHandshake(commandBuffer.constData()); 0237 client.name = buffer->name()->c_str(); 0238 } else { 0239 SinkWarning() << "received invalid command"; 0240 } 0241 break; 0242 } 0243 case Sink::Commands::SecretCommand: { 0244 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); 0245 if (Sink::Commands::VerifySecretBuffer(verifier)) { 0246 auto buffer = Sink::Commands::GetSecret(commandBuffer.constData()); 0247 loadResource().setSecret(QString{buffer->secret()->c_str()}); 0248 } else { 0249 SinkWarning() << "received invalid command"; 0250 } 0251 break; 0252 } 0253 0254 case Sink::Commands::SynchronizeCommand: 0255 case Sink::Commands::InspectionCommand: 0256 case Sink::Commands::DeleteEntityCommand: 0257 case Sink::Commands::ModifyEntityCommand: 0258 case Sink::Commands::CreateEntityCommand: 0259 case Sink::Commands::FlushCommand: 0260 case Sink::Commands::AbortSynchronizationCommand: 0261 SinkTrace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name; 0262 loadResource().processCommand(commandId, commandBuffer); 0263 break; 0264 case Sink::Commands::ShutdownCommand: 0265 SinkLog() << QString("Received shutdown command from %1").arg(client.name); 0266 m_exiting = true; 0267 break; 0268 case Sink::Commands::PingCommand: 0269 SinkTrace() << QString("Received ping command from %1").arg(client.name); 0270 break; 0271 case Sink::Commands::RevisionReplayedCommand: { 0272 SinkTrace() << QString("Received revision replayed command from %1").arg(client.name); 0273 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); 0274 if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) { 0275 auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); 0276 client.currentRevision = buffer->revision(); 0277 } else { 0278 SinkWarning() << "received invalid command"; 0279 } 0280 loadResource().setLowerBoundRevision(lowerBoundRevision()); 0281 } 0282 break; 0283 case Sink::Commands::RemoveFromDiskCommand: { 0284 SinkLog() << QString("Received a remove from disk command from %1").arg(client.name); 0285 //Close the resource to ensure no transactions are open 0286 m_resource.reset(nullptr); 0287 if (Sink::ResourceFactory *resourceFactory = Sink::ResourceFactory::load(m_resourceName)) { 0288 resourceFactory->removeDataFromDisk(m_resourceInstanceIdentifier); 0289 } 0290 m_exiting = true; 0291 } break; 0292 case Sink::Commands::UpgradeCommand: 0293 //Because we synchronously run the update directly on resource start, we know that the upgrade is complete once this message completes. 0294 break; 0295 default: 0296 if (commandId > Sink::Commands::CustomCommand) { 0297 SinkLog() << QString("Received custom command from %1: ").arg(client.name) << commandId; 0298 loadResource().processCommand(commandId, commandBuffer); 0299 } else { 0300 success = false; 0301 SinkError() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; 0302 } 0303 break; 0304 } 0305 callback(success); 0306 } 0307 0308 qint64 Listener::lowerBoundRevision() 0309 { 0310 qint64 lowerBound = 0; 0311 for (const Client &c : m_connections) { 0312 if (c.currentRevision > 0) { 0313 if (lowerBound == 0) { 0314 lowerBound = c.currentRevision; 0315 } else { 0316 lowerBound = qMin(c.currentRevision, lowerBound); 0317 } 0318 } 0319 } 0320 return lowerBound; 0321 } 0322 0323 void Listener::sendShutdownNotification() 0324 { 0325 // Broadcast shutdown notifications to open clients, so they don't try to restart the resource 0326 auto command = Sink::Commands::CreateNotification(m_fbb, Sink::Notification::Shutdown); 0327 Sink::Commands::FinishNotificationBuffer(m_fbb, command); 0328 for (Client &client : m_connections) { 0329 if (client.socket && client.socket->isOpen()) { 0330 Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::NotificationCommand, m_fbb); 0331 } 0332 } 0333 } 0334 0335 void Listener::quit() 0336 { 0337 SinkTrace() << "Quitting " << m_resourceInstanceIdentifier; 0338 m_clientBufferProcessesTimer->stop(); 0339 m_server->close(); 0340 sendShutdownNotification(); 0341 closeAllConnections(); 0342 m_fbb.Clear(); 0343 0344 QTimer::singleShot(0, this, [this]() { 0345 // This will destroy this object 0346 emit noClients(); 0347 }); 0348 } 0349 0350 bool Listener::processClientBuffer(Client &client) 0351 { 0352 static const int headerSize = Sink::Commands::headerSize(); 0353 if (client.commandBuffer.size() < headerSize) { 0354 return false; 0355 } 0356 0357 const uint messageId = *(const uint *)client.commandBuffer.constData(); 0358 const int commandId = *(const int *)(client.commandBuffer.constData() + sizeof(uint)); 0359 const uint size = *(const uint *)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); 0360 SinkTrace() << "Received message. Id:" << messageId << " CommandId: " << commandId << " Size: " << size; 0361 0362 // TODO: reject messages above a certain size? 0363 0364 const bool commandComplete = size <= uint(client.commandBuffer.size() - headerSize); 0365 if (commandComplete) { 0366 client.commandBuffer.remove(0, headerSize); 0367 0368 auto socket = QPointer<QLocalSocket>(client.socket); 0369 auto clientName = client.name; 0370 const QByteArray commandBuffer = client.commandBuffer.left(size); 0371 client.commandBuffer.remove(0, size); 0372 processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName](bool success) { 0373 SinkTrace() << QString("Completed command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Sink::Commands::name(commandId))).arg(clientName); 0374 if (socket) { 0375 sendCommandCompleted(socket.data(), messageId, success); 0376 } else { 0377 SinkLog() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName); 0378 } 0379 }); 0380 if (m_exiting) { 0381 quit(); 0382 return false; 0383 } 0384 0385 return client.commandBuffer.size() >= headerSize; 0386 } 0387 0388 return false; 0389 } 0390 0391 void Listener::sendCommandCompleted(QLocalSocket *socket, uint messageId, bool success) 0392 { 0393 if (!socket || !socket->isValid()) { 0394 return; 0395 } 0396 0397 auto command = Sink::Commands::CreateCommandCompletion(m_fbb, messageId, success); 0398 Sink::Commands::FinishCommandCompletionBuffer(m_fbb, command); 0399 Sink::Commands::write(socket, ++m_messageId, Sink::Commands::CommandCompletionCommand, m_fbb); 0400 if (m_exiting) { 0401 socket->waitForBytesWritten(); 0402 } 0403 m_fbb.Clear(); 0404 } 0405 0406 void Listener::refreshRevision(qint64 revision) 0407 { 0408 updateClientsWithRevision(revision); 0409 } 0410 0411 void Listener::updateClientsWithRevision(qint64 revision) 0412 { 0413 auto command = Sink::Commands::CreateRevisionUpdate(m_fbb, revision); 0414 Sink::Commands::FinishRevisionUpdateBuffer(m_fbb, command); 0415 0416 for (const Client &client : m_connections) { 0417 if (!client.socket || !client.socket->isValid()) { 0418 continue; 0419 } 0420 0421 SinkTrace() << "Sending revision update for " << client.name << revision; 0422 Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::RevisionUpdateCommand, m_fbb); 0423 client.socket->flush(); 0424 } 0425 m_fbb.Clear(); 0426 } 0427 0428 void Listener::notify(const Sink::Notification ¬ification) 0429 { 0430 auto messageString = m_fbb.CreateString(notification.message.toUtf8().constData(), notification.message.toUtf8().size()); 0431 auto idString = m_fbb.CreateString(notification.id.constData(), notification.id.size()); 0432 auto entitiesType = m_fbb.CreateString(notification.entitiesType.constData(), notification.entitiesType.size()); 0433 auto entities = Sink::BufferUtils::toVector(m_fbb, notification.entities); 0434 Sink::Commands::NotificationBuilder builder(m_fbb); 0435 builder.add_type(notification.type); 0436 builder.add_code(notification.code); 0437 builder.add_identifier(idString); 0438 builder.add_message(messageString); 0439 builder.add_entitiesType(entitiesType); 0440 builder.add_entities(entities); 0441 builder.add_progress(notification.progress); 0442 builder.add_total(notification.total); 0443 auto command = builder.Finish(); 0444 Sink::Commands::FinishNotificationBuffer(m_fbb, command); 0445 for (Client &client : m_connections) { 0446 if (client.socket && client.socket->isOpen()) { 0447 Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::NotificationCommand, m_fbb); 0448 } 0449 } 0450 m_fbb.Clear(); 0451 } 0452 0453 Sink::Resource &Listener::loadResource() 0454 { 0455 if (!m_resource) { 0456 if (auto resourceFactory = Sink::ResourceFactory::load(m_resourceName)) { 0457 m_resource = std::unique_ptr<Sink::Resource>(resourceFactory->createResource(Sink::ResourceContext{m_resourceInstanceIdentifier, m_resourceName, Sink::AdaptorFactoryRegistry::instance().getFactories(m_resourceName)})); 0458 if (!m_resource) { 0459 SinkError() << "Failed to instantiate the resource " << m_resourceName; 0460 m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource); 0461 } 0462 SinkTrace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); 0463 SinkTrace() << QString("\tResource: %1").arg((qlonglong)m_resource.get()); 0464 connect(m_resource.get(), &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision); 0465 connect(m_resource.get(), &Sink::Resource::notify, this, &Listener::notify); 0466 } else { 0467 SinkError() << "Failed to load resource " << m_resourceName; 0468 m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource); 0469 } 0470 } 0471 Q_ASSERT(m_resource); 0472 return *m_resource; 0473 } 0474 0475 #pragma clang diagnostic push 0476 #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" 0477 #include "moc_listener.cpp" 0478 #pragma clang diagnostic pop