File indexing completed on 2025-10-19 05:10:01

0001 /*
0002  * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
0003  *
0004  * This library is free software; you can redistribute it and/or
0005  * modify it under the terms of the GNU Lesser General Public
0006  * License as published by the Free Software Foundation; either
0007  * version 2.1 of the License, or (at your option) version 3, or any
0008  * later version accepted by the membership of KDE e.V. (or its
0009  * successor approved by the membership of KDE e.V.), which shall
0010  * act as a proxy defined in Section 6 of version 3 of the license.
0011  *
0012  * This library is distributed in the hope that it will be useful,
0013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0015  * Lesser General Public License for more details.
0016  *
0017  * You should have received a copy of the GNU Lesser General Public
0018  * License along with this library.  If not, see <http://www.gnu.org/licenses/>.
0019  */
0020 #include "changereplay.h"
0021 
0022 #include "log.h"
0023 #include "definitions.h"
0024 #include "bufferutils.h"
0025 #include "storage/key.h"
0026 
0027 #include <QTimer>
0028 
0029 using namespace Sink;
0030 using namespace Sink::Storage;
0031 
0032 ChangeReplay::ChangeReplay(const ResourceContext &resourceContext, const Sink::Log::Context &ctx)
0033     : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false), mLogCtx{ctx.subContext("changereplay")}
0034 {
0035 }
0036 
0037 qint64 ChangeReplay::getLastReplayedRevision()
0038 {
0039     qint64 lastReplayedRevision = 0;
0040     auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly);
0041     //This only happens if we for some reason can't open the database at all, and we don't have a recovery from that.
0042     if (!replayStoreTransaction) {
0043         SinkErrorCtx(mLogCtx) << "Failed to create a read-only transaction during change-replay";
0044         std::abort();
0045     }
0046     replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
0047         [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
0048             lastReplayedRevision = value.toLongLong();
0049             return false;
0050         },
0051         [](const DataStore::Error &) {});
0052     return lastReplayedRevision;
0053 }
0054 
0055 bool ChangeReplay::allChangesReplayed()
0056 {
0057     const qint64 topRevision = DataStore::maxRevision(mStorage.createTransaction(DataStore::ReadOnly, [this](const Sink::Storage::DataStore::Error &error) {
0058         SinkWarningCtx(mLogCtx) << error.message;
0059     }));
0060     const qint64 lastReplayedRevision = getLastReplayedRevision();
0061     return (lastReplayedRevision >= topRevision);
0062 }
0063 
0064 void ChangeReplay::recordReplayedRevision(qint64 revision)
0065 {
0066     auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadWrite, [this](const Sink::Storage::DataStore::Error &error) {
0067         SinkWarningCtx(mLogCtx) << error.message;
0068     });
0069     replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision));
0070     replayStoreTransaction.commit();
0071 };
0072 
0073 KAsync::Job<void> ChangeReplay::replayNextRevision()
0074 {
0075     Q_ASSERT(!mReplayInProgress);
0076     return KAsync::start<void>([this]() {
0077             if (mReplayInProgress) {
0078                 SinkErrorCtx(mLogCtx) << "Replay still in progress!!!!!";
0079                 return KAsync::null<void>();
0080             }
0081             auto lastReplayedRevision = QSharedPointer<qint64>::create(0);
0082             auto topRevision = QSharedPointer<qint64>::create(0);
0083             emit replayingChanges();
0084             mReplayInProgress = true;
0085             mMainStoreTransaction = mStorage.createTransaction(DataStore::ReadOnly, [this](const DataStore::Error &error) {
0086                 SinkWarningCtx(mLogCtx) << error.message;
0087             });
0088             auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly, [this](const DataStore::Error &error) {
0089                 SinkWarningCtx(mLogCtx) << error.message;
0090             });
0091             Q_ASSERT(mMainStoreTransaction);
0092             Q_ASSERT(replayStoreTransaction);
0093             replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
0094                 [lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
0095                     *lastReplayedRevision = value.toLongLong();
0096                     return false;
0097                 },
0098                 [](const DataStore::Error &) {});
0099             *topRevision = DataStore::maxRevision(mMainStoreTransaction);
0100             if (*lastReplayedRevision >= *topRevision) {
0101                 SinkTraceCtx(mLogCtx) << "Nothing to replay";
0102                 return KAsync::null();
0103             }
0104             SinkTraceCtx(mLogCtx) << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision;
0105             return KAsync::doWhile(
0106                 [this, lastReplayedRevision, topRevision]() -> KAsync::Job<KAsync::ControlFlowFlag> {
0107                     if (*lastReplayedRevision >= *topRevision) {
0108                         SinkTraceCtx(mLogCtx) << "Done replaying" << *lastReplayedRevision << *topRevision;
0109                         return KAsync::value(KAsync::Break);
0110                     }
0111                     Q_ASSERT(mMainStoreTransaction);
0112 
0113                     auto replayJob = KAsync::null();
0114                     qint64 revision = *lastReplayedRevision + 1;
0115                     while (revision <= *topRevision) {
0116                         const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision);
0117                         const auto type = DataStore::getTypeFromRevision(mMainStoreTransaction, revision);
0118                         if (uid.isNull() || type.isEmpty()) {
0119                             SinkErrorCtx(mLogCtx) << "Failed to read uid or type for revison: " << revision << uid << type;
0120                         } else {
0121                             // TODO: should not use internal representations
0122                             const auto key = Storage::Key(uid, revision);
0123                             const auto displayKey = key.toDisplayByteArray();
0124                             QByteArray entityBuffer;
0125                             DataStore::mainDatabase(mMainStoreTransaction, type)
0126                                 .scan(revision,
0127                                     [&entityBuffer](const size_t, const QByteArray &value) -> bool {
0128                                         entityBuffer = value;
0129                                         return false;
0130                                     },
0131                                     [this, key](const DataStore::Error &e) { SinkErrorCtx(mLogCtx) << "Failed to read the entity buffer " << key << "error:" << e; });
0132 
0133                             if (entityBuffer.isEmpty()) {
0134                                 SinkErrorCtx(mLogCtx) << "Failed to replay change " << key;
0135                             } else {
0136                                 if (canReplay(type, displayKey, entityBuffer)) {
0137                                     SinkTraceCtx(mLogCtx) << "Replaying " << displayKey;
0138                                     replayJob = replay(type, displayKey, entityBuffer);
0139                                     //Set the last revision we tried to replay
0140                                     *lastReplayedRevision = revision;
0141                                     //Execute replay job and commit
0142                                     break;
0143                                 } else {
0144                                     SinkTraceCtx(mLogCtx) << "Not replaying " << key;
0145                                     notReplaying(type, displayKey, entityBuffer);
0146                                     //We silently skip over revisions that cannot be replayed, as this is not an error.
0147                                 }
0148                             }
0149                         }
0150                         //Bump the revision if we failed to even attempt to replay. This will simply skip over those revisions, as we can't recover from those situations.
0151                         *lastReplayedRevision = revision;
0152                         revision++;
0153                     }
0154                     return replayJob.then([=](const KAsync::Error &error) {
0155                         if (error) {
0156                             SinkWarningCtx(mLogCtx) << "Change replay failed: " << error  << "Last replayed revision: "  << *lastReplayedRevision;
0157                             //We're probably not online or so, so postpone retrying
0158                             return KAsync::value(KAsync::Break).then(KAsync::error<KAsync::ControlFlowFlag>(error));
0159                         }
0160                         SinkTraceCtx(mLogCtx) << "Replayed until: " << *lastReplayedRevision;
0161 
0162                         recordReplayedRevision(*lastReplayedRevision);
0163                         reportProgress(*lastReplayedRevision, *topRevision);
0164 
0165                         const bool gotMoreToReplay = (*lastReplayedRevision < *topRevision);
0166                         if (gotMoreToReplay) {
0167                             SinkTraceCtx(mLogCtx) << "Replaying some more...";
0168                             //Replay more if we have more
0169                             return KAsync::wait(0).then(KAsync::value(KAsync::Continue));
0170                         } else {
0171                             return KAsync::value(KAsync::Break);
0172                         }
0173                     }).guard(&mGuard);
0174             });
0175         })
0176         .then([this](const KAsync::Error &error) {
0177             SinkTraceCtx(mLogCtx) << "Change replay complete.";
0178             mMainStoreTransaction.abort();
0179             mReplayInProgress = false;
0180             if (ChangeReplay::allChangesReplayed()) {
0181                 //In case we have a derived implementation
0182                 if (allChangesReplayed()) {
0183                     SinkTraceCtx(mLogCtx) << "All changes replayed";
0184                     emit changesReplayed();
0185                 }
0186             }
0187             if (error) {
0188                 return KAsync::error(error);
0189             } else {
0190                 return KAsync::null();
0191             }
0192         }).guard(&mGuard);
0193 }
0194 
0195 void ChangeReplay::revisionChanged()
0196 {
0197     if (!mReplayInProgress) {
0198         replayNextRevision().exec();
0199     }
0200 }
0201 
0202 #pragma clang diagnostic push
0203 #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast"
0204 #include "moc_changereplay.cpp"
0205 #pragma clang diagnostic pop