// File indexing completed on 2024-04-28 16:59:49
0001 /* When two objects on different threads talk to each other (uni- or bidirectionally), they will 0002 * have pointers to each other. Before sending something on the other side, the sender needs to 0003 * know: 0004 * - is there still an object alive at the memory address it knows? 0005 * - is it still the same object that it wants to talk to? 0006 * The latter is similar to the somewhat well-known the ABA problem. 0007 * So what we do is that the initiator of the connection creates a Commutex object held alive 0008 * by shared_ptrs. A shared_ptr copied from the original is sent to the receiver. 0009 * The Commutex synchronizes the two objects insofar that destruction of one end will prevent 0010 * calls forever (Commutex in "Broken" state), and an ongoing call will block other calls 0011 * through the same Commutex as well as destruction of the receiver. 0012 */ 0013 0014 #ifndef COMMUTEX_H 0015 #define COMMUTEX_H 0016 0017 #include <atomic> 0018 #include <cassert> 0019 #include <memory> 0020 0021 #ifdef HAVE_VALGRIND 0022 #include <valgrind/helgrind.h> 0023 #else 0024 #include "valgrind-noop.h" 0025 #endif 0026 0027 // Commutex: Mutex-like thing for communicating objects. Better names welcome. 0028 class Commutex 0029 { 0030 public: 0031 enum State { 0032 Free = 0, 0033 Locked, // serves to delay destruction of one of the linked objects while the other is 0034 // calling methods / touching data on it 0035 Broken 0036 }; 0037 0038 enum TryLockResult { 0039 TransientFailure = 0, // state was Locked 0040 PermanentFailure, // state was Broken 0041 Success // state was Free and transitioned to Locked 0042 }; 0043 0044 Commutex() 0045 : m_state(Free) 0046 { 0047 // We're hopefully close enough to a mutex for Helgrind to work... 
0048 VALGRIND_HG_MUTEX_INIT_POST(this, 0); 0049 } 0050 0051 ~Commutex() 0052 { 0053 VALGRIND_HG_MUTEX_DESTROY_PRE(this); 0054 } 0055 0056 private: 0057 friend class CommutexPeer; 0058 0059 TryLockResult tryLock() 0060 { 0061 State prevState = Free; 0062 VALGRIND_HG_MUTEX_LOCK_PRE(this, 1); 0063 if (m_state.compare_exchange_strong(prevState, Locked)) { 0064 VALGRIND_HG_MUTEX_LOCK_POST(this); 0065 return Success; 0066 } 0067 return prevState == Broken ? PermanentFailure : TransientFailure; 0068 } 0069 0070 bool lock() 0071 { 0072 while (true) { 0073 TryLockResult result = tryLock(); 0074 if (result == TransientFailure) { 0075 continue; 0076 } 0077 return result == Success; 0078 } 0079 } 0080 0081 // return value is only informational - what are you going to do when unlocking fails because 0082 // the state is already Broken? 0083 bool unlock() 0084 { 0085 VALGRIND_HG_MUTEX_UNLOCK_PRE(this); 0086 State prevState = Locked; 0087 if (m_state.compare_exchange_strong(prevState, Free)) { 0088 VALGRIND_HG_MUTEX_UNLOCK_POST(this); 0089 return true; 0090 } 0091 assert(prevState == Broken); // unlocking when already Free indicates wrong accounting 0092 VALGRIND_HG_MUTEX_UNLOCK_POST(this); 0093 return false; 0094 } 0095 0096 bool tryUnlink() 0097 { 0098 State prevState = Free; 0099 bool wasFree = m_state.compare_exchange_strong(prevState, Broken); 0100 return wasFree || prevState == Broken; 0101 } 0102 0103 void unlink() 0104 { 0105 while (!tryUnlink()) { 0106 } 0107 } 0108 0109 void unlinkFromLocked() 0110 { 0111 // we don't have the data to check if the Locked state is "owned" by the calling thread :/ 0112 State prevState = Locked; 0113 VALGRIND_HG_MUTEX_UNLOCK_PRE(this); 0114 bool success = m_state.compare_exchange_strong(prevState, Broken); 0115 if (success) { 0116 VALGRIND_HG_MUTEX_UNLOCK_POST(this); 0117 } 0118 assert(success); 0119 } 0120 0121 std::atomic<State> m_state; 0122 }; 0123 0124 class CommutexPeer 0125 { 0126 public: 0127 static std::pair<CommutexPeer, 
CommutexPeer> createLink() 0128 { 0129 std::shared_ptr<Commutex> commutex = std::make_shared<Commutex>(); 0130 return std::make_pair(CommutexPeer(commutex), CommutexPeer(commutex)); 0131 } 0132 0133 CommutexPeer() = default; // state will be Broken, that's fine 0134 0135 CommutexPeer(CommutexPeer &&other) 0136 : m_comm(std::move(other.m_comm)) 0137 {} 0138 0139 ~CommutexPeer() 0140 { 0141 unlink(); 0142 } 0143 0144 CommutexPeer &operator=(CommutexPeer &&other) 0145 { 0146 m_comm = std::move(other.m_comm); 0147 return *this; 0148 } 0149 0150 CommutexPeer(const CommutexPeer &other) = delete; 0151 CommutexPeer &operator=(const CommutexPeer &other) = delete; 0152 0153 Commutex::TryLockResult tryLock() 0154 { 0155 if (!m_comm) { 0156 return Commutex::PermanentFailure; 0157 } 0158 Commutex::TryLockResult ret = m_comm->tryLock(); 0159 if (ret == Commutex::PermanentFailure) { 0160 m_comm.reset(); 0161 } 0162 return ret; 0163 } 0164 0165 bool lock() 0166 { 0167 if (!m_comm) { 0168 return false; 0169 } 0170 bool ret = m_comm->lock(); 0171 if (!ret) { 0172 m_comm.reset(); 0173 } 0174 return ret; 0175 } 0176 0177 void unlock() 0178 { 0179 if (m_comm) { 0180 m_comm->unlock(); 0181 } 0182 } 0183 0184 // This might be useful when unlinking a set of somehow (accidentally?) inter-dependent commutexes. 0185 // In that case, keep calling tryUnlink() on all still unbroken ones until all are broken. 0186 bool tryUnlink() 0187 { 0188 if (!m_comm) { 0189 return true; 0190 } 0191 bool ret = m_comm->tryUnlink(); 0192 if (ret) { 0193 m_comm.reset(); 0194 } 0195 return ret; 0196 } 0197 0198 void unlink() 0199 { 0200 if (m_comm) { 0201 m_comm->unlink(); 0202 m_comm.reset(); 0203 } 0204 } 0205 0206 // This either succeeds immediately and unconditionally or the state wasn't Locked by user error 0207 // (it doesn't check if this CommutexPeer "owns" the Locked state) 0208 // So, this has the (unverifiable at this point) pre-condition that the calling thread "owns the lock". 
0209 void unlinkFromLocked() 0210 { 0211 if (m_comm) { 0212 m_comm->unlinkFromLocked(); 0213 } 0214 } 0215 0216 // diagnostic use ONLY because it has no transactional semantics - also note that, since there 0217 // is no non-atomic read of an atomic variable, this might hide heisenbugs by causing spurious 0218 // memory barriers 0219 Commutex::State state() const 0220 { 0221 if (!m_comm) { 0222 return Commutex::Broken; 0223 } 0224 return m_comm->m_state; 0225 } 0226 0227 // Only for identification purposes, to see which two CommutexPeers belong together if 0228 // there is an unsorted bunch of them somewhere. 0229 Commutex *id() const { return m_comm.get(); } 0230 0231 private: 0232 friend class Commutex; 0233 0234 CommutexPeer(std::shared_ptr<Commutex> comm) 0235 : m_comm(comm) 0236 {} 0237 0238 std::shared_ptr<Commutex> m_comm; 0239 }; 0240 0241 class CommutexLocker 0242 { 0243 public: 0244 CommutexLocker(CommutexPeer *cp) 0245 : m_peer(cp) 0246 { 0247 m_hasLock = m_peer->lock(); 0248 } 0249 0250 bool hasLock() const 0251 { 0252 return m_hasLock; 0253 } 0254 0255 ~CommutexLocker() 0256 { 0257 // The check is not only an optimization - users of the class are likely to delete our 0258 // CommutexPeer when the Commutex is broken. 
0259 if (m_hasLock) { 0260 m_peer->unlock(); 0261 } 0262 } 0263 0264 CommutexLocker(const CommutexLocker &other) = delete; 0265 CommutexLocker &operator=(const CommutexLocker &other) = delete; 0266 0267 private: 0268 CommutexPeer *m_peer; 0269 bool m_hasLock; 0270 }; 0271 0272 class CommutexUnlinker 0273 { 0274 public: 0275 CommutexUnlinker(CommutexPeer *cp, bool mustSucceed = true) 0276 : m_peer(cp) 0277 { 0278 m_tryLockResult = m_peer->tryLock(); 0279 while (mustSucceed && m_tryLockResult == Commutex::TransientFailure) { 0280 m_tryLockResult = m_peer->tryLock(); 0281 } 0282 } 0283 0284 bool hasLock() 0285 { 0286 return m_tryLockResult == Commutex::Success; 0287 } 0288 0289 // if the Commutex was already Broken or if we have a lock so our unlinkFromLocked() WILL succed 0290 bool willSucceed() 0291 { 0292 return m_tryLockResult != Commutex::TransientFailure; 0293 } 0294 0295 // mainly used to prevent the destructor from accessing *m_peer, to fix lifetime issues with *m_peer. 0296 void unlinkNow() 0297 { 0298 assert(willSucceed()); 0299 if (m_tryLockResult == Commutex::Success) { 0300 m_peer->unlinkFromLocked(); 0301 } 0302 m_tryLockResult = Commutex::PermanentFailure; // aka it is already unlinked, which is the case now 0303 } 0304 0305 ~CommutexUnlinker() 0306 { 0307 if (m_tryLockResult == Commutex::Success) { 0308 m_peer->unlinkFromLocked(); 0309 } 0310 } 0311 0312 CommutexUnlinker(const CommutexUnlinker &other) = delete; 0313 CommutexUnlinker &operator=(const CommutexUnlinker &other) = delete; 0314 0315 private: 0316 CommutexPeer *m_peer; 0317 Commutex::TryLockResult m_tryLockResult; 0318 }; 0319 0320 #endif // COMMUTEX_H