File indexing completed on 2024-06-23 05:07:01
0001 /*************************************************************************** 0002 * SPDX-FileCopyrightText: 2007 Robert Zwerus <arzie@dds.nl> * 0003 * * 0004 * SPDX-License-Identifier: LGPL-2.0-or-later * 0005 ***************************************************************************/ 0006 0007 #include "itemcreatehandler.h" 0008 0009 #include "akonadi.h" 0010 #include "connection.h" 0011 #include "handlerhelper.h" 0012 #include "itemfetchhelper.h" 0013 #include "preprocessormanager.h" 0014 #include "private/externalpartstorage_p.h" 0015 #include "storage/datastore.h" 0016 #include "storage/dbconfig.h" 0017 #include "storage/itemretrievalmanager.h" 0018 #include "storage/parthelper.h" 0019 #include "storage/partstreamer.h" 0020 #include "storage/parttypehelper.h" 0021 #include "storage/selectquerybuilder.h" 0022 #include "storage/transaction.h" 0023 0024 #include "shared/akranges.h" 0025 #include "shared/akscopeguard.h" 0026 0027 #include <numeric> //std::accumulate 0028 0029 using namespace Akonadi; 0030 using namespace Akonadi::Server; 0031 using namespace AkRanges; 0032 0033 ItemCreateHandler::ItemCreateHandler(AkonadiServer &akonadi) 0034 : Handler(akonadi) 0035 { 0036 } 0037 0038 bool ItemCreateHandler::buildPimItem(const Protocol::CreateItemCommand &cmd, PimItem &item, Collection &parentCol) 0039 { 0040 parentCol = HandlerHelper::collectionFromScope(cmd.collection(), connection()->context()); 0041 if (!parentCol.isValid()) { 0042 return failureResponse(QStringLiteral("Invalid parent collection")); 0043 } 0044 if (parentCol.isVirtual()) { 0045 return failureResponse(QStringLiteral("Cannot append item into virtual collection")); 0046 } 0047 0048 MimeType mimeType = MimeType::retrieveByNameOrCreate(cmd.mimeType()); 0049 if (!mimeType.isValid()) { 0050 return failureResponse(QStringLiteral("Unable to create mimetype '") % cmd.mimeType() % QStringLiteral("'.")); 0051 } 0052 0053 item.setRev(0); 0054 item.setSize(cmd.itemSize()); 0055 item.setMimeTypeId(mimeType.id()); 0056 item.setCollectionId(parentCol.id()); 0057 item.setDatetime(cmd.dateTime()); 0058 if (cmd.remoteId().isEmpty()) { 0059 // from application 0060 item.setDirty(true); 0061 } else { 0062 // from resource 0063 item.setRemoteId(cmd.remoteId()); 0064 item.setDirty(false); 0065 } 0066 item.setRemoteRevision(cmd.remoteRevision()); 0067 item.setGid(cmd.gid()); 0068 0069 item.setAtime(cmd.modificationTime().isValid() ? cmd.modificationTime() : QDateTime::currentDateTimeUtc()); 0070 0071 return true; 0072 } 0073 0074 bool ItemCreateHandler::insertItem(const Protocol::CreateItemCommand &cmd, PimItem &item, const Collection &parentCol) 0075 { 0076 if (!item.datetime().isValid()) { 0077 item.setDatetime(QDateTime::currentDateTimeUtc()); 0078 } 0079 0080 if (!item.insert()) { 0081 return failureResponse(QStringLiteral("Failed to append item")); 0082 } 0083 0084 // set message flags 0085 const QSet<QByteArray> flags = cmd.mergeModes() == Protocol::CreateItemCommand::None ? cmd.flags() : cmd.addedFlags(); 0086 if (!flags.isEmpty()) { 0087 // This will hit an entry in cache inserted there in buildPimItem() 0088 const Flag::List flagList = HandlerHelper::resolveFlags(flags); 0089 bool flagsChanged = false; 0090 if (!storageBackend()->appendItemsFlags({item}, flagList, &flagsChanged, false, parentCol, true)) { 0091 return failureResponse("Unable to append item flags."); 0092 } 0093 } 0094 0095 const Scope tags = cmd.mergeModes() == Protocol::CreateItemCommand::None ? cmd.tags() : cmd.addedTags(); 0096 if (!tags.isEmpty()) { 0097 const Tag::List tagList = HandlerHelper::tagsFromScope(tags, connection()->context()); 0098 bool tagsChanged = false; 0099 if (!storageBackend()->appendItemsTags({item}, tagList, &tagsChanged, false, parentCol, true)) { 0100 return failureResponse(QStringLiteral("Unable to append item tags.")); 0101 } 0102 } 0103 0104 // Handle individual parts 0105 qint64 partSizes = 0; 0106 PartStreamer streamer(connection(), item); 0107 const auto parts = cmd.parts(); 0108 for (const QByteArray &partName : parts) { 0109 qint64 partSize = 0; 0110 try { 0111 streamer.stream(true, partName, partSize); 0112 } catch (const PartStreamerException &e) { 0113 return failureResponse(e.what()); 0114 } 0115 partSizes += partSize; 0116 } 0117 const Protocol::Attributes attrs = cmd.attributes(); 0118 for (auto iter = attrs.cbegin(), end = attrs.cend(); iter != end; ++iter) { 0119 try { 0120 streamer.streamAttribute(true, iter.key(), iter.value()); 0121 } catch (const PartStreamerException &e) { 0122 return failureResponse(e.what()); 0123 } 0124 } 0125 0126 // TODO: Try to avoid this addition query 0127 if (partSizes > item.size()) { 0128 item.setSize(partSizes); 0129 item.update(); 0130 } 0131 0132 // Preprocessing 0133 if (akonadi().preprocessorManager().isActive()) { 0134 Part hiddenAttribute; 0135 hiddenAttribute.setPimItemId(item.id()); 0136 hiddenAttribute.setPartType(PartTypeHelper::fromFqName(QStringLiteral(AKONADI_ATTRIBUTE_HIDDEN))); 0137 hiddenAttribute.setData(QByteArray()); 0138 hiddenAttribute.setDatasize(0); 0139 // TODO: Handle errors? Technically, this is not a critical issue as no data are lost 0140 PartHelper::insert(&hiddenAttribute); 0141 } 0142 0143 const bool seen = flags.contains(AKONADI_FLAG_SEEN) || flags.contains(AKONADI_FLAG_IGNORED); 0144 notify(item, seen, item.collection()); 0145 sendResponse(item, Protocol::CreateItemCommand::None); 0146 0147 return true; 0148 } 0149 0150 bool ItemCreateHandler::mergeItem(const Protocol::CreateItemCommand &cmd, PimItem &newItem, PimItem ¤tItem, const Collection &parentCol) 0151 { 0152 bool needsUpdate = false; 0153 bool ignoreFlagsChanges = false; 0154 QSet<QByteArray> changedParts; 0155 0156 if (currentItem.atime() > newItem.atime()) { 0157 qCDebug(AKONADISERVER_LOG) << "Akoandi has newer atime of Item " << currentItem.id() << " than the resource (local atime =" << currentItem.atime() 0158 << ", remote atime =" << newItem.atime() << "), ignoring flags changes."; 0159 // This handles a race that is rather specific to IMAP: if I change flags in KMail while the folder is syncing, the flags from sync will 0160 // overwrite my local changes. 0161 // Without server-side change recording we don't have any way to know what has really changed locally, so we just assume it's flags and 0162 // we will assume that the flags have not changed on the server as well (and if so, we will consider the local state superior to remote). 0163 ignoreFlagsChanges = true; 0164 } 0165 0166 if (!newItem.remoteId().isEmpty() && currentItem.remoteId() != newItem.remoteId()) { 0167 currentItem.setRemoteId(newItem.remoteId()); 0168 changedParts.insert(AKONADI_PARAM_REMOTEID); 0169 needsUpdate = true; 0170 } 0171 if (!newItem.remoteRevision().isEmpty() && currentItem.remoteRevision() != newItem.remoteRevision()) { 0172 currentItem.setRemoteRevision(newItem.remoteRevision()); 0173 changedParts.insert(AKONADI_PARAM_REMOTEREVISION); 0174 needsUpdate = true; 0175 } 0176 if (!newItem.gid().isEmpty() && currentItem.gid() != newItem.gid()) { 0177 currentItem.setGid(newItem.gid()); 0178 changedParts.insert(AKONADI_PARAM_GID); 0179 needsUpdate = true; 0180 } 0181 if (newItem.datetime().isValid() && newItem.datetime() != currentItem.datetime()) { 0182 currentItem.setDatetime(newItem.datetime()); 0183 needsUpdate = true; 0184 } 0185 0186 if (newItem.size() > 0 && newItem.size() != currentItem.size()) { 0187 currentItem.setSize(newItem.size()); 0188 needsUpdate = true; 0189 } 0190 0191 const Collection col = Collection::retrieveById(parentCol.id()); 0192 if (cmd.flags().isEmpty() && !cmd.flagsOverwritten()) { 0193 bool flagsAdded = false; 0194 bool flagsRemoved = false; 0195 if (!cmd.addedFlags().isEmpty()) { 0196 const auto addedFlags = HandlerHelper::resolveFlags(cmd.addedFlags()); 0197 storageBackend()->appendItemsFlags({currentItem}, addedFlags, &flagsAdded, true, col, true); 0198 } 0199 if (!cmd.removedFlags().isEmpty()) { 0200 const auto removedFlags = HandlerHelper::resolveFlags(cmd.removedFlags()); 0201 storageBackend()->removeItemsFlags({currentItem}, removedFlags, &flagsRemoved, col, true); 0202 } 0203 if (flagsAdded || flagsRemoved) { 0204 changedParts.insert(AKONADI_PARAM_FLAGS); 0205 needsUpdate = true; 0206 } 0207 } else if (!ignoreFlagsChanges) { 0208 bool flagsChanged = false; 0209 QSet<QByteArray> flagNames = cmd.flags(); 0210 0211 static QList<QByteArray> localFlagsToPreserve = {"$ATTACHMENT", "$INVITATION", "$ENCRYPTED", "$SIGNED", "$WATCHED"}; 0212 0213 // Make sure we don't overwrite some local-only flags that can't come 0214 // through from Resource during ItemSync, like $ATTACHMENT, because the 0215 // resource is not aware of them (they are usually assigned by client 0216 // upon inspecting the payload) 0217 const Flag::List currentFlags = currentItem.flags(); 0218 for (const Flag ¤tFlag : currentFlags) { 0219 const QByteArray currentFlagName = currentFlag.name().toLatin1(); 0220 if (localFlagsToPreserve.contains(currentFlagName)) { 0221 flagNames.insert(currentFlagName); 0222 } 0223 } 0224 const auto flags = HandlerHelper::resolveFlags(flagNames); 0225 storageBackend()->setItemsFlags({currentItem}, ¤tFlags, flags, &flagsChanged, col, true); 0226 if (flagsChanged) { 0227 changedParts.insert(AKONADI_PARAM_FLAGS); 0228 needsUpdate = true; 0229 } 0230 } 0231 0232 if (cmd.tags().isEmpty()) { 0233 bool tagsAdded = false; 0234 bool tagsRemoved = false; 0235 if (!cmd.addedTags().isEmpty()) { 0236 const auto addedTags = HandlerHelper::tagsFromScope(cmd.addedTags(), connection()->context()); 0237 storageBackend()->appendItemsTags({currentItem}, addedTags, &tagsAdded, true, col, true); 0238 } 0239 if (!cmd.removedTags().isEmpty()) { 0240 const Tag::List removedTags = HandlerHelper::tagsFromScope(cmd.removedTags(), connection()->context()); 0241 storageBackend()->removeItemsTags({currentItem}, removedTags, &tagsRemoved, true); 0242 } 0243 0244 if (tagsAdded || tagsRemoved) { 0245 changedParts.insert(AKONADI_PARAM_TAGS); 0246 needsUpdate = true; 0247 } 0248 } else { 0249 bool tagsChanged = false; 0250 const auto tags = HandlerHelper::tagsFromScope(cmd.tags(), connection()->context()); 0251 storageBackend()->setItemsTags({currentItem}, tags, &tagsChanged, true); 0252 if (tagsChanged) { 0253 changedParts.insert(AKONADI_PARAM_TAGS); 0254 needsUpdate = true; 0255 } 0256 } 0257 0258 const Part::List existingParts = Part::retrieveFiltered(Part::pimItemIdColumn(), currentItem.id()); 0259 QMap<QByteArray, qint64> partsSizes; 0260 for (const Part &part : existingParts) { 0261 partsSizes.insert(PartTypeHelper::fullName(part.partType()).toLatin1(), part.datasize()); 0262 } 0263 0264 PartStreamer streamer(connection(), currentItem); 0265 const auto partNames = cmd.parts(); 0266 for (const QByteArray &partName : partNames) { 0267 bool changed = false; 0268 qint64 partSize = 0; 0269 try { 0270 streamer.stream(true, partName, partSize, &changed); 0271 } catch (const PartStreamerException &e) { 0272 return failureResponse(e.what()); 0273 } 0274 0275 if (changed) { 0276 changedParts.insert(partName); 0277 partsSizes.insert(partName, partSize); 0278 needsUpdate = true; 0279 } 0280 } 0281 0282 const qint64 size = std::accumulate(partsSizes.begin(), partsSizes.end(), 0LL); 0283 if (size > currentItem.size()) { 0284 currentItem.setSize(size); 0285 needsUpdate = true; 0286 } 0287 0288 if (needsUpdate) { 0289 currentItem.setRev(qMax(newItem.rev(), currentItem.rev()) + 1); 0290 currentItem.setAtime(QDateTime::currentDateTimeUtc()); 0291 // Only mark dirty when merged from application 0292 currentItem.setDirty(!connection()->context().resource().isValid()); 0293 0294 // Store all changes 0295 if (!currentItem.update()) { 0296 return failureResponse("Failed to store merged item"); 0297 } 0298 0299 notify(currentItem, currentItem.collection(), changedParts); 0300 } 0301 0302 sendResponse(currentItem, cmd.mergeModes()); 0303 0304 return true; 0305 } 0306 0307 bool ItemCreateHandler::sendResponse(const PimItem &item, Protocol::CreateItemCommand::MergeModes mergeModes) 0308 { 0309 if (mergeModes & Protocol::CreateItemCommand::Silent || mergeModes & Protocol::CreateItemCommand::None) { 0310 Protocol::FetchItemsResponse resp; 0311 resp.setId(item.id()); 0312 resp.setMTime(item.datetime()); 0313 Handler::sendResponse(std::move(resp)); 0314 return true; 0315 } 0316 0317 Protocol::ItemFetchScope fetchScope; 0318 fetchScope.setAncestorDepth(Protocol::ItemFetchScope::ParentAncestor); 0319 fetchScope.setFetch(Protocol::ItemFetchScope::AllAttributes | Protocol::ItemFetchScope::FullPayload | Protocol::ItemFetchScope::CacheOnly 0320 | Protocol::ItemFetchScope::Flags | Protocol::ItemFetchScope::GID | Protocol::ItemFetchScope::MTime | Protocol::ItemFetchScope::RemoteID 0321 | Protocol::ItemFetchScope::RemoteRevision | Protocol::ItemFetchScope::Size | Protocol::ItemFetchScope::Tags); 0322 ImapSet set; 0323 set.add(QList<qint64>() << item.id()); 0324 Scope scope; 0325 scope.setUidSet(set); 0326 0327 ItemFetchHelper fetchHelper(connection(), scope, fetchScope, Protocol::TagFetchScope{}, akonadi()); 0328 if (!fetchHelper.fetchItems()) { 0329 return failureResponse("Failed to retrieve item"); 0330 } 0331 0332 return true; 0333 } 0334 0335 bool ItemCreateHandler::notify(const PimItem &item, bool seen, const Collection &collection) 0336 { 0337 storageBackend()->notificationCollector()->itemAdded(item, seen, collection); 0338 0339 if (akonadi().preprocessorManager().isActive()) { 0340 // enqueue the item for preprocessing 0341 akonadi().preprocessorManager().beginHandleItem(item, storageBackend()); 0342 } 0343 return true; 0344 } 0345 0346 bool ItemCreateHandler::notify(const PimItem &item, const Collection &collection, const QSet<QByteArray> &changedParts) 0347 { 0348 if (!changedParts.isEmpty()) { 0349 storageBackend()->notificationCollector()->itemChanged(item, changedParts, collection); 0350 } 0351 return true; 0352 } 0353 0354 void ItemCreateHandler::recoverFromMultipleMergeCandidates(const PimItem::List &items, const Collection &collection) 0355 { 0356 // HACK HACK HACK: When this happens within ItemSync, we are running inside a client-side 0357 // transaction, so just calling commit here won't have any effect, since this handler will 0358 // ultimately fail and the client will rollback the transaction. To circumvent this, we 0359 // will forcibly commit the transaction, do our changes here within a new transaction and 0360 // then we open a new transaction so that the client won't notice. 0361 0362 int transactionDepth = 0; 0363 while (storageBackend()->inTransaction()) { 0364 ++transactionDepth; 0365 storageBackend()->commitTransaction(); 0366 } 0367 const AkScopeGuard restoreTransaction([&]() { 0368 for (int i = 0; i < transactionDepth; ++i) { 0369 storageBackend()->beginTransaction(QStringLiteral("RestoredTransactionAfterMMCRecovery")); 0370 } 0371 }); 0372 0373 Transaction transaction(storageBackend(), QStringLiteral("MMC Recovery Transaction")); 0374 0375 // If any of the conflicting items is dirty or does not have a remote ID, we don't want to remove 0376 // them as it would cause data loss. There's a chance next changeReplay will fix this, so 0377 // next time the ItemSync hits this multiple merge candidates, all changes will be committed 0378 // and this check will succeed 0379 if (items | Actions::any([](const auto &item) { 0380 return item.dirty() || item.remoteId().isEmpty(); 0381 })) { 0382 qCWarning(AKONADISERVER_LOG) << "Automatic multiple merge candidates recovery failed: at least one of the candidates has uncommitted changes!"; 0383 return; 0384 } 0385 0386 // This cannot happen with ItemSync, but in theory could happen during individual GID merge. 0387 if (items | Actions::any([collection](const auto &item) { 0388 return item.collectionId() != collection.id(); 0389 })) { 0390 qCWarning(AKONADISERVER_LOG) << "Automatic multiple merge candidates recovery failed: all candidates do not belong to the same collection."; 0391 return; 0392 } 0393 0394 storageBackend()->cleanupPimItems(items, DataStore::Silent); 0395 if (!transaction.commit()) { 0396 qCWarning(AKONADISERVER_LOG) << "Automatic multiple merge candidates recovery failed: failed to commit database transaction."; 0397 return; 0398 } 0399 0400 // Schedule a new sync of the collection, one that will succeed 0401 akonadi().itemRetrievalManager().triggerCollectionSync(collection.resource().name(), collection.id()); 0402 0403 qCInfo(AKONADISERVER_LOG) << "Automatic multiple merge candidates recovery successful: conflicting items" 0404 << (items | Views::transform([](const auto &i) { 0405 return i.id(); 0406 }) 0407 | Actions::toQVector) 0408 << "in collection" << collection.name() << "(ID:" << collection.id() 0409 << ") were removed and a new sync was scheduled in the resource" << collection.resource().name(); 0410 } 0411 0412 bool ItemCreateHandler::parseStream() 0413 { 0414 const auto &cmd = Protocol::cmdCast<Protocol::CreateItemCommand>(m_command); 0415 0416 // FIXME: The streaming/reading of all item parts can hold the transaction for 0417 // unnecessary long time -> should we wrap the PimItem into one transaction 0418 // and try to insert Parts independently? In case we fail to insert a part, 0419 // it's not a problem as it can be re-fetched at any time, except for attributes. 0420 Transaction transaction(storageBackend(), QStringLiteral("ItemCreateHandler")); 0421 ExternalPartStorageTransaction storageTrx; 0422 0423 PimItem item; 0424 Collection parentCol; 0425 if (!buildPimItem(cmd, item, parentCol)) { 0426 return false; 0427 } 0428 0429 if ((cmd.mergeModes() & ~Protocol::CreateItemCommand::Silent) == 0) { 0430 if (!insertItem(cmd, item, parentCol)) { 0431 return false; 0432 } 0433 if (!transaction.commit()) { 0434 return failureResponse(QStringLiteral("Failed to commit transaction")); 0435 } 0436 storageTrx.commit(); 0437 } else { 0438 // Merging is always restricted to the same collection 0439 SelectQueryBuilder<PimItem> qb; 0440 qb.setForUpdate(); 0441 qb.addValueCondition(PimItem::collectionIdColumn(), Query::Equals, parentCol.id()); 0442 Query::Condition rootCondition(Query::Or); 0443 0444 Query::Condition mergeCondition(Query::And); 0445 if (cmd.mergeModes() & Protocol::CreateItemCommand::GID) { 0446 mergeCondition.addValueCondition(PimItem::gidColumn(), Query::Equals, item.gid()); 0447 } 0448 if (cmd.mergeModes() & Protocol::CreateItemCommand::RemoteID) { 0449 mergeCondition.addValueCondition(PimItem::remoteIdColumn(), Query::Equals, item.remoteId()); 0450 } 0451 rootCondition.addCondition(mergeCondition); 0452 0453 // If an Item with matching RID but empty GID exists during GID merge, 0454 // merge into this item instead of creating a new one 0455 if (cmd.mergeModes() & Protocol::CreateItemCommand::GID && !item.remoteId().isEmpty()) { 0456 mergeCondition = Query::Condition(Query::And); 0457 mergeCondition.addValueCondition(PimItem::remoteIdColumn(), Query::Equals, item.remoteId()); 0458 mergeCondition.addValueCondition(PimItem::gidColumn(), Query::Equals, QLatin1StringView("")); 0459 rootCondition.addCondition(mergeCondition); 0460 } 0461 qb.addCondition(rootCondition); 0462 0463 if (!qb.exec()) { 0464 return failureResponse("Failed to query database for item"); 0465 } 0466 0467 const QList<PimItem> result = qb.result(); 0468 if (result.isEmpty()) { 0469 // No item with such GID/RID exists, so call ItemCreateHandler::insert() and behave 0470 // like if this was a new item 0471 if (!insertItem(cmd, item, parentCol)) { 0472 return false; 0473 } 0474 if (!transaction.commit()) { 0475 return failureResponse("Failed to commit transaction"); 0476 } 0477 storageTrx.commit(); 0478 0479 } else if (result.count() == 1) { 0480 // Item with matching GID/RID combination exists, so merge this item into it 0481 // and send itemChanged() 0482 PimItem existingItem = result.at(0); 0483 0484 if (!mergeItem(cmd, item, existingItem, parentCol)) { 0485 return false; 0486 } 0487 if (!transaction.commit()) { 0488 return failureResponse("Failed to commit transaction"); 0489 } 0490 storageTrx.commit(); 0491 } else { 0492 qCWarning(AKONADISERVER_LOG) << "Multiple merge candidates, will attempt to recover:"; 0493 for (const PimItem &item : result) { 0494 qCWarning(AKONADISERVER_LOG) << "\tID:" << item.id() << ", RID:" << item.remoteId() << ", GID:" << item.gid() 0495 << ", Collection:" << item.collection().name() << "(" << item.collectionId() << ")" 0496 << ", Resource:" << item.collection().resource().name() << "(" << item.collection().resourceId() << ")"; 0497 } 0498 0499 transaction.commit(); // commit the current transaction, before we attempt MMC recovery 0500 recoverFromMultipleMergeCandidates(result, parentCol); 0501 0502 // Even if the recovery was successful, indicate error to force the client to abort the 0503 // sync, since we've interfered with the overall state. 0504 return failureResponse(QStringLiteral("Multiple merge candidates in collection '%1', aborting").arg(item.collection().name())); 0505 } 0506 } 0507 0508 return successResponse<Protocol::CreateItemResponse>(); 0509 }