File indexing completed on 2024-06-16 04:50:31
0001 /* 0002 SPDX-FileCopyrightText: 2014 Daniel Vrátil <dvratil@redhat.com> 0003 0004 SPDX-License-Identifier: LGPL-2.0-or-later 0005 */ 0006 0007 #include "collectionscheduler.h" 0008 #include "akonadiserver_debug.h" 0009 #include "storage/datastore.h" 0010 #include "storage/selectquerybuilder.h" 0011 0012 #include "private/tristate_p.h" 0013 #include <chrono> 0014 0015 #include <QDateTime> 0016 #include <QTimer> 0017 0018 using namespace std::literals::chrono_literals; 0019 0020 namespace Akonadi 0021 { 0022 namespace Server 0023 { 0024 /** 0025 * @warning: QTimer's methods are not virtual, so it's necessary to always call 0026 * methods on pointer to PauseableTimer! 0027 */ 0028 class PauseableTimer : public QTimer 0029 { 0030 Q_OBJECT 0031 0032 public: 0033 explicit PauseableTimer(QObject *parent = nullptr) 0034 : QTimer(parent) 0035 { 0036 } 0037 0038 void start(std::chrono::milliseconds interval) 0039 { 0040 mStarted = QDateTime::currentDateTimeUtc(); 0041 mPaused = QDateTime(); 0042 setInterval(interval); 0043 QTimer::start(interval); 0044 } 0045 0046 void start() 0047 { 0048 start(std::chrono::milliseconds{interval()}); 0049 } 0050 0051 void stop() 0052 { 0053 mStarted = QDateTime(); 0054 mPaused = QDateTime(); 0055 QTimer::stop(); 0056 } 0057 0058 Q_INVOKABLE void pause() 0059 { 0060 if (!isActive() || isPaused()) { 0061 return; 0062 } 0063 0064 mPaused = QDateTime::currentDateTimeUtc(); 0065 QTimer::stop(); 0066 } 0067 0068 Q_INVOKABLE void resume() 0069 { 0070 if (!isPaused()) { 0071 return; 0072 } 0073 0074 const auto remainder = std::chrono::milliseconds{interval()} - std::chrono::seconds{mStarted.secsTo(mPaused)}; 0075 start(qMax(std::chrono::milliseconds{0}, remainder)); 0076 mPaused = QDateTime(); 0077 // Update mStarted so that pause() can be called repeatedly 0078 mStarted = QDateTime::currentDateTimeUtc(); 0079 } 0080 0081 bool isPaused() const 0082 { 0083 return mPaused.isValid(); 0084 } 0085 0086 private: 0087 QDateTime mStarted; 0088 QDateTime mPaused; 0089 }; 0090 0091 } // namespace Server 0092 } // namespace Akonadi 0093 0094 using namespace Akonadi::Server; 0095 0096 CollectionScheduler::CollectionScheduler(const QString &threadName, QThread::Priority priority, QObject *parent) 0097 : AkThread(threadName, priority, parent) 0098 { 0099 } 0100 0101 CollectionScheduler::~CollectionScheduler() 0102 { 0103 } 0104 0105 // Called in secondary thread 0106 void CollectionScheduler::quit() 0107 { 0108 delete mScheduler; 0109 mScheduler = nullptr; 0110 0111 AkThread::quit(); 0112 } 0113 0114 void CollectionScheduler::inhibit(bool inhibit) 0115 { 0116 if (inhibit) { 0117 const bool success = QMetaObject::invokeMethod(mScheduler, &PauseableTimer::pause, Qt::QueuedConnection); 0118 Q_ASSERT(success); 0119 Q_UNUSED(success) 0120 } else { 0121 const bool success = QMetaObject::invokeMethod(mScheduler, &PauseableTimer::resume, Qt::QueuedConnection); 0122 Q_ASSERT(success); 0123 Q_UNUSED(success) 0124 } 0125 } 0126 0127 int CollectionScheduler::minimumInterval() const 0128 { 0129 return mMinInterval; 0130 } 0131 0132 CollectionScheduler::TimePoint CollectionScheduler::nextScheduledTime(qint64 collectionId) const 0133 { 0134 QMutexLocker locker(&mScheduleLock); 0135 const auto i = constFind(collectionId); 0136 if (i != mSchedule.cend()) { 0137 return i.key(); 0138 } 0139 return {}; 0140 } 0141 0142 std::chrono::milliseconds CollectionScheduler::currentTimerInterval() const 0143 { 0144 return std::chrono::milliseconds(mScheduler->isActive() ? mScheduler->interval() : 0); 0145 } 0146 0147 void CollectionScheduler::setMinimumInterval(int intervalMinutes) 0148 { 0149 // No mutex -- you can only call this before starting the thread 0150 mMinInterval = intervalMinutes; 0151 } 0152 0153 void CollectionScheduler::collectionAdded(qint64 collectionId) 0154 { 0155 Collection collection = Collection::retrieveById(collectionId); 0156 DataStore::self()->activeCachePolicy(collection); 0157 if (shouldScheduleCollection(collection)) { 0158 QMetaObject::invokeMethod( 0159 this, 0160 [this, collection]() { 0161 scheduleCollection(collection); 0162 }, 0163 Qt::QueuedConnection); 0164 } 0165 } 0166 0167 void CollectionScheduler::collectionChanged(qint64 collectionId) 0168 { 0169 QMutexLocker locker(&mScheduleLock); 0170 const auto it = constFind(collectionId); 0171 if (it != mSchedule.cend()) { 0172 const Collection &oldCollection = it.value(); 0173 Collection changed = Collection::retrieveById(collectionId); 0174 DataStore::self()->activeCachePolicy(changed); 0175 if (hasChanged(oldCollection, changed)) { 0176 if (shouldScheduleCollection(changed)) { 0177 locker.unlock(); 0178 // Scheduling the changed collection will automatically remove the old one 0179 QMetaObject::invokeMethod( 0180 this, 0181 [this, changed]() { 0182 scheduleCollection(changed); 0183 }, 0184 Qt::QueuedConnection); 0185 } else { 0186 locker.unlock(); 0187 // If the collection should no longer be scheduled then remove it 0188 collectionRemoved(collectionId); 0189 } 0190 } 0191 } else { 0192 // We don't know the collection yet, but maybe now it can be scheduled 0193 collectionAdded(collectionId); 0194 } 0195 } 0196 0197 void CollectionScheduler::collectionRemoved(qint64 collectionId) 0198 { 0199 QMutexLocker locker(&mScheduleLock); 0200 auto it = find(collectionId); 0201 if (it != mSchedule.end()) { 0202 const bool reschedule = it == mSchedule.begin(); 0203 mSchedule.erase(it); 0204 0205 // If we just remove currently scheduled collection, schedule the next one 0206 if (reschedule) { 0207 QMetaObject::invokeMethod(this, &CollectionScheduler::startScheduler, Qt::QueuedConnection); 0208 } 0209 } 0210 } 0211 0212 // Called in secondary thread 0213 void CollectionScheduler::startScheduler() 0214 { 0215 QMutexLocker locker(&mScheduleLock); 0216 // Don't restart timer if we are paused. 0217 if (mScheduler->isPaused()) { 0218 return; 0219 } 0220 0221 if (mSchedule.isEmpty()) { 0222 // Stop the timer. It will be started again once some collection is scheduled 0223 mScheduler->stop(); 0224 return; 0225 } 0226 0227 // Get next collection to expire and start the timer 0228 const auto next = mSchedule.constBegin().key(); 0229 // TimePoint uses a signed representation internally (int64_t), so we get negative result when next is in the past 0230 const auto delayUntilNext = std::chrono::duration_cast<std::chrono::milliseconds>(next - std::chrono::steady_clock::now()); 0231 mScheduler->start(qMax(std::chrono::milliseconds{0}, delayUntilNext)); 0232 } 0233 0234 // Called in secondary thread 0235 void CollectionScheduler::scheduleCollection(Collection collection, bool shouldStartScheduler) 0236 { 0237 DataStore::self()->activeCachePolicy(collection); 0238 0239 QMutexLocker locker(&mScheduleLock); 0240 auto i = find(collection.id()); 0241 if (i != mSchedule.end()) { 0242 mSchedule.erase(i); 0243 } 0244 0245 if (!shouldScheduleCollection(collection)) { 0246 return; 0247 } 0248 0249 const int expireMinutes = qMax(mMinInterval, collectionScheduleInterval(collection)); 0250 TimePoint nextCheck(std::chrono::steady_clock::now() + std::chrono::minutes(expireMinutes)); 0251 0252 // Check whether there's another check scheduled within a minute after this one. 0253 // If yes, then delay this check so that it's scheduled together with the others 0254 // This is a minor optimization to reduce wakeups and SQL queries 0255 auto it = constLowerBound(nextCheck); 0256 if (it != mSchedule.cend() && it.key() - nextCheck < 1min) { 0257 nextCheck = it.key(); 0258 0259 // Also check whether there's another checked scheduled within a minute before 0260 // this one. 0261 } else if (it != mSchedule.cbegin()) { 0262 --it; 0263 if (nextCheck - it.key() < 1min) { 0264 nextCheck = it.key(); 0265 } 0266 } 0267 0268 mSchedule.insert(nextCheck, collection); 0269 if (shouldStartScheduler && !mScheduler->isActive()) { 0270 locker.unlock(); 0271 startScheduler(); 0272 } 0273 } 0274 0275 CollectionScheduler::ScheduleMap::const_iterator CollectionScheduler::constFind(qint64 collectionId) const 0276 { 0277 return std::find_if(mSchedule.cbegin(), mSchedule.cend(), [collectionId](const Collection &c) { 0278 return c.id() == collectionId; 0279 }); 0280 } 0281 0282 CollectionScheduler::ScheduleMap::iterator CollectionScheduler::find(qint64 collectionId) 0283 { 0284 return std::find_if(mSchedule.begin(), mSchedule.end(), [collectionId](const Collection &c) { 0285 return c.id() == collectionId; 0286 }); 0287 } 0288 0289 // separate method so we call the const version of QMap::lowerBound 0290 CollectionScheduler::ScheduleMap::const_iterator CollectionScheduler::constLowerBound(TimePoint timestamp) const 0291 { 0292 return mSchedule.lowerBound(timestamp); 0293 } 0294 0295 // Called in secondary thread 0296 void CollectionScheduler::init() 0297 { 0298 AkThread::init(); 0299 0300 mScheduler = new PauseableTimer(); 0301 mScheduler->setSingleShot(true); 0302 connect(mScheduler, &QTimer::timeout, this, &CollectionScheduler::schedulerTimeout); 0303 0304 // Only retrieve enabled collections and referenced collections, we don't care 0305 // about anything else 0306 SelectQueryBuilder<Collection> qb; 0307 if (!qb.exec()) { 0308 qCWarning(AKONADISERVER_LOG) << "Failed to query initial collections for scheduler!"; 0309 qCWarning(AKONADISERVER_LOG) << "Not a fatal error, no collections will be scheduled for sync or cache expiration!"; 0310 } 0311 0312 const Collection::List collections = qb.result(); 0313 for (const Collection &collection : collections) { 0314 scheduleCollection(collection); 0315 } 0316 0317 startScheduler(); 0318 } 0319 0320 // Called in secondary thread 0321 void CollectionScheduler::schedulerTimeout() 0322 { 0323 QMutexLocker locker(&mScheduleLock); 0324 0325 // Call stop() explicitly to reset the timer 0326 mScheduler->stop(); 0327 0328 const auto timestamp = mSchedule.constBegin().key(); 0329 const QList<Collection> collections = mSchedule.values(timestamp); 0330 mSchedule.remove(timestamp); 0331 locker.unlock(); 0332 0333 for (const Collection &collection : collections) { 0334 collectionExpired(collection); 0335 scheduleCollection(collection, false); 0336 } 0337 0338 startScheduler(); 0339 } 0340 0341 #include "collectionscheduler.moc" 0342 0343 #include "moc_collectionscheduler.cpp"