// Warning: the file /pim/sink/tests/pipelinetest.cpp was not indexed, or was modified since the last indexation; cross-reference links may therefore be missing, inaccurate or erroneous.

0001 #include <QTest>
0002 
0003 #include <QString>
0004 
0005 #include "testimplementations.h"
0006 
0007 #include "event_generated.h"
0008 #include "entity_generated.h"
0009 #include "metadata_generated.h"
0010 #include "createentity_generated.h"
0011 #include "modifyentity_generated.h"
0012 #include "deleteentity_generated.h"
0013 #include "dummyresource/resourcefactory.h"
0014 #include "store.h"
0015 #include "commands.h"
0016 #include "entitybuffer.h"
0017 #include "resourceconfig.h"
0018 #include "pipeline.h"
0019 #include "log.h"
0020 #include "domainadaptor.h"
0021 #include "definitions.h"
0022 #include "adaptorfactoryregistry.h"
0023 #include "storage/key.h"
0024 
0025 static void removeFromDisk(const QString &name)
0026 {
0027     Sink::Storage::DataStore store(Sink::Store::storageLocation(), name, Sink::Storage::DataStore::ReadWrite);
0028     store.removeFromDisk();
0029 }
0030 
0031 static QList<Sink::Storage::Key> getKeys(const QByteArray &dbEnv, const QByteArray &name)
0032 {
0033     Sink::Storage::DataStore store(Sink::storageLocation(), dbEnv, Sink::Storage::DataStore::ReadOnly);
0034     auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadOnly);
0035     auto db = transaction.openDatabase(name, nullptr, Sink::Storage::IntegerKeys);
0036     QList<Sink::Storage::Key> result;
0037     db.scan("", [&](const QByteArray &key, const QByteArray &value) {
0038         size_t revision = Sink::byteArrayToSizeT(key);
0039         result << Sink::Storage::Key(Sink::Storage::DataStore::getUidFromRevision(transaction, revision), revision);
0040         return true;
0041     });
0042     return result;
0043 }
0044 
0045 static QByteArray getEntity(const QByteArray &dbEnv, const QByteArray &name, const Sink::Storage::Key &key)
0046 {
0047     Sink::Storage::DataStore store(Sink::storageLocation(), dbEnv, Sink::Storage::DataStore::ReadOnly);
0048     auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadOnly);
0049     auto db = transaction.openDatabase(name, nullptr, Sink::Storage::IntegerKeys);
0050     QByteArray result;
0051     db.scan(key.revision().toSizeT(), [&](size_t rev, const QByteArray &value) {
0052         result = value;
0053         return true;
0054     });
0055     return result;
0056 }
0057 
0058 flatbuffers::FlatBufferBuilder &createEvent(flatbuffers::FlatBufferBuilder &entityFbb, const QString &s = QString("summary"), const QString &d = QString())
0059 {
0060     flatbuffers::FlatBufferBuilder eventFbb;
0061     eventFbb.Clear();
0062     {
0063         Sink::ApplicationDomain::Buffer::EventBuilder eventBuilder(eventFbb);
0064         auto eventLocation = eventBuilder.Finish();
0065         Sink::ApplicationDomain::Buffer::FinishEventBuffer(eventFbb, eventLocation);
0066     }
0067 
0068     flatbuffers::FlatBufferBuilder localFbb;
0069     {
0070         auto uid = localFbb.CreateString("testuid");
0071         auto summary = localFbb.CreateString(s.toStdString());
0072         auto description = localFbb.CreateString(d.toStdString());
0073         auto localBuilder = Sink::ApplicationDomain::Buffer::EventBuilder(localFbb);
0074         localBuilder.add_uid(uid);
0075         localBuilder.add_summary(summary);
0076         if (!d.isEmpty()) {
0077             localBuilder.add_description(description);
0078         }
0079         auto location = localBuilder.Finish();
0080         Sink::ApplicationDomain::Buffer::FinishEventBuffer(localFbb, location);
0081     }
0082 
0083     Sink::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, eventFbb.GetBufferPointer(), eventFbb.GetSize(), localFbb.GetBufferPointer(), localFbb.GetSize());
0084     return entityFbb;
0085 }
0086 
0087 QByteArray createEntityCommand(const flatbuffers::FlatBufferBuilder &entityFbb)
0088 {
0089     flatbuffers::FlatBufferBuilder fbb;
0090     auto type = fbb.CreateString(Sink::ApplicationDomain::getTypeName<Sink::ApplicationDomain::Event>().toStdString().data());
0091     auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize());
0092     Sink::Commands::CreateEntityBuilder builder(fbb);
0093     builder.add_domainType(type);
0094     builder.add_delta(delta);
0095     auto location = builder.Finish();
0096     Sink::Commands::FinishCreateEntityBuffer(fbb, location);
0097 
0098     const QByteArray command(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize());
0099     {
0100         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command.data()), command.size());
0101         Q_ASSERT(Sink::Commands::VerifyCreateEntityBuffer(verifyer));
0102     }
0103     return command;
0104 }
0105 
0106 QByteArray modifyEntityCommand(const flatbuffers::FlatBufferBuilder &entityFbb, const QByteArray &uid, qint64 revision, QStringList modifiedProperties = {"summary"}, bool replayToSource = true)
0107 {
0108     flatbuffers::FlatBufferBuilder fbb;
0109     auto type = fbb.CreateString(Sink::ApplicationDomain::getTypeName<Sink::ApplicationDomain::Event>().toStdString().data());
0110     auto id = fbb.CreateString(std::string(uid.constData(), uid.size()));
0111     std::vector<flatbuffers::Offset<flatbuffers::String>> modifiedVector;
0112     for (const auto &modified : modifiedProperties) {
0113         modifiedVector.push_back(fbb.CreateString(modified.toStdString()));
0114     }
0115     auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize());
0116     auto modifiedPropertiesVector = fbb.CreateVector(modifiedVector);
0117     Sink::Commands::ModifyEntityBuilder builder(fbb);
0118     builder.add_domainType(type);
0119     builder.add_delta(delta);
0120     builder.add_revision(revision);
0121     builder.add_entityId(id);
0122     builder.add_modifiedProperties(modifiedPropertiesVector);
0123     builder.add_replayToSource(replayToSource);
0124 
0125     auto location = builder.Finish();
0126     Sink::Commands::FinishModifyEntityBuffer(fbb, location);
0127 
0128     const QByteArray command(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize());
0129     {
0130         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command.data()), command.size());
0131         Q_ASSERT(Sink::Commands::VerifyModifyEntityBuffer(verifyer));
0132     }
0133     return command;
0134 }
0135 
0136 QByteArray deleteEntityCommand(const QByteArray &uid, qint64 revision)
0137 {
0138     flatbuffers::FlatBufferBuilder fbb;
0139     auto type = fbb.CreateString(Sink::ApplicationDomain::getTypeName<Sink::ApplicationDomain::Event>().toStdString().data());
0140     auto id = fbb.CreateString(std::string(uid.constData(), uid.size()));
0141     Sink::Commands::DeleteEntityBuilder builder(fbb);
0142     builder.add_domainType(type);
0143     builder.add_revision(revision);
0144     builder.add_entityId(id);
0145     auto location = builder.Finish();
0146     Sink::Commands::FinishDeleteEntityBuffer(fbb, location);
0147 
0148     const QByteArray command(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize());
0149     {
0150         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command.data()), command.size());
0151         Q_ASSERT(Sink::Commands::VerifyDeleteEntityBuffer(verifyer));
0152     }
0153     return command;
0154 }
0155 
0156 class TestProcessor : public Sink::Preprocessor
0157 {
0158 public:
0159     void newEntity(Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE
0160     {
0161         newUids << newEntity.identifier();
0162         newRevisions << newEntity.revision();
0163     }
0164 
0165     void modifiedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity, Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE
0166     {
0167         modifiedUids << newEntity.identifier();
0168         modifiedRevisions << newEntity.revision();
0169     }
0170 
0171     void deletedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity) Q_DECL_OVERRIDE
0172     {
0173         deletedUids << oldEntity.identifier();
0174         deletedRevisions << oldEntity.revision();
0175         deletedSummaries << oldEntity.getProperty("summary").toByteArray();
0176     }
0177 
0178     QList<QByteArray> newUids;
0179     QList<qint64> newRevisions;
0180     QList<QByteArray> modifiedUids;
0181     QList<qint64> modifiedRevisions;
0182     QList<QByteArray> deletedUids;
0183     QList<qint64> deletedRevisions;
0184     QList<QByteArray> deletedSummaries;
0185 };
0186 
0187 /**
0188  * Test of the pipeline implementation to ensure new revisions are created correctly in the database.
0189  */
0190 class PipelineTest : public QObject
0191 {
0192     Q_OBJECT
0193 
0194     QByteArray instanceIdentifier()
0195     {
0196         return "pipelinetest.instance1";
0197     }
0198 
0199     Sink::ResourceContext getContext()
0200     {
0201         return Sink::ResourceContext{instanceIdentifier(), "test", Sink::AdaptorFactoryRegistry::instance().getFactories("test")};
0202     }
0203 
0204 private slots:
0205     void initTestCase()
0206     {
0207         Sink::AdaptorFactoryRegistry::instance().registerFactory<Sink::ApplicationDomain::Event, TestEventAdaptorFactory>("test");
0208     }
0209 
0210     void init()
0211     {
0212         removeFromDisk(instanceIdentifier());
0213     }
0214 
0215     void testCreate()
0216     {
0217         flatbuffers::FlatBufferBuilder entityFbb;
0218         auto command = createEntityCommand(createEvent(entityFbb));
0219 
0220         Sink::Pipeline pipeline(getContext(), {"test"});
0221 
0222         pipeline.startTransaction();
0223         pipeline.newEntity(command.constData(), command.size()).exec();
0224         pipeline.commit();
0225 
0226         auto result = getKeys(instanceIdentifier(), "event.main");
0227         qDebug() << result;
0228         QCOMPARE(result.size(), 1);
0229 
0230         auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create();
0231         auto buffer = getEntity(instanceIdentifier(), "event.main", result.first());
0232         QVERIFY(!buffer.isEmpty());
0233         Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size());
0234         auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity());
0235         QVERIFY2(adaptor->getProperty("summary").toString() == QString("summary"), "The modification isn't applied.");
0236     }
0237 
0238     void testModify()
0239     {
0240         flatbuffers::FlatBufferBuilder entityFbb;
0241         auto command = createEntityCommand(createEvent(entityFbb, "summary", "description"));
0242 
0243         Sink::Pipeline pipeline(getContext(), {"test"});
0244 
0245         auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create();
0246 
0247         // Create the initial revision
0248         pipeline.startTransaction();
0249         pipeline.newEntity(command.constData(), command.size()).exec();
0250         pipeline.commit();
0251 
0252         // Get uid of written entity
0253         auto keys = getKeys(instanceIdentifier(), "event.main");
0254         QCOMPARE(keys.size(), 1);
0255         auto key = keys.first();
0256         const auto uid = key.identifier().toDisplayByteArray();
0257 
0258         // Execute the modification
0259         entityFbb.Clear();
0260         auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summary2"), uid, 1);
0261         pipeline.startTransaction();
0262         pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()).exec();
0263         pipeline.commit();
0264 
0265         key.setRevision(2);
0266 
0267         // Ensure we've got the new revision with the modification
0268         auto buffer = getEntity(instanceIdentifier(), "event.main", key);
0269         QVERIFY(!buffer.isEmpty());
0270         Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size());
0271         auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity());
0272         QVERIFY2(adaptor->getProperty("summary").toString() == QString("summary2"), "The modification isn't applied.");
0273         // Ensure we didn't modify anything else
0274         QVERIFY2(adaptor->getProperty("description").toString() == QString("description"), "The modification has sideeffects.");
0275 
0276         // Both revisions are in the store at this point
0277         QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 2);
0278 
0279         // Cleanup old revisions
0280         pipeline.cleanupRevisions(2);
0281 
0282         // And now only the latest revision is left
0283         QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 1);
0284     }
0285 
0286     void testModifyWithUnrelatedOperationInbetween()
0287     {
0288         flatbuffers::FlatBufferBuilder entityFbb;
0289         auto command = createEntityCommand(createEvent(entityFbb));
0290 
0291         Sink::Pipeline pipeline(getContext(), {"test"});
0292 
0293         auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create();
0294 
0295         // Create the initial revision
0296         pipeline.startTransaction();
0297         pipeline.newEntity(command.constData(), command.size()).exec();
0298         pipeline.commit();
0299 
0300         // Get uid of written entity
0301         auto keys = getKeys(instanceIdentifier(), "event.main");
0302         QCOMPARE(keys.size(), 1);
0303         auto key = keys.first();
0304         const auto uid = key.identifier().toDisplayByteArray();
0305 
0306 
0307         // Create another operation inbetween
0308         {
0309             entityFbb.Clear();
0310             auto command = createEntityCommand(createEvent(entityFbb));
0311             pipeline.startTransaction();
0312             pipeline.newEntity(command.constData(), command.size()).exec();
0313             pipeline.commit();
0314         }
0315 
0316         // Execute the modification on revision 2
0317         entityFbb.Clear();
0318         auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summary2"), uid, 2);
0319         pipeline.startTransaction();
0320         pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()).exec();
0321         pipeline.commit();
0322 
0323         key.setRevision(3);
0324 
0325         // Ensure we've got the new revision with the modification
0326         auto buffer = getEntity(instanceIdentifier(), "event.main", key);
0327         QVERIFY(!buffer.isEmpty());
0328         Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size());
0329         auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity());
0330         QCOMPARE(adaptor->getProperty("summary").toString(), QString("summary2"));
0331     }
0332 
0333     void testDelete()
0334     {
0335         flatbuffers::FlatBufferBuilder entityFbb;
0336         auto command = createEntityCommand(createEvent(entityFbb));
0337         Sink::Pipeline pipeline(getContext(), {"test"});
0338 
0339         // Create the initial revision
0340         pipeline.startTransaction();
0341         pipeline.newEntity(command.constData(), command.size()).exec();
0342         pipeline.commit();
0343 
0344         auto result = getKeys(instanceIdentifier(), "event.main");
0345         QCOMPARE(result.size(), 1);
0346 
0347         const auto uid = result.first().identifier().toDisplayByteArray();
0348 
0349         // Delete entity
0350         auto deleteCommand = deleteEntityCommand(uid, 1);
0351         pipeline.startTransaction();
0352         pipeline.deletedEntity(deleteCommand.constData(), deleteCommand.size()).exec();
0353         pipeline.commit();
0354 
0355         // We have a new revision that indicates the deletion
0356         QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 2);
0357 
0358         // Cleanup old revisions
0359         pipeline.cleanupRevisions(2);
0360 
0361         // And all revisions are gone
0362         QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 0);
0363     }
0364 
0365     void testPreprocessor()
0366     {
0367         flatbuffers::FlatBufferBuilder entityFbb;
0368 
0369         auto testProcessor = new TestProcessor;
0370 
0371         Sink::Pipeline pipeline(getContext(), {"test"});
0372         pipeline.setPreprocessors("event", QVector<Sink::Preprocessor *>() << testProcessor);
0373         pipeline.startTransaction();
0374         // pipeline.setAdaptorFactory("event", QSharedPointer<TestEventAdaptorFactory>::create());
0375 
0376         // Actual test
0377         {
0378             auto command = createEntityCommand(createEvent(entityFbb));
0379             pipeline.newEntity(command.constData(), command.size()).exec();
0380             QCOMPARE(testProcessor->newUids.size(), 1);
0381             QCOMPARE(testProcessor->newRevisions.size(), 1);
0382             const auto uid = Sink::Storage::Identifier::fromDisplayByteArray(testProcessor->newUids.at(0)).toDisplayByteArray();
0383             QCOMPARE(testProcessor->newUids.at(0), uid);
0384         }
0385         pipeline.commit();
0386         entityFbb.Clear();
0387         pipeline.startTransaction();
0388         auto keys = getKeys(instanceIdentifier(), "event.main");
0389         QCOMPARE(keys.size(), 1);
0390         const auto uid = keys.first().identifier().toDisplayByteArray();
0391         {
0392             auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summary2"), uid, 1);
0393             pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()).exec();
0394             QCOMPARE(testProcessor->modifiedUids.size(), 1);
0395             QCOMPARE(testProcessor->modifiedRevisions.size(), 1);
0396             const auto uid2 = Sink::Storage::Identifier::fromDisplayByteArray(testProcessor->modifiedUids.at(0)).toDisplayByteArray();
0397             QCOMPARE(testProcessor->modifiedUids.at(0), uid2);
0398         }
0399         pipeline.commit();
0400         entityFbb.Clear();
0401         pipeline.startTransaction();
0402         {
0403             auto deleteCommand = deleteEntityCommand(uid, 1);
0404             pipeline.deletedEntity(deleteCommand.constData(), deleteCommand.size()).exec();
0405             QCOMPARE(testProcessor->deletedUids.size(), 1);
0406             QCOMPARE(testProcessor->deletedUids.size(), 1);
0407             QCOMPARE(testProcessor->deletedSummaries.size(), 1);
0408             const auto uid2 = Sink::Storage::Identifier::fromDisplayByteArray(testProcessor->modifiedUids.at(0)).toDisplayByteArray();
0409             QCOMPARE(testProcessor->deletedUids.at(0), uid2);
0410             QCOMPARE(testProcessor->deletedSummaries.at(0), QByteArray("summary2"));
0411         }
0412     }
0413 
0414     void testModifyWithConflict()
0415     {
0416         flatbuffers::FlatBufferBuilder entityFbb;
0417         auto command = createEntityCommand(createEvent(entityFbb, "summary", "description"));
0418 
0419         Sink::Pipeline pipeline(getContext(), {"test"});
0420 
0421         auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create();
0422 
0423         // Create the initial revision
0424         pipeline.startTransaction();
0425         pipeline.newEntity(command.constData(), command.size()).exec();
0426         pipeline.commit();
0427 
0428         // Get uid of written entity
0429         auto keys = getKeys(instanceIdentifier(), "event.main");
0430         QCOMPARE(keys.size(), 1);
0431         auto key = keys.first();
0432         const auto uid = key.identifier().toDisplayByteArray();
0433 
0434         //Simulate local modification
0435         {
0436             entityFbb.Clear();
0437             auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summaryLocal"), uid, 1, {"summary"}, true);
0438             pipeline.startTransaction();
0439             pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()).exec();
0440             pipeline.commit();
0441         }
0442 
0443 
0444         //Simulate remote modification
0445         //We assume the remote modification is not overly smart and always marks all properties as changed.
0446         {
0447             entityFbb.Clear();
0448             auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summaryRemote", "descriptionRemote"), uid, 2, {"summary", "description"}, false);
0449             pipeline.startTransaction();
0450             pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()).exec();
0451             pipeline.commit();
0452         }
0453 
0454         key.setRevision(3);
0455 
0456         // Ensure we've got the new revision with the modification
0457         auto buffer = getEntity(instanceIdentifier(), "event.main", key);
0458         QVERIFY(!buffer.isEmpty());
0459         Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size());
0460         auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity());
0461         QVERIFY2(adaptor->getProperty("summary").toString() == QString("summaryLocal"), "The local modification was reverted.");
0462         QVERIFY2(adaptor->getProperty("description").toString() == QString("descriptionRemote"), "The remote modification was not applied.");
0463     }
0464 
0465     void testModifyDeleted()
0466     {
0467         flatbuffers::FlatBufferBuilder entityFbb;
0468         auto command = createEntityCommand(createEvent(entityFbb, "summary", "description"));
0469 
0470         Sink::Pipeline pipeline(getContext(), {"test"});
0471 
0472         auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create();
0473 
0474         // Create the initial revision
0475         pipeline.startTransaction();
0476         pipeline.newEntity(command.constData(), command.size()).exec();
0477         pipeline.commit();
0478 
0479         // Get uid of written entity
0480         auto keys = getKeys(instanceIdentifier(), "event.main");
0481         QCOMPARE(keys.size(), 1);
0482         auto key = keys.first();
0483         const auto uid = key.identifier().toDisplayByteArray();
0484 
0485         {
0486             auto deleteCommand = deleteEntityCommand(uid, 1);
0487             pipeline.startTransaction();
0488             pipeline.deletedEntity(deleteCommand.constData(), deleteCommand.size()).exec();
0489             pipeline.commit();
0490         }
0491 
0492         {
0493             entityFbb.Clear();
0494             auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summary2"), uid, 1);
0495             pipeline.startTransaction();
0496             auto future = pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()).exec();
0497             future.waitForFinished();
0498             QVERIFY(future.errorCode());
0499         }
0500     }
0501 };
0502 
0503 QTEST_MAIN(PipelineTest)
0504 #include "pipelinetest.moc"