Warning, /plasma/plasma-firewall/kcm/backends/ufw/helper/kcm_ufw_helper.py.cmake is written in an unsupported language. File is not indexed.
0001 #!@Python3_EXECUTABLE@
0002
0003 # SPDX-License-Identifier: GPL-3.0-or-later
0004 # SPDX-FileCopyrightText: 2010 Darwin M. Bautista <djclue917@gmail.com>
0005 # SPDX-FileCopyrightText: 2011 Craig Drummond <craig.p.drummond@gmail.com>
0006
0007 # UFW KControl Module
0008
0009 import os
0010 import sys
0011
0012 import locale
0013 import gettext
0014
0015 import getopt
0016 import shutil
0017 import hashlib
0018 import io
0019
0020 from xml.etree import ElementTree as etree
0021 from copy import deepcopy
0022
0023 import ufw.common
0024 import ufw.frontend
0025 from ufw.util import valid_address
0026 from ufw.common import UFWRule
0027
0028 ANY_ADDR = '0.0.0.0/0'
0029 ANY_PORT = 'any'
0030 ANY_PROTOCOL = 'any'
0031 OLD_DESCR_FILE = "/etc/ufw/descriptions"
0032 DESCR_FILE = "/etc/kcm_ufw/descriptions"
0033 DEFAULTS_FILE = "@DATA_INSTALL_DIR@/kcm_ufw/defaults"
0034
0035 ERROR_FAILED_TO_SET_STATUS = -1
0036 ERROR_INVALID_INDEX = -2
0037 ERROR_INVALID_XML_NO_RULE = -3
0038 ERROR_INVALID_XML_NO_ACTION_XML = -4
0039 ERROR_INVALID_XML_NO_DEFAULTS = -5
0040 ERROR_INVALID_XML_NO_MODULES = -6
0041
0042
0043 class UFWFrontend(ufw.frontend.UFWFrontend):
0044
0045 def __init__(self, dryrun):
0046 ufw.frontend.UFWFrontend.__init__(self, dryrun)
0047 # Compatibility for ufw 0.31
0048 # This is a better way of handling method renames instead of putting
0049 # try/except blocks all over the whole application code
0050 # Ref: http://code.google.com/p/ufw-frontends/issues/detail?id=20
0051 try:
0052 self.backend.get_default_policy
0053 except AttributeError:
0054 self.backend.get_default_policy = self.backend._get_default_policy
0055 try:
0056 self.backend._is_enabled
0057 except AttributeError:
0058 self.backend._is_enabled = self.backend.is_enabled
0059
0060
0061 def localizeUfw():
0062 # Ref: http://code.google.com/p/ufw-frontends/issues/detail?id=19
0063 locale.setlocale(locale.LC_ALL, 'C')
0064 gettext.install(ufw.common.programName)
0065
0066 #define looad descriptions file
0067 # descrsChanged=False
0068 # def moveOldDescrFile():
0069 # try:
0070 # if os.path.exists(OLD_DESCR_FILE) and os.path.isfile(OLD_DESCR_FILE):
0071 # shutil.move(OLD_DESCR_FILE, DESCR_FILE)
0072 # except Exception as e:
0073 # return # Old file does not exist, or is not a file!...
0074 #
0075 # def loadDescriptions():
0076 # moveOldDescrFile();
0077 # try:
0078 # global descrs
0079 # descrs={"a":"b"}
0080 # descrData=open(DESCR_FILE).read()
0081 # descrs=eval(descrData)
0082 # except Exception as e:
0083 # return
0084 #
0085 # def removeDescriptions():
0086 # try:
0087 # os.remove(DESCR_FILE)
0088 # descrsChanged=False
0089 # except Exception as e:
0090 # return
0091 #
0092 # def saveDescriptions():
0093 # global descrsChanged
0094 # if descrsChanged:
0095 # try:
0096 # if ("a" in descrs):
0097 # del descrs["a"]
0098 # if len(descrs)>0:
0099 # descrFile = open(DESCR_FILE, 'w')
0100 # descrFile.write(repr(descrs))
0101 # descrsChanged=False
0102 # else:
0103 # removeDescriptions()
0104 # except Exception as e:
0105 # return
0106 #
0107 # def getDescription(hashStr):
0108 # if (hashStr in descrs):
0109 # return descrs[hashStr]
0110 # return ''
0111 #
0112 # def removeDescription(hashStr):
0113 # if (hashStr in descrs):
0114 # del descrs[hashStr]
0115 # global descrsChanged
0116 # descrsChanged=True
0117 #
0118 # def updateDescription(hashStr, descrStr):
0119 # if ((hashStr in descrs)==False) or (descrs[hashStr]!=descrStr):
0120 # removeDescription(hashStr)
0121 # descrs[hashStr]=descrStr
0122 # global descrsChanged
0123 # descrsChanged=True
0124
0125 def loadDefaultSettings(ufw):
0126 try:
0127 defaultsFile=open(DEFAULTS_FILE)
0128 while 1:
0129 line = defaultsFile.readline()
0130 if line == '':
0131 break
0132 parts=line.split('=')
0133 if len(parts) == 2:
0134 value=parts[1].replace('\n', '')
0135 if parts[0] == 'incoming':
0136 ufw.set_default_policy(value, 'incoming')
0137 elif parts[0] == 'outgoing':
0138 ufw.set_default_policy(value, 'outgoing')
0139 elif parts[0] == 'loglevel':
0140 ufw.set_loglevel(value)
0141 elif parts[0] == 'ipv6':
0142 ufw.backend.set_default(ufw.backend.files['defaults'], 'IPV6', value.lower())
0143 elif parts[0] == 'modules':
0144 ufw.backend.set_default(ufw.backend.files['defaults'], 'IPT_MODULES', '"' + value + '"')
0145 except Exception as e:
0146 return
0147
0148 # Localise UFW, and init the 'frontend'
0149 localizeUfw()
0150 ufw=UFWFrontend(False)
0151
0152 def getProtocol(rule):
0153 """Determine protocol of rule.
0154 Taken from ufw.parser.UFWCommandRule.parse
0155 """
0156 # Determine src type
0157 if rule.src == ANY_ADDR:
0158 from_type = 'any'
0159 else:
0160 from_type = ('v6' if valid_address(rule.src, '6') else 'v4')
0161 # Determine dst type
0162 if rule.dst == ANY_ADDR:
0163 to_type = 'any'
0164 else:
0165 to_type = ('v6' if valid_address(rule.dst, '6') else 'v4')
0166 # Figure out the type of rule (IPv4, IPv6, or both)
0167 if from_type == ANY_PROTOCOL and to_type == ANY_PROTOCOL:
0168 protocol = 'both'
0169 elif from_type != ANY_PROTOCOL and to_type != ANY_PROTOCOL and from_type != to_type:
0170 err_msg = _("Mixed IP versions for 'from' and 'to'")
0171 raise ufw.common.UFWError(err_msg)
0172 elif from_type != ANY_PROTOCOL:
0173 protocol = from_type
0174 elif to_type != ANY_PROTOCOL:
0175 protocol = to_type
0176 return protocol
0177
0178 def insertRule(ufw, rule, protocol=None):
0179 if protocol is None:
0180 protocol = getProtocol(rule)
0181 rule = rule.dup_rule()
0182 # Fix any inconsistency
0183 if rule.sapp or rule.dapp:
0184 rule.set_protocol(ANY_PROTOCOL)
0185 if rule.sapp:
0186 rule.sport = rule.sapp
0187 if rule.dapp:
0188 rule.dport = rule.dapp
0189 # If trying to insert beyond the end, just set position to 0
0190 if rule.position and not ufw.backend.get_rule_by_number(rule.position):
0191 rule.set_position(0)
0192 ufw.set_rule(rule, protocol)
0193 # Reset the positions of the recently inserted rule(s)
0194 if rule.position:
0195 s = rule.position - 1
0196 e = rule.position + 1
0197 for r in ufw.backend.get_rules()[s:e]:
0198 r.set_position(0)
0199 return rule
0200
0201 def getRulesList(ufw):
0202 app_rules = []
0203 for i, r in enumerate(ufw.backend.get_rules()):
0204 if r.dapp or r.sapp:
0205 t = r.get_app_tuple()
0206 if t in app_rules:
0207 continue
0208 else:
0209 app_rules.append(t)
0210 yield (i, r)
0211
0212 def encodeText(str):
0213 str=str.replace("&", "&")
0214 str=str.replace("<", "<")
0215 str=str.replace("\"", """)
0216 str=str.replace(">", ">")
0217 return str
0218
0219 def ruleDetails(rule):
0220 xmlStr = io.StringIO()
0221 xmlStr.write("action=\"")
0222 xmlStr.write(rule.action.lower())
0223 xmlStr.write("\" direction=\"")
0224 xmlStr.write(rule.direction.lower())
0225 xmlStr.write("\" dapp=\"")
0226 xmlStr.write(rule.dapp)
0227 xmlStr.write("\" sapp=\"")
0228 xmlStr.write(rule.sapp)
0229 xmlStr.write("\" dport=\"")
0230 xmlStr.write(rule.dport)
0231 xmlStr.write("\" sport=\"")
0232 xmlStr.write(rule.sport)
0233 xmlStr.write("\" protocol=\"")
0234 xmlStr.write(rule.protocol.lower())
0235 xmlStr.write("\" dst=\"")
0236 xmlStr.write(rule.dst)
0237 xmlStr.write("\" src=\"")
0238 xmlStr.write(rule.src)
0239 xmlStr.write("\" interface_in=\"")
0240 xmlStr.write(rule.interface_in)
0241 xmlStr.write("\" interface_out=\"")
0242 xmlStr.write(rule.interface_out)
0243 xmlStr.write("\" v6=\"")
0244 if rule.v6:
0245 xmlStr.write('True')
0246 else:
0247 xmlStr.write('False')
0248 return xmlStr.getvalue()
0249
0250 # def detailsHash(details):
0251 # ruleHash = hashlib.md5()
0252 # ruleHash.update(details.encode('utf-8'))
0253 # return ruleHash.hexdigest()
0254
0255 # Convert a rule to an XML string...
0256 def toXml(rule, xmlStr):
0257 xmlStr.write("<rule position=\"")
0258 xmlStr.write(str(rule.position))
0259 xmlStr.write("\" ")
0260 details=ruleDetails(rule)
0261 xmlStr.write(details)
0262 # hashStr=detailsHash(details)
0263 # descr=getDescription(hashStr)
0264 # if descr != '':
0265 # xmlStr.write("\" descr=\"")
0266 # xmlStr.write(encodeText(descr))
0267 # xmlStr.write("\" hash=\"")
0268 # xmlStr.write(hashStr)
0269 xmlStr.write("\" logtype=\"")
0270 xmlStr.write(rule.logtype)
0271 xmlStr.write("\" />")
0272
0273 # Create rule from XML...
0274 def fromXml(str):
0275 elem = etree.XML(str)
0276 if elem.tag != 'rule':
0277 error("ERROR: Invalid XML, expected \'rule\' element", ERROR_INVALID_XML_NO_RULE)
0278 action=elem.get('action', '').lower()
0279 if action == '':
0280 error("ERROR: Invalid XML, no action specified", ERROR_INVALID_XML_NO_ACTION_XML)
0281 protocol=elem.get('protocol', ANY_PROTOCOL).lower()
0282 rule = UFWRule(action, protocol)
0283 rule.position=int(elem.get('position', 0))
0284 rule.direction=elem.get('direction', 'in').lower()
0285 rule.dapp=elem.get('dapp', '')
0286 rule.sapp=elem.get('sapp', '')
0287 rule.dport=elem.get('dport', ANY_PORT)
0288 rule.sport=elem.get('sport', ANY_PORT)
0289 rule.dst=elem.get('dst', ANY_ADDR)
0290 rule.src=elem.get('src', ANY_ADDR)
0291 rule.interface_in=elem.get('interface_in', '')
0292 rule.interface_out=elem.get('interface_out', '')
0293 rule.logtype=elem.get('logtype', '').lower()
0294 rule.v6=elem.get('v6', 'False').lower() == "true"
0295 return rule
0296
0297 def getStatus(ufw, xmlStr):
0298 xmlStr.write("<status enabled=\"")
0299 if ufw.backend._is_enabled():
0300 xmlStr.write('true')
0301 else:
0302 xmlStr.write('false')
0303 xmlStr.write("\" />")
0304
0305 def setEnabled(ufw, status):
0306 if status.lower() == "false":
0307 stat=False
0308 else:
0309 stat=True
0310 if stat != ufw.backend._is_enabled():
0311 ufw.set_enabled(stat)
0312 if ufw.backend._is_enabled() != stat:
0313 error("ERROR: Failed to set UFW status", ERROR_FAILED_TO_SET_STATUS)
0314
0315 def getDefaults(ufw, xmlStr):
0316 conf = ufw.backend.defaults
0317 xmlStr.write("<defaults incoming=\"")
0318 xmlStr.write(ufw.backend.get_default_policy('input'))
0319 xmlStr.write("\" outgoing=\"")
0320 xmlStr.write(ufw.backend.get_default_policy('output'))
0321 xmlStr.write("\" loglevel=\"")
0322 xmlStr.write(conf['loglevel'])
0323 xmlStr.write("\" ipv6=\"")
0324 xmlStr.write(conf['ipv6'])
0325 xmlStr.write("\" />")
0326
0327 def setDefaults(ufw, xml):
0328 elem = etree.XML(xml)
0329 if elem.tag != 'defaults':
0330 error("ERROR: Invalid XML, expected \'defaults\' element", ERROR_INVALID_XML_NO_DEFAULTS)
0331 enabled=ufw.backend._is_enabled()
0332 if enabled:
0333 ufw.set_enabled(False)
0334 ipv6=elem.get('ipv6', '').lower()
0335 if ipv6 != '':
0336 del ufw
0337 ufw=UFWFrontend(False)
0338 ufw.backend.set_default(ufw.backend.files['defaults'], 'IPV6', ipv6)
0339 del ufw
0340 ufw=UFWFrontend(False)
0341 policy=elem.get('incoming', '').lower()
0342 if policy != '':
0343 ufw.set_default_policy(policy, 'incoming')
0344 policy=elem.get('outgoing', '').lower()
0345 if policy != '':
0346 ufw.set_default_policy(policy, 'outgoing')
0347 loglevel=elem.get('loglevel', '').lower()
0348 if loglevel != '':
0349 ufw.set_loglevel(loglevel)
0350 if enabled:
0351 ufw.set_enabled(True)
0352
0353 def getRules(ufw, xmlStr):
0354 xmlStr.write("<rules>")
0355 for i, data in enumerate(getRulesList(ufw)):
0356 idx, rule = data
0357 toXml(rule.dup_rule(), xmlStr)
0358 xmlStr.write("</rules>")
0359
0360 # def updateRuleDescription(rule, xml):
0361 # elem=etree.XML(xml)
0362 # descr=elem.get('descr', '')
0363 # oldHashCode=elem.get('hash', '')
0364 # if descr != '':
0365 # details=ruleDetails(rule)
0366 # hashStr=detailsHash(details)
0367 # # For an update, we should be passed old hash code - if so, remove old entry...
0368 # if oldHashCode!= '':
0369 # removeDescription(oldHashCode)
0370 # updateDescription(hashStr, descr)
0371 # else:
0372 # if oldHashCode!= '':
0373 # removeDescription(oldHashCode)
0374
0375 def addRule(ufw, xml):
0376 rule=fromXml(xml)
0377 inserted=insertRule(ufw, rule)
0378 # updateRuleDescription(inserted, xml)
0379
0380 def updateRule(ufw, xml):
0381 rule=fromXml(xml)
0382 deleted=False
0383 try:
0384 prev=deepcopy(ufw.backend.get_rule_by_number(rule.position))
0385 ufw.delete_rule(rule.position, True)
0386 deleted=True
0387 inserted=insertRule(ufw, rule)
0388 deleted=False
0389 # updateRuleDescription(inserted, xml)
0390 except Exception as e:
0391 if deleted:
0392 insertRule(ufw, prev)
0393
0394 # def updateRuleDescr(ufw, xml):
0395 # rule=fromXml(xml)
0396 # details=ruleDetails(rule)
0397 # hashStr=detailsHash(details)
0398 # updateRuleDescription(rule, xml)
0399
0400 # Remove a rule. Index is either; just the index, or <index>:<hashcode>
0401 def removeRule(ufw, index):
0402 parts=index.split(':')
0403 try:
0404 if 2==len(parts):
0405 idx=int(parts[0])
0406 else:
0407 idx=int(index)
0408 if idx<1 or idx>(ufw.backend.get_rules_count(False)+ufw.backend.get_rules_count(True)):
0409 error("ERROR: Invalid index", ERROR_INVALID_INDEX)
0410 # if 2==len(parts):
0411 # removeDescription(parts[1])
0412 # else:
0413 # rule=ufw.backend.get_rule_by_number(index)
0414 # if rule:
0415 # details=ruleDetails(rule)
0416 # hashStr=detailsHash(details)
0417 # removeDescription(hashStr)
0418 ufw.delete_rule(idx, True)
0419 #except ufw.common.UFWError as e:
0420 #error("ERROR: UFW error", e.value)
0421 except ValueError:
0422 error("ERROR: Invalid input type", ERROR_INVALID_INDEX)
0423
0424 def moveRule(ufw, indexes):
0425 idx=indexes.split(':')
0426 if 2!= len(idx):
0427 error("ERROR: Invalid number of indexes", ERROR_INVALID_INDEX)
0428 fromIndex=int(idx[0])
0429 toIndex=int(idx[1])
0430 if fromIndex == toIndex:
0431 error("ERROR: Source and destination cannot be the same", ERROR_INVALID_INDEX)
0432 rule=ufw.backend.get_rule_by_number(fromIndex).dup_rule()
0433 ufw.delete_rule(fromIndex, True)
0434 rule.position=toIndex
0435 insertRule(ufw, rule)
0436
0437 def reset(ufw):
0438 loadDefaultSettings(ufw)
0439 clearRules(ufw)
0440 ufw.reset(True)
0441 if ufw.backend._is_enabled():
0442 ufw.set_enabled(False)
0443 ufw.set_enabled(True)
0444
0445 def clearRules(ufw):
0446 # removeDescriptions()
0447 count=ufw.backend.get_rules_count(False)+ufw.backend.get_rules_count(True)
0448 for num in range(0, count):
0449 try:
0450 ufw.delete_rule(1, True)
0451 except ufw.common.UFWError as e:
0452 pass
0453
0454 def getModules(ufw, xmlStr):
0455 xmlStr.write("<modules enabled=\"")
0456 if 'ipt_modules' in ufw.backend.defaults:
0457 modules=ufw.backend.defaults['ipt_modules']
0458 if modules != '':
0459 xmlStr.write(modules)
0460 xmlStr.write("\" />")
0461
0462 def setModules(ufw, xml):
0463 elem = etree.XML(xml)
0464 if elem.tag != 'modules':
0465 error("ERROR: Invalid XML, expected \'modules' element", ERROR_INVALID_XML_NO_MODULES)
0466 modules=elem.get('enabled', '').lower()
0467 modules = '"' + modules + '"'
0468 ufw.backend.set_default(ufw.backend.files['defaults'], 'IPT_MODULES', modules)
0469
0470 # def getProfiles(ufw, xmlStr):
0471 # xmlStr.write("<profiles names=\"")
0472 # first=True
0473 # for profile in ufw.backend.profiles.keys():
0474 # if profile != '':
0475 # if first:
0476 # first=False
0477 # else:
0478 # xmlStr.write(" ")
0479 # xmlStr.write(profile)
0480 # xmlStr.write("\" />")
0481
0482 def error(str, rv):
0483 print >> sys.stderr, str
0484 #sys.exit(rv)
0485
0486 def main():
0487 try:
0488 # opts, args = getopt.getopt(sys.argv[1:], "hse:df:la:u:U:r:m:tiI:x",
0489 # ["help", "status", "setEnabled=", "defaults", "setDefaults=", "list", "add=",
0490 # "update=", "updateDescr=", "remove=", "move=", "reset", "modules", "setModules=", "clearRules"])
0491 opts, args = getopt.getopt(sys.argv[1:], "hse:df:la:u:U:r:m:tiI:x",
0492 ["help", "status", "setEnabled=", "defaults", "setDefaults=", "list", "add=",
0493 "update=", "remove=", "move=", "reset", "modules", "setModules=", "clearRules"])
0494 except getopt.GetoptError as err:
0495 # print help information and exit:
0496 print >> sys.stderr, str(err) # will print something like "option -a not recognized"
0497 usage()
0498 sys.exit(1)
0499 # loadDescriptions()
0500 returnXml = False
0501 xmlOut = io.StringIO()
0502 xmlOut.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?><ufw>")
0503 for o, a in opts:
0504 if o in ("-h", "--help"):
0505 usage()
0506 sys.exit()
0507 elif o in ("-s", "--status"):
0508 getStatus(ufw, xmlOut)
0509 returnXml=True
0510 elif o in ("-e", "--setEnabled"):
0511 setEnabled(ufw, a)
0512 elif o in ("-d", "--defaults"):
0513 getDefaults(ufw, xmlOut)
0514 returnXml=True
0515 elif o in ("-f", "--setDefaults"):
0516 setDefaults(ufw, a)
0517 elif o in ("-l", "--list"):
0518 getRules(ufw, xmlOut)
0519 returnXml=True
0520 elif o in ("-a", "--add"):
0521 addRule(ufw, a)
0522 elif o in ("-u", "--update"):
0523 updateRule(ufw, a)
0524 # elif o in ("-U", "--updateDescr"):
0525 # updateRuleDescr(ufw, a)
0526 elif o in ("-r", "--remove"):
0527 removeRule(ufw, a)
0528 elif o in ("-m", "--move"):
0529 moveRule(ufw, a)
0530 elif o in ("-t", "--reset"):
0531 reset(ufw)
0532 elif o in ("-i", "--modules"):
0533 getModules(ufw, xmlOut)
0534 returnXml=True
0535 elif o in ("-I", "--setModules"):
0536 setModules(ufw, a)
0537 elif o in ("-x", "--clearRules"):
0538 clearRules(ufw)
0539 else:
0540 usage()
0541 # saveDescriptions()
0542 if returnXml:
0543 xmlOut.write("</ufw>")
0544 print (xmlOut.getvalue())
0545
0546 def usage():
0547 print ("Python helper for UFW KCM")
0548 print ("")
0549 print ("(C) Craig Drummond, 2011")
0550 print ("")
0551 print ("Usage:")
0552 print (" "+sys.argv[0]+" --status")
0553 print (" "+sys.argv[0]+" --setEnabled <true/false>")
0554 print (" "+sys.argv[0]+" --defaults")
0555 print (" "+sys.argv[0]+" --setDefaults <xml>")
0556 print (" "+sys.argv[0]+" --list")
0557 print (" "+sys.argv[0]+" --add <xml>")
0558 print (" "+sys.argv[0]+" --update <xml>")
0559 # print (" "+sys.argv[0]+" --updateDescr <xml>")
0560 print (" "+sys.argv[0]+" --remove <index>")
0561 print (" "+sys.argv[0]+" --remove <index:hash>")
0562 print (" "+sys.argv[0]+" --move <from:to>")
0563 print (" "+sys.argv[0]+" --reset")
0564 print (" "+sys.argv[0]+" --modules")
0565 print (" "+sys.argv[0]+" --setModules <xml>")
0566 print (" "+sys.argv[0]+" --clearRules")
0567
0568 if __name__ == "__main__":
0569 main()