File indexing completed on 2023-09-24 09:47:50
0001 # SPDX-License-Identifier: AGPL-3.0-or-later 0002 # SPDX-FileCopyrightText: 2021-2023 Harald Sitter <sitter@kde.org> 0003 0004 import base64 0005 from datetime import datetime, timedelta 0006 import tempfile 0007 import time 0008 import traceback 0009 from flask import Flask, request, jsonify 0010 import uuid 0011 import json 0012 import sys 0013 import os 0014 import signal 0015 import subprocess 0016 from werkzeug.exceptions import HTTPException 0017 0018 import pyatspi 0019 from lxml import etree 0020 0021 import gi 0022 from gi.repository import GLib 0023 from gi.repository import Gio 0024 gi.require_version('Gdk', '3.0') 0025 from gi.repository import Gdk 0026 gi.require_version('Gtk', '3.0') 0027 from gi.repository import Gtk 0028 0029 from app_roles import ROLE_NAMES 0030 0031 # Exposes AT-SPI as a webdriver. This is written in python because C sucks and pyatspi is a first class binding so 0032 # we lose nothing but gain the reduced sucking of python. 0033 0034 # https://github.com/SeleniumHQ/selenium/wiki/JsonWireProtocol#WebElement_JSON_Object.md 0035 # https://www.w3.org/TR/webdriver 0036 # https://github.com/microsoft/WinAppDriver/blob/master/Docs/SupportedAPIs.md 0037 # https://www.freedesktop.org/wiki/Accessibility/PyAtSpi2Example/ 0038 0039 EVENTLOOP_TIME = 0.1 0040 sys.stdout = sys.stderr 0041 sessions = {} # global dict of open sessions 0042 0043 # Give the GUI enough time to react. tests run on the CI won't always be responsive in the tight schedule established by at-spi2 (800ms) and run risk 0044 # of timing out on (e.g.) click events. The second value is the timeout for app startup, we keep that the same as upstream. 0045 pyatspi.setTimeout(4000, 15000) 0046 0047 # Using flask because I know nothing about writing REST in python and it seemed the most straight-forward framework. 0048 app = Flask(__name__) 0049 0050 @app.errorhandler(Exception) 0051 def unknown_error(e): 0052 if isinstance(e, HTTPException): 0053 return e 0054 0055 return errorFromException(error='unknown error', exception=e), 500 0056 0057 @app.route('/status', methods=['GET']) 0058 def status(): 0059 body = { 0060 'value': { 0061 'ready': 'true', 0062 'message': 'There is only one state. Hooray!' 0063 } 0064 } 0065 return json.dumps(body), 200, {'content-type': 'application/json'} 0066 0067 0068 def _createNode2(accessible, parentElement, indexInParents=[]): 0069 if not accessible: 0070 return 0071 # A bit of aggressive filtering to not introspect chromium and firefox and the likes when using the desktop root. 0072 if accessible.toolkitName != "Qt" and accessible.toolkitName != "at-spi-registry": 0073 return 0074 0075 roleName = accessible.getRoleName() 0076 e = None 0077 if roleName: 0078 e = etree.Element(roleName.replace(" ", "_")) 0079 else: 0080 e = etree.Element("accessible") 0081 0082 e.set("name", accessible.name) 0083 e.set("role", str(int(accessible.getRole()))) 0084 e.set("description", accessible.description) 0085 if accessible.accessibleId != None: 0086 e.set("accessibility-id", accessible.accessibleId) 0087 # NB: pyatspi.getPath is bugged when the QObject has no QObject parent. Instead manually keep track of indexes. 0088 # while generating the xml. 0089 # path = pyatspi.getPath(accessible) 0090 # path is a list of ints for the indexes within the parents 0091 path_strs = [str(x) for x in indexInParents] 0092 e.set("path", ' '.join(path_strs)) 0093 0094 states = [] 0095 for state in accessible.getState().getStates(): 0096 states.append(pyatspi.stateToString(state)) 0097 e.set("states", ', '.join(states)) 0098 0099 for i in range(0, accessible.childCount): 0100 newIndex = indexInParents.copy() 0101 newIndex.append(i) 0102 _createNode2(accessible.getChildAtIndex(i), e, newIndex) 0103 0104 if parentElement != None: 0105 parentElement.append(e) 0106 else: 0107 return e 0108 0109 0110 def errorFromMessage(error, message): 0111 return jsonify({'value': {'error': error, 'message': message}}) 0112 0113 0114 def errorFromException(error, exception): 0115 return jsonify({'value': {'error': error, 'message': str(exception), 'stacktrace': traceback.format_exc()}}) 0116 0117 0118 @app.route('/') 0119 def index(): 0120 return 'Servas' 0121 0122 # Encapsulates a Session object. Sessions are opened by the client and contain elements. A session is generally speaking 0123 # an app. 0124 # TODO: should we expose the root scope somehow? requires a special variant of session and moving logic from the 0125 # REST app functions to the Session object. 0126 0127 0128 class Session: 0129 0130 def __init__(self) -> None: 0131 self.id = str(uuid.uuid1()) 0132 self.elements = {} # a cache to hold elements between finding and interacting with 0133 self.browsing_context = None 0134 self.pid = -1 0135 # implicit deviates from spec, 0 is unreasonable 0136 self.timeouts = {'script': 30000, 'pageLoad': 300000, 'implicit': 5000} 0137 self.launched = False 0138 0139 blob = json.loads(request.data) 0140 print(request.data) 0141 # TODO the blob from ruby is much more complicated god only knows why 0142 desired_app = None 0143 desired_timeouts = None 0144 if 'desiredCapabilities' in blob: 0145 if 'app' in blob['desiredCapabilities']: 0146 desired_app = blob['desiredCapabilities']['app'] 0147 else: 0148 desired_app = blob['desiredCapabilities']['appium:app'] 0149 if 'timeouts' in blob['desiredCapabilities']: 0150 desired_timeouts = blob['desiredCapabilities']['timeouts'] 0151 else: 0152 if 'app' in blob['capabilities']['alwaysMatch']: 0153 desired_app = blob['capabilities']['alwaysMatch']['app'] 0154 else: 0155 desired_app = blob['capabilities']['alwaysMatch']['appium:app'] 0156 if 'timeouts' in blob['capabilities']['alwaysMatch']: 0157 desired_timeouts = blob['capabilities']['alwaysMatch']['timeouts'] 0158 0159 if desired_timeouts: 0160 if 'script' in desired_timeouts: 0161 self.timeouts['script'] = desired_timeouts['script'] 0162 if 'pageLoad' in desired_timeouts: 0163 self.timeouts['pageLoad'] = desired_timeouts['pageLoad'] 0164 if 'implicit' in desired_timeouts: 0165 self.timeouts['implicit'] = desired_timeouts['implicit'] 0166 0167 if desired_app == 'Root': 0168 # NB: at the time of writing there can only be one desktop ever 0169 self.browsing_context = pyatspi.Registry.getDesktop(0) 0170 return 0171 0172 self.launched = True 0173 end_time = datetime.now() + \ 0174 timedelta(milliseconds=(self.timeouts['implicit'] * 2)) 0175 0176 context = Gio.AppLaunchContext() 0177 context.setenv('QT_ACCESSIBILITY', '1') 0178 context.setenv('QT_LINUX_ACCESSIBILITY_ALWAYS_ON', '1') 0179 context.setenv('KIO_DISABLE_CACHE_CLEANER', '1') # don't dangle 0180 0181 def on_launched(context, info, platform_data): 0182 self.pid = platform_data['pid'] 0183 print("launched " + str(self.pid)) 0184 0185 while datetime.now() < end_time: 0186 for desktop_index in range(pyatspi.Registry.getDesktopCount()): 0187 desktop = pyatspi.Registry.getDesktop(desktop_index) 0188 for app in desktop: 0189 try: 0190 if app.get_process_id() == self.pid: 0191 self.browsing_context = app 0192 break 0193 except gi.repository.GLib.GError: 0194 print('stumbled over a broken process. ignoring...') 0195 continue 0196 if self.browsing_context: 0197 break 0198 if self.browsing_context: 0199 break 0200 if not self.browsing_context: 0201 raise RuntimeError('Failed to find application on a11y bus within time limit!') 0202 0203 context.connect("launched", on_launched) 0204 0205 if desired_app.endswith(".desktop"): 0206 appinfo = Gio.DesktopAppInfo.new(desired_app) 0207 appinfo.launch([], context) 0208 elif desired_app.isnumeric(): 0209 on_launched(None, None, {'pid': int(desired_app)}) 0210 else: 0211 appinfo = Gio.AppInfo.create_from_commandline( 0212 desired_app, None, Gio.AppInfoCreateFlags.NONE) 0213 appinfo.launch([], context) 0214 print(self.browsing_context) 0215 0216 def close(self) -> None: 0217 if self.launched: 0218 try: 0219 os.kill(self.pid, signal.SIGKILL) 0220 except ProcessLookupError: 0221 pass 0222 0223 0224 @app.route('/session', methods=['GET', 'POST', 'DELETE']) 0225 def session(): 0226 if request.method == 'POST': 0227 # TODO: 0228 # https://www.w3.org/TR/webdriver1/#new-session 0229 # 1, 3, 4, 5, 8, 9, 11, 12, 13, 14 0230 print(request) 0231 try: 0232 session = Session() 0233 except Exception as e: 0234 return errorFromException(error='session not created', exception=e), 500 0235 sessions[session.id] = session 0236 print(sessions) 0237 0238 if session.browsing_context is None: 0239 return errorFromMessage(error='session not created', 0240 message='Application was not found on the a11y bus for unknown reasons. It probably failed to register on the bus.'), 500 0241 0242 return json.dumps({'value': {'sessionId': session.id, 'capabilities': {"app": session.browsing_context.name}}}), 200, {'content-type': 'application/json'} 0243 elif request.method == 'GET': 0244 # TODO impl 0245 print(request) 0246 elif request.method == 'DELETE': 0247 # TODO spec review 0248 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0249 0250 0251 @app.route('/session/<session_id>', methods=['DELETE']) 0252 def session_delete(session_id): 0253 if request.method == 'DELETE': 0254 # TODO spec review 0255 session = sessions[session_id] 0256 if not session: 0257 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0258 0259 session.close() 0260 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0261 0262 0263 @app.route('/session/<session_id>/timeouts/implicit_wait', methods=['POST']) 0264 def session_implicit_wait(session_id): 0265 session = sessions[session_id] 0266 if not session: 0267 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0268 0269 blob = json.loads(request.data) 0270 ms = blob['ms'] 0271 0272 session.timeouts['implicit'] = ms 0273 return json.dumps({'value': None}) 0274 0275 0276 @app.route('/session/<session_id>/source', methods=['GET']) 0277 def session_source(session_id): 0278 session = sessions[session_id] 0279 if not session: 0280 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0281 0282 doc = _createNode2(session.browsing_context, None) 0283 return json.dumps({'value': etree.tostring(doc, pretty_print=False).decode("utf-8")}), 200, {'content-type': 'application/xml'} 0284 0285 0286 # NB: custom method to get the source without json wrapper 0287 @app.route('/session/<session_id>/sourceRaw', methods=['GET']) 0288 def session_source_raw(session_id): 0289 session = sessions[session_id] 0290 if not session: 0291 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0292 0293 doc = _createNode2(session.browsing_context, None) 0294 return etree.tostring(doc, pretty_print=True).decode("utf-8"), 200, {'content-type': 'application/xml'} 0295 0296 0297 def locator(session, strategy, selector, start): 0298 # pyatspi.findDescendant(start, lambda x: print(x)) 0299 0300 end_time = datetime.now() + \ 0301 timedelta(milliseconds=session.timeouts['implicit']) 0302 results = [] 0303 0304 while datetime.now() < end_time: 0305 if strategy == 'xpath': 0306 print("-- xml") 0307 doc = _createNode2(start, None) 0308 for c in doc.xpath(selector): 0309 path = [int(x) for x in c.get('path').split()] 0310 # path is relative to the app root, not our start item! 0311 item = session.browsing_context 0312 for i in path: 0313 item = item[i] 0314 if c.get('name') != item.name or c.get('description') != item.description: 0315 return [] 0316 results.append(item) 0317 print("-- xml") 0318 else: 0319 # TODO can I switch this in python +++ raise on unmapped strategy 0320 pred = None 0321 if strategy == 'accessibility id': 0322 def pred(x): return x.accessibleId.endswith(selector) and (x.getState().contains(pyatspi.STATE_VISIBLE) and x.getState().contains(pyatspi.STATE_SENSITIVE)) 0323 # pyatspi strings "[ roleName | name ]" 0324 elif strategy == 'class name': 0325 def pred(x): return str(x) == selector and (x.getState().contains( 0326 pyatspi.STATE_VISIBLE) or x.getState().contains(pyatspi.STATE_SENSITIVE)) 0327 elif strategy == 'name': 0328 def pred(x): return x.name == selector and (x.getState().contains( 0329 pyatspi.STATE_VISIBLE) or x.getState().contains(pyatspi.STATE_SENSITIVE)) 0330 elif strategy == 'description': 0331 def pred(x): return x.description == selector and (x.getState().contains( 0332 pyatspi.STATE_VISIBLE) or x.getState().contains(pyatspi.STATE_SENSITIVE)) 0333 # there are also id and accessibleId but they seem not ever set. Not sure what to make of that :shrug: 0334 accessible = pyatspi.findDescendant(start, pred) 0335 print(accessible) 0336 if accessible: 0337 results.append(accessible) 0338 if len(results) > 0: 0339 break 0340 0341 return results 0342 0343 0344 @app.route('/session/<session_id>/element', methods=['GET', 'POST']) 0345 def session_element(session_id=None): 0346 # https://www.w3.org/TR/webdriver1/#dfn-find-element 0347 0348 # TODO scope elements to session somehow when the session gets closed we can throw away the references 0349 print(request.url) 0350 print(session_id) 0351 print(request.args) 0352 print(request.data) 0353 session = sessions[session_id] 0354 blob = json.loads(request.data) 0355 0356 strategy = blob['using'] 0357 selector = blob['value'] 0358 if not strategy or not selector: 0359 return json.dumps({'value': {'error': 'invalid argument'}}), 404, {'content-type': 'application/json'} 0360 0361 start = session.browsing_context 0362 if not start: # browsing context (no longer) valid 0363 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0364 0365 results = locator(session, strategy, selector, start) 0366 0367 if not results: 0368 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0369 0370 result = results[0] 0371 unique_id = result.path.replace('/', '-') 0372 session.elements[unique_id] = result 0373 return json.dumps({'value': {'element-6066-11e4-a52e-4f735466cecf': unique_id}}), 200, {'content-type': 'application/json'} 0374 0375 0376 @app.route('/session/<session_id>/elements', methods=['GET', 'POST']) 0377 def session_element2(session_id=None): 0378 # https://www.w3.org/TR/webdriver1/#dfn-find-elements 0379 0380 # TODO scope elements to session somehow when the session gets closed we can throw away the references 0381 print(request.url) 0382 print(session_id) 0383 print(request.args) 0384 print(request.data) 0385 session = sessions[session_id] 0386 blob = json.loads(request.data) 0387 0388 strategy = blob['using'] 0389 selector = blob['value'] 0390 if not strategy or not selector: 0391 return json.dumps({'value': {'error': 'invalid argument'}}), 404, {'content-type': 'application/json'} 0392 0393 start = session.browsing_context 0394 if not start: # browsing context (no longer) valid 0395 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0396 0397 results = locator(session, strategy, selector, start) 0398 0399 if not results: 0400 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0401 0402 serializations = [] 0403 for result in results: 0404 unique_id = result.path.replace('/', '-') 0405 print(unique_id) 0406 session.elements[unique_id] = result 0407 serializations.append({'element-6066-11e4-a52e-4f735466cecf': unique_id}) 0408 0409 return json.dumps({'value': serializations}), 200, {'content-type': 'application/json'} 0410 0411 0412 @app.route('/session/<session_id>/element/<element_id>/click', methods=['GET', 'POST']) 0413 def session_element_click(session_id, element_id): 0414 session = sessions[session_id] 0415 if not session: 0416 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0417 0418 element = session.elements[element_id] 0419 if not element: 0420 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0421 0422 action = element.queryAction() 0423 availableActions = {} 0424 for i in range(0, action.nActions): 0425 availableActions[action.getName(i)] = i 0426 0427 keys = availableActions.keys() 0428 if 'SetFocus' in keys: # this is used in addition to actual actions so focus is where it would be expected after a click 0429 print("actioning focus") 0430 action.doAction(availableActions['SetFocus']) 0431 time.sleep(EVENTLOOP_TIME) 0432 0433 try: 0434 if 'Press' in keys: 0435 print("actioning press") 0436 action.doAction(availableActions['Press']) 0437 elif 'Toggle' in keys: 0438 print("actioning toggle") 0439 action.doAction(availableActions['Toggle']) 0440 elif 'ShowMenu' in keys: 0441 print("actioning showmenu") 0442 action.doAction(availableActions['ShowMenu']) 0443 except gi.repository.GLib.GError as e: 0444 print(e) 0445 print("Ignoring! There is a chance your application is misbehaving. Could also just be a blocked eventloop though.") 0446 time.sleep(EVENTLOOP_TIME) 0447 0448 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0449 0450 0451 @app.route('/session/<session_id>/element/<element_id>/text', methods=['GET']) 0452 def session_element_text(session_id, element_id): 0453 session = sessions[session_id] 0454 element = session.elements[element_id] 0455 try: 0456 textElement = element.queryText() 0457 return json.dumps({'value': textElement.getText(0, -1)}), 200, {'content-type': 'application/json'} 0458 except NotImplementedError: 0459 return json.dumps({'value': element.name}), 200, {'content-type': 'application/json'} 0460 0461 0462 @app.route('/session/<session_id>/element/<element_id>/enabled', methods=['GET']) 0463 def session_element_enabled(session_id, element_id): 0464 session = sessions[session_id] 0465 element = session.elements[element_id] 0466 return json.dumps({'value': element.getState().contains(pyatspi.STATE_ENABLED)}), 200, {'content-type': 'application/json'} 0467 0468 0469 @app.route('/session/<session_id>/element/<element_id>/displayed', methods=['GET']) 0470 def session_element_displayed(session_id, element_id): 0471 session = sessions[session_id] 0472 element = session.elements[element_id] 0473 return json.dumps({'value': element.getState().contains(pyatspi.STATE_VISIBLE) and element.getState().contains(pyatspi.STATE_SHOWING)}), 200, {'content-type': 'application/json'} 0474 0475 0476 @app.route('/session/<session_id>/element/<element_id>/selected', methods=['GET']) 0477 def session_element_selected(session_id, element_id): 0478 session = sessions[session_id] 0479 element = session.elements[element_id] 0480 return json.dumps({'value': (element.getState().contains(pyatspi.STATE_SELECTED) or element.getState().contains(pyatspi.STATE_FOCUSED))}), 200, {'content-type': 'application/json'} 0481 0482 0483 @app.route('/session/<session_id>/element/<element_id>/attribute/<name>', methods=['GET']) 0484 def session_element_attribute(session_id, element_id, name): 0485 session = sessions[session_id] 0486 if not session: 0487 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0488 0489 element = session.elements[element_id] 0490 if not element: 0491 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0492 0493 print(request.data) 0494 0495 if name == "accessibility-id": 0496 return json.dumps({'value': element.accessibleId}), 200, {'content-type': 'application/json'} 0497 0498 if name == "value": 0499 elementValue = element.queryValue() 0500 return json.dumps({'value': elementValue.currentValue}), 200, {'content-type': 'application/json'} 0501 0502 print(pyatspi.STATE_VALUE_TO_NAME) 0503 result = None 0504 for value, string in pyatspi.STATE_VALUE_TO_NAME.items(): 0505 if string == name: 0506 result = element.getState().contains(value) 0507 break 0508 0509 return json.dumps({'value': result}), 200, {'content-type': 'application/json'} 0510 0511 0512 @app.route('/session/<session_id>/element/<element_id>/element', methods=['POST']) 0513 def session_element_element(session_id, element_id): 0514 session = sessions[session_id] 0515 if not session: 0516 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0517 0518 blob = json.loads(request.data) 0519 print(blob) 0520 strategy = blob['using'] 0521 selector = blob['value'] 0522 if not strategy or not selector: 0523 return json.dumps({'value': {'error': 'invalid argument'}}), 404, {'content-type': 'application/json'} 0524 0525 start = session.elements[element_id] 0526 if not start: # browsing context (no longer) valid 0527 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0528 0529 return locator(session, strategy, selector, start) 0530 0531 0532 @app.route('/session/<session_id>/element/<element_id>/value', methods=['POST']) 0533 def session_element_value(session_id, element_id): 0534 session = sessions[session_id] 0535 if not session: 0536 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0537 0538 element = session.elements[element_id] 0539 if not element: 0540 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0541 0542 blob = json.loads(request.data) 0543 text = blob['text'] 0544 0545 print(blob) 0546 print(element) 0547 0548 try: 0549 offset = element.queryText().caretOffset 0550 textElement = element.queryEditableText() 0551 textElement.insertText(offset, text, len(text)) 0552 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0553 except NotImplementedError: 0554 print("element is not text type, falling back to synthesizing keyboard events") 0555 action = element.queryAction() 0556 processed = False 0557 for i in range(0, action.nActions): 0558 if action.getName(i) == 'SetFocus': 0559 processed = True 0560 action.doAction(i) 0561 time.sleep(EVENTLOOP_TIME) # give the focus time to apply 0562 for ch in text: 0563 generate_keyboard_event(ch) 0564 break 0565 if not processed: 0566 raise RuntimeError("element's actions list didn't contain SetFocus. The element may be malformed") 0567 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0568 0569 0570 @app.route('/session/<session_id>/element/<element_id>/clear', methods=['POST']) 0571 def session_element_clear(session_id, element_id): 0572 session = sessions[session_id] 0573 if not session: 0574 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0575 0576 element = session.elements[element_id] 0577 if not element: 0578 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0579 0580 characterCount = element.queryText().characterCount 0581 try: 0582 textElement = element.queryEditableText() 0583 textElement.deleteText(0, characterCount) 0584 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0585 except NotImplementedError: 0586 print("element is not text type, falling back to synthesizing keyboard events") 0587 action = element.queryAction() 0588 processed = False 0589 for i in range(0, action.nActions): 0590 if action.getName(i) == 'SetFocus': 0591 processed = True 0592 action.doAction(i) 0593 time.sleep(EVENTLOOP_TIME) # give the focus time to apply 0594 generate_keyboard_event('\ue010') # end 0595 for _ in range(characterCount): 0596 generate_keyboard_event('\ue003') # backspace 0597 break 0598 if not processed: 0599 raise RuntimeError("element's actions list didn't contain SetFocus. The element may be malformed") 0600 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0601 0602 0603 @app.route('/session/<session_id>/appium/device/app_state', methods=['POST']) 0604 def session_appium_device_app_state(session_id): 0605 session = sessions[session_id] 0606 if not session: 0607 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0608 0609 blob = json.loads(request.data) 0610 appId = blob['appId'] 0611 0612 proc = subprocess.Popen( 0613 'selenium-webdriver-at-spi-appidlister', stdout=subprocess.PIPE) 0614 out, err = proc.communicate() 0615 0616 apps = json.loads(out) 0617 print(apps) 0618 if appId in apps.values(): 0619 return json.dumps({'value': 4}), 200, {'content-type': 'application/json'} 0620 # TODO: implement rest of codes 0621 return json.dumps({'value': 1}), 200, {'content-type': 'application/json'} 0622 0623 0624 @app.route('/session/<session_id>/appium/device/terminate_app', methods=['POST']) 0625 def session_appium_device_terminate_app(session_id): 0626 session = sessions[session_id] 0627 if not session: 0628 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0629 0630 blob = json.loads(request.data) 0631 appId = blob['appId'] 0632 0633 proc = subprocess.Popen( 0634 'selenium-webdriver-at-spi-appidlister', stdout=subprocess.PIPE) 0635 out, err = proc.communicate() 0636 0637 apps = json.loads(out) 0638 if appId in apps.values(): 0639 pid = list(apps.keys())[list(apps.values()).index(appId)] 0640 os.kill(int(pid), signal.SIGKILL) 0641 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0642 0643 0644 @app.route('/session/<session_id>/appium/device/press_keycode', methods=['POST']) 0645 def session_appium_device_press_keycode(session_id): 0646 session = sessions[session_id] 0647 if not session: 0648 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0649 0650 blob = json.loads(request.data) 0651 keycode = blob['keycode'] 0652 # Not doing anything with these for now 0653 # metastate = blob['metastate'] 0654 # flags = blob['flags'] 0655 for ch in keycode: 0656 generate_keyboard_event(ch) 0657 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0658 0659 0660 @app.route('/session/<session_id>/appium/device/get_clipboard', methods=['POST']) 0661 def session_appium_device_get_clipboard(session_id): 0662 session = sessions[session_id] 0663 if not session: 0664 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0665 0666 blob = json.loads(request.data) 0667 contentType = blob['contentType'] 0668 0669 # NOTE: need a window because on wayland we must be the active window to manipulate the clipboard (currently anyway) 0670 window = Gtk.Window() 0671 window.set_default_size(20, 20) 0672 window.show() 0673 display = window.get_display() 0674 clipboard = Gtk.Clipboard.get_for_display(display, Gdk.SELECTION_CLIPBOARD) 0675 0676 context = GLib.MainContext.default() 0677 for _ in range(4): 0678 time.sleep(0.5) 0679 while context.pending(): 0680 context.iteration(may_block=False) 0681 0682 data = None 0683 if contentType == 'plaintext': 0684 data = clipboard.wait_for_text() 0685 else: 0686 raise 'content type not currently supported' 0687 0688 window.close() 0689 return json.dumps({'value': base64.b64encode(data.encode('utf-8')).decode('utf-8')}), 200, {'content-type': 'application/json'} 0690 0691 0692 @app.route('/session/<session_id>/appium/device/set_clipboard', methods=['POST']) 0693 def session_appium_device_set_clipboard(session_id): 0694 session = sessions[session_id] 0695 if not session: 0696 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0697 0698 blob = json.loads(request.data) 0699 contentType = blob['contentType'] 0700 content = blob['content'] 0701 0702 # NOTE: need a window because on wayland we must be the active window to manipulate the clipboard (currently anyway) 0703 window = Gtk.Window() 0704 window.set_default_size(20, 20) 0705 display = window.get_display() 0706 clipboard = Gtk.Clipboard.get_for_display(display, Gdk.SELECTION_CLIPBOARD) 0707 0708 if contentType == 'plaintext': 0709 clipboard.set_text(base64.b64decode(content).decode('utf-8'), -1) 0710 else: 0711 raise 'content type not currently supported' 0712 0713 context = GLib.MainContext.default() 0714 for _ in range(4): 0715 time.sleep(0.5) 0716 while context.pending(): 0717 context.iteration(may_block=False) 0718 0719 window.close() 0720 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0721 0722 0723 @app.route('/session/<session_id>/appium/element/<element_id>/value', methods=['POST']) 0724 def session_appium_element_value(session_id, element_id): 0725 session = sessions[session_id] 0726 if not session: 0727 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0728 0729 element = session.elements[element_id] 0730 if not element: 0731 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0732 0733 blob = json.loads(request.data) 0734 print(blob) 0735 print(element) 0736 value = blob['value'][0] 0737 0738 try: 0739 valueElement = element.queryValue() 0740 valueElement.currentValue = float(value) 0741 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0742 except NotImplementedError: 0743 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0744 0745 0746 @app.route('/session/<session_id>/screenshot', methods=['GET']) 0747 def session_appium_screenshot(session_id): 0748 session = sessions[session_id] 0749 if not session: 0750 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0751 0752 # NB: these values incorrectly do not include the device pixel ratio, so they are off when used on a scaling display 0753 # position_x, position_y = session.browsing_context.getChildAtIndex(0).queryComponent().getPosition(pyatspi.XY_SCREEN) 0754 # size_width, size_height = session.browsing_context.getChildAtIndex(0).queryComponent().getSize() 0755 0756 proc = subprocess.Popen(['selenium-webdriver-at-spi-screenshotter', 0757 str(0), str(0), str(0), str(0)], 0758 stdout=subprocess.PIPE) 0759 out, err = proc.communicate() 0760 0761 if not out: 0762 return json.dumps({'value': {'error': err}}), 404, {'content-type': 'application/json'} 0763 0764 return json.dumps({'value': out.decode('utf-8')}), 200, {'content-type': 'application/json'} 0765 0766 0767 def generate_keyboard_event(ch): 0768 # using a nested kwin. need to synthesize keys into wayland (not supported in atspi right now) 0769 if 'KWIN_PID' in os.environ: 0770 with tempfile.NamedTemporaryFile() as fp: 0771 keymap = keyval_to_keycode(char_to_keyval(ch)) 0772 fp.write(json.dumps([ 0773 {'type': 'keyboard', 'level': keymap.level, 'keycode': keymap.keycode} 0774 ]).encode()) 0775 fp.flush() 0776 subprocess.run(["selenium-webdriver-at-spi-inputsynth", fp.name]) 0777 time.sleep(EVENTLOOP_TIME) 0778 else: 0779 pyatspi.Registry.generateKeyboardEvent(char_to_keyval(ch), None, pyatspi.KEY_SYM) 0780 time.sleep(EVENTLOOP_TIME) 0781 0782 0783 def keyval_to_keycode(keyval): 0784 keymap = Gdk.Keymap.get_default() 0785 ret, keys = keymap.get_entries_for_keyval(keyval) 0786 if not ret: 0787 raise RuntimeError("Failed to map key!") 0788 return keys[0] 0789 0790 0791 def char_to_keyval(ch): 0792 print("----------::::::") 0793 keyval = Gdk.unicode_to_keyval(ord(ch)) 0794 # I Don't know why this doesn't work, also doesn't work with \033 as input. :(( 0795 # https://gitlab.gnome.org/GNOME/gtk/-/blob/gtk-3-24/gdk/gdkkeyuni.c 0796 # Other useful resources: 0797 # https://www.cl.cam.ac.uk/~mgk25/ucs/keysymdef.h 0798 if ch == "\uE00C": 0799 keyval = 0xff1b # escape 0800 elif ch == "\ue03d": 0801 keyval = 0xffeb # left meta 0802 elif ch == "\ue006": 0803 keyval = 0xff0d # return 0804 elif ch == "\ue007": 0805 keyval = 0xff8d # enter 0806 elif ch == "\ue003": 0807 keyval = 0xff08 # backspace 0808 elif ch == "\ue010": 0809 keyval = 0xff57 # end 0810 elif ch == "\ue012": 0811 keyval = 0xff51 # left 0812 elif ch == "\ue014": 0813 keyval = 0xff53 # right 0814 elif ch == "\ue013": 0815 keyval = 0xff52 # up 0816 elif ch == "\ue015": 0817 keyval = 0xff54 # down 0818 elif ch == "\ue004": 0819 keyval = 0xff09 # tab 0820 print(ord(ch)) 0821 print(hex(keyval)) 0822 return keyval