File indexing completed on 2024-06-23 05:30:42

0001 #!/usr/bin/env python3
0002 
0003 # SPDX-FileCopyrightText: 2023 Fushan Wen <qydwhotmail@gmail.com>
0004 # SPDX-License-Identifier: GPL-2.0-or-later
0005 
0006 # For FreeBSD CI which only has python 3.9
0007 from __future__ import annotations
0008 
0009 import threading
0010 import time
0011 
0012 from gi.repository import GLib
0013 
0014 
class GLibMainLoopThread(threading.Thread):
    """
    Thread that drives a GLib main loop in the background.

    A fail-safe timer is armed when the thread starts running and stops the
    loop after 60 seconds unless quit() has been called before that.
    @note Creating this loop is redundant when QCoreApplication is already used in the same application.
    """

    # Class-level default consulted by process_events(); __init__ assigns the
    # loop on the instance only.
    loop: GLib.MainLoop | None = None

    def __init__(self) -> None:
        # The loop has to exist first, since the fail-safe timer is bound to its quit()
        main_loop = GLib.MainLoop()
        self.loop = main_loop
        self.failSafeTimer = threading.Timer(60, main_loop.quit)

        # Initialise the underlying thread object
        super().__init__()

    def run(self) -> None:
        """
        Thread body: arm the fail-safe timer, then block inside the main loop
        """
        self.failSafeTimer.start()
        self.loop.run()

    def quit(self) -> None:
        """
        Cancel the fail-safe timer and ask the main loop to stop
        """
        self.failSafeTimer.cancel()
        self.loop.quit()

    @classmethod
    def process_events(cls) -> None:
        """
        Dispatch at most one pending event of the main context.

        The context is polled up to four times, sleeping 0.1 seconds after
        every poll that finds nothing pending. As soon as something is
        pending, a single non-blocking iteration is run and the method returns.

        NOTE(review): the context comes from the class-level ``loop``, which
        the visible code never assigns; the default context is used whenever
        it is None.
        """
        main_loop = cls.loop
        context = GLib.MainContext.default() if main_loop is None else main_loop.get_context()

        attempts_left = 4
        while attempts_left > 0:
            attempts_left -= 1
            if context.pending():
                context.iteration(may_block=False)
                return
            time.sleep(0.1)