Skip to content

Commit

Permalink
fix: improve VVI query performance by using direct SPARQL
Browse files Browse the repository at this point in the history
The previous implementation used the local RDF graph to find venue/volume/issue
relationships. This was inefficient and, for new articles, incomplete: their
relationships could not be discovered by traversing upwards from the article to the venue.

This fix:
- Replaces local graph traversal with a direct SPARQL query
- Uses part_of* path to find all related volumes and issues
- Properly organizes issues under their parent volumes
- Maintains the same output structure and functionality

Performance improves most noticeably when new articles are added to venues
that already exist in the triplestore.
  • Loading branch information
arcangelo7 committed Jan 22, 2025
1 parent 5dc2de8 commit dd6b728
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 92 deletions.
16 changes: 5 additions & 11 deletions oc_meta/core/curator.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,18 +332,12 @@ def clean_vvi(self, row: Dict[str, str]) -> None:
if volume in self.vvi[metaval]['volume']:
vol_meta = self.vvi[metaval]['volume'][volume]['id']
else:
# Check if volume exists in triplestore before creating new one
ts_vvi = self.finder.retrieve_venue_from_meta(metaval)
if volume in ts_vvi['volume']:
vol_meta = ts_vvi['volume'][volume]['id']
# Update local structure with triplestore data
self.vvi[metaval]['volume'][volume] = ts_vvi['volume'][volume]
else:
vol_meta = self.new_entity(self.brdict, '')
self.vvi[metaval]['volume'][volume] = dict()
self.vvi[metaval]['volume'][volume]['id'] = vol_meta
self.vvi[metaval]['volume'][volume]['issue'] = dict()
vol_meta = self.new_entity(self.brdict, '')
self.vvi[metaval]['volume'][volume] = dict()
self.vvi[metaval]['volume'][volume]['id'] = vol_meta
self.vvi[metaval]['volume'][volume]['issue'] = dict()
elif volume and br_type == 'journal volume':
# The data must be invalidated, because the resource is a journal volume but an issue has also been specified
if issue:
row['volume'] = ''
row['issue'] = ''
Expand Down
133 changes: 53 additions & 80 deletions oc_meta/lib/finder.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
from __future__ import annotations

from time import sleep
from typing import Dict, List, Tuple

import yaml
from dateutil import parser
from oc_meta.constants import ROOT_CONTAINER_TYPES
from oc_meta.plugins.editor import MetaEditor
from oc_ocdm.graph import GraphEntity
from oc_ocdm.graph.graph_entity import GraphEntity
Expand Down Expand Up @@ -33,7 +30,6 @@ def __query(self, query, return_format = JSON):
"""Execute a SPARQL query with retries and exponential backoff"""
self.ts.setReturnFormat(return_format)
self.ts.setQuery(query)

max_retries = 5 # Aumentiamo il numero di tentativi
base_wait = 5 # Tempo base di attesa in secondi

Expand Down Expand Up @@ -345,58 +341,65 @@ def retrieve_venue_from_meta(self, meta_id:str) -> Dict[str, Dict[str, str]]:
}
}
The first level 'issue' field includes the issues contained directly in the venue,
while the 'volume' field includes the volumes in the venue and the related issues.
:params meta_id: a MetaID
:type meta_id: str
:returns: Dict[str, Dict[str, str]] -- the string with normalized hyphens
:returns: Dict[str, Dict[str, str]] -- the venue structure with volumes and issues
'''
content = dict()
content['issue'] = dict()
content['volume'] = dict()
content = self.__retrieve_vvi(meta_id, content)

return content
content = {
'issue': {},
'volume': {}
}

def __retrieve_vvi(self, meta:str, content:Dict[str, dict]) -> dict:
venue_iri = URIRef(f'{self.base_iri}/br/{meta}')
ress = []

for triple in self.local_g.triples((None, GraphEntity.iri_part_of, venue_iri)):
res = {'res': None, 'type': None, 'sequence_identifier': None, 'container': None}
res['res'] = triple[0].replace(f'{self.base_iri}/br/', '')
for res_triple in self.local_g.triples((triple[0], None, None)):
if res_triple[1] == RDF.type and res_triple[2] != GraphEntity.iri_expression:
res['type'] = res_triple[2]
elif res_triple[1] == GraphEntity.iri_has_sequence_identifier:
res['sequence_identifier'] = str(res_triple[2])
elif res_triple[1] == GraphEntity.iri_part_of:
res['container'] = res_triple[2]
ress.append(res)
# Query per trovare tutti i volumi e issue collegati alla venue
query = f"""
SELECT DISTINCT ?entity ?type ?seq ?container
WHERE {{
?entity a ?type ;
<{GraphEntity.iri_has_sequence_identifier}> ?seq .
OPTIONAL {{ ?entity <{GraphEntity.iri_part_of}> ?container }}
?entity <{GraphEntity.iri_part_of}>* <{self.base_iri}/br/{meta_id}> .
FILTER(?type IN (<{GraphEntity.iri_journal_volume}>, <{GraphEntity.iri_journal_issue}>))
}}
"""

for res in ress:
if res['res'] is not None:
if res['type'] == GraphEntity.iri_journal_issue and res['container'] == venue_iri:
content['issue'].setdefault(res['sequence_identifier'], dict())
content['issue'][res['sequence_identifier']]['id'] = res['res']
elif res['type'] == GraphEntity.iri_journal_volume:
content['volume'].setdefault(res['sequence_identifier'], dict())
content['volume'][res['sequence_identifier']]['id'] = res['res']
content['volume'][res['sequence_identifier']]['issue'] = self.__retrieve_issues_by_volume(URIRef(f"{self.base_iri}/br/{res['res']}"))
results = self.__query(query)

return content

def __retrieve_issues_by_volume(self, res:URIRef) -> dict:
content = dict()
for triple in self.local_g.triples((None, GraphEntity.iri_part_of, res)):
for res_triple in self.local_g.triples((triple[0], None, None)):
if res_triple[1] == GraphEntity.iri_has_sequence_identifier:
content.setdefault(str(res_triple[2]), dict())
content[str(res_triple[2])]['id'] = res_triple[0].replace(f'{self.base_iri}/br/', '')
# Prima processiamo tutti i volumi
volumes = {} # Dizionario temporaneo per mappare gli ID dei volumi ai loro sequence numbers
for result in results['results']['bindings']:
entity_type = result['type']['value']
if entity_type == str(GraphEntity.iri_journal_volume):
entity_id = result['entity']['value'].replace(f'{self.base_iri}/br/', '')
seq = result['seq']['value']
volumes[entity_id] = seq
content['volume'][seq] = {
'id': entity_id,
'issue': {}
}

# Poi processiamo tutte le issue
for result in results['results']['bindings']:
entity_type = result['type']['value']
if entity_type == str(GraphEntity.iri_journal_issue):
entity_id = result['entity']['value'].replace(f'{self.base_iri}/br/', '')
seq = result['seq']['value']
container = result.get('container', {}).get('value', '')

if container:
container_id = container.replace(f'{self.base_iri}/br/', '')
# Se il container è un volume che conosciamo
if container_id in volumes:
volume_seq = volumes[container_id]
content['volume'][volume_seq]['issue'][seq] = {'id': entity_id}
else:
# Se il container non è un volume conosciuto, mettiamo l'issue direttamente sotto la venue
content['issue'][seq] = {'id': entity_id}
else:
# Se non ha container, va direttamente sotto la venue
content['issue'][seq] = {'id': entity_id}

return content

def retrieve_ra_sequence_from_br_meta(self, metaid: str, col_name: str) -> List[Dict[str, tuple]]:
'''
Given a bibliographic resource's MetaID and a field name, it returns its agent roles and responsible agents in the correct order according to the specified field.
Expand Down Expand Up @@ -841,18 +844,14 @@ def process_batch(subjects, cur_depth):

next_subjects = set()
for batch in batch_process(list(subjects), BATCH_SIZE):
# Query to get direct triples and object types
query_prefix = f'''
SELECT ?s ?p ?o
WHERE {{
VALUES ?s {{ {' '.join([f"<{s}>" for s in batch])} }}
?s ?p ?o .
?s ?p ?o.
}}'''

# Process direct triples and collect objects that could be containers
potential_containers = set()
result = self.__query(query_prefix)

if result:
for row in result['results']['bindings']:
s = URIRef(row['s']['value'])
Expand All @@ -862,36 +861,10 @@ def process_batch(subjects, cur_depth):
o_datatype = URIRef(row['o']['datatype']) if 'datatype' in row['o'] else None
o = URIRef(o) if o_type == 'uri' else Literal(lexical_or_value=o, datatype=o_datatype)
self.local_g.add((s, p, o))
if p == RDF.type and o not in ROOT_CONTAINER_TYPES:
potential_containers.add(str(s))

# Add non-special objects to next_subjects as before
if isinstance(o, URIRef) and p not in {RDF.type, GraphEntity.iri_with_role, GraphEntity.iri_uses_identifier_scheme}:
next_subjects.add(str(o))

# Only run inverse query for potential containers
if potential_containers:
inverse_query = f'''
SELECT ?s ?p ?o
WHERE {{
VALUES ?container {{ {' '.join([f"<{s}>" for s in potential_containers])} }}
?s <{GraphEntity.iri_part_of}> ?container .
?s ?p ?o .
}}'''

result = self.__query(inverse_query)
if result:
for row in result['results']['bindings']:
s = URIRef(row['s']['value'])
p = URIRef(row['p']['value'])
o = row['o']['value']
o_type = row['o']['type']
o_datatype = URIRef(row['o']['datatype']) if 'datatype' in row['o'] else None
o = URIRef(o) if o_type == 'uri' else Literal(lexical_or_value=o, datatype=o_datatype)
self.local_g.add((s, p, o))
next_subjects.add(str(s))

# Process next level
# Dopo aver processato tutti i batch di questo livello, procedi con il prossimo livello di profondità
process_batch(next_subjects, cur_depth + 1)

def get_initial_subjects_from_metavals(metavals):
Expand Down
7 changes: 6 additions & 1 deletion test/meta_process_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,12 +666,14 @@ def test_duplicate_omids_with_venue_datatype(self):
<https://w3id.org/oc/meta/br/0601>
<http://purl.org/spar/datacite/hasIdentifier> <https://w3id.org/oc/meta/id/0601>, <https://w3id.org/oc/meta/id/0602> ;
<http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://purl.org/spar/fabio/Journal> ;
<http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://purl.org/spar/fabio/Expression> ;
<http://purl.org/dc/terms/title> "BMJ" .
# Second venue
<https://w3id.org/oc/meta/br/0602>
<http://purl.org/spar/datacite/hasIdentifier> <https://w3id.org/oc/meta/id/0603> ;
<http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://purl.org/spar/fabio/Journal> ;
<http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://purl.org/spar/fabio/Expression> ;
<http://purl.org/dc/terms/title> "British Medical Journal" .
}
GRAPH <https://w3id.org/oc/meta/id/> {
Expand Down Expand Up @@ -960,7 +962,7 @@ def test_volume_issue_deduplication(self):
# Both articles should reference the same volume and issue
first_volume = bindings[0]['volume']['value']
first_issue = bindings[0]['issue']['value']
print(json.dumps(bindings, indent=4))

for binding in bindings[1:]:
self.assertEqual(binding['volume']['value'], first_volume,
"Articles reference different volumes")
Expand All @@ -982,17 +984,20 @@ def test_volume_issue_deduplication_with_triplestore(self):
<https://w3id.org/oc/meta/br/0601>
<http://purl.org/spar/datacite/hasIdentifier> <https://w3id.org/oc/meta/id/0601> ;
<http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://purl.org/spar/fabio/Journal> ;
<http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://purl.org/spar/fabio/Expression> ;
<http://purl.org/dc/terms/title> "Test Journal" .
# Volume 1
<https://w3id.org/oc/meta/br/0602>
<http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://purl.org/spar/fabio/JournalVolume> ;
<http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://purl.org/spar/fabio/Expression> ;
<http://purl.org/vocab/frbr/core#partOf> <https://w3id.org/oc/meta/br/0601> ;
<http://purl.org/spar/fabio/hasSequenceIdentifier> "1" .
# Issue 1
<https://w3id.org/oc/meta/br/0603>
<http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://purl.org/spar/fabio/JournalIssue> ;
<http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://purl.org/spar/fabio/Expression> ;
<http://purl.org/vocab/frbr/core#partOf> <https://w3id.org/oc/meta/br/0602> ;
<http://purl.org/spar/fabio/hasSequenceIdentifier> "1" .
}
Expand Down

0 comments on commit dd6b728

Please sign in to comment.