Skip to content

Commit

Permalink
Merge branch 'main' into af-demonstrateParentDistributionRequirementsBug
Browse files Browse the repository at this point in the history
  • Loading branch information
adamfaulkner-at authored Oct 2, 2024
2 parents 7e7a443 + 8f4232f commit 1408c7e
Show file tree
Hide file tree
Showing 24 changed files with 499 additions and 67 deletions.
256 changes: 256 additions & 0 deletions CHANGELOG.md

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions crates/aws/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-aws"
version = "0.3.0"
version = "0.4.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.20.0", path = "../core" }
deltalake-core = { version = "0.21.0", path = "../core" }
aws-smithy-runtime-api = { version="1.7" }
aws-smithy-runtime = { version="1.7", optional = true}
aws-credential-types = { version="1.2", features = ["hardcoded-credentials"]}
Expand Down
9 changes: 5 additions & 4 deletions crates/aws/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@
use std::sync::Arc;

use bytes::Bytes;
use deltalake_core::logstore::*;
use deltalake_core::{
logstore::{
abort_commit_entry, get_latest_version, read_commit_entry, write_commit_entry,
CommitOrBytes, LogStore, LogStoreConfig,
},
operations::transaction::TransactionError,
storage::{ObjectStoreRef, StorageOptions},
DeltaResult,
Expand Down Expand Up @@ -103,6 +100,10 @@ impl LogStore for S3LogStore {
get_latest_version(self, current_version).await
}

async fn get_earliest_version(&self, current_version: i64) -> DeltaResult<i64> {
get_earliest_version(self, current_version).await
}

fn object_store(&self) -> Arc<dyn ObjectStore> {
self.storage.clone()
}
Expand Down
4 changes: 4 additions & 0 deletions crates/aws/src/logstore/dynamodb_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,10 @@ impl LogStore for S3DynamoDbLogStore {
}
}

async fn get_earliest_version(&self, current_version: i64) -> DeltaResult<i64> {
get_earliest_version(self, current_version).await
}

fn object_store(&self) -> ObjectStoreRef {
self.storage.clone()
}
Expand Down
4 changes: 2 additions & 2 deletions crates/azure/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-azure"
version = "0.3.0"
version = "0.4.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.20.0", path = "../core" }
deltalake-core = { version = "0.21.0", path = "../core" }
lazy_static = "1"

# workspace depenndecies
Expand Down
4 changes: 2 additions & 2 deletions crates/catalog-glue/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-catalog-glue"
version = "0.4.0"
version = "0.5.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -15,7 +15,7 @@ rust-version.workspace = true
async-trait = { workspace = true }
aws-config = "1"
aws-sdk-glue = "1"
deltalake-core = { version = "0.20.0", path = "../core" }
deltalake-core = { version = "0.21.0", path = "../core" }
thiserror = { workspace = true }

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-core"
version = "0.20.0"
version = "0.21.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ impl LogStore for DefaultLogStore {
super::get_latest_version(self, current_version).await
}

async fn get_earliest_version(&self, current_version: i64) -> DeltaResult<i64> {
super::get_earliest_version(self, current_version).await
}

fn object_store(&self) -> Arc<dyn ObjectStore> {
self.storage.clone()
}
Expand Down
52 changes: 51 additions & 1 deletion crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
//! Delta log store.
use std::cmp::min;
use std::io::{BufRead, BufReader, Cursor};
use std::sync::OnceLock;
use std::{cmp::max, collections::HashMap, sync::Arc};

use bytes::Bytes;
use dashmap::DashMap;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use lazy_static::lazy_static;
use object_store::{path::Path, Error as ObjectStoreError, ObjectStore};
use regex::Regex;
Expand Down Expand Up @@ -213,6 +214,9 @@ pub trait LogStore: Sync + Send {
/// Find latest version currently stored in the delta log.
async fn get_latest_version(&self, start_version: i64) -> DeltaResult<i64>;

/// Find earliest version currently stored in the delta log.
async fn get_earliest_version(&self, start_version: i64) -> DeltaResult<i64>;

/// Get underlying object store.
fn object_store(&self) -> Arc<dyn ObjectStore>;

Expand Down Expand Up @@ -441,6 +445,52 @@ pub async fn get_latest_version(
Ok(version)
}

/// Default implementation for retrieving the earliest version
pub async fn get_earliest_version(
log_store: &dyn LogStore,
current_version: i64,
) -> DeltaResult<i64> {
let version_start = match get_last_checkpoint(log_store).await {
Ok(last_check_point) => last_check_point.version,
Err(ProtocolError::CheckpointNotFound) => {
// no checkpoint so start from current_version
current_version
}
Err(e) => {
return Err(DeltaTableError::from(e));
}
};

// list files to find min version
let version = async {
let mut min_version: i64 = version_start;
let prefix = Some(log_store.log_path());
let offset_path = commit_uri_from_version(version_start);
let object_store = log_store.object_store();

// Manually filter until we can provide direction in https://github.com/apache/arrow-rs/issues/6274
let mut files = object_store
.list(prefix)
.try_filter(move |f| futures::future::ready(f.location < offset_path))
.boxed();

while let Some(obj_meta) = files.next().await {
let obj_meta = obj_meta?;
if let Some(log_version) = extract_version_from_filename(obj_meta.location.as_ref()) {
min_version = min(min_version, log_version);
}
}

if min_version < 0 {
return Err(DeltaTableError::not_a_table(log_store.root_uri()));
}

Ok::<i64, DeltaTableError>(min_version)
}
.await?;
Ok(version)
}

/// Read delta log for a specific version
pub async fn read_commit_entry(
storage: &dyn ObjectStore,
Expand Down
21 changes: 16 additions & 5 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1154,6 +1154,14 @@ async fn execute(
.select(write_projection.clone())?
.with_column(CDC_COLUMN_NAME, lit("insert"))?,
);

let after = cdc_projection
.clone()
.filter(col(TARGET_COLUMN).is_true())?
.select(write_projection.clone())?;

// Extra select_columns is required so that before and after have same schema order
// DataFusion doesn't have UnionByName yet, see https://github.com/apache/datafusion/issues/12650
let before = cdc_projection
.clone()
.filter(col(crate::delta_datafusion::PATH_COLUMN).is_not_null())?
Expand All @@ -1164,13 +1172,16 @@ async fn execute(
.filter(|c| c.name != crate::delta_datafusion::PATH_COLUMN)
.map(|c| Expr::Column(c.clone()))
.collect_vec(),
)?
.select_columns(
&after
.schema()
.columns()
.iter()
.map(|v| v.name())
.collect::<Vec<_>>(),
)?;

let after = cdc_projection
.clone()
.filter(col(TARGET_COLUMN).is_true())?
.select(write_projection.clone())?;

let tracker = CDCTracker::new(before, after);
change_data.push(tracker.collect()?);
}
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,8 @@ impl MergePlan {
})
.boxed(),
OptimizeOperations::ZOrder(zorder_columns, bins) => {
debug!("Starting zorder with the columns: {zorder_columns:?} {bins:?}");

#[cfg(not(feature = "datafusion"))]
let exec_context = Arc::new(zorder::ZOrderExecContext::new(
zorder_columns,
Expand All @@ -729,7 +731,6 @@ impl MergePlan {
bins.len() <= num_cpus::get(),
));

debug!("Starting zorder with the columns: {zorder_columns:?} {bins:?}");
#[cfg(feature = "datafusion")]
let exec_context = Arc::new(zorder::ZOrderExecContext::new(
zorder_columns,
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,11 @@ impl DeltaTable {
self.log_store.get_latest_version(self.version()).await
}

/// returns the earliest available version of the table
pub async fn get_earliest_version(&self) -> Result<i64, DeltaTableError> {
self.log_store.get_earliest_version(self.version()).await
}

/// Currently loaded version of the table
pub fn version(&self) -> i64 {
self.state.as_ref().map(|s| s.version()).unwrap_or(-1)
Expand Down
14 changes: 7 additions & 7 deletions crates/deltalake/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake"
version = "0.20.0"
version = "0.21.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -16,12 +16,12 @@ rust-version.workspace = true
features = ["azure", "datafusion", "gcs", "hdfs", "json", "python", "s3", "unity-experimental"]

[dependencies]
deltalake-core = { version = "0.20.0", path = "../core" }
deltalake-aws = { version = "0.3.0", path = "../aws", default-features = false, optional = true }
deltalake-azure = { version = "0.3.0", path = "../azure", optional = true }
deltalake-gcp = { version = "0.4.0", path = "../gcp", optional = true }
deltalake-hdfs = { version = "0.4.0", path = "../hdfs", optional = true }
deltalake-catalog-glue = { version = "0.4.0", path = "../catalog-glue", optional = true }
deltalake-core = { version = "0.21.0", path = "../core" }
deltalake-aws = { version = "0.4.0", path = "../aws", default-features = false, optional = true }
deltalake-azure = { version = "0.4.0", path = "../azure", optional = true }
deltalake-gcp = { version = "0.5.0", path = "../gcp", optional = true }
deltalake-hdfs = { version = "0.5.0", path = "../hdfs", optional = true }
deltalake-catalog-glue = { version = "0.5.0", path = "../catalog-glue", optional = true }

[features]
# All of these features are just reflected into the core crate until that
Expand Down
4 changes: 2 additions & 2 deletions crates/gcp/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-gcp"
version = "0.4.0"
version = "0.5.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.20.0", path = "../core" }
deltalake-core = { version = "0.21.0", path = "../core" }
lazy_static = "1"

# workspace depenndecies
Expand Down
4 changes: 2 additions & 2 deletions crates/hdfs/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-hdfs"
version = "0.4.0"
version = "0.5.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.20.0", path = "../core" }
deltalake-core = { version = "0.21.0", path = "../core" }
hdfs-native-object-store = "0.11"

# workspace dependecies
Expand Down
4 changes: 2 additions & 2 deletions crates/mount/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-mount"
version = "0.4.0"
version = "0.5.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.20.0", path = "../core", features = [
deltalake-core = { version = "0.21.0", path = "../core", features = [
"datafusion",
] }
lazy_static = "1"
Expand Down
4 changes: 2 additions & 2 deletions crates/test/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
[package]
name = "deltalake-test"
version = "0.3.0"
version = "0.4.0"
edition = "2021"
publish = false

[dependencies]
bytes = { workspace = true }
chrono = { workspace = true, default-features = false, features = ["clock"] }
deltalake-core = { version = "0.20.0", path = "../core" }
deltalake-core = { version = "0.21.0", path = "../core" }
dotenvy = "0"
fs_extra = "1.3.0"
futures = { version = "0.3" }
Expand Down
57 changes: 57 additions & 0 deletions docs/integrations/object-storage/adls.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Azure ADLS Storage Backend

`delta-rs` offers native support for using Microsoft Azure Data Lake Storage (ADSL) as an object storage backend.

You don’t need to install any extra dependencies to read/write Delta tables to S3 with engines that use `delta-rs`. You do need to configure your ADLS access credentials correctly.

## Passing Credentials Explicitly

You can also pass ADLS credentials to your query engine explicitly.

For Polars, you would do this using the `storage_options` keyword as demonstrated above. This will forward your credentials to the `object store` library that Polars uses for cloud storage access under the hood. Read the [`object store` documentation](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants) for more information defining specific credentials.

## Example: Write Delta table to ADLS with Polars

Using Polars, you can write a Delta table to ADLS directly like this:

```python
import polars as pl

df = pl.DataFrame({"foo": [1, 2, 3, 4, 5]})

# define container name
container = <container_name>

# define credentials
storage_options = {
"ACCOUNT_NAME": <account_name>,
"ACCESS_KEY": <access_key>,
}

# write Delta to ADLS
df_pl.write_delta(
f"abfs://{container}/delta_table",
storage_options = storage_options
)
```

## Example with pandas

For libraries without direct `write_delta` methods (like Pandas), you can use the `write_deltalake` function from the `deltalake` library:

```python
import pandas as pd
from deltalake import write_deltalake

df = pd.DataFrame({"foo": [1, 2, 3, 4, 5]})

write_deltalake(
f"abfs://{container}/delta_table_pandas",
df,
storage_options=storage_options
)
```

## Using Local Authentication

If your local session is authenticated using the Azure CLI then you can write Delta tables directly to ADLS. Read more about this in the [Azure CLI documentation](https://learn.microsoft.com/en-us/cli/azure/).
Loading

0 comments on commit 1408c7e

Please sign in to comment.