Skip to content

Commit

Permalink
Fix JsonWriter and RecordBatchWriter to respect stats skipping
Browse files Browse the repository at this point in the history
This is an update to JsonWriter and RecordBatchWriter to allow them to
write commit log stats information in accordance with
delta.dataSkippingNumIndexedCols and
delta.dataSkippingStatsColumns if present on the table. If these fields
are unset, then the default behavior of collecting stats for the first 32
columns is preserved.

Signed-off-by: Justin Jossick <[email protected]>
  • Loading branch information
jjossick committed Nov 12, 2024
1 parent 8c7b019 commit 1d3952e
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 14 deletions.
145 changes: 139 additions & 6 deletions crates/core/src/writer/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ pub struct JsonWriter {
writer_properties: WriterProperties,
partition_columns: Vec<String>,
arrow_writers: HashMap<String, DataArrowWriter>,
num_indexed_cols: i32,
stats_columns: Option<Vec<String>>,
}

/// Writes messages to an underlying arrow buffer.
Expand Down Expand Up @@ -187,22 +189,37 @@ impl JsonWriter {
partition_columns: Option<Vec<String>>,
storage_options: Option<HashMap<String, String>>,
) -> Result<Self, DeltaTableError> {
let storage = DeltaTableBuilder::from_uri(table_uri)
let delta_table = DeltaTableBuilder::from_uri(table_uri)
.with_storage_options(storage_options.unwrap_or_default())
.build_storage()?;

.build()?;
// Initialize writer properties for the underlying arrow writer
let writer_properties = WriterProperties::builder()
// NOTE: Consider extracting config for writer properties and setting more than just compression
.set_compression(Compression::SNAPPY)
.build();

// if metadata fails to load, use an empty hashmap and default values for num_indexed_cols and stats_columns
let configuration: HashMap<String, Option<String>> = delta_table.metadata().map_or_else(
|_| HashMap::new(),
|metadata| metadata.configuration.clone(),
);

Ok(Self {
storage: storage.object_store(),
storage: delta_table.object_store(),
arrow_schema_ref: schema,
writer_properties,
partition_columns: partition_columns.unwrap_or_default(),
arrow_writers: HashMap::new(),
num_indexed_cols: configuration
.get("delta.dataSkippingNumIndexedCols")
.and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
.unwrap_or(DEFAULT_NUM_INDEX_COLS),
stats_columns: configuration
.get("delta.dataSkippingStatsColumns")
.and_then(|v| {
v.as_ref()
.map(|v| v.split(',').map(|s| s.to_string()).collect())
}),
})
}

Expand All @@ -219,13 +236,25 @@ impl JsonWriter {
// NOTE: Consider extracting config for writer properties and setting more than just compression
.set_compression(Compression::SNAPPY)
.build();
let configuration: HashMap<String, Option<String>> =
table.metadata()?.configuration.clone();

Ok(Self {
storage: table.object_store(),
arrow_schema_ref,
writer_properties,
partition_columns,
arrow_writers: HashMap::new(),
num_indexed_cols: configuration
.get("delta.dataSkippingNumIndexedCols")
.and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
.unwrap_or(DEFAULT_NUM_INDEX_COLS),
stats_columns: configuration
.get("delta.dataSkippingStatsColumns")
.and_then(|v| {
v.as_ref()
.map(|v| v.split(',').map(|s| s.to_string()).collect())
}),
})
}

Expand Down Expand Up @@ -372,8 +401,8 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {
path.to_string(),
file_size,
&metadata,
DEFAULT_NUM_INDEX_COLS,
&None,
self.num_indexed_cols,
&self.stats_columns,
)?);
}
Ok(actions)
Expand Down Expand Up @@ -659,4 +688,108 @@ mod tests {
assert_eq!(table.version(), 1);
}
}

#[tokio::test]
async fn test_json_write_data_skipping_stats_columns() {
    use crate::operations::create::CreateBuilder;

    // Create a table whose delta.dataSkippingStatsColumns restricts stats
    // collection to the "id" and "value" columns only.
    let table_dir = tempfile::tempdir().unwrap();
    let schema = get_delta_schema();
    let path = table_dir.path().to_str().unwrap().to_string();
    let config: HashMap<String, Option<String>> = vec![(
        "delta.dataSkippingStatsColumns".to_string(),
        Some("id,value".to_string()),
    )]
    .into_iter()
    .collect();
    let mut table = CreateBuilder::new()
        .with_location(&path)
        .with_table_name("test-table")
        .with_comment("A table for running tests")
        .with_columns(schema.fields().cloned())
        .with_configuration(config)
        .await
        .unwrap();
    assert_eq!(table.version(), 0);
    let mut writer = JsonWriter::for_table(&table).unwrap();
    let data = serde_json::json!(
        {
            "id" : "A",
            "value": 42,
            "modified": "2021-02-01"
        }
    );

    writer.write(vec![data]).await.unwrap();
    writer.flush_and_commit(&mut table).await.unwrap();
    assert_eq!(table.version(), 1);
    let add_actions = table.state.unwrap().file_actions().unwrap();
    assert_eq!(add_actions.len(), 1);
    // "modified" was written but must not appear in the recorded stats,
    // since it is not listed in delta.dataSkippingStatsColumns.
    let expected_stats = "{\"numRecords\":1,\"minValues\":{\"id\":\"A\",\"value\":42},\"maxValues\":{\"id\":\"A\",\"value\":42},\"nullCount\":{\"id\":0,\"value\":0}}";
    assert_eq!(
        expected_stats.parse::<serde_json::Value>().unwrap(),
        add_actions
            .into_iter()
            .next()
            .unwrap()
            .stats
            .unwrap()
            .parse::<serde_json::Value>()
            .unwrap()
    );
}

#[tokio::test]
async fn test_json_write_data_skipping_num_indexed_cols() {
    use crate::operations::create::CreateBuilder;

    // Create a table whose delta.dataSkippingNumIndexedCols limits stats
    // collection to the first column of the schema ("id").
    let table_dir = tempfile::tempdir().unwrap();
    let schema = get_delta_schema();
    let path = table_dir.path().to_str().unwrap().to_string();
    let config: HashMap<String, Option<String>> = vec![(
        "delta.dataSkippingNumIndexedCols".to_string(),
        Some("1".to_string()),
    )]
    .into_iter()
    .collect();
    let mut table = CreateBuilder::new()
        .with_location(&path)
        .with_table_name("test-table")
        .with_comment("A table for running tests")
        .with_columns(schema.fields().cloned())
        .with_configuration(config)
        .await
        .unwrap();
    assert_eq!(table.version(), 0);
    let mut writer = JsonWriter::for_table(&table).unwrap();
    let data = serde_json::json!(
        {
            "id" : "A",
            "value": 42,
            "modified": "2021-02-01"
        }
    );

    writer.write(vec![data]).await.unwrap();
    writer.flush_and_commit(&mut table).await.unwrap();
    assert_eq!(table.version(), 1);
    let add_actions = table.state.unwrap().file_actions().unwrap();
    assert_eq!(add_actions.len(), 1);
    // Only "id" (the first column) should carry stats; "value" and
    // "modified" are beyond the indexed-column limit of 1.
    let expected_stats = "{\"numRecords\":1,\"minValues\":{\"id\":\"A\"},\"maxValues\":{\"id\":\"A\"},\"nullCount\":{\"id\":0}}";
    assert_eq!(
        expected_stats.parse::<serde_json::Value>().unwrap(),
        add_actions
            .into_iter()
            .next()
            .unwrap()
            .stats
            .unwrap()
            .parse::<serde_json::Value>()
            .unwrap()
    );
}
}
138 changes: 131 additions & 7 deletions crates/core/src/writer/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub struct RecordBatchWriter {
should_evolve: bool,
partition_columns: Vec<String>,
arrow_writers: HashMap<String, PartitionWriter>,
num_indexed_cols: i32,
stats_columns: Option<Vec<String>>,
}

impl std::fmt::Debug for RecordBatchWriter {
Expand All @@ -60,25 +62,39 @@ impl RecordBatchWriter {
partition_columns: Option<Vec<String>>,
storage_options: Option<HashMap<String, String>>,
) -> Result<Self, DeltaTableError> {
let storage = DeltaTableBuilder::from_uri(table_uri)
let delta_table = DeltaTableBuilder::from_uri(table_uri)
.with_storage_options(storage_options.unwrap_or_default())
.build_storage()?
.object_store();

.build()?;
// Initialize writer properties for the underlying arrow writer
let writer_properties = WriterProperties::builder()
// NOTE: Consider extracting config for writer properties and setting more than just compression
.set_compression(Compression::SNAPPY)
.build();

// if metadata fails to load, use an empty hashmap and default values for num_indexed_cols and stats_columns
let configuration: HashMap<String, Option<String>> = delta_table.metadata().map_or_else(
|_| HashMap::new(),
|metadata| metadata.configuration.clone(),
);

Ok(Self {
storage,
storage: delta_table.object_store(),
arrow_schema_ref: schema.clone(),
original_schema_ref: schema,
writer_properties,
partition_columns: partition_columns.unwrap_or_default(),
should_evolve: false,
arrow_writers: HashMap::new(),
num_indexed_cols: configuration
.get("delta.dataSkippingNumIndexedCols")
.and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
.unwrap_or(DEFAULT_NUM_INDEX_COLS),
stats_columns: configuration
.get("delta.dataSkippingStatsColumns")
.and_then(|v| {
v.as_ref()
.map(|v| v.split(',').map(|s| s.to_string()).collect())
}),
})
}

Expand All @@ -96,6 +112,8 @@ impl RecordBatchWriter {
// NOTE: Consider extracting config for writer properties and setting more than just compression
.set_compression(Compression::SNAPPY)
.build();
let configuration: HashMap<String, Option<String>> =
table.metadata()?.configuration.clone();

Ok(Self {
storage: table.object_store(),
Expand All @@ -105,6 +123,16 @@ impl RecordBatchWriter {
partition_columns,
should_evolve: false,
arrow_writers: HashMap::new(),
num_indexed_cols: configuration
.get("delta.dataSkippingNumIndexedCols")
.and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
.unwrap_or(DEFAULT_NUM_INDEX_COLS),
stats_columns: configuration
.get("delta.dataSkippingStatsColumns")
.and_then(|v| {
v.as_ref()
.map(|v| v.split(',').map(|s| s.to_string()).collect())
}),
})
}

Expand Down Expand Up @@ -233,8 +261,8 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {
path.to_string(),
file_size,
&metadata,
DEFAULT_NUM_INDEX_COLS,
&None,
self.num_indexed_cols,
&self.stats_columns,
)?);
}
Ok(actions)
Expand Down Expand Up @@ -985,4 +1013,100 @@ mod tests {
);
}
}

#[tokio::test]
async fn test_write_data_skipping_stats_columns() {
    // Verify RecordBatchWriter honors delta.dataSkippingStatsColumns:
    // only "id" and "value" should appear in the committed Add stats.
    let batch = get_record_batch(None, false);
    let partition_cols: &[String] = &[];
    let table_schema: StructType = get_delta_schema();
    let table_dir = tempfile::tempdir().unwrap();
    let table_path = table_dir.path();
    let config: HashMap<String, Option<String>> = vec![(
        "delta.dataSkippingStatsColumns".to_string(),
        Some("id,value".to_string()),
    )]
    .into_iter()
    .collect();

    let mut table = CreateBuilder::new()
        .with_location(table_path.to_str().unwrap())
        .with_table_name("test-table")
        .with_comment("A table for running tests")
        .with_columns(table_schema.fields().cloned())
        .with_configuration(config)
        .with_partition_columns(partition_cols)
        .await
        .unwrap();

    let mut writer = RecordBatchWriter::for_table(&table).unwrap();
    let partitions = writer.divide_by_partition_values(&batch).unwrap();

    // With no partition columns, the whole batch lands in one partition.
    assert_eq!(partitions.len(), 1);
    assert_eq!(partitions[0].record_batch, batch);
    writer.write(batch).await.unwrap();
    writer.flush_and_commit(&mut table).await.unwrap();
    assert_eq!(table.version(), 1);
    let add_actions = table.state.unwrap().file_actions().unwrap();
    assert_eq!(add_actions.len(), 1);
    // "modified" must be absent from the stats payload.
    let expected_stats = "{\"numRecords\":11,\"minValues\":{\"value\":1,\"id\":\"A\"},\"maxValues\":{\"id\":\"B\",\"value\":11},\"nullCount\":{\"id\":0,\"value\":0}}";
    assert_eq!(
        expected_stats.parse::<serde_json::Value>().unwrap(),
        add_actions
            .into_iter()
            .next()
            .unwrap()
            .stats
            .unwrap()
            .parse::<serde_json::Value>()
            .unwrap()
    );
}

#[tokio::test]
async fn test_write_data_skipping_num_indexed_cols() {
    // Verify RecordBatchWriter honors delta.dataSkippingNumIndexedCols = 1:
    // only the first schema column ("id") should appear in the Add stats.
    let batch = get_record_batch(None, false);
    let partition_cols: &[String] = &[];
    let table_schema: StructType = get_delta_schema();
    let table_dir = tempfile::tempdir().unwrap();
    let table_path = table_dir.path();
    let config: HashMap<String, Option<String>> = vec![(
        "delta.dataSkippingNumIndexedCols".to_string(),
        Some("1".to_string()),
    )]
    .into_iter()
    .collect();

    let mut table = CreateBuilder::new()
        .with_location(table_path.to_str().unwrap())
        .with_table_name("test-table")
        .with_comment("A table for running tests")
        .with_columns(table_schema.fields().cloned())
        .with_configuration(config)
        .with_partition_columns(partition_cols)
        .await
        .unwrap();

    let mut writer = RecordBatchWriter::for_table(&table).unwrap();
    let partitions = writer.divide_by_partition_values(&batch).unwrap();

    // With no partition columns, the whole batch lands in one partition.
    assert_eq!(partitions.len(), 1);
    assert_eq!(partitions[0].record_batch, batch);
    writer.write(batch).await.unwrap();
    writer.flush_and_commit(&mut table).await.unwrap();
    assert_eq!(table.version(), 1);
    let add_actions = table.state.unwrap().file_actions().unwrap();
    assert_eq!(add_actions.len(), 1);
    // Stats are limited to "id"; "value" and "modified" exceed the limit.
    let expected_stats = "{\"numRecords\":11,\"minValues\":{\"id\":\"A\"},\"maxValues\":{\"id\":\"B\"},\"nullCount\":{\"id\":0}}";
    assert_eq!(
        expected_stats.parse::<serde_json::Value>().unwrap(),
        add_actions
            .into_iter()
            .next()
            .unwrap()
            .stats
            .unwrap()
            .parse::<serde_json::Value>()
            .unwrap()
    );
}
}
2 changes: 1 addition & 1 deletion crates/core/src/writer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ pub fn create_bare_table() -> DeltaTable {
}

pub async fn create_initialized_table(partition_cols: &[String]) -> DeltaTable {
let table_schema = get_delta_schema();
let table_schema: StructType = get_delta_schema();
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path();

Expand Down

0 comments on commit 1d3952e

Please sign in to comment.