Warning, /plasma/plasma-firewall/kcm/backends/ufw/helper/kcm_ufw_helper.py.cmake is written in an unsupported language. File is not indexed.

0001 #!@Python3_EXECUTABLE@
0002 
0003 # SPDX-License-Identifier: GPL-3.0-or-later
0004 # SPDX-FileCopyrightText: 2010  Darwin M. Bautista <djclue917@gmail.com>
0005 # SPDX-FileCopyrightText: 2011 Craig Drummond <craig.p.drummond@gmail.com>
0006 
0007 # UFW KControl Module
0008 
0009 import os
0010 import sys
0011 
0012 import locale
0013 import gettext
0014 
0015 import getopt
0016 import shutil
0017 import hashlib
0018 import io
0019 
0020 from xml.etree import ElementTree as etree
0021 from copy import deepcopy
0022 
0023 import ufw.common
0024 import ufw.frontend
0025 from ufw.util import valid_address
0026 from ufw.common import UFWRule
0027 
0028 ANY_ADDR       = '0.0.0.0/0'
0029 ANY_PORT       = 'any'
0030 ANY_PROTOCOL   = 'any'
0031 OLD_DESCR_FILE = "/etc/ufw/descriptions"
0032 DESCR_FILE     = "/etc/kcm_ufw/descriptions"
0033 DEFAULTS_FILE  = "@DATA_INSTALL_DIR@/kcm_ufw/defaults"
0034 
0035 ERROR_FAILED_TO_SET_STATUS      = -1
0036 ERROR_INVALID_INDEX             = -2
0037 ERROR_INVALID_XML_NO_RULE       = -3
0038 ERROR_INVALID_XML_NO_ACTION_XML = -4
0039 ERROR_INVALID_XML_NO_DEFAULTS   = -5
0040 ERROR_INVALID_XML_NO_MODULES    = -6
0041 
0042 
0043 class UFWFrontend(ufw.frontend.UFWFrontend):
0044 
0045     def __init__(self, dryrun):
0046         ufw.frontend.UFWFrontend.__init__(self, dryrun)
0047         # Compatibility for ufw 0.31
0048         # This is a better way of handling method renames instead of putting
0049         # try/except blocks all over the whole application code
0050         # Ref: http://code.google.com/p/ufw-frontends/issues/detail?id=20
0051         try:
0052             self.backend.get_default_policy
0053         except AttributeError:
0054             self.backend.get_default_policy = self.backend._get_default_policy
0055         try:
0056             self.backend._is_enabled
0057         except AttributeError:
0058             self.backend._is_enabled = self.backend.is_enabled
0059 
0060 
0061 def localizeUfw():
0062     # Ref: http://code.google.com/p/ufw-frontends/issues/detail?id=19
0063     locale.setlocale(locale.LC_ALL, 'C')
0064     gettext.install(ufw.common.programName)
0065 
0066 #define looad descriptions file
0067 # descrsChanged=False
0068 # def moveOldDescrFile():
0069 #     try:
0070 #         if os.path.exists(OLD_DESCR_FILE) and os.path.isfile(OLD_DESCR_FILE):
0071 #             shutil.move(OLD_DESCR_FILE, DESCR_FILE)
0072 #     except Exception as e:
0073 #         return # Old file does not exist, or is not a file!...
0074 #
0075 # def loadDescriptions():
0076 #     moveOldDescrFile();
0077 #     try:
0078 #         global descrs
0079 #         descrs={"a":"b"}
0080 #         descrData=open(DESCR_FILE).read()
0081 #         descrs=eval(descrData)
0082 #     except Exception as e:
0083 #         return
0084 #
0085 # def removeDescriptions():
0086 #     try:
0087 #         os.remove(DESCR_FILE)
0088 #         descrsChanged=False
0089 #     except Exception as e:
0090 #         return
0091 #
0092 # def saveDescriptions():
0093 #     global descrsChanged
0094 #     if descrsChanged:
0095 #         try:
0096 #             if ("a" in descrs):
0097 #                 del descrs["a"]
0098 #             if len(descrs)>0:
0099 #                 descrFile = open(DESCR_FILE, 'w')
0100 #                 descrFile.write(repr(descrs))
0101 #                 descrsChanged=False
0102 #             else:
0103 #                 removeDescriptions()
0104 #         except Exception as e:
0105 #             return
0106 #
0107 # def getDescription(hashStr):
0108 #     if (hashStr in descrs):
0109 #         return descrs[hashStr]
0110 #     return ''
0111 #
0112 # def removeDescription(hashStr):
0113 #     if (hashStr in descrs):
0114 #         del descrs[hashStr]
0115 #         global descrsChanged
0116 #         descrsChanged=True
0117 #
0118 # def updateDescription(hashStr, descrStr):
0119 #     if ((hashStr in descrs)==False) or (descrs[hashStr]!=descrStr):
0120 #         removeDescription(hashStr)
0121 #         descrs[hashStr]=descrStr
0122 #         global descrsChanged
0123 #         descrsChanged=True
0124 
0125 def loadDefaultSettings(ufw):
0126     try:
0127         defaultsFile=open(DEFAULTS_FILE)
0128         while 1:
0129             line = defaultsFile.readline()
0130             if line == '':
0131                 break
0132             parts=line.split('=')
0133             if len(parts) == 2:
0134                 value=parts[1].replace('\n', '')
0135                 if parts[0] == 'incoming':
0136                     ufw.set_default_policy(value, 'incoming')
0137                 elif parts[0] == 'outgoing':
0138                     ufw.set_default_policy(value, 'outgoing')
0139                 elif parts[0] == 'loglevel':
0140                     ufw.set_loglevel(value)
0141                 elif parts[0] == 'ipv6':
0142                     ufw.backend.set_default(ufw.backend.files['defaults'], 'IPV6', value.lower())
0143                 elif parts[0] == 'modules':
0144                     ufw.backend.set_default(ufw.backend.files['defaults'], 'IPT_MODULES', '"' + value + '"')
0145     except Exception as e:
0146         return
0147 
0148 # Localise UFW, and init the 'frontend'
0149 localizeUfw()
0150 ufw=UFWFrontend(False)
0151 
0152 def getProtocol(rule):
0153     """Determine protocol of rule.
0154     Taken from ufw.parser.UFWCommandRule.parse
0155     """
0156     # Determine src type
0157     if rule.src == ANY_ADDR:
0158         from_type = 'any'
0159     else:
0160         from_type = ('v6' if valid_address(rule.src, '6') else 'v4')
0161     # Determine dst type
0162     if rule.dst == ANY_ADDR:
0163         to_type = 'any'
0164     else:
0165         to_type = ('v6' if valid_address(rule.dst, '6') else 'v4')
0166     # Figure out the type of rule (IPv4, IPv6, or both)
0167     if from_type == ANY_PROTOCOL and to_type == ANY_PROTOCOL:
0168         protocol = 'both'
0169     elif from_type != ANY_PROTOCOL and to_type != ANY_PROTOCOL and from_type != to_type:
0170         err_msg = _("Mixed IP versions for 'from' and 'to'")
0171         raise ufw.common.UFWError(err_msg)
0172     elif from_type != ANY_PROTOCOL:
0173         protocol = from_type
0174     elif to_type != ANY_PROTOCOL:
0175         protocol = to_type
0176     return protocol
0177 
0178 def insertRule(ufw, rule, protocol=None):
0179     if protocol is None:
0180         protocol = getProtocol(rule)
0181     rule = rule.dup_rule()
0182     # Fix any inconsistency
0183     if rule.sapp or rule.dapp:
0184         rule.set_protocol(ANY_PROTOCOL)
0185         if rule.sapp:
0186             rule.sport = rule.sapp
0187         if rule.dapp:
0188             rule.dport = rule.dapp
0189     # If trying to insert beyond the end, just set position to 0
0190     if rule.position and not ufw.backend.get_rule_by_number(rule.position):
0191         rule.set_position(0)
0192     ufw.set_rule(rule, protocol)
0193     # Reset the positions of the recently inserted rule(s)
0194     if rule.position:
0195         s = rule.position - 1
0196         e = rule.position + 1
0197         for r in ufw.backend.get_rules()[s:e]:
0198             r.set_position(0)
0199     return rule
0200 
0201 def getRulesList(ufw):
0202     app_rules = []
0203     for i, r in enumerate(ufw.backend.get_rules()):
0204         if r.dapp or r.sapp:
0205             t = r.get_app_tuple()
0206             if t in app_rules:
0207                 continue
0208             else:
0209                 app_rules.append(t)
0210         yield (i, r)
0211 
0212 def encodeText(str):
0213     str=str.replace("&", "&amp;")
0214     str=str.replace("<", "&lt;")
0215     str=str.replace("\"", "&quot;")
0216     str=str.replace(">", "&gt;")
0217     return str
0218 
0219 def ruleDetails(rule):
0220     xmlStr = io.StringIO()
0221     xmlStr.write("action=\"")
0222     xmlStr.write(rule.action.lower())
0223     xmlStr.write("\" direction=\"")
0224     xmlStr.write(rule.direction.lower())
0225     xmlStr.write("\" dapp=\"")
0226     xmlStr.write(rule.dapp)
0227     xmlStr.write("\" sapp=\"")
0228     xmlStr.write(rule.sapp)
0229     xmlStr.write("\" dport=\"")
0230     xmlStr.write(rule.dport)
0231     xmlStr.write("\" sport=\"")
0232     xmlStr.write(rule.sport)
0233     xmlStr.write("\" protocol=\"")
0234     xmlStr.write(rule.protocol.lower())
0235     xmlStr.write("\" dst=\"")
0236     xmlStr.write(rule.dst)
0237     xmlStr.write("\" src=\"")
0238     xmlStr.write(rule.src)
0239     xmlStr.write("\" interface_in=\"")
0240     xmlStr.write(rule.interface_in)
0241     xmlStr.write("\" interface_out=\"")
0242     xmlStr.write(rule.interface_out)
0243     xmlStr.write("\" v6=\"")
0244     if rule.v6:
0245         xmlStr.write('True')
0246     else:
0247         xmlStr.write('False')
0248     return xmlStr.getvalue()
0249 
0250 # def detailsHash(details):
0251 #     ruleHash = hashlib.md5()
0252 #     ruleHash.update(details.encode('utf-8'))
0253 #     return ruleHash.hexdigest()
0254 
0255 # Convert a rule to an XML string...
0256 def toXml(rule, xmlStr):
0257     xmlStr.write("<rule position=\"")
0258     xmlStr.write(str(rule.position))
0259     xmlStr.write("\" ")
0260     details=ruleDetails(rule)
0261     xmlStr.write(details)
0262 #     hashStr=detailsHash(details)
0263 #     descr=getDescription(hashStr)
0264 #     if descr != '':
0265 #         xmlStr.write("\" descr=\"")
0266 #         xmlStr.write(encodeText(descr))
0267 #         xmlStr.write("\" hash=\"")
0268 #         xmlStr.write(hashStr)
0269     xmlStr.write("\" logtype=\"")
0270     xmlStr.write(rule.logtype)
0271     xmlStr.write("\" />")
0272 
0273 # Create rule from XML...
0274 def fromXml(str):
0275     elem = etree.XML(str)
0276     if elem.tag != 'rule':
0277         error("ERROR: Invalid XML, expected \'rule\' element", ERROR_INVALID_XML_NO_RULE)
0278     action=elem.get('action', '').lower()
0279     if action == '':
0280         error("ERROR: Invalid XML, no action specified", ERROR_INVALID_XML_NO_ACTION_XML)
0281     protocol=elem.get('protocol', ANY_PROTOCOL).lower()
0282     rule = UFWRule(action, protocol)
0283     rule.position=int(elem.get('position', 0))
0284     rule.direction=elem.get('direction', 'in').lower()
0285     rule.dapp=elem.get('dapp', '')
0286     rule.sapp=elem.get('sapp', '')
0287     rule.dport=elem.get('dport', ANY_PORT)
0288     rule.sport=elem.get('sport', ANY_PORT)
0289     rule.dst=elem.get('dst', ANY_ADDR)
0290     rule.src=elem.get('src', ANY_ADDR)
0291     rule.interface_in=elem.get('interface_in', '')
0292     rule.interface_out=elem.get('interface_out', '')
0293     rule.logtype=elem.get('logtype', '').lower()
0294     rule.v6=elem.get('v6', 'False').lower() == "true"
0295     return rule
0296 
0297 def getStatus(ufw, xmlStr):
0298     xmlStr.write("<status enabled=\"")
0299     if ufw.backend._is_enabled():
0300         xmlStr.write('true')
0301     else:
0302         xmlStr.write('false')
0303     xmlStr.write("\" />")
0304 
0305 def setEnabled(ufw, status):
0306     if status.lower() == "false":
0307         stat=False
0308     else:
0309         stat=True
0310     if stat != ufw.backend._is_enabled():
0311         ufw.set_enabled(stat)
0312         if ufw.backend._is_enabled() != stat:
0313             error("ERROR: Failed to set UFW status", ERROR_FAILED_TO_SET_STATUS)
0314 
0315 def getDefaults(ufw, xmlStr):
0316     conf = ufw.backend.defaults
0317     xmlStr.write("<defaults incoming=\"")
0318     xmlStr.write(ufw.backend.get_default_policy('input'))
0319     xmlStr.write("\" outgoing=\"")
0320     xmlStr.write(ufw.backend.get_default_policy('output'))
0321     xmlStr.write("\" loglevel=\"")
0322     xmlStr.write(conf['loglevel'])
0323     xmlStr.write("\" ipv6=\"")
0324     xmlStr.write(conf['ipv6'])
0325     xmlStr.write("\" />")
0326 
0327 def setDefaults(ufw, xml):
0328     elem = etree.XML(xml)
0329     if elem.tag != 'defaults':
0330         error("ERROR: Invalid XML, expected \'defaults\' element", ERROR_INVALID_XML_NO_DEFAULTS)
0331     enabled=ufw.backend._is_enabled()
0332     if enabled:
0333         ufw.set_enabled(False)
0334     ipv6=elem.get('ipv6', '').lower()
0335     if ipv6 != '':
0336         del ufw
0337         ufw=UFWFrontend(False)
0338         ufw.backend.set_default(ufw.backend.files['defaults'], 'IPV6', ipv6)
0339         del ufw
0340         ufw=UFWFrontend(False)
0341     policy=elem.get('incoming', '').lower()
0342     if policy != '':
0343         ufw.set_default_policy(policy, 'incoming')
0344     policy=elem.get('outgoing', '').lower()
0345     if policy != '':
0346         ufw.set_default_policy(policy, 'outgoing')
0347     loglevel=elem.get('loglevel', '').lower()
0348     if loglevel != '':
0349         ufw.set_loglevel(loglevel)
0350     if enabled:
0351         ufw.set_enabled(True)
0352 
0353 def getRules(ufw, xmlStr):
0354     xmlStr.write("<rules>")
0355     for i, data in enumerate(getRulesList(ufw)):
0356         idx, rule = data
0357         toXml(rule.dup_rule(), xmlStr)
0358     xmlStr.write("</rules>")
0359 
0360 # def updateRuleDescription(rule, xml):
0361 #     elem=etree.XML(xml)
0362 #     descr=elem.get('descr', '')
0363 #     oldHashCode=elem.get('hash', '')
0364 #     if descr != '':
0365 #         details=ruleDetails(rule)
0366 #         hashStr=detailsHash(details)
0367 #         # For an update, we should be passed old hash code - if so, remove old entry...
0368 #         if oldHashCode!= '':
0369 #             removeDescription(oldHashCode)
0370 #         updateDescription(hashStr, descr)
0371 #     else:
0372 #         if oldHashCode!= '':
0373 #             removeDescription(oldHashCode)
0374 
0375 def addRule(ufw, xml):
0376     rule=fromXml(xml)
0377     inserted=insertRule(ufw, rule)
0378 #     updateRuleDescription(inserted, xml)
0379 
0380 def updateRule(ufw, xml):
0381     rule=fromXml(xml)
0382     deleted=False
0383     try:
0384         prev=deepcopy(ufw.backend.get_rule_by_number(rule.position))
0385         ufw.delete_rule(rule.position, True)
0386         deleted=True
0387         inserted=insertRule(ufw, rule)
0388         deleted=False
0389 #         updateRuleDescription(inserted, xml)
0390     except Exception as e:
0391         if deleted:
0392             insertRule(ufw, prev)
0393 
0394 # def updateRuleDescr(ufw, xml):
0395 #     rule=fromXml(xml)
0396 #     details=ruleDetails(rule)
0397 #     hashStr=detailsHash(details)
0398 #     updateRuleDescription(rule, xml)
0399 
0400 # Remove a rule. Index is either; just the index, or <index>:<hashcode>
0401 def removeRule(ufw, index):
0402     parts=index.split(':')
0403     try:
0404         if 2==len(parts):
0405             idx=int(parts[0])
0406         else:
0407             idx=int(index)
0408         if idx<1 or idx>(ufw.backend.get_rules_count(False)+ufw.backend.get_rules_count(True)):
0409             error("ERROR: Invalid index", ERROR_INVALID_INDEX)
0410 #         if 2==len(parts):
0411 #             removeDescription(parts[1])
0412 #         else:
0413 #             rule=ufw.backend.get_rule_by_number(index)
0414 #             if rule:
0415 #                 details=ruleDetails(rule)
0416 #                 hashStr=detailsHash(details)
0417 #                 removeDescription(hashStr)
0418         ufw.delete_rule(idx, True)
0419     #except ufw.common.UFWError as e:
0420         #error("ERROR: UFW error", e.value)
0421     except ValueError:
0422         error("ERROR: Invalid input type", ERROR_INVALID_INDEX)
0423 
0424 def moveRule(ufw, indexes):
0425     idx=indexes.split(':')
0426     if 2!= len(idx):
0427         error("ERROR: Invalid number of indexes", ERROR_INVALID_INDEX)
0428     fromIndex=int(idx[0])
0429     toIndex=int(idx[1])
0430     if fromIndex == toIndex:
0431         error("ERROR: Source and destination cannot be the same", ERROR_INVALID_INDEX)
0432     rule=ufw.backend.get_rule_by_number(fromIndex).dup_rule()
0433     ufw.delete_rule(fromIndex, True)
0434     rule.position=toIndex
0435     insertRule(ufw, rule)
0436 
0437 def reset(ufw):
0438     loadDefaultSettings(ufw)
0439     clearRules(ufw)
0440     ufw.reset(True)
0441     if ufw.backend._is_enabled():
0442         ufw.set_enabled(False)
0443         ufw.set_enabled(True)
0444 
0445 def clearRules(ufw):
0446 #     removeDescriptions()
0447     count=ufw.backend.get_rules_count(False)+ufw.backend.get_rules_count(True)
0448     for num in range(0, count):
0449         try:
0450             ufw.delete_rule(1, True)
0451         except ufw.common.UFWError as e:
0452             pass
0453 
0454 def getModules(ufw, xmlStr):
0455     xmlStr.write("<modules enabled=\"")
0456     if 'ipt_modules' in ufw.backend.defaults:
0457         modules=ufw.backend.defaults['ipt_modules']
0458         if modules != '':
0459             xmlStr.write(modules)
0460     xmlStr.write("\" />")
0461 
0462 def setModules(ufw, xml):
0463     elem = etree.XML(xml)
0464     if elem.tag != 'modules':
0465         error("ERROR: Invalid XML, expected \'modules' element", ERROR_INVALID_XML_NO_MODULES)
0466     modules=elem.get('enabled', '').lower()
0467     modules = '"' + modules + '"'
0468     ufw.backend.set_default(ufw.backend.files['defaults'], 'IPT_MODULES', modules)
0469 
0470 # def getProfiles(ufw, xmlStr):
0471 #     xmlStr.write("<profiles names=\"")
0472 #     first=True
0473 #     for profile in ufw.backend.profiles.keys():
0474 #         if profile != '':
0475 #             if first:
0476 #                 first=False
0477 #             else:
0478 #                 xmlStr.write(" ")
0479 #             xmlStr.write(profile)
0480 #     xmlStr.write("\" />")
0481 
0482 def error(str, rv):
0483     print >> sys.stderr, str
0484     #sys.exit(rv)
0485 
0486 def main():
0487     try:
0488 #         opts, args = getopt.getopt(sys.argv[1:], "hse:df:la:u:U:r:m:tiI:x",
0489 #                                    ["help", "status", "setEnabled=", "defaults", "setDefaults=", "list", "add=",
0490 #                                     "update=", "updateDescr=", "remove=", "move=", "reset", "modules", "setModules=", "clearRules"])
0491         opts, args = getopt.getopt(sys.argv[1:], "hse:df:la:u:U:r:m:tiI:x",
0492                                    ["help", "status", "setEnabled=", "defaults", "setDefaults=", "list", "add=",
0493                                     "update=", "remove=", "move=", "reset", "modules", "setModules=", "clearRules"])
0494     except getopt.GetoptError as err:
0495         # print help information and exit:
0496         print >> sys.stderr, str(err) # will print something like "option -a not recognized"
0497         usage()
0498         sys.exit(1)
0499 #     loadDescriptions()
0500     returnXml = False
0501     xmlOut = io.StringIO()
0502     xmlOut.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?><ufw>")
0503     for o, a in opts:
0504         if o in ("-h", "--help"):
0505             usage()
0506             sys.exit()
0507         elif o in ("-s", "--status"):
0508             getStatus(ufw, xmlOut)
0509             returnXml=True
0510         elif o in ("-e", "--setEnabled"):
0511             setEnabled(ufw, a)
0512         elif o in ("-d", "--defaults"):
0513             getDefaults(ufw, xmlOut)
0514             returnXml=True
0515         elif o in ("-f", "--setDefaults"):
0516             setDefaults(ufw, a)
0517         elif o in ("-l", "--list"):
0518             getRules(ufw, xmlOut)
0519             returnXml=True
0520         elif o in ("-a", "--add"):
0521             addRule(ufw, a)
0522         elif o in ("-u", "--update"):
0523             updateRule(ufw, a)
0524 #         elif o in ("-U", "--updateDescr"):
0525 #             updateRuleDescr(ufw, a)
0526         elif o in ("-r", "--remove"):
0527             removeRule(ufw, a)
0528         elif o in ("-m", "--move"):
0529             moveRule(ufw, a)
0530         elif o in ("-t", "--reset"):
0531             reset(ufw)
0532         elif o in ("-i", "--modules"):
0533             getModules(ufw, xmlOut)
0534             returnXml=True
0535         elif o in ("-I", "--setModules"):
0536             setModules(ufw, a)
0537         elif o in ("-x", "--clearRules"):
0538             clearRules(ufw)
0539         else:
0540             usage()
0541 #     saveDescriptions()
0542     if returnXml:
0543         xmlOut.write("</ufw>")
0544         print (xmlOut.getvalue())
0545 
0546 def usage():
0547     print ("Python helper for UFW KCM")
0548     print ("")
0549     print ("(C) Craig Drummond, 2011")
0550     print ("")
0551     print ("Usage:")
0552     print ("    "+sys.argv[0]+" --status")
0553     print ("    "+sys.argv[0]+" --setEnabled <true/false>")
0554     print ("    "+sys.argv[0]+" --defaults")
0555     print ("    "+sys.argv[0]+" --setDefaults <xml>")
0556     print ("    "+sys.argv[0]+" --list")
0557     print ("    "+sys.argv[0]+" --add <xml>")
0558     print ("    "+sys.argv[0]+" --update <xml>")
0559 #     print ("    "+sys.argv[0]+" --updateDescr <xml>")
0560     print ("    "+sys.argv[0]+" --remove <index>")
0561     print ("    "+sys.argv[0]+" --remove <index:hash>")
0562     print ("    "+sys.argv[0]+" --move <from:to>")
0563     print ("    "+sys.argv[0]+" --reset")
0564     print ("    "+sys.argv[0]+" --modules")
0565     print ("    "+sys.argv[0]+" --setModules <xml>")
0566     print ("    "+sys.argv[0]+" --clearRules")
0567 
0568 if __name__ == "__main__":
0569     main()