diff --git a/check_hash.py b/check_hash.py new file mode 100644 index 0000000..11c9260 --- /dev/null +++ b/check_hash.py @@ -0,0 +1,176 @@ +from argparse import ArgumentParser +import hashlib +import json +from os.path import abspath, exists, getsize, join, relpath +try: + from rich.live import Live + from rich.progress import ( + Progress, + SpinnerColumn, + BarColumn, + TextColumn, + TimeRemainingColumn, + TransferSpeedColumn, + DownloadColumn, + MofNCompleteColumn, + ) + from rich.table import Table + have_rich = True +except ImportError: + have_rich = False + + +OK = 0 +CHECKSUM_FAILED = 1 +FILE_NOT_EXISTS = 2 + + +p = ArgumentParser(description='Check checksum of files.') +p.add_argument("-m", "--method", help='The hash method to use. Default: auto. Available choices: auto, md5, sha1, sha224, sha256, sha384, sha512.', choices=['auto', 'md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512'], metavar='METHOD', default="auto") # noqa: E501 +p.add_argument("-f", "--file", help="The path to the checksum file. Default: checksum.txt. Releative path is relative to the input directory.", default="checksum.txt") # noqa: E501 +p.add_argument("-o", "--output", help="The path to the result file. Default: .checksum.json. Releative path is relative to the input directory.", default=".checksum.json") # noqa: E501 +p.add_argument("-F", "--force", help="Force rechecking.", action='store_true') +p.add_argument("input", help='The path to the input file or directory.', nargs='*', default=['.']) # noqa: E501 + + +def checksum(file: str, method: str, task=None, progress=None): + h = hashlib.new(method) + with open(file, 'rb') as f: + while True: + data = f.read(40960) + if not data: + break + h.update(data) + if progress is not None and task is not None: + progress.update(task, advance=len(data)) + return h.hexdigest() + + +def print_result(result, start: str): + ok = True + count = 0 + for i in result: + r = result[i] + if r == OK: + continue + ok = False + p = relpath(i, start) + if r == CHECKSUM_FAILED: + msg = 'Failed' + elif r == FILE_NOT_EXISTS: + msg = 'Not exists' + print(f"{p}: {msg}") + count += 1 + if not ok: + print(f"Total: {count} files failed.") + else: + print("All files are OK.") + return ok + + +def main(args=None): + arg = p.parse_intermixed_args(args) + for input in arg.input: + result = {} + result_file = join(input, arg.output) + if not arg.force: + try: + with open(result_file, encoding='UTF-8') as f: + result = json.load(f)['result'] + except Exception: + pass + checksum_file = join(input, arg.file) + with open(checksum_file, encoding='UTF-8') as f: + lines = [i.split(" ") for i in f.readlines() if i.strip()] + if len(lines) == 0: + print('No checksum entries found.') + continue + if arg.method == 'auto': + h = lines[0][0] + if len(h) == 32: + method = 'md5' + elif len(h) == 40: + method = 'sha1' + elif len(h) == 56: + method = 'sha224' + elif len(h) == 64: + method = 'sha256' + elif len(h) == 96: + method = 'sha384' + elif len(h) == 128: + method = 'sha512' + else: + raise ValueError('Unknown hash method.') + else: + method = arg.method + if have_rich: + progress = Progress("{task.description}", + SpinnerColumn(), + BarColumn(), + TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), # noqa: E501 + MofNCompleteColumn(), + ) + job_progress = Progress("{task.description}", + SpinnerColumn(), + BarColumn(), + TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), # noqa: E501 + DownloadColumn(), + TransferSpeedColumn(), + TimeRemainingColumn(), + ) + total_tasks = progress.add_task("Checking checksum...", + total=len(lines)) + progress_table = Table.grid() + progress_table.add_row(progress) + progress_table.add_row(job_progress) + live = Live(progress_table, refresh_per_second=10) + live.start() + else: + task = None + job_progress = None + files = [] + for line in lines: + file = abspath(join(input, " ".join(line[1:]).strip('\n'))) + sum = line[0] + files.append(file) + if file in result and result[file] == 0: + if have_rich: + progress.update(total_tasks, advance=1) + continue + if not exists(file): + result[file] = FILE_NOT_EXISTS + if have_rich: + progress.update(total_tasks, advance=1) + continue + rp = relpath(file, abspath(input)) + if have_rich: + task = job_progress.add_task(f"Checking {rp}...", + total=getsize(file)) + try: + rsum = checksum(file, method, task, job_progress) + if rsum == sum: + result[file] = OK + msg = 'OK' + else: + result[file] = CHECKSUM_FAILED + msg = 'FAILED' + if not have_rich: + print(f"{rp}: {msg}") + except Exception: + result[file] = CHECKSUM_FAILED + finally: + if have_rich: + progress.update(total_tasks, advance=1) + job_progress.remove_task(task) + if have_rich: + live.stop() + if len(result) != len(files): + result = {i: result[i] for i in result if i in files} + ok = print_result(result, abspath(input)) + with open(result_file, encoding='UTF-8', mode='w') as f: + json.dump({'ok': ok, 'result': result}, f, ensure_ascii=False, + indent=2) + + +if __name__ == '__main__': + main() diff --git a/gen_hash.py b/gen_hash.py index 7592a84..0948a1e 100644 --- a/gen_hash.py +++ b/gen_hash.py @@ -21,19 +21,22 @@ except ImportError: p = ArgumentParser(description='Generate checksum of files.') +p.add_argument('-a', '--all', help='Include all files, including hidden files.', action='store_true') # noqa: E501 p.add_argument("-o", "--output", help='The path to output file. Default: checksum.txt. Releative path is relative to the input directory.', default="checksum.txt") # noqa: E501 p.add_argument("-m", "--method", help='The hash method to use. Default: md5. Available choices: md5, sha1, sha224, sha256, sha384, sha512.', choices=['md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512'], metavar='METHOD', default="md5") # noqa: E501 p.add_argument("input", help='The path to the input file or directory.', nargs='*', default=['.']) # noqa: E501 -def list_files(input: str, output=None): +def list_files(input: str, all: bool, output=None): files = [] for i in listdir(input): + if not all and i.startswith('.'): + continue path = join(input, i) if isfile(path): files.append(path) elif isdir(path): - files.extend(list_files(path, None)) + files.extend(list_files(path, all, None)) if output is not None: output_file = join(input, output) if output_file in files: @@ -53,14 +56,14 @@ def cal_hash(file: str, method: str, task=None, progress=None): progress.update(task, advance=len(data)) t = f"{h.hexdigest()} {file}\n" if progress is None or task is None: - print(t) + print(t, end='') return t def main(args=None): arg = p.parse_intermixed_args(args) for i in arg.input: - files = list_files(i, arg.output) + files = list_files(i, arg.all, arg.output) output_file = join(i, arg.output) with open(output_file, encoding='UTF-8', mode='w', newline='\n') as f: if have_rich: