mirror of
https://github.com/lifegpc/pythonscript.git
synced 2026-06-06 11:28:58 +08:00
327 lines
9.3 KiB
Python
327 lines
9.3 KiB
Python
# gen_dlsite.py
|
|
# (C) 2021 lifegpc
|
|
# The repo location: https://github.com/lifegpc/pythonscript
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
from hashlib import sha512 as __sha512
|
|
from typing import Dict, List
|
|
from urllib.parse import quote_plus as q, urlencode
|
|
from json import load as loadjson, dumps as dumpjson
|
|
from os.path import exists
|
|
from requests import Session
|
|
from time import time
|
|
from traceback import print_exc
|
|
from getopt import getopt
|
|
from re import search, IGNORECASE
|
|
jsonsep = (',', ':')
|
|
SHA512_TYPE = 1
|
|
ParseQsResult = Dict[str, List[str]]
|
|
|
|
|
|
def sha512(s, encoding='utf8') -> str:
|
|
if isinstance(s, str):
|
|
b = s.encode(encoding)
|
|
elif isinstance(s, bytes):
|
|
b = s
|
|
else:
|
|
b = str(s).encode(encoding)
|
|
h = __sha512()
|
|
h.update(b)
|
|
return h.hexdigest()
|
|
|
|
|
|
def genSign(sercet: str, qd: ParseQsResult, t: int = SHA512_TYPE):
|
|
para = ''
|
|
keys = qd.keys()
|
|
keys = sorted(keys)
|
|
for k in keys:
|
|
if k != 'sign' and qd[k] is not None:
|
|
for qv in qd[k]:
|
|
v = f"{q(k)}={q(qv)}"
|
|
para = v if para == '' else f"{para}&{v}"
|
|
para = sercet + para
|
|
if t & SHA512_TYPE:
|
|
h = sha512(para)
|
|
else:
|
|
h = sha512(para)
|
|
return h
|
|
|
|
|
|
def loadsWithHeaderSep(s: str) -> Dict[str, str]:
|
|
try:
|
|
d = {}
|
|
sl = s.split(';')
|
|
for i in sl:
|
|
i = i.strip()
|
|
ss = i.split('=', 1)
|
|
d[ss[0]] = ss[1]
|
|
return d
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
class Settings:
|
|
def readSettings(self):
|
|
if exists('gen_dlsite.json'):
|
|
try:
|
|
with open('gen_dlsite.json', 'r', encoding='utf8') as f:
|
|
self.__data = loadjson(f)
|
|
except Exception:
|
|
self.__data = None
|
|
else:
|
|
self.__data = None
|
|
|
|
@property
|
|
def APIEntry(self) -> str:
|
|
'''Source code of API:
|
|
https://github.com/lifegpc/csweb/tree/master/proxy'''
|
|
defalut = 'https://cs.kanahanazawa.com/proxy'
|
|
if self.__data is None:
|
|
return defalut
|
|
key = 'APIEntry'
|
|
if key in self.__data and self.__data[key]:
|
|
return self.__data[key]
|
|
return defalut
|
|
|
|
@property
|
|
def APISecrets(self) -> str:
|
|
if self.__data is None:
|
|
return None
|
|
key = 'APISecrets'
|
|
if key in self.__data and self.__data[key] and self.__data[key] != '':
|
|
return self.__data[key]
|
|
return None
|
|
|
|
@property
|
|
def dlsiteCookies(self) -> Dict[str, str]:
|
|
if self.__data is None:
|
|
return None
|
|
key = 'dlsiteCookies'
|
|
if key in self.__data and self.__data[key] and self.__data[key] != '':
|
|
return loadsWithHeaderSep(self.__data[key])
|
|
return None
|
|
|
|
@property
|
|
def overrideOld(self) -> bool:
|
|
defalut = False
|
|
if self.__data is None:
|
|
return defalut
|
|
key = 'overrideOld'
|
|
if key in self.__data and isinstance(self.__data[key], bool):
|
|
return self.__data[key]
|
|
return defalut
|
|
|
|
|
|
class Main:
|
|
def __init__(self):
|
|
self._s = Settings()
|
|
self._s.readSettings()
|
|
self._ses = Session()
|
|
self._dlses = Session()
|
|
|
|
def add(self, id: str, cookies: Dict[str, str]):
|
|
try:
|
|
r = self.request('add', id=id, cookies=cookies,
|
|
headers={"referrer": "https://www.dlsite.com/"})
|
|
if r.status_code == 200:
|
|
j = r.json()
|
|
if j['code'] != 0:
|
|
print(f"{j['code']} {j['msg']}")
|
|
return None
|
|
return j['result']
|
|
else:
|
|
print(f'{r.status_code} {r.reason}')
|
|
return None
|
|
except Exception:
|
|
print_exc()
|
|
return None
|
|
|
|
@property
|
|
def APIEntry(self) -> str:
|
|
return self._s.APIEntry
|
|
|
|
@property
|
|
def APISecrets(self) -> str:
|
|
return self._s.APISecrets
|
|
|
|
@property
|
|
def dlsiteCookies(self) -> Dict[str, str]:
|
|
return self._s.dlsiteCookies
|
|
|
|
def exists(self, id: str) -> bool:
|
|
try:
|
|
r = self.request('exists', id=id)
|
|
if r.status_code == 200:
|
|
j = r.json()
|
|
if j['code'] != 0:
|
|
print(f"{j['code']} {j['msg']}")
|
|
return None
|
|
return j['result']
|
|
else:
|
|
print(f'{r.status_code} {r.reason}')
|
|
return None
|
|
except Exception:
|
|
print_exc()
|
|
return None
|
|
|
|
def gen(self, id: str, target: str) -> str:
|
|
try:
|
|
r = self.request('gen', id=id, target=target,
|
|
e=round(time() + 3600))
|
|
if r.status_code == 200:
|
|
j = r.json()
|
|
if j['code'] != 0:
|
|
print(f"{j['code']} {j['msg']}")
|
|
return None
|
|
return j['result']
|
|
else:
|
|
print(f'{r.status_code} {r.reason}')
|
|
return None
|
|
except Exception:
|
|
print_exc()
|
|
return None
|
|
|
|
def getCookies(self) -> Dict[str, str]:
|
|
d = {}
|
|
for i in self._dlses.cookies.iteritems():
|
|
d[i[0]] = i[1]
|
|
return d
|
|
|
|
def getName(self, link: str) -> str:
|
|
if link is None:
|
|
return None
|
|
rs = search(r'product_id/([^./]+)', link, IGNORECASE)
|
|
if rs is None:
|
|
return None
|
|
pn = rs.groups()[0]
|
|
rs = search(r'number/(\d+)', link, IGNORECASE)
|
|
if rs is not None:
|
|
pn += '.part' + rs.groups()[0]
|
|
return pn
|
|
|
|
def getopt(self, argv: List[str]):
|
|
ll = getopt(argv, '', [])
|
|
self._link = None
|
|
if len(ll[1]) > 0:
|
|
self._link = ll[1][0]
|
|
|
|
def getProxyLink(self, link):
|
|
r = self._dlses.head(link)
|
|
if r.status_code not in [301, 302, 307]:
|
|
print(f'{r.status_code} {r.reason}')
|
|
return None
|
|
url = r.headers['location']
|
|
pn = self.getName(link)
|
|
if not self.overrideOld:
|
|
tpn = pn
|
|
tpni = 0
|
|
ext = self.exists(tpn)
|
|
if ext is None:
|
|
return None
|
|
while ext is True:
|
|
tpni += 1
|
|
tpn = f'{pn}_{tpni}'
|
|
ext = self.exists(tpn)
|
|
if ext is None:
|
|
return None
|
|
pn = tpn
|
|
print(pn)
|
|
ck = self.getCookies()
|
|
if ck is None:
|
|
return None
|
|
r = self.add(pn, ck)
|
|
if not r:
|
|
return None
|
|
r = self.gen(pn, url)
|
|
if r:
|
|
return r
|
|
return None
|
|
|
|
@property
|
|
def link(self) -> str:
|
|
if hasattr(self, '_link'):
|
|
return self._link
|
|
else:
|
|
return None
|
|
|
|
def isDownloadLink(self) -> bool:
|
|
if self.link is None:
|
|
return False
|
|
return self.link.find('download') > -1
|
|
|
|
def isSerialLink(self) -> bool:
|
|
if self.link is None:
|
|
return False
|
|
return self.link.find('serial') > -1
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return self.getName(self.link)
|
|
|
|
def overrideOld(self) -> bool:
|
|
return self._s.overrideOld
|
|
|
|
def request(self, action: str, stream: bool = False, **kw):
|
|
url = f'{self.APIEntry}/{action}'
|
|
se = {'t': [str(round(time()))], 'a': [action]}
|
|
for i in kw:
|
|
c = kw[i]
|
|
if isinstance(c, (list, dict)):
|
|
se[i] = [dumpjson(c, ensure_ascii=False, separators=jsonsep)]
|
|
elif isinstance(c, str):
|
|
se[i] = [c]
|
|
else:
|
|
se[i] = [str(c)]
|
|
sg = genSign(self.APISecrets, se)
|
|
se['sign'] = [sg]
|
|
for i in se:
|
|
se[i] = se[i][0]
|
|
url += "?" + urlencode(se)
|
|
re = self._ses.get(url, stream=stream)
|
|
return re
|
|
|
|
def run(self, argv: List[str]) -> int:
|
|
if self.APISecrets is None:
|
|
print('APISecrets is needed.')
|
|
return -1
|
|
if self.dlsiteCookies is None:
|
|
print('dlsiteCookies is needed.')
|
|
return -2
|
|
self.getopt(argv)
|
|
self._dlses.cookies.update(self.dlsiteCookies)
|
|
self._dlses.headers.update({'referrer': 'https://www.dlsite.com/'})
|
|
if self.link is None:
|
|
print('gen_dlsite.py [options] <link>')
|
|
return -3
|
|
if self.isDownloadLink:
|
|
t = self.getProxyLink(self.link)
|
|
if t:
|
|
print(t)
|
|
elif self.isSerialLink:
|
|
pass
|
|
else:
|
|
print('Unknown link.')
|
|
return -4
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import sys
|
|
try:
|
|
m = Main()
|
|
sys.exit(m.run(sys.argv[1:]))
|
|
except Exception:
|
|
print_exc()
|
|
sys.exit(-1)
|