File indexing completed on 2025-01-05 04:37:10
0001 /* 0002 SPDX-FileCopyrightText: 2005 Joris Guisson <joris.guisson@gmail.com> 0003 0004 SPDX-License-Identifier: GPL-2.0-or-later 0005 */ 0006 #include "announcetask.h" 0007 #include "announcereq.h" 0008 #include "getpeersrsp.h" 0009 #include "node.h" 0010 #include "pack.h" 0011 #include <torrent/globals.h> 0012 #include <util/log.h> 0013 0014 using namespace bt; 0015 0016 namespace dht 0017 { 0018 AnnounceTask::AnnounceTask(Database *db, RPCServer *rpc, Node *node, const dht::Key &info_hash, bt::Uint16 port, QObject *parent) 0019 : Task(rpc, node, parent) 0020 , info_hash(info_hash) 0021 , port(port) 0022 , db(db) 0023 { 0024 } 0025 0026 AnnounceTask::~AnnounceTask() 0027 { 0028 } 0029 0030 void AnnounceTask::handleNodes(const QByteArray &nodes, int ip_version) 0031 { 0032 Uint32 address_size = ip_version == 4 ? 26 : 38; 0033 Uint32 nval = nodes.size() / address_size; 0034 for (Uint32 i = 0; i < nval; i++) { 0035 // add node to todo list 0036 try { 0037 KBucketEntry e = UnpackBucketEntry(nodes, i * address_size, ip_version); 0038 if (!visited.contains(e) && todo.size() < 100) { 0039 todo.insert(e); 0040 // Out(SYS_DHT|LOG_DEBUG) << "DHT: GetPeers returned node " << e.getAddress().toString() << endl; 0041 } 0042 } catch (...) { 0043 // not enough size in buffer, just ignore the error 0044 } 0045 } 0046 } 0047 0048 void AnnounceTask::callFinished(RPCCall *c, RPCMsg::Ptr rsp) 0049 { 0050 // Out(SYS_DHT|LOG_DEBUG) << "AnnounceTask::callFinished" << endl; 0051 // if we do not have a get peers response, return 0052 // announce_peer's response are just empty anyway 0053 if (c->getMsgMethod() != dht::GET_PEERS) 0054 return; 0055 0056 GetPeersRsp::Ptr gpr = rsp.dynamicCast<GetPeersRsp>(); 0057 if (!gpr) 0058 return; 0059 0060 if (gpr->containsNodes()) { 0061 const QByteArray &n = gpr->getNodes(); 0062 if (n.size() > 0) 0063 handleNodes(n, 4); 0064 0065 const QByteArray &n6 = gpr->getNodes6(); 0066 if (n6.size() > 0) 0067 handleNodes(n6, 6); 0068 } 0069 0070 // store the items in the database if there are any present 0071 const DBItemList &items = gpr->getItemList(); 0072 for (const DBItem &i : items) { 0073 // Out(SYS_DHT|LOG_DEBUG) << "DHT: GetPeers returned item " << i->getAddress().toString() << endl; 0074 db->store(info_hash, i); 0075 // also add the items to the returned_items list 0076 returned_items.append(i); 0077 } 0078 0079 if (items.size() > 0) 0080 emitDataReady(); 0081 0082 // add the peer who responded to the answered list, so we can do an announce 0083 KBucketEntry e(rsp->getOrigin(), rsp->getID()); 0084 if (!answered_visited.contains(e)) { 0085 answered.insert(KBucketEntryAndToken(e, gpr->getToken())); 0086 } 0087 } 0088 0089 void AnnounceTask::callTimeout(RPCCall *) 0090 { 0091 // Out(SYS_DHT|LOG_DEBUG) << "AnnounceTask::callTimeout " << endl; 0092 } 0093 0094 void AnnounceTask::update() 0095 { 0096 /* Out(SYS_DHT|LOG_DEBUG) << "AnnounceTask::update " << endl; 0097 Out(SYS_DHT|LOG_DEBUG) << "todo " << todo.count() << " ; answered " << answered.count() << endl; 0098 Out(SYS_DHT|LOG_DEBUG) << "visited " << visited.count() << " ; answered_visited " << answered_visited.count() << endl; 0099 */ 0100 while (!answered.empty() && canDoRequest()) { 0101 std::set<KBucketEntryAndToken>::iterator itr = answered.begin(); 0102 if (!answered_visited.contains(*itr)) { 0103 RPCMsg::Ptr anr(new AnnounceReq(node->getOurID(), info_hash, port, itr->getToken())); 0104 anr->setOrigin(itr->getAddress()); 0105 // Out(SYS_DHT|LOG_DEBUG) << "DHT: Announcing to " << e.getAddress().toString() << endl; 0106 rpcCall(anr); 0107 answered_visited.insert(*itr); 0108 } 0109 answered.erase(itr); 0110 } 0111 0112 // go over the todo list and send get_peers requests 0113 // until we have nothing left 0114 while (!todo.empty() && canDoRequest()) { 0115 KBucketEntrySet::iterator itr = todo.begin(); 0116 // onLy send a findNode if we haven't allrready visited the node 0117 if (!visited.contains(*itr)) { 0118 // send a findNode to the node 0119 // Out(SYS_DHT|LOG_DEBUG) << "DHT: Sending GetPeers to " << e.getAddress().toString() << endl; 0120 RPCMsg::Ptr gpr(new GetPeersReq(node->getOurID(), info_hash)); 0121 gpr->setOrigin(itr->getAddress()); 0122 rpcCall(gpr); 0123 visited.insert(*itr); 0124 } 0125 // remove the entry from the todo list 0126 todo.erase(itr); 0127 } 0128 0129 if (todo.empty() && answered.empty() && getNumOutstandingRequests() == 0 && !isFinished()) { 0130 Out(SYS_DHT | LOG_NOTICE) << "DHT: AnnounceTask done" << endl; 0131 done(); 0132 } else if (answered_visited.size() > 50 || visited.size() > 200) { 0133 // don't let the task run forever 0134 Out(SYS_DHT | LOG_NOTICE) << "DHT: AnnounceTask done" << endl; 0135 done(); 0136 } 0137 } 0138 0139 bool AnnounceTask::takeItem(DBItem &item) 0140 { 0141 if (returned_items.empty()) 0142 return false; 0143 0144 item = returned_items.first(); 0145 returned_items.pop_front(); 0146 return true; 0147 } 0148 }