Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: honor appendOnly table config #1747

Merged
merged 2 commits into from
Oct 25, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -159,7 +159,7 @@ of features outlined in the Delta [protocol][protocol] is also [tracked](#protoc

| Writer Version | Requirement | Status |
| -------------- | --------------------------------------------- | :------------------: |
| Version 2 | Append Only Tables | [![open]][roadmap] |
| Version 2 | Append Only Tables | ![done] |
| Version 2 | Column Invariants | ![done] |
| Version 3 | Enforce `delta.checkpoint.writeStatsAsJson` | [![open]][writer-rs] |
| Version 3 | Enforce `delta.checkpoint.writeStatsAsStruct` | [![open]][writer-rs] |
20 changes: 18 additions & 2 deletions rust/src/operations/delete.rs
Original file line number Diff line number Diff line change
@@ -312,11 +312,14 @@ impl std::future::IntoFuture for DeleteBuilder {

#[cfg(test)]
mod tests {

use crate::operations::DeltaOps;
use crate::protocol::*;
use crate::writer::test_utils::datafusion::get_data;
use crate::writer::test_utils::{get_arrow_schema, get_delta_schema};
use crate::writer::test_utils::{
get_arrow_schema, get_delta_schema, get_record_batch, setup_table_with_configuration,
write_batch,
};
use crate::DeltaConfigKey;
use crate::DeltaTable;
use arrow::array::Int32Array;
use arrow::datatypes::{Field, Schema};
@@ -339,6 +342,19 @@ mod tests {
table
}

/// A delete against a table created with `DeltaConfigKey::AppendOnly` set to
/// `"true"` is expected to return an error.
#[tokio::test]
async fn test_delete_when_delta_table_is_append_only() {
    // Create the append-only table and seed it with one appended batch.
    let empty_table =
        setup_table_with_configuration(DeltaConfigKey::AppendOnly, Some("true")).await;
    let seed_batch = get_record_batch(None, false);
    let seeded_table = write_batch(empty_table, seed_batch).await;

    // Run a delete without a predicate; the test passes only if it fails.
    let delete_result = DeltaOps(seeded_table).delete().await;
    let _err = delete_result
        .expect_err("Remove action is included when Delta table is append-only. Should error");
}

#[tokio::test]
async fn test_delete_default() {
let schema = get_arrow_schema(&None);
43 changes: 31 additions & 12 deletions rust/src/operations/merge.rs
Original file line number Diff line number Diff line change
@@ -532,7 +532,7 @@ impl MergeOperationConfig {
}
}

#[derive(Default, Serialize)]
#[derive(Default, Serialize, Debug)]
/// Metrics for the Merge Operation
pub struct MergeMetrics {
/// Number of rows in the source data
@@ -1245,12 +1245,13 @@ impl std::future::IntoFuture for MergeBuilder {

#[cfg(test)]
mod tests {

use crate::operations::DeltaOps;
use crate::protocol::*;
use crate::writer::test_utils::datafusion::get_data;
use crate::writer::test_utils::get_arrow_schema;
use crate::writer::test_utils::get_delta_schema;
use crate::writer::test_utils::setup_table_with_configuration;
use crate::DeltaConfigKey;
use crate::DeltaTable;
use arrow::datatypes::Schema as ArrowSchema;
use arrow::record_batch::RecordBatch;
@@ -1277,6 +1278,21 @@ mod tests {
table
}

/// A merge into a table created with `DeltaConfigKey::AppendOnly` set to
/// `"true"` is expected to return an error.
#[tokio::test]
async fn test_merge_when_delta_table_is_append_only() {
    let arrow_schema = get_arrow_schema(&None);

    // Create the append-only target table and append some data to it.
    let empty_table =
        setup_table_with_configuration(DeltaConfigKey::AppendOnly, Some("true")).await;
    let target_table = write_data(empty_table, &arrow_schema).await;

    // Join source and target on `id`; no match clauses are configured.
    let source_df = merge_source(arrow_schema);
    let join_predicate = col("target.id").eq(col("source.id"));
    let merge_result = DeltaOps(target_table)
        .merge(source_df, join_predicate)
        .with_source_alias("source")
        .with_target_alias("target")
        .await;

    let _err = merge_result
        .expect_err("Remove action is included when Delta table is append-only. Should error");
}

async fn write_data(table: DeltaTable, schema: &Arc<ArrowSchema>) -> DeltaTable {
let batch = RecordBatch::try_new(
Arc::clone(schema),
@@ -1300,14 +1316,7 @@ mod tests {
.unwrap()
}

async fn setup() -> (DeltaTable, DataFrame) {
let schema = get_arrow_schema(&None);
let table = setup_table(None).await;

let table = write_data(table, &schema).await;
assert_eq!(table.version(), 1);
assert_eq!(table.get_file_uris().count(), 1);

fn merge_source(schema: Arc<ArrowSchema>) -> DataFrame {
let ctx = SessionContext::new();
let batch = RecordBatch::try_new(
Arc::clone(&schema),
@@ -1322,8 +1331,18 @@ mod tests {
],
)
.unwrap();
let source = ctx.read_batch(batch).unwrap();
(table, source)
ctx.read_batch(batch).unwrap()
}

/// Builds the fixture shared by the merge tests: a target table holding one
/// written batch, plus the source `DataFrame` produced by `merge_source`.
async fn setup() -> (DeltaTable, DataFrame) {
    let arrow_schema = get_arrow_schema(&None);

    // Create the table, then write the initial data into it.
    let target = write_data(setup_table(None).await, &arrow_schema).await;

    // Sanity-check the fixture: one commit after creation, one data file.
    assert_eq!(target.version(), 1);
    assert_eq!(target.get_file_uris().count(), 1);

    let source = merge_source(arrow_schema);
    (target, source)
}

async fn assert_merge(table: DeltaTable, metrics: MergeMetrics) {
1 change: 1 addition & 0 deletions rust/src/operations/restore.rs
Original file line number Diff line number Diff line change
@@ -229,6 +229,7 @@ async fn execute(
datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }),
},
&actions,
&snapshot,
None,
)
.await?;
12 changes: 6 additions & 6 deletions rust/src/operations/transaction/conflict_checker.rs
Original file line number Diff line number Diff line change
@@ -691,7 +691,7 @@ mod tests {
actions: Vec<Action>,
read_whole_table: bool,
) -> Result<(), CommitConflictError> {
let setup_actions = setup.unwrap_or_else(init_table_actions);
let setup_actions = setup.unwrap_or_else(|| init_table_actions(None));
let state = DeltaTableState::from_actions(setup_actions, 0).unwrap();
let transaction_info = TransactionInfo::new(&state, reads, &actions, read_whole_table);
let summary = WinningCommitSummary {
@@ -717,7 +717,7 @@ mod tests {
// the concurrent transaction deletes a file that the current transaction did NOT read
let file_not_read = tu::create_add_action("file_not_read", true, get_stats(1, 10));
let file_read = tu::create_add_action("file_read", true, get_stats(100, 10000));
let mut setup_actions = init_table_actions();
let mut setup_actions = init_table_actions(None);
setup_actions.push(file_not_read);
setup_actions.push(file_read);
let result = execute_test(
@@ -733,7 +733,7 @@ mod tests {
// concurrently add file, that the current transaction would not have read
let file_added = tu::create_add_action("file_added", true, get_stats(1, 10));
let file_read = tu::create_add_action("file_read", true, get_stats(100, 10000));
let mut setup_actions = init_table_actions();
let mut setup_actions = init_table_actions(None);
setup_actions.push(file_read);
let result = execute_test(
Some(setup_actions),
@@ -797,7 +797,7 @@ mod tests {
// delete / read
// transaction reads a file that is removed by concurrent transaction
let file_read = tu::create_add_action("file_read", true, get_stats(1, 10));
let mut setup_actions = init_table_actions();
let mut setup_actions = init_table_actions(None);
setup_actions.push(file_read);
let result = execute_test(
Some(setup_actions),
@@ -842,7 +842,7 @@ mod tests {
let file_part1 = tu::create_add_action("file_part1", true, get_stats(1, 10));
let file_part2 = tu::create_add_action("file_part2", true, get_stats(11, 100));
let file_part3 = tu::create_add_action("file_part3", true, get_stats(101, 1000));
let mut setup_actions = init_table_actions();
let mut setup_actions = init_table_actions(None);
setup_actions.push(file_part1);
let result = execute_test(
Some(setup_actions),
@@ -858,7 +858,7 @@ mod tests {
// `read_whole_table` should disallow any concurrent remove actions
let file_part1 = tu::create_add_action("file_part1", true, get_stats(1, 10));
let file_part2 = tu::create_add_action("file_part2", true, get_stats(11, 100));
let mut setup_actions = init_table_actions();
let mut setup_actions = init_table_actions(None);
setup_actions.push(file_part1);
let result = execute_test(
Some(setup_actions),
58 changes: 52 additions & 6 deletions rust/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
@@ -48,6 +48,11 @@ pub enum TransactionError {
/// Error returned when maximum number of commit trials is exceeded
#[error("Failed to commit transaction: {0}")]
MaxCommitAttempts(i32),
/// The transaction includes Remove action with data change but Delta table is append-only
#[error(
"The transaction includes Remove action with data change but Delta table is append-only"
)]
DeltaTableAppendOnly,
}

impl From<TransactionError> for DeltaTableError {
@@ -68,9 +73,18 @@ impl From<TransactionError> for DeltaTableError {
// Convert actions to their json representation
fn log_entry_from_actions<'a>(
actions: impl IntoIterator<Item = &'a Action>,
read_snapshot: &DeltaTableState,
) -> Result<String, TransactionError> {
let append_only = read_snapshot.table_config().append_only();
let mut jsons = Vec::<String>::new();
for action in actions {
if append_only {
if let Action::remove(remove) = action {
if remove.data_change {
return Err(TransactionError::DeltaTableAppendOnly);
}
}
}
let json = serde_json::to_string(action)
.map_err(|e| TransactionError::SerializeLogJson { json_err: e })?;
jsons.push(json);
@@ -81,6 +95,7 @@ fn log_entry_from_actions<'a>(
pub(crate) fn get_commit_bytes(
operation: &DeltaOperation,
actions: &Vec<Action>,
read_snapshot: &DeltaTableState,
app_metadata: Option<Map<String, Value>>,
) -> Result<bytes::Bytes, TransactionError> {
if !actions.iter().any(|a| matches!(a, Action::commitInfo(..))) {
@@ -99,9 +114,13 @@ pub(crate) fn get_commit_bytes(
actions
.iter()
.chain(std::iter::once(&Action::commitInfo(commit_info))),
read_snapshot,
)?))
} else {
Ok(bytes::Bytes::from(log_entry_from_actions(actions)?))
Ok(bytes::Bytes::from(log_entry_from_actions(
actions,
read_snapshot,
)?))
}
}

@@ -112,10 +131,11 @@ pub(crate) async fn prepare_commit<'a>(
storage: &dyn ObjectStore,
operation: &DeltaOperation,
actions: &Vec<Action>,
read_snapshot: &DeltaTableState,
app_metadata: Option<Map<String, Value>>,
) -> Result<Path, TransactionError> {
// Serialize all actions that are part of this log entry.
let log_entry = get_commit_bytes(operation, actions, app_metadata)?;
let log_entry = get_commit_bytes(operation, actions, read_snapshot, app_metadata)?;

// Write delta log entry as temporary file to storage. For the actual commit,
// the temporary file is moved (atomic rename) to the delta log folder within `commit` function.
@@ -177,7 +197,8 @@ pub async fn commit_with_retries(
app_metadata: Option<Map<String, Value>>,
max_retries: usize,
) -> DeltaResult<i64> {
let tmp_commit = prepare_commit(storage, &operation, actions, app_metadata).await?;
let tmp_commit =
prepare_commit(storage, &operation, actions, read_snapshot, app_metadata).await?;

let mut attempt_number = 1;

@@ -218,9 +239,11 @@ pub async fn commit_with_retries(

#[cfg(all(test, feature = "parquet"))]
mod tests {
use self::test_utils::init_table_actions;
use self::test_utils::{create_remove_action, init_table_actions};
use super::*;
use crate::DeltaConfigKey;
use object_store::memory::InMemory;
use std::collections::HashMap;

#[test]
fn test_commit_uri_from_version() {
@@ -232,13 +255,36 @@ mod tests {

#[test]
fn test_log_entry_from_actions() {
let actions = init_table_actions();
let entry = log_entry_from_actions(&actions).unwrap();
let actions = init_table_actions(None);
let state = DeltaTableState::from_actions(actions.clone(), 0).unwrap();
let entry = log_entry_from_actions(&actions, &state).unwrap();
let lines: Vec<_> = entry.lines().collect();
// writes every action to a line
assert_eq!(actions.len(), lines.len())
}

/// Serializes a log entry for an append-only table whose actions include a
/// single remove action created with the given `data_change` flag.
///
/// Returns whatever `log_entry_from_actions` returns for those actions.
fn remove_action_exists_when_delta_table_is_append_only(
    data_change: bool,
) -> Result<String, TransactionError> {
    let remove_action = create_remove_action("test_append_only", data_change);

    // Table configuration with the append-only key set to "true".
    let mut configuration = HashMap::new();
    configuration.insert(
        DeltaConfigKey::AppendOnly.as_ref().to_string(),
        Some("true".to_string()),
    );

    let mut actions = init_table_actions(Some(configuration));
    actions.push(remove_action);

    // The snapshot is built from the same actions that are then serialized.
    let snapshot =
        DeltaTableState::from_actions(actions.clone(), 0).expect("Failed to get table state");
    log_entry_from_actions(&actions, &snapshot)
}

/// On an append-only table, a remove action with `data_change = true` must be
/// rejected, while one with `data_change = false` must be accepted.
#[test]
fn test_remove_action_exists_when_delta_table_is_append_only() {
    // Remove action that changes data: expected to fail.
    let with_data_change = remove_action_exists_when_delta_table_is_append_only(true);
    let _err = with_data_change
        .expect_err("Remove action is included when Delta table is append-only. Should error");

    // Remove action that does not change data: expected to succeed.
    let without_data_change = remove_action_exists_when_delta_table_is_append_only(false);
    let _actions =
        without_data_change.expect("Data is not changed by the Remove action. Should succeed");
}

#[tokio::test]
async fn test_try_commit_transaction() {
let store = InMemory::new();
4 changes: 2 additions & 2 deletions rust/src/operations/transaction/state.rs
Original file line number Diff line number Diff line change
@@ -334,7 +334,7 @@ mod tests {

#[test]
fn test_parse_predicate_expression() {
let snapshot = DeltaTableState::from_actions(init_table_actions(), 0).unwrap();
let snapshot = DeltaTableState::from_actions(init_table_actions(None), 0).unwrap();
let session = SessionContext::new();
let state = session.state();

@@ -361,7 +361,7 @@ mod tests {

#[test]
fn test_files_matching_predicate() {
let mut actions = init_table_actions();
let mut actions = init_table_actions(None);
actions.push(create_add_action("excluded", true, Some("{\"numRecords\":10,\"minValues\":{\"value\":1},\"maxValues\":{\"value\":10},\"nullCount\":{\"value\":0}}".into())));
actions.push(create_add_action("included-1", true, Some("{\"numRecords\":10,\"minValues\":{\"value\":1},\"maxValues\":{\"value\":100},\"nullCount\":{\"value\":0}}".into())));
actions.push(create_add_action("included-2", true, Some("{\"numRecords\":10,\"minValues\":{\"value\":-10},\"maxValues\":{\"value\":3},\"nullCount\":{\"value\":0}}".into())));
10 changes: 5 additions & 5 deletions rust/src/operations/transaction/test_utils.rs
Original file line number Diff line number Diff line change
@@ -72,7 +72,7 @@ pub fn create_metadata_action(
Action::metaData(MetaData::try_from(metadata).unwrap())
}

pub fn init_table_actions() -> Vec<Action> {
pub fn init_table_actions(configuration: Option<HashMap<String, Option<String>>>) -> Vec<Action> {
let raw = r#"
{
"timestamp": 1670892998177,
@@ -96,7 +96,7 @@ pub fn init_table_actions() -> Vec<Action> {
vec![
Action::commitInfo(commit_info),
create_protocol_action(None, None),
create_metadata_action(None, None),
create_metadata_action(None, configuration),
]
}

@@ -127,7 +127,7 @@ pub async fn create_initialized_table(
HashMap::new(),
),
]);
let state = DeltaTableState::from_actions(init_table_actions(), 0).unwrap();
let state = DeltaTableState::from_actions(init_table_actions(None), 0).unwrap();
let operation = DeltaOperation::Create {
mode: SaveMode::ErrorIfExists,
location: "location".into(),
@@ -144,8 +144,8 @@ pub async fn create_initialized_table(
configuration.unwrap_or_default(),
),
};
let actions = init_table_actions();
let prepared_commit = prepare_commit(storage.as_ref(), &operation, &actions, None)
let actions = init_table_actions(None);
let prepared_commit = prepare_commit(storage.as_ref(), &operation, &actions, &state, None)
.await
.unwrap();
try_commit_transaction(storage.as_ref(), &prepared_commit, 0)
38 changes: 23 additions & 15 deletions rust/src/operations/update.rs
Original file line number Diff line number Diff line change
@@ -79,7 +79,7 @@ pub struct UpdateBuilder {
safe_cast: bool,
}

#[derive(Default, Serialize)]
#[derive(Default, Serialize, Debug)]
/// Metrics collected during the Update operation
pub struct UpdateMetrics {
/// Number of files added.
@@ -459,12 +459,14 @@ impl std::future::IntoFuture for UpdateBuilder {

#[cfg(test)]
mod tests {

use crate::operations::DeltaOps;
use crate::writer::test_utils::datafusion::get_data;
use crate::writer::test_utils::{get_arrow_schema, get_delta_schema};
use crate::writer::test_utils::{
get_arrow_schema, get_delta_schema, get_record_batch, setup_table_with_configuration,
write_batch,
};
use crate::DeltaConfigKey;
use crate::DeltaTable;
use crate::{protocol::SaveMode, DeltaResult};
use arrow::datatypes::{Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow_array::Int32Array;
@@ -486,13 +488,6 @@ mod tests {
table
}

async fn write_batch(table: DeltaTable, batch: RecordBatch) -> DeltaResult<DeltaTable> {
DeltaOps(table)
.write(vec![batch.clone()])
.with_save_mode(SaveMode::Append)
.await
}

async fn prepare_values_table() -> DeltaTable {
let schema = Arc::new(Schema::new(vec![Field::new(
"value",
@@ -515,6 +510,19 @@ mod tests {
DeltaOps::new_in_memory().write(vec![batch]).await.unwrap()
}

/// An update against a table created with `DeltaConfigKey::AppendOnly` set to
/// `"true"` is expected to return an error.
#[tokio::test]
async fn test_update_when_delta_table_is_append_only() {
    // Create the append-only table and append one batch to it.
    let empty_table =
        setup_table_with_configuration(DeltaConfigKey::AppendOnly, Some("true")).await;
    let seed_batch = get_record_batch(None, false);
    let seeded_table = write_batch(empty_table, seed_batch).await;

    // Update the `modified` column without a predicate.
    let update_result = DeltaOps(seeded_table)
        .update()
        .with_update("modified", lit("2023-05-14"))
        .await;

    let _err = update_result
        .expect_err("Remove action is included when Delta table is append-only. Should error");
}

#[tokio::test]
async fn test_update_no_predicate() {
let schema = get_arrow_schema(&None);
@@ -535,7 +543,7 @@ mod tests {
)
.unwrap();

let table = write_batch(table, batch).await.unwrap();
let table = write_batch(table, batch).await;
assert_eq!(table.version(), 1);
assert_eq!(table.get_file_uris().count(), 1);

@@ -589,7 +597,7 @@ mod tests {
// Update a partitioned table where the predicate contains only partition column
// The expectation is that a physical scan of data is not required

let table = write_batch(table, batch).await.unwrap();
let table = write_batch(table, batch).await;
assert_eq!(table.version(), 1);
assert_eq!(table.get_file_uris().count(), 1);

@@ -646,7 +654,7 @@ mod tests {
)
.unwrap();

let table = write_batch(table, batch.clone()).await.unwrap();
let table = write_batch(table, batch.clone()).await;
assert_eq!(table.version(), 1);
assert_eq!(table.get_file_uris().count(), 2);

@@ -681,7 +689,7 @@ mod tests {

// Update a partitioned table where the predicate contains a partition column and non-partition column
let table = setup_table(Some(vec!["modified"])).await;
let table = write_batch(table, batch).await.unwrap();
let table = write_batch(table, batch).await;
assert_eq!(table.version(), 1);
assert_eq!(table.get_file_uris().count(), 2);

17 changes: 16 additions & 1 deletion rust/src/operations/write.rs
Original file line number Diff line number Diff line change
@@ -567,15 +567,30 @@ mod tests {
use crate::writer::test_utils::datafusion::get_data;
use crate::writer::test_utils::{
get_delta_schema, get_delta_schema_with_nested_struct, get_record_batch,
get_record_batch_with_nested_struct,
get_record_batch_with_nested_struct, setup_table_with_configuration, write_batch,
};
use crate::DeltaConfigKey;
use arrow::datatypes::Field;
use arrow::datatypes::Schema as ArrowSchema;
use arrow_array::{Int32Array, StringArray, TimestampMicrosecondArray};
use arrow_schema::{DataType, TimeUnit};
use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
use serde_json::{json, Value};

/// An overwrite of a table created with `DeltaConfigKey::AppendOnly` set to
/// `"true"` is expected to return an error, while the preceding append succeeds.
#[tokio::test]
async fn test_write_when_delta_table_is_append_only() {
    let empty_table =
        setup_table_with_configuration(DeltaConfigKey::AppendOnly, Some("true")).await;
    let record_batch = get_record_batch(None, false);

    // Appending is permitted; `write_batch` panics if it fails.
    let appended_table = write_batch(empty_table, record_batch.clone()).await;

    // Overwriting the same data is expected to be rejected.
    let overwrite_result = DeltaOps(appended_table)
        .write(vec![record_batch])
        .with_save_mode(SaveMode::Overwrite)
        .await;

    let _err = overwrite_result
        .expect_err("Remove action is included when Delta table is append-only. Should error");
}

#[tokio::test]
async fn test_create_write() {
let table_schema = get_delta_schema();
26 changes: 24 additions & 2 deletions rust/src/writer/test_utils.rs
Original file line number Diff line number Diff line change
@@ -7,10 +7,11 @@ use arrow::compute::take;
use arrow_array::{Int32Array, Int64Array, RecordBatch, StringArray, StructArray, UInt32Array};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};

use crate::operations::create::CreateBuilder;
use crate::operations::{create::CreateBuilder, DeltaOps};
use crate::schema::{Schema, SchemaTypeStruct};
use crate::table::DeltaTableMetaData;
use crate::{DeltaTable, DeltaTableBuilder, SchemaDataType, SchemaField};
use crate::writer::SaveMode;
use crate::{DeltaConfigKey, DeltaTable, DeltaTableBuilder, SchemaDataType, SchemaField};

pub type TestResult = Result<(), Box<dyn std::error::Error + 'static>>;

@@ -48,6 +49,14 @@ pub fn get_record_batch(part: Option<String>, with_null: bool) -> RecordBatch {
}
}

/// Appends `batch` to `table` using `SaveMode::Append` and returns the table
/// produced by the write operation.
///
/// # Panics
///
/// Panics with "Failed to append" if the write operation returns an error.
pub async fn write_batch(table: DeltaTable, batch: RecordBatch) -> DeltaTable {
    DeltaOps(table)
        // `batch` is owned by this function, so it is moved into the write
        // rather than cloned.
        .write(vec![batch])
        .with_save_mode(SaveMode::Append)
        .await
        .expect("Failed to append")
}

pub fn get_arrow_schema(part: &Option<String>) -> Arc<ArrowSchema> {
match part {
Some(key) if key.contains("/id=") => Arc::new(ArrowSchema::new(vec![Field::new(
@@ -284,6 +293,19 @@ pub fn get_delta_schema_with_nested_struct() -> Schema {
])
}

/// Creates an in-memory table whose columns come from `get_delta_schema()`
/// and which has a single configuration property `key` set to `value`.
///
/// # Panics
///
/// Panics with "Failed to create table" if the create operation fails.
pub async fn setup_table_with_configuration(
    key: DeltaConfigKey,
    value: Option<impl Into<String>>,
) -> DeltaTable {
    // Own a copy of the schema's fields so they can be handed to the builder.
    let columns = get_delta_schema().get_fields().clone();

    let create_result = DeltaOps::new_in_memory()
        .create()
        .with_columns(columns)
        .with_configuration_property(key, value)
        .await;

    create_result.expect("Failed to create table")
}

pub fn create_bare_table() -> DeltaTable {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path();