Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correctly process templated Ansible conditionals and introduce os_linux platform #9959

Merged
merged 9 commits into from
Dec 16, 2022
15 changes: 15 additions & 0 deletions shared/applicability/os_linux.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
name: "cpe:/o:{arg}"
title: "Operating System is {arg}"
check_id: platform_os_linux_{arg}
versioned: false
template:
name: platform_os_linux
args:
rhel:
os_name: "Red Hat Enterprise Linux"
os_id: 'rhel'
os_id_ansible: "RedHat"
fedora:
os_name: "Fedora"
os_id: 'fedora'
os_id_ansible: "Fedora"
2 changes: 2 additions & 0 deletions shared/templates/platform_os_linux/ansible.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{{%- set ansible_os_release_name_cond = 'ansible_distribution == "' + OS_ID_ANSIBLE + '"' -%}}
{{{ ansible_os_release_name_cond.strip() }}}
25 changes: 25 additions & 0 deletions shared/templates/platform_os_linux/oval.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<def-group>
<definition class="inventory" id="platform_{{{ _RULE_ID }}}" version="1">
{{{ oval_metadata("The installed operating system is " + OS_NAME, affected_platforms=["multi_platform_all"]) }}}
<criteria operator="AND">
<criterion comment="The operating system installed on the system is {{{ OS_NAME }}}"
test_ref="test_os_id_is_{{{ ID }}}" />
</criteria>
</definition>

<ind:textfilecontent54_test check="all" comment="ID in os-release is {{{ OS_ID }}}" id="test_os_id_is_{{{ ID }}}" version="1">
<ind:object object_ref="obj_os_id_is_{{{ ID }}}" />
<ind:state state_ref="state_os_id_is_{{{ ID }}}" />
</ind:textfilecontent54_test>

<ind:textfilecontent54_object id="obj_os_id_is_{{{ ID }}}" version="1">
<ind:filepath>/etc/os-release</ind:filepath>
<ind:pattern operation="pattern match">^ID=[&quot;&apos;]?(\w+)[&quot;&apos;]?$</ind:pattern>
<ind:instance operation="greater than or equal" datatype="int">1</ind:instance>
</ind:textfilecontent54_object>

<ind:textfilecontent54_state id="state_os_id_is_{{{ ID }}}" version="1">
<ind:subexpression operation="pattern match">{{{ OS_ID }}}</ind:subexpression>
</ind:textfilecontent54_state>

</def-group>
3 changes: 3 additions & 0 deletions shared/templates/platform_os_linux/template.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
supported_languages:
- ansible
- oval
28 changes: 23 additions & 5 deletions ssg/build_cpe.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,20 @@ def get_cpe(self, cpe_id_or_name):

def add_resolved_cpe_items_from_platform(self, platform):
for fact_ref in platform.get_fact_refs():
if fact_ref.arg:
cpe = self.get_cpe(fact_ref.cpe_name)
new_cpe = cpe.create_resolved_cpe_item_for_fact_ref(fact_ref)
self.add_cpe_item(new_cpe)
fact_ref.cpe_name = new_cpe.name
if fact_ref.arg: # the CPE item is parametrized
try:
# if there already exists a CPE item with factref's ID
# we can just use it right away, no new CPE items need to be created
cpe = self.get_cpe_for_fact_ref(fact_ref)
fact_ref.cpe_name = cpe.name
except CPEDoesNotExist:
# if the CPE item with factref's ID does not exist
# it means that we need to create a new CPE item
# which will have parameters in place
cpe = self.get_cpe(fact_ref.cpe_name)
new_cpe = cpe.create_resolved_cpe_item_for_fact_ref(fact_ref)
self.add_cpe_item(new_cpe)
fact_ref.cpe_name = new_cpe.name

def get_cpe_for_fact_ref(self, fact_ref):
return self.get_cpe(fact_ref.as_id())
Expand Down Expand Up @@ -217,6 +226,15 @@ def create_resolved_cpe_item_for_fact_ref(self, fact_ref):
def is_cpe_name(cpe_id_or_name):
return cpe_id_or_name.startswith("cpe:")

def set_conditional(self, language, content):
if language == "ansible":
self.ansible_conditional = content
elif language == "bash":
self.bash_conditional = content
else:
raise RuntimeError(
"The language {0} is not supported as conditional for CPE".format(language))


class CPEALLogicalTest(Function):

Expand Down
17 changes: 13 additions & 4 deletions ssg/build_yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -1521,8 +1521,8 @@ def from_text(cls, expression, product_cpes):
platform.name = id_
platform.original_expression = expression
platform.xml_content = platform.get_xml()
platform.bash_conditional = platform.test.to_bash_conditional()
platform.ansible_conditional = platform.test.to_ansible_conditional()
platform.update_conditional_from_cpe_items("bash", product_cpes)
platform.update_conditional_from_cpe_items("ansible", product_cpes)
return platform

def get_xml(self):
Expand All @@ -1542,7 +1542,7 @@ def get_xml(self):
return xmlstr

def to_xml_element(self):
return self.xml_content
return ET.fromstring(self.xml_content)

def get_remediation_conditional(self, language):
if language == "bash":
Expand All @@ -1555,7 +1555,6 @@ def get_remediation_conditional(self, language):
@classmethod
def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
platform = super(Platform, cls).from_yaml(yaml_file, env_yaml)
platform.xml_content = ET.fromstring(platform.xml_content)
# If we received a product_cpes, we can restore also the original test object
# it can be later used e.g. for comparison
if product_cpes:
Expand All @@ -1566,6 +1565,16 @@ def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
def get_fact_refs(self):
return self.test.get_symbols()

def update_conditional_from_cpe_items(self, language, product_cpes):
self.test.enrich_with_cpe_info(product_cpes)
if language == "bash":
self.bash_conditional = self.test.to_bash_conditional()
elif language == "ansible":
self.ansible_conditional = self.test.to_ansible_conditional()
else:
raise RuntimeError(
"Platform remediations do not support the {0} language".format(language))

def __eq__(self, other):
if not isinstance(other, Platform):
return False
Expand Down
33 changes: 25 additions & 8 deletions ssg/templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,25 +201,40 @@ def write_lang_contents_for_templatable(self, filled_template, lang, templatable

def build_lang_for_templatable(self, templatable, lang):
"""
Builds templated content of a given Templatable for a selected language,
writing the output to the correct build directories.
Builds templated content of a given Templatable for a selected language
returning the filled template.
"""
filled_template = self.get_lang_contents_for_templatable(templatable, lang)
self.write_lang_contents_for_templatable(filled_template, lang, templatable)
return self.get_lang_contents_for_templatable(templatable, lang)

def build_cpe(self, cpe):
for lang in self.get_resolved_langs_to_generate(cpe):
self.build_lang_for_templatable(cpe, lang)
filled_template = self.build_lang_for_templatable(cpe, lang)
if lang.template_type == TemplateType.REMEDIATION:
cpe.set_conditional(lang.name, filled_template)
if lang.template_type == TemplateType.CHECK:
self.write_lang_contents_for_templatable(filled_template, lang, cpe)
self.product_cpes.add_cpe_item(cpe)
cpe_path = os.path.join(self.cpe_items_dir, cpe.id_+".yml")
cpe.dump_yaml(cpe_path)

def build_platform(self, platform):
"""
Builds templated content of a given Platform (all CPEs/Symbols) for all available
languages, writing the output to the correct build directories.
languages, writing the output to the correct build directories
and updating the platform it self.
"""
langs_affecting_this_platform = set()
for fact_ref in platform.test.get_symbols():
cpe = self.product_cpes.get_cpe_for_fact_ref(fact_ref)
if cpe.is_templated():
self.build_cpe(cpe)
langs_affecting_this_platform.update(
self.get_resolved_langs_to_generate(cpe))
for lang in langs_affecting_this_platform:
if lang.template_type == TemplateType.REMEDIATION:
platform.update_conditional_from_cpe_items(lang.name, self.product_cpes)
platform_path = os.path.join(self.platforms_dir, platform.id_+".yml")
platform.dump_yaml(platform_path)

def build_rule(self, rule):
"""
Expand All @@ -228,7 +243,8 @@ def build_rule(self, rule):
"""
for lang in self.get_resolved_langs_to_generate(rule):
if lang.name != "sce-bash":
self.build_lang_for_templatable(rule, lang)
filled_template = self.build_lang_for_templatable(rule, lang)
self.write_lang_contents_for_templatable(filled_template, lang, rule)

def build_extra_ovals(self):
declaration_path = os.path.join(self.templates_dir, "extra_ovals.yml")
Expand All @@ -242,7 +258,8 @@ def build_extra_ovals(self):
"title": oval_def_id,
"template": template,
})
self.build_lang_for_templatable(rule, LANGUAGES["oval"])
filled_template = self.build_lang_for_templatable(rule, LANGUAGES["oval"])
self.write_lang_contents_for_templatable(filled_template, LANGUAGES["oval"], rule)

def build_all_platforms(self):
for platform_file in sorted(os.listdir(self.platforms_dir)):
Expand Down
8 changes: 4 additions & 4 deletions tests/unit/ssg-module/test_build_yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ def test_platform_from_text_simple(product_cpes):
"ansible_virtualization_type not in [\"docker\", \"lxc\", \"openvz\", \"podman\", \"container\"]"
assert platform.get_remediation_conditional("bash") == \
"[ ! -f /.dockerenv ] && [ ! -f /run/.containerenv ]"
platform_el = ET.fromstring(platform.to_xml_element())
platform_el = platform.to_xml_element()
assert platform_el.tag == "{%s}platform" % cpe_language_namespace
assert platform_el.get("id") == "machine"
logical_tests = platform_el.findall(
Expand All @@ -265,7 +265,7 @@ def test_platform_from_text_simple_product_cpe(product_cpes):
platform = ssg.build_yaml.Platform.from_text("rhel7-workstation", product_cpes)
assert platform.get_remediation_conditional("bash") == ""
assert platform.get_remediation_conditional("ansible") == ""
platform_el = ET.fromstring(platform.to_xml_element())
platform_el = platform.to_xml_element()
assert platform_el.tag == "{%s}platform" % cpe_language_namespace
assert platform_el.get("id") == "rhel7-workstation"
logical_tests = platform_el.findall(
Expand All @@ -285,7 +285,7 @@ def test_platform_from_text_or(product_cpes):
assert platform.get_remediation_conditional("bash") == "( rpm --quiet -q chrony || rpm --quiet -q ntp )"
assert platform.get_remediation_conditional("ansible") == \
"( \"chrony\" in ansible_facts.packages or \"ntp\" in ansible_facts.packages )"
platform_el = ET.fromstring(platform.to_xml_element())
platform_el = platform.to_xml_element()
assert platform_el.tag == "{%s}platform" % cpe_language_namespace
assert platform_el.get("id") == "chrony_or_ntp"
logical_tests = platform_el.findall(
Expand All @@ -312,7 +312,7 @@ def test_platform_from_text_complex_expression(product_cpes):
"systemd and !yum and (ntp or chrony)", product_cpes)
assert platform.get_remediation_conditional("bash") == "( rpm --quiet -q systemd && ( rpm --quiet -q chrony || rpm --quiet -q ntp ) && ! ( rpm --quiet -q yum ) )"
assert platform.get_remediation_conditional("ansible") == "( \"systemd\" in ansible_facts.packages and ( \"chrony\" in ansible_facts.packages or \"ntp\" in ansible_facts.packages ) and not ( \"yum\" in ansible_facts.packages ) )"
platform_el = ET.fromstring(platform.to_xml_element())
platform_el = platform.to_xml_element()
assert platform_el.tag == "{%s}platform" % cpe_language_namespace
assert platform_el.get("id") == "systemd_and_chrony_or_ntp_and_not_yum"
logical_tests = platform_el.findall(
Expand Down