// File indexing completed on 2023-10-03 07:53:59
0001 /* This file is part of the KDE project 0002 0003 Copyright (C) 2004 Dario Massarin <nekkar@libero.it> 0004 Copyright (C) 2010 Matthias Fuchs <mat69@gmx.net> 0005 0006 This program is free software; you can redistribute it and/or 0007 modify it under the terms of the GNU General Public 0008 License as published by the Free Software Foundation; either 0009 version 2 of the License, or (at your option) any later version. 0010 */ 0011 0012 #include "core/scheduler.h" 0013 0014 #include "core/transferhandler.h" 0015 #include "settings.h" 0016 0017 #include <algorithm> 0018 #include <boost/bind/bind.hpp> 0019 0020 #include "kget_debug.h" 0021 #include <QDebug> 0022 0023 Scheduler::Scheduler(QObject *parent) 0024 : QObject(parent) 0025 , m_failureCheckTimer(0) 0026 , m_stallTime(5) 0027 , m_stallTimeout(Settings::reconnectDelay()) 0028 , m_abortTimeout(Settings::reconnectDelay()) 0029 , m_isSuspended(false) 0030 , m_hasConnection(true) 0031 { 0032 } 0033 0034 Scheduler::~Scheduler() 0035 { 0036 } 0037 0038 void Scheduler::setIsSuspended(bool isSuspended) 0039 { 0040 const bool changed = (isSuspended != m_isSuspended); 0041 m_isSuspended = isSuspended; 0042 0043 // update all the queues 0044 if (changed && shouldUpdate()) { 0045 updateAllQueues(); 0046 } 0047 } 0048 0049 void Scheduler::setHasNetworkConnection(bool hasConnection) 0050 { 0051 const bool changed = (hasConnection != m_hasConnection); 0052 m_hasConnection = hasConnection; 0053 0054 if (changed) { 0055 if (hasConnection) { 0056 if (!m_failureCheckTimer) { 0057 m_failureCheckTimer = startTimer(1000); 0058 } 0059 updateAllQueues(); 0060 } else { 0061 if (m_failureCheckTimer) { 0062 killTimer(m_failureCheckTimer); 0063 m_failureCheckTimer = 0; 0064 } 0065 foreach (JobQueue *queue, m_queues) { 0066 std::for_each(queue->begin(), queue->end(), boost::bind(&Job::stop, boost::placeholders::_1)); 0067 } 0068 } 0069 } 0070 } 0071 0072 void Scheduler::addQueue(JobQueue *queue) 0073 { 0074 if 
(!m_queues.contains(queue)) 0075 m_queues.append(queue); 0076 } 0077 0078 void Scheduler::delQueue(JobQueue *queue) 0079 { 0080 m_queues.removeAll(queue); 0081 } 0082 0083 struct IsRunningJob { 0084 bool operator()(Job *job) const 0085 { 0086 return (job->status() == Job::Running); 0087 } 0088 }; 0089 0090 bool Scheduler::hasRunningJobs() const 0091 { 0092 foreach (JobQueue *queue, m_queues) { 0093 if (std::find_if(queue->begin(), queue->end(), IsRunningJob()) != queue->end()) { 0094 return true; 0095 } 0096 } 0097 return false; 0098 } 0099 0100 int Scheduler::countRunningJobs() const 0101 { 0102 int count = 0; 0103 foreach (JobQueue *queue, m_queues) { 0104 count += std::count_if(queue->begin(), queue->end(), IsRunningJob()); 0105 } 0106 0107 return count; 0108 } 0109 0110 void Scheduler::settingsChanged() 0111 { 0112 m_stallTimeout = Settings::reconnectDelay(); 0113 m_abortTimeout = Settings::reconnectDelay(); 0114 0115 updateAllQueues(); 0116 } 0117 0118 void Scheduler::jobQueueChangedEvent(JobQueue *queue, JobQueue::Status status) 0119 { 0120 if (status == JobQueue::Stopped) { 0121 JobQueue::iterator it = queue->begin(); 0122 JobQueue::iterator itEnd = queue->end(); 0123 0124 for (; it != itEnd; ++it) { 0125 if ((*it)->status() != Job::Stopped) 0126 (*it)->stop(); 0127 } 0128 } else 0129 updateQueue(queue); 0130 } 0131 0132 void Scheduler::jobQueueMovedJobEvent(JobQueue *queue, Job *job) 0133 { 0134 Q_UNUSED(job) 0135 0136 updateQueue(queue); 0137 } 0138 0139 void Scheduler::jobQueueAddedJobEvent(JobQueue *queue, Job *job) 0140 { 0141 Q_UNUSED(job) 0142 0143 updateQueue(queue); 0144 } 0145 0146 void Scheduler::jobQueueAddedJobsEvent(JobQueue *queue, const QList<Job *> jobs) 0147 { 0148 Q_UNUSED(jobs) 0149 0150 updateQueue(queue); 0151 } 0152 0153 void Scheduler::jobQueueRemovedJobEvent(JobQueue *queue, Job *job) 0154 { 0155 Q_UNUSED(job) 0156 0157 updateQueue(queue); 0158 } 0159 0160 void Scheduler::jobQueueRemovedJobsEvent(JobQueue *queue, const QList<Job *> 
jobs) 0161 { 0162 Q_UNUSED(jobs) 0163 0164 updateQueue(queue); 0165 } 0166 0167 void Scheduler::jobChangedEvent(Job *job, Job::Status status) 0168 { 0169 qCDebug(KGET_DEBUG) << "Scheduler::jobChangedEvent (job=" << job << " status=" << status << ")"; 0170 0171 if (!m_failureCheckTimer) 0172 m_failureCheckTimer = startTimer(1000); 0173 0174 if (status != Job::Running) 0175 updateQueue(job->jobQueue()); 0176 } 0177 0178 void Scheduler::jobChangedEvent(Job *job, Job::Policy policy) 0179 { 0180 Q_UNUSED(policy) 0181 0182 updateQueue(job->jobQueue()); 0183 } 0184 0185 void Scheduler::jobChangedEvent(Job *job, JobFailure failure) 0186 { 0187 switch (failure.status) { 0188 case None: 0189 qCDebug(KGET_DEBUG) << "job = " << job << " failure (#" << failure.count << ") = None "; 0190 break; 0191 case AboutToStall: 0192 qCDebug(KGET_DEBUG) << "job = " << job << " failure (#" << failure.count << ") = AboutToStall "; 0193 break; 0194 case Stall: 0195 qCDebug(KGET_DEBUG) << "job = " << job << " failure (#" << failure.count << ") = Stall "; 0196 break; 0197 case StallTimeout: 0198 qCDebug(KGET_DEBUG) << "job = " << job << " failure (#" << failure.count << ") = StallTimeout "; 0199 break; 0200 case Abort: 0201 qCDebug(KGET_DEBUG) << "job = " << job << " failure (#" << failure.count << ") = Abort "; 0202 break; 0203 case AbortTimeout: 0204 qCDebug(KGET_DEBUG) << "job = " << job << " failure (#" << failure.count << ") = AbortTimeout "; 0205 break; 0206 case Error: 0207 qCDebug(KGET_DEBUG) << "job = " << job << " failure (#" << failure.count << ") = Error "; 0208 break; 0209 } 0210 0211 if (failure.status == Error) { 0212 static_cast<Transfer *>(job)->handler()->stop(); 0213 } else if ( // If this happens the job just gets stopped 0214 // Second condition: if count > reconnectRetries and Timeout happened trigger a stop/start BUT only if 0215 // 10 timeouts have happened (9 of them without taking any action). This means every 10*Settings::reconnectDelay() (ex. 
15s -> 150s) 0216 (failure.count > Settings::reconnectRetries() && (failure.status == StallTimeout || failure.status == AbortTimeout) 0217 && !((failure.count - Settings::reconnectRetries()) % 10))) { 0218 // FIXME reenable once a connection limit per mirror is in place BUG:262098 0219 // static_cast<Transfer*>(job)->handler()->stop();// This will trigger the changedEvent which will trigger an updateQueue call 0220 job->stop(); // FIXME remove once a connection limit per mirror is in place 0221 } else if (failure.count <= Settings::reconnectRetries() && (failure.status == StallTimeout || failure.status == AbortTimeout)) { 0222 // First condition: if count <= reconnectRetries and Timeout happened trigger a stop/start 0223 job->stop(); // stops the job, it will be later restarted by updateQueue 0224 } else 0225 updateQueue(job->jobQueue()); 0226 } 0227 0228 void Scheduler::start() 0229 { 0230 std::for_each(m_queues.begin(), m_queues.end(), boost::bind(&JobQueue::setStatus, boost::placeholders::_1, JobQueue::Running)); 0231 } 0232 0233 void Scheduler::stop() 0234 { 0235 std::for_each(m_queues.begin(), m_queues.end(), boost::bind(&JobQueue::setStatus, boost::placeholders::_1, JobQueue::Stopped)); 0236 } 0237 0238 void Scheduler::updateQueue(JobQueue *queue) 0239 { 0240 static bool updatingQueue = false; 0241 0242 if (!shouldUpdate() || updatingQueue) 0243 return; 0244 0245 updatingQueue = true; 0246 0247 int runningJobs = 0; // Jobs that are running (and not in the stallTimeout) 0248 int waitingJobs = 0; // Jobs that we leave running but are in stallTimeout. We wait for them to start downloading, while we start other ones 0249 0250 /** 0251 * Implemented behaviour 0252 * 0253 * The scheduler allows a maximum number of runningJobs equal to the queue->maxSimultaneousJobs() setting. 
0254 * If that number is not reached because of stallTimeout transfers, the scheduler allows that: 0255 * (runningJobs + waitingJobs) < 2 * queue->maxSimultaneousJobs() 0256 * Examples (with maxSimultaneousJobs = 2): 0257 * These are if the running jobs come first in the queue 0258 * 1) 2 runningJobs - 0 waitingJobs 0259 * 2) 1 runningJobs - up to 3 waitingJobs 0260 * 3) 0 runningJobs - up to 4 waitingJobs 0261 * These are if the waiting jobs come first in the queue 0262 * 1) 1 waitingJobs - 2 runningJobs 0263 * 2) 2 waitingJobs - 2 runningJobs 0264 * 3) 3 waitingJobs - 1 runningJobs 0265 * 4) 4 waitingJobs - 0 runningJobs 0266 **/ 0267 0268 JobQueue::iterator it = queue->begin(); 0269 JobQueue::iterator itEnd = queue->end(); 0270 0271 for (int job = 0; it != itEnd; ++it, ++job) { 0272 // qCDebug(KGET_DEBUG) << "MaxSimJobs " << queue->maxSimultaneousJobs(); 0273 qCDebug(KGET_DEBUG) << "Scheduler: Evaluating job " << job; 0274 0275 JobFailure failure = m_failedJobs.value(*it); 0276 0277 if (runningJobs < queue->maxSimultaneousJobs() && ((runningJobs + waitingJobs) < 2 * queue->maxSimultaneousJobs())) { 0278 if ((*it)->status() == Job::Running || (*it)->status() == Job::FinishedKeepAlive) { 0279 if (!shouldBeRunning(*it)) { 0280 qCDebug(KGET_DEBUG) << "Scheduler: stopping job"; 0281 (*it)->stop(); 0282 } else if (failure.status == None || failure.status == AboutToStall) 0283 runningJobs++; 0284 else 0285 waitingJobs++; 0286 } else // != Job::Running 0287 { 0288 if (shouldBeRunning(*it)) { 0289 qCDebug(KGET_DEBUG) << "Scheduler: starting job"; 0290 (*it)->start(); 0291 if ((failure.status == None || failure.status == AboutToStall) && (*it)->status() != Job::FinishedKeepAlive) 0292 runningJobs++; 0293 else 0294 waitingJobs++; 0295 } 0296 } 0297 } else { 0298 // Stop all the other running downloads 0299 qCDebug(KGET_DEBUG) << "Scheduler: stopping job over maxSimJobs limit"; 0300 (*it)->stop(); 0301 } 0302 } 0303 0304 updatingQueue = false; 0305 } 0306 0307 void 
Scheduler::updateAllQueues() 0308 { 0309 foreach (JobQueue *queue, m_queues) { 0310 updateQueue(queue); 0311 } 0312 } 0313 0314 bool Scheduler::shouldBeRunning(Job *job) 0315 { 0316 Job::Policy policy = job->policy(); 0317 Job::Status status = job->status(); 0318 0319 if (job->jobQueue()->status() == JobQueue::Stopped) { 0320 return ((policy == Job::Start) && ((status != Job::Finished) && (status != Job::Aborted || job->error().type == Job::AutomaticRetry))); 0321 } else // JobQueue::Running 0322 { 0323 return ((policy != Job::Stop) && ((status != Job::Finished) && (status != Job::Aborted || job->error().type == Job::AutomaticRetry))); 0324 } 0325 } 0326 0327 void Scheduler::timerEvent(QTimerEvent *event) 0328 { 0329 Q_UNUSED(event) 0330 // qCDebug(KGET_DEBUG); 0331 0332 if (!shouldUpdate()) { 0333 return; 0334 } 0335 0336 foreach (JobQueue *queue, m_queues) { 0337 JobQueue::iterator it = queue->begin(); 0338 JobQueue::iterator itEnd = queue->end(); 0339 0340 for (int job = 0; it != itEnd; ++it, ++job) { 0341 JobFailure failure = m_failedJobs[*it]; 0342 JobFailure prevFailure = failure; 0343 0344 if ((*it)->isStalled()) // Stall status initialization 0345 { 0346 if (failure.status != AboutToStall && failure.status != Stall && failure.status != StallTimeout) { 0347 failure.status = AboutToStall; 0348 failure.time = 0; 0349 failure.count = 0; 0350 } else { 0351 failure.time++; 0352 0353 if (failure.time >= m_stallTime + m_stallTimeout) { 0354 failure.status = StallTimeout; 0355 failure.count++; 0356 0357 } else if (failure.time >= m_stallTime) 0358 failure.status = Stall; 0359 else 0360 failure.status = AboutToStall; 0361 0362 if (failure.status == StallTimeout) 0363 failure.time = m_stallTime; 0364 } 0365 } else if ((*it)->status() == Job::Aborted) // Abort status initialization 0366 { 0367 if ((*it)->error().type != Job::AutomaticRetry) { 0368 failure.status = Error; 0369 } else { 0370 if (failure.status != Abort) { 0371 failure.status = Abort; 0372 failure.time = 
0; 0373 failure.count = 0; 0374 } else { 0375 failure.time++; 0376 failure.count++; 0377 0378 if (failure.time >= m_abortTimeout) { 0379 failure.status = AbortTimeout; 0380 failure.count++; 0381 } 0382 0383 if (failure.status == AbortTimeout) 0384 failure.time = 0; 0385 } 0386 } 0387 } else if ((*it)->isWorking()) { 0388 failure = JobFailure(); 0389 } 0390 0391 if (failure.isValid()) // A failure has been detected 0392 m_failedJobs[*it] = failure; 0393 else // No failure detected, remove it 0394 m_failedJobs.remove(*it); 0395 0396 // if(failure.isValid() || prevFailure.isValid()) 0397 // qCDebug(KGET_DEBUG) << "failure = " << failure.status << " T=" << failure.time << " prevFailure = " << prevFailure.status; 0398 0399 if (failure.status != prevFailure.status) 0400 jobChangedEvent(*it, failure); // Notify the scheduler 0401 } 0402 } 0403 } 0404 0405 #include "moc_scheduler.cpp"