File indexing completed on 2024-04-28 05:24:11

0001 
0002 namespace async {
0003     template<typename T>
0004     KAsync::Job<T> run(const std::function<T()> &f)
0005     {
0006         return KAsync::start<T>([f](KAsync::Future<T> &future) {
0007             auto result = QtConcurrent::run(f);
0008             auto watcher = new QFutureWatcher<T>;
0009             watcher->setFuture(result);
0010             QObject::connect(watcher, &QFutureWatcher<T>::finished, watcher, [&future, watcher]() {
0011                 future.setValue(watcher->future().result());
0012                 delete watcher;
0013                 future.setFinished();
0014             });
0015         });
0016     }
0017 
0018 }
0019 
0020 void run()
0021 {
0022     return KAsync::start<T>([f](KAsync::Future<T> &future) {
0023         auto result = QtConcurrent::run(f);
0024         auto watcher = new QFutureWatcher<T>;
0025         watcher->setFuture(result);
0026         QObject::connect(watcher, &QFutureWatcher<T>::finished, watcher, [&future]() {
0027             future.setFinished();
0028         });
0029     });
0030 }
0031 
0032 class ResourceAccessFactory {
0033 public:
0034     static ResourceAccessFactory &instance()
0035     {
0036         static ResourceAccessFactory *instance = 0;
0037         if (!instance) {
0038             instance = new ResourceAccessFactory;
0039         }
0040         return *instance;
0041     }
0042 
0043     Sink::ResourceAccess::Ptr getAccess(const QByteArray &instanceIdentifier)
0044     {
0045         if (!mCache.contains(instanceIdentifier)) {
0046             //Reuse the pointer if something else kept the resourceaccess alive
0047             if (mWeakCache.contains(instanceIdentifier)) {
0048                 auto sharedPointer = mWeakCache.value(instanceIdentifier).toStrongRef();
0049                 if (sharedPointer) {
0050                     mCache.insert(instanceIdentifier, sharedPointer);
0051                 }
0052             }
0053             if (!mCache.contains(instanceIdentifier)) {
0054                 //Create a new instance if necessary
0055                 auto sharedPointer = Sink::ResourceAccess::Ptr::create(instanceIdentifier);
0056                 QObject::connect(sharedPointer.data(), &Sink::ResourceAccess::ready, sharedPointer.data(), [this, instanceIdentifier](bool ready) {
0057                     if (!ready) {
0058                         mCache.remove(instanceIdentifier);
0059                     }
0060                 });
0061                 mCache.insert(instanceIdentifier, sharedPointer);
0062                 mWeakCache.insert(instanceIdentifier, sharedPointer);
0063             }
0064         }
0065         if (!mTimer.contains(instanceIdentifier)) {
0066             auto timer = new QTimer;
0067             //Drop connection after 3 seconds (which is a random value)
0068             QObject::connect(timer, &QTimer::timeout, timer, [this, instanceIdentifier]() {
0069                 mCache.remove(instanceIdentifier);
0070             });
0071             timer->setInterval(3000);
0072             mTimer.insert(instanceIdentifier, timer);
0073         }
0074         auto timer = mTimer.value(instanceIdentifier);
0075         timer->start();
0076         return mCache.value(instanceIdentifier);
0077     }
0078 
0079     QHash<QByteArray, QWeakPointer<Sink::ResourceAccess> > mWeakCache;
0080     QHash<QByteArray, Sink::ResourceAccess::Ptr> mCache;
0081     QHash<QByteArray, QTimer*> mTimer;
0082 };
0083 
0084 class ChangeReplay : public QObject
0085 {
0086     Q_OBJECT
0087 public:
0088 
0089     typedef std::function<KAsync::Job<void>(const QByteArray &type, const QByteArray &key, const QByteArray &value)> ReplayFunction;
0090 
0091     ChangeReplay(const QString &resourceName, const ReplayFunction &replayFunction)
0092         : mStorage(storageLocation(), resourceName, Storage::ReadOnly),
0093         mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite),
0094         mReplayFunction(replayFunction)
0095     {
0096 
0097     }
0098 
0099     qint64 getLastReplayedRevision()
0100     {
0101         qint64 lastReplayedRevision = 0;
0102         auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly);
0103         replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
0104             lastReplayedRevision = value.toLongLong();
0105             return false;
0106         }, [](const Storage::Error &) {
0107         });
0108         return lastReplayedRevision;
0109     }
0110 
0111     bool allChangesReplayed()
0112     {
0113         const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly));
0114         const qint64 lastReplayedRevision = getLastReplayedRevision();
0115         Trace() << "All changes replayed " << topRevision << lastReplayedRevision;
0116         return (lastReplayedRevision >= topRevision);
0117     }
0118 
0119 signals:
0120     void changesReplayed();
0121 
0122 public slots:
0123     void revisionChanged()
0124     {
0125         auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly);
0126         auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite);
0127         qint64 lastReplayedRevision = 1;
0128         replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
0129             lastReplayedRevision = value.toLongLong();
0130             return false;
0131         }, [](const Storage::Error &) {
0132         });
0133         const qint64 topRevision = Storage::maxRevision(mainStoreTransaction);
0134 
0135         Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision;
0136         if (lastReplayedRevision <= topRevision) {
0137             qint64 revision = lastReplayedRevision;
0138             for (;revision <= topRevision; revision++) {
0139                 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision);
0140                 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision);
0141                 const auto key = Storage::assembleKey(uid, revision);
0142                 mainStoreTransaction.openDatabase(type + ".main").scan(key, [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool {
0143                     mReplayFunction(type, key, value).exec();
0144                     //TODO make for loop async, and pass to async replay function together with type
0145                     Trace() << "Replaying " << key;
0146                     return false;
0147                 }, [key](const Storage::Error &) {
0148                     ErrorMsg() << "Failed to replay change " << key;
0149                 });
0150             }
0151             revision--;
0152             replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision));
0153             replayStoreTransaction.commit();
0154             Trace() << "Replayed until " << revision;
0155         }
0156         emit changesReplayed();
0157     }
0158 
0159 private:
0160     Sink::Storage mStorage;
0161     Sink::Storage mChangeReplayStore;
0162     ReplayFunction mReplayFunction;
0163 };
0164 
0165 KAsync::Job<void> processPipeline()
0166 {
0167     mPipeline->startTransaction();
0168     Trace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision;
0169     for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) {
0170         mPipeline->cleanupRevision(revision);
0171     }
0172     mPipeline->commit();
0173 
0174     //Go through all message queues
0175     auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues);
0176     return KAsync::doWhile(
0177         [it]() { return it->hasNext(); },
0178         [it, this](KAsync::Future<void> &future) {
0179             auto queue = it->next();
0180             processQueue(queue).then<void>([&future]() {
0181                 Trace() << "Queue processed";
0182                 future.setFinished();
0183             }).exec();
0184         }
0185     );
0186 }
0187