Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RUST] Initialize read support in Rust. #401

Merged
merged 13 commits into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions .github/workflows/cpp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ on:
branches:
- main
pull_request:
paths:
- src/**
- protos/**
jobs:
build:
runs-on: ubuntu-22.04
Expand Down Expand Up @@ -47,16 +50,16 @@ jobs:
git fetch origin --tags
git checkout 3.6.16 # Arrow 10.0
rm -rf /usr/local/bin/2to3
rm -rf /usr/local/bin/2to3-3.11
rm -rf /usr/local/bin/idle3
rm -rf /usr/local/bin/idle3.11
rm -rf /usr/local/bin/2to3-3.11
rm -rf /usr/local/bin/idle3
rm -rf /usr/local/bin/idle3.11
rm -rf /usr/local/bin/pydoc3
rm -rf /usr/local/bin/pydoc3.11
rm -rf /usr/local/bin/python3
rm -rf /usr/local/bin/python3.11
rm -rf /usr/local/bin/python3-config
rm -rf /usr/local/bin/python3.11-config
brew update
rm -rf /usr/local/bin/python3.11-config
brew update
brew install apache-arrow protobuf
- name: Cmake
run: cmake -B build
Expand Down
39 changes: 39 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
name: Rust
on:
push:
branches:
- main
pull_request:
paths:
- rust/**
- protos/**
jobs:
build:
runs-on: ubuntu-22.04
timeout-minutes: 30
defaults:
run:
working-directory: ./rust
steps:
- uses: actions/checkout@v3
- name: Install dependencies
run: |
sudo apt update
sudo apt install -y protobuf-compiler
- name: Run clippy
#run: cargo clippy -- --deny "warnings"
run: cargo clippy
- name: Run tests
run: cargo test
mac-build:
runs-on: macos-12
timeout-minutes: 30
defaults:
run:
working-directory: ./rust
steps:
- uses: actions/checkout@v3
- name: Install dependencies
run: brew install protobuf
- name: Run tests
run: cargo test
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,8 @@ wheelhouse
.hypothesis


**/df.json
**/df.json

# Rust
target
Cargo.lock
3 changes: 3 additions & 0 deletions protos/format.proto
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,10 @@ message Metadata {
// The <position, length> of all the pages in the same column are then contiguously stored.
//
// For example, for the column 5 and batch 4, we have:
// ```text
// position = page_table[5][4][0];
// length = page_table[5][4][1];
// ```
uint64 page_table_position = 3;
}

Expand All @@ -120,6 +122,7 @@ enum Encoding {
PLAIN = 1;
VAR_BINARY = 2;
DICTIONARY = 3;
RLE = 4;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's exciting

}

/// Dictionary field metadata
Expand Down
18 changes: 18 additions & 0 deletions rust/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[profile.release]
lto = true

[target.'cfg(all())']
rustflags = [
"-Wclippy::all",
# "-Wclippy::style",
"-Wclippy::fallible_impl_from",
"-Wclippy::redundant_pub_crate",
"-Wclippy::string_add_assign",
"-Wclippy::string_add",
"-Wclippy::string_lit_as_bytes",
"-Wclippy::string_to_string",
"-Wclippy::use_self",
]

[target.aarch64-apple-darwin]
# rustflags = ["-C", "target-cpu=native", "-C", "target-feature=+neon"]
51 changes: 51 additions & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
[package]
name = "lance"
version = "0.1.0"
edition = "2021"
authors = ["Lance Devs <[email protected]>"]
description = "A columnar data format that is 100x faster than Parquet for random access."
license_file = "../LICENSE"
repository = "https://github.com/eto-ai/lance"
readme = "../README.md"
rust-version = "1.65"
keywords = [
"data-format",
"data-science",
"machine-learning",
"apache-arrow",
"duckdb",
"dataops",
"data-analytics"
]
categories = [
"database-implementations",
"data-structures",
"development-tools",
"science"
]

[dependencies]
arrow-array = "30.0"
arrow-buffer = "30.0"
arrow-data = "30.0"
arrow-schema = "30.0"
async-trait = "0.1.60"
byteorder = "1.4.3"
chrono = "0.4.23"
clap = { version = "4.0.32", features = ["derive"], optional = true }
object_store = { version = "0.5", features = ["aws"] }
pin-project = "1.0"
prost = "0.11"
prost-types = "0.11"
tokio = { version = "1.23", features = ["rt-multi-thread"] }
url = "2.3"

[build-dependencies]
prost-build = "0.11"

[features]
cli = ["clap"]

[[bin]]
name = "lq"
required-features = ["cli"]
1 change: 1 addition & 0 deletions rust/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Rust Binding for Lance Data Format
9 changes: 9 additions & 0 deletions rust/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use std::io::Result;

fn main() -> Result<()> {
println!("cargo:rerun-if-changed=protos");
println!("cargo:rerun-if-changed=../protos");

prost_build::compile_protos(&["../protos/format.proto"], &["../protos"])?;
Ok(())
}
38 changes: 38 additions & 0 deletions rust/src/bin/lq.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use clap::{Parser, Subcommand};
use lance::dataset::Dataset;

#[derive(Parser)]
struct Args {
#[command(subcommand)]
command: Commands,
}

#[derive(Subcommand)]
enum Commands {
/// Dataset inspection
Inspect {
/// The URI of the dataset.
uri: String,

/// AWS profile
aws_profile: Option<String>,
},
}

#[tokio::main]
async fn main() {
let args = Args::parse();

match &args.command {
Commands::Inspect { uri, aws_profile } => {
let dataset = Dataset::open(uri).await.unwrap();
println!("Dataset URI: {}", uri);
println!(
"Latest version: {}, Total versions: {}",
dataset.version().version,
dataset.versions().await.unwrap().len()
);
println!("Schema:\n{}", dataset.schema())
}
}
}
122 changes: 122 additions & 0 deletions rust/src/dataset.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
//! Lance Dataset
//!

use std::collections::BTreeMap;
use std::io::Result;

use chrono::prelude::*;
use object_store::path::Path;

use self::scanner::Scanner;
use crate::datatypes::Schema;
use crate::format::{pb, Manifest};
use crate::io::reader::read_manifest;
use crate::io::{read_metadata_offset, ObjectStore};

pub mod scanner;

const LATEST_MANIFEST_NAME: &str = "_latest.manifest";
const VERSIONS_DIR: &str = "_versions";
const DATA_DIR: &str = "data";

fn latest_manifest_path(base: &Path) -> Path {
base.child(LATEST_MANIFEST_NAME)
}

/// Lance Dataset
#[derive(Debug)]
pub struct Dataset {
object_store: ObjectStore,
base: Path,
manifest: Manifest,
}

/// Dataset Version
pub struct Version {
/// version number
pub version: u64,

/// Timestamp of dataset creation in UTC.
pub timestamp: DateTime<Utc>,

/// Key-value pairs of metadata.
pub metadata: BTreeMap<String, String>,
}

/// Convert Manifest to Data Version.
impl From<&Manifest> for Version {
fn from(m: &Manifest) -> Self {
Self {
version: m.version,
timestamp: Utc::now(),
metadata: BTreeMap::default(),
}
}
}

impl Dataset {
/// Open an existing dataset.
pub async fn open(uri: &str) -> Result<Self> {
let object_store = ObjectStore::new(uri)?;

let latest_manifest_path = latest_manifest_path(object_store.base_path());

let mut object_reader = object_store.open(&latest_manifest_path).await?;
let bytes = object_store
.inner
.get(&latest_manifest_path)
.await?
.bytes()
.await?;
let offset = read_metadata_offset(&bytes)?;
let manifest_pb = object_reader
.read_message::<pb::Manifest>(offset as usize)
.await?;
let manifest = (&manifest_pb).into();
Ok(Self {
object_store,
base: Path::from(uri),
manifest,
})
}

pub fn scan(&self) -> Result<Scanner> {
todo!()
}

pub fn object_store(&self) -> &ObjectStore {
&self.object_store
}

fn versions_dir(&self) -> Path {
self.base.child(VERSIONS_DIR)
}

pub fn version(&self) -> Version {
Version::from(&self.manifest)
}

/// Get all versions.
pub async fn versions(&self) -> Result<Vec<Version>> {
let paths: Vec<Path> = self
.object_store
.inner
.list_with_delimiter(Some(&self.versions_dir()))
.await?
.objects
.iter()
.filter(|&obj| obj.location.as_ref().ends_with(".manifest"))
.map(|o| o.location.clone())
.collect();
let mut versions = vec![];
for path in paths.iter() {
let manifest = read_manifest(&self.object_store, path).await?;
versions.push(Version::from(&manifest));
}
Ok(versions)
}

pub fn schema(&self) -> &Schema {
&self.manifest.schema
}
}
25 changes: 25 additions & 0 deletions rust/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, SchemaRef};

/// Dataset Scanner
pub struct Scanner {}

impl Scanner {
pub fn new() -> Self {
Self {}
}
}

impl RecordBatchReader for Scanner {
fn schema(&self) -> SchemaRef {
todo!()
}
}

impl Iterator for Scanner {
type Item = Result<RecordBatch, ArrowError>;

fn next(&mut self) -> Option<Self::Item> {
todo!()
}
}
Loading