Skip to content

Commit

Permalink
Refactor and promote function to a class
Browse files Browse the repository at this point in the history
The code has many issues that are triggering CodeClimate problems.
We tried to address them by simple refactoring, but then we realized
that more refactoring would make the code more readable and
understandable.

There are 2 branches of the code logic:
1. processing OVAL checks from benchmark directories (eg. /linux_os/guide)
2. processing OVAL checks from shared directories (eg. /shared/checks)

These 2 logic branches have similar code and it's possible to unify
them. However, I found the unification easier when I change the code
to a class.
  • Loading branch information
jan-cerny committed Jan 20, 2023
1 parent 2828a77 commit 682892a
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 114 deletions.
5 changes: 2 additions & 3 deletions build-scripts/combine_ovals.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,12 @@ def main():
ssg.utils.required_key(env_yaml, "target_oval_version_str"),
ssg.utils.required_key(env_yaml, "ssg_version"))

body = ssg.build_ovals.checks(
oval_builder = ssg.build_ovals.OVALBuilder(
env_yaml,
args.product_yaml,
ssg.utils.required_key(env_yaml, "target_oval_version_str"),
args.ovaldirs,
args.include_benchmark,
args.build_ovals_dir)
body = oval_builder.build_shorthand(args.include_benchmark)

# parse new file(string) as an ssg.xml.ElementTree, so we can reorder elements
# appropriately
Expand Down
268 changes: 157 additions & 111 deletions ssg/build_ovals.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,122 +260,168 @@ def _check_rule_id(oval_file_tree, rule_id):
return False


def get_checks_from_benchmark(
env_yaml, product_yaml_path, already_loaded, oval_version,
build_ovals_dir):
oval_checks = []
product = utils.required_key(env_yaml, "product")
product_dir = os.path.dirname(product_yaml_path)
relative_guide_dir = utils.required_key(env_yaml, "benchmark_root")
guide_dir = os.path.abspath(os.path.join(product_dir, relative_guide_dir))
additional_content_directories = env_yaml.get("additional_content_directories", [])
add_content_dirs = [
os.path.abspath(os.path.join(product_dir, rd)) for rd in additional_content_directories]
local_env_yaml = dict()
local_env_yaml.update(env_yaml)

if build_ovals_dir:
# Create output directory if it doesn't yet exist.
if not os.path.exists(build_ovals_dir):
os.makedirs(build_ovals_dir)

for _dir_path in find_rule_dirs_in_paths([guide_dir] + add_content_dirs):
rule_id = get_rule_dir_id(_dir_path)

rule_path = os.path.join(_dir_path, "rule.yml")
try:
rule = Rule.from_yaml(rule_path, env_yaml)
except DocumentationNotComplete:
# Happens on non-debug build when a rule is "documentation-incomplete"
continue
prodtypes = parse_prodtype(rule.prodtype)

local_env_yaml['rule_id'] = rule.id_
local_env_yaml['rule_title'] = rule.title
local_env_yaml['products'] = prodtypes # default is all

for _path in get_rule_dir_ovals(_dir_path, product):
# To be compatible with the later checks, use the rule_id
# (i.e., the value of _dir) to recreate the expected filename if
# this OVAL was in a rule directory.
filename = "%s.xml" % rule_id

xml_content = process_file_with_macros(_path, local_env_yaml)

if not _check_is_applicable_for_product(xml_content, product):
continue

if build_ovals_dir:
# store intermediate files
output_file_name = rule_id + ".xml"
output_filepath = os.path.join(build_ovals_dir, output_file_name)
with open(output_filepath, "w") as f:
f.write(xml_content)

if _check_is_loaded(already_loaded, filename, oval_version):
continue
oval_file_tree = _create_oval_tree_from_string(xml_content)
if not _check_rule_id(oval_file_tree, rule_id,):
msg = "OVAL definition in '%s' doesn't match rule ID '%s'." % (
_path, rule_id)
print(msg, file=sys.stderr)
if not _check_oval_version_from_oval(oval_file_tree, oval_version):
def _create_output_directory(directory_path=None):
if directory_path and not os.path.exists(directory_path):
os.makedirs(directory_path)


def _list_full_paths(directory):
full_paths = [os.path.join(directory, x) for x in os.listdir(directory)]
return sorted(full_paths)


class OVALBuilder:
def __init__(
self, env_yaml, product_yaml_path, shared_directories,
build_ovals_dir):
self.env_yaml = env_yaml
self.product_yaml_path = product_yaml_path
self.shared_directories = shared_directories
self.build_ovals_dir = build_ovals_dir
self.already_loaded = dict()
self.oval_version = utils.required_key(
env_yaml, "target_oval_version_str")
self.product = utils.required_key(env_yaml, "product")

def build_shorthand(self, include_benchmark):
_create_output_directory(self.build_ovals_dir)
all_checks = []
if include_benchmark:
all_checks += self._get_checks_from_benchmark()
all_checks += self._get_checks_from_shared_directories()
document_body = "".join(all_checks)
return document_body

def _get_checks_from_benchmark(self):
product_dir = os.path.dirname(self.product_yaml_path)
relative_guide_dir = utils.required_key(self.env_yaml, "benchmark_root")
guide_dir = os.path.abspath(
os.path.join(product_dir, relative_guide_dir))
additional_content_directories = self.env_yaml.get(
"additional_content_directories", [])
dirs_to_scan = [guide_dir]
for rd in additional_content_directories:
abspath = os.path.abspath(os.path.join(product_dir, rd))
dirs_to_scan.append(abspath)
rule_dirs = list(find_rule_dirs_in_paths(dirs_to_scan))
oval_checks = self._process_directories(rule_dirs, True)
return oval_checks

def _get_checks_from_shared_directories(self):
# earlier directory has higher priority
reversed_dirs = self.shared_directories[::-1]
oval_checks = self._process_directories(reversed_dirs, False)
return oval_checks

def _process_directories(self, directories, from_benchmark):
oval_checks = []
for directory in directories:
if not os.path.exists(directory):
continue
oval_checks += self._process_directory(directory, from_benchmark)
return oval_checks

oval_checks.append(xml_content)
already_loaded[filename] = oval_version
return oval_checks


def get_checks_from_directories(oval_dirs, env_yaml, already_loaded, oval_version):
product = utils.required_key(env_yaml, "product")
reversed_dirs = oval_dirs[::-1] # earlier directory has higher priority
oval_checks = []
for oval_dir in reversed_dirs:
if not os.path.isdir(oval_dir):
continue
# sort the files to make output deterministic
for filename in sorted(os.listdir(oval_dir)):
if not filename.endswith(".xml"):
continue
oval_file_path = os.path.join(oval_dir, filename)
if "checks_from_templates" in oval_dir:
with open(oval_file_path, "r") as f:
xml_content = f.read()
else:
xml_content = process_file_with_macros(oval_file_path, env_yaml)
def _get_list_of_oval_files(self, directory, from_benchmark):
if from_benchmark:
oval_files = get_rule_dir_ovals(directory, self.product)
else:
oval_files = _list_full_paths(directory)
return oval_files

if not _check_is_applicable_for_product(xml_content, product):
continue
if _check_is_loaded(already_loaded, filename, oval_version):
continue
oval_file_tree = _create_oval_tree_from_string(xml_content)
if not _check_oval_version_from_oval(oval_file_tree, oval_version):
def _process_directory(self, directory, from_benchmark):
try:
context = self._get_context(directory, from_benchmark)
except DocumentationNotComplete:
return []
oval_files = self._get_list_of_oval_files(directory, from_benchmark)
oval_checks = self._get_directory_oval_checks(
context, oval_files, from_benchmark)
return oval_checks

def _get_directory_oval_checks(self, context, oval_files, from_benchmark):
oval_checks = []
for file_path in oval_files:
xml_content = self._process_oval_file(
file_path, from_benchmark, context)
if xml_content is None:
continue
oval_checks.append(xml_content)
already_loaded[filename] = oval_version
return oval_checks
return oval_checks

def _read_oval_file(self, file_path, context, from_benchmark):
if from_benchmark or "checks_from_templates" not in file_path:
xml_content = process_file_with_macros(file_path, context)
else:
with open(file_path, "r") as f:
xml_content = f.read()
return xml_content

def _create_key(self, file_path, from_benchmark):
if from_benchmark:
rule_id = os.path.basename(
(os.path.dirname(os.path.dirname(file_path))))
oval_key = "%s.xml" % rule_id
else:
oval_key = os.path.basename(file_path)
return oval_key

def _process_oval_file(self, file_path, from_benchmark, context):
if not file_path.endswith(".xml"):
return None
oval_key = self._create_key(file_path, from_benchmark)
if _check_is_loaded(self.already_loaded, oval_key, self.oval_version):
return None
xml_content = self._read_oval_file(file_path, context, from_benchmark)
if not self._manage_oval_file_xml_content(
file_path, xml_content, from_benchmark):
return None
self.already_loaded[oval_key] = self.oval_version
return xml_content

def _manage_oval_file_xml_content(
self, file_path, xml_content, from_benchmark):
if not _check_is_applicable_for_product(xml_content, self.product):
return False
oval_file_tree = _create_oval_tree_from_string(xml_content)
if not _check_oval_version_from_oval(oval_file_tree, self.oval_version):
return False
if from_benchmark:
self._benchmark_specific_actions(
file_path, xml_content, oval_file_tree)
return True

def checks(
env_yaml, yaml_path, oval_version, oval_dirs, include_benchmark,
build_ovals_dir=None):
"""
Concatenate all XML files in the oval directory, to create the document
body. Then concatenates this with all XML files in the guide directories,
preferring {{{ product }}}.xml to shared.xml.
oval_dirs: list of directory with oval files (later has higher priority)
Return: The document body
"""
def _benchmark_specific_actions(
self, file_path, xml_content, oval_file_tree):
rule_id = os.path.basename(
(os.path.dirname(os.path.dirname(file_path))))
self._store_intermediate_file(rule_id, xml_content)
if not _check_rule_id(oval_file_tree, rule_id):
msg = "OVAL definition in '%s' doesn't match rule ID '%s'." % (
file_path, rule_id)
print(msg, file=sys.stderr)

def _get_context(self, directory, from_benchmark):
if from_benchmark:
rule_path = os.path.join(directory, "rule.yml")
rule = Rule.from_yaml(rule_path, self.env_yaml)
context = self._create_local_env_yaml_for_rule(rule)
else:
context = self.env_yaml
return context

already_loaded = dict() # filename -> oval_version
all_checks = []
if include_benchmark:
all_checks += get_checks_from_benchmark(
env_yaml, yaml_path, already_loaded, oval_version, build_ovals_dir)
all_checks += get_checks_from_directories(
oval_dirs, env_yaml, already_loaded, oval_version)
document_body = "".join(all_checks)
return document_body
def _create_local_env_yaml_for_rule(self, rule):
local_env_yaml = dict()
local_env_yaml.update(self.env_yaml)
local_env_yaml['rule_id'] = rule.id_
local_env_yaml['rule_title'] = rule.title
prodtypes = parse_prodtype(rule.prodtype)
local_env_yaml['products'] = prodtypes # default is all
return local_env_yaml

def _store_intermediate_file(self, rule_id, xml_content):
if not self.build_ovals_dir:
return
output_file_name = rule_id + ".xml"
output_filepath = os.path.join(self.build_ovals_dir, output_file_name)
with open(output_filepath, "w") as f:
f.write(xml_content)

0 comments on commit 682892a

Please sign in to comment.