File indexing completed on 2024-05-12 05:26:00

0001 /*
0002  * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org>
0003  *
0004  *   This program is free software; you can redistribute it and/or modify
0005  *   it under the terms of the GNU General Public License as published by
0006  *   the Free Software Foundation; either version 2 of the License, or
0007  *   (at your option) any later version.
0008  *
0009  *   This program is distributed in the hope that it will be useful,
0010  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
0011  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
0012  *   GNU General Public License for more details.
0013  *
0014  *   You should have received a copy of the GNU General Public License
0015  *   along with this program; if not, write to the
0016  *   Free Software Foundation, Inc.,
0017  *   51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
0018  */
0019 
0020 #include "listener.h"
0021 
0022 #include "common/commands.h"
0023 #include "common/resource.h"
0024 #include "common/log.h"
0025 #include "common/definitions.h"
0026 #include "common/resourcecontext.h"
0027 #include "common/adaptorfactoryregistry.h"
0028 #include "common/bufferutils.h"
0029 
0030 // commands
0031 #include "common/commandcompletion_generated.h"
0032 #include "common/handshake_generated.h"
0033 #include "common/revisionupdate_generated.h"
0034 #include "common/notification_generated.h"
0035 #include "common/revisionreplayed_generated.h"
0036 #include "common/secret_generated.h"
0037 
0038 #include <QLocalServer>
0039 #include <QLocalSocket>
0040 #include <QTimer>
0041 #include <chrono>
0042 
0043 Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType, QObject *parent)
0044     : QObject(parent),
0045       m_server(new QLocalServer(this)),
0046       m_resourceName(resourceType),
0047       m_resourceInstanceIdentifier(resourceInstanceIdentifier),
0048       m_clientBufferProcessesTimer(new QTimer),
0049       m_checkConnectionsTimer(new QTimer),
0050       m_messageId(0),
0051       m_exiting(false)
0052 {
0053     connect(m_server.get(), &QLocalServer::newConnection, this, &Listener::acceptConnection);
0054     SinkTrace() << "Trying to open " << m_resourceInstanceIdentifier;
0055 
0056     m_checkConnectionsTimer->setSingleShot(true);
0057     connect(m_checkConnectionsTimer.get(), &QTimer::timeout, [this]() {
0058         if (m_connections.isEmpty()) {
0059             SinkTrace() << QString("No connections, shutting down.");
0060             quit();
0061         }
0062     });
0063     //Give plenty of time during the first start.
0064     m_checkConnectionsTimer->start(std::chrono::milliseconds{60000});
0065 
0066     // TODO: experiment with different timeouts
0067     //      or even just drop down to invoking the method queued? => invoke queued unless we need throttling
0068     m_clientBufferProcessesTimer->setInterval(0);
0069     m_clientBufferProcessesTimer->setSingleShot(true);
0070     connect(m_clientBufferProcessesTimer.get(), &QTimer::timeout, this, &Listener::processClientBuffers);
0071 
0072     if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) {
0073         m_server->removeServer(m_resourceInstanceIdentifier);
0074         if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) {
0075             SinkWarning() << "Utter failure to start server";
0076             exit(-1);
0077         }
0078     }
0079 
0080     if (m_server->isListening()) {
0081         SinkTrace() << QString("Listening on %1").arg(m_server->serverName());
0082     }
0083 }
0084 
0085 Listener::~Listener()
0086 {
0087     SinkTrace() << "Shutting down " << m_resourceInstanceIdentifier;
0088     closeAllConnections();
0089 }
0090 
0091 void Listener::checkForUpgrade()
0092 {
0093     if (loadResource().checkForUpgrade()) {
0094         //Close the resource to ensure no transactions are open
0095         m_resource.reset(nullptr);
0096     }
0097 }
0098 
0099 void Listener::emergencyAbortAllConnections()
0100 {
0101     for (Client &client : m_connections) {
0102         if (client.socket) {
0103             SinkWarning() << "Sending panic";
0104             Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::ShutdownCommand, "PANIC", 5);
0105             client.socket->waitForBytesWritten();
0106             disconnect(client.socket, nullptr, this, nullptr);
0107             client.socket->abort();
0108             delete client.socket;
0109             client.socket = nullptr;
0110         }
0111     }
0112 
0113     m_connections.clear();
0114 }
0115 
0116 void Listener::closeAllConnections()
0117 {
0118     for (Client &client : m_connections) {
0119         if (client.socket) {
0120             disconnect(client.socket, nullptr, this, nullptr);
0121             client.socket->flush();
0122             client.socket->close();
0123             delete client.socket;
0124             client.socket = nullptr;
0125         }
0126     }
0127 
0128     m_connections.clear();
0129 }
0130 
0131 void Listener::acceptConnection()
0132 {
0133     SinkTrace() << "Accepting connection";
0134     QLocalSocket *socket = m_server->nextPendingConnection();
0135 
0136     if (!socket) {
0137         SinkWarning() << "Accepted connection but didn't get a socket for it";
0138         return;
0139     }
0140 
0141     m_connections << Client("Unknown Client", socket);
0142     connect(socket, &QIODevice::readyRead, this, &Listener::onDataAvailable);
0143     connect(socket, &QLocalSocket::disconnected, this, &Listener::clientDropped);
0144     m_checkConnectionsTimer->stop();
0145 
0146     if (socket->bytesAvailable()) {
0147         readFromSocket(socket);
0148     }
0149 }
0150 
0151 void Listener::clientDropped()
0152 {
0153     QLocalSocket *socket = qobject_cast<QLocalSocket *>(sender());
0154     if (!socket) {
0155         return;
0156     }
0157 
0158     bool dropped = false;
0159     QMutableVectorIterator<Client> it(m_connections);
0160     while (it.hasNext()) {
0161         const Client &client = it.next();
0162         if (client.socket == socket) {
0163             dropped = true;
0164             SinkLog() << QString("Dropped connection: %1").arg(client.name) << socket;
0165             it.remove();
0166             break;
0167         }
0168     }
0169     if (!dropped) {
0170         SinkWarning() << "Failed to find connection for disconnected socket: " << socket;
0171     }
0172 
0173     checkConnections();
0174 }
0175 
0176 void Listener::checkConnections()
0177 {
0178     // If this was the last client, disengage the lower limit for revision cleanup
0179     if (m_connections.isEmpty()) {
0180         loadResource().setLowerBoundRevision(std::numeric_limits<qint64>::max());
0181     }
0182     m_checkConnectionsTimer->start(std::chrono::milliseconds{1000});
0183 }
0184 
0185 void Listener::onDataAvailable()
0186 {
0187     QLocalSocket *socket = qobject_cast<QLocalSocket *>(sender());
0188     if (!socket || m_exiting) {
0189         return;
0190     }
0191     readFromSocket(socket);
0192 }
0193 
0194 void Listener::readFromSocket(QLocalSocket *socket)
0195 {
0196     SinkTrace() << "Reading from socket...";
0197     for (Client &client : m_connections) {
0198         if (client.socket == socket) {
0199             client.commandBuffer += socket->readAll();
0200             if (!m_clientBufferProcessesTimer->isActive()) {
0201                 m_clientBufferProcessesTimer->start();
0202             }
0203             break;
0204         }
0205     }
0206 }
0207 
0208 void Listener::processClientBuffers()
0209 {
0210     // TODO: we should not process all clients, but iterate async over them and process
0211     //      one command from each in turn to ensure all clients get fair handling of
0212     //      commands?
0213     bool again = false;
0214     for (Client &client : m_connections) {
0215         if (!client.socket || !client.socket->isValid() || client.commandBuffer.isEmpty()) {
0216             continue;
0217         }
0218 
0219         if (processClientBuffer(client)) {
0220             again = true;
0221         }
0222     }
0223 
0224     if (again) {
0225         m_clientBufferProcessesTimer->start();
0226     }
0227 }
0228 
0229 void Listener::processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function<void(bool)> &callback)
0230 {
0231     bool success = true;
0232     switch (commandId) {
0233         case Sink::Commands::HandshakeCommand: {
0234             flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
0235             if (Sink::Commands::VerifyHandshakeBuffer(verifier)) {
0236                 auto buffer = Sink::Commands::GetHandshake(commandBuffer.constData());
0237                 client.name = buffer->name()->c_str();
0238             } else {
0239                 SinkWarning() << "received invalid command";
0240             }
0241             break;
0242         }
0243         case Sink::Commands::SecretCommand: {
0244             flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
0245             if (Sink::Commands::VerifySecretBuffer(verifier)) {
0246                 auto buffer = Sink::Commands::GetSecret(commandBuffer.constData());
0247                 loadResource().setSecret(QString{buffer->secret()->c_str()});
0248             } else {
0249                 SinkWarning() << "received invalid command";
0250             }
0251             break;
0252         }
0253 
0254         case Sink::Commands::SynchronizeCommand:
0255         case Sink::Commands::InspectionCommand:
0256         case Sink::Commands::DeleteEntityCommand:
0257         case Sink::Commands::ModifyEntityCommand:
0258         case Sink::Commands::CreateEntityCommand:
0259         case Sink::Commands::FlushCommand:
0260         case Sink::Commands::AbortSynchronizationCommand:
0261             SinkTrace() << "Command id  " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name;
0262             loadResource().processCommand(commandId, commandBuffer);
0263             break;
0264         case Sink::Commands::ShutdownCommand:
0265             SinkLog() << QString("Received shutdown command from %1").arg(client.name);
0266             m_exiting = true;
0267             break;
0268         case Sink::Commands::PingCommand:
0269             SinkTrace() << QString("Received ping command from %1").arg(client.name);
0270             break;
0271         case Sink::Commands::RevisionReplayedCommand: {
0272             SinkTrace() << QString("Received revision replayed command from %1").arg(client.name);
0273             flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
0274             if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) {
0275                 auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData());
0276                 client.currentRevision = buffer->revision();
0277             } else {
0278                 SinkWarning() << "received invalid command";
0279             }
0280             loadResource().setLowerBoundRevision(lowerBoundRevision());
0281         }
0282         break;
0283         case Sink::Commands::RemoveFromDiskCommand: {
0284             SinkLog() << QString("Received a remove from disk command from %1").arg(client.name);
0285             //Close the resource to ensure no transactions are open
0286             m_resource.reset(nullptr);
0287             if (Sink::ResourceFactory *resourceFactory = Sink::ResourceFactory::load(m_resourceName)) {
0288                 resourceFactory->removeDataFromDisk(m_resourceInstanceIdentifier);
0289             }
0290             m_exiting = true;
0291         } break;
0292         case Sink::Commands::UpgradeCommand:
0293             //Because we synchronously run the update directly on resource start, we know that the upgrade is complete once this message completes.
0294             break;
0295         default:
0296             if (commandId > Sink::Commands::CustomCommand) {
0297                 SinkLog() << QString("Received custom command from %1: ").arg(client.name) << commandId;
0298                 loadResource().processCommand(commandId, commandBuffer);
0299             } else {
0300                 success = false;
0301                 SinkError() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId;
0302             }
0303             break;
0304     }
0305     callback(success);
0306 }
0307 
0308 qint64 Listener::lowerBoundRevision()
0309 {
0310     qint64 lowerBound = 0;
0311     for (const Client &c : m_connections) {
0312         if (c.currentRevision > 0) {
0313             if (lowerBound == 0) {
0314                 lowerBound = c.currentRevision;
0315             } else {
0316                 lowerBound = qMin(c.currentRevision, lowerBound);
0317             }
0318         }
0319     }
0320     return lowerBound;
0321 }
0322 
0323 void Listener::sendShutdownNotification()
0324 {
0325     // Broadcast shutdown notifications to open clients, so they don't try to restart the resource
0326     auto command = Sink::Commands::CreateNotification(m_fbb, Sink::Notification::Shutdown);
0327     Sink::Commands::FinishNotificationBuffer(m_fbb, command);
0328     for (Client &client : m_connections) {
0329         if (client.socket && client.socket->isOpen()) {
0330             Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::NotificationCommand, m_fbb);
0331         }
0332     }
0333 }
0334 
0335 void Listener::quit()
0336 {
0337     SinkTrace() << "Quitting " << m_resourceInstanceIdentifier;
0338     m_clientBufferProcessesTimer->stop();
0339     m_server->close();
0340     sendShutdownNotification();
0341     closeAllConnections();
0342     m_fbb.Clear();
0343 
0344     QTimer::singleShot(0, this, [this]() {
0345         // This will destroy this object
0346         emit noClients();
0347     });
0348 }
0349 
0350 bool Listener::processClientBuffer(Client &client)
0351 {
0352     static const int headerSize = Sink::Commands::headerSize();
0353     if (client.commandBuffer.size() < headerSize) {
0354         return false;
0355     }
0356 
0357     const uint messageId = *(const uint *)client.commandBuffer.constData();
0358     const int commandId = *(const int *)(client.commandBuffer.constData() + sizeof(uint));
0359     const uint size = *(const uint *)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint));
0360     SinkTrace() << "Received message. Id:" << messageId << " CommandId: " << commandId << " Size: " << size;
0361 
0362     // TODO: reject messages above a certain size?
0363 
0364     const bool commandComplete = size <= uint(client.commandBuffer.size() - headerSize);
0365     if (commandComplete) {
0366         client.commandBuffer.remove(0, headerSize);
0367 
0368         auto socket = QPointer<QLocalSocket>(client.socket);
0369         auto clientName = client.name;
0370         const QByteArray commandBuffer = client.commandBuffer.left(size);
0371         client.commandBuffer.remove(0, size);
0372         processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName](bool success) {
0373             SinkTrace() << QString("Completed command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Sink::Commands::name(commandId))).arg(clientName);
0374             if (socket) {
0375                 sendCommandCompleted(socket.data(), messageId, success);
0376             } else {
0377                 SinkLog() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName);
0378             }
0379         });
0380         if (m_exiting) {
0381             quit();
0382             return false;
0383         }
0384 
0385         return client.commandBuffer.size() >= headerSize;
0386     }
0387 
0388     return false;
0389 }
0390 
0391 void Listener::sendCommandCompleted(QLocalSocket *socket, uint messageId, bool success)
0392 {
0393     if (!socket || !socket->isValid()) {
0394         return;
0395     }
0396 
0397     auto command = Sink::Commands::CreateCommandCompletion(m_fbb, messageId, success);
0398     Sink::Commands::FinishCommandCompletionBuffer(m_fbb, command);
0399     Sink::Commands::write(socket, ++m_messageId, Sink::Commands::CommandCompletionCommand, m_fbb);
0400     if (m_exiting) {
0401         socket->waitForBytesWritten();
0402     }
0403     m_fbb.Clear();
0404 }
0405 
0406 void Listener::refreshRevision(qint64 revision)
0407 {
0408     updateClientsWithRevision(revision);
0409 }
0410 
0411 void Listener::updateClientsWithRevision(qint64 revision)
0412 {
0413     auto command = Sink::Commands::CreateRevisionUpdate(m_fbb, revision);
0414     Sink::Commands::FinishRevisionUpdateBuffer(m_fbb, command);
0415 
0416     for (const Client &client : m_connections) {
0417         if (!client.socket || !client.socket->isValid()) {
0418             continue;
0419         }
0420 
0421         SinkTrace() << "Sending revision update for " << client.name << revision;
0422         Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::RevisionUpdateCommand, m_fbb);
0423         client.socket->flush();
0424     }
0425     m_fbb.Clear();
0426 }
0427 
0428 void Listener::notify(const Sink::Notification &notification)
0429 {
0430     auto messageString = m_fbb.CreateString(notification.message.toUtf8().constData(), notification.message.toUtf8().size());
0431     auto idString = m_fbb.CreateString(notification.id.constData(), notification.id.size());
0432     auto entitiesType = m_fbb.CreateString(notification.entitiesType.constData(), notification.entitiesType.size());
0433     auto entities = Sink::BufferUtils::toVector(m_fbb, notification.entities);
0434     Sink::Commands::NotificationBuilder builder(m_fbb);
0435     builder.add_type(notification.type);
0436     builder.add_code(notification.code);
0437     builder.add_identifier(idString);
0438     builder.add_message(messageString);
0439     builder.add_entitiesType(entitiesType);
0440     builder.add_entities(entities);
0441     builder.add_progress(notification.progress);
0442     builder.add_total(notification.total);
0443     auto command = builder.Finish();
0444     Sink::Commands::FinishNotificationBuffer(m_fbb, command);
0445     for (Client &client : m_connections) {
0446         if (client.socket && client.socket->isOpen()) {
0447             Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::NotificationCommand, m_fbb);
0448         }
0449     }
0450     m_fbb.Clear();
0451 }
0452 
0453 Sink::Resource &Listener::loadResource()
0454 {
0455     if (!m_resource) {
0456         if (auto resourceFactory = Sink::ResourceFactory::load(m_resourceName)) {
0457             m_resource = std::unique_ptr<Sink::Resource>(resourceFactory->createResource(Sink::ResourceContext{m_resourceInstanceIdentifier, m_resourceName, Sink::AdaptorFactoryRegistry::instance().getFactories(m_resourceName)}));
0458             if (!m_resource) {
0459                 SinkError() << "Failed to instantiate the resource " << m_resourceName;
0460                 m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource);
0461             }
0462             SinkTrace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory);
0463             SinkTrace() << QString("\tResource: %1").arg((qlonglong)m_resource.get());
0464             connect(m_resource.get(), &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision);
0465             connect(m_resource.get(), &Sink::Resource::notify, this, &Listener::notify);
0466         } else {
0467             SinkError() << "Failed to load resource " << m_resourceName;
0468             m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource);
0469         }
0470     }
0471     Q_ASSERT(m_resource);
0472     return *m_resource;
0473 }
0474 
0475 #pragma clang diagnostic push
0476 #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast"
0477 #include "moc_listener.cpp"
0478 #pragma clang diagnostic pop