Skip to content

Commit

Permalink
feat(rust): add more commit info to most operations (#2009)
Browse files Browse the repository at this point in the history
# Description
Some operations were not writing `readVersion` and `operationMetrics` to
the commitInfo. This adds both fields to the commitInfo written by the delete, filesystem check, merge, and update operations.

---------

Co-authored-by: Robert Pack <[email protected]>
  • Loading branch information
ion-elgreco and roeap authored Jan 4, 2024
1 parent 7981b95 commit 236fa74
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 10 deletions.
23 changes: 21 additions & 2 deletions crates/deltalake-core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,17 @@ async fn execute(

metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_micros();

let mut app_metadata = match app_metadata {
Some(meta) => meta,
None => HashMap::new(),
};

app_metadata.insert("readVersion".to_owned(), snapshot.version().into());

if let Ok(map) = serde_json::to_value(&metrics) {
app_metadata.insert("operationMetrics".to_owned(), map);
}

// Do not make a commit when there are zero updates to the state
if !actions.is_empty() {
let operation = DeltaOperation::Delete {
Expand All @@ -259,7 +270,7 @@ async fn execute(
&actions,
operation,
snapshot,
app_metadata,
Some(app_metadata),
)
.await?;
}
Expand Down Expand Up @@ -390,7 +401,7 @@ mod tests {
assert_eq!(table.version(), 1);
assert_eq!(table.get_file_uris().count(), 1);

let (table, metrics) = DeltaOps(table).delete().await.unwrap();
let (mut table, metrics) = DeltaOps(table).delete().await.unwrap();

assert_eq!(table.version(), 2);
assert_eq!(table.get_file_uris().count(), 0);
Expand All @@ -399,6 +410,14 @@ mod tests {
assert_eq!(metrics.num_deleted_rows, None);
assert_eq!(metrics.num_copied_rows, None);

let commit_info = table.history(None).await.unwrap();
let last_commit = &commit_info[commit_info.len() - 1];
let extra_info = last_commit.info.clone();
assert_eq!(
extra_info["operationMetrics"],
serde_json::to_value(&metrics).unwrap()
);

// rewrite is not required
assert_eq!(metrics.rewrite_time_ms, 0);

Expand Down
18 changes: 13 additions & 5 deletions crates/deltalake-core/src/operations/filesystem_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,21 +154,29 @@ impl FileSystemCheckPlan {
default_row_commit_version: file.default_row_commit_version,
}));
}
let metrics = FileSystemCheckMetrics {
dry_run: false,
files_removed: removed_file_paths,
};

let mut app_metadata = HashMap::new();

app_metadata.insert("readVersion".to_owned(), snapshot.version().into());
if let Ok(map) = serde_json::to_value(&metrics) {
app_metadata.insert("operationMetrics".to_owned(), map);
}

commit(
self.log_store.as_ref(),
&actions,
DeltaOperation::FileSystemCheck {},
snapshot,
// TODO pass through metadata
None,
Some(app_metadata),
)
.await?;

Ok(FileSystemCheckMetrics {
dry_run: false,
files_removed: removed_file_paths,
})
Ok(metrics)
}
}

Expand Down
18 changes: 17 additions & 1 deletion crates/deltalake-core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1383,6 +1383,17 @@ async fn execute(

metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_millis() as u64;

let mut app_metadata = match app_metadata {
Some(meta) => meta,
None => HashMap::new(),
};

app_metadata.insert("readVersion".to_owned(), snapshot.version().into());

if let Ok(map) = serde_json::to_value(&metrics) {
app_metadata.insert("operationMetrics".to_owned(), map);
}

// Do not make a commit when there are zero updates to the state
if !actions.is_empty() {
let operation = DeltaOperation::Merge {
Expand All @@ -1396,7 +1407,7 @@ async fn execute(
&actions,
operation,
snapshot,
app_metadata,
Some(app_metadata),
)
.await?;
}
Expand Down Expand Up @@ -2046,6 +2057,11 @@ mod tests {
let commit_info = table.history(None).await.unwrap();
let last_commit = &commit_info[commit_info.len() - 1];
let parameters = last_commit.operation_parameters.clone().unwrap();
let extra_info = last_commit.info.clone();
assert_eq!(
extra_info["operationMetrics"],
serde_json::to_value(&metrics).unwrap()
);
assert_eq!(parameters["predicate"], json!("target.id = source.id"));
assert_eq!(
parameters["matchedPredicates"],
Expand Down
24 changes: 22 additions & 2 deletions crates/deltalake-core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,12 +408,24 @@ async fn execute(
let operation = DeltaOperation::Update {
predicate: Some(fmt_expr_to_sql(&predicate)?),
};

let mut app_metadata = match app_metadata {
Some(meta) => meta,
None => HashMap::new(),
};

app_metadata.insert("readVersion".to_owned(), snapshot.version().into());

if let Ok(map) = serde_json::to_value(&metrics) {
app_metadata.insert("operationMetrics".to_owned(), map);
}

version = commit(
log_store.as_ref(),
&actions,
operation,
snapshot,
app_metadata,
Some(app_metadata),
)
.await?;

Expand Down Expand Up @@ -844,7 +856,7 @@ mod tests {

// Validate order operators do not include nulls
let table = prepare_values_table().await;
let (table, metrics) = DeltaOps(table)
let (mut table, metrics) = DeltaOps(table)
.update()
.with_predicate(col("value").gt(lit(2)).or(col("value").lt(lit(2))))
.with_update("value", lit(10))
Expand All @@ -857,6 +869,14 @@ mod tests {
assert_eq!(metrics.num_updated_rows, 2);
assert_eq!(metrics.num_copied_rows, 3);

let commit_info = table.history(None).await.unwrap();
let last_commit = &commit_info[commit_info.len() - 1];
let extra_info = last_commit.info.clone();
assert_eq!(
extra_info["operationMetrics"],
serde_json::to_value(&metrics).unwrap()
);

let expected = [
"+-------+",
"| value |",
Expand Down

0 comments on commit 236fa74

Please sign in to comment.