Support Rust structures --> RecordBatch by adding Serde support to RawDecoder (#3949) #3979

Merged — 8 commits, Apr 5, 2023
2 changes: 2 additions & 0 deletions arrow-json/Cargo.toml
@@ -42,10 +42,12 @@ arrow-schema = { workspace = true }
half = { version = "2.1", default-features = false }
indexmap = { version = "1.9", default-features = false, features = ["std"] }
num = { version = "0.4", default-features = false, features = ["std"] }
serde = { version = "1.0", default-features = false }
tustvold marked this conversation as resolved.
serde_json = { version = "1.0", default-features = false, features = ["std"] }
chrono = { version = "0.4.23", default-features = false, features = ["clock"] }
lexical-core = { version = "0.8", default-features = false }

[dev-dependencies]
tempfile = "3.3"
flate2 = { version = "1", default-features = false, features = ["rust_backend"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
181 changes: 181 additions & 0 deletions arrow-json/src/raw/mod.rs
@@ -29,18 +29,21 @@ use crate::raw::struct_array::StructArrayDecoder;
use crate::raw::tape::{Tape, TapeDecoder, TapeElement};
use crate::raw::timestamp_array::TimestampArrayDecoder;
use arrow_array::timezone::Tz;
use arrow_array::types::Float32Type;
use arrow_array::types::*;
use arrow_array::{downcast_integer, make_array, RecordBatch, RecordBatchReader};
use arrow_data::ArrayData;
use arrow_schema::{ArrowError, DataType, SchemaRef, TimeUnit};
use chrono::Utc;
use serde::Serialize;
use std::io::BufRead;

mod boolean_array;
mod decimal_array;
mod list_array;
mod map_array;
mod primitive_array;
mod serializer;
mod string_array;
mod struct_array;
mod tape;
@@ -233,6 +236,184 @@ impl RawDecoder {
self.tape_decoder.decode(buf)
}

/// Serialize `rows` to this [`RawDecoder`]
Contributor:

I think a more compelling example would be to implement some struct

struct MyRow {
    field: i32,
    name: String,
}

And demonstrate how to turn that into a RecordBatch

Contributor (author):

Added a fairly comprehensive example

Contributor:

This example is very cool. However, I think it is very hard to find (given that it is on "RawDecoder" that is part of arrow_json).

I suggest we add a sentence / link to this example from the front page (and maybe even bring a simpler example to the front page): https://docs.rs/arrow/latest/arrow/index.html#io

///
/// This provides a simple way to convert [serde]-compatible data structures into an arrow
/// [`RecordBatch`].
///
/// Custom conversion logic as described in [arrow_array::builder] will likely outperform this,
/// especially where the schema is known at compile-time; however, this provides a mechanism
/// to get something up and running quickly.
///
/// It can be used with [`serde_json::Value`]
///
/// ```
/// # use std::sync::Arc;
/// # use serde_json::{Value, json};
/// # use arrow_array::cast::AsArray;
/// # use arrow_array::types::Float32Type;
/// # use arrow_json::RawReaderBuilder;
/// # use arrow_schema::{DataType, Field, Schema};
/// let json = vec![json!({"float": 2.3}), json!({"float": 5.7})];
///
/// let schema = Schema::new(vec![Field::new("float", DataType::Float32, true)]);
/// let mut decoder = RawReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
///
/// decoder.serialize(&json).unwrap();
/// let batch = decoder.flush().unwrap().unwrap();
/// assert_eq!(batch.num_rows(), 2);
/// assert_eq!(batch.num_columns(), 1);
/// let values = batch.column(0).as_primitive::<Float32Type>().values();
/// assert_eq!(values, &[2.3, 5.7])
/// ```
///
/// Or with arbitrary [`Serialize`] types
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_json::RawReaderBuilder;
/// # use arrow_schema::{DataType, Field, Schema};
/// # use serde::Serialize;
/// # use arrow_array::cast::AsArray;
/// # use arrow_array::types::{Float32Type, Int32Type};
/// #
/// #[derive(Serialize)]
/// struct MyStruct {
///     int32: i32,
///     float: f32,
/// }
///
/// let schema = Schema::new(vec![
///     Field::new("int32", DataType::Int32, false),
///     Field::new("float", DataType::Float32, false),
/// ]);
///
/// let rows = vec![
///     MyStruct{ int32: 0, float: 3. },
///     MyStruct{ int32: 4, float: 67.53 },
/// ];
///
/// let mut decoder = RawReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
/// decoder.serialize(&rows).unwrap();
///
/// let batch = decoder.flush().unwrap().unwrap();
///
/// // Expect batch containing two columns
/// let int32 = batch.column(0).as_primitive::<Int32Type>();
/// assert_eq!(int32.values(), &[0, 4]);
///
/// let float = batch.column(1).as_primitive::<Float32Type>();
/// assert_eq!(float.values(), &[3., 67.53]);
/// ```
///
/// Or even complex nested types
///
/// ```
/// # use std::collections::BTreeMap;
/// # use std::sync::Arc;
/// # use arrow_array::StructArray;
/// # use arrow_cast::display::{ArrayFormatter, FormatOptions};
/// # use arrow_json::RawReaderBuilder;
/// # use arrow_schema::{DataType, Field, Fields, Schema};
/// # use serde::Serialize;
/// #
/// #[derive(Serialize)]
/// struct MyStruct {
///     int32: i32,
///     list: Vec<f64>,
///     nested: Vec<Option<Nested>>,
/// }
///
/// impl MyStruct {
///     /// Returns the [`Fields`] for [`MyStruct`]
///     fn fields() -> Fields {
///         let nested = DataType::Struct(Nested::fields());
///         Fields::from([
///             Arc::new(Field::new("int32", DataType::Int32, false)),
///             Arc::new(Field::new_list(
///                 "list",
///                 Field::new("element", DataType::Float64, false),
///                 false,
///             )),
///             Arc::new(Field::new_list(
///                 "nested",
///                 Field::new("element", nested, true),
///                 true,
///             )),
///         ])
///     }
/// }
///
/// #[derive(Serialize)]
/// struct Nested {
///     map: BTreeMap<String, Vec<String>>
/// }
///
/// impl Nested {
///     /// Returns the [`Fields`] for [`Nested`]
///     fn fields() -> Fields {
///         let element = Field::new("element", DataType::Utf8, false);
///         Fields::from([
///             Arc::new(Field::new_map(
///                 "map",
///                 "entries",
///                 Field::new("key", DataType::Utf8, false),
///                 Field::new_list("value", element, false),
///                 false, // sorted
///                 false, // nullable
///             ))
///         ])
///     }
/// }
///
/// let data = vec![
///     MyStruct {
///         int32: 34,
///         list: vec![1., 2., 34.],
///         nested: vec![
///             None,
///             Some(Nested {
///                 map: vec![
///                     ("key1".to_string(), vec!["foo".to_string(), "bar".to_string()]),
///                     ("key2".to_string(), vec!["baz".to_string()])
///                 ].into_iter().collect()
///             })
///         ]
///     },
///     MyStruct {
///         int32: 56,
///         list: vec![],
///         nested: vec![]
///     },
///     MyStruct {
///         int32: 24,
///         list: vec![-1., 245.],
///         nested: vec![None]
///     }
/// ];
///
/// let schema = Schema::new(MyStruct::fields());
/// let mut decoder = RawReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
/// decoder.serialize(&data).unwrap();
/// let batch = decoder.flush().unwrap().unwrap();
/// assert_eq!(batch.num_rows(), 3);
/// assert_eq!(batch.num_columns(), 3);
///
/// // Convert to StructArray to format
/// let s = StructArray::from(batch);
/// let options = FormatOptions::default().with_null("null");
/// let formatter = ArrayFormatter::try_new(&s, &options).unwrap();
///
/// assert_eq!(&formatter.value(0).to_string(), "{int32: 34, list: [1.0, 2.0, 34.0], nested: [null, {map: {key1: [foo, bar], key2: [baz]}}]}");
/// assert_eq!(&formatter.value(1).to_string(), "{int32: 56, list: [], nested: []}");
/// assert_eq!(&formatter.value(2).to_string(), "{int32: 24, list: [-1.0, 245.0], nested: [null]}");
/// ```
///
/// Note: this ignores any batch size setting, and always decodes all rows
pub fn serialize<S: Serialize>(&mut self, rows: &[S]) -> Result<(), ArrowError> {
self.tape_decoder.serialize(rows)
}

/// Flushes the currently buffered data to a [`RecordBatch`]
///
/// Returns `Ok(None)` if no buffered data