File indexing completed on 2024-05-12 04:58:51

0001 /**
0002  * SPDX-FileCopyrightText: 2018 Erik Duisters <e.duisters1@gmail.com>
0003  *
0004  * SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
0005  */
0006 
0007 #include "compositeuploadjob.h"
0008 #include "lanlinkprovider.h"
0009 #include "plugins/share/shareplugin.h"
0010 #include <KJobTrackerInterface>
0011 #include <KLocalizedString>
0012 #include <core_debug.h>
0013 #include <daemon.h>
0014 
0015 CompositeUploadJob::CompositeUploadJob(const QString &deviceId, bool displayNotification)
0016     : KCompositeJob()
0017     , m_server(new Server(this))
0018     , m_socket(nullptr)
0019     , m_port(0)
0020     , m_deviceId(deviceId)
0021     , m_running(false)
0022     , m_currentJobNum(1)
0023     , m_totalJobs(0)
0024     , m_currentJobSendPayloadSize(0)
0025     , m_totalSendPayloadSize(0)
0026     , m_totalPayloadSize(0)
0027     , m_currentJob(nullptr)
0028     , m_prevElapsedTime(0)
0029     , m_updatePacketPending(false)
0030 {
0031     setCapabilities(Killable);
0032 
0033     if (displayNotification) {
0034         Daemon::instance()->jobTracker()->registerJob(this);
0035     }
0036 }
0037 
0038 bool CompositeUploadJob::isRunning()
0039 {
0040     return m_running;
0041 }
0042 
0043 void CompositeUploadJob::start()
0044 {
0045     if (m_running) {
0046         qCWarning(KDECONNECT_CORE) << "CompositeUploadJob::start() - already running";
0047         return;
0048     }
0049 
0050     if (!hasSubjobs()) {
0051         qCWarning(KDECONNECT_CORE) << "CompositeUploadJob::start() - there are no subjobs to start";
0052         emitResult();
0053         return;
0054     }
0055 
0056     if (!startListening()) {
0057         return;
0058     }
0059 
0060     connect(m_server, &QTcpServer::newConnection, this, &CompositeUploadJob::newConnection);
0061 
0062     m_running = true;
0063 
0064     // Give SharePlugin some time to add subjobs
0065     QMetaObject::invokeMethod(this, "startNextSubJob", Qt::QueuedConnection);
0066 }
0067 
0068 bool CompositeUploadJob::startListening()
0069 {
0070     m_port = MIN_PORT;
0071     while (!m_server->listen(QHostAddress::Any, m_port)) {
0072         m_port++;
0073         if (m_port > MAX_PORT) { // No ports available?
0074             qCWarning(KDECONNECT_CORE) << "CompositeUploadJob::startListening() - Error opening a port in range" << MIN_PORT << "-" << MAX_PORT;
0075             m_port = 0;
0076             setError(NoPortAvailable);
0077             setErrorText(i18n("Couldn't find an available port"));
0078             emitResult();
0079             return false;
0080         }
0081     }
0082 
0083     qCDebug(KDECONNECT_CORE) << "CompositeUploadJob::startListening() - listening on port: " << m_port;
0084     return true;
0085 }
0086 
0087 void CompositeUploadJob::startNextSubJob()
0088 {
0089     m_currentJob = qobject_cast<UploadJob *>(subjobs().at(0));
0090     m_currentJobSendPayloadSize = 0;
0091     emitDescription(m_currentJob->getNetworkPacket().get<QString>(QStringLiteral("filename")));
0092 
0093     connect(m_currentJob, &UploadJob::processedAmountChanged, this, &CompositeUploadJob::slotProcessedAmount);
0094 
0095     // Already done by KCompositeJob
0096     // connect(m_currentJob, &KJob::result, this, &CompositeUploadJob::slotResult);
0097 
0098     // TODO: Create a copy of the networkpacket that can be re-injected if sending via lan fails?
0099     NetworkPacket np = m_currentJob->getNetworkPacket();
0100     np.setPayload(nullptr, np.payloadSize());
0101     np.setPayloadTransferInfo({{QStringLiteral("port"), m_port}});
0102     np.set<int>(QStringLiteral("numberOfFiles"), m_totalJobs);
0103     np.set<quint64>(QStringLiteral("totalPayloadSize"), m_totalPayloadSize);
0104 
0105     Device *device = Daemon::instance()->getDevice(m_deviceId);
0106     if (device == nullptr) {
0107         qCWarning(KDECONNECT_CORE) << "Device disconnected" << this->m_deviceId;
0108         return;
0109     }
0110 
0111     if (device->sendPacket(np)) {
0112         m_server->resumeAccepting();
0113     } else {
0114         setError(SendingNetworkPacketFailed);
0115         setErrorText(i18n("Failed to send packet to %1", device->name()));
0116 
0117         emitResult();
0118     }
0119 }
0120 
0121 void CompositeUploadJob::newConnection()
0122 {
0123     m_server->pauseAccepting();
0124 
0125     m_socket = m_server->nextPendingConnection();
0126 
0127     if (!m_socket) {
0128         qCDebug(KDECONNECT_CORE) << "CompositeUploadJob::newConnection() - m_server->nextPendingConnection() returned a nullptr";
0129         return;
0130     }
0131 
0132     m_currentJob->setSocket(m_socket);
0133 
0134     connect(m_socket, &QSslSocket::disconnected, this, [this]() {
0135         m_socket->close();
0136     });
0137     connect(m_socket, &QAbstractSocket::errorOccurred, this, [this](QAbstractSocket::SocketError error) {
0138         qCDebug(KDECONNECT_CORE) << "Error in socket occurred" << error;
0139         // Do not close the socket because when android closes the socket (share is cancelled) closing the socket leads to a cyclic socketError and eventually a
0140         // segv
0141         setError(SocketError);
0142         emitResult();
0143 
0144         m_running = false;
0145     });
0146     connect(m_socket, &QSslSocket::sslErrors, this, [this](const QList<QSslError> &errors) {
0147         qCDebug(KDECONNECT_CORE) << "Received ssl errors" << errors;
0148         m_socket->close();
0149         setError(SslError);
0150         emitResult();
0151 
0152         m_running = false;
0153     });
0154     connect(m_socket, &QSslSocket::encrypted, this, [this]() {
0155         if (!m_timer.isValid()) {
0156             m_timer.start();
0157         }
0158 
0159         m_currentJob->start();
0160     });
0161 
0162     LanLinkProvider::configureSslSocket(m_socket, m_deviceId, true);
0163 
0164     m_socket->startServerEncryption();
0165 }
0166 
0167 bool CompositeUploadJob::addSubjob(KJob *job)
0168 {
0169     if (UploadJob *uploadJob = qobject_cast<UploadJob *>(job)) {
0170         NetworkPacket np = uploadJob->getNetworkPacket();
0171 
0172         m_totalJobs++;
0173 
0174         if (np.payloadSize() >= 0) {
0175             m_totalPayloadSize += np.payloadSize();
0176             setTotalAmount(Bytes, m_totalPayloadSize);
0177         }
0178 
0179         QString filename;
0180         QString filenameArg = QStringLiteral("filename");
0181 
0182         if (m_currentJob) {
0183             filename = m_currentJob->getNetworkPacket().get<QString>(filenameArg);
0184         } else {
0185             filename = np.get<QString>(filenameArg);
0186         }
0187 
0188         emitDescription(filename);
0189 
0190         if (m_running && m_currentJob && !m_updatePacketPending) {
0191             m_updatePacketPending = true;
0192             QMetaObject::invokeMethod(this, "sendUpdatePacket", Qt::QueuedConnection);
0193         }
0194 
0195         return KCompositeJob::addSubjob(job);
0196     } else {
0197         qCDebug(KDECONNECT_CORE) << "CompositeUploadJob::addSubjob() - you can only add UploadJob's, ignoring";
0198         return false;
0199     }
0200 }
0201 
0202 void CompositeUploadJob::sendUpdatePacket()
0203 {
0204     m_updatePacketPending = false;
0205 
0206     NetworkPacket np(PACKET_TYPE_SHARE_REQUEST_UPDATE);
0207     np.set<int>(QStringLiteral("numberOfFiles"), m_totalJobs);
0208     np.set<quint64>(QStringLiteral("totalPayloadSize"), m_totalPayloadSize);
0209 
0210     Device *device = Daemon::instance()->getDevice(m_deviceId);
0211     if (device == nullptr) {
0212         qCWarning(KDECONNECT_CORE) << "Device disconnected" << this->m_deviceId;
0213         return;
0214     }
0215     device->sendPacket(np);
0216 }
0217 
0218 bool CompositeUploadJob::doKill()
0219 {
0220     if (m_running) {
0221         m_running = false;
0222 
0223         return m_currentJob->stop();
0224     }
0225 
0226     return true;
0227 }
0228 
0229 void CompositeUploadJob::slotProcessedAmount(KJob * /*job*/, KJob::Unit unit, qulonglong amount)
0230 {
0231     m_currentJobSendPayloadSize = amount;
0232     quint64 uploaded = m_totalSendPayloadSize + m_currentJobSendPayloadSize;
0233 
0234     if (uploaded == m_totalPayloadSize || m_prevElapsedTime == 0 || m_timer.elapsed() - m_prevElapsedTime >= 100) {
0235         m_prevElapsedTime = m_timer.elapsed();
0236         setProcessedAmount(unit, uploaded);
0237 
0238         const auto elapsed = m_timer.elapsed();
0239         if (elapsed > 0) {
0240             emitSpeed((1000 * uploaded) / elapsed);
0241         }
0242     }
0243 }
0244 
0245 void CompositeUploadJob::slotResult(KJob *job)
0246 {
0247     // Copies job error and errorText and emits result if job is in error otherwise removes job from subjob list
0248     KCompositeJob::slotResult(job);
0249 
0250     if (error() || !m_running) {
0251         return;
0252     }
0253 
0254     m_totalSendPayloadSize += m_currentJobSendPayloadSize;
0255 
0256     if (hasSubjobs()) {
0257         m_currentJobNum++;
0258         startNextSubJob();
0259     } else {
0260         emitResult();
0261     }
0262 }
0263 
0264 void CompositeUploadJob::emitDescription(const QString &currentFileName)
0265 {
0266     Device *device = Daemon::instance()->getDevice(this->m_deviceId);
0267     if (device == nullptr) {
0268         qWarning() << "Device disconnected" << this->m_deviceId;
0269         return;
0270     }
0271     Q_EMIT description(this, i18n("Sending to %1", device->name()), {i18n("File"), currentFileName}, {});
0272 
0273     setProcessedAmount(Files, m_currentJobNum);
0274     setTotalAmount(Files, m_totalJobs);
0275 }
0276 
0277 #include "moc_compositeuploadjob.cpp"