Warning, file /pim/sink/tests/pipelinetest.cpp was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).
0001 #include <QTest> 0002 0003 #include <QString> 0004 0005 #include "testimplementations.h" 0006 0007 #include "event_generated.h" 0008 #include "entity_generated.h" 0009 #include "metadata_generated.h" 0010 #include "createentity_generated.h" 0011 #include "modifyentity_generated.h" 0012 #include "deleteentity_generated.h" 0013 #include "dummyresource/resourcefactory.h" 0014 #include "store.h" 0015 #include "commands.h" 0016 #include "entitybuffer.h" 0017 #include "resourceconfig.h" 0018 #include "pipeline.h" 0019 #include "log.h" 0020 #include "domainadaptor.h" 0021 #include "definitions.h" 0022 #include "adaptorfactoryregistry.h" 0023 #include "storage/key.h" 0024 0025 static void removeFromDisk(const QString &name) 0026 { 0027 Sink::Storage::DataStore store(Sink::Store::storageLocation(), name, Sink::Storage::DataStore::ReadWrite); 0028 store.removeFromDisk(); 0029 } 0030 0031 static QList<Sink::Storage::Key> getKeys(const QByteArray &dbEnv, const QByteArray &name) 0032 { 0033 Sink::Storage::DataStore store(Sink::storageLocation(), dbEnv, Sink::Storage::DataStore::ReadOnly); 0034 auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadOnly); 0035 auto db = transaction.openDatabase(name, nullptr, Sink::Storage::IntegerKeys); 0036 QList<Sink::Storage::Key> result; 0037 db.scan("", [&](const QByteArray &key, const QByteArray &value) { 0038 size_t revision = Sink::byteArrayToSizeT(key); 0039 result << Sink::Storage::Key(Sink::Storage::DataStore::getUidFromRevision(transaction, revision), revision); 0040 return true; 0041 }); 0042 return result; 0043 } 0044 0045 static QByteArray getEntity(const QByteArray &dbEnv, const QByteArray &name, const Sink::Storage::Key &key) 0046 { 0047 Sink::Storage::DataStore store(Sink::storageLocation(), dbEnv, Sink::Storage::DataStore::ReadOnly); 0048 auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadOnly); 0049 auto db = transaction.openDatabase(name, nullptr, Sink::Storage::IntegerKeys); 0050 QByteArray result; 0051 db.scan(key.revision().toSizeT(), [&](size_t rev, const QByteArray &value) { 0052 result = value; 0053 return true; 0054 }); 0055 return result; 0056 } 0057 0058 flatbuffers::FlatBufferBuilder &createEvent(flatbuffers::FlatBufferBuilder &entityFbb, const QString &s = QString("summary"), const QString &d = QString()) 0059 { 0060 flatbuffers::FlatBufferBuilder eventFbb; 0061 eventFbb.Clear(); 0062 { 0063 Sink::ApplicationDomain::Buffer::EventBuilder eventBuilder(eventFbb); 0064 auto eventLocation = eventBuilder.Finish(); 0065 Sink::ApplicationDomain::Buffer::FinishEventBuffer(eventFbb, eventLocation); 0066 } 0067 0068 flatbuffers::FlatBufferBuilder localFbb; 0069 { 0070 auto uid = localFbb.CreateString("testuid"); 0071 auto summary = localFbb.CreateString(s.toStdString()); 0072 auto description = localFbb.CreateString(d.toStdString()); 0073 auto localBuilder = Sink::ApplicationDomain::Buffer::EventBuilder(localFbb); 0074 localBuilder.add_uid(uid); 0075 localBuilder.add_summary(summary); 0076 if (!d.isEmpty()) { 0077 localBuilder.add_description(description); 0078 } 0079 auto location = localBuilder.Finish(); 0080 Sink::ApplicationDomain::Buffer::FinishEventBuffer(localFbb, location); 0081 } 0082 0083 Sink::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, eventFbb.GetBufferPointer(), eventFbb.GetSize(), localFbb.GetBufferPointer(), localFbb.GetSize()); 0084 return entityFbb; 0085 } 0086 0087 QByteArray createEntityCommand(const flatbuffers::FlatBufferBuilder &entityFbb) 0088 { 0089 flatbuffers::FlatBufferBuilder fbb; 0090 auto type = fbb.CreateString(Sink::ApplicationDomain::getTypeName<Sink::ApplicationDomain::Event>().toStdString().data()); 0091 auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize()); 0092 Sink::Commands::CreateEntityBuilder builder(fbb); 0093 builder.add_domainType(type); 0094 builder.add_delta(delta); 0095 auto location = builder.Finish(); 0096 Sink::Commands::FinishCreateEntityBuffer(fbb, location); 0097 0098 const QByteArray command(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize()); 0099 { 0100 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command.data()), command.size()); 0101 Q_ASSERT(Sink::Commands::VerifyCreateEntityBuffer(verifyer)); 0102 } 0103 return command; 0104 } 0105 0106 QByteArray modifyEntityCommand(const flatbuffers::FlatBufferBuilder &entityFbb, const QByteArray &uid, qint64 revision, QStringList modifiedProperties = {"summary"}, bool replayToSource = true) 0107 { 0108 flatbuffers::FlatBufferBuilder fbb; 0109 auto type = fbb.CreateString(Sink::ApplicationDomain::getTypeName<Sink::ApplicationDomain::Event>().toStdString().data()); 0110 auto id = fbb.CreateString(std::string(uid.constData(), uid.size())); 0111 std::vector<flatbuffers::Offset<flatbuffers::String>> modifiedVector; 0112 for (const auto &modified : modifiedProperties) { 0113 modifiedVector.push_back(fbb.CreateString(modified.toStdString())); 0114 } 0115 auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize()); 0116 auto modifiedPropertiesVector = fbb.CreateVector(modifiedVector); 0117 Sink::Commands::ModifyEntityBuilder builder(fbb); 0118 builder.add_domainType(type); 0119 builder.add_delta(delta); 0120 builder.add_revision(revision); 0121 builder.add_entityId(id); 0122 builder.add_modifiedProperties(modifiedPropertiesVector); 0123 builder.add_replayToSource(replayToSource); 0124 0125 auto location = builder.Finish(); 0126 Sink::Commands::FinishModifyEntityBuffer(fbb, location); 0127 0128 const QByteArray command(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize()); 0129 { 0130 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command.data()), command.size()); 0131 Q_ASSERT(Sink::Commands::VerifyModifyEntityBuffer(verifyer)); 0132 } 0133 return command; 0134 } 0135 0136 QByteArray deleteEntityCommand(const QByteArray &uid, qint64 revision) 0137 { 0138 flatbuffers::FlatBufferBuilder fbb; 0139 auto type = fbb.CreateString(Sink::ApplicationDomain::getTypeName<Sink::ApplicationDomain::Event>().toStdString().data()); 0140 auto id = fbb.CreateString(std::string(uid.constData(), uid.size())); 0141 Sink::Commands::DeleteEntityBuilder builder(fbb); 0142 builder.add_domainType(type); 0143 builder.add_revision(revision); 0144 builder.add_entityId(id); 0145 auto location = builder.Finish(); 0146 Sink::Commands::FinishDeleteEntityBuffer(fbb, location); 0147 0148 const QByteArray command(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize()); 0149 { 0150 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command.data()), command.size()); 0151 Q_ASSERT(Sink::Commands::VerifyDeleteEntityBuffer(verifyer)); 0152 } 0153 return command; 0154 } 0155 0156 class TestProcessor : public Sink::Preprocessor 0157 { 0158 public: 0159 void newEntity(Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE 0160 { 0161 newUids << newEntity.identifier(); 0162 newRevisions << newEntity.revision(); 0163 } 0164 0165 void modifiedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity, Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE 0166 { 0167 modifiedUids << newEntity.identifier(); 0168 modifiedRevisions << newEntity.revision(); 0169 } 0170 0171 void deletedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity) Q_DECL_OVERRIDE 0172 { 0173 deletedUids << oldEntity.identifier(); 0174 deletedRevisions << oldEntity.revision(); 0175 deletedSummaries << oldEntity.getProperty("summary").toByteArray(); 0176 } 0177 0178 QList<QByteArray> newUids; 0179 QList<qint64> newRevisions; 0180 QList<QByteArray> modifiedUids; 0181 QList<qint64> modifiedRevisions; 0182 QList<QByteArray> deletedUids; 0183 QList<qint64> deletedRevisions; 0184 QList<QByteArray> deletedSummaries; 0185 }; 0186 0187 /** 0188 * Test of the pipeline implementation to ensure new revisions are created correctly in the database. 0189 */ 0190 class PipelineTest : public QObject 0191 { 0192 Q_OBJECT 0193 0194 QByteArray instanceIdentifier() 0195 { 0196 return "pipelinetest.instance1"; 0197 } 0198 0199 Sink::ResourceContext getContext() 0200 { 0201 return Sink::ResourceContext{instanceIdentifier(), "test", Sink::AdaptorFactoryRegistry::instance().getFactories("test")}; 0202 } 0203 0204 private slots: 0205 void initTestCase() 0206 { 0207 Sink::AdaptorFactoryRegistry::instance().registerFactory<Sink::ApplicationDomain::Event, TestEventAdaptorFactory>("test"); 0208 } 0209 0210 void init() 0211 { 0212 removeFromDisk(instanceIdentifier()); 0213 } 0214 0215 void testCreate() 0216 { 0217 flatbuffers::FlatBufferBuilder entityFbb; 0218 auto command = createEntityCommand(createEvent(entityFbb)); 0219 0220 Sink::Pipeline pipeline(getContext(), {"test"}); 0221 0222 pipeline.startTransaction(); 0223 pipeline.newEntity(command.constData(), command.size()).exec(); 0224 pipeline.commit(); 0225 0226 auto result = getKeys(instanceIdentifier(), "event.main"); 0227 qDebug() << result; 0228 QCOMPARE(result.size(), 1); 0229 0230 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); 0231 auto buffer = getEntity(instanceIdentifier(), "event.main", result.first()); 0232 QVERIFY(!buffer.isEmpty()); 0233 Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size()); 0234 auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); 0235 QVERIFY2(adaptor->getProperty("summary").toString() == QString("summary"), "The modification isn't applied."); 0236 } 0237 0238 void testModify() 0239 { 0240 flatbuffers::FlatBufferBuilder entityFbb; 0241 auto command = createEntityCommand(createEvent(entityFbb, "summary", "description")); 0242 0243 Sink::Pipeline pipeline(getContext(), {"test"}); 0244 0245 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); 0246 0247 // Create the initial revision 0248 pipeline.startTransaction(); 0249 pipeline.newEntity(command.constData(), command.size()).exec(); 0250 pipeline.commit(); 0251 0252 // Get uid of written entity 0253 auto keys = getKeys(instanceIdentifier(), "event.main"); 0254 QCOMPARE(keys.size(), 1); 0255 auto key = keys.first(); 0256 const auto uid = key.identifier().toDisplayByteArray(); 0257 0258 // Execute the modification 0259 entityFbb.Clear(); 0260 auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summary2"), uid, 1); 0261 pipeline.startTransaction(); 0262 pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()).exec(); 0263 pipeline.commit(); 0264 0265 key.setRevision(2); 0266 0267 // Ensure we've got the new revision with the modification 0268 auto buffer = getEntity(instanceIdentifier(), "event.main", key); 0269 QVERIFY(!buffer.isEmpty()); 0270 Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size()); 0271 auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); 0272 QVERIFY2(adaptor->getProperty("summary").toString() == QString("summary2"), "The modification isn't applied."); 0273 // Ensure we didn't modify anything else 0274 QVERIFY2(adaptor->getProperty("description").toString() == QString("description"), "The modification has sideeffects."); 0275 0276 // Both revisions are in the store at this point 0277 QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 2); 0278 0279 // Cleanup old revisions 0280 pipeline.cleanupRevisions(2); 0281 0282 // And now only the latest revision is left 0283 QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 1); 0284 } 0285 0286 void testModifyWithUnrelatedOperationInbetween() 0287 { 0288 flatbuffers::FlatBufferBuilder entityFbb; 0289 auto command = createEntityCommand(createEvent(entityFbb)); 0290 0291 Sink::Pipeline pipeline(getContext(), {"test"}); 0292 0293 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); 0294 0295 // Create the initial revision 0296 pipeline.startTransaction(); 0297 pipeline.newEntity(command.constData(), command.size()).exec(); 0298 pipeline.commit(); 0299 0300 // Get uid of written entity 0301 auto keys = getKeys(instanceIdentifier(), "event.main"); 0302 QCOMPARE(keys.size(), 1); 0303 auto key = keys.first(); 0304 const auto uid = key.identifier().toDisplayByteArray(); 0305 0306 0307 // Create another operation inbetween 0308 { 0309 entityFbb.Clear(); 0310 auto command = createEntityCommand(createEvent(entityFbb)); 0311 pipeline.startTransaction(); 0312 pipeline.newEntity(command.constData(), command.size()).exec(); 0313 pipeline.commit(); 0314 } 0315 0316 // Execute the modification on revision 2 0317 entityFbb.Clear(); 0318 auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summary2"), uid, 2); 0319 pipeline.startTransaction(); 0320 pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()).exec(); 0321 pipeline.commit(); 0322 0323 key.setRevision(3); 0324 0325 // Ensure we've got the new revision with the modification 0326 auto buffer = getEntity(instanceIdentifier(), "event.main", key); 0327 QVERIFY(!buffer.isEmpty()); 0328 Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size()); 0329 auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); 0330 QCOMPARE(adaptor->getProperty("summary").toString(), QString("summary2")); 0331 } 0332 0333 void testDelete() 0334 { 0335 flatbuffers::FlatBufferBuilder entityFbb; 0336 auto command = createEntityCommand(createEvent(entityFbb)); 0337 Sink::Pipeline pipeline(getContext(), {"test"}); 0338 0339 // Create the initial revision 0340 pipeline.startTransaction(); 0341 pipeline.newEntity(command.constData(), command.size()).exec(); 0342 pipeline.commit(); 0343 0344 auto result = getKeys(instanceIdentifier(), "event.main"); 0345 QCOMPARE(result.size(), 1); 0346 0347 const auto uid = result.first().identifier().toDisplayByteArray(); 0348 0349 // Delete entity 0350 auto deleteCommand = deleteEntityCommand(uid, 1); 0351 pipeline.startTransaction(); 0352 pipeline.deletedEntity(deleteCommand.constData(), deleteCommand.size()).exec(); 0353 pipeline.commit(); 0354 0355 // We have a new revision that indicates the deletion 0356 QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 2); 0357 0358 // Cleanup old revisions 0359 pipeline.cleanupRevisions(2); 0360 0361 // And all revisions are gone 0362 QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 0); 0363 } 0364 0365 void testPreprocessor() 0366 { 0367 flatbuffers::FlatBufferBuilder entityFbb; 0368 0369 auto testProcessor = new TestProcessor; 0370 0371 Sink::Pipeline pipeline(getContext(), {"test"}); 0372 pipeline.setPreprocessors("event", QVector<Sink::Preprocessor *>() << testProcessor); 0373 pipeline.startTransaction(); 0374 // pipeline.setAdaptorFactory("event", QSharedPointer<TestEventAdaptorFactory>::create()); 0375 0376 // Actual test 0377 { 0378 auto command = createEntityCommand(createEvent(entityFbb)); 0379 pipeline.newEntity(command.constData(), command.size()).exec(); 0380 QCOMPARE(testProcessor->newUids.size(), 1); 0381 QCOMPARE(testProcessor->newRevisions.size(), 1); 0382 const auto uid = Sink::Storage::Identifier::fromDisplayByteArray(testProcessor->newUids.at(0)).toDisplayByteArray(); 0383 QCOMPARE(testProcessor->newUids.at(0), uid); 0384 } 0385 pipeline.commit(); 0386 entityFbb.Clear(); 0387 pipeline.startTransaction(); 0388 auto keys = getKeys(instanceIdentifier(), "event.main"); 0389 QCOMPARE(keys.size(), 1); 0390 const auto uid = keys.first().identifier().toDisplayByteArray(); 0391 { 0392 auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summary2"), uid, 1); 0393 pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()).exec(); 0394 QCOMPARE(testProcessor->modifiedUids.size(), 1); 0395 QCOMPARE(testProcessor->modifiedRevisions.size(), 1); 0396 const auto uid2 = Sink::Storage::Identifier::fromDisplayByteArray(testProcessor->modifiedUids.at(0)).toDisplayByteArray(); 0397 QCOMPARE(testProcessor->modifiedUids.at(0), uid2); 0398 } 0399 pipeline.commit(); 0400 entityFbb.Clear(); 0401 pipeline.startTransaction(); 0402 { 0403 auto deleteCommand = deleteEntityCommand(uid, 1); 0404 pipeline.deletedEntity(deleteCommand.constData(), deleteCommand.size()).exec(); 0405 QCOMPARE(testProcessor->deletedUids.size(), 1); 0406 QCOMPARE(testProcessor->deletedUids.size(), 1); 0407 QCOMPARE(testProcessor->deletedSummaries.size(), 1); 0408 const auto uid2 = Sink::Storage::Identifier::fromDisplayByteArray(testProcessor->modifiedUids.at(0)).toDisplayByteArray(); 0409 QCOMPARE(testProcessor->deletedUids.at(0), uid2); 0410 QCOMPARE(testProcessor->deletedSummaries.at(0), QByteArray("summary2")); 0411 } 0412 } 0413 0414 void testModifyWithConflict() 0415 { 0416 flatbuffers::FlatBufferBuilder entityFbb; 0417 auto command = createEntityCommand(createEvent(entityFbb, "summary", "description")); 0418 0419 Sink::Pipeline pipeline(getContext(), {"test"}); 0420 0421 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); 0422 0423 // Create the initial revision 0424 pipeline.startTransaction(); 0425 pipeline.newEntity(command.constData(), command.size()).exec(); 0426 pipeline.commit(); 0427 0428 // Get uid of written entity 0429 auto keys = getKeys(instanceIdentifier(), "event.main"); 0430 QCOMPARE(keys.size(), 1); 0431 auto key = keys.first(); 0432 const auto uid = key.identifier().toDisplayByteArray(); 0433 0434 //Simulate local modification 0435 { 0436 entityFbb.Clear(); 0437 auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summaryLocal"), uid, 1, {"summary"}, true); 0438 pipeline.startTransaction(); 0439 pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()).exec(); 0440 pipeline.commit(); 0441 } 0442 0443 0444 //Simulate remote modification 0445 //We assume the remote modification is not overly smart and always marks all properties as changed. 0446 { 0447 entityFbb.Clear(); 0448 auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summaryRemote", "descriptionRemote"), uid, 2, {"summary", "description"}, false); 0449 pipeline.startTransaction(); 0450 pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()).exec(); 0451 pipeline.commit(); 0452 } 0453 0454 key.setRevision(3); 0455 0456 // Ensure we've got the new revision with the modification 0457 auto buffer = getEntity(instanceIdentifier(), "event.main", key); 0458 QVERIFY(!buffer.isEmpty()); 0459 Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size()); 0460 auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); 0461 QVERIFY2(adaptor->getProperty("summary").toString() == QString("summaryLocal"), "The local modification was reverted."); 0462 QVERIFY2(adaptor->getProperty("description").toString() == QString("descriptionRemote"), "The remote modification was not applied."); 0463 } 0464 0465 void testModifyDeleted() 0466 { 0467 flatbuffers::FlatBufferBuilder entityFbb; 0468 auto command = createEntityCommand(createEvent(entityFbb, "summary", "description")); 0469 0470 Sink::Pipeline pipeline(getContext(), {"test"}); 0471 0472 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); 0473 0474 // Create the initial revision 0475 pipeline.startTransaction(); 0476 pipeline.newEntity(command.constData(), command.size()).exec(); 0477 pipeline.commit(); 0478 0479 // Get uid of written entity 0480 auto keys = getKeys(instanceIdentifier(), "event.main"); 0481 QCOMPARE(keys.size(), 1); 0482 auto key = keys.first(); 0483 const auto uid = key.identifier().toDisplayByteArray(); 0484 0485 { 0486 auto deleteCommand = deleteEntityCommand(uid, 1); 0487 pipeline.startTransaction(); 0488 pipeline.deletedEntity(deleteCommand.constData(), deleteCommand.size()).exec(); 0489 pipeline.commit(); 0490 } 0491 0492 { 0493 entityFbb.Clear(); 0494 auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summary2"), uid, 1); 0495 pipeline.startTransaction(); 0496 auto future = pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()).exec(); 0497 future.waitForFinished(); 0498 QVERIFY(future.errorCode()); 0499 } 0500 } 0501 }; 0502 0503 QTEST_MAIN(PipelineTest) 0504 #include "pipelinetest.moc"