-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adds optional serde support to datafusion-proto #2892
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -24,7 +24,7 @@ repository = "https://github.com/apache/arrow-datafusion" | |||||
readme = "README.md" | ||||||
authors = ["Apache Arrow <[email protected]>"] | ||||||
license = "Apache-2.0" | ||||||
keywords = [ "arrow", "query", "sql" ] | ||||||
keywords = ["arrow", "query", "sql"] | ||||||
edition = "2021" | ||||||
rust-version = "1.58" | ||||||
|
||||||
|
@@ -33,18 +33,24 @@ name = "datafusion_proto" | |||||
path = "src/lib.rs" | ||||||
|
||||||
[features] | ||||||
default = [] | ||||||
serde = ["pbjson", "pbjson-build", "dep:serde"] | ||||||
|
||||||
[dependencies] | ||||||
arrow = { version = "18.0.0" } | ||||||
datafusion = { path = "../core", version = "10.0.0" } | ||||||
datafusion-common = { path = "../common", version = "10.0.0" } | ||||||
datafusion-expr = { path = "../expr", version = "10.0.0" } | ||||||
prost = "0.10" | ||||||
|
||||||
serde = { version = "1.0", optional = true } | ||||||
pbjson = { version = "0.3", optional = true } | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
pbjson-types = { version = "0.3", optional = true } | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
||||||
[dev-dependencies] | ||||||
doc-comment = "0.3" | ||||||
tokio = "1.18" | ||||||
serde_json = "1.0" | ||||||
|
||||||
[build-dependencies] | ||||||
tonic-build = { version = "0.7" } | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
pbjson-build = { version = "0.3", optional = true } | ||||||
tustvold marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,9 @@ use datafusion_common::DataFusionError; | |
#[allow(clippy::all)] | ||
pub mod protobuf { | ||
include!(concat!(env!("OUT_DIR"), "/datafusion.rs")); | ||
|
||
#[cfg(feature = "serde")] | ||
include!(concat!(env!("OUT_DIR"), "/datafusion.serde.rs")); | ||
} | ||
|
||
pub mod bytes; | ||
|
@@ -75,19 +78,32 @@ mod roundtrip_tests { | |
use std::fmt::Formatter; | ||
use std::sync::Arc; | ||
|
||
#[cfg(feature = "serde")] | ||
fn roundtrip_serde_test(proto: &protobuf::LogicalExprNode) { | ||
let string = serde_json::to_string(proto).unwrap(); | ||
let back: protobuf::LogicalExprNode = serde_json::from_str(&string).unwrap(); | ||
assert_eq!(proto, &back); | ||
} | ||
|
||
#[cfg(not(feature = "serde"))] | ||
fn roundtrip_serde_test(_proto: &protobuf::LogicalExprNode) {} | ||
|
||
// Given a DataFusion logical Expr, convert it to protobuf and back, using debug formatting to test | ||
// equality. | ||
macro_rules! roundtrip_expr_test { | ||
($initial_struct:ident, $ctx:ident) => { | ||
let proto: protobuf::LogicalExprNode = (&$initial_struct).try_into().unwrap(); | ||
fn roundtrip_expr_test<T, E>(initial_struct: T, ctx: SessionContext) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Drive by refactor to eliminate an unnecessary macro, as they complicate development, refactoring, debugging, etc... |
||
where | ||
for<'a> &'a T: TryInto<protobuf::LogicalExprNode, Error = E> + Debug, | ||
E: Debug, | ||
{ | ||
let proto: protobuf::LogicalExprNode = (&initial_struct).try_into().unwrap(); | ||
let round_trip: Expr = parse_expr(&proto, &ctx).unwrap(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for making this PR! I'm sure it's a step in the correct direction, but after cloning it, I'm having trouble figuring out how to extend it to include If I'm reading this correctly, it looks like the protobuf generator generated an entirely separate copy of all the Expr & LogicalPlan enums, and we created some fairly large functions to convert back and forth between them like It does look from other tests like we are able to roundtrip plans to protobuf, so maybe I am missing something. When I tried:
I get
Sorry, I'm a bit lost. Any help is appreciated :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, I was running my tests without the "serde" feature 🤦 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, I got all that sorted, and was able to do what I was trying to use this PR for: tustvold#63 I was hoping (naively) to see something like:
Unfortunately (though serializable and deserializable) the result is not as human readable as I hoped, and probably not something I'd want to check into a test assertion: https://gist.github.com/avantgardnerio/35a04950ca768fdfe6579aea08126b74 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The above being said, I can see other uses for JSON serialization that extend beyond my narrow use-case that could make this PR valuable enough to merge on its own merits. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Running that raw JSON through this formatter produces some encouraging output:
|
||
|
||
let round_trip: Expr = parse_expr(&proto, &$ctx).unwrap(); | ||
assert_eq!( | ||
format!("{:?}", &initial_struct), | ||
format!("{:?}", round_trip) | ||
); | ||
|
||
assert_eq!( | ||
format!("{:?}", $initial_struct), | ||
format!("{:?}", round_trip) | ||
); | ||
}; | ||
roundtrip_serde_test(&proto); | ||
} | ||
|
||
fn new_box_field(name: &str, dt: DataType, nullable: bool) -> Box<Field> { | ||
|
@@ -807,23 +823,23 @@ mod roundtrip_tests { | |
let test_expr = Expr::Not(Box::new(lit(1.0_f32))); | ||
|
||
let ctx = SessionContext::new(); | ||
roundtrip_expr_test!(test_expr, ctx); | ||
roundtrip_expr_test(test_expr, ctx); | ||
} | ||
|
||
#[test] | ||
fn roundtrip_is_null() { | ||
let test_expr = Expr::IsNull(Box::new(col("id"))); | ||
|
||
let ctx = SessionContext::new(); | ||
roundtrip_expr_test!(test_expr, ctx); | ||
roundtrip_expr_test(test_expr, ctx); | ||
} | ||
|
||
#[test] | ||
fn roundtrip_is_not_null() { | ||
let test_expr = Expr::IsNotNull(Box::new(col("id"))); | ||
|
||
let ctx = SessionContext::new(); | ||
roundtrip_expr_test!(test_expr, ctx); | ||
roundtrip_expr_test(test_expr, ctx); | ||
} | ||
|
||
#[test] | ||
|
@@ -836,7 +852,7 @@ mod roundtrip_tests { | |
}; | ||
|
||
let ctx = SessionContext::new(); | ||
roundtrip_expr_test!(test_expr, ctx); | ||
roundtrip_expr_test(test_expr, ctx); | ||
} | ||
|
||
#[test] | ||
|
@@ -848,7 +864,7 @@ mod roundtrip_tests { | |
}; | ||
|
||
let ctx = SessionContext::new(); | ||
roundtrip_expr_test!(test_expr, ctx); | ||
roundtrip_expr_test(test_expr, ctx); | ||
} | ||
|
||
#[test] | ||
|
@@ -859,7 +875,7 @@ mod roundtrip_tests { | |
}; | ||
|
||
let ctx = SessionContext::new(); | ||
roundtrip_expr_test!(test_expr, ctx); | ||
roundtrip_expr_test(test_expr, ctx); | ||
} | ||
|
||
#[test] | ||
|
@@ -871,15 +887,15 @@ mod roundtrip_tests { | |
}; | ||
|
||
let ctx = SessionContext::new(); | ||
roundtrip_expr_test!(test_expr, ctx); | ||
roundtrip_expr_test(test_expr, ctx); | ||
} | ||
|
||
#[test] | ||
fn roundtrip_negative() { | ||
let test_expr = Expr::Negative(Box::new(lit(1.0_f32))); | ||
|
||
let ctx = SessionContext::new(); | ||
roundtrip_expr_test!(test_expr, ctx); | ||
roundtrip_expr_test(test_expr, ctx); | ||
} | ||
|
||
#[test] | ||
|
@@ -891,15 +907,15 @@ mod roundtrip_tests { | |
}; | ||
|
||
let ctx = SessionContext::new(); | ||
roundtrip_expr_test!(test_expr, ctx); | ||
roundtrip_expr_test(test_expr, ctx); | ||
} | ||
|
||
#[test] | ||
fn roundtrip_wildcard() { | ||
let test_expr = Expr::Wildcard; | ||
|
||
let ctx = SessionContext::new(); | ||
roundtrip_expr_test!(test_expr, ctx); | ||
roundtrip_expr_test(test_expr, ctx); | ||
} | ||
|
||
#[test] | ||
|
@@ -909,7 +925,7 @@ mod roundtrip_tests { | |
args: vec![col("col")], | ||
}; | ||
let ctx = SessionContext::new(); | ||
roundtrip_expr_test!(test_expr, ctx); | ||
roundtrip_expr_test(test_expr, ctx); | ||
} | ||
|
||
#[test] | ||
|
@@ -921,7 +937,7 @@ mod roundtrip_tests { | |
}; | ||
|
||
let ctx = SessionContext::new(); | ||
roundtrip_expr_test!(test_expr, ctx); | ||
roundtrip_expr_test(test_expr, ctx); | ||
} | ||
|
||
#[test] | ||
|
@@ -975,7 +991,7 @@ mod roundtrip_tests { | |
let mut ctx = SessionContext::new(); | ||
ctx.register_udaf(dummy_agg); | ||
|
||
roundtrip_expr_test!(test_expr, ctx); | ||
roundtrip_expr_test(test_expr, ctx); | ||
} | ||
|
||
#[test] | ||
|
@@ -1000,7 +1016,7 @@ mod roundtrip_tests { | |
let mut ctx = SessionContext::new(); | ||
ctx.register_udf(udf); | ||
|
||
roundtrip_expr_test!(test_expr, ctx); | ||
roundtrip_expr_test(test_expr, ctx); | ||
} | ||
|
||
#[test] | ||
|
@@ -1012,22 +1028,22 @@ mod roundtrip_tests { | |
])); | ||
|
||
let ctx = SessionContext::new(); | ||
roundtrip_expr_test!(test_expr, ctx); | ||
roundtrip_expr_test(test_expr, ctx); | ||
} | ||
|
||
#[test] | ||
fn roundtrip_rollup() { | ||
let test_expr = Expr::GroupingSet(GroupingSet::Rollup(vec![col("a"), col("b")])); | ||
|
||
let ctx = SessionContext::new(); | ||
roundtrip_expr_test!(test_expr, ctx); | ||
roundtrip_expr_test(test_expr, ctx); | ||
} | ||
|
||
#[test] | ||
fn roundtrip_cube() { | ||
let test_expr = Expr::GroupingSet(GroupingSet::Cube(vec![col("a"), col("b")])); | ||
|
||
let ctx = SessionContext::new(); | ||
roundtrip_expr_test!(test_expr, ctx); | ||
roundtrip_expr_test(test_expr, ctx); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.