// File indexing completed on 2024-04-14 03:53:00
0001 /* 0002 This file is part of the KDE libraries 0003 SPDX-FileCopyrightText: 2000 Stephan Kulow <coolo@kde.org> 0004 SPDX-FileCopyrightText: 2000 Waldo Bastian <bastian@kde.org> 0005 SPDX-FileCopyrightText: 2009, 2010 Andreas Hartmetz <ahartmetz@gmail.com> 0006 0007 SPDX-License-Identifier: LGPL-2.0-only 0008 */ 0009 0010 #include "scheduler.h" 0011 #include "scheduler_p.h" 0012 0013 #include "connection_p.h" 0014 #include "job_p.h" 0015 #include "kprotocolmanager_p.h" 0016 #include "sessiondata_p.h" 0017 #include "worker_p.h" 0018 #include "workerconfig.h" 0019 0020 #include <kprotocolinfo.h> 0021 #include <kprotocolmanager.h> 0022 0023 #ifndef KIO_ANDROID_STUB 0024 #include <QDBusConnection> 0025 #include <QDBusMessage> 0026 #endif 0027 #include <QHash> 0028 #include <QThread> 0029 #include <QThreadStorage> 0030 0031 // Workers may be idle for a certain time (3 minutes) before they are killed. 0032 static const int s_idleWorkerLifetime = 3 * 60; 0033 0034 using namespace KIO; 0035 0036 static inline Worker *jobSWorker(SimpleJob *job) 0037 { 0038 return SimpleJobPrivate::get(job)->m_worker; 0039 } 0040 0041 static inline int jobCommand(SimpleJob *job) 0042 { 0043 return SimpleJobPrivate::get(job)->m_command; 0044 } 0045 0046 static inline void startJob(SimpleJob *job, Worker *worker) 0047 { 0048 SimpleJobPrivate::get(job)->start(worker); 0049 } 0050 0051 class KIO::SchedulerPrivate 0052 { 0053 public: 0054 SchedulerPrivate() 0055 : q(new Scheduler()) 0056 { 0057 } 0058 0059 ~SchedulerPrivate() 0060 { 0061 removeWorkerOnHold(); 0062 delete q; 0063 q = nullptr; 0064 qDeleteAll(m_protocols); // ~ProtoQueue will kill and delete all workers 0065 } 0066 0067 SchedulerPrivate(const SchedulerPrivate &) = delete; 0068 SchedulerPrivate &operator=(const SchedulerPrivate &) = delete; 0069 0070 Scheduler *q; 0071 0072 Worker *m_workerOnHold = nullptr; 0073 QUrl m_urlOnHold; 0074 bool m_ignoreConfigReparse = false; 0075 0076 SessionData sessionData; 0077 0078 void 
doJob(SimpleJob *job); 0079 void setJobPriority(SimpleJob *job, int priority); 0080 void cancelJob(SimpleJob *job); 0081 void jobFinished(KIO::SimpleJob *job, KIO::Worker *worker); 0082 void putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url); 0083 void removeWorkerOnHold(); 0084 Worker *heldWorkerForJob(KIO::SimpleJob *job); 0085 bool isWorkerOnHoldFor(const QUrl &url); 0086 void updateInternalMetaData(SimpleJob *job); 0087 0088 MetaData metaDataFor(const QString &protocol, const QStringList &proxyList, const QUrl &url); 0089 void setupWorker(KIO::Worker *worker, 0090 const QUrl &url, 0091 const QString &protocol, 0092 const QStringList &proxyList, 0093 bool newWorker, 0094 const KIO::MetaData *config = nullptr); 0095 0096 void slotWorkerDied(KIO::Worker *worker); 0097 0098 #ifndef KIO_ANDROID_STUB 0099 void slotReparseSlaveConfiguration(const QString &, const QDBusMessage &); 0100 #endif 0101 0102 ProtoQueue *protoQ(const QString &protocol, const QString &host); 0103 0104 private: 0105 QHash<QString, ProtoQueue *> m_protocols; 0106 }; 0107 0108 static QThreadStorage<SchedulerPrivate *> s_storage; 0109 static SchedulerPrivate *schedulerPrivate() 0110 { 0111 if (!s_storage.hasLocalData()) { 0112 s_storage.setLocalData(new SchedulerPrivate); 0113 } 0114 return s_storage.localData(); 0115 } 0116 0117 Scheduler *Scheduler::self() 0118 { 0119 return schedulerPrivate()->q; 0120 } 0121 0122 SchedulerPrivate *Scheduler::d_func() 0123 { 0124 return schedulerPrivate(); 0125 } 0126 0127 // static 0128 Scheduler *scheduler() 0129 { 0130 return schedulerPrivate()->q; 0131 } 0132 0133 //////////////////////////// 0134 0135 int SerialPicker::changedPrioritySerial(int oldSerial, int newPriority) const 0136 { 0137 Q_ASSERT(newPriority >= -10 && newPriority <= 10); 0138 newPriority = qBound(-10, newPriority, 10); 0139 int unbiasedSerial = oldSerial % m_jobsPerPriority; 0140 return unbiasedSerial + newPriority * m_jobsPerPriority; 0141 } 0142 0143 WorkerManager::WorkerManager() 
0144 { 0145 m_grimTimer.setSingleShot(true); 0146 connect(&m_grimTimer, &QTimer::timeout, this, &WorkerManager::grimReaper); 0147 } 0148 0149 WorkerManager::~WorkerManager() 0150 { 0151 grimReaper(); 0152 } 0153 0154 void WorkerManager::returnWorker(Worker *worker) 0155 { 0156 Q_ASSERT(worker); 0157 worker->setIdle(); 0158 m_idleWorkers.insert(worker->host(), worker); 0159 scheduleGrimReaper(); 0160 } 0161 0162 Worker *WorkerManager::takeWorkerForJob(SimpleJob *job) 0163 { 0164 Worker *worker = schedulerPrivate()->heldWorkerForJob(job); 0165 if (worker) { 0166 return worker; 0167 } 0168 0169 QUrl url = SimpleJobPrivate::get(job)->m_url; 0170 // TODO take port, username and password into account 0171 QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.find(url.host()); 0172 if (it == m_idleWorkers.end()) { 0173 it = m_idleWorkers.begin(); 0174 } 0175 if (it == m_idleWorkers.end()) { 0176 return nullptr; 0177 } 0178 worker = it.value(); 0179 m_idleWorkers.erase(it); 0180 return worker; 0181 } 0182 0183 bool WorkerManager::removeWorker(Worker *worker) 0184 { 0185 // ### performance not so great 0186 QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.begin(); 0187 for (; it != m_idleWorkers.end(); ++it) { 0188 if (it.value() == worker) { 0189 m_idleWorkers.erase(it); 0190 return true; 0191 } 0192 } 0193 return false; 0194 } 0195 0196 void WorkerManager::clear() 0197 { 0198 m_idleWorkers.clear(); 0199 } 0200 0201 QList<Worker *> WorkerManager::allWorkers() const 0202 { 0203 return m_idleWorkers.values(); 0204 } 0205 0206 void WorkerManager::scheduleGrimReaper() 0207 { 0208 if (!m_grimTimer.isActive()) { 0209 m_grimTimer.start((s_idleWorkerLifetime / 2) * 1000); 0210 } 0211 } 0212 0213 // private slot 0214 void WorkerManager::grimReaper() 0215 { 0216 QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.begin(); 0217 while (it != m_idleWorkers.end()) { 0218 Worker *worker = it.value(); 0219 if (worker->idleTime() >= s_idleWorkerLifetime) { 0220 it = 
m_idleWorkers.erase(it); 0221 if (worker->job()) { 0222 // qDebug() << "Idle worker" << worker << "still has job" << worker->job(); 0223 } 0224 // avoid invoking slotWorkerDied() because its cleanup services are not needed 0225 worker->kill(); 0226 } else { 0227 ++it; 0228 } 0229 } 0230 if (!m_idleWorkers.isEmpty()) { 0231 scheduleGrimReaper(); 0232 } 0233 } 0234 0235 int HostQueue::lowestSerial() const 0236 { 0237 QMap<int, SimpleJob *>::ConstIterator first = m_queuedJobs.constBegin(); 0238 if (first != m_queuedJobs.constEnd()) { 0239 return first.key(); 0240 } 0241 return SerialPicker::maxSerial; 0242 } 0243 0244 void HostQueue::queueJob(SimpleJob *job) 0245 { 0246 const int serial = SimpleJobPrivate::get(job)->m_schedSerial; 0247 Q_ASSERT(serial != 0); 0248 Q_ASSERT(!m_queuedJobs.contains(serial)); 0249 Q_ASSERT(!m_runningJobs.contains(job)); 0250 m_queuedJobs.insert(serial, job); 0251 } 0252 0253 SimpleJob *HostQueue::takeFirstInQueue() 0254 { 0255 Q_ASSERT(!m_queuedJobs.isEmpty()); 0256 QMap<int, SimpleJob *>::iterator first = m_queuedJobs.begin(); 0257 SimpleJob *job = first.value(); 0258 m_queuedJobs.erase(first); 0259 m_runningJobs.insert(job); 0260 return job; 0261 } 0262 0263 bool HostQueue::removeJob(SimpleJob *job) 0264 { 0265 const int serial = SimpleJobPrivate::get(job)->m_schedSerial; 0266 if (m_runningJobs.remove(job)) { 0267 Q_ASSERT(!m_queuedJobs.contains(serial)); 0268 return true; 0269 } 0270 if (m_queuedJobs.remove(serial)) { 0271 return true; 0272 } 0273 return false; 0274 } 0275 0276 QList<Worker *> HostQueue::allWorkers() const 0277 { 0278 QList<Worker *> ret; 0279 ret.reserve(m_runningJobs.size()); 0280 for (SimpleJob *job : m_runningJobs) { 0281 Worker *worker = jobSWorker(job); 0282 Q_ASSERT(worker); 0283 ret.append(worker); 0284 } 0285 return ret; 0286 } 0287 0288 static void ensureNoDuplicates(QMap<int, HostQueue *> *queuesBySerial) 0289 { 0290 Q_UNUSED(queuesBySerial); 0291 #ifdef SCHEDULER_DEBUG 0292 // a host queue may *never* be in 
queuesBySerial twice. 0293 QSet<HostQueue *> seen; 0294 auto it = queuesBySerial->cbegin(); 0295 for (; it != queuesBySerial->cend(); ++it) { 0296 Q_ASSERT(!seen.contains(it.value())); 0297 seen.insert(it.value()); 0298 } 0299 #endif 0300 } 0301 0302 static void verifyRunningJobsCount(QHash<QString, HostQueue> *queues, int runningJobsCount) 0303 { 0304 Q_UNUSED(queues); 0305 Q_UNUSED(runningJobsCount); 0306 #ifdef SCHEDULER_DEBUG 0307 int realRunningJobsCount = 0; 0308 auto it = queues->cbegin(); 0309 for (; it != queues->cend(); ++it) { 0310 realRunningJobsCount += it.value().runningJobsCount(); 0311 } 0312 Q_ASSERT(realRunningJobsCount == runningJobsCount); 0313 0314 // ...and of course we may never run the same job twice! 0315 QSet<SimpleJob *> seenJobs; 0316 auto it2 = queues->cbegin(); 0317 for (; it2 != queues->cend(); ++it2) { 0318 for (SimpleJob *job : it2.value().runningJobs()) { 0319 Q_ASSERT(!seenJobs.contains(job)); 0320 seenJobs.insert(job); 0321 } 0322 } 0323 #endif 0324 } 0325 0326 ProtoQueue::ProtoQueue(int maxWorkers, int maxWorkersPerHost) 0327 : m_maxConnectionsPerHost(maxWorkersPerHost ? 
maxWorkersPerHost : maxWorkers) 0328 , m_maxConnectionsTotal(qMax(maxWorkers, maxWorkersPerHost)) 0329 , m_runningJobsCount(0) 0330 0331 { 0332 /*qDebug() << "m_maxConnectionsTotal:" << m_maxConnectionsTotal 0333 << "m_maxConnectionsPerHost:" << m_maxConnectionsPerHost;*/ 0334 Q_ASSERT(m_maxConnectionsPerHost >= 1); 0335 Q_ASSERT(maxWorkers >= maxWorkersPerHost); 0336 m_startJobTimer.setSingleShot(true); 0337 connect(&m_startJobTimer, &QTimer::timeout, this, &ProtoQueue::startAJob); 0338 } 0339 0340 ProtoQueue::~ProtoQueue() 0341 { 0342 // Gather list of all workers first 0343 const QList<Worker *> workers = allWorkers(); 0344 // Clear the idle workers in the manager to avoid dangling pointers 0345 m_workerManager.clear(); 0346 for (Worker *worker : workers) { 0347 // kill the worker process and remove the interface in our process 0348 worker->kill(); 0349 } 0350 } 0351 0352 void ProtoQueue::queueJob(SimpleJob *job) 0353 { 0354 QString hostname = SimpleJobPrivate::get(job)->m_url.host(); 0355 HostQueue &hq = m_queuesByHostname[hostname]; 0356 const int prevLowestSerial = hq.lowestSerial(); 0357 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost); 0358 0359 // nevert insert a job twice 0360 Q_ASSERT(SimpleJobPrivate::get(job)->m_schedSerial == 0); 0361 SimpleJobPrivate::get(job)->m_schedSerial = m_serialPicker.next(); 0362 0363 const bool wasQueueEmpty = hq.isQueueEmpty(); 0364 hq.queueJob(job); 0365 // note that HostQueue::queueJob() into an empty queue changes its lowestSerial() too... 0366 // the queue's lowest serial job may have changed, so update the ordered list of queues. 0367 // however, we ignore all jobs that would cause more connections to a host than allowed. 
0368 if (prevLowestSerial != hq.lowestSerial()) { 0369 if (hq.runningJobsCount() < m_maxConnectionsPerHost) { 0370 // if the connection limit didn't keep the HQ unscheduled it must have been lack of jobs 0371 if (m_queuesBySerial.remove(prevLowestSerial) == 0) { 0372 Q_UNUSED(wasQueueEmpty); 0373 Q_ASSERT(wasQueueEmpty); 0374 } 0375 m_queuesBySerial.insert(hq.lowestSerial(), &hq); 0376 } else { 0377 #ifdef SCHEDULER_DEBUG 0378 // ### this assertion may fail if the limits were modified at runtime! 0379 // if the per-host connection limit is already reached the host queue's lowest serial 0380 // should not be queued. 0381 Q_ASSERT(!m_queuesBySerial.contains(prevLowestSerial)); 0382 #endif 0383 } 0384 } 0385 // just in case; startAJob() will refuse to start a job if it shouldn't. 0386 m_startJobTimer.start(); 0387 0388 ensureNoDuplicates(&m_queuesBySerial); 0389 } 0390 0391 void ProtoQueue::changeJobPriority(SimpleJob *job, int newPrio) 0392 { 0393 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job); 0394 QHash<QString, HostQueue>::Iterator it = m_queuesByHostname.find(jobPriv->m_url.host()); 0395 if (it == m_queuesByHostname.end()) { 0396 return; 0397 } 0398 HostQueue &hq = it.value(); 0399 const int prevLowestSerial = hq.lowestSerial(); 0400 if (hq.isJobRunning(job) || !hq.removeJob(job)) { 0401 return; 0402 } 0403 jobPriv->m_schedSerial = m_serialPicker.changedPrioritySerial(jobPriv->m_schedSerial, newPrio); 0404 hq.queueJob(job); 0405 const bool needReinsert = hq.lowestSerial() != prevLowestSerial; 0406 // the host queue might be absent from m_queuesBySerial because the connections per host limit 0407 // for that host has been reached. 
0408 if (needReinsert && m_queuesBySerial.remove(prevLowestSerial)) { 0409 m_queuesBySerial.insert(hq.lowestSerial(), &hq); 0410 } 0411 ensureNoDuplicates(&m_queuesBySerial); 0412 } 0413 0414 void ProtoQueue::removeJob(SimpleJob *job) 0415 { 0416 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job); 0417 HostQueue &hq = m_queuesByHostname[jobPriv->m_url.host()]; 0418 const int prevLowestSerial = hq.lowestSerial(); 0419 const int prevRunningJobs = hq.runningJobsCount(); 0420 0421 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost); 0422 0423 if (hq.removeJob(job)) { 0424 if (hq.lowestSerial() != prevLowestSerial) { 0425 // we have dequeued the not yet running job with the lowest serial 0426 Q_ASSERT(!jobPriv->m_worker); 0427 Q_ASSERT(prevRunningJobs == hq.runningJobsCount()); 0428 if (m_queuesBySerial.remove(prevLowestSerial) == 0) { 0429 // make sure that the queue was not scheduled for a good reason 0430 Q_ASSERT(hq.runningJobsCount() == m_maxConnectionsPerHost); 0431 } 0432 } else { 0433 if (prevRunningJobs != hq.runningJobsCount()) { 0434 // we have dequeued a previously running job 0435 Q_ASSERT(prevRunningJobs - 1 == hq.runningJobsCount()); 0436 m_runningJobsCount--; 0437 Q_ASSERT(m_runningJobsCount >= 0); 0438 } 0439 } 0440 if (!hq.isQueueEmpty() && hq.runningJobsCount() < m_maxConnectionsPerHost) { 0441 // this may be a no-op, but it's faster than first checking if it's already in. 0442 m_queuesBySerial.insert(hq.lowestSerial(), &hq); 0443 } 0444 0445 if (hq.isEmpty()) { 0446 // no queued jobs, no running jobs. this destroys hq from above. 0447 m_queuesByHostname.remove(jobPriv->m_url.host()); 0448 } 0449 0450 if (jobPriv->m_worker && jobPriv->m_worker->isAlive()) { 0451 m_workerManager.returnWorker(jobPriv->m_worker); 0452 } 0453 // just in case; startAJob() will refuse to start a job if it shouldn't. 
0454 m_startJobTimer.start(); 0455 } 0456 0457 ensureNoDuplicates(&m_queuesBySerial); 0458 } 0459 0460 Worker *ProtoQueue::createWorker(const QString &protocol, SimpleJob *job, const QUrl &url) 0461 { 0462 int error; 0463 QString errortext; 0464 Worker *worker = Worker::createWorker(protocol, url, error, errortext); 0465 if (worker) { 0466 connect(worker, &Worker::workerDied, scheduler(), [](KIO::Worker *worker) { 0467 schedulerPrivate()->slotWorkerDied(worker); 0468 }); 0469 } else { 0470 qCWarning(KIO_CORE) << "couldn't create worker:" << errortext; 0471 if (job) { 0472 job->slotError(error, errortext); 0473 } 0474 } 0475 return worker; 0476 } 0477 0478 bool ProtoQueue::removeWorker(KIO::Worker *worker) 0479 { 0480 const bool removed = m_workerManager.removeWorker(worker); 0481 return removed; 0482 } 0483 0484 QList<Worker *> ProtoQueue::allWorkers() const 0485 { 0486 QList<Worker *> ret(m_workerManager.allWorkers()); 0487 auto it = m_queuesByHostname.cbegin(); 0488 for (; it != m_queuesByHostname.cend(); ++it) { 0489 ret.append(it.value().allWorkers()); 0490 } 0491 0492 return ret; 0493 } 0494 0495 // private slot 0496 void ProtoQueue::startAJob() 0497 { 0498 ensureNoDuplicates(&m_queuesBySerial); 0499 verifyRunningJobsCount(&m_queuesByHostname, m_runningJobsCount); 0500 0501 #ifdef SCHEDULER_DEBUG 0502 // qDebug() << "m_runningJobsCount:" << m_runningJobsCount; 0503 auto it = m_queuesByHostname.cbegin(); 0504 for (; it != m_queuesByHostname.cend(); ++it) { 0505 const QList<KIO::SimpleJob *> list = it.value().runningJobs(); 0506 for (SimpleJob *job : list) { 0507 // qDebug() << SimpleJobPrivate::get(job)->m_url; 0508 } 0509 } 0510 #endif 0511 if (m_runningJobsCount >= m_maxConnectionsTotal) { 0512 #ifdef SCHEDULER_DEBUG 0513 // qDebug() << "not starting any jobs because maxConnectionsTotal has been reached."; 0514 #endif 0515 return; 0516 } 0517 0518 QMap<int, HostQueue *>::iterator first = m_queuesBySerial.begin(); 0519 if (first != m_queuesBySerial.end()) { 
0520 // pick a job and maintain the queue invariant: lower serials first 0521 HostQueue *hq = first.value(); 0522 const int prevLowestSerial = first.key(); 0523 Q_UNUSED(prevLowestSerial); 0524 Q_ASSERT(hq->lowestSerial() == prevLowestSerial); 0525 // the following assertions should hold due to queueJob(), takeFirstInQueue() and 0526 // removeJob() being correct 0527 Q_ASSERT(hq->runningJobsCount() < m_maxConnectionsPerHost); 0528 SimpleJob *startingJob = hq->takeFirstInQueue(); 0529 Q_ASSERT(hq->runningJobsCount() <= m_maxConnectionsPerHost); 0530 Q_ASSERT(hq->lowestSerial() != prevLowestSerial); 0531 0532 m_queuesBySerial.erase(first); 0533 // we've increased hq's runningJobsCount() by calling nexStartingJob() 0534 // so we need to check again. 0535 if (!hq->isQueueEmpty() && hq->runningJobsCount() < m_maxConnectionsPerHost) { 0536 m_queuesBySerial.insert(hq->lowestSerial(), hq); 0537 } 0538 0539 // always increase m_runningJobsCount because it's correct if there is a worker and if there 0540 // is no worker, removeJob() will balance the number again. removeJob() would decrease the 0541 // number too much otherwise. 0542 // Note that createWorker() can call slotError() on a job which in turn calls removeJob(), 0543 // so increase the count here already. 
0544 m_runningJobsCount++; 0545 0546 bool isNewWorker = false; 0547 Worker *worker = m_workerManager.takeWorkerForJob(startingJob); 0548 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(startingJob); 0549 if (!worker) { 0550 isNewWorker = true; 0551 worker = createWorker(jobPriv->m_protocol, startingJob, jobPriv->m_url); 0552 } 0553 0554 if (worker) { 0555 jobPriv->m_worker = worker; 0556 schedulerPrivate()->setupWorker(worker, jobPriv->m_url, jobPriv->m_protocol, jobPriv->m_proxyList, isNewWorker); 0557 startJob(startingJob, worker); 0558 } else { 0559 // dispose of our records about the job and mark the job as unknown 0560 // (to prevent crashes later) 0561 // note that the job's slotError() can have called removeJob() first, so check that 0562 // it's not a ghost job with null serial already. 0563 if (jobPriv->m_schedSerial) { 0564 removeJob(startingJob); 0565 jobPriv->m_schedSerial = 0; 0566 } 0567 } 0568 } else { 0569 #ifdef SCHEDULER_DEBUG 0570 // qDebug() << "not starting any jobs because there is no queued job."; 0571 #endif 0572 } 0573 0574 if (!m_queuesBySerial.isEmpty()) { 0575 m_startJobTimer.start(); 0576 } 0577 } 0578 0579 Scheduler::Scheduler() 0580 { 0581 setObjectName(QStringLiteral("scheduler")); 0582 0583 #ifndef KIO_ANDROID_STUB 0584 const QString dbusPath = QStringLiteral("/KIO/Scheduler"); 0585 const QString dbusInterface = QStringLiteral("org.kde.KIO.Scheduler"); 0586 QDBusConnection dbus = QDBusConnection::sessionBus(); 0587 // Not needed, right? We just want to emit two signals. 
0588 // dbus.registerObject(dbusPath, this, QDBusConnection::ExportScriptableSlots | 0589 // QDBusConnection::ExportScriptableSignals); 0590 dbus.connect(QString(), 0591 dbusPath, 0592 dbusInterface, 0593 QStringLiteral("reparseSlaveConfiguration"), 0594 this, 0595 SLOT(slotReparseSlaveConfiguration(QString, QDBusMessage))); 0596 #endif 0597 } 0598 0599 Scheduler::~Scheduler() 0600 { 0601 } 0602 0603 void Scheduler::doJob(SimpleJob *job) 0604 { 0605 schedulerPrivate()->doJob(job); 0606 } 0607 0608 // static 0609 void Scheduler::cancelJob(SimpleJob *job) 0610 { 0611 schedulerPrivate()->cancelJob(job); 0612 } 0613 0614 void Scheduler::jobFinished(KIO::SimpleJob *job, KIO::Worker *worker) 0615 { 0616 schedulerPrivate()->jobFinished(job, worker); 0617 } 0618 0619 void Scheduler::putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url) 0620 { 0621 schedulerPrivate()->putWorkerOnHold(job, url); 0622 } 0623 0624 void Scheduler::removeWorkerOnHold() 0625 { 0626 schedulerPrivate()->removeWorkerOnHold(); 0627 } 0628 0629 bool Scheduler::isWorkerOnHoldFor(const QUrl &url) 0630 { 0631 return schedulerPrivate()->isWorkerOnHoldFor(url); 0632 } 0633 0634 void Scheduler::updateInternalMetaData(SimpleJob *job) 0635 { 0636 schedulerPrivate()->updateInternalMetaData(job); 0637 } 0638 0639 void Scheduler::emitReparseSlaveConfiguration() 0640 { 0641 #ifndef KIO_ANDROID_STUB 0642 // Do it immediately in this process, otherwise we might send a request before reparsing 0643 // (e.g. 
when changing useragent in the plugin) 0644 schedulerPrivate()->slotReparseSlaveConfiguration(QString(), QDBusMessage()); 0645 #endif 0646 0647 schedulerPrivate()->m_ignoreConfigReparse = true; 0648 Q_EMIT self()->reparseSlaveConfiguration(QString()); 0649 } 0650 0651 #ifndef KIO_ANDROID_STUB 0652 void SchedulerPrivate::slotReparseSlaveConfiguration(const QString &proto, const QDBusMessage &) 0653 { 0654 if (m_ignoreConfigReparse) { 0655 // qDebug() << "Ignoring signal sent by myself"; 0656 m_ignoreConfigReparse = false; 0657 return; 0658 } 0659 0660 // qDebug() << "proto=" << proto; 0661 KProtocolManager::reparseConfiguration(); 0662 WorkerConfig::self()->reset(); 0663 sessionData.reset(); 0664 0665 QHash<QString, ProtoQueue *>::ConstIterator it = proto.isEmpty() ? m_protocols.constBegin() : m_protocols.constFind(proto); 0666 QHash<QString, ProtoQueue *>::ConstIterator endIt = m_protocols.constEnd(); 0667 0668 // not found? 0669 if (it == endIt) { 0670 return; 0671 } 0672 0673 if (!proto.isEmpty()) { 0674 endIt = it; 0675 ++endIt; 0676 } 0677 0678 for (; it != endIt; ++it) { 0679 const QList<KIO::Worker *> list = it.value()->allWorkers(); 0680 for (Worker *worker : list) { 0681 worker->send(CMD_REPARSECONFIGURATION); 0682 worker->resetHost(); 0683 } 0684 } 0685 } 0686 #endif 0687 0688 void SchedulerPrivate::doJob(SimpleJob *job) 0689 { 0690 // qDebug() << job; 0691 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); 0692 jobPriv->m_proxyList.clear(); 0693 jobPriv->m_protocol = KProtocolManagerPrivate::workerProtocol(job->url(), jobPriv->m_proxyList); 0694 0695 ProtoQueue *proto = protoQ(jobPriv->m_protocol, job->url().host()); 0696 proto->queueJob(job); 0697 } 0698 0699 void SchedulerPrivate::setJobPriority(SimpleJob *job, int priority) 0700 { 0701 // qDebug() << job << priority; 0702 const QString protocol = SimpleJobPrivate::get(job)->m_protocol; 0703 if (!protocol.isEmpty()) { 0704 ProtoQueue *proto = protoQ(protocol, job->url().host()); 0705 
proto->changeJobPriority(job, priority); 0706 } 0707 } 0708 0709 void SchedulerPrivate::cancelJob(SimpleJob *job) 0710 { 0711 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); 0712 // this method is called all over the place in job.cpp, so just do this check here to avoid 0713 // much boilerplate in job code. 0714 if (jobPriv->m_schedSerial == 0) { 0715 // qDebug() << "Doing nothing because I don't know job" << job; 0716 return; 0717 } 0718 Worker *worker = jobSWorker(job); 0719 // qDebug() << job << worker; 0720 jobFinished(job, worker); 0721 if (worker) { 0722 ProtoQueue *pq = m_protocols.value(jobPriv->m_protocol); 0723 if (pq) { 0724 pq->removeWorker(worker); 0725 } 0726 worker->kill(); // don't use worker after this! 0727 } 0728 } 0729 0730 void SchedulerPrivate::jobFinished(SimpleJob *job, Worker *worker) 0731 { 0732 // qDebug() << job << worker; 0733 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); 0734 0735 // make sure that we knew about the job! 0736 Q_ASSERT(jobPriv->m_schedSerial); 0737 0738 ProtoQueue *pq = m_protocols.value(jobPriv->m_protocol); 0739 if (pq) { 0740 pq->removeJob(job); 0741 } 0742 0743 if (worker) { 0744 // If we have internal meta-data, tell existing KIO workers to reload 0745 // their configuration. 
0746 if (jobPriv->m_internalMetaData.count()) { 0747 // qDebug() << "Updating KIO workers with new internal metadata information"; 0748 ProtoQueue *queue = m_protocols.value(worker->protocol()); 0749 if (queue) { 0750 const QList<Worker *> workers = queue->allWorkers(); 0751 for (auto *runningWorker : workers) { 0752 if (worker->host() == runningWorker->host()) { 0753 worker->setConfig(metaDataFor(worker->protocol(), jobPriv->m_proxyList, job->url())); 0754 /*qDebug() << "Updated configuration of" << worker->protocol() 0755 << "KIO worker, pid=" << worker->worker_pid();*/ 0756 } 0757 } 0758 } 0759 } 0760 worker->setJob(nullptr); 0761 worker->disconnect(job); 0762 } 0763 jobPriv->m_schedSerial = 0; // this marks the job as unscheduled again 0764 jobPriv->m_worker = nullptr; 0765 // Clear the values in the internal metadata container since they have 0766 // already been taken care of above... 0767 jobPriv->m_internalMetaData.clear(); 0768 } 0769 0770 MetaData SchedulerPrivate::metaDataFor(const QString &protocol, const QStringList &proxyList, const QUrl &url) 0771 { 0772 const QString host = url.host(); 0773 MetaData configData = WorkerConfig::self()->configData(protocol, host); 0774 sessionData.configDataFor(configData, protocol, host); 0775 if (proxyList.isEmpty()) { 0776 configData.remove(QStringLiteral("UseProxy")); 0777 configData.remove(QStringLiteral("ProxyUrls")); 0778 } else { 0779 configData[QStringLiteral("UseProxy")] = proxyList.first(); 0780 configData[QStringLiteral("ProxyUrls")] = proxyList.join(QLatin1Char(',')); 0781 } 0782 0783 return configData; 0784 } 0785 0786 void SchedulerPrivate::setupWorker(KIO::Worker *worker, 0787 const QUrl &url, 0788 const QString &protocol, 0789 const QStringList &proxyList, 0790 bool newWorker, 0791 const KIO::MetaData *config) 0792 { 0793 int port = url.port(); 0794 if (port == -1) { // no port is -1 in QUrl, but in kde3 we used 0 and the KIO workers assume that. 
0795 port = 0; 0796 } 0797 const QString host = url.host(); 0798 const QString user = url.userName(); 0799 const QString passwd = url.password(); 0800 0801 if (newWorker || worker->host() != host || worker->port() != port || worker->user() != user || worker->passwd() != passwd) { 0802 MetaData configData = metaDataFor(protocol, proxyList, url); 0803 if (config) { 0804 configData += *config; 0805 } 0806 0807 worker->setConfig(configData); 0808 worker->setProtocol(url.scheme()); 0809 worker->setHost(host, port, user, passwd); 0810 } 0811 } 0812 0813 void SchedulerPrivate::slotWorkerDied(KIO::Worker *worker) 0814 { 0815 // qDebug() << worker; 0816 Q_ASSERT(worker); 0817 Q_ASSERT(!worker->isAlive()); 0818 ProtoQueue *pq = m_protocols.value(worker->protocol()); 0819 if (pq) { 0820 if (worker->job()) { 0821 pq->removeJob(worker->job()); 0822 } 0823 // in case this was a connected worker... 0824 pq->removeWorker(worker); 0825 } 0826 if (worker == m_workerOnHold) { 0827 m_workerOnHold = nullptr; 0828 m_urlOnHold.clear(); 0829 } 0830 // can't use worker->deref() here because we need to use deleteLater 0831 worker->aboutToDelete(); 0832 worker->deleteLater(); 0833 } 0834 0835 void SchedulerPrivate::putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url) 0836 { 0837 Worker *worker = jobSWorker(job); 0838 // qDebug() << job << url << worker; 0839 worker->disconnect(job); 0840 // prevent the fake death of the worker from trying to kill the job again; 0841 // cf. Worker::hold(const QUrl &url) called in SchedulerPrivate::publishWorkerOnHold(). 
0842 worker->setJob(nullptr); 0843 SimpleJobPrivate::get(job)->m_worker = nullptr; 0844 0845 if (m_workerOnHold) { 0846 m_workerOnHold->kill(); 0847 } 0848 m_workerOnHold = worker; 0849 m_urlOnHold = url; 0850 m_workerOnHold->suspend(); 0851 } 0852 0853 bool SchedulerPrivate::isWorkerOnHoldFor(const QUrl &url) 0854 { 0855 if (url.isValid() && m_urlOnHold.isValid() && url == m_urlOnHold) { 0856 return true; 0857 } 0858 0859 return false; 0860 } 0861 0862 Worker *SchedulerPrivate::heldWorkerForJob(SimpleJob *job) 0863 { 0864 Worker *worker = nullptr; 0865 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); 0866 0867 if (m_workerOnHold) { 0868 // Make sure that the job wants to do a GET or a POST, and with no offset 0869 const int cmd = jobPriv->m_command; 0870 bool canJobReuse = (cmd == CMD_GET); 0871 0872 if (KIO::TransferJob *tJob = qobject_cast<KIO::TransferJob *>(job)) { 0873 canJobReuse = (canJobReuse || cmd == CMD_SPECIAL); 0874 if (canJobReuse) { 0875 KIO::MetaData outgoing = tJob->outgoingMetaData(); 0876 const QString resume = outgoing.value(QStringLiteral("resume")); 0877 const QString rangeStart = outgoing.value(QStringLiteral("range-start")); 0878 // qDebug() << "Resume metadata is" << resume; 0879 canJobReuse = (resume.isEmpty() || resume == QLatin1Char('0')) && (rangeStart.isEmpty() || rangeStart == QLatin1Char('0')); 0880 } 0881 } 0882 0883 if (job->url() == m_urlOnHold) { 0884 if (canJobReuse) { 0885 // qDebug() << "HOLD: Reusing held worker (" << m_workerOnHold << ")"; 0886 worker = m_workerOnHold; 0887 } else { 0888 // qDebug() << "HOLD: Discarding held worker (" << m_workerOnHold << ")"; 0889 m_workerOnHold->kill(); 0890 } 0891 m_workerOnHold = nullptr; 0892 m_urlOnHold.clear(); 0893 } 0894 } 0895 0896 return worker; 0897 } 0898 0899 void SchedulerPrivate::removeWorkerOnHold() 0900 { 0901 // qDebug() << m_workerOnHold; 0902 if (m_workerOnHold) { 0903 m_workerOnHold->kill(); 0904 } 0905 m_workerOnHold = nullptr; 0906 
m_urlOnHold.clear(); 0907 } 0908 0909 ProtoQueue *SchedulerPrivate::protoQ(const QString &protocol, const QString &host) 0910 { 0911 ProtoQueue *pq = m_protocols.value(protocol, nullptr); 0912 if (!pq) { 0913 // qDebug() << "creating ProtoQueue instance for" << protocol; 0914 0915 const int maxWorkers = KProtocolInfo::maxWorkers(protocol); 0916 int maxWorkersPerHost = -1; 0917 if (!host.isEmpty()) { 0918 bool ok = false; 0919 const int value = WorkerConfig::self()->configData(protocol, host, QStringLiteral("MaxConnections")).toInt(&ok); 0920 if (ok) { 0921 maxWorkersPerHost = value; 0922 } 0923 } 0924 if (maxWorkersPerHost == -1) { 0925 maxWorkersPerHost = KProtocolInfo::maxWorkersPerHost(protocol); 0926 } 0927 // Never allow maxWorkersPerHost to exceed maxWorkers. 0928 pq = new ProtoQueue(maxWorkers, qMin(maxWorkers, maxWorkersPerHost)); 0929 m_protocols.insert(protocol, pq); 0930 } 0931 return pq; 0932 } 0933 0934 void SchedulerPrivate::updateInternalMetaData(SimpleJob *job) 0935 { 0936 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); 0937 // Preserve all internal meta-data so they can be sent back to the 0938 // KIO workers as needed... 0939 const QUrl jobUrl = job->url(); 0940 0941 const QLatin1String currHostToken("{internal~currenthost}"); 0942 const QLatin1String allHostsToken("{internal~allhosts}"); 0943 // qDebug() << job << jobPriv->m_internalMetaData; 0944 QMapIterator<QString, QString> it(jobPriv->m_internalMetaData); 0945 while (it.hasNext()) { 0946 it.next(); 0947 if (it.key().startsWith(currHostToken, Qt::CaseInsensitive)) { 0948 WorkerConfig::self()->setConfigData(jobUrl.scheme(), jobUrl.host(), it.key().mid(currHostToken.size()), it.value()); 0949 } else if (it.key().startsWith(allHostsToken, Qt::CaseInsensitive)) { 0950 WorkerConfig::self()->setConfigData(jobUrl.scheme(), QString(), it.key().mid(allHostsToken.size()), it.value()); 0951 } 0952 } 0953 } 0954 0955 #include "moc_scheduler.cpp" 0956 #include "moc_scheduler_p.cpp"