File indexing completed on 2024-04-21 04:01:28
0001 /* -*- C++ -*- 0002 The Queue class in ThreadWeaver. 0003 0004 SPDX-FileCopyrightText: 2005-2013 Mirko Boehm <mirko@kde.org> 0005 0006 SPDX-License-Identifier: LGPL-2.0-or-later 0007 */ 0008 0009 #include <QCoreApplication> 0010 #include <QList> 0011 #include <QMutex> 0012 0013 #include "queue.h" 0014 #include "weaver.h" 0015 0016 using namespace ThreadWeaver; 0017 0018 namespace 0019 { 0020 static Queue::GlobalQueueFactory *globalQueueFactory; 0021 } 0022 0023 class Q_DECL_HIDDEN Queue::Private 0024 { 0025 public: 0026 Private(Queue *q, QueueSignals *queue) 0027 : implementation(queue) 0028 { 0029 Q_ASSERT_X(qApp != nullptr, Q_FUNC_INFO, "Cannot create global ThreadWeaver instance before QApplication!"); 0030 Q_ASSERT(queue); 0031 queue->setParent(q); 0032 q->connect(implementation, SIGNAL(finished()), SIGNAL(finished())); 0033 q->connect(implementation, SIGNAL(suspended()), SIGNAL(suspended())); 0034 } 0035 0036 QueueSignals *implementation; 0037 void init(QueueSignals *implementation); 0038 }; 0039 0040 /** @brief Construct a Queue. */ 0041 Queue::Queue(QObject *parent) 0042 : QueueSignals(parent) 0043 , d(new Private(this, new Weaver)) 0044 { 0045 } 0046 0047 /** @brief Construct a Queue, specifying the QueueSignals implementation to use. 0048 * 0049 * The QueueSignals instance is usually a Weaver object, which may be customized for specific 0050 * application needs. The Weaver instance will take ownership of the implementation object and 0051 * deletes it when destructed. 0052 * @see Weaver 0053 * @see GlobalQueueFactory 0054 */ 0055 Queue::Queue(QueueSignals *implementation, QObject *parent) 0056 : QueueSignals(parent) 0057 , d(new Private(this, implementation)) 0058 { 0059 } 0060 0061 /** @brief Destruct the Queue object. 0062 * 0063 * If the queue is not already in Destructed state, the destructor will call shutDown() to make sure 0064 * enqueued jobs are completed and the queue is idle. 0065 * The queue implementation will be destroyed. 0066 * @see shutDown() 0067 * @see ThreadWeaver::Destructed 0068 */ 0069 Queue::~Queue() 0070 { 0071 if (d->implementation->state()->stateId() != Destructed) { 0072 d->implementation->shutDown(); 0073 } 0074 delete d->implementation; 0075 delete d; 0076 } 0077 0078 /** @brief Create a QueueStream to enqueue jobs into this queue. */ 0079 QueueStream Queue::stream() 0080 { 0081 return QueueStream(this); 0082 } 0083 0084 void Queue::shutDown() 0085 { 0086 d->implementation->shutDown(); 0087 } 0088 0089 /** @brief Set the factory object that will create the global queue. 0090 * 0091 * Once set, the global queue factory will be deleted when the global ThreadWeaver pool is deleted. 0092 * The factory object needs to be set before the global ThreadWeaver pool is instantiated. Call this 0093 * method before Q(Core)Application is constructed. */ 0094 void Queue::setGlobalQueueFactory(Queue::GlobalQueueFactory *factory) 0095 { 0096 if (globalQueueFactory) { 0097 delete globalQueueFactory; 0098 } 0099 globalQueueFactory = factory; 0100 } 0101 0102 const State *Queue::state() const 0103 { 0104 return d->implementation->state(); 0105 } 0106 0107 namespace 0108 { 0109 class StaticThreadWeaverInstanceGuard : public QObject 0110 { 0111 Q_OBJECT 0112 public: 0113 explicit StaticThreadWeaverInstanceGuard(QAtomicPointer<Queue> &instance, QCoreApplication *app) 0114 : QObject(app) 0115 , instance_(instance) 0116 { 0117 Q_ASSERT_X(app != nullptr, Q_FUNC_INFO, "Calling ThreadWeaver::Weaver::instance() requires a QCoreApplication!"); 0118 QObject *impl = instance.loadRelaxed()->findChild<QueueSignals *>(); 0119 Q_ASSERT(impl); 0120 impl->setObjectName(QStringLiteral("GlobalQueue")); 0121 qAddPostRoutine(shutDownGlobalQueue); 0122 } 0123 0124 ~StaticThreadWeaverInstanceGuard() override 0125 { 0126 instance_.fetchAndStoreOrdered(nullptr); 0127 delete globalQueueFactory; 0128 globalQueueFactory = nullptr; 0129 } 0130 0131 private: 0132 static void shutDownGlobalQueue() 0133 { 0134 Queue::instance()->shutDown(); 0135 Q_ASSERT(Queue::instance()->state()->stateId() == Destructed); 0136 } 0137 0138 QAtomicPointer<Queue> &instance_; 0139 }; 0140 0141 } 0142 0143 /** @brief Access the application-global Queue. 0144 * 0145 * In some cases, the global queue is sufficient for the applications purpose. The global queue will only be 0146 * created if this method is actually called in the lifetime of the application. 0147 * 0148 * The Q(Core)Application object must exist when instance() is called for the first time. 0149 * The global queue will be destroyed when Q(Core)Application is destructed. After that, the instance() method 0150 * returns zero. 0151 */ 0152 Queue *Queue::instance() 0153 { 0154 static QAtomicPointer<Queue> s_instance(globalQueueFactory ? globalQueueFactory->create(qApp) : new Queue(qApp)); 0155 // Order is of importance here: 0156 // When s_instanceGuard is destructed (first, before s_instance), it sets the value of s_instance to zero. Next, qApp will delete 0157 // the object s_instance pointed to. 0158 static StaticThreadWeaverInstanceGuard *s_instanceGuard = new StaticThreadWeaverInstanceGuard(s_instance, qApp); 0159 Q_UNUSED(s_instanceGuard); 0160 Q_ASSERT_X(s_instance.loadRelaxed() == nullptr // 0161 || s_instance.loadRelaxed()->thread() == QCoreApplication::instance()->thread(), 0162 Q_FUNC_INFO, 0163 "The global ThreadWeaver queue needs to be instantiated (accessed first) from the main thread!"); 0164 return s_instance.loadAcquire(); 0165 } 0166 0167 void Queue::enqueue(const QList<JobPointer> &jobs) 0168 { 0169 d->implementation->enqueue(jobs); 0170 } 0171 0172 void Queue::enqueue(const JobPointer &job) 0173 { 0174 enqueue(QList<JobPointer>() << job); 0175 } 0176 0177 bool Queue::dequeue(const JobPointer &job) 0178 { 0179 return d->implementation->dequeue(job); 0180 } 0181 0182 void Queue::dequeue() 0183 { 0184 return d->implementation->dequeue(); 0185 } 0186 0187 void Queue::finish() 0188 { 0189 return d->implementation->finish(); 0190 } 0191 0192 void Queue::suspend() 0193 { 0194 return d->implementation->suspend(); 0195 } 0196 0197 void Queue::resume() 0198 { 0199 return d->implementation->resume(); 0200 } 0201 0202 bool Queue::isEmpty() const 0203 { 0204 return d->implementation->isEmpty(); 0205 } 0206 0207 bool Queue::isIdle() const 0208 { 0209 return d->implementation->isIdle(); 0210 } 0211 0212 int Queue::queueLength() const 0213 { 0214 return d->implementation->queueLength(); 0215 } 0216 0217 void Queue::setMaximumNumberOfThreads(int cap) 0218 { 0219 d->implementation->setMaximumNumberOfThreads(cap); 0220 } 0221 0222 int Queue::currentNumberOfThreads() const 0223 { 0224 return d->implementation->currentNumberOfThreads(); 0225 } 0226 0227 int Queue::maximumNumberOfThreads() const 0228 { 0229 return d->implementation->maximumNumberOfThreads(); 0230 } 0231 0232 void Queue::requestAbort() 0233 { 0234 d->implementation->requestAbort(); 0235 } 0236 0237 void Queue::reschedule() 0238 { 0239 d->implementation->reschedule(); 0240 } 0241 0242 #include "moc_queue.cpp" 0243 #include "queue.moc"