// File indexing completed on 2024-05-12 05:26:22
0001 #include <QTest> 0002 0003 #include <QString> 0004 #include <QDateTime> 0005 0006 #include <iostream> 0007 0008 #include "dummyresource/resourcefactory.h" 0009 #include "store.h" 0010 #include "commands.h" 0011 #include "entitybuffer.h" 0012 #include "log.h" 0013 #include "resourceconfig.h" 0014 #include "definitions.h" 0015 #include "facadefactory.h" 0016 #include "adaptorfactoryregistry.h" 0017 0018 #include "hawd/dataset.h" 0019 #include "hawd/formatter.h" 0020 0021 #include "event_generated.h" 0022 #include "mail_generated.h" 0023 #include "entity_generated.h" 0024 #include "metadata_generated.h" 0025 #include "createentity_generated.h" 0026 0027 #include "getrssusage.h" 0028 #include "utils.h" 0029 0030 #include <KMime/Message> 0031 0032 static QByteArray createEntityBuffer(size_t attachmentSize, int &bufferSize) 0033 { 0034 flatbuffers::FlatBufferBuilder eventFbb; 0035 eventFbb.Clear(); 0036 { 0037 0038 auto msg = KMime::Message::Ptr::create(); 0039 msg->subject()->from7BitString("Some subject"); 0040 msg->setBody("This is the body now."); 0041 msg->assemble(); 0042 0043 const auto data = msg->encodedContent(); 0044 0045 auto summary = eventFbb.CreateString("summary"); 0046 auto mimeMessage = eventFbb.CreateString(data.constData(), data.length()); 0047 Sink::ApplicationDomain::Buffer::MailBuilder eventBuilder(eventFbb); 0048 eventBuilder.add_subject(summary); 0049 eventBuilder.add_messageId(summary); 0050 eventBuilder.add_mimeMessage(mimeMessage); 0051 Sink::ApplicationDomain::Buffer::FinishMailBuffer(eventFbb, eventBuilder.Finish()); 0052 } 0053 0054 flatbuffers::FlatBufferBuilder entityFbb; 0055 Sink::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, 0, 0, eventFbb.GetBufferPointer(), eventFbb.GetSize()); 0056 bufferSize = entityFbb.GetSize(); 0057 0058 flatbuffers::FlatBufferBuilder fbb; 0059 auto type = fbb.CreateString(Sink::ApplicationDomain::getTypeName<Sink::ApplicationDomain::Mail>().toStdString().data()); 0060 auto delta = 
fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize()); 0061 Sink::Commands::CreateEntityBuilder builder(fbb); 0062 builder.add_domainType(type); 0063 builder.add_delta(delta); 0064 auto location = builder.Finish(); 0065 Sink::Commands::FinishCreateEntityBuffer(fbb, location); 0066 0067 return QByteArray(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize()); 0068 } 0069 0070 /** 0071 * Benchmark writing in the synchronizer process. 0072 */ 0073 class DummyResourceWriteBenchmark : public QObject 0074 { 0075 Q_OBJECT 0076 0077 QList<double> mRssGrowthPerEntity; 0078 QList<double> mTimePerEntity; 0079 QDateTime mTimeStamp{QDateTime::currentDateTimeUtc()}; 0080 0081 void writeInProcess(int num, const QDateTime ×tamp) 0082 { 0083 DummyResource::removeFromDisk("sink.dummy.instance1"); 0084 0085 QTime time; 0086 time.start(); 0087 DummyResource resource(Sink::ResourceContext{"sink.dummy.instance1", "sink.dummy", Sink::AdaptorFactoryRegistry::instance().getFactories("sink.dummy")}); 0088 0089 int bufferSize = 0; 0090 auto command = createEntityBuffer(0, bufferSize); 0091 0092 const auto startingRss = static_cast<double>(getCurrentRSS()); 0093 for (int i = 0; i < num; i++) { 0094 resource.processCommand(Sink::Commands::CreateEntityCommand, command); 0095 } 0096 auto appendTime = time.elapsed(); 0097 Q_UNUSED(appendTime); 0098 auto bufferSizeTotal = bufferSize * num; 0099 0100 // Wait until all messages have been processed 0101 resource.processAllMessages().exec().waitForFinished(); 0102 0103 auto allProcessedTime = time.elapsed(); 0104 0105 const auto finalRss = static_cast<double>(getCurrentRSS()); 0106 const auto rssGrowth = finalRss - startingRss; 0107 // Since the database is memory mapped it is attributted to the resident set size. 
0108 const auto rssWithoutDb = finalRss - static_cast<double>(DummyResource::diskUsage("sink.dummy.instance1")); 0109 const auto peakRss = static_cast<double>(getPeakRSS()); 0110 // How much peak deviates from final rss in percent 0111 const auto percentageRssError = static_cast<double>(peakRss - finalRss) * 100.0 / static_cast<double>(finalRss); 0112 auto rssGrowthPerEntity = rssGrowth / num; 0113 std::cout << "Current Rss usage [kb]: " << finalRss / 1024 << std::endl; 0114 std::cout << "Peak Rss usage [kb]: " << peakRss / 1024 << std::endl; 0115 std::cout << "Rss growth [kb]: " << rssGrowth / 1024 << std::endl; 0116 std::cout << "Rss growth per entity [byte]: " << rssGrowthPerEntity << std::endl; 0117 std::cout << "Rss without db [kb]: " << rssWithoutDb / 1024 << std::endl; 0118 std::cout << "Percentage peak rss error: " << percentageRssError << std::endl; 0119 0120 auto onDisk = Sink::Storage::DataStore(Sink::storageLocation(), "sink.dummy.instance1", Sink::Storage::DataStore::ReadOnly).diskUsage(); 0121 auto writeAmplification = static_cast<double>(onDisk) / static_cast<double>(bufferSizeTotal); 0122 std::cout << "On disk [kb]: " << onDisk / 1024 << std::endl; 0123 std::cout << "Buffer size total [kb]: " << bufferSizeTotal / 1024 << std::endl; 0124 std::cout << "Write amplification: " << writeAmplification << std::endl; 0125 0126 0127 mTimePerEntity << static_cast<double>(allProcessedTime) / static_cast<double>(num); 0128 mRssGrowthPerEntity << rssGrowthPerEntity; 0129 0130 { 0131 HAWD::Dataset dataset("dummy_write_perf", m_hawdState); 0132 HAWD::Dataset::Row row = dataset.row(); 0133 row.setValue("rows", num); 0134 row.setValue("append", (qreal)num/appendTime); 0135 row.setValue("total", (qreal)num/allProcessedTime); 0136 row.setTimestamp(timestamp); 0137 dataset.insertRow(row); 0138 HAWD::Formatter::print(dataset); 0139 } 0140 0141 { 0142 HAWD::Dataset dataset("dummy_write_memory", m_hawdState); 0143 HAWD::Dataset::Row row = dataset.row(); 0144 
row.setValue("rows", num); 0145 row.setValue("rss", QVariant::fromValue(finalRss / 1024)); 0146 row.setValue("peakRss", QVariant::fromValue(peakRss / 1024)); 0147 row.setValue("percentagePeakRssError", percentageRssError); 0148 row.setValue("rssGrowthPerEntity", QVariant::fromValue(rssGrowthPerEntity)); 0149 row.setValue("rssWithoutDb", rssWithoutDb / 1024); 0150 row.setTimestamp(timestamp); 0151 dataset.insertRow(row); 0152 HAWD::Formatter::print(dataset); 0153 } 0154 0155 { 0156 HAWD::Dataset dataset("dummy_write_disk", m_hawdState); 0157 HAWD::Dataset::Row row = dataset.row(); 0158 row.setValue("rows", num); 0159 row.setValue("onDisk", onDisk / 1024); 0160 row.setValue("bufferSize", bufferSizeTotal / 1024); 0161 row.setValue("writeAmplification", writeAmplification); 0162 row.setTimestamp(timestamp); 0163 dataset.insertRow(row); 0164 HAWD::Formatter::print(dataset); 0165 } 0166 0167 // Print memory layout, RSS is what is in memory 0168 // std::system("exec pmap -x \"$PPID\""); 0169 } 0170 0171 void testDiskUsage(int num) 0172 { 0173 auto resourceId = "testDiskUsage"; 0174 DummyResource::removeFromDisk(resourceId); 0175 0176 { 0177 DummyResource resource(Sink::ResourceContext{resourceId, "sink.dummy", Sink::AdaptorFactoryRegistry::instance().getFactories("sink.dummy")}); 0178 0179 int bufferSize = 0; 0180 auto command = createEntityBuffer(1000, bufferSize); 0181 0182 for (int i = 0; i < num; i++) { 0183 resource.processCommand(Sink::Commands::CreateEntityCommand, command); 0184 } 0185 0186 // Wait until all messages have been processed 0187 resource.processAllMessages().exec().waitForFinished(); 0188 } 0189 0190 qint64 totalDbSizes = 0; 0191 qint64 totalKeysAndValues = 0; 0192 QMap<QByteArray, qint64> dbSizes; 0193 Sink::Storage::DataStore storage(Sink::storageLocation(), resourceId, Sink::Storage::DataStore::ReadOnly); 0194 auto transaction = storage.createTransaction(Sink::Storage::DataStore::ReadOnly); 0195 auto stat = transaction.stat(); 0196 0197 std::cout 
<< "Free pages: " << stat.freePages << std::endl; 0198 std::cout << "Total pages: " << stat.totalPages << std::endl; 0199 auto totalUsedSize = stat.pageSize * (stat.totalPages - stat.freePages); 0200 std::cout << "Used size: " << totalUsedSize << std::endl; 0201 0202 auto freeDbSize = stat.pageSize * (stat.freeDbStat.leafPages + stat.freeDbStat.overflowPages + stat.freeDbStat.branchPages); 0203 std::cout << "Free db size: " << freeDbSize << std::endl; 0204 auto mainDbSize = stat.pageSize * (stat.mainDbStat.leafPages + stat.mainDbStat.overflowPages + stat.mainDbStat.branchPages); 0205 std::cout << "Main db size: " << mainDbSize << std::endl; 0206 0207 totalDbSizes += mainDbSize; 0208 QList<QByteArray> databases = transaction.getDatabaseNames(); 0209 for (const auto &databaseName : databases) { 0210 auto db = transaction.openDatabase(databaseName); 0211 const auto size = db.getSize(); 0212 dbSizes.insert(databaseName, size); 0213 totalDbSizes += size; 0214 0215 qint64 keySizes = 0; 0216 qint64 valueSizes = 0; 0217 db.scan({}, [&] (const QByteArray &key, const QByteArray &data) { 0218 keySizes += key.size(); 0219 valueSizes += data.size(); 0220 return true; 0221 }, 0222 [&](const Sink::Storage::DataStore::Error &e) { 0223 qWarning() << "Error while reading" << e; 0224 }, 0225 false); 0226 0227 auto s = db.stat(); 0228 auto usedPages = (s.leafPages + s.branchPages + s.overflowPages); 0229 0230 std::cout << std::endl; 0231 std::cout << "Db: " << databaseName.toStdString() << (db.allowsDuplicates() ? 
" DUP" : "") << std::endl; 0232 std::cout << "Used pages " << usedPages << std::endl; 0233 std::cout << "Used size " << static_cast<double>(keySizes + valueSizes) / 4096.0 << std::endl; 0234 std::cout << "Entries " << s.numEntries << std::endl; 0235 totalKeysAndValues += (keySizes + valueSizes); 0236 } 0237 std::cout << std::endl; 0238 0239 auto mainStoreOnDisk = Sink::Storage::DataStore(Sink::storageLocation(), resourceId, Sink::Storage::DataStore::ReadOnly).diskUsage(); 0240 auto totalOnDisk = DummyResource::diskUsage(resourceId); 0241 std::cout << "Calculated key + value size: " << totalKeysAndValues << std::endl; 0242 std::cout << "Calculated total db sizes: " << totalDbSizes << std::endl; 0243 std::cout << "Main store on disk: " << mainStoreOnDisk << std::endl; 0244 std::cout << "Total on disk: " << totalOnDisk << std::endl; 0245 std::cout << "Used size amplification: " << static_cast<double>(totalUsedSize) / static_cast<double>(totalKeysAndValues) << std::endl; 0246 std::cout << "Write amplification: " << static_cast<double>(mainStoreOnDisk) / static_cast<double>(totalKeysAndValues) << std::endl; 0247 std::cout << std::endl; 0248 } 0249 0250 private slots: 0251 void initTestCase() 0252 { 0253 Sink::Log::setDebugOutputLevel(Sink::Log::Warning); 0254 auto factory = Sink::ResourceFactory::load("sink.dummy"); 0255 QVERIFY(factory); 0256 } 0257 0258 void cleanup() 0259 { 0260 } 0261 0262 void runBenchmarks() 0263 { 0264 writeInProcess(5000, mTimeStamp); 0265 } 0266 0267 void ensureUsedMemoryRemainsStable() 0268 { 0269 auto rssStandardDeviation = sqrt(variance(mRssGrowthPerEntity)); 0270 auto timeStandardDeviation = sqrt(variance(mTimePerEntity)); 0271 HAWD::Dataset dataset("dummy_write_summary", m_hawdState); 0272 HAWD::Dataset::Row row = dataset.row(); 0273 row.setValue("rssStandardDeviation", rssStandardDeviation); 0274 row.setValue("rssMaxDifference", maxDifference(mRssGrowthPerEntity)); 0275 row.setValue("timeStandardDeviation", timeStandardDeviation); 0276 
row.setValue("timeMaxDifference", maxDifference(mTimePerEntity)); 0277 row.setTimestamp(mTimeStamp); 0278 dataset.insertRow(row); 0279 HAWD::Formatter::print(dataset); 0280 } 0281 0282 void testDiskUsage() 0283 { 0284 testDiskUsage(1000); 0285 } 0286 0287 // This allows to run individual parts without doing a cleanup, but still cleaning up normally 0288 void testCleanupForCompleteTest() 0289 { 0290 DummyResource::removeFromDisk("sink.dummy.instance1"); 0291 } 0292 0293 private: 0294 HAWD::State m_hawdState; 0295 }; 0296 0297 QTEST_MAIN(DummyResourceWriteBenchmark) 0298 #include "dummyresourcewritebenchmark.moc"