Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace panic with Result in reader.rs #3354

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions datafusion/core/src/physical_plan/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,14 +440,14 @@ fn create_batch_from_map(
.unzip();

let mut columns: Vec<ArrayRef> =
read_as_batch(&group_buffers, group_schema, RowType::Compact);
read_as_batch(&group_buffers, group_schema, RowType::Compact)?;

match mode {
AggregateMode::Partial => columns.extend(read_as_batch(
&state_buffers,
aggr_schema,
RowType::WordAligned,
)),
)?),
AggregateMode::Final | AggregateMode::FinalPartitioned => {
let mut results: Vec<Vec<ScalarValue>> = vec![vec![]; accumulators.len()];
for buffer in state_buffers.iter_mut() {
Expand All @@ -474,15 +474,19 @@ fn create_batch_from_map(
RecordBatch::try_new(Arc::new(output_schema.to_owned()), columns)
}

fn read_as_batch(rows: &[Vec<u8>], schema: &Schema, row_type: RowType) -> Vec<ArrayRef> {
fn read_as_batch(
rows: &[Vec<u8>],
schema: &Schema,
row_type: RowType,
) -> Result<Vec<ArrayRef>> {
let row_num = rows.len();
let mut output = MutableRecordBatch::new(row_num, Arc::new(schema.clone()));
let mut row = RowReader::new(schema, row_type);

for data in rows {
row.point_to(0, data);
read_row(&row, &mut output, schema);
read_row(&row, &mut output, schema)?;
}

output.output_as_columns()
Ok(output.output_as_columns())
}
70 changes: 54 additions & 16 deletions datafusion/row/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub fn read_as_batch(

for offset in offsets.iter().take(row_num) {
row.point_to(*offset, data);
read_row(&row, &mut output, &schema);
read_row(&row, &mut output, &schema)?;
}

output.output().map_err(DataFusionError::ArrowError)
Expand Down Expand Up @@ -270,45 +270,61 @@ impl<'a> RowReader<'a> {
}

/// Read the row currently pointed to by the RowReader into the output columnar batch buffer
pub fn read_row(row: &RowReader, batch: &mut MutableRecordBatch, schema: &Schema) {
pub fn read_row(
row: &RowReader,
batch: &mut MutableRecordBatch,
schema: &Schema,
) -> Result<()> {
if row.all_valid() {
for ((col_idx, to), field) in batch
.arrays
.iter_mut()
.enumerate()
.zip(schema.fields().iter())
{
read_field_null_free(to, field.data_type(), col_idx, row)
read_field_null_free(to, field.data_type(), col_idx, row)?;
}
Ok(())
} else {
for ((col_idx, to), field) in batch
.arrays
.iter_mut()
.enumerate()
.zip(schema.fields().iter())
{
read_field(to, field.data_type(), col_idx, row)
read_field(to, field.data_type(), col_idx, row)?;
}
Ok(())
}
}

macro_rules! fn_read_field {
($NATIVE: ident, $ARRAY: ident) => {
paste::item! {
pub(crate) fn [<read_field_ $NATIVE>](to: &mut Box<dyn ArrayBuilder>, col_idx: usize, row: &RowReader) {
pub(crate) fn [<read_field_ $NATIVE>](to: &mut Box<dyn ArrayBuilder>, col_idx: usize, row: &RowReader) -> Result<()> {
let to = to
.as_any_mut()
.downcast_mut::<$ARRAY>()
.unwrap();
.ok_or_else(||
DataFusionError::Internal(
format!("Error downcasting ArrayBuilder to {:?}", stringify!($ARRAY)),
),
)?;
to.append_option(row.[<get_ $NATIVE _opt>](col_idx));
Ok(())
}

pub(crate) fn [<read_field_ $NATIVE _null_free>](to: &mut Box<dyn ArrayBuilder>, col_idx: usize, row: &RowReader) {
pub(crate) fn [<read_field_ $NATIVE _null_free>](to: &mut Box<dyn ArrayBuilder>, col_idx: usize, row: &RowReader) -> Result<()> {
let to = to
.as_any_mut()
.downcast_mut::<$ARRAY>()
.unwrap();
.ok_or_else(||
DataFusionError::Internal(
format!("Error downcasting ArrayBuilder to {:?}", stringify!($ARRAY)),
),
)?;
to.append_value(row.[<get_ $NATIVE>](col_idx));
Ok(())
}
}
};
Expand All @@ -333,30 +349,46 @@ pub(crate) fn read_field_binary(
to: &mut Box<dyn ArrayBuilder>,
col_idx: usize,
row: &RowReader,
) {
let to = to.as_any_mut().downcast_mut::<BinaryBuilder>().unwrap();
) -> Result<()> {
let to = to
.as_any_mut()
.downcast_mut::<BinaryBuilder>()
.ok_or_else(|| {
DataFusionError::Internal(
"Error downcasting ArrayBuilder to BinaryBuilder".into(),
)
})?;
if row.is_valid_at(col_idx) {
to.append_value(row.get_binary(col_idx));
} else {
to.append_null();
}
Ok(())
}

pub(crate) fn read_field_binary_null_free(
to: &mut Box<dyn ArrayBuilder>,
col_idx: usize,
row: &RowReader,
) {
let to = to.as_any_mut().downcast_mut::<BinaryBuilder>().unwrap();
) -> Result<()> {
let to = to
.as_any_mut()
.downcast_mut::<BinaryBuilder>()
.ok_or_else(|| {
DataFusionError::Internal(
"Error downcasting ArrayBuilder to BinaryBuilder".into(),
)
})?;
to.append_value(row.get_binary(col_idx));
Ok(())
}

fn read_field(
to: &mut Box<dyn ArrayBuilder>,
dt: &DataType,
col_idx: usize,
row: &RowReader,
) {
) -> Result<()> {
use DataType::*;
match dt {
Boolean => read_field_bool(to, col_idx, row),
Expand All @@ -374,7 +406,10 @@ fn read_field(
Date64 => read_field_date64(to, col_idx, row),
Utf8 => read_field_utf8(to, col_idx, row),
Binary => read_field_binary(to, col_idx, row),
_ => unimplemented!(),
_ => Err(DataFusionError::NotImplemented(format!(
"The data type {:?} is not recognized",
dt
))),
}
}

Expand All @@ -383,7 +418,7 @@ fn read_field_null_free(
dt: &DataType,
col_idx: usize,
row: &RowReader,
) {
) -> Result<()> {
use DataType::*;
match dt {
Boolean => read_field_bool_null_free(to, col_idx, row),
Expand All @@ -401,6 +436,9 @@ fn read_field_null_free(
Date64 => read_field_date64_null_free(to, col_idx, row),
Utf8 => read_field_utf8_null_free(to, col_idx, row),
Binary => read_field_binary_null_free(to, col_idx, row),
_ => unimplemented!(),
_ => Err(DataFusionError::NotImplemented(format!(
"The data type {:?} is not recognized",
dt
))),
}
}