File indexing completed on 2024-05-05 04:44:41

0001 /*  This file is part of the KDE project
0002     Copyright (C) 2008 Matthias Kretz <kretz@kde.org>
0003 
0004     This library is free software; you can redistribute it and/or
0005     modify it under the terms of the GNU Lesser General Public
0006     License as published by the Free Software Foundation; either
0007     version 2.1 of the License, or (at your option) version 3, or any
0008     later version accepted by the membership of KDE e.V. (or its
0009     successor approved by the membership of KDE e.V.), Nokia Corporation 
0010     (or its successors, if any) and the KDE Free Qt Foundation, which shall
0011     act as a proxy defined in Section 6 of version 3 of the license.
0012 
0013     This library is distributed in the hope that it will be useful,
0014     but WITHOUT ANY WARRANTY; without even the implied warranty of
0015     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0016     Lesser General Public License for more details.
0017 
0018     You should have received a copy of the GNU Lesser General Public 
0019     License along with this library.  If not, see <http://www.gnu.org/licenses/>.
0020 
0021 */
0022 
0023 #include "lockfreequeue_p.h"
0024 #include <QHash>
0025 #include <QWriteLocker>
0026 #include <QReadWriteLock>
0027 #include <stdlib.h>
0028 #include "globalstatic_p.h"
0029 
0030 struct MemoryPool
0031 {
0032     ~MemoryPool();
0033     // Stack structure:
0034     QAtomicPointer<LockFreeQueueBase::NodeBase> stack;
0035     QAtomicInt count;
0036     QAtomicInt size;
0037 
0038     void clear();
0039 };
0040 
0041 void MemoryPool::clear()
0042 {
0043     LockFreeQueueBase::NodeBase *node = stack;
0044     while (node) {
0045         if (stack.testAndSetAcquire(node, const_cast<LockFreeQueueBase::NodeBase *>(node->next))) {
0046             count.deref();
0047             free(node);
0048         }
0049         node = stack;
0050     }
0051 }
0052 
0053 MemoryPool::~MemoryPool()
0054 {
0055     LockFreeQueueBase::NodeBase *node = stack;
0056     while (node) {
0057         void *toDelete = node;
0058         node = const_cast<LockFreeQueueBase::NodeBase *>(node->next);
0059         ::free(toDelete);
0060     }
0061 }
0062 
0063 struct MemoryPoolVector
0064 {
0065     static const int POOL_COUNT = 16;
0066     ~MemoryPoolVector() { delete next; }
0067     inline MemoryPool &operator[](size_t s)
0068     {
0069         for (int i = 0; i < POOL_COUNT; ++i) {
0070             if (m_pools[i].size == static_cast<int>(s)) {
0071                 return m_pools[i];
0072             } else if (m_pools[i].size == 0) {
0073                 if (m_pools[i].size.testAndSetRelaxed(0, static_cast<int>(s))) {
0074                     return m_pools[i];
0075                 }
0076                 if (m_pools[i].size == static_cast<int>(s)) {
0077                     return m_pools[i];
0078                 }
0079             }
0080         }
0081         if (!next) {
0082             MemoryPoolVector *newPoolVector = new MemoryPoolVector;
0083             if (!next.testAndSetRelaxed(0, newPoolVector)) {
0084                 delete newPoolVector;
0085             }
0086         }
0087         return (*next)[s];
0088     }
0089 
0090     void clearAllPools()
0091     {
0092         for (int i = 0; i < POOL_COUNT; ++i) {
0093             if (0 == m_pools[i].size) {
0094                 return;
0095             }
0096             m_pools[i].clear();
0097         }
0098         if (next) {
0099             next->clearAllPools();
0100         }
0101     }
0102 
0103     MemoryPool m_pools[POOL_COUNT];
0104     QAtomicPointer<MemoryPoolVector> next;
0105 };
0106 
0107 static int s_poolSize = 128;
0108 PHONON_GLOBAL_STATIC(MemoryPoolVector, s_memoryPool)
0109 
0110 void *LockFreeQueueBase::NodeBaseKeepNodePool::operator new(size_t s)
0111 {
0112     MemoryPool &p = (*s_memoryPool)[s];
0113     NodeBase *node = p.stack;
0114     if (node) {
0115         if (!p.stack.testAndSetAcquire(node, const_cast<NodeBase *>(node->next))) {
0116             return ::malloc(s);
0117         }
0118         p.count.deref();
0119         return node;
0120     }
0121     return ::malloc(s);
0122 }
0123 
0124 void LockFreeQueueBase::NodeBaseKeepNodePool::operator delete(void *ptr, size_t s)
0125 {
0126     MemoryPool &p = (*s_memoryPool)[s];
0127     if (p.count > s_poolSize) {
0128         ::free(ptr);
0129         return;
0130     }
0131     NodeBase *node = static_cast<NodeBase *>(ptr);
0132     NodeBase *next = p.stack;
0133     node->next = next;
0134     if (!p.stack.testAndSetOrdered(next, node)) {
0135         ::free(ptr);
0136         return;
0137     }
0138     p.count.ref();
0139 }
0140 
0141 void LockFreeQueueBase::NodeBaseKeepNodePool::clear()
0142 {
0143     s_memoryPool->clearAllPools();
0144 }
0145 
0146 void LockFreeQueueBase::NodeBaseKeepNodePool::setPoolSize(int s)
0147 {
0148     s_poolSize = s;
0149 }
0150 
0151 int LockFreeQueueBase::NodeBaseKeepNodePool::poolSize()
0152 {
0153     return s_poolSize;
0154 }
0155 
0156 class LockFreeQueueBasePrivate
0157 {
0158     public:
0159         LockFreeQueueBasePrivate();
0160         ~LockFreeQueueBasePrivate();
0161         QReadWriteLock dataReadyHandlerMutex;
0162         LockFreeQueueBase::NodeBase *sentinel; // end marker
0163         LockFreeQueueBase::NodeBase *lastHeadNode;
0164         QAtomicPointer<LockFreeQueueBase::NodeBasePointer> queueHead;
0165         QAtomicPointer<LockFreeQueueBase::NodeBasePointer> queueTail;
0166         QAtomicInt size;
0167         LockFreeQueueBase::DataReadyHandler *dataReadyHandler;
0168 };
0169 
0170 LockFreeQueueBasePrivate::LockFreeQueueBasePrivate()
0171     : sentinel(new LockFreeQueueBase::NodeBase(0)),
0172     lastHeadNode(new LockFreeQueueBase::NodeBase(sentinel)),
0173     queueHead(&lastHeadNode->next),
0174     queueTail(&lastHeadNode->next),
0175     size(0),
0176     dataReadyHandler(0)
0177 {
0178     // let d->sentinel point to itself so that we can use d->sentinel->next as
0179     // QAtomicPointer<Node> for d->queueHead and d->queueTail
0180     sentinel->next = sentinel;
0181 }
0182 
0183 LockFreeQueueBasePrivate::~LockFreeQueueBasePrivate()
0184 {
0185     Q_ASSERT(queueHead);
0186     LockFreeQueueBase::NodeBase *node = lastHeadNode;
0187     while (node != sentinel) {
0188         LockFreeQueueBase::NodeBase *toDelete = node;
0189         node = const_cast<LockFreeQueueBase::NodeBase *>(node->next);
0190         toDelete->deref();
0191     }
0192 }
0193 
0194 LockFreeQueueBase::LockFreeQueueBase()
0195     : d(new LockFreeQueueBasePrivate)
0196 {
0197 }
0198 
0199 LockFreeQueueBase::~LockFreeQueueBase()
0200 {
0201     delete d;
0202 }
0203 
0204 void LockFreeQueueBase::setDataReadyHandler(DataReadyHandler *h)
0205 {
0206     QWriteLocker lock(&d->dataReadyHandlerMutex);
0207     d->dataReadyHandler = h;
0208 }
0209 
0210 void LockFreeQueueBase::_enqueue(NodeBase *newNode)
0211 {
0212     newNode->ref();
0213     newNode->next = d->sentinel;
0214     /*if (d->size > 0 && newNode->priority > std::numeric_limits<int>::min()) {
0215         NodeBasePointer *node = d->queueHead.fetchAndStoreRelaxed(&d->sentinel->next);
0216         if (node == &d->sentinel->next) {
0217             // Another thread got the real node, we just got the placeholder telling us to not touch
0218             // anything. As we replaced &d->sentinel->next with &d->sentinel->next in
0219             // d->queueHead we don't have to reset anything.
0220         }
0221         // node is a pointer to a Node::next member pointing to the first entry in
0222         // the list
0223         if (*node == d->sentinel) {
0224             // the list is empty, good
0225         }
0226     } else {*/
0227         // just append
0228         NodeBasePointer &lastNextPointer = *d->queueTail.fetchAndStoreAcquire(&newNode->next);
0229         lastNextPointer = newNode;
0230         d->size.ref();
0231 
0232         if (d->dataReadyHandler) {
0233             QReadLocker lock(&d->dataReadyHandlerMutex);
0234             if (d->dataReadyHandler) {
0235                 d->dataReadyHandler->dataReady();
0236             }
0237         }
0238     //}
0239 }
0240 
0241 LockFreeQueueBase::NodeBase *LockFreeQueueBase::_acquireHeadNodeBlocking()
0242 {
0243     NodeBasePointer *node = 0;
0244     while (d->size > 0) {
0245         if ((node = d->queueHead.fetchAndStoreRelaxed(&d->sentinel->next)) != &d->sentinel->next) {
0246             // node is a pointer to a Node::next member pointing to the first entry in the list
0247             if (*node != d->sentinel) {
0248                 d->size.deref();
0249                 NodeBase *_node = const_cast<NodeBase *>(*node); // cast volatile away
0250                 _node->ref();
0251 
0252                 NodeBase *toDeref = d->lastHeadNode;
0253                 d->lastHeadNode = _node;
0254                 const bool check = d->queueHead.testAndSetRelease(&d->sentinel->next, &_node->next);
0255                 Q_ASSERT(check); Q_UNUSED(check);
0256                 toDeref->deref();
0257 
0258                 return _node;
0259             }
0260             // empty (d->size == 0), put it back
0261             const bool check = d->queueHead.testAndSetRelaxed(&d->sentinel->next, node);
0262             Q_ASSERT(check); Q_UNUSED(check);
0263             // try again, with some luck d->size is > 0 again
0264         }
0265     }
0266     return 0;
0267 }
0268 
0269 LockFreeQueueBase::NodeBase *LockFreeQueueBase::_acquireHeadNode()
0270 {
0271     if (*d->queueHead == d->sentinel || d->queueHead == &d->sentinel->next) {
0272         return 0;
0273     }
0274     // setting d->queueHead to &d->sentinel->next makes the above check fail (i.e. all
0275     // other threads in a dequeue function will exit). Also enqueue will not modify
0276     // this as d->queueTail references d->lastHeadNode->next which != d->sentinel->next
0277     NodeBasePointer *node = d->queueHead.fetchAndStoreRelaxed(&d->sentinel->next);
0278     if (node == &d->sentinel->next) {
0279         // Another thread got the real node, we just got the placeholder telling us to not touch
0280         // anything. As we replaced &d->sentinel->next with &d->sentinel->next in
0281         // d->queueHead we don't have to reset anything.
0282         return 0;
0283     }
0284     // node is a pointer to a Node::next member pointing to the first entry in
0285     // the list
0286     if (*node == d->sentinel) {
0287         //qDebug() << "empty, put it back";
0288         const bool check = d->queueHead.testAndSetRelaxed(&d->sentinel->next, node);
0289         Q_ASSERT(check); Q_UNUSED(check);
0290         return 0;
0291     }
0292     d->size.deref();
0293 
0294     NodeBase *_node = const_cast<NodeBase *>(*node);
0295     _node->ref();
0296 
0297     NodeBase *toDeref = d->lastHeadNode;
0298     d->lastHeadNode = _node;
0299     const bool check = d->queueHead.testAndSetRelease(&d->sentinel->next, &_node->next);
0300     Q_ASSERT(check); Q_UNUSED(check);
0301     toDeref->deref();
0302 
0303     return _node;
0304 }
0305 
0306 int LockFreeQueueBase::size() const
0307 {
0308     return d->size;
0309 }