File indexing completed on 2024-05-12 05:26:06

0001 /*
0002  * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm>
0003  *
0004  * This library is free software; you can redistribute it and/or
0005  * modify it under the terms of the GNU Lesser General Public
0006  * License as published by the Free Software Foundation; either
0007  * version 2.1 of the License, or (at your option) version 3, or any
0008  * later version accepted by the membership of KDE e.V. (or its
0009  * successor approved by the membership of KDE e.V.), which shall
0010  * act as a proxy defined in Section 6 of version 3 of the license.
0011  *
0012  * This library is distributed in the hope that it will be useful,
0013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0015  * Lesser General Public License for more details.
0016  *
0017  * You should have received a copy of the GNU Lesser General Public
0018  * License along with this library.  If not, see <http://www.gnu.org/licenses/>.
0019  */
0020 
0021 #include "store.h"
0022 
0023 #include <QTime>
0024 #include <QAbstractItemModel>
0025 #include <functional>
0026 #include <memory>
0027 
0028 #include "resourceaccess.h"
0029 #include "commands.h"
0030 #include "resourcefacade.h"
0031 #include "definitions.h"
0032 #include "resourceconfig.h"
0033 #include "resourcecontrol.h"
0034 #include "facadefactory.h"
0035 #include "modelresult.h"
0036 #include "storage.h"
0037 #include "log.h"
0038 #include "utils.h"
0039 
0040 #define ASSERT_ENUMS_MATCH(A, B) Q_STATIC_ASSERT_X(static_cast<int>(A) == static_cast<int>(B), "The enum values must match");
0041 
0042 //Ensure the copied enum matches
0043 typedef ModelResult<Sink::ApplicationDomain::Mail, Sink::ApplicationDomain::Mail::Ptr> MailModelResult;
0044 ASSERT_ENUMS_MATCH(Sink::Store::DomainObjectBaseRole, MailModelResult::DomainObjectBaseRole)
0045 ASSERT_ENUMS_MATCH(Sink::Store::ChildrenFetchedRole, MailModelResult::ChildrenFetchedRole)
0046 ASSERT_ENUMS_MATCH(Sink::Store::DomainObjectRole, MailModelResult::DomainObjectRole)
0047 ASSERT_ENUMS_MATCH(Sink::Store::StatusRole, MailModelResult::StatusRole)
0048 ASSERT_ENUMS_MATCH(Sink::Store::WarningRole, MailModelResult::WarningRole)
0049 ASSERT_ENUMS_MATCH(Sink::Store::ProgressRole, MailModelResult::ProgressRole)
0050 
0051 Q_DECLARE_METATYPE(QSharedPointer<Sink::ResultEmitter<Sink::ApplicationDomain::SinkResource::Ptr>>)
0052 Q_DECLARE_METATYPE(QSharedPointer<Sink::ResourceAccessInterface>);
0053 Q_DECLARE_METATYPE(std::shared_ptr<void>);
0054 
0055 
0056 static bool sanityCheckQuery(const Sink::Query &query)
0057 {
0058     for (const auto &id : query.ids()) {
0059         if (id.isEmpty()) {
0060             SinkError() << "Empty id in query.";
0061             return false;
0062         }
0063     }
0064     return true;
0065 }
0066 
0067 static KAsync::Job<void> forEachResource(const Sink::SyncScope &scope, std::function<KAsync::Job<void>(const Sink::ApplicationDomain::SinkResource::Ptr &resource)> callback)
0068 {
0069     using namespace Sink;
0070     auto resourceFilter = scope.getResourceFilter();
0071     //Filter resources by type by default
0072     if (!resourceFilter.propertyFilter.contains({ApplicationDomain::SinkResource::Capabilities::name}) && !scope.type().isEmpty()) {
0073         resourceFilter.propertyFilter.insert({ApplicationDomain::SinkResource::Capabilities::name}, Query::Comparator{scope.type(), Query::Comparator::Contains});
0074     }
0075     Sink::Query query;
0076     query.setFilter(resourceFilter);
0077     return Store::fetchAll<ApplicationDomain::SinkResource>(query)
0078         .template each(callback);
0079 }
0080 
0081 namespace Sink {
0082 
0083 QString Store::storageLocation()
0084 {
0085     return Sink::storageLocation();
0086 }
0087 
0088 
0089 template <class DomainType>
0090 KAsync::Job<void> queryResource(const QByteArray resourceType, const QByteArray &resourceInstanceIdentifier, const Query &query, typename AggregatingResultEmitter<typename DomainType::Ptr>::Ptr aggregatingEmitter, const Sink::Log::Context &ctx_)
0091 {
0092     auto ctx = ctx_.subContext(resourceInstanceIdentifier);
0093     auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceType, resourceInstanceIdentifier);
0094     if (facade) {
0095         SinkTraceCtx(ctx) << "Trying to fetch from resource " << resourceInstanceIdentifier;
0096         auto result = facade->load(query, ctx);
0097         if (result.second) {
0098             aggregatingEmitter->addEmitter(result.second);
0099         } else {
0100             SinkWarningCtx(ctx) << "Null emitter for resource " << resourceInstanceIdentifier;
0101         }
0102         return result.first;
0103     } else {
0104         SinkTraceCtx(ctx) << "Couldn' find a facade for " << resourceInstanceIdentifier;
0105         // Ignore the error and carry on
0106         return KAsync::null<void>();
0107     }
0108 }
0109 
0110 template <class DomainType>
0111 QPair<typename AggregatingResultEmitter<typename DomainType::Ptr>::Ptr,  typename ResultEmitter<typename ApplicationDomain::SinkResource::Ptr>::Ptr> getEmitter(Query query, const Log::Context &ctx)
0112 {
0113     query.setType(ApplicationDomain::getTypeName<DomainType>());
0114     SinkTraceCtx(ctx) << "Query: " << query;
0115 
0116     // Query all resources and aggregate results
0117     auto aggregatingEmitter = AggregatingResultEmitter<typename DomainType::Ptr>::Ptr::create();
0118     if (ApplicationDomain::isGlobalType(ApplicationDomain::getTypeName<DomainType>())) {
0119         //For global types we don't need to query for the resources first.
0120         queryResource<DomainType>("", "", query, aggregatingEmitter, ctx).exec();
0121     } else {
0122         auto resourceCtx = ctx.subContext("resourceQuery");
0123         auto facade = FacadeFactory::instance().getFacade<ApplicationDomain::SinkResource>();
0124         Q_ASSERT(facade);
0125         Sink::Query resourceQuery;
0126         resourceQuery.request<ApplicationDomain::SinkResource::Capabilities>();
0127         if (query.liveQuery()) {
0128             SinkTraceCtx(ctx) << "Listening for new resources.";
0129             resourceQuery.setFlags(Query::LiveQuery);
0130         }
0131 
0132         //Filter resources by available content types (unless the query already specifies a capability filter)
0133         auto resourceFilter = query.getResourceFilter();
0134         if (!resourceFilter.propertyFilter.contains({ApplicationDomain::SinkResource::Capabilities::name})) {
0135             resourceFilter.propertyFilter.insert({ApplicationDomain::SinkResource::Capabilities::name}, Query::Comparator{ApplicationDomain::getTypeName<DomainType>(), Query::Comparator::Contains});
0136         }
0137         resourceQuery.setFilter(resourceFilter);
0138         for (auto const &properties :  resourceFilter.propertyFilter.keys()) {
0139             resourceQuery.requestedProperties << properties;
0140         }
0141 
0142         auto result = facade->load(resourceQuery, resourceCtx);
0143         auto emitter = result.second;
0144         emitter->onAdded([=](const ApplicationDomain::SinkResource::Ptr &resource) {
0145             SinkTraceCtx(resourceCtx) << "Found new resources: " << resource->identifier();
0146             const auto resourceType = ResourceConfig::getResourceType(resource->identifier());
0147             Q_ASSERT(!resourceType.isEmpty());
0148             queryResource<DomainType>(resourceType, resource->identifier(), query, aggregatingEmitter, ctx).exec();
0149         });
0150         emitter->onComplete([query, aggregatingEmitter, resourceCtx]() {
0151             SinkTraceCtx(resourceCtx) << "Resource query complete";
0152         });
0153 
0154         return qMakePair(aggregatingEmitter, emitter);
0155     }
0156     return qMakePair(aggregatingEmitter, ResultEmitter<typename ApplicationDomain::SinkResource::Ptr>::Ptr{});
0157 }
0158 
0159 static Log::Context getQueryContext(const Sink::Query &query, const QByteArray &type)
0160 {
0161     if (!query.id().isEmpty()) {
0162         return Log::Context{"query." + type + "." + query.id()};
0163     }
0164     return Log::Context{"query." + type};
0165 }
0166 
0167 template <class DomainType>
0168 QSharedPointer<QAbstractItemModel> Store::loadModel(const Query &query)
0169 {
0170     Q_ASSERT(sanityCheckQuery(query));
0171     auto ctx = getQueryContext(query, ApplicationDomain::getTypeName<DomainType>());
0172     auto model = QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr>>::create(query, query.requestedProperties, ctx);
0173 
0174     //* Client defines lifetime of model
0175     //* The model lifetime defines the duration of live-queries
0176     //* The facade needs to life for the duration of any calls being made (assuming we get rid of any internal callbacks
0177     //* The emitter needs to live or the duration of query (respectively, the model)
0178     //* The result provider needs to live for as long as results are provided (until the last thread exits).
0179 
0180     auto result = getEmitter<DomainType>(query, ctx);
0181     model->setEmitter(result.first);
0182 
0183     //Keep the emitter alive
0184     if (auto resourceEmitter = result.second) {
0185         model->setProperty("resourceEmitter", QVariant::fromValue(resourceEmitter)); //TODO only neceesary for live queries
0186         resourceEmitter->fetch();
0187     }
0188 
0189 
0190     //Automatically populate the top-level
0191     model->fetchMore(QModelIndex());
0192 
0193     return std::move(model);
0194 }
0195 
0196 template <class DomainType>
0197 void Store::updateModel(const Query &query, const QSharedPointer<QAbstractItemModel> &model)
0198 {
0199     Q_ASSERT(sanityCheckQuery(query));
0200     auto ctx = getQueryContext(query, ApplicationDomain::getTypeName<DomainType>());
0201 
0202     auto result = getEmitter<DomainType>(query, ctx);
0203 
0204     QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr>> m = model.dynamicCast<ModelResult<DomainType, typename DomainType::Ptr>>();
0205     Q_ASSERT(m);
0206     m->setEmitter(result.first);
0207 
0208     //Keep the emitter alive
0209     if (auto resourceEmitter = result.second) {
0210         m->setProperty("resourceEmitter", QVariant::fromValue(resourceEmitter)); //TODO only neceesary for live queries
0211         resourceEmitter->fetch();
0212     }
0213 
0214     m->updateQuery(query);
0215 }
0216 
0217 template <class DomainType>
0218 static std::shared_ptr<StoreFacade<DomainType>> getFacade(const QByteArray &resourceInstanceIdentifier)
0219 {
0220     if (ApplicationDomain::isGlobalType(ApplicationDomain::getTypeName<DomainType>())) {
0221         if (auto facade = FacadeFactory::instance().getFacade<DomainType>()) {
0222             return facade;
0223         }
0224     }
0225     if (auto facade = FacadeFactory::instance().getFacade<DomainType>(ResourceConfig::getResourceType(resourceInstanceIdentifier), resourceInstanceIdentifier)) {
0226         return facade;
0227     }
0228     return std::make_shared<NullFacade<DomainType>>();
0229 }
0230 
0231 template <class DomainType>
0232 KAsync::Job<void> Store::create(const DomainType &domainObject)
0233 {
0234     SinkLog() << "Create: " << domainObject;
0235     auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
0236     return facade->create(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to create " << error; });
0237 }
0238 
0239 template <class DomainType>
0240 KAsync::Job<void> Store::modify(const DomainType &domainObject)
0241 {
0242     if (domainObject.changedProperties().isEmpty()) {
0243         SinkLog() << "Nothing to modify: " << domainObject.identifier();
0244         return KAsync::null();
0245     }
0246     SinkLog() << "Modify: " << domainObject;
0247     auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
0248     if (domainObject.isAggregate()) {
0249         return KAsync::value(domainObject.aggregatedIds())
0250             .addToContext(std::shared_ptr<void>(facade))
0251             .each([=] (const QByteArray &id) {
0252                 auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject);
0253                 return facade->modify(object).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to modify " << error; });
0254         });
0255     }
0256     return facade->modify(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to modify"; });
0257 }
0258 
0259 template <class DomainType>
0260 KAsync::Job<void> Store::modify(const Query &query, const DomainType &domainObject)
0261 {
0262     if (domainObject.changedProperties().isEmpty()) {
0263         SinkLog() << "Nothing to modify: " << domainObject.identifier();
0264         return KAsync::null();
0265     }
0266     SinkLog() << "Modify: " << query << domainObject;
0267     return fetchAll<DomainType>(query)
0268         .each([=] (const typename DomainType::Ptr &entity) {
0269             auto copy = *entity;
0270             for (const auto &p : domainObject.changedProperties()) {
0271                 copy.setProperty(p, domainObject.getProperty(p));
0272             }
0273             return modify(copy);
0274         });
0275 }
0276 
0277 template <class DomainType>
0278 KAsync::Job<void> Store::move(const DomainType &domainObject, const QByteArray &newResource)
0279 {
0280     SinkLog() << "Move: " << domainObject << newResource;
0281     auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
0282     if (domainObject.isAggregate()) {
0283         return KAsync::value(domainObject.aggregatedIds())
0284             .addToContext(std::shared_ptr<void>(facade))
0285             .each([=] (const QByteArray &id) {
0286                 auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject);
0287                 return facade->move(object, newResource).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to move " << error; });
0288         });
0289     }
0290     return facade->move(domainObject, newResource).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to move " << error; });
0291 }
0292 
0293 template <class DomainType>
0294 KAsync::Job<void> Store::copy(const DomainType &domainObject, const QByteArray &newResource)
0295 {
0296     SinkLog() << "Copy: " << domainObject << newResource;
0297     auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
0298     if (domainObject.isAggregate()) {
0299         return KAsync::value(domainObject.aggregatedIds())
0300             .addToContext(std::shared_ptr<void>(facade))
0301             .each([=] (const QByteArray &id) {
0302                 auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject);
0303                 return facade->copy(object, newResource).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to copy " << error; });
0304         });
0305     }
0306     return facade->copy(domainObject, newResource).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to copy " << error; });
0307 }
0308 
0309 template <class DomainType>
0310 KAsync::Job<void> Store::remove(const DomainType &domainObject)
0311 {
0312     SinkLog() << "Remove: " << domainObject;
0313     auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
0314     if (domainObject.isAggregate()) {
0315         return KAsync::value(domainObject.aggregatedIds())
0316             .addToContext(std::shared_ptr<void>(facade))
0317             .each([=] (const QByteArray &id) {
0318                 auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject);
0319                 return facade->remove(object).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to remove " << error; });
0320         });
0321     }
0322     return facade->remove(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to remove " << error; });
0323 }
0324 
0325 template <class DomainType>
0326 KAsync::Job<void> Store::remove(const Sink::Query &query)
0327 {
0328     SinkLog() << "Remove: " << query;
0329     return fetchAll<DomainType>(query)
0330         .each([] (const typename DomainType::Ptr &entity) {
0331             return remove(*entity);
0332         });
0333 }
0334 
0335 KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier)
0336 {
0337     // All databases are going to become invalid, nuke the environments
0338     // TODO: all clients should react to a notification from the resource
0339     Sink::Storage::DataStore::clearEnv();
0340     SinkTrace() << "Remove data from disk " << identifier;
0341     auto time = QSharedPointer<QTime>::create();
0342     time->start();
0343     auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier));
0344     resourceAccess->open();
0345     return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand)
0346         .addToContext(resourceAccess)
0347         .then<void>([resourceAccess](KAsync::Future<void> &future) {
0348             if (resourceAccess->isReady()) {
0349                 //Wait for the resource shutdown
0350                 auto guard = new QObject;
0351                 QObject::connect(resourceAccess.data(), &ResourceAccess::ready, guard, [&future, guard](bool ready) {
0352                     if (!ready) {
0353                         //We don't disconnect if ResourceAccess get's recycled, so ready can fire multiple times, which can result in a crash if the future is no longer valid.
0354                         delete guard;
0355                         future.setFinished();
0356                     }
0357                 });
0358             } else {
0359                 future.setFinished();
0360             }
0361         })
0362         .then([time]() {
0363             SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed());
0364         });
0365 }
0366 
0367 static KAsync::Job<Store::UpgradeResult> upgrade(const QByteArray &resource)
0368 {
0369     auto store = Sink::Storage::DataStore(Sink::storageLocation(), resource, Sink::Storage::DataStore::ReadOnly);
0370     if (!store.exists() || Storage::DataStore::databaseVersion(store.createTransaction(Storage::DataStore::ReadOnly)) == Sink::latestDatabaseVersion()) {
0371         return KAsync::value(Store::UpgradeResult{false});
0372     }
0373     SinkLog() << "Upgrading " << resource;
0374 
0375     //We're not using the factory to avoid getting a cached resourceaccess with the wrong resourceType
0376     auto resourceAccess = Sink::ResourceAccess::Ptr{new Sink::ResourceAccess(resource, ResourceConfig::getResourceType(resource)), &QObject::deleteLater};
0377     //We first shutdown the resource, because the upgrade runs on start
0378     return Sink::ResourceControl::shutdown(resource)
0379         .then(resourceAccess->sendCommand(Sink::Commands::UpgradeCommand))
0380         .addToContext(resourceAccess)
0381         .then([=](const KAsync::Error &error) {
0382             if (error) {
0383                 SinkWarning() << "Error during upgrade.";
0384                 return KAsync::error(error);
0385             }
0386             SinkTrace() << "Upgrade of resource " << resource << " complete.";
0387             return KAsync::null();
0388         })
0389         .then(KAsync::value(Store::UpgradeResult{true}));
0390 }
0391 
0392 KAsync::Job<Store::UpgradeResult> Store::upgrade()
0393 {
0394     SinkLog() << "Upgrading...";
0395 
0396     //Migrate from sink.dav to sink.carddav
0397     const auto resources = ResourceConfig::getResources();
0398     for (auto it = resources.constBegin(); it != resources.constEnd(); it++) {
0399         if (it.value() == "sink.dav") {
0400             ResourceConfig::setResourceType(it.key(), "sink.carddav");
0401         }
0402     }
0403 
0404     auto ret = QSharedPointer<bool>::create(false);
0405     return fetchAll<ApplicationDomain::SinkResource>({})
0406         .template each([ret](const ApplicationDomain::SinkResource::Ptr &resource) -> KAsync::Job<void> {
0407             return Sink::upgrade(resource->identifier())
0408                 .then([ret](UpgradeResult returnValue) {
0409                     if (returnValue.upgradeExecuted) {
0410                         SinkLog() << "Upgrade executed.";
0411                         *ret = true;
0412                     }
0413                 });
0414         })
0415         .then([ret] {
0416             if (*ret) {
0417                 SinkLog() << "Upgrade complete.";
0418             }
0419             return Store::UpgradeResult{*ret};
0420         });
0421 }
0422 
0423 static KAsync::Job<void> synchronize(const QByteArray &resource, const Sink::SyncScope &scope)
0424 {
0425     SinkLog() << "Synchronizing " << resource << scope;
0426     auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource));
0427     return resourceAccess->synchronizeResource(scope)
0428         .addToContext(resourceAccess)
0429         .then([=](const KAsync::Error &error) {
0430             if (error) {
0431                 SinkWarning() << "Error during sync.";
0432                 return KAsync::error(error);
0433             }
0434             SinkTrace() << "Synchronization of resource " << resource << " complete.";
0435             return KAsync::null();
0436         });
0437 }
0438 
0439 KAsync::Job<void> Store::synchronize(const Sink::Query &query)
0440 {
0441     return synchronize(Sink::SyncScope{query});
0442 }
0443 
0444 KAsync::Job<void> Store::synchronize(const Sink::SyncScope &scope)
0445 {
0446     SinkLog() << "Synchronizing all resource matching: " << scope;
0447     return forEachResource(scope, [=] (const auto &resource) {
0448             return synchronize(resource->identifier(), scope);
0449         });
0450 }
0451 
0452 KAsync::Job<void> Store::abortSynchronization(const Sink::SyncScope &scope)
0453 {
0454     return forEachResource(scope, [] (const auto &resource) {
0455         auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource->identifier(), ResourceConfig::getResourceType(resource->identifier()));
0456         return resourceAccess->sendCommand(Sink::Commands::AbortSynchronizationCommand)
0457             .addToContext(resourceAccess)
0458             .then([=](const KAsync::Error &error) {
0459                 if (error) {
0460                     SinkWarning() << "Error aborting synchronization.";
0461                     return KAsync::error(error);
0462                 }
0463                 return KAsync::null();
0464             });
0465         });
0466 }
0467 
0468 template <class DomainType>
0469 KAsync::Job<DomainType> Store::fetchOne(const Sink::Query &query)
0470 {
0471     return fetch<DomainType>(query, 1).template then<DomainType, QList<typename DomainType::Ptr>>([](const QList<typename DomainType::Ptr> &list) {
0472         return KAsync::value(*list.first());
0473     });
0474 }
0475 
0476 template <class DomainType>
0477 KAsync::Job<QList<typename DomainType::Ptr>> Store::fetchAll(const Sink::Query &query)
0478 {
0479     return fetch<DomainType>(query);
0480 }
0481 
0482 template <class DomainType>
0483 KAsync::Job<QList<typename DomainType::Ptr>> Store::fetch(const Sink::Query &query, int minimumAmount)
0484 {
0485     Q_ASSERT(sanityCheckQuery(query));
0486     auto model = loadModel<DomainType>(query);
0487     auto list = QSharedPointer<QList<typename DomainType::Ptr>>::create();
0488     auto context = QSharedPointer<QObject>::create();
0489     return KAsync::start<QList<typename DomainType::Ptr>>([model, list, context, minimumAmount](KAsync::Future<QList<typename DomainType::Ptr>> &future) {
0490         if (model->rowCount() >= 1) {
0491             for (int i = 0; i < model->rowCount(); i++) {
0492                 list->append(model->index(i, 0, QModelIndex()).data(Sink::Store::DomainObjectRole).template value<typename DomainType::Ptr>());
0493             }
0494         } else {
0495             QObject::connect(model.data(), &QAbstractItemModel::rowsInserted, context.data(), [model, list](const QModelIndex &index, int start, int end) {
0496                 for (int i = start; i <= end; i++) {
0497                     list->append(model->index(i, 0, QModelIndex()).data(Sink::Store::DomainObjectRole).template value<typename DomainType::Ptr>());
0498                 }
0499             });
0500             QObject::connect(model.data(), &QAbstractItemModel::dataChanged, context.data(),
0501                 [model, &future, list, minimumAmount](const QModelIndex &, const QModelIndex &, const QVector<int> &roles) {
0502                     if (roles.contains(ModelResult<DomainType, typename DomainType::Ptr>::ChildrenFetchedRole)) {
0503                         if (list->size() < minimumAmount) {
0504                             future.setError(1, "Not enough values.");
0505                         } else {
0506                             future.setValue(*list);
0507                             future.setFinished();
0508                         }
0509                     }
0510                 });
0511         }
0512         if (model->data(QModelIndex(), ModelResult<DomainType, typename DomainType::Ptr>::ChildrenFetchedRole).toBool()) {
0513             if (list->size() < minimumAmount) {
0514                 future.setError(1, "Not enough values.");
0515             } else {
0516                 future.setValue(*list);
0517             }
0518             future.setFinished();
0519         }
0520     });
0521 }
0522 
0523 template <class DomainType>
0524 DomainType Store::readOne(const Sink::Query &query)
0525 {
0526     const auto list = read<DomainType>(query);
0527     if (!list.isEmpty()) {
0528         return list.first();
0529     }
0530     SinkWarning() << "Tried to read value but no values are available.";
0531     return DomainType();
0532 }
0533 
0534 template <class DomainType>
0535 QList<DomainType> Store::read(const Sink::Query &query_)
0536 {
0537     Q_ASSERT(sanityCheckQuery(query_));
0538     auto query = query_;
0539     query.setFlags(Query::SynchronousQuery);
0540 
0541     auto ctx = getQueryContext(query, ApplicationDomain::getTypeName<DomainType>());
0542 
0543     QList<DomainType> list;
0544 
0545     auto result = getEmitter<DomainType>(query, ctx);
0546     auto aggregatingEmitter = result.first;
0547     aggregatingEmitter->onAdded([&list, ctx](const typename DomainType::Ptr &value){
0548         SinkTraceCtx(ctx) << "Found value: " << value->identifier();
0549         list << *value;
0550     });
0551 
0552     if (auto resourceEmitter = result.second) {
0553         resourceEmitter->fetch();
0554     }
0555 
0556     aggregatingEmitter->fetch();
0557     return list;
0558 }
0559 
0560 #define REGISTER_TYPE(T)                                                          \
0561     template KAsync::Job<void> Store::remove<T>(const T &domainObject);           \
0562     template KAsync::Job<void> Store::remove<T>(const Query &);           \
0563     template KAsync::Job<void> Store::create<T>(const T &domainObject);           \
0564     template KAsync::Job<void> Store::modify<T>(const T &domainObject);           \
0565     template KAsync::Job<void> Store::modify<T>(const Query &, const T &);           \
0566     template KAsync::Job<void> Store::move<T>(const T &domainObject, const QByteArray &newResource);           \
0567     template KAsync::Job<void> Store::copy<T>(const T &domainObject, const QByteArray &newResource);           \
0568     template QSharedPointer<QAbstractItemModel> Store::loadModel<T>(const Query &query); \
0569     template void Store::updateModel<T>(const Query &, const QSharedPointer<QAbstractItemModel> &); \
0570     template KAsync::Job<T> Store::fetchOne<T>(const Query &);                    \
0571     template KAsync::Job<QList<T::Ptr>> Store::fetchAll<T>(const Query &);        \
0572     template KAsync::Job<QList<T::Ptr>> Store::fetch<T>(const Query &, int);      \
0573     template T Store::readOne<T>(const Query &);                                  \
0574     template QList<T> Store::read<T>(const Query &);
0575 
0576 SINK_REGISTER_TYPES()
0577 
0578 } // namespace Sink