File indexing completed on 2024-05-12 05:25:59

0001 /*
0002  * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
0003  *
0004  * This library is free software; you can redistribute it and/or
0005  * modify it under the terms of the GNU Lesser General Public
0006  * License as published by the Free Software Foundation; either
0007  * version 2.1 of the License, or (at your option) version 3, or any
0008  * later version accepted by the membership of KDE e.V. (or its
0009  * successor approved by the membership of KDE e.V.), which shall
0010  * act as a proxy defined in Section 6 of version 3 of the license.
0011  *
0012  * This library is distributed in the hope that it will be useful,
0013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0015  * Lesser General Public License for more details.
0016  *
0017  * You should have received a copy of the GNU Lesser General Public
0018  * License along with this library.  If not, see <http://www.gnu.org/licenses/>.
0019  */
0020 #include "genericresource.h"
0021 
0022 #include "pipeline.h"
0023 #include "synchronizer.h"
0024 #include "inspector.h"
0025 #include "commandprocessor.h"
0026 #include "definitions.h"
0027 #include "storage.h"
0028 
0029 using namespace Sink;
0030 using namespace Sink::Storage;
0031 
0032 GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline )
0033     : Sink::Resource(),
0034       mResourceContext(resourceContext),
0035       mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceContext, Log::Context{})),
0036       mProcessor(QSharedPointer<CommandProcessor>::create(mPipeline.data(), resourceContext.instanceId(), Log::Context{})),
0037       mError(0),
0038       mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
0039 {
0040     QObject::connect(mProcessor.data(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
0041     QObject::connect(mProcessor.data(), &CommandProcessor::notify, this, &GenericResource::notify);
0042     QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
0043 }
0044 
0045 GenericResource::~GenericResource()
0046 {
0047 }
0048 
0049 void GenericResource::setSecret(const QString &s)
0050 {
0051     if (mSynchronizer) {
0052         mSynchronizer->setSecret(s);
0053     }
0054     if (mInspector) {
0055         mInspector->setSecret(s);
0056     }
0057 }
0058 
0059 bool GenericResource::checkForUpgrade()
0060 {
0061     const auto currentDatabaseVersion = [&] {
0062         auto store = Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadOnly);
0063         //We rely on the store already having been created in the pipeline constructor before this get's called.
0064         Q_ASSERT(store.exists());
0065         const auto transaction = store.createTransaction(Storage::DataStore::ReadOnly);
0066         //If we can't create a read-only transaction something is wrong and we have to exit.
0067         //Otherwise we risk deleting data due to a temporary issue.
0068         if (!transaction) {
0069             SinkError() << "Failed to create a read-only transaction during upgrade check";
0070             std::abort();
0071         }
0072         return Storage::DataStore::databaseVersion(transaction);
0073     }();
0074     if (currentDatabaseVersion != Sink::latestDatabaseVersion()) {
0075         SinkLog() << "Starting database upgrade from " << currentDatabaseVersion << " to " << Sink::latestDatabaseVersion();
0076 
0077         bool nukeDatabases = false;
0078         //Only apply the necessary updates.
0079         for (int i = currentDatabaseVersion; i < Sink::latestDatabaseVersion(); i++) {
0080             //TODO implement specific upgrade paths where applicable, and only nuke otherwise
0081             nukeDatabases = true;
0082         }
0083         if (nukeDatabases) {
0084             SinkLog() << "Wiping all databases during upgrade, you will have to resync.";
0085             //Right now upgrading just means removing all local storage so we will resync
0086             GenericResource::removeFromDisk(mResourceContext.instanceId());
0087         }
0088         auto store = Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadWrite);
0089         auto t = store.createTransaction(Storage::DataStore::ReadWrite);
0090         Storage::DataStore::setDatabaseVersion(t, Sink::latestDatabaseVersion());
0091         SinkLog() << "Finished database upgrade to " << Sink::latestDatabaseVersion();
0092         return true;
0093     }
0094     return false;
0095 }
0096 
0097 void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors)
0098 {
0099     mPipeline->setPreprocessors(type, preprocessors);
0100 }
0101 
0102 void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer)
0103 {
0104     mSynchronizer = synchronizer;
0105     mProcessor->setSynchronizer(synchronizer);
0106     QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection);
0107     QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
0108     QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection);
0109 }
0110 
0111 void GenericResource::setupInspector(const QSharedPointer<Inspector> &inspector)
0112 {
0113     mInspector = inspector;
0114     mProcessor->setInspector(inspector);
0115 }
0116 
0117 void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier)
0118 {
0119     Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadWrite).removeFromDisk();
0120     Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk();
0121     Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk();
0122     Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::DataStore::ReadWrite).removeFromDisk();
0123     Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::DataStore::ReadWrite).removeFromDisk();
0124 }
0125 
0126 qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier)
0127 {
0128     auto size = Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadOnly).diskUsage();
0129     size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::DataStore::ReadOnly).diskUsage();
0130     size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::DataStore::ReadOnly).diskUsage();
0131     size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::DataStore::ReadOnly).diskUsage();
0132     size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::DataStore::ReadOnly).diskUsage();
0133     return size;
0134 }
0135 
0136 void GenericResource::onProcessorError(int errorCode, const QString &errorMessage)
0137 {
0138     SinkWarning() << "Received error from Processor: " << errorCode << errorMessage;
0139     mError = errorCode;
0140 }
0141 
0142 int GenericResource::error() const
0143 {
0144     return mError;
0145 }
0146 
0147 void GenericResource::processCommand(int commandId, const QByteArray &data)
0148 {
0149     mProcessor->processCommand(commandId, data);
0150 }
0151 
0152 KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &query)
0153 {
0154     mSynchronizer->synchronize(query);
0155     return KAsync::null<void>();
0156 }
0157 
0158 KAsync::Job<void> GenericResource::processAllMessages()
0159 {
0160     return mProcessor->processAllMessages();
0161 }
0162 
0163 void GenericResource::updateLowerBoundRevision()
0164 {
0165     mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mSynchronizer->getLastReplayedRevision()));
0166 }
0167 
0168 void GenericResource::setLowerBoundRevision(qint64 revision)
0169 {
0170     SinkTrace() << "Updating client lower bound revision:" << revision;
0171     mClientLowerBoundRevision = revision;
0172     updateLowerBoundRevision();
0173 }
0174