File indexing completed on 2025-01-05 04:37:34

0001 /*
0002     SPDX-FileCopyrightText: 2009 Joris Guisson <joris.guisson@gmail.com>
0003 
0004     SPDX-License-Identifier: GPL-2.0-or-later
0005 */
0006 
0007 #include "utpserver.h"
0008 #include "utpserver_p.h"
0009 
0010 #include <QCoreApplication>
0011 #include <QEvent>
0012 #include <QHostAddress>
0013 #include <QRandomGenerator>
0014 #include <QTimer>
0015 
0016 #include <cstdlib>
0017 
0018 #ifndef Q_CC_MSVC
0019 #include <sys/select.h>
0020 #endif
0021 #include "utpprotocol.h"
0022 #include "utpserverthread.h"
0023 #include "utpsocket.h"
0024 #include <ctime>
0025 #include <mse/encryptedpacketsocket.h>
0026 #include <net/portlist.h>
0027 #include <torrent/globals.h>
0028 #include <util/constants.h>
0029 #include <util/log.h>
0030 
0031 #ifdef Q_WS_WIN
0032 #include <util/win32.h>
0033 #endif
0034 
0035 using namespace bt;
0036 
0037 namespace utp
0038 {
0039 MainThreadCall::MainThreadCall(UTPServer *server)
0040     : server(server)
0041 {
0042 }
0043 
0044 MainThreadCall::~MainThreadCall()
0045 {
0046 }
0047 
0048 void MainThreadCall::handlePendingConnections()
0049 {
0050     server->handlePendingConnections();
0051 }
0052 
0053 ///////////////////////////////////////////////////////////
0054 
0055 UTPServer::Private::Private(UTPServer *p)
0056     : p(p)
0057     , running(false)
0058     , utp_thread(nullptr)
0059     , mutex()
0060     , create_sockets(true)
0061     , tos(0)
0062     , mtc(new MainThreadCall(p))
0063     , timer(new QTimer)
0064 {
0065     QObject::connect(p, &UTPServer::handlePendingConnectionsDelayed, mtc, &MainThreadCall::handlePendingConnections, Qt::QueuedConnection);
0066 
0067     poll_pipes.setAutoDelete(true);
0068 }
0069 
0070 UTPServer::Private::~Private()
0071 {
0072     if (running)
0073         stop();
0074 
0075     pending.clear();
0076     delete mtc;
0077     timer->deleteLater();
0078 }
0079 
0080 void UTPServer::Private::stop()
0081 {
0082     QTimer::singleShot(0, timer, &QTimer::stop); // kill in its own thread
0083     running = false;
0084     if (utp_thread) {
0085         utp_thread->exit();
0086         utp_thread->wait();
0087         delete utp_thread;
0088         utp_thread = nullptr;
0089     }
0090 
0091     connections.clear();
0092 
0093     // Close the socket
0094     sockets.clear();
0095     Globals::instance().getPortList().removePort(port, net::UDP);
0096 }
0097 
0098 bool UTPServer::Private::bind(const net::Address &addr)
0099 {
0100     net::ServerSocket::Ptr sock(new net::ServerSocket(this));
0101     if (!sock->bind(addr)) {
0102         return false;
0103     } else {
0104         Out(SYS_UTP | LOG_NOTICE) << "UTP: bound to " << addr.toString() << endl;
0105         sock->setTOS(tos);
0106         sock->setReadNotificationsEnabled(false);
0107         sock->setWriteNotificationsEnabled(false);
0108         sockets.append(sock);
0109         return true;
0110     }
0111 }
0112 
0113 void UTPServer::Private::syn(const PacketParser &parser, bt::Buffer::Ptr buffer, const net::Address &addr)
0114 {
0115     const Header *hdr = parser.header();
0116     quint16 recv_conn_id = hdr->connection_id + 1;
0117     if (connections.contains(recv_conn_id)) {
0118         // Send a reset packet if the ID is in use
0119         Connection::Ptr conn(new Connection(recv_conn_id, Connection::INCOMING, addr, p));
0120         conn->setWeakPointer(conn);
0121         conn->sendReset();
0122     } else {
0123         Connection::Ptr conn(new Connection(recv_conn_id, Connection::INCOMING, addr, p));
0124         try {
0125             conn->setWeakPointer(conn);
0126             conn->handlePacket(parser, buffer);
0127             connections.insert(recv_conn_id, conn);
0128             if (create_sockets) {
0129                 UTPSocket *utps = new UTPSocket(conn);
0130                 mse::EncryptedPacketSocket::Ptr ss(new mse::EncryptedPacketSocket(utps));
0131                 {
0132                     QMutexLocker lock(&pending_mutex);
0133                     pending.append(ss);
0134                 }
0135                 p->handlePendingConnectionsDelayed();
0136             } else {
0137                 last_accepted.append(conn);
0138                 p->accepted();
0139             }
0140         } catch (Connection::TransmissionError &err) {
0141             Out(SYS_UTP | LOG_NOTICE) << "UTP: " << err.location << endl;
0142             connections.remove(recv_conn_id);
0143         }
0144     }
0145 }
0146 
0147 void UTPServer::Private::reset(const utp::Header *hdr)
0148 {
0149     Connection::Ptr c = find(hdr->connection_id);
0150     if (c) {
0151         c->reset();
0152     }
0153 }
0154 
0155 Connection::Ptr UTPServer::Private::find(quint16 conn_id)
0156 {
0157     ConnectionMapItr i = connections.find(conn_id);
0158     if (i != connections.end())
0159         return i.value();
0160     else
0161         return Connection::Ptr();
0162 }
0163 
0164 void UTPServer::Private::wakeUpPollPipes(utp::Connection::Ptr conn, bool readable, bool writeable)
0165 {
0166     QMutexLocker lock(&mutex);
0167     for (PollPipePairItr itr = poll_pipes.begin(); itr != poll_pipes.end(); ++itr) {
0168         PollPipePair *pp = itr->second;
0169         if (readable && pp->read_pipe->polling(conn->receiveConnectionID()))
0170             itr->second->read_pipe->wakeUp();
0171 
0172         if (writeable && pp->write_pipe->polling(conn->receiveConnectionID()))
0173             itr->second->write_pipe->wakeUp();
0174     }
0175 }
0176 
0177 void UTPServer::Private::dataReceived(bt::Buffer::Ptr buffer, const net::Address &addr)
0178 {
0179     QMutexLocker lock(&mutex);
0180     // Out(SYS_UTP|LOG_NOTICE) << "UTP: received " << ba << " bytes packet from " << addr.toString() << endl;
0181     try {
0182         if (buffer->size() >= utp::Header::size()) { // discard packets which are to small
0183             p->handlePacket(buffer, addr);
0184         }
0185     } catch (utp::Connection::TransmissionError &err) {
0186         Out(SYS_UTP | LOG_NOTICE) << "UTP: " << err.location << endl;
0187     }
0188 }
0189 
0190 void UTPServer::Private::readyToWrite(net::ServerSocket *sock)
0191 {
0192     output_queue.send(sock);
0193 }
0194 
0195 ///////////////////////////////////////////////////////////
0196 
0197 UTPServer::UTPServer(QObject *parent)
0198     : ServerInterface(parent)
0199     , d(new Private(this))
0200 
0201 {
0202     connect(d->timer, &QTimer::timeout, this, &UTPServer::checkTimeouts);
0203 }
0204 
0205 UTPServer::~UTPServer()
0206 {
0207     delete d;
0208 }
0209 
0210 void UTPServer::handlePendingConnections()
0211 {
0212     // This should be called from the main thread
0213     QList<mse::EncryptedPacketSocket::Ptr> p;
0214     {
0215         QMutexLocker lock(&d->pending_mutex);
0216         // Copy the pending list and clear it before using it's contents to avoid a deadlock
0217         p = d->pending;
0218         d->pending.clear();
0219     }
0220 
0221     for (const mse::EncryptedPacketSocket::Ptr &s : std::as_const(p)) {
0222         newConnection(s);
0223     }
0224 }
0225 
0226 bool UTPServer::changePort(bt::Uint16 p)
0227 {
0228     if (d->sockets.count() > 0 && port == p)
0229         return true;
0230 
0231     Globals::instance().getPortList().removePort(port, net::UDP);
0232     d->sockets.clear();
0233 
0234     const QStringList possible = bindAddresses();
0235     for (const QString &addr : possible) {
0236         d->bind(net::Address(addr, p));
0237     }
0238 
0239     if (d->sockets.count() == 0) {
0240         // Try any addresses if previous binds failed
0241         d->bind(net::Address(QHostAddress(QHostAddress::AnyIPv6).toString(), p));
0242         d->bind(net::Address(QHostAddress(QHostAddress::Any).toString(), p));
0243     }
0244 
0245     if (d->sockets.count()) {
0246         Globals::instance().getPortList().addNewPort(p, net::UDP, true);
0247         return true;
0248     } else
0249         return false;
0250 }
0251 
0252 void UTPServer::setTOS(Uint8 type_of_service)
0253 {
0254     d->tos = type_of_service;
0255     for (net::ServerSocket::Ptr sock : std::as_const(d->sockets))
0256         sock->setTOS(d->tos);
0257 }
0258 
0259 void UTPServer::threadStarted()
0260 {
0261     d->timer->start(500);
0262     for (net::ServerSocket::Ptr sock : std::as_const(d->sockets)) {
0263         sock->setReadNotificationsEnabled(true);
0264     }
0265 }
0266 
#if 0
// Debugging helpers, compiled out by default.

/// Logs the sender and size of @a data followed by a hex dump of it,
/// 32 bytes per log line, with a space after every group of 4 bytes.
static void Dump(const QByteArray & data, const net::Address& addr)
{
    Out(SYS_UTP | LOG_DEBUG) << QString("Received packet from %1 (%2 bytes)").arg(addr.toString()).arg(data.size()) << endl;
    const bt::Uint8* pkt = reinterpret_cast<const bt::Uint8*>(data.data());

    QString line;
    for (int i = 0; i < data.size(); i++) {
        // Flush the line collected so far once it holds 32 bytes
        if (i > 0 && i % 32 == 0) {
            Out(SYS_UTP | LOG_DEBUG) << line << endl;
            line = "";
        }

        uint val = pkt[i];
        line += QString("%1").arg(val, 2, 16, QChar('0'));
        // `%` binds tighter than `+`, so the former `i + 1 % 4` was `i + 1`,
        // which is never zero here: a space was added after every single byte.
        if ((i + 1) % 4 == 0)
            line += ' ';
    }
    Out(SYS_UTP | LOG_DEBUG) << line << endl;
}

/// Logs every field of the packet header @a hdr, one field per line.
static void DumpPacket(const Header & hdr)
{
    Out(SYS_UTP | LOG_NOTICE) << "==============================================" << endl;
    Out(SYS_UTP | LOG_NOTICE) << "UTP: Packet Header: " << endl;
    Out(SYS_UTP | LOG_NOTICE) << "type:                              " << TypeToString(hdr.type) << endl;
    Out(SYS_UTP | LOG_NOTICE) << "version:                           " << hdr.version << endl;
    Out(SYS_UTP | LOG_NOTICE) << "extension:                         " << hdr.extension << endl;
    Out(SYS_UTP | LOG_NOTICE) << "connection_id:                     " << hdr.connection_id << endl;
    Out(SYS_UTP | LOG_NOTICE) << "timestamp_microseconds:            " << hdr.timestamp_microseconds << endl;
    Out(SYS_UTP | LOG_NOTICE) << "timestamp_difference_microseconds: " << hdr.timestamp_difference_microseconds << endl;
    Out(SYS_UTP | LOG_NOTICE) << "wnd_size:                          " << hdr.wnd_size << endl;
    Out(SYS_UTP | LOG_NOTICE) << "seq_nr:                            " << hdr.seq_nr << endl;
    Out(SYS_UTP | LOG_NOTICE) << "ack_nr:                            " << hdr.ack_nr << endl;
    Out(SYS_UTP | LOG_NOTICE) << "==============================================" << endl;
}
#endif
0304 
0305 void UTPServer::handlePacket(bt::Buffer::Ptr buffer, const net::Address &addr)
0306 {
0307     PacketParser parser(buffer->get(), buffer->size());
0308     if (!parser.parse())
0309         return;
0310 
0311     const Header *hdr = parser.header();
0312     // Dump(packet,addr);
0313     // DumpPacket(*hdr);
0314     Connection::Ptr c;
0315     switch (hdr->type) {
0316     case ST_DATA:
0317     case ST_FIN:
0318     case ST_STATE:
0319         try {
0320             c = d->find(hdr->connection_id);
0321             if (c && c->handlePacket(parser, buffer) == CS_CLOSED) {
0322                 d->connections.remove(c->receiveConnectionID());
0323             }
0324         } catch (Connection::TransmissionError &err) {
0325             Out(SYS_UTP | LOG_NOTICE) << "UTP: " << err.location << endl;
0326             if (c)
0327                 c->close();
0328         }
0329         break;
0330     case ST_RESET:
0331         d->reset(hdr);
0332         break;
0333     case ST_SYN:
0334         d->syn(parser, buffer, addr);
0335         break;
0336     }
0337 }
0338 
0339 bool UTPServer::sendTo(utp::Connection::Ptr conn, const PacketBuffer &packet)
0340 {
0341     if (d->output_queue.add(packet, conn) == 1) {
0342         // If there is only one packet queued,
0343         // We need to enable the write notifiers, use the event queue to do this
0344         // if we are not in the utp thread
0345         if (QThread::currentThread() != d->utp_thread) {
0346             QCoreApplication::postEvent(this, new QEvent(QEvent::User));
0347         } else {
0348             for (net::ServerSocket::Ptr sock : std::as_const(d->sockets))
0349                 sock->setWriteNotificationsEnabled(true);
0350         }
0351     }
0352     return true;
0353 }
0354 
0355 void UTPServer::customEvent(QEvent *ev)
0356 {
0357     if (ev->type() == QEvent::User) {
0358         for (net::ServerSocket::Ptr sock : std::as_const(d->sockets))
0359             sock->setWriteNotificationsEnabled(true);
0360     }
0361 }
0362 
0363 Connection::WPtr UTPServer::connectTo(const net::Address &addr)
0364 {
0365     if (d->sockets.isEmpty() || addr.port() == 0)
0366         return Connection::WPtr();
0367 
0368     QMutexLocker lock(&d->mutex);
0369     quint16 recv_conn_id = QRandomGenerator::global()->bounded(32535);
0370     while (d->connections.contains(recv_conn_id))
0371         recv_conn_id = QRandomGenerator::global()->bounded(32535);
0372 
0373     Connection::Ptr conn(new Connection(recv_conn_id, Connection::OUTGOING, addr, this));
0374     conn->setWeakPointer(conn);
0375     conn->moveToThread(d->utp_thread);
0376     d->connections.insert(recv_conn_id, conn);
0377     try {
0378         conn->startConnecting();
0379         return conn;
0380     } catch (Connection::TransmissionError &err) {
0381         d->connections.remove(recv_conn_id);
0382         return Connection::WPtr();
0383     }
0384 }
0385 
0386 void UTPServer::stop()
0387 {
0388     d->stop();
0389     PacketBuffer::clearPool();
0390 }
0391 
0392 void UTPServer::start()
0393 {
0394     if (!d->utp_thread) {
0395         d->utp_thread = new UTPServerThread(this);
0396         for (const net::ServerSocket::Ptr &sock : std::as_const(d->sockets))
0397             sock->moveToThread(d->utp_thread);
0398         d->timer->moveToThread(d->utp_thread);
0399         d->utp_thread->start();
0400     }
0401 }
0402 
0403 void UTPServer::preparePolling(net::Poll *p, net::Poll::Mode mode, utp::Connection::Ptr &conn)
0404 {
0405     QMutexLocker lock(&d->mutex);
0406     PollPipePair *pair = d->poll_pipes.find(p);
0407     if (!pair) {
0408         pair = new PollPipePair();
0409         d->poll_pipes.insert(p, pair);
0410     }
0411 
0412     if (mode == net::Poll::INPUT) {
0413         if (pair->read_pipe->wokenUp())
0414             return;
0415 
0416         if (conn->bytesAvailable() > 0 || conn->connectionState() == CS_CLOSED)
0417             pair->read_pipe->wakeUp();
0418         pair->read_pipe->prepare(p, conn->receiveConnectionID(), pair->read_pipe);
0419     } else {
0420         if (pair->write_pipe->wokenUp())
0421             return;
0422 
0423         if (conn->isWriteable())
0424             pair->write_pipe->wakeUp();
0425         pair->write_pipe->prepare(p, conn->receiveConnectionID(), pair->write_pipe);
0426     }
0427 }
0428 
0429 void UTPServer::stateChanged(utp::Connection::Ptr conn, bool readable, bool writeable)
0430 {
0431     d->wakeUpPollPipes(conn, readable, writeable);
0432 }
0433 
0434 void UTPServer::setCreateSockets(bool on)
0435 {
0436     d->create_sockets = on;
0437 }
0438 
0439 Connection::WPtr UTPServer::acceptedConnection()
0440 {
0441     if (d->last_accepted.isEmpty())
0442         return Connection::WPtr();
0443     else
0444         return d->last_accepted.takeFirst();
0445 }
0446 
0447 void UTPServer::closed(Connection::Ptr conn)
0448 {
0449     Q_UNUSED(conn);
0450     QTimer::singleShot(0, this, &UTPServer::cleanup);
0451 }
0452 
0453 void UTPServer::cleanup()
0454 {
0455     QMutexLocker lock(&d->mutex);
0456     QMap<quint16, Connection::Ptr>::iterator i = d->connections.begin();
0457     while (i != d->connections.end()) {
0458         if (i.value()->connectionState() == CS_CLOSED) {
0459             i = d->connections.erase(i);
0460         } else
0461             ++i;
0462     }
0463 }
0464 
0465 void UTPServer::checkTimeouts()
0466 {
0467     QMutexLocker lock(&d->mutex);
0468 
0469     TimeValue now;
0470     QMap<quint16, Connection::Ptr>::iterator itr = d->connections.begin();
0471     while (itr != d->connections.end()) {
0472         itr.value()->checkTimeout(now);
0473         ++itr;
0474     }
0475 }
0476 
0477 ///////////////////////////////////////////////////////
0478 
0479 PollPipePair::PollPipePair()
0480     : read_pipe(new PollPipe(net::Poll::INPUT))
0481     , write_pipe(new PollPipe(net::Poll::OUTPUT))
0482 {
0483 }
0484 }
0485 
0486 #include "moc_utpserver.cpp"
0487 #include "moc_utpserver_p.cpp"