File indexing completed on 2024-05-12 05:26:22

#include <QTest>

#include <QString>
#include <QDateTime>
#include <QElapsedTimer>

#include <iostream>

#include "dummyresource/resourcefactory.h"
#include "store.h"
#include "commands.h"
#include "entitybuffer.h"
#include "log.h"
#include "resourceconfig.h"
#include "definitions.h"
#include "facadefactory.h"
#include "adaptorfactoryregistry.h"

#include "hawd/dataset.h"
#include "hawd/formatter.h"

#include "event_generated.h"
#include "mail_generated.h"
#include "entity_generated.h"
#include "metadata_generated.h"
#include "createentity_generated.h"

#include "getrssusage.h"
#include "utils.h"

#include <KMime/Message>
0031 
0032 static QByteArray createEntityBuffer(size_t attachmentSize, int &bufferSize)
0033 {
0034     flatbuffers::FlatBufferBuilder eventFbb;
0035     eventFbb.Clear();
0036     {
0037 
0038         auto msg = KMime::Message::Ptr::create();
0039         msg->subject()->from7BitString("Some subject");
0040         msg->setBody("This is the body now.");
0041         msg->assemble();
0042 
0043         const auto data = msg->encodedContent();
0044 
0045         auto summary = eventFbb.CreateString("summary");
0046         auto mimeMessage = eventFbb.CreateString(data.constData(), data.length());
0047         Sink::ApplicationDomain::Buffer::MailBuilder eventBuilder(eventFbb);
0048         eventBuilder.add_subject(summary);
0049         eventBuilder.add_messageId(summary);
0050         eventBuilder.add_mimeMessage(mimeMessage);
0051         Sink::ApplicationDomain::Buffer::FinishMailBuffer(eventFbb, eventBuilder.Finish());
0052     }
0053 
0054     flatbuffers::FlatBufferBuilder entityFbb;
0055     Sink::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, 0, 0, eventFbb.GetBufferPointer(), eventFbb.GetSize());
0056     bufferSize = entityFbb.GetSize();
0057 
0058     flatbuffers::FlatBufferBuilder fbb;
0059     auto type = fbb.CreateString(Sink::ApplicationDomain::getTypeName<Sink::ApplicationDomain::Mail>().toStdString().data());
0060     auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize());
0061     Sink::Commands::CreateEntityBuilder builder(fbb);
0062     builder.add_domainType(type);
0063     builder.add_delta(delta);
0064     auto location = builder.Finish();
0065     Sink::Commands::FinishCreateEntityBuffer(fbb, location);
0066 
0067     return QByteArray(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize());
0068 }
0069 
0070 /**
0071  * Benchmark writing in the synchronizer process.
0072  */
0073 class DummyResourceWriteBenchmark : public QObject
0074 {
0075     Q_OBJECT
0076 
0077     QList<double> mRssGrowthPerEntity;
0078     QList<double> mTimePerEntity;
0079     QDateTime mTimeStamp{QDateTime::currentDateTimeUtc()};
0080 
0081     void writeInProcess(int num, const QDateTime &timestamp)
0082     {
0083         DummyResource::removeFromDisk("sink.dummy.instance1");
0084 
0085         QTime time;
0086         time.start();
0087         DummyResource resource(Sink::ResourceContext{"sink.dummy.instance1", "sink.dummy", Sink::AdaptorFactoryRegistry::instance().getFactories("sink.dummy")});
0088 
0089         int bufferSize = 0;
0090         auto command = createEntityBuffer(0, bufferSize);
0091 
0092         const auto startingRss = static_cast<double>(getCurrentRSS());
0093         for (int i = 0; i < num; i++) {
0094             resource.processCommand(Sink::Commands::CreateEntityCommand, command);
0095         }
0096         auto appendTime = time.elapsed();
0097         Q_UNUSED(appendTime);
0098         auto bufferSizeTotal = bufferSize * num;
0099 
0100         // Wait until all messages have been processed
0101         resource.processAllMessages().exec().waitForFinished();
0102 
0103         auto allProcessedTime = time.elapsed();
0104 
0105         const auto finalRss = static_cast<double>(getCurrentRSS());
0106         const auto rssGrowth = finalRss - startingRss;
0107         // Since the database is memory mapped it is attributted to the resident set size.
0108         const auto rssWithoutDb = finalRss - static_cast<double>(DummyResource::diskUsage("sink.dummy.instance1"));
0109         const auto peakRss = static_cast<double>(getPeakRSS());
0110         // How much peak deviates from final rss in percent
0111         const auto percentageRssError = static_cast<double>(peakRss - finalRss) * 100.0 / static_cast<double>(finalRss);
0112         auto rssGrowthPerEntity = rssGrowth / num;
0113         std::cout << "Current Rss usage [kb]: " << finalRss / 1024 << std::endl;
0114         std::cout << "Peak Rss usage [kb]: " << peakRss / 1024 << std::endl;
0115         std::cout << "Rss growth [kb]: " << rssGrowth / 1024 << std::endl;
0116         std::cout << "Rss growth per entity [byte]: " << rssGrowthPerEntity << std::endl;
0117         std::cout << "Rss without db [kb]: " << rssWithoutDb / 1024 << std::endl;
0118         std::cout << "Percentage peak rss error: " << percentageRssError << std::endl;
0119 
0120         auto onDisk = Sink::Storage::DataStore(Sink::storageLocation(), "sink.dummy.instance1", Sink::Storage::DataStore::ReadOnly).diskUsage();
0121         auto writeAmplification = static_cast<double>(onDisk) / static_cast<double>(bufferSizeTotal);
0122         std::cout << "On disk [kb]: " << onDisk / 1024 << std::endl;
0123         std::cout << "Buffer size total [kb]: " << bufferSizeTotal / 1024 << std::endl;
0124         std::cout << "Write amplification: " << writeAmplification << std::endl;
0125 
0126 
0127         mTimePerEntity << static_cast<double>(allProcessedTime) / static_cast<double>(num);
0128         mRssGrowthPerEntity << rssGrowthPerEntity;
0129 
0130         {
0131             HAWD::Dataset dataset("dummy_write_perf", m_hawdState);
0132             HAWD::Dataset::Row row = dataset.row();
0133             row.setValue("rows", num);
0134             row.setValue("append", (qreal)num/appendTime);
0135             row.setValue("total", (qreal)num/allProcessedTime);
0136             row.setTimestamp(timestamp);
0137             dataset.insertRow(row);
0138             HAWD::Formatter::print(dataset);
0139         }
0140 
0141         {
0142             HAWD::Dataset dataset("dummy_write_memory", m_hawdState);
0143             HAWD::Dataset::Row row = dataset.row();
0144             row.setValue("rows", num);
0145             row.setValue("rss", QVariant::fromValue(finalRss / 1024));
0146             row.setValue("peakRss", QVariant::fromValue(peakRss / 1024));
0147             row.setValue("percentagePeakRssError", percentageRssError);
0148             row.setValue("rssGrowthPerEntity", QVariant::fromValue(rssGrowthPerEntity));
0149             row.setValue("rssWithoutDb", rssWithoutDb / 1024);
0150             row.setTimestamp(timestamp);
0151             dataset.insertRow(row);
0152             HAWD::Formatter::print(dataset);
0153         }
0154 
0155         {
0156             HAWD::Dataset dataset("dummy_write_disk", m_hawdState);
0157             HAWD::Dataset::Row row = dataset.row();
0158             row.setValue("rows", num);
0159             row.setValue("onDisk", onDisk / 1024);
0160             row.setValue("bufferSize", bufferSizeTotal / 1024);
0161             row.setValue("writeAmplification", writeAmplification);
0162             row.setTimestamp(timestamp);
0163             dataset.insertRow(row);
0164             HAWD::Formatter::print(dataset);
0165         }
0166 
0167         // Print memory layout, RSS is what is in memory
0168         // std::system("exec pmap -x \"$PPID\"");
0169     }
0170 
0171     void testDiskUsage(int num)
0172     {
0173         auto resourceId = "testDiskUsage";
0174         DummyResource::removeFromDisk(resourceId);
0175 
0176         {
0177             DummyResource resource(Sink::ResourceContext{resourceId, "sink.dummy", Sink::AdaptorFactoryRegistry::instance().getFactories("sink.dummy")});
0178 
0179             int bufferSize = 0;
0180             auto command = createEntityBuffer(1000, bufferSize);
0181 
0182             for (int i = 0; i < num; i++) {
0183                 resource.processCommand(Sink::Commands::CreateEntityCommand, command);
0184             }
0185 
0186             // Wait until all messages have been processed
0187             resource.processAllMessages().exec().waitForFinished();
0188         }
0189 
0190         qint64 totalDbSizes = 0;
0191         qint64 totalKeysAndValues = 0;
0192         QMap<QByteArray, qint64> dbSizes;
0193         Sink::Storage::DataStore storage(Sink::storageLocation(), resourceId, Sink::Storage::DataStore::ReadOnly);
0194         auto transaction = storage.createTransaction(Sink::Storage::DataStore::ReadOnly);
0195         auto stat = transaction.stat();
0196 
0197         std::cout << "Free pages: " << stat.freePages << std::endl;
0198         std::cout << "Total pages: " << stat.totalPages << std::endl;
0199         auto totalUsedSize = stat.pageSize * (stat.totalPages - stat.freePages);
0200         std::cout << "Used size: " << totalUsedSize << std::endl;
0201 
0202         auto freeDbSize = stat.pageSize * (stat.freeDbStat.leafPages + stat.freeDbStat.overflowPages + stat.freeDbStat.branchPages);
0203         std::cout << "Free db size: " << freeDbSize << std::endl;
0204         auto mainDbSize = stat.pageSize * (stat.mainDbStat.leafPages + stat.mainDbStat.overflowPages + stat.mainDbStat.branchPages);
0205         std::cout << "Main db size: " << mainDbSize << std::endl;
0206 
0207         totalDbSizes += mainDbSize;
0208         QList<QByteArray> databases = transaction.getDatabaseNames();
0209         for (const auto &databaseName : databases) {
0210             auto db = transaction.openDatabase(databaseName);
0211             const auto size = db.getSize();
0212             dbSizes.insert(databaseName, size);
0213             totalDbSizes += size;
0214 
0215             qint64 keySizes = 0;
0216             qint64 valueSizes = 0;
0217             db.scan({}, [&] (const QByteArray &key, const QByteArray &data) {
0218                     keySizes += key.size();
0219                     valueSizes += data.size();
0220                     return true;
0221                 },
0222                 [&](const Sink::Storage::DataStore::Error &e) {
0223                     qWarning() << "Error while reading" << e;
0224                 },
0225                 false);
0226 
0227             auto s = db.stat();
0228             auto usedPages = (s.leafPages + s.branchPages + s.overflowPages);
0229 
0230             std::cout << std::endl;
0231             std::cout << "Db: " << databaseName.toStdString() << (db.allowsDuplicates() ? " DUP" : "") << std::endl;
0232             std::cout << "Used pages " << usedPages << std::endl;
0233             std::cout << "Used size " << static_cast<double>(keySizes + valueSizes) / 4096.0 << std::endl;
0234             std::cout << "Entries " << s.numEntries << std::endl;
0235             totalKeysAndValues += (keySizes + valueSizes);
0236         }
0237         std::cout << std::endl;
0238 
0239         auto mainStoreOnDisk = Sink::Storage::DataStore(Sink::storageLocation(), resourceId, Sink::Storage::DataStore::ReadOnly).diskUsage();
0240         auto totalOnDisk = DummyResource::diskUsage(resourceId);
0241         std::cout << "Calculated key + value size: " << totalKeysAndValues << std::endl;
0242         std::cout << "Calculated total db sizes: " << totalDbSizes << std::endl;
0243         std::cout << "Main store on disk: " << mainStoreOnDisk << std::endl;
0244         std::cout << "Total on disk: " << totalOnDisk << std::endl;
0245         std::cout << "Used size amplification: " << static_cast<double>(totalUsedSize) / static_cast<double>(totalKeysAndValues) << std::endl;
0246         std::cout << "Write amplification: " << static_cast<double>(mainStoreOnDisk) / static_cast<double>(totalKeysAndValues) << std::endl;
0247         std::cout << std::endl;
0248     }
0249 
0250 private slots:
0251     void initTestCase()
0252     {
0253         Sink::Log::setDebugOutputLevel(Sink::Log::Warning);
0254         auto factory = Sink::ResourceFactory::load("sink.dummy");
0255         QVERIFY(factory);
0256     }
0257 
0258     void cleanup()
0259     {
0260     }
0261 
0262     void runBenchmarks()
0263     {
0264         writeInProcess(5000, mTimeStamp);
0265     }
0266 
0267     void ensureUsedMemoryRemainsStable()
0268     {
0269         auto rssStandardDeviation = sqrt(variance(mRssGrowthPerEntity));
0270         auto timeStandardDeviation = sqrt(variance(mTimePerEntity));
0271         HAWD::Dataset dataset("dummy_write_summary", m_hawdState);
0272         HAWD::Dataset::Row row = dataset.row();
0273         row.setValue("rssStandardDeviation", rssStandardDeviation);
0274         row.setValue("rssMaxDifference", maxDifference(mRssGrowthPerEntity));
0275         row.setValue("timeStandardDeviation", timeStandardDeviation);
0276         row.setValue("timeMaxDifference", maxDifference(mTimePerEntity));
0277         row.setTimestamp(mTimeStamp);
0278         dataset.insertRow(row);
0279         HAWD::Formatter::print(dataset);
0280     }
0281 
0282     void testDiskUsage()
0283     {
0284         testDiskUsage(1000);
0285     }
0286 
0287     // This allows to run individual parts without doing a cleanup, but still cleaning up normally
0288     void testCleanupForCompleteTest()
0289     {
0290         DummyResource::removeFromDisk("sink.dummy.instance1");
0291     }
0292 
0293 private:
0294     HAWD::State m_hawdState;
0295 };
0296 
0297 QTEST_MAIN(DummyResourceWriteBenchmark)
0298 #include "dummyresourcewritebenchmark.moc"