File indexing completed on 2025-01-05 04:37:12

0001 /*
0002     SPDX-FileCopyrightText: 2005 Joris Guisson <joris.guisson@gmail.com>
0003 
0004     SPDX-License-Identifier: GPL-2.0-or-later
0005 */
0006 #include "rpcserver.h"
0007 #include "dht.h"
0008 #include "kbucket.h"
0009 #include "node.h"
0010 #include "pingreq.h"
0011 #include "rpccall.h"
0012 #include "rpcmsg.h"
0013 #include "rpcmsgfactory.h"
0014 #include <QHostAddress>
0015 #include <QThread>
0016 #include <bcodec/bdecoder.h>
0017 #include <bcodec/bencoder.h>
0018 #include <bcodec/bnode.h>
0019 #include <boost/scoped_ptr.hpp>
0020 #include <net/portlist.h>
0021 #include <net/serversocket.h>
0022 #include <string.h>
0023 #include <torrent/globals.h>
0024 #include <unistd.h>
0025 #include <util/error.h>
0026 #include <util/functions.h>
0027 #include <util/log.h>
0028 
0029 using namespace bt;
0030 
0031 namespace dht
0032 {
0033 class RPCServer::Private : public net::ServerSocket::DataHandler, public RPCMethodResolver
0034 {
0035 public:
0036     Private(RPCServer *p, DHT *dh_table, Uint16 port)
0037         : p(p)
0038         , dh_table(dh_table)
0039         , next_mtid(0)
0040         , port(port)
0041     {
0042     }
0043 
0044     ~Private() override
0045     {
0046         bt::Globals::instance().getPortList().removePort(port, net::UDP);
0047         calls.setAutoDelete(true);
0048         calls.clear();
0049         qDeleteAll(call_queue);
0050         call_queue.clear();
0051     }
0052 
0053     void reset()
0054     {
0055         sockets.clear();
0056     }
0057 
0058     void listen(const QString &ip)
0059     {
0060         net::Address addr(ip, port);
0061         net::ServerSocket::Ptr sock(new net::ServerSocket(this));
0062         if (!sock->bind(addr)) {
0063             Out(SYS_DHT | LOG_IMPORTANT) << "DHT: Failed to bind to " << addr.toString() << endl;
0064         } else {
0065             Out(SYS_DHT | LOG_NOTICE) << "DHT: Bound to " << addr.toString() << endl;
0066             sockets.append(sock);
0067             sock->setReadNotificationsEnabled(true);
0068         }
0069     }
0070 
0071     void dataReceived(bt::Buffer::Ptr ptr, const net::Address &addr) override
0072     {
0073         try {
0074             // read and decode the packet
0075             BDecoder bdec(ptr->get(), ptr->size(), false);
0076             boost::scoped_ptr<BNode> n(bdec.decode());
0077 
0078             if (!n || n->getType() != BNode::DICT)
0079                 return;
0080 
0081             // try to make a RPCMsg of it
0082             RPCMsg::Ptr msg = factory.build((BDictNode *)n.get(), this);
0083             if (msg) {
0084                 if (addr.ipVersion() == 6 && addr.isIPv4Mapped())
0085                     msg->setOrigin(addr.convertIPv4Mapped());
0086                 else
0087                     msg->setOrigin(addr);
0088 
0089                 msg->apply(dh_table);
0090                 // erase an existing call
0091                 if (msg->getType() == RSP_MSG && calls.contains(msg->getMTID())) {
0092                     // delete the call, but first notify it off the response
0093                     RPCCall *c = calls.find(msg->getMTID());
0094                     c->response(msg);
0095                     calls.erase(msg->getMTID());
0096                     c->deleteLater();
0097                     doQueuedCalls();
0098                 }
0099             }
0100         } catch (bt::Error &err) {
0101             Out(SYS_DHT | LOG_DEBUG) << "Error happened during parsing : " << err.toString() << endl;
0102         }
0103     }
0104 
0105     void readyToWrite(net::ServerSocket *sock) override
0106     {
0107         Q_UNUSED(sock);
0108     }
0109 
0110     Method findMethod(const QByteArray &mtid) override
0111     {
0112         const RPCCall *call = calls.find(mtid);
0113         if (call)
0114             return call->getMsgMethod();
0115         else
0116             return dht::NONE;
0117     }
0118 
0119     void send(const net::Address &addr, const QByteArray &msg)
0120     {
0121         for (net::ServerSocket::Ptr sock : std::as_const(sockets)) {
0122             if (sock->sendTo((const bt::Uint8 *)msg.data(), msg.size(), addr) == msg.size())
0123                 break;
0124         }
0125     }
0126 
0127     void doQueuedCalls()
0128     {
0129         while (call_queue.count() > 0 && calls.count() < 256) {
0130             RPCCall *c = call_queue.first();
0131             call_queue.removeFirst();
0132 
0133             while (calls.contains(QByteArray(1, next_mtid)))
0134                 next_mtid++;
0135 
0136             RPCMsg::Ptr msg = c->getRequest();
0137             QByteArray mtid(1, next_mtid);
0138             msg->setMTID(mtid);
0139             next_mtid++;
0140             sendMsg(msg);
0141             calls.insert(msg->getMTID(), c);
0142             c->start();
0143         }
0144     }
0145 
0146     RPCCall *doCall(RPCMsg::Ptr msg)
0147     {
0148         Uint8 start = next_mtid;
0149         QByteArray mtid(1, start);
0150 
0151         while (calls.contains(mtid)) {
0152             mtid[0] = ++next_mtid;
0153             if (next_mtid == start) { // if this happens we cannot do any calls
0154                 // so queue the call
0155                 RPCCall *c = new RPCCall(msg, true);
0156                 call_queue.append(c);
0157                 Out(SYS_DHT | LOG_NOTICE) << "Queueing RPC call, no slots available at the moment" << endl;
0158                 return c;
0159             }
0160         }
0161 
0162         msg->setMTID(mtid);
0163         sendMsg(msg);
0164         RPCCall *c = new RPCCall(msg, false);
0165         calls.insert(mtid, c);
0166         return c;
0167     }
0168 
0169     void sendMsg(RPCMsg::Ptr msg)
0170     {
0171         QByteArray data;
0172         msg->encode(data);
0173         send(msg->getDestination(), data);
0174         //  PrintRawData(data);
0175     }
0176 
0177     void timedOut(const QByteArray &mtid)
0178     {
0179         // delete the call
0180         RPCCall *c = calls.find(mtid);
0181         if (c) {
0182             dh_table->timeout(c->getRequest());
0183             calls.erase(mtid);
0184             c->deleteLater();
0185         }
0186         doQueuedCalls();
0187     }
0188 
0189     RPCServer *p;
0190     QList<net::ServerSocket::Ptr> sockets;
0191     DHT *dh_table;
0192     bt::PtrMap<QByteArray, RPCCall> calls;
0193     QList<RPCCall *> call_queue;
0194     bt::Uint8 next_mtid;
0195     bt::Uint16 port;
0196     RPCMsgFactory factory;
0197 };
0198 
0199 RPCServer::RPCServer(DHT *dh_table, Uint16 port, QObject *parent)
0200     : QObject(parent)
0201     , d(new Private(this, dh_table, port))
0202 {
0203 }
0204 
0205 RPCServer::~RPCServer()
0206 {
0207     delete d;
0208 }
0209 
0210 void RPCServer::start()
0211 {
0212     d->reset();
0213 
0214     const QStringList ips = NetworkInterfaceIPAddresses(NetworkInterface());
0215     for (const QString &addr : ips) {
0216         d->listen(addr);
0217     }
0218 
0219     if (d->sockets.count() == 0) {
0220         // Try all addresses if the previous listen calls all failed
0221         d->listen(QHostAddress(QHostAddress::AnyIPv6).toString());
0222         d->listen(QHostAddress(QHostAddress::Any).toString());
0223     }
0224 
0225     if (d->sockets.count() > 0)
0226         bt::Globals::instance().getPortList().addNewPort(d->port, net::UDP, true);
0227 }
0228 
0229 void RPCServer::stop()
0230 {
0231     bt::Globals::instance().getPortList().removePort(d->port, net::UDP);
0232     d->reset();
0233 }
0234 
0235 #if 0
0236 static void PrintRawData(const QByteArray & data)
0237 {
0238     QString tmp;
0239     for (int i = 0; i < data.size(); i++) {
0240         char c = QChar(data[i]).toLatin1();
0241         if (!QChar(data[i]).isPrint() || c == 0)
0242             tmp += '#';
0243         else
0244             tmp += c;
0245     }
0246 
0247     Out(SYS_DHT | LOG_DEBUG) << tmp << endl;
0248 }
0249 #endif
0250 
0251 RPCCall *RPCServer::doCall(RPCMsg::Ptr msg)
0252 {
0253     RPCCall *c = d->doCall(msg);
0254     if (c)
0255         connect(c, &RPCCall::timeout, this, &RPCServer::callTimeout);
0256 
0257     return c;
0258 }
0259 
0260 void RPCServer::sendMsg(RPCMsg::Ptr msg)
0261 {
0262     d->sendMsg(msg);
0263 }
0264 
0265 void RPCServer::sendMsg(const dht::RPCMsg &msg)
0266 {
0267     QByteArray data;
0268     msg.encode(data);
0269     d->send(msg.getDestination(), data);
0270 }
0271 
0272 void RPCServer::callTimeout(RPCCall *call)
0273 {
0274     d->timedOut(call->getRequest()->getMTID());
0275 }
0276 
0277 void RPCServer::ping(const dht::Key &our_id, const net::Address &addr)
0278 {
0279     RPCMsg::Ptr pr(new PingReq(our_id));
0280     pr->setOrigin(addr);
0281     doCall(pr);
0282 }
0283 
0284 Uint32 RPCServer::getNumActiveRPCCalls() const
0285 {
0286     return d->calls.count();
0287 }
0288 }
0289 
0290 #include "moc_rpcserver.cpp"