Warning, file /plasma/plasma-workspace/appiumtests/systemtraytest.py was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).

0001 #!/usr/bin/env python3
0002 
0003 # SPDX-FileCopyrightText: 2022 Harald Sitter <sitter@kde.org>
0004 # SPDX-FileCopyrightText: 2023 Fushan Wen <qydwhotmail@gmail.com>
0005 # SPDX-License-Identifier: MIT
0006 
0007 import os
0008 import queue
0009 import sys
0010 import threading
0011 import unittest
0012 from datetime import date
0013 from subprocess import PIPE, Popen
0014 from time import sleep
0015 from typing import IO, Final
0016 
0017 import gi
0018 
0019 gi.require_version("Gtk", "3.0")  # StatusIcon is removed in 4
0020 from appium import webdriver
0021 from appium.options.common.base import AppiumOptions
0022 from appium.webdriver.common.appiumby import AppiumBy
0023 from gi.repository import Gio, GLib, Gtk
0024 from selenium.common.exceptions import TimeoutException
0025 from selenium.webdriver.support import expected_conditions as EC
0026 from selenium.webdriver.support.ui import WebDriverWait
0027 
# Major version suffix of kded; used to build the "kded5" binary name and the
# "org.kde.kded5" D-Bus service/interface names in the tests below.
KDE_VERSION: Final = 5
# Plasmoid id of the widget under test, passed to plasmawindowed on startup.
WIDGET_ID: Final = "org.kde.plasma.systemtray"
0030 
0031 
class XEmbedTrayIcon(threading.Thread):
    """
    XEmbed tray icon implementation using Gtk.StatusIcon

    The GTK main loop runs inside this thread (see run()). Every signal
    received by the status icon is logged to stderr, and the main loop is
    stopped after the first button release, on an explicit quit(), or by the
    failsafe timer, whichever comes first.
    """

    def __init__(self, title: str) -> None:
        """
        @param title: the title given to the status icon
        """
        super().__init__()

        # quit() can be reached from the GTK thread (button release), the test
        # thread (cleanup) and the timer thread, so guard the "already
        # requested" flag with a lock and only schedule one main_quit.
        self.__quit_lock: threading.Lock = threading.Lock()
        self.__quit_requested: bool = False

        self.__timer: threading.Timer = threading.Timer(300, self.quit)  # Failsafe

        self.__status_icon: Gtk.StatusIcon = Gtk.StatusIcon(title=title)
        self.__status_icon.set_from_icon_name("xorg")
        self.__status_icon.connect("button-press-event", self.__on_button_press_event)
        self.__status_icon.connect("button-release-event", self.__on_button_release_event)
        self.__status_icon.connect("popup-menu", self.__on_popup_menu)
        self.__status_icon.connect("scroll-event", self.__on_scroll_event)

    def run(self) -> None:
        """
        Starts the failsafe timer and runs the GTK main loop until quit
        """
        self.__timer.start()
        Gtk.main()

    def quit(self) -> None:
        """
        Cancels the failsafe timer and asks the GTK main loop to stop.

        Safe to call from any thread and more than once; only the first call
        schedules the quit.
        """
        self.__timer.cancel()
        with self.__quit_lock:
            if self.__quit_requested:
                return
            self.__quit_requested = True
        # Run Gtk.main_quit() from inside the main loop instead of calling it
        # directly from a foreign thread. A request issued before Gtk.main()
        # has started stays queued and is honoured as soon as the loop runs.
        GLib.idle_add(self.__quit_main_loop)

    def __quit_main_loop(self) -> bool:
        """
        Idle callback executed by the main loop; returns False so that the
        idle source is removed after one invocation.
        """
        Gtk.main_quit()
        return False

    def __on_button_press_event(self, status_icon: Gtk.StatusIcon, button_event) -> None:
        print("button-press-event", button_event.button, file=sys.stderr, flush=True)

    def __on_button_release_event(self, status_icon: Gtk.StatusIcon, button_event) -> None:
        print("button-release-event", button_event.button, file=sys.stderr, flush=True)
        # One complete click is all the test needs
        self.quit()

    def __on_popup_menu(self, status_icon: Gtk.StatusIcon, button: int, activate_time: int) -> None:
        print("popup-menu", button, activate_time, file=sys.stderr, flush=True)

    def __on_scroll_event(self, status_icon, scroll_event) -> None:
        print("scroll-event", scroll_event.delta_x, scroll_event.delta_y, int(scroll_event.direction), file=sys.stderr, flush=True)
0069 
0070 
0071 class StreamReaderThread(threading.Thread):
0072     """
0073     Non-blocking readline thread
0074     """
0075 
0076     def __init__(self, stream: IO[bytes]) -> None:
0077         """
0078         @param stream: the stream to read from
0079         """
0080         self.__stream: IO[bytes] = stream
0081         self.__queue = queue.Queue()
0082 
0083         self.__stop_event = threading.Event()
0084 
0085         # Create the thread
0086         super().__init__()
0087 
0088     def run(self) -> None:
0089         """
0090         Collects lines from the source stream and put them in the queue.
0091         """
0092         while self.__stream.readable() and not self.__stop_event.is_set():
0093             line_str: str = self.__stream.readline().decode(encoding="utf-8")
0094             if "Received click" in line_str:
0095                 self.__queue.put(line_str)
0096             elif len(line_str) == 0:
0097                 break
0098 
0099     def stop(self) -> None:
0100         """
0101         Stops the thread
0102         """
0103         self.__stop_event.set()
0104 
0105     def readline(self) -> str | None:
0106         """
0107         Non-blocking readline
0108         The default timeout is 5s.
0109         """
0110         try:
0111             return self.__queue.get(block=True, timeout=5)
0112         except queue.Empty:
0113             return None
0114 
0115 
class SystemTrayTests(unittest.TestCase):
    """
    Tests for the system tray widget
    """

    driver: webdriver.Remote
    # Default to None so that cleanup_xembed_tray_icon() can run even when the
    # test failed before the corresponding object was created.
    xembedsniproxy: Popen[bytes] | None = None
    xembed_tray_icon: XEmbedTrayIcon | None = None
    stream_reader_thread: StreamReaderThread | None = None

    @classmethod
    def setUpClass(cls) -> None:
        """
        Opens the widget and initialize the webdriver
        """
        options = AppiumOptions()
        options.set_capability("app", f"plasmawindowed -p org.kde.plasma.nano {WIDGET_ID}")
        cls.driver = webdriver.Remote(command_executor='http://127.0.0.1:4723', options=options)

    def setUp(self) -> None:
        """
        Starts kded, waits for it to appear on the session bus and loads its
        statusnotifierwatcher module
        """
        self.kded = Popen([f"kded{KDE_VERSION}"])
        # tearDown() is skipped when setUp() fails, so make sure kded is
        # killed and reaped in every case.
        self.addCleanup(self._stop_kded)

        kded_service: Final = f"org.kde.kded{KDE_VERSION}"
        # Doc: https://lazka.github.io/pgi-docs/Gio-2.0/classes/DBusConnection.html
        session_bus: Gio.DBusConnection = Gio.bus_get_sync(Gio.BusType.SESSION)
        SERVICE_NAME: Final = "org.freedesktop.DBus"
        OBJECT_PATH: Final = "/"
        INTERFACE_NAME: Final = SERVICE_NAME
        for _ in range(5):
            # A Gio.DBusMessage is locked once it has been sent and can't be
            # sent a second time, so build a fresh one for every attempt.
            message: Gio.DBusMessage = Gio.DBusMessage.new_method_call(SERVICE_NAME, OBJECT_PATH, INTERFACE_NAME, "NameHasOwner")
            message.set_body(GLib.Variant("(s)", [kded_service]))
            reply, _serial = session_bus.send_message_with_reply_sync(message, Gio.DBusSendMessageFlags.NONE, 1000)

            if reply and reply.get_signature() == 'b' and reply.get_body().get_child_value(0).get_boolean():
                break
            print("waiting for kded to appear on the dbus session", file=sys.stderr, flush=True)
            sleep(1)

        kded_reply: GLib.Variant = session_bus.call_sync(kded_service, "/kded", kded_service, "loadModule", GLib.Variant("(s)", ["statusnotifierwatcher"]), GLib.VariantType("(b)"), Gio.DBusCallFlags.NONE, 1000)
        self.assertTrue(kded_reply.get_child_value(0).get_boolean(), "Module is not loaded")

    def tearDown(self) -> None:
        """
        Take screenshot when the current test fails
        """
        if not self._outcome.result.wasSuccessful():
            self.driver.get_screenshot_as_file(f"failed_test_shot_systemtraytest_#{self.id()}.png")
        self.kded.kill()

    @classmethod
    def tearDownClass(cls) -> None:
        """
        Make sure to terminate the driver again, lest it dangles.
        """
        cls.driver.quit()

    def _stop_kded(self) -> None:
        """
        Cleanup function for setUp: kills kded and reaps the process
        """
        self.kded.kill()  # No-op if the process has already been reaped
        self.kded.wait()

    def cleanup_xembed_tray_icon(self) -> None:
        """
        Cleanup function for test_xembed_tray_icon

        Every resource is optional here because the test may have failed
        before creating it.
        """
        if self.xembedsniproxy is not None:
            self.xembedsniproxy.terminate()
            self.xembedsniproxy = None

        if self.xembed_tray_icon is not None:
            self.xembed_tray_icon.quit()
            self.xembed_tray_icon = None

        if self.stream_reader_thread is not None:
            if self.stream_reader_thread.is_alive():
                self.stream_reader_thread.stop()
            self.stream_reader_thread = None

    def test_xembed_tray_icon(self) -> None:
        """
        Tests XEmbed tray icons can be listed and clicked in the tray.

        @note GTK doesn't like send_events and double checks the mouse position
        matches where the window is and is top level, so match the debug
        output from xembedsniproxy instead.
        """
        self.addCleanup(self.cleanup_xembed_tray_icon)

        debug_env: dict[str, str] = os.environ.copy()
        debug_env["QT_LOGGING_RULES"] = "kde.xembedsniproxy.debug=true"
        self.xembedsniproxy = Popen(['xembedsniproxy', '--platform', 'xcb'], env=debug_env, stderr=PIPE)  # For debug output
        if not self.xembedsniproxy.stderr or self.xembedsniproxy.poll() is not None:
            self.fail("xembedsniproxy is not available")
        print(f"xembedsniproxy PID: {self.xembedsniproxy.pid}", file=sys.stderr, flush=True)

        title: str = f"XEmbed Status Icon Test {date.today().strftime('%Y%m%d')}"
        self.xembed_tray_icon = XEmbedTrayIcon(title)
        self.xembed_tray_icon.start()

        wait: WebDriverWait = WebDriverWait(self.driver, 10)
        try:
            # FocusScope in StatusNotifierItem.qml
            wait.until(EC.presence_of_element_located((AppiumBy.NAME, title)))
        except TimeoutException:
            # Reading the pipe of a still running process blocks until EOF,
            # so stop the proxy before collecting its debug output.
            self.xembedsniproxy.kill()
            _, proxy_output = self.xembedsniproxy.communicate()
            self.fail(f"Cannot find the XEmbed icon in the system tray: {proxy_output.decode(encoding='utf-8', errors='replace')}")

        # Create a reader thread to work around read block
        self.stream_reader_thread = StreamReaderThread(self.xembedsniproxy.stderr)
        self.stream_reader_thread.start()
        self.assertTrue(self.stream_reader_thread.is_alive(), "The reader thread is not running")

        # Now test clickable
        self.driver.find_element(AppiumBy.NAME, title).click()

        success: bool = False
        for _ in range(10):
            # readline() waits for a matching debug line before giving up
            if self.stream_reader_thread.readline() is not None:
                success = True
                break

            print("Retrying...", file=sys.stderr, flush=True)
            self.driver.find_element(AppiumBy.NAME, title).click()

        self.assertTrue(success, "xembedsniproxy did not receive the click event")
0231 
0232 
# Run the tests through unittest's command-line runner when this file is
# executed directly rather than imported.
if __name__ == '__main__':
    unittest.main()