File indexing completed on 2024-06-16 04:47:18
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#***************************************************************************
#* SPDX-FileCopyrightText: 2022 S. MANKOWSKI stephane@mankowski.fr
#* SPDX-FileCopyrightText: 2022 G. DE BURE support@mankowski.fr
#* SPDX-License-Identifier: GPL-3.0-or-later
#***************************************************************************

"""
Skrooge AqBanking Bridge (SABB)
-------------------------------

Authors:
 * Bernhard Scheirle <bernhard@scheirle.de>

Changelog:
3.0.2 - 2022.07.28
    * Switch from deprecated distutils.version to packaging.version

3.0.1 - 2022.07.28
    * Use ultimateDebitor as payee if present.

3.0.0 - 2022.07.28
    * Requires AqBanking 6.5.0
      Only version 6+ is PSD2 (Payment Services Directive; Directive (EU) 2015/2366) compliant.
    * Limit transaction download to the last 6 weeks
    * Removed --prefer-valutadate command line option
    * Added some code for optical tans, but does not work yet

2.0.0 - 2019.05.09
    * Added auto repair for certain banks (Sprada, Netbank, Comdirect).
    * Added --disable-auto-repair command line option
    * Added --prefer-valutadate command line option
    * Removed --balance command line option

1.2.0 - 2019.04.28
    * Allow processing of accounts without an IBAN
      (e.g credit card accounts).
      In this case a fake IBAN is used:
      XX00<bank_number><account_number>

1.1.0 - 2018.05.21
    * Added command line parameter --terminal-emulator

1.0.0 - 2017.07.29
    * Initial release

"""

import argparse
import contextlib
import csv
import datetime
import io
import os
import re
import shutil
import subprocess
import sys
import tempfile
from packaging.version import parse as parse_version

__VERSION__ = "3.0.2"


class AqDialect(csv.Dialect):
    """CSV dialect used to parse the output produced by aqbanking-cli templates."""
    delimiter = ';'
    quotechar = '"'
    quoting = csv.QUOTE_ALL
    lineterminator = '\n'


class SkroogeDialect(csv.Dialect):
    """CSV dialect used for the data written by this script."""
    delimiter = ';'
    quotechar = '"'
    escapechar = '\\'
    quoting = csv.QUOTE_ALL
    lineterminator = '\n'


class AqBankingCli(object):
    """Helpers to build and validate invocations of the aqbanking-cli tool."""

    Tool = 'aqbanking-cli'
    # NOTE(review): the changelog of 3.0.0 states that AqBanking 6.5.0 is
    # required, while this check only enforces 6.0.2 -- confirm which is intended.
    MinVersion = "6.0.2"

    # Command handed to aqbanking-cli so it can call back into this script.
    OpticalTan = "{} opticaltan".format(os.path.abspath(__file__))

    @staticmethod
    def build_command(args):
        """Return the argument list to run aqbanking-cli with the given args.

        The global options (charset, optical tan handler) are placed before
        ``args``. The resulting command is also printed for diagnostics.
        """
        com = [AqBankingCli.Tool]
        com.append('--charset=utf-8')
        com.append('--opticaltan="{}"'.format(AqBankingCli.OpticalTan))
        com.extend(args)

        # Diagnostic output; kept on stdout as before.
        print("AqBankingCli.build_command: {}".format(com))

        return com

    @staticmethod
    def check_version():
        """Return True if the installed aqbanking-cli is at least MinVersion.

        Runs ``aqbanking-cli versions`` and looks for the "AqBanking-CLI:"
        line. Prints a hint and returns False when no sufficient version is
        found. Raises subprocess.CalledProcessError if the tool exits with a
        non-zero return code.
        """
        process_result = subprocess.run(
            AqBankingCli.build_command(['versions']),
            stdout=subprocess.PIPE
        )
        process_result.check_returncode()

        prefix = "AqBanking-CLI:"
        minimum = parse_version(AqBankingCli.MinVersion)
        valid_version = False
        for line in process_result.stdout.decode("utf-8").splitlines():
            line = line.strip()
            if line.startswith(prefix):
                found = line[len(prefix):].strip()
                if parse_version(found) >= minimum:
                    valid_version = True
                    break
        if not valid_version:
            print("Please install a newer version of aqbanking. "
                  "At least version {} is required.".format(AqBankingCli.MinVersion))
        return valid_version


class Account(object):
    """A bank account identified by bank number, account number and IBAN."""

    def __init__(self):
        self.bank_number = ""
        self.account_number = ""
        self.iban = ""
        # True when the IBAN was synthesized because the bank reported none.
        self.fake_iban = False

    @staticmethod
    def create(iban, bank, account):
        """Build an Account; an empty IBAN is replaced by a fake one.

        The fake IBAN has the form ``XX00<bank_number><account_number>``.
        """
        a = Account()
        a.iban = iban
        a.bank_number = bank
        a.account_number = account
        if a.iban == "":
            a.fake_iban = True
            a.iban = "XX00{}{}".format(a.bank_number, a.account_number)
        return a

    def update(self, other):
        """Merge ``other`` into this account if both describe the same account.

        Returns False when bank or account number differ. Otherwise returns
        True; a fake IBAN on this account is replaced by a real IBAN from
        ``other`` when available.
        """
        if self.bank_number != other.bank_number \
                or self.account_number != other.account_number:
            return False

        # Nothing to gain: we already have a real IBAN, or other has none.
        if not self.fake_iban or other.fake_iban:
            return True

        self.fake_iban = other.fake_iban
        self.iban = other.iban
        return True

    def toString(self):
        """Return a short human readable description of the account."""
        return 'Account({}, {}, {})'.format(self.bank_number,
                                            self.account_number,
                                            self.iban)

    def isValid(self):
        """Return True if bank number, account number and IBAN are all set."""
        return self.bank_number != "" \
            and self.account_number != "" \
            and self.iban != ""

    def nice_iban(self):
        """Return the IBAN upper-cased and split into groups of four characters."""
        iban = self.iban
        groups = (iban[i:i + 4] for i in range(0, len(iban), 4))
        return " ".join(groups).strip().upper()


class Accounts(object):
    """Lazily loaded list of the accounts known to aqbanking-cli."""

    def __init__(self):
        # None means "not queried yet"; an empty list is a valid result.
        self._accounts = None

    def _insert(self, account):
        """Add ``account`` unless it is invalid or merges into a known one."""
        if not account.isValid():
            return

        for a in self._accounts:
            if a.update(account):
                return
        self._accounts.append(account)

    def _initialize(self):
        """Query aqbanking-cli for the accounts and fill the internal list.

        Raises subprocess.CalledProcessError if aqbanking-cli fails.
        """
        self._accounts = []

        process_result = subprocess.run(
            AqBankingCli.build_command(['listaccs',
                                        '--template="$(iban)";"$(bankcode)";"$(accountNumber)"']),
            stdout=subprocess.PIPE
        )
        process_result.check_returncode()

        fields = ["iban", "bankcode", "accountNumber"]
        listing = process_result.stdout.decode("utf-8")
        # The first output line is skipped (presumably a header line
        # emitted by aqbanking-cli -- confirm).
        data_lines = listing.splitlines()[1:]
        reader = csv.DictReader(data_lines, fieldnames=fields, dialect=AqDialect)

        for record in reader:
            account = Account.create(record['iban'],
                                     record['bankcode'],
                                     record['accountNumber'])
            self._insert(account)

    def get_accounts(self):
        """Return the list of accounts, querying aqbanking-cli on first use."""
        # Compare against None so that an empty result is cached as well and
        # does not trigger another aqbanking-cli call on every access.
        if self._accounts is None:
            self._initialize()
        return self._accounts


@contextlib.contextmanager
def TemporaryContextFile():
    """Yield the path of an empty context file inside a temporary directory.

    The directory (and the file) is removed when the context is left.
    """
    with tempfile.TemporaryDirectory(prefix="SSAB.") as dir_path:
        context_file_path = os.path.join(dir_path, "context")
        # Create the (empty) file.
        open(context_file_path, 'a').close()
        yield context_file_path


class RepairMan(object):
    """Base class for repair steps applied to a converted transaction row."""

    def repair_row(self, row):
        """Return the (possibly modified) row; the base class changes nothing."""
        return row


class RepairManSpardaNetBank(RepairMan):
    """
    Sparda / Netbank only:
    If the payees name exceeds 27 characters the overflowing characters
    of the name gets stored at the beginning of the purpose field.

    This is the case, when one of the strings listed in Keywords is part
    of the purpose fields but does not start at the beginning.
    In this case, the part leading up to the keyword is to be treated as the
    tail of the payee.
    """

    Keywords = ['SEPA-BASISLASTSCHRIFT',
                'SEPA-ÜBERWEISUNG',
                'SEPA LOHN/GEHALT']

    def repair_row(self, row):
        """Move the payee tail and the keyword out of the comment field."""
        comment = row['comment']

        for key in self.Keywords:
            offset = comment.find(key)
            if offset >= 0:
                if offset > 0:
                    # Text before the keyword is the overflow of the payee name.
                    row['payee'] = row['payee'] + comment[:offset]
                keyEnd = offset + len(key)
                row['mode'] = comment[offset:keyEnd]
                row['comment'] = comment[keyEnd:]
                break
        return row


class RepairManComdirect(RepairMan):
    """Comdirect: a leading keyword in the comment is moved to the mode field."""

    Keywords = ['WERTPAPIERE',
                'LASTSCHRIFT / BELASTUNG',
                'ÜBERTRAG / ÜBERWEISUNG',
                'KONTOÜBERTRAG',
                'KUPON',
                'SUMME MONATSABRECHNUNG VISA',
                'KONTOABSCHLUSSABSCHLUSS ZINSEN']

    def repair_row(self, row):
        """Strip the first matching keyword from the comment and store it as mode."""
        comment = row['comment'].strip()

        for key in self.Keywords:
            if comment.startswith(key):
                row['comment'] = comment[len(key):]
                row['mode'] = key
                break
        return row


class RepairManStrip(RepairMan):
    """Removes leading and trailing whitespace from all output fields."""

    Keywords = ['date', 'mode', 'comment', 'payee', 'amount']

    def repair_row(self, row):
        """Strip every field listed in Keywords."""
        for key in self.Keywords:
            row[key] = row[key].strip()
        return row


class RepairManDC(RepairMan):
    """Replaces the literal text ``\\xdc`` in the comment by the letter Ü.

    Such sequences are produced when undecodable bytes are decoded with
    ``errors="backslashreplace"``.
    """

    Keywords = ['comment']

    def repair_row(self, row):
        """Replace the escaped byte sequence in every field listed in Keywords."""
        for key in self.Keywords:
            row[key] = row[key].replace("\\xdc", "Ü")
        return row


class SABB(object):
    """Implementation of the sabb commands (list accounts, bulk download)."""

    ReturnValue_NormalExit = 0
    ReturnValue_InvalidVersion = 1
    ReturnValue_NoCommandGiven = 10
    ReturnValue_UnknownMimeType = 20
    ReturnValue_UnsupportedMimeType = 21

    def __init__(self):
        self.accounts = Accounts()
        # Repair steps are applied in this order to every row.
        self.repair_mans = [RepairManDC(),
                            RepairManSpardaNetBank(),
                            RepairManComdirect(),
                            RepairManStrip()]

        self.output_folder = None
        self.terminal_emulator = None
        self.disable_auto_repair = None

    def get_csv_writer(self, fieldnames, generateHeader=True):
        """Return ``(buffer, writer)`` for in-memory CSV output.

        ``buffer`` is an io.StringIO receiving everything written through the
        csv.DictWriter ``writer``. The header row is written immediately when
        ``generateHeader`` is true.
        """
        output = io.StringIO("")
        writer = csv.DictWriter(output, fieldnames=fieldnames, dialect=SkroogeDialect)
        if generateHeader:
            writer.writeheader()
        return output, writer

    def get_accounts(self):
        """Print the known accounts as CSV (column ``id``) and return an exit code."""
        if not AqBankingCli.check_version():
            return self.ReturnValue_InvalidVersion

        fieldnames_output = ['id']
        output, writer = self.get_csv_writer(fieldnames_output)
        for account in self.accounts.get_accounts():
            writer.writerow({'id': account.nice_iban()})
        print(output.getvalue().strip())
        return self.ReturnValue_NormalExit

    def convert_transactions(self, aqbanking_output, generateHeader):
        """Convert aqbanking-cli transaction output into the output CSV format.

        ``aqbanking_output`` is the text produced by the ``listtrans`` template
        (date, value, remoteName, ultimateDebtor, purpose). The rows are
        sorted by date, optionally repaired, and returned as CSV text; a
        header row is included when ``generateHeader`` is true.
        """
        fieldnames_input = ['date', 'value', 'remoteName', 'ultimateDebtor', 'purpose']
        reader = csv.DictReader(aqbanking_output.splitlines(), fieldnames=fieldnames_input, dialect=AqDialect)
        fieldnames_output = ['date', 'mode', 'comment', 'payee', 'amount']
        output, writer = self.get_csv_writer(fieldnames_output, generateHeader)
        for record in sorted(reader, key=lambda row: row['date']):
            row = {}
            row['date'] = record['date']
            row['mode'] = ""
            row['comment'] = record['purpose']
            if record['ultimateDebtor'] != "":
                # Only the part before the first '/' is used as payee.
                row['payee'] = record['ultimateDebtor'].split('/')[0]
            else:
                row['payee'] = record['remoteName']
            row['amount'] = record['value']
            # row['unit'] = "" # record['value_currency']
            if not self.disable_auto_repair:
                for repair_man in self.repair_mans:
                    row = repair_man.repair_row(row)
            writer.writerow(row)
        return output.getvalue()

    def process_context_file(self, context_file_path):
        """Write one CSV file per account IBAN into the output folder.

        The transactions are read from the aqbanking context file at
        ``context_file_path``. The output folder is created if missing.
        """
        self.output_folder = os.path.abspath(self.output_folder)
        if not os.path.exists(self.output_folder):
            os.makedirs(self.output_folder)

        files = {}
        for account in self.accounts.get_accounts():
            process_result = subprocess.run(
                AqBankingCli.build_command([
                    'listtrans',
                    '--bank=' + str(account.bank_number),
                    '--account=' + str(account.account_number),
                    '--template="$(dateOrValutaDateAsString)";"$(valueAsString)";"$(remoteName)";"$(ultimateDebtor)";"$(purposeInOneLine)"',
                    '-c',
                    context_file_path]),
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL
            )
            # Undecodable bytes become backslash escapes (see RepairManDC).
            transactions = process_result.stdout.decode("utf-8", errors="backslashreplace")

            output_file_path = os.path.join(self.output_folder, account.nice_iban() + ".csv")
            if output_file_path in files:
                # Accounts sharing an IBAN end up in the same file; the
                # header is only written once.
                files[output_file_path] = files[output_file_path] + '\n' + self.convert_transactions(transactions, False)
            else:
                files[output_file_path] = self.convert_transactions(transactions, True)

        for path, content in files.items():
            with open(path, 'w') as f:
                f.write(content)

    def download(self):
        """Download the transactions of the last 6 weeks and write the CSV files.

        The interactive aqbanking-cli request is run through the configured
        terminal emulator. Returns an exit code; raises
        subprocess.CalledProcessError if the request command fails.
        """
        if not AqBankingCli.check_version():
            return self.ReturnValue_InvalidVersion

        today = datetime.date.today()
        weeksAgo6 = today + datetime.timedelta(weeks=-6)
        with TemporaryContextFile() as context_file_path:
            args = ['request',
                    '--ignoreUnsupported',
                    '--transactions',
                    # '--number=1', # Was required as workaround for bug in 6.03
                    '--fromdate={}'.format(weeksAgo6.strftime("%Y%m%d")),
                    '--todate={}'.format(today.strftime("%Y%m%d")),
                    '-c',
                    context_file_path
                    ]

            # An empty terminal emulator string yields no prefix at all.
            command = self.terminal_emulator.split()
            command.extend(AqBankingCli.build_command(args))
            subprocess.run(command, check=True)

            # Must happen while the temporary context file still exists.
            self.process_context_file(context_file_path)

        return self.ReturnValue_NormalExit


def main():
    """Parse the command line, run the selected command and return the exit code."""
    parser = argparse.ArgumentParser(prog='sabb', description='Skrooge AqBanking Bridge (SABB)')

    # Global arguments
    parser.add_argument('--version', '-v', action='version', version='%(prog)s ' + __VERSION__, help='Shows version information.')
    subparsers = parser.add_subparsers(title='Commands', dest='command')

    # Command: listaccounts
    subparsers.add_parser('listaccounts', help='Returns a list of accounts that can be queried with AqBanking.')

    # Command: bulkdownload
    parser_download = subparsers.add_parser('bulkdownload', help='Downloads all transactions of the last 6 weeks into the given output folder')
    parser_download.add_argument('--output_folder', required=True, help='The folder to store the csv files.')
    parser_download.add_argument(
        '--terminal-emulator',
        required=False,
        default="x-terminal-emulator -e",
        help='The terminal emulator command string that gets used to run the aqbanking user-interactive session. '
             'Use an empty value »""« to not start a new terminal, but reuse the terminal running this command. '
             'Example: "xterm -e". '
             '(Default: "x-terminal-emulator -e")'
    )
    parser_download.add_argument(
        '--disable-auto-repair',
        required=False,
        action='store_true',
        help='Disables bank specific repair steps.'
    )

    # Command: opticaltan
    parser_opticaltan = subparsers.add_parser(
        'opticaltan',
        help='Internal command. Used from {} to display optical tans.'.format(AqBankingCli.Tool))
    parser_opticaltan.add_argument('mime', help="Mime type of the payload.")
    parser_opticaltan.add_argument('payload', help="Mime type dependent payload.")

    args = parser.parse_args()

    sabb = SABB()
    if args.command == "listaccounts":
        return sabb.get_accounts()
    elif args.command == "bulkdownload":
        sabb.output_folder = args.output_folder
        sabb.terminal_emulator = args.terminal_emulator
        sabb.disable_auto_repair = args.disable_auto_repair
        return sabb.download()
    elif args.command == "opticaltan":
        if args.mime == "image/png":
            # Imported here because Pillow is only needed to display optical
            # tans; the other commands work without it being installed.
            from PIL import Image

            image = Image.open(args.payload)
            # Show the image at twice its size.
            image = image.resize(tuple(2 * s for s in image.size))
            image.show()
            return SABB.ReturnValue_NormalExit
        elif args.mime == "text/x-flickercode":
            print("Flicker code is not supported by SABB.")
            print("Configure your AqBanking account to use the manual (non-optical) flicker code.")
            return SABB.ReturnValue_UnsupportedMimeType
        else:
            print("Unsupported MIME type: {}".format(args.mime))
            return SABB.ReturnValue_UnknownMimeType

    parser.print_help()
    return SABB.ReturnValue_NoCommandGiven


if __name__ == "__main__":
    sys.exit(main())