Commit

feat: add parquet writer (#176)

ZENOTME authored Mar 9, 2024
1 parent c7727e3 commit 0914f7a

Showing 9 changed files with 995 additions and 48 deletions.
1 change: 0 additions & 1 deletion crates/iceberg/Cargo.toml
@@ -70,4 +70,3 @@ iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
 pretty_assertions = { workspace = true }
 tempfile = { workspace = true }
 tera = { workspace = true }
-tokio = { workspace = true }

11 changes: 7 additions & 4 deletions crates/iceberg/src/io.rs
@@ -54,6 +54,7 @@ use crate::{error::Result, Error, ErrorKind};
 use futures::{AsyncRead, AsyncSeek, AsyncWrite};
 use once_cell::sync::Lazy;
 use opendal::{Operator, Scheme};
+use tokio::io::AsyncWrite as TokioAsyncWrite;
 use tokio::io::{AsyncRead as TokioAsyncRead, AsyncSeek as TokioAsyncSeek};
 use url::Url;

@@ -244,9 +245,9 @@ impl InputFile {
 }

 /// Trait for writing file.
-pub trait FileWrite: AsyncWrite {}
+pub trait FileWrite: AsyncWrite + TokioAsyncWrite + Send + Unpin {}

-impl<T> FileWrite for T where T: AsyncWrite {}
+impl<T> FileWrite for T where T: AsyncWrite + TokioAsyncWrite + Send + Unpin {}

 /// Output file is used for writing to files..
 #[derive(Debug)]
@@ -282,8 +283,10 @@ impl OutputFile {
     }

     /// Creates output file for writing.
-    pub async fn writer(&self) -> Result<impl FileWrite> {
-        Ok(self.op.writer(&self.path[self.relative_path_pos..]).await?)
+    pub async fn writer(&self) -> Result<Box<dyn FileWrite>> {
+        Ok(Box::new(
+            self.op.writer(&self.path[self.relative_path_pos..]).await?,
+        ))
     }
 }

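FileWrite now also requires tokio's AsyncWrite (plus Send + Unpin), and OutputFile::writer returns Box<dyn FileWrite> instead of an opaque impl FileWrite, so callers can name and store the writer type. A minimal sketch of driving such a boxed writer with tokio's AsyncWriteExt helpers — the BoxedFileWrite alias and write_payload function are illustrative stand-ins, not part of the crate:

use tokio::io::{AsyncWrite, AsyncWriteExt};

// Stand-in for the crate's boxed FileWrite trait object (illustrative only).
type BoxedFileWrite = Box<dyn AsyncWrite + Send + Unpin>;

// Hypothetical helper: stream a payload through the boxed writer, then
// shut it down so the underlying storage writer is finalized.
async fn write_payload(mut w: BoxedFileWrite, payload: &[u8]) -> std::io::Result<()> {
    w.write_all(payload).await?; // Box<dyn AsyncWrite + Unpin> is itself AsyncWrite
    w.shutdown().await?;
    Ok(())
}

Boxing costs a vtable indirection per poll, but a nameable type can be held in struct fields — which is what an async parquet writer wrapping the output stream needs.
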
17 changes: 10 additions & 7 deletions crates/iceberg/src/scan.rs
@@ -211,7 +211,7 @@ impl FileScanTask {
 mod tests {
     use crate::io::{FileIO, OutputFile};
     use crate::spec::{
-        DataContentType, DataFile, DataFileFormat, FormatVersion, Literal, Manifest,
+        DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest,
         ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
         ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
     };
@@ -314,14 +314,15 @@ mod tests {
                 ManifestEntry::builder()
                     .status(ManifestStatus::Added)
                     .data_file(
-                        DataFile::builder()
+                        DataFileBuilder::default()
                             .content(DataContentType::Data)
                             .file_path(format!("{}/1.parquet", &self.table_location))
                             .file_format(DataFileFormat::Parquet)
                             .file_size_in_bytes(100)
                             .record_count(1)
                             .partition(Struct::from_iter([Some(Literal::long(100))]))
-                            .build(),
+                            .build()
+                            .unwrap(),
                     )
                     .build(),
                 ManifestEntry::builder()
@@ -330,14 +331,15 @@
                     .sequence_number(parent_snapshot.sequence_number())
                     .file_sequence_number(parent_snapshot.sequence_number())
                     .data_file(
-                        DataFile::builder()
+                        DataFileBuilder::default()
                             .content(DataContentType::Data)
                             .file_path(format!("{}/2.parquet", &self.table_location))
                             .file_format(DataFileFormat::Parquet)
                             .file_size_in_bytes(100)
                             .record_count(1)
                             .partition(Struct::from_iter([Some(Literal::long(200))]))
-                            .build(),
+                            .build()
+                            .unwrap(),
                     )
                     .build(),
                 ManifestEntry::builder()
@@ -346,14 +348,15 @@
                     .sequence_number(parent_snapshot.sequence_number())
                     .file_sequence_number(parent_snapshot.sequence_number())
                     .data_file(
-                        DataFile::builder()
+                        DataFileBuilder::default()
                             .content(DataContentType::Data)
                             .file_path(format!("{}/3.parquet", &self.table_location))
                             .file_format(DataFileFormat::Parquet)
                             .file_size_in_bytes(100)
                             .record_count(1)
                             .partition(Struct::from_iter([Some(Literal::long(300))]))
-                            .build(),
+                            .build()
+                            .unwrap(),
                     )
                     .build(),
             ],

34 changes: 17 additions & 17 deletions crates/iceberg/src/spec/manifest.rs
@@ -932,34 +932,34 @@ impl TryFrom<i32> for ManifestStatus {
 }

 /// Data file carries data file path, partition tuple, metrics, …
-#[derive(Debug, PartialEq, Clone, Eq, TypedBuilder)]
+#[derive(Debug, PartialEq, Clone, Eq, Builder)]
 pub struct DataFile {
     /// field id: 134
     ///
     /// Type of content stored by the data file: data, equality deletes,
     /// or position deletes (all v1 files are data files)
-    content: DataContentType,
+    pub(crate) content: DataContentType,
     /// field id: 100
     ///
     /// Full URI for the file with FS scheme
-    file_path: String,
+    pub(crate) file_path: String,
     /// field id: 101
     ///
     /// String file format name, avro, orc or parquet
-    file_format: DataFileFormat,
+    pub(crate) file_format: DataFileFormat,
     /// field id: 102
     ///
     /// Partition data tuple, schema based on the partition spec output using
     /// partition field ids for the struct field ids
-    partition: Struct,
+    pub(crate) partition: Struct,
     /// field id: 103
     ///
     /// Number of records in this file
-    record_count: u64,
+    pub(crate) record_count: u64,
     /// field id: 104
     ///
     /// Total file size in bytes
-    file_size_in_bytes: u64,
+    pub(crate) file_size_in_bytes: u64,
     /// field id: 108
     /// key field id: 117
     /// value field id: 118
@@ -968,29 +968,29 @@ pub struct DataFile {
     /// Map from column id to the total size on disk of all regions that
     /// store the column. Does not include bytes necessary to read other
     /// columns, like footers. Leave null for row-oriented formats (Avro)
     #[builder(default)]
-    column_sizes: HashMap<i32, u64>,
+    pub(crate) column_sizes: HashMap<i32, u64>,
     /// field id: 109
     /// key field id: 119
     /// value field id: 120
     ///
     /// Map from column id to number of values in the column (including null
     /// and NaN values)
     #[builder(default)]
-    value_counts: HashMap<i32, u64>,
+    pub(crate) value_counts: HashMap<i32, u64>,
     /// field id: 110
     /// key field id: 121
     /// value field id: 122
     ///
     /// Map from column id to number of null values in the column
     #[builder(default)]
-    null_value_counts: HashMap<i32, u64>,
+    pub(crate) null_value_counts: HashMap<i32, u64>,
     /// field id: 137
     /// key field id: 138
     /// value field id: 139
     ///
     /// Map from column id to number of NaN values in the column
     #[builder(default)]
-    nan_value_counts: HashMap<i32, u64>,
+    pub(crate) nan_value_counts: HashMap<i32, u64>,
     /// field id: 125
     /// key field id: 126
     /// value field id: 127
@@ -1003,7 +1003,7 @@ pub struct DataFile {
     ///
     /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
     #[builder(default)]
-    lower_bounds: HashMap<i32, Literal>,
+    pub(crate) lower_bounds: HashMap<i32, Literal>,
     /// field id: 128
     /// key field id: 129
     /// value field id: 130
@@ -1016,19 +1016,19 @@ pub struct DataFile {
     ///
     /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
     #[builder(default)]
-    upper_bounds: HashMap<i32, Literal>,
+    pub(crate) upper_bounds: HashMap<i32, Literal>,
     /// field id: 131
     ///
     /// Implementation-specific key metadata for encryption
     #[builder(default)]
-    key_metadata: Vec<u8>,
+    pub(crate) key_metadata: Vec<u8>,
     /// field id: 132
     /// element field id: 133
     ///
     /// Split offsets for the data file. For example, all row group offsets
     /// in a Parquet file. Must be sorted ascending
     #[builder(default)]
-    split_offsets: Vec<i64>,
+    pub(crate) split_offsets: Vec<i64>,
     /// field id: 135
     /// element field id: 136
     ///
@@ -1037,7 +1037,7 @@ pub struct DataFile {
     /// otherwise. Fields with ids listed in this column must be present
     /// in the delete file
     #[builder(default)]
-    equality_ids: Vec<i32>,
+    pub(crate) equality_ids: Vec<i32>,
     /// field id: 140
     ///
     /// ID representing sort order for this file.
@@ -1049,7 +1049,7 @@ pub struct DataFile {
     /// order id to null. Readers must ignore sort order id for position
     /// delete files.
     #[builder(default, setter(strip_option))]
-    sort_order_id: Option<i32>,
+    pub(crate) sort_order_id: Option<i32>,
 }

 /// Type of content stored by the data file: data, equality deletes, or

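The derive switch from TypedBuilder to derive_builder's Builder is why the scan.rs tests above construct files with DataFileBuilder::default() and unwrap a now-fallible build(). A self-contained sketch of that pattern, using a toy Example struct rather than the real DataFile:

use derive_builder::Builder;

// Toy analogue of the DataFile change: one required field plus one
// #[builder(default)] field.
#[derive(Debug, Builder)]
struct Example {
    file_path: String,
    #[builder(default)]
    record_count: u64,
}

fn main() {
    // derive_builder validates required fields at runtime: build() returns
    // a Result, unlike TypedBuilder's compile-time enforcement.
    let ok = ExampleBuilder::default()
        .file_path("s3://bucket/1.parquet".to_string())
        .build()
        .unwrap();
    assert_eq!(ok.record_count, 0); // #[builder(default)] fell back to Default

    // A missing required field now surfaces as Err(...) at runtime.
    assert!(ExampleBuilder::default().build().is_err());
}

The trade-off: TypedBuilder catches a missing field at compile time, while derive_builder reports it at runtime — but its builder can be created with default(), held in a variable, and filled in incrementally, which suits a writer that accumulates file metrics before sealing a DataFile.
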
38 changes: 19 additions & 19 deletions crates/iceberg/src/spec/table_metadata.rs
@@ -52,66 +52,66 @@ pub type TableMetadataRef = Arc<TableMetadata>;
 /// We check the validity of this data structure when constructing.
 pub struct TableMetadata {
     /// Integer Version for the format.
-    format_version: FormatVersion,
+    pub(crate) format_version: FormatVersion,
     /// A UUID that identifies the table
-    table_uuid: Uuid,
+    pub(crate) table_uuid: Uuid,
     /// Location tables base location
-    location: String,
+    pub(crate) location: String,
     /// The tables highest sequence number
-    last_sequence_number: i64,
+    pub(crate) last_sequence_number: i64,
     /// Timestamp in milliseconds from the unix epoch when the table was last updated.
-    last_updated_ms: i64,
+    pub(crate) last_updated_ms: i64,
     /// An integer; the highest assigned column ID for the table.
-    last_column_id: i32,
+    pub(crate) last_column_id: i32,
     /// A list of schemas, stored as objects with schema-id.
-    schemas: HashMap<i32, SchemaRef>,
+    pub(crate) schemas: HashMap<i32, SchemaRef>,
     /// ID of the table’s current schema.
-    current_schema_id: i32,
+    pub(crate) current_schema_id: i32,
     /// A list of partition specs, stored as full partition spec objects.
-    partition_specs: HashMap<i32, PartitionSpecRef>,
+    pub(crate) partition_specs: HashMap<i32, PartitionSpecRef>,
     /// ID of the “current” spec that writers should use by default.
-    default_spec_id: i32,
+    pub(crate) default_spec_id: i32,
     /// An integer; the highest assigned partition field ID across all partition specs for the table.
-    last_partition_id: i32,
+    pub(crate) last_partition_id: i32,
     ///A string to string map of table properties. This is used to control settings that
     /// affect reading and writing and is not intended to be used for arbitrary metadata.
     /// For example, commit.retry.num-retries is used to control the number of commit retries.
-    properties: HashMap<String, String>,
+    pub(crate) properties: HashMap<String, String>,
     /// long ID of the current table snapshot; must be the same as the current
     /// ID of the main branch in refs.
-    current_snapshot_id: Option<i64>,
+    pub(crate) current_snapshot_id: Option<i64>,
     ///A list of valid snapshots. Valid snapshots are snapshots for which all
     /// data files exist in the file system. A data file must not be deleted
     /// from the file system until the last snapshot in which it was listed is
     /// garbage collected.
-    snapshots: HashMap<i64, SnapshotRef>,
+    pub(crate) snapshots: HashMap<i64, SnapshotRef>,
     /// A list (optional) of timestamp and snapshot ID pairs that encodes changes
     /// to the current snapshot for the table. Each time the current-snapshot-id
     /// is changed, a new entry should be added with the last-updated-ms
     /// and the new current-snapshot-id. When snapshots are expired from
     /// the list of valid snapshots, all entries before a snapshot that has
     /// expired should be removed.
-    snapshot_log: Vec<SnapshotLog>,
+    pub(crate) snapshot_log: Vec<SnapshotLog>,

     /// A list (optional) of timestamp and metadata file location pairs
     /// that encodes changes to the previous metadata files for the table.
     /// Each time a new metadata file is created, a new entry of the
     /// previous metadata file location should be added to the list.
     /// Tables can be configured to remove oldest metadata log entries and
     /// keep a fixed-size log of the most recent entries after a commit.
-    metadata_log: Vec<MetadataLog>,
+    pub(crate) metadata_log: Vec<MetadataLog>,

     /// A list of sort orders, stored as full sort order objects.
-    sort_orders: HashMap<i64, SortOrderRef>,
+    pub(crate) sort_orders: HashMap<i64, SortOrderRef>,
     /// Default sort order id of the table. Note that this could be used by
     /// writers, but is not used when reading because reads use the specs
     /// stored in manifest files.
-    default_sort_order_id: i64,
+    pub(crate) default_sort_order_id: i64,
     ///A map of snapshot references. The map keys are the unique snapshot reference
     /// names in the table, and the map values are snapshot reference objects.
     /// There is always a main branch reference pointing to the current-snapshot-id
     /// even if the refs map is null.
-    refs: HashMap<String, SnapshotReference>,
+    pub(crate) refs: HashMap<String, SnapshotReference>,
 }

 impl TableMetadata {

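As in manifest.rs above, these fields widen from private to pub(crate), presumably so writer and commit code elsewhere in the crate can fill in and update metadata directly while external callers keep going through public accessors. A self-contained sketch of that visibility pattern — module and field names here are illustrative, not the crate's layout:

mod spec {
    // Toy analogue of TableMetadata: the field is pub(crate), not pub.
    pub struct TableMetadata {
        pub(crate) last_updated_ms: i64,
    }

    impl TableMetadata {
        // Outside the crate, reads go through an accessor and the field
        // cannot be mutated.
        pub fn last_updated_ms(&self) -> i64 {
            self.last_updated_ms
        }
    }
}

mod writer {
    // A sibling module in the same crate may touch the field directly.
    pub fn touch(meta: &mut crate::spec::TableMetadata, ts_ms: i64) {
        meta.last_updated_ms = ts_ms;
    }
}

fn main() {
    let mut meta = spec::TableMetadata { last_updated_ms: 0 };
    writer::touch(&mut meta, 1_709_942_400_000); // 2024-03-09 in milliseconds
    assert_eq!(meta.last_updated_ms(), 1_709_942_400_000);
}
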
(Diffs for the remaining 4 changed files are not shown.)
