from KAGParser import *
from html import escape, unescape
import json
from os.path import isdir, join, basename, splitext, dirname, exists
from os import listdir, makedirs
from typing import List, Any, Dict
from re import compile, VERBOSE
from csv import DictReader
# Matches a tag name that is exactly "sel" followed by one or more digits
# (e.g. "sel1", "sel02"); used below to recognise choice-entry tags.
SEL_TAG = compile(r'^sel\d+$')
def extract_script(script_path: str, output_path: str):
    """Extract speaker names and message text from a KAG script into a JSON file.

    The script is read as cp932, parsed with ``KAGScriptParser`` and walked
    node by node:

    * ``fc`` opens a message, ``pcms`` (or a label while a message is open)
      closes it and emits one entry.
    * A tag whose name is wrapped in ``【`` ``】`` sets the speaker name of the
      next emitted entry.
    * A tag matching ``SEL_TAG`` emits its ``text`` attribute as its own entry.
    * Inside an open message, text is appended with ``&`` and ``<`` escaped as
      HTML entities; once the message has some content, ``r`` becomes a
      newline and any other tag is serialized as ``<name attr="value">``.

    Text must be entity-escaped because ``parse_message`` later treats every
    ``<...>`` span as a tag and un-escapes entities in the remaining text;
    without escaping, a literal ``<`` or ``&amp;`` in the script would not
    survive the round trip.

    Args:
        script_path: Path of the KAG script to read.
        output_path: Path of the JSON file to write (UTF-8, list of dicts
            with a ``message`` key and an optional ``name`` key).
    """
    with open(script_path, "r", encoding="cp932") as f:
        script_text = f.read()
    script = KAGScriptParser(script_text).parse(True)

    name = None
    message = ''
    result = []
    in_message = False

    def flush_entry():
        """Emit the pending name/message as one entry and reset the state."""
        nonlocal name, message, in_message
        entry = {}
        if name is not None:
            entry['name'] = name
        entry['message'] = message
        result.append(entry)
        name = None
        message = ''
        in_message = False

    for line in script:
        if isinstance(line, LabelNode):
            # A label terminates a message that was never closed by [pcms].
            if in_message:
                flush_entry()
        elif isinstance(line, list):
            for node in line:
                if isinstance(node, TextNode):
                    if in_message:
                        # Escape so the text cannot be mistaken for tag markup.
                        message += node.text.replace("&", "&amp;").replace("<", "&lt;")
                elif isinstance(node, TagNode):
                    tag = node.name
                    if tag == 'pcms':
                        # [pcms] always emits an entry, even an empty one.
                        flush_entry()
                    elif tag == 'fc':
                        in_message = True
                    elif tag.startswith("【") and tag.endswith("】"):
                        name = tag[1:-1]
                    elif SEL_TAG.match(tag):
                        result.append({'message': node.attributes['text']})
                    elif in_message and len(message) > 0:
                        # Tags seen before any text are not part of the message.
                        if tag == 'r':
                            message += '\n'
                        else:
                            pieces = [f"<{escape(tag)}"]
                            for k, v in node.attributes.items():
                                if v is True:
                                    # Value-less (boolean) attribute.
                                    pieces.append(f' {escape(k)}')
                                else:
                                    pieces.append(f' {escape(k)}="{escape(v)}"')
                            pieces.append(">")
                            message += "".join(pieces)

    # Trailing content that was never closed by [pcms] or a label.
    if name is not None or message:
        flush_entry()

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(result, f, ensure_ascii=False, indent=2)
def extract_script_auto(script_path: str, output_path: str):
    """Extract one script file, or every ``.ks`` file found in a directory.

    When ``script_path`` is a directory, each entry whose name ends with
    ``.ks`` (case-insensitive) is extracted to ``<output_path>/<stem>.json``.
    Otherwise ``script_path`` is extracted directly to ``output_path``.
    The parent directory of each output file is created when missing.
    """
    def ensure_parent_dir(target: str) -> None:
        parent = dirname(target)
        if parent and not isdir(parent):
            makedirs(parent, exist_ok=True)

    # Single-file mode.
    if not isdir(script_path):
        ensure_parent_dir(output_path)
        extract_script(script_path, output_path)
        return

    # Directory mode: only the directory's direct entries are considered.
    for entry in listdir(script_path):
        if not entry.lower().endswith(".ks"):
            continue
        stem = splitext(basename(entry))[0]
        destination = join(output_path, stem + ".json")
        ensure_parent_dir(destination)
        extract_script(join(script_path, entry), destination)
def parse_message(message: str) -> List[ParsedLine]:
    """Parse a message string (HTML-escaped KAG tag format) into ParsedLines.

    Reverses the serialization done by extract_script:

    * ``<name key="value" flag>`` becomes a ``TagNode`` whose attributes map
      ``key`` to the un-escaped value and ``flag`` to ``True``;
    * any other text becomes a ``TextNode`` with HTML entities un-escaped;
    * the message is split on newline characters, one ParsedLine per
      non-empty line (lines that produce no nodes are dropped).

    Args:
        message: The serialized message text.

    Returns:
        A list of parsed lines, each a list of ``TagNode`` / ``TextNode``.
    """
    tag_re = compile(r"(<[^>]+>)")
    attr_re = compile(r"""([a-zA-Z0-9_]+)="([^"]*)"|([a-zA-Z0-9_]+)""", VERBOSE)
    result: List[ParsedLine] = []
    for line in message.split("\n"):
        parsed_line: ParsedLine = []
        # The capturing group keeps the tag spans in the split result.
        for part in tag_re.split(line):
            if not part:
                continue
            if part.startswith("<") and part.endswith(">"):
                tag_parts = part[1:-1].split(maxsplit=1)
                # A literal "<>" / "< >" has no tag name; treat it as text
                # below instead of failing on tag_parts[0].
                if tag_parts:
                    attributes: Dict[str, Any] = {}
                    if len(tag_parts) > 1:
                        for m in attr_re.finditer(tag_parts[1]):
                            if m.group(1) is not None:
                                # key="value"; the value may be empty, so test
                                # the key group rather than value truthiness.
                                attributes[unescape(m.group(1))] = unescape(m.group(2))
                            elif m.group(3):
                                # Bare key: boolean attribute.
                                attributes[unescape(m.group(3))] = True
                    parsed_line.append(
                        TagNode(name=unescape(tag_parts[0]), attributes=attributes)
                    )
                    continue
            parsed_line.append(TextNode(unescape(part)))
        if parsed_line:
            result.append(parsed_line)
    return result
def wrap_lines(input: List[ParsedLine], max_width: int = 30) -> List[ParsedLine]:
    """Re-flow parsed lines so no output line holds more than ``max_width`` characters of text.

    Only ``TextNode`` text counts toward the width; ``TagNode`` objects are
    carried along on the current output line without consuming width. Text
    longer than the remaining width is split into several ``TextNode``
    pieces. Every input line ends the current output line, so the original
    line breaks are preserved.

    Args:
        input: Parsed lines to wrap.
        max_width: Maximum number of text characters per output line.

    Returns:
        The wrapped lines; empty lines are never emitted.

    Raises:
        ValueError: If ``max_width`` is smaller than 1 (a non-positive width
            could never consume any text and would loop forever).
    """
    if max_width < 1:
        raise ValueError(f"max_width must be at least 1, got {max_width}")

    result: List[ParsedLine] = []
    current_line: ParsedLine = []
    current_len = 0
    for parsed_line in input:
        for node in parsed_line:
            if isinstance(node, TagNode):
                current_line.append(node)
            elif isinstance(node, TextNode):
                text = node.text
                while text:
                    # current_len < max_width always holds here because a full
                    # line is flushed immediately, so at least one character
                    # is taken on every iteration.
                    take = min(len(text), max_width - current_len)
                    current_line.append(TextNode(text[:take]))
                    current_len += take
                    text = text[take:]
                    if current_len >= max_width:
                        result.append(current_line)
                        current_line = []
                        current_len = 0
        # End of original line -- flush to preserve the original line break.
        if current_line:
            result.append(current_line)
        current_line = []
        current_len = 0
    return result
def extract_dict_terms(script_path: str):
    """Load the term dictionary CSV.

    The file is read as UTF-8 (a leading BOM is accepted) and must have a
    ``term`` column.

    Args:
        script_path: Path of the CSV file.

    Returns:
        A ``(terms, overrides)`` tuple. ``terms`` maps each ``term`` value to
        its full CSV row (a dict). For every term containing an ideographic
        space (U+3000), ``overrides`` maps the part before the first such
        space to the full term.
    """
    terms = {}
    overrides = {}
    # newline='' is what the csv module requires so that line breaks embedded
    # in quoted fields are passed through unmodified.
    with open(script_path, "r", encoding="utf-8-sig", newline="") as f:
        for row in DictReader(f):
            term = row['term']
            terms[term] = row
            if '\u3000' in term:
                overrides[term.split('\u3000')[0]] = term
    return terms, overrides
def read_names(name_dict_path: str):
    """Load the speaker-name dictionary CSV.

    The file is read as UTF-8 (a leading BOM is accepted) and must have
    ``JP_Name`` and ``CN_Name`` columns.

    Args:
        name_dict_path: Path of the CSV file.

    Returns:
        A dict mapping each ``JP_Name`` value to its ``CN_Name`` value; a
        later row overwrites an earlier row with the same ``JP_Name``.
    """
    # newline='' is what the csv module requires so that line breaks embedded
    # in quoted fields are passed through unmodified.
    with open(name_dict_path, 'r', encoding='utf-8-sig', newline='') as f:
        return {row['JP_Name']: row['CN_Name'] for row in DictReader(f)}
def patch_script(script_path: str, m3t_path: str, output_path: str, names, term):
    """Rebuild a KAG script with the translated messages of an m3t file.

    The script is read as cp932 and parsed with ``KAGScriptParser``; the m3t
    file is read as UTF-8. Translations are consumed strictly in order:

    * every tag matching ``SEL_TAG`` found outside a message takes the next
      translation as its ``text`` attribute (one per tag, mirroring
      extract_script, which emits one entry per such tag);
    * a message block opened by ``fc`` and closed by ``pcms``, a label or the
      end of the file takes the next translation if the block contains any
      text; otherwise the block is copied through unchanged.

    The result is serialized with ``KAGScriptParser.serialize`` and written
    as UTF-16-LE with a leading BOM. A warning is printed when the number of
    consumed translations differs from the number read.

    Args:
        script_path: Path of the original KAG script.
        m3t_path: Path of the m3t translation file.
        output_path: Path of the patched script to write.
        names: Mapping from original speaker name to translated name.
        term: ``(terms, overrides)`` tuple as returned by extract_dict_terms.

    Raises:
        ValueError: If a ``wd`` tag in a translation refers to a term that is
            in neither ``terms`` nor ``overrides`` (only when ``terms`` is
            non-empty).
    """
    with open(script_path, "r", encoding="cp932") as f:
        script_text = f.read()
    script = KAGScriptParser(script_text).parse(True)
    terms, overrides = term

    # --- Read m3t translations ---
    name_prefix = "\u25cb NAME:"
    # (opening, closing) bracket pairs restored from the original text.
    bracket_pairs = (('\u300c', '\u300d'), ('\uff08', '\uff09'))
    messages = []
    m3t_name = None
    ori_message = None
    with open(m3t_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line.startswith(name_prefix):
                m3t_name = line[len(name_prefix):].strip()
            elif line.startswith("\u25cb"):
                ori_message = line[1:].strip()
            elif line.startswith("\u25cf"):
                text = line[1:].strip().replace('\\n', '\n')
                if ori_message:
                    # Re-add brackets the original had but the translation lost.
                    for opener, closer in bracket_pairs:
                        if ori_message.startswith(opener) and not text.startswith(opener):
                            text = opener + text
                        if ori_message.endswith(closer) and not text.endswith(closer):
                            text += closer
                d = {"message": text}
                if m3t_name is not None:
                    d['name'] = m3t_name
                messages.append(d)
                # Both belong to the entry just finished; do not let them
                # leak into the next one.
                m3t_name = None
                ori_message = None

    def translate_name_tag(line):
        """Translate 【name】 tags in-place."""
        if not isinstance(line, list):
            return
        for node in line:
            if isinstance(node, TagNode) and node.name.startswith('\u3010') and node.name.endswith('\u3011'):
                spk_name = node.name[1:-1]
                if spk_name in names:
                    node.name = '\u3010' + names[spk_name] + '\u3011'

    def build_translated_lines(msg_data):
        """Parse translated message text into ParsedLines, process wd tags, and wrap."""
        nws = parse_message(msg_data['message'])
        for nw in nws:
            for n in nw:
                if isinstance(n, TagNode) and n.name == 'wd' and terms:
                    target = n.attributes.get('s', '')
                    if target not in terms:
                        if target in overrides:
                            n.attributes['s'] = overrides[target]
                        else:
                            print(msg_data['message'])
                            raise ValueError('unknown wd target', target)
                    n.attributes['s'] = terms[n.attributes['s']]['translation']
        return wrap_lines(nws)

    def is_tag(node, tag_name):
        return isinstance(node, TagNode) and node.name == tag_name

    new_script: ParsedScript = []
    in_message = False
    msg_index = 0
    pre_speech_buf: List[list] = []

    def flush_message_block(buffered, closing_line=None):
        """Emit one finished message block.

        ``buffered`` holds the lines collected since [fc]; ``closing_line`` is
        the line containing [pcms], or None when the block was ended by a
        label or the end of the file.
        """
        nonlocal msg_index
        candidates = buffered if closing_line is None else buffered + [closing_line]
        has_any_speech = any(
            isinstance(n, TextNode) for pline in candidates for n in pline
        )
        if msg_index < len(messages) and has_any_speech:
            msg_data = messages[msg_index]
            msg_index += 1
            # 1. Buffered lines, with speaker names translated.
            for pline in buffered:
                translate_name_tag(pline)
                new_script.append(pline)
            # 2. If the translation has a name and none was buffered, add it.
            has_name_in_pre = any(
                isinstance(n, TagNode) and n.name.startswith('\u3010')
                for pline in buffered for n in pline
            )
            if msg_data.get('name') and not has_name_in_pre:
                cn_name = names.get(msg_data['name'], msg_data['name'])
                new_script.append([TagNode('\u3010' + cn_name + '\u3011')])
            # 3. Translated text.
            new_script.extend(build_translated_lines(msg_data))
            # 4. [pcms] and whatever follows it on the closing line.
            if closing_line is not None:
                tail = []
                found_pcms = False
                for node in closing_line:
                    if is_tag(node, 'pcms'):
                        found_pcms = True
                    if found_pcms:
                        tail.append(node)
                if tail:
                    new_script.append(tail)
        else:
            # No translation -- output the original content untouched.
            new_script.extend(buffered)
            if closing_line is not None:
                new_script.append(closing_line)

    for line in script:
        if isinstance(line, list):
            if not in_message:
                # --- sel* tags (choice entries) outside a message ---
                has_sel = False
                for node in line:
                    if isinstance(node, TagNode) and SEL_TAG.match(node.name):
                        has_sel = True
                        if msg_index < len(messages):
                            node.attributes['text'] = messages[msg_index]['message']
                            msg_index += 1
                if has_sel:
                    new_script.append(line)
                    continue
                # --- [fc] starts a message block ---
                if any(is_tag(n, 'fc') for n in line):
                    in_message = True
                    pre_speech_buf = []
            if in_message:
                if any(is_tag(n, 'pcms') for n in line):
                    # --- [pcms] ends the message block ---
                    in_message = False
                    flush_message_block(pre_speech_buf, line)
                    pre_speech_buf = []
                else:
                    # Still in message -- buffer this line.
                    pre_speech_buf.append(line)
                continue
            # Not in message -- normal line.
            translate_name_tag(line)
            new_script.append(line)
        elif isinstance(line, LabelNode):
            if in_message:
                # A label ends a block that never reached [pcms].
                in_message = False
                flush_message_block(pre_speech_buf)
                pre_speech_buf = []
            new_script.append(line)
        else:
            if in_message:
                # Wrap non-list nodes so loops over the buffer work uniformly.
                pre_speech_buf.append([line])
                continue
            new_script.append(line)

    # Trailing message at end of file.
    if in_message:
        flush_message_block(pre_speech_buf)

    if msg_index != len(messages):
        print(f"WARNING: processed message not matched. expected {len(messages)}, actual {msg_index}, {script_path}")
    script_data = KAGScriptParser.serialize(new_script)
    with open(output_path, 'w', encoding='UTF-16-LE') as f:
        f.write('\ufeff')
        f.write(script_data)
def patch_script_auto(script_path: str, m3t_path: str, output_path: str, name_dict_path: str, dict_path=None):
    """Patch one script file, or every ``.ks`` file found in a directory.

    The name dictionary is loaded with read_names and, when ``dict_path`` is
    given, the term dictionary with extract_dict_terms (otherwise two empty
    dicts are used). In directory mode each ``.ks`` entry (case-insensitive)
    is paired with ``<m3t_path>/<stem>.m3t``; entries without such a file are
    skipped, the rest are written to ``<output_path>/<entry name>``. In
    single-file mode the three paths are passed to patch_script as given.
    The parent directory of each output file is created when missing.
    """
    def ensure_parent_dir(target: str) -> None:
        parent = dirname(target)
        if parent and not isdir(parent):
            makedirs(parent, exist_ok=True)

    names = read_names(name_dict_path)
    if dict_path:
        term = extract_dict_terms(dict_path)
    else:
        term = ({}, {})

    # Single-file mode.
    if not isdir(script_path):
        ensure_parent_dir(output_path)
        patch_script(script_path, m3t_path, output_path, names, term)
        return

    # Directory mode: only the directory's direct entries are considered.
    for entry in listdir(script_path):
        if not entry.lower().endswith(".ks"):
            continue
        translation = join(m3t_path, splitext(basename(entry))[0] + ".m3t")
        if not exists(translation):
            continue
        destination = join(output_path, basename(entry))
        ensure_parent_dir(destination)
        patch_script(join(script_path, entry), translation, destination, names, term)
if __name__ == "__main__":
    # Command-line entry point with two sub-commands: "extract" and "patch".
    from argparse import ArgumentParser

    cli = ArgumentParser(description="Process KAG script files")
    commands = cli.add_subparsers(title="Commands", dest="command")

    # extract <script_path> <output_path>
    extract_cmd = commands.add_parser("extract", help="Extract script to JSON")
    for arg_name, arg_help in (
        ("script_path", "Path to KAG script file or directory"),
        ("output_path", "Path to output JSON file or directory"),
    ):
        extract_cmd.add_argument(arg_name, help=arg_help)

    # patch <script_path> <m3t_path> <output_path> <name_dict_path> [--dict-path P]
    patch_cmd = commands.add_parser("patch", help="Patch script with translations")
    for arg_name, arg_help in (
        ("script_path", "Path to KAG script file or directory"),
        ("m3t_path", "Path to m3t file or directory"),
        ("output_path", "Path to output KAG script file or directory"),
        ("name_dict_path", "Path to name dict CSV"),
    ):
        patch_cmd.add_argument(arg_name, help=arg_help)
    patch_cmd.add_argument("--dict-path", help="Path to dict.csv (optional)")

    options = cli.parse_args()
    # Running without a sub-command does nothing.
    if options.command == "extract":
        extract_script_auto(options.script_path, options.output_path)
    elif options.command == "patch":
        patch_script_auto(
            options.script_path,
            options.m3t_path,
            options.output_path,
            options.name_dict_path,
            options.dict_path,
        )