feat(log-viewer-webui): Add support for loading IR/JSON streams from S3. #673

Merged on Jan 20, 2025 (51 commits)
Changes from 28 commits

Commits
28b597f  Initial backup (haiqi96, Jan 9, 2025)
75e32ea  Adding (haiqi96, Jan 9, 2025)
a08c558  Adding binary side support (haiqi96, Jan 9, 2025)
dda24c5  Fix for binary support (haiqi96, Jan 10, 2025)
722430c  Support s3 uploading for extraction task (haiqi96, Jan 10, 2025)
3624586  C++ linter (haiqi96, Jan 10, 2025)
85a832b  python linter (haiqi96, Jan 10, 2025)
b970950  Use some subclass trick (haiqi96, Jan 13, 2025)
0d34484  Merge branch 'main' into jsonl_upload (haiqi96, Jan 13, 2025)
fdf8de3  Linter and other rearrangements (haiqi96, Jan 13, 2025)
f933099  Merge branch 'main' into jsonl_upload (haiqi96, Jan 13, 2025)
e506044  Remove unnecessary print (haiqi96, Jan 13, 2025)
5aac350  Missing changes (haiqi96, Jan 13, 2025)
5735449  Initial working version (haiqi96, Jan 14, 2025)
bb5f9cc  simple renaming (haiqi96, Jan 15, 2025)
2e47b72  Rename arguments for now (haiqi96, Jan 16, 2025)
7481d53  Add default value (haiqi96, Jan 16, 2025)
285384c  Address code review comments. (haiqi96, Jan 16, 2025)
856664f  Apply suggestions from code review (haiqi96, Jan 16, 2025)
473a613  Merge branch 'main' into jsonl_upload (haiqi96, Jan 16, 2025)
7340c71  Update to work with latest codebase (haiqi96, Jan 16, 2025)
9b09987  Merge branch 'jsonl_upload' into webui_s3_chunks (haiqi96, Jan 16, 2025)
c6a1ad2  Update config (it's so small so I don't bother to make it a separate PR) (haiqi96, Jan 16, 2025)
8a295df  Merge branch 'jsonl_upload' into webui_s3_chunks (haiqi96, Jan 16, 2025)
ff14a5b  Linter (haiqi96, Jan 17, 2025)
429b976  Merge branch 'jsonl_upload' into webui_s3_chunks (haiqi96, Jan 17, 2025)
ef7fd82  Use environmental variable to handle S3 credentials for log viewer webui (haiqi96, Jan 17, 2025)
518eec9  Attempt to fix linter (haiqi96, Jan 17, 2025)
b2c842b  Merge branch 'main' into jsonl_upload (haiqi96, Jan 17, 2025)
05bf3f6  Apply suggestions from code review (haiqi96, Jan 17, 2025)
b6c8e69  Resolve simpler comments (haiqi96, Jan 17, 2025)
db47554  Combine argument checking (haiqi96, Jan 17, 2025)
4098ee5  fix stupid mistake (haiqi96, Jan 17, 2025)
04b94ba  Add CLP prefix to env (haiqi96, Jan 17, 2025)
b1827d1  Merge branch 'jsonl_upload' into webui_s3_chunks (haiqi96, Jan 17, 2025)
abaabd8  Address review comment (haiqi96, Jan 17, 2025)
2da0c15  Apply suggestions from code review (haiqi96, Jan 17, 2025)
3ba8b89  Merge branch 'webui_s3_chunks' of https://github.com/haiqi96/clp_fork… (haiqi96, Jan 17, 2025)
cf6a6c1  Update AWS sdk version (haiqi96, Jan 17, 2025)
812de08  Use constant as requested in code review (haiqi96, Jan 17, 2025)
59751c0  Linter (haiqi96, Jan 17, 2025)
f37f40e  Merge branch 'main' into webui_s3_chunks (haiqi96, Jan 17, 2025)
8e47f61  Add error check for S3 URL generation (haiqi96, Jan 17, 2025)
a537b73  Use Null for optional values (haiqi96, Jan 18, 2025)
78625a3  Linter (haiqi96, Jan 18, 2025)
e8560d4  Apply suggestions from code review (haiqi96, Jan 19, 2025)
33a7f8b  Address comments (haiqi96, Jan 19, 2025)
dca12aa  update api versions (haiqi96, Jan 19, 2025)
49fa164  fix docstrings (haiqi96, Jan 19, 2025)
3a31483  Update components/log-viewer-webui/server/settings.json (kirkrodrigues, Jan 20, 2025)
1476f7b  Apply suggestions from code review (kirkrodrigues, Jan 20, 2025)
13 changes: 6 additions & 7 deletions components/clp-package-utils/clp_package_utils/general.py
@@ -20,7 +20,6 @@
REDIS_COMPONENT_NAME,
REDUCER_COMPONENT_NAME,
RESULTS_CACHE_COMPONENT_NAME,
StorageType,
WEBUI_COMPONENT_NAME,
WorkerConfig,
)
@@ -254,17 +253,17 @@ def generate_container_config(
container_clp_config.archive_output.get_directory(),
)

container_clp_config.stream_output.directory = pathlib.Path("/") / "mnt" / "stream-output"
container_clp_config.stream_output.set_directory(pathlib.Path("/") / "mnt" / "stream-output")
if not is_path_already_mounted(
clp_home,
CONTAINER_CLP_HOME,
clp_config.stream_output.directory,
container_clp_config.stream_output.directory,
clp_config.stream_output.get_directory(),
container_clp_config.stream_output.get_directory(),
):
docker_mounts.stream_output_dir = DockerMount(
DockerMountType.BIND,
clp_config.stream_output.directory,
container_clp_config.stream_output.directory,
clp_config.stream_output.get_directory(),
container_clp_config.stream_output.get_directory(),
)

return container_clp_config, docker_mounts
@@ -276,7 +275,7 @@ def generate_worker_config(clp_config: CLPConfig) -> WorkerConfig:
worker_config.archive_output = clp_config.archive_output.copy(deep=True)
worker_config.data_directory = clp_config.data_directory

worker_config.stream_output_dir = clp_config.stream_output.directory
worker_config.stream_output = clp_config.stream_output
worker_config.stream_collection_name = clp_config.results_cache.stream_collection_name

return worker_config
@@ -704,9 +704,10 @@ def generic_start_worker(

# Create necessary directories
clp_config.archive_output.get_directory().mkdir(parents=True, exist_ok=True)
clp_config.stream_output.directory.mkdir(parents=True, exist_ok=True)
clp_config.stream_output.get_directory().mkdir(parents=True, exist_ok=True)

clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages"
container_worker_log_path = container_logs_dir / "worker.log"
# fmt: off
container_start_cmd = [
"docker", "run",
@@ -729,6 +730,7 @@
"-e", f"CLP_CONFIG_PATH={container_clp_config.logs_directory / container_config_filename}",
"-e", f"CLP_LOGS_DIR={container_logs_dir}",
"-e", f"CLP_LOGGING_LEVEL={worker_config.logging_level}",
"-e", f"WORKER_LOG_PATH={container_worker_log_path}",
"-u", f"{os.getuid()}:{os.getgid()}",
]
# fmt: on
@@ -760,7 +762,7 @@
"--loglevel",
"WARNING",
"-f",
str(container_logs_dir / "worker.log"),
str(container_worker_log_path),
"-Q",
celery_route,
"-n",
@@ -922,10 +924,29 @@ def start_log_viewer_webui(
"MongoDbName": clp_config.results_cache.db_name,
"MongoDbStreamFilesCollectionName": clp_config.results_cache.stream_collection_name,
"ClientDir": str(container_log_viewer_webui_dir / "client"),
"StreamFilesDir": str(container_clp_config.stream_output.directory),
"StreamFilesDir": str(container_clp_config.stream_output.get_directory()),
"StreamTargetUncompressedSize": container_clp_config.stream_output.target_uncompressed_size,
"LogViewerDir": str(container_log_viewer_webui_dir / "yscope-log-viewer"),
}

stream_storage = clp_config.stream_output.storage
stream_storage_env_vars = None
if StorageType.S3 == stream_storage.type:
s3_config = stream_storage.s3_config

settings_json_updates["StreamFilesS3Region"] = s3_config.region_code
settings_json_updates["StreamFilesS3PathPrefix"] = (
f"{s3_config.bucket}/{s3_config.key_prefix}"
)
access_key_id, secret_access_key = s3_config.get_credentials()
if access_key_id is not None and secret_access_key is not None:
stream_storage_env_vars = [
"-e",
f"AWS_ACCESS_KEY_ID={access_key_id}",
"-e",
f"AWS_SECRET_ACCESS_KEY={secret_access_key}",
]

settings_json = read_and_update_settings_json(settings_json_path, settings_json_updates)
with open(settings_json_path, "w") as settings_json_file:
settings_json_file.write(json.dumps(settings_json))
@@ -947,6 +968,10 @@
"-u", f"{os.getuid()}:{os.getgid()}",
]
# fmt: on

if stream_storage_env_vars is not None:
container_cmd.extend(stream_storage_env_vars)

necessary_mounts = [
mounts.clp_home,
mounts.stream_output_dir,
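The credential handling in start_log_viewer_webui above reduces to a small pattern: read optional credentials from the stream storage config and append them to the docker run command as AWS environment variables only when both halves are present. A minimal sketch of that pattern follows; build_webui_container_cmd is a hypothetical helper for illustration, not code from this changeset, and s3_config is assumed to expose the get_credentials() method introduced in this PR.

def build_webui_container_cmd(base_cmd: list, s3_config) -> list:
    # `s3_config.get_credentials()` returns an
    # (access_key_id, secret_access_key) tuple; either element may be None.
    cmd = list(base_cmd)
    access_key_id, secret_access_key = s3_config.get_credentials()
    # Inject credentials only when both are set; otherwise the container
    # relies on whatever credentials its environment already provides.
    if access_key_id is not None and secret_access_key is not None:
        cmd.extend([
            "-e", f"AWS_ACCESS_KEY_ID={access_key_id}",
            "-e", f"AWS_SECRET_ACCESS_KEY={secret_access_key}",
        ])
    return cmd

Passing credentials through the environment also keeps them out of settings.json, which the diff shows being written to disk; only the non-secret region and path prefix land in that file.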
117 changes: 79 additions & 38 deletions components/clp-py-utils/clp_py_utils/clp_config.py
@@ -1,6 +1,6 @@
import pathlib
from enum import auto
from typing import Literal, Optional, Union
from typing import Literal, Optional, Tuple, Union

from dotenv import dotenv_values
from pydantic import BaseModel, PrivateAttr, validator
@@ -40,6 +40,7 @@
OS_RELEASE_FILE_PATH = pathlib.Path("etc") / "os-release"

CLP_DEFAULT_CREDENTIALS_FILE_PATH = pathlib.Path("etc") / "credentials.yml"
CLP_DEFAULT_DATA_DIRECTORY_PATH = pathlib.Path("var") / "data"
CLP_METADATA_TABLE_PREFIX = "clp_"


@@ -309,13 +310,29 @@ class Queue(BaseModel):
password: Optional[str]


class S3Credentials(BaseModel):
access_key_id: str
secret_access_key: str

@validator("access_key_id")
def validate_access_key_id(cls, field):
if field == "":
raise ValueError("access_key_id cannot be empty")
return field

@validator("secret_access_key")
def validate_secret_access_key(cls, field):
if field == "":
raise ValueError("secret_access_key cannot be empty")
return field


class S3Config(BaseModel):
region_code: str
bucket: str
key_prefix: str

access_key_id: Optional[str] = None
secret_access_key: Optional[str] = None
credentials: Optional[S3Credentials]

@validator("region_code")
def validate_region_code(cls, field):
@@ -337,10 +354,15 @@ def validate_key_prefix(cls, field):
raise ValueError('key_prefix must end with "/"')
return field

def get_credentials(self) -> Tuple[Optional[str], Optional[str]]:
if self.credentials is None:
return None, None
return self.credentials.access_key_id, self.credentials.secret_access_key


class FsStorage(BaseModel):
type: Literal[StorageType.FS.value] = StorageType.FS.value
directory: pathlib.Path = pathlib.Path("var") / "data" / "archives"
directory: pathlib.Path

@validator("directory")
def validate_directory(cls, field):
Expand All @@ -359,7 +381,7 @@ def dump_to_primitive_dict(self):

class S3Storage(BaseModel):
type: Literal[StorageType.S3.value] = StorageType.S3.value
staging_directory: pathlib.Path = pathlib.Path("var") / "data" / "staged_archives"
staging_directory: pathlib.Path
s3_config: S3Config

@validator("staging_directory")
@@ -377,8 +399,46 @@ def dump_to_primitive_dict(self):
return d


class ArchiveFsStorage(FsStorage):
directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "archives"


class StreamFsStorage(FsStorage):
directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "streams"


class ArchiveS3Storage(S3Storage):
staging_directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged_archives"


class StreamS3Storage(S3Storage):
staging_directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged_streams"


def _get_directory_from_storage_config(storage_config: Union[FsStorage, S3Storage]) -> pathlib.Path:
storage_type = storage_config.type
if StorageType.FS == storage_config.type:
return storage_config.directory
elif StorageType.S3 == storage_type:
return storage_config.staging_directory
else:
raise NotImplementedError(f"storage.type {storage_type} is not supported")


def _set_directory_for_storage_config(
storage_config: Union[FsStorage, S3Storage], directory
) -> None:
storage_type = storage_config.type
if StorageType.FS == storage_type:
storage_config.directory = directory
elif StorageType.S3 == storage_type:
storage_config.staging_directory = directory
else:
raise NotImplementedError(f"storage.type {storage_type} is not supported")


class ArchiveOutput(BaseModel):
storage: Union[FsStorage, S3Storage] = FsStorage()
storage: Union[ArchiveFsStorage, ArchiveS3Storage] = ArchiveFsStorage()
target_archive_size: int = 256 * 1024 * 1024 # 256 MB
target_dictionaries_size: int = 32 * 1024 * 1024 # 32 MB
target_encoded_file_size: int = 256 * 1024 * 1024 # 256 MB
@@ -409,55 +469,36 @@ def validate_target_segment_size(cls, field):
return field

def set_directory(self, directory: pathlib.Path):
storage_config = self.storage
storage_type = storage_config.type
if StorageType.FS == storage_type:
storage_config.directory = directory
elif StorageType.S3 == storage_type:
storage_config.staging_directory = directory
else:
raise NotImplementedError(f"storage.type {storage_type} is not supported")
_set_directory_for_storage_config(self.storage, directory)

def get_directory(self) -> pathlib.Path:
storage_config = self.storage
storage_type = storage_config.type
if StorageType.FS == storage_config.type:
return storage_config.directory
elif StorageType.S3 == storage_type:
return storage_config.staging_directory
else:
raise NotImplementedError(f"storage.type {storage_type} is not supported")
return _get_directory_from_storage_config(self.storage)

def dump_to_primitive_dict(self):
d = self.dict()
# Turn directory (pathlib.Path) into a primitive string
d["storage"] = self.storage.dump_to_primitive_dict()
return d


class StreamOutput(BaseModel):
directory: pathlib.Path = pathlib.Path("var") / "data" / "streams"
storage: Union[StreamFsStorage, StreamS3Storage] = StreamFsStorage()
target_uncompressed_size: int = 128 * 1024 * 1024

@validator("directory")
def validate_directory(cls, field):
if "" == field:
raise ValueError("directory cannot be empty")
return field

@validator("target_uncompressed_size")
def validate_target_uncompressed_size(cls, field):
if field <= 0:
raise ValueError("target_uncompressed_size must be greater than 0")
return field

def make_config_paths_absolute(self, clp_home: pathlib.Path):
self.directory = make_config_path_absolute(clp_home, self.directory)
def set_directory(self, directory: pathlib.Path):
_set_directory_for_storage_config(self.storage, directory)

def get_directory(self) -> pathlib.Path:
return _get_directory_from_storage_config(self.storage)

def dump_to_primitive_dict(self):
d = self.dict()
# Turn directory (pathlib.Path) into a primitive string
d["directory"] = str(d["directory"])
d["storage"] = self.storage.dump_to_primitive_dict()
return d


@@ -527,7 +568,7 @@ def make_config_paths_absolute(self, clp_home: pathlib.Path):
self.input_logs_directory = make_config_path_absolute(clp_home, self.input_logs_directory)
self.credentials_file_path = make_config_path_absolute(clp_home, self.credentials_file_path)
self.archive_output.storage.make_config_paths_absolute(clp_home)
self.stream_output.make_config_paths_absolute(clp_home)
self.stream_output.storage.make_config_paths_absolute(clp_home)
self.data_directory = make_config_path_absolute(clp_home, self.data_directory)
self.logs_directory = make_config_path_absolute(clp_home, self.logs_directory)
self._os_release_file_path = make_config_path_absolute(clp_home, self._os_release_file_path)
@@ -557,7 +598,7 @@ def validate_archive_output_config(self):

def validate_stream_output_dir(self):
try:
validate_path_could_be_dir(self.stream_output.directory)
validate_path_could_be_dir(self.stream_output.get_directory())
except ValueError as ex:
raise ValueError(f"stream_output.directory is invalid: {ex}")

@@ -643,7 +684,7 @@ class WorkerConfig(BaseModel):
data_directory: pathlib.Path = CLPConfig().data_directory

# Only needed by query workers.
stream_output_dir: pathlib.Path = StreamOutput().directory
stream_output: StreamOutput = StreamOutput()
stream_collection_name: str = ResultsCache().stream_collection_name

def dump_to_primitive_dict(self):
@@ -652,6 +693,6 @@ def dump_to_primitive_dict(self):

# Turn paths into primitive strings
d["data_directory"] = str(self.data_directory)
d["stream_output_dir"] = str(self.stream_output_dir)
d["stream_output"] = self.stream_output.dump_to_primitive_dict()

return d
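To make the refactor above concrete, here is a short usage sketch of the new StreamOutput model with both storage backends. It is an illustration rather than code from the PR: it assumes pydantic v1 semantics (matching the validator decorators above) and that StorageType.FS and StorageType.S3 serialize to the strings "fs" and "s3"; the bucket, region, and paths are placeholder values.

from clp_py_utils.clp_config import StreamOutput

# FS-backed streams: get_directory() returns the output directory itself.
fs_output = StreamOutput.parse_obj(
    {"storage": {"type": "fs", "directory": "var/data/streams"}}
)
assert str(fs_output.get_directory()) == "var/data/streams"

# S3-backed streams: get_directory() returns the local staging directory,
# where stream chunks are written before being uploaded to S3.
s3_output = StreamOutput.parse_obj(
    {
        "storage": {
            "type": "s3",
            "staging_directory": "var/data/staged_streams",
            "s3_config": {
                "region_code": "us-east-2",
                "bucket": "example-bucket",
                "key_prefix": "streams/",  # validator requires a trailing "/"
                "credentials": None,  # omit to fall back to AWS defaults
            },
        }
    }
)
assert str(s3_output.get_directory()) == "var/data/staged_streams"
assert s3_output.storage.s3_config.get_credentials() == (None, None)

The subclass-with-defaults trick (ArchiveFsStorage, StreamFsStorage, and friends) exists so archives and streams can share one FsStorage/S3Storage implementation while still defaulting to distinct directories under var/data.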
5 changes: 3 additions & 2 deletions components/clp-py-utils/clp_py_utils/s3_utils.py
@@ -161,12 +161,13 @@ def s3_put(
)

config = Config(retries=dict(total_max_attempts=total_max_attempts, mode="adaptive"))
aws_access_key_id, aws_secret_access_key = s3_config.get_credentials()

my_s3_client = boto3.client(
"s3",
region_name=s3_config.region_code,
aws_access_key_id=s3_config.access_key_id,
aws_secret_access_key=s3_config.secret_access_key,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
config=config,
)

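A note on the s3_put change above: when the config omits credentials, get_credentials() returns (None, None), and boto3 treats None key arguments as absent, falling back to its default credential chain (environment variables, shared credentials file, instance role, and so on). A trimmed-down sketch of the client construction under that assumption, using the same parameters as the diff; make_s3_client is a hypothetical helper, not part of the PR:

import boto3
from botocore.config import Config

def make_s3_client(s3_config, total_max_attempts: int = 3):
    # Adaptive retry mode, capped at `total_max_attempts` attempts in total.
    config = Config(
        retries=dict(total_max_attempts=total_max_attempts, mode="adaptive")
    )
    aws_access_key_id, aws_secret_access_key = s3_config.get_credentials()
    # Passing None for both keys lets boto3 resolve credentials itself.
    return boto3.client(
        "s3",
        region_name=s3_config.region_code,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        config=config,
    )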
17 changes: 11 additions & 6 deletions components/core/src/clp/clo/CommandLineArguments.cpp
@@ -179,12 +179,17 @@ auto CommandLineArguments::parse_ir_extraction_arguments(
// Define IR extraction options
po::options_description options_ir_extraction("IR Extraction Options");
// clang-format off
options_ir_extraction
.add_options()(
"target-size",
po::value<size_t>(&m_ir_target_size)->value_name("SIZE"),
"Target size (B) for each IR chunk before a new chunk is created"
);
options_ir_extraction.add_options()(
"target-size",
po::value<size_t>(&m_ir_target_size)
->value_name("SIZE")
->default_value(m_ir_target_size),
"Target size (B) for each IR chunk before a new chunk is created"
)(
"print-ir-stats",
po::bool_switch(&m_print_ir_stats),
"Print statistics (ndjson) about each IR file as it's extracted"
);
// clang-format on

// Define visible options
3 changes: 3 additions & 0 deletions components/core/src/clp/clo/CommandLineArguments.hpp
@@ -48,6 +48,8 @@ class CommandLineArguments : public CommandLineArgumentsBase {
[[nodiscard]] auto get_archive_path() const -> std::string_view { return m_archive_path; }

// IR extraction arguments
[[nodiscard]] auto print_ir_stats() const -> bool { return m_print_ir_stats; }

[[nodiscard]] auto get_file_split_id() const -> std::string const& { return m_file_split_id; }

[[nodiscard]] size_t get_ir_target_size() const { return m_ir_target_size; }
@@ -180,6 +182,7 @@ class CommandLineArguments : public CommandLineArgumentsBase {
std::string m_archive_path;

// Variables for IR extraction
bool m_print_ir_stats{false};
std::string m_file_split_id;
size_t m_ir_target_size{128ULL * 1024 * 1024};
std::string m_ir_output_dir;