File indexing completed on 2024-04-21 07:49:06

0001 # -*- coding: utf-8 -*-
0002 
0003 """
0004 Copyright (C) 2008-2016 Wolfgang Rohdewald <wolfgang@rohdewald.de>
0005 
0006 SPDX-License-Identifier: GPL-2.0
0007 
0008 """
0009 
0010 # util must not import from log because util should not
0011 # depend on kde.py
0012 
0013 import traceback
0014 import os
0015 import datetime
0016 import subprocess
0017 import gc
0018 
0019 
0020 from locale import getpreferredencoding
0021 from sys import stdout
0022 
0023 from common import Debug
0024 
# Encoding used for console output: take what stdout reports; if stdout
# has no 'encoding' attribute or reports an empty/None value, fall back
# to the locale's preferred encoding.
STDOUTENCODING = getattr(stdout, 'encoding', None) or getpreferredencoding()
0031 
0032 
def stack(msg, limit=6):
    """Describe the call stack above the caller as a list of strings.

    Up to `limit` stack entries are reported, outermost first. The two
    innermost entries (this function and its direct caller) are dropped.
    Every line has the form
    '<index>: <msg> <module>/<line number> <function>: <source text>'
    where <module> is the source file name without directory and
    extension.
    """
    # Must be called directly in this frame, otherwise the number of
    # entries to drop at the end would no longer be two.
    frames = traceback.extract_stack(limit=limit + 2)[:-2]
    lines = []
    for position, (path, lineNo, funcName, source) in enumerate(frames):
        moduleName = os.path.splitext(os.path.basename(path))[0]
        lines.append('%2d: %s %s/%d %s: %s' % (
            position, msg, moduleName, lineNo, funcName, source))
    return lines
0045 
0046 
def callers(count=5, exclude=None):
    """Return a short description of who called us.

    The result is a string like '[inner.middle.outer]': the names of the
    last `count` functions on the call stack (at most 30 stack entries
    are inspected), innermost first, joined by dots. Function names
    listed in `exclude` and a built-in list of uninteresting names are
    left out.
    """
    frames = traceback.extract_stack(limit=30)
    ignored = list(exclude) if exclude else []
    ignored += [
        '<genexpr>', '__call__', 'run', '<module>', 'runTests',
        '_startRunCallbacks', '_runCallbacks', 'remote_move', 'exec_move',
        'proto_message', '_recvMessage', 'remoteMessageReceived',
        'clientAction', 'myAction', 'expressionReceived',
        '_read', 'callWithLogger',
        'callbackIfDone', 'callback', '__gotAnswer',
        'callExpressionReceived', 'proto_answer',
        '_dataReceived', 'dataReceived', 'gotItem',
        'callWithContext', '_doReadOrWrite', 'doRead',
        'callers', 'debug', 'logMessage', 'logDebug',
    ]
    # index 2 of a stack entry is the function name
    visible = [frame[2] for frame in frames if frame[2] not in ignored]
    return '[{}]'.format('.'.join(reversed(visible[-count:])))
0065 
0066 
def elapsedSince(since):
    """Return the seconds elapsed since `since` as a float.

    `since` is a naive datetime.datetime as returned by
    datetime.datetime.now(). The result includes the fractional part
    and whole days.
    """
    # total_seconds() does the days/seconds/microseconds arithmetic
    return (datetime.datetime.now() - since).total_seconds()
0073 
0074 
def which(program):
    """Return the full path for the binary or None.

    The directories in the environment variable PATH are searched in
    order. If PATH is not set, os.defpath is used instead of failing
    with KeyError. Only regular files (or links to them) are accepted,
    so a directory which happens to be named like `program` is not
    returned. Whether the file is executable is not checked.
    """
    searchPath = os.environ.get('PATH', os.defpath)
    for directory in searchPath.split(os.pathsep):
        candidate = os.path.join(directory, program)
        if os.path.isfile(candidate):
            return candidate
    return None
0082 
0083 
def removeIfExists(filename):
    """Remove file if it exists. Returns True if it existed.

    The file is removed without checking for its existence first, so
    there is no window in which another process could remove it between
    check and removal. A dangling symbolic link is removed too.
    Returns False if there was nothing to remove. Other errors from
    os.remove (like missing permissions or `filename` being a
    directory) are passed on to the caller.
    """
    try:
        os.remove(filename)
    except (FileNotFoundError, NotADirectoryError):
        # nothing there, or a parent of filename is not a directory
        return False
    return True
0090 
0091 
def uniqueList(seq):
    """Return a new list with the elements of seq in their original
    order, dropping every element that already appeared earlier.
    The elements must be hashable."""
    alreadySeen = set()
    unique = []
    for item in seq:
        if item in alreadySeen:
            continue
        alreadySeen.add(item)
        unique.append(item)
    return unique
0097 
0098 
0099 def _getr(slist, olist, seen):
0100     """Recursively expand slist's objects into olist, using seen to track
0101     already processed objects."""
0102     for element in slist:
0103         if id(element) in seen:
0104             continue
0105         seen[id(element)] = None
0106         olist.append(element)
0107         tlist = gc.get_referents(element)
0108         if tlist:
0109             _getr(tlist, olist, seen)
0110 
0111 # The public function.
0112 
0113 
def get_all_objects():
    """Collect all live Python objects into a list and return it.

    After a garbage collection run, the objects tracked by the
    collector and everything reachable from them are gathered by
    _getr(). The returned list itself is not part of the result.
    May use this in Duration for showing where objects are leaking.
    """
    gc.collect()
    tracked = gc.get_objects()
    collected = []
    # Just in case: mark our own bookkeeping containers as already seen
    # so they are never reported.
    visited = dict.fromkeys((id(tracked), id(collected)))
    visited[id(visited)] = None
    # _getr does the real work.
    _getr(tracked, collected, visited)
    return collected
0129 
0130 
class Duration:

    """A helper class for checking code execution duration.

    Meant to be used as a context manager. The clock starts when the
    instance is created, not when the with-block is entered."""

    def __init__(self, name, threshold=None, bug=False):
        """name describes where in the source we are checking
        threshold in seconds: do not warn below. None or 0 mean the
        default of 1.0 second.
        if bug is True, throw an exception if threshold is exceeded"""
        self.name = name
        self.threshold = threshold or 1.0
        self.bug = bug
        self.__start = datetime.datetime.now()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, trback):
        """Now check time passed.

        Does nothing if Debug.neutral is set. Otherwise, if more than
        threshold seconds have passed since creation, either raise
        UserWarning (if bug is set) or print the message. Exceptions
        raised in the with-block are never suppressed."""
        if Debug.neutral:
            return
        diff = datetime.datetime.now() - self.__start
        if diff > datetime.timedelta(seconds=self.threshold):
            # total_seconds() includes whole days and gives a correctly
            # placed fractional part: 1.05 seconds shows as '1.05'
            msg = '%s took %.2f seconds' % (
                self.name, diff.total_seconds())
            if self.bug:
                raise UserWarning(msg)
            print(msg)
0157                     raise UserWarning(msg)
0158                 print(msg)
0159 
0160 
def checkMemory():
    """Debugging aid: print what the garbage collector finds.

    Does nothing unless Debug.gc is set. Otherwise a collection is run
    with gc.DEBUG_LEAK enabled, and for every object in gc.garbage
    whose repr mentions one of the interesting class names, its
    referrers are printed. Finally the number of unreachable objects
    found by one more collection is printed and the gc debug flags are
    reset to 0. The collection threshold stays at 0 afterwards."""
    # pylint: disable=too-many-branches
    if not Debug.gc:
        return
    # threshold 0 turns off automatic collection, so collections only
    # happen where we ask for them below
    gc.set_threshold(0)
    # with DEBUG_LEAK the collector reports what it finds and keeps
    # unreachable objects in gc.garbage instead of freeing them
    gc.set_debug(gc.DEBUG_LEAK)
    gc.enable()
    # the markers bracket whatever the collector itself prints
    print('collecting {{{')
    gc.collect()        # we want to eliminate all output
    print('}}} done')

    # code like this may help to find specific things
    if True: # pylint: disable=using-constant-test
        interesting = ('Client', 'Player', 'Game')
        for obj in gc.garbage:
            # objects with cell_contents (e.g. closure cells): look at
            # what they hold
            if hasattr(obj, 'cell_contents'):
                obj = obj.cell_contents
            # only objects whose repr mentions an interesting name
            if not any(x in repr(obj) for x in interesting):
                continue
            for referrer in gc.get_referrers(obj):
                # gc.garbage always refers to obj, this tells nothing
                if referrer is gc.garbage:
                    continue
                if hasattr(referrer, 'cell_contents'):
                    referrer = referrer.cell_contents
                if referrer.__class__.__name__ in interesting:
                    # for interesting referrers show everything they
                    # refer to
                    for referent in gc.get_referents(referrer):
                        print('%s refers to %s' % (referrer, referent))
                else:
                    print('referrer of %s/%s is: id=%s type=%s %s' %
                          (type(obj), obj, id(referrer),
                           type(referrer), referrer))
    # gc.collect() returns the number of unreachable objects found
    print('unreachable:%s' % gc.collect())
    gc.set_debug(0)
0193     print('unreachable:%s' % gc.collect())
0194     gc.set_debug(0)
0195 
0196 
def gitHead():
    """The current git commit.

    Returns 'current' if there are uncommitted changes, the abbreviated
    hash of HEAD otherwise, and None if no .git is found in the parent
    of the current directory or if git does not report a commit."""
    if not os.path.exists(os.path.join('..', '.git')):
        return None
    # Wait until the index is refreshed: diff-index below must not run
    # concurrently with the refresh, and the child process gets reaped.
    # The exit status is deliberately ignored, it only tells whether
    # some files need an update.
    subprocess.call(['git', 'update-index', '-q', '--refresh'])
    changed = next(
        popenReadlines('git diff-index --name-only HEAD --'), None)
    if changed:
        return 'current'
    # the default avoids StopIteration if git prints nothing
    return next(popenReadlines('git log -1 --format=%h'), None)
0205 
0206 
def popenReadlines(args):
    """Run a subprocess and return a generator over its stdout lines.

    args is either the argument list or a single string which is split
    at whitespace. The subprocess runs with LANG=C in its environment.
    We wait until the subprocess has finished. The lines are returned
    as str, stripped of surrounding whitespace; empty lines are
    omitted."""
    command = args.split() if isinstance(args, str) else args
    environment = dict(os.environ)
    environment["LANG"] = "C"
    process = subprocess.Popen(
        command,
        universal_newlines=True,
        stdout=subprocess.PIPE,
        env=environment)
    output, _ = process.communicate()
    stripped = (line.strip() for line in output.split('\n'))
    return (line for line in stripped if line)
0213     result = subprocess.Popen(args, universal_newlines=True, stdout=subprocess.PIPE, env=my_env).communicate()[0]
0214     return (x.strip() for x in result.split('\n') if x.strip())