Skip to content

Commit

Permalink
Add support for repo-pkg release check
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Bijlani <[email protected]>
  • Loading branch information
PackjGuard committed Dec 15, 2023
1 parent 07ced08 commit 2773ddd
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 16 deletions.
85 changes: 79 additions & 6 deletions packj/audit/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
from typing import Optional
import email.utils as eutils

from datetime import timedelta
from colorama import Fore, Style

from packj.util.net import __parse_url, download_file, check_site_exist, check_domain_popular
from packj.util.dates import datetime_delta
from packj.util.dates import datetime_delta, date_str_to_datetime
from packj.util.email_validity import check_email_address
from packj.util.files import write_json_to_file, read_from_csv, read_file_lines
from packj.util.enum_util import PackageManagerEnum, LanguageEnum
Expand Down Expand Up @@ -459,17 +460,88 @@ def analyze_repo_activity(risks, report):
risks = alert_user(alert_type, THREAT_MODEL, reason, risks)
msg_alert(reason)
elif repo_data:
commits, contributors, tags = tuple(repo_data[k] for k in ('commits', 'contributors', 'tags'))
commits, contributors, tags = tuple(len(repo_data[k]) if repo_data[k] else None for k in ('commits', 'contributors', 'tags'))
msg_ok(f'commits: {commits}, contributors: {contributors}, tags: {tags}')
report['repo'].update(repo_data)
except Exception as e:
msg_fail(str(e))
finally:
return risks, report, repo_data

def get_pkg_ver_release_dates_before_after(release_history, pkg_ver):
pkg_release_date = release_history[pkg_ver]['release_date']
pkg_release_history = {v['release_date']:k for k,v in release_history.items()}
release_dates = list(pkg_release_history.keys())

try:
index = release_dates.index(pkg_release_date)

pkg_release_date_before = release_dates[index - 1] if index - 1 >= 0 else None
pkg_ver_before = None
if pkg_release_date_before:
pkg_ver_before = pkg_release_history[pkg_release_date_before]

pkg_release_date_after = release_dates[index + 1] if index + 1 < len(release_dates) else None
pkg_ver_after = None
if pkg_release_date_after:
pkg_ver_after = pkg_release_history[pkg_release_date_after]

return {
'before': (pkg_release_date_before, pkg_ver_before),
'target': (pkg_release_date, pkg_ver),
'after': (pkg_release_date_after, pkg_ver_after)
}
except ValueError:
raise Exception(f'Failed to find version {pkg_ver} in pkg release history')

def get_repo_ver_release_dates(tag_list, cutoff_datetime):
try:
dates = {}
for tag, date_str in tag_list:
if date_str_to_datetime(date_str) > cutoff_datetime:
continue
if date_str not in dates:
dates[date_str] = [tag]
else:
dates[date_str].append(tag)
return sorted(dates.items(), key = lambda x:date_str_to_datetime(x[0]), reverse=True)[0]
except Exception as e:
raise Exception(f'Failed to find repo release dates: {str(e)}')

def analyze_repo_releases(repo_data, risks, report, release_history):
try:
msg_info('Analyzing repo-pkg release match...', end='', flush=True, indent=1)
release_tags = repo_data['tags']
if not release_tags or not len(release_tags):
alert_type = 'inconsistent with repo source'
reason = 'no repo releases'
risks = alert_user(alert_type, THREAT_MODEL, reason, risks)
msg_alert(reason)
else:
pkg_ver = report['pkg_ver']
pkg_release_history = get_pkg_ver_release_dates_before_after(release_history, pkg_ver)
cutoff_datestr, _ = pkg_release_history['target']
if not cutoff_datestr:
msg_warn('FAIL', 'Insufficent metadata')
else:
pkg_ver_release_date = date_str_to_datetime(cutoff_datestr)
repo_ver_release_date, repo_ver_tag = get_repo_ver_release_dates(repo_data['tags'], pkg_ver_release_date)
delta = datetime_delta(repo_ver_release_date, pkg_ver_release_date, days=True)
#print(pkg_ver_release_date, repo_ver_release_date, delta)
if delta > 1:
alert_type = 'inconsistent with repo source'
reason = 'more repo releases'
risks = alert_user(alert_type, THREAT_MODEL, reason, risks)
msg_alert(reason)
else:
msg_ok(f'matching tag(s) {",".join(repo_ver_tag)} on {repo_ver_release_date}')
except Exception as e:
msg_fail(str(e))
finally:
return risks, report

def analyze_repo_code(risks, report):
def analyze_repo_code(repo_data, risks, report):
try:
repo_url = report['repo']['url']
msg_info('Analyzing repo-pkg src code match...', end='', flush=True, indent=1)
# TODO
msg_warn(' N/A','Coming soon!')
Expand Down Expand Up @@ -890,8 +962,9 @@ def audit(pm_args, pkg_name, ver_str, report_dir, extra_args, config):
risks, report = analyze_repo_data(config, risks, report)
if 'description' in report['repo']:
risks, report = analyze_repo_descr(risks, report)
risks, report = analyze_repo_code(risks, report)
risks, report = analyze_repo_activity(risks, report)
risks, report, repo_data = analyze_repo_activity(risks, report)
risks, report = analyze_repo_releases(repo_data, risks, report, release_history)
risks, report = analyze_repo_code(repo_data, risks, report)
risks, report = analyze_cves(pm_name, pkg_name, ver_str, risks, report)
risks, report = analyze_deps(pm_proxy, pkg_name, ver_str, pkg_info, ver_info, risks, report)

Expand Down
5 changes: 4 additions & 1 deletion packj/util/dates.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
def nearest(tuple_list, pivot):
return min(tuple_list, key=lambda x: abs(x[1] - pivot))

def ts_to_date_str(tstamp:float, fmt:str='%m-%d-%Y'):
try:
import datetime
Expand Down Expand Up @@ -45,7 +48,7 @@ def date_str_to_datetime(date_str, fmt=None):
except Exception as e:
raise Exception("Failed to get datetime from date string %s: %s" % (date_str, str(e)))

def datetime_delta(date1, date2=None, days=None):
def datetime_delta(date1, date2=None, days=False):
try:
import datetime
import pytz
Expand Down
21 changes: 12 additions & 9 deletions packj/util/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,39 +27,42 @@ def git_clone(repo_url):
logging.debug("Failed to get parse repo at %s: %s" % (clone_dir, str(e)))
return "invalid repo", None

reason = None

# tags
tags = None
try:
tags = [(t.name, datetime_to_date_str(t.commit.committed_datetime)) for t in repo.tags]
except Exception as e:
logging.debug("Failed to get tags %s: %s" % (clone_dir, str(e)))
tags = None

# branches
branches = None
try:
branches = [b.name for b in repo.remote().refs]
except Exception as e:
logging.debug("Failed to get branches %s: %s" % (clone_dir, str(e)))
branches = None

commits = []
try:
commits = []
for commit in repo.iter_commits():
commits.append(commit)
except Exception as e:
logging.debug("Failed to get commits %s: %s" % (clone_dir, str(e)))
commits = None
reason = 'No commits'

authors = None
try:
if commits:
authors = set([commit.author.email for commit in commits])
except Exception as e:
logging.debug("Failed to get authors %s: %s" % (clone_dir, str(e)))
authors = None

shutil.rmtree(os.path.dirname(clone_dir))
return None, {
'commits' : len(commits) if commits else None,
'branches' : len(branches),
'tags' : len(tags),
'contributors' : len(authors),
return reason, {
'commits' : commits,
'branches' : branches,
'tags' : tags,
'contributors' : authors,
}

0 comments on commit 2773ddd

Please sign in to comment.