File indexing completed on 2024-05-19 05:26:14
0001 /* 0002 * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm> 0003 * 0004 * This library is free software; you can redistribute it and/or 0005 * modify it under the terms of the GNU Lesser General Public 0006 * License as published by the Free Software Foundation; either 0007 * version 2.1 of the License, or (at your option) version 3, or any 0008 * later version accepted by the membership of KDE e.V. (or its 0009 * successor approved by the membership of KDE e.V.), which shall 0010 * act as a proxy defined in Section 6 of version 3 of the license. 0011 * 0012 * This library is distributed in the hope that it will be useful, 0013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 0014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 0015 * Lesser General Public License for more details. 0016 * 0017 * You should have received a copy of the GNU Lesser General Public 0018 * License along with this library. If not, see <http://www.gnu.org/licenses/>. 0019 */ 0020 0021 #include "resourcecontrol.h" 0022 0023 #include <QTime> 0024 #include <functional> 0025 0026 #include "resourceaccess.h" 0027 #include "resourceconfig.h" 0028 #include "commands.h" 0029 #include "log.h" 0030 #include "notifier.h" 0031 #include "utils.h" 0032 0033 namespace Sink { 0034 0035 KAsync::Job<void> ResourceControl::shutdown(const QByteArray &identifier) 0036 { 0037 const auto ctx = Log::Context{identifier + ".resourcecontrol"}; 0038 SinkTraceCtx(ctx) << "shutdown " << identifier; 0039 auto time = QSharedPointer<QTime>::create(); 0040 time->start(); 0041 0042 auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); 0043 return resourceAccess->shutdown() 0044 .addToContext(resourceAccess) 0045 .then<void>([resourceAccess, time, ctx](KAsync::Future<void> &future) { 0046 SinkTraceCtx(ctx) << "Shutdown command complete, waiting for shutdown." << Log::TraceTime(time->elapsed()); 0047 if (!resourceAccess->isReady()) { 0048 future.setFinished(); 0049 return; 0050 } 0051 auto guard = new QObject; 0052 QObject::connect(resourceAccess.data(), &ResourceAccess::ready, guard, [&future, guard](bool ready) { 0053 if (!ready) { 0054 //Protect against callback getting called twice. 0055 delete guard; 0056 future.setFinished(); 0057 } 0058 }); 0059 }).then([time, ctx] { 0060 SinkTraceCtx(ctx) << "Shutdown complete." << Log::TraceTime(time->elapsed()); 0061 }); 0062 } 0063 0064 KAsync::Job<void> ResourceControl::start(const QByteArray &identifier) 0065 { 0066 SinkTrace() << "start " << identifier; 0067 auto time = QSharedPointer<QTime>::create(); 0068 time->start(); 0069 auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); 0070 resourceAccess->open(); 0071 return resourceAccess->sendCommand(Sink::Commands::PingCommand).addToContext(resourceAccess).then([time]() { SinkTrace() << "Start complete." << Log::TraceTime(time->elapsed()); }); 0072 } 0073 0074 KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArrayList &resourceIdentifier) 0075 { 0076 SinkTrace() << "flushMessageQueue" << resourceIdentifier; 0077 return KAsync::value(resourceIdentifier) 0078 .template each([](const QByteArray &resource) { 0079 return flushMessageQueue(resource); 0080 }); 0081 } 0082 0083 KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArray &resourceIdentifier) 0084 { 0085 return flush(Flush::FlushUserQueue, resourceIdentifier).then(flush(Flush::FlushSynchronization, resourceIdentifier)); 0086 } 0087 0088 KAsync::Job<void> ResourceControl::flush(Flush::FlushType type, const QByteArray &resourceIdentifier) 0089 { 0090 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resourceIdentifier, ResourceConfig::getResourceType(resourceIdentifier)); 0091 auto notifier = QSharedPointer<Sink::Notifier>::create(resourceAccess); 0092 auto id = createUuid(); 0093 return KAsync::start<void>([=](KAsync::Future<void> &future) { 0094 SinkLog() << "Starting flush " << id; 0095 notifier->registerHandler([&future, id](const Notification ¬ification) { 0096 SinkTrace() << "Received notification: " << notification.type << notification.id; 0097 if (notification.type == Notification::Error && notification.code == ApplicationDomain::ResourceCrashedError) { 0098 SinkWarning() << "Error during flush"; 0099 future.setError(-1, "Error during flush: " + notification.message); 0100 } else if (notification.id == id) { 0101 SinkTrace() << "FlushComplete"; 0102 if (notification.code) { 0103 SinkWarning() << "Flush returned an error"; 0104 future.setError(-1, "Flush returned an error: " + notification.message); 0105 } else { 0106 future.setFinished(); 0107 } 0108 } 0109 }); 0110 resourceAccess->sendFlushCommand(type, id).onError([&future] (const KAsync::Error &error) { 0111 SinkWarning() << "Failed to send command"; 0112 future.setError(1, "Failed to send command: " + error.errorMessage); 0113 }).exec(); 0114 }); 0115 } 0116 0117 KAsync::Job<void> ResourceControl::flushReplayQueue(const QByteArrayList &resourceIdentifier) 0118 { 0119 return KAsync::value(resourceIdentifier) 0120 .template each([](const QByteArray &resource) { 0121 return flushReplayQueue(resource); 0122 }); 0123 } 0124 0125 KAsync::Job<void> ResourceControl::flushReplayQueue(const QByteArray &resourceIdentifier) 0126 { 0127 return flush(Flush::FlushReplayQueue, resourceIdentifier); 0128 } 0129 0130 KAsync::Job<void> ResourceControl::inspect(const Inspection &inspectionCommand, const QByteArray &domainType) 0131 { 0132 auto resourceIdentifier = inspectionCommand.resourceIdentifier; 0133 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resourceIdentifier, ResourceConfig::getResourceType(resourceIdentifier)); 0134 auto notifier = QSharedPointer<Sink::Notifier>::create(resourceAccess); 0135 auto id = createUuid(); 0136 return KAsync::start<void>([=](KAsync::Future<void> &future) { 0137 notifier->registerHandler([&future, id](const Notification ¬ification) { 0138 if (notification.id == id) { 0139 SinkTrace() << "Inspection complete"; 0140 if (notification.code) { 0141 SinkWarning() << "Inspection returned an error"; 0142 future.setError(-1, "Inspection returned an error: " + notification.message); 0143 } else { 0144 future.setFinished(); 0145 } 0146 } 0147 }); 0148 resourceAccess->sendInspectionCommand(inspectionCommand.type, id, domainType, inspectionCommand.entityIdentifier, inspectionCommand.property, inspectionCommand.expectedValue).onError([&future] (const KAsync::Error &error) { 0149 SinkWarning() << "Failed to send command"; 0150 future.setError(1, "Failed to send command: " + error.errorMessage); 0151 }).exec(); 0152 }); 0153 } 0154 0155 0156 } // namespace Sink