File indexing completed on 2024-05-12 05:26:03

0001 /*
0002  * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org>
0003  *
0004  * This library is free software; you can redistribute it and/or
0005  * modify it under the terms of the GNU Lesser General Public
0006  * License as published by the Free Software Foundation; either
0007  * version 2.1 of the License, or (at your option) version 3, or any
0008  * later version accepted by the membership of KDE e.V. (or its
0009  * successor approved by the membership of KDE e.V.), which shall
0010  * act as a proxy defined in Section 6 of version 3 of the license.
0011  *
0012  * This library is distributed in the hope that it will be useful,
0013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0015  * Lesser General Public License for more details.
0016  *
0017  * You should have received a copy of the GNU Lesser General Public
0018  * License along with this library.  If not, see <http://www.gnu.org/licenses/>.
0019  */
0020 
0021 #include "resourceaccess.h"
0022 
0023 #include "common/commands.h"
0024 #include "common/commandcompletion_generated.h"
0025 #include "common/handshake_generated.h"
0026 #include "common/revisionupdate_generated.h"
0027 #include "common/synchronize_generated.h"
0028 #include "common/notification_generated.h"
0029 #include "common/createentity_generated.h"
0030 #include "common/modifyentity_generated.h"
0031 #include "common/deleteentity_generated.h"
0032 #include "common/revisionreplayed_generated.h"
0033 #include "common/inspection_generated.h"
0034 #include "common/flush_generated.h"
0035 #include "common/secret_generated.h"
0036 #include "common/entitybuffer.h"
0037 #include "common/bufferutils.h"
0038 #include "common/test.h"
0039 #include "common/secretstore.h"
0040 #include "log.h"
0041 
0042 #include <QCoreApplication>
0043 #include <QDebug>
0044 #include <QDir>
0045 #include <QProcess>
0046 #include <QDataStream>
0047 #include <QBuffer>
0048 #include <QTime>
0049 #include <QStandardPaths>
0050 
0051 static void queuedInvoke(const std::function<void()> &f, QObject *context = nullptr)
0052 {
0053     auto timer = QSharedPointer<QTimer>::create();
0054     timer->setSingleShot(true);
0055     QObject::connect(timer.data(), &QTimer::timeout, context, [f, timer]() { f(); });
0056     timer->start(0);
0057 }
0058 
0059 namespace Sink {
0060 
0061 struct QueuedCommand
0062 {
0063 public:
0064     QueuedCommand(int commandId, const std::function<void(int, const QString &)> &callback) : commandId(commandId), callback(callback)
0065     {
0066     }
0067 
0068     QueuedCommand(int commandId, const QByteArray &b, const std::function<void(int, const QString &)> &callback) : commandId(commandId), buffer(b), callback(callback)
0069     {
0070     }
0071 
0072 private:
0073     QueuedCommand(const QueuedCommand &other);
0074     QueuedCommand &operator=(const QueuedCommand &rhs);
0075 
0076 public:
0077     const int commandId;
0078     QByteArray buffer;
0079     std::function<void(int, const QString &)> callback;
0080 };
0081 
0082 class ResourceAccess::Private
0083 {
0084 public:
0085     Private(const QByteArray &name, const QByteArray &instanceIdentifier, ResourceAccess *ra);
0086     ~Private();
0087     KAsync::Job<void> tryToConnect();
0088     KAsync::Job<void> initializeSocket();
0089     void abortPendingOperations();
0090     void callCallbacks();
0091 
0092     QByteArray resourceName;
0093     QByteArray resourceInstanceIdentifier;
0094     QSharedPointer<QLocalSocket> socket;
0095     QByteArray partialMessageBuffer;
0096     QVector<QSharedPointer<QueuedCommand>> commandQueue;
0097     QMap<uint, QSharedPointer<QueuedCommand>> pendingCommands;
0098     QMultiMap<uint, std::function<void(int error, const QString &errorMessage)>> resultHandler;
0099     QHash<uint, bool> completeCommands;
0100     uint messageId;
0101     bool openingSocket;
0102     Sink::Log::Context logCtx;
0103     SINK_DEBUG_COMPONENT(resourceInstanceIdentifier)
0104 };
0105 
0106 
0107 ResourceAccess::Private::Private(const QByteArray &name, const QByteArray &instanceIdentifier, ResourceAccess *q)
0108     : resourceName(name), resourceInstanceIdentifier(instanceIdentifier), messageId(0), openingSocket(false), logCtx{instanceIdentifier + ".resourceaccess"}
0109 {
0110 }
0111 ResourceAccess::Private::~Private()
0112 {
0113 }
0114 
0115 void ResourceAccess::Private::abortPendingOperations()
0116 {
0117     callCallbacks();
0118     if (!resultHandler.isEmpty()) {
0119         SinkWarning() << "Aborting pending operations " << resultHandler.keys();
0120     }
0121     auto handlers = resultHandler.values();
0122     resultHandler.clear();
0123     for (auto handler : handlers) {
0124         handler(1, "The resource closed unexpectedly");
0125     }
0126     for (auto queuedCommand : commandQueue) {
0127         queuedCommand->callback(1, "The resource closed unexpectedly");
0128     }
0129     commandQueue.clear();
0130 }
0131 
0132 void ResourceAccess::Private::callCallbacks()
0133 {
0134     const auto commandIds = completeCommands.keys();
0135     for (auto id : commandIds) {
0136         const bool success = completeCommands.take(id);
0137         // We remove the callbacks first because the handler can kill resourceaccess directly
0138         const auto callbacks = resultHandler.values(id);
0139         resultHandler.remove(id);
0140         for (auto handler : callbacks) {
0141             if (success) {
0142                 handler(0, QString());
0143             } else {
0144                 handler(1, "Command failed.");
0145             }
0146         }
0147     }
0148 }
0149 
0150 // Connects to server and returns connected socket on success
0151 KAsync::Job<QSharedPointer<QLocalSocket>> ResourceAccess::connectToServer(const QByteArray &identifier)
0152 {
0153     auto ctx = Log::Context{identifier + ".resourceaccess"};
0154 
0155     auto s = QSharedPointer<QLocalSocket>{new QLocalSocket, &QObject::deleteLater};
0156     return KAsync::start<QSharedPointer<QLocalSocket>>([identifier, s, ctx](KAsync::Future<QSharedPointer<QLocalSocket>> &future) {
0157         SinkTraceCtx(ctx) << "Connecting to server " << identifier;
0158         auto context = new QObject;
0159         QObject::connect(s.data(), &QLocalSocket::connected, context, [&future, &s, context, identifier, ctx]() {
0160             SinkTraceCtx(ctx) << "Connected to server " << identifier;
0161             Q_ASSERT(s);
0162             delete context;
0163             future.setValue(s);
0164             future.setFinished();
0165         });
0166         QObject::connect(s.data(), static_cast<void (QLocalSocket::*)(QLocalSocket::LocalSocketError)>(&QLocalSocket::error), context, [&future, &s, context, identifier, ctx](QLocalSocket::LocalSocketError localSocketError) {
0167             SinkTraceCtx(ctx) << "Failed to connect to server " << localSocketError << identifier;
0168             const auto errorString = s->errorString();
0169             const auto name = s->fullServerName();
0170             delete context;
0171             //We don't set the localSocketError as errorcode, because ConnectionRefused is 0 (no error)
0172             future.setError({1, QString("Failed to connect to socket %1: %2 %3").arg(name).arg(localSocketError).arg(errorString)});
0173         });
0174         s->connectToServer(identifier);
0175     });
0176 }
0177 
0178 KAsync::Job<void> ResourceAccess::Private::tryToConnect()
0179 {
0180     // We may have a socket from the last connection leftover
0181     socket.reset();
0182     auto counter = QSharedPointer<int>::create(0);
0183     return KAsync::doWhile(
0184         [this, counter]() {
0185             SinkTrace() << "Try to connect " << resourceInstanceIdentifier;
0186             return connectToServer(resourceInstanceIdentifier)
0187                 .then<KAsync::ControlFlowFlag, QSharedPointer<QLocalSocket>>(
0188                     [this, counter](const KAsync::Error &error, const QSharedPointer<QLocalSocket> &s) {
0189                         if (error) {
0190                             static int waitTime = 10;
0191                             static int timeout = 20000;
0192                             static int maxRetries = timeout / waitTime;
0193                             if (*counter >= maxRetries) {
0194                                 SinkTrace() << "Giving up after " << *counter << "tries";
0195                                 return KAsync::error<KAsync::ControlFlowFlag>(error);
0196                             } else {
0197                                 *counter = *counter + 1;
0198                                 return KAsync::wait(waitTime).then(KAsync::value(KAsync::Continue));
0199                             }
0200                         } else {
0201                             Q_ASSERT(s);
0202                             socket = s;
0203                             return KAsync::value(KAsync::Break);
0204                         }
0205                     });
0206         });
0207 }
0208 
0209 KAsync::Job<void> ResourceAccess::Private::initializeSocket()
0210 {
0211     return KAsync::start<void>([this] {
0212         SinkTrace() << "Trying to connect";
0213         return connectToServer(resourceInstanceIdentifier)
0214             .then<void, QSharedPointer<QLocalSocket>>(
0215                 [this](const KAsync::Error &error, const QSharedPointer<QLocalSocket> &s) {
0216                     if (error) {
0217                         // We failed to connect, so let's start the resource
0218                         QStringList args;
0219                         if (Sink::Test::testModeEnabled()) {
0220                             args << "--test";
0221                         }
0222                         if (resourceName.isEmpty()) {
0223                             SinkWarning() << "No resource type given";
0224                             return KAsync::error();
0225                         }
0226                         args << resourceInstanceIdentifier << resourceName;
0227 
0228                         //Prefer a binary next to this binary, otherwise fall-back to PATH. Necessary for MacOS bundles because the bundle is not in the PATH.
0229                         auto executable = QStandardPaths::findExecutable("sink_synchronizer", {QCoreApplication::applicationDirPath()});
0230                         if (executable.isEmpty()) {
0231                             executable = QStandardPaths::findExecutable("sink_synchronizer");
0232                         }
0233                         if (executable.isEmpty()) {
0234                             SinkError() << "Failed to find the sink_synchronizer binary in the paths: " << QCoreApplication::applicationDirPath();
0235                             return KAsync::error("Failed to find the sink_synchronizer binary.");
0236                         }
0237                         qint64 pid = 0;
0238                         SinkLog() << "Starting resource " << executable << args.join(" ") << "Home path: " << QDir::homePath();
0239                         if (QProcess::startDetached(executable, args, QDir::homePath(), &pid)) {
0240                             SinkTrace() << "Started resource " << resourceInstanceIdentifier << pid;
0241                             return tryToConnect()
0242                                 .onError([this, args](const KAsync::Error &error) {
0243                                     SinkError() << "Failed to connect to started resource: sink_synchronizer " << args;
0244                                 });
0245                         } else {
0246                             SinkError() << "Failed to start resource " << resourceInstanceIdentifier;
0247                             return KAsync::error("Failed to start resource.");
0248                         }
0249                     } else {
0250                         SinkTrace() << "Connected to resource, without having to start it.";
0251                         Q_ASSERT(s);
0252                         socket = s;
0253                         return KAsync::null();
0254                     }
0255                 });
0256     });
0257 }
0258 
0259 ResourceAccess::ResourceAccess(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType)
0260     : ResourceAccessInterface(), d(new Private(resourceType, resourceInstanceIdentifier, this))
0261 {
0262     mResourceStatus = Sink::ApplicationDomain::NoStatus;
0263     SinkTraceCtx(d->logCtx) << "Starting access";
0264 
0265     QObject::connect(&SecretStore::instance(), &SecretStore::secretAvailable, this, [this] (const QByteArray &resourceId) {
0266         if (resourceId == d->resourceInstanceIdentifier) {
0267             //No need to start the resource just to send the secret, we'll do that on connection anyways.
0268             if (isReady()) {
0269                 sendSecret(SecretStore::instance().resourceSecret(d->resourceInstanceIdentifier)).exec();
0270             }
0271         }
0272     });
0273 }
0274 
0275 ResourceAccess::~ResourceAccess()
0276 {
0277     SinkLog() << "Closing access";
0278     if (!d->resultHandler.isEmpty()) {
0279         SinkWarning() << "Left jobs running while shutting down ResourceAccess: " << d->resultHandler.keys();
0280     }
0281     delete d;
0282 }
0283 
0284 QByteArray ResourceAccess::resourceName() const
0285 {
0286     return d->resourceName;
0287 }
0288 
0289 bool ResourceAccess::isReady() const
0290 {
0291     return (d->socket && d->socket->isValid());
0292 }
0293 
0294 void ResourceAccess::registerCallback(uint messageId, const std::function<void(int error, const QString &errorMessage)> &callback)
0295 {
0296     d->resultHandler.insert(messageId, callback);
0297 }
0298 
0299 void ResourceAccess::enqueueCommand(const QSharedPointer<QueuedCommand> &command)
0300 {
0301     d->commandQueue << command;
0302     if (isReady()) {
0303         processCommandQueue();
0304     } else {
0305         open();
0306     }
0307 }
0308 
0309 KAsync::Job<void> ResourceAccess::sendCommand(int commandId)
0310 {
0311     return KAsync::start<void>([this, commandId](KAsync::Future<void> &f) {
0312         auto continuation = [&f](int error, const QString &errorMessage) {
0313             if (error) {
0314                 f.setError(error, errorMessage);
0315             }
0316             f.setFinished();
0317         };
0318         enqueueCommand(QSharedPointer<QueuedCommand>::create(commandId, continuation));
0319     });
0320 }
0321 
0322 KAsync::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb)
0323 {
0324     // The flatbuffer is transient, but we want to store it until the job is executed
0325     QByteArray buffer(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize());
0326     return KAsync::start<void>([commandId, buffer, this](KAsync::Future<void> &f) {
0327         auto callback = [&f](int error, const QString &errorMessage) {
0328             if (error) {
0329                 f.setError(error, errorMessage);
0330             } else {
0331                 f.setFinished();
0332             }
0333         };
0334         enqueueCommand(QSharedPointer<QueuedCommand>::create(commandId, buffer, callback));
0335     });
0336 }
0337 
0338 KAsync::Job<void> ResourceAccess::synchronizeResource(const Sink::QueryBase &query)
0339 {
0340     flatbuffers::FlatBufferBuilder fbb;
0341     QByteArray queryString;
0342     {
0343         QDataStream stream(&queryString, QIODevice::WriteOnly);
0344         stream << query;
0345     }
0346     auto q = fbb.CreateString(queryString.toStdString());
0347     auto builder = Sink::Commands::SynchronizeBuilder(fbb);
0348     builder.add_query(q);
0349     Sink::Commands::FinishSynchronizeBuffer(fbb, builder.Finish());
0350 
0351     return sendCommand(Commands::SynchronizeCommand, fbb);
0352 }
0353 
0354 KAsync::Job<void> ResourceAccess::sendCreateCommand(const QByteArray &uid, const QByteArray &resourceBufferType, const QByteArray &buffer)
0355 {
0356     flatbuffers::FlatBufferBuilder fbb;
0357     auto entityId = fbb.CreateString(uid.constData());
0358     // This is the resource buffer type and not the domain type
0359     auto type = fbb.CreateString(resourceBufferType.constData());
0360     auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size());
0361     auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta);
0362     Sink::Commands::FinishCreateEntityBuffer(fbb, location);
0363     return sendCommand(Sink::Commands::CreateEntityCommand, fbb);
0364 }
0365 
0366 KAsync::Job<void>
0367 ResourceAccess::sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer, const QByteArrayList &changedProperties, const QByteArray &newResource, bool remove)
0368 {
0369     flatbuffers::FlatBufferBuilder fbb;
0370     auto entityId = fbb.CreateString(uid.constData());
0371     auto type = fbb.CreateString(resourceBufferType.constData());
0372     auto modifiedProperties = BufferUtils::toVector(fbb, changedProperties);
0373     auto deletions = BufferUtils::toVector(fbb, deletedProperties);
0374     auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size());
0375     auto resource = newResource.isEmpty() ? 0 : fbb.CreateString(newResource.constData());
0376     auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, true, modifiedProperties, resource, remove);
0377     Sink::Commands::FinishModifyEntityBuffer(fbb, location);
0378     return sendCommand(Sink::Commands::ModifyEntityCommand, fbb);
0379 }
0380 
0381 KAsync::Job<void> ResourceAccess::sendDeleteCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType)
0382 {
0383     flatbuffers::FlatBufferBuilder fbb;
0384     auto entityId = fbb.CreateString(uid.constData());
0385     auto type = fbb.CreateString(resourceBufferType.constData());
0386     auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type);
0387     Sink::Commands::FinishDeleteEntityBuffer(fbb, location);
0388     return sendCommand(Sink::Commands::DeleteEntityCommand, fbb);
0389 }
0390 
0391 KAsync::Job<void> ResourceAccess::sendRevisionReplayedCommand(qint64 revision)
0392 {
0393     //Avoid starting the resource just to send a revision replayed command.
0394     //This is mostly relevant so in tests we don't restart resources via live-queries after shutdown (which causes them to linger for 60s)
0395     //In production the live-query will always start the resource via open() anyways.
0396     if (!isReady()) {
0397         SinkWarningCtx(d->logCtx) << "Not starting the resource on revision replayed";
0398         return KAsync::error();
0399     }
0400     flatbuffers::FlatBufferBuilder fbb;
0401     auto location = Sink::Commands::CreateRevisionReplayed(fbb, revision);
0402     Sink::Commands::FinishRevisionReplayedBuffer(fbb, location);
0403     return sendCommand(Sink::Commands::RevisionReplayedCommand, fbb);
0404 }
0405 
0406 KAsync::Job<void>
0407 ResourceAccess::sendInspectionCommand(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue)
0408 {
0409     flatbuffers::FlatBufferBuilder fbb;
0410     auto id = fbb.CreateString(inspectionId.toStdString());
0411     auto domain = fbb.CreateString(domainType.toStdString());
0412     auto entity = fbb.CreateString(entityId.toStdString());
0413     auto prop = fbb.CreateString(property.toStdString());
0414 
0415     QByteArray array;
0416     QDataStream s(&array, QIODevice::WriteOnly);
0417     s << expectedValue;
0418 
0419     auto expected = fbb.CreateString(array.toStdString());
0420     auto location = Sink::Commands::CreateInspection(fbb, id, inspectionType, entity, domain, prop, expected);
0421     Sink::Commands::FinishInspectionBuffer(fbb, location);
0422     return sendCommand(Sink::Commands::InspectionCommand, fbb);
0423 }
0424 
0425 KAsync::Job<void> ResourceAccess::sendFlushCommand(int flushType, const QByteArray &flushId)
0426 {
0427     flatbuffers::FlatBufferBuilder fbb;
0428     auto id = fbb.CreateString(flushId.toStdString());
0429     auto location = Sink::Commands::CreateFlush(fbb, id, flushType);
0430     Sink::Commands::FinishFlushBuffer(fbb, location);
0431     return sendCommand(Sink::Commands::FlushCommand, fbb);
0432 }
0433 
0434 KAsync::Job<void> ResourceAccess::sendSecret(const QString &secret)
0435 {
0436     flatbuffers::FlatBufferBuilder fbb;
0437     auto s = fbb.CreateString(secret.toStdString());
0438     auto location = Sink::Commands::CreateSecret(fbb, s);
0439     Sink::Commands::FinishSecretBuffer(fbb, location);
0440     return sendCommand(Sink::Commands::SecretCommand, fbb);
0441 }
0442 
0443 KAsync::Job<void> ResourceAccess::shutdown()
0444 {
0445     return sendCommand(Sink::Commands::ShutdownCommand);
0446 }
0447 
0448 void ResourceAccess::open()
0449 {
0450     if (d->socket && d->socket->isValid()) {
0451         // SinkTrace() << "Socket valid, so not opening again";
0452         return;
0453     }
0454     if (d->openingSocket) {
0455         return;
0456     }
0457     auto time = QSharedPointer<QTime>::create();
0458     time->start();
0459     d->openingSocket = true;
0460     d->initializeSocket()
0461         .then<void>(
0462             [this, time](const KAsync::Error &error) {
0463                 d->openingSocket = false;
0464                 if (error) {
0465                     SinkErrorCtx(d->logCtx) << "Failed to initialize socket " << error;
0466                     d->abortPendingOperations();
0467                 } else {
0468                     SinkTraceCtx(d->logCtx) << "Socket is initialized." << Log::TraceTime(time->elapsed());
0469                     Q_ASSERT(d->socket);
0470                     QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected);
0471                     QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError)));
0472                     QObject::connect(d->socket.data(), &QIODevice::readyRead, this, &ResourceAccess::readResourceMessage);
0473                     connected();
0474                 }
0475                 return KAsync::null();
0476             })
0477         .exec();
0478 }
0479 
0480 void ResourceAccess::close()
0481 {
0482     SinkLogCtx(d->logCtx) << QString("Closing %1").arg(d->socket->fullServerName());
0483     SinkTraceCtx(d->logCtx) << "Pending commands: " << d->pendingCommands.size();
0484     SinkTraceCtx(d->logCtx) << "Queued commands: " << d->commandQueue.size();
0485     d->abortPendingOperations();
0486     d->socket->close();
0487 }
0488 
0489 void ResourceAccess::abort()
0490 {
0491     SinkLogCtx(d->logCtx) << QString("Aborting %1").arg(d->socket->fullServerName());
0492     SinkTraceCtx(d->logCtx) << "Pending commands: " << d->pendingCommands.size();
0493     SinkTraceCtx(d->logCtx) << "Queued commands: " << d->commandQueue.size();
0494     d->abortPendingOperations();
0495     d->socket->abort();
0496 }
0497 
0498 void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command)
0499 {
0500     Q_ASSERT(isReady());
0501     // TODO: we should have a timeout for commands
0502     d->messageId++;
0503     const auto messageId = d->messageId;
0504     SinkTraceCtx(d->logCtx) << QString("Sending command \"%1\" with messageId %2").arg(QString(Sink::Commands::name(command->commandId))).arg(d->messageId);
0505     Q_ASSERT(command->callback);
0506     registerCallback(d->messageId, [this, messageId, command](int errorCode, QString errorMessage) {
0507         SinkTraceCtx(d->logCtx) << "Command complete " << messageId;
0508         d->pendingCommands.remove(messageId);
0509         command->callback(errorCode, errorMessage);
0510     });
0511     // Keep track of the command until we're sure it arrived
0512     d->pendingCommands.insert(d->messageId, command);
0513     Commands::write(d->socket.data(), d->messageId, command->commandId, command->buffer.constData(), command->buffer.size());
0514 }
0515 
0516 void ResourceAccess::processCommandQueue()
0517 {
0518     // TODO: serialize instead of blast them all through the socket?
0519     SinkTraceCtx(d->logCtx) << "We have " << d->commandQueue.size() << " queued commands";
0520     SinkTraceCtx(d->logCtx) << "Pending commands: " << d->pendingCommands.size();
0521     for (auto command : d->commandQueue) {
0522         sendCommand(command);
0523     }
0524     d->commandQueue.clear();
0525 }
0526 
0527 void ResourceAccess::processPendingCommandQueue()
0528 {
0529     SinkTraceCtx(d->logCtx) << "We have " << d->pendingCommands.size() << " pending commands";
0530     for (auto command : d->pendingCommands) {
0531         SinkTraceCtx(d->logCtx) << "Reenquing command " << command->commandId;
0532         d->commandQueue << command;
0533     }
0534     d->pendingCommands.clear();
0535     processCommandQueue();
0536 }
0537 
0538 void ResourceAccess::connected()
0539 {
0540     if (!isReady()) {
0541         SinkTraceCtx(d->logCtx) << "Connected but not ready?";
0542         return;
0543     }
0544 
0545     SinkTraceCtx(d->logCtx) << QString("Connected: %1").arg(d->socket->fullServerName());
0546 
0547     {
0548         flatbuffers::FlatBufferBuilder fbb;
0549         auto name = fbb.CreateString(QString("PID: %1 ResourceAccess: %2").arg(QCoreApplication::applicationPid()).arg(reinterpret_cast<qlonglong>(this)).toLatin1().toStdString());
0550         auto command = Sink::Commands::CreateHandshake(fbb, name);
0551         Sink::Commands::FinishHandshakeBuffer(fbb, command);
0552         Commands::write(d->socket.data(), ++d->messageId, Commands::HandshakeCommand, fbb);
0553     }
0554 
0555     // Reenqueue pending commands, we failed to send them
0556     processPendingCommandQueue();
0557     auto secret = SecretStore::instance().resourceSecret(d->resourceInstanceIdentifier);
0558     if (!secret.isEmpty()) {
0559         sendSecret(secret).exec();
0560     }
0561 
0562     emit ready(true);
0563 }
0564 
0565 void ResourceAccess::disconnected()
0566 {
0567     SinkLogCtx(d->logCtx) << QString("Disconnected from %1").arg(d->socket->fullServerName());
0568     //Ensure we read all remaining data before closing the socket.
0569     //This is required on windows at least.
0570     readResourceMessage();
0571     d->socket->close();
0572     emit ready(false);
0573 }
0574 
0575 void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error)
0576 {
0577     const bool resourceCrashed = d->partialMessageBuffer.contains("PANIC");
0578     if (resourceCrashed) {
0579         SinkErrorCtx(d->logCtx) << "The resource crashed!";
0580         mResourceStatus = Sink::ApplicationDomain::ErrorStatus;
0581         Sink::Notification n;
0582         n.type = Sink::Notification::Status;
0583         emit notification(n);
0584         Sink::Notification crashNotification;
0585         crashNotification.type = Sink::Notification::Error;
0586         crashNotification.code = Sink::ApplicationDomain::ResourceCrashedError;
0587         emit notification(crashNotification);
0588         d->abortPendingOperations();
0589     } else if (error == QLocalSocket::PeerClosedError) {
0590         SinkLogCtx(d->logCtx) << "The resource closed the connection.";
0591         d->abortPendingOperations();
0592     } else {
0593         SinkWarningCtx(d->logCtx) << QString("Connection error: %1 : %2").arg(error).arg(d->socket->errorString());
0594         if (d->pendingCommands.size()) {
0595             SinkTraceCtx(d->logCtx) << "Reconnecting due to pending operations: " << d->pendingCommands.size();
0596             open();
0597         }
0598     }
0599 }
0600 
0601 void ResourceAccess::readResourceMessage()
0602 {
0603     if (!d->socket) {
0604         SinkWarningCtx(d->logCtx) << "No socket available";
0605         return;
0606     }
0607 
0608     if (d->socket->bytesAvailable()) {
0609         d->partialMessageBuffer += d->socket->readAll();
0610 
0611         // should be scheduled rather than processed all at once
0612         while (processMessageBuffer()) {
0613         }
0614     }
0615 }
0616 
0617 static Sink::Notification getNotification(const Sink::Commands::Notification *buffer)
0618 {
0619     Sink::Notification n;
0620     if (buffer->identifier()) {
0621         // Don't use fromRawData, the buffer is gone once we invoke emit notification
0622         n.id = BufferUtils::extractBufferCopy(buffer->identifier());
0623     }
0624     if (buffer->message()) {
0625         // Don't use fromRawData, the buffer is gone once we invoke emit notification
0626         n.message = BufferUtils::extractBufferCopy(buffer->message());
0627     }
0628     n.type = buffer->type();
0629     n.code = buffer->code();
0630     n.progress = buffer->progress();
0631     n.total = buffer->total();
0632     if (buffer->entitiesType()) {
0633         // Don't use fromRawData, the buffer is gone once we invoke emit notification
0634         n.entitiesType = BufferUtils::extractBufferCopy(buffer->entitiesType());
0635     }
0636     n.entities = BufferUtils::fromVector(*buffer->entities());
0637     return n;
0638 }
0639 
0640 bool ResourceAccess::processMessageBuffer()
0641 {
0642     static const int headerSize = Commands::headerSize();
0643     if (d->partialMessageBuffer.size() < headerSize) {
0644         //This is not an error
0645         SinkTraceCtx(d->logCtx) << "command too small, smaller than headerSize: " << d->partialMessageBuffer.size() << headerSize;
0646         return false;
0647     }
0648 
0649     // const uint messageId = *(int*)(d->partialMessageBuffer.constData());
0650     const int commandId = *(const int *)(d->partialMessageBuffer.constData() + sizeof(uint));
0651     const uint size = *(const int *)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint));
0652 
0653     const uint availableMessageSize = d->partialMessageBuffer.size() - headerSize;
0654     if (size > availableMessageSize) {
0655         //This is not an error
0656         SinkTraceCtx(d->logCtx) << "command too small, message smaller than advertised. Available: " << availableMessageSize << "Expected" << size << "HeaderSize: " << headerSize;
0657         return false;
0658     }
0659 
0660     switch (commandId) {
0661         case Commands::RevisionUpdateCommand: {
0662             auto buffer = Commands::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize);
0663             SinkTraceCtx(d->logCtx) << QString("Revision updated to: %1").arg(buffer->revision());
0664             Notification n;
0665             n.type = Sink::Notification::RevisionUpdate;
0666             emit notification(n);
0667             emit revisionChanged(buffer->revision());
0668 
0669             break;
0670         }
0671         case Commands::CommandCompletionCommand: {
0672             auto buffer = Commands::GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize);
0673             SinkTraceCtx(d->logCtx) << QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully");
0674 
0675             d->completeCommands.insert(buffer->id(), buffer->success());
0676             // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first
0677             queuedInvoke([=]() { d->callCallbacks(); }, this);
0678             break;
0679         }
0680         case Commands::NotificationCommand: {
0681             auto buffer = Commands::GetNotification(d->partialMessageBuffer.constData() + headerSize);
0682             switch (buffer->type()) {
0683                 case Sink::Notification::Shutdown:
0684                     SinkLogCtx(d->logCtx) << "Received shutdown notification.";
0685                     abort();
0686                     break;
0687                 case Sink::Notification::Inspection: {
0688                     SinkTraceCtx(d->logCtx) << "Received inspection notification.";
0689                     auto n = getNotification(buffer);
0690                     // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first
0691                     queuedInvoke([=]() { emit notification(n); }, this);
0692                 } break;
0693                 case Sink::Notification::Status:
0694                     if (mResourceStatus != buffer->code()) {
0695                         mResourceStatus = buffer->code();
0696                         SinkTraceCtx(d->logCtx) << "Updated status: " << mResourceStatus;
0697                     }
0698                     [[clang::fallthrough]];
0699                 case Sink::Notification::Info:
0700                     [[clang::fallthrough]];
0701                 case Sink::Notification::Warning:
0702                     [[clang::fallthrough]];
0703                 case Sink::Notification::Error:
0704                     [[clang::fallthrough]];
0705                 case Sink::Notification::FlushCompletion:
0706                     [[clang::fallthrough]];
0707                 case Sink::Notification::Progress: {
0708                     auto n = getNotification(buffer);
0709                     SinkTraceCtx(d->logCtx) << "Received notification: " << n;
0710                     n.resource = d->resourceInstanceIdentifier;
0711                     emit notification(n);
0712                 } break;
0713                 case Sink::Notification::RevisionUpdate:
0714                 default:
0715                     SinkWarningCtx(d->logCtx) << "Received unknown notification: " << buffer->type();
0716                     break;
0717             }
0718             break;
0719         }
0720         default:
0721             break;
0722     }
0723 
0724     d->partialMessageBuffer.remove(0, headerSize + size);
0725     return d->partialMessageBuffer.size() >= headerSize;
0726 }
0727 
0728 
0729 
0730 ResourceAccessFactory::ResourceAccessFactory()
0731 {
0732 
0733 }
0734 
0735 ResourceAccessFactory &ResourceAccessFactory::instance()
0736 {
0737     static ResourceAccessFactory *instance = nullptr;
0738     if (!instance) {
0739         instance = new ResourceAccessFactory;
0740     }
0741     return *instance;
0742 }
0743 
0744 Sink::ResourceAccess::Ptr ResourceAccessFactory::getAccess(const QByteArray &instanceIdentifier, const QByteArray resourceType)
0745 {
0746     if (!mCache.contains(instanceIdentifier)) {
0747         // Reuse the pointer if something else kept the resourceaccess alive
0748         if (mWeakCache.contains(instanceIdentifier)) {
0749             if (auto sharedPointer = mWeakCache.value(instanceIdentifier).toStrongRef()) {
0750                 mCache.insert(instanceIdentifier, sharedPointer);
0751             }
0752         }
0753         if (!mCache.contains(instanceIdentifier)) {
0754             // Create a new instance if necessary
0755             auto sharedPointer = Sink::ResourceAccess::Ptr{new Sink::ResourceAccess(instanceIdentifier, resourceType), &QObject::deleteLater};
0756             QObject::connect(sharedPointer.data(), &Sink::ResourceAccess::ready, sharedPointer.data(), [this, instanceIdentifier](bool ready) {
0757                 if (!ready) {
0758                     //We want to remove, but we don't want shared pointer to be destroyed until end of the function as this might trigger further steps.
0759                     auto ptr = mCache.take(instanceIdentifier);
0760                     if (auto timer = mTimer.take(instanceIdentifier)) {
0761                         timer->stop();
0762                     }
0763                     Q_UNUSED(ptr);
0764                 }
0765             });
0766             mCache.insert(instanceIdentifier, sharedPointer);
0767             mWeakCache.insert(instanceIdentifier, sharedPointer);
0768         }
0769     }
0770     if (!mTimer.contains(instanceIdentifier)) {
0771         auto timer = QSharedPointer<QTimer>::create();
0772         timer->setSingleShot(true);
0773         // Drop connection after 3 seconds (which is a random value)
0774         QObject::connect(timer.data(), &QTimer::timeout, timer.data(), [this, instanceIdentifier]() {
0775             //We want to remove, but we don't want shared pointer to be destroyed until end of the function as this might trigger further steps.
0776             auto ptr = mCache.take(instanceIdentifier);
0777             Q_UNUSED(ptr);
0778         });
0779         timer->setInterval(3000);
0780         mTimer.insert(instanceIdentifier, timer);
0781     }
0782     mTimer.value(instanceIdentifier)->start();
0783     return mCache.value(instanceIdentifier);
0784 }
0785 }
0786 
0787 #pragma clang diagnostic push
0788 #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast"
0789 #include "moc_resourceaccess.cpp"
0790 #pragma clang diagnostic pop