"""Extract translatable text from KAG scripts and patch translations back.

Two commands are offered on the command line:

* ``extract`` - parse ``.ks`` scripts (cp932) and dump their messages as JSON.
* ``patch``   - read translated ``.m3t`` files and write translated scripts
  (UTF-16-LE with BOM).
"""
from KAGParser import *
from html import escape, unescape
import json
from os.path import isdir, join, basename, splitext, dirname, exists
from os import listdir, makedirs
from typing import List, Any, Dict
from re import compile, VERBOSE
from csv import DictReader


# Tag names such as ``sel01``: choice entries whose ``text`` attribute is translated.
SEL_TAG = compile(r'^sel\d+$')

# One serialized tag inside a message string (see _serialize_tag).
_TAG_RE = compile(r"(<[^>]+>)")
# Either key="value" (groups 1 and 2) or a bare boolean key (group 3).
_ATTR_RE = compile(r"""([a-zA-Z0-9_]+)="([^"]*)"|([a-zA-Z0-9_]+)""", VERBOSE)

# Speaker names are stored as tags whose name is wrapped in these brackets.
_NAME_OPEN = '\u3010'
_NAME_CLOSE = '\u3011'

# m3t line markers.
_M3T_NAME = "\u25cb NAME:"
_M3T_ORIGINAL = "\u25cb"
_M3T_TRANSLATED = "\u25cf"

# (opening, closing) bracket pairs copied from the original text onto the translation.
_BRACKET_PAIRS = (('\u300c', '\u300d'), ('\uff08', '\uff09'))


def _ensure_parent_dir(path: str) -> None:
    """Create the directory that will contain *path* if it does not exist yet."""
    pdir = dirname(path)
    if pdir and not isdir(pdir):
        makedirs(pdir, exist_ok=True)


def _serialize_tag(node) -> str:
    """Render a TagNode in the angle-bracket form that parse_message reads back."""
    parts = [f"<{escape(node.name)}"]
    for k, v in node.attributes.items():
        if v is True:
            # Boolean attribute: written as a bare key.
            parts.append(f' {escape(k)}')
        else:
            parts.append(f' {escape(k)}="{escape(v)}"')
    parts.append(">")
    return "".join(parts)


def extract_script(script_path: str, output_path: str):
    """Extract the messages of one KAG script into a JSON file.

    The script is read as cp932 and parsed with ``KAGScriptParser``. A message
    starts at an ``fc`` tag and ends at a ``pcms`` tag or at a label. Each
    message becomes ``{"name": ..., "message": ...}`` (``name`` only when a
    speaker tag was seen). Every ``sel*`` tag adds ``{"message": <text attr>}``.

    Args:
        script_path: Path of the ``.ks`` script to read.
        output_path: Path of the JSON file to write (UTF-8).
    """
    with open(script_path, "r", encoding="cp932") as f:
        script_text = f.read()
    script = KAGScriptParser(script_text).parse(True)

    name = None
    message = ''
    result = []
    in_message = False

    def flush():
        """Append the message collected so far and reset the collector state."""
        nonlocal name, message, in_message
        entry = {}
        if name is not None:
            entry['name'] = name
        entry['message'] = message
        result.append(entry)
        name = None
        message = ''
        in_message = False

    for line in script:
        if isinstance(line, LabelNode):
            # A label terminates a message that was never closed by pcms.
            if in_message:
                flush()
        elif isinstance(line, list):
            for node in line:
                if isinstance(node, TextNode):
                    if in_message:
                        # Entity-escape the text so that parse_message, which
                        # unescapes text and treats angle brackets as tags,
                        # restores it exactly.
                        message += escape(node.text, quote=False)
                elif isinstance(node, TagNode):
                    tag = node.name
                    if tag == 'pcms':
                        flush()
                    elif tag == 'fc':
                        in_message = True
                    elif tag.startswith(_NAME_OPEN) and tag.endswith(_NAME_CLOSE):
                        name = tag[1:-1]
                    elif SEL_TAG.match(tag):
                        result.append({'message': node.attributes['text']})
                    elif in_message and message:
                        # Tags are kept only once the message has some content.
                        if tag == 'r':
                            message += '\n'
                        else:
                            message += _serialize_tag(node)

    # Message still open at end of file.
    if name is not None or message:
        flush()

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(result, f, ensure_ascii=False, indent=2)


def extract_script_auto(script_path: str, output_path: str):
    """Run extract_script on one file, or on every ``.ks`` file of a directory.

    When *script_path* is a directory, *output_path* is treated as a directory
    and each ``name.ks`` is written to ``name.json`` inside it.
    """
    if isdir(script_path):
        for file in sorted(listdir(script_path)):
            if not file.lower().endswith(".ks"):
                continue
            output_file = splitext(basename(file))[0] + ".json"
            output_full_path = join(output_path, output_file)
            _ensure_parent_dir(output_full_path)
            extract_script(join(script_path, file), output_full_path)
    else:
        _ensure_parent_dir(output_path)
        extract_script(script_path, output_path)


def parse_message(message: str) -> List[ParsedLine]:
    """Parse a message string back into ParsedLines.

    Reverses the serialization done by extract_script: angle-bracket tags
    become TagNodes, everything else becomes TextNodes with HTML entities
    decoded, and the string is split on newlines. Lines that produce no node
    are dropped.
    """
    result: List[ParsedLine] = []
    for line in message.split("\n"):
        parsed_line: ParsedLine = []
        for part in _TAG_RE.split(line):
            if not part:
                continue
            tag_parts = []
            if part.startswith("<") and part.endswith(">"):
                tag_parts = part[1:-1].strip().split(maxsplit=1)
            if not tag_parts:
                # Plain text, or an empty pair of angle brackets that carries
                # no tag name: keep it verbatim as text.
                parsed_line.append(TextNode(unescape(part)))
                continue
            tag_name = unescape(tag_parts[0])
            attributes: Dict[str, Any] = {}
            if len(tag_parts) > 1:
                for m in _ATTR_RE.finditer(tag_parts[1]):
                    if m.group(1) is not None:  # key="value", value may be empty
                        attributes[unescape(m.group(1))] = unescape(m.group(2))
                    elif m.group(3):  # boolean key
                        attributes[unescape(m.group(3))] = True
            parsed_line.append(TagNode(name=tag_name, attributes=attributes))
        if parsed_line:
            result.append(parsed_line)

    return result


def wrap_lines(input: List[ParsedLine], max_width: int = 30) -> List[ParsedLine]:
    """Re-wrap parsed lines so that no line holds more than *max_width* characters.

    Only TextNode characters count towards the width; TagNodes are carried over
    unchanged. Original line breaks are preserved: wrapping never joins two
    input lines.

    Raises:
        ValueError: If *max_width* is smaller than 1 (no text could be placed).
    """
    if max_width < 1:
        raise ValueError(f"max_width must be at least 1, got {max_width}")

    result: List[ParsedLine] = []
    for parsed_line in input:
        current_line: ParsedLine = []
        current_len = 0
        for node in parsed_line:
            if isinstance(node, TagNode):
                current_line.append(node)
            elif isinstance(node, TextNode):
                text = node.text
                while text:
                    # current_len < max_width always holds here, so take >= 1.
                    take = min(len(text), max_width - current_len)
                    current_line.append(TextNode(text[:take]))
                    current_len += take
                    text = text[take:]
                    if current_len >= max_width:
                        result.append(current_line)
                        current_line = []
                        current_len = 0
        # End of original line: flush to preserve the line break.
        if current_line:
            result.append(current_line)

    return result


def extract_dict_terms(script_path: str):
    """Load the term dictionary CSV.

    Returns:
        ``(terms, overrides)``: *terms* maps each ``term`` column value to its
        CSV row; *overrides* maps the part of a term before its first
        ideographic space (U+3000) to the full term.
    """
    terms = {}
    overrides = {}
    with open(script_path, "r", encoding="utf-8-sig") as f:
        for row in DictReader(f):
            term = row['term']
            terms[term] = row
            if '\u3000' in term:
                overrides[term.split('\u3000')[0]] = term
    return terms, overrides


def read_names(name_dict_path: str):
    """Load the speaker-name CSV as a ``JP_Name`` -> ``CN_Name`` mapping."""
    with open(name_dict_path, 'r', encoding='utf-8-sig') as f:
        return {row['JP_Name']: row['CN_Name'] for row in DictReader(f)}


def _restore_brackets(original: str, translated: str) -> str:
    """Copy leading/trailing brackets of *original* onto *translated* when missing."""
    for opening, closing in _BRACKET_PAIRS:
        if original.startswith(opening) and not translated.startswith(opening):
            translated = opening + translated
        if original.endswith(closing) and not translated.endswith(closing):
            translated += closing
    return translated


def _read_m3t(m3t_path: str) -> List[dict]:
    """Read an m3t translation file into a list of message dicts.

    Each translated line yields ``{"message": ..., "name": ...}``; ``name`` is
    present only when a name line preceded it. A literal backslash-n sequence
    in the translation is turned into a real newline.
    """
    messages = []
    speaker = None
    original = None
    # utf-8-sig also accepts files without a BOM; with plain utf-8 a BOM would
    # stay glued to the first marker and make it unrecognizable.
    with open(m3t_path, "r", encoding="utf-8-sig") as f:
        for line in f:
            line = line.strip()
            if line.startswith(_M3T_NAME):
                speaker = line[len(_M3T_NAME):].strip()
            elif line.startswith(_M3T_ORIGINAL):
                original = line[1:].strip()
            elif line.startswith(_M3T_TRANSLATED):
                text = line[1:].strip().replace('\\n', '\n')
                if original:
                    text = _restore_brackets(original, text)
                entry = {"message": text}
                if speaker is not None:
                    entry['name'] = speaker
                messages.append(entry)
                speaker = None
    return messages


def patch_script(script_path: str, m3t_path: str, output_path: str, names, term):
    """Write a translated copy of one KAG script.

    Args:
        script_path: Original ``.ks`` script (cp932).
        m3t_path: Translation file whose messages are consumed in order.
        output_path: Destination script, written as UTF-16-LE with a BOM.
        names: Mapping of original speaker names to translated names.
        term: ``(terms, overrides)`` as returned by extract_dict_terms.

    Raises:
        ValueError: If a ``wd`` tag refers to a term missing from the dictionary.
    """
    with open(script_path, "r", encoding="cp932") as f:
        script_text = f.read()
    script = KAGScriptParser(script_text).parse(True)

    messages = _read_m3t(m3t_path)
    terms, overrides = term

    new_script: ParsedScript = []
    in_message = False
    msg_index = 0
    # Lines seen since the opening fc tag, waiting for the message to end.
    pre_speech_buf: List[list] = []

    def is_tag(node, tag_name):
        return isinstance(node, TagNode) and node.name == tag_name

    def has_text(line):
        return any(isinstance(n, TextNode) for n in line)

    def translate_name_tag(line):
        """Translate speaker-name tags in-place."""
        if not isinstance(line, list):
            return
        for node in line:
            if (isinstance(node, TagNode) and node.name.startswith(_NAME_OPEN)
                    and node.name.endswith(_NAME_CLOSE)):
                spk_name = node.name[1:-1]
                if spk_name in names:
                    node.name = _NAME_OPEN + names[spk_name] + _NAME_CLOSE

    def build_translated_lines(msg_data):
        """Parse translated message text into ParsedLines, process wd tags, and wrap."""
        nws = parse_message(msg_data['message'])
        for nw in nws:
            for n in nw:
                if is_tag(n, 'wd') and terms:
                    target = n.attributes.get('s', '')
                    if target not in terms:
                        if target in overrides:
                            n.attributes['s'] = overrides[target]
                        else:
                            print(msg_data['message'])
                            raise ValueError('unknown wd target', target)
                    n.attributes['s'] = terms[n.attributes['s']]['translation']
        return wrap_lines(nws)

    def flush_message(closing_line=None):
        """Emit the buffered message block, translated when possible.

        *closing_line* is the line holding the pcms tag, or None when the block
        was ended by a label or by the end of the file.
        """
        nonlocal msg_index
        has_any_speech = any(has_text(pline) for pline in pre_speech_buf)
        if closing_line is not None:
            has_any_speech = has_any_speech or has_text(closing_line)

        if msg_index < len(messages) and has_any_speech:
            msg_data = messages[msg_index]
            msg_index += 1

            # 1. Buffered lines, with speaker names translated.
            for pline in pre_speech_buf:
                translate_name_tag(pline)
                new_script.append(pline)

            # 2. Add the speaker tag when the translation names one and the
            #    buffered lines did not already carry a name tag.
            has_name_in_pre = any(
                isinstance(n, TagNode) and n.name.startswith(_NAME_OPEN)
                for pline in pre_speech_buf for n in pline
            )
            if msg_data.get('name') and not has_name_in_pre:
                cn_name = names.get(msg_data['name'], msg_data['name'])
                new_script.append([TagNode(_NAME_OPEN + cn_name + _NAME_CLOSE)])

            # 3. Translated text.
            new_script.extend(build_translated_lines(msg_data))

            # 4. The pcms tag and whatever follows it on the closing line.
            if closing_line is not None:
                for idx, node in enumerate(closing_line):
                    if is_tag(node, 'pcms'):
                        new_script.append(closing_line[idx:])
                        break
        else:
            # No translation: output the original content untouched.
            new_script.extend(pre_speech_buf)
            if closing_line is not None:
                new_script.append(closing_line)

        pre_speech_buf.clear()

    for line in script:
        if isinstance(line, list):
            if not in_message:
                # Choice entry outside a message.
                # NOTE(review): extract_script records every sel tag, while only
                # the first one on a line outside a message is translated here;
                # confirm scripts never contain the other cases.
                sel_node = next(
                    (n for n in line if isinstance(n, TagNode) and SEL_TAG.match(n.name)),
                    None,
                )
                if sel_node is not None:
                    if msg_index < len(messages):
                        sel_node.attributes['text'] = messages[msg_index]['message']
                        msg_index += 1
                    new_script.append(line)
                    continue

                # An fc tag starts a message block.
                if any(is_tag(n, 'fc') for n in line):
                    in_message = True
                    pre_speech_buf.clear()

            if in_message:
                if any(is_tag(n, 'pcms') for n in line):
                    # pcms ends the message block.
                    in_message = False
                    flush_message(line)
                else:
                    # Still in message: buffer this line.
                    pre_speech_buf.append(line)
                continue

            # Not in message: normal line.
            translate_name_tag(line)
            new_script.append(line)

        elif isinstance(line, LabelNode):
            if in_message:
                in_message = False
                flush_message()
            new_script.append(line)

        else:
            if in_message:
                # Wrap non-list nodes so loops over pre_speech_buf work uniformly.
                pre_speech_buf.append([line])
                continue
            new_script.append(line)

    # Trailing message at end of file.
    if in_message:
        flush_message()

    if msg_index != len(messages):
        print(f"WARNING: processed message not matched. expected {len(messages)}, actual {msg_index}, {script_path}")

    script_data = KAGScriptParser.serialize(new_script)
    with open(output_path, 'w', encoding='UTF-16-LE') as f:
        # UTF-16-LE does not emit a BOM by itself, so write it explicitly.
        f.write('\ufeff')
        f.write(script_data)


def patch_script_auto(script_path: str, m3t_path: str, output_path: str, name_dict_path: str, dict_path=None):
    """Run patch_script on one file, or on every ``.ks`` file of a directory.

    In directory mode *m3t_path* and *output_path* are directories; a script
    without a matching ``.m3t`` file is skipped.
    """
    names = read_names(name_dict_path)
    term = extract_dict_terms(dict_path) if dict_path else ({}, {})
    if isdir(script_path):
        for file in sorted(listdir(script_path)):
            if not file.lower().endswith(".ks"):
                continue
            m3t_full_path = join(m3t_path, splitext(basename(file))[0] + ".m3t")
            if not exists(m3t_full_path):
                continue
            output_full_path = join(output_path, basename(file))
            _ensure_parent_dir(output_full_path)
            patch_script(join(script_path, file), m3t_full_path, output_full_path, names, term)
    else:
        _ensure_parent_dir(output_path)
        patch_script(script_path, m3t_path, output_path, names, term)


def main():
    """Command-line entry point: dispatch the ``extract`` and ``patch`` commands."""
    from argparse import ArgumentParser
    parser = ArgumentParser(description="Process KAG script files")
    subparser = parser.add_subparsers(title="Commands", dest="command")
    extract_parser = subparser.add_parser("extract", help="Extract script to JSON")
    extract_parser.add_argument("script_path", help="Path to KAG script file or directory")
    extract_parser.add_argument("output_path", help="Path to output JSON file or directory")
    patch_parser = subparser.add_parser("patch", help="Patch script with translations")
    patch_parser.add_argument("script_path", help="Path to KAG script file or directory")
    patch_parser.add_argument("m3t_path", help="Path to m3t file or directory")
    patch_parser.add_argument("output_path", help="Path to output KAG script file or directory")
    patch_parser.add_argument("name_dict_path", help="Path to name dict CSV")
    patch_parser.add_argument("--dict-path", help="Path to dict.csv (optional)")
    args = parser.parse_args()
    if args.command == "extract":
        extract_script_auto(args.script_path, args.output_path)
    elif args.command == "patch":
        patch_script_auto(args.script_path, args.m3t_path, args.output_path, args.name_dict_path, args.dict_path)
    else:
        # Without a command nothing would happen; report it instead.
        parser.error("a command is required: extract or patch")


if __name__ == "__main__":
    main()