File indexing completed on 2025-01-05 04:37:19

0001 /*
0002     SPDX-FileCopyrightText: 2005 Joris Guisson <joris.guisson@gmail.com>
0003 
0004     SPDX-License-Identifier: GPL-2.0-or-later
0005 */
0006 #include "packetsocket.h"
0007 #include "speed.h"
0008 #include <net/socketmonitor.h>
0009 #include <util/log.h>
0010 
0011 using namespace bt;
0012 
0013 namespace net
0014 {
0015 PacketSocket::PacketSocket(SocketDevice *sock)
0016     : TrafficShapedSocket(sock)
0017     , ctrl_packets_sent(0)
0018     , pending_upload_data_bytes(0)
0019     , uploaded_data_bytes(0)
0020 {
0021 }
0022 
0023 PacketSocket::PacketSocket(int fd, int ip_version)
0024     : TrafficShapedSocket(fd, ip_version)
0025     , ctrl_packets_sent(0)
0026     , pending_upload_data_bytes(0)
0027     , uploaded_data_bytes(0)
0028 {
0029 }
0030 
0031 PacketSocket::PacketSocket(bool tcp, int ip_version)
0032     : TrafficShapedSocket(tcp, ip_version)
0033     , ctrl_packets_sent(0)
0034     , pending_upload_data_bytes(0)
0035     , uploaded_data_bytes(0)
0036 {
0037 }
0038 
0039 PacketSocket::~PacketSocket()
0040 {
0041 }
0042 
0043 Packet::Ptr PacketSocket::selectPacket()
0044 {
0045     QMutexLocker locker(&mutex);
0046     Packet::Ptr ret(nullptr);
0047     // this function should ensure that between
0048     // each data packet at least 3 control packets are sent
0049     // so requests can get through
0050 
0051     if (ctrl_packets_sent < 3) {
0052         // try to send another control packet
0053         if (control_packets.size() > 0)
0054             ret = control_packets.front();
0055         else if (data_packets.size() > 0)
0056             ret = data_packets.front();
0057     } else {
0058         if (data_packets.size() > 0) {
0059             ctrl_packets_sent = 0;
0060             ret = data_packets.front();
0061         } else if (control_packets.size() > 0)
0062             ret = control_packets.front();
0063     }
0064 
0065     if (ret)
0066         preProcess(ret);
0067 
0068     return ret;
0069 }
0070 
0071 Uint32 PacketSocket::write(Uint32 max, bt::TimeStamp now)
0072 {
0073     if (sock->state() == net::SocketDevice::CONNECTING && !sock->connectSuccesFull())
0074         return 0;
0075 
0076     if (!curr_packet)
0077         curr_packet = selectPacket();
0078 
0079     Uint32 written = 0;
0080     while (curr_packet && (written < max || max == 0)) {
0081         Uint32 limit = (max == 0) ? 0 : max - written;
0082         int ret = curr_packet->send(sock, limit);
0083         if (ret > 0) {
0084             written += ret;
0085             QMutexLocker locker(&mutex);
0086             if (curr_packet->getType() == PIECE) {
0087                 up_speed->onData(ret, now);
0088                 pending_upload_data_bytes -= ret;
0089                 uploaded_data_bytes += ret;
0090             }
0091         } else
0092             break; // Socket buffer full, so stop sending for now
0093 
0094         if (curr_packet->isSent()) {
0095             // packet sent, so remove it
0096             if (curr_packet->getType() == PIECE) {
0097                 QMutexLocker locker(&mutex);
0098                 if (!data_packets.empty())
0099                     data_packets.pop_front();
0100                 // reset ctrl_packets_sent so the next packet should be a ctrl packet
0101                 ctrl_packets_sent = 0;
0102             } else {
0103                 QMutexLocker locker(&mutex);
0104                 if (!control_packets.empty())
0105                     control_packets.pop_front();
0106                 ctrl_packets_sent++;
0107             }
0108             curr_packet = selectPacket();
0109         } else {
0110             // we can't write it fully, so break out of loop
0111             break;
0112         }
0113     }
0114 
0115     return written;
0116 }
0117 
0118 void PacketSocket::addPacket(Packet::Ptr packet)
0119 {
0120     Q_ASSERT(!packet->sending());
0121     QMutexLocker locker(&mutex);
0122     if (packet->getType() == PIECE) {
0123         data_packets.push_back(packet);
0124         pending_upload_data_bytes += packet->getDataLength();
0125     } else
0126         control_packets.push_back(packet);
0127     // tell upload thread we have data ready should it be sleeping
0128     net::SocketMonitor::instance().signalPacketReady();
0129 }
0130 
0131 bool PacketSocket::bytesReadyToWrite() const
0132 {
0133     QMutexLocker locker(&mutex);
0134     return !data_packets.empty() || !control_packets.empty();
0135 }
0136 
0137 void PacketSocket::preProcess(Packet::Ptr packet)
0138 {
0139     Q_UNUSED(packet);
0140 }
0141 
0142 Uint32 PacketSocket::dataBytesUploaded()
0143 {
0144     QMutexLocker locker(&mutex);
0145     Uint32 ret = uploaded_data_bytes;
0146     uploaded_data_bytes = 0;
0147     return ret;
0148 }
0149 
0150 void PacketSocket::clearPieces(bool reject)
0151 {
0152     QMutexLocker locker(&mutex);
0153 
0154     auto i = data_packets.begin();
0155     while (i != data_packets.end()) {
0156         Packet::Ptr p = *i;
0157         if (p->getType() == bt::PIECE && !p->sending() && curr_packet != p) {
0158             if (reject)
0159                 addPacket(Packet::Ptr(p->makeRejectOfPiece()));
0160             pending_upload_data_bytes -= p->getDataLength();
0161             i = data_packets.erase(i);
0162         } else {
0163             i++;
0164         }
0165     }
0166 }
0167 
0168 void PacketSocket::doNotSendPiece(const bt::Request &req, bool reject)
0169 {
0170     QMutexLocker locker(&mutex);
0171     auto i = data_packets.begin();
0172     while (i != data_packets.end()) {
0173         Packet::Ptr p = *i;
0174         if (p->isPiece(req) && !p->sending() && p != curr_packet) {
0175             pending_upload_data_bytes -= p->getDataLength();
0176             i = data_packets.erase(i);
0177             if (reject) {
0178                 // queue a reject packet
0179                 addPacket(Packet::Ptr(p->makeRejectOfPiece()));
0180             }
0181         } else {
0182             i++;
0183         }
0184     }
0185 }
0186 
0187 Uint32 PacketSocket::numPendingPieceUploads() const
0188 {
0189     QMutexLocker locker(&mutex);
0190     return data_packets.size();
0191 }
0192 
0193 Uint32 PacketSocket::numPendingPieceUploadBytes() const
0194 {
0195     QMutexLocker locker(&mutex);
0196     return pending_upload_data_bytes;
0197 }
0198 
0199 }