File indexing completed on 2023-09-24 04:08:42
0001 /* 0002 This file is part of the KDE libraries 0003 SPDX-FileCopyrightText: 2000 Stephan Kulow <coolo@kde.org> 0004 SPDX-FileCopyrightText: 2000 Waldo Bastian <bastian@kde.org> 0005 SPDX-FileCopyrightText: 2009, 2010 Andreas Hartmetz <ahartmetz@gmail.com> 0006 0007 SPDX-License-Identifier: LGPL-2.0-only 0008 */ 0009 0010 #include "scheduler.h" 0011 #include "scheduler_p.h" 0012 0013 #include "connection_p.h" 0014 #include "job_p.h" 0015 #include "sessiondata_p.h" 0016 #include "slave.h" 0017 #include "workerconfig.h" 0018 0019 #include <kprotocolinfo.h> 0020 #include <kprotocolmanager.h> 0021 0022 #ifndef KIO_ANDROID_STUB 0023 #include <QDBusConnection> 0024 #include <QDBusMessage> 0025 #endif 0026 #include <QHash> 0027 #include <QThread> 0028 #include <QThreadStorage> 0029 0030 // Slaves may be idle for a certain time (3 minutes) before they are killed. 0031 static const int s_idleSlaveLifetime = 3 * 60; 0032 0033 using namespace KIO; 0034 0035 static inline Slave *jobSlave(SimpleJob *job) 0036 { 0037 return SimpleJobPrivate::get(job)->m_slave; 0038 } 0039 0040 static inline int jobCommand(SimpleJob *job) 0041 { 0042 return SimpleJobPrivate::get(job)->m_command; 0043 } 0044 0045 static inline void startJob(SimpleJob *job, Slave *slave) 0046 { 0047 SimpleJobPrivate::get(job)->start(slave); 0048 } 0049 0050 class KIO::SchedulerPrivate 0051 { 0052 public: 0053 SchedulerPrivate() 0054 : q(new Scheduler()) 0055 { 0056 } 0057 0058 ~SchedulerPrivate() 0059 { 0060 removeSlaveOnHold(); 0061 delete q; 0062 q = nullptr; 0063 qDeleteAll(m_protocols); // ~ProtoQueue will kill and delete all slaves 0064 } 0065 0066 SchedulerPrivate(const SchedulerPrivate &) = delete; 0067 SchedulerPrivate &operator=(const SchedulerPrivate &) = delete; 0068 0069 Scheduler *q; 0070 0071 Slave *m_slaveOnHold = nullptr; 0072 QUrl m_urlOnHold; 0073 bool m_ignoreConfigReparse = false; 0074 0075 SessionData sessionData; 0076 0077 void doJob(SimpleJob *job); 0078 #if KIOCORE_BUILD_DEPRECATED_SINCE(4, 5) 0079 void scheduleJob(SimpleJob *job); 0080 #endif 0081 void setJobPriority(SimpleJob *job, int priority); 0082 void cancelJob(SimpleJob *job); 0083 void jobFinished(KIO::SimpleJob *job, KIO::Slave *slave); 0084 void putSlaveOnHold(KIO::SimpleJob *job, const QUrl &url); 0085 void removeSlaveOnHold(); 0086 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 91) 0087 Slave *getConnectedSlave(const QUrl &url, const KIO::MetaData &metaData); 0088 bool assignJobToSlave(KIO::Slave *slave, KIO::SimpleJob *job); 0089 bool disconnectSlave(KIO::Slave *slave); 0090 #endif 0091 Slave *heldSlaveForJob(KIO::SimpleJob *job); 0092 bool isSlaveOnHoldFor(const QUrl &url); 0093 void updateInternalMetaData(SimpleJob *job); 0094 0095 MetaData metaDataFor(const QString &protocol, const QStringList &proxyList, const QUrl &url); 0096 void 0097 setupSlave(KIO::Slave *slave, const QUrl &url, const QString &protocol, const QStringList &proxyList, bool newSlave, const KIO::MetaData *config = nullptr); 0098 0099 void slotSlaveDied(KIO::Slave *slave); 0100 0101 #ifndef KIO_ANDROID_STUB 0102 void slotReparseSlaveConfiguration(const QString &, const QDBusMessage &); 0103 #endif 0104 0105 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 91) 0106 void slotSlaveConnected(); 0107 void slotSlaveError(int error, const QString &errorMsg); 0108 #endif 0109 0110 ProtoQueue *protoQ(const QString &protocol, const QString &host); 0111 0112 private: 0113 QHash<QString, ProtoQueue *> m_protocols; 0114 }; 0115 0116 static QThreadStorage<SchedulerPrivate *> s_storage; 0117 static SchedulerPrivate *schedulerPrivate() 0118 { 0119 if (!s_storage.hasLocalData()) { 0120 s_storage.setLocalData(new SchedulerPrivate); 0121 } 0122 return s_storage.localData(); 0123 } 0124 0125 Scheduler *Scheduler::self() 0126 { 0127 return schedulerPrivate()->q; 0128 } 0129 0130 SchedulerPrivate *Scheduler::d_func() 0131 { 0132 return schedulerPrivate(); 0133 } 0134 0135 // static 0136 Scheduler *scheduler() 0137 { 0138 return schedulerPrivate()->q; 0139 } 0140 0141 //////////////////////////// 0142 0143 int SerialPicker::changedPrioritySerial(int oldSerial, int newPriority) const 0144 { 0145 Q_ASSERT(newPriority >= -10 && newPriority <= 10); 0146 newPriority = qBound(-10, newPriority, 10); 0147 int unbiasedSerial = oldSerial % m_jobsPerPriority; 0148 return unbiasedSerial + newPriority * m_jobsPerPriority; 0149 } 0150 0151 SlaveKeeper::SlaveKeeper() 0152 { 0153 m_grimTimer.setSingleShot(true); 0154 connect(&m_grimTimer, &QTimer::timeout, this, &SlaveKeeper::grimReaper); 0155 } 0156 0157 SlaveKeeper::~SlaveKeeper() 0158 { 0159 grimReaper(); 0160 } 0161 0162 void SlaveKeeper::returnSlave(Slave *slave) 0163 { 0164 Q_ASSERT(slave); 0165 slave->setIdle(); 0166 m_idleSlaves.insert(slave->host(), slave); 0167 scheduleGrimReaper(); 0168 } 0169 0170 Slave *SlaveKeeper::takeSlaveForJob(SimpleJob *job) 0171 { 0172 Slave *slave = schedulerPrivate()->heldSlaveForJob(job); 0173 if (slave) { 0174 return slave; 0175 } 0176 0177 QUrl url = SimpleJobPrivate::get(job)->m_url; 0178 // TODO take port, username and password into account 0179 QMultiHash<QString, Slave *>::Iterator it = m_idleSlaves.find(url.host()); 0180 if (it == m_idleSlaves.end()) { 0181 it = m_idleSlaves.begin(); 0182 } 0183 if (it == m_idleSlaves.end()) { 0184 return nullptr; 0185 } 0186 slave = it.value(); 0187 m_idleSlaves.erase(it); 0188 return slave; 0189 } 0190 0191 bool SlaveKeeper::removeSlave(Slave *slave) 0192 { 0193 // ### performance not so great 0194 QMultiHash<QString, Slave *>::Iterator it = m_idleSlaves.begin(); 0195 for (; it != m_idleSlaves.end(); ++it) { 0196 if (it.value() == slave) { 0197 m_idleSlaves.erase(it); 0198 return true; 0199 } 0200 } 0201 return false; 0202 } 0203 0204 void SlaveKeeper::clear() 0205 { 0206 m_idleSlaves.clear(); 0207 } 0208 0209 QList<Slave *> SlaveKeeper::allSlaves() const 0210 { 0211 return m_idleSlaves.values(); 0212 } 0213 0214 void SlaveKeeper::scheduleGrimReaper() 0215 { 0216 if (!m_grimTimer.isActive()) { 0217 m_grimTimer.start((s_idleSlaveLifetime / 2) * 1000); 0218 } 0219 } 0220 0221 // private slot 0222 void SlaveKeeper::grimReaper() 0223 { 0224 QMultiHash<QString, Slave *>::Iterator it = m_idleSlaves.begin(); 0225 while (it != m_idleSlaves.end()) { 0226 Slave *slave = it.value(); 0227 if (slave->idleTime() >= s_idleSlaveLifetime) { 0228 it = m_idleSlaves.erase(it); 0229 if (slave->job()) { 0230 // qDebug() << "Idle slave" << slave << "still has job" << slave->job(); 0231 } 0232 // avoid invoking slotSlaveDied() because its cleanup services are not needed 0233 slave->kill(); 0234 } else { 0235 ++it; 0236 } 0237 } 0238 if (!m_idleSlaves.isEmpty()) { 0239 scheduleGrimReaper(); 0240 } 0241 } 0242 0243 int HostQueue::lowestSerial() const 0244 { 0245 QMap<int, SimpleJob *>::ConstIterator first = m_queuedJobs.constBegin(); 0246 if (first != m_queuedJobs.constEnd()) { 0247 return first.key(); 0248 } 0249 return SerialPicker::maxSerial; 0250 } 0251 0252 void HostQueue::queueJob(SimpleJob *job) 0253 { 0254 const int serial = SimpleJobPrivate::get(job)->m_schedSerial; 0255 Q_ASSERT(serial != 0); 0256 Q_ASSERT(!m_queuedJobs.contains(serial)); 0257 Q_ASSERT(!m_runningJobs.contains(job)); 0258 m_queuedJobs.insert(serial, job); 0259 } 0260 0261 SimpleJob *HostQueue::takeFirstInQueue() 0262 { 0263 Q_ASSERT(!m_queuedJobs.isEmpty()); 0264 QMap<int, SimpleJob *>::iterator first = m_queuedJobs.begin(); 0265 SimpleJob *job = first.value(); 0266 m_queuedJobs.erase(first); 0267 m_runningJobs.insert(job); 0268 return job; 0269 } 0270 0271 bool HostQueue::removeJob(SimpleJob *job) 0272 { 0273 const int serial = SimpleJobPrivate::get(job)->m_schedSerial; 0274 if (m_runningJobs.remove(job)) { 0275 Q_ASSERT(!m_queuedJobs.contains(serial)); 0276 return true; 0277 } 0278 if (m_queuedJobs.remove(serial)) { 0279 return true; 0280 } 0281 return false; 0282 } 0283 0284 QList<Slave *> HostQueue::allSlaves() const 0285 { 0286 QList<Slave *> ret; 0287 ret.reserve(m_runningJobs.size()); 0288 for (SimpleJob *job : m_runningJobs) { 0289 Slave *slave = jobSlave(job); 0290 Q_ASSERT(slave); 0291 ret.append(slave); 0292 } 0293 return ret; 0294 } 0295 0296 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 91) 0297 ConnectedSlaveQueue::ConnectedSlaveQueue() 0298 { 0299 m_startJobsTimer.setSingleShot(true); 0300 connect(&m_startJobsTimer, &QTimer::timeout, this, &ConnectedSlaveQueue::startRunnableJobs); 0301 } 0302 0303 bool ConnectedSlaveQueue::queueJob(SimpleJob *job, Slave *slave) 0304 { 0305 QHash<Slave *, PerSlaveQueue>::Iterator it = m_connectedSlaves.find(slave); 0306 if (it == m_connectedSlaves.end()) { 0307 return false; 0308 } 0309 SimpleJobPrivate::get(job)->m_slave = slave; 0310 0311 PerSlaveQueue &jobs = it.value(); 0312 jobs.waitingList.append(job); 0313 if (!jobs.runningJob) { 0314 // idle slave now has a job to run 0315 m_runnableSlaves.insert(slave); 0316 m_startJobsTimer.start(); 0317 } 0318 return true; 0319 } 0320 0321 bool ConnectedSlaveQueue::removeJob(SimpleJob *job) 0322 { 0323 Slave *slave = jobSlave(job); 0324 Q_ASSERT(slave); 0325 QHash<Slave *, PerSlaveQueue>::Iterator it = m_connectedSlaves.find(slave); 0326 if (it == m_connectedSlaves.end()) { 0327 return false; 0328 } 0329 PerSlaveQueue &jobs = it.value(); 0330 if (jobs.runningJob || jobs.waitingList.isEmpty()) { 0331 // a slave that was busy running a job was not runnable. 0332 // a slave that has no waiting job(s) was not runnable either. 0333 Q_ASSERT(!m_runnableSlaves.contains(slave)); 0334 } 0335 0336 const bool removedRunning = jobs.runningJob == job; 0337 const bool removedWaiting = jobs.waitingList.removeAll(job) != 0; 0338 if (removedRunning) { 0339 jobs.runningJob = nullptr; 0340 Q_ASSERT(!removedWaiting); 0341 } 0342 const bool removedTheJob = removedRunning || removedWaiting; 0343 0344 if (!slave->isAlive()) { 0345 removeSlave(slave); 0346 return removedTheJob; 0347 } 0348 0349 if (removedRunning && jobs.waitingList.count()) { 0350 m_runnableSlaves.insert(slave); 0351 m_startJobsTimer.start(); 0352 } 0353 if (removedWaiting && jobs.waitingList.isEmpty()) { 0354 m_runnableSlaves.remove(slave); 0355 } 0356 return removedTheJob; 0357 } 0358 0359 void ConnectedSlaveQueue::addSlave(Slave *slave) 0360 { 0361 Q_ASSERT(slave); 0362 if (!m_connectedSlaves.contains(slave)) { 0363 m_connectedSlaves.insert(slave, PerSlaveQueue()); 0364 } 0365 } 0366 0367 bool ConnectedSlaveQueue::removeSlave(Slave *slave) 0368 { 0369 QHash<Slave *, PerSlaveQueue>::Iterator it = m_connectedSlaves.find(slave); 0370 if (it == m_connectedSlaves.end()) { 0371 return false; 0372 } 0373 PerSlaveQueue &jobs = it.value(); 0374 // we need a copy as kill() ends up removing the job from waitingList 0375 const QList<SimpleJob *> waitingJobs = jobs.waitingList; 0376 for (SimpleJob *job : waitingJobs) { 0377 // ### for compatibility with the old scheduler we don't touch the running job, if any. 0378 // make sure that the job doesn't call back into Scheduler::cancelJob(); this would 0379 // a) crash and b) be unnecessary because we clean up just fine. 0380 SimpleJobPrivate::get(job)->m_schedSerial = 0; 0381 job->kill(); 0382 } 0383 m_connectedSlaves.erase(it); 0384 m_runnableSlaves.remove(slave); 0385 0386 slave->kill(); 0387 return true; 0388 } 0389 0390 // KDE5: only one caller, for doubtful reasons. remove this if possible. 0391 bool ConnectedSlaveQueue::isIdle(Slave *slave) 0392 { 0393 QHash<Slave *, PerSlaveQueue>::Iterator it = m_connectedSlaves.find(slave); 0394 if (it == m_connectedSlaves.end()) { 0395 return false; 0396 } 0397 return it.value().runningJob == nullptr; 0398 } 0399 0400 // private slot 0401 void ConnectedSlaveQueue::startRunnableJobs() 0402 { 0403 QSet<Slave *>::Iterator it = m_runnableSlaves.begin(); 0404 while (it != m_runnableSlaves.end()) { 0405 Slave *slave = *it; 0406 if (!slave->isConnected()) { 0407 // this polling is somewhat inefficient... 0408 m_startJobsTimer.start(); 0409 ++it; 0410 continue; 0411 } 0412 it = m_runnableSlaves.erase(it); 0413 PerSlaveQueue &jobs = m_connectedSlaves[slave]; 0414 SimpleJob *job = jobs.waitingList.takeFirst(); 0415 Q_ASSERT(!jobs.runningJob); 0416 jobs.runningJob = job; 0417 0418 const QUrl url = job->url(); 0419 // no port is -1 in QUrl, but in kde3 we used 0 and the KIO workers assume that. 0420 const int port = url.port() == -1 ? 0 : url.port(); 0421 0422 if (slave->host() == QLatin1String("<reset>")) { 0423 MetaData configData = WorkerConfig::self()->configData(url.scheme(), url.host()); 0424 slave->setConfig(configData); 0425 slave->setProtocol(url.scheme()); 0426 slave->setHost(url.host(), port, url.userName(), url.password()); 0427 } 0428 0429 Q_ASSERT(slave->protocol() == url.scheme()); 0430 Q_ASSERT(slave->host() == url.host()); 0431 Q_ASSERT(slave->port() == port); 0432 startJob(job, slave); 0433 } 0434 } 0435 #endif 0436 0437 static void ensureNoDuplicates(QMap<int, HostQueue *> *queuesBySerial) 0438 { 0439 Q_UNUSED(queuesBySerial); 0440 #ifdef SCHEDULER_DEBUG 0441 // a host queue may *never* be in queuesBySerial twice. 0442 QSet<HostQueue *> seen; 0443 auto it = queuesBySerial->cbegin(); 0444 for (; it != queuesBySerial->cend(); ++it) { 0445 Q_ASSERT(!seen.contains(it.value())); 0446 seen.insert(it.value()); 0447 } 0448 #endif 0449 } 0450 0451 static void verifyRunningJobsCount(QHash<QString, HostQueue> *queues, int runningJobsCount) 0452 { 0453 Q_UNUSED(queues); 0454 Q_UNUSED(runningJobsCount); 0455 #ifdef SCHEDULER_DEBUG 0456 int realRunningJobsCount = 0; 0457 auto it = queues->cbegin(); 0458 for (; it != queues->cend(); ++it) { 0459 realRunningJobsCount += it.value().runningJobsCount(); 0460 } 0461 Q_ASSERT(realRunningJobsCount == runningJobsCount); 0462 0463 // ...and of course we may never run the same job twice! 0464 QSet<SimpleJob *> seenJobs; 0465 auto it2 = queues->cbegin(); 0466 for (; it2 != queues->cend(); ++it2) { 0467 for (SimpleJob *job : it2.value().runningJobs()) { 0468 Q_ASSERT(!seenJobs.contains(job)); 0469 seenJobs.insert(job); 0470 } 0471 } 0472 #endif 0473 } 0474 0475 ProtoQueue::ProtoQueue(int maxWorkers, int maxWorkersPerHost) 0476 : m_maxConnectionsPerHost(maxWorkersPerHost ? maxWorkersPerHost : maxWorkers) 0477 , m_maxConnectionsTotal(qMax(maxWorkers, maxWorkersPerHost)) 0478 , m_runningJobsCount(0) 0479 0480 { 0481 /*qDebug() << "m_maxConnectionsTotal:" << m_maxConnectionsTotal 0482 << "m_maxConnectionsPerHost:" << m_maxConnectionsPerHost;*/ 0483 Q_ASSERT(m_maxConnectionsPerHost >= 1); 0484 Q_ASSERT(maxWorkers >= maxWorkersPerHost); 0485 m_startJobTimer.setSingleShot(true); 0486 connect(&m_startJobTimer, &QTimer::timeout, this, &ProtoQueue::startAJob); 0487 } 0488 0489 ProtoQueue::~ProtoQueue() 0490 { 0491 // Gather list of all slaves first 0492 const QList<Slave *> slaves = allSlaves(); 0493 // Clear the idle slaves in the keeper to avoid dangling pointers 0494 m_slaveKeeper.clear(); 0495 for (Slave *slave : slaves) { 0496 // kill the slave process and remove the interface in our process 0497 slave->kill(); 0498 } 0499 } 0500 0501 void ProtoQueue::queueJob(SimpleJob *job) 0502 { 0503 QString hostname = SimpleJobPrivate::get(job)->m_url.host(); 0504 HostQueue &hq = m_queuesByHostname[hostname]; 0505 const int prevLowestSerial = hq.lowestSerial(); 0506 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost); 0507 0508 // nevert insert a job twice 0509 Q_ASSERT(SimpleJobPrivate::get(job)->m_schedSerial == 0); 0510 SimpleJobPrivate::get(job)->m_schedSerial = m_serialPicker.next(); 0511 0512 const bool wasQueueEmpty = hq.isQueueEmpty(); 0513 hq.queueJob(job); 0514 // note that HostQueue::queueJob() into an empty queue changes its lowestSerial() too... 0515 // the queue's lowest serial job may have changed, so update the ordered list of queues. 0516 // however, we ignore all jobs that would cause more connections to a host than allowed. 0517 if (prevLowestSerial != hq.lowestSerial()) { 0518 if (hq.runningJobsCount() < m_maxConnectionsPerHost) { 0519 // if the connection limit didn't keep the HQ unscheduled it must have been lack of jobs 0520 if (m_queuesBySerial.remove(prevLowestSerial) == 0) { 0521 Q_UNUSED(wasQueueEmpty); 0522 Q_ASSERT(wasQueueEmpty); 0523 } 0524 m_queuesBySerial.insert(hq.lowestSerial(), &hq); 0525 } else { 0526 #ifdef SCHEDULER_DEBUG 0527 // ### this assertion may fail if the limits were modified at runtime! 0528 // if the per-host connection limit is already reached the host queue's lowest serial 0529 // should not be queued. 0530 Q_ASSERT(!m_queuesBySerial.contains(prevLowestSerial)); 0531 #endif 0532 } 0533 } 0534 // just in case; startAJob() will refuse to start a job if it shouldn't. 0535 m_startJobTimer.start(); 0536 0537 ensureNoDuplicates(&m_queuesBySerial); 0538 } 0539 0540 void ProtoQueue::changeJobPriority(SimpleJob *job, int newPrio) 0541 { 0542 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job); 0543 QHash<QString, HostQueue>::Iterator it = m_queuesByHostname.find(jobPriv->m_url.host()); 0544 if (it == m_queuesByHostname.end()) { 0545 return; 0546 } 0547 HostQueue &hq = it.value(); 0548 const int prevLowestSerial = hq.lowestSerial(); 0549 if (hq.isJobRunning(job) || !hq.removeJob(job)) { 0550 return; 0551 } 0552 jobPriv->m_schedSerial = m_serialPicker.changedPrioritySerial(jobPriv->m_schedSerial, newPrio); 0553 hq.queueJob(job); 0554 const bool needReinsert = hq.lowestSerial() != prevLowestSerial; 0555 // the host queue might be absent from m_queuesBySerial because the connections per host limit 0556 // for that host has been reached. 0557 if (needReinsert && m_queuesBySerial.remove(prevLowestSerial)) { 0558 m_queuesBySerial.insert(hq.lowestSerial(), &hq); 0559 } 0560 ensureNoDuplicates(&m_queuesBySerial); 0561 } 0562 0563 void ProtoQueue::removeJob(SimpleJob *job) 0564 { 0565 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job); 0566 HostQueue &hq = m_queuesByHostname[jobPriv->m_url.host()]; 0567 const int prevLowestSerial = hq.lowestSerial(); 0568 const int prevRunningJobs = hq.runningJobsCount(); 0569 0570 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost); 0571 0572 if (hq.removeJob(job)) { 0573 if (hq.lowestSerial() != prevLowestSerial) { 0574 // we have dequeued the not yet running job with the lowest serial 0575 Q_ASSERT(!jobPriv->m_slave); 0576 Q_ASSERT(prevRunningJobs == hq.runningJobsCount()); 0577 if (m_queuesBySerial.remove(prevLowestSerial) == 0) { 0578 // make sure that the queue was not scheduled for a good reason 0579 Q_ASSERT(hq.runningJobsCount() == m_maxConnectionsPerHost); 0580 } 0581 } else { 0582 if (prevRunningJobs != hq.runningJobsCount()) { 0583 // we have dequeued a previously running job 0584 Q_ASSERT(prevRunningJobs - 1 == hq.runningJobsCount()); 0585 m_runningJobsCount--; 0586 Q_ASSERT(m_runningJobsCount >= 0); 0587 } 0588 } 0589 if (!hq.isQueueEmpty() && hq.runningJobsCount() < m_maxConnectionsPerHost) { 0590 // this may be a no-op, but it's faster than first checking if it's already in. 0591 m_queuesBySerial.insert(hq.lowestSerial(), &hq); 0592 } 0593 0594 if (hq.isEmpty()) { 0595 // no queued jobs, no running jobs. this destroys hq from above. 0596 m_queuesByHostname.remove(jobPriv->m_url.host()); 0597 } 0598 0599 if (jobPriv->m_slave && jobPriv->m_slave->isAlive()) { 0600 m_slaveKeeper.returnSlave(jobPriv->m_slave); 0601 } 0602 // just in case; startAJob() will refuse to start a job if it shouldn't. 0603 m_startJobTimer.start(); 0604 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 91) 0605 } else { 0606 // should be a connected slave 0607 // if the assertion fails the job has probably changed the host part of its URL while 0608 // running, so we can't find it by hostname. don't do this. 0609 const bool removed = m_connectedSlaveQueue.removeJob(job); 0610 Q_UNUSED(removed); 0611 Q_ASSERT(removed); 0612 #endif 0613 } 0614 0615 ensureNoDuplicates(&m_queuesBySerial); 0616 } 0617 0618 Slave *ProtoQueue::createSlave(const QString &protocol, SimpleJob *job, const QUrl &url) 0619 { 0620 int error; 0621 QString errortext; 0622 Slave *slave = Slave::createSlave(protocol, url, error, errortext); 0623 if (slave) { 0624 connect(slave, &Slave::slaveDied, scheduler(), [](KIO::Slave *slave) { 0625 schedulerPrivate()->slotSlaveDied(slave); 0626 }); 0627 } else { 0628 qCWarning(KIO_CORE) << "couldn't create worker:" << errortext; 0629 if (job) { 0630 job->slotError(error, errortext); 0631 } 0632 } 0633 return slave; 0634 } 0635 0636 bool ProtoQueue::removeSlave(KIO::Slave *slave) 0637 { 0638 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 91) 0639 const bool removedConnected = m_connectedSlaveQueue.removeSlave(slave); 0640 const bool removedUnconnected = m_slaveKeeper.removeSlave(slave); 0641 Q_ASSERT(!(removedConnected && removedUnconnected)); 0642 return removedConnected || removedUnconnected; 0643 #else 0644 const bool removed = m_slaveKeeper.removeSlave(slave); 0645 return removed; 0646 #endif 0647 } 0648 0649 QList<Slave *> ProtoQueue::allSlaves() const 0650 { 0651 QList<Slave *> ret(m_slaveKeeper.allSlaves()); 0652 auto it = m_queuesByHostname.cbegin(); 0653 for (; it != m_queuesByHostname.cend(); ++it) { 0654 ret.append(it.value().allSlaves()); 0655 } 0656 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 91) 0657 ret.append(m_connectedSlaveQueue.allSlaves()); 0658 #endif 0659 return ret; 0660 } 0661 0662 // private slot 0663 void ProtoQueue::startAJob() 0664 { 0665 ensureNoDuplicates(&m_queuesBySerial); 0666 verifyRunningJobsCount(&m_queuesByHostname, m_runningJobsCount); 0667 0668 #ifdef SCHEDULER_DEBUG 0669 // qDebug() << "m_runningJobsCount:" << m_runningJobsCount; 0670 auto it = m_queuesByHostname.cbegin(); 0671 for (; it != m_queuesByHostname.cend(); ++it) { 0672 const QList<KIO::SimpleJob *> list = it.value().runningJobs(); 0673 for (SimpleJob *job : list) { 0674 // qDebug() << SimpleJobPrivate::get(job)->m_url; 0675 } 0676 } 0677 #endif 0678 if (m_runningJobsCount >= m_maxConnectionsTotal) { 0679 #ifdef SCHEDULER_DEBUG 0680 // qDebug() << "not starting any jobs because maxConnectionsTotal has been reached."; 0681 #endif 0682 return; 0683 } 0684 0685 QMap<int, HostQueue *>::iterator first = m_queuesBySerial.begin(); 0686 if (first != m_queuesBySerial.end()) { 0687 // pick a job and maintain the queue invariant: lower serials first 0688 HostQueue *hq = first.value(); 0689 const int prevLowestSerial = first.key(); 0690 Q_UNUSED(prevLowestSerial); 0691 Q_ASSERT(hq->lowestSerial() == prevLowestSerial); 0692 // the following assertions should hold due to queueJob(), takeFirstInQueue() and 0693 // removeJob() being correct 0694 Q_ASSERT(hq->runningJobsCount() < m_maxConnectionsPerHost); 0695 SimpleJob *startingJob = hq->takeFirstInQueue(); 0696 Q_ASSERT(hq->runningJobsCount() <= m_maxConnectionsPerHost); 0697 Q_ASSERT(hq->lowestSerial() != prevLowestSerial); 0698 0699 m_queuesBySerial.erase(first); 0700 // we've increased hq's runningJobsCount() by calling nexStartingJob() 0701 // so we need to check again. 0702 if (!hq->isQueueEmpty() && hq->runningJobsCount() < m_maxConnectionsPerHost) { 0703 m_queuesBySerial.insert(hq->lowestSerial(), hq); 0704 } 0705 0706 // always increase m_runningJobsCount because it's correct if there is a slave and if there 0707 // is no slave, removeJob() will balance the number again. removeJob() would decrease the 0708 // number too much otherwise. 0709 // Note that createSlave() can call slotError() on a job which in turn calls removeJob(), 0710 // so increase the count here already. 0711 m_runningJobsCount++; 0712 0713 bool isNewSlave = false; 0714 Slave *slave = m_slaveKeeper.takeSlaveForJob(startingJob); 0715 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(startingJob); 0716 if (!slave) { 0717 isNewSlave = true; 0718 slave = createSlave(jobPriv->m_protocol, startingJob, jobPriv->m_url); 0719 } 0720 0721 if (slave) { 0722 jobPriv->m_slave = slave; 0723 schedulerPrivate()->setupSlave(slave, jobPriv->m_url, jobPriv->m_protocol, jobPriv->m_proxyList, isNewSlave); 0724 startJob(startingJob, slave); 0725 } else { 0726 // dispose of our records about the job and mark the job as unknown 0727 // (to prevent crashes later) 0728 // note that the job's slotError() can have called removeJob() first, so check that 0729 // it's not a ghost job with null serial already. 0730 if (jobPriv->m_schedSerial) { 0731 removeJob(startingJob); 0732 jobPriv->m_schedSerial = 0; 0733 } 0734 } 0735 } else { 0736 #ifdef SCHEDULER_DEBUG 0737 // qDebug() << "not starting any jobs because there is no queued job."; 0738 #endif 0739 } 0740 0741 if (!m_queuesBySerial.isEmpty()) { 0742 m_startJobTimer.start(); 0743 } 0744 } 0745 0746 Scheduler::Scheduler() 0747 { 0748 setObjectName(QStringLiteral("scheduler")); 0749 0750 #ifndef KIO_ANDROID_STUB 0751 const QString dbusPath = QStringLiteral("/KIO/Scheduler"); 0752 const QString dbusInterface = QStringLiteral("org.kde.KIO.Scheduler"); 0753 QDBusConnection dbus = QDBusConnection::sessionBus(); 0754 // Not needed, right? We just want to emit two signals. 0755 // dbus.registerObject(dbusPath, this, QDBusConnection::ExportScriptableSlots | 0756 // QDBusConnection::ExportScriptableSignals); 0757 dbus.connect(QString(), 0758 dbusPath, 0759 dbusInterface, 0760 QStringLiteral("reparseSlaveConfiguration"), 0761 this, 0762 SLOT(slotReparseSlaveConfiguration(QString, QDBusMessage))); 0763 #endif 0764 } 0765 0766 Scheduler::~Scheduler() 0767 { 0768 } 0769 0770 void Scheduler::doJob(SimpleJob *job) 0771 { 0772 schedulerPrivate()->doJob(job); 0773 } 0774 0775 #if KIOCORE_BUILD_DEPRECATED_SINCE(4, 5) 0776 void Scheduler::scheduleJob(SimpleJob *job) 0777 { 0778 schedulerPrivate()->scheduleJob(job); 0779 } 0780 #endif 0781 0782 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 90) 0783 void Scheduler::setJobPriority(SimpleJob *job, int priority) 0784 { 0785 schedulerPrivate()->setJobPriority(job, priority); 0786 } 0787 #endif 0788 0789 // static 0790 void Scheduler::setSimpleJobPriority(SimpleJob *job, int priority) 0791 { 0792 schedulerPrivate()->setJobPriority(job, priority); 0793 } 0794 0795 void Scheduler::cancelJob(SimpleJob *job) 0796 { 0797 schedulerPrivate()->cancelJob(job); 0798 } 0799 0800 void Scheduler::jobFinished(KIO::SimpleJob *job, KIO::Slave *slave) 0801 { 0802 schedulerPrivate()->jobFinished(job, slave); 0803 } 0804 0805 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 101) 0806 void Scheduler::putSlaveOnHold(KIO::SimpleJob *job, const QUrl &url) 0807 { 0808 schedulerPrivate()->putSlaveOnHold(job, url); 0809 } 0810 #endif 0811 0812 void Scheduler::putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url) 0813 { 0814 schedulerPrivate()->putSlaveOnHold(job, url); 0815 } 0816 0817 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 101) 0818 void Scheduler::removeSlaveOnHold() 0819 { 0820 schedulerPrivate()->removeSlaveOnHold(); 0821 } 0822 #endif 0823 0824 void Scheduler::removeWorkerOnHold() 0825 { 0826 schedulerPrivate()->removeSlaveOnHold(); 0827 } 0828 0829 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 88) 0830 void Scheduler::publishSlaveOnHold() 0831 { 0832 } 0833 #endif 0834 0835 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 101) 0836 bool Scheduler::isSlaveOnHoldFor(const QUrl &url) 0837 { 0838 return schedulerPrivate()->isSlaveOnHoldFor(url); 0839 } 0840 #endif 0841 0842 bool Scheduler::isWorkerOnHoldFor(const QUrl &url) 0843 { 0844 return schedulerPrivate()->isSlaveOnHoldFor(url); 0845 } 0846 0847 void Scheduler::updateInternalMetaData(SimpleJob *job) 0848 { 0849 schedulerPrivate()->updateInternalMetaData(job); 0850 } 0851 0852 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 91) 0853 KIO::Slave *Scheduler::getConnectedSlave(const QUrl &url, const KIO::MetaData &config) 0854 { 0855 return schedulerPrivate()->getConnectedSlave(url, config); 0856 } 0857 0858 bool Scheduler::assignJobToSlave(KIO::Slave *slave, KIO::SimpleJob *job) 0859 { 0860 return schedulerPrivate()->assignJobToSlave(slave, job); 0861 } 0862 0863 bool Scheduler::disconnectSlave(KIO::Slave *slave) 0864 { 0865 return schedulerPrivate()->disconnectSlave(slave); 0866 } 0867 #endif 0868 0869 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 103) 0870 bool Scheduler::connect(const char *signal, const QObject *receiver, const char *member) 0871 { 0872 return QObject::connect(self(), signal, receiver, member); 0873 } 0874 #endif 0875 0876 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 103) 0877 bool Scheduler::connect(const QObject *sender, const char *signal, const QObject *receiver, const char *member) 0878 { 0879 return QObject::connect(sender, signal, receiver, member); 0880 } 0881 #endif 0882 0883 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 103) 0884 bool Scheduler::disconnect(const QObject *sender, const char *signal, const QObject *receiver, const char *member) 0885 { 0886 return QObject::disconnect(sender, signal, receiver, member); 0887 } 0888 #endif 0889 0890 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 103) 0891 bool Scheduler::connect(const QObject *sender, const char *signal, const char *member) 0892 { 0893 return QObject::connect(sender, signal, member); 0894 } 0895 #endif 0896 0897 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 88) 0898 void Scheduler::checkSlaveOnHold(bool) 0899 { 0900 } 0901 #endif 0902 0903 void Scheduler::emitReparseSlaveConfiguration() 0904 { 0905 #ifndef KIO_ANDROID_STUB 0906 // Do it immediately in this process, otherwise we might send a request before reparsing 0907 // (e.g. when changing useragent in the plugin) 0908 schedulerPrivate()->slotReparseSlaveConfiguration(QString(), QDBusMessage()); 0909 #endif 0910 0911 schedulerPrivate()->m_ignoreConfigReparse = true; 0912 Q_EMIT self()->reparseSlaveConfiguration(QString()); 0913 } 0914 0915 #ifndef KIO_ANDROID_STUB 0916 void SchedulerPrivate::slotReparseSlaveConfiguration(const QString &proto, const QDBusMessage &) 0917 { 0918 if (m_ignoreConfigReparse) { 0919 // qDebug() << "Ignoring signal sent by myself"; 0920 m_ignoreConfigReparse = false; 0921 return; 0922 } 0923 0924 // qDebug() << "proto=" << proto; 0925 KProtocolManager::reparseConfiguration(); 0926 WorkerConfig::self()->reset(); 0927 sessionData.reset(); 0928 NetRC::self()->reload(); 0929 0930 QHash<QString, ProtoQueue *>::ConstIterator it = proto.isEmpty() ? m_protocols.constBegin() : m_protocols.constFind(proto); 0931 QHash<QString, ProtoQueue *>::ConstIterator endIt = m_protocols.constEnd(); 0932 0933 // not found? 0934 if (it == endIt) { 0935 return; 0936 } 0937 0938 if (!proto.isEmpty()) { 0939 endIt = it; 0940 ++endIt; 0941 } 0942 0943 for (; it != endIt; ++it) { 0944 const QList<KIO::Slave *> list = it.value()->allSlaves(); 0945 for (Slave *slave : list) { 0946 slave->send(CMD_REPARSECONFIGURATION); 0947 slave->resetHost(); 0948 } 0949 } 0950 } 0951 #endif 0952 0953 void SchedulerPrivate::doJob(SimpleJob *job) 0954 { 0955 // qDebug() << job; 0956 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); 0957 jobPriv->m_proxyList.clear(); 0958 jobPriv->m_protocol = KProtocolManager::workerProtocol(job->url(), jobPriv->m_proxyList); 0959 0960 ProtoQueue *proto = protoQ(jobPriv->m_protocol, job->url().host()); 0961 proto->queueJob(job); 0962 } 0963 0964 #if KIOCORE_BUILD_DEPRECATED_SINCE(4, 5) 0965 void SchedulerPrivate::scheduleJob(SimpleJob *job) 0966 { 0967 // qDebug() << job; 0968 setJobPriority(job, 1); 0969 } 0970 #endif 0971 0972 void SchedulerPrivate::setJobPriority(SimpleJob *job, int priority) 0973 { 0974 // qDebug() << job << priority; 0975 const QString protocol = SimpleJobPrivate::get(job)->m_protocol; 0976 if (!protocol.isEmpty()) { 0977 ProtoQueue *proto = protoQ(protocol, job->url().host()); 0978 proto->changeJobPriority(job, priority); 0979 } 0980 } 0981 0982 void SchedulerPrivate::cancelJob(SimpleJob *job) 0983 { 0984 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); 0985 // this method is called all over the place in job.cpp, so just do this check here to avoid 0986 // much boilerplate in job code. 0987 if (jobPriv->m_schedSerial == 0) { 0988 // qDebug() << "Doing nothing because I don't know job" << job; 0989 return; 0990 } 0991 Slave *slave = jobSlave(job); 0992 // qDebug() << job << slave; 0993 jobFinished(job, slave); 0994 if (slave) { 0995 ProtoQueue *pq = m_protocols.value(jobPriv->m_protocol); 0996 if (pq) { 0997 pq->removeSlave(slave); 0998 } 0999 slave->kill(); // don't use slave after this! 1000 } 1001 } 1002 1003 void SchedulerPrivate::jobFinished(SimpleJob *job, Slave *slave) 1004 { 1005 // qDebug() << job << slave; 1006 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); 1007 1008 // make sure that we knew about the job! 1009 Q_ASSERT(jobPriv->m_schedSerial); 1010 1011 ProtoQueue *pq = m_protocols.value(jobPriv->m_protocol); 1012 if (pq) { 1013 pq->removeJob(job); 1014 } 1015 1016 if (slave) { 1017 // If we have internal meta-data, tell existing KIO workers to reload 1018 // their configuration. 1019 if (jobPriv->m_internalMetaData.count()) { 1020 // qDebug() << "Updating KIO workers with new internal metadata information"; 1021 ProtoQueue *queue = m_protocols.value(slave->protocol()); 1022 if (queue) { 1023 const QList<Slave *> slaves = queue->allSlaves(); 1024 for (auto *runningSlave : slaves) { 1025 if (slave->host() == runningSlave->host()) { 1026 slave->setConfig(metaDataFor(slave->protocol(), jobPriv->m_proxyList, job->url())); 1027 /*qDebug() << "Updated configuration of" << slave->protocol() 1028 << "KIO worker, pid=" << slave->slave_pid();*/ 1029 } 1030 } 1031 } 1032 } 1033 slave->setJob(nullptr); 1034 slave->disconnect(job); 1035 } 1036 jobPriv->m_schedSerial = 0; // this marks the job as unscheduled again 1037 jobPriv->m_slave = nullptr; 1038 // Clear the values in the internal metadata container since they have 1039 // already been taken care of above... 1040 jobPriv->m_internalMetaData.clear(); 1041 } 1042 1043 MetaData SchedulerPrivate::metaDataFor(const QString &protocol, const QStringList &proxyList, const QUrl &url) 1044 { 1045 const QString host = url.host(); 1046 MetaData configData = WorkerConfig::self()->configData(protocol, host); 1047 sessionData.configDataFor(configData, protocol, host); 1048 if (proxyList.isEmpty()) { 1049 configData.remove(QStringLiteral("UseProxy")); 1050 configData.remove(QStringLiteral("ProxyUrls")); 1051 } else { 1052 configData[QStringLiteral("UseProxy")] = proxyList.first(); 1053 configData[QStringLiteral("ProxyUrls")] = proxyList.join(QLatin1Char(',')); 1054 } 1055 1056 if (configData.contains(QLatin1String("EnableAutoLogin")) 1057 && configData.value(QStringLiteral("EnableAutoLogin")).compare(QLatin1String("true"), Qt::CaseInsensitive) == 0) { 1058 NetRC::AutoLogin l; 1059 l.login = url.userName(); 1060 bool usern = (protocol == QLatin1String("ftp")); 1061 if (NetRC::self()->lookup(url, l, usern)) { 1062 configData[QStringLiteral("autoLoginUser")] = l.login; 1063 configData[QStringLiteral("autoLoginPass")] = l.password; 1064 if (usern) { 1065 QString macdef; 1066 QMap<QString, QStringList>::ConstIterator it = l.macdef.constBegin(); 1067 for (; it != l.macdef.constEnd(); ++it) { 1068 macdef += it.key() + QLatin1Char('\\') + it.value().join(QLatin1Char('\\')) + QLatin1Char('\n'); 1069 } 1070 configData[QStringLiteral("autoLoginMacro")] = macdef; 1071 } 1072 } 1073 } 1074 1075 return configData; 1076 } 1077 1078 void SchedulerPrivate::setupSlave(KIO::Slave *slave, 1079 const QUrl &url, 1080 const QString &protocol, 1081 const QStringList &proxyList, 1082 bool newSlave, 1083 const KIO::MetaData *config) 1084 { 1085 int port = url.port(); 1086 if (port == -1) { // no port is -1 in QUrl, but in kde3 we used 0 and the KIO workers assume that. 1087 port = 0; 1088 } 1089 const QString host = url.host(); 1090 const QString user = url.userName(); 1091 const QString passwd = url.password(); 1092 1093 if (newSlave || slave->host() != host || slave->port() != port || slave->user() != user || slave->passwd() != passwd) { 1094 MetaData configData = metaDataFor(protocol, proxyList, url); 1095 if (config) { 1096 configData += *config; 1097 } 1098 1099 slave->setConfig(configData); 1100 slave->setProtocol(url.scheme()); 1101 slave->setHost(host, port, user, passwd); 1102 } 1103 } 1104 1105 void SchedulerPrivate::slotSlaveDied(KIO::Slave *slave) 1106 { 1107 // qDebug() << slave; 1108 Q_ASSERT(slave); 1109 Q_ASSERT(!slave->isAlive()); 1110 ProtoQueue *pq = m_protocols.value(slave->protocol()); 1111 if (pq) { 1112 if (slave->job()) { 1113 pq->removeJob(slave->job()); 1114 } 1115 // in case this was a connected slave... 1116 pq->removeSlave(slave); 1117 } 1118 if (slave == m_slaveOnHold) { 1119 m_slaveOnHold = nullptr; 1120 m_urlOnHold.clear(); 1121 } 1122 // can't use slave->deref() here because we need to use deleteLater 1123 slave->aboutToDelete(); 1124 slave->deleteLater(); 1125 } 1126 1127 void SchedulerPrivate::putSlaveOnHold(KIO::SimpleJob *job, const QUrl &url) 1128 { 1129 Slave *slave = jobSlave(job); 1130 // qDebug() << job << url << slave; 1131 slave->disconnect(job); 1132 // prevent the fake death of the slave from trying to kill the job again; 1133 // cf. Slave::hold(const QUrl &url) called in SchedulerPrivate::publishSlaveOnHold(). 1134 slave->setJob(nullptr); 1135 SimpleJobPrivate::get(job)->m_slave = nullptr; 1136 1137 if (m_slaveOnHold) { 1138 m_slaveOnHold->kill(); 1139 } 1140 m_slaveOnHold = slave; 1141 m_urlOnHold = url; 1142 m_slaveOnHold->suspend(); 1143 } 1144 1145 bool SchedulerPrivate::isSlaveOnHoldFor(const QUrl &url) 1146 { 1147 if (url.isValid() && m_urlOnHold.isValid() && url == m_urlOnHold) { 1148 return true; 1149 } 1150 1151 return false; 1152 } 1153 1154 Slave *SchedulerPrivate::heldSlaveForJob(SimpleJob *job) 1155 { 1156 Slave *slave = nullptr; 1157 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); 1158 1159 if (m_slaveOnHold) { 1160 // Make sure that the job wants to do a GET or a POST, and with no offset 1161 const int cmd = jobPriv->m_command; 1162 bool canJobReuse = (cmd == CMD_GET || cmd == CMD_MULTI_GET); 1163 1164 if (KIO::TransferJob *tJob = qobject_cast<KIO::TransferJob *>(job)) { 1165 canJobReuse = (canJobReuse || cmd == CMD_SPECIAL); 1166 if (canJobReuse) { 1167 KIO::MetaData outgoing = tJob->outgoingMetaData(); 1168 const QString resume = outgoing.value(QStringLiteral("resume")); 1169 const QString rangeStart = outgoing.value(QStringLiteral("range-start")); 1170 // qDebug() << "Resume metadata is" << resume; 1171 canJobReuse = (resume.isEmpty() || resume == QLatin1Char('0')) && (rangeStart.isEmpty() || rangeStart == QLatin1Char('0')); 1172 } 1173 } 1174 1175 if (job->url() == m_urlOnHold) { 1176 if (canJobReuse) { 1177 // qDebug() << "HOLD: Reusing held slave (" << m_slaveOnHold << ")"; 1178 slave = m_slaveOnHold; 1179 } else { 1180 // qDebug() << "HOLD: Discarding held slave (" << m_slaveOnHold << ")"; 1181 m_slaveOnHold->kill(); 1182 } 1183 m_slaveOnHold = nullptr; 1184 m_urlOnHold.clear(); 1185 } 1186 } 1187 1188 return slave; 1189 } 1190 1191 void SchedulerPrivate::removeSlaveOnHold() 1192 { 1193 // qDebug() << m_slaveOnHold; 1194 if (m_slaveOnHold) { 1195 m_slaveOnHold->kill(); 1196 } 1197 m_slaveOnHold = nullptr; 1198 m_urlOnHold.clear(); 1199 } 1200 1201 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 91) 1202 Slave *SchedulerPrivate::getConnectedSlave(const QUrl &url, const KIO::MetaData &config) 1203 { 1204 QStringList proxyList; 1205 const QString protocol = KProtocolManager::workerProtocol(url, proxyList); 1206 ProtoQueue *pq = protoQ(protocol, url.host()); 1207 1208 Slave *slave = pq->createSlave(protocol, /* job */ nullptr, url); 1209 if (slave) { 1210 setupSlave(slave, url, protocol, proxyList, true, &config); 1211 pq->m_connectedSlaveQueue.addSlave(slave); 1212 1213 slave->send(CMD_CONNECT); 1214 q->connect(slave, SIGNAL(connected()), SLOT(slotSlaveConnected())); 1215 q->connect(slave, SIGNAL(error(int, QString)), SLOT(slotSlaveError(int, QString))); 1216 } 1217 // qDebug() << url << slave; 1218 return slave; 1219 } 1220 1221 void SchedulerPrivate::slotSlaveConnected() 1222 { 1223 // qDebug(); 1224 Slave *slave = static_cast<Slave *>(q->sender()); 1225 slave->setConnected(true); 1226 q->disconnect(slave, SIGNAL(connected()), q, SLOT(slotSlaveConnected())); 1227 Q_EMIT q->slaveConnected(slave); 1228 } 1229 1230 void SchedulerPrivate::slotSlaveError(int errorNr, const QString &errorMsg) 1231 { 1232 Slave *slave = static_cast<Slave *>(q->sender()); 1233 // qDebug() << slave << errorNr << errorMsg; 1234 ProtoQueue *pq = protoQ(slave->protocol(), slave->host()); 1235 if (!slave->isConnected() || pq->m_connectedSlaveQueue.isIdle(slave)) { 1236 // Only forward to application if slave is idle or still connecting. 1237 // ### KDE5: can we remove this apparently arbitrary behavior and just always emit SlaveError? 1238 Q_EMIT q->slaveError(slave, errorNr, errorMsg); 1239 } 1240 } 1241 #endif 1242 1243 ProtoQueue *SchedulerPrivate::protoQ(const QString &protocol, const QString &host) 1244 { 1245 ProtoQueue *pq = m_protocols.value(protocol, nullptr); 1246 if (!pq) { 1247 // qDebug() << "creating ProtoQueue instance for" << protocol; 1248 1249 const int maxWorkers = KProtocolInfo::maxWorkers(protocol); 1250 int maxWorkersPerHost = -1; 1251 if (!host.isEmpty()) { 1252 bool ok = false; 1253 const int value = WorkerConfig::self()->configData(protocol, host, QStringLiteral("MaxConnections")).toInt(&ok); 1254 if (ok) { 1255 maxWorkersPerHost = value; 1256 } 1257 } 1258 if (maxWorkersPerHost == -1) { 1259 maxWorkersPerHost = KProtocolInfo::maxWorkersPerHost(protocol); 1260 } 1261 // Never allow maxWorkersPerHost to exceed maxWorkers. 1262 pq = new ProtoQueue(maxWorkers, qMin(maxWorkers, maxWorkersPerHost)); 1263 m_protocols.insert(protocol, pq); 1264 } 1265 return pq; 1266 } 1267 1268 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 91) 1269 bool SchedulerPrivate::assignJobToSlave(KIO::Slave *slave, SimpleJob *job) 1270 { 1271 // qDebug() << slave << job; 1272 // KDE5: queueing of jobs can probably be removed, it provides very little benefit 1273 ProtoQueue *pq = m_protocols.value(slave->protocol()); 1274 if (pq) { 1275 pq->removeJob(job); 1276 return pq->m_connectedSlaveQueue.queueJob(job, slave); 1277 } 1278 return false; 1279 } 1280 1281 bool SchedulerPrivate::disconnectSlave(KIO::Slave *slave) 1282 { 1283 // qDebug() << slave; 1284 ProtoQueue *pq = m_protocols.value(slave->protocol()); 1285 return (pq ? pq->m_connectedSlaveQueue.removeSlave(slave) : false); 1286 } 1287 #endif 1288 1289 void SchedulerPrivate::updateInternalMetaData(SimpleJob *job) 1290 { 1291 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); 1292 // Preserve all internal meta-data so they can be sent back to the 1293 // KIO workers as needed... 1294 const QUrl jobUrl = job->url(); 1295 1296 const QLatin1String currHostToken("{internal~currenthost}"); 1297 const QLatin1String allHostsToken("{internal~allhosts}"); 1298 // qDebug() << job << jobPriv->m_internalMetaData; 1299 QMapIterator<QString, QString> it(jobPriv->m_internalMetaData); 1300 while (it.hasNext()) { 1301 it.next(); 1302 if (it.key().startsWith(currHostToken, Qt::CaseInsensitive)) { 1303 WorkerConfig::self()->setConfigData(jobUrl.scheme(), jobUrl.host(), it.key().mid(currHostToken.size()), it.value()); 1304 } else if (it.key().startsWith(allHostsToken, Qt::CaseInsensitive)) { 1305 WorkerConfig::self()->setConfigData(jobUrl.scheme(), QString(), it.key().mid(allHostsToken.size()), it.value()); 1306 } 1307 } 1308 } 1309 1310 #include "moc_scheduler.cpp" 1311 #include "moc_scheduler_p.cpp"