
Merge branch 'jsonl_upload' into webui_s3_chunks
haiqi96 committed Jan 17, 2025
2 parents 518eec9 + 04b94ba commit b1827d1
Showing 14 changed files with 83 additions and 68 deletions.
40 changes: 37 additions & 3 deletions components/clp-package-utils/clp_package_utils/scripts/compress.py
@@ -1,13 +1,13 @@
import argparse
+import configparser
import logging
import pathlib
import subprocess
import sys
import uuid
-from typing import List
+from typing import List, Tuple

from clp_py_utils.clp_config import CLPConfig, StorageEngine
-from clp_py_utils.s3_utils import parse_aws_credentials_file
from job_orchestration.scheduler.job_config import InputType

from clp_package_utils.general import (
@@ -26,6 +26,40 @@
logger = logging.getLogger(__file__)


+def _parse_aws_credentials_file(credentials_file_path: pathlib.Path, user: str) -> Tuple[str, str]:
+    """
+    Parses the `aws_access_key_id` and `aws_secret_access_key` of `user` from the given
+    credentials_file_path.
+    :param credentials_file_path:
+    :param user:
+    :return: A tuple of (aws_access_key_id, aws_secret_access_key)
+    :raises: ValueError if the file doesn't exist, or doesn't contain valid aws credentials.
+    """
+
+    if not credentials_file_path.exists():
+        raise ValueError(f"'{credentials_file_path}' doesn't exist.")
+
+    config_reader = configparser.ConfigParser()
+    config_reader.read(credentials_file_path)
+
+    if not config_reader.has_section(user):
+        raise ValueError(f"User '{user}' doesn't exist.")
+
+    user_credentials = config_reader[user]
+    if "aws_session_token" in user_credentials:
+        raise ValueError(f"Session tokens (short-term credentials) are not supported.")
+
+    aws_access_key_id = user_credentials.get("aws_access_key_id")
+    aws_secret_access_key = user_credentials.get("aws_secret_access_key")
+
+    if aws_access_key_id is None or aws_secret_access_key is None:
+        raise ValueError(
+            "The credentials file must contain both aws_access_key_id and aws_secret_access_key."
+        )
+
+    return aws_access_key_id, aws_secret_access_key


def _generate_logs_list(
container_logs_list_path: pathlib.Path,
parsed_args: argparse.Namespace,
@@ -92,7 +126,7 @@ def _generate_compress_cmd(
aws_secret_access_key = parsed_args.aws_secret_access_key
if parsed_args.aws_credentials_file:
default_credentials_user = "default"
-aws_access_key_id, aws_secret_access_key = parse_aws_credentials_file(
+aws_access_key_id, aws_secret_access_key = _parse_aws_credentials_file(
pathlib.Path(parsed_args.aws_credentials_file), default_credentials_user
)
if aws_access_key_id and aws_secret_access_key:
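For readers tracing this change: the helper above expects the standard AWS shared-credentials INI layout. A minimal usage sketch follows; the temporary file and its contents are illustrative, and the import assumes the script module is importable as packaged:

```python
# Illustrative only: exercise the moved helper against a throwaway credentials
# file laid out like ~/.aws/credentials.
import pathlib
import tempfile

from clp_package_utils.scripts.compress import _parse_aws_credentials_file

with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as f:
    f.write(
        "[default]\n"
        "aws_access_key_id = AKIAEXAMPLEKEY\n"
        "aws_secret_access_key = example-secret\n"
    )
    credentials_path = pathlib.Path(f.name)

access_key_id, secret_access_key = _parse_aws_credentials_file(credentials_path, "default")
print(access_key_id)  # -> AKIAEXAMPLEKEY
```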
@@ -730,7 +730,7 @@ def generic_start_worker(
"-e", f"CLP_CONFIG_PATH={container_clp_config.logs_directory / container_config_filename}",
"-e", f"CLP_LOGS_DIR={container_logs_dir}",
"-e", f"CLP_LOGGING_LEVEL={worker_config.logging_level}",
"-e", f"WORKER_LOG_PATH={container_worker_log_path}",
"-e", f"CLP_WORKER_LOG_PATH={container_worker_log_path}",
"-u", f"{os.getuid()}:{os.getgid()}",
]
# fmt: on
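The renamed environment variable is consumed on the Python side of the worker (see the stream-extraction diff further down). A minimal sketch of the reading end; the explicit failure when unset is an assumption for illustration:

```python
# Sketch: the worker-side read of the variable injected via `docker run -e`.
# Raising when the variable is missing is an illustrative choice, not the
# behavior shown in the diff.
import os

worker_log_path = os.getenv("CLP_WORKER_LOG_PATH")
if worker_log_path is None:
    raise RuntimeError("CLP_WORKER_LOG_PATH was not set by the container launcher.")
```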
8 changes: 4 additions & 4 deletions components/clp-py-utils/clp_py_utils/clp_config.py
@@ -408,16 +408,16 @@ class StreamFsStorage(FsStorage):


class ArchiveS3Storage(S3Storage):
staging_directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged_archives"
staging_directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged-archives"


class StreamS3Storage(S3Storage):
staging_directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged_streams"
staging_directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged-streams"


def _get_directory_from_storage_config(storage_config: Union[FsStorage, S3Storage]) -> pathlib.Path:
storage_type = storage_config.type
-if StorageType.FS == storage_config.type:
+if StorageType.FS == storage_type:
return storage_config.directory
elif StorageType.S3 == storage_type:
return storage_config.staging_directory
@@ -600,7 +600,7 @@ def validate_stream_output_dir(self):
try:
validate_path_could_be_dir(self.stream_output.get_directory())
except ValueError as ex:
raise ValueError(f"stream_output.directory is invalid: {ex}")
raise ValueError(f"stream_output.storage's directory is invalid: {ex}")

def validate_data_dir(self):
try:
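For context on `_get_directory_from_storage_config`: FS-backed storage resolves to its final output directory, while S3-backed storage resolves to a local staging directory. A self-contained sketch of the same dispatch with stand-in dataclasses; the real classes are pydantic models in clp_py_utils.clp_config:

```python
# Stand-in types mirroring the field names in the diff above.
import pathlib
from dataclasses import dataclass
from enum import Enum
from typing import Union

class StorageType(str, Enum):
    FS = "fs"
    S3 = "s3"

@dataclass
class FsStorage:
    directory: pathlib.Path
    type: StorageType = StorageType.FS

@dataclass
class S3Storage:
    staging_directory: pathlib.Path
    type: StorageType = StorageType.S3

def get_directory(storage_config: Union[FsStorage, S3Storage]) -> pathlib.Path:
    # FS storage writes directly to its directory; S3 storage writes to a
    # local staging directory before objects are uploaded.
    storage_type = storage_config.type
    if StorageType.FS == storage_type:
        return storage_config.directory
    elif StorageType.S3 == storage_type:
        return storage_config.staging_directory
    raise NotImplementedError(f"Unsupported storage type: {storage_type}")
```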
37 changes: 0 additions & 37 deletions components/clp-py-utils/clp_py_utils/s3_utils.py
@@ -1,4 +1,3 @@
-import configparser
import re
from pathlib import Path
from typing import List, Tuple
@@ -14,42 +13,6 @@
AWS_ENDPOINT = "amazonaws.com"


-def parse_aws_credentials_file(
-    credentials_file_path: Path, user: str = "default"
-) -> Tuple[str, str]:
-    """
-    Parses the `aws_access_key_id` and `aws_secret_access_key` of `user` from the given
-    credentials_file_path.
-    :param credentials_file_path:
-    :param user:
-    :return: A tuple of (aws_access_key_id, aws_secret_access_key)
-    :raises: ValueError if the file doesn't exist, or doesn't contain valid aws credentials.
-    """
-
-    if not credentials_file_path.exists():
-        raise ValueError(f"'{credentials_file_path}' doesn't exist.")
-
-    config_reader = configparser.ConfigParser()
-    config_reader.read(credentials_file_path)
-
-    if not config_reader.has_section(user):
-        raise ValueError(f"User '{user}' doesn't exist.")
-
-    user_credentials = config_reader[user]
-    if "aws_session_token" in user_credentials:
-        raise ValueError(f"Session tokens (short-term credentials) are not supported.")
-
-    aws_access_key_id = user_credentials.get("aws_access_key_id")
-    aws_secret_access_key = user_credentials.get("aws_secret_access_key")
-
-    if aws_access_key_id is None or aws_secret_access_key is None:
-        raise ValueError(
-            "The credentials file must contain both aws_access_key_id and aws_secret_access_key."
-        )
-
-    return aws_access_key_id, aws_secret_access_key


def parse_s3_url(s3_url: str) -> Tuple[str, str, str]:
"""
Parses the region_code, bucket, and key_prefix from the given S3 URL.
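The surviving `parse_s3_url` (its body is collapsed above) returns a (region_code, bucket, key_prefix) tuple. A rough sketch of such parsing, assuming virtual-hosted-style URLs only; this is not the actual implementation:

```python
# Sketch only: handles https://<bucket>.s3[.<region>].amazonaws.com/<key_prefix>
# URLs and nothing else (no path-style URLs, no custom endpoints).
import re
from typing import Tuple

_S3_URL_REGEX = re.compile(
    r"^https://(?P<bucket>[a-z0-9.\-]+)\.s3(\.(?P<region>[a-z0-9\-]+))?"
    r"\.amazonaws\.com/(?P<key_prefix>.*)$"
)

def parse_s3_url_sketch(s3_url: str) -> Tuple[str, str, str]:
    match = _S3_URL_REGEX.match(s3_url)
    if match is None:
        raise ValueError(f"Unsupported S3 URL: {s3_url}")
    return match.group("region") or "", match.group("bucket"), match.group("key_prefix")
```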
2 changes: 1 addition & 1 deletion components/core/src/clp/clo/CommandLineArguments.cpp
@@ -188,7 +188,7 @@ auto CommandLineArguments::parse_ir_extraction_arguments(
)(
"print-ir-stats",
po::bool_switch(&m_print_ir_stats),
"Print statistics (ndjson) about each IR file as it's extracted"
"Print statistics (ndjson) about each IR file after it's extracted"
);
// clang-format on

8 changes: 7 additions & 1 deletion components/core/src/clp_s/ArchiveWriter.cpp
@@ -32,7 +32,13 @@ void ArchiveWriter::open(ArchiveWriterOption const& option) {

m_archive_path = archive_path.string();
if (false == std::filesystem::create_directory(m_archive_path, ec)) {
-throw OperationFailed(ErrorCodeErrno, __FILENAME__, __LINE__);
+SPDLOG_ERROR(
+    "Failed to create archive directory \"{}\" - ({}) {}",
+    m_archive_path,
+    ec.message(),
+    ec.value()
+);
+throw OperationFailed(ErrorCodeFailure, __FILENAME__, __LINE__);
}

std::string var_dict_path = m_archive_path + constants::cArchiveVarDictFile;
30 changes: 16 additions & 14 deletions components/core/src/clp_s/CommandLineArguments.cpp
Original file line number Diff line number Diff line change
@@ -490,16 +490,23 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
throw std::invalid_argument("No output directory specified");
}

-if (0 != m_target_ordered_chunk_size && false == m_ordered_decompression) {
-    throw std::invalid_argument(
-            "target-ordered-chunk-size must be used with ordered argument"
-    );
-}
+if (false == m_ordered_decompression) {
+    if (0 != m_target_ordered_chunk_size) {
+        throw std::invalid_argument(
+                "target-ordered-chunk-size must be used with ordered argument"
+        );
+    }

-if (m_print_ordered_chunk_stats && false == m_ordered_decompression) {
-    throw std::invalid_argument(
-            "print-ordered-chunk-stats must be used with ordered argument"
-    );
+    if (m_print_ordered_chunk_stats) {
+        throw std::invalid_argument(
+                "print-ordered-chunk-stats must be used with ordered argument"
+        );
+    }
+
+    if (false == m_mongodb_uri.empty()) {
+        throw std::invalid_argument("Recording decompression metadata only supported"
+                                    " for ordered decompression");
+    }
}

// We use xor to check that these arguments are either both specified or both
@@ -510,11 +517,6 @@
);
}

if (false == m_mongodb_uri.empty() && false == m_ordered_decompression) {
throw std::invalid_argument(
"Recording decompression metadata only supported for ordered decompression"
);
}
} else if ((char)Command::Search == command_input) {
std::string archives_dir;
std::string query;
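On the xor check referenced above (its tail is cut off by the collapsed context): comparing two booleans with inequality is a compact both-or-neither test. A Python rendering of the idea, with hypothetical option names:

```python
# Both-or-neither validation; `!=` on booleans acts as logical xor.
# The option names below are hypothetical stand-ins, not the actual options.
from typing import Optional

def validate_paired_options(option_a: Optional[str], option_b: Optional[str]) -> None:
    if (option_a is None) != (option_b is None):
        raise ValueError("These options must be specified together or not at all.")
```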
3 changes: 1 addition & 2 deletions components/core/src/clp_s/Utils.cpp
@@ -85,8 +85,7 @@ bool is_multi_file_archive(std::string_view const path) {
|| constants::cArchiveVarDictFile == formatted_name
|| constants::cArchiveLogDictFile == formatted_name
|| constants::cArchiveArrayDictFile == formatted_name
-|| constants::cArchiveTableMetadataFile == formatted_name
-|| constants::cArchiveTablesFile == formatted_name)
+|| constants::cArchiveTableMetadataFile == formatted_name)
{
continue;
} else {
2 changes: 1 addition & 1 deletion components/core/src/clp_s/archive_constants.hpp
@@ -16,7 +16,7 @@ constexpr char cArchiveSchemaTreeFile[] = "/schema_tree";

// Encoded record table files
constexpr char cArchiveTableMetadataFile[] = "/table_metadata";
constexpr char cArchiveTablesFile[] = "/tables";
constexpr char cArchiveTablesFile[] = "/0";

// Dictionary files
constexpr char cArchiveArrayDictFile[] = "/array.dict";
7 changes: 6 additions & 1 deletion components/core/src/clp_s/clp-s.cpp
@@ -298,7 +298,12 @@ int main(int argc, char const* argv[]) {
}

if (CommandLineArguments::Command::Compress == command_line_arguments.get_command()) {
-if (false == compress(command_line_arguments)) {
+try {
+    if (false == compress(command_line_arguments)) {
+        return 1;
+    }
+} catch (std::exception const& e) {
+    SPDLOG_ERROR("Encountered error during compression - {}", e.what());
return 1;
}
} else if (CommandLineArguments::Command::Extract == command_line_arguments.get_command()) {
@@ -4,6 +4,7 @@ FROM ubuntu:focal AS BASE
# TODO: Investigate why libssl-dev is a hidden dependency of clp-s
RUN apt-get update \
&& apt-get install -y \
+libcurl4 \
libmariadb-dev \
libssl-dev

@@ -152,7 +152,7 @@ def extract_stream(
start_time=start_time,
)

-task_results, task_stdout_as_str = run_query_task(
+task_results, task_stdout_str = run_query_task(
sql_adapter=sql_adapter,
logger=logger,
clp_logs_dir=clp_logs_dir,
@@ -167,7 +167,7 @@
logger.info(f"Uploading streams to S3...")

upload_error = False
-for line in task_stdout_as_str.splitlines():
+for line in task_stdout_str.splitlines():
try:
stream_stats = json.loads(line)
except json.decoder.JSONDecodeError:
@@ -183,6 +183,9 @@

stream_path = Path(stream_path_str)

+# If we've had a single upload error, we don't want to try uploading any other streams
+# since that may unnecessarily slow down the task and generate a lot of extraneous
+# output.
if not upload_error:
stream_name = stream_path.name
logger.info(f"Uploading stream {stream_name} to S3...")
@@ -198,7 +201,7 @@

if upload_error:
task_results.status = QueryTaskStatus.FAILED
task_results.error_log_path = str(os.getenv("WORKER_LOG_PATH"))
task_results.error_log_path = str(os.getenv("CLP_WORKER_LOG_PATH"))
else:
logger.info(f"Finished uploading streams.")

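The new comment above documents a deliberate choice: after one stream fails to upload, the loop keeps consuming stats lines but skips further uploads. A condensed sketch of the pattern, with `upload_stream` as a stand-in for the real S3 upload call:

```python
# Keep iterating so every stats line is still consumed/validated, but stop
# attempting uploads after the first failure to avoid wasted work and noisy
# output.
from pathlib import Path
from typing import Callable, Iterable

def upload_streams(stream_paths: Iterable[Path], upload_stream: Callable[[Path], None]) -> bool:
    upload_error = False
    for stream_path in stream_paths:
        if upload_error:
            continue
        try:
            upload_stream(stream_path)
        except Exception:
            upload_error = True
    return not upload_error
```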
@@ -11,6 +11,7 @@ DEBIAN_FRONTEND=noninteractive apt-get install --no-install-recommends -y \
ca-certificates \
checkinstall \
curl \
+libcurl4 \
libmariadb-dev \
libssl-dev \
python3 \
@@ -11,6 +11,7 @@ DEBIAN_FRONTEND=noninteractive apt-get install --no-install-recommends -y \
ca-certificates \
checkinstall \
curl \
+libcurl4 \
libmariadb-dev \
libssl-dev \
python3 \
