fix: handle temporary identifiers deduplication
Fixed an issue where records with the same temporary identifier (temp:*) were not being properly deduplicated. The fix ensures that multiple records sharing the same temporary identifier are correctly recognized as the same entity and merged during processing.
arcangelo7 committed Feb 1, 2025
1 parent 500271d commit 258a88f
Showing 12 changed files with 2,972 additions and 1,505 deletions.
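
To illustrate the intended behaviour described in the commit message, here is a minimal, self-contained sketch of deduplication keyed on temp:* identifiers. The row structure and the deduplicate_by_temp_id helper are hypothetical, for illustration only; they do not reproduce the actual logic implemented in oc_meta/run/meta_process.py.

from collections import defaultdict

def deduplicate_by_temp_id(rows):
    """Group rows that share a temp:* identifier and merge each group.

    `rows` is a list of dicts with an "id" field of space-separated
    identifiers; this structure is assumed for illustration only.
    """
    groups = defaultdict(list)
    for row in rows:
        temp_ids = [i for i in row["id"].split() if i.startswith("temp:")]
        # Rows without a temp identifier form singleton groups
        key = temp_ids[0] if temp_ids else id(row)
        groups[key].append(row)

    merged = []
    for group in groups.values():
        base = dict(group[0])
        for other in group[1:]:
            # Naive merge: union the identifiers, keep the first non-empty value elsewhere
            base["id"] = " ".join(dict.fromkeys(base["id"].split() + other["id"].split()))
            for field, value in other.items():
                if field != "id" and not base.get(field):
                    base[field] = value
        merged.append(base)
    return merged

rows = [
    {"id": "temp:123", "title": "Same Article", "author": "Doe, Jane"},
    {"id": "temp:123 doi:10.1000/xyz", "title": "Same Article", "author": ""},
]
print(deduplicate_by_temp_id(rows))  # one merged record instead of two
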
5 changes: 1 addition & 4 deletions oc_meta/core/creator.py
@@ -88,10 +88,7 @@ def __init__(
"wikidata",
"wikipedia",
}
- self.temp_schema = {"temp"} # New schema for temporary identifiers
- self.schemas = self.ra_id_schemas.union(self.br_id_schemas).union(
-     self.temp_schema
- )
+ self.schemas = self.ra_id_schemas.union(self.br_id_schemas)

self.ra_index = self.indexer_id(ra_index)
self.br_index = self.indexer_id(br_index)
8 changes: 1 addition & 7 deletions oc_meta/core/curator.py
@@ -719,7 +719,6 @@ def clean_id_list(
metaid = ""
id_list = list(filter(None, id_list))
clean_list = list()
- temp_ids = list() # List to store temporary identifiers

for elem in id_list:
if elem in clean_list:
@@ -729,11 +728,7 @@
schema = identifier[0].lower()
value = identifier[1]

if schema == "temp":
# Store temporary identifiers separately for deduplication
temp_ids.append(value)
continue
elif schema == "omid":
if schema == "omid":
metaid = value.replace(pattern, "")
else:
normalized_id = Cleaner(elem).normalize_id(
@@ -746,7 +741,6 @@
if len(how_many_meta) > 1:
clean_list = [i for i in clean_list if not i.lower().startswith("omid")]

- # Use temporary IDs for deduplication but don't include them in clean_list
return clean_list, metaid

def conflict(
462 changes: 323 additions & 139 deletions oc_meta/run/meta_process.py

Large diffs are not rendered by default.

41 changes: 24 additions & 17 deletions oc_meta/run/upload/cache_manager.py
@@ -6,31 +6,38 @@
import redis
from redis.exceptions import ConnectionError as RedisConnectionError


class CacheManager:
REDIS_DB = 4 # Default Redis database
REDIS_KEY = "processed_files" # Key for the Redis set

def __init__(self, json_cache_file: str, redis_host: str = 'localhost', redis_port: int = 6379):

def __init__(
self,
json_cache_file: str,
redis_host: str = "localhost",
redis_port: int = 6379,
redis_db: int = 4,
):
self.json_cache_file = json_cache_file
self._redis = None
self.redis_host = redis_host
self.redis_port = redis_port
self.redis_db = redis_db
self.processed_files: Set[str] = set()

# Initialize the cache
self._init_cache()

# Register handlers for graceful shutdown
self._register_shutdown_handlers()

def _init_redis(self) -> None:
"""Inizializza la connessione Redis"""
try:
self._redis = redis.Redis(
host=self.redis_host,
port=self.redis_port,
db=self.REDIS_DB,
decode_responses=True # Ensure strings are decoded
db=self.redis_db,
decode_responses=True,  # Ensure strings are decoded
)
self._redis.ping() # Verify the connection
except RedisConnectionError:
@@ -40,12 +47,12 @@ def _init_redis(self) -> None:
def _init_cache(self) -> None:
"""Inizializza il cache da file JSON e Redis"""
self._init_redis()

# Load from the JSON file
if os.path.exists(self.json_cache_file):
with open(self.json_cache_file, 'r', encoding='utf8') as f:
with open(self.json_cache_file, "r", encoding="utf8") as f:
self.processed_files.update(json.load(f))

# If Redis is available, synchronize
if self._redis:
# Load existing data from Redis
@@ -58,7 +65,7 @@ def _init_cache(self) -> None:

def _save_to_json(self) -> None:
"""Salva il cache su file JSON"""
with open(self.json_cache_file, 'w', encoding='utf8') as f:
with open(self.json_cache_file, "w", encoding="utf8") as f:
json.dump(list(self.processed_files), f)

def _register_shutdown_handlers(self) -> None:
@@ -85,7 +92,7 @@ def _cleanup(self) -> None:
def add(self, filename: str) -> None:
"""
Add a file to the cache
Args:
filename (str): Name of the file to add
"""
@@ -96,10 +103,10 @@ def __contains__(self, filename: str) -> bool:
def __contains__(self, filename: str) -> bool:
"""
Check whether a file is in the cache
Args:
filename (str): Name of the file to check
Returns:
bool: True if the file is in the cache, False otherwise
"""
@@ -108,10 +115,10 @@ def __contains__(self, filename: str) -> bool:
def get_all(self) -> Set[str]:
"""
Return all files in the cache
Returns:
Set[str]: Set of processed file names
"""
if self._redis:
self.processed_files.update(self._redis.smembers(self.REDIS_KEY))
return self.processed_files
return self.processed_files
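
For reference, a short usage sketch of the refactored CacheManager; the cache file name and connection settings below are illustrative placeholders, not values taken from this commit.

from oc_meta.run.upload.cache_manager import CacheManager

# Illustrative cache file and connection settings
cache = CacheManager(
    "ts_upload_cache.json",
    redis_host="localhost",
    redis_port=6379,
    redis_db=4,
)

if "0_100.sparql" not in cache:  # membership check via __contains__
    cache.add("0_100.sparql")    # mark the file as processed

print(f"{len(cache.get_all())} files processed so far")
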
132 changes: 84 additions & 48 deletions oc_meta/run/upload/on_triplestore.py
@@ -7,19 +7,17 @@
from oc_meta.run.upload.triplestore_connection import TriplestoreConnection
from tqdm import tqdm

CACHE_FILE = 'ts_upload_cache.json'
FAILED_QUERIES_FILE = 'failed_queries.txt'
DEFAULT_STOP_FILE = '.stop_upload'

def save_failed_query_file(filename, failed_file):
with open(failed_file, 'a', encoding='utf8') as failed_file:
with open(failed_file, "a", encoding="utf8") as failed_file:
failed_file.write(f"{filename}\n")


def execute_sparql_update(endpoint, query):
attempt = 0
max_attempts = 3
wait_time = 5 # Initial wait time in seconds

connection = TriplestoreConnection(endpoint)

while attempt < max_attempts:
@@ -31,134 +29,172 @@ def execute_sparql_update(endpoint, query):
except Exception as e:
attempt += 1
if attempt < max_attempts:
print(f"[3] Attempt {attempt} failed. Could not execute SPARQL update due to communication problems: {e}")
print(
f"[3] Attempt {attempt} failed. Could not execute SPARQL update due to communication problems: {e}"
)
print(f"Retrying in {wait_time} seconds...")
time.sleep(wait_time)
wait_time *= 2 # Double the wait time for the next attempt
else:
print(f"[3] All {max_attempts} attempts failed. Could not execute SPARQL update due to communication problems: {e}")
print(
f"[3] All {max_attempts} attempts failed. Could not execute SPARQL update due to communication problems: {e}"
)
return False


def generate_sparql_queries(quads_to_add, quads_to_remove, batch_size):
queries = []

if quads_to_add:
for i in range(0, len(quads_to_add), batch_size):
insert_query = 'INSERT DATA {\n'
batch = quads_to_add[i:i+batch_size]
insert_query = "INSERT DATA {\n"
batch = quads_to_add[i : i + batch_size]
for graph in set(q[-1] for q in batch):
insert_query += f' GRAPH {graph} {{\n'
insert_query += f" GRAPH {graph} {{\n"
for quad in batch:
if quad[-1] == graph:
insert_query += ' ' + ' '.join(quad[:-1]) + ' .\n'
insert_query += ' }\n'
insert_query += '}\n'
insert_query += " " + " ".join(quad[:-1]) + " .\n"
insert_query += " }\n"
insert_query += "}\n"
queries.append(insert_query)

if quads_to_remove:
for i in range(0, len(quads_to_remove), batch_size):
delete_query = 'DELETE DATA {\n'
batch = quads_to_remove[i:i+batch_size]
delete_query = "DELETE DATA {\n"
batch = quads_to_remove[i : i + batch_size]
for graph in set(q[-1] for q in batch):
delete_query += f' GRAPH {graph} {{\n'
delete_query += f" GRAPH {graph} {{\n"
for quad in batch:
if quad[-1] == graph:
delete_query += ' ' + ' '.join(quad[:-1]) + ' .\n'
delete_query += ' }\n'
delete_query += '}\n'
delete_query += " " + " ".join(quad[:-1]) + " .\n"
delete_query += " }\n"
delete_query += "}\n"
queries.append(delete_query)

return queries


def split_queries(file_path, batch_size):
quads_to_add, quads_to_remove = process_sparql_file(file_path)
return generate_sparql_queries(quads_to_add, quads_to_remove, batch_size)

def remove_stop_file(stop_file=DEFAULT_STOP_FILE):

def remove_stop_file(stop_file):
if os.path.exists(stop_file):
os.remove(stop_file)
print(f"Existing stop file {stop_file} has been removed.")


def upload_sparql_updates(
endpoint,
folder,
batch_size,
cache_file='ts_upload_cache.json',
failed_file='failed_queries.txt',
stop_file='.stop_upload'
endpoint,
folder,
batch_size,
cache_file="ts_upload_cache.json",
failed_file="failed_queries.txt",
stop_file=".stop_upload",
cache_manager=None,
):
"""
Upload the SPARQL updates to the triplestore.
Args:
endpoint (str): URL of the SPARQL endpoint
folder (str): Folder containing the SPARQL files to process
batch_size (int): Number of triples to include in each batch
cache_file (str, optional): File used to cache processed files. Defaults to 'ts_upload_cache.json'
failed_file (str, optional): File used to record failed queries. Defaults to 'failed_queries.txt'
stop_file (str, optional): File used to stop the process. Defaults to '.stop_upload'
cache_manager (CacheManager, optional): Instance of CacheManager to use. If None, a new one will be created.
"""
if not os.path.exists(folder):
return

cache_manager = CacheManager(cache_file)

if cache_manager is None:
cache_manager = CacheManager(cache_file)
failed_files = []

# Scan the directory and filter the files to process
all_files = [f for f in os.listdir(folder) if f.endswith('.sparql')]
all_files = [f for f in os.listdir(folder) if f.endswith(".sparql")]
files_to_process = [f for f in all_files if f not in cache_manager]
print(f"Found {len(files_to_process)} files to process out of {len(all_files)} total files")

print(
f"Found {len(files_to_process)} files to process out of {len(all_files)} total files"
)

for file in tqdm(files_to_process, desc="Processing files"):
if os.path.exists(stop_file):
print(f"\nStop file {stop_file} detected. Interrupting the process...")
break

file_path = os.path.join(folder, file)
queries = split_queries(file_path, batch_size)

if not queries:
save_failed_query_file(file, failed_file)
continue

all_queries_successful = True

for query in queries:
success = execute_sparql_update(endpoint, query)
if not success:
save_failed_query_file(file, failed_file)
all_queries_successful = False
break

if all_queries_successful:
cache_manager.add(file)

if failed_files:
print("Files with failed queries:")
for file in failed_files:
print(file)


def main():
parser = argparse.ArgumentParser(description='Execute SPARQL update queries on a triple store.')
parser.add_argument('endpoint', type=str, help='Endpoint URL of the triple store')
parser.add_argument('folder', type=str, help='Path to the folder containing SPARQL update query files')
parser.add_argument('--batch_size', type=int, default=10, help='Number of quadruples to include in a batch (default: 10)')
parser.add_argument('--cache_file', type=str, default='ts_upload_cache.json', help='Path to cache file')
parser.add_argument('--failed_file', type=str, default='failed_queries.txt', help='Path to failed queries file')
parser.add_argument('--stop_file', type=str, default='.stop_upload', help='Path to stop file')
parser = argparse.ArgumentParser(
description="Execute SPARQL update queries on a triple store."
)
parser.add_argument("endpoint", type=str, help="Endpoint URL of the triple store")
parser.add_argument(
"folder",
type=str,
help="Path to the folder containing SPARQL update query files",
)
parser.add_argument(
"--batch_size",
type=int,
default=10,
help="Number of quadruples to include in a batch (default: 10)",
)
parser.add_argument(
"--cache_file",
type=str,
default="ts_upload_cache.json",
help="Path to cache file",
)
parser.add_argument(
"--failed_file",
type=str,
default="failed_queries.txt",
help="Path to failed queries file",
)
parser.add_argument(
"--stop_file", type=str, default=".stop_upload", help="Path to stop file"
)

args = parser.parse_args()

remove_stop_file(args.stop_file)

upload_sparql_updates(
args.endpoint,
args.folder,
args.endpoint,
args.folder,
args.batch_size,
args.cache_file,
args.failed_file,
args.stop_file
args.stop_file,
)


if __name__ == "__main__":
main()
main()
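
A minimal sketch of calling the uploader programmatically with the new cache_manager parameter, so a single CacheManager instance can be shared across runs; the endpoint URL and folder path are placeholders, not taken from the commit.

from oc_meta.run.upload.cache_manager import CacheManager
from oc_meta.run.upload.on_triplestore import upload_sparql_updates

shared_cache = CacheManager("ts_upload_cache.json")

upload_sparql_updates(
    "http://localhost:8890/sparql",  # placeholder endpoint URL
    "rdf_output/to_be_uploaded",     # placeholder folder of .sparql files
    batch_size=10,
    cache_file="ts_upload_cache.json",
    failed_file="failed_queries.txt",
    stop_file=".stop_upload",
    cache_manager=shared_cache,      # reused instead of creating a new one internally
)
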