mirror of
https://github.com/lifegpc/pythonscript.git
synced 2026-06-06 11:28:58 +08:00
Update pdtest
This commit is contained in:
34
pdtest.py
34
pdtest.py
@@ -4,6 +4,8 @@ from typing import Union
|
|||||||
from cryptography.hazmat.primitives import serialization as ser
|
from cryptography.hazmat.primitives import serialization as ser
|
||||||
from cryptography.hazmat.primitives.asymmetric import padding
|
from cryptography.hazmat.primitives.asymmetric import padding
|
||||||
import requests as r
|
import requests as r
|
||||||
|
from urllib.parse import urlparse, urlencode
|
||||||
|
from posixpath import basename
|
||||||
|
|
||||||
HOST = "http://localhost:8080"
|
HOST = "http://localhost:8080"
|
||||||
HEADERS = {}
|
HEADERS = {}
|
||||||
@@ -36,6 +38,17 @@ def post(path: str, token: Union[bytes, str] = None, token_id: int = None,
|
|||||||
return r.post(f"{HOST}/{path}", headers=h, data=data)
|
return r.post(f"{HOST}/{path}", headers=h, data=data)
|
||||||
|
|
||||||
|
|
||||||
|
def gen_sign(data, token: bytes):
|
||||||
|
keys = sorted([i for i in data.keys()])
|
||||||
|
u = sha512()
|
||||||
|
u.update(token)
|
||||||
|
for key in keys:
|
||||||
|
u.update(key.encode() if isinstance(key, str) else key)
|
||||||
|
v = data[key]
|
||||||
|
u.update(v.encode() if isinstance(v, str) else v)
|
||||||
|
return u.hexdigest()
|
||||||
|
|
||||||
|
|
||||||
def get(path: str, token: Union[bytes, str] = None, token_id: int = None,
|
def get(path: str, token: Union[bytes, str] = None, token_id: int = None,
|
||||||
data=None):
|
data=None):
|
||||||
if data is None:
|
if data is None:
|
||||||
@@ -44,14 +57,7 @@ def get(path: str, token: Union[bytes, str] = None, token_id: int = None,
|
|||||||
return r.get(f"{HOST}/{path}", data=data, headers=HEADERS)
|
return r.get(f"{HOST}/{path}", data=data, headers=HEADERS)
|
||||||
if isinstance(token, str):
|
if isinstance(token, str):
|
||||||
token = b64decode(token)
|
token = b64decode(token)
|
||||||
keys = sorted([i for i in data.keys()])
|
h = {'X-TOKEN-ID': str(token_id), 'X-SIGN': gen_sign(data, token)}
|
||||||
u = sha512()
|
|
||||||
u.update(token)
|
|
||||||
for key in keys:
|
|
||||||
u.update(key.encode() if isinstance(key, str) else key)
|
|
||||||
v = data[key]
|
|
||||||
u.update(v.encode() if isinstance(v, str) else v)
|
|
||||||
h = {'X-TOKEN-ID': str(token_id), 'X-SIGN': u.hexdigest()}
|
|
||||||
h.update(HEADERS)
|
h.update(HEADERS)
|
||||||
return r.get(f"{HOST}/{path}", headers=h, data=data)
|
return r.get(f"{HOST}/{path}", headers=h, data=data)
|
||||||
|
|
||||||
@@ -73,3 +79,15 @@ def add_user(username: str, password: str, name: str,
|
|||||||
token_id = None
|
token_id = None
|
||||||
pas = key.encrypt(password.encode(), padding.PKCS1v15())
|
pas = key.encrypt(password.encode(), padding.PKCS1v15())
|
||||||
return check_err(post("/auth/user/add", token, token_id, {"username": username, "name": name, "password": b64encode(pas)}).json()) # noqa: E501
|
return check_err(post("/auth/user/add", token, token_id, {"username": username, "name": name, "password": b64encode(pas)}).json()) # noqa: E501
|
||||||
|
|
||||||
|
|
||||||
|
def get_proxy_pixiv_url(url: str, secrets: str):
|
||||||
|
u = urlparse(url)
|
||||||
|
if u.scheme not in ['http', 'https']:
|
||||||
|
raise ValueError("Invalid url.")
|
||||||
|
if not u.netloc.endswith(".pximg.net"):
|
||||||
|
raise ValueError("Server only accept pximg.net URL.")
|
||||||
|
data = {'url': url}
|
||||||
|
data['sign'] = gen_sign(data, secrets.encode())
|
||||||
|
name = basename(u.path)
|
||||||
|
return f"{HOST}/proxy/pixiv/{name}?{urlencode(data)}"
|
||||||
|
|||||||
Reference in New Issue
Block a user