File indexing completed on 2024-05-19 05:26:14

0001 /*
0002  * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm>
0003  *
0004  * This library is free software; you can redistribute it and/or
0005  * modify it under the terms of the GNU Lesser General Public
0006  * License as published by the Free Software Foundation; either
0007  * version 2.1 of the License, or (at your option) version 3, or any
0008  * later version accepted by the membership of KDE e.V. (or its
0009  * successor approved by the membership of KDE e.V.), which shall
0010  * act as a proxy defined in Section 6 of version 3 of the license.
0011  *
0012  * This library is distributed in the hope that it will be useful,
0013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0015  * Lesser General Public License for more details.
0016  *
0017  * You should have received a copy of the GNU Lesser General Public
0018  * License along with this library.  If not, see <http://www.gnu.org/licenses/>.
0019  */
0020 
0021 #include "resourcecontrol.h"
0022 
0023 #include <QTime>
0024 #include <functional>
0025 
0026 #include "resourceaccess.h"
0027 #include "resourceconfig.h"
0028 #include "commands.h"
0029 #include "log.h"
0030 #include "notifier.h"
0031 #include "utils.h"
0032 
0033 namespace Sink {
0034 
0035 KAsync::Job<void> ResourceControl::shutdown(const QByteArray &identifier)
0036 {
0037     const auto ctx = Log::Context{identifier + ".resourcecontrol"};
0038     SinkTraceCtx(ctx) << "shutdown " << identifier;
0039     auto time = QSharedPointer<QTime>::create();
0040     time->start();
0041 
0042     auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier));
0043     return resourceAccess->shutdown()
0044         .addToContext(resourceAccess)
0045         .then<void>([resourceAccess, time, ctx](KAsync::Future<void> &future) {
0046             SinkTraceCtx(ctx) << "Shutdown command complete, waiting for shutdown." << Log::TraceTime(time->elapsed());
0047             if (!resourceAccess->isReady()) {
0048                 future.setFinished();
0049                 return;
0050             }
0051             auto guard = new QObject;
0052             QObject::connect(resourceAccess.data(), &ResourceAccess::ready, guard, [&future, guard](bool ready) {
0053                 if (!ready) {
0054                     //Protect against callback getting called twice.
0055                     delete guard;
0056                     future.setFinished();
0057                 }
0058             });
0059         }).then([time, ctx] {
0060             SinkTraceCtx(ctx) << "Shutdown complete." << Log::TraceTime(time->elapsed());
0061         });
0062 }
0063 
0064 KAsync::Job<void> ResourceControl::start(const QByteArray &identifier)
0065 {
0066     SinkTrace() << "start " << identifier;
0067     auto time = QSharedPointer<QTime>::create();
0068     time->start();
0069     auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier));
0070     resourceAccess->open();
0071     return resourceAccess->sendCommand(Sink::Commands::PingCommand).addToContext(resourceAccess).then([time]() { SinkTrace() << "Start complete." << Log::TraceTime(time->elapsed()); });
0072 }
0073 
0074 KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArrayList &resourceIdentifier)
0075 {
0076     SinkTrace() << "flushMessageQueue" << resourceIdentifier;
0077     return KAsync::value(resourceIdentifier)
0078         .template each([](const QByteArray &resource) {
0079             return flushMessageQueue(resource);
0080         });
0081 }
0082 
0083 KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArray &resourceIdentifier)
0084 {
0085     return flush(Flush::FlushUserQueue, resourceIdentifier).then(flush(Flush::FlushSynchronization, resourceIdentifier));
0086 }
0087 
0088 KAsync::Job<void> ResourceControl::flush(Flush::FlushType type, const QByteArray &resourceIdentifier)
0089 {
0090     auto resourceAccess = ResourceAccessFactory::instance().getAccess(resourceIdentifier, ResourceConfig::getResourceType(resourceIdentifier));
0091     auto notifier = QSharedPointer<Sink::Notifier>::create(resourceAccess);
0092     auto id = createUuid();
0093     return KAsync::start<void>([=](KAsync::Future<void> &future) {
0094             SinkLog() << "Starting flush " << id;
0095             notifier->registerHandler([&future, id](const Notification &notification) {
0096                 SinkTrace() << "Received notification: " << notification.type << notification.id;
0097                 if (notification.type == Notification::Error && notification.code == ApplicationDomain::ResourceCrashedError) {
0098                     SinkWarning() << "Error during flush";
0099                     future.setError(-1, "Error during flush: " + notification.message);
0100                 } else if (notification.id == id) {
0101                     SinkTrace() << "FlushComplete";
0102                     if (notification.code) {
0103                         SinkWarning() << "Flush returned an error";
0104                         future.setError(-1, "Flush returned an error: " + notification.message);
0105                     } else {
0106                         future.setFinished();
0107                     }
0108                 }
0109             });
0110             resourceAccess->sendFlushCommand(type, id).onError([&future] (const KAsync::Error &error) {
0111                 SinkWarning() << "Failed to send command";
0112                 future.setError(1, "Failed to send command: " + error.errorMessage);
0113             }).exec();
0114         });
0115 }
0116 
0117 KAsync::Job<void> ResourceControl::flushReplayQueue(const QByteArrayList &resourceIdentifier)
0118 {
0119     return KAsync::value(resourceIdentifier)
0120         .template each([](const QByteArray &resource) {
0121             return flushReplayQueue(resource);
0122         });
0123 }
0124 
0125 KAsync::Job<void> ResourceControl::flushReplayQueue(const QByteArray &resourceIdentifier)
0126 {
0127     return flush(Flush::FlushReplayQueue, resourceIdentifier);
0128 }
0129 
0130 KAsync::Job<void> ResourceControl::inspect(const Inspection &inspectionCommand, const QByteArray &domainType)
0131 {
0132     auto resourceIdentifier = inspectionCommand.resourceIdentifier;
0133     auto resourceAccess = ResourceAccessFactory::instance().getAccess(resourceIdentifier, ResourceConfig::getResourceType(resourceIdentifier));
0134     auto notifier = QSharedPointer<Sink::Notifier>::create(resourceAccess);
0135     auto id = createUuid();
0136     return KAsync::start<void>([=](KAsync::Future<void> &future) {
0137             notifier->registerHandler([&future, id](const Notification &notification) {
0138                 if (notification.id == id) {
0139                     SinkTrace() << "Inspection complete";
0140                     if (notification.code) {
0141                         SinkWarning() << "Inspection returned an error";
0142                         future.setError(-1, "Inspection returned an error: " + notification.message);
0143                     } else {
0144                         future.setFinished();
0145                     }
0146                 }
0147             });
0148             resourceAccess->sendInspectionCommand(inspectionCommand.type, id, domainType, inspectionCommand.entityIdentifier, inspectionCommand.property, inspectionCommand.expectedValue).onError([&future] (const KAsync::Error &error) {
0149                 SinkWarning() << "Failed to send command";
0150                 future.setError(1, "Failed to send command: " + error.errorMessage);
0151             }).exec();
0152         });
0153 }
0154 
0155 
0156 } // namespace Sink