diff --git a/packj/audit/main.py b/packj/audit/main.py index bf3fd35..688f0fc 100644 --- a/packj/audit/main.py +++ b/packj/audit/main.py @@ -10,10 +10,11 @@ from typing import Optional import email.utils as eutils +from datetime import timedelta from colorama import Fore, Style from packj.util.net import __parse_url, download_file, check_site_exist, check_domain_popular -from packj.util.dates import datetime_delta +from packj.util.dates import datetime_delta, date_str_to_datetime from packj.util.email_validity import check_email_address from packj.util.files import write_json_to_file, read_from_csv, read_file_lines from packj.util.enum_util import PackageManagerEnum, LanguageEnum @@ -459,17 +460,88 @@ def analyze_repo_activity(risks, report): risks = alert_user(alert_type, THREAT_MODEL, reason, risks) msg_alert(reason) elif repo_data: - commits, contributors, tags = tuple(repo_data[k] for k in ('commits', 'contributors', 'tags')) + commits, contributors, tags = tuple(len(repo_data[k]) if repo_data[k] else None for k in ('commits', 'contributors', 'tags')) msg_ok(f'commits: {commits}, contributors: {contributors}, tags: {tags}') report['repo'].update(repo_data) except Exception as e: msg_fail(str(e)) + finally: + return risks, report, repo_data + +def get_pkg_ver_release_dates_before_after(release_history, pkg_ver): + pkg_release_date = release_history[pkg_ver]['release_date'] + pkg_release_history = {v['release_date']:k for k,v in release_history.items()} + release_dates = list(pkg_release_history.keys()) + + try: + index = release_dates.index(pkg_release_date) + + pkg_release_date_before = release_dates[index - 1] if index - 1 >= 0 else None + pkg_ver_before = None + if pkg_release_date_before: + pkg_ver_before = pkg_release_history[pkg_release_date_before] + + pkg_release_date_after = release_dates[index + 1] if index + 1 < len(release_dates) else None + pkg_ver_after = None + if pkg_release_date_after: + pkg_ver_after = pkg_release_history[pkg_release_date_after] + + return { + 'before': (pkg_release_date_before, pkg_ver_before), + 'target': (pkg_release_date, pkg_ver), + 'after': (pkg_release_date_after, pkg_ver_after) + } + except ValueError: + raise Exception(f'Failed to find version {pkg_ver} in pkg release history') + +def get_repo_ver_release_dates(tag_list, cutoff_datetime): + try: + dates = {} + for tag, date_str in tag_list: + if date_str_to_datetime(date_str) > cutoff_datetime: + continue + if date_str not in dates: + dates[date_str] = [tag] + else: + dates[date_str].append(tag) + return sorted(dates.items(), key = lambda x:date_str_to_datetime(x[0]), reverse=True)[0] + except Exception as e: + raise Exception(f'Failed to find repo release dates: {str(e)}') + +def analyze_repo_releases(repo_data, risks, report, release_history): + try: + msg_info('Analyzing repo-pkg release match...', end='', flush=True, indent=1) + release_tags = repo_data['tags'] + if not release_tags or not len(release_tags): + alert_type = 'inconsistent with repo source' + reason = 'no repo releases' + risks = alert_user(alert_type, THREAT_MODEL, reason, risks) + msg_alert(reason) + else: + pkg_ver = report['pkg_ver'] + pkg_release_history = get_pkg_ver_release_dates_before_after(release_history, pkg_ver) + cutoff_datestr, _ = pkg_release_history['target'] + if not cutoff_datestr: + msg_warn('FAIL', 'Insufficent metadata') + else: + pkg_ver_release_date = date_str_to_datetime(cutoff_datestr) + repo_ver_release_date, repo_ver_tag = get_repo_ver_release_dates(repo_data['tags'], pkg_ver_release_date) + delta = datetime_delta(repo_ver_release_date, pkg_ver_release_date, days=True) + #print(pkg_ver_release_date, repo_ver_release_date, delta) + if delta > 1: + alert_type = 'inconsistent with repo source' + reason = 'more repo releases' + risks = alert_user(alert_type, THREAT_MODEL, reason, risks) + msg_alert(reason) + else: + msg_ok(f'matching tag(s) {",".join(repo_ver_tag)} on {repo_ver_release_date}') + except Exception as e: + msg_fail(str(e)) finally: return risks, report -def analyze_repo_code(risks, report): +def analyze_repo_code(repo_data, risks, report): try: - repo_url = report['repo']['url'] msg_info('Analyzing repo-pkg src code match...', end='', flush=True, indent=1) # TODO msg_warn(' N/A','Coming soon!') @@ -890,8 +962,9 @@ def audit(pm_args, pkg_name, ver_str, report_dir, extra_args, config): risks, report = analyze_repo_data(config, risks, report) if 'description' in report['repo']: risks, report = analyze_repo_descr(risks, report) - risks, report = analyze_repo_code(risks, report) - risks, report = analyze_repo_activity(risks, report) + risks, report, repo_data = analyze_repo_activity(risks, report) + risks, report = analyze_repo_releases(repo_data, risks, report, release_history) + risks, report = analyze_repo_code(repo_data, risks, report) risks, report = analyze_cves(pm_name, pkg_name, ver_str, risks, report) risks, report = analyze_deps(pm_proxy, pkg_name, ver_str, pkg_info, ver_info, risks, report) diff --git a/packj/util/dates.py b/packj/util/dates.py index 3da862b..b301a6b 100644 --- a/packj/util/dates.py +++ b/packj/util/dates.py @@ -1,3 +1,6 @@ +def nearest(tuple_list, pivot): + return min(tuple_list, key=lambda x: abs(x[1] - pivot)) + def ts_to_date_str(tstamp:float, fmt:str='%m-%d-%Y'): try: import datetime @@ -45,7 +48,7 @@ def date_str_to_datetime(date_str, fmt=None): except Exception as e: raise Exception("Failed to get datetime from date string %s: %s" % (date_str, str(e))) -def datetime_delta(date1, date2=None, days=None): +def datetime_delta(date1, date2=None, days=False): try: import datetime import pytz diff --git a/packj/util/repo.py b/packj/util/repo.py index 1a76347..02490ca 100644 --- a/packj/util/repo.py +++ b/packj/util/repo.py @@ -27,39 +27,42 @@ def git_clone(repo_url): logging.debug("Failed to get parse repo at %s: %s" % (clone_dir, str(e))) return "invalid repo", None + reason = None + # tags + tags = None try: tags = [(t.name, datetime_to_date_str(t.commit.committed_datetime)) for t in repo.tags] except Exception as e: logging.debug("Failed to get tags %s: %s" % (clone_dir, str(e))) - tags = None # branches + branches = None try: branches = [b.name for b in repo.remote().refs] except Exception as e: logging.debug("Failed to get branches %s: %s" % (clone_dir, str(e))) - branches = None + commits = [] try: - commits = [] for commit in repo.iter_commits(): commits.append(commit) except Exception as e: logging.debug("Failed to get commits %s: %s" % (clone_dir, str(e))) commits = None + reason = 'No commits' + authors = None try: if commits: authors = set([commit.author.email for commit in commits]) except Exception as e: logging.debug("Failed to get authors %s: %s" % (clone_dir, str(e))) - authors = None shutil.rmtree(os.path.dirname(clone_dir)) - return None, { - 'commits' : len(commits) if commits else None, - 'branches' : len(branches), - 'tags' : len(tags), - 'contributors' : len(authors), + return reason, { + 'commits' : commits, + 'branches' : branches, + 'tags' : tags, + 'contributors' : authors, }