Skip to content

Commit

Permalink
feat(iceberg): Add memory file IO support (apache#481)
Browse files Browse the repository at this point in the history
* feat(iceberg): Add memory file IO support

Signed-off-by: Xuanwo <[email protected]>

* Fix typo

Signed-off-by: Xuanwo <[email protected]>

* Add comments for memory file io

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored and shaeqahmed committed Dec 9, 2024
1 parent ef30096 commit f18e660
Show file tree
Hide file tree
Showing 12 changed files with 188 additions and 95 deletions.
5 changes: 3 additions & 2 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ license = { workspace = true }
keywords = ["iceberg"]

[features]
default = ["storage-fs", "storage-s3", "tokio"]
storage-all = ["storage-fs", "storage-s3"]
default = ["storage-memory", "storage-fs", "storage-s3", "tokio"]
storage-all = ["storage-memory", "storage-fs", "storage-s3"]

storage-memory = ["opendal/services-memory"]
storage-fs = ["opendal/services-fs"]
storage-s3 = ["opendal/services-s3"]

Expand Down
30 changes: 16 additions & 14 deletions crates/iceberg/src/expr/predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,10 +478,10 @@ impl Predicate {
/// # Example
///
/// ```rust
/// use std::ops::Bound::Unbounded;
/// use iceberg::expr::BoundPredicate::Unary;
/// use iceberg::expr::Reference;
/// use iceberg::spec::Datum;
/// use std::ops::Bound::Unbounded;
/// let expr1 = Reference::new("a").less_than(Datum::long(10));
///
/// let expr2 = Reference::new("b").less_than(Datum::long(20));
Expand All @@ -505,10 +505,10 @@ impl Predicate {
/// # Example
///
/// ```rust
/// use std::ops::Bound::Unbounded;
/// use iceberg::expr::BoundPredicate::Unary;
/// use iceberg::expr::Reference;
/// use iceberg::spec::Datum;
/// use std::ops::Bound::Unbounded;
/// let expr1 = Reference::new("a").less_than(Datum::long(10));
///
/// let expr2 = Reference::new("b").less_than(Datum::long(20));
Expand All @@ -534,12 +534,14 @@ impl Predicate {
/// # Example
///
/// ```rust
/// use std::ops::Bound::Unbounded;
/// use iceberg::expr::BoundPredicate::Unary;
/// use iceberg::expr::{LogicalExpression, Predicate, Reference};
/// use iceberg::spec::Datum;
/// use std::ops::Bound::Unbounded;
/// let expr1 = Reference::new("a").less_than(Datum::long(10));
/// let expr2 = Reference::new("b").less_than(Datum::long(5)).and(Reference::new("c").less_than(Datum::long(10)));
/// let expr2 = Reference::new("b")
/// .less_than(Datum::long(5))
/// .and(Reference::new("c").less_than(Datum::long(10)));
///
/// let result = expr1.negate();
/// assert_eq!(&format!("{result}"), "a >= 10");
Expand Down Expand Up @@ -632,16 +634,16 @@ impl Not for Predicate {
/// # Example
///
///```rust
///use std::ops::Bound::Unbounded;
///use iceberg::expr::BoundPredicate::Unary;
///use iceberg::expr::Reference;
///use iceberg::spec::Datum;
///let expr1 = Reference::new("a").less_than(Datum::long(10));
///
///let expr = !expr1;
///
///assert_eq!(&format!("{expr}"), "NOT (a < 10)");
///```
/// use iceberg::expr::BoundPredicate::Unary;
/// use iceberg::expr::Reference;
/// use iceberg::spec::Datum;
/// use std::ops::Bound::Unbounded;
/// let expr1 = Reference::new("a").less_than(Datum::long(10));
///
/// let expr = !expr1;
///
/// assert_eq!(&format!("{expr}"), "NOT (a < 10)");
/// ```
fn not(self) -> Self::Output {
    // Box the operand so it can serve as the single input of the `Not` expression.
    let operand = Box::new(self);
    Predicate::Not(LogicalExpression::new([operand]))
}
Expand Down
14 changes: 0 additions & 14 deletions crates/iceberg/src/expr/term.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ impl Reference {
/// # Example
///
/// ```rust
///
/// use iceberg::expr::Reference;
/// use iceberg::spec::Datum;
/// let expr = Reference::new("a").less_than(Datum::long(10));
Expand All @@ -76,7 +75,6 @@ impl Reference {
/// # Example
///
/// ```rust
///
/// use iceberg::expr::Reference;
/// use iceberg::spec::Datum;
/// let expr = Reference::new("a").less_than_or_equal_to(Datum::long(10));
Expand All @@ -96,7 +94,6 @@ impl Reference {
/// # Example
///
/// ```rust
///
/// use iceberg::expr::Reference;
/// use iceberg::spec::Datum;
/// let expr = Reference::new("a").greater_than(Datum::long(10));
Expand All @@ -116,7 +113,6 @@ impl Reference {
/// # Example
///
/// ```rust
///
/// use iceberg::expr::Reference;
/// use iceberg::spec::Datum;
/// let expr = Reference::new("a").greater_than_or_equal_to(Datum::long(10));
Expand All @@ -136,7 +132,6 @@ impl Reference {
/// # Example
///
/// ```rust
///
/// use iceberg::expr::Reference;
/// use iceberg::spec::Datum;
/// let expr = Reference::new("a").equal_to(Datum::long(10));
Expand All @@ -152,7 +147,6 @@ impl Reference {
/// # Example
///
/// ```rust
///
/// use iceberg::expr::Reference;
/// use iceberg::spec::Datum;
/// let expr = Reference::new("a").not_equal_to(Datum::long(10));
Expand All @@ -168,7 +162,6 @@ impl Reference {
/// # Example
///
/// ```rust
///
/// use iceberg::expr::Reference;
/// use iceberg::spec::Datum;
/// let expr = Reference::new("a").starts_with(Datum::string("foo"));
Expand All @@ -188,7 +181,6 @@ impl Reference {
/// # Example
///
/// ```rust
///
/// use iceberg::expr::Reference;
/// use iceberg::spec::Datum;
///
Expand All @@ -209,7 +201,6 @@ impl Reference {
/// # Example
///
/// ```rust
///
/// use iceberg::expr::Reference;
/// use iceberg::spec::Datum;
/// let expr = Reference::new("a").is_nan();
Expand All @@ -225,7 +216,6 @@ impl Reference {
/// # Example
///
/// ```rust
///
/// use iceberg::expr::Reference;
/// use iceberg::spec::Datum;
/// let expr = Reference::new("a").is_not_nan();
Expand All @@ -241,7 +231,6 @@ impl Reference {
/// # Example
///
/// ```rust
///
/// use iceberg::expr::Reference;
/// use iceberg::spec::Datum;
/// let expr = Reference::new("a").is_null();
Expand All @@ -257,7 +246,6 @@ impl Reference {
/// # Example
///
/// ```rust
///
/// use iceberg::expr::Reference;
/// use iceberg::spec::Datum;
/// let expr = Reference::new("a").is_not_null();
Expand All @@ -273,7 +261,6 @@ impl Reference {
/// # Example
///
/// ```rust
///
/// use fnv::FnvHashSet;
/// use iceberg::expr::Reference;
/// use iceberg::spec::Datum;
Expand All @@ -295,7 +282,6 @@ impl Reference {
/// # Example
///
/// ```rust
///
/// use fnv::FnvHashSet;
/// use iceberg::expr::Reference;
/// use iceberg::spec::Datum;
Expand Down
34 changes: 28 additions & 6 deletions crates/iceberg/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,41 @@
//! # How to build `FileIO`
//!
//! We provide a `FileIOBuilder` to build `FileIO` from scratch. For example:
//!
//! ```rust
//! use iceberg::io::{FileIOBuilder, S3_REGION};
//! use iceberg::Result;
//!
//! # fn test() -> Result<()> {
//! // Build a memory file io.
//! let file_io = FileIOBuilder::new("memory").build()?;
//! // Build an fs file io.
//! let file_io = FileIOBuilder::new("fs").build()?;
//! // Build an s3 file io.
//! let file_io = FileIOBuilder::new("s3")
//! .with_prop(S3_REGION, "us-east-1")
//! .build()
//! .unwrap();
//! .build()?;
//! # Ok(())
//! # }
//! ```
//!
//! Or you can pass a path and let `FileIO` infer the scheme for you:
//!
//! ```rust
//! use iceberg::io::{FileIO, S3_REGION};
//! let file_io = FileIO::from_path("s3://bucket/a")
//! .unwrap()
//! use iceberg::Result;
//!
//! # fn test() -> Result<()> {
//! // Build a memory file io.
//! let file_io = FileIO::from_path("memory:///")?.build()?;
//! // Build an fs file io.
//! let file_io = FileIO::from_path("fs:///tmp")?.build()?;
//! // Build an s3 file io.
//! let file_io = FileIO::from_path("s3://bucket/a")?
//! .with_prop(S3_REGION, "us-east-1")
//! .build()
//! .unwrap();
//! .build()?;
//! # Ok(())
//! # }
//! ```
//!
//! # How to use `FileIO`
Expand All @@ -52,6 +70,10 @@ mod file_io;
pub use file_io::*;

mod storage;
#[cfg(feature = "storage-memory")]
mod storage_memory;
#[cfg(feature = "storage-memory")]
use storage_memory::*;
#[cfg(feature = "storage-s3")]
mod storage_s3;
#[cfg(feature = "storage-s3")]
Expand Down
20 changes: 19 additions & 1 deletion crates/iceberg/src/io/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
use super::FileIOBuilder;
#[cfg(feature = "storage-fs")]
use super::FsConfig;
#[cfg(feature = "storage-memory")]
use super::MemoryConfig;
#[cfg(feature = "storage-s3")]
use super::S3Config;
use crate::{Error, ErrorKind};
Expand All @@ -26,6 +28,8 @@ use opendal::{Operator, Scheme};
/// The storage carries all supported storage services in iceberg
#[derive(Debug)]
pub(crate) enum Storage {
#[cfg(feature = "storage-memory")]
Memory { config: MemoryConfig },
#[cfg(feature = "storage-fs")]
LocalFs { config: FsConfig },
#[cfg(feature = "storage-s3")]
Expand All @@ -44,6 +48,10 @@ impl Storage {
let scheme = Self::parse_scheme(&scheme_str)?;

match scheme {
#[cfg(feature = "storage-memory")]
Scheme::Memory => Ok(Self::Memory {
config: MemoryConfig::new(props),
}),
#[cfg(feature = "storage-fs")]
Scheme::Fs => Ok(Self::LocalFs {
config: FsConfig::new(props),
Expand Down Expand Up @@ -72,13 +80,22 @@ impl Storage {
///
/// * An [`opendal::Operator`] instance used to operate on file.
/// * Relative path to the root uri of [`opendal::Operator`].
///
pub(crate) fn create_operator<'a>(
&self,
path: &'a impl AsRef<str>,
) -> crate::Result<(Operator, &'a str)> {
let path = path.as_ref();
match self {
#[cfg(feature = "storage-memory")]
Storage::Memory { config } => {
let op = config.build(path)?;

if let Some(stripped) = path.strip_prefix("memory:/") {
Ok((op, stripped))
} else {
Ok((op, &path[1..]))
}
}
#[cfg(feature = "storage-fs")]
Storage::LocalFs { config } => {
let op = config.build(path)?;
Expand Down Expand Up @@ -116,6 +133,7 @@ impl Storage {
/// Parse scheme.
fn parse_scheme(scheme: &str) -> crate::Result<Scheme> {
match scheme {
"memory" => Ok(Scheme::Memory),
"file" | "" => Ok(Scheme::Fs),
"s3" | "s3a" => Ok(Scheme::S3),
s => Ok(s.parse::<Scheme>()?),
Expand Down
43 changes: 43 additions & 0 deletions crates/iceberg/src/io/storage_memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::Result;
use opendal::{Operator, Scheme};
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};

/// Configuration for the in-memory storage backend.
///
/// The struct has no fields; it exists so the memory backend can be handled
/// through a config type of its own.
#[derive(Default, Clone)]
pub(crate) struct MemoryConfig {}

impl Debug for MemoryConfig {
    /// Formats the config as its bare type name, since there are no fields to print.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("MemoryConfig")
    }
}

impl MemoryConfig {
    /// Create a memory config from iceberg props.
    ///
    /// The props are accepted but not read; the default config is returned.
    pub fn new(_props: HashMap<String, String>) -> Self {
        Self {}
    }

    /// Build a new opendal operator for the memory scheme.
    ///
    /// The path argument is not read, and the operator is built from an empty
    /// option map.
    ///
    /// NOTE(review): every call constructs a new operator. Presumably each one
    /// is backed by its own store, so confirm that callers do not expect data
    /// to be shared between operators built from the same config.
    pub fn build(&self, _path: &str) -> Result<Operator> {
        let operator = Operator::via_map(Scheme::Memory, HashMap::new())?;
        Ok(operator)
    }
}
2 changes: 1 addition & 1 deletion crates/iceberg/src/spec/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

/*!
* Data Types
*/
*/
use crate::ensure_data_valid;
use crate::error::Result;
use crate::spec::datatypes::_decimal::{MAX_PRECISION, REQUIRED_LENGTH};
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/spec/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

/*!
* Partitioning
*/
*/
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use typed_builder::TypedBuilder;
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/spec/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

/*!
* Snapshots
*/
*/
use crate::error::Result;
use chrono::{DateTime, TimeZone, Utc};
use serde::{Deserialize, Serialize};
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/spec/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

/*!
* Sorting
*/
*/
use crate::error::Result;
use crate::spec::Schema;
use crate::{Error, ErrorKind};
Expand Down
Loading

0 comments on commit f18e660

Please sign in to comment.