diff --git a/.gitignore b/.gitignore
index 38456adc..1e511665 100644
--- a/.gitignore
+++ b/.gitignore
@@ -111,4 +111,6 @@ settings.py
 # webdriver
 bin/*
 ghostdriver.log
-debug.log
\ No newline at end of file
+debug.log
+
+#
\ No newline at end of file
diff --git a/core/__init__.py b/core/__init__.py
index 6423571e..e1b5c921 100644
--- a/core/__init__.py
+++ b/core/__init__.py
@@ -41,14 +41,14 @@ def main():
     try:
         # arg parse
         t1 = time.time()
-        parser = argparse.ArgumentParser(prog=__title__, description=__introduction__, epilog=__epilog__, formatter_class=argparse.RawDescriptionHelpFormatter, usage=argparse.SUPPRESS)
+        parser = argparse.ArgumentParser(prog=__title__, description=__introduction__.format(detail="Main Program"), epilog=__epilog__, formatter_class=argparse.RawDescriptionHelpFormatter, usage=argparse.SUPPRESS)
         subparsers = parser.add_subparsers()

-        parser_group_core = subparsers.add_parser('config', help='config for rule&tamper', description='config for rule&tamper', usage=argparse.SUPPRESS, add_help=True)
-        parser_group_core.add_argument('load', action='store_true', default=False, help='load rule&tamper')
+        parser_group_core = subparsers.add_parser('config', help='config for rule&tamper', description=__introduction__.format(detail='config for rule&tamper'), formatter_class=argparse.RawDescriptionHelpFormatter, usage=argparse.SUPPRESS, add_help=True)
+        parser_group_core.add_argument('load', choices=['load', 'recover'], default=False, help='operate for rule&tamper')

-        parser_group_scan = subparsers.add_parser('scan', help='scan target path', description='scan target path', epilog=__scan_epilog__, formatter_class=argparse.RawDescriptionHelpFormatter, add_help=True)
+        parser_group_scan = subparsers.add_parser('scan', help='scan target path', description=__introduction__.format(detail='scan target path'), epilog=__scan_epilog__, formatter_class=argparse.RawDescriptionHelpFormatter, add_help=True)
         parser_group_scan.add_argument('-t', '--target', dest='target', action='store', default='', metavar='', help='file, folder, compress, or repository address')
         parser_group_scan.add_argument('-f', '--format', dest='format', action='store', default='csv', metavar='', choices=['html', 'json', 'csv', 'xml'], help='vulnerability output format (formats: %(choices)s)')
         parser_group_scan.add_argument('-o', '--output', dest='output', action='store', default='', metavar='', help='vulnerability output STREAM, FILE')
@@ -61,10 +61,10 @@ def main():
         parser_group_scan.add_argument('-d', '--debug', dest='debug', action='store_true', default=False, help='open debug mode')

-        parser_group_scan.add_argument('-uc', '--unconfirm', dest='unconfirm', action='store_true', default=False, help='show unconfirmed vuls')
-        parser_group_scan.add_argument('-upc', '--unprecom', dest='unprecom', action='store_true', default=False, help='without Precompiled')
+        parser_group_scan.add_argument('-uc', '--unconfirm', dest='unconfirm', action='store_false', default=False, help='show unconfirmed vuls')
+        parser_group_scan.add_argument('-upc', '--unprecom', dest='unprecom', action='store_false', default=False, help='without Precompiled')

-        parser_group_show = subparsers.add_parser('show', help='show rule&tamper', description='show rule&tamper', usage=argparse.SUPPRESS, add_help=True)
+        parser_group_show = subparsers.add_parser('show', help='show rule&tamper', description=__introduction__.format(detail='show rule&tamper'), formatter_class=argparse.RawDescriptionHelpFormatter, usage=argparse.SUPPRESS, add_help=True)
         parser_group_show.add_argument('-list', '--list', dest='list', action='store', default=None, help='show all rules')
         parser_group_show.add_argument('-listt', '--listtamper', dest='listtamper', action='store', default=None,
@@ -73,36 +73,47 @@ def main():
         args = parser.parse_args()

         # log
-        if args.log:
+        if hasattr(args, "log") and args.log:
             log(logging.INFO, args.log)
         else:
             log(logging.INFO, str(time.time()))

-        if args.debug:
+        if hasattr(args, "debug") and args.debug:
             logger.setLevel(logging.DEBUG)
             logger.debug('[INIT] set logging level: debug')

-        RuleCheck().run()
-        if args.load:
-            logger.info("[INIT] RuleCheck finished.")
-            exit()
+        if hasattr(args, "load"):
+            if args.load == "load":
+                logger.info("[INIT] RuleCheck start.")
+                RuleCheck().load()

-        if args.list or args.listtamper:
-            if args.list:
-                logger.info("Show List:\n{}".format(show_info('rule', args.list.strip("")))) 
+                logger.info("[INIT] RuleCheck finished.")
+                exit()

-            if args.listtamper:
-                logger.info("Show Tamper List:\n{}".format(show_info('tamper', args.listtamper.strip(""))))
+            elif args.load == "recover":
+                logger.info("[INIT] RuleRecover start.")
+                RuleCheck().recover()

-            exit()
+                logger.info("[INIT] RuleRecover finished.")
+                exit()
+
+        if hasattr(args, "list"):
+            if args.list or args.listtamper:
+                if args.list:
+                    logger.info("Show List:\n{}".format(show_info('rule', args.list.strip("")))) 
+
+                if args.listtamper:
+                    logger.info("Show Tamper List:\n{}".format(show_info('tamper', args.listtamper.strip(""))))
+
+                exit()

-        if args.target == '' and args.output == '':
+        if (not hasattr(args, "target") or args.target == '') or (not hasattr(args, "output") or args.output == ''):
             parser.print_help()
             exit()

         logger.debug('[INIT] start scanning...')

-        if args.sid:
+        if hasattr(args, "sid") and args.sid:
             a_sid = args.sid
         else:
             a_sid = get_sid(args.target, True)
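
A note on the hasattr() guards introduced above: with argparse subparsers, the parsed namespace only carries the dests defined by the subcommand that was actually invoked, so attributes such as args.load or args.target can no longer be read unconditionally. A minimal, self-contained sketch of that behaviour (parser and option names here are illustrative, not copied from the patch):

    import argparse

    parser = argparse.ArgumentParser(prog='kunlun.py')
    subparsers = parser.add_subparsers()

    config_parser = subparsers.add_parser('config')
    config_parser.add_argument('load', choices=['load', 'recover'])

    scan_parser = subparsers.add_parser('scan')
    scan_parser.add_argument('-t', '--target', dest='target', default='')

    args = parser.parse_args(['scan', '-t', '/tmp/project'])
    print(hasattr(args, 'load'))    # False -- reading args.load here would raise AttributeError
    print(hasattr(args, 'target'))  # True

    args = parser.parse_args(['config', 'load'])
    print(hasattr(args, 'target'))  # False -- hence the guards before args.target / args.output

Under that reading, rule synchronisation is expected to be driven by something like "python kunlun.py config load" (import rule files into the database) or "python kunlun.py config recover" (write them back out to files), while scanning stays on "python kunlun.py scan -t <target>".
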
diff --git a/core/__version__.py b/core/__version__.py
index be0c246a..b757f16b 100644
--- a/core/__version__.py
+++ b/core/__version__.py
@@ -22,7 +22,11 @@ GitHub: https://github.com/LoRexxar/Kunlun-M

-KunLun-M is a static code analysis system that automates the detecting vulnerabilities and security issue.""".format(version=__version__)
+KunLun-M is a static code analysis system that automates the detecting vulnerabilities and security issue.
+
+{{detail}}
+
+""".format(version=__version__)

 __epilog__ = """Usage: python {m} scan -t {td}
    python {m} scan -t {td} -r 1000, 1001
diff --git a/core/cli.py b/core/cli.py
index 42e1696f..f22b43cf 100644
--- a/core/cli.py
+++ b/core/cli.py
@@ -23,7 +23,7 @@
 from utils.file import Directory
 from utils.utils import ParseArgs
 from utils.utils import md5, random_generator
-from Kunlun_M.settings import rules_path
+from Kunlun_M.settings import RULES_PATH


 def get_sid(target, is_a_sid=False):
@@ -151,32 +151,32 @@ def list_parse(rules_path, istamp=False):

    if type == "rule":
-        rule_lan_list = list_parse(rules_path)
+        rule_lan_list = list_parse(RULES_PATH)
         rule_dict = {}

         if key == "all":
             # show all
             for lan in rule_lan_list:
                 info_dict[lan] = []
-                rule_lan_path = os.path.join(rules_path, lan)
+                rule_lan_path = os.path.join(RULES_PATH, lan)

                 info_dict[lan] = list_parse(rule_lan_path)

         elif key in rule_lan_list:
             info_dict[key] = []
-            rule_lan_path = os.path.join(rules_path, key)
+            rule_lan_path = os.path.join(RULES_PATH, key)

             info_dict[key] = list_parse(rule_lan_path)

         elif str(int(key)) == key:
             for lan in rule_lan_list:
                 info_dict[lan] = []
-                rule_lan_path = os.path.join(rules_path, lan)
+                rule_lan_path = os.path.join(RULES_PATH, lan)

                 info_dict[lan] = list_parse(rule_lan_path)

             for lan in info_dict:
                 if "CVI_{}.py".format(key) in info_dict[lan]:
-                    f = codecs.open(os.path.join(rules_path, lan, "CVI_{}.py".format(key)), encoding='utf-8', errors="ignore")
+                    f = codecs.open(os.path.join(RULES_PATH, lan, "CVI_{}.py".format(key)), encoding='utf-8', errors="ignore")
                     return f.read()

             logger.error('[Show] no CVI id {}.'.format(key))
@@ -214,7 +214,7 @@ def list_parse(rules_path, istamp=False):
         table.align = 'l'
         i = 0

-        tamp_path = os.path.join(rules_path, 'tamper/')
+        tamp_path = os.path.join(RULES_PATH, 'tamper/')
         tamp_list = list_parse(tamp_path, True)

         if key == "all":
diff --git a/core/detection.py b/core/detection.py
index c5dd00b2..e2202b3d 100644
--- a/core/detection.py
+++ b/core/detection.py
@@ -16,7 +16,7 @@
 import xml.etree.ElementTree as eT
 from .dependencies import Dependencies
 from utils.log import logger
-from Kunlun_M.settings import rules_path
+from Kunlun_M.settings import RULES_PATH

 try:  # for pip >= 10
     from pip._internal.req import parse_requirements
@@ -41,7 +41,7 @@ def __init__(self, target_directory, files):
         self.frame_data = {}
         self.language_data = {}
         self.project_data = []
-        self.rules_path = rules_path
+        self.rules_path = RULES_PATH

     @property
     def language(self):
@@ -200,7 +200,7 @@ def _read_xml(self, filename):

     @staticmethod
     def rule():
-        framework_path = os.path.join(rules_path, 'frameworks.xml')
+        framework_path = os.path.join(RULES_PATH, 'frameworks.xml')
         tree = eT.ElementTree(file=framework_path)
         return tree
diff --git a/core/engine.py b/core/engine.py
index ccd805f4..2ca3ea1e 100644
--- a/core/engine.py
+++ b/core/engine.py
@@ -29,7 +29,7 @@
 from rules.autorule import autorule

 from Kunlun_M import const
-from Kunlun_M.settings import running_path
+from Kunlun_M.settings import RUNNING_PATH
 from Kunlun_M.const import ext_dict
 from Kunlun_M.const import VulnerabilityResult

@@ -48,7 +48,7 @@ def init_list(self, data=None):
         :param data: list or a string
         :return:
         """
-        file_path = os.path.join(running_path, '{sid}_list'.format(sid=self.sid))
+        file_path = os.path.join(RUNNING_PATH, '{sid}_list'.format(sid=self.sid))
         if not os.path.exists(file_path):
             if isinstance(data, list):
                 with open(file_path, 'w') as f:
@@ -66,7 +66,7 @@ def init_list(self, data=None):
             }))

     def list(self, data=None):
-        file_path = os.path.join(running_path, '{sid}_list'.format(sid=self.sid))
+        file_path = os.path.join(RUNNING_PATH, '{sid}_list'.format(sid=self.sid))
         if data is None:
             with open(file_path, 'r') as f:
                 portalocker.lock(f, portalocker.LOCK_EX)
@@ -86,7 +86,7 @@ def list(self, data=None):
                 f.write(json.dumps(result))

     def status(self, data=None):
-        file_path = os.path.join(running_path, '{sid}_status'.format(sid=self.sid))
+        file_path = os.path.join(RUNNING_PATH, '{sid}_status'.format(sid=self.sid))
         if data is None:
             with open(file_path) as f:
                 portalocker.lock(f, portalocker.LOCK_EX)
@@ -100,7 +100,7 @@ def status(self, data=None):


     def data(self, data=None):
-        file_path = os.path.abspath(running_path + '/{sid}_data'.format(sid=self.sid))
+        file_path = os.path.abspath(RUNNING_PATH + '/{sid}_data'.format(sid=self.sid))

         if data is None:
             with open(file_path) as f:
@@ -118,7 +118,7 @@ def is_file(self, is_data=False):
             ext = 'data'
         else:
             ext = 'status'
-        file_path = os.path.join(running_path, '{sid}_{ext}'.format(sid=self.sid, ext=ext))
+        file_path = os.path.join(RUNNING_PATH, '{sid}_{ext}'.format(sid=self.sid, ext=ext))
         return os.path.isfile(file_path)
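
The switch from rules_path/running_path to RULES_PATH/RUNNING_PATH above (and, in the tests further down, PROJECT_DIRECTORY and EXPORT_PATH) assumes matching constants in Kunlun_M/settings.py, which is not part of this diff. A hedged sketch of what that settings excerpt is assumed to provide — only the names are implied by the patch, the concrete locations are illustrative:

    # Hypothetical excerpt of Kunlun_M/settings.py; the actual paths used by
    # the project may differ.
    import os

    PROJECT_DIRECTORY = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

    RULES_PATH = os.path.join(PROJECT_DIRECTORY, 'rules')
    RUNNING_PATH = os.path.join(PROJECT_DIRECTORY, 'logs', 'running')
    EXPORT_PATH = os.path.join(PROJECT_DIRECTORY, 'logs', 'export')
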
diff --git a/core/rule.py b/core/rule.py
index b58d9e4a..5ae82b17 100644
--- a/core/rule.py
+++ b/core/rule.py
@@ -12,9 +12,13 @@
 :copyright: Copyright (c) 2017 LoRexxar. All rights reserved
 """
 import os
-from Kunlun_M.settings import rules_path
+import inspect
+import codecs
+from Kunlun_M.settings import RULES_PATH

 from utils.log import logger
+from web.index.models import Rules
+

 def block(index):
     default_index_reverse = 'in-function'
@@ -53,7 +57,7 @@ def __init__(self, lans=["php"]):

         # 逐个处理每一种lan
         for lan in lans:
-            self.rules_path = rules_path + "/" + lan
+            self.rules_path = RULES_PATH + "/" + lan
             if not os.path.exists(self.rules_path):
                 logger.error("[INIT][RULE] language {} can't found rules".format(self.rules_path))
                 os.mkdir(self.rules_path)
@@ -113,7 +117,14 @@ class RuleCheck:

     def __init__(self):
         self.rule_dict = {}
-        self.rule_base_path = rules_path
+        self.rule_base_path = RULES_PATH
+
+        self.CONFIG_LIST = ["vulnerability", "language", "author", "description", "status", "match_mode",
+                            "match", "vul_function", "main_function"]
+
+        self.SOLIDITY_CONFIG_LIST = ['match_name', 'black_list', 'unmatch']
+        self.REGEX_CONFIG_LIST = ['unmatch']
+        self.CHROME_CONFIG_LIST = ['keyword', 'unmatch']

     def list_parse(self, rules_path, istamp=False):
@@ -138,7 +149,178 @@
         return result

-    def run(self):
-        print(self.list_parse(self.rule_base_path))
+    def get_all_rules(self):
+        rule_lan_list = self.list_parse(self.rule_base_path)
+
+        for lan in rule_lan_list:
+            self.rule_dict[lan] = []
+            rule_lan_path = os.path.join(self.rule_base_path, lan)
+
+            self.rule_dict[lan] = self.list_parse(rule_lan_path)
+
+    def load_rules(self, ruleclass):
+
+        main_function_content = inspect.getsourcelines(ruleclass.main)
+        match_name = ""
+        black_list = ""
+        unmatch = ""
+        keyword = ""
+
+        if ruleclass.match_mode == "regex-return-regex":
+            match_name = ruleclass.match_name
+            black_list = ruleclass.black_list
+            unmatch = ruleclass.unmatch
+        elif ruleclass.match_mode == "only-regex":
+            unmatch = ruleclass.unmatch
+        elif ruleclass.match_mode == "special-crx-keyword-match":
+            unmatch = ruleclass.unmatch
+            keyword = ruleclass.keyword
+
+        r = Rules(rule_name=ruleclass.vulnerability, svid=ruleclass.svid,
+                  language=ruleclass.language.lower(), author=ruleclass.author,
+                  description=ruleclass.description, status=ruleclass.status,
+                  match_mode=ruleclass.match_mode, match=ruleclass.match,
+                  match_name=match_name, black_list=black_list, unmatch=unmatch, keyword=keyword,
+                  vul_function=ruleclass.vul_function, main_function=main_function_content)
+
+        r.save()
+
+        return True
+
+    def check_and_update_rule_database(self, ruleconfig_content, nowrule, config):
+
+        svid = nowrule.svid
+        ruleconfig_content = str(ruleconfig_content).lower()
+
+        if ruleconfig_content != str(getattr(nowrule, config)).lower():
+            logger.warning("[INIT][Rule Check] CVI_{} config {} has changed:".format(svid, config))
+            logger.warning("[INIT][Rule Check] {} in Rule File is {}".format(config, ruleconfig_content))
+            logger.warning("[INIT][Rule Check] {} in Database is {}".format(config, getattr(nowrule, config)))
+
+            logger.warning("[INIT][Rule Check] whether load new {} from Rule File(Y/N):".format(config))
+            if input().lower() != 'n':
+                setattr(nowrule, config, ruleconfig_content)
+
+        nowrule.save()
+        return True
+
+    def check_rules(self, ruleclass, nowrule):
+
+        for config in self.CONFIG_LIST:
+            if config != "main_function":
+                if config == "vulnerability":
+                    config1 = "rule_name"
+                else:
+                    config1 = config
+
+                self.check_and_update_rule_database(getattr(ruleclass, config), nowrule, config1)
+
+            else:
+                main_function_content = inspect.getsource(ruleclass.main)
+                config1 = "main_function"
+
+                self.check_and_update_rule_database(main_function_content, nowrule, config1)
+
+        # for special match_mode
+        if ruleclass.match_mode == "regex-return-regex":
+            for config in self.SOLIDITY_CONFIG_LIST:
+                self.check_and_update_rule_database(getattr(ruleclass, config), nowrule, config)
+        elif ruleclass.match_mode == "only-regex":
+            for config in self.REGEX_CONFIG_LIST:
+                self.check_and_update_rule_database(getattr(ruleclass, config), nowrule, config)
+        elif ruleclass.match_mode == "special-crx-keyword-match":
+            for config in self.CHROME_CONFIG_LIST:
+                self.check_and_update_rule_database(getattr(ruleclass, config), nowrule, config)
+
+        nowrule.save()
+        return True
+
+    def load(self):
+        """
+        load rule from file to database
+        :return:
+        """
+
+        self.get_all_rules()
+        i = 0
+
+        for lan in self.rule_dict:
+            for rule in self.rule_dict[lan]:
+                i += 1
+                rulename = rule.split('.')[0]
+                rulefile = "rules." + lan + "." + rulename
+
+                rule_obj = __import__(rulefile, fromlist=rulename)
+                p = getattr(rule_obj, rulename)
+
+                ruleclass = p()
+
+                r = Rules.objects.filter(svid=ruleclass.svid).first()
+
+                if not r:
+
+                    logger.info("[INIT][Load Rules] New Rule CVI_{} {}".format(ruleclass.svid, ruleclass.vulnerability))
+                    self.load_rules(ruleclass)
+
+                else:
+                    logger.info("[INIT][Load Rules] Check Rule CVI_{} {}".format(ruleclass.svid, ruleclass.vulnerability))
+
+                    self.check_rules(ruleclass, r)

         return True
+
+    def recover(self):
+        """
+        recover rule from database to file
+        :return:
+        """
+        rules = Rules.objects.all()
+
+        for rule in rules:
+            lan = rule.language
+
+            if not os.path.isdir(os.path.join(RULES_PATH, lan)):
+                os.mkdir(os.path.join(RULES_PATH, lan))
+
+            rule_lan_path = os.path.join(RULES_PATH, lan)
+            svid = rule.svid
+
+            rule_path = os.path.join(rule_lan_path, "CVI_{}.py".format(svid))
+
+            if os.path.exists(rule_path):
+                logger.warning("[INIT][Recover] Rule file CVI_{}.py exist. whether overwrite file? (Y/N)".format(svid))
+
+                if input().lower() == 'n':
+                    continue
+
+            logger.info("[INIT][Recover] Recover new Rule file CVI_{}.py".format(svid))
+
+            template_file = codecs.open(os.path.join(RULES_PATH, 'rule.template'), 'rb+', encoding='utf-8', errors='ignore')
+            template_file_content = template_file.read()
+            template_file.close()
+
+            rule_file = codecs.open(rule_path, "wb+", encoding='utf-8', errors='ignore')
+
+            rule_name = rule.rule_name
+            svid = rule.svid
+            language = rule.language
+            author = rule.author
+            description = rule.description
+            status = "True" if rule.status else "False"
+            match_mode = rule.match_mode
+            match = '"{}"'.format(rule.match) if rule.match and "[" != rule.match[0] else rule.match
+            match_name = '"{}"'.format(rule.match_name) if rule.match_name and "[" != rule.match_name[0] else rule.match_name
+            black_list = '"{}"'.format(rule.black_list) if rule.black_list and "[" != rule.black_list[0] else rule.black_list
+            keyword = '"{}"'.format(rule.keyword) if rule.keyword and "[" != rule.keyword[0] else rule.keyword
+            unmatch = '"{}"'.format(rule.unmatch) if rule.unmatch and "[" != rule.unmatch[0] else rule.unmatch
+            vul_function = rule.vul_function if rule.vul_function else "None"
+            main_function = rule.main_function
+
+            rule_file.write(template_file_content.format(rule_name=rule_name, svid=svid, language=language,
+                                                          author=author, description=description, status=status,
+                                                          match_mode=match_mode, match=match, match_name=match_name,
+                                                          black_list=black_list, keyword=keyword, unmatch=unmatch,
+                                                          vul_function=vul_function, main_function=main_function))
+
+            rule_file.close()
+
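
Both load() and recover() go through web.index.models.Rules, which is not shown in this diff either. Judging from the keyword arguments used in load_rules() and the attributes read back in recover(), the model is assumed to look roughly like the following Django sketch — field types and lengths are guesses:

    # Hypothetical shape of web/index/models.py::Rules, inferred from the fields
    # referenced in core/rule.py; the real definitions may differ.
    from django.db import models

    class Rules(models.Model):
        rule_name = models.CharField(max_length=256)
        svid = models.CharField(max_length=16)
        language = models.CharField(max_length=32)
        author = models.CharField(max_length=64)
        description = models.TextField()
        status = models.BooleanField(default=True)
        match_mode = models.CharField(max_length=64)
        match = models.TextField(null=True)
        match_name = models.TextField(null=True, default=None)
        black_list = models.TextField(null=True, default=None)
        keyword = models.TextField(null=True, default=None)
        unmatch = models.TextField(null=True, default=None)
        vul_function = models.TextField(null=True, default=None)
        main_function = models.TextField()

Note that match_name, black_list, unmatch and keyword are only populated for the regex-return-regex, only-regex and special-crx-keyword-match modes; for every other match_mode, load_rules() stores empty strings.
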
diff --git a/db/kunlun.db b/db/kunlun.db
index 6a25166e..ece88314 100644
Binary files a/db/kunlun.db and b/db/kunlun.db differ
diff --git a/kunlun.py b/kunlun.py
index 1c024993..f4fb4984 100644
--- a/kunlun.py
+++ b/kunlun.py
@@ -1,13 +1,22 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-

+import os
 import re
 import sys

+# for django
+os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Kunlun_M.settings')
+
+import django
+
+django.setup()
+
 from core import main


 if __name__ == '__main__':
+
     sys.argv[0] = re.sub(r'(-script\.pyw?|\.exe)?$', '', sys.argv[0])
     sys.exit(main())
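
The kunlun.py change above is what makes the new rule handling work at all: core now imports web.index.models.Rules (via core/rule.py), and Django models can only be imported after a settings module is configured and django.setup() has run. A minimal sketch of that ordering constraint, assuming the same Kunlun_M.settings module used in the patch:

    # Standalone-Django bootstrap order; importing the model module first would
    # fail with django.core.exceptions.AppRegistryNotReady / ImproperlyConfigured.
    import os

    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Kunlun_M.settings')

    import django
    django.setup()

    from web.index.models import Rules  # safe only after django.setup()
    print(Rules.objects.count())
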
diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py
index 6396736e..1717207d 100644
--- a/tests/test_dependencies.py
+++ b/tests/test_dependencies.py
@@ -15,11 +15,11 @@
 import os
 import xml.etree.ElementTree as eT
 from core.dependencies import Dependencies
-from Kunlun_M.settings import project_directory
+from Kunlun_M.settings import PROJECT_DIRECTORY

-requirements = project_directory+'/tests/vulnerabilities/requirements.txt'
-pom = project_directory+'/tests/vulnerabilities/pom.xml'
+requirements = PROJECT_DIRECTORY+'/tests/vulnerabilities/requirements.txt'
+pom = PROJECT_DIRECTORY+'/tests/vulnerabilities/pom.xml'


 def test_find_file():
diff --git a/tests/test_detection.py b/tests/test_detection.py
index 1c9ff500..cb61ec36 100644
--- a/tests/test_detection.py
+++ b/tests/test_detection.py
@@ -14,11 +14,11 @@
 """
 import xml.etree.ElementTree as eT
 from core.detection import Detection
-from Kunlun_M.settings import project_directory
+from Kunlun_M.settings import PROJECT_DIRECTORY

-vul_path = project_directory+'/tests/vulnerabilities/'
-examples_path = project_directory+'/tests/examples'
+vul_path = PROJECT_DIRECTORY+'/tests/vulnerabilities/'
+EXAMPLES_PATH = PROJECT_DIRECTORY+'/tests/examples'


 def test_framework():
@@ -28,7 +28,7 @@


 def test_param_xml():
-    detection = Detection(examples_path, '.')
+    detection = Detection(EXAMPLES_PATH, '.')
     frame_data = {}
     language_data = {}
     tree = detection.rule()
@@ -39,14 +39,14 @@


 def test_rule():
-    detection = Detection(examples_path, '.')
-    root = eT.ElementTree(file=examples_path+'/param_xml.xml')
+    detection = Detection(EXAMPLES_PATH, '.')
+    root = eT.ElementTree(file=EXAMPLES_PATH+'/param_xml.xml')
     tree = detection.rule()
     assert type(root) is type(tree)


 def test_get_dict():
-    detection = Detection(examples_path, '.')
+    detection = Detection(EXAMPLES_PATH, '.')
     extension = ['php', 'js', 'java']
     type_num = {}
     type_num = detection.get_dict(extension, type_num)
@@ -56,30 +56,30 @@

 def test_project_information():
     extension = ['php', 'js', 'java']
-    allfiles = Detection.project_information(examples_path, extension)
-    assert examples_path+'/cloc.html' in allfiles
+    allfiles = Detection.project_information(EXAMPLES_PATH, extension)
+    assert EXAMPLES_PATH+'/cloc.html' in allfiles


 def test_count_py_line():
-    count = Detection.count_py_line(examples_path+'/cloc.py')
+    count = Detection.count_py_line(EXAMPLES_PATH+'/cloc.py')
     type_count = ['count_blank', 'count_code', 'count_pound']
     assert count['count_code'] == 5


 def test_count_php_line():
-    count = Detection.count_php_line(examples_path+'/cloc.php')
+    count = Detection.count_php_line(EXAMPLES_PATH+'/cloc.php')
     type_count = ['count_blank', 'count_code', 'count_pound']
     assert count['count_code'] == 2


 def test_count_java_line():
-    count = Detection.count_java_line(examples_path+'/cloc.java')
+    count = Detection.count_java_line(EXAMPLES_PATH+'/cloc.java')
     type_count = ['count_blank', 'count_code', 'count_pound']
     assert count['count_code'] == 1


 def test_count_data_line():
-    count = Detection.count_data_line(examples_path+'/param_xml.xml')
+    count = Detection.count_data_line(EXAMPLES_PATH+'/param_xml.xml')
     type_count = ['count_blank', 'count_code', 'count_pound']
     assert count['count_code'] == 81
@@ -112,4 +112,4 @@ def test_count_total_num():


 def test_cloc():
-    assert Detection(examples_path, '.').cloc()
+    assert Detection(EXAMPLES_PATH, '.').cloc()
diff --git a/tests/test_directory.py b/tests/test_directory.py
index 017a1995..c5559d10 100644
--- a/tests/test_directory.py
+++ b/tests/test_directory.py
@@ -12,12 +12,12 @@
 :copyright: Copyright (c) 2017 Feei. All rights reserved
 """
 import os
-from Kunlun_M.settings import project_directory
+from Kunlun_M.settings import PROJECT_DIRECTORY
 from utils.file import Directory


 def test_file():
-    absolute_path = os.path.join(project_directory, 'kunlun.py')
+    absolute_path = os.path.join(PROJECT_DIRECTORY, 'kunlun.py')
     files, file_sum, time_consume = Directory(absolute_path).collect_files()
     ext, ext_info = files[0]
     assert '.py' == ext
@@ -28,6 +28,6 @@ def test_file():


 def test_directory():
-    absolute_path = project_directory
+    absolute_path = PROJECT_DIRECTORY
     files, file_sum, time_consume = Directory(absolute_path).collect_files()
     assert len(files) > 1
diff --git a/tests/test_export.py b/tests/test_export.py
index 93c96929..b5ced8ce 100644
--- a/tests/test_export.py
+++ b/tests/test_export.py
@@ -14,10 +14,10 @@
 import json
 import os

-from Kunlun_M.settings import running_path, export_path
+from Kunlun_M.settings import RUNNING_PATH, EXPORT_PATH
 from utils.export import write_to_file, dict_to_pretty_table

-scan_data_file = os.path.join(running_path, 'abcdefg_data')
+scan_data_file = os.path.join(RUNNING_PATH, 'abcdefg_data')
 if not os.path.exists(scan_data_file):
     with open(scan_data_file, 'w') as f:
         scan_data = r"""{"code": 1001, "msg": "scan finished", "result": {"extension": 18, "file": 132, "framework": "Unknown Framework", "language": "python", "push_rules": 43, "target_directory": "/tmp/core/git/shadowsocks/shadowsocks/", "trigger_rules": 1, "vulnerabilities": [{"code_content": " assert '127.0.1.1' not in ip_network", "commit_author": "Sunny", "commit_time": "2015-01-31 19:50:10", "file_path": "shadowsocks/common.py", "id": "130005", "language": "*", "level": "4", "line_number": "294", "match_result": null, "rule_name": "\u786c\u7f16\u7801IP", "solution": "## \u5b89\u5168\u98ce\u9669\n \u786c\u7f16\u7801IP\n\n ## \u4fee\u590d\u65b9\u6848\n \u79fb\u5230\u914d\u7f6e\u6587\u4ef6\u4e2d"}, {"code_content": " assert '192.168.1.2' not in ip_network", "commit_author": "Sunny", "commit_time": "2015-01-31 19:50:10", "file_path": "shadowsocks/common.py", "id": "130005", "language": "*", "level": "4", "line_number": "300", "match_result": null, "rule_name": "\u786c\u7f16\u7801IP", "solution": "## \u5b89\u5168\u98ce\u9669\n \u786c\u7f16\u7801IP\n\n ## \u4fee\u590d\u65b9\u6848\n \u79fb\u5230\u914d\u7f6e\u6587\u4ef6\u4e2d"}, {"code_content": " assert '192.0.2.1' in ip_network", "commit_author": "Sunny", "commit_time": "2015-02-01 00:17:03", "file_path": "shadowsocks/common.py", "id": "130005", "language": "*", "level": "4", "line_number": "301", "match_result": null, "rule_name": "\u786c\u7f16\u7801IP", "solution": "## \u5b89\u5168\u98ce\u9669\n \u786c\u7f16\u7801IP\n\n ## \u4fee\u590d\u65b9\u6848\n \u79fb\u5230\u914d\u7f6e\u6587\u4ef6\u4e2d"}, {"code_content": " assert '192.0.3.1' in ip_network # 192.0.2.0 is treated as 192.0.2.0/23", "commit_author": "Sunny", "commit_time": "2015-02-01 00:17:03", "file_path": "shadowsocks/common.py", "id": "130005", "language": "*", "level": "4", "line_number": "302", "match_result": null, "rule_name": "\u786c\u7f16\u7801IP", "solution": "## \u5b89\u5168\u98ce\u9669\n \u786c\u7f16\u7801IP\n\n ## \u4fee\u590d\u65b9\u6848\n \u79fb\u5230\u914d\u7f6e\u6587\u4ef6\u4e2d"}, {"code_content": " IPNetwork(config.get('forbidden_ip', '127.0.0.0/8,::1/128'))", "commit_author": "loggerhead", "commit_time": "2016-11-20 14:59:32", "file_path": "shadowsocks/shell.py", "id": "130005", "language": "*", "level": "4", "line_number": "146", "match_result": null, "rule_name": "\u786c\u7f16\u7801IP", "solution": "## \u5b89\u5168\u98ce\u9669\n \u786c\u7f16\u7801IP\n\n ## \u4fee\u590d\u65b9\u6848\n \u79fb\u5230\u914d\u7f6e\u6587\u4ef6\u4e2d"}]}}"""
@@ -28,9 +28,9 @@

 def test_export_to_json():
     write_to_file(target=target, sid='abcdefg', output_format='json', filename='test.json')
-    assert os.path.exists(os.path.join(export_path, 'test.json'))
+    assert os.path.exists(os.path.join(EXPORT_PATH, 'test.json'))

-    with open(os.path.join(export_path, 'test.json')) as f:
+    with open(os.path.join(EXPORT_PATH, 'test.json')) as f:
         json_string = f.read()
         # JSON format
         assert isinstance(json.loads(json_string), dict)
@@ -47,14 +47,14 @@
     # rule_name
     assert "硬编码IP" in json_string

-    os.remove(os.path.join(export_path, 'test.json'))
+    os.remove(os.path.join(EXPORT_PATH, 'test.json'))


 def test_export_to_xml():
     write_to_file(target=target, sid='abcdefg', output_format='xml', filename='test.xml')
-    assert os.path.exists(os.path.join(export_path, 'test.xml'))
+    assert os.path.exists(os.path.join(EXPORT_PATH, 'test.xml'))

-    with open(os.path.join(export_path, 'test.xml')) as f:
+    with open(os.path.join(EXPORT_PATH, 'test.xml')) as f:
         xml_string = f.read()
         # XML tag
         assert "