File indexing completed on 2024-06-09 04:48:47

0001 /**
0002  * SPDX-FileCopyrightText: 2021 Bart De Vries <bart@mogwai.be>
0003  *
0004  * SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
0005  */
0006 
0007 #include "syncjob.h"
0008 #include "synclogging.h"
0009 
0010 #include <QDateTime>
0011 #include <QDir>
0012 #include <QSqlQuery>
0013 #include <QString>
0014 #include <QTimer>
0015 
0016 #include <KLocalizedString>
0017 
0018 #include "audiomanager.h"
0019 #include "database.h"
0020 #include "datamanager.h"
0021 #include "entry.h"
0022 #include "models/errorlogmodel.h"
0023 #include "settingsmanager.h"
0024 #include "sync/gpodder/episodeactionrequest.h"
0025 #include "sync/gpodder/gpodder.h"
0026 #include "sync/gpodder/subscriptionrequest.h"
0027 #include "sync/gpodder/uploadepisodeactionrequest.h"
0028 #include "sync/gpodder/uploadsubscriptionrequest.h"
0029 #include "sync/sync.h"
0030 #include "sync/syncutils.h"
0031 #include "utils/fetchfeedsjob.h"
0032 
0033 using namespace SyncUtils;
0034 
0035 SyncJob::SyncJob(SyncStatus syncStatus, GPodder *gpodder, const QString &device, bool forceFetchAll, QObject *parent)
0036     : KJob(parent)
0037     , m_syncStatus(syncStatus)
0038     , m_gpodder(gpodder)
0039     , m_device(device)
0040     , m_forceFetchAll(forceFetchAll)
0041 {
0042     connect(&Sync::instance(), &Sync::abortSync, this, &SyncJob::aborting);
0043 
0044     setProgressUnit(KJob::Unit::Items);
0045 }
0046 
0047 void SyncJob::start()
0048 {
0049     QTimer::singleShot(0, this, &SyncJob::doSync);
0050 }
0051 
/**
 * Flags this job as aborted and emits aborting(), to which the pending
 * requests and feed-fetch jobs created by this job are connected.
 */
void SyncJob::abort()
{
    // Set the flag before emitting, so that anything reacting to the signal
    // already sees aborted() == true
    m_abort = true;
    Q_EMIT aborting();
}
0057 
/**
 * @return true once abort() has been called on this job
 */
bool SyncJob::aborted()
{
    return m_abort;
}
0062 
0063 QString SyncJob::errorString() const
0064 {
0065     switch (error()) {
0066     case SyncJobError::SubscriptionDownloadError:
0067         return i18n("Could not retrieve subscription updates from server");
0068         break;
0069     case SyncJobError::SubscriptionUploadError:
0070         return i18n("Could not upload subscription changes to server");
0071         break;
0072     case SyncJobError::EpisodeDownloadError:
0073         return i18n("Could not retrieve episode updates from server");
0074         break;
0075     case SyncJobError::EpisodeUploadError:
0076         return i18n("Could not upload episode updates to server");
0077         break;
0078     case SyncJobError::InternalDataError:
0079         return i18n("Internal data error");
0080         break;
0081     default:
0082         return KJob::errorString();
0083         break;
0084     }
0085 }
0086 
0087 void SyncJob::doSync()
0088 {
0089     switch (m_syncStatus) {
0090     case SyncStatus::RegularSync:
0091     case SyncStatus::PushAllSync:
0092         doRegularSync();
0093         break;
0094     case SyncStatus::ForceSync:
0095         doForceSync();
0096         break;
0097     case SyncStatus::UploadOnlySync:
0098         doQuickSync();
0099         break;
0100     case SyncStatus::NoSync:
0101     default:
0102         qCDebug(kastsSync) << "Something's wrong. Sync job started with invalid sync type.";
0103         setError(SyncJobError::InternalDataError);
0104         emitResult();
0105         break;
0106     }
0107 }
0108 
0109 void SyncJob::doRegularSync()
0110 {
0111     setTotalAmount(KJob::Unit::Items, 8);
0112     setProcessedAmount(KJob::Unit::Items, 0);
0113     Q_EMIT infoMessage(this, getProgressMessage(Started));
0114 
0115     syncSubscriptions();
0116 }
0117 
0118 void SyncJob::doForceSync()
0119 {
0120     // Delete SyncTimestamps such that all feed and episode actions will be
0121     // retrieved again from the server
0122     QSqlQuery query;
0123     query.prepare(QStringLiteral("DELETE FROM SyncTimestamps;"));
0124     Database::instance().execute(query);
0125 
0126     m_forceFetchAll = true;
0127     doRegularSync();
0128 }
0129 
0130 void SyncJob::doQuickSync()
0131 {
0132     setTotalAmount(KJob::Unit::Items, 2);
0133     setProcessedAmount(KJob::Unit::Items, 0);
0134     Q_EMIT infoMessage(this, getProgressMessage(Started));
0135 
0136     // Quick sync of local subscription changes
0137     std::pair<QStringList, QStringList> localChanges = getLocalSubscriptionChanges();
0138     // store the local changes in a member variable such that the exact changes can be deleted from DB when processed
0139     m_localSubscriptionChanges = localChanges;
0140 
0141     QStringList addList = localChanges.first;
0142     QStringList removeList = localChanges.second;
0143     removeSubscriptionChangeConflicts(addList, removeList);
0144     uploadSubscriptions(addList, removeList);
0145 
0146     // Quick sync of local episodeActions
0147     // store these actions in member variable to be able to delete these exact same changes from DB when processed
0148     m_localEpisodeActions = getLocalEpisodeActions();
0149 
0150     QHash<QString, QHash<QString, EpisodeAction>> localEpisodeActionHash;
0151     for (const EpisodeAction &action : m_localEpisodeActions) {
0152         addToHashIfNewer(localEpisodeActionHash, action);
0153     }
0154     qCDebug(kastsSync) << "local hash";
0155     debugEpisodeActionHash(localEpisodeActionHash);
0156 
0157     uploadEpisodeActions(createListFromHash(localEpisodeActionHash));
0158 }
0159 
0160 void SyncJob::syncSubscriptions()
0161 {
0162     setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1);
0163     Q_EMIT infoMessage(this, getProgressMessage(SubscriptionDownload));
0164 
0165     bool subscriptionTimestampExists = false;
0166     qulonglong subscriptionTimestamp = 0;
0167 
0168     // First check the database for previous timestamps
0169     QSqlQuery query;
0170     query.prepare(QStringLiteral("SELECT timestamp FROM SyncTimeStamps WHERE syncservice=:syncservice;"));
0171     query.bindValue(QStringLiteral(":syncservice"), subscriptionTimestampLabel);
0172     Database::instance().execute(query);
0173     if (query.next()) {
0174         subscriptionTimestamp = query.value(0).toULongLong();
0175         subscriptionTimestampExists = true;
0176         qCDebug(kastsSync) << "Previous gpodder subscription timestamp" << subscriptionTimestamp;
0177     }
0178 
0179     // Check for local changes
0180     // If no timestamp exists then upload all subscriptions. Otherwise, check
0181     // the database for actions since previous sync.
0182     QStringList localAddFeedList, localRemoveFeedList;
0183     if (subscriptionTimestamp == 0) {
0184         query.prepare(QStringLiteral("SELECT url FROM Feeds;"));
0185         Database::instance().execute(query);
0186         while (query.next()) {
0187             localAddFeedList << query.value(QStringLiteral("url")).toString();
0188         }
0189     } else {
0190         std::pair<QStringList, QStringList> localChanges = getLocalSubscriptionChanges();
0191         // immediately store the local changes such that the exact changes can be deleted from DB when processed
0192         m_localSubscriptionChanges = localChanges;
0193 
0194         localAddFeedList = localChanges.first;
0195         localRemoveFeedList = localChanges.second;
0196     }
0197 
0198     removeSubscriptionChangeConflicts(localAddFeedList, localRemoveFeedList);
0199 
0200     if (!m_gpodder) {
0201         setError(SyncJobError::InternalDataError);
0202         Q_EMIT infoMessage(this, getProgressMessage(Error));
0203         emitResult();
0204         return;
0205     }
0206     // Check the gpodder service for updates
0207     SubscriptionRequest *subRequest = m_gpodder->getSubscriptionChanges(subscriptionTimestamp, m_device);
0208     connect(this, &SyncJob::aborting, subRequest, &SubscriptionRequest::abort);
0209     connect(subRequest, &SubscriptionRequest::finished, this, [=]() {
0210         if (subRequest->error() || subRequest->aborted()) {
0211             if (subRequest->aborted()) {
0212                 Q_EMIT infoMessage(this, getProgressMessage(Aborted));
0213                 emitResult();
0214                 return;
0215             } else if (subRequest->error()) {
0216                 setError(SyncJobError::SubscriptionDownloadError);
0217                 setErrorText(subRequest->errorString());
0218                 Q_EMIT infoMessage(this, getProgressMessage(Error));
0219             }
0220             // If this is a force sync (i.e. processing all updates), then
0221             // continue with fetching podcasts updates, otherwise it's not
0222             // possible to update new episodes if the sync server happens to be
0223             // down or is not reachable.
0224             if (m_forceFetchAll) {
0225                 QSqlQuery query;
0226                 query.prepare(QStringLiteral("SELECT url FROM Feeds;"));
0227                 Database::instance().execute(query);
0228                 while (query.next()) {
0229                     QString url = query.value(0).toString();
0230                     if (!m_feedsToBeUpdatedSubs.contains(url)) {
0231                         m_feedsToBeUpdatedSubs += url;
0232                     }
0233                 }
0234                 m_feedUpdateTotal = m_feedsToBeUpdatedSubs.count();
0235                 setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 2); // skip upload step
0236                 Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch));
0237 
0238                 QTimer::singleShot(0, this, &SyncJob::fetchModifiedSubscriptions);
0239             } else {
0240                 emitResult();
0241                 return;
0242             }
0243         } else {
0244             qCDebug(kastsSync) << "Finished device update request";
0245 
0246             qulonglong newSubscriptionTimestamp = subRequest->timestamp();
0247             QStringList remoteAddFeedList, remoteRemoveFeedList;
0248 
0249             removeSubscriptionChangeConflicts(remoteAddFeedList, remoteRemoveFeedList);
0250 
0251             for (const QString &url : subRequest->addList()) {
0252                 qCDebug(kastsSync) << "Sync add feed:" << url;
0253                 if (DataManager::instance().feedExists(url)) {
0254                     qCDebug(kastsSync) << "this one we have; do nothing";
0255                 } else {
0256                     qCDebug(kastsSync) << "this one we don't have; add this feed";
0257                     remoteAddFeedList << url;
0258                 }
0259             }
0260 
0261             for (const QString &url : subRequest->removeList()) {
0262                 qCDebug(kastsSync) << "Sync remove feed:" << url;
0263                 if (DataManager::instance().feedExists(url)) {
0264                     qCDebug(kastsSync) << "this one we have; needs to be removed";
0265                     remoteRemoveFeedList << url;
0266                 } else {
0267                     qCDebug(kastsSync) << "this one we don't have; it was already removed locally; do nothing";
0268                 }
0269             }
0270 
0271             qCDebug(kastsSync) << "localAddFeedList" << localAddFeedList;
0272             qCDebug(kastsSync) << "localRemoveFeedList" << localRemoveFeedList;
0273             qCDebug(kastsSync) << "remoteAddFeedList" << remoteAddFeedList;
0274             qCDebug(kastsSync) << "remoteRemoveFeedList" << remoteRemoveFeedList;
0275 
0276             // Now we apply the remote changes locally:
0277             Sync::instance().applySubscriptionChangesLocally(remoteAddFeedList, remoteRemoveFeedList);
0278 
0279             // We defer fetching the new feeds, since we will fetch them later on.
0280             // if this is the first sync or a force sync, then add all local feeds to
0281             // be updated
0282             if (!subscriptionTimestampExists || m_forceFetchAll) {
0283                 QSqlQuery query;
0284                 query.prepare(QStringLiteral("SELECT url FROM Feeds;"));
0285                 Database::instance().execute(query);
0286                 while (query.next()) {
0287                     QString url = query.value(0).toString();
0288                     if (!m_feedsToBeUpdatedSubs.contains(url)) {
0289                         m_feedsToBeUpdatedSubs += url;
0290                     }
0291                 }
0292             }
0293 
0294             // Add the new feeds to the list of feeds that need to be refreshed.
0295             // We check with feedExists to make sure not to add the same podcast
0296             // with a slightly different url
0297             for (const QString &url : remoteAddFeedList) {
0298                 if (!DataManager::instance().feedExists(url)) {
0299                     m_feedsToBeUpdatedSubs += url;
0300                 }
0301             }
0302             m_feedUpdateTotal = m_feedsToBeUpdatedSubs.count();
0303 
0304             qCDebug(kastsSync) << "newSubscriptionTimestamp" << newSubscriptionTimestamp;
0305             updateDBTimestamp(newSubscriptionTimestamp, subscriptionTimestampLabel);
0306 
0307             setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1);
0308             Q_EMIT infoMessage(this, getProgressMessage(SubscriptionUpload));
0309 
0310             QTimer::singleShot(0, this, [=]() {
0311                 uploadSubscriptions(localAddFeedList, localRemoveFeedList);
0312             });
0313         }
0314     });
0315 }
0316 
0317 void SyncJob::uploadSubscriptions(const QStringList &localAddFeedUrlList, const QStringList &localRemoveFeedUrlList)
0318 {
0319     if (localAddFeedUrlList.isEmpty() && localRemoveFeedUrlList.isEmpty()) {
0320         qCDebug(kastsSync) << "No subscription changes to upload to server";
0321 
0322         // if this is not a quick upload only sync, continue with the feed updates
0323         if (m_syncStatus != SyncStatus::UploadOnlySync) {
0324             QTimer::singleShot(0, this, &SyncJob::fetchModifiedSubscriptions);
0325         }
0326 
0327         // Delete the uploaded changes from the database
0328         removeAppliedSubscriptionChangesFromDB();
0329 
0330         setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1);
0331         Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch));
0332     } else {
0333         qCDebug(kastsSync) << "Uploading subscription changes:\n\tadd" << localAddFeedUrlList << "\n\tremove" << localRemoveFeedUrlList;
0334         if (!m_gpodder) {
0335             setError(SyncJobError::InternalDataError);
0336             Q_EMIT infoMessage(this, getProgressMessage(Error));
0337             emitResult();
0338             return;
0339         }
0340         UploadSubscriptionRequest *upSubRequest = m_gpodder->uploadSubscriptionChanges(localAddFeedUrlList, localRemoveFeedUrlList, m_device);
0341         connect(this, &SyncJob::aborting, upSubRequest, &UploadSubscriptionRequest::abort);
0342         connect(upSubRequest, &UploadSubscriptionRequest::finished, this, [=]() {
0343             if (upSubRequest->error() || upSubRequest->aborted()) {
0344                 if (upSubRequest->aborted()) {
0345                     Q_EMIT infoMessage(this, getProgressMessage(Aborted));
0346                 } else if (upSubRequest->error()) {
0347                     setError(SyncJobError::SubscriptionUploadError);
0348                     setErrorText(upSubRequest->errorString());
0349                     Q_EMIT infoMessage(this, getProgressMessage(Error));
0350                 }
0351                 emitResult();
0352                 return;
0353             }
0354 
0355             // Upload has succeeded
0356             qulonglong timestamp = upSubRequest->timestamp();
0357             qCDebug(kastsSync) << "timestamp after uploading local changes" << timestamp;
0358             updateDBTimestamp(timestamp, (m_syncStatus == SyncStatus::UploadOnlySync) ? uploadSubscriptionTimestampLabel : subscriptionTimestampLabel);
0359 
0360             // Delete the uploaded changes from the database
0361             removeAppliedSubscriptionChangesFromDB();
0362 
0363             // TODO: deal with updateUrlsList -> needs on-the-fly feed URL renaming
0364             QVector<std::pair<QString, QString>> updateUrlsList = upSubRequest->updateUrls();
0365             qCDebug(kastsSync) << "updateUrlsList:" << updateUrlsList;
0366 
0367             // if this is a quick upload only sync, then stop here, otherwise continue with
0368             // updating feeds that were added remotely.
0369 
0370             setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1);
0371             Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch));
0372 
0373             if (m_syncStatus != SyncStatus::UploadOnlySync) {
0374                 QTimer::singleShot(0, this, &SyncJob::fetchModifiedSubscriptions);
0375             }
0376         });
0377     }
0378 }
0379 
0380 void SyncJob::fetchModifiedSubscriptions()
0381 {
0382     // Update the feeds that need to be updated such that we can find the
0383     // episodes in the database when we are receiving the remote episode
0384     // actions.
0385     m_feedUpdateTotal = m_feedsToBeUpdatedSubs.count();
0386     m_feedUpdateProgress = 0;
0387     FetchFeedsJob *fetchFeedsJob = new FetchFeedsJob(m_feedsToBeUpdatedSubs, this);
0388     connect(this, &SyncJob::aborting, fetchFeedsJob, &FetchFeedsJob::abort);
0389     connect(fetchFeedsJob, &FetchFeedsJob::processedAmountChanged, this, [this, fetchFeedsJob](KJob *job, KJob::Unit unit, qulonglong amount) {
0390         qCDebug(kastsSync) << "FetchFeedsJob::processedAmountChanged:" << amount;
0391         Q_UNUSED(job);
0392         Q_ASSERT(unit == KJob::Unit::Items);
0393         m_feedUpdateProgress = amount;
0394         if (!fetchFeedsJob->aborted() && !fetchFeedsJob->error()) {
0395             Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch));
0396         }
0397     });
0398     connect(fetchFeedsJob, &FetchFeedsJob::result, this, [=]() {
0399         qCDebug(kastsSync) << "Feed update finished";
0400         if (fetchFeedsJob->error() || fetchFeedsJob->aborted()) {
0401             if (fetchFeedsJob->aborted()) {
0402                 Q_EMIT infoMessage(this, getProgressMessage(Aborted));
0403             } else if (fetchFeedsJob->error()) {
0404                 // FetchFeedsJob takes care of its own error reporting
0405                 Q_EMIT infoMessage(this, getProgressMessage(Error));
0406             }
0407             emitResult();
0408             return;
0409         }
0410         Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch));
0411         qCDebug(kastsSync) << "Done updating subscriptions and fetching updates";
0412 
0413         // We're ready to sync the episode states now
0414         // increase the progress counter now already since fetchRemoteEpisodeActions
0415         // can be executed multiple times
0416         setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1);
0417         m_remoteEpisodeActionHash.clear();
0418         QTimer::singleShot(0, this, &SyncJob::fetchRemoteEpisodeActions);
0419     });
0420     fetchFeedsJob->start();
0421 }
0422 
0423 void SyncJob::fetchRemoteEpisodeActions()
0424 {
0425     qCDebug(kastsSync) << "Start syncing episode states";
0426     Q_EMIT infoMessage(this, getProgressMessage(EpisodeDownload));
0427 
0428     qulonglong episodeTimestamp = 0;
0429 
0430     // First check the database for previous timestamps
0431     QSqlQuery query;
0432     query.prepare(QStringLiteral("SELECT timestamp FROM SyncTimeStamps WHERE syncservice=:syncservice;"));
0433     query.bindValue(QStringLiteral(":syncservice"), episodeTimestampLabel);
0434     Database::instance().execute(query);
0435     if (query.next()) {
0436         episodeTimestamp = query.value(0).toULongLong();
0437         qCDebug(kastsSync) << "Previous gpodder episode timestamp" << episodeTimestamp;
0438     }
0439 
0440     if (!m_gpodder) {
0441         setError(SyncJobError::InternalDataError);
0442         Q_EMIT infoMessage(this, getProgressMessage(Error));
0443         emitResult();
0444         return;
0445     }
0446     // Check the gpodder service for episode action updates
0447     EpisodeActionRequest *epRequest = m_gpodder->getEpisodeActions(episodeTimestamp, (episodeTimestamp == 0));
0448     connect(this, &SyncJob::aborting, epRequest, &EpisodeActionRequest::abort);
0449     connect(epRequest, &EpisodeActionRequest::finished, this, [=]() {
0450         qCDebug(kastsSync) << "Finished episode action request";
0451         if (epRequest->error() || epRequest->aborted()) {
0452             if (epRequest->aborted()) {
0453                 Q_EMIT infoMessage(this, getProgressMessage(Aborted));
0454             } else if (epRequest->error()) {
0455                 setError(SyncJobError::EpisodeUploadError);
0456                 setErrorText(epRequest->errorString());
0457                 Q_EMIT infoMessage(this, getProgressMessage(Error));
0458             }
0459             emitResult();
0460             return;
0461         }
0462 
0463         qulonglong newEpisodeTimestamp = epRequest->timestamp();
0464         qulonglong currentTimestamp = static_cast<qulonglong>(QDateTime::currentSecsSinceEpoch());
0465 
0466         qCDebug(kastsSync) << newEpisodeTimestamp;
0467 
0468         for (const EpisodeAction &action : epRequest->episodeActions()) {
0469             addToHashIfNewer(m_remoteEpisodeActionHash, action);
0470 
0471             qCDebug(kastsSync) << action.podcast << action.url << action.id << action.device << action.action << action.started << action.position
0472                                << action.total << action.timestamp;
0473         }
0474 
0475         updateDBTimestamp(newEpisodeTimestamp, episodeTimestampLabel);
0476 
0477         // Check returned timestamp against current timestamp.  If they aren't
0478         // close enough (let's take 10 seconds), that means that there are still
0479         // more episode actions to be fetched from the server.
0480         if (newEpisodeTimestamp > (currentTimestamp - 10) || epRequest->episodeActions().isEmpty()) {
0481             QTimer::singleShot(0, this, &SyncJob::syncEpisodeStates);
0482         } else {
0483             qCDebug(kastsSync) << "Fetching another batch of episode actions" << newEpisodeTimestamp << currentTimestamp;
0484             QTimer::singleShot(0, this, &SyncJob::fetchRemoteEpisodeActions);
0485         }
0486     });
0487 }
0488 
0489 void SyncJob::syncEpisodeStates()
0490 {
0491     // store the local actions in member variable to be able to delete these exact same changes from DB when processed
0492 
0493     m_localEpisodeActions = getLocalEpisodeActions();
0494 
0495     QHash<QString, QHash<QString, EpisodeAction>> localEpisodeActionHash;
0496     for (const EpisodeAction &action : m_localEpisodeActions) {
0497         addToHashIfNewer(localEpisodeActionHash, action);
0498     }
0499 
0500     qCDebug(kastsSync) << "local hash";
0501     debugEpisodeActionHash(localEpisodeActionHash);
0502 
0503     qCDebug(kastsSync) << "remote hash";
0504     debugEpisodeActionHash(m_remoteEpisodeActionHash);
0505 
0506     // now remove conflicts between local and remote episode actions
0507     // based on the timestamp
0508     removeEpisodeActionConflicts(localEpisodeActionHash, m_remoteEpisodeActionHash);
0509 
0510     qCDebug(kastsSync) << "local hash";
0511     debugEpisodeActionHash(localEpisodeActionHash);
0512 
0513     qCDebug(kastsSync) << "remote hash";
0514     debugEpisodeActionHash(m_remoteEpisodeActionHash);
0515 
0516     // Now we update the feeds that need updating (don't update feeds that have
0517     // already been updated after the subscriptions were updated).
0518     for (const QString &url : getFeedsFromHash(m_remoteEpisodeActionHash)) {
0519         if (!m_feedsToBeUpdatedSubs.contains(url) && !m_feedsToBeUpdatedEps.contains(url)) {
0520             m_feedsToBeUpdatedEps += url;
0521         }
0522     }
0523     qCDebug(kastsSync) << "Feeds to be updated:" << m_feedsToBeUpdatedEps;
0524     m_feedUpdateTotal = m_feedsToBeUpdatedEps.count();
0525     m_feedUpdateProgress = 0;
0526 
0527     setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1);
0528     Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch));
0529 
0530     FetchFeedsJob *fetchFeedsJob = new FetchFeedsJob(m_feedsToBeUpdatedEps, this);
0531     connect(this, &SyncJob::aborting, fetchFeedsJob, &FetchFeedsJob::abort);
0532     connect(fetchFeedsJob, &FetchFeedsJob::processedAmountChanged, this, [this, fetchFeedsJob](KJob *job, KJob::Unit unit, qulonglong amount) {
0533         qCDebug(kastsSync) << "FetchFeedsJob::processedAmountChanged:" << amount;
0534         Q_UNUSED(job);
0535         Q_ASSERT(unit == KJob::Unit::Items);
0536         m_feedUpdateProgress = amount;
0537         if (!fetchFeedsJob->aborted() && !fetchFeedsJob->error()) {
0538             Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch));
0539         }
0540     });
0541     connect(fetchFeedsJob, &FetchFeedsJob::result, this, [=]() {
0542         qCDebug(kastsSync) << "Feed update finished";
0543         if (fetchFeedsJob->error() || fetchFeedsJob->aborted()) {
0544             if (fetchFeedsJob->aborted()) {
0545                 Q_EMIT infoMessage(this, getProgressMessage(Aborted));
0546             } else if (fetchFeedsJob->error()) {
0547                 // FetchFeedsJob takes care of its own error reporting
0548                 Q_EMIT infoMessage(this, getProgressMessage(Error));
0549             }
0550             emitResult();
0551             return;
0552         }
0553         Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch));
0554 
0555         // Apply the remote changes locally
0556         setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1);
0557         Q_EMIT infoMessage(this, getProgressMessage(ApplyEpisodeActions));
0558 
0559         Sync::instance().applyEpisodeActionsLocally(m_remoteEpisodeActionHash);
0560 
0561         // Upload the local changes to the server
0562         QVector<EpisodeAction> localEpisodeActionList = createListFromHash(localEpisodeActionHash);
0563 
0564         setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1);
0565         Q_EMIT infoMessage(this, getProgressMessage(EpisodeUpload));
0566         // Now upload the episode actions to the server
0567         QTimer::singleShot(0, this, [this, localEpisodeActionList]() {
0568             uploadEpisodeActions(localEpisodeActionList);
0569         });
0570     });
0571     fetchFeedsJob->start();
0572 }
0573 
0574 void SyncJob::uploadEpisodeActions(const QVector<EpisodeAction> &episodeActions)
0575 {
0576     // We have to upload episode actions in batches because otherwise the server
0577     // will reject them.
0578     uploadEpisodeActionsPartial(episodeActions, 0);
0579 }
0580 
0581 void SyncJob::uploadEpisodeActionsPartial(const QVector<EpisodeAction> &episodeActionList, const int startIndex)
0582 {
0583     if (episodeActionList.count() == 0) {
0584         // nothing to upload; we don't have to contact the server
0585         qCDebug(kastsSync) << "No episode actions to upload to server";
0586 
0587         removeAppliedEpisodeActionsFromDB();
0588 
0589         setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1);
0590         Q_EMIT infoMessage(this, getProgressMessage(Finished));
0591         emitResult();
0592         return;
0593     }
0594 
0595     qCDebug(kastsSync) << "Uploading episode actions" << startIndex << "to"
0596                        << std::min(startIndex + maxAmountEpisodeUploads, static_cast<int>(episodeActionList.count())) << "of" << episodeActionList.count()
0597                        << "total episode actions";
0598 
0599     if (!m_gpodder) {
0600         setError(SyncJobError::InternalDataError);
0601         Q_EMIT infoMessage(this, getProgressMessage(Error));
0602         emitResult();
0603         return;
0604     }
0605     UploadEpisodeActionRequest *upEpRequest = m_gpodder->uploadEpisodeActions(episodeActionList.mid(startIndex, maxAmountEpisodeUploads));
0606     connect(this, &SyncJob::aborting, upEpRequest, &UploadEpisodeActionRequest::abort);
0607     connect(upEpRequest, &UploadEpisodeActionRequest::finished, this, [=]() {
0608         qCDebug(kastsSync) << "Finished uploading batch of episode actions to server";
0609         if (upEpRequest->error() || upEpRequest->aborted()) {
0610             if (upEpRequest->aborted()) {
0611                 Q_EMIT infoMessage(this, getProgressMessage(Aborted));
0612             } else if (upEpRequest->error()) {
0613                 setError(SyncJobError::EpisodeUploadError);
0614                 setErrorText(upEpRequest->errorString());
0615                 Q_EMIT infoMessage(this, getProgressMessage(Error));
0616             }
0617             emitResult();
0618             return;
0619         }
0620 
0621         if (episodeActionList.count() > startIndex + maxAmountEpisodeUploads) {
0622             // Still episodeActions remaining to be uploaded
0623             QTimer::singleShot(0, this, [this, &episodeActionList, startIndex]() {
0624                 uploadEpisodeActionsPartial(episodeActionList, startIndex + maxAmountEpisodeUploads);
0625             });
0626         } else {
0627             // All episodeActions have been uploaded
0628 
0629             qCDebug(kastsSync) << "New uploadEpisodeTimestamp from server" << upEpRequest->timestamp();
0630             updateDBTimestamp(upEpRequest->timestamp(), (m_syncStatus == SyncStatus::UploadOnlySync) ? uploadEpisodeTimestampLabel : episodeTimestampLabel);
0631 
0632             removeAppliedEpisodeActionsFromDB();
0633 
0634             setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1);
0635             Q_EMIT infoMessage(this, getProgressMessage(Finished));
0636 
0637             // This is the final exit point for the Job unless an error or abort occured
0638             qCDebug(kastsSync) << "Syncing finished";
0639             emitResult();
0640         }
0641     });
0642 }
0643 
0644 void SyncJob::updateDBTimestamp(const qulonglong &timestamp, const QString &timestampLabel)
0645 {
0646     if (timestamp > 1) { // only accept timestamp if it's larger than zero
0647         bool timestampExists = false;
0648         QSqlQuery query;
0649         query.prepare(QStringLiteral("SELECT timestamp FROM SyncTimeStamps WHERE syncservice=:syncservice;"));
0650         query.bindValue(QStringLiteral(":syncservice"), timestampLabel);
0651         Database::instance().execute(query);
0652         if (query.next()) {
0653             timestampExists = true;
0654         }
0655 
0656         if (timestampExists) {
0657             query.prepare(QStringLiteral("UPDATE SyncTimeStamps SET timestamp=:timestamp WHERE syncservice=:syncservice;"));
0658         } else {
0659             query.prepare(QStringLiteral("INSERT INTO SyncTimeStamps VALUES (:syncservice, :timestamp);"));
0660         }
0661         query.bindValue(QStringLiteral(":syncservice"), timestampLabel);
0662         query.bindValue(QStringLiteral(":timestamp"), timestamp + 1); // add 1 second to avoid fetching our own previously sent updates next time
0663         Database::instance().execute(query);
0664     }
0665 }
0666 
0667 void SyncJob::removeAppliedSubscriptionChangesFromDB()
0668 {
0669     Database::instance().transaction();
0670     QSqlQuery query;
0671     query.prepare(QStringLiteral("DELETE FROM FeedActions WHERE url=:url AND action=:action;"));
0672 
0673     for (const QString &url : m_localSubscriptionChanges.first) {
0674         query.bindValue(QStringLiteral(":url"), url);
0675         query.bindValue(QStringLiteral(":action"), QStringLiteral("add"));
0676         Database::instance().execute(query);
0677     }
0678 
0679     for (const QString &url : m_localSubscriptionChanges.second) {
0680         query.bindValue(QStringLiteral(":url"), url);
0681         query.bindValue(QStringLiteral(":action"), QStringLiteral("remove"));
0682         Database::instance().execute(query);
0683     }
0684     Database::instance().commit();
0685 }
0686 
0687 void SyncJob::removeAppliedEpisodeActionsFromDB()
0688 {
0689     Database::instance().transaction();
0690     QSqlQuery query;
0691     query.prepare(
0692         QStringLiteral("DELETE FROM EpisodeActions WHERE podcast=:podcast AND url=:url AND id=:id AND action=:action AND started=:started AND "
0693                        "position=:position AND total=:total AND timestamp=:timestamp;"));
0694     for (const EpisodeAction &epAction : m_localEpisodeActions) {
0695         qCDebug(kastsSync) << "Removing episode action from DB" << epAction.id;
0696         query.bindValue(QStringLiteral(":podcast"), epAction.podcast);
0697         query.bindValue(QStringLiteral(":url"), epAction.url);
0698         query.bindValue(QStringLiteral(":id"), epAction.id);
0699         query.bindValue(QStringLiteral(":action"), epAction.action);
0700         query.bindValue(QStringLiteral(":started"), epAction.started);
0701         query.bindValue(QStringLiteral(":position"), epAction.position);
0702         query.bindValue(QStringLiteral(":total"), epAction.total);
0703         query.bindValue(QStringLiteral(":timestamp"), epAction.timestamp);
0704         Database::instance().execute(query);
0705     }
0706     Database::instance().commit();
0707 }
0708 
0709 void SyncJob::removeSubscriptionChangeConflicts(QStringList &addList, QStringList &removeList)
0710 {
0711     // Do some sanity checks and cleaning-up
0712     addList.removeDuplicates();
0713     removeList.removeDuplicates();
0714     for (const QString &addUrl : addList) {
0715         if (removeList.contains(addUrl)) {
0716             addList.removeAt(addList.indexOf(addUrl));
0717             removeList.removeAt(removeList.indexOf(addUrl));
0718         }
0719     }
0720     for (const QString &removeUrl : removeList) {
0721         if (addList.contains(removeUrl)) {
0722             removeList.removeAt(removeList.indexOf(removeUrl));
0723             addList.removeAt(addList.indexOf(removeUrl));
0724         }
0725     }
0726 }
0727 
0728 QVector<EpisodeAction> SyncJob::createListFromHash(const QHash<QString, QHash<QString, EpisodeAction>> &episodeActionHash)
0729 {
0730     QVector<EpisodeAction> episodeActionList;
0731 
0732     for (const QHash<QString, EpisodeAction> &actions : episodeActionHash) {
0733         for (const EpisodeAction &action : actions) {
0734             if (action.action == QStringLiteral("play")) {
0735                 episodeActionList << action;
0736             }
0737         }
0738     }
0739 
0740     return episodeActionList;
0741 }
0742 
0743 std::pair<QStringList, QStringList> SyncJob::getLocalSubscriptionChanges() const
0744 {
0745     std::pair<QStringList, QStringList> localChanges;
0746     QSqlQuery query;
0747     query.prepare(QStringLiteral("SELECT * FROM FeedActions;"));
0748     Database::instance().execute(query);
0749     while (query.next()) {
0750         QString url = query.value(QStringLiteral("url")).toString();
0751         QString action = query.value(QStringLiteral("action")).toString();
0752         // qulonglong timestamp = query.value(QStringLiteral("timestamp")).toULongLong();
0753         if (action == QStringLiteral("add")) {
0754             localChanges.first << url;
0755         } else if (action == QStringLiteral("remove")) {
0756             localChanges.second << url;
0757         }
0758     }
0759 
0760     return localChanges;
0761 }
0762 
0763 QVector<EpisodeAction> SyncJob::getLocalEpisodeActions() const
0764 {
0765     QVector<EpisodeAction> localEpisodeActions;
0766     QSqlQuery query;
0767     query.prepare(QStringLiteral("SELECT * FROM EpisodeActions;"));
0768     Database::instance().execute(query);
0769     while (query.next()) {
0770         QString podcast = query.value(QStringLiteral("podcast")).toString();
0771         QString url = query.value(QStringLiteral("url")).toString();
0772         QString id = query.value(QStringLiteral("id")).toString();
0773         QString action = query.value(QStringLiteral("action")).toString();
0774         qulonglong started = query.value(QStringLiteral("started")).toULongLong();
0775         qulonglong position = query.value(QStringLiteral("position")).toULongLong();
0776         qulonglong total = query.value(QStringLiteral("total")).toULongLong();
0777         qulonglong timestamp = query.value(QStringLiteral("timestamp")).toULongLong();
0778         EpisodeAction episodeAction = {podcast, url, id, m_device, action, started, position, total, timestamp};
0779         localEpisodeActions += episodeAction;
0780     }
0781 
0782     return localEpisodeActions;
0783 }
0784 
0785 void SyncJob::addToHashIfNewer(QHash<QString, QHash<QString, EpisodeAction>> &episodeActionHash, const EpisodeAction &episodeAction)
0786 {
0787     if (episodeAction.action == QStringLiteral("play")) {
0788         if (episodeActionHash.contains(episodeAction.id) && episodeActionHash[episodeAction.id].contains(QStringLiteral("play"))) {
0789             if (episodeActionHash[episodeAction.id][QStringLiteral("play")].timestamp <= episodeAction.timestamp) {
0790                 episodeActionHash[episodeAction.id][QStringLiteral("play")] = episodeAction;
0791             }
0792         } else {
0793             episodeActionHash[episodeAction.id][QStringLiteral("play")] = episodeAction;
0794         }
0795     }
0796 
0797     if (episodeAction.action == QStringLiteral("download") || episodeAction.action == QStringLiteral("delete")) {
0798         if (episodeActionHash.contains(episodeAction.id)) {
0799             if (episodeActionHash[episodeAction.id].contains(QStringLiteral("download"))) {
0800                 if (episodeActionHash[episodeAction.id][QStringLiteral("download")].timestamp <= episodeAction.timestamp) {
0801                     episodeActionHash[episodeAction.id][QStringLiteral("download-delete")] = episodeAction;
0802                 }
0803             } else if (episodeActionHash[episodeAction.id].contains(QStringLiteral("delete"))) {
0804                 if (episodeActionHash[episodeAction.id][QStringLiteral("delete")].timestamp <= episodeAction.timestamp) {
0805                     episodeActionHash[episodeAction.id][QStringLiteral("download-delete")] = episodeAction;
0806                 }
0807             } else {
0808                 episodeActionHash[episodeAction.id][QStringLiteral("download-delete")] = episodeAction;
0809             }
0810         } else {
0811             episodeActionHash[episodeAction.id][QStringLiteral("download-delete")] = episodeAction;
0812         }
0813     }
0814 
0815     if (episodeAction.action == QStringLiteral("new")) {
0816         if (episodeActionHash.contains(episodeAction.id) && episodeActionHash[episodeAction.id].contains(QStringLiteral("new"))) {
0817             if (episodeActionHash[episodeAction.id][QStringLiteral("new")].timestamp <= episodeAction.timestamp) {
0818                 episodeActionHash[episodeAction.id][QStringLiteral("new")] = episodeAction;
0819             }
0820         } else {
0821             episodeActionHash[episodeAction.id][QStringLiteral("new")] = episodeAction;
0822         }
0823     }
0824 }
0825 
0826 void SyncJob::removeEpisodeActionConflicts(QHash<QString, QHash<QString, EpisodeAction>> &local, QHash<QString, QHash<QString, EpisodeAction>> &remote)
0827 {
0828     QStringList actions;
0829     actions << QStringLiteral("play") << QStringLiteral("download-delete") << QStringLiteral("new");
0830 
0831     // We first remove the conflicts from the hash with local changes
0832     for (const QHash<QString, EpisodeAction> &hashItem : remote) {
0833         for (const QString &action : actions) {
0834             QString id = hashItem[action].id;
0835             if (local.contains(id) && local.value(id).contains(action)) {
0836                 if (local[id][action].timestamp < remote[id][action].timestamp) {
0837                     local[id].remove(action);
0838                 }
0839             }
0840         }
0841     }
0842 
0843     // And now the same for the remote
0844     for (const QHash<QString, EpisodeAction> &hashItem : local) {
0845         for (const QString &action : actions) {
0846             QString id = hashItem[action].id;
0847             if (remote.contains(id) && remote.value(id).contains(action)) {
0848                 if (remote[id][action].timestamp < local[id][action].timestamp) {
0849                     remote[id].remove(action);
0850                 }
0851             }
0852         }
0853     }
0854 }
0855 
0856 QStringList SyncJob::getFeedsFromHash(const QHash<QString, QHash<QString, EpisodeAction>> &hash)
0857 {
0858     QStringList feedUrls;
0859     for (const QHash<QString, EpisodeAction> &actionList : hash) {
0860         for (const EpisodeAction &action : actionList) {
0861             feedUrls += action.podcast;
0862         }
0863     }
0864     return feedUrls;
0865 }
0866 
0867 void SyncJob::debugEpisodeActionHash(const QHash<QString, QHash<QString, EpisodeAction>> &hash)
0868 {
0869     for (const QHash<QString, EpisodeAction> &hashItem : hash) {
0870         for (const EpisodeAction &action : hashItem) {
0871             qCDebug(kastsSync) << action.podcast << action.url << action.id << action.device << action.action << action.started << action.position
0872                                << action.total << action.timestamp;
0873         }
0874     }
0875 }
0876 
0877 QString SyncJob::getProgressMessage(SyncJobStatus status) const
0878 {
0879     int processed = processedAmount(KJob::Unit::Items);
0880     int total = totalAmount(KJob::Unit::Items);
0881 
0882     switch (status) {
0883     case Started:
0884         return i18nc("Subscription/Episode sync progress step", "(Step %1 of %2) Start sync", processed, total);
0885         break;
0886     case SubscriptionDownload:
0887         return i18nc("Subscription/Episode sync progress step", "(Step %1 of %2) Requesting remote subscription updates", processed, total);
0888         break;
0889     case SubscriptionUpload:
0890         return i18nc("Subscription/Episode sync progress step", "(Step %1 of %2) Uploading local subscription updates", processed, total);
0891         break;
0892     case SubscriptionFetch:
0893         return i18ncp("Subscription/Episode sync progress step",
0894                       "(Step %3 of %4) Updated %2 of %1 podcast",
0895                       "(Step %3 of %4) Updated %2 of %1 podcasts",
0896                       m_feedUpdateTotal,
0897                       m_feedUpdateProgress,
0898                       processed,
0899                       total);
0900         break;
0901     case EpisodeDownload:
0902         return i18nc("Subscription/Episode sync progress step", "(Step %1 of %2) Requesting remote episode updates", processed, total);
0903         break;
0904     case ApplyEpisodeActions:
0905         return i18nc("Subscription/Episode sync progress step", "(Step %1 of %2) Applying remote episode changes", processed, total);
0906         break;
0907     case EpisodeUpload:
0908         return i18nc("Subscription/Episode sync progress step", "(Step %1 of %2) Uploading local episode updates", processed, total);
0909         break;
0910     case Finished:
0911         return i18nc("Subscription/Episode sync progress step", "(Step %1 of %2) Finished sync", processed, total);
0912         break;
0913     case Aborted:
0914         return i18nc("Subscription/Episode sync progress step", "Sync aborted");
0915         break;
0916     case Error:
0917     default:
0918         return i18nc("Subscription/Episode sync progress step", "Sync finished with error");
0919         break;
0920     }
0921 }