File indexing completed on 2024-05-19 05:26:16
0001 /* 0002 * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm> 0003 * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org> 0004 * 0005 * This library is free software; you can redistribute it and/or 0006 * modify it under the terms of the GNU Lesser General Public 0007 * License as published by the Free Software Foundation; either 0008 * version 2.1 of the License, or (at your option) version 3, or any 0009 * later version accepted by the membership of KDE e.V. (or its 0010 * successor approved by the membership of KDE e.V.), which shall 0011 * act as a proxy defined in Section 6 of version 3 of the license. 0012 * 0013 * This library is distributed in the hope that it will be useful, 0014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 0015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 0016 * Lesser General Public License for more details. 0017 * 0018 * You should have received a copy of the GNU Lesser General Public 0019 * License along with this library. If not, see <http://www.gnu.org/licenses/>. 0020 */ 0021 0022 #include "storage.h" 0023 0024 #include <iostream> 0025 0026 #include <QDebug> 0027 #include <QDir> 0028 #include <QReadWriteLock> 0029 #include <QMutex> 0030 #include <QMutexLocker> 0031 #include <QString> 0032 #include <QTime> 0033 #include <valgrind.h> 0034 #include <lmdb.h> 0035 #include "log.h" 0036 0037 #ifdef Q_OS_WIN 0038 #include <BaseTsd.h> 0039 typedef SSIZE_T ssize_t; 0040 #endif 0041 0042 namespace Sink { 0043 namespace Storage { 0044 0045 static QReadWriteLock sDbisLock; 0046 static QReadWriteLock sEnvironmentsLock; 0047 static QMutex sCreateDbiLock; 0048 static QHash<QString, MDB_env *> sEnvironments; 0049 static QHash<QString, MDB_dbi> sDbis; 0050 0051 int AllowDuplicates = MDB_DUPSORT; 0052 int IntegerKeys = MDB_INTEGERKEY; 0053 int IntegerValues = MDB_INTEGERDUP; 0054 0055 int getErrorCode(int e) 0056 { 0057 switch (e) { 0058 case MDB_NOTFOUND: 0059 return DataStore::ErrorCodes::NotFound; 0060 default: 0061 break; 0062 } 0063 return -1; 0064 } 0065 0066 static QList<QByteArray> getDatabaseNames(MDB_txn *transaction) 0067 { 0068 if (!transaction) { 0069 SinkWarning() << "Invalid transaction"; 0070 return QList<QByteArray>(); 0071 } 0072 int rc; 0073 QList<QByteArray> list; 0074 MDB_dbi dbi; 0075 if ((rc = mdb_dbi_open(transaction, nullptr, 0, &dbi) == 0)) { 0076 MDB_val key; 0077 MDB_val data; 0078 MDB_cursor *cursor; 0079 0080 mdb_cursor_open(transaction, dbi, &cursor); 0081 if ((rc = mdb_cursor_get(cursor, &key, &data, MDB_FIRST)) == 0) { 0082 list << QByteArray::fromRawData((char *)key.mv_data, key.mv_size); 0083 while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) { 0084 list << QByteArray::fromRawData((char *)key.mv_data, key.mv_size); 0085 } 0086 } else { 0087 //Normal if we don't have any databases yet 0088 if (rc == MDB_NOTFOUND) { 0089 rc = 0; 0090 } 0091 if (rc) { 0092 SinkWarning() << "Failed to get a value" << rc; 0093 } 0094 } 0095 mdb_cursor_close(cursor); 0096 } else { 0097 SinkWarning() << "Failed to open db" << rc << QByteArray(mdb_strerror(rc)); 0098 } 0099 return list; 0100 0101 } 0102 0103 /* 0104 * To create a dbi we always need a write transaction, 0105 * and we always need to commit the transaction ASAP 0106 * We can only ever enter from one point per process. 0107 */ 0108 static bool createDbi(MDB_txn *transaction, const QByteArray &db, bool readOnly, int flags, MDB_dbi &dbi) 0109 { 0110 MDB_dbi flagtableDbi; 0111 if (const int rc = mdb_dbi_open(transaction, "__flagtable", readOnly ? 0 : MDB_CREATE, &flagtableDbi)) { 0112 if (!readOnly) { 0113 SinkWarning() << "Failed to to open flagdb: " << QByteArray(mdb_strerror(rc)); 0114 } 0115 } else { 0116 MDB_val key, value; 0117 key.mv_data = const_cast<void*>(static_cast<const void*>(db.constData())); 0118 key.mv_size = db.size(); 0119 if (const auto rc = mdb_get(transaction, flagtableDbi, &key, &value)) { 0120 //We expect this to fail for new databases 0121 if (rc != MDB_NOTFOUND) { 0122 SinkWarning() << "Failed to read flags from flag db: " << QByteArray(mdb_strerror(rc)); 0123 } 0124 } else { 0125 //Found the flags 0126 const auto ba = QByteArray::fromRawData((char *)value.mv_data, value.mv_size); 0127 flags = ba.toInt(); 0128 } 0129 } 0130 0131 if (flags & IntegerValues && !(flags & AllowDuplicates)) { 0132 SinkWarning() << "Opening a database with integer values, but not duplicate keys"; 0133 } 0134 0135 if (const int rc = mdb_dbi_open(transaction, db.constData(), flags, &dbi)) { 0136 //Create the db if it is not existing already 0137 if (rc == MDB_NOTFOUND && !readOnly) { 0138 //Sanity check db name 0139 { 0140 auto parts = db.split('.'); 0141 for (const auto &p : parts) { 0142 auto containsSpecialCharacter = [] (const QByteArray &p) { 0143 for (int i = 0; i < p.size(); i++) { 0144 const auto c = p.at(i); 0145 //Between 0 and z in the ascii table. Essentially ensures that the name is printable and doesn't contain special chars 0146 if (c < 0x30 || c > 0x7A) { 0147 return true; 0148 } 0149 } 0150 return false; 0151 }; 0152 if (p.isEmpty() || containsSpecialCharacter(p)) { 0153 SinkError() << "Tried to create a db with an invalid name. Hex:" << db.toHex() << " ASCII:" << db; 0154 Q_ASSERT(false); 0155 throw std::runtime_error("Fatal error while creating db."); 0156 } 0157 } 0158 } 0159 if (const int rc = mdb_dbi_open(transaction, db.constData(), flags | MDB_CREATE, &dbi)) { 0160 SinkWarning() << "Failed to create db " << QByteArray(mdb_strerror(rc)); 0161 return false; 0162 } 0163 //Record the db flags 0164 MDB_val key, value; 0165 key.mv_data = const_cast<void*>(static_cast<const void*>(db.constData())); 0166 key.mv_size = db.size(); 0167 //Store the flags without the create option 0168 const auto ba = QByteArray::number(flags); 0169 value.mv_data = const_cast<void*>(static_cast<const void*>(ba.constData())); 0170 value.mv_size = ba.size(); 0171 if (const int rc = mdb_put(transaction, flagtableDbi, &key, &value, MDB_NOOVERWRITE)) { 0172 //We expect this to fail if we're only creating the dbi but not the db 0173 if (rc != MDB_KEYEXIST) { 0174 SinkWarning() << "Failed to write flags to flag db: " << QByteArray(mdb_strerror(rc)); 0175 } 0176 } 0177 } else { 0178 //It's not an error if we only want to read 0179 if (!readOnly) { 0180 SinkWarning() << "Failed to open db " << db << "error:" << QByteArray(mdb_strerror(rc)); 0181 return true; 0182 } 0183 return false; 0184 } 0185 } 0186 return true; 0187 } 0188 0189 class DataStore::NamedDatabase::Private 0190 { 0191 public: 0192 Private(const QByteArray &_db, int _flags, 0193 const std::function<void(const DataStore::Error &error)> &_defaultErrorHandler, 0194 const QString &_name, MDB_txn *_txn) 0195 : db(_db), 0196 transaction(_txn), 0197 flags(_flags), 0198 defaultErrorHandler(_defaultErrorHandler), 0199 name(_name) 0200 { 0201 } 0202 0203 ~Private() = default; 0204 0205 QByteArray db; 0206 MDB_txn *transaction; 0207 MDB_dbi dbi{0}; 0208 int flags; 0209 std::function<void(const DataStore::Error &error)> defaultErrorHandler; 0210 QString name; 0211 bool createdNewDbi = false; 0212 QString createdNewDbiName; 0213 0214 bool dbiValidForTransaction(MDB_dbi dbi, MDB_txn *transaction) 0215 { 0216 //sDbis can contain dbi's that are not available to this transaction. 0217 //We use mdb_dbi_flags to check if the dbi is valid for this transaction. 0218 uint f; 0219 if (mdb_dbi_flags(transaction, dbi, &f) == EINVAL) { 0220 return false; 0221 } 0222 return true; 0223 } 0224 0225 bool openDatabase(bool readOnly, std::function<void(const DataStore::Error &error)> errorHandler) 0226 { 0227 const auto dbiName = name + db; 0228 //Never access sDbis while anything is writing to it. 0229 QReadLocker dbiLocker{&sDbisLock}; 0230 if (sDbis.contains(dbiName)) { 0231 dbi = sDbis.value(dbiName); 0232 //sDbis can potentially contain a dbi that is not valid for this transaction, if this transaction was created before the dbi was created. 0233 if (dbiValidForTransaction(dbi, transaction)) { 0234 return true; 0235 } else { 0236 SinkTrace() << "Found dbi that is not available for the current transaction."; 0237 if (readOnly) { 0238 //Recovery for read-only transactions. Abort and renew. 0239 mdb_txn_reset(transaction); 0240 mdb_txn_renew(transaction); 0241 Q_ASSERT(dbiValidForTransaction(dbi, transaction)); 0242 return true; 0243 } 0244 //There is no recover path for non-read-only transactions. 0245 } 0246 //Nothing in the code deals well with non-existing databases. 0247 Q_ASSERT(false); 0248 return false; 0249 } 0250 0251 0252 /* 0253 * Dynamic creation of databases. 0254 * If all databases were defined via the database layout we wouldn't ever end up in here. 0255 * However, we rely on this codepath for indexes, synchronization databases and in race-conditions 0256 * where the database is not yet fully created when the client initializes it for reading. 0257 * 0258 * There are a few things to consider: 0259 * * dbi's (DataBase Identifier) should be opened once (ideally), and then be persisted in the environment. 0260 * * To open a dbi we need a transaction and must commit the transaction. From then on any open transaction will have access to the dbi. 0261 * * Already running transactions will not have access to the dbi. 0262 * * There *must* only ever be one active transaction opening dbi's (using mdb_dbi_open), and that transaction *must* 0263 * commit or abort before any other transaction opens a dbi. 0264 * 0265 * We solve this the following way: 0266 * * For read-only transactions we abort the transaction, open the dbi and persist it in the environment, and reopen the transaction (so the dbi is available). This may result in the db content changing unexpectedly and referenced memory becoming unavailable, but isn't a problem as long as we don't rely on memory remaining valid for the duration of the transaction (which is anyways not given since any operation would invalidate the memory region).. 0267 * * For write transactions we open the dbi for future use, and then open it as well in the current transaction. 0268 * * Write transactions that open the named database multiple times will call this codepath multiple times, 0269 * this is ok though because the same dbi will be returned by mdb_dbi_open (We could also start to do a lookup in 0270 * Transaction::Private::createdDbs first). 0271 */ 0272 SinkTrace() << "Creating database dynamically: " << dbiName << readOnly; 0273 //Only one transaction may ever create dbis at a time. 0274 while (!sCreateDbiLock.tryLock(10)) { 0275 //Allow another thread that has already acquired sCreateDbiLock to continue below. 0276 //Otherwise we risk a dead-lock if another thread already acquired sCreateDbiLock, but then lost the sDbisLock while upgrading it to a 0277 //write lock below 0278 dbiLocker.unlock(); 0279 dbiLocker.relock(); 0280 } 0281 //Double checked locking 0282 if (sDbis.contains(dbiName)) { 0283 dbi = sDbis.value(dbiName); 0284 //sDbis can potentially contain a dbi that is not valid for this transaction, if this transaction was created before the dbi was created. 0285 sCreateDbiLock.unlock(); 0286 if (dbiValidForTransaction(dbi, transaction)) { 0287 return true; 0288 } else { 0289 SinkTrace() << "Found dbi that is not available for the current transaction."; 0290 if (readOnly) { 0291 //Recovery for read-only transactions. Abort and renew. 0292 mdb_txn_reset(transaction); 0293 mdb_txn_renew(transaction); 0294 Q_ASSERT(dbiValidForTransaction(dbi, transaction)); 0295 return true; 0296 } 0297 //There is no recover path for non-read-only transactions. 0298 Q_ASSERT(false); 0299 return false; 0300 } 0301 } 0302 0303 //Ensure nobody reads sDbis either 0304 dbiLocker.unlock(); 0305 //We risk loosing the lock in here. That's why we tryLock above in the while loop 0306 QWriteLocker dbiWriteLocker(&sDbisLock); 0307 0308 //Create a transaction to open the dbi 0309 MDB_txn *dbiTransaction; 0310 if (readOnly) { 0311 MDB_env *env = mdb_txn_env(transaction); 0312 Q_ASSERT(env); 0313 mdb_txn_reset(transaction); 0314 if (const int rc = mdb_txn_begin(env, nullptr, MDB_RDONLY, &dbiTransaction)) { 0315 SinkError() << "Failed to open transaction: " << QByteArray(mdb_strerror(rc)) << readOnly << transaction; 0316 sCreateDbiLock.unlock(); 0317 return false; 0318 } 0319 } else { 0320 dbiTransaction = transaction; 0321 } 0322 if (createDbi(dbiTransaction, db, readOnly, flags, dbi)) { 0323 if (readOnly) { 0324 mdb_txn_commit(dbiTransaction); 0325 Q_ASSERT(!sDbis.contains(dbiName)); 0326 sDbis.insert(dbiName, dbi); 0327 //We reopen the read-only transaction so the dbi becomes available in it. 0328 mdb_txn_renew(transaction); 0329 } else { 0330 createdNewDbi = true; 0331 createdNewDbiName = dbiName; 0332 } 0333 //Ensure the dbi is valid for the parent transaction 0334 Q_ASSERT(dbiValidForTransaction(dbi, transaction)); 0335 } else { 0336 if (readOnly) { 0337 mdb_txn_abort(dbiTransaction); 0338 mdb_txn_renew(transaction); 0339 } else { 0340 SinkWarning() << "Failed to create the dbi: " << dbiName; 0341 } 0342 dbi = 0; 0343 transaction = 0; 0344 sCreateDbiLock.unlock(); 0345 return false; 0346 } 0347 sCreateDbiLock.unlock(); 0348 return true; 0349 } 0350 }; 0351 0352 DataStore::NamedDatabase::NamedDatabase() : d(nullptr) 0353 { 0354 } 0355 0356 DataStore::NamedDatabase::NamedDatabase(NamedDatabase::Private *prv) : d(prv) 0357 { 0358 } 0359 0360 DataStore::NamedDatabase::NamedDatabase(NamedDatabase &&other) : d(nullptr) 0361 { 0362 *this = std::move(other); 0363 } 0364 0365 DataStore::NamedDatabase &DataStore::NamedDatabase::operator=(DataStore::NamedDatabase &&other) 0366 { 0367 if (&other != this) { 0368 delete d; 0369 d = other.d; 0370 other.d = nullptr; 0371 } 0372 return *this; 0373 } 0374 0375 DataStore::NamedDatabase::~NamedDatabase() 0376 { 0377 delete d; 0378 } 0379 0380 bool DataStore::NamedDatabase::write(const size_t key, const QByteArray &value, 0381 const std::function<void(const DataStore::Error &error)> &errorHandler) 0382 { 0383 return write(sizeTToByteArray(key), value, errorHandler); 0384 } 0385 0386 bool DataStore::NamedDatabase::write(const QByteArray &sKey, const QByteArray &sValue, const std::function<void(const DataStore::Error &error)> &errorHandler) 0387 { 0388 if (!d || !d->transaction) { 0389 Error error("", ErrorCodes::GenericError, "Not open"); 0390 if (d) { 0391 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); 0392 } 0393 return false; 0394 } 0395 const void *keyPtr = sKey.data(); 0396 const size_t keySize = sKey.size(); 0397 const void *valuePtr = sValue.data(); 0398 const size_t valueSize = sValue.size(); 0399 0400 if (!keyPtr || keySize == 0) { 0401 Error error(d->name.toLatin1() + d->db, ErrorCodes::GenericError, "Tried to write empty key."); 0402 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); 0403 return false; 0404 } 0405 0406 int rc; 0407 MDB_val key, data; 0408 key.mv_size = keySize; 0409 key.mv_data = const_cast<void *>(keyPtr); 0410 data.mv_size = valueSize; 0411 data.mv_data = const_cast<void *>(valuePtr); 0412 rc = mdb_put(d->transaction, d->dbi, &key, &data, 0); 0413 0414 if (rc) { 0415 Error error(d->name.toLatin1() + d->db, ErrorCodes::GenericError, "mdb_put: " + QByteArray(mdb_strerror(rc)) + " Key: " + sKey + " Value: " + sValue); 0416 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); 0417 } 0418 0419 return !rc; 0420 } 0421 0422 void DataStore::NamedDatabase::remove( 0423 const size_t key, const std::function<void(const DataStore::Error &error)> &errorHandler) 0424 { 0425 return remove(sizeTToByteArray(key), errorHandler); 0426 } 0427 0428 void DataStore::NamedDatabase::remove(const QByteArray &k, const std::function<void(const DataStore::Error &error)> &errorHandler) 0429 { 0430 remove(k, QByteArray(), errorHandler); 0431 } 0432 0433 void DataStore::NamedDatabase::remove(const size_t key, const QByteArray &value, 0434 const std::function<void(const DataStore::Error &error)> &errorHandler) 0435 { 0436 return remove(sizeTToByteArray(key), value, errorHandler); 0437 } 0438 0439 void DataStore::NamedDatabase::remove(const QByteArray &k, const QByteArray &value, const std::function<void(const DataStore::Error &error)> &errorHandler) 0440 { 0441 if (!d || !d->transaction) { 0442 if (d) { 0443 Error error(d->name.toLatin1() + d->db, ErrorCodes::GenericError, "Not open"); 0444 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); 0445 } 0446 return; 0447 } 0448 0449 int rc; 0450 MDB_val key; 0451 key.mv_size = k.size(); 0452 key.mv_data = const_cast<void *>(static_cast<const void *>(k.data())); 0453 if (value.isEmpty()) { 0454 rc = mdb_del(d->transaction, d->dbi, &key, 0); 0455 } else { 0456 MDB_val data; 0457 data.mv_size = value.size(); 0458 data.mv_data = const_cast<void *>(static_cast<const void *>(value.data())); 0459 rc = mdb_del(d->transaction, d->dbi, &key, &data); 0460 } 0461 0462 if (rc) { 0463 auto errorCode = ErrorCodes::GenericError; 0464 if (rc == MDB_NOTFOUND) { 0465 errorCode = ErrorCodes::NotFound; 0466 } 0467 Error error(d->name.toLatin1() + d->db, errorCode, QString("Error on mdb_del: %1 %2").arg(rc).arg(mdb_strerror(rc)).toLatin1()); 0468 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); 0469 } 0470 } 0471 0472 int DataStore::NamedDatabase::scan(const size_t key, 0473 const std::function<bool(size_t key, const QByteArray &value)> &resultHandler, 0474 const std::function<void(const DataStore::Error &error)> &errorHandler) const 0475 { 0476 return scan(sizeTToByteArray(key), 0477 [&resultHandler](const QByteArray &key, const QByteArray &value) { 0478 return resultHandler(byteArrayToSizeT(key), value); 0479 }, 0480 errorHandler, /* findSubstringKeys = */ false); 0481 } 0482 0483 int DataStore::NamedDatabase::scan(const QByteArray &k, const std::function<bool(const QByteArray &key, const QByteArray &value)> &resultHandler, 0484 const std::function<void(const DataStore::Error &error)> &errorHandler, bool findSubstringKeys) const 0485 { 0486 if (!d || !d->transaction) { 0487 // Not an error. We rely on this to read nothing from non-existing databases. 0488 return 0; 0489 } 0490 0491 int rc; 0492 MDB_val key; 0493 MDB_val data; 0494 MDB_cursor *cursor; 0495 0496 key.mv_data = (void *)k.constData(); 0497 key.mv_size = k.size(); 0498 0499 rc = mdb_cursor_open(d->transaction, d->dbi, &cursor); 0500 if (rc) { 0501 //Invalid arguments can mean that the transaction doesn't contain the db dbi 0502 Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during mdb_cursor_open: ") + QByteArray(mdb_strerror(rc)) + ". Key: " + k); 0503 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); 0504 return 0; 0505 } 0506 0507 int numberOfRetrievedValues = 0; 0508 0509 const bool allowDuplicates = d->flags & AllowDuplicates; 0510 const bool emptyKey = k.isEmpty(); 0511 0512 if (emptyKey || allowDuplicates || findSubstringKeys) { 0513 const MDB_cursor_op op = [&] { 0514 if (emptyKey) { 0515 return MDB_FIRST; 0516 } 0517 if (findSubstringKeys) { 0518 return MDB_SET_RANGE; 0519 } 0520 return MDB_SET; 0521 }(); 0522 const MDB_cursor_op nextOp = (allowDuplicates && !findSubstringKeys && !emptyKey) ? MDB_NEXT_DUP : MDB_NEXT; 0523 0524 if ((rc = mdb_cursor_get(cursor, &key, &data, op)) == 0) { 0525 const auto current = QByteArray::fromRawData((char *)key.mv_data, key.mv_size); 0526 // The first lookup will find a key that is equal or greather than our key 0527 if (current.startsWith(k)) { 0528 numberOfRetrievedValues++; 0529 if (resultHandler(current, QByteArray::fromRawData((char *)data.mv_data, data.mv_size))) { 0530 if (findSubstringKeys) { 0531 // Reset the key to what we search for 0532 key.mv_data = (void *)k.constData(); 0533 key.mv_size = k.size(); 0534 } 0535 while ((rc = mdb_cursor_get(cursor, &key, &data, nextOp)) == 0) { 0536 const auto current = QByteArray::fromRawData((char *)key.mv_data, key.mv_size); 0537 // Every consequitive lookup simply iterates through the list 0538 if (current.startsWith(k)) { 0539 numberOfRetrievedValues++; 0540 if (!resultHandler(current, QByteArray::fromRawData((char *)data.mv_data, data.mv_size))) { 0541 break; 0542 } 0543 } 0544 } 0545 } 0546 } 0547 } 0548 0549 // We never find the last value 0550 if (rc == MDB_NOTFOUND) { 0551 rc = 0; 0552 } 0553 } else { 0554 if ((rc = mdb_cursor_get(cursor, &key, &data, MDB_SET)) == 0) { 0555 numberOfRetrievedValues++; 0556 resultHandler(QByteArray::fromRawData((char *)key.mv_data, key.mv_size), QByteArray::fromRawData((char *)data.mv_data, data.mv_size)); 0557 } 0558 } 0559 0560 mdb_cursor_close(cursor); 0561 0562 if (rc) { 0563 Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during scan. Key: ") + k + " : " + QByteArray(mdb_strerror(rc))); 0564 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); 0565 } 0566 0567 return numberOfRetrievedValues; 0568 } 0569 0570 0571 void DataStore::NamedDatabase::findLatest(size_t key, 0572 const std::function<void(size_t key, const QByteArray &value)> &resultHandler, 0573 const std::function<void(const DataStore::Error &error)> &errorHandler) const 0574 { 0575 return findLatest(sizeTToByteArray(key), 0576 [&resultHandler](const QByteArray &key, const QByteArray &value) { 0577 resultHandler(byteArrayToSizeT(value), value); 0578 }, 0579 errorHandler); 0580 } 0581 0582 void DataStore::NamedDatabase::findLatest(const QByteArray &k, const std::function<void(const QByteArray &key, const QByteArray &value)> &resultHandler, 0583 const std::function<void(const DataStore::Error &error)> &errorHandler) const 0584 { 0585 if (!d || !d->transaction) { 0586 // Not an error. We rely on this to read nothing from non-existing databases. 0587 return; 0588 } 0589 if (k.isEmpty()) { 0590 Error error(d->name.toLatin1() + d->db, GenericError, QByteArray("Can't use findLatest with empty key.")); 0591 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); 0592 return; 0593 } 0594 0595 int rc; 0596 MDB_val key; 0597 MDB_val data; 0598 MDB_cursor *cursor; 0599 0600 key.mv_data = (void *)k.constData(); 0601 key.mv_size = k.size(); 0602 0603 rc = mdb_cursor_open(d->transaction, d->dbi, &cursor); 0604 if (rc) { 0605 Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during mdb_cursor_open: ") + QByteArray(mdb_strerror(rc))); 0606 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); 0607 return; 0608 } 0609 0610 bool foundValue = false; 0611 MDB_cursor_op op = MDB_SET_RANGE; 0612 if ((rc = mdb_cursor_get(cursor, &key, &data, op)) == 0) { 0613 // The first lookup will find a key that is equal or greather than our key 0614 if (QByteArray::fromRawData((char *)key.mv_data, key.mv_size).startsWith(k)) { 0615 //Read next value until we no longer match 0616 while (QByteArray::fromRawData((char *)key.mv_data, key.mv_size).startsWith(k)) { 0617 MDB_cursor_op nextOp = MDB_NEXT; 0618 rc = mdb_cursor_get(cursor, &key, &data, nextOp); 0619 if (rc) { 0620 break; 0621 } 0622 } 0623 //Now read the previous value, and that's the latest one 0624 MDB_cursor_op prefOp = MDB_PREV; 0625 // We read past the end above, just take the last value 0626 if (rc == MDB_NOTFOUND) { 0627 prefOp = MDB_LAST; 0628 } 0629 rc = mdb_cursor_get(cursor, &key, &data, prefOp); 0630 if (!rc) { 0631 foundValue = true; 0632 resultHandler(QByteArray::fromRawData((char *)key.mv_data, key.mv_size), QByteArray::fromRawData((char *)data.mv_data, data.mv_size)); 0633 } 0634 } 0635 } 0636 0637 // We never find the last value 0638 if (rc == MDB_NOTFOUND) { 0639 rc = 0; 0640 } 0641 0642 mdb_cursor_close(cursor); 0643 0644 if (rc) { 0645 Error error(d->name.toLatin1(), getErrorCode(rc), QByteArray("Error during find latest. Key: ") + k + " : " + QByteArray(mdb_strerror(rc))); 0646 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); 0647 } else if (!foundValue) { 0648 Error error(d->name.toLatin1(), 1, QByteArray("Error during find latest. Key: ") + k + " : No value found"); 0649 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); 0650 } 0651 0652 return; 0653 } 0654 0655 void DataStore::NamedDatabase::findLast(const QByteArray &k, const std::function<void(const QByteArray &key, const QByteArray &value)> &resultHandler, 0656 const std::function<void(const DataStore::Error &error)> &errorHandler) const 0657 { 0658 if (!d || !d->transaction) { 0659 // Not an error. We rely on this to read nothing from non-existing databases. 0660 return; 0661 } 0662 if (k.isEmpty()) { 0663 Error error(d->name.toLatin1() + d->db, GenericError, QByteArray("Can't use findLatest with empty key.")); 0664 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); 0665 return; 0666 } 0667 0668 int rc; 0669 MDB_val key; 0670 MDB_val data; 0671 MDB_cursor *cursor; 0672 0673 key.mv_data = (void *)k.constData(); 0674 key.mv_size = k.size(); 0675 0676 rc = mdb_cursor_open(d->transaction, d->dbi, &cursor); 0677 if (rc) { 0678 Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during mdb_cursor_open: ") + QByteArray(mdb_strerror(rc))); 0679 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); 0680 return; 0681 } 0682 0683 bool foundValue = false; 0684 if ((rc = mdb_cursor_get(cursor, &key, &data, MDB_SET)) == 0) { 0685 if ((rc = mdb_cursor_get(cursor, &key, &data, MDB_LAST_DUP)) == 0) { 0686 foundValue = true; 0687 resultHandler(QByteArray::fromRawData((char *)key.mv_data, key.mv_size), QByteArray::fromRawData((char *)data.mv_data, data.mv_size)); 0688 } 0689 } 0690 0691 mdb_cursor_close(cursor); 0692 0693 if (rc) { 0694 Error error(d->name.toLatin1(), getErrorCode(rc), QByteArray("Error during find latest. Key: ") + k + " : " + QByteArray(mdb_strerror(rc))); 0695 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); 0696 } else if (!foundValue) { 0697 Error error(d->name.toLatin1(), 1, QByteArray("Error during find latest. Key: ") + k + " : No value found"); 0698 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); 0699 } 0700 0701 return; 0702 } 0703 0704 int DataStore::NamedDatabase::findAllInRange(const size_t lowerBound, const size_t upperBound, 0705 const std::function<void(size_t key, const QByteArray &value)> &resultHandler, 0706 const std::function<void(const DataStore::Error &error)> &errorHandler) const 0707 { 0708 return findAllInRange(sizeTToByteArray(lowerBound), sizeTToByteArray(upperBound), 0709 [&resultHandler](const QByteArray &key, const QByteArray &value) { 0710 resultHandler(byteArrayToSizeT(value), value); 0711 }, 0712 errorHandler); 0713 } 0714 0715 int DataStore::NamedDatabase::findAllInRange(const QByteArray &lowerBound, const QByteArray &upperBound, 0716 const std::function<void(const QByteArray &key, const QByteArray &value)> &resultHandler, 0717 const std::function<void(const DataStore::Error &error)> &errorHandler) const 0718 { 0719 if (!d || !d->transaction) { 0720 // Not an error. We rely on this to read nothing from non-existing databases. 0721 return 0; 0722 } 0723 0724 MDB_cursor *cursor; 0725 if (int rc = mdb_cursor_open(d->transaction, d->dbi, &cursor)) { 0726 // Invalid arguments can mean that the transaction doesn't contain the db dbi 0727 Error error(d->name.toLatin1() + d->db, getErrorCode(rc), 0728 QByteArray("Error during mdb_cursor_open: ") + QByteArray(mdb_strerror(rc)) + 0729 ". Lower bound: " + lowerBound + " Upper bound: " + upperBound); 0730 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); 0731 return 0; 0732 } 0733 0734 MDB_val firstKey = {(size_t)lowerBound.size(), (void *)lowerBound.constData()}; 0735 MDB_val idealLastKey = {(size_t)upperBound.size(), (void *)upperBound.constData()}; 0736 MDB_val currentKey; 0737 MDB_val data; 0738 0739 // Find the first key in the range 0740 int rc = mdb_cursor_get(cursor, &firstKey, &data, MDB_SET_RANGE); 0741 0742 if (rc != MDB_SUCCESS) { 0743 // Nothing is greater or equal than the lower bound, meaning no result 0744 mdb_cursor_close(cursor); 0745 return 0; 0746 } 0747 0748 currentKey = firstKey; 0749 0750 // If already bigger than the upper bound 0751 if (mdb_cmp(d->transaction, d->dbi, ¤tKey, &idealLastKey) > 0) { 0752 mdb_cursor_close(cursor); 0753 return 0; 0754 } 0755 0756 int count = 0; 0757 do { 0758 const auto currentBAKey = QByteArray::fromRawData((char *)currentKey.mv_data, currentKey.mv_size); 0759 const auto currentBAValue = QByteArray::fromRawData((char *)data.mv_data, data.mv_size); 0760 resultHandler(currentBAKey, currentBAValue); 0761 count++; 0762 } while (mdb_cursor_get(cursor, ¤tKey, &data, MDB_NEXT) == MDB_SUCCESS && 0763 mdb_cmp(d->transaction, d->dbi, ¤tKey, &idealLastKey) <= 0); 0764 0765 mdb_cursor_close(cursor); 0766 return count; 0767 } 0768 0769 qint64 DataStore::NamedDatabase::getSize() 0770 { 0771 if (!d || !d->transaction) { 0772 return -1; 0773 } 0774 0775 int rc; 0776 MDB_stat stat; 0777 rc = mdb_stat(d->transaction, d->dbi, &stat); 0778 if (rc) { 0779 SinkWarning() << "Something went wrong " << QByteArray(mdb_strerror(rc)); 0780 } 0781 return stat.ms_psize * (stat.ms_leaf_pages + stat.ms_branch_pages + stat.ms_overflow_pages); 0782 } 0783 0784 DataStore::NamedDatabase::Stat DataStore::NamedDatabase::stat() 0785 { 0786 if (!d || !d->transaction) { 0787 return {}; 0788 } 0789 0790 int rc; 0791 MDB_stat stat; 0792 rc = mdb_stat(d->transaction, d->dbi, &stat); 0793 if (rc) { 0794 SinkWarning() << "Something went wrong " << QByteArray(mdb_strerror(rc)); 0795 return {}; 0796 } 0797 return {stat.ms_branch_pages, 0798 stat.ms_leaf_pages, 0799 stat.ms_overflow_pages, 0800 stat.ms_entries}; 0801 // std::cout << "page size: " << stat.ms_psize << std::endl; 0802 // std::cout << "leaf_pages: " << stat.ms_leaf_pages << std::endl; 0803 // std::cout << "branch_pages: " << stat.ms_branch_pages << std::endl; 0804 // std::cout << "overflow_pages: " << stat.ms_overflow_pages << std::endl; 0805 // std::cout << "depth: " << stat.ms_depth << std::endl; 0806 // std::cout << "entries: " << stat.ms_entries << std::endl; 0807 } 0808 0809 bool DataStore::NamedDatabase::allowsDuplicates() const 0810 { 0811 unsigned int flags; 0812 mdb_dbi_flags(d->transaction, d->dbi, &flags); 0813 return flags & MDB_DUPSORT; 0814 } 0815 0816 0817 class DataStore::Transaction::Private 0818 { 0819 public: 0820 Private(bool _requestRead, const std::function<void(const DataStore::Error &error)> &_defaultErrorHandler, const QString &_name, MDB_env *_env) 0821 : env(_env), transaction(nullptr), requestedRead(_requestRead), defaultErrorHandler(_defaultErrorHandler), name(_name), implicitCommit(false), error(false) 0822 { 0823 } 0824 ~Private() 0825 { 0826 } 0827 0828 MDB_env *env; 0829 MDB_txn *transaction; 0830 bool requestedRead; 0831 std::function<void(const DataStore::Error &error)> defaultErrorHandler; 0832 QString name; 0833 bool implicitCommit; 0834 bool error; 0835 QMap<QString, MDB_dbi> createdDbs; 0836 0837 bool startTransaction() 0838 { 0839 Q_ASSERT(!transaction); 0840 Q_ASSERT(sEnvironments.values().contains(env)); 0841 Q_ASSERT(env); 0842 // auto f = [](const char *msg, void *ctx) -> int { 0843 // qDebug() << msg; 0844 // return 0; 0845 // }; 0846 // mdb_reader_list(env, f, nullptr); 0847 // Trace_area("storage." + name.toLatin1()) << "Opening transaction " << requestedRead; 0848 const int rc = mdb_txn_begin(env, NULL, requestedRead ? MDB_RDONLY : 0, &transaction); 0849 // Trace_area("storage." + name.toLatin1()) << "Started transaction " << mdb_txn_id(transaction) << transaction; 0850 if (rc) { 0851 unsigned int flags; 0852 mdb_env_get_flags(env, &flags); 0853 if (flags & MDB_RDONLY && !requestedRead) { 0854 SinkError() << "Tried to open a write transation in a read-only enironment"; 0855 } 0856 defaultErrorHandler(Error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening transaction: " + QByteArray(mdb_strerror(rc)))); 0857 return false; 0858 } 0859 return true; 0860 } 0861 }; 0862 0863 DataStore::Transaction::Transaction() : d(nullptr) 0864 { 0865 } 0866 0867 DataStore::Transaction::Transaction(Transaction::Private *prv) : d(prv) 0868 { 0869 if (!d->startTransaction()) { 0870 delete d; 0871 d = nullptr; 0872 } 0873 } 0874 0875 DataStore::Transaction::Transaction(Transaction &&other) : d(nullptr) 0876 { 0877 *this = std::move(other); 0878 } 0879 0880 DataStore::Transaction &DataStore::Transaction::operator=(DataStore::Transaction &&other) 0881 { 0882 if (&other != this) { 0883 abort(); 0884 delete d; 0885 d = other.d; 0886 other.d = nullptr; 0887 } 0888 return *this; 0889 } 0890 0891 DataStore::Transaction::~Transaction() 0892 { 0893 if (d && d->transaction) { 0894 if (d->implicitCommit && !d->error) { 0895 commit(); 0896 } else { 0897 // Trace_area("storage." + d->name.toLatin1()) << "Aborting transaction" << mdb_txn_id(d->transaction) << d->transaction; 0898 abort(); 0899 } 0900 } 0901 delete d; 0902 } 0903 0904 DataStore::Transaction::operator bool() const 0905 { 0906 return (d && d->transaction); 0907 } 0908 0909 bool DataStore::Transaction::commit(const std::function<void(const DataStore::Error &error)> &errorHandler) 0910 { 0911 if (!d || !d->transaction) { 0912 return false; 0913 } 0914 0915 // Trace_area("storage." + d->name.toLatin1()) << "Committing transaction" << mdb_txn_id(d->transaction) << d->transaction; 0916 Q_ASSERT(sEnvironments.values().contains(d->env)); 0917 const int rc = mdb_txn_commit(d->transaction); 0918 if (rc) { 0919 abort(); 0920 Error error(d->name.toLatin1(), ErrorCodes::TransactionError, "Error during transaction commit: " + QByteArray(mdb_strerror(rc))); 0921 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); 0922 //If transactions start failing we're in an unrecoverable situation (i.e. out of diskspace). So throw an exception that will terminate the application. 0923 throw std::runtime_error("Fatal error while committing transaction."); 0924 } 0925 0926 //Add the created dbis to the shared environment 0927 if (!d->createdDbs.isEmpty()) { 0928 sDbisLock.lockForWrite(); 0929 for (auto it = d->createdDbs.constBegin(); it != d->createdDbs.constEnd(); it++) { 0930 //This means we opened the dbi again in a read-only transaction while the write transaction was ongoing. 0931 Q_ASSERT(!sDbis.contains(it.key())); 0932 if (!sDbis.contains(it.key())) { 0933 sDbis.insert(it.key(), it.value()); 0934 } 0935 } 0936 d->createdDbs.clear(); 0937 sDbisLock.unlock(); 0938 } 0939 0940 d->transaction = nullptr; 0941 return !rc; 0942 } 0943 0944 void DataStore::Transaction::abort() 0945 { 0946 if (!d || !d->transaction) { 0947 return; 0948 } 0949 0950 // Trace_area("storage." + d->name.toLatin1()) << "Aborting transaction" << mdb_txn_id(d->transaction) << d->transaction; 0951 Q_ASSERT(sEnvironments.values().contains(d->env)); 0952 mdb_txn_abort(d->transaction); 0953 d->createdDbs.clear(); 0954 d->transaction = nullptr; 0955 } 0956 0957 DataStore::NamedDatabase DataStore::Transaction::openDatabase(const QByteArray &db, 0958 const std::function<void(const DataStore::Error &error)> &errorHandler, int flags) const 0959 { 0960 if (!d) { 0961 SinkError() << "Tried to open database on invalid transaction: " << db; 0962 return DataStore::NamedDatabase(); 0963 } 0964 Q_ASSERT(d->transaction); 0965 // We don't now if anything changed 0966 d->implicitCommit = true; 0967 auto p = new DataStore::NamedDatabase::Private( 0968 db, flags, d->defaultErrorHandler, d->name, d->transaction); 0969 auto ret = p->openDatabase(d->requestedRead, errorHandler); 0970 if (!ret) { 0971 delete p; 0972 return DataStore::NamedDatabase(); 0973 } 0974 0975 if (p->createdNewDbi) { 0976 d->createdDbs.insert(p->createdNewDbiName, p->dbi); 0977 } 0978 0979 auto database = DataStore::NamedDatabase(p); 0980 return database; 0981 } 0982 0983 QList<QByteArray> DataStore::Transaction::getDatabaseNames() const 0984 { 0985 if (!d) { 0986 SinkWarning() << "Invalid transaction"; 0987 return QList<QByteArray>(); 0988 } 0989 return Sink::Storage::getDatabaseNames(d->transaction); 0990 0991 } 0992 0993 0994 DataStore::Transaction::Stat DataStore::Transaction::stat(bool printDetails) 0995 { 0996 const int freeDbi = 0; 0997 const int mainDbi = 1; 0998 0999 MDB_envinfo mei; 1000 mdb_env_info(d->env, &mei); 1001 1002 MDB_stat mst; 1003 mdb_stat(d->transaction, freeDbi, &mst); 1004 auto freeStat = NamedDatabase::Stat{mst.ms_branch_pages, 1005 mst.ms_leaf_pages, 1006 mst.ms_overflow_pages, 1007 mst.ms_entries}; 1008 1009 mdb_stat(d->transaction, mainDbi, &mst); 1010 auto mainStat = NamedDatabase::Stat{mst.ms_branch_pages, 1011 mst.ms_leaf_pages, 1012 mst.ms_overflow_pages, 1013 mst.ms_entries}; 1014 1015 MDB_cursor *cursor; 1016 MDB_val key, data; 1017 size_t freePages = 0, *iptr; 1018 1019 int rc = mdb_cursor_open(d->transaction, freeDbi, &cursor); 1020 if (rc) { 1021 fprintf(stderr, "mdb_cursor_open failed, error %d %s\n", rc, mdb_strerror(rc)); 1022 return {}; 1023 } 1024 1025 while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) { 1026 iptr = static_cast<size_t*>(data.mv_data); 1027 freePages += *iptr; 1028 bool bad = false; 1029 size_t pg, prev; 1030 ssize_t i, j, span = 0; 1031 j = *iptr++; 1032 for (i = j, prev = 1; --i >= 0; ) { 1033 pg = iptr[i]; 1034 if (pg <= prev) { 1035 bad = true; 1036 } 1037 prev = pg; 1038 pg += span; 1039 for (; i >= span && iptr[i-span] == pg; span++, pg++) ; 1040 } 1041 if (printDetails) { 1042 std::cout << " Transaction " << *(size_t *)key.mv_data << ", "<< j << " pages, maxspan " << span << (bad ? " [bad sequence]" : "") << std::endl; 1043 for (--j; j >= 0; ) { 1044 pg = iptr[j]; 1045 for (span=1; --j >= 0 && iptr[j] == pg+span; span++); 1046 if (span > 1) { 1047 std::cout << " " << pg << "[" << span << "]\n"; 1048 } else { 1049 std::cout << " " << pg << std::endl; 1050 } 1051 } 1052 } 1053 } 1054 mdb_cursor_close(cursor); 1055 return {mei.me_last_pgno + 1, freePages, mst.ms_psize, mainStat, freeStat}; 1056 } 1057 1058 static size_t mapsize() 1059 { 1060 if (RUNNING_ON_VALGRIND) { 1061 // In order to run valgrind this size must be smaller than half your available RAM 1062 // https://github.com/BVLC/caffe/issues/2404 1063 return (size_t)1048576 * (size_t)1000; // 1MB * 1000 1064 } 1065 #ifdef Q_OS_WIN 1066 //Windows home 10 has a virtual address space limit of 128GB(https://msdn.microsoft.com/en-us/library/windows/desktop/aa366778(v=vs.85).aspx#physical_memory_limits_windows_10). I seems like the 128GB need to accomodate all databases we open in the process. 1067 return (size_t)1048576 * (size_t)200; // 1MB * 200 1068 #else 1069 //This is the maximum size of the db (but will not be used directly), so we make it large enough that we hopefully never run into the limit. 1070 return (size_t)1048576 * (size_t)100000; // 1MB * 100'000 1071 #endif 1072 } 1073 1074 class DataStore::Private 1075 { 1076 public: 1077 Private(const QString &s, const QString &n, AccessMode m, const DbLayout &layout = {}); 1078 ~Private(); 1079 1080 QString storageRoot; 1081 QString name; 1082 1083 MDB_env *env = nullptr; 1084 AccessMode mode; 1085 Sink::Log::Context logCtx; 1086 1087 void initEnvironment(const QString &fullPath, const DbLayout &layout) 1088 { 1089 // Ensure the environment is only created once, and that we only have one environment per process 1090 QReadLocker locker(&sEnvironmentsLock); 1091 if (!(env = sEnvironments.value(fullPath))) { 1092 locker.unlock(); 1093 QWriteLocker envLocker(&sEnvironmentsLock); 1094 QWriteLocker dbiLocker(&sDbisLock); 1095 if (!(env = sEnvironments.value(fullPath))) { 1096 int rc = 0; 1097 if ((rc = mdb_env_create(&env))) { 1098 SinkErrorCtx(logCtx) << "mdb_env_create: " << rc << " " << mdb_strerror(rc); 1099 env = nullptr; 1100 throw std::runtime_error("Fatal error while creating db."); 1101 } else { 1102 //Limit large enough to accomodate all our named dbs. This only starts to matter if the number gets large, otherwise it's just a bunch of extra entries in the main table. 1103 mdb_env_set_maxdbs(env, 50); 1104 if (const int rc = mdb_env_set_mapsize(env, mapsize())) { 1105 SinkErrorCtx(logCtx) << "mdb_env_set_mapsize: " << rc << ":" << mdb_strerror(rc); 1106 Q_ASSERT(false); 1107 throw std::runtime_error("Fatal error while creating db."); 1108 } 1109 const bool readOnly = (mode == ReadOnly); 1110 unsigned int flags = MDB_NOTLS; 1111 if (readOnly) { 1112 flags |= MDB_RDONLY; 1113 } 1114 if ((rc = mdb_env_open(env, fullPath.toStdString().data(), flags, 0664))) { 1115 if (readOnly) { 1116 SinkLogCtx(logCtx) << "Tried to open non-existing db: " << fullPath; 1117 } else { 1118 SinkErrorCtx(logCtx) << "mdb_env_open: " << rc << ":" << mdb_strerror(rc); 1119 Q_ASSERT(false); 1120 throw std::runtime_error("Fatal error while creating db."); 1121 } 1122 mdb_env_close(env); 1123 env = 0; 1124 } else { 1125 Q_ASSERT(env); 1126 sEnvironments.insert(fullPath, env); 1127 //Open all available dbi's 1128 MDB_txn *transaction; 1129 if (const int rc = mdb_txn_begin(env, nullptr, readOnly ? MDB_RDONLY : 0, &transaction)) { 1130 SinkWarning() << "Failed to to open transaction: " << QByteArray(mdb_strerror(rc)) << readOnly << transaction; 1131 return; 1132 } 1133 if (!layout.tables.isEmpty()) { 1134 1135 //TODO upgrade db if the layout has changed: 1136 //* read existing layout 1137 //* if layout is not the same create new layout 1138 1139 //Create dbis from the given layout. 1140 for (auto it = layout.tables.constBegin(); it != layout.tables.constEnd(); it++) { 1141 const int flags = it.value(); 1142 MDB_dbi dbi = 0; 1143 const auto &db = it.key(); 1144 const auto dbiName = name + db; 1145 if (createDbi(transaction, db, readOnly, flags, dbi)) { 1146 sDbis.insert(dbiName, dbi); 1147 } 1148 } 1149 } else { 1150 //Open all available databases 1151 for (const auto &db : getDatabaseNames(transaction)) { 1152 MDB_dbi dbi = 0; 1153 const auto dbiName = name + db; 1154 //We're going to load the flags anyways. 1155 const int flags = 0; 1156 if (createDbi(transaction, db, readOnly, flags, dbi)) { 1157 sDbis.insert(dbiName, dbi); 1158 } 1159 } 1160 } 1161 //To persist the dbis (this is also necessary for read-only transactions) 1162 mdb_txn_commit(transaction); 1163 } 1164 } 1165 } 1166 } 1167 } 1168 1169 }; 1170 1171 DataStore::Private::Private(const QString &s, const QString &n, AccessMode m, const DbLayout &layout) : storageRoot(s), name(n), env(0), mode(m), logCtx(n.toLatin1()) 1172 { 1173 1174 const QString fullPath(storageRoot + '/' + name); 1175 QFileInfo dirInfo(fullPath); 1176 if (!dirInfo.exists() && mode == ReadWrite) { 1177 QDir().mkpath(fullPath); 1178 dirInfo.refresh(); 1179 } 1180 if (mode == ReadWrite && !dirInfo.permission(QFile::WriteOwner)) { 1181 qCritical() << fullPath << "does not have write permissions. Aborting"; 1182 } else if (dirInfo.exists()) { 1183 initEnvironment(fullPath, layout); 1184 } 1185 } 1186 1187 DataStore::Private::~Private() 1188 { 1189 //We never close the environment (unless we remove the db), since we should only open the environment once per process (as per lmdb docs) 1190 //and create storage instance from all over the place. Thus, we're not closing it here on purpose. 1191 } 1192 1193 DataStore::DataStore(const QString &storageRoot, const QString &name, AccessMode mode) : d(new Private(storageRoot, name, mode)) 1194 { 1195 } 1196 1197 DataStore::DataStore(const QString &storageRoot, const DbLayout &dbLayout, AccessMode mode) : d(new Private(storageRoot, dbLayout.name, mode, dbLayout)) 1198 { 1199 } 1200 1201 DataStore::~DataStore() 1202 { 1203 delete d; 1204 } 1205 1206 bool DataStore::exists(const QString &storageRoot, const QString &name) 1207 { 1208 return QFileInfo(storageRoot + '/' + name + "/data.mdb").exists(); 1209 } 1210 1211 bool DataStore::exists() const 1212 { 1213 return (d->env != 0) && DataStore::exists(d->storageRoot, d->name); 1214 } 1215 1216 DataStore::Transaction DataStore::createTransaction(AccessMode type, const std::function<void(const DataStore::Error &error)> &errorHandlerArg) 1217 { 1218 auto errorHandler = errorHandlerArg ? errorHandlerArg : defaultErrorHandler(); 1219 if (!d->env) { 1220 errorHandler(Error(d->name.toLatin1(), ErrorCodes::GenericError, "Failed to create transaction: Missing database environment")); 1221 return Transaction(); 1222 } 1223 1224 bool requestedRead = type == ReadOnly; 1225 1226 if (d->mode == ReadOnly && !requestedRead) { 1227 errorHandler(Error(d->name.toLatin1(), ErrorCodes::GenericError, "Failed to create transaction: Requested read/write transaction in read-only mode.")); 1228 return Transaction(); 1229 } 1230 QReadLocker locker(&sEnvironmentsLock); 1231 if (!sEnvironments.values().contains(d->env)) { 1232 return {}; 1233 } 1234 return Transaction(new Transaction::Private(requestedRead, defaultErrorHandler(), d->name, d->env)); 1235 } 1236 1237 qint64 DataStore::diskUsage() const 1238 { 1239 QFileInfo info(d->storageRoot + '/' + d->name + "/data.mdb"); 1240 if (!info.exists()) { 1241 SinkWarning() << "Tried to get filesize for non-existant file: " << info.path(); 1242 } 1243 return info.size(); 1244 } 1245 1246 void DataStore::removeFromDisk() const 1247 { 1248 const QString fullPath(d->storageRoot + '/' + d->name); 1249 QWriteLocker dbiLocker(&sDbisLock); 1250 QWriteLocker envLocker(&sEnvironmentsLock); 1251 SinkTrace() << "Removing database from disk: " << fullPath; 1252 auto env = sEnvironments.take(fullPath); 1253 for (const auto &key : sDbis.keys()) { 1254 if (key.startsWith(d->name)) { 1255 sDbis.remove(key); 1256 } 1257 } 1258 mdb_env_close(env); 1259 QDir dir(fullPath); 1260 if (!dir.removeRecursively()) { 1261 Error error(d->name.toLatin1(), ErrorCodes::GenericError, QString("Failed to remove directory %1 %2").arg(d->storageRoot).arg(d->name).toLatin1()); 1262 defaultErrorHandler()(error); 1263 } 1264 } 1265 1266 void DataStore::clearEnv() 1267 { 1268 SinkTrace() << "Clearing environment"; 1269 QWriteLocker locker(&sEnvironmentsLock); 1270 QWriteLocker dbiLocker(&sDbisLock); 1271 for (const auto &envName : sEnvironments.keys()) { 1272 auto env = sEnvironments.value(envName); 1273 mdb_env_sync(env, true); 1274 for (const auto &k : sDbis.keys()) { 1275 if (k.startsWith(envName)) { 1276 auto dbi = sDbis.value(k); 1277 mdb_dbi_close(env, dbi); 1278 } 1279 } 1280 mdb_env_close(env); 1281 } 1282 sDbis.clear(); 1283 sEnvironments.clear(); 1284 } 1285 1286 } 1287 } // namespace Sink