feat: merge schema by using record batch writer #2232

Closed · wants to merge 4 commits

4 changes: 2 additions & 2 deletions crates/core/src/operations/delete.rs
@@ -131,7 +131,7 @@ async fn excute_non_empty_expr(
     metrics: &mut DeleteMetrics,
     rewrite: &[Add],
     writer_properties: Option<WriterProperties>,
-) -> DeltaResult<Vec<Add>> {
+) -> DeltaResult<Vec<Action>> {
     // For each identified file perform a parquet scan + filter + limit (1) + count.
     // If returned count is not zero then append the file to be rewritten and removed from the log. Otherwise do nothing to the file.
 
@@ -222,7 +222,7 @@ async fn execute(
         .unwrap()
         .as_millis() as i64;
 
-    let mut actions: Vec<Action> = add.into_iter().map(Action::Add).collect();
+    let mut actions: Vec<Action> = add.clone();
    let mut version = snapshot.version();
    metrics.num_removed_files = remove.len();
    metrics.num_added_files = actions.len();
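
The same refactor repeats across delete, merge, update, and write: the writer now hands back fully wrapped `Action` values, so call sites drop the `.map(Action::Add)` step and just clone (or move) the vector. A minimal sketch of the before/after, using stand-in types rather than the real `crate::kernel` definitions:

```rust
// Stand-ins for the delta-rs kernel types, reduced to what the diff touches.
#[derive(Clone, Debug)]
#[allow(dead_code)]
struct Add {
    path: String,
    data_change: bool,
}

#[derive(Clone, Debug)]
enum Action {
    Add(Add),
    // Remove, Metadata, ... elided
}

fn main() {
    let adds = vec![Add { path: "part-00001.parquet".into(), data_change: true }];

    // Before this PR: the writer returned Vec<Add>, so every call site
    // wrapped each file action itself.
    let wrapped: Vec<Action> = adds.into_iter().map(Action::Add).collect();

    // After this PR: the writer already returns Vec<Action>, so call sites
    // clone or move the vector as-is.
    let actions: Vec<Action> = wrapped.clone();

    println!("{} action(s)", actions.len());
}
```
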
2 changes: 1 addition & 1 deletion crates/core/src/operations/merge/mod.rs
@@ -1385,7 +1385,7 @@ async fn execute(
 
     metrics.rewrite_time_ms = Instant::now().duration_since(rewrite_start).as_millis() as u64;
 
-    let mut actions: Vec<Action> = add_actions.into_iter().map(Action::Add).collect();
+    let mut actions: Vec<Action> = add_actions.clone();
     metrics.num_target_files_added = actions.len();
 
     let survivors = barrier
1 change: 0 additions & 1 deletion crates/core/src/operations/mod.rs
@@ -50,7 +50,6 @@ pub mod merge;
 pub mod update;
 #[cfg(feature = "datafusion")]
 pub mod write;
-pub mod writer;
 
 // TODO make ops consume a snapshot ...
 
50 changes: 30 additions & 20 deletions crates/core/src/operations/optimize.rs
@@ -40,14 +40,14 @@ use serde::{Deserialize, Serialize};
 use tracing::debug;
 
 use super::transaction::{commit, PROTOCOL};
-use super::writer::{PartitionWriter, PartitionWriterConfig};
 use crate::errors::{DeltaResult, DeltaTableError};
 use crate::kernel::{Action, PartitionsExt, Remove, Scalar};
 use crate::logstore::LogStoreRef;
 use crate::protocol::DeltaOperation;
 use crate::storage::ObjectStoreRef;
 use crate::table::state::DeltaTableState;
 use crate::writer::utils::arrow_schema_without_partitions;
+use crate::writer::{DeltaWriter, RecordBatchWriter};
 use crate::{crate_version, DeltaTable, ObjectMeta, PartitionFilter};
 
 /// Metrics from Optimize
@@ -443,14 +443,18 @@ impl MergePlan {
         };
 
         // Next, initialize the writer
-        let writer_config = PartitionWriterConfig::try_new(
-            task_parameters.file_schema.clone(),
-            partition_values.clone(),
-            Some(task_parameters.writer_properties.clone()),
-            Some(task_parameters.input_parameters.target_size as usize),
-            None,
+        // TODO: task_parameters.input_parameters.target_size in RecordBatchWriter::for_storage
+        let mut writer = RecordBatchWriter::for_storage(
+            object_store,
+            task_parameters.writer_properties.clone(),
+            task_parameters.file_schema.clone(),
+            Some(
+                partition_values
+                    .keys()
+                    .map(|s| s.to_owned())
+                    .collect_vec(),
+            ),
         )?;
-        let mut writer = PartitionWriter::try_with_config(object_store, writer_config)?;
 
         let mut read_stream = read_stream.await?;
 
@@ -460,21 +464,27 @@
             batch =
                 super::cast::cast_record_batch(&batch, task_parameters.file_schema.clone(), false)?;
             partial_metrics.num_batches += 1;
-            writer.write(&batch).await.map_err(DeltaTableError::from)?;
+            writer.write(batch).await.map_err(DeltaTableError::from)?;
         }
 
-        let add_actions = writer.close().await?.into_iter().map(|mut add| {
-            add.data_change = false;
-
-            let size = add.size;
-
-            partial_metrics.num_files_added += 1;
-            partial_metrics.files_added.total_files += 1;
-            partial_metrics.files_added.total_size += size;
-            partial_metrics.files_added.max = std::cmp::max(partial_metrics.files_added.max, size);
-            partial_metrics.files_added.min = std::cmp::min(partial_metrics.files_added.min, size);
-
-            Action::Add(add)
+        let add_actions = writer.flush().await?.into_iter().map(|mut action| {
+            match &mut action {
+                Action::Add(add) => {
+                    // add.partition_values = partition_values.into(); TODO: Required?
+                    add.data_change = false;
+                    let size = add.size;
+
+                    partial_metrics.num_files_added += 1;
+                    partial_metrics.files_added.total_files += 1;
+                    partial_metrics.files_added.total_size += size;
+                    partial_metrics.files_added.max =
+                        std::cmp::max(partial_metrics.files_added.max, size);
+                    partial_metrics.files_added.min =
+                        std::cmp::min(partial_metrics.files_added.min, size);
+                    Action::Add(add.clone())
+                }
+                o => o.clone(),
+            }
         });
         partial_actions.extend(add_actions);
 
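
With `flush()` returning `Vec<Action>` where `close()` returned bare `Add` structs, the optimize path has to pattern-match before updating its compaction metrics, and pass any non-Add action through untouched (the `o => o.clone()` arm). A self-contained sketch of that bookkeeping, with simplified stand-ins for the kernel types and illustrative file sizes:

```rust
// Stand-ins for crate::kernel::{Action, Add} and the optimize metrics.
#[derive(Clone, Debug)]
struct Add {
    size: i64,
    data_change: bool,
}

#[derive(Clone, Debug)]
enum Action {
    Add(Add),
    Other,
}

#[derive(Debug, Default)]
struct FileStats {
    total_files: u64,
    total_size: i64,
    max: i64,
    min: i64,
}

fn main() {
    // Pretend flush() handed back two Add actions and one unrelated action.
    let flushed = vec![
        Action::Add(Add { size: 128, data_change: true }),
        Action::Other,
        Action::Add(Add { size: 64, data_change: true }),
    ];

    let mut stats = FileStats { min: i64::MAX, ..Default::default() };

    let actions: Vec<Action> = flushed
        .into_iter()
        .map(|mut action| match &mut action {
            Action::Add(add) => {
                // Compaction rewrites existing data, so it is not a data change.
                add.data_change = false;
                stats.total_files += 1;
                stats.total_size += add.size;
                stats.max = stats.max.max(add.size);
                stats.min = stats.min.min(add.size);
                Action::Add(add.clone())
            }
            // Anything that is not an Add passes through unmodified.
            o => o.clone(),
        })
        .collect();

    println!("{stats:?}, {} action(s)", actions.len());
}
```
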
2 changes: 1 addition & 1 deletion crates/core/src/operations/update.rs
@@ -377,7 +377,7 @@ async fn execute(
         .duration_since(UNIX_EPOCH)
         .unwrap()
         .as_millis() as i64;
-    let mut actions: Vec<Action> = add_actions.into_iter().map(Action::Add).collect();
+    let mut actions: Vec<Action> = add_actions.clone();
 
     metrics.num_added_files = actions.len();
     metrics.num_removed_files = candidates.candidates.len();
35 changes: 17 additions & 18 deletions crates/core/src/operations/write.rs
@@ -43,7 +43,6 @@ use parquet::file::properties::WriterProperties;
 
 use super::datafusion_utils::Expression;
 use super::transaction::PROTOCOL;
-use super::writer::{DeltaWriter, WriterConfig};
 use super::{transaction::commit, CreateBuilder};
 use crate::delta_datafusion::expr::fmt_expr_to_sql;
 use crate::delta_datafusion::expr::parse_predicate_expression;
@@ -57,6 +56,7 @@ use crate::storage::ObjectStoreRef;
 use crate::table::state::DeltaTableState;
 use crate::table::Constraint as DeltaConstraint;
 use crate::writer::record_batch::divide_by_partition_values;
+use crate::writer::{DeltaWriter, RecordBatchWriter};
 use crate::DeltaTable;
 
 #[derive(thiserror::Error, Debug)]
@@ -307,12 +307,12 @@ async fn write_execution_plan_with_predicate(
     plan: Arc<dyn ExecutionPlan>,
     partition_columns: Vec<String>,
     object_store: ObjectStoreRef,
-    target_file_size: Option<usize>,
-    write_batch_size: Option<usize>,
+    target_file_size: Option<usize>, // TODO: Implement this in Record Batch Writer
+    write_batch_size: Option<usize>, // TODO: Implement this in Record Batch Writer
     writer_properties: Option<WriterProperties>,
     safe_cast: bool,
     overwrite_schema: bool,
-) -> DeltaResult<Vec<Add>> {
+) -> DeltaResult<Vec<Action>> {
     // Use input schema to prevent wrapping partitions columns into a dictionary.
     let schema: ArrowSchemaRef = if overwrite_schema {
         plan.schema()
@@ -342,26 +342,25 @@
         let inner_plan = plan.clone();
         let inner_schema = schema.clone();
         let task_ctx = Arc::new(TaskContext::from(&state));
-        let config = WriterConfig::new(
+
+        let mut writer = RecordBatchWriter::for_storage(
+            object_store.clone(),
+            writer_properties.clone().unwrap_or_default(),
             inner_schema.clone(),
-            partition_columns.clone(),
-            writer_properties.clone(),
-            target_file_size,
-            write_batch_size,
-        );
-        let mut writer = DeltaWriter::new(object_store.clone(), config);
+            Some(partition_columns.clone()),
+        )?;
         let checker_stream = checker.clone();
         let mut stream = inner_plan.execute(i, task_ctx)?;
-        let handle: tokio::task::JoinHandle<DeltaResult<Vec<Add>>> =
+        let handle: tokio::task::JoinHandle<DeltaResult<Vec<Action>>> =
             tokio::task::spawn(async move {
                 while let Some(maybe_batch) = stream.next().await {
                     let batch = maybe_batch?;
                     checker_stream.check_batch(&batch).await?;
                     let arr =
                         super::cast::cast_record_batch(&batch, inner_schema.clone(), safe_cast)?;
-                    writer.write(&arr).await?;
+                    writer.write(arr).await?;
                 }
-                writer.close().await
+                writer.flush().await
             });
 
         tasks.push(handle);
@@ -392,7 +391,7 @@ pub(crate) async fn write_execution_plan(
     writer_properties: Option<WriterProperties>,
     safe_cast: bool,
     overwrite_schema: bool,
-) -> DeltaResult<Vec<Add>> {
+) -> DeltaResult<Vec<Action>> {
     write_execution_plan_with_predicate(
         None,
         snapshot,
@@ -417,7 +416,7 @@ async fn execute_non_empty_expr(
     expression: &Expr,
     rewrite: &[Add],
     writer_properties: Option<WriterProperties>,
-) -> DeltaResult<Vec<Add>> {
+) -> DeltaResult<Vec<Action>> {
     // For each identified file perform a parquet scan + filter + limit (1) + count.
     // If returned count is not zero then append the file to be rewritten and removed from the log. Otherwise do nothing to the file.
 
@@ -488,7 +487,7 @@ async fn prepare_predicate_actions(
     };
     let remove = candidates.candidates;
 
-    let mut actions: Vec<Action> = add.into_iter().map(Action::Add).collect();
+    let mut actions: Vec<Action> = add.clone();
 
     for action in remove {
         actions.push(Action::Remove(Remove {
@@ -644,7 +643,7 @@ impl std::future::IntoFuture for WriteBuilder {
                     this.overwrite_schema,
                 )
                 .await?;
-                actions.extend(add_actions.into_iter().map(Action::Add));
+                actions.extend(add_actions);
 
                 // Collect remove actions if we are overwriting the table
                 if let Some(snapshot) = &this.snapshot {
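
The per-stream write tasks in write.rs keep the same shape but now yield `Vec<Action>`: spawn a task per partition stream, write each batch by value, then end with `flush()` instead of `close()`. A runnable sketch of that task shape, assuming a tokio dependency (`macros` + `rt-multi-thread` features); `MockWriter` is a stand-in for `RecordBatchWriter`, not the real API:

```rust
// Simplified Action; the real code carries a full Add struct per file.
#[derive(Clone, Debug)]
enum Action {
    Add(String),
}

struct MockWriter {
    staged: Vec<Action>,
}

impl MockWriter {
    async fn write(&mut self, batch: Vec<u8>) -> Result<(), String> {
        // The real writer buffers Arrow batches into parquet files;
        // here we just record one Add per batch.
        self.staged
            .push(Action::Add(format!("part-{:04}.parquet", self.staged.len())));
        let _ = batch;
        Ok(())
    }

    async fn flush(self) -> Result<Vec<Action>, String> {
        Ok(self.staged)
    }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let batches = vec![vec![1u8], vec![2u8]];

    // Mirrors the diff: the JoinHandle now resolves to actions, not Adds.
    let handle: tokio::task::JoinHandle<Result<Vec<Action>, String>> =
        tokio::spawn(async move {
            let mut writer = MockWriter { staged: Vec::new() };
            for batch in batches {
                // write() takes the batch by value (`writer.write(arr)`,
                // previously `writer.write(&arr)`).
                writer.write(batch).await?;
            }
            // close() is replaced by flush(), which returns Vec<Action>.
            writer.flush().await
        });

    let actions = handle.await.map_err(|e| e.to_string())??;
    println!("{} action(s) committed", actions.len());
    Ok(())
}
```
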