File indexing completed on 2024-05-19 05:26:16

0001 /*
0002  * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm>
0003  * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org>
0004  *
0005  * This library is free software; you can redistribute it and/or
0006  * modify it under the terms of the GNU Lesser General Public
0007  * License as published by the Free Software Foundation; either
0008  * version 2.1 of the License, or (at your option) version 3, or any
0009  * later version accepted by the membership of KDE e.V. (or its
0010  * successor approved by the membership of KDE e.V.), which shall
0011  * act as a proxy defined in Section 6 of version 3 of the license.
0012  *
0013  * This library is distributed in the hope that it will be useful,
0014  * but WITHOUT ANY WARRANTY; without even the implied warranty of
0015  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0016  * Lesser General Public License for more details.
0017  *
0018  * You should have received a copy of the GNU Lesser General Public
0019  * License along with this library.  If not, see <http://www.gnu.org/licenses/>.
0020  */
0021 
0022 #include "storage.h"
0023 
0024 #include <iostream>
0025 
0026 #include <QDebug>
0027 #include <QDir>
0028 #include <QReadWriteLock>
0029 #include <QMutex>
0030 #include <QMutexLocker>
0031 #include <QString>
0032 #include <QTime>
0033 #include <valgrind.h>
0034 #include <lmdb.h>
0035 #include "log.h"
0036 
0037 #ifdef Q_OS_WIN
0038 #include <BaseTsd.h>
0039 typedef SSIZE_T ssize_t;
0040 #endif
0041 
0042 namespace Sink {
0043 namespace Storage {
0044 
//Guards sDbis: taken for reading while looking up a dbi and for writing while new dbis are registered.
static QReadWriteLock sDbisLock;
//Presumably guards sEnvironments in the same way - not used in this part of the file, confirm against the rest.
static QReadWriteLock sEnvironmentsLock;
//Serializes dynamic dbi creation: only one transaction may create dbis at a time.
static QMutex sCreateDbiLock;
//Process-wide table of LMDB environments. NOTE(review): the key is presumably the environment path/name - confirm.
static QHash<QString, MDB_env *> sEnvironments;
//Process-wide table of opened dbi handles, keyed by environment name + database name.
static QHash<QString, MDB_dbi> sDbis;

//LMDB database flags exposed as plain ints.
int AllowDuplicates = MDB_DUPSORT;
int IntegerKeys = MDB_INTEGERKEY;
int IntegerValues = MDB_INTEGERDUP;
0054 
0055 int getErrorCode(int e)
0056 {
0057     switch (e) {
0058         case MDB_NOTFOUND:
0059             return DataStore::ErrorCodes::NotFound;
0060         default:
0061             break;
0062     }
0063     return -1;
0064 }
0065 
0066 static QList<QByteArray> getDatabaseNames(MDB_txn *transaction)
0067 {
0068     if (!transaction) {
0069         SinkWarning() << "Invalid transaction";
0070         return QList<QByteArray>();
0071     }
0072     int rc;
0073     QList<QByteArray> list;
0074     MDB_dbi dbi;
0075     if ((rc = mdb_dbi_open(transaction, nullptr, 0, &dbi) == 0)) {
0076         MDB_val key;
0077         MDB_val data;
0078         MDB_cursor *cursor;
0079 
0080         mdb_cursor_open(transaction, dbi, &cursor);
0081         if ((rc = mdb_cursor_get(cursor, &key, &data, MDB_FIRST)) == 0) {
0082             list << QByteArray::fromRawData((char *)key.mv_data, key.mv_size);
0083             while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) {
0084                 list << QByteArray::fromRawData((char *)key.mv_data, key.mv_size);
0085             }
0086         } else {
0087             //Normal if we don't have any databases yet
0088             if (rc == MDB_NOTFOUND) {
0089                 rc = 0;
0090             }
0091             if (rc) {
0092                 SinkWarning() << "Failed to get a value" << rc;
0093             }
0094         }
0095         mdb_cursor_close(cursor);
0096     } else {
0097         SinkWarning() << "Failed to open db" << rc << QByteArray(mdb_strerror(rc));
0098     }
0099     return list;
0100 
0101 }
0102 
0103 /*
0104  * To create a dbi we always need a write transaction,
0105  * and we always need to commit the transaction ASAP
0106  * We can only ever enter from one point per process.
0107  */
0108 static bool createDbi(MDB_txn *transaction, const QByteArray &db, bool readOnly, int flags, MDB_dbi &dbi)
0109 {
0110     MDB_dbi flagtableDbi;
0111     if (const int rc = mdb_dbi_open(transaction, "__flagtable", readOnly ? 0 : MDB_CREATE, &flagtableDbi)) {
0112         if (!readOnly) {
0113             SinkWarning() << "Failed to to open flagdb: " << QByteArray(mdb_strerror(rc));
0114         }
0115     } else {
0116         MDB_val key, value;
0117         key.mv_data = const_cast<void*>(static_cast<const void*>(db.constData()));
0118         key.mv_size = db.size();
0119         if (const auto rc = mdb_get(transaction, flagtableDbi, &key, &value)) {
0120             //We expect this to fail for new databases
0121             if (rc != MDB_NOTFOUND) {
0122                 SinkWarning() << "Failed to read flags from flag db: " << QByteArray(mdb_strerror(rc));
0123             }
0124         } else {
0125             //Found the flags
0126             const auto ba = QByteArray::fromRawData((char *)value.mv_data, value.mv_size);
0127             flags = ba.toInt();
0128         }
0129     }
0130 
0131     if (flags & IntegerValues && !(flags & AllowDuplicates)) {
0132         SinkWarning() << "Opening a database with integer values, but not duplicate keys";
0133     }
0134 
0135     if (const int rc = mdb_dbi_open(transaction, db.constData(), flags, &dbi)) {
0136         //Create the db if it is not existing already
0137         if (rc == MDB_NOTFOUND && !readOnly) {
0138             //Sanity check db name
0139             {
0140                 auto parts = db.split('.');
0141                 for (const auto &p : parts) {
0142                     auto containsSpecialCharacter = [] (const QByteArray &p) {
0143                         for (int i = 0; i < p.size(); i++) {
0144                             const auto c = p.at(i);
0145                             //Between 0 and z in the ascii table. Essentially ensures that the name is printable and doesn't contain special chars
0146                             if (c < 0x30 || c > 0x7A) {
0147                                 return true;
0148                             }
0149                         }
0150                         return false;
0151                     };
0152                     if (p.isEmpty() || containsSpecialCharacter(p)) {
0153                         SinkError() << "Tried to create a db with an invalid name. Hex:" << db.toHex() << " ASCII:" << db;
0154                         Q_ASSERT(false);
0155                         throw std::runtime_error("Fatal error while creating db.");
0156                     }
0157                 }
0158             }
0159             if (const int rc = mdb_dbi_open(transaction, db.constData(), flags | MDB_CREATE, &dbi)) {
0160                 SinkWarning() << "Failed to create db " << QByteArray(mdb_strerror(rc));
0161                 return false;
0162             }
0163             //Record the db flags
0164             MDB_val key, value;
0165             key.mv_data = const_cast<void*>(static_cast<const void*>(db.constData()));
0166             key.mv_size = db.size();
0167             //Store the flags without the create option
0168             const auto ba = QByteArray::number(flags);
0169             value.mv_data = const_cast<void*>(static_cast<const void*>(ba.constData()));
0170             value.mv_size = ba.size();
0171             if (const int rc = mdb_put(transaction, flagtableDbi, &key, &value, MDB_NOOVERWRITE)) {
0172                 //We expect this to fail if we're only creating the dbi but not the db
0173                 if (rc != MDB_KEYEXIST) {
0174                     SinkWarning() << "Failed to write flags to flag db: " << QByteArray(mdb_strerror(rc));
0175                 }
0176             }
0177         } else {
0178             //It's not an error if we only want to read
0179             if (!readOnly) {
0180                 SinkWarning() << "Failed to open db " << db << "error:" << QByteArray(mdb_strerror(rc));
0181                 return true;
0182             }
0183             return false;
0184         }
0185     }
0186     return true;
0187 }
0188 
/*
 * Private state of a NamedDatabase: the database name, the LMDB transaction it operates in
 * and the dbi handle, plus the logic to look up or dynamically open that dbi.
 */
class DataStore::NamedDatabase::Private
{
public:
    Private(const QByteArray &_db, int _flags,
        const std::function<void(const DataStore::Error &error)> &_defaultErrorHandler,
        const QString &_name, MDB_txn *_txn)
        : db(_db),
          transaction(_txn),
          flags(_flags),
          defaultErrorHandler(_defaultErrorHandler),
          name(_name)
    {
    }

    ~Private() = default;

    //Name of the database inside the environment.
    QByteArray db;
    //Transaction all operations run in. Reset to null if opening the database fails.
    MDB_txn *transaction;
    //Handle of the database; only meaningful after openDatabase() returned true.
    MDB_dbi dbi{0};
    //LMDB flags the database is opened with.
    int flags;
    //Used by NamedDatabase whenever a call does not supply its own error handler.
    std::function<void(const DataStore::Error &error)> defaultErrorHandler;
    //Name of the environment; name + db is the key used in sDbis.
    QString name;
    //Set when openDatabase() opened the dbi inside a write transaction, together with the sDbis key for it.
    //NOTE(review): presumably consumed when the transaction commits - not visible in this part of the file.
    bool createdNewDbi = false;
    QString createdNewDbiName;

    /*
     * Returns false if mdb_dbi_flags reports EINVAL for the dbi in the given transaction,
     * true for every other return code.
     */
    bool dbiValidForTransaction(MDB_dbi dbi, MDB_txn *transaction)
    {
        //sDbis can contain dbi's that are not available to this transaction.
        //We use mdb_dbi_flags to check if the dbi is valid for this transaction.
        uint f;
        if (mdb_dbi_flags(transaction, dbi, &f) == EINVAL) {
            return false;
        }
        return true;
    }

    /*
     * Makes `dbi` usable for `transaction`, either by looking it up in sDbis or by opening it dynamically.
     *
     * Returns true if `dbi` is set and valid. Returns false otherwise; if createDbi fails,
     * `dbi` is reset to 0 and `transaction` to null.
     *
     * NOTE(review): the errorHandler parameter is never used in this function.
     */
    bool openDatabase(bool readOnly, std::function<void(const DataStore::Error &error)> errorHandler)
    {
        const auto dbiName = name + db;
        //Never access sDbis while anything is writing to it.
        QReadLocker dbiLocker{&sDbisLock};
        if (sDbis.contains(dbiName)) {
            dbi = sDbis.value(dbiName);
            //sDbis can potentially contain a dbi that is not valid for this transaction, if this transaction was created before the dbi was created.
            if (dbiValidForTransaction(dbi, transaction)) {
                return true;
            } else {
                SinkTrace() << "Found dbi that is not available for the current transaction.";
                if (readOnly) {
                    //Recovery for read-only transactions. Abort and renew.
                    mdb_txn_reset(transaction);
                    mdb_txn_renew(transaction);
                    Q_ASSERT(dbiValidForTransaction(dbi, transaction));
                    return true;
                }
                //There is no recover path for non-read-only transactions.
            }
            //Nothing in the code deals well with non-existing databases.
            Q_ASSERT(false);
            return false;
        }


        /*
        * Dynamic creation of databases.
        * If all databases were defined via the database layout we wouldn't ever end up in here.
        * However, we rely on this codepath for indexes, synchronization databases and in race-conditions
        * where the database is not yet fully created when the client initializes it for reading.
        *
        * There are a few things to consider:
        * * dbi's (DataBase Identifier) should be opened once (ideally), and then be persisted in the environment.
        * * To open a dbi we need a transaction and must commit the transaction. From then on any open transaction will have access to the dbi.
        * * Already running transactions will not have access to the dbi.
        * * There *must* only ever be one active transaction opening dbi's (using mdb_dbi_open), and that transaction *must*
        * commit or abort before any other transaction opens a dbi.
        *
        * We solve this the following way:
        * * For read-only transactions we abort the transaction, open the dbi and persist it in the environment, and reopen the transaction (so the dbi is available). This may result in the db content changing unexpectedly and referenced memory becoming unavailable, but isn't a problem as long as we don't rely on memory remaining valid for the duration of the transaction (which is anyways not given since any operation would invalidate the memory region).
        * * For write transactions we open the dbi for future use, and then open it as well in the current transaction.
        * * Write transactions that open the named database multiple times will call this codepath multiple times,
        * this is ok though because the same dbi will be returned by mdb_dbi_open (We could also start to do a lookup in
        * Transaction::Private::createdDbs first).
        */
        SinkTrace() << "Creating database dynamically: " << dbiName << readOnly;
        //Only one transaction may ever create dbis at a time.
        while (!sCreateDbiLock.tryLock(10)) {
            //Allow another thread that has already acquired sCreateDbiLock to continue below.
            //Otherwise we risk a dead-lock if another thread already acquired sCreateDbiLock, but then lost the sDbisLock while upgrading it to a
            //write lock below
            dbiLocker.unlock();
            dbiLocker.relock();
        }
        //Check again: another thread may have registered the dbi while we were waiting for sCreateDbiLock.
        if (sDbis.contains(dbiName)) {
            dbi = sDbis.value(dbiName);
            //sDbis can potentially contain a dbi that is not valid for this transaction, if this transaction was created before the dbi was created.
            sCreateDbiLock.unlock();
            if (dbiValidForTransaction(dbi, transaction)) {
                return true;
            } else {
                SinkTrace() << "Found dbi that is not available for the current transaction.";
                if (readOnly) {
                    //Recovery for read-only transactions. Abort and renew.
                    mdb_txn_reset(transaction);
                    mdb_txn_renew(transaction);
                    Q_ASSERT(dbiValidForTransaction(dbi, transaction));
                    return true;
                }
                //There is no recover path for non-read-only transactions.
                Q_ASSERT(false);
                return false;
            }
        }

        //Ensure nobody reads sDbis either
        dbiLocker.unlock();
        //We risk losing the lock in here. That's why we tryLock above in the while loop
        QWriteLocker dbiWriteLocker(&sDbisLock);

        //Create a transaction to open the dbi
        MDB_txn *dbiTransaction;
        if (readOnly) {
            MDB_env *env = mdb_txn_env(transaction);
            Q_ASSERT(env);
            mdb_txn_reset(transaction);
            if (const int rc = mdb_txn_begin(env, nullptr, MDB_RDONLY, &dbiTransaction)) {
                SinkError() << "Failed to open transaction: " << QByteArray(mdb_strerror(rc)) << readOnly << transaction;
                //NOTE(review): the caller's transaction was reset above and is not renewed on this path.
                sCreateDbiLock.unlock();
                return false;
            }
        } else {
            //Write transactions open the dbi directly in the caller's transaction.
            dbiTransaction = transaction;
        }
        if (createDbi(dbiTransaction, db, readOnly, flags, dbi)) {
            if (readOnly) {
                mdb_txn_commit(dbiTransaction);
                Q_ASSERT(!sDbis.contains(dbiName));
                sDbis.insert(dbiName, dbi);
                //We reopen the read-only transaction so the dbi becomes available in it.
                mdb_txn_renew(transaction);
            } else {
                //The dbi is not inserted into sDbis here; we only remember that it was created.
                createdNewDbi = true;
                createdNewDbiName = dbiName;
            }
            //Ensure the dbi is valid for the parent transaction
            Q_ASSERT(dbiValidForTransaction(dbi, transaction));
        } else {
            if (readOnly) {
                mdb_txn_abort(dbiTransaction);
                mdb_txn_renew(transaction);
            } else {
                SinkWarning() << "Failed to create the dbi: " << dbiName;
            }
            //Mark this database as unusable for all subsequent operations.
            dbi = 0;
            transaction = 0;
            sCreateDbiLock.unlock();
            return false;
        }
        sCreateDbiLock.unlock();
        return true;
    }
};
0351 
0352 DataStore::NamedDatabase::NamedDatabase() : d(nullptr)
0353 {
0354 }
0355 
0356 DataStore::NamedDatabase::NamedDatabase(NamedDatabase::Private *prv) : d(prv)
0357 {
0358 }
0359 
0360 DataStore::NamedDatabase::NamedDatabase(NamedDatabase &&other) : d(nullptr)
0361 {
0362     *this = std::move(other);
0363 }
0364 
0365 DataStore::NamedDatabase &DataStore::NamedDatabase::operator=(DataStore::NamedDatabase &&other)
0366 {
0367     if (&other != this) {
0368         delete d;
0369         d = other.d;
0370         other.d = nullptr;
0371     }
0372     return *this;
0373 }
0374 
0375 DataStore::NamedDatabase::~NamedDatabase()
0376 {
0377     delete d;
0378 }
0379 
0380 bool DataStore::NamedDatabase::write(const size_t key, const QByteArray &value,
0381     const std::function<void(const DataStore::Error &error)> &errorHandler)
0382 {
0383     return write(sizeTToByteArray(key), value, errorHandler);
0384 }
0385 
0386 bool DataStore::NamedDatabase::write(const QByteArray &sKey, const QByteArray &sValue, const std::function<void(const DataStore::Error &error)> &errorHandler)
0387 {
0388     if (!d || !d->transaction) {
0389         Error error("", ErrorCodes::GenericError, "Not open");
0390         if (d) {
0391             errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
0392         }
0393         return false;
0394     }
0395     const void *keyPtr = sKey.data();
0396     const size_t keySize = sKey.size();
0397     const void *valuePtr = sValue.data();
0398     const size_t valueSize = sValue.size();
0399 
0400     if (!keyPtr || keySize == 0) {
0401         Error error(d->name.toLatin1() + d->db, ErrorCodes::GenericError, "Tried to write empty key.");
0402         errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
0403         return false;
0404     }
0405 
0406     int rc;
0407     MDB_val key, data;
0408     key.mv_size = keySize;
0409     key.mv_data = const_cast<void *>(keyPtr);
0410     data.mv_size = valueSize;
0411     data.mv_data = const_cast<void *>(valuePtr);
0412     rc = mdb_put(d->transaction, d->dbi, &key, &data, 0);
0413 
0414     if (rc) {
0415         Error error(d->name.toLatin1() + d->db, ErrorCodes::GenericError, "mdb_put: " + QByteArray(mdb_strerror(rc)) + " Key: " + sKey + " Value: " + sValue);
0416         errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
0417     }
0418 
0419     return !rc;
0420 }
0421 
0422 void DataStore::NamedDatabase::remove(
0423     const size_t key, const std::function<void(const DataStore::Error &error)> &errorHandler)
0424 {
0425     return remove(sizeTToByteArray(key), errorHandler);
0426 }
0427 
0428 void DataStore::NamedDatabase::remove(const QByteArray &k, const std::function<void(const DataStore::Error &error)> &errorHandler)
0429 {
0430     remove(k, QByteArray(), errorHandler);
0431 }
0432 
0433 void DataStore::NamedDatabase::remove(const size_t key, const QByteArray &value,
0434     const std::function<void(const DataStore::Error &error)> &errorHandler)
0435 {
0436     return remove(sizeTToByteArray(key), value, errorHandler);
0437 }
0438 
0439 void DataStore::NamedDatabase::remove(const QByteArray &k, const QByteArray &value, const std::function<void(const DataStore::Error &error)> &errorHandler)
0440 {
0441     if (!d || !d->transaction) {
0442         if (d) {
0443             Error error(d->name.toLatin1() + d->db, ErrorCodes::GenericError, "Not open");
0444             errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
0445         }
0446         return;
0447     }
0448 
0449     int rc;
0450     MDB_val key;
0451     key.mv_size = k.size();
0452     key.mv_data = const_cast<void *>(static_cast<const void *>(k.data()));
0453     if (value.isEmpty()) {
0454         rc = mdb_del(d->transaction, d->dbi, &key, 0);
0455     } else {
0456         MDB_val data;
0457         data.mv_size = value.size();
0458         data.mv_data = const_cast<void *>(static_cast<const void *>(value.data()));
0459         rc = mdb_del(d->transaction, d->dbi, &key, &data);
0460     }
0461 
0462     if (rc) {
0463         auto errorCode = ErrorCodes::GenericError;
0464         if (rc == MDB_NOTFOUND) {
0465             errorCode = ErrorCodes::NotFound;
0466         }
0467         Error error(d->name.toLatin1() + d->db, errorCode, QString("Error on mdb_del: %1 %2").arg(rc).arg(mdb_strerror(rc)).toLatin1());
0468         errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
0469     }
0470 }
0471 
0472 int DataStore::NamedDatabase::scan(const size_t key,
0473     const std::function<bool(size_t key, const QByteArray &value)> &resultHandler,
0474     const std::function<void(const DataStore::Error &error)> &errorHandler) const
0475 {
0476     return scan(sizeTToByteArray(key),
0477         [&resultHandler](const QByteArray &key, const QByteArray &value) {
0478             return resultHandler(byteArrayToSizeT(key), value);
0479         },
0480         errorHandler, /* findSubstringKeys = */ false);
0481 }
0482 
0483 int DataStore::NamedDatabase::scan(const QByteArray &k, const std::function<bool(const QByteArray &key, const QByteArray &value)> &resultHandler,
0484     const std::function<void(const DataStore::Error &error)> &errorHandler, bool findSubstringKeys) const
0485 {
0486     if (!d || !d->transaction) {
0487         // Not an error. We rely on this to read nothing from non-existing databases.
0488         return 0;
0489     }
0490 
0491     int rc;
0492     MDB_val key;
0493     MDB_val data;
0494     MDB_cursor *cursor;
0495 
0496     key.mv_data = (void *)k.constData();
0497     key.mv_size = k.size();
0498 
0499     rc = mdb_cursor_open(d->transaction, d->dbi, &cursor);
0500     if (rc) {
0501         //Invalid arguments can mean that the transaction doesn't contain the db dbi
0502         Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during mdb_cursor_open: ") + QByteArray(mdb_strerror(rc)) + ". Key: " + k);
0503         errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
0504         return 0;
0505     }
0506 
0507     int numberOfRetrievedValues = 0;
0508 
0509     const bool allowDuplicates = d->flags & AllowDuplicates;
0510     const bool emptyKey = k.isEmpty();
0511 
0512     if (emptyKey || allowDuplicates || findSubstringKeys) {
0513         const MDB_cursor_op op = [&] {
0514             if (emptyKey) {
0515                 return MDB_FIRST;
0516             }
0517             if (findSubstringKeys) {
0518                 return MDB_SET_RANGE;
0519             }
0520             return MDB_SET;
0521         }();
0522         const MDB_cursor_op nextOp = (allowDuplicates && !findSubstringKeys && !emptyKey) ? MDB_NEXT_DUP : MDB_NEXT;
0523 
0524         if ((rc = mdb_cursor_get(cursor, &key, &data, op)) == 0) {
0525             const auto current = QByteArray::fromRawData((char *)key.mv_data, key.mv_size);
0526             // The first lookup will find a key that is equal or greather than our key
0527             if (current.startsWith(k)) {
0528                 numberOfRetrievedValues++;
0529                 if (resultHandler(current, QByteArray::fromRawData((char *)data.mv_data, data.mv_size))) {
0530                     if (findSubstringKeys) {
0531                         // Reset the key to what we search for
0532                         key.mv_data = (void *)k.constData();
0533                         key.mv_size = k.size();
0534                     }
0535                     while ((rc = mdb_cursor_get(cursor, &key, &data, nextOp)) == 0) {
0536                         const auto current = QByteArray::fromRawData((char *)key.mv_data, key.mv_size);
0537                         // Every consequitive lookup simply iterates through the list
0538                         if (current.startsWith(k)) {
0539                             numberOfRetrievedValues++;
0540                             if (!resultHandler(current, QByteArray::fromRawData((char *)data.mv_data, data.mv_size))) {
0541                                 break;
0542                             }
0543                         }
0544                     }
0545                 }
0546             }
0547         }
0548 
0549         // We never find the last value
0550         if (rc == MDB_NOTFOUND) {
0551             rc = 0;
0552         }
0553     } else {
0554         if ((rc = mdb_cursor_get(cursor, &key, &data, MDB_SET)) == 0) {
0555             numberOfRetrievedValues++;
0556             resultHandler(QByteArray::fromRawData((char *)key.mv_data, key.mv_size), QByteArray::fromRawData((char *)data.mv_data, data.mv_size));
0557         }
0558     }
0559 
0560     mdb_cursor_close(cursor);
0561 
0562     if (rc) {
0563         Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during scan. Key: ") + k + " : " + QByteArray(mdb_strerror(rc)));
0564         errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
0565     }
0566 
0567     return numberOfRetrievedValues;
0568 }
0569 
0570 
0571 void DataStore::NamedDatabase::findLatest(size_t key,
0572     const std::function<void(size_t key, const QByteArray &value)> &resultHandler,
0573     const std::function<void(const DataStore::Error &error)> &errorHandler) const
0574 {
0575     return findLatest(sizeTToByteArray(key),
0576         [&resultHandler](const QByteArray &key, const QByteArray &value) {
0577             resultHandler(byteArrayToSizeT(value), value);
0578         },
0579         errorHandler);
0580 }
0581 
0582 void DataStore::NamedDatabase::findLatest(const QByteArray &k, const std::function<void(const QByteArray &key, const QByteArray &value)> &resultHandler,
0583     const std::function<void(const DataStore::Error &error)> &errorHandler) const
0584 {
0585     if (!d || !d->transaction) {
0586         // Not an error. We rely on this to read nothing from non-existing databases.
0587         return;
0588     }
0589     if (k.isEmpty()) {
0590         Error error(d->name.toLatin1() + d->db, GenericError, QByteArray("Can't use findLatest with empty key."));
0591         errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
0592         return;
0593     }
0594 
0595     int rc;
0596     MDB_val key;
0597     MDB_val data;
0598     MDB_cursor *cursor;
0599 
0600     key.mv_data = (void *)k.constData();
0601     key.mv_size = k.size();
0602 
0603     rc = mdb_cursor_open(d->transaction, d->dbi, &cursor);
0604     if (rc) {
0605         Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during mdb_cursor_open: ") + QByteArray(mdb_strerror(rc)));
0606         errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
0607         return;
0608     }
0609 
0610     bool foundValue = false;
0611     MDB_cursor_op op = MDB_SET_RANGE;
0612     if ((rc = mdb_cursor_get(cursor, &key, &data, op)) == 0) {
0613         // The first lookup will find a key that is equal or greather than our key
0614         if (QByteArray::fromRawData((char *)key.mv_data, key.mv_size).startsWith(k)) {
0615             //Read next value until we no longer match
0616             while (QByteArray::fromRawData((char *)key.mv_data, key.mv_size).startsWith(k)) {
0617                 MDB_cursor_op nextOp = MDB_NEXT;
0618                 rc = mdb_cursor_get(cursor, &key, &data, nextOp);
0619                 if (rc) {
0620                     break;
0621                 }
0622             }
0623             //Now read the previous value, and that's the latest one
0624             MDB_cursor_op prefOp = MDB_PREV;
0625             // We read past the end above, just take the last value
0626             if (rc == MDB_NOTFOUND) {
0627                 prefOp = MDB_LAST;
0628             }
0629             rc = mdb_cursor_get(cursor, &key, &data, prefOp);
0630             if (!rc) {
0631                 foundValue = true;
0632                 resultHandler(QByteArray::fromRawData((char *)key.mv_data, key.mv_size), QByteArray::fromRawData((char *)data.mv_data, data.mv_size));
0633             }
0634         }
0635     }
0636 
0637     // We never find the last value
0638     if (rc == MDB_NOTFOUND) {
0639         rc = 0;
0640     }
0641 
0642     mdb_cursor_close(cursor);
0643 
0644     if (rc) {
0645         Error error(d->name.toLatin1(), getErrorCode(rc), QByteArray("Error during find latest. Key: ") + k + " : " + QByteArray(mdb_strerror(rc)));
0646         errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
0647     } else if (!foundValue) {
0648         Error error(d->name.toLatin1(), 1, QByteArray("Error during find latest. Key: ") + k + " : No value found");
0649         errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
0650     }
0651 
0652     return;
0653 }
0654 
0655 void DataStore::NamedDatabase::findLast(const QByteArray &k, const std::function<void(const QByteArray &key, const QByteArray &value)> &resultHandler,
0656     const std::function<void(const DataStore::Error &error)> &errorHandler) const
0657 {
0658     if (!d || !d->transaction) {
0659         // Not an error. We rely on this to read nothing from non-existing databases.
0660         return;
0661     }
0662     if (k.isEmpty()) {
0663         Error error(d->name.toLatin1() + d->db, GenericError, QByteArray("Can't use findLatest with empty key."));
0664         errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
0665         return;
0666     }
0667 
0668     int rc;
0669     MDB_val key;
0670     MDB_val data;
0671     MDB_cursor *cursor;
0672 
0673     key.mv_data = (void *)k.constData();
0674     key.mv_size = k.size();
0675 
0676     rc = mdb_cursor_open(d->transaction, d->dbi, &cursor);
0677     if (rc) {
0678         Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during mdb_cursor_open: ") + QByteArray(mdb_strerror(rc)));
0679         errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
0680         return;
0681     }
0682 
0683     bool foundValue = false;
0684     if ((rc = mdb_cursor_get(cursor, &key, &data, MDB_SET)) == 0) {
0685         if ((rc = mdb_cursor_get(cursor, &key, &data, MDB_LAST_DUP)) == 0) {
0686             foundValue = true;
0687             resultHandler(QByteArray::fromRawData((char *)key.mv_data, key.mv_size), QByteArray::fromRawData((char *)data.mv_data, data.mv_size));
0688         }
0689     }
0690 
0691     mdb_cursor_close(cursor);
0692 
0693     if (rc) {
0694         Error error(d->name.toLatin1(), getErrorCode(rc), QByteArray("Error during find latest. Key: ") + k + " : " + QByteArray(mdb_strerror(rc)));
0695         errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
0696     } else if (!foundValue) {
0697         Error error(d->name.toLatin1(), 1, QByteArray("Error during find latest. Key: ") + k + " : No value found");
0698         errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
0699     }
0700 
0701     return;
0702 }
0703 
0704 int DataStore::NamedDatabase::findAllInRange(const size_t lowerBound, const size_t upperBound,
0705     const std::function<void(size_t key, const QByteArray &value)> &resultHandler,
0706     const std::function<void(const DataStore::Error &error)> &errorHandler) const
0707 {
0708     return findAllInRange(sizeTToByteArray(lowerBound), sizeTToByteArray(upperBound),
0709         [&resultHandler](const QByteArray &key, const QByteArray &value) {
0710             resultHandler(byteArrayToSizeT(value), value);
0711         },
0712         errorHandler);
0713 }
0714 
0715 int DataStore::NamedDatabase::findAllInRange(const QByteArray &lowerBound, const QByteArray &upperBound,
0716     const std::function<void(const QByteArray &key, const QByteArray &value)> &resultHandler,
0717     const std::function<void(const DataStore::Error &error)> &errorHandler) const
0718 {
0719     if (!d || !d->transaction) {
0720         // Not an error. We rely on this to read nothing from non-existing databases.
0721         return 0;
0722     }
0723 
0724     MDB_cursor *cursor;
0725     if (int rc = mdb_cursor_open(d->transaction, d->dbi, &cursor)) {
0726         // Invalid arguments can mean that the transaction doesn't contain the db dbi
0727         Error error(d->name.toLatin1() + d->db, getErrorCode(rc),
0728             QByteArray("Error during mdb_cursor_open: ") + QByteArray(mdb_strerror(rc)) +
0729                 ". Lower bound: " + lowerBound + " Upper bound: " + upperBound);
0730         errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
0731         return 0;
0732     }
0733 
0734     MDB_val firstKey = {(size_t)lowerBound.size(), (void *)lowerBound.constData()};
0735     MDB_val idealLastKey = {(size_t)upperBound.size(), (void *)upperBound.constData()};
0736     MDB_val currentKey;
0737     MDB_val data;
0738 
0739     // Find the first key in the range
0740     int rc = mdb_cursor_get(cursor, &firstKey, &data, MDB_SET_RANGE);
0741 
0742     if (rc != MDB_SUCCESS) {
0743         // Nothing is greater or equal than the lower bound, meaning no result
0744         mdb_cursor_close(cursor);
0745         return 0;
0746     }
0747 
0748     currentKey = firstKey;
0749 
0750     // If already bigger than the upper bound
0751     if (mdb_cmp(d->transaction, d->dbi, &currentKey, &idealLastKey) > 0) {
0752         mdb_cursor_close(cursor);
0753         return 0;
0754     }
0755 
0756     int count = 0;
0757     do {
0758         const auto currentBAKey = QByteArray::fromRawData((char *)currentKey.mv_data, currentKey.mv_size);
0759         const auto currentBAValue = QByteArray::fromRawData((char *)data.mv_data, data.mv_size);
0760         resultHandler(currentBAKey, currentBAValue);
0761         count++;
0762     } while (mdb_cursor_get(cursor, &currentKey, &data, MDB_NEXT) == MDB_SUCCESS &&
0763              mdb_cmp(d->transaction, d->dbi, &currentKey, &idealLastKey) <= 0);
0764 
0765     mdb_cursor_close(cursor);
0766     return count;
0767 }
0768 
0769 qint64 DataStore::NamedDatabase::getSize()
0770 {
0771     if (!d || !d->transaction) {
0772         return -1;
0773     }
0774 
0775     int rc;
0776     MDB_stat stat;
0777     rc = mdb_stat(d->transaction, d->dbi, &stat);
0778     if (rc) {
0779         SinkWarning() << "Something went wrong " << QByteArray(mdb_strerror(rc));
0780     }
0781     return stat.ms_psize * (stat.ms_leaf_pages + stat.ms_branch_pages + stat.ms_overflow_pages);
0782 }
0783 
0784 DataStore::NamedDatabase::Stat DataStore::NamedDatabase::stat()
0785 {
0786     if (!d || !d->transaction) {
0787         return {};
0788     }
0789 
0790     int rc;
0791     MDB_stat stat;
0792     rc = mdb_stat(d->transaction, d->dbi, &stat);
0793     if (rc) {
0794         SinkWarning() << "Something went wrong " << QByteArray(mdb_strerror(rc));
0795         return {};
0796     }
0797     return {stat.ms_branch_pages,
0798             stat.ms_leaf_pages,
0799             stat.ms_overflow_pages,
0800             stat.ms_entries};
0801     // std::cout << "page size: " << stat.ms_psize << std::endl;
0802     // std::cout << "leaf_pages: " << stat.ms_leaf_pages << std::endl;
0803     // std::cout << "branch_pages: " << stat.ms_branch_pages << std::endl;
0804     // std::cout << "overflow_pages: " << stat.ms_overflow_pages << std::endl;
0805     // std::cout << "depth: " << stat.ms_depth << std::endl;
0806     // std::cout << "entries: " << stat.ms_entries << std::endl;
0807 }
0808 
0809 bool DataStore::NamedDatabase::allowsDuplicates() const
0810 {
0811     unsigned int flags;
0812     mdb_dbi_flags(d->transaction, d->dbi, &flags);
0813     return flags & MDB_DUPSORT;
0814 }
0815 
0816 
0817 class DataStore::Transaction::Private
0818 {
0819 public:
0820     Private(bool _requestRead, const std::function<void(const DataStore::Error &error)> &_defaultErrorHandler, const QString &_name, MDB_env *_env)
0821         : env(_env), transaction(nullptr), requestedRead(_requestRead), defaultErrorHandler(_defaultErrorHandler), name(_name), implicitCommit(false), error(false)
0822     {
0823     }
0824     ~Private()
0825     {
0826     }
0827 
0828     MDB_env *env;
0829     MDB_txn *transaction;
0830     bool requestedRead;
0831     std::function<void(const DataStore::Error &error)> defaultErrorHandler;
0832     QString name;
0833     bool implicitCommit;
0834     bool error;
0835     QMap<QString, MDB_dbi> createdDbs;
0836 
0837     bool startTransaction()
0838     {
0839         Q_ASSERT(!transaction);
0840         Q_ASSERT(sEnvironments.values().contains(env));
0841         Q_ASSERT(env);
0842         // auto f = [](const char *msg, void *ctx) -> int {
0843         //     qDebug() << msg;
0844         //     return 0;
0845         // };
0846         // mdb_reader_list(env, f, nullptr);
0847         // Trace_area("storage." + name.toLatin1()) << "Opening transaction " << requestedRead;
0848         const int rc = mdb_txn_begin(env, NULL, requestedRead ? MDB_RDONLY : 0, &transaction);
0849         // Trace_area("storage." + name.toLatin1()) << "Started transaction " << mdb_txn_id(transaction) << transaction;
0850         if (rc) {
0851             unsigned int flags;
0852             mdb_env_get_flags(env, &flags);
0853             if (flags & MDB_RDONLY && !requestedRead) {
0854                 SinkError() << "Tried to open a write transation in a read-only enironment";
0855             }
0856             defaultErrorHandler(Error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening transaction: " + QByteArray(mdb_strerror(rc))));
0857             return false;
0858         }
0859         return true;
0860     }
0861 };
0862 
0863 DataStore::Transaction::Transaction() : d(nullptr)
0864 {
0865 }
0866 
0867 DataStore::Transaction::Transaction(Transaction::Private *prv) : d(prv)
0868 {
0869     if (!d->startTransaction()) {
0870         delete d;
0871         d = nullptr;
0872     }
0873 }
0874 
0875 DataStore::Transaction::Transaction(Transaction &&other) : d(nullptr)
0876 {
0877     *this = std::move(other);
0878 }
0879 
0880 DataStore::Transaction &DataStore::Transaction::operator=(DataStore::Transaction &&other)
0881 {
0882     if (&other != this) {
0883         abort();
0884         delete d;
0885         d = other.d;
0886         other.d = nullptr;
0887     }
0888     return *this;
0889 }
0890 
0891 DataStore::Transaction::~Transaction()
0892 {
0893     if (d && d->transaction) {
0894         if (d->implicitCommit && !d->error) {
0895             commit();
0896         } else {
0897             // Trace_area("storage." + d->name.toLatin1()) << "Aborting transaction" << mdb_txn_id(d->transaction) << d->transaction;
0898             abort();
0899         }
0900     }
0901     delete d;
0902 }
0903 
0904 DataStore::Transaction::operator bool() const
0905 {
0906     return (d && d->transaction);
0907 }
0908 
0909 bool DataStore::Transaction::commit(const std::function<void(const DataStore::Error &error)> &errorHandler)
0910 {
0911     if (!d || !d->transaction) {
0912         return false;
0913     }
0914 
0915     // Trace_area("storage." + d->name.toLatin1()) << "Committing transaction" << mdb_txn_id(d->transaction) << d->transaction;
0916     Q_ASSERT(sEnvironments.values().contains(d->env));
0917     const int rc = mdb_txn_commit(d->transaction);
0918     if (rc) {
0919         abort();
0920         Error error(d->name.toLatin1(), ErrorCodes::TransactionError, "Error during transaction commit: " + QByteArray(mdb_strerror(rc)));
0921         errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
0922         //If transactions start failing we're in an unrecoverable situation (i.e. out of diskspace). So throw an exception that will terminate the application.
0923         throw std::runtime_error("Fatal error while committing transaction.");
0924     }
0925 
0926     //Add the created dbis to the shared environment
0927     if (!d->createdDbs.isEmpty()) {
0928         sDbisLock.lockForWrite();
0929         for (auto it = d->createdDbs.constBegin(); it != d->createdDbs.constEnd(); it++) {
0930             //This means we opened the dbi again in a read-only transaction while the write transaction was ongoing.
0931             Q_ASSERT(!sDbis.contains(it.key()));
0932             if (!sDbis.contains(it.key())) {
0933                 sDbis.insert(it.key(), it.value());
0934             }
0935         }
0936         d->createdDbs.clear();
0937         sDbisLock.unlock();
0938     }
0939 
0940     d->transaction = nullptr;
0941     return !rc;
0942 }
0943 
0944 void DataStore::Transaction::abort()
0945 {
0946     if (!d || !d->transaction) {
0947         return;
0948     }
0949 
0950     // Trace_area("storage." + d->name.toLatin1()) << "Aborting transaction" << mdb_txn_id(d->transaction) << d->transaction;
0951     Q_ASSERT(sEnvironments.values().contains(d->env));
0952     mdb_txn_abort(d->transaction);
0953     d->createdDbs.clear();
0954     d->transaction = nullptr;
0955 }
0956 
0957 DataStore::NamedDatabase DataStore::Transaction::openDatabase(const QByteArray &db,
0958     const std::function<void(const DataStore::Error &error)> &errorHandler, int flags) const
0959 {
0960     if (!d) {
0961         SinkError() << "Tried to open database on invalid transaction: " << db;
0962         return DataStore::NamedDatabase();
0963     }
0964     Q_ASSERT(d->transaction);
0965     // We don't now if anything changed
0966     d->implicitCommit = true;
0967     auto p = new DataStore::NamedDatabase::Private(
0968         db, flags, d->defaultErrorHandler, d->name, d->transaction);
0969     auto ret = p->openDatabase(d->requestedRead, errorHandler);
0970     if (!ret) {
0971         delete p;
0972         return DataStore::NamedDatabase();
0973     }
0974 
0975     if (p->createdNewDbi) {
0976         d->createdDbs.insert(p->createdNewDbiName, p->dbi);
0977     }
0978 
0979     auto database = DataStore::NamedDatabase(p);
0980     return database;
0981 }
0982 
0983 QList<QByteArray> DataStore::Transaction::getDatabaseNames() const
0984 {
0985     if (!d) {
0986         SinkWarning() << "Invalid transaction";
0987         return QList<QByteArray>();
0988     }
0989     return Sink::Storage::getDatabaseNames(d->transaction);
0990 
0991 }
0992 
0993 
0994 DataStore::Transaction::Stat DataStore::Transaction::stat(bool printDetails)
0995 {
0996     const int freeDbi = 0;
0997     const int mainDbi = 1;
0998 
0999     MDB_envinfo mei;
1000     mdb_env_info(d->env, &mei);
1001 
1002     MDB_stat mst;
1003     mdb_stat(d->transaction, freeDbi, &mst);
1004     auto freeStat = NamedDatabase::Stat{mst.ms_branch_pages,
1005             mst.ms_leaf_pages,
1006             mst.ms_overflow_pages,
1007             mst.ms_entries};
1008 
1009     mdb_stat(d->transaction, mainDbi, &mst);
1010     auto mainStat = NamedDatabase::Stat{mst.ms_branch_pages,
1011             mst.ms_leaf_pages,
1012             mst.ms_overflow_pages,
1013             mst.ms_entries};
1014 
1015     MDB_cursor *cursor;
1016     MDB_val key, data;
1017     size_t freePages = 0, *iptr;
1018 
1019     int rc = mdb_cursor_open(d->transaction, freeDbi, &cursor);
1020     if (rc) {
1021         fprintf(stderr, "mdb_cursor_open failed, error %d %s\n", rc, mdb_strerror(rc));
1022         return {};
1023     }
1024 
1025     while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) {
1026         iptr = static_cast<size_t*>(data.mv_data);
1027         freePages += *iptr;
1028         bool bad = false;
1029         size_t pg, prev;
1030         ssize_t i, j, span = 0;
1031         j = *iptr++;
1032         for (i = j, prev = 1; --i >= 0; ) {
1033             pg = iptr[i];
1034             if (pg <= prev) {
1035                 bad = true;
1036             }
1037             prev = pg;
1038             pg += span;
1039             for (; i >= span && iptr[i-span] == pg; span++, pg++) ;
1040         }
1041         if (printDetails) {
1042             std::cout << "    Transaction " << *(size_t *)key.mv_data << ", "<< j << " pages, maxspan " << span << (bad ? " [bad sequence]" : "") << std::endl;
1043             for (--j; j >= 0; ) {
1044                 pg = iptr[j];
1045                 for (span=1; --j >= 0 && iptr[j] == pg+span; span++);
1046                 if (span > 1) {
1047                     std::cout << "     " << pg << "[" << span << "]\n";
1048                 } else {
1049                     std::cout << "     " << pg << std::endl;
1050                 }
1051             }
1052         }
1053     }
1054     mdb_cursor_close(cursor);
1055     return {mei.me_last_pgno + 1, freePages, mst.ms_psize, mainStat, freeStat};
1056 }
1057 
1058 static size_t mapsize()
1059 {
1060     if (RUNNING_ON_VALGRIND) {
1061         // In order to run valgrind this size must be smaller than half your available RAM
1062         // https://github.com/BVLC/caffe/issues/2404
1063         return (size_t)1048576 * (size_t)1000; // 1MB * 1000
1064     }
1065 #ifdef Q_OS_WIN
1066     //Windows home 10 has a virtual address space limit of 128GB(https://msdn.microsoft.com/en-us/library/windows/desktop/aa366778(v=vs.85).aspx#physical_memory_limits_windows_10). I seems like the 128GB need to accomodate all databases we open in the process.
1067     return (size_t)1048576 * (size_t)200; // 1MB * 200
1068 #else
1069     //This is the maximum size of the db (but will not be used directly), so we make it large enough that we hopefully never run into the limit.
1070     return (size_t)1048576 * (size_t)100000; // 1MB * 100'000
1071 #endif
1072 }
1073 
1074 class DataStore::Private
1075 {
1076 public:
1077     Private(const QString &s, const QString &n, AccessMode m, const DbLayout &layout = {});
1078     ~Private();
1079 
1080     QString storageRoot;
1081     QString name;
1082 
1083     MDB_env *env = nullptr;
1084     AccessMode mode;
1085     Sink::Log::Context logCtx;
1086 
1087     void initEnvironment(const QString &fullPath, const DbLayout &layout)
1088     {
1089         // Ensure the environment is only created once, and that we only have one environment per process
1090         QReadLocker locker(&sEnvironmentsLock);
1091         if (!(env = sEnvironments.value(fullPath))) {
1092             locker.unlock();
1093             QWriteLocker envLocker(&sEnvironmentsLock);
1094             QWriteLocker dbiLocker(&sDbisLock);
1095             if (!(env = sEnvironments.value(fullPath))) {
1096                 int rc = 0;
1097                 if ((rc = mdb_env_create(&env))) {
1098                     SinkErrorCtx(logCtx) << "mdb_env_create: " << rc << " " << mdb_strerror(rc);
1099                     env = nullptr;
1100                     throw std::runtime_error("Fatal error while creating db.");
1101                 } else {
1102                     //Limit large enough to accomodate all our named dbs. This only starts to matter if the number gets large, otherwise it's just a bunch of extra entries in the main table.
1103                     mdb_env_set_maxdbs(env, 50);
1104                     if (const int rc = mdb_env_set_mapsize(env, mapsize())) {
1105                         SinkErrorCtx(logCtx) << "mdb_env_set_mapsize: " << rc << ":" << mdb_strerror(rc);
1106                         Q_ASSERT(false);
1107                         throw std::runtime_error("Fatal error while creating db.");
1108                     }
1109                     const bool readOnly = (mode == ReadOnly);
1110                     unsigned int flags = MDB_NOTLS;
1111                     if (readOnly) {
1112                         flags |= MDB_RDONLY;
1113                     }
1114                     if ((rc = mdb_env_open(env, fullPath.toStdString().data(), flags, 0664))) {
1115                         if (readOnly) {
1116                             SinkLogCtx(logCtx) << "Tried to open non-existing db: " << fullPath;
1117                         } else {
1118                             SinkErrorCtx(logCtx) << "mdb_env_open: " << rc << ":" << mdb_strerror(rc);
1119                             Q_ASSERT(false);
1120                             throw std::runtime_error("Fatal error while creating db.");
1121                         }
1122                         mdb_env_close(env);
1123                         env = 0;
1124                     } else {
1125                         Q_ASSERT(env);
1126                         sEnvironments.insert(fullPath, env);
1127                         //Open all available dbi's
1128                         MDB_txn *transaction;
1129                         if (const int rc = mdb_txn_begin(env, nullptr, readOnly ? MDB_RDONLY : 0, &transaction)) {
1130                             SinkWarning() << "Failed to to open transaction: " << QByteArray(mdb_strerror(rc)) << readOnly << transaction;
1131                             return;
1132                         }
1133                         if (!layout.tables.isEmpty()) {
1134 
1135                             //TODO upgrade db if the layout has changed:
1136                             //* read existing layout
1137                             //* if layout is not the same create new layout
1138 
1139                             //Create dbis from the given layout.
1140                             for (auto it = layout.tables.constBegin(); it != layout.tables.constEnd(); it++) {
1141                                 const int flags = it.value();
1142                                 MDB_dbi dbi = 0;
1143                                 const auto &db = it.key();
1144                                 const auto dbiName = name + db;
1145                                 if (createDbi(transaction, db, readOnly, flags, dbi)) {
1146                                     sDbis.insert(dbiName, dbi);
1147                                 }
1148                             }
1149                         } else {
1150                             //Open all available databases
1151                             for (const auto &db : getDatabaseNames(transaction)) {
1152                                 MDB_dbi dbi = 0;
1153                                 const auto dbiName = name + db;
1154                                 //We're going to load the flags anyways.
1155                                 const int flags = 0;
1156                                 if (createDbi(transaction, db, readOnly, flags, dbi)) {
1157                                     sDbis.insert(dbiName, dbi);
1158                                 }
1159                             }
1160                         }
1161                         //To persist the dbis (this is also necessary for read-only transactions)
1162                         mdb_txn_commit(transaction);
1163                     }
1164                 }
1165             }
1166         }
1167     }
1168 
1169 };
1170 
1171 DataStore::Private::Private(const QString &s, const QString &n, AccessMode m, const DbLayout &layout) : storageRoot(s), name(n), env(0), mode(m), logCtx(n.toLatin1())
1172 {
1173 
1174     const QString fullPath(storageRoot + '/' + name);
1175     QFileInfo dirInfo(fullPath);
1176     if (!dirInfo.exists() && mode == ReadWrite) {
1177         QDir().mkpath(fullPath);
1178         dirInfo.refresh();
1179     }
1180     if (mode == ReadWrite && !dirInfo.permission(QFile::WriteOwner)) {
1181         qCritical() << fullPath << "does not have write permissions. Aborting";
1182     } else if (dirInfo.exists()) {
1183         initEnvironment(fullPath, layout);
1184     }
1185 }
1186 
1187 DataStore::Private::~Private()
1188 {
1189     //We never close the environment (unless we remove the db), since we should only open the environment once per process (as per lmdb docs)
1190     //and create storage instance from all over the place. Thus, we're not closing it here on purpose.
1191 }
1192 
1193 DataStore::DataStore(const QString &storageRoot, const QString &name, AccessMode mode) : d(new Private(storageRoot, name, mode))
1194 {
1195 }
1196 
1197 DataStore::DataStore(const QString &storageRoot, const DbLayout &dbLayout, AccessMode mode) : d(new Private(storageRoot, dbLayout.name, mode, dbLayout))
1198 {
1199 }
1200 
1201 DataStore::~DataStore()
1202 {
1203     delete d;
1204 }
1205 
1206 bool DataStore::exists(const QString &storageRoot, const QString &name)
1207 {
1208     return QFileInfo(storageRoot + '/' + name + "/data.mdb").exists();
1209 }
1210 
1211 bool DataStore::exists() const
1212 {
1213     return (d->env != 0) && DataStore::exists(d->storageRoot, d->name);
1214 }
1215 
1216 DataStore::Transaction DataStore::createTransaction(AccessMode type, const std::function<void(const DataStore::Error &error)> &errorHandlerArg)
1217 {
1218     auto errorHandler = errorHandlerArg ? errorHandlerArg : defaultErrorHandler();
1219     if (!d->env) {
1220         errorHandler(Error(d->name.toLatin1(), ErrorCodes::GenericError, "Failed to create transaction: Missing database environment"));
1221         return Transaction();
1222     }
1223 
1224     bool requestedRead = type == ReadOnly;
1225 
1226     if (d->mode == ReadOnly && !requestedRead) {
1227         errorHandler(Error(d->name.toLatin1(), ErrorCodes::GenericError, "Failed to create transaction: Requested read/write transaction in read-only mode."));
1228         return Transaction();
1229     }
1230     QReadLocker locker(&sEnvironmentsLock);
1231     if (!sEnvironments.values().contains(d->env)) {
1232         return {};
1233     }
1234     return Transaction(new Transaction::Private(requestedRead, defaultErrorHandler(), d->name, d->env));
1235 }
1236 
1237 qint64 DataStore::diskUsage() const
1238 {
1239     QFileInfo info(d->storageRoot + '/' + d->name + "/data.mdb");
1240     if (!info.exists()) {
1241         SinkWarning() << "Tried to get filesize for non-existant file: " << info.path();
1242     }
1243     return info.size();
1244 }
1245 
1246 void DataStore::removeFromDisk() const
1247 {
1248     const QString fullPath(d->storageRoot + '/' + d->name);
1249     QWriteLocker dbiLocker(&sDbisLock);
1250     QWriteLocker envLocker(&sEnvironmentsLock);
1251     SinkTrace() << "Removing database from disk: " << fullPath;
1252     auto env = sEnvironments.take(fullPath);
1253     for (const auto &key : sDbis.keys()) {
1254         if (key.startsWith(d->name)) {
1255             sDbis.remove(key);
1256         }
1257     }
1258     mdb_env_close(env);
1259     QDir dir(fullPath);
1260     if (!dir.removeRecursively()) {
1261         Error error(d->name.toLatin1(), ErrorCodes::GenericError, QString("Failed to remove directory %1 %2").arg(d->storageRoot).arg(d->name).toLatin1());
1262         defaultErrorHandler()(error);
1263     }
1264 }
1265 
1266 void DataStore::clearEnv()
1267 {
1268     SinkTrace() << "Clearing environment";
1269     QWriteLocker locker(&sEnvironmentsLock);
1270     QWriteLocker dbiLocker(&sDbisLock);
1271     for (const auto &envName : sEnvironments.keys()) {
1272         auto env = sEnvironments.value(envName);
1273         mdb_env_sync(env, true);
1274         for (const auto &k : sDbis.keys()) {
1275             if (k.startsWith(envName)) {
1276                 auto dbi = sDbis.value(k);
1277                 mdb_dbi_close(env, dbi);
1278             }
1279         }
1280         mdb_env_close(env);
1281     }
1282     sDbis.clear();
1283     sEnvironments.clear();
1284 }
1285 
1286 }
1287 } // namespace Sink