From f81479224a79b80297eb7699e110727bcbe3c08b Mon Sep 17 00:00:00 2001
From: arcangelo7
Date: Thu, 28 Nov 2024 15:25:33 +0100
Subject: [PATCH] MetaEditor: relationship_cache

---
 oc_meta/plugins/editor.py                     |  88 ++++++++------
 .../duplicated_entities_simultaneously.py     |  39 ++++---
 test/entity_merger_test.py                    | 108 +++++++++++++++++-
 3 files changed, 189 insertions(+), 46 deletions(-)

diff --git a/oc_meta/plugins/editor.py b/oc_meta/plugins/editor.py
index af80c69..d8c1655 100644
--- a/oc_meta/plugins/editor.py
+++ b/oc_meta/plugins/editor.py
@@ -82,6 +82,7 @@ def __init__(self, meta_config: str, resp_agent: str, save_queries: bool = False
 
         self.counter_handler = RedisCounterHandler(host=self.redis_host, port=self.redis_port, db=self.redis_db)
         self.entity_cache = EntityCache()
+        self.relationship_cache = {}
 
     def make_sparql_query_with_retry(self, sparql: SPARQLWrapper, query, max_retries=5, backoff_factor=0.3):
         sparql.setQuery(query)
@@ -164,7 +165,7 @@ def delete(self, res: str, property: str = None, object: str = None) -> None:
             entity_to_purge = g_set.get_entity(URIRef(res))
             entity_to_purge.mark_as_to_be_deleted()
         self.save(g_set, supplier_prefix)
-    
+
     def merge(self, g_set: GraphSet, res: URIRef, other: URIRef) -> None:
         """
         Merge two entities and their related entities using batch import with caching.
@@ -175,42 +176,65 @@ def merge(self, g_set: GraphSet, res: URIRef, other: URIRef) -> None:
             other: The entity to be merged into the main one
         """
         # First get all related entities with a single SPARQL query
-        sparql = SPARQLWrapper(endpoint=self.endpoint)
-        query = f'''
-            PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
-            PREFIX datacite: <http://purl.org/spar/datacite/>
-            PREFIX pro: <http://purl.org/spar/pro/>
-            SELECT DISTINCT ?entity WHERE {{
-                {{?entity ?p <{other}>}} UNION
-                {{<{res}> ?p ?entity}} UNION
-                {{<{other}> ?p ?entity}}
-                FILTER (?p != rdf:type)
-                FILTER (?p != datacite:usesIdentifierScheme)
-                FILTER (?p != pro:withRole)
-            }}'''
-
-        data = self.make_sparql_query_with_retry(sparql, query)
-
-        # Collect entities that need to be imported (not in cache)
-        entities_to_import = set()
+        related_entities = set()
 
-        # Check main entities against cache
-        if not self.entity_cache.is_cached(res):
-            entities_to_import.add(res)
-        if not self.entity_cache.is_cached(other):
-            entities_to_import.add(other)
-
-        # Check related entities against cache
-        for result in data["results"]["bindings"]:
-            if result['entity']['type'] == 'uri':
-                related_entity = URIRef(result["entity"]["value"])
-                if not self.entity_cache.is_cached(related_entity):
-                    entities_to_import.add(related_entity)
+        if other in self.relationship_cache:
+            related_entities.update(self.relationship_cache[other])
+        else:
+            sparql = SPARQLWrapper(endpoint=self.endpoint)
+            query = f'''
+                PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
+                PREFIX datacite: <http://purl.org/spar/datacite/>
+                PREFIX pro: <http://purl.org/spar/pro/>
+                SELECT DISTINCT ?entity WHERE {{
+                    {{?entity ?p <{other}>}} UNION
+                    {{<{other}> ?p ?entity}}
+                    FILTER (?p != rdf:type)
+                    FILTER (?p != datacite:usesIdentifierScheme)
+                    FILTER (?p != pro:withRole)
+                }}'''
+
+            data = self.make_sparql_query_with_retry(sparql, query)
+            other_related = {URIRef(result["entity"]["value"])
+                             for result in data["results"]["bindings"]
+                             if result['entity']['type'] == 'uri'}
+
+            self.relationship_cache[other] = other_related
+            related_entities.update(other_related)
+
+        if res in self.relationship_cache:
+            related_entities.update(self.relationship_cache[res])
+        else:
+            # Query only for objects of the surviving entity if not in cache
+            sparql = SPARQLWrapper(endpoint=self.endpoint)
+            query = f'''
+                PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
+                PREFIX datacite: <http://purl.org/spar/datacite/>
+                PREFIX pro: <http://purl.org/spar/pro/>
+                SELECT DISTINCT ?entity WHERE {{
+                    <{res}> ?p ?entity
+                    FILTER (?p != rdf:type)
+                    FILTER (?p != datacite:usesIdentifierScheme)
+                    FILTER (?p != pro:withRole)
+                }}'''
+
+            data = self.make_sparql_query_with_retry(sparql, query)
+            res_related = {URIRef(result["entity"]["value"])
+                           for result in data["results"]["bindings"]
+                           if result['entity']['type'] == 'uri'}
+
+            self.relationship_cache[res] = res_related
+            related_entities.update(res_related)
+
+        entities_to_import = set([res, other])
+        entities_to_import.update(related_entities)
+        entities_to_import = {e for e in entities_to_import
+                              if not self.entity_cache.is_cached(e)}
+
         # Import only non-cached entities if there are any
         if entities_to_import:
             try:
-                imported_entities = self.reader.import_entities_from_triplestore(
+                self.reader.import_entities_from_triplestore(
                     g_set=g_set,
                     ts_url=self.endpoint,
                     entities=list(entities_to_import),
diff --git a/oc_meta/run/merge/duplicated_entities_simultaneously.py b/oc_meta/run/merge/duplicated_entities_simultaneously.py
index d0d4e78..fda3b11 100644
--- a/oc_meta/run/merge/duplicated_entities_simultaneously.py
+++ b/oc_meta/run/merge/duplicated_entities_simultaneously.py
@@ -57,9 +57,9 @@ def count_csv_rows(csv_file: str) -> int:
         return sum(1 for _ in f) - 1  # Subtract 1 to exclude the header row
 
     def fetch_related_entities_batch(self, meta_editor: MetaEditor, merged_entities: List[str],
-                                    surviving_entities: List[str], batch_size: int = 10) -> Set[URIRef]:
+                                     surviving_entities: List[str], batch_size: int = 10) -> Set[URIRef]:
         """
-        Fetch all related entities in batches to avoid overwhelming the SPARQL endpoint.
+        Fetch all related entities in batches and populate the relationship cache.
 
         Args:
             meta_editor: MetaEditor instance
@@ -75,8 +75,6 @@ def fetch_related_entities_batch(self, meta_editor: MetaEditor, merged_entities:
         # Process merged entities in batches
         for i in range(0, len(merged_entities), batch_size):
             batch_merged = merged_entities[i:i + batch_size]
-
-            # Create UNION clauses for current batch
             merged_clauses = []
             for entity in batch_merged:
                 merged_clauses.extend([
@@ -106,15 +104,22 @@ def fetch_related_entities_batch(self, meta_editor: MetaEditor, merged_entities:
                 results = meta_editor.make_sparql_query_with_retry(sparql, query)
                 for result in results["results"]["bindings"]:
                     if result['entity']['type'] == 'uri':
-                        all_related_entities.add(URIRef(result['entity']['value']))
+                        related_uri = URIRef(result['entity']['value'])
+                        all_related_entities.add(related_uri)
+
+                        # Update relationship cache for each merged entity in the batch
+                        for entity in batch_merged:
+                            entity_uri = URIRef(entity)
+                            if entity_uri not in meta_editor.relationship_cache:
+                                meta_editor.relationship_cache[entity_uri] = set()
+                            meta_editor.relationship_cache[entity_uri].add(related_uri)
+
             except Exception as e:
                 print(f"Error fetching related entities for merged batch {i}-{i+batch_size}: {e}")
 
         # Process surviving entities in batches
         for i in range(0, len(surviving_entities), batch_size):
             batch_surviving = surviving_entities[i:i + batch_size]
-
-            # Create UNION clauses for current batch
             surviving_clauses = []
             for entity in batch_surviving:
                 surviving_clauses.append(
@@ -123,7 +128,7 @@ def fetch_related_entities_batch(self, meta_editor: MetaEditor, merged_entities:
             if not surviving_clauses:
                 continue
-                
+
             query = f"""
             PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
             PREFIX datacite: <http://purl.org/spar/datacite/>
             PREFIX pro: <http://purl.org/spar/pro/>
@@ -143,12 +148,20 @@ def fetch_related_entities_batch(self, meta_editor: MetaEditor, merged_entities:
                 results = meta_editor.make_sparql_query_with_retry(sparql, query)
                 for result in results["results"]["bindings"]:
                     if result['entity']['type'] == 'uri':
-                        all_related_entities.add(URIRef(result['entity']['value']))
+                        related_uri = URIRef(result['entity']['value'])
+                        all_related_entities.add(related_uri)
+
+                        # Update relationship cache for each surviving entity in the batch
+                        for entity in batch_surviving:
+                            entity_uri = URIRef(entity)
+                            if entity_uri not in meta_editor.relationship_cache:
+                                meta_editor.relationship_cache[entity_uri] = set()
+                            meta_editor.relationship_cache[entity_uri].add(related_uri)
+
             except Exception as e:
                 print(f"Error fetching related entities for surviving batch {i}-{i+batch_size}: {e}")
 
         return all_related_entities
-
     def process_file(self, csv_file: str) -> str:
         """Process a single CSV file with cross-row batch processing"""
         data = self.read_csv(csv_file)
@@ -179,7 +192,7 @@ def process_file(self, csv_file: str) -> str:
         if not rows_to_process:
             return csv_file
 
-        # Fetch all related entities in batches
+        # Fetch all related entities and populate relationship cache
         all_related_entities = self.fetch_related_entities_batch(
             meta_editor,
             batch_merged_entities,
@@ -205,7 +218,7 @@ def process_file(self, csv_file: str) -> str:
                     entities=list(entities_to_import),
                     resp_agent=meta_editor.resp_agent,
                     enable_validation=False,
-                    batch_size=self.batch_size # Usa lo stesso batch_size
+                    batch_size=self.batch_size
                 )
 
                 # Update cache with newly imported entities
@@ -216,7 +229,7 @@ def process_file(self, csv_file: str) -> str:
                 print(f"Error importing entities: {e}")
 
         modified = True
-        # Perform all merges now that entities are imported
+        # Perform all merges using cached relationship data
         for surviving_entity, merged_entities in rows_to_process:
             surviving_uri = URIRef(surviving_entity)
             for merged_entity in merged_entities:
diff --git a/test/entity_merger_test.py b/test/entity_merger_test.py
index 952f2a0..27369d2 100644
--- a/test/entity_merger_test.py
+++ b/test/entity_merger_test.py
@@ -7,11 +7,12 @@
 import redis
 
 from oc_meta.run.merge.duplicated_entities_simultaneously import EntityMerger
+from oc_meta.run.meta_editor import MetaEditor
 from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler
 from oc_ocdm.graph import GraphSet
 from oc_ocdm.prov.prov_set import ProvSet
 from oc_ocdm.storer import Storer
-from rdflib import XSD, Graph, Literal, URIRef
+from rdflib import URIRef
 from SPARQLWrapper import POST, SPARQLWrapper
 
 BASE = os.path.join('test', 'merger')
@@ -1045,6 +1046,111 @@ def test_merge_bibliographic_resources(self):
         self.assertTrue(re_found,
                         "Resource embodiment should still exist after merge")
 
+    def test_fetch_related_entities_batch(self):
+        """Test batch fetching of related entities"""
+        meta_editor = MetaEditor(META_CONFIG,
+                                 "https://orcid.org/0000-0002-8420-0696",
+                                 save_queries=False)
+
+        g_set = GraphSet("https://w3id.org/oc/meta/", supplier_prefix="060",
+                         custom_counter_handler=self.counter_handler)
+
+        # Use a smaller set of valid numbers for the test
+        valid_numbers = [11, 12, 13, 14, 15]
+        entities = {}
+
+        # Create the authors and store them in a dictionary for easy access
+        for i in valid_numbers:
+            ra = g_set.add_ra(
+                resp_agent="https://orcid.org/0000-0002-8420-0696",
+                res=URIRef(f"https://w3id.org/oc/meta/ra/060{i}")
+            )
+            ra.has_name(f"Author {i}")
+            entities[i] = ra
+
+        # Create the related entities for each author
+        for i in valid_numbers:
+            # Create the identifier
+            identifier = g_set.add_id(
+                resp_agent="https://orcid.org/0000-0002-8420-0696",
+                res=URIRef(f"https://w3id.org/oc/meta/id/060{i}")
+            )
+            identifier.create_orcid(f"0000-0001-{i:04d}-1111")
+            entities[i].has_identifier(identifier)
+
+            # Create the role
+            role = g_set.add_ar(
+                resp_agent="https://orcid.org/0000-0002-8420-0696",
+                res=URIRef(f"https://w3id.org/oc/meta/ar/060{i}")
+            )
+            role.create_author()
+            role.is_held_by(entities[i])
+
+            # Create the publication
+            pub = g_set.add_br(
+                resp_agent="https://orcid.org/0000-0002-8420-0696",
+                res=URIRef(f"https://w3id.org/oc/meta/br/060{i}")
+            )
+            pub.has_title(f"Publication {i}")
+            pub.has_contributor(role)
+
+        prov = ProvSet(g_set, "https://w3id.org/oc/meta/", wanted_label=False,
+                       custom_counter_handler=self.counter_handler)
+        prov.generate_provenance()
+
+        rdf_output = os.path.join(OUTPUT, 'rdf') + os.sep
+
+        res_storer = Storer(abstract_set=g_set, dir_split=10000, n_file_item=1000,
+                            output_format='json-ld', zip_output=False)
+        prov_storer = Storer(abstract_set=prov, dir_split=10000, n_file_item=1000,
+                             output_format='json-ld', zip_output=False)
+
+        res_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
+        prov_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
+        res_storer.upload_all(triplestore_url=SERVER, base_dir=rdf_output,
+                              batch_size=10, save_queries=False)
+
+        batch_sizes = [1, 5, 11, 25]
+        for batch_size in batch_sizes:
+            with self.subTest(batch_size=batch_size):
+                # Test with a single entity
+                merged_entities = [f"https://w3id.org/oc/meta/ra/060{valid_numbers[0]}"]
+                surviving_entities = [f"https://w3id.org/oc/meta/ra/060{valid_numbers[1]}"]
+
+                related = self.merger.fetch_related_entities_batch(
+                    meta_editor=meta_editor,
+                    merged_entities=merged_entities,
+                    surviving_entities=surviving_entities,
+                    batch_size=batch_size
+                )
+
+                expected_related = {
+                    URIRef(f"https://w3id.org/oc/meta/id/060{valid_numbers[0]}"),  # ID of the merged entity
+                    URIRef(f"https://w3id.org/oc/meta/ar/060{valid_numbers[0]}"),  # AR of the merged entity
+                    URIRef(f"https://w3id.org/oc/meta/id/060{valid_numbers[1]}")   # ID of the surviving entity
+                }
+
+                self.assertEqual(related, expected_related)
+
+                # Test with multiple entities
+                merged_entities = [f"https://w3id.org/oc/meta/ra/060{i}"
+                                   for i in valid_numbers[:3]]
+                surviving_entities = [f"https://w3id.org/oc/meta/ra/060{valid_numbers[3]}"]
+
+                related = self.merger.fetch_related_entities_batch(
+                    meta_editor=meta_editor,
+                    merged_entities=merged_entities,
+                    surviving_entities=surviving_entities,
+                    batch_size=batch_size
+                )
+
+                expected_related = set()
+                for i in valid_numbers[:3]:  # merged entities
+                    expected_related.add(URIRef(f"https://w3id.org/oc/meta/id/060{i}"))
+                    expected_related.add(URIRef(f"https://w3id.org/oc/meta/ar/060{i}"))
+                expected_related.add(URIRef(f"https://w3id.org/oc/meta/id/060{valid_numbers[3]}"))
+
+                self.assertEqual(related, expected_related)
 
 if __name__ == '__main__':
     unittest.main()
\ No newline at end of file
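
Note (illustration, not part of the patch): a minimal sketch, under assumed names, of how the new
relationship_cache is meant to be exercised. The config path, entity IRIs and the pre-computed
related sets below are placeholders; a batch pre-pass such as EntityMerger.fetch_related_entities_batch
fills the cache, so that the subsequent MetaEditor.merge call skips its per-entity SPARQL lookups and
goes straight to the batch import of the entities that are not yet in entity_cache.

    # Illustrative sketch only: "meta_config.yaml" and all IRIs are made-up placeholders.
    from oc_ocdm.graph import GraphSet
    from rdflib import URIRef

    from oc_meta.run.meta_editor import MetaEditor

    meta_editor = MetaEditor("meta_config.yaml",
                             "https://orcid.org/0000-0002-8420-0696",
                             save_queries=False)

    surviving = URIRef("https://w3id.org/oc/meta/ra/06012")
    merged = URIRef("https://w3id.org/oc/meta/ra/06011")

    # A batch pre-pass would normally populate these sets from its UNION queries.
    meta_editor.relationship_cache[merged] = {
        URIRef("https://w3id.org/oc/meta/id/06011"),  # identifier of the merged RA
        URIRef("https://w3id.org/oc/meta/ar/06011"),  # role held by the merged RA
    }
    meta_editor.relationship_cache[surviving] = {
        URIRef("https://w3id.org/oc/meta/id/06012"),  # identifier of the surviving RA
    }

    # merge() finds both IRIs in relationship_cache, so it issues no SPARQL query here
    # and only batch-imports the related entities that are not already cached.
    g_set = GraphSet("https://w3id.org/oc/meta/", supplier_prefix="060")
    meta_editor.merge(g_set, surviving, merged)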