diff --git a/.github/workflows/cpp.yml b/.github/workflows/cpp.yml
index a2fa9982a9..2ee8675879 100644
--- a/.github/workflows/cpp.yml
+++ b/.github/workflows/cpp.yml
@@ -4,6 +4,9 @@ on:
     branches:
       - main
   pull_request:
+    paths:
+      - src/**
+      - protos/**
 jobs:
   build:
     runs-on: ubuntu-22.04
@@ -47,16 +50,16 @@ jobs:
           git fetch origin --tags
           git checkout 3.6.16 # Arrow 10.0
           rm -rf /usr/local/bin/2to3
-          rm -rf /usr/local/bin/2to3-3.11
-          rm -rf /usr/local/bin/idle3
-          rm -rf /usr/local/bin/idle3.11
+          rm -rf /usr/local/bin/2to3-3.11
+          rm -rf /usr/local/bin/idle3
+          rm -rf /usr/local/bin/idle3.11
           rm -rf /usr/local/bin/pydoc3
           rm -rf /usr/local/bin/pydoc3.11
           rm -rf /usr/local/bin/python3
           rm -rf /usr/local/bin/python3.11
           rm -rf /usr/local/bin/python3-config
-          rm -rf /usr/local/bin/python3.11-config
-          brew update
+          rm -rf /usr/local/bin/python3.11-config
+          brew update
           brew install apache-arrow protobuf
       - name: Cmake
         run: cmake -B build
diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
new file mode 100644
index 0000000000..7fee17ff5f
--- /dev/null
+++ b/.github/workflows/rust.yml
@@ -0,0 +1,39 @@
+name: Rust
+on:
+  push:
+    branches:
+      - main
+  pull_request:
+    paths:
+      - rust/**
+      - protos/**
+jobs:
+  build:
+    runs-on: ubuntu-22.04
+    timeout-minutes: 30
+    defaults:
+      run:
+        working-directory: ./rust
+    steps:
+      - uses: actions/checkout@v3
+      - name: Install dependencies
+        run: |
+          sudo apt update
+          sudo apt install -y protobuf-compiler
+      - name: Run clippy
+        #run: cargo clippy -- --deny "warnings"
+        run: cargo clippy
+      - name: Run tests
+        run: cargo test
+  mac-build:
+    runs-on: macos-12
+    timeout-minutes: 30
+    defaults:
+      run:
+        working-directory: ./rust
+    steps:
+      - uses: actions/checkout@v3
+      - name: Install dependencies
+        run: brew install protobuf
+      - name: Run tests
+        run: cargo test
diff --git a/.gitignore b/.gitignore
index 06d7b35e34..a4995e40c5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -74,4 +74,8 @@ wheelhouse
 
 .hypothesis
 
-**/df.json
\ No newline at end of file
+**/df.json
+
+# Rust
+target
+Cargo.lock
diff --git a/protos/format.proto b/protos/format.proto
index 95f5e243d6..7bc65a2463 100644
--- a/protos/format.proto
+++ b/protos/format.proto
@@ -109,8 +109,10 @@ message Metadata {
   // The (position, length) pairs of all the pages in the same column are then stored contiguously.
   //
   // For example, for column 5 and batch 4, we have:
+  // ```text
   //   position = page_table[5][4][0];
   //   length = page_table[5][4][1];
+  // ```
   uint64 page_table_position = 3;
 }
 
@@ -120,6 +122,7 @@ enum Encoding {
   PLAIN = 1;
   VAR_BINARY = 2;
   DICTIONARY = 3;
+  RLE = 4;
 }
 
 /// Dictionary field metadata
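
The comment above indexes the page table as `page_table[column][batch]`, yielding a `(position, length)` pair per page. A minimal sketch of that lookup over a flat, row-major buffer; the `PageTable` type and its layout are illustrative assumptions, not part of the format spec:

```rust
/// Hypothetical in-memory view of the page table described above.
struct PageTable {
    num_batches: usize,
    /// num_columns * num_batches * 2 values, assumed row-major.
    entries: Vec<u64>,
}

impl PageTable {
    /// (position, length) of one page, i.e. `page_table[col][batch]`.
    fn page(&self, col: usize, batch: usize) -> (u64, u64) {
        let base = (col * self.num_batches + batch) * 2;
        (self.entries[base], self.entries[base + 1])
    }
}

fn main() {
    let table = PageTable { num_batches: 8, entries: vec![0; 16 * 8 * 2] };
    let (position, length) = table.page(5, 4);
    println!("page at {} with {} bytes", position, length);
}
```
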
diff --git a/rust/.cargo/config.toml b/rust/.cargo/config.toml
new file mode 100644
index 0000000000..823dcf2c50
--- /dev/null
+++ b/rust/.cargo/config.toml
@@ -0,0 +1,18 @@
+[profile.release]
+lto = true
+
+[target.'cfg(all())']
+rustflags = [
+    "-Wclippy::all",
+    # "-Wclippy::style",
+    "-Wclippy::fallible_impl_from",
+    "-Wclippy::redundant_pub_crate",
+    "-Wclippy::string_add_assign",
+    "-Wclippy::string_add",
+    "-Wclippy::string_lit_as_bytes",
+    "-Wclippy::string_to_string",
+    "-Wclippy::use_self",
+]
+
+[target.aarch64-apple-darwin]
+# rustflags = ["-C", "target-cpu=native", "-C", "target-feature=+neon"]
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
new file mode 100644
index 0000000000..622a3ade6d
--- /dev/null
+++ b/rust/Cargo.toml
@@ -0,0 +1,51 @@
+[package]
+name = "lance"
+version = "0.1.0"
+edition = "2021"
+authors = ["Lance Devs "]
+description = "A columnar data format that is 100x faster than Parquet for random access."
+license_file = "../LICENSE"
+repository = "https://github.com/eto-ai/lance"
+readme = "../README.md"
+rust-version = "1.65"
+keywords = [
+    "data-format",
+    "data-science",
+    "machine-learning",
+    "apache-arrow",
+    "duckdb",
+    "dataops",
+    "data-analytics"
+]
+categories = [
+    "database-implementations",
+    "data-structures",
+    "development-tools",
+    "science"
+]
+
+[dependencies]
+arrow-array = "30.0"
+arrow-buffer = "30.0"
+arrow-data = "30.0"
+arrow-schema = "30.0"
+async-trait = "0.1.60"
+byteorder = "1.4.3"
+chrono = "0.4.23"
+clap = { version = "4.0.32", features = ["derive"], optional = true }
+object_store = { version = "0.5", features = ["aws"] }
+pin-project = "1.0"
+prost = "0.11"
+prost-types = "0.11"
+tokio = { version = "1.23", features = ["rt-multi-thread"] }
+url = "2.3"
+
+[build-dependencies]
+prost-build = "0.11"
+
+[features]
+cli = ["clap"]
+
+[[bin]]
+name = "lq"
+required-features = ["cli"]
diff --git a/rust/README.md b/rust/README.md
new file mode 100644
index 0000000000..4850ffc058
--- /dev/null
+++ b/rust/README.md
@@ -0,0 +1 @@
+# Rust Binding for Lance Data Format
diff --git a/rust/build.rs b/rust/build.rs
new file mode 100644
index 0000000000..18bdc63786
--- /dev/null
+++ b/rust/build.rs
@@ -0,0 +1,9 @@
+use std::io::Result;
+
+fn main() -> Result<()> {
+    println!("cargo:rerun-if-changed=protos");
+    println!("cargo:rerun-if-changed=../protos");
+
+    prost_build::compile_protos(&["../protos/format.proto"], &["../protos"])?;
+    Ok(())
+}
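
`prost-build` writes the generated bindings for `format.proto` into `OUT_DIR` at compile time, and the crate splices them in with an `include!`. This is exactly how `rust/src/format.rs` later in this patch consumes the build script's output:

```rust
// From rust/src/format.rs below: pulling the prost-build output out of
// OUT_DIR into a `pb` module at compile time.
pub mod pb {
    include!(concat!(env!("OUT_DIR"), "/lance.format.pb.rs"));
}
```
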
diff --git a/rust/src/bin/lq.rs b/rust/src/bin/lq.rs
new file mode 100644
index 0000000000..92e6ce7c0a
--- /dev/null
+++ b/rust/src/bin/lq.rs
@@ -0,0 +1,38 @@
+use clap::{Parser, Subcommand};
+use lance::dataset::Dataset;
+
+#[derive(Parser)]
+struct Args {
+    #[command(subcommand)]
+    command: Commands,
+}
+
+#[derive(Subcommand)]
+enum Commands {
+    /// Dataset inspection
+    Inspect {
+        /// The URI of the dataset.
+        uri: String,
+
+        /// AWS profile
+        aws_profile: Option<String>,
+    },
+}
+
+#[tokio::main]
+async fn main() {
+    let args = Args::parse();
+
+    match &args.command {
+        Commands::Inspect { uri, aws_profile } => {
+            let dataset = Dataset::open(uri).await.unwrap();
+            println!("Dataset URI: {}", uri);
+            println!(
+                "Latest version: {}, Total versions: {}",
+                dataset.version().version,
+                dataset.versions().await.unwrap().len()
+            );
+            println!("Schema:\n{}", dataset.schema())
+        }
+    }
+}
diff --git a/rust/src/dataset.rs b/rust/src/dataset.rs
new file mode 100644
index 0000000000..7142773ff4
--- /dev/null
+++ b/rust/src/dataset.rs
@@ -0,0 +1,122 @@
+//! Lance Dataset
+//!
+
+use std::collections::BTreeMap;
+use std::io::Result;
+
+use chrono::prelude::*;
+use object_store::path::Path;
+
+use self::scanner::Scanner;
+use crate::datatypes::Schema;
+use crate::format::{pb, Manifest};
+use crate::io::reader::read_manifest;
+use crate::io::{read_metadata_offset, ObjectStore};
+
+pub mod scanner;
+
+const LATEST_MANIFEST_NAME: &str = "_latest.manifest";
+const VERSIONS_DIR: &str = "_versions";
+const DATA_DIR: &str = "data";
+
+fn latest_manifest_path(base: &Path) -> Path {
+    base.child(LATEST_MANIFEST_NAME)
+}
+
+/// Lance Dataset
+#[derive(Debug)]
+pub struct Dataset {
+    object_store: ObjectStore,
+    base: Path,
+    manifest: Manifest,
+}
+
+/// Dataset Version
+pub struct Version {
+    /// Version number
+    pub version: u64,
+
+    /// Timestamp of dataset creation in UTC.
+    pub timestamp: DateTime<Utc>,
+
+    /// Key-value pairs of metadata.
+    pub metadata: BTreeMap<String, String>,
+}
+
+/// Convert a Manifest into a dataset Version.
+impl From<&Manifest> for Version {
+    fn from(m: &Manifest) -> Self {
+        Self {
+            version: m.version,
+            timestamp: Utc::now(),
+            metadata: BTreeMap::default(),
+        }
+    }
+}
+
+impl Dataset {
+    /// Open an existing dataset.
+    pub async fn open(uri: &str) -> Result<Self> {
+        let object_store = ObjectStore::new(uri)?;
+
+        let latest_manifest_path = latest_manifest_path(object_store.base_path());
+
+        let mut object_reader = object_store.open(&latest_manifest_path).await?;
+        let bytes = object_store
+            .inner
+            .get(&latest_manifest_path)
+            .await?
+            .bytes()
+            .await?;
+        let offset = read_metadata_offset(&bytes)?;
+        let manifest_pb = object_reader
+            .read_message::<pb::Manifest>(offset as usize)
+            .await?;
+        let manifest = (&manifest_pb).into();
+        Ok(Self {
+            object_store,
+            base: Path::from(uri),
+            manifest,
+        })
+    }
+
+    pub fn scan(&self) -> Result<Scanner> {
+        todo!()
+    }
+
+    pub fn object_store(&self) -> &ObjectStore {
+        &self.object_store
+    }
+
+    fn versions_dir(&self) -> Path {
+        self.base.child(VERSIONS_DIR)
+    }
+
+    pub fn version(&self) -> Version {
+        Version::from(&self.manifest)
+    }
+
+    /// Get all versions.
+    pub async fn versions(&self) -> Result<Vec<Version>> {
+        let paths: Vec<Path> = self
+            .object_store
+            .inner
+            .list_with_delimiter(Some(&self.versions_dir()))
+            .await?
+            .objects
+            .iter()
+            .filter(|&obj| obj.location.as_ref().ends_with(".manifest"))
+            .map(|o| o.location.clone())
+            .collect();
+        let mut versions = vec![];
+        for path in paths.iter() {
+            let manifest = read_manifest(&self.object_store, path).await?;
+            versions.push(Version::from(&manifest));
+        }
+        Ok(versions)
+    }
+
+    pub fn schema(&self) -> &Schema {
+        &self.manifest.schema
+    }
+}
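
Taken together with `lq.rs` above, opening a dataset and walking its versions looks like this; a minimal sketch assuming a local dataset at the hypothetical path `my.lance`:

```rust
// Minimal consumer of the Dataset API added in this patch; essentially what
// `lq inspect` does. The dataset path is a made-up example.
use lance::dataset::Dataset;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let dataset = Dataset::open("my.lance").await?;
    println!("latest version: {}", dataset.version().version);
    for v in dataset.versions().await? {
        println!("version {} created at {}", v.version, v.timestamp);
    }
    println!("{}", dataset.schema());
    Ok(())
}
```
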
diff --git a/rust/src/dataset/scanner.rs b/rust/src/dataset/scanner.rs
new file mode 100644
index 0000000000..3a72dae1d5
--- /dev/null
+++ b/rust/src/dataset/scanner.rs
@@ -0,0 +1,25 @@
+use arrow_array::{RecordBatch, RecordBatchReader};
+use arrow_schema::{ArrowError, SchemaRef};
+
+/// Dataset Scanner
+pub struct Scanner {}
+
+impl Scanner {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl RecordBatchReader for Scanner {
+    fn schema(&self) -> SchemaRef {
+        todo!()
+    }
+}
+
+impl Iterator for Scanner {
+    type Item = Result<RecordBatch, ArrowError>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        todo!()
+    }
+}
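
`Scanner` still returns `todo!()` from both methods, but the `RecordBatchReader` and `Iterator` impls already fix the consumption contract. A sketch of a consumer written against any such reader, not just this stub:

```rust
// Generic consumer of the RecordBatchReader contract that Scanner
// implements: read the schema up front, then drain batches.
use arrow_array::RecordBatchReader;
use arrow_schema::ArrowError;

fn print_row_counts(reader: impl RecordBatchReader) -> Result<(), ArrowError> {
    println!("{} columns", reader.schema().fields().len());
    for batch in reader {
        println!("batch with {} rows", batch?.num_rows());
    }
    Ok(())
}
```
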
diff --git a/rust/src/datatypes.rs b/rust/src/datatypes.rs
new file mode 100644
index 0000000000..31c0c83d64
--- /dev/null
+++ b/rust/src/datatypes.rs
@@ -0,0 +1,327 @@
+//! Lance data types
+
+use std::collections::HashMap;
+use std::fmt;
+use std::fmt::Formatter;
+
+use arrow_schema::{ArrowError, DataType, Field as ArrowField, Schema as ArrowSchema};
+
+use crate::encodings::Encoding;
+use crate::format::pb;
+
+/// LogicalType is a string presentation of an Arrow type,
+/// to be serialized into protobuf.
+#[derive(Debug)]
+pub struct LogicalType(String);
+
+impl fmt::Display for LogicalType {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.0)
+    }
+}
+
+impl LogicalType {
+    fn is_list(&self) -> bool {
+        self.0 == "list" || self.0 == "list.struct"
+    }
+
+    fn is_struct(&self) -> bool {
+        self.0 == "struct"
+    }
+}
+
+impl From<&str> for LogicalType {
+    fn from(s: &str) -> Self {
+        Self(s.to_string())
+    }
+}
+
+impl TryFrom<&DataType> for LogicalType {
+    type Error = String;
+
+    fn try_from(dt: &DataType) -> Result<Self, Self::Error> {
+        let type_str = match dt {
+            DataType::Null => Ok("null"),
+            DataType::Boolean => Ok("bool"),
+            DataType::Int8 => Ok("int8"),
+            DataType::UInt8 => Ok("uint8"),
+            DataType::Int16 => Ok("int16"),
+            DataType::UInt16 => Ok("uint16"),
+            DataType::Int32 => Ok("int32"),
+            DataType::UInt32 => Ok("uint32"),
+            DataType::Int64 => Ok("int64"),
+            DataType::UInt64 => Ok("uint64"),
+            DataType::Float16 => Ok("halffloat"),
+            DataType::Float32 => Ok("float"),
+            DataType::Float64 => Ok("double"),
+            DataType::Utf8 => Ok("string"),
+            DataType::Binary => Ok("binary"),
+            DataType::LargeUtf8 => Ok("large_string"),
+            DataType::LargeBinary => Ok("large_binary"),
+            DataType::Date32 => Ok("date32:day"),
+            DataType::Date64 => Ok("date64:ms"),
+            DataType::Struct(_) => Ok("struct"),
+            _ => Err(format!("Unsupported data type: {:?}", dt)),
+        }?;
+
+        Ok(Self(type_str.to_string()))
+    }
+}
+
+impl TryFrom<&LogicalType> for DataType {
+    type Error = String;
+
+    fn try_from(lt: &LogicalType) -> Result<Self, Self::Error> {
+        use DataType::*;
+        match lt.0.as_str() {
+            "null" => Ok(Null),
+            "bool" => Ok(Boolean),
+            "int8" => Ok(Int8),
+            "uint8" => Ok(UInt8),
+            "int16" => Ok(Int16),
+            "uint16" => Ok(UInt16),
+            "int32" => Ok(Int32),
+            "uint32" => Ok(UInt32),
+            "int64" => Ok(Int64),
+            "uint64" => Ok(UInt64),
+            "halffloat" => Ok(Float16),
+            "float" => Ok(Float32),
+            "double" => Ok(Float64),
+            "string" => Ok(Utf8),
+            "binary" => Ok(Binary),
+            "large_string" => Ok(LargeUtf8),
+            "large_binary" => Ok(LargeBinary),
+            "date32:day" => Ok(Date32),
+            "date64:ms" => Ok(Date64),
+            _ => Err(format!("Unsupported type, {}", lt.0.as_str())),
+        }
+    }
+}
+
+fn is_numeric(data_type: &DataType) -> bool {
+    use DataType::*;
+    matches!(
+        data_type,
+        UInt8 | UInt16 | UInt32 | UInt64 | Int8 | Int16 | Int32 | Int64 | Float32 | Float64
+    )
+}
+
+fn is_binary(data_type: &DataType) -> bool {
+    use DataType::*;
+    matches!(data_type, Binary | Utf8 | LargeBinary | LargeUtf8)
+}
+
+/// Lance Schema Field
+///
+#[derive(Debug)]
+pub struct Field {
+    pub name: String,
+    pub id: i32,
+    pub parent_id: i32,
+    pub logical_type: LogicalType,
+    pub extension_name: String,
+    pub encoding: Option<Encoding>,
+    pub nullable: bool,
+
+    children: Vec<Field>,
+}
+
+impl Field {
+    /// Returns arrow data type.
+    pub fn data_type(&self) -> DataType {
+        match &self.logical_type {
+            lt if lt.is_list() => DataType::List(Box::new(ArrowField::from(&self.children[0]))),
+            lt if lt.is_struct() => {
+                DataType::Struct(self.children.iter().map(ArrowField::from).collect())
+            }
+            lt => DataType::try_from(lt).unwrap(),
+        }
+    }
+}
+
+impl fmt::Display for Field {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        write!(
+            f,
+            "Field(id={}, name={}, type={})",
+            self.id, self.name, self.logical_type.0,
+        )
+    }
+}
+
+impl TryFrom<&ArrowField> for Field {
+    type Error = ArrowError;
+
+    fn try_from(field: &ArrowField) -> Result<Self, Self::Error> {
+        let children = match field.data_type() {
+            DataType::Struct(children) => children
+                .iter()
+                .map(Self::try_from)
+                .collect::<Result<Vec<_>, ArrowError>>()?,
+            DataType::List(item) => vec![Self::try_from(item.as_ref())?],
+            _ => vec![],
+        };
+        Ok(Self {
+            id: -1,
+            parent_id: -1,
+            name: field.name().clone(),
+            logical_type: LogicalType::try_from(field.data_type())
+                .map_err(ArrowError::SchemaError)?,
+            encoding: match field.data_type() {
+                dt if is_numeric(dt) => Some(Encoding::Plain),
+                dt if is_binary(dt) => Some(Encoding::VarBinary),
+                DataType::Dictionary(_, _) => Some(Encoding::Dictionary),
+                _ => None,
+            },
+            extension_name: "".to_string(),
+            nullable: field.is_nullable(),
+            children,
+        })
+    }
+}
+
+impl From<&Field> for ArrowField {
+    fn from(field: &Field) -> Self {
+        Self::new(&field.name, field.data_type(), field.nullable)
+    }
+}
+
+impl From<&pb::Field> for Field {
+    fn from(field: &pb::Field) -> Self {
+        Self {
+            name: field.name.clone(),
+            id: field.id,
+            parent_id: field.parent_id,
+            logical_type: LogicalType(field.logical_type.clone()),
+            extension_name: field.extension_name.clone(),
+            encoding: match field.encoding {
+                1 => Some(Encoding::Plain),
+                2 => Some(Encoding::VarBinary),
+                3 => Some(Encoding::Dictionary),
+                4 => Some(Encoding::RLE),
+                _ => None,
+            },
+            nullable: field.nullable,
+            children: vec![],
+        }
+    }
+}
+
+impl From<&Field> for pb::Field {
+    fn from(field: &Field) -> Self {
+        Self {
+            id: field.id,
+            parent_id: field.parent_id,
+            name: field.name.clone(),
+            logical_type: field.logical_type.0.clone(),
+            encoding: match field.encoding {
+                Some(Encoding::Plain) => 1,
+                Some(Encoding::VarBinary) => 2,
+                Some(Encoding::Dictionary) => 3,
+                Some(Encoding::RLE) => 4,
+                _ => 0,
+            },
+            nullable: field.nullable,
+            dictionary: None,
+            extension_name: field.extension_name.clone(),
+            r#type: 0,
+        }
+    }
+}
+
+/// Lance Schema.
+#[derive(Default, Debug)]
+pub struct Schema {
+    pub fields: Vec<Field>,
+    pub metadata: HashMap<String, String>,
+}
+
+impl fmt::Display for Schema {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        for field in self.fields.iter() {
+            writeln!(f, "{}", field)?;
+        }
+        Ok(())
+    }
+}
+
+/// Convert `arrow_schema::Schema` to Lance
+impl TryFrom<&ArrowSchema> for Schema {
+    type Error = ArrowError;
+
+    fn try_from(schema: &ArrowSchema) -> Result<Self, Self::Error> {
+        Ok(Self {
+            fields: schema
+                .fields
+                .iter()
+                .map(Field::try_from)
+                .collect::<Result<Vec<_>, ArrowError>>()?,
+            metadata: schema.metadata.clone(),
+        })
+    }
+}
+
+/// Convert Lance Schema to Arrow Schema
+impl From<&Schema> for ArrowSchema {
+    fn from(schema: &Schema) -> Self {
+        Self {
+            fields: schema.fields.iter().map(ArrowField::from).collect(),
+            metadata: schema.metadata.clone(),
+        }
+    }
+}
+
+/// Convert a list of protobuf `Field`s to a Schema.
+impl From<&Vec<pb::Field>> for Schema {
+    fn from(fields: &Vec<pb::Field>) -> Self {
+        Self {
+            fields: fields.iter().map(Field::from).collect(),
+            metadata: HashMap::default(),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow_schema::Field as ArrowField;
+
+    use super::*;
+
+    #[test]
+    fn arrow_field_to_field() {
+        for (name, data_type) in [
+            ("null", DataType::Null),
+            ("bool", DataType::Boolean),
+            ("int8", DataType::Int8),
+            ("uint8", DataType::UInt8),
+            ("int16", DataType::Int16),
+            ("uint16", DataType::UInt16),
+            ("int32", DataType::Int32),
+            ("uint32", DataType::UInt32),
+            ("int64", DataType::Int64),
+            ("uint64", DataType::UInt64),
+            ("float16", DataType::Float16),
+            ("float32", DataType::Float32),
+            ("float64", DataType::Float64),
+        ] {
+            let arrow_field = ArrowField::new(name, data_type.clone(), true);
+            let field = Field::try_from(&arrow_field).unwrap();
+            assert_eq!(field.name, name);
+            assert_eq!(field.data_type(), data_type);
+            assert_eq!(ArrowField::try_from(&field).unwrap(), arrow_field);
+        }
+    }
+
+    #[test]
+    fn struct_field() {
+        let arrow_field = ArrowField::new(
+            "struct",
+            DataType::Struct(vec![ArrowField::new("a", DataType::Int32, true)]),
+            false,
+        );
+        let field = Field::try_from(&arrow_field).unwrap();
+        assert_eq!(field.name, "struct");
+        assert_eq!(&field.data_type(), arrow_field.data_type());
+        assert_eq!(ArrowField::try_from(&field).unwrap(), arrow_field);
+    }
+}
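
The conversions above allow round-tripping between Arrow and Lance schemas. A small sketch using only the `TryFrom`/`From` impls defined in this file:

```rust
// Round-trip an Arrow schema through the Lance Schema type defined above.
use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
use lance::datatypes::Schema;

fn main() {
    let arrow_schema = ArrowSchema::new(vec![
        ArrowField::new("id", DataType::Int64, false),
        ArrowField::new("name", DataType::Utf8, true),
    ]);
    let schema = Schema::try_from(&arrow_schema).unwrap();
    // One `Field(id=..., name=..., type=...)` line per field.
    println!("{}", schema);
    assert_eq!(ArrowSchema::from(&schema), arrow_schema);
}
```
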
diff --git a/rust/src/encodings.rs b/rust/src/encodings.rs
new file mode 100644
index 0000000000..6ea5507fcc
--- /dev/null
+++ b/rust/src/encodings.rs
@@ -0,0 +1,40 @@
+//! Data encodings
+//!
+use std::io::Result;
+
+use arrow_array::Array;
+
+pub mod binary;
+pub mod plain;
+pub mod rle;
+
+use crate::format::pb;
+
+/// Encoding enum.
+#[derive(Debug)]
+pub enum Encoding {
+    /// Plain encoding.
+    Plain,
+    /// Binary encoding.
+    VarBinary,
+    /// Dictionary encoding.
+    Dictionary,
+    /// RLE encoding.
+    RLE,
+}
+
+impl From<Encoding> for pb::Encoding {
+    fn from(e: Encoding) -> Self {
+        match e {
+            Encoding::Plain => Self::Plain,
+            Encoding::VarBinary => Self::VarBinary,
+            Encoding::Dictionary => Self::Dictionary,
+            Encoding::RLE => Self::Rle,
+        }
+    }
+}
+
+/// Encoder - Write an arrow array to the file.
+pub trait Encoder {
+    /// Write an array, and returns the file offset of the beginning of the batch.
+    fn write(&mut self, array: &dyn Array) -> Result<usize>;
+}
diff --git a/rust/src/encodings/binary.rs b/rust/src/encodings/binary.rs
new file mode 100644
index 0000000000..19e9b9b4f7
--- /dev/null
+++ b/rust/src/encodings/binary.rs
@@ -0,0 +1,3 @@
+//! Binary encoding
+//!
+pub struct BinaryEncoder {}
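
Nothing in this patch implements the `Encoder` trait yet (`PlainEncoder` below uses an async inherent method instead). A toy, purely hypothetical implementation to illustrate the contract, assuming a primitive array whose first buffer holds the values:

```rust
// Toy illustration of the Encoder contract: append each array's bytes to an
// in-memory buffer and report where they start. Not part of the patch.
use std::io::Result;

use arrow_array::Array;
use lance::encodings::Encoder;

struct VecEncoder {
    buf: Vec<u8>,
}

impl Encoder for VecEncoder {
    fn write(&mut self, array: &dyn Array) -> Result<usize> {
        let offset = self.buf.len();
        // Assumes a primitive array: buffer 0 is the values buffer.
        self.buf.extend_from_slice(array.data().buffers()[0].as_slice());
        Ok(offset)
    }
}
```
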
diff --git a/rust/src/encodings/plain.rs b/rust/src/encodings/plain.rs
new file mode 100644
index 0000000000..a56f3dd132
--- /dev/null
+++ b/rust/src/encodings/plain.rs
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Plain encoding
+//!
+//! Plain encoding works with primitive types, i.e., `boolean` and `i8...i64`.
+//! It stores the array directly in the file, and therefore offers O(1) read
+//! access.
+
+use std::io::{ErrorKind, Result};
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use arrow_array::{Array, ArrowPrimitiveType, PrimitiveArray};
+use arrow_buffer::Buffer;
+use arrow_data::ArrayDataBuilder;
+use object_store::path::Path;
+use object_store::ObjectStore;
+use tokio::io::AsyncWriteExt;
+
+use crate::io::object_writer::ObjectWriter;
+
+/// Encoder for plain encoding.
+///
+pub struct PlainEncoder<'a, T: ArrowPrimitiveType> {
+    writer: &'a mut ObjectWriter<'a>,
+    phantom: PhantomData<T>,
+}
+
+impl<'a, T: ArrowPrimitiveType> PlainEncoder<'a, T> {
+    pub fn new(writer: &'a mut ObjectWriter<'a>) -> PlainEncoder<'a, T> {
+        PlainEncoder {
+            writer,
+            phantom: PhantomData,
+        }
+    }
+
+    /// Encode an array of a batch.
+    /// Returns the file offset of the beginning of the array.
+    pub async fn encode(&mut self, array: &dyn Array) -> Result<usize> {
+        let offset = self.writer.tell() as usize;
+
+        let data = array.data().buffers()[0].as_slice();
+        self.writer.write_all(data).await?;
+
+        Ok(offset)
+    }
+}
+
+/// Decoder for plain encoding.
+pub struct PlainDecoder<'a, T: ArrowPrimitiveType> {
+    object_store: &'a dyn ObjectStore,
+    /// File path.
+    path: &'a Path,
+    /// The start position of the batch in the file.
+    position: usize,
+    /// Number of the rows in this batch.
+    length: usize,
+
+    phantom: PhantomData<T>,
+}
+
+impl<'a, T: ArrowPrimitiveType> PlainDecoder<'a, T> {
+    pub fn new(
+        object_store: &'a dyn ObjectStore,
+        path: &'a Path,
+        position: usize,
+        length: usize,
+    ) -> Result<PlainDecoder<'a, T>> {
+        Ok(PlainDecoder {
+            object_store,
+            path,
+            position,
+            length,
+            phantom: PhantomData,
+        })
+    }
+
+    pub async fn at(&self, _idx: usize) -> Result<Box<dyn Array>> {
+        todo!()
+    }
+
+    pub async fn decode(&self) -> Result<Box<dyn Array>> {
+        let array_bytes = T::get_byte_width() * self.length;
+        let range = Range {
+            start: self.position,
+            end: self.position + array_bytes,
+        };
+
+        // if self.nullable {
+        //     Err("b");
+        // } else {
+        let data = self.object_store.get_range(self.path, range).await?;
+        // A memory copy occurs here.
+        // TODO: zero-copy
+        // https://docs.rs/arrow-buffer/29.0.0/arrow_buffer/struct.Buffer.html#method.from_custom_allocation
+        let buf: Buffer = data.into();
+        let array_data = match ArrayDataBuilder::new(T::DATA_TYPE)
+            .len(self.length)
+            .null_count(0)
+            .add_buffer(buf)
+            .build()
+        {
+            Ok(d) => d,
+            Err(e) => return Err(std::io::Error::new(ErrorKind::InvalidData, e.to_string())),
+        };
+        Ok(Box::new(PrimitiveArray::<T>::from(array_data)))
+    }
+    // }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow_array::cast::as_primitive_array;
+    use arrow_array::types::Int32Type;
+    use arrow_array::Int32Array;
+    use object_store::memory::InMemory;
+    use object_store::path::Path;
+    use object_store::ObjectStore;
+    use tokio::io::AsyncWriteExt;
+
+    use crate::io::object_writer::ObjectWriter;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_encode_decode_int_array() {
+        let store = InMemory::new();
+        let path = Path::from("/foo");
+        let (_, mut writer) = store.put_multipart(&path).await.unwrap();
+
+        let arr = Int32Array::from(Vec::from_iter(1..4096));
+        {
+            let mut object_writer = ObjectWriter::new(writer.as_mut());
+            let mut encoder = PlainEncoder::<Int32Type>::new(&mut object_writer);
+
+            assert_eq!(encoder.encode(&arr).await.unwrap(), 0);
+        }
+        writer.shutdown().await.unwrap();
+
+        assert!(store.head(&Path::from("/foo")).await.unwrap().size > 0);
+        let decoder = PlainDecoder::<Int32Type>::new(&store, &path, 0, arr.len()).unwrap();
+        let read_arr = decoder.decode().await.unwrap();
+        let expect_arr = as_primitive_array::<Int32Type>(read_arr.as_ref());
+        assert_eq!(expect_arr, &arr);
+    }
+}
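
`decode` above fetches the whole batch as one contiguous range. The `at` method is still `todo!()`, but under plain encoding the byte range of a single row is directly computable, which is where the O(1) random-access claim comes from. A hypothetical sketch of that arithmetic:

```rust
// Hypothetical core of a future `PlainDecoder::at`: the i-th value of a
// fixed-width type sits at a fixed offset from the batch start.
use arrow_array::ArrowPrimitiveType;

fn value_range<T: ArrowPrimitiveType>(position: usize, idx: usize) -> std::ops::Range<usize> {
    let width = T::get_byte_width();
    let start = position + idx * width; // batch start + row offset
    start..start + width // byte range to fetch with `object_store.get_range`
}
```
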
diff --git a/rust/src/encodings/rle.rs b/rust/src/encodings/rle.rs
new file mode 100644
index 0000000000..b248758bc1
--- /dev/null
+++ b/rust/src/encodings/rle.rs
@@ -0,0 +1,16 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
diff --git a/rust/src/format.rs b/rust/src/format.rs
new file mode 100644
index 0000000000..3ebd637b8e
--- /dev/null
+++ b/rust/src/format.rs
@@ -0,0 +1,37 @@
+use crate::datatypes::Schema;
+
+mod fragment;
+use fragment::Fragment;
+
+/// Protobuf definitions
+#[allow(clippy::all)]
+pub mod pb {
+    include!(concat!(env!("OUT_DIR"), "/lance.format.pb.rs"));
+}
+
+/// Manifest of a dataset
+///
+/// * Schema
+/// * Version
+/// * Fragments.
+#[derive(Debug)]
+pub struct Manifest {
+    /// Dataset schema.
+    pub schema: Schema,
+
+    /// Dataset version
+    pub version: u64,
+
+    /// Fragments, the pieces that build the dataset.
+    pub fragments: Vec<Fragment>,
+}
+
+impl From<&pb::Manifest> for Manifest {
+    fn from(p: &pb::Manifest) -> Self {
+        Self {
+            schema: Schema::from(&p.fields),
+            version: p.version,
+            fragments: p.fragments.iter().map(Fragment::from).collect(),
+        }
+    }
+}
diff --git a/rust/src/format/fragment.rs b/rust/src/format/fragment.rs
new file mode 100644
index 0000000000..1b3a730b66
--- /dev/null
+++ b/rust/src/format/fragment.rs
@@ -0,0 +1,60 @@
+use crate::format::pb;
+use crate::format::pb::DataFragment;
+
+/// Lance Data File
+///
+/// A data file is one piece of file storing data.
+#[derive(Debug)]
+pub struct DataFile {
+    /// Relative path of the data file to the dataset root.
+    pub path: String,
+    /// The IDs of fields in this file.
+    pub fields: Vec<i32>,
+}
+
+impl From<&DataFile> for pb::DataFile {
+    fn from(df: &DataFile) -> Self {
+        Self {
+            path: df.path.clone(),
+            fields: df.fields.clone(),
+        }
+    }
+}
+
+impl From<&pb::DataFile> for DataFile {
+    fn from(proto: &pb::DataFile) -> Self {
+        Self {
+            path: proto.path.clone(),
+            fields: proto.fields.clone(),
+        }
+    }
+}
+
+/// Data fragment.
+///
+/// A fragment is a set of files which represent the different columns of the same rows.
+/// If a column exists in the schema but the related file does not exist, the column is
+/// treated as `null`s.
+#[derive(Debug)]
+pub struct Fragment {
+    /// Fragment ID
+    pub id: u64,
+
+    /// Files within the fragment.
+    pub files: Vec<DataFile>,
+}
+
+impl Fragment {
+    /// Get all field IDs from this data fragment.
+    pub fn field_ids(&self) -> Vec<i32> {
+        vec![]
+    }
+}
+
+impl From<&pb::DataFragment> for Fragment {
+    fn from(p: &DataFragment) -> Self {
+        Self {
+            id: p.id,
+            files: p.files.iter().map(DataFile::from).collect(),
+        }
+    }
+}
diff --git a/rust/src/io.rs b/rust/src/io.rs
new file mode 100644
index 0000000000..1462cd256c
--- /dev/null
+++ b/rust/src/io.rs
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! I/O utilities.
+
+use std::io::{Error, ErrorKind, Result};
+
+use async_trait::async_trait;
+use byteorder::{ByteOrder, LittleEndian};
+use prost::bytes::Bytes;
+use tokio::io::{AsyncWrite, AsyncWriteExt};
+
+pub mod object_reader;
+pub mod object_store;
+pub mod object_writer;
+pub mod reader;
+
+pub use self::object_store::ObjectStore;
+
+const MAGIC: &[u8; 4] = b"LANC";
+const INDEX_MAGIC: &[u8; 8] = b"LANC_IDX";
+
+#[async_trait]
+pub trait AsyncWriteProtoExt {
+    /// Write footer with the offset to the root metadata block.
+    async fn write_footer(&mut self, offset: u64) -> Result<()>;
+}
+
+#[async_trait]
+impl<T: AsyncWrite + Unpin + Send> AsyncWriteProtoExt for T {
+    async fn write_footer(&mut self, offset: u64) -> Result<()> {
+        self.write_u64_le(offset).await?;
+        self.write_all(INDEX_MAGIC).await?;
+        Ok(())
+    }
+}
+
+pub fn read_metadata_offset(bytes: &Bytes) -> Result<u64> {
+    let len = bytes.len();
+    if len < 16 {
+        return Err(Error::new(
+            ErrorKind::Interrupted,
+            "does not have sufficient data",
+        ));
+    }
+    let offset_bytes = bytes.slice(len - 16..len - 8);
+    Ok(LittleEndian::read_u64(offset_bytes.as_ref()))
+}
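
`write_footer` and `read_metadata_offset` agree on a 16-byte trailer: an 8-byte little-endian offset to the root metadata block followed by 8 magic bytes. A quick in-memory round trip of that layout (the offset 42 is an arbitrary stand-in):

```rust
// The 16-byte footer layout used above, written and parsed by hand.
use byteorder::{ByteOrder, LittleEndian};

fn main() {
    let mut file = vec![0u8; 100]; // pretend body; metadata block "starts" at 42
    let mut footer = [0u8; 16];
    LittleEndian::write_u64(&mut footer[..8], 42);
    footer[8..].copy_from_slice(b"LANC_IDX");
    file.extend_from_slice(&footer);

    // Mirrors read_metadata_offset: the offset lives at len-16..len-8.
    let len = file.len();
    let offset = LittleEndian::read_u64(&file[len - 16..len - 8]);
    assert_eq!(offset, 42);
}
```
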
diff --git a/rust/src/io/object_reader.rs b/rust/src/io/object_reader.rs
new file mode 100644
index 0000000000..f499d80d6d
--- /dev/null
+++ b/rust/src/io/object_reader.rs
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cmp::min;
+use std::sync::Arc;
+
+use std::io::{Error, ErrorKind, Result};
+
+use byteorder::{ByteOrder, LittleEndian};
+use object_store::{path::Path, ObjectMeta, ObjectStore};
+use prost::Message;
+
+/// Object Reader
+///
+/// Object Store + Base Path
+pub struct ObjectReader {
+    // File path
+    path: Path,
+    // Object Store.
+    // TODO: can we use reference instead?
+    object_store: Arc<dyn ObjectStore>,
+    cached_metadata: Option<ObjectMeta>,
+    prefetch_size: usize,
+}
+
+impl ObjectReader {
+    /// Create an ObjectReader from URI
+    pub fn new(
+        object_store: Arc<dyn ObjectStore>,
+        path: Path,
+        prefetch_size: usize,
+    ) -> Result<Self> {
+        Ok(Self {
+            object_store,
+            path,
+            cached_metadata: None,
+            prefetch_size,
+        })
+    }
+
+    /// Read a protobuf message at position `pos`.
+    pub async fn read_message<M: Message + Default>(&mut self, pos: usize) -> Result<M> {
+        if self.cached_metadata.is_none() {
+            self.cached_metadata = Some(self.object_store.head(&self.path).await?);
+        };
+        let file_size: usize;
+        if let Some(metadata) = self.cached_metadata.clone() {
+            if pos > metadata.size {
+                return Err(Error::new(ErrorKind::InvalidData, "file size is too small"));
+            }
+            file_size = metadata.size;
+        } else {
+            panic!("Should not get here");
+        }
+
+        let range = pos..min(pos + self.prefetch_size, file_size);
+        let buf = self.object_store.get_range(&self.path, range).await?;
+
+        let msg_len = LittleEndian::read_u32(&buf) as usize;
+        Ok(M::decode(&buf[4..4 + msg_len])?)
+    }
+}
+
+#[cfg(test)]
+mod tests {}
diff --git a/rust/src/io/object_store.rs b/rust/src/io/object_store.rs
new file mode 100644
index 0000000000..dc33247ee7
--- /dev/null
+++ b/rust/src/io/object_store.rs
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Wraps [ObjectStore](object_store::ObjectStore)
+
+use std::io::{Error, ErrorKind, Result};
+use std::sync::Arc;
+
+use ::object_store::{
+    aws::AmazonS3Builder, memory::InMemory, path::Path, ObjectStore as OSObjectStore,
+};
+use object_store::local::LocalFileSystem;
+use url::{ParseError, Url};
+
+use super::object_reader::ObjectReader;
+
+/// Wraps [ObjectStore](object_store::ObjectStore)
+#[derive(Debug)]
+pub struct ObjectStore {
+    // Inner object store
+    pub inner: Arc<dyn OSObjectStore>,
+    scheme: String,
+    base_path: Path,
+    prefetch_size: usize,
+}
+
+impl std::fmt::Display for ObjectStore {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "ObjectStore({})", self.scheme)
+    }
+}
+
+impl ObjectStore {
+    /// Create an ObjectStore instance from a given URL.
+    pub fn new(uri: &str) -> Result<Self> {
+        if uri == ":memory:" {
+            return Ok(Self {
+                inner: Arc::new(InMemory::new()),
+                scheme: String::from("memory"),
+                base_path: Path::from("/"),
+                prefetch_size: 64 * 1024,
+            });
+        };
+
+        let parsed = match Url::parse(uri) {
+            Ok(u) => u,
+            Err(ParseError::RelativeUrlWithoutBase) => {
+                return Ok(Self {
+                    inner: Arc::new(LocalFileSystem::new()),
+                    scheme: String::from("file"),
+                    base_path: Path::from(uri),
+                    prefetch_size: 4 * 1024,
+                });
+            }
+            Err(e) => {
+                return Err(Error::new(ErrorKind::InvalidInput, e.to_string()));
+            }
+        };
+
+        let bucket_name = parsed.host().unwrap().to_string();
+        let scheme: String;
+        let object_store: Arc<dyn OSObjectStore> = match parsed.scheme() {
+            "s3" => {
+                scheme = "s3".to_string();
+                match AmazonS3Builder::from_env()
+                    .with_bucket_name(bucket_name.clone())
+                    .build()
+                {
+                    Ok(s3) => Arc::new(s3),
+                    Err(e) => return Err(e.into()),
+                }
+            }
+            "file" => {
+                scheme = "file".to_string();
+                Arc::new(LocalFileSystem::new())
+            }
+            &_ => todo!(),
+        };
+
+        Ok(Self {
+            inner: object_store,
+            scheme,
+            base_path: Path::from(parsed.path()),
+            prefetch_size: 64 * 1024,
+        })
+    }
+
+    pub fn prefetch_size(&self) -> usize {
+        self.prefetch_size
+    }
+
+    pub fn set_prefetch_size(&mut self, new_size: usize) {
+        self.prefetch_size = new_size;
+    }
+
+    pub fn base_path(&self) -> &Path {
+        &self.base_path
+    }
+
+    pub async fn open(&self, path: &Path) -> Result<ObjectReader> {
+        match ObjectReader::new(self.inner.clone(), path.clone(), self.prefetch_size) {
+            Ok(r) => Ok(r),
+            Err(e) => Err(e.into()),
+        }
+    }
+}
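
How the URI dispatch in `ObjectStore::new` plays out, as a sketch; the paths are made up, and the `s3://` branch additionally needs AWS configuration in the environment, since it builds via `AmazonS3Builder::from_env`:

```rust
// Exercising the three branches of ObjectStore::new above.
use lance::io::ObjectStore;

fn main() -> std::io::Result<()> {
    let mem = ObjectStore::new(":memory:")?;        // InMemory store, base "/"
    let local = ObjectStore::new("/tmp/my.lance")?; // no scheme -> LocalFileSystem
    let s3 = ObjectStore::new("s3://bucket/path")?; // AmazonS3 from env config
    println!("{} {} {}", mem, local, s3);
    Ok(())
}
```
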
diff --git a/rust/src/io/object_writer.rs b/rust/src/io/object_writer.rs
new file mode 100644
index 0000000000..122adf8332
--- /dev/null
+++ b/rust/src/io/object_writer.rs
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::io::Error;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use pin_project::pin_project;
+use prost::Message;
+use tokio::io::{AsyncWrite, AsyncWriteExt};
+
+/// AsyncWrite with the capability to tell the position the data is written.
+///
+#[pin_project]
+pub struct ObjectWriter<'a> {
+    // TODO: wrap writer with a BufWriter.
+    #[pin]
+    writer: &'a mut (dyn AsyncWrite + Unpin + Send),
+    cursor: usize,
+}
+
+impl<'a> ObjectWriter<'a> {
+    pub fn new(writer: &'a mut (dyn AsyncWrite + Unpin + Send)) -> ObjectWriter<'a> {
+        ObjectWriter { writer, cursor: 0 }
+    }
+
+    /// Tell the current position (file size).
+    pub fn tell(&self) -> u64 {
+        self.cursor as u64
+    }
+
+    /// Write a protobuf message to the object, and returns the file position of the protobuf.
+    pub async fn write_protobuf(&mut self, msg: &impl Message) -> Result<u64, Error> {
+        let offset = self.tell();
+
+        let len = msg.encoded_len();
+
+        self.write_u32_le(len as u32).await?;
+        self.write_all(&msg.encode_to_vec()).await?;
+
+        Ok(offset)
+    }
+}
+
+impl AsyncWrite for ObjectWriter<'_> {
+    fn poll_write(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<Result<usize, Error>> {
+        let mut this = self.project();
+        *this.cursor += buf.len();
+        this.writer.as_mut().poll_write(cx, buf)
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
+        self.project().writer.as_mut().poll_flush(cx)
+    }
+
+    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
+        self.project().writer.as_mut().poll_shutdown(cx)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use object_store::memory::InMemory;
+    use object_store::path::Path;
+    use object_store::ObjectStore;
+    use tokio::io::AsyncWriteExt;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_write() {
+        let store = InMemory::new();
+        let (_, mut writer) = store.put_multipart(&Path::from("/foo")).await.unwrap();
+
+        let mut object_writer = ObjectWriter::new(writer.as_mut());
+        assert_eq!(object_writer.tell(), 0);
+
+        let mut buf = Vec::<u8>::new();
+        buf.resize(256, 0);
+        assert_eq!(object_writer.write(buf.as_slice()).await.unwrap(), 256);
+        assert_eq!(object_writer.tell(), 256);
+
+        assert_eq!(object_writer.write(buf.as_slice()).await.unwrap(), 256);
+        assert_eq!(object_writer.tell(), 512);
+
+        assert_eq!(object_writer.write(buf.as_slice()).await.unwrap(), 256);
+        assert_eq!(object_writer.tell(), 256 * 3);
+
+        object_writer.shutdown().await.unwrap();
+
+        assert_eq!(store.head(&Path::from("/foo")).await.unwrap().size, 256 * 3);
+    }
+}
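
`write_protobuf` frames a message as a little-endian `u32` length prefix followed by the encoded bytes, which is the same framing `ObjectReader::read_message` parses back. An in-memory sketch of the round trip, using prost's `Timestamp` from `prost-types` as a stand-in message:

```rust
// Length-prefixed protobuf framing, written and read by hand.
use byteorder::{ByteOrder, LittleEndian};
use prost::Message;
use prost_types::Timestamp;

fn main() {
    let msg = Timestamp { seconds: 1, nanos: 2 };

    // Write side: u32 LE length prefix, then the encoded message.
    let mut buf = (msg.encoded_len() as u32).to_le_bytes().to_vec();
    buf.extend(msg.encode_to_vec());

    // Read side, mirroring read_message.
    let msg_len = LittleEndian::read_u32(&buf) as usize;
    let decoded = Timestamp::decode(&buf[4..4 + msg_len]).unwrap();
    assert_eq!(decoded, msg);
}
```
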
diff --git a/rust/src/io/reader.rs b/rust/src/io/reader.rs
new file mode 100644
index 0000000000..f28d4a37bd
--- /dev/null
+++ b/rust/src/io/reader.rs
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Lance Data File Reader
+
+use std::io::{Error, ErrorKind, Result};
+use std::ops::Range;
+
+use byteorder::ByteOrder;
+use byteorder::LittleEndian;
+use object_store::path::Path;
+use prost::Message;
+
+use super::object_store::ObjectStore;
+use crate::format::pb;
+use crate::format::Manifest;
+
+/// Read Manifest on URI.
+pub async fn read_manifest(object_store: &ObjectStore, path: &Path) -> Result<Manifest> {
+    let file_size = object_store.inner.head(path).await?.size;
+    const PREFETCH_SIZE: usize = 64 * 1024;
+    let range = Range {
+        start: std::cmp::max(file_size as i64 - PREFETCH_SIZE as i64, 0) as usize,
+        end: file_size,
+    };
+    let buf = object_store.inner.get_range(path, range).await?;
+    if buf.len() < 16 {
+        return Err(Error::new(
+            ErrorKind::InvalidData,
+            "Invalid format: file size is smaller than 16 bytes",
+        ));
+    }
+    if !buf.ends_with(super::MAGIC) {
+        return Err(Error::new(
+            ErrorKind::InvalidData,
+            "Invalid format: magic number does not match",
+        ));
+    }
+    let manifest_pos = LittleEndian::read_i64(&buf[buf.len() - 16..buf.len() - 8]) as usize;
+    assert!(file_size - manifest_pos < buf.len());
+    let proto =
+        pb::Manifest::decode(&buf[buf.len() - (file_size - manifest_pos) + 4..buf.len() - 16])?;
+    Ok(Manifest::from(&proto))
+}
+
+/// Lance File Reader.
+///
+/// It reads arrow data from one data file.
+pub struct FileReader {}
+
+#[cfg(test)]
+mod tests {}
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
new file mode 100644
index 0000000000..9dadc50969
--- /dev/null
+++ b/rust/src/lib.rs
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Lance Columnar Data Format
+//!
+//! The Lance columnar data format is an alternative to Parquet. It provides 100x faster random
+//! access, automatic versioning, and is optimized for computer vision, bioinformatics, spatial,
+//! and ML data. It is [Apache Arrow](https://arrow.apache.org/) and DuckDB compatible.
+
+pub mod dataset;
+pub mod datatypes;
+pub mod encodings;
+pub mod format;
+pub mod io;