Add peaky
This commit is contained in:
467
peaky.py
Normal file
467
peaky.py
Normal file
@@ -0,0 +1,467 @@
|
||||
from KAGParser import *
|
||||
from html import escape, unescape
|
||||
import json
|
||||
from os.path import isdir, join, basename, splitext, dirname, exists
|
||||
from os import listdir, makedirs
|
||||
from typing import List, Any, Dict
|
||||
from re import compile, VERBOSE
|
||||
from csv import DictReader
|
||||
|
||||
|
||||
# Matches tag names made of "sel" followed by one or more digits (e.g. "sel1").
SEL_TAG = compile(r'^sel\d+$')
|
||||
|
||||
|
||||
def extract_script(script_path: str, output_path: str):
    """Extract the translatable messages of one KAG script into a JSON file.

    The script is read as cp932, parsed with ``KAGScriptParser`` and walked
    line by line.  An ``fc`` tag opens a message; a ``pcms`` tag, or a label
    reached while a message is open, closes it and records
    ``{"name": ..., "message": ...}`` (``name`` is omitted when no speaker
    tag was seen).  A tag named ``【speaker】`` sets the speaker, and every
    tag matching ``SEL_TAG`` contributes its ``text`` attribute as a message
    of its own.

    Inside an open message, text is appended with ``&`` and ``<`` replaced by
    HTML entities, an ``r`` tag becomes ``"\\n"`` and any other tag is
    serialized as ``<name key="value" flag>`` with HTML-escaped parts.  Tags
    that come before the first text of a message are not recorded.

    Args:
        script_path: Path of the script file to read.
        output_path: Path of the UTF-8 JSON file to write.
    """
    with open(script_path, "r", encoding="cp932") as f:
        script_text = f.read()
    script = KAGScriptParser(script_text).parse(True)

    result = []
    name = None
    message = ''
    in_message = False

    def flush():
        """Record the pending message and reset the per-message state."""
        nonlocal name, message, in_message
        entry = {}
        if name is not None:
            entry['name'] = name
        entry['message'] = message
        result.append(entry)
        name = None
        message = ''
        in_message = False

    for line in script:
        if isinstance(line, LabelNode):
            # A label ends a message that was never closed by [pcms].
            if in_message:
                flush()
            continue
        if not isinstance(line, list):
            continue

        for node in line:
            if isinstance(node, TextNode):
                if in_message:
                    # Escape the characters that are significant in the
                    # serialized form, so literal text cannot be mistaken
                    # for a tag or an entity when the message is parsed back.
                    message += node.text.replace("&", "&amp;").replace("<", "&lt;")
                continue
            if not isinstance(node, TagNode):
                continue

            tag = node.name
            if tag == 'pcms':
                # [pcms] always produces a record, even without a prior [fc].
                flush()
                continue
            if tag == 'fc':
                in_message = True
                continue
            if tag.startswith("【") and tag.endswith("】"):
                name = tag[1:-1]
                continue
            if SEL_TAG.match(tag):
                result.append({'message': node.attributes['text']})
                continue

            # Inline tags are only kept once the message already has content.
            if not (in_message and message):
                continue
            if tag == 'r':
                message += '\n'
                continue
            pieces = [f"<{escape(tag)}"]
            for key, value in node.attributes.items():
                if value is True:
                    # Valueless attribute: emitted as a bare name.
                    pieces.append(f' {escape(key)}')
                else:
                    pieces.append(f' {escape(key)}="{escape(value)}"')
            pieces.append(">")
            message += "".join(pieces)

    # A message still pending at end of file is recorded as well.
    if name is not None or message:
        flush()

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(result, f, ensure_ascii=False, indent=2)
|
||||
|
||||
|
||||
def extract_script_auto(script_path: str, output_path: str):
    """Run extract_script on a single script or on every ``.ks`` file of a directory.

    When ``script_path`` is a directory, each entry whose name ends in
    ``.ks`` (case-insensitive) is extracted to ``<output_path>/<stem>.json``.
    Otherwise ``script_path`` is extracted directly to ``output_path``.
    Missing parent directories of the output files are created.
    """
    def ensure_parent(target):
        # Create the directory that will hold ``target`` if it is missing.
        parent = dirname(target)
        if parent and not isdir(parent):
            makedirs(parent, exist_ok=True)

    if not isdir(script_path):
        ensure_parent(output_path)
        extract_script(script_path, output_path)
        return

    for entry in listdir(script_path):
        if entry.lower().endswith(".ks"):
            stem = splitext(basename(entry))[0]
            target = join(output_path, stem + ".json")
            ensure_parent(target)
            extract_script(join(script_path, entry), target)
|
||||
|
||||
|
||||
def parse_message(message: str) -> List[ParsedLine]:
    """Parse a message string (HTML-escaped KAG tag format) back into ParsedLines.

    Reverses the serialization in extract_script: ``<tagname key="val">``
    becomes a TagNode, HTML entities are decoded into TextNode text, and the
    message is split on ``\\n`` into one ParsedLine per line.  Lines that
    yield no nodes are dropped.

    Args:
        message: Message text in the serialized form described above.

    Returns:
        One list of TagNode / TextNode objects per non-empty line.
    """
    tag_re = compile(r"(<[^>]+>)")
    attr_re = compile(r"""([a-zA-Z0-9_]+)="([^"]*)"|([a-zA-Z0-9_]+)""", VERBOSE)

    result: List[ParsedLine] = []
    for raw_line in message.split("\n"):
        parsed_line: ParsedLine = []
        # Splitting on a capturing group keeps the tags as separate items.
        for part in tag_re.split(raw_line):
            if not part:
                continue
            if not (part.startswith("<") and part.endswith(">")):
                parsed_line.append(TextNode(unescape(part)))
                continue

            pieces = part[1:-1].strip().split(maxsplit=1)
            if not pieces:
                # "<>" / "< >" carries no tag name; keep it as literal text
                # instead of failing with IndexError.
                parsed_line.append(TextNode(unescape(part)))
                continue

            tag_name = unescape(pieces[0])
            attributes: Dict[str, Any] = {}
            if len(pieces) > 1:
                for m in attr_re.finditer(pieces[1]):
                    if m.group(1) is not None:  # key="value"
                        # Test against None, not truthiness: key="" is a
                        # valid attribute whose value is the empty string.
                        attributes[unescape(m.group(1))] = unescape(m.group(2))
                    else:  # boolean key
                        attributes[unescape(m.group(3))] = True
            parsed_line.append(TagNode(name=tag_name, attributes=attributes))

        if parsed_line:
            result.append(parsed_line)

    return result
|
||||
|
||||
|
||||
def wrap_lines(input: List[ParsedLine], max_width: int = 30) -> List[ParsedLine]:
    """Re-break parsed lines so no output line holds more than ``max_width`` characters.

    Width is the sum of ``len(node.text)`` over the TextNodes of a line;
    TagNodes are carried along without counting.  Text is cut at exactly
    ``max_width`` characters and the remainder continues on a new line.
    Every input line is wrapped on its own, so existing line breaks are
    preserved.  Nodes that are neither TagNode nor TextNode are dropped, and
    input lines that yield no nodes produce no output line.

    Args:
        input: Parsed lines to wrap.
        max_width: Maximum number of text characters per output line.

    Returns:
        The wrapped lines, as new lists of nodes.

    Raises:
        ValueError: If ``max_width`` is smaller than 1.  Such a width can
            never fit a character and previously caused an endless loop.
    """
    if max_width < 1:
        raise ValueError(f"max_width must be at least 1, got {max_width}")

    result: List[ParsedLine] = []
    for parsed_line in input:
        current_line: ParsedLine = []
        current_len = 0
        for node in parsed_line:
            if isinstance(node, TagNode):
                current_line.append(node)
            elif isinstance(node, TextNode):
                text = node.text
                while text:
                    # current_len < max_width always holds here, so at least
                    # one character is taken on every iteration.
                    take = min(len(text), max_width - current_len)
                    current_line.append(TextNode(text[:take]))
                    current_len += take
                    text = text[take:]
                    if current_len >= max_width:
                        result.append(current_line)
                        current_line = []
                        current_len = 0

        # End of original line -- flush what is left to preserve \n breaks.
        if current_line:
            result.append(current_line)

    return result
|
||||
|
||||
|
||||
def extract_dict_terms(script_path: str):
    """Load the term dictionary CSV.

    The file is read as UTF-8 (an optional BOM is skipped) and must have a
    ``term`` column.

    Args:
        script_path: Path of the CSV file.

    Returns:
        A ``(terms, overrides)`` tuple.  ``terms`` maps each ``term`` value
        to its full CSV row.  ``overrides`` maps the part of a term before
        its first ideographic space (U+3000) to the full term, for terms
        that contain one.  Later rows win on duplicate keys.
    """
    terms = {}
    overrides = {}
    # newline='' lets the csv module do its own newline handling, so line
    # breaks embedded in quoted fields are not altered by the text layer.
    with open(script_path, "r", encoding="utf-8-sig", newline="") as f:
        for row in DictReader(f):
            term = row['term']
            terms[term] = row
            if '\u3000' in term:
                overrides[term.split('\u3000')[0]] = term
    return terms, overrides
|
||||
|
||||
|
||||
def read_names(name_dict_path: str):
    """Load the speaker-name CSV into a ``{JP_Name: CN_Name}`` dict.

    The file is read as UTF-8 (an optional BOM is skipped) and must have
    ``JP_Name`` and ``CN_Name`` columns.  Later rows win on duplicate names.

    Args:
        name_dict_path: Path of the CSV file.

    Returns:
        Mapping from each ``JP_Name`` value to its ``CN_Name`` value.
    """
    # newline='' lets the csv module do its own newline handling, so line
    # breaks embedded in quoted fields are not altered by the text layer.
    with open(name_dict_path, 'r', encoding='utf-8-sig', newline='') as f:
        return {row['JP_Name']: row['CN_Name'] for row in DictReader(f)}
|
||||
|
||||
|
||||
def patch_script(script_path: str, m3t_path: str, output_path: str, names, term):
    """Rebuild one KAG script with its messages replaced by translations.

    The script is read as cp932 and parsed with ``KAGScriptParser``.
    Translations are read from the m3t file and consumed in order: one per
    line that contains a ``SEL_TAG`` tag outside a message (stored in the
    tag's ``text`` attribute), and one per message block.  A message block
    starts at a line containing an ``fc`` tag and ends at a line containing
    a ``pcms`` tag, at a label, or at end of file; it consumes a translation
    only if it contains at least one TextNode.  The result is serialized
    with ``KAGScriptParser.serialize`` and written as UTF-16-LE with a BOM.

    m3t format as read here (UTF-8, one record per line, surrounding
    whitespace stripped): ``○ NAME:<speaker>`` sets the speaker of the next
    translation, any other ``○`` line is the original text, and a ``●`` line
    is the translation, in which the two characters ``\\n`` stand for a line
    break.  If the original starts/ends with ``「``/``」`` or ``（``/``）``
    and the translation does not, the bracket is added to the translation.

    Args:
        script_path: Source script path.
        m3t_path: Translation file path.
        output_path: Destination script path.
        names: Mapping from original speaker names to translated names.
        term: ``(terms, overrides)`` pair; ``terms`` maps a term to a row
            with a ``translation`` key and ``overrides`` maps alternative
            spellings to keys of ``terms``.

    Raises:
        ValueError: If ``terms`` is non-empty and a ``wd`` tag of a
            translation has an ``s`` attribute found in neither ``terms``
            nor ``overrides``.
    """
    with open(script_path, "r", encoding="cp932") as f:
        script_text = f.read()
    script = KAGScriptParser(script_text).parse(True)

    terms, overrides = term

    # --- Read m3t translations ---
    bracket_pairs = (('\u300c', '\u300d'), ('\uff08', '\uff09'))
    messages = []
    m3t_name = None
    ori_message = None
    with open(m3t_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line.startswith("\u25cb NAME:"):
                m3t_name = line[7:].strip()
            elif line.startswith("\u25cb"):
                ori_message = line[1:].strip()
            elif line.startswith("\u25cf"):
                d = {"message": line[1:].strip().replace('\\n', '\n')}
                if m3t_name is not None:
                    d['name'] = m3t_name
                if ori_message:
                    # Restore enclosing brackets the translation dropped.
                    for opening, closing in bracket_pairs:
                        if ori_message.startswith(opening) and not d['message'].startswith(opening):
                            d['message'] = opening + d['message']
                        if ori_message.endswith(closing) and not d['message'].endswith(closing):
                            d['message'] += closing
                messages.append(d)
                # Both pieces of context belong to this record only; without
                # resetting ori_message a later record lacking an original
                # line would inherit this one's brackets.
                m3t_name = None
                ori_message = None

    def translate_name_tag(line):
        """Translate 【name】 tags in-place."""
        if not isinstance(line, list):
            return
        for node in line:
            if isinstance(node, TagNode) and node.name.startswith('\u3010') and node.name.endswith('\u3011'):
                spk_name = node.name[1:-1]
                if spk_name in names:
                    node.name = '\u3010' + names[spk_name] + '\u3011'

    def build_translated_lines(msg_data):
        """Parse translated message text into ParsedLines, process wd tags, and wrap."""
        nws = parse_message(msg_data['message'])
        for nw in nws:
            for n in nw:
                if not (isinstance(n, TagNode) and n.name == 'wd' and terms):
                    continue
                key = n.attributes.get('s', '')
                if key not in terms:
                    if key not in overrides:
                        print(msg_data['message'])
                        raise ValueError('unknown wd target', key)
                    key = overrides[key]
                n.attributes['s'] = terms[key]['translation']
        return wrap_lines(nws)

    new_script: ParsedScript = []
    in_message = False
    msg_index = 0
    pre_speech_buf: List[list] = []

    def flush_message(closing_line=None):
        """Emit the buffered message block and clear the buffer.

        ``closing_line`` is the line holding [pcms] when the block was closed
        by one, or None when it was closed by a label or by end of file.
        """
        nonlocal msg_index, pre_speech_buf
        candidates = list(pre_speech_buf)
        if closing_line is not None:
            candidates.append(closing_line)
        has_any_speech = any(
            isinstance(n, TextNode) for pline in candidates for n in pline
        )

        if msg_index < len(messages) and has_any_speech:
            msg_data = messages[msg_index]
            msg_index += 1

            # 1. Buffered lines, with speaker tags translated.
            # NOTE(review): buffered lines that contain TextNodes are emitted
            # unchanged here, next to the translated text -- confirm that
            # this is intended for messages spanning several script lines.
            for pline in pre_speech_buf:
                translate_name_tag(pline)
                new_script.append(pline)

            # 2. Add the speaker tag if the block did not already have one.
            has_name_in_pre = any(
                isinstance(n, TagNode) and n.name.startswith('\u3010')
                for pline in pre_speech_buf
                for n in pline
            )
            if msg_data.get('name') and not has_name_in_pre:
                cn_name = names.get(msg_data['name'], msg_data['name'])
                new_script.append([TagNode('\u3010' + cn_name + '\u3011')])

            # 3. Translated text.
            new_script.extend(build_translated_lines(msg_data))

            # 4. [pcms] and whatever follows it on the closing line; the
            #    nodes before [pcms] on that line are replaced by step 3.
            if closing_line is not None:
                tail = []
                for node in closing_line:
                    if tail or (isinstance(node, TagNode) and node.name == 'pcms'):
                        tail.append(node)
                if tail:
                    new_script.append(tail)
        else:
            # No translation -- output original content.
            new_script.extend(pre_speech_buf)
            if closing_line is not None:
                new_script.append(closing_line)

        pre_speech_buf = []

    for line in script:
        if isinstance(line, list):
            if not in_message:
                # --- sel* tag (choice entry) outside message ---
                # Only the first matching tag of the line is patched.
                sel_node = next(
                    (n for n in line if isinstance(n, TagNode) and SEL_TAG.match(n.name)),
                    None,
                )
                if sel_node is not None:
                    if msg_index < len(messages):
                        sel_node.attributes['text'] = messages[msg_index]['message']
                        msg_index += 1
                    new_script.append(line)
                    continue

                # --- Detect [fc] to start a message block ---
                if any(isinstance(n, TagNode) and n.name == 'fc' for n in line):
                    in_message = True
                    pre_speech_buf = []

            if not in_message:
                # Not in message -- normal line.
                translate_name_tag(line)
                new_script.append(line)
            elif any(isinstance(n, TagNode) and n.name == 'pcms' for n in line):
                # --- [pcms] ends the message block ---
                in_message = False
                flush_message(line)
            else:
                # Still in message -- buffer this line.
                pre_speech_buf.append(line)

        elif isinstance(line, LabelNode):
            # A label ends a message block that was never closed by [pcms].
            if in_message:
                in_message = False
                flush_message()
            new_script.append(line)

        elif in_message:
            # Wrap non-list nodes so loops over pre_speech_buf work uniformly.
            # NOTE(review): the wrapped form is also what gets emitted later;
            # confirm that the serializer accepts it.
            pre_speech_buf.append([line])
        else:
            new_script.append(line)

    # Trailing message at end of file.
    if in_message:
        flush_message()

    if msg_index != len(messages):
        print(f"WARNING: processed message not matched. expected {len(messages)}, actual {msg_index}, {script_path}")

    script_data = KAGScriptParser.serialize(new_script)
    with open(output_path, 'w', encoding='UTF-16-LE') as f:
        # The UTF-16-LE codec writes no BOM by itself, so add it explicitly.
        f.write('\ufeff')
        f.write(script_data)
|
||||
|
||||
|
||||
def patch_script_auto(script_path: str, m3t_path: str, output_path: str, name_dict_path: str, dict_path=None):
    """Run patch_script on a single script or on every ``.ks`` file of a directory.

    The name CSV is loaded with read_names; the term CSV is loaded with
    extract_dict_terms only when ``dict_path`` is given, otherwise an empty
    ``({}, {})`` pair is used.

    When ``script_path`` is a directory, each entry whose name ends in
    ``.ks`` (case-insensitive) is patched with ``<m3t_path>/<stem>.m3t`` and
    written to ``<output_path>/<entry name>``; entries without a matching
    m3t file are skipped.  Otherwise the three paths are used as given.
    Missing parent directories of the output files are created.
    """
    name_map = read_names(name_dict_path)
    if dict_path:
        glossary = extract_dict_terms(dict_path)
    else:
        glossary = ({}, {})

    def ensure_parent(target):
        # Create the directory that will hold ``target`` if it is missing.
        parent = dirname(target)
        if parent and not isdir(parent):
            makedirs(parent, exist_ok=True)

    if not isdir(script_path):
        ensure_parent(output_path)
        patch_script(script_path, m3t_path, output_path, name_map, glossary)
        return

    for entry in listdir(script_path):
        if not entry.lower().endswith(".ks"):
            continue
        translation = join(m3t_path, splitext(basename(entry))[0] + ".m3t")
        if not exists(translation):
            continue
        target = join(output_path, basename(entry))
        ensure_parent(target)
        patch_script(join(script_path, entry), translation, target, name_map, glossary)
|
||||
|
||||
|
||||
if __name__ == "__main__":
    from argparse import ArgumentParser

    # Command-line entry point with two sub-commands:
    #   extract <script_path> <output_path>
    #   patch   <script_path> <m3t_path> <output_path> <name_dict_path> [--dict-path P]
    parser = ArgumentParser(description="Process KAG script files")
    subparser = parser.add_subparsers(title="Commands", dest="command")

    extract_parser = subparser.add_parser("extract", help="Extract script to JSON")
    extract_parser.add_argument("script_path", help="Path to KAG script file or directory")
    extract_parser.add_argument("output_path", help="Path to output JSON file or directory")

    patch_parser = subparser.add_parser("patch", help="Patch script with translations")
    patch_parser.add_argument("script_path", help="Path to KAG script file or directory")
    patch_parser.add_argument("m3t_path", help="Path to m3t file or directory")
    patch_parser.add_argument("output_path", help="Path to output KAG script file or directory")
    patch_parser.add_argument("name_dict_path", help="Path to name dict CSV")
    patch_parser.add_argument("--dict-path", help="Path to dict.csv (optional)")

    args = parser.parse_args()
    if args.command == "extract":
        extract_script_auto(args.script_path, args.output_path)
    elif args.command == "patch":
        patch_script_auto(args.script_path, args.m3t_path, args.output_path, args.name_dict_path, args.dict_path)
    else:
        # Sub-commands are optional for argparse unless told otherwise, so a
        # bare invocation leaves args.command as None.  Report it instead of
        # exiting silently with nothing done.
        parser.error("a command is required: extract or patch")
|
||||
Reference in New Issue
Block a user