File indexing completed on 2024-05-19 04:03:51
0001 /* -*- C++ -*- 0002 This file contains a testsuite for job processing in ThreadWeaver. 0003 0004 SPDX-FileCopyrightText: 2005-2013 Mirko Boehm <mirko@kde.org> 0005 0006 SPDX-License-Identifier: LGPL-2.0-or-later 0007 */ 0008 0009 #include "JobTests.h" 0010 0011 #include <algorithm> 0012 #include <cstdlib> 0013 #include <random> 0014 0015 #include <QMutex> 0016 #include <QSignalSpy> 0017 #include <QTest> 0018 #include <QThread> 0019 0020 #include <ThreadWeaver/Collection> 0021 #include <ThreadWeaver/DebuggingAids> 0022 #include <ThreadWeaver/Dependency> 0023 #include <ThreadWeaver/DependencyPolicy> 0024 #include <ThreadWeaver/Exception> 0025 #include <ThreadWeaver/Lambda> 0026 #include <ThreadWeaver/QObjectDecorator> 0027 #include <ThreadWeaver/QueueStream> 0028 #include <ThreadWeaver/Queueing> 0029 #include <ThreadWeaver/ResourceRestrictionPolicy> 0030 #include <ThreadWeaver/Sequence> 0031 #include <ThreadWeaver/Thread> 0032 #include <ThreadWeaver/ThreadWeaver> 0033 0034 #include "AppendCharacterAndVerifyJob.h" 0035 #include "AppendCharacterJob.h" 0036 #include "JobLoggingWeaver.h" 0037 #include "WaitForIdleAndFinished.h" 0038 0039 QMutex s_GlobalMutex; 0040 0041 using namespace ThreadWeaver; 0042 0043 void JobTests::initTestCase() 0044 { 0045 setDebugLevel(true, 1); 0046 } 0047 0048 // Call finish() before leaving a test or use a WaitForIdleAndFinished object to make sure the queue is empty 0049 // and in an idle state. 0050 0051 void JobTests::WeaverLazyThreadCreationTest() 0052 { 0053 Queue weaver; 0054 QString sequence; 0055 0056 WaitForIdleAndFinished w(&weaver); 0057 Q_ASSERT(weaver.isIdle()); 0058 QCOMPARE(weaver.currentNumberOfThreads(), 0); 0059 weaver.setMaximumNumberOfThreads(1); 0060 weaver.stream() << new AppendCharacterJob(QChar('a'), &sequence); 0061 weaver.finish(); 0062 QCOMPARE(weaver.currentNumberOfThreads(), 1); 0063 Q_ASSERT(weaver.isIdle()); 0064 } 0065 0066 void JobTests::ReduceWorkerCountTest() 0067 { 0068 Queue weaver; 0069 QString sequence; 0070 0071 WaitForIdleAndFinished w(&weaver); 0072 Q_ASSERT(weaver.isIdle()); 0073 QCOMPARE(weaver.currentNumberOfThreads(), 0); 0074 weaver.setMaximumNumberOfThreads(8); 0075 weaver.stream() << new AppendCharacterJob(QChar('a'), &sequence); 0076 weaver.finish(); 0077 QVERIFY(weaver.currentNumberOfThreads() >= 1); 0078 weaver.setMaximumNumberOfThreads(1); 0079 weaver.stream() << new AppendCharacterJob(QChar('b'), &sequence); 0080 weaver.finish(); 0081 QCOMPARE(weaver.currentNumberOfThreads(), 1); 0082 weaver.setMaximumNumberOfThreads(0); 0083 weaver.stream() << new AppendCharacterJob(QChar('c'), &sequence); 0084 weaver.reschedule(); 0085 // Unfortunately, there is no way to enforce that the last thread has exited: 0086 // QCOMPARE(weaver.currentNumberOfThreads(), 0); 0087 weaver.setMaximumNumberOfThreads(1); 0088 weaver.stream() << new AppendCharacterJob(QChar('d'), &sequence); 0089 weaver.finish(); 0090 QCOMPARE(weaver.currentNumberOfThreads(), 1); 0091 QCOMPARE(sequence, QLatin1String("abcd")); 0092 } 0093 0094 void JobTests::SimpleJobTest() 0095 { 0096 QString sequence; 0097 0098 WaitForIdleAndFinished w(Queue::instance()); 0099 stream() << new AppendCharacterJob(QChar('1'), &sequence); 0100 Queue::instance()->finish(); 0101 QCOMPARE(sequence, QString("1")); 0102 } 0103 0104 void JobTests::SimpleJobCollectionTest() 0105 { 0106 QString sequence; 0107 Collection jobCollection; 0108 jobCollection << new AppendCharacterJob(QChar('a'), &sequence) // 0109 << new AppendCharacterJob(QChar('b'), &sequence) // 0110 << new AppendCharacterJob(QChar('c'), &sequence); 0111 0112 WaitForIdleAndFinished w(Queue::instance()); 0113 stream() << jobCollection; 0114 0115 Queue::instance()->finish(); 0116 0117 QVERIFY(sequence.length() == 3); 0118 QVERIFY(sequence.count('a') == 1); 0119 QVERIFY(sequence.count('b') == 1); 0120 QVERIFY(sequence.count('c') == 1); 0121 } 0122 0123 void JobTests::EmptyJobCollectionTest() 0124 { 0125 Collection collection; 0126 0127 WaitForIdleAndFinished w(Queue::instance()); 0128 Q_ASSERT(Queue::instance()->isIdle()); 0129 stream() << collection; 0130 Queue::instance()->finish(); 0131 QVERIFY(collection.isFinished()); 0132 QVERIFY(Queue::instance()->isIdle()); 0133 } 0134 0135 void JobTests::CollectionQueueingTest() 0136 { 0137 QString output; 0138 Collection jobCollection; 0139 jobCollection << new AppendCharacterJob(QChar('a'), &output) // 0140 << new AppendCharacterJob(QChar('b'), &output) // 0141 << new AppendCharacterJob(QChar('c'), &output); 0142 0143 Queue weaver; 0144 WaitForIdleAndFinished w(&weaver); 0145 weaver.suspend(); 0146 weaver.stream() << jobCollection; 0147 QCOMPARE(weaver.queueLength(), 1); // collection queues itself, and its elements upon execution of self 0148 weaver.resume(); 0149 weaver.finish(); 0150 QCOMPARE(output.length(), 3); 0151 QVERIFY(Queue::instance()->isIdle()); 0152 } 0153 0154 namespace 0155 { 0156 using namespace ThreadWeaver; 0157 0158 QString SequenceTemplate = "abcdefghijklmnopqrstuvwxyz"; 0159 0160 class GeneratingCollection : public Collection 0161 { 0162 public: 0163 void run(JobPointer, Thread *) override 0164 { 0165 std::for_each(SequenceTemplate.cbegin(), SequenceTemplate.cend(), [this](QChar it) { 0166 *this << new AppendCharacterJob(it, &sequence_); 0167 }); 0168 } 0169 QString sequence_; 0170 }; 0171 0172 class GeneratingSequence : public Sequence 0173 { 0174 public: 0175 void run(JobPointer, Thread *) override 0176 { 0177 std::for_each(SequenceTemplate.cbegin(), SequenceTemplate.cend(), [this](QChar it) { 0178 *this << new AppendCharacterJob(it, &sequence_); 0179 }); 0180 } 0181 QString sequence_; 0182 }; 0183 0184 } 0185 0186 void JobTests::GeneratingCollectionTest() 0187 { 0188 using namespace ThreadWeaver; 0189 0190 GeneratingCollection collection; 0191 WaitForIdleAndFinished w(Queue::instance()); 0192 stream() << collection; 0193 Queue::instance()->finish(); 0194 QCOMPARE(collection.sequence_.size(), SequenceTemplate.length()); 0195 } 0196 0197 void JobTests::ShortJobSequenceTest() 0198 { 0199 QString sequence; 0200 Sequence jobSequence; 0201 jobSequence << new AppendCharacterJob(QChar('a'), &sequence) // 0202 << new AppendCharacterJob(QChar('b'), &sequence) // 0203 << new AppendCharacterJob(QChar('c'), &sequence); 0204 0205 WaitForIdleAndFinished w(Queue::instance()); 0206 QVERIFY(DependencyPolicy::instance().isEmpty()); 0207 stream() << jobSequence; 0208 0209 Queue::instance()->finish(); 0210 QCOMPARE(sequence, QLatin1String("abc")); 0211 QVERIFY(Queue::instance()->isIdle()); 0212 QVERIFY(DependencyPolicy::instance().isEmpty()); 0213 } 0214 0215 void JobTests::ShortDecoratedJobSequenceTest() 0216 { 0217 using namespace ThreadWeaver; 0218 auto logger = new JobLoggingWeaver(); 0219 Queue queue(logger); 0220 0221 QString sequence; 0222 JobPointer jobA(new AppendCharacterJob(QChar('a'), &sequence)); 0223 JobPointer jobB(new AppendCharacterJob(QChar('b'), &sequence)); 0224 JobPointer jobC(new AppendCharacterJob(QChar('c'), &sequence)); 0225 QSharedPointer<Sequence> jobSequence(new Sequence()); 0226 jobSequence->addJob(jobA); 0227 jobSequence->addJob(jobB); 0228 jobSequence->addJob(jobC); 0229 0230 WaitForIdleAndFinished w(&queue); 0231 Q_UNUSED(w); 0232 QVERIFY(DependencyPolicy::instance().isEmpty()); 0233 queue.enqueue(jobSequence); 0234 // Job::DumpJobDependencies(); 0235 queue.finish(); 0236 QCOMPARE(sequence, QLatin1String("abc")); 0237 QVERIFY(queue.isIdle()); 0238 QVERIFY(DependencyPolicy::instance().isEmpty()); 0239 } 0240 0241 void JobTests::EmptyJobSequenceTest() 0242 { 0243 using namespace ThreadWeaver; 0244 QObjectDecorator sequence(new Sequence()); 0245 WaitForIdleAndFinished w(Queue::instance()); 0246 Q_UNUSED(w); 0247 Q_ASSERT(Queue::instance()->isIdle()); 0248 QSignalSpy doneSignalSpy(&sequence, SIGNAL(done(ThreadWeaver::JobPointer))); 0249 QCOMPARE(doneSignalSpy.count(), 0); 0250 enqueue_raw(&sequence); 0251 Queue::instance()->finish(); 0252 QVERIFY(sequence.isFinished()); 0253 QVERIFY(Queue::instance()->isIdle()); 0254 QCOMPARE(doneSignalSpy.count(), 1); 0255 } 0256 0257 void JobTests::GeneratingSequenceTest() 0258 { 0259 using namespace ThreadWeaver; 0260 0261 GeneratingSequence sequence; 0262 WaitForIdleAndFinished w(Queue::instance()); 0263 stream() << make_job_raw(&sequence); 0264 Queue::instance()->finish(); 0265 QCOMPARE(sequence.sequence_, SequenceTemplate); 0266 } 0267 0268 /** This test verifies that the done signal for a collection is only sent after all element of the collection have completed. */ 0269 void JobTests::IncompleteCollectionTest() 0270 { 0271 using namespace ThreadWeaver; 0272 0273 QString result; 0274 QObjectDecorator jobA(new AppendCharacterJob(QChar('a'), &result)); 0275 AppendCharacterJob jobB(QChar('b'), &result); // jobB does not get added to the sequence and queued 0276 QObjectDecorator col(new Collection()); 0277 *col.collection() << jobA; 0278 0279 WaitForIdleAndFinished w(Queue::instance()); 0280 DependencyPolicy::instance().addDependency(Dependency(&jobA, &jobB)); 0281 QSignalSpy collectionDoneSignalSpy(&col, SIGNAL(done(ThreadWeaver::JobPointer))); 0282 QSignalSpy jobADoneSignalSpy(&jobA, SIGNAL(done(ThreadWeaver::JobPointer))); 0283 QCOMPARE(collectionDoneSignalSpy.count(), 0); 0284 QCOMPARE(jobADoneSignalSpy.count(), 0); 0285 enqueue_raw(&col); 0286 Queue::instance()->resume(); 0287 QCoreApplication::processEvents(); 0288 QCOMPARE(collectionDoneSignalSpy.count(), 0); 0289 QCOMPARE(jobADoneSignalSpy.count(), 0); 0290 DependencyPolicy::instance().removeDependency(Dependency(&jobA, &jobB)); 0291 Queue::instance()->finish(); 0292 QCoreApplication::processEvents(); 0293 QVERIFY(col.collection()->isFinished()); 0294 QVERIFY(Queue::instance()->isIdle()); 0295 QCOMPARE(collectionDoneSignalSpy.count(), 1); 0296 QCOMPARE(jobADoneSignalSpy.count(), 1); 0297 } 0298 0299 /** This test verifies that started() is emitted for a collection at the time the first of any elements of the collection gets 0300 * executed. */ 0301 void JobTests::EmitStartedOnFirstElementTest() 0302 { 0303 using namespace ThreadWeaver; 0304 0305 WaitForIdleAndFinished w(Queue::instance()); 0306 Queue::instance()->suspend(); 0307 QString result; 0308 0309 JobPointer jobA(new AppendCharacterJob(QChar('a'), &result)); 0310 JobPointer jobB(new AppendCharacterJob(QChar('b'), &result)); 0311 QObjectDecorator collection(new Collection()); 0312 Collection *decorated = dynamic_cast<Collection *>(collection.job()); 0313 QVERIFY(decorated != nullptr); 0314 decorated->addJob(jobA); 0315 decorated->addJob(jobB); 0316 0317 enqueue(make_job_raw(&collection)); 0318 QSignalSpy collectionStartedSignalSpy(&collection, SIGNAL(started(ThreadWeaver::JobPointer))); 0319 QSignalSpy collectionDoneSignalSpy(&collection, SIGNAL(done(ThreadWeaver::JobPointer))); 0320 Queue::instance()->resume(); 0321 QCoreApplication::processEvents(); 0322 Queue::instance()->finish(); 0323 QVERIFY(collection.isFinished()); 0324 QCOMPARE(result.length(), 2); 0325 for (int i = 0; i < 100; ++i) { 0326 if (collectionStartedSignalSpy.count() != 0 && collectionDoneSignalSpy.count() != 0) { 0327 break; 0328 } 0329 QTest::qWait(1); 0330 TWDEBUG(2, "JobTests::EmitStartedOnFirstElementTest: waiting (%i)\n", i); 0331 qApp->processEvents(); 0332 } 0333 QCOMPARE(collectionStartedSignalSpy.count(), 1); 0334 QCOMPARE(collectionDoneSignalSpy.count(), 1); 0335 QVERIFY(Queue::instance()->isIdle()); 0336 } 0337 0338 /* This test verifies that all elements of a collection are only executed after all dependencies for the collection 0339 * itself have been resolved. 0340 * Previous tests have already verified that collections without dependencies get executed right away. */ 0341 void JobTests::CollectionDependenciesTest() 0342 { 0343 using namespace ThreadWeaver; 0344 0345 QString result; 0346 // set up a collection that depends on jobC which does not get queued 0347 JobPointer jobA(new AppendCharacterJob(QChar('a'), &result)); 0348 JobPointer jobB(new AppendCharacterJob(QChar('b'), &result)); 0349 QObjectDecorator col(new Collection()); 0350 QSignalSpy collectionStartedSignalSpy(&col, SIGNAL(started(ThreadWeaver::JobPointer))); 0351 col.collection()->addJob(jobA); 0352 col.collection()->addJob(jobB); 0353 QEventLoop loop; 0354 connect(&col, SIGNAL(started(ThreadWeaver::JobPointer)), &loop, SLOT(quit())); 0355 0356 QSharedPointer<AppendCharacterJob> jobC(new AppendCharacterJob(QChar('c'), &result)); 0357 DependencyPolicy::instance().addDependency(Dependency(&col, jobC)); 0358 0359 // queue collection, but not jobC, the collection should not be executed 0360 WaitForIdleAndFinished w(Queue::instance()); 0361 Q_UNUSED(w); 0362 Queue::instance()->suspend(); 0363 enqueue_raw(&col); 0364 Queue::instance()->resume(); 0365 QCoreApplication::processEvents(); 0366 QTest::qWait(100); 0367 // FIXME verify: dfaure needed this here: QTRY_COMPARE(collectionStartedSignalSpy.count(), 0); 0368 QCOMPARE(collectionStartedSignalSpy.count(), 0); 0369 // enqueue jobC, first jobC then the collection should be executed 0370 Queue::instance()->enqueue(jobC); 0371 QCoreApplication::processEvents(); 0372 Queue::instance()->finish(); 0373 QVERIFY(col.isFinished()); 0374 QVERIFY(result.startsWith(jobC->character())); 0375 // QSKIP("This test is too fragile"); // PENDING(Mirko): fix 0376 // QTRY_COMPARE(collectionStartedSignalSpy.count(), 1); 0377 loop.exec(); 0378 qApp->processEvents(); 0379 QCOMPARE(collectionStartedSignalSpy.count(), 1); 0380 QVERIFY(Queue::instance()->isIdle()); 0381 } 0382 0383 void JobTests::QueueAndDequeueCollectionTest() 0384 { 0385 QString sequence; 0386 JobPointer jobA(new AppendCharacterJob(QChar('a'), &sequence)); 0387 JobPointer jobB(new AppendCharacterJob(QChar('b'), &sequence)); 0388 JobPointer jobC(new AppendCharacterJob(QChar('c'), &sequence)); 0389 QSharedPointer<Collection> collection(new Collection()); 0390 collection->addJob(jobA); 0391 collection->addJob(jobB); 0392 collection->addJob(jobC); 0393 0394 WaitForIdleAndFinished w(Queue::instance()); 0395 Queue::instance()->suspend(); 0396 0397 Queue::instance()->enqueue(collection); 0398 Queue::instance()->dequeue(collection); 0399 QVERIFY(Queue::instance()->isEmpty()); 0400 } 0401 0402 void JobTests::QueueAndDequeueSequenceTest() 0403 { 0404 QString sequence; 0405 JobPointer jobA(new AppendCharacterJob(QChar('a'), &sequence)); 0406 JobPointer jobB(new AppendCharacterJob(QChar('b'), &sequence)); 0407 JobPointer jobC(new AppendCharacterJob(QChar('c'), &sequence)); 0408 QSharedPointer<Sequence> jobSequence(new Sequence()); 0409 jobSequence->addJob(jobA); 0410 jobSequence->addJob(jobB); 0411 jobSequence->addJob(jobC); 0412 0413 WaitForIdleAndFinished w(Queue::instance()); 0414 Queue::instance()->suspend(); 0415 0416 Queue::instance()->enqueue(jobSequence); 0417 Queue::instance()->dequeue(jobSequence); 0418 QVERIFY(Queue::instance()->isEmpty()); 0419 } 0420 0421 void JobTests::BlockingExecuteTest() 0422 { 0423 QString sequence; 0424 AppendCharacterJob job(QChar('a'), &sequence); 0425 job.blockingExecute(); 0426 QCOMPARE(sequence, QString("a")); 0427 } 0428 0429 void JobTests::RecursiveSequenceTest() 0430 { 0431 QString sequence; 0432 JobPointer jobA(new AppendCharacterJob(QChar('a'), &sequence)); 0433 JobPointer jobB(new AppendCharacterJob(QChar('b'), &sequence)); 0434 JobPointer jobC(new AppendCharacterJob(QChar('c'), &sequence)); 0435 JobPointer jobD(new AppendCharacterJob(QChar('d'), &sequence)); 0436 JobPointer jobE(new AppendCharacterJob(QChar('e'), &sequence)); 0437 JobPointer jobF(new AppendCharacterJob(QChar('f'), &sequence)); 0438 JobPointer jobG(new AppendCharacterJob(QChar('g'), &sequence)); 0439 JobPointer jobH(new AppendCharacterJob(QChar('h'), &sequence)); 0440 JobPointer jobI(new AppendCharacterJob(QChar('i'), &sequence)); 0441 JobPointer jobJ(new AppendCharacterJob(QChar('j'), &sequence)); 0442 QSharedPointer<Sequence> jobSequence1(new Sequence()); 0443 jobSequence1->addJob(jobA); 0444 jobSequence1->addJob(jobB); 0445 jobSequence1->addJob(jobC); 0446 QSharedPointer<Sequence> jobSequence2(new Sequence()); 0447 jobSequence2->addJob(jobD); 0448 jobSequence2->addJob(jobE); 0449 jobSequence2->addJob(jobF); 0450 QSharedPointer<Sequence> jobSequence3(new Sequence()); 0451 jobSequence3->addJob(jobG); 0452 jobSequence3->addJob(jobH); 0453 jobSequence3->addJob(jobI); 0454 jobSequence3->addJob(jobJ); 0455 // sequence 4 will contain sequences 1, 2, and 3, in that order: 0456 QSharedPointer<Sequence> jobSequence4(new Sequence()); 0457 jobSequence4->addJob(jobSequence1); 0458 jobSequence4->addJob(jobSequence2); 0459 jobSequence4->addJob(jobSequence3); 0460 0461 WaitForIdleAndFinished w(Queue::instance()); 0462 Queue::instance()->suspend(); 0463 Queue::instance()->enqueue(jobSequence4); 0464 // DependencyPolicy::instance().dumpJobDependencies(); 0465 Queue::instance()->resume(); 0466 Queue::instance()->finish(); 0467 QCOMPARE(sequence, QLatin1String("abcdefghij")); 0468 } 0469 0470 void JobTests::RecursiveQueueAndDequeueCollectionTest() 0471 { 0472 QString sequence; 0473 JobPointer jobA(new AppendCharacterJob(QChar('a'), &sequence)); 0474 JobPointer jobB(new AppendCharacterJob(QChar('b'), &sequence)); 0475 JobPointer jobC(new AppendCharacterJob(QChar('c'), &sequence)); 0476 JobPointer jobD(new AppendCharacterJob(QChar('d'), &sequence)); 0477 JobPointer jobE(new AppendCharacterJob(QChar('e'), &sequence)); 0478 JobPointer jobF(new AppendCharacterJob(QChar('f'), &sequence)); 0479 JobPointer jobG(new AppendCharacterJob(QChar('g'), &sequence)); 0480 JobPointer jobH(new AppendCharacterJob(QChar('h'), &sequence)); 0481 JobPointer jobI(new AppendCharacterJob(QChar('i'), &sequence)); 0482 JobPointer jobJ(new AppendCharacterJob(QChar('j'), &sequence)); 0483 QSharedPointer<Collection> collection1(new Collection()); 0484 collection1->addJob(jobA); 0485 collection1->addJob(jobB); 0486 collection1->addJob(jobC); 0487 QSharedPointer<Collection> collection2(new Collection()); 0488 collection2->addJob(jobD); 0489 collection2->addJob(jobE); 0490 collection2->addJob(jobF); 0491 QSharedPointer<Collection> collection3(new Collection()); 0492 collection3->addJob(jobG); 0493 collection3->addJob(jobH); 0494 collection3->addJob(jobI); 0495 collection3->addJob(jobJ); 0496 // sequence 4 will contain sequences 1, 2, and 3, in that order: 0497 QSharedPointer<Collection> collection4(new Collection()); 0498 collection4->addJob(collection1); 0499 collection4->addJob(collection2); 0500 collection4->addJob(collection3); 0501 0502 WaitForIdleAndFinished w(Queue::instance()); 0503 Queue::instance()->suspend(); 0504 Queue::instance()->enqueue(collection4); 0505 Queue::instance()->dequeue(collection4); 0506 QVERIFY(Queue::instance()->isEmpty()); 0507 Queue::instance()->resume(); 0508 } 0509 0510 void JobTests::RecursiveQueueAndDequeueSequenceTest() 0511 { 0512 QString sequence; 0513 JobPointer jobA(new AppendCharacterJob(QChar('a'), &sequence)); 0514 JobPointer jobB(new AppendCharacterJob(QChar('b'), &sequence)); 0515 JobPointer jobC(new AppendCharacterJob(QChar('c'), &sequence)); 0516 JobPointer jobD(new AppendCharacterJob(QChar('d'), &sequence)); 0517 JobPointer jobE(new AppendCharacterJob(QChar('e'), &sequence)); 0518 JobPointer jobF(new AppendCharacterJob(QChar('f'), &sequence)); 0519 JobPointer jobG(new AppendCharacterJob(QChar('g'), &sequence)); 0520 JobPointer jobH(new AppendCharacterJob(QChar('h'), &sequence)); 0521 JobPointer jobI(new AppendCharacterJob(QChar('i'), &sequence)); 0522 JobPointer jobJ(new AppendCharacterJob(QChar('j'), &sequence)); 0523 QSharedPointer<Sequence> jobSequence1(new Sequence()); 0524 jobSequence1->addJob(jobA); 0525 jobSequence1->addJob(jobB); 0526 jobSequence1->addJob(jobC); 0527 QSharedPointer<Sequence> jobSequence2(new Sequence()); 0528 jobSequence2->addJob(jobD); 0529 jobSequence2->addJob(jobE); 0530 jobSequence2->addJob(jobF); 0531 QSharedPointer<Sequence> jobSequence3(new Sequence()); 0532 jobSequence3->addJob(jobG); 0533 jobSequence3->addJob(jobH); 0534 jobSequence3->addJob(jobI); 0535 jobSequence3->addJob(jobJ); 0536 // sequence 4 will contain sequences 1, 2, and 3, in that order: 0537 QSharedPointer<Sequence> jobSequence4(new Sequence()); 0538 jobSequence4->addJob(jobSequence1); 0539 jobSequence4->addJob(jobSequence2); 0540 jobSequence4->addJob(jobSequence3); 0541 0542 WaitForIdleAndFinished w(Queue::instance()); 0543 Queue::instance()->suspend(); 0544 Queue::instance()->enqueue(jobSequence4); 0545 Queue::instance()->dequeue(jobSequence4); 0546 QVERIFY(Queue::instance()->isEmpty()); 0547 Queue::instance()->resume(); 0548 } 0549 0550 void JobTests::QueueAndDequeueAllCollectionTest() 0551 { 0552 QString sequence; 0553 JobPointer jobA(new AppendCharacterJob(QChar('a'), &sequence)); 0554 JobPointer jobB(new AppendCharacterJob(QChar('b'), &sequence)); 0555 JobPointer jobC(new AppendCharacterJob(QChar('c'), &sequence)); 0556 QSharedPointer<Collection> collection(new Collection()); 0557 collection->addJob(jobA); 0558 collection->addJob(jobB); 0559 collection->addJob(jobC); 0560 0561 WaitForIdleAndFinished w(Queue::instance()); 0562 Queue::instance()->suspend(); 0563 QVERIFY(Queue::instance()->isEmpty()); 0564 Queue::instance()->enqueue(collection); 0565 // collection cannot have been started, so only one job is queued at the moment: 0566 QCOMPARE(Queue::instance()->queueLength(), 1); 0567 Queue::instance()->dequeue(); 0568 QVERIFY(Queue::instance()->isEmpty()); 0569 } 0570 0571 void JobTests::QueueAndDequeueAllSequenceTest() 0572 { 0573 QString sequence; 0574 JobPointer jobA(new AppendCharacterJob(QChar('a'), &sequence)); 0575 JobPointer jobB(new AppendCharacterJob(QChar('b'), &sequence)); 0576 JobPointer jobC(new AppendCharacterJob(QChar('c'), &sequence)); 0577 QSharedPointer<Sequence> jobSequence(new Sequence()); 0578 jobSequence->addJob(jobA); 0579 jobSequence->addJob(jobB); 0580 jobSequence->addJob(jobC); 0581 0582 WaitForIdleAndFinished w(Queue::instance()); 0583 Queue::instance()->suspend(); 0584 0585 Queue::instance()->enqueue(jobSequence); 0586 Queue::instance()->dequeue(); 0587 QVERIFY(Queue::instance()->isEmpty()); 0588 } 0589 0590 void JobTests::RecursiveQueueAndDequeueAllCollectionTest() 0591 { 0592 QString sequence; 0593 JobPointer jobA(new AppendCharacterJob(QChar('a'), &sequence)); 0594 JobPointer jobB(new AppendCharacterJob(QChar('b'), &sequence)); 0595 JobPointer jobC(new AppendCharacterJob(QChar('c'), &sequence)); 0596 JobPointer jobD(new AppendCharacterJob(QChar('d'), &sequence)); 0597 JobPointer jobE(new AppendCharacterJob(QChar('e'), &sequence)); 0598 JobPointer jobF(new AppendCharacterJob(QChar('f'), &sequence)); 0599 JobPointer jobG(new AppendCharacterJob(QChar('g'), &sequence)); 0600 JobPointer jobH(new AppendCharacterJob(QChar('h'), &sequence)); 0601 JobPointer jobI(new AppendCharacterJob(QChar('i'), &sequence)); 0602 JobPointer jobJ(new AppendCharacterJob(QChar('j'), &sequence)); 0603 QSharedPointer<Collection> collection1(new Collection()); 0604 collection1->addJob(jobA); 0605 collection1->addJob(jobB); 0606 collection1->addJob(jobC); 0607 QSharedPointer<Collection> collection2(new Collection()); 0608 collection2->addJob(jobD); 0609 collection2->addJob(jobE); 0610 collection2->addJob(jobF); 0611 QSharedPointer<Collection> collection3(new Collection()); 0612 collection3->addJob(jobG); 0613 collection3->addJob(jobH); 0614 collection3->addJob(jobI); 0615 collection3->addJob(jobJ); 0616 // sequence 4 will contain sequences 1, 2, and 3, in that order: 0617 QSharedPointer<Collection> collection4(new Collection()); 0618 collection4->addJob(collection1); 0619 collection4->addJob(collection2); 0620 collection4->addJob(collection3); 0621 0622 WaitForIdleAndFinished w(Queue::instance()); 0623 Queue::instance()->suspend(); 0624 Queue::instance()->enqueue(collection4); 0625 Queue::instance()->dequeue(); 0626 QVERIFY(Queue::instance()->isEmpty()); 0627 Queue::instance()->resume(); 0628 Queue::instance()->finish(); 0629 } 0630 0631 void JobTests::RecursiveQueueAndDequeueAllSequenceTest() 0632 { 0633 QString sequence; 0634 JobPointer jobA(new AppendCharacterJob(QChar('a'), &sequence)); 0635 JobPointer jobB(new AppendCharacterJob(QChar('b'), &sequence)); 0636 JobPointer jobC(new AppendCharacterJob(QChar('c'), &sequence)); 0637 JobPointer jobD(new AppendCharacterJob(QChar('d'), &sequence)); 0638 JobPointer jobE(new AppendCharacterJob(QChar('e'), &sequence)); 0639 JobPointer jobF(new AppendCharacterJob(QChar('f'), &sequence)); 0640 JobPointer jobG(new AppendCharacterJob(QChar('g'), &sequence)); 0641 JobPointer jobH(new AppendCharacterJob(QChar('h'), &sequence)); 0642 JobPointer jobI(new AppendCharacterJob(QChar('i'), &sequence)); 0643 JobPointer jobJ(new AppendCharacterJob(QChar('j'), &sequence)); 0644 QSharedPointer<Sequence> jobSequence1(new Sequence()); 0645 jobSequence1->addJob(jobA); 0646 jobSequence1->addJob(jobB); 0647 jobSequence1->addJob(jobC); 0648 QSharedPointer<Sequence> jobSequence2(new Sequence()); 0649 jobSequence2->addJob(jobD); 0650 jobSequence2->addJob(jobE); 0651 jobSequence2->addJob(jobF); 0652 QSharedPointer<Sequence> jobSequence3(new Sequence()); 0653 jobSequence3->addJob(jobG); 0654 jobSequence3->addJob(jobH); 0655 jobSequence3->addJob(jobI); 0656 jobSequence3->addJob(jobJ); 0657 // sequence 4 will contain sequences 1, 2, and 3, in that order: 0658 QSharedPointer<Sequence> jobSequence4(new Sequence()); 0659 jobSequence4->addJob(jobSequence1); 0660 jobSequence4->addJob(jobSequence2); 0661 jobSequence4->addJob(jobSequence3); 0662 0663 WaitForIdleAndFinished w(Queue::instance()); 0664 Queue::instance()->suspend(); 0665 Queue::instance()->enqueue(jobSequence4); 0666 Queue::instance()->dequeue(); 0667 QVERIFY(Queue::instance()->isEmpty()); 0668 Queue::instance()->resume(); 0669 Queue::instance()->finish(); 0670 } 0671 0672 // This test is not the most efficient, as the mutex locking takes most of 0673 // the execution time. Anyway, it will fail if the jobs are not executed 0674 // in the right order, and the order is randomized. 0675 void JobTests::MassiveJobSequenceTest() 0676 { 0677 const int NoOfChars = 1024; 0678 const char *Alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; 0679 const int SizeOfAlphabet = strlen(Alphabet); 0680 AppendCharacterAndVerifyJob jobs[NoOfChars]; 0681 Sequence jobSequence; 0682 QString sequence; 0683 QString in; 0684 0685 srand(1); 0686 in.reserve(NoOfChars); 0687 sequence.reserve(NoOfChars); 0688 for (int i = 0; i < NoOfChars; ++i) { 0689 const int position = static_cast<int>(SizeOfAlphabet * ((1.0 * rand()) / RAND_MAX)); 0690 Q_ASSERT(0 <= position && position < SizeOfAlphabet); 0691 QChar c(Alphabet[position]); 0692 in.append(c); 0693 } 0694 0695 for (int i = 0; i < NoOfChars; ++i) { 0696 jobs[i].setValues(in.at(i), &sequence, in); 0697 jobSequence << jobs[i]; 0698 } 0699 0700 WaitForIdleAndFinished w(Queue::instance()); 0701 QVERIFY(Queue::instance()->isIdle()); 0702 enqueue_raw(&jobSequence); 0703 Queue::instance()->finish(); 0704 QVERIFY(Queue::instance()->isIdle()); 0705 QCOMPARE(sequence, in); 0706 } 0707 0708 void JobTests::SimpleRecursiveSequencesTest() 0709 { 0710 using namespace ThreadWeaver; 0711 QString sequence; 0712 0713 Sequence jobSequence1; 0714 jobSequence1 << new AppendCharacterJob(QChar('b'), &sequence); 0715 0716 Sequence jobSequence2; 0717 jobSequence2 << new AppendCharacterJob(QChar('a'), &sequence) // 0718 << jobSequence1 // 0719 << new AppendCharacterJob(QChar('c'), &sequence); 0720 0721 WaitForIdleAndFinished w(Queue::instance()); 0722 stream() << jobSequence2; 0723 Queue::instance()->finish(); 0724 QCOMPARE(sequence, QString("abc")); 0725 } 0726 0727 void JobTests::SequenceOfSequencesTest() 0728 { 0729 QString sequence; 0730 AppendCharacterJob jobA(QChar('a'), &sequence); 0731 AppendCharacterJob jobB(QChar('b'), &sequence); 0732 AppendCharacterJob jobC(QChar('c'), &sequence); 0733 AppendCharacterJob jobD(QChar('d'), &sequence); 0734 AppendCharacterJob jobE(QChar('e'), &sequence); 0735 AppendCharacterJob jobF(QChar('f'), &sequence); 0736 AppendCharacterJob jobG(QChar('g'), &sequence); 0737 AppendCharacterJob jobH(QChar('h'), &sequence); 0738 AppendCharacterJob jobI(QChar('i'), &sequence); 0739 AppendCharacterJob jobJ(QChar('j'), &sequence); 0740 Sequence jobSequence1; 0741 jobSequence1 << jobA << jobB << jobC; 0742 Sequence jobSequence2; 0743 jobSequence2 << jobD << jobE << jobF; 0744 Sequence jobSequence3; 0745 jobSequence3 << jobG << jobH << jobI << jobJ; 0746 // sequence 4 will contain sequences 1, 2, and 3, in that order: 0747 Sequence jobSequence4; 0748 jobSequence4 << jobSequence1 << jobSequence2 << jobSequence3; 0749 0750 WaitForIdleAndFinished w(Queue::instance()); 0751 stream() << jobSequence4; 0752 // Job::DumpJobDependencies(); 0753 Queue::instance()->finish(); 0754 QCOMPARE(sequence, QString("abcdefghij")); 0755 } 0756 0757 void JobTests::QueueAndStopTest() 0758 { 0759 QString sequence; 0760 AppendCharacterJob a('a', &sequence); 0761 AppendCharacterJob b('b', &sequence); 0762 AppendCharacterJob c('c', &sequence); 0763 FailingAppendCharacterJob d('d', &sequence); 0764 AppendCharacterJob e('e', &sequence); 0765 AppendCharacterJob f('f', &sequence); 0766 AppendCharacterJob g('g', &sequence); 0767 Sequence jobSequence; 0768 jobSequence << a << b << c << d << e << f << g; 0769 0770 WaitForIdleAndFinished w(Queue::instance()); 0771 stream() << jobSequence; 0772 Queue::instance()->finish(); 0773 QCOMPARE(sequence, QString("abcd")); 0774 } 0775 0776 void JobTests::ResourceRestrictionPolicyBasicsTest() 0777 { 0778 // this test tests that with resource restrictions assigned, jobs 0779 // still get executed as expected 0780 QString sequence; 0781 ResourceRestrictionPolicy restriction(2); 0782 AppendCharacterJob a('a', &sequence); 0783 AppendCharacterJob b('b', &sequence); 0784 AppendCharacterJob c('c', &sequence); 0785 AppendCharacterJob d('d', &sequence); 0786 AppendCharacterJob e('e', &sequence); 0787 AppendCharacterJob f('f', &sequence); 0788 AppendCharacterJob g('g', &sequence); 0789 Collection collection; 0790 collection << a << b << c << d << e << f << g; 0791 const QList<AppendCharacterJob *> lstJob = {&a, &b, &c, &d, &e, &f, &g}; 0792 for (AppendCharacterJob *job : lstJob) { 0793 QMutexLocker l(job->mutex()); 0794 job->assignQueuePolicy(&restriction); 0795 } 0796 0797 WaitForIdleAndFinished w(Queue::instance()); 0798 stream() << collection; 0799 Queue::instance()->finish(); 0800 QVERIFY(Queue::instance()->isIdle()); 0801 } 0802 0803 void JobTests::jobStarted(JobPointer) 0804 { 0805 QVERIFY(thread() == QThread::currentThread()); 0806 } 0807 0808 void JobTests::jobDone(JobPointer) 0809 { 0810 QVERIFY(thread() == QThread::currentThread()); 0811 } 0812 0813 void JobTests::JobSignalsAreEmittedAsynchronouslyTest() 0814 { 0815 using namespace ThreadWeaver; 0816 0817 char bits[] = {'a', 'b', 'c', 'd', 'e', 'f', 'g'}; 0818 const int NumberOfBits = sizeof bits / sizeof bits[0]; 0819 QString sequence; 0820 QObjectDecorator collection(new Collection, this); 0821 0822 QVERIFY(connect(&collection, SIGNAL(started(ThreadWeaver::JobPointer)), SLOT(jobStarted(ThreadWeaver::JobPointer)))); 0823 QVERIFY(connect(&collection, SIGNAL(done(ThreadWeaver::JobPointer)), SLOT(jobDone(ThreadWeaver::JobPointer)))); 0824 for (int counter = 0; counter < NumberOfBits; ++counter) { 0825 QJobPointer job(new QObjectDecorator(new AppendCharacterJob(bits[counter], &sequence))); 0826 QVERIFY(connect(job.data(), SIGNAL(started(ThreadWeaver::JobPointer)), SLOT(jobStarted(ThreadWeaver::JobPointer)))); 0827 QVERIFY(connect(job.data(), SIGNAL(done(ThreadWeaver::JobPointer)), SLOT(jobDone(ThreadWeaver::JobPointer)))); 0828 collection.collection()->addJob(job); 0829 } 0830 0831 WaitForIdleAndFinished w(Queue::instance()); 0832 enqueue_raw(&collection); 0833 QCoreApplication::processEvents(); 0834 Queue::instance()->finish(); 0835 QVERIFY(sequence.length() == NumberOfBits); 0836 } 0837 0838 QAtomicInt deliveryTestCounter; 0839 0840 void JobTests::deliveryTestJobDone(JobPointer) 0841 { 0842 deliveryTestCounter.fetchAndAddRelease(-1); 0843 } 0844 0845 void noOp() 0846 { 0847 } 0848 0849 void JobTests::JobSignalsDeliveryTest() 0850 { 0851 // This test was added to investigate segmentation faults during signal delivery from jobs to the main thread. 0852 // Relies on processEvents() processing all pending events, as the specification says. 0853 using namespace ThreadWeaver; 0854 0855 QCOMPARE(deliveryTestCounter.loadAcquire(), 0); 0856 WaitForIdleAndFinished w(Queue::instance()); 0857 for (int count = 0; count < 100; ++count) { 0858 QJobPointer job(new QObjectDecorator(new Lambda<void (*)()>(noOp))); 0859 QVERIFY(connect(job.data(), SIGNAL(done(ThreadWeaver::JobPointer)), SLOT(deliveryTestJobDone(ThreadWeaver::JobPointer)))); 0860 deliveryTestCounter.fetchAndAddRelease(1); 0861 Queue::instance()->enqueue(job); 0862 } 0863 QCoreApplication::processEvents(); 0864 Queue::instance()->finish(); 0865 QCoreApplication::processEvents(); 0866 QCOMPARE(deliveryTestCounter.loadAcquire(), 0); 0867 } 0868 0869 void decrementCounter() 0870 { 0871 deliveryTestCounter.fetchAndAddRelease(-1); 0872 } 0873 0874 void JobTests::JobPointerExecutionTest() 0875 { 0876 // This test was added to investigate segmentation faults during signal delivery from jobs to the main thread. 0877 // Relies on processEvents() processing all pending events, as the specification says. 0878 using namespace ThreadWeaver; 0879 0880 QCOMPARE(deliveryTestCounter.loadAcquire(), 0); 0881 WaitForIdleAndFinished w(Queue::instance()); 0882 for (int count = 0; count < 100; ++count) { 0883 JobPointer job(new Lambda<void (*)()>(decrementCounter)); 0884 deliveryTestCounter.fetchAndAddRelease(1); 0885 Queue::instance()->enqueue(job); 0886 } 0887 QCoreApplication::processEvents(); 0888 Queue::instance()->finish(); 0889 QCoreApplication::processEvents(); 0890 QCOMPARE(deliveryTestCounter.loadAcquire(), 0); 0891 } 0892 0893 void JobTests::DequeueSuspendedSequenceTest() 0894 { 0895 using namespace ThreadWeaver; 0896 0897 Sequence sequence; 0898 Queue weaver; 0899 weaver.suspend(); 0900 enqueue_raw(&weaver, &sequence); 0901 weaver.dequeue(); 0902 // don't crash 0903 } 0904 0905 void JobTests::IdDecoratorDecoratesTest() 0906 { 0907 using namespace ThreadWeaver; 0908 QString sequence; 0909 JobPointer job(new IdDecorator(new AppendCharacterJob('a', &sequence))); 0910 WaitForIdleAndFinished w(Queue::instance()); 0911 Queue::instance()->enqueue(job); 0912 Queue::instance()->finish(); 0913 QCOMPARE(sequence, QString::fromLatin1("a")); 0914 } 0915 0916 void JobTests::IdDecoratorAutoDeleteTest() 0917 { 0918 using namespace ThreadWeaver; 0919 IdDecorator id(nullptr); 0920 QCOMPARE(id.autoDelete(), true); // autoDelete is on by default 0921 id.setAutoDelete(false); 0922 QCOMPARE(id.autoDelete(), false); 0923 id.setAutoDelete(true); 0924 QCOMPARE(id.autoDelete(), true); 0925 // now do not crash, even though id decorates a null pointer 0926 } 0927 0928 void JobTests::IdDecoratorSingleAllocationTest() 0929 { 0930 using namespace ThreadWeaver; 0931 0932 struct DecoratedJob : public IdDecorator { 0933 QString sequence; 0934 AppendCharacterJob job; 0935 DecoratedJob() 0936 : IdDecorator(&job, false) 0937 , job('a', &sequence) 0938 { 0939 } 0940 }; 0941 0942 WaitForIdleAndFinished w(Queue::instance()); 0943 DecoratedJob job; 0944 enqueue_raw(&job); 0945 Queue::instance()->finish(); 0946 QCOMPARE(job.sequence, QString::fromLatin1("a")); 0947 } 0948 0949 struct InstanceCountedJob : public Job { 0950 static QAtomicInt counter; 0951 0952 void run(JobPointer, Thread *) override 0953 { 0954 } 0955 0956 InstanceCountedJob() 0957 { 0958 counter.fetchAndAddRelease(1); 0959 } 0960 0961 ~InstanceCountedJob() override 0962 { 0963 counter.fetchAndAddRelease(-1); 0964 } 0965 }; 0966 0967 QAtomicInt InstanceCountedJob::counter; 0968 0969 /** @brief Verify that neither the queue nor the thread keep a reference to the job after completing it. 0970 * 0971 * This is necessary because user-allocated objects like queue policies may be registered with the jobs. If the jobs stick around 0972 * until the thread or queue are deleted, the user-allocated objects may have gone out of scope or been deleted already, causing 0973 * potential errors. From ThreadWeaver's point of view, a job seizes to exist once the processing thread asks for the next job. */ 0974 void JobTests::JobsAreDestroyedAfterFinishTest() 0975 { 0976 using namespace ThreadWeaver; 0977 WaitForIdleAndFinished w(Queue::instance()); 0978 Q_UNUSED(w); 0979 Queue::instance()->suspend(); 0980 JobPointer job(new InstanceCountedJob); 0981 Queue::instance()->enqueue(job); 0982 QCOMPARE(InstanceCountedJob::counter.loadAcquire(), 1); 0983 Queue::instance()->resume(); 0984 QCOMPARE(InstanceCountedJob::counter.loadAcquire(), 1); 0985 Queue::instance()->finish(); 0986 QCOMPARE(InstanceCountedJob::counter.loadAcquire(), 1); 0987 QCoreApplication::processEvents(); 0988 QCOMPARE(InstanceCountedJob::counter.loadAcquire(), 1); 0989 job.clear(); 0990 // if this succeeds, job is the only shared pointer pointing to the created InstanceCountedJob object: 0991 QCOMPARE(InstanceCountedJob::counter.loadAcquire(), 0); 0992 } 0993 0994 void JobTests::JobExitStatusByExceptionTest() 0995 { 0996 using namespace ThreadWeaver; 0997 0998 struct FailingJob : public Job { 0999 void run(JobPointer, Thread *) override 1000 { 1001 throw JobFailed(); 1002 } 1003 }; 1004 1005 FailingJob failing; 1006 failing.blockingExecute(); 1007 QCOMPARE(failing.status(), Job::Status_Failed); 1008 1009 struct AbortingJob : public Job { 1010 void run(JobPointer, Thread *) override 1011 { 1012 throw JobAborted(); 1013 } 1014 }; 1015 1016 AbortingJob aborting; 1017 aborting.blockingExecute(); 1018 QCOMPARE(aborting.status(), Job::Status_Aborted); 1019 1020 struct SuccessfulJob : public Job { 1021 void run(JobPointer, Thread *) override 1022 { 1023 // do nothing 1024 } 1025 }; 1026 SuccessfulJob successful; 1027 successful.blockingExecute(); 1028 QCOMPARE(successful.status(), Job::Status_Success); 1029 } 1030 1031 void JobTests::JobManualExitStatusTest() 1032 { 1033 using namespace ThreadWeaver; 1034 1035 struct FailingJob : public Job { 1036 void run(JobPointer, Thread *) override 1037 { 1038 setStatus(Job::Status_Failed); 1039 } 1040 }; 1041 1042 FailingJob failing; 1043 failing.blockingExecute(); 1044 QCOMPARE(failing.status(), Job::Status_Failed); 1045 1046 struct AbortingJob : public Job { 1047 void run(JobPointer, Thread *) override 1048 { 1049 setStatus(Job::Status_Aborted); 1050 } 1051 }; 1052 1053 AbortingJob aborting; 1054 aborting.blockingExecute(); 1055 QCOMPARE(aborting.status(), Job::Status_Aborted); 1056 1057 struct SuccessfulJob : public Job { 1058 void run(JobPointer, Thread *) override 1059 { 1060 // do nothing 1061 } 1062 }; 1063 SuccessfulJob successful; 1064 successful.blockingExecute(); 1065 QCOMPARE(successful.status(), Job::Status_Success); 1066 } 1067 1068 void JobTests::QueueStreamLifecycleTest() 1069 { 1070 QString sequence; 1071 using namespace ThreadWeaver; 1072 WaitForIdleAndFinished w(Queue::instance()); 1073 Q_UNUSED(w); 1074 stream() << make_job(new AppendCharacterJob('a', &sequence)) // enqueues JobPointer 1075 << new AppendCharacterJob('b', &sequence) // enqueues JobInterface* 1076 << make_job(new AppendCharacterJob('c', &sequence)); 1077 Queue::instance()->finish(); 1078 QCOMPARE(sequence.size(), 3); 1079 } 1080 1081 class SynchronizedNumbers 1082 { 1083 public: 1084 void append(int number) 1085 { 1086 QMutexLocker l(&mutex_); 1087 numbers_.append(number); 1088 } 1089 1090 bool isSorted() const 1091 { 1092 QMutexLocker l(&mutex_); 1093 return std::is_sorted(numbers_.cbegin(), numbers_.cend()); 1094 } 1095 1096 void sortChunks(int chunkSize) 1097 { 1098 QMutexLocker l(&mutex_); 1099 Q_ASSERT(numbers_.count() % chunkSize == 0); 1100 auto start = numbers_.begin(); 1101 while (start != numbers_.end()) { 1102 auto stop = start; 1103 std::advance(stop, chunkSize); 1104 std::sort(start, stop); 1105 start = stop; 1106 } 1107 } 1108 1109 private: 1110 QList<int> numbers_; 1111 mutable QMutex mutex_; 1112 }; 1113 1114 class GeneratingEnumeratorSequence : public ThreadWeaver::Sequence 1115 { 1116 public: 1117 GeneratingEnumeratorSequence(SynchronizedNumbers *numbers, int start, int count) 1118 : start_(start) 1119 , count_(count) 1120 , numbers_(numbers) 1121 { 1122 } 1123 1124 void run(JobPointer, Thread *) override 1125 { 1126 numbers_->append(start_); 1127 for (int index = start_ + 1; index < start_ + count_; ++index) { 1128 *this << new GeneratingEnumeratorSequence(numbers_, index, 1); 1129 } 1130 } 1131 1132 private: 1133 const int start_; 1134 const int count_; 1135 SynchronizedNumbers *numbers_; 1136 }; 1137 1138 class GeneratingEnumeratorCollection : public ThreadWeaver::Collection 1139 { 1140 public: 1141 GeneratingEnumeratorCollection(SynchronizedNumbers *numbers, int start, int count) 1142 : start_(start) 1143 , count_(count) 1144 , numbers_(numbers) 1145 { 1146 } 1147 1148 void run(JobPointer, Thread *) override 1149 { 1150 numbers_->append(start_); 1151 QList<GeneratingEnumeratorCollection *> elements; 1152 for (int index = start_ + 1; index < start_ + count_; ++index) { 1153 elements.append(new GeneratingEnumeratorCollection(numbers_, index, 1)); 1154 } 1155 std::random_device rd; 1156 std::mt19937 g(rd()); 1157 std::shuffle(elements.begin(), elements.end(), g); 1158 std::for_each(elements.begin(), elements.end(), [this](GeneratingEnumeratorCollection *collection) { 1159 *this << collection; 1160 }); 1161 } 1162 1163 private: 1164 const int start_; 1165 const int count_; 1166 SynchronizedNumbers *numbers_; 1167 }; 1168 1169 void JobTests::NestedGeneratingCollectionsTest() 1170 { 1171 using namespace ThreadWeaver; 1172 WaitForIdleAndFinished w(Queue::instance()); 1173 Q_UNUSED(w); 1174 1175 SynchronizedNumbers numbers; 1176 const int NumberOfSequences = 100; 1177 const int ElementsPerSequence = 20; 1178 Sequence sequence; 1179 for (int index = 0; index < NumberOfSequences; ++index) { 1180 sequence << new GeneratingEnumeratorCollection(&numbers, index * ElementsPerSequence, ElementsPerSequence); 1181 } 1182 stream() << sequence; 1183 Queue::instance()->finish(); 1184 numbers.sortChunks(ElementsPerSequence); 1185 QVERIFY(numbers.isSorted()); 1186 } 1187 1188 void JobTests::NestedGeneratingSequencesTest() 1189 { 1190 using namespace ThreadWeaver; 1191 WaitForIdleAndFinished w(Queue::instance()); 1192 Q_UNUSED(w); 1193 1194 SynchronizedNumbers numbers; 1195 const int NumberOfSequences = 100; 1196 const int ElementsPerSequence = 20; 1197 Sequence sequence; 1198 for (int index = 0; index < NumberOfSequences; ++index) { 1199 sequence << new GeneratingEnumeratorSequence(&numbers, index * ElementsPerSequence, ElementsPerSequence); 1200 } 1201 stream() << sequence; 1202 Queue::instance()->finish(); 1203 1204 QVERIFY(numbers.isSorted()); 1205 } 1206 1207 void JobTests::DeeperNestedGeneratingCollectionsTest() 1208 { 1209 using namespace ThreadWeaver; 1210 auto logger = new JobLoggingWeaver(); 1211 Queue queue(logger); 1212 WaitForIdleAndFinished w(&queue); 1213 Q_UNUSED(w); 1214 const int ElementsPerCollection = 20; 1215 const int NumberOfBlocks = 2; 1216 const int CollectionsPerBlock = 2; 1217 SynchronizedNumbers numbers; 1218 Sequence sequence; 1219 for (int block = 0; block < NumberOfBlocks; ++block) { 1220 auto col = new Collection(); 1221 for (int collection = 0; collection < CollectionsPerBlock; ++collection) { 1222 const int start = (block * NumberOfBlocks + collection) * ElementsPerCollection; 1223 *col << new GeneratingEnumeratorCollection(&numbers, start, ElementsPerCollection); 1224 } 1225 sequence << col; 1226 } 1227 queue.stream() << sequence; 1228 queue.finish(); 1229 numbers.sortChunks(NumberOfBlocks * ElementsPerCollection); 1230 QVERIFY(numbers.isSorted()); 1231 } 1232 1233 struct AbortableJob : public Job { 1234 int aborted = 0; 1235 int extraCode = 0; 1236 QMutex waitForStart; 1237 QMutex waitForAbort; 1238 1239 AbortableJob() 1240 { 1241 waitForStart.lock(); 1242 waitForAbort.lock(); 1243 } 1244 1245 ~AbortableJob() 1246 { 1247 waitForStart.unlock(); 1248 waitForAbort.unlock(); 1249 } 1250 1251 void run(JobPointer, Thread *) override 1252 { 1253 QCOMPARE(this->status(), Job::Status_Running); 1254 waitForStart.unlock(); 1255 waitForAbort.lock(); 1256 if (shouldAbort()) { 1257 aborted++; 1258 throw JobAborted(); 1259 } 1260 extraCode++; 1261 } 1262 1263 void requestAbort() override 1264 { 1265 Job::requestAbort(); 1266 1267 // Let it handle abortion 1268 waitForAbort.unlock(); 1269 } 1270 }; 1271 1272 void JobTests::RequestAbortCollectionTest() 1273 { 1274 AbortableJob abortable; 1275 1276 QString sequence; 1277 Collection jobCollection; 1278 jobCollection << new AppendCharacterJob(QChar('a'), &sequence) << abortable << new AppendCharacterJob(QChar('c'), &sequence); 1279 1280 WaitForIdleAndFinished w(Queue::instance()); 1281 stream() << jobCollection; 1282 1283 // We need to wait for job to have started execution 1284 abortable.waitForStart.lock(); 1285 1286 // Now tell it to abort 1287 jobCollection.requestAbort(); 1288 1289 w.finish(); 1290 1291 QVERIFY(DependencyPolicy::instance().isEmpty()); 1292 QVERIFY(sequence.length() == 2); 1293 QVERIFY(sequence.count('a') == 1); 1294 QVERIFY(sequence.count('c') == 1); 1295 QVERIFY(abortable.aborted == 1); 1296 QVERIFY(abortable.extraCode == 0); 1297 QCOMPARE(jobCollection.status(), Job::Status_Aborted); 1298 QVERIFY(abortable.waitForStart.tryLock() == false); 1299 QVERIFY(abortable.waitForAbort.tryLock() == false); 1300 } 1301 1302 void JobTests::RequestAbortSequenceTest() 1303 { 1304 AbortableJob abortable; 1305 1306 QString sequence; 1307 Sequence jobSequence; 1308 jobSequence << new AppendCharacterJob(QChar('a'), &sequence) << abortable << new AppendCharacterJob(QChar('c'), &sequence); 1309 1310 WaitForIdleAndFinished w(Queue::instance()); 1311 stream() << jobSequence; 1312 1313 // We need to wait for job to have started execution 1314 abortable.waitForStart.lock(); 1315 1316 // Now tell it to abort 1317 jobSequence.requestAbort(); 1318 1319 w.finish(); 1320 1321 QVERIFY(DependencyPolicy::instance().isEmpty()); 1322 QCOMPARE(sequence, QLatin1String("a")); 1323 QVERIFY(abortable.aborted == 1); 1324 QVERIFY(abortable.extraCode == 0); 1325 QCOMPARE(jobSequence.status(), Job::Status_Aborted); 1326 QVERIFY(abortable.waitForStart.tryLock() == false); 1327 QVERIFY(abortable.waitForAbort.tryLock() == false); 1328 } 1329 1330 void JobTests::JobOnFinishTest() 1331 { 1332 QString sequence; 1333 int finishedA = 0; 1334 int finishedB = 0; 1335 Collection jobCollection; 1336 jobCollection << new AppendCharacterJob(QChar('a'), &sequence) << new AppendCharacterJob(QChar('b'), &sequence) 1337 << new AppendCharacterJob(QChar('c'), &sequence); 1338 1339 jobCollection.onFinish([&](const JobInterface &job) { 1340 QCOMPARE(job.status(), Job::Status_Success); 1341 1342 // Result should already be correct now 1343 QVERIFY(sequence.length() == 3); 1344 QVERIFY(sequence.count('a') == 1); 1345 QVERIFY(sequence.count('b') == 1); 1346 QVERIFY(sequence.count('c') == 1); 1347 1348 // We should get same original instance 1349 QVERIFY(&job == &jobCollection); 1350 1351 finishedA++; 1352 }); 1353 1354 jobCollection.onFinish([&](const JobInterface &job) { 1355 QCOMPARE(jobCollection.status(), Job::Status_Success); 1356 1357 QVERIFY(sequence.length() == 3); 1358 // We should get same original instance 1359 QVERIFY(&job == &jobCollection); 1360 1361 finishedB++; 1362 }); 1363 1364 WaitForIdleAndFinished w(Queue::instance()); 1365 stream() << jobCollection; 1366 w.finish(); 1367 1368 QVERIFY(DependencyPolicy::instance().isEmpty()); 1369 QVERIFY(finishedA == 1); 1370 QVERIFY(finishedB == 1); 1371 } 1372 1373 void JobTests::JobOnFinishAbortTest() 1374 { 1375 QString sequence; 1376 int finishedA = 0; 1377 int finishedB = 0; 1378 Sequence jobSequence; 1379 1380 struct AbortingJob : public Job { 1381 void run(JobPointer, Thread *) override 1382 { 1383 throw JobAborted(); 1384 } 1385 }; 1386 1387 AbortingJob aborting; 1388 1389 jobSequence << new AppendCharacterJob(QChar('a'), &sequence) << aborting << new AppendCharacterJob(QChar('c'), &sequence); 1390 1391 jobSequence.onFinish([&](const JobInterface &job) { 1392 // Result should already be correct now 1393 QCOMPARE(sequence, QLatin1String("a")); 1394 1395 // We should get same original instance 1396 QVERIFY(&job == &jobSequence); 1397 1398 // We should be in aborted state 1399 QCOMPARE(job.status(), Job::Status_Aborted); 1400 1401 finishedA++; 1402 }); 1403 1404 jobSequence.onFinish([&](const JobInterface &job) { 1405 // Result should already be correct now 1406 QCOMPARE(sequence, QLatin1String("a")); 1407 1408 // We should get same original instance 1409 QVERIFY(&job == &jobSequence); 1410 1411 // We should be in aborted state 1412 QCOMPARE(jobSequence.status(), Job::Status_Aborted); 1413 1414 finishedB++; 1415 }); 1416 1417 WaitForIdleAndFinished w(Queue::instance()); 1418 stream() << jobSequence; 1419 w.finish(); 1420 1421 QVERIFY(DependencyPolicy::instance().isEmpty()); 1422 QVERIFY(finishedA == 1); 1423 QVERIFY(finishedB == 1); 1424 } 1425 1426 QTEST_MAIN(JobTests) 1427 1428 #include "moc_JobTests.cpp"