mirror of
https://github.com/lifegpc/pythonscript.git
synced 2026-06-05 11:08:49 +08:00
update tg_user_bot
This commit is contained in:
122
tg_user_bot.py
122
tg_user_bot.py
@@ -1,5 +1,6 @@
|
||||
from argparse import ArgumentParser
|
||||
import asyncio
|
||||
from html import escape
|
||||
from json import load
|
||||
from re import I, compile
|
||||
from traceback import format_exc
|
||||
@@ -13,6 +14,7 @@ from tdlib import (
|
||||
TdLib,
|
||||
TextParseMode,
|
||||
)
|
||||
from time import time
|
||||
from util import commandLineToArgv, timeToStr, tparse
|
||||
import requests
|
||||
|
||||
@@ -65,6 +67,21 @@ damm.add_argument('-e', '--end-time', type=tparse, metavar='TIME', help="The mes
|
||||
nbnhhsh = MyArgParser('-nbnhhsh', description='「能不能好好说话?」 拼音首字母缩写翻译工具', add_help=False) # noqa: E501
|
||||
nbnhhsh.add_argument('缩写', help='缩写形式')
|
||||
nbnhhsh.add_argument('-t', '--timeout', help='设置超时时长,单位为秒(默认为 5 秒)', default=5, type=int, metavar='时长', dest='timeout') # noqa: E501
|
||||
sm = MyArgParser('-searchmessage', description='Search message in a chat.', add_help=False) # noqa: E501
|
||||
sm.add_argument('keyword', help='Sepcify the keyword.')
|
||||
sm.add_argument('-c', '--chat_id', help='Specify the chat to search.', type=int, metavar='ID', dest='chat_id') # noqa: E501
|
||||
sm.add_argument('-m', '--max_searched', help='Specify the maximum count of messages to searched. -1 if unlimited. (Default: 1000)', type=int, default=1000, metavar='COUNT', dest='max_searched') # noqa: E501
|
||||
om = MyArgParser('-optimizestorage', description='Optimizes storage usage for tdlib (user bot).', add_help=False) # noqa: E501
|
||||
om.add_argument('-h', '--help', help='Print this message.', action='store_true', dest='help') # noqa: E501
|
||||
om.add_argument('-s', '--size', help='Limit on the total size of files after deletion, in bytes.', type=int, metavar='BYTES', dest='size') # noqa: E501
|
||||
om.add_argument('-t', '--ttl', help='Limit on the time that has passed since the last time a file was accessed (or creation time for some filesystems).', type=int, metavar='TIME', dest='ttl') # noqa: E501
|
||||
om.add_argument('-c', '--count', help='Limit on the total count of files after deletion.', type=int, metavar='COUNT', dest='count') # noqa: E501
|
||||
om.add_argument('-i', '--immunity_delay', help="The amount of time after the creation of a file during which it can't be deleted, in seconds.", type=int, metavar='TIME', dest='immunity_delay') # noqa: E501
|
||||
om.add_argument('-C', '--chat_ids', help='If non-empty, only files from the given chats are considered. Use 0 as chat identifier to delete files not belonging to any chat (e.g., profile photos).', action='append', type=int, metavar='ID', dest='chat_ids') # noqa: E501
|
||||
om.add_argument('-e', '--exclude_chat_ids', help='If non-empty, files from the given chats are excluded. Use 0 as chat identifier to exclude all files not belonging to any chat (e.g., profile photos).', action='append', type=int, metavar='ID', dest='exclude_chat_ids') # noqa: E501
|
||||
om.add_argument('-r', '--return_deleted_file_statistics', help='Specifiy it if statistics about the files that were deleted must be returned instead of the whole storage usage statistics. Affects only returned statistics.', action='store_true', dest='return_deleted_file_statistics') # noqa: E501
|
||||
om.add_argument('--chat_limit', help='The maximum number of chats with the largest storage usage for which separate statistics need to be returned.', type=int, metavar='COUNT', dest='chat_limit') # noqa: E501
|
||||
om.add_argument('-R', '--robot', help="Optimize for robot's storage rather than user.", action='store_true', dest='robot') # noqa: E501
|
||||
|
||||
|
||||
def generateFileInfo(f: dict) -> str:
|
||||
@@ -388,6 +405,107 @@ async def handle_nbnhhsh(lib: TdLib, mes: dict, argv: List[str]):
|
||||
print('Can not edit message.')
|
||||
|
||||
|
||||
async def handle_search_message(lib: TdLib, mes: dict, argv: List[str]):
|
||||
if len(argv) == 1:
|
||||
re = await lib.editMessageText(
|
||||
mes['chat_id'], mes['id'], f"```\n{sm.format_help()}\n```",
|
||||
TextParseMode.MarkDown)
|
||||
else:
|
||||
try:
|
||||
opt = sm.parse_intermixed_args(argv[1:])
|
||||
chat_id = opt.chat_id if opt.chat_id else mes['chat_id']
|
||||
await lib.editMessageText(mes['chat_id'], mes['id'], f'Started to search `{opt.keyword}`.', TextParseMode.MarkDown) # noqa: E501
|
||||
mess: List[int] = []
|
||||
tmes = 0
|
||||
st: int = None
|
||||
et: int = None
|
||||
fmi: int = None
|
||||
n = time()
|
||||
while True:
|
||||
mesl = await lib.searchChatMessages(chat_id, from_message_id=fmi, limit=100) # noqa:
|
||||
if mesl is None or len(mesl['messages']) == 0:
|
||||
break
|
||||
fmi = mesl['messages'][-1]['id']
|
||||
for nmes in mesl['messages']:
|
||||
if nmes['content']['@type'] != 'messageText':
|
||||
continue
|
||||
if nmes['id'] == mes['id']:
|
||||
continue
|
||||
tmes += 1
|
||||
if st is None or st > nmes['date']:
|
||||
st = nmes['date']
|
||||
if et is None or et < nmes['date']:
|
||||
et = nmes['date']
|
||||
text: str = nmes['content']['text']['text']
|
||||
if text.find(opt.keyword) > -1:
|
||||
mess.append(nmes['id'])
|
||||
if (opt.max_searched > -1 and tmes >= opt.max_searched) or len(mess) >= 20: # noqa: E501
|
||||
break
|
||||
if n + 10 < time():
|
||||
await lib.editMessageText(mes['chat_id'], mes['id'], f'Search `{opt.keyword}`...\nAlready searched `{tmes}` messages, founded `{len(mess)}` result.', TextParseMode.MarkDown) # noqa: E501
|
||||
n = time()
|
||||
tmp = '' if tmes == 0 else f" (from <code>{timeToStr(st)}</code> to <code>{timeToStr(et)}</code>)" # noqa: E501
|
||||
r = f"Searched <code>{escape(opt.keyword)}</code> in <code>{tmes}</code> messages{tmp}, founded <code>{len(mess)}</code> results:" # noqa: E501
|
||||
for i in mess:
|
||||
link = await lib.getMessageLink(chat_id, i)
|
||||
if link is None:
|
||||
r += "\nFailed to get message link."
|
||||
else:
|
||||
r += f"\n{escape(link['link'])}"
|
||||
re = await lib.editMessageText(mes['chat_id'], mes['id'], r, TextParseMode.HTML) # noqa: E501
|
||||
except ValueError as e:
|
||||
if len(e.args) == 0:
|
||||
re = await lib.editMessageText(mes['chat_id'], mes['id'], "Unknown error.") # noqa: E501
|
||||
else:
|
||||
re = await lib.editMessageText(mes['chat_id'], mes['id'], f"```\n{sm.format_usage()}{sm.prog}: error: {e.args[0] if len(e.args) == 1 else e.args}\n```", TextParseMode.MarkDown) # noqa: E501
|
||||
except Exception:
|
||||
re = await lib.editMessageText(mes['chat_id'], mes['id'], f"```\n{format_exc()}\n```", TextParseMode.MarkDown) # noqa: E501
|
||||
if re is None:
|
||||
print('Can not edit message.')
|
||||
|
||||
|
||||
async def handle_optimize_storage(lib: TdLib, robot: TdLib, mes: dict,
|
||||
argv: List[str]):
|
||||
try:
|
||||
opt = om.parse_intermixed_args(argv[1:])
|
||||
if opt.help:
|
||||
re = await lib.editMessageText(
|
||||
mes['chat_id'], mes['id'], f"```\n{om.format_help()}\n```",
|
||||
TextParseMode.MarkDown)
|
||||
else:
|
||||
if opt.robot and not robot._logined:
|
||||
raise ValueError('Robot is not logined.')
|
||||
d = {'return_deleted_file_statistics': opt.return_deleted_file_statistics} # noqa: E501
|
||||
if opt.size:
|
||||
d['size'] = opt.size
|
||||
if opt.ttl:
|
||||
d['ttl'] = opt.ttl
|
||||
if opt.count:
|
||||
d['count'] = opt.count
|
||||
if opt.immunity_delay:
|
||||
d['immunity_delay'] = opt.immunity_delay
|
||||
if opt.chat_ids:
|
||||
d['chat_ids'] = opt.chat_ids
|
||||
if opt.exclude_chat_ids:
|
||||
d['exclude_chat_ids'] = opt.exclude_chat_ids
|
||||
if opt.chat_limit:
|
||||
d['chat_limit'] = opt.chat_limit
|
||||
r = await robot.optimizeStorage(**d) if opt.robot else await lib.optimizeStorage(**d) # noqa: E501
|
||||
if r is None:
|
||||
raise ValueError('Failed to optimize the storage.')
|
||||
text = f"Optimized successfully.\nTotal file size: <code>{r['size']}B</code>\nFile count: <code>{r['count']}</code>" # noqa: E501
|
||||
re = await lib.editMessageText(mes['chat_id'], mes['id'], text, TextParseMode.HTML) # noqa: E501
|
||||
except ValueError as e:
|
||||
if len(e.args) == 0:
|
||||
re = await lib.editMessageText(mes['chat_id'], mes['id'], "Unknown error.") # noqa: E501
|
||||
else:
|
||||
re = await lib.editMessageText(mes['chat_id'], mes['id'], f"```\n{om.format_usage()}{om.prog}: error: {e.args[0] if len(e.args) == 1 else e.args}\n```", TextParseMode.MarkDown) # noqa: E501
|
||||
except Exception:
|
||||
re = await lib.editMessageText(mes['chat_id'], mes['id'], f"```\n{format_exc()}\n```", TextParseMode.MarkDown) # noqa: E501
|
||||
if re is None:
|
||||
print('Can not edit message.')
|
||||
|
||||
|
||||
async def main(lib: TdLib, robot: TdLib):
|
||||
with open('tdlib.json', 'r', encoding='UTF-8') as f:
|
||||
se = load(f)
|
||||
@@ -440,6 +558,10 @@ async def main(lib: TdLib, robot: TdLib):
|
||||
asyncio.create_task(re)
|
||||
elif argv[0] == '-nbnhhsh':
|
||||
asyncio.create_task(handle_nbnhhsh(lib, mes, argv))
|
||||
elif argv[0] in ['-searchmessage', '-sm']:
|
||||
asyncio.create_task(handle_search_message(lib, mes, argv))
|
||||
elif argv[0] in ['-optimizestorage', '-om']:
|
||||
asyncio.create_task(handle_optimize_storage(lib, robot, mes, argv)) # noqa: E501
|
||||
|
||||
|
||||
with TdLib() as lib:
|
||||
|
||||
Reference in New Issue
Block a user