File indexing completed on 2024-05-19 09:36:34
0001 /* 0002 Copyright (C) 2013 Andreas Hartmetz <ahartmetz@gmail.com> 0003 0004 This library is free software; you can redistribute it and/or 0005 modify it under the terms of the GNU Library General Public 0006 License as published by the Free Software Foundation; either 0007 version 2 of the License, or (at your option) any later version. 0008 0009 This library is distributed in the hope that it will be useful, 0010 but WITHOUT ANY WARRANTY; without even the implied warranty of 0011 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 0012 Library General Public License for more details. 0013 0014 You should have received a copy of the GNU Library General Public License 0015 along with this library; see the file COPYING.LGPL. If not, write to 0016 the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, 0017 Boston, MA 02110-1301, USA. 0018 0019 Alternatively, this file is available under the Mozilla Public License 0020 Version 1.1. You may obtain a copy of the License at 0021 http://www.mozilla.org/MPL/ 0022 */ 0023 0024 #include "eventdispatcher.h" 0025 #include "eventdispatcher_p.h" 0026 0027 #ifndef DFERRY_NO_NATIVE_POLL 0028 #ifdef __linux__ 0029 #include "epolleventpoller.h" 0030 #elif defined _WIN32 0031 #include "selecteventpoller_win32.h" 0032 #else 0033 #include "selecteventpoller_unix.h" 0034 #endif 0035 #endif 0036 0037 #include "event.h" 0038 #include "foreigneventloopintegrator.h" 0039 #include "ieventpoller.h" 0040 #include "iioeventlistener.h" 0041 #include "platformtime.h" 0042 #include "connection_p.h" 0043 #include "timer.h" 0044 0045 #include <algorithm> 0046 #include <cassert> 0047 0048 #include <iostream> 0049 0050 //#define EVENTDISPATCHER_DEBUG 0051 0052 #ifndef DFERRY_NO_NATIVE_POLL 0053 EventDispatcher::EventDispatcher() 0054 : d(new EventDispatcherPrivate) 0055 { 0056 #ifdef __linux__ 0057 d->m_poller = new EpollEventPoller(this); 0058 #else 0059 // TODO high performance IO multiplexers for non-Linux platforms 0060 d->m_poller = new SelectEventPoller(this); 0061 #endif 0062 } 0063 #endif 0064 0065 EventDispatcher::EventDispatcher(ForeignEventLoopIntegrator *integrator) 0066 : d(new EventDispatcherPrivate) 0067 { 0068 d->m_integrator = integrator; 0069 d->m_poller = integrator->connectToDispatcher(this); 0070 } 0071 0072 EventDispatcherPrivate::~EventDispatcherPrivate() 0073 { 0074 // removeIoListener is going to remove the current entry from m_ioListeners, so use this funny 0075 // way to "iterate"... 0076 for (auto it = m_ioListeners.cbegin(); it != m_ioListeners.cend(); it = m_ioListeners.cbegin()) 0077 { 0078 const size_t sizeBefore = m_ioListeners.size(); 0079 removeIoListener(it->second); 0080 if (m_ioListeners.size() == sizeBefore) 0081 { 0082 // this should never happen, however, avoid an infinite loop if it somehow does... 0083 assert(false); 0084 m_ioListeners.erase(it); 0085 } 0086 } 0087 0088 for (auto it = m_timers.cbegin(); it != m_timers.cend(); ++it) { 0089 it->second->m_eventDispatcher = nullptr; 0090 it->second->m_isRunning = false; 0091 } 0092 0093 if (!m_integrator) { 0094 delete m_poller; 0095 } 0096 } 0097 0098 EventDispatcher::~EventDispatcher() 0099 { 0100 delete d; 0101 d = nullptr; 0102 } 0103 0104 bool EventDispatcher::poll(int timeout) 0105 { 0106 int nextDue = d->timeToFirstDueTimer(); 0107 if (timeout < 0) { 0108 timeout = nextDue; 0109 } else if (nextDue >= 0) { 0110 timeout = std::min(timeout, nextDue); 0111 } 0112 0113 #ifdef EVENTDISPATCHER_DEBUG 0114 printf("EventDispatcher::poll(): timeout=%d, nextDue=%d.\n", timeout, nextDue); 0115 #endif 0116 IEventPoller::InterruptAction interrupAction = d->m_poller->poll(timeout); 0117 0118 if (interrupAction == IEventPoller::Stop) { 0119 return false; 0120 } else if (interrupAction == IEventPoller::ProcessAuxEvents && d->m_connectionToNotify) { 0121 d->processAuxEvents(); 0122 } 0123 d->triggerDueTimers(); 0124 return true; 0125 } 0126 0127 void EventDispatcher::interrupt() 0128 { 0129 d->m_poller->interrupt(IEventPoller::Stop); 0130 } 0131 0132 void EventDispatcherPrivate::wakeForEvents() 0133 { 0134 m_poller->interrupt(IEventPoller::ProcessAuxEvents); 0135 } 0136 0137 void EventDispatcherPrivate::addIoListenerInternal(IIoEventListener *iol, uint32 ioRw) 0138 { 0139 std::pair<std::unordered_map<FileDescriptor, IIoEventListener*>::iterator, bool> insertResult; 0140 insertResult = m_ioListeners.insert(std::make_pair(iol->fileDescriptor(), iol)); 0141 if (insertResult.second) { 0142 m_poller->addFileDescriptor(iol->fileDescriptor(), ioRw); 0143 } 0144 } 0145 0146 void EventDispatcherPrivate::removeIoListenerInternal(IIoEventListener *iol) 0147 { 0148 if (m_ioListeners.erase(iol->fileDescriptor())) { 0149 m_poller->removeFileDescriptor(iol->fileDescriptor()); 0150 } 0151 } 0152 0153 void EventDispatcherPrivate::updateIoInterestInternal(IIoEventListener *iol, uint32 ioRw) 0154 { 0155 m_poller->setReadWriteInterest(iol->fileDescriptor(), ioRw); 0156 } 0157 0158 void EventDispatcherPrivate::notifyListenerForIo(FileDescriptor fd, IO::RW ioRw) 0159 { 0160 std::unordered_map<FileDescriptor, IIoEventListener *>::iterator it = m_ioListeners.find(fd); 0161 if (it != m_ioListeners.end()) { 0162 it->second->handleIoReady(ioRw); 0163 } else { 0164 #ifdef IEVENTDISPATCHER_DEBUG 0165 // while interesting for debugging, this is not an error if a connection was in the epoll 0166 // set and disconnected in its handleCanRead() or handleCanWrite() implementation 0167 std::cerr << "EventDispatcherPrivate::notifyListenerForIo(): unhandled file descriptor " 0168 << fd << ".\n"; 0169 #endif 0170 } 0171 } 0172 0173 int EventDispatcherPrivate::timeToFirstDueTimer() const 0174 { 0175 std::multimap<uint64, Timer*>::const_iterator it = m_timers.cbegin(); 0176 if (it != m_timers.cend() && it->second == nullptr) { 0177 // this is the dead entry of the currently triggered, currently being deleted Timer 0178 ++it; 0179 } 0180 0181 if (it == m_timers.cend()) { 0182 return -1; 0183 } 0184 0185 uint64 nextTimeout = it->first >> 10; 0186 uint64 currentTime = PlatformTime::monotonicMsecs(); 0187 0188 if (currentTime >= nextTimeout) { 0189 return 0; 0190 } 0191 return nextTimeout - currentTime; 0192 } 0193 0194 uint EventDispatcherPrivate::nextTimerSerial() 0195 { 0196 if (m_currentTimerSerial > s_maxTimerSerial) { 0197 tryCompactTimerSerials(); 0198 } 0199 return m_currentTimerSerial++; 0200 } 0201 0202 void EventDispatcherPrivate::tryCompactTimerSerials() 0203 { 0204 // don't bother trying to compact when there won't be much room anyway (we are probably heading for an 0205 // unavoidable overflow / duplicates) 0206 const size_t timersCount = m_timers.size(); 0207 if (timersCount >= s_maxTimerSerial * 0.9) { 0208 std::cerr << timersCount << "are too many active timers! Timers timing out at the same time are " 0209 "not guaranteed to trigger in a predictable order anymore.\n"; 0210 m_currentTimerSerial = 0; 0211 return; 0212 } 0213 0214 bool pickNextTimer = false; 0215 int numDeadEntries = 0; 0216 auto it = m_timers.begin(); 0217 for (uint newSerial = 0; newSerial < timersCount; newSerial++) { 0218 Timer *const timer = it->second; 0219 it = m_timers.erase(it); 0220 if (!pickNextTimer) { 0221 if (timer && timer->m_isRunning) { 0222 timer->m_serial = newSerial; 0223 m_timers.emplace(timer->tag(), timer); 0224 } else { 0225 // Drop this timer (it was removed while triggered) and use the serial for the next timer. 0226 // In that situation, we are being called from inside that timer's callback. Prepare for 0227 // iteration in triggerDueTimers() to continue after the current timer. 0228 newSerial--; 0229 m_adjustedIteratorOfNextTimer = m_timers.end(); 0230 numDeadEntries++; 0231 pickNextTimer = true; 0232 } 0233 } else { 0234 // Only one timer can be currently triggered, which produces a "dead" map entry. Any other 0235 // timers are either running, or have already been removed from the map. Check this here. 0236 assert(timer); 0237 assert(timer->m_isRunning); 0238 timer->m_serial = newSerial; 0239 m_adjustedIteratorOfNextTimer = m_timers.emplace(timer->tag(), timer); 0240 pickNextTimer = false; 0241 } 0242 } 0243 0244 assert(numDeadEntries <= 1); 0245 0246 m_serialsCompacted = true; 0247 m_currentTimerSerial = timersCount; 0248 } 0249 0250 void EventDispatcherPrivate::printTimerMap() const 0251 { 0252 for (auto it = m_timers.cbegin(); it != m_timers.cend(); ++it) { 0253 std::cerr << "tag: " << it->first 0254 << " dueTime: " << (it->first >> 10) << " serial: " << (it->first & 0x3ff) 0255 << " pointer: " << it->second << '\n'; 0256 } 0257 } 0258 0259 void EventDispatcherPrivate::addTimer(Timer *timer) 0260 { 0261 timer->m_serial = nextTimerSerial(); 0262 0263 0264 uint64 dueTime; 0265 if (timer->m_interval != 0 || !m_triggerTime) { 0266 //std::cerr << "addTimer regular path " << timer << '\n'; 0267 dueTime = PlatformTime::monotonicMsecs() + uint64(timer->m_interval); 0268 } else { 0269 // A timer is added from a timer callback - make sure it only runs in the *next* iteration 0270 // of the event loop. Timer users expect a timer to run at the earliest when the event loop 0271 // runs *again*, not in this iteration. 0272 //std::cerr << "addTimer staging path " << timer << '\n'; 0273 dueTime = 0; 0274 } 0275 0276 timer->m_nextDueTime = dueTime; 0277 0278 //std::cerr << " addTimer before:\n"; 0279 //printTimerMap(); 0280 m_timers.emplace(timer->tag(), timer); 0281 maybeSetTimeoutForIntegrator(); 0282 //std::cerr << " addTimer after:\n"; 0283 //printTimerMap(); 0284 } 0285 0286 void EventDispatcherPrivate::removeTimer(Timer *timer) 0287 { 0288 assert(timer->tag() != 0); 0289 0290 // If inside a timer instance T's callback, this is only called from T's destructor, never from 0291 // T.setRunning(false). In the setRunning(false) case, removing is handled in triggerDueTimers() 0292 // right after invoking the callback by looking at T.m_isRunning. In the destructor case, this 0293 // sets the Timer pointer to nullptr (see below). 0294 // It is possible that the technique for handling the destructor case could also handle the 0295 // setRunning(false) case, something to consider... (Note: tryCompactTimerSerials kinda does that 0296 // already.) 0297 auto iterRange = m_timers.equal_range(timer->tag()); 0298 for (; iterRange.first != iterRange.second; ++iterRange.first) { 0299 if (iterRange.first->second == timer) { 0300 if (!timer->m_reentrancyGuard) { 0301 m_timers.erase(iterRange.first); 0302 } else { 0303 // This means that this is an "emergency remove" of a timer being deleted while in its 0304 // callback. Mark it as dead so we won't dereference it. The map entry will be erased 0305 // in triggerDueTimers() shortly after the callback returns. 0306 iterRange.first->second = nullptr; 0307 } 0308 maybeSetTimeoutForIntegrator(); 0309 return; 0310 } 0311 } 0312 assert(false); // the timer should never request a remove when it has not been added 0313 } 0314 0315 void EventDispatcherPrivate::maybeSetTimeoutForIntegrator() 0316 { 0317 if (m_integrator) { 0318 m_integrator->watchTimeout(timeToFirstDueTimer()); 0319 } 0320 } 0321 0322 void EventDispatcherPrivate::triggerDueTimers() 0323 { 0324 m_triggerTime = PlatformTime::monotonicMsecs(); 0325 0326 for (auto it = m_timers.begin(); it != m_timers.end();) { 0327 const uint64 timerTimeout = it->first >> 10; 0328 if (timerTimeout > m_triggerTime) { 0329 break; 0330 } 0331 // careful here - protect against adding and removing any timer while inside its trigger()! 0332 // we do this by keeping the iterator at the current position (so changing any other timer 0333 // doesn't invalidate it) and blocking changes to the timer behind that iterator 0334 // (so we don't mess with its data should it have been deleted outright in the callback) 0335 0336 m_serialsCompacted = false; 0337 0338 Timer *timer = it->second; 0339 //std::cerr << "triggerDueTimers() - tag: " << it->first <<" pointer: " << timer << '\n'; 0340 assert(timer->m_isRunning); 0341 0342 // invariant: 0343 // timer.dueTime <= m_triggerTime <= currentTime(here) <= <timerAddedInTrigger>.dueTime 0344 // (the latter except for zero interval timers added in a timer callback, which go into 0345 // the staging area) 0346 if (timer->m_nextDueTime != 0) { // == 0: timer is in staging area 0347 timer->trigger(); 0348 } 0349 0350 if (m_serialsCompacted) 0351 { 0352 it = m_adjustedIteratorOfNextTimer; 0353 continue; 0354 } 0355 0356 timer = it->second; // reload, removeTimer() may set it to nullptr 0357 if (timer && timer->m_isRunning) { 0358 // ### we are rescheduling timers based on triggerTime even though real time can be 0359 // much later - is this the desired behavior? I think so... 0360 if (timer->m_interval == 0 && timer->m_nextDueTime != 0) { 0361 // If we reinserted a timer with m_interval == 0, we might iterate over it again in this run 0362 // of triggerDueTimers(). If we just leave it where it is and keep iterating, we prevent 0363 // that problem, and it's good for performance, too! 0364 // If m_nextDueTime is zero, the timer was inserted during the last run of triggerDueTimers, 0365 // and it *should* be reinserted, so that the timer is triggered in this and future runs. 0366 ++it; 0367 } else { 0368 timer->m_nextDueTime = m_triggerTime + timer->m_interval; 0369 it = m_timers.erase(it); 0370 m_timers.emplace(timer->tag(), timer); 0371 } 0372 } else { 0373 it = m_timers.erase(it); 0374 } 0375 } 0376 m_triggerTime = 0; 0377 maybeSetTimeoutForIntegrator(); 0378 } 0379 0380 void EventDispatcherPrivate::queueEvent(std::unique_ptr<Event> evt) 0381 { 0382 // std::cerr << "EventDispatcherPrivate::queueEvent() " << evt->type << " " << this << std::endl; 0383 { 0384 SpinLocker locker(&m_queuedEventsLock); 0385 m_queuedEvents.emplace_back(std::move(evt)); 0386 } 0387 wakeForEvents(); 0388 } 0389 0390 void EventDispatcherPrivate::processAuxEvents() 0391 { 0392 // std::cerr << "EventDispatcherPrivate::processAuxEvents() " << this << std::endl; 0393 // don't hog the lock while processing the events 0394 std::vector<std::unique_ptr<Event>> events; 0395 { 0396 SpinLocker locker(&m_queuedEventsLock); 0397 std::swap(events, m_queuedEvents); 0398 } 0399 if (m_connectionToNotify) { 0400 for (const std::unique_ptr<Event> &evt : events) { 0401 m_connectionToNotify->processEvent(evt.get()); 0402 } 0403 } 0404 }