// File indexing completed on 2024-05-12 05:25:57
0001 /* 0002 * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm> 0003 * 0004 * This library is free software; you can redistribute it and/or 0005 * modify it under the terms of the GNU Lesser General Public 0006 * License as published by the Free Software Foundation; either 0007 * version 2.1 of the License, or (at your option) version 3, or any 0008 * later version accepted by the membership of KDE e.V. (or its 0009 * successor approved by the membership of KDE e.V.), which shall 0010 * act as a proxy defined in Section 6 of version 3 of the license. 0011 * 0012 * This library is distributed in the hope that it will be useful, 0013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 0014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 0015 * Lesser General Public License for more details. 0016 * 0017 * You should have received a copy of the GNU Lesser General Public 0018 * License along with this library. If not, see <http://www.gnu.org/licenses/>. 
0019 */ 0020 #include "commandprocessor.h" 0021 0022 #include <QDataStream> 0023 #include <QCoreApplication> 0024 0025 #include "commands.h" 0026 #include "messagequeue.h" 0027 #include "flush_generated.h" 0028 #include "inspector.h" 0029 #include "synchronizer.h" 0030 #include "pipeline.h" 0031 #include "bufferutils.h" 0032 #include "definitions.h" 0033 #include "storage.h" 0034 0035 #include "queuedcommand_generated.h" 0036 #include "revisionreplayed_generated.h" 0037 #include "synchronize_generated.h" 0038 0039 static int sBatchSize = 100; 0040 // This interval directly affects the roundtrip time of single commands 0041 static int sCommitInterval = 10; 0042 0043 0044 using namespace Sink; 0045 using namespace Sink::Storage; 0046 0047 CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId, const Sink::Log::Context &ctx) 0048 : QObject(), 0049 mLogCtx(ctx.subContext("commandprocessor")), 0050 mPipeline(pipeline), 0051 mUserQueue(Sink::storageLocation(), instanceId + ".userqueue"), 0052 mSynchronizerQueue(Sink::storageLocation(), instanceId + ".synchronizerqueue"), 0053 mCommandQueues({&mUserQueue, &mSynchronizerQueue}), mProcessingLock(false), mLowerBoundRevision(0) 0054 { 0055 for (auto queue : mCommandQueues) { 0056 /* 0057 * This is a queued connection because otherwise we would execute CommandProcessor::process in the middle of 0058 * Synchronizer::commit, which is not what we want. 
0059 */ 0060 const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process, Qt::QueuedConnection); 0061 Q_UNUSED(ret); 0062 } 0063 0064 mCommitQueueTimer.setInterval(sCommitInterval); 0065 mCommitQueueTimer.setSingleShot(true); 0066 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); 0067 } 0068 0069 static void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) 0070 { 0071 flatbuffers::FlatBufferBuilder fbb; 0072 auto commandData = Sink::EntityBuffer::appendAsVector(fbb, data.constData(), data.size()); 0073 auto buffer = Sink::CreateQueuedCommand(fbb, commandId, commandData); 0074 Sink::FinishQueuedCommandBuffer(fbb, buffer); 0075 mq.enqueue(fbb.GetBufferPointer(), fbb.GetSize()); 0076 } 0077 0078 void CommandProcessor::processCommand(int commandId, const QByteArray &data) 0079 { 0080 switch (commandId) { 0081 case Commands::FlushCommand: 0082 processFlushCommand(data); 0083 break; 0084 case Commands::SynchronizeCommand: 0085 processSynchronizeCommand(data); 0086 break; 0087 case Commands::AbortSynchronizationCommand: 0088 mSynchronizer->abort(); 0089 break; 0090 // case Commands::RevisionReplayedCommand: 0091 // processRevisionReplayedCommand(data); 0092 // break; 0093 default: { 0094 static int modifications = 0; 0095 mUserQueue.startTransaction(); 0096 SinkTraceCtx(mLogCtx) << "Received a command" << commandId; 0097 enqueueCommand(mUserQueue, commandId, data); 0098 modifications++; 0099 if (modifications >= sBatchSize) { 0100 mUserQueue.commit(); 0101 modifications = 0; 0102 mCommitQueueTimer.stop(); 0103 } else { 0104 mCommitQueueTimer.start(); 0105 } 0106 } 0107 }; 0108 } 0109 0110 void CommandProcessor::processFlushCommand(const QByteArray &data) 0111 { 0112 flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); 0113 if (Sink::Commands::VerifyFlushBuffer(verifier)) { 0114 auto buffer = Sink::Commands::GetFlush(data.constData()); 0115 const 
auto flushType = buffer->type(); 0116 const auto flushId = BufferUtils::extractBufferCopy(buffer->id()); 0117 SinkTraceCtx(mLogCtx) << "Received flush command " << flushId; 0118 if (flushType == Sink::Flush::FlushSynchronization) { 0119 mSynchronizer->flush(flushType, flushId); 0120 } else { 0121 mUserQueue.startTransaction(); 0122 enqueueCommand(mUserQueue, Commands::FlushCommand, data); 0123 mUserQueue.commit(); 0124 } 0125 } 0126 0127 } 0128 0129 void CommandProcessor::processSynchronizeCommand(const QByteArray &data) 0130 { 0131 flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); 0132 if (Sink::Commands::VerifySynchronizeBuffer(verifier)) { 0133 auto buffer = Sink::Commands::GetSynchronize(data.constData()); 0134 Sink::QueryBase query; 0135 if (buffer->query()) { 0136 auto data = QByteArray::fromStdString(buffer->query()->str()); 0137 QDataStream stream(&data, QIODevice::ReadOnly); 0138 stream >> query; 0139 } 0140 mSynchronizer->synchronize(query); 0141 } else { 0142 SinkWarningCtx(mLogCtx) << "received invalid command"; 0143 } 0144 } 0145 0146 // void CommandProcessor::processRevisionReplayedCommand(const QByteArray &data) 0147 // { 0148 // flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); 0149 // if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) { 0150 // auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); 0151 // client.currentRevision = buffer->revision(); 0152 // } else { 0153 // SinkWarningCtx(mLogCtx) << "received invalid command"; 0154 // } 0155 // loadResource().setLowerBoundRevision(lowerBoundRevision()); 0156 // } 0157 0158 void CommandProcessor::setOldestUsedRevision(qint64 revision) 0159 { 0160 SinkTrace() << "Updating lower bound revision:" << revision; 0161 mLowerBoundRevision = revision; 0162 } 0163 0164 bool CommandProcessor::messagesToProcessAvailable() 0165 { 0166 for (auto queue : mCommandQueues) { 0167 if (!queue->isEmpty()) { 
0168 return true; 0169 } 0170 } 0171 return false; 0172 } 0173 0174 void CommandProcessor::process() 0175 { 0176 if (mProcessingLock) { 0177 return; 0178 } 0179 mProcessingLock = true; 0180 auto job = processPipeline() 0181 .then([this]() { 0182 mProcessingLock = false; 0183 if (messagesToProcessAvailable()) { 0184 process(); 0185 } 0186 }) 0187 .exec(); 0188 } 0189 0190 KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const Sink::QueuedCommand &queuedCommand) 0191 { 0192 SinkTraceCtx(mLogCtx) << "Processing command: " << Sink::Commands::name(queuedCommand.commandId()); 0193 const auto data = queuedCommand.command()->Data(); 0194 const auto size = queuedCommand.command()->size(); 0195 switch (queuedCommand.commandId()) { 0196 case Sink::Commands::DeleteEntityCommand: 0197 return mPipeline->deletedEntity(data, size); 0198 case Sink::Commands::ModifyEntityCommand: 0199 return mPipeline->modifiedEntity(data, size); 0200 case Sink::Commands::CreateEntityCommand: 0201 return mPipeline->newEntity(data, size); 0202 case Sink::Commands::InspectionCommand: 0203 Q_ASSERT(mInspector); 0204 return mInspector->processCommand(data, size) 0205 .then(KAsync::value<qint64>(-1)); 0206 case Sink::Commands::FlushCommand: 0207 return flush(data, size) 0208 .then(KAsync::value<qint64>(-1)); 0209 default: 0210 return KAsync::error<qint64>(-1, "Unhandled command"); 0211 } 0212 } 0213 0214 KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const QByteArray &data) 0215 { 0216 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); 0217 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { 0218 SinkWarningCtx(mLogCtx) << "invalid buffer"; 0219 return KAsync::error<qint64>(-1, "Invalid Buffer"); 0220 } 0221 auto queuedCommand = Sink::GetQueuedCommand(data.constData()); 0222 const auto commandId = queuedCommand->commandId(); 0223 return processQueuedCommand(*queuedCommand) 0224 .then<qint64, qint64>( 0225 [this, commandId](const 
KAsync::Error &error, qint64 createdRevision) -> KAsync::Job<qint64> { 0226 if (error) { 0227 SinkWarningCtx(mLogCtx) << "Error while processing queue command: " << error.errorMessage; 0228 return KAsync::error<qint64>(error); 0229 } 0230 SinkTraceCtx(mLogCtx) << "Command pipeline processed: " << Sink::Commands::name(commandId); 0231 return KAsync::value<qint64>(createdRevision); 0232 }); 0233 } 0234 0235 // Process one batch of messages from this queue 0236 KAsync::Job<void> CommandProcessor::processQueue(MessageQueue *queue) 0237 { 0238 return KAsync::start([=] { mPipeline->startTransaction(); }) 0239 .then([=] { 0240 return queue->dequeueBatch(sBatchSize, 0241 [=](const QByteArray &data) { 0242 auto time = QSharedPointer<QTime>::create(); 0243 time->start(); 0244 return processQueuedCommand(data) 0245 .then([=](qint64 createdRevision) { 0246 SinkTraceCtx(mLogCtx) << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); 0247 }); 0248 }) 0249 .then([=](const KAsync::Error &error) { 0250 if (error) { 0251 if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) { 0252 SinkWarningCtx(mLogCtx) << "Error while getting message from messagequeue: " << error.errorMessage; 0253 } 0254 } 0255 }); 0256 }) 0257 .then([=](const KAsync::Error &) { 0258 mPipeline->commit(); 0259 //The flushed content has been persistet, we can notify the world 0260 for (const auto &flushId : mCompleteFlushes) { 0261 SinkTraceCtx(mLogCtx) << "Emitting flush completion" << flushId; 0262 mSynchronizer->flushComplete(flushId); 0263 Sink::Notification n; 0264 n.type = Sink::Notification::FlushCompletion; 0265 n.id = flushId; 0266 emit notify(n); 0267 } 0268 mCompleteFlushes.clear(); 0269 QCoreApplication::processEvents(QEventLoop::AllEvents, 10); 0270 }); 0271 0272 0273 } 0274 0275 KAsync::Job<void> CommandProcessor::processPipeline() 0276 { 0277 mTime.start(); 0278 mPipeline->cleanupRevisions(mLowerBoundRevision); 0279 SinkTraceCtx(mLogCtx) << 
"Cleanup until revision " << mLowerBoundRevision << "done." << Log::TraceTime(mTime.elapsed()); 0280 0281 // Go through all message queues 0282 if (mCommandQueues.isEmpty()) { 0283 return KAsync::null<void>(); 0284 } 0285 return KAsync::doWhile([this]() { 0286 for (auto queue : mCommandQueues) { 0287 if (!queue->isEmpty()) { 0288 mTime.start(); 0289 return processQueue(queue) 0290 .guard(this) 0291 .then([this] { 0292 SinkTraceCtx(mLogCtx) << "Queue processed." << Log::TraceTime(mTime.elapsed()); 0293 return KAsync::Continue; 0294 }); 0295 } 0296 } 0297 return KAsync::value(KAsync::Break); 0298 }); 0299 } 0300 0301 void CommandProcessor::setInspector(const QSharedPointer<Inspector> &inspector) 0302 { 0303 mInspector = inspector; 0304 QObject::connect(mInspector.data(), &Inspector::notify, this, &CommandProcessor::notify); 0305 } 0306 0307 void CommandProcessor::setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer) 0308 { 0309 mSynchronizer = synchronizer; 0310 mSynchronizer->setup([this](int commandId, const QByteArray &data) { 0311 enqueueCommand(mSynchronizerQueue, commandId, data); 0312 }, mSynchronizerQueue); 0313 QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); 0314 setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); 0315 } 0316 0317 KAsync::Job<void> CommandProcessor::flush(void const *command, size_t size) 0318 { 0319 flatbuffers::Verifier verifier((const uint8_t *)command, size); 0320 if (Sink::Commands::VerifyFlushBuffer(verifier)) { 0321 auto buffer = Sink::Commands::GetFlush(command); 0322 const auto flushType = buffer->type(); 0323 const QByteArray flushId = BufferUtils::extractBufferCopy(buffer->id()); 0324 Q_ASSERT(!flushId.isEmpty()); 0325 if (flushType == Sink::Flush::FlushReplayQueue) { 0326 SinkTraceCtx(mLogCtx) << "Flushing synchronizer " << flushId; 0327 Q_ASSERT(mSynchronizer); 0328 mSynchronizer->flush(flushType, flushId); 0329 } else { 0330 //Defer notification until the 
results have been comitted 0331 mCompleteFlushes << flushId; 0332 } 0333 return KAsync::null<void>(); 0334 } 0335 return KAsync::error<void>(-1, "Invalid flush command."); 0336 } 0337 0338 static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue) 0339 { 0340 if (queue.isEmpty()) { 0341 f.setFinished(); 0342 } else { 0343 auto context = new QObject; 0344 QObject::connect(&queue, &MessageQueue::drained, context, [&f, context]() { 0345 delete context; 0346 f.setFinished(); 0347 }); 0348 } 0349 }; 0350 0351 KAsync::Job<void> CommandProcessor::processAllMessages() 0352 { 0353 // We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. 0354 // TODO: report errors while processing sync? 0355 // TODO JOBAPI: A helper that waits for n events and then continues? 0356 return KAsync::start<void>([this](KAsync::Future<void> &f) { 0357 if (mCommitQueueTimer.isActive()) { 0358 auto context = new QObject; 0359 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() { 0360 delete context; 0361 f.setFinished(); 0362 }); 0363 } else { 0364 f.setFinished(); 0365 } 0366 }) 0367 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); }) 0368 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); }) 0369 .then<void>([this](KAsync::Future<void> &f) { 0370 if (mSynchronizer->ChangeReplay::allChangesReplayed()) { 0371 f.setFinished(); 0372 } else { 0373 auto context = new QObject; 0374 QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { 0375 delete context; 0376 f.setFinished(); 0377 }); 0378 mSynchronizer->replayNextRevision().exec(); 0379 } 0380 }); 0381 } 0382