File indexing completed on 2024-05-12 05:25:57
0001 /* 0002 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> 0003 * 0004 * This library is free software; you can redistribute it and/or 0005 * modify it under the terms of the GNU Lesser General Public 0006 * License as published by the Free Software Foundation; either 0007 * version 2.1 of the License, or (at your option) version 3, or any 0008 * later version accepted by the membership of KDE e.V. (or its 0009 * successor approved by the membership of KDE e.V.), which shall 0010 * act as a proxy defined in Section 6 of version 3 of the license. 0011 * 0012 * This library is distributed in the hope that it will be useful, 0013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 0014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 0015 * Lesser General Public License for more details. 0016 * 0017 * You should have received a copy of the GNU Lesser General Public 0018 * License along with this library. If not, see <http://www.gnu.org/licenses/>. 0019 */ 0020 #include "changereplay.h" 0021 0022 #include "log.h" 0023 #include "definitions.h" 0024 #include "bufferutils.h" 0025 #include "storage/key.h" 0026 0027 #include <QTimer> 0028 0029 using namespace Sink; 0030 using namespace Sink::Storage; 0031 0032 ChangeReplay::ChangeReplay(const ResourceContext &resourceContext, const Sink::Log::Context &ctx) 0033 : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false), mLogCtx{ctx.subContext("changereplay")} 0034 { 0035 } 0036 0037 qint64 ChangeReplay::getLastReplayedRevision() 0038 { 0039 qint64 lastReplayedRevision = 0; 0040 auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly); 0041 //This only happens if we for some reason can't open the database at all, and we don't have a recovery from that. 0042 if (!replayStoreTransaction) { 0043 SinkErrorCtx(mLogCtx) << "Failed to create a read-only transaction during change-replay"; 0044 std::abort(); 0045 } 0046 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", 0047 [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { 0048 lastReplayedRevision = value.toLongLong(); 0049 return false; 0050 }, 0051 [](const DataStore::Error &) {}); 0052 return lastReplayedRevision; 0053 } 0054 0055 bool ChangeReplay::allChangesReplayed() 0056 { 0057 const qint64 topRevision = DataStore::maxRevision(mStorage.createTransaction(DataStore::ReadOnly, [this](const Sink::Storage::DataStore::Error &error) { 0058 SinkWarningCtx(mLogCtx) << error.message; 0059 })); 0060 const qint64 lastReplayedRevision = getLastReplayedRevision(); 0061 return (lastReplayedRevision >= topRevision); 0062 } 0063 0064 void ChangeReplay::recordReplayedRevision(qint64 revision) 0065 { 0066 auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadWrite, [this](const Sink::Storage::DataStore::Error &error) { 0067 SinkWarningCtx(mLogCtx) << error.message; 0068 }); 0069 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); 0070 replayStoreTransaction.commit(); 0071 }; 0072 0073 KAsync::Job<void> ChangeReplay::replayNextRevision() 0074 { 0075 Q_ASSERT(!mReplayInProgress); 0076 return KAsync::start<void>([this]() { 0077 if (mReplayInProgress) { 0078 SinkErrorCtx(mLogCtx) << "Replay still in progress!!!!!"; 0079 return KAsync::null<void>(); 0080 } 0081 auto lastReplayedRevision = QSharedPointer<qint64>::create(0); 0082 auto topRevision = QSharedPointer<qint64>::create(0); 0083 emit replayingChanges(); 0084 mReplayInProgress = true; 0085 mMainStoreTransaction = mStorage.createTransaction(DataStore::ReadOnly, [this](const DataStore::Error &error) { 0086 SinkWarningCtx(mLogCtx) << error.message; 0087 }); 0088 auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly, [this](const DataStore::Error &error) { 0089 SinkWarningCtx(mLogCtx) << error.message; 0090 }); 0091 Q_ASSERT(mMainStoreTransaction); 0092 Q_ASSERT(replayStoreTransaction); 0093 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", 0094 [lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { 0095 *lastReplayedRevision = value.toLongLong(); 0096 return false; 0097 }, 0098 [](const DataStore::Error &) {}); 0099 *topRevision = DataStore::maxRevision(mMainStoreTransaction); 0100 if (*lastReplayedRevision >= *topRevision) { 0101 SinkTraceCtx(mLogCtx) << "Nothing to replay"; 0102 return KAsync::null(); 0103 } 0104 SinkTraceCtx(mLogCtx) << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; 0105 return KAsync::doWhile( 0106 [this, lastReplayedRevision, topRevision]() -> KAsync::Job<KAsync::ControlFlowFlag> { 0107 if (*lastReplayedRevision >= *topRevision) { 0108 SinkTraceCtx(mLogCtx) << "Done replaying" << *lastReplayedRevision << *topRevision; 0109 return KAsync::value(KAsync::Break); 0110 } 0111 Q_ASSERT(mMainStoreTransaction); 0112 0113 auto replayJob = KAsync::null(); 0114 qint64 revision = *lastReplayedRevision + 1; 0115 while (revision <= *topRevision) { 0116 const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision); 0117 const auto type = DataStore::getTypeFromRevision(mMainStoreTransaction, revision); 0118 if (uid.isNull() || type.isEmpty()) { 0119 SinkErrorCtx(mLogCtx) << "Failed to read uid or type for revison: " << revision << uid << type; 0120 } else { 0121 // TODO: should not use internal representations 0122 const auto key = Storage::Key(uid, revision); 0123 const auto displayKey = key.toDisplayByteArray(); 0124 QByteArray entityBuffer; 0125 DataStore::mainDatabase(mMainStoreTransaction, type) 0126 .scan(revision, 0127 [&entityBuffer](const size_t, const QByteArray &value) -> bool { 0128 entityBuffer = value; 0129 return false; 0130 }, 0131 [this, key](const DataStore::Error &e) { SinkErrorCtx(mLogCtx) << "Failed to read the entity buffer " << key << "error:" << e; }); 0132 0133 if (entityBuffer.isEmpty()) { 0134 SinkErrorCtx(mLogCtx) << "Failed to replay change " << key; 0135 } else { 0136 if (canReplay(type, displayKey, entityBuffer)) { 0137 SinkTraceCtx(mLogCtx) << "Replaying " << displayKey; 0138 replayJob = replay(type, displayKey, entityBuffer); 0139 //Set the last revision we tried to replay 0140 *lastReplayedRevision = revision; 0141 //Execute replay job and commit 0142 break; 0143 } else { 0144 SinkTraceCtx(mLogCtx) << "Not replaying " << key; 0145 notReplaying(type, displayKey, entityBuffer); 0146 //We silently skip over revisions that cannot be replayed, as this is not an error. 0147 } 0148 } 0149 } 0150 //Bump the revision if we failed to even attempt to replay. This will simply skip over those revisions, as we can't recover from those situations. 0151 *lastReplayedRevision = revision; 0152 revision++; 0153 } 0154 return replayJob.then([=](const KAsync::Error &error) { 0155 if (error) { 0156 SinkWarningCtx(mLogCtx) << "Change replay failed: " << error << "Last replayed revision: " << *lastReplayedRevision; 0157 //We're probably not online or so, so postpone retrying 0158 return KAsync::value(KAsync::Break).then(KAsync::error<KAsync::ControlFlowFlag>(error)); 0159 } 0160 SinkTraceCtx(mLogCtx) << "Replayed until: " << *lastReplayedRevision; 0161 0162 recordReplayedRevision(*lastReplayedRevision); 0163 reportProgress(*lastReplayedRevision, *topRevision); 0164 0165 const bool gotMoreToReplay = (*lastReplayedRevision < *topRevision); 0166 if (gotMoreToReplay) { 0167 SinkTraceCtx(mLogCtx) << "Replaying some more..."; 0168 //Replay more if we have more 0169 return KAsync::wait(0).then(KAsync::value(KAsync::Continue)); 0170 } else { 0171 return KAsync::value(KAsync::Break); 0172 } 0173 }).guard(&mGuard); 0174 }); 0175 }) 0176 .then([this](const KAsync::Error &error) { 0177 SinkTraceCtx(mLogCtx) << "Change replay complete."; 0178 mMainStoreTransaction.abort(); 0179 mReplayInProgress = false; 0180 if (ChangeReplay::allChangesReplayed()) { 0181 //In case we have a derived implementation 0182 if (allChangesReplayed()) { 0183 SinkTraceCtx(mLogCtx) << "All changes replayed"; 0184 emit changesReplayed(); 0185 } 0186 } 0187 if (error) { 0188 return KAsync::error(error); 0189 } else { 0190 return KAsync::null(); 0191 } 0192 }).guard(&mGuard); 0193 } 0194 0195 void ChangeReplay::revisionChanged() 0196 { 0197 if (!mReplayInProgress) { 0198 replayNextRevision().exec(); 0199 } 0200 } 0201 0202 #pragma clang diagnostic push 0203 #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" 0204 #include "moc_changereplay.cpp" 0205 #pragma clang diagnostic pop