File indexing completed on 2024-04-21 04:01:28

0001 /* -*- C++ -*-
0002     The Queue class in ThreadWeaver.
0003 
0004     SPDX-FileCopyrightText: 2005-2013 Mirko Boehm <mirko@kde.org>
0005 
0006     SPDX-License-Identifier: LGPL-2.0-or-later
0007 */
0008 
0009 #include <QCoreApplication>
0010 #include <QList>
0011 #include <QMutex>
0012 
0013 #include "queue.h"
0014 #include "weaver.h"
0015 
0016 using namespace ThreadWeaver;
0017 
0018 namespace
0019 {
0020 static Queue::GlobalQueueFactory *globalQueueFactory;
0021 }
0022 
0023 class Q_DECL_HIDDEN Queue::Private
0024 {
0025 public:
0026     Private(Queue *q, QueueSignals *queue)
0027         : implementation(queue)
0028     {
0029         Q_ASSERT_X(qApp != nullptr, Q_FUNC_INFO, "Cannot create global ThreadWeaver instance before QApplication!");
0030         Q_ASSERT(queue);
0031         queue->setParent(q);
0032         q->connect(implementation, SIGNAL(finished()), SIGNAL(finished()));
0033         q->connect(implementation, SIGNAL(suspended()), SIGNAL(suspended()));
0034     }
0035 
0036     QueueSignals *implementation;
0037     void init(QueueSignals *implementation);
0038 };
0039 
0040 /** @brief Construct a Queue. */
0041 Queue::Queue(QObject *parent)
0042     : QueueSignals(parent)
0043     , d(new Private(this, new Weaver))
0044 {
0045 }
0046 
0047 /** @brief Construct a Queue, specifying the QueueSignals implementation to use.
0048  *
0049  * The QueueSignals instance is usually a Weaver object, which may be customized for specific
0050  * application needs. The Weaver instance will take ownership of the implementation object and
0051  * deletes it when destructed.
0052  * @see Weaver
0053  * @see GlobalQueueFactory
0054  */
0055 Queue::Queue(QueueSignals *implementation, QObject *parent)
0056     : QueueSignals(parent)
0057     , d(new Private(this, implementation))
0058 {
0059 }
0060 
0061 /** @brief Destruct the Queue object.
0062  *
0063  * If the queue is not already in Destructed state, the destructor will call shutDown() to make sure
0064  * enqueued jobs are completed and the queue is idle.
0065  * The queue implementation will be destroyed.
0066  * @see shutDown()
0067  * @see ThreadWeaver::Destructed
0068  */
0069 Queue::~Queue()
0070 {
0071     if (d->implementation->state()->stateId() != Destructed) {
0072         d->implementation->shutDown();
0073     }
0074     delete d->implementation;
0075     delete d;
0076 }
0077 
0078 /** @brief Create a QueueStream to enqueue jobs into this queue. */
0079 QueueStream Queue::stream()
0080 {
0081     return QueueStream(this);
0082 }
0083 
0084 void Queue::shutDown()
0085 {
0086     d->implementation->shutDown();
0087 }
0088 
0089 /** @brief Set the factory object that will create the global queue.
0090  *
0091  * Once set, the global queue factory will be deleted when the global ThreadWeaver pool is deleted.
0092  * The factory object needs to be set before the global ThreadWeaver pool is instantiated. Call this
0093  * method before Q(Core)Application is constructed. */
0094 void Queue::setGlobalQueueFactory(Queue::GlobalQueueFactory *factory)
0095 {
0096     if (globalQueueFactory) {
0097         delete globalQueueFactory;
0098     }
0099     globalQueueFactory = factory;
0100 }
0101 
0102 const State *Queue::state() const
0103 {
0104     return d->implementation->state();
0105 }
0106 
0107 namespace
0108 {
0109 class StaticThreadWeaverInstanceGuard : public QObject
0110 {
0111     Q_OBJECT
0112 public:
0113     explicit StaticThreadWeaverInstanceGuard(QAtomicPointer<Queue> &instance, QCoreApplication *app)
0114         : QObject(app)
0115         , instance_(instance)
0116     {
0117         Q_ASSERT_X(app != nullptr, Q_FUNC_INFO, "Calling ThreadWeaver::Weaver::instance() requires a QCoreApplication!");
0118         QObject *impl = instance.loadRelaxed()->findChild<QueueSignals *>();
0119         Q_ASSERT(impl);
0120         impl->setObjectName(QStringLiteral("GlobalQueue"));
0121         qAddPostRoutine(shutDownGlobalQueue);
0122     }
0123 
0124     ~StaticThreadWeaverInstanceGuard() override
0125     {
0126         instance_.fetchAndStoreOrdered(nullptr);
0127         delete globalQueueFactory;
0128         globalQueueFactory = nullptr;
0129     }
0130 
0131 private:
0132     static void shutDownGlobalQueue()
0133     {
0134         Queue::instance()->shutDown();
0135         Q_ASSERT(Queue::instance()->state()->stateId() == Destructed);
0136     }
0137 
0138     QAtomicPointer<Queue> &instance_;
0139 };
0140 
0141 }
0142 
0143 /** @brief Access the application-global Queue.
0144  *
0145  * In some cases, the global queue is sufficient for the applications purpose. The global queue will only be
0146  * created if this method is actually called in the lifetime of the application.
0147  *
0148  * The Q(Core)Application object must exist when instance() is called for the first time.
0149  * The global queue will be destroyed when Q(Core)Application is destructed. After that, the instance() method
0150  * returns zero.
0151  */
0152 Queue *Queue::instance()
0153 {
0154     static QAtomicPointer<Queue> s_instance(globalQueueFactory ? globalQueueFactory->create(qApp) : new Queue(qApp));
0155     // Order is of importance here:
0156     // When s_instanceGuard is destructed (first, before s_instance), it sets the value of s_instance to zero. Next, qApp will delete
0157     // the object s_instance pointed to.
0158     static StaticThreadWeaverInstanceGuard *s_instanceGuard = new StaticThreadWeaverInstanceGuard(s_instance, qApp);
0159     Q_UNUSED(s_instanceGuard);
0160     Q_ASSERT_X(s_instance.loadRelaxed() == nullptr //
0161                    || s_instance.loadRelaxed()->thread() == QCoreApplication::instance()->thread(),
0162                Q_FUNC_INFO,
0163                "The global ThreadWeaver queue needs to be instantiated (accessed first) from the main thread!");
0164     return s_instance.loadAcquire();
0165 }
0166 
0167 void Queue::enqueue(const QList<JobPointer> &jobs)
0168 {
0169     d->implementation->enqueue(jobs);
0170 }
0171 
0172 void Queue::enqueue(const JobPointer &job)
0173 {
0174     enqueue(QList<JobPointer>() << job);
0175 }
0176 
0177 bool Queue::dequeue(const JobPointer &job)
0178 {
0179     return d->implementation->dequeue(job);
0180 }
0181 
0182 void Queue::dequeue()
0183 {
0184     return d->implementation->dequeue();
0185 }
0186 
0187 void Queue::finish()
0188 {
0189     return d->implementation->finish();
0190 }
0191 
0192 void Queue::suspend()
0193 {
0194     return d->implementation->suspend();
0195 }
0196 
0197 void Queue::resume()
0198 {
0199     return d->implementation->resume();
0200 }
0201 
0202 bool Queue::isEmpty() const
0203 {
0204     return d->implementation->isEmpty();
0205 }
0206 
0207 bool Queue::isIdle() const
0208 {
0209     return d->implementation->isIdle();
0210 }
0211 
0212 int Queue::queueLength() const
0213 {
0214     return d->implementation->queueLength();
0215 }
0216 
0217 void Queue::setMaximumNumberOfThreads(int cap)
0218 {
0219     d->implementation->setMaximumNumberOfThreads(cap);
0220 }
0221 
0222 int Queue::currentNumberOfThreads() const
0223 {
0224     return d->implementation->currentNumberOfThreads();
0225 }
0226 
0227 int Queue::maximumNumberOfThreads() const
0228 {
0229     return d->implementation->maximumNumberOfThreads();
0230 }
0231 
0232 void Queue::requestAbort()
0233 {
0234     d->implementation->requestAbort();
0235 }
0236 
0237 void Queue::reschedule()
0238 {
0239     d->implementation->reschedule();
0240 }
0241 
0242 #include "moc_queue.cpp"
0243 #include "queue.moc"