diff --git a/api/answer.py b/api/answer.py index 76acddad..119411fa 100644 --- a/api/answer.py +++ b/api/answer.py @@ -4,15 +4,18 @@ import json from api.logger import logger import random -from urllib3 import disable_warnings,exceptions +from urllib3 import disable_warnings, exceptions + # 关闭警告 disable_warnings(exceptions.InsecureRequestWarning) + class CacheDAO: """ @Author: SocialSisterYi @Reference: https://github.com/SocialSisterYi/xuexiaoyi-to-xuexitong-tampermonkey-proxy """ + def __init__(self, file: str = "cache.json"): self.cacheFile = Path(file) if not self.cacheFile.is_file(): @@ -35,8 +38,8 @@ def addCache(self, question: str, answer: str): class Tiku: CONFIG_PATH = "config.ini" # 默认配置文件路径 - DISABLE = False # 停用标志 - SUBMIT = False # 提交标志 + DISABLE = False # 停用标志 + SUBMIT = False # 提交标志 def __init__(self) -> None: self._name = None @@ -46,7 +49,7 @@ def __init__(self) -> None: @property def name(self): return self._name - + @name.setter def name(self, value): self._name = value @@ -54,7 +57,7 @@ def name(self, value): @property def api(self): return self._api - + @api.setter def api(self, value): self._api = value @@ -64,7 +67,7 @@ def token(self): return self._token @token.setter - def token(self,value): + def token(self, value): self._token = value def init_tiku(self): @@ -74,15 +77,15 @@ def init_tiku(self): self.config_set(self._get_conf()) if not self.DISABLE: # 设置提交模式 - self.SUBMIT = True if self._conf['submit'] == 'true' else False + self.SUBMIT = True if self._conf["submit"] == "true" else False # 调用自定义题库初始化 self._init_tiku() - + def _init_tiku(self): # 仅用于题库初始化, 例如配置token, 交由自定义题库完成 pass - def config_set(self,config): + def config_set(self, config): self._conf = config def _get_conf(self): @@ -92,38 +95,39 @@ def _get_conf(self): try: config = configparser.ConfigParser() config.read(self.CONFIG_PATH, encoding="utf8") - return config['tiku'] + return config["tiku"] except KeyError or FileNotFoundError: logger.info("未找到tiku配置, 已忽略题库功能") self.DISABLE = True return None - def query(self,q_info:dict): + def query(self, q_info: dict): if self.DISABLE: return None # 预处理, 去除【单选题】这样与标题无关的字段 # 此处需要改进!!! - logger.debug(f"原始标题:{q_info['title']}") - q_info['title'] = q_info['title'][6:] # 暂时直接用裁切解决 - logger.debug(f"处理后标题:{q_info['title']}") + logger.debug(f"原始标题: {q_info['title']}") + q_info["title"] = q_info["title"][6:] # 暂时直接用裁切解决 + logger.debug(f"处理后标题: {q_info['title']}") # 先过缓存 cache_dao = CacheDAO() - answer = cache_dao.getCache(q_info['title']) + answer = cache_dao.getCache(q_info["title"]) if answer: - logger.info(f"从缓存中获取答案:{q_info['title']} -> {answer}") + logger.info(f"从缓存中获取答案: {q_info['title']} -> {answer}") return answer.strip() else: answer = self._query(q_info) if answer: answer = answer.strip() - cache_dao.addCache(q_info['title'], answer) - logger.info(f"从{self.name}获取答案:{q_info['title']} -> {answer}") + cache_dao.addCache(q_info["title"], answer) + logger.info(f"从{self.name}获取答案: {q_info['title']} -> {answer}") return answer - logger.error(f"从{self.name}获取答案失败:{q_info['title']}") + logger.error(f"从{self.name}获取答案失败: {q_info['title']}") return None - def _query(self,q_info:dict): + + def _query(self, q_info: dict): """ 查询接口, 交由自定义题库实现 """ @@ -139,7 +143,7 @@ def get_tiku_from_config(self): if self.DISABLE: return self try: - cls_name = self._conf['provider'] + cls_name = self._conf["provider"] if not cls_name: raise KeyError except KeyError: @@ -148,16 +152,16 @@ def get_tiku_from_config(self): new_cls = globals()[cls_name]() new_cls.config_set(self._conf) return new_cls - - def jugement_select(self,answer:str) -> bool: + + def jugement_select(self, answer: str) -> bool: """ 这是一个专用的方法, 要求配置维护两个选项列表, 一份用于正确选项, 一份用于错误选项, 以应对题库对判断题答案响应的各种可能的情况 它的作用是将获取到的答案answer与可能的选项列对比并返回对应的布尔值 """ if self.DISABLE: return False - true_list = self._conf['true_list'].split(',') - false_list = self._conf['false_list'].split(',') + true_list = self._conf["true_list"].split(",") + false_list = self._conf["false_list"].split(",") # 对响应的答案作处理 answer = answer.strip() if answer in true_list: @@ -166,9 +170,11 @@ def jugement_select(self,answer:str) -> bool: return False else: # 无法判断, 随机选择 - logger.error(f'无法判断答案 -> {answer} 对应的是正确还是错误, 请自行判断并加入配置文件重启脚本, 本次将会随机选择选项') - return random.choice([True,False]) - + logger.error( + f"无法判断答案 -> {answer} 对应的是正确还是错误, 请自行判断并加入配置文件重启脚本, 本次将会随机选择选项" + ) + return random.choice([True, False]) + def get_submit_params(self): """ 这是一个专用方法, 用于根据当前设置的提交模式, 响应对应的答题提交API中的pyFlag值 @@ -179,97 +185,99 @@ def get_submit_params(self): else: return "1" + # 按照以下模板实现更多题库 + class TikuYanxi(Tiku): # 言溪题库实现 def __init__(self) -> None: super().__init__() - self.name = '言溪题库' - self.api = 'https://tk.enncy.cn/query' + self.name = "言溪题库" + self.api = "https://tk.enncy.cn/query" self._token = None - self._token_index = 0 # token队列计数器 - self._times = 100 # 查询次数剩余, 初始化为100, 查询后校对修正 + self._token_index = 0 # token队列计数器 + self._times = 100 # 查询次数剩余, 初始化为100, 查询后校对修正 - def _query(self,q_info:dict): + def _query(self, q_info: dict): res = requests.get( self.api, - params={ - 'question':q_info['title'], - 'token':self._token - }, - verify=False + params={"question": q_info["title"], "token": self._token}, + verify=False, ) if res.status_code == 200: res_json = res.json() - if not res_json['code']: + if not res_json["code"]: # 如果是因为TOKEN次数到期, 则更换token - if self._times == 0 or '次数不足' in res_json['data']['answer']: - logger.info(f'TOKEN查询次数不足, 将会更换并重新搜题') + if self._times == 0 or "次数不足" in res_json["data"]["answer"]: + logger.info("TOKEN查询次数不足, 将会更换并重新搜题") self._token_index += 1 self.load_token() # 重新查询 return self._query(q_info) - logger.error(f'{self.name}查询失败:\n\t剩余查询数{res_json["data"].get("times",f"{self._times}(仅参考)")}:\n\t消息:{res_json["message"]}') + logger.error( + f'{self.name}查询失败:\n\t剩余查询数{res_json["data"].get("times",f"{self._times}(仅参考)")}:\n\t消息:{res_json["message"]}' + ) return None - self._times = res_json["data"].get("times",self._times) - return res_json['data']['answer'].strip() + self._times = res_json["data"].get("times", self._times) + return res_json["data"]["answer"].strip() else: - logger.error(f'{self.name}查询失败:\n{res.text}') + logger.error(f"{self.name}查询失败:\n{res.text}") return None - - def load_token(self): - token_list = self._conf['tokens'].split(',') + + def load_token(self): + token_list = self._conf["tokens"].split(",") if self._token_index == len(token_list): # TOKEN 用完 - logger.error('TOKEN用完, 请自行更换再重启脚本') - raise Exception(f'{self.name} TOKEN 已用完, 请更换') + logger.error("TOKEN用完, 请自行更换再重启脚本") + raise Exception(f"{self.name} TOKEN 已用完, 请更换") self._token = token_list[self._token_index] def _init_tiku(self): self.load_token() + class TikuAdapter(Tiku): # TikuAdapter题库实现 https://github.com/DokiDoki1103/tikuAdapter def __init__(self) -> None: super().__init__() - self.name = 'TikuAdapter题库' - self.api = '' + self.name = "TikuAdapter题库" + self.api = "" def _query(self, q_info: dict): # 判断题目类型 - if q_info['type'] == "single": + if q_info["type"] == "single": type = 0 - elif q_info['type'] == 'multiple': + elif q_info["type"] == "multiple": type = 1 - elif q_info['type'] == 'completion': + elif q_info["type"] == "completion": type = 2 - elif q_info['type'] == 'judgement': + elif q_info["type"] == "judgement": type = 3 else: type = 4 - options = q_info['options'] + options = q_info["options"] res = requests.post( self.api, json={ - 'question': q_info['title'], - 'options': options.split('\n'), - 'type': type + "question": q_info["title"], + "options": options.split("\n"), + "type": type, }, - verify=False + verify=False, ) if res.status_code == 200: res_json = res.json() - if bool(res_json['plat']): - logger.error("查询失败, 返回:" + res.text) + if bool(res_json["plat"]): + logger.error("查询失败, 返回: " + res.text) return None sep = "\n" - return sep.join(res_json['answer']['allAnswer'][0]).strip() - # else: - logger.error(f'{self.name}查询失败:\n{res.text}') + return sep.join(res_json["answer"]["allAnswer"][0]).strip() + # else: # https://github.com/Samueli924/chaoxing/blame/3369cae6e55a44d6d284e17bccefb56d1606f5bb/api/answer.py#L269 + # logger.error(f"{self.name}查询失败:\n{res.text}") # Unreachable code return None def _init_tiku(self): # self.load_token() - self.api = self._conf['url'] + self.api = self._conf["url"] diff --git a/api/base.py b/api/base.py index dbb1586c..1d366b0f 100644 --- a/api/base.py +++ b/api/base.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +from calendar import LocaleHTMLCalendar import re import time import random @@ -297,42 +298,55 @@ def study_document(self, _course, _job): def study_work(self, _course, _job, _job_info) -> None: if self.tiku.DISABLE or not self.tiku: return None - _ORIGIN_HTML_CONTENT = "" # 用于配合输出网页源码, 帮助修复#391错误 - def random_answer(options: str) -> str: - answer = "" + def match_answer_with_options(answer: str, options_list: list[str], q_type: str) -> str: + # pretreatment + # del ';' ';' ',' ',' ' ' '、' + # Prevents incorrect answers due to poor matches with the question bank. + # It would be better to maintain the return in terms of correctness, starting with this. + def delete_option_separator(options: str) -> str: + return re.sub(r"[;,,、\s]", "", options) + """统一的答案匹配处理函数""" + matched_answer = "" + if q_type == "multiple": + for ans in multi_cut(answer): + ans = delete_option_separator(ans) + ans = ans.strip().upper() + for opt in options_list: + opt = delete_option_separator(opt) + if ans == opt[:1].upper() or ans in opt.upper(): + matched_answer += opt[:1] + matched_answer = "".join(sorted(set(matched_answer))) # 去重并排序 + elif q_type == "judgement": + matched_answer = "true" if self.tiku.jugement_select(answer) else "false" + else: # single + for opt in options_list: + if answer.upper() in opt.upper(): + matched_answer = opt[:1] + break + return matched_answer + + def random_answer(options: str, q_type: str) -> str: + """改进的随机答案生成""" if not options: - return answer - - if q["type"] == "multiple": - logger.debug(f"当前选项列表[cut前] -> {options}") - _op_list = multi_cut(options) - logger.debug(f"当前选项列表[cut后] -> {_op_list}") - - if not _op_list: - logger.error( - "选项为空, 未能正确提取题目选项信息! 请反馈并提供以上信息" - ) - return answer - - for i in range( - random.choices([2, 3, 4], weights=[0.1, 0.5, 0.4], k=1)[0] - ): # 此处表示随机多选答案几率:2个 10%, 3个 50%, 4个 40% - _choice = random.choice(_op_list) - _op_list.remove(_choice) - answer += _choice[:1] # 取首字为答案, 例如A或B - # 对答案进行排序, 否则会提交失败 - answer = "".join(sorted(answer)) - elif q["type"] == "single": - answer = random.choice(options.split("\n"))[ - :1 - ] # 取首字为答案, 例如A或B - # 判断题处理 - elif q["type"] == "judgement": - # answer = self.tiku.jugement_select(_answer) - answer = "true" if random.choice([True, False]) else "false" - logger.info(f"随机选择 -> {answer}") - return answer + return "" + + options_list = multi_cut(options) + if not options_list: + logger.error("选项为空,未能正确提取题目选项信息!") + return "" + + if q_type == "multiple": + # 权重调整:2选项(10%), 3选项(50%), 4选项(40%) + num_choices = random.choices([2, 3, 4], weights=[0.1, 0.5, 0.4], k=1)[0] + num_choices = min(num_choices, len(options_list)) # 确保不超过可用选项数 + selected = random.sample(options_list, num_choices) + return "".join(sorted(opt[:1] for opt in selected)) + elif q_type == "single": + return random.choice(options_list)[:1] + elif q_type == "judgement": + return "true" if random.choice([True, False]) else "false" + return "" def multi_cut(answer: str) -> list[str]: """ @@ -353,6 +367,7 @@ def multi_cut(answer: str) -> list[str]: # 同时为了避免没有考虑到的 case, 应该先按照 '\n' 匹配, 匹配不到再按照其他字符匹配 cut_char = [ "\n", + ";", ",", ",", "|", @@ -374,10 +389,13 @@ def multi_cut(answer: str) -> list[str]: ] # 多选答案切割符 res = [] for char in cut_char: + # logger.debug(f"尝试使用字符 '{char}' 进行切割") # 输出的时候不要渲染 res = [ opt for opt in answer.split(char) if opt.strip() ] # Filter empty strings if len(res) > 1: + logger.debug(f"使用字符 '{char}' 切割成功") + logger.debug(f"切割后的选项列表为: {res}") return res logger.warning( f"未能从网页中提取题目信息, 以下为相关信息:\n\t{answer}\n\n{_ORIGIN_HTML_CONTENT}\n" @@ -385,7 +403,7 @@ def multi_cut(answer: str) -> list[str]: logger.warning("未能正确提取题目选项信息! 请反馈并提供以上信息") return ["A", "B", "C", "D"] # 默认多选题为4个选项 - # 学习通这里根据参数差异能重定向至两个不同接口, 需要定向至https://mooc1.chaoxing.com/mooc-ans/workHandle/handle + # 获取题目信息 _session = init_session() headers = { "Host": "mooc1.chaoxing.com", @@ -429,38 +447,23 @@ def multi_cut(answer: str) -> list[str]: _ORIGIN_HTML_CONTENT = _resp.text # 用于配合输出网页源码, 帮助修复#391错误 questions = decode_questions_info(_resp.text) # 加载题目信息 - # 搜题 + # 处理每个题目 for q in questions["questions"]: logger.debug(f"当前题目信息 -> {q}") + options_list = multi_cut(q["options"]) res = self.tiku.query(q) - answer = "" + if not res: - # 随机答题 - answer = random_answer(q["options"]) + answer = random_answer(q["options"], q["type"]) + logger.info(f"未找到答案,随机选择 -> {answer}") else: - # 根据响应结果选择答案 - options_list = multi_cut(q["options"]) - if q["type"] == "multiple": - # 多选处理 - for _a in multi_cut(res): - for o in options_list: - if ( - _a.upper() in o - ): # 题库返回的答案可能包含选项, 如A, B, C, 全部转成大写与学习通一致 - answer += o[:1] - # 对答案进行排序, 否则会提交失败 - answer = "".join(sorted(answer)) - elif q["type"] == "judgement": - answer = "true" if self.tiku.jugement_select(res) else "false" + answer = match_answer_with_options(res, options_list, q["type"]) + if not answer: + answer = random_answer(q["options"], q["type"]) + logger.info(f"答案匹配失败,随机选择 -> {answer}") else: - for o in options_list: - if res in o: - answer = o[:1] - break - # 如果未能匹配, 依然随机答题 - logger.info(f"找到答案但答案未能匹配 -> {res}\t随机选择答案") - answer = answer if answer else random_answer(q["options"]) - # 填充答案 + logger.info(f"找到匹配答案 -> {answer}") + q["answerField"][f'answer{q["id"]}'] = answer logger.info(f'{q["title"]} 填写答案为 {answer}') @@ -498,6 +501,7 @@ def multi_cut(answer: str) -> list[str]: "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,ja;q=0.5", }, ) + # TODO 提交答案后查询分数和正确率 if res.status_code == 200: res_json = res.json() if res_json["status"]: diff --git a/api/cxsecret_font.py b/api/cxsecret_font.py index d66fb856..64a5210c 100644 --- a/api/cxsecret_font.py +++ b/api/cxsecret_font.py @@ -74,12 +74,12 @@ def decrypt(dststr_fontmap: Dict[str, str], dst_str: str) -> str: ori_str = "" for char in dst_str: if dstchar_hash := dststr_fontmap.get(f"uni{ord(char):X}"): - # 存在于“密钥”字体,解密 + # 存在于 "密钥" 字体, 解密 orichar_hash = fonthash_dao.find_char(dstchar_hash) if orichar_hash is not None: ori_str += chr(int(orichar_hash[3:], 16)) else: - # 不存在于“密钥”字体,直接复制 + # 不存在于 "密钥" 字体, 直接复制 ori_str += char # 替换解密后的康熙部首 ori_str = ori_str.translate(KX_RADICALS_TAB) diff --git a/api/decode.py b/api/decode.py index 06993d3c..2fbb0047 100644 --- a/api/decode.py +++ b/api/decode.py @@ -127,9 +127,9 @@ def decode_course_card(_text: str): # 不属于任务点的任务 if "job" not in _card or _card["job"] is False: if _card.get("type") and _card["type"] == "read": - # 发现有在视频任务下掺杂阅读任务,不完成可能会导致无法开启下一章节 + # 发现有在视频任务下掺杂阅读任务, 不完成可能会导致无法开启下一章节 if _card["property"].get("read", False): - # 已阅读,跳过 + # 已阅读, 跳过 continue _job = {} _job["title"] = _card["property"]["title"] @@ -153,7 +153,7 @@ def decode_course_card(_text: str): try: _job["mid"] = _card["mid"] except KeyError: - logger.warning("出现转码失败视频,已跳过...") + logger.warning("出现转码失败视频, 已跳过...") continue _job["objectid"] = _card["objectId"] _job["aid"] = _card["aid"] diff --git a/config_template.ini b/config_template.ini index b5758e6d..91ccc4d7 100644 --- a/config_template.ini +++ b/config_template.ini @@ -10,6 +10,15 @@ course_list = xxx,xxx,xxx ; 视频播放倍速(默认1,最大2) speed = 1 + +; TODO +; index_point = 0 + +; 章节测验 +; 0: 不做 +; 1: 做 +chapter_test = 1 + [tiku] ; 可选项 : ; 1. TikuYanxi(言溪题库 https://tk.enncy.cn/) diff --git a/main.py b/main.py index baa69898..55c5a6e3 100644 --- a/main.py +++ b/main.py @@ -2,6 +2,7 @@ import argparse import configparser import random +import traceback from api.logger import logger from api.base import Chaoxing, Account @@ -11,32 +12,31 @@ import time import sys import os +from dataclasses import dataclass +from typing import Optional, List -# # 定义全局变量, 用于存储配置文件路径 -# textPath = './resource/BookID.txt' +# 关闭警告 +disable_warnings(exceptions.InsecureRequestWarning) -# # 获取文本 -> 用于查看学习过的课程ID -# def getText(): -# try: -# if not os.path.exists(textPath): -# with open(textPath, 'x') as file: pass -# return [] -# with open(textPath, 'r', encoding='utf-8') as file: content = file.read().split(',') -# content = {int(item.strip()) for item in content if item.strip()} -# return list(content) -# except Exception as e: logger.error(f"获取文本失败: {e}"); return [] -# # 追加文本 -> 用于记录学习过的课程ID -# def appendText(text): -# if not os.path.exists(textPath): return -# with open(textPath, 'a', encoding='utf-8') as file: file.write(f'{text}, ') +@dataclass +class Config: + username: str + password: str + course_list: Optional[List[str]] + speed: float + chapter_test: Optional[int] + tiku_config: Optional[dict] -# 关闭警告 -disable_warnings(exceptions.InsecureRequestWarning) +def parse_course_list(course_str: Optional[str]) -> Optional[List[str]]: + """解析课程列表字符串为列表""" + if not course_str: + return None + return [item.strip() for item in course_str.split(",") if item.strip()] -def init_config(): +def init_config() -> Config: parser = argparse.ArgumentParser( description="Samueli924/chaoxing", formatter_class=argparse.ArgumentDefaultsHelpFormatter, @@ -48,7 +48,7 @@ def init_config(): parser.add_argument("-u", "--username", type=str, default=None, help="手机号账号") parser.add_argument("-p", "--password", type=str, default=None, help="登录密码") parser.add_argument( - "-l", "--list", type=str, default=None, help="要学习的课程ID列表, 以 , 分隔" + "-l", "--list", type=str, default=None, help="要学习的课程ID列表, 以 , 分隔 (Optional)" ) parser.add_argument( "-s", "--speed", type=float, default=1.0, help="视频播放倍速 (默认1, 最大2)" @@ -73,24 +73,22 @@ def init_config(): if args.config: config = configparser.ConfigParser() config.read(args.config, encoding="utf8") - return ( - config.get("common", "username"), - config.get("common", "password"), - ( - str(config.get("common", "course_list")).split(",") - if config.get("common", "course_list") - else None - ), - int(config.get("common", "speed")), - config["tiku"], + return Config( + username=config.get("common", "username"), + password=config.get("common", "password"), + course_list=parse_course_list(config.get("common", "course_list")), + speed=float(config.get("common", "speed")), + chapter_test=config.getint("common", "chapter_test", fallback=None), + tiku_config=config["tiku"], ) else: - return ( - args.username, - args.password, - args.list.split(",") if args.list else None, - int(args.speed) if args.speed else 1, - None, + return Config( + username=args.username, + password=args.password, + course_list=parse_course_list(args.list), + speed=float(args.speed) if args.speed else 1.0, + chapter_test=None, + tiku_config=None, ) @@ -115,16 +113,21 @@ def add_times(self, id: str) -> None: # 避免异常的无限回滚 RB = RollBackManager() # 初始化登录信息 - username, password, course_list, speed, tiku_config = init_config() + config = init_config() # 规范化播放速度的输入值 - speed = min(2.0, max(1.0, speed)) - if (not username) or (not password): - username = input("请输入你的手机号, 按回车确认\n手机号:") - password = input("请输入你的密码, 按回车确认\n密码:") - account = Account(username, password) + config.speed = min(2.0, max(1.0, config.speed)) + if (not config.username) or (not config.password): + config.username = input("请输入你的手机号, 按回车确认\n手机号:") + config.password = input("请输入你的密码, 按回车确认\n密码:") + logger.debug( + f"username: {config.username}, password: {config.password}, course_list: {config.course_list}, speed: {config.speed}" + ) + account = Account(config.username, config.password) # 设置题库 tiku = Tiku() - tiku.config_set(tiku_config) # 载入配置 + logger.debug(f"tiku_config: {config.tiku_config}") + # if config.tiku_config: + tiku.config_set(config.tiku_config) # 载入配置 tiku = tiku.get_tiku_from_config() # 载入题库 tiku.init_tiku() # 初始化题库 @@ -138,24 +141,26 @@ def add_times(self, id: str) -> None: all_course = chaoxing.get_course_list() course_task = [] # 手动输入要学习的课程ID列表 - if not course_list: + if not config.course_list: print("*" * 10 + "课程列表" + "*" * 10) for course in all_course: print(f"ID: {course['courseId']} 课程名: {course['title']}") print("*" * 28) try: - course_list = input( + config.course_list = input( "请输入想要学习的课程列表,以逗号分隔,例: 2151141,189191,198198\n" ).split(",") except Exception as e: raise FormatError("输入格式错误") from e # 筛选需要学习的课程 for course in all_course: - if course["courseId"] in course_list: + if course["courseId"] in config.course_list: course_task.append(course) if not course_task: course_task = all_course + # 开始遍历要学习的课程列表 + logger.debug(f"要学习的课程列表: {course_task}") logger.info(f"课程列表过滤完毕, 当前课程任务数量: {len(course_task)}") for course in course_task: logger.info(f"开始学习课程: {course['title']}") @@ -165,14 +170,17 @@ def add_times(self, id: str) -> None: ) # 为了支持课程任务回滚, 采用下标方式遍历任务点 - __point_index = 0 + logger.debug(f"当前课程子任务点共计: {len(point_list['points'])}") + + # __point_index = 0 + __point_index = 26 while __point_index < len(point_list["points"]): point = point_list["points"][__point_index] logger.info(f'当前章节: {point["title"]}') - logger.debug(f"当前章节 __point_index: {__point_index}") # 触发参数: -v + logger.debug(f"当前 __point_index: {__point_index}") sleep_duration = random.uniform(1, 3) - logger.debug(f"本次随机等待时间: {sleep_duration}") - time.sleep(sleep_duration) # 避免请求过快导致异常, 所以引入随机sleep + logger.debug(f"本次随机等待时间: {sleep_duration:.1f}s") + time.sleep(sleep_duration) # 避免请求过快导致异常, 所以引入随机 sleep # 获取当前章节的所有任务点 jobs = [] job_info = None @@ -222,7 +230,11 @@ def add_times(self, id: str) -> None: isAudio = False try: chaoxing.study_video( - course, job, job_info, _speed=speed, _type="Video" + course, + job, + job_info, + _speed=config.speed, + _type="Video", ) except JSONDecodeError as e: logger.warning("当前任务非视频任务, 正在尝试音频任务解码") @@ -230,7 +242,11 @@ def add_times(self, id: str) -> None: if isAudio: try: chaoxing.study_video( - course, job, job_info, _speed=speed, _type="Audio" + course, + job, + job_info, + _speed=config.speed, + _type="Audio", ) except JSONDecodeError as e: logger.warning( @@ -244,6 +260,12 @@ def add_times(self, id: str) -> None: chaoxing.study_document(course, job) # 测验任务 elif job["type"] == "workid": + # 检测配置文件是否跳过测验任务 + if config.chapter_test == 0: + logger.info( + f"跳过章节测验任务, 任务章节: {course['title']}" + ) + continue logger.trace(f"识别到章节检测任务, 任务章节: {course['title']}") chaoxing.study_work(course, job, job_info) # 阅读任务 @@ -259,8 +281,6 @@ def add_times(self, id: str) -> None: else: raise except BaseException as e: - import traceback - logger.error(f"错误: {type(e).__name__}: {e}") logger.error(traceback.format_exc()) raise e