File indexing completed on 2024-06-09 04:48:47
0001 /** 0002 * SPDX-FileCopyrightText: 2021 Bart De Vries <bart@mogwai.be> 0003 * 0004 * SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL 0005 */ 0006 0007 #include "syncjob.h" 0008 #include "synclogging.h" 0009 0010 #include <QDateTime> 0011 #include <QDir> 0012 #include <QSqlQuery> 0013 #include <QString> 0014 #include <QTimer> 0015 0016 #include <KLocalizedString> 0017 0018 #include "audiomanager.h" 0019 #include "database.h" 0020 #include "datamanager.h" 0021 #include "entry.h" 0022 #include "models/errorlogmodel.h" 0023 #include "settingsmanager.h" 0024 #include "sync/gpodder/episodeactionrequest.h" 0025 #include "sync/gpodder/gpodder.h" 0026 #include "sync/gpodder/subscriptionrequest.h" 0027 #include "sync/gpodder/uploadepisodeactionrequest.h" 0028 #include "sync/gpodder/uploadsubscriptionrequest.h" 0029 #include "sync/sync.h" 0030 #include "sync/syncutils.h" 0031 #include "utils/fetchfeedsjob.h" 0032 0033 using namespace SyncUtils; 0034 0035 SyncJob::SyncJob(SyncStatus syncStatus, GPodder *gpodder, const QString &device, bool forceFetchAll, QObject *parent) 0036 : KJob(parent) 0037 , m_syncStatus(syncStatus) 0038 , m_gpodder(gpodder) 0039 , m_device(device) 0040 , m_forceFetchAll(forceFetchAll) 0041 { 0042 connect(&Sync::instance(), &Sync::abortSync, this, &SyncJob::aborting); 0043 0044 setProgressUnit(KJob::Unit::Items); 0045 } 0046 0047 void SyncJob::start() 0048 { 0049 QTimer::singleShot(0, this, &SyncJob::doSync); 0050 } 0051 0052 void SyncJob::abort() 0053 { 0054 m_abort = true; 0055 Q_EMIT aborting(); 0056 } 0057 0058 bool SyncJob::aborted() 0059 { 0060 return m_abort; 0061 } 0062 0063 QString SyncJob::errorString() const 0064 { 0065 switch (error()) { 0066 case SyncJobError::SubscriptionDownloadError: 0067 return i18n("Could not retrieve subscription updates from server"); 0068 break; 0069 case SyncJobError::SubscriptionUploadError: 0070 return i18n("Could not upload subscription changes to server"); 0071 break; 0072 case SyncJobError::EpisodeDownloadError: 0073 return i18n("Could not retrieve episode updates from server"); 0074 break; 0075 case SyncJobError::EpisodeUploadError: 0076 return i18n("Could not upload episode updates to server"); 0077 break; 0078 case SyncJobError::InternalDataError: 0079 return i18n("Internal data error"); 0080 break; 0081 default: 0082 return KJob::errorString(); 0083 break; 0084 } 0085 } 0086 0087 void SyncJob::doSync() 0088 { 0089 switch (m_syncStatus) { 0090 case SyncStatus::RegularSync: 0091 case SyncStatus::PushAllSync: 0092 doRegularSync(); 0093 break; 0094 case SyncStatus::ForceSync: 0095 doForceSync(); 0096 break; 0097 case SyncStatus::UploadOnlySync: 0098 doQuickSync(); 0099 break; 0100 case SyncStatus::NoSync: 0101 default: 0102 qCDebug(kastsSync) << "Something's wrong. Sync job started with invalid sync type."; 0103 setError(SyncJobError::InternalDataError); 0104 emitResult(); 0105 break; 0106 } 0107 } 0108 0109 void SyncJob::doRegularSync() 0110 { 0111 setTotalAmount(KJob::Unit::Items, 8); 0112 setProcessedAmount(KJob::Unit::Items, 0); 0113 Q_EMIT infoMessage(this, getProgressMessage(Started)); 0114 0115 syncSubscriptions(); 0116 } 0117 0118 void SyncJob::doForceSync() 0119 { 0120 // Delete SyncTimestamps such that all feed and episode actions will be 0121 // retrieved again from the server 0122 QSqlQuery query; 0123 query.prepare(QStringLiteral("DELETE FROM SyncTimestamps;")); 0124 Database::instance().execute(query); 0125 0126 m_forceFetchAll = true; 0127 doRegularSync(); 0128 } 0129 0130 void SyncJob::doQuickSync() 0131 { 0132 setTotalAmount(KJob::Unit::Items, 2); 0133 setProcessedAmount(KJob::Unit::Items, 0); 0134 Q_EMIT infoMessage(this, getProgressMessage(Started)); 0135 0136 // Quick sync of local subscription changes 0137 std::pair<QStringList, QStringList> localChanges = getLocalSubscriptionChanges(); 0138 // store the local changes in a member variable such that the exact changes can be deleted from DB when processed 0139 m_localSubscriptionChanges = localChanges; 0140 0141 QStringList addList = localChanges.first; 0142 QStringList removeList = localChanges.second; 0143 removeSubscriptionChangeConflicts(addList, removeList); 0144 uploadSubscriptions(addList, removeList); 0145 0146 // Quick sync of local episodeActions 0147 // store these actions in member variable to be able to delete these exact same changes from DB when processed 0148 m_localEpisodeActions = getLocalEpisodeActions(); 0149 0150 QHash<QString, QHash<QString, EpisodeAction>> localEpisodeActionHash; 0151 for (const EpisodeAction &action : m_localEpisodeActions) { 0152 addToHashIfNewer(localEpisodeActionHash, action); 0153 } 0154 qCDebug(kastsSync) << "local hash"; 0155 debugEpisodeActionHash(localEpisodeActionHash); 0156 0157 uploadEpisodeActions(createListFromHash(localEpisodeActionHash)); 0158 } 0159 0160 void SyncJob::syncSubscriptions() 0161 { 0162 setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1); 0163 Q_EMIT infoMessage(this, getProgressMessage(SubscriptionDownload)); 0164 0165 bool subscriptionTimestampExists = false; 0166 qulonglong subscriptionTimestamp = 0; 0167 0168 // First check the database for previous timestamps 0169 QSqlQuery query; 0170 query.prepare(QStringLiteral("SELECT timestamp FROM SyncTimeStamps WHERE syncservice=:syncservice;")); 0171 query.bindValue(QStringLiteral(":syncservice"), subscriptionTimestampLabel); 0172 Database::instance().execute(query); 0173 if (query.next()) { 0174 subscriptionTimestamp = query.value(0).toULongLong(); 0175 subscriptionTimestampExists = true; 0176 qCDebug(kastsSync) << "Previous gpodder subscription timestamp" << subscriptionTimestamp; 0177 } 0178 0179 // Check for local changes 0180 // If no timestamp exists then upload all subscriptions. Otherwise, check 0181 // the database for actions since previous sync. 0182 QStringList localAddFeedList, localRemoveFeedList; 0183 if (subscriptionTimestamp == 0) { 0184 query.prepare(QStringLiteral("SELECT url FROM Feeds;")); 0185 Database::instance().execute(query); 0186 while (query.next()) { 0187 localAddFeedList << query.value(QStringLiteral("url")).toString(); 0188 } 0189 } else { 0190 std::pair<QStringList, QStringList> localChanges = getLocalSubscriptionChanges(); 0191 // immediately store the local changes such that the exact changes can be deleted from DB when processed 0192 m_localSubscriptionChanges = localChanges; 0193 0194 localAddFeedList = localChanges.first; 0195 localRemoveFeedList = localChanges.second; 0196 } 0197 0198 removeSubscriptionChangeConflicts(localAddFeedList, localRemoveFeedList); 0199 0200 if (!m_gpodder) { 0201 setError(SyncJobError::InternalDataError); 0202 Q_EMIT infoMessage(this, getProgressMessage(Error)); 0203 emitResult(); 0204 return; 0205 } 0206 // Check the gpodder service for updates 0207 SubscriptionRequest *subRequest = m_gpodder->getSubscriptionChanges(subscriptionTimestamp, m_device); 0208 connect(this, &SyncJob::aborting, subRequest, &SubscriptionRequest::abort); 0209 connect(subRequest, &SubscriptionRequest::finished, this, [=]() { 0210 if (subRequest->error() || subRequest->aborted()) { 0211 if (subRequest->aborted()) { 0212 Q_EMIT infoMessage(this, getProgressMessage(Aborted)); 0213 emitResult(); 0214 return; 0215 } else if (subRequest->error()) { 0216 setError(SyncJobError::SubscriptionDownloadError); 0217 setErrorText(subRequest->errorString()); 0218 Q_EMIT infoMessage(this, getProgressMessage(Error)); 0219 } 0220 // If this is a force sync (i.e. processing all updates), then 0221 // continue with fetching podcasts updates, otherwise it's not 0222 // possible to update new episodes if the sync server happens to be 0223 // down or is not reachable. 0224 if (m_forceFetchAll) { 0225 QSqlQuery query; 0226 query.prepare(QStringLiteral("SELECT url FROM Feeds;")); 0227 Database::instance().execute(query); 0228 while (query.next()) { 0229 QString url = query.value(0).toString(); 0230 if (!m_feedsToBeUpdatedSubs.contains(url)) { 0231 m_feedsToBeUpdatedSubs += url; 0232 } 0233 } 0234 m_feedUpdateTotal = m_feedsToBeUpdatedSubs.count(); 0235 setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 2); // skip upload step 0236 Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch)); 0237 0238 QTimer::singleShot(0, this, &SyncJob::fetchModifiedSubscriptions); 0239 } else { 0240 emitResult(); 0241 return; 0242 } 0243 } else { 0244 qCDebug(kastsSync) << "Finished device update request"; 0245 0246 qulonglong newSubscriptionTimestamp = subRequest->timestamp(); 0247 QStringList remoteAddFeedList, remoteRemoveFeedList; 0248 0249 removeSubscriptionChangeConflicts(remoteAddFeedList, remoteRemoveFeedList); 0250 0251 for (const QString &url : subRequest->addList()) { 0252 qCDebug(kastsSync) << "Sync add feed:" << url; 0253 if (DataManager::instance().feedExists(url)) { 0254 qCDebug(kastsSync) << "this one we have; do nothing"; 0255 } else { 0256 qCDebug(kastsSync) << "this one we don't have; add this feed"; 0257 remoteAddFeedList << url; 0258 } 0259 } 0260 0261 for (const QString &url : subRequest->removeList()) { 0262 qCDebug(kastsSync) << "Sync remove feed:" << url; 0263 if (DataManager::instance().feedExists(url)) { 0264 qCDebug(kastsSync) << "this one we have; needs to be removed"; 0265 remoteRemoveFeedList << url; 0266 } else { 0267 qCDebug(kastsSync) << "this one we don't have; it was already removed locally; do nothing"; 0268 } 0269 } 0270 0271 qCDebug(kastsSync) << "localAddFeedList" << localAddFeedList; 0272 qCDebug(kastsSync) << "localRemoveFeedList" << localRemoveFeedList; 0273 qCDebug(kastsSync) << "remoteAddFeedList" << remoteAddFeedList; 0274 qCDebug(kastsSync) << "remoteRemoveFeedList" << remoteRemoveFeedList; 0275 0276 // Now we apply the remote changes locally: 0277 Sync::instance().applySubscriptionChangesLocally(remoteAddFeedList, remoteRemoveFeedList); 0278 0279 // We defer fetching the new feeds, since we will fetch them later on. 0280 // if this is the first sync or a force sync, then add all local feeds to 0281 // be updated 0282 if (!subscriptionTimestampExists || m_forceFetchAll) { 0283 QSqlQuery query; 0284 query.prepare(QStringLiteral("SELECT url FROM Feeds;")); 0285 Database::instance().execute(query); 0286 while (query.next()) { 0287 QString url = query.value(0).toString(); 0288 if (!m_feedsToBeUpdatedSubs.contains(url)) { 0289 m_feedsToBeUpdatedSubs += url; 0290 } 0291 } 0292 } 0293 0294 // Add the new feeds to the list of feeds that need to be refreshed. 0295 // We check with feedExists to make sure not to add the same podcast 0296 // with a slightly different url 0297 for (const QString &url : remoteAddFeedList) { 0298 if (!DataManager::instance().feedExists(url)) { 0299 m_feedsToBeUpdatedSubs += url; 0300 } 0301 } 0302 m_feedUpdateTotal = m_feedsToBeUpdatedSubs.count(); 0303 0304 qCDebug(kastsSync) << "newSubscriptionTimestamp" << newSubscriptionTimestamp; 0305 updateDBTimestamp(newSubscriptionTimestamp, subscriptionTimestampLabel); 0306 0307 setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1); 0308 Q_EMIT infoMessage(this, getProgressMessage(SubscriptionUpload)); 0309 0310 QTimer::singleShot(0, this, [=]() { 0311 uploadSubscriptions(localAddFeedList, localRemoveFeedList); 0312 }); 0313 } 0314 }); 0315 } 0316 0317 void SyncJob::uploadSubscriptions(const QStringList &localAddFeedUrlList, const QStringList &localRemoveFeedUrlList) 0318 { 0319 if (localAddFeedUrlList.isEmpty() && localRemoveFeedUrlList.isEmpty()) { 0320 qCDebug(kastsSync) << "No subscription changes to upload to server"; 0321 0322 // if this is not a quick upload only sync, continue with the feed updates 0323 if (m_syncStatus != SyncStatus::UploadOnlySync) { 0324 QTimer::singleShot(0, this, &SyncJob::fetchModifiedSubscriptions); 0325 } 0326 0327 // Delete the uploaded changes from the database 0328 removeAppliedSubscriptionChangesFromDB(); 0329 0330 setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1); 0331 Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch)); 0332 } else { 0333 qCDebug(kastsSync) << "Uploading subscription changes:\n\tadd" << localAddFeedUrlList << "\n\tremove" << localRemoveFeedUrlList; 0334 if (!m_gpodder) { 0335 setError(SyncJobError::InternalDataError); 0336 Q_EMIT infoMessage(this, getProgressMessage(Error)); 0337 emitResult(); 0338 return; 0339 } 0340 UploadSubscriptionRequest *upSubRequest = m_gpodder->uploadSubscriptionChanges(localAddFeedUrlList, localRemoveFeedUrlList, m_device); 0341 connect(this, &SyncJob::aborting, upSubRequest, &UploadSubscriptionRequest::abort); 0342 connect(upSubRequest, &UploadSubscriptionRequest::finished, this, [=]() { 0343 if (upSubRequest->error() || upSubRequest->aborted()) { 0344 if (upSubRequest->aborted()) { 0345 Q_EMIT infoMessage(this, getProgressMessage(Aborted)); 0346 } else if (upSubRequest->error()) { 0347 setError(SyncJobError::SubscriptionUploadError); 0348 setErrorText(upSubRequest->errorString()); 0349 Q_EMIT infoMessage(this, getProgressMessage(Error)); 0350 } 0351 emitResult(); 0352 return; 0353 } 0354 0355 // Upload has succeeded 0356 qulonglong timestamp = upSubRequest->timestamp(); 0357 qCDebug(kastsSync) << "timestamp after uploading local changes" << timestamp; 0358 updateDBTimestamp(timestamp, (m_syncStatus == SyncStatus::UploadOnlySync) ? uploadSubscriptionTimestampLabel : subscriptionTimestampLabel); 0359 0360 // Delete the uploaded changes from the database 0361 removeAppliedSubscriptionChangesFromDB(); 0362 0363 // TODO: deal with updateUrlsList -> needs on-the-fly feed URL renaming 0364 QVector<std::pair<QString, QString>> updateUrlsList = upSubRequest->updateUrls(); 0365 qCDebug(kastsSync) << "updateUrlsList:" << updateUrlsList; 0366 0367 // if this is a quick upload only sync, then stop here, otherwise continue with 0368 // updating feeds that were added remotely. 0369 0370 setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1); 0371 Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch)); 0372 0373 if (m_syncStatus != SyncStatus::UploadOnlySync) { 0374 QTimer::singleShot(0, this, &SyncJob::fetchModifiedSubscriptions); 0375 } 0376 }); 0377 } 0378 } 0379 0380 void SyncJob::fetchModifiedSubscriptions() 0381 { 0382 // Update the feeds that need to be updated such that we can find the 0383 // episodes in the database when we are receiving the remote episode 0384 // actions. 0385 m_feedUpdateTotal = m_feedsToBeUpdatedSubs.count(); 0386 m_feedUpdateProgress = 0; 0387 FetchFeedsJob *fetchFeedsJob = new FetchFeedsJob(m_feedsToBeUpdatedSubs, this); 0388 connect(this, &SyncJob::aborting, fetchFeedsJob, &FetchFeedsJob::abort); 0389 connect(fetchFeedsJob, &FetchFeedsJob::processedAmountChanged, this, [this, fetchFeedsJob](KJob *job, KJob::Unit unit, qulonglong amount) { 0390 qCDebug(kastsSync) << "FetchFeedsJob::processedAmountChanged:" << amount; 0391 Q_UNUSED(job); 0392 Q_ASSERT(unit == KJob::Unit::Items); 0393 m_feedUpdateProgress = amount; 0394 if (!fetchFeedsJob->aborted() && !fetchFeedsJob->error()) { 0395 Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch)); 0396 } 0397 }); 0398 connect(fetchFeedsJob, &FetchFeedsJob::result, this, [=]() { 0399 qCDebug(kastsSync) << "Feed update finished"; 0400 if (fetchFeedsJob->error() || fetchFeedsJob->aborted()) { 0401 if (fetchFeedsJob->aborted()) { 0402 Q_EMIT infoMessage(this, getProgressMessage(Aborted)); 0403 } else if (fetchFeedsJob->error()) { 0404 // FetchFeedsJob takes care of its own error reporting 0405 Q_EMIT infoMessage(this, getProgressMessage(Error)); 0406 } 0407 emitResult(); 0408 return; 0409 } 0410 Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch)); 0411 qCDebug(kastsSync) << "Done updating subscriptions and fetching updates"; 0412 0413 // We're ready to sync the episode states now 0414 // increase the progress counter now already since fetchRemoteEpisodeActions 0415 // can be executed multiple times 0416 setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1); 0417 m_remoteEpisodeActionHash.clear(); 0418 QTimer::singleShot(0, this, &SyncJob::fetchRemoteEpisodeActions); 0419 }); 0420 fetchFeedsJob->start(); 0421 } 0422 0423 void SyncJob::fetchRemoteEpisodeActions() 0424 { 0425 qCDebug(kastsSync) << "Start syncing episode states"; 0426 Q_EMIT infoMessage(this, getProgressMessage(EpisodeDownload)); 0427 0428 qulonglong episodeTimestamp = 0; 0429 0430 // First check the database for previous timestamps 0431 QSqlQuery query; 0432 query.prepare(QStringLiteral("SELECT timestamp FROM SyncTimeStamps WHERE syncservice=:syncservice;")); 0433 query.bindValue(QStringLiteral(":syncservice"), episodeTimestampLabel); 0434 Database::instance().execute(query); 0435 if (query.next()) { 0436 episodeTimestamp = query.value(0).toULongLong(); 0437 qCDebug(kastsSync) << "Previous gpodder episode timestamp" << episodeTimestamp; 0438 } 0439 0440 if (!m_gpodder) { 0441 setError(SyncJobError::InternalDataError); 0442 Q_EMIT infoMessage(this, getProgressMessage(Error)); 0443 emitResult(); 0444 return; 0445 } 0446 // Check the gpodder service for episode action updates 0447 EpisodeActionRequest *epRequest = m_gpodder->getEpisodeActions(episodeTimestamp, (episodeTimestamp == 0)); 0448 connect(this, &SyncJob::aborting, epRequest, &EpisodeActionRequest::abort); 0449 connect(epRequest, &EpisodeActionRequest::finished, this, [=]() { 0450 qCDebug(kastsSync) << "Finished episode action request"; 0451 if (epRequest->error() || epRequest->aborted()) { 0452 if (epRequest->aborted()) { 0453 Q_EMIT infoMessage(this, getProgressMessage(Aborted)); 0454 } else if (epRequest->error()) { 0455 setError(SyncJobError::EpisodeUploadError); 0456 setErrorText(epRequest->errorString()); 0457 Q_EMIT infoMessage(this, getProgressMessage(Error)); 0458 } 0459 emitResult(); 0460 return; 0461 } 0462 0463 qulonglong newEpisodeTimestamp = epRequest->timestamp(); 0464 qulonglong currentTimestamp = static_cast<qulonglong>(QDateTime::currentSecsSinceEpoch()); 0465 0466 qCDebug(kastsSync) << newEpisodeTimestamp; 0467 0468 for (const EpisodeAction &action : epRequest->episodeActions()) { 0469 addToHashIfNewer(m_remoteEpisodeActionHash, action); 0470 0471 qCDebug(kastsSync) << action.podcast << action.url << action.id << action.device << action.action << action.started << action.position 0472 << action.total << action.timestamp; 0473 } 0474 0475 updateDBTimestamp(newEpisodeTimestamp, episodeTimestampLabel); 0476 0477 // Check returned timestamp against current timestamp. If they aren't 0478 // close enough (let's take 10 seconds), that means that there are still 0479 // more episode actions to be fetched from the server. 0480 if (newEpisodeTimestamp > (currentTimestamp - 10) || epRequest->episodeActions().isEmpty()) { 0481 QTimer::singleShot(0, this, &SyncJob::syncEpisodeStates); 0482 } else { 0483 qCDebug(kastsSync) << "Fetching another batch of episode actions" << newEpisodeTimestamp << currentTimestamp; 0484 QTimer::singleShot(0, this, &SyncJob::fetchRemoteEpisodeActions); 0485 } 0486 }); 0487 } 0488 0489 void SyncJob::syncEpisodeStates() 0490 { 0491 // store the local actions in member variable to be able to delete these exact same changes from DB when processed 0492 0493 m_localEpisodeActions = getLocalEpisodeActions(); 0494 0495 QHash<QString, QHash<QString, EpisodeAction>> localEpisodeActionHash; 0496 for (const EpisodeAction &action : m_localEpisodeActions) { 0497 addToHashIfNewer(localEpisodeActionHash, action); 0498 } 0499 0500 qCDebug(kastsSync) << "local hash"; 0501 debugEpisodeActionHash(localEpisodeActionHash); 0502 0503 qCDebug(kastsSync) << "remote hash"; 0504 debugEpisodeActionHash(m_remoteEpisodeActionHash); 0505 0506 // now remove conflicts between local and remote episode actions 0507 // based on the timestamp 0508 removeEpisodeActionConflicts(localEpisodeActionHash, m_remoteEpisodeActionHash); 0509 0510 qCDebug(kastsSync) << "local hash"; 0511 debugEpisodeActionHash(localEpisodeActionHash); 0512 0513 qCDebug(kastsSync) << "remote hash"; 0514 debugEpisodeActionHash(m_remoteEpisodeActionHash); 0515 0516 // Now we update the feeds that need updating (don't update feeds that have 0517 // already been updated after the subscriptions were updated). 0518 for (const QString &url : getFeedsFromHash(m_remoteEpisodeActionHash)) { 0519 if (!m_feedsToBeUpdatedSubs.contains(url) && !m_feedsToBeUpdatedEps.contains(url)) { 0520 m_feedsToBeUpdatedEps += url; 0521 } 0522 } 0523 qCDebug(kastsSync) << "Feeds to be updated:" << m_feedsToBeUpdatedEps; 0524 m_feedUpdateTotal = m_feedsToBeUpdatedEps.count(); 0525 m_feedUpdateProgress = 0; 0526 0527 setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1); 0528 Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch)); 0529 0530 FetchFeedsJob *fetchFeedsJob = new FetchFeedsJob(m_feedsToBeUpdatedEps, this); 0531 connect(this, &SyncJob::aborting, fetchFeedsJob, &FetchFeedsJob::abort); 0532 connect(fetchFeedsJob, &FetchFeedsJob::processedAmountChanged, this, [this, fetchFeedsJob](KJob *job, KJob::Unit unit, qulonglong amount) { 0533 qCDebug(kastsSync) << "FetchFeedsJob::processedAmountChanged:" << amount; 0534 Q_UNUSED(job); 0535 Q_ASSERT(unit == KJob::Unit::Items); 0536 m_feedUpdateProgress = amount; 0537 if (!fetchFeedsJob->aborted() && !fetchFeedsJob->error()) { 0538 Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch)); 0539 } 0540 }); 0541 connect(fetchFeedsJob, &FetchFeedsJob::result, this, [=]() { 0542 qCDebug(kastsSync) << "Feed update finished"; 0543 if (fetchFeedsJob->error() || fetchFeedsJob->aborted()) { 0544 if (fetchFeedsJob->aborted()) { 0545 Q_EMIT infoMessage(this, getProgressMessage(Aborted)); 0546 } else if (fetchFeedsJob->error()) { 0547 // FetchFeedsJob takes care of its own error reporting 0548 Q_EMIT infoMessage(this, getProgressMessage(Error)); 0549 } 0550 emitResult(); 0551 return; 0552 } 0553 Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch)); 0554 0555 // Apply the remote changes locally 0556 setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1); 0557 Q_EMIT infoMessage(this, getProgressMessage(ApplyEpisodeActions)); 0558 0559 Sync::instance().applyEpisodeActionsLocally(m_remoteEpisodeActionHash); 0560 0561 // Upload the local changes to the server 0562 QVector<EpisodeAction> localEpisodeActionList = createListFromHash(localEpisodeActionHash); 0563 0564 setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1); 0565 Q_EMIT infoMessage(this, getProgressMessage(EpisodeUpload)); 0566 // Now upload the episode actions to the server 0567 QTimer::singleShot(0, this, [this, localEpisodeActionList]() { 0568 uploadEpisodeActions(localEpisodeActionList); 0569 }); 0570 }); 0571 fetchFeedsJob->start(); 0572 } 0573 0574 void SyncJob::uploadEpisodeActions(const QVector<EpisodeAction> &episodeActions) 0575 { 0576 // We have to upload episode actions in batches because otherwise the server 0577 // will reject them. 0578 uploadEpisodeActionsPartial(episodeActions, 0); 0579 } 0580 0581 void SyncJob::uploadEpisodeActionsPartial(const QVector<EpisodeAction> &episodeActionList, const int startIndex) 0582 { 0583 if (episodeActionList.count() == 0) { 0584 // nothing to upload; we don't have to contact the server 0585 qCDebug(kastsSync) << "No episode actions to upload to server"; 0586 0587 removeAppliedEpisodeActionsFromDB(); 0588 0589 setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1); 0590 Q_EMIT infoMessage(this, getProgressMessage(Finished)); 0591 emitResult(); 0592 return; 0593 } 0594 0595 qCDebug(kastsSync) << "Uploading episode actions" << startIndex << "to" 0596 << std::min(startIndex + maxAmountEpisodeUploads, static_cast<int>(episodeActionList.count())) << "of" << episodeActionList.count() 0597 << "total episode actions"; 0598 0599 if (!m_gpodder) { 0600 setError(SyncJobError::InternalDataError); 0601 Q_EMIT infoMessage(this, getProgressMessage(Error)); 0602 emitResult(); 0603 return; 0604 } 0605 UploadEpisodeActionRequest *upEpRequest = m_gpodder->uploadEpisodeActions(episodeActionList.mid(startIndex, maxAmountEpisodeUploads)); 0606 connect(this, &SyncJob::aborting, upEpRequest, &UploadEpisodeActionRequest::abort); 0607 connect(upEpRequest, &UploadEpisodeActionRequest::finished, this, [=]() { 0608 qCDebug(kastsSync) << "Finished uploading batch of episode actions to server"; 0609 if (upEpRequest->error() || upEpRequest->aborted()) { 0610 if (upEpRequest->aborted()) { 0611 Q_EMIT infoMessage(this, getProgressMessage(Aborted)); 0612 } else if (upEpRequest->error()) { 0613 setError(SyncJobError::EpisodeUploadError); 0614 setErrorText(upEpRequest->errorString()); 0615 Q_EMIT infoMessage(this, getProgressMessage(Error)); 0616 } 0617 emitResult(); 0618 return; 0619 } 0620 0621 if (episodeActionList.count() > startIndex + maxAmountEpisodeUploads) { 0622 // Still episodeActions remaining to be uploaded 0623 QTimer::singleShot(0, this, [this, &episodeActionList, startIndex]() { 0624 uploadEpisodeActionsPartial(episodeActionList, startIndex + maxAmountEpisodeUploads); 0625 }); 0626 } else { 0627 // All episodeActions have been uploaded 0628 0629 qCDebug(kastsSync) << "New uploadEpisodeTimestamp from server" << upEpRequest->timestamp(); 0630 updateDBTimestamp(upEpRequest->timestamp(), (m_syncStatus == SyncStatus::UploadOnlySync) ? uploadEpisodeTimestampLabel : episodeTimestampLabel); 0631 0632 removeAppliedEpisodeActionsFromDB(); 0633 0634 setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1); 0635 Q_EMIT infoMessage(this, getProgressMessage(Finished)); 0636 0637 // This is the final exit point for the Job unless an error or abort occured 0638 qCDebug(kastsSync) << "Syncing finished"; 0639 emitResult(); 0640 } 0641 }); 0642 } 0643 0644 void SyncJob::updateDBTimestamp(const qulonglong ×tamp, const QString ×tampLabel) 0645 { 0646 if (timestamp > 1) { // only accept timestamp if it's larger than zero 0647 bool timestampExists = false; 0648 QSqlQuery query; 0649 query.prepare(QStringLiteral("SELECT timestamp FROM SyncTimeStamps WHERE syncservice=:syncservice;")); 0650 query.bindValue(QStringLiteral(":syncservice"), timestampLabel); 0651 Database::instance().execute(query); 0652 if (query.next()) { 0653 timestampExists = true; 0654 } 0655 0656 if (timestampExists) { 0657 query.prepare(QStringLiteral("UPDATE SyncTimeStamps SET timestamp=:timestamp WHERE syncservice=:syncservice;")); 0658 } else { 0659 query.prepare(QStringLiteral("INSERT INTO SyncTimeStamps VALUES (:syncservice, :timestamp);")); 0660 } 0661 query.bindValue(QStringLiteral(":syncservice"), timestampLabel); 0662 query.bindValue(QStringLiteral(":timestamp"), timestamp + 1); // add 1 second to avoid fetching our own previously sent updates next time 0663 Database::instance().execute(query); 0664 } 0665 } 0666 0667 void SyncJob::removeAppliedSubscriptionChangesFromDB() 0668 { 0669 Database::instance().transaction(); 0670 QSqlQuery query; 0671 query.prepare(QStringLiteral("DELETE FROM FeedActions WHERE url=:url AND action=:action;")); 0672 0673 for (const QString &url : m_localSubscriptionChanges.first) { 0674 query.bindValue(QStringLiteral(":url"), url); 0675 query.bindValue(QStringLiteral(":action"), QStringLiteral("add")); 0676 Database::instance().execute(query); 0677 } 0678 0679 for (const QString &url : m_localSubscriptionChanges.second) { 0680 query.bindValue(QStringLiteral(":url"), url); 0681 query.bindValue(QStringLiteral(":action"), QStringLiteral("remove")); 0682 Database::instance().execute(query); 0683 } 0684 Database::instance().commit(); 0685 } 0686 0687 void SyncJob::removeAppliedEpisodeActionsFromDB() 0688 { 0689 Database::instance().transaction(); 0690 QSqlQuery query; 0691 query.prepare( 0692 QStringLiteral("DELETE FROM EpisodeActions WHERE podcast=:podcast AND url=:url AND id=:id AND action=:action AND started=:started AND " 0693 "position=:position AND total=:total AND timestamp=:timestamp;")); 0694 for (const EpisodeAction &epAction : m_localEpisodeActions) { 0695 qCDebug(kastsSync) << "Removing episode action from DB" << epAction.id; 0696 query.bindValue(QStringLiteral(":podcast"), epAction.podcast); 0697 query.bindValue(QStringLiteral(":url"), epAction.url); 0698 query.bindValue(QStringLiteral(":id"), epAction.id); 0699 query.bindValue(QStringLiteral(":action"), epAction.action); 0700 query.bindValue(QStringLiteral(":started"), epAction.started); 0701 query.bindValue(QStringLiteral(":position"), epAction.position); 0702 query.bindValue(QStringLiteral(":total"), epAction.total); 0703 query.bindValue(QStringLiteral(":timestamp"), epAction.timestamp); 0704 Database::instance().execute(query); 0705 } 0706 Database::instance().commit(); 0707 } 0708 0709 void SyncJob::removeSubscriptionChangeConflicts(QStringList &addList, QStringList &removeList) 0710 { 0711 // Do some sanity checks and cleaning-up 0712 addList.removeDuplicates(); 0713 removeList.removeDuplicates(); 0714 for (const QString &addUrl : addList) { 0715 if (removeList.contains(addUrl)) { 0716 addList.removeAt(addList.indexOf(addUrl)); 0717 removeList.removeAt(removeList.indexOf(addUrl)); 0718 } 0719 } 0720 for (const QString &removeUrl : removeList) { 0721 if (addList.contains(removeUrl)) { 0722 removeList.removeAt(removeList.indexOf(removeUrl)); 0723 addList.removeAt(addList.indexOf(removeUrl)); 0724 } 0725 } 0726 } 0727 0728 QVector<EpisodeAction> SyncJob::createListFromHash(const QHash<QString, QHash<QString, EpisodeAction>> &episodeActionHash) 0729 { 0730 QVector<EpisodeAction> episodeActionList; 0731 0732 for (const QHash<QString, EpisodeAction> &actions : episodeActionHash) { 0733 for (const EpisodeAction &action : actions) { 0734 if (action.action == QStringLiteral("play")) { 0735 episodeActionList << action; 0736 } 0737 } 0738 } 0739 0740 return episodeActionList; 0741 } 0742 0743 std::pair<QStringList, QStringList> SyncJob::getLocalSubscriptionChanges() const 0744 { 0745 std::pair<QStringList, QStringList> localChanges; 0746 QSqlQuery query; 0747 query.prepare(QStringLiteral("SELECT * FROM FeedActions;")); 0748 Database::instance().execute(query); 0749 while (query.next()) { 0750 QString url = query.value(QStringLiteral("url")).toString(); 0751 QString action = query.value(QStringLiteral("action")).toString(); 0752 // qulonglong timestamp = query.value(QStringLiteral("timestamp")).toULongLong(); 0753 if (action == QStringLiteral("add")) { 0754 localChanges.first << url; 0755 } else if (action == QStringLiteral("remove")) { 0756 localChanges.second << url; 0757 } 0758 } 0759 0760 return localChanges; 0761 } 0762 0763 QVector<EpisodeAction> SyncJob::getLocalEpisodeActions() const 0764 { 0765 QVector<EpisodeAction> localEpisodeActions; 0766 QSqlQuery query; 0767 query.prepare(QStringLiteral("SELECT * FROM EpisodeActions;")); 0768 Database::instance().execute(query); 0769 while (query.next()) { 0770 QString podcast = query.value(QStringLiteral("podcast")).toString(); 0771 QString url = query.value(QStringLiteral("url")).toString(); 0772 QString id = query.value(QStringLiteral("id")).toString(); 0773 QString action = query.value(QStringLiteral("action")).toString(); 0774 qulonglong started = query.value(QStringLiteral("started")).toULongLong(); 0775 qulonglong position = query.value(QStringLiteral("position")).toULongLong(); 0776 qulonglong total = query.value(QStringLiteral("total")).toULongLong(); 0777 qulonglong timestamp = query.value(QStringLiteral("timestamp")).toULongLong(); 0778 EpisodeAction episodeAction = {podcast, url, id, m_device, action, started, position, total, timestamp}; 0779 localEpisodeActions += episodeAction; 0780 } 0781 0782 return localEpisodeActions; 0783 } 0784 0785 void SyncJob::addToHashIfNewer(QHash<QString, QHash<QString, EpisodeAction>> &episodeActionHash, const EpisodeAction &episodeAction) 0786 { 0787 if (episodeAction.action == QStringLiteral("play")) { 0788 if (episodeActionHash.contains(episodeAction.id) && episodeActionHash[episodeAction.id].contains(QStringLiteral("play"))) { 0789 if (episodeActionHash[episodeAction.id][QStringLiteral("play")].timestamp <= episodeAction.timestamp) { 0790 episodeActionHash[episodeAction.id][QStringLiteral("play")] = episodeAction; 0791 } 0792 } else { 0793 episodeActionHash[episodeAction.id][QStringLiteral("play")] = episodeAction; 0794 } 0795 } 0796 0797 if (episodeAction.action == QStringLiteral("download") || episodeAction.action == QStringLiteral("delete")) { 0798 if (episodeActionHash.contains(episodeAction.id)) { 0799 if (episodeActionHash[episodeAction.id].contains(QStringLiteral("download"))) { 0800 if (episodeActionHash[episodeAction.id][QStringLiteral("download")].timestamp <= episodeAction.timestamp) { 0801 episodeActionHash[episodeAction.id][QStringLiteral("download-delete")] = episodeAction; 0802 } 0803 } else if (episodeActionHash[episodeAction.id].contains(QStringLiteral("delete"))) { 0804 if (episodeActionHash[episodeAction.id][QStringLiteral("delete")].timestamp <= episodeAction.timestamp) { 0805 episodeActionHash[episodeAction.id][QStringLiteral("download-delete")] = episodeAction; 0806 } 0807 } else { 0808 episodeActionHash[episodeAction.id][QStringLiteral("download-delete")] = episodeAction; 0809 } 0810 } else { 0811 episodeActionHash[episodeAction.id][QStringLiteral("download-delete")] = episodeAction; 0812 } 0813 } 0814 0815 if (episodeAction.action == QStringLiteral("new")) { 0816 if (episodeActionHash.contains(episodeAction.id) && episodeActionHash[episodeAction.id].contains(QStringLiteral("new"))) { 0817 if (episodeActionHash[episodeAction.id][QStringLiteral("new")].timestamp <= episodeAction.timestamp) { 0818 episodeActionHash[episodeAction.id][QStringLiteral("new")] = episodeAction; 0819 } 0820 } else { 0821 episodeActionHash[episodeAction.id][QStringLiteral("new")] = episodeAction; 0822 } 0823 } 0824 } 0825 0826 void SyncJob::removeEpisodeActionConflicts(QHash<QString, QHash<QString, EpisodeAction>> &local, QHash<QString, QHash<QString, EpisodeAction>> &remote) 0827 { 0828 QStringList actions; 0829 actions << QStringLiteral("play") << QStringLiteral("download-delete") << QStringLiteral("new"); 0830 0831 // We first remove the conflicts from the hash with local changes 0832 for (const QHash<QString, EpisodeAction> &hashItem : remote) { 0833 for (const QString &action : actions) { 0834 QString id = hashItem[action].id; 0835 if (local.contains(id) && local.value(id).contains(action)) { 0836 if (local[id][action].timestamp < remote[id][action].timestamp) { 0837 local[id].remove(action); 0838 } 0839 } 0840 } 0841 } 0842 0843 // And now the same for the remote 0844 for (const QHash<QString, EpisodeAction> &hashItem : local) { 0845 for (const QString &action : actions) { 0846 QString id = hashItem[action].id; 0847 if (remote.contains(id) && remote.value(id).contains(action)) { 0848 if (remote[id][action].timestamp < local[id][action].timestamp) { 0849 remote[id].remove(action); 0850 } 0851 } 0852 } 0853 } 0854 } 0855 0856 QStringList SyncJob::getFeedsFromHash(const QHash<QString, QHash<QString, EpisodeAction>> &hash) 0857 { 0858 QStringList feedUrls; 0859 for (const QHash<QString, EpisodeAction> &actionList : hash) { 0860 for (const EpisodeAction &action : actionList) { 0861 feedUrls += action.podcast; 0862 } 0863 } 0864 return feedUrls; 0865 } 0866 0867 void SyncJob::debugEpisodeActionHash(const QHash<QString, QHash<QString, EpisodeAction>> &hash) 0868 { 0869 for (const QHash<QString, EpisodeAction> &hashItem : hash) { 0870 for (const EpisodeAction &action : hashItem) { 0871 qCDebug(kastsSync) << action.podcast << action.url << action.id << action.device << action.action << action.started << action.position 0872 << action.total << action.timestamp; 0873 } 0874 } 0875 } 0876 0877 QString SyncJob::getProgressMessage(SyncJobStatus status) const 0878 { 0879 int processed = processedAmount(KJob::Unit::Items); 0880 int total = totalAmount(KJob::Unit::Items); 0881 0882 switch (status) { 0883 case Started: 0884 return i18nc("Subscription/Episode sync progress step", "(Step %1 of %2) Start sync", processed, total); 0885 break; 0886 case SubscriptionDownload: 0887 return i18nc("Subscription/Episode sync progress step", "(Step %1 of %2) Requesting remote subscription updates", processed, total); 0888 break; 0889 case SubscriptionUpload: 0890 return i18nc("Subscription/Episode sync progress step", "(Step %1 of %2) Uploading local subscription updates", processed, total); 0891 break; 0892 case SubscriptionFetch: 0893 return i18ncp("Subscription/Episode sync progress step", 0894 "(Step %3 of %4) Updated %2 of %1 podcast", 0895 "(Step %3 of %4) Updated %2 of %1 podcasts", 0896 m_feedUpdateTotal, 0897 m_feedUpdateProgress, 0898 processed, 0899 total); 0900 break; 0901 case EpisodeDownload: 0902 return i18nc("Subscription/Episode sync progress step", "(Step %1 of %2) Requesting remote episode updates", processed, total); 0903 break; 0904 case ApplyEpisodeActions: 0905 return i18nc("Subscription/Episode sync progress step", "(Step %1 of %2) Applying remote episode changes", processed, total); 0906 break; 0907 case EpisodeUpload: 0908 return i18nc("Subscription/Episode sync progress step", "(Step %1 of %2) Uploading local episode updates", processed, total); 0909 break; 0910 case Finished: 0911 return i18nc("Subscription/Episode sync progress step", "(Step %1 of %2) Finished sync", processed, total); 0912 break; 0913 case Aborted: 0914 return i18nc("Subscription/Episode sync progress step", "Sync aborted"); 0915 break; 0916 case Error: 0917 default: 0918 return i18nc("Subscription/Episode sync progress step", "Sync finished with error"); 0919 break; 0920 } 0921 }