Warning, /plasma/plasma-firewall/kcm/backends/ufw/helper/kcm_ufw_helper.py.cmake is written in an unsupported language. File is not indexed.
#!@Python3_EXECUTABLE@

# SPDX-License-Identifier: GPL-3.0-or-later
# SPDX-FileCopyrightText: 2010 Darwin M. Bautista <djclue917@gmail.com>
# SPDX-FileCopyrightText: 2011 Craig Drummond <craig.p.drummond@gmail.com>

# UFW KControl Module

import os
import sys

import locale
import gettext

import getopt
import shutil
import hashlib
import io

from xml.etree import ElementTree as etree
from copy import deepcopy

import ufw.common
import ufw.frontend
from ufw.util import valid_address
from ufw.common import UFWRule

# Wildcard values used when a rule does not restrict address/port/protocol.
ANY_ADDR = '0.0.0.0/0'
ANY_PORT = 'any'
ANY_PROTOCOL = 'any'
OLD_DESCR_FILE = "/etc/ufw/descriptions"
DESCR_FILE = "/etc/kcm_ufw/descriptions"
DEFAULTS_FILE = "@DATA_INSTALL_DIR@/kcm_ufw/defaults"

# Codes passed to error() as its second argument.
ERROR_FAILED_TO_SET_STATUS = -1
ERROR_INVALID_INDEX = -2
ERROR_INVALID_XML_NO_RULE = -3
ERROR_INVALID_XML_NO_ACTION_XML = -4
ERROR_INVALID_XML_NO_DEFAULTS = -5
ERROR_INVALID_XML_NO_MODULES = -6


class UFWFrontend(ufw.frontend.UFWFrontend):
    """ufw frontend with aliases that paper over backend method renames."""

    def __init__(self, dryrun):
        """Initialise the base frontend and install the compatibility aliases.

        dryrun is forwarded unchanged to ufw.frontend.UFWFrontend.
        """
        # Use super() rather than spelling out ufw.frontend.UFWFrontend: this
        # file later rebinds the module-level name `ufw` to a frontend
        # instance, after which `ufw.frontend` no longer resolves and any
        # further construction of this class would raise AttributeError.
        super().__init__(dryrun)
        # Compatibility for ufw 0.31
        # This is a better way of handling method renames instead of putting
        # try/except blocks all over the whole application code
        # Ref: http://code.google.com/p/ufw-frontends/issues/detail?id=20
        try:
            self.backend.get_default_policy
        except AttributeError:
            self.backend.get_default_policy = self.backend._get_default_policy
        try:
            self.backend._is_enabled
        except AttributeError:
            self.backend._is_enabled = self.backend.is_enabled


def localizeUfw():
    """Force the 'C' locale and install gettext's ``_`` for ufw's domain."""
    # Ref: http://code.google.com/p/ufw-frontends/issues/detail?id=19
    locale.setlocale(locale.LC_ALL, 'C')
    gettext.install(ufw.common.programName)

# Define/load descriptions file (the code below is disabled)
# NOTE(review): the disabled code below eval()s the contents of DESCR_FILE;
# if it is ever re-enabled, switch to a safe parser first.
# descrsChanged=False
# def moveOldDescrFile():
#     try:
#         if os.path.exists(OLD_DESCR_FILE) and os.path.isfile(OLD_DESCR_FILE):
#             shutil.move(OLD_DESCR_FILE, DESCR_FILE)
#     except Exception as e:
#         return # Old file does not exist, or is not a file!...
#
# def loadDescriptions():
#     moveOldDescrFile();
#     try:
#         global descrs
#         descrs={"a":"b"}
#         descrData=open(DESCR_FILE).read()
#         descrs=eval(descrData)
#     except Exception as e:
#         return
#
# def removeDescriptions():
#     try:
#         os.remove(DESCR_FILE)
#         descrsChanged=False
#     except Exception as e:
#         return
#
# def saveDescriptions():
#     global descrsChanged
#     if descrsChanged:
#         try:
#             if ("a" in descrs):
#                 del descrs["a"]
#             if len(descrs)>0:
#                 descrFile = open(DESCR_FILE, 'w')
#                 descrFile.write(repr(descrs))
#                 descrsChanged=False
#             else:
#                 removeDescriptions()
#         except Exception as e:
#             return
#
# def getDescription(hashStr):
#     if (hashStr in descrs):
#         return descrs[hashStr]
#     return ''
#
# def removeDescription(hashStr):
#     if (hashStr in descrs):
#         del descrs[hashStr]
#         global descrsChanged
#         descrsChanged=True
#
# def updateDescription(hashStr, descrStr):
#     if ((hashStr in descrs)==False) or (descrs[hashStr]!=descrStr):
#         removeDescription(hashStr)
#         descrs[hashStr]=descrStr
#         global descrsChanged
#         descrsChanged=True

def loadDefaultSettings(ufw):
    """Apply the ``key=value`` settings found in DEFAULTS_FILE to *ufw*.

    Recognised keys are 'incoming', 'outgoing', 'loglevel', 'ipv6' and
    'modules'; lines that do not split into exactly two parts on '=' are
    skipped. This is best-effort: any exception (missing file, rejected
    value, ...) silently stops processing.
    """
    try:
        # The context manager guarantees the file is closed even when one of
        # the ufw calls below raises.
        with open(DEFAULTS_FILE) as defaultsFile:
            for line in defaultsFile:
                parts = line.split('=')
                if len(parts) != 2:
                    continue
                key = parts[0]
                value = parts[1].replace('\n', '')
                if key == 'incoming':
                    ufw.set_default_policy(value, 'incoming')
                elif key == 'outgoing':
                    ufw.set_default_policy(value, 'outgoing')
                elif key == 'loglevel':
                    ufw.set_loglevel(value)
                elif key == 'ipv6':
                    ufw.backend.set_default(ufw.backend.files['defaults'], 'IPV6', value.lower())
                elif key == 'modules':
                    ufw.backend.set_default(ufw.backend.files['defaults'], 'IPT_MODULES', '"' + value + '"')
    except Exception:
        # Deliberately best-effort: defaults are optional.
        return

# Localise UFW, and init the 'frontend'
# NOTE: this rebinds the module-level name `ufw` from the imported package to
# a frontend instance; code below must not reach the package through `ufw.`.
localizeUfw()
ufw=UFWFrontend(False)

def getProtocol(rule):
    """Determine protocol of rule.
    Taken from ufw.parser.UFWCommandRule.parse

    Returns 'both' when neither rule.src nor rule.dst is restricted,
    otherwise 'v4' or 'v6' according to the restricted address(es).
    Raises UFWError when src and dst are of different IP versions.
    """
    # The global `ufw` is a frontend instance by the time this runs, so
    # `ufw.common.UFWError` would fail with AttributeError; import the
    # exception class explicitly instead.
    from ufw.common import UFWError

    def family(addr):
        # 'any' for the wildcard address, else the IP version of addr.
        if addr == ANY_ADDR:
            return 'any'
        return 'v6' if valid_address(addr, '6') else 'v4'

    from_type = family(rule.src)
    to_type = family(rule.dst)
    # Figure out the type of rule (IPv4, IPv6, or both)
    if from_type == 'any' and to_type == 'any':
        return 'both'
    if from_type != 'any' and to_type != 'any' and from_type != to_type:
        err_msg = _("Mixed IP versions for 'from' and 'to'")
        raise UFWError(err_msg)
    return from_type if from_type != 'any' else to_type

def insertRule(ufw, rule, protocol=None):
    """Insert a copy of *rule* through *ufw* and return that copy.

    When *protocol* is None it is derived with getProtocol(). The caller's
    rule object is not modified; a duplicate is adjusted and inserted.
    """
    if protocol is None:
        protocol = getProtocol(rule)
    rule = rule.dup_rule()
    # Fix any inconsistency
    if rule.sapp or rule.dapp:
        rule.set_protocol(ANY_PROTOCOL)
        if rule.sapp:
            rule.sport = rule.sapp
        if rule.dapp:
            rule.dport = rule.dapp
    # If trying to insert beyond the end, just set position to 0
    if rule.position and not ufw.backend.get_rule_by_number(rule.position):
        rule.set_position(0)
    ufw.set_rule(rule, protocol)
    # Reset the positions of the recently inserted rule(s)
    if rule.position:
        s = rule.position - 1
        e = rule.position + 1
        for r in ufw.backend.get_rules()[s:e]:
            r.set_position(0)
    return rule

def getRulesList(ufw):
    """Yield ``(index, rule)`` for the backend's rules.

    For rules that have dapp or sapp set, only the first rule with a given
    get_app_tuple() value is yielded; later ones are skipped. The index is
    the rule's position in ufw.backend.get_rules().
    """
    app_rules = []
    for i, r in enumerate(ufw.backend.get_rules()):
        if r.dapp or r.sapp:
            t = r.get_app_tuple()
            if t in app_rules:
                continue
            app_rules.append(t)
        yield (i, r)

def encodeText(str):
    """Return *str* with ``&``, ``<``, ``"`` and ``>`` replaced by XML entities."""
    # '&' must be handled first so the entities produced below are not
    # themselves re-escaped.
    text = str.replace("&", "&amp;")
    text = text.replace("<", "&lt;")
    text = text.replace("\"", "&quot;")
    text = text.replace(">", "&gt;")
    return text

def ruleDetails(rule):
    """Return the rule's fields as XML attribute text.

    The result has the form ``action="..." direction="..." ... v6="True`` -
    note that the final attribute value is NOT terminated with a quote; the
    caller is expected to append it.
    """
    attrs = (
        ("action", rule.action.lower()),
        ("direction", rule.direction.lower()),
        ("dapp", rule.dapp),
        ("sapp", rule.sapp),
        ("dport", rule.dport),
        ("sport", rule.sport),
        ("protocol", rule.protocol.lower()),
        ("dst", rule.dst),
        ("src", rule.src),
        ("interface_in", rule.interface_in),
        ("interface_out", rule.interface_out),
        ("v6", 'True' if rule.v6 else 'False'),
    )
    # Joining on '" ' closes every value except the last one.
    return "\" ".join(name + "=\"" + value for name, value in attrs)

# def detailsHash(details):
#     ruleHash = hashlib.md5()
#     ruleHash.update(details.encode('utf-8'))
#     return ruleHash.hexdigest()

# Convert a rule to an XML string...
def toXml(rule, xmlStr):
    """Append *rule* to the stream *xmlStr* as one ``<rule ... />`` element.

    The attribute text comes from ruleDetails(); the quote that closes its
    last attribute value is written here, followed by the logtype attribute.
    """
    xmlStr.write("<rule position=\"" + str(rule.position) + "\" ")
    xmlStr.write(ruleDetails(rule))
    # hashStr=detailsHash(details)
    # descr=getDescription(hashStr)
    # if descr != '':
    #     xmlStr.write("\" descr=\"")
    #     xmlStr.write(encodeText(descr))
    # xmlStr.write("\" hash=\"")
    # xmlStr.write(hashStr)
    xmlStr.write("\" logtype=\"")
    xmlStr.write(rule.logtype)
    xmlStr.write("\" />")

# Create rule from XML...
def fromXml(str):
    """Build a UFWRule from the ``<rule .../>`` XML text *str*.

    A wrong root tag or a missing/empty 'action' attribute is reported via
    error(). Attributes that are absent fall back to: protocol/ports 'any',
    addresses ANY_ADDR, direction 'in', position 0, everything else ''.
    """
    node = etree.XML(str)
    if node.tag != 'rule':
        error("ERROR: Invalid XML, expected 'rule' element", ERROR_INVALID_XML_NO_RULE)
    action = node.get('action', '').lower()
    if not action:
        error("ERROR: Invalid XML, no action specified", ERROR_INVALID_XML_NO_ACTION_XML)
    rule = UFWRule(action, node.get('protocol', ANY_PROTOCOL).lower())
    rule.position = int(node.get('position', 0))
    rule.direction = node.get('direction', 'in').lower()
    # Attributes copied verbatim from the element, with their fallbacks.
    verbatim = (
        ('dapp', ''),
        ('sapp', ''),
        ('dport', ANY_PORT),
        ('sport', ANY_PORT),
        ('dst', ANY_ADDR),
        ('src', ANY_ADDR),
        ('interface_in', ''),
        ('interface_out', ''),
    )
    for attr, fallback in verbatim:
        setattr(rule, attr, node.get(attr, fallback))
    rule.logtype = node.get('logtype', '').lower()
    rule.v6 = node.get('v6', 'False').lower() == "true"
    return rule

def getStatus(ufw, xmlStr):
    """Write ``<status enabled="true|false" />`` for the backend's state."""
    state = 'true' if ufw.backend._is_enabled() else 'false'
    xmlStr.write("<status enabled=\"")
    xmlStr.write(state)
    xmlStr.write("\" />")

def setEnabled(ufw, status):
    """Enable the firewall unless *status* is 'false' (case-insensitive).

    Nothing is done when the backend is already in the requested state. If
    the state still differs after set_enabled(), error() is called.
    """
    wanted = status.lower() != "false"
    if wanted == ufw.backend._is_enabled():
        return
    ufw.set_enabled(wanted)
    if ufw.backend._is_enabled() != wanted:
        error("ERROR: Failed to set UFW status", ERROR_FAILED_TO_SET_STATUS)

def getDefaults(ufw, xmlStr):
    """Write a ``<defaults ... />`` element with policies, loglevel and ipv6."""
    settings = ufw.backend.defaults
    xmlStr.write("<defaults incoming=\"")
    xmlStr.write(ufw.backend.get_default_policy('input'))
    xmlStr.write("\" outgoing=\"")
    xmlStr.write(ufw.backend.get_default_policy('output'))
    xmlStr.write("\" loglevel=\"" + settings['loglevel'])
    xmlStr.write("\" ipv6=\"" + settings['ipv6'])
    xmlStr.write("\" />")

def setDefaults(ufw, xml):
    """Apply the attributes of a ``<defaults .../>`` element to the firewall.

    The firewall is switched off while settings are changed and switched
    back on afterwards if it was enabled on entry. Only attributes that are
    present and non-empty are applied.
    """
    node = etree.XML(xml)
    if node.tag != 'defaults':
        error("ERROR: Invalid XML, expected 'defaults' element", ERROR_INVALID_XML_NO_DEFAULTS)
    wasEnabled = ufw.backend._is_enabled()
    if wasEnabled:
        ufw.set_enabled(False)
    ipv6 = node.get('ipv6', '').lower()
    if ipv6:
        # A fresh frontend is created before and after writing the IPV6
        # default; the local name is simply rebound each time.
        ufw = UFWFrontend(False)
        ufw.backend.set_default(ufw.backend.files['defaults'], 'IPV6', ipv6)
        ufw = UFWFrontend(False)
    for direction in ('incoming', 'outgoing'):
        policy = node.get(direction, '').lower()
        if policy:
            ufw.set_default_policy(policy, direction)
    loglevel = node.get('loglevel', '').lower()
    if loglevel:
        ufw.set_loglevel(loglevel)
    if wasEnabled:
        ufw.set_enabled(True)

def getRules(ufw, xmlStr):
    """Write ``<rules>...</rules>`` containing every rule from getRulesList()."""
    xmlStr.write("<rules>")
    for _index, rule in getRulesList(ufw):
        # Serialise a duplicate, as the original code did.
        toXml(rule.dup_rule(), xmlStr)
    xmlStr.write("</rules>")

# def updateRuleDescription(rule, xml):
#     elem=etree.XML(xml)
#     descr=elem.get('descr', '')
#     oldHashCode=elem.get('hash', '')
#     if descr != '':
#         details=ruleDetails(rule)
#         hashStr=detailsHash(details)
#         # For an update, we should be passed old hash code - if so, remove old entry...
#         if oldHashCode!= '':
#             removeDescription(oldHashCode)
#         updateDescription(hashStr, descr)
#     else:
#         if oldHashCode!= '':
#             removeDescription(oldHashCode)

def addRule(ufw, xml):
    """Parse the ``<rule .../>`` text *xml* and insert the resulting rule."""
    rule = fromXml(xml)
    insertRule(ufw, rule)
    # updateRuleDescription(inserted, xml)

def updateRule(ufw, xml):
    """Replace the rule at the position named in *xml* with the parsed rule.

    The existing rule is copied, deleted, and the new one inserted. If
    anything fails after the delete, the copy of the old rule is re-inserted.
    NOTE(review): the triggering exception is swallowed, so a failed update
    is not reported to the caller.
    """
    rule = fromXml(xml)
    deleted = False
    try:
        prev = deepcopy(ufw.backend.get_rule_by_number(rule.position))
        ufw.delete_rule(rule.position, True)
        deleted = True
        insertRule(ufw, rule)
        deleted = False
        # updateRuleDescription(inserted, xml)
    except Exception:
        # `prev` is always bound here when `deleted` is True.
        if deleted:
            insertRule(ufw, prev)

# def updateRuleDescr(ufw, xml):
#     rule=fromXml(xml)
#     details=ruleDetails(rule)
#     hashStr=detailsHash(details)
#     updateRuleDescription(rule, xml)

# Remove a rule. Index is either; just the index, or <index>:<hashcode>
def removeRule(ufw, index):
    """Delete the rule numbered *index* ('<index>' or '<index>:<hashcode>').

    A non-numeric index, or one outside 1..(total rule count), is reported
    through error() with ERROR_INVALID_INDEX.
    """
    parts = index.split(':')
    try:
        idx = int(parts[0]) if len(parts) == 2 else int(index)
        total = ufw.backend.get_rules_count(False) + ufw.backend.get_rules_count(True)
        if idx < 1 or idx > total:
            error("ERROR: Invalid index", ERROR_INVALID_INDEX)
            return
        # if 2==len(parts):
        #     removeDescription(parts[1])
        # else:
        #     rule=ufw.backend.get_rule_by_number(index)
        #     if rule:
        #         details=ruleDetails(rule)
        #         hashStr=detailsHash(details)
        #         removeDescription(hashStr)
        ufw.delete_rule(idx, True)
    #except ufw.common.UFWError as e:
        #error("ERROR: UFW error", e.value)
    except ValueError:
        error("ERROR: Invalid input type", ERROR_INVALID_INDEX)

def moveRule(ufw, indexes):
    """Move a rule; *indexes* has the form '<from>:<to>'.

    The source rule is duplicated, deleted, and re-inserted at the target
    position. Malformed input, identical indexes, or a source index with no
    rule are reported through error() with ERROR_INVALID_INDEX.
    """
    idx = indexes.split(':')
    if len(idx) != 2:
        error("ERROR: Invalid number of indexes", ERROR_INVALID_INDEX)
        return
    try:
        fromIndex = int(idx[0])
        toIndex = int(idx[1])
    except ValueError:
        # Same message removeRule() uses for a non-numeric index.
        error("ERROR: Invalid input type", ERROR_INVALID_INDEX)
        return
    if fromIndex == toIndex:
        error("ERROR: Source and destination cannot be the same", ERROR_INVALID_INDEX)
        return
    source = ufw.backend.get_rule_by_number(fromIndex)
    if not source:
        # Check before deleting anything, so a bad index cannot lose a rule.
        error("ERROR: Invalid index", ERROR_INVALID_INDEX)
        return
    rule = source.dup_rule()
    ufw.delete_rule(fromIndex, True)
    rule.position = toIndex
    insertRule(ufw, rule)

def reset(ufw):
    """Reload default settings, remove all rules and reset ufw.

    If the backend reports the firewall as enabled afterwards, it is
    switched off and on again.
    """
    loadDefaultSettings(ufw)
    clearRules(ufw)
    ufw.reset(True)
    if ufw.backend._is_enabled():
        ufw.set_enabled(False)
        ufw.set_enabled(True)

def clearRules(ufw):
    """Delete every rule by repeatedly removing rule number 1.

    Individual UFWError failures are ignored (best-effort).
    """
    # Here `ufw` is the frontend instance passed in, not the ufw package, so
    # `ufw.common.UFWError` cannot be resolved through it; import the class.
    from ufw.common import UFWError
    # removeDescriptions()
    count = ufw.backend.get_rules_count(False) + ufw.backend.get_rules_count(True)
    for _ in range(count):
        try:
            ufw.delete_rule(1, True)
        except UFWError:
            pass

def getModules(ufw, xmlStr):
    """Write ``<modules enabled="..." />`` from the backend's 'ipt_modules'."""
    xmlStr.write("<modules enabled=\"")
    if 'ipt_modules' in ufw.backend.defaults:
        modules = ufw.backend.defaults['ipt_modules']
        if modules != '':
            xmlStr.write(modules)
    xmlStr.write("\" />")

def setModules(ufw, xml):
    """Store the 'enabled' attribute of ``<modules .../>`` as IPT_MODULES.

    The value is lower-cased and wrapped in double quotes before being
    written to the backend's 'defaults' file entry.
    """
    elem = etree.XML(xml)
    if elem.tag != 'modules':
        error("ERROR: Invalid XML, expected 'modules' element", ERROR_INVALID_XML_NO_MODULES)
        return
    modules = '"' + elem.get('enabled', '').lower() + '"'
    ufw.backend.set_default(ufw.backend.files['defaults'], 'IPT_MODULES', modules)

# def getProfiles(ufw, xmlStr):
#     xmlStr.write("<profiles names=\"")
#     first=True
#     for profile in ufw.backend.profiles.keys():
#         if profile != '':
#             if first:
#                 first=False
#             else:
#                 xmlStr.write(" ")
#             xmlStr.write(profile)
#     xmlStr.write("\" />")

def error(str, rv):
    """Print the message *str* to stderr and exit the process with *rv*.

    Callers in this file are written on the assumption that this function
    does not return. The former Python 2 statement ``print >> sys.stderr``
    raised TypeError on Python 3 (which also aborted the script, but with a
    traceback instead of the message).
    NOTE(review): the exit had been commented out at some point; it is
    restored here because continuing after a reported error lets invalid
    input reach the firewall calls - confirm with the helper's caller.
    """
    print(str, file=sys.stderr)
    sys.exit(rv)

def main():
    """Parse the command line and run each requested operation in order.

    Query options append to one XML document that is printed to stdout at
    the end; modifying options act on the module-level frontend directly.
    """
    try:
        # opts, args = getopt.getopt(sys.argv[1:], "hse:df:la:u:U:r:m:tiI:x",
        #                  ["help", "status", "setEnabled=", "defaults", "setDefaults=", "list", "add=",
        #                   "update=", "updateDescr=", "remove=", "move=", "reset", "modules", "setModules=", "clearRules"])
        opts, args = getopt.getopt(sys.argv[1:], "hse:df:la:u:U:r:m:tiI:x",
                         ["help", "status", "setEnabled=", "defaults", "setDefaults=", "list", "add=",
                          "update=", "remove=", "move=", "reset", "modules", "setModules=", "clearRules"])
    except getopt.GetoptError as err:
        # print help information and exit:
        print(err, file=sys.stderr)  # will print something like "option -a not recognized"
        usage()
        sys.exit(1)
    # loadDescriptions()
    returnXml = False
    xmlOut = io.StringIO()
    xmlOut.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?><ufw>")
    for o, a in opts:
        if o in ("-h", "--help"):
            usage()
            sys.exit()
        elif o in ("-s", "--status"):
            getStatus(ufw, xmlOut)
            returnXml = True
        elif o in ("-e", "--setEnabled"):
            setEnabled(ufw, a)
        elif o in ("-d", "--defaults"):
            getDefaults(ufw, xmlOut)
            returnXml = True
        elif o in ("-f", "--setDefaults"):
            setDefaults(ufw, a)
        elif o in ("-l", "--list"):
            getRules(ufw, xmlOut)
            returnXml = True
        elif o in ("-a", "--add"):
            addRule(ufw, a)
        elif o in ("-u", "--update"):
            updateRule(ufw, a)
        # elif o in ("-U", "--updateDescr"):
        #     updateRuleDescr(ufw, a)
        elif o in ("-r", "--remove"):
            removeRule(ufw, a)
        elif o in ("-m", "--move"):
            moveRule(ufw, a)
        elif o in ("-t", "--reset"):
            reset(ufw)
        elif o in ("-i", "--modules"):
            getModules(ufw, xmlOut)
            returnXml = True
        elif o in ("-I", "--setModules"):
            setModules(ufw, a)
        elif o in ("-x", "--clearRules"):
            clearRules(ufw)
        else:
            # Reached for options getopt accepts but nothing handles (-U).
            usage()
    # saveDescriptions()
    if returnXml:
        xmlOut.write("</ufw>")
        print(xmlOut.getvalue())

def usage():
    """Print the command-line help to stdout."""
    prog = sys.argv[0]
    print("Python helper for UFW KCM")
    print("")
    print("(C) Craig Drummond, 2011")
    print("")
    print("Usage:")
    commands = (
        "--status",
        "--setEnabled <true/false>",
        "--defaults",
        "--setDefaults <xml>",
        "--list",
        "--add <xml>",
        "--update <xml>",
        # "--updateDescr <xml>",
        "--remove <index>",
        "--remove <index:hash>",
        "--move <from:to>",
        "--reset",
        "--modules",
        "--setModules <xml>",
        "--clearRules",
    )
    for command in commands:
        print(" " + prog + " " + command)

if __name__ == "__main__":
    main()