Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add options to merge command for controlling creation of system entry and type of relationships #201

Merged
merged 1 commit into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 83 additions & 21 deletions surfactant/cmd/merge.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import json
import uuid as uuid_module
from collections import deque
from typing import Dict, List
from typing import Dict, List, Tuple

import click
from loguru import logger

from surfactant.configmanager import ConfigManager
from surfactant.plugin.manager import find_io_plugin, get_plugin_manager
from surfactant.sbomtypes._relationship import Relationship
from surfactant.sbomtypes._sbom import SBOM
Expand All @@ -14,11 +15,18 @@

@click.argument("sbom_outfile", envvar="SBOM_OUTPUT", type=click.File("w"), required=True)
@click.argument("input_sboms", type=click.File("r"), required=True, nargs=-1)
@click.option("--config_file", type=click.File("r"), required=False)
@click.option(
"--config_file",
type=click.File("r"),
required=False,
help="Config file for controlling some aspects of the merged SBOM, primarily the creation of a new top-level system object (settings here will typically take precedence over command line options)",
)
@click.option(
"--output_format",
is_flag=False,
default="surfactant.output.cytrics_writer",
default=ConfigManager().get(
"core", "output_format", fallback="surfactant.output.cytrics_writer"
),
help="SBOM output format, options=surfactant.output.[cytrics|csv|spdx]_writer",
)
@click.option(
Expand All @@ -27,8 +35,36 @@
default="surfactant.input_readers.cytrics_reader",
help="SBOM input format, assumes that all input SBOMs being merged have the same format, options=surfactant.input_readers.[cytrics|cyclonedx|spdx]_reader",
)
@click.option(
"--system_uuid",
is_flag=False,
help="System UUID to use for relationships to a top-level system object",
)
@click.option(
"--system_relationship",
is_flag=False,
default="Contains",
show_default=True,
help="Relationship type between merged SBOM contents to a top-level system object",
)
@click.option(
"--add_system/--no_add_system",
default=False,
show_default=True,
help="Create a top-level system entry for tying together the merged SBOM components. When disabled, relationships will still be created to a provided system UUID",
)
@click.command("merge")
def merge_command(input_sboms, sbom_outfile, config_file, output_format, input_format):
# pylint: disable-next=too-many-positional-arguments
def merge_command(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to clean this up a bit, you could use **kwargs. But it isn't neccessary

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been thinking about how to handle this pattern in click that seems to result in function definitions with a lot of arguments -- ideally I'd like to have a function that makes the feature usable when using Surfactant as a library, which will probably need at least one function with many args.

**kwargs could be a nice solution to avoid having two functions with duplicate function signatures that have many args and need to be kept in sync. I'll look into this some more and probably convert the generate subcommand at the same time.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see, I think you can achieve this by creating a custom decorator that adds options or args in the way you want. Similar to what you see in this stack overflow post: https://stackoverflow.com/questions/50061342/is-it-possible-to-reuse-python-click-option-decorators-for-multiple-commands

input_sboms,
sbom_outfile,
config_file,
output_format,
input_format,
system_uuid,
system_relationship,
add_system,
):
"""Merge two or more INPUT_SBOMS together into SBOM_OUTFILE.

An optional CONFIG_FILE can be supplied to specify a root system entry
Expand All @@ -43,10 +79,19 @@ def merge_command(input_sboms, sbom_outfile, config_file, output_format, input_f
config = None
if config_file:
config = json.load(config_file)
merge(sboms, sbom_outfile, config, output_writer)


def merge(input_sboms, sbom_outfile, config, output_writer):
merge(sboms, sbom_outfile, config, output_writer, system_uuid, system_relationship, add_system)


# pylint: disable-next=too-many-positional-arguments
def merge(
input_sboms,
sbom_outfile,
config,
output_writer,
system_uuid=None,
system_relationship="Contains", # remember: keep in-sync with click arg default
add_system=False,
):
"""Merge two or more SBOMs."""
merged_sbom = input_sboms[0]
for sbom_m in input_sboms[1:]:
Expand All @@ -56,22 +101,28 @@ def merge(input_sboms, sbom_outfile, config, output_writer):
roots = get_roots_check_cycles(rel_graph)

# Check if provided UUID for a system object already exists to avoid creating a duplicate
add_system = True
if config and "system" in config:
if "UUID" in config["system"]:
for s in merged_sbom.systems:
if config["system"]["UUID"] == s.UUID:
add_system = False
break
# Even if not adding the system, create a dummy/placeholder with the UUID for creating relationships
system = create_system_object(merged_sbom, config)
system, using_random_uuid = create_system_object(merged_sbom, config, system_uuid)
if add_system:
merged_sbom.systems.append(system)

# Add a system relationship to each root software/systems entry identified
for r in roots:
merged_sbom.relationships.add(
Relationship(xUUID=system["UUID"], yUUID=r, relationship="Includes")
if not using_random_uuid or add_system:
if config and "systemRelationship" in config:
system_relationship = config["systemRelationship"]
for r in roots:
merged_sbom.relationships.add(
Relationship(xUUID=system.UUID, yUUID=r, relationship=system_relationship)
nightlark marked this conversation as resolved.
Show resolved Hide resolved
)
else:
logger.warning(
"No top-level system relationships added; enable the add system option to randomly generate a UUID, or specify a system UUID"
)

output_writer.write_sbom(merged_sbom, sbom_outfile)
Expand All @@ -92,10 +143,10 @@ def construct_relationship_graph(sbom: SBOM):
rel_graph[sw.UUID] = []
# iterate through all relationships, adding edges to the adjacency list
for rel in sbom.relationships:
# check case where xUUID doesn't exist (and error if yUUID doesn't exist) in the graph
if rel.xUUID not in rel_graph or rel.yUUID not in rel_graph:
logger.error("====ERROR xUUID or yUUID doesn't exist====")
logger.error(f"{rel = }")
logger.error(
f"Either the xUUID or yUUID for the relationship does not exist in the graph: {rel = }"
)
continue
# consider also including relationship type for the edge
# treat as directed graph, with inverted edges (pointing to parents) so dfs will eventually lead to the root parent node for a (sub)graph
Expand Down Expand Up @@ -151,23 +202,34 @@ def dfs(rel):
return roots


def create_system_object(sbom: SBOM, config=None) -> System:
def create_system_object(sbom: SBOM, config=None, system_uuid=None) -> Tuple[System, bool]:
"""Function to create an accurate system object

Positional arguments:
sbom (SBOM): The SBOM the system object is being created for.
config: The user specified config json (Optional).

Returns:
System: The created system object.
Tuple[System, bool]: The created system object and a boolean indicating if a random UUID was used.
"""

system = {}
using_random_uuid = False
if config and "system" in config:
system = config["system"]
# make sure the required fields are present and at least mostly valid
if ("UUID" not in system) or not sbom.is_valid_uuid4(system["UUID"]):

# system_uuid supplied via command line overrides config file UUID
if system_uuid:
system["UUID"] = system_uuid
elif "UUID" not in system:
# No UUID, generate a random one...
using_random_uuid = True
system["UUID"] = str(uuid_module.uuid4())
# check if the UUID appears valid based on the CyTRICS schema
elif not sbom.is_valid_uuid4(system["UUID"]):
invalid_uuid = system["UUID"]
logger.error(f"Invalid uuid4 given ({invalid_uuid}) for the system")
nightlark marked this conversation as resolved.
Show resolved Hide resolved

if "name" not in system:
system["name"] = ""
captureStart = -1
Expand All @@ -183,4 +245,4 @@ def create_system_object(sbom: SBOM, config=None) -> System:
system["captureStart"] = captureStart
if "captureEnd" not in system or not system["captureEnd"]:
system["captureEnd"] = captureEnd
return System(**system)
return System(**system), using_random_uuid
Loading
Loading