# (C) 2021 lifegpc # This file is part of rssbot. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from database import database, userStatus, RSSConfig from RSSEntry import HashEntry, HashEntries, calHash, ChatEntry from os.path import exists from readset import settings, commandline from requests import Session from traceback import format_exc from threading import Thread from typing import List from rssparser import RSSParser from html import escape from hashl import md5WithBase64 from enum import Enum, unique from rsstempdict import rssMetaInfo, rssMetaList from random import randrange from textc import textc, removeEmptyLine, decodeURI from re import search, I from rsschecker import RSSCheckerThread from rsslist import getInlineKeyBoardForRSSList, InlineKeyBoardForRSSList, getInlineKeyBoardForRSSInList, getTextContentForRSSInList, getInlineKeyBoardForRSSUnsubscribeInList, getTextContentForRSSUnsubscribeInList, getInlineKeyBoardForRSSSettingsInList from usercheck import checkUserPermissionsInChat, UserPermissionsInChatCheckResult import sys from fileEntry import FileEntries, remove from dictdeal import json2data from rssbotlib import loadRSSBotLib, AddVideoInfoResult from time import sleep, time from miraiDatabase import MiraiDatabase from mirai import Mirai from blackList import BlackList, InlineKeyBoardForBlackList, getInlineKeyBoardForBlackList, getTextContentForBlackInfo, getInlineKeyBoardForBlackInfo, getTextContentForUnbanBlackInfo, getInlineKeyBoardForUnbanBlackInfo, BlackInfo MAX_ITEM_IN_MEDIA_GROUP = 10 MAX_PHOTO_SIZE = 10485760 def getMediaInfo(m: dict, config: RSSConfig = RSSConfig()) -> str: s = '' if 'link' in m: s = f"""{s}标题:{m['title']}""" else: s = f"{s}标题:{m['title']}" if 'description' in m: s = f"{s}\n描述:{escape(m['description'])}" if 'ttl' in m and m['ttl'] is not None: s = f"{s}\n更新间隔:{m['ttl']}分" else: s = f"{s}\n更新间隔:未知" if 'chatId' in m and m['chatId'] is not None: s = f"""{s}\n群/频道ID:{m['chatId']}""" elif 'userId' in m and m['userId'] is not None: s = f"""{s}\n订阅的账号""" if '_type' in m and m['_type'] is not None: s = f"""{s}\n类型:{m['_type']}""" s = f"{s}\n设置:" s = f"{s}\n禁用预览:{config.disable_web_page_preview}" s = f"{s}\n显示RSS标题:{config.show_RSS_title}" s = f"{s}\n显示内容标题:{config.show_Content_title}" s = f"{s}\n显示内容:{config.show_content}" s = f"{s}\n发送媒体:{config.send_media}" s = f"{s}\n单独一行显示链接:{config.display_entry_link}" s += f"\n发送图片为文件:{config.send_img_as_file}" return s @unique class InlineKeyBoardCallBack(Enum): Subscribe = 0 SendPriview = 1 ModifyChatId = 2 BackUserId = 3 SettingsPage = 4 BackToNormalPage = 5 DisableWebPagePreview = 6 ShowRSSTitle = 7 ShowContentTitle = 8 ShowContent = 9 SendMedia = 10 DisplayEntryLink = 11 SendImgAsFile = 12 def getInlineKeyBoardWhenRSS(hashd: str, m: dict) -> dict: d = [] i = 0 d.append([]) d[i].append( {'text': '订阅', 'callback_data': f'0,{hashd},{InlineKeyBoardCallBack.Subscribe.value}'}) d[i].append( {'text': '发送示例消息', 'callback_data': f'0,{hashd},{InlineKeyBoardCallBack.SendPriview.value}'}) if 'chatId' in m and m['chatId'] is not None: d.append([]) i = i + 1 d[i].append( {'text': '修改群组/频道ID', 'callback_data': f'0,{hashd},{InlineKeyBoardCallBack.ModifyChatId.value}'}) if 'userId' in m and m['userId'] is not None: d[i].append( {'text': '发送至私聊', 'callback_data': f'0,{hashd},{InlineKeyBoardCallBack.BackUserId.value}'}) elif 'userId' in m and m['userId'] is not None: d.append([]) i = i + 1 d[i].append( {'text': '发送至群组/频道', 'callback_data': f'0,{hashd},{InlineKeyBoardCallBack.ModifyChatId.value}'}) d.append([]) i = i + 1 d[i].append( {'text': '设置', 'callback_data': f'0,{hashd},{InlineKeyBoardCallBack.SettingsPage.value}'}) return {'inline_keyboard': d} def getInlineKeyBoardWhenRSS2(hashd: str, config: RSSConfig) -> str: d = [] i = 0 temp = '启用预览' if config.disable_web_page_preview else '禁用预览' d.append([]) d[i].append( {'text': temp, 'callback_data': f'0,{hashd},{InlineKeyBoardCallBack.DisableWebPagePreview.value}'}) temp = '隐藏RSS标题' if config.show_RSS_title else '显示RSS标题' d[i].append( {'text': temp, 'callback_data': f'0,{hashd},{InlineKeyBoardCallBack.ShowRSSTitle.value}'}) d.append([]) i = i + 1 temp = '隐藏内容标题' if config.show_Content_title else '显示内容标题' d[i].append( {'text': temp, 'callback_data': f'0,{hashd},{InlineKeyBoardCallBack.ShowContentTitle.value}'}) temp = '隐藏内容' if config.show_content else '显示内容' d[i].append( {'text': temp, 'callback_data': f'0,{hashd},{InlineKeyBoardCallBack.ShowContent.value}'}) d.append([]) i = i + 1 temp = '禁用发送媒体' if config.send_media else '启用发送媒体' d[i].append( {'text': temp, 'callback_data': f'0,{hashd},{InlineKeyBoardCallBack.SendMedia.value}'}) temp = '禁用单独一行显示链接' if config.display_entry_link else '启用单独一行显示链接' d[i].append({'text': temp, 'callback_data': f'0,{hashd},{InlineKeyBoardCallBack.DisplayEntryLink.value}'}) d.append([]) i += 1 temp = '禁用发送图片为文件' if config.send_img_as_file else '启用发送图片为文件' d[i].append({'text': temp, 'callback_data': f'0,{hashd},{InlineKeyBoardCallBack.SendImgAsFile.value}'}) d.append([]) i += 1 d[i].append( {'text': '返回', 'callback_data': f'0,{hashd},{InlineKeyBoardCallBack.BackToNormalPage.value}'}) return {'inline_keyboard': d} class main: def __init__(self): pass def _request(self, methodName: str, HTTPMethod: str = 'get', data: dict = None, json: dict = None, files: dict = None, returnType: str = 'json', telegramBotApiServer: str = None): try: if json is not None and files is not None: data = json2data(json) json = None r = self._r.request( HTTPMethod, f'{self._telegramBotApiServer if telegramBotApiServer is None else telegramBotApiServer}/bot{self._setting.token}/{methodName}', data=data, json=json, files=files) if r.status_code not in [200, 400]: return None if returnType == 'json': return r.json() elif returnType == 'text': return r.text elif returnType == 'content': return r.content except: print(format_exc()) return None def _sendMessage(self, chatId: int, meta: dict, content: dict, config: RSSConfig, returnError: bool = False, testMessage: bool = False): with self._tempFileEntries._value_lock: return self.__sendMessage(chatId, meta, content, config, returnError, testMessage) def __sendMessage(self, chatId: int, meta: dict, content: dict, config: RSSConfig, returnError: bool = False, testMessage: bool = False): di = {} di['chat_id'] = chatId text = textc() if testMessage: text.addtotext('#测试消息') if config.show_RSS_title: text.addtotext(f"{meta['title']}") if config.show_Content_title and 'title' in content and content['title'] is not None and content['title'] != '': if 'link' in content and content['link'] is not None and content['link'] != '': if not config.display_entry_link: text += f"""{content['title']}""" else: text += f"{content['title']}" text += f"""{escape(content['link'])}""" else: text.addtotext(f"{content['title']}") elif 'link' in content and content['link'] is not None and content['link'] != '': text.addtotext( f"""{escape(content['link'])}""") if config.show_content and 'description' in content and content['description'] is not None and content['description'] != '': text.addtotext(removeEmptyLine(content['description'])) def getListCount(content: dict, key: str): if key not in content or content[key] is None: return 0 return len(content[key]) if not config.send_media or (getListCount(content, 'imgList') == 0 and getListCount(content, 'videoList') == 0): if config.disable_web_page_preview: di['disable_web_page_preview'] = True while len(text) > 0: di['text'] = text.tostr() di['parse_mode'] = 'HTML' for i in range(self._setting.maxRetryCount + 1): re = self._request('sendMessage', 'post', json=di) if re is not None and 'ok' in re and re['ok']: di['reply_to_message_id'] = re['result']['message_id'] break if i == self._setting.maxRetryCount: if returnError and re is not None and 'description' in re: return False, re['description'] elif returnError: return False, '' else: return False sleep(5) elif getListCount(content, 'imgList') == 1 and getListCount(content, 'videoList') == 0: f = True while len(text) > 0 or f: if f: di['caption'] = text.tostr(1024) else: di['text'] = text.tostr() di['parse_mode'] = 'HTML' for i in range(self._setting.maxRetryCount + 1): if f: if not self._setting.downloadMediaFile: di['photo'] = content['imgList'][0] re = self._request('sendPhoto', 'post', json=di) else: fileEntry = self._tempFileEntries.add( content['imgList'][0]) if not fileEntry.ok: continue should_use_file = False if fileEntry._fileSize < MAX_PHOTO_SIZE and not config.send_img_as_file else True if self._setting.sendFileURLScheme: if not should_use_file: di['photo'] = fileEntry._localURI re = self._request('sendPhoto', 'post', json=di) else: di['document'] = fileEntry._localURI re = self._request('sendDocument', 'post', json=di) else: fileEntry.open() if not should_use_file: re = self._request('sendPhoto', 'post', json=di, files={ 'photo': (fileEntry._fullfn, fileEntry._f)}) else: re = self._request('sendDocument', 'post', json=di, files={ 'document': (fileEntry._fullfn, fileEntry._f)}) else: re = self._request('sendMessage', 'post', json=di) if re is not None and 'ok' in re and re['ok']: di['reply_to_message_id'] = re['result']['message_id'] if f: del di['caption'] if 'photo' in di: del di['photo'] if 'document' in di: del di['document'] f = False if config.disable_web_page_preview: di['disable_web_page_preview'] = True break if i == self._setting.maxRetryCount: if returnError and re is not None and 'description' in re: return False, re['description'] elif returnError: return False, '' else: return False sleep(5) elif getListCount(content, 'imgList') == 0 and getListCount(content, 'videoList') == 1: f = True while len(text) > 0 or f: if f: di['caption'] = text.tostr(1024) else: di['text'] = text.tostr() di['parse_mode'] = 'HTML' for i in range(self._setting.maxRetryCount + 1): if f: if self._setting.downloadMediaFile and not self._setting.sendFileURLScheme: di2 = {} if not self._setting.downloadMediaFile: di['video'] = content['videoList'][0]['src'] else: fileEntry = self._tempFileEntries.add( content['videoList'][0]['src']) if not fileEntry.ok: continue if self._setting.sendFileURLScheme: di['video'] = fileEntry._localURI else: fileEntry.open() di2['video'] = ( fileEntry._fullfn, fileEntry._f) if 'poster' in content['videoList'][0] and content['videoList'][0]['poster'] is not None and content['videoList'][0]['poster'] != '': if not self._setting.downloadMediaFile: di['thumb'] = content['videoList'][0]['poster'] else: fileEntry = self._tempFileEntries.add( content['videoList'][0]['poster']) if not fileEntry.ok: continue if self._setting.sendFileURLScheme: di['thumb'] = fileEntry._localURI else: fileEntry.open() di2['thumb'] = ( fileEntry._fullfn, fileEntry._f) di['supports_streaming'] = True isOk = True if self._rssbotLib is not None: loc = self._tempFileEntries.get(content['videoList'][0]['src'])._abspath if self._setting.downloadMediaFile and self._tempFileEntries.get( content['videoList'][0]['src']) is not None else None addre = self._rssbotLib.addVideoInfo( content['videoList'][0]['src'], di, loc) if addre == AddVideoInfoResult.IsHLS: isOk = False f = False if 'video' in di: del di['video'] if 'thumb' in di: del di['thumb'] del di['supports_streaming'] di['text'] = di['caption'] del di['caption'] if config.disable_web_page_preview: di['disable_web_page_preview'] = True re = self._request( 'sendMessage', 'post', json=di) if isOk: if not self._setting.downloadMediaFile or self._setting.sendFileURLScheme: re = self._request( 'sendVideo', 'post', json=di) else: re = self._request( 'sendVideo', 'post', json=di, files=di2) else: re = self._request('sendMessage', 'post', json=di) if re is not None and 'ok' in re and re['ok']: di['reply_to_message_id'] = re['result']['message_id'] if f: if 'video' in di: del di['video'] if 'thumb' in di: del di['thumb'] if 'caption' in di: del di['caption'] del di['supports_streaming'] if 'duration' in di: del di['duration'] if 'width' in di: del di['width'] if 'height' in di: del di['height'] if config.disable_web_page_preview: di['disable_web_page_preview'] = True f = False break if i == self._setting.maxRetryCount: if returnError and re is not None and 'description' in re: return False, re['description'] elif returnError: return False, '' else: return False sleep(5) else: ind = 0 if self._setting.downloadMediaFile and not self._setting.sendFileURLScheme: ind2 = 0 di3 = {} di['media'] = [] for i in content['imgList']: if ind % MAX_ITEM_IN_MEDIA_GROUP == 0 and ind != 0: for _ in range(self._setting.maxRetryCount + 1): if not self._setting.downloadMediaFile or self._setting.sendFileURLScheme: re = self._request('sendMediaGroup', 'post', json=di) if re is not None and 'ok' in re and re['ok']: di['reply_to_message_id'] = re['result'][0]['message_id'] di['media'] = [] break else: re = self._request( 'sendMediaGroup', 'post', json=di, files=di3) if re is not None and 'ok' in re and re['ok']: di['reply_to_message_id'] = re['result'][0]['message_id'] di['media'] = [] di3 = {} break sleep(5) di2 = {'type': 'photo'} if not self._setting.downloadMediaFile: di2['media'] = i else: fileEntry = self._tempFileEntries.add(i) if not fileEntry.ok: return None should_use_file = False if fileEntry._fileSize < MAX_PHOTO_SIZE and not config.send_img_as_file else True if should_use_file: di2['type'] = 'document' if self._setting.sendFileURLScheme: di2['media'] = fileEntry._localURI else: fileEntry.open() di2['media'] = f'attach://file{ind2}' di3[f'file{ind2}'] = (fileEntry._fullfn, fileEntry._f) ind2 = ind2 + 1 if ind % MAX_ITEM_IN_MEDIA_GROUP == 0: di2['caption'] = text.tostr(1024) di2['parse_mode'] = 'HTML' di['media'].append(di2) ind = ind + 1 for i in content['videoList']: if ind % MAX_ITEM_IN_MEDIA_GROUP == 0 and ind != 0: for _ in range(self._setting.maxRetryCount + 1): if not self._setting.downloadMediaFile or self._setting.sendFileURLScheme: re = self._request('sendMediaGroup', 'post', json=di) if re is not None and 'ok' in re and re['ok']: di['reply_to_message_id'] = re['result']['message_id'] di['media'] = [] break else: re = self._request( 'sendMediaGroup', 'post', json=di, files=di3) if re is not None and 'ok' in re and re['ok']: di['reply_to_message_id'] = re['result']['message_id'] di['media'] = [] di3 = {} break sleep(5) di2 = {'type': 'video', 'supports_streaming': True} if not self._setting.downloadMediaFile: di2['media'] = i['src'] else: fileEntry = self._tempFileEntries.add(i['src']) if not fileEntry.ok: return None if self._setting.sendFileURLScheme: di2['media'] = fileEntry._localURI else: fileEntry.open() di2['media'] = f'attach://file{ind2}' di3[f'file{ind2}'] = (fileEntry._fullfn, fileEntry._f) ind2 = ind2 + 1 if 'poster' in i and i['poster'] is not None and i['poster'] != '': if not self._setting.downloadMediaFile: di2['thumb'] = i['poster'] else: fileEntry = self._tempFileEntries.add(i['poster']) if not fileEntry.ok: return None if self._setting.sendFileURLScheme: di2['thumb'] = fileEntry._localURI else: fileEntry.open() di2['thumb'] = f'attach://file{ind2}' di3[f'file{ind2}'] = ( fileEntry._fullfn, fileEntry._f) ind2 = ind2 + 1 if ind % MAX_ITEM_IN_MEDIA_GROUP == 0: di2['caption'] = text.tostr(1024) di2['parse_mode'] = 'HTML' if self._rssbotLib is not None: loc = self._tempFileEntries.get(i['src'])._abspath if self._setting.downloadMediaFile and self._tempFileEntries.get( i['src']) is not None else None addre = self._rssbotLib.addVideoInfo(i['src'], di2, loc) if addre == AddVideoInfoResult.IsHLS: continue di['media'].append(di2) ind = ind + 1 if len(di['media']) > 1: for _ in range(self._setting.maxRetryCount + 1): if not self._setting.downloadMediaFile or self._setting.sendFileURLScheme: re = self._request('sendMediaGroup', 'post', json=di) else: re = self._request( 'sendMediaGroup', 'post', json=di, files=di3) if re is not None and 'ok' in re and re['ok']: break sleep(5) if len(di['media']) == 1: if 'caption' in di['media'][0]: di['caption'] = di['media'][0]['caption'] di['parse_mode'] = 'HTML' if di['media'][0]['type'] == 'photo': if not self._setting.downloadMediaFile or self._setting.sendFileURLScheme: di['photo'] = di['media'][0]['media'] else: mekey = di['media'][0]['media'][9:] di3['photo'] = di3[mekey] del di3[mekey] del di['media'] for _ in range(self._setting.maxRetryCount + 1): if not self._setting.downloadMediaFile or self._setting.sendFileURLScheme: re = self._request('sendPhoto', 'post', json=di) else: re = self._request( 'sendPhoto', 'post', json=di, files=di3) if re is not None and 'ok' in re and re['ok']: break sleep(5) elif di['media'][0]['type'] == 'video': if not self._setting.downloadMediaFile or self._setting.sendFileURLScheme: di['video'] = di['media'][0]['media'] if 'thumb' in di['media'][0]: di['thumb'] = di['media'][0]['thumb'] else: mekey = di['media'][0]['media'][9:] di3['video'] = di3[mekey] del di3[mekey] if 'thumb' in di['media'][0]: mekey = di['media'][0]['thumb'][9:] di3['thumb'] = di3[mekey] del di3[mekey] if 'duration' in di['media'][0]: di['duration'] = di['media'][0]['duration'] if 'width' in di['media'][0]: di['width'] = di['media'][0]['width'] if 'height' in di['media'][0]: di['height'] = di['media'][0]['height'] di['supports_streaming'] = di['media'][0]['supports_streaming'] del di['media'] for _ in range(self._setting.maxRetryCount + 1): if not self._setting.downloadMediaFile or self._setting.sendFileURLScheme: re = self._request('sendVideo', 'post', json=di) else: re = self._request( 'sendVideo', 'post', json=di, files=di3) if re is not None and 'ok' in re and re['ok']: break sleep(5) elif di['media'][0]['type'] == 'document': if not self._setting.downloadMediaFile or self._setting.sendFileURLScheme: di['document'] = di['media'][0]['media'] else: mekey = di['media'][0]['media'][9:] di3['document'] = di3[mekey] del di3[mekey] del di['media'] for _ in range(self._setting.maxRetryCount + 1): if not self._setting.downloadMediaFile or self._setting.sendFileURLScheme: re = self._request('sendDocument', 'post', json=di) else: re = self._request( 'sendDocument', 'post', json=di, files=di3) if re is not None and 'ok' in re and re['ok']: break sleep(5) if len(text) > 0: di = {} di['chat_id'] = chatId if config.disable_web_page_preview: di['disable_web_page_preview'] = True if re is not None and 'ok' in re and re['ok']: if isinstance(re['result'], list): di['reply_to_message_id'] = re['result'][0]['message_id'] else: di['reply_to_message_id'] = re['result']['message_id'] while len(text) > 0: di['text'] = text.tostr() di['parse_mode'] = 'HTML' for i in range(self._setting.maxRetryCount + 1): re = self._request('sendMessage', 'post', json=di) if re is not None and 'ok' in re and re['ok']: di['reply_to_message_id'] = re['result']['message_id'] break if i == self._setting.maxRetryCount: if returnError and re is not None and 'description' in re: return False, re['description'] elif returnError: return False, '' else: return False sleep(5) if re is not None and 'ok' in re and re['ok']: if returnError: return True, '' return True if returnError and re is not None and 'description' in re: return False, re['description'] elif returnError: return False, '' return False def _updateLoop(self): d = {'allowed_updates': ['message', 'edited_message', 'channel_post', 'edited_channel_post', 'callback_query']} if self._upi is not None: d['offset'] = self._upi ud = self._request('getUpdates', 'post', json=d) if ud is not None and 'ok' in ud and ud['ok']: for i in ud['result']: for key in ['message', 'edited_message', 'channel_post', 'edited_channel_post']: if key in i: m = messageHandle(self, i[key]) m.start() if 'callback_query' in i: m = callbackQueryHandle(self, i['callback_query']) m.start() self._upi = i['update_id'] + 1 def start(self): self._commandLine = commandline() if len(sys.argv) > 1: self._commandLine.parse(sys.argv[1:]) if not exists('settings.txt'): print('找不到settings.txt') return -1 self._setting = settings(self, self._commandLine._config) if self._setting.token is None: print('没有机器人token') return -1 self._telegramBotApiServer = self._setting.telegramBotApiServer self._db = database(self, self._setting.databaseLocation) if self._setting.miraiApiHTTPServer is not None: if self._setting.miraiApiHTTPAuthKey is None: print('未设置AuthKey。') return -1 if self._setting.miraiApiQQ is None: print('未设置QQ号。') return -1 self._mriaidb = MiraiDatabase(self, self._setting.databaseLocation) self._mriai = Mirai(self) self._r = Session() if self._telegramBotApiServer != 'https://api.telegram.org': self._request("logOut", "post", telegramBotApiServer="https://api.telegram.org") remove('Temp') self._tempFileEntries = FileEntries(self) self._me = self._request('getMe') self._rssMetaList = rssMetaList() print(self._me) if self._me is None or 'ok' not in self._me or not self._me['ok']: print('无法读取机器人信息') self._me = self._me['result'] self._rssbotLib = loadRSSBotLib(self) self._blackList = BlackList(self) self._blackList.checkRSSList() self._upi = None self._updateThread = updateThread(self) self._updateThread.start() self._rssCheckerThread = RSSCheckerThread(self) self._rssCheckerThread.start() class updateThread(Thread): def __init__(self, main: main): Thread.__init__(self) self._main = main def run(self): while True: self._main._updateLoop() class messageHandle(Thread): def __init__(self, main: main, data: dict): Thread.__init__(self) self._main = main self._data = data def __getBotCommand(self) -> str: for i in self._data['entities']: if i['type'] == 'bot_command': v = self._data['text'][i['offset']: i['offset'] + i['length']] founded = v.find('@') if founded == -1: return v return v[0:founded] return None def __getChatId(self) -> int: if 'chat' in self._data: return self._data['chat']['id'] return None def __getChatType(self) -> str: if 'chat' in self._data: return self._data['chat']['type'] return None def __getFromUserId(self) -> int: if 'from' in self._data: return self._data['from']['id'] return None def _getCommandlinePara(self) -> List[str]: s = self._data['text'] if 'entities' in self._data: for i in self._data['entities']: if i['type'] == 'bot_command': s = s[:i['offset']] + s[i['offset']+i['length']:] break l = s.split(' ') r = [] t = '' quote = False for i in l: if i != '': if quote: if i[-1] == '"': quote = False t = t + ' ' + i[:-1] r.append(t) t = '' else: t = t + ' ' + i else: if i[0] == '"': if i[-1] == '"': r.append(i[1:-1]) else: quote = True t = i[1:] else: r.append(i) if t != '': r.append(t) return r def run(self): print(self._data) for key in ["new_chat_members", "left_chat_member", "new_chat_title", "new_chat_photo", "delete_chat_photo", "pinned_message"]: if key in self._data: return if self._data['chat']['type'] == 'channel': return self._messageId = self._data['message_id'] self._chatId = self.__getChatId() if self._chatId is None: print('未知的chat id') return -1 self._fromUserId = self.__getFromUserId() if self._main._blackList.isInBlackList(self._chatId) and not self._main._setting.botOwnerList.isOwner(self._fromUserId): c = self.__getChatType() if c == 'private': c = '您' elif c in ['supergroup', 'group']: c = '该群组' elif c == 'channel': c = '该频道' di = {'chat_id': self._chatId, 'text': f'{c}已被封禁。'} self._main._request("sendMessage", 'post', json=di) return self._botCommand = None if 'text' in self._data: if 'entities' in self._data: self._botCommand = self.__getBotCommand() if self._fromUserId is not None: di = {'chat_id': self._chatId} if self.__getChatType() in ['supergroup', 'group'] and self._fromUserId is not None: di['reply_to_message_id'] = self._messageId if self._main._blackList.isInBlackList(self._fromUserId): di['text'] = '您已被封禁。' self._main._request("sendMessage", 'post', json=di) return self._userStatus, self._hashd = self._main._db.getUserStatus( self._fromUserId) if self._userStatus == userStatus.normalStatus: pass elif self._botCommand is not None and self._botCommand == '/cancle': if self._userStatus == userStatus.needInputChatId: di['text'] = '已取消输入群/频道ID。' self._main._db.setUserStatus( self._fromUserId, userStatus.normalStatus) self._main._request('sendMessage', 'post', json=di) return elif self._userStatus in [userStatus.needInputChatId]: metainfo = self._main._rssMetaList.getRSSMeta(self._hashd) if metainfo is None: self._main._db.setUserStatus( self._fromUserId, userStatus.normalStatus) di['text'] = '已过期。' self._main._request('sendMessage', 'post', json=di) return elif self._userStatus == userStatus.needInputChatId: para = self._getCommandlinePara() chatId = None for i in para: if search(r'^[\+-]?[0-9]+$', i) is not None: chatId = int(i) break if chatId is None: di['text'] = '找不到ID。' self._main._request('sendMessage', 'post', json=di) return di['text'] = '正在获取群/频道信息……' re = self._main._request('sendMessage', 'post', json=di) if re is not None and 'ok' in re and re['ok']: re = re['result'] di = {'chat_id': self._chatId, 'message_id': re['message_id']} re2 = self._main._request( 'getChat', 'post', {'chat_id': chatId}) if re2 is not None and 'ok' in re2 and re2['ok']: re2 = re2['result'] if re2['type'] == "private": di['text'] = '该ID是私聊。' self._main._request( 'editMessageText', 'post', json=di) return di['text'] = '正在获取群/频道管理员列表……' self._main._request( 'editMessageText', 'post', json=di) re3 = self._main._request( 'getChatAdministrators', 'post', {'chat_id': chatId}) if re3 is not None and 'ok' in re3 and re3['ok']: re3 = re3['result'] chatM = None for chatMember in re3: if chatMember['user']['id'] != self._fromUserId: continue if chatMember['status'] not in ['creator', 'administrator']: continue if re2['type'] == 'channel' and chatMember['status'] == 'administrator' and ('can_post_messages' not in chatMember or not chatMember['can_post_messages']): continue if re2['type'] == 'channel' and chatMember['status'] == 'administrator' and ('can_edit_messages' not in chatMember or not chatMember['can_edit_messages']): continue chatM = chatMember if chatM is None: di['text'] = '你没有权限操作。' self._main._request( 'editMessageText', 'post', json=di) return di['text'] = '正在确认机器人的权限……' self._main._request( 'editMessageText', 'post', json=di) re4 = self._main._request("getChatMember", "post", { "chat_id": chatId, "user_id": self._main._me['id']}) if re4 is not None and 'ok' in re4 and re4['ok']: re4 = re4['result'] if re2['type'] == 'channel' and (re4['status'] not in ['creator', 'administrator'] or not re4['can_post_messages'] or not re4['can_edit_messages']): di['text'] = '机器人在频道内缺少必要的权限' self._main._request( 'editMessageText', 'post', json=di) return if re2['type'] in ["group", "supergroup"]: if re4['status'] in ['creator', 'administrator']: pass elif 'permissions' in re2 and re2['permissions'] is not None and (not re2['permissions']['can_send_messages'] or not re2['permissions']['can_send_media_messages'] or not re2['permissions']['can_send_other_messages'] or not re2['permissions']['can_add_web_page_previews']): di['text'] = '机器人在群组内缺少必要的权限' self._main._request( 'editMessageText', 'post', json=di) return elif re4['status'] in ['left', 'kicked']: di['text'] = '机器人不在群组内' if re4['status'] == 'lefy' else '机器人已被踢' self._main._request( 'editMessageText', 'post', json=di) return elif re4['status'] == 'restricted' and (not re4['can_send_messages'] or not re4['can_send_media_messages'] or not re4['can_send_other_messages'] or not re4['can_add_web_page_previews']): di['text'] = '机器人在群组内缺少必要的权限' self._main._request( 'editMessageText', 'post', json=di) return else: di['text'] = '获取机器人权限失败!' self._main._request( 'editMessageText', 'post', json=di) return else: di['text'] = '获取群/频道信息失败!' self._main._request( 'editMessageText', 'post', json=di) return metainfo.meta['chatId'] = chatId metainfo.flushTime() di['text'] = '修改完成!' self._main._request('editMessageText', 'post', json=di) di = {'chat_id': metainfo.chatId, 'message_id': metainfo.messageId} di['text'] = getMediaInfo( metainfo.meta, metainfo.config) di['parse_mode'] = 'HTML' di['disable_web_page_preview'] = True di['reply_markup'] = getInlineKeyBoardWhenRSS( self._hashd, metainfo.meta) self._main._request('editMessageText', 'post', json=di) self._main._db.setUserStatus( self._fromUserId, userStatus.normalStatus) return if self._botCommand is None and self._data['chat']['type'] in ['group', 'supergroup']: return if self._botCommand is None or self._botCommand not in ['/help', '/rss', '/rsslist', '/ban', '/banlist', '/unban']: self._botCommand = '/help' di = {'chat_id': self._chatId} if self.__getChatType() in ['supergroup', 'group'] and self._fromUserId is not None: di['reply_to_message_id'] = self._messageId if self._botCommand == '/help': di['text'] = '''/help 显示帮助 /rss url 订阅RSS /rsslist [chatId] 获取RSS订阅列表 /ban 封禁某用户 /banlist 查询被封禁列表 /unban 取消封禁某用户''' elif self._botCommand == '/rss': self._botCommandPara = self._getCommandlinePara() self._uri = None for i in self._botCommandPara: if i.find('://') > -1: self._uri = decodeURI(i) break if self._data['chat']['type'] != "private" and checkUserPermissionsInChat(self._main, self._data['chat']['id'], self._fromUserId) != UserPermissionsInChatCheckResult.OK: di['text'] = '你没有权限操作。' self._uri = None elif self._uri is None: di['text'] = '没有找到URL' else: di['text'] = '正在获取信息中……' elif self._botCommand == '/rsslist': self._needCheckUser = False self._botCommandPara = self._getCommandlinePara() targetChatId = self._chatId for i in self._botCommandPara: if search(r'^[\+-]?[0-9]+$', i) is not None: targetChatId = int(i) if targetChatId == self._chatId and self._data['chat']['type'] == 'private': try: rssList = self._main._db.getRSSListByChatId(self._chatId) di['text'] = '列表如下:' di['reply_markup'] = getInlineKeyBoardForRSSList( self._chatId, rssList) except: di['text'] = '获取列表失败。' else: di['text'] = '正在确认操作者权限……' self._needCheckUser = True elif self._botCommand == '/banlist': if self._fromUserId is None or not self._main._setting.botOwnerList.isOwner(self._fromUserId): di['text'] = '❌你没有权限操作,请与Bot主人进行PY交易以获得权限。' else: di['text'] = '列表如下:' di['reply_markup'] = getInlineKeyBoardForBlackList(self._main._blackList.getBlackList()) elif self._botCommand in ['/ban', '/unban']: isban = self._botCommand == '/ban' words = '封禁' if isban else '取消封禁' if self._fromUserId is None or not self._main._setting.botOwnerList.isOwner(self._fromUserId): di['text'] = '❌你没有权限操作,请与Bot主人进行PY交易以获得权限。' else: ban_uid: int = None title: str = None if 'reply_to_message' in self._data and self._data['reply_to_message'] is not None: try: ban_uid = self._data['reply_to_message']['from']['id'] if self._data['reply_to_message']['from']['is_bot']: di['text'] = f'尝试{words}Bot,你的请求被滥权了。' self._main._request('sendMessage', 'post', json=di) return title = self._data['reply_to_message']['from']['first_name'] if self._data['reply_to_message']['from']['last_name'] is not None: title += f" {self._data['reply_to_message']['from']['last_name']}" if self._data['reply_to_message']['from']['username'] is not None: title += f" ({self._data['reply_to_message']['from']['username']})" except Exception: pass self._botCommandPara = self._getCommandlinePara() first_uid = False if ban_uid is None: first_uid = True if self._botCommandPara[0] in ['chat', 'channel', 'group']: try: ban_uid = self._data['chat']['id'] title = self._data['chat']['title'] except Exception: pass else: try: ban_uid = int(self._botCommandPara[0]) except Exception: pass if ban_uid is None: di['text'] = f'未找到要{words}的用户。' elif self._main._setting.botOwnerList.isOwner(ban_uid): di['text'] = f'尝试{words}Bot主人,你的请求被滥权了。' else: bl = self._main._blackList.getBlackList() ind = bl.find(ban_uid) if ind > -1: if isban: di['text'] = '该用户已被封禁。' self._main._request('sendMessage', 'post', json=di) return if bl[ind].from_config: di['text'] = '无法取消封禁来自配置文件的用户,请修改配置文件后重启bot。' self._main._request('sendMessage', 'post', json=di) return re = self._main._blackList.unban(bl[ind].uid) if title is None or title == '': title = str(ban_uid) link = '' if ban_uid < 0 else f'tg://user?id={ban_uid}' if re: di['text'] = f'取消封禁{title}成功!' else: di['text'] = f'取消封禁{title}失败!' di['parse_mode'] = 'HTML' else: if not isban: di['text'] = '该用户未被封禁' self._main._request('sendMessage', 'post', json=di) return reason = '' if first_uid and len(self._botCommandPara) > 1: reason = self._botCommandPara[1] elif not first_uid and len(self._botCommandPara) > 0: reason = self._botCommandPara[0] info = BlackInfo(ban_uid, self._fromUserId, int(time()), reason, title) re = self._main._blackList.ban(info) if title is None or title == '': title = str(ban_uid) link = '' if ban_uid < 0 else f'tg://user?id={ban_uid}' if re: di['text'] = f'封禁{title}成功!封禁理由:{reason}' else: di['text'] = f'封禁{title}失败!' di['parse_mode'] = 'HTML' re = self._main._request('sendMessage', 'post', json=di) if self._botCommand == '/rss' and self._uri is not None and re is not None and 'ok' in re and re['ok']: re = re['result'] chatId = re['chat']['id'] messageId = re['message_id'] di = {'chat_id': chatId, 'message_id': messageId} checked = False try: p = RSSParser() p.parse(self._uri) checked = p.check() except: pass if not checked: di['text'] = '获取信息失败!' else: media = p.m media['url'] = self._uri media['ttl'] = p.ttl media['userId'] = None if self._fromUserId: media['userId'] = self._fromUserId if self._data['chat']['type'] != "private" and checkUserPermissionsInChat(self._main, chatId, self._fromUserId) == UserPermissionsInChatCheckResult.OK: media['chatId'] = chatId self._hash = md5WithBase64( f'{self._uri},{self._messageId},{self._chatId}') di['text'] = getMediaInfo(media) di['parse_mode'] = 'HTML' di['disable_web_page_preview'] = True di['reply_markup'] = getInlineKeyBoardWhenRSS( self._hash, media) self._main._rssMetaList.addRSSMeta(rssMetaInfo( re['message_id'], chatId, media, p.itemList, self._hash)) self._main._request('editMessageText', 'post', json=di) if self._botCommand == '/rsslist' and self._needCheckUser and re is not None and 'ok' in re and re['ok']: messageInfo = re['result'] messageChatId = messageInfo['chat']['id'] messageId = messageInfo['message_id'] checkResult = checkUserPermissionsInChat( self._main, targetChatId, self._fromUserId) di = {'chat_id': messageChatId, 'message_id': messageId} if checkResult == UserPermissionsInChatCheckResult.OK: rssList = self._main._db.getRSSListByChatId(targetChatId) di['text'] = '列表如下:' di['reply_markup'] = getInlineKeyBoardForRSSList( targetChatId, rssList) elif checkResult == UserPermissionsInChatCheckResult.GetChatInfoError: di['text'] = '获取群/频道信息失败。' elif checkResult == UserPermissionsInChatCheckResult.PrivateChat: di['text'] = '该chat ID为私聊。' elif checkResult == UserPermissionsInChatCheckResult.GetChatAdministratorsError: di['text'] = '获取群/频道管理员列表失败。' elif checkResult == UserPermissionsInChatCheckResult.NoPermissions: di['text'] = '您没有权限进行操作。' self._main._request('editMessageText', 'post', json=di) class callbackQueryHandle(Thread): def __init__(self, main: main, data: dict): Thread.__init__(self) self._main = main self._data = data def answer(self, text: str = ''): di = {} di['callback_query_id'] = self._callbackQueryId di['text'] = text self._main._request('answerCallbackQuery', 'post', json=di) def run(self): self._callbackQueryId = self._data['id'] self._fromUserId = self._data['from']['id'] if self._main._blackList.isInBlackList(self._fromUserId): self.answer('您已被封禁。') return l = self._data['data'].split(',') self._inputList = l try: self._loc = int(l[0]) if self._loc == 0: if len(l) < 3: self.answer('错误的按钮数据。') return self._hashd = l[1] self._command = int(l[2]) except: self.answer('错误的按钮数据。') return if self._loc == 0: if 'message' not in self._data: self.answer('找不到信息。') return if self._data['message']['chat']['type'] != 'private': if checkUserPermissionsInChat(self._main, self._data['message']['chat']['id'], self._fromUserId) != UserPermissionsInChatCheckResult.OK: self.answer('您没有权限操作') return try: self._inlineKeyBoardCommand = InlineKeyBoardCallBack( self._command) except: self.answer('未知的按钮。') return self._rssMeta = self._main._rssMetaList.getRSSMeta(self._hashd) if self._rssMeta is None: self.answer('找不到数据。可能已经超时。') return self._rssMeta.flushTime() self._userId = None if 'userId' in self._rssMeta.meta and self._rssMeta.meta['userId'] is not None: self._userId = self._rssMeta.meta['userId'] if self._userId is not None and self._data['from']['id'] != self._userId: self.answer('你没有权限操作。') if self._inlineKeyBoardCommand == InlineKeyBoardCallBack.Subscribe: title = self._rssMeta.meta['title'] url = self._rssMeta.meta['url'] chatId = None if 'chatId' in self._rssMeta.meta and self._rssMeta.meta['chatId'] is not None: chatId = self._rssMeta.meta['chatId'] elif self._userId is not None: chatId = self._userId if chatId is None: self.answer('缺少发送的位置') return config = self._rssMeta.config ttl = self._rssMeta.meta['ttl'] if 'ttl' in self._rssMeta.meta else None hashEntries = HashEntries(self._main._setting.maxCount) tempList = self._rssMeta.itemList.copy() tempList.reverse() for v in tempList[-self._main._setting.maxCount:]: hashEntries.add(calHash(None, url, v)) suc = self._main._db.addRSSList( title, url, chatId, config, ttl, hashEntries) if suc: self.answer('订阅成功!') else: self.answer('订阅失败!') di = {'chat_id': self._rssMeta.chatId, 'message_id': self._rssMeta.messageId} di['text'] = getMediaInfo( self._rssMeta.meta, self._rssMeta.config) di['parse_mode'] = 'HTML' di['disable_web_page_preview'] = True self._main._request("editMessageText", "post", json=di) return elif self._inlineKeyBoardCommand == InlineKeyBoardCallBack.SendPriview: chatId = None if 'chatId' in self._rssMeta.meta and self._rssMeta.meta['chatId'] is not None: chatId = self._rssMeta.meta['chatId'] elif self._userId is not None: chatId = self._userId if chatId is None: self.answer('找不到可以发送的聊天。') return if len(self._rssMeta.itemList) == 0: self.answer('列表为空。') return ran = randrange(0, len(self._rssMeta.itemList)) suc, mes = self._main._sendMessage( chatId, self._rssMeta.meta, self._rssMeta.itemList[ran], self._rssMeta.config, True, True) if suc: self.answer(f'第{ran}条发送成功!') else: print(mes) self.answer(f'第{ran}条发送失败!{mes}') self._main._request("sendMessage", "post", { "chat_id": chatId, "text": f'第{ran}条发送失败!{mes}'}) return elif self._userId is not None and self._inlineKeyBoardCommand == InlineKeyBoardCallBack.ModifyChatId: self._main._db.setUserStatus( self._userId, userStatus.needInputChatId, self._hashd) di = {} if 'message' in self._data and self._data['message'] is not None: di['chat_id'] = self._data['message']['chat']['id'] else: di['chat_id'] = self._data['from']['id'] di["text"] = "请输入群/频道的ID(使用 /cancle 可以取消):" self._main._request("sendMessage", "post", json=di) self.answer() return elif self._userId is not None and self._inlineKeyBoardCommand == InlineKeyBoardCallBack.BackUserId: self._rssMeta.meta['chatId'] = None di = {'chat_id': self._rssMeta.chatId, 'message_id': self._rssMeta.messageId} di['text'] = getMediaInfo( self._rssMeta.meta, self._rssMeta.config) di['parse_mode'] = 'HTML' di['disable_web_page_preview'] = True di['reply_markup'] = getInlineKeyBoardWhenRSS( self._hashd, self._rssMeta.meta) self._main._request("editMessageText", "post", json=di) self.answer() return elif self._inlineKeyBoardCommand == InlineKeyBoardCallBack.SettingsPage: di = {'chat_id': self._rssMeta.chatId, 'message_id': self._rssMeta.messageId} di['text'] = getMediaInfo( self._rssMeta.meta, self._rssMeta.config) di['parse_mode'] = 'HTML' di['disable_web_page_preview'] = True di['reply_markup'] = getInlineKeyBoardWhenRSS2( self._hashd, self._rssMeta.config) self._main._request("editMessageText", "post", json=di) self.answer() return elif self._inlineKeyBoardCommand == InlineKeyBoardCallBack.BackToNormalPage: di = {'chat_id': self._rssMeta.chatId, 'message_id': self._rssMeta.messageId} di['text'] = getMediaInfo( self._rssMeta.meta, self._rssMeta.config) di['parse_mode'] = 'HTML' di['disable_web_page_preview'] = True di['reply_markup'] = getInlineKeyBoardWhenRSS( self._hashd, self._rssMeta.meta) self._main._request("editMessageText", "post", json=di) self.answer() return elif self._inlineKeyBoardCommand in [InlineKeyBoardCallBack.DisableWebPagePreview, InlineKeyBoardCallBack.ShowRSSTitle, InlineKeyBoardCallBack.ShowContentTitle, InlineKeyBoardCallBack.ShowContent, InlineKeyBoardCallBack.SendMedia, InlineKeyBoardCallBack.DisplayEntryLink, InlineKeyBoardCallBack.SendImgAsFile]: if self._inlineKeyBoardCommand == InlineKeyBoardCallBack.DisableWebPagePreview: self._rssMeta.config.disable_web_page_preview = not self._rssMeta.config.disable_web_page_preview elif self._inlineKeyBoardCommand == InlineKeyBoardCallBack.ShowRSSTitle: self._rssMeta.config.show_RSS_title = not self._rssMeta.config.show_RSS_title elif self._inlineKeyBoardCommand == InlineKeyBoardCallBack.ShowContentTitle: self._rssMeta.config.show_Content_title = not self._rssMeta.config.show_Content_title elif self._inlineKeyBoardCommand == InlineKeyBoardCallBack.ShowContent: self._rssMeta.config.show_content = not self._rssMeta.config.show_content elif self._inlineKeyBoardCommand == InlineKeyBoardCallBack.SendMedia: self._rssMeta.config.send_media = not self._rssMeta.config.send_media elif self._inlineKeyBoardCommand == InlineKeyBoardCallBack.DisplayEntryLink: self._rssMeta.config.display_entry_link = not self._rssMeta.config.display_entry_link elif self._inlineKeyBoardCommand == InlineKeyBoardCallBack.SendImgAsFile: self._rssMeta.config.send_img_as_file = not self._rssMeta.config.send_img_as_file di = {'chat_id': self._rssMeta.chatId, 'message_id': self._rssMeta.messageId} di['text'] = getMediaInfo( self._rssMeta.meta, self._rssMeta.config) di['parse_mode'] = 'HTML' di['disable_web_page_preview'] = True di['reply_markup'] = getInlineKeyBoardWhenRSS2( self._hashd, self._rssMeta.config) self._main._request("editMessageText", "post", json=di) self.answer() return elif self._loc == 1: chatId = int(self._inputList[1]) self._inlineKeyBoardForRSSListCommand = InlineKeyBoardForRSSList( int(self._inputList[2])) if 'message' not in self._data: self.answer('找不到信息。') return if self._data['message']['chat']['type'] != 'private': if checkUserPermissionsInChat(self._main, chatId, self._fromUserId) != UserPermissionsInChatCheckResult.OK: self.answer('您没有权限操作') return if self._inlineKeyBoardForRSSListCommand == InlineKeyBoardForRSSList.FirstPage: di = {'chat_id': self._data['message']['chat']['id'], 'message_id': self._data['message']['message_id']} di['text'] = '列表如下:' rssList = self._main._db.getRSSListByChatId(chatId) di['reply_markup'] = getInlineKeyBoardForRSSList( chatId, rssList) self._main._request("editMessageText", "post", json=di) self.answer() return elif self._inlineKeyBoardForRSSListCommand == InlineKeyBoardForRSSList.LastPage: di = {'chat_id': self._data['message']['chat']['id'], 'message_id': self._data['message']['message_id']} di['text'] = '列表如下:' rssList = self._main._db.getRSSListByChatId(chatId) di['reply_markup'] = getInlineKeyBoardForRSSList( chatId, rssList, lastPage=True) self._main._request("editMessageText", "post", json=di) self.answer() return elif self._inlineKeyBoardForRSSListCommand == InlineKeyBoardForRSSList.PrevPage: di = {'chat_id': self._data['message']['chat']['id'], 'message_id': self._data['message']['message_id']} pageNum = int(self._inputList[3]) di['text'] = '列表如下:' rssList = self._main._db.getRSSListByChatId(chatId) di['reply_markup'] = getInlineKeyBoardForRSSList( chatId, rssList, pageNum-1) self._main._request("editMessageText", "post", json=di) self.answer() return elif self._inlineKeyBoardForRSSListCommand == InlineKeyBoardForRSSList.NextPage: di = {'chat_id': self._data['message']['chat']['id'], 'message_id': self._data['message']['message_id']} pageNum = int(self._inputList[3]) di['text'] = '列表如下:' rssList = self._main._db.getRSSListByChatId(chatId) di['reply_markup'] = getInlineKeyBoardForRSSList( chatId, rssList, pageNum+1) self._main._request("editMessageText", "post", json=di) self.answer() return elif self._inlineKeyBoardForRSSListCommand == InlineKeyBoardForRSSList.Close: di = {'chat_id': self._data['message']['chat']['id'], 'message_id': self._data['message']['message_id']} self._main._request("deleteMessage", "post", json=di) return elif self._inlineKeyBoardForRSSListCommand in [InlineKeyBoardForRSSList.Content, InlineKeyBoardForRSSList.CancleUnsubscribe, InlineKeyBoardForRSSList.BackToContentPage]: di = {'chat_id': self._data['message']['chat']['id'], 'message_id': self._data['message']['message_id']} ind = int(self._inputList[3]) ind = max(ind, 0) rssId = int(self._inputList[4]) rss = self._main._db.getRSSByIdAndChatId(rssId, chatId) if rss is None: self.answer('找不到该RSS。') return di['text'] = getTextContentForRSSInList( rss, self._main._setting) di['parse_mode'] = 'HTML' di['reply_markup'] = getInlineKeyBoardForRSSInList( chatId, rss, ind, self._main._setting.botOwnerList.isOwner(self._fromUserId)) self._main._request("editMessageText", "post", json=di) self.answer() return elif self._inlineKeyBoardForRSSListCommand == InlineKeyBoardForRSSList.BackToList: di = {'chat_id': self._data['message']['chat']['id'], 'message_id': self._data['message']['message_id']} rssList = self._main._db.getRSSListByChatId(chatId) ind = int(self._inputList[3]) di['text'] = '列表如下:' di['reply_markup'] = getInlineKeyBoardForRSSList( chatId, rssList, itemIndex=ind) self._main._request("editMessageText", "post", json=di) self.answer() return elif self._inlineKeyBoardForRSSListCommand == InlineKeyBoardForRSSList.Unsubscribe: di = {'chat_id': self._data['message']['chat']['id'], 'message_id': self._data['message']['message_id']} rssList = self._main._db.getRSSListByChatId(chatId) ind = int(self._inputList[3]) ind = max(ind, 0) rssId = int(self._inputList[4]) rss = self._main._db.getRSSByIdAndChatId(rssId, chatId) if rss is None: self.answer('找不到该RSS。') return di['text'] = getTextContentForRSSUnsubscribeInList(rss) di['parse_mode'] = 'HTML' di['reply_markup'] = getInlineKeyBoardForRSSUnsubscribeInList( chatId, rss, ind) self._main._request("editMessageText", "post", json=di) self.answer() return elif self._inlineKeyBoardForRSSListCommand == InlineKeyBoardForRSSList.ConfirmUnsubscribe: di = {'chat_id': self._data['message']['chat']['id'], 'message_id': self._data['message']['message_id']} rssList = self._main._db.getRSSListByChatId(chatId) ind = int(self._inputList[3]) rssId = int(self._inputList[4]) rss = self._main._db.getRSSByIdAndChatId(rssId, chatId) if rss is None: self.answer('取消订阅失败:找不到该RSS。') else: unsubscribed = self._main._db.removeItemInChatList( chatId, rss.id) if unsubscribed: self.answer('取消订阅成功。') ind = ind - 1 else: self.answer('取消订阅失败:数据库删除失败。') rssList = self._main._db.getRSSListByChatId(chatId) ind = max(min(ind, len(rssList)), 0) di['text'] = '列表如下:' di['reply_markup'] = getInlineKeyBoardForRSSList( chatId, rssList, itemIndex=ind) self._main._request("editMessageText", "post", json=di) return elif self._inlineKeyBoardForRSSListCommand == InlineKeyBoardForRSSList.SettingsPage: di = {'chat_id': self._data['message']['chat']['id'], 'message_id': self._data['message']['message_id']} ind = int(self._inputList[3]) ind = max(ind, 0) rssId = int(self._inputList[4]) rss = self._main._db.getRSSByIdAndChatId(rssId, chatId) if rss is None: self.answer('找不到该RSS。') return di['text'] = getTextContentForRSSInList( rss, self._main._setting) di['parse_mode'] = 'HTML' di['reply_markup'] = getInlineKeyBoardForRSSSettingsInList( chatId, rss, ind) self._main._request("editMessageText", "post", json=di) self.answer() return elif self._inlineKeyBoardForRSSListCommand in [InlineKeyBoardForRSSList.DisableWebPagePreview, InlineKeyBoardForRSSList.ShowRSSTitle, InlineKeyBoardForRSSList.ShowContentTitle, InlineKeyBoardForRSSList.ShowContent, InlineKeyBoardForRSSList.SendMedia, InlineKeyBoardForRSSList.DisplayEntryLink, InlineKeyBoardForRSSList.SendImgAsFile]: di = {'chat_id': self._data['message']['chat']['id'], 'message_id': self._data['message']['message_id']} rssList = self._main._db.getRSSListByChatId(chatId) ind = int(self._inputList[3]) ind = max(ind, 0) rssId = int(self._inputList[4]) rssEntry = self._main._db.getRSSByIdAndChatId(rssId, chatId) if rssEntry is None: self.answer('找不到该RSS。') return chatEntry: ChatEntry = rssEntry.chatList[0] config = chatEntry.config if self._inlineKeyBoardForRSSListCommand == InlineKeyBoardForRSSList.DisableWebPagePreview: config.disable_web_page_preview = not config.disable_web_page_preview elif self._inlineKeyBoardForRSSListCommand == InlineKeyBoardForRSSList.ShowRSSTitle: config.show_RSS_title = not config.show_RSS_title elif self._inlineKeyBoardForRSSListCommand == InlineKeyBoardForRSSList.ShowContentTitle: config.show_Content_title = not config.show_Content_title elif self._inlineKeyBoardForRSSListCommand == InlineKeyBoardForRSSList.ShowContent: config.show_content = not config.show_content elif self._inlineKeyBoardForRSSListCommand == InlineKeyBoardForRSSList.SendMedia: config.send_media = not config.send_media elif self._inlineKeyBoardForRSSListCommand == InlineKeyBoardForRSSList.DisplayEntryLink: config.display_entry_link = not config.display_entry_link elif self._inlineKeyBoardForRSSListCommand == InlineKeyBoardForRSSList.SendImgAsFile: config.send_img_as_file = not config.send_img_as_file updated = self._main._db.updateChatConfig(chatEntry) if updated: self.answer('修改设置成功') else: self.answer('修改设置失败') rss = self._main._db.getRSSByIdAndChatId(rssId, chatId) if rss is None: self.answer('找不到该RSS。') return di['text'] = getTextContentForRSSInList( rss, self._main._setting) di['parse_mode'] = 'HTML' di['reply_markup'] = getInlineKeyBoardForRSSSettingsInList( chatId, rss, ind) self._main._request("editMessageText", "post", json=di) return elif self._inlineKeyBoardForRSSListCommand == InlineKeyBoardForRSSList.ForceUpdate: di = {'chat_id': self._data['message']['chat']['id'], 'message_id': self._data['message']['message_id']} ind = int(self._inputList[3]) ind = max(ind, 0) rssId = int(self._inputList[4]) rss = self._main._db.getRSSByIdAndChatId(rssId, chatId) if rss is None: self.answer('找不到该RSS。') return if self._main._db.setRSSForceUpdate(rss.id, True): self.answer('已发送强制更新请求。') else: self.answer('发送强制更新请求失败。') rss = self._main._db.getRSSByIdAndChatId(rssId, chatId) if rss is None: self.answer('找不到该RSS。') return di['text'] = getTextContentForRSSInList( rss, self._main._setting) di['parse_mode'] = 'HTML' di['reply_markup'] = getInlineKeyBoardForRSSInList( chatId, rss, ind, self._main._setting.botOwnerList.isOwner(self._fromUserId)) self._main._request("editMessageText", "post", json=di) return elif self._loc == 2: if 'message' not in self._data: self.answer('找不到信息。') return if self._fromUserId is None or not self._main._setting.botOwnerList.isOwner(self._fromUserId): self.answer('❌你没有权限操作,请与Bot主人进行PY交易以获得权限。') return try: self._inlineKeyBoardForBlackListCommand = InlineKeyBoardForBlackList(int(self._inputList[1])) except Exception: self.answer('未知的按钮。') return di = {'chat_id': self._data['message']['chat']['id'], 'message_id': self._data['message']['message_id']} if self._inlineKeyBoardForBlackListCommand == InlineKeyBoardForBlackList.FirstPage: di['text'] = '列表如下:' di['reply_markup'] = getInlineKeyBoardForBlackList(self._main._blackList.getBlackList()) self._main._request("editMessageText", "post", json=di) return elif self._inlineKeyBoardForBlackListCommand == InlineKeyBoardForBlackList.LastPage: di['text'] = '列表如下:' di['reply_markup'] = getInlineKeyBoardForBlackList(self._main._blackList.getBlackList(), lastPage=True) self._main._request("editMessageText", "post", json=di) return elif self._inlineKeyBoardForBlackListCommand == InlineKeyBoardForBlackList.PrevPage: di['text'] = '列表如下:' di['reply_markup'] = getInlineKeyBoardForBlackList(self._main._blackList.getBlackList(), int(self._inputList[2]) - 1) self._main._request("editMessageText", "post", json=di) return elif self._inlineKeyBoardForBlackListCommand == InlineKeyBoardForBlackList.NextPage: di['text'] = '列表如下:' di['reply_markup'] = getInlineKeyBoardForBlackList(self._main._blackList.getBlackList(), int(self._inputList[2]) + 1) self._main._request("editMessageText", "post", json=di) return elif self._inlineKeyBoardForBlackListCommand == InlineKeyBoardForBlackList.Close: self._main._request("deleteMessage", "post", json=di) return elif self._inlineKeyBoardForBlackListCommand in [InlineKeyBoardForBlackList.BlackInfo, InlineKeyBoardForBlackList.CancleUnban]: bl = self._main._blackList.getBlackList() ind = bl.find(int(self._inputList[3])) if ind == -1: self.answer('在黑名单里找不到该用户。') di['text'] = '列表如下:' di['reply_markup'] = getInlineKeyBoardForBlackList(bl, itemIndex=int(self._inputList[2])) self._main._request("editMessageText", "post", json=di) return else: di['text'] = getTextContentForBlackInfo(bl[ind]) di['parse_mode'] = 'HTML' di['reply_markup'] = getInlineKeyBoardForBlackInfo(bl[ind], ind) self._main._request("editMessageText", "post", json=di) return elif self._inlineKeyBoardForBlackListCommand == InlineKeyBoardForBlackList.BackToList: di['text'] = '列表如下:' di['reply_markup'] = getInlineKeyBoardForBlackList(self._main._blackList.getBlackList(), itemIndex=int(self._inputList[2])) self._main._request("editMessageText", "post", json=di) return elif self._inlineKeyBoardForBlackListCommand == InlineKeyBoardForBlackList.Unban: bl = self._main._blackList.getBlackList() ind = bl.find(int(self._inputList[3])) if ind == -1: self.answer('在黑名单里找不到该用户。') di['text'] = '列表如下:' di['reply_markup'] = getInlineKeyBoardForBlackList(bl, itemIndex=int(self._inputList[2])) self._main._request("editMessageText", "post", json=di) return else: if bl[ind].from_config: self.answer('无法取消封禁来自配置文件的用户,请修改配置文件后重启bot。') return di['text'] = getTextContentForUnbanBlackInfo(bl[ind]) di['parse_mode'] = 'HTML' di['reply_markup'] = getInlineKeyBoardForUnbanBlackInfo(bl[ind], ind) self._main._request("editMessageText", "post", json=di) return elif self._inlineKeyBoardForBlackListCommand == InlineKeyBoardForBlackList.ConfirmUnban: bl = self._main._blackList.getBlackList() ind = bl.find(int(self._inputList[3])) if ind == -1: self.answer('在黑名单里找不到该用户。') else: if bl[ind].from_config: self.answer('无法取消封禁来自配置文件的用户,请修改配置文件后重启bot。') return re = self._main._blackList.unban(bl[ind].uid) self.answer('取消封禁' + ('成功!' if re else '失败!')) di['text'] = '列表如下:' di['reply_markup'] = getInlineKeyBoardForBlackList(self._main._blackList.getBlackList(), itemIndex=int(self._inputList[2])) self._main._request("editMessageText", "post", json=di) return else: self.answer('未知的按钮。') return if __name__ == "__main__": m = main() m.start()