Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce templated platforms (CPEs) #9906

Merged
38 changes: 26 additions & 12 deletions docs/manual/developer/06_contributing_with_content.md
Original file line number Diff line number Diff line change
Expand Up @@ -1150,31 +1150,45 @@ At the moment, only the CPE mechanism is supported.
### Applicability by CPE

The CPEs defined by the project are declared in
`shared/applicability/cpes.yml`.
`shared/applicability/*.yml`, one CPE per file.

The id of the CPE is inferred from the file name.

Syntax is as follows (using examples of existing CPEs):

cpes:
- machine: ## The id of the CPE
machine.yml: ## The id of the CPE is 'machine'
name: "cpe:/a:machine" ## The CPE Name as defined by the CPE standard
title: "Bare-metal or Virtual Machine" ## Human readable title for the CPE
check_id: installed_env_is_a_machine ## ID of OVAL implementing the applicability check
- gdm:
name: "cpe:/a:gdm"
title: "Package gdm is installed"
check_id: installed_env_has_gdm_package

The first entry above defines a CPE whose `id` is `machine`, this CPE
package.yml:
name: "cpe:/a:{arg}"
title: "Package {pkgname} is installed"
check_id: cond_package_{arg}
bash_conditional: {{{ bash_pkg_conditional("{pkgname}") }}} ## The conditional expression for Bash remediations
ansible_conditional: {{{ ansible_pkg_conditional("{pkgname}") }}} ## The conditional expression for Ansible remediations
template: ## Instead of static OVAL checks a CPE can use templates
name: cond_package ## Name of the template with OVAL applicability check
args: ## CPEs can be parametrized: 'package[*]'.
ntp: ## This is the map of substitution values for 'package[ntp]'
pkgname: ntp ## "Package {pkgname} is installed" -> "Package ntp is installed"
title: NTP daemon and utilities

The first file above defines a CPE whose `id` is `machine`, this CPE
is used for rules not applicable to containers.
A rule or profile with `platform: machine` will be evaluated only if the
targeted scan environment is either bare-metal or virtual machine.

The second entry defines a CPE for GDM.
By setting the `platform` to `gdm`, the rule will have its applicability
restricted to only environments which have `gdm` package installed.
The second file defines a parametrized CPE. This allows us to define multiple
similar CPEs that differ in their argument. In our example, we define
the `package` CPE. Within the `args` key we configure a set of its possible
arguments and their values. In our example, there is a single possible value: `ntp`.

By setting the `platform` to `package[ntp]`, the rule will have its applicability
restricted to only environments which have `ntp` package installed.

The OVAL checks for the CPE need to be of `inventory` class, and must be
under `shared/checks/oval/`.
under `shared/checks/oval/` or have a template under `shared/templates/`.

#### Setting a product's default CPE

Expand Down
11 changes: 11 additions & 0 deletions shared/applicability/package.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
name: "cpe:/a:{arg}"
title: "Package {pkgname} is installed"
bash_conditional: {{{ bash_pkg_conditional("{pkgname}") }}}
ansible_conditional: {{{ ansible_pkg_conditional("{pkgname}") }}}
check_id: platform_package_{arg}
template:
name: platform_package
args:
ntp:
pkgname: ntp
title: NTP daemon and utilities
11 changes: 11 additions & 0 deletions shared/templates/platform_package/oval.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<def-group>
<definition class="inventory" id="platform_{{{ _RULE_ID }}}"
version="1">
{{{ oval_metadata("The " + pkg_system|upper + " package " + PKGNAME + " should be installed.", affected_platforms=["multi_platform_all"]) }}}
<criteria>
<criterion comment="Package {{{ PKGNAME }}} is installed"
test_ref="platform_test_{{{ _RULE_ID }}}_installed" />
</criteria>
</definition>
{{{ oval_test_package_installed(package=PKGNAME, test_id="platform_test_" + _RULE_ID + "_installed") }}}
</def-group>
2 changes: 2 additions & 0 deletions shared/templates/platform_package/template.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
supported_languages:
- oval
20 changes: 8 additions & 12 deletions ssg/boolean_expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,16 @@ def name(self):
return self.requirement.project_name

@staticmethod
def cpe_id_is_parametrized(cpe_id):
return re.search(r'^\w+\[\w+\]$', cpe_id)
def is_parametrized(name):
return bool(pkg_resources.Requirement.parse(name).extras)

@staticmethod
def is_cpe_name(cpe_id_or_name):
return cpe_id_or_name.startswith("cpe:")
def get_base_of_parametrized_name(name):
"""
If given a parametrized platform name such as package[test],
it returns the package part only.
"""
return pkg_resources.Requirement.parse(name).project_name


class Algebra(boolean.BooleanAlgebra):
Expand All @@ -125,11 +129,3 @@ def __init__(self, symbol_cls, function_cls):
super(Algebra, self).__init__(allowed_in_token=VERSION_SYMBOLS+SPEC_SYMBOLS,
Symbol_class=symbol_cls,
NOT_class=not_cls, AND_class=and_cls, OR_class=or_cls)


def get_base_name_of_parametrized_platform(name):
"""
If given a parametrized platform name such as package[test],
it returns the package part only.
"""
return pkg_resources.Requirement.parse(name).project_name
70 changes: 49 additions & 21 deletions ssg/build_cpe.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
from .utils import required_key, apply_formatting_on_dict_values
from .xml import ElementTree as ET
from .boolean_expression import Algebra, Symbol, Function
from .boolean_expression import get_base_name_of_parametrized_platform
from .entities.common import XCCDFEntity
from .entities.common import XCCDFEntity, Templatable
from .yaml import convert_string_to_bool


Expand Down Expand Up @@ -82,16 +81,27 @@ def add_cpe_item(self, cpe_item):

def get_cpe(self, cpe_id_or_name):
try:
if Symbol.is_cpe_name(cpe_id_or_name):
if CPEItem.is_cpe_name(cpe_id_or_name):
return self.cpes_by_name[cpe_id_or_name]
else:
if Symbol.cpe_id_is_parametrized(cpe_id_or_name):
cpe_id_or_name = get_base_name_of_parametrized_platform(
if CPEALFactRef.cpe_id_is_parametrized(cpe_id_or_name):
cpe_id_or_name = CPEALFactRef.get_base_name_of_parametrized_cpe_id(
cpe_id_or_name)
return self.cpes_by_id[cpe_id_or_name]
except KeyError:
raise CPEDoesNotExist("CPE %s is not defined" % cpe_id_or_name)

def add_resolved_cpe_items_from_platform(self, platform):
for fact_ref in platform.test.get_symbols():
if fact_ref.arg:
cpe = self.get_cpe(fact_ref.cpe_name)
new_cpe = cpe.create_resolved_cpe_item_for_fact_ref(fact_ref)
self.add_cpe_item(new_cpe)
fact_ref.cpe_name = new_cpe.name

def get_cpe_for_fact_ref(self, fact_ref):
return self.get_cpe(fact_ref.as_id())

def get_cpe_name(self, cpe_id):
cpe = self.get_cpe(cpe_id)
return cpe.name
Expand Down Expand Up @@ -133,7 +143,7 @@ def to_file(self, file_name, cpe_oval_file):
tree.write(file_name, encoding="utf-8")


class CPEItem(XCCDFEntity):
class CPEItem(XCCDFEntity, Templatable):
"""
Represents the cpe-item element from the CPE standard.
"""
Expand All @@ -144,8 +154,10 @@ class CPEItem(XCCDFEntity):
bash_conditional=lambda: "",
ansible_conditional=lambda: "",
is_product_cpe=lambda: False,
args=lambda: {},
** XCCDFEntity.KEYS
)
KEYS.update(**Templatable.KEYS)

MANDATORY_KEYS = [
"name",
Expand Down Expand Up @@ -177,6 +189,29 @@ def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
cpe_item.is_product_cpe = convert_string_to_bool(cpe_item.is_product_cpe)
return cpe_item

def set_template_variables(self, *sources):
if self.is_templated():
self.template["vars"] = {}
for source in sources:
self.template["vars"].update(source)

def create_resolved_cpe_item_for_fact_ref(self, fact_ref):
resolved_parameters = self.args[fact_ref.arg]
resolved_parameters.update(fact_ref.as_dict())
cpe_item_as_dict = self.represent_as_dict()
cpe_item_as_dict["args"] = None
cpe_item_as_dict["id_"] = fact_ref.as_id()
new_associated_cpe_item_as_dict = apply_formatting_on_dict_values(
cpe_item_as_dict, resolved_parameters)
new_associated_cpe_item = CPEItem.get_instance_from_full_dict(
new_associated_cpe_item_as_dict)
new_associated_cpe_item.set_template_variables(resolved_parameters)
return new_associated_cpe_item

@staticmethod
def is_cpe_name(cpe_id_or_name):
return cpe_id_or_name.startswith("cpe:")


class CPEALLogicalTest(Function):

Expand All @@ -199,10 +234,6 @@ def enrich_with_cpe_info(self, cpe_products):
for arg in self.args:
arg.enrich_with_cpe_info(cpe_products)

def pass_parameters(self, product_cpes):
for arg in self.args:
arg.pass_parameters(product_cpes)

def to_bash_conditional(self):
child_bash_conds = [
a.to_bash_conditional() for a in self.args
Expand Down Expand Up @@ -262,17 +293,6 @@ def enrich_with_cpe_info(self, cpe_products):
self.ansible_conditional = cpe_products.get_cpe(self.cpe_name).ansible_conditional
self.cpe_name = cpe_products.get_cpe_name(self.cpe_name)

def pass_parameters(self, product_cpes):
if self.arg:
associated_cpe_item_as_dict = product_cpes.get_cpe(self.cpe_name).represent_as_dict()
new_associated_cpe_item_as_dict = apply_formatting_on_dict_values(
associated_cpe_item_as_dict, self.as_dict())
new_associated_cpe_item_as_dict["id_"] = self.as_id()
new_associated_cpe_item = CPEItem.get_instance_from_full_dict(
new_associated_cpe_item_as_dict)
product_cpes.add_cpe_item(new_associated_cpe_item)
self.cpe_name = new_associated_cpe_item.name

def to_xml_element(self):
cpe_factref = ET.Element("{%s}fact-ref" % CPEALFactRef.ns)
cpe_factref.set('name', self.cpe_name)
Expand All @@ -284,6 +304,14 @@ def to_bash_conditional(self):
def to_ansible_conditional(self):
return self.ansible_conditional

@staticmethod
def cpe_id_is_parametrized(cpe_id):
return Symbol.is_parametrized(cpe_id)

@staticmethod
def get_base_name_of_parametrized_cpe_id(cpe_id):
return Symbol.get_base_of_parametrized_name(cpe_id)
Comment on lines +307 to +313
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amazing!



def extract_subelement(objects, sub_elem_type):
"""
Expand Down
25 changes: 12 additions & 13 deletions ssg/build_yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,8 @@ def load_benchmark(self, directory):
self.benchmark.unselect_empty_groups()

def load_compiled_content(self):
self.product_cpes.load_cpes_from_directory_tree(self.resolved_cpe_items_dir, self.env_yaml)

self.fixes = ssg.build_remediations.load_compiled_remediations(self.fixes_dir)

filenames = glob.glob(os.path.join(self.resolved_rules_dir, "*.yml"))
Expand All @@ -1443,8 +1445,6 @@ def load_compiled_content(self):
self.load_entities_by_id(filenames, self.platforms, Platform)
self.product_cpes.platforms = self.platforms

self.product_cpes.load_cpes_from_directory_tree(self.resolved_cpe_items_dir, self.env_yaml)

for g in self.groups.values():
g.load_entities(self.rules, self.values, self.groups)

Expand Down Expand Up @@ -1512,14 +1512,13 @@ class Platform(XCCDFEntity):
def from_text(cls, expression, product_cpes):
if not product_cpes:
return None
test = product_cpes.algebra.parse(
expression, simplify=True)
id = test.as_id()
platform = cls(id)
test = product_cpes.algebra.parse(expression, simplify=True)
id_ = test.as_id()
platform = cls(id_)
platform.test = test
platform.test.pass_parameters(product_cpes)
product_cpes.add_resolved_cpe_items_from_platform(platform)
platform.test.enrich_with_cpe_info(product_cpes)
platform.name = id
platform.name = id_
platform.original_expression = expression
platform.xml_content = platform.get_xml()
platform.bash_conditional = platform.test.to_bash_conditional()
Expand All @@ -1529,8 +1528,8 @@ def from_text(cls, expression, product_cpes):
def get_xml(self):
cpe_platform = ET.Element("{%s}platform" % Platform.ns)
cpe_platform.set('id', self.name)
# in case the platform contains only single CPE name, fake the logical test
# we have to athere to CPE specification
# In case the platform contains only single CPE name, fake the logical test
# we have to adhere to CPE specification
if isinstance(self.test, CPEALFactRef):
cpe_test = ET.Element("{%s}logical-test" % CPEALLogicalTest.ns)
cpe_test.set('operator', 'AND')
Expand All @@ -1557,11 +1556,11 @@ def get_remediation_conditional(self, language):
def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
platform = super(Platform, cls).from_yaml(yaml_file, env_yaml)
platform.xml_content = ET.fromstring(platform.xml_content)
# if we did receive a product_cpes, we can restore also the original test object
# If we received a product_cpes, we can restore also the original test object
# it can be later used e.g. for comparison
if product_cpes:
platform.test = product_cpes.algebra.parse(
platform.original_expression, simplify=True)
platform.test = product_cpes.algebra.parse(platform.original_expression, simplify=True)
product_cpes.add_resolved_cpe_items_from_platform(platform)
return platform

def __eq__(self, other):
Expand Down
22 changes: 22 additions & 0 deletions ssg/templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,20 @@ def build_lang_for_templatable(self, templatable, lang):
filled_template = self.get_lang_contents_for_templatable(templatable, lang)
self.write_lang_contents_for_templatable(filled_template, lang, templatable)

def build_cpe(self, cpe):
for lang in self.get_resolved_langs_to_generate(cpe):
self.build_lang_for_templatable(cpe, lang)

def build_platform(self, platform):
"""
Builds templated content of a given Platform (all CPEs/Symbols) for all available
languages, writing the output to the correct build directories.
"""
for fact_ref in platform.test.get_symbols():
cpe = self.product_cpes.get_cpe_for_fact_ref(fact_ref)
if cpe.is_templated():
self.build_cpe(cpe)

def build_rule(self, rule):
"""
Builds templated content of a given Rule for all available languages,
Expand All @@ -230,6 +244,13 @@ def build_extra_ovals(self):
})
self.build_lang_for_templatable(rule, LANGUAGES["oval"])

def build_all_platforms(self):
for platform_file in sorted(os.listdir(self.platforms_dir)):
platform_path = os.path.join(self.platforms_dir, platform_file)
platform = ssg.build_yaml.Platform.from_yaml(platform_path, self.env_yaml,
self.product_cpes)
self.build_platform(platform)

def build_all_rules(self):
for rule_file in sorted(os.listdir(self.resolved_rules_dir)):
rule_path = os.path.join(self.resolved_rules_dir, rule_file)
Expand All @@ -252,3 +273,4 @@ def build(self):

self.build_extra_ovals()
self.build_all_rules()
self.build_all_platforms()
9 changes: 8 additions & 1 deletion tests/unit/ssg-module/data/applicability/package.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
name: "cpe:/a:{arg}"
title: "Package {arg} is installed"
title: "Package {pkgname} is installed"
bash_conditional: {{{ bash_pkg_conditional("{pkgname}") }}}
check_id: installed_env_has_{arg}_package
template:
name: package_installed
args:
ntp:
pkgname: ntp
title: NTP
9 changes: 9 additions & 0 deletions tests/unit/ssg-module/data/package_ntp.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
name: package_ntp
original_expression: package[ntp]
xml_content: <ns0:platform xmlns:ns0="http://cpe.mitre.org/language/2.0" id="package_ntp"><ns0:logical-test
operator="AND" negate="false"><ns0:fact-ref name="cpe:/a:ntp" /></ns0:logical-test></ns0:platform>
bash_conditional: ( ( rpm --quiet -q ntp ) )
ansible_conditional: ( ( "ntp" in ansible_facts.packages ) )
definition_location: ''
documentation_complete: true
title: 'NTP Package'
12 changes: 0 additions & 12 deletions tests/unit/ssg-module/data/templates/package_installed/template.py

This file was deleted.

Loading