File indexing completed on 2024-06-02 05:05:25

0001 /*
0002     SPDX-FileCopyrightText: 2005 Joris Guisson <joris.guisson@gmail.com>
0003 
0004     SPDX-License-Identifier: GPL-2.0-or-later
0005 */
0006 #include "socketmonitor.h"
0007 #include "downloadthread.h"
0008 #include "trafficshapedsocket.h"
0009 #include "uploadthread.h"
0010 
0011 #include <torrent/globals.h>
0012 #include <unistd.h>
0013 #include <util/functions.h>
0014 #include <util/log.h>
0015 
0016 #include <QRecursiveMutex>
0017 #include <cmath>
0018 
0019 using namespace bt;
0020 
0021 namespace net
0022 {
0023 SocketMonitor SocketMonitor::self;
0024 
0025 class SocketMonitor::Private
0026 {
0027 public:
0028     Private(SocketMonitor *p)
0029         : mutex()
0030         , ut(nullptr)
0031         , dt(nullptr)
0032         , next_group_id(1)
0033     {
0034         dt = new DownloadThread(p);
0035         ut = new UploadThread(p);
0036     }
0037 
0038     ~Private()
0039     {
0040         shutdown();
0041     }
0042 
0043     void shutdown();
0044 
0045     QRecursiveMutex mutex;
0046     UploadThread *ut;
0047     DownloadThread *dt;
0048     Uint32 next_group_id;
0049 };
0050 
0051 SocketMonitor::SocketMonitor()
0052     : d(new Private(this))
0053 {
0054 }
0055 
0056 SocketMonitor::~SocketMonitor()
0057 {
0058     delete d;
0059 }
0060 
0061 void SocketMonitor::shutdown()
0062 {
0063     d->shutdown();
0064 }
0065 
0066 void SocketMonitor::Private::shutdown()
0067 {
0068     if (ut && ut->isRunning()) {
0069         ut->stop();
0070         ut->signalDataReady(); // kick it in the nuts, if the thread is waiting for data
0071         if (!ut->wait(250)) {
0072             ut->terminate();
0073             ut->wait();
0074         }
0075     }
0076 
0077     if (dt && dt->isRunning()) {
0078         dt->stop();
0079         dt->wakeUp(); // wake it up if necessary
0080         if (!dt->wait(250)) {
0081             dt->terminate();
0082             dt->wait();
0083         }
0084     }
0085 
0086     delete ut;
0087     delete dt;
0088     ut = nullptr;
0089     dt = nullptr;
0090 }
0091 
0092 void SocketMonitor::lock()
0093 {
0094     d->mutex.lock();
0095 }
0096 
0097 void SocketMonitor::unlock()
0098 {
0099     d->mutex.unlock();
0100 }
0101 
0102 void SocketMonitor::setDownloadCap(Uint32 bytes_per_sec)
0103 {
0104     DownloadThread::setCap(bytes_per_sec);
0105 }
0106 
0107 Uint32 SocketMonitor::getDownloadCap()
0108 {
0109     return DownloadThread::cap();
0110 }
0111 
0112 void SocketMonitor::setUploadCap(Uint32 bytes_per_sec)
0113 {
0114     UploadThread::setCap(bytes_per_sec);
0115 }
0116 
0117 Uint32 SocketMonitor::getUploadCap()
0118 {
0119     return UploadThread::cap();
0120 }
0121 
0122 void SocketMonitor::setSleepTime(Uint32 sleep_time)
0123 {
0124     DownloadThread::setSleepTime(sleep_time);
0125     UploadThread::setSleepTime(sleep_time);
0126 }
0127 
0128 void SocketMonitor::add(TrafficShapedSocket *sock)
0129 {
0130     QMutexLocker lock(&d->mutex);
0131 
0132     if (!d->dt || !d->ut)
0133         return;
0134 
0135     bool start_threads = sockets.size() == 0;
0136     sockets.push_back(sock);
0137 
0138     if (start_threads) {
0139         Out(SYS_CON | LOG_DEBUG) << "Starting socketmonitor threads" << endl;
0140 
0141         if (!d->dt->isRunning())
0142             d->dt->start(QThread::IdlePriority);
0143         if (!d->ut->isRunning())
0144             d->ut->start(QThread::IdlePriority);
0145     }
0146     // wake up download thread so that it can start polling the new socket
0147     d->dt->wakeUp();
0148 }
0149 
0150 void SocketMonitor::remove(TrafficShapedSocket *sock)
0151 {
0152     QMutexLocker lock(&d->mutex);
0153     if (sockets.size() == 0)
0154         return;
0155 
0156     sockets.remove(sock);
0157 }
0158 
0159 void SocketMonitor::signalPacketReady()
0160 {
0161     if (d->ut)
0162         d->ut->signalDataReady();
0163 }
0164 
0165 Uint32 SocketMonitor::newGroup(GroupType type, Uint32 limit, Uint32 assured_rate)
0166 {
0167     QMutexLocker lock(&d->mutex);
0168     if (!d->dt || !d->ut)
0169         return 0;
0170 
0171     Uint32 gid = d->next_group_id++;
0172     if (type == UPLOAD_GROUP)
0173         d->ut->addGroup(gid, limit, assured_rate);
0174     else
0175         d->dt->addGroup(gid, limit, assured_rate);
0176 
0177     return gid;
0178 }
0179 
0180 void SocketMonitor::setGroupLimit(GroupType type, Uint32 gid, Uint32 limit)
0181 {
0182     QMutexLocker lock(&d->mutex);
0183     if (!d->dt || !d->ut)
0184         return;
0185 
0186     if (type == UPLOAD_GROUP)
0187         d->ut->setGroupLimit(gid, limit);
0188     else
0189         d->dt->setGroupLimit(gid, limit);
0190 }
0191 
0192 void SocketMonitor::setGroupAssuredRate(GroupType type, Uint32 gid, Uint32 as)
0193 {
0194     QMutexLocker lock(&d->mutex);
0195     if (!d->dt || !d->ut)
0196         return;
0197 
0198     if (type == UPLOAD_GROUP)
0199         d->ut->setGroupAssuredRate(gid, as);
0200     else
0201         d->dt->setGroupAssuredRate(gid, as);
0202 }
0203 
0204 void SocketMonitor::removeGroup(GroupType type, Uint32 gid)
0205 {
0206     QMutexLocker lock(&d->mutex);
0207     if (!d->dt || !d->ut)
0208         return;
0209 
0210     if (type == UPLOAD_GROUP)
0211         d->ut->removeGroup(gid);
0212     else
0213         d->dt->removeGroup(gid);
0214 }
0215 
0216 }