From 4d5099e3749252b25ceac3f2be4051de9490796d Mon Sep 17 00:00:00 2001 From: lifegpc Date: Mon, 24 Jan 2022 23:30:36 +0800 Subject: [PATCH] add EPUB support (incomplete) --- google_play.py | 420 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 416 insertions(+), 4 deletions(-) diff --git a/google_play.py b/google_play.py index 934e24e..b93360f 100644 --- a/google_play.py +++ b/google_play.py @@ -1,19 +1,330 @@ from requests import Session from http.cookiejar import MozillaCookieJar from html.parser import HTMLParser -from typing import List, Tuple, Optional +from typing import Any, Callable, Dict, List, Tuple, Optional from js2py import eval_js from os import makedirs, remove from os.path import join, exists, relpath, abspath from traceback import print_exc -from json import load, dump +from json import load, dump, loads from urllib.parse import urljoin, parse_qs, urlparse from base64 import b64decode as _b64decode from re import compile from argparse import ArgumentParser, RawTextHelpFormatter from textwrap import wrap as _wrap -from zipfile import ZIP_STORED, ZipFile +from zipfile import ZIP_DEFLATED, ZIP_STORED, ZipFile from subprocess import Popen, DEVNULL +from xml.etree import ElementTree as ET +from time import gmtime, strftime, strptime +from calendar import timegm + + +def get_iso8601_time(d: int) -> str: + return strftime('%Y-%m-%dT%H:%M:%SZ', gmtime(d)) + + +def parse_time(s: str) -> int: + return timegm(strptime(s, "%Y.%m.%d")) + + +DC_NS = 'http://purl.org/dc/elements/1.1/' +OPF_NS = 'http://www.idpf.org/2007/opf' +ALLOW_DC_LIST = ['coverage', 'description', 'format', 'publisher', 'relation', 'rights', 'source', 'type'] +PREFIX_DICT = {'rendition': 'http://www.idpf.org/vocab/rendition/#', + 'ebpaj': 'http://www.ebpaj.jp/', + 'fixed-layout-jp': 'http://www.digital-comic.jp/', + 'kadokawa': 'http://www.access-company.com/2012/layout#', + 'ibooks': 'http://vocabulary.itunes.apple.com/rdf/ibooks/vocabulary-extensions-1.0/'} +js_dumps: Callable[[Any], str] = eval_js('function(a){return JSON.stringify(a);}') + + +class InvalidEPUB(Exception): + def __init__(self, message: str, *args): + super().__init__(message, *args) + + +class EPUBLink: + def __init__(self, href: str, rel: str, file: str, media_type: str = None, compress_type: int = None): + self.href = href + self.media_type = media_type + self.rel = rel + self.file = file + self.compress_type = compress_type + + def dump(self) -> Dict[str, str]: + if not self.href or not self.rel or not self.file: + raise InvalidEPUB('href or rel is needed for link element.') + d = {'href': self.href, 'rel': self.rel} + if self.media_type: + d['media-type'] = self.media_type + return d + + def save(self, f: ZipFile): + f.write(self.file, self.href, self.compress_type) + print(f'Added file {self.file} to {self.href} in archive.') + + +class EPUBIdentifier: + def __init__(self, value: str, id: str = None): + self.value = value + self.id = id + + +class EPUBCreator: + def __init__(self, type: str, value: str, role: str = None, file_as: str = None): + self.type = type + self.value = value + self.role = role + self.file_as = file_as + + def dump(self) -> ET.Element: + if not self.value or not self.type: + raise InvalidEPUB('Creator need a value or type.') + e = ET.Element(f"dc:{self.type}") + if self.role: + e.attrib['opf:role'] = self.role + if self.file_as: + e.attrib['opf:file_as'] = self.file_as + e.text = self.value + return e + + +class EPUBSubject: + def __init__(self, sub: str, authority: str = None, term: str = None): + self.sub = sub + self.authority = authority + self.term = term + + def dump(self) -> ET.Element: + if not self.sub: + raise InvalidEPUB('Subject is needed for subject.') + if self.authority and not self.term: + raise InvalidEPUB('Term is needed if authority is defined for subject.') + d = {'opf:authority': self.authority, 'opf:term': self.term} if self.authority else {} + e = ET.Element('dc:subject', d) + e.text = self.sub + return e + + +class EPUBMeta: + def __init__(self, prop: str, value: str): + self.prefix_url = None + if ':' in prop: + prefix = prop[:prop.find(':')] + if prefix not in PREFIX_DICT: + raise ValueError('Unknown prefix') + self.prefix_url = f"{prefix}: {PREFIX_DICT[prefix]}" + self.property = prop + self.value = value + + def dump(self) -> ET.Element: + e = ET.Element('meta', {'property': self.property}) + e.text = self.value + return e + + +class EPUBMetadata: + def __init__(self, p): + self._p: EPUBPackage = p + self.identifiers: List[EPUBIdentifier] = [] + self.unique_identifier = '' + self.title = '' + self._language: List[str] = [] + self.contributors: List[EPUBCreator] = [] + self.creators: List[EPUBCreator] = [] + self.date: int = None + self.subjects: List[EPUBSubject] = [] + self.maps: Dict[str, List[str]] = {} + self.metas: List[EPUBMeta] = [] + + def add_contributor(self, name: str, role: str = None, file_as: str = None): + self.contributors.append(EPUBCreator('contributor', name, role, file_as)) + + def add_creator(self, name: str, role: str = None, file_as: str = None): + self.creators.append(EPUBCreator('creator', name, role, file_as)) + + def add_data(self, key: str, value: str): + if key in ALLOW_DC_LIST: + if key in self.maps: + self.maps[key].append(value) + else: + self.maps[key] = [value] + + def add_identifier(self, value: str, id: str = None): + if id is not None: + for i in self.identifiers: + if i.id is not None and i.id == id: + raise ValueError(f'id {id} already have value.') + self.identifiers.append(EPUBIdentifier(value, id)) + + def add_meta(self, prop: str, value: str): + self.metas.append(EPUBMeta(prop, value)) + + def add_subject(self, sub: str, authority: str = None, term: str = None): + self.subjects.append(EPUBSubject(sub, authority, term)) + + def dump(self) -> ET.Element: + metadata = ET.Element('metadata', {'xmlns:dc': DC_NS, 'xmlns:opf': OPF_NS}) + for i in self.identifiers: + d = {"id": i.id} if i.id else {} + ide = ET.Element('dc:identifier', d) + ide.text = i.value + metadata.append(ide) + if not self.title: + raise InvalidEPUB('title is needed for metadata.') + title = ET.Element('dc:title') + title.text = self.title + metadata.append(title) + lang = self.language + if lang is None or len(lang) == 0: + raise InvalidEPUB('language is needed for metadata.') + for lan in lang: + la = ET.Element('dc:language') + la.text = lan + metadata.append(la) + for i in self.contributors: + metadata.append(i.dump()) + for i in self.creators: + metadata.append(i.dump()) + if self.date is not None: + date = ET.Element('dc:date') + date.text = get_iso8601_time(self.date) + metadata.append(date) + for i in self.subjects: + metadata.append(i.dump()) + for key in self.maps: + for v in self.maps[key]: + e = ET.Element(f"dc:{key}") + e.text = v + metadata.append(e) + for i in self.metas: + metadata.append(i.dump()) + return metadata + + def get_unique_identifier(self) -> Optional[str]: + if self.unique_identifier: + for i in self.identifiers: + if i.id == self.unique_identifier: + return self.unique_identifier + self.unique_identifier = '' + return self.get_unique_identifier() + else: + for i in self.identifiers: + if i.id: + return i.id + return None + + @property + def language(self): + if len(self._language) > 0: + return self._language + if self._p.language: + return [self._p.language] + return None + + @property + def prefix(self): + r = [] + for i in self.metas: + if i.prefix_url: + if i.prefix_url not in r: + r.append(i.prefix_url) + return ' '.join(r) + + +class EPUBPackage: + def __init__(self, location: str = 'package.opf'): + self.location = location + self.metadata = EPUBMetadata(self) + self.language = '' + + def add_identifier(self, value: str, id: str = None): + self.metadata.add_identifier(value, id) + + def dump(self): + ide = self.metadata.get_unique_identifier() + if ide is None: + raise InvalidEPUB('A unique identifier is needed.') + root = ET.Element('package', {'xmlns': OPF_NS, 'version': '3.0', 'unique-identifier': ide}) + if self.language: + root.attrib['xml:lang'] = self.language + prefix = self.metadata.prefix + if prefix: + root.attrib['prefix'] = prefix + root.append(self.metadata.dump()) + return ET.tostring(root, 'UTF-8') + + def save(self, f: ZipFile): + f.writestr(self.location, self.dump()) + print(f'Added {self.location} to archive.') + + @property + def unique_identifier(self): + return self.metadata.get_unique_identifier() + + @unique_identifier.setter + def unique_identifier(self, v): + if isinstance(v, str): + self.metadata.unique_identifier = v + else: + raise TypeError('Unsupported type.') + + +class EPUBContainer: + def __init__(self): + self.packages = [EPUBPackage()] + self.links: List[EPUBLink] = [] + + def add_link(self, href: str, rel: str, file: str, media_type: str = None, compress_type: int = None): + for i in self.links: + if i.href == href: + raise ValueError(f'{href} already in links.') + self.links.append(EPUBLink(href, rel, file, media_type, compress_type)) + + def dump(self): + if len(self.packages) < 1: + raise InvalidEPUB('1 or more packages is needed.') + root = ET.Element('container', {'xmlns': 'urn:oasis:names:tc:opendocument:xmlns:container', 'version': '1.0'}) + rootfiles = ET.Element('rootfiles') + for p in self.packages: + rootfile = ET.Element('rootfile', {'full-path': p.location, 'media-type': 'application/oebps-package+xml'}) + rootfiles.append(rootfile) + root.append(rootfiles) + if len(self.links) > 0: + links = ET.Element('links') + for i in self.links: + link = ET.Element('link', i.dump()) + links.append(link) + root.append(links) + return ET.tostring(root, 'UTF-8') + + def save(self, f: ZipFile): + f.writestr('META-INF/container.xml', self.dump()) + print('Added META-INF/container.xml to archive.') + for i in self.links: + i.save(f) + for p in self.packages: + p.save(f) + + +class EPUB: + def __init__(self) -> None: + self.container = EPUBContainer() + self.packages = self.container.packages + self.package = self.packages[0] + self.metadata = self.package.metadata + + def add_identifier(self, value: str, id: str = None): + self.package.add_identifier(value, id) + + def add_link(self, href: str, rel: str, file: str, media_type: str = None, compress_type: int = None): + self.container.add_link(href, rel, file, media_type, compress_type) + + def save(self, filename: str): + with ZipFile(filename, 'w', ZIP_DEFLATED, True) as f: + f.writestr('mimetype', b'application/epub+zip', ZIP_STORED) + print('Added mimetype to archive.') + self.container.save(f) def detect_7z() -> bool: @@ -215,6 +526,67 @@ class MetadataParser(HTMLParser): return f() +class MetadataParser2(HTMLParser): + def __init__(self, *k, convert_charrefs: bool = ...) -> None: + self._metadata = '' + self._metadatal = [] + self._in_script = False + self._is_meta = False + super().__init__(*k, convert_charrefs=convert_charrefs) + + + def handle_data(self, data: str) -> None: + if self._in_script and (self._is_meta or (self._metadata == '' and data.startswith('AF_initDataCallback'))): + self._is_meta = True + self._metadata += data + + def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None: + if tag == 'script': + self._in_script = True + + def handle_endtag(self, tag: str) -> None: + if tag == 'script': + if self._in_script and self._metadata: + self._metadatal.append(self._metadata) + self._metadata = '' + self._in_script = False + self._is_meta = False + + @property + def metadata(self): + r = {} + for i in self._metadatal: + f = eval_js("function(){return " + i[20:-2] + ";}") + d = f() + d = loads(js_dumps(d)) + r[d['key']] = d['data'] + return r + + +def get_isbn(d: dict) -> Optional[str]: + for key in d: + try: + for v in d[key][1][7][16][0]: + k: str = v[0] + if k.lower() == 'isbn': + return v[1][0][0][1] + except Exception: + pass + return None + + +def get_genres(d: dict) -> Optional[str]: + for key in d: + try: + for v in d[key][1][7][16][0]: + k: str = v[0] + if k.lower() in ['类型', '類型', 'Genres', 'ジャンル']: + return v[1][0][0][1] + except Exception: + pass + return None + + cookies = MozillaCookieJar('google.txt') cookies.load() ses = Session() @@ -224,7 +596,7 @@ ses.cookies = cookies arg = ArgumentParser(description='Download from Google Play Books', add_help=True, formatter_class=RawTextHelpFormatter) arg.add_argument('id', help=wrap("Book's id or url. Id is recommend because url may not be detected.")) arg.add_argument('-o', '--output', help=wrap('Specify the location of output file. By default it will output to current directory and use " - " as file name.'), metavar='FILE', dest='output') -arg.add_argument('type', help=f"Specify output type. (Default: null)\nSupported value:\nnull: Only download segments and resources.\nCBZ: Package all images file as a comic book ZIP archive.\n{wrap('CB7: Package all images file as a comic book 7-ZIP archive.')}", nargs='?', default='null', choices=['null', 'CBZ', 'CB7'], metavar='type') +arg.add_argument('type', help=f"Specify output type. (Default: null)\nSupported value:\nnull: Only download segments and resources.\nCBZ: Package all images file as a comic book ZIP archive.\n{wrap('CB7: Package all images file as a comic book 7-ZIP archive.')}", nargs='?', default='null', choices=['null', 'CBZ', 'CB7', 'EPUB'], metavar='type') arg.add_argument('-c', '--cookies', help=wrap('Specify the location of cookies file. File must be Netscape HTTP Cookie File. (Default: google.txt)'), default="google.txt", metavar='FILE', dest='cookies') arg.add_argument('-a', '--authuser', help=wrap('Specify the index of current user. Will be useful when multiply Google Account is logined in a same cookie file. Index is start at 0. (Default: 0)'), default=0, type=int, metavar='INDEX', dest='authuser') arg.add_argument('-d', '--cache-dir', help=wrap('Specify the cache directory. By default, it will be "<author> - <title>"'), metavar='DIR', dest='cache_dir') @@ -277,6 +649,8 @@ try: else: decrypter = None segment = meta[0]['segment'] + with open(join(filename, 'metadata.json'), 'w', encoding='UTF8') as f: + f.write(js_dumps(meta)) resources = {} resource_file = join(filename, "resources.json") if exists(resource_file): @@ -442,6 +816,44 @@ try: with open(file_list_loc, 'w', encoding='UTF-8') as f: f.write('\n'.join(file_list)) add_7z_archive(output, file_list_loc, filename, argsd['7z_compress_level']) + else: + output = args.output if args.output else f'{authors} - {title}.epub' + webre = ses.get(f'https://play.google.com/store/books/details/?id={book_id}') + if webre.status_code >= 400: + raise ValueError('Failed to fetch metadata.') + mf = MetadataParser2() + mf.feed(webre.text) + webmeta = mf.metadata + with open(join(filename, 'metadata2.json'), 'w', encoding='UTF-8') as f: + dump(webmeta, f, ensure_ascii=False, separators=(',', ':')) + e = EPUB() + isbn = get_isbn(webmeta) + if isbn is not None: + print(f'Get ISBN: {isbn}') + e.add_identifier(isbn, 'ISBN') + else: + print('Failed to extract ISBN.') + e.add_identifier(book_id, 'GOOGLE') + e.metadata.title = title + e.package.language = meta[0]['language'] + e.metadata.add_creator(authors, 'aut') + e.metadata.date = parse_time(pub_date) + genres = get_genres(webmeta) + if genres is not None: + print(f'Get genres: {genres}') + genres = genres.split('/') + for genre in genres: + genre = genre.strip() + e.metadata.add_subject(genre) + else: + print('Failed to extarct genres.') + desc = meta[1][1][4] + print(f'Get description: {desc}') + e.metadata.add_data('description', desc) + e.metadata.add_data('publisher', publisher) + for i in meta[0]['meta']: + e.metadata.add_meta(i['property'], i['cdata']) + e.save(output) finally: cookies.save() try: