feat(source): support array flattening for json (#8061)
- support array flattening for json #6794

Approved-By: tabVersion
waruto210 authored Feb 21, 2023
1 parent 6bc544e commit 7c844d9
Showing 11 changed files with 94 additions and 85 deletions.
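Previously the JSON parser expected every message payload to be a single JSON object. With this change, a payload whose top level is a JSON array is flattened: each element is parsed as its own row, and the message succeeds as long as at least one element parses. A sketch of the behavior with illustrative payloads (mirroring the updated test data below):

```
{"v1": 1, "v2": "a"}                          => one message, one row
[{"v1": 1, "v2": "a"},{"v1": 2, "v2": "b"}]   => one message, two rows
```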
4 changes: 1 addition & 3 deletions scripts/source/test_data/kafka_1_partition_mv_topic.1
@@ -15,6 +15,4 @@
 {"v1":7,"v2":"name0"}
 {"v1":0,"v2":"name9"}
 {"v1":3,"v2":"name2"}
-{"v1":7,"v2":"name5"}
-{"v1":1,"v2":"name7"}
-{"v1":3,"v2":"name9"}
+[{"v1":7,"v2":"name5"},{"v1":1,"v2":"name7"},{"v1":3,"v2":"name9"}]
3 changes: 1 addition & 2 deletions scripts/source/test_data/kafka_1_partition_topic.1
@@ -1,4 +1,3 @@
 {"v1": 1, "v2": "1"}
 {"v1": 2, "v2": "22"}
-{"v1": 3, "v2": "333"}
-{"v1": 4, "v2": "4444"}
+[{"v1": 3, "v2": "333"},{"v1": 4, "v2": "4444"}]
3 changes: 1 addition & 2 deletions scripts/source/test_data/kafka_3_partition_topic.3
@@ -1,4 +1,3 @@
 {"v1": 1, "v2": "1"}
 {"v1": 2, "v2": "22"}
-{"v1": 3, "v2": "333"}
-{"v1": 4, "v2": "4444"}
+[{"v1": 3, "v2": "333"},{"v1": 4, "v2": "4444"}]
3 changes: 1 addition & 2 deletions scripts/source/test_data/kafka_4_partition_topic.4
@@ -1,4 +1,3 @@
 {"v1": 1, "v2": "1"}
 {"v1": 2, "v2": "22"}
-{"v1": 3, "v2": "333"}
-{"v1": 4, "v2": "4444"}
+[{"v1": 3, "v2": "333"},{"v1": 4, "v2": "4444"}]
5 changes: 1 addition & 4 deletions (test-data file; name not captured in this view)
@@ -94,7 +94,4 @@
 {"v1": 93, "v2": "QE53BJ", "v3": [93, 93, 93], "v4": {"v5": 93, "v6": 94}}
 {"v1": 94, "v2": "9Q7W89", "v3": [94, 94, 94], "v4": {"v5": 94, "v6": 95}}
 {"v1": 95, "v2": "VGDBS1", "v3": [95, 95, 95], "v4": {"v5": 95, "v6": 96}}
-{"v1": 96, "v2": "KK6WEX", "v3": [96, 96, 96], "v4": {"v5": 96, "v6": 97}}
-{"v1": 97, "v2": "XRTK3Y", "v3": [97, 97, 97], "v4": {"v5": 97, "v6": 98}}
-{"v1": 98, "v2": "ZQ2TCL", "v3": [98, 98, 98], "v4": {"v5": 98, "v6": 99}}
-{"v1": 99, "v2": "15UCX7", "v3": [99, 99, 99], "v4": {"v5": 99, "v6": 100}}
+[{"v1": 96, "v2": "KK6WEX", "v3": [96, 96, 96], "v4": {"v5": 96, "v6": 97}},{"v1": 97, "v2": "XRTK3Y", "v3": [97, 97, 97], "v4": {"v5": 97, "v6": 98}},{"v1": 98, "v2": "ZQ2TCL", "v3": [98, 98, 98], "v4": {"v5": 98, "v6": 99}},{"v1": 99, "v2": "15UCX7", "v3": [99, 99, 99], "v4": {"v5": 99, "v6": 100}}]
1 change: 0 additions & 1 deletion src/connector/src/parser/canal/mod.rs
@@ -15,6 +15,5 @@
 mod simd_json_parser;

 mod operators;
-mod util;

 pub use simd_json_parser::*;
2 changes: 1 addition & 1 deletion src/connector/src/parser/canal/simd_json_parser.rs
@@ -25,8 +25,8 @@ use risingwave_expr::vector_op::cast::{
 };
 use simd_json::{BorrowedValue, StaticNode, ValueAccess};

-use super::util::at_least_one_ok;
 use crate::parser::canal::operators::*;
+use crate::parser::util::at_least_one_ok;
 use crate::parser::{SourceStreamChunkRowWriter, WriteGuard};
 use crate::source::SourceColumnDesc;
 use crate::{ensure_rust_type, ensure_str, impl_common_parser_logic};
49 changes: 0 additions & 49 deletions src/connector/src/parser/canal/util.rs

This file was deleted.

74 changes: 54 additions & 20 deletions src/connector/src/parser/json_parser.rs
@@ -13,12 +13,14 @@
 // limitations under the License.

 use futures_async_stream::try_stream;
+use itertools::Itertools;
 use risingwave_common::error::ErrorCode::ProtocolError;
 use risingwave_common::error::{Result, RwError};
 use simd_json::{BorrowedValue, ValueAccess};

 use crate::impl_common_parser_logic;
 use crate::parser::common::simd_json_parse_value;
+use crate::parser::util::at_least_one_ok;
 use crate::parser::{SourceStreamChunkRowWriter, WriteGuard};
 use crate::source::SourceColumnDesc;

@@ -35,6 +37,23 @@ impl JsonParser {
         Ok(Self { rw_columns })
     }

+    #[inline(always)]
+    fn parse_single_value(
+        value: BorrowedValue<'_>,
+        writer: &mut SourceStreamChunkRowWriter<'_>,
+    ) -> Result<WriteGuard> {
+        writer.insert(|desc| {
+            simd_json_parse_value(
+                &desc.data_type,
+                value.get(desc.name.to_ascii_lowercase().as_str()),
+            )
+            .map_err(|e| {
+                tracing::error!("failed to process value ({}): {}", value, e);
+                e.into()
+            })
+        })
+    }
+
     #[allow(clippy::unused_async)]
     pub async fn parse_inner(
         &self,
@@ -46,20 +65,16 @@ impl JsonParser {
         let value: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload_mut)
             .map_err(|e| RwError::from(ProtocolError(e.to_string())))?;

-        writer.insert(|desc| {
-            simd_json_parse_value(
-                &desc.data_type,
-                value.get(desc.name.to_ascii_lowercase().as_str()),
-            )
-            .map_err(|e| {
-                tracing::error!(
-                    "failed to process value ({}): {}",
-                    String::from_utf8_lossy(payload),
-                    e
-                );
-                e.into()
-            })
-        })
+        let results = match value {
+            BorrowedValue::Array(objects) => objects
+                .into_iter()
+                .map(|obj| Self::parse_single_value(obj, &mut writer))
+                .collect_vec(),
+            _ => {
+                return Self::parse_single_value(value, &mut writer);
+            }
+        };
+        at_least_one_ok(results)
     }
 }

@@ -77,8 +92,20 @@ mod tests {

     use crate::parser::{JsonParser, SourceColumnDesc, SourceStreamChunkBuilder};

-    #[tokio::test]
-    async fn test_json_parser() {
+    fn get_payload() -> Vec<&'static [u8]> {
+        vec![
+            br#"{"i32":1,"bool":true,"i16":1,"i64":12345678,"f32":1.23,"f64":1.2345,"varchar":"varchar","date":"2021-01-01","timestamp":"2021-01-01 16:06:12.269","decimal":12345.67890}"#.as_slice(),
+            br#"{"i32":1,"f32":12345e+10,"f64":12345,"decimal":12345}"#.as_slice(),
+        ]
+    }
+
+    fn get_array_top_level_payload() -> Vec<&'static [u8]> {
+        vec![
+            br#"[{"i32":1,"bool":true,"i16":1,"i64":12345678,"f32":1.23,"f64":1.2345,"varchar":"varchar","date":"2021-01-01","timestamp":"2021-01-01 16:06:12.269","decimal":12345.67890}, {"i32":1,"f32":12345e+10,"f64":12345,"decimal":12345}]"#.as_slice()
+        ]
+    }
+
+    async fn test_json_parser(get_payload: fn() -> Vec<&'static [u8]>) {
         let descs = vec![
             SourceColumnDesc::simple("i32", DataType::Int32, 0.into()),
             SourceColumnDesc::simple("bool", DataType::Boolean, 2.into()),
@@ -96,10 +123,7 @@ mod tests {

         let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2);

-        for payload in [
-            br#"{"i32":1,"bool":true,"i16":1,"i64":12345678,"f32":1.23,"f64":1.2345,"varchar":"varchar","date":"2021-01-01","timestamp":"2021-01-01 16:06:12.269","decimal":12345.67890}"#.as_slice(),
-            br#"{"i32":1,"f32":12345e+10,"f64":12345,"decimal":12345}"#.as_slice(),
-        ] {
+        for payload in get_payload() {
             let writer = builder.row_writer();
             parser.parse_inner(payload, writer).await.unwrap();
         }
@@ -177,6 +201,16 @@ mod tests {
         }
     }

+    #[tokio::test]
+    async fn test_json_parse_object_top_level() {
+        test_json_parser(get_payload).await;
+    }
+
+    #[tokio::test]
+    async fn test_json_parse_array_top_level() {
+        test_json_parser(get_array_top_level_payload).await;
+    }
+
     #[tokio::test]
     async fn test_json_parser_failed() {
         let descs = vec![
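The heart of the change is the `match` in `parse_inner` above: a top-level `BorrowedValue::Array` is parsed element by element, while any other value takes the old single-object path (the early `return` keeps that common case free of the intermediate `Vec`). A minimal standalone sketch of the dispatch, using `simd_json` directly; `rows_in_payload` is a hypothetical helper standing in for the real row-writer plumbing:

```rust
// Sketch only: count how many rows a payload would produce under the new
// dispatch. The real code writes each element through a
// `SourceStreamChunkRowWriter` and aggregates the per-element results.
fn rows_in_payload(payload: &[u8]) -> usize {
    let mut buf = payload.to_vec();
    match simd_json::to_borrowed_value(&mut buf) {
        // A top-level array is flattened: one row per element.
        Ok(simd_json::BorrowedValue::Array(objects)) => objects.len(),
        // Any other top-level value is treated as a single object.
        Ok(_) => 1,
        Err(_) => 0,
    }
}

fn main() {
    assert_eq!(rows_in_payload(br#"{"v1": 1}"#), 1);
    assert_eq!(rows_in_payload(br#"[{"v1": 1},{"v1": 2}]"#), 2);
}
```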
35 changes: 34 additions & 1 deletion src/connector/src/parser/util.rs
@@ -14,10 +14,13 @@
 use std::collections::HashMap;

 use bytes::Bytes;
+use itertools::Itertools;
 use reqwest::Url;
-use risingwave_common::error::ErrorCode::{InvalidParameterValue, ProtocolError};
+use risingwave_common::error::ErrorCode::{InternalError, InvalidParameterValue, ProtocolError};
 use risingwave_common::error::{Result, RwError};

+use crate::parser::WriteGuard;
+
 /// get kafka topic name
 pub(super) fn get_kafka_topic(props: &HashMap<String, String>) -> Result<&String> {
     const KAFKA_TOPIC_KEY1: &str = "kafka.topic";
@@ -55,3 +58,33 @@ pub(super) async fn download_from_http(location: &Url) -> Result<Bytes> {
         .await
         .map_err(|e| InvalidParameterValue(format!("failed to read HTTP body: {}", e)).into())
 }
+
+// `results.len()` should be greater than zero:
+// - if all results are errors, return Err;
+// - if all are Ok, return Ok;
+// - if only some are errors, log the errors and return Ok.
+#[inline]
+pub(super) fn at_least_one_ok(mut results: Vec<Result<WriteGuard>>) -> Result<WriteGuard> {
+    let errors = results
+        .iter()
+        .filter_map(|r| r.as_ref().err())
+        .collect_vec();
+    let first_ok_index = results.iter().position(|r| r.is_ok());
+    let err_message = errors
+        .into_iter()
+        .map(|r| r.to_string())
+        .collect_vec()
+        .join(", ");
+
+    if let Some(first_ok_index) = first_ok_index {
+        if !err_message.is_empty() {
+            tracing::error!("failed to parse some columns: {}", err_message)
+        }
+        results.remove(first_ok_index)
+    } else {
+        Err(RwError::from(InternalError(format!(
+            "failed to parse all columns: {}",
+            err_message
+        ))))
+    }
+}
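Because a single array payload can now yield a mix of per-element successes and failures, this aggregation rule matters: one bad element no longer fails the whole message as long as some element parses. A self-contained model of the same semantics, with `String` standing in for `RwError` and `()` for `WriteGuard` (a simplified sketch, not the committed code):

```rust
// Ok if any element parsed; Err only when every element failed.
fn at_least_one_ok(mut results: Vec<Result<(), String>>) -> Result<(), String> {
    match results.iter().position(|r| r.is_ok()) {
        // Partial failures are only logged in the real implementation.
        Some(i) => results.remove(i),
        None => Err(results
            .into_iter()
            .filter_map(|r| r.err())
            .collect::<Vec<_>>()
            .join(", ")),
    }
}

fn main() {
    assert!(at_least_one_ok(vec![Ok(()), Err("bad element".into())]).is_ok());
    assert!(at_least_one_ok(vec![Err("a".into()), Err("b".into())]).is_err());
}
```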
