File indexing completed on 2024-05-12 04:58:51
0001 /** 0002 * SPDX-FileCopyrightText: 2018 Erik Duisters <e.duisters1@gmail.com> 0003 * 0004 * SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL 0005 */ 0006 0007 #include "compositeuploadjob.h" 0008 #include "lanlinkprovider.h" 0009 #include "plugins/share/shareplugin.h" 0010 #include <KJobTrackerInterface> 0011 #include <KLocalizedString> 0012 #include <core_debug.h> 0013 #include <daemon.h> 0014 0015 CompositeUploadJob::CompositeUploadJob(const QString &deviceId, bool displayNotification) 0016 : KCompositeJob() 0017 , m_server(new Server(this)) 0018 , m_socket(nullptr) 0019 , m_port(0) 0020 , m_deviceId(deviceId) 0021 , m_running(false) 0022 , m_currentJobNum(1) 0023 , m_totalJobs(0) 0024 , m_currentJobSendPayloadSize(0) 0025 , m_totalSendPayloadSize(0) 0026 , m_totalPayloadSize(0) 0027 , m_currentJob(nullptr) 0028 , m_prevElapsedTime(0) 0029 , m_updatePacketPending(false) 0030 { 0031 setCapabilities(Killable); 0032 0033 if (displayNotification) { 0034 Daemon::instance()->jobTracker()->registerJob(this); 0035 } 0036 } 0037 0038 bool CompositeUploadJob::isRunning() 0039 { 0040 return m_running; 0041 } 0042 0043 void CompositeUploadJob::start() 0044 { 0045 if (m_running) { 0046 qCWarning(KDECONNECT_CORE) << "CompositeUploadJob::start() - already running"; 0047 return; 0048 } 0049 0050 if (!hasSubjobs()) { 0051 qCWarning(KDECONNECT_CORE) << "CompositeUploadJob::start() - there are no subjobs to start"; 0052 emitResult(); 0053 return; 0054 } 0055 0056 if (!startListening()) { 0057 return; 0058 } 0059 0060 connect(m_server, &QTcpServer::newConnection, this, &CompositeUploadJob::newConnection); 0061 0062 m_running = true; 0063 0064 // Give SharePlugin some time to add subjobs 0065 QMetaObject::invokeMethod(this, "startNextSubJob", Qt::QueuedConnection); 0066 } 0067 0068 bool CompositeUploadJob::startListening() 0069 { 0070 m_port = MIN_PORT; 0071 while (!m_server->listen(QHostAddress::Any, m_port)) { 0072 m_port++; 0073 if (m_port > MAX_PORT) { // No ports available? 0074 qCWarning(KDECONNECT_CORE) << "CompositeUploadJob::startListening() - Error opening a port in range" << MIN_PORT << "-" << MAX_PORT; 0075 m_port = 0; 0076 setError(NoPortAvailable); 0077 setErrorText(i18n("Couldn't find an available port")); 0078 emitResult(); 0079 return false; 0080 } 0081 } 0082 0083 qCDebug(KDECONNECT_CORE) << "CompositeUploadJob::startListening() - listening on port: " << m_port; 0084 return true; 0085 } 0086 0087 void CompositeUploadJob::startNextSubJob() 0088 { 0089 m_currentJob = qobject_cast<UploadJob *>(subjobs().at(0)); 0090 m_currentJobSendPayloadSize = 0; 0091 emitDescription(m_currentJob->getNetworkPacket().get<QString>(QStringLiteral("filename"))); 0092 0093 connect(m_currentJob, &UploadJob::processedAmountChanged, this, &CompositeUploadJob::slotProcessedAmount); 0094 0095 // Already done by KCompositeJob 0096 // connect(m_currentJob, &KJob::result, this, &CompositeUploadJob::slotResult); 0097 0098 // TODO: Create a copy of the networkpacket that can be re-injected if sending via lan fails? 0099 NetworkPacket np = m_currentJob->getNetworkPacket(); 0100 np.setPayload(nullptr, np.payloadSize()); 0101 np.setPayloadTransferInfo({{QStringLiteral("port"), m_port}}); 0102 np.set<int>(QStringLiteral("numberOfFiles"), m_totalJobs); 0103 np.set<quint64>(QStringLiteral("totalPayloadSize"), m_totalPayloadSize); 0104 0105 Device *device = Daemon::instance()->getDevice(m_deviceId); 0106 if (device == nullptr) { 0107 qCWarning(KDECONNECT_CORE) << "Device disconnected" << this->m_deviceId; 0108 return; 0109 } 0110 0111 if (device->sendPacket(np)) { 0112 m_server->resumeAccepting(); 0113 } else { 0114 setError(SendingNetworkPacketFailed); 0115 setErrorText(i18n("Failed to send packet to %1", device->name())); 0116 0117 emitResult(); 0118 } 0119 } 0120 0121 void CompositeUploadJob::newConnection() 0122 { 0123 m_server->pauseAccepting(); 0124 0125 m_socket = m_server->nextPendingConnection(); 0126 0127 if (!m_socket) { 0128 qCDebug(KDECONNECT_CORE) << "CompositeUploadJob::newConnection() - m_server->nextPendingConnection() returned a nullptr"; 0129 return; 0130 } 0131 0132 m_currentJob->setSocket(m_socket); 0133 0134 connect(m_socket, &QSslSocket::disconnected, this, [this]() { 0135 m_socket->close(); 0136 }); 0137 connect(m_socket, &QAbstractSocket::errorOccurred, this, [this](QAbstractSocket::SocketError error) { 0138 qCDebug(KDECONNECT_CORE) << "Error in socket occurred" << error; 0139 // Do not close the socket because when android closes the socket (share is cancelled) closing the socket leads to a cyclic socketError and eventually a 0140 // segv 0141 setError(SocketError); 0142 emitResult(); 0143 0144 m_running = false; 0145 }); 0146 connect(m_socket, &QSslSocket::sslErrors, this, [this](const QList<QSslError> &errors) { 0147 qCDebug(KDECONNECT_CORE) << "Received ssl errors" << errors; 0148 m_socket->close(); 0149 setError(SslError); 0150 emitResult(); 0151 0152 m_running = false; 0153 }); 0154 connect(m_socket, &QSslSocket::encrypted, this, [this]() { 0155 if (!m_timer.isValid()) { 0156 m_timer.start(); 0157 } 0158 0159 m_currentJob->start(); 0160 }); 0161 0162 LanLinkProvider::configureSslSocket(m_socket, m_deviceId, true); 0163 0164 m_socket->startServerEncryption(); 0165 } 0166 0167 bool CompositeUploadJob::addSubjob(KJob *job) 0168 { 0169 if (UploadJob *uploadJob = qobject_cast<UploadJob *>(job)) { 0170 NetworkPacket np = uploadJob->getNetworkPacket(); 0171 0172 m_totalJobs++; 0173 0174 if (np.payloadSize() >= 0) { 0175 m_totalPayloadSize += np.payloadSize(); 0176 setTotalAmount(Bytes, m_totalPayloadSize); 0177 } 0178 0179 QString filename; 0180 QString filenameArg = QStringLiteral("filename"); 0181 0182 if (m_currentJob) { 0183 filename = m_currentJob->getNetworkPacket().get<QString>(filenameArg); 0184 } else { 0185 filename = np.get<QString>(filenameArg); 0186 } 0187 0188 emitDescription(filename); 0189 0190 if (m_running && m_currentJob && !m_updatePacketPending) { 0191 m_updatePacketPending = true; 0192 QMetaObject::invokeMethod(this, "sendUpdatePacket", Qt::QueuedConnection); 0193 } 0194 0195 return KCompositeJob::addSubjob(job); 0196 } else { 0197 qCDebug(KDECONNECT_CORE) << "CompositeUploadJob::addSubjob() - you can only add UploadJob's, ignoring"; 0198 return false; 0199 } 0200 } 0201 0202 void CompositeUploadJob::sendUpdatePacket() 0203 { 0204 m_updatePacketPending = false; 0205 0206 NetworkPacket np(PACKET_TYPE_SHARE_REQUEST_UPDATE); 0207 np.set<int>(QStringLiteral("numberOfFiles"), m_totalJobs); 0208 np.set<quint64>(QStringLiteral("totalPayloadSize"), m_totalPayloadSize); 0209 0210 Device *device = Daemon::instance()->getDevice(m_deviceId); 0211 if (device == nullptr) { 0212 qCWarning(KDECONNECT_CORE) << "Device disconnected" << this->m_deviceId; 0213 return; 0214 } 0215 device->sendPacket(np); 0216 } 0217 0218 bool CompositeUploadJob::doKill() 0219 { 0220 if (m_running) { 0221 m_running = false; 0222 0223 return m_currentJob->stop(); 0224 } 0225 0226 return true; 0227 } 0228 0229 void CompositeUploadJob::slotProcessedAmount(KJob * /*job*/, KJob::Unit unit, qulonglong amount) 0230 { 0231 m_currentJobSendPayloadSize = amount; 0232 quint64 uploaded = m_totalSendPayloadSize + m_currentJobSendPayloadSize; 0233 0234 if (uploaded == m_totalPayloadSize || m_prevElapsedTime == 0 || m_timer.elapsed() - m_prevElapsedTime >= 100) { 0235 m_prevElapsedTime = m_timer.elapsed(); 0236 setProcessedAmount(unit, uploaded); 0237 0238 const auto elapsed = m_timer.elapsed(); 0239 if (elapsed > 0) { 0240 emitSpeed((1000 * uploaded) / elapsed); 0241 } 0242 } 0243 } 0244 0245 void CompositeUploadJob::slotResult(KJob *job) 0246 { 0247 // Copies job error and errorText and emits result if job is in error otherwise removes job from subjob list 0248 KCompositeJob::slotResult(job); 0249 0250 if (error() || !m_running) { 0251 return; 0252 } 0253 0254 m_totalSendPayloadSize += m_currentJobSendPayloadSize; 0255 0256 if (hasSubjobs()) { 0257 m_currentJobNum++; 0258 startNextSubJob(); 0259 } else { 0260 emitResult(); 0261 } 0262 } 0263 0264 void CompositeUploadJob::emitDescription(const QString ¤tFileName) 0265 { 0266 Device *device = Daemon::instance()->getDevice(this->m_deviceId); 0267 if (device == nullptr) { 0268 qWarning() << "Device disconnected" << this->m_deviceId; 0269 return; 0270 } 0271 Q_EMIT description(this, i18n("Sending to %1", device->name()), {i18n("File"), currentFileName}, {}); 0272 0273 setProcessedAmount(Files, m_currentJobNum); 0274 setTotalAmount(Files, m_totalJobs); 0275 } 0276 0277 #include "moc_compositeuploadjob.cpp"