Skip to content

Commit

Permalink
Fix reading of lines containing invalid UTF-8.
Browse files Browse the repository at this point in the history
  • Loading branch information
piobab committed Nov 22, 2020
1 parent d09f752 commit aadbeca
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 49 deletions.
9 changes: 0 additions & 9 deletions src/entity.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::configuration::{Column, Configuration};
use crate::persistence::entity::EntityMappingPersistor;
use log::info;
use smallvec::{smallvec, SmallVec};
use std::hash::Hasher;
use std::sync::Arc;
Expand Down Expand Up @@ -80,7 +79,6 @@ where
field_hashes: SmallVec<[u64; SMALL_VECTOR_SIZE]>,
not_ignored_columns_count: u16,
columns_count: u16,
rows_count: u64,
entity_mapping_persistor: Arc<T>,
hashes_handler: F,
}
Expand Down Expand Up @@ -117,7 +115,6 @@ where
field_hashes,
not_ignored_columns_count,
columns_count,
rows_count: 0u64,
entity_mapping_persistor: persistor,
hashes_handler,
}
Expand Down Expand Up @@ -179,12 +176,6 @@ where
for hash_row in hash_rows {
(self.hashes_handler)(hash_row);
}

self.rows_count += 1;

if self.rows_count % self.config.log_every_n as u64 == 0 {
info!("Number of lines processed: {}", self.rows_count);
}
}

#[inline(always)]
Expand Down
113 changes: 73 additions & 40 deletions src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::persistence::entity::InMemoryEntityMappingPersistor;
use crate::persistence::sparse_matrix::InMemorySparseMatrixPersistor;
use crate::sparse_matrix::{create_sparse_matrices, SparseMatrix};
use bus::Bus;
use log::{error, info};
use simdjson_rust::dom;
use smallvec::{smallvec, SmallVec};
use std::sync::Arc;
Expand Down Expand Up @@ -47,30 +48,22 @@ pub fn build_graphs(
bus.broadcast(hashes);
});

let input_file = File::open(&config.input).expect("Can't open file"); // handle error
let mut buffered = BufReader::new(input_file);

let mut line = String::new();
let mut parser = dom::Parser::default();
match &config.file_type {
FileType::JSON => {
while buffered.read_line(&mut line).unwrap() > 0 {
let row = read_json_columns(&line, &mut parser, &config.columns);
let mut parser = dom::Parser::default();
read_file(config, |line| {
let row = parse_json_line(line, &mut parser, &config.columns);
entity_processor.process_row(&row);
line.clear(); // clear to reuse the buffer
}
});
}
FileType::TSV => {
while buffered.read_line(&mut line).unwrap() > 0 {
{
let values = line.trim().split('\t');
let row: Vec<_> = values.map(|c| c.split(' ').collect()).collect();
entity_processor.process_row(&row);
}
line.clear(); // clear to reuse the buffer
}
read_file(config, |line| {
let row = parse_tsv_line(line);
entity_processor.process_row(&row);
});
}
}

entity_processor.finish();

let mut sparse_matrices = vec![];
Expand All @@ -84,42 +77,82 @@ pub fn build_graphs(
sparse_matrices
}

/// Read file line by line. Pass every valid line to handler for parsing.
fn read_file<F>(config: &Configuration, mut line_handler: F)
where
F: FnMut(&str),
{
let input_file = File::open(&config.input).expect("Can't open file");
let mut buffered = BufReader::new(input_file);

let mut line_number = 1u64;
let mut line = String::new();
loop {
match buffered.read_line(&mut line) {
Ok(bytes_read) => {
// EOF
if bytes_read == 0 {
break;
}

line_handler(&line);
}
Err(err) => {
error!("Can't read line number: {}. Error: {}.", line_number, err);
}
};

// clear to reuse the buffer
line.clear();

if line_number % config.log_every_n as u64 == 0 {
info!("Number of lines processed: {}", line_number);
}

line_number += 1;
}
}

/// Parse a line of JSON and read its columns into a vector for processing.
fn read_json_columns(
fn parse_json_line(
line: &str,
parser: &mut dom::Parser,
columns: &[Column],
) -> Vec<SmallVec<[String; SMALL_VECTOR_SIZE]>> {
let parsed = parser.parse(&line).unwrap();
columns
.iter()
.map({
|c| {
if !c.complex {
let elem = parsed.at_key(&c.name).unwrap();
let value = match elem.get_type() {
dom::element::ElementType::String => elem.get_string().unwrap(),
_ => elem.minify(),
};
smallvec![value]
} else {
parsed
.at_key(&c.name)
.unwrap()
.get_array()
.expect("values for complex columns must be arrays")
.into_iter()
.map(|v| match v.get_type() {
dom::element::ElementType::String => v.get_string().unwrap(),
_ => v.minify(),
})
.collect()
}
.map(|c| {
if !c.complex {
let elem = parsed.at_key(&c.name).unwrap();
let value = match elem.get_type() {
dom::element::ElementType::String => elem.get_string().unwrap(),
_ => elem.minify(),
};
smallvec![value]
} else {
parsed
.at_key(&c.name)
.unwrap()
.get_array()
.expect("Values for complex columns must be arrays")
.into_iter()
.map(|v| match v.get_type() {
dom::element::ElementType::String => v.get_string().unwrap(),
_ => v.minify(),
})
.collect()
}
})
.collect()
}

/// Parse a line of TSV and read its columns into a vector for processing.
///
/// The line is trimmed, split into columns on tabs, and each column is
/// further split on single spaces into its individual values. The returned
/// vectors borrow from `line`, so no per-value allocation takes place.
fn parse_tsv_line(line: &str) -> Vec<SmallVec<[&str; SMALL_VECTOR_SIZE]>> {
    line.trim()
        .split('\t')
        .map(|column| column.split(' ').collect())
        .collect()
}

/// Train SparseMatrix'es (graphs) in separated threads.
pub fn train(
config: Configuration,
Expand Down

0 comments on commit aadbeca

Please sign in to comment.