Skip to content

Commit

Permalink
Merge pull request #17 from Synerise/cleanup
Browse files: browse the repository at this point in the history
Cleanup
  • Loading branch information
piobab authored Dec 22, 2020
2 parents f4c92f1 + d8a8381 commit 5b70a6c
Show file tree
Hide file tree
Showing 7 changed files with 555 additions and 415 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "cleora"
version = "1.0.1"
authors = ["Piotr Babel <[email protected]>", "Jacek Dabrowski <[email protected]>"]
authors = ["Piotr Babel <[email protected]>", "Jacek Dabrowski <[email protected]>", "Konrad Goluchowski <[email protected]>"]
edition = "2018"
license-file = "LICENSE"
readme = "README.md"
Expand Down
148 changes: 62 additions & 86 deletions src/embedding.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::configuration::Configuration;
use crate::persistence::embedding::EmbeddingPersistor;
use crate::persistence::entity::EntityMappingPersistor;
use crate::persistence::sparse_matrix::SparseMatrixPersistor;
use crate::sparse_matrix::SparseMatrix;
use crate::sparse_matrix::SparseMatrixReader;
use log::{info, warn};
use memmap::MmapMut;
use rayon::prelude::*;
Expand All @@ -20,18 +19,17 @@ const LOGGED_NUMBER_OF_BROKEN_ENTITIES: usize = 20;
/// Calculate embeddings in memory.
pub fn calculate_embeddings<T1, T2, T3>(
config: Arc<Configuration>,
sparse_matrix: &SparseMatrix<T1>,
sparse_matrix_reader: Arc<T1>,
entity_mapping_persistor: Arc<T2>,
embedding_persistor: &mut T3,
) where
T1: SparseMatrixPersistor + Sync,
T2: EntityMappingPersistor + Sync,
T1: SparseMatrixReader + Sync + Send,
T2: EntityMappingPersistor,
T3: EmbeddingPersistor,
{
let mult = MatrixMultiplicator {
dimension: config.embeddings_dimension,
sparse_matrix_id: sparse_matrix.get_id(),
sparse_matrix_persistor: &sparse_matrix.sparse_matrix_persistor,
sparse_matrix_reader,
};
let init = mult.initialize();
let res = mult.propagate(config.max_number_of_iteration, init);
Expand All @@ -42,18 +40,17 @@ pub fn calculate_embeddings<T1, T2, T3>(

/// Provides matrix multiplication based on sparse matrix data.
#[derive(Debug)]
pub struct MatrixMultiplicator<'a, T: SparseMatrixPersistor + Sync> {
pub struct MatrixMultiplicator<T: SparseMatrixReader + Sync + Send> {
pub dimension: u16,
pub sparse_matrix_id: String,
pub sparse_matrix_persistor: &'a T,
pub sparse_matrix_reader: Arc<T>,
}

impl<'a, T> MatrixMultiplicator<'a, T>
impl<T> MatrixMultiplicator<T>
where
T: SparseMatrixPersistor + Sync,
T: SparseMatrixReader + Sync + Send,
{
fn initialize(&self) -> Vec<Vec<f32>> {
let entities_count = self.sparse_matrix_persistor.get_entity_counter();
let entities_count = self.sparse_matrix_reader.get_number_of_entities();

info!(
"Start initialization. Dims: {}, entities: {}.",
Expand All @@ -68,13 +65,10 @@ where
.into_par_iter()
.map(|i| {
let mut col: Vec<f32> = Vec::with_capacity(entities_count as usize);
for j in 0..entities_count {
let hsh = self.sparse_matrix_persistor.get_hash(j);
if hsh != -1 {
let col_value =
((hash(hsh + (i as i64)) % max_hash) as f32) / max_hash_float;
col.insert(j as usize, col_value);
}
for (j, hsh) in self.sparse_matrix_reader.iter_hashes().enumerate() {
let col_value = ((hash((hsh.value as i64) + (i as i64)) % max_hash) as f32)
/ max_hash_float;
col.insert(j as usize, col_value);
}
col
})
Expand All @@ -90,7 +84,7 @@ where
fn propagate(&self, max_iter: u8, res: Vec<Vec<f32>>) -> Vec<Vec<f32>> {
info!("Start propagating. Number of iterations: {}.", max_iter);

let entities_count = self.sparse_matrix_persistor.get_entity_counter();
let entities_count = self.sparse_matrix_reader.get_number_of_entities();
let mut new_res = res;
for i in 0..max_iter {
let next = self.next_power(new_res);
Expand All @@ -100,26 +94,23 @@ where
i,
self.dimension,
entities_count,
self.sparse_matrix_persistor.get_amount_of_data()
self.sparse_matrix_reader.get_number_of_entries()
);
}
info!("Done propagating.");
new_res
}

fn next_power(&self, res: Vec<Vec<f32>>) -> Vec<Vec<f32>> {
let entities_count = self.sparse_matrix_persistor.get_entity_counter() as usize;
let entities_count = self.sparse_matrix_reader.get_number_of_entities() as usize;
let rnew = Self::zero_2d(entities_count, self.dimension as usize);

let amount_of_data = self.sparse_matrix_persistor.get_amount_of_data();

let result: Vec<Vec<f32>> = res
.into_par_iter()
.zip(rnew)
.update(|data| {
let (res_col, rnew_col) = data;
for j in 0..amount_of_data {
let entry = self.sparse_matrix_persistor.get_entry(j);
for entry in self.sparse_matrix_reader.iter_entries() {
let elem = rnew_col.get_mut(entry.row as usize).unwrap();
let value = res_col.get(entry.col as usize).unwrap();
*elem += *value * entry.value
Expand All @@ -141,7 +132,7 @@ where
}

fn normalize(&self, res: Vec<Vec<f32>>) -> Vec<Vec<f32>> {
let entities_count = self.sparse_matrix_persistor.get_entity_counter() as usize;
let entities_count = self.sparse_matrix_reader.get_number_of_entities() as usize;
let mut row_sum = vec![0f32; entities_count];

for i in 0..(self.dimension as usize) {
Expand Down Expand Up @@ -179,7 +170,7 @@ where
{
info!("Start saving embeddings.");

let entities_count = self.sparse_matrix_persistor.get_entity_counter();
let entities_count = self.sparse_matrix_reader.get_number_of_entities();
embedding_persistor
.put_metadata(entities_count, self.dimension)
.unwrap_or_else(|_| {
Expand All @@ -192,21 +183,17 @@ where

// entities that could not be written to the file because an error occurred
let mut broken_entities = HashSet::new();
for i in 0..entities_count {
let hash = self.sparse_matrix_persistor.get_hash(i);
let entity_name_opt = entity_mapping_persistor.get_entity(hash as u64);
for (i, hash) in self.sparse_matrix_reader.iter_hashes().enumerate() {
let entity_name_opt = entity_mapping_persistor.get_entity(hash.value);
if let Some(entity_name) = entity_name_opt {
let hash_occur = self
.sparse_matrix_persistor
.get_hash_occurrence(hash as u64);
let mut embedding: Vec<f32> = Vec::with_capacity(self.dimension as usize);
for j in 0..(self.dimension as usize) {
let col: &Vec<f32> = res.get(j).unwrap();
let value = col.get(i as usize).unwrap();
embedding.insert(j, *value);
}
embedding_persistor
.put_data(&entity_name, hash_occur, embedding)
.put_data(&entity_name, hash.occurrence, embedding)
.unwrap_or_else(|_| {
broken_entities.insert(entity_name);
});
Expand Down Expand Up @@ -246,28 +233,25 @@ fn log_broken_entities(broken_entities: HashSet<String>) {
/// Calculate embeddings with memory-mapped files.
pub fn calculate_embeddings_mmap<T1, T2, T3>(
config: Arc<Configuration>,
sparse_matrix: &SparseMatrix<T1>,
sparse_matrix_reader: Arc<T1>,
entity_mapping_persistor: Arc<T2>,
embedding_persistor: &mut T3,
) where
T1: SparseMatrixPersistor + Sync,
T2: EntityMappingPersistor + Sync,
T1: SparseMatrixReader + Sync + Send,
T2: EntityMappingPersistor,
T3: EmbeddingPersistor,
{
let matrix_id = sparse_matrix_reader.get_id();

let mult = MatrixMultiplicatorMMap {
dimension: config.embeddings_dimension,
sparse_matrix_id: sparse_matrix.get_id(),
sparse_matrix_persistor: &sparse_matrix.sparse_matrix_persistor,
sparse_matrix_reader,
};
let init = mult.initialize();
let res = mult.propagate(config.max_number_of_iteration, init);
mult.persist(res, entity_mapping_persistor, embedding_persistor);

let work_file = format!(
"{}_matrix_{}",
sparse_matrix.get_id(),
config.max_number_of_iteration
);
let work_file = format!("{}_matrix_{}", matrix_id, config.max_number_of_iteration);
fs::remove_file(&work_file).unwrap_or_else(|_| {
warn!(
"File {} can't be removed after work. Remove the file in order to save disk space.",
Expand All @@ -280,26 +264,25 @@ pub fn calculate_embeddings_mmap<T1, T2, T3>(

/// Provides matrix multiplication based on sparse matrix data.
#[derive(Debug)]
pub struct MatrixMultiplicatorMMap<'a, T: SparseMatrixPersistor + Sync> {
pub struct MatrixMultiplicatorMMap<T: SparseMatrixReader + Sync + Send> {
pub dimension: u16,
pub sparse_matrix_id: String,
pub sparse_matrix_persistor: &'a T,
pub sparse_matrix_reader: Arc<T>,
}

impl<'a, T> MatrixMultiplicatorMMap<'a, T>
impl<T> MatrixMultiplicatorMMap<T>
where
T: SparseMatrixPersistor + Sync,
T: SparseMatrixReader + Sync + Send,
{
fn initialize(&self) -> MmapMut {
let entities_count = self.sparse_matrix_persistor.get_entity_counter();
let entities_count = self.sparse_matrix_reader.get_number_of_entities();

info!(
"Start initialization. Dims: {}, entities: {}.",
self.dimension, entities_count
);

let number_of_bytes = entities_count as u64 * self.dimension as u64 * 4;
let file_name = format!("{}_matrix_0", self.sparse_matrix_id);
let file_name = format!("{}_matrix_0", self.sparse_matrix_reader.get_id());
let file = OpenOptions::new()
.read(true)
.write(true)
Expand Down Expand Up @@ -330,20 +313,17 @@ where
.for_each(|(i, chunk)| {
// i - index of the dimension
// chunk - column/vector of bytes
for j in 0..entities_count as usize {
let hsh = self.sparse_matrix_persistor.get_hash(j as u32);
if hsh != -1 {
let col_value =
((hash(hsh + (i as i64)) % max_hash) as f32) / max_hash_float;

let start_idx = j * 4;
let end_idx = start_idx + 4;
let pointer: *mut u8 = (&mut chunk[start_idx..end_idx]).as_mut_ptr();
unsafe {
let value = pointer as *mut f32;
*value = col_value;
};
}
for (j, hsh) in self.sparse_matrix_reader.iter_hashes().enumerate() {
let col_value = ((hash((hsh.value as i64) + (i as i64)) % max_hash) as f32)
/ max_hash_float;

let start_idx = j * 4;
let end_idx = start_idx + 4;
let pointer: *mut u8 = (&mut chunk[start_idx..end_idx]).as_mut_ptr();
unsafe {
let value = pointer as *mut f32;
*value = col_value;
};
}
});

Expand All @@ -360,13 +340,13 @@ where
fn propagate(&self, max_iter: u8, res: MmapMut) -> MmapMut {
info!("Start propagating. Number of iterations: {}.", max_iter);

let entities_count = self.sparse_matrix_persistor.get_entity_counter();
let entities_count = self.sparse_matrix_reader.get_number_of_entities();
let mut new_res = res;
for i in 0..max_iter {
let next = self.next_power(i, new_res);
new_res = self.normalize(next);

let work_file = format!("{}_matrix_{}", self.sparse_matrix_id, i);
let work_file = format!("{}_matrix_{}", self.sparse_matrix_reader.get_id(), i);
fs::remove_file(&work_file).unwrap_or_else(|_| {
warn!("File {} can't be removed after work. Remove the file in order to save disk space.", work_file)
});
Expand All @@ -376,18 +356,22 @@ where
i,
self.dimension,
entities_count,
self.sparse_matrix_persistor.get_amount_of_data()
self.sparse_matrix_reader.get_number_of_entries()
);
}
info!("Done propagating.");
new_res
}

fn next_power(&self, iteration: u8, res: MmapMut) -> MmapMut {
let entities_count = self.sparse_matrix_persistor.get_entity_counter() as usize;
let entities_count = self.sparse_matrix_reader.get_number_of_entities() as usize;

let number_of_bytes = entities_count as u64 * self.dimension as u64 * 4;
let file_name = format!("{}_matrix_{}", self.sparse_matrix_id, iteration + 1);
let file_name = format!(
"{}_matrix_{}",
self.sparse_matrix_reader.get_id(),
iteration + 1
);
let file = OpenOptions::new()
.read(true)
.write(true)
Expand All @@ -409,16 +393,12 @@ where
})
};

let amount_of_data = self.sparse_matrix_persistor.get_amount_of_data();

let input = Arc::new(res);
mmap_output
.par_chunks_mut(entities_count * 4)
.enumerate()
.for_each_with(input, |input, (i, chunk)| {
for j in 0..amount_of_data {
let entry = self.sparse_matrix_persistor.get_entry(j);

for entry in self.sparse_matrix_reader.iter_entries() {
let start_idx_input = ((i * entities_count) + entry.col as usize) * 4;
let end_idx_input = start_idx_input + 4;
let pointer: *const u8 = (&input[start_idx_input..end_idx_input]).as_ptr();
Expand All @@ -445,7 +425,7 @@ where
}

fn normalize(&self, mut res: MmapMut) -> MmapMut {
let entities_count = self.sparse_matrix_persistor.get_entity_counter() as usize;
let entities_count = self.sparse_matrix_reader.get_number_of_entities() as usize;
let mut row_sum = vec![0f32; entities_count];

for i in 0..(self.dimension as usize) {
Expand Down Expand Up @@ -499,7 +479,7 @@ where
{
info!("Start saving embeddings.");

let entities_count = self.sparse_matrix_persistor.get_entity_counter();
let entities_count = self.sparse_matrix_reader.get_number_of_entities();
embedding_persistor
.put_metadata(entities_count, self.dimension)
.unwrap_or_else(|_| {
Expand All @@ -512,13 +492,9 @@ where

// entities that could not be written to the file because an error occurred
let mut broken_entities = HashSet::new();
for i in 0..entities_count {
let hash = self.sparse_matrix_persistor.get_hash(i);
let entity_name_opt = entity_mapping_persistor.get_entity(hash as u64);
for (i, hash) in self.sparse_matrix_reader.iter_hashes().enumerate() {
let entity_name_opt = entity_mapping_persistor.get_entity(hash.value);
if let Some(entity_name) = entity_name_opt {
let hash_occur = self
.sparse_matrix_persistor
.get_hash_occurrence(hash as u64);
let mut embedding: Vec<f32> = Vec::with_capacity(self.dimension as usize);
for j in 0..(self.dimension as usize) {
let start_idx = ((j * entities_count as usize) + i as usize) * 4;
Expand All @@ -532,7 +508,7 @@ where
embedding.insert(j, value);
}
embedding_persistor
.put_data(&entity_name, hash_occur, embedding)
.put_data(&entity_name, hash.occurrence, embedding)
.unwrap_or_else(|_| {
broken_entities.insert(entity_name);
});
Expand Down
Loading

0 comments on commit 5b70a6c

Please sign in to comment.