Consolidate JSON Reader options and DecoderOptions #1539

Merged
merged 1 commit into from
Apr 13, 2022
Merged
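For orientation before the diff: this PR folds the standalone `batch_size` argument into `DecoderOptions`, which now carries the batch size, projection, and format strings behind builder-style setters. A minimal sketch of the consolidated call site, based on the signatures in the diff below (the schema fields are illustrative; the fixture path is taken from the PR's own tests):

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use arrow::json::reader::{DecoderOptions, Reader};

fn main() -> arrow::error::Result<()> {
    // Illustrative schema; adjust the fields to match your JSON file.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Float64, true),
    ]));

    // Batch size, projection, and format strings now all live on DecoderOptions.
    let options = DecoderOptions::new()
        .with_batch_size(1024)
        .with_projection(vec!["a".to_string()]);

    // Before this PR: Reader::new(file, schema, 1024, Default::default()).
    let file = File::open("test/data/basic.json")?;
    let mut reader = Reader::new(file, schema, options);

    // Reader::next returns Result<Option<RecordBatch>>.
    if let Some(batch) = reader.next()? {
        println!("read {} rows, {} columns", batch.num_rows(), batch.num_columns());
    }
    Ok(())
}
```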
153 changes: 73 additions & 80 deletions arrow/src/json/reader.rs
@@ -41,8 +41,7 @@
//! let mut json = json::Reader::new(
//! BufReader::new(file),
//! Arc::new(schema),
//! 1024,
//! Default::default()
//! json::reader::DecoderOptions::new(),
//! );
//!
//! let batch = json.next().unwrap().unwrap();
@@ -561,16 +560,17 @@ where
///
/// # Examples
/// ```
/// use arrow::json::reader::{Decoder, ValueIter, infer_json_schema};
/// use arrow::json::reader::{Decoder, DecoderOptions, ValueIter, infer_json_schema};
/// use std::fs::File;
/// use std::io::{BufReader, Seek, SeekFrom};
/// use std::sync::Arc;
///
/// let mut reader =
/// BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
/// let inferred_schema = infer_json_schema(&mut reader, None).unwrap();
/// let batch_size = 1024;
/// let decoder = Decoder::new(Arc::new(inferred_schema), batch_size, Default::default());
/// let options = DecoderOptions::new()
/// .with_batch_size(1024);
/// let decoder = Decoder::new(Arc::new(inferred_schema), options);
///
/// // seek back to start so that the original file is usable again
/// reader.seek(SeekFrom::Start(0)).unwrap();
@@ -583,35 +583,68 @@ where
pub struct Decoder {
/// Explicit schema for the JSON file
schema: SchemaRef,
/// Batch size (number of records to load each time)
batch_size: usize,
/// This is a collection of options for json decoder
doptions: DecoderOptions,
options: DecoderOptions,
}

#[derive(Default, Debug)]
#[derive(Debug)]
pub struct DecoderOptions {
/// Batch size (number of records to load each time), defaults to 1024 records
batch_size: usize,
/// Optional projection for which columns to load (case-sensitive names)
projection: Option<Vec<String>>,
/// optional HashMap of column names to its format string
format_strings: Option<HashMap<String, String>>,
}

impl Default for DecoderOptions {
fn default() -> Self {
Self {
batch_size: 1024,
projection: None,
format_strings: None,
}
}
}

impl DecoderOptions {
pub fn new() -> Self {
Default::default()
}

/// Set the batch size (number of records to load at one time)
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size;
self
}

/// Set the reader's column projection
pub fn with_projection(mut self, projection: Vec<String>) -> Self {
self.projection = Some(projection);
self
}

/// Set the decoder's format Strings param
pub fn with_format_strings(
mut self,
format_strings: HashMap<String, String>,
) -> Self {
self.format_strings = Some(format_strings);
self
}
}

impl Decoder {
/// Create a new JSON decoder from any value that implements the `Iterator<Item=Result<Value>>`
/// trait.
pub fn new(schema: SchemaRef, batch_size: usize, doptions: DecoderOptions) -> Self {
Self {
schema,
batch_size,
doptions,
}
pub fn new(schema: SchemaRef, options: DecoderOptions) -> Self {
Self { schema, options }
}

/// Returns the schema of the reader, useful for getting the schema without reading
/// record batches
pub fn schema(&self) -> SchemaRef {
match &self.doptions.projection {
match &self.options.projection {
Some(projection) => {
let fields = self.schema.fields();
let projected_fields: Vec<Field> = fields
@@ -636,9 +669,10 @@ impl Decoder
where
I: Iterator<Item = Result<Value>>,
{
let mut rows: Vec<Value> = Vec::with_capacity(self.batch_size);
let batch_size = self.options.batch_size;
let mut rows: Vec<Value> = Vec::with_capacity(batch_size);

for value in value_iter.by_ref().take(self.batch_size) {
for value in value_iter.by_ref().take(batch_size) {
let v = value?;
match v {
Value::Object(_) => rows.push(v),
Expand All @@ -656,7 +690,7 @@ impl Decoder {
}

let rows = &rows[..];
let projection = self.doptions.projection.clone().unwrap_or_default();
let projection = self.options.projection.clone().unwrap_or_default();
let arrays = self.build_struct_array(rows, self.schema.fields(), &projection);

let projected_fields: Vec<Field> = if projection.is_empty() {
@@ -934,7 +968,7 @@ impl Decoder {
T::Native: num::NumCast,
{
let format_string = self
.doptions
.options
.format_strings
.as_ref()
.and_then(|fmts| fmts.get(col_name));
@@ -1556,13 +1590,8 @@ impl<R: Read> Reader<R> {
///
/// If reading a `File`, you can customise the Reader, such as to enable schema
/// inference, use `ReaderBuilder`.
pub fn new(
reader: R,
schema: SchemaRef,
batch_size: usize,
doptions: DecoderOptions,
) -> Self {
Self::from_buf_reader(BufReader::new(reader), schema, batch_size, doptions)
pub fn new(reader: R, schema: SchemaRef, options: DecoderOptions) -> Self {
Self::from_buf_reader(BufReader::new(reader), schema, options)
}

/// Create a new JSON Reader from a `BufReader<R: Read>`
@@ -1571,12 +1600,11 @@ impl<R: Read> Reader<R> {
pub fn from_buf_reader(
reader: BufReader<R>,
schema: SchemaRef,
batch_size: usize,
doptions: DecoderOptions,
options: DecoderOptions,
) -> Self {
Self {
reader,
decoder: Decoder::new(schema, batch_size, doptions),
decoder: Decoder::new(schema, options),
}
}

@@ -1595,7 +1623,7 @@ impl<R: Read> Reader<R> {
}

/// JSON file reader builder
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct ReaderBuilder {
/// Optional schema for the JSON file
///
@@ -1606,26 +1634,8 @@ pub struct ReaderBuilder {
///
/// If a number is not provided, all the records are read.
max_records: Option<usize>,
/// Batch size (number of records to load each time)
///
/// The default batch size when using the `ReaderBuilder` is 1024 records
batch_size: usize,
/// Optional projection for which columns to load (zero-based column indices)
projection: Option<Vec<String>>,
/// optional HashMap of column names to format strings
format_strings: Option<HashMap<String, String>>,
}

impl Default for ReaderBuilder {
fn default() -> Self {
Self {
schema: None,
max_records: None,
batch_size: 1024,

[Contributor Author comment on this line: this default is moved into DecoderOptions; see the sketch after this hunk.]

projection: None,
format_strings: None,
}
}
/// Options for json decoder
options: DecoderOptions,
}

impl ReaderBuilder {
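As the author's review comment above notes, the old `ReaderBuilder` default of 1024 records per batch now lives in `impl Default for DecoderOptions`. A minimal sketch of the relocated default (the fields are private, so the value is only observable through the size of the batches a reader yields):

```rust
use arrow::json::reader::DecoderOptions;

fn main() {
    // Both forms pick up the defaults that used to sit on ReaderBuilder:
    // batch_size = 1024, no projection, no format strings.
    let defaults = DecoderOptions::new();
    let also_defaults = DecoderOptions::default();

    // Override only what you need; everything else keeps the defaults above.
    let custom = DecoderOptions::new().with_batch_size(256);

    println!("{:?}\n{:?}\n{:?}", defaults, also_defaults, custom);
}
```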
Expand Down Expand Up @@ -1672,13 +1682,13 @@ impl ReaderBuilder {

/// Set the batch size (number of records to load at one time)
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size;
self.options = self.options.with_batch_size(batch_size);
self
}

/// Set the reader's column projection
pub fn with_projection(mut self, projection: Vec<String>) -> Self {
self.projection = Some(projection);
self.options = self.options.with_projection(projection);
self
}

@@ -1687,7 +1697,7 @@
mut self,
format_strings: HashMap<String, String>,
) -> Self {
self.format_strings = Some(format_strings);
self.options = self.options.with_format_strings(format_strings);
self
}

@@ -1707,15 +1717,7 @@ impl ReaderBuilder {
)?),
};

Ok(Reader::from_buf_reader(
buf_reader,
schema,
self.batch_size,
DecoderOptions {
projection: self.projection,
format_strings: self.format_strings,
},
))
Ok(Reader::from_buf_reader(buf_reader, schema, self.options))
}
}

@@ -1868,8 +1870,7 @@ mod tests {
let mut reader: Reader<File> = Reader::new(
File::open("test/data/basic.json").unwrap(),
Arc::new(schema.clone()),
1024,
Default::default(),
DecoderOptions::new(),
);
let reader_schema = reader.schema();
assert_eq!(reader_schema, Arc::new(schema));
@@ -1919,11 +1920,7 @@ mod tests {
let mut reader: Reader<File> = Reader::new(
File::open("test/data/basic.json").unwrap(),
schema.clone(),
1024,
DecoderOptions {
format_strings: Some(fmts),
..Default::default()
},
DecoderOptions::new().with_format_strings(fmts),
);
let reader_schema = reader.schema();
assert_eq!(reader_schema, schema);
@@ -1955,11 +1952,7 @@ mod tests {
let mut reader: Reader<File> = Reader::new(
File::open("test/data/basic.json").unwrap(),
Arc::new(schema),
1024,
DecoderOptions {
projection: Some(vec!["a".to_string(), "c".to_string()]),
..Default::default()
},
DecoderOptions::new().with_projection(vec!["a".to_string(), "c".to_string()]),
);
let reader_schema = reader.schema();
let expected_schema = Arc::new(Schema::new(vec![
@@ -2126,8 +2119,8 @@ mod tests {
file.seek(SeekFrom::Start(0)).unwrap();

let reader = BufReader::new(GzDecoder::new(&file));
let mut reader =
Reader::from_buf_reader(reader, Arc::new(schema), 64, Default::default());
let options = DecoderOptions::new().with_batch_size(64);
let mut reader = Reader::from_buf_reader(reader, Arc::new(schema), options);
let batch_gz = reader.next().unwrap().unwrap();

for batch in vec![batch, batch_gz] {
@@ -3199,7 +3192,7 @@ mod tests {
true,
)]);

let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
let decoder = Decoder::new(Arc::new(schema), DecoderOptions::new());
let batch = decoder
.next_batch(
&mut vec![
@@ -3234,7 +3227,7 @@ mod tests {
true,
)]);

let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
let decoder = Decoder::new(Arc::new(schema), DecoderOptions::new());
let batch = decoder
.next_batch(
// NOTE: total struct element count needs to be greater than
@@ -3263,7 +3256,7 @@ mod tests {
#[test]
fn test_json_read_binary_structs() {
let schema = Schema::new(vec![Field::new("c1", DataType::Binary, true)]);
let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
let decoder = Decoder::new(Arc::new(schema), DecoderOptions::new());
let batch = decoder
.next_batch(
&mut vec![
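Since `ReaderBuilder` now derives `Default` and stores its settings in a `DecoderOptions`, its public builder methods keep working unchanged; only their internal storage moved. A sketch of equivalent usage through the builder, assuming the builder's existing `infer_schema` helper (not shown in this diff) and an illustrative record limit and batch size:

```rust
use std::fs::File;
use arrow::json::reader::ReaderBuilder;

fn main() -> arrow::error::Result<()> {
    // with_batch_size/with_projection/with_format_strings now delegate to the
    // builder's internal DecoderOptions, so calling code is unaffected.
    let file = File::open("test/data/basic.json")?;
    let mut reader = ReaderBuilder::new()
        .infer_schema(Some(100)) // infer the schema from up to 100 records
        .with_batch_size(64)
        .build(file)?;

    if let Some(batch) = reader.next()? {
        println!("read {} rows", batch.num_rows());
    }
    Ok(())
}
```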