File indexing completed on 2023-09-24 09:47:50

0001 # SPDX-License-Identifier: AGPL-3.0-or-later
0002 # SPDX-FileCopyrightText: 2021-2023 Harald Sitter <sitter@kde.org>
0003 
0004 import base64
0005 from datetime import datetime, timedelta
0006 import tempfile
0007 import time
0008 import traceback
0009 from flask import Flask, request, jsonify
0010 import uuid
0011 import json
0012 import sys
0013 import os
0014 import signal
0015 import subprocess
0016 from werkzeug.exceptions import HTTPException
0017 
0018 import pyatspi
0019 from lxml import etree
0020 
0021 import gi
0022 from gi.repository import GLib
0023 from gi.repository import Gio
0024 gi.require_version('Gdk', '3.0')
0025 from gi.repository import Gdk
0026 gi.require_version('Gtk', '3.0')
0027 from gi.repository import Gtk
0028 
0029 from app_roles import ROLE_NAMES
0030 
0031 # Exposes AT-SPI as a webdriver. This is written in python because C sucks and pyatspi is a first class binding so
0032 # we lose nothing but gain the reduced sucking of python.
0033 
0034 # https://github.com/SeleniumHQ/selenium/wiki/JsonWireProtocol#WebElement_JSON_Object.md
0035 # https://www.w3.org/TR/webdriver
0036 # https://github.com/microsoft/WinAppDriver/blob/master/Docs/SupportedAPIs.md
0037 # https://www.freedesktop.org/wiki/Accessibility/PyAtSpi2Example/
0038 
0039 EVENTLOOP_TIME = 0.1
0040 sys.stdout = sys.stderr
0041 sessions = {} # global dict of open sessions
0042 
0043 # Give the GUI enough time to react. tests run on the CI won't always be responsive in the tight schedule established by at-spi2 (800ms) and run risk
0044 # of timing out on (e.g.) click events. The second value is the timeout for app startup, we keep that the same as upstream.
0045 pyatspi.setTimeout(4000, 15000)
0046 
0047 # Using flask because I know nothing about writing REST in python and it seemed the most straight-forward framework.
0048 app = Flask(__name__)
0049 
@app.errorhandler(Exception)
def unknown_error(e):
    """Turn any uncaught exception into a WebDriver 'unknown error' response.

    HTTP exceptions raised by werkzeug are returned unchanged so that their
    own status code and body are preserved; everything else is reported as a
    JSON error with HTTP status 500.
    """
    if not isinstance(e, HTTPException):
        return errorFromException(error='unknown error', exception=e), 500

    return e
0056 
@app.route('/status', methods=['GET'])
def status():
    """Report the readiness of the driver.

    Returns a JSON body with a ``ready`` flag and a human readable
    ``message``. ``ready`` is emitted as a JSON boolean (not the string
    'true') so that clients that type-check the field get what they expect.
    """
    body = {
        'value': {
            'ready': True,
            'message': 'There is only one state. Hooray!'
        }
    }
    return json.dumps(body), 200, {'content-type': 'application/json'}
0066 
0067 
0068 def _createNode2(accessible, parentElement, indexInParents=[]):
0069     if not accessible:
0070         return
0071     # A bit of aggressive filtering to not introspect chromium and firefox and the likes when using the desktop root.
0072     if accessible.toolkitName != "Qt" and accessible.toolkitName != "at-spi-registry":
0073         return
0074 
0075     roleName = accessible.getRoleName()
0076     e = None
0077     if roleName:
0078         e = etree.Element(roleName.replace(" ", "_"))
0079     else:
0080         e = etree.Element("accessible")
0081 
0082     e.set("name", accessible.name)
0083     e.set("role", str(int(accessible.getRole())))
0084     e.set("description", accessible.description)
0085     if accessible.accessibleId != None:
0086         e.set("accessibility-id", accessible.accessibleId)
0087     # NB: pyatspi.getPath is bugged when the QObject has no QObject parent. Instead manually keep track of indexes.
0088     # while generating the xml.
0089     # path = pyatspi.getPath(accessible)
0090     # path is a list of ints for the indexes within the parents
0091     path_strs = [str(x) for x in indexInParents]
0092     e.set("path", ' '.join(path_strs))
0093 
0094     states = []
0095     for state in accessible.getState().getStates():
0096         states.append(pyatspi.stateToString(state))
0097     e.set("states", ', '.join(states))
0098 
0099     for i in range(0, accessible.childCount):
0100         newIndex = indexInParents.copy()
0101         newIndex.append(i)
0102         _createNode2(accessible.getChildAtIndex(i), e, newIndex)
0103 
0104     if parentElement != None:
0105         parentElement.append(e)
0106     else:
0107         return e
0108 
0109 
def errorFromMessage(error, message):
    """Build a JSON error response body from an error code and a message.

    :param error: the error code string to report.
    :param message: human readable explanation of the error.
    :return: the jsonify()'d body ``{'value': {'error': ..., 'message': ...}}``.
    """
    payload = {'error': error, 'message': message}
    return jsonify({'value': payload})
0112 
0113 
def errorFromException(error, exception):
    """Build a JSON error response body from an error code and an exception.

    The message is the string form of ``exception``; the stacktrace is the
    formatted traceback of the exception currently being handled.

    :param error: the error code string to report.
    :param exception: the exception that caused the error.
    :return: the jsonify()'d body with 'error', 'message' and 'stacktrace'.
    """
    details = {
        'error': error,
        'message': str(exception),
        'stacktrace': traceback.format_exc(),
    }
    return jsonify({'value': details})
0116 
0117 
@app.route('/')
def index():
    """Answer requests for the server root with a fixed greeting."""
    greeting = 'Servas'
    return greeting
0121 
0122 # Encapsulates a Session object. Sessions are opened by the client and contain elements. A session is generally speaking
0123 # an app.
0124 # TODO: should we expose the root scope somehow? requires a special variant of session and moving logic from the
0125 #   REST app functions to the Session object.
0126 
0127 
class Session:
    """A WebDriver session: one accessible application plus its cached elements.

    Constructing a Session reads the new-session request body, resolves the
    requested application and either attaches to the desktop root ('Root'),
    attaches to an already running process (numeric pid) or launches the
    application and waits for it to appear on the accessibility bus.
    """

    def __init__(self) -> None:
        self.id = str(uuid.uuid1())
        self.elements = {}  # a cache to hold elements between finding and interacting with
        self.browsing_context = None
        self.pid = -1
        # implicit deviates from spec, 0 is unreasonable
        self.timeouts = {'script': 30000, 'pageLoad': 300000, 'implicit': 5000}
        self.launched = False

        blob = json.loads(request.data)
        print(request.data)
        # TODO the blob from ruby is much more complicated god only knows why
        # Legacy clients send 'desiredCapabilities', newer ones 'capabilities'/'alwaysMatch'.
        if 'desiredCapabilities' in blob:
            capabilities = blob['desiredCapabilities']
        else:
            capabilities = blob['capabilities']['alwaysMatch']

        desired_app = capabilities['app'] if 'app' in capabilities else capabilities.get('appium:app')
        if not desired_app:
            raise ValueError("capabilities contain neither 'app' nor 'appium:app'")

        desired_timeouts = capabilities.get('timeouts') or {}
        for key in ('script', 'pageLoad', 'implicit'):
            if key in desired_timeouts:
                self.timeouts[key] = desired_timeouts[key]

        if desired_app == 'Root':
            # NB: at the time of writing there can only be one desktop ever
            self.browsing_context = pyatspi.Registry.getDesktop(0)
            return

        self.launched = True
        end_time = datetime.now() + \
            timedelta(milliseconds=(self.timeouts['implicit'] * 2))

        context = Gio.AppLaunchContext()
        context.setenv('QT_ACCESSIBILITY', '1')
        context.setenv('QT_LINUX_ACCESSIBILITY_ALWAYS_ON', '1')
        context.setenv('KIO_DISABLE_CACHE_CLEANER', '1')  # don't dangle

        def on_launched(context, info, platform_data):
            self.pid = platform_data['pid']
            print("launched " + str(self.pid))

            # Poll the bus until the process shows up or the time limit expires.
            while datetime.now() < end_time:
                self.browsing_context = self._find_on_bus(self.pid)
                if self.browsing_context:
                    break
                time.sleep(EVENTLOOP_TIME)  # don't hammer the bus while waiting
            if not self.browsing_context:
                raise RuntimeError('Failed to find application on a11y bus within time limit!')

        context.connect("launched", on_launched)

        if desired_app.endswith(".desktop"):
            appinfo = Gio.DesktopAppInfo.new(desired_app)
            appinfo.launch([], context)
        elif desired_app.isnumeric():
            # A numeric app is the pid of an already running process; attach to it.
            on_launched(None, None, {'pid': int(desired_app)})
        else:
            appinfo = Gio.AppInfo.create_from_commandline(
                desired_app, None, Gio.AppInfoCreateFlags.NONE)
            appinfo.launch([], context)
        print(self.browsing_context)

    @staticmethod
    def _find_on_bus(pid):
        """Return the application registered for process ``pid`` on any desktop, or None."""
        for desktop_index in range(pyatspi.Registry.getDesktopCount()):
            desktop = pyatspi.Registry.getDesktop(desktop_index)
            for candidate in desktop:
                try:
                    if candidate.get_process_id() == pid:
                        return candidate
                except GLib.GError:
                    print('stumbled over a broken process. ignoring...')
        return None

    def close(self) -> None:
        """Kill the process of this session, if the session has one.

        Nothing happens for the desktop root session or when no pid was ever
        recorded. The pid guard matters: signalling pid -1 (the initial
        value) would address every process the user may signal.
        """
        if not self.launched or self.pid <= 0:
            return
        try:
            os.kill(self.pid, signal.SIGKILL)
        except ProcessLookupError:
            pass  # already gone
0222 
0223 
@app.route('/session', methods=['GET', 'POST', 'DELETE'])
def session():
    """Create a new session (POST); DELETE is acknowledged without doing anything.

    A session is only registered in the global ``sessions`` dict once its
    application was found on the accessibility bus. Any other method gets an
    explicit 'unsupported operation' error instead of an invalid (None)
    response.
    """
    json_header = {'content-type': 'application/json'}

    if request.method == 'POST':
        # TODO:
        # https://www.w3.org/TR/webdriver1/#new-session
        # 1, 3, 4, 5, 8, 9, 11, 12, 13, 14
        print(request)
        try:
            new_session = Session()
        except Exception as e:
            return errorFromException(error='session not created', exception=e), 500

        if new_session.browsing_context is None:
            # The client never learns the id of this session, so it could never clean it up itself:
            # don't register it and don't leave the launched process dangling.
            if new_session.launched and new_session.pid > 0:
                new_session.close()
            return errorFromMessage(error='session not created',
                                    message='Application was not found on the a11y bus for unknown reasons. It probably failed to register on the bus.'), 500

        sessions[new_session.id] = new_session
        print(sessions)
        return json.dumps({'value': {'sessionId': new_session.id, 'capabilities': {"app": new_session.browsing_context.name}}}), 200, json_header

    if request.method == 'DELETE':
        # TODO spec review
        return json.dumps({'value': None}), 200, json_header

    # TODO impl (GET)
    print(request)
    return json.dumps({'value': {'error': 'unsupported operation',
                                 'message': 'Listing sessions is not implemented.'}}), 500, json_header
0249 
0250 
@app.route('/session/<session_id>', methods=['DELETE'])
def session_delete(session_id):
    """Close the session and forget about it.

    The session is removed from the global ``sessions`` dict so closed
    sessions do not pile up; unknown ids yield a 404 error response.
    """
    # TODO spec review
    session = sessions.pop(session_id, None)
    if not session:
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    session.close()
    return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0261 
0262 
@app.route('/session/<session_id>/timeouts/implicit_wait', methods=['POST'])
def session_implicit_wait(session_id):
    """Set the implicit wait timeout of the session from the 'ms' field of the request body."""
    session = sessions.get(session_id)
    if not session:
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    blob = json.loads(request.data)
    session.timeouts['implicit'] = blob['ms']
    return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0274 
0275 
@app.route('/session/<session_id>/source', methods=['GET'])
def session_source(session_id):
    """Return the XML serialization of the session's accessible tree, wrapped in a JSON value."""
    session = sessions.get(session_id)
    if not session:
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    doc = _createNode2(session.browsing_context, None)
    xml = etree.tostring(doc, pretty_print=False).decode("utf-8")
    # The XML travels as a string inside a JSON document, so the body is JSON, not XML.
    return json.dumps({'value': xml}), 200, {'content-type': 'application/json'}
0284 
0285 
0286 # NB: custom method to get the source without json wrapper
@app.route('/session/<session_id>/sourceRaw', methods=['GET'])
def session_source_raw(session_id):
    """Return the pretty-printed XML serialization of the session's accessible tree as-is."""
    session = sessions.get(session_id)
    if not session:
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    doc = _createNode2(session.browsing_context, None)
    return etree.tostring(doc, pretty_print=True).decode("utf-8"), 200, {'content-type': 'application/xml'}
0295 
0296 
def locator(session, strategy, selector, start):
    """Find accessibles below ``start`` that match ``selector``.

    The search is repeated until something is found or the session's
    implicit timeout has passed; it always runs at least once.

    :param session: the session, used for its 'implicit' timeout (milliseconds).
    :param strategy: one of 'xpath', 'accessibility id', 'class name', 'name', 'description'.
    :param selector: the value to match; an XPath expression for 'xpath'.
    :param start: the accessible to search from.
    :return: list of matching accessibles; empty when nothing matched. 'xpath'
        may return several matches, the other strategies at most one.
    :raises ValueError: when ``strategy`` is not supported.
    """
    def visible_or_sensitive(x):
        state = x.getState()
        return state.contains(pyatspi.STATE_VISIBLE) or state.contains(pyatspi.STATE_SENSITIVE)

    def visible_and_sensitive(x):
        state = x.getState()
        return state.contains(pyatspi.STATE_VISIBLE) and state.contains(pyatspi.STATE_SENSITIVE)

    # there are also id and accessibleId but they seem not ever set. Not sure what to make of that :shrug:
    predicates = {
        # accessibleId may be unset, treat that like an empty id
        'accessibility id': lambda x: (x.accessibleId or '').endswith(selector) and visible_and_sensitive(x),
        # pyatspi strings "[ roleName | name ]"
        'class name': lambda x: str(x) == selector and visible_or_sensitive(x),
        'name': lambda x: x.name == selector and visible_or_sensitive(x),
        'description': lambda x: x.description == selector and visible_or_sensitive(x),
    }
    if strategy != 'xpath' and strategy not in predicates:
        raise ValueError('unsupported locator strategy: ' + str(strategy))

    end_time = datetime.now() + \
        timedelta(milliseconds=session.timeouts['implicit'])
    results = []

    while True:
        if strategy == 'xpath':
            print("-- xml")
            doc = _createNode2(start, None)
            # doc is None when start itself got filtered out of the serialization
            matches = doc.xpath(selector) if doc is not None else []
            for c in matches:
                path = [int(x) for x in c.get('path').split()]
                # The document was generated from start, so its paths are relative to start.
                item = start
                for i in path:
                    item = item[i]
                # The live tree changed since the snapshot was taken; the match can't be trusted.
                if c.get('name') != item.name or c.get('description') != item.description:
                    return []
                results.append(item)
            print("-- xml")
        else:
            accessible = pyatspi.findDescendant(start, predicates[strategy])
            print(accessible)
            if accessible:
                results.append(accessible)

        if results or datetime.now() >= end_time:
            break
        time.sleep(EVENTLOOP_TIME)  # let the application catch up before looking again

    return results
0342 
0343 
@app.route('/session/<session_id>/element', methods=['GET', 'POST'])
def session_element(session_id=None):
    """Find the first element matching the request's 'using'/'value' pair.

    The found element is cached in the session under an id derived from its
    path, and that id is returned to the client.
    """
    # https://www.w3.org/TR/webdriver1/#dfn-find-element

    # TODO scope elements to session somehow when the session gets closed we can throw away the references
    print(request.url)
    print(session_id)
    print(request.args)
    print(request.data)
    session = sessions.get(session_id)
    if not session:
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    blob = json.loads(request.data)
    # .get() so that a missing key is reported as invalid argument rather than crashing
    strategy = blob.get('using')
    selector = blob.get('value')
    if not strategy or not selector:
        return json.dumps({'value': {'error': 'invalid argument'}}), 404, {'content-type': 'application/json'}

    start = session.browsing_context
    if not start:  # browsing context (no longer) valid
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    results = locator(session, strategy, selector, start)
    if not results:
        return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}

    result = results[0]
    unique_id = result.path.replace('/', '-')
    session.elements[unique_id] = result
    return json.dumps({'value': {'element-6066-11e4-a52e-4f735466cecf': unique_id}}), 200, {'content-type': 'application/json'}
0374 
0375 
@app.route('/session/<session_id>/elements', methods=['GET', 'POST'])
def session_element2(session_id=None):
    """Find all elements matching the request's 'using'/'value' pair.

    Every found element is cached in the session under an id derived from
    its path; the list of ids is returned to the client.
    """
    # https://www.w3.org/TR/webdriver1/#dfn-find-elements

    # TODO scope elements to session somehow when the session gets closed we can throw away the references
    print(request.url)
    print(session_id)
    print(request.args)
    print(request.data)
    session = sessions.get(session_id)
    if not session:
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    blob = json.loads(request.data)
    # .get() so that a missing key is reported as invalid argument rather than crashing
    strategy = blob.get('using')
    selector = blob.get('value')
    if not strategy or not selector:
        return json.dumps({'value': {'error': 'invalid argument'}}), 404, {'content-type': 'application/json'}

    start = session.browsing_context
    if not start:  # browsing context (no longer) valid
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    results = locator(session, strategy, selector, start)
    if not results:
        return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}

    serializations = []
    for result in results:
        unique_id = result.path.replace('/', '-')
        print(unique_id)
        session.elements[unique_id] = result
        serializations.append({'element-6066-11e4-a52e-4f735466cecf': unique_id})

    return json.dumps({'value': serializations}), 200, {'content-type': 'application/json'}
0410 
0411 
@app.route('/session/<session_id>/element/<element_id>/click', methods=['GET', 'POST'])
def session_element_click(session_id, element_id):
    """Click an element by running the first of its Press, Toggle or ShowMenu actions.

    When the element offers SetFocus, that action is run first. Errors
    raised by the action itself are logged and otherwise ignored.
    """
    session = sessions.get(session_id)
    if not session:
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    element = session.elements.get(element_id)
    if element is None:
        return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}

    action = element.queryAction()
    availableActions = {action.getName(i): i for i in range(action.nActions)}

    if 'SetFocus' in availableActions:  # this is used in addition to actual actions so focus is where it would be expected after a click
        print("actioning focus")
        action.doAction(availableActions['SetFocus'])
        time.sleep(EVENTLOOP_TIME)

    try:
        if 'Press' in availableActions:
            print("actioning press")
            action.doAction(availableActions['Press'])
        elif 'Toggle' in availableActions:
            print("actioning toggle")
            action.doAction(availableActions['Toggle'])
        elif 'ShowMenu' in availableActions:
            print("actioning showmenu")
            action.doAction(availableActions['ShowMenu'])
    except GLib.GError as e:
        print(e)
        print("Ignoring! There is a chance your application is misbehaving. Could also just be a blocked eventloop though.")
    time.sleep(EVENTLOOP_TIME)

    return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0449 
0450 
@app.route('/session/<session_id>/element/<element_id>/text', methods=['GET'])
def session_element_text(session_id, element_id):
    """Return the text of an element, or its name when it has no text interface."""
    session = sessions.get(session_id)
    if not session:
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    element = session.elements.get(element_id)
    if element is None:
        return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}

    try:
        textElement = element.queryText()
        return json.dumps({'value': textElement.getText(0, -1)}), 200, {'content-type': 'application/json'}
    except NotImplementedError:
        return json.dumps({'value': element.name}), 200, {'content-type': 'application/json'}
0460 
0461 
@app.route('/session/<session_id>/element/<element_id>/enabled', methods=['GET'])
def session_element_enabled(session_id, element_id):
    """Return whether the element's state set contains STATE_ENABLED."""
    session = sessions.get(session_id)
    if not session:
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    element = session.elements.get(element_id)
    if element is None:
        return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}

    return json.dumps({'value': element.getState().contains(pyatspi.STATE_ENABLED)}), 200, {'content-type': 'application/json'}
0467 
0468 
@app.route('/session/<session_id>/element/<element_id>/displayed', methods=['GET'])
def session_element_displayed(session_id, element_id):
    """Return whether the element's state set contains both STATE_VISIBLE and STATE_SHOWING."""
    session = sessions.get(session_id)
    if not session:
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    element = session.elements.get(element_id)
    if element is None:
        return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}

    displayed = element.getState().contains(pyatspi.STATE_VISIBLE) and element.getState().contains(pyatspi.STATE_SHOWING)
    return json.dumps({'value': displayed}), 200, {'content-type': 'application/json'}
0474 
0475 
@app.route('/session/<session_id>/element/<element_id>/selected', methods=['GET'])
def session_element_selected(session_id, element_id):
    """Return whether the element's state set contains STATE_SELECTED or STATE_FOCUSED."""
    session = sessions.get(session_id)
    if not session:
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    element = session.elements.get(element_id)
    if element is None:
        return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}

    selected = element.getState().contains(pyatspi.STATE_SELECTED) or element.getState().contains(pyatspi.STATE_FOCUSED)
    return json.dumps({'value': selected}), 200, {'content-type': 'application/json'}
0481 
0482 
@app.route('/session/<session_id>/element/<element_id>/attribute/<name>', methods=['GET'])
def session_element_attribute(session_id, element_id, name):
    """Return an attribute of an element.

    'accessibility-id' and 'value' are answered from the element itself. Any
    other name is looked up among the pyatspi state names and answered with
    whether the element is in that state; unknown names yield a null value.
    """
    session = sessions.get(session_id)
    if not session:
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    element = session.elements.get(element_id)
    if element is None:
        return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}

    print(request.data)

    if name == "accessibility-id":
        return json.dumps({'value': element.accessibleId}), 200, {'content-type': 'application/json'}

    if name == "value":
        elementValue = element.queryValue()
        return json.dumps({'value': elementValue.currentValue}), 200, {'content-type': 'application/json'}

    print(pyatspi.STATE_VALUE_TO_NAME)
    result = None
    for value, string in pyatspi.STATE_VALUE_TO_NAME.items():
        if string == name:
            result = element.getState().contains(value)
            break

    return json.dumps({'value': result}), 200, {'content-type': 'application/json'}
0510 
0511 
@app.route('/session/<session_id>/element/<element_id>/element', methods=['POST'])
def session_element_element(session_id, element_id):
    """Find the first element below a known element matching 'using'/'value'.

    The found element is cached in the session under an id derived from its
    path and that id is returned, in the same shape the session-level find
    element command uses.
    """
    session = sessions.get(session_id)
    if not session:
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    blob = json.loads(request.data)
    print(blob)
    # .get() so that a missing key is reported as invalid argument rather than crashing
    strategy = blob.get('using')
    selector = blob.get('value')
    if not strategy or not selector:
        return json.dumps({'value': {'error': 'invalid argument'}}), 404, {'content-type': 'application/json'}

    start = session.elements.get(element_id)
    if start is None:  # the start element is not known to this session
        return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}

    results = locator(session, strategy, selector, start)
    if not results:
        return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}

    # Accessibles can't be sent to the client; hand out a reference to the cached element instead.
    result = results[0]
    unique_id = result.path.replace('/', '-')
    session.elements[unique_id] = result
    return json.dumps({'value': {'element-6066-11e4-a52e-4f735466cecf': unique_id}}), 200, {'content-type': 'application/json'}
0530 
0531 
@app.route('/session/<session_id>/element/<element_id>/value', methods=['POST'])
def session_element_value(session_id, element_id):
    """Type the request's 'text' into an element.

    Text is inserted at the caret through the editable text interface. When
    the element does not implement it, the element is focused and the text
    is sent as synthesized keyboard events.

    :raises RuntimeError: in the fallback path when the element has no SetFocus action.
    """
    session = sessions.get(session_id)
    if not session:
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    element = session.elements.get(element_id)
    if element is None:
        return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}

    blob = json.loads(request.data)
    text = blob['text']

    print(blob)
    print(element)

    try:
        offset = element.queryText().caretOffset
        textElement = element.queryEditableText()
        textElement.insertText(offset, text, len(text))
        return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
    except NotImplementedError:
        print("element is not text type, falling back to synthesizing keyboard events")
        action = element.queryAction()
        processed = False
        for i in range(action.nActions):
            if action.getName(i) == 'SetFocus':
                processed = True
                action.doAction(i)
                time.sleep(EVENTLOOP_TIME)  # give the focus time to apply
                for ch in text:
                    generate_keyboard_event(ch)
                break
        if not processed:
            raise RuntimeError("element's actions list didn't contain SetFocus. The element may be malformed")
        return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0568 
0569 
@app.route('/session/<session_id>/element/<element_id>/clear', methods=['POST'])
def session_element_clear(session_id, element_id):
    """Remove all text from an element.

    The text is deleted through the editable text interface. When the
    element does not implement it, the element is focused and cleared by
    sending End followed by one Backspace per character.

    :raises RuntimeError: in the fallback path when the element has no SetFocus action.
    """
    session = sessions.get(session_id)
    if not session:
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    element = session.elements.get(element_id)
    if element is None:
        return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}

    characterCount = element.queryText().characterCount
    try:
        textElement = element.queryEditableText()
        textElement.deleteText(0, characterCount)
        return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
    except NotImplementedError:
        print("element is not text type, falling back to synthesizing keyboard events")
        action = element.queryAction()
        processed = False
        for i in range(action.nActions):
            if action.getName(i) == 'SetFocus':
                processed = True
                action.doAction(i)
                time.sleep(EVENTLOOP_TIME)  # give the focus time to apply
                generate_keyboard_event('\ue010')  # end
                for _ in range(characterCount):
                    generate_keyboard_event('\ue003')  # backspace
                break
        if not processed:
            raise RuntimeError("element's actions list didn't contain SetFocus. The element may be malformed")
        return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0601 
0602 
@app.route('/session/<session_id>/appium/device/app_state', methods=['POST'])
def session_appium_device_app_state(session_id):
    """Report the state of the app named by the request's 'appId'.

    The app ids are obtained from the output of the
    selenium-webdriver-at-spi-appidlister helper. Returns 4 when the app id
    is listed and 1 otherwise.
    """
    session = sessions.get(session_id)
    if not session:
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    blob = json.loads(request.data)
    appId = blob['appId']

    proc = subprocess.Popen(
        'selenium-webdriver-at-spi-appidlister', stdout=subprocess.PIPE)
    out, _ = proc.communicate()

    apps = json.loads(out)
    print(apps)
    if appId in apps.values():
        return json.dumps({'value': 4}), 200, {'content-type': 'application/json'}
    # TODO: implement rest of codes
    return json.dumps({'value': 1}), 200, {'content-type': 'application/json'}
0622 
0623 
@app.route('/session/<session_id>/appium/device/terminate_app', methods=['POST'])
def session_appium_device_terminate_app(session_id):
    """Kill the first process listed for the request's 'appId'.

    The pid-to-app-id mapping is obtained from the output of the
    selenium-webdriver-at-spi-appidlister helper. Nothing happens when the
    app id is not listed.
    """
    session = sessions.get(session_id)
    if not session:
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    blob = json.loads(request.data)
    appId = blob['appId']

    proc = subprocess.Popen(
        'selenium-webdriver-at-spi-appidlister', stdout=subprocess.PIPE)
    out, _ = proc.communicate()

    apps = json.loads(out)
    for pid, listedId in apps.items():
        if listedId == appId:
            os.kill(int(pid), signal.SIGKILL)
            break  # only the first match gets terminated
    return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0642 
0643 
@app.route('/session/<session_id>/appium/device/press_keycode', methods=['POST'])
def session_appium_device_press_keycode(session_id):
    """Synthesize a keyboard event for every item of the request's 'keycode'."""
    session = sessions.get(session_id)
    if not session:
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    blob = json.loads(request.data)
    keycode = blob['keycode']
    # Not doing anything with these for now
    # metastate = blob['metastate']
    # flags = blob['flags']
    for ch in keycode:
        generate_keyboard_event(ch)
    return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0658 
0659 
@app.route('/session/<session_id>/appium/device/get_clipboard', methods=['POST'])
def session_appium_device_get_clipboard(session_id):
    """Return the clipboard text, base64 encoded.

    Only the 'plaintext' content type is supported. An empty clipboard is
    reported as an empty string.

    :raises ValueError: when the request's 'contentType' is not 'plaintext'.
    """
    session = sessions.get(session_id)
    if not session:
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    blob = json.loads(request.data)
    contentType = blob['contentType']
    if contentType != 'plaintext':
        # Checked up front so no window is opened for a request that cannot be served.
        raise ValueError('content type not currently supported')

    # NOTE: need a window because on wayland we must be the active window to manipulate the clipboard (currently anyway)
    window = Gtk.Window()
    try:
        window.set_default_size(20, 20)
        window.show()
        display = window.get_display()
        clipboard = Gtk.Clipboard.get_for_display(display, Gdk.SELECTION_CLIPBOARD)

        context = GLib.MainContext.default()
        for _ in range(4):
            time.sleep(0.5)
            while context.pending():
                context.iteration(may_block=False)

        data = clipboard.wait_for_text()
    finally:
        window.close()  # never leave the helper window behind

    if data is None:  # nothing (textual) on the clipboard
        data = ''
    return json.dumps({'value': base64.b64encode(data.encode('utf-8')).decode('utf-8')}), 200, {'content-type': 'application/json'}
0690 
0691 
@app.route('/session/<session_id>/appium/device/set_clipboard', methods=['POST'])
def session_appium_device_set_clipboard(session_id):
    """Put the request's base64 encoded 'content' on the clipboard as text.

    Only the 'plaintext' content type is supported.

    :raises ValueError: when the request's 'contentType' is not 'plaintext'.
    """
    session = sessions.get(session_id)
    if not session:
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    blob = json.loads(request.data)
    contentType = blob['contentType']
    content = blob['content']
    if contentType != 'plaintext':
        # Checked up front so no window is opened for a request that cannot be served.
        raise ValueError('content type not currently supported')

    # NOTE: need a window because on wayland we must be the active window to manipulate the clipboard (currently anyway)
    window = Gtk.Window()
    try:
        window.set_default_size(20, 20)
        display = window.get_display()
        clipboard = Gtk.Clipboard.get_for_display(display, Gdk.SELECTION_CLIPBOARD)

        clipboard.set_text(base64.b64decode(content).decode('utf-8'), -1)

        context = GLib.MainContext.default()
        for _ in range(4):
            time.sleep(0.5)
            while context.pending():
                context.iteration(may_block=False)
    finally:
        window.close()  # never leave the helper window behind

    return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0721 
0722 
@app.route('/session/<session_id>/appium/element/<element_id>/value', methods=['POST'])
def session_appium_element_value(session_id, element_id):
    """Set the numeric value of an element through its AT-SPI value interface.

    The JSON request body must contain 'value', of which only the first entry is used; it gets converted
    with float() before being assigned.

    Returns a (body, status, headers) tuple: 404 when the session or the element is unknown, otherwise 200 with a
    null value. Elements that do not implement the value interface are silently left untouched.
    """
    # .get() so that an unknown session id reaches the 404 below instead of raising KeyError.
    session = sessions.get(session_id)
    if not session:
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    # assumes session.elements is a mapping that raises KeyError for unknown ids - TODO confirm
    try:
        element = session.elements[element_id]
    except KeyError:
        element = None
    if not element:
        return json.dumps({'value': {'error': 'no such element'}}), 404, {'content-type': 'application/json'}

    blob = json.loads(request.data)
    print(blob)
    print(element)
    value = blob['value'][0]

    try:
        valueElement = element.queryValue()
        valueElement.currentValue = float(value)
    except NotImplementedError:
        # No value interface on this element. This is deliberately not treated as an error.
        pass

    return json.dumps({'value': None}), 200, {'content-type': 'application/json'}
0744 
0745 
@app.route('/session/<session_id>/screenshot', methods=['GET'])
def session_appium_screenshot(session_id):
    """Take a screenshot by running the selenium-webdriver-at-spi-screenshotter helper.

    The helper's stdout is decoded as UTF-8 and returned verbatim as the response value (presumably the base64
    encoded image a webdriver client expects - TODO confirm against the helper).

    Returns a (body, status, headers) tuple: 404 when the session is unknown or the helper produced no output,
    otherwise 200 with the helper output as value.
    """
    # .get() so that an unknown session id reaches the 404 below instead of raising KeyError.
    session = sessions.get(session_id)
    if not session:
        return json.dumps({'value': {'error': 'no such window'}}), 404, {'content-type': 'application/json'}

    # NB: these values incorrectly do not include the device pixel ratio, so they are off when used on a scaling display
    # position_x, position_y = session.browsing_context.getChildAtIndex(0).queryComponent().getPosition(pyatspi.XY_SCREEN)
    # size_width, size_height = session.browsing_context.getChildAtIndex(0).queryComponent().getSize()

    # Only stdout is captured. The helper's stderr keeps going to our own stderr so its diagnostics end up in the log.
    proc = subprocess.run(['selenium-webdriver-at-spi-screenshotter',
                           str(0), str(0), str(0), str(0)],
                          stdout=subprocess.PIPE)
    out = proc.stdout

    if not out:
        # There is no captured stderr to forward, so report something useful (and JSON serializable) instead.
        message = 'selenium-webdriver-at-spi-screenshotter produced no output (exit code {})'.format(proc.returncode)
        return (json.dumps({'value': {'error': 'unable to capture screen', 'message': message}}),
                404, {'content-type': 'application/json'})

    return json.dumps({'value': out.decode('utf-8')}), 200, {'content-type': 'application/json'}
0765 
0766 
def generate_keyboard_event(ch):
    """Synthesize a key event for the single character ch and then wait EVENTLOOP_TIME seconds.

    When KWIN_PID is set in the environment the event is written as JSON into a temporary file and handed to
    selenium-webdriver-at-spi-inputsynth; otherwise the event is generated through the AT-SPI registry.
    """
    nested_kwin = 'KWIN_PID' in os.environ
    if not nested_kwin:
        pyatspi.Registry.generateKeyboardEvent(char_to_keyval(ch), None, pyatspi.KEY_SYM)
    else:
        # using a nested kwin. need to synthesize keys into wayland (not supported in atspi right now)
        with tempfile.NamedTemporaryFile() as event_file:
            key = keyval_to_keycode(char_to_keyval(ch))
            events = [{'type': 'keyboard', 'level': key.level, 'keycode': key.keycode}]
            event_file.write(json.dumps(events).encode())
            # The helper reads the file by name, so the content has to be on disk before it runs.
            event_file.flush()
            subprocess.run(["selenium-webdriver-at-spi-inputsynth", event_file.name])
    time.sleep(EVENTLOOP_TIME)
0781 
0782 
def keyval_to_keycode(keyval):
    """Return the first entry of the default Gdk keymap that produces keyval.

    Raises RuntimeError when the keymap reports no entries for keyval.
    """
    found, entries = Gdk.Keymap.get_default().get_entries_for_keyval(keyval)
    if found:
        return entries[0]
    raise RuntimeError("Failed to map key!")
0789 
0790 
# Characters whose keyval is hard-coded rather than taken from Gdk.unicode_to_keyval().
# I don't know why the Gdk conversion doesn't work for these, it also doesn't work with \033 as input. :((
# https://gitlab.gnome.org/GNOME/gtk/-/blob/gtk-3-24/gdk/gdkkeyuni.c
# Other useful resources:
# https://www.cl.cam.ac.uk/~mgk25/ucs/keysymdef.h
_SPECIAL_KEYVALS = {
    "\ue00c": 0xff1b,  # escape
    "\ue03d": 0xffeb,  # left meta
    "\ue006": 0xff0d,  # return
    "\ue007": 0xff8d,  # enter
    "\ue003": 0xff08,  # backspace
    "\ue010": 0xff57,  # end
    "\ue012": 0xff51,  # left
    "\ue014": 0xff53,  # right
    "\ue013": 0xff52,  # up
    "\ue015": 0xff54,  # down
    "\ue004": 0xff09,  # tab
}


def char_to_keyval(ch):
    """Return the keyval for the single character ch.

    Characters listed in _SPECIAL_KEYVALS map to their hard-coded keyval, everything else is converted with
    Gdk.unicode_to_keyval(). The code point and the resulting keyval are printed for debugging.
    """
    print("----------::::::")
    converted = Gdk.unicode_to_keyval(ord(ch))
    keyval = _SPECIAL_KEYVALS.get(ch, converted)
    print(ord(ch))
    print(hex(keyval))
    return keyval