Warning, file /sdk/selenium-webdriver-at-spi/selenium-webdriver-at-spi.py was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).

0001 # SPDX-License-Identifier: AGPL-3.0-or-later
0002 # SPDX-FileCopyrightText: 2021-2023 Harald Sitter <sitter@kde.org>
0003 
0004 import base64
0005 from datetime import datetime, timedelta
0006 import numpy as np
0007 import tempfile
0008 import time
0009 import traceback
0010 from flask import Flask, request, jsonify
0011 import uuid
0012 import json
0013 import sys
0014 import os
0015 import signal
0016 import subprocess
0017 from werkzeug.exceptions import HTTPException
0018 
0019 import pyatspi
0020 from lxml import etree
0021 
0022 import gi
0023 from gi.repository import GLib
0024 from gi.repository import Gio
0025 gi.require_version('Gdk', '3.0')
0026 from gi.repository import Gdk
0027 gi.require_version('Gtk', '3.0')
0028 from gi.repository import Gtk
0029 
0030 from app_roles import ROLE_NAMES
0031 
0032 # Exposes AT-SPI as a webdriver. This is written in python because C sucks and pyatspi is a first class binding so
0033 # we lose nothing but gain the reduced sucking of python.
0034 
0035 # https://github.com/SeleniumHQ/selenium/wiki/JsonWireProtocol#WebElement_JSON_Object.md
0036 # https://www.w3.org/TR/webdriver
0037 # https://github.com/microsoft/WinAppDriver/blob/master/Docs/SupportedAPIs.md
0038 # https://www.freedesktop.org/wiki/Accessibility/PyAtSpi2Example/
0039 
0040 EVENTLOOP_TIME = 0.1
0041 EVENTLOOP_TIME_LONG = 0.5
0042 sys.stdout = sys.stderr
0043 sessions = {} # global dict of open sessions
0044 
0045 # Give the GUI enough time to react. tests run on the CI won't always be responsive in the tight schedule established by at-spi2 (800ms) and run risk
0046 # of timing out on (e.g.) click events. The second value is the timeout for app startup, we keep that the same as upstream.
0047 pyatspi.setTimeout(4000, 15000)
0048 
0049 # Using flask because I know nothing about writing REST in python and it seemed the most straight-forward framework.
0050 app = Flask(__name__)
0051 
0052 @app.errorhandler(Exception)
0053 def unknown_error(e):
0054     if isinstance(e, HTTPException):
0055         return e
0056 
0057     return errorFromException(error='unknown error', exception=e), 500
0058 
0059 @app.route('/status', methods=['GET'])
0060 def status():
0061     body = {
0062         'value': {
0063             'ready': 'true',
0064             'message': 'There is only one state. Hooray!'
0065         }
0066     }
0067     return json.dumps(body), 200, {'content-type': 'application/json'}
0068 
0069 
0070 def _createNode2(accessible, parentElement, indexInParents=[]):
0071     if not accessible:
0072         return
0073     # A bit of aggressive filtering to not introspect chromium and firefox and the likes when using the desktop root.
0074     if accessible.toolkitName != "Qt" and accessible.toolkitName != "at-spi-registry":
0075         return
0076 
0077     roleName = accessible.getRoleName()
0078     e = None
0079     if roleName:
0080         e = etree.Element(roleName.replace(" ", "_"))
0081     else:
0082         e = etree.Element("accessible")
0083 
0084     e.set("name", accessible.name)
0085     e.set("role", str(int(accessible.getRole())))
0086     e.set("description", accessible.description)
0087     if accessible.accessibleId != None:
0088         e.set("accessibility-id", accessible.accessibleId)
0089     # NB: pyatspi.getPath is bugged when the QObject has no QObject parent. Instead manually keep track of indexes.
0090     # while generating the xml.
0091     # path = pyatspi.getPath(accessible)
0092     # path is a list of ints for the indexes within the parents
0093     path_strs = [str(x) for x in indexInParents]
0094     e.set("path", ' '.join(path_strs))
0095 
0096     states = []
0097     for state in accessible.getState().getStates():
0098         states.append(pyatspi.stateToString(state))
0099     e.set("states", ', '.join(states))
0100 
0101     for i in range(0, accessible.childCount):
0102         newIndex = indexInParents.copy()
0103         newIndex.append(i)
0104         _createNode2(accessible.getChildAtIndex(i), e, newIndex)
0105 
0106     if parentElement != None:
0107         parentElement.append(e)
0108     else:
0109         return e
0110 
0111 
0112 def errorFromMessage(error, message):
0113     return jsonify({'value': {'error': error, 'message': message}})
0114 
0115 
0116 def errorFromException(error, exception):
0117     return jsonify({'value': {'error': error, 'message': str(exception), 'stacktrace': traceback.format_exc()}})
0118 
0119 
0120 @app.route('/')
0121 def index():
0122     return 'Servas'
0123 
0124 # Encapsulates a Session object. Sessions are opened by the client and contain elements. A session is generally speaking
0125 # an app.
0126 # TODO: should we expose the root scope somehow? requires a special variant of session and moving logic from the
0127 #   REST app functions to the Session object.
0128 
0129 
0130 class Session:
0131 
0132     def __init__(self) -> None:
0133         self.id = str(uuid.uuid1())
0134         self.elements = {}  # a cache to hold elements between finding and interacting with
0135         self.browsing_context = None
0136         self.pid = -1
0137         # implicit deviates from spec, 0 is unreasonable
0138         self.timeouts = {'script': 30000, 'pageLoad': 300000, 'implicit': 5000}
0139         self.launched = False
0140 
0141         blob = json.loads(request.data)
0142         print(request.data)
0143         # TODO the blob from ruby is much more complicated god only knows why
0144         desired_app = None
0145         desired_timeouts = None
0146         desired_environ = None
0147         if 'desiredCapabilities' in blob:
0148             if 'app' in blob['desiredCapabilities']:
0149                 desired_app = blob['desiredCapabilities']['app']
0150             else:
0151                 desired_app = blob['desiredCapabilities']['appium:app']
0152             if 'timeouts' in blob['desiredCapabilities']:
0153                 desired_timeouts = blob['desiredCapabilities']['timeouts']
0154             if 'appium:environ' in blob['desiredCapabilities']:
0155                 desired_environ = blob['desiredCapabilities']['appium:environ']
0156         else:
0157             if 'app' in blob['capabilities']['alwaysMatch']:
0158                 desired_app = blob['capabilities']['alwaysMatch']['app']
0159             else:
0160                 desired_app = blob['capabilities']['alwaysMatch']['appium:app']
0161             if 'timeouts' in blob['capabilities']['alwaysMatch']:
0162                 desired_timeouts = blob['capabilities']['alwaysMatch']['timeouts']
0163             if 'appium:environ' in blob['capabilities']['alwaysMatch']:
0164                 desired_environ = blob['capabilities']['alwaysMatch']['appium:environ']
0165 
0166         if desired_timeouts:
0167             if 'script' in desired_timeouts:
0168                 self.timeouts['script'] = desired_timeouts['script']
0169             if 'pageLoad' in desired_timeouts:
0170                 self.timeouts['pageLoad'] = desired_timeouts['pageLoad']
0171             if 'implicit' in desired_timeouts:
0172                 self.timeouts['implicit'] = desired_timeouts['implicit']
0173 
0174         if desired_app == 'Root':
0175             # NB: at the time of writing there can only be one desktop ever
0176             self.browsing_context = pyatspi.Registry.getDesktop(0)
0177             return
0178 
0179         self.launched = True
0180         end_time = datetime.now() + \
0181             timedelta(milliseconds=(self.timeouts['implicit'] * 2))
0182 
0183         context = Gio.AppLaunchContext()
0184         context.setenv('QT_ACCESSIBILITY', '1')
0185         context.setenv('QT_LINUX_ACCESSIBILITY_ALWAYS_ON', '1')
0186         context.setenv('KIO_DISABLE_CACHE_CLEANER', '1')  # don't dangle
0187         if isinstance(desired_environ, dict):
0188             for key, value in desired_environ.items():
0189                 context.setenv(key, value)
0190 
0191         def on_launched(context, info, platform_data):
0192             self.pid = platform_data['pid']
0193             print("launched " + str(self.pid))
0194 
0195             while datetime.now() < end_time:
0196                 for desktop_index in range(pyatspi.Registry.getDesktopCount()):
0197                     desktop = pyatspi.Registry.getDesktop(desktop_index)
0198                     for app in desktop:
0199                         try:
0200                             if app.get_process_id() == self.pid:
0201                                 self.browsing_context = app
0202                                 break
0203                         except gi.repository.GLib.GError:
0204                             print('stumbled over a broken process. ignoring...')
0205                             continue
0206                     if self.browsing_context:
0207                         break
0208                 if self.browsing_context:
0209                     break
0210             if not self.browsing_context:
0211                 raise RuntimeError('Failed to find application on a11y bus within time limit!')
0212 
0213         context.connect("launched", on_launched)
0214 
0215         if desired_app.endswith(".desktop"):
0216             appinfo = Gio.DesktopAppInfo.new(desired_app)
0217             appinfo.launch([], context)
0218         elif desired_app.isnumeric():
0219             on_launched(None, None, {'pid': int(desired_app)})
0220         else:
0221             appinfo = Gio.AppInfo.create_from_commandline(
0222                 desired_app, None, Gio.AppInfoCreateFlags.NONE)
0223             appinfo.launch([], context)
0224         print("browsing context set to:", self.browsing_context)
0225 
0226     def close(self) -> None:
0227         if self.launched:
0228             try:
0229                 os.kill(self.pid, signal.SIGKILL)
0230             except ProcessLookupError:
0231                 pass
0232 
0233 
0234 @app.route('/session', methods=['GET', 'POST', 'DELETE'])
0235 def session():
0236     if request.method == 'POST':
0237         # TODO:
0238         # https://www.w3.org/TR/webdriver1/#new-session
0239         # 1, 3, 4, 5, 8, 9, 11, 12, 13, 14
0240         print(request)
0241         try:
0242             session = Session()
0243         except Exception as e:
0244             return errorFromException(error='session not created', exception=e), 500
0245         sessions[session.id] = session
0246         print(sessions)
0247 
0248         if session.browsing_context is None:
0249             return errorFromMessage(error='session not created',
0250                                     message='Application was not found on the a11y bus for unknown reasons. It probably failed to register on the bus.'), 500
0251 
0252         return json.dumps({'value': {'sessionId': session.id, 'capabilities': {"app": session.browsing_context.name}}}), 200, {'content-type': 'application/json'}
0253     elif request.method == 'GET':
0254         # TODO impl
0255         print(request)
0256     elif request.method == 'DELETE':
0257         # TODO spec review
0258         return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0259 
0260 
0261 @app.route('/session/<session_id>', methods=['DELETE'])
0262 def session_delete(session_id):
0263     if request.method == 'DELETE':
0264         # TODO spec review
0265         session = sessions[session_id]
0266         if not session:
0267             return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0268 
0269         session.close()
0270         return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0271 
0272 
0273 @app.route('/session/<session_id>/timeouts/implicit_wait', methods=['POST'])
0274 def session_implicit_wait(session_id):
0275     session = sessions[session_id]
0276     if not session:
0277         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0278 
0279     blob = json.loads(request.data)
0280     ms = blob['ms']
0281 
0282     session.timeouts['implicit'] = ms
0283     return json.dumps({'value': None})
0284 
0285 
0286 @app.route('/session/<session_id>/source', methods=['GET'])
0287 def session_source(session_id):
0288     session = sessions[session_id]
0289     if not session:
0290         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0291 
0292     doc = _createNode2(session.browsing_context, None)
0293     return json.dumps({'value': etree.tostring(doc, pretty_print=False).decode("utf-8")}), 200, {'content-type': 'application/xml'}
0294 
0295 
0296 # NB: custom method to get the source without json wrapper
0297 @app.route('/session/<session_id>/sourceRaw', methods=['GET'])
0298 def session_source_raw(session_id):
0299     session = sessions[session_id]
0300     if not session:
0301         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0302 
0303     doc = _createNode2(session.browsing_context, None)
0304     return etree.tostring(doc, pretty_print=True).decode("utf-8"), 200, {'content-type': 'application/xml'}
0305 
0306 
0307 def locator(session, strategy, selector, start, findAll = False):
0308     # pyatspi.findDescendant(start, lambda x: print(x))
0309 
0310     end_time = datetime.now() + \
0311         timedelta(milliseconds=session.timeouts['implicit'])
0312     results = []
0313 
0314     while datetime.now() < end_time:
0315         if strategy == 'xpath':
0316             print("-- xml")
0317             doc = _createNode2(start, None)
0318             for c in doc.xpath(selector):
0319                 path = [int(x) for x in c.get('path').split()]
0320                 # path is relative to the app root, not our start item!
0321                 item = session.browsing_context
0322                 for i in path:
0323                     item = item[i]
0324                 if c.get('name') != item.name or c.get('description') != item.description:
0325                     return []
0326                 results.append(item)
0327             print("-- xml")
0328         else:
0329             # TODO can I switch this in python +++ raise on unmapped strategy
0330             pred = None
0331             if strategy == 'accessibility id':
0332                 def pred(x): return x.accessibleId.endswith(selector) and (x.getState().contains(pyatspi.STATE_VISIBLE) and x.getState().contains(pyatspi.STATE_SENSITIVE))
0333             # pyatspi strings "[ roleName | name ]"
0334             elif strategy == 'class name':
0335                 def pred(x): return str(x) == selector and (x.getState().contains(
0336                     pyatspi.STATE_VISIBLE) or x.getState().contains(pyatspi.STATE_SENSITIVE))
0337             elif strategy == 'name':
0338                 def pred(x): return x.name == selector and (x.getState().contains(
0339                     pyatspi.STATE_VISIBLE) or x.getState().contains(pyatspi.STATE_SENSITIVE))
0340             elif strategy == 'description':
0341                 def pred(x): return x.description == selector and (x.getState().contains(
0342                     pyatspi.STATE_VISIBLE) or x.getState().contains(pyatspi.STATE_SENSITIVE))
0343             # there are also id and accessibleId but they seem not ever set. Not sure what to make of that :shrug:
0344             if findAll:
0345                 accessible = pyatspi.findAllDescendants(start, pred)
0346                 if accessible:
0347                     results += accessible
0348             else:
0349                 accessible = pyatspi.findDescendant(start, pred)
0350                 if accessible:
0351                     results.append(accessible)
0352             print(accessible)
0353         if len(results) > 0:
0354             break
0355 
0356     return results
0357 
0358 
0359 @app.route('/session/<session_id>/element', methods=['GET', 'POST'])
0360 def session_element(session_id=None):
0361     # https://www.w3.org/TR/webdriver1/#dfn-find-element
0362 
0363     # TODO scope elements to session somehow when the session gets closed we can throw away the references
0364     print(request.url)
0365     print(session_id)
0366     print(request.args)
0367     print(request.data)
0368     session = sessions[session_id]
0369     blob = json.loads(request.data)
0370 
0371     strategy = blob['using']
0372     selector = blob['value']
0373     if not strategy or not selector:
0374         return json.dumps({'value': {'error': 'invalid argument'}}), 404, {'content-type': 'application/json'}
0375 
0376     start = session.browsing_context
0377     if not start: # browsing context (no longer) valid
0378         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0379 
0380     results = locator(session, strategy, selector, start)
0381 
0382     if not results:
0383         return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}
0384 
0385     result = results[0]
0386     unique_id = result.path.replace('/', '-')
0387     session.elements[unique_id] = result
0388     return json.dumps({'value': {'element-6066-11e4-a52e-4f735466cecf': unique_id}}), 200, {'content-type': 'application/json'}
0389 
0390 
0391 @app.route('/session/<session_id>/elements', methods=['GET', 'POST'])
0392 def session_element2(session_id=None):
0393     # https://www.w3.org/TR/webdriver1/#dfn-find-elements
0394 
0395     # TODO scope elements to session somehow when the session gets closed we can throw away the references
0396     print(request.url)
0397     print(session_id)
0398     print(request.args)
0399     print(request.data)
0400     session = sessions[session_id]
0401     blob = json.loads(request.data)
0402 
0403     strategy = blob['using']
0404     selector = blob['value']
0405     if not strategy or not selector:
0406         return json.dumps({'value': {'error': 'invalid argument'}}), 404, {'content-type': 'application/json'}
0407 
0408     start = session.browsing_context
0409     if not start:  # browsing context (no longer) valid
0410         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0411 
0412     results = locator(session, strategy, selector, start, findAll = True)
0413 
0414     if not results:
0415         return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}
0416 
0417     serializations = []
0418     for result in results:
0419         unique_id = result.path.replace('/', '-')
0420         print(unique_id)
0421         session.elements[unique_id] = result
0422         serializations.append({'element-6066-11e4-a52e-4f735466cecf': unique_id})
0423 
0424     return json.dumps({'value': serializations}), 200, {'content-type': 'application/json'}
0425 
0426 
0427 @app.route('/session/<session_id>/element/<element_id>/click', methods=['GET', 'POST'])
0428 def session_element_click(session_id, element_id):
0429     session = sessions[session_id]
0430     if not session:
0431         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0432 
0433     element = session.elements[element_id]
0434     if not element:
0435         return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}
0436 
0437     action = element.queryAction()
0438     availableActions = {}
0439     for i in range(0, action.nActions):
0440         availableActions[action.getName(i)] = i
0441 
0442     keys = availableActions.keys()
0443     if 'SetFocus' in keys: # this is used in addition to actual actions so focus is where it would be expected after a click
0444         print("actioning focus")
0445         action.doAction(availableActions['SetFocus'])
0446         time.sleep(EVENTLOOP_TIME)
0447 
0448     try:
0449         if 'Press' in keys:
0450             print("actioning press")
0451             action.doAction(availableActions['Press'])
0452         elif 'Toggle' in keys:
0453             print("actioning toggle")
0454             action.doAction(availableActions['Toggle'])
0455         elif 'ShowMenu' in keys:
0456             print("actioning showmenu")
0457             action.doAction(availableActions['ShowMenu'])
0458     except gi.repository.GLib.GError as e:
0459         print(e)
0460         print("Ignoring! There is a chance your application is misbehaving. Could also just be a blocked eventloop though.")
0461     time.sleep(EVENTLOOP_TIME)
0462 
0463     return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0464 
0465 
0466 @app.route('/session/<session_id>/element/<element_id>/text', methods=['GET'])
0467 def session_element_text(session_id, element_id):
0468     session = sessions[session_id]
0469     element = session.elements[element_id]
0470     try:
0471         textElement = element.queryText()
0472         return json.dumps({'value': textElement.getText(0, -1)}), 200, {'content-type': 'application/json'}
0473     except NotImplementedError:
0474         return json.dumps({'value': element.name}), 200, {'content-type': 'application/json'}
0475 
0476 
0477 @app.route('/session/<session_id>/element/<element_id>/enabled', methods=['GET'])
0478 def session_element_enabled(session_id, element_id):
0479     session = sessions[session_id]
0480     element = session.elements[element_id]
0481     return json.dumps({'value': element.getState().contains(pyatspi.STATE_ENABLED)}), 200, {'content-type': 'application/json'}
0482 
0483 
0484 @app.route('/session/<session_id>/element/<element_id>/displayed', methods=['GET'])
0485 def session_element_displayed(session_id, element_id):
0486     session = sessions[session_id]
0487     element = session.elements[element_id]
0488     return json.dumps({'value': element.getState().contains(pyatspi.STATE_VISIBLE) and element.getState().contains(pyatspi.STATE_SHOWING)}), 200, {'content-type': 'application/json'}
0489 
0490 
0491 @app.route('/session/<session_id>/element/<element_id>/selected', methods=['GET'])
0492 def session_element_selected(session_id, element_id):
0493     session = sessions[session_id]
0494     element = session.elements[element_id]
0495     return json.dumps({'value': (element.getState().contains(pyatspi.STATE_SELECTED) or element.getState().contains(pyatspi.STATE_FOCUSED))}), 200, {'content-type': 'application/json'}
0496 
0497 
0498 @app.route('/session/<session_id>/element/<element_id>/attribute/<name>', methods=['GET'])
0499 def session_element_attribute(session_id, element_id, name):
0500     session = sessions[session_id]
0501     if not session:
0502         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0503 
0504     element = session.elements[element_id]
0505     if not element:
0506         return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}
0507 
0508     print(request.data)
0509 
0510     if name == "accessibility-id":
0511         return json.dumps({'value': element.accessibleId}), 200, {'content-type': 'application/json'}
0512 
0513     if name == "name":
0514         return json.dumps({'value': element.name}), 200, {'content-type': 'application/json'}
0515 
0516     if name == "value":
0517         elementValue = element.queryValue()
0518         return json.dumps({'value': elementValue.currentValue}), 200, {'content-type': 'application/json'}
0519 
0520     print(pyatspi.STATE_VALUE_TO_NAME)
0521     result = None
0522     for value, string in pyatspi.STATE_VALUE_TO_NAME.items():
0523         if string == name:
0524             result = element.getState().contains(value)
0525             break
0526 
0527     return json.dumps({'value': result}), 200, {'content-type': 'application/json'}
0528 
0529 
0530 @app.route('/session/<session_id>/element/<element_id>/element', methods=['POST'])
0531 def session_element_element(session_id, element_id):
0532     session = sessions[session_id]
0533     if not session:
0534         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0535 
0536     blob = json.loads(request.data)
0537     print(blob)
0538     strategy = blob['using']
0539     selector = blob['value']
0540     if not strategy or not selector:
0541         return json.dumps({'value': {'error': 'invalid argument'}}), 404, {'content-type': 'application/json'}
0542 
0543     start = session.elements[element_id]
0544     if not start:  # browsing context (no longer) valid
0545         return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}
0546 
0547     results = locator(session, strategy, selector, start)
0548 
0549     if not results:
0550         return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}
0551 
0552     result = results[0]
0553     unique_id = result.path.replace('/', '-')
0554     session.elements[unique_id] = result
0555     return json.dumps({'value': {'element-6066-11e4-a52e-4f735466cecf': unique_id}}), 200, {'content-type': 'application/json'}
0556 
0557 @app.route('/session/<session_id>/element/<element_id>/value', methods=['POST'])
0558 def session_element_value(session_id, element_id):
0559     session = sessions[session_id]
0560     if not session:
0561         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0562 
0563     element = session.elements[element_id]
0564     if not element:
0565         return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}
0566 
0567     blob = json.loads(request.data)
0568     text = blob['text']
0569 
0570     print(blob)
0571     print(element)
0572 
0573     try:
0574         offset = element.queryText().caretOffset
0575         textElement = element.queryEditableText()
0576         textElement.insertText(offset, text, len(text))
0577         return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0578     except NotImplementedError:
0579         print("element is not text type, falling back to synthesizing keyboard events")
0580         action = element.queryAction()
0581         processed = False
0582         for i in range(0, action.nActions):
0583             if action.getName(i) == 'SetFocus':
0584                 processed = True
0585                 action.doAction(i)
0586                 time.sleep(EVENTLOOP_TIME) # give the focus time to apply
0587                 generate_keyboard_event_text(text)
0588                 break
0589         if not processed:
0590             raise RuntimeError("element's actions list didn't contain SetFocus. The element may be malformed")
0591         return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0592 
0593 
0594 @app.route('/session/<session_id>/element/<element_id>/clear', methods=['POST'])
0595 def session_element_clear(session_id, element_id):
0596     session = sessions[session_id]
0597     if not session:
0598         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0599 
0600     element = session.elements[element_id]
0601     if not element:
0602         return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}
0603 
0604     characterCount = element.queryText().characterCount
0605     try:
0606         textElement = element.queryEditableText()
0607         textElement.deleteText(0, characterCount)
0608         return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0609     except NotImplementedError:
0610         print("element is not text type, falling back to synthesizing keyboard events")
0611         action = element.queryAction()
0612         processed = False
0613         for i in range(0, action.nActions):
0614             if action.getName(i) == 'SetFocus':
0615                 processed = True
0616                 action.doAction(i)
0617                 time.sleep(EVENTLOOP_TIME) # give the focus time to apply
0618 
0619                 pseudo_text = ''
0620                 pseudo_text += '\ue010' # end
0621                 for _ in range(characterCount):
0622                     pseudo_text += '\ue003'  # backspace
0623                 generate_keyboard_event_text(pseudo_text)
0624                 break
0625         if not processed:
0626             raise RuntimeError("element's actions list didn't contain SetFocus. The element may be malformed")
0627         return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0628 
0629 
0630 @app.route('/session/<session_id>/appium/device/app_state', methods=['POST'])
0631 def session_appium_device_app_state(session_id):
0632     session = sessions[session_id]
0633     if not session:
0634         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0635 
0636     blob = json.loads(request.data)
0637     appId = blob['appId']
0638 
0639     proc = subprocess.Popen(
0640         'selenium-webdriver-at-spi-appidlister', stdout=subprocess.PIPE)
0641     out, err = proc.communicate()
0642 
0643     apps = json.loads(out)
0644     print(apps)
0645     if appId in apps.values():
0646         return json.dumps({'value': 4}), 200, {'content-type': 'application/json'}
0647     # TODO: implement rest of codes
0648     return json.dumps({'value': 1}), 200, {'content-type': 'application/json'}
0649 
0650 
0651 @app.route('/session/<session_id>/appium/device/terminate_app', methods=['POST'])
0652 def session_appium_device_terminate_app(session_id):
0653     session = sessions[session_id]
0654     if not session:
0655         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0656 
0657     blob = json.loads(request.data)
0658     appId = blob['appId']
0659 
0660     proc = subprocess.Popen(
0661         'selenium-webdriver-at-spi-appidlister', stdout=subprocess.PIPE)
0662     out, err = proc.communicate()
0663 
0664     apps = json.loads(out)
0665     if appId in apps.values():
0666         pid = list(apps.keys())[list(apps.values()).index(appId)]
0667         os.kill(int(pid), signal.SIGKILL)
0668     return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0669 
0670 
0671 @app.route('/session/<session_id>/appium/device/press_keycode', methods=['POST'])
0672 def session_appium_device_press_keycode(session_id):
0673     session = sessions[session_id]
0674     if not session:
0675         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0676 
0677     blob = json.loads(request.data)
0678     keycode = blob['keycode']
0679     # Not doing anything with these for now
0680     # metastate = blob['metastate']
0681     # flags = blob['flags']
0682     # FIXME needs testing
0683     generate_keyboard_event_text(keycode)
0684     return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0685 
0686 
0687 @app.route('/session/<session_id>/actions', methods=['POST'])
0688 def session_actions(session_id):
0689     session = sessions[session_id]
0690     if not session:
0691         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0692 
0693     blob = json.loads(request.data)
0694 
0695     if 'KWIN_PID' in os.environ:
0696         with tempfile.NamedTemporaryFile() as file:
0697             file.write(request.data)
0698             file.flush()
0699             subprocess.run(["selenium-webdriver-at-spi-inputsynth", file.name])
0700     else:
0701         raise RuntimeError("actions only work with nested kwin!")
0702 
0703     return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0704 
0705 
0706 @app.route('/session/<session_id>/appium/device/get_clipboard', methods=['POST'])
0707 def session_appium_device_get_clipboard(session_id):
0708     session = sessions[session_id]
0709     if not session:
0710         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0711 
0712     blob = json.loads(request.data)
0713     contentType = blob['contentType']
0714 
0715     # NOTE: need a window because on wayland we must be the active window to manipulate the clipboard (currently anyway)
0716     window = Gtk.Window()
0717     window.set_default_size(20, 20)
0718     window.show()
0719     display = window.get_display()
0720     clipboard = Gtk.Clipboard.get_for_display(display, Gdk.SELECTION_CLIPBOARD)
0721 
0722     spin_glib_main_context()
0723 
0724     data = None
0725     if contentType == 'plaintext':
0726         data = clipboard.wait_for_text()
0727     else:
0728         raise 'content type not currently supported'
0729 
0730     window.close()
0731 
0732     spin_glib_main_context()
0733 
0734     return json.dumps({'value': base64.b64encode(data.encode('utf-8')).decode('utf-8')}), 200, {'content-type': 'application/json'}
0735 
0736 
0737 @app.route('/session/<session_id>/appium/device/set_clipboard', methods=['POST'])
0738 def session_appium_device_set_clipboard(session_id):
0739     session = sessions[session_id]
0740     if not session:
0741         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0742 
0743     blob = json.loads(request.data)
0744     contentType = blob['contentType']
0745     content = blob['content']
0746 
0747     # NOTE: need a window because on wayland we must be the active window to manipulate the clipboard (currently anyway)
0748     window = Gtk.Window()
0749     window.set_default_size(20, 20)
0750     display = window.get_display()
0751     clipboard = Gtk.Clipboard.get_for_display(display, Gdk.SELECTION_CLIPBOARD)
0752 
0753     if contentType == 'plaintext':
0754         clipboard.set_text(base64.b64decode(content).decode('utf-8'), -1)
0755     else:
0756         raise 'content type not currently supported'
0757 
0758     spin_glib_main_context()
0759 
0760     window.close()
0761 
0762     spin_glib_main_context()
0763 
0764     return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0765 
0766 
0767 @app.route('/session/<session_id>/appium/element/<element_id>/value', methods=['POST'])
0768 def session_appium_element_value(session_id, element_id):
0769     session = sessions[session_id]
0770     if not session:
0771         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0772 
0773     element = session.elements[element_id]
0774     if not element:
0775         return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}
0776 
0777     blob = json.loads(request.data)
0778     print("blob:", blob)
0779     print(element)
0780     value = blob['text']
0781 
0782     try:
0783         valueElement = element.queryValue()
0784         valueElement.currentValue = float(value)
0785         return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0786     except NotImplementedError:
0787         return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0788 
0789 
0790 @app.route('/session/<session_id>/screenshot', methods=['GET'])
0791 def session_appium_screenshot(session_id):
0792     session = sessions[session_id]
0793     if not session:
0794         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0795 
0796     # NB: these values incorrectly do not include the device pixel ratio, so they are off when used on a scaling display
0797     # position_x, position_y = session.browsing_context.getChildAtIndex(0).queryComponent().getPosition(pyatspi.XY_SCREEN)
0798     # size_width, size_height = session.browsing_context.getChildAtIndex(0).queryComponent().getSize()
0799 
0800     proc = subprocess.Popen(['selenium-webdriver-at-spi-screenshotter',
0801                              str(0), str(0), str(0), str(0)],
0802                             stdout=subprocess.PIPE)
0803     out, err = proc.communicate()
0804 
0805     if not out:
0806         return json.dumps({'value': {'error': err}}), 404, {'content-type': 'application/json'}
0807 
0808     return json.dumps({'value': out.decode('utf-8')}), 200, {'content-type': 'application/json'}
0809 
0810 
0811 @app.route('/session/<session_id>/appium/compare_images', methods=['POST'])
0812 def session_appium_compare_images(session_id):
0813     """
0814     Reference:
0815     - https://github.com/appium/python-client/blob/master/appium/webdriver/extensions/images_comparison.py
0816     - https://github.com/appium/appium/blob/master/packages/opencv/lib/index.js
0817     """
0818     session = sessions[session_id]
0819     if not session:
0820         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0821 
0822     blob = json.loads(request.data)
0823     mode: str = blob['mode']
0824     options: dict = blob['options']
0825     return_value: dict = {}
0826 
0827     import cv2 as cv  # The extension is slow, so load it on demand
0828 
0829     cv_image1 = cv.imdecode(np.fromstring(base64.b64decode(blob['firstImage']), np.uint8), cv.IMREAD_COLOR)
0830     cv_image2 = cv.imdecode(np.fromstring(base64.b64decode(blob['secondImage']), np.uint8), cv.IMREAD_COLOR)
0831 
0832     if mode == 'matchFeatures':
0833         # https://docs.opencv.org/3.0-beta/doc/py_tutorials/py_feature2d/py_matcher/py_matcher.html
0834         detectorName: str = options.get('detectorName', 'ORB')
0835         matchFunc: str = options.get('matchFunc', 'BruteForce')
0836         goodMatchesFactor: int = options.get('goodMatchesFactor', -1)
0837 
0838         if detectorName == 'AKAZE':
0839             detector = cv.AKAZE.create()
0840         elif detectorName == 'AGAST':
0841             detector = cv.AgastFeatureDetector.create()
0842         elif detectorName == 'BRISK':
0843             detector = cv.BRISK.create()
0844         elif detectorName == 'FAST':
0845             detector = cv.FastFeatureDetector.create()
0846         elif detectorName == 'GFTT':
0847             detector = cv.GFTTDetector.create()
0848         elif detectorName == 'KAZE':
0849             detector = cv.KAZE.create()
0850         elif detectorName == 'MSER':
0851             detector = cv.SIFT.create()
0852         else:
0853             detector = cv.ORB.create()
0854 
0855         if matchFunc == 'FlannBased':
0856             matcher = cv.FlannBasedMatcher.create()
0857         elif matchFunc == 'BruteForceL1':
0858             matcher = cv.BFMatcher.create(cv.NORM_L1, crossCheck=True)
0859         elif matchFunc == 'BruteForceHamming':
0860             matcher = cv.BFMatcher.create(cv.NORM_HAMMING, crossCheck=True)
0861         elif matchFunc == 'BruteForceHammingLut':
0862             matcher = cv.BFMatcher.create(cv.NORM_HAMMING2, crossCheck=True)
0863         elif matchFunc == 'BruteForceSL2':
0864             matcher = cv.BFMatcher.create(cv.NORM_L2, crossCheck=True)
0865         else:
0866             matcher = cv.BFMatcher.create(cv.NORM_L2, crossCheck=True)
0867 
0868         # Find the keypoints and descriptors
0869         kp1, des1 = detector.detectAndCompute(cv_image1, None)
0870         kp2, des2 = detector.detectAndCompute(cv_image2, None)
0871         matches = sorted(matcher.match(des1, des2), key=lambda m: m.distance)
0872 
0873         if len(matches) < 1:
0874             return json.dumps({'value': {'error': 'Could not find any matches between images. Double-check orientation, resolution, or use another detector or matching function.'}}), 404, {'content-type': 'application/json'}
0875 
0876         return_value['count'] = min(len(matches), goodMatchesFactor) if goodMatchesFactor > 0 else len(matches)
0877         return_value['points1'] = [kp1[m.queryIdx].pt for m in matches]
0878         return_value['rect1'] = calculate_matched_rect(return_value['points1'])
0879         return_value['points2'] = [kp2[m.trainIdx].pt for m in matches]
0880         return_value['rect2'] = calculate_matched_rect(return_value['points2'])
0881 
0882     elif mode == 'matchTemplate':
0883         threshold: float = options.get('threshold', 0.0)  # Exact match
0884 
0885         matched = cv.matchTemplate(cv_image1, cv_image2, cv.TM_SQDIFF_NORMED)
0886         min_val, max_val, min_loc, max_loc = cv.minMaxLoc(matched)
0887         print(min_val, max_val, min_loc, max_loc)
0888 
0889         if min_val <= threshold:
0890             x, y = min_loc
0891             return_value['rect'] = {
0892                 'x': x,
0893                 "y": y,
0894                 'width': cv_image2.shape[1],
0895                 'height': cv_image2.shape[0],
0896             }
0897         else:
0898             return json.dumps({'value': {'error': 'Cannot find any occurrences of the partial image in the full image.'}}), 404, {'content-type': 'application/json'}
0899 
0900     elif mode == 'getSimilarity':
0901         if cv_image1.shape != cv_image2.shape:
0902             return json.dumps({'value': {'error': 'Both images are expected to have the same size in order to calculate the similarity score.'}}), 404, {'content-type': 'application/json'}
0903         matched = cv.matchTemplate(cv_image1, cv_image2, cv.TM_SQDIFF_NORMED)
0904         min_val, max_val, min_loc, max_loc = cv.minMaxLoc(matched)
0905         print(min_val, max_val, min_loc, max_loc)
0906         return_value['score'] = 1.0 - min_val
0907 
0908     else:
0909         return json.dumps({'value': {'error': 'Mode is not supported'}}), 404, {'content-type': 'application/json'}
0910 
0911     return json.dumps({'value': return_value}), 200, {'content-type': 'application/json'}
0912 
0913 
0914 def calculate_matched_rect(matched_points: list[tuple[int, int]]) -> dict[str, int]:
0915     if len(matched_points) < 2:
0916         return {
0917             'x': 0,
0918             "y": 0,
0919             'width': 0,
0920             'height': 0,
0921         }
0922 
0923     points_sorted_by_distance = sorted(matched_points, key=lambda pt: pt[0] * pt[0] + pt[1] * pt[1])
0924     first_point = points_sorted_by_distance[0]
0925     last_point = points_sorted_by_distance[1]
0926 
0927     return {
0928         'x': min(first_point[0], last_point[0]),
0929         "y": min(first_point[1], last_point[1]),
0930         'width': abs(first_point[0] - last_point[0]),
0931         'height': abs(first_point[1] - last_point[1]),
0932     }
0933 
0934 
0935 def generate_keyboard_event_text(text):
0936     # using a nested kwin. need to synthesize keys into wayland (not supported in atspi right now)
0937     if 'KWIN_PID' in os.environ:
0938         with tempfile.NamedTemporaryFile() as fp:
0939             actions = []
0940             for ch in text:
0941                 actions.append({'type': 'keyDown', 'value': ch})
0942                 actions.append({'type': 'keyUp', 'value': ch})
0943             print({'actions': {'type': 'key', 'id': 'key', 'actions': actions}})
0944             fp.write(json.dumps({'actions': [{'type': 'key', 'id': 'key', 'actions': actions}]}).encode())
0945             fp.flush()
0946             subprocess.run(["selenium-webdriver-at-spi-inputsynth", fp.name])
0947     else:
0948         for ch in text:
0949             pyatspi.Registry.generateKeyboardEvent(char_to_keyval(ch), None, pyatspi.KEY_SYM)
0950             time.sleep(EVENTLOOP_TIME)
0951 
0952 def keyval_to_keycode(keyval):
0953     keymap = Gdk.Keymap.get_default()
0954     ret, keys = keymap.get_entries_for_keyval(keyval)
0955     if not ret:
0956         raise RuntimeError("Failed to map key!")
0957     # FIXME layout 0 is not necessarily the current one (e.g. in the kcm we can configure multiple layouts)
0958     return keys[0]
0959 
0960 
0961 def char_to_keyval(ch):
0962     print("----------::::::")
0963     keyval = Gdk.unicode_to_keyval(ord(ch))
0964     # I Don't know why this doesn't work, also doesn't work with \033 as input. :((
0965     # https://gitlab.gnome.org/GNOME/gtk/-/blob/gtk-3-24/gdk/gdkkeyuni.c
0966     # Other useful resources:
0967     # https://www.cl.cam.ac.uk/~mgk25/ucs/keysymdef.h
0968     if ch == "\uE00C":
0969         keyval = 0xff1b # escape
0970     elif ch == "\ue03d":
0971         keyval = 0xffeb # left meta
0972     elif ch == "\ue006":
0973         keyval = 0xff0d # return
0974     elif ch == "\ue007":
0975         keyval = 0xff8d # enter
0976     elif ch == "\ue003":
0977         keyval = 0xff08 # backspace
0978     elif ch == "\ue010":
0979         keyval = 0xff57 # end
0980     elif ch == "\ue012":
0981         keyval = 0xff51 # left
0982     elif ch == "\ue014":
0983         keyval = 0xff53 # right
0984     elif ch == "\ue013":
0985         keyval = 0xff52 # up
0986     elif ch == "\ue015":
0987         keyval = 0xff54 # down
0988     elif ch == "\ue004":
0989         keyval = 0xff09 # tab
0990     print(ord(ch))
0991     print(hex(keyval))
0992     return keyval
0993 
0994 
0995 def spin_glib_main_context(repeat: int = 4):
0996     context = GLib.MainContext.default()
0997     for _ in range(repeat):
0998         time.sleep(EVENTLOOP_TIME_LONG)
0999         while context.pending():
1000             context.iteration(may_block=False)