feat: add strict mode to json reader #4421

Merged · 1 commit · Jun 20, 2023

2 changes: 2 additions & 0 deletions arrow-json/src/reader/list_array.rs
@@ -35,6 +35,7 @@ impl<O: OffsetSizeTrait> ListArrayDecoder<O> {
pub fn new(
data_type: DataType,
coerce_primitive: bool,
strict_mode: bool,
is_nullable: bool,
) -> Result<Self, ArrowError> {
let field = match &data_type {
@@ -45,6 +46,7 @@ impl<O: OffsetSizeTrait> ListArrayDecoder<O> {
let decoder = make_decoder(
field.data_type().clone(),
coerce_primitive,
strict_mode,
field.is_nullable(),
)?;

3 changes: 3 additions & 0 deletions arrow-json/src/reader/map_array.rs
@@ -34,6 +34,7 @@ impl MapArrayDecoder {
pub fn new(
data_type: DataType,
coerce_primitive: bool,
strict_mode: bool,
is_nullable: bool,
) -> Result<Self, ArrowError> {
let fields = match &data_type {
@@ -56,11 +57,13 @@ impl MapArrayDecoder {
let keys = make_decoder(
fields[0].data_type().clone(),
coerce_primitive,
strict_mode,
fields[0].is_nullable(),
)?;
let values = make_decoder(
fields[1].data_type().clone(),
coerce_primitive,
strict_mode,
fields[1].is_nullable(),
)?;

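Note: the list and map decoders simply forward the new flag to `make_decoder` for their child fields, so strict mode applies recursively to nested types. A minimal sketch of what that enables, with a hypothetical schema and payload (the builder calls mirror this PR's API; the error wording is taken from the tests further down):

```rust
// Hedged sketch, not part of this diff: strict mode surfaces unknown keys
// inside nested types too, because every child decoder receives the flag.
// The schema, payload, and assertion are invented for illustration.
use std::io::Cursor;
use std::sync::Arc;

use arrow_json::ReaderBuilder;
use arrow_schema::{DataType, Field, Schema};

fn main() {
    // A list of structs whose items only declare the field "a".
    let item = Field::new_struct("item", vec![Field::new("a", DataType::Boolean, true)], true);
    let schema = Arc::new(Schema::new(vec![Field::new(
        "l",
        DataType::List(Arc::new(item)),
        true,
    )]));

    // The nested key "b" is absent from the schema, so strict mode errors.
    let json = r#"{"l": [{"a": true, "b": 1}]}"#;
    let err = ReaderBuilder::new(schema)
        .with_strict_mode(true)
        .build(Cursor::new(json.as_bytes()))
        .unwrap()
        .read()
        .unwrap_err();
    assert!(err.to_string().contains("column 'b' missing from schema"));
}
```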
148 changes: 128 additions & 20 deletions arrow-json/src/reader/mod.rs
@@ -170,6 +170,7 @@ mod timestamp_array;
pub struct ReaderBuilder {
batch_size: usize,
coerce_primitive: bool,
strict_mode: bool,

schema: SchemaRef,
}
@@ -179,13 +180,15 @@ impl ReaderBuilder {
///
/// This could be obtained using [`infer_json_schema`] if not known
///
- /// Any columns not present in `schema` will be ignored
+ /// Any columns not present in `schema` will be ignored, unless `strict_mode` is set to true.
+ /// In this case, an error is returned when a column is missing from `schema`.
///
/// [`infer_json_schema`]: crate::reader::infer_json_schema
pub fn new(schema: SchemaRef) -> Self {
Self {
batch_size: 1024,
coerce_primitive: false,
strict_mode: false,
schema,
}
}
@@ -211,6 +214,15 @@ impl ReaderBuilder {
}
}

/// Sets if the decoder should return an error if it encounters a column not present
/// in `schema`
pub fn with_strict_mode(self, strict_mode: bool) -> Self {
Self {
strict_mode,
..self
}
}

/// Create a [`Reader`] with the provided [`BufRead`]
pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
Ok(Reader {
@@ -224,6 +236,7 @@ impl ReaderBuilder {
let decoder = make_decoder(
DataType::Struct(self.schema.fields.clone()),
self.coerce_primitive,
self.strict_mode,
false,
)?;
let num_fields = self.schema.all_fields().len();
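For reference, a hedged usage sketch of the new option (assuming `ReaderBuilder` is re-exported at the crate root as elsewhere in `arrow_json`; the schema and input are invented for illustration):

```rust
// Hedged usage sketch of the new builder option: by default unknown columns
// are silently ignored; with strict mode the same input becomes an error.
use std::io::Cursor;
use std::sync::Arc;

use arrow_json::ReaderBuilder;
use arrow_schema::{DataType, Field, Schema};

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    let json = r#"{"a": 1, "b": "unexpected"}"#;

    // Lenient (default): "b" is dropped and decoding succeeds.
    let batches = ReaderBuilder::new(schema.clone())
        .build(Cursor::new(json.as_bytes()))
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(batches[0].num_columns(), 1);

    // Strict: the unknown column is now a decode error.
    let err = ReaderBuilder::new(schema)
        .with_strict_mode(true)
        .build(Cursor::new(json.as_bytes()))
        .unwrap()
        .read()
        .unwrap_err();
    assert!(err.to_string().contains("column 'b' missing from schema"));
}
```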
@@ -586,6 +599,7 @@ macro_rules! primitive_decoder {
fn make_decoder(
data_type: DataType,
coerce_primitive: bool,
strict_mode: bool,
is_nullable: bool,
) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
downcast_integer! {
@@ -633,13 +647,13 @@ fn make_decoder(
DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))),
DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
- DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, coerce_primitive, is_nullable)?)),
- DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, coerce_primitive, is_nullable)?)),
- DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, is_nullable)?)),
+ DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, coerce_primitive, strict_mode, is_nullable)?)),
+ DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, coerce_primitive, strict_mode, is_nullable)?)),
+ DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable)?)),
DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => {
Err(ArrowError::JsonError(format!("{data_type} is not supported by JSON")))
}
DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, is_nullable)?)),
DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable)?)),
d => Err(ArrowError::NotYetImplemented(format!("Support for {d} in JSON reader")))
}
}
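The matching change to `StructArrayDecoder` is not shown in this excerpt. Conceptually, the strict check it performs for each JSON key reduces to something like the following simplified, hypothetical sketch (not the actual arrow-json code):

```rust
// Hypothetical simplification of the strict-mode lookup a struct decoder
// performs per JSON key; the real arrow-json implementation differs.
use arrow_schema::{ArrowError, Fields};

fn resolve_field(fields: &Fields, key: &str, strict_mode: bool) -> Result<Option<usize>, ArrowError> {
    match fields.iter().position(|f| f.name() == key) {
        // Known column: decode the value with the child decoder at `idx`.
        Some(idx) => Ok(Some(idx)),
        // Unknown column in strict mode: fail with the message the tests below expect.
        None if strict_mode => Err(ArrowError::JsonError(format!(
            "column '{key}' missing from schema"
        ))),
        // Unknown column otherwise: skip the value, as before this PR.
        None => Ok(None),
    }
}
```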
@@ -670,6 +684,7 @@ mod tests {
buf: &str,
batch_size: usize,
coerce_primitive: bool,
strict_mode: bool,
schema: SchemaRef,
) -> Vec<RecordBatch> {
let mut unbuffered = vec![];
@@ -693,6 +708,7 @@ mod tests {
let buffered = ReaderBuilder::new(schema.clone())
.with_batch_size(batch_size)
.with_coerce_primitive(coerce_primitive)
.with_strict_mode(strict_mode)
.build(BufReader::with_capacity(b, Cursor::new(buf.as_bytes())))
.unwrap()
.collect::<Result<Vec<_>, _>>()
@@ -724,7 +740,7 @@ mod tests {
Field::new("e", DataType::Date64, true),
]));

- let batches = do_read(buf, 1024, false, schema);
+ let batches = do_read(buf, 1024, false, false, schema);
assert_eq!(batches.len(), 1);

let col1 = batches[0].column(0).as_primitive::<Int64Type>();
@@ -763,7 +779,7 @@ mod tests {
{"a": "1", "b": "2"}
{"a": "hello", "b": "shoo"}
{"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}

{"b": null}
{"b": "", "a": null}

@@ -773,7 +789,7 @@ mod tests {
Field::new("b", DataType::LargeUtf8, true),
]));

- let batches = do_read(buf, 1024, false, schema);
+ let batches = do_read(buf, 1024, false, false, schema);
assert_eq!(batches.len(), 1);

let col1 = batches[0].column(0).as_string::<i32>();
@@ -826,7 +842,7 @@ mod tests {
),
]));

- let batches = do_read(buf, 1024, false, schema);
+ let batches = do_read(buf, 1024, false, false, schema);
assert_eq!(batches.len(), 1);

let list = batches[0].column(0).as_list::<i32>();
@@ -895,7 +911,7 @@ mod tests {
),
]));

- let batches = do_read(buf, 1024, false, schema);
+ let batches = do_read(buf, 1024, false, false, schema);
assert_eq!(batches.len(), 1);

let nested = batches[0].column(0).as_struct();
@@ -941,7 +957,7 @@ mod tests {

let schema = Arc::new(Schema::new(vec![map]));

- let batches = do_read(buf, 1024, false, schema);
+ let batches = do_read(buf, 1024, false, false, schema);
assert_eq!(batches.len(), 1);

let map = batches[0].column(0).as_map();
@@ -1015,7 +1031,7 @@ mod tests {
Field::new("c", DataType::Utf8, true),
]));

- let batches = do_read(buf, 1024, true, schema);
+ let batches = do_read(buf, 1024, true, false, schema);
assert_eq!(batches.len(), 1);

let col1 = batches[0].column(0).as_string::<i32>();
@@ -1063,7 +1079,7 @@ mod tests {
Field::new("c", data_type, true),
]));

- let batches = do_read(buf, 1024, true, schema);
+ let batches = do_read(buf, 1024, true, false, schema);
assert_eq!(batches.len(), 1);

let col1 = batches[0].column(0).as_primitive::<T>();
@@ -1121,7 +1137,7 @@ mod tests {
Field::new("d", with_timezone, true),
]));

- let batches = do_read(buf, 1024, true, schema);
+ let batches = do_read(buf, 1024, true, false, schema);
assert_eq!(batches.len(), 1);

let unit_in_nanos: i64 = match T::UNIT {
@@ -1221,7 +1237,7 @@ mod tests {
Field::new("c", T::DATA_TYPE, true),
]));

- let batches = do_read(buf, 1024, true, schema);
+ let batches = do_read(buf, 1024, true, false, schema);
assert_eq!(batches.len(), 1);

let col1 = batches[0].column(0).as_primitive::<T>();
@@ -1298,7 +1314,7 @@ mod tests {
),
]));

- let batches = do_read(json, 1024, true, schema);
+ let batches = do_read(json, 1024, true, false, schema);
assert_eq!(batches.len(), 1);

let s: StructArray = batches.into_iter().next().unwrap().into();
@@ -1373,7 +1389,7 @@ mod tests {
Field::new("u64", DataType::UInt64, true),
]));

- let batches = do_read(buf, 1024, true, schema);
+ let batches = do_read(buf, 1024, true, false, schema);
assert_eq!(batches.len(), 1);

let i64 = batches[0].column(0).as_primitive::<Int64Type>();
@@ -1397,7 +1413,7 @@ mod tests {
true,
)]));

- let batches = do_read(buf, 1024, true, schema);
+ let batches = do_read(buf, 1024, true, false, schema);
assert_eq!(batches.len(), 1);

let i64 = batches[0]
@@ -1406,6 +1422,98 @@ mod tests {
assert_eq!(i64.values(), &[i64::MAX, i64::MIN, 900000]);
}

#[test]
fn test_strict_mode_no_missing_columns_in_schema() {
let buf = r#"
{"a": 1, "b": "2", "c": true}
{"a": 2E0, "b": "4", "c": false}
"#;

let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int16, false),
Field::new("b", DataType::Utf8, false),
Field::new("c", DataType::Boolean, false),
]));

let batches = do_read(buf, 1024, true, true, schema);
assert_eq!(batches.len(), 1);

let buf = r#"
{"a": 1, "b": "2", "c": {"a": true, "b": 1}}
{"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
"#;

let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int16, false),
Field::new("b", DataType::Utf8, false),
Field::new_struct(
"c",
vec![
Field::new("a", DataType::Boolean, false),
Field::new("b", DataType::Int16, false),
],
false,
),
]));

let batches = do_read(buf, 1024, true, true, schema);
assert_eq!(batches.len(), 1);
}

#[test]
fn test_strict_mode_missing_columns_in_schema() {
let buf = r#"
{"a": 1, "b": "2", "c": true}
{"a": 2E0, "b": "4", "c": false}
"#;

let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int16, true),
Field::new("c", DataType::Boolean, true),
]));

let err = ReaderBuilder::new(schema)
.with_batch_size(1024)
.with_strict_mode(true)
.build(Cursor::new(buf.as_bytes()))
.unwrap()
.read()
.unwrap_err();

assert_eq!(
err.to_string(),
"Json error: column 'b' missing from schema"
);

let buf = r#"
{"a": 1, "b": "2", "c": {"a": true, "b": 1}}
{"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
"#;

let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int16, false),
Field::new("b", DataType::Utf8, false),
Field::new_struct(
"c",
vec![Field::new("a", DataType::Boolean, false)],
false,
),
]));

let err = ReaderBuilder::new(schema)
.with_batch_size(1024)
.with_strict_mode(true)
.build(Cursor::new(buf.as_bytes()))
.unwrap()
.read()
.unwrap_err();

assert_eq!(
err.to_string(),
"Json error: whilst decoding field 'c': column 'b' missing from schema"
);
}

fn read_file(path: &str, schema: Option<Schema>) -> Reader<BufReader<File>> {
let file = File::open(path).unwrap();
let mut reader = BufReader::new(file);
@@ -1628,7 +1736,7 @@ mod tests {
true,
)]));

- let batches = do_read(json_content, 1024, false, schema);
+ let batches = do_read(json_content, 1024, false, false, schema);
assert_eq!(batches.len(), 1);

let col1 = batches[0].column(0).as_list::<i32>();
@@ -1656,7 +1764,7 @@ mod tests {
true,
)]));

- let batches = do_read(json_content, 1024, false, schema);
+ let batches = do_read(json_content, 1024, false, false, schema);
assert_eq!(batches.len(), 1);

let col1 = batches[0].column(0).as_list::<i32>();