File indexing completed on 2024-04-14 03:53:00

0001 /*
0002     This file is part of the KDE libraries
0003     SPDX-FileCopyrightText: 2000 Stephan Kulow <coolo@kde.org>
0004     SPDX-FileCopyrightText: 2000 Waldo Bastian <bastian@kde.org>
0005     SPDX-FileCopyrightText: 2009, 2010 Andreas Hartmetz <ahartmetz@gmail.com>
0006 
0007     SPDX-License-Identifier: LGPL-2.0-only
0008 */
0009 
0010 #include "scheduler.h"
0011 #include "scheduler_p.h"
0012 
0013 #include "connection_p.h"
0014 #include "job_p.h"
0015 #include "kprotocolmanager_p.h"
0016 #include "sessiondata_p.h"
0017 #include "worker_p.h"
0018 #include "workerconfig.h"
0019 
0020 #include <kprotocolinfo.h>
0021 #include <kprotocolmanager.h>
0022 
0023 #ifndef KIO_ANDROID_STUB
0024 #include <QDBusConnection>
0025 #include <QDBusMessage>
0026 #endif
0027 #include <QHash>
0028 #include <QThread>
0029 #include <QThreadStorage>
0030 
// Idle workers are kept around for this many seconds (3 minutes) before the
// grim reaper kills them.
static constexpr int s_idleWorkerLifetime = 3 * 60;
0033 
0034 using namespace KIO;
0035 
0036 static inline Worker *jobSWorker(SimpleJob *job)
0037 {
0038     return SimpleJobPrivate::get(job)->m_worker;
0039 }
0040 
0041 static inline int jobCommand(SimpleJob *job)
0042 {
0043     return SimpleJobPrivate::get(job)->m_command;
0044 }
0045 
0046 static inline void startJob(SimpleJob *job, Worker *worker)
0047 {
0048     SimpleJobPrivate::get(job)->start(worker);
0049 }
0050 
0051 class KIO::SchedulerPrivate
0052 {
0053 public:
0054     SchedulerPrivate()
0055         : q(new Scheduler())
0056     {
0057     }
0058 
0059     ~SchedulerPrivate()
0060     {
0061         removeWorkerOnHold();
0062         delete q;
0063         q = nullptr;
0064         qDeleteAll(m_protocols); // ~ProtoQueue will kill and delete all workers
0065     }
0066 
0067     SchedulerPrivate(const SchedulerPrivate &) = delete;
0068     SchedulerPrivate &operator=(const SchedulerPrivate &) = delete;
0069 
0070     Scheduler *q;
0071 
0072     Worker *m_workerOnHold = nullptr;
0073     QUrl m_urlOnHold;
0074     bool m_ignoreConfigReparse = false;
0075 
0076     SessionData sessionData;
0077 
0078     void doJob(SimpleJob *job);
0079     void setJobPriority(SimpleJob *job, int priority);
0080     void cancelJob(SimpleJob *job);
0081     void jobFinished(KIO::SimpleJob *job, KIO::Worker *worker);
0082     void putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url);
0083     void removeWorkerOnHold();
0084     Worker *heldWorkerForJob(KIO::SimpleJob *job);
0085     bool isWorkerOnHoldFor(const QUrl &url);
0086     void updateInternalMetaData(SimpleJob *job);
0087 
0088     MetaData metaDataFor(const QString &protocol, const QStringList &proxyList, const QUrl &url);
0089     void setupWorker(KIO::Worker *worker,
0090                      const QUrl &url,
0091                      const QString &protocol,
0092                      const QStringList &proxyList,
0093                      bool newWorker,
0094                      const KIO::MetaData *config = nullptr);
0095 
0096     void slotWorkerDied(KIO::Worker *worker);
0097 
0098 #ifndef KIO_ANDROID_STUB
0099     void slotReparseSlaveConfiguration(const QString &, const QDBusMessage &);
0100 #endif
0101 
0102     ProtoQueue *protoQ(const QString &protocol, const QString &host);
0103 
0104 private:
0105     QHash<QString, ProtoQueue *> m_protocols;
0106 };
0107 
0108 static QThreadStorage<SchedulerPrivate *> s_storage;
0109 static SchedulerPrivate *schedulerPrivate()
0110 {
0111     if (!s_storage.hasLocalData()) {
0112         s_storage.setLocalData(new SchedulerPrivate);
0113     }
0114     return s_storage.localData();
0115 }
0116 
0117 Scheduler *Scheduler::self()
0118 {
0119     return schedulerPrivate()->q;
0120 }
0121 
0122 SchedulerPrivate *Scheduler::d_func()
0123 {
0124     return schedulerPrivate();
0125 }
0126 
0127 // static
0128 Scheduler *scheduler()
0129 {
0130     return schedulerPrivate()->q;
0131 }
0132 
0133 ////////////////////////////
0134 
0135 int SerialPicker::changedPrioritySerial(int oldSerial, int newPriority) const
0136 {
0137     Q_ASSERT(newPriority >= -10 && newPriority <= 10);
0138     newPriority = qBound(-10, newPriority, 10);
0139     int unbiasedSerial = oldSerial % m_jobsPerPriority;
0140     return unbiasedSerial + newPriority * m_jobsPerPriority;
0141 }
0142 
0143 WorkerManager::WorkerManager()
0144 {
0145     m_grimTimer.setSingleShot(true);
0146     connect(&m_grimTimer, &QTimer::timeout, this, &WorkerManager::grimReaper);
0147 }
0148 
0149 WorkerManager::~WorkerManager()
0150 {
0151     grimReaper();
0152 }
0153 
0154 void WorkerManager::returnWorker(Worker *worker)
0155 {
0156     Q_ASSERT(worker);
0157     worker->setIdle();
0158     m_idleWorkers.insert(worker->host(), worker);
0159     scheduleGrimReaper();
0160 }
0161 
0162 Worker *WorkerManager::takeWorkerForJob(SimpleJob *job)
0163 {
0164     Worker *worker = schedulerPrivate()->heldWorkerForJob(job);
0165     if (worker) {
0166         return worker;
0167     }
0168 
0169     QUrl url = SimpleJobPrivate::get(job)->m_url;
0170     // TODO take port, username and password into account
0171     QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.find(url.host());
0172     if (it == m_idleWorkers.end()) {
0173         it = m_idleWorkers.begin();
0174     }
0175     if (it == m_idleWorkers.end()) {
0176         return nullptr;
0177     }
0178     worker = it.value();
0179     m_idleWorkers.erase(it);
0180     return worker;
0181 }
0182 
0183 bool WorkerManager::removeWorker(Worker *worker)
0184 {
0185     // ### performance not so great
0186     QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.begin();
0187     for (; it != m_idleWorkers.end(); ++it) {
0188         if (it.value() == worker) {
0189             m_idleWorkers.erase(it);
0190             return true;
0191         }
0192     }
0193     return false;
0194 }
0195 
0196 void WorkerManager::clear()
0197 {
0198     m_idleWorkers.clear();
0199 }
0200 
0201 QList<Worker *> WorkerManager::allWorkers() const
0202 {
0203     return m_idleWorkers.values();
0204 }
0205 
0206 void WorkerManager::scheduleGrimReaper()
0207 {
0208     if (!m_grimTimer.isActive()) {
0209         m_grimTimer.start((s_idleWorkerLifetime / 2) * 1000);
0210     }
0211 }
0212 
0213 // private slot
0214 void WorkerManager::grimReaper()
0215 {
0216     QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.begin();
0217     while (it != m_idleWorkers.end()) {
0218         Worker *worker = it.value();
0219         if (worker->idleTime() >= s_idleWorkerLifetime) {
0220             it = m_idleWorkers.erase(it);
0221             if (worker->job()) {
0222                 // qDebug() << "Idle worker" << worker << "still has job" << worker->job();
0223             }
0224             // avoid invoking slotWorkerDied() because its cleanup services are not needed
0225             worker->kill();
0226         } else {
0227             ++it;
0228         }
0229     }
0230     if (!m_idleWorkers.isEmpty()) {
0231         scheduleGrimReaper();
0232     }
0233 }
0234 
0235 int HostQueue::lowestSerial() const
0236 {
0237     QMap<int, SimpleJob *>::ConstIterator first = m_queuedJobs.constBegin();
0238     if (first != m_queuedJobs.constEnd()) {
0239         return first.key();
0240     }
0241     return SerialPicker::maxSerial;
0242 }
0243 
0244 void HostQueue::queueJob(SimpleJob *job)
0245 {
0246     const int serial = SimpleJobPrivate::get(job)->m_schedSerial;
0247     Q_ASSERT(serial != 0);
0248     Q_ASSERT(!m_queuedJobs.contains(serial));
0249     Q_ASSERT(!m_runningJobs.contains(job));
0250     m_queuedJobs.insert(serial, job);
0251 }
0252 
0253 SimpleJob *HostQueue::takeFirstInQueue()
0254 {
0255     Q_ASSERT(!m_queuedJobs.isEmpty());
0256     QMap<int, SimpleJob *>::iterator first = m_queuedJobs.begin();
0257     SimpleJob *job = first.value();
0258     m_queuedJobs.erase(first);
0259     m_runningJobs.insert(job);
0260     return job;
0261 }
0262 
0263 bool HostQueue::removeJob(SimpleJob *job)
0264 {
0265     const int serial = SimpleJobPrivate::get(job)->m_schedSerial;
0266     if (m_runningJobs.remove(job)) {
0267         Q_ASSERT(!m_queuedJobs.contains(serial));
0268         return true;
0269     }
0270     if (m_queuedJobs.remove(serial)) {
0271         return true;
0272     }
0273     return false;
0274 }
0275 
0276 QList<Worker *> HostQueue::allWorkers() const
0277 {
0278     QList<Worker *> ret;
0279     ret.reserve(m_runningJobs.size());
0280     for (SimpleJob *job : m_runningJobs) {
0281         Worker *worker = jobSWorker(job);
0282         Q_ASSERT(worker);
0283         ret.append(worker);
0284     }
0285     return ret;
0286 }
0287 
0288 static void ensureNoDuplicates(QMap<int, HostQueue *> *queuesBySerial)
0289 {
0290     Q_UNUSED(queuesBySerial);
0291 #ifdef SCHEDULER_DEBUG
0292     // a host queue may *never* be in queuesBySerial twice.
0293     QSet<HostQueue *> seen;
0294     auto it = queuesBySerial->cbegin();
0295     for (; it != queuesBySerial->cend(); ++it) {
0296         Q_ASSERT(!seen.contains(it.value()));
0297         seen.insert(it.value());
0298     }
0299 #endif
0300 }
0301 
0302 static void verifyRunningJobsCount(QHash<QString, HostQueue> *queues, int runningJobsCount)
0303 {
0304     Q_UNUSED(queues);
0305     Q_UNUSED(runningJobsCount);
0306 #ifdef SCHEDULER_DEBUG
0307     int realRunningJobsCount = 0;
0308     auto it = queues->cbegin();
0309     for (; it != queues->cend(); ++it) {
0310         realRunningJobsCount += it.value().runningJobsCount();
0311     }
0312     Q_ASSERT(realRunningJobsCount == runningJobsCount);
0313 
0314     // ...and of course we may never run the same job twice!
0315     QSet<SimpleJob *> seenJobs;
0316     auto it2 = queues->cbegin();
0317     for (; it2 != queues->cend(); ++it2) {
0318         for (SimpleJob *job : it2.value().runningJobs()) {
0319             Q_ASSERT(!seenJobs.contains(job));
0320             seenJobs.insert(job);
0321         }
0322     }
0323 #endif
0324 }
0325 
0326 ProtoQueue::ProtoQueue(int maxWorkers, int maxWorkersPerHost)
0327     : m_maxConnectionsPerHost(maxWorkersPerHost ? maxWorkersPerHost : maxWorkers)
0328     , m_maxConnectionsTotal(qMax(maxWorkers, maxWorkersPerHost))
0329     , m_runningJobsCount(0)
0330 
0331 {
0332     /*qDebug() << "m_maxConnectionsTotal:" << m_maxConnectionsTotal
0333                  << "m_maxConnectionsPerHost:" << m_maxConnectionsPerHost;*/
0334     Q_ASSERT(m_maxConnectionsPerHost >= 1);
0335     Q_ASSERT(maxWorkers >= maxWorkersPerHost);
0336     m_startJobTimer.setSingleShot(true);
0337     connect(&m_startJobTimer, &QTimer::timeout, this, &ProtoQueue::startAJob);
0338 }
0339 
0340 ProtoQueue::~ProtoQueue()
0341 {
0342     // Gather list of all workers first
0343     const QList<Worker *> workers = allWorkers();
0344     // Clear the idle workers in the manager to avoid dangling pointers
0345     m_workerManager.clear();
0346     for (Worker *worker : workers) {
0347         // kill the worker process and remove the interface in our process
0348         worker->kill();
0349     }
0350 }
0351 
0352 void ProtoQueue::queueJob(SimpleJob *job)
0353 {
0354     QString hostname = SimpleJobPrivate::get(job)->m_url.host();
0355     HostQueue &hq = m_queuesByHostname[hostname];
0356     const int prevLowestSerial = hq.lowestSerial();
0357     Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost);
0358 
0359     // nevert insert a job twice
0360     Q_ASSERT(SimpleJobPrivate::get(job)->m_schedSerial == 0);
0361     SimpleJobPrivate::get(job)->m_schedSerial = m_serialPicker.next();
0362 
0363     const bool wasQueueEmpty = hq.isQueueEmpty();
0364     hq.queueJob(job);
0365     // note that HostQueue::queueJob() into an empty queue changes its lowestSerial() too...
0366     // the queue's lowest serial job may have changed, so update the ordered list of queues.
0367     // however, we ignore all jobs that would cause more connections to a host than allowed.
0368     if (prevLowestSerial != hq.lowestSerial()) {
0369         if (hq.runningJobsCount() < m_maxConnectionsPerHost) {
0370             // if the connection limit didn't keep the HQ unscheduled it must have been lack of jobs
0371             if (m_queuesBySerial.remove(prevLowestSerial) == 0) {
0372                 Q_UNUSED(wasQueueEmpty);
0373                 Q_ASSERT(wasQueueEmpty);
0374             }
0375             m_queuesBySerial.insert(hq.lowestSerial(), &hq);
0376         } else {
0377 #ifdef SCHEDULER_DEBUG
0378             // ### this assertion may fail if the limits were modified at runtime!
0379             // if the per-host connection limit is already reached the host queue's lowest serial
0380             // should not be queued.
0381             Q_ASSERT(!m_queuesBySerial.contains(prevLowestSerial));
0382 #endif
0383         }
0384     }
0385     // just in case; startAJob() will refuse to start a job if it shouldn't.
0386     m_startJobTimer.start();
0387 
0388     ensureNoDuplicates(&m_queuesBySerial);
0389 }
0390 
0391 void ProtoQueue::changeJobPriority(SimpleJob *job, int newPrio)
0392 {
0393     SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job);
0394     QHash<QString, HostQueue>::Iterator it = m_queuesByHostname.find(jobPriv->m_url.host());
0395     if (it == m_queuesByHostname.end()) {
0396         return;
0397     }
0398     HostQueue &hq = it.value();
0399     const int prevLowestSerial = hq.lowestSerial();
0400     if (hq.isJobRunning(job) || !hq.removeJob(job)) {
0401         return;
0402     }
0403     jobPriv->m_schedSerial = m_serialPicker.changedPrioritySerial(jobPriv->m_schedSerial, newPrio);
0404     hq.queueJob(job);
0405     const bool needReinsert = hq.lowestSerial() != prevLowestSerial;
0406     // the host queue might be absent from m_queuesBySerial because the connections per host limit
0407     // for that host has been reached.
0408     if (needReinsert && m_queuesBySerial.remove(prevLowestSerial)) {
0409         m_queuesBySerial.insert(hq.lowestSerial(), &hq);
0410     }
0411     ensureNoDuplicates(&m_queuesBySerial);
0412 }
0413 
0414 void ProtoQueue::removeJob(SimpleJob *job)
0415 {
0416     SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job);
0417     HostQueue &hq = m_queuesByHostname[jobPriv->m_url.host()];
0418     const int prevLowestSerial = hq.lowestSerial();
0419     const int prevRunningJobs = hq.runningJobsCount();
0420 
0421     Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost);
0422 
0423     if (hq.removeJob(job)) {
0424         if (hq.lowestSerial() != prevLowestSerial) {
0425             // we have dequeued the not yet running job with the lowest serial
0426             Q_ASSERT(!jobPriv->m_worker);
0427             Q_ASSERT(prevRunningJobs == hq.runningJobsCount());
0428             if (m_queuesBySerial.remove(prevLowestSerial) == 0) {
0429                 // make sure that the queue was not scheduled for a good reason
0430                 Q_ASSERT(hq.runningJobsCount() == m_maxConnectionsPerHost);
0431             }
0432         } else {
0433             if (prevRunningJobs != hq.runningJobsCount()) {
0434                 // we have dequeued a previously running job
0435                 Q_ASSERT(prevRunningJobs - 1 == hq.runningJobsCount());
0436                 m_runningJobsCount--;
0437                 Q_ASSERT(m_runningJobsCount >= 0);
0438             }
0439         }
0440         if (!hq.isQueueEmpty() && hq.runningJobsCount() < m_maxConnectionsPerHost) {
0441             // this may be a no-op, but it's faster than first checking if it's already in.
0442             m_queuesBySerial.insert(hq.lowestSerial(), &hq);
0443         }
0444 
0445         if (hq.isEmpty()) {
0446             // no queued jobs, no running jobs. this destroys hq from above.
0447             m_queuesByHostname.remove(jobPriv->m_url.host());
0448         }
0449 
0450         if (jobPriv->m_worker && jobPriv->m_worker->isAlive()) {
0451             m_workerManager.returnWorker(jobPriv->m_worker);
0452         }
0453         // just in case; startAJob() will refuse to start a job if it shouldn't.
0454         m_startJobTimer.start();
0455     }
0456 
0457     ensureNoDuplicates(&m_queuesBySerial);
0458 }
0459 
0460 Worker *ProtoQueue::createWorker(const QString &protocol, SimpleJob *job, const QUrl &url)
0461 {
0462     int error;
0463     QString errortext;
0464     Worker *worker = Worker::createWorker(protocol, url, error, errortext);
0465     if (worker) {
0466         connect(worker, &Worker::workerDied, scheduler(), [](KIO::Worker *worker) {
0467             schedulerPrivate()->slotWorkerDied(worker);
0468         });
0469     } else {
0470         qCWarning(KIO_CORE) << "couldn't create worker:" << errortext;
0471         if (job) {
0472             job->slotError(error, errortext);
0473         }
0474     }
0475     return worker;
0476 }
0477 
0478 bool ProtoQueue::removeWorker(KIO::Worker *worker)
0479 {
0480     const bool removed = m_workerManager.removeWorker(worker);
0481     return removed;
0482 }
0483 
0484 QList<Worker *> ProtoQueue::allWorkers() const
0485 {
0486     QList<Worker *> ret(m_workerManager.allWorkers());
0487     auto it = m_queuesByHostname.cbegin();
0488     for (; it != m_queuesByHostname.cend(); ++it) {
0489         ret.append(it.value().allWorkers());
0490     }
0491 
0492     return ret;
0493 }
0494 
0495 // private slot
0496 void ProtoQueue::startAJob()
0497 {
0498     ensureNoDuplicates(&m_queuesBySerial);
0499     verifyRunningJobsCount(&m_queuesByHostname, m_runningJobsCount);
0500 
0501 #ifdef SCHEDULER_DEBUG
0502     // qDebug() << "m_runningJobsCount:" << m_runningJobsCount;
0503     auto it = m_queuesByHostname.cbegin();
0504     for (; it != m_queuesByHostname.cend(); ++it) {
0505         const QList<KIO::SimpleJob *> list = it.value().runningJobs();
0506         for (SimpleJob *job : list) {
0507             // qDebug() << SimpleJobPrivate::get(job)->m_url;
0508         }
0509     }
0510 #endif
0511     if (m_runningJobsCount >= m_maxConnectionsTotal) {
0512 #ifdef SCHEDULER_DEBUG
0513         // qDebug() << "not starting any jobs because maxConnectionsTotal has been reached.";
0514 #endif
0515         return;
0516     }
0517 
0518     QMap<int, HostQueue *>::iterator first = m_queuesBySerial.begin();
0519     if (first != m_queuesBySerial.end()) {
0520         // pick a job and maintain the queue invariant: lower serials first
0521         HostQueue *hq = first.value();
0522         const int prevLowestSerial = first.key();
0523         Q_UNUSED(prevLowestSerial);
0524         Q_ASSERT(hq->lowestSerial() == prevLowestSerial);
0525         // the following assertions should hold due to queueJob(), takeFirstInQueue() and
0526         // removeJob() being correct
0527         Q_ASSERT(hq->runningJobsCount() < m_maxConnectionsPerHost);
0528         SimpleJob *startingJob = hq->takeFirstInQueue();
0529         Q_ASSERT(hq->runningJobsCount() <= m_maxConnectionsPerHost);
0530         Q_ASSERT(hq->lowestSerial() != prevLowestSerial);
0531 
0532         m_queuesBySerial.erase(first);
0533         // we've increased hq's runningJobsCount() by calling nexStartingJob()
0534         // so we need to check again.
0535         if (!hq->isQueueEmpty() && hq->runningJobsCount() < m_maxConnectionsPerHost) {
0536             m_queuesBySerial.insert(hq->lowestSerial(), hq);
0537         }
0538 
0539         // always increase m_runningJobsCount because it's correct if there is a worker and if there
0540         // is no worker, removeJob() will balance the number again. removeJob() would decrease the
0541         // number too much otherwise.
0542         // Note that createWorker() can call slotError() on a job which in turn calls removeJob(),
0543         // so increase the count here already.
0544         m_runningJobsCount++;
0545 
0546         bool isNewWorker = false;
0547         Worker *worker = m_workerManager.takeWorkerForJob(startingJob);
0548         SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(startingJob);
0549         if (!worker) {
0550             isNewWorker = true;
0551             worker = createWorker(jobPriv->m_protocol, startingJob, jobPriv->m_url);
0552         }
0553 
0554         if (worker) {
0555             jobPriv->m_worker = worker;
0556             schedulerPrivate()->setupWorker(worker, jobPriv->m_url, jobPriv->m_protocol, jobPriv->m_proxyList, isNewWorker);
0557             startJob(startingJob, worker);
0558         } else {
0559             // dispose of our records about the job and mark the job as unknown
0560             // (to prevent crashes later)
0561             // note that the job's slotError() can have called removeJob() first, so check that
0562             // it's not a ghost job with null serial already.
0563             if (jobPriv->m_schedSerial) {
0564                 removeJob(startingJob);
0565                 jobPriv->m_schedSerial = 0;
0566             }
0567         }
0568     } else {
0569 #ifdef SCHEDULER_DEBUG
0570         // qDebug() << "not starting any jobs because there is no queued job.";
0571 #endif
0572     }
0573 
0574     if (!m_queuesBySerial.isEmpty()) {
0575         m_startJobTimer.start();
0576     }
0577 }
0578 
0579 Scheduler::Scheduler()
0580 {
0581     setObjectName(QStringLiteral("scheduler"));
0582 
0583 #ifndef KIO_ANDROID_STUB
0584     const QString dbusPath = QStringLiteral("/KIO/Scheduler");
0585     const QString dbusInterface = QStringLiteral("org.kde.KIO.Scheduler");
0586     QDBusConnection dbus = QDBusConnection::sessionBus();
0587     // Not needed, right? We just want to emit two signals.
0588     // dbus.registerObject(dbusPath, this, QDBusConnection::ExportScriptableSlots |
0589     //                    QDBusConnection::ExportScriptableSignals);
0590     dbus.connect(QString(),
0591                  dbusPath,
0592                  dbusInterface,
0593                  QStringLiteral("reparseSlaveConfiguration"),
0594                  this,
0595                  SLOT(slotReparseSlaveConfiguration(QString, QDBusMessage)));
0596 #endif
0597 }
0598 
0599 Scheduler::~Scheduler()
0600 {
0601 }
0602 
0603 void Scheduler::doJob(SimpleJob *job)
0604 {
0605     schedulerPrivate()->doJob(job);
0606 }
0607 
0608 // static
0609 void Scheduler::cancelJob(SimpleJob *job)
0610 {
0611     schedulerPrivate()->cancelJob(job);
0612 }
0613 
0614 void Scheduler::jobFinished(KIO::SimpleJob *job, KIO::Worker *worker)
0615 {
0616     schedulerPrivate()->jobFinished(job, worker);
0617 }
0618 
0619 void Scheduler::putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url)
0620 {
0621     schedulerPrivate()->putWorkerOnHold(job, url);
0622 }
0623 
0624 void Scheduler::removeWorkerOnHold()
0625 {
0626     schedulerPrivate()->removeWorkerOnHold();
0627 }
0628 
0629 bool Scheduler::isWorkerOnHoldFor(const QUrl &url)
0630 {
0631     return schedulerPrivate()->isWorkerOnHoldFor(url);
0632 }
0633 
0634 void Scheduler::updateInternalMetaData(SimpleJob *job)
0635 {
0636     schedulerPrivate()->updateInternalMetaData(job);
0637 }
0638 
0639 void Scheduler::emitReparseSlaveConfiguration()
0640 {
0641 #ifndef KIO_ANDROID_STUB
0642     // Do it immediately in this process, otherwise we might send a request before reparsing
0643     // (e.g. when changing useragent in the plugin)
0644     schedulerPrivate()->slotReparseSlaveConfiguration(QString(), QDBusMessage());
0645 #endif
0646 
0647     schedulerPrivate()->m_ignoreConfigReparse = true;
0648     Q_EMIT self()->reparseSlaveConfiguration(QString());
0649 }
0650 
0651 #ifndef KIO_ANDROID_STUB
0652 void SchedulerPrivate::slotReparseSlaveConfiguration(const QString &proto, const QDBusMessage &)
0653 {
0654     if (m_ignoreConfigReparse) {
0655         // qDebug() << "Ignoring signal sent by myself";
0656         m_ignoreConfigReparse = false;
0657         return;
0658     }
0659 
0660     // qDebug() << "proto=" << proto;
0661     KProtocolManager::reparseConfiguration();
0662     WorkerConfig::self()->reset();
0663     sessionData.reset();
0664 
0665     QHash<QString, ProtoQueue *>::ConstIterator it = proto.isEmpty() ? m_protocols.constBegin() : m_protocols.constFind(proto);
0666     QHash<QString, ProtoQueue *>::ConstIterator endIt = m_protocols.constEnd();
0667 
0668     // not found?
0669     if (it == endIt) {
0670         return;
0671     }
0672 
0673     if (!proto.isEmpty()) {
0674         endIt = it;
0675         ++endIt;
0676     }
0677 
0678     for (; it != endIt; ++it) {
0679         const QList<KIO::Worker *> list = it.value()->allWorkers();
0680         for (Worker *worker : list) {
0681             worker->send(CMD_REPARSECONFIGURATION);
0682             worker->resetHost();
0683         }
0684     }
0685 }
0686 #endif
0687 
0688 void SchedulerPrivate::doJob(SimpleJob *job)
0689 {
0690     // qDebug() << job;
0691     KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
0692     jobPriv->m_proxyList.clear();
0693     jobPriv->m_protocol = KProtocolManagerPrivate::workerProtocol(job->url(), jobPriv->m_proxyList);
0694 
0695     ProtoQueue *proto = protoQ(jobPriv->m_protocol, job->url().host());
0696     proto->queueJob(job);
0697 }
0698 
0699 void SchedulerPrivate::setJobPriority(SimpleJob *job, int priority)
0700 {
0701     // qDebug() << job << priority;
0702     const QString protocol = SimpleJobPrivate::get(job)->m_protocol;
0703     if (!protocol.isEmpty()) {
0704         ProtoQueue *proto = protoQ(protocol, job->url().host());
0705         proto->changeJobPriority(job, priority);
0706     }
0707 }
0708 
0709 void SchedulerPrivate::cancelJob(SimpleJob *job)
0710 {
0711     KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
0712     // this method is called all over the place in job.cpp, so just do this check here to avoid
0713     // much boilerplate in job code.
0714     if (jobPriv->m_schedSerial == 0) {
0715         // qDebug() << "Doing nothing because I don't know job" << job;
0716         return;
0717     }
0718     Worker *worker = jobSWorker(job);
0719     // qDebug() << job << worker;
0720     jobFinished(job, worker);
0721     if (worker) {
0722         ProtoQueue *pq = m_protocols.value(jobPriv->m_protocol);
0723         if (pq) {
0724             pq->removeWorker(worker);
0725         }
0726         worker->kill(); // don't use worker after this!
0727     }
0728 }
0729 
0730 void SchedulerPrivate::jobFinished(SimpleJob *job, Worker *worker)
0731 {
0732     // qDebug() << job << worker;
0733     KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
0734 
0735     // make sure that we knew about the job!
0736     Q_ASSERT(jobPriv->m_schedSerial);
0737 
0738     ProtoQueue *pq = m_protocols.value(jobPriv->m_protocol);
0739     if (pq) {
0740         pq->removeJob(job);
0741     }
0742 
0743     if (worker) {
0744         // If we have internal meta-data, tell existing KIO workers to reload
0745         // their configuration.
0746         if (jobPriv->m_internalMetaData.count()) {
0747             // qDebug() << "Updating KIO workers with new internal metadata information";
0748             ProtoQueue *queue = m_protocols.value(worker->protocol());
0749             if (queue) {
0750                 const QList<Worker *> workers = queue->allWorkers();
0751                 for (auto *runningWorker : workers) {
0752                     if (worker->host() == runningWorker->host()) {
0753                         worker->setConfig(metaDataFor(worker->protocol(), jobPriv->m_proxyList, job->url()));
0754                         /*qDebug() << "Updated configuration of" << worker->protocol()
0755                                      << "KIO worker, pid=" << worker->worker_pid();*/
0756                     }
0757                 }
0758             }
0759         }
0760         worker->setJob(nullptr);
0761         worker->disconnect(job);
0762     }
0763     jobPriv->m_schedSerial = 0; // this marks the job as unscheduled again
0764     jobPriv->m_worker = nullptr;
0765     // Clear the values in the internal metadata container since they have
0766     // already been taken care of above...
0767     jobPriv->m_internalMetaData.clear();
0768 }
0769 
0770 MetaData SchedulerPrivate::metaDataFor(const QString &protocol, const QStringList &proxyList, const QUrl &url)
0771 {
0772     const QString host = url.host();
0773     MetaData configData = WorkerConfig::self()->configData(protocol, host);
0774     sessionData.configDataFor(configData, protocol, host);
0775     if (proxyList.isEmpty()) {
0776         configData.remove(QStringLiteral("UseProxy"));
0777         configData.remove(QStringLiteral("ProxyUrls"));
0778     } else {
0779         configData[QStringLiteral("UseProxy")] = proxyList.first();
0780         configData[QStringLiteral("ProxyUrls")] = proxyList.join(QLatin1Char(','));
0781     }
0782 
0783     return configData;
0784 }
0785 
0786 void SchedulerPrivate::setupWorker(KIO::Worker *worker,
0787                                    const QUrl &url,
0788                                    const QString &protocol,
0789                                    const QStringList &proxyList,
0790                                    bool newWorker,
0791                                    const KIO::MetaData *config)
0792 {
0793     int port = url.port();
0794     if (port == -1) { // no port is -1 in QUrl, but in kde3 we used 0 and the KIO workers assume that.
0795         port = 0;
0796     }
0797     const QString host = url.host();
0798     const QString user = url.userName();
0799     const QString passwd = url.password();
0800 
0801     if (newWorker || worker->host() != host || worker->port() != port || worker->user() != user || worker->passwd() != passwd) {
0802         MetaData configData = metaDataFor(protocol, proxyList, url);
0803         if (config) {
0804             configData += *config;
0805         }
0806 
0807         worker->setConfig(configData);
0808         worker->setProtocol(url.scheme());
0809         worker->setHost(host, port, user, passwd);
0810     }
0811 }
0812 
0813 void SchedulerPrivate::slotWorkerDied(KIO::Worker *worker)
0814 {
0815     // qDebug() << worker;
0816     Q_ASSERT(worker);
0817     Q_ASSERT(!worker->isAlive());
0818     ProtoQueue *pq = m_protocols.value(worker->protocol());
0819     if (pq) {
0820         if (worker->job()) {
0821             pq->removeJob(worker->job());
0822         }
0823         // in case this was a connected worker...
0824         pq->removeWorker(worker);
0825     }
0826     if (worker == m_workerOnHold) {
0827         m_workerOnHold = nullptr;
0828         m_urlOnHold.clear();
0829     }
0830     // can't use worker->deref() here because we need to use deleteLater
0831     worker->aboutToDelete();
0832     worker->deleteLater();
0833 }
0834 
0835 void SchedulerPrivate::putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url)
0836 {
0837     Worker *worker = jobSWorker(job);
0838     // qDebug() << job << url << worker;
0839     worker->disconnect(job);
0840     // prevent the fake death of the worker from trying to kill the job again;
0841     // cf. Worker::hold(const QUrl &url) called in SchedulerPrivate::publishWorkerOnHold().
0842     worker->setJob(nullptr);
0843     SimpleJobPrivate::get(job)->m_worker = nullptr;
0844 
0845     if (m_workerOnHold) {
0846         m_workerOnHold->kill();
0847     }
0848     m_workerOnHold = worker;
0849     m_urlOnHold = url;
0850     m_workerOnHold->suspend();
0851 }
0852 
0853 bool SchedulerPrivate::isWorkerOnHoldFor(const QUrl &url)
0854 {
0855     if (url.isValid() && m_urlOnHold.isValid() && url == m_urlOnHold) {
0856         return true;
0857     }
0858 
0859     return false;
0860 }
0861 
0862 Worker *SchedulerPrivate::heldWorkerForJob(SimpleJob *job)
0863 {
0864     Worker *worker = nullptr;
0865     KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
0866 
0867     if (m_workerOnHold) {
0868         // Make sure that the job wants to do a GET or a POST, and with no offset
0869         const int cmd = jobPriv->m_command;
0870         bool canJobReuse = (cmd == CMD_GET);
0871 
0872         if (KIO::TransferJob *tJob = qobject_cast<KIO::TransferJob *>(job)) {
0873             canJobReuse = (canJobReuse || cmd == CMD_SPECIAL);
0874             if (canJobReuse) {
0875                 KIO::MetaData outgoing = tJob->outgoingMetaData();
0876                 const QString resume = outgoing.value(QStringLiteral("resume"));
0877                 const QString rangeStart = outgoing.value(QStringLiteral("range-start"));
0878                 // qDebug() << "Resume metadata is" << resume;
0879                 canJobReuse = (resume.isEmpty() || resume == QLatin1Char('0')) && (rangeStart.isEmpty() || rangeStart == QLatin1Char('0'));
0880             }
0881         }
0882 
0883         if (job->url() == m_urlOnHold) {
0884             if (canJobReuse) {
0885                 // qDebug() << "HOLD: Reusing held worker (" << m_workerOnHold << ")";
0886                 worker = m_workerOnHold;
0887             } else {
0888                 // qDebug() << "HOLD: Discarding held worker (" << m_workerOnHold << ")";
0889                 m_workerOnHold->kill();
0890             }
0891             m_workerOnHold = nullptr;
0892             m_urlOnHold.clear();
0893         }
0894     }
0895 
0896     return worker;
0897 }
0898 
0899 void SchedulerPrivate::removeWorkerOnHold()
0900 {
0901     // qDebug() << m_workerOnHold;
0902     if (m_workerOnHold) {
0903         m_workerOnHold->kill();
0904     }
0905     m_workerOnHold = nullptr;
0906     m_urlOnHold.clear();
0907 }
0908 
0909 ProtoQueue *SchedulerPrivate::protoQ(const QString &protocol, const QString &host)
0910 {
0911     ProtoQueue *pq = m_protocols.value(protocol, nullptr);
0912     if (!pq) {
0913         // qDebug() << "creating ProtoQueue instance for" << protocol;
0914 
0915         const int maxWorkers = KProtocolInfo::maxWorkers(protocol);
0916         int maxWorkersPerHost = -1;
0917         if (!host.isEmpty()) {
0918             bool ok = false;
0919             const int value = WorkerConfig::self()->configData(protocol, host, QStringLiteral("MaxConnections")).toInt(&ok);
0920             if (ok) {
0921                 maxWorkersPerHost = value;
0922             }
0923         }
0924         if (maxWorkersPerHost == -1) {
0925             maxWorkersPerHost = KProtocolInfo::maxWorkersPerHost(protocol);
0926         }
0927         // Never allow maxWorkersPerHost to exceed maxWorkers.
0928         pq = new ProtoQueue(maxWorkers, qMin(maxWorkers, maxWorkersPerHost));
0929         m_protocols.insert(protocol, pq);
0930     }
0931     return pq;
0932 }
0933 
0934 void SchedulerPrivate::updateInternalMetaData(SimpleJob *job)
0935 {
0936     KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
0937     // Preserve all internal meta-data so they can be sent back to the
0938     // KIO workers as needed...
0939     const QUrl jobUrl = job->url();
0940 
0941     const QLatin1String currHostToken("{internal~currenthost}");
0942     const QLatin1String allHostsToken("{internal~allhosts}");
0943     // qDebug() << job << jobPriv->m_internalMetaData;
0944     QMapIterator<QString, QString> it(jobPriv->m_internalMetaData);
0945     while (it.hasNext()) {
0946         it.next();
0947         if (it.key().startsWith(currHostToken, Qt::CaseInsensitive)) {
0948             WorkerConfig::self()->setConfigData(jobUrl.scheme(), jobUrl.host(), it.key().mid(currHostToken.size()), it.value());
0949         } else if (it.key().startsWith(allHostsToken, Qt::CaseInsensitive)) {
0950             WorkerConfig::self()->setConfigData(jobUrl.scheme(), QString(), it.key().mid(allHostsToken.size()), it.value());
0951         }
0952     }
0953 }
0954 
0955 #include "moc_scheduler.cpp"
0956 #include "moc_scheduler_p.cpp"