from yaml import load try: from yaml import CSafeLoader as SafeLoader except Exception: from yaml import SafeLoader from os.path import join, relpath, isfile, isdir, isabs, abspath from typing import List, Union from game_backuper.file import listdirs from collections import namedtuple try: from functools import cached_property except ImportError: cached_property = property from game_backuper.regexp import Regex, wildcards_to_regex from game_backuper.compress import CompressConfig class BasicOption: '''Basic options which is included in config, program and files.''' _remove_old_files = None _enable_pcre2 = None _encrypt_files = None _compress_config = None _disable_compress = False _protect_filename = None _unpin_file = None @property def compress_config(self) -> CompressConfig: if self._disable_compress: return None if self._compress_config is not None: return self._compress_config prog = getattr(self, "_prog", None) if prog is not None: if prog._compress_config is not None: return prog._compress_config cfg = getattr(self, "_cfg", None) if cfg is not None: if cfg._compress_config is not None: return cfg._compress_config return None @cached_property def enable_pcre2(self) -> bool: if self._enable_pcre2 is not None: return self._enable_pcre2 prog = getattr(self, "_prog", None) if prog is not None: if prog._enable_pcre2 is not None: return prog._enable_pcre2 cfg = getattr(self, "_cfg", None) if cfg is not None: if cfg._enable_pcre2 is not None: return cfg._enable_pcre2 return False @cached_property def encrypt_files(self) -> bool: if self._encrypt_files is not None: return self._encrypt_files prog = getattr(self, "_prog", None) if prog is not None: if prog._encrypt_files is not None: return prog._encrypt_files cfg = getattr(self, "_cfg", None) if cfg is not None: if cfg._encrypt_files is not None: return cfg._encrypt_files return False @cached_property def protect_filename(self) -> bool: if self._protect_filename is not None: return self._protect_filename and self.encrypt_files prog = getattr(self, "_prog", None) if prog is not None: if prog._protect_filename is not None: return prog._protect_filename and self.encrypt_files cfg = getattr(self, "_cfg", None) if cfg is not None: if cfg._protect_filename is not None: return cfg._protect_filename and self.encrypt_files return False @cached_property def remove_old_files(self) -> bool: if self._remove_old_files is not None: return self._remove_old_files prog = getattr(self, "_prog", None) if prog is not None: if prog._remove_old_files is not None: return prog._remove_old_files cfg = getattr(self, "_cfg", None) if cfg is not None: if cfg._remove_old_files is not None: return cfg._remove_old_files return True @cached_property def unpin_file(self) -> bool: if self._unpin_file is not None: return self._unpin_file prog = getattr(self, "_prog", None) if prog is not None: if prog._unpin_file is not None: return prog._unpin_file cfg = getattr(self, "_cfg", None) if cfg is not None: if cfg._unpin_file is not None: return cfg._unpin_file return False def parse_all(self, data=None): self.parse_compress_config(data) self.parse_remove_old_files(data) self.parse_enable_pcre2(data) self.parse_encrypt_files(data) self.parse_protect_filename(data) self.parse_unpin_file(data) def parse_compress_config(self, data=None): if data is None: data = getattr(self, 'data') if 'compress_method' in data: v = data['compress_method'] if isinstance(v, str): self._compress_config = CompressConfig(v, data.get("compress_level")) # noqa: E501 elif v is not None: raise TypeError('compress_method option should be str or None.') # noqa: E501 else: self._disable_compress = True def parse_enable_pcre2(self, data=None): if data is None: data = getattr(self, 'data') if 'enable_pcre2' in data: v = data['enable_pcre2'] if isinstance(v, bool): self._enable_pcre2 = v else: raise TypeError('enable_pcre2 option must be a boolean.') del v def parse_encrypt_files(self, data=None): if data is None: data = getattr(self, 'data') if 'encrypt_files' in data: v = data['encrypt_files'] if isinstance(v, bool): self._encrypt_files = v else: raise TypeError('encrypt_files option must be a boolean.') del v def parse_protect_filename(self, data=None): if data is None: data = getattr(self, 'data') if 'protect_filename' in data: v = data['protect_filename'] if isinstance(v, bool): self._protect_filename = v else: raise TypeError('protect_filename option must be a boolean.') del v def parse_remove_old_files(self, data=None): if data is None: data = getattr(self, 'data') if 'remove_old_files' in data: v = data['remove_old_files'] if isinstance(v, bool): self._remove_old_files = v else: raise TypeError('remove_old_files option must be a boolean.') del v def parse_unpin_file(self, data=None): if data is None: data = getattr(self, 'data') if 'unpin_file' in data: v = data['unpin_file'] if isinstance(v, bool): self._unpin_file = v else: raise TypeError('unpin_file option must be a boolean.') del v class NFBasicOption: """Basic options which is included in config, program.""" _ignore_hidden_files = None def __init__(self, cfg=None, prog=None): self._cfg = cfg self._prog = prog @cached_property def ignore_hidden_files(self) -> bool: if self._ignore_hidden_files is not None: return self._ignore_hidden_files prog = getattr(self, "_prog", None) if prog is not None: if prog._ignore_hidden_files is not None: return prog._ignore_hidden_files cfg = getattr(self, "_cfg", None) if cfg is not None: if cfg._ignore_hidden_files is not None: return cfg._ignore_hidden_files return True def parse_all_nf(self, data=None): self.parse_ignore_hidden_files(data) def parse_ignore_hidden_files(self, data=None): if data is None: data = getattr(self, 'data') if 'ignore_hidden_files' in data: v = data['ignore_hidden_files'] if isinstance(v, bool): self._ignore_hidden_files = v else: raise TypeError('ignore_hidden_files option must be a bool.') del v # pylint: disable=unsupported-membership-test, unsubscriptable-object class BasicConfig: def __repr__(self) -> str: data = getattr(self, "data", None) return f"{self.__class__.__name__}<{data}>" @cached_property def name(self) -> str: data = getattr(self, "data", None) if isinstance(data, dict) and 'name' in data: v = data['name'] if isinstance(v, str) and len(v) > 0: return v @cached_property def real_name(self) -> str: return self.name if self.name else self.path @cached_property def path(self) -> str: data = getattr(self, "data", None) if isinstance(data, dict) and 'path' in data: v = data['path'] if isinstance(v, str) and len(v) > 0: return v raise ValueError('Path not found.') @cached_property def type(self) -> str: data = getattr(self, "data", None) if isinstance(data, dict) and 'type' in data: v = data['type'] if isinstance(v, str) and len(v) > 0: return v raise ValueError('Type not found.') # pylint: enable=unsupported-membership-test, unsubscriptable-object def parse_ex_or_in_cludes(li: list, enable_pcre2) -> List[Union[str, Regex]]: r = [] for i in li: if isinstance(i, str): r.append(i) elif isinstance(i, dict): t = i['type'] if t == 'wildcards': r.append(wildcards_to_regex(i['rule'], use_pcre2=enable_pcre2)) elif t == "regex": r.append(Regex(i['rule'], use_pcre2=enable_pcre2)) return r class ConfigPath(BasicOption, NFBasicOption, BasicConfig): def __init__(self, data, cfg, prog): NFBasicOption.__init__(self, cfg, prog) if isinstance(data, str): self.data = {"path": data, "type": "path"} elif isinstance(data, dict): self.data = data else: raise TypeError('Must be str or dict.') self.parse_all() self.parse_all_nf() @property def excludes(self) -> List[Union[str, Regex]]: t = getattr(self, "__excludes", None) if t is not None: return t del t if 'excludes' in self.data: if isinstance(self.data['excludes'], list): r = parse_ex_or_in_cludes(self.data["excludes"], self.enable_pcre2) # noqa: E501 self.__excludes = r return r @property def includes(self) -> List[Union[str, Regex]]: t = getattr(self, "__includes", None) if t is not None: return t del t if 'includes' in self.data: if isinstance(self.data['includes'], list): r = parse_ex_or_in_cludes(self.data["includes"], self.enable_pcre2) # noqa: E501 self.__includes = r return r def is_ex_or_in_clude(self, b: str, loc: str, exclude: bool) -> bool: e = self.excludes if exclude else self.includes if e is None: return False if exclude else True if isabs(loc): bl = abspath(loc) rl = relpath(loc, b) else: bl = abspath(join(b, loc)) rl = relpath(join(b, loc), b) for i in e: if isinstance(i, str): if isabs(i): if abspath(i) == bl: return True else: if relpath(join(b, i), b) == rl: return True elif isinstance(i, Regex): if i.match_only(rl): return True elif bl != loc and i.match_only(bl): return True return False def is_exclude(self, b: str, loc: str) -> bool: return self.is_ex_or_in_clude(b, loc, True) def is_include(self, b: str, loc: str) -> bool: return self.is_ex_or_in_clude(b, loc, False) class ConfigOLeveldb(BasicOption, NFBasicOption, BasicConfig): def __init__(self, data, cfg, prog): NFBasicOption.__init__(self, cfg, prog) if isinstance(data, dict): self.data = data else: raise TypeError('Must be dict.') self.parse_all() self.parse_all_nf() @cached_property def ignore_hidden_files(self): True @cached_property def domains(self) -> List[str]: if 'domains' in self.data: if isinstance(self.data['domains'], list): dms = [] for i in self.data['domains']: if isinstance(i, str) and len(i) > 0: dms.append(i.encode()) if len(dms) > 0: return dms def namedtuple_bo(typename, field_names): a = namedtuple(typename, field_names) return type(typename, (a, BasicOption), {}) ConfigNormalFile = namedtuple_bo('ConfigNormalFile', ['name', 'full_path']) ConfigLeveldb = namedtuple_bo('ConfigLeveldb', ['name', 'full_path', 'domains']) # noqa: E501 ConfigResult = Union[ConfigNormalFile, ConfigLeveldb] ConfigOriginResult = Union[ConfigPath, ConfigOLeveldb] class Program(BasicOption, NFBasicOption): def __init__(self, data: dict, cfg): self.data = data self._files = None self._cfg = cfg self.parse_all() self.parse_all_nf() @cached_property def all_configs(self) -> List[ConfigOriginResult]: r = [] for i in self.data['files']: if isinstance(i, str): r.append(ConfigPath(i, self._cfg, self)) elif isinstance(i, dict): t = i['type'] if t == 'path': r.append(ConfigPath(i, self._cfg, self)) elif t == 'leveldb': r.append(ConfigOLeveldb(i, self._cfg, self)) return r @cached_property def base(self) -> str: if 'base' in self.data: v = self.data['base'] if isinstance(v, str) and len(v) > 0: return v def check(self) -> bool: if self.name is None or self.base is None: return False self.files return True def clear_cache(self): self._files = None @property def files(self) -> List[ConfigResult]: ke = 'files' if ke not in self.data or not isinstance(self.data[ke], list): raise ValueError('Files is needed and should be a list.') if self._files is not None: return self._files.copy() r = [] self._files = r.copy() for i in self.all_configs: b = self.base if isinstance(i, ConfigPath): if isabs(i.path): bp = i.path else: bp = join(b, i.path) name = i.real_name if isfile(bp): tname = relpath(join(b, name), b) tmp = ConfigNormalFile(tname, bp) del tname tmp.parse_all(i.data) r.append(tmp) elif isdir(bp): top = NFBasicOption(self._cfg, self) top.parse_ignore_hidden_files(i.data) ll = listdirs(bp, top.ignore_hidden_files) del top for ii in ll: if i.is_exclude(bp, ii): continue if not i.is_include(bp, ii): continue tname = relpath(join(b, join(name, relpath(ii, bp))), b) # noqa: E501 tmp = ConfigNormalFile(tname, ii) del tname tmp.parse_all(i.data) r.append(tmp) elif isinstance(i, ConfigOLeveldb): if isabs(i.path): p = i.path else: p = join(b, i.path) name = i.real_name tname = relpath(join(b, name), b) tmp = ConfigLeveldb(tname, p, i.domains) del tname tmp.parse_all(i.data) r.append(tmp) for i in r: i._cfg = self._cfg i._prog = self return r def get_config(self, name: str) -> ConfigOriginResult: for i in self.data['files']: if isinstance(i, str): if not relpath(name, i).startswith('..'): return ConfigPath(i, self._cfg, self) elif isinstance(i, dict): t = i['type'] if t == 'path': if isabs(i['path']): n = i['name'] else: n = i['path'] if 'name' in i and isinstance(i['name'], str): if i['name'] != '': n = i['name'] if not relpath(name, n).startswith('..'): r = ConfigPath(i, self._cfg, self) tmp = relpath(name, n) if r.is_exclude(n, tmp): continue if not r.is_include(n, tmp): continue return r elif t == 'leveldb': if isabs(i['path']): n = i['name'] else: n = i['path'] if 'name' in i and isinstance(i['name'], str): if i['name'] != '': n = i['name'] if relpath(n, name) == '.': return ConfigOLeveldb(i, self._cfg, self) @cached_property def name(self) -> str: if 'name' in self.data: v = self.data['name'] if isinstance(v, str) and len(v) > 0: return v class Config(BasicOption, NFBasicOption): dest = '' encrypt_db = False db_password = None db_path = None progs = [] progs_name = [] def __init__(self, fn: str): with open(fn, 'r', encoding='UTF-8') as f: t = load(f.read(), SafeLoader) if t is None: raise ValueError('Can not load config file.') if not isinstance(t, dict): raise ValueError('Config file error.') if 'dest' not in t or not isinstance(t['dest'], str): raise ValueError("Config file don't have dest or dest is not str.") self.dest = t['dest'] if 'encrypt_db' in t: if not isinstance(t['encrypt_db'], bool): raise ValueError('encrypt_db should be true or false.') self.encrypt_db = t['encrypt_db'] else: self.encrypt_db = False if 'db_password' in t: if not isinstance(t['db_password'], str): raise ValueError('db_password should be a string.') self.db_password = t['db_password'] if 'db_path' in t: if not isinstance(t['db_path'], str): raise ValueError('db_path should be a string.') self.db_path = t['db_path'] if 'programs' not in t: raise ValueError("No programs found.") self.parse_all(t) self.parse_all_nf(t) progs = t['programs'] if not isinstance(progs, list): raise ValueError("programs should be list.") for prog in progs: p = Program(prog, self) if not p.check(): raise ValueError('Config error: program information error') if p.name not in self.progs_name: self.progs_name.append(p.name) self.progs.append(p) else: raise ValueError(f'have same name "{p.name}" in programs.')