File indexing completed on 2025-01-05 04:47:00
0001 /* 0002 * SPDX-FileCopyrightText: 2014 Daniel Vrátil <dvratil@redhat.com> 0003 * 0004 * SPDX-License-Identifier: LGPL-2.1-or-later 0005 * 0006 */ 0007 0008 #include "partstreamer.h" 0009 #include "akonadiserver_debug.h" 0010 #include "capabilities_p.h" 0011 #include "connection.h" 0012 #include "dbconfig.h" 0013 #include "parthelper.h" 0014 #include "parttypehelper.h" 0015 #include "selectquerybuilder.h" 0016 0017 #include "private/externalpartstorage_p.h" 0018 #include "private/protocol_p.h" 0019 #include "private/standarddirs_p.h" 0020 0021 #include <config-akonadi.h> 0022 #if HAVE_UNISTD_H 0023 #include <unistd.h> 0024 #endif 0025 0026 #include <QFile> 0027 #include <QFileInfo> 0028 0029 using namespace Akonadi; 0030 using namespace Akonadi::Server; 0031 0032 PartStreamer::PartStreamer(Connection *connection, const PimItem &pimItem) 0033 : mConnection(connection) 0034 , mItem(pimItem) 0035 { 0036 // Make sure the file_db_data path exists 0037 StandardDirs::saveDir("data", QStringLiteral("file_db_data")); 0038 } 0039 0040 PartStreamer::~PartStreamer() 0041 { 0042 } 0043 0044 Protocol::PartMetaData PartStreamer::requestPartMetaData(const QByteArray &partName) 0045 { 0046 { 0047 Protocol::StreamPayloadCommand resp; 0048 resp.setPayloadName(partName); 0049 resp.setRequest(Protocol::StreamPayloadCommand::MetaData); 0050 mConnection->sendResponse(std::move(resp)); 0051 } 0052 0053 const auto cmd = mConnection->readCommand(); 0054 if (!cmd->isValid() || Protocol::cmdCast<Protocol::Response>(cmd).isError()) { 0055 throw PartStreamerException("Client failed to provide part metadata."); 0056 } 0057 0058 return Protocol::cmdCast<Protocol::StreamPayloadResponse>(cmd).metaData(); 0059 } 0060 0061 void PartStreamer::streamPayload(Part &part, const QByteArray &partName) 0062 { 0063 Protocol::PartMetaData metaPart = requestPartMetaData(partName); 0064 if (metaPart.name().isEmpty()) { 0065 throw PartStreamerException(QStringLiteral("Client sent empty metadata for part '%1'.").arg(QString::fromUtf8(partName))); 0066 } 0067 if (metaPart.name() != partName) { 0068 throw PartStreamerException( 0069 QStringLiteral("Client sent metadata for part '%1' but requested part '%2'.").arg(QString::fromUtf8(metaPart.name()), QString::fromUtf8(partName))); 0070 } 0071 part.setVersion(metaPart.version()); 0072 0073 if (part.datasize() != metaPart.size()) { 0074 part.setDatasize(metaPart.size()); 0075 // Shortcut: if sizes differ, we don't need to compare data later no in order 0076 // to detect whether the part has changed 0077 mDataChanged = mDataChanged || (metaPart.size() != part.datasize()); 0078 } 0079 0080 if (metaPart.storageType() == Protocol::PartMetaData::Foreign) { 0081 streamForeignPayload(part, metaPart); 0082 } else if (part.datasize() > DbConfig::configuredDatabase()->sizeThreshold()) { 0083 // actual case when streaming storage is used: external payload is enabled, 0084 // data is big enough in a literal 0085 streamPayloadToFile(part, metaPart); 0086 } else { 0087 streamPayloadData(part, metaPart); 0088 } 0089 } 0090 0091 void PartStreamer::streamPayloadData(Part &part, const Protocol::PartMetaData &metaPart) 0092 { 0093 // If the part WAS external previously, remove data file 0094 if (part.storage() == Part::External) { 0095 ExternalPartStorage::self()->removePartFile(ExternalPartStorage::resolveAbsolutePath(part.data())); 0096 } 0097 0098 // Request the actual data 0099 { 0100 Protocol::StreamPayloadCommand resp; 0101 resp.setPayloadName(metaPart.name()); 0102 resp.setRequest(Protocol::StreamPayloadCommand::Data); 0103 mConnection->sendResponse(std::move(resp)); 0104 } 0105 0106 const auto cmd = mConnection->readCommand(); 0107 const auto &response = Protocol::cmdCast<Protocol::StreamPayloadResponse>(cmd); 0108 if (!response.isValid() || response.isError()) { 0109 throw PartStreamerException(QStringLiteral("Client failed to provide payload data for part ID %1 (%2).").arg(part.id()).arg(part.partType().name())); 0110 } 0111 const QByteArray newData = response.data(); 0112 // only use the data size with internal payload parts, for foreign parts 0113 // we use the size reported by client 0114 const auto newSize = (metaPart.storageType() == Protocol::PartMetaData::Internal) ? newData.size() : metaPart.size(); 0115 if (newSize != metaPart.size()) { 0116 throw PartStreamerException(QStringLiteral("Payload size mismatch: client advertised %1 bytes but sent %2 bytes.").arg(metaPart.size()).arg(newSize)); 0117 } 0118 0119 if (part.isValid()) { 0120 if (!mDataChanged) { 0121 mDataChanged = mDataChanged || (newData != part.data()); 0122 } 0123 PartHelper::update(&part, newData, newSize); 0124 } else { 0125 part.setData(newData); 0126 part.setDatasize(newSize); 0127 if (!part.insert()) { 0128 throw PartStreamerException("Failed to insert new part into database."); 0129 } 0130 } 0131 } 0132 0133 void PartStreamer::streamPayloadToFile(Part &part, const Protocol::PartMetaData &metaPart) 0134 { 0135 QByteArray origData; 0136 if (!mDataChanged && mCheckChanged) { 0137 origData = PartHelper::translateData(part); 0138 } 0139 0140 QByteArray filename; 0141 if (part.isValid()) { 0142 if (part.storage() == Part::External) { 0143 // Part was external and is still external 0144 filename = part.data(); 0145 if (!filename.isEmpty()) { 0146 ExternalPartStorage::self()->removePartFile(ExternalPartStorage::resolveAbsolutePath(filename)); 0147 filename = ExternalPartStorage::updateFileNameRevision(filename); 0148 } else { 0149 // recover from data corruption 0150 filename = ExternalPartStorage::nameForPartId(part.id()); 0151 } 0152 } else { 0153 // Part wasn't external, but is now 0154 filename = ExternalPartStorage::nameForPartId(part.id()); 0155 } 0156 0157 QFileInfo finfo(QString::fromUtf8(filename)); 0158 if (finfo.isAbsolute()) { 0159 filename = finfo.fileName().toUtf8(); 0160 } 0161 } 0162 0163 part.setStorage(Part::External); 0164 part.setDatasize(metaPart.size()); 0165 part.setData(filename); 0166 0167 if (part.isValid()) { 0168 if (!part.update()) { 0169 throw PartStreamerException(QStringLiteral("Failed to update part %1 in database.").arg(part.id())); 0170 } 0171 } else { 0172 if (!part.insert()) { 0173 throw PartStreamerException(QStringLiteral("Failed to insert new part fo PimItem %1 into database.").arg(part.pimItemId())); 0174 } 0175 0176 filename = ExternalPartStorage::nameForPartId(part.id()); 0177 part.setData(filename); 0178 if (!part.update()) { 0179 throw PartStreamerException(QStringLiteral("Failed to update part %1 in database.").arg(part.id())); 0180 } 0181 } 0182 0183 { 0184 Protocol::StreamPayloadCommand cmd; 0185 cmd.setPayloadName(metaPart.name()); 0186 cmd.setRequest(Protocol::StreamPayloadCommand::Data); 0187 cmd.setDestination(QString::fromUtf8(filename)); 0188 mConnection->sendResponse(std::move(cmd)); 0189 } 0190 0191 const auto cmd = mConnection->readCommand(); 0192 const auto &response = Protocol::cmdCast<Protocol::Response>(cmd); 0193 if (!response.isValid() || response.isError()) { 0194 throw PartStreamerException("Client failed to store payload into file."); 0195 } 0196 0197 QFile file(ExternalPartStorage::resolveAbsolutePath(filename)); 0198 if (!file.exists()) { 0199 throw PartStreamerException(QStringLiteral("External payload file %1 does not exist.").arg(file.fileName())); 0200 } 0201 0202 if (file.size() != metaPart.size()) { 0203 throw PartStreamerException( 0204 QStringLiteral("Payload size mismatch, client advertised %1 bytes, but the file is %2 bytes.").arg(metaPart.size(), file.size())); 0205 } 0206 0207 if (mCheckChanged && !mDataChanged) { 0208 // This is invoked only when part already exists, data sizes match and 0209 // caller wants to know whether parts really differ 0210 mDataChanged = (origData != PartHelper::translateData(part)); 0211 } 0212 } 0213 0214 void PartStreamer::streamForeignPayload(Part &part, const Protocol::PartMetaData &metaPart) 0215 { 0216 QByteArray origData; 0217 if (!mDataChanged && mCheckChanged) { 0218 origData = PartHelper::translateData(part); 0219 } 0220 0221 { 0222 Protocol::StreamPayloadCommand cmd; 0223 cmd.setPayloadName(metaPart.name()); 0224 cmd.setRequest(Protocol::StreamPayloadCommand::Data); 0225 mConnection->sendResponse(std::move(cmd)); 0226 } 0227 0228 const auto cmd = mConnection->readCommand(); 0229 const auto &response = Protocol::cmdCast<Protocol::StreamPayloadResponse>(cmd); 0230 if (!response.isValid() || response.isError()) { 0231 throw PartStreamerException("Client failed to store payload into file."); 0232 } 0233 0234 // If the part was previously external, clean up the data 0235 if (part.storage() == Part::External) { 0236 const QString filename = QString::fromUtf8(part.data()); 0237 ExternalPartStorage::self()->removePartFile(ExternalPartStorage::resolveAbsolutePath(filename)); 0238 } 0239 0240 part.setStorage(Part::Foreign); 0241 part.setData(response.data()); 0242 0243 if (part.isValid()) { 0244 if (!part.update()) { 0245 throw PartStreamerException(QStringLiteral("Failed to update part %1 in database.").arg(part.id())); 0246 } 0247 } else { 0248 if (!part.insert()) { 0249 throw PartStreamerException(QStringLiteral("Failed to insert part for PimItem %1 into database.").arg(part.pimItemId())); 0250 } 0251 } 0252 0253 const QString filename = QString::fromUtf8(response.data()); 0254 QFile file(filename); 0255 if (!file.exists()) { 0256 throw PartStreamerException(QStringLiteral("Foreign payload file %1 does not exist.").arg(filename)); 0257 } 0258 0259 if (file.size() != metaPart.size()) { 0260 throw PartStreamerException( 0261 QStringLiteral("Foreign payload size mismatch, client advertised %1 bytes, but the file size is %2 bytes.").arg(metaPart.size(), file.size())); 0262 } 0263 0264 if (mCheckChanged && !mDataChanged) { 0265 // This is invoked only when part already exists, data sizes match and 0266 // caller wants to know whether parts really differ 0267 mDataChanged = (origData != PartHelper::translateData(part)); 0268 } 0269 } 0270 0271 void PartStreamer::preparePart(bool checkExists, const QByteArray &partName, Part &part) 0272 { 0273 mDataChanged = false; 0274 0275 const PartType partType = PartTypeHelper::fromFqName(partName); 0276 0277 if (checkExists || mCheckChanged) { 0278 SelectQueryBuilder<Part> qb; 0279 qb.addValueCondition(Part::pimItemIdColumn(), Query::Equals, mItem.id()); 0280 qb.addValueCondition(Part::partTypeIdColumn(), Query::Equals, partType.id()); 0281 if (!qb.exec()) { 0282 throw PartStreamerException(QStringLiteral("Failed to check if part %1 exists in PimItem %2.").arg(QString::fromUtf8(partName)).arg(mItem.id())); 0283 } 0284 0285 const Part::List result = qb.result(); 0286 if (!result.isEmpty()) { 0287 part = result.at(0); 0288 } 0289 } 0290 0291 // Shortcut: newly created parts are always "changed" 0292 if (!part.isValid()) { 0293 mDataChanged = true; 0294 } 0295 0296 part.setPartType(partType); 0297 part.setPimItemId(mItem.id()); 0298 } 0299 0300 void PartStreamer::stream(bool checkExists, const QByteArray &partName, qint64 &partSize, bool *changed) 0301 { 0302 mCheckChanged = (changed != nullptr); 0303 if (changed != nullptr) { 0304 *changed = false; 0305 } 0306 0307 Part part; 0308 preparePart(checkExists, partName, part); 0309 0310 streamPayload(part, partName); 0311 if (changed && mCheckChanged) { 0312 *changed = mDataChanged; 0313 } 0314 0315 partSize = part.datasize(); 0316 } 0317 0318 void PartStreamer::streamAttribute(bool checkExists, const QByteArray &_partName, const QByteArray &value, bool *changed) 0319 { 0320 mCheckChanged = (changed != nullptr); 0321 if (changed != nullptr) { 0322 *changed = false; 0323 } 0324 0325 QByteArray partName; 0326 if (!_partName.startsWith("ATR:")) { 0327 partName = "ATR:" + _partName; 0328 } else { 0329 partName = _partName; 0330 } 0331 0332 Part part; 0333 preparePart(checkExists, partName, part); 0334 0335 if (part.isValid()) { 0336 if (mCheckChanged) { 0337 if (PartHelper::translateData(part) != value) { 0338 mDataChanged = true; 0339 } 0340 } 0341 PartHelper::update(&part, value, value.size()); 0342 } else { 0343 const bool storeInFile = value.size() > DbConfig::configuredDatabase()->sizeThreshold(); 0344 part.setDatasize(value.size()); 0345 part.setVersion(0); 0346 if (storeInFile) { 0347 if (!part.insert()) { 0348 throw PartStreamerException(QStringLiteral("Failed to store attribute part for PimItem %1 in database.").arg(part.pimItemId())); 0349 } 0350 PartHelper::update(&part, value, value.size()); 0351 } else { 0352 part.setData(value); 0353 if (!part.insert()) { 0354 throw PartStreamerException(QStringLiteral("Failed to store attribute part for PimItem %1 in database.").arg(part.pimItemId())); 0355 } 0356 } 0357 } 0358 0359 if (mCheckChanged) { 0360 *changed = mDataChanged; 0361 } 0362 }