Skip to content

Commit

Permalink
Fix clippy warnings. Add nicer error messages for mmap files.
Browse files Browse the repository at this point in the history
  • Loading branch information
piobab committed Nov 20, 2020
1 parent 0ac407d commit 8ee275c
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 68 deletions.
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ jobs:
override: true
- run: rustup component add clippy
- uses: actions-rs/cargo@v1
continue-on-error: true # FIXME remove when https://github.com/Synerise/cleora/pull/2 merged
with:
command: clippy
args: -- -D warnings
136 changes: 116 additions & 20 deletions src/embedding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@ use crate::persistence::entity::EntityMappingPersistor;
use crate::persistence::sparse_matrix::SparseMatrixPersistor;
use crate::sparse_matrix::SparseMatrix;
use fnv::FnvHasher;
use log::info;
use log::{info, warn};
use memmap::MmapMut;
use rayon::prelude::*;
use std::collections::HashSet;
use std::fs;
use std::fs::OpenOptions;
use std::hash::Hasher;
use std::sync::Arc;

/// Number of broken entities (those with errors during writing to the file) which are logged.
/// There can be many more, but only the first few are logged.
const LOGGED_NUMBER_OF_BROKEN_ENTITIES: usize = 20;

/// Calculate embeddings in memory.
pub fn calculate_embeddings<T1, T2, T3>(
sparse_matrix: &mut SparseMatrix<T1>,
Expand Down Expand Up @@ -176,8 +181,18 @@ where
info!("Start saving embeddings.");

let entities_count = self.sparse_matrix_persistor.get_entity_counter();
embedding_persistor.put_metadata(entities_count, self.dimension);
embedding_persistor
.put_metadata(entities_count, self.dimension)
.unwrap_or_else(|_| {
// if the first write to the file fails, later writes will probably fail as well
panic!(
"Can't write metadata. Entities: {}. Dimension: {}.",
entities_count, self.dimension
)
});

// entities which can't be written to the file (error occurs)
let mut broken_entities = HashSet::new();
for i in 0..entities_count {
let hash = self.sparse_matrix_persistor.get_hash(i);
let entity_name_opt = entity_mapping_persistor.get_entity(hash as u64);
Expand All @@ -191,11 +206,21 @@ where
let value = col.get(i as usize).unwrap();
embedding.insert(j, *value);
}
embedding_persistor.put_data(entity_name, hash_occur, embedding);
embedding_persistor
.put_data(&entity_name, hash_occur, embedding)
.unwrap_or_else(|_| {
broken_entities.insert(entity_name);
});
};
}

embedding_persistor.finish();
if !broken_entities.is_empty() {
log_broken_entities(broken_entities);
}

embedding_persistor
.finish()
.unwrap_or_else(|_| warn!("Can't finish writing to the file."));

info!("Done saving embeddings.");
}
Expand All @@ -207,6 +232,18 @@ fn hash(num: i64) -> i64 {
hasher.finish() as i64
}

/// Logs a warning summarizing the entities whose data could not be written to the file.
///
/// The warning contains the total number of broken entities and a sample of at most
/// `LOGGED_NUMBER_OF_BROKEN_ENTITIES` of their names. The sample is taken from a
/// `HashSet`, so which names end up in it is arbitrary.
fn log_broken_entities(broken_entities: HashSet<String>) {
    let num_of_broken_entities = broken_entities.len();
    let few_broken_entities: HashSet<_> = broken_entities
        .into_iter()
        .take(LOGGED_NUMBER_OF_BROKEN_ENTITIES)
        .collect();
    // Report how many names are actually listed; this is lower than the cap
    // whenever fewer entities are broken than the cap allows.
    warn!(
        "Number of entities which can't be written to the file: {}. First {} broken entities: {:?}.",
        num_of_broken_entities,
        few_broken_entities.len(),
        few_broken_entities
    );
}

/// Calculate embeddings with memory-mapped files.
pub fn calculate_embeddings_mmap<T1, T2, T3>(
sparse_matrix: &mut SparseMatrix<T1>,
Expand All @@ -229,7 +266,13 @@ pub fn calculate_embeddings_mmap<T1, T2, T3>(
let res = mult.propagate(max_iter, init);
mult.persist(res, entity_mapping_persistor, embedding_persistor);

fs::remove_file(format!("{}_matrix_{}", sparse_matrix.get_id(), max_iter)).unwrap();
let work_file = format!("{}_matrix_{}", sparse_matrix.get_id(), max_iter);
fs::remove_file(&work_file).unwrap_or_else(|_| {
warn!(
"File {} can't be removed after work. Remove the file in order to save disk space.",
work_file
)
});

info!("Finalizing embeddings calculations!")
}
Expand Down Expand Up @@ -260,10 +303,22 @@ where
.read(true)
.write(true)
.create(true)
.open(file_name)
.unwrap();
file.set_len(number_of_bytes).unwrap();
let mut mmap = unsafe { MmapMut::map_mut(&file).unwrap() };
.open(&file_name)
.expect("Can't create new set of options for memory mapped file");
file.set_len(number_of_bytes).unwrap_or_else(|_| {
panic!(
"Can't update the size of {} file to {} bytes",
&file_name, number_of_bytes
)
});
let mut mmap = unsafe {
MmapMut::map_mut(&file).unwrap_or_else(|_| {
panic!(
"Can't create memory mapped file for the underlying file {}",
file_name
)
})
};

// no specific requirement (can be lower as well)
let max_hash = 8 * 1024 * 1024;
Expand Down Expand Up @@ -296,7 +351,8 @@ where
self.dimension, entities_count
);

mmap.flush();
mmap.flush()
.expect("Can't flush memory map modifications to disk");
mmap
}

Expand All @@ -308,7 +364,12 @@ where
for i in 0..max_iter {
let next = self.next_power(i, new_res);
new_res = self.normalize(next);
fs::remove_file(format!("{}_matrix_{}", self.sparse_matrix_id, i)).unwrap();

let work_file = format!("{}_matrix_{}", self.sparse_matrix_id, i);
fs::remove_file(&work_file).unwrap_or_else(|_| {
warn!("File {} can't be removed after work. Remove the file in order to save disk space.", work_file)
});

info!(
"Done iter: {}. Dims: {}, entities: {}, num data points: {}.",
i,
Expand All @@ -330,10 +391,22 @@ where
.read(true)
.write(true)
.create(true)
.open(file_name)
.unwrap();
file.set_len(number_of_bytes).unwrap();
let mut mmap_output = unsafe { MmapMut::map_mut(&file).unwrap() };
.open(&file_name)
.expect("Can't create new set of options for memory mapped file");
file.set_len(number_of_bytes).unwrap_or_else(|_| {
panic!(
"Can't update the size of {} file to {} bytes",
&file_name, number_of_bytes
)
});
let mut mmap_output = unsafe {
MmapMut::map_mut(&file).unwrap_or_else(|_| {
panic!(
"Can't create memory mapped file for the underlying file {}",
file_name
)
})
};

let amount_of_data = self.sparse_matrix_persistor.get_amount_of_data();

Expand Down Expand Up @@ -364,7 +437,9 @@ where
}
});

mmap_output.flush();
mmap_output
.flush()
.expect("Can't flush memory map modifications to disk");
mmap_output
}

Expand Down Expand Up @@ -407,7 +482,8 @@ where
}
});

res.flush();
res.flush()
.expect("Can't flush memory map modifications to disk");
res
}

Expand All @@ -423,8 +499,18 @@ where
info!("Start saving embeddings.");

let entities_count = self.sparse_matrix_persistor.get_entity_counter();
embedding_persistor.put_metadata(entities_count, self.dimension);
embedding_persistor
.put_metadata(entities_count, self.dimension)
.unwrap_or_else(|_| {
// if the first write to the file fails, later writes will probably fail as well
panic!(
"Can't write metadata. Entities: {}. Dimension: {}.",
entities_count, self.dimension
)
});

// entities which can't be written to the file (error occurs)
let mut broken_entities = HashSet::new();
for i in 0..entities_count {
let hash = self.sparse_matrix_persistor.get_hash(i);
let entity_name_opt = entity_mapping_persistor.get_entity(hash as u64);
Expand All @@ -444,11 +530,21 @@ where

embedding.insert(j, value);
}
embedding_persistor.put_data(entity_name, hash_occur, embedding);
embedding_persistor
.put_data(&entity_name, hash_occur, embedding)
.unwrap_or_else(|_| {
broken_entities.insert(entity_name);
});
};
}

embedding_persistor.finish();
if !broken_entities.is_empty() {
log_broken_entities(broken_entities);
}

embedding_persistor
.finish()
.unwrap_or_else(|_| warn!("Can't finish writing to the file."));

info!("Done saving embeddings.");
}
Expand Down
4 changes: 2 additions & 2 deletions src/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ mod tests {
total_combinations *= len_and_offset.length as u64;
}

let in_memory_entity_mapping_persistor = InMemoryEntityMappingPersistor::new();
let in_memory_entity_mapping_persistor = InMemoryEntityMappingPersistor::default();
let in_memory_entity_mapping_persistor = Arc::new(in_memory_entity_mapping_persistor);
let entity_processor = EntityProcessor::new(
&dummy_config,
Expand Down Expand Up @@ -414,7 +414,7 @@ mod tests {
// columns are most important, the rest can be omitted
let dummy_config = Configuration::default(String::from(""), columns);

let in_memory_entity_mapping_persistor = InMemoryEntityMappingPersistor::new();
let in_memory_entity_mapping_persistor = InMemoryEntityMappingPersistor::default();
let in_memory_entity_mapping_persistor = Arc::new(in_memory_entity_mapping_persistor);
let mut result: SmallVec<[SmallVec<[u64; SMALL_VECTOR_SIZE]>; SMALL_VECTOR_SIZE]> =
SmallVec::new();
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ fn main() {
dbg!(&config);

info!("Starting calculation...");
let in_memory_entity_mapping_persistor = InMemoryEntityMappingPersistor::new();
let in_memory_entity_mapping_persistor = InMemoryEntityMappingPersistor::default();
let in_memory_entity_mapping_persistor = Arc::new(in_memory_entity_mapping_persistor);

let sparse_matrices = build_graphs(&config, in_memory_entity_mapping_persistor.clone());
Expand Down
66 changes: 29 additions & 37 deletions src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,11 @@ pub mod entity {
fn contains(&self, hash: u64) -> bool;
}

#[derive(Debug)]
#[derive(Debug, Default)]
pub struct InMemoryEntityMappingPersistor {
entity_mappings: RwLock<FxHashMap<u64, String>>,
}

impl InMemoryEntityMappingPersistor {
    /// Creates a persistor whose entity mapping table starts out empty.
    pub fn new() -> Self {
        let entity_mappings = RwLock::new(FxHashMap::default());
        Self { entity_mappings }
    }
}

impl EntityMappingPersistor for InMemoryEntityMappingPersistor {
fn get_entity(&self, hash: u64) -> Option<String> {
let entity_mappings_read = self.entity_mappings.read().unwrap();
Expand Down Expand Up @@ -72,7 +64,7 @@ pub mod sparse_matrix {
fn finish(&self);
}

#[derive(Debug)]
#[derive(Debug, Default)]
pub struct InMemorySparseMatrixPersistor {
entity_count: u32,
edge_count: u32,
Expand All @@ -84,21 +76,6 @@ pub mod sparse_matrix {
entries: Vec<Entry>,
}

impl InMemorySparseMatrixPersistor {
    /// Creates an empty persistor: both counters are zero and every map and
    /// vector starts out empty.
    pub fn new() -> Self {
        Self {
            // counters
            entity_count: 0,
            edge_count: 0,
            // lookup tables
            hash_2_id: FxHashMap::default(),
            id_2_hash: FxHashMap::default(),
            hash_2_count: FxHashMap::default(),
            pair_index: FxHashMap::default(),
            // dense storage
            row_sum: Vec::new(),
            entries: Vec::new(),
        }
    }
}

impl SparseMatrixPersistor for InMemorySparseMatrixPersistor {
fn increment_hash_occurrence(&mut self, hash: u64) -> u32 {
let value = self.hash_2_count.entry(hash).or_insert(0);
Expand Down Expand Up @@ -219,12 +196,18 @@ pub mod sparse_matrix {

pub mod embedding {
use std::fs::File;
use std::io;
use std::io::{BufWriter, Write};

pub trait EmbeddingPersistor {
fn put_metadata(&mut self, entity_count: u32, dimension: u16);
fn put_data(&mut self, entity: String, occur_count: u32, vector: Vec<f32>);
fn finish(&mut self);
fn put_metadata(&mut self, entity_count: u32, dimension: u16) -> Result<(), io::Error>;
fn put_data(
&mut self,
entity: &str,
occur_count: u32,
vector: Vec<f32>,
) -> Result<(), io::Error>;
fn finish(&mut self) -> Result<(), io::Error>;
}

pub struct TextFileVectorPersistor {
Expand All @@ -244,28 +227,37 @@ pub mod embedding {
}

impl EmbeddingPersistor for TextFileVectorPersistor {
fn put_metadata(&mut self, entity_count: u32, dimension: u16) {
fn put_metadata(&mut self, entity_count: u32, dimension: u16) -> Result<(), io::Error> {
let metadata = format!("{} {}", entity_count, dimension);
self.buf_writer.write(metadata.as_bytes());
self.buf_writer.write_all(metadata.as_bytes())?;
Ok(())
}

fn put_data(&mut self, entity: String, occur_count: u32, vector: Vec<f32>) {
self.buf_writer.write(b"\n");
self.buf_writer.write(entity.as_bytes());
fn put_data(
&mut self,
entity: &str,
occur_count: u32,
vector: Vec<f32>,
) -> Result<(), io::Error> {
self.buf_writer.write_all(b"\n")?;
self.buf_writer.write_all(entity.as_bytes())?;

if self.produce_entity_occurrence_count {
let occur = format!(" {}", occur_count);
self.buf_writer.write(occur.as_bytes());
self.buf_writer.write_all(occur.as_bytes())?;
}

for &v in &vector {
let vec = format!(" {}", v);
self.buf_writer.write(vec.as_bytes());
self.buf_writer.write_all(vec.as_bytes())?;
}

Ok(())
}

fn finish(&mut self) {
self.buf_writer.write(b"\n");
fn finish(&mut self) -> Result<(), io::Error> {
self.buf_writer.write_all(b"\n")?;
Ok(())
}
}
}
Loading

0 comments on commit 8ee275c

Please sign in to comment.