File indexing completed on 2024-05-12 05:26:24
0001 #include <QTest> 0002 #include <QSignalSpy> 0003 #include <QTimer> 0004 0005 #include <QString> 0006 #include <QQueue> 0007 0008 #include "store.h" 0009 #include "storage.h" 0010 #include "messagequeue.h" 0011 #include "log.h" 0012 #include "test.h" 0013 0014 /** 0015 * Test of the messagequeue implementation. 0016 */ 0017 class MessageQueueTest : public QObject 0018 { 0019 Q_OBJECT 0020 private slots: 0021 void initTestCase() 0022 { 0023 Sink::Test::initTest(); 0024 Sink::Storage::DataStore store(Sink::Store::storageLocation(), "sink.dummy.testqueue", Sink::Storage::DataStore::ReadWrite); 0025 store.removeFromDisk(); 0026 } 0027 0028 void cleanupTestCase() 0029 { 0030 } 0031 0032 void cleanup() 0033 { 0034 Sink::Storage::DataStore store(Sink::Store::storageLocation(), "sink.dummy.testqueue", Sink::Storage::DataStore::ReadWrite); 0035 store.removeFromDisk(); 0036 } 0037 0038 void testEmpty() 0039 { 0040 MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue"); 0041 QVERIFY(queue.isEmpty()); 0042 queue.enqueue("value"); 0043 QVERIFY(!queue.isEmpty()); 0044 queue.dequeue([](void *ptr, int size, std::function<void(bool success)> callback) { callback(true); }, [](const MessageQueue::Error &error) {}); 0045 QVERIFY(queue.isEmpty()); 0046 } 0047 0048 void testDequeueEmpty() 0049 { 0050 MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue"); 0051 bool gotValue = false; 0052 bool gotError = false; 0053 queue.dequeue([&](void *ptr, int size, std::function<void(bool success)> callback) { gotValue = true; }, [&](const MessageQueue::Error &error) { gotError = true; }); 0054 QVERIFY(!gotValue); 0055 QVERIFY(!gotError); 0056 } 0057 0058 void testEnqueue() 0059 { 0060 MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue"); 0061 QSignalSpy spy(&queue, SIGNAL(messageReady())); 0062 queue.enqueue("value1"); 0063 QCOMPARE(spy.size(), 1); 0064 } 0065 0066 void testDrained() 0067 { 0068 MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue"); 0069 QSignalSpy spy(&queue, SIGNAL(drained())); 0070 queue.enqueue("value1"); 0071 0072 queue.dequeue([](void *ptr, int size, std::function<void(bool success)> callback) { callback(true); }, [](const MessageQueue::Error &error) {}); 0073 QCOMPARE(spy.size(), 1); 0074 } 0075 0076 void testSyncDequeue() 0077 { 0078 QQueue<QByteArray> values; 0079 values << "value1"; 0080 values << "value2"; 0081 0082 MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue"); 0083 for (const QByteArray &value : values) { 0084 queue.enqueue(value); 0085 } 0086 0087 while (!queue.isEmpty()) { 0088 SinkLog() << "start"; 0089 const auto expected = values.dequeue(); 0090 bool gotValue = false; 0091 bool gotError = false; 0092 queue.dequeue( 0093 [&](void *ptr, int size, std::function<void(bool success)> callback) { 0094 if (QByteArray(static_cast<char *>(ptr), size) == expected) { 0095 gotValue = true; 0096 } 0097 callback(true); 0098 }, 0099 [&](const MessageQueue::Error &error) { gotError = true; }); 0100 QVERIFY(gotValue); 0101 QVERIFY(!gotError); 0102 } 0103 QVERIFY(values.isEmpty()); 0104 } 0105 0106 void testAsyncDequeue() 0107 { 0108 QQueue<QByteArray> values; 0109 values << "value1"; 0110 values << "value2"; 0111 0112 MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue"); 0113 for (const QByteArray &value : values) { 0114 queue.enqueue(value); 0115 } 0116 0117 while (!queue.isEmpty()) { 0118 QEventLoop eventLoop; 0119 const auto expected = values.dequeue(); 0120 bool gotValue = false; 0121 bool gotError = false; 0122 0123 queue.dequeue( 0124 [&](void *ptr, int size, std::function<void(bool success)> callback) { 0125 if (QByteArray(static_cast<char *>(ptr), size) == expected) { 0126 gotValue = true; 0127 } 0128 auto timer = new QTimer(); 0129 timer->setSingleShot(true); 0130 QObject::connect(timer, &QTimer::timeout, [timer, callback, &eventLoop]() { 0131 delete timer; 0132 callback(true); 0133 eventLoop.exit(); 0134 }); 0135 timer->start(0); 0136 }, 0137 [&](const MessageQueue::Error &error) { gotError = true; }); 0138 eventLoop.exec(); 0139 QVERIFY(gotValue); 0140 QVERIFY(!gotError); 0141 } 0142 QVERIFY(values.isEmpty()); 0143 } 0144 0145 /* 0146 * Dequeue's are async and we want to be able to enqueue new items in between. 0147 */ 0148 void testNestedEnqueue() 0149 { 0150 MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue"); 0151 queue.enqueue("value1"); 0152 0153 bool gotError = false; 0154 queue.dequeue( 0155 [&](void *ptr, int size, std::function<void(bool success)> callback) { 0156 queue.enqueue("value3"); 0157 callback(true); 0158 }, 0159 [&](const MessageQueue::Error &error) { gotError = true; }); 0160 QVERIFY(!gotError); 0161 } 0162 0163 void testBatchDequeue() 0164 { 0165 MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue"); 0166 queue.enqueue("value1"); 0167 queue.enqueue("value2"); 0168 queue.enqueue("value3"); 0169 0170 int count = 0; 0171 queue.dequeueBatch(2, [&count](const QByteArray &data) { 0172 count++; 0173 ASYNCCOMPARE(data, QByteArray{"value"} + QByteArray::number(count)); 0174 return KAsync::null<void>(); 0175 }).exec().waitForFinished(); 0176 QCOMPARE(count, 2); 0177 0178 queue.dequeueBatch(1, [&count](const QByteArray &data) { 0179 count++; 0180 ASYNCCOMPARE(data, QByteArray{"value"} + QByteArray::number(count)); 0181 return KAsync::null<void>(); 0182 }).exec().waitForFinished(); 0183 QCOMPARE(count, 3); 0184 } 0185 0186 void testBatchDequeueDuringWriteTransaction() 0187 { 0188 MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue"); 0189 queue.enqueue("value1"); 0190 queue.enqueue("value2"); 0191 queue.enqueue("value3"); 0192 0193 queue.startTransaction(); 0194 //Inivisible to dequeues because in write transaction 0195 queue.enqueue("value4"); 0196 0197 int count = 0; 0198 queue.dequeueBatch(2, [&count](const QByteArray &data) { 0199 count++; 0200 ASYNCCOMPARE(data, QByteArray{"value"} + QByteArray::number(count)); 0201 return KAsync::null<void>(); 0202 }).exec().waitForFinished(); 0203 QCOMPARE(count, 2); 0204 0205 queue.dequeueBatch(2, [&count](const QByteArray &data) { 0206 count++; 0207 ASYNCCOMPARE(data, QByteArray{"value"} + QByteArray::number(count)); 0208 return KAsync::null<void>(); 0209 }).exec().waitForFinished(); 0210 QCOMPARE(count, 3); 0211 QVERIFY(queue.isEmpty()); 0212 0213 //Commit value4 0214 queue.commit(); 0215 QVERIFY(!queue.isEmpty()); 0216 queue.dequeueBatch(2, [&count](const QByteArray &data) { 0217 count++; 0218 ASYNCCOMPARE(data, QByteArray{"value"} + QByteArray::number(count)); 0219 return KAsync::null<void>(); 0220 }).exec().waitForFinished(); 0221 QCOMPARE(count, 4); 0222 } 0223 0224 void testBatchEnqueue() 0225 { 0226 MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue"); 0227 QSignalSpy spy(&queue, SIGNAL(messageReady())); 0228 queue.startTransaction(); 0229 queue.enqueue("value1"); 0230 queue.enqueue("value2"); 0231 queue.enqueue("value3"); 0232 0233 QVERIFY(queue.isEmpty()); 0234 QCOMPARE(spy.count(), 0); 0235 0236 queue.commit(); 0237 0238 QVERIFY(!queue.isEmpty()); 0239 QCOMPARE(spy.count(), 1); 0240 } 0241 0242 void testSortOrder() 0243 { 0244 MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue"); 0245 queue.startTransaction(); 0246 //Over 10 so we can make sure that 10 > 9 0247 const int num = 11; 0248 for (int i = 0; i < num; i++) { 0249 queue.enqueue("value" + QByteArray::number(i)); 0250 } 0251 queue.commit(); 0252 0253 int count = 0; 0254 queue.dequeueBatch(num, [&count](const QByteArray &data) { 0255 ASYNCCOMPARE(data, QByteArray{"value"} + QByteArray::number(count)); 0256 count++; 0257 return KAsync::null<void>(); 0258 }).exec().waitForFinished(); 0259 QCOMPARE(count, num); 0260 0261 } 0262 }; 0263 0264 QTEST_MAIN(MessageQueueTest) 0265 #include "messagequeuetest.moc"