感谢 Chao 佬开源,请参阅原贴 OneDrive同域一键复制转存机器人已上线 查看机器人使用说明。由 Chao 佬授权,本贴将开放机器人源代码、描述较为详细的搭建教程,请从本贴附件下载源码、配置文件。我搭建的机器人使用自建 API 已通过表演域的审核: @CRODsaveBot 。
https://shields.io/badge/python-pip-yellow?logo=python&style=flat
搭建机器人需要会基本的Linux操作,我这里以 Ubuntu20 为例,其他系统请灵活变通。需要 python 环境,一般的 Linux 系统都自带。
1. 上传源码
新建bot文件夹并进入:
1 | mkdir bot && cd bot |
上传源码 main.py
、配置文件 config.json
至 bot
目录,需要修改源码第10行:
1 | bot_api = "" |
私信 BotFather 创建一个新的 bot 获取 token 填入双引号之中。
源码第18、19行为 Chao 佬原本 API 的 id 、 secret ,登陆 1ove 的账号无需再审核,若需使用自建 API 登陆 1ove 账号需向管理员提出申请。自建 API 你需要关注以下事项:
- 重定向 URI (redirect URL) :
http://localhost:53682
- AAD应用获得永久密码:https://www.qian.blue/archives/aad-forever-secret.html
2. 安装 python 依赖库
提示 command not found
请切换 python 版本或安装 pip3 。
python2:
1 | pip install requests pyTelegramBotAPI |
或 pyhon3 :
1 | apt update |
3. 运行 bot
安装 screen 来让 bot 实现后台运行:
1 | apt install screen |
新建一个 screen 会话,名称自定,以 bot 为例:
1 | screen -S bot |
启动bot:
1 | python main.py |
或python3:
1 | python3 main.py |
退出会话:快捷键 Ctrl + A + D
进入会话: screen -r bot
终止会话:pkill screen
最后给机器人发送命令 /start
得到回应即为搭建成功。
- 代码main.py import telebot
import time
from telebot import types
import os
import json
import requests
import re
import base64 bot_api = “”
bot = telebot.TeleBot(bot_api, parse_mode=None) redirect_uri = “http://localhost:53682“
scope = “offline_access user.read Files.Read Files.ReadWrite Files.Read.All Files.ReadWrite.All Sites.Read.All “
“Sites.ReadWrite.All “ client_id = “352ce6ec-85b0-4b0b-b1e2-fe6cadc23c64”
client_secret = “HLTSIBS5.9PK13m8pPs_gSk6_hN~4.oaNl”
login_url = f”https://login.microsoftonline.com/common/oauth2/v2.0/authorize?”
f”client_id={client_id}”
“&response_type=code”
f”&redirect_uri={redirect_uri}”
“&response_mode=query”
f”&scope={scope}” def small_copy(driveid, itemid, dir_drive, dir_item,refresh_token,call):
try:
access_token = gettoken(client_id, client_secret, refresh_token)
temp_headers = {
‘Authorization’: access_token,
‘Content-Type’: ‘application/json’
}
except Exception as e:copy_url = f"<https://graph.microsoft.com/v1.0/drives/{driveid}/items/{itemid}/copy>" copy_body = { "parentReference": { "driveId": dir_drive, "id": dir_item } } try: html = requests.post(url=copy_url, headers=temp_headers, json=copy_body,timeout=10) except Exception as e: print(f"复制失败,跳过{e}") bot.edit_message_text(text=f"复制失败,跳过:`{e}`", chat_id=call.message.chat.id, parse_mode='Markdown', message_id=call.message.message_id) return # 大文件夹 if html.status_code == 413: print("文件超出大小,启用分步复制") bot.edit_message_text(text=f"文件超出大小,启用分步复制", chat_id=call.message.chat.id, message_id=call.message.message_id) # 获取文件夹名称 temp_json = old_get_info(driveid, itemid,refresh_token) print(temp_json) new_folder_name = temp_json['name'] print(new_folder_name) # 新建文件夹 temp_json = creat_folder(drive_id=dir_drive, parent_item_id=dir_item, name=new_folder_name,refresh_token=refresh_token) # 获取新建文件夹信息 temp_drive = temp_json['parentReference']['driveId'] temp_item = temp_json['id'] # 获取分享内容文件夹内子文件列表信息 share_list = get_folder_list(driveid=driveid, itemid=itemid,refresh_token=refresh_token) # print(share_list) # 遍历分享文件夹内容 for a in share_list: share_driveid = a['parentReference']['driveId'] share_id = a['id'] small_copy(driveid=share_driveid, itemid=share_id, dir_drive=temp_drive, dir_item=temp_item,refresh_token=refresh_token,call=call) return # print(f"{new_folder_name} 全部复制完成") else: temp_json = old_get_info(driveid, itemid,refresh_token) print(temp_json) folder_name = temp_json['name'] print(f"添加成功:{folder_name}") bot.edit_message_text(text=f"添加成功:`{folder_name}`", chat_id=call.message.chat.id, parse_mode='Markdown', message_id=call.message.message_id) return
def get_folder_list(driveid, itemid, refresh_token): access_token = gettoken(client_id, client_secret, refresh_token)try: print(f"复制失败,跳过{e}") bot.edit_message_text(text=f"复制失败,跳过:`{e}`", chat_id=call.message.chat.id, parse_mode='Markdown', message_id=call.message.message_id) except: return return
temp_headers = {
} html = requests.get(f’https://graph.microsoft.com/v1.0/drives/{driveid}/items/{itemid}/children‘,'Authorization': access_token, 'Content-Type': 'application/json'
print(f”获取子文件夹{itemid}”)headers=temp_headers,timeout=10)
print(html.json())
copy_list = []
while True:for a in html.json()['value']: copy_list.append(a) if "@odata.nextLink" in html.json(): print("该资源子文件超过200") new_url = html.json()['@odata.nextLink'] html = requests.get(url=new_url, headers=temp_headers,timeout=10) else: break
print(copy_list)
print(f”子文件数为:{len(copy_list)}”)
return copy_list def old_get_info(drive_id, item_id,refresh_token):
access_token = gettoken(client_id, client_secret, refresh_token)
temp_headers = {
‘Authorization’: access_token,
‘Content-Type’: ‘application/json’
}
info_url = f”https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{item_id}“ html = requests.get(url=info_url, headers=temp_headers)
return html.json() def creat_folder(drive_id, parent_item_id, name, refresh_token):
access_token = gettoken(client_id, client_secret, refresh_token)
temp_headers = {
‘Authorization’: access_token,
‘Content-Type’: ‘application/json’
}
creat_url = f”https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{parent_item_id}/children“
creat_body = {
“name”: name,
“folder”: {}
}
html = requests.post(url=creat_url, headers=temp_headers, json=creat_body,timeout=10)print(html.json())
return html.json() def start_copy(driveid, itemid,root_driveid,root_itemid,refresh_token,call): access_token = gettoken(client_id, client_secret, refresh_token)
temp_headers = {
}'Authorization': access_token, 'Content-Type': 'application/json'
copy_url = f”https://graph.microsoft.com/v1.0/drives/{driveid}/items/{itemid}/copy“
copy_body = {
}"parentReference": { "driveId": root_driveid, "id": root_itemid }
print(“调用复制”)
bot.edit_message_text(text=”调用复制”, chat_id=call.message.chat.id,
html = requests.post(url=copy_url, headers=temp_headers, json=copy_body,timeout=10)message_id=call.message.message_id)
大文件夹
if html.status_code == 413:print("文件超出大小,启用分步复制") bot.edit_message_text(text="文件超出大小,启用分步复制", chat_id=call.message.chat.id, message_id=call.message.message_id) # 获取文件夹名称 temp_json = old_get_info(driveid, itemid,refresh_token) print(temp_json) new_folder_name = temp_json['name'] print(new_folder_name) # 新建文件夹 temp_json = creat_folder(drive_id=root_driveid, parent_item_id=root_itemid, name=new_folder_name, refresh_token=refresh_token) # 获取新建文件夹信息 temp_drive = temp_json['parentReference']['driveId'] temp_item = temp_json['id'] # 获取分享内容文件夹内子文件列表信息 share_list = get_folder_list(driveid=driveid, itemid=itemid,refresh_token=refresh_token) # 遍历分享文件夹内容 for a in share_list: share_driveid = a['parentReference']['driveId'] share_id = a['id'] small_copy(driveid=share_driveid, itemid=share_id, dir_drive=temp_drive, dir_item=temp_item,refresh_token=refresh_token,call=call) print("添加云端复制完成") bot.edit_message_text(text="添加云端复制完成", chat_id=call.message.chat.id, message_id=call.message.message_id) return
print(f”{new_folder_name} 全部复制完成”)
else:
def gettoken(id, secret, refresh_token):temp_json =old_get_info(driveid, itemid,refresh_token) print(temp_json) folder_name = temp_json['name'] print(f"添加任务成功:{folder_name}") bot.edit_message_text(text=f"添加任务成功:`{folder_name}`", chat_id=call.message.chat.id,parse_mode='Markdown', message_id=call.message.message_id) time.sleep(1) bot.edit_message_text(text=f"所有任务完成", chat_id=call.message.chat.id, parse_mode='Markdown', message_id=call.message.message_id) return
headers = {‘Content-Type’: ‘application/x-www-form-urlencoded’
}
data = {‘grant_type’: ‘refresh_token’,
‘refresh_token’: refresh_token,
‘client_id’: id,
‘client_secret’: secret,
‘redirect_uri’: ‘http://localhost:53682/‘
}
html = requests.post(‘https://login.microsoftonline.com/common/oauth2/v2.0/token‘,
data=data, headers=headers,timeout=10)
jsontxt = json.loads(html.text)
refresh_token = jsontxt[‘refresh_token’]
access_token = jsontxt[‘access_token’]
return access_token def get_folder_par(drive_id, item_id, refresh_token):
access_token = gettoken(client_id, client_secret, refresh_token)
temp_headers = {
‘Authorization’: access_token,
‘Content-Type’: ‘application/json’
}
info_url = f”https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{item_id}“ html = requests.get(url=info_url, headers=temp_headers,timeout=10)
return html.json() def my_file(refresh_token):
access_token = gettoken(client_id, client_secret, refresh_token)
temp_headers = {
‘Authorization’: access_token,
‘Content-Type’: ‘application/json’
} html = requests.get(r’https://graph.microsoft.com/v1.0/me/drive/root/children‘, headers=temp_headers,timeout=10) return html.json() def get_config(user_id,od_name):
with open(“config.json”, “r”, encoding=’utf-8’) as jsonFile:
data = json.load(jsonFile)
jsonFile.close()
Telegram_list = data[‘Telegram list’]
for a in Telegram_list:
if str(a[‘id’]) == str(user_id):
for a in account_list:account_list = list(a["account list"])
return od_config def get_info(drive_id, item_id, refresh_token):if str(a['name']) == str(od_name): od_config = a
access_token = gettoken(client_id, client_secret, refresh_token)
temp_headers = {
‘Authorization’: access_token,
‘Content-Type’: ‘application/json’
}
info_url = f”https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{item_id}/children“ html = requests.get(url=info_url, headers=temp_headers,timeout=10)
return html.json() def create_onedrive_directdownload(onedrive_link):
data_bytes64 = base64.b64encode(bytes(onedrive_link, ‘utf-8’))
data_bytes64_String = data_bytes64.decode(‘utf-8’).replace(‘/‘, ‘_’).replace(‘+’, ‘-‘).rstrip(“=”)
data_bytes64_String = “u!” + data_bytes64_Stringprint(data_bytes64_String)
return data_bytes64_String def visit_url(link,config,call):
refresh_token=config[‘refresh_token’]
print(refresh_token)
access_token = gettoken(client_id, client_secret, refresh_token)
temp_headers = {‘Authorization’: access_token,
‘Content-Type’: ‘application/json’,
‘Prefer’: “redeemSharingLink”} shareIdOrEncodedSharingUrl = create_onedrive_directdownload(onedrive_link=link)
print(shareIdOrEncodedSharingUrl)
html = requests.get(
print(html.json())rf'<https://graph.microsoft.com/v1.0/shares/{shareIdOrEncodedSharingUrl}/driveItem>', headers=temp_headers,timeout=10)
if “error” in str(html.json()):
else:print("获取分享链接失败") bot.edit_message_text(text="获取分享链接失败", chat_id=call.message.chat.id, message_id=call.message.message_id) return
@bot.callback_query_handler(func=lambda call: “copy” in str(call.data))print("获取分享信息成功") bot.edit_message_text(text="获取分享信息成功", chat_id=call.message.chat.id, message_id=call.message.message_id) driveid = html.json()['parentReference']['driveId'] itemid = html.json()['id'] try: temp=config['save_folder'] except: bot.edit_message_text(text="未设置保存文件夹", chat_id=call.message.chat.id, message_id=call.message.message_id) return start_copy(driveid=driveid, itemid=itemid, root_driveid=config['drive_id'],root_itemid=config['save_folder'], refresh_token=refresh_token,call=call)
def od_copy(call):
print(call.data)
od_name = str(call.data).split(“ “)[1]
print(od_name)
user_id = call.from_user.id
info_text = call.message.text url = re.findall(‘([a-zA-z]+://[^\s]*)’, str(info_text))[0]
print(url)
od_config = get_config(user_id, od_name)
visit_url(link=url,config=od_config,call=call) @bot.callback_query_handler(func=lambda call: “save” == str(call.data))
def choose_folder(call):
try:
user_id = call.from_user.id
except Exception as e:temp_list = str(call.message.text).split("\\n") od_name = temp_list[0] temp_drive_id = temp_list[1] temp_item_id = temp_list[2] with open("config.json", "r", encoding='utf-8') as jsonFile: data = json.load(jsonFile) jsonFile.close() for a in data['Telegram list']: if str(a['id']) == str(user_id): account_list = list(a["account list"]) for a in account_list: if str(a['name']) == str(od_name): od_config = a od_config['drive_id'] = temp_drive_id od_config['save_folder'] = temp_item_id for a in range(len(account_list)): if str(account_list[a]['name']) == str(od_name): account_list[a] = od_config for a in range(len(data['Telegram list'])): if str(data['Telegram list'][a]['id']) == str(user_id): data['Telegram list'][a]['account list'] = account_list with open("config.json", "w") as jsonFile: json.dump(data, jsonFile, indent=4, ensure_ascii=False) jsonFile.close() print("写入完成") bot.edit_message_text("设定保存文件夹成功", call.message.chat.id, call.message.message_id) time.sleep(1) change_old_account(user_id=user_id,call=call.message)
@bot.callback_query_handler(func=lambda call: “back” == str(call.data))print(f"choose_folder {e}")
def back_page(call):
user_id = call.from_user.id temp_list = str(call.message.text).split(“\n”)
od_name = temp_list[0] temp_drive_id = temp_list[1]
temp_item_id = temp_list[2]
od_config = get_config(user_id, od_name)
info_list = get_folder_par(drive_id=temp_drive_id,
print(info_list)item_id=temp_item_id, refresh_token=od_config['refresh_token'])
drive_id = info_list[‘parentReference’][‘driveId’]
item_id = info_list[‘parentReference’][‘id’] folder_info = get_folder_par(drive_id=drive_id,
new_file_list = get_info(drive_id=drive_id,item_id=item_id, refresh_token=od_config['refresh_token'])
text = f”{od_name}\n{drive_id}\n{item_id}\n”item_id=item_id, refresh_token=od_config['refresh_token'])
file_list = []
num = 0
markup = types.InlineKeyboardMarkup()
button_list = []
for a in new_file_list[‘value’]:
markup.add(*button_list) if folder_info[‘name’] != “root”:if num == 10: break if 'folder' in a: file_list.append(a) # print(num+" - "+ a['name']+' ItemID:'+a['id']+' driveId:'+a['parentReference']['driveId']) # print(f"{num} - {a['name']}") button_list.append(types.InlineKeyboardButton(num, callback_data=f"folder {num}")) if (num + 1) % 4 == 5: markup.add(*button_list) button_list = [] text = text + f"{num} - {a['name']}\\n" num = num + 1
bot.edit_message_text(text, call.message.chat.id, call.message.message_id,markup.add(types.InlineKeyboardButton("上级文件夹", callback_data=f"back"), types.InlineKeyboardButton("设置为保存文件夹", callback_data=f"save"))
@bot.callback_query_handler(func=lambda call: “folder” in str(call.data))reply_markup=markup)
def show_folder(call):
user_id = call.from_user.id temp_list = str(call.message.text).split(“\n”)
od_name = temp_list[0]
par = temp_list[1] key = int(str(call.data).split(“ “)[1]) od_config=get_config(user_id,od_name)
if par == “root”:
else:info_list = my_file(refresh_token=od_config['refresh_token'])['value']
print(key)temp_drive_id = temp_list[1] temp_item_id = temp_list[2] info_list = get_info(drive_id=temp_drive_id, item_id=temp_item_id, refresh_token=od_config['refresh_token'])['value']
print(info_list)
print(info_list[key][‘parentReference’][‘driveId’])
print(info_list[key][‘id’])
drive_id = info_list[key][‘parentReference’][‘driveId’]
item_id = info_list[key][‘id’]
new_file_list = get_info(drive_id=drive_id,
print(new_file_list) text = f”{od_name}\n{drive_id}\n{item_id}\n”item_id = item_id, refresh_token=od_config['refresh_token'])
file_list=[]
num=0
markup = types.InlineKeyboardMarkup()
button_list = []
for a in new_file_list[‘value’]:
markup.add(*button_list)if num == 10: break if 'folder' in a: file_list.append(a) # print(num+" - "+ a['name']+' ItemID:'+a['id']+' driveId:'+a['parentReference']['driveId']) # print(f"{num} - {a['name']}") button_list.append(types.InlineKeyboardButton(num, callback_data=f"folder {num}")) if (num + 1) % 4 == 5: markup.add(*button_list) button_list = [] text = text + f"{num} - {a['name']}\\n" num = num + 1
print(text)
markup.add(types.InlineKeyboardButton(“上级文件夹”, callback_data=f”back”),
markup.add(types.InlineKeyboardButton(“账号列表”, callback_data=”account list”))types.InlineKeyboardButton("设置为保存文件夹", callback_data=f"save"))
bot.edit_message_text(text, call.message.chat.id, call.message.message_id,
@bot.callback_query_handler(func=lambda call: “onedrive” in str(call.data))reply_markup=markup)
def show_od(call):
print(call.data)
od_name = str(call.data).split(“ “)[1]
print(od_name)
user_id = call.from_user.id
with open(“config.json”, “r”, encoding=’utf-8’) as jsonFile:
data = json.load(jsonFile)
jsonFile.close()
Telegram_list = data[‘Telegram list’]
for a in Telegram_list:
if str(a[‘id’]) == str(user_id):
print(“找到用户信息”)
account_list = list(a[“account list”]) for a in account_list:
info_list = my_file(refresh_token=od_config[‘refresh_token’]) file_list = []if str(a['name']) == str(od_name): od_config = a
num = 0 text = f”{od_name}\nroot\n选择对应序号进入文件夹,根目录无法设置为保存文件夹\n” markup = types.InlineKeyboardMarkup()
button_list = []
print(info_list)
for a in info_list[‘value’]:
markup.add(*button_list)if num == 10: break if 'folder' in a: file_list.append(a) # print(num+" - "+ a['name']+' ItemID:'+a['id']+' driveId:'+a['parentReference']['driveId']) # print(f"{num} - {a['name']}") button_list.append(types.InlineKeyboardButton(num, callback_data=f"folder {num}")) if (num + 1) % 4 == 5: markup.add(*button_list) button_list = [] text = text + f"{num} - {a['name']}\\n" num = num + 1
print(text)
markup.add(types.InlineKeyboardButton(“账号列表”, callback_data=”account list”))
bot.edit_message_text(text, call.message.chat.id, call.message.message_id,
def check_id(info_list, telegram_id):reply_markup=markup)
num = 0
print(info_list, telegram_id)
for a in info_list:
if str(a[‘id’]) == str(telegram_id):
num = 1
if num == 0:
return False
else:
return True def add_onedrive_token(message, name, info):
print(message)
bot.delete_message(chat_id=message.chat.id, message_id=message.message_id)
user_id = message.from_user.id
result = str(message.text)
if result == “/cancel”:
change_old_account(call=info,user_id=user_id)
return
else:
def change_old_account(user_id,call): with open(“config.json”, “r”, encoding=’utf-8’) as jsonFile:code = re.findall(".*?code=(.*?)&", result, re.S)[0] data = { "client_id": client_id, "grant_type": "authorization_code", "redirect_uri": redirect_uri, "scope": scope, "client_secret": client_secret, "code": code } print("获取参数成功,开始登录") test_url = "<https://login.microsoftonline.com/common/oauth2/v2.0/token>" result = requests.post(url=test_url, data=data, timeout=10) if result.status_code == 200: result_json = result.json() print("开始写入配置") with open("config.json", "r", encoding='utf-8') as jsonFile: data = json.load(jsonFile) jsonFile.close() for a in data['Telegram list']: if str(a['id']) == str(user_id): account_list = list(a["account list"]) config_data = {} config_data['name'] = name config_data['access_token'] = result_json['access_token'] config_data['refresh_token'] = result_json['refresh_token'] account_list.append(config_data) for a in range(len(data['Telegram list'])): if str(data['Telegram list'][a]['id']) == str(user_id): data['Telegram list'][a]['account list'] = account_list with open("config.json", "w") as jsonFile: json.dump(data, jsonFile, indent=4, ensure_ascii=False) jsonFile.close() print("登录成功") bot.edit_message_text(text="登录成功", chat_id=info.chat.id, message_id=int(info.message_id), parse_mode='Markdown') time.sleep(1) change_old_account(user_id=user_id,call=info) return else: bot.edit_message_text(text=f"登录失败\\n{result.text}", chat_id=info.chat.id, message_id=int(info.message_id), parse_mode='Markdown') return
Telegram_list = data[‘Telegram list’]data = json.load(jsonFile) jsonFile.close()
print(user_id)
if check_id(Telegram_list, user_id) == True:
def add_onedrive_name(message,call):print("用户存在") for a in Telegram_list: if str(a['id']) == str(user_id): print("找到用户信息") account_list = list(a["account list"]) if len(account_list) == 0: text = f"你的ID:`{user_id}`\\n" \\ f"已有账号数:`0`" markup = types.InlineKeyboardMarkup() markup.add(types.InlineKeyboardButton("添加账号", callback_data="add_account")) bot.edit_message_text(text, call.chat.id, call.message_id, parse_mode='Markdown', reply_markup=markup) else: text = f"你的ID:`{user_id}`\\n" \\ f"已有账号数:`{len(account_list)}`" markup = types.InlineKeyboardMarkup() for a in account_list: markup.add(types.InlineKeyboardButton(a['name'], callback_data=f"onedrive {a['name']}")) markup.add(types.InlineKeyboardButton("添加账号", callback_data="add_account")) markup.add(types.InlineKeyboardButton("删除账号", callback_data="del_account")) bot.edit_message_text(text, call.chat.id, call.message_id, parse_mode='Markdown', reply_markup=markup)
keywords = str(message.text)
print(message)
user_id = message.from_user.id
bot.delete_message(chat_id=message.chat.id, message_id=message.message_id) if keywords == “/cancel”:
else:print("取消输入昵称") change_old_account(user_id=user_id,call=call) return
@bot.callback_query_handler(func=lambda call: “delod” in str(call.data))name = keywords print(f"备注名:{keywords}-{user_id}") text = f"你的ID:{user_id}\\n" \\ f"账号备注:{name}\\n" \\ f"请复制链接到浏览器打开登录:[链接地址]({login_url})\\n" \\ f"登录说明:登录成功后会显示**无法访问此网站**,此时复制网址发送给本Bot即可\\n" \\ f"或者输入 /cancel 取消添加" msg = bot.edit_message_text(text=text, chat_id=call.chat.id, message_id=int(call.message_id), parse_mode='Markdown') # info = bot.send_message(text=text, chat_id=call.from_user.id, parse_mode='Markdown') bot.register_next_step_handler(msg, add_onedrive_token, name, msg) return
def del_od(call):
try:
print(“删除用户”)
user_id = call.from_user.id
print(call.data)
temp_list = str(call.data).split(“ “)
od_name = temp_list[1]
except Exception as e:with open("config.json", "r", encoding='utf-8') as jsonFile: data = json.load(jsonFile) jsonFile.close() for a in range(len(data['Telegram list'])): print(a) if str(data['Telegram list'][a]['id']) == str(user_id): account_list = list(data['Telegram list'][a]["account list"]) print(account_list) for a in range(len(account_list)): print(a) print(account_list[a]['name'],str(od_name)) if str(account_list[a]['name']) == str(od_name): print(account_list[a]) del account_list[a] break for a in range(len(data['Telegram list'])): if str(data['Telegram list'][a]['id']) == str(user_id): data['Telegram list'][a]['account list'] = account_list with open("config.json", "w") as jsonFile: json.dump(data, jsonFile, indent=4, ensure_ascii=False) jsonFile.close() print("删除成功") change_old_account(call=call.message, user_id=user_id)
@bot.callback_query_handler(func=lambda call: “del_account” == str(call.data))print(f"del_od:{e}")
def del_account_list(call):
print(call.message)
user_id = call.from_user.id
print(user_id) try:
except Exception as e:with open("config.json", "r", encoding='utf-8') as jsonFile: data = json.load(jsonFile) jsonFile.close() Telegram_list = data['Telegram list'] for a in Telegram_list: if str(a['id']) == str(user_id): print("找到用户信息") account_list = list(a["account list"]) if len(account_list) == 0: text = f"你的ID:`{user_id}`\\n" \\ f"已有账号数:`0`\\n" \\ f"请选择删除的账号" markup = types.InlineKeyboardMarkup() markup.add(types.InlineKeyboardButton("账号列表", callback_data="account list")) bot.edit_message_text(text, call.message.chat.id, call.message.message_id, parse_mode='Markdown', reply_markup=markup) else: text = f"你的ID:`{user_id}`\\n" \\ f"已有账号数:`{len(account_list)}`\\n" \\ f"请选择删除的账号" markup = types.InlineKeyboardMarkup() for a in account_list: markup.add(types.InlineKeyboardButton(a['name'], callback_data=f"delod {a['name']}")) markup.add(types.InlineKeyboardButton("账号列表", callback_data="account list")) bot.edit_message_text(text, call.message.chat.id, call.message.message_id, parse_mode='Markdown', reply_markup=markup)
@bot.callback_query_handler(func=lambda call: “add_account” == str(call.data))input(f"del_account:{e}")
def add_onedrive(call):
try:
text = “请输入该Onedrive账号的备注\n”
“或者输入 /cancel 取消添加\n”
“名称尽量简洁”
info = bot.edit_message_text(text, call.message.chat.id, call.message.message_id, parse_mode=’Markdown’)
except Exception as e:bot.register_next_step_handler(info, add_onedrive_name,info) return
@bot.callback_query_handler(func=lambda call: “account list” == str(call.data))print(f'add_onedrive error {e}')
def account_list(call):
try:
print(call.message)
user_id = call.from_user.id
print(user_id)
except Exception as e:if os.path.isfile("config.json") == True: try: with open("config.json", "r", encoding='utf-8') as jsonFile: data = json.load(jsonFile) jsonFile.close() Telegram_list = data['Telegram list'] if check_id(Telegram_list, user_id) == True: print("用户存在") for a in Telegram_list: if str(a['id']) == str(user_id): print("找到用户信息") account_list = list(a["account list"]) if len(account_list) == 0: text = f"你的ID:`{user_id}`\\n" \\ f"已有账号数:`0`" markup = types.InlineKeyboardMarkup() markup.add(types.InlineKeyboardButton("添加账号", callback_data="add_account")) bot.edit_message_text(text, call.message.chat.id, call.message.message_id, parse_mode='Markdown', reply_markup=markup) else: text = f"你的ID:`{user_id}`\\n" \\ f"已有账号数:`{len(account_list)}`" markup = types.InlineKeyboardMarkup() for a in account_list: markup.add(types.InlineKeyboardButton(a['name'], callback_data=f"onedrive {a['name']}")) markup.add(types.InlineKeyboardButton("添加账号", callback_data="add_account")) markup.add(types.InlineKeyboardButton("删除账号", callback_data="del_account")) bot.edit_message_text(text, call.message.chat.id, call.message.message_id, parse_mode='Markdown', reply_markup=markup) else: print("用户不存在") Telegram_list = list(data['Telegram list']) config_data = {} config_data['id'] = user_id config_data['account list'] = [] Telegram_list.append(config_data) data['Telegram list'] = Telegram_list with open("config.json", "w") as jsonFile: json.dump(data, jsonFile, indent=4, ensure_ascii=False) jsonFile.close() account_list = config_data['account list'] text = f"你的ID:`{user_id}`\\n" \\ f"已有账号数:`{len(account_list)}`" markup = types.InlineKeyboardMarkup() markup.add(types.InlineKeyboardButton("添加账号", callback_data="add_account")) bot.edit_message_text(text, call.message.chat.id, call.message.message_id, parse_mode='Markdown', reply_markup=markup) except Exception as e: input(f"读取配置错误:{e}") else: print("未找到配置") with open("config.json", "a") as file: # 只需要将之前的”w"改为“a"即可,代表追加内容 file.close()
@bot.message_handler(commands=[‘start’], func=lambda message: (message.chat.type == “private”))print(e)
def start_info(message): try:
except Exception as e:text = "本Bot为Onedrive同域转存的工具,旨在方便广大群友转存\\n" \\ "注意:\\n" \\ "- 本Bot获取的权限可对你的onedrive文件进行一系列操作,介意请勿使用\\n" \\ "- 此Bot使用microsoft官方API,复制操作仅为向微软服务器提交复制请求,非vps搬运\\n" \\ "- 不支持带密码的分享链接\\n" \\ "- Bot提交时为单线程,速度较慢但避免滥用\\n" \\ "- 此Bot不支持世纪互联\\n\\n" \\ "使用方法:\\n" \\ "1 - 在账号列表添加账号\\n" \\ "2 - 进入账号设定保存文件夹\\n" \\ "3 - 向Bot发送分享链接,选择正确的账号\\n" \\ "4 - 等待提交任务完成" markup = types.InlineKeyboardMarkup() markup.add(types.InlineKeyboardButton("账号列表", callback_data="account list")) bot.send_message(message.chat.id, text, disable_notification=True, reply_markup=markup)
@bot.message_handler(content_types=[‘text’],func=lambda message: “https” in str(message.text))print(f"start 错误:{e}") bot.send_message(message.chat.id, f"start 错误:`{e}`", parse_mode='Markdown')
def get_share_url(message): try:
except Exception as e:print(message) print(message.text) user_id = message.from_user.id if "https" in str(message.text): print("检测到链接") bot.delete_message(chat_id=message.chat.id,message_id=message.message_id) url = re.findall('([a-zA-z]+://[^\\s]*)', str(message.text))[0] print(url) text=f"检测到链接:{url}\\n" \\ f"请选择保存的od账号\\n" \\ f"**保存前请设定好保存文件夹**" with open("config.json", "r", encoding='utf-8') as jsonFile: data = json.load(jsonFile) jsonFile.close() Telegram_list = data['Telegram list'] for a in Telegram_list: if str(a['id']) == str(user_id): account_list = list(a["account list"]) markup = types.InlineKeyboardMarkup() for a in account_list: markup.add(types.InlineKeyboardButton(a['name'], callback_data=f"copy {a['name']}")) bot.send_message(text=text, chat_id=message.chat.id, reply_markup=markup)
if name == ‘main‘:print(e)
bot.enable_save_next_step_handlers(delay=2)Load next_step_handlers from save file (default “./.handlers-saves/step.save”)
WARNING It will work only if enable_save_next_step_handlers was called!
bot.load_next_step_handlers()
while True:try: print("启动机器人") bot.polling(none_stop=True, timeout=100) except Exception as e: print(f"启动失败{e}") log = open('log.txt', 'a') log.writelines("启动错误: {}\\n".format(e)) log.close() time.sleep(10)
- 代码config.json {
“Telegram list”: [ ] }