Skip to content

Commit

Permalink
fix: do not change risk acceptance date inadvertently (#2302)
Browse files Browse the repository at this point in the history
* fix: do not change risk acceptance date inadvertently

* chore: pylint

* feat: correct wrongly set dates

* chore: codereview
  • Loading branch information
StefanFl authored Dec 4, 2024
1 parent 3e70330 commit eb2dbd1
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 82 deletions.
50 changes: 33 additions & 17 deletions backend/application/core/api/serializers_observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ def update(self, instance: Observation, validated_data: dict):
actual_severity = instance.current_severity
actual_status = instance.current_status
actual_vex_justification = instance.current_vex_justification
actual_risk_acceptance_expiry_date = instance.risk_acceptance_expiry_date

instance.origin_component_name = ""
instance.origin_component_version = ""
Expand All @@ -282,30 +283,45 @@ def update(self, instance: Observation, validated_data: dict):

observation: Observation = super().update(instance, validated_data)

if actual_severity != observation.current_severity:
actual_severity = observation.current_severity
else:
actual_severity = ""
log_severity = (
observation.current_severity
if actual_severity != observation.current_severity
else ""
)

if actual_status != observation.current_status:
actual_status = observation.current_status
else:
actual_status = ""
log_status = (
observation.current_status
if actual_status != observation.current_status
else ""
)

if actual_vex_justification != observation.current_vex_justification:
actual_vex_justification = observation.current_vex_justification
else:
actual_vex_justification = ""
log_vex_justification = (
observation.current_vex_justification
if actual_vex_justification != observation.current_vex_justification
else ""
)

log_risk_acceptance_expiry_date = (
observation.risk_acceptance_expiry_date
if actual_risk_acceptance_expiry_date
!= observation.risk_acceptance_expiry_date
else None
)

if actual_severity or actual_status:
if (
log_severity
or log_status
or log_vex_justification
or log_risk_acceptance_expiry_date
):
create_observation_log(
observation=observation,
severity=actual_severity,
status=actual_status,
severity=log_severity,
status=log_status,
comment="Observation changed manually",
vex_justification=actual_vex_justification,
vex_justification=log_vex_justification,
assessment_status=Assessment_Status.ASSESSMENT_STATUS_AUTO_APPROVED,
risk_acceptance_expiry_date=observation.risk_acceptance_expiry_date,
risk_acceptance_expiry_date=log_risk_acceptance_expiry_date,
)

check_security_gate(observation.product)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import logging
from datetime import date, timedelta
from typing import Optional

from django.core.paginator import Paginator
from django.db import migrations

from application.core.services.risk_acceptance_expiry import (
calculate_risk_acceptance_expiry_date,
)
from application.core.types import Status

logger = logging.getLogger("secobserve.migration")


def correct_risk_acceptance_expiry_date(apps, schema_editor):
Observation = apps.get_model("core", "Observation")

Observation_Log = apps.get_model("core", "Observation_Log")
observations = Observation.objects.filter(
current_status=Status.STATUS_RISK_ACCEPTED
).order_by("id")

paginator = Paginator(observations, 1000)
for page_number in paginator.page_range:
page = paginator.page(page_number)
updates = []

for observation in page.object_list:
risk_acceptance_expiry_date_found = False
most_recent_risk_acceptance: Optional[date] = None

observation_logs = Observation_Log.objects.filter(
observation=observation
).order_by("-created")
for observation_log in observation_logs:
if (
observation_log.status == Status.STATUS_RISK_ACCEPTED
and not most_recent_risk_acceptance
):
most_recent_risk_acceptance = observation_log.created.date()

if observation_log.risk_acceptance_expiry_date:
observation.risk_acceptance_expiry_date = (
observation_log.risk_acceptance_expiry_date
)
risk_acceptance_expiry_date_found = True
break

if (
not risk_acceptance_expiry_date_found
and observation.risk_acceptance_expiry_date
):
new_risk_acceptance_expiry_date = calculate_risk_acceptance_expiry_date(
observation.product
)
if most_recent_risk_acceptance:
days_between = (date.today() - most_recent_risk_acceptance).days
observation.risk_acceptance_expiry_date = (
new_risk_acceptance_expiry_date - timedelta(days=days_between)
)
else:
observation.risk_acceptance_expiry_date = (
new_risk_acceptance_expiry_date
)

updates.append(observation)

Observation.objects.bulk_update(updates, ["risk_acceptance_expiry_date"])


class Migration(migrations.Migration):
dependencies = [
(
"core",
"0055_product_authorization_group_members",
),
]

operations = [
migrations.RunPython(
correct_risk_acceptance_expiry_date,
reverse_code=migrations.RunPython.noop,
),
]
23 changes: 15 additions & 8 deletions backend/application/core/services/assessment.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,11 @@ def _update_observation(
)

previous_risk_acceptance_expiry_date = observation.risk_acceptance_expiry_date
observation.risk_acceptance_expiry_date = new_risk_acceptance_expiry_date
observation.risk_acceptance_expiry_date = (
new_risk_acceptance_expiry_date
if observation.current_status == Status.STATUS_RISK_ACCEPTED
else None
)

if (
previous_current_severity # pylint: disable=too-many-boolean-expressions
Expand Down Expand Up @@ -164,16 +168,19 @@ def remove_assessment(observation: Observation, comment: str) -> bool:
observation.assessment_status = ""
observation.assessment_vex_justification = ""
observation.current_severity = get_current_severity(observation)
previous_status = observation.current_status
observation.current_status = get_current_status(observation)
observation.current_vex_justification = get_current_vex_justification(
observation
)
risk_acceptance_expiry_date = (
calculate_risk_acceptance_expiry_date(observation.product)
if observation.current_status == Status.STATUS_RISK_ACCEPTED
else None
)
observation.risk_acceptance_expiry_date = risk_acceptance_expiry_date

if observation.current_status == Status.STATUS_RISK_ACCEPTED:
if previous_status != Status.STATUS_RISK_ACCEPTED:
observation.risk_acceptance_expiry_date = (
calculate_risk_acceptance_expiry_date(observation.product)
)
else:
observation.risk_acceptance_expiry_date = None

create_observation_log(
observation=observation,
Expand All @@ -182,7 +189,7 @@ def remove_assessment(observation: Observation, comment: str) -> bool:
comment=comment,
vex_justification="",
assessment_status=Assessment_Status.ASSESSMENT_STATUS_REMOVED,
risk_acceptance_expiry_date=risk_acceptance_expiry_date,
risk_acceptance_expiry_date=observation.risk_acceptance_expiry_date,
)

check_security_gate(observation.product)
Expand Down
2 changes: 2 additions & 0 deletions backend/application/core/services/observation_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ def create_observation_log(
risk_acceptance_expiry_date=risk_acceptance_expiry_date,
)
observation_log.save()

observation.last_observation_log = observation_log.created
observation.save()

observation.product.last_observation_change = observation_log.created
observation.product.save()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ def expire_risk_acceptances() -> None:
)
if not assessment_removed:
observation.parser_status = Status.STATUS_OPEN
observation.risk_acceptance_expiry_date = None
observation.save()
save_assessment(
observation=observation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,11 +414,14 @@ def _process_current_observation(
observation_before.product
)
observation_before.current_status = get_current_status(observation_before)
observation_before.risk_acceptance_expiry_date = (
calculate_risk_acceptance_expiry_date(observation_before.product)
if observation_before.current_status == Status.STATUS_RISK_ACCEPTED
else None
)

if observation_before.current_status == Status.STATUS_RISK_ACCEPTED:
if previous_status != Status.STATUS_RISK_ACCEPTED:
observation_before.risk_acceptance_expiry_date = (
calculate_risk_acceptance_expiry_date(observation_before.product)
)
else:
observation_before.risk_acceptance_expiry_date = None

epss_apply_observation(observation_before)
observation_before.import_last_seen = timezone.now()
Expand Down Expand Up @@ -450,14 +453,17 @@ def _process_current_observation(
previous_status != observation_before.current_status
or previous_severity != observation_before.current_severity
):
if previous_status != observation_before.current_status:
status = observation_before.current_status
else:
status = ""
if previous_severity != observation_before.current_severity:
severity = imported_observation.current_severity
else:
severity = ""
status = (
observation_before.current_status
if previous_status != observation_before.current_status
else ""
)

severity = (
imported_observation.current_severity
if previous_severity != observation_before.current_severity
else ""
)

create_observation_log(
observation=observation_before,
Expand All @@ -479,6 +485,7 @@ def _process_new_observation(imported_observation: Observation) -> None:
)

imported_observation.current_status = get_current_status(imported_observation)

imported_observation.risk_acceptance_expiry_date = (
calculate_risk_acceptance_expiry_date(imported_observation.product)
if imported_observation.current_status == Status.STATUS_RISK_ACCEPTED
Expand Down
Loading

0 comments on commit eb2dbd1

Please sign in to comment.