diff --git a/.gitignore b/.gitignore index c17ad886..f9457e9d 100644 --- a/.gitignore +++ b/.gitignore @@ -93,6 +93,7 @@ target/ profile_default/ ipython_config.py cytrics_venv +new_env # pyenv # For a library or package, you might want to ignore these files since the code is # intended to run in multiple environments; otherwise, check them in: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b22db6fb..763d72fd 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,7 +1,7 @@ exclude: ^(.gitignore|generate_sbom.py|extract_file_info.py|pe_info.py) repos: - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.4.7 + rev: v0.6.1 hooks: # Run the linter - id: ruff @@ -9,7 +9,7 @@ repos: # Run the formatter - id: ruff-format - repo: https://github.com/pycqa/pylint - rev: v3.2.2 + rev: v3.2.6 hooks: - id: pylint - repo: https://github.com/pre-commit/pre-commit-hooks diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index f578377e..77ec834c 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -43,6 +43,8 @@ To install optional dependencies required for running pytest and pre-commit: pip install -e ".[test,dev]" ``` +`pip install` with the `-e` or `--editable` option can also be used to install Surfactant plugins for development. + ## Code of Conduct All participants in the Surfactant community are expected to follow our [Code of Conduct](https://www.contributor-covenant.org/version/2/1/code_of_conduct.html). diff --git a/README.md b/README.md index 193dad55..98aba642 100644 --- a/README.md +++ b/README.md @@ -28,19 +28,41 @@ or decompilation. ### For Users: -1. Create a virtual environment with python >= 3.8 [Optional, but recommended] +For ease of use, we recommend using [pipx](https://github.com/pypa/pipx) since it transparently handles creating and using Python virtual environments, which helps avoid dependency conflicts with other installed Python apps. Install `pipx` by following [their installation instructions](https://github.com/pypa/pipx#install-pipx). + +1. Install Surfactant using `pipx install` (with python >= 3.8) + +```bash +pipx install surfactant +``` + +2. Install plugins using `pipx inject surfactant`. As an example, this is how the fuzzy hashing plugin could be installed from a git repository (PyPI package names, local source directories, or wheel files can also be used). + +```bash +pipx inject surfactant git+https://github.com/LLNL/Surfactant#subdirectory=plugins/fuzzyhashes +``` + +If for some reason manually managing virtual environments is desired, the following steps can be used instead: + +1. Create a virtual environment with python >= 3.8 and activate it [Optional, but highly recommended over a global install] ```bash python -m venv cytrics_venv source cytrics_venv/bin/activate ``` -2. Install Surfactant with pip +2. Install Surfactant with `pip install` ```bash pip install surfactant ``` +3. Install plugins using `pip install`. As an example, this is how the fuzzy hashing plugin could be installed from a git repository (PyPI package names, local source directories, or wheel files can also be used). + +```bash +pip install git+https://github.com/LLNL/Surfactant#subdirectory=plugins/fuzzyhashes +``` + ### For Developers: 1. 
Create a virtual environment with python >= 3.8 [Optional, but recommended] @@ -68,15 +90,55 @@ To install optional dependencies required for running pytest and pre-commit: pip install -e ".[test,dev]" ``` +`pip install` with the `-e` or `--editable` option can also be used to install Surfactant plugins for development. + +```bash +pip install -e plugins/fuzzyhashes +``` + +## Settings + +Surfactant settings can be changed using the `surfactant config` subcommand, or by hand editing the settings configuration file (this is not the same as the JSON file used to configure settings for a particular sample that is described later). + +### Command Line + +Using `surfactant config` is very similar to the basic use of `git config`. The key whose value is being accessed will be in the form `section.option` where `section` is typically a plugin name or `core`, and `option` is the option to set. As an example, the `core.recorded_institution` option can be used to configure the recorded institution used to identify who the creator of a generated SBOM was. + +Setting this option to `LLNL` could be done with the following command: + +```bash +surfactant config core.recorded_institution LLNL +``` + +Getting the currently set value for the option would then be done with: + +```bash +surfactant config core.recorded_institution +``` + +### Manual Editing + +If desired, the settings config file can also be manually edited. The location of the file will depend on your platform. +On Unix-like platforms (including macOS), the XDG directory specification is followed and settings will be stored in +`${XDG_CONFIG_HOME}/surfactant/config.toml`. If the `XDG_CONFIG_HOME` environment variable is not set, the location defaults +to `~/.config`. On Windows, the file is stored in the Roaming AppData folder at `%APPDATA%\\surfactant\\config.toml`. + +The file itself is a TOML file, and for the previously mentioned example plugin may look something like this: + +```toml +[core] +recorded_institution = "LLNL" +``` + ## Usage ### Identify sample file In order to test out surfactant, you will need a sample file/folder. If you don't have one on hand, you can download and use the portable .zip file from or the Linux .tar.gz file from . Alternatively, you can pick a sample from https://lc.llnl.gov/gitlab/cir-software-assurance/unpacker-to-sbom-test-files -### Build configuration file +### Build configuration file for sample -A configuration file contains the information about the sample to gather information from. Example JSON configuration files can be found in the examples folder of this repository. +A configuration file for a sample contains the information about the sample to gather information from. Example JSON sample configuration files can be found in the examples folder of this repository. **extractPaths**: (required) the absolute path or relative path from location of current working directory that `surfactant` is being run from to the sample folders, cannot be a file (Note that even on Windows, Unix style `/` directory separators should be used in paths)\ **archive**: (optional) the full path, including file name, of the zip, exe installer, or other archive file that the folders in **extractPaths** were extracted from. 
This is used to collect metadata about the overall sample and will be added as a "Contains" relationship to all software entries found in the various **extractPaths**\ @@ -349,7 +411,7 @@ Details on the merge command can be found in the docs page [here](./docs/basic_u ## Plugins Surfactant supports using plugins to add additional features. For users, installing and enabling a plugin usually just involves -doing a `pip install` of the plugin. +doing a `pipx inject surfactant` when using pipx or `pip install` of the plugin if manually managing virtual environments. Detailed information on configuration options for the plugin system and how to develop new plugins can be found [here](./docs/plugins.md). diff --git a/docs/api/config_manager.md b/docs/api/config_manager.md new file mode 100644 index 00000000..d53e0602 --- /dev/null +++ b/docs/api/config_manager.md @@ -0,0 +1,95 @@ +# ConfigManager + +The `ConfigManager` class is used to handle settings stored in a configuration file. It supports reading and writing configuration values while preserving formatting and comments, and it caches the configuration to avoid reloading it multiple times during the application's runtime. The underlying config file location is dependent on the operating system, though typically follows the XDG directory specification and respects the `XDG_CONFIG_HOME` environment variable on Unix-like platforms. On Windows, the configuration file is stored in the AppData Roaming folder (`%APPDATA%`). + +## Usage + +## Configuration File Location + +The location of the configuration file varies depending on the platform: + +- **Windows**: `%AppData%\surfactant\config.toml` +- **macOS**: `${XDG_CONFIG_HOME}/surfactant/config.toml` +- **Linux**: `${XDG_CONFIG_HOME}/surfactant/config.toml` + +For systems that use `XDG_CONFIG_HOME`, if the environment variable is not set then the default location is `~/.config`. + +## Example Configuration File + +Here is an example of what the configuration file might look like: + +```toml +[core] +recorded_institution = "LLNL" +``` + +### Initialization + +To initialize the `ConfigManager`, simply import and create an instance: + +```python +from surfactant.configmanager import ConfigManager + +config_manager = ConfigManager() +``` + +This automatically handles loading a copy of the config file the first time an instance of the ConfigManager is created, effectively making it a snapshot in time of the configuration settings. + +### Getting a Value + +To retrieve a stored value, use the `get` method: + +```python +value = config_manager.get('section', 'option', fallback='default_value') +``` + +- `section`: The section within the configuration file. For plugins this should be the plugin name. +- `option`: The option within the section. +- `fallback`: The fallback value if the option is not found. + +Alternatively, dictionary-like access for reading is also supported: + +```python +value = config_manager['section']['option'] +``` + +However, this makes no guarantees that keys will exist and extra error handling **will be required**. If the `section` is not found then `None` is returned -- trying to access nested keys from this will fail. Furthermore, if the `section` does exist, you will need checks to see if a nested key exists before trying to access its value. 
A more realistic example would be:
+
+```python
+section_config = config_manager['section'] # May return `None`
+value = section_config['option'] if section_config and 'option' in section_config else None
+```
+
+### Setting a Value
+
+To set a value, use the `set` method:
+
+```python
+config_manager.set('section', 'option', 'new_value')
+```
+
+- `section`: The section within the configuration file. For plugins this should be the plugin name.
+- `option`: The option within the section.
+- `value`: The value to set.
+
+NOTE: Most use cases should not need this.
+
+### Saving the Configuration File
+
+The configuration file is automatically saved when you set a value. The file can be manually saved using:
+
+```python
+config_manager._save_config()
+```
+
+NOTE: Most use cases should not need this.
+
+### Loading the Configuration File
+
+The configuration file can be reloaded using:
+
+```python
+config_manager._load_config()
+```
+
+NOTE: Most use cases should not need this.
diff --git a/docs/cli_usage.md b/docs/cli_usage.md
new file mode 100644
index 00000000..fd20868f
--- /dev/null
+++ b/docs/cli_usage.md
@@ -0,0 +1,82 @@
+# CLI Usage
+The Surfactant CLI interface allows users to easily and quickly find, add, and edit entries within a given SBOM.
+Some functionality we support includes:
+- Specify a file to find, add, or edit its entry in a given SBOM
+- Fix up path prefixes, i.e. installPath or containerPath
+- Add relationships
+
+## surfactant cli find
+The **cli find** command allows users to find specific entries within an SBOM. This will allow users to:
+- Verify entries exist within the SBOM
+- Manually inspect one or more related entries within an SBOM for errors or bad formatting
+- Provide a subset of entries to supply to the `cli edit` or `cli add` commands.
+
+### Example 1: Find Exact Matches
+```bash
+surfactant cli find sbom.json --UUID 123
+{
+"UUID": 123,
+"filename": foo.exe,
+"sha256": ,
+"installPath": ["C:/Users/Test/Downloads/"]
+}
+surfactant cli find --file ../test.exe # File matches are found by hash matching, not filename matches.
+{
+"UUID": 456,
+"filename": test.exe,
+"sha256": ,
+"installPath": ["C:/Users/Test/Documents/"]
+}
+```
+### Example 2: Find Partial Matches
+```bash
+surfactant cli find --installpath C:/Users/Test/Downloads/
+{
+"UUID": 123,
+"filename": foo.exe,
+"sha256":
+"installPath": ["C:/Users/Test/Downloads/"]
+}
+```
+
+## surfactant cli add
+The **cli add** command will allow users to easily add manual entries to an SBOM.
This command should allow users to:
+- Add key value pairs to existing SBOM entries
+- Add whole new entries to the SBOM
+- Add new installPaths based on existing containerPaths
+### Adding a relationship
+```bash
+surfactant cli add --relationship '{"xUUID": "123", "yUUID": 456, "relationship": "Uses"}' sbom.json
+```
+### Example 1: Adding a manual entry
+```bash
+surfactant cli add --entry '{"UUID": "123", "filename": "test.exe", "sha256": "3423csdlkf13048kj"}' sbom.json
+```
+### Example 2: Adding an entry by file
+```bash
+surfactant cli add --file test.exe sbom.json
+```
+### Example 3: Creating new installPaths from containerPaths
+```bash
+surfactant cli add --installPath 123/ /bin/ sbom.json
+```
+Our SBOM before the `cli add` command:
+```bash
+{
+"UUID": 456,
+"filename": test.exe,
+"sha256":
+"installPath": [],
+"containerPath": ["123/helpers/test.exe"]
+}
+```
+Our SBOM after the `cli add` command:
+```bash
+{
+"UUID": 456,
+"filename": test.exe,
+"sha256":
+"installPath": ["/bin/helpers/test.exe"],
+"containerPath": ["123/helpers/test.exe"]
+}
+```
diff --git a/docs/configuration_files.md b/docs/configuration_files.md
index ebe88909..54c0f724 100644
--- a/docs/configuration_files.md
+++ b/docs/configuration_files.md
@@ -1,12 +1,45 @@
 # Configuration Files
 There are several files for configuring different aspects of Surfactant functionality based on the subcommand used.
-This page currently describes the format for the file used to generate an SBOM, but will eventually cover other
-configuration files as well.
+This page currently describes sample configuration files, and the Surfactant settings configuration file. The sample configuration file is used to generate an SBOM for a particular software/firmware sample, and will be the most frequently written by users. The Surfactant settings configuration file is used to turn on and off various Surfactant features, including settings for controlling functionality in Surfactant plugins.
-## Build configuration file
-A configuration file contains the information about the sample to gather information from. Example JSON configuration files can be found in the examples folder of this repository.
+## Settings Configuration File
+
+Surfactant settings can be changed using the `surfactant config` subcommand, or by hand editing the settings configuration file (this is not the same as the JSON file used to configure settings for a particular sample that is described later).
+
+### Command Line
+
+Using `surfactant config` is very similar to the basic use of `git config`. The key whose value is being accessed will be in the form `section.option` where `section` is typically a plugin name or `core`, and `option` is the option to set. As an example, the `core.recorded_institution` option can be used to configure the recorded institution used to identify who the creator of a generated SBOM was.
+
+Setting this option to `LLNL` could be done with the following command:
+
+```bash
+surfactant config core.recorded_institution LLNL
+```
+
+Getting the currently set value for the option would then be done with:
+
+```bash
+surfactant config core.recorded_institution
+```
+
+### Manual Editing
+
+If desired, the settings config file can also be manually edited. The location of the file will depend on your platform.
+On Unix-like platforms (including macOS), the XDG directory specification is followed and settings will be stored in
+`${XDG_CONFIG_HOME}/surfactant/config.toml`.
If the `XDG_CONFIG_HOME` environment variable is not set, the location defaults +to `~/.config`. On Windows, the file is stored in the Roaming AppData folder at `%APPDATA%\\surfactant\\config.toml`. + +The file itself is a TOML file, and for the previously mentioned example plugin may look something like this: + +```toml +[core] +recorded_institution = "LLNL" +``` + +## Build sample configuration file + +A sample configuration file contains the information about the sample to gather information from. Example JSON sample configuration files can be found in the examples folder of this repository. **extractPaths**: (required) the absolute path or relative path from location of current working directory that `surfactant` is being run from to the sample folders, cannot be a file (Note that even on Windows, Unix style `/` directory separators should be used in paths)\ **archive**: (optional) the full path, including file name, of the zip, exe installer, or other archive file that the folders in **extractPaths** were extracted from. This is used to collect metadata about the overall sample and will be added as a "Contains" relationship to all software entries found in the various **extractPaths**\ diff --git a/docs/getting_started.md b/docs/getting_started.md index 91cb0edb..f24b9525 100644 --- a/docs/getting_started.md +++ b/docs/getting_started.md @@ -9,19 +9,41 @@ and Windows, though it should also work on other operating systems such as FreeB ### For Users: -1. Create a virtual environment with python >= 3.8 [Optional, but recommended] +For ease of use, we recommend using [pipx](https://github.com/pypa/pipx) since it transparently handles creating and using Python virtual environments, which helps avoid dependency conflicts with other installed Python apps. Install `pipx` by following [their installation instructions](https://github.com/pypa/pipx#install-pipx). + +1. Install Surfactant using `pipx install` (with python >= 3.8) + +```bash +pipx install surfactant +``` + +2. Install plugins using `pipx inject surfactant`. As an example, this is how the fuzzy hashing plugin could be installed from a git repository (PyPI package names, local source directories, or wheel files can also be used). + +```bash +pipx inject surfactant git+https://github.com/LLNL/Surfactant#subdirectory=plugins/fuzzyhashes +``` + +If for some reason manually managing virtual environments is desired, the following steps can be used instead: + +1. Create a virtual environment with python >= 3.8 and activate it [Optional, but highly recommended over a global install] ```bash python -m venv cytrics_venv source cytrics_venv/bin/activate ``` -2. Install Surfactant with pip +2. Install Surfactant with `pip install` ```bash pip install surfactant ``` +3. Install plugins using `pip install`. As an example, this is how the fuzzy hashing plugin could be installed from a git repository (PyPI package names, local source directories, or wheel files can also be used). + +```bash +pip install git+https://github.com/LLNL/Surfactant#subdirectory=plugins/fuzzyhashes +``` + ### For Developers: 1. Create a virtual environment with python >= 3.8 [Optional, but recommended] @@ -49,6 +71,8 @@ To install optional dependencies required for running pytest and pre-commit: pip install -e ".[test,dev]" ``` +`pip install` with the `-e` or `--editable` option can also be used to install Surfactant plugins for development. 
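+
+For example, installing the fuzzy hashing plugin for development from a local checkout of this repository might look like the following (this mirrors the example in the README):
+
+```bash
+pip install -e plugins/fuzzyhashes
+```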
+ ## Understanding the SBOM Output The following is a brief overview of the default SBOM file output format (which follows the CyTRICS schema). It is diff --git a/plugins/angrimportfinder/surfactantplugin_angrimportfinder.py b/plugins/angrimportfinder/surfactantplugin_angrimportfinder.py index a2ef433a..4112ea87 100644 --- a/plugins/angrimportfinder/surfactantplugin_angrimportfinder.py +++ b/plugins/angrimportfinder/surfactantplugin_angrimportfinder.py @@ -34,7 +34,7 @@ def angrimport_finder(sbom: SBOM, software: Software, filename: str, filetype: s # Performing check to see if file has been analyzed already existing_json = None output_name = None - for f in Path.cwd().glob("*.json"): + for f in Path.cwd().glob("*_additional_metadata.json"): flist.append((f.stem).split("_")[0]) if filehash == (f.stem).split("_")[0]: existing_json = f diff --git a/plugins/binary2strings/surfactantplugin_binary2strings.py b/plugins/binary2strings/surfactantplugin_binary2strings.py index 6f4f01b6..c71e0043 100644 --- a/plugins/binary2strings/surfactantplugin_binary2strings.py +++ b/plugins/binary2strings/surfactantplugin_binary2strings.py @@ -33,7 +33,7 @@ def extract_strings(sbom: SBOM, software: Software, filename: str, filetype: str # Performing check to see if file has been analyzed already existing_json = None output_name = None - for f in Path.cwd().glob("*.json"): + for f in Path.cwd().glob("*_additional_metadata.json"): flist.append((f.stem).split("_")[0]) if shaHash == (f.stem).split("_")[0]: existing_json = f diff --git a/plugins/cvebin2vex/README.md b/plugins/cvebin2vex/README.md new file mode 100644 index 00000000..000f4e7a --- /dev/null +++ b/plugins/cvebin2vex/README.md @@ -0,0 +1,44 @@ +# CVE-to-OpenVEX Plugin for SBOM Surfactant + +A plugin for Surfactant that leverages [cve-bin-tool](https://github.com/intel/cve-bin-tool) and a custom conversion process to generate OpenVEX vulnerability statements from binary files. This tool supports analyzing binary strings to identify known vulnerabilities and outputs them in the standardized CycloneDX and OpenVEX format. + +The cve-bin-tool is licensed under GPL-3 and is available to review [here](https://github.com/intel/cve-bin-tool?tab=GPL-3.0-1-ov-file#readme). + +## Quickstart + +To install this plugin within the same virtual environment as Surfactant, use the command `pip install .`. + +For developers modifying the plugin, the editable installation can be achieved with `pip install -e .`. + +Since the plugin is designed to run in `--offline` mode, before your initial run of the script please run the command `cve-bin-tool --update now .`. This will provide you a freshly updated local database that the script will check against in the offline mode. + +After the plugin installation, run Surfactant as you normally would to create an SBOM. For binary files analyzed by this plugin, additional JSON files will be generated containing vulnerability data extracted from the binaries. If there are duplicate hashed files, the extractor will skip the entry. + +Example: +Output Filename: `$(sha256hash)_additional_metadata.json` + +```json +{ + "sha256hash": " ", + "filename": [], + "openvex": [], + "cyclonedx-vex": [], + "cve-bin-tool": [] +} +``` + +The plugin's functionality can be toggled via Surfactant's plugin management features, using the plugin name `surfactantplugin_cvebintool2vex.py` as defined in the `pyproject.toml` under the `project.entry-points."surfactant"` section. 
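+
+Once installed, no extra invocation is needed; a typical SBOM generation run with the plugin active would look something like the sketch below (the config and output file names are placeholders):
+
+```bash
+surfactant generate sample_config.json output_sbom.json
+```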
+ +## Features + +- **Offline Vulnerability Analysis**: Utilizes CVE-bin-tool in offline mode to scan binaries for known vulnerabilities. +- **OpenVEX Format Conversion**: Transforms CVE-bin-tool JSON output into the OpenVEX format, a standardized way to report vulnerabilities. + +## Uninstalling + +Remove the plugin from your environment with `pip uninstall surfactantplugin_cvebintool2vex`. + +## Important Licensing Information +Main Project License (Surfactant): MIT License. + +Plugin License: MIT License, but it includes and uses cve-bin-tool, which is GPL-3.0 licensed. diff --git a/plugins/cvebin2vex/pyproject.toml b/plugins/cvebin2vex/pyproject.toml new file mode 100644 index 00000000..a1ef5d53 --- /dev/null +++ b/plugins/cvebin2vex/pyproject.toml @@ -0,0 +1,33 @@ +[build-system] +requires = ["setuptools", "setuptools-scm"] +build-backend = "setuptools.build_meta" + +[project] +name = "surfactantplugin-cvebintool2vex" +authors = [ + {name = "Tyler Williams", email = "tyler.williams@pnnl.gov"}, +] +description = "Surfactant binary scanner with vex creation" +readme = "README.md" +requires-python = ">=3.8" +keywords = ["surfactant"] +license = {text = "MIT License"} +classifiers = [ + "Programming Language :: Python :: 3", + "Environment :: Console", + "Operating System :: MacOS", + "Operating System :: Microsoft :: Windows", + "Operating System :: POSIX :: Linux", + "License :: OSI Approved :: MIT License", +] +dependencies = [ + "cve-bin-tool", + "surfactant", +] +dynamic = ["version"] + +[project.entry-points."surfactant"] +"surfactantplugin_cvebintool2vex" = "surfactantplugin_cvebintool2vex" + +[tool.setuptools] +py-modules=["surfactantplugin_cvebintool2vex"] diff --git a/plugins/cvebin2vex/surfactantplugin_cvebintool2vex.py b/plugins/cvebin2vex/surfactantplugin_cvebintool2vex.py new file mode 100644 index 00000000..72aa77d4 --- /dev/null +++ b/plugins/cvebin2vex/surfactantplugin_cvebintool2vex.py @@ -0,0 +1,212 @@ +import json +import subprocess +import sys +import uuid +from datetime import datetime +from pathlib import Path + +from loguru import logger + +import surfactant.plugin +from surfactant.sbomtypes import SBOM, Software + + +def run_cve_bin_tool(input_file_path, shaHash, output_dir): + cvebin_file_name = f"{shaHash}_{input_file_path.stem}.json" + output_file_path = output_dir / cvebin_file_name + + cdxvex_file_name = f"{shaHash}_{input_file_path.stem}.cdxvex" + vex_output_path = output_dir / cdxvex_file_name + + try: + command = [ + "cve-bin-tool", + "--offline", + "--input-file", + str(input_file_path), + "--output", + str(output_file_path), + "--format", + "json", + "--vex", + str(vex_output_path), + ] + result = subprocess.run(command, capture_output=True, text=True, check=False) + + # Check the exit status + if result.returncode in (0, 1): + return output_file_path # Return path to the generated JSON file + except subprocess.CalledProcessError as e: + logger.error( + f"Error running CVE-bin-tool: {e}\nOutput: {e.output}\nError: {e.stderr}", + file=sys.stderr, + ) + return None + + +def convert_cve_to_openvex(json_output_path, shaHash, output_dir): + openvex_file_name = f"{json_output_path.stem}.vex" + openvex_output = output_dir / openvex_file_name + + # Open and read the .json file + try: + with open(json_output_path, "r") as file: + cve_data = json.load(file) + except json.JSONDecodeError as e: + logger.error(f"Error reading JSON file: {e}") + return + except IOError as e: + logger.error(f"IO error when reading {json_output_path}: {e}") + + openvex_template = { + 
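+        # Document-level OpenVEX fields; a "statements" entry is appended below for each CVE reported by cve-bin-tool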
"@context": "https://openvex.dev/ns/v0.2.0", + "@id": f"urn:uuid:{uuid.uuid4()}", + "author": "Surfactant plugin cvebintool2vex", + "timestamp": datetime.now().isoformat(), + "version": 1, + "tooling": "Surfactant (https://github.com/LLNL/Surfactant)", + "statements": [], + } + + for entry in cve_data: + # Convert CVE data to OpenVEX format + statement = { + "vulnerability": {"name": entry["cve_number"]}, + "products": [ + {"@id": f"cpe:2.3:a:{entry['vendor']}:{entry['product']}:{entry['version']}:::::"} + ], + "status": "under_investigation", + "source": entry["source"], + "cvss_version": entry["cvss_version"], + "cvss_vector": entry["cvss_vector"], + "severity": entry["severity"], + } + openvex_template["statements"].append(statement) + + # Save the OpenVEX output to a new file + try: + with open(openvex_output, "w") as outfile: + json.dump(openvex_template, outfile, indent=4) + except IOError as e: + logger.error(f"IO error when writing {openvex_output}: {e}") + + +def process_input(input_path, shaHash, output_dir=None): + input_path = Path(input_path) + if output_dir is None: + output_dir = Path.cwd() + + if input_path.is_dir(): + for input_file in input_path.glob("*.*"): + if input_file.suffix.lower() not in [".bin", ".exe", ".jar"]: + continue + process_file(input_file, shaHash, output_dir) + elif input_path.is_file(): + process_file(input_path, shaHash, output_dir) + else: + logger.info(f"Error: {input_path} is neither a file nor a directory.") + + +def process_file(input_file, shaHash, output_directory): + try: + json_output_path = run_cve_bin_tool(input_file, shaHash, output_directory) + if json_output_path: + convert_cve_to_openvex(json_output_path, shaHash, output_directory) + except subprocess.CalledProcessError as e: + logger.error(f"Error running CVE-bin-tool: {e}") + except json.JSONDecodeError as e: + logger.error(f"JSON decoding error in {input_file}: {e}") + except IOError as e: + logger.error(f"I/O error processing {input_file}: {e}") + + # Check if the expected JSON file was created and proceed if it exists + cvebin_file_name = f"{shaHash}_{input_file.stem}.json" + jsonfile = output_directory / cvebin_file_name + if not jsonfile.exists(): + logger.warning(f"Expected JSON file does not exist: {jsonfile}") + + +def delete_extra_files(*file_paths): + for file_path in file_paths: + try: + if file_path.exists(): + file_path.unlink() + except PermissionError as e: + logger.error(f"Permission error deleting {file_path}: {e}") + + except OSError as e: + logger.error(f"OS error deleting {file_path}: {e}") + + +@surfactant.plugin.hookimpl(specname="extract_file_info") +def cvebintool2vex(sbom: SBOM, software: Software, filename: str, filetype: str): + """ + :param sbom(SBOM): The SBOM that the software entry/file is being added to. Can be used to add observations or analysis data. + :param software(Software): The software entry associated with the file to extract information from. + :param filename (str): The full path to the file to extract information from. + :param filetype (str): File type information based on magic bytes. 
+ """ + # Only parsing executable files + if filetype not in ["ELF", "PE"]: + pass + + shaHash = str(software.sha256) + filename = Path(filename) + output_dir = Path.cwd() + + existing_json_path = output_dir / f"{shaHash}_additional_metadata.json" + if existing_json_path.exists(): + with open(existing_json_path, "r") as file: + data = json.load(file) + else: + data = { + "sha256hash": shaHash, + "filename": [filename.name], + "openvex": [], + "cyclonedx-vex": [], + "cve-bin-tool": [], + } + + # Assuming JSON, CDXVEX, and VEX files are processed here + process_input(filename, shaHash, output_dir) + # and you have the output files: .json, .cdxvex, .vex + + # Integrate .cdxvex and .vex file contents + cdxvex_file_path = output_dir / f"{shaHash}_{filename.stem}.cdxvex" + vex_file_path = output_dir / f"{shaHash}_{filename.stem}.vex" + json_file_path = output_dir / f"{shaHash}_{filename.stem}.json" + + if cdxvex_file_path.exists() and vex_file_path.exists() and json_file_path.exists(): + # For .cdxvex and .vex files, if they contain JSON, parse them as such; otherwise, read as text + try: + with open(cdxvex_file_path, "r") as file: + cdxvex_data = json.load(file) # Assuming .cdxvex file is in JSON format + data["cyclonedx-vex"].append(cdxvex_data) + except json.JSONDecodeError: + with open(cdxvex_file_path, "r") as file: + cdxvex_data = file.read() # Fallback if not JSON + data["cyclonedx-vex"].append(cdxvex_data) + + try: + with open(vex_file_path, "r") as file: + vex_data = json.load(file) # Assuming .vex file is in JSON format + data["openvex"].append(vex_data) + except json.JSONDecodeError: + with open(vex_file_path, "r") as file: + vex_data = file.read() # Fallback if not JSON + data["openvex"].append(vex_data) + + with open(json_file_path, "r") as file: + json_data = json.load(file) + data["cve-bin-tool"].append(json_data) + + # Attempt to save the updated data + try: + with open(existing_json_path, "w") as file: + json.dump(data, file, indent=4) + logger.info(f"Updated data saved to {existing_json_path}") + except IOError as e: + logger.error(f"IO error when writing {existing_json_path}: {e}") + + # Clean up extra files + delete_extra_files(cdxvex_file_path, vex_file_path, json_file_path) diff --git a/plugins/grype/README.md b/plugins/grype/README.md new file mode 100644 index 00000000..2488e872 --- /dev/null +++ b/plugins/grype/README.md @@ -0,0 +1,16 @@ +# Syft Plugin for SBOM Surfactant + +A plugin for Surfactant that uses [grype](https://github.com/anchore/grype) + + +## Quickstart +To start, install grype following the instructions on [the github page](https://github.com/anchore/grype#installation) + +In the same virtual environment that Surfactant was installed in, install this plugin with `pip install git+https://github.com/LLNL/Surfactant#subdirectory=plugins/grype`. If pipx was used to install Surfactant, install this plugin with `pipx inject surfactant git+https://github.com/LLNL/Surfactant#subdirectory=plugins/grype`. + +For developers making changes to this plugin, install it with `pip install -e .` using a clone of the git repository. + +After installation, this plugin will run whenever Surfactant discovers an applicable file (saved Docker image tarball) to be examined. + +## Uninstalling +The plugin can be uninstalled with `pip uninstall surfactantplugin-grype`. If pipx was used, it can be uninstalled with `pipx uninject surfactant surfactantplugin-grype`. 
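+
+## Example Output
+
+Based on how the plugin parses the `grype` table output, each scanned Docker image tarball gets a `grype_output` metadata entry shaped roughly like the sketch below (the package and CVE values are illustrative, not real scan results):
+
+```json
+{
+  "grype_output": [
+    {
+      "name": "openssl",
+      "installed": "1.1.1k",
+      "fixed_in": "1.1.1l",
+      "type": "deb",
+      "vulnerability": "CVE-2021-3711",
+      "severity": "Critical"
+    }
+  ]
+}
+```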
diff --git a/plugins/grype/pyproject.toml b/plugins/grype/pyproject.toml new file mode 100644 index 00000000..a9501195 --- /dev/null +++ b/plugins/grype/pyproject.toml @@ -0,0 +1,32 @@ +[build-system] +requires = ["setuptools", "setuptools-scm"] +build-backend = "setuptools.build_meta" + +[project] +name = "surfactantplugin_grype" +authors = [ + {name = "Kendall Harter", email = "harter8@llnl.gov"}, +] +description = "Surfactant plugin for running grype on files" +readme = "README.md" +requires-python = ">=3.8" +keywords = ["surfactant"] +license = {text = "MIT License"} +classifiers = [ + "Programming Language :: Python :: 3", + "Environment :: Console", + "Operating System :: MacOS", + "Operating System :: Microsoft :: Windows", + "Operating System :: POSIX :: Linux", + "License :: OSI Approved :: MIT License", +] +dependencies = [ + "loguru", +] +dynamic = ["version"] + +[project.entry-points."surfactant"] +"surfactantplugin_grype" = "surfactantplugin_grype" + +[tool.setuptools] +py-modules=["surfactantplugin_grype"] diff --git a/plugins/grype/setup.py b/plugins/grype/setup.py new file mode 100644 index 00000000..fd9577f2 --- /dev/null +++ b/plugins/grype/setup.py @@ -0,0 +1,9 @@ +# Copyright 2024 Lawrence Livermore National Security, LLC +# See the top-level LICENSE file for details. +# +# SPDX-License-Identifier: MIT + +# For compatibility with old versions of tools +from setuptools import setup + +setup() diff --git a/plugins/grype/surfactantplugin_grype.py b/plugins/grype/surfactantplugin_grype.py new file mode 100644 index 00000000..8879bf9d --- /dev/null +++ b/plugins/grype/surfactantplugin_grype.py @@ -0,0 +1,80 @@ +# Copyright 2024 Lawrence Livermore National Security, LLC +# See the top-level LICENSE file for details. +# +# SPDX-License-Identifier: MIT +import gzip +import subprocess +import tempfile +from typing import List, Optional + +from loguru import logger + +import surfactant.plugin +from surfactant.sbomtypes import SBOM, Software + + +def check_if_grype_installed() -> bool: + try: + result = subprocess.run(["grype", "--help"], capture_output=True, check=False).returncode + except FileNotFoundError: + result = 1 + if result != 0: + logger.warning("Install grype for the grype plugin to run") + return result == 0 + + +disable_plugin = not check_if_grype_installed() + + +def run_grype(filename: str) -> object: + result = subprocess.run(["grype", filename], capture_output=True, check=False) + if result.returncode != 0: + logger.warning(f"Running grype on {filename} failed") + return None + output = result.stdout.decode() + to_ret = [] + # skip the header on the first line + for line in output.split("\n")[1:]: + columns = [s.strip() for s in line.split(" ") if s.strip()] + # Skip empty lines + if len(columns) == 0: + continue + # Assume that the "Fixed In" field is missing if there's only 5 entries + name = columns[0] + installed = columns[1] + if len(columns) == 5: + fixed_in = "" + type_ = columns[2] + vuln = columns[3] + severity = columns[4] + else: + fixed_in = columns[2] + type_ = columns[3] + vuln = columns[4] + severity = columns[5] + to_ret.append( + { + "name": name, + "installed": installed, + "fixed_in": fixed_in, + "type": type_, + "vulnerability": vuln, + "severity": severity, + } + ) + return {"grype_output": to_ret} + + +@surfactant.plugin.hookimpl +def extract_file_info( + sbom: SBOM, software: Software, filename: str, filetype: str, children: list +) -> Optional[List[Software]]: + if disable_plugin or filetype not in ("DOCKER_TAR", "DOCKER_GZIP"): + return 
None + if filetype == "DOCKER_GZIP": + with open(filename, "rb") as gzip_in: + gzip_data = gzip_in.read() + with tempfile.NamedTemporaryFile() as gzip_out: + gzip_out.write(gzip.decompress(gzip_data)) + return run_grype(gzip_out.name) + return run_grype(filename) diff --git a/pyproject.toml b/pyproject.toml index 0fc77d1d..10286b2b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,19 +39,21 @@ classifiers = [ "License :: OSI Approved :: MIT License", ] dependencies = [ - "dataclasses_json", - "pyelftools", - "pefile", - "dnfile==0.14.1", - "olefile", - "defusedxml", + "dataclasses_json==0.6.*", + "pyelftools==0.31.*", + # This isn't semver so fix it to a specific release + "pefile==2023.2.7", + "dnfile==0.15.*", + "olefile==0.47.*", + "defusedxml==0.7.*", "spdx-tools==0.8.*", - "cyclonedx-python-lib==7.4.0", - "pluggy", - "click", - "javatools>=1.6.0", - "loguru", - "flask" + "cyclonedx-python-lib==7.6.0", + "pluggy==1.*", + "click==8.*", + "javatools>=1.6,==1.*", + "loguru==0.7.*", + "flask==3.*", + "tomlkit==0.13.*", ] dynamic = ["version"] diff --git a/surfactant/__main__.py b/surfactant/__main__.py index c337d568..b5d70552 100755 --- a/surfactant/__main__.py +++ b/surfactant/__main__.py @@ -12,6 +12,7 @@ from loguru import logger from surfactant.cmd.cli import add, edit, find +from surfactant.cmd.config import config from surfactant.cmd.createconfig import create_config from surfactant.cmd.generate import sbom as generate from surfactant.cmd.merge import merge_command @@ -55,6 +56,7 @@ def cli(): # Main Commands main.add_command(generate) main.add_command(version) +main.add_command(config) main.add_command(stat) main.add_command(merge_command) main.add_command(create_config) diff --git a/surfactant/cmd/cli.py b/surfactant/cmd/cli.py index 83e97452..903318ca 100644 --- a/surfactant/cmd/cli.py +++ b/surfactant/cmd/cli.py @@ -1,11 +1,14 @@ import hashlib import sys +from pathlib import Path import click from loguru import logger from surfactant.plugin.manager import find_io_plugin, get_plugin_manager +from surfactant.sbomtypes._relationship import Relationship from surfactant.sbomtypes._sbom import SBOM +from surfactant.sbomtypes._software import Software @click.argument("sbom", type=click.File("r"), required=True) @@ -52,16 +55,132 @@ def find(sbom, output_format, input_format, **kwargs): output_writer.write_sbom(out_sbom, sys.stdout) +@click.argument("sbom", required=True) +@click.option( + "--output", + default=None, + is_flag=False, + help="Specifies the file to output new sbom. 
Default replaces the input file.", +) +@click.option("--file", is_flag=False, help="Adds entry for file to sbom") +@click.option("--relationship", is_flag=False, type=str, help="Adds relationship to sbom") +@click.option("--entry", is_flag=False, type=str, help="Adds software entry to sbom") +@click.option( + "--installPath", + is_flag=False, + type=str, + nargs=2, + help="Adds new installPath by finding and replacing a containerPath prefix (1st arg) with a new prefix (2nd arg)", +) +@click.option( + "--output_format", + is_flag=False, + default="surfactant.output.cytrics_writer", + help="SBOM output format, options=[cytrics|csv|spdx|cyclonedx]", +) +@click.option( + "--input_format", + is_flag=False, + default="surfactant.input_readers.cytrics_reader", + help="SBOM input format, options=[cytrics|cyclonedx|spdx]", +) +@click.command("add") +def add(sbom, output, output_format, input_format, **kwargs): + "CLI command to add specific entry(s) to a supplied SBOM" + pm = get_plugin_manager() + output_writer = find_io_plugin(pm, output_format, "write_sbom") + input_reader = find_io_plugin(pm, input_format, "read_sbom") + with open(Path(sbom), "r") as f: + in_sbom = input_reader.read_sbom(f) + # Remove None values + filtered_kwargs = dict({(k, v) for k, v in kwargs.items() if v is not None}) + out_sbom = cli_add().execute(in_sbom, **filtered_kwargs) + # Write to the input file if no output specified + if output is None: + with open(Path(sbom), "w") as f: + output_writer.write_sbom(out_sbom, f) + else: + try: + with open(Path(output), "w") as f: + output_writer.write_sbom(out_sbom, f) + except OSError as e: + logger.error(f"Could not open file {output} in write mode - {e}") + + @click.argument("sbom", type=click.File("r"), required=True) @click.command("edit") -def edit(sbom): +def edit(sbom, output_format, input_format, **kwargs): "CLI command to edit specific entry(s) in a supplied SBOM" -@click.argument("sbom", type=click.File("r"), required=True) -@click.command("add") -def add(sbom): - "CLI command to add specific entry(s) to a supplied SBOM" +class cli_add: + """ + A class that implements the surfactant cli add functionality + + Attributes: + match_functions A dictionary of functions that provide matching functionality for given SBOM fields (i.e. uuid, sha256, installpath, etc) + camel_case_conversions A dictionary of string conversions from all lowercase to camelcase. Used to convert python click options to match the SBOM attribute's case + sbom An internal record of sbom entries the class adds to as it finds more matches. 
+ """ + + camel_case_conversions: dict + match_functions: dict + sbom: SBOM + + def __init__(self): + """Initializes the cli_add class""" + self.match_functions = { + "relationship": self.add_relationship, + "file": self.add_file, + "installPath": self.add_installpath, + "entry": self.add_entry, + } + self.camel_case_conversions = { + "uuid": "UUID", + "filename": "fileName", + "installpath": "installPath", + "capturetime": "captureTime", + "relationshipassertion": "relationshipAssertion", + } + + def handle_kwargs(self, kwargs: dict) -> dict: + converted_kwargs = {} + for k, v in kwargs.items(): # Convert key values to camelcase where appropriate + key = self.camel_case_conversions[k] if k in self.camel_case_conversions else k + converted_kwargs[key] = v + return converted_kwargs + + def execute(self, input_sbom: SBOM, **kwargs): + """Executes the main functionality of the cli_find class + param: input_sbom The sbom to add entries to + param: kwargs: Dictionary of key/value pairs indicating what features to match on + """ + converted_kwargs = self.handle_kwargs(kwargs) + self.sbom = input_sbom + + for key, value in converted_kwargs.items(): + if key in self.match_functions: + self.match_functions[key](value) + else: + logger.warning(f"Paramter {key} is not supported") + return self.sbom + + def add_relationship(self, value: dict) -> bool: + self.sbom.add_relationship(Relationship(**value)) + + def add_file(self, path): + self.sbom.software.append(Software.create_software_from_file(path)) + + def add_entry(self, entry): + self.sbom.software.append(Software.from_dict(entry)) + + def add_installpath(self, prefixes: tuple): + cleaned_prefixes = (p.rstrip("/") for p in prefixes) + containerPathPrefix, installPathPrefix = cleaned_prefixes + for sw in self.sbom.software: + for path in sw.containerPath: + if containerPathPrefix in path: + sw.installPath.append(path.replace(containerPathPrefix, installPathPrefix)) class cli_find: @@ -69,8 +188,9 @@ class cli_find: A class that implements the surfactant cli find functionality Attributes: - match_functions A dictionary of functions that provide matching functionality for given SBOM fields (i.e. uuid, sha256, installpath, etc) - sbom An internal record of sbom entries the class adds to as it finds more matches. + match_functions A dictionary of functions that provide matching functionality for given SBOM fields (i.e. uuid, sha256, installpath, etc) + camel_case_conversions A dictionary of string conversions from all lowercase to camelcase. Used to convert python click options to match the SBOM attribute's case + sbom An internal record of sbom entries the class adds to as it finds more matches. """ match_functions: dict diff --git a/surfactant/cmd/config.py b/surfactant/cmd/config.py new file mode 100644 index 00000000..3e622a9e --- /dev/null +++ b/surfactant/cmd/config.py @@ -0,0 +1,51 @@ +from typing import List, Optional + +import click + +from surfactant.configmanager import ConfigManager + + +@click.command("config") +@click.argument("key", required=True) +@click.argument("values", nargs=-1) +def config(key: str, values: Optional[List[str]]): + """Get or set a configuration value. + + If only KEY is provided, the current value is displayed. + If both KEY and one or more VALUES are provided, the configuration value is set. + KEY should be in the format 'section.option'. 
+ """ + config_manager = ConfigManager() + + if not values: + # Get the configuration value + try: + section, option = key.split(".", 1) + except ValueError as err: + raise SystemExit("Invalid KEY given. Is it in the format 'section.option'?") from err + result = config_manager.get(section, option) + if result is None: + click.echo(f"Configuration '{key}' not found.") + else: + click.echo(f"{key} = {result}") + else: + # Set the configuration value + # Convert 'true' and 'false' strings to boolean + converted_values = [] + for value in values: + if value.lower() == "true": + converted_values.append(True) + elif value.lower() == "false": + converted_values.append(False) + else: + converted_values.append(value) + + # If there's only one value, store it as a single value, otherwise store as a list + final_value = converted_values[0] if len(converted_values) == 1 else converted_values + + try: + section, option = key.split(".", 1) + except ValueError as err: + raise SystemExit("Invalid KEY given. Is it in the format 'section.option'?") from err + config_manager.set(section, option, final_value) + click.echo(f"Configuration '{key}' set to '{final_value}'.") diff --git a/surfactant/cmd/generate.py b/surfactant/cmd/generate.py index 927005b1..c76e208d 100644 --- a/surfactant/cmd/generate.py +++ b/surfactant/cmd/generate.py @@ -7,12 +7,14 @@ import pathlib import queue import re -from typing import Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Tuple, Union import click from loguru import logger from surfactant import ContextEntry +from surfactant.configmanager import ConfigManager +from surfactant.fileinfo import sha256sum from surfactant.plugin.manager import find_io_plugin, get_plugin_manager from surfactant.relationships import parse_relationships from surfactant.sbomtypes import SBOM, Software @@ -143,6 +145,20 @@ def warn_if_hash_collision(soft1: Optional[Software], soft2: Optional[Software]) ) +def get_default_from_config(option: str, fallback: Optional[Any] = None) -> Any: + """Retrive a core config option for use as default argument value. + + Args: + option (str): The core config option to get. + fallback (Optional[Any]): The fallback value if the option is not found. + + Returns: + Any: The configuration value or 'NoneType' if the key doesn't exist. 
+ """ + config_manager = ConfigManager() + return config_manager.get("core", option, fallback=fallback) + + @click.command("generate") @click.argument( "config_file", @@ -176,17 +192,17 @@ def warn_if_hash_collision(soft1: Optional[Software], soft2: Optional[Software]) @click.option( "--recorded_institution", is_flag=False, - default="LLNL", + default=get_default_from_config("recorded_institution"), help="Name of user's institution", ) @click.option( "--output_format", is_flag=False, - default="surfactant.output.cytrics_writer", + default=get_default_from_config("output_format", fallback="surfactant.output.cytrics_writer"), help="SBOM output format, see --list-output-formats for list of options; default is CyTRICS", ) @click.option( - "--list-output-formats", + "--list_output_formats", is_flag=True, callback=print_output_formats, expose_value=False, @@ -200,7 +216,7 @@ def warn_if_hash_collision(soft1: Optional[Software], soft2: Optional[Software]) help="Input SBOM format, see --list-input-formats for list of options; default is CyTRICS", ) @click.option( - "--list-input-formats", + "--list_input_formats", is_flag=True, callback=print_input_formats, expose_value=False, @@ -220,7 +236,7 @@ def sbom( ): """Generate a sbom configured in CONFIG_FILE and output to SBOM_OUTPUT. - An optional INPUT_SBOM can be supplied to use as a base for subsequent operations + An optional INPUT_SBOM can be supplied to use as a base for subsequent operations. """ pm = get_plugin_manager() @@ -260,8 +276,10 @@ def sbom( if not skip_gather: # List of directory symlinks; 2-sized tuples with (source, dest) dir_symlinks: List[Tuple[str, str]] = [] - # List of file symlinks; keys are SHA256 hashes, values are source paths + # List of file install path symlinks; keys are SHA256 hashes, values are source paths file_symlinks: Dict[str, List[str]] = {} + # List of filename symlinks; keys are SHA256 hashes, values are file names + filename_symlinks: Dict[str, List[str]] = {} while not context.empty(): entry: ContextEntry = context.get() if entry.archive: @@ -330,14 +348,32 @@ def sbom( # os.path.join will insert an OS specific separator between cdir and f # need to make sure that separator is a / and not a \ on windows filepath = pathlib.Path(cdir, f).as_posix() - file_is_symlink = False # TODO: add CI tests for generating SBOMs in scenarios with symlinks... (and just generally more CI tests overall...) 
+ # Record symlink details but don't run info extractors on them if os.path.islink(filepath): + # NOTE: resolve_link function could print warning if symlink goes outside of extract path dir true_filepath = resolve_link(filepath, cdir, epath, entry.installPrefix) # Dead/infinite links will error so skip them if true_filepath is None: continue - # Otherwise add them and skip adding the entry + # Compute sha256 hash of the file; skip if the file pointed by the symlink can't be opened + try: + true_file_sha256 = sha256sum(true_filepath) + except FileNotFoundError: + logger.warning( + f"Unable to open symlink {filepath} pointing to {true_filepath}" + ) + continue + # Record the symlink name to be added as a file name + # Dead links would appear as a file, so need to check the true path to see + # if the thing pointed to is a file or a directory + if os.path.isfile(true_filepath): + if true_file_sha256 and true_file_sha256 not in filename_symlinks: + filename_symlinks[true_file_sha256] = [] + symlink_base_name = pathlib.PurePath(filepath).name + if symlink_base_name not in filename_symlinks[true_file_sha256]: + filename_symlinks[true_file_sha256].append(symlink_base_name) + # Record symlink install path if an install prefix is given if entry.installPrefix: install_filepath = real_path_to_install_path( epath, entry.installPrefix, filepath @@ -348,13 +384,18 @@ def sbom( # A dead link shows as a file so need to test if it's a # file or a directory once rebased if os.path.isfile(true_filepath): - # file_symlinks.append((install_filepath, install_dest)) - file_is_symlink = True + if true_file_sha256 and true_file_sha256 not in file_symlinks: + file_symlinks[true_file_sha256] = [] + file_symlinks[true_file_sha256].append(install_filepath) else: dir_symlinks.append((install_filepath, install_dest)) - continue - # We need get_software_entry to look at the true filepath - filepath = true_filepath + # NOTE Two cases that don't get recorded (but maybe should?) are: + # 1. If the file pointed to is outside the extract paths, it won't + # appear in the SBOM at all -- is that desirable? If it were included, + # should the true path also be included as an install path? + # 2. Does a symlink "exist" inside an archive/installer, or only after + # unpacking/installation? 
+ continue if entry.installPrefix or entry.installPrefix == "": install_path = entry.installPrefix @@ -380,16 +421,9 @@ def sbom( except Exception as e: raise RuntimeError(f"Unable to process: {filepath}") from e - if file_is_symlink and entry.installPrefix: - # Track the symlink, but don't add to list of entries - # as it'll be processed later anyways - if sw_parent.sha256 not in file_symlinks: - file_symlinks[sw_parent.sha256] = [] - file_symlinks[sw_parent.sha256].append(install_filepath) - else: - entries.append(sw_parent) - for sw in sw_children: - entries.append(sw) + entries.append(sw_parent) + for sw in sw_children: + entries.append(sw) if entries: # if a software entry already exists with a matching file hash, augment the info in the existing entry @@ -426,17 +460,23 @@ def sbom( ) # TODO a pass later on to check for and remove duplicate relationships should be added just in case - # Add file symlinks to install paths + # Add symlinks to install paths and file names for software in new_sbom.software: + if software.sha256 in filename_symlinks: + filename_symlinks_added = [] + for filename in filename_symlinks[software.sha256]: + if filename not in software.fileName: + software.fileName.append(filename) + filename_symlinks_added.append(filename) + if filename_symlinks_added: + # Store information on which file names are symlinks + software.metadata.append({"fileNameSymlinks": filename_symlinks_added}) if software.sha256 in file_symlinks: symlinks_added = [] for full_path in file_symlinks[software.sha256]: if full_path not in software.installPath: software.installPath.append(full_path) symlinks_added.append(full_path) - base_name = pathlib.PurePath(full_path).name - if base_name not in software.fileName: - software.fileName.append(base_name) if symlinks_added: # Store information on which install paths are symlinks software.metadata.append({"installPathSymlinks": symlinks_added}) diff --git a/surfactant/configmanager.py b/surfactant/configmanager.py new file mode 100644 index 00000000..3ad618ce --- /dev/null +++ b/surfactant/configmanager.py @@ -0,0 +1,142 @@ +import os +import platform +from pathlib import Path +from threading import Lock +from typing import Any, Optional, Union + +import tomlkit + + +class ConfigManager: + """A configuration manager for handling settings stored in a configuration file. The + configuration manager internally caches a copy of the loaded configuration file, so + external changes won't affect the setting value while a program is running. + + Attributes: + app_name (str): The name of the application. (Default: 'surfactant') + config_dir (Optional[Path]): The directory where the configuration file is stored. + config (tomlkit.document): The configuration document loaded by tomlkit. Preserves formatting and comments. + config_file_path (Path): The path to the configuration file. + """ + + _instances = {} + _lock = Lock() + + def __new__( + cls, app_name: str = "surfactant", config_dir: Optional[Union[str, Path]] = None + ) -> "ConfigManager": + """Manage singleton configuration manager for each unique application name. + + Args: + app_name (str): The name of the application. (Default: 'surfactant') + config_dir (Optional[Union[str, Path]]): The directory where the application configuration is stored. + + Returns: + ConfigManager: The singleton instance of the configuration manager for the given application name. 
+ """ + with cls._lock: + if app_name not in cls._instances: + instance = super(ConfigManager, cls).__new__(cls) + instance._initialized = False + cls._instances[app_name] = instance + return cls._instances[app_name] + + def __init__( + self, app_name: str = "surfactant", config_dir: Optional[Union[str, Path]] = None + ) -> None: + """Initializes the configuration manager. + + Args: + app_name (str): The name of the application. (Default: 'surfactant') + config_dir (Optional[Union[str, Path]]): The directory where the application configuration is stored. + """ + if self._initialized: + return + self._initialized = True + + self.app_name = app_name + self.config_dir = Path(config_dir) / app_name if config_dir else None + self.config = tomlkit.document() + self.config_file_path = self._get_config_file_path() + self._load_config() + + def _get_config_file_path(self) -> Path: + """Determines the path to the configuration file. + + Returns: + Path: The path to the configuration file. + """ + if self.config_dir: + config_dir = Path(self.config_dir) + else: + if platform.system() == "Windows": + config_dir = Path(os.getenv("APPDATA", os.path.expanduser("~\\AppData\\Roaming"))) + else: + config_dir = Path(os.getenv("XDG_CONFIG_HOME", os.path.expanduser("~/.config"))) + config_dir = config_dir / self.app_name + return config_dir / "config.toml" + + def _load_config(self) -> None: + """Loads the configuration from the configuration file.""" + if self.config_file_path.exists(): + with open(self.config_file_path, "r") as configfile: + self.config = tomlkit.parse(configfile.read()) + + def get(self, section: str, option: str, fallback: Optional[Any] = None) -> Any: + """Gets a configuration value. + + Args: + section (str): The section within the configuration file. + option (str): The option within the section. + fallback (Optional[Any]): The fallback value if the option is not found. + + Returns: + Any: The configuration value or the fallback value. + """ + return self.config.get(section, {}).get(option, fallback) + + def set(self, section: str, option: str, value: Any) -> None: + """Sets a configuration value. + + Args: + section (str): The section within the configuration file. + option (str): The option within the section. + value (Any): The value to set. + """ + if section not in self.config: + self.config[section] = tomlkit.table() + self.config[section][option] = value + self._save_config() + + def _save_config(self) -> None: + """Saves the configuration to the configuration file.""" + if not self.config_file_path.exists(): + self.config_file_path.parent.mkdir(parents=True, exist_ok=True) + with open(self.config_file_path, "w") as configfile: + configfile.write(tomlkit.dumps(self.config)) + + def __getitem__(self, key: str) -> Any: + """Enables dictionary-like syntax for accessing configuration settings. + NOTE: Remember to check that the value returned is not 'None' before + trying to access nested keys. + + Args: + key (str): The key for accessing a TOML value or table. + + Returns: + Any: The configuration value or 'NoneType' if the key doesn't exist. + """ + if key not in self.config: + return None + return self.config[key] + + @classmethod + def delete_instance(cls, app_name: str) -> None: + """Deletes the singleton instance for the given application name. + + Args: + app_name (str): The name of the application. 
+ """ + with cls._lock: + if app_name in cls._instances: + del cls._instances[app_name] diff --git a/surfactant/fileinfo.py b/surfactant/fileinfo.py index 17c8469b..fbb8782f 100755 --- a/surfactant/fileinfo.py +++ b/surfactant/fileinfo.py @@ -4,6 +4,7 @@ # SPDX-License-Identifier: MIT import os import stat +import sys from hashlib import md5, sha1, sha256 @@ -54,7 +55,12 @@ def calc_file_hashes(filename): """ sha256_hash = sha256() sha1_hash = sha1() - md5_hash = md5() + # hashlib.md5 usedforsecurity flag was added in Python 3.9 + if sys.version_info >= (3, 9): + # avoid error with FIPS-compliant OpenSSL library builds complaining about md5 + md5_hash = md5(usedforsecurity=False) + else: + md5_hash = md5() b = bytearray(4096) mv = memoryview(b) try: @@ -70,3 +76,25 @@ def calc_file_hashes(filename): "sha1": sha1_hash.hexdigest(), "md5": md5_hash.hexdigest(), } + + +def sha256sum(filename): + """Calculate sha256 hash for the file specified. May throw a FileNotFound exception. + + Args: + filename (str): Name of file. + + Returns: + Optional[str]: The sha256 hash of the file. + + Raises: + FileNotFoundError: If the given filename could not be found. + """ + h = sha256() + with open(filename, "rb") as f: + # Reading is buffered by default (https://docs.python.org/3/library/functions.html#open) + chunk = f.read(h.block_size) + while chunk: + h.update(chunk) + chunk = f.read(h.block_size) + return h.hexdigest() diff --git a/surfactant/filetypeid/id_magic.py b/surfactant/filetypeid/id_magic.py index 7d4b73e9..7567ede8 100755 --- a/surfactant/filetypeid/id_magic.py +++ b/surfactant/filetypeid/id_magic.py @@ -2,7 +2,9 @@ # See the top-level LICENSE file for details. # # SPDX-License-Identifier: MIT +import json import pathlib +import tarfile from enum import Enum, auto from typing import Optional @@ -19,6 +21,30 @@ class ExeType(Enum): MACHO64 = auto() +def is_docker_archive(filepath: str) -> bool: + # pylint: disable=too-many-return-statements + with tarfile.open(filepath) as tar: + try: + manifest_info = tar.getmember("manifest.json") + if not manifest_info.isfile(): + return False + with tar.extractfile(manifest_info) as manifest_file: + manifest = json.load(manifest_file) + # There's one entry in the list for each image + if not isinstance(manifest, list): + return False + for data in manifest: + # Just check if this data member exists + _ = tar.getmember(data["Config"]) + # Now check that each of the layers exist + for layer in data["Layers"]: + _ = tar.getmember(layer) + # Everything seems to exist and be in order; this is most likely a Docker archive + return True + except KeyError: + return False + + @surfactant.plugin.hookimpl(tryfirst=True) def identify_file_type(filepath: str) -> Optional[str]: # pylint: disable=too-many-return-statements @@ -76,8 +102,12 @@ def identify_file_type(filepath: str) -> Optional[str]: ".tar.gz", ".cab.gz", ]: + if is_docker_archive(filepath): + return "DOCKER_GZIP" return "GZIP" if magic_bytes[257:265] == b"ustar\x0000" or magic_bytes[257:265] == b"ustar \x00": + if is_docker_archive(filepath): + return "DOCKER_TAR" return "TAR" if magic_bytes[:4] in [b"PK\x03\x04", b"PK\x05\x06", b"PK\x07\x08"]: suffix = pathlib.Path(filepath).suffix.lower() diff --git a/surfactant/infoextractors/docker_image.py b/surfactant/infoextractors/docker_image.py new file mode 100644 index 00000000..d62e8682 --- /dev/null +++ b/surfactant/infoextractors/docker_image.py @@ -0,0 +1,64 @@ +# Copyright 2023 Lawrence Livermore National Security, LLC +# See the top-level LICENSE file for 
details. +# +# SPDX-License-Identifier: MIT +import gzip +import json +import subprocess +import tempfile + +from loguru import logger + +import surfactant.plugin +from surfactant.sbomtypes import SBOM, Software + + +def is_docker_scout_installed(): + # Check that Docker Scout can be run + try: + result = subprocess.run(["docker", "scout"], capture_output=True, check=False) + if result.returncode != 0: + logger.warning("Install Docker Scout to scan containers for additional information") + return False + return True + except FileNotFoundError: + return False + + +# Check if Docker Scout is installed when this Python module gets loaded +disable_docker_scout = not is_docker_scout_installed() + + +def supports_file(filetype: str) -> bool: + return filetype in ("DOCKER_TAR", "DOCKER_GZIP") + + +@surfactant.plugin.hookimpl +def extract_file_info(sbom: SBOM, software: Software, filename: str, filetype: str) -> object: + if disable_docker_scout or not supports_file(filetype): + return None + return extract_docker_info(filetype, filename) + + +def extract_docker_info(filetype: str, filename: str) -> object: + if filetype == "DOCKER_GZIP": + with open(filename, "rb") as gzip_in: + gzip_data = gzip_in.read() + with tempfile.NamedTemporaryFile() as gzip_out: + gzip_out.write(gzip.decompress(gzip_data)) + return run_docker_scout(gzip_out.name) + return run_docker_scout(filename) + + +# Function that extract_docker_info delegates to to actually run Docker scout +def run_docker_scout(filename: str) -> object: + result = subprocess.run( + ["docker", "scout", "sbom", "--format", "spdx", f"fs://{filename}"], + capture_output=True, + check=False, + ) + if result.returncode != 0: + logger.warning(f"Running Docker Scout on {filename} failed") + return {} + spdx_out = json.loads(result.stdout) + return {"dockerSPDX": spdx_out} diff --git a/surfactant/infoextractors/docker_tarball_file.py b/surfactant/infoextractors/docker_tarball_file.py new file mode 100644 index 00000000..a63805f2 --- /dev/null +++ b/surfactant/infoextractors/docker_tarball_file.py @@ -0,0 +1,87 @@ +# Copyright 2024 Lawrence Livermore National Security, LLC +# see: ${repository}/LICENSE +# +# SPDX-License-Identifier: MIT + +import tarfile +from pathlib import PurePosixPath +import json +from typing import IO, Any, Union + +import surfactant.plugin +from surfactant.sbomtypes import SBOM, Software + + +def get_manifest_file_from_tarball(tarball: tarfile.TarFile) -> IO[bytes] | None: + return tarball.extractfile( + {tarinfo.name: tarinfo for tarinfo in tarball.getmembers()}["manifest.json"] + ) + + +def get_config_file_from_tarball( + tarball: tarfile.TarFile, path: str +) -> Union[IO[bytes], None]: + return tarball.extractfile( + {tarinfo.name: tarinfo for tarinfo in tarball.getmembers()}[path] + ) + + +def get_config_path_from_manifest(manifest: list[dict[str, Any]]) -> list[str]: + path = "Config" + return [entry[path] for entry in manifest] + + +def get_repo_tags_from_manifest(manifest: list[dict[str, Any]]) -> list[str]: + path = "RepoTags" + return [entry[path] for entry in manifest] + + +def portable_path_list(*paths: str): + """Convert paths to a portable format acknowledged by""" + return tuple(str(PurePosixPath(path_str)) for path_str in paths) + + +def supports_file(filename: str, filetype: str) -> bool: + EXPECTED_FILETYPE = "TAR" + + expected_members = portable_path_list( + "index.json", + "manifest.json", + "oci-layout", + "repositories", + "blobs/sha256", + ) + + if filetype != EXPECTED_FILETYPE: + return False + + with 
tarfile.open(filename) as this_tarfile: + found_members = portable_path_list( + *[member.name for member in this_tarfile.getmembers()] + ) + + return all(expected_member in found_members for expected_member in expected_members) + + +@surfactant.plugin.hookimpl +def extract_file_info( + sbom: SBOM, software: Software, filename: str, filetype: str +) -> object: + if not supports_file(filename, filetype): + return None + return extract_image_info(filename) + + +def extract_image_info(filename: str): + """Return image configuration objects mapped by their paths.""" + root_key = "dockerImageConfigs" + image_info: dict[str, list[dict[str, Any]]] = {root_key: []} + with tarfile.open(filename) as tarball: + # we know the manifest file is present or we wouldn't be this far + assert (manifest_file := get_manifest_file_from_tarball(tarball)) + manifest = json.load(manifest_file) + for config_path in manifest.get_config_path_from_manifest(manifest): + assert (config_file := get_config_file_from_tarball(tarball, config_path)) + config = json.load(config_file) + image_info[root_key].append(config) + return image_info diff --git a/surfactant/infoextractors/pe_file.py b/surfactant/infoextractors/pe_file.py index 1800801f..47453c1b 100755 --- a/surfactant/infoextractors/pe_file.py +++ b/surfactant/infoextractors/pe_file.py @@ -14,6 +14,7 @@ import defusedxml.ElementTree import dnfile +from loguru import logger import surfactant.plugin from surfactant.sbomtypes import SBOM, Software @@ -91,7 +92,7 @@ def extract_pe_info(filename): file_details["peMachine"] = pe_machine_types[pe.FILE_HEADER.Machine] else: file_details["peMachine"] = pe.FILE_HEADER.Machine - print("[WARNING] Unknown machine type encountered in PE file header") + logger.warning("Unknown machine type encountered in PE file header") if pe.OPTIONAL_HEADER is not None: file_details["peOperatingSystemVersion"] = ( f"{pe.OPTIONAL_HEADER.MajorOperatingSystemVersion}.{pe.OPTIONAL_HEADER.MinorOperatingSystemVersion}" @@ -103,7 +104,7 @@ def extract_pe_info(filename): file_details["peSubsystem"] = pe_subsystem_types[pe.OPTIONAL_HEADER.Subsystem] else: file_details["peSubsystem"] = pe.OPTIONAL_HEADER.Subsystem - print("[WARNING] Unknown Windows Subsystem type encountered in PE file header") + logger.warning("Unknown Windows Subsystem type encountered in PE file header") file_details["peLinkerVersion"] = ( f"{pe.OPTIONAL_HEADER.MajorLinkerVersion}.{pe.OPTIONAL_HEADER.MinorLinkerVersion}" ) @@ -190,19 +191,20 @@ def extract_pe_info(filename): def add_core_assembly_info(asm_dict, asm_info): - asm_dict["Name"] = asm_info.Name.value if hasattr(asm_info.Name, "value") else asm_info.Name + # REFERENCE: https://github.com/malwarefrank/dnfile/blob/096de1b3/src/dnfile/stream.py#L36-L39 + # HeapItemString value will be decoded string, or None if there was a UnicodeDecodeError + asm_dict["Name"] = asm_info.Name.value if asm_info.Name.value else asm_info.raw_data.hex() asm_dict["Culture"] = ( - asm_info.Culture.value if hasattr(asm_info.Culture, "value") else asm_info.Culture + asm_info.Culture.value if asm_info.Culture.value else asm_info.Culture.raw_data.hex() ) asm_dict["Version"] = ( f"{asm_info.MajorVersion}.{asm_info.MinorVersion}.{asm_info.BuildNumber}.{asm_info.RevisionNumber}" ) + # REFERENCE: https://github.com/malwarefrank/dnfile/blob/096de1b3/src/dnfile/stream.py#L62-L66 + # HeapItemBinary value is the bytes following the compressed int (indicating the length) asm_dict["PublicKey"] = ( - asm_info.PublicKey.hex() - if hasattr(asm_info.PublicKey, 
"hex") - else ( - asm_info.PublicKey.value if hasattr(asm_info.PublicKey, "value") else asm_info.PublicKey - ) + # raw_data attribute of PublicKey includes leading byte with length of data, value attr removes it + asm_info.PublicKey.value.hex() ) @@ -233,7 +235,9 @@ def add_assembly_flags_info(asm_dict, asm_info): def get_assembly_info(asm_info): asm: Dict[str, Any] = {} add_core_assembly_info(asm, asm_info) - asm["HashAlgId"] = asm_info.HashAlgId + # REFERENCE: https://github.com/malwarefrank/dnfile/blob/fcccdaf/src/dnfile/enums.py#L851-L863 + # HashAlgID is a dnfile enum, based on possible .NET hash algs + asm["HashAlgId"] = asm_info.HashAlgId.name add_assembly_flags_info(asm, asm_info) return asm @@ -241,18 +245,25 @@ def get_assembly_info(asm_info): def get_assemblyref_info(asmref_info): asmref: Dict[str, Any] = {} add_core_assembly_info(asmref, asmref_info) - asmref["HashValue"] = ( - asmref_info.HashValue.hex() - if hasattr(asmref_info.HashValue, "hex") - else asmref_info.HashValue - ) + # REFERENCE: https://github.com/malwarefrank/dnfile/blob/096de1b3/src/dnfile/stream.py#L62-L66 + # HeapItemBinary value is the bytes following the compressed int (indicating the length) + # raw_data attribute has the compressed int indicating length included + asmref["HashValue"] = asmref_info.HashValue.value.hex() add_assembly_flags_info(asmref, asmref_info) return asmref def insert_implmap_info(im_info, imp_modules): - dllName = im_info.ImportScope.row.Name - methodName = im_info.ImportName + # REFERENCE: https://github.com/malwarefrank/dnfile/blob/096de1b3/src/dnfile/stream.py#L36-L39 + # HeapItemString value will be decoded string, or None if there was a UnicodeDecodeError + dllName = ( + im_info.ImportScope.row.Name.value + if im_info.ImportScope.row.Name.value + else im_info.ImportScope.row.Name.raw_data.hex() + ) + methodName = ( + im_info.ImportName.value if im_info.ImportName.value else im_info.ImportName.raw_data.hex() + ) if dllName: for imp_module in imp_modules: if imp_module["Name"] == dllName: @@ -276,7 +287,7 @@ def get_windows_manifest_info(filename): binary_filepath = pathlib.Path(filename) manifest_filepath = binary_filepath.with_suffix(binary_filepath.suffix + ".manifest") if manifest_filepath.exists(): - print("Found application manifest file for " + filename) + logger.info("Found application manifest file for " + filename) et = defusedxml.ElementTree.parse(manifest_filepath) manifest_info = {} @@ -288,8 +299,8 @@ def get_windows_manifest_info(filename): asm_xmlns, asm_tag = get_xmlns_and_tag(asm_e) if asm_tag == "assemblyIdentity": if "assemblyIdentity" in manifest_info: - print( - "[WARNING] duplicate assemblyIdentity element found in the manifest file: " + logger.warning( + "duplicate assemblyIdentity element found in the manifest file: " + str(manifest_filepath) ) manifest_info["assemblyIdentity"] = asm_e.attrib @@ -299,8 +310,8 @@ def get_windows_manifest_info(filename): manifest_info["file"].append(asm_e.attrib) if asm_tag == "dependency": if "dependency" in manifest_info: - print( - "[WARNING] duplicate dependency element found in the manifest file: " + logger.warning( + "duplicate dependency element found in the manifest file: " + str(manifest_filepath) ) dependency_info: Dict[str, Any] = {} @@ -321,28 +332,28 @@ def get_windows_manifest_info(filename): def get_dependentAssembly_info(da_et, config_filepath=""): daet_xmlns, daet_tag = get_xmlns_and_tag(da_et) if daet_tag != "dependentAssembly": - print("[WARNING] element tree given was not for a dependentAssembly 
element tag") + logger.warning("element tree given was not for a dependentAssembly element tag") da_info = {} for da_e in da_et: da_xmlns, da_tag = get_xmlns_and_tag(da_e) if da_tag == "assemblyIdentity": if "assemblyIdentity" in da_info: - print( - "[WARNING] duplicate assemblyIdentity element found in the app config file: " + logger.warning( + "duplicate assemblyIdentity element found in the app config file: " + str(config_filepath) ) da_info["assemblyIdentity"] = da_e.attrib if da_tag == "codeBase": if "codeBase" in da_info: - print( - "[WARNING] duplicate codeBase element found in the app config file: " + logger.warning( + "duplicate codeBase element found in the app config file: " + str(config_filepath) ) da_info["codeBase"] = da_e.attrib if da_tag == "bindingRedirect": if "bindingRedirect" in da_info: - print( - "[WARNING] duplicate bindingRedirect element found in the app config file: " + logger.warning( + "duplicate bindingRedirect element found in the app config file: " + str(config_filepath) ) da_info["bindingRedirect"] = da_e.attrib @@ -356,7 +367,7 @@ def get_dependentAssembly_info(da_et, config_filepath=""): def get_assemblyBinding_info(ab_et, config_filepath=""): xmlns, tag = get_xmlns_and_tag(ab_et) if tag != "assemblyBinding": - print("[WARNING] element tree given was not for an assemblyBinding tag") + logger.warning("element tree given was not for an assemblyBinding tag") ab_info = {} @@ -373,8 +384,8 @@ def get_assemblyBinding_info(ab_et, config_filepath=""): # privatePath: "bin;bin2\subbin;bin3" if ab_tag == "probing": if "probing" in ab_info: - print( - "[WARNING] duplicate probing element found in the app config file: " + logger.warning( + "duplicate probing element found in the app config file: " + str(config_filepath) ) ab_info["probing"] = ab_e.attrib @@ -406,8 +417,8 @@ def get_assemblyBinding_info(ab_et, config_filepath=""): # - fullName: "math,version=...,publicKeyToken=...,culture=neutral" if ab_tag == "qualifyAssembly": if "qualifyAssembly" in ab_info: - print( - "[WARNING] duplicate qualifyAssembly element found in the app config file: " + logger.warning( + "duplicate qualifyAssembly element found in the app config file: " + str(config_filepath) ) ab_info["qualifyAssembly"] = ab_e.attrib @@ -429,7 +440,7 @@ def get_windows_application_config_info(filename): binary_filepath = pathlib.Path(filename) config_filepath = binary_filepath.with_suffix(binary_filepath.suffix + ".config") if config_filepath.exists(): - print("Found application configuration file for " + filename) + logger.info("Found application configuration file for " + filename) et = defusedxml.ElementTree.parse(config_filepath) app_config_info = {} @@ -484,15 +495,15 @@ def get_windows_application_config_info(filename): xmlns, tag = get_xmlns_and_tag(win_child) if tag == "probing": if "probing" in windows_info: - print( - "[WARNING] duplicate windows/probing element was found in the app config file: " + logger.warning( + "duplicate windows/probing element was found in the app config file: " + str(config_filepath) ) if "privatePath" in win_child.attrib: windows_info["probing"] = {"privatePath": win_child.attrib["privatePath"]} else: - print( - "[WARNING] windows/probing element missing privatePath attribute in app config file: " + logger.warning( + "windows/probing element missing privatePath attribute in app config file: " + str(config_filepath) ) if tag == "assemblyBinding": @@ -521,8 +532,8 @@ def get_windows_application_config_info(filename): # attribute is either 'true' or 'false' (string) # 
Causes runtime to search directory given in DEVPATH env var for assemblies first (skips signature checks) if "developmentMode" in runtime_info: - print( - "[WARNING] duplicate developmentMode element was found in the app config file: " + logger.warning( + "duplicate developmentMode element was found in the app config file: " + str(config_filepath) ) if "developerInstallation" in rt_child.attrib: @@ -530,8 +541,8 @@ def get_windows_application_config_info(filename): "developerInstallation": rt_child.attrib["developerInstallation"] } else: - print( - "[WARNING] developmentMode element missing developerInstallation attribute in app config file: " + logger.warning( + "developmentMode element missing developerInstallation attribute in app config file: " + str(config_filepath) ) if tag == "assemblyBinding": diff --git a/surfactant/plugin/manager.py b/surfactant/plugin/manager.py index 63b0ac2e..de492b63 100644 --- a/surfactant/plugin/manager.py +++ b/surfactant/plugin/manager.py @@ -17,6 +17,8 @@ def _register_plugins(pm: pluggy.PluginManager) -> None: from surfactant.infoextractors import ( a_out_file, coff_file, + docker_image, + docker_tarball_file, elf_file, java_file, js_file, @@ -43,6 +45,8 @@ def _register_plugins(pm: pluggy.PluginManager) -> None: id_extension, a_out_file, coff_file, + docker_image, + docker_tarball_file, elf_file, java_file, js_file, diff --git a/tests/cmd/test_cli.py b/tests/cmd/test_cli.py index 70839549..0d3c50c5 100644 --- a/tests/cmd/test_cli.py +++ b/tests/cmd/test_cli.py @@ -5,11 +5,18 @@ import pathlib -from surfactant.cmd.cli import cli_find -from surfactant.sbomtypes import SBOM +import pytest + +from surfactant.cmd.cli import cli_add, cli_find +from surfactant.sbomtypes import SBOM, Relationship + + +@pytest.fixture(name="test_sbom") +def fixture_test_sbom(): + with open(pathlib.Path(__file__).parent / "../data/sample_sboms/helics_sbom.json", "r") as f: + sbom = SBOM.from_json(f.read()) + return sbom -with open(pathlib.Path(__file__).parent / "../data/sample_sboms/helics_sbom.json", "r") as f: - in_sbom = SBOM.from_json(f.read()) bad_sbom = SBOM( { @@ -41,9 +48,9 @@ ) -def test_find_by_sha256(): +def test_find_by_sha256(test_sbom): out_bom = cli_find().execute( - in_sbom, sha256="f41ca6f7c447225df3a7eef754d303d22cf877586735fb2d56d1eb15bf1daed9" + test_sbom, sha256="f41ca6f7c447225df3a7eef754d303d22cf877586735fb2d56d1eb15bf1daed9" ) assert len(out_bom.software) == 1 assert ( @@ -52,9 +59,9 @@ def test_find_by_sha256(): ) -def test_find_by_multiple_hashes(): +def test_find_by_multiple_hashes(test_sbom): out_bom = cli_find().execute( - in_sbom, + test_sbom, sha256="f41ca6f7c447225df3a7eef754d303d22cf877586735fb2d56d1eb15bf1daed9", md5="5fbf80df5004db2f0ce1f78b524024fe", ) @@ -65,17 +72,17 @@ def test_find_by_multiple_hashes(): ) -def test_find_by_mismatched_hashes(): +def test_find_by_mismatched_hashes(test_sbom): out_bom = cli_find().execute( - in_sbom, + test_sbom, sha256="f41ca6f7c447225df3a7eef754d303d22cf877586735fb2d56d1eb15bf1daed9", md5="2ff380e740d2eb09e5d67f6f2cd17636", ) assert len(out_bom.software) == 0 -def test_find_by_containerPath(): - out_bom = cli_find().execute(in_sbom, containerpath="477da45b-bb38-450e-93f7-e525aaaa6862/") +def test_find_by_containerPath(test_sbom): + out_bom = cli_find().execute(test_sbom, containerpath="477da45b-bb38-450e-93f7-e525aaaa6862/") assert len(out_bom.software) == 7 @@ -91,3 +98,59 @@ def test_find_with_bad_filter(): assert len(out_bom.software) == 0 out_bom = cli_find().execute(bad_sbom, bad_filter=1.234) # 
Unsupported Type assert len(out_bom.software) == 0 + + +def test_add_by_file(test_sbom): + previous_software_len = len(test_sbom.software) + out_bom = cli_add().execute( + test_sbom, file=pathlib.Path(__file__).parent / "../data/a_out_files/big_m68020.aout" + ) + assert len(out_bom.software) == previous_software_len + 1 + assert ( + out_bom.software[8].sha256 + == "9e125f97e5f180717096c57fa2fdf06e71cea3e48bc33392318643306b113da4" + ) + + +def test_add_entry(test_sbom): + entry = { + "UUID": "6b50c545-3e07-4aec-bbb0-bae07704143a", + "name": "Test Aout File", + "size": 4, + "fileName": ["big_m68020.aout"], + "installPath": [], + "containerPath": [], + "captureTime": 1715726918, + "sha1": "fbf8688fbe1976b6f324b0028c4b97137ae9139d", + "sha256": "9e125f97e5f180717096c57fa2fdf06e71cea3e48bc33392318643306b113da4", + "md5": "e8d3808a4e311a4262563f3cb3a31c3e", + "comments": "This is a test entry.", + } + previous_software_len = len(test_sbom.software) + out_bom = cli_add().execute(test_sbom, entry=entry) + assert len(out_bom.software) == previous_software_len + 1 + assert ( + out_bom.software[8].sha256 + == "9e125f97e5f180717096c57fa2fdf06e71cea3e48bc33392318643306b113da4" + ) + + +def test_add_relationship(test_sbom): + relationship = { + "xUUID": "455341bb-2739-4918-9805-e1a93e27e2a4", + "yUUID": "e286a415-6c6b-427d-9fe6-d7dbb0486f7d", + "relationship": "Uses", + } + previous_rel_len = len(test_sbom.relationships) + out_bom = cli_add().execute(test_sbom, relationship=relationship) + assert len(out_bom.relationships) == previous_rel_len + 1 + test_sbom.relationships.discard(Relationship(**relationship)) + + +def test_add_installpath(test_sbom): + containerPathPrefix = "477da45b-bb38-450e-93f7-e525aaaa6862/" + installPathPrefix = "/bin/" + out_bom = cli_add().execute(test_sbom, installpath=(containerPathPrefix, installPathPrefix)) + for sw in out_bom.software: + if containerPathPrefix in sw.containerPath: + assert installPathPrefix in sw.installPath diff --git a/tests/config/test_configmanager.py b/tests/config/test_configmanager.py new file mode 100644 index 00000000..d42d4780 --- /dev/null +++ b/tests/config/test_configmanager.py @@ -0,0 +1,109 @@ +import os +import platform +from pathlib import Path + +import pytest + +from surfactant.configmanager import ConfigManager + + +@pytest.fixture(name="config_manager") +def fixture_config_manager(tmp_path): + # Use the tmp_path fixture for the temporary directory + config_manager = ConfigManager(app_name="testapp", config_dir=tmp_path) + yield config_manager + # Cleanup after test + ConfigManager.delete_instance("testapp") + + +def test_singleton(config_manager): + config_manager2 = ConfigManager(app_name="testapp") + assert config_manager is config_manager2 + + +def test_set_and_get(config_manager): + config_manager.set("Settings", "theme", "dark") + theme = config_manager.get("Settings", "theme") + assert theme == "dark" + + +def test_set_and_getitem(config_manager): + config_manager.set("Settings", "theme", "dark") + theme = config_manager["Settings"]["theme"] + assert theme == "dark" + + +def test_createinstance_and_getitem(config_manager): + config_manager.set("Settings", "theme", "dark") + # ConfigManager instance accessed will be the same as the one created in the test fixture + # so that the set value above will be present for testing + settings_config = ConfigManager(app_name="testapp")["Settings"] + assert settings_config + assert "theme" in settings_config # pylint: disable=unsupported-membership-test + assert settings_config["theme"] == 
"dark" # pylint: disable=unsubscriptable-object + + +def test_get_with_fallback(config_manager): + fallback_value = "light" + theme = config_manager.get("Settings", "theme", fallback=fallback_value) + assert theme == fallback_value + + +def test_config_file_creation(config_manager): + config_manager.set("Settings", "theme", "dark") + assert config_manager.config_file_path.exists() + + +@pytest.mark.skipif(platform.system() != "Windows", reason="Test specific to Windows platform") +def test_windows_config_path(): + config_manager = ConfigManager(app_name="testapp") + config_path = config_manager._get_config_file_path() # pylint: disable=protected-access + expected_config_dir = Path(os.getenv("APPDATA", str(Path("~\\AppData\\Roaming").expanduser()))) + assert expected_config_dir in config_path.parents + # delete instance so other tests don't accidentally use it + config_manager.delete_instance("testapp") + + +@pytest.mark.skipif(platform.system() == "Windows", reason="Test specific to Unix-like platforms") +def test_unix_config_path(): + config_manager = ConfigManager(app_name="testapp") + config_path = config_manager._get_config_file_path() # pylint: disable=protected-access + expected_config_dir = Path(os.getenv("XDG_CONFIG_HOME", str(Path("~/.config").expanduser()))) + assert expected_config_dir in config_path.parents + # delete instance so other tests don't accidentally use it + config_manager.delete_instance("testapp") + + +def test_preserve_comments(config_manager): + # Create a config file with some value and add in a comment line + config_manager.set("Settings", "theme", "dark") + with open(config_manager.config_file_path, "a") as configfile: + configfile.write("\n# This is a comment\n") + # Force reload of cached config in the ConfigManager + config_manager._load_config() # pylint: disable=protected-access + assert config_manager.get("Settings", "theme") == "dark" + + # Set a new value to make ConfigManager to save an updated config file + config_manager.set("Settings", "language", "en") + with open(config_manager.config_file_path, "r") as configfile: + content = configfile.read() + assert "# This is a comment" in content + + +def test_multiple_instances(tmp_path): + # Make sure two separate config managers can more or less co-exist peacefully + config_manager1 = ConfigManager(app_name="testapp1", config_dir=tmp_path) + config_manager2 = ConfigManager(app_name="testapp2", config_dir=tmp_path) + config_manager1.set("Settings", "theme", "dark") + config_manager2.set("Settings", "theme", "light") + assert config_manager1.get("Settings", "theme") == "dark" + assert config_manager2.get("Settings", "theme") == "light" + assert config_manager1.config_file_path.exists() + assert config_manager2.config_file_path.exists() + assert config_manager1.config_file_path != config_manager2.config_file_path + ConfigManager.delete_instance("testapp1") + ConfigManager.delete_instance("testapp2") + + +if __name__ == "__main__": + pytest.main()