File indexing completed on 2024-06-16 04:47:18

0001 #!/usr/bin/env python3
0002 # -*- coding: utf-8 -*-
0003 #***************************************************************************
0004 #* SPDX-FileCopyrightText: 2022 S. MANKOWSKI stephane@mankowski.fr
0005 #* SPDX-FileCopyrightText: 2022 G. DE BURE support@mankowski.fr
0006 #* SPDX-License-Identifier: GPL-3.0-or-later
0007 #***************************************************************************
0008 
0009 """
0010 Skrooge AqBanking Bridge (SABB)
0011 -------------------------------
0012 
0013 Authors:
0014  * Bernhard Scheirle <bernhard@scheirle.de>
0015 
0016 Changelog:
0017 3.0.2 - 2022.07.28
0018     * Switch from deprecated distutils.version to packaging.version
0019 
0020 3.0.1 - 2022.07.28
0021     * Use ultimateDebitor as payee if present.
0022 
0023 3.0.0 - 2022.07.28
0024     * Requires AqBanking 6.5.0
0025       Only version 6+ is PSD2 (Payment Services Directive; Directive (EU) 2015/2366) compliant.
0026     * Limit transaction download to the last 6 weeks
0027     * Removed --prefer-valutadate command line option
0028     * Added some code for optical tans, but does not work yet
0029 
0030 2.0.0 - 2019.05.09
0031     * Added auto repair for certain banks (Sparda, Netbank, Comdirect).
0032     * Added --disable-auto-repair command line option
0033     * Added --prefer-valutadate command line option
0034     * Removed --balance command line option
0035 
0036 1.2.0 - 2019.04.28
0037     * Allow processing of accounts without an IBAN
0038       (e.g credit card accounts).
0039       In this case a fake IBAN is used:
0040       XX00<bank_number><account_number>
0041 
0042 1.1.0 - 2018.05.21
0043     * Added command line parameter --terminal-emulator
0044 
0045 1.0.0 - 2017.07.29
0046     * Initial release
0047 
0048 """
0049 
0050 import argparse
0051 import contextlib
0052 import csv
0053 import datetime
0054 import io
0055 import os
0056 import re
0057 import shutil
0058 import subprocess
0059 import sys
0060 import tempfile
0061 from packaging.version import parse as parse_version
0062 from PIL import Image
0063 
0064 __VERSION__ = "3.0.2"
0065 
0066 
class AqDialect(csv.Dialect):
    """CSV dialect used when reading the templated output of aqbanking-cli.

    Records are semicolon separated, every field is wrapped in double
    quotes and each record is terminated by a single newline.
    """
    lineterminator = '\n'
    quoting = csv.QUOTE_ALL
    quotechar = '"'
    delimiter = ';'
0072 
0073 
class SkroogeDialect(csv.Dialect):
    """CSV dialect used for the files and listings handed over to Skrooge.

    Same layout as the aqbanking dialect (semicolon separated, all fields
    double quoted, newline terminated) plus a backslash escape character.
    """
    lineterminator = '\n'
    quoting = csv.QUOTE_ALL
    escapechar = '\\'
    quotechar = '"'
    delimiter = ';'
0080 
0081 
class AqBankingCli(object):
    """Helpers for invoking the ``aqbanking-cli`` command line tool."""

    # Name of the executable that gets started.
    Tool = 'aqbanking-cli'
    # Oldest aqbanking-cli version accepted by check_version().
    MinVersion = "6.0.2"

    # Command passed to aqbanking-cli via --opticaltan: this very script,
    # called with its "opticaltan" sub-command.
    OpticalTan = "{} opticaltan".format(os.path.abspath(__file__))

    @staticmethod
    def build_command(args):
        """Return the full argument list for an aqbanking-cli invocation.

        The list starts with the tool name and the global options
        (charset and optical TAN helper), followed by ``args`` unchanged.

        :param args: iterable of sub-command arguments to append.
        :return: a new list suitable for ``subprocess.run``.
        """
        com = [
            AqBankingCli.Tool,
            '--charset=utf-8',
            '--opticaltan="{}"'.format(AqBankingCli.OpticalTan),
        ]
        com.extend(args)

        # Diagnostic output goes to stderr: stdout carries the CSV that the
        # "listaccounts" command prints and must not be polluted.
        print("AqBankingCli.build_command: {}".format(com), file=sys.stderr)

        return com

    @staticmethod
    def check_version():
        """Check that the installed aqbanking-cli is at least ``MinVersion``.

        Runs ``aqbanking-cli versions`` and inspects every output line that
        starts with ``AqBanking-CLI:``. Prints a hint when no such line
        reports a sufficient version.

        :return: True if a sufficient version was reported, else False.
        :raises subprocess.CalledProcessError: if the tool exits non-zero.
        """
        prefix = "AqBanking-CLI:"
        process_result = subprocess.run(
            AqBankingCli.build_command(['versions']),
            stdout=subprocess.PIPE,
            check=True
        )
        minimum = parse_version(AqBankingCli.MinVersion)

        valid_version = False
        for line in process_result.stdout.decode("utf-8").splitlines():
            line = line.strip()
            if not line.startswith(prefix):
                continue
            reported = line[len(prefix):].strip()
            if parse_version(reported) >= minimum:
                valid_version = True
                break

        if not valid_version:
            print("Please install a newer version of aqbanking. "
                  "At least version {} is required.".format(AqBankingCli.MinVersion))
        return valid_version
0120 
0121 
class Account(object):
    """A bank account described by bank number, account number and IBAN.

    Accounts without an IBAN get a placeholder of the form
    ``XX00<bank_number><account_number>`` and are flagged via ``fake_iban``.
    """

    def __init__(self):
        self.bank_number = ""
        self.account_number = ""
        self.iban = ""
        self.fake_iban = False

    @staticmethod
    def create(iban, bank, account):
        """Build an account; an empty ``iban`` is replaced by a fake one."""
        created = Account()
        created.bank_number = bank
        created.account_number = account
        created.iban = iban
        if created.iban == "":
            created.iban = "XX00{}{}".format(created.bank_number,
                                             created.account_number)
            created.fake_iban = True
        return created

    def update(self, other):
        """Merge ``other`` into this account if both describe the same one.

        Returns False when bank or account number differ. Otherwise returns
        True, and takes over the IBAN of ``other`` when this account only
        has a fake IBAN while ``other`` has a real one.
        """
        same_account = (self.bank_number == other.bank_number
                        and self.account_number == other.account_number)
        if not same_account:
            return False

        if self.fake_iban and not other.fake_iban:
            self.fake_iban = other.fake_iban
            self.iban = other.iban
        return True

    def toString(self):
        """Return a short textual description of the account."""
        template = 'Account({}, {}, {})'
        return template.format(self.bank_number, self.account_number, self.iban)

    def isValid(self):
        """Return True when none of the three identifying fields is empty."""
        return "" not in (self.bank_number, self.account_number, self.iban)

    def nice_iban(self):
        """Return the IBAN upper-cased and split into groups of four."""
        groups = (self.iban[start:start + 4]
                  for start in range(0, len(self.iban), 4))
        return " ".join(groups).strip().upper()
0169 
0170 
class Accounts(object):
    """Lazily loaded list of the accounts known to aqbanking-cli."""

    def __init__(self):
        # None means "not queried yet"; an empty list is a valid result.
        self._accounts = None

    def _insert(self, account):
        """Add ``account`` unless it is invalid or merges into a known one."""
        if not account.isValid():
            return

        # Account.update() returns True when it recognised (and possibly
        # merged) the same bank/account number pair.
        if any(known.update(account) for known in self._accounts):
            return
        self._accounts.append(account)

    def _initialize(self):
        """Query ``aqbanking-cli listaccs`` and rebuild the account list.

        :raises subprocess.CalledProcessError: if the tool exits non-zero.
        """
        self._accounts = []

        process_result = subprocess.run(
            AqBankingCli.build_command(['listaccs',
                                        '--template="$(iban)";"$(bankcode)";"$(accountNumber)"']),
            stdout=subprocess.PIPE,
            check=True
        )

        fields = ["iban", "bankcode", "accountNumber"]
        listing = process_result.stdout.decode("utf-8")
        # The first output line is discarded.
        # NOTE(review): presumably a header/banner line - confirm.
        data_lines = listing.splitlines()[1:]
        reader = csv.DictReader(data_lines, fieldnames=fields, dialect=AqDialect)

        for record in reader:
            self._insert(Account.create(record['iban'],
                                        record['bankcode'],
                                        record['accountNumber']))

    def get_accounts(self):
        """Return the account list, querying aqbanking-cli on first use.

        The result is cached even when it is empty, so the external tool is
        started at most once per instance.
        """
        if self._accounts is None:
            self._initialize()
        return self._accounts
0209 
0210 
@contextlib.contextmanager
def TemporaryContextFile():
    """Yield the path of an empty file named "context" in a temporary folder.

    The folder (and with it the file) is removed again when the context
    is left.
    """
    scratch_dir = tempfile.TemporaryDirectory(prefix="SSAB.")
    with scratch_dir as folder:
        target = os.path.join(folder, "context")
        # Touch the file so that it exists before it is handed out.
        with open(target, 'a'):
            pass
        yield target
0217 
0218 
class RepairMan(object):
    """Base class for the row repair steps; the default step changes nothing."""

    def repair_row(self, row):
        """Return ``row`` unchanged.

        Callers rebind their row to the return value of ``repair_row``, so
        even the no-op implementation has to hand the row back.
        """
        return row
0222 
0223 
class RepairManSpardaNetBank(RepairMan):
    """
    Repair step for Sparda / Netbank exports.

    For these banks a payee name longer than 27 characters overflows into
    the start of the purpose field. This shows up as one of the strings in
    Keywords occurring inside the purpose text but not at its very start:
    whatever precedes the keyword is then the tail of the payee name.
    The keyword itself becomes the mode, the text after it the comment.
    """

    Keywords = ['SEPA-BASISLASTSCHRIFT',
                'SEPA-ÜBERWEISUNG',
                'SEPA LOHN/GEHALT']

    def repair_row(self, row):
        text = row['comment']

        for keyword in self.Keywords:
            head, found, tail = text.partition(keyword)
            if not found:
                # Keyword does not occur in the comment; try the next one.
                continue
            if head:
                row['payee'] = row['payee'] + head
            row['mode'] = found
            row['comment'] = tail
            break
        return row
0253 
0254 
class RepairManComdirect(RepairMan):
    """
    Repair step that moves a leading keyword of the comment into the mode.

    When the whitespace-stripped comment starts with one of the Keywords,
    that keyword becomes the mode and the rest becomes the comment.
    """

    Keywords = ['WERTPAPIERE',
                'LASTSCHRIFT / BELASTUNG',
                'ÜBERTRAG / ÜBERWEISUNG',
                'KONTOÜBERTRAG',
                'KUPON',
                'SUMME MONATSABRECHNUNG VISA',
                'KONTOABSCHLUSSABSCHLUSS ZINSEN']

    def repair_row(self, row):
        text = row['comment'].strip()

        # First keyword (in list order) the comment starts with, if any.
        matched = next((keyword for keyword in self.Keywords
                        if text.startswith(keyword)), None)
        if matched is not None:
            row['comment'] = text[len(matched):]
            row['mode'] = matched
        return row
0273 
0274 
class RepairManStrip(RepairMan):
    """Repair step that strips surrounding whitespace from every column."""

    Keywords = ['date', 'mode', 'comment', 'payee', 'amount']

    def repair_row(self, row):
        for column in self.Keywords:
            value = row[column]
            row[column] = value.strip()
        return row
0282 
0283 
class RepairManDC(RepairMan):
    """
    Repair step that restores the letter Ü in the comment column.

    Every occurrence of the literal four characters backslash, x, d, c is
    replaced by "Ü".
    NOTE(review): presumably these sequences stem from decoding the
    aqbanking output with errors="backslashreplace" - confirm.
    """

    Keywords = ['comment']

    def repair_row(self, row):
        for column in self.Keywords:
            value = row[column]
            row[column] = value.replace("\\xdc", "Ü")
        return row
0290         return row
0291 
0292 
class SABB(object):
    """Skrooge AqBanking Bridge: lists accounts and downloads transactions.

    The public methods return one of the ``ReturnValue_*`` codes, which the
    script uses as its process exit status.
    """

    ReturnValue_NormalExit = 0
    ReturnValue_InvalidVersion = 1
    ReturnValue_NoCommandGiven = 10
    ReturnValue_UnknownMimeType = 20
    ReturnValue_UnsupportedMimeType = 21

    def __init__(self):
        self.accounts = Accounts()
        # Repair steps are applied to every row in exactly this order.
        self.repair_mans = [RepairManDC(),
                            RepairManSpardaNetBank(),
                            RepairManComdirect(),
                            RepairManStrip()]

        # Set by the caller before download() is used.
        self.output_folder       = None
        self.terminal_emulator   = None
        self.disable_auto_repair = None

    def get_csv_writer(self, fieldnames, generateHeader=True):
        """Create an in-memory CSV writer using the Skrooge dialect.

        :param fieldnames: column names of the CSV.
        :param generateHeader: write the header row immediately if True.
        :return: tuple ``(buffer, writer)``; the text is read from the
                 ``io.StringIO`` buffer via ``getvalue()``.
        """
        output = io.StringIO("")
        writer = csv.DictWriter(output, fieldnames=fieldnames, dialect=SkroogeDialect)
        if generateHeader:
            writer.writeheader()
        return output, writer

    def get_accounts(self):
        """Print the formatted IBANs of all accounts as CSV to stdout.

        :return: ReturnValue_InvalidVersion if aqbanking-cli is too old,
                 otherwise ReturnValue_NormalExit.
        """
        if not AqBankingCli.check_version():
            return self.ReturnValue_InvalidVersion

        output, writer = self.get_csv_writer(['id'])
        for account in self.accounts.get_accounts():
            writer.writerow({'id': account.nice_iban()})
        print(output.getvalue().strip())
        return self.ReturnValue_NormalExit

    def convert_transactions(self, aqbanking_output, generateHeader):
        """Convert ``listtrans`` output into Skrooge CSV text.

        Rows are sorted by their date string. The payee is the part of the
        ultimate debtor before the first "/" when one is given, else the
        remote name. Unless auto repair is disabled, every row is passed
        through all repair steps.

        :param aqbanking_output: text produced by the listtrans template.
        :param generateHeader: emit the CSV header row if True.
        :return: the CSV text.
        """
        fieldnames_input = ['date', 'value', 'remoteName', 'ultimateDebtor', 'purpose']
        reader = csv.DictReader(aqbanking_output.splitlines(), fieldnames=fieldnames_input, dialect=AqDialect)
        fieldnames_output = ['date', 'mode', 'comment', 'payee', 'amount']
        output, writer = self.get_csv_writer(fieldnames_output, generateHeader)
        for record in sorted(reader, key=lambda row: row['date']):
            if record['ultimateDebtor'] != "":
                payee = record['ultimateDebtor'].split('/')[0]
            else:
                payee = record['remoteName']
            row = {
                'date': record['date'],
                'mode': "",
                'comment': record['purpose'],
                'payee': payee,
                'amount': record['value'],
            }
            if not self.disable_auto_repair:
                for repair_man in self.repair_mans:
                    row = repair_man.repair_row(row)
            writer.writerow(row)
        return output.getvalue()

    def process_context_file(self, context_file_path):
        """Write one CSV file per account from a downloaded context file.

        For every known account the transactions are listed from
        ``context_file_path``, converted, and stored in the output folder as
        ``<formatted IBAN>.csv``. Accounts sharing a file name are appended
        to the same file.

        :param context_file_path: aqbanking context file to read from.
        """
        self.output_folder = os.path.abspath(self.output_folder)
        os.makedirs(self.output_folder, exist_ok=True)

        files = {}
        for account in self.accounts.get_accounts():
            # The exit status is deliberately not checked and stderr is
            # discarded, as in the original best-effort behaviour.
            process_result = subprocess.run(
                AqBankingCli.build_command([
                    'listtrans',
                    '--bank=' + str(account.bank_number),
                    '--account=' + str(account.account_number),
                    '--template="$(dateOrValutaDateAsString)";"$(valueAsString)";"$(remoteName)";"$(ultimateDebtor)";"$(purposeInOneLine)"',
                    '-c',
                    context_file_path]),
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL
            )
            # Undecodable bytes are kept as backslash escapes instead of
            # aborting the whole download.
            transactions = process_result.stdout.decode("utf-8", errors="backslashreplace")

            output_file_path = os.path.join(self.output_folder, account.nice_iban() + ".csv")
            if output_file_path in files:
                files[output_file_path] = files[output_file_path] + '\n' + self.convert_transactions(transactions, False)
            else:
                files[output_file_path] = self.convert_transactions(transactions, True)

        for path, content in files.items():
            # Explicit encoding: the data was decoded as UTF-8 and may hold
            # umlauts, so do not depend on the locale's default encoding.
            with open(path, 'w', encoding="utf-8") as f:
                f.write(content)

    def download(self):
        """Download the transactions of the last 6 weeks for all accounts.

        Runs the interactive ``aqbanking-cli request`` inside the configured
        terminal emulator command, then converts the resulting context file
        into CSV files.

        :return: ReturnValue_InvalidVersion if aqbanking-cli is too old,
                 otherwise ReturnValue_NormalExit.
        :raises subprocess.CalledProcessError: if the request command fails.
        """
        if not AqBankingCli.check_version():
            return self.ReturnValue_InvalidVersion

        today = datetime.date.today()
        weeksAgo6 = today - datetime.timedelta(weeks=6)
        with TemporaryContextFile() as context_file_path:
            args = ['request',
                    '--ignoreUnsupported',
                    '--transactions',
                    # '--number=1', # Was required as workaround for bug in 6.03
                    '--fromdate={}'.format(weeksAgo6.strftime("%Y%m%d")),
                    '--todate={}'.format(today.strftime("%Y%m%d")),
                    '-c',
                    context_file_path
                    ]

            # An empty terminal emulator string splits into an empty list,
            # so aqbanking-cli then runs in the current terminal.
            command = self.terminal_emulator.split()
            command.extend(AqBankingCli.build_command(args))
            subprocess.run(command, check=True)

            self.process_context_file(context_file_path)

        return self.ReturnValue_NormalExit
0408 
0409 
def main():
    """Parse the command line and run the requested SABB command.

    Supported commands are ``listaccounts``, ``bulkdownload`` and the
    internal ``opticaltan``. Without a command the help text is printed.

    :return: one of the ``SABB.ReturnValue_*`` codes, meant to be used as
             the process exit status.
    """
    parser = argparse.ArgumentParser(prog='sabb', description='Skrooge AqBanking Bridge (SABB)')

    # Global arguments
    parser.add_argument('--version', '-v', action='version', version='%(prog)s ' + __VERSION__, help='Shows version information.')
    subparsers = parser.add_subparsers(title='Commands', dest='command')

    # Command: listaccounts
    subparsers.add_parser('listaccounts', help='Returns a list of accounts that can be queried with AqBanking.')

    # Command: bulkdownload
    parser_download = subparsers.add_parser('bulkdownload',         help='Downloads all transactions of the last 6 weeks into the given output folder')
    parser_download.add_argument('--output_folder', required=True,  help='The folder to store the csv files.')
    parser_download.add_argument(
        '--terminal-emulator',
        required=False,
        default="x-terminal-emulator -e",
        help='The terminal emulator command string that gets used to run the aqbanking user-interactive session. '
        'Use an empty value »""« to not start a new terminal, but reuse the terminal running this command. '
        'Example: "xterm -e". '
        '(Default: "x-terminal-emulator -e")'
    )
    parser_download.add_argument(
        '--disable-auto-repair',
        required=False,
        action='store_true',
        help='Disables bank specific repair steps.'
    )

    # Command: opticaltan
    parser_opticaltan = subparsers.add_parser(
        'opticaltan',
        help='Internal command. Used from {} to display optical tans.'.format(AqBankingCli.Tool))
    parser_opticaltan.add_argument('mime', help="Mime type of the payload.")
    parser_opticaltan.add_argument('payload', help="Mime type dependent payload.")

    args = parser.parse_args()

    sabb = SABB()
    if args.command == "listaccounts":
        return sabb.get_accounts()

    if args.command == "bulkdownload":
        sabb.output_folder       = args.output_folder
        sabb.terminal_emulator   = args.terminal_emulator
        sabb.disable_auto_repair = args.disable_auto_repair
        return sabb.download()

    if args.command == "opticaltan":
        if args.mime == "image/png":
            # The payload is opened as an image file and shown at twice its
            # size. resize() is documented to take a 2-tuple, hence the
            # explicit tuple; the source file is closed once it was scaled.
            with Image.open(args.payload) as image:
                enlarged = image.resize(tuple(2 * s for s in image.size))
            enlarged.show()
            return SABB.ReturnValue_NormalExit
        if args.mime == "text/x-flickercode":
            print("Flicker code is not supported by SABB.")
            print("Configure your AqBanking account to use the manual (non-optical) flicker code.")
            return SABB.ReturnValue_UnsupportedMimeType
        print("Unsupported MIME type: {}".format(args.mime))
        return SABB.ReturnValue_UnknownMimeType

    # No (known) command was given.
    parser.print_help()
    return SABB.ReturnValue_NoCommandGiven
0473 
0474 
if __name__ == "__main__":
    # Run the command line interface and use its return code as the
    # exit status of the process.
    exit_status = main()
    sys.exit(exit_status)