File indexing completed on 2024-05-12 05:26:01

0001 /*
0002  * Copyright (C) 2019 Christian Mollekopf <mollekopf@kolabsys.com>
0003  *
0004  * This library is free software; you can redistribute it and/or
0005  * modify it under the terms of the GNU Lesser General Public
0006  * License as published by the Free Software Foundation; either
0007  * version 2.1 of the License, or (at your option) version 3, or any
0008  * later version accepted by the membership of KDE e.V. (or its
0009  * successor approved by the membership of KDE e.V.), which shall
0010  * act as a proxy defined in Section 6 of version 3 of the license.
0011  *
0012  * This library is distributed in the hope that it will be useful,
0013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0015  * Lesser General Public License for more details.
0016  *
0017  * You should have received a copy of the GNU Lesser General Public
0018  * License along with this library.  If not, see <http://www.gnu.org/licenses/>.
0019  */
0020 #pragma once
0021 
0022 #include "sink_export.h"
0023 #include <QObject>
0024 #include <QByteArrayList>
0025 #include <string>
0026 #include <functional>
0027 #include <QString>
0028 #include <KAsync/Async>
0029 #include "storage.h"
0030 
0031 /**
0032  * A persistent FIFO message queue.
0033  */
0034 class SINK_EXPORT MessageQueue : public QObject
0035 {
0036     Q_OBJECT
0037 public:
0038     enum ErrorCodes
0039     {
0040         NoMessageFound
0041     };
0042     class Error
0043     {
0044     public:
0045         Error(const QByteArray &s, int c, const QByteArray &m) : store(s), message(m), code(c)
0046         {
0047         }
0048         QByteArray store;
0049         QByteArray message;
0050         int code;
0051     };
0052 
0053     MessageQueue(const QString &storageRoot, const QString &name);
0054     ~MessageQueue();
0055 
0056     QString name() const;
0057 
0058     void startTransaction();
0059     void enqueue(void const *msg, size_t size);
0060     void enqueue(const QByteArray &value);
0061     // Dequeue a message. This will return a new message everytime called.
0062     // Call the result handler with a success response to remove the message from the store.
0063     // TODO track processing progress to avoid processing the same message with the same preprocessor twice?
0064     void dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler, const std::function<void(const Error &error)> &errorHandler);
0065     KAsync::Job<void> dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler);
0066     bool isEmpty();
0067 
0068 public slots:
0069     void commit();
0070 
0071 signals:
0072     void messageReady();
0073     void drained();
0074 
0075 private slots:
0076     void processRemovals();
0077 
0078 private:
0079     Q_DISABLE_COPY(MessageQueue);
0080     Sink::Storage::DataStore mStorage;
0081     Sink::Storage::DataStore::Transaction mWriteTransaction;
0082     qint64 mReplayedRevision;
0083     QString mName;
0084 };