add tg_delete_all_my_msgs

This commit is contained in:
2021-12-10 22:42:20 +08:00
parent 3ac24547b9
commit 53df51c5d5
7 changed files with 515 additions and 6 deletions

1
.gitignore vendored
View File

@@ -132,3 +132,4 @@ dmypy.json
*.txt
*.xls
*.json
*.pyi

150
convert_lrc.py Normal file
View File

@@ -0,0 +1,150 @@
from getopt import gnu_getopt as getopt, GetoptError
from os.path import basename, dirname, join, splitext
from re import compile, search
from sys import argv, exit
from typing import List, Optional
try:
from _rssbotlib import AVDict, version, VideoInfo
have_rssbotlib = True
except ImportError:
have_rssbotlib = False
RSSBOTLIB_NOTFOUND = '''rssbotlib not found.
The source code is available at https://github.com/lifegpc/ffmpeg-study/tree/master/rssbotlib''' # noqa: E501
DUR_REG = compile(r'^(?P<sign>[\+-])?(((?P<h>\d+):)?((?P<min>\d+):))?(?P<sec>\d+)(\.(?P<ms>\d+))?$') # noqa: E501
def prase_duration(s: str) -> float:
r = search(DUR_REG, s)
if r is None:
raise ValueError(f'Can not parse duration "{s}"')
rd = r.groupdict()
t = int(rd['sec'])
if rd['ms']:
t += int(rd['ms']) / (10 ** len(rd['ms']))
if rd['min']:
t += int(rd['min']) * 60
if rd['h']:
t += int(rd['h']) * 3600
if rd['sign'] == '-':
t = -t
return t
def generate_good_filename(meta: AVDict) -> Optional[str]:
m = meta.to_dict()
if 'title' in m and 'artist' in m:
return f"{m['artist']} - {m['title']}.lrc"
elif 'title' in m:
return f"{m['title']}.lrc"
class Cml:
def __init__(self, arg: List[str]) -> None:
self.output = None
self.file = None
self.verbose = False
self.duration = None
self.dir = None
if len(arg) == 0:
self.print_help()
exit(0)
try:
r = getopt(arg, '-hVvo:f:t:d:',
['help', 'version', 'verbose', 'output=', 'file=',
'duration=', 'dir='])
for i in r[0]:
if i[0] == '-h' or i[0] == '--help':
self.print_help()
exit(0)
elif i[0] == '-V' or i[0] == '--version':
self.print_version()
exit(0)
elif i[0] == '-v' or i[0] == '--verbose':
self.verbose = True
elif i[0] == '-o' or i[0] == '--output':
self.output = i[1]
elif i[0] == '-f' or i[0] == '--file':
self.file = i[1]
elif i[0] == '-t' or i[0] == '--duration':
self.duration = prase_duration(i[1])
elif i[0] == '-d' or i[0] == '--dir':
self.dir = i[1]
if len(r[1]) == 0:
raise GetoptError('Input lyric file is needed.')
if len(r[1]) > 1:
raise GetoptError('Too much input lyric file.')
self.input = r[1][0]
except GetoptError as e:
print(e.msg)
exit(1)
def print_help(self):
print('''convert_lrc.py [options] <lyric file>
Convert translated lryics.
Options:
-h, --help Print this help message.
-V, --version Print version.
-v, --verbose Enable verbose logging.
-o, --output <path> Specify output path.
-f, --file <path> Specify music file, will read duration and other
information from file. (rssbotlib is needed.)
-t, --duration <time> Specify the duration of music.
-d, --dir <path> Specify the output directory.''')
def print_version(self):
print('convert_lrc.py v1.0.0.0')
if have_rssbotlib:
print(f"rssbotlib v{'.'.join(str(s) for s in version())}")
else:
print(RSSBOTLIB_NOTFOUND)
def main() -> int:
cml = Cml(argv[1:])
dur = None
if cml.duration is not None:
if cml.duration <= 0:
print('Warning: the duration is 0 or less than 0. Ignored.')
dur = cml.duration
if cml.file is not None:
if not have_rssbotlib:
raise NotImplementedError(RSSBOTLIB_NOTFOUND)
video_info = VideoInfo()
if not video_info.parse(cml.file):
raise Exception(f'Can not parse music file: {cml.file}')
dur = video_info.duration
if cml.verbose:
print(f'Get duration from music file: {dur}')
metadata = video_info.meta
if cml.verbose:
print(f'Duration: {dur}')
output = None
if cml.output:
output = cml.output
elif cml.file and have_rssbotlib and metadata is not None:
output = generate_good_filename(metadata)
if output is not None:
output = join(dirname(cml.input), output)
if cml.verbose:
print(f'Get file name from metadata: {output}')
if output is None:
output = splitext(cml.input)[0] + '.lrc'
if cml.verbose:
print(f'Get file name from input file: {output}')
if cml.dir is not None:
output = join(cml.dir, basename(output))
print(f'Replace output directory: {output}')
if __name__ == "__main__":
try:
exit(main())
except SystemExit:
pass
except: # noqa: E722
from traceback import print_exc
print_exc()
exit(1)

View File

@@ -132,7 +132,7 @@ class Main:
def add(self, id: str, cookies: Dict[str, str]):
try:
r = self.request('add', id=id, cookies=cookies,
r = self.request('post', 'add', id=id, cookies=cookies,
headers={"referrer": "https://www.dlsite.com/"})
if r.status_code == 200:
j = r.json()
@@ -161,7 +161,7 @@ class Main:
def exists(self, id: str) -> bool:
try:
r = self.request('exists', id=id)
r = self.request('post', 'exists', id=id)
if r.status_code == 200:
j = r.json()
if j['code'] != 0:
@@ -177,7 +177,7 @@ class Main:
def gen(self, id: str, target: str) -> str:
try:
r = self.request('gen', id=id, target=target,
r = self.request('post', 'gen', id=id, target=target,
e=round(time() + 3600))
if r.status_code == 200:
j = r.json()
@@ -272,7 +272,7 @@ class Main:
def overrideOld(self) -> bool:
return self._s.overrideOld
def request(self, action: str, stream: bool = False, **kw):
def request(self, method: str, action: str, stream: bool = False, **kw):
url = f'{self.APIEntry}/{action}'
se = {'t': [str(round(time()))], 'a': [action]}
for i in kw:
@@ -287,8 +287,7 @@ class Main:
se['sign'] = [sg]
for i in se:
se[i] = se[i][0]
url += "?" + urlencode(se)
re = self._ses.get(url, stream=stream)
re = self._ses.request(method, url, data=se, stream=stream)
return re
def run(self, argv: List[str]) -> int:

83
libass_test.py Normal file
View File

@@ -0,0 +1,83 @@
from ctypes import (
CDLL,
c_void_p,
c_char_p,
c_int
)
libass = CDLL('ass.dll')
libass.ass_library_init.restype = c_void_p
libass.ass_library_done.argtypes = [c_void_p]
libass.ass_set_fonts_dir.argtypes = [c_void_p, c_char_p]
libass.ass_set_fonts.argtypes = [c_void_p, c_char_p, c_char_p, c_int, c_char_p,
c_int]
libass.ass_renderer_init.restype = c_void_p
libass.ass_renderer_init.argtypes = [c_void_p]
libass.ass_renderer_done.argtypes = [c_void_p]
def ass_library_init() -> int:
return libass.ass_library_init()
def ass_library_done(priv: int):
libass.ass_library_done(c_void_p(priv))
def ass_set_fonts_dir(priv: int, fonts_dir: str, encoding: str = 'UTF-8'):
libass.ass_set_fonts_dir(c_void_p(priv),
fonts_dir.encode(encoding) if fonts_dir else None)
def ass_renderer_init(priv: int) -> int:
return libass.ass_renderer_init(c_void_p(priv))
def ass_renderer_done(priv: int):
libass.ass_renderer_done(c_void_p(priv))
def ass_set_fonts(priv: int, default_font: str, default_family: str, dfp: int,
config: str, update: int, encoding: str = 'UTF-8'):
libass.ass_set_fonts(c_void_p(priv),
default_font.encode(encoding) if default_font else None, # noqa: E501
default_family.encode(encoding) if default_family else None, # noqa: E501
dfp, config.encode(encoding) if config else None,
update)
if __name__ == "__main__":
priv = ass_library_init()
print(f'Created ASS_LIBARAY Handle: {priv}')
font_dir = "E:\\fonts-test-来点中文\\"
ass_set_fonts_dir(priv, font_dir)
print(f'Set fonts dir to "{font_dir}" (UTF-8)')
da_font = font_dir + "\\文道奶酪体.ttf"
fconfig = "D:\\programs\\fonts"
print('Call ass_renderer_init')
red = ass_renderer_init(priv)
print(f'Call ass_set_fonts with {da_font}, NULL, 1, {fconfig}, 1)')
ass_set_fonts(red, da_font, None, 1, fconfig, 1)
print('Free ass_renderer')
ass_renderer_done(red)
ass_set_fonts_dir(priv, font_dir, 'GB2312')
print(f'Set fonts dir to "{font_dir}" (GB2312(ANSI))')
print('Call ass_renderer_init')
red = ass_renderer_init(priv)
print(f'Call ass_set_fonts with {da_font}, NULL, 1, {fconfig}, 1)')
ass_set_fonts(red, da_font, None, 1, fconfig, 1, 'GB2312')
print('Free ass_renderer')
ass_renderer_done(red)
print('Test chars don\'t exist in GB2312.')
font_dir = 'E:\\微软必须死个🐴'
ass_set_fonts_dir(priv, font_dir)
print(f'Set fonts dir to "{font_dir}" (UTF-8)')
print('Call ass_renderer_init')
red = ass_renderer_init(priv)
print(f'Call ass_set_fonts with {da_font}, NULL, 1, {fconfig}, 1)')
ass_set_fonts(red, da_font, None, 1, fconfig, 1)
print('Free ass_renderer')
ass_renderer_done(red)
ass_library_done(priv)
print('Free Handle.')

152
pack-prog.py Normal file
View File

@@ -0,0 +1,152 @@
# pack-prog.py
# (C) 2021 lifegpc
# The repo location: https://github.com/lifegpc/pythonscript
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from os import system, devnull, environ, remove
from getopt import getopt
from typing import List
from sys import argv, exit
from subprocess import Popen, PIPE
from re import search, IGNORECASE
from os.path import splitext, exists, abspath
from tempfile import NamedTemporaryFile
def add_path_ext(path: str) -> str:
p, n = splitext(path)
if n != '':
return path
else:
pext = environ['PATHEXT']
pextl = pext.split(';')
for ext in pextl:
if ext == '':
continue
if exists(p + ext):
return p + ext
return path
def check_needed_prog():
if system(f'ldd --help > {devnull}'):
return False
if system(f'7z --help > {devnull}'):
return False
return True
def check_prog(prog: str) -> List[str]:
r = Popen(f'ldd {prog}', stdout=PIPE, stderr=PIPE)
out: bytes = r.communicate()[0]
r.wait()
out += r.communicate()[0]
if not r.returncode:
sl = out.splitlines(False)
rl = []
for r in sl:
r = r.decode()
rs = search(r'=?-?> (.+) ?(\(0x[0-9a-f]+\))?$', r, IGNORECASE)
if rs is not None:
rl.append(abspath(rs.groups()[0]))
else:
raise ValueError(f'Can not find path for {r}.')
return rl
return None
def getUnixPath(path: str) -> str:
rs = search(r'^[A-Z]:', path, IGNORECASE)
if rs is None:
return path.replace('\\', '/')
return '/' + path[0].lower() + path[2:].replace('\\', '/')
def getWindowsPath(path: str) -> str:
rs = search(r'^[\\/][A-Z][\\/]', path, IGNORECASE)
if rs is None:
return path.replace('/', '\\')
return path[1].upper() + ":" + path[2:].replace('/', '\\')
def print_help():
print('''pack-prog.py [-o <output_name>] [programs]''')
class Prog:
def __init__(self):
self._loc = []
def add_dep(self, path: str):
path_w = getWindowsPath(path)
if path_w.upper().startswith('C:\\WINDOWS'):
return
if path_w not in self._loc:
print(f'add dependence: "{path_w}"')
self._loc.append(path_w)
def add_prog(self, path: str):
pro = add_path_ext(path)
pro_w = getWindowsPath(pro)
if pro_w not in self._loc:
print(f'add program: "{pro_w}"')
self._loc.append(pro_w)
def to_7z(self, output: str):
p = NamedTemporaryFile(delete=False)
for i in self._loc:
p.write((i + '\n').encode('UTF8'))
fp = p.name
p.close()
try:
system(f'7z a -mmt1 -mx9 -y {output} @{fp}')
except Exception:
remove(fp)
from traceback import print_exc
print_exc()
remove(fp)
def main(prog: List[str], output: str = None):
if output is None:
output = 'prog.7z'
if not check_needed_prog():
print('ldd and 7z is needed.')
p = Prog()
for pro in prog:
pro = abspath(pro)
pro = add_path_ext(pro)
# pro_u = getUnixPath(pro)
rel = check_prog(pro)
if rel is None:
print(f'Can not get dependencies for {pro},')
exit(-1)
p.add_prog(pro)
for i in rel:
p.add_dep(i)
p.to_7z(output)
if __name__ == "__main__":
if len(argv) > 1:
d = getopt(argv[1:], 'o:')
output = None
if len(d[0]):
for i in d[0]:
if i[0] == '-o':
output = i[1]
break
main(d[1], output)
else:
print_help()

59
test_chardet.py Normal file
View File

@@ -0,0 +1,59 @@
from ctypes import (
CDLL,
c_void_p,
POINTER,
byref,
Structure,
c_char_p,
c_float,
c_short,
c_size_t
)
class DetectObj(Structure):
_fields_ = [("encoding", c_char_p), ("confidence", c_float),
("bom", c_short)]
chardetlib = CDLL('libchardet.dll')
chardetlib.detect_init.restype = c_void_p
chardetlib.detect_destroy.argtypes = [POINTER(c_void_p)]
chardetlib.detect_obj_init.restype = POINTER(DetectObj)
chardetlib.detect_obj_free.argtypes = [POINTER(POINTER(DetectObj))]
chardetlib.detect_r.argtypes = [c_char_p, c_size_t,
POINTER(POINTER(DetectObj))]
chardetlib.detect_r.restype = c_short
def detect_init() -> int:
return chardetlib.detect_init()
def detect_destroy(Detect: int):
chardetlib.detect_destroy(byref(c_void_p(Detect)))
def detect_obj_init() -> POINTER(DetectObj):
return chardetlib.detect_obj_init()
def detect_obj_free(detectObj: POINTER(DetectObj)):
chardetlib.detect_obj_free(byref(detectObj))
def detect_r(buf: bytes, detectObj: POINTER(DetectObj)) -> int:
return chardetlib.detect_r(buf, len(buf), byref(detectObj))
if __name__ == "__main__":
d = detect_init()
obj = detect_obj_init()
b = detect_r("你妈死了,这识别不行。WTF,这就是GBK啊。啊啊啊啊啊,为啥不返回GBK啊".encode('UTF-8'), obj)
print(f'detect_r returned {b}')
if b == 0:
print(obj.contents.encoding)
print(obj.contents.confidence)
detect_obj_free(obj)
detect_destroy(d)
print('All done.')

65
tg_delete_all_my_msgs.py Normal file
View File

@@ -0,0 +1,65 @@
from argparse import ArgumentParser
import asyncio
from json import load
import sys
from traceback import print_exc
try:
from dateutil.parser import parse
have_dateutil = True
except ImportError:
print('Warning: python-dateutil not found. -s and -e only accept integer now.') # noqa: E501
have_dateutil = False
from tdlib import TdLib
def tparse(s: str) -> int:
try:
return int(s)
except Exception:
if have_dateutil:
return round(parse(s).timestamp())
else:
raise ValueError()
async def get_chat_id_from_name(lib: TdLib, name: str) -> int:
re = await lib.searchChatsOnServer(name)
if re is None:
raise ValueError('Can not found chat.')
le = len(re['chat_ids'])
if le == 0:
raise ValueError('No chat found.')
elif le == 1:
return re['chat_ids'][0]
else:
raise NotImplementedError('Multiply chat is returned.')
async def main(lib: TdLib, arg):
with open(arg.config, 'r', encoding='UTF-8') as f:
se = load(f)
if not await lib.login(se['TdlibParameters'], se['encryption_key'],
se.get("proxy"), se.get("phone_number")):
raise ValueError('Can not login')
chat_id = arg.chat_id if arg.chat_id is not None else await get_chat_id_from_name(lib, arg.chat_name) # noqa: E501
re = await lib.deleteAllMyMessageInChat(chat_id, arg.start_time, arg.end_time) # noqa: E501
print(re)
return 0 if re else -1
p = ArgumentParser(description='Delete all my messages in a chat.', add_help=True) # noqa: E501
p.add_argument('-c', '--config', default='tdlib.json', metavar='CONFIG', dest='config', help='Specify the location of config file. Default: tdlib.json') # noqa: E501
p.add_argument('chat_id', nargs='?', type=int, help="Specify the chat's ID.")
p.add_argument('-n', '--chat-name', help="Specify chat's name. Will used if chat_id is not sepcified.", metavar='NAME', dest='chat_name') # noqa: E501
p.add_argument('-s', '--start-time', type=tparse, metavar='TIME', help="The messages which are sended before this time will not be deleted.", dest='start_time') # noqa: E501
p.add_argument('-e', '--end-time', type=tparse, metavar='TIME', help="The messages which are sended after this time will not be deleted.", dest="end_time") # noqa: E501
arg = p.parse_intermixed_args()
if arg.chat_id is None and arg.chat_name is None:
raise ValueError('chat_id or chat_name is needed.')
try:
with TdLib() as lib:
re = asyncio.run(main(lib, arg))
sys.exit(re)
except Exception:
print_exc()
sys.exit(-1)