File indexing completed on 2025-01-05 04:37:34
0001 /* 0002 SPDX-FileCopyrightText: 2009 Joris Guisson <joris.guisson@gmail.com> 0003 0004 SPDX-License-Identifier: GPL-2.0-or-later 0005 */ 0006 0007 #include "utpserver.h" 0008 #include "utpserver_p.h" 0009 0010 #include <QCoreApplication> 0011 #include <QEvent> 0012 #include <QHostAddress> 0013 #include <QRandomGenerator> 0014 #include <QTimer> 0015 0016 #include <cstdlib> 0017 0018 #ifndef Q_CC_MSVC 0019 #include <sys/select.h> 0020 #endif 0021 #include "utpprotocol.h" 0022 #include "utpserverthread.h" 0023 #include "utpsocket.h" 0024 #include <ctime> 0025 #include <mse/encryptedpacketsocket.h> 0026 #include <net/portlist.h> 0027 #include <torrent/globals.h> 0028 #include <util/constants.h> 0029 #include <util/log.h> 0030 0031 #ifdef Q_WS_WIN 0032 #include <util/win32.h> 0033 #endif 0034 0035 using namespace bt; 0036 0037 namespace utp 0038 { 0039 MainThreadCall::MainThreadCall(UTPServer *server) 0040 : server(server) 0041 { 0042 } 0043 0044 MainThreadCall::~MainThreadCall() 0045 { 0046 } 0047 0048 void MainThreadCall::handlePendingConnections() 0049 { 0050 server->handlePendingConnections(); 0051 } 0052 0053 /////////////////////////////////////////////////////////// 0054 0055 UTPServer::Private::Private(UTPServer *p) 0056 : p(p) 0057 , running(false) 0058 , utp_thread(nullptr) 0059 , mutex() 0060 , create_sockets(true) 0061 , tos(0) 0062 , mtc(new MainThreadCall(p)) 0063 , timer(new QTimer) 0064 { 0065 QObject::connect(p, &UTPServer::handlePendingConnectionsDelayed, mtc, &MainThreadCall::handlePendingConnections, Qt::QueuedConnection); 0066 0067 poll_pipes.setAutoDelete(true); 0068 } 0069 0070 UTPServer::Private::~Private() 0071 { 0072 if (running) 0073 stop(); 0074 0075 pending.clear(); 0076 delete mtc; 0077 timer->deleteLater(); 0078 } 0079 0080 void UTPServer::Private::stop() 0081 { 0082 QTimer::singleShot(0, timer, &QTimer::stop); // kill in its own thread 0083 running = false; 0084 if (utp_thread) { 0085 utp_thread->exit(); 0086 utp_thread->wait(); 0087 delete utp_thread; 0088 utp_thread = nullptr; 0089 } 0090 0091 connections.clear(); 0092 0093 // Close the socket 0094 sockets.clear(); 0095 Globals::instance().getPortList().removePort(port, net::UDP); 0096 } 0097 0098 bool UTPServer::Private::bind(const net::Address &addr) 0099 { 0100 net::ServerSocket::Ptr sock(new net::ServerSocket(this)); 0101 if (!sock->bind(addr)) { 0102 return false; 0103 } else { 0104 Out(SYS_UTP | LOG_NOTICE) << "UTP: bound to " << addr.toString() << endl; 0105 sock->setTOS(tos); 0106 sock->setReadNotificationsEnabled(false); 0107 sock->setWriteNotificationsEnabled(false); 0108 sockets.append(sock); 0109 return true; 0110 } 0111 } 0112 0113 void UTPServer::Private::syn(const PacketParser &parser, bt::Buffer::Ptr buffer, const net::Address &addr) 0114 { 0115 const Header *hdr = parser.header(); 0116 quint16 recv_conn_id = hdr->connection_id + 1; 0117 if (connections.contains(recv_conn_id)) { 0118 // Send a reset packet if the ID is in use 0119 Connection::Ptr conn(new Connection(recv_conn_id, Connection::INCOMING, addr, p)); 0120 conn->setWeakPointer(conn); 0121 conn->sendReset(); 0122 } else { 0123 Connection::Ptr conn(new Connection(recv_conn_id, Connection::INCOMING, addr, p)); 0124 try { 0125 conn->setWeakPointer(conn); 0126 conn->handlePacket(parser, buffer); 0127 connections.insert(recv_conn_id, conn); 0128 if (create_sockets) { 0129 UTPSocket *utps = new UTPSocket(conn); 0130 mse::EncryptedPacketSocket::Ptr ss(new mse::EncryptedPacketSocket(utps)); 0131 { 0132 QMutexLocker lock(&pending_mutex); 0133 pending.append(ss); 0134 } 0135 p->handlePendingConnectionsDelayed(); 0136 } else { 0137 last_accepted.append(conn); 0138 p->accepted(); 0139 } 0140 } catch (Connection::TransmissionError &err) { 0141 Out(SYS_UTP | LOG_NOTICE) << "UTP: " << err.location << endl; 0142 connections.remove(recv_conn_id); 0143 } 0144 } 0145 } 0146 0147 void UTPServer::Private::reset(const utp::Header *hdr) 0148 { 0149 Connection::Ptr c = find(hdr->connection_id); 0150 if (c) { 0151 c->reset(); 0152 } 0153 } 0154 0155 Connection::Ptr UTPServer::Private::find(quint16 conn_id) 0156 { 0157 ConnectionMapItr i = connections.find(conn_id); 0158 if (i != connections.end()) 0159 return i.value(); 0160 else 0161 return Connection::Ptr(); 0162 } 0163 0164 void UTPServer::Private::wakeUpPollPipes(utp::Connection::Ptr conn, bool readable, bool writeable) 0165 { 0166 QMutexLocker lock(&mutex); 0167 for (PollPipePairItr itr = poll_pipes.begin(); itr != poll_pipes.end(); ++itr) { 0168 PollPipePair *pp = itr->second; 0169 if (readable && pp->read_pipe->polling(conn->receiveConnectionID())) 0170 itr->second->read_pipe->wakeUp(); 0171 0172 if (writeable && pp->write_pipe->polling(conn->receiveConnectionID())) 0173 itr->second->write_pipe->wakeUp(); 0174 } 0175 } 0176 0177 void UTPServer::Private::dataReceived(bt::Buffer::Ptr buffer, const net::Address &addr) 0178 { 0179 QMutexLocker lock(&mutex); 0180 // Out(SYS_UTP|LOG_NOTICE) << "UTP: received " << ba << " bytes packet from " << addr.toString() << endl; 0181 try { 0182 if (buffer->size() >= utp::Header::size()) { // discard packets which are to small 0183 p->handlePacket(buffer, addr); 0184 } 0185 } catch (utp::Connection::TransmissionError &err) { 0186 Out(SYS_UTP | LOG_NOTICE) << "UTP: " << err.location << endl; 0187 } 0188 } 0189 0190 void UTPServer::Private::readyToWrite(net::ServerSocket *sock) 0191 { 0192 output_queue.send(sock); 0193 } 0194 0195 /////////////////////////////////////////////////////////// 0196 0197 UTPServer::UTPServer(QObject *parent) 0198 : ServerInterface(parent) 0199 , d(new Private(this)) 0200 0201 { 0202 connect(d->timer, &QTimer::timeout, this, &UTPServer::checkTimeouts); 0203 } 0204 0205 UTPServer::~UTPServer() 0206 { 0207 delete d; 0208 } 0209 0210 void UTPServer::handlePendingConnections() 0211 { 0212 // This should be called from the main thread 0213 QList<mse::EncryptedPacketSocket::Ptr> p; 0214 { 0215 QMutexLocker lock(&d->pending_mutex); 0216 // Copy the pending list and clear it before using it's contents to avoid a deadlock 0217 p = d->pending; 0218 d->pending.clear(); 0219 } 0220 0221 for (const mse::EncryptedPacketSocket::Ptr &s : std::as_const(p)) { 0222 newConnection(s); 0223 } 0224 } 0225 0226 bool UTPServer::changePort(bt::Uint16 p) 0227 { 0228 if (d->sockets.count() > 0 && port == p) 0229 return true; 0230 0231 Globals::instance().getPortList().removePort(port, net::UDP); 0232 d->sockets.clear(); 0233 0234 const QStringList possible = bindAddresses(); 0235 for (const QString &addr : possible) { 0236 d->bind(net::Address(addr, p)); 0237 } 0238 0239 if (d->sockets.count() == 0) { 0240 // Try any addresses if previous binds failed 0241 d->bind(net::Address(QHostAddress(QHostAddress::AnyIPv6).toString(), p)); 0242 d->bind(net::Address(QHostAddress(QHostAddress::Any).toString(), p)); 0243 } 0244 0245 if (d->sockets.count()) { 0246 Globals::instance().getPortList().addNewPort(p, net::UDP, true); 0247 return true; 0248 } else 0249 return false; 0250 } 0251 0252 void UTPServer::setTOS(Uint8 type_of_service) 0253 { 0254 d->tos = type_of_service; 0255 for (net::ServerSocket::Ptr sock : std::as_const(d->sockets)) 0256 sock->setTOS(d->tos); 0257 } 0258 0259 void UTPServer::threadStarted() 0260 { 0261 d->timer->start(500); 0262 for (net::ServerSocket::Ptr sock : std::as_const(d->sockets)) { 0263 sock->setReadNotificationsEnabled(true); 0264 } 0265 } 0266 0267 #if 0 0268 static void Dump(const QByteArray & data, const net::Address& addr) 0269 { 0270 Out(SYS_UTP | LOG_DEBUG) << QString("Received packet from %1 (%2 bytes)").arg(addr.toString()).arg(data.size()) << endl; 0271 const bt::Uint8* pkt = (const bt::Uint8*)data.data(); 0272 0273 QString line; 0274 for (int i = 0; i < data.size(); i++) { 0275 if (i > 0 && i % 32 == 0) { 0276 Out(SYS_UTP | LOG_DEBUG) << line << endl; 0277 line = ""; 0278 } 0279 0280 uint val = pkt[i]; 0281 line += QString("%1").arg(val, 2, 16, QChar('0')); 0282 if (i + 1 % 4) 0283 line += ' '; 0284 } 0285 Out(SYS_UTP | LOG_DEBUG) << line << endl; 0286 } 0287 0288 static void DumpPacket(const Header & hdr) 0289 { 0290 Out(SYS_UTP | LOG_NOTICE) << "==============================================" << endl; 0291 Out(SYS_UTP | LOG_NOTICE) << "UTP: Packet Header: " << endl; 0292 Out(SYS_UTP | LOG_NOTICE) << "type: " << TypeToString(hdr.type) << endl; 0293 Out(SYS_UTP | LOG_NOTICE) << "version: " << hdr.version << endl; 0294 Out(SYS_UTP | LOG_NOTICE) << "extension: " << hdr.extension << endl; 0295 Out(SYS_UTP | LOG_NOTICE) << "connection_id: " << hdr.connection_id << endl; 0296 Out(SYS_UTP | LOG_NOTICE) << "timestamp_microseconds: " << hdr.timestamp_microseconds << endl; 0297 Out(SYS_UTP | LOG_NOTICE) << "timestamp_difference_microseconds: " << hdr.timestamp_difference_microseconds << endl; 0298 Out(SYS_UTP | LOG_NOTICE) << "wnd_size: " << hdr.wnd_size << endl; 0299 Out(SYS_UTP | LOG_NOTICE) << "seq_nr: " << hdr.seq_nr << endl; 0300 Out(SYS_UTP | LOG_NOTICE) << "ack_nr: " << hdr.ack_nr << endl; 0301 Out(SYS_UTP | LOG_NOTICE) << "==============================================" << endl; 0302 } 0303 #endif 0304 0305 void UTPServer::handlePacket(bt::Buffer::Ptr buffer, const net::Address &addr) 0306 { 0307 PacketParser parser(buffer->get(), buffer->size()); 0308 if (!parser.parse()) 0309 return; 0310 0311 const Header *hdr = parser.header(); 0312 // Dump(packet,addr); 0313 // DumpPacket(*hdr); 0314 Connection::Ptr c; 0315 switch (hdr->type) { 0316 case ST_DATA: 0317 case ST_FIN: 0318 case ST_STATE: 0319 try { 0320 c = d->find(hdr->connection_id); 0321 if (c && c->handlePacket(parser, buffer) == CS_CLOSED) { 0322 d->connections.remove(c->receiveConnectionID()); 0323 } 0324 } catch (Connection::TransmissionError &err) { 0325 Out(SYS_UTP | LOG_NOTICE) << "UTP: " << err.location << endl; 0326 if (c) 0327 c->close(); 0328 } 0329 break; 0330 case ST_RESET: 0331 d->reset(hdr); 0332 break; 0333 case ST_SYN: 0334 d->syn(parser, buffer, addr); 0335 break; 0336 } 0337 } 0338 0339 bool UTPServer::sendTo(utp::Connection::Ptr conn, const PacketBuffer &packet) 0340 { 0341 if (d->output_queue.add(packet, conn) == 1) { 0342 // If there is only one packet queued, 0343 // We need to enable the write notifiers, use the event queue to do this 0344 // if we are not in the utp thread 0345 if (QThread::currentThread() != d->utp_thread) { 0346 QCoreApplication::postEvent(this, new QEvent(QEvent::User)); 0347 } else { 0348 for (net::ServerSocket::Ptr sock : std::as_const(d->sockets)) 0349 sock->setWriteNotificationsEnabled(true); 0350 } 0351 } 0352 return true; 0353 } 0354 0355 void UTPServer::customEvent(QEvent *ev) 0356 { 0357 if (ev->type() == QEvent::User) { 0358 for (net::ServerSocket::Ptr sock : std::as_const(d->sockets)) 0359 sock->setWriteNotificationsEnabled(true); 0360 } 0361 } 0362 0363 Connection::WPtr UTPServer::connectTo(const net::Address &addr) 0364 { 0365 if (d->sockets.isEmpty() || addr.port() == 0) 0366 return Connection::WPtr(); 0367 0368 QMutexLocker lock(&d->mutex); 0369 quint16 recv_conn_id = QRandomGenerator::global()->bounded(32535); 0370 while (d->connections.contains(recv_conn_id)) 0371 recv_conn_id = QRandomGenerator::global()->bounded(32535); 0372 0373 Connection::Ptr conn(new Connection(recv_conn_id, Connection::OUTGOING, addr, this)); 0374 conn->setWeakPointer(conn); 0375 conn->moveToThread(d->utp_thread); 0376 d->connections.insert(recv_conn_id, conn); 0377 try { 0378 conn->startConnecting(); 0379 return conn; 0380 } catch (Connection::TransmissionError &err) { 0381 d->connections.remove(recv_conn_id); 0382 return Connection::WPtr(); 0383 } 0384 } 0385 0386 void UTPServer::stop() 0387 { 0388 d->stop(); 0389 PacketBuffer::clearPool(); 0390 } 0391 0392 void UTPServer::start() 0393 { 0394 if (!d->utp_thread) { 0395 d->utp_thread = new UTPServerThread(this); 0396 for (const net::ServerSocket::Ptr &sock : std::as_const(d->sockets)) 0397 sock->moveToThread(d->utp_thread); 0398 d->timer->moveToThread(d->utp_thread); 0399 d->utp_thread->start(); 0400 } 0401 } 0402 0403 void UTPServer::preparePolling(net::Poll *p, net::Poll::Mode mode, utp::Connection::Ptr &conn) 0404 { 0405 QMutexLocker lock(&d->mutex); 0406 PollPipePair *pair = d->poll_pipes.find(p); 0407 if (!pair) { 0408 pair = new PollPipePair(); 0409 d->poll_pipes.insert(p, pair); 0410 } 0411 0412 if (mode == net::Poll::INPUT) { 0413 if (pair->read_pipe->wokenUp()) 0414 return; 0415 0416 if (conn->bytesAvailable() > 0 || conn->connectionState() == CS_CLOSED) 0417 pair->read_pipe->wakeUp(); 0418 pair->read_pipe->prepare(p, conn->receiveConnectionID(), pair->read_pipe); 0419 } else { 0420 if (pair->write_pipe->wokenUp()) 0421 return; 0422 0423 if (conn->isWriteable()) 0424 pair->write_pipe->wakeUp(); 0425 pair->write_pipe->prepare(p, conn->receiveConnectionID(), pair->write_pipe); 0426 } 0427 } 0428 0429 void UTPServer::stateChanged(utp::Connection::Ptr conn, bool readable, bool writeable) 0430 { 0431 d->wakeUpPollPipes(conn, readable, writeable); 0432 } 0433 0434 void UTPServer::setCreateSockets(bool on) 0435 { 0436 d->create_sockets = on; 0437 } 0438 0439 Connection::WPtr UTPServer::acceptedConnection() 0440 { 0441 if (d->last_accepted.isEmpty()) 0442 return Connection::WPtr(); 0443 else 0444 return d->last_accepted.takeFirst(); 0445 } 0446 0447 void UTPServer::closed(Connection::Ptr conn) 0448 { 0449 Q_UNUSED(conn); 0450 QTimer::singleShot(0, this, &UTPServer::cleanup); 0451 } 0452 0453 void UTPServer::cleanup() 0454 { 0455 QMutexLocker lock(&d->mutex); 0456 QMap<quint16, Connection::Ptr>::iterator i = d->connections.begin(); 0457 while (i != d->connections.end()) { 0458 if (i.value()->connectionState() == CS_CLOSED) { 0459 i = d->connections.erase(i); 0460 } else 0461 ++i; 0462 } 0463 } 0464 0465 void UTPServer::checkTimeouts() 0466 { 0467 QMutexLocker lock(&d->mutex); 0468 0469 TimeValue now; 0470 QMap<quint16, Connection::Ptr>::iterator itr = d->connections.begin(); 0471 while (itr != d->connections.end()) { 0472 itr.value()->checkTimeout(now); 0473 ++itr; 0474 } 0475 } 0476 0477 /////////////////////////////////////////////////////// 0478 0479 PollPipePair::PollPipePair() 0480 : read_pipe(new PollPipe(net::Poll::INPUT)) 0481 , write_pipe(new PollPipe(net::Poll::OUTPUT)) 0482 { 0483 } 0484 } 0485 0486 #include "moc_utpserver.cpp" 0487 #include "moc_utpserver_p.cpp"