Skip to content

Commit

Permalink
Bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
magbak committed Jan 6, 2025
1 parent 9160ccf commit 6301788
Show file tree
Hide file tree
Showing 27 changed files with 294 additions and 149 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ env:
CARGO_TERM_COLOR: always
RUST_LOG: debug
MATURIN_VERSION: '1.7.4'
RUST_TOOLCHAIN: nightly-2024-10-28
RUST_TOOLCHAIN: nightly-2024-11-28

jobs:
# Build the documentation and upload the static HTML files as an artifact.
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/python_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ permissions:

env:
CARGO_TERM_COLOR: always
RUST_TOOLCHAIN: nightly-2024-10-28
RUST_TOOLCHAIN: nightly-2024-11-28
MATURIN_VERSION: '1.7.4'
MATURIN_PYPI_TOKEN: ${{ secrets.PYPI_API_TOKEN }}

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/python_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ env:
CARGO_TERM_COLOR: always
RUST_LOG: debug
MATURIN_VERSION: '1.7.4'
RUST_TOOLCHAIN: nightly-2024-10-28
RUST_TOOLCHAIN: nightly-2024-11-28

jobs:
build_and_test:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/rust_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
env:
CARGO_TERM_COLOR: always
RUST_LOG: debug
RUST_TOOLCHAIN: nightly-2024-10-28
RUST_TOOLCHAIN: nightly-2024-11-28

jobs:
build_and_test:
Expand Down
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ members = [
"lib/representation",
"lib/maplib",
"lib/triplestore",
"lib/parquet_io",
"lib/file_io",
"lib/pydf_io",
"lib/query_processing",
"lib/spargebra",
Expand Down
2 changes: 1 addition & 1 deletion lib/parquet_io/Cargo.toml → lib/file_io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "parquet_io"
name = "file_io"
version = "0.5.0"
edition = "2021"
rust-version.workspace = true
Expand Down
33 changes: 19 additions & 14 deletions lib/parquet_io/src/lib.rs → lib/file_io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,37 @@ use polars::prelude::{
DataFrame, LazyFrame, ParallelStrategy, ParquetCompression, ParquetWriter, PolarsError,
ScanArgsParquet,
};
use std::fs::File;
use std::fs::{create_dir, File};
use std::path::Path;

use std::fmt::{Display, Formatter};
use std::io;
use thiserror::Error;

pub fn create_folder_if_not_exists(path: &Path) -> Result<(), FileIOError> {
if !path.exists() {
create_dir(path).map_err(|x| FileIOError::FileCreateIOError(x))?;
}
Ok(())
}

#[derive(Error, Debug)]
pub enum ParquetIOError {
pub enum FileIOError {
FileCreateIOError(io::Error),
WriteParquetError(PolarsError),
ReadParquetError(PolarsError),
}

impl Display for ParquetIOError {
impl Display for FileIOError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
ParquetIOError::FileCreateIOError(e) => {
FileIOError::FileCreateIOError(e) => {
write!(f, "Creating file for writing resulted in an error: {}", e)
}
ParquetIOError::WriteParquetError(e) => {
FileIOError::WriteParquetError(e) => {
write!(f, "Writing to parquet file produced an error {:?}", e)
}
ParquetIOError::ReadParquetError(p) => {
FileIOError::ReadParquetError(p) => {
write!(f, "Reading parquet file resulted in an error: {:?}", p)
}
}
Expand All @@ -43,20 +50,18 @@ pub fn write_parquet(
df: &mut DataFrame,
file_path: &Path,
compression: ParquetCompression,
) -> Result<(), ParquetIOError> {
let file = File::create(file_path).map_err(ParquetIOError::FileCreateIOError)?;
) -> Result<(), FileIOError> {
let file = File::create(file_path).map_err(FileIOError::FileCreateIOError)?;
let mut writer = ParquetWriter::new(file);
writer = writer.with_row_group_size(Some(1_000));
writer = writer.with_compression(compression);
writer
.finish(df)
.map_err(ParquetIOError::WriteParquetError)?;
writer.finish(df).map_err(FileIOError::WriteParquetError)?;
Ok(())
}

pub fn scan_parquet(file_path: &String) -> Result<LazyFrame, ParquetIOError> {
pub fn scan_parquet(file_path: &Path) -> Result<LazyFrame, FileIOError> {
LazyFrame::scan_parquet(
Path::new(file_path),
file_path,
ScanArgsParquet {
n_rows: None,
cache: false,
Expand All @@ -66,5 +71,5 @@ pub fn scan_parquet(file_path: &String) -> Result<LazyFrame, ParquetIOError> {
..Default::default()
},
)
.map_err(ParquetIOError::ReadParquetError)
.map_err(FileIOError::ReadParquetError)
}
4 changes: 3 additions & 1 deletion lib/maplib/src/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use shacl::errors::ShaclError;
use shacl::{validate, ValidationReport};
use std::collections::HashMap;
use std::io::Write;
use std::path::Path;
use std::path::{Path, PathBuf};
use templates::ast::{ConstantTermOrList, PType, Template};
use templates::dataset::TemplateDataset;
use templates::document::document_from_str;
Expand Down Expand Up @@ -300,6 +300,7 @@ impl Mapping {
include_details: bool,
include_conforms: bool,
streaming: bool,
folder_path: Option<&PathBuf>,
) -> Result<ValidationReport, ShaclError> {
let (shape_graph, mut shape_triplestore) =
self.triplestores_map.remove_entry(shape_graph).unwrap();
Expand All @@ -309,6 +310,7 @@ impl Mapping {
include_details,
include_conforms,
streaming,
folder_path,
) {
Ok(vr) => {
self.triplestores_map.insert(shape_graph, shape_triplestore);
Expand Down
53 changes: 45 additions & 8 deletions lib/query_processing/src/graph_patterns.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::errors::QueryProcessingError;
use crate::type_constraints::{conjunction_variable_type, equal_variable_type, PossibleTypes};
use log::warn;
use oxrdf::vocab::rdfs;
use oxrdf::{Term, Variable};
use oxrdf::vocab::{rdf, rdfs};
use oxrdf::{NamedNode, Term, Variable};
use polars::datatypes::{CategoricalOrdering, DataType, PlSmallStr};
use polars::frame::{DataFrame, UniqueKeepStrategy};
use polars::prelude::{
Expand All @@ -11,7 +11,8 @@ use polars::prelude::{
};
use representation::multitype::{
base_col_name, convert_lf_col_to_multitype, create_join_compatible_solution_mappings,
lf_column_to_categorical, nest_multicolumns, unnest_multicols,
known_convert_lf_multicol_to_single, lf_column_to_categorical, nest_multicolumns,
unnest_multicols,
};
use representation::multitype::{join_workaround, unique_workaround};
use representation::polars_to_rdf::particular_opt_term_vec_to_series;
Expand Down Expand Up @@ -128,17 +129,41 @@ pub fn join(
join_type: JoinType,
) -> Result<SolutionMappings, QueryProcessingError> {
let SolutionMappings {
mappings: right_mappings,
rdf_node_types: right_datatypes,
mappings: mut right_mappings,
rdf_node_types: mut right_datatypes,
height_estimate: right_height,
} = right_solution_mappings;

let SolutionMappings {
mappings: left_mappings,
rdf_node_types: left_datatypes,
mappings: mut left_mappings,
rdf_node_types: mut left_datatypes,
height_estimate: left_height,
} = left_solution_mappings;

let mut lang_to_multi = vec![];
for (lk, lt) in &left_datatypes {
if lt.is_lang_string() {
if let Some(rt) = right_datatypes.get(lk) {
if rt.is_lang_string() {
lang_to_multi.push(lk.clone());
}
}
}
}
if !lang_to_multi.is_empty() {
let lang_base_type = BaseRDFNodeType::Literal(NamedNode::from(rdf::LANG_STRING));
let lang_rdf_type = lang_base_type.as_rdf_node_type();
let lang_multi = RDFNodeType::MultiType(vec![lang_base_type]);
for k in lang_to_multi.iter() {
left_mappings =
left_mappings.with_column(convert_lf_col_to_multitype(k, &lang_rdf_type));
left_datatypes.insert(k.clone(), lang_multi.clone());
right_mappings =
right_mappings.with_column(convert_lf_col_to_multitype(k, &lang_rdf_type));
right_datatypes.insert(k.clone(), lang_multi.clone());
}
}

let (left_mappings, left_datatypes, right_mappings, right_datatypes) =
create_join_compatible_solution_mappings(
left_mappings,
Expand All @@ -148,7 +173,7 @@ pub fn join(
join_type.clone(),
);

let solution_mappings = join_workaround(
let mut solution_mappings = join_workaround(
left_mappings,
left_datatypes,
left_height,
Expand All @@ -158,6 +183,18 @@ pub fn join(
join_type,
);

if !lang_to_multi.is_empty() {
let lang_base_type = BaseRDFNodeType::Literal(NamedNode::from(rdf::LANG_STRING));
let lang_rdf_type = lang_base_type.as_rdf_node_type();
for k in lang_to_multi.iter() {
solution_mappings.mappings =
known_convert_lf_multicol_to_single(solution_mappings.mappings, k, &lang_base_type);
solution_mappings
.rdf_node_types
.insert(k.clone(), lang_rdf_type.clone());
}
}

Ok(solution_mappings)
}

Expand Down
2 changes: 1 addition & 1 deletion lib/triplestore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ rust-version.workspace = true
[dependencies]
query_processing = { path = "../query_processing" }
representation = { path = "../representation" }
parquet_io = { path = "../parquet_io" }
file_io = { path = "../file_io" }
spargebra = { path = "../spargebra" }

rayon.workspace = true
Expand Down
6 changes: 3 additions & 3 deletions lib/triplestore/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use parquet_io::ParquetIOError;
use file_io::FileIOError;
use std::fmt::{Display, Formatter};
use std::io;
use thiserror::Error;
Expand All @@ -20,7 +20,7 @@ pub enum TriplestoreError {
NTriplesParsingError(String),
IndexingError(String),
IPCIOError(String),
ParquetIOError(ParquetIOError),
FileIOError(FileIOError),
}

impl Display for TriplestoreError {
Expand All @@ -38,7 +38,7 @@ impl Display for TriplestoreError {
TriplestoreError::IPCIOError(e) => {
write!(f, "IPC IO error: {}", e)
}
TriplestoreError::ParquetIOError(e) => {
TriplestoreError::FileIOError(e) => {
write!(f, "Parquet IO error: {}", e)
}
TriplestoreError::FolderCreateIOError(e) => {
Expand Down
10 changes: 0 additions & 10 deletions lib/triplestore/src/io_funcs.rs

This file was deleted.

9 changes: 5 additions & 4 deletions lib/triplestore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ extern crate core;

mod dblf;
pub mod errors;
mod io_funcs;
pub mod native_parquet_write;
pub mod query_solutions;
pub mod rdfs_inferencing;
Expand All @@ -12,8 +11,8 @@ pub mod triples_read;
pub mod triples_write;

use crate::errors::TriplestoreError;
use crate::io_funcs::create_folder_if_not_exists;
use crate::storage::Triples;
use file_io::create_folder_if_not_exists;
use log::debug;
use oxrdf::vocab::{rdf, rdfs};
use oxrdf::NamedNode;
Expand Down Expand Up @@ -95,10 +94,12 @@ impl Triplestore {
) -> Result<Triplestore, TriplestoreError> {
let pathbuf = if let Some(storage_folder) = &storage_folder {
let mut pathbuf = Path::new(storage_folder).to_path_buf();
create_folder_if_not_exists(pathbuf.as_path())?;
create_folder_if_not_exists(pathbuf.as_path())
.map_err(TriplestoreError::FileIOError)?;
let ext = format!("ts_{}", Uuid::new_v4().to_string());
pathbuf.push(&ext);
create_folder_if_not_exists(pathbuf.as_path())?;
create_folder_if_not_exists(pathbuf.as_path())
.map_err(TriplestoreError::FileIOError)?;
Some(pathbuf)
} else {
None
Expand Down
4 changes: 2 additions & 2 deletions lib/triplestore/src/native_parquet_write.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::Triplestore;
use crate::errors::TriplestoreError;
use file_io::{property_to_filename, write_parquet};
use log::debug;
use parquet_io::{property_to_filename, write_parquet};
use polars::prelude::ParquetCompression;
use representation::BaseRDFNodeType;
use std::path::Path;
Expand Down Expand Up @@ -49,7 +49,7 @@ impl Triplestore {
file_path.as_path(),
ParquetCompression::default(),
)
.map_err(TriplestoreError::ParquetIOError)?
.map_err(TriplestoreError::FileIOError)?
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion py_maplib/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "py_maplib"
version = "0.14.5"
version = "0.14.4"
edition = "2021"

[dependencies]
Expand Down
Loading

0 comments on commit 6301788

Please sign in to comment.