File indexing completed on 2024-04-28 05:24:11
0001 0002 namespace async { 0003 template<typename T> 0004 KAsync::Job<T> run(const std::function<T()> &f) 0005 { 0006 return KAsync::start<T>([f](KAsync::Future<T> &future) { 0007 auto result = QtConcurrent::run(f); 0008 auto watcher = new QFutureWatcher<T>; 0009 watcher->setFuture(result); 0010 QObject::connect(watcher, &QFutureWatcher<T>::finished, watcher, [&future, watcher]() { 0011 future.setValue(watcher->future().result()); 0012 delete watcher; 0013 future.setFinished(); 0014 }); 0015 }); 0016 } 0017 0018 } 0019 0020 void run() 0021 { 0022 return KAsync::start<T>([f](KAsync::Future<T> &future) { 0023 auto result = QtConcurrent::run(f); 0024 auto watcher = new QFutureWatcher<T>; 0025 watcher->setFuture(result); 0026 QObject::connect(watcher, &QFutureWatcher<T>::finished, watcher, [&future]() { 0027 future.setFinished(); 0028 }); 0029 }); 0030 } 0031 0032 class ResourceAccessFactory { 0033 public: 0034 static ResourceAccessFactory &instance() 0035 { 0036 static ResourceAccessFactory *instance = 0; 0037 if (!instance) { 0038 instance = new ResourceAccessFactory; 0039 } 0040 return *instance; 0041 } 0042 0043 Sink::ResourceAccess::Ptr getAccess(const QByteArray &instanceIdentifier) 0044 { 0045 if (!mCache.contains(instanceIdentifier)) { 0046 //Reuse the pointer if something else kept the resourceaccess alive 0047 if (mWeakCache.contains(instanceIdentifier)) { 0048 auto sharedPointer = mWeakCache.value(instanceIdentifier).toStrongRef(); 0049 if (sharedPointer) { 0050 mCache.insert(instanceIdentifier, sharedPointer); 0051 } 0052 } 0053 if (!mCache.contains(instanceIdentifier)) { 0054 //Create a new instance if necessary 0055 auto sharedPointer = Sink::ResourceAccess::Ptr::create(instanceIdentifier); 0056 QObject::connect(sharedPointer.data(), &Sink::ResourceAccess::ready, sharedPointer.data(), [this, instanceIdentifier](bool ready) { 0057 if (!ready) { 0058 mCache.remove(instanceIdentifier); 0059 } 0060 }); 0061 mCache.insert(instanceIdentifier, sharedPointer); 0062 mWeakCache.insert(instanceIdentifier, sharedPointer); 0063 } 0064 } 0065 if (!mTimer.contains(instanceIdentifier)) { 0066 auto timer = new QTimer; 0067 //Drop connection after 3 seconds (which is a random value) 0068 QObject::connect(timer, &QTimer::timeout, timer, [this, instanceIdentifier]() { 0069 mCache.remove(instanceIdentifier); 0070 }); 0071 timer->setInterval(3000); 0072 mTimer.insert(instanceIdentifier, timer); 0073 } 0074 auto timer = mTimer.value(instanceIdentifier); 0075 timer->start(); 0076 return mCache.value(instanceIdentifier); 0077 } 0078 0079 QHash<QByteArray, QWeakPointer<Sink::ResourceAccess> > mWeakCache; 0080 QHash<QByteArray, Sink::ResourceAccess::Ptr> mCache; 0081 QHash<QByteArray, QTimer*> mTimer; 0082 }; 0083 0084 class ChangeReplay : public QObject 0085 { 0086 Q_OBJECT 0087 public: 0088 0089 typedef std::function<KAsync::Job<void>(const QByteArray &type, const QByteArray &key, const QByteArray &value)> ReplayFunction; 0090 0091 ChangeReplay(const QString &resourceName, const ReplayFunction &replayFunction) 0092 : mStorage(storageLocation(), resourceName, Storage::ReadOnly), 0093 mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), 0094 mReplayFunction(replayFunction) 0095 { 0096 0097 } 0098 0099 qint64 getLastReplayedRevision() 0100 { 0101 qint64 lastReplayedRevision = 0; 0102 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); 0103 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { 0104 lastReplayedRevision = value.toLongLong(); 0105 return false; 0106 }, [](const Storage::Error &) { 0107 }); 0108 return lastReplayedRevision; 0109 } 0110 0111 bool allChangesReplayed() 0112 { 0113 const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly)); 0114 const qint64 lastReplayedRevision = getLastReplayedRevision(); 0115 Trace() << "All changes replayed " << topRevision << lastReplayedRevision; 0116 return (lastReplayedRevision >= topRevision); 0117 } 0118 0119 signals: 0120 void changesReplayed(); 0121 0122 public slots: 0123 void revisionChanged() 0124 { 0125 auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly); 0126 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite); 0127 qint64 lastReplayedRevision = 1; 0128 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { 0129 lastReplayedRevision = value.toLongLong(); 0130 return false; 0131 }, [](const Storage::Error &) { 0132 }); 0133 const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); 0134 0135 Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; 0136 if (lastReplayedRevision <= topRevision) { 0137 qint64 revision = lastReplayedRevision; 0138 for (;revision <= topRevision; revision++) { 0139 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); 0140 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); 0141 const auto key = Storage::assembleKey(uid, revision); 0142 mainStoreTransaction.openDatabase(type + ".main").scan(key, [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { 0143 mReplayFunction(type, key, value).exec(); 0144 //TODO make for loop async, and pass to async replay function together with type 0145 Trace() << "Replaying " << key; 0146 return false; 0147 }, [key](const Storage::Error &) { 0148 ErrorMsg() << "Failed to replay change " << key; 0149 }); 0150 } 0151 revision--; 0152 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); 0153 replayStoreTransaction.commit(); 0154 Trace() << "Replayed until " << revision; 0155 } 0156 emit changesReplayed(); 0157 } 0158 0159 private: 0160 Sink::Storage mStorage; 0161 Sink::Storage mChangeReplayStore; 0162 ReplayFunction mReplayFunction; 0163 }; 0164 0165 KAsync::Job<void> processPipeline() 0166 { 0167 mPipeline->startTransaction(); 0168 Trace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision; 0169 for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) { 0170 mPipeline->cleanupRevision(revision); 0171 } 0172 mPipeline->commit(); 0173 0174 //Go through all message queues 0175 auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); 0176 return KAsync::doWhile( 0177 [it]() { return it->hasNext(); }, 0178 [it, this](KAsync::Future<void> &future) { 0179 auto queue = it->next(); 0180 processQueue(queue).then<void>([&future]() { 0181 Trace() << "Queue processed"; 0182 future.setFinished(); 0183 }).exec(); 0184 } 0185 ); 0186 } 0187