From 79dace9a4750c540727104ab96fb444a77571cbb Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Wed, 18 Dec 2024 14:50:36 +0300 Subject: [PATCH] metadata flags --- arrow-array/src/lib.rs | 2 + arrow-array/src/record_batch.rs | 79 +++++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+) diff --git a/arrow-array/src/lib.rs b/arrow-array/src/lib.rs index 0fc9d30ab6e3..1286278de60e 100644 --- a/arrow-array/src/lib.rs +++ b/arrow-array/src/lib.rs @@ -230,6 +230,8 @@ pub use array::*; mod record_batch; pub use record_batch::{ RecordBatch, RecordBatchIterator, RecordBatchOptions, RecordBatchReader, RecordBatchWriter, + CHECKPOINT_MESSAGE, INTERMEDIATE_NODE_GENERATED_WATERMARK, NORMAL_RECORD_BATCH, + SOURCE_GENERATED_WATERMARK, }; mod arithmetic; diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index 8958ca6fae62..09f2e634a607 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -23,6 +23,27 @@ use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaBuilder, SchemaRef use std::ops::Index; use std::sync::Arc; +/// THIS CONSTANT IS ARAS ONLY +pub const NORMAL_RECORD_BATCH: u8 = 0; +/// THIS CONSTANT IS ARAS ONLY +/// +/// Whether the watermark is [`SOURCE_GENERATED_WATERMARK`] or +/// [`INTERMEDIATE_NODE_GENERATED_WATERMARK`] is not be important +/// for the watermark algorithms. They are seperated because of +/// testing purposes. Once the watermark infrastructure is solid +/// and complete, then we will unify them. +pub const SOURCE_GENERATED_WATERMARK: u8 = 1; +/// THIS CONSTANT IS ARAS ONLY +/// +/// Whether the watermark is [`SOURCE_GENERATED_WATERMARK`] or +/// [`INTERMEDIATE_NODE_GENERATED_WATERMARK`] is not be important +/// for the watermark algorithms. They are seperated because of +/// testing purposes. Once the watermark infrastructure is solid +/// and complete, then we will unify them. +pub const INTERMEDIATE_NODE_GENERATED_WATERMARK: u8 = 2; +/// THIS CONSTANT IS ARAS ONLY +pub const CHECKPOINT_MESSAGE: u8 = 3; + /// Trait for types that can read `RecordBatch`'s. /// /// To create from an iterator, see [RecordBatchIterator]. @@ -204,6 +225,28 @@ pub struct RecordBatch { /// /// This is stored separately from the columns to handle the case of no columns row_count: usize, + + /// THIS MEMBER IS ARAS ONLY + /// + /// This tag is used to store metadata flags for the record batch. This + /// should include information that do not make a material change in the + /// schema, but still needs to be carried with. Carrying such information + /// as part of schema metadata results in "schema mismatch" issues, so + /// this attribute should be used in such cases. Typical downstream use + /// cases include specifying whether the record batch is a proper data + /// batch, or a watermark batch, or a control message like a checkpoint + /// barrier. + /// + /// The flags define the type of data or message represented by the record batch. + /// The following values are currently defined: + /// + /// - `0`: Normal record batch data. + /// - `1`: A source-generated watermark. + /// - `2`: An intermediate node-generated watermark. + /// - `3`: A checkpoint message. + /// + /// Additional flag values may be defined in the future to support new use cases. + metadata_flags: u8, } impl RecordBatch { @@ -263,6 +306,7 @@ impl RecordBatch { schema, columns, row_count: 0, + metadata_flags: 0, } } @@ -336,6 +380,7 @@ impl RecordBatch { schema, columns, row_count, + metadata_flags: options.metadata_flags, }) } @@ -355,6 +400,7 @@ impl RecordBatch { schema, columns: self.columns, row_count: self.row_count, + metadata_flags: self.metadata_flags, }) } @@ -390,6 +436,7 @@ impl RecordBatch { &RecordBatchOptions { match_field_names: true, row_count: Some(self.row_count), + metadata_flags: self.metadata_flags, }, ) } @@ -512,6 +559,7 @@ impl RecordBatch { schema: self.schema.clone(), columns, row_count: length, + metadata_flags: self.metadata_flags, } } @@ -620,6 +668,21 @@ impl RecordBatch { .map(|array| array.get_array_memory_size()) .sum() } + + /// THIS METHOD IS ARAS ONLY + /// + /// Gets the metadata_flags of RecordBatch + pub fn metadata_flags(&self) -> u8 { + self.metadata_flags + } + + /// THIS METHOD IS ARAS ONLY + /// + /// Sets the metadata_flags of RecordBatch and returns self + pub fn with_metadata_flags(mut self, metadata_flags: u8) -> Self { + self.metadata_flags = metadata_flags; + self + } } /// Options that control the behaviour used when creating a [`RecordBatch`]. @@ -631,6 +694,11 @@ pub struct RecordBatchOptions { /// Optional row count, useful for specifying a row count for a RecordBatch with no columns pub row_count: Option, + + /// THIS MEMBER IS ARAS ONLY + /// + /// This tag is used to store metadata flags for the record batch. + pub metadata_flags: u8, } impl RecordBatchOptions { @@ -639,6 +707,7 @@ impl RecordBatchOptions { Self { match_field_names: true, row_count: None, + metadata_flags: 0, } } /// Sets the row_count of RecordBatchOptions and returns self @@ -651,6 +720,13 @@ impl RecordBatchOptions { self.match_field_names = match_field_names; self } + /// THIS METHOD IS ARAS ONLY + /// + /// Sets the metadata_flags of RecordBatchOptions and returns self + pub fn with_metadata_flags(mut self, metadata_flags: u8) -> Self { + self.metadata_flags = metadata_flags; + self + } } impl Default for RecordBatchOptions { fn default() -> Self { @@ -671,6 +747,7 @@ impl From for RecordBatch { schema: Arc::new(Schema::new(fields)), row_count, columns, + metadata_flags: 0, } } } @@ -982,6 +1059,7 @@ mod tests { let options = RecordBatchOptions { match_field_names: false, row_count: None, + metadata_flags: 0, }; let batch = RecordBatch::try_new_with_options(schema, vec![a], &options); assert!(batch.is_ok()); @@ -1226,6 +1304,7 @@ mod tests { &RecordBatchOptions { match_field_names: true, row_count: Some(3), + metadata_flags: 0, }, ) .expect("valid conversion");