File indexing completed on 2024-05-05 17:50:03

0001 /*
0002    Copyright (C) 2013 Andreas Hartmetz <ahartmetz@gmail.com>
0003 
0004    This library is free software; you can redistribute it and/or
0005    modify it under the terms of the GNU Library General Public
0006    License as published by the Free Software Foundation; either
0007    version 2 of the License, or (at your option) any later version.
0008 
0009    This library is distributed in the hope that it will be useful,
0010    but WITHOUT ANY WARRANTY; without even the implied warranty of
0011    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0012    Library General Public License for more details.
0013 
0014    You should have received a copy of the GNU Library General Public License
0015    along with this library; see the file COPYING.LGPL.  If not, write to
0016    the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
0017    Boston, MA 02110-1301, USA.
0018 
0019    Alternatively, this file is available under the Mozilla Public License
0020    Version 1.1.  You may obtain a copy of the License at
0021    http://www.mozilla.org/MPL/
0022 */
0023 
0024 #include "eventdispatcher.h"
0025 #include "eventdispatcher_p.h"
0026 
0027 #ifndef DFERRY_NO_NATIVE_POLL
0028 #ifdef __linux__
0029 #include "epolleventpoller.h"
0030 #elif defined _WIN32
0031 #include "selecteventpoller_win32.h"
0032 #else
0033 #include "selecteventpoller_unix.h"
0034 #endif
0035 #endif
0036 
0037 #include "event.h"
0038 #include "foreigneventloopintegrator.h"
0039 #include "ieventpoller.h"
0040 #include "iioeventlistener.h"
0041 #include "platformtime.h"
0042 #include "connection_p.h"
0043 #include "timer.h"
0044 
0045 #include <algorithm>
0046 #include <cassert>
0047 
0048 #include <iostream>
0049 
0050 //#define EVENTDISPATCHER_DEBUG
0051 
0052 #ifndef DFERRY_NO_NATIVE_POLL
0053 EventDispatcher::EventDispatcher()
0054    : d(new EventDispatcherPrivate)
0055 {
0056 #ifdef __linux__
0057     d->m_poller = new EpollEventPoller(this);
0058 #else
0059     // TODO high performance IO multiplexers for non-Linux platforms
0060     d->m_poller = new SelectEventPoller(this);
0061 #endif
0062 }
0063 #endif
0064 
0065 EventDispatcher::EventDispatcher(ForeignEventLoopIntegrator *integrator)
0066    : d(new EventDispatcherPrivate)
0067 {
0068     d->m_integrator = integrator;
0069     d->m_poller = integrator->connectToDispatcher(this);
0070 }
0071 
0072 EventDispatcherPrivate::~EventDispatcherPrivate()
0073 {
0074     // removeIoListener is going to remove the current entry from m_ioListeners, so use this funny
0075     // way to "iterate"...
0076     for (auto it = m_ioListeners.cbegin(); it != m_ioListeners.cend(); it = m_ioListeners.cbegin())
0077     {
0078         const size_t sizeBefore = m_ioListeners.size();
0079         removeIoListener(it->second);
0080         if (m_ioListeners.size() == sizeBefore)
0081         {
0082             // this should never happen, however, avoid an infinite loop if it somehow does...
0083             assert(false);
0084             m_ioListeners.erase(it);
0085         }
0086     }
0087 
0088     for (auto it = m_timers.cbegin(); it != m_timers.cend(); ++it) {
0089         it->second->m_eventDispatcher = nullptr;
0090         it->second->m_isRunning = false;
0091     }
0092 
0093     if (!m_integrator) {
0094         delete m_poller;
0095     }
0096 }
0097 
0098 EventDispatcher::~EventDispatcher()
0099 {
0100     delete d;
0101     d = nullptr;
0102 }
0103 
0104 bool EventDispatcher::poll(int timeout)
0105 {
0106     int nextDue = d->timeToFirstDueTimer();
0107     if (timeout < 0) {
0108         timeout = nextDue;
0109     } else if (nextDue >= 0) {
0110         timeout = std::min(timeout, nextDue);
0111     }
0112 
0113 #ifdef EVENTDISPATCHER_DEBUG
0114     printf("EventDispatcher::poll(): timeout=%d, nextDue=%d.\n", timeout, nextDue);
0115 #endif
0116     IEventPoller::InterruptAction interrupAction = d->m_poller->poll(timeout);
0117 
0118     if (interrupAction == IEventPoller::Stop) {
0119         return false;
0120     } else if (interrupAction == IEventPoller::ProcessAuxEvents && d->m_connectionToNotify) {
0121         d->processAuxEvents();
0122     }
0123     d->triggerDueTimers();
0124     return true;
0125 }
0126 
0127 void EventDispatcher::interrupt()
0128 {
0129     d->m_poller->interrupt(IEventPoller::Stop);
0130 }
0131 
0132 void EventDispatcherPrivate::wakeForEvents()
0133 {
0134     m_poller->interrupt(IEventPoller::ProcessAuxEvents);
0135 }
0136 
0137 void EventDispatcherPrivate::addIoListenerInternal(IIoEventListener *iol, uint32 ioRw)
0138 {
0139     std::pair<std::unordered_map<FileDescriptor, IIoEventListener*>::iterator, bool> insertResult;
0140     insertResult = m_ioListeners.insert(std::make_pair(iol->fileDescriptor(), iol));
0141     if (insertResult.second) {
0142         m_poller->addFileDescriptor(iol->fileDescriptor(), ioRw);
0143     }
0144 }
0145 
0146 void EventDispatcherPrivate::removeIoListenerInternal(IIoEventListener *iol)
0147 {
0148     if (m_ioListeners.erase(iol->fileDescriptor())) {
0149         m_poller->removeFileDescriptor(iol->fileDescriptor());
0150     }
0151 }
0152 
0153 void EventDispatcherPrivate::updateIoInterestInternal(IIoEventListener *iol, uint32 ioRw)
0154 {
0155     m_poller->setReadWriteInterest(iol->fileDescriptor(), ioRw);
0156 }
0157 
0158 void EventDispatcherPrivate::notifyListenerForIo(FileDescriptor fd, IO::RW ioRw)
0159 {
0160     std::unordered_map<FileDescriptor, IIoEventListener *>::iterator it = m_ioListeners.find(fd);
0161     if (it != m_ioListeners.end()) {
0162         it->second->handleIoReady(ioRw);
0163     } else {
0164 #ifdef IEVENTDISPATCHER_DEBUG
0165         // while interesting for debugging, this is not an error if a connection was in the epoll
0166         // set and disconnected in its handleCanRead() or handleCanWrite() implementation
0167         std::cerr << "EventDispatcherPrivate::notifyListenerForIo(): unhandled file descriptor "
0168                   <<  fd << ".\n";
0169 #endif
0170     }
0171 }
0172 
0173 int EventDispatcherPrivate::timeToFirstDueTimer() const
0174 {
0175     std::multimap<uint64, Timer*>::const_iterator it = m_timers.cbegin();
0176     if (it != m_timers.cend() && it->second == nullptr) {
0177         // this is the dead entry of the currently triggered, currently being deleted Timer
0178         ++it;
0179     }
0180 
0181     if (it == m_timers.cend()) {
0182         return -1;
0183     }
0184 
0185     uint64 nextTimeout = it->first >> 10;
0186     uint64 currentTime = PlatformTime::monotonicMsecs();
0187 
0188     if (currentTime >= nextTimeout) {
0189         return 0;
0190     }
0191     return nextTimeout - currentTime;
0192 }
0193 
0194 uint EventDispatcherPrivate::nextTimerSerial()
0195 {
0196     if (m_currentTimerSerial > s_maxTimerSerial) {
0197         tryCompactTimerSerials();
0198     }
0199     return m_currentTimerSerial++;
0200 }
0201 
0202 void EventDispatcherPrivate::tryCompactTimerSerials()
0203 {
0204     // don't bother trying to compact when there won't be much room anyway (we are probably heading for an
0205     // unavoidable overflow / duplicates)
0206     const size_t timersCount = m_timers.size();
0207     if (timersCount >= s_maxTimerSerial * 0.9) {
0208         std::cerr << timersCount << "are too many active timers! Timers timing out at the same time are "
0209                                     "not guaranteed to trigger in a predictable order anymore.\n";
0210         m_currentTimerSerial = 0;
0211         return;
0212     }
0213 
0214     bool pickNextTimer = false;
0215     int numDeadEntries = 0;
0216     auto it = m_timers.begin();
0217     for (uint newSerial = 0; newSerial < timersCount; newSerial++) {
0218         Timer *const timer = it->second;
0219         it = m_timers.erase(it);
0220         if (!pickNextTimer) {
0221             if (timer && timer->m_isRunning) {
0222                 timer->m_serial = newSerial;
0223                 m_timers.emplace(timer->tag(), timer);
0224             } else {
0225                 // Drop this timer (it was removed while triggered) and use the serial for the next timer.
0226                 // In that situation, we are being called from inside that timer's callback. Prepare for
0227                 // iteration in triggerDueTimers() to continue after the current timer.
0228                 newSerial--;
0229                 m_adjustedIteratorOfNextTimer = m_timers.end();
0230                 numDeadEntries++;
0231                 pickNextTimer = true;
0232             }
0233         } else {
0234             // Only one timer can be currently triggered, which produces a "dead" map entry. Any other
0235             // timers are either running, or have already been removed from the map. Check this here.
0236             assert(timer);
0237             assert(timer->m_isRunning);
0238             timer->m_serial = newSerial;
0239             m_adjustedIteratorOfNextTimer = m_timers.emplace(timer->tag(), timer);
0240             pickNextTimer = false;
0241         }
0242     }
0243 
0244     assert(numDeadEntries <= 1);
0245 
0246     m_serialsCompacted = true;
0247     m_currentTimerSerial = timersCount;
0248 }
0249 
0250 void EventDispatcherPrivate::printTimerMap() const
0251 {
0252     for (auto it = m_timers.cbegin(); it != m_timers.cend(); ++it) {
0253         std::cerr << "tag: " << it->first
0254                   << " dueTime: " << (it->first >> 10) << " serial: " << (it->first & 0x3ff)
0255                   << " pointer: " << it->second << '\n';
0256     }
0257 }
0258 
0259 void EventDispatcherPrivate::addTimer(Timer *timer)
0260 {
0261     timer->m_serial = nextTimerSerial();
0262 
0263 
0264     uint64 dueTime;
0265     if (timer->m_interval != 0 || !m_triggerTime) {
0266         //std::cerr << "addTimer regular path " << timer << '\n';
0267         dueTime = PlatformTime::monotonicMsecs() + uint64(timer->m_interval);
0268     } else {
0269         // A timer is added from a timer callback - make sure it only runs in the *next* iteration
0270         // of the event loop. Timer users expect a timer to run at the earliest when the event loop
0271         // runs *again*, not in this iteration.
0272         //std::cerr << "addTimer staging path " << timer << '\n';
0273         dueTime = 0;
0274     }
0275 
0276     timer->m_nextDueTime = dueTime;
0277 
0278     //std::cerr << "  addTimer before:\n";
0279     //printTimerMap();
0280     m_timers.emplace(timer->tag(), timer);
0281     maybeSetTimeoutForIntegrator();
0282     //std::cerr << "  addTimer after:\n";
0283     //printTimerMap();
0284 }
0285 
0286 void EventDispatcherPrivate::removeTimer(Timer *timer)
0287 {
0288     assert(timer->tag() != 0);
0289 
0290     // If inside a timer instance T's callback, this is only called from T's destructor, never from
0291     // T.setRunning(false). In the setRunning(false) case, removing is handled in triggerDueTimers()
0292     // right after invoking the callback by looking at T.m_isRunning. In the destructor case, this
0293     // sets the Timer pointer to nullptr (see below).
0294     // It is possible that the technique for handling the destructor case could also handle the
0295     // setRunning(false) case, something to consider... (Note: tryCompactTimerSerials kinda does that
0296     // already.)
0297     auto iterRange = m_timers.equal_range(timer->tag());
0298     for (; iterRange.first != iterRange.second; ++iterRange.first) {
0299         if (iterRange.first->second == timer) {
0300             if (!timer->m_reentrancyGuard) {
0301                 m_timers.erase(iterRange.first);
0302             } else {
0303                 // This means that this is an "emergency remove" of a timer being deleted while in its
0304                 // callback. Mark it as dead so we won't dereference it. The map entry will be erased
0305                 // in triggerDueTimers() shortly after the callback returns.
0306                 iterRange.first->second = nullptr;
0307             }
0308             maybeSetTimeoutForIntegrator();
0309             return;
0310         }
0311     }
0312     assert(false); // the timer should never request a remove when it has not been added
0313 }
0314 
0315 void EventDispatcherPrivate::maybeSetTimeoutForIntegrator()
0316 {
0317     if (m_integrator) {
0318         m_integrator->watchTimeout(timeToFirstDueTimer());
0319     }
0320 }
0321 
0322 void EventDispatcherPrivate::triggerDueTimers()
0323 {
0324     m_triggerTime = PlatformTime::monotonicMsecs();
0325 
0326     for (auto it = m_timers.begin(); it != m_timers.end();) {
0327         const uint64 timerTimeout = it->first >> 10;
0328         if (timerTimeout > m_triggerTime) {
0329             break;
0330         }
0331         // careful here - protect against adding and removing any timer while inside its trigger()!
0332         // we do this by keeping the iterator at the current position (so changing any other timer
0333         // doesn't invalidate it) and blocking changes to the timer behind that iterator
0334         // (so we don't mess with its data should it have been deleted outright in the callback)
0335 
0336         m_serialsCompacted = false;
0337 
0338         Timer *timer = it->second;
0339         //std::cerr << "triggerDueTimers() - tag: " << it->first <<" pointer: " << timer << '\n';
0340         assert(timer->m_isRunning);
0341 
0342         // invariant:
0343         // timer.dueTime <= m_triggerTime <= currentTime(here) <= <timerAddedInTrigger>.dueTime
0344         // (the latter except for zero interval timers added in a timer callback, which go into
0345         // the staging area)
0346         if (timer->m_nextDueTime != 0) { // == 0: timer is in staging area
0347             timer->trigger();
0348         }
0349 
0350         if (m_serialsCompacted)
0351         {
0352             it = m_adjustedIteratorOfNextTimer;
0353             continue;
0354         }
0355 
0356         timer = it->second; // reload, removeTimer() may set it to nullptr
0357         if (timer && timer->m_isRunning) {
0358             // ### we are rescheduling timers based on triggerTime even though real time can be
0359             // much later - is this the desired behavior? I think so...
0360             if (timer->m_interval == 0 && timer->m_nextDueTime != 0) {
0361                 // If we reinserted a timer with m_interval == 0, we might iterate over it again in this run
0362                 // of triggerDueTimers(). If we just leave it where it is and keep iterating, we prevent
0363                 // that problem, and it's good for performance, too!
0364                 // If m_nextDueTime is zero, the timer was inserted during the last run of triggerDueTimers,
0365                 // and it *should* be reinserted, so that the timer is triggered in this and future runs.
0366                 ++it;
0367             } else {
0368                 timer->m_nextDueTime = m_triggerTime + timer->m_interval;
0369                 it = m_timers.erase(it);
0370                 m_timers.emplace(timer->tag(), timer);
0371             }
0372         } else {
0373             it = m_timers.erase(it);
0374         }
0375     }
0376     m_triggerTime = 0;
0377     maybeSetTimeoutForIntegrator();
0378 }
0379 
0380 void EventDispatcherPrivate::queueEvent(std::unique_ptr<Event> evt)
0381 {
0382     // std::cerr << "EventDispatcherPrivate::queueEvent() " << evt->type << " " << this << std::endl;
0383     {
0384         SpinLocker locker(&m_queuedEventsLock);
0385         m_queuedEvents.emplace_back(std::move(evt));
0386     }
0387     wakeForEvents();
0388 }
0389 
0390 void EventDispatcherPrivate::processAuxEvents()
0391 {
0392     // std::cerr << "EventDispatcherPrivate::processAuxEvents() " << this << std::endl;
0393     // don't hog the lock while processing the events
0394     std::vector<std::unique_ptr<Event>> events;
0395     {
0396         SpinLocker locker(&m_queuedEventsLock);
0397         std::swap(events, m_queuedEvents);
0398     }
0399     if (m_connectionToNotify) {
0400         for (const std::unique_ptr<Event> &evt : events) {
0401             m_connectionToNotify->processEvent(evt.get());
0402         }
0403     }
0404 }