File indexing completed on 2024-04-28 16:59:49

0001 /* When two objects on different threads talk to each other (uni- or bidirectionally), they will
0002  * have pointers to each other. Before sending something on the other side, the sender needs to
0003  * know:
0004  * - is there still an object alive at the memory address it knows?
0005  * - is it still the same object that it wants to talk to?
0006  * The latter is similar to the somewhat well-known the ABA problem.
0007  * So what we do is that the initiator of the connection creates a Commutex object held alive
0008  * by shared_ptrs. A shared_ptr copied from the original is sent to the receiver.
0009  * The Commutex synchronizes the two objects insofar that destruction of one end will prevent
0010  * calls forever (Commutex in "Broken" state), and an ongoing call will block other calls
0011  * through the same Commutex as well as destruction of the receiver.
0012  */
0013 
0014 #ifndef COMMUTEX_H
0015 #define COMMUTEX_H
0016 
0017 #include <atomic>
0018 #include <cassert>
0019 #include <memory>
0020 
0021 #ifdef HAVE_VALGRIND
0022 #include <valgrind/helgrind.h>
0023 #else
0024 #include "valgrind-noop.h"
0025 #endif
0026 
0027 // Commutex: Mutex-like thing for communicating objects. Better names welcome.
0028 class Commutex
0029 {
0030 public:
0031     enum State {
0032         Free = 0,
0033         Locked, // serves to delay destruction of one of the linked objects while the other is
0034               // calling methods / touching data on it
0035         Broken
0036     };
0037 
0038     enum TryLockResult {
0039         TransientFailure = 0, // state was Locked
0040         PermanentFailure, // state was Broken
0041         Success // state was Free and transitioned to Locked
0042     };
0043 
0044     Commutex()
0045        : m_state(Free)
0046     {
0047         // We're hopefully close enough to a mutex for Helgrind to work...
0048         VALGRIND_HG_MUTEX_INIT_POST(this, 0);
0049     }
0050 
0051     ~Commutex()
0052     {
0053         VALGRIND_HG_MUTEX_DESTROY_PRE(this);
0054     }
0055 
0056 private:
0057     friend class CommutexPeer;
0058 
0059     TryLockResult tryLock()
0060     {
0061         State prevState = Free;
0062         VALGRIND_HG_MUTEX_LOCK_PRE(this, 1);
0063         if (m_state.compare_exchange_strong(prevState, Locked)) {
0064             VALGRIND_HG_MUTEX_LOCK_POST(this);
0065             return Success;
0066         }
0067         return prevState == Broken ? PermanentFailure : TransientFailure;
0068     }
0069 
0070     bool lock()
0071     {
0072         while (true) {
0073             TryLockResult result = tryLock();
0074             if (result == TransientFailure) {
0075                 continue;
0076             }
0077             return result == Success;
0078         }
0079     }
0080 
0081     // return value is only informational - what are you going to do when unlocking fails because
0082     // the state is already Broken?
0083     bool unlock()
0084     {
0085         VALGRIND_HG_MUTEX_UNLOCK_PRE(this);
0086         State prevState = Locked;
0087         if (m_state.compare_exchange_strong(prevState, Free)) {
0088             VALGRIND_HG_MUTEX_UNLOCK_POST(this);
0089             return true;
0090         }
0091         assert(prevState == Broken); // unlocking when already Free indicates wrong accounting
0092         VALGRIND_HG_MUTEX_UNLOCK_POST(this);
0093         return false;
0094     }
0095 
0096     bool tryUnlink()
0097     {
0098         State prevState = Free;
0099         bool wasFree = m_state.compare_exchange_strong(prevState, Broken);
0100         return wasFree || prevState == Broken;
0101     }
0102 
0103     void unlink()
0104     {
0105         while (!tryUnlink()) {
0106         }
0107     }
0108 
0109     void unlinkFromLocked()
0110     {
0111         // we don't have the data to check if the Locked state is "owned" by the calling thread :/
0112         State prevState = Locked;
0113         VALGRIND_HG_MUTEX_UNLOCK_PRE(this);
0114         bool success = m_state.compare_exchange_strong(prevState, Broken);
0115         if (success) {
0116             VALGRIND_HG_MUTEX_UNLOCK_POST(this);
0117         }
0118         assert(success);
0119     }
0120 
0121     std::atomic<State> m_state;
0122 };
0123 
0124 class CommutexPeer
0125 {
0126 public:
0127     static std::pair<CommutexPeer, CommutexPeer> createLink()
0128     {
0129         std::shared_ptr<Commutex> commutex = std::make_shared<Commutex>();
0130         return std::make_pair(CommutexPeer(commutex), CommutexPeer(commutex));
0131     }
0132 
0133     CommutexPeer() = default; // state will be Broken, that's fine
0134 
0135     CommutexPeer(CommutexPeer &&other)
0136        : m_comm(std::move(other.m_comm))
0137     {}
0138 
0139     ~CommutexPeer()
0140     {
0141         unlink();
0142     }
0143 
0144     CommutexPeer &operator=(CommutexPeer &&other)
0145     {
0146         m_comm = std::move(other.m_comm);
0147         return *this;
0148     }
0149 
0150     CommutexPeer(const CommutexPeer &other) = delete;
0151     CommutexPeer &operator=(const CommutexPeer &other) = delete;
0152 
0153     Commutex::TryLockResult tryLock()
0154     {
0155         if (!m_comm) {
0156             return Commutex::PermanentFailure;
0157         }
0158         Commutex::TryLockResult ret = m_comm->tryLock();
0159         if (ret == Commutex::PermanentFailure) {
0160             m_comm.reset();
0161         }
0162         return ret;
0163     }
0164 
0165     bool lock()
0166     {
0167         if (!m_comm) {
0168             return false;
0169         }
0170         bool ret = m_comm->lock();
0171         if (!ret) {
0172             m_comm.reset();
0173         }
0174         return ret;
0175     }
0176 
0177     void unlock()
0178     {
0179         if (m_comm) {
0180             m_comm->unlock();
0181         }
0182     }
0183 
0184     // This might be useful when unlinking a set of somehow (accidentally?) inter-dependent commutexes.
0185     // In that case, keep calling tryUnlink() on all still unbroken ones until all are broken.
0186     bool tryUnlink()
0187     {
0188         if (!m_comm) {
0189             return true;
0190         }
0191         bool ret = m_comm->tryUnlink();
0192         if (ret) {
0193             m_comm.reset();
0194         }
0195         return ret;
0196     }
0197 
0198     void unlink()
0199     {
0200         if (m_comm) {
0201             m_comm->unlink();
0202             m_comm.reset();
0203         }
0204     }
0205 
0206     // This either succeeds immediately and unconditionally or the state wasn't Locked by user error
0207     // (it doesn't check if this CommutexPeer "owns" the Locked state)
0208     // So, this has the (unverifiable at this point) pre-condition that the calling thread "owns the lock".
0209     void unlinkFromLocked()
0210     {
0211         if (m_comm) {
0212             m_comm->unlinkFromLocked();
0213         }
0214     }
0215 
0216     // diagnostic use ONLY because it has no transactional semantics - also note that, since there
0217     // is no non-atomic read of an atomic variable, this might hide heisenbugs by causing spurious
0218     // memory barriers
0219     Commutex::State state() const
0220     {
0221         if (!m_comm) {
0222             return Commutex::Broken;
0223         }
0224         return m_comm->m_state;
0225     }
0226 
0227     // Only for identification purposes, to see which two CommutexPeers belong together if
0228     // there is an unsorted bunch of them somewhere.
0229     Commutex *id() const { return m_comm.get(); }
0230 
0231 private:
0232     friend class Commutex;
0233 
0234     CommutexPeer(std::shared_ptr<Commutex> comm)
0235        : m_comm(comm)
0236     {}
0237 
0238     std::shared_ptr<Commutex> m_comm;
0239 };
0240 
0241 class CommutexLocker
0242 {
0243 public:
0244     CommutexLocker(CommutexPeer *cp)
0245        : m_peer(cp)
0246     {
0247         m_hasLock = m_peer->lock();
0248     }
0249 
0250     bool hasLock() const
0251     {
0252         return m_hasLock;
0253     }
0254 
0255     ~CommutexLocker()
0256     {
0257         // The check is not only an optimization - users of the class are likely to delete our
0258         // CommutexPeer when the Commutex is broken.
0259         if (m_hasLock) {
0260             m_peer->unlock();
0261         }
0262     }
0263 
0264     CommutexLocker(const CommutexLocker &other) = delete;
0265     CommutexLocker &operator=(const CommutexLocker &other) = delete;
0266 
0267 private:
0268     CommutexPeer *m_peer;
0269     bool m_hasLock;
0270 };
0271 
0272 class CommutexUnlinker
0273 {
0274 public:
0275     CommutexUnlinker(CommutexPeer *cp, bool mustSucceed = true)
0276        : m_peer(cp)
0277     {
0278         m_tryLockResult = m_peer->tryLock();
0279         while (mustSucceed && m_tryLockResult == Commutex::TransientFailure) {
0280             m_tryLockResult = m_peer->tryLock();
0281         }
0282     }
0283 
0284     bool hasLock()
0285     {
0286         return m_tryLockResult == Commutex::Success;
0287     }
0288 
0289     // if the Commutex was already Broken or if we have a lock so our unlinkFromLocked() WILL succed
0290     bool willSucceed()
0291     {
0292         return m_tryLockResult != Commutex::TransientFailure;
0293     }
0294 
0295     // mainly used to prevent the destructor from accessing *m_peer, to fix lifetime issues with *m_peer.
0296     void unlinkNow()
0297     {
0298         assert(willSucceed());
0299         if (m_tryLockResult == Commutex::Success) {
0300             m_peer->unlinkFromLocked();
0301         }
0302         m_tryLockResult = Commutex::PermanentFailure; // aka it is already unlinked, which is the case now
0303     }
0304 
0305     ~CommutexUnlinker()
0306     {
0307         if (m_tryLockResult == Commutex::Success) {
0308             m_peer->unlinkFromLocked();
0309         }
0310     }
0311 
0312     CommutexUnlinker(const CommutexUnlinker &other) = delete;
0313     CommutexUnlinker &operator=(const CommutexUnlinker &other) = delete;
0314 
0315 private:
0316     CommutexPeer *m_peer;
0317     Commutex::TryLockResult m_tryLockResult;
0318 };
0319 
0320 #endif // COMMUTEX_H