mirror of
https://github.com/lifegpc/pythonscript.git
synced 2026-06-06 11:28:58 +08:00
add EPUB support (incomplete)
This commit is contained in:
420
google_play.py
420
google_play.py
@@ -1,19 +1,330 @@
|
||||
from requests import Session
|
||||
from http.cookiejar import MozillaCookieJar
|
||||
from html.parser import HTMLParser
|
||||
from typing import List, Tuple, Optional
|
||||
from typing import Any, Callable, Dict, List, Tuple, Optional
|
||||
from js2py import eval_js
|
||||
from os import makedirs, remove
|
||||
from os.path import join, exists, relpath, abspath
|
||||
from traceback import print_exc
|
||||
from json import load, dump
|
||||
from json import load, dump, loads
|
||||
from urllib.parse import urljoin, parse_qs, urlparse
|
||||
from base64 import b64decode as _b64decode
|
||||
from re import compile
|
||||
from argparse import ArgumentParser, RawTextHelpFormatter
|
||||
from textwrap import wrap as _wrap
|
||||
from zipfile import ZIP_STORED, ZipFile
|
||||
from zipfile import ZIP_DEFLATED, ZIP_STORED, ZipFile
|
||||
from subprocess import Popen, DEVNULL
|
||||
from xml.etree import ElementTree as ET
|
||||
from time import gmtime, strftime, strptime
|
||||
from calendar import timegm
|
||||
|
||||
|
||||
def get_iso8601_time(d: int) -> str:
|
||||
return strftime('%Y-%m-%dT%H:%M:%SZ', gmtime(d))
|
||||
|
||||
|
||||
def parse_time(s: str) -> int:
|
||||
return timegm(strptime(s, "%Y.%m.%d"))
|
||||
|
||||
|
||||
DC_NS = 'http://purl.org/dc/elements/1.1/'
|
||||
OPF_NS = 'http://www.idpf.org/2007/opf'
|
||||
ALLOW_DC_LIST = ['coverage', 'description', 'format', 'publisher', 'relation', 'rights', 'source', 'type']
|
||||
PREFIX_DICT = {'rendition': 'http://www.idpf.org/vocab/rendition/#',
|
||||
'ebpaj': 'http://www.ebpaj.jp/',
|
||||
'fixed-layout-jp': 'http://www.digital-comic.jp/',
|
||||
'kadokawa': 'http://www.access-company.com/2012/layout#',
|
||||
'ibooks': 'http://vocabulary.itunes.apple.com/rdf/ibooks/vocabulary-extensions-1.0/'}
|
||||
js_dumps: Callable[[Any], str] = eval_js('function(a){return JSON.stringify(a);}')
|
||||
|
||||
|
||||
class InvalidEPUB(Exception):
|
||||
def __init__(self, message: str, *args):
|
||||
super().__init__(message, *args)
|
||||
|
||||
|
||||
class EPUBLink:
|
||||
def __init__(self, href: str, rel: str, file: str, media_type: str = None, compress_type: int = None):
|
||||
self.href = href
|
||||
self.media_type = media_type
|
||||
self.rel = rel
|
||||
self.file = file
|
||||
self.compress_type = compress_type
|
||||
|
||||
def dump(self) -> Dict[str, str]:
|
||||
if not self.href or not self.rel or not self.file:
|
||||
raise InvalidEPUB('href or rel is needed for link element.')
|
||||
d = {'href': self.href, 'rel': self.rel}
|
||||
if self.media_type:
|
||||
d['media-type'] = self.media_type
|
||||
return d
|
||||
|
||||
def save(self, f: ZipFile):
|
||||
f.write(self.file, self.href, self.compress_type)
|
||||
print(f'Added file {self.file} to {self.href} in archive.')
|
||||
|
||||
|
||||
class EPUBIdentifier:
|
||||
def __init__(self, value: str, id: str = None):
|
||||
self.value = value
|
||||
self.id = id
|
||||
|
||||
|
||||
class EPUBCreator:
|
||||
def __init__(self, type: str, value: str, role: str = None, file_as: str = None):
|
||||
self.type = type
|
||||
self.value = value
|
||||
self.role = role
|
||||
self.file_as = file_as
|
||||
|
||||
def dump(self) -> ET.Element:
|
||||
if not self.value or not self.type:
|
||||
raise InvalidEPUB('Creator need a value or type.')
|
||||
e = ET.Element(f"dc:{self.type}")
|
||||
if self.role:
|
||||
e.attrib['opf:role'] = self.role
|
||||
if self.file_as:
|
||||
e.attrib['opf:file_as'] = self.file_as
|
||||
e.text = self.value
|
||||
return e
|
||||
|
||||
|
||||
class EPUBSubject:
|
||||
def __init__(self, sub: str, authority: str = None, term: str = None):
|
||||
self.sub = sub
|
||||
self.authority = authority
|
||||
self.term = term
|
||||
|
||||
def dump(self) -> ET.Element:
|
||||
if not self.sub:
|
||||
raise InvalidEPUB('Subject is needed for subject.')
|
||||
if self.authority and not self.term:
|
||||
raise InvalidEPUB('Term is needed if authority is defined for subject.')
|
||||
d = {'opf:authority': self.authority, 'opf:term': self.term} if self.authority else {}
|
||||
e = ET.Element('dc:subject', d)
|
||||
e.text = self.sub
|
||||
return e
|
||||
|
||||
|
||||
class EPUBMeta:
|
||||
def __init__(self, prop: str, value: str):
|
||||
self.prefix_url = None
|
||||
if ':' in prop:
|
||||
prefix = prop[:prop.find(':')]
|
||||
if prefix not in PREFIX_DICT:
|
||||
raise ValueError('Unknown prefix')
|
||||
self.prefix_url = f"{prefix}: {PREFIX_DICT[prefix]}"
|
||||
self.property = prop
|
||||
self.value = value
|
||||
|
||||
def dump(self) -> ET.Element:
|
||||
e = ET.Element('meta', {'property': self.property})
|
||||
e.text = self.value
|
||||
return e
|
||||
|
||||
|
||||
class EPUBMetadata:
|
||||
def __init__(self, p):
|
||||
self._p: EPUBPackage = p
|
||||
self.identifiers: List[EPUBIdentifier] = []
|
||||
self.unique_identifier = ''
|
||||
self.title = ''
|
||||
self._language: List[str] = []
|
||||
self.contributors: List[EPUBCreator] = []
|
||||
self.creators: List[EPUBCreator] = []
|
||||
self.date: int = None
|
||||
self.subjects: List[EPUBSubject] = []
|
||||
self.maps: Dict[str, List[str]] = {}
|
||||
self.metas: List[EPUBMeta] = []
|
||||
|
||||
def add_contributor(self, name: str, role: str = None, file_as: str = None):
|
||||
self.contributors.append(EPUBCreator('contributor', name, role, file_as))
|
||||
|
||||
def add_creator(self, name: str, role: str = None, file_as: str = None):
|
||||
self.creators.append(EPUBCreator('creator', name, role, file_as))
|
||||
|
||||
def add_data(self, key: str, value: str):
|
||||
if key in ALLOW_DC_LIST:
|
||||
if key in self.maps:
|
||||
self.maps[key].append(value)
|
||||
else:
|
||||
self.maps[key] = [value]
|
||||
|
||||
def add_identifier(self, value: str, id: str = None):
|
||||
if id is not None:
|
||||
for i in self.identifiers:
|
||||
if i.id is not None and i.id == id:
|
||||
raise ValueError(f'id {id} already have value.')
|
||||
self.identifiers.append(EPUBIdentifier(value, id))
|
||||
|
||||
def add_meta(self, prop: str, value: str):
|
||||
self.metas.append(EPUBMeta(prop, value))
|
||||
|
||||
def add_subject(self, sub: str, authority: str = None, term: str = None):
|
||||
self.subjects.append(EPUBSubject(sub, authority, term))
|
||||
|
||||
def dump(self) -> ET.Element:
|
||||
metadata = ET.Element('metadata', {'xmlns:dc': DC_NS, 'xmlns:opf': OPF_NS})
|
||||
for i in self.identifiers:
|
||||
d = {"id": i.id} if i.id else {}
|
||||
ide = ET.Element('dc:identifier', d)
|
||||
ide.text = i.value
|
||||
metadata.append(ide)
|
||||
if not self.title:
|
||||
raise InvalidEPUB('title is needed for metadata.')
|
||||
title = ET.Element('dc:title')
|
||||
title.text = self.title
|
||||
metadata.append(title)
|
||||
lang = self.language
|
||||
if lang is None or len(lang) == 0:
|
||||
raise InvalidEPUB('language is needed for metadata.')
|
||||
for lan in lang:
|
||||
la = ET.Element('dc:language')
|
||||
la.text = lan
|
||||
metadata.append(la)
|
||||
for i in self.contributors:
|
||||
metadata.append(i.dump())
|
||||
for i in self.creators:
|
||||
metadata.append(i.dump())
|
||||
if self.date is not None:
|
||||
date = ET.Element('dc:date')
|
||||
date.text = get_iso8601_time(self.date)
|
||||
metadata.append(date)
|
||||
for i in self.subjects:
|
||||
metadata.append(i.dump())
|
||||
for key in self.maps:
|
||||
for v in self.maps[key]:
|
||||
e = ET.Element(f"dc:{key}")
|
||||
e.text = v
|
||||
metadata.append(e)
|
||||
for i in self.metas:
|
||||
metadata.append(i.dump())
|
||||
return metadata
|
||||
|
||||
def get_unique_identifier(self) -> Optional[str]:
|
||||
if self.unique_identifier:
|
||||
for i in self.identifiers:
|
||||
if i.id == self.unique_identifier:
|
||||
return self.unique_identifier
|
||||
self.unique_identifier = ''
|
||||
return self.get_unique_identifier()
|
||||
else:
|
||||
for i in self.identifiers:
|
||||
if i.id:
|
||||
return i.id
|
||||
return None
|
||||
|
||||
@property
|
||||
def language(self):
|
||||
if len(self._language) > 0:
|
||||
return self._language
|
||||
if self._p.language:
|
||||
return [self._p.language]
|
||||
return None
|
||||
|
||||
@property
|
||||
def prefix(self):
|
||||
r = []
|
||||
for i in self.metas:
|
||||
if i.prefix_url:
|
||||
if i.prefix_url not in r:
|
||||
r.append(i.prefix_url)
|
||||
return ' '.join(r)
|
||||
|
||||
|
||||
class EPUBPackage:
|
||||
def __init__(self, location: str = 'package.opf'):
|
||||
self.location = location
|
||||
self.metadata = EPUBMetadata(self)
|
||||
self.language = ''
|
||||
|
||||
def add_identifier(self, value: str, id: str = None):
|
||||
self.metadata.add_identifier(value, id)
|
||||
|
||||
def dump(self):
|
||||
ide = self.metadata.get_unique_identifier()
|
||||
if ide is None:
|
||||
raise InvalidEPUB('A unique identifier is needed.')
|
||||
root = ET.Element('package', {'xmlns': OPF_NS, 'version': '3.0', 'unique-identifier': ide})
|
||||
if self.language:
|
||||
root.attrib['xml:lang'] = self.language
|
||||
prefix = self.metadata.prefix
|
||||
if prefix:
|
||||
root.attrib['prefix'] = prefix
|
||||
root.append(self.metadata.dump())
|
||||
return ET.tostring(root, 'UTF-8')
|
||||
|
||||
def save(self, f: ZipFile):
|
||||
f.writestr(self.location, self.dump())
|
||||
print(f'Added {self.location} to archive.')
|
||||
|
||||
@property
|
||||
def unique_identifier(self):
|
||||
return self.metadata.get_unique_identifier()
|
||||
|
||||
@unique_identifier.setter
|
||||
def unique_identifier(self, v):
|
||||
if isinstance(v, str):
|
||||
self.metadata.unique_identifier = v
|
||||
else:
|
||||
raise TypeError('Unsupported type.')
|
||||
|
||||
|
||||
class EPUBContainer:
|
||||
def __init__(self):
|
||||
self.packages = [EPUBPackage()]
|
||||
self.links: List[EPUBLink] = []
|
||||
|
||||
def add_link(self, href: str, rel: str, file: str, media_type: str = None, compress_type: int = None):
|
||||
for i in self.links:
|
||||
if i.href == href:
|
||||
raise ValueError(f'{href} already in links.')
|
||||
self.links.append(EPUBLink(href, rel, file, media_type, compress_type))
|
||||
|
||||
def dump(self):
|
||||
if len(self.packages) < 1:
|
||||
raise InvalidEPUB('1 or more packages is needed.')
|
||||
root = ET.Element('container', {'xmlns': 'urn:oasis:names:tc:opendocument:xmlns:container', 'version': '1.0'})
|
||||
rootfiles = ET.Element('rootfiles')
|
||||
for p in self.packages:
|
||||
rootfile = ET.Element('rootfile', {'full-path': p.location, 'media-type': 'application/oebps-package+xml'})
|
||||
rootfiles.append(rootfile)
|
||||
root.append(rootfiles)
|
||||
if len(self.links) > 0:
|
||||
links = ET.Element('links')
|
||||
for i in self.links:
|
||||
link = ET.Element('link', i.dump())
|
||||
links.append(link)
|
||||
root.append(links)
|
||||
return ET.tostring(root, 'UTF-8')
|
||||
|
||||
def save(self, f: ZipFile):
|
||||
f.writestr('META-INF/container.xml', self.dump())
|
||||
print('Added META-INF/container.xml to archive.')
|
||||
for i in self.links:
|
||||
i.save(f)
|
||||
for p in self.packages:
|
||||
p.save(f)
|
||||
|
||||
|
||||
class EPUB:
|
||||
def __init__(self) -> None:
|
||||
self.container = EPUBContainer()
|
||||
self.packages = self.container.packages
|
||||
self.package = self.packages[0]
|
||||
self.metadata = self.package.metadata
|
||||
|
||||
def add_identifier(self, value: str, id: str = None):
|
||||
self.package.add_identifier(value, id)
|
||||
|
||||
def add_link(self, href: str, rel: str, file: str, media_type: str = None, compress_type: int = None):
|
||||
self.container.add_link(href, rel, file, media_type, compress_type)
|
||||
|
||||
def save(self, filename: str):
|
||||
with ZipFile(filename, 'w', ZIP_DEFLATED, True) as f:
|
||||
f.writestr('mimetype', b'application/epub+zip', ZIP_STORED)
|
||||
print('Added mimetype to archive.')
|
||||
self.container.save(f)
|
||||
|
||||
|
||||
def detect_7z() -> bool:
|
||||
@@ -215,6 +526,67 @@ class MetadataParser(HTMLParser):
|
||||
return f()
|
||||
|
||||
|
||||
class MetadataParser2(HTMLParser):
|
||||
def __init__(self, *k, convert_charrefs: bool = ...) -> None:
|
||||
self._metadata = ''
|
||||
self._metadatal = []
|
||||
self._in_script = False
|
||||
self._is_meta = False
|
||||
super().__init__(*k, convert_charrefs=convert_charrefs)
|
||||
|
||||
|
||||
def handle_data(self, data: str) -> None:
|
||||
if self._in_script and (self._is_meta or (self._metadata == '' and data.startswith('AF_initDataCallback'))):
|
||||
self._is_meta = True
|
||||
self._metadata += data
|
||||
|
||||
def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
|
||||
if tag == 'script':
|
||||
self._in_script = True
|
||||
|
||||
def handle_endtag(self, tag: str) -> None:
|
||||
if tag == 'script':
|
||||
if self._in_script and self._metadata:
|
||||
self._metadatal.append(self._metadata)
|
||||
self._metadata = ''
|
||||
self._in_script = False
|
||||
self._is_meta = False
|
||||
|
||||
@property
|
||||
def metadata(self):
|
||||
r = {}
|
||||
for i in self._metadatal:
|
||||
f = eval_js("function(){return " + i[20:-2] + ";}")
|
||||
d = f()
|
||||
d = loads(js_dumps(d))
|
||||
r[d['key']] = d['data']
|
||||
return r
|
||||
|
||||
|
||||
def get_isbn(d: dict) -> Optional[str]:
|
||||
for key in d:
|
||||
try:
|
||||
for v in d[key][1][7][16][0]:
|
||||
k: str = v[0]
|
||||
if k.lower() == 'isbn':
|
||||
return v[1][0][0][1]
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def get_genres(d: dict) -> Optional[str]:
|
||||
for key in d:
|
||||
try:
|
||||
for v in d[key][1][7][16][0]:
|
||||
k: str = v[0]
|
||||
if k.lower() in ['类型', '類型', 'Genres', 'ジャンル']:
|
||||
return v[1][0][0][1]
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
cookies = MozillaCookieJar('google.txt')
|
||||
cookies.load()
|
||||
ses = Session()
|
||||
@@ -224,7 +596,7 @@ ses.cookies = cookies
|
||||
arg = ArgumentParser(description='Download from Google Play Books', add_help=True, formatter_class=RawTextHelpFormatter)
|
||||
arg.add_argument('id', help=wrap("Book's id or url. Id is recommend because url may not be detected."))
|
||||
arg.add_argument('-o', '--output', help=wrap('Specify the location of output file. By default it will output to current directory and use "<author> - <title>" as file name.'), metavar='FILE', dest='output')
|
||||
arg.add_argument('type', help=f"Specify output type. (Default: null)\nSupported value:\nnull: Only download segments and resources.\nCBZ: Package all images file as a comic book ZIP archive.\n{wrap('CB7: Package all images file as a comic book 7-ZIP archive.')}", nargs='?', default='null', choices=['null', 'CBZ', 'CB7'], metavar='type')
|
||||
arg.add_argument('type', help=f"Specify output type. (Default: null)\nSupported value:\nnull: Only download segments and resources.\nCBZ: Package all images file as a comic book ZIP archive.\n{wrap('CB7: Package all images file as a comic book 7-ZIP archive.')}", nargs='?', default='null', choices=['null', 'CBZ', 'CB7', 'EPUB'], metavar='type')
|
||||
arg.add_argument('-c', '--cookies', help=wrap('Specify the location of cookies file. File must be Netscape HTTP Cookie File. (Default: google.txt)'), default="google.txt", metavar='FILE', dest='cookies')
|
||||
arg.add_argument('-a', '--authuser', help=wrap('Specify the index of current user. Will be useful when multiply Google Account is logined in a same cookie file. Index is start at 0. (Default: 0)'), default=0, type=int, metavar='INDEX', dest='authuser')
|
||||
arg.add_argument('-d', '--cache-dir', help=wrap('Specify the cache directory. By default, it will be "<author> - <title>"'), metavar='DIR', dest='cache_dir')
|
||||
@@ -277,6 +649,8 @@ try:
|
||||
else:
|
||||
decrypter = None
|
||||
segment = meta[0]['segment']
|
||||
with open(join(filename, 'metadata.json'), 'w', encoding='UTF8') as f:
|
||||
f.write(js_dumps(meta))
|
||||
resources = {}
|
||||
resource_file = join(filename, "resources.json")
|
||||
if exists(resource_file):
|
||||
@@ -442,6 +816,44 @@ try:
|
||||
with open(file_list_loc, 'w', encoding='UTF-8') as f:
|
||||
f.write('\n'.join(file_list))
|
||||
add_7z_archive(output, file_list_loc, filename, argsd['7z_compress_level'])
|
||||
else:
|
||||
output = args.output if args.output else f'{authors} - {title}.epub'
|
||||
webre = ses.get(f'https://play.google.com/store/books/details/?id={book_id}')
|
||||
if webre.status_code >= 400:
|
||||
raise ValueError('Failed to fetch metadata.')
|
||||
mf = MetadataParser2()
|
||||
mf.feed(webre.text)
|
||||
webmeta = mf.metadata
|
||||
with open(join(filename, 'metadata2.json'), 'w', encoding='UTF-8') as f:
|
||||
dump(webmeta, f, ensure_ascii=False, separators=(',', ':'))
|
||||
e = EPUB()
|
||||
isbn = get_isbn(webmeta)
|
||||
if isbn is not None:
|
||||
print(f'Get ISBN: {isbn}')
|
||||
e.add_identifier(isbn, 'ISBN')
|
||||
else:
|
||||
print('Failed to extract ISBN.')
|
||||
e.add_identifier(book_id, 'GOOGLE')
|
||||
e.metadata.title = title
|
||||
e.package.language = meta[0]['language']
|
||||
e.metadata.add_creator(authors, 'aut')
|
||||
e.metadata.date = parse_time(pub_date)
|
||||
genres = get_genres(webmeta)
|
||||
if genres is not None:
|
||||
print(f'Get genres: {genres}')
|
||||
genres = genres.split('/')
|
||||
for genre in genres:
|
||||
genre = genre.strip()
|
||||
e.metadata.add_subject(genre)
|
||||
else:
|
||||
print('Failed to extarct genres.')
|
||||
desc = meta[1][1][4]
|
||||
print(f'Get description: {desc}')
|
||||
e.metadata.add_data('description', desc)
|
||||
e.metadata.add_data('publisher', publisher)
|
||||
for i in meta[0]['meta']:
|
||||
e.metadata.add_meta(i['property'], i['cdata'])
|
||||
e.save(output)
|
||||
finally:
|
||||
cookies.save()
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user