Warning, file /sdk/selenium-webdriver-at-spi/selenium-webdriver-at-spi.py was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).

0001 # SPDX-License-Identifier: AGPL-3.0-or-later
0002 # SPDX-FileCopyrightText: 2021-2023 Harald Sitter <sitter@kde.org>
0003 
0004 import base64
0005 from datetime import datetime, timedelta
0006 import tempfile
0007 import time
0008 from flask import Flask, request
0009 import uuid
0010 import json
0011 import sys
0012 import os
0013 import signal
0014 import subprocess
0015 
0016 import pyatspi
0017 from lxml import etree
0018 
0019 import gi
0020 from gi.repository import GLib
0021 from gi.repository import Gio
0022 gi.require_version('Gdk', '3.0')
0023 from gi.repository import Gdk
0024 gi.require_version('Gtk', '3.0')
0025 from gi.repository import Gtk
0026 
0027 from app_roles import ROLE_NAMES
0028 
0029 # Exposes AT-SPI as a webdriver. This is written in python because C sucks and pyatspi is a first class binding so
0030 # we lose nothing but gain the reduced sucking of python.
0031 
0032 # https://github.com/SeleniumHQ/selenium/wiki/JsonWireProtocol#WebElement_JSON_Object.md
0033 # https://www.w3.org/TR/webdriver
0034 # https://github.com/microsoft/WinAppDriver/blob/master/Docs/SupportedAPIs.md
0035 # https://www.freedesktop.org/wiki/Accessibility/PyAtSpi2Example/
0036 
0037 EVENTLOOP_TIME = 0.5
0038 sys.stdout = sys.stderr
0039 sessions = {} # global dict of open sessions
0040 
0041 # Give the GUI enough time to react. tests run on the CI won't always be responsive in the tight schedule established by at-spi2 (800ms) and run risk
0042 # of timing out on (e.g.) click events. The second value is the timeout for app startup, we keep that the same as upstream.
0043 pyatspi.setTimeout(4000, 15000)
0044 
0045 # Using flask because I know nothing about writing REST in python and it seemed the most straight-forward framework.
0046 app = Flask(__name__)
0047 
0048 
0049 @app.route('/status', methods=['GET'])
0050 def status():
0051     body = {
0052         'value': {
0053             'ready': 'true',
0054             'message': 'There is only one state. Hooray!'
0055         }
0056     }
0057     return json.dumps(body), 200, {'content-type': 'application/json'}
0058 
0059 
0060 def _createNode2(accessible, parentElement, indexInParents = []):
0061     if not accessible:
0062         return
0063     # A bit of aggressive filtering to not introspect chromium and firefox and the likes when using the desktop root.
0064     if accessible.toolkitName != "Qt" and accessible.toolkitName != "at-spi-registry":
0065         return
0066 
0067     roleName = accessible.getRoleName()
0068     e = None
0069     if roleName:
0070         e = etree.Element(roleName.replace(" ", "_"))
0071     else:
0072         e = etree.Element("accessible")
0073 
0074     e.set("name", accessible.name)
0075     e.set("role", str(int(accessible.getRole())))
0076     e.set("description", accessible.description)
0077     if accessible.accessibleId != None:
0078         e.set("accessibility-id", accessible.accessibleId)
0079     # NB: pyatspi.getPath is bugged when the QObject has no QObject parent. Instead manually keep track of indexes.
0080     # while generating the xml.
0081     # path = pyatspi.getPath(accessible)
0082     path_strs = [str(x) for x in indexInParents] # path is a list of ints for the indexes within the parents
0083     e.set("path", ' '.join(path_strs))
0084 
0085     states = []
0086     for state in accessible.getState().getStates():
0087         states.append(pyatspi.stateToString(state))
0088     e.set("states", ', '.join(states))
0089 
0090     for i in range(0, accessible.childCount):
0091         newIndex = indexInParents.copy()
0092         newIndex.append(i)
0093         _createNode2(accessible.getChildAtIndex(i), e, newIndex)
0094 
0095     if parentElement != None:
0096         parentElement.append(e)
0097     else:
0098         return e
0099 
0100 
0101 @app.route('/')
0102 def index():
0103     return 'Servas'
0104 
0105 # Encapsulates a Session object. Sessions are opened by the client and contain elements. A session is generally speaking
0106 # an app.
0107 # TODO: should we expose the root scope somehow? requires a special variant of session and moving logic from the
0108 #   REST app functions to the Session object.
0109 
0110 
0111 class Session:
0112 
0113     def __init__(self) -> None:
0114         self.id = str(uuid.uuid1())
0115         self.elements = {}  # a cache to hold elements between finding and interacting with
0116         self.browsing_context = None
0117         self.pid = -1
0118         self.timeouts = {'script': 30000, 'pageLoad': 300000, 'implicit': 5000} # implicit deviates from spec, 0 is unreasonable
0119         self.launched = False
0120 
0121         blob = json.loads(request.data)
0122         print(request.data)
0123         # TODO the blob from ruby is much more complicated god only knows why
0124         desired_app = None
0125         desired_timeouts = None
0126         if 'desiredCapabilities' in blob:
0127             if 'app' in blob['desiredCapabilities']:
0128                 desired_app = blob['desiredCapabilities']['app']
0129             else:
0130                 desired_app = blob['desiredCapabilities']['appium:app']
0131             if 'timeouts' in blob['desiredCapabilities']:
0132                 desired_timeouts = blob['desiredCapabilities']['timeouts']
0133         else:
0134             if 'app' in blob['capabilities']['alwaysMatch']:
0135                 desired_app = blob['capabilities']['alwaysMatch']['app']
0136             else:
0137                 desired_app = blob['capabilities']['alwaysMatch']['appium:app']
0138             if 'timeouts' in blob['capabilities']['alwaysMatch']:
0139                 desired_timeouts = blob['capabilities']['alwaysMatch']['timeouts']
0140 
0141         if desired_timeouts:
0142             if 'script' in desired_timeouts:
0143                 self.timeouts['script'] = desired_timeouts['script']
0144             if 'pageLoad' in desired_timeouts:
0145                 self.timeouts['pageLoad'] = desired_timeouts['pageLoad']
0146             if 'implicit' in desired_timeouts:
0147                 self.timeouts['implicit'] = desired_timeouts['implicit']
0148 
0149         if desired_app == 'Root':
0150             # NB: at the time of writing there can only be one desktop ever
0151             self.browsing_context = pyatspi.Registry.getDesktop(0)
0152             return
0153 
0154         self.launched = True
0155         end_time = datetime.now() + \
0156             timedelta(milliseconds=self.timeouts['implicit'])
0157 
0158         context = Gio.AppLaunchContext()
0159         context.setenv('QT_ACCESSIBILITY', '1')
0160         context.setenv('QT_LINUX_ACCESSIBILITY_ALWAYS_ON', '1')
0161         context.setenv('KIO_DISABLE_CACHE_CLEANER', '1')  # don't dangle
0162 
0163         def on_launched(context, info, platform_data):
0164             self.pid = platform_data['pid']
0165             print("launched " + str(self.pid))
0166 
0167             while datetime.now() < end_time:
0168                 for desktop_index in range(pyatspi.Registry.getDesktopCount()):
0169                     desktop = pyatspi.Registry.getDesktop(desktop_index)
0170                     for app in desktop:
0171                         try:
0172                             if app.get_process_id() == self.pid:
0173                                 self.browsing_context = app
0174                                 break
0175                         except gi.repository.GLib.GError:
0176                             print('stumbled over a broken process. ignoring...')
0177                             continue
0178                     if self.browsing_context:
0179                         break
0180                 # TODO raise if no context?
0181                 if self.browsing_context:
0182                     break
0183 
0184         context.connect("launched", on_launched)
0185 
0186         if desired_app.endswith(".desktop"):
0187             appinfo = Gio.DesktopAppInfo.new(desired_app)
0188             appinfo.launch([], context)
0189         elif desired_app.isnumeric():
0190             on_launched(None, None, {'pid': int(desired_app)})
0191         else:
0192             appinfo = Gio.AppInfo.create_from_commandline(
0193                 desired_app, None, Gio.AppInfoCreateFlags.NONE)
0194             appinfo.launch([], context)
0195         print(self.browsing_context)
0196 
0197     def close(self) -> None:
0198         if self.launched:
0199             try:
0200                 os.kill(self.pid, signal.SIGKILL)
0201             except ProcessLookupError:
0202                 pass
0203 
0204 
0205 @app.route('/session', methods=['GET', 'POST', 'DELETE'])
0206 def session():
0207     if request.method == 'POST':
0208         # TODO:
0209         # https://www.w3.org/TR/webdriver1/#new-session
0210         # 1, 3, 4, 5, 8, 9, 11, 12, 13, 14
0211         session = Session()
0212         print(request)
0213         sessions[session.id] = session
0214         print(sessions)
0215 
0216         if session.browsing_context is None:
0217             return json.dumps({'value': {'error': 'session not created '}}), 500, {'content-type': 'application/json'}
0218 
0219         return json.dumps({'value': {'sessionId': session.id, 'capabilities': {"app": session.browsing_context.name}}}), 200, {'content-type': 'application/json'}
0220     elif request.method == 'GET':
0221         # TODO impl
0222         print(request)
0223     elif request.method == 'DELETE':
0224         # TODO spec review
0225         return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0226 
0227 
0228 @app.route('/session/<session_id>', methods=['DELETE'])
0229 def session_delete(session_id):
0230     if request.method == 'DELETE':
0231         # TODO spec review
0232         session = sessions[session_id]
0233         if not session:
0234             return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0235 
0236         session.close()
0237         return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0238 
0239 
0240 @app.route('/session/<session_id>/timeouts/implicit_wait', methods=['POST'])
0241 def session_implicit_wait(session_id):
0242     session = sessions[session_id]
0243     if not session:
0244         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0245 
0246     blob = json.loads(request.data)
0247     ms = blob['ms']
0248 
0249     session.timeouts['implicit'] = ms
0250     return json.dumps({'value': None})
0251 
0252 
0253 @app.route('/session/<session_id>/source', methods=['GET'])
0254 def session_source(session_id):
0255     session = sessions[session_id]
0256     if not session:
0257         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0258 
0259     doc = _createNode2(session.browsing_context, None)
0260     return json.dumps({ 'value': etree.tostring(doc, pretty_print=False).decode("utf-8") }), 200, {'content-type': 'application/xml'}
0261 
0262 
0263 # NB: custom method to get the source without json wrapper
0264 @app.route('/session/<session_id>/sourceRaw', methods=['GET'])
0265 def session_source_raw(session_id):
0266     session = sessions[session_id]
0267     if not session:
0268         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0269 
0270     doc = _createNode2(session.browsing_context, None)
0271     return etree.tostring(doc, pretty_print=True).decode("utf-8"), 200, {'content-type': 'application/xml'}
0272 
0273 
0274 def locator(session, strategy, selector, start):
0275     # pyatspi.findDescendant(start, lambda x: print(x))
0276 
0277     end_time = datetime.now() + \
0278         timedelta(milliseconds=session.timeouts['implicit'])
0279     results = []
0280 
0281     while datetime.now() < end_time:
0282         if strategy == 'xpath':
0283             print("-- xml")
0284             doc = _createNode2(start, None)
0285             for c in doc.xpath(selector):
0286                 path = [int(x) for x in c.get('path').split()]
0287                 # path is relative to the app root, not our start item!
0288                 item = session.browsing_context
0289                 for i in path:
0290                     item = item[i]
0291                 if c.get('name') != item.name or c.get('description') != item.description:
0292                     return []
0293                 results.append(item)
0294             print("-- xml")
0295         else:
0296             # TODO can I switch this in python +++ raise on unmapped strategy
0297             pred = None
0298             if strategy == 'accessibility id':
0299                 def pred(x): return x.accessibleId.endswith(selector) and (x.getState().contains(pyatspi.STATE_VISIBLE) and x.getState().contains(pyatspi.STATE_SENSITIVE))
0300             # pyatspi strings "[ roleName | name ]"
0301             elif strategy == 'class name':
0302                 def pred(x): return str(x) == selector and (x.getState().contains(
0303                     pyatspi.STATE_VISIBLE) or x.getState().contains(pyatspi.STATE_SENSITIVE))
0304             elif strategy == 'name':
0305                 def pred(x): return x.name == selector and (x.getState().contains(
0306                     pyatspi.STATE_VISIBLE) or x.getState().contains(pyatspi.STATE_SENSITIVE))
0307             elif strategy == 'description':
0308                 def pred(x): return x.description == selector and (x.getState().contains(
0309                     pyatspi.STATE_VISIBLE) or x.getState().contains(pyatspi.STATE_SENSITIVE))
0310             # there are also id and accessibleId but they seem not ever set. Not sure what to make of that :shrug:
0311             accessible = pyatspi.findDescendant(start, pred)
0312             print(accessible)
0313             if accessible:
0314                 results.append(accessible)
0315         if len(results) > 0:
0316             break
0317 
0318     return results
0319 
0320 
0321 @app.route('/session/<session_id>/element', methods=['GET', 'POST'])
0322 def session_element(session_id=None):
0323     # https://www.w3.org/TR/webdriver1/#dfn-find-element
0324 
0325     # TODO scope elements to session somehow when the session gets closed we can throw away the references
0326     print(request.url)
0327     print(session_id)
0328     print(request.args)
0329     print(request.data)
0330     session = sessions[session_id]
0331     blob = json.loads(request.data)
0332 
0333     strategy = blob['using']
0334     selector = blob['value']
0335     if not strategy or not selector:
0336         return json.dumps({'value': {'error': 'invalid argument'}}), 404, {'content-type': 'application/json'}
0337 
0338     start = session.browsing_context
0339     if not start: # browsing context (no longer) valid
0340         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0341 
0342     results = locator(session, strategy, selector, start)
0343 
0344     if not results:
0345         return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}
0346 
0347     result = results[0]
0348     unique_id = result.path.replace('/', '-')
0349     session.elements[unique_id] = result
0350     return json.dumps({'value': {'element-6066-11e4-a52e-4f735466cecf': unique_id}}), 200, {'content-type': 'application/json'}
0351 
0352 
0353 @app.route('/session/<session_id>/elements', methods=['GET', 'POST'])
0354 def session_element2(session_id=None):
0355     # https://www.w3.org/TR/webdriver1/#dfn-find-elements
0356 
0357     # TODO scope elements to session somehow when the session gets closed we can throw away the references
0358     print(request.url)
0359     print(session_id)
0360     print(request.args)
0361     print(request.data)
0362     session = sessions[session_id]
0363     blob = json.loads(request.data)
0364 
0365     strategy = blob['using']
0366     selector = blob['value']
0367     if not strategy or not selector:
0368         return json.dumps({'value': {'error': 'invalid argument'}}), 404, {'content-type': 'application/json'}
0369 
0370     start = session.browsing_context
0371     if not start:  # browsing context (no longer) valid
0372         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0373 
0374     results = locator(session, strategy, selector, start)
0375 
0376     if not results:
0377         return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}
0378 
0379     serializations = []
0380     for result in results:
0381         unique_id = result.path.replace('/', '-')
0382         print(unique_id)
0383         session.elements[unique_id] = result
0384         serializations.append({'element-6066-11e4-a52e-4f735466cecf': unique_id})
0385 
0386     return json.dumps({'value': serializations}), 200, {'content-type': 'application/json'}
0387 
0388 
0389 @app.route('/session/<session_id>/element/<element_id>/click', methods=['GET', 'POST'])
0390 def session_element_click(session_id, element_id):
0391     session = sessions[session_id]
0392     if not session:
0393         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0394 
0395     element = session.elements[element_id]
0396     if not element:
0397         return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}
0398 
0399     action = element.queryAction()
0400     availableActions = {}
0401     for i in range(0, action.nActions):
0402         availableActions[action.getName(i)] = i
0403 
0404     keys = availableActions.keys()
0405     if 'SetFocus' in keys: # this is used in addition to actual actions so focus is where it would be expected after a click
0406         print("actioning focus")
0407         action.doAction(availableActions['SetFocus'])
0408         time.sleep(EVENTLOOP_TIME)
0409 
0410     if 'Press' in keys:
0411         print("actioning press")
0412         action.doAction(availableActions['Press'])
0413     elif 'Toggle' in keys:
0414         print("actioning toggle")
0415         action.doAction(availableActions['Toggle'])
0416     time.sleep(EVENTLOOP_TIME)
0417 
0418     return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0419 
0420 
0421 @app.route('/session/<session_id>/element/<element_id>/text', methods=['GET'])
0422 def session_element_text(session_id, element_id):
0423     session = sessions[session_id]
0424     element = session.elements[element_id]
0425     try:
0426         textElement = element.queryText()
0427         return json.dumps({'value': textElement.getText(0, -1)}), 200, {'content-type': 'application/json'}
0428     except NotImplementedError:
0429         return json.dumps({'value': element.name}), 200, {'content-type': 'application/json'}
0430 
0431 
0432 @app.route('/session/<session_id>/element/<element_id>/enabled', methods=['GET'])
0433 def session_element_enabled(session_id, element_id):
0434     session = sessions[session_id]
0435     element = session.elements[element_id]
0436     return json.dumps({'value': element.getState().contains(pyatspi.STATE_ENABLED)}), 200, {'content-type': 'application/json'}
0437 
0438 
0439 @app.route('/session/<session_id>/element/<element_id>/displayed', methods=['GET'])
0440 def session_element_displayed(session_id, element_id):
0441     session = sessions[session_id]
0442     element = session.elements[element_id]
0443     return json.dumps({'value': element.getState().contains(pyatspi.STATE_VISIBLE) and element.getState().contains(pyatspi.STATE_SHOWING)}), 200, {'content-type': 'application/json'}
0444 
0445 
0446 @app.route('/session/<session_id>/element/<element_id>/selected', methods=['GET'])
0447 def session_element_selected(session_id, element_id):
0448     session = sessions[session_id]
0449     element = session.elements[element_id]
0450     return json.dumps({'value': (element.getState().contains(pyatspi.STATE_SELECTED) or element.getState().contains(pyatspi.STATE_FOCUSED))}), 200, {'content-type': 'application/json'}
0451 
0452 
0453 @app.route('/session/<session_id>/element/<element_id>/attribute/<name>', methods=['GET'])
0454 def session_element_attribute(session_id, element_id, name):
0455     session = sessions[session_id]
0456     if not session:
0457         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0458 
0459     element = session.elements[element_id]
0460     if not element:
0461         return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}
0462 
0463     print(request.data)
0464 
0465     if name == "accessibility-id":
0466         return json.dumps({'value': element.accessibleId}), 200, {'content-type': 'application/json'}
0467 
0468     if name == "value":
0469         elementValue = element.queryValue()
0470         return json.dumps({'value': elementValue.currentValue}), 200, {'content-type': 'application/json'}
0471 
0472     print(pyatspi.STATE_VALUE_TO_NAME)
0473     result = None
0474     for value, string in pyatspi.STATE_VALUE_TO_NAME.items():
0475         if string == name:
0476             result = element.getState().contains(value)
0477             break
0478 
0479     return json.dumps({'value': result}), 200, {'content-type': 'application/json'}
0480 
0481 
0482 @app.route('/session/<session_id>/element/<element_id>/element', methods=['POST'])
0483 def session_element_element(session_id, element_id):
0484     session = sessions[session_id]
0485     if not session:
0486         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0487 
0488     blob = json.loads(request.data)
0489     print(blob)
0490     strategy = blob['using']
0491     selector = blob['value']
0492     if not strategy or not selector:
0493         return json.dumps({'value': {'error': 'invalid argument'}}), 404, {'content-type': 'application/json'}
0494 
0495     start = session.elements[element_id]
0496     if not start:  # browsing context (no longer) valid
0497         return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}
0498 
0499     return locator(session, strategy, selector, start)
0500 
0501 
0502 @app.route('/session/<session_id>/element/<element_id>/value', methods=['POST'])
0503 def session_element_value(session_id, element_id):
0504     session = sessions[session_id]
0505     if not session:
0506         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0507 
0508     element = session.elements[element_id]
0509     if not element:
0510         return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}
0511 
0512     blob = json.loads(request.data)
0513     text = blob['text']
0514 
0515     print(blob)
0516     print(element)
0517 
0518     try:
0519         offset = element.queryText().caretOffset
0520         textElement = element.queryEditableText()
0521         textElement.insertText(offset, text, len(text))
0522         return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0523     except NotImplementedError:
0524         print("element is not text type, falling back to synthesizing keyboard events")
0525         action = element.queryAction()
0526         processed = False
0527         for i in range(0, action.nActions):
0528             if action.getName(i) == 'SetFocus':
0529                 processed = True
0530                 action.doAction(i)
0531                 time.sleep(EVENTLOOP_TIME) # give the focus time to apply
0532                 for ch in text:
0533                     generate_keyboard_event(ch)
0534                 break
0535         if not processed:
0536             raise RuntimeError("element's actions list didn't contain SetFocus. The element may be malformed")
0537         return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0538 
0539 
0540 @app.route('/session/<session_id>/element/<element_id>/clear', methods=['POST'])
0541 def session_element_clear(session_id, element_id):
0542     session = sessions[session_id]
0543     if not session:
0544         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0545 
0546     element = session.elements[element_id]
0547     if not element:
0548         return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}
0549 
0550     characterCount = element.queryText().characterCount
0551     try:
0552         textElement = element.queryEditableText()
0553         textElement.deleteText(0, characterCount)
0554         return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0555     except NotImplementedError:
0556         print("element is not text type, falling back to synthesizing keyboard events")
0557         action = element.queryAction()
0558         processed = False
0559         for i in range(0, action.nActions):
0560             if action.getName(i) == 'SetFocus':
0561                 processed = True
0562                 action.doAction(i)
0563                 time.sleep(EVENTLOOP_TIME) # give the focus time to apply
0564                 generate_keyboard_event('\ue010') # end
0565                 for _ in range(characterCount):
0566                     generate_keyboard_event('\ue003') # backspace
0567                 break
0568         if not processed:
0569             raise RuntimeError("element's actions list didn't contain SetFocus. The element may be malformed")
0570         return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0571 
0572 
0573 @app.route('/session/<session_id>/appium/device/app_state', methods=['POST'])
0574 def session_appium_device_app_state(session_id):
0575     session = sessions[session_id]
0576     if not session:
0577         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0578 
0579     blob = json.loads(request.data)
0580     appId = blob['appId']
0581 
0582     proc = subprocess.Popen(
0583         'selenium-webdriver-at-spi-appidlister', stdout=subprocess.PIPE)
0584     out, err = proc.communicate()
0585 
0586     apps = json.loads(out)
0587     print(apps)
0588     if appId in apps.values():
0589         return json.dumps({'value': 4}), 200, {'content-type': 'application/json'}
0590     # TODO: implement rest of codes
0591     return json.dumps({'value': 1}), 200, {'content-type': 'application/json'}
0592 
0593 
0594 @app.route('/session/<session_id>/appium/device/terminate_app', methods=['POST'])
0595 def session_appium_device_terminate_app(session_id):
0596     session = sessions[session_id]
0597     if not session:
0598         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0599 
0600     blob = json.loads(request.data)
0601     appId = blob['appId']
0602 
0603     proc = subprocess.Popen(
0604         'selenium-webdriver-at-spi-appidlister', stdout=subprocess.PIPE)
0605     out, err = proc.communicate()
0606 
0607     apps = json.loads(out)
0608     if appId in apps.values():
0609         pid = list(apps.keys())[list(apps.values()).index(appId)]
0610         os.kill(int(pid), signal.SIGKILL)
0611     return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0612 
0613 
0614 @app.route('/session/<session_id>/appium/device/press_keycode', methods=['POST'])
0615 def session_appium_device_press_keycode(session_id):
0616     session = sessions[session_id]
0617     if not session:
0618         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0619 
0620     blob = json.loads(request.data)
0621     keycode = blob['keycode']
0622     # Not doing anything with these for now
0623     # metastate = blob['metastate']
0624     # flags = blob['flags']
0625     for ch in keycode:
0626         generate_keyboard_event(ch)
0627     return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0628 
0629 
0630 @app.route('/session/<session_id>/appium/device/get_clipboard', methods=['POST'])
0631 def session_appium_device_get_clipboard(session_id):
0632     session = sessions[session_id]
0633     if not session:
0634         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0635 
0636     blob = json.loads(request.data)
0637     contentType = blob['contentType']
0638 
0639     # NOTE: need a window because on wayland we must be the active window to manipulate the clipboard (currently anyway)
0640     window = Gtk.Window()
0641     window.set_default_size(20, 20);
0642     window.show()
0643     display = window.get_display()
0644     clipboard = Gtk.Clipboard.get_for_display(display, Gdk.SELECTION_CLIPBOARD)
0645 
0646     context = GLib.MainContext.default()
0647     for _ in range(4):
0648         time.sleep(0.5)
0649         while context.pending():
0650             context.iteration(may_block=False)
0651 
0652     data = None
0653     if contentType == 'plaintext':
0654         data = clipboard.wait_for_text()
0655     else:
0656         raise 'content type not currently supported'
0657 
0658     window.close()
0659     return json.dumps({'value': base64.b64encode(data.encode('utf-8')).decode('utf-8')}), 200, {'content-type': 'application/json'}
0660 
0661 
0662 @app.route('/session/<session_id>/appium/device/set_clipboard', methods=['POST'])
0663 def session_appium_device_set_clipboard(session_id):
0664     session = sessions[session_id]
0665     if not session:
0666         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0667 
0668     blob = json.loads(request.data)
0669     contentType = blob['contentType']
0670     content = blob['content']
0671 
0672     # NOTE: need a window because on wayland we must be the active window to manipulate the clipboard (currently anyway)
0673     window = Gtk.Window()
0674     window.set_default_size(20, 20);
0675     display = window.get_display()
0676     clipboard = Gtk.Clipboard.get_for_display(display, Gdk.SELECTION_CLIPBOARD)
0677 
0678     if contentType == 'plaintext':
0679         clipboard.set_text(base64.b64decode(content).decode('utf-8'), -1)
0680     else:
0681         raise 'content type not currently supported'
0682 
0683     context = GLib.MainContext.default()
0684     for _ in range(4):
0685         time.sleep(0.5)
0686         while context.pending():
0687             context.iteration(may_block=False)
0688 
0689     window.close()
0690     return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0691 
0692 
0693 @app.route('/session/<session_id>/appium/element/<element_id>/value', methods=['POST'])
0694 def session_appium_element_value(session_id, element_id):
0695     session = sessions[session_id]
0696     if not session:
0697         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0698 
0699     element = session.elements[element_id]
0700     if not element:
0701         return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}
0702 
0703     blob = json.loads(request.data)
0704     print(blob)
0705     print(element)
0706     value = blob['value'][0]
0707 
0708     try:
0709         valueElement = element.queryValue()
0710         valueElement.currentValue = float(value)
0711         return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0712     except NotImplementedError:
0713         return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0714 
0715 
0716 @app.route('/session/<session_id>/screenshot', methods=['GET'])
0717 def session_appium_screenshot(session_id):
0718     session = sessions[session_id]
0719     if not session:
0720         return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}
0721 
0722     # NB: these values incorrectly do not include the device pixel ratio, so they are off when used on a scaling display
0723     # position_x, position_y = session.browsing_context.getChildAtIndex(0).queryComponent().getPosition(pyatspi.XY_SCREEN)
0724     # size_width, size_height = session.browsing_context.getChildAtIndex(0).queryComponent().getSize()
0725 
0726     proc = subprocess.Popen(['selenium-webdriver-at-spi-screenshotter',
0727                              str(0), str(0), str(0), str(0)],
0728                             stdout=subprocess.PIPE)
0729     out, err = proc.communicate()
0730 
0731     if not out:
0732         return json.dumps({'value': {'error': err}}), 404, {'content-type': 'application/json'}
0733 
0734     return json.dumps({'value': out.decode('utf-8')}), 200, {'content-type': 'application/json'}
0735 
0736 
0737 def generate_keyboard_event(ch):
0738     if 'KWIN_PID' in os.environ: # using a nested kwin. need to synthesize keys into wayland (not supported in atspi right now)
0739         with tempfile.NamedTemporaryFile() as fp:
0740             keymap = keyval_to_keycode(char_to_keyval(ch))
0741             fp.write(json.dumps([
0742                 {'type': 'keyboard', 'level': keymap.level, 'keycode': keymap.keycode}
0743             ]).encode())
0744             fp.flush()
0745             subprocess.run(["selenium-webdriver-at-spi-inputsynth", fp.name])
0746         time.sleep(EVENTLOOP_TIME)
0747     else:
0748         pyatspi.Registry.generateKeyboardEvent(char_to_keyval(ch), None, pyatspi.KEY_SYM)
0749         time.sleep(EVENTLOOP_TIME)
0750 
0751 
0752 def keyval_to_keycode(keyval):
0753     keymap = Gdk.Keymap.get_default()
0754     ret, keys = keymap.get_entries_for_keyval(keyval)
0755     if not ret:
0756         raise RuntimeError("Failed to map key!")
0757     return keys[0]
0758 
0759 
0760 def char_to_keyval(ch):
0761     print("----------::::::")
0762     keyval = Gdk.unicode_to_keyval(ord(ch))
0763     # I Don't know why this doesn't work, also doesn't work with \033 as input. :((
0764     # https://gitlab.gnome.org/GNOME/gtk/-/blob/gtk-3-24/gdk/gdkkeyuni.c
0765     # Other useful resources:
0766     # https://www.cl.cam.ac.uk/~mgk25/ucs/keysymdef.h
0767     if ch == "\uE00C":
0768         keyval = 0xff1b # escape
0769     elif ch == "\ue03d":
0770         keyval = 0xffeb # left meta
0771     elif ch == "\ue006":
0772         keyval = 0xff0d # return
0773     elif ch == "\ue007":
0774         keyval = 0xff8d # enter
0775     elif ch == "\ue003":
0776         keyval = 0xff08 # backspace
0777     elif ch == "\ue010":
0778         keyval = 0xff57 # end
0779     elif ch == "\ue012":
0780         keyval = 0xff51 # left
0781     elif ch == "\ue014":
0782         keyval = 0xff53 # right
0783     elif ch == "\ue013":
0784         keyval = 0xff52 # up
0785     elif ch == "\ue015":
0786         keyval = 0xff54 # down
0787     elif ch == "\ue004":
0788         keyval = 0xff09 # tab
0789     print(ord(ch))
0790     print(hex(keyval))
0791     return keyval