From 531e90b82f3e524bb83344c85abcb7dcecb1e672 Mon Sep 17 00:00:00 2001 From: lifegpc Date: Thu, 29 Jul 2021 15:26:07 +0800 Subject: [PATCH] add gen_dlsite.py --- .gitignore | 1 + gen_dlsite.py | 326 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 327 insertions(+) create mode 100644 gen_dlsite.py diff --git a/.gitignore b/.gitignore index d89c233..f95020a 100644 --- a/.gitignore +++ b/.gitignore @@ -131,3 +131,4 @@ dmypy.json .vscode/ *.txt *.xls +*.json diff --git a/gen_dlsite.py b/gen_dlsite.py new file mode 100644 index 0000000..a87d5c4 --- /dev/null +++ b/gen_dlsite.py @@ -0,0 +1,326 @@ +# gen_dlsite.py +# (C) 2021 lifegpc +# The repo location: https://github.com/lifegpc/pythonscript +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +from hashlib import sha512 as __sha512 +from typing import Dict, List +from urllib.parse import quote_plus as q, urlencode +from json import load as loadjson, dumps as dumpjson +from os.path import exists +from requests import Session +from time import time +from traceback import print_exc +from getopt import getopt +from re import search, IGNORECASE +jsonsep = (',', ':') +SHA512_TYPE = 1 +ParseQsResult = Dict[str, List[str]] + + +def sha512(s, encoding='utf8') -> str: + if isinstance(s, str): + b = s.encode(encoding) + elif isinstance(s, bytes): + b = s + else: + b = str(s).encode(encoding) + h = __sha512() + h.update(b) + return h.hexdigest() + + +def genSign(sercet: str, qd: ParseQsResult, t: int = SHA512_TYPE): + para = '' + keys = qd.keys() + keys = sorted(keys) + for k in keys: + if k != 'sign' and qd[k] is not None: + for qv in qd[k]: + v = f"{q(k)}={q(qv)}" + para = v if para == '' else f"{para}&{v}" + para = sercet + para + if t & SHA512_TYPE: + h = sha512(para) + else: + h = sha512(para) + return h + + +def loadsWithHeaderSep(s: str) -> Dict[str, str]: + try: + d = {} + sl = s.split(';') + for i in sl: + i = i.strip() + ss = i.split('=', 1) + d[ss[0]] = ss[1] + return d + except Exception: + return None + + +class Settings: + def readSettings(self): + if exists('gen_dlsite.json'): + try: + with open('gen_dlsite.json', 'r', encoding='utf8') as f: + self.__data = loadjson(f) + except Exception: + self.__data = None + else: + self.__data = None + + @property + def APIEntry(self) -> str: + '''Source code of API: + https://github.com/lifegpc/csweb/tree/master/proxy''' + defalut = 'https://cs.kanahanazawa.com/proxy' + if self.__data is None: + return defalut + key = 'APIEntry' + if key in self.__data and self.__data[key]: + return self.__data[key] + return defalut + + @property + def APISecrets(self) -> str: + if self.__data is None: + return None + key = 'APISecrets' + if key in self.__data and self.__data[key] and self.__data[key] != '': + return self.__data[key] + return None + + @property + def dlsiteCookies(self) -> Dict[str, str]: + if self.__data is None: + return None + key = 'dlsiteCookies' + if key in self.__data and self.__data[key] and self.__data[key] != '': + return loadsWithHeaderSep(self.__data[key]) + return None + + @property + def overrideOld(self) -> bool: + defalut = False + if self.__data is None: + return defalut + key = 'overrideOld' + if key in self.__data and isinstance(self.__data[key], bool): + return self.__data[key] + return defalut + + +class Main: + def __init__(self): + self._s = Settings() + self._s.readSettings() + self._ses = Session() + self._dlses = Session() + + def add(self, id: str, cookies: Dict[str, str]): + try: + r = self.request('add', id=id, cookies=cookies, + headers={"referrer": "https://www.dlsite.com/"}) + if r.status_code == 200: + j = r.json() + if j['code'] != 0: + print(f"{j['code']} {j['msg']}") + return None + return j['result'] + else: + print(f'{r.status_code} {r.reason}') + return None + except Exception: + print_exc() + return None + + @property + def APIEntry(self) -> str: + return self._s.APIEntry + + @property + def APISecrets(self) -> str: + return self._s.APISecrets + + @property + def dlsiteCookies(self) -> Dict[str, str]: + return self._s.dlsiteCookies + + def exists(self, id: str) -> bool: + try: + r = self.request('exists', id=id) + if r.status_code == 200: + j = r.json() + if j['code'] != 0: + print(f"{j['code']} {j['msg']}") + return None + return j['result'] + else: + print(f'{r.status_code} {r.reason}') + return None + except Exception: + print_exc() + return None + + def gen(self, id: str, target: str) -> str: + try: + r = self.request('gen', id=id, target=target, + e=round(time() + 3600)) + if r.status_code == 200: + j = r.json() + if j['code'] != 0: + print(f"{j['code']} {j['msg']}") + return None + return j['result'] + else: + print(f'{r.status_code} {r.reason}') + return None + except Exception: + print_exc() + return None + + def getCookies(self) -> Dict[str, str]: + d = {} + for i in self._dlses.cookies.iteritems(): + d[i[0]] = i[1] + return d + + def getName(self, link: str) -> str: + if link is None: + return None + rs = search(r'product_id/([^./]+)', link, IGNORECASE) + if rs is None: + return None + pn = rs.groups()[0] + rs = search(r'number/(\d+)', link, IGNORECASE) + if rs is not None: + pn += '.part' + rs.groups()[0] + return pn + + def getopt(self, argv: List[str]): + ll = getopt(argv, '', []) + self._link = None + if len(ll[1]) > 0: + self._link = ll[1][0] + + def getProxyLink(self, link): + r = self._dlses.head(link) + if r.status_code not in [301, 302, 307]: + print(f'{r.status_code} {r.reason}') + return None + url = r.headers['location'] + pn = self.getName(link) + if not self.overrideOld: + tpn = pn + tpni = 0 + ext = self.exists(tpn) + if ext is None: + return None + while ext is True: + tpni += 1 + tpn = f'{pn}_{tpni}' + ext = self.exists(tpn) + if ext is None: + return None + pn = tpn + print(pn) + ck = self.getCookies() + if ck is None: + return None + r = self.add(pn, ck) + if not r: + return None + r = self.gen(pn, url) + if r: + return r + return None + + @property + def link(self) -> str: + if hasattr(self, '_link'): + return self._link + else: + return None + + def isDownloadLink(self) -> bool: + if self.link is None: + return False + return self.link.find('download') > -1 + + def isSerialLink(self) -> bool: + if self.link is None: + return False + return self.link.find('serial') > -1 + + @property + def name(self) -> str: + return self.getName(self.link) + + def overrideOld(self) -> bool: + return self._s.overrideOld + + def request(self, action: str, stream: bool = False, **kw): + url = f'{self.APIEntry}/{action}' + se = {'t': [str(round(time()))], 'a': [action]} + for i in kw: + c = kw[i] + if isinstance(c, (list, dict)): + se[i] = [dumpjson(c, ensure_ascii=False, separators=jsonsep)] + elif isinstance(c, str): + se[i] = [c] + else: + se[i] = [str(c)] + sg = genSign(self.APISecrets, se) + se['sign'] = [sg] + for i in se: + se[i] = se[i][0] + url += "?" + urlencode(se) + re = self._ses.get(url, stream=stream) + return re + + def run(self, argv: List[str]) -> int: + if self.APISecrets is None: + print('APISecrets is needed.') + return -1 + if self.dlsiteCookies is None: + print('dlsiteCookies is needed.') + return -2 + self.getopt(argv) + self._dlses.cookies.update(self.dlsiteCookies) + self._dlses.headers.update({'referrer': 'https://www.dlsite.com/'}) + if self.link is None: + print('gen_dlsite.py [options] ') + return -3 + if self.isDownloadLink: + t = self.getProxyLink(self.link) + if t: + print(t) + elif self.isSerialLink: + pass + else: + print('Unknown link.') + return -4 + return 0 + + +if __name__ == "__main__": + import sys + try: + m = Main() + sys.exit(m.run(sys.argv[1:])) + except Exception: + print_exc() + sys.exit(-1)