File indexing completed on 2025-01-05 04:37:33

0001 /*
0002     SPDX-FileCopyrightText: 2009 Joris Guisson <joris.guisson@gmail.com>
0003 
0004     SPDX-License-Identifier: GPL-2.0-or-later
0005 */
0006 
0007 #include "connection.h"
0008 #include "delaywindow.h"
0009 #include "localwindow.h"
0010 #include "remotewindow.h"
0011 #include <QEvent>
0012 #include <QFile>
0013 #include <QTextStream>
0014 #include <QThread>
0015 #include <time.h>
0016 #include <util/functions.h>
0017 #include <util/log.h>
0018 #include <util/sha1hash.h>
0019 
0020 using namespace bt;
0021 
0022 namespace utp
0023 {
0024 Connection::TransmissionError::TransmissionError(const char *file, int line)
0025 {
0026     location = QString("TransmissionError in %1 at line %2\n").arg(file).arg(line);
0027     Out(SYS_GEN | LOG_DEBUG) << location << endl;
0028 }
0029 
0030 Connection::Connection(bt::Uint16 recv_connection_id, Type type, const net::Address &remote, Transmitter *transmitter)
0031     : transmitter(transmitter)
0032     , blocking(false)
0033 {
0034     stats.type = type;
0035     stats.remote = remote;
0036     stats.recv_connection_id = recv_connection_id;
0037     stats.reply_micro = 0;
0038     stats.eof_seq_nr = -1;
0039     local_wnd = new LocalWindow(128 * 1024);
0040     remote_wnd = new RemoteWindow();
0041     delay_window = new DelayWindow();
0042     fin_sent = false;
0043     stats.rtt = 100;
0044     stats.rtt_var = 0;
0045     stats.timeout = 1000;
0046     stats.packet_size = 1500 - IP_AND_UDP_OVERHEAD - sizeof(utp::Header);
0047     stats.last_window_size_transmitted = 128 * 1024;
0048     if (type == OUTGOING) {
0049         stats.send_connection_id = recv_connection_id + 1;
0050     } else {
0051         stats.send_connection_id = recv_connection_id - 1;
0052         stats.state = CS_IDLE;
0053         stats.seq_nr = 5;
0054     }
0055 
0056     stats.bytes_received = 0;
0057     stats.bytes_sent = 0;
0058     stats.packets_received = 0;
0059     stats.packets_sent = 0;
0060     stats.bytes_lost = 0;
0061     stats.packets_lost = 0;
0062     stats.readable = stats.writeable = false;
0063 }
0064 
0065 Connection::~Connection()
0066 {
0067     delete local_wnd;
0068     delete remote_wnd;
0069     delete delay_window;
0070 }
0071 
0072 void Connection::startConnecting()
0073 {
0074     if (stats.type == OUTGOING) {
0075         sendSYN();
0076     }
0077 }
0078 
0079 #if 0
0080 static void DumpPacket(const Header & hdr, const SelectiveAck* sack)
0081 {
0082     Out(SYS_UTP | LOG_NOTICE) << "==============================================" << endl;
0083     Out(SYS_UTP | LOG_NOTICE) << "UTP: Packet Header: " << endl;
0084     Out(SYS_UTP | LOG_NOTICE) << "type:                              " << TypeToString(hdr.type) << endl;
0085     Out(SYS_UTP | LOG_NOTICE) << "version:                           " << hdr.version << endl;
0086     Out(SYS_UTP | LOG_NOTICE) << "extension:                         " << hdr.extension << endl;
0087     Out(SYS_UTP | LOG_NOTICE) << "connection_id:                     " << hdr.connection_id << endl;
0088     Out(SYS_UTP | LOG_NOTICE) << "timestamp_microseconds:            " << hdr.timestamp_microseconds << endl;
0089     Out(SYS_UTP | LOG_NOTICE) << "timestamp_difference_microseconds: " << hdr.timestamp_difference_microseconds << endl;
0090     Out(SYS_UTP | LOG_NOTICE) << "wnd_size:                          " << hdr.wnd_size << endl;
0091     Out(SYS_UTP | LOG_NOTICE) << "seq_nr:                            " << hdr.seq_nr << endl;
0092     Out(SYS_UTP | LOG_NOTICE) << "ack_nr:                            " << hdr.ack_nr << endl;
0093     Out(SYS_UTP | LOG_NOTICE) << "==============================================" << endl;
0094     if (!sack)
0095         return;
0096 
0097     Out(SYS_UTP | LOG_NOTICE) << "SelectiveAck:                      " << endl;
0098     Out(SYS_UTP | LOG_NOTICE) << "extension:                         " << sack->extension << endl;
0099     Out(SYS_UTP | LOG_NOTICE) << "length:                            " << sack->length << endl;
0100     Out(SYS_UTP | LOG_NOTICE) << "bitmask:                           " << hex(sack->bitmask[0]) << endl;
0101     Out(SYS_UTP | LOG_NOTICE) << "bitmask:                           " << hex(sack->bitmask[1]) << endl;
0102     Out(SYS_UTP | LOG_NOTICE) << "bitmask:                           " << hex(sack->bitmask[2]) << endl;
0103     Out(SYS_UTP | LOG_NOTICE) << "bitmask:                           " << hex(sack->bitmask[3]) << endl;
0104     Out(SYS_UTP | LOG_NOTICE) << "==============================================" << endl;
0105 }
0106 #endif
0107 
0108 ConnectionState Connection::handlePacket(const PacketParser &parser, bt::Buffer::Ptr packet)
0109 {
0110     QMutexLocker lock(&mutex);
0111     stats.packets_received++;
0112 
0113     const Header *hdr = parser.header();
0114     const SelectiveAck *sack = parser.selectiveAck();
0115     int data_off = parser.dataOffset();
0116 
0117     // DumpPacket(*hdr,sack);
0118 
0119     updateDelayMeasurement(hdr);
0120     remote_wnd->packetReceived(hdr, sack, this);
0121     switch (stats.state) {
0122     case CS_SYN_SENT:
0123         // now we should have a state packet
0124         if (hdr->type == ST_STATE) {
0125             // connection estabished
0126             stats.state = CS_CONNECTED;
0127             local_wnd->setLastSeqNr(hdr->seq_nr - 1);
0128             if (blocking)
0129                 connected.wakeAll();
0130             stats.timeout = 1000;
0131             Out(SYS_UTP | LOG_NOTICE) << "UTP: established connection with " << stats.remote.toString() << endl;
0132         } else {
0133             sendReset();
0134             stats.state = CS_CLOSED;
0135             if (blocking)
0136                 data_ready.wakeAll();
0137         }
0138         break;
0139     case CS_IDLE:
0140         if (hdr->type == ST_SYN) {
0141             // Send back a state packet
0142             local_wnd->setLastSeqNr(hdr->seq_nr);
0143             sendState();
0144             stats.state = CS_CONNECTED;
0145             stats.timeout = 1000;
0146             Out(SYS_UTP | LOG_NOTICE) << "UTP: established connection with " << stats.remote.toString() << endl;
0147         } else {
0148             sendReset();
0149             stats.state = CS_CLOSED;
0150             if (blocking)
0151                 data_ready.wakeAll();
0152         }
0153         break;
0154     case CS_CONNECTED:
0155         if (hdr->type == ST_DATA) {
0156             // push data into local window
0157             if (!local_wnd->packetReceived(hdr, packet, data_off)) {
0158                 // Panick
0159                 sendReset();
0160                 stats.state = CS_CLOSED;
0161                 if (blocking)
0162                     data_ready.wakeAll();
0163             } else {
0164                 // send back an ACK
0165                 sendStateOrData();
0166                 if (blocking && local_wnd->isReadable() > 0)
0167                     data_ready.wakeAll();
0168             }
0169         } else if (hdr->type == ST_STATE) {
0170             // try to send more data packets
0171             sendPackets();
0172             if (blocking && local_wnd->isReadable() > 0)
0173                 data_ready.wakeAll();
0174         } else if (hdr->type == ST_FIN) {
0175             stats.eof_seq_nr = hdr->seq_nr;
0176             // other side now has closed the connection
0177             stats.state = CS_FINISHED; // state becomes finished
0178             sendPackets();
0179             checkIfClosed();
0180             if (blocking && local_wnd->isReadable() > 0)
0181                 data_ready.wakeAll();
0182         } else {
0183             sendReset();
0184             stats.state = CS_CLOSED;
0185             if (blocking)
0186                 data_ready.wakeAll();
0187         }
0188         break;
0189     case CS_FINISHED:
0190         if (hdr->type == ST_DATA) {
0191             if (SeqNrCmpSE(hdr->seq_nr, stats.eof_seq_nr)) {
0192                 // push data into local window
0193                 if (!local_wnd->packetReceived(hdr, packet, data_off)) {
0194                     // Panick
0195                     sendReset();
0196                     stats.state = CS_CLOSED;
0197                     if (blocking)
0198                         data_ready.wakeAll();
0199                     break;
0200                 }
0201             }
0202 
0203             // send back an ACK
0204             sendStateOrData();
0205             if (stats.state == CS_FINISHED && !fin_sent && output_buffer.size() == 0) {
0206                 sendFIN();
0207                 fin_sent = true;
0208             }
0209             checkIfClosed();
0210             if (blocking && local_wnd->isReadable() > 0)
0211                 data_ready.wakeAll();
0212         } else if (hdr->type == ST_STATE) {
0213             // try to send more data packets
0214             sendPackets();
0215             checkIfClosed();
0216             if (blocking && local_wnd->isReadable() > 0)
0217                 data_ready.wakeAll();
0218         } else if (hdr->type == ST_FIN) {
0219             stats.eof_seq_nr = hdr->seq_nr;
0220             sendPackets();
0221             checkIfClosed();
0222             if (blocking && local_wnd->isReadable() > 0)
0223                 data_ready.wakeAll();
0224         } else {
0225             sendReset();
0226             stats.state = CS_CLOSED;
0227             if (blocking)
0228                 data_ready.wakeAll();
0229         }
0230         break;
0231     case CS_CLOSED:
0232         break;
0233     }
0234 
0235     checkState();
0236     startTimer();
0237     return stats.state;
0238 }
0239 
0240 void Connection::checkState()
0241 {
0242     // Check if we have become readable or writeable, and notify if necessary
0243     bool r = local_wnd->isReadable() > 0 || stats.state == CS_CLOSED;
0244     bool w = remote_wnd->availableSpace() > 0 && stats.state == CS_CONNECTED;
0245     bool r_changed = !stats.readable && r;
0246     bool w_changed = !stats.writeable && w;
0247     stats.readable = r;
0248     stats.writeable = w;
0249 
0250     mutex.unlock();
0251     // Temporary unlock the mutex to avoid a deadlock
0252     if (r_changed || w_changed)
0253         transmitter->stateChanged(self.toStrongRef(), r_changed, w_changed);
0254     mutex.lock();
0255 }
0256 
0257 void Connection::checkIfClosed()
0258 {
0259     // Check if we need to go to the closed state
0260     // We can do this if all our packets have been acked and the local window
0261     // has been fully read
0262     if (stats.state == CS_FINISHED && remote_wnd->allPacketsAcked() && local_wnd->isEmpty()) {
0263         stats.state = CS_CLOSED;
0264         Out(SYS_UTP | LOG_NOTICE) << "UTP: Connection " << stats.recv_connection_id << "|" << stats.send_connection_id << " closed " << endl;
0265         if (blocking)
0266             data_ready.wakeAll();
0267     }
0268 }
0269 
0270 void Connection::updateRTT(const utp::Header *hdr, bt::Uint32 packet_rtt, bt::Uint32 packet_size)
0271 {
0272     Q_UNUSED(hdr);
0273     int delta = stats.rtt - (int)packet_rtt;
0274     stats.rtt_var += (qAbs(delta) - stats.rtt_var) / 4;
0275     stats.rtt += ((int)packet_rtt - stats.rtt) / 8;
0276     stats.timeout = qMax(stats.rtt + stats.rtt_var * 4, 500);
0277     stats.bytes_sent += packet_size;
0278 }
0279 
0280 void Connection::sendPacket(Uint32 type, Uint16 p_ack_nr)
0281 {
0282     bt::Uint32 extension_length = extensionLength();
0283 
0284     PacketBuffer packet;
0285     TimeValue tv;
0286 
0287     Header hdr;
0288     hdr.version = 1;
0289     hdr.type = type;
0290     hdr.extension = extension_length == 0 ? 0 : SELECTIVE_ACK_ID;
0291     hdr.connection_id = type == ST_SYN ? stats.recv_connection_id : stats.send_connection_id;
0292     hdr.timestamp_microseconds = tv.timestampMicroSeconds();
0293     hdr.timestamp_difference_microseconds = stats.reply_micro;
0294     hdr.wnd_size = stats.last_window_size_transmitted = local_wnd->availableSpace();
0295     hdr.seq_nr = stats.seq_nr;
0296     hdr.ack_nr = p_ack_nr;
0297     packet.setHeader(hdr, extension_length);
0298 
0299     if (extension_length > 0) {
0300         bt::Uint8 *ptr = packet.extensionData();
0301         SelectiveAck sack;
0302         sack.extension = ptr[0] = 0;
0303         sack.length = ptr[1] = extension_length - 2;
0304         sack.bitmask = ptr + 2;
0305         local_wnd->fillSelectiveAck(&sack);
0306     }
0307 
0308     if (!transmitter->sendTo(self.toStrongRef(), packet))
0309         throw TransmissionError(__FILE__, __LINE__);
0310 
0311     last_packet_sent = tv;
0312     stats.packets_sent++;
0313     startTimer();
0314 }
0315 
0316 void Connection::sendSYN()
0317 {
0318     stats.seq_nr = 1;
0319     stats.state = CS_SYN_SENT;
0320     stats.timeout = CONNECT_TIMEOUT;
0321     sendPacket(ST_SYN, 0);
0322     stats.seq_nr++;
0323 }
0324 
0325 void Connection::sendState()
0326 {
0327     sendPacket(ST_STATE, local_wnd->lastSeqNr());
0328 }
0329 
0330 void Connection::sendFIN()
0331 {
0332     sendPacket(ST_FIN, local_wnd->lastSeqNr());
0333 }
0334 
0335 void Connection::sendReset()
0336 {
0337     sendPacket(ST_RESET, local_wnd->lastSeqNr());
0338 }
0339 
0340 void Connection::updateDelayMeasurement(const utp::Header *hdr)
0341 {
0342     TimeValue now;
0343     bt::Uint32 tms = now.timestampMicroSeconds();
0344     if (tms > hdr->timestamp_microseconds)
0345         stats.reply_micro = tms - hdr->timestamp_microseconds;
0346     else
0347         stats.reply_micro = hdr->timestamp_difference_microseconds - tms;
0348 
0349     bt::Uint32 base_delay = delay_window->update(hdr, now.toTimeStamp());
0350 
0351     int our_delay = hdr->timestamp_difference_microseconds / 1000 - base_delay;
0352     int off_target = CCONTROL_TARGET - our_delay;
0353     double delay_factor = (double)off_target / (double)CCONTROL_TARGET;
0354     double window_factor = remote_wnd->windowUsageFactor();
0355     double scaled_gain = MAX_CWND_INCREASE_PACKETS_PER_RTT * delay_factor * window_factor;
0356 
0357     remote_wnd->updateWindowSize(scaled_gain);
0358     if (remote_wnd->maxWindow() <= MIN_PACKET_SIZE)
0359         stats.packet_size = MIN_PACKET_SIZE;
0360     else if (remote_wnd->maxWindow() <= 1000)
0361         stats.packet_size = 500;
0362     else if (remote_wnd->maxWindow() <= 5000)
0363         stats.packet_size = 1000;
0364     else
0365         stats.packet_size = 1500 - IP_AND_UDP_OVERHEAD - sizeof(utp::Header);
0366 
0367     /*
0368     Out(SYS_UTP|LOG_DEBUG) << "base_delay " << base_delay << endl;
0369     Out(SYS_UTP|LOG_DEBUG) << "our_delay " << our_delay << endl;
0370     Out(SYS_UTP|LOG_DEBUG) << "off_target " << off_target << endl;
0371     Out(SYS_UTP|LOG_DEBUG) << "delay_factor " << delay_factor << endl;
0372     Out(SYS_UTP|LOG_DEBUG) << "window_factor " << window_factor << endl;
0373     Out(SYS_UTP|LOG_DEBUG) << "scaled_gain " << scaled_gain << endl;
0374     Out(SYS_UTP|LOG_DEBUG) << "packet_size " << stats.packet_size << endl;
0375     */
0376 }
0377 
0378 int Connection::send(const bt::Uint8 *data, Uint32 len)
0379 {
0380     QMutexLocker lock(&mutex);
0381     if (stats.state != CS_CONNECTED)
0382         return -1;
0383 
0384     // first put data in the output buffer then send packets
0385     bt::Uint32 ret = output_buffer.write(data, len);
0386     sendPackets();
0387     stats.writeable = !output_buffer.full();
0388     return ret;
0389 }
0390 
0391 void Connection::sendPackets()
0392 {
0393     // chop output_buffer data in packets and keep sending
0394     // until we are no longer allowed or the buffer is empty
0395     while (output_buffer.size() > 0 && remote_wnd->availableSpace() > 0) {
0396         bt::Uint32 to_read = qMin((bt::Uint32)output_buffer.size(), remote_wnd->availableSpace());
0397         to_read = qMin(to_read, stats.packet_size);
0398         to_read = qMin(to_read, PacketBuffer::MAX_SIZE - extensionLength() - Header::size());
0399         if (to_read == 0)
0400             break;
0401 
0402         PacketBuffer packet;
0403         packet.fillData(output_buffer, to_read);
0404 
0405         TimeValue now;
0406         sendDataPacket(packet, stats.seq_nr, now);
0407 
0408         remote_wnd->addPacket(packet, stats.seq_nr, now.toTimeStamp());
0409         stats.seq_nr++;
0410     }
0411 
0412     if (stats.state == CS_FINISHED && !fin_sent && output_buffer.size() == 0) {
0413         sendFIN();
0414         fin_sent = true;
0415     } else
0416         startTimer();
0417 }
0418 
0419 void Connection::sendStateOrData()
0420 {
0421     if (output_buffer.size() > 0 && remote_wnd->availableSpace() > 0)
0422         sendPackets();
0423     else
0424         sendState();
0425 }
0426 
0427 void Connection::sendDataPacket(PacketBuffer &packet, Uint16 seq_nr, const utp::TimeValue &now)
0428 {
0429     bt::Uint32 extension_length = extensionLength();
0430 
0431     Header hdr;
0432     hdr.version = 1;
0433     hdr.type = ST_DATA;
0434     hdr.extension = extension_length == 0 ? 0 : SELECTIVE_ACK_ID;
0435     hdr.connection_id = stats.send_connection_id;
0436     hdr.timestamp_microseconds = now.timestampMicroSeconds();
0437     hdr.timestamp_difference_microseconds = stats.reply_micro;
0438     hdr.wnd_size = stats.last_window_size_transmitted = local_wnd->availableSpace();
0439     hdr.seq_nr = seq_nr;
0440     hdr.ack_nr = local_wnd->lastSeqNr();
0441     if (!packet.setHeader(hdr, extension_length)) {
0442         // Not enough head room
0443         throw TransmissionError(__FILE__, __LINE__);
0444     }
0445 
0446     if (extension_length > 0) {
0447         bt::Uint8 *ptr = packet.extensionData();
0448         SelectiveAck sack;
0449         sack.extension = ptr[0] = 0;
0450         sack.length = ptr[1] = extension_length - 2;
0451         sack.bitmask = ptr + 2;
0452         local_wnd->fillSelectiveAck(&sack);
0453     }
0454 
0455     if (!transmitter->sendTo(self.toStrongRef(), packet))
0456         throw TransmissionError(__FILE__, __LINE__);
0457 
0458     last_packet_sent = now;
0459     stats.packets_sent++;
0460 }
0461 
0462 void Connection::retransmit(PacketBuffer &packet, Uint16 p_seq_nr)
0463 {
0464     TimeValue now;
0465     sendDataPacket(packet, p_seq_nr, now);
0466     startTimer();
0467 }
0468 
0469 bt::Uint32 Connection::bytesAvailable() const
0470 {
0471     QMutexLocker lock(&mutex);
0472     return local_wnd->bytesAvailable();
0473 }
0474 
0475 bool Connection::isWriteable() const
0476 {
0477     QMutexLocker lock(&mutex);
0478     return remote_wnd->availableSpace() > 0 && stats.state == CS_CONNECTED;
0479 }
0480 
0481 int Connection::recv(Uint8 *buf, Uint32 max_len)
0482 {
0483     QMutexLocker lock(&mutex);
0484     if (stats.state == CS_FINISHED)
0485         checkIfClosed();
0486 
0487     if (!local_wnd->bytesAvailable() && stats.state == CS_CLOSED)
0488         return -1;
0489 
0490     bt::Uint32 ret = local_wnd->read(buf, max_len);
0491     // Update the window if there is room again
0492     if (stats.last_window_size_transmitted < 2000 && local_wnd->availableSpace() > 2000)
0493         sendState();
0494 
0495     stats.bytes_received += ret;
0496     stats.readable = local_wnd->isReadable();
0497     return ret;
0498 }
0499 
0500 bool Connection::waitUntilConnected()
0501 {
0502     QMutexLocker lock(&mutex);
0503     if (stats.state == CS_CONNECTED)
0504         return true;
0505 
0506     connected.wait(&mutex);
0507     return stats.state == CS_CONNECTED;
0508 }
0509 
0510 bool Connection::waitForData(Uint32 timeout)
0511 {
0512     QMutexLocker lock(&mutex);
0513     if (local_wnd->isReadable())
0514         return true;
0515 
0516     data_ready.wait(&mutex, timeout == 0 ? ULONG_MAX : timeout);
0517     return local_wnd->isReadable();
0518 }
0519 
0520 void Connection::close()
0521 {
0522     QMutexLocker lock(&mutex);
0523     if (stats.state == CS_CONNECTED) {
0524         stats.state = CS_FINISHED;
0525         sendPackets();
0526     }
0527 }
0528 
0529 void Connection::reset()
0530 {
0531     QMutexLocker lock(&mutex);
0532     if (stats.state != CS_CLOSED) {
0533         sendReset();
0534         stats.state = CS_CLOSED;
0535         remote_wnd->clear();
0536         if (blocking)
0537             data_ready.wakeAll();
0538     }
0539 }
0540 
0541 void Connection::checkTimeout(const TimeValue &now)
0542 {
0543     QMutexLocker lock(&mutex);
0544     if (now >= stats.absolute_timeout)
0545         handleTimeout();
0546 }
0547 
0548 void Connection::handleTimeout()
0549 {
0550     switch (stats.state) {
0551     case CS_SYN_SENT:
0552         // No answer to SYN, so just close the connection
0553         stats.state = CS_CLOSED;
0554         if (blocking)
0555             connected.wakeAll();
0556         break;
0557     case CS_FINISHED:
0558         stats.state = CS_CLOSED;
0559         if (blocking)
0560             data_ready.wakeAll();
0561         break;
0562     case CS_CONNECTED:
0563         remote_wnd->timeout(this);
0564         stats.packet_size = MIN_PACKET_SIZE;
0565         stats.timeout *= 2;
0566 
0567         if (stats.timeout >= MAX_TIMEOUT) {
0568             // If we have reached the max timeout, kill the connection
0569             Out(SYS_UTP | LOG_DEBUG) << "Connection " << stats.recv_connection_id << "|" << stats.send_connection_id << " max timeout reached, closing" << endl;
0570             stats.state = CS_FINISHED;
0571             sendReset();
0572         } else {
0573             sendPackets();
0574             if (TimeValue() - last_packet_sent > KEEP_ALIVE_TIMEOUT) {
0575                 // Keep the connection alive
0576                 sendState();
0577             }
0578         }
0579         break;
0580     case CS_IDLE:
0581         startTimer();
0582         break;
0583     case CS_CLOSED:
0584         break;
0585     }
0586 
0587     checkState();
0588     if (stats.state == CS_CLOSED)
0589         transmitter->closed(self.toStrongRef());
0590 }
0591 
0592 void Connection::dumpStats()
0593 {
0594     Out(SYS_UTP | LOG_DEBUG) << "Connection " << stats.recv_connection_id << "|" << stats.send_connection_id << " stats:" << endl;
0595     Out(SYS_UTP | LOG_DEBUG) << "bytes_received   = " << stats.bytes_received << endl;
0596     Out(SYS_UTP | LOG_DEBUG) << "bytes_sent       = " << stats.bytes_sent << endl;
0597     Out(SYS_UTP | LOG_DEBUG) << "packets_received = " << stats.packets_received << endl;
0598     Out(SYS_UTP | LOG_DEBUG) << "packets_sent     = " << stats.packets_sent << endl;
0599     Out(SYS_UTP | LOG_DEBUG) << "bytes_lost       = " << stats.bytes_lost << endl;
0600     Out(SYS_UTP | LOG_DEBUG) << "packets_lost     = " << stats.packets_lost << endl;
0601     Out(SYS_UTP | LOG_DEBUG) << "local_window     = " << local_wnd->bytesAvailable() << endl;
0602 }
0603 
0604 bool Connection::allDataSent() const
0605 {
0606     QMutexLocker lock(&mutex);
0607     return remote_wnd->allPacketsAcked() && output_buffer.size() == 0;
0608 }
0609 
0610 void Connection::startTimer()
0611 {
0612     stats.absolute_timeout = TimeValue();
0613     stats.absolute_timeout.addMilliSeconds(stats.timeout);
0614 }
0615 
0616 bt::Uint32 Connection::extensionLength() const
0617 {
0618     bt::Uint32 sack_bits = local_wnd->selectiveAckBits();
0619     if (sack_bits > 0)
0620         return 2 + qMin(sack_bits / 8, (bt::Uint32)4);
0621     else
0622         return 0;
0623 }
0624 
0625 ///////////////////////////////////////////////////////
0626 
0627 Transmitter::~Transmitter()
0628 {
0629 }
0630 }
0631 
0632 #include "moc_connection.cpp"