File indexing completed on 2024-05-12 05:26:03
0001 /* 0002 * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org> 0003 * 0004 * This library is free software; you can redistribute it and/or 0005 * modify it under the terms of the GNU Lesser General Public 0006 * License as published by the Free Software Foundation; either 0007 * version 2.1 of the License, or (at your option) version 3, or any 0008 * later version accepted by the membership of KDE e.V. (or its 0009 * successor approved by the membership of KDE e.V.), which shall 0010 * act as a proxy defined in Section 6 of version 3 of the license. 0011 * 0012 * This library is distributed in the hope that it will be useful, 0013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 0014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 0015 * Lesser General Public License for more details. 0016 * 0017 * You should have received a copy of the GNU Lesser General Public 0018 * License along with this library. If not, see <http://www.gnu.org/licenses/>. 0019 */ 0020 0021 #include "resourceaccess.h" 0022 0023 #include "common/commands.h" 0024 #include "common/commandcompletion_generated.h" 0025 #include "common/handshake_generated.h" 0026 #include "common/revisionupdate_generated.h" 0027 #include "common/synchronize_generated.h" 0028 #include "common/notification_generated.h" 0029 #include "common/createentity_generated.h" 0030 #include "common/modifyentity_generated.h" 0031 #include "common/deleteentity_generated.h" 0032 #include "common/revisionreplayed_generated.h" 0033 #include "common/inspection_generated.h" 0034 #include "common/flush_generated.h" 0035 #include "common/secret_generated.h" 0036 #include "common/entitybuffer.h" 0037 #include "common/bufferutils.h" 0038 #include "common/test.h" 0039 #include "common/secretstore.h" 0040 #include "log.h" 0041 0042 #include <QCoreApplication> 0043 #include <QDebug> 0044 #include <QDir> 0045 #include <QProcess> 0046 #include <QDataStream> 0047 #include <QBuffer> 0048 #include <QTime> 0049 
#include <QStandardPaths>

/**
 * Schedules @p f to run from a zero-interval single-shot timer instead of
 * calling it directly.
 *
 * @param f       Callable to invoke; it is copied into the pending timer.
 * @param context Object passed to QTimer::singleShot as the context of the call.
 */
static void queuedInvoke(const std::function<void()> &f, QObject *context = nullptr)
{
    // QTimer::singleShot owns and releases its internal timer. The previous implementation
    // created a QSharedPointer<QTimer> and captured it in the slot connected to that timer's
    // own timeout signal, so the timer kept itself alive after it had fired.
    QTimer::singleShot(0, context, f);
}

namespace Sink {

/**
 * A command that is waiting to be written to the resource, together with the
 * callback that reports its outcome as (error code, error message).
 */
struct QueuedCommand
{
public:
    /** Creates a command that carries no payload. */
    QueuedCommand(int commandId, const std::function<void(int, const QString &)> &callback) : commandId(commandId), callback(callback)
    {
    }

    /** Creates a command whose payload is the serialized buffer @p b. */
    QueuedCommand(int commandId, const QByteArray &b, const std::function<void(int, const QString &)> &callback) : commandId(commandId), buffer(b), callback(callback)
    {
    }

    // Commands are only ever handled through QSharedPointer in this file; copying is disabled.
    QueuedCommand(const QueuedCommand &other) = delete;
    QueuedCommand &operator=(const QueuedCommand &rhs) = delete;

public:
    const int commandId;
    QByteArray buffer;
    std::function<void(int, const QString &)> callback;
};

/**
 * Private state of ResourceAccess: the socket, the receive buffer and the
 * bookkeeping of queued, pending and completed commands.
 */
class ResourceAccess::Private
{
public:
    Private(const QByteArray &name, const QByteArray &instanceIdentifier, ResourceAccess *ra);
    ~Private();
    KAsync::Job<void> tryToConnect();
    KAsync::Job<void> initializeSocket();
    void abortPendingOperations();
    void callCallbacks();

    QByteArray resourceName;
    QByteArray resourceInstanceIdentifier;
    QSharedPointer<QLocalSocket> socket;
    // Bytes read from the socket that have not been consumed as complete messages yet
    QByteArray partialMessageBuffer;
    // Commands that have not been written to the socket yet
    QVector<QSharedPointer<QueuedCommand>> commandQueue;
    // Commands that have been written, keyed by message id, until their result handler ran
    QMap<uint, QSharedPointer<QueuedCommand>> pendingCommands;
    // Result handlers keyed by message id
    QMultiMap<uint, std::function<void(int error, const QString &errorMessage)>> resultHandler;
    // Message id -> success flag of completions whose handlers still have to be called
    QHash<uint, bool> completeCommands;
    uint messageId;
    bool openingSocket;
    Sink::Log::Context logCtx;
    SINK_DEBUG_COMPONENT(resourceInstanceIdentifier)
};


ResourceAccess::Private::Private(const QByteArray &name, const QByteArray &instanceIdentifier, ResourceAccess *q)
    : resourceName(name), resourceInstanceIdentifier(instanceIdentifier), messageId(0), openingSocket(false), logCtx{instanceIdentifier + ".resourceaccess"}
{
}

ResourceAccess::Private::~Private()
{
}

/**
 * Delivers outstanding completions, then fails every remaining result handler
 * and every still-queued command with error code 1.
 */
void ResourceAccess::Private::abortPendingOperations()
{
    callCallbacks();
    if (!resultHandler.isEmpty()) {
        SinkWarning() << "Aborting pending operations " << resultHandler.keys();
    }
    // Detach both containers before invoking anything. A callback may enqueue new commands,
    // flush the queue or tear down the owner; iterating the members while that happens would
    // invalidate the iteration, and clearing the queue afterwards would silently drop
    // commands that were enqueued by a callback.
    const auto handlers = resultHandler.values();
    resultHandler.clear();
    const auto queuedCommands = commandQueue;
    commandQueue.clear();
    for (const auto &handler : handlers) {
        handler(1, "The resource closed unexpectedly");
    }
    for (const auto &queuedCommand : queuedCommands) {
        queuedCommand->callback(1, "The resource closed unexpectedly");
    }
}

/**
 * Calls the result handlers of all commands recorded in completeCommands,
 * passing (0, "") on success and (1, "Command failed.") otherwise.
 */
void ResourceAccess::Private::callCallbacks()
{
    // We remove the callbacks first because the handler can kill resourceaccess directly.
    // Everything is collected up front so that no member is touched once the first handler ran.
    QVector<std::function<void()>> invocations;
    const auto commandIds = completeCommands.keys();
    for (auto id : commandIds) {
        const bool success = completeCommands.take(id);
        const auto callbacks = resultHandler.values(id);
        resultHandler.remove(id);
        for (const auto &handler : callbacks) {
            invocations.append([handler, success]() {
                if (success) {
                    handler(0, QString());
                } else {
                    handler(1, "Command failed.");
                }
            });
        }
    }
    for (const auto &invoke : invocations) {
        invoke();
    }
}

// Connects to server and returns connected socket on success
KAsync::Job<QSharedPointer<QLocalSocket>> ResourceAccess::connectToServer(const QByteArray &identifier)
{
    auto ctx = Log::Context{identifier + ".resourceaccess"};

    auto s = QSharedPointer<QLocalSocket>{new QLocalSocket, &QObject::deleteLater};
    return KAsync::start<QSharedPointer<QLocalSocket>>([identifier, s, ctx](KAsync::Future<QSharedPointer<QLocalSocket>> &future) {
        SinkTraceCtx(ctx) << "Connecting to server " << identifier;
        // Helper object used as the receiver of both connections below. Whichever handler
        // runs first deletes it, which removes both connections.
        auto context = new QObject;
        QObject::connect(s.data(), &QLocalSocket::connected, context, [&future, &s, context, identifier, ctx]() {
            SinkTraceCtx(ctx) << "Connected to server " << identifier;
            Q_ASSERT(s);
            delete context;
            future.setValue(s);
            future.setFinished();
        });
        QObject::connect(s.data(), static_cast<void (QLocalSocket::*)(QLocalSocket::LocalSocketError)>(&QLocalSocket::error), context, [&future, &s, context, identifier, ctx](QLocalSocket::LocalSocketError localSocketError) {
            SinkTraceCtx(ctx) << "Failed to connect to server " << localSocketError << identifier;
            // Read what we need from the socket before dropping the connections
            const auto errorString = s->errorString();
            const auto name = s->fullServerName();
            delete context;
            //We don't set the localSocketError as errorcode, because ConnectionRefused is 0 (no error)
            future.setError({1, QString("Failed to connect to socket %1: %2 %3").arg(name).arg(localSocketError).arg(errorString)});
        });
        s->connectToServer(identifier);
    });
}

/**
 * Repeatedly tries to connect to the resource until a connection is established
 * or timeout / waitTime attempts have failed, waiting waitTime between attempts.
 * On success the connected socket is stored in `socket`.
 */
KAsync::Job<void> ResourceAccess::Private::tryToConnect()
{
    // We may have a socket from the last connection leftover
    socket.reset();
    auto counter = QSharedPointer<int>::create(0);
    return KAsync::doWhile(
        [this, counter]() {
            SinkTrace() << "Try to connect " << resourceInstanceIdentifier;
            return connectToServer(resourceInstanceIdentifier)
                .then<KAsync::ControlFlowFlag, QSharedPointer<QLocalSocket>>(
                    [this, counter](const KAsync::Error &error, const QSharedPointer<QLocalSocket> &s) {
                        if (error) {
                            // NOTE(review): presumably both values are milliseconds - confirm against KAsync::wait
                            static const int waitTime = 10;
                            static const int timeout = 20000;
                            static const int maxRetries = timeout / waitTime;
                            if (*counter >= maxRetries) {
                                SinkTrace() << "Giving up after " << *counter << "tries";
                                return KAsync::error<KAsync::ControlFlowFlag>(error);
                            } else {
                                *counter = *counter + 1;
                                return KAsync::wait(waitTime).then(KAsync::value(KAsync::Continue));
                            }
                        } else {
                            Q_ASSERT(s);
                            socket = s;
                            return KAsync::value(KAsync::Break);
                        }
                    });
        });
}

KAsync::Job<void> ResourceAccess::Private::initializeSocket() 0210 { 0211 return KAsync::start<void>([this] { 0212 SinkTrace() << "Trying to connect"; 0213 return connectToServer(resourceInstanceIdentifier) 0214 .then<void, QSharedPointer<QLocalSocket>>( 0215 [this](const KAsync::Error &error, const QSharedPointer<QLocalSocket> &s) { 0216 if (error) { 0217 // We failed to connect, so let's start the resource 0218 QStringList args; 0219 if (Sink::Test::testModeEnabled()) { 0220 args << "--test"; 0221 } 0222 if (resourceName.isEmpty()) { 0223 SinkWarning() << "No resource type given"; 0224 return KAsync::error(); 0225 } 0226 args << resourceInstanceIdentifier << resourceName; 0227 0228 //Prefer a binary next to this binary, otherwise fall-back to PATH. Necessary for MacOS bundles because the bundle is not in the PATH. 0229 auto executable = QStandardPaths::findExecutable("sink_synchronizer", {QCoreApplication::applicationDirPath()}); 0230 if (executable.isEmpty()) { 0231 executable = QStandardPaths::findExecutable("sink_synchronizer"); 0232 } 0233 if (executable.isEmpty()) { 0234 SinkError() << "Failed to find the sink_synchronizer binary in the paths: " << QCoreApplication::applicationDirPath(); 0235 return KAsync::error("Failed to find the sink_synchronizer binary."); 0236 } 0237 qint64 pid = 0; 0238 SinkLog() << "Starting resource " << executable << args.join(" ") << "Home path: " << QDir::homePath(); 0239 if (QProcess::startDetached(executable, args, QDir::homePath(), &pid)) { 0240 SinkTrace() << "Started resource " << resourceInstanceIdentifier << pid; 0241 return tryToConnect() 0242 .onError([this, args](const KAsync::Error &error) { 0243 SinkError() << "Failed to connect to started resource: sink_synchronizer " << args; 0244 }); 0245 } else { 0246 SinkError() << "Failed to start resource " << resourceInstanceIdentifier; 0247 return KAsync::error("Failed to start resource."); 0248 } 0249 } else { 0250 SinkTrace() << "Connected to resource, without having to 
start it."; 0251 Q_ASSERT(s); 0252 socket = s; 0253 return KAsync::null(); 0254 } 0255 }); 0256 }); 0257 } 0258 0259 ResourceAccess::ResourceAccess(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType) 0260 : ResourceAccessInterface(), d(new Private(resourceType, resourceInstanceIdentifier, this)) 0261 { 0262 mResourceStatus = Sink::ApplicationDomain::NoStatus; 0263 SinkTraceCtx(d->logCtx) << "Starting access"; 0264 0265 QObject::connect(&SecretStore::instance(), &SecretStore::secretAvailable, this, [this] (const QByteArray &resourceId) { 0266 if (resourceId == d->resourceInstanceIdentifier) { 0267 //No need to start the resource just to send the secret, we'll do that on connection anyways. 0268 if (isReady()) { 0269 sendSecret(SecretStore::instance().resourceSecret(d->resourceInstanceIdentifier)).exec(); 0270 } 0271 } 0272 }); 0273 } 0274 0275 ResourceAccess::~ResourceAccess() 0276 { 0277 SinkLog() << "Closing access"; 0278 if (!d->resultHandler.isEmpty()) { 0279 SinkWarning() << "Left jobs running while shutting down ResourceAccess: " << d->resultHandler.keys(); 0280 } 0281 delete d; 0282 } 0283 0284 QByteArray ResourceAccess::resourceName() const 0285 { 0286 return d->resourceName; 0287 } 0288 0289 bool ResourceAccess::isReady() const 0290 { 0291 return (d->socket && d->socket->isValid()); 0292 } 0293 0294 void ResourceAccess::registerCallback(uint messageId, const std::function<void(int error, const QString &errorMessage)> &callback) 0295 { 0296 d->resultHandler.insert(messageId, callback); 0297 } 0298 0299 void ResourceAccess::enqueueCommand(const QSharedPointer<QueuedCommand> &command) 0300 { 0301 d->commandQueue << command; 0302 if (isReady()) { 0303 processCommandQueue(); 0304 } else { 0305 open(); 0306 } 0307 } 0308 0309 KAsync::Job<void> ResourceAccess::sendCommand(int commandId) 0310 { 0311 return KAsync::start<void>([this, commandId](KAsync::Future<void> &f) { 0312 auto continuation = [&f](int error, const QString &errorMessage) 
{ 0313 if (error) { 0314 f.setError(error, errorMessage); 0315 } 0316 f.setFinished(); 0317 }; 0318 enqueueCommand(QSharedPointer<QueuedCommand>::create(commandId, continuation)); 0319 }); 0320 } 0321 0322 KAsync::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) 0323 { 0324 // The flatbuffer is transient, but we want to store it until the job is executed 0325 QByteArray buffer(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize()); 0326 return KAsync::start<void>([commandId, buffer, this](KAsync::Future<void> &f) { 0327 auto callback = [&f](int error, const QString &errorMessage) { 0328 if (error) { 0329 f.setError(error, errorMessage); 0330 } else { 0331 f.setFinished(); 0332 } 0333 }; 0334 enqueueCommand(QSharedPointer<QueuedCommand>::create(commandId, buffer, callback)); 0335 }); 0336 } 0337 0338 KAsync::Job<void> ResourceAccess::synchronizeResource(const Sink::QueryBase &query) 0339 { 0340 flatbuffers::FlatBufferBuilder fbb; 0341 QByteArray queryString; 0342 { 0343 QDataStream stream(&queryString, QIODevice::WriteOnly); 0344 stream << query; 0345 } 0346 auto q = fbb.CreateString(queryString.toStdString()); 0347 auto builder = Sink::Commands::SynchronizeBuilder(fbb); 0348 builder.add_query(q); 0349 Sink::Commands::FinishSynchronizeBuffer(fbb, builder.Finish()); 0350 0351 return sendCommand(Commands::SynchronizeCommand, fbb); 0352 } 0353 0354 KAsync::Job<void> ResourceAccess::sendCreateCommand(const QByteArray &uid, const QByteArray &resourceBufferType, const QByteArray &buffer) 0355 { 0356 flatbuffers::FlatBufferBuilder fbb; 0357 auto entityId = fbb.CreateString(uid.constData()); 0358 // This is the resource buffer type and not the domain type 0359 auto type = fbb.CreateString(resourceBufferType.constData()); 0360 auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size()); 0361 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta); 0362 
Sink::Commands::FinishCreateEntityBuffer(fbb, location); 0363 return sendCommand(Sink::Commands::CreateEntityCommand, fbb); 0364 } 0365 0366 KAsync::Job<void> 0367 ResourceAccess::sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer, const QByteArrayList &changedProperties, const QByteArray &newResource, bool remove) 0368 { 0369 flatbuffers::FlatBufferBuilder fbb; 0370 auto entityId = fbb.CreateString(uid.constData()); 0371 auto type = fbb.CreateString(resourceBufferType.constData()); 0372 auto modifiedProperties = BufferUtils::toVector(fbb, changedProperties); 0373 auto deletions = BufferUtils::toVector(fbb, deletedProperties); 0374 auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size()); 0375 auto resource = newResource.isEmpty() ? 0 : fbb.CreateString(newResource.constData()); 0376 auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, true, modifiedProperties, resource, remove); 0377 Sink::Commands::FinishModifyEntityBuffer(fbb, location); 0378 return sendCommand(Sink::Commands::ModifyEntityCommand, fbb); 0379 } 0380 0381 KAsync::Job<void> ResourceAccess::sendDeleteCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType) 0382 { 0383 flatbuffers::FlatBufferBuilder fbb; 0384 auto entityId = fbb.CreateString(uid.constData()); 0385 auto type = fbb.CreateString(resourceBufferType.constData()); 0386 auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type); 0387 Sink::Commands::FinishDeleteEntityBuffer(fbb, location); 0388 return sendCommand(Sink::Commands::DeleteEntityCommand, fbb); 0389 } 0390 0391 KAsync::Job<void> ResourceAccess::sendRevisionReplayedCommand(qint64 revision) 0392 { 0393 //Avoid starting the resource just to send a revision replayed command. 
0394 //This is mostly relevant so in tests we don't restart resources via live-queries after shutdown (which causes them to linger for 60s) 0395 //In production the live-query will always start the resource via open() anyways. 0396 if (!isReady()) { 0397 SinkWarningCtx(d->logCtx) << "Not starting the resource on revision replayed"; 0398 return KAsync::error(); 0399 } 0400 flatbuffers::FlatBufferBuilder fbb; 0401 auto location = Sink::Commands::CreateRevisionReplayed(fbb, revision); 0402 Sink::Commands::FinishRevisionReplayedBuffer(fbb, location); 0403 return sendCommand(Sink::Commands::RevisionReplayedCommand, fbb); 0404 } 0405 0406 KAsync::Job<void> 0407 ResourceAccess::sendInspectionCommand(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) 0408 { 0409 flatbuffers::FlatBufferBuilder fbb; 0410 auto id = fbb.CreateString(inspectionId.toStdString()); 0411 auto domain = fbb.CreateString(domainType.toStdString()); 0412 auto entity = fbb.CreateString(entityId.toStdString()); 0413 auto prop = fbb.CreateString(property.toStdString()); 0414 0415 QByteArray array; 0416 QDataStream s(&array, QIODevice::WriteOnly); 0417 s << expectedValue; 0418 0419 auto expected = fbb.CreateString(array.toStdString()); 0420 auto location = Sink::Commands::CreateInspection(fbb, id, inspectionType, entity, domain, prop, expected); 0421 Sink::Commands::FinishInspectionBuffer(fbb, location); 0422 return sendCommand(Sink::Commands::InspectionCommand, fbb); 0423 } 0424 0425 KAsync::Job<void> ResourceAccess::sendFlushCommand(int flushType, const QByteArray &flushId) 0426 { 0427 flatbuffers::FlatBufferBuilder fbb; 0428 auto id = fbb.CreateString(flushId.toStdString()); 0429 auto location = Sink::Commands::CreateFlush(fbb, id, flushType); 0430 Sink::Commands::FinishFlushBuffer(fbb, location); 0431 return sendCommand(Sink::Commands::FlushCommand, fbb); 0432 } 0433 0434 
KAsync::Job<void> ResourceAccess::sendSecret(const QString &secret) 0435 { 0436 flatbuffers::FlatBufferBuilder fbb; 0437 auto s = fbb.CreateString(secret.toStdString()); 0438 auto location = Sink::Commands::CreateSecret(fbb, s); 0439 Sink::Commands::FinishSecretBuffer(fbb, location); 0440 return sendCommand(Sink::Commands::SecretCommand, fbb); 0441 } 0442 0443 KAsync::Job<void> ResourceAccess::shutdown() 0444 { 0445 return sendCommand(Sink::Commands::ShutdownCommand); 0446 } 0447 0448 void ResourceAccess::open() 0449 { 0450 if (d->socket && d->socket->isValid()) { 0451 // SinkTrace() << "Socket valid, so not opening again"; 0452 return; 0453 } 0454 if (d->openingSocket) { 0455 return; 0456 } 0457 auto time = QSharedPointer<QTime>::create(); 0458 time->start(); 0459 d->openingSocket = true; 0460 d->initializeSocket() 0461 .then<void>( 0462 [this, time](const KAsync::Error &error) { 0463 d->openingSocket = false; 0464 if (error) { 0465 SinkErrorCtx(d->logCtx) << "Failed to initialize socket " << error; 0466 d->abortPendingOperations(); 0467 } else { 0468 SinkTraceCtx(d->logCtx) << "Socket is initialized." 
<< Log::TraceTime(time->elapsed()); 0469 Q_ASSERT(d->socket); 0470 QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected); 0471 QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError))); 0472 QObject::connect(d->socket.data(), &QIODevice::readyRead, this, &ResourceAccess::readResourceMessage); 0473 connected(); 0474 } 0475 return KAsync::null(); 0476 }) 0477 .exec(); 0478 } 0479 0480 void ResourceAccess::close() 0481 { 0482 SinkLogCtx(d->logCtx) << QString("Closing %1").arg(d->socket->fullServerName()); 0483 SinkTraceCtx(d->logCtx) << "Pending commands: " << d->pendingCommands.size(); 0484 SinkTraceCtx(d->logCtx) << "Queued commands: " << d->commandQueue.size(); 0485 d->abortPendingOperations(); 0486 d->socket->close(); 0487 } 0488 0489 void ResourceAccess::abort() 0490 { 0491 SinkLogCtx(d->logCtx) << QString("Aborting %1").arg(d->socket->fullServerName()); 0492 SinkTraceCtx(d->logCtx) << "Pending commands: " << d->pendingCommands.size(); 0493 SinkTraceCtx(d->logCtx) << "Queued commands: " << d->commandQueue.size(); 0494 d->abortPendingOperations(); 0495 d->socket->abort(); 0496 } 0497 0498 void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command) 0499 { 0500 Q_ASSERT(isReady()); 0501 // TODO: we should have a timeout for commands 0502 d->messageId++; 0503 const auto messageId = d->messageId; 0504 SinkTraceCtx(d->logCtx) << QString("Sending command \"%1\" with messageId %2").arg(QString(Sink::Commands::name(command->commandId))).arg(d->messageId); 0505 Q_ASSERT(command->callback); 0506 registerCallback(d->messageId, [this, messageId, command](int errorCode, QString errorMessage) { 0507 SinkTraceCtx(d->logCtx) << "Command complete " << messageId; 0508 d->pendingCommands.remove(messageId); 0509 command->callback(errorCode, errorMessage); 0510 }); 0511 // Keep track of the command until we're sure it arrived 0512 
d->pendingCommands.insert(d->messageId, command); 0513 Commands::write(d->socket.data(), d->messageId, command->commandId, command->buffer.constData(), command->buffer.size()); 0514 } 0515 0516 void ResourceAccess::processCommandQueue() 0517 { 0518 // TODO: serialize instead of blast them all through the socket? 0519 SinkTraceCtx(d->logCtx) << "We have " << d->commandQueue.size() << " queued commands"; 0520 SinkTraceCtx(d->logCtx) << "Pending commands: " << d->pendingCommands.size(); 0521 for (auto command : d->commandQueue) { 0522 sendCommand(command); 0523 } 0524 d->commandQueue.clear(); 0525 } 0526 0527 void ResourceAccess::processPendingCommandQueue() 0528 { 0529 SinkTraceCtx(d->logCtx) << "We have " << d->pendingCommands.size() << " pending commands"; 0530 for (auto command : d->pendingCommands) { 0531 SinkTraceCtx(d->logCtx) << "Reenquing command " << command->commandId; 0532 d->commandQueue << command; 0533 } 0534 d->pendingCommands.clear(); 0535 processCommandQueue(); 0536 } 0537 0538 void ResourceAccess::connected() 0539 { 0540 if (!isReady()) { 0541 SinkTraceCtx(d->logCtx) << "Connected but not ready?"; 0542 return; 0543 } 0544 0545 SinkTraceCtx(d->logCtx) << QString("Connected: %1").arg(d->socket->fullServerName()); 0546 0547 { 0548 flatbuffers::FlatBufferBuilder fbb; 0549 auto name = fbb.CreateString(QString("PID: %1 ResourceAccess: %2").arg(QCoreApplication::applicationPid()).arg(reinterpret_cast<qlonglong>(this)).toLatin1().toStdString()); 0550 auto command = Sink::Commands::CreateHandshake(fbb, name); 0551 Sink::Commands::FinishHandshakeBuffer(fbb, command); 0552 Commands::write(d->socket.data(), ++d->messageId, Commands::HandshakeCommand, fbb); 0553 } 0554 0555 // Reenqueue pending commands, we failed to send them 0556 processPendingCommandQueue(); 0557 auto secret = SecretStore::instance().resourceSecret(d->resourceInstanceIdentifier); 0558 if (!secret.isEmpty()) { 0559 sendSecret(secret).exec(); 0560 } 0561 0562 emit ready(true); 0563 } 0564 0565 
void ResourceAccess::disconnected() 0566 { 0567 SinkLogCtx(d->logCtx) << QString("Disconnected from %1").arg(d->socket->fullServerName()); 0568 //Ensure we read all remaining data before closing the socket. 0569 //This is required on windows at least. 0570 readResourceMessage(); 0571 d->socket->close(); 0572 emit ready(false); 0573 } 0574 0575 void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) 0576 { 0577 const bool resourceCrashed = d->partialMessageBuffer.contains("PANIC"); 0578 if (resourceCrashed) { 0579 SinkErrorCtx(d->logCtx) << "The resource crashed!"; 0580 mResourceStatus = Sink::ApplicationDomain::ErrorStatus; 0581 Sink::Notification n; 0582 n.type = Sink::Notification::Status; 0583 emit notification(n); 0584 Sink::Notification crashNotification; 0585 crashNotification.type = Sink::Notification::Error; 0586 crashNotification.code = Sink::ApplicationDomain::ResourceCrashedError; 0587 emit notification(crashNotification); 0588 d->abortPendingOperations(); 0589 } else if (error == QLocalSocket::PeerClosedError) { 0590 SinkLogCtx(d->logCtx) << "The resource closed the connection."; 0591 d->abortPendingOperations(); 0592 } else { 0593 SinkWarningCtx(d->logCtx) << QString("Connection error: %1 : %2").arg(error).arg(d->socket->errorString()); 0594 if (d->pendingCommands.size()) { 0595 SinkTraceCtx(d->logCtx) << "Reconnecting due to pending operations: " << d->pendingCommands.size(); 0596 open(); 0597 } 0598 } 0599 } 0600 0601 void ResourceAccess::readResourceMessage() 0602 { 0603 if (!d->socket) { 0604 SinkWarningCtx(d->logCtx) << "No socket available"; 0605 return; 0606 } 0607 0608 if (d->socket->bytesAvailable()) { 0609 d->partialMessageBuffer += d->socket->readAll(); 0610 0611 // should be scheduled rather than processed all at once 0612 while (processMessageBuffer()) { 0613 } 0614 } 0615 } 0616 0617 static Sink::Notification getNotification(const Sink::Commands::Notification *buffer) 0618 { 0619 Sink::Notification n; 0620 if 
(buffer->identifier()) { 0621 // Don't use fromRawData, the buffer is gone once we invoke emit notification 0622 n.id = BufferUtils::extractBufferCopy(buffer->identifier()); 0623 } 0624 if (buffer->message()) { 0625 // Don't use fromRawData, the buffer is gone once we invoke emit notification 0626 n.message = BufferUtils::extractBufferCopy(buffer->message()); 0627 } 0628 n.type = buffer->type(); 0629 n.code = buffer->code(); 0630 n.progress = buffer->progress(); 0631 n.total = buffer->total(); 0632 if (buffer->entitiesType()) { 0633 // Don't use fromRawData, the buffer is gone once we invoke emit notification 0634 n.entitiesType = BufferUtils::extractBufferCopy(buffer->entitiesType()); 0635 } 0636 n.entities = BufferUtils::fromVector(*buffer->entities()); 0637 return n; 0638 } 0639 0640 bool ResourceAccess::processMessageBuffer() 0641 { 0642 static const int headerSize = Commands::headerSize(); 0643 if (d->partialMessageBuffer.size() < headerSize) { 0644 //This is not an error 0645 SinkTraceCtx(d->logCtx) << "command too small, smaller than headerSize: " << d->partialMessageBuffer.size() << headerSize; 0646 return false; 0647 } 0648 0649 // const uint messageId = *(int*)(d->partialMessageBuffer.constData()); 0650 const int commandId = *(const int *)(d->partialMessageBuffer.constData() + sizeof(uint)); 0651 const uint size = *(const int *)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); 0652 0653 const uint availableMessageSize = d->partialMessageBuffer.size() - headerSize; 0654 if (size > availableMessageSize) { 0655 //This is not an error 0656 SinkTraceCtx(d->logCtx) << "command too small, message smaller than advertised. 
Available: " << availableMessageSize << "Expected" << size << "HeaderSize: " << headerSize; 0657 return false; 0658 } 0659 0660 switch (commandId) { 0661 case Commands::RevisionUpdateCommand: { 0662 auto buffer = Commands::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); 0663 SinkTraceCtx(d->logCtx) << QString("Revision updated to: %1").arg(buffer->revision()); 0664 Notification n; 0665 n.type = Sink::Notification::RevisionUpdate; 0666 emit notification(n); 0667 emit revisionChanged(buffer->revision()); 0668 0669 break; 0670 } 0671 case Commands::CommandCompletionCommand: { 0672 auto buffer = Commands::GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); 0673 SinkTraceCtx(d->logCtx) << QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully"); 0674 0675 d->completeCommands.insert(buffer->id(), buffer->success()); 0676 // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first 0677 queuedInvoke([=]() { d->callCallbacks(); }, this); 0678 break; 0679 } 0680 case Commands::NotificationCommand: { 0681 auto buffer = Commands::GetNotification(d->partialMessageBuffer.constData() + headerSize); 0682 switch (buffer->type()) { 0683 case Sink::Notification::Shutdown: 0684 SinkLogCtx(d->logCtx) << "Received shutdown notification."; 0685 abort(); 0686 break; 0687 case Sink::Notification::Inspection: { 0688 SinkTraceCtx(d->logCtx) << "Received inspection notification."; 0689 auto n = getNotification(buffer); 0690 // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first 0691 queuedInvoke([=]() { emit notification(n); }, this); 0692 } break; 0693 case Sink::Notification::Status: 0694 if (mResourceStatus != buffer->code()) { 0695 mResourceStatus = buffer->code(); 0696 SinkTraceCtx(d->logCtx) << "Updated status: " << mResourceStatus; 0697 } 0698 
[[clang::fallthrough]]; 0699 case Sink::Notification::Info: 0700 [[clang::fallthrough]]; 0701 case Sink::Notification::Warning: 0702 [[clang::fallthrough]]; 0703 case Sink::Notification::Error: 0704 [[clang::fallthrough]]; 0705 case Sink::Notification::FlushCompletion: 0706 [[clang::fallthrough]]; 0707 case Sink::Notification::Progress: { 0708 auto n = getNotification(buffer); 0709 SinkTraceCtx(d->logCtx) << "Received notification: " << n; 0710 n.resource = d->resourceInstanceIdentifier; 0711 emit notification(n); 0712 } break; 0713 case Sink::Notification::RevisionUpdate: 0714 default: 0715 SinkWarningCtx(d->logCtx) << "Received unknown notification: " << buffer->type(); 0716 break; 0717 } 0718 break; 0719 } 0720 default: 0721 break; 0722 } 0723 0724 d->partialMessageBuffer.remove(0, headerSize + size); 0725 return d->partialMessageBuffer.size() >= headerSize; 0726 } 0727 0728 0729 0730 ResourceAccessFactory::ResourceAccessFactory() 0731 { 0732 0733 } 0734 0735 ResourceAccessFactory &ResourceAccessFactory::instance() 0736 { 0737 static ResourceAccessFactory *instance = nullptr; 0738 if (!instance) { 0739 instance = new ResourceAccessFactory; 0740 } 0741 return *instance; 0742 } 0743 0744 Sink::ResourceAccess::Ptr ResourceAccessFactory::getAccess(const QByteArray &instanceIdentifier, const QByteArray resourceType) 0745 { 0746 if (!mCache.contains(instanceIdentifier)) { 0747 // Reuse the pointer if something else kept the resourceaccess alive 0748 if (mWeakCache.contains(instanceIdentifier)) { 0749 if (auto sharedPointer = mWeakCache.value(instanceIdentifier).toStrongRef()) { 0750 mCache.insert(instanceIdentifier, sharedPointer); 0751 } 0752 } 0753 if (!mCache.contains(instanceIdentifier)) { 0754 // Create a new instance if necessary 0755 auto sharedPointer = Sink::ResourceAccess::Ptr{new Sink::ResourceAccess(instanceIdentifier, resourceType), &QObject::deleteLater}; 0756 QObject::connect(sharedPointer.data(), &Sink::ResourceAccess::ready, sharedPointer.data(), 
[this, instanceIdentifier](bool ready) { 0757 if (!ready) { 0758 //We want to remove, but we don't want shared pointer to be destroyed until end of the function as this might trigger further steps. 0759 auto ptr = mCache.take(instanceIdentifier); 0760 if (auto timer = mTimer.take(instanceIdentifier)) { 0761 timer->stop(); 0762 } 0763 Q_UNUSED(ptr); 0764 } 0765 }); 0766 mCache.insert(instanceIdentifier, sharedPointer); 0767 mWeakCache.insert(instanceIdentifier, sharedPointer); 0768 } 0769 } 0770 if (!mTimer.contains(instanceIdentifier)) { 0771 auto timer = QSharedPointer<QTimer>::create(); 0772 timer->setSingleShot(true); 0773 // Drop connection after 3 seconds (which is a random value) 0774 QObject::connect(timer.data(), &QTimer::timeout, timer.data(), [this, instanceIdentifier]() { 0775 //We want to remove, but we don't want shared pointer to be destroyed until end of the function as this might trigger further steps. 0776 auto ptr = mCache.take(instanceIdentifier); 0777 Q_UNUSED(ptr); 0778 }); 0779 timer->setInterval(3000); 0780 mTimer.insert(instanceIdentifier, timer); 0781 } 0782 mTimer.value(instanceIdentifier)->start(); 0783 return mCache.value(instanceIdentifier); 0784 } 0785 } 0786 0787 #pragma clang diagnostic push 0788 #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" 0789 #include "moc_resourceaccess.cpp" 0790 #pragma clang diagnostic pop