From 57332f174c5700c51e21b6c430477e0e9a553612 Mon Sep 17 00:00:00 2001 From: Piotr Babel Date: Sat, 19 Dec 2020 17:09:15 +0100 Subject: [PATCH 1/2] Unify and simplify SparseMatrix construction. --- src/embedding.rs | 148 ++++++-------- src/persistence.rs | 172 ---------------- src/pipeline.rs | 10 +- src/sparse_matrix.rs | 469 +++++++++++++++++++++++++++++++++++-------- tests/snapshot.rs | 7 +- 5 files changed, 461 insertions(+), 345 deletions(-) diff --git a/src/embedding.rs b/src/embedding.rs index 0cf65b2..deb0ace 100644 --- a/src/embedding.rs +++ b/src/embedding.rs @@ -1,8 +1,7 @@ use crate::configuration::Configuration; use crate::persistence::embedding::EmbeddingPersistor; use crate::persistence::entity::EntityMappingPersistor; -use crate::persistence::sparse_matrix::SparseMatrixPersistor; -use crate::sparse_matrix::SparseMatrix; +use crate::sparse_matrix::SparseMatrixReader; use log::{info, warn}; use memmap::MmapMut; use rayon::prelude::*; @@ -20,18 +19,17 @@ const LOGGED_NUMBER_OF_BROKEN_ENTITIES: usize = 20; /// Calculate embeddings in memory. pub fn calculate_embeddings( config: Arc, - sparse_matrix: &SparseMatrix, + sparse_matrix_reader: Arc, entity_mapping_persistor: Arc, embedding_persistor: &mut T3, ) where - T1: SparseMatrixPersistor + Sync, - T2: EntityMappingPersistor + Sync, + T1: SparseMatrixReader + Sync + Send, + T2: EntityMappingPersistor, T3: EmbeddingPersistor, { let mult = MatrixMultiplicator { dimension: config.embeddings_dimension, - sparse_matrix_id: sparse_matrix.get_id(), - sparse_matrix_persistor: &sparse_matrix.sparse_matrix_persistor, + sparse_matrix_reader, }; let init = mult.initialize(); let res = mult.propagate(config.max_number_of_iteration, init); @@ -42,18 +40,17 @@ pub fn calculate_embeddings( /// Provides matrix multiplication based on sparse matrix data. #[derive(Debug)] -pub struct MatrixMultiplicator<'a, T: SparseMatrixPersistor + Sync> { +pub struct MatrixMultiplicator { pub dimension: u16, - pub sparse_matrix_id: String, - pub sparse_matrix_persistor: &'a T, + pub sparse_matrix_reader: Arc, } -impl<'a, T> MatrixMultiplicator<'a, T> +impl MatrixMultiplicator where - T: SparseMatrixPersistor + Sync, + T: SparseMatrixReader + Sync + Send, { fn initialize(&self) -> Vec> { - let entities_count = self.sparse_matrix_persistor.get_entity_counter(); + let entities_count = self.sparse_matrix_reader.get_number_of_entities(); info!( "Start initialization. Dims: {}, entities: {}.", @@ -68,13 +65,10 @@ where .into_par_iter() .map(|i| { let mut col: Vec = Vec::with_capacity(entities_count as usize); - for j in 0..entities_count { - let hsh = self.sparse_matrix_persistor.get_hash(j); - if hsh != -1 { - let col_value = - ((hash(hsh + (i as i64)) % max_hash) as f32) / max_hash_float; - col.insert(j as usize, col_value); - } + for (j, hsh) in self.sparse_matrix_reader.iter_hashes().enumerate() { + let col_value = ((hash((hsh.value as i64) + (i as i64)) % max_hash) as f32) + / max_hash_float; + col.insert(j as usize, col_value); } col }) @@ -90,7 +84,7 @@ where fn propagate(&self, max_iter: u8, res: Vec>) -> Vec> { info!("Start propagating. Number of iterations: {}.", max_iter); - let entities_count = self.sparse_matrix_persistor.get_entity_counter(); + let entities_count = self.sparse_matrix_reader.get_number_of_entities(); let mut new_res = res; for i in 0..max_iter { let next = self.next_power(new_res); @@ -100,7 +94,7 @@ where i, self.dimension, entities_count, - self.sparse_matrix_persistor.get_amount_of_data() + self.sparse_matrix_reader.get_number_of_entries() ); } info!("Done propagating."); @@ -108,18 +102,15 @@ where } fn next_power(&self, res: Vec>) -> Vec> { - let entities_count = self.sparse_matrix_persistor.get_entity_counter() as usize; + let entities_count = self.sparse_matrix_reader.get_number_of_entities() as usize; let rnew = Self::zero_2d(entities_count, self.dimension as usize); - let amount_of_data = self.sparse_matrix_persistor.get_amount_of_data(); - let result: Vec> = res .into_par_iter() .zip(rnew) .update(|data| { let (res_col, rnew_col) = data; - for j in 0..amount_of_data { - let entry = self.sparse_matrix_persistor.get_entry(j); + for entry in self.sparse_matrix_reader.iter_entries() { let elem = rnew_col.get_mut(entry.row as usize).unwrap(); let value = res_col.get(entry.col as usize).unwrap(); *elem += *value * entry.value @@ -141,7 +132,7 @@ where } fn normalize(&self, res: Vec>) -> Vec> { - let entities_count = self.sparse_matrix_persistor.get_entity_counter() as usize; + let entities_count = self.sparse_matrix_reader.get_number_of_entities() as usize; let mut row_sum = vec![0f32; entities_count]; for i in 0..(self.dimension as usize) { @@ -179,7 +170,7 @@ where { info!("Start saving embeddings."); - let entities_count = self.sparse_matrix_persistor.get_entity_counter(); + let entities_count = self.sparse_matrix_reader.get_number_of_entities(); embedding_persistor .put_metadata(entities_count, self.dimension) .unwrap_or_else(|_| { @@ -192,13 +183,9 @@ where // entities which can't be written to the file (error occurs) let mut broken_entities = HashSet::new(); - for i in 0..entities_count { - let hash = self.sparse_matrix_persistor.get_hash(i); - let entity_name_opt = entity_mapping_persistor.get_entity(hash as u64); + for (i, hash) in self.sparse_matrix_reader.iter_hashes().enumerate() { + let entity_name_opt = entity_mapping_persistor.get_entity(hash.value); if let Some(entity_name) = entity_name_opt { - let hash_occur = self - .sparse_matrix_persistor - .get_hash_occurrence(hash as u64); let mut embedding: Vec = Vec::with_capacity(self.dimension as usize); for j in 0..(self.dimension as usize) { let col: &Vec = res.get(j).unwrap(); @@ -206,7 +193,7 @@ where embedding.insert(j, *value); } embedding_persistor - .put_data(&entity_name, hash_occur, embedding) + .put_data(&entity_name, hash.occurrence, embedding) .unwrap_or_else(|_| { broken_entities.insert(entity_name); }); @@ -246,28 +233,25 @@ fn log_broken_entities(broken_entities: HashSet) { /// Calculate embeddings with memory-mapped files. pub fn calculate_embeddings_mmap( config: Arc, - sparse_matrix: &SparseMatrix, + sparse_matrix_reader: Arc, entity_mapping_persistor: Arc, embedding_persistor: &mut T3, ) where - T1: SparseMatrixPersistor + Sync, - T2: EntityMappingPersistor + Sync, + T1: SparseMatrixReader + Sync + Send, + T2: EntityMappingPersistor, T3: EmbeddingPersistor, { + let matrix_id = sparse_matrix_reader.get_id(); + let mult = MatrixMultiplicatorMMap { dimension: config.embeddings_dimension, - sparse_matrix_id: sparse_matrix.get_id(), - sparse_matrix_persistor: &sparse_matrix.sparse_matrix_persistor, + sparse_matrix_reader, }; let init = mult.initialize(); let res = mult.propagate(config.max_number_of_iteration, init); mult.persist(res, entity_mapping_persistor, embedding_persistor); - let work_file = format!( - "{}_matrix_{}", - sparse_matrix.get_id(), - config.max_number_of_iteration - ); + let work_file = format!("{}_matrix_{}", matrix_id, config.max_number_of_iteration); fs::remove_file(&work_file).unwrap_or_else(|_| { warn!( "File {} can't be removed after work. Remove the file in order to save disk space.", @@ -280,18 +264,17 @@ pub fn calculate_embeddings_mmap( /// Provides matrix multiplication based on sparse matrix data. #[derive(Debug)] -pub struct MatrixMultiplicatorMMap<'a, T: SparseMatrixPersistor + Sync> { +pub struct MatrixMultiplicatorMMap { pub dimension: u16, - pub sparse_matrix_id: String, - pub sparse_matrix_persistor: &'a T, + pub sparse_matrix_reader: Arc, } -impl<'a, T> MatrixMultiplicatorMMap<'a, T> +impl MatrixMultiplicatorMMap where - T: SparseMatrixPersistor + Sync, + T: SparseMatrixReader + Sync + Send, { fn initialize(&self) -> MmapMut { - let entities_count = self.sparse_matrix_persistor.get_entity_counter(); + let entities_count = self.sparse_matrix_reader.get_number_of_entities(); info!( "Start initialization. Dims: {}, entities: {}.", @@ -299,7 +282,7 @@ where ); let number_of_bytes = entities_count as u64 * self.dimension as u64 * 4; - let file_name = format!("{}_matrix_0", self.sparse_matrix_id); + let file_name = format!("{}_matrix_0", self.sparse_matrix_reader.get_id()); let file = OpenOptions::new() .read(true) .write(true) @@ -330,20 +313,17 @@ where .for_each(|(i, chunk)| { // i - number of dimension // chunk - column/vector of bytes - for j in 0..entities_count as usize { - let hsh = self.sparse_matrix_persistor.get_hash(j as u32); - if hsh != -1 { - let col_value = - ((hash(hsh + (i as i64)) % max_hash) as f32) / max_hash_float; - - let start_idx = j * 4; - let end_idx = start_idx + 4; - let pointer: *mut u8 = (&mut chunk[start_idx..end_idx]).as_mut_ptr(); - unsafe { - let value = pointer as *mut f32; - *value = col_value; - }; - } + for (j, hsh) in self.sparse_matrix_reader.iter_hashes().enumerate() { + let col_value = ((hash((hsh.value as i64) + (i as i64)) % max_hash) as f32) + / max_hash_float; + + let start_idx = j * 4; + let end_idx = start_idx + 4; + let pointer: *mut u8 = (&mut chunk[start_idx..end_idx]).as_mut_ptr(); + unsafe { + let value = pointer as *mut f32; + *value = col_value; + }; } }); @@ -360,13 +340,13 @@ where fn propagate(&self, max_iter: u8, res: MmapMut) -> MmapMut { info!("Start propagating. Number of iterations: {}.", max_iter); - let entities_count = self.sparse_matrix_persistor.get_entity_counter(); + let entities_count = self.sparse_matrix_reader.get_number_of_entities(); let mut new_res = res; for i in 0..max_iter { let next = self.next_power(i, new_res); new_res = self.normalize(next); - let work_file = format!("{}_matrix_{}", self.sparse_matrix_id, i); + let work_file = format!("{}_matrix_{}", self.sparse_matrix_reader.get_id(), i); fs::remove_file(&work_file).unwrap_or_else(|_| { warn!("File {} can't be removed after work. Remove the file in order to save disk space.", work_file) }); @@ -376,7 +356,7 @@ where i, self.dimension, entities_count, - self.sparse_matrix_persistor.get_amount_of_data() + self.sparse_matrix_reader.get_number_of_entries() ); } info!("Done propagating."); @@ -384,10 +364,14 @@ where } fn next_power(&self, iteration: u8, res: MmapMut) -> MmapMut { - let entities_count = self.sparse_matrix_persistor.get_entity_counter() as usize; + let entities_count = self.sparse_matrix_reader.get_number_of_entities() as usize; let number_of_bytes = entities_count as u64 * self.dimension as u64 * 4; - let file_name = format!("{}_matrix_{}", self.sparse_matrix_id, iteration + 1); + let file_name = format!( + "{}_matrix_{}", + self.sparse_matrix_reader.get_id(), + iteration + 1 + ); let file = OpenOptions::new() .read(true) .write(true) @@ -409,16 +393,12 @@ where }) }; - let amount_of_data = self.sparse_matrix_persistor.get_amount_of_data(); - let input = Arc::new(res); mmap_output .par_chunks_mut(entities_count * 4) .enumerate() .for_each_with(input, |input, (i, chunk)| { - for j in 0..amount_of_data { - let entry = self.sparse_matrix_persistor.get_entry(j); - + for entry in self.sparse_matrix_reader.iter_entries() { let start_idx_input = ((i * entities_count) + entry.col as usize) * 4; let end_idx_input = start_idx_input + 4; let pointer: *const u8 = (&input[start_idx_input..end_idx_input]).as_ptr(); @@ -445,7 +425,7 @@ where } fn normalize(&self, mut res: MmapMut) -> MmapMut { - let entities_count = self.sparse_matrix_persistor.get_entity_counter() as usize; + let entities_count = self.sparse_matrix_reader.get_number_of_entities() as usize; let mut row_sum = vec![0f32; entities_count]; for i in 0..(self.dimension as usize) { @@ -499,7 +479,7 @@ where { info!("Start saving embeddings."); - let entities_count = self.sparse_matrix_persistor.get_entity_counter(); + let entities_count = self.sparse_matrix_reader.get_number_of_entities(); embedding_persistor .put_metadata(entities_count, self.dimension) .unwrap_or_else(|_| { @@ -512,13 +492,9 @@ where // entities which can't be written to the file (error occurs) let mut broken_entities = HashSet::new(); - for i in 0..entities_count { - let hash = self.sparse_matrix_persistor.get_hash(i); - let entity_name_opt = entity_mapping_persistor.get_entity(hash as u64); + for (i, hash) in self.sparse_matrix_reader.iter_hashes().enumerate() { + let entity_name_opt = entity_mapping_persistor.get_entity(hash.value); if let Some(entity_name) = entity_name_opt { - let hash_occur = self - .sparse_matrix_persistor - .get_hash_occurrence(hash as u64); let mut embedding: Vec = Vec::with_capacity(self.dimension as usize); for j in 0..(self.dimension as usize) { let start_idx = ((j * entities_count as usize) + i as usize) * 4; @@ -532,7 +508,7 @@ where embedding.insert(j, value); } embedding_persistor - .put_data(&entity_name, hash_occur, embedding) + .put_data(&entity_name, hash.occurrence, embedding) .unwrap_or_else(|_| { broken_entities.insert(entity_name); }); diff --git a/src/persistence.rs b/src/persistence.rs index f0781e1..c7142df 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -31,178 +31,6 @@ pub mod entity { } } -pub mod sparse_matrix { - use log::info; - use rustc_hash::FxHashMap; - use std::collections::hash_map; - use std::mem; - - #[derive(Debug, Clone, Copy)] - pub struct Entry { - pub row: u32, - pub col: u32, - pub value: f32, - } - - pub struct EntryIter<'a>(std::slice::Iter<'a, Entry>); - - impl Iterator for EntryIter<'_> { - type Item = Entry; - - #[inline(always)] - fn next(&mut self) -> Option { - self.0.next().copied() - } - } - - pub trait SparseMatrixPersistor { - fn increment_hash_occurrence(&mut self, hash: u64) -> u32; - fn get_hash_occurrence(&self, hash: u64) -> u32; - fn get_id(&self, hash: u64) -> i32; - fn get_hash(&self, id: u32) -> i64; - fn get_entity_counter(&self) -> u32; - fn get_or_add_id_by_hash(&mut self, hash: u64) -> u32; - fn increment_edge_counter(&mut self) -> u32; - fn get_amount_of_data(&self) -> u32; - fn get_or_add_pair_index(&mut self, magic: u64, pos: u32) -> u32; - fn update_row_sum(&mut self, id: u32, val: f32); - fn get_row_sum(&self, id: u32) -> f32; - fn add_new_entry(&mut self, pos: u32, entry: Entry); - fn update_entry(&mut self, pos: u32, entry: Entry); - fn get_entry(&self, pos: u32) -> Entry; - fn replace_entry(&mut self, pos: u32, entry: Entry); - fn iter_entries(&self) -> EntryIter<'_>; - fn finish(&self); - } - - #[derive(Debug, Default)] - pub struct InMemorySparseMatrixPersistor { - edge_count: u32, - hash_2_id: FxHashMap, - id_2_hash: Vec, - hash_2_count: FxHashMap, - row_sum: Vec, - pair_index: FxHashMap, - entries: Vec, - } - - impl SparseMatrixPersistor for InMemorySparseMatrixPersistor { - fn increment_hash_occurrence(&mut self, hash: u64) -> u32 { - let value = self.hash_2_count.entry(hash).or_insert(0); - *value += 1; - *value - } - - fn get_hash_occurrence(&self, hash: u64) -> u32 { - *self.hash_2_count.get(&hash).unwrap() - } - - fn get_id(&self, hash: u64) -> i32 { - match self.hash_2_id.get(&hash) { - Some(value) => *value as i32, - None => -1i32, - } - } - - fn get_hash(&self, id: u32) -> i64 { - match self.id_2_hash.get(id as usize) { - Some(value) => *value as i64, - None => -1i64, - } - } - - fn get_entity_counter(&self) -> u32 { - self.id_2_hash.len() as u32 - } - - fn get_or_add_id_by_hash(&mut self, hash: u64) -> u32 { - match self.hash_2_id.entry(hash) { - hash_map::Entry::Vacant(entry) => { - let id = self.id_2_hash.len() as u32; - entry.insert(id); - self.id_2_hash.push(hash); - id - } - hash_map::Entry::Occupied(entry) => *entry.get(), - } - } - - fn increment_edge_counter(&mut self) -> u32 { - self.edge_count += 1; - self.edge_count - } - - fn get_amount_of_data(&self) -> u32 { - self.entries.len() as u32 - } - - fn get_or_add_pair_index(&mut self, magic: u64, pos: u32) -> u32 { - *self.pair_index.entry(magic).or_insert(pos) - } - - fn update_row_sum(&mut self, id: u32, val: f32) { - let id = id as usize; - if self.row_sum.len() == id { - self.row_sum.push(val); - } else { - self.row_sum[id] += val; - }; - } - - fn get_row_sum(&self, id: u32) -> f32 { - self.row_sum[id as usize] - } - - fn add_new_entry(&mut self, _pos: u32, entry: Entry) { - self.entries.push(entry); - } - - fn update_entry(&mut self, pos: u32, entry: Entry) { - self.entries[pos as usize].value += entry.value; - } - - fn get_entry(&self, pos: u32) -> Entry { - self.entries[pos as usize] - } - - #[inline(always)] - fn iter_entries(&self) -> EntryIter<'_> { - EntryIter(self.entries.iter()) - } - - fn replace_entry(&mut self, pos: u32, entry: Entry) { - self.entries[pos as usize] = entry - } - - fn finish(&self) { - info!("Number of entities: {}", self.get_entity_counter()); - info!("Number of edges: {}", self.edge_count); - info!("Number of entries: {}", self.entries.len()); - - let hash_2_id_mem_size = self.hash_2_id.capacity() * 12; - let id_2_hash_mem_size = self.id_2_hash.capacity() * 8; - let hash_2_count_mem_size = self.hash_2_count.capacity() * 12; - let row_sum_mem_size = self.row_sum.capacity() * 4; - let pair_index_mem_size = self.pair_index.capacity() * 12; - - let entry_mem_size = mem::size_of::(); - let entries_mem_size = self.entries.capacity() * entry_mem_size; - - let total_mem_size = hash_2_id_mem_size - + id_2_hash_mem_size - + hash_2_count_mem_size - + row_sum_mem_size - + pair_index_mem_size - + entries_mem_size; - - info!( - "Total memory usage by the struct ~ {} MB", - (total_mem_size / 1048576) - ); - } - } -} - pub mod embedding { use std::fs::File; use std::io; diff --git a/src/pipeline.rs b/src/pipeline.rs index 2e3b0a1..ff185da 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -6,7 +6,6 @@ use crate::embedding::{calculate_embeddings, calculate_embeddings_mmap}; use crate::entity::{EntityProcessor, SMALL_VECTOR_SIZE}; use crate::persistence::embedding::TextFileVectorPersistor; use crate::persistence::entity::InMemoryEntityMappingPersistor; -use crate::persistence::sparse_matrix::InMemorySparseMatrixPersistor; use crate::sparse_matrix::{create_sparse_matrices, SparseMatrix}; use bus::Bus; use log::{error, info}; @@ -21,7 +20,7 @@ use std::thread; pub fn build_graphs( config: &Configuration, in_memory_entity_mapping_persistor: Arc, -) -> Vec> { +) -> Vec { let sparse_matrices = create_sparse_matrices(&config.columns); dbg!(&sparse_matrices); @@ -157,11 +156,12 @@ fn parse_tsv_line(line: &str) -> Vec> { pub fn train( config: Configuration, in_memory_entity_mapping_persistor: Arc, - sparse_matrices: Vec>, + sparse_matrices: Vec, ) { let config = Arc::new(config); let mut embedding_threads = Vec::new(); for sparse_matrix in sparse_matrices { + let sparse_matrix = Arc::new(sparse_matrix); let config = config.clone(); let in_memory_entity_mapping_persistor = in_memory_entity_mapping_persistor.clone(); let handle = thread::spawn(move || { @@ -181,14 +181,14 @@ pub fn train( if config.in_memory_embedding_calculation { calculate_embeddings( config.clone(), - &sparse_matrix, + sparse_matrix.clone(), in_memory_entity_mapping_persistor, &mut text_file_embedding_persistor, ); } else { calculate_embeddings_mmap( config.clone(), - &sparse_matrix, + sparse_matrix.clone(), in_memory_entity_mapping_persistor, &mut text_file_embedding_persistor, ); diff --git a/src/sparse_matrix.rs b/src/sparse_matrix.rs index 0dcb3b6..5f3b263 100644 --- a/src/sparse_matrix.rs +++ b/src/sparse_matrix.rs @@ -1,13 +1,14 @@ use crate::configuration::Column; -use crate::persistence::sparse_matrix::{ - Entry, InMemorySparseMatrixPersistor, SparseMatrixPersistor, -}; +use log::info; +use rustc_hash::FxHashMap; +use std::collections::hash_map; +use std::mem; /// Creates combinations of column pairs as sparse matrices. /// Let's say that we have such columns configuration: complex::a reflexive::complex::b c. This is provided -/// as Array[Column] after parsing the config. +/// as `&[Column]` after parsing the config. /// The allowed column modifiers are: -/// - transient - the field is virtual - it is considered during embedding process, no output file is written for the field, +/// - transient - the field is virtual - it is considered during embedding process, no entity is written for the column, /// - complex - the field is composite, containing multiple entity identifiers separated by space, /// - reflexive - the field is reflexive, which means that it interacts with itself, additional output file is written for every such field. /// We create sparse matrix for every columns relations (based on column modifiers). @@ -17,12 +18,10 @@ use crate::persistence::sparse_matrix::{ /// - sparse matrix for column b and c, /// - sparse matrix for column b and b (reflexive column). /// Apart from column names in sparse matrix we provide indices for incoming data. We have 3 columns such as a, b and c -/// but column b is reflexive so we need to include this column. The result is: Array(a, b, c, b). +/// but column b is reflexive so we need to include this column. The result is: (a, b, c, b). /// The rule is that every reflexive column is append with the order of occurrence to the end of constructed array. -/// `cols` columns configuration with transient, reflexive, complex marks -/// return sparse matrices for columns configuration. -pub fn create_sparse_matrices(cols: &[Column]) -> Vec> { - let mut sparse_matrices: Vec> = Vec::new(); +pub fn create_sparse_matrices(cols: &[Column]) -> Vec { + let mut sparse_matrices: Vec = Vec::new(); let num_fields = cols.len(); let mut reflexive_count = 0; @@ -31,24 +30,14 @@ pub fn create_sparse_matrices(cols: &[Column]) -> Vec Vec { +pub struct SparseMatrix { + /// First column index for which we creates subgraph pub col_a_id: u8, + + /// First column name pub col_a_name: String, + + /// Second column index for which we creates subgraph pub col_b_id: u8, + + /// Second column name pub col_b_name: String, - pub sparse_matrix_persistor: T, + + /// Counts every occurrence of entity relationships from first and second column + edge_count: u32, + + /// Maps entity hash to the id in such a way that each new hash gets another id (id + 1) + hash_2_id: FxHashMap, + + /// Maps id to hash value and occurrence + id_2_hash: Vec, + + /// Holds the sum of the values for each row + row_sum: Vec, + + /// Maps a unique value (as combination of two numbers) to `entries` index + pair_index: FxHashMap, + + /// Coordinates and values of nonzero entities + entries: Vec, +} + +/// Hash data +#[derive(Debug, Clone, Copy)] +pub struct Hash { + /// Value of the hash + pub value: u64, + + /// Number of hash occurrences + pub occurrence: u32, +} + +impl Hash { + fn new(value: u64) -> Self { + Self { + value, + occurrence: 1, + } + } +} + +/// Sparse matrix coordinate entry +#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)] +pub struct Entry { + /// Matrix row + pub row: u32, + + /// Matrix column + pub col: u32, + + /// Matrix value + pub value: f32, +} + +/// Sparse matrix reader used in embedding process +pub trait SparseMatrixReader { + /// Returns sparse matrix identifier + fn get_id(&self) -> String; + + /// Returns total number of unique entities + fn get_number_of_entities(&self) -> u32; + + /// Returns total number of entries + fn get_number_of_entries(&self) -> u32; + + /// Returns iterator for hash data such as id and occurrence + fn iter_hashes(&self) -> CopyIter<'_, Hash>; + + /// Returns iterator for entries + fn iter_entries(&self) -> CopyIter<'_, Entry>; +} + +pub struct CopyIter<'a, T: Copy>(std::slice::Iter<'a, T>); + +impl Iterator for CopyIter<'_, T> { + type Item = T; + + #[inline] + fn next(&mut self) -> Option { + self.0.next().copied() + } } -impl SparseMatrix -where - T: SparseMatrixPersistor + Sync, -{ +impl SparseMatrix { + pub fn new(col_a_id: u8, col_a_name: String, col_b_id: u8, col_b_name: String) -> Self { + Self { + col_a_id, + col_a_name, + col_b_id, + col_b_name, + edge_count: 0, + hash_2_id: FxHashMap::default(), + id_2_hash: Vec::new(), + row_sum: Vec::new(), + pair_index: FxHashMap::default(), + entries: Vec::new(), + } + } + /// Handles hashes for one combination of incoming data. Let's say that input row looks like: /// userId1 | productId1, productId2 | brandId1, brandId2 /// Note! To simplify explanation there is no any reflexive column so the result is: @@ -76,10 +165,9 @@ where /// (userId1, productId1, brandId2), /// (userId1, productId2, brandId1), /// (userId1, productId2, brandId2) - /// These cartesian products are provided as array of hashes (long values). Sparse matrix has indices (to corresponding columns) - /// in order to read interesting hashes from provided array. - /// For one input row we actually call this function 4 times. - /// `hashes` - it contains array of hashes + /// These cartesian products are provided as array of hashes. Sparse matrix has indices + /// `col_a_id` and `col_b_id` (to corresponding columns) in order to read interesting hashes + /// from provided slice. For one input row we actually call this function 4 times. pub fn handle_pair(&mut self, hashes: &[u64]) { let a = self.col_a_id; let b = self.col_b_id; @@ -90,11 +178,6 @@ where ); } - pub fn finish(&mut self) { - self.normalize(); - self.sparse_matrix_persistor.finish(); - } - /// It creates sparse matrix for two columns in the incoming data. /// Let's say that we have such columns: /// customers | products | brands @@ -112,49 +195,55 @@ where /// `b_hash` - hash of a entity for a column B /// `count` - total number of combinations in a row fn add_pair_symmetric(&mut self, a_hash: u64, b_hash: u64, count: u64) { - let a = self.get_or_create_id(a_hash); - let b = self.get_or_create_id(b_hash); + let a = self.update_hash_and_get_id(a_hash); + let b = self.update_hash_and_get_id(b_hash); + let value = 1f32 / (count as f32); - self.sparse_matrix_persistor.increment_edge_counter(); - self.sparse_matrix_persistor - .increment_hash_occurrence(a_hash); - self.sparse_matrix_persistor - .increment_hash_occurrence(b_hash); + self.edge_count += 1; self.add_or_update_entry(a, b, value); self.add_or_update_entry(b, a, value); - self.sparse_matrix_persistor.update_row_sum(a, value); - self.sparse_matrix_persistor.update_row_sum(b, value); + self.update_row_sum(a, value); + self.update_row_sum(b, value); } - fn get_or_create_id(&mut self, hash: u64) -> u32 { - self.sparse_matrix_persistor.get_or_add_id_by_hash(hash) + fn update_hash_and_get_id(&mut self, hash: u64) -> u32 { + match self.hash_2_id.entry(hash) { + hash_map::Entry::Vacant(entry) => { + let id = self.id_2_hash.len() as u32; + entry.insert(id); + self.id_2_hash.push(Hash::new(hash)); + id + } + hash_map::Entry::Occupied(entry) => { + let id = *entry.get(); + self.id_2_hash[id as usize].occurrence += 1; + id + } + } } fn add_or_update_entry(&mut self, x: u32, y: u32, val: f32) { - let amount_of_data = self.sparse_matrix_persistor.get_amount_of_data(); - let position = self.get_or_put_position(x, y, amount_of_data); + let magic = Self::magic_pair(x, y); + let num_of_entries = self.entries.len() as u32; + let position = *self.pair_index.entry(magic).or_insert(num_of_entries); - let entry = Entry { - row: x, - col: y, - value: val, - }; - if position != amount_of_data { - self.sparse_matrix_persistor.update_entry(position, entry); + if position < num_of_entries { + self.entries[position as usize].value += val; } else { - self.sparse_matrix_persistor.add_new_entry(position, entry); + let entry = Entry { + row: x, + col: y, + value: val, + }; + self.entries.push(entry); } } - fn get_or_put_position(&mut self, a: u32, b: u32, tentative: u32) -> u32 { - let magic = Self::magic_pair(a, b); - self.sparse_matrix_persistor - .get_or_add_pair_index(magic, tentative) - } - + /// Combining two numbers into a unique one: pairing functions. + /// It uses "elegant pairing" (https://odino.org/combining-two-numbers-into-a-unique-one-pairing-functions/). fn magic_pair(a: u32, b: u32) -> u64 { let x = u64::from(a); let y = u64::from(b); @@ -165,23 +254,245 @@ where } } + fn update_row_sum(&mut self, id: u32, val: f32) { + let id = id as usize; + if id < self.row_sum.len() { + self.row_sum[id] += val; + } else { + self.row_sum.push(val); + }; + } + + /// Normalization and other tasks after sparse matrix construction. + pub fn finish(&mut self) { + self.normalize(); + + info!("Number of entities: {}", self.get_number_of_entities()); + info!("Number of edges: {}", self.edge_count); + info!("Number of entries: {}", self.get_number_of_entries()); + + let hash_2_id_mem_size = self.hash_2_id.capacity() * 12; + let hash_mem_size = mem::size_of::(); + let id_2_hash_mem_size = self.id_2_hash.capacity() * hash_mem_size; + let row_sum_mem_size = self.row_sum.capacity() * 4; + let pair_index_mem_size = self.pair_index.capacity() * 12; + + let entry_mem_size = mem::size_of::(); + let entries_mem_size = self.entries.capacity() * entry_mem_size; + + let total_mem_size = hash_2_id_mem_size + + id_2_hash_mem_size + + row_sum_mem_size + + pair_index_mem_size + + entries_mem_size; + + info!( + "Total memory usage by the struct ~ {} MB", + (total_mem_size / 1048576) + ); + } + + /// Normalize entries by dividing every entry value by row sum fn normalize(&mut self) { - let amount_of_data = self.sparse_matrix_persistor.get_amount_of_data(); - for i in 0..amount_of_data { - let entry = self.sparse_matrix_persistor.get_entry(i); - let new_value = entry.value / self.sparse_matrix_persistor.get_row_sum(entry.row); - self.sparse_matrix_persistor.replace_entry( - i, - Entry { - row: entry.row, - col: entry.col, - value: new_value, - }, - ); + for entry in self.entries.iter_mut() { + entry.value /= self.row_sum[entry.row as usize]; } } +} - pub fn get_id(&self) -> String { +impl SparseMatrixReader for SparseMatrix { + fn get_id(&self) -> String { format!("{}_{}", self.col_a_id, self.col_b_id) } + + fn get_number_of_entities(&self) -> u32 { + self.id_2_hash.len() as u32 + } + + fn get_number_of_entries(&self) -> u32 { + self.entries.len() as u32 + } + + #[inline] + fn iter_hashes(&self) -> CopyIter<'_, Hash> { + CopyIter(self.id_2_hash.iter()) + } + + #[inline] + fn iter_entries(&self) -> CopyIter<'_, Entry> { + CopyIter(self.entries.iter()) + } +} + +#[cfg(test)] +mod tests { + use crate::configuration::Column; + use crate::sparse_matrix::{create_sparse_matrices, Entry, SparseMatrix, SparseMatrixReader}; + use rustc_hash::FxHasher; + use std::collections::{HashMap, HashSet}; + use std::hash::Hasher; + + fn map_to_ids_and_names(sparse_matrices: &[SparseMatrix]) -> HashSet<(u8, &str, u8, &str)> { + sparse_matrices + .iter() + .map(|sm| { + ( + sm.col_a_id, + sm.col_a_name.as_str(), + sm.col_b_id, + sm.col_b_name.as_str(), + ) + }) + .collect() + } + + fn prepare_entries(hash_2_id: HashMap, edges: Vec<(&str, &str, f32)>) -> Vec { + let mut entries: Vec<_> = Vec::new(); + for (row, col, val) in edges { + // undirected graph needs (row, col) and (col, row) edges + let row = *hash_2_id.get(&hash(row)).unwrap(); + let col = *hash_2_id.get(&hash(col)).unwrap(); + let entry_row_col = Entry { + row, + col, + value: val, + }; + let entry_col_row = Entry { + row: col, + col: row, + value: val, + }; + entries.push(entry_row_col); + entries.push(entry_col_row); + } + entries + } + + fn hash(entity: &str) -> u64 { + let mut hasher = FxHasher::default(); + hasher.write(entity.as_bytes()); + hasher.finish() + } + + #[test] + fn create_sparse_matrices_if_no_columns_provided() { + let sparse_matrices = create_sparse_matrices(&[]); + assert_eq!(true, sparse_matrices.is_empty()) + } + + #[test] + fn create_sparse_matrices_if_transient_columns_provided() { + let mut columns = vec![ + Column { + name: String::from("a"), + transient: true, + ..Default::default() + }, + Column { + name: String::from("b"), + transient: true, + ..Default::default() + }, + ]; + let sparse_matrices = create_sparse_matrices(&columns); + assert_eq!(true, sparse_matrices.is_empty()); + + columns.push(Column { + name: String::from("c"), + complex: true, + ..Default::default() + }); + let sparse_matrices = create_sparse_matrices(&columns); + let sparse_matrices: HashSet<_> = map_to_ids_and_names(&sparse_matrices); + let expected_sparse_matrices: HashSet<_> = [(0, "a", 2, "c"), (1, "b", 2, "c")] + .iter() + .cloned() + .collect(); + assert_eq!(expected_sparse_matrices, sparse_matrices) + } + + #[test] + fn create_sparse_matrices_if_reflexive_columns_provided() { + let sparse_matrices = create_sparse_matrices(&[ + Column { + name: String::from("a"), + ..Default::default() + }, + Column { + name: String::from("b"), + transient: true, + ..Default::default() + }, + Column { + name: String::from("c"), + complex: true, + reflexive: true, + ..Default::default() + }, + Column { + name: String::from("d"), + complex: true, + ..Default::default() + }, + ]); + let sparse_matrices: HashSet<_> = map_to_ids_and_names(&sparse_matrices); + let expected_sparse_matrices: HashSet<_> = [ + (0, "a", 1, "b"), + (0, "a", 2, "c"), + (0, "a", 3, "d"), + (1, "b", 2, "c"), + (1, "b", 3, "d"), + (2, "c", 3, "d"), + (2, "c", 4, "c"), + ] + .iter() + .cloned() + .collect(); + assert_eq!(expected_sparse_matrices, sparse_matrices) + } + + #[test] + fn create_sparse_matrix_for_undirected_graph() { + let mut sm = SparseMatrix::new(0u8, String::from("col_0"), 1u8, String::from("col_1")); + + // input line: + // u1 p1 p2 b1 b2 + sm.handle_pair(&[4, hash("u1"), hash("p1"), hash("b1")]); + sm.handle_pair(&[4, hash("u1"), hash("p1"), hash("b2")]); + sm.handle_pair(&[4, hash("u1"), hash("p2"), hash("b1")]); + sm.handle_pair(&[4, hash("u1"), hash("p2"), hash("b2")]); + + // input line: + // u2 p2 p3 p4 b1 + sm.handle_pair(&[3, hash("u2"), hash("p2"), hash("b1")]); + sm.handle_pair(&[3, hash("u2"), hash("p3"), hash("b1")]); + sm.handle_pair(&[3, hash("u2"), hash("p4"), hash("b1")]); + + // number of unique entities + assert_eq!(6, sm.get_number_of_entities()); + + // number of edges for entities + assert_eq!(10, sm.get_number_of_entries()); + + let hash_2_id: HashMap<_, _> = sm + .iter_hashes() + .enumerate() + .map(|id_and_hash| (id_and_hash.1.value, id_and_hash.0 as u32)) + .collect(); + // number of hashes + assert_eq!(6, hash_2_id.len()); + + // every relation for undirected graph is represented as two edges, for example: + // (u1, p1, value) and (p1, u1, value) + let edges = vec![ + ("u1", "p1", 1.0 / 2.0), + ("u1", "p2", 1.0 / 2.0), + ("u2", "p2", 1.0 / 3.0), + ("u2", "p3", 1.0 / 3.0), + ("u2", "p4", 1.0 / 3.0), + ]; + let expected_entries = prepare_entries(hash_2_id, edges); + let entries: Vec<_> = sm.iter_entries().collect(); + assert_eq!(expected_entries, entries); + } } diff --git a/tests/snapshot.rs b/tests/snapshot.rs index 4be6628..c2d145b 100644 --- a/tests/snapshot.rs +++ b/tests/snapshot.rs @@ -31,7 +31,8 @@ fn test_build_graphs_and_create_embeddings() { let config = Arc::new(config); // embeddings for in-memory and mmap files calculation should be the same - for sparse_matrix in sparse_matrices.iter() { + for sparse_matrix in sparse_matrices.into_iter() { + let sparse_matrix = Arc::new(sparse_matrix); let snapshot_name = format!( "embeddings_{}_{}", sparse_matrix.col_a_name, sparse_matrix.col_b_name @@ -41,7 +42,7 @@ fn test_build_graphs_and_create_embeddings() { // calculate embeddings in memory calculate_embeddings( config.clone(), - sparse_matrix, + sparse_matrix.clone(), in_memory_entity_mapping_persistor.clone(), &mut in_memory_embedding_persistor, ); @@ -51,7 +52,7 @@ fn test_build_graphs_and_create_embeddings() { // calculate embeddings with mmap files calculate_embeddings_mmap( config.clone(), - sparse_matrix, + sparse_matrix.clone(), in_memory_entity_mapping_persistor.clone(), &mut in_memory_embedding_persistor, ); From d8a83818dcf352828536f3f59e1d8a33ddc999b4 Mon Sep 17 00:00:00 2001 From: Piotr Babel Date: Sun, 20 Dec 2020 23:35:47 +0100 Subject: [PATCH 2/2] Use Cargo.toml data for clap args. --- Cargo.toml | 2 +- src/main.rs | 162 ++++++++++++++++++++++++++++++---------------------- 2 files changed, 94 insertions(+), 70 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 44e4101..d1b3784 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "cleora" version = "1.0.1" -authors = ["Piotr Babel ", "Jacek Dabrowski "] +authors = ["Piotr Babel ", "Jacek Dabrowski ", "Konrad Goluchowski "] edition = "2018" license-file = "LICENSE" readme = "README.md" diff --git a/src/main.rs b/src/main.rs index 134dd4c..4646147 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use std::time::Instant; -use clap::{App, Arg}; +use clap::{crate_authors, crate_description, crate_name, crate_version, App, Arg}; use cleora::configuration; use cleora::configuration::Configuration; use cleora::persistence::entity::InMemoryEntityMappingPersistor; @@ -20,65 +20,93 @@ fn main() { let now = Instant::now(); - let matches = App::new("cleora") - .version("1.0.0") - .author("Piotr Babel & Jacek Dabrowski ") - .about("cleora for embeddings calculation") - .arg(Arg::with_name("input") - .short("i") - .long("input") - .required(true) - .help("Input file path") - .takes_value(true)) - .arg(Arg::with_name("file-type") - .short("t") - .long("type") - .possible_values(&["tsv", "json"]) - .help("Input file type") - .takes_value(true)) - .arg(Arg::with_name("output-dir") - .short("o") - .long("output-dir") - .help("Output directory for files with embeddings") - .takes_value(true)) - .arg(Arg::with_name("dimension") - .short("d") - .long("dimension") - .required(true) - .help("Embedding dimension size") - .takes_value(true)) - .arg(Arg::with_name("number-of-iterations") - .short("n") - .long("number-of-iterations") - .help("Max number of iterations") - .takes_value(true)) - .arg(Arg::with_name("columns") - .short("c") - .long("columns") - .required(true) - .help("Column names (max 12), with modifiers: [transient::, reflexive::, complex::]") - .takes_value(true)) - .arg(Arg::with_name("relation-name") - .short("r") - .long("relation-name") - .help("Name of the relation, for output filename generation") - .takes_value(true)) - .arg(Arg::with_name("prepend-field-name") - .short("p") - .long("prepend-field-name") - .help("Prepend field name to entity in output") - .takes_value(true)) - .arg(Arg::with_name("log-every-n") - .short("l") - .long("log-every-n") - .help("Log output every N lines") - .takes_value(true)) - .arg(Arg::with_name("in-memory-embedding-calculation") - .short("e") - .long("in-memory-embedding-calculation") - .possible_values(&["0", "1"]) - .help("Calculate embeddings in memory or with memory-mapped files") - .takes_value(true)) + let matches = App::new(crate_name!()) + .version(crate_version!()) + .author(crate_authors!()) + .about(crate_description!()) + .arg( + Arg::with_name("input") + .short("i") + .long("input") + .required(true) + .help("Input file path") + .takes_value(true), + ) + .arg( + Arg::with_name("file-type") + .short("t") + .long("type") + .possible_values(&["tsv", "json"]) + .help("Input file type") + .takes_value(true), + ) + .arg( + Arg::with_name("output-dir") + .short("o") + .long("output-dir") + .help("Output directory for files with embeddings") + .takes_value(true), + ) + .arg( + Arg::with_name("dimension") + .short("d") + .long("dimension") + .required(true) + .help("Embedding dimension size") + .takes_value(true), + ) + .arg( + Arg::with_name("number-of-iterations") + .short("n") + .long("number-of-iterations") + .required(true) + .help("Max number of iterations") + .takes_value(true), + ) + .arg( + Arg::with_name("columns") + .short("c") + .long("columns") + .required(true) + .help( + "Column names (max 12), with modifiers: [transient::, reflexive::, complex::]", + ) + .takes_value(true), + ) + .arg( + Arg::with_name("relation-name") + .short("r") + .long("relation-name") + .default_value("emb") + .help("Name of the relation, for output filename generation") + .takes_value(true), + ) + .arg( + Arg::with_name("prepend-field-name") + .short("p") + .long("prepend-field-name") + .possible_values(&["0", "1"]) + .default_value("0") + .help("Prepend field name to entity in output") + .takes_value(true), + ) + .arg( + Arg::with_name("log-every-n") + .short("l") + .long("log-every-n") + .default_value("10000") + .help("Log output every N lines") + .takes_value(true), + ) + .arg( + Arg::with_name("in-memory-embedding-calculation") + .short("e") + .long("in-memory-embedding-calculation") + .possible_values(&["0", "1"]) + .default_value("1") + .help("Calculate embeddings in memory or with memory-mapped files") + .takes_value(true), + ) .get_matches(); info!("Reading args..."); @@ -100,27 +128,23 @@ fn main() { let dimension: u16 = matches.value_of("dimension").unwrap().parse().unwrap(); let max_iter: u8 = matches .value_of("number-of-iterations") - .unwrap_or("4") + .unwrap() .parse() .unwrap(); - let relation_name = matches.value_of("relation-name").unwrap_or("emb"); + let relation_name = matches.value_of("relation-name").unwrap(); let prepend_field_name = { let value: u8 = matches .value_of("prepend-field-name") - .unwrap_or("0") + .unwrap() .parse() .unwrap(); value == 1 }; - let log_every: u32 = matches - .value_of("log-every-n") - .unwrap_or("10000") - .parse() - .unwrap(); + let log_every: u32 = matches.value_of("log-every-n").unwrap().parse().unwrap(); let in_memory_embedding_calculation = { let value: u8 = matches .value_of("in-memory-embedding-calculation") - .unwrap_or("1") + .unwrap() .parse() .unwrap(); value == 1