Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup #17

Merged
merged 2 commits into from
Dec 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "cleora"
version = "1.0.1"
authors = ["Piotr Babel <[email protected]>", "Jacek Dabrowski <[email protected]>"]
authors = ["Piotr Babel <[email protected]>", "Jacek Dabrowski <[email protected]>", "Konrad Goluchowski <[email protected]>"]
edition = "2018"
license-file = "LICENSE"
readme = "README.md"
Expand Down
148 changes: 62 additions & 86 deletions src/embedding.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::configuration::Configuration;
use crate::persistence::embedding::EmbeddingPersistor;
use crate::persistence::entity::EntityMappingPersistor;
use crate::persistence::sparse_matrix::SparseMatrixPersistor;
use crate::sparse_matrix::SparseMatrix;
use crate::sparse_matrix::SparseMatrixReader;
use log::{info, warn};
use memmap::MmapMut;
use rayon::prelude::*;
Expand All @@ -20,18 +19,17 @@ const LOGGED_NUMBER_OF_BROKEN_ENTITIES: usize = 20;
/// Calculate embeddings in memory.
pub fn calculate_embeddings<T1, T2, T3>(
config: Arc<Configuration>,
sparse_matrix: &SparseMatrix<T1>,
sparse_matrix_reader: Arc<T1>,
entity_mapping_persistor: Arc<T2>,
embedding_persistor: &mut T3,
) where
T1: SparseMatrixPersistor + Sync,
T2: EntityMappingPersistor + Sync,
T1: SparseMatrixReader + Sync + Send,
T2: EntityMappingPersistor,
T3: EmbeddingPersistor,
{
let mult = MatrixMultiplicator {
dimension: config.embeddings_dimension,
sparse_matrix_id: sparse_matrix.get_id(),
sparse_matrix_persistor: &sparse_matrix.sparse_matrix_persistor,
sparse_matrix_reader,
};
let init = mult.initialize();
let res = mult.propagate(config.max_number_of_iteration, init);
Expand All @@ -42,18 +40,17 @@ pub fn calculate_embeddings<T1, T2, T3>(

/// Provides matrix multiplication based on sparse matrix data.
#[derive(Debug)]
pub struct MatrixMultiplicator<'a, T: SparseMatrixPersistor + Sync> {
pub struct MatrixMultiplicator<T: SparseMatrixReader + Sync + Send> {
pub dimension: u16,
pub sparse_matrix_id: String,
pub sparse_matrix_persistor: &'a T,
pub sparse_matrix_reader: Arc<T>,
}

impl<'a, T> MatrixMultiplicator<'a, T>
impl<T> MatrixMultiplicator<T>
where
T: SparseMatrixPersistor + Sync,
T: SparseMatrixReader + Sync + Send,
{
fn initialize(&self) -> Vec<Vec<f32>> {
let entities_count = self.sparse_matrix_persistor.get_entity_counter();
let entities_count = self.sparse_matrix_reader.get_number_of_entities();

info!(
"Start initialization. Dims: {}, entities: {}.",
Expand All @@ -68,13 +65,10 @@ where
.into_par_iter()
.map(|i| {
let mut col: Vec<f32> = Vec::with_capacity(entities_count as usize);
for j in 0..entities_count {
let hsh = self.sparse_matrix_persistor.get_hash(j);
if hsh != -1 {
let col_value =
((hash(hsh + (i as i64)) % max_hash) as f32) / max_hash_float;
col.insert(j as usize, col_value);
}
for (j, hsh) in self.sparse_matrix_reader.iter_hashes().enumerate() {
let col_value = ((hash((hsh.value as i64) + (i as i64)) % max_hash) as f32)
/ max_hash_float;
col.insert(j as usize, col_value);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use col.push(col_value) here?

}
col
})
Expand All @@ -90,7 +84,7 @@ where
fn propagate(&self, max_iter: u8, res: Vec<Vec<f32>>) -> Vec<Vec<f32>> {
info!("Start propagating. Number of iterations: {}.", max_iter);

let entities_count = self.sparse_matrix_persistor.get_entity_counter();
let entities_count = self.sparse_matrix_reader.get_number_of_entities();
let mut new_res = res;
for i in 0..max_iter {
let next = self.next_power(new_res);
Expand All @@ -100,26 +94,23 @@ where
i,
self.dimension,
entities_count,
self.sparse_matrix_persistor.get_amount_of_data()
self.sparse_matrix_reader.get_number_of_entries()
);
}
info!("Done propagating.");
new_res
}

fn next_power(&self, res: Vec<Vec<f32>>) -> Vec<Vec<f32>> {
let entities_count = self.sparse_matrix_persistor.get_entity_counter() as usize;
let entities_count = self.sparse_matrix_reader.get_number_of_entities() as usize;
let rnew = Self::zero_2d(entities_count, self.dimension as usize);

let amount_of_data = self.sparse_matrix_persistor.get_amount_of_data();

let result: Vec<Vec<f32>> = res
.into_par_iter()
.zip(rnew)
.update(|data| {
let (res_col, rnew_col) = data;
for j in 0..amount_of_data {
let entry = self.sparse_matrix_persistor.get_entry(j);
for entry in self.sparse_matrix_reader.iter_entries() {
let elem = rnew_col.get_mut(entry.row as usize).unwrap();
let value = res_col.get(entry.col as usize).unwrap();
*elem += *value * entry.value
Expand All @@ -141,7 +132,7 @@ where
}

fn normalize(&self, res: Vec<Vec<f32>>) -> Vec<Vec<f32>> {
let entities_count = self.sparse_matrix_persistor.get_entity_counter() as usize;
let entities_count = self.sparse_matrix_reader.get_number_of_entities() as usize;
let mut row_sum = vec![0f32; entities_count];

for i in 0..(self.dimension as usize) {
Expand Down Expand Up @@ -179,7 +170,7 @@ where
{
info!("Start saving embeddings.");

let entities_count = self.sparse_matrix_persistor.get_entity_counter();
let entities_count = self.sparse_matrix_reader.get_number_of_entities();
embedding_persistor
.put_metadata(entities_count, self.dimension)
.unwrap_or_else(|_| {
Expand All @@ -192,21 +183,17 @@ where

// entities which can't be written to the file (error occurs)
let mut broken_entities = HashSet::new();
for i in 0..entities_count {
let hash = self.sparse_matrix_persistor.get_hash(i);
let entity_name_opt = entity_mapping_persistor.get_entity(hash as u64);
for (i, hash) in self.sparse_matrix_reader.iter_hashes().enumerate() {
let entity_name_opt = entity_mapping_persistor.get_entity(hash.value);
if let Some(entity_name) = entity_name_opt {
let hash_occur = self
.sparse_matrix_persistor
.get_hash_occurrence(hash as u64);
let mut embedding: Vec<f32> = Vec::with_capacity(self.dimension as usize);
for j in 0..(self.dimension as usize) {
let col: &Vec<f32> = res.get(j).unwrap();
let value = col.get(i as usize).unwrap();
embedding.insert(j, *value);
}
embedding_persistor
.put_data(&entity_name, hash_occur, embedding)
.put_data(&entity_name, hash.occurrence, embedding)
.unwrap_or_else(|_| {
broken_entities.insert(entity_name);
});
Expand Down Expand Up @@ -246,28 +233,25 @@ fn log_broken_entities(broken_entities: HashSet<String>) {
/// Calculate embeddings with memory-mapped files.
pub fn calculate_embeddings_mmap<T1, T2, T3>(
config: Arc<Configuration>,
sparse_matrix: &SparseMatrix<T1>,
sparse_matrix_reader: Arc<T1>,
entity_mapping_persistor: Arc<T2>,
embedding_persistor: &mut T3,
) where
T1: SparseMatrixPersistor + Sync,
T2: EntityMappingPersistor + Sync,
T1: SparseMatrixReader + Sync + Send,
T2: EntityMappingPersistor,
T3: EmbeddingPersistor,
{
let matrix_id = sparse_matrix_reader.get_id();

let mult = MatrixMultiplicatorMMap {
dimension: config.embeddings_dimension,
sparse_matrix_id: sparse_matrix.get_id(),
sparse_matrix_persistor: &sparse_matrix.sparse_matrix_persistor,
sparse_matrix_reader,
};
let init = mult.initialize();
let res = mult.propagate(config.max_number_of_iteration, init);
mult.persist(res, entity_mapping_persistor, embedding_persistor);

let work_file = format!(
"{}_matrix_{}",
sparse_matrix.get_id(),
config.max_number_of_iteration
);
let work_file = format!("{}_matrix_{}", matrix_id, config.max_number_of_iteration);
fs::remove_file(&work_file).unwrap_or_else(|_| {
warn!(
"File {} can't be removed after work. Remove the file in order to save disk space.",
Expand All @@ -280,26 +264,25 @@ pub fn calculate_embeddings_mmap<T1, T2, T3>(

/// Provides matrix multiplication based on sparse matrix data.
#[derive(Debug)]
pub struct MatrixMultiplicatorMMap<'a, T: SparseMatrixPersistor + Sync> {
pub struct MatrixMultiplicatorMMap<T: SparseMatrixReader + Sync + Send> {
pub dimension: u16,
pub sparse_matrix_id: String,
pub sparse_matrix_persistor: &'a T,
pub sparse_matrix_reader: Arc<T>,
}

impl<'a, T> MatrixMultiplicatorMMap<'a, T>
impl<T> MatrixMultiplicatorMMap<T>
where
T: SparseMatrixPersistor + Sync,
T: SparseMatrixReader + Sync + Send,
{
fn initialize(&self) -> MmapMut {
let entities_count = self.sparse_matrix_persistor.get_entity_counter();
let entities_count = self.sparse_matrix_reader.get_number_of_entities();

info!(
"Start initialization. Dims: {}, entities: {}.",
self.dimension, entities_count
);

let number_of_bytes = entities_count as u64 * self.dimension as u64 * 4;
let file_name = format!("{}_matrix_0", self.sparse_matrix_id);
let file_name = format!("{}_matrix_0", self.sparse_matrix_reader.get_id());
let file = OpenOptions::new()
.read(true)
.write(true)
Expand Down Expand Up @@ -330,20 +313,17 @@ where
.for_each(|(i, chunk)| {
// i - number of dimension
// chunk - column/vector of bytes
for j in 0..entities_count as usize {
let hsh = self.sparse_matrix_persistor.get_hash(j as u32);
if hsh != -1 {
let col_value =
((hash(hsh + (i as i64)) % max_hash) as f32) / max_hash_float;

let start_idx = j * 4;
let end_idx = start_idx + 4;
let pointer: *mut u8 = (&mut chunk[start_idx..end_idx]).as_mut_ptr();
unsafe {
let value = pointer as *mut f32;
*value = col_value;
};
}
for (j, hsh) in self.sparse_matrix_reader.iter_hashes().enumerate() {
let col_value = ((hash((hsh.value as i64) + (i as i64)) % max_hash) as f32)
/ max_hash_float;

let start_idx = j * 4;
let end_idx = start_idx + 4;
let pointer: *mut u8 = (&mut chunk[start_idx..end_idx]).as_mut_ptr();
unsafe {
let value = pointer as *mut f32;
*value = col_value;
};
}
});

Expand All @@ -360,13 +340,13 @@ where
fn propagate(&self, max_iter: u8, res: MmapMut) -> MmapMut {
info!("Start propagating. Number of iterations: {}.", max_iter);

let entities_count = self.sparse_matrix_persistor.get_entity_counter();
let entities_count = self.sparse_matrix_reader.get_number_of_entities();
let mut new_res = res;
for i in 0..max_iter {
let next = self.next_power(i, new_res);
new_res = self.normalize(next);

let work_file = format!("{}_matrix_{}", self.sparse_matrix_id, i);
let work_file = format!("{}_matrix_{}", self.sparse_matrix_reader.get_id(), i);
fs::remove_file(&work_file).unwrap_or_else(|_| {
warn!("File {} can't be removed after work. Remove the file in order to save disk space.", work_file)
});
Expand All @@ -376,18 +356,22 @@ where
i,
self.dimension,
entities_count,
self.sparse_matrix_persistor.get_amount_of_data()
self.sparse_matrix_reader.get_number_of_entries()
);
}
info!("Done propagating.");
new_res
}

fn next_power(&self, iteration: u8, res: MmapMut) -> MmapMut {
let entities_count = self.sparse_matrix_persistor.get_entity_counter() as usize;
let entities_count = self.sparse_matrix_reader.get_number_of_entities() as usize;

let number_of_bytes = entities_count as u64 * self.dimension as u64 * 4;
let file_name = format!("{}_matrix_{}", self.sparse_matrix_id, iteration + 1);
let file_name = format!(
"{}_matrix_{}",
self.sparse_matrix_reader.get_id(),
iteration + 1
);
let file = OpenOptions::new()
.read(true)
.write(true)
Expand All @@ -409,16 +393,12 @@ where
})
};

let amount_of_data = self.sparse_matrix_persistor.get_amount_of_data();

let input = Arc::new(res);
mmap_output
.par_chunks_mut(entities_count * 4)
.enumerate()
.for_each_with(input, |input, (i, chunk)| {
for j in 0..amount_of_data {
let entry = self.sparse_matrix_persistor.get_entry(j);

for entry in self.sparse_matrix_reader.iter_entries() {
let start_idx_input = ((i * entities_count) + entry.col as usize) * 4;
let end_idx_input = start_idx_input + 4;
let pointer: *const u8 = (&input[start_idx_input..end_idx_input]).as_ptr();
Expand All @@ -445,7 +425,7 @@ where
}

fn normalize(&self, mut res: MmapMut) -> MmapMut {
let entities_count = self.sparse_matrix_persistor.get_entity_counter() as usize;
let entities_count = self.sparse_matrix_reader.get_number_of_entities() as usize;
let mut row_sum = vec![0f32; entities_count];

for i in 0..(self.dimension as usize) {
Expand Down Expand Up @@ -499,7 +479,7 @@ where
{
info!("Start saving embeddings.");

let entities_count = self.sparse_matrix_persistor.get_entity_counter();
let entities_count = self.sparse_matrix_reader.get_number_of_entities();
embedding_persistor
.put_metadata(entities_count, self.dimension)
.unwrap_or_else(|_| {
Expand All @@ -512,13 +492,9 @@ where

// entities which can't be written to the file (error occurs)
let mut broken_entities = HashSet::new();
for i in 0..entities_count {
let hash = self.sparse_matrix_persistor.get_hash(i);
let entity_name_opt = entity_mapping_persistor.get_entity(hash as u64);
for (i, hash) in self.sparse_matrix_reader.iter_hashes().enumerate() {
let entity_name_opt = entity_mapping_persistor.get_entity(hash.value);
if let Some(entity_name) = entity_name_opt {
let hash_occur = self
.sparse_matrix_persistor
.get_hash_occurrence(hash as u64);
let mut embedding: Vec<f32> = Vec::with_capacity(self.dimension as usize);
for j in 0..(self.dimension as usize) {
let start_idx = ((j * entities_count as usize) + i as usize) * 4;
Expand All @@ -532,7 +508,7 @@ where
embedding.insert(j, value);
}
embedding_persistor
.put_data(&entity_name, hash_occur, embedding)
.put_data(&entity_name, hash.occurrence, embedding)
.unwrap_or_else(|_| {
broken_entities.insert(entity_name);
});
Expand Down
Loading