Skip to content

Commit

Permalink
feat(rdfox): support for datastore params
Browse files Browse the repository at this point in the history
  • Loading branch information
jgeluk committed Aug 19, 2022
1 parent 738f7bd commit e5ff713
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 45 deletions.
30 changes: 23 additions & 7 deletions src/cursor/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
pub struct Cursor<'a> {
#[allow(dead_code)]
pub inner: *mut CCursor,
pub(crate) connection: &'a DataStoreConnection,
pub(crate) connection: &'a DataStoreConnection<'a>,
statement: Statement<'a>,
}

Expand Down Expand Up @@ -70,24 +70,40 @@ impl<'a> Cursor<'a> {
Ok(cursor)
}

pub fn count(&mut self) -> Result<u64, Error> { self.execute_and_rollback(1000000000, |_row| Ok(())) }
pub fn count(&mut self) -> Result<u64, Error> {
self.execute_and_rollback(1000000000, |_row| Ok(()))
}

pub fn count_in_transaction(&mut self, tx: &mut Transaction) -> Result<u64, Error> {
self.consume(tx, 1000000000, |_row| Ok(()))
}

pub fn consume<T>(&mut self, tx: &mut Transaction, maxrow: u64, mut f: T) -> Result<u64, Error>
where T: FnMut(CursorRow) -> Result<(), Error> {
pub fn consume<T>(
&mut self,
tx: &mut Transaction,
maxrow: u64,
mut f: T,
) -> Result<u64, Error>
where
T: FnMut(CursorRow) -> Result<(), Error>,
{
let (mut opened_cursor, mut multiplicity) = OpenedCursor::new(self, &tx)?;
let mut rowid = 0_u64;
let mut count = 0_u64;
while multiplicity > 0 {
if multiplicity >= maxrow {
return Err(Error::MultiplicityExceededMaximumNumberOfRows { maxrow, multiplicity, query: self.statement.text.clone() })
return Err(Error::MultiplicityExceededMaximumNumberOfRows {
maxrow,
multiplicity,
query: self.statement.text.clone(),
})
}
rowid += 1;
if rowid >= maxrow {
return Err(Error::ExceededMaximumNumberOfRows { maxrow, query: self.statement.text.clone() })
return Err(Error::ExceededMaximumNumberOfRows {
maxrow,
query: self.statement.text.clone(),
})
}
count += multiplicity;
let row = CursorRow {
Expand All @@ -105,7 +121,7 @@ impl<'a> Cursor<'a> {
pub fn update_and_commit<T, U>(&mut self, maxrow: u64, f: T) -> Result<u64, Error>
where T: FnMut(CursorRow) -> Result<(), Error> {
let mut tx = Transaction::begin_read_write(self.connection)?;
self.update_and_commit_in_transaction(&mut tx, maxrow,f)
self.update_and_commit_in_transaction(&mut tx, maxrow, f)
}

pub fn execute_and_rollback<T>(&mut self, maxrow: u64, f: T) -> Result<u64, Error>
Expand Down
23 changes: 12 additions & 11 deletions src/data_store.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,28 @@
use std::fmt::{Display, Formatter};

use crate::error::Error;
use crate::ServerConnection;
use crate::{error::Error, Parameters, ServerConnection};

#[derive(Debug, PartialEq, Clone)]
pub struct DataStore {
pub(crate) name: String,
pub struct DataStore<'a> {
pub name: String,
pub parameters: &'a Parameters,
}

impl Display for DataStore {
impl<'a> Display for DataStore<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "data store [{}]", self.name)
}
}

impl DataStore {
pub fn declare(name: &str) -> Self {
Self {
impl<'a> DataStore<'a> {
pub fn declare_with_parameters(name: &str, parameters: &'a Parameters) -> Result<Self, Error> {
Ok(Self {
name: name.to_string(),
}
parameters,
})
}

pub fn create(self, server_connection: &ServerConnection) -> Result<Self, Error> {
server_connection.create_data_store(self)
pub fn create(self, server_connection: &'a ServerConnection) -> Result<(), Error> {
server_connection.create_data_store(&self).map(|_| ())
}
}
18 changes: 9 additions & 9 deletions src/data_store_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ use crate::{
};

#[derive(Debug, PartialEq)]
pub struct DataStoreConnection {
pub data_store: DataStore,
pub struct DataStoreConnection<'a> {
pub data_store: &'a DataStore<'a>,
pub(crate) inner: *mut CDataStoreConnection,
started_at: Instant,
}

impl Display for DataStoreConnection {
impl<'a> Display for DataStoreConnection<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("connection").unwrap();
// match self.get_id() {
Expand All @@ -68,15 +68,15 @@ impl Display for DataStoreConnection {
}
}

impl Drop for DataStoreConnection {
impl<'a> Drop for DataStoreConnection<'a> {
fn drop(&mut self) {
let duration = self.started_at.elapsed();
log::info!("dropped {self} after {:?}", duration)
}
}

impl DataStoreConnection {
pub(crate) fn new(data_store: DataStore, inner: *mut CDataStoreConnection) -> Self {
impl<'a> DataStoreConnection<'a> {
pub(crate) fn new(data_store: &'a DataStore<'a>, inner: *mut CDataStoreConnection) -> Self {
Self {
data_store,
inner,
Expand Down Expand Up @@ -224,9 +224,9 @@ impl DataStoreConnection {
Ok(count)
}

pub fn evaluate_update<'a>(
pub fn evaluate_update<'b>(
&self,
statement: &'a Statement<'a>,
statement: &'b Statement<'b>,
parameters: &Parameters,
) -> Result<(), Error> {
let base_iri = ptr::null_mut();
Expand All @@ -249,7 +249,7 @@ impl DataStoreConnection {
Ok(())
}

pub fn evaluate_to_stream<'a, W>(
pub fn evaluate_to_stream<W>(
&'a self,
writer: W,
statement: &'a Statement<'a>,
Expand Down
19 changes: 16 additions & 3 deletions src/graph_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,19 @@ use std::{

use indoc::formatdoc;

use crate::{error::Error, DataStoreConnection, FactDomain, Graph, Parameters, Prefixes, Statement, Transaction};
use crate::{
error::Error,
DataStoreConnection,
FactDomain,
Graph,
Parameters,
Prefixes,
Statement,
Transaction,
};

pub struct GraphConnection<'a> {
pub data_store_connection: &'a DataStoreConnection,
pub data_store_connection: &'a DataStoreConnection<'a>,
started_at: Instant,
pub graph: Graph,
pub ontology_graph: Option<Graph>,
Expand Down Expand Up @@ -73,7 +82,11 @@ impl<'a> GraphConnection<'a> {
.import_rdf_from_directory(root, &self.graph)
}

pub fn get_triples_count(&self, tx: &mut Transaction, fact_domain: FactDomain) -> Result<u64, Error> {
pub fn get_triples_count(
&self,
tx: &mut Transaction,
fact_domain: FactDomain,
) -> Result<u64, Error> {
Statement::new(
&Prefixes::default()?,
formatdoc!(
Expand Down
11 changes: 10 additions & 1 deletion src/parameters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub enum FactDomain {
pub enum PersistenceMode {
File,
FileSequence,
Off
Off,
}

impl Display for PersistenceMode {
Expand All @@ -43,6 +43,7 @@ impl Display for PersistenceMode {
}
}

#[derive(Debug, Clone, PartialEq)]
pub struct Parameters {
pub(crate) inner: *mut CParameters,
}
Expand Down Expand Up @@ -128,6 +129,14 @@ impl Parameters {
}
}

pub fn import_rename_user_blank_nodes(self, setting: bool) -> Result<Self, Error> {
self.set_string(
"import.rename-user-blank-nodes",
format!("{setting:?}").as_str(),
)?;
Ok(self)
}

/// If true, all API calls are recorded in a script that
/// the shell can replay later. later.
/// The default value is false.
Expand Down
19 changes: 7 additions & 12 deletions src/server_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::{
database_call,
error::Error,
root::{
CParameters_getEmptyParameters,
CServerConnection,
CServerConnection_createDataStore,
CServerConnection_deleteDataStore,
Expand Down Expand Up @@ -75,31 +74,27 @@ impl ServerConnection {
Ok(())
}

pub fn create_data_store(&self, data_store: DataStore) -> Result<DataStore, Error> {
pub fn create_data_store<'a>(&self, data_store: &'a DataStore<'a>) -> Result<(), Error> {
log::debug!("Creating {data_store}");
let c_name = CString::new(data_store.name.as_str()).unwrap();
database_call!(
"creating a datastore",
CServerConnection_createDataStore(
self.inner,
c_name.as_ptr(),
CParameters_getEmptyParameters(),
data_store.parameters.inner,
)
)?;
log::info!("Created {data_store}");
Ok(data_store)
}

pub fn create_data_store_named(&self, name: &str) -> Result<DataStore, Error> {
self.create_data_store(DataStore::declare(name))
Ok(())
}

pub fn connect_to_data_store(
pub fn connect_to_data_store<'a>(
&self,
data_store: DataStore,
) -> Result<DataStoreConnection, Error> {
data_store: &'a DataStore<'a>,
) -> Result<DataStoreConnection<'a>, Error> {
log::debug!("Connecting to {}", data_store);
let mut ds_connection = DataStoreConnection::new(data_store.clone(), ptr::null_mut());
let mut ds_connection = DataStoreConnection::new(data_store, ptr::null_mut());
let c_name = CString::new(data_store.name.as_str()).unwrap();
database_call!(
"creating a datastore connection",
Expand Down
2 changes: 1 addition & 1 deletion src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl<'a, W: 'a + Write + Debug> Drop for RefToSelf<'a, W> {
/// to handle the various callbacks from the underlying C-API to RDFox.
#[derive(PartialEq, Debug)]
pub struct Streamer<'a, W: 'a + Write + Debug> {
pub connection: &'a DataStoreConnection,
pub connection: &'a DataStoreConnection<'a>,
pub writer: W,
pub statement: &'a Statement<'a>,
pub mime_type: &'static Mime,
Expand Down
2 changes: 1 addition & 1 deletion src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{

#[derive(Debug)]
pub struct Transaction<'a> {
pub connection: &'a DataStoreConnection,
pub connection: &'a DataStoreConnection<'a>,
committed: bool,
}

Expand Down

0 comments on commit e5ff713

Please sign in to comment.