minor updates, doc update, preparing for upload to pypi
hesspnnl committed Jan 28, 2025
1 parent f267a84 commit 3802d3b
Showing 5 changed files with 60 additions and 152 deletions.
11 changes: 3 additions & 8 deletions docs/usage.rst
@@ -3,11 +3,6 @@ NMDC Notebook Tools Usage Notes

Welcome to NMDC Notebook Tools usage notes. Here you will find helpful information on how to use the tools provided by this package.

This module provides two different forms of filtering:

1. `study` module: Filters using a comma-separated format.
2. `collection` module: Filters in a MongoDB-like fashion.

Refer to the respective module documentation for more details:
- `study` module: :mod:`study`
- `collection` module: :mod:`collection`
Filters should be written in MongoDB query syntax. The build_filter function in the DataProcessing class can help construct filters. Additionally, the CollectionSearch class provides functions that filter data without requiring you to pass in a filter yourself.
The CollectionSearch class is a foundational component that defines behaviors and properties common to all collections. Its subclasses are more user-friendly and specific to particular collections, making them the recommended entry points for this package. Every function of CollectionSearch is accessible through each subclass.
The subclasses prefill necessary information for the user, such as the collection name, allowing the user to focus on the data they are interested in rather than the specifics of the collection.
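
As a point of reference, a filter in MongoDB query syntax looks like the sketch below; the field names and values are illustrative assumptions, not taken from the NMDC schema:

# A MongoDB-style filter string: match records whose name contains "soil" (regex)
# and whose ecosystem_category is exactly "Terrestrial". Field names are illustrative.
filter_str = '{"name": {"$regex": "soil"}, "ecosystem_category": "Terrestrial"}'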
68 changes: 50 additions & 18 deletions nmdc_notebook_tools/data_processing.py
@@ -33,34 +33,66 @@ def rename_columns(self, df: pd.DataFrame, new_col_names: list) -> pd.DataFrame:
df.columns = new_col_names
return df
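
For instance, a minimal usage sketch, assuming a no-argument DataProcessing constructor (the column names are made up):

import pandas as pd
from nmdc_notebook_tools.data_processing import DataProcessing

dp = DataProcessing()
df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
# The new names must match the number of existing columns.
df = dp.rename_columns(df, ["sample_id", "count"])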

def merge_dataframes(self, df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
return pd.merge(df1, df2, on="common_column", how="inner")
def merge_dataframes(
self, column: str, df1: pd.DataFrame, df2: pd.DataFrame
) -> pd.DataFrame:
"""
Merge two dataframes.
params:
column: str
The column to merge on.
df1: pd.DataFrame
The first dataframe to merge.
df2: pd.DataFrame
The second dataframe to merge.
returns:
pd.DataFrame
"""
return pd.merge(df1, df2, on=column, how="inner")
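
A short usage sketch of the updated column-first signature, with made-up frames (dp is a DataProcessing instance as above):

left = pd.DataFrame({"id": ["a", "b"], "x": [1, 2]})
right = pd.DataFrame({"id": ["b", "c"], "y": [3, 4]})
# Inner merge on the shared "id" column; only the row with id "b" survives.
merged = dp.merge_dataframes("id", left, right)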

## Define a merging function to join results
# This function merges new results with the previous results that were used for the new API request. It uses two keys from each result to match on. `df1`
# is the data frame whose matching `key1` value is a STRING. `df2` is the other data frame whose matching `key2` has either a string OR list as a value.
# df1_explode_list and df2_explode_list are optional lists of columns in either dataframe that need to be exploded because they are lists (this is because
# drop_duplicates can't take list input in any column). Note that each if statement includes dropping duplicates after merging as the dataframes are being
# exploded which creates many duplicate rows after merging takes place.
def merge_df(
self,
df1,
df2,
key1: str,
key2: str,
df1_explode_list=None,
df2_explode_list=None,
):
if df1_explode_list is not None:
# Explode the lists in the df (necessary for drop duplicates)
for list in df1_explode_list:
df1 = df1.explode(list)
if df2_explode_list is not None:
# Explode the lists in the df (necessary for drop duplicates)
for list in df2_explode_list:
df2 = df2.explode(list)
"""
Define a merging function to join results
This function merges new results with the previous results that were used for the new API request. It uses two keys from each result to match on.
params:
df1 and df2 are the two dataframes that need to be merged.
key1 is the column name in df1 that will be used to match with `key2` in `df2`.
This function automatically identifies columns that need to be exploded because they contain list-like elements, as drop_duplicates can't handle list elements.
"""

def identify_and_explode(df):
for col in df.columns:
if any(isinstance(item, list) for item in df[col]):
df = df.explode(col)
return df

df1 = identify_and_explode(df1)
df2 = identify_and_explode(df2)

# Merge dataframes
merged_df = pd.merge(df1, df2, left_on=key1, right_on=key2)
# Drop any duplicated rows
merged_df.drop_duplicates(keep="first", inplace=True)
return merged_df
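
A small sketch of the new automatic-explode behavior; the frames and column names are illustrative, and dp is a DataProcessing instance as above:

df1 = pd.DataFrame({"study_id": ["s1", "s2"], "name": ["A", "B"]})
df2 = pd.DataFrame({"has_input": [["s1", "s2"], ["s2"]], "result": ["r1", "r2"]})
# df2["has_input"] holds lists, so merge_df explodes it before merging,
# then drop_duplicates removes any duplicate rows the explode created.
merged = dp.merge_df(df1, df2, key1="study_id", key2="has_input")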

def build_filter(self, attributes):
"""
Create a MongoDB filter using $regex for each attribute in the input dictionary. For nested attributes, use dot notation.
Parameters:
attributes (dict): Dictionary of attribute names and their corresponding values to match using regex.
Example: {"name": "example", "description": "example", "geo_loc_name": "example"}
Returns:
str: The MongoDB filter serialized as a JSON-style string (via string_mongo_list).
"""
filter_dict = {}
for attribute_name, attribute_value in attributes.items():
filter_dict[attribute_name] = {"$regex": attribute_value}
return self.string_mongo_list(filter_dict)
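
Given the docstring's example input, and assuming string_mongo_list behaves like the removed Utils version below (str(dict) with single quotes swapped for double quotes), the return value is a JSON-style string:

dp.build_filter({"name": "example", "description": "example"})
# -> '{"name": {"$regex": "example"}, "description": {"$regex": "example"}}'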
122 changes: 0 additions & 122 deletions nmdc_notebook_tools/utils.py
@@ -10,125 +10,3 @@
class Utils:
def __init__(self):
pass

def string_mongo_list(self, data: list) -> str:
"""
Convert elements in a list to use double quotes instead of single quotes.
This is required for mongo queries.
"""
return str(data).replace("'", '"')

def split_list(self, data: list, chunk_size: int) -> list:
return [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]

def get_id_list(self, data: list, id_name: str) -> list:
"""
Get a list of ids from an api call response json.
"""
return [item[id_name] for item in data]

def get_id_results(
self,
newest_results: list,
id_field: str,
query_collection: str,
match_id_field: str,
query_fields: str,
) -> list:
"""
Get the results from a query collection based on the ids from the newest results.
params:
newest_results: list
The results from the most recent query.
id_field: str
The field in the newest results that contains the ids.
query_collection: str
The collection to query.
match_id_field: str
The field in the query collection that matches the id_field.
query_fields: str
The fields to return in the query.
"""
# Extract IDs and split them into chunks
result_ids = self.get_id_list(newest_results, id_field)
chunked_list = self.split_list(result_ids)

# Function to construct the appropriate filter string
def construct_filter_string(chunk):
filter_string = self.string_mongo_list(chunk)
if "data_object_type" in match_id_field:
return f'{{{match_id_field}: {{"$in": {filter_string}}}}}'
else:
return f'{{"{match_id_field}": {{"$in": {filter_string}}}}}'

# Retrieve and collect results
next_results = []
for chunk in chunked_list:
filter_str = construct_filter_string(chunk)
data = CollectionSearch.get_collection(
query_collection, filter_str, 100, query_fields
)
next_results.extend(data["resources"])

return next_results

def get_all_pages(
self,
response: requests.models.Response,
collection_name: str,
filter: str = "",
max_page_size: int = 100,
fields: str = "",
):
"""
Get all pages of results from an API request.
params:
response: requests.models.Response
The response object from an API request.
collection_name: str
The name of the collection to get results from.
filter: str
The filter to apply to the request. Default is an empty string.
max_page_size: int
The maximum number of results to return per page. Default is 100.
fields: str
The fields to return in the response. Default is an empty string.
"""
results = response.json()
api_client = NMDCSearch()
while True:
if response.json().get("next_page_token"):
next_page_token = response.json()["next_page_token"]
else:
break
url = f"{api_client.base_url}/nmdcschema/{collection_name}?filter={filter}&page_size={max_page_size}&projection={fields}&page_token={next_page_token}"
try:
response = requests.get(url)
response.raise_for_status()
except requests.exceptions.RequestException as e:
logger.error("API request failed", exc_info=True)
raise RuntimeError("Failed to get collection from NMDC API") from e
else:
logging.debug(
f"API request response: {response.json()}\n API Status Code: {response.status_code}"
)
results = {"resources": results["resources"] + response.json()["resources"]}
return results

def build_filter(self, attributes):
"""
Create a MongoDB filter using $regex for each attribute in the input dictionary. For nested attributes, use dot notation.
Parameters:
attributes (dict): Dictionary of attribute names and their corresponding values to match using regex.
Example: {"name": "example", "description": "example", "geo_loc_name": "example"}
Returns:
dict: A MongoDB filter dictionary.
"""
filter_dict = {}
for attribute_name, attribute_value in attributes.items():
filter_dict[attribute_name] = {"$regex": attribute_value}

print(self.string_mongo_list(filter_dict))
return self.string_mongo_list(filter_dict)
7 changes: 7 additions & 0 deletions requirements-dev.txt
@@ -0,0 +1,7 @@
pre-commit
pandas
requests
matplotlib
sphinx
sphinx_rtd_theme
pytest
4 changes: 0 additions & 4 deletions requirements.txt
@@ -1,7 +1,3 @@
pre-commit
pandas
requests
matplotlib
sphinx
sphinx_rtd_theme
pytest
