File indexing completed on 2025-01-05 04:37:10

0001 /*
0002     SPDX-FileCopyrightText: 2005 Joris Guisson <joris.guisson@gmail.com>
0003 
0004     SPDX-License-Identifier: GPL-2.0-or-later
0005 */
0006 #include "announcetask.h"
0007 #include "announcereq.h"
0008 #include "getpeersrsp.h"
0009 #include "node.h"
0010 #include "pack.h"
0011 #include <torrent/globals.h>
0012 #include <util/log.h>
0013 
0014 using namespace bt;
0015 
0016 namespace dht
0017 {
0018 AnnounceTask::AnnounceTask(Database *db, RPCServer *rpc, Node *node, const dht::Key &info_hash, bt::Uint16 port, QObject *parent)
0019     : Task(rpc, node, parent)
0020     , info_hash(info_hash)
0021     , port(port)
0022     , db(db)
0023 {
0024 }
0025 
0026 AnnounceTask::~AnnounceTask()
0027 {
0028 }
0029 
0030 void AnnounceTask::handleNodes(const QByteArray &nodes, int ip_version)
0031 {
0032     Uint32 address_size = ip_version == 4 ? 26 : 38;
0033     Uint32 nval = nodes.size() / address_size;
0034     for (Uint32 i = 0; i < nval; i++) {
0035         // add node to todo list
0036         try {
0037             KBucketEntry e = UnpackBucketEntry(nodes, i * address_size, ip_version);
0038             if (!visited.contains(e) && todo.size() < 100) {
0039                 todo.insert(e);
0040                 //  Out(SYS_DHT|LOG_DEBUG) << "DHT: GetPeers returned node " << e.getAddress().toString() << endl;
0041             }
0042         } catch (...) {
0043             // not enough size in buffer, just ignore the error
0044         }
0045     }
0046 }
0047 
0048 void AnnounceTask::callFinished(RPCCall *c, RPCMsg::Ptr rsp)
0049 {
0050     // Out(SYS_DHT|LOG_DEBUG) << "AnnounceTask::callFinished" << endl;
0051     // if we do not have a get peers response, return
0052     // announce_peer's response are just empty anyway
0053     if (c->getMsgMethod() != dht::GET_PEERS)
0054         return;
0055 
0056     GetPeersRsp::Ptr gpr = rsp.dynamicCast<GetPeersRsp>();
0057     if (!gpr)
0058         return;
0059 
0060     if (gpr->containsNodes()) {
0061         const QByteArray &n = gpr->getNodes();
0062         if (n.size() > 0)
0063             handleNodes(n, 4);
0064 
0065         const QByteArray &n6 = gpr->getNodes6();
0066         if (n6.size() > 0)
0067             handleNodes(n6, 6);
0068     }
0069 
0070     // store the items in the database if there are any present
0071     const DBItemList &items = gpr->getItemList();
0072     for (const DBItem &i : items) {
0073         //  Out(SYS_DHT|LOG_DEBUG) << "DHT: GetPeers returned item " << i->getAddress().toString() << endl;
0074         db->store(info_hash, i);
0075         // also add the items to the returned_items list
0076         returned_items.append(i);
0077     }
0078 
0079     if (items.size() > 0)
0080         emitDataReady();
0081 
0082     // add the peer who responded to the answered list, so we can do an announce
0083     KBucketEntry e(rsp->getOrigin(), rsp->getID());
0084     if (!answered_visited.contains(e)) {
0085         answered.insert(KBucketEntryAndToken(e, gpr->getToken()));
0086     }
0087 }
0088 
0089 void AnnounceTask::callTimeout(RPCCall *)
0090 {
0091     // Out(SYS_DHT|LOG_DEBUG) << "AnnounceTask::callTimeout " << endl;
0092 }
0093 
0094 void AnnounceTask::update()
0095 {
0096     /*  Out(SYS_DHT|LOG_DEBUG) << "AnnounceTask::update " << endl;
0097         Out(SYS_DHT|LOG_DEBUG) << "todo " << todo.count() << " ; answered " << answered.count() << endl;
0098         Out(SYS_DHT|LOG_DEBUG) << "visited " << visited.count() << " ; answered_visited " << answered_visited.count() << endl;
0099     */
0100     while (!answered.empty() && canDoRequest()) {
0101         std::set<KBucketEntryAndToken>::iterator itr = answered.begin();
0102         if (!answered_visited.contains(*itr)) {
0103             RPCMsg::Ptr anr(new AnnounceReq(node->getOurID(), info_hash, port, itr->getToken()));
0104             anr->setOrigin(itr->getAddress());
0105             //      Out(SYS_DHT|LOG_DEBUG) << "DHT: Announcing to " << e.getAddress().toString() << endl;
0106             rpcCall(anr);
0107             answered_visited.insert(*itr);
0108         }
0109         answered.erase(itr);
0110     }
0111 
0112     // go over the todo list and send get_peers requests
0113     // until we have nothing left
0114     while (!todo.empty() && canDoRequest()) {
0115         KBucketEntrySet::iterator itr = todo.begin();
0116         // onLy send a findNode if we haven't allrready visited the node
0117         if (!visited.contains(*itr)) {
0118             // send a findNode to the node
0119             //      Out(SYS_DHT|LOG_DEBUG) << "DHT: Sending GetPeers to " << e.getAddress().toString() << endl;
0120             RPCMsg::Ptr gpr(new GetPeersReq(node->getOurID(), info_hash));
0121             gpr->setOrigin(itr->getAddress());
0122             rpcCall(gpr);
0123             visited.insert(*itr);
0124         }
0125         // remove the entry from the todo list
0126         todo.erase(itr);
0127     }
0128 
0129     if (todo.empty() && answered.empty() && getNumOutstandingRequests() == 0 && !isFinished()) {
0130         Out(SYS_DHT | LOG_NOTICE) << "DHT: AnnounceTask done" << endl;
0131         done();
0132     } else if (answered_visited.size() > 50 || visited.size() > 200) {
0133         // don't let the task run forever
0134         Out(SYS_DHT | LOG_NOTICE) << "DHT: AnnounceTask done" << endl;
0135         done();
0136     }
0137 }
0138 
0139 bool AnnounceTask::takeItem(DBItem &item)
0140 {
0141     if (returned_items.empty())
0142         return false;
0143 
0144     item = returned_items.first();
0145     returned_items.pop_front();
0146     return true;
0147 }
0148 }