File indexing completed on 2025-01-05 04:37:19
0001 /* 0002 SPDX-FileCopyrightText: 2005 Joris Guisson <joris.guisson@gmail.com> 0003 0004 SPDX-License-Identifier: GPL-2.0-or-later 0005 */ 0006 #include "packetsocket.h" 0007 #include "speed.h" 0008 #include <net/socketmonitor.h> 0009 #include <util/log.h> 0010 0011 using namespace bt; 0012 0013 namespace net 0014 { 0015 PacketSocket::PacketSocket(SocketDevice *sock) 0016 : TrafficShapedSocket(sock) 0017 , ctrl_packets_sent(0) 0018 , pending_upload_data_bytes(0) 0019 , uploaded_data_bytes(0) 0020 { 0021 } 0022 0023 PacketSocket::PacketSocket(int fd, int ip_version) 0024 : TrafficShapedSocket(fd, ip_version) 0025 , ctrl_packets_sent(0) 0026 , pending_upload_data_bytes(0) 0027 , uploaded_data_bytes(0) 0028 { 0029 } 0030 0031 PacketSocket::PacketSocket(bool tcp, int ip_version) 0032 : TrafficShapedSocket(tcp, ip_version) 0033 , ctrl_packets_sent(0) 0034 , pending_upload_data_bytes(0) 0035 , uploaded_data_bytes(0) 0036 { 0037 } 0038 0039 PacketSocket::~PacketSocket() 0040 { 0041 } 0042 0043 Packet::Ptr PacketSocket::selectPacket() 0044 { 0045 QMutexLocker locker(&mutex); 0046 Packet::Ptr ret(nullptr); 0047 // this function should ensure that between 0048 // each data packet at least 3 control packets are sent 0049 // so requests can get through 0050 0051 if (ctrl_packets_sent < 3) { 0052 // try to send another control packet 0053 if (control_packets.size() > 0) 0054 ret = control_packets.front(); 0055 else if (data_packets.size() > 0) 0056 ret = data_packets.front(); 0057 } else { 0058 if (data_packets.size() > 0) { 0059 ctrl_packets_sent = 0; 0060 ret = data_packets.front(); 0061 } else if (control_packets.size() > 0) 0062 ret = control_packets.front(); 0063 } 0064 0065 if (ret) 0066 preProcess(ret); 0067 0068 return ret; 0069 } 0070 0071 Uint32 PacketSocket::write(Uint32 max, bt::TimeStamp now) 0072 { 0073 if (sock->state() == net::SocketDevice::CONNECTING && !sock->connectSuccesFull()) 0074 return 0; 0075 0076 if (!curr_packet) 0077 curr_packet = selectPacket(); 0078 0079 Uint32 written = 0; 0080 while (curr_packet && (written < max || max == 0)) { 0081 Uint32 limit = (max == 0) ? 0 : max - written; 0082 int ret = curr_packet->send(sock, limit); 0083 if (ret > 0) { 0084 written += ret; 0085 QMutexLocker locker(&mutex); 0086 if (curr_packet->getType() == PIECE) { 0087 up_speed->onData(ret, now); 0088 pending_upload_data_bytes -= ret; 0089 uploaded_data_bytes += ret; 0090 } 0091 } else 0092 break; // Socket buffer full, so stop sending for now 0093 0094 if (curr_packet->isSent()) { 0095 // packet sent, so remove it 0096 if (curr_packet->getType() == PIECE) { 0097 QMutexLocker locker(&mutex); 0098 if (!data_packets.empty()) 0099 data_packets.pop_front(); 0100 // reset ctrl_packets_sent so the next packet should be a ctrl packet 0101 ctrl_packets_sent = 0; 0102 } else { 0103 QMutexLocker locker(&mutex); 0104 if (!control_packets.empty()) 0105 control_packets.pop_front(); 0106 ctrl_packets_sent++; 0107 } 0108 curr_packet = selectPacket(); 0109 } else { 0110 // we can't write it fully, so break out of loop 0111 break; 0112 } 0113 } 0114 0115 return written; 0116 } 0117 0118 void PacketSocket::addPacket(Packet::Ptr packet) 0119 { 0120 Q_ASSERT(!packet->sending()); 0121 QMutexLocker locker(&mutex); 0122 if (packet->getType() == PIECE) { 0123 data_packets.push_back(packet); 0124 pending_upload_data_bytes += packet->getDataLength(); 0125 } else 0126 control_packets.push_back(packet); 0127 // tell upload thread we have data ready should it be sleeping 0128 net::SocketMonitor::instance().signalPacketReady(); 0129 } 0130 0131 bool PacketSocket::bytesReadyToWrite() const 0132 { 0133 QMutexLocker locker(&mutex); 0134 return !data_packets.empty() || !control_packets.empty(); 0135 } 0136 0137 void PacketSocket::preProcess(Packet::Ptr packet) 0138 { 0139 Q_UNUSED(packet); 0140 } 0141 0142 Uint32 PacketSocket::dataBytesUploaded() 0143 { 0144 QMutexLocker locker(&mutex); 0145 Uint32 ret = uploaded_data_bytes; 0146 uploaded_data_bytes = 0; 0147 return ret; 0148 } 0149 0150 void PacketSocket::clearPieces(bool reject) 0151 { 0152 QMutexLocker locker(&mutex); 0153 0154 auto i = data_packets.begin(); 0155 while (i != data_packets.end()) { 0156 Packet::Ptr p = *i; 0157 if (p->getType() == bt::PIECE && !p->sending() && curr_packet != p) { 0158 if (reject) 0159 addPacket(Packet::Ptr(p->makeRejectOfPiece())); 0160 pending_upload_data_bytes -= p->getDataLength(); 0161 i = data_packets.erase(i); 0162 } else { 0163 i++; 0164 } 0165 } 0166 } 0167 0168 void PacketSocket::doNotSendPiece(const bt::Request &req, bool reject) 0169 { 0170 QMutexLocker locker(&mutex); 0171 auto i = data_packets.begin(); 0172 while (i != data_packets.end()) { 0173 Packet::Ptr p = *i; 0174 if (p->isPiece(req) && !p->sending() && p != curr_packet) { 0175 pending_upload_data_bytes -= p->getDataLength(); 0176 i = data_packets.erase(i); 0177 if (reject) { 0178 // queue a reject packet 0179 addPacket(Packet::Ptr(p->makeRejectOfPiece())); 0180 } 0181 } else { 0182 i++; 0183 } 0184 } 0185 } 0186 0187 Uint32 PacketSocket::numPendingPieceUploads() const 0188 { 0189 QMutexLocker locker(&mutex); 0190 return data_packets.size(); 0191 } 0192 0193 Uint32 PacketSocket::numPendingPieceUploadBytes() const 0194 { 0195 QMutexLocker locker(&mutex); 0196 return pending_upload_data_bytes; 0197 } 0198 0199 }