File indexing completed on 2024-05-12 05:26:01

0001 /*
0002  * Copyright (C) 2019 Christian Mollekopf <mollekopf@kolabsys.com>
0003  *
0004  * This library is free software; you can redistribute it and/or
0005  * modify it under the terms of the GNU Lesser General Public
0006  * License as published by the Free Software Foundation; either
0007  * version 2.1 of the License, or (at your option) version 3, or any
0008  * later version accepted by the membership of KDE e.V. (or its
0009  * successor approved by the membership of KDE e.V.), which shall
0010  * act as a proxy defined in Section 6 of version 3 of the license.
0011  *
0012  * This library is distributed in the hope that it will be useful,
0013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0015  * Lesser General Public License for more details.
0016  *
0017  * You should have received a copy of the GNU Lesser General Public
0018  * License along with this library.  If not, see <http://www.gnu.org/licenses/>.
0019  */
0020 #include "messagequeue.h"
0021 #include "storage.h"
0022 #include "storage/key.h"
0023 #include <log.h>
0024 
0025 using namespace Sink::Storage;
0026 
0027 MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, DataStore::ReadWrite), mReplayedRevision{-1}, mName{name}
0028 {
0029 }
0030 
0031 MessageQueue::~MessageQueue()
0032 {
0033     if (mWriteTransaction) {
0034         mWriteTransaction.abort();
0035     }
0036 }
0037 
0038 QString MessageQueue::name() const
0039 {
0040     return mName;
0041 }
0042 
0043 void MessageQueue::enqueue(void const *msg, size_t size)
0044 {
0045     enqueue(QByteArray::fromRawData(static_cast<const char *>(msg), size));
0046 }
0047 
0048 void MessageQueue::startTransaction()
0049 {
0050     if (mWriteTransaction) {
0051         return;
0052     }
0053     processRemovals();
0054     mWriteTransaction = mStorage.createTransaction(DataStore::ReadWrite);
0055 }
0056 
0057 void MessageQueue::commit()
0058 {
0059     mWriteTransaction.commit();
0060     mWriteTransaction = DataStore::Transaction();
0061     processRemovals();
0062     emit messageReady();
0063 }
0064 
0065 void MessageQueue::enqueue(const QByteArray &value)
0066 {
0067     bool implicitTransaction = false;
0068     if (!mWriteTransaction) {
0069         implicitTransaction = true;
0070         startTransaction();
0071     }
0072     const qint64 revision = DataStore::maxRevision(mWriteTransaction) + 1;
0073     mWriteTransaction.openDatabase().write(Revision{size_t(revision)}.toDisplayByteArray(), value);
0074     DataStore::setMaxRevision(mWriteTransaction, revision);
0075     if (implicitTransaction) {
0076         commit();
0077     }
0078 }
0079 
0080 void MessageQueue::processRemovals()
0081 {
0082     if (mWriteTransaction) {
0083         if (mReplayedRevision > 0) {
0084             auto dequedRevisions = mReplayedRevision - DataStore::cleanedUpRevision(mWriteTransaction);
0085             if (dequedRevisions > 500) {
0086                 SinkTrace() << "We're building up a large backlog of dequeued revisions " << dequedRevisions;
0087             }
0088         }
0089         return;
0090     }
0091     if (mReplayedRevision >= 0) {
0092         auto transaction = mStorage.createTransaction(DataStore::ReadWrite);
0093         auto db = transaction.openDatabase();
0094         for (auto revision = DataStore::cleanedUpRevision(transaction) + 1; revision <= mReplayedRevision; revision++) {
0095             db.remove(Revision{size_t(revision)}.toDisplayByteArray());
0096         }
0097         DataStore::setCleanedUpRevision(transaction, mReplayedRevision);
0098         transaction.commit();
0099         mReplayedRevision = -1;
0100     }
0101 }
0102 
0103 void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler, const std::function<void(const Error &error)> &errorHandler)
0104 {
0105     dequeueBatch(1, [resultHandler](const QByteArray &value) {
0106         return KAsync::start<void>([&value, resultHandler](KAsync::Future<void> &future) {
0107             resultHandler(const_cast<void *>(static_cast<const void *>(value.data())), value.size(), [&future](bool success) { future.setFinished(); });
0108         });
0109     }).onError([errorHandler](const KAsync::Error &error) { errorHandler(Error("messagequeue", error.errorCode, error.errorMessage.toLatin1())); }).exec();
0110 }
0111 
0112 KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler)
0113 {
0114     return KAsync::start<void>([this, maxBatchSize, resultHandler](KAsync::Future<void> &future) {
0115         int count = 0;
0116         QList<KAsync::Future<void>> waitCondition;
0117         mStorage.createTransaction(DataStore::ReadOnly)
0118             .openDatabase()
0119             .scan("",
0120                 [&](const QByteArray &key, const QByteArray &value) -> bool {
0121                     const auto revision = key.toLongLong();
0122                     if (revision <= mReplayedRevision) {
0123                         return true;
0124                     }
0125                     mReplayedRevision = revision;
0126 
0127                     waitCondition << resultHandler(value).exec();
0128 
0129                     count++;
0130                     if (count < maxBatchSize) {
0131                         return true;
0132                     }
0133                     return false;
0134                 },
0135                 [](const DataStore::Error &error) {
0136                     SinkError() << "Error while retrieving value" << error.message;
0137                     // errorHandler(Error(error.store, error.code, error.message));
0138                 });
0139 
0140         // Trace() << "Waiting on " << waitCondition.size() << " results";
0141         KAsync::waitForCompletion(waitCondition)
0142             .then([this, count, &future]() {
0143                 processRemovals();
0144                 if (count == 0) {
0145                     future.setFinished();
0146                 } else {
0147                     if (isEmpty()) {
0148                         emit this->drained();
0149                     }
0150                     future.setFinished();
0151                 }
0152             })
0153             .exec();
0154     });
0155 }
0156 
0157 bool MessageQueue::isEmpty()
0158 {
0159     int count = 0;
0160     auto t = mStorage.createTransaction(DataStore::ReadOnly);
0161     auto db = t.openDatabase();
0162     if (db) {
0163         db.scan("",
0164             [&count, this](const QByteArray &key, const QByteArray &value) -> bool {
0165                 const auto revision = key.toLongLong();
0166                 if (revision <= mReplayedRevision) {
0167                     return true;
0168                 }
0169                 count++;
0170                 return false;
0171             },
0172             [](const DataStore::Error &error) { SinkError() << "Error while checking if empty" << error.message; });
0173     }
0174     return count == 0;
0175 }
0176 
0177 #pragma clang diagnostic push
0178 #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast"
0179 #include "moc_messagequeue.cpp"
0180 #pragma clang diagnostic pop