File indexing completed on 2024-05-12 05:26:24

0001 #include <QTest>
0002 #include <QSignalSpy>
0003 #include <QTimer>
0004 
0005 #include <QString>
0006 #include <QQueue>
0007 
0008 #include "store.h"
0009 #include "storage.h"
0010 #include "messagequeue.h"
0011 #include "log.h"
0012 #include "test.h"
0013 
0014 /**
0015  * Test of the messagequeue implementation.
0016  */
0017 class MessageQueueTest : public QObject
0018 {
0019     Q_OBJECT
0020 private slots:
0021     void initTestCase()
0022     {
0023         Sink::Test::initTest();
0024         Sink::Storage::DataStore store(Sink::Store::storageLocation(), "sink.dummy.testqueue", Sink::Storage::DataStore::ReadWrite);
0025         store.removeFromDisk();
0026     }
0027 
0028     void cleanupTestCase()
0029     {
0030     }
0031 
0032     void cleanup()
0033     {
0034         Sink::Storage::DataStore store(Sink::Store::storageLocation(), "sink.dummy.testqueue", Sink::Storage::DataStore::ReadWrite);
0035         store.removeFromDisk();
0036     }
0037 
0038     void testEmpty()
0039     {
0040         MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue");
0041         QVERIFY(queue.isEmpty());
0042         queue.enqueue("value");
0043         QVERIFY(!queue.isEmpty());
0044         queue.dequeue([](void *ptr, int size, std::function<void(bool success)> callback) { callback(true); }, [](const MessageQueue::Error &error) {});
0045         QVERIFY(queue.isEmpty());
0046     }
0047 
0048     void testDequeueEmpty()
0049     {
0050         MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue");
0051         bool gotValue = false;
0052         bool gotError = false;
0053         queue.dequeue([&](void *ptr, int size, std::function<void(bool success)> callback) { gotValue = true; }, [&](const MessageQueue::Error &error) { gotError = true; });
0054         QVERIFY(!gotValue);
0055         QVERIFY(!gotError);
0056     }
0057 
0058     void testEnqueue()
0059     {
0060         MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue");
0061         QSignalSpy spy(&queue, SIGNAL(messageReady()));
0062         queue.enqueue("value1");
0063         QCOMPARE(spy.size(), 1);
0064     }
0065 
0066     void testDrained()
0067     {
0068         MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue");
0069         QSignalSpy spy(&queue, SIGNAL(drained()));
0070         queue.enqueue("value1");
0071 
0072         queue.dequeue([](void *ptr, int size, std::function<void(bool success)> callback) { callback(true); }, [](const MessageQueue::Error &error) {});
0073         QCOMPARE(spy.size(), 1);
0074     }
0075 
0076     void testSyncDequeue()
0077     {
0078         QQueue<QByteArray> values;
0079         values << "value1";
0080         values << "value2";
0081 
0082         MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue");
0083         for (const QByteArray &value : values) {
0084             queue.enqueue(value);
0085         }
0086 
0087         while (!queue.isEmpty()) {
0088             SinkLog() << "start";
0089             const auto expected = values.dequeue();
0090             bool gotValue = false;
0091             bool gotError = false;
0092             queue.dequeue(
0093                 [&](void *ptr, int size, std::function<void(bool success)> callback) {
0094                     if (QByteArray(static_cast<char *>(ptr), size) == expected) {
0095                         gotValue = true;
0096                     }
0097                     callback(true);
0098                 },
0099                 [&](const MessageQueue::Error &error) { gotError = true; });
0100             QVERIFY(gotValue);
0101             QVERIFY(!gotError);
0102         }
0103         QVERIFY(values.isEmpty());
0104     }
0105 
0106     void testAsyncDequeue()
0107     {
0108         QQueue<QByteArray> values;
0109         values << "value1";
0110         values << "value2";
0111 
0112         MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue");
0113         for (const QByteArray &value : values) {
0114             queue.enqueue(value);
0115         }
0116 
0117         while (!queue.isEmpty()) {
0118             QEventLoop eventLoop;
0119             const auto expected = values.dequeue();
0120             bool gotValue = false;
0121             bool gotError = false;
0122 
0123             queue.dequeue(
0124                 [&](void *ptr, int size, std::function<void(bool success)> callback) {
0125                     if (QByteArray(static_cast<char *>(ptr), size) == expected) {
0126                         gotValue = true;
0127                     }
0128                     auto timer = new QTimer();
0129                     timer->setSingleShot(true);
0130                     QObject::connect(timer, &QTimer::timeout, [timer, callback, &eventLoop]() {
0131                         delete timer;
0132                         callback(true);
0133                         eventLoop.exit();
0134                     });
0135                     timer->start(0);
0136                 },
0137                 [&](const MessageQueue::Error &error) { gotError = true; });
0138             eventLoop.exec();
0139             QVERIFY(gotValue);
0140             QVERIFY(!gotError);
0141         }
0142         QVERIFY(values.isEmpty());
0143     }
0144 
0145     /*
0146      * Dequeue's are async and we want to be able to enqueue new items in between.
0147      */
0148     void testNestedEnqueue()
0149     {
0150         MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue");
0151         queue.enqueue("value1");
0152 
0153         bool gotError = false;
0154         queue.dequeue(
0155             [&](void *ptr, int size, std::function<void(bool success)> callback) {
0156                 queue.enqueue("value3");
0157                 callback(true);
0158             },
0159             [&](const MessageQueue::Error &error) { gotError = true; });
0160         QVERIFY(!gotError);
0161     }
0162 
0163     void testBatchDequeue()
0164     {
0165         MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue");
0166         queue.enqueue("value1");
0167         queue.enqueue("value2");
0168         queue.enqueue("value3");
0169 
0170         int count = 0;
0171         queue.dequeueBatch(2, [&count](const QByteArray &data) {
0172                  count++;
0173                  ASYNCCOMPARE(data, QByteArray{"value"} + QByteArray::number(count));
0174                  return KAsync::null<void>();
0175              }).exec().waitForFinished();
0176         QCOMPARE(count, 2);
0177 
0178         queue.dequeueBatch(1, [&count](const QByteArray &data) {
0179                  count++;
0180                  ASYNCCOMPARE(data, QByteArray{"value"} + QByteArray::number(count));
0181                  return KAsync::null<void>();
0182              }).exec().waitForFinished();
0183         QCOMPARE(count, 3);
0184     }
0185 
0186     void testBatchDequeueDuringWriteTransaction()
0187     {
0188         MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue");
0189         queue.enqueue("value1");
0190         queue.enqueue("value2");
0191         queue.enqueue("value3");
0192 
0193         queue.startTransaction();
0194         //Inivisible to dequeues because in write transaction
0195         queue.enqueue("value4");
0196 
0197         int count = 0;
0198         queue.dequeueBatch(2, [&count](const QByteArray &data) {
0199                  count++;
0200                  ASYNCCOMPARE(data, QByteArray{"value"} + QByteArray::number(count));
0201                  return KAsync::null<void>();
0202              }).exec().waitForFinished();
0203         QCOMPARE(count, 2);
0204 
0205         queue.dequeueBatch(2, [&count](const QByteArray &data) {
0206                  count++;
0207                  ASYNCCOMPARE(data, QByteArray{"value"} + QByteArray::number(count));
0208                  return KAsync::null<void>();
0209              }).exec().waitForFinished();
0210         QCOMPARE(count, 3);
0211         QVERIFY(queue.isEmpty());
0212 
0213         //Commit value4
0214         queue.commit();
0215         QVERIFY(!queue.isEmpty());
0216         queue.dequeueBatch(2, [&count](const QByteArray &data) {
0217                  count++;
0218                  ASYNCCOMPARE(data, QByteArray{"value"} + QByteArray::number(count));
0219                  return KAsync::null<void>();
0220              }).exec().waitForFinished();
0221         QCOMPARE(count, 4);
0222     }
0223 
0224     void testBatchEnqueue()
0225     {
0226         MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue");
0227         QSignalSpy spy(&queue, SIGNAL(messageReady()));
0228         queue.startTransaction();
0229         queue.enqueue("value1");
0230         queue.enqueue("value2");
0231         queue.enqueue("value3");
0232 
0233         QVERIFY(queue.isEmpty());
0234         QCOMPARE(spy.count(), 0);
0235 
0236         queue.commit();
0237 
0238         QVERIFY(!queue.isEmpty());
0239         QCOMPARE(spy.count(), 1);
0240     }
0241 
0242     void testSortOrder()
0243     {
0244         MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue");
0245         queue.startTransaction();
0246         //Over 10 so we can make sure that 10 > 9
0247         const int num = 11;
0248         for (int i = 0; i < num; i++) {
0249             queue.enqueue("value" + QByteArray::number(i));
0250         }
0251         queue.commit();
0252 
0253         int count = 0;
0254         queue.dequeueBatch(num, [&count](const QByteArray &data) {
0255                  ASYNCCOMPARE(data, QByteArray{"value"} + QByteArray::number(count));
0256                  count++;
0257                  return KAsync::null<void>();
0258              }).exec().waitForFinished();
0259         QCOMPARE(count, num);
0260 
0261     }
0262 };
0263 
0264 QTEST_MAIN(MessageQueueTest)
0265 #include "messagequeuetest.moc"