From e1693f3c631b4499864b3c9e1d0840895927857a Mon Sep 17 00:00:00 2001 From: lifegpc Date: Tue, 14 Dec 2021 20:30:10 +0800 Subject: [PATCH] add tg_user_bot --- tdlib.py | 127 ++++++++++++++++++++++++++- tg_user_bot.py | 231 +++++++++++++++++++++++++++++++++++++++++++++++++ util.py | 36 ++++++++ 3 files changed, 392 insertions(+), 2 deletions(-) create mode 100644 tg_user_bot.py create mode 100644 util.py diff --git a/tdlib.py b/tdlib.py index 94ab0f7..2953b38 100644 --- a/tdlib.py +++ b/tdlib.py @@ -159,6 +159,7 @@ def parse_chat_type(d): if d['@type'].startswith('chatType'): typ = 'C' + d['@type'][1:] return globals()[typ](d) + raise ValueError(f'Unknown value: {d}') class TextParseMode(Enum): @@ -184,6 +185,107 @@ class TextParseMode(Enum): raise ValueError(f'Unknown value: {value}') +class MessageForwardOrigin: + def __iter__(self): + return self.to_dict().items().__iter__() + + def to_dict(self) -> dict: + pass + + def __repr__(self) -> str: + d = self.to_dict() + if d is None: + m = '' + else: + typ = d['@type'][20:] + d.pop('@type') + m = f"Type={typ} Data={d}" + return f"<{self.__class__.__module__}.{self.__class__.__name__} {m}>" + + def __str__(self): + d = self.to_dict() + if d is None: + return '' + return d['@type'][20:] + + +class MessageForwardOriginChannel(MessageForwardOrigin): + def __init__(self, v): + if isinstance(v, dict): + if v['@type'] == 'messageForwardOriginChannel': + self.chat_id = v['chat_id'] + self.message_id = v['message_id'] + self.author_signature = v['author_signature'] + return + raise ValueError(f'Unknown value: {v}') + + def to_dict(self): + return {"@type": 'messageForwardOriginChannel', + 'chat_id': self.chat_id, 'message_id': self.message_id, + 'author_signature': self.author_signature} + + +class MessageForwardOriginChat(MessageForwardOrigin): + def __init__(self, v): + if isinstance(v, dict): + if v['@type'] == 'messageForwardOriginChat': + self.sender_chat_id = v['sender_chat_id'] + self.author_signature = v['author_signature'] + return + raise ValueError(f'Unknown value: {v}') + + def to_dict(self): + return {"@type": "messageForwardOriginChat", + "sender_chat_id": self.sender_chat_id, + "author_signature": self.author_signature} + + +class MessageForwardOriginHiddenUser(MessageForwardOrigin): + def __init__(self, v): + if isinstance(v, dict): + if v['@type'] == 'messageForwardOriginHiddenUser': + self.sender_name = v['sender_name'] + return + raise ValueError(f'Unknown value: {v}') + + def to_dict(self): + return {"@type": "messageForwardOriginHiddenUser", + "sender_name": self.sender_name} + + +class MessageForwardOriginMessageImport(MessageForwardOrigin): + def __init__(self, v): + if isinstance(v, dict): + if v['@type'] == 'messageForwardOriginMessageImport': + self.sender_name = v['sender_name'] + return + raise ValueError(f'Unknown value: {v}') + + def to_dict(self): + return {"@type": "messageForwardOriginMessageImport", + "sender_name": self.sender_name} + + +class MessageForwardOriginUser(MessageForwardOrigin): + def __init__(self, v): + if isinstance(v, dict): + if v['@type'] == 'messageForwardOriginUser': + self.sender_user_id = v['sender_user_id'] + return + raise ValueError(f'Unknown value: {v}') + + def to_dict(self): + return {"@type": "messageForwardOriginUser", + "sender_user_id": self.sender_user_id} + + +def parse_message_forward_orgin(d): + if d['@type'].startswith('messageForwardOrigin'): + typ = 'M' + d['@type'][1:] + return globals()[typ](d) + raise ValueError(f'Unknown value: {d}') + + def json_object_hook(value): try: if isinstance(value, dict): @@ -192,6 +294,10 @@ def json_object_hook(value): return parse_chat_type(value) elif value['@type'].startswith('textParseMode'): return TextParseMode(value) + elif value['@type'].startswith('messageForwardOrigin'): + return parse_message_forward_orgin(value) + elif value['@type'] == 'message': + value['media_album_id'] = int(value['media_album_id']) except Exception: print_exc() return value @@ -205,10 +311,15 @@ class TdLibJSONDecoder(JSONDecoder): class TdLibJSONEncoder(JSONEncoder): def default(self, o): - if isinstance(o, (ChatType, TextParseMode)): + if isinstance(o, (ChatType, TextParseMode, MessageForwardOrigin)): return dict(o) elif isinstance(o, bytes): return b64encode(o).decode() + elif isinstance(o, dict): + if '@type' in o: + if o['@type'] == 'message': + if 'media_album_id' in o: + o['media_album_id'] = str(o['media_album_id']) return super().default(o) @@ -259,7 +370,7 @@ class TdLib: del self._re del self._rel - async def _send(self, data): + async def _send(self, data) -> dict: extra = random() data['@extra'] = extra en = TdLibJSONEncoder(ensure_ascii=False) @@ -326,6 +437,18 @@ class TdLib: print(f"{re['code']} {re['message']}") return False + async def createNewStickerSet(self, title: str, name: str, stickers, + is_masks: bool = False, source: str = ''): + re = await self._send({"@type": "createNewStickerSet", "title": title, + "name": name, "stickers": stickers, + "is_masks": is_masks, "source": source}) + if re['@type'] == 'stickerSet': + return re + else: + if re['@type'] == 'error': + print(f"{re['code']} {re['message']}") + return None + async def deleteAllMyMessageInChat(self, chat_id: int, start_time: int = None, end_time: int = None): diff --git a/tg_user_bot.py b/tg_user_bot.py new file mode 100644 index 0000000..4188715 --- /dev/null +++ b/tg_user_bot.py @@ -0,0 +1,231 @@ +from argparse import ArgumentParser +import asyncio +from json import load +from re import I, compile +from traceback import format_exc +from typing import List +from tdlib import ( + MessageForwardOriginChannel, + MessageForwardOriginChat, + MessageForwardOriginHiddenUser, + MessageForwardOriginMessageImport, + MessageForwardOriginUser, + TdLib, + TextParseMode, +) +from util import commandLineToArgv, timeToStr + + +USERNAME_REG = compile(r'^[0-9a-z_]+$', I) + + +def validate_sticker_set_name(n: str) -> str: + if len(n) == 0 or len(n) > 64: + raise ValueError('1-64 characters') + r = USERNAME_REG.search(n) + if r is None or n.startswith('_') or n.endswith('_'): + raise ValueError('Can contain only English letters, digits and underscores.') # noqa: E501 + return n + + +def validate_sticker_set_title(n: str) -> str: + if len(n) == 0 or len(n) > 64: + raise ValueError('1-64 characters is needed.') + return n + + +class MyArgParser(ArgumentParser): + def error(self, message: str): + raise ValueError(message) + + def exit(self, status, message): + pass + + +cnss = MyArgParser('-createnewstickerset', add_help=False, description="Create a new sticker set.", epilog="Note: Reply to a sticker message is needed.") # noqa: E501 +cnss.add_argument('name', help="Sticker set name. Can contain only English letters, digits and underscores. 1-64 characters", type=validate_sticker_set_name) # noqa: E501 +cnss.add_argument('title', help="Sticker set title. 1-64 characters", type=validate_sticker_set_title) # noqa: E501 +cnss.add_argument('source', help='Source of the sticker set', nargs='?', default='') # noqa: E501 +cnss.add_argument('-e', '--emoji', help='Emojis corresponding to the sticker', metavar='EMOJI', dest='emoji') # noqa: E501 + + +def generateFileInfo(f: dict) -> str: + m = '' + if '@type' in f and f['@type'] == 'file': + m = f"File ID: `{f['id']}`" + if f['size'] != 0: + m += f"\nFile Size: `{f['size']}B`" + elif f['expected_size'] != 0: + m += f"\nFile Expected Size: `{f['expected_size']}B`" + if 'remote' in f and f['remote']['@type'] == 'remoteFile': + rf = f['remote'] + if rf['id'] != '': + m += f"\nRemote File ID: `{rf['id']}`" + if rf['unique_id'] != '': + m += f"\nRemote Unique File ID: `{rf['unique_id']}`" + return m + + +async def handle_message_info(lib: TdLib, mes): + if mes['reply_to_message_id'] == 0: + re = await lib.editMessageText(mes['chat_id'], mes['id'], + "Reply a message is needed.") + else: + nmes = await lib.getMessage(mes['reply_in_chat_id'], + mes['reply_to_message_id']) + if nmes is None: + re = await lib.editMessageText(mes['chat_id'], mes['id'], + "Can not get the replied message.") + else: + me = f"Message ID: `{nmes['id']}`" + if nmes['sender_id']['@type'] == 'messageSenderChat': + me += f"\nSender Chat ID: `{nmes['sender_id']['chat_id']}`" + elif nmes['sender_id']['@type'] == 'messageSenderUser': + me += f"\nSender User ID: `{nmes['sender_id']['user_id']}`" + me += f"\nChat ID: `{nmes['chat_id']}`" + me += f"\nSend Date: `{timeToStr(nmes['date'])}`" + if nmes['edit_date'] != 0: + me += f"\nLast Edited Date: `{timeToStr(nmes['edit_date'])}`" + if 'forward_info' in nmes: + fi = nmes['forward_info'] + if fi is not None and fi['@type'] == 'messageForwardInfo': + me += "\nForward Infomation:" + o = fi['origin'] + me += f"\nOrigin Sender Type: `{o}`" + if isinstance(o, MessageForwardOriginUser): + me += f"\nOrigin Sender User ID: `{o.sender_user_id}`" + elif isinstance(o, (MessageForwardOriginHiddenUser, MessageForwardOriginMessageImport)): # noqa: E501 + me += f"\nOrigin Sender Name: `{o.sender_name}`" + elif isinstance(o, MessageForwardOriginChat): + me += f"\nOrigin Sender Chat ID: `{o.sender_chat_id}`" + if o.author_signature != "": + me += f"\nAuthor Signature: `{o.author_signature}`" + elif isinstance(o, MessageForwardOriginChannel): + me += f"\nOrigin Sender Chat ID: `{o.chat_id}`" + me += f"\nOrigin Sender Message ID: `{o.message_id}`" + if o.author_signature != "": + me += f"\nAuthor Signature: `{o.author_signature}`" + me += f"\nOrigin Send Date: `{timeToStr(fi['date'])}`" + if fi['public_service_announcement_type'] != '': + me += f"\nPublic Service Announcement Type: `{fi['public_service_announcement_type']}`" # noqa: E501 + if fi['from_chat_id'] != 0: + me += f"\nFrom Chat ID: `{fi['from_chat_id']}`" + if fi['from_message_id'] != 0: + me += f"\nFrom Message ID: `{fi['from_message_id']}`" + if 'interaction_info' in nmes: + ii = nmes['interaction_info'] + if ii is not None and ii['@type'] == 'messageInteractionInfo': + me += "\nIteraction Info:" + me += f"\nView Count: `{ii['view_count']}`" + me += f"\nForward Count: `{ii['forward_count']}`" + if nmes['reply_in_chat_id'] != 0: + me += f"\nReply In Chat ID: `{nmes['reply_in_chat_id']}`" + if nmes['reply_to_message_id'] != 0: + me += f"\nReply To Message ID: `{nmes['reply_to_message_id']}`" + if nmes['message_thread_id'] != 0: + me += f"\nMessage Thread ID: `{nmes['message_thread_id']}`" + if nmes['via_bot_user_id'] != 0: + me += f"\nVia Bot User ID: `{nmes['via_bot_user_id']}`" + if nmes['author_signature'] != '': + me += f"\nAuthor Signature: `{nmes['author_signature']}`" + if nmes['media_album_id'] != 0: + me += f"\nMedia Album ID: `{nmes['media_album_id']}`" + if nmes['restriction_reason'] != '': + me += f"\nRestriction Reason: `{nmes['restriction_reason']}`" + if nmes['content']['@type'] == 'messageSticker': + st = nmes['content']['sticker'] + me += f"\nSticker Set ID: `{st['set_id']}`" + me += f"\nSticker Width: `{st['width']}`" + me += f"\nSticker Height: `{st['height']}`" + me += f"\nSticker Emoji: `{st['emoji']}`" + me += f"\nAnimated Sticker: `{st['is_animated']}`" + me += f"\nMask Sticker: `{st['is_mask']}`" + if 'sticker' in st and st['sticker'] is not None: + me += "\nSticker File Info:" + me += "\n" + generateFileInfo(st['sticker']) + re = await lib.editMessageText( + mes['chat_id'], mes['id'], me, TextParseMode.MarkDown) + if re is None: + print('Can not edit message') + + +async def handle_create_new_sticker_set(lib: TdLib, mes: dict, + argv: List[str]): + if len(argv) == 1: + re = await lib.editMessageText( + mes['chat_id'], mes['id'], f"```\n{cnss.format_help()}\n```", + TextParseMode.MarkDown) + else: + try: + r = cnss.parse_intermixed_args(argv[1:]) + if mes['reply_to_message_id'] == 0: + raise ValueError('Reply a sticker message is needed.') + nmes = await lib.getMessage(mes['reply_in_chat_id'], + mes['reply_to_message_id']) + if nmes is None: + raise ValueError('Can not get the replied message.') + if nmes['content']['@type'] != 'messageSticker': + raise ValueError('Reply a sticker message is needed.') + width = nmes['content']['sticker']['width'] + height = nmes['content']['sticker']['height'] + if max(width, height) != 512: + raise ValueError('Invalid width or height') + fid = nmes['content']['sticker']['sticker']['remote']['id'] + emoji = nmes['content']['sticker']['emoji'] if r.emoji is None else r.emoji # noqa: E501 + if fid == '': + raise ValueError("The target sticker's file id is empty.") + st = [{"@type": "inputStickerStatic", "sticker": {"@type": "inputFileRemote", "id": fid}, "emojis": emoji}] # noqa: E501 + r = await lib.createNewStickerSet(r.title, r.name, st, source=r.source) # noqa: E501 + if r is None: + raise ValueError('Can not create new sticker set.') + re = await lib.editMessageText(mes['chat_id'], mes['id'], f"Created successfully.\nSticker Set ID: {r['id']}\nhttps://t.me/addstickers/{r['name']}") # noqa: E501 + except ValueError as e: + if len(e.args) == 0: + re = await lib.editMessageText(mes['chat_id'], mes['id'], "Unknown error.") # noqa: E501 + else: + re = await lib.editMessageText(mes['chat_id'], mes['id'], f"```\n{cnss.format_usage()}\n{cnss.prog}: error: {e.args[0] if len(e.args) == 1 else e.args}\n```", TextParseMode.MarkDown) # noqa: E501 + except Exception: + re = await lib.editMessageText(mes['chat_id'], mes['id'], f"```\n{format_exc()}\n```", TextParseMode.MarkDown) # noqa: E501 + if re is None: + print('Can not edit message') + + +async def main(lib: TdLib): + with open('tdlib.json', 'r', encoding='UTF-8') as f: + se = load(f) + if not await lib.login(se['TdlibParameters'], se['encryption_key'], + se.get("proxy"), se.get("phone_number")): + raise ValueError('Can not login') + uid = await lib.getUid() + while True: + mes = await lib.receive('updateNewMessage') + if mes['@type'] == 'updateNewMessage': + mes = mes['message'] + if mes['sender_id']['@type'] != 'messageSenderUser': + continue + if mes['sender_id']['user_id'] != uid: + continue + if mes['content']['@type'] != 'messageText': + continue + + def err(e): + e = e.result() + if e is None: + print('Can not edit message.') + print(e) + if mes['content']['text']['text'] == '-hello': + re = lib.editMessageText(mes['chat_id'], mes['id'], + 'Hello World!') + asyncio.create_task(re).add_done_callback(err) + elif mes['content']['text']['text'] in ['-msginfo', '-messageinfo']: # noqa: E501 + re = handle_message_info(lib, mes) + asyncio.create_task(re) + elif mes['content']['text']['text'].startswith('-'): + argv = commandLineToArgv(mes['content']['text']['text']) + if argv[0] in ['-createnewstickerset', '-cnss']: + re = handle_create_new_sticker_set(lib, mes, argv) + asyncio.create_task(re) + + +with TdLib(True) as lib: + asyncio.run(main(lib)) diff --git a/util.py b/util.py new file mode 100644 index 0000000..47936e6 --- /dev/null +++ b/util.py @@ -0,0 +1,36 @@ +from time import strftime, localtime, timezone +from typing import List + + +def timeToStr(t: int) -> str: + te = strftime('%Y-%m-%dT%H:%M:%S', localtime(t)) + op = '-' if timezone > 0 else '+' + te = te + op + \ + f'{int(abs(timezone)/3600):02}:{int(abs(timezone)%3600/60):02}' + return te + + +def commandLineToArgv(c: str) -> List[str]: + s = c.split(' ') + r = [] + t = '' + f = False + for i in s: + if not f and len(i) == 0: + continue + if i.startswith('"'): + if i.endswith('"') and not i.endswith('\\"'): + r.append(i[1:-1].replace('\\"', '"')) + else: + t += i + f = True + elif f and i.endswith('"') and not i.endswith('\\"'): + t += " " + i + f = False + r.append(t[1:-1].replace('\\"', '"')) + t = '' + elif f: + t += " " + i + else: + r.append(i) + return r