File indexing completed on 2024-06-23 05:07:01

0001 /***************************************************************************
0002  *   SPDX-FileCopyrightText: 2007 Robert Zwerus <arzie@dds.nl>             *
0003  *                                                                         *
0004  *   SPDX-License-Identifier: LGPL-2.0-or-later                            *
0005  ***************************************************************************/
0006 
0007 #include "itemcreatehandler.h"
0008 
0009 #include "akonadi.h"
0010 #include "connection.h"
0011 #include "handlerhelper.h"
0012 #include "itemfetchhelper.h"
0013 #include "preprocessormanager.h"
0014 #include "private/externalpartstorage_p.h"
0015 #include "storage/datastore.h"
0016 #include "storage/dbconfig.h"
0017 #include "storage/itemretrievalmanager.h"
0018 #include "storage/parthelper.h"
0019 #include "storage/partstreamer.h"
0020 #include "storage/parttypehelper.h"
0021 #include "storage/selectquerybuilder.h"
0022 #include "storage/transaction.h"
0023 
0024 #include "shared/akranges.h"
0025 #include "shared/akscopeguard.h"
0026 
0027 #include <numeric> //std::accumulate
0028 
0029 using namespace Akonadi;
0030 using namespace Akonadi::Server;
0031 using namespace AkRanges;
0032 
0033 ItemCreateHandler::ItemCreateHandler(AkonadiServer &akonadi)
0034     : Handler(akonadi)
0035 {
0036 }
0037 
0038 bool ItemCreateHandler::buildPimItem(const Protocol::CreateItemCommand &cmd, PimItem &item, Collection &parentCol)
0039 {
0040     parentCol = HandlerHelper::collectionFromScope(cmd.collection(), connection()->context());
0041     if (!parentCol.isValid()) {
0042         return failureResponse(QStringLiteral("Invalid parent collection"));
0043     }
0044     if (parentCol.isVirtual()) {
0045         return failureResponse(QStringLiteral("Cannot append item into virtual collection"));
0046     }
0047 
0048     MimeType mimeType = MimeType::retrieveByNameOrCreate(cmd.mimeType());
0049     if (!mimeType.isValid()) {
0050         return failureResponse(QStringLiteral("Unable to create mimetype '") % cmd.mimeType() % QStringLiteral("'."));
0051     }
0052 
0053     item.setRev(0);
0054     item.setSize(cmd.itemSize());
0055     item.setMimeTypeId(mimeType.id());
0056     item.setCollectionId(parentCol.id());
0057     item.setDatetime(cmd.dateTime());
0058     if (cmd.remoteId().isEmpty()) {
0059         // from application
0060         item.setDirty(true);
0061     } else {
0062         // from resource
0063         item.setRemoteId(cmd.remoteId());
0064         item.setDirty(false);
0065     }
0066     item.setRemoteRevision(cmd.remoteRevision());
0067     item.setGid(cmd.gid());
0068 
0069     item.setAtime(cmd.modificationTime().isValid() ? cmd.modificationTime() : QDateTime::currentDateTimeUtc());
0070 
0071     return true;
0072 }
0073 
0074 bool ItemCreateHandler::insertItem(const Protocol::CreateItemCommand &cmd, PimItem &item, const Collection &parentCol)
0075 {
0076     if (!item.datetime().isValid()) {
0077         item.setDatetime(QDateTime::currentDateTimeUtc());
0078     }
0079 
0080     if (!item.insert()) {
0081         return failureResponse(QStringLiteral("Failed to append item"));
0082     }
0083 
0084     // set message flags
0085     const QSet<QByteArray> flags = cmd.mergeModes() == Protocol::CreateItemCommand::None ? cmd.flags() : cmd.addedFlags();
0086     if (!flags.isEmpty()) {
0087         // This will hit an entry in cache inserted there in buildPimItem()
0088         const Flag::List flagList = HandlerHelper::resolveFlags(flags);
0089         bool flagsChanged = false;
0090         if (!storageBackend()->appendItemsFlags({item}, flagList, &flagsChanged, false, parentCol, true)) {
0091             return failureResponse("Unable to append item flags.");
0092         }
0093     }
0094 
0095     const Scope tags = cmd.mergeModes() == Protocol::CreateItemCommand::None ? cmd.tags() : cmd.addedTags();
0096     if (!tags.isEmpty()) {
0097         const Tag::List tagList = HandlerHelper::tagsFromScope(tags, connection()->context());
0098         bool tagsChanged = false;
0099         if (!storageBackend()->appendItemsTags({item}, tagList, &tagsChanged, false, parentCol, true)) {
0100             return failureResponse(QStringLiteral("Unable to append item tags."));
0101         }
0102     }
0103 
0104     // Handle individual parts
0105     qint64 partSizes = 0;
0106     PartStreamer streamer(connection(), item);
0107     const auto parts = cmd.parts();
0108     for (const QByteArray &partName : parts) {
0109         qint64 partSize = 0;
0110         try {
0111             streamer.stream(true, partName, partSize);
0112         } catch (const PartStreamerException &e) {
0113             return failureResponse(e.what());
0114         }
0115         partSizes += partSize;
0116     }
0117     const Protocol::Attributes attrs = cmd.attributes();
0118     for (auto iter = attrs.cbegin(), end = attrs.cend(); iter != end; ++iter) {
0119         try {
0120             streamer.streamAttribute(true, iter.key(), iter.value());
0121         } catch (const PartStreamerException &e) {
0122             return failureResponse(e.what());
0123         }
0124     }
0125 
0126     // TODO: Try to avoid this addition query
0127     if (partSizes > item.size()) {
0128         item.setSize(partSizes);
0129         item.update();
0130     }
0131 
0132     // Preprocessing
0133     if (akonadi().preprocessorManager().isActive()) {
0134         Part hiddenAttribute;
0135         hiddenAttribute.setPimItemId(item.id());
0136         hiddenAttribute.setPartType(PartTypeHelper::fromFqName(QStringLiteral(AKONADI_ATTRIBUTE_HIDDEN)));
0137         hiddenAttribute.setData(QByteArray());
0138         hiddenAttribute.setDatasize(0);
0139         // TODO: Handle errors? Technically, this is not a critical issue as no data are lost
0140         PartHelper::insert(&hiddenAttribute);
0141     }
0142 
0143     const bool seen = flags.contains(AKONADI_FLAG_SEEN) || flags.contains(AKONADI_FLAG_IGNORED);
0144     notify(item, seen, item.collection());
0145     sendResponse(item, Protocol::CreateItemCommand::None);
0146 
0147     return true;
0148 }
0149 
0150 bool ItemCreateHandler::mergeItem(const Protocol::CreateItemCommand &cmd, PimItem &newItem, PimItem &currentItem, const Collection &parentCol)
0151 {
0152     bool needsUpdate = false;
0153     bool ignoreFlagsChanges = false;
0154     QSet<QByteArray> changedParts;
0155 
0156     if (currentItem.atime() > newItem.atime()) {
0157         qCDebug(AKONADISERVER_LOG) << "Akoandi has newer atime of Item " << currentItem.id() << " than the resource (local atime =" << currentItem.atime()
0158                                    << ", remote atime =" << newItem.atime() << "), ignoring flags changes.";
0159         // This handles a race that is rather specific to IMAP: if I change flags in KMail while the folder is syncing, the flags from sync will
0160         // overwrite my local changes.
0161         // Without server-side change recording we don't have any way to know what has really changed locally, so we just assume it's flags and
0162         // we will assume that the flags have not changed on the server as well (and if so, we will consider the local state superior to remote).
0163         ignoreFlagsChanges = true;
0164     }
0165 
0166     if (!newItem.remoteId().isEmpty() && currentItem.remoteId() != newItem.remoteId()) {
0167         currentItem.setRemoteId(newItem.remoteId());
0168         changedParts.insert(AKONADI_PARAM_REMOTEID);
0169         needsUpdate = true;
0170     }
0171     if (!newItem.remoteRevision().isEmpty() && currentItem.remoteRevision() != newItem.remoteRevision()) {
0172         currentItem.setRemoteRevision(newItem.remoteRevision());
0173         changedParts.insert(AKONADI_PARAM_REMOTEREVISION);
0174         needsUpdate = true;
0175     }
0176     if (!newItem.gid().isEmpty() && currentItem.gid() != newItem.gid()) {
0177         currentItem.setGid(newItem.gid());
0178         changedParts.insert(AKONADI_PARAM_GID);
0179         needsUpdate = true;
0180     }
0181     if (newItem.datetime().isValid() && newItem.datetime() != currentItem.datetime()) {
0182         currentItem.setDatetime(newItem.datetime());
0183         needsUpdate = true;
0184     }
0185 
0186     if (newItem.size() > 0 && newItem.size() != currentItem.size()) {
0187         currentItem.setSize(newItem.size());
0188         needsUpdate = true;
0189     }
0190 
0191     const Collection col = Collection::retrieveById(parentCol.id());
0192     if (cmd.flags().isEmpty() && !cmd.flagsOverwritten()) {
0193         bool flagsAdded = false;
0194         bool flagsRemoved = false;
0195         if (!cmd.addedFlags().isEmpty()) {
0196             const auto addedFlags = HandlerHelper::resolveFlags(cmd.addedFlags());
0197             storageBackend()->appendItemsFlags({currentItem}, addedFlags, &flagsAdded, true, col, true);
0198         }
0199         if (!cmd.removedFlags().isEmpty()) {
0200             const auto removedFlags = HandlerHelper::resolveFlags(cmd.removedFlags());
0201             storageBackend()->removeItemsFlags({currentItem}, removedFlags, &flagsRemoved, col, true);
0202         }
0203         if (flagsAdded || flagsRemoved) {
0204             changedParts.insert(AKONADI_PARAM_FLAGS);
0205             needsUpdate = true;
0206         }
0207     } else if (!ignoreFlagsChanges) {
0208         bool flagsChanged = false;
0209         QSet<QByteArray> flagNames = cmd.flags();
0210 
0211         static QList<QByteArray> localFlagsToPreserve = {"$ATTACHMENT", "$INVITATION", "$ENCRYPTED", "$SIGNED", "$WATCHED"};
0212 
0213         // Make sure we don't overwrite some local-only flags that can't come
0214         // through from Resource during ItemSync, like $ATTACHMENT, because the
0215         // resource is not aware of them (they are usually assigned by client
0216         // upon inspecting the payload)
0217         const Flag::List currentFlags = currentItem.flags();
0218         for (const Flag &currentFlag : currentFlags) {
0219             const QByteArray currentFlagName = currentFlag.name().toLatin1();
0220             if (localFlagsToPreserve.contains(currentFlagName)) {
0221                 flagNames.insert(currentFlagName);
0222             }
0223         }
0224         const auto flags = HandlerHelper::resolveFlags(flagNames);
0225         storageBackend()->setItemsFlags({currentItem}, &currentFlags, flags, &flagsChanged, col, true);
0226         if (flagsChanged) {
0227             changedParts.insert(AKONADI_PARAM_FLAGS);
0228             needsUpdate = true;
0229         }
0230     }
0231 
0232     if (cmd.tags().isEmpty()) {
0233         bool tagsAdded = false;
0234         bool tagsRemoved = false;
0235         if (!cmd.addedTags().isEmpty()) {
0236             const auto addedTags = HandlerHelper::tagsFromScope(cmd.addedTags(), connection()->context());
0237             storageBackend()->appendItemsTags({currentItem}, addedTags, &tagsAdded, true, col, true);
0238         }
0239         if (!cmd.removedTags().isEmpty()) {
0240             const Tag::List removedTags = HandlerHelper::tagsFromScope(cmd.removedTags(), connection()->context());
0241             storageBackend()->removeItemsTags({currentItem}, removedTags, &tagsRemoved, true);
0242         }
0243 
0244         if (tagsAdded || tagsRemoved) {
0245             changedParts.insert(AKONADI_PARAM_TAGS);
0246             needsUpdate = true;
0247         }
0248     } else {
0249         bool tagsChanged = false;
0250         const auto tags = HandlerHelper::tagsFromScope(cmd.tags(), connection()->context());
0251         storageBackend()->setItemsTags({currentItem}, tags, &tagsChanged, true);
0252         if (tagsChanged) {
0253             changedParts.insert(AKONADI_PARAM_TAGS);
0254             needsUpdate = true;
0255         }
0256     }
0257 
0258     const Part::List existingParts = Part::retrieveFiltered(Part::pimItemIdColumn(), currentItem.id());
0259     QMap<QByteArray, qint64> partsSizes;
0260     for (const Part &part : existingParts) {
0261         partsSizes.insert(PartTypeHelper::fullName(part.partType()).toLatin1(), part.datasize());
0262     }
0263 
0264     PartStreamer streamer(connection(), currentItem);
0265     const auto partNames = cmd.parts();
0266     for (const QByteArray &partName : partNames) {
0267         bool changed = false;
0268         qint64 partSize = 0;
0269         try {
0270             streamer.stream(true, partName, partSize, &changed);
0271         } catch (const PartStreamerException &e) {
0272             return failureResponse(e.what());
0273         }
0274 
0275         if (changed) {
0276             changedParts.insert(partName);
0277             partsSizes.insert(partName, partSize);
0278             needsUpdate = true;
0279         }
0280     }
0281 
0282     const qint64 size = std::accumulate(partsSizes.begin(), partsSizes.end(), 0LL);
0283     if (size > currentItem.size()) {
0284         currentItem.setSize(size);
0285         needsUpdate = true;
0286     }
0287 
0288     if (needsUpdate) {
0289         currentItem.setRev(qMax(newItem.rev(), currentItem.rev()) + 1);
0290         currentItem.setAtime(QDateTime::currentDateTimeUtc());
0291         // Only mark dirty when merged from application
0292         currentItem.setDirty(!connection()->context().resource().isValid());
0293 
0294         // Store all changes
0295         if (!currentItem.update()) {
0296             return failureResponse("Failed to store merged item");
0297         }
0298 
0299         notify(currentItem, currentItem.collection(), changedParts);
0300     }
0301 
0302     sendResponse(currentItem, cmd.mergeModes());
0303 
0304     return true;
0305 }
0306 
0307 bool ItemCreateHandler::sendResponse(const PimItem &item, Protocol::CreateItemCommand::MergeModes mergeModes)
0308 {
0309     if (mergeModes & Protocol::CreateItemCommand::Silent || mergeModes & Protocol::CreateItemCommand::None) {
0310         Protocol::FetchItemsResponse resp;
0311         resp.setId(item.id());
0312         resp.setMTime(item.datetime());
0313         Handler::sendResponse(std::move(resp));
0314         return true;
0315     }
0316 
0317     Protocol::ItemFetchScope fetchScope;
0318     fetchScope.setAncestorDepth(Protocol::ItemFetchScope::ParentAncestor);
0319     fetchScope.setFetch(Protocol::ItemFetchScope::AllAttributes | Protocol::ItemFetchScope::FullPayload | Protocol::ItemFetchScope::CacheOnly
0320                         | Protocol::ItemFetchScope::Flags | Protocol::ItemFetchScope::GID | Protocol::ItemFetchScope::MTime | Protocol::ItemFetchScope::RemoteID
0321                         | Protocol::ItemFetchScope::RemoteRevision | Protocol::ItemFetchScope::Size | Protocol::ItemFetchScope::Tags);
0322     ImapSet set;
0323     set.add(QList<qint64>() << item.id());
0324     Scope scope;
0325     scope.setUidSet(set);
0326 
0327     ItemFetchHelper fetchHelper(connection(), scope, fetchScope, Protocol::TagFetchScope{}, akonadi());
0328     if (!fetchHelper.fetchItems()) {
0329         return failureResponse("Failed to retrieve item");
0330     }
0331 
0332     return true;
0333 }
0334 
0335 bool ItemCreateHandler::notify(const PimItem &item, bool seen, const Collection &collection)
0336 {
0337     storageBackend()->notificationCollector()->itemAdded(item, seen, collection);
0338 
0339     if (akonadi().preprocessorManager().isActive()) {
0340         // enqueue the item for preprocessing
0341         akonadi().preprocessorManager().beginHandleItem(item, storageBackend());
0342     }
0343     return true;
0344 }
0345 
0346 bool ItemCreateHandler::notify(const PimItem &item, const Collection &collection, const QSet<QByteArray> &changedParts)
0347 {
0348     if (!changedParts.isEmpty()) {
0349         storageBackend()->notificationCollector()->itemChanged(item, changedParts, collection);
0350     }
0351     return true;
0352 }
0353 
0354 void ItemCreateHandler::recoverFromMultipleMergeCandidates(const PimItem::List &items, const Collection &collection)
0355 {
0356     // HACK HACK HACK: When this happens within ItemSync, we are running inside a client-side
0357     // transaction, so just calling commit here won't have any effect, since this handler will
0358     // ultimately fail and the client will rollback the transaction. To circumvent this, we
0359     // will forcibly commit the transaction, do our changes here within a new transaction and
0360     // then we open a new transaction so that the client won't notice.
0361 
0362     int transactionDepth = 0;
0363     while (storageBackend()->inTransaction()) {
0364         ++transactionDepth;
0365         storageBackend()->commitTransaction();
0366     }
0367     const AkScopeGuard restoreTransaction([&]() {
0368         for (int i = 0; i < transactionDepth; ++i) {
0369             storageBackend()->beginTransaction(QStringLiteral("RestoredTransactionAfterMMCRecovery"));
0370         }
0371     });
0372 
0373     Transaction transaction(storageBackend(), QStringLiteral("MMC Recovery Transaction"));
0374 
0375     // If any of the conflicting items is dirty or does not have a remote ID, we don't want to remove
0376     // them as it would cause data loss. There's a chance next changeReplay will fix this, so
0377     // next time the ItemSync hits this multiple merge candidates, all changes will be committed
0378     // and this check will succeed
0379     if (items | Actions::any([](const auto &item) {
0380             return item.dirty() || item.remoteId().isEmpty();
0381         })) {
0382         qCWarning(AKONADISERVER_LOG) << "Automatic multiple merge candidates recovery failed: at least one of the candidates has uncommitted changes!";
0383         return;
0384     }
0385 
0386     // This cannot happen with ItemSync, but in theory could happen during individual GID merge.
0387     if (items | Actions::any([collection](const auto &item) {
0388             return item.collectionId() != collection.id();
0389         })) {
0390         qCWarning(AKONADISERVER_LOG) << "Automatic multiple merge candidates recovery failed: all candidates do not belong to the same collection.";
0391         return;
0392     }
0393 
0394     storageBackend()->cleanupPimItems(items, DataStore::Silent);
0395     if (!transaction.commit()) {
0396         qCWarning(AKONADISERVER_LOG) << "Automatic multiple merge candidates recovery failed: failed to commit database transaction.";
0397         return;
0398     }
0399 
0400     // Schedule a new sync of the collection, one that will succeed
0401     akonadi().itemRetrievalManager().triggerCollectionSync(collection.resource().name(), collection.id());
0402 
0403     qCInfo(AKONADISERVER_LOG) << "Automatic multiple merge candidates recovery successful: conflicting items"
0404                               << (items | Views::transform([](const auto &i) {
0405                                       return i.id();
0406                                   })
0407                                   | Actions::toQVector)
0408                               << "in collection" << collection.name() << "(ID:" << collection.id()
0409                               << ") were removed and a new sync was scheduled in the resource" << collection.resource().name();
0410 }
0411 
0412 bool ItemCreateHandler::parseStream()
0413 {
0414     const auto &cmd = Protocol::cmdCast<Protocol::CreateItemCommand>(m_command);
0415 
0416     // FIXME: The streaming/reading of all item parts can hold the transaction for
0417     // unnecessary long time -> should we wrap the PimItem into one transaction
0418     // and try to insert Parts independently? In case we fail to insert a part,
0419     // it's not a problem as it can be re-fetched at any time, except for attributes.
0420     Transaction transaction(storageBackend(), QStringLiteral("ItemCreateHandler"));
0421     ExternalPartStorageTransaction storageTrx;
0422 
0423     PimItem item;
0424     Collection parentCol;
0425     if (!buildPimItem(cmd, item, parentCol)) {
0426         return false;
0427     }
0428 
0429     if ((cmd.mergeModes() & ~Protocol::CreateItemCommand::Silent) == 0) {
0430         if (!insertItem(cmd, item, parentCol)) {
0431             return false;
0432         }
0433         if (!transaction.commit()) {
0434             return failureResponse(QStringLiteral("Failed to commit transaction"));
0435         }
0436         storageTrx.commit();
0437     } else {
0438         // Merging is always restricted to the same collection
0439         SelectQueryBuilder<PimItem> qb;
0440         qb.setForUpdate();
0441         qb.addValueCondition(PimItem::collectionIdColumn(), Query::Equals, parentCol.id());
0442         Query::Condition rootCondition(Query::Or);
0443 
0444         Query::Condition mergeCondition(Query::And);
0445         if (cmd.mergeModes() & Protocol::CreateItemCommand::GID) {
0446             mergeCondition.addValueCondition(PimItem::gidColumn(), Query::Equals, item.gid());
0447         }
0448         if (cmd.mergeModes() & Protocol::CreateItemCommand::RemoteID) {
0449             mergeCondition.addValueCondition(PimItem::remoteIdColumn(), Query::Equals, item.remoteId());
0450         }
0451         rootCondition.addCondition(mergeCondition);
0452 
0453         // If an Item with matching RID but empty GID exists during GID merge,
0454         // merge into this item instead of creating a new one
0455         if (cmd.mergeModes() & Protocol::CreateItemCommand::GID && !item.remoteId().isEmpty()) {
0456             mergeCondition = Query::Condition(Query::And);
0457             mergeCondition.addValueCondition(PimItem::remoteIdColumn(), Query::Equals, item.remoteId());
0458             mergeCondition.addValueCondition(PimItem::gidColumn(), Query::Equals, QLatin1StringView(""));
0459             rootCondition.addCondition(mergeCondition);
0460         }
0461         qb.addCondition(rootCondition);
0462 
0463         if (!qb.exec()) {
0464             return failureResponse("Failed to query database for item");
0465         }
0466 
0467         const QList<PimItem> result = qb.result();
0468         if (result.isEmpty()) {
0469             // No item with such GID/RID exists, so call ItemCreateHandler::insert() and behave
0470             // like if this was a new item
0471             if (!insertItem(cmd, item, parentCol)) {
0472                 return false;
0473             }
0474             if (!transaction.commit()) {
0475                 return failureResponse("Failed to commit transaction");
0476             }
0477             storageTrx.commit();
0478 
0479         } else if (result.count() == 1) {
0480             // Item with matching GID/RID combination exists, so merge this item into it
0481             // and send itemChanged()
0482             PimItem existingItem = result.at(0);
0483 
0484             if (!mergeItem(cmd, item, existingItem, parentCol)) {
0485                 return false;
0486             }
0487             if (!transaction.commit()) {
0488                 return failureResponse("Failed to commit transaction");
0489             }
0490             storageTrx.commit();
0491         } else {
0492             qCWarning(AKONADISERVER_LOG) << "Multiple merge candidates, will attempt to recover:";
0493             for (const PimItem &item : result) {
0494                 qCWarning(AKONADISERVER_LOG) << "\tID:" << item.id() << ", RID:" << item.remoteId() << ", GID:" << item.gid()
0495                                              << ", Collection:" << item.collection().name() << "(" << item.collectionId() << ")"
0496                                              << ", Resource:" << item.collection().resource().name() << "(" << item.collection().resourceId() << ")";
0497             }
0498 
0499             transaction.commit(); // commit the current transaction, before we attempt MMC recovery
0500             recoverFromMultipleMergeCandidates(result, parentCol);
0501 
0502             // Even if the recovery was successful, indicate error to force the client to abort the
0503             // sync, since we've interfered with the overall state.
0504             return failureResponse(QStringLiteral("Multiple merge candidates in collection '%1', aborting").arg(item.collection().name()));
0505         }
0506     }
0507 
0508     return successResponse<Protocol::CreateItemResponse>();
0509 }