File indexing completed on 2024-04-21 04:57:04

0001 /* This file is part of the KDE project
0002 
0003    Copyright (C) 2004 Dario Massarin <nekkar@libero.it>
0004    Copyright (C) 2010 Matthias Fuchs <mat69@gmx.net>
0005 
0006    This program is free software; you can redistribute it and/or
0007    modify it under the terms of the GNU General Public
0008    License as published by the Free Software Foundation; either
0009    version 2 of the License, or (at your option) any later version.
0010 */
0011 
0012 #include "core/scheduler.h"
0013 
0014 #include "core/transferhandler.h"
0015 #include "settings.h"
0016 
0017 #include <algorithm>
0018 #include <boost/bind/bind.hpp>
0019 
0020 #include "kget_debug.h"
0021 #include <QDebug>
0022 
0023 Scheduler::Scheduler(QObject *parent)
0024     : QObject(parent)
0025     , m_failureCheckTimer(0)
0026     , m_stallTime(5)
0027     , m_stallTimeout(Settings::reconnectDelay())
0028     , m_abortTimeout(Settings::reconnectDelay())
0029     , m_isSuspended(false)
0030     , m_hasConnection(true)
0031 {
0032 }
0033 
0034 Scheduler::~Scheduler()
0035 {
0036 }
0037 
0038 void Scheduler::setIsSuspended(bool isSuspended)
0039 {
0040     const bool changed = (isSuspended != m_isSuspended);
0041     m_isSuspended = isSuspended;
0042 
0043     // update all the queues
0044     if (changed && shouldUpdate()) {
0045         updateAllQueues();
0046     }
0047 }
0048 
0049 void Scheduler::setHasNetworkConnection(bool hasConnection)
0050 {
0051     const bool changed = (hasConnection != m_hasConnection);
0052     m_hasConnection = hasConnection;
0053 
0054     if (changed) {
0055         if (hasConnection) {
0056             if (!m_failureCheckTimer) {
0057                 m_failureCheckTimer = startTimer(1000);
0058             }
0059             updateAllQueues();
0060         } else {
0061             if (m_failureCheckTimer) {
0062                 killTimer(m_failureCheckTimer);
0063                 m_failureCheckTimer = 0;
0064             }
0065             foreach (JobQueue *queue, m_queues) {
0066                 std::for_each(queue->begin(), queue->end(), boost::bind(&Job::stop, boost::placeholders::_1));
0067             }
0068         }
0069     }
0070 }
0071 
0072 void Scheduler::addQueue(JobQueue *queue)
0073 {
0074     if (!m_queues.contains(queue))
0075         m_queues.append(queue);
0076 }
0077 
0078 void Scheduler::delQueue(JobQueue *queue)
0079 {
0080     m_queues.removeAll(queue);
0081 }
0082 
0083 struct IsRunningJob {
0084     bool operator()(Job *job) const
0085     {
0086         return (job->status() == Job::Running);
0087     }
0088 };
0089 
0090 bool Scheduler::hasRunningJobs() const
0091 {
0092     foreach (JobQueue *queue, m_queues) {
0093         if (std::find_if(queue->begin(), queue->end(), IsRunningJob()) != queue->end()) {
0094             return true;
0095         }
0096     }
0097     return false;
0098 }
0099 
0100 int Scheduler::countRunningJobs() const
0101 {
0102     int count = 0;
0103     foreach (JobQueue *queue, m_queues) {
0104         count += std::count_if(queue->begin(), queue->end(), IsRunningJob());
0105     }
0106 
0107     return count;
0108 }
0109 
0110 void Scheduler::settingsChanged()
0111 {
0112     m_stallTimeout = Settings::reconnectDelay();
0113     m_abortTimeout = Settings::reconnectDelay();
0114 
0115     updateAllQueues();
0116 }
0117 
0118 void Scheduler::jobQueueChangedEvent(JobQueue *queue, JobQueue::Status status)
0119 {
0120     if (status == JobQueue::Stopped) {
0121         JobQueue::iterator it = queue->begin();
0122         JobQueue::iterator itEnd = queue->end();
0123 
0124         for (; it != itEnd; ++it) {
0125             if ((*it)->status() != Job::Stopped)
0126                 (*it)->stop();
0127         }
0128     } else
0129         updateQueue(queue);
0130 }
0131 
0132 void Scheduler::jobQueueMovedJobEvent(JobQueue *queue, Job *job)
0133 {
0134     Q_UNUSED(job)
0135 
0136     updateQueue(queue);
0137 }
0138 
0139 void Scheduler::jobQueueAddedJobEvent(JobQueue *queue, Job *job)
0140 {
0141     Q_UNUSED(job)
0142 
0143     updateQueue(queue);
0144 }
0145 
0146 void Scheduler::jobQueueAddedJobsEvent(JobQueue *queue, const QList<Job *> jobs)
0147 {
0148     Q_UNUSED(jobs)
0149 
0150     updateQueue(queue);
0151 }
0152 
0153 void Scheduler::jobQueueRemovedJobEvent(JobQueue *queue, Job *job)
0154 {
0155     Q_UNUSED(job)
0156 
0157     updateQueue(queue);
0158 }
0159 
0160 void Scheduler::jobQueueRemovedJobsEvent(JobQueue *queue, const QList<Job *> jobs)
0161 {
0162     Q_UNUSED(jobs)
0163 
0164     updateQueue(queue);
0165 }
0166 
0167 void Scheduler::jobChangedEvent(Job *job, Job::Status status)
0168 {
0169     qCDebug(KGET_DEBUG) << "Scheduler::jobChangedEvent  (job=" << job << " status=" << status << ")";
0170 
0171     if (!m_failureCheckTimer)
0172         m_failureCheckTimer = startTimer(1000);
0173 
0174     if (status != Job::Running)
0175         updateQueue(job->jobQueue());
0176 }
0177 
0178 void Scheduler::jobChangedEvent(Job *job, Job::Policy policy)
0179 {
0180     Q_UNUSED(policy)
0181 
0182     updateQueue(job->jobQueue());
0183 }
0184 
0185 void Scheduler::jobChangedEvent(Job *job, JobFailure failure)
0186 {
0187     switch (failure.status) {
0188     case None:
0189         qCDebug(KGET_DEBUG) << "job = " << job << " failure (#" << failure.count << ") = None ";
0190         break;
0191     case AboutToStall:
0192         qCDebug(KGET_DEBUG) << "job = " << job << " failure (#" << failure.count << ") = AboutToStall ";
0193         break;
0194     case Stall:
0195         qCDebug(KGET_DEBUG) << "job = " << job << " failure (#" << failure.count << ") = Stall ";
0196         break;
0197     case StallTimeout:
0198         qCDebug(KGET_DEBUG) << "job = " << job << " failure (#" << failure.count << ") = StallTimeout ";
0199         break;
0200     case Abort:
0201         qCDebug(KGET_DEBUG) << "job = " << job << " failure (#" << failure.count << ") = Abort ";
0202         break;
0203     case AbortTimeout:
0204         qCDebug(KGET_DEBUG) << "job = " << job << " failure (#" << failure.count << ") = AbortTimeout ";
0205         break;
0206     case Error:
0207         qCDebug(KGET_DEBUG) << "job = " << job << " failure (#" << failure.count << ") = Error ";
0208         break;
0209     }
0210 
0211     if (failure.status == Error) {
0212         static_cast<Transfer *>(job)->handler()->stop();
0213     } else if ( // If this happens the job just gets stopped
0214                 //  Second condition: if count >  reconnectRetries and Timeout happened trigger a stop/start BUT only if
0215                 //  10 timeouts have happened (9 of them without taking any action). This means every 10*Settings::reconnectDelay() (ex. 15s -> 150s)
0216         (failure.count > Settings::reconnectRetries() && (failure.status == StallTimeout || failure.status == AbortTimeout)
0217          && !((failure.count - Settings::reconnectRetries()) % 10))) {
0218         // FIXME reenable once a connection limit per mirror is in place BUG:262098
0219         // static_cast<Transfer*>(job)->handler()->stop();// This will trigger the changedEvent which will trigger an updateQueue call
0220         job->stop(); // FIXME remove once a connection limit per mirror is in place
0221     } else if (failure.count <= Settings::reconnectRetries() && (failure.status == StallTimeout || failure.status == AbortTimeout)) {
0222         // First  condition: if count <= reconnectRetries and Timeout happened trigger a stop/start
0223         job->stop(); // stops the job, it will be later restarted by updateQueue
0224     } else
0225         updateQueue(job->jobQueue());
0226 }
0227 
0228 void Scheduler::start()
0229 {
0230     std::for_each(m_queues.begin(), m_queues.end(), boost::bind(&JobQueue::setStatus, boost::placeholders::_1, JobQueue::Running));
0231 }
0232 
0233 void Scheduler::stop()
0234 {
0235     std::for_each(m_queues.begin(), m_queues.end(), boost::bind(&JobQueue::setStatus, boost::placeholders::_1, JobQueue::Stopped));
0236 }
0237 
0238 void Scheduler::updateQueue(JobQueue *queue)
0239 {
0240     static bool updatingQueue = false;
0241 
0242     if (!shouldUpdate() || updatingQueue)
0243         return;
0244 
0245     updatingQueue = true;
0246 
0247     int runningJobs = 0; // Jobs that are running (and not in the stallTimeout)
0248     int waitingJobs = 0; // Jobs that we leave running but are in stallTimeout. We wait for them to start downloading, while we start other ones
0249 
0250     /**
0251      * Implemented behaviour
0252      *
0253      * The scheduler allows a maximum number of runningJobs equal to the queue->maxSimultaneousJobs() setting.
0254      * If that number is not reached because of stallTimeout transfers, the scheduler allows that:
0255      *     (runningJobs + waitingJobs) < 2 * queue->maxSimultaneousJobs()
0256      * Examples (with maxSimultaneousJobs = 2):
0257      *        These are if the running jobs come first in the queue
0258      *     1) 2 runningJobs - 0 waitingJobs
0259      *     2) 1 runningJobs - up to 3 waitingJobs
0260      *     3) 0 runningJobs - up to 4 waitingJobs
0261      *        These are if the waiting jobs come first in the queue
0262      *     1) 1 waitingJobs - 2 runningJobs
0263      *     2) 2 waitingJobs - 2 runningJobs
0264      *     3) 3 waitingJobs - 1 runningJobs
0265      *     4) 4 waitingJobs - 0 runningJobs
0266      **/
0267 
0268     JobQueue::iterator it = queue->begin();
0269     JobQueue::iterator itEnd = queue->end();
0270 
0271     for (int job = 0; it != itEnd; ++it, ++job) {
0272         // qCDebug(KGET_DEBUG) << "MaxSimJobs " << queue->maxSimultaneousJobs();
0273         qCDebug(KGET_DEBUG) << "Scheduler: Evaluating job " << job;
0274 
0275         JobFailure failure = m_failedJobs.value(*it);
0276 
0277         if (runningJobs < queue->maxSimultaneousJobs() && ((runningJobs + waitingJobs) < 2 * queue->maxSimultaneousJobs())) {
0278             if ((*it)->status() == Job::Running || (*it)->status() == Job::FinishedKeepAlive) {
0279                 if (!shouldBeRunning(*it)) {
0280                     qCDebug(KGET_DEBUG) << "Scheduler:    stopping job";
0281                     (*it)->stop();
0282                 } else if (failure.status == None || failure.status == AboutToStall)
0283                     runningJobs++;
0284                 else
0285                     waitingJobs++;
0286             } else // != Job::Running
0287             {
0288                 if (shouldBeRunning(*it)) {
0289                     qCDebug(KGET_DEBUG) << "Scheduler:    starting job";
0290                     (*it)->start();
0291                     if ((failure.status == None || failure.status == AboutToStall) && (*it)->status() != Job::FinishedKeepAlive)
0292                         runningJobs++;
0293                     else
0294                         waitingJobs++;
0295                 }
0296             }
0297         } else {
0298             // Stop all the other running downloads
0299             qCDebug(KGET_DEBUG) << "Scheduler:    stopping job over maxSimJobs limit";
0300             (*it)->stop();
0301         }
0302     }
0303 
0304     updatingQueue = false;
0305 }
0306 
0307 void Scheduler::updateAllQueues()
0308 {
0309     foreach (JobQueue *queue, m_queues) {
0310         updateQueue(queue);
0311     }
0312 }
0313 
0314 bool Scheduler::shouldBeRunning(Job *job)
0315 {
0316     Job::Policy policy = job->policy();
0317     Job::Status status = job->status();
0318 
0319     if (job->jobQueue()->status() == JobQueue::Stopped) {
0320         return ((policy == Job::Start) && ((status != Job::Finished) && (status != Job::Aborted || job->error().type == Job::AutomaticRetry)));
0321     } else // JobQueue::Running
0322     {
0323         return ((policy != Job::Stop) && ((status != Job::Finished) && (status != Job::Aborted || job->error().type == Job::AutomaticRetry)));
0324     }
0325 }
0326 
0327 void Scheduler::timerEvent(QTimerEvent *event)
0328 {
0329     Q_UNUSED(event)
0330     //     qCDebug(KGET_DEBUG);
0331 
0332     if (!shouldUpdate()) {
0333         return;
0334     }
0335 
0336     foreach (JobQueue *queue, m_queues) {
0337         JobQueue::iterator it = queue->begin();
0338         JobQueue::iterator itEnd = queue->end();
0339 
0340         for (int job = 0; it != itEnd; ++it, ++job) {
0341             JobFailure failure = m_failedJobs[*it];
0342             JobFailure prevFailure = failure;
0343 
0344             if ((*it)->isStalled()) // Stall status initialization
0345             {
0346                 if (failure.status != AboutToStall && failure.status != Stall && failure.status != StallTimeout) {
0347                     failure.status = AboutToStall;
0348                     failure.time = 0;
0349                     failure.count = 0;
0350                 } else {
0351                     failure.time++;
0352 
0353                     if (failure.time >= m_stallTime + m_stallTimeout) {
0354                         failure.status = StallTimeout;
0355                         failure.count++;
0356 
0357                     } else if (failure.time >= m_stallTime)
0358                         failure.status = Stall;
0359                     else
0360                         failure.status = AboutToStall;
0361 
0362                     if (failure.status == StallTimeout)
0363                         failure.time = m_stallTime;
0364                 }
0365             } else if ((*it)->status() == Job::Aborted) // Abort status initialization
0366             {
0367                 if ((*it)->error().type != Job::AutomaticRetry) {
0368                     failure.status = Error;
0369                 } else {
0370                     if (failure.status != Abort) {
0371                         failure.status = Abort;
0372                         failure.time = 0;
0373                         failure.count = 0;
0374                     } else {
0375                         failure.time++;
0376                         failure.count++;
0377 
0378                         if (failure.time >= m_abortTimeout) {
0379                             failure.status = AbortTimeout;
0380                             failure.count++;
0381                         }
0382 
0383                         if (failure.status == AbortTimeout)
0384                             failure.time = 0;
0385                     }
0386                 }
0387             } else if ((*it)->isWorking()) {
0388                 failure = JobFailure();
0389             }
0390 
0391             if (failure.isValid()) // A failure has been detected
0392                 m_failedJobs[*it] = failure;
0393             else // No failure detected, remove it
0394                 m_failedJobs.remove(*it);
0395 
0396             //             if(failure.isValid() || prevFailure.isValid())
0397             //                 qCDebug(KGET_DEBUG) << "failure = " << failure.status << " T=" << failure.time << " prevFailure = " << prevFailure.status;
0398 
0399             if (failure.status != prevFailure.status)
0400                 jobChangedEvent(*it, failure); // Notify the scheduler
0401         }
0402     }
0403 }
0404 
0405 #include "moc_scheduler.cpp"