File indexing completed on 2024-05-12 05:25:57

0001 /*
0002  * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm>
0003  *
0004  * This library is free software; you can redistribute it and/or
0005  * modify it under the terms of the GNU Lesser General Public
0006  * License as published by the Free Software Foundation; either
0007  * version 2.1 of the License, or (at your option) version 3, or any
0008  * later version accepted by the membership of KDE e.V. (or its
0009  * successor approved by the membership of KDE e.V.), which shall
0010  * act as a proxy defined in Section 6 of version 3 of the license.
0011  *
0012  * This library is distributed in the hope that it will be useful,
0013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0015  * Lesser General Public License for more details.
0016  *
0017  * You should have received a copy of the GNU Lesser General Public
0018  * License along with this library.  If not, see <http://www.gnu.org/licenses/>.
0019  */
0020 #pragma once
0021 
0022 #include "sink_export.h"
0023 
0024 #include <QObject>
0025 #include <QTimer>
0026 #include <QTime>
0027 #include <KAsync/Async>
0028 #include <functional>
0029 
0030 #include "log.h"
0031 #include "notification.h"
0032 #include "messagequeue.h"
0033 
0034 namespace Sink {
0035     class Pipeline;
0036     class Inspector;
0037     class Synchronizer;
0038     struct QueuedCommand;
0039     class QueryBase;
0040 
0041 /**
0042  * Drives the pipeline using the output from all command queues
0043  */
0044 class SINK_EXPORT CommandProcessor : public QObject
0045 {
0046     Q_OBJECT
0047 
0048 public:
0049     CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId, const Sink::Log::Context &ctx);
0050 
0051     void setOldestUsedRevision(qint64 revision);
0052 
0053     void setInspector(const QSharedPointer<Inspector> &inspector);
0054     void setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer);
0055 
0056     void processCommand(int commandId, const QByteArray &data);
0057 
0058     KAsync::Job<void> processAllMessages();
0059 
0060 signals:
0061     void notify(Notification);
0062     void error(int errorCode, const QString &errorMessage);
0063 
0064 private:
0065     bool messagesToProcessAvailable();
0066 
0067 private slots:
0068     void process();
0069     KAsync::Job<qint64> processQueuedCommand(const Sink::QueuedCommand &queuedCommand);
0070     KAsync::Job<qint64> processQueuedCommand(const QByteArray &data);
0071     // Process all messages of this queue
0072     KAsync::Job<void> processQueue(MessageQueue *queue);
0073     KAsync::Job<void> processPipeline();
0074 
0075 private:
0076     void processFlushCommand(const QByteArray &data);
0077     void processSynchronizeCommand(const QByteArray &data);
0078     // void processRevisionReplayedCommand(const QByteArray &data);
0079 
0080     KAsync::Job<void> flush(void const *command, size_t size);
0081 
0082     Sink::Log::Context mLogCtx;
0083     Sink::Pipeline *mPipeline;
0084     MessageQueue mUserQueue;
0085     MessageQueue mSynchronizerQueue;
0086     // Ordered by priority
0087     QList<MessageQueue *> mCommandQueues;
0088     bool mProcessingLock;
0089     // The lowest revision we no longer need
0090     qint64 mLowerBoundRevision;
0091     QSharedPointer<Synchronizer> mSynchronizer;
0092     QSharedPointer<Inspector> mInspector;
0093     QTimer mCommitQueueTimer;
0094     QTime mTime;
0095     QVector<QByteArray> mCompleteFlushes;
0096 };
0097 
0098 };