Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gather Docker-specific image configs #247

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 90 additions & 19 deletions surfactant/infoextractors/docker_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@
import gzip
import json
import subprocess
import tarfile
import tempfile
from typing import IO, Any, Union

from loguru import logger

### ===============================
### Utility Predicates
### ===============================
import surfactant.plugin
from surfactant.sbomtypes import SBOM, Software

Expand All @@ -25,33 +30,38 @@ def is_docker_scout_installed():
return False


# Check if Docker Scout is installed when this Python module gets loaded.
# The result is stored inverted: when this flag is True, extract_file_info
# returns None for every file instead of attempting to run Docker Scout.
disable_docker_scout = not is_docker_scout_installed()
def is_oci_archive(filename: str) -> bool:
    """Return True if the given file is a tarball roughly matching the OCI layout.

    The check is intentionally shallow: the archive only has to contain a
    member named ``oci-layout``.

    Args:
        filename: Path of the tar archive to inspect.

    Returns:
        True when an ``oci-layout`` member is present, False otherwise.

    Raises:
        tarfile.TarError / OSError: propagated from ``tarfile.open`` when the
            file cannot be opened as a tar archive.
    """
    with tarfile.open(filename) as this_tarfile:
        # getnames() yields member paths as strings. getmembers() yields
        # TarInfo objects, which never compare equal to a str, so a membership
        # test against it could never succeed.
        return "oci-layout" in this_tarfile.getnames()


def supports_file(filetype: str) -> bool:
    """Report whether *filetype* is one of the Docker archive types handled here."""
    is_plain_tar = filetype == "DOCKER_TAR"
    return is_plain_tar or filetype == "DOCKER_GZIP"


@surfactant.plugin.hookimpl
def extract_file_info(sbom: SBOM, software: Software, filename: str, filetype: str) -> object:
    """Plugin hook: return Docker Scout info for a supported file, else None.

    Nothing is extracted when Docker Scout was found to be unavailable at
    import time or when *filetype* is not a supported Docker archive type.
    """
    if not disable_docker_scout and supports_file(filetype):
        return extract_docker_info(filetype, filename)
    return None

### ===============================
### Archive Utilities
### ===============================
def gunzip_tarball(filename: str) -> str:
    """Unzip a gzipped tarball to a temporary file and return that file's path.

    The temporary file is created with ``delete=False`` so that it still
    exists after this function returns. With the default ``delete=True`` the
    file is removed as soon as the ``with`` block exits, so the returned path
    would point at nothing. The caller owns the returned file and is
    responsible for removing it when done.

    Args:
        filename: Path of the gzip-compressed tarball.

    Returns:
        Path of the decompressed temporary file.

    Raises:
        OSError: if *filename* cannot be read or is not valid gzip data
            (``gzip.BadGzipFile`` is an ``OSError`` subclass).
        EOFError: if the gzip stream is truncated.
    """
    import os  # local import: only needed to clean up after a failed decompression

    with tempfile.NamedTemporaryFile(delete=False) as tar_out:
        try:
            with gzip.open(filename, "rb") as gzip_in:
                # Stream in chunks rather than holding both the compressed and
                # decompressed image in memory at once.
                while chunk := gzip_in.read(1024 * 1024):
                    tar_out.write(chunk)
        except BaseException:
            # Do not leave a partial file behind when decompression fails.
            tar_out.close()
            os.unlink(tar_out.name)
            raise
    return tar_out.name


def extract_docker_info(filetype: str, filename: str) -> object:
    """Run Docker Scout on an image archive, decompressing it first if gzipped.

    Args:
        filetype: ``"DOCKER_GZIP"`` for a gzip-compressed archive; any other
            value is passed to Docker Scout as-is.
        filename: Path of the image archive.

    Returns:
        Whatever ``run_docker_scout`` returns for the (decompressed) archive.
    """
    if filetype == "DOCKER_GZIP":
        with open(filename, "rb") as gzip_in:
            gzip_data = gzip_in.read()
        with tempfile.NamedTemporaryFile() as gzip_out:
            gzip_out.write(gzip.decompress(gzip_data))
            # Flush buffered bytes so the subprocess reading the file by name
            # sees the complete tarball on disk.
            gzip_out.flush()
            return run_docker_scout(gzip_out.name)
    return run_docker_scout(filename)


# Helper that extract_docker_info delegates to in order to actually run Docker Scout
def run_docker_scout(filename: str) -> object:
### ===============================
### Extraction Procedures
### ===============================
def extract_info_via_docker_scout(filename: str) -> object:
"""Dispatch to `docker-scout` subprocess,
returning captured SPDX output"""
result = subprocess.run(
["docker", "scout", "sbom", "--format", "spdx", f"fs://{filename}"],
capture_output=True,
Expand All @@ -61,4 +71,65 @@ def run_docker_scout(filename: str) -> object:
logger.warning(f"Running Docker Scout on {filename} failed")
return {}
spdx_out = json.loads(result.stdout)
return {"dockerSPDX": spdx_out}
return spdx_out


def extract_configs(filename: str) -> list[dict[str, Any]]:
    """Return the image configuration objects referenced by an image tarball.

    Reads ``manifest.json`` from the tar archive at *filename* and, for each
    manifest entry, loads the JSON file named by that entry's ``"Config"`` key.

    Args:
        filename: Path of the (uncompressed) image tar archive.

    Returns:
        A list with one parsed configuration object per manifest entry, in
        manifest order.

    Raises:
        KeyError: if ``manifest.json`` or a referenced config path is not in
            the archive, or a manifest entry has no ``"Config"`` key.
        ValueError: if one of those members exists but is not a regular file.
    """

    def load_json_member(tarball: tarfile.TarFile, member_name: str) -> Any:
        # extractfile() raises KeyError for a missing member and returns None
        # for members that are not regular files (e.g. directories).
        member_file = tarball.extractfile(member_name)
        # Explicit check instead of `assert (x := ...)`: asserts are stripped
        # under `python -O`, which would also drop the walrus binding.
        if member_file is None:
            raise ValueError(f"{member_name!r} in {filename} is not a regular file")
        with member_file:
            return json.load(member_file)

    with tarfile.open(filename) as tarball:
        manifest = load_json_member(tarball, "manifest.json")
        return [load_json_member(tarball, entry["Config"]) for entry in manifest]


### =================================
### Hook Implementation
### =================================


@surfactant.plugin.hookimpl
def extract_file_info(sbom: SBOM, software: Software, filename: str, filetype: str) -> object:
    """Plugin hook: collect Docker image metadata for a supported archive.

    Returns None for unsupported file types. Otherwise returns a dict that
    always holds the image configs under ``"dockerImageConfigs"`` and, when
    Docker Scout is installed, its output under ``"dockerSPDX"``.
    """
    if not supports_file(filetype):
        return None

    # Gzipped archives are decompressed first; everything below works on the
    # plain tarball path.
    if filetype == "DOCKER_GZIP":
        filename = gunzip_tarball(filename)

    metadata = {"dockerImageConfigs": extract_configs(filename)}

    # Docker Scout output is optional and only added when the tool is present.
    if is_docker_scout_installed():
        metadata["dockerSPDX"] = extract_info_via_docker_scout(filename)

    return metadata
79 changes: 79 additions & 0 deletions surfactant/infoextractors/docker_tarball_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright 2024 Lawrence Livermore National Security, LLC
# see: ${repository}/LICENSE
#
# SPDX-License-Identifier: MIT

import json
import tarfile
from pathlib import PurePosixPath
from typing import IO, Any, Union

import surfactant.plugin
from surfactant.sbomtypes import SBOM, Software


def get_manifest_file_from_tarball(tarball: tarfile.TarFile) -> IO[bytes] | None:
return tarball.extractfile(
{tarinfo.name: tarinfo for tarinfo in tarball.getmembers()}["manifest.json"]
)


def get_config_file_from_tarball(tarball: tarfile.TarFile, path: str) -> Union[IO[bytes], None]:
    """Open the archive member whose name is exactly *path* for reading.

    Raises KeyError when the archive has no member with that name.
    """
    members_by_name = {}
    for member in tarball.getmembers():
        members_by_name[member.name] = member
    config_member = members_by_name[path]
    return tarball.extractfile(config_member)


def get_config_path_from_manifest(manifest: list[dict[str, Any]]) -> list[str]:
    """Collect the ``"Config"`` value of every manifest entry, in order."""
    config_paths = []
    for entry in manifest:
        config_paths.append(entry["Config"])
    return config_paths


def get_repo_tags_from_manifest(manifest: list[dict[str, Any]]) -> list[str]:
    """Collect the ``"RepoTags"`` value of every manifest entry, in order."""
    repo_tags = []
    for entry in manifest:
        repo_tags.append(entry["RepoTags"])
    return repo_tags


def portable_path_list(*paths: str):
    """Return *paths* as a tuple of strings, each normalized via PurePosixPath."""
    normalized = []
    for raw_path in paths:
        normalized.append(str(PurePosixPath(raw_path)))
    return tuple(normalized)


def supports_file(filename: str, filetype: str) -> bool:
    """Report whether *filename* is a Docker tarball containing every expected member.

    Only files typed ``"DOCKER_TAR"`` are considered; for those, the archive
    is opened and its (normalized) member names are checked against a fixed
    list of required entries.
    """
    if filetype != "DOCKER_TAR":
        return False

    required = portable_path_list(
        "index.json",
        "manifest.json",
        "oci-layout",
        "repositories",
        "blobs/sha256",
    )

    with tarfile.open(filename) as archive:
        member_names = [member.name for member in archive.getmembers()]
    present = set(portable_path_list(*member_names))

    return all(name in present for name in required)


@surfactant.plugin.hookimpl
def extract_file_info(sbom: SBOM, software: Software, filename: str, filetype: str) -> object:
    """Plugin hook: return image info for a supported Docker tarball, else None."""
    if supports_file(filename, filetype):
        return extract_image_info(filename)
    return None


def extract_image_info(filename: str):
    """Return the image configuration objects referenced by the tarball's manifest.

    Args:
        filename: Path of the image tar archive.

    Returns:
        A dict with a single key, ``"dockerImageConfigs"``, holding the list
        of parsed configuration objects in manifest order.

    Raises:
        KeyError: if ``manifest.json`` or a referenced config path is not in
            the archive, or a manifest entry has no ``"Config"`` key.
        ValueError: if one of those members exists but is not a regular file.
    """
    root_key = "dockerImageConfigs"
    image_info: dict[str, list[dict[str, Any]]] = {root_key: []}
    with tarfile.open(filename) as tarball:
        # Explicit checks instead of `assert (x := ...)`: asserts are stripped
        # under `python -O`, which would also drop the walrus bindings.
        manifest_file = get_manifest_file_from_tarball(tarball)
        if manifest_file is None:
            raise ValueError(f"manifest.json in {filename} is not a regular file")
        manifest = json.load(manifest_file)
        # `manifest` is the parsed JSON list, so the module-level helper is
        # called directly; a list has no get_config_path_from_manifest method.
        for config_path in get_config_path_from_manifest(manifest):
            config_file = get_config_file_from_tarball(tarball, config_path)
            if config_file is None:
                raise ValueError(f"{config_path!r} in {filename} is not a regular file")
            image_info[root_key].append(json.load(config_file))
    return image_info
Loading