add link_to_jellyfin_music

This commit is contained in:
2023-12-03 19:43:55 +08:00
parent b1b7a26ada
commit 65d4efdc00

126
link_to_jellyfin_music.py Normal file
View File

@@ -0,0 +1,126 @@
import _rssbotlib
from argparse import ArgumentParser
import re
from typing import List
from os.path import join, splitext, isdir, exists, split
from os import listdir, remove, link, symlink, makedirs
from subprocess import PIPE, Popen
FMT = re.compile(r"(\d+)/(\d+)")
def generate_thumb(input: str, output: str):
p = Popen([arg.ffmpeg, '-i', input, '-c', 'copy',
'-y', output], stdout=PIPE, stderr=PIPE)
p.wait()
if p.returncode != 0:
print(p.stdout.read().decode(errors='ignore'))
print(p.stderr.read().decode(errors='ignore'))
raise ValueError(f"Failed to generate thumbnail for {input}")
def generate_path(input: str):
ext = splitext(input)[1]
i = _rssbotlib.VideoInfo()
if not i.parse(input):
raise ValueError(f"Failed to parse {input}")
s = i.streams
thumb = False
for stream in s:
if stream.is_video:
thumb = True
break
m = i.meta.to_dict()
artist = m.get('album_artist', m.get('artist', None))
album = m.get('album', None)
title = m.get('title', None)
disc = m.get('disc', '1/1')
track = m.get('track', None)
if artist is None or album is None or title is None:
raise ValueError("No artist, album or title")
discs = FMT.match(disc)
if discs is None:
raise ValueError(f"Failed to parse discs {disc}")
discs = discs.group(1, 2)
discs = (int(discs[0]), int(discs[1]))
if discs[1] == 1:
if track is None:
return (join(arg.OUTPUT, artist, album, title + ext), thumb)
else:
tracks = FMT.match(track)
if tracks is None:
raise ValueError(f"Failed to parse tracks {track}")
tracks = tracks.group(1, 2)
tracks = (int(tracks[0]), int(tracks[1]))
return (join(arg.OUTPUT, artist, album,
f"{tracks[0]:02} - {title}{ext}"), thumb)
else:
if track is None:
return (join(arg.OUTPUT, artist, album, f"Disc {discs[0]}",
f"{title}{ext}"), thumb)
else:
tracks = FMT.match(track)
if tracks is None:
raise ValueError(f"Failed to parse tracks {track}")
tracks = tracks.group(1, 2)
tracks = (int(tracks[0]), int(tracks[1]))
return (join(arg.OUTPUT, artist, album, f"Disc {discs[0]}",
f"{tracks[0]:02} - {title}{ext}"), thumb)
def get_m4a_files(dir: str, r: bool) -> List[str]:
if not isdir(dir):
return []
files = listdir(dir)
re = []
for file in files:
file = join(dir, file)
if isdir(file):
if r:
re += get_m4a_files(file, r)
elif file.endswith('.m4a'):
re.append(file)
return re
p = ArgumentParser()
p.add_argument("INPUT", help="Input directory")
p.add_argument("OUTPUT", help="Output directory")
p.add_argument("-F", "--ffmpeg", help="Path to ffmpeg", default="ffmpeg")
p.add_argument("-v", "--verbose", help="Verbose output",
action="store_true", default=False)
p.add_argument("-f", "--force", help="Overwrite existing files",
action="store_true", default=False)
p.add_argument("-r", "--recursive", help="Recursive search",
action="store_true", default=False)
p.add_argument("-c", "--cover", help="Cover file name", default="cover.jpg")
p.add_argument("-H", "--hardlink", help="Use hard link instead of copy",
action="store_true", default=False)
arg = p.parse_intermixed_args()
print(arg)
for f in get_m4a_files(arg.INPUT, arg.recursive):
if arg.verbose:
print(f"Processing {f}")
r = generate_path(f)
if arg.verbose:
print(f"Target path: {r[0]}")
if r[1]:
thumb = join(split(r[0])[0], arg.cover)
if arg.verbose:
print(f"Target thumb: {thumb}")
if not exists(thumb):
makedirs(split(thumb)[0], exist_ok=True)
generate_thumb(f, thumb)
if not arg.force and exists(r[0]):
print(f"File {r[0]} exists, skipped")
continue
if exists(r[0]):
remove(r[0])
makedirs(split(r[0])[0], exist_ok=True)
if arg.hardlink:
link(f, r[0])
else:
symlink(f, r[0])
if arg.verbose:
print(f"Linked {f} to {r[0]}")