File indexing completed on 2024-04-21 16:29:28
0001 # SPDX-License-Identifier: AGPL-3.0-or-later 0002 # SPDX-FileCopyrightText: 2021-2023 Harald Sitter <sitter@kde.org> 0003 0004 import base64 0005 from datetime import datetime, timedelta 0006 import numpy as np 0007 import tempfile 0008 import time 0009 import traceback 0010 from flask import Flask, request, jsonify 0011 import uuid 0012 import json 0013 import sys 0014 import os 0015 import signal 0016 import subprocess 0017 from werkzeug.exceptions import HTTPException 0018 0019 import pyatspi 0020 from lxml import etree 0021 0022 import gi 0023 from gi.repository import GLib 0024 from gi.repository import Gio 0025 gi.require_version('Gdk', '3.0') 0026 from gi.repository import Gdk 0027 gi.require_version('Gtk', '3.0') 0028 from gi.repository import Gtk 0029 0030 from app_roles import ROLE_NAMES 0031 0032 # Exposes AT-SPI as a webdriver. This is written in python because C sucks and pyatspi is a first class binding so 0033 # we lose nothing but gain the reduced sucking of python. 0034 0035 # https://github.com/SeleniumHQ/selenium/wiki/JsonWireProtocol#WebElement_JSON_Object.md 0036 # https://www.w3.org/TR/webdriver 0037 # https://github.com/microsoft/WinAppDriver/blob/master/Docs/SupportedAPIs.md 0038 # https://www.freedesktop.org/wiki/Accessibility/PyAtSpi2Example/ 0039 0040 EVENTLOOP_TIME = 0.1 0041 EVENTLOOP_TIME_LONG = 0.5 0042 sys.stdout = sys.stderr 0043 sessions = {} # global dict of open sessions 0044 0045 # Give the GUI enough time to react. tests run on the CI won't always be responsive in the tight schedule established by at-spi2 (800ms) and run risk 0046 # of timing out on (e.g.) click events. The second value is the timeout for app startup, we keep that the same as upstream. 0047 pyatspi.setTimeout(4000, 15000) 0048 0049 # Using flask because I know nothing about writing REST in python and it seemed the most straight-forward framework. 0050 app = Flask(__name__) 0051 0052 @app.errorhandler(Exception) 0053 def unknown_error(e): 0054 if isinstance(e, HTTPException): 0055 return e 0056 0057 return errorFromException(error='unknown error', exception=e), 500 0058 0059 @app.route('/status', methods=['GET']) 0060 def status(): 0061 body = { 0062 'value': { 0063 'ready': 'true', 0064 'message': 'There is only one state. Hooray!' 0065 } 0066 } 0067 return json.dumps(body), 200, {'content-type': 'application/json'} 0068 0069 0070 def _createNode2(accessible, parentElement, indexInParents=[]): 0071 if not accessible: 0072 return 0073 # A bit of aggressive filtering to not introspect chromium and firefox and the likes when using the desktop root. 0074 if accessible.toolkitName != "Qt" and accessible.toolkitName != "at-spi-registry": 0075 return 0076 0077 roleName = accessible.getRoleName() 0078 e = None 0079 if roleName: 0080 e = etree.Element(roleName.replace(" ", "_")) 0081 else: 0082 e = etree.Element("accessible") 0083 0084 e.set("name", accessible.name) 0085 e.set("role", str(int(accessible.getRole()))) 0086 e.set("description", accessible.description) 0087 if accessible.accessibleId != None: 0088 e.set("accessibility-id", accessible.accessibleId) 0089 # NB: pyatspi.getPath is bugged when the QObject has no QObject parent. Instead manually keep track of indexes. 0090 # while generating the xml. 0091 # path = pyatspi.getPath(accessible) 0092 # path is a list of ints for the indexes within the parents 0093 path_strs = [str(x) for x in indexInParents] 0094 e.set("path", ' '.join(path_strs)) 0095 0096 states = [] 0097 for state in accessible.getState().getStates(): 0098 states.append(pyatspi.stateToString(state)) 0099 e.set("states", ', '.join(states)) 0100 0101 for i in range(0, accessible.childCount): 0102 newIndex = indexInParents.copy() 0103 newIndex.append(i) 0104 _createNode2(accessible.getChildAtIndex(i), e, newIndex) 0105 0106 if parentElement != None: 0107 parentElement.append(e) 0108 else: 0109 return e 0110 0111 0112 def errorFromMessage(error, message): 0113 return jsonify({'value': {'error': error, 'message': message}}) 0114 0115 0116 def errorFromException(error, exception): 0117 return jsonify({'value': {'error': error, 'message': str(exception), 'stacktrace': traceback.format_exc()}}) 0118 0119 0120 @app.route('/') 0121 def index(): 0122 return 'Servas' 0123 0124 # Encapsulates a Session object. Sessions are opened by the client and contain elements. A session is generally speaking 0125 # an app. 0126 # TODO: should we expose the root scope somehow? requires a special variant of session and moving logic from the 0127 # REST app functions to the Session object. 0128 0129 0130 class Session: 0131 0132 def __init__(self) -> None: 0133 self.id = str(uuid.uuid1()) 0134 self.elements = {} # a cache to hold elements between finding and interacting with 0135 self.browsing_context = None 0136 self.pid = -1 0137 # implicit deviates from spec, 0 is unreasonable 0138 self.timeouts = {'script': 30000, 'pageLoad': 300000, 'implicit': 5000} 0139 self.launched = False 0140 0141 blob = json.loads(request.data) 0142 print(request.data) 0143 # TODO the blob from ruby is much more complicated god only knows why 0144 desired_app = None 0145 desired_timeouts = None 0146 desired_environ = None 0147 if 'desiredCapabilities' in blob: 0148 if 'app' in blob['desiredCapabilities']: 0149 desired_app = blob['desiredCapabilities']['app'] 0150 else: 0151 desired_app = blob['desiredCapabilities']['appium:app'] 0152 if 'timeouts' in blob['desiredCapabilities']: 0153 desired_timeouts = blob['desiredCapabilities']['timeouts'] 0154 if 'appium:environ' in blob['desiredCapabilities']: 0155 desired_environ = blob['desiredCapabilities']['appium:environ'] 0156 else: 0157 if 'app' in blob['capabilities']['alwaysMatch']: 0158 desired_app = blob['capabilities']['alwaysMatch']['app'] 0159 else: 0160 desired_app = blob['capabilities']['alwaysMatch']['appium:app'] 0161 if 'timeouts' in blob['capabilities']['alwaysMatch']: 0162 desired_timeouts = blob['capabilities']['alwaysMatch']['timeouts'] 0163 if 'appium:environ' in blob['capabilities']['alwaysMatch']: 0164 desired_environ = blob['capabilities']['alwaysMatch']['appium:environ'] 0165 0166 if desired_timeouts: 0167 if 'script' in desired_timeouts: 0168 self.timeouts['script'] = desired_timeouts['script'] 0169 if 'pageLoad' in desired_timeouts: 0170 self.timeouts['pageLoad'] = desired_timeouts['pageLoad'] 0171 if 'implicit' in desired_timeouts: 0172 self.timeouts['implicit'] = desired_timeouts['implicit'] 0173 0174 if desired_app == 'Root': 0175 # NB: at the time of writing there can only be one desktop ever 0176 self.browsing_context = pyatspi.Registry.getDesktop(0) 0177 return 0178 0179 self.launched = True 0180 end_time = datetime.now() + \ 0181 timedelta(milliseconds=(self.timeouts['implicit'] * 2)) 0182 0183 context = Gio.AppLaunchContext() 0184 context.setenv('QT_ACCESSIBILITY', '1') 0185 context.setenv('QT_LINUX_ACCESSIBILITY_ALWAYS_ON', '1') 0186 context.setenv('KIO_DISABLE_CACHE_CLEANER', '1') # don't dangle 0187 if isinstance(desired_environ, dict): 0188 for key, value in desired_environ.items(): 0189 context.setenv(key, value) 0190 0191 def on_launched(context, info, platform_data): 0192 self.pid = platform_data['pid'] 0193 print("launched " + str(self.pid)) 0194 0195 while datetime.now() < end_time: 0196 for desktop_index in range(pyatspi.Registry.getDesktopCount()): 0197 desktop = pyatspi.Registry.getDesktop(desktop_index) 0198 for app in desktop: 0199 try: 0200 if app.get_process_id() == self.pid: 0201 self.browsing_context = app 0202 break 0203 except gi.repository.GLib.GError: 0204 print('stumbled over a broken process. ignoring...') 0205 continue 0206 if self.browsing_context: 0207 break 0208 if self.browsing_context: 0209 break 0210 if not self.browsing_context: 0211 raise RuntimeError('Failed to find application on a11y bus within time limit!') 0212 0213 context.connect("launched", on_launched) 0214 0215 if desired_app.endswith(".desktop"): 0216 appinfo = Gio.DesktopAppInfo.new(desired_app) 0217 appinfo.launch([], context) 0218 elif desired_app.isnumeric(): 0219 on_launched(None, None, {'pid': int(desired_app)}) 0220 else: 0221 appinfo = Gio.AppInfo.create_from_commandline( 0222 desired_app, None, Gio.AppInfoCreateFlags.NONE) 0223 appinfo.launch([], context) 0224 print("browsing context set to:", self.browsing_context) 0225 0226 def close(self) -> None: 0227 if self.launched: 0228 try: 0229 os.kill(self.pid, signal.SIGKILL) 0230 except ProcessLookupError: 0231 pass 0232 0233 0234 @app.route('/session', methods=['GET', 'POST', 'DELETE']) 0235 def session(): 0236 if request.method == 'POST': 0237 # TODO: 0238 # https://www.w3.org/TR/webdriver1/#new-session 0239 # 1, 3, 4, 5, 8, 9, 11, 12, 13, 14 0240 print(request) 0241 try: 0242 session = Session() 0243 except Exception as e: 0244 return errorFromException(error='session not created', exception=e), 500 0245 sessions[session.id] = session 0246 print(sessions) 0247 0248 if session.browsing_context is None: 0249 return errorFromMessage(error='session not created', 0250 message='Application was not found on the a11y bus for unknown reasons. It probably failed to register on the bus.'), 500 0251 0252 return json.dumps({'value': {'sessionId': session.id, 'capabilities': {"app": session.browsing_context.name}}}), 200, {'content-type': 'application/json'} 0253 elif request.method == 'GET': 0254 # TODO impl 0255 print(request) 0256 elif request.method == 'DELETE': 0257 # TODO spec review 0258 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0259 0260 0261 @app.route('/session/<session_id>', methods=['DELETE']) 0262 def session_delete(session_id): 0263 if request.method == 'DELETE': 0264 # TODO spec review 0265 session = sessions[session_id] 0266 if not session: 0267 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0268 0269 session.close() 0270 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0271 0272 0273 @app.route('/session/<session_id>/timeouts/implicit_wait', methods=['POST']) 0274 def session_implicit_wait(session_id): 0275 session = sessions[session_id] 0276 if not session: 0277 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0278 0279 blob = json.loads(request.data) 0280 ms = blob['ms'] 0281 0282 session.timeouts['implicit'] = ms 0283 return json.dumps({'value': None}) 0284 0285 0286 @app.route('/session/<session_id>/source', methods=['GET']) 0287 def session_source(session_id): 0288 session = sessions[session_id] 0289 if not session: 0290 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0291 0292 doc = _createNode2(session.browsing_context, None) 0293 return json.dumps({'value': etree.tostring(doc, pretty_print=False).decode("utf-8")}), 200, {'content-type': 'application/xml'} 0294 0295 0296 # NB: custom method to get the source without json wrapper 0297 @app.route('/session/<session_id>/sourceRaw', methods=['GET']) 0298 def session_source_raw(session_id): 0299 session = sessions[session_id] 0300 if not session: 0301 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0302 0303 doc = _createNode2(session.browsing_context, None) 0304 return etree.tostring(doc, pretty_print=True).decode("utf-8"), 200, {'content-type': 'application/xml'} 0305 0306 0307 def locator(session, strategy, selector, start, findAll = False): 0308 # pyatspi.findDescendant(start, lambda x: print(x)) 0309 0310 end_time = datetime.now() + \ 0311 timedelta(milliseconds=session.timeouts['implicit']) 0312 results = [] 0313 0314 while datetime.now() < end_time: 0315 if strategy == 'xpath': 0316 print("-- xml") 0317 doc = _createNode2(start, None) 0318 for c in doc.xpath(selector): 0319 path = [int(x) for x in c.get('path').split()] 0320 # path is relative to the app root, not our start item! 0321 item = session.browsing_context 0322 for i in path: 0323 item = item[i] 0324 if c.get('name') != item.name or c.get('description') != item.description: 0325 return [] 0326 results.append(item) 0327 print("-- xml") 0328 else: 0329 # TODO can I switch this in python +++ raise on unmapped strategy 0330 pred = None 0331 if strategy == 'accessibility id': 0332 def pred(x): return x.accessibleId.endswith(selector) and (x.getState().contains(pyatspi.STATE_VISIBLE) and x.getState().contains(pyatspi.STATE_SENSITIVE)) 0333 # pyatspi strings "[ roleName | name ]" 0334 elif strategy == 'class name': 0335 def pred(x): return str(x) == selector and (x.getState().contains( 0336 pyatspi.STATE_VISIBLE) or x.getState().contains(pyatspi.STATE_SENSITIVE)) 0337 elif strategy == 'name': 0338 def pred(x): return x.name == selector and (x.getState().contains( 0339 pyatspi.STATE_VISIBLE) or x.getState().contains(pyatspi.STATE_SENSITIVE)) 0340 elif strategy == 'description': 0341 def pred(x): return x.description == selector and (x.getState().contains( 0342 pyatspi.STATE_VISIBLE) or x.getState().contains(pyatspi.STATE_SENSITIVE)) 0343 # there are also id and accessibleId but they seem not ever set. Not sure what to make of that :shrug: 0344 if findAll: 0345 accessible = pyatspi.findAllDescendants(start, pred) 0346 if accessible: 0347 results += accessible 0348 else: 0349 accessible = pyatspi.findDescendant(start, pred) 0350 if accessible: 0351 results.append(accessible) 0352 print(accessible) 0353 if len(results) > 0: 0354 break 0355 0356 return results 0357 0358 0359 @app.route('/session/<session_id>/element', methods=['GET', 'POST']) 0360 def session_element(session_id=None): 0361 # https://www.w3.org/TR/webdriver1/#dfn-find-element 0362 0363 # TODO scope elements to session somehow when the session gets closed we can throw away the references 0364 print(request.url) 0365 print(session_id) 0366 print(request.args) 0367 print(request.data) 0368 session = sessions[session_id] 0369 blob = json.loads(request.data) 0370 0371 strategy = blob['using'] 0372 selector = blob['value'] 0373 if not strategy or not selector: 0374 return json.dumps({'value': {'error': 'invalid argument'}}), 404, {'content-type': 'application/json'} 0375 0376 start = session.browsing_context 0377 if not start: # browsing context (no longer) valid 0378 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0379 0380 results = locator(session, strategy, selector, start) 0381 0382 if not results: 0383 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0384 0385 result = results[0] 0386 unique_id = result.path.replace('/', '-') 0387 session.elements[unique_id] = result 0388 return json.dumps({'value': {'element-6066-11e4-a52e-4f735466cecf': unique_id}}), 200, {'content-type': 'application/json'} 0389 0390 0391 @app.route('/session/<session_id>/elements', methods=['GET', 'POST']) 0392 def session_element2(session_id=None): 0393 # https://www.w3.org/TR/webdriver1/#dfn-find-elements 0394 0395 # TODO scope elements to session somehow when the session gets closed we can throw away the references 0396 print(request.url) 0397 print(session_id) 0398 print(request.args) 0399 print(request.data) 0400 session = sessions[session_id] 0401 blob = json.loads(request.data) 0402 0403 strategy = blob['using'] 0404 selector = blob['value'] 0405 if not strategy or not selector: 0406 return json.dumps({'value': {'error': 'invalid argument'}}), 404, {'content-type': 'application/json'} 0407 0408 start = session.browsing_context 0409 if not start: # browsing context (no longer) valid 0410 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0411 0412 results = locator(session, strategy, selector, start, findAll = True) 0413 0414 if not results: 0415 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0416 0417 serializations = [] 0418 for result in results: 0419 unique_id = result.path.replace('/', '-') 0420 print(unique_id) 0421 session.elements[unique_id] = result 0422 serializations.append({'element-6066-11e4-a52e-4f735466cecf': unique_id}) 0423 0424 return json.dumps({'value': serializations}), 200, {'content-type': 'application/json'} 0425 0426 0427 @app.route('/session/<session_id>/element/<element_id>/click', methods=['GET', 'POST']) 0428 def session_element_click(session_id, element_id): 0429 session = sessions[session_id] 0430 if not session: 0431 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0432 0433 element = session.elements[element_id] 0434 if not element: 0435 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0436 0437 action = element.queryAction() 0438 availableActions = {} 0439 for i in range(0, action.nActions): 0440 availableActions[action.getName(i)] = i 0441 0442 keys = availableActions.keys() 0443 if 'SetFocus' in keys: # this is used in addition to actual actions so focus is where it would be expected after a click 0444 print("actioning focus") 0445 action.doAction(availableActions['SetFocus']) 0446 time.sleep(EVENTLOOP_TIME) 0447 0448 try: 0449 if 'Press' in keys: 0450 print("actioning press") 0451 action.doAction(availableActions['Press']) 0452 elif 'Toggle' in keys: 0453 print("actioning toggle") 0454 action.doAction(availableActions['Toggle']) 0455 elif 'ShowMenu' in keys: 0456 print("actioning showmenu") 0457 action.doAction(availableActions['ShowMenu']) 0458 except gi.repository.GLib.GError as e: 0459 print(e) 0460 print("Ignoring! There is a chance your application is misbehaving. Could also just be a blocked eventloop though.") 0461 time.sleep(EVENTLOOP_TIME) 0462 0463 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0464 0465 0466 @app.route('/session/<session_id>/element/<element_id>/text', methods=['GET']) 0467 def session_element_text(session_id, element_id): 0468 session = sessions[session_id] 0469 element = session.elements[element_id] 0470 try: 0471 textElement = element.queryText() 0472 return json.dumps({'value': textElement.getText(0, -1)}), 200, {'content-type': 'application/json'} 0473 except NotImplementedError: 0474 return json.dumps({'value': element.name}), 200, {'content-type': 'application/json'} 0475 0476 0477 @app.route('/session/<session_id>/element/<element_id>/enabled', methods=['GET']) 0478 def session_element_enabled(session_id, element_id): 0479 session = sessions[session_id] 0480 element = session.elements[element_id] 0481 return json.dumps({'value': element.getState().contains(pyatspi.STATE_ENABLED)}), 200, {'content-type': 'application/json'} 0482 0483 0484 @app.route('/session/<session_id>/element/<element_id>/displayed', methods=['GET']) 0485 def session_element_displayed(session_id, element_id): 0486 session = sessions[session_id] 0487 element = session.elements[element_id] 0488 return json.dumps({'value': element.getState().contains(pyatspi.STATE_VISIBLE) and element.getState().contains(pyatspi.STATE_SHOWING)}), 200, {'content-type': 'application/json'} 0489 0490 0491 @app.route('/session/<session_id>/element/<element_id>/selected', methods=['GET']) 0492 def session_element_selected(session_id, element_id): 0493 session = sessions[session_id] 0494 element = session.elements[element_id] 0495 return json.dumps({'value': (element.getState().contains(pyatspi.STATE_SELECTED) or element.getState().contains(pyatspi.STATE_FOCUSED))}), 200, {'content-type': 'application/json'} 0496 0497 0498 @app.route('/session/<session_id>/element/<element_id>/attribute/<name>', methods=['GET']) 0499 def session_element_attribute(session_id, element_id, name): 0500 session = sessions[session_id] 0501 if not session: 0502 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0503 0504 element = session.elements[element_id] 0505 if not element: 0506 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0507 0508 print(request.data) 0509 0510 if name == "accessibility-id": 0511 return json.dumps({'value': element.accessibleId}), 200, {'content-type': 'application/json'} 0512 0513 if name == "name": 0514 return json.dumps({'value': element.name}), 200, {'content-type': 'application/json'} 0515 0516 if name == "value": 0517 elementValue = element.queryValue() 0518 return json.dumps({'value': elementValue.currentValue}), 200, {'content-type': 'application/json'} 0519 0520 print(pyatspi.STATE_VALUE_TO_NAME) 0521 result = None 0522 for value, string in pyatspi.STATE_VALUE_TO_NAME.items(): 0523 if string == name: 0524 result = element.getState().contains(value) 0525 break 0526 0527 return json.dumps({'value': result}), 200, {'content-type': 'application/json'} 0528 0529 0530 @app.route('/session/<session_id>/element/<element_id>/element', methods=['POST']) 0531 def session_element_element(session_id, element_id): 0532 session = sessions[session_id] 0533 if not session: 0534 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0535 0536 blob = json.loads(request.data) 0537 print(blob) 0538 strategy = blob['using'] 0539 selector = blob['value'] 0540 if not strategy or not selector: 0541 return json.dumps({'value': {'error': 'invalid argument'}}), 404, {'content-type': 'application/json'} 0542 0543 start = session.elements[element_id] 0544 if not start: # browsing context (no longer) valid 0545 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0546 0547 results = locator(session, strategy, selector, start) 0548 0549 if not results: 0550 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0551 0552 result = results[0] 0553 unique_id = result.path.replace('/', '-') 0554 session.elements[unique_id] = result 0555 return json.dumps({'value': {'element-6066-11e4-a52e-4f735466cecf': unique_id}}), 200, {'content-type': 'application/json'} 0556 0557 @app.route('/session/<session_id>/element/<element_id>/value', methods=['POST']) 0558 def session_element_value(session_id, element_id): 0559 session = sessions[session_id] 0560 if not session: 0561 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0562 0563 element = session.elements[element_id] 0564 if not element: 0565 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0566 0567 blob = json.loads(request.data) 0568 text = blob['text'] 0569 0570 print(blob) 0571 print(element) 0572 0573 try: 0574 offset = element.queryText().caretOffset 0575 textElement = element.queryEditableText() 0576 textElement.insertText(offset, text, len(text)) 0577 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0578 except NotImplementedError: 0579 print("element is not text type, falling back to synthesizing keyboard events") 0580 action = element.queryAction() 0581 processed = False 0582 for i in range(0, action.nActions): 0583 if action.getName(i) == 'SetFocus': 0584 processed = True 0585 action.doAction(i) 0586 time.sleep(EVENTLOOP_TIME) # give the focus time to apply 0587 generate_keyboard_event_text(text) 0588 break 0589 if not processed: 0590 raise RuntimeError("element's actions list didn't contain SetFocus. The element may be malformed") 0591 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0592 0593 0594 @app.route('/session/<session_id>/element/<element_id>/clear', methods=['POST']) 0595 def session_element_clear(session_id, element_id): 0596 session = sessions[session_id] 0597 if not session: 0598 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0599 0600 element = session.elements[element_id] 0601 if not element: 0602 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0603 0604 characterCount = element.queryText().characterCount 0605 try: 0606 textElement = element.queryEditableText() 0607 textElement.deleteText(0, characterCount) 0608 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0609 except NotImplementedError: 0610 print("element is not text type, falling back to synthesizing keyboard events") 0611 action = element.queryAction() 0612 processed = False 0613 for i in range(0, action.nActions): 0614 if action.getName(i) == 'SetFocus': 0615 processed = True 0616 action.doAction(i) 0617 time.sleep(EVENTLOOP_TIME) # give the focus time to apply 0618 0619 pseudo_text = '' 0620 pseudo_text += '\ue010' # end 0621 for _ in range(characterCount): 0622 pseudo_text += '\ue003' # backspace 0623 generate_keyboard_event_text(pseudo_text) 0624 break 0625 if not processed: 0626 raise RuntimeError("element's actions list didn't contain SetFocus. The element may be malformed") 0627 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0628 0629 0630 @app.route('/session/<session_id>/appium/device/app_state', methods=['POST']) 0631 def session_appium_device_app_state(session_id): 0632 session = sessions[session_id] 0633 if not session: 0634 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0635 0636 blob = json.loads(request.data) 0637 appId = blob['appId'] 0638 0639 proc = subprocess.Popen( 0640 'selenium-webdriver-at-spi-appidlister', stdout=subprocess.PIPE) 0641 out, err = proc.communicate() 0642 0643 apps = json.loads(out) 0644 print(apps) 0645 if appId in apps.values(): 0646 return json.dumps({'value': 4}), 200, {'content-type': 'application/json'} 0647 # TODO: implement rest of codes 0648 return json.dumps({'value': 1}), 200, {'content-type': 'application/json'} 0649 0650 0651 @app.route('/session/<session_id>/appium/device/terminate_app', methods=['POST']) 0652 def session_appium_device_terminate_app(session_id): 0653 session = sessions[session_id] 0654 if not session: 0655 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0656 0657 blob = json.loads(request.data) 0658 appId = blob['appId'] 0659 0660 proc = subprocess.Popen( 0661 'selenium-webdriver-at-spi-appidlister', stdout=subprocess.PIPE) 0662 out, err = proc.communicate() 0663 0664 apps = json.loads(out) 0665 if appId in apps.values(): 0666 pid = list(apps.keys())[list(apps.values()).index(appId)] 0667 os.kill(int(pid), signal.SIGKILL) 0668 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0669 0670 0671 @app.route('/session/<session_id>/appium/device/press_keycode', methods=['POST']) 0672 def session_appium_device_press_keycode(session_id): 0673 session = sessions[session_id] 0674 if not session: 0675 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0676 0677 blob = json.loads(request.data) 0678 keycode = blob['keycode'] 0679 # Not doing anything with these for now 0680 # metastate = blob['metastate'] 0681 # flags = blob['flags'] 0682 # FIXME needs testing 0683 generate_keyboard_event_text(keycode) 0684 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0685 0686 0687 @app.route('/session/<session_id>/actions', methods=['POST']) 0688 def session_actions(session_id): 0689 session = sessions[session_id] 0690 if not session: 0691 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0692 0693 blob = json.loads(request.data) 0694 0695 if 'KWIN_PID' in os.environ: 0696 with tempfile.NamedTemporaryFile() as file: 0697 file.write(request.data) 0698 file.flush() 0699 subprocess.run(["selenium-webdriver-at-spi-inputsynth", file.name]) 0700 else: 0701 raise RuntimeError("actions only work with nested kwin!") 0702 0703 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0704 0705 0706 @app.route('/session/<session_id>/appium/device/get_clipboard', methods=['POST']) 0707 def session_appium_device_get_clipboard(session_id): 0708 session = sessions[session_id] 0709 if not session: 0710 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0711 0712 blob = json.loads(request.data) 0713 contentType = blob['contentType'] 0714 0715 # NOTE: need a window because on wayland we must be the active window to manipulate the clipboard (currently anyway) 0716 window = Gtk.Window() 0717 window.set_default_size(20, 20) 0718 window.show() 0719 display = window.get_display() 0720 clipboard = Gtk.Clipboard.get_for_display(display, Gdk.SELECTION_CLIPBOARD) 0721 0722 spin_glib_main_context() 0723 0724 data = None 0725 if contentType == 'plaintext': 0726 data = clipboard.wait_for_text() 0727 else: 0728 raise 'content type not currently supported' 0729 0730 window.close() 0731 0732 spin_glib_main_context() 0733 0734 return json.dumps({'value': base64.b64encode(data.encode('utf-8')).decode('utf-8')}), 200, {'content-type': 'application/json'} 0735 0736 0737 @app.route('/session/<session_id>/appium/device/set_clipboard', methods=['POST']) 0738 def session_appium_device_set_clipboard(session_id): 0739 session = sessions[session_id] 0740 if not session: 0741 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0742 0743 blob = json.loads(request.data) 0744 contentType = blob['contentType'] 0745 content = blob['content'] 0746 0747 # NOTE: need a window because on wayland we must be the active window to manipulate the clipboard (currently anyway) 0748 window = Gtk.Window() 0749 window.set_default_size(20, 20) 0750 display = window.get_display() 0751 clipboard = Gtk.Clipboard.get_for_display(display, Gdk.SELECTION_CLIPBOARD) 0752 0753 if contentType == 'plaintext': 0754 clipboard.set_text(base64.b64decode(content).decode('utf-8'), -1) 0755 else: 0756 raise 'content type not currently supported' 0757 0758 spin_glib_main_context() 0759 0760 window.close() 0761 0762 spin_glib_main_context() 0763 0764 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0765 0766 0767 @app.route('/session/<session_id>/appium/element/<element_id>/value', methods=['POST']) 0768 def session_appium_element_value(session_id, element_id): 0769 session = sessions[session_id] 0770 if not session: 0771 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0772 0773 element = session.elements[element_id] 0774 if not element: 0775 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0776 0777 blob = json.loads(request.data) 0778 print("blob:", blob) 0779 print(element) 0780 value = blob['text'] 0781 0782 try: 0783 valueElement = element.queryValue() 0784 valueElement.currentValue = float(value) 0785 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0786 except NotImplementedError: 0787 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0788 0789 0790 @app.route('/session/<session_id>/screenshot', methods=['GET']) 0791 def session_appium_screenshot(session_id): 0792 session = sessions[session_id] 0793 if not session: 0794 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0795 0796 # NB: these values incorrectly do not include the device pixel ratio, so they are off when used on a scaling display 0797 # position_x, position_y = session.browsing_context.getChildAtIndex(0).queryComponent().getPosition(pyatspi.XY_SCREEN) 0798 # size_width, size_height = session.browsing_context.getChildAtIndex(0).queryComponent().getSize() 0799 0800 proc = subprocess.Popen(['selenium-webdriver-at-spi-screenshotter', 0801 str(0), str(0), str(0), str(0)], 0802 stdout=subprocess.PIPE) 0803 out, err = proc.communicate() 0804 0805 if not out: 0806 return json.dumps({'value': {'error': err}}), 404, {'content-type': 'application/json'} 0807 0808 return json.dumps({'value': out.decode('utf-8')}), 200, {'content-type': 'application/json'} 0809 0810 0811 @app.route('/session/<session_id>/appium/compare_images', methods=['POST']) 0812 def session_appium_compare_images(session_id): 0813 """ 0814 Reference: 0815 - https://github.com/appium/python-client/blob/master/appium/webdriver/extensions/images_comparison.py 0816 - https://github.com/appium/appium/blob/master/packages/opencv/lib/index.js 0817 """ 0818 session = sessions[session_id] 0819 if not session: 0820 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0821 0822 blob = json.loads(request.data) 0823 mode: str = blob['mode'] 0824 options: dict = blob['options'] 0825 return_value: dict = {} 0826 0827 import cv2 as cv # The extension is slow, so load it on demand 0828 0829 cv_image1 = cv.imdecode(np.fromstring(base64.b64decode(blob['firstImage']), np.uint8), cv.IMREAD_COLOR) 0830 cv_image2 = cv.imdecode(np.fromstring(base64.b64decode(blob['secondImage']), np.uint8), cv.IMREAD_COLOR) 0831 0832 if mode == 'matchFeatures': 0833 # https://docs.opencv.org/3.0-beta/doc/py_tutorials/py_feature2d/py_matcher/py_matcher.html 0834 detectorName: str = options.get('detectorName', 'ORB') 0835 matchFunc: str = options.get('matchFunc', 'BruteForce') 0836 goodMatchesFactor: int = options.get('goodMatchesFactor', -1) 0837 0838 if detectorName == 'AKAZE': 0839 detector = cv.AKAZE.create() 0840 elif detectorName == 'AGAST': 0841 detector = cv.AgastFeatureDetector.create() 0842 elif detectorName == 'BRISK': 0843 detector = cv.BRISK.create() 0844 elif detectorName == 'FAST': 0845 detector = cv.FastFeatureDetector.create() 0846 elif detectorName == 'GFTT': 0847 detector = cv.GFTTDetector.create() 0848 elif detectorName == 'KAZE': 0849 detector = cv.KAZE.create() 0850 elif detectorName == 'MSER': 0851 detector = cv.SIFT.create() 0852 else: 0853 detector = cv.ORB.create() 0854 0855 if matchFunc == 'FlannBased': 0856 matcher = cv.FlannBasedMatcher.create() 0857 elif matchFunc == 'BruteForceL1': 0858 matcher = cv.BFMatcher.create(cv.NORM_L1, crossCheck=True) 0859 elif matchFunc == 'BruteForceHamming': 0860 matcher = cv.BFMatcher.create(cv.NORM_HAMMING, crossCheck=True) 0861 elif matchFunc == 'BruteForceHammingLut': 0862 matcher = cv.BFMatcher.create(cv.NORM_HAMMING2, crossCheck=True) 0863 elif matchFunc == 'BruteForceSL2': 0864 matcher = cv.BFMatcher.create(cv.NORM_L2, crossCheck=True) 0865 else: 0866 matcher = cv.BFMatcher.create(cv.NORM_L2, crossCheck=True) 0867 0868 # Find the keypoints and descriptors 0869 kp1, des1 = detector.detectAndCompute(cv_image1, None) 0870 kp2, des2 = detector.detectAndCompute(cv_image2, None) 0871 matches = sorted(matcher.match(des1, des2), key=lambda m: m.distance) 0872 0873 if len(matches) < 1: 0874 return json.dumps({'value': {'error': 'Could not find any matches between images. Double-check orientation, resolution, or use another detector or matching function.'}}), 404, {'content-type': 'application/json'} 0875 0876 return_value['count'] = min(len(matches), goodMatchesFactor) if goodMatchesFactor > 0 else len(matches) 0877 return_value['points1'] = [kp1[m.queryIdx].pt for m in matches] 0878 return_value['rect1'] = calculate_matched_rect(return_value['points1']) 0879 return_value['points2'] = [kp2[m.trainIdx].pt for m in matches] 0880 return_value['rect2'] = calculate_matched_rect(return_value['points2']) 0881 0882 elif mode == 'matchTemplate': 0883 threshold: float = options.get('threshold', 0.0) # Exact match 0884 0885 matched = cv.matchTemplate(cv_image1, cv_image2, cv.TM_SQDIFF_NORMED) 0886 min_val, max_val, min_loc, max_loc = cv.minMaxLoc(matched) 0887 print(min_val, max_val, min_loc, max_loc) 0888 0889 if min_val <= threshold: 0890 x, y = min_loc 0891 return_value['rect'] = { 0892 'x': x, 0893 "y": y, 0894 'width': cv_image2.shape[1], 0895 'height': cv_image2.shape[0], 0896 } 0897 else: 0898 return json.dumps({'value': {'error': 'Cannot find any occurrences of the partial image in the full image.'}}), 404, {'content-type': 'application/json'} 0899 0900 elif mode == 'getSimilarity': 0901 if cv_image1.shape != cv_image2.shape: 0902 return json.dumps({'value': {'error': 'Both images are expected to have the same size in order to calculate the similarity score.'}}), 404, {'content-type': 'application/json'} 0903 matched = cv.matchTemplate(cv_image1, cv_image2, cv.TM_SQDIFF_NORMED) 0904 min_val, max_val, min_loc, max_loc = cv.minMaxLoc(matched) 0905 print(min_val, max_val, min_loc, max_loc) 0906 return_value['score'] = 1.0 - min_val 0907 0908 else: 0909 return json.dumps({'value': {'error': 'Mode is not supported'}}), 404, {'content-type': 'application/json'} 0910 0911 return json.dumps({'value': return_value}), 200, {'content-type': 'application/json'} 0912 0913 0914 def calculate_matched_rect(matched_points: list[tuple[int, int]]) -> dict[str, int]: 0915 if len(matched_points) < 2: 0916 return { 0917 'x': 0, 0918 "y": 0, 0919 'width': 0, 0920 'height': 0, 0921 } 0922 0923 points_sorted_by_distance = sorted(matched_points, key=lambda pt: pt[0] * pt[0] + pt[1] * pt[1]) 0924 first_point = points_sorted_by_distance[0] 0925 last_point = points_sorted_by_distance[1] 0926 0927 return { 0928 'x': min(first_point[0], last_point[0]), 0929 "y": min(first_point[1], last_point[1]), 0930 'width': abs(first_point[0] - last_point[0]), 0931 'height': abs(first_point[1] - last_point[1]), 0932 } 0933 0934 0935 def generate_keyboard_event_text(text): 0936 # using a nested kwin. need to synthesize keys into wayland (not supported in atspi right now) 0937 if 'KWIN_PID' in os.environ: 0938 with tempfile.NamedTemporaryFile() as fp: 0939 actions = [] 0940 for ch in text: 0941 actions.append({'type': 'keyDown', 'value': ch}) 0942 actions.append({'type': 'keyUp', 'value': ch}) 0943 print({'actions': {'type': 'key', 'id': 'key', 'actions': actions}}) 0944 fp.write(json.dumps({'actions': [{'type': 'key', 'id': 'key', 'actions': actions}]}).encode()) 0945 fp.flush() 0946 subprocess.run(["selenium-webdriver-at-spi-inputsynth", fp.name]) 0947 else: 0948 for ch in text: 0949 pyatspi.Registry.generateKeyboardEvent(char_to_keyval(ch), None, pyatspi.KEY_SYM) 0950 time.sleep(EVENTLOOP_TIME) 0951 0952 def keyval_to_keycode(keyval): 0953 keymap = Gdk.Keymap.get_default() 0954 ret, keys = keymap.get_entries_for_keyval(keyval) 0955 if not ret: 0956 raise RuntimeError("Failed to map key!") 0957 # FIXME layout 0 is not necessarily the current one (e.g. in the kcm we can configure multiple layouts) 0958 return keys[0] 0959 0960 0961 def char_to_keyval(ch): 0962 print("----------::::::") 0963 keyval = Gdk.unicode_to_keyval(ord(ch)) 0964 # I Don't know why this doesn't work, also doesn't work with \033 as input. :(( 0965 # https://gitlab.gnome.org/GNOME/gtk/-/blob/gtk-3-24/gdk/gdkkeyuni.c 0966 # Other useful resources: 0967 # https://www.cl.cam.ac.uk/~mgk25/ucs/keysymdef.h 0968 if ch == "\uE00C": 0969 keyval = 0xff1b # escape 0970 elif ch == "\ue03d": 0971 keyval = 0xffeb # left meta 0972 elif ch == "\ue006": 0973 keyval = 0xff0d # return 0974 elif ch == "\ue007": 0975 keyval = 0xff8d # enter 0976 elif ch == "\ue003": 0977 keyval = 0xff08 # backspace 0978 elif ch == "\ue010": 0979 keyval = 0xff57 # end 0980 elif ch == "\ue012": 0981 keyval = 0xff51 # left 0982 elif ch == "\ue014": 0983 keyval = 0xff53 # right 0984 elif ch == "\ue013": 0985 keyval = 0xff52 # up 0986 elif ch == "\ue015": 0987 keyval = 0xff54 # down 0988 elif ch == "\ue004": 0989 keyval = 0xff09 # tab 0990 print(ord(ch)) 0991 print(hex(keyval)) 0992 return keyval 0993 0994 0995 def spin_glib_main_context(repeat: int = 4): 0996 context = GLib.MainContext.default() 0997 for _ in range(repeat): 0998 time.sleep(EVENTLOOP_TIME_LONG) 0999 while context.pending(): 1000 context.iteration(may_block=False)