File indexing completed on 2024-06-23 05:30:42
0001 #!/usr/bin/env python3 0002 0003 # SPDX-FileCopyrightText: 2023 Fushan Wen <qydwhotmail@gmail.com> 0004 # SPDX-License-Identifier: GPL-2.0-or-later 0005 0006 # For FreeBSD CI which only has python 3.9 0007 from __future__ import annotations 0008 0009 import threading 0010 import time 0011 0012 from gi.repository import GLib 0013 0014 0015 class GLibMainLoopThread(threading.Thread): 0016 """ 0017 A GLib main loop in another thread. 0018 @note It's redundant to create a loop when QCoreApplication is used in the same application. 0019 """ 0020 0021 loop: GLib.MainLoop | None = None 0022 0023 def __init__(self) -> None: 0024 # Set up D-Bus loop 0025 self.loop = GLib.MainLoop() 0026 self.failSafeTimer = threading.Timer(60, self.loop.quit) 0027 0028 # Create the thread 0029 super(GLibMainLoopThread, self).__init__() 0030 0031 def run(self) -> None: 0032 """ 0033 Method to run the DBus main loop (on a thread) 0034 """ 0035 self.failSafeTimer.start() 0036 self.loop.run() 0037 0038 def quit(self) -> None: 0039 self.failSafeTimer.cancel() 0040 self.loop.quit() 0041 0042 @classmethod 0043 def process_events(cls) -> None: 0044 """ 0045 Processes some pending events in the main loop 0046 """ 0047 if cls.loop is not None: 0048 context = cls.loop.get_context() 0049 else: 0050 context = GLib.MainContext.default() 0051 0052 for _ in range(4): 0053 if not context.pending(): 0054 time.sleep(0.1) 0055 continue 0056 context.iteration(may_block=False) 0057 break