replace db in memory with persistent db
hrshdhgd committed Aug 13, 2024
1 parent e07bbef commit faabc55
Showing 2 changed files with 121 additions and 120 deletions.
1 change: 1 addition & 0 deletions kg_microbe_merge/constants.py
@@ -28,3 +28,4 @@
 RAW_DATA_DIR = DATA_DIR / "raw"
 MERGED_DATA_DIR = DATA_DIR / "merged"
 MERGED_GRAPH_STATS_FILE = MERGED_DATA_DIR / "merged_graph_stats.yaml"
+TMP_DIR = DATA_DIR / "duckdb_temp"
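As context for the diff below, a minimal sketch (not part of this commit) of how the new TMP_DIR constant is wired up in duckdb_utils.py: DuckDB's temp_directory setting redirects the spill files written by out-of-core sorts and aggregations, and memory_limit caps RAM usage before spilling begins. The database path "example.db" is a placeholder.

import os

import duckdb

from kg_microbe_merge.constants import TMP_DIR

# Make sure the spill directory exists before pointing DuckDB at it.
os.makedirs(TMP_DIR, exist_ok=True)

con = duckdb.connect("example.db")  # placeholder path for a persistent database
con.execute(f"PRAGMA temp_directory='{TMP_DIR}'")  # spill files land here
con.execute("PRAGMA memory_limit='4GB'")  # work above this cap spills to TMP_DIR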
240 changes: 120 additions & 120 deletions kg_microbe_merge/utils/duckdb_utils.py
@@ -5,6 +5,8 @@
 
 import duckdb
 
+from kg_microbe_merge.constants import TMP_DIR
+
 
 def get_table_count(con, table):
     """Get the number of rows of a given duckdb table name."""
@@ -265,19 +267,6 @@ def duckdb_nodes_merge(nodes_file_list, output_file, priority_sources, batch_siz
     """
     Merge nodes files using DuckDB with batching for large datasets.
 
-    :param nodes_file_list: List of paths to nodes files.
-    :param output_file: Path to the output file.
-    :param priority_sources: List of source names to prioritize.
-    """
-    # Create a DuckDB connection
-    conn = duckdb.connect("nodes.db")
-
-    # Load the files into DuckDB
-    load_into_duckdb(conn, nodes_file_list, "combined_nodes")
-
-    priority_sources_str = ", ".join(f"''{source}''" for source in priority_sources)
-
-    """
     Construct the query to merge the nodes
     This query performs the following operations:
@@ -309,7 +298,19 @@ def duckdb_nodes_merge(nodes_file_list, output_file, priority_sources, batch_siz
     - Reducing memory usage by processing subsets of data at a time.
     - Maintaining the same aggregation logic as the original query.
     - Ensuring consistent output formatting across all batches.
+
+    :param nodes_file_list: List of paths to nodes files.
+    :param output_file: Path to the output file.
+    :param priority_sources: List of source names to prioritize.
     """
+    # Create a DuckDB connection
+    conn = duckdb.connect("nodes.db")
+
+    # Load the files into DuckDB
+    load_into_duckdb(conn, nodes_file_list, "combined_nodes")
+
+    priority_sources_str = ", ".join(f"''{source}''" for source in priority_sources)
+
     try:
         # Construct the query to get columns and their aggregation expressions
         query = f"""
@@ -381,132 +382,131 @@ def duckdb_nodes_merge(nodes_file_list, output_file, priority_sources, batch_siz
         os.remove("nodes.db")
 
 
-def duckdb_edges_merge(edges_file_list, output_file, batch_size=2000000):
+def duckdb_edges_merge(edges_file_list, output_file, batch_size=1000000):
     """
-    Merge edges files using DuckDB.
+    Merge edges files using DuckDB with a disk-based approach for improved memory efficiency.
 
     :param edges_file_list: List of paths to edges files.
     :param output_file: Path to the output file.
+    :param batch_size: Number of edges to process in each batch.
+
+    Detailed Explanation:
+
+    1. Initial Setup:
+        - The function connects to a persistent DuckDB database on disk.
+        - It loads the edge files into a table using a memory-mapped approach.
+
+    2. Temporary Table Creation:
+        - A temporary table is created with distinct subject, predicate, and object combinations.
+        - This table is stored on disk using a memory-mapped file.
+
+    3. Batched Column-wise Processing:
+        - For each non-key column (not subject, predicate, or object):
+            - The column is added to the temporary table.
+            - An UPDATE statement aggregates the distinct values for each edge in batches.
+
+    4. Result Writing:
+        - The final results are written to the output file directly from the temporary table.
+
+    5. Error Handling and Cleanup:
+        - Any DuckDB errors are caught and reported.
+        - The database connection is closed and temporary files are removed.
+
+    This approach processes data in batches and uses disk storage, which significantly reduces
+    memory usage and allows for processing of very large datasets that exceed available RAM.
     """
-    # Create a DuckDB connection
-    conn = duckdb.connect("edges.db")
-
-    # Load the files into DuckDB, excluding the 'id' column
-    load_into_duckdb(conn, edges_file_list, "combined_edges", exclude_columns=["id"])
-
-    """
-    Detailed Explanation:
-    1. Column Information Retrieval:
-        - The function retrieves column names from the combined_edges table using SQL.
-        - This information is used to dynamically construct the aggregation query.
-    2. Query Construction:
-        - Aggregation expressions are built for each column:
-            - For 'subject', 'predicate', and 'object', they are kept as is with a 'ce.' prefix.
-            - For other columns, a string_agg expression is created to concatenate distinct values.
-        - These expressions are joined into a single string for use in the SQL query.
-    3. Batch Processing:
-        - The total number of edges is determined.
-        - Edges are processed in batches to handle large datasets efficiently.
-        - For each batch:
-            - A batch-specific query is constructed using a CTE (Common Table Expression).
-            - The batch query selects distinct edges and joins them with the full dataset.
-            - Results are grouped and ordered by subject, predicate, and object.
-    4. Data Writing:
-        - For the first batch, the query is printed for debugging and results are written to the output file.
-        - For subsequent batches, results are appended to the output file.
-        - Progress is printed after each batch.
-    5. Error Handling:
-        - Any DuckDB errors are caught and reported, along with the generated query for debugging.
-    6. Resource Management:
-        - The database connection is properly closed in the 'finally' block.
-        - The temporary database file is removed after processing.
-    This approach allows for efficient processing of large edge datasets by using batch processing
-    and constructing the query dynamically based on the table structure. It handles potential memory
-    issues by processing data in manageable chunks.
-    """
+    os.makedirs(TMP_DIR, exist_ok=True)
+    db_file = "edges_persistent.db"
+    conn = duckdb.connect(db_file)
 
     try:
-        # Get column names
-        columns = conn.execute(
-            "SELECT column_name FROM information_schema.columns WHERE table_name = 'combined_edges'"
-        ).fetchall()
+        # Enable memory-mapped storage for temporary tables
+        conn.execute(f"PRAGMA temp_directory='{TMP_DIR}'")  # Store temp files in the same directory
+        conn.execute("PRAGMA memory_limit='4GB'")  # Adjust based on available system memory
 
-        # Construct aggregation expressions
-        agg_expressions = []
-        for column in columns:
-            column_name = column[0]
-            if column_name in ("subject", "predicate", "object"):
-                agg_expressions.append(f"ce.{column_name}")
-            else:
-                agg_expressions.append(
-                    f"string_agg(DISTINCT ce.{column_name}, '|' ORDER BY ce.{column_name}) AS {column_name}"
-                )
+        # Load the files into DuckDB, excluding the 'id' column
+        load_into_duckdb(conn, edges_file_list, "combined_edges", exclude_columns=["id"])
 
-        # Join expressions into a single string
-        agg_expressions_str = ", ".join(agg_expressions)
+        # Get column names
+        columns = conn.execute(
+            "SELECT column_name FROM information_schema.columns WHERE table_name = 'combined_edges'"
        ).fetchall()
+        column_names = [col[0] for col in columns]
 
-        # Construct the final query
-        query = f"""
-            SELECT {agg_expressions_str}
-            FROM combined_edges
-            GROUP BY subject, predicate, object
-            ORDER BY subject, predicate, object
-        """
+        # Create a temporary table for storing intermediate results
+        conn.execute(
+            """
+            CREATE TABLE temp_edges AS
+            SELECT DISTINCT subject, predicate, object
+            FROM combined_edges
+            """
+        )
 
-        # Get total number of unique edges
-        total_edges = conn.execute("SELECT COUNT(*) FROM combined_edges").fetchone()[0]
-
-        # Process in batches
-        for offset in range(0, total_edges, batch_size):
-            batch_query = f"""
-                WITH batch_edges AS (
-                    SELECT DISTINCT subject, predicate, object
-                    FROM combined_edges
-                    ORDER BY subject, predicate, object
-                    LIMIT {batch_size} OFFSET {offset}
-                )
-                SELECT {agg_expressions_str}
-                FROM combined_edges ce
-                INNER JOIN batch_edges be
-                    ON ce.subject = be.subject
-                    AND ce.predicate = be.predicate
-                    AND ce.object = be.object
-                GROUP BY ce.subject, ce.predicate, ce.object
-                ORDER BY ce.subject, ce.predicate, ce.object
-            """
-
-            # Print the generated SQL for debugging
-            if offset == 0:
-                print("Generated SQL query (for first batch):")
-                print(batch_query)
-                conn.execute(f"COPY ({batch_query}) TO '{output_file}' (HEADER, DELIMITER '\t')")
-            else:
-                batch_data = conn.execute(batch_query).fetch_df()
-                batch_data.to_csv(output_file, mode="a", sep="\t", header=False, index=False)
-
-            print(f"Written {min(offset + batch_size, total_edges)} / {total_edges} edges")
-
-        # Print the generated SQL for debugging
-        print("Generated SQL query:")
-        print(batch_query)
-
-        # Execute the final query and save the result
-        conn.execute(f"COPY ({batch_query}) TO '{output_file}' (HEADER, DELIMITER '\t')")
+        # Process non-key columns in batches
+        for column in column_names:
+            if column not in ("subject", "predicate", "object"):
+                conn.execute(f"ALTER TABLE temp_edges ADD COLUMN {column} STRING")
+
+                # Process in batches
+                offset = 0
+                while True:
+                    batch_update = conn.execute(
+                        f"""
+                        UPDATE temp_edges
+                        SET {column} = (
+                            SELECT string_agg(DISTINCT ce.{column}, '|' ORDER BY ce.{column})
+                            FROM (
+                                SELECT subject, predicate, object, {column}
+                                FROM combined_edges
+                                LIMIT {batch_size} OFFSET {offset}
+                            ) ce
+                            WHERE ce.subject = temp_edges.subject
+                            AND ce.predicate = temp_edges.predicate
+                            AND ce.object = temp_edges.object
+                        )
+                        WHERE temp_edges.rowid IN (
+                            SELECT rowid
+                            FROM temp_edges
+                            LIMIT {batch_size} OFFSET {offset}
+                        )
+                        """
+                    )
+
+                    if batch_update.fetchone()[0] == 0:
+                        break
+
+                    offset += batch_size
+                    print(f"Processed {offset} rows for column {column}")
+
+        # Write results to file in batches
+        with open(output_file, "w") as f:
+            # Write header
+            header = conn.execute("SELECT * FROM temp_edges LIMIT 0").fetchdf().columns.tolist()
+            f.write("\t".join(header) + "\n")
+
+            # Write data in batches
+            offset = 0
+            while True:
+                batch = conn.execute(
+                    f"""
+                    SELECT *
+                    FROM temp_edges
+                    ORDER BY subject, predicate, object
+                    LIMIT {batch_size} OFFSET {offset}
+                    """
+                ).fetchdf()
+
+                if batch.empty:
+                    break
+
+                batch.to_csv(f, sep="\t", header=False, index=False, mode="a")
+                offset += batch_size
+                print(f"Written {offset} rows to output file")
+
+        print(f"Merged file has been created as '{output_file}'")
 
     except duckdb.Error as e:
         print(f"An error occurred: {e}")
-        print("Generated query was:")
-        print(query)
     finally:
         # Close the connection
         conn.close()
-        os.remove("edges.db")
+        os.remove(db_file)
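To illustrate the pattern this commit adopts, here is a self-contained sketch of a disk-backed, batched DuckDB merge. The file names (edges_*.tsv, merged_edges.tsv, merge_demo.db) and the spill directory are hypothetical stand-ins for the pipeline's real inputs and outputs:

import os

import duckdb

DB_FILE = "merge_demo.db"  # hypothetical persistent database file
TMP_DIR = "duckdb_temp"    # hypothetical spill directory
BATCH_SIZE = 1_000_000

os.makedirs(TMP_DIR, exist_ok=True)
con = duckdb.connect(DB_FILE)  # persistent: tables live on disk, not in RAM
con.execute(f"PRAGMA temp_directory='{TMP_DIR}'")
con.execute("PRAGMA memory_limit='4GB'")

# Union all edge files into one table; read_csv_auto accepts a glob.
con.execute(
    "CREATE TABLE edges AS "
    "SELECT * FROM read_csv_auto('edges_*.tsv', delim='\t', header=True)"
)

# Stream the de-duplicated result out in fixed-size batches.
offset = 0
with open("merged_edges.tsv", "w") as f:
    while True:
        batch = con.execute(
            f"""
            SELECT DISTINCT subject, predicate, object
            FROM edges
            ORDER BY subject, predicate, object
            LIMIT {BATCH_SIZE} OFFSET {offset}
            """
        ).fetchdf()
        if batch.empty:
            break
        batch.to_csv(f, sep="\t", header=(offset == 0), index=False)
        offset += BATCH_SIZE

con.close()
os.remove(DB_FILE)  # drop the scratch database once the TSV is written

Compared with the pre-commit version, the structural change is only that duckdb.connect receives a file path and spills are routed to a dedicated directory, so working sets larger than RAM no longer exhaust memory during the merge.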

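Finally, a small, hypothetical demonstration of the string_agg(DISTINCT ... ORDER BY ...) aggregation that both merge functions build their queries around: duplicate edges collapse to one row per (subject, predicate, object), with the remaining columns pipe-joined.

import duckdb

con = duckdb.connect()  # in-memory is fine for a toy example

con.execute(
    "CREATE TABLE edges (subject TEXT, predicate TEXT, object TEXT, provided_by TEXT)"
)
con.execute(
    """
    INSERT INTO edges VALUES
        ('n1', 'rel', 'n2', 'source_b'),
        ('n1', 'rel', 'n2', 'source_a'),
        ('n1', 'rel', 'n2', 'source_a')
    """
)

rows = con.execute(
    """
    SELECT subject, predicate, object,
           string_agg(DISTINCT provided_by, '|' ORDER BY provided_by) AS provided_by
    FROM edges
    GROUP BY subject, predicate, object
    """
).fetchall()

print(rows)  # [('n1', 'rel', 'n2', 'source_a|source_b')]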