-
Notifications
You must be signed in to change notification settings - Fork 265
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- 加入题库模块和引入SocialSisterYi大佬相关代码并实现字体解密 - 现在支持需要解锁章节的课程 - 修复若干问题
- Loading branch information
Showing
11 changed files
with
31,510 additions
and
35 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -143,4 +143,5 @@ chaoxing.log | |
./config.ini | ||
./chaoxing.log | ||
./cookies.txt | ||
.idea/ | ||
.idea/ | ||
cache.json |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,201 @@ | ||
import configparser | ||
import requests | ||
from pathlib import Path | ||
from typing import Optional | ||
import json | ||
from api.logger import logger | ||
import random | ||
from urllib3 import disable_warnings,exceptions | ||
# 关闭警告 | ||
disable_warnings(exceptions.InsecureRequestWarning) | ||
|
||
class CacheDAO: | ||
""" | ||
@Author: SocialSisterYi | ||
@Reference: https://github.com/SocialSisterYi/xuexiaoyi-to-xuexitong-tampermonkey-proxy | ||
""" | ||
def __init__(self, file: str = "cache.json"): | ||
self.cacheFile = Path(file) | ||
if not self.cacheFile.is_file(): | ||
self.cacheFile.open("w").write("{}") | ||
self.fp = self.cacheFile.open("r+", encoding="utf8") | ||
|
||
def getCache(self, question: str) -> Optional[str]: | ||
self.fp.seek(0) | ||
data = json.load(self.fp) | ||
if isinstance(data, dict): | ||
return data.get(question) | ||
|
||
def addCache(self, question: str, answer: str): | ||
self.fp.seek(0) | ||
data: dict = json.load(self.fp) | ||
data[question] = answer | ||
self.fp.seek(0) | ||
json.dump(data, self.fp, ensure_ascii=False, indent=4) | ||
|
||
|
||
class Tiku: | ||
CONFIG_PATH = "config.ini" # 默认配置文件路径 | ||
DISABLE = False # 停用标志 | ||
|
||
def __init__(self) -> None: | ||
self._name = None | ||
self._api = None | ||
self._conf = None | ||
|
||
@property | ||
def name(self): | ||
return self._name | ||
|
||
@name.setter | ||
def name(self, value): | ||
self._name = value | ||
|
||
@property | ||
def api(self): | ||
return self._api | ||
|
||
@api.setter | ||
def api(self, value): | ||
self._api = value | ||
|
||
@property | ||
def token(self): | ||
return self._token | ||
|
||
@token.setter | ||
def token(self,value): | ||
self._token = value | ||
|
||
def init_tiku(self): | ||
# 仅用于题库初始化,例如配置token | ||
pass | ||
|
||
def config_set(self,config:configparser.ConfigParser|None): | ||
self._conf = config | ||
|
||
def _get_conf(self): | ||
""" | ||
从默认配置文件查询配置,如果未能查到,停用题库 | ||
""" | ||
try: | ||
config = configparser.ConfigParser() | ||
config.read(self.CONFIG_PATH, encoding="utf8") | ||
return config['tiku'] | ||
except KeyError or FileNotFoundError: | ||
logger.info("未找到tiku配置,已忽略题库功能") | ||
self.DISABLE = True | ||
return None | ||
|
||
def query(self,q_info:dict) -> str|None: | ||
if self.DISABLE: | ||
return None | ||
|
||
# 预处理,去除【单选题】这样与标题无关的字段 | ||
# 此处需要改进!!! | ||
q_info['title'] = q_info['title'][6:] # 暂时直接用裁切解决 | ||
|
||
# 先过缓存 | ||
cache_dao = CacheDAO() | ||
answer = cache_dao.getCache(q_info['title']) | ||
if answer: | ||
logger.info(f"从缓存中获取答案:{q_info['title']} -> {answer}") | ||
return answer | ||
else: | ||
answer = self._query(q_info) | ||
if answer: | ||
cache_dao.addCache(q_info['title'], answer) | ||
logger.info(f"从{self.name}获取答案:{q_info['title']} -> {answer}") | ||
return answer | ||
logger.error(f"从{self.name}获取答案失败:{q_info['title']}") | ||
return None | ||
def _query(self,q_info:dict): | ||
pass | ||
|
||
def get_tiku_from_config(self): | ||
""" | ||
从配置文件加载题库,这个配置可以是用户提供,可以是默认配置文件 | ||
""" | ||
if not self._conf: | ||
# 尝试从默认配置文件加载 | ||
self.config_set(self._get_conf()) | ||
if self.DISABLE: | ||
return self | ||
cls_name = self._conf['provider'] | ||
new_cls = globals()[cls_name]() | ||
new_cls.config_set(self._conf) | ||
return new_cls | ||
|
||
def jugement_select(self,answer:str) -> bool: | ||
""" | ||
这是一个专用的方法,要求配置维护两个选项列表,一份用于正确选项,一份用于错误选项,以应对题库对判断题答案响应的各种可能的情况 | ||
它的作用是将获取到的答案answer与可能的选项列对比并返回对应的布尔值 | ||
""" | ||
if self.DISABLE: | ||
return False | ||
true_list = self._conf['true_list'].split(',') | ||
false_list = self._conf['false_list'].split(',') | ||
if answer in true_list: | ||
return True | ||
elif answer in false_list: | ||
return False | ||
else: | ||
# 无法判断,随机选择 | ||
logger.error(f'无法判断答案{answer}对应的是正确还是错误,请自行判断并加入配置文件重启脚本,本次将会随机选择选项') | ||
return random.choice([True,False]) | ||
|
||
|
||
|
||
# 按照以下模板实现更多题库 | ||
|
||
class TikuYanxi(Tiku): | ||
# 言溪题库实现 | ||
def __init__(self) -> None: | ||
super().__init__() | ||
self.name = '言溪题库' | ||
self.api = 'https://tk.enncy.cn/query' | ||
self._token = None | ||
self._token_index = 0 # token队列计数器 | ||
self._times = 100 # 查询次数剩余,初始化为100,查询后校对修正 | ||
|
||
def _query(self,q_info:dict): | ||
res = requests.get( | ||
self.api, | ||
params={ | ||
'question':q_info['title'], | ||
'token':self._token | ||
}, | ||
verify=False | ||
) | ||
if res.status_code == 200: | ||
res_json = res.json() | ||
if not res_json['code']: | ||
# 如果是因为TOKEN次数到期,则更换token | ||
if self._times == 0 or '次数不足' in res_json['data']['answer']: | ||
logger.info(f'TOKEN查询次数不足,将会更换并重新搜题') | ||
self._token_index += 1 | ||
self.load_token() | ||
# 重新查询 | ||
return self._query(q_info) | ||
logger.error(f'{self.name}查询失败:\n剩余查询数{res_json["data"].get("times",f"{self._times}(仅参考)")}:\n消息:{res_json["message"]}') | ||
return None | ||
self._times = res_json["data"].get("times",self._times) | ||
return res_json['data']['answer'] | ||
else: | ||
logger.error(f'{self.name}查询失败:\n{res.text}') | ||
return None | ||
|
||
def load_token(self): | ||
token_list = self._conf['tokens'].split(',') | ||
if self._token_index == len(token_list): | ||
# TOKEN 用完 | ||
logger.error('TOKEN用完,请自行更换再重启脚本') | ||
raise Exception(f'{self.name} TOKEN 已用完,请更换') | ||
self._token = token_list[self._token_index] | ||
|
||
def init_tiku(self): | ||
if not self._conf: | ||
self.config_set(self._get_conf()) | ||
if not self.DISABLE: | ||
return self.load_token() | ||
|
Oops, something went wrong.