From a5110542b9245944ff0b2edb4720208d13104408 Mon Sep 17 00:00:00 2001 From: n-WN <30841158+n-WN@users.noreply.github.com> Date: Sun, 22 Dec 2024 22:27:20 +0800 Subject: [PATCH 1/9] feat: Add help, verbose options, and random sleep; fix multi_cut function (#405) - Added help and verbose options for better user control and logging. - Introduced random sleep between chapters for more realistic simulation. - Fixed multi_cut function to handle multiple-choice questions correctly. Fixes: https://github.com/Samueli924/chaoxing/issues/405 --- .gitignore | 4 +- api/answer.py | 2 + api/base.py | 346 ++++++++++++++++++++++++++++++------------------- api/config.py | 8 +- api/cookies.py | 6 +- api/process.py | 15 ++- main.py | 64 +++++++-- 7 files changed, 287 insertions(+), 158 deletions(-) diff --git a/.gitignore b/.gitignore index fea5fc26..9a6cee37 100644 --- a/.gitignore +++ b/.gitignore @@ -139,9 +139,11 @@ cookies.txt .config.ini config.ini chaoxing.log +config*.ini .chaoxing.log ./config.ini ./chaoxing.log ./cookies.txt .idea/ -cache.json +.vscode/ +cache.json \ No newline at end of file diff --git a/api/answer.py b/api/answer.py index 330f6439..15814ef3 100644 --- a/api/answer.py +++ b/api/answer.py @@ -104,7 +104,9 @@ def query(self,q_info:dict): # 预处理,去除【单选题】这样与标题无关的字段 # 此处需要改进!!! + logger.debug(f"原始标题:{q_info['title']}") q_info['title'] = q_info['title'][6:] # 暂时直接用裁切解决 + logger.debug(f"处理后标题:{q_info['title']}") # 先过缓存 cache_dao = CacheDAO() diff --git a/api/base.py b/api/base.py index b942bbc9..af38e4d7 100644 --- a/api/base.py +++ b/api/base.py @@ -11,14 +11,16 @@ from api.cookies import save_cookies, use_cookies from api.process import show_progress from api.config import GlobalConst as gc -from api.decode import (decode_course_list, - decode_course_point, - decode_course_card, - decode_course_folder, - decode_questions_info - ) +from api.decode import ( + decode_course_list, + decode_course_point, + decode_course_card, + decode_course_folder, + decode_questions_info, +) from api.answer import * + def get_timestamp(): return str(int(time.time() * 1000)) @@ -30,8 +32,8 @@ def get_random_seconds(): def init_session(isVideo: bool = False, isAudio: bool = False): _session = requests.session() _session.verify = False - _session.mount('http://', HTTPAdapter(max_retries=3)) - _session.mount('https://', HTTPAdapter(max_retries=3)) + _session.mount("http://", HTTPAdapter(max_retries=3)) + _session.mount("https://", HTTPAdapter(max_retries=3)) if isVideo: _session.headers = gc.VIDEO_HEADERS elif isAudio: @@ -47,13 +49,14 @@ class Account: password = None last_login = None isSuccess = None + def __init__(self, _username, _password): self.username = _username self.password = _password class Chaoxing: - def __init__(self, account: Account = None,tiku:Tiku=None): + def __init__(self, account: Account = None, tiku: Tiku = None): self.account = account self.cipher = AESCipher() self.tiku = tiku @@ -62,16 +65,17 @@ def login(self): _session = requests.session() _session.verify = False _url = "https://passport2.chaoxing.com/fanyalogin" - _data = {"fid": "-1", - "uname": self.cipher.encrypt(self.account.username), - "password": self.cipher.encrypt(self.account.password), - "refer": "https%3A%2F%2Fi.chaoxing.com", - "t": True, - "forbidotherlogin": 0, - "validate": "", - "doubleFactorLogin": 0, - "independentId": 0 - } + _data = { + "fid": "-1", + "uname": self.cipher.encrypt(self.account.username), + "password": self.cipher.encrypt(self.account.password), + "refer": "https%3A%2F%2Fi.chaoxing.com", + "t": True, + "forbidotherlogin": 0, + "validate": "", + "doubleFactorLogin": 0, + "independentId": 0, + } logger.trace("正在尝试登录...") resp = _session.post(_url, headers=gc.HEADERS, data=_data) if resp and resp.json()["status"] == True: @@ -92,21 +96,16 @@ def get_uid(self): def get_course_list(self): _session = init_session() _url = "https://mooc2-ans.chaoxing.com/mooc2-ans/visit/courselistdata" - _data = { - "courseType": 1, - "courseFolderId": 0, - "query": "", - "superstarClass": 0 - } + _data = {"courseType": 1, "courseFolderId": 0, "query": "", "superstarClass": 0} logger.trace("正在读取所有的课程列表...") # 接口突然抽风,增加headers _headers = { "Host": "mooc2-ans.chaoxing.com", - "sec-ch-ua-platform": "\"Windows\"", + "sec-ch-ua-platform": '"Windows"', "X-Requested-With": "XMLHttpRequest", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36 Edg/129.0.0.0", "Accept": "text/html, */*; q=0.01", - "sec-ch-ua": "\"Microsoft Edge\";v=\"129\", \"Not=A?Brand\";v=\"8\", \"Chromium\";v=\"129\"", + "sec-ch-ua": '"Microsoft Edge";v="129", "Not=A?Brand";v="8", "Chromium";v="129"', "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", "sec-ch-ua-mobile": "?0", "Origin": "https://mooc2-ans.chaoxing.com", @@ -114,9 +113,9 @@ def get_course_list(self): "Sec-Fetch-Mode": "cors", "Sec-Fetch-Dest": "empty", "Referer": "https://mooc2-ans.chaoxing.com/mooc2-ans/visit/interaction?moocDomain=https://mooc1-1.chaoxing.com/mooc-ans", - "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,ja;q=0.5" + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,ja;q=0.5", } - _resp = _session.post(_url,headers=_headers,data=_data) + _resp = _session.post(_url, headers=_headers, data=_data) # logger.trace(f"原始课程列表内容:\n{_resp.text}") logger.info("课程列表读取完毕...") course_list = decode_course_list(_resp.text) @@ -129,7 +128,7 @@ def get_course_list(self): "courseType": 1, "courseFolderId": folder["id"], "query": "", - "superstarClass": 0 + "superstarClass": 0, } _resp = _session.post(_url, data=_data) course_list += decode_course_list(_resp.text) @@ -148,12 +147,16 @@ def get_job_list(self, _clazzid, _courseid, _cpi, _knowledgeid): _session = init_session() job_list = [] job_info = {} - for _possible_num in ["0", "1","2"]: # 学习界面任务卡片数,很少有3个的,但是对于章节解锁任务点少一个都不行,可以从API /mooc-ans/mycourse/studentstudyAjax获取值,或者干脆直接加,但二者都会造成额外的请求 + for _possible_num in [ + "0", + "1", + "2", + ]: # 学习界面任务卡片数,很少有3个的,但是对于章节解锁任务点少一个都不行,可以从API /mooc-ans/mycourse/studentstudyAjax获取值,或者干脆直接加,但二者都会造成额外的请求 _url = f"https://mooc1.chaoxing.com/mooc-ans/knowledge/cards?clazzid={_clazzid}&courseid={_courseid}&knowledgeid={_knowledgeid}&num={_possible_num}&ut=s&cpi={_cpi}&v=20160407-3&mooc2=1" logger.trace("开始读取章节所有任务点...") _resp = _session.get(_url) _job_list, _job_info = decode_course_card(_resp.text) - if _job_info.get('notOpen',False): + if _job_info.get("notOpen", False): # 直接返回,节省一次请求 logger.info("该章节未开放") return [], _job_info @@ -167,39 +170,51 @@ def get_job_list(self, _clazzid, _courseid, _cpi, _knowledgeid): def get_enc(self, clazzId, jobid, objectId, playingTime, duration, userid): return md5( - f"[{clazzId}][{userid}][{jobid}][{objectId}][{playingTime * 1000}][d_yHJ!$pdA~5][{duration * 1000}][0_{duration}]" - .encode()).hexdigest() - - def video_progress_log(self, _session, _course, _job, _job_info, _dtoken, _duration, _playingTime, _type: str = "Video"): - if "courseId" in _job['otherinfo']: + f"[{clazzId}][{userid}][{jobid}][{objectId}][{playingTime * 1000}][d_yHJ!$pdA~5][{duration * 1000}][0_{duration}]".encode() + ).hexdigest() + + def video_progress_log( + self, + _session, + _course, + _job, + _job_info, + _dtoken, + _duration, + _playingTime, + _type: str = "Video", + ): + if "courseId" in _job["otherinfo"]: _mid_text = f"otherInfo={_job['otherinfo']}&" else: _mid_text = f"otherInfo={_job['otherinfo']}&courseId={_course['courseId']}&" _success = False for _possible_rt in ["0.9", "1"]: - _url = (f"https://mooc1.chaoxing.com/mooc-ans/multimedia/log/a/" - f"{_course['cpi']}/" - f"{_dtoken}?" - f"clazzId={_course['clazzId']}&" - f"playingTime={_playingTime}&" - f"duration={_duration}&" - f"clipTime=0_{_duration}&" - f"objectId={_job['objectid']}&" - f"{_mid_text}" - f"jobid={_job['jobid']}&" - f"userid={self.get_uid()}&" - f"isdrag=3&" - f"view=pc&" - f"enc={self.get_enc(_course['clazzId'], _job['jobid'], _job['objectid'], _playingTime, _duration, self.get_uid())}&" - f"rt={_possible_rt}&" - f"dtype={_type}&" - f"_t={get_timestamp()}") + _url = ( + f"https://mooc1.chaoxing.com/mooc-ans/multimedia/log/a/" + f"{_course['cpi']}/" + f"{_dtoken}?" + f"clazzId={_course['clazzId']}&" + f"playingTime={_playingTime}&" + f"duration={_duration}&" + f"clipTime=0_{_duration}&" + f"objectId={_job['objectid']}&" + f"{_mid_text}" + f"jobid={_job['jobid']}&" + f"userid={self.get_uid()}&" + f"isdrag=3&" + f"view=pc&" + f"enc={self.get_enc(_course['clazzId'], _job['jobid'], _job['objectid'], _playingTime, _duration, self.get_uid())}&" + f"rt={_possible_rt}&" + f"dtype={_type}&" + f"_t={get_timestamp()}" + ) resp = _session.get(_url) if resp.status_code == 200: _success = True - break # 如果返回为200正常,则跳出循环 + break # 如果返回为200正常,则跳出循环 elif resp.status_code == 403: - continue # 如果出现403无权限报错,则继续尝试不同的rt参数 + continue # 如果出现403无权限报错,则继续尝试不同的rt参数 if _success: return resp.json() else: @@ -207,7 +222,9 @@ def video_progress_log(self, _session, _course, _job, _job_info, _dtoken, _durat logger.warning("出现403报错,尝试修复无效,正在跳过当前任务点...") return False - def study_video(self, _course, _job, _job_info, _speed: float = 1.0, _type: str = "Video"): + def study_video( + self, _course, _job, _job_info, _speed: float = 1.0, _type: str = "Video" + ): if _type == "Video": _session = init_session(isVideo=True) else: @@ -227,7 +244,16 @@ def study_video(self, _course, _job, _job_info, _speed: float = 1.0, _type: str while not _isFinished: if _isFinished: _playingTime = _duration - _isPassed = self.video_progress_log(_session, _course, _job, _job_info, _dtoken, _duration, _playingTime, _type) + _isPassed = self.video_progress_log( + _session, + _course, + _job, + _job_info, + _dtoken, + _duration, + _playingTime, + _type, + ) if not _isPassed or (_isPassed and _isPassed["isPassed"]): break _wait_time = get_random_seconds() @@ -235,7 +261,7 @@ def study_video(self, _course, _job, _job_info, _speed: float = 1.0, _type: str _wait_time = int(_duration) - _playingTime _isFinished = True # 播放进度条 - show_progress(_job['name'], _playingTime, _wait_time, _duration, _speed) + show_progress(_job["name"], _playingTime, _wait_time, _duration, _speed) _playingTime += _wait_time print("\r", end="", flush=True) logger.info(f"任务完成: {_job['name']}") @@ -245,164 +271,220 @@ def study_document(self, _course, _job): _url = f"https://mooc1.chaoxing.com/ananas/job/document?jobid={_job['jobid']}&knowledgeid={re.findall(r'nodeId_(.*?)-', _job['otherinfo'])[0]}&courseid={_course['courseId']}&clazzid={_course['clazzId']}&jtoken={_job['jtoken']}&_dc={get_timestamp()}" _resp = _session.get(_url) - def study_work(self, _course, _job,_job_info) -> None: + def study_work(self, _course, _job, _job_info) -> None: if self.tiku.DISABLE or not self.tiku: return None - _ORIGIN_HTML_CONTENT = "" # 用于配合输出网页源码,帮助修复#391错误 + _ORIGIN_HTML_CONTENT = "" # 用于配合输出网页源码,帮助修复#391错误 - def random_answer(options:str) -> str: - answer = '' + def random_answer(options: str) -> str: + answer = "" if not options: return answer - - if q['type'] == "multiple": + + if q["type"] == "multiple": + logger.debug(f"当前选项列表[cut前] -> {options}") _op_list = multi_cut(options) - for i in range(random.choices([2,3,4],weights=[0.1,0.5,0.4],k=1)[0]): # 此处表示随机多选答案几率:2个 10%,3个 50% ,4个 40% + logger.debug(f"当前选项列表[cut后] -> {_op_list}") + + if not _op_list: + logger.error( + "选项为空, 未能正确提取题目选项信息! 请反馈并提供以上信息" + ) + return answer + + for i in range( + random.choices([2, 3, 4], weights=[0.1, 0.5, 0.4], k=1)[0] + ): # 此处表示随机多选答案几率:2个 10%,3个 50% ,4个 40% _choice = random.choice(_op_list) _op_list.remove(_choice) - answer+=_choice[:1] # 取首字为答案,例如A或B + answer += _choice[:1] # 取首字为答案,例如A或B # 对答案进行排序,否则会提交失败 answer = "".join(sorted(answer)) - elif q['type'] == "single": - answer = random.choice(options.split('\n'))[:1] # 取首字为答案,例如A或B + elif q["type"] == "single": + answer = random.choice(options.split("\n"))[ + :1 + ] # 取首字为答案,例如A或B # 判断题处理 - elif q['type'] == "judgement": + elif q["type"] == "judgement": # answer = self.tiku.jugement_select(_answer) - answer = "true" if random.choice([True,False]) else "false" - logger.info(f'随机选择 -> {answer}') + answer = "true" if random.choice([True, False]) else "false" + logger.info(f"随机选择 -> {answer}") return answer - - def multi_cut(answer:str) -> list[str]: - cut_char = [',',',','|','\n','\r','\t','#','*','-','_','+','@','~','/','\\','.','&',' '] # 多选答案切割符 + + def multi_cut(answer: str) -> list[str]: + """ + 将多选题答案字符串按特定字符进行切割,并返回切割后的答案列表。 + + 参数: + answer (str): 多选题答案字符串。 + + 返回: + list[str]: 切割后的答案列表。如果无法切割,则返回默认的选项列表 ['A', 'B', 'C', 'D']。 + + 注意: + 如果无法从网页中提取题目信息,将记录警告日志并返回默认选项列表。 + """ + # cut_char = [',',',','|','\n','\r','\t','#','*','-','_','+','@','~','/','\\','.','&',' '] # 多选答案切割符 + # ',' 在常规被正确划分的, 选项中出现, 导致 multi_cut 无法正确划分选项 #391 + # IndexError: Cannot choose from an empty sequence #391 + # 同时为了避免没有考虑到的 case, 应该先按照 '\n' 匹配, 匹配不到再按照其他字符匹配 + cut_char = [ + "\n", + ",", + ",", + "|", + "\r", + "\t", + "#", + "*", + "-", + "_", + "+", + "@", + "~", + "/", + "\\", + ".", + "&", + " ", + "、", + ] # 多选答案切割符 res = [] for char in cut_char: - res = answer.split(char) - if len(res)>1: + res = [ + opt for opt in answer.split(char) if opt.strip() + ] # Filter empty strings + if len(res) > 1: return res - logger.warning(f"未能从网页中提取题目信息,以下为相关信息:\n{answer}\n\n{_ORIGIN_HTML_CONTENT}\n") # 尝试输出网页内容和选项信息 + logger.warning( + f"未能从网页中提取题目信息,以下为相关信息:\n{answer}\n\n{_ORIGIN_HTML_CONTENT}\n" + ) # 尝试输出网页内容和选项信息 logger.warning("未能正确提取题目选项信息!请反馈并提供以上信息。") - return ['A','B','C','D'] # 默认多选题为4个选项 - + return ["A", "B", "C", "D"] # 默认多选题为4个选项 # 学习通这里根据参数差异能重定向至两个不同接口,需要定向至https://mooc1.chaoxing.com/mooc-ans/workHandle/handle _session = init_session() - headers={ + headers = { "Host": "mooc1.chaoxing.com", - "sec-ch-ua": "\"Microsoft Edge\";v=\"129\", \"Not=A?Brand\";v=\"8\", \"Chromium\";v=\"129\"", + "sec-ch-ua": '"Microsoft Edge";v="129", "Not=A?Brand";v="8", "Chromium";v="129"', "sec-ch-ua-mobile": "?0", - "sec-ch-ua-platform": "\"Windows\"", + "sec-ch-ua-platform": '"Windows"', "Upgrade-Insecure-Requests": "1", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36 Edg/129.0.0.0", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", "Sec-Fetch-Site": "same-origin", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Dest": "iframe", - "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,ja;q=0.5" + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,ja;q=0.5", } cookies = _session.cookies.get_dict() - - _url = "https://mooc1.chaoxing.com/mooc-ans/api/work" + _url = "https://mooc1.chaoxing.com/mooc-ans/api/work" _resp = requests.get( _url, headers=headers, cookies=cookies, verify=False, - params = { + params={ "api": "1", - "workId": _job['jobid'].replace("work-",""), - "jobid": _job['jobid'], - "originJobId": _job['jobid'], + "workId": _job["jobid"].replace("work-", ""), + "jobid": _job["jobid"], + "originJobId": _job["jobid"], "needRedirect": "true", "skipHeader": "true", - "knowledgeid": str(_job_info['knowledgeid']), - 'ktoken': _job_info['ktoken'], - "cpi": _job_info['cpi'], + "knowledgeid": str(_job_info["knowledgeid"]), + "ktoken": _job_info["ktoken"], + "cpi": _job_info["cpi"], "ut": "s", - "clazzId": _course['clazzId'], + "clazzId": _course["clazzId"], "type": "", - "enc": _job['enc'], + "enc": _job["enc"], "mooc2": "1", - "courseid": _course['courseId'] - } + "courseid": _course["courseId"], + }, ) - _ORIGIN_HTML_CONTENT = _resp.text # 用于配合输出网页源码,帮助修复#391错误 - questions = decode_questions_info(_resp.text) # 加载题目信息 + _ORIGIN_HTML_CONTENT = _resp.text # 用于配合输出网页源码,帮助修复#391错误 + questions = decode_questions_info(_resp.text) # 加载题目信息 # 搜题 - for q in questions['questions']: + for q in questions["questions"]: + logger.debug(f"当前题目信息 -> {q}") res = self.tiku.query(q) - answer = '' + answer = "" if not res: # 随机答题 - answer = random_answer(q['options']) + answer = random_answer(q["options"]) else: # 根据响应结果选择答案 - options_list = multi_cut(q['options']) - if q['type'] == "multiple": + options_list = multi_cut(q["options"]) + if q["type"] == "multiple": # 多选处理 for _a in multi_cut(res): for o in options_list: - if _a.upper() in o: # 题库返回的答案可能包含选项,如A,B,C,全部转成大写与学习通一致 + if ( + _a.upper() in o + ): # 题库返回的答案可能包含选项,如A,B,C,全部转成大写与学习通一致 answer += o[:1] # 对答案进行排序,否则会提交失败 answer = "".join(sorted(answer)) - elif q['type'] == 'judgement': - answer = 'true' if self.tiku.jugement_select(res) else 'false' + elif q["type"] == "judgement": + answer = "true" if self.tiku.jugement_select(res) else "false" else: for o in options_list: if res in o: answer = o[:1] break # 如果未能匹配,依然随机答题 - answer = answer if answer else random_answer(q['options']) + logger.info(f"找到答案但答案未能匹配 -> {res}\t随机选择答案") + answer = answer if answer else random_answer(q["options"]) # 填充答案 - q['answerField'][f'answer{q["id"]}'] = answer + q["answerField"][f'answer{q["id"]}'] = answer logger.info(f'{q["title"]} 填写答案为 {answer}') - + # 提交模式 现在与题库绑定 - questions['pyFlag'] = self.tiku.get_submit_params() + questions["pyFlag"] = self.tiku.get_submit_params() # 组建提交表单 for q in questions["questions"]: - questions.update({ - f'answer{q["id"]}':q['answerField'][f'answer{q["id"]}'], - f'answertype{q["id"]}':q['answerField'][f'answertype{q["id"]}'] - }) - + questions.update( + { + f'answer{q["id"]}': q["answerField"][f'answer{q["id"]}'], + f'answertype{q["id"]}': q["answerField"][f'answertype{q["id"]}'], + } + ) del questions["questions"] res = _session.post( - 'https://mooc1.chaoxing.com/mooc-ans/work/addStudentWorkNew', + "https://mooc1.chaoxing.com/mooc-ans/work/addStudentWorkNew", data=questions, - headers= { + headers={ "Host": "mooc1.chaoxing.com", - "sec-ch-ua-platform": "\"Windows\"", + "sec-ch-ua-platform": '"Windows"', "X-Requested-With": "XMLHttpRequest", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36 Edg/129.0.0.0", "Accept": "application/json, text/javascript, */*; q=0.01", - "sec-ch-ua": "\"Microsoft Edge\";v=\"129\", \"Not=A?Brand\";v=\"8\", \"Chromium\";v=\"129\"", + "sec-ch-ua": '"Microsoft Edge";v="129", "Not=A?Brand";v="8", "Chromium";v="129"', "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", "sec-ch-ua-mobile": "?0", "Origin": "https://mooc1.chaoxing.com", "Sec-Fetch-Site": "same-origin", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Dest": "empty", - #"Referer": "https://mooc1.chaoxing.com/mooc-ans/work/doHomeWorkNew?courseId=246831735&workAnswerId=52680423&workId=37778125&api=1&knowledgeid=913820156&classId=107515845&oldWorkId=07647c38d8de4c648a9277c5bed7075a&jobid=work-07647c38d8de4c648a9277c5bed7075a&type=&isphone=false&submit=false&enc=1d826aab06d44a1198fc983ed3d243b1&cpi=338350298&mooc2=1&skipHeader=true&originJobId=work-07647c38d8de4c648a9277c5bed7075a", - "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,ja;q=0.5" - } + # "Referer": "https://mooc1.chaoxing.com/mooc-ans/work/doHomeWorkNew?courseId=246831735&workAnswerId=52680423&workId=37778125&api=1&knowledgeid=913820156&classId=107515845&oldWorkId=07647c38d8de4c648a9277c5bed7075a&jobid=work-07647c38d8de4c648a9277c5bed7075a&type=&isphone=false&submit=false&enc=1d826aab06d44a1198fc983ed3d243b1&cpi=338350298&mooc2=1&skipHeader=true&originJobId=work-07647c38d8de4c648a9277c5bed7075a", + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,ja;q=0.5", + }, ) if res.status_code == 200: res_json = res.json() - if res_json['status']: + if res_json["status"]: logger.info(f'提交答题成功 -> {res_json["msg"]}') else: logger.error(f'提交答题失败 -> {res_json["msg"]}') else: logger.error(f"提交答题失败 -> {res.text}") - def strdy_read(self, _course, _job,_job_info) -> None: + def strdy_read(self, _course, _job, _job_info) -> None: """ 阅读任务学习,仅完成任务点,并不增长时长 """ @@ -410,17 +492,15 @@ def strdy_read(self, _course, _job,_job_info) -> None: _resp = _session.get( url="https://mooc1.chaoxing.com/ananas/job/readv2", params={ - 'jobid': _job['jobid'], - 'knowledgeid':_job_info['knowledgeid'], - 'jtoken': _job['jtoken'], - 'courseid': _course['courseId'], - 'clazzid': _course['clazzId'] - } + "jobid": _job["jobid"], + "knowledgeid": _job_info["knowledgeid"], + "jtoken": _job["jtoken"], + "courseid": _course["courseId"], + "clazzid": _course["clazzId"], + }, ) if _resp.status_code != 200: logger.error(f"阅读任务学习失败 -> [{_resp.status_code}]{_resp.text}") else: _resp_json = _resp.json() logger.info(f"阅读任务学习 -> {_resp_json['msg']}") - - diff --git a/api/config.py b/api/config.py index bee24b85..bc0a9569 100644 --- a/api/config.py +++ b/api/config.py @@ -3,17 +3,17 @@ class GlobalConst: AESKey = "u2oh6Vu^HWe4_AES" HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36", - "Sec-Ch-Ua": '"Chromium";v="118", "Google Chrome";v="118", "Not=A?Brand";v="99"' + "Sec-Ch-Ua": '"Chromium";v="118", "Google Chrome";v="118", "Not=A?Brand";v="99"', } COOKIES_PATH = "cookies.txt" VIDEO_HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36", "Referer": "https://mooc1.chaoxing.com/ananas/modules/video/index.html?v=2023-1110-1610", - "Host": "mooc1.chaoxing.com" + "Host": "mooc1.chaoxing.com", } AUDIO_HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36", "Referer": "https://mooc1.chaoxing.com/ananas/modules/audio/index_new.html?v=2023-0428-1705", - "Host": "mooc1.chaoxing.com" + "Host": "mooc1.chaoxing.com", } - THRESHOLD = 3 \ No newline at end of file + THRESHOLD = 3 diff --git a/api/cookies.py b/api/cookies.py index 1158d8ec..ce99d990 100644 --- a/api/cookies.py +++ b/api/cookies.py @@ -5,12 +5,12 @@ def save_cookies(_session): - with open(gc.COOKIES_PATH, 'wb') as f: + with open(gc.COOKIES_PATH, "wb") as f: pickle.dump(_session.cookies, f) def use_cookies(): if os.path.exists(gc.COOKIES_PATH): - with open(gc.COOKIES_PATH, 'rb') as f: + with open(gc.COOKIES_PATH, "rb") as f: _cookies = pickle.load(f) - return _cookies \ No newline at end of file + return _cookies diff --git a/api/process.py b/api/process.py index 7e5d9cba..309e5745 100644 --- a/api/process.py +++ b/api/process.py @@ -1,15 +1,16 @@ import time from api.config import GlobalConst as gc + def sec2time(sec: int): h = int(sec / 3600) m = int(sec % 3600 / 60) s = int(sec % 60) if h != 0: - return f'{h}:{m:02}:{s:02}' + return f"{h}:{m:02}:{s:02}" if sec != 0: - return f'{m:02}:{s:02}' - return '--:--' + return f"{m:02}:{s:02}" + return "--:--" def show_progress(name: str, start: int, span: int, total: int, _speed: float): @@ -20,5 +21,9 @@ def show_progress(name: str, start: int, span: int, total: int, _speed: float): length = int(percent * 40 // 100) progress = ("#" * length).ljust(40, " ") # remain = (total - current) - print(f"\r当前任务: {name} |{progress}| {percent}% {sec2time(current)}/{sec2time(total)}", end="", flush=True) - time.sleep(gc.THRESHOLD) \ No newline at end of file + print( + f"\r当前任务: {name} |{progress}| {percent}% {sec2time(current)}/{sec2time(total)}", + end="", + flush=True, + ) + time.sleep(gc.THRESHOLD) diff --git a/main.py b/main.py index d97cbf17..0b5936b6 100644 --- a/main.py +++ b/main.py @@ -1,11 +1,15 @@ # -*- coding: utf-8 -*- import argparse import configparser +import random + from api.logger import logger from api.base import Chaoxing, Account from api.exceptions import LoginError, FormatError, JSONDecodeError,MaxRollBackError from api.answer import Tiku from urllib3 import disable_warnings,exceptions +import time +import sys import os # # 定义全局变量,用于存储配置文件路径 @@ -32,25 +36,48 @@ disable_warnings(exceptions.InsecureRequestWarning) def init_config(): - parser = argparse.ArgumentParser(description='Samueli924/chaoxing') # 命令行传参 + parser = argparse.ArgumentParser( + description='Samueli924/chaoxing', + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument("-c", "--config", type=str, default=None, help="使用配置文件运行程序") parser.add_argument("-u", "--username", type=str, default=None, help="手机号账号") parser.add_argument("-p", "--password", type=str, default=None, help="登录密码") - parser.add_argument("-l", "--list", type=str, default=None, help="要学习的课程ID列表") - parser.add_argument("-s", "--speed", type=float, default=1.0, help="视频播放倍速(默认1,最大2)") + parser.add_argument("-l", "--list", type=str, default=None, help="要学习的课程ID列表, 以 , 分隔") + parser.add_argument("-s", "--speed", type=float, default=1.0, help="视频播放倍速 (默认1, 最大2)") + parser.add_argument("-v", "--verbose", "--debug", action="store_true", help="启用调试模式, 输出DEBUG级别日志") + + + # 在解析之前捕获 -h 的行为 + if len(sys.argv) == 2 and sys.argv[1] in {"-h", "--help"}: + parser.print_help() + # 返回一个 SystemExit 异常, 用于退出程序 + raise SystemExit + + # 提前检查 -h 和 --help 并退出 args = parser.parse_args() + if args.config: config = configparser.ConfigParser() config.read(args.config, encoding="utf8") - return (config.get("common", "username"), - config.get("common", "password"), - str(config.get("common", "course_list")).split(",") if config.get("common", "course_list") else None, - int(config.get("common", "speed")), - config['tiku'] - ) + return ( + config.get("common", "username"), + config.get("common", "password"), + str(config.get("common", "course_list")).split(",") if config.get("common", "course_list") else None, + int(config.get("common", "speed")), + config['tiku'] + ) else: - return (args.username, args.password, args.list.split(",") if args.list else None, int(args.speed) if args.speed else 1,None) + return ( + args.username, + args.password, + args.list.split(",") if args.list else None, + int(args.speed) if args.speed else 1, + None + ) + class RollBackManager: def __init__(self) -> None: self.rollback_times = 0 @@ -84,6 +111,7 @@ def add_times(self,id:str) -> None: tiku.config_set(tiku_config) # 载入配置 tiku = tiku.get_tiku_from_config() # 载入题库 tiku.init_tiku() # 初始化题库 + # 实例化超星API chaoxing = Chaoxing(account=account,tiku=tiku) # 检查当前登录状态,并检查账号密码 @@ -117,10 +145,16 @@ def add_times(self,id:str) -> None: point_list = chaoxing.get_course_point(course["courseId"], course["clazzId"], course["cpi"]) # 为了支持课程任务回滚,采用下标方式遍历任务点 - __point_index = 0 + # __point_index = 0 + # __point_index = 26 # 当前章节: 13.3 《水浒传》是一部... + __point_index = 26 while __point_index < len(point_list["points"]): point = point_list["points"][__point_index] logger.info(f'当前章节: {point["title"]}') + logger.debug(f'当前章节 __point_index: {__point_index}') # 触发参数: -v + sleep_duration = random.uniform(1, 3) + logger.debug(f"本次随机等待时间: {sleep_duration}") + time.sleep(sleep_duration) # 避免请求过快导致异常, 所以引入随机sleep # 获取当前章节的所有任务点 jobs = [] job_info = None @@ -160,7 +194,7 @@ def add_times(self,id:str) -> None: # break # 如果已经学习过该课程,则跳过 # appendText(bookID) # 记录正在学习的课程ID - logger.trace(f"识别到视频任务, 任务章节: {course['title']} 任务ID: {job['jobid']}") + logger.trace(f"识别到视频任务, 任务章节: {course['title']} 任务ID: {job['jobid']}") # 超星的接口没有返回当前任务是否为Audio音频任务 isAudio = False try: @@ -187,6 +221,12 @@ def add_times(self,id:str) -> None: chaoxing.strdy_read(course, job,job_info) __point_index += 1 logger.info("所有课程学习任务已完成") + + except SystemExit as e: + if e.code == 0: # 正常退出 + sys.exit(0) + else: + raise except BaseException as e: import traceback logger.error(f"错误: {type(e).__name__}: {e}") From fbafb6e03a16a5424a7e174750e74e49df9966c9 Mon Sep 17 00:00:00 2001 From: n-WN <30841158+n-WN@users.noreply.github.com> Date: Mon, 23 Dec 2024 23:37:15 +0800 Subject: [PATCH 2/9] Reset task point index --- main.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/main.py b/main.py index 0b5936b6..f74b5a15 100644 --- a/main.py +++ b/main.py @@ -145,9 +145,7 @@ def add_times(self,id:str) -> None: point_list = chaoxing.get_course_point(course["courseId"], course["clazzId"], course["cpi"]) # 为了支持课程任务回滚,采用下标方式遍历任务点 - # __point_index = 0 - # __point_index = 26 # 当前章节: 13.3 《水浒传》是一部... - __point_index = 26 + __point_index = 0 while __point_index < len(point_list["points"]): point = point_list["points"][__point_index] logger.info(f'当前章节: {point["title"]}') From 0d11aab0865e07177c116b9a22ee940612534cc0 Mon Sep 17 00:00:00 2001 From: n-WN <30841158+n-WN@users.noreply.github.com> Date: Tue, 24 Dec 2024 01:27:10 +0800 Subject: [PATCH 3/9] chore: Format code, improve log readability, and handle non-ASCII characters --- api/answer.py | 2 +- api/base.py | 27 ++++++- api/cipher.py | 10 +-- api/cxsecret_font.py | 3 +- api/decode.py | 164 ++++++++++++++++++++++++------------------- api/exceptions.py | 2 +- api/font_decoder.py | 15 ++-- api/logger.py | 2 +- app.py | 4 +- main.py | 124 ++++++++++++++++++++------------ 10 files changed, 215 insertions(+), 138 deletions(-) diff --git a/api/answer.py b/api/answer.py index 15814ef3..04e93192 100644 --- a/api/answer.py +++ b/api/answer.py @@ -210,7 +210,7 @@ def _query(self,q_info:dict): self.load_token() # 重新查询 return self._query(q_info) - logger.error(f'{self.name}查询失败:\n剩余查询数{res_json["data"].get("times",f"{self._times}(仅参考)")}:\n消息:{res_json["message"]}') + logger.error(f'{self.name}查询失败:\n\t剩余查询数{res_json["data"].get("times",f"{self._times}(仅参考)")}:\n\t消息:{res_json["message"]}') return None self._times = res_json["data"].get("times",self._times) return res_json['data']['answer'].strip() diff --git a/api/base.py b/api/base.py index af38e4d7..924550b4 100644 --- a/api/base.py +++ b/api/base.py @@ -267,6 +267,29 @@ def study_video( logger.info(f"任务完成: {_job['name']}") def study_document(self, _course, _job): + """ + Study a document in Chaoxing platform. + + This method makes a GET request to fetch document information for a given course and job. + + Args: + _course (dict): Dictionary containing course information with keys: + - courseId: ID of the course + - clazzId: ID of the class + _job (dict): Dictionary containing job information with keys: + - jobid: ID of the job + - otherinfo: String containing node information + - jtoken: Authentication token for the job + + Returns: + requests.Response: Response object from the GET request + + Note: + This method requires the following helper functions: + - init_session(): To initialize a new session + - get_timestamp(): To get current timestamp + - re module for regular expression matching + """ _session = init_session() _url = f"https://mooc1.chaoxing.com/ananas/job/document?jobid={_job['jobid']}&knowledgeid={re.findall(r'nodeId_(.*?)-', _job['otherinfo'])[0]}&courseid={_course['courseId']}&clazzid={_course['clazzId']}&jtoken={_job['jtoken']}&_dc={get_timestamp()}" _resp = _session.get(_url) @@ -357,9 +380,9 @@ def multi_cut(answer: str) -> list[str]: if len(res) > 1: return res logger.warning( - f"未能从网页中提取题目信息,以下为相关信息:\n{answer}\n\n{_ORIGIN_HTML_CONTENT}\n" + f"未能从网页中提取题目信息, 以下为相关信息:\n\t{answer}\n\n{_ORIGIN_HTML_CONTENT}\n" ) # 尝试输出网页内容和选项信息 - logger.warning("未能正确提取题目选项信息!请反馈并提供以上信息。") + logger.warning("未能正确提取题目选项信息! 请反馈并提供以上信息") return ["A", "B", "C", "D"] # 默认多选题为4个选项 # 学习通这里根据参数差异能重定向至两个不同接口,需要定向至https://mooc1.chaoxing.com/mooc-ans/workHandle/handle diff --git a/api/cipher.py b/api/cipher.py index e06e7cdd..efe31acc 100644 --- a/api/cipher.py +++ b/api/cipher.py @@ -6,7 +6,7 @@ def pkcs7_unpadding(string): - return string[0:-ord(string[-1])] + return string[0 : -ord(string[-1])] def pkcs7_padding(s, block_size=16): @@ -29,15 +29,15 @@ def split_to_data_blocks(byte_str, block_size=16): return blocks -class AESCipher(): +class AESCipher: def __init__(self): self.key = str(gc.AESKey).encode("utf8") self.iv = str(gc.AESKey).encode("utf8") def encrypt(self, plaintext: str): - ciphertext = b'' + ciphertext = b"" cbc = pyaes.AESModeOfOperationCBC(self.key, self.iv) - plaintext = plaintext.encode('utf-8') + plaintext = plaintext.encode("utf-8") blocks = split_to_data_blocks(pkcs7_padding(plaintext)) for b in blocks: ciphertext = ciphertext + cbc.encrypt(b) @@ -51,4 +51,4 @@ def encrypt(self, plaintext: str): # ptext = b"" # for b in split_to_data_blocks(ciphertext): # ptext = ptext + cbc.decrypt(b) - # return pkcs7_unpadding(ptext.decode()) \ No newline at end of file + # return pkcs7_unpadding(ptext.decode()) diff --git a/api/cxsecret_font.py b/api/cxsecret_font.py index f44fb5b7..d66fb856 100644 --- a/api/cxsecret_font.py +++ b/api/cxsecret_font.py @@ -1,7 +1,7 @@ ## # @Author: SocialSisterYi # @Reference: https://github.com/SocialSisterYi/xuexiaoyi-to-xuexitong-tampermonkey-proxy -# +# import base64 import hashlib @@ -24,6 +24,7 @@ class FontHashDAO: """原始字体hashmap DAO""" + char_map: Dict[str, str] # unicode -> hsah hash_map: Dict[str, str] # hash -> unicode diff --git a/api/decode.py b/api/decode.py index 7fe74639..06993d3c 100644 --- a/api/decode.py +++ b/api/decode.py @@ -5,30 +5,42 @@ from api.logger import logger from api.font_decoder import FontDecoder + def decode_course_list(_text): logger.trace("开始解码课程列表...") _soup = BeautifulSoup(_text, "lxml") _raw_courses = _soup.select("div.course") _course_list = list() for course in _raw_courses: - if not course.select_one("a.not-open-tip") and not course.select_one("div.not-open-tip"): + if not course.select_one("a.not-open-tip") and not course.select_one( + "div.not-open-tip" + ): _course_detail = {} _course_detail["id"] = course.attrs["id"] _course_detail["info"] = course.attrs["info"] _course_detail["roleid"] = course.attrs["roleid"] - _course_detail["clazzId"] = course.select_one("input.clazzId").attrs["value"] - _course_detail["courseId"] = course.select_one("input.courseId").attrs["value"] - _course_detail["cpi"] = re.findall(r"cpi=(.*?)&", course.select_one("a").attrs["href"])[0] - _course_detail["title"] = course.select_one("span.course-name").attrs["title"] + _course_detail["clazzId"] = course.select_one("input.clazzId").attrs[ + "value" + ] + _course_detail["courseId"] = course.select_one("input.courseId").attrs[ + "value" + ] + _course_detail["cpi"] = re.findall( + r"cpi=(.*?)&", course.select_one("a").attrs["href"] + )[0] + _course_detail["title"] = course.select_one("span.course-name").attrs[ + "title" + ] if course.select_one("p.margint10") is None: - _course_detail["desc"] = '' + _course_detail["desc"] = "" else: _course_detail["desc"] = course.select_one("p.margint10").attrs["title"] _course_detail["teacher"] = course.select_one("p.color3").attrs["title"] _course_list.append(_course_detail) return _course_list + def decode_course_folder(_text): logger.trace("开始解码二级课程列表...") _soup = BeautifulSoup(_text, "lxml") @@ -38,39 +50,45 @@ def decode_course_folder(_text): if course.attrs["fileid"]: _course_folder_detail = {} _course_folder_detail["id"] = course.attrs["fileid"] - _course_folder_detail["rename"] = course.select_one("input.rename-input").attrs["value"] + _course_folder_detail["rename"] = course.select_one( + "input.rename-input" + ).attrs["value"] _course_folder_list.append(_course_folder_detail) return _course_folder_list + def decode_course_point(_text): logger.trace("开始解码章节列表...") _soup = BeautifulSoup(_text, "lxml") _course_point = { - "hasLocked": False, # 用于判断该课程任务是否是需要解锁 - "points": [] + "hasLocked": False, # 用于判断该课程任务是否是需要解锁 + "points": [], } - - - for _chapter_unit in _soup.find_all("div",class_="chapter_unit") : + + for _chapter_unit in _soup.find_all("div", class_="chapter_unit"): _point_list = [] _raw_points = _chapter_unit.find_all("li") for _point in _raw_points: _point = _point.div - if (not "id" in _point.attrs): + if not "id" in _point.attrs: continue _point_detail = {} _point_detail["id"] = re.findall(r"^cur(\d{1,20})$", _point.attrs["id"])[0] - _point_detail["title"] = _point.select_one("a.clicktitle").text.replace("\n",'').strip(' ') - _point_detail["jobCount"] = 1 # 默认为1 + _point_detail["title"] = ( + _point.select_one("a.clicktitle").text.replace("\n", "").strip(" ") + ) + _point_detail["jobCount"] = 1 # 默认为1 if _point.select_one("input.knowledgeJobCount"): - _point_detail["jobCount"] = _point.select_one("input.knowledgeJobCount").attrs["value"] + _point_detail["jobCount"] = _point.select_one( + "input.knowledgeJobCount" + ).attrs["value"] else: # 判断是不是因为需要解锁 - if '解锁' in _point.select_one("span.bntHoverTips").text: + if "解锁" in _point.select_one("span.bntHoverTips").text: _course_point["hasLocked"] = True - + _point_list.append(_point_detail) - _course_point["points"]+=_point_list + _course_point["points"] += _point_list return _course_point @@ -79,27 +97,27 @@ def decode_course_card(_text: str): _job_info = {} _job_list = [] # 对于未开放章节检测 - if '章节未开放' in _text: - _job_info['notOpen'] = True - return [],_job_info - + if "章节未开放" in _text: + _job_info["notOpen"] = True + return [], _job_info + _temp = re.findall(r"mArg=\{(.*?)\};", _text.replace(" ", "")) if _temp: _temp = _temp[0] else: - return [],{} + return [], {} _cards = json.loads("{" + _temp + "}") - + if _cards: _job_info = {} _job_info["ktoken"] = _cards["defaults"]["ktoken"] _job_info["mtEnc"] = _cards["defaults"]["mtEnc"] - _job_info["reportTimeInterval"] = _cards["defaults"]["reportTimeInterval"] # 60 + _job_info["reportTimeInterval"] = _cards["defaults"]["reportTimeInterval"] # 60 _job_info["defenc"] = _cards["defaults"]["defenc"] _job_info["cardid"] = _cards["defaults"]["cardid"] _job_info["cpi"] = _cards["defaults"]["cpi"] _job_info["qnenc"] = _cards["defaults"]["qnenc"] - _job_info['knowledgeid'] = _cards["defaults"]["knowledgeid"] + _job_info["knowledgeid"] = _cards["defaults"]["knowledgeid"] _cards = _cards["attachments"] _job_list = [] for _card in _cards: @@ -108,21 +126,21 @@ def decode_course_card(_text: str): continue # 不属于任务点的任务 if "job" not in _card or _card["job"] is False: - if _card.get('type') and _card['type'] == "read": + if _card.get("type") and _card["type"] == "read": # 发现有在视频任务下掺杂阅读任务,不完成可能会导致无法开启下一章节 - if _card['property'].get('read',False): + if _card["property"].get("read", False): # 已阅读,跳过 continue _job = {} - _job['title'] = _card['property']['title'] + _job["title"] = _card["property"]["title"] _job["type"] = "read" - _job['id'] = _card['property']['id'] + _job["id"] = _card["property"]["id"] _job["jobid"] = _card["jobid"] _job["jtoken"] = _card["jtoken"] - _job['mid'] = _card['mid'] - _job['otherinfo'] = _card["otherInfo"] - _job['enc'] = _card["enc"] - _job['aid'] = _card["aid"] + _job["mid"] = _card["mid"] + _job["otherinfo"] = _card["otherInfo"] + _job["enc"] = _card["enc"] + _job["aid"] = _card["aid"] _job_list.append(_job) continue # 视频任务 @@ -165,67 +183,67 @@ def decode_course_card(_text: str): _job["aid"] = _card["aid"] _job_list.append(_job) continue - + if _card["type"] == "vote": # 调查问卷 同上 continue return _job_list, _job_info - + def decode_questions_info(html_content) -> dict: def replace_rtn(text): - return text.replace('\r', '').replace('\t', '').replace('\n', '') + return text.replace("\r", "").replace("\t", "").replace("\n", "") soup = BeautifulSoup(html_content, "lxml") form_data = {} form_tag = soup.find("form") fd = FontDecoder(html_content) # 加载字体 - + # 抽取表单信息 for input_tag in form_tag.find_all("input"): - if 'name' not in input_tag.attrs or 'answer' in input_tag.attrs["name"]: + if "name" not in input_tag.attrs or "answer" in input_tag.attrs["name"]: continue - form_data.update({ - input_tag.attrs["name"]: input_tag.attrs.get("value",'') - }) + form_data.update({input_tag.attrs["name"]: input_tag.attrs.get("value", "")}) - form_data['questions'] = [] - for div_tag in form_tag.find_all("div",class_="singleQuesId"): # 目前来说无论是单选还是多选的题class都是这个 + form_data["questions"] = [] + for div_tag in form_tag.find_all( + "div", class_="singleQuesId" + ): # 目前来说无论是单选还是多选的题class都是这个 q_title = replace_rtn(fd.decode(div_tag.find("div", class_="Zy_TItle").text)) - q_options = '' + q_options = "" for li_tag in div_tag.find("ul").find_all("li"): - q_options += replace_rtn(fd.decode(li_tag.text))+'\n' - q_options=q_options[:-1] # 去除尾部'\n' + q_options += replace_rtn(fd.decode(li_tag.text)) + "\n" + q_options = q_options[:-1] # 去除尾部'\n' # 尝试使用 data 属性来判断题型 - q_type_code = div_tag.find('div',class_='TiMu').attrs['data'] - q_type = '' + q_type_code = div_tag.find("div", class_="TiMu").attrs["data"] + q_type = "" # 此处可能需要完善更多题型的判断 - if q_type_code == '0': - q_type = 'single' - elif q_type_code == '1': - q_type = 'multiple' - elif q_type_code == '2': - q_type = 'completion' - elif q_type_code == '3': - q_type = 'judgement' + if q_type_code == "0": + q_type = "single" + elif q_type_code == "1": + q_type = "multiple" + elif q_type_code == "2": + q_type = "completion" + elif q_type_code == "3": + q_type = "judgement" else: - logger.info("未知题型代码 -> "+q_type_code) - q_type = 'unknown' # 避免出现未定义取值错误 - - form_data["questions"].append({ - 'id': div_tag.attrs["data"], - 'title':q_title, # 题目 - 'options':q_options, # 选项 可提供给题库作为辅助 - 'type': q_type, # 题型 可提供给题库作为辅助 - 'answerField':{ - 'answer'+div_tag.attrs["data"]:'', # 答案填入处 - 'answertype'+div_tag.attrs["data"]:q_type_code + logger.info("未知题型代码 -> " + q_type_code) + q_type = "unknown" # 避免出现未定义取值错误 + + form_data["questions"].append( + { + "id": div_tag.attrs["data"], + "title": q_title, # 题目 + "options": q_options, # 选项 可提供给题库作为辅助 + "type": q_type, # 题型 可提供给题库作为辅助 + "answerField": { + "answer" + div_tag.attrs["data"]: "", # 答案填入处 + "answertype" + div_tag.attrs["data"]: q_type_code, + }, } - }) + ) # 处理答题信息 - form_data['answerwqbid'] = ",".join([q['id'] for q in form_data['questions']])+"," + form_data["answerwqbid"] = ",".join([q["id"] for q in form_data["questions"]]) + "," return form_data - - diff --git a/api/exceptions.py b/api/exceptions.py index 913e71ee..57e00b5c 100644 --- a/api/exceptions.py +++ b/api/exceptions.py @@ -13,7 +13,7 @@ class FormatError(Exception): def __init__(self, *args: object): super().__init__(*args) + class MaxRollBackError(Exception): def __init__(self, *args: object): super().__init__(*args) - \ No newline at end of file diff --git a/api/font_decoder.py b/api/font_decoder.py index 1742c04d..5d0ca4ab 100644 --- a/api/font_decoder.py +++ b/api/font_decoder.py @@ -4,19 +4,20 @@ class FontDecoder: - def __init__(self,html_content:str=None): + def __init__(self, html_content: str = None): self.html_content = html_content # self.__isNeedDecode = True self.__font_hash_map = None self.__decode_init(html_content) - + def __decode_init(self, html_content): if html_content: soup = BeautifulSoup(html_content, "lxml") - style_tag = soup.find("style",id="cxSecretStyle") - match = re.search(r'base64,([\w\W]+?)\'', style_tag.text) - self.__font_hash_map = cxfont.font2map('data:application/font-ttf;charset=utf-8;base64,'+match.group(1)) + style_tag = soup.find("style", id="cxSecretStyle") + match = re.search(r"base64,([\w\W]+?)\'", style_tag.text) + self.__font_hash_map = cxfont.font2map( + "data:application/font-ttf;charset=utf-8;base64," + match.group(1) + ) - def decode(self,target_str:str) -> str: + def decode(self, target_str: str) -> str: return cxfont.decrypt(self.__font_hash_map, target_str) - diff --git a/api/logger.py b/api/logger.py index 2caa5689..ba1e61df 100644 --- a/api/logger.py +++ b/api/logger.py @@ -1,3 +1,3 @@ from loguru import logger -logger.add("chaoxing.log", rotation="10 MB", level="TRACE") \ No newline at end of file +logger.add("chaoxing.log", rotation="10 MB", level="TRACE") diff --git a/app.py b/app.py index 486af05a..ba72eb4c 100644 --- a/app.py +++ b/app.py @@ -15,7 +15,7 @@ def __call__(self, *args: object, **kwargs: object) -> object: return celery_app -if __name__ == '__main__': +if __name__ == "__main__": app = Flask(__name__) app.config.from_mapping( CELERY=dict( @@ -24,4 +24,4 @@ def __call__(self, *args: object, **kwargs: object) -> object: task_ignore_result=True, ), ) - celery_app = celery_init_app(app) \ No newline at end of file + celery_app = celery_init_app(app) diff --git a/main.py b/main.py index f74b5a15..f5163ec5 100644 --- a/main.py +++ b/main.py @@ -5,9 +5,9 @@ from api.logger import logger from api.base import Chaoxing, Account -from api.exceptions import LoginError, FormatError, JSONDecodeError,MaxRollBackError +from api.exceptions import LoginError, FormatError, JSONDecodeError, MaxRollBackError from api.answer import Tiku -from urllib3 import disable_warnings,exceptions +from urllib3 import disable_warnings, exceptions import time import sys import os @@ -17,9 +17,9 @@ # # 获取文本 -> 用于查看学习过的课程ID # def getText(): -# try: +# try: # if not os.path.exists(textPath): -# with open(textPath, 'x') as file: pass +# with open(textPath, 'x') as file: pass # return [] # with open(textPath, 'r', encoding='utf-8') as file: content = file.read().split(',') # content = {int(item.strip()) for item in content if item.strip()} @@ -29,25 +29,37 @@ # # 追加文本 -> 用于记录学习过的课程ID # def appendText(text): # if not os.path.exists(textPath): return -# with open(textPath, 'a', encoding='utf-8') as file: file.write(f'{text}, ') - +# with open(textPath, 'a', encoding='utf-8') as file: file.write(f'{text}, ') + # 关闭警告 disable_warnings(exceptions.InsecureRequestWarning) + def init_config(): parser = argparse.ArgumentParser( - description='Samueli924/chaoxing', - formatter_class=argparse.ArgumentDefaultsHelpFormatter + description="Samueli924/chaoxing", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) - parser.add_argument("-c", "--config", type=str, default=None, help="使用配置文件运行程序") + parser.add_argument( + "-c", "--config", type=str, default=None, help="使用配置文件运行程序" + ) parser.add_argument("-u", "--username", type=str, default=None, help="手机号账号") parser.add_argument("-p", "--password", type=str, default=None, help="登录密码") - parser.add_argument("-l", "--list", type=str, default=None, help="要学习的课程ID列表, 以 , 分隔") - parser.add_argument("-s", "--speed", type=float, default=1.0, help="视频播放倍速 (默认1, 最大2)") - parser.add_argument("-v", "--verbose", "--debug", action="store_true", help="启用调试模式, 输出DEBUG级别日志") - + parser.add_argument( + "-l", "--list", type=str, default=None, help="要学习的课程ID列表, 以 , 分隔" + ) + parser.add_argument( + "-s", "--speed", type=float, default=1.0, help="视频播放倍速 (默认1, 最大2)" + ) + parser.add_argument( + "-v", + "--verbose", + "--debug", + action="store_true", + help="启用调试模式, 输出DEBUG级别日志", + ) # 在解析之前捕获 -h 的行为 if len(sys.argv) == 2 and sys.argv[1] in {"-h", "--help"}: @@ -64,9 +76,13 @@ def init_config(): return ( config.get("common", "username"), config.get("common", "password"), - str(config.get("common", "course_list")).split(",") if config.get("common", "course_list") else None, + ( + str(config.get("common", "course_list")).split(",") + if config.get("common", "course_list") + else None + ), int(config.get("common", "speed")), - config['tiku'] + config["tiku"], ) else: return ( @@ -74,32 +90,32 @@ def init_config(): args.password, args.list.split(",") if args.list else None, int(args.speed) if args.speed else 1, - None + None, ) - + class RollBackManager: def __init__(self) -> None: self.rollback_times = 0 self.rollback_id = "" - def add_times(self,id:str) -> None: + def add_times(self, id: str) -> None: if id == self.rollback_id and self.rollback_times == 3: raise MaxRollBackError("回滚次数已达3次,请手动检查学习通任务点完成情况") elif id != self.rollback_id: # 新job self.rollback_id = id self.rollback_times = 1 - else: + else: self.rollback_times += 1 -if __name__ == '__main__': +if __name__ == "__main__": try: # 避免异常的无限回滚 RB = RollBackManager() # 初始化登录信息 - username, password, course_list, speed,tiku_config= init_config() + username, password, course_list, speed, tiku_config = init_config() # 规范化播放速度的输入值 speed = min(2.0, max(1.0, speed)) if (not username) or (not password): @@ -108,12 +124,12 @@ def add_times(self,id:str) -> None: account = Account(username, password) # 设置题库 tiku = Tiku() - tiku.config_set(tiku_config) # 载入配置 + tiku.config_set(tiku_config) # 载入配置 tiku = tiku.get_tiku_from_config() # 载入题库 - tiku.init_tiku() # 初始化题库 - + tiku.init_tiku() # 初始化题库 + # 实例化超星API - chaoxing = Chaoxing(account=account,tiku=tiku) + chaoxing = Chaoxing(account=account, tiku=tiku) # 检查当前登录状态,并检查账号密码 _login_state = chaoxing.login() if not _login_state["status"]: @@ -128,7 +144,9 @@ def add_times(self,id:str) -> None: print(f"ID: {course['courseId']} 课程名: {course['title']}") print("*" * 28) try: - course_list = input("请输入想要学习的课程列表,以逗号分隔,例: 2151141,189191,198198\n").split(",") + course_list = input( + "请输入想要学习的课程列表,以逗号分隔,例: 2151141,189191,198198\n" + ).split(",") except Exception as e: raise FormatError("输入格式错误") from e # 筛选需要学习的课程 @@ -142,32 +160,38 @@ def add_times(self,id:str) -> None: for course in course_task: logger.info(f"开始学习课程: {course['title']}") # 获取当前课程的所有章节 - point_list = chaoxing.get_course_point(course["courseId"], course["clazzId"], course["cpi"]) + point_list = chaoxing.get_course_point( + course["courseId"], course["clazzId"], course["cpi"] + ) # 为了支持课程任务回滚,采用下标方式遍历任务点 __point_index = 0 while __point_index < len(point_list["points"]): point = point_list["points"][__point_index] logger.info(f'当前章节: {point["title"]}') - logger.debug(f'当前章节 __point_index: {__point_index}') # 触发参数: -v + logger.debug(f"当前章节 __point_index: {__point_index}") # 触发参数: -v sleep_duration = random.uniform(1, 3) logger.debug(f"本次随机等待时间: {sleep_duration}") - time.sleep(sleep_duration) # 避免请求过快导致异常, 所以引入随机sleep + time.sleep(sleep_duration) # 避免请求过快导致异常, 所以引入随机sleep # 获取当前章节的所有任务点 jobs = [] job_info = None - jobs, job_info = chaoxing.get_job_list(course["clazzId"], course["courseId"], course["cpi"], point["id"]) - + jobs, job_info = chaoxing.get_job_list( + course["clazzId"], course["courseId"], course["cpi"], point["id"] + ) + # bookID = job_info["knowledgeid"] # 获取视频ID - + # 发现未开放章节,尝试回滚上一个任务重新完成一次 try: - if job_info.get('notOpen',False): + if job_info.get("notOpen", False): __point_index -= 1 # 默认第一个任务总是开放的 # 针对题库启用情况 if not tiku or tiku.DISABLE or not tiku.SUBMIT: # 未启用题库或未开启题库提交,章节检测未完成会导致无法开始下一章,直接退出 - logger.error(f"章节未开启,可能由于上一章节的章节检测未完成,请手动完成并提交再重试,或者开启题库并启用提交") + logger.error( + f"章节未开启,可能由于上一章节的章节检测未完成,请手动完成并提交再重试,或者开启题库并启用提交" + ) break RB.add_times(point["id"]) continue @@ -176,7 +200,6 @@ def add_times(self,id:str) -> None: # 跳过该课程,继续下一课程 break - # 可能存在章节无任何内容的情况 if not jobs: __point_index += 1 @@ -187,39 +210,49 @@ def add_times(self,id:str) -> None: if job["type"] == "video": # TODO: 目前这个记录功能还不够完善,中途退出的课程ID也会被记录 # TextBookID = getText() # 获取学习过的课程ID - # if TextBookID.count(bookID) > 0: + # if TextBookID.count(bookID) > 0: # logger.info(f"课程: {course['title']} 章节: {point['title']} 任务: {job['title']} 已学习过或在学习中,跳过") # 如果已经学习过该课程,则跳过 # break # 如果已经学习过该课程,则跳过 # appendText(bookID) # 记录正在学习的课程ID - logger.trace(f"识别到视频任务, 任务章节: {course['title']} 任务ID: {job['jobid']}") + logger.trace( + f"识别到视频任务, 任务章节: {course['title']} 任务ID: {job['jobid']}" + ) # 超星的接口没有返回当前任务是否为Audio音频任务 isAudio = False try: - chaoxing.study_video(course, job, job_info, _speed=speed, _type="Video") + chaoxing.study_video( + course, job, job_info, _speed=speed, _type="Video" + ) except JSONDecodeError as e: logger.warning("当前任务非视频任务,正在尝试音频任务解码") isAudio = True if isAudio: try: - chaoxing.study_video(course, job, job_info, _speed=speed, _type="Audio") + chaoxing.study_video( + course, job, job_info, _speed=speed, _type="Audio" + ) except JSONDecodeError as e: - logger.warning(f"出现异常任务 -> 任务章节: {course['title']} 任务ID: {job['jobid']}, 已跳过") + logger.warning( + f"出现异常任务 -> 任务章节: {course['title']} 任务ID: {job['jobid']}, 已跳过" + ) # 文档任务 elif job["type"] == "document": - logger.trace(f"识别到文档任务, 任务章节: {course['title']} 任务ID: {job['jobid']}") + logger.trace( + f"识别到文档任务, 任务章节: {course['title']} 任务ID: {job['jobid']}" + ) chaoxing.study_document(course, job) # 测验任务 elif job["type"] == "workid": logger.trace(f"识别到章节检测任务, 任务章节: {course['title']}") - chaoxing.study_work(course, job,job_info) + chaoxing.study_work(course, job, job_info) # 阅读任务 elif job["type"] == "read": logger.trace(f"识别到阅读任务, 任务章节: {course['title']}") - chaoxing.strdy_read(course, job,job_info) + chaoxing.strdy_read(course, job, job_info) __point_index += 1 logger.info("所有课程学习任务已完成") - + except SystemExit as e: if e.code == 0: # 正常退出 sys.exit(0) @@ -227,6 +260,7 @@ def add_times(self,id:str) -> None: raise except BaseException as e: import traceback + logger.error(f"错误: {type(e).__name__}: {e}") logger.error(traceback.format_exc()) - raise e \ No newline at end of file + raise e From 60a08dc0919ac72b0b39e93c091bc0f531aa4371 Mon Sep 17 00:00:00 2001 From: n-WN <30841158+n-WN@users.noreply.github.com> Date: Tue, 24 Dec 2024 01:35:45 +0800 Subject: [PATCH 4/9] chore: handle non-ASCII characters --- api/answer.py | 38 +++++++++++++++++++------------------- api/base.py | 44 ++++++++++++++++++++++---------------------- main.py | 32 ++++++++++++++++---------------- 3 files changed, 57 insertions(+), 57 deletions(-) diff --git a/api/answer.py b/api/answer.py index 04e93192..76acddad 100644 --- a/api/answer.py +++ b/api/answer.py @@ -68,7 +68,7 @@ def token(self,value): self._token = value def init_tiku(self): - # 仅用于题库初始化,应该在题库载入后作初始化调用,随后才可以使用题库 + # 仅用于题库初始化, 应该在题库载入后作初始化调用, 随后才可以使用题库 # 尝试根据配置文件设置提交模式 if not self._conf: self.config_set(self._get_conf()) @@ -79,7 +79,7 @@ def init_tiku(self): self._init_tiku() def _init_tiku(self): - # 仅用于题库初始化,例如配置token,交由自定义题库完成 + # 仅用于题库初始化, 例如配置token, 交由自定义题库完成 pass def config_set(self,config): @@ -87,14 +87,14 @@ def config_set(self,config): def _get_conf(self): """ - 从默认配置文件查询配置,如果未能查到,停用题库 + 从默认配置文件查询配置, 如果未能查到, 停用题库 """ try: config = configparser.ConfigParser() config.read(self.CONFIG_PATH, encoding="utf8") return config['tiku'] except KeyError or FileNotFoundError: - logger.info("未找到tiku配置,已忽略题库功能") + logger.info("未找到tiku配置, 已忽略题库功能") self.DISABLE = True return None @@ -102,7 +102,7 @@ def query(self,q_info:dict): if self.DISABLE: return None - # 预处理,去除【单选题】这样与标题无关的字段 + # 预处理, 去除【单选题】这样与标题无关的字段 # 此处需要改进!!! logger.debug(f"原始标题:{q_info['title']}") q_info['title'] = q_info['title'][6:] # 暂时直接用裁切解决 @@ -125,13 +125,13 @@ def query(self,q_info:dict): return None def _query(self,q_info:dict): """ - 查询接口,交由自定义题库实现 + 查询接口, 交由自定义题库实现 """ pass def get_tiku_from_config(self): """ - 从配置文件加载题库,这个配置可以是用户提供,可以是默认配置文件 + 从配置文件加载题库, 这个配置可以是用户提供, 可以是默认配置文件 """ if not self._conf: # 尝试从默认配置文件加载 @@ -143,7 +143,7 @@ def get_tiku_from_config(self): if not cls_name: raise KeyError except KeyError: - logger.error("未找到题库配置,已忽略题库功能") + logger.error("未找到题库配置, 已忽略题库功能") return self new_cls = globals()[cls_name]() new_cls.config_set(self._conf) @@ -151,7 +151,7 @@ def get_tiku_from_config(self): def jugement_select(self,answer:str) -> bool: """ - 这是一个专用的方法,要求配置维护两个选项列表,一份用于正确选项,一份用于错误选项,以应对题库对判断题答案响应的各种可能的情况 + 这是一个专用的方法, 要求配置维护两个选项列表, 一份用于正确选项, 一份用于错误选项, 以应对题库对判断题答案响应的各种可能的情况 它的作用是将获取到的答案answer与可能的选项列对比并返回对应的布尔值 """ if self.DISABLE: @@ -165,15 +165,15 @@ def jugement_select(self,answer:str) -> bool: elif answer in false_list: return False else: - # 无法判断,随机选择 - logger.error(f'无法判断答案 -> {answer} 对应的是正确还是错误,请自行判断并加入配置文件重启脚本,本次将会随机选择选项') + # 无法判断, 随机选择 + logger.error(f'无法判断答案 -> {answer} 对应的是正确还是错误, 请自行判断并加入配置文件重启脚本, 本次将会随机选择选项') return random.choice([True,False]) def get_submit_params(self): """ - 这是一个专用方法,用于根据当前设置的提交模式,响应对应的答题提交API中的pyFlag值 + 这是一个专用方法, 用于根据当前设置的提交模式, 响应对应的答题提交API中的pyFlag值 """ - # 留空直接提交,1保存但不提交 + # 留空直接提交, 1保存但不提交 if self.SUBMIT: return "" else: @@ -189,7 +189,7 @@ def __init__(self) -> None: self.api = 'https://tk.enncy.cn/query' self._token = None self._token_index = 0 # token队列计数器 - self._times = 100 # 查询次数剩余,初始化为100,查询后校对修正 + self._times = 100 # 查询次数剩余, 初始化为100, 查询后校对修正 def _query(self,q_info:dict): res = requests.get( @@ -203,9 +203,9 @@ def _query(self,q_info:dict): if res.status_code == 200: res_json = res.json() if not res_json['code']: - # 如果是因为TOKEN次数到期,则更换token + # 如果是因为TOKEN次数到期, 则更换token if self._times == 0 or '次数不足' in res_json['data']['answer']: - logger.info(f'TOKEN查询次数不足,将会更换并重新搜题') + logger.info(f'TOKEN查询次数不足, 将会更换并重新搜题') self._token_index += 1 self.load_token() # 重新查询 @@ -222,8 +222,8 @@ def load_token(self): token_list = self._conf['tokens'].split(',') if self._token_index == len(token_list): # TOKEN 用完 - logger.error('TOKEN用完,请自行更换再重启脚本') - raise Exception(f'{self.name} TOKEN 已用完,请更换') + logger.error('TOKEN用完, 请自行更换再重启脚本') + raise Exception(f'{self.name} TOKEN 已用完, 请更换') self._token = token_list[self._token_index] def _init_tiku(self): @@ -262,7 +262,7 @@ def _query(self, q_info: dict): if res.status_code == 200: res_json = res.json() if bool(res_json['plat']): - logger.error("查询失败,返回:" + res.text) + logger.error("查询失败, 返回:" + res.text) return None sep = "\n" return sep.join(res_json['answer']['allAnswer'][0]).strip() diff --git a/api/base.py b/api/base.py index 924550b4..dbb1586c 100644 --- a/api/base.py +++ b/api/base.py @@ -98,7 +98,7 @@ def get_course_list(self): _url = "https://mooc2-ans.chaoxing.com/mooc2-ans/visit/courselistdata" _data = {"courseType": 1, "courseFolderId": 0, "query": "", "superstarClass": 0} logger.trace("正在读取所有的课程列表...") - # 接口突然抽风,增加headers + # 接口突然抽风, 增加headers _headers = { "Host": "mooc2-ans.chaoxing.com", "sec-ch-ua-platform": '"Windows"', @@ -151,13 +151,13 @@ def get_job_list(self, _clazzid, _courseid, _cpi, _knowledgeid): "0", "1", "2", - ]: # 学习界面任务卡片数,很少有3个的,但是对于章节解锁任务点少一个都不行,可以从API /mooc-ans/mycourse/studentstudyAjax获取值,或者干脆直接加,但二者都会造成额外的请求 + ]: # 学习界面任务卡片数, 很少有3个的, 但是对于章节解锁任务点少一个都不行, 可以从API /mooc-ans/mycourse/studentstudyAjax获取值, 或者干脆直接加, 但二者都会造成额外的请求 _url = f"https://mooc1.chaoxing.com/mooc-ans/knowledge/cards?clazzid={_clazzid}&courseid={_courseid}&knowledgeid={_knowledgeid}&num={_possible_num}&ut=s&cpi={_cpi}&v=20160407-3&mooc2=1" logger.trace("开始读取章节所有任务点...") _resp = _session.get(_url) _job_list, _job_info = decode_course_card(_resp.text) if _job_info.get("notOpen", False): - # 直接返回,节省一次请求 + # 直接返回, 节省一次请求 logger.info("该章节未开放") return [], _job_info job_list += _job_list @@ -212,14 +212,14 @@ def video_progress_log( resp = _session.get(_url) if resp.status_code == 200: _success = True - break # 如果返回为200正常,则跳出循环 + break # 如果返回为200正常, 则跳出循环 elif resp.status_code == 403: - continue # 如果出现403无权限报错,则继续尝试不同的rt参数 + continue # 如果出现403无权限报错, 则继续尝试不同的rt参数 if _success: return resp.json() else: - # 若出现两个rt参数都返回403的情况,则跳过当前任务 - logger.warning("出现403报错,尝试修复无效,正在跳过当前任务点...") + # 若出现两个rt参数都返回403的情况, 则跳过当前任务 + logger.warning("出现403报错, 尝试修复无效, 正在跳过当前任务点...") return False def study_video( @@ -297,7 +297,7 @@ def study_document(self, _course, _job): def study_work(self, _course, _job, _job_info) -> None: if self.tiku.DISABLE or not self.tiku: return None - _ORIGIN_HTML_CONTENT = "" # 用于配合输出网页源码,帮助修复#391错误 + _ORIGIN_HTML_CONTENT = "" # 用于配合输出网页源码, 帮助修复#391错误 def random_answer(options: str) -> str: answer = "" @@ -317,16 +317,16 @@ def random_answer(options: str) -> str: for i in range( random.choices([2, 3, 4], weights=[0.1, 0.5, 0.4], k=1)[0] - ): # 此处表示随机多选答案几率:2个 10%,3个 50% ,4个 40% + ): # 此处表示随机多选答案几率:2个 10%, 3个 50%, 4个 40% _choice = random.choice(_op_list) _op_list.remove(_choice) - answer += _choice[:1] # 取首字为答案,例如A或B - # 对答案进行排序,否则会提交失败 + answer += _choice[:1] # 取首字为答案, 例如A或B + # 对答案进行排序, 否则会提交失败 answer = "".join(sorted(answer)) elif q["type"] == "single": answer = random.choice(options.split("\n"))[ :1 - ] # 取首字为答案,例如A或B + ] # 取首字为答案, 例如A或B # 判断题处理 elif q["type"] == "judgement": # answer = self.tiku.jugement_select(_answer) @@ -336,16 +336,16 @@ def random_answer(options: str) -> str: def multi_cut(answer: str) -> list[str]: """ - 将多选题答案字符串按特定字符进行切割,并返回切割后的答案列表。 + 将多选题答案字符串按特定字符进行切割, 并返回切割后的答案列表. 参数: - answer (str): 多选题答案字符串。 + answer (str): 多选题答案字符串. 返回: - list[str]: 切割后的答案列表。如果无法切割,则返回默认的选项列表 ['A', 'B', 'C', 'D']。 + list[str]: 切割后的答案列表, 如果无法切割, 则返回默认的选项列表 ['A', 'B', 'C', 'D']. 注意: - 如果无法从网页中提取题目信息,将记录警告日志并返回默认选项列表。 + 如果无法从网页中提取题目信息, 将记录警告日志并返回默认选项列表. """ # cut_char = [',',',','|','\n','\r','\t','#','*','-','_','+','@','~','/','\\','.','&',' '] # 多选答案切割符 # ',' 在常规被正确划分的, 选项中出现, 导致 multi_cut 无法正确划分选项 #391 @@ -385,7 +385,7 @@ def multi_cut(answer: str) -> list[str]: logger.warning("未能正确提取题目选项信息! 请反馈并提供以上信息") return ["A", "B", "C", "D"] # 默认多选题为4个选项 - # 学习通这里根据参数差异能重定向至两个不同接口,需要定向至https://mooc1.chaoxing.com/mooc-ans/workHandle/handle + # 学习通这里根据参数差异能重定向至两个不同接口, 需要定向至https://mooc1.chaoxing.com/mooc-ans/workHandle/handle _session = init_session() headers = { "Host": "mooc1.chaoxing.com", @@ -426,7 +426,7 @@ def multi_cut(answer: str) -> list[str]: "courseid": _course["courseId"], }, ) - _ORIGIN_HTML_CONTENT = _resp.text # 用于配合输出网页源码,帮助修复#391错误 + _ORIGIN_HTML_CONTENT = _resp.text # 用于配合输出网页源码, 帮助修复#391错误 questions = decode_questions_info(_resp.text) # 加载题目信息 # 搜题 @@ -446,9 +446,9 @@ def multi_cut(answer: str) -> list[str]: for o in options_list: if ( _a.upper() in o - ): # 题库返回的答案可能包含选项,如A,B,C,全部转成大写与学习通一致 + ): # 题库返回的答案可能包含选项, 如A, B, C, 全部转成大写与学习通一致 answer += o[:1] - # 对答案进行排序,否则会提交失败 + # 对答案进行排序, 否则会提交失败 answer = "".join(sorted(answer)) elif q["type"] == "judgement": answer = "true" if self.tiku.jugement_select(res) else "false" @@ -457,7 +457,7 @@ def multi_cut(answer: str) -> list[str]: if res in o: answer = o[:1] break - # 如果未能匹配,依然随机答题 + # 如果未能匹配, 依然随机答题 logger.info(f"找到答案但答案未能匹配 -> {res}\t随机选择答案") answer = answer if answer else random_answer(q["options"]) # 填充答案 @@ -509,7 +509,7 @@ def multi_cut(answer: str) -> list[str]: def strdy_read(self, _course, _job, _job_info) -> None: """ - 阅读任务学习,仅完成任务点,并不增长时长 + 阅读任务学习, 仅完成任务点, 并不增长时长 """ _session = init_session() _resp = _session.get( diff --git a/main.py b/main.py index f5163ec5..baa69898 100644 --- a/main.py +++ b/main.py @@ -12,7 +12,7 @@ import sys import os -# # 定义全局变量,用于存储配置文件路径 +# # 定义全局变量, 用于存储配置文件路径 # textPath = './resource/BookID.txt' # # 获取文本 -> 用于查看学习过的课程ID @@ -101,7 +101,7 @@ def __init__(self) -> None: def add_times(self, id: str) -> None: if id == self.rollback_id and self.rollback_times == 3: - raise MaxRollBackError("回滚次数已达3次,请手动检查学习通任务点完成情况") + raise MaxRollBackError("回滚次数已达3次, 请手动检查学习通任务点完成情况") elif id != self.rollback_id: # 新job self.rollback_id = id @@ -119,8 +119,8 @@ def add_times(self, id: str) -> None: # 规范化播放速度的输入值 speed = min(2.0, max(1.0, speed)) if (not username) or (not password): - username = input("请输入你的手机号,按回车确认\n手机号:") - password = input("请输入你的密码,按回车确认\n密码:") + username = input("请输入你的手机号, 按回车确认\n手机号:") + password = input("请输入你的密码, 按回车确认\n密码:") account = Account(username, password) # 设置题库 tiku = Tiku() @@ -130,7 +130,7 @@ def add_times(self, id: str) -> None: # 实例化超星API chaoxing = Chaoxing(account=account, tiku=tiku) - # 检查当前登录状态,并检查账号密码 + # 检查当前登录状态, 并检查账号密码 _login_state = chaoxing.login() if not _login_state["status"]: raise LoginError(_login_state["msg"]) @@ -156,7 +156,7 @@ def add_times(self, id: str) -> None: if not course_task: course_task = all_course # 开始遍历要学习的课程列表 - logger.info(f"课程列表过滤完毕,当前课程任务数量: {len(course_task)}") + logger.info(f"课程列表过滤完毕, 当前课程任务数量: {len(course_task)}") for course in course_task: logger.info(f"开始学习课程: {course['title']}") # 获取当前课程的所有章节 @@ -164,7 +164,7 @@ def add_times(self, id: str) -> None: course["courseId"], course["clazzId"], course["cpi"] ) - # 为了支持课程任务回滚,采用下标方式遍历任务点 + # 为了支持课程任务回滚, 采用下标方式遍历任务点 __point_index = 0 while __point_index < len(point_list["points"]): point = point_list["points"][__point_index] @@ -182,22 +182,22 @@ def add_times(self, id: str) -> None: # bookID = job_info["knowledgeid"] # 获取视频ID - # 发现未开放章节,尝试回滚上一个任务重新完成一次 + # 发现未开放章节, 尝试回滚上一个任务重新完成一次 try: if job_info.get("notOpen", False): __point_index -= 1 # 默认第一个任务总是开放的 # 针对题库启用情况 if not tiku or tiku.DISABLE or not tiku.SUBMIT: - # 未启用题库或未开启题库提交,章节检测未完成会导致无法开始下一章,直接退出 + # 未启用题库或未开启题库提交, 章节检测未完成会导致无法开始下一章, 直接退出 logger.error( - f"章节未开启,可能由于上一章节的章节检测未完成,请手动完成并提交再重试,或者开启题库并启用提交" + f"章节未开启, 可能由于上一章节的章节检测未完成, 请手动完成并提交再重试, 或者开启题库并启用提交" ) break RB.add_times(point["id"]) continue except MaxRollBackError as e: - logger.error("回滚次数已达3次,请手动检查学习通任务点完成情况") - # 跳过该课程,继续下一课程 + logger.error("回滚次数已达3次, 请手动检查学习通任务点完成情况") + # 跳过该课程, 继续下一课程 break # 可能存在章节无任何内容的情况 @@ -208,11 +208,11 @@ def add_times(self, id: str) -> None: for job in jobs: # 视频任务 if job["type"] == "video": - # TODO: 目前这个记录功能还不够完善,中途退出的课程ID也会被记录 + # TODO: 目前这个记录功能还不够完善, 中途退出的课程ID也会被记录 # TextBookID = getText() # 获取学习过的课程ID # if TextBookID.count(bookID) > 0: - # logger.info(f"课程: {course['title']} 章节: {point['title']} 任务: {job['title']} 已学习过或在学习中,跳过") # 如果已经学习过该课程,则跳过 - # break # 如果已经学习过该课程,则跳过 + # logger.info(f"课程: {course['title']} 章节: {point['title']} 任务: {job['title']} 已学习过或在学习中, 跳过") # 如果已经学习过该课程, 则跳过 + # break # 如果已经学习过该课程, 则跳过 # appendText(bookID) # 记录正在学习的课程ID logger.trace( @@ -225,7 +225,7 @@ def add_times(self, id: str) -> None: course, job, job_info, _speed=speed, _type="Video" ) except JSONDecodeError as e: - logger.warning("当前任务非视频任务,正在尝试音频任务解码") + logger.warning("当前任务非视频任务, 正在尝试音频任务解码") isAudio = True if isAudio: try: From f44ee66a68138abea0c348df58c1c561bcbe3ef4 Mon Sep 17 00:00:00 2001 From: n-WN <30841158+n-WN@users.noreply.github.com> Date: Tue, 24 Dec 2024 09:41:09 +0800 Subject: [PATCH 5/9] feat: Introduce chapter_test parameter to control chapter quiz handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Added chapter_test parameter for configuring chapter quiz handling options: - 0: Skip - 1: Complete and save - 2: Complete and submit - Implemented functionality to skip quiz tasks when chapter_test is set to 0 - Background: Many users do not have a tiku token configured and do not want to use random selection. This is a simple solution to address that. Excerpt from main.py, lines 255 to 259: elif job['type'] == 'workid': # 检测配置文件是否跳过测验任务 if chapter_test == 0: logger.info(f'跳过章节测验任务, 任务章节: {course['title']}') continue --- config_template.ini | 9 +++++++++ main.py | 16 +++++++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/config_template.ini b/config_template.ini index b5758e6d..91705a05 100644 --- a/config_template.ini +++ b/config_template.ini @@ -10,6 +10,15 @@ course_list = xxx,xxx,xxx ; 视频播放倍速(默认1,最大2) speed = 1 + +; index_point = 0 + +; 章节测验 +; 0: 不做 +; 1: 做完保存 | TODO +; 2: 做完提交 | TODO +chapter_test = 0 + [tiku] ; 可选项 : ; 1. TikuYanxi(言溪题库 https://tk.enncy.cn/) diff --git a/main.py b/main.py index baa69898..b22a04de 100644 --- a/main.py +++ b/main.py @@ -3,6 +3,7 @@ import configparser import random +from api import config from api.logger import logger from api.base import Chaoxing, Account from api.exceptions import LoginError, FormatError, JSONDecodeError, MaxRollBackError @@ -82,6 +83,13 @@ def init_config(): else None ), int(config.get("common", "speed")), + # 章节测验 + # 0: 不做 + # 1: 做完保存 + # 2: 做完提交 + # chapter_test + int(config.get("common", "chapter_test")), + config["tiku"], ) else: @@ -91,6 +99,7 @@ def init_config(): args.list.split(",") if args.list else None, int(args.speed) if args.speed else 1, None, + None, ) @@ -115,7 +124,7 @@ def add_times(self, id: str) -> None: # 避免异常的无限回滚 RB = RollBackManager() # 初始化登录信息 - username, password, course_list, speed, tiku_config = init_config() + username, password, course_list, speed, chapter_test, tiku_config = init_config() # 规范化播放速度的输入值 speed = min(2.0, max(1.0, speed)) if (not username) or (not password): @@ -244,7 +253,12 @@ def add_times(self, id: str) -> None: chaoxing.study_document(course, job) # 测验任务 elif job["type"] == "workid": + # 检测配置文件是否跳过测验任务 + if chapter_test == 0: + logger.info(f"跳过章节测验任务, 任务章节: {course['title']}") + continue logger.trace(f"识别到章节检测任务, 任务章节: {course['title']}") + # 有比直接跟在函数括号后面传参数更好看的写法, 传 self chaoxing.study_work(course, job, job_info) # 阅读任务 elif job["type"] == "read": From 8f627415e05947ae557f0ddd00e4c37e552366a9 Mon Sep 17 00:00:00 2001 From: n-WN <30841158+n-WN@users.noreply.github.com> Date: Tue, 24 Dec 2024 10:09:13 +0800 Subject: [PATCH 6/9] refactor: Format code and revise wording in some comments --- main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.py b/main.py index b22a04de..5fae50a0 100644 --- a/main.py +++ b/main.py @@ -258,7 +258,7 @@ def add_times(self, id: str) -> None: logger.info(f"跳过章节测验任务, 任务章节: {course['title']}") continue logger.trace(f"识别到章节检测任务, 任务章节: {course['title']}") - # 有比直接跟在函数括号后面传参数更好看的写法, 传 self + # 使用更优雅的方式传递参数 -> self (chapter_test) chaoxing.study_work(course, job, job_info) # 阅读任务 elif job["type"] == "read": From 42677fb6abfc1252fd7173a3cbf56fe7aa342504 Mon Sep 17 00:00:00 2001 From: n-WN <30841158+n-WN@users.noreply.github.com> Date: Tue, 24 Dec 2024 11:03:58 +0800 Subject: [PATCH 7/9] refactor: Format code and revise wording in some comments --- api/answer.py | 142 +++++++++++++++++++++++-------------------- api/base.py | 4 +- api/cxsecret_font.py | 4 +- api/decode.py | 6 +- config_template.ini | 6 +- main.py | 7 +-- 6 files changed, 86 insertions(+), 83 deletions(-) diff --git a/api/answer.py b/api/answer.py index 76acddad..a8ee03a7 100644 --- a/api/answer.py +++ b/api/answer.py @@ -4,15 +4,18 @@ import json from api.logger import logger import random -from urllib3 import disable_warnings,exceptions +from urllib3 import disable_warnings, exceptions + # 关闭警告 disable_warnings(exceptions.InsecureRequestWarning) + class CacheDAO: """ @Author: SocialSisterYi @Reference: https://github.com/SocialSisterYi/xuexiaoyi-to-xuexitong-tampermonkey-proxy """ + def __init__(self, file: str = "cache.json"): self.cacheFile = Path(file) if not self.cacheFile.is_file(): @@ -35,8 +38,8 @@ def addCache(self, question: str, answer: str): class Tiku: CONFIG_PATH = "config.ini" # 默认配置文件路径 - DISABLE = False # 停用标志 - SUBMIT = False # 提交标志 + DISABLE = False # 停用标志 + SUBMIT = False # 提交标志 def __init__(self) -> None: self._name = None @@ -46,7 +49,7 @@ def __init__(self) -> None: @property def name(self): return self._name - + @name.setter def name(self, value): self._name = value @@ -54,7 +57,7 @@ def name(self, value): @property def api(self): return self._api - + @api.setter def api(self, value): self._api = value @@ -64,7 +67,7 @@ def token(self): return self._token @token.setter - def token(self,value): + def token(self, value): self._token = value def init_tiku(self): @@ -74,15 +77,15 @@ def init_tiku(self): self.config_set(self._get_conf()) if not self.DISABLE: # 设置提交模式 - self.SUBMIT = True if self._conf['submit'] == 'true' else False + self.SUBMIT = True if self._conf["submit"] == "true" else False # 调用自定义题库初始化 self._init_tiku() - + def _init_tiku(self): # 仅用于题库初始化, 例如配置token, 交由自定义题库完成 pass - def config_set(self,config): + def config_set(self, config): self._conf = config def _get_conf(self): @@ -92,38 +95,39 @@ def _get_conf(self): try: config = configparser.ConfigParser() config.read(self.CONFIG_PATH, encoding="utf8") - return config['tiku'] + return config["tiku"] except KeyError or FileNotFoundError: logger.info("未找到tiku配置, 已忽略题库功能") self.DISABLE = True return None - def query(self,q_info:dict): + def query(self, q_info: dict): if self.DISABLE: return None # 预处理, 去除【单选题】这样与标题无关的字段 # 此处需要改进!!! - logger.debug(f"原始标题:{q_info['title']}") - q_info['title'] = q_info['title'][6:] # 暂时直接用裁切解决 - logger.debug(f"处理后标题:{q_info['title']}") + logger.debug(f"原始标题: {q_info['title']}") + q_info["title"] = q_info["title"][6:] # 暂时直接用裁切解决 + logger.debug(f"处理后标题: {q_info['title']}") # 先过缓存 cache_dao = CacheDAO() - answer = cache_dao.getCache(q_info['title']) + answer = cache_dao.getCache(q_info["title"]) if answer: - logger.info(f"从缓存中获取答案:{q_info['title']} -> {answer}") + logger.info(f"从缓存中获取答案: {q_info['title']} -> {answer}") return answer.strip() else: answer = self._query(q_info) if answer: answer = answer.strip() - cache_dao.addCache(q_info['title'], answer) - logger.info(f"从{self.name}获取答案:{q_info['title']} -> {answer}") + cache_dao.addCache(q_info["title"], answer) + logger.info(f"从{self.name}获取答案: {q_info['title']} -> {answer}") return answer - logger.error(f"从{self.name}获取答案失败:{q_info['title']}") + logger.error(f"从{self.name}获取答案失败: {q_info['title']}") return None - def _query(self,q_info:dict): + + def _query(self, q_info: dict): """ 查询接口, 交由自定义题库实现 """ @@ -139,7 +143,7 @@ def get_tiku_from_config(self): if self.DISABLE: return self try: - cls_name = self._conf['provider'] + cls_name = self._conf["provider"] if not cls_name: raise KeyError except KeyError: @@ -148,16 +152,16 @@ def get_tiku_from_config(self): new_cls = globals()[cls_name]() new_cls.config_set(self._conf) return new_cls - - def jugement_select(self,answer:str) -> bool: + + def jugement_select(self, answer: str) -> bool: """ 这是一个专用的方法, 要求配置维护两个选项列表, 一份用于正确选项, 一份用于错误选项, 以应对题库对判断题答案响应的各种可能的情况 它的作用是将获取到的答案answer与可能的选项列对比并返回对应的布尔值 """ if self.DISABLE: return False - true_list = self._conf['true_list'].split(',') - false_list = self._conf['false_list'].split(',') + true_list = self._conf["true_list"].split(",") + false_list = self._conf["false_list"].split(",") # 对响应的答案作处理 answer = answer.strip() if answer in true_list: @@ -166,9 +170,11 @@ def jugement_select(self,answer:str) -> bool: return False else: # 无法判断, 随机选择 - logger.error(f'无法判断答案 -> {answer} 对应的是正确还是错误, 请自行判断并加入配置文件重启脚本, 本次将会随机选择选项') - return random.choice([True,False]) - + logger.error( + f"无法判断答案 -> {answer} 对应的是正确还是错误, 请自行判断并加入配置文件重启脚本, 本次将会随机选择选项" + ) + return random.choice([True, False]) + def get_submit_params(self): """ 这是一个专用方法, 用于根据当前设置的提交模式, 响应对应的答题提交API中的pyFlag值 @@ -179,97 +185,99 @@ def get_submit_params(self): else: return "1" + # 按照以下模板实现更多题库 + class TikuYanxi(Tiku): # 言溪题库实现 def __init__(self) -> None: super().__init__() - self.name = '言溪题库' - self.api = 'https://tk.enncy.cn/query' + self.name = "言溪题库" + self.api = "https://tk.enncy.cn/query" self._token = None - self._token_index = 0 # token队列计数器 - self._times = 100 # 查询次数剩余, 初始化为100, 查询后校对修正 + self._token_index = 0 # token队列计数器 + self._times = 100 # 查询次数剩余, 初始化为100, 查询后校对修正 - def _query(self,q_info:dict): + def _query(self, q_info: dict): res = requests.get( self.api, - params={ - 'question':q_info['title'], - 'token':self._token - }, - verify=False + params={"question": q_info["title"], "token": self._token}, + verify=False, ) if res.status_code == 200: res_json = res.json() - if not res_json['code']: + if not res_json["code"]: # 如果是因为TOKEN次数到期, 则更换token - if self._times == 0 or '次数不足' in res_json['data']['answer']: - logger.info(f'TOKEN查询次数不足, 将会更换并重新搜题') + if self._times == 0 or "次数不足" in res_json["data"]["answer"]: + logger.info(f"TOKEN查询次数不足, 将会更换并重新搜题") self._token_index += 1 self.load_token() # 重新查询 return self._query(q_info) - logger.error(f'{self.name}查询失败:\n\t剩余查询数{res_json["data"].get("times",f"{self._times}(仅参考)")}:\n\t消息:{res_json["message"]}') + logger.error( + f'{self.name}查询失败:\n\t剩余查询数{res_json["data"].get("times",f"{self._times}(仅参考)")}:\n\t消息:{res_json["message"]}' + ) return None - self._times = res_json["data"].get("times",self._times) - return res_json['data']['answer'].strip() + self._times = res_json["data"].get("times", self._times) + return res_json["data"]["answer"].strip() else: - logger.error(f'{self.name}查询失败:\n{res.text}') + logger.error(f"{self.name}查询失败:\n{res.text}") return None - - def load_token(self): - token_list = self._conf['tokens'].split(',') + + def load_token(self): + token_list = self._conf["tokens"].split(",") if self._token_index == len(token_list): # TOKEN 用完 - logger.error('TOKEN用完, 请自行更换再重启脚本') - raise Exception(f'{self.name} TOKEN 已用完, 请更换') + logger.error("TOKEN用完, 请自行更换再重启脚本") + raise Exception(f"{self.name} TOKEN 已用完, 请更换") self._token = token_list[self._token_index] def _init_tiku(self): self.load_token() + class TikuAdapter(Tiku): # TikuAdapter题库实现 https://github.com/DokiDoki1103/tikuAdapter def __init__(self) -> None: super().__init__() - self.name = 'TikuAdapter题库' - self.api = '' + self.name = "TikuAdapter题库" + self.api = "" def _query(self, q_info: dict): # 判断题目类型 - if q_info['type'] == "single": + if q_info["type"] == "single": type = 0 - elif q_info['type'] == 'multiple': + elif q_info["type"] == "multiple": type = 1 - elif q_info['type'] == 'completion': + elif q_info["type"] == "completion": type = 2 - elif q_info['type'] == 'judgement': + elif q_info["type"] == "judgement": type = 3 else: type = 4 - options = q_info['options'] + options = q_info["options"] res = requests.post( self.api, json={ - 'question': q_info['title'], - 'options': options.split('\n'), - 'type': type + "question": q_info["title"], + "options": options.split("\n"), + "type": type, }, - verify=False + verify=False, ) if res.status_code == 200: res_json = res.json() - if bool(res_json['plat']): - logger.error("查询失败, 返回:" + res.text) + if bool(res_json["plat"]): + logger.error("查询失败, 返回: " + res.text) return None sep = "\n" - return sep.join(res_json['answer']['allAnswer'][0]).strip() - # else: - logger.error(f'{self.name}查询失败:\n{res.text}') + return sep.join(res_json["answer"]["allAnswer"][0]).strip() + # else: + logger.error(f"{self.name}查询失败:\n{res.text}") return None def _init_tiku(self): # self.load_token() - self.api = self._conf['url'] + self.api = self._conf["url"] diff --git a/api/base.py b/api/base.py index dbb1586c..5bccef98 100644 --- a/api/base.py +++ b/api/base.py @@ -317,7 +317,7 @@ def random_answer(options: str) -> str: for i in range( random.choices([2, 3, 4], weights=[0.1, 0.5, 0.4], k=1)[0] - ): # 此处表示随机多选答案几率:2个 10%, 3个 50%, 4个 40% + ): # 此处表示随机多选答案几率: 2个 10%, 3个 50%, 4个 40% _choice = random.choice(_op_list) _op_list.remove(_choice) answer += _choice[:1] # 取首字为答案, 例如A或B @@ -380,7 +380,7 @@ def multi_cut(answer: str) -> list[str]: if len(res) > 1: return res logger.warning( - f"未能从网页中提取题目信息, 以下为相关信息:\n\t{answer}\n\n{_ORIGIN_HTML_CONTENT}\n" + f"未能从网页中提取题目信息, 以下为相关信息: \n\t{answer}\n\n{_ORIGIN_HTML_CONTENT}\n" ) # 尝试输出网页内容和选项信息 logger.warning("未能正确提取题目选项信息! 请反馈并提供以上信息") return ["A", "B", "C", "D"] # 默认多选题为4个选项 diff --git a/api/cxsecret_font.py b/api/cxsecret_font.py index d66fb856..64a5210c 100644 --- a/api/cxsecret_font.py +++ b/api/cxsecret_font.py @@ -74,12 +74,12 @@ def decrypt(dststr_fontmap: Dict[str, str], dst_str: str) -> str: ori_str = "" for char in dst_str: if dstchar_hash := dststr_fontmap.get(f"uni{ord(char):X}"): - # 存在于“密钥”字体,解密 + # 存在于 "密钥" 字体, 解密 orichar_hash = fonthash_dao.find_char(dstchar_hash) if orichar_hash is not None: ori_str += chr(int(orichar_hash[3:], 16)) else: - # 不存在于“密钥”字体,直接复制 + # 不存在于 "密钥" 字体, 直接复制 ori_str += char # 替换解密后的康熙部首 ori_str = ori_str.translate(KX_RADICALS_TAB) diff --git a/api/decode.py b/api/decode.py index 06993d3c..2fbb0047 100644 --- a/api/decode.py +++ b/api/decode.py @@ -127,9 +127,9 @@ def decode_course_card(_text: str): # 不属于任务点的任务 if "job" not in _card or _card["job"] is False: if _card.get("type") and _card["type"] == "read": - # 发现有在视频任务下掺杂阅读任务,不完成可能会导致无法开启下一章节 + # 发现有在视频任务下掺杂阅读任务, 不完成可能会导致无法开启下一章节 if _card["property"].get("read", False): - # 已阅读,跳过 + # 已阅读, 跳过 continue _job = {} _job["title"] = _card["property"]["title"] @@ -153,7 +153,7 @@ def decode_course_card(_text: str): try: _job["mid"] = _card["mid"] except KeyError: - logger.warning("出现转码失败视频,已跳过...") + logger.warning("出现转码失败视频, 已跳过...") continue _job["objectid"] = _card["objectId"] _job["aid"] = _card["aid"] diff --git a/config_template.ini b/config_template.ini index 91705a05..91ccc4d7 100644 --- a/config_template.ini +++ b/config_template.ini @@ -11,13 +11,13 @@ course_list = xxx,xxx,xxx ; 视频播放倍速(默认1,最大2) speed = 1 +; TODO ; index_point = 0 ; 章节测验 ; 0: 不做 -; 1: 做完保存 | TODO -; 2: 做完提交 | TODO -chapter_test = 0 +; 1: 做 +chapter_test = 1 [tiku] ; 可选项 : diff --git a/main.py b/main.py index f7eaf2fc..f688253d 100644 --- a/main.py +++ b/main.py @@ -83,11 +83,7 @@ def init_config(): else None ), int(config.get("common", "speed")), - # 章节测验 - # 0: 不做 - # 1: 做完保存 - # 2: 做完提交 - # chapter_test + # chapter_test, 章节测验 0: 不做 1: 做 int(config.get("common", "chapter_test")), config["tiku"], @@ -258,7 +254,6 @@ def add_times(self, id: str) -> None: logger.info(f"跳过章节测验任务, 任务章节: {course['title']}") continue logger.trace(f"识别到章节检测任务, 任务章节: {course['title']}") - # 使用更优雅的方式传递参数 -> self (chapter_test) chaoxing.study_work(course, job, job_info) # 阅读任务 elif job["type"] == "read": From ca26ba0ecbde5b738d1030618608a477797e2057 Mon Sep 17 00:00:00 2001 From: n-WN <30841158+n-WN@users.noreply.github.com> Date: Wed, 25 Dec 2024 10:25:11 +0800 Subject: [PATCH 8/9] fix: improve configuration parsing mechanism - Optimize course list parsing from config files and command line arguments - Add parse_course_list function to handle course IDs containing whitespace - Simplify code with list comprehension Background: Previous version used simple .split(,) which ignored course IDs containing whitespace in configuration. The new parsing mechanism properly handles whitespace in course ID lists. --- api/answer.py | 6 +-- main.py | 130 +++++++++++++++++++++++++++----------------------- 2 files changed, 73 insertions(+), 63 deletions(-) diff --git a/api/answer.py b/api/answer.py index a8ee03a7..119411fa 100644 --- a/api/answer.py +++ b/api/answer.py @@ -210,7 +210,7 @@ def _query(self, q_info: dict): if not res_json["code"]: # 如果是因为TOKEN次数到期, 则更换token if self._times == 0 or "次数不足" in res_json["data"]["answer"]: - logger.info(f"TOKEN查询次数不足, 将会更换并重新搜题") + logger.info("TOKEN查询次数不足, 将会更换并重新搜题") self._token_index += 1 self.load_token() # 重新查询 @@ -274,8 +274,8 @@ def _query(self, q_info: dict): return None sep = "\n" return sep.join(res_json["answer"]["allAnswer"][0]).strip() - # else: - logger.error(f"{self.name}查询失败:\n{res.text}") + # else: # https://github.com/Samueli924/chaoxing/blame/3369cae6e55a44d6d284e17bccefb56d1606f5bb/api/answer.py#L269 + # logger.error(f"{self.name}查询失败:\n{res.text}") # Unreachable code return None def _init_tiku(self): diff --git a/main.py b/main.py index f688253d..88711e16 100644 --- a/main.py +++ b/main.py @@ -2,6 +2,7 @@ import argparse import configparser import random +import traceback from api.logger import logger from api.base import Chaoxing, Account @@ -11,33 +12,31 @@ import time import sys import os +from dataclasses import dataclass +from typing import Optional, List -# # 定义全局变量, 用于存储配置文件路径 -# textPath = './resource/BookID.txt' - -# # 获取文本 -> 用于查看学习过的课程ID -# def getText(): -# try: -# if not os.path.exists(textPath): -# with open(textPath, 'x') as file: pass -# return [] -# with open(textPath, 'r', encoding='utf-8') as file: content = file.read().split(',') -# content = {int(item.strip()) for item in content if item.strip()} -# return list(content) -# except Exception as e: logger.error(f"获取文本失败: {e}"); return [] +# 关闭警告 +disable_warnings(exceptions.InsecureRequestWarning) -# # 追加文本 -> 用于记录学习过的课程ID -# def appendText(text): -# if not os.path.exists(textPath): return -# with open(textPath, 'a', encoding='utf-8') as file: file.write(f'{text}, ') +@dataclass +class Config: + username: str + password: str + course_list: Optional[List[str]] + speed: float + chapter_test: Optional[int] + tiku_config: Optional[dict] -# 关闭警告 -disable_warnings(exceptions.InsecureRequestWarning) +def parse_course_list(course_str: Optional[str]) -> Optional[List[str]]: + """解析课程列表字符串为列表""" + if not course_str: + return None + return [item.strip() for item in course_str.split(",") if item.strip()] -def init_config(): +def init_config() -> Config: parser = argparse.ArgumentParser( description="Samueli924/chaoxing", formatter_class=argparse.ArgumentDefaultsHelpFormatter, @@ -49,7 +48,7 @@ def init_config(): parser.add_argument("-u", "--username", type=str, default=None, help="手机号账号") parser.add_argument("-p", "--password", type=str, default=None, help="登录密码") parser.add_argument( - "-l", "--list", type=str, default=None, help="要学习的课程ID列表, 以 , 分隔" + "-l", "--list", type=str, default=None, help="要学习的课程ID列表, 以 , 分隔 (Optional)" ) parser.add_argument( "-s", "--speed", type=float, default=1.0, help="视频播放倍速 (默认1, 最大2)" @@ -74,28 +73,22 @@ def init_config(): if args.config: config = configparser.ConfigParser() config.read(args.config, encoding="utf8") - return ( - config.get("common", "username"), - config.get("common", "password"), - ( - str(config.get("common", "course_list")).split(",") - if config.get("common", "course_list") - else None - ), - int(config.get("common", "speed")), - # chapter_test, 章节测验 0: 不做 1: 做 - int(config.get("common", "chapter_test")), - - config["tiku"], + return Config( + username=config.get("common", "username"), + password=config.get("common", "password"), + course_list=parse_course_list(config.get("common", "course_list")), + speed=float(config.get("common", "speed")), + chapter_test=config.getint("common", "chapter_test", fallback=None), + tiku_config=config["tiku"], ) else: - return ( - args.username, - args.password, - args.list.split(",") if args.list else None, - int(args.speed) if args.speed else 1, - None, - None, + return Config( + username=args.username, + password=args.password, + course_list=parse_course_list(args.list), + speed=float(args.speed) if args.speed else 1.0, + chapter_test=None, + tiku_config=None, ) @@ -120,16 +113,21 @@ def add_times(self, id: str) -> None: # 避免异常的无限回滚 RB = RollBackManager() # 初始化登录信息 - username, password, course_list, speed, chapter_test, tiku_config = init_config() + config = init_config() # 规范化播放速度的输入值 - speed = min(2.0, max(1.0, speed)) - if (not username) or (not password): - username = input("请输入你的手机号, 按回车确认\n手机号:") - password = input("请输入你的密码, 按回车确认\n密码:") - account = Account(username, password) + config.speed = min(2.0, max(1.0, config.speed)) + if (not config.username) or (not config.password): + config.username = input("请输入你的手机号, 按回车确认\n手机号:") + config.password = input("请输入你的密码, 按回车确认\n密码:") + logger.debug( + f"username: {config.username}, password: {config.password}, course_list: {config.course_list}, speed: {config.speed}" + ) + account = Account(config.username, config.password) # 设置题库 tiku = Tiku() - tiku.config_set(tiku_config) # 载入配置 + logger.debug(f"tiku_config: {config.tiku_config}") + # if config.tiku_config: + tiku.config_set(config.tiku_config) # 载入配置 tiku = tiku.get_tiku_from_config() # 载入题库 tiku.init_tiku() # 初始化题库 @@ -143,24 +141,26 @@ def add_times(self, id: str) -> None: all_course = chaoxing.get_course_list() course_task = [] # 手动输入要学习的课程ID列表 - if not course_list: + if not config.course_list: print("*" * 10 + "课程列表" + "*" * 10) for course in all_course: print(f"ID: {course['courseId']} 课程名: {course['title']}") print("*" * 28) try: - course_list = input( + config.course_list = input( "请输入想要学习的课程列表,以逗号分隔,例: 2151141,189191,198198\n" ).split(",") except Exception as e: raise FormatError("输入格式错误") from e # 筛选需要学习的课程 for course in all_course: - if course["courseId"] in course_list: + if course["courseId"] in config.course_list: course_task.append(course) if not course_task: course_task = all_course + # 开始遍历要学习的课程列表 + logger.debug(f"要学习的课程列表: {course_task}") logger.info(f"课程列表过滤完毕, 当前课程任务数量: {len(course_task)}") for course in course_task: logger.info(f"开始学习课程: {course['title']}") @@ -170,14 +170,16 @@ def add_times(self, id: str) -> None: ) # 为了支持课程任务回滚, 采用下标方式遍历任务点 + logger.debug(f"当前课程子任务点共计: {len(point_list['points'])}") + __point_index = 0 while __point_index < len(point_list["points"]): point = point_list["points"][__point_index] logger.info(f'当前章节: {point["title"]}') - logger.debug(f"当前章节 __point_index: {__point_index}") # 触发参数: -v + logger.debug(f"当前 __point_index: {__point_index}") sleep_duration = random.uniform(1, 3) - logger.debug(f"本次随机等待时间: {sleep_duration}") - time.sleep(sleep_duration) # 避免请求过快导致异常, 所以引入随机sleep + logger.debug(f"本次随机等待时间: {sleep_duration:.1f}s") + time.sleep(sleep_duration) # 避免请求过快导致异常, 所以引入随机 sleep # 获取当前章节的所有任务点 jobs = [] job_info = None @@ -227,7 +229,11 @@ def add_times(self, id: str) -> None: isAudio = False try: chaoxing.study_video( - course, job, job_info, _speed=speed, _type="Video" + course, + job, + job_info, + _speed=config.speed, + _type="Video", ) except JSONDecodeError as e: logger.warning("当前任务非视频任务, 正在尝试音频任务解码") @@ -235,7 +241,11 @@ def add_times(self, id: str) -> None: if isAudio: try: chaoxing.study_video( - course, job, job_info, _speed=speed, _type="Audio" + course, + job, + job_info, + _speed=config.speed, + _type="Audio", ) except JSONDecodeError as e: logger.warning( @@ -250,8 +260,10 @@ def add_times(self, id: str) -> None: # 测验任务 elif job["type"] == "workid": # 检测配置文件是否跳过测验任务 - if chapter_test == 0: - logger.info(f"跳过章节测验任务, 任务章节: {course['title']}") + if config.chapter_test == 0: + logger.info( + f"跳过章节测验任务, 任务章节: {course['title']}" + ) continue logger.trace(f"识别到章节检测任务, 任务章节: {course['title']}") chaoxing.study_work(course, job, job_info) @@ -268,8 +280,6 @@ def add_times(self, id: str) -> None: else: raise except BaseException as e: - import traceback - logger.error(f"错误: {type(e).__name__}: {e}") logger.error(traceback.format_exc()) raise e From 1df9e714e348a252ac1345338f8457f0783c0566 Mon Sep 17 00:00:00 2001 From: n-WN <30841158+n-WN@users.noreply.github.com> Date: Fri, 27 Dec 2024 00:33:52 +0800 Subject: [PATCH 9/9] fix: Multiple choice answer matching anomaly --- api/base.py | 128 +++++++++++++++++++++++++++------------------------- main.py | 3 +- 2 files changed, 68 insertions(+), 63 deletions(-) diff --git a/api/base.py b/api/base.py index 5bccef98..1d366b0f 100644 --- a/api/base.py +++ b/api/base.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +from calendar import LocaleHTMLCalendar import re import time import random @@ -297,42 +298,55 @@ def study_document(self, _course, _job): def study_work(self, _course, _job, _job_info) -> None: if self.tiku.DISABLE or not self.tiku: return None - _ORIGIN_HTML_CONTENT = "" # 用于配合输出网页源码, 帮助修复#391错误 - def random_answer(options: str) -> str: - answer = "" + def match_answer_with_options(answer: str, options_list: list[str], q_type: str) -> str: + # pretreatment + # del ';' ';' ',' ',' ' ' '、' + # Prevents incorrect answers due to poor matches with the question bank. + # It would be better to maintain the return in terms of correctness, starting with this. + def delete_option_separator(options: str) -> str: + return re.sub(r"[;,,、\s]", "", options) + """统一的答案匹配处理函数""" + matched_answer = "" + if q_type == "multiple": + for ans in multi_cut(answer): + ans = delete_option_separator(ans) + ans = ans.strip().upper() + for opt in options_list: + opt = delete_option_separator(opt) + if ans == opt[:1].upper() or ans in opt.upper(): + matched_answer += opt[:1] + matched_answer = "".join(sorted(set(matched_answer))) # 去重并排序 + elif q_type == "judgement": + matched_answer = "true" if self.tiku.jugement_select(answer) else "false" + else: # single + for opt in options_list: + if answer.upper() in opt.upper(): + matched_answer = opt[:1] + break + return matched_answer + + def random_answer(options: str, q_type: str) -> str: + """改进的随机答案生成""" if not options: - return answer - - if q["type"] == "multiple": - logger.debug(f"当前选项列表[cut前] -> {options}") - _op_list = multi_cut(options) - logger.debug(f"当前选项列表[cut后] -> {_op_list}") - - if not _op_list: - logger.error( - "选项为空, 未能正确提取题目选项信息! 请反馈并提供以上信息" - ) - return answer - - for i in range( - random.choices([2, 3, 4], weights=[0.1, 0.5, 0.4], k=1)[0] - ): # 此处表示随机多选答案几率: 2个 10%, 3个 50%, 4个 40% - _choice = random.choice(_op_list) - _op_list.remove(_choice) - answer += _choice[:1] # 取首字为答案, 例如A或B - # 对答案进行排序, 否则会提交失败 - answer = "".join(sorted(answer)) - elif q["type"] == "single": - answer = random.choice(options.split("\n"))[ - :1 - ] # 取首字为答案, 例如A或B - # 判断题处理 - elif q["type"] == "judgement": - # answer = self.tiku.jugement_select(_answer) - answer = "true" if random.choice([True, False]) else "false" - logger.info(f"随机选择 -> {answer}") - return answer + return "" + + options_list = multi_cut(options) + if not options_list: + logger.error("选项为空,未能正确提取题目选项信息!") + return "" + + if q_type == "multiple": + # 权重调整:2选项(10%), 3选项(50%), 4选项(40%) + num_choices = random.choices([2, 3, 4], weights=[0.1, 0.5, 0.4], k=1)[0] + num_choices = min(num_choices, len(options_list)) # 确保不超过可用选项数 + selected = random.sample(options_list, num_choices) + return "".join(sorted(opt[:1] for opt in selected)) + elif q_type == "single": + return random.choice(options_list)[:1] + elif q_type == "judgement": + return "true" if random.choice([True, False]) else "false" + return "" def multi_cut(answer: str) -> list[str]: """ @@ -353,6 +367,7 @@ def multi_cut(answer: str) -> list[str]: # 同时为了避免没有考虑到的 case, 应该先按照 '\n' 匹配, 匹配不到再按照其他字符匹配 cut_char = [ "\n", + ";", ",", ",", "|", @@ -374,18 +389,21 @@ def multi_cut(answer: str) -> list[str]: ] # 多选答案切割符 res = [] for char in cut_char: + # logger.debug(f"尝试使用字符 '{char}' 进行切割") # 输出的时候不要渲染 res = [ opt for opt in answer.split(char) if opt.strip() ] # Filter empty strings if len(res) > 1: + logger.debug(f"使用字符 '{char}' 切割成功") + logger.debug(f"切割后的选项列表为: {res}") return res logger.warning( - f"未能从网页中提取题目信息, 以下为相关信息: \n\t{answer}\n\n{_ORIGIN_HTML_CONTENT}\n" + f"未能从网页中提取题目信息, 以下为相关信息:\n\t{answer}\n\n{_ORIGIN_HTML_CONTENT}\n" ) # 尝试输出网页内容和选项信息 logger.warning("未能正确提取题目选项信息! 请反馈并提供以上信息") return ["A", "B", "C", "D"] # 默认多选题为4个选项 - # 学习通这里根据参数差异能重定向至两个不同接口, 需要定向至https://mooc1.chaoxing.com/mooc-ans/workHandle/handle + # 获取题目信息 _session = init_session() headers = { "Host": "mooc1.chaoxing.com", @@ -429,38 +447,23 @@ def multi_cut(answer: str) -> list[str]: _ORIGIN_HTML_CONTENT = _resp.text # 用于配合输出网页源码, 帮助修复#391错误 questions = decode_questions_info(_resp.text) # 加载题目信息 - # 搜题 + # 处理每个题目 for q in questions["questions"]: logger.debug(f"当前题目信息 -> {q}") + options_list = multi_cut(q["options"]) res = self.tiku.query(q) - answer = "" + if not res: - # 随机答题 - answer = random_answer(q["options"]) + answer = random_answer(q["options"], q["type"]) + logger.info(f"未找到答案,随机选择 -> {answer}") else: - # 根据响应结果选择答案 - options_list = multi_cut(q["options"]) - if q["type"] == "multiple": - # 多选处理 - for _a in multi_cut(res): - for o in options_list: - if ( - _a.upper() in o - ): # 题库返回的答案可能包含选项, 如A, B, C, 全部转成大写与学习通一致 - answer += o[:1] - # 对答案进行排序, 否则会提交失败 - answer = "".join(sorted(answer)) - elif q["type"] == "judgement": - answer = "true" if self.tiku.jugement_select(res) else "false" + answer = match_answer_with_options(res, options_list, q["type"]) + if not answer: + answer = random_answer(q["options"], q["type"]) + logger.info(f"答案匹配失败,随机选择 -> {answer}") else: - for o in options_list: - if res in o: - answer = o[:1] - break - # 如果未能匹配, 依然随机答题 - logger.info(f"找到答案但答案未能匹配 -> {res}\t随机选择答案") - answer = answer if answer else random_answer(q["options"]) - # 填充答案 + logger.info(f"找到匹配答案 -> {answer}") + q["answerField"][f'answer{q["id"]}'] = answer logger.info(f'{q["title"]} 填写答案为 {answer}') @@ -498,6 +501,7 @@ def multi_cut(answer: str) -> list[str]: "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,ja;q=0.5", }, ) + # TODO 提交答案后查询分数和正确率 if res.status_code == 200: res_json = res.json() if res_json["status"]: diff --git a/main.py b/main.py index 88711e16..55c5a6e3 100644 --- a/main.py +++ b/main.py @@ -172,7 +172,8 @@ def add_times(self, id: str) -> None: # 为了支持课程任务回滚, 采用下标方式遍历任务点 logger.debug(f"当前课程子任务点共计: {len(point_list['points'])}") - __point_index = 0 + # __point_index = 0 + __point_index = 26 while __point_index < len(point_list["points"]): point = point_list["points"][__point_index] logger.info(f'当前章节: {point["title"]}')