Files
pythonscript/tdlib.py
2022-01-21 17:46:30 +08:00

982 lines
36 KiB
Python

import asyncio
from base64 import b64encode
from ctypes import CDLL, c_char_p, c_double, c_int, c_void_p
from ctypes.util import find_library
from enum import Enum
from getpass import getpass
from json import dumps, loads, JSONDecoder, JSONEncoder
from random import random
import sys
from threading import Thread
from traceback import print_exc
from typing import List, Union
tdjson_path = find_library('tdjson')
if tdjson_path is None:
print("Can't find 'tdjson' library")
sys.exit(-1)
tdjson = CDLL(tdjson_path)
_td_json_client_create = tdjson.td_json_client_create
_td_json_client_create.restype = c_void_p
_td_json_client_create.argtypes = []
_td_json_client_send = tdjson.td_json_client_send
_td_json_client_send.restype = None
_td_json_client_send.argtypes = [c_void_p, c_char_p]
_td_json_client_destroy = tdjson.td_json_client_destroy
_td_json_client_destroy.restype = None
_td_json_client_destroy.argtypes = [c_void_p]
_td_receive = tdjson.td_receive
_td_receive.restype = c_char_p
_td_receive.argtypes = [c_double]
_td_send = tdjson.td_send
_td_send.restype = None
_td_send.argtypes = [c_int, c_char_p]
_td_execute = tdjson.td_execute
_td_execute.restype = c_char_p
_td_execute.argtypes = [c_char_p]
_td_json_client_receive = tdjson.td_json_client_receive
_td_json_client_receive.restype = c_char_p
_td_json_client_receive.argtypes = [c_void_p, c_double]
def td_execute(query):
query = dumps(query, ensure_ascii=False, separators=(',', ':')).encode()
result = _td_execute(query)
if result:
result = loads(result.decode())
return result
if td_execute({'@type': 'setLogVerbosityLevel', 'new_verbosity_level': 1, '@extra': 1.1111111})['@type'] != 'ok': # noqa: E501
print('Can not set log level to error level.')
class UpdateThread(Thread):
def __init__(self, tdlib) -> None:
self._tdlib = tdlib
self._need_killed = False
Thread.__init__(self)
def kill(self) -> None:
self._need_killed = True
async def killed(self):
while True:
if not self.is_alive():
return
await asyncio.sleep(1)
def run(self) -> None:
while True:
try:
self._tdlib._update()
if self._need_killed:
return
except Exception:
print_exc()
class ChatType:
def __iter__(self):
return self.to_dict().items().__iter__()
def to_dict(self) -> dict:
pass
def __repr__(self) -> str:
d = self.to_dict()
if d is None:
m = ''
else:
typ = d['@type'][8:]
d.pop('@type')
m = f"Type={typ} Data={d}"
return f"<{self.__class__.__module__}.{self.__class__.__name__} {m}>"
def __str__(self):
d = self.to_dict()
if d is None:
return ''
return d['@type'][8:]
class ChatTypeBasicGroup(ChatType):
def __init__(self, value):
if isinstance(value, dict):
if value['@type'] == 'chatTypeBasicGroup':
self.basic_group_id = int(value['basic_group_id'])
return
raise ValueError(f'Unknown value: {value}')
def to_dict(self):
return {"@type": "chatTypeBasicGroup",
"basic_group_id": self.basic_group_id}
class ChatTypePrivate(ChatType):
def __init__(self, value):
if isinstance(value, dict):
if value['@type'] == 'chatTypePrivate':
self.user_id = int(value['user_id'])
return
raise ValueError(f'Unknown value: {value}')
def to_dict(self):
return {"@type": 'chatTypePrivate', 'user_id': self.user_id}
class ChatTypeSecret(ChatType):
def __init__(self, value):
if isinstance(value, dict):
if value['@type'] == 'chatTypeSecret':
self.secret_chat_id = int(value['secret_chat_id'])
self.user_id = int(value['user_id'])
return
raise ValueError(f'Unknown value: {value}')
def to_dict(self):
return {"@type": "chatTypeSecret",
"secret_chat_id": self.secret_chat_id, "user_id": self.user_id}
class ChatTypeSupergroup(ChatType):
def __init__(self, value):
if isinstance(value, dict):
if value['@type'] == 'chatTypeSupergroup':
self.supergroup_id = int(value['supergroup_id'])
self.is_channel = bool(value['is_channel'])
return
raise ValueError(f'Unknown value: {value}')
def to_dict(self):
return {"@type": "chatTypeSupergroup",
"supergroup_id": self.supergroup_id,
"is_channel": self.is_channel}
def parse_chat_type(d):
if d['@type'].startswith('chatType'):
typ = 'C' + d['@type'][1:]
return globals()[typ](d)
raise ValueError(f'Unknown value: {d}')
class TextParseMode(Enum):
HTML = 0
MarkDown = 1
def __iter__(self):
return self.to_dict().items().__iter__()
def to_dict(self):
if self._value_ == 0:
return {"@type": "textParseModeHTML"}
elif self._value_ == 1:
return {"@type": "textParseModeMarkdown"}
@classmethod
def _missing_(cls, value: object):
if isinstance(value, dict):
if value['@type'] == 'textParseModeHTML':
return cls(0)
elif value['@type'] == 'textParseModeMarkdown':
return cls(1)
raise ValueError(f'Unknown value: {value}')
class MessageForwardOrigin:
def __iter__(self):
return self.to_dict().items().__iter__()
def to_dict(self) -> dict:
pass
def __repr__(self) -> str:
d = self.to_dict()
if d is None:
m = ''
else:
typ = d['@type'][20:]
d.pop('@type')
m = f"Type={typ} Data={d}"
return f"<{self.__class__.__module__}.{self.__class__.__name__} {m}>"
def __str__(self):
d = self.to_dict()
if d is None:
return ''
return d['@type'][20:]
class MessageForwardOriginChannel(MessageForwardOrigin):
def __init__(self, v):
if isinstance(v, dict):
if v['@type'] == 'messageForwardOriginChannel':
self.chat_id = v['chat_id']
self.message_id = v['message_id']
self.author_signature = v['author_signature']
return
raise ValueError(f'Unknown value: {v}')
def to_dict(self):
return {"@type": 'messageForwardOriginChannel',
'chat_id': self.chat_id, 'message_id': self.message_id,
'author_signature': self.author_signature}
class MessageForwardOriginChat(MessageForwardOrigin):
def __init__(self, v):
if isinstance(v, dict):
if v['@type'] == 'messageForwardOriginChat':
self.sender_chat_id = v['sender_chat_id']
self.author_signature = v['author_signature']
return
raise ValueError(f'Unknown value: {v}')
def to_dict(self):
return {"@type": "messageForwardOriginChat",
"sender_chat_id": self.sender_chat_id,
"author_signature": self.author_signature}
class MessageForwardOriginHiddenUser(MessageForwardOrigin):
def __init__(self, v):
if isinstance(v, dict):
if v['@type'] == 'messageForwardOriginHiddenUser':
self.sender_name = v['sender_name']
return
raise ValueError(f'Unknown value: {v}')
def to_dict(self):
return {"@type": "messageForwardOriginHiddenUser",
"sender_name": self.sender_name}
class MessageForwardOriginMessageImport(MessageForwardOrigin):
def __init__(self, v):
if isinstance(v, dict):
if v['@type'] == 'messageForwardOriginMessageImport':
self.sender_name = v['sender_name']
return
raise ValueError(f'Unknown value: {v}')
def to_dict(self):
return {"@type": "messageForwardOriginMessageImport",
"sender_name": self.sender_name}
class MessageForwardOriginUser(MessageForwardOrigin):
def __init__(self, v):
if isinstance(v, dict):
if v['@type'] == 'messageForwardOriginUser':
self.sender_user_id = v['sender_user_id']
return
raise ValueError(f'Unknown value: {v}')
def to_dict(self):
return {"@type": "messageForwardOriginUser",
"sender_user_id": self.sender_user_id}
def parse_message_forward_orgin(d):
if d['@type'].startswith('messageForwardOrigin'):
typ = 'M' + d['@type'][1:]
return globals()[typ](d)
raise ValueError(f'Unknown value: {d}')
def json_object_hook(value):
try:
if isinstance(value, dict):
if '@type' in value:
if value['@type'].startswith('chatType'):
return parse_chat_type(value)
elif value['@type'].startswith('textParseMode'):
return TextParseMode(value)
elif value['@type'].startswith('messageForwardOrigin'):
return parse_message_forward_orgin(value)
elif value['@type'] == 'message':
value['media_album_id'] = int(value['media_album_id'])
elif value['@type'] == 'getStickerSet':
value['set_id'] = int(value['set_id'])
elif value['@type'] == 'stickerSet':
value['id'] = int(value['id'])
except Exception:
print_exc()
return value
return value
class TdLibJSONDecoder(JSONDecoder):
def __init__(self, *k, **kw) -> None:
super().__init__(*k, object_hook=json_object_hook, **kw)
class TdLibJSONEncoder(JSONEncoder):
def default(self, o):
if isinstance(o, (ChatType, TextParseMode, MessageForwardOrigin)):
return dict(o)
elif isinstance(o, bytes):
return b64encode(o).decode()
elif isinstance(o, dict):
if '@type' in o:
if o['@type'] == 'message':
if 'media_album_id' in o:
o['media_album_id'] = str(o['media_album_id'])
elif o['@type'] == 'getStickerSet':
if 'set_id' in o:
o['set_id'] = str(o['set_id'])
elif o['@type'] == 'stickerSet':
if 'id' in o:
o['id'] = str(o['id'])
return super().default(o)
class TdLib:
def __init__(self, verbose: bool = False, maxCache: int = None) -> None:
self._initalized = False
self._destoried = False
self._db_initalized = False
self._logined = False
self._v = verbose
self._maxCache = 1000
if maxCache is not None:
if maxCache < 100:
raise ValueError('At least cache 100 messages.')
self._maxCache = maxCache
self._e = 0
self._re = {}
self._rel = []
def __enter__(self):
if self._destoried:
raise ValueError('Already destoried.')
self._e += 1
if not self._initalized:
self._client_id = _td_json_client_create()
if not self._client_id:
raise ValueError('Can not create tdlib client.')
self._thread = UpdateThread(self)
self._thread.start()
return self
def __exit__(self, type, value, tb):
if self._destoried:
raise ValueError('Already destoried.')
self._e -= 1
if self._e == 0:
asyncio.run(self.__destory())
async def __destory(self):
self._thread.kill()
await self._thread.killed()
_td_json_client_destroy(self._client_id)
self._client_id = 0
self._destoried = True
if self._v:
le1 = len(self._re)
le2 = len(self._rel)
print(f"There are {le1 + le2} ({le1} + {le2}) messages in cache.")
del self._re
del self._rel
async def _send(self, data) -> dict:
extra = random()
data['@extra'] = extra
en = TdLibJSONEncoder(ensure_ascii=False)
data = en.encode(data).encode()
_td_json_client_send(self._client_id, data)
while True:
if extra in self._re:
re = self._re.pop(extra)
return re
await asyncio.sleep(0.1)
def _update(self):
result: bytes = _td_json_client_receive(self._client_id, 1.0)
if result:
de = TdLibJSONDecoder()
re = de.decode(result.decode())
if self._v:
if '@type' in re:
print(f'Get a new message. Message type: {re["@type"]}')
if '@extra' in re:
self._re[re['@extra']] = re
else:
while len(self._rel) >= self._maxCache:
m = self._rel.pop(0)
if self._v:
t = m['@type'] if '@type' in m else 'Unknown'
print(f'Discard a message in cache. Message type: {t}')
self._rel.append(re)
async def addProxy(self, server, port, enable, type):
while not self._db_initalized:
await asyncio.sleep(0.1)
return await self._send({"@type": "addProxy", "server": server,
"port": port, "enable": enable,
"type": type})
async def addStickerToSet(self, user_id: int, name: str, sticker):
re = await self._send({"@type": "addStickerToSet", "user_id": user_id,
"name": name, "sticker": sticker})
if re['@type'] == 'stickerSet':
return re
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return None
async def checkAuthenticationCode(self, code: str):
re = await self._send({"@type": "checkAuthenticationCode",
"code": code})
if re['@type'] == 'ok':
return True
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return False
async def checkDatabaseEncryptionKey(self, encryption_key):
re = await self._send({"@type": "checkDatabaseEncryptionKey",
"encryption_key": encryption_key})
if re['@type'] == 'ok':
return True
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return False
async def checkAuthenticationPassword(self, password: str):
re = await self._send({"@type": "checkAuthenticationPassword",
"password": password})
if re['@type'] == 'ok':
return True
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return False
async def checkAuthenticationBotToken(self, token: str):
re = await self._send({"@type": "checkAuthenticationBotToken",
"token": token})
if re['@type'] == 'ok':
return True
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return False
async def createNewStickerSet(self, title: str, name: str, stickers,
is_masks: bool = False, source: str = '',
user_id: int = 0):
re = await self._send({"@type": "createNewStickerSet", "title": title,
"name": name, "stickers": stickers,
"is_masks": is_masks, "source": source,
"user_id": user_id})
if re['@type'] == 'stickerSet':
return re
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return None
async def deleteAllMyMessageInChat(self, chat_id: int,
start_time: int = None,
end_time: int = None,
verbose: bool = True,
excludes: List[int] = None):
uid = await self.getUid()
messages = await self.searchChatMessages(chat_id, sender_user_id=uid,
limit=100)
c = 0
if messages is None:
return None
while len(messages['messages']) != 0:
last_mid = messages['messages'][-1]['id']
mids = []
for m in messages['messages']:
if excludes is not None:
if m['id'] in excludes:
continue
need_deleted = False
if start_time is None and end_time is None:
need_deleted = True
elif start_time is None:
if m['date'] <= end_time:
need_deleted = True
elif end_time is None:
if m['date'] >= start_time:
need_deleted = True
else:
if m['date'] >= start_time and m['date'] <= end_time:
need_deleted = True
if need_deleted and m['can_be_deleted_for_all_users']:
if verbose:
print(f"Add {m['id']} to delete list. ({m['date']})")
mids.append(m['id'])
if not await self.deleteMessages(chat_id, mids):
raise ValueError('Can not delete messages.')
c += len(mids)
messages = await self.searchChatMessages(
chat_id, sender_user_id=uid, from_message_id=last_mid,
limit=100)
if messages is None:
return None
return c
async def deleteChatHistory(self, chat_id: int,
remove_from_chat_list: bool = False,
revoke: bool = False):
re = await self._send({"@type": "deleteChatHistory",
"chat_id": chat_id,
"remove_from_chat_list": remove_from_chat_list,
"revoke": revoke})
if re['@type'] == 'ok':
return True
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return False
async def deleteMessages(self, chat_id: int,
message_ids: Union[List[int], int],
revoke: bool = True):
if isinstance(message_ids, int):
message_ids = [message_ids]
re = await self._send({"@type": "deleteMessages", "chat_id": chat_id,
"message_ids": message_ids, "revoke": revoke})
if re['@type'] == 'ok':
return True
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return False
async def editMessageText(self, chat_id: int, message_id: int, text: str,
parse_mode: TextParseMode = None, entities=None,
disable_web_page_preview: bool = False,
clear_draft: bool = False):
if parse_mode is not None and entities is not None:
raise ValueError('Bot parse_mode and entities are not supported.')
if parse_mode is not None:
mes = await self.parseTextEntities(text, parse_mode)
if mes is None:
return None
entities = mes['entities']
text = mes['text']
elif entities is None:
entities = await self.getTextEntities(text)
if entities is None:
return None
entities = entities['entities']
d = {'@type': 'editMessageText', 'chat_id': chat_id,
'message_id': message_id, 'input_message_content': {
'@type': 'inputMessageText',
'text': {'@type': 'formattedText', 'text': text,
'entities': entities},
'disable_web_page_preview': disable_web_page_preview,
'clear_draft': clear_draft
}}
re = await self._send(d)
if re['@type'] == 'message':
return re
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return None
async def enableProxy(self, proxy_id):
re = await self._send({"@type": "enableProxy", "proxy_id": proxy_id})
if re['@type'] == 'ok':
return True
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return False
async def getChat(self, chat_id: int):
re = await self._send({"@type": "getChat", "chat_id": chat_id})
if re['@type'] == 'chat':
return re
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return None
async def getChatHistory(self, chat_id: int, from_message_id: int = None,
offset: int = None, limit: int = 20,
only_local: bool = False):
d = {"@type": 'getChatHistory', "chat_id": chat_id, "limit": limit,
"only_local": only_local}
if from_message_id is not None:
d['from_message_id'] = from_message_id
if offset is not None:
d['offset'] = offset
re = await self._send(d)
if re['@type'] == 'messages':
return re
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return None
async def getMe(self):
re = await self._send({"@type": "getMe"})
if re['@type'] == 'user':
return re
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return None
async def getMessage(self, chat_id: int, message_id: int):
re = await self._send({"@type": "getMessage", "chat_id": chat_id,
"message_id": message_id})
if re['@type'] == 'message':
return re
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return None
async def getMessageLink(self, chat_id: int, message_id: int,
media_timestamp: int = 0, for_album: bool = False,
for_comment: bool = False):
re = await self._send({"@type": "getMessageLink", "chat_id": chat_id,
"message_id": message_id,
"media_timestamp": media_timestamp,
"for_album": for_album,
"for_comment": for_comment})
if re['@type'] == 'messageLink':
return re
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return None
async def getProxies(self):
while not self._db_initalized:
await asyncio.sleep(0.1)
return await self._send({"@type": "getProxies"})
async def getStickerSet(self, set_id: int):
re = await self._send({"@type": "getStickerSet", "set_id": set_id})
if re['@type'] == 'stickerSet':
return re
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return None
async def getTextEntities(self, text: str):
re = await self._send({"@type": "getTextEntities", "text": text})
if re['@type'] == 'textEntities':
return re
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return None
async def getUid(self) -> int:
if not hasattr(self, "_uid"):
self._uid = (await self.getMe())['id']
return self._uid
async def getUsername(self) -> str:
if not hasattr(self, "_username"):
self._username = (await self.getMe())['username']
return self._username
async def login(self, parameters, encryption_key, proxy, phone_number=None,
bot_token=None):
while True:
re = await self.receive('updateAuthorizationState')
state = re['authorization_state']
if state['@type'] == 'authorizationStateWaitTdlibParameters':
pa = {"use_message_database": True,
"system_language_code": "en",
"device_model": "Desktop",
"application_version": "1.0.0",
"enable_storage_optimizer": True,
'use_secret_chats': True}
pa.update(parameters)
if not await self.setTdlibParameters(pa):
raise ValueError("Can not change Tdlib's parameters.")
elif state['@type'] == 'authorizationStateWaitEncryptionKey':
if not await self.checkDatabaseEncryptionKey(encryption_key):
raise ValueError('Checks the database encryption key failed.') # noqa: E501
self._db_initalized = True
if proxy is not None:
if not await self.setProxy(proxy['server'], proxy['port'], proxy['type']): # noqa: E501
raise ValueError('Can not set proxy.')
elif state['@type'] == 'authorizationStateWaitPhoneNumber':
if bot_token is not None:
if not await self.checkAuthenticationBotToken(bot_token):
raise ValueError('Invalid bot token')
continue
if phone_number is None:
phone_number = input('Please input your phone number:')
if not await self.setAuthenticationPhoneNumber(phone_number):
raise ValueError('Can not set phone number.')
elif state['@type'] == 'authorizationStateWaitCode':
code = input('Please enter the authentication code you received: ') # noqa: E501
if not await self.checkAuthenticationCode(code):
raise ValueError('Incorrect code.')
elif state['@type'] == 'authorizationStateWaitPassword':
paw = getpass("Please enter your password: ")
if not await self.checkAuthenticationPassword(paw):
raise ValueError('Incorrect passwrod.')
elif state['@type'] == 'authorizationStateReady':
self._logined = True
return True
else:
raise ValueError("Unknown authorization_state", state)
async def optimizeStorage(self, size: int = -1, ttl: int = -1,
count: int = -1, immunity_delay: int = -1,
file_types: list = None,
chat_ids: List[int] = None,
exclude_chat_ids: List[int] = None,
return_deleted_file_statistics: bool = False,
chat_limit: int = None):
d = {"@type": "optimizeStorage", "size": size, "ttl": ttl,
"count": count, "immunity_delay": immunity_delay,
"return_deleted_file_statistics": return_deleted_file_statistics}
if file_types is not None:
d['file_types'] = file_types
if chat_ids is not None:
d['chat_ids'] = chat_ids
if exclude_chat_ids is not None:
d['exclude_chat_ids'] = exclude_chat_ids
if chat_limit is not None:
d['chat_limit'] = chat_limit
re = await self._send(d)
if re['@type'] == 'storageStatistics':
return re
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return None
async def parseMarkdown(self, text: str):
re = await self._send({"@type": "parseMarkdown", "text": text})
if re['@type'] == 'formattedText':
return re
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return None
async def parseTextEntities(self, text: str, parse_mode: TextParseMode):
re = await self._send({"@type": "parseTextEntities", "text": text,
"parse_mode": parse_mode})
if re['@type'] == 'formattedText':
return re
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return None
async def receive(self, type: str = None):
while True:
if len(self._rel) > 0:
if type is None:
return self._rel.pop(0)
else:
for i in self._rel:
if '@type' in i and i['@type'] == type:
self._rel.remove(i)
return i
await asyncio.sleep(0.1)
async def searchChatMessages(self, chat_id: int, query: str = None,
sender_chat_id: int = None,
sender_user_id: int = None,
from_message_id: int = None,
offset: int = None, limit: int = 20,
filter=None, message_thread_id: int = None):
d = {"@type": "searchChatMessages", "chat_id": chat_id, "limit": limit}
if query is not None:
d['query'] = query
if sender_chat_id is not None and sender_user_id is not None:
raise ValueError('Both chat and user is not supported.')
if sender_chat_id is not None:
d['sender_id'] = {'@type': "messageSenderChat",
"chat_id": sender_chat_id}
elif sender_user_id is not None:
d['sender_id'] = {'@type': "messageSenderUser",
'user_id': sender_user_id}
if from_message_id is not None:
d['from_message_id'] = from_message_id
if offset is not None:
d['offset'] = offset
if filter is not None:
d['filter'] = filter
if message_thread_id is not None:
d['message_thread_id'] = message_thread_id
re = await self._send(d)
if re['@type'] == 'messages':
return re
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return None
async def searchChats(self, query: str, limit: int = 20):
re = await self._send({"@type": "searchChats", "query": query,
"limit": limit})
if re['@type'] == 'chats':
return re
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return None
async def searchChatsOnServer(self, query: str, limit: int = 20):
re = await self._send({"@type": "searchChatsOnServer", "query": query,
"limit": limit})
if re['@type'] == 'chats':
return re
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return None
async def searchPublicChats(self, query: str):
re = await self._send({"@type": "searchPublicChats", "query": query})
if re['@type'] == 'chats':
return re
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return None
async def searchStickerSet(self, name: str):
re = await self._send({"@type": "searchStickerSet", "name": name})
if re['@type'] == 'stickerSet':
return re
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return None
async def sendMessage(self, chat_id: int, content,
message_thread_id: int = 0,
reply_to_message_id: int = 0, options=None,
reply_markup=None):
d = {"@type": "sendMessage", "chat_id": chat_id,
"input_message_content": content}
if message_thread_id != 0:
d['message_thread_id'] = message_thread_id
if reply_to_message_id != 0:
d['reply_to_message_id'] = reply_to_message_id
if options is not None:
d['options'] = options
if reply_markup is not None:
d['reply_markup'] = reply_markup
re = await self._send(d)
if re['@type'] == 'message':
return re
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return None
async def sendTextMessage(self, chat_id: int, text: str,
mode: TextParseMode = None,
disable_web_page_preview: bool = False,
clear_draft: bool = False, **kw):
mes = {'@type': 'inputMessageText', 'clear_draft': clear_draft,
'disable_web_page_preview': disable_web_page_preview}
if mode is None:
t = {"@type": "formattedText", "text": text}
e = await self.getTextEntities(text)
if e is None:
return None
t['entities'] = e['entities']
else:
mes['text'] = await self.parseTextEntities(text, mode)
if mes['text'] is None:
return None
mes['text'] = t
return await self.sendMessage(chat_id, mes, **kw)
async def setAuthenticationPhoneNumber(self, phone_number, settings=None):
sett = {"@type": "phoneNumberAuthenticationSettings",
"allow_flash_call": False,
"allow_missed_call": False,
"is_current_phone_number": False,
"allow_sms_retriever_api": False}
if settings is not None:
sett.update(settings)
re = await self._send({"@type": "setAuthenticationPhoneNumber",
"phone_number": str(phone_number),
"settings": sett})
if re['@type'] == 'ok':
return True
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return False
async def setDatabaseEncryptionKey(self,
new_encryption_key: Union[str, bytes]):
re = await self._send({"@type": "setDatabaseEncryptionKey", "new_encryption_key": new_encryption_key}) # noqa: E501
if re['@type'] == 'ok':
return True
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return False
async def setProxy(self, server, port, type):
while not self._db_initalized:
await asyncio.sleep(0.1)
proxies = await self.getProxies()
for i in proxies['proxies']:
if i['server'] == server and i['port'] == port:
ok = True
for k in type:
if k in i['type']:
if type[k] != i['type']:
ok = False
break
else:
ok = False
break
if ok:
print(i)
if not i['is_enabled']:
await self.enableProxy(i['id'])
return True
re = await self.addProxy(server, port, True, type)
if re['@type'] == 'proxy':
return True
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return False
async def setStickerSetThumbnail(self, user_id: int, name: str, thumb):
re = await self._send({"@type": "setStickerSetThumbnail",
"user_id": user_id, "name": name,
"thumbnail": thumb})
if re['@type'] == 'stickerSet':
return re
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return None
async def setTdlibParameters(self, se):
re = await self._send({"@type": "setTdlibParameters",
"parameters": se})
if re['@type'] == 'ok':
return True
else:
if re['@type'] == 'error':
print(f"{re['code']} {re['message']}")
return False