-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Copy pathdeserialize_to_struct.rs
150 lines (139 loc) · 4.82 KB
/
deserialize_to_struct.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use arrow::array::{AsArray, PrimitiveArray};
use arrow::datatypes::{Float64Type, Int32Type};
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::assert_batches_eq;
use futures::StreamExt;
/// This example shows how to convert query results into Rust structs by using
/// the Arrow APIs to convert the results into Rust native types.
///
/// This is a bit tricky initially as the results are returned as columns stored
/// as [ArrayRef]
///
/// [ArrayRef]: arrow::array::ArrayRef
#[tokio::main]
async fn main() -> Result<()> {
// Run a query that returns two columns of data
let ctx = SessionContext::new();
let testdata = datafusion::test_util::parquet_test_data();
ctx.register_parquet(
"alltypes_plain",
&format!("{testdata}/alltypes_plain.parquet"),
ParquetReadOptions::default(),
)
.await?;
let df = ctx
.sql("SELECT int_col, double_col FROM alltypes_plain")
.await?;
// print out the results showing we have an int32 and a float64 column
let results = df.clone().collect().await?;
assert_batches_eq!(
[
"+---------+------------+",
"| int_col | double_col |",
"+---------+------------+",
"| 0 | 0.0 |",
"| 1 | 10.1 |",
"| 0 | 0.0 |",
"| 1 | 10.1 |",
"| 0 | 0.0 |",
"| 1 | 10.1 |",
"| 0 | 0.0 |",
"| 1 | 10.1 |",
"+---------+------------+",
],
&results
);
// We will now convert the query results into a Rust struct
let mut stream = df.execute_stream().await?;
let mut list = vec![];
// DataFusion produces data in chunks called `RecordBatch`es which are
// typically 8000 rows each. This loop processes each `RecordBatch` as it is
// produced by the query plan and adds it to the list
while let Some(b) = stream.next().await.transpose()? {
// Each `RecordBatch` has one or more columns. Each column is stored as
// an `ArrayRef`. To interact with data using Rust native types we need to
// convert these `ArrayRef`s into concrete array types using APIs from
// the arrow crate.
// In this case, we know that each batch has two columns of the Arrow
// types Int32 and Float64, so first we cast the two columns to the
// appropriate Arrow PrimitiveArray (this is a fast / zero-copy cast).:
let int_col: &PrimitiveArray<Int32Type> = b.column(0).as_primitive();
let float_col: &PrimitiveArray<Float64Type> = b.column(1).as_primitive();
// With PrimitiveArrays, we can access to the values as native Rust
// types i32 and f64, and forming the desired `Data` structs
for (i, f) in int_col.values().iter().zip(float_col.values()) {
list.push(Data {
int_col: *i,
double_col: *f,
})
}
}
// Finally, we have the results in the list of Rust structs
let res = format!("{list:#?}");
assert_eq!(
res,
r#"[
Data {
int_col: 0,
double_col: 0.0,
},
Data {
int_col: 1,
double_col: 10.1,
},
Data {
int_col: 0,
double_col: 0.0,
},
Data {
int_col: 1,
double_col: 10.1,
},
Data {
int_col: 0,
double_col: 0.0,
},
Data {
int_col: 1,
double_col: 10.1,
},
Data {
int_col: 0,
double_col: 0.0,
},
Data {
int_col: 1,
double_col: 10.1,
},
]"#
);
// Use the fields in the struct to avoid clippy complaints
let int_sum = list.iter().fold(0, |acc, x| acc + x.int_col);
let double_sum = list.iter().fold(0.0, |acc, x| acc + x.double_col);
assert_eq!(int_sum, 4);
assert_eq!(double_sum, 40.4);
Ok(())
}
/// The Rust type that each row of the query result is converted into.
///
/// The field names mirror the `int_col` and `double_col` columns selected by
/// the example query, and the derived `Debug` output is what the example
/// compares against.
#[derive(Debug)]
struct Data {
    /// Value read from the Int32 `int_col` column.
    int_col: i32,
    /// Value read from the Float64 `double_col` column.
    double_col: f64,
}