File indexing completed on 2025-01-05 04:37:12
0001 /* 0002 SPDX-FileCopyrightText: 2005 Joris Guisson <joris.guisson@gmail.com> 0003 0004 SPDX-License-Identifier: GPL-2.0-or-later 0005 */ 0006 #include "rpcserver.h" 0007 #include "dht.h" 0008 #include "kbucket.h" 0009 #include "node.h" 0010 #include "pingreq.h" 0011 #include "rpccall.h" 0012 #include "rpcmsg.h" 0013 #include "rpcmsgfactory.h" 0014 #include <QHostAddress> 0015 #include <QThread> 0016 #include <bcodec/bdecoder.h> 0017 #include <bcodec/bencoder.h> 0018 #include <bcodec/bnode.h> 0019 #include <boost/scoped_ptr.hpp> 0020 #include <net/portlist.h> 0021 #include <net/serversocket.h> 0022 #include <string.h> 0023 #include <torrent/globals.h> 0024 #include <unistd.h> 0025 #include <util/error.h> 0026 #include <util/functions.h> 0027 #include <util/log.h> 0028 0029 using namespace bt; 0030 0031 namespace dht 0032 { 0033 class RPCServer::Private : public net::ServerSocket::DataHandler, public RPCMethodResolver 0034 { 0035 public: 0036 Private(RPCServer *p, DHT *dh_table, Uint16 port) 0037 : p(p) 0038 , dh_table(dh_table) 0039 , next_mtid(0) 0040 , port(port) 0041 { 0042 } 0043 0044 ~Private() override 0045 { 0046 bt::Globals::instance().getPortList().removePort(port, net::UDP); 0047 calls.setAutoDelete(true); 0048 calls.clear(); 0049 qDeleteAll(call_queue); 0050 call_queue.clear(); 0051 } 0052 0053 void reset() 0054 { 0055 sockets.clear(); 0056 } 0057 0058 void listen(const QString &ip) 0059 { 0060 net::Address addr(ip, port); 0061 net::ServerSocket::Ptr sock(new net::ServerSocket(this)); 0062 if (!sock->bind(addr)) { 0063 Out(SYS_DHT | LOG_IMPORTANT) << "DHT: Failed to bind to " << addr.toString() << endl; 0064 } else { 0065 Out(SYS_DHT | LOG_NOTICE) << "DHT: Bound to " << addr.toString() << endl; 0066 sockets.append(sock); 0067 sock->setReadNotificationsEnabled(true); 0068 } 0069 } 0070 0071 void dataReceived(bt::Buffer::Ptr ptr, const net::Address &addr) override 0072 { 0073 try { 0074 // read and decode the packet 0075 BDecoder bdec(ptr->get(), ptr->size(), false); 0076 boost::scoped_ptr<BNode> n(bdec.decode()); 0077 0078 if (!n || n->getType() != BNode::DICT) 0079 return; 0080 0081 // try to make a RPCMsg of it 0082 RPCMsg::Ptr msg = factory.build((BDictNode *)n.get(), this); 0083 if (msg) { 0084 if (addr.ipVersion() == 6 && addr.isIPv4Mapped()) 0085 msg->setOrigin(addr.convertIPv4Mapped()); 0086 else 0087 msg->setOrigin(addr); 0088 0089 msg->apply(dh_table); 0090 // erase an existing call 0091 if (msg->getType() == RSP_MSG && calls.contains(msg->getMTID())) { 0092 // delete the call, but first notify it off the response 0093 RPCCall *c = calls.find(msg->getMTID()); 0094 c->response(msg); 0095 calls.erase(msg->getMTID()); 0096 c->deleteLater(); 0097 doQueuedCalls(); 0098 } 0099 } 0100 } catch (bt::Error &err) { 0101 Out(SYS_DHT | LOG_DEBUG) << "Error happened during parsing : " << err.toString() << endl; 0102 } 0103 } 0104 0105 void readyToWrite(net::ServerSocket *sock) override 0106 { 0107 Q_UNUSED(sock); 0108 } 0109 0110 Method findMethod(const QByteArray &mtid) override 0111 { 0112 const RPCCall *call = calls.find(mtid); 0113 if (call) 0114 return call->getMsgMethod(); 0115 else 0116 return dht::NONE; 0117 } 0118 0119 void send(const net::Address &addr, const QByteArray &msg) 0120 { 0121 for (net::ServerSocket::Ptr sock : std::as_const(sockets)) { 0122 if (sock->sendTo((const bt::Uint8 *)msg.data(), msg.size(), addr) == msg.size()) 0123 break; 0124 } 0125 } 0126 0127 void doQueuedCalls() 0128 { 0129 while (call_queue.count() > 0 && calls.count() < 256) { 0130 RPCCall *c = call_queue.first(); 0131 call_queue.removeFirst(); 0132 0133 while (calls.contains(QByteArray(1, next_mtid))) 0134 next_mtid++; 0135 0136 RPCMsg::Ptr msg = c->getRequest(); 0137 QByteArray mtid(1, next_mtid); 0138 msg->setMTID(mtid); 0139 next_mtid++; 0140 sendMsg(msg); 0141 calls.insert(msg->getMTID(), c); 0142 c->start(); 0143 } 0144 } 0145 0146 RPCCall *doCall(RPCMsg::Ptr msg) 0147 { 0148 Uint8 start = next_mtid; 0149 QByteArray mtid(1, start); 0150 0151 while (calls.contains(mtid)) { 0152 mtid[0] = ++next_mtid; 0153 if (next_mtid == start) { // if this happens we cannot do any calls 0154 // so queue the call 0155 RPCCall *c = new RPCCall(msg, true); 0156 call_queue.append(c); 0157 Out(SYS_DHT | LOG_NOTICE) << "Queueing RPC call, no slots available at the moment" << endl; 0158 return c; 0159 } 0160 } 0161 0162 msg->setMTID(mtid); 0163 sendMsg(msg); 0164 RPCCall *c = new RPCCall(msg, false); 0165 calls.insert(mtid, c); 0166 return c; 0167 } 0168 0169 void sendMsg(RPCMsg::Ptr msg) 0170 { 0171 QByteArray data; 0172 msg->encode(data); 0173 send(msg->getDestination(), data); 0174 // PrintRawData(data); 0175 } 0176 0177 void timedOut(const QByteArray &mtid) 0178 { 0179 // delete the call 0180 RPCCall *c = calls.find(mtid); 0181 if (c) { 0182 dh_table->timeout(c->getRequest()); 0183 calls.erase(mtid); 0184 c->deleteLater(); 0185 } 0186 doQueuedCalls(); 0187 } 0188 0189 RPCServer *p; 0190 QList<net::ServerSocket::Ptr> sockets; 0191 DHT *dh_table; 0192 bt::PtrMap<QByteArray, RPCCall> calls; 0193 QList<RPCCall *> call_queue; 0194 bt::Uint8 next_mtid; 0195 bt::Uint16 port; 0196 RPCMsgFactory factory; 0197 }; 0198 0199 RPCServer::RPCServer(DHT *dh_table, Uint16 port, QObject *parent) 0200 : QObject(parent) 0201 , d(new Private(this, dh_table, port)) 0202 { 0203 } 0204 0205 RPCServer::~RPCServer() 0206 { 0207 delete d; 0208 } 0209 0210 void RPCServer::start() 0211 { 0212 d->reset(); 0213 0214 const QStringList ips = NetworkInterfaceIPAddresses(NetworkInterface()); 0215 for (const QString &addr : ips) { 0216 d->listen(addr); 0217 } 0218 0219 if (d->sockets.count() == 0) { 0220 // Try all addresses if the previous listen calls all failed 0221 d->listen(QHostAddress(QHostAddress::AnyIPv6).toString()); 0222 d->listen(QHostAddress(QHostAddress::Any).toString()); 0223 } 0224 0225 if (d->sockets.count() > 0) 0226 bt::Globals::instance().getPortList().addNewPort(d->port, net::UDP, true); 0227 } 0228 0229 void RPCServer::stop() 0230 { 0231 bt::Globals::instance().getPortList().removePort(d->port, net::UDP); 0232 d->reset(); 0233 } 0234 0235 #if 0 0236 static void PrintRawData(const QByteArray & data) 0237 { 0238 QString tmp; 0239 for (int i = 0; i < data.size(); i++) { 0240 char c = QChar(data[i]).toLatin1(); 0241 if (!QChar(data[i]).isPrint() || c == 0) 0242 tmp += '#'; 0243 else 0244 tmp += c; 0245 } 0246 0247 Out(SYS_DHT | LOG_DEBUG) << tmp << endl; 0248 } 0249 #endif 0250 0251 RPCCall *RPCServer::doCall(RPCMsg::Ptr msg) 0252 { 0253 RPCCall *c = d->doCall(msg); 0254 if (c) 0255 connect(c, &RPCCall::timeout, this, &RPCServer::callTimeout); 0256 0257 return c; 0258 } 0259 0260 void RPCServer::sendMsg(RPCMsg::Ptr msg) 0261 { 0262 d->sendMsg(msg); 0263 } 0264 0265 void RPCServer::sendMsg(const dht::RPCMsg &msg) 0266 { 0267 QByteArray data; 0268 msg.encode(data); 0269 d->send(msg.getDestination(), data); 0270 } 0271 0272 void RPCServer::callTimeout(RPCCall *call) 0273 { 0274 d->timedOut(call->getRequest()->getMTID()); 0275 } 0276 0277 void RPCServer::ping(const dht::Key &our_id, const net::Address &addr) 0278 { 0279 RPCMsg::Ptr pr(new PingReq(our_id)); 0280 pr->setOrigin(addr); 0281 doCall(pr); 0282 } 0283 0284 Uint32 RPCServer::getNumActiveRPCCalls() const 0285 { 0286 return d->calls.count(); 0287 } 0288 } 0289 0290 #include "moc_rpcserver.cpp"