File indexing completed on 2024-04-21 04:51:39

0001 /*
0002 SPDX-FileCopyrightText: 2021 Jean-Baptiste Mardelle <jb@kdenlive.org>
0003 This file is part of Kdenlive. See www.kdenlive.org.
0004 
0005 SPDX-License-Identifier: GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
0006 */
0007 
0008 #include "taskmanager.h"
0009 #include "bin/abstractprojectitem.h"
0010 #include "bin/projectclip.h"
0011 #include "bin/projectitemmodel.h"
0012 #include "core.h"
0013 #include "kdenlivesettings.h"
0014 #include "macros.hpp"
0015 #include "undohelper.hpp"
0016 
0017 #include <KMessageWidget>
0018 #include <QFuture>
0019 #include <QThread>
0020 
0021 TaskManager::TaskManager(QObject *parent)
0022     : QObject(parent)
0023     , displayedClip(-1)
0024     , m_tasksListLock(QReadWriteLock::Recursive)
0025     , m_blockUpdates(false)
0026 {
0027     int maxThreads = qMin(4, QThread::idealThreadCount() - 1);
0028     m_taskPool.setMaxThreadCount(qMax(maxThreads, 1));
0029     m_transcodePool.setMaxThreadCount(KdenliveSettings::proxythreads());
0030 }
0031 
0032 TaskManager::~TaskManager()
0033 {
0034     slotCancelJobs();
0035 }
0036 
0037 bool TaskManager::isBlocked() const
0038 {
0039     return m_blockUpdates;
0040 }
0041 
0042 void TaskManager::updateConcurrency()
0043 {
0044     m_transcodePool.setMaxThreadCount(KdenliveSettings::proxythreads());
0045 }
0046 
0047 void TaskManager::discardJobs(const ObjectId &owner, AbstractTask::JOBTYPE type, bool softDelete, const QVector<AbstractTask::JOBTYPE> exceptions)
0048 {
0049     qDebug() << "========== READY FOR TASK DISCARD ON: " << owner.itemId;
0050     if (m_blockUpdates) {
0051         // We are already deleting all tasks
0052         return;
0053     }
0054     m_tasksListLock.lockForRead();
0055     // See if there is already a task for this MLT service and resource.
0056     if (m_taskList.find(owner.itemId) == m_taskList.end()) {
0057         m_tasksListLock.unlock();
0058         return;
0059     }
0060     std::vector<AbstractTask *> taskList = m_taskList.at(owner.itemId);
0061     m_tasksListLock.unlock();
0062     for (AbstractTask *t : taskList) {
0063         if ((type == AbstractTask::NOJOBTYPE || type == t->m_type) && t->m_progress < 100) {
0064             // If so, then just add ourselves to be notified upon completion.
0065             if (exceptions.contains(t->m_type) || t->isCanceled()) {
0066                 // Don't abort
0067                 continue;
0068             }
0069             t->cancelJob(softDelete);
0070             // Block until the task is finished
0071             t->m_runMutex.lock();
0072             t->m_runMutex.unlock();
0073             t->deleteLater();
0074         }
0075     }
0076 }
0077 
0078 void TaskManager::discardJob(const ObjectId &owner, const QUuid &uuid)
0079 {
0080     if (m_blockUpdates) {
0081         // We are already deleting all tasks
0082         return;
0083     }
0084     m_tasksListLock.lockForRead();
0085     // See if there is already a task for this MLT service and resource.
0086     if (m_taskList.find(owner.itemId) == m_taskList.end()) {
0087         m_tasksListLock.unlock();
0088         return;
0089     }
0090     std::vector<AbstractTask *> taskList = m_taskList.at(owner.itemId);
0091     m_tasksListLock.unlock();
0092     for (AbstractTask *t : taskList) {
0093         if ((t->m_uuid == uuid) && t->m_progress < 100 && !t->isCanceled()) {
0094             t->cancelJob();
0095             // Block until the task is finished
0096             t->m_runMutex.lock();
0097             t->m_runMutex.unlock();
0098             t->deleteLater();
0099         }
0100     }
0101 }
0102 
0103 bool TaskManager::hasPendingJob(const ObjectId &owner, AbstractTask::JOBTYPE type) const
0104 {
0105     QReadLocker lk(&m_tasksListLock);
0106     if (type == AbstractTask::NOJOBTYPE) {
0107         // Check for any kind of job for this clip
0108         return m_taskList.find(owner.itemId) != m_taskList.end();
0109     }
0110     if (m_taskList.find(owner.itemId) == m_taskList.end()) {
0111         return false;
0112     }
0113     std::vector<AbstractTask *> taskList = m_taskList.at(owner.itemId);
0114     for (AbstractTask *t : taskList) {
0115         if (type == t->m_type && t->m_progress < 100 && !t->m_isCanceled) {
0116             return true;
0117         }
0118     }
0119     return false;
0120 }
0121 
0122 TaskManagerStatus TaskManager::jobStatus(const ObjectId &owner) const
0123 {
0124     QReadLocker lk(&m_tasksListLock);
0125     if (m_taskList.find(owner.itemId) == m_taskList.end()) {
0126         // No job for this clip
0127         return TaskManagerStatus::NoJob;
0128     }
0129     std::vector<AbstractTask *> taskList = m_taskList.at(owner.itemId);
0130     for (AbstractTask *t : taskList) {
0131         if (t->m_running) {
0132             return TaskManagerStatus::Running;
0133         }
0134     }
0135     return TaskManagerStatus::Pending;
0136 }
0137 
0138 void TaskManager::updateJobCount()
0139 {
0140     QReadLocker lk(&m_tasksListLock);
0141     int count = 0;
0142     for (const auto &task : m_taskList) {
0143         count += task.second.size();
0144     }
0145     // Set jobs count
0146     Q_EMIT jobCount(count);
0147 }
0148 
0149 void TaskManager::taskDone(int cid, AbstractTask *task)
0150 {
0151     // This will be executed in the QRunnable job thread
0152     if (m_blockUpdates) {
0153         // We are closing, tasks will be handled on close
0154         return;
0155     }
0156     m_tasksListLock.lockForWrite();
0157     Q_ASSERT(m_taskList.find(cid) != m_taskList.end());
0158     m_taskList[cid].erase(std::remove(m_taskList[cid].begin(), m_taskList[cid].end(), task), m_taskList[cid].end());
0159     if (m_taskList[cid].size() == 0) {
0160         m_taskList.erase(cid);
0161     }
0162     task->deleteLater();
0163     m_tasksListLock.unlock();
0164     QMetaObject::invokeMethod(this, "updateJobCount");
0165 }
0166 
0167 void TaskManager::slotCancelJobs(bool leaveBlocked, const QVector<AbstractTask::JOBTYPE> exceptions)
0168 {
0169     if (m_blockUpdates) {
0170         // Already canceling
0171         return;
0172     }
0173     m_tasksListLock.lockForWrite();
0174     m_blockUpdates = true;
0175     for (const auto &task : m_taskList) {
0176         for (AbstractTask *t : task.second) {
0177             if (m_taskList.find(task.first) != m_taskList.end()) {
0178                 if (!exceptions.contains(t->m_type) && !t->isCanceled()) {
0179                     // If so, then just add ourselves to be notified upon completion.
0180                     t->cancelJob();
0181                     t->m_runMutex.lock();
0182                     t->m_runMutex.unlock();
0183                     t->deleteLater();
0184                 }
0185             }
0186         }
0187     }
0188     if (exceptions.isEmpty()) {
0189         m_taskPool.waitForDone();
0190         m_transcodePool.waitForDone();
0191         m_taskList.clear();
0192         m_taskPool.clear();
0193     }
0194     if (!leaveBlocked) {
0195         m_blockUpdates = false;
0196     }
0197     m_tasksListLock.unlock();
0198     updateJobCount();
0199 }
0200 
0201 void TaskManager::unBlock()
0202 {
0203     m_blockUpdates = false;
0204 }
0205 
0206 void TaskManager::startTask(int ownerId, AbstractTask *task)
0207 {
0208     if (m_blockUpdates) {
0209         // We are closing, tasks will be handled on close
0210         delete task;
0211         return;
0212     }
0213     m_tasksListLock.lockForWrite();
0214     if (m_taskList.find(ownerId) == m_taskList.end()) {
0215         // First task for this clip
0216         m_taskList[ownerId] = {task};
0217     } else {
0218         m_taskList[ownerId].emplace_back(task);
0219     }
0220     if (task->m_type == AbstractTask::TRANSCODEJOB || task->m_type == AbstractTask::PROXYJOB) {
0221         // We only want a limited concurrent jobs for those as for example GPU usually only accept 2 concurrent encoding jobs
0222         m_transcodePool.start(task, task->m_priority);
0223     } else {
0224         m_taskPool.start(task, task->m_priority);
0225     }
0226     m_tasksListLock.unlock();
0227     updateJobCount();
0228 }
0229 
0230 int TaskManager::getJobProgressForClip(const ObjectId &owner)
0231 {
0232     QReadLocker lk(&m_tasksListLock);
0233     QStringList jobNames;
0234     QList<int> jobsProgress;
0235     QStringList jobsUuids;
0236     if (m_taskList.find(owner.itemId) == m_taskList.end()) {
0237         if (owner.itemId == displayedClip) {
0238             Q_EMIT detailedProgress(owner, jobNames, jobsProgress, jobsUuids);
0239         }
0240         return 100;
0241     }
0242     std::vector<AbstractTask *> taskList = m_taskList.at(owner.itemId);
0243     int cnt = taskList.size();
0244     if (cnt == 0) {
0245         return 100;
0246     }
0247     int total = 0;
0248     for (AbstractTask *t : taskList) {
0249         if (t->m_type == AbstractTask::LOADJOB) {
0250             // Don't show progress for load task
0251             cnt--;
0252         } else if (owner.itemId == displayedClip) {
0253             jobNames << t->m_description;
0254             jobsProgress << t->m_progress;
0255             jobsUuids << t->m_uuid.toString();
0256         }
0257         total += t->m_progress;
0258     }
0259     if (cnt == 0) {
0260         return 100;
0261     }
0262     total /= cnt;
0263     if (owner.itemId == displayedClip) {
0264         Q_EMIT detailedProgress(owner, jobNames, jobsProgress, jobsUuids);
0265     }
0266     return total;
0267 }