File indexing completed on 2024-06-02 05:05:25
0001 /* 0002 SPDX-FileCopyrightText: 2005 Joris Guisson <joris.guisson@gmail.com> 0003 0004 SPDX-License-Identifier: GPL-2.0-or-later 0005 */ 0006 #include "socketmonitor.h" 0007 #include "downloadthread.h" 0008 #include "trafficshapedsocket.h" 0009 #include "uploadthread.h" 0010 0011 #include <torrent/globals.h> 0012 #include <unistd.h> 0013 #include <util/functions.h> 0014 #include <util/log.h> 0015 0016 #include <QRecursiveMutex> 0017 #include <cmath> 0018 0019 using namespace bt; 0020 0021 namespace net 0022 { 0023 SocketMonitor SocketMonitor::self; 0024 0025 class SocketMonitor::Private 0026 { 0027 public: 0028 Private(SocketMonitor *p) 0029 : mutex() 0030 , ut(nullptr) 0031 , dt(nullptr) 0032 , next_group_id(1) 0033 { 0034 dt = new DownloadThread(p); 0035 ut = new UploadThread(p); 0036 } 0037 0038 ~Private() 0039 { 0040 shutdown(); 0041 } 0042 0043 void shutdown(); 0044 0045 QRecursiveMutex mutex; 0046 UploadThread *ut; 0047 DownloadThread *dt; 0048 Uint32 next_group_id; 0049 }; 0050 0051 SocketMonitor::SocketMonitor() 0052 : d(new Private(this)) 0053 { 0054 } 0055 0056 SocketMonitor::~SocketMonitor() 0057 { 0058 delete d; 0059 } 0060 0061 void SocketMonitor::shutdown() 0062 { 0063 d->shutdown(); 0064 } 0065 0066 void SocketMonitor::Private::shutdown() 0067 { 0068 if (ut && ut->isRunning()) { 0069 ut->stop(); 0070 ut->signalDataReady(); // kick it in the nuts, if the thread is waiting for data 0071 if (!ut->wait(250)) { 0072 ut->terminate(); 0073 ut->wait(); 0074 } 0075 } 0076 0077 if (dt && dt->isRunning()) { 0078 dt->stop(); 0079 dt->wakeUp(); // wake it up if necessary 0080 if (!dt->wait(250)) { 0081 dt->terminate(); 0082 dt->wait(); 0083 } 0084 } 0085 0086 delete ut; 0087 delete dt; 0088 ut = nullptr; 0089 dt = nullptr; 0090 } 0091 0092 void SocketMonitor::lock() 0093 { 0094 d->mutex.lock(); 0095 } 0096 0097 void SocketMonitor::unlock() 0098 { 0099 d->mutex.unlock(); 0100 } 0101 0102 void SocketMonitor::setDownloadCap(Uint32 bytes_per_sec) 0103 { 0104 DownloadThread::setCap(bytes_per_sec); 0105 } 0106 0107 Uint32 SocketMonitor::getDownloadCap() 0108 { 0109 return DownloadThread::cap(); 0110 } 0111 0112 void SocketMonitor::setUploadCap(Uint32 bytes_per_sec) 0113 { 0114 UploadThread::setCap(bytes_per_sec); 0115 } 0116 0117 Uint32 SocketMonitor::getUploadCap() 0118 { 0119 return UploadThread::cap(); 0120 } 0121 0122 void SocketMonitor::setSleepTime(Uint32 sleep_time) 0123 { 0124 DownloadThread::setSleepTime(sleep_time); 0125 UploadThread::setSleepTime(sleep_time); 0126 } 0127 0128 void SocketMonitor::add(TrafficShapedSocket *sock) 0129 { 0130 QMutexLocker lock(&d->mutex); 0131 0132 if (!d->dt || !d->ut) 0133 return; 0134 0135 bool start_threads = sockets.size() == 0; 0136 sockets.push_back(sock); 0137 0138 if (start_threads) { 0139 Out(SYS_CON | LOG_DEBUG) << "Starting socketmonitor threads" << endl; 0140 0141 if (!d->dt->isRunning()) 0142 d->dt->start(QThread::IdlePriority); 0143 if (!d->ut->isRunning()) 0144 d->ut->start(QThread::IdlePriority); 0145 } 0146 // wake up download thread so that it can start polling the new socket 0147 d->dt->wakeUp(); 0148 } 0149 0150 void SocketMonitor::remove(TrafficShapedSocket *sock) 0151 { 0152 QMutexLocker lock(&d->mutex); 0153 if (sockets.size() == 0) 0154 return; 0155 0156 sockets.remove(sock); 0157 } 0158 0159 void SocketMonitor::signalPacketReady() 0160 { 0161 if (d->ut) 0162 d->ut->signalDataReady(); 0163 } 0164 0165 Uint32 SocketMonitor::newGroup(GroupType type, Uint32 limit, Uint32 assured_rate) 0166 { 0167 QMutexLocker lock(&d->mutex); 0168 if (!d->dt || !d->ut) 0169 return 0; 0170 0171 Uint32 gid = d->next_group_id++; 0172 if (type == UPLOAD_GROUP) 0173 d->ut->addGroup(gid, limit, assured_rate); 0174 else 0175 d->dt->addGroup(gid, limit, assured_rate); 0176 0177 return gid; 0178 } 0179 0180 void SocketMonitor::setGroupLimit(GroupType type, Uint32 gid, Uint32 limit) 0181 { 0182 QMutexLocker lock(&d->mutex); 0183 if (!d->dt || !d->ut) 0184 return; 0185 0186 if (type == UPLOAD_GROUP) 0187 d->ut->setGroupLimit(gid, limit); 0188 else 0189 d->dt->setGroupLimit(gid, limit); 0190 } 0191 0192 void SocketMonitor::setGroupAssuredRate(GroupType type, Uint32 gid, Uint32 as) 0193 { 0194 QMutexLocker lock(&d->mutex); 0195 if (!d->dt || !d->ut) 0196 return; 0197 0198 if (type == UPLOAD_GROUP) 0199 d->ut->setGroupAssuredRate(gid, as); 0200 else 0201 d->dt->setGroupAssuredRate(gid, as); 0202 } 0203 0204 void SocketMonitor::removeGroup(GroupType type, Uint32 gid) 0205 { 0206 QMutexLocker lock(&d->mutex); 0207 if (!d->dt || !d->ut) 0208 return; 0209 0210 if (type == UPLOAD_GROUP) 0211 d->ut->removeGroup(gid); 0212 else 0213 d->dt->removeGroup(gid); 0214 } 0215 0216 }