File indexing completed on 2024-05-12 05:26:06
0001 /* 0002 * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm> 0003 * 0004 * This library is free software; you can redistribute it and/or 0005 * modify it under the terms of the GNU Lesser General Public 0006 * License as published by the Free Software Foundation; either 0007 * version 2.1 of the License, or (at your option) version 3, or any 0008 * later version accepted by the membership of KDE e.V. (or its 0009 * successor approved by the membership of KDE e.V.), which shall 0010 * act as a proxy defined in Section 6 of version 3 of the license. 0011 * 0012 * This library is distributed in the hope that it will be useful, 0013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 0014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 0015 * Lesser General Public License for more details. 0016 * 0017 * You should have received a copy of the GNU Lesser General Public 0018 * License along with this library. If not, see <http://www.gnu.org/licenses/>. 0019 */ 0020 0021 #include "store.h" 0022 0023 #include <QTime> 0024 #include <QAbstractItemModel> 0025 #include <functional> 0026 #include <memory> 0027 0028 #include "resourceaccess.h" 0029 #include "commands.h" 0030 #include "resourcefacade.h" 0031 #include "definitions.h" 0032 #include "resourceconfig.h" 0033 #include "resourcecontrol.h" 0034 #include "facadefactory.h" 0035 #include "modelresult.h" 0036 #include "storage.h" 0037 #include "log.h" 0038 #include "utils.h" 0039 0040 #define ASSERT_ENUMS_MATCH(A, B) Q_STATIC_ASSERT_X(static_cast<int>(A) == static_cast<int>(B), "The enum values must match"); 0041 0042 //Ensure the copied enum matches 0043 typedef ModelResult<Sink::ApplicationDomain::Mail, Sink::ApplicationDomain::Mail::Ptr> MailModelResult; 0044 ASSERT_ENUMS_MATCH(Sink::Store::DomainObjectBaseRole, MailModelResult::DomainObjectBaseRole) 0045 ASSERT_ENUMS_MATCH(Sink::Store::ChildrenFetchedRole, MailModelResult::ChildrenFetchedRole) 0046 ASSERT_ENUMS_MATCH(Sink::Store::DomainObjectRole, MailModelResult::DomainObjectRole) 0047 ASSERT_ENUMS_MATCH(Sink::Store::StatusRole, MailModelResult::StatusRole) 0048 ASSERT_ENUMS_MATCH(Sink::Store::WarningRole, MailModelResult::WarningRole) 0049 ASSERT_ENUMS_MATCH(Sink::Store::ProgressRole, MailModelResult::ProgressRole) 0050 0051 Q_DECLARE_METATYPE(QSharedPointer<Sink::ResultEmitter<Sink::ApplicationDomain::SinkResource::Ptr>>) 0052 Q_DECLARE_METATYPE(QSharedPointer<Sink::ResourceAccessInterface>); 0053 Q_DECLARE_METATYPE(std::shared_ptr<void>); 0054 0055 0056 static bool sanityCheckQuery(const Sink::Query &query) 0057 { 0058 for (const auto &id : query.ids()) { 0059 if (id.isEmpty()) { 0060 SinkError() << "Empty id in query."; 0061 return false; 0062 } 0063 } 0064 return true; 0065 } 0066 0067 static KAsync::Job<void> forEachResource(const Sink::SyncScope &scope, std::function<KAsync::Job<void>(const Sink::ApplicationDomain::SinkResource::Ptr &resource)> callback) 0068 { 0069 using namespace Sink; 0070 auto resourceFilter = scope.getResourceFilter(); 0071 //Filter resources by type by default 0072 if (!resourceFilter.propertyFilter.contains({ApplicationDomain::SinkResource::Capabilities::name}) && !scope.type().isEmpty()) { 0073 resourceFilter.propertyFilter.insert({ApplicationDomain::SinkResource::Capabilities::name}, Query::Comparator{scope.type(), Query::Comparator::Contains}); 0074 } 0075 Sink::Query query; 0076 query.setFilter(resourceFilter); 0077 return Store::fetchAll<ApplicationDomain::SinkResource>(query) 0078 .template each(callback); 0079 } 0080 0081 namespace Sink { 0082 0083 QString Store::storageLocation() 0084 { 0085 return Sink::storageLocation(); 0086 } 0087 0088 0089 template <class DomainType> 0090 KAsync::Job<void> queryResource(const QByteArray resourceType, const QByteArray &resourceInstanceIdentifier, const Query &query, typename AggregatingResultEmitter<typename DomainType::Ptr>::Ptr aggregatingEmitter, const Sink::Log::Context &ctx_) 0091 { 0092 auto ctx = ctx_.subContext(resourceInstanceIdentifier); 0093 auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceType, resourceInstanceIdentifier); 0094 if (facade) { 0095 SinkTraceCtx(ctx) << "Trying to fetch from resource " << resourceInstanceIdentifier; 0096 auto result = facade->load(query, ctx); 0097 if (result.second) { 0098 aggregatingEmitter->addEmitter(result.second); 0099 } else { 0100 SinkWarningCtx(ctx) << "Null emitter for resource " << resourceInstanceIdentifier; 0101 } 0102 return result.first; 0103 } else { 0104 SinkTraceCtx(ctx) << "Couldn' find a facade for " << resourceInstanceIdentifier; 0105 // Ignore the error and carry on 0106 return KAsync::null<void>(); 0107 } 0108 } 0109 0110 template <class DomainType> 0111 QPair<typename AggregatingResultEmitter<typename DomainType::Ptr>::Ptr, typename ResultEmitter<typename ApplicationDomain::SinkResource::Ptr>::Ptr> getEmitter(Query query, const Log::Context &ctx) 0112 { 0113 query.setType(ApplicationDomain::getTypeName<DomainType>()); 0114 SinkTraceCtx(ctx) << "Query: " << query; 0115 0116 // Query all resources and aggregate results 0117 auto aggregatingEmitter = AggregatingResultEmitter<typename DomainType::Ptr>::Ptr::create(); 0118 if (ApplicationDomain::isGlobalType(ApplicationDomain::getTypeName<DomainType>())) { 0119 //For global types we don't need to query for the resources first. 0120 queryResource<DomainType>("", "", query, aggregatingEmitter, ctx).exec(); 0121 } else { 0122 auto resourceCtx = ctx.subContext("resourceQuery"); 0123 auto facade = FacadeFactory::instance().getFacade<ApplicationDomain::SinkResource>(); 0124 Q_ASSERT(facade); 0125 Sink::Query resourceQuery; 0126 resourceQuery.request<ApplicationDomain::SinkResource::Capabilities>(); 0127 if (query.liveQuery()) { 0128 SinkTraceCtx(ctx) << "Listening for new resources."; 0129 resourceQuery.setFlags(Query::LiveQuery); 0130 } 0131 0132 //Filter resources by available content types (unless the query already specifies a capability filter) 0133 auto resourceFilter = query.getResourceFilter(); 0134 if (!resourceFilter.propertyFilter.contains({ApplicationDomain::SinkResource::Capabilities::name})) { 0135 resourceFilter.propertyFilter.insert({ApplicationDomain::SinkResource::Capabilities::name}, Query::Comparator{ApplicationDomain::getTypeName<DomainType>(), Query::Comparator::Contains}); 0136 } 0137 resourceQuery.setFilter(resourceFilter); 0138 for (auto const &properties : resourceFilter.propertyFilter.keys()) { 0139 resourceQuery.requestedProperties << properties; 0140 } 0141 0142 auto result = facade->load(resourceQuery, resourceCtx); 0143 auto emitter = result.second; 0144 emitter->onAdded([=](const ApplicationDomain::SinkResource::Ptr &resource) { 0145 SinkTraceCtx(resourceCtx) << "Found new resources: " << resource->identifier(); 0146 const auto resourceType = ResourceConfig::getResourceType(resource->identifier()); 0147 Q_ASSERT(!resourceType.isEmpty()); 0148 queryResource<DomainType>(resourceType, resource->identifier(), query, aggregatingEmitter, ctx).exec(); 0149 }); 0150 emitter->onComplete([query, aggregatingEmitter, resourceCtx]() { 0151 SinkTraceCtx(resourceCtx) << "Resource query complete"; 0152 }); 0153 0154 return qMakePair(aggregatingEmitter, emitter); 0155 } 0156 return qMakePair(aggregatingEmitter, ResultEmitter<typename ApplicationDomain::SinkResource::Ptr>::Ptr{}); 0157 } 0158 0159 static Log::Context getQueryContext(const Sink::Query &query, const QByteArray &type) 0160 { 0161 if (!query.id().isEmpty()) { 0162 return Log::Context{"query." + type + "." + query.id()}; 0163 } 0164 return Log::Context{"query." + type}; 0165 } 0166 0167 template <class DomainType> 0168 QSharedPointer<QAbstractItemModel> Store::loadModel(const Query &query) 0169 { 0170 Q_ASSERT(sanityCheckQuery(query)); 0171 auto ctx = getQueryContext(query, ApplicationDomain::getTypeName<DomainType>()); 0172 auto model = QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr>>::create(query, query.requestedProperties, ctx); 0173 0174 //* Client defines lifetime of model 0175 //* The model lifetime defines the duration of live-queries 0176 //* The facade needs to life for the duration of any calls being made (assuming we get rid of any internal callbacks 0177 //* The emitter needs to live or the duration of query (respectively, the model) 0178 //* The result provider needs to live for as long as results are provided (until the last thread exits). 0179 0180 auto result = getEmitter<DomainType>(query, ctx); 0181 model->setEmitter(result.first); 0182 0183 //Keep the emitter alive 0184 if (auto resourceEmitter = result.second) { 0185 model->setProperty("resourceEmitter", QVariant::fromValue(resourceEmitter)); //TODO only neceesary for live queries 0186 resourceEmitter->fetch(); 0187 } 0188 0189 0190 //Automatically populate the top-level 0191 model->fetchMore(QModelIndex()); 0192 0193 return std::move(model); 0194 } 0195 0196 template <class DomainType> 0197 void Store::updateModel(const Query &query, const QSharedPointer<QAbstractItemModel> &model) 0198 { 0199 Q_ASSERT(sanityCheckQuery(query)); 0200 auto ctx = getQueryContext(query, ApplicationDomain::getTypeName<DomainType>()); 0201 0202 auto result = getEmitter<DomainType>(query, ctx); 0203 0204 QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr>> m = model.dynamicCast<ModelResult<DomainType, typename DomainType::Ptr>>(); 0205 Q_ASSERT(m); 0206 m->setEmitter(result.first); 0207 0208 //Keep the emitter alive 0209 if (auto resourceEmitter = result.second) { 0210 m->setProperty("resourceEmitter", QVariant::fromValue(resourceEmitter)); //TODO only neceesary for live queries 0211 resourceEmitter->fetch(); 0212 } 0213 0214 m->updateQuery(query); 0215 } 0216 0217 template <class DomainType> 0218 static std::shared_ptr<StoreFacade<DomainType>> getFacade(const QByteArray &resourceInstanceIdentifier) 0219 { 0220 if (ApplicationDomain::isGlobalType(ApplicationDomain::getTypeName<DomainType>())) { 0221 if (auto facade = FacadeFactory::instance().getFacade<DomainType>()) { 0222 return facade; 0223 } 0224 } 0225 if (auto facade = FacadeFactory::instance().getFacade<DomainType>(ResourceConfig::getResourceType(resourceInstanceIdentifier), resourceInstanceIdentifier)) { 0226 return facade; 0227 } 0228 return std::make_shared<NullFacade<DomainType>>(); 0229 } 0230 0231 template <class DomainType> 0232 KAsync::Job<void> Store::create(const DomainType &domainObject) 0233 { 0234 SinkLog() << "Create: " << domainObject; 0235 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); 0236 return facade->create(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to create " << error; }); 0237 } 0238 0239 template <class DomainType> 0240 KAsync::Job<void> Store::modify(const DomainType &domainObject) 0241 { 0242 if (domainObject.changedProperties().isEmpty()) { 0243 SinkLog() << "Nothing to modify: " << domainObject.identifier(); 0244 return KAsync::null(); 0245 } 0246 SinkLog() << "Modify: " << domainObject; 0247 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); 0248 if (domainObject.isAggregate()) { 0249 return KAsync::value(domainObject.aggregatedIds()) 0250 .addToContext(std::shared_ptr<void>(facade)) 0251 .each([=] (const QByteArray &id) { 0252 auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject); 0253 return facade->modify(object).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to modify " << error; }); 0254 }); 0255 } 0256 return facade->modify(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to modify"; }); 0257 } 0258 0259 template <class DomainType> 0260 KAsync::Job<void> Store::modify(const Query &query, const DomainType &domainObject) 0261 { 0262 if (domainObject.changedProperties().isEmpty()) { 0263 SinkLog() << "Nothing to modify: " << domainObject.identifier(); 0264 return KAsync::null(); 0265 } 0266 SinkLog() << "Modify: " << query << domainObject; 0267 return fetchAll<DomainType>(query) 0268 .each([=] (const typename DomainType::Ptr &entity) { 0269 auto copy = *entity; 0270 for (const auto &p : domainObject.changedProperties()) { 0271 copy.setProperty(p, domainObject.getProperty(p)); 0272 } 0273 return modify(copy); 0274 }); 0275 } 0276 0277 template <class DomainType> 0278 KAsync::Job<void> Store::move(const DomainType &domainObject, const QByteArray &newResource) 0279 { 0280 SinkLog() << "Move: " << domainObject << newResource; 0281 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); 0282 if (domainObject.isAggregate()) { 0283 return KAsync::value(domainObject.aggregatedIds()) 0284 .addToContext(std::shared_ptr<void>(facade)) 0285 .each([=] (const QByteArray &id) { 0286 auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject); 0287 return facade->move(object, newResource).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to move " << error; }); 0288 }); 0289 } 0290 return facade->move(domainObject, newResource).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to move " << error; }); 0291 } 0292 0293 template <class DomainType> 0294 KAsync::Job<void> Store::copy(const DomainType &domainObject, const QByteArray &newResource) 0295 { 0296 SinkLog() << "Copy: " << domainObject << newResource; 0297 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); 0298 if (domainObject.isAggregate()) { 0299 return KAsync::value(domainObject.aggregatedIds()) 0300 .addToContext(std::shared_ptr<void>(facade)) 0301 .each([=] (const QByteArray &id) { 0302 auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject); 0303 return facade->copy(object, newResource).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to copy " << error; }); 0304 }); 0305 } 0306 return facade->copy(domainObject, newResource).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to copy " << error; }); 0307 } 0308 0309 template <class DomainType> 0310 KAsync::Job<void> Store::remove(const DomainType &domainObject) 0311 { 0312 SinkLog() << "Remove: " << domainObject; 0313 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); 0314 if (domainObject.isAggregate()) { 0315 return KAsync::value(domainObject.aggregatedIds()) 0316 .addToContext(std::shared_ptr<void>(facade)) 0317 .each([=] (const QByteArray &id) { 0318 auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject); 0319 return facade->remove(object).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to remove " << error; }); 0320 }); 0321 } 0322 return facade->remove(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to remove " << error; }); 0323 } 0324 0325 template <class DomainType> 0326 KAsync::Job<void> Store::remove(const Sink::Query &query) 0327 { 0328 SinkLog() << "Remove: " << query; 0329 return fetchAll<DomainType>(query) 0330 .each([] (const typename DomainType::Ptr &entity) { 0331 return remove(*entity); 0332 }); 0333 } 0334 0335 KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier) 0336 { 0337 // All databases are going to become invalid, nuke the environments 0338 // TODO: all clients should react to a notification from the resource 0339 Sink::Storage::DataStore::clearEnv(); 0340 SinkTrace() << "Remove data from disk " << identifier; 0341 auto time = QSharedPointer<QTime>::create(); 0342 time->start(); 0343 auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); 0344 resourceAccess->open(); 0345 return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand) 0346 .addToContext(resourceAccess) 0347 .then<void>([resourceAccess](KAsync::Future<void> &future) { 0348 if (resourceAccess->isReady()) { 0349 //Wait for the resource shutdown 0350 auto guard = new QObject; 0351 QObject::connect(resourceAccess.data(), &ResourceAccess::ready, guard, [&future, guard](bool ready) { 0352 if (!ready) { 0353 //We don't disconnect if ResourceAccess get's recycled, so ready can fire multiple times, which can result in a crash if the future is no longer valid. 0354 delete guard; 0355 future.setFinished(); 0356 } 0357 }); 0358 } else { 0359 future.setFinished(); 0360 } 0361 }) 0362 .then([time]() { 0363 SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed()); 0364 }); 0365 } 0366 0367 static KAsync::Job<Store::UpgradeResult> upgrade(const QByteArray &resource) 0368 { 0369 auto store = Sink::Storage::DataStore(Sink::storageLocation(), resource, Sink::Storage::DataStore::ReadOnly); 0370 if (!store.exists() || Storage::DataStore::databaseVersion(store.createTransaction(Storage::DataStore::ReadOnly)) == Sink::latestDatabaseVersion()) { 0371 return KAsync::value(Store::UpgradeResult{false}); 0372 } 0373 SinkLog() << "Upgrading " << resource; 0374 0375 //We're not using the factory to avoid getting a cached resourceaccess with the wrong resourceType 0376 auto resourceAccess = Sink::ResourceAccess::Ptr{new Sink::ResourceAccess(resource, ResourceConfig::getResourceType(resource)), &QObject::deleteLater}; 0377 //We first shutdown the resource, because the upgrade runs on start 0378 return Sink::ResourceControl::shutdown(resource) 0379 .then(resourceAccess->sendCommand(Sink::Commands::UpgradeCommand)) 0380 .addToContext(resourceAccess) 0381 .then([=](const KAsync::Error &error) { 0382 if (error) { 0383 SinkWarning() << "Error during upgrade."; 0384 return KAsync::error(error); 0385 } 0386 SinkTrace() << "Upgrade of resource " << resource << " complete."; 0387 return KAsync::null(); 0388 }) 0389 .then(KAsync::value(Store::UpgradeResult{true})); 0390 } 0391 0392 KAsync::Job<Store::UpgradeResult> Store::upgrade() 0393 { 0394 SinkLog() << "Upgrading..."; 0395 0396 //Migrate from sink.dav to sink.carddav 0397 const auto resources = ResourceConfig::getResources(); 0398 for (auto it = resources.constBegin(); it != resources.constEnd(); it++) { 0399 if (it.value() == "sink.dav") { 0400 ResourceConfig::setResourceType(it.key(), "sink.carddav"); 0401 } 0402 } 0403 0404 auto ret = QSharedPointer<bool>::create(false); 0405 return fetchAll<ApplicationDomain::SinkResource>({}) 0406 .template each([ret](const ApplicationDomain::SinkResource::Ptr &resource) -> KAsync::Job<void> { 0407 return Sink::upgrade(resource->identifier()) 0408 .then([ret](UpgradeResult returnValue) { 0409 if (returnValue.upgradeExecuted) { 0410 SinkLog() << "Upgrade executed."; 0411 *ret = true; 0412 } 0413 }); 0414 }) 0415 .then([ret] { 0416 if (*ret) { 0417 SinkLog() << "Upgrade complete."; 0418 } 0419 return Store::UpgradeResult{*ret}; 0420 }); 0421 } 0422 0423 static KAsync::Job<void> synchronize(const QByteArray &resource, const Sink::SyncScope &scope) 0424 { 0425 SinkLog() << "Synchronizing " << resource << scope; 0426 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); 0427 return resourceAccess->synchronizeResource(scope) 0428 .addToContext(resourceAccess) 0429 .then([=](const KAsync::Error &error) { 0430 if (error) { 0431 SinkWarning() << "Error during sync."; 0432 return KAsync::error(error); 0433 } 0434 SinkTrace() << "Synchronization of resource " << resource << " complete."; 0435 return KAsync::null(); 0436 }); 0437 } 0438 0439 KAsync::Job<void> Store::synchronize(const Sink::Query &query) 0440 { 0441 return synchronize(Sink::SyncScope{query}); 0442 } 0443 0444 KAsync::Job<void> Store::synchronize(const Sink::SyncScope &scope) 0445 { 0446 SinkLog() << "Synchronizing all resource matching: " << scope; 0447 return forEachResource(scope, [=] (const auto &resource) { 0448 return synchronize(resource->identifier(), scope); 0449 }); 0450 } 0451 0452 KAsync::Job<void> Store::abortSynchronization(const Sink::SyncScope &scope) 0453 { 0454 return forEachResource(scope, [] (const auto &resource) { 0455 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource->identifier(), ResourceConfig::getResourceType(resource->identifier())); 0456 return resourceAccess->sendCommand(Sink::Commands::AbortSynchronizationCommand) 0457 .addToContext(resourceAccess) 0458 .then([=](const KAsync::Error &error) { 0459 if (error) { 0460 SinkWarning() << "Error aborting synchronization."; 0461 return KAsync::error(error); 0462 } 0463 return KAsync::null(); 0464 }); 0465 }); 0466 } 0467 0468 template <class DomainType> 0469 KAsync::Job<DomainType> Store::fetchOne(const Sink::Query &query) 0470 { 0471 return fetch<DomainType>(query, 1).template then<DomainType, QList<typename DomainType::Ptr>>([](const QList<typename DomainType::Ptr> &list) { 0472 return KAsync::value(*list.first()); 0473 }); 0474 } 0475 0476 template <class DomainType> 0477 KAsync::Job<QList<typename DomainType::Ptr>> Store::fetchAll(const Sink::Query &query) 0478 { 0479 return fetch<DomainType>(query); 0480 } 0481 0482 template <class DomainType> 0483 KAsync::Job<QList<typename DomainType::Ptr>> Store::fetch(const Sink::Query &query, int minimumAmount) 0484 { 0485 Q_ASSERT(sanityCheckQuery(query)); 0486 auto model = loadModel<DomainType>(query); 0487 auto list = QSharedPointer<QList<typename DomainType::Ptr>>::create(); 0488 auto context = QSharedPointer<QObject>::create(); 0489 return KAsync::start<QList<typename DomainType::Ptr>>([model, list, context, minimumAmount](KAsync::Future<QList<typename DomainType::Ptr>> &future) { 0490 if (model->rowCount() >= 1) { 0491 for (int i = 0; i < model->rowCount(); i++) { 0492 list->append(model->index(i, 0, QModelIndex()).data(Sink::Store::DomainObjectRole).template value<typename DomainType::Ptr>()); 0493 } 0494 } else { 0495 QObject::connect(model.data(), &QAbstractItemModel::rowsInserted, context.data(), [model, list](const QModelIndex &index, int start, int end) { 0496 for (int i = start; i <= end; i++) { 0497 list->append(model->index(i, 0, QModelIndex()).data(Sink::Store::DomainObjectRole).template value<typename DomainType::Ptr>()); 0498 } 0499 }); 0500 QObject::connect(model.data(), &QAbstractItemModel::dataChanged, context.data(), 0501 [model, &future, list, minimumAmount](const QModelIndex &, const QModelIndex &, const QVector<int> &roles) { 0502 if (roles.contains(ModelResult<DomainType, typename DomainType::Ptr>::ChildrenFetchedRole)) { 0503 if (list->size() < minimumAmount) { 0504 future.setError(1, "Not enough values."); 0505 } else { 0506 future.setValue(*list); 0507 future.setFinished(); 0508 } 0509 } 0510 }); 0511 } 0512 if (model->data(QModelIndex(), ModelResult<DomainType, typename DomainType::Ptr>::ChildrenFetchedRole).toBool()) { 0513 if (list->size() < minimumAmount) { 0514 future.setError(1, "Not enough values."); 0515 } else { 0516 future.setValue(*list); 0517 } 0518 future.setFinished(); 0519 } 0520 }); 0521 } 0522 0523 template <class DomainType> 0524 DomainType Store::readOne(const Sink::Query &query) 0525 { 0526 const auto list = read<DomainType>(query); 0527 if (!list.isEmpty()) { 0528 return list.first(); 0529 } 0530 SinkWarning() << "Tried to read value but no values are available."; 0531 return DomainType(); 0532 } 0533 0534 template <class DomainType> 0535 QList<DomainType> Store::read(const Sink::Query &query_) 0536 { 0537 Q_ASSERT(sanityCheckQuery(query_)); 0538 auto query = query_; 0539 query.setFlags(Query::SynchronousQuery); 0540 0541 auto ctx = getQueryContext(query, ApplicationDomain::getTypeName<DomainType>()); 0542 0543 QList<DomainType> list; 0544 0545 auto result = getEmitter<DomainType>(query, ctx); 0546 auto aggregatingEmitter = result.first; 0547 aggregatingEmitter->onAdded([&list, ctx](const typename DomainType::Ptr &value){ 0548 SinkTraceCtx(ctx) << "Found value: " << value->identifier(); 0549 list << *value; 0550 }); 0551 0552 if (auto resourceEmitter = result.second) { 0553 resourceEmitter->fetch(); 0554 } 0555 0556 aggregatingEmitter->fetch(); 0557 return list; 0558 } 0559 0560 #define REGISTER_TYPE(T) \ 0561 template KAsync::Job<void> Store::remove<T>(const T &domainObject); \ 0562 template KAsync::Job<void> Store::remove<T>(const Query &); \ 0563 template KAsync::Job<void> Store::create<T>(const T &domainObject); \ 0564 template KAsync::Job<void> Store::modify<T>(const T &domainObject); \ 0565 template KAsync::Job<void> Store::modify<T>(const Query &, const T &); \ 0566 template KAsync::Job<void> Store::move<T>(const T &domainObject, const QByteArray &newResource); \ 0567 template KAsync::Job<void> Store::copy<T>(const T &domainObject, const QByteArray &newResource); \ 0568 template QSharedPointer<QAbstractItemModel> Store::loadModel<T>(const Query &query); \ 0569 template void Store::updateModel<T>(const Query &, const QSharedPointer<QAbstractItemModel> &); \ 0570 template KAsync::Job<T> Store::fetchOne<T>(const Query &); \ 0571 template KAsync::Job<QList<T::Ptr>> Store::fetchAll<T>(const Query &); \ 0572 template KAsync::Job<QList<T::Ptr>> Store::fetch<T>(const Query &, int); \ 0573 template T Store::readOne<T>(const Query &); \ 0574 template QList<T> Store::read<T>(const Query &); 0575 0576 SINK_REGISTER_TYPES() 0577 0578 } // namespace Sink