File indexing completed on 2024-05-12 05:25:57

0001 /*
0002  * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm>
0003  *
0004  * This library is free software; you can redistribute it and/or
0005  * modify it under the terms of the GNU Lesser General Public
0006  * License as published by the Free Software Foundation; either
0007  * version 2.1 of the License, or (at your option) version 3, or any
0008  * later version accepted by the membership of KDE e.V. (or its
0009  * successor approved by the membership of KDE e.V.), which shall
0010  * act as a proxy defined in Section 6 of version 3 of the license.
0011  *
0012  * This library is distributed in the hope that it will be useful,
0013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0015  * Lesser General Public License for more details.
0016  *
0017  * You should have received a copy of the GNU Lesser General Public
0018  * License along with this library.  If not, see <http://www.gnu.org/licenses/>.
0019  */
0020 #include "commandprocessor.h"
0021 
0022 #include <QDataStream>
0023 #include <QCoreApplication>
0024 
0025 #include "commands.h"
0026 #include "messagequeue.h"
0027 #include "flush_generated.h"
0028 #include "inspector.h"
0029 #include "synchronizer.h"
0030 #include "pipeline.h"
0031 #include "bufferutils.h"
0032 #include "definitions.h"
0033 #include "storage.h"
0034 
0035 #include "queuedcommand_generated.h"
0036 #include "revisionreplayed_generated.h"
0037 #include "synchronize_generated.h"
0038 
// Number of commands that make up one batch: processCommand() commits the user
// queue once this many commands have been enqueued, and processQueue() asks the
// message queue for at most this many messages per pipeline transaction.
// Never modified at runtime, hence a compile-time constant.
static constexpr int sBatchSize = 100;
// Interval of the single-shot timer that commits the user queue after a command
// has been enqueued (QTimer intervals are expressed in milliseconds).
// This interval directly affects the roundtrip time of single commands
static constexpr int sCommitInterval = 10;
0042 
0043 
0044 using namespace Sink;
0045 using namespace Sink::Storage;
0046 
0047 CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId, const Sink::Log::Context &ctx)
0048     : QObject(),
0049     mLogCtx(ctx.subContext("commandprocessor")),
0050     mPipeline(pipeline),
0051     mUserQueue(Sink::storageLocation(), instanceId + ".userqueue"),
0052     mSynchronizerQueue(Sink::storageLocation(), instanceId + ".synchronizerqueue"),
0053     mCommandQueues({&mUserQueue, &mSynchronizerQueue}), mProcessingLock(false), mLowerBoundRevision(0)
0054 {
0055     for (auto queue : mCommandQueues) {
0056         /*
0057          * This is a queued connection because otherwise we would execute CommandProcessor::process in the middle of
0058          * Synchronizer::commit, which is not what we want.
0059          */
0060         const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process, Qt::QueuedConnection);
0061         Q_UNUSED(ret);
0062     }
0063 
0064     mCommitQueueTimer.setInterval(sCommitInterval);
0065     mCommitQueueTimer.setSingleShot(true);
0066     QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit);
0067 }
0068 
0069 static void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data)
0070 {
0071     flatbuffers::FlatBufferBuilder fbb;
0072     auto commandData = Sink::EntityBuffer::appendAsVector(fbb, data.constData(), data.size());
0073     auto buffer = Sink::CreateQueuedCommand(fbb, commandId, commandData);
0074     Sink::FinishQueuedCommandBuffer(fbb, buffer);
0075     mq.enqueue(fbb.GetBufferPointer(), fbb.GetSize());
0076 }
0077 
0078 void CommandProcessor::processCommand(int commandId, const QByteArray &data)
0079 {
0080     switch (commandId) {
0081         case Commands::FlushCommand:
0082             processFlushCommand(data);
0083             break;
0084         case Commands::SynchronizeCommand:
0085             processSynchronizeCommand(data);
0086             break;
0087         case Commands::AbortSynchronizationCommand:
0088             mSynchronizer->abort();
0089             break;
0090         // case Commands::RevisionReplayedCommand:
0091         //     processRevisionReplayedCommand(data);
0092         //     break;
0093         default: {
0094             static int modifications = 0;
0095             mUserQueue.startTransaction();
0096             SinkTraceCtx(mLogCtx) << "Received a command" << commandId;
0097             enqueueCommand(mUserQueue, commandId, data);
0098             modifications++;
0099             if (modifications >= sBatchSize) {
0100                 mUserQueue.commit();
0101                 modifications = 0;
0102                 mCommitQueueTimer.stop();
0103             } else {
0104                 mCommitQueueTimer.start();
0105             }
0106         }
0107     };
0108 }
0109 
0110 void CommandProcessor::processFlushCommand(const QByteArray &data)
0111 {
0112     flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size());
0113     if (Sink::Commands::VerifyFlushBuffer(verifier)) {
0114         auto buffer = Sink::Commands::GetFlush(data.constData());
0115         const auto flushType = buffer->type();
0116         const auto flushId = BufferUtils::extractBufferCopy(buffer->id());
0117         SinkTraceCtx(mLogCtx) << "Received flush command " << flushId;
0118         if (flushType == Sink::Flush::FlushSynchronization) {
0119             mSynchronizer->flush(flushType, flushId);
0120         } else {
0121             mUserQueue.startTransaction();
0122             enqueueCommand(mUserQueue, Commands::FlushCommand, data);
0123             mUserQueue.commit();
0124         }
0125     }
0126 
0127 }
0128 
0129 void CommandProcessor::processSynchronizeCommand(const QByteArray &data)
0130 {
0131     flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size());
0132     if (Sink::Commands::VerifySynchronizeBuffer(verifier)) {
0133         auto buffer = Sink::Commands::GetSynchronize(data.constData());
0134         Sink::QueryBase query;
0135         if (buffer->query()) {
0136             auto data = QByteArray::fromStdString(buffer->query()->str());
0137             QDataStream stream(&data, QIODevice::ReadOnly);
0138             stream >> query;
0139         }
0140         mSynchronizer->synchronize(query);
0141     } else {
0142         SinkWarningCtx(mLogCtx) << "received invalid command";
0143     }
0144 }
0145 
0146 // void CommandProcessor::processRevisionReplayedCommand(const QByteArray &data)
0147 // {
0148 //     flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
0149 //     if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) {
0150 //         auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData());
0151 //         client.currentRevision = buffer->revision();
0152 //     } else {
0153 //         SinkWarningCtx(mLogCtx) << "received invalid command";
0154 //     }
0155 //     loadResource().setLowerBoundRevision(lowerBoundRevision());
0156 // }
0157 
0158 void CommandProcessor::setOldestUsedRevision(qint64 revision)
0159 {
0160     SinkTrace() << "Updating lower bound revision:" << revision;
0161     mLowerBoundRevision = revision;
0162 }
0163 
0164 bool CommandProcessor::messagesToProcessAvailable()
0165 {
0166     for (auto queue : mCommandQueues) {
0167         if (!queue->isEmpty()) {
0168             return true;
0169         }
0170     }
0171     return false;
0172 }
0173 
0174 void CommandProcessor::process()
0175 {
0176     if (mProcessingLock) {
0177         return;
0178     }
0179     mProcessingLock = true;
0180     auto job = processPipeline()
0181                     .then([this]() {
0182                         mProcessingLock = false;
0183                         if (messagesToProcessAvailable()) {
0184                             process();
0185                         }
0186                     })
0187                     .exec();
0188 }
0189 
0190 KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const Sink::QueuedCommand &queuedCommand)
0191 {
0192     SinkTraceCtx(mLogCtx) << "Processing command: " << Sink::Commands::name(queuedCommand.commandId());
0193     const auto data = queuedCommand.command()->Data();
0194     const auto size = queuedCommand.command()->size();
0195     switch (queuedCommand.commandId()) {
0196         case Sink::Commands::DeleteEntityCommand:
0197             return mPipeline->deletedEntity(data, size);
0198         case Sink::Commands::ModifyEntityCommand:
0199             return mPipeline->modifiedEntity(data, size);
0200         case Sink::Commands::CreateEntityCommand:
0201             return mPipeline->newEntity(data, size);
0202         case Sink::Commands::InspectionCommand:
0203             Q_ASSERT(mInspector);
0204             return mInspector->processCommand(data, size)
0205                     .then(KAsync::value<qint64>(-1));
0206         case Sink::Commands::FlushCommand:
0207             return flush(data, size)
0208                 .then(KAsync::value<qint64>(-1));
0209         default:
0210             return KAsync::error<qint64>(-1, "Unhandled command");
0211     }
0212 }
0213 
0214 KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const QByteArray &data)
0215 {
0216     flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size());
0217     if (!Sink::VerifyQueuedCommandBuffer(verifyer)) {
0218         SinkWarningCtx(mLogCtx) << "invalid buffer";
0219         return KAsync::error<qint64>(-1, "Invalid Buffer");
0220     }
0221     auto queuedCommand = Sink::GetQueuedCommand(data.constData());
0222     const auto commandId = queuedCommand->commandId();
0223     return processQueuedCommand(*queuedCommand)
0224         .then<qint64, qint64>(
0225             [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job<qint64> {
0226                 if (error) {
0227                     SinkWarningCtx(mLogCtx) << "Error while processing queue command: " << error.errorMessage;
0228                     return KAsync::error<qint64>(error);
0229                 }
0230                 SinkTraceCtx(mLogCtx) << "Command pipeline processed: " << Sink::Commands::name(commandId);
0231                 return KAsync::value<qint64>(createdRevision);
0232             });
0233 }
0234 
0235 // Process one batch of messages from this queue
0236 KAsync::Job<void> CommandProcessor::processQueue(MessageQueue *queue)
0237 {
0238     return KAsync::start([=] { mPipeline->startTransaction(); })
0239         .then([=] {
0240                 return queue->dequeueBatch(sBatchSize,
0241                     [=](const QByteArray &data) {
0242                         auto time = QSharedPointer<QTime>::create();
0243                         time->start();
0244                         return processQueuedCommand(data)
0245                         .then([=](qint64 createdRevision) {
0246                             SinkTraceCtx(mLogCtx) << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed());
0247                         });
0248                     })
0249                     .then([=](const KAsync::Error &error) {
0250                         if (error) {
0251                             if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) {
0252                                 SinkWarningCtx(mLogCtx) << "Error while getting message from messagequeue: " << error.errorMessage;
0253                             }
0254                         }
0255                     });
0256             })
0257         .then([=](const KAsync::Error &) {
0258             mPipeline->commit();
0259             //The flushed content has been persistet, we can notify the world
0260             for (const auto &flushId : mCompleteFlushes) {
0261                 SinkTraceCtx(mLogCtx) << "Emitting flush completion" << flushId;
0262                 mSynchronizer->flushComplete(flushId);
0263                 Sink::Notification n;
0264                 n.type = Sink::Notification::FlushCompletion;
0265                 n.id = flushId;
0266                 emit notify(n);
0267             }
0268             mCompleteFlushes.clear();
0269             QCoreApplication::processEvents(QEventLoop::AllEvents, 10);
0270         });
0271 
0272 
0273 }
0274 
0275 KAsync::Job<void> CommandProcessor::processPipeline()
0276 {
0277     mTime.start();
0278     mPipeline->cleanupRevisions(mLowerBoundRevision);
0279     SinkTraceCtx(mLogCtx) << "Cleanup until revision " << mLowerBoundRevision << "done." << Log::TraceTime(mTime.elapsed());
0280 
0281     // Go through all message queues
0282     if (mCommandQueues.isEmpty()) {
0283         return KAsync::null<void>();
0284     }
0285     return KAsync::doWhile([this]() {
0286             for (auto queue : mCommandQueues) {
0287                 if (!queue->isEmpty()) {
0288                     mTime.start();
0289                     return processQueue(queue)
0290                         .guard(this)
0291                         .then([this] {
0292                             SinkTraceCtx(mLogCtx) << "Queue processed." << Log::TraceTime(mTime.elapsed());
0293                             return KAsync::Continue;
0294                         });
0295                 }
0296             }
0297             return KAsync::value(KAsync::Break);
0298         });
0299 }
0300 
0301 void CommandProcessor::setInspector(const QSharedPointer<Inspector> &inspector)
0302 {
0303     mInspector = inspector;
0304     QObject::connect(mInspector.data(), &Inspector::notify, this, &CommandProcessor::notify);
0305 }
0306 
0307 void CommandProcessor::setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer)
0308 {
0309     mSynchronizer = synchronizer;
0310     mSynchronizer->setup([this](int commandId, const QByteArray &data) {
0311         enqueueCommand(mSynchronizerQueue, commandId, data);
0312     }, mSynchronizerQueue);
0313     QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify);
0314     setOldestUsedRevision(mSynchronizer->getLastReplayedRevision());
0315 }
0316 
0317 KAsync::Job<void> CommandProcessor::flush(void const *command, size_t size)
0318 {
0319     flatbuffers::Verifier verifier((const uint8_t *)command, size);
0320     if (Sink::Commands::VerifyFlushBuffer(verifier)) {
0321         auto buffer = Sink::Commands::GetFlush(command);
0322         const auto flushType = buffer->type();
0323         const QByteArray flushId = BufferUtils::extractBufferCopy(buffer->id());
0324         Q_ASSERT(!flushId.isEmpty());
0325         if (flushType == Sink::Flush::FlushReplayQueue) {
0326             SinkTraceCtx(mLogCtx) << "Flushing synchronizer " << flushId;
0327             Q_ASSERT(mSynchronizer);
0328             mSynchronizer->flush(flushType, flushId);
0329         } else {
0330             //Defer notification until the results have been comitted
0331             mCompleteFlushes << flushId;
0332         }
0333         return KAsync::null<void>();
0334     }
0335     return KAsync::error<void>(-1, "Invalid flush command.");
0336 }
0337 
0338 static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue)
0339 {
0340     if (queue.isEmpty()) {
0341         f.setFinished();
0342     } else {
0343         auto context = new QObject;
0344         QObject::connect(&queue, &MessageQueue::drained, context, [&f, context]() {
0345             delete context;
0346             f.setFinished();
0347         });
0348     }
0349 };
0350 
0351 KAsync::Job<void> CommandProcessor::processAllMessages()
0352 {
0353     // We have to wait for all items to be processed to ensure the synced items are available when a query gets executed.
0354     // TODO: report errors while processing sync?
0355     // TODO JOBAPI: A helper that waits for n events and then continues?
0356     return KAsync::start<void>([this](KAsync::Future<void> &f) {
0357                if (mCommitQueueTimer.isActive()) {
0358                    auto context = new QObject;
0359                    QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() {
0360                        delete context;
0361                        f.setFinished();
0362                    });
0363                } else {
0364                    f.setFinished();
0365                }
0366            })
0367         .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); })
0368         .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); })
0369         .then<void>([this](KAsync::Future<void> &f) {
0370             if (mSynchronizer->ChangeReplay::allChangesReplayed()) {
0371                 f.setFinished();
0372             } else {
0373                 auto context = new QObject;
0374                 QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, context, [&f, context]() {
0375                     delete context;
0376                     f.setFinished();
0377                 });
0378                 mSynchronizer->replayNextRevision().exec();
0379             }
0380         });
0381 }
0382