Skip to content

Commit

Permalink
Refactor configure-time compliance logic
Browse files Browse the repository at this point in the history
Split STIG/CIS compliance into separate functions and separate
validation from applying required changes. Fix argument parsing to
use argparse's "choices" keyword. Add comments mapping configuration
to specific points from the STIG/CIS documents.
  • Loading branch information
RichardPooleEDB authored and JonathanRenon-EDB committed Nov 25, 2024
1 parent 37a44e2 commit a95ff67
Showing 1 changed file with 102 additions and 82 deletions.
184 changes: 102 additions & 82 deletions lib/tpaexec/architecture.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ def add_options(self, p):
"--compliance",
action="store",
dest="compliance",
help="a supported compliance standard",
choices=['stig','cis'],
help="configure to assist with a compliance standard",
)

g = p.add_argument_group("OS selection")
Expand Down Expand Up @@ -524,12 +525,22 @@ def validate_arguments(self, args):

def _validate_compliance(self, args):
"""
Verify that if a compliance target has been requested, it is a
supported one.
Verify that the requested compliance target, if any, is compatible
with other arguments.
"""
target = args.get("compliance")
if target is not None and target not in ("stig", "cis"):
raise ArchitectureError(f"Available compliance targets: stig, cis")
compliance_target = args.get("compliance")
if compliance_target == "stig":
if args.get("platform") != "bare":
raise ArchitectureError('STIG compliance requires the "bare" platform')

if args.get("postgres_flavour") != "epas":
raise ArchitectureError('STIG compliance requires the "epas" flavour')

if args.get("distribution") != "RedHat" or args.get("os_version") not in (
"8",
"9",
):
raise ArchitectureError("STIG compliance requires RHEL version 8 or 9")

def _validate_flavour_version(self, args):
"""Verify postgres flavour, version and related arguments.
Expand Down Expand Up @@ -745,85 +756,94 @@ def apply_compliance(self, args):
return

if compliance_target == "stig":
top = args.get("top_level_settings") or {}
top.update({"compliance": "stig"})
args["top_level_settings"] = top

if args.get("platform") != "bare":
raise ArchitectureError('STIG compliance requires the "bare" platform')

cluster_vars = args.get("cluster_vars")
if cluster_vars.get("postgres_flavour") != "epas":
raise ArchitectureError('STIG compliance requires the "epas" flavour')

if args.get("distribution") != "RedHat" or args.get("os_version") not in (
"8",
"9",
):
raise ArchitectureError("STIG compliance requires RHEL version 8 or 9")

pcs = cluster_vars.get("postgres_conf_settings") or {}
pcs.update(
{
"edb_audit": "xml",
"edb_audit_statement": "all",
"edb_audit_connect": "all",
"edb_audit_disconnect": "all",
"statement_timeout": 1000,
"client_min_messages": "ERROR",
}
)
cluster_vars["postgres_conf_settings"] = pcs

cluster_vars.update(
{
"tcp_keepalives_idle": 10,
"tcp_keepalives_interval": 10,
"tcp_keepalives_count": 10,
"log_destination": "stderr",
"postgres_log_file_mode": "0600",
"hba_force_hostssl": True,
"hba_force_certificate_auth": True,
"hba_cert_authentication_map": "sslmap",
"extra_postgres_extensions": cluster_vars.get("extra_postgres_extensions", [])
+ ["sql_protect"]
}
)
self._apply_stig(args)

if compliance_target == "cis":
top = args.get("top_level_settings") or {}
top.update({"compliance": "cis"})
args["top_level_settings"] = top

cluster_vars = args.get("cluster_vars")
pcs = cluster_vars.get("postgres_conf_settings") or {}
pcs.update(
{
"log_error_verbosity": "verbose",
"log_line_prefix": "'%m [%p]: [%l-1] db=%d,user=%u,app=%a,client=%h '",
"log_replication_commands": "on",
"temp_file_limit": "1GB",
}
)
cluster_vars["postgres_conf_settings"] = pcs

cluster_vars.update(
{
"log_connections": "on",
"log_disconnections": "on",
"extra_bash_rc_lines": cluster_vars.get("extra_bashrc_lines", [])
+ ["umask 0077"],
"extra_postgres_extensions": cluster_vars.get("extra_postgres_extensions", [])
+ ["passwordcheck", "pgaudit"]
}
)
self._apply_cis(args)

def _apply_stig(self, args):
"""
Applies changes to config.yml required by STIG compliance
"""
top = args.get("top_level_settings") or {}
top.update({"compliance": "stig"})
args["top_level_settings"] = top

cluster_vars = args.get("cluster_vars")
pcs = cluster_vars.get("postgres_conf_settings") or {}
pcs.update(
{
# EPAS-00-001000
"edb_audit": "xml",
# EPAS-00-012600 and others
"edb_audit_statement": "all",
"edb_audit_connect": "all",
"edb_audit_disconnect": "all",
# EPAS-00-005200
"statement_timeout": 1000,
# EPAS-00-006600
"client_min_messages": "ERROR",
}
)
cluster_vars["postgres_conf_settings"] = pcs

cluster_vars.update(
{
# EPAS-00-005200
"tcp_keepalives_idle": 10,
"tcp_keepalives_interval": 10,
"tcp_keepalives_count": 10,
# EPAS-00-006600
"log_destination": "stderr",
"postgres_log_file_mode": "0600",
# EPAS-00-009500
"hba_force_hostssl": True,
"hba_force_certificate_auth": True,
"hba_cert_authentication_map": "sslmap",
# EPAS-00-006200 and others
"extra_postgres_extensions": cluster_vars.get("extra_postgres_extensions", [])
+ ["sql_protect"]
}
)

# set various GUCs in cluster_vars
# set compliance: stig so later checks can see it
# set something to install sql/protect
# set something to make certs work
# check that we have got certs provided also
# check that we are bare, RHEL8/9, EPAS
def _apply_cis(self, args):
"""
Applies changes to config.yml required by CIS compliance
"""
top = args.get("top_level_settings") or {}
top.update({"compliance": "cis"})
args["top_level_settings"] = top

cluster_vars = args.get("cluster_vars")
pcs = cluster_vars.get("postgres_conf_settings") or {}
pcs.update(
{
# 3.1.22
"log_error_verbosity": "verbose",
# 3.1.24
"log_line_prefix": "'%m [%p]: [%l-1] db=%d,user=%u,app=%a,client=%h '",
# 7.2
"log_replication_commands": "on",
# 8.1
"temp_file_limit": "1GB",
}
)
cluster_vars["postgres_conf_settings"] = pcs

cluster_vars.update(
{
# 3.1.20
"log_connections": "on",
# 3.1.21
"log_disconnections": "on",
# 2.1
"extra_bash_rc_lines": cluster_vars.get("extra_bashrc_lines", [])
+ ["umask 0077"],
# 3.2, 5.3
"extra_postgres_extensions": cluster_vars.get("extra_postgres_extensions", [])
+ ["passwordcheck", "pgaudit"]
}
)

def cluster_name(self):
"""
Expand Down

0 comments on commit a95ff67

Please sign in to comment.