File indexing completed on 2024-05-12 05:25:59
0001 /* 0002 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> 0003 * 0004 * This library is free software; you can redistribute it and/or 0005 * modify it under the terms of the GNU Lesser General Public 0006 * License as published by the Free Software Foundation; either 0007 * version 2.1 of the License, or (at your option) version 3, or any 0008 * later version accepted by the membership of KDE e.V. (or its 0009 * successor approved by the membership of KDE e.V.), which shall 0010 * act as a proxy defined in Section 6 of version 3 of the license. 0011 * 0012 * This library is distributed in the hope that it will be useful, 0013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 0014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 0015 * Lesser General Public License for more details. 0016 * 0017 * You should have received a copy of the GNU Lesser General Public 0018 * License along with this library. If not, see <http://www.gnu.org/licenses/>. 0019 */ 0020 #include "genericresource.h" 0021 0022 #include "pipeline.h" 0023 #include "synchronizer.h" 0024 #include "inspector.h" 0025 #include "commandprocessor.h" 0026 #include "definitions.h" 0027 #include "storage.h" 0028 0029 using namespace Sink; 0030 using namespace Sink::Storage; 0031 0032 GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline ) 0033 : Sink::Resource(), 0034 mResourceContext(resourceContext), 0035 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceContext, Log::Context{})), 0036 mProcessor(QSharedPointer<CommandProcessor>::create(mPipeline.data(), resourceContext.instanceId(), Log::Context{})), 0037 mError(0), 0038 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 0039 { 0040 QObject::connect(mProcessor.data(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 0041 QObject::connect(mProcessor.data(), &CommandProcessor::notify, this, &GenericResource::notify); 0042 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 0043 } 0044 0045 GenericResource::~GenericResource() 0046 { 0047 } 0048 0049 void GenericResource::setSecret(const QString &s) 0050 { 0051 if (mSynchronizer) { 0052 mSynchronizer->setSecret(s); 0053 } 0054 if (mInspector) { 0055 mInspector->setSecret(s); 0056 } 0057 } 0058 0059 bool GenericResource::checkForUpgrade() 0060 { 0061 const auto currentDatabaseVersion = [&] { 0062 auto store = Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadOnly); 0063 //We rely on the store already having been created in the pipeline constructor before this get's called. 0064 Q_ASSERT(store.exists()); 0065 const auto transaction = store.createTransaction(Storage::DataStore::ReadOnly); 0066 //If we can't create a read-only transaction something is wrong and we have to exit. 0067 //Otherwise we risk deleting data due to a temporary issue. 0068 if (!transaction) { 0069 SinkError() << "Failed to create a read-only transaction during upgrade check"; 0070 std::abort(); 0071 } 0072 return Storage::DataStore::databaseVersion(transaction); 0073 }(); 0074 if (currentDatabaseVersion != Sink::latestDatabaseVersion()) { 0075 SinkLog() << "Starting database upgrade from " << currentDatabaseVersion << " to " << Sink::latestDatabaseVersion(); 0076 0077 bool nukeDatabases = false; 0078 //Only apply the necessary updates. 0079 for (int i = currentDatabaseVersion; i < Sink::latestDatabaseVersion(); i++) { 0080 //TODO implement specific upgrade paths where applicable, and only nuke otherwise 0081 nukeDatabases = true; 0082 } 0083 if (nukeDatabases) { 0084 SinkLog() << "Wiping all databases during upgrade, you will have to resync."; 0085 //Right now upgrading just means removing all local storage so we will resync 0086 GenericResource::removeFromDisk(mResourceContext.instanceId()); 0087 } 0088 auto store = Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadWrite); 0089 auto t = store.createTransaction(Storage::DataStore::ReadWrite); 0090 Storage::DataStore::setDatabaseVersion(t, Sink::latestDatabaseVersion()); 0091 SinkLog() << "Finished database upgrade to " << Sink::latestDatabaseVersion(); 0092 return true; 0093 } 0094 return false; 0095 } 0096 0097 void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors) 0098 { 0099 mPipeline->setPreprocessors(type, preprocessors); 0100 } 0101 0102 void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer) 0103 { 0104 mSynchronizer = synchronizer; 0105 mProcessor->setSynchronizer(synchronizer); 0106 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); 0107 QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); 0108 QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); 0109 } 0110 0111 void GenericResource::setupInspector(const QSharedPointer<Inspector> &inspector) 0112 { 0113 mInspector = inspector; 0114 mProcessor->setInspector(inspector); 0115 } 0116 0117 void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) 0118 { 0119 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadWrite).removeFromDisk(); 0120 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); 0121 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); 0122 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); 0123 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); 0124 } 0125 0126 qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier) 0127 { 0128 auto size = Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadOnly).diskUsage(); 0129 size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::DataStore::ReadOnly).diskUsage(); 0130 size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::DataStore::ReadOnly).diskUsage(); 0131 size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::DataStore::ReadOnly).diskUsage(); 0132 size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::DataStore::ReadOnly).diskUsage(); 0133 return size; 0134 } 0135 0136 void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) 0137 { 0138 SinkWarning() << "Received error from Processor: " << errorCode << errorMessage; 0139 mError = errorCode; 0140 } 0141 0142 int GenericResource::error() const 0143 { 0144 return mError; 0145 } 0146 0147 void GenericResource::processCommand(int commandId, const QByteArray &data) 0148 { 0149 mProcessor->processCommand(commandId, data); 0150 } 0151 0152 KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &query) 0153 { 0154 mSynchronizer->synchronize(query); 0155 return KAsync::null<void>(); 0156 } 0157 0158 KAsync::Job<void> GenericResource::processAllMessages() 0159 { 0160 return mProcessor->processAllMessages(); 0161 } 0162 0163 void GenericResource::updateLowerBoundRevision() 0164 { 0165 mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mSynchronizer->getLastReplayedRevision())); 0166 } 0167 0168 void GenericResource::setLowerBoundRevision(qint64 revision) 0169 { 0170 SinkTrace() << "Updating client lower bound revision:" << revision; 0171 mClientLowerBoundRevision = revision; 0172 updateLowerBoundRevision(); 0173 } 0174