Warning, file /sdk/selenium-webdriver-at-spi/selenium-webdriver-at-spi.py was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).
0001 # SPDX-License-Identifier: AGPL-3.0-or-later 0002 # SPDX-FileCopyrightText: 2021-2023 Harald Sitter <sitter@kde.org> 0003 0004 import base64 0005 from datetime import datetime, timedelta 0006 import tempfile 0007 import time 0008 from flask import Flask, request 0009 import uuid 0010 import json 0011 import sys 0012 import os 0013 import signal 0014 import subprocess 0015 0016 import pyatspi 0017 from lxml import etree 0018 0019 import gi 0020 from gi.repository import GLib 0021 from gi.repository import Gio 0022 gi.require_version('Gdk', '3.0') 0023 from gi.repository import Gdk 0024 gi.require_version('Gtk', '3.0') 0025 from gi.repository import Gtk 0026 0027 from app_roles import ROLE_NAMES 0028 0029 # Exposes AT-SPI as a webdriver. This is written in python because C sucks and pyatspi is a first class binding so 0030 # we lose nothing but gain the reduced sucking of python. 0031 0032 # https://github.com/SeleniumHQ/selenium/wiki/JsonWireProtocol#WebElement_JSON_Object.md 0033 # https://www.w3.org/TR/webdriver 0034 # https://github.com/microsoft/WinAppDriver/blob/master/Docs/SupportedAPIs.md 0035 # https://www.freedesktop.org/wiki/Accessibility/PyAtSpi2Example/ 0036 0037 EVENTLOOP_TIME = 0.5 0038 sys.stdout = sys.stderr 0039 sessions = {} # global dict of open sessions 0040 0041 # Give the GUI enough time to react. tests run on the CI won't always be responsive in the tight schedule established by at-spi2 (800ms) and run risk 0042 # of timing out on (e.g.) click events. The second value is the timeout for app startup, we keep that the same as upstream. 0043 pyatspi.setTimeout(4000, 15000) 0044 0045 # Using flask because I know nothing about writing REST in python and it seemed the most straight-forward framework. 0046 app = Flask(__name__) 0047 0048 0049 @app.route('/status', methods=['GET']) 0050 def status(): 0051 body = { 0052 'value': { 0053 'ready': 'true', 0054 'message': 'There is only one state. Hooray!' 0055 } 0056 } 0057 return json.dumps(body), 200, {'content-type': 'application/json'} 0058 0059 0060 def _createNode2(accessible, parentElement, indexInParents = []): 0061 if not accessible: 0062 return 0063 # A bit of aggressive filtering to not introspect chromium and firefox and the likes when using the desktop root. 0064 if accessible.toolkitName != "Qt" and accessible.toolkitName != "at-spi-registry": 0065 return 0066 0067 roleName = accessible.getRoleName() 0068 e = None 0069 if roleName: 0070 e = etree.Element(roleName.replace(" ", "_")) 0071 else: 0072 e = etree.Element("accessible") 0073 0074 e.set("name", accessible.name) 0075 e.set("role", str(int(accessible.getRole()))) 0076 e.set("description", accessible.description) 0077 if accessible.accessibleId != None: 0078 e.set("accessibility-id", accessible.accessibleId) 0079 # NB: pyatspi.getPath is bugged when the QObject has no QObject parent. Instead manually keep track of indexes. 0080 # while generating the xml. 0081 # path = pyatspi.getPath(accessible) 0082 path_strs = [str(x) for x in indexInParents] # path is a list of ints for the indexes within the parents 0083 e.set("path", ' '.join(path_strs)) 0084 0085 states = [] 0086 for state in accessible.getState().getStates(): 0087 states.append(pyatspi.stateToString(state)) 0088 e.set("states", ', '.join(states)) 0089 0090 for i in range(0, accessible.childCount): 0091 newIndex = indexInParents.copy() 0092 newIndex.append(i) 0093 _createNode2(accessible.getChildAtIndex(i), e, newIndex) 0094 0095 if parentElement != None: 0096 parentElement.append(e) 0097 else: 0098 return e 0099 0100 0101 @app.route('/') 0102 def index(): 0103 return 'Servas' 0104 0105 # Encapsulates a Session object. Sessions are opened by the client and contain elements. A session is generally speaking 0106 # an app. 0107 # TODO: should we expose the root scope somehow? requires a special variant of session and moving logic from the 0108 # REST app functions to the Session object. 0109 0110 0111 class Session: 0112 0113 def __init__(self) -> None: 0114 self.id = str(uuid.uuid1()) 0115 self.elements = {} # a cache to hold elements between finding and interacting with 0116 self.browsing_context = None 0117 self.pid = -1 0118 self.timeouts = {'script': 30000, 'pageLoad': 300000, 'implicit': 5000} # implicit deviates from spec, 0 is unreasonable 0119 self.launched = False 0120 0121 blob = json.loads(request.data) 0122 print(request.data) 0123 # TODO the blob from ruby is much more complicated god only knows why 0124 desired_app = None 0125 desired_timeouts = None 0126 if 'desiredCapabilities' in blob: 0127 if 'app' in blob['desiredCapabilities']: 0128 desired_app = blob['desiredCapabilities']['app'] 0129 else: 0130 desired_app = blob['desiredCapabilities']['appium:app'] 0131 if 'timeouts' in blob['desiredCapabilities']: 0132 desired_timeouts = blob['desiredCapabilities']['timeouts'] 0133 else: 0134 if 'app' in blob['capabilities']['alwaysMatch']: 0135 desired_app = blob['capabilities']['alwaysMatch']['app'] 0136 else: 0137 desired_app = blob['capabilities']['alwaysMatch']['appium:app'] 0138 if 'timeouts' in blob['capabilities']['alwaysMatch']: 0139 desired_timeouts = blob['capabilities']['alwaysMatch']['timeouts'] 0140 0141 if desired_timeouts: 0142 if 'script' in desired_timeouts: 0143 self.timeouts['script'] = desired_timeouts['script'] 0144 if 'pageLoad' in desired_timeouts: 0145 self.timeouts['pageLoad'] = desired_timeouts['pageLoad'] 0146 if 'implicit' in desired_timeouts: 0147 self.timeouts['implicit'] = desired_timeouts['implicit'] 0148 0149 if desired_app == 'Root': 0150 # NB: at the time of writing there can only be one desktop ever 0151 self.browsing_context = pyatspi.Registry.getDesktop(0) 0152 return 0153 0154 self.launched = True 0155 end_time = datetime.now() + \ 0156 timedelta(milliseconds=self.timeouts['implicit']) 0157 0158 context = Gio.AppLaunchContext() 0159 context.setenv('QT_ACCESSIBILITY', '1') 0160 context.setenv('QT_LINUX_ACCESSIBILITY_ALWAYS_ON', '1') 0161 context.setenv('KIO_DISABLE_CACHE_CLEANER', '1') # don't dangle 0162 0163 def on_launched(context, info, platform_data): 0164 self.pid = platform_data['pid'] 0165 print("launched " + str(self.pid)) 0166 0167 while datetime.now() < end_time: 0168 for desktop_index in range(pyatspi.Registry.getDesktopCount()): 0169 desktop = pyatspi.Registry.getDesktop(desktop_index) 0170 for app in desktop: 0171 try: 0172 if app.get_process_id() == self.pid: 0173 self.browsing_context = app 0174 break 0175 except gi.repository.GLib.GError: 0176 print('stumbled over a broken process. ignoring...') 0177 continue 0178 if self.browsing_context: 0179 break 0180 # TODO raise if no context? 0181 if self.browsing_context: 0182 break 0183 0184 context.connect("launched", on_launched) 0185 0186 if desired_app.endswith(".desktop"): 0187 appinfo = Gio.DesktopAppInfo.new(desired_app) 0188 appinfo.launch([], context) 0189 elif desired_app.isnumeric(): 0190 on_launched(None, None, {'pid': int(desired_app)}) 0191 else: 0192 appinfo = Gio.AppInfo.create_from_commandline( 0193 desired_app, None, Gio.AppInfoCreateFlags.NONE) 0194 appinfo.launch([], context) 0195 print(self.browsing_context) 0196 0197 def close(self) -> None: 0198 if self.launched: 0199 try: 0200 os.kill(self.pid, signal.SIGKILL) 0201 except ProcessLookupError: 0202 pass 0203 0204 0205 @app.route('/session', methods=['GET', 'POST', 'DELETE']) 0206 def session(): 0207 if request.method == 'POST': 0208 # TODO: 0209 # https://www.w3.org/TR/webdriver1/#new-session 0210 # 1, 3, 4, 5, 8, 9, 11, 12, 13, 14 0211 session = Session() 0212 print(request) 0213 sessions[session.id] = session 0214 print(sessions) 0215 0216 if session.browsing_context is None: 0217 return json.dumps({'value': {'error': 'session not created '}}), 500, {'content-type': 'application/json'} 0218 0219 return json.dumps({'value': {'sessionId': session.id, 'capabilities': {"app": session.browsing_context.name}}}), 200, {'content-type': 'application/json'} 0220 elif request.method == 'GET': 0221 # TODO impl 0222 print(request) 0223 elif request.method == 'DELETE': 0224 # TODO spec review 0225 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0226 0227 0228 @app.route('/session/<session_id>', methods=['DELETE']) 0229 def session_delete(session_id): 0230 if request.method == 'DELETE': 0231 # TODO spec review 0232 session = sessions[session_id] 0233 if not session: 0234 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0235 0236 session.close() 0237 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0238 0239 0240 @app.route('/session/<session_id>/timeouts/implicit_wait', methods=['POST']) 0241 def session_implicit_wait(session_id): 0242 session = sessions[session_id] 0243 if not session: 0244 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0245 0246 blob = json.loads(request.data) 0247 ms = blob['ms'] 0248 0249 session.timeouts['implicit'] = ms 0250 return json.dumps({'value': None}) 0251 0252 0253 @app.route('/session/<session_id>/source', methods=['GET']) 0254 def session_source(session_id): 0255 session = sessions[session_id] 0256 if not session: 0257 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0258 0259 doc = _createNode2(session.browsing_context, None) 0260 return json.dumps({ 'value': etree.tostring(doc, pretty_print=False).decode("utf-8") }), 200, {'content-type': 'application/xml'} 0261 0262 0263 # NB: custom method to get the source without json wrapper 0264 @app.route('/session/<session_id>/sourceRaw', methods=['GET']) 0265 def session_source_raw(session_id): 0266 session = sessions[session_id] 0267 if not session: 0268 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0269 0270 doc = _createNode2(session.browsing_context, None) 0271 return etree.tostring(doc, pretty_print=True).decode("utf-8"), 200, {'content-type': 'application/xml'} 0272 0273 0274 def locator(session, strategy, selector, start): 0275 # pyatspi.findDescendant(start, lambda x: print(x)) 0276 0277 end_time = datetime.now() + \ 0278 timedelta(milliseconds=session.timeouts['implicit']) 0279 results = [] 0280 0281 while datetime.now() < end_time: 0282 if strategy == 'xpath': 0283 print("-- xml") 0284 doc = _createNode2(start, None) 0285 for c in doc.xpath(selector): 0286 path = [int(x) for x in c.get('path').split()] 0287 # path is relative to the app root, not our start item! 0288 item = session.browsing_context 0289 for i in path: 0290 item = item[i] 0291 if c.get('name') != item.name or c.get('description') != item.description: 0292 return [] 0293 results.append(item) 0294 print("-- xml") 0295 else: 0296 # TODO can I switch this in python +++ raise on unmapped strategy 0297 pred = None 0298 if strategy == 'accessibility id': 0299 def pred(x): return x.accessibleId.endswith(selector) and (x.getState().contains(pyatspi.STATE_VISIBLE) and x.getState().contains(pyatspi.STATE_SENSITIVE)) 0300 # pyatspi strings "[ roleName | name ]" 0301 elif strategy == 'class name': 0302 def pred(x): return str(x) == selector and (x.getState().contains( 0303 pyatspi.STATE_VISIBLE) or x.getState().contains(pyatspi.STATE_SENSITIVE)) 0304 elif strategy == 'name': 0305 def pred(x): return x.name == selector and (x.getState().contains( 0306 pyatspi.STATE_VISIBLE) or x.getState().contains(pyatspi.STATE_SENSITIVE)) 0307 elif strategy == 'description': 0308 def pred(x): return x.description == selector and (x.getState().contains( 0309 pyatspi.STATE_VISIBLE) or x.getState().contains(pyatspi.STATE_SENSITIVE)) 0310 # there are also id and accessibleId but they seem not ever set. Not sure what to make of that :shrug: 0311 accessible = pyatspi.findDescendant(start, pred) 0312 print(accessible) 0313 if accessible: 0314 results.append(accessible) 0315 if len(results) > 0: 0316 break 0317 0318 return results 0319 0320 0321 @app.route('/session/<session_id>/element', methods=['GET', 'POST']) 0322 def session_element(session_id=None): 0323 # https://www.w3.org/TR/webdriver1/#dfn-find-element 0324 0325 # TODO scope elements to session somehow when the session gets closed we can throw away the references 0326 print(request.url) 0327 print(session_id) 0328 print(request.args) 0329 print(request.data) 0330 session = sessions[session_id] 0331 blob = json.loads(request.data) 0332 0333 strategy = blob['using'] 0334 selector = blob['value'] 0335 if not strategy or not selector: 0336 return json.dumps({'value': {'error': 'invalid argument'}}), 404, {'content-type': 'application/json'} 0337 0338 start = session.browsing_context 0339 if not start: # browsing context (no longer) valid 0340 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0341 0342 results = locator(session, strategy, selector, start) 0343 0344 if not results: 0345 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0346 0347 result = results[0] 0348 unique_id = result.path.replace('/', '-') 0349 session.elements[unique_id] = result 0350 return json.dumps({'value': {'element-6066-11e4-a52e-4f735466cecf': unique_id}}), 200, {'content-type': 'application/json'} 0351 0352 0353 @app.route('/session/<session_id>/elements', methods=['GET', 'POST']) 0354 def session_element2(session_id=None): 0355 # https://www.w3.org/TR/webdriver1/#dfn-find-elements 0356 0357 # TODO scope elements to session somehow when the session gets closed we can throw away the references 0358 print(request.url) 0359 print(session_id) 0360 print(request.args) 0361 print(request.data) 0362 session = sessions[session_id] 0363 blob = json.loads(request.data) 0364 0365 strategy = blob['using'] 0366 selector = blob['value'] 0367 if not strategy or not selector: 0368 return json.dumps({'value': {'error': 'invalid argument'}}), 404, {'content-type': 'application/json'} 0369 0370 start = session.browsing_context 0371 if not start: # browsing context (no longer) valid 0372 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0373 0374 results = locator(session, strategy, selector, start) 0375 0376 if not results: 0377 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0378 0379 serializations = [] 0380 for result in results: 0381 unique_id = result.path.replace('/', '-') 0382 print(unique_id) 0383 session.elements[unique_id] = result 0384 serializations.append({'element-6066-11e4-a52e-4f735466cecf': unique_id}) 0385 0386 return json.dumps({'value': serializations}), 200, {'content-type': 'application/json'} 0387 0388 0389 @app.route('/session/<session_id>/element/<element_id>/click', methods=['GET', 'POST']) 0390 def session_element_click(session_id, element_id): 0391 session = sessions[session_id] 0392 if not session: 0393 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0394 0395 element = session.elements[element_id] 0396 if not element: 0397 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0398 0399 action = element.queryAction() 0400 availableActions = {} 0401 for i in range(0, action.nActions): 0402 availableActions[action.getName(i)] = i 0403 0404 keys = availableActions.keys() 0405 if 'SetFocus' in keys: # this is used in addition to actual actions so focus is where it would be expected after a click 0406 print("actioning focus") 0407 action.doAction(availableActions['SetFocus']) 0408 time.sleep(EVENTLOOP_TIME) 0409 0410 if 'Press' in keys: 0411 print("actioning press") 0412 action.doAction(availableActions['Press']) 0413 elif 'Toggle' in keys: 0414 print("actioning toggle") 0415 action.doAction(availableActions['Toggle']) 0416 time.sleep(EVENTLOOP_TIME) 0417 0418 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0419 0420 0421 @app.route('/session/<session_id>/element/<element_id>/text', methods=['GET']) 0422 def session_element_text(session_id, element_id): 0423 session = sessions[session_id] 0424 element = session.elements[element_id] 0425 try: 0426 textElement = element.queryText() 0427 return json.dumps({'value': textElement.getText(0, -1)}), 200, {'content-type': 'application/json'} 0428 except NotImplementedError: 0429 return json.dumps({'value': element.name}), 200, {'content-type': 'application/json'} 0430 0431 0432 @app.route('/session/<session_id>/element/<element_id>/enabled', methods=['GET']) 0433 def session_element_enabled(session_id, element_id): 0434 session = sessions[session_id] 0435 element = session.elements[element_id] 0436 return json.dumps({'value': element.getState().contains(pyatspi.STATE_ENABLED)}), 200, {'content-type': 'application/json'} 0437 0438 0439 @app.route('/session/<session_id>/element/<element_id>/displayed', methods=['GET']) 0440 def session_element_displayed(session_id, element_id): 0441 session = sessions[session_id] 0442 element = session.elements[element_id] 0443 return json.dumps({'value': element.getState().contains(pyatspi.STATE_VISIBLE) and element.getState().contains(pyatspi.STATE_SHOWING)}), 200, {'content-type': 'application/json'} 0444 0445 0446 @app.route('/session/<session_id>/element/<element_id>/selected', methods=['GET']) 0447 def session_element_selected(session_id, element_id): 0448 session = sessions[session_id] 0449 element = session.elements[element_id] 0450 return json.dumps({'value': (element.getState().contains(pyatspi.STATE_SELECTED) or element.getState().contains(pyatspi.STATE_FOCUSED))}), 200, {'content-type': 'application/json'} 0451 0452 0453 @app.route('/session/<session_id>/element/<element_id>/attribute/<name>', methods=['GET']) 0454 def session_element_attribute(session_id, element_id, name): 0455 session = sessions[session_id] 0456 if not session: 0457 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0458 0459 element = session.elements[element_id] 0460 if not element: 0461 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0462 0463 print(request.data) 0464 0465 if name == "accessibility-id": 0466 return json.dumps({'value': element.accessibleId}), 200, {'content-type': 'application/json'} 0467 0468 if name == "value": 0469 elementValue = element.queryValue() 0470 return json.dumps({'value': elementValue.currentValue}), 200, {'content-type': 'application/json'} 0471 0472 print(pyatspi.STATE_VALUE_TO_NAME) 0473 result = None 0474 for value, string in pyatspi.STATE_VALUE_TO_NAME.items(): 0475 if string == name: 0476 result = element.getState().contains(value) 0477 break 0478 0479 return json.dumps({'value': result}), 200, {'content-type': 'application/json'} 0480 0481 0482 @app.route('/session/<session_id>/element/<element_id>/element', methods=['POST']) 0483 def session_element_element(session_id, element_id): 0484 session = sessions[session_id] 0485 if not session: 0486 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0487 0488 blob = json.loads(request.data) 0489 print(blob) 0490 strategy = blob['using'] 0491 selector = blob['value'] 0492 if not strategy or not selector: 0493 return json.dumps({'value': {'error': 'invalid argument'}}), 404, {'content-type': 'application/json'} 0494 0495 start = session.elements[element_id] 0496 if not start: # browsing context (no longer) valid 0497 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0498 0499 return locator(session, strategy, selector, start) 0500 0501 0502 @app.route('/session/<session_id>/element/<element_id>/value', methods=['POST']) 0503 def session_element_value(session_id, element_id): 0504 session = sessions[session_id] 0505 if not session: 0506 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0507 0508 element = session.elements[element_id] 0509 if not element: 0510 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0511 0512 blob = json.loads(request.data) 0513 text = blob['text'] 0514 0515 print(blob) 0516 print(element) 0517 0518 try: 0519 offset = element.queryText().caretOffset 0520 textElement = element.queryEditableText() 0521 textElement.insertText(offset, text, len(text)) 0522 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0523 except NotImplementedError: 0524 print("element is not text type, falling back to synthesizing keyboard events") 0525 action = element.queryAction() 0526 processed = False 0527 for i in range(0, action.nActions): 0528 if action.getName(i) == 'SetFocus': 0529 processed = True 0530 action.doAction(i) 0531 time.sleep(EVENTLOOP_TIME) # give the focus time to apply 0532 for ch in text: 0533 generate_keyboard_event(ch) 0534 break 0535 if not processed: 0536 raise RuntimeError("element's actions list didn't contain SetFocus. The element may be malformed") 0537 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0538 0539 0540 @app.route('/session/<session_id>/element/<element_id>/clear', methods=['POST']) 0541 def session_element_clear(session_id, element_id): 0542 session = sessions[session_id] 0543 if not session: 0544 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0545 0546 element = session.elements[element_id] 0547 if not element: 0548 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0549 0550 characterCount = element.queryText().characterCount 0551 try: 0552 textElement = element.queryEditableText() 0553 textElement.deleteText(0, characterCount) 0554 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0555 except NotImplementedError: 0556 print("element is not text type, falling back to synthesizing keyboard events") 0557 action = element.queryAction() 0558 processed = False 0559 for i in range(0, action.nActions): 0560 if action.getName(i) == 'SetFocus': 0561 processed = True 0562 action.doAction(i) 0563 time.sleep(EVENTLOOP_TIME) # give the focus time to apply 0564 generate_keyboard_event('\ue010') # end 0565 for _ in range(characterCount): 0566 generate_keyboard_event('\ue003') # backspace 0567 break 0568 if not processed: 0569 raise RuntimeError("element's actions list didn't contain SetFocus. The element may be malformed") 0570 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0571 0572 0573 @app.route('/session/<session_id>/appium/device/app_state', methods=['POST']) 0574 def session_appium_device_app_state(session_id): 0575 session = sessions[session_id] 0576 if not session: 0577 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0578 0579 blob = json.loads(request.data) 0580 appId = blob['appId'] 0581 0582 proc = subprocess.Popen( 0583 'selenium-webdriver-at-spi-appidlister', stdout=subprocess.PIPE) 0584 out, err = proc.communicate() 0585 0586 apps = json.loads(out) 0587 print(apps) 0588 if appId in apps.values(): 0589 return json.dumps({'value': 4}), 200, {'content-type': 'application/json'} 0590 # TODO: implement rest of codes 0591 return json.dumps({'value': 1}), 200, {'content-type': 'application/json'} 0592 0593 0594 @app.route('/session/<session_id>/appium/device/terminate_app', methods=['POST']) 0595 def session_appium_device_terminate_app(session_id): 0596 session = sessions[session_id] 0597 if not session: 0598 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0599 0600 blob = json.loads(request.data) 0601 appId = blob['appId'] 0602 0603 proc = subprocess.Popen( 0604 'selenium-webdriver-at-spi-appidlister', stdout=subprocess.PIPE) 0605 out, err = proc.communicate() 0606 0607 apps = json.loads(out) 0608 if appId in apps.values(): 0609 pid = list(apps.keys())[list(apps.values()).index(appId)] 0610 os.kill(int(pid), signal.SIGKILL) 0611 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0612 0613 0614 @app.route('/session/<session_id>/appium/device/press_keycode', methods=['POST']) 0615 def session_appium_device_press_keycode(session_id): 0616 session = sessions[session_id] 0617 if not session: 0618 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0619 0620 blob = json.loads(request.data) 0621 keycode = blob['keycode'] 0622 # Not doing anything with these for now 0623 # metastate = blob['metastate'] 0624 # flags = blob['flags'] 0625 for ch in keycode: 0626 generate_keyboard_event(ch) 0627 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0628 0629 0630 @app.route('/session/<session_id>/appium/device/get_clipboard', methods=['POST']) 0631 def session_appium_device_get_clipboard(session_id): 0632 session = sessions[session_id] 0633 if not session: 0634 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0635 0636 blob = json.loads(request.data) 0637 contentType = blob['contentType'] 0638 0639 # NOTE: need a window because on wayland we must be the active window to manipulate the clipboard (currently anyway) 0640 window = Gtk.Window() 0641 window.set_default_size(20, 20); 0642 window.show() 0643 display = window.get_display() 0644 clipboard = Gtk.Clipboard.get_for_display(display, Gdk.SELECTION_CLIPBOARD) 0645 0646 context = GLib.MainContext.default() 0647 for _ in range(4): 0648 time.sleep(0.5) 0649 while context.pending(): 0650 context.iteration(may_block=False) 0651 0652 data = None 0653 if contentType == 'plaintext': 0654 data = clipboard.wait_for_text() 0655 else: 0656 raise 'content type not currently supported' 0657 0658 window.close() 0659 return json.dumps({'value': base64.b64encode(data.encode('utf-8')).decode('utf-8')}), 200, {'content-type': 'application/json'} 0660 0661 0662 @app.route('/session/<session_id>/appium/device/set_clipboard', methods=['POST']) 0663 def session_appium_device_set_clipboard(session_id): 0664 session = sessions[session_id] 0665 if not session: 0666 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0667 0668 blob = json.loads(request.data) 0669 contentType = blob['contentType'] 0670 content = blob['content'] 0671 0672 # NOTE: need a window because on wayland we must be the active window to manipulate the clipboard (currently anyway) 0673 window = Gtk.Window() 0674 window.set_default_size(20, 20); 0675 display = window.get_display() 0676 clipboard = Gtk.Clipboard.get_for_display(display, Gdk.SELECTION_CLIPBOARD) 0677 0678 if contentType == 'plaintext': 0679 clipboard.set_text(base64.b64decode(content).decode('utf-8'), -1) 0680 else: 0681 raise 'content type not currently supported' 0682 0683 context = GLib.MainContext.default() 0684 for _ in range(4): 0685 time.sleep(0.5) 0686 while context.pending(): 0687 context.iteration(may_block=False) 0688 0689 window.close() 0690 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0691 0692 0693 @app.route('/session/<session_id>/appium/element/<element_id>/value', methods=['POST']) 0694 def session_appium_element_value(session_id, element_id): 0695 session = sessions[session_id] 0696 if not session: 0697 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0698 0699 element = session.elements[element_id] 0700 if not element: 0701 return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'} 0702 0703 blob = json.loads(request.data) 0704 print(blob) 0705 print(element) 0706 value = blob['value'][0] 0707 0708 try: 0709 valueElement = element.queryValue() 0710 valueElement.currentValue = float(value) 0711 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0712 except NotImplementedError: 0713 return json.dumps({'value': None}), 200, {'content-type': 'application/json'} 0714 0715 0716 @app.route('/session/<session_id>/screenshot', methods=['GET']) 0717 def session_appium_screenshot(session_id): 0718 session = sessions[session_id] 0719 if not session: 0720 return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'} 0721 0722 # NB: these values incorrectly do not include the device pixel ratio, so they are off when used on a scaling display 0723 # position_x, position_y = session.browsing_context.getChildAtIndex(0).queryComponent().getPosition(pyatspi.XY_SCREEN) 0724 # size_width, size_height = session.browsing_context.getChildAtIndex(0).queryComponent().getSize() 0725 0726 proc = subprocess.Popen(['selenium-webdriver-at-spi-screenshotter', 0727 str(0), str(0), str(0), str(0)], 0728 stdout=subprocess.PIPE) 0729 out, err = proc.communicate() 0730 0731 if not out: 0732 return json.dumps({'value': {'error': err}}), 404, {'content-type': 'application/json'} 0733 0734 return json.dumps({'value': out.decode('utf-8')}), 200, {'content-type': 'application/json'} 0735 0736 0737 def generate_keyboard_event(ch): 0738 if 'KWIN_PID' in os.environ: # using a nested kwin. need to synthesize keys into wayland (not supported in atspi right now) 0739 with tempfile.NamedTemporaryFile() as fp: 0740 keymap = keyval_to_keycode(char_to_keyval(ch)) 0741 fp.write(json.dumps([ 0742 {'type': 'keyboard', 'level': keymap.level, 'keycode': keymap.keycode} 0743 ]).encode()) 0744 fp.flush() 0745 subprocess.run(["selenium-webdriver-at-spi-inputsynth", fp.name]) 0746 time.sleep(EVENTLOOP_TIME) 0747 else: 0748 pyatspi.Registry.generateKeyboardEvent(char_to_keyval(ch), None, pyatspi.KEY_SYM) 0749 time.sleep(EVENTLOOP_TIME) 0750 0751 0752 def keyval_to_keycode(keyval): 0753 keymap = Gdk.Keymap.get_default() 0754 ret, keys = keymap.get_entries_for_keyval(keyval) 0755 if not ret: 0756 raise RuntimeError("Failed to map key!") 0757 return keys[0] 0758 0759 0760 def char_to_keyval(ch): 0761 print("----------::::::") 0762 keyval = Gdk.unicode_to_keyval(ord(ch)) 0763 # I Don't know why this doesn't work, also doesn't work with \033 as input. :(( 0764 # https://gitlab.gnome.org/GNOME/gtk/-/blob/gtk-3-24/gdk/gdkkeyuni.c 0765 # Other useful resources: 0766 # https://www.cl.cam.ac.uk/~mgk25/ucs/keysymdef.h 0767 if ch == "\uE00C": 0768 keyval = 0xff1b # escape 0769 elif ch == "\ue03d": 0770 keyval = 0xffeb # left meta 0771 elif ch == "\ue006": 0772 keyval = 0xff0d # return 0773 elif ch == "\ue007": 0774 keyval = 0xff8d # enter 0775 elif ch == "\ue003": 0776 keyval = 0xff08 # backspace 0777 elif ch == "\ue010": 0778 keyval = 0xff57 # end 0779 elif ch == "\ue012": 0780 keyval = 0xff51 # left 0781 elif ch == "\ue014": 0782 keyval = 0xff53 # right 0783 elif ch == "\ue013": 0784 keyval = 0xff52 # up 0785 elif ch == "\ue015": 0786 keyval = 0xff54 # down 0787 elif ch == "\ue004": 0788 keyval = 0xff09 # tab 0789 print(ord(ch)) 0790 print(hex(keyval)) 0791 return keyval