Merge branch 'main' into fsckingcargo
rtyler authored Jan 12, 2025
2 parents e24d952 + 1bb60ee commit b97838b
Showing 80 changed files with 3,560 additions and 501 deletions.
37 changes: 37 additions & 0 deletions .github/workflows/build.yml
@@ -139,3 +139,40 @@ jobs:
- name: Run tests with native-tls
run: |
cargo test --no-default-features --features integration_test,s3-native-tls,datafusion
integration_test_lakefs:
name: Integration Tests (LakeFS v1.48)
runs-on: ubuntu-latest
env:
CARGO_INCREMENTAL: 0
# Disable full debug symbol generation to speed up CI build and keep memory down
# <https://doc.rust-lang.org/cargo/reference/profiles.html>
RUSTFLAGS: "-C debuginfo=line-tables-only"
# https://github.com/rust-lang/cargo/issues/10280
CARGO_NET_GIT_FETCH_WITH_CLI: "true"
RUST_BACKTRACE: "1"
RUST_LOG: debug

steps:
- uses: actions/checkout@v3

- name: Install minimal stable with clippy and rustfmt
uses: actions-rs/toolchain@v1
with:
profile: default
toolchain: '1.81'
override: true

- name: Download Lakectl
run: |
wget -q https://github.com/treeverse/lakeFS/releases/download/v1.48.1/lakeFS_1.48.1_Linux_x86_64.tar.gz
tar -xf lakeFS_1.48.1_Linux_x86_64.tar.gz -C $GITHUB_WORKSPACE
echo "$GITHUB_WORKSPACE" >> $GITHUB_PATH
- name: Start emulated services
run: docker compose -f docker-compose-lakefs.yml up -d

- name: Run tests with rustls (default)
run: |
cargo test --features integration_test_lakefs,lakefs,datafusion
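
Note — a minimal sketch, not part of this diff: how a test can be gated behind the Cargo features that the CI command above enables. Only the feature names and the test name test_read_tables_lakefs (also skipped in codecov.yml below) come from this commit; the module layout, env var, and default endpoint are assumptions.

#[cfg(all(feature = "integration_test_lakefs", feature = "lakefs"))]
mod lakefs_integration {
    // Compiled only for `cargo test --features integration_test_lakefs,lakefs,datafusion`,
    // matching the CI step above.
    #[test]
    fn test_read_tables_lakefs() {
        // Assumption: docker-compose-lakefs.yml exposes lakeFS on its default port 8000,
        // and the endpoint can be overridden through an (illustrative) env var.
        let endpoint = std::env::var("LAKEFS_ENDPOINT")
            .unwrap_or_else(|_| "http://127.0.0.1:8000".to_string());
        assert!(endpoint.starts_with("http"));
    }
}
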
2 changes: 1 addition & 1 deletion .github/workflows/codecov.yml
@@ -26,7 +26,7 @@ jobs:
uses: taiki-e/install-action@cargo-llvm-cov
- uses: Swatinem/rust-cache@v2
- name: Generate code coverage
run: cargo llvm-cov --features ${DEFAULT_FEATURES} --workspace --codecov --output-path codecov.json -- --skip read_table_version_hdfs
run: cargo llvm-cov --features ${DEFAULT_FEATURES} --workspace --codecov --output-path codecov.json -- --skip read_table_version_hdfs --skip test_read_tables_lakefs
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
with:
30 changes: 28 additions & 2 deletions .github/workflows/python_build.yml
@@ -77,13 +77,38 @@ jobs:
run: make setup-dat

- name: Run tests
run: uv run pytest -m '((s3 or azure) and integration) or not integration and not benchmark' --doctest-modules
run: uv run --no-sync pytest -m '((s3 or azure) and integration) or not integration and not benchmark' --doctest-modules

- name: Test without pandas
run: |
uv pip uninstall pandas
uv run pytest -m "not pandas and not integration and not benchmark"
uv run --no-sync pytest -m "not pandas and not integration and not benchmark"
uv pip install pandas
test-lakefs:
name: Python Build (Python 3.10 LakeFS Integration tests)
runs-on: ubuntu-latest
env:
RUSTFLAGS: "-C debuginfo=1"
CARGO_INCREMENTAL: 0

steps:
- uses: actions/checkout@v3

- name: Setup Environment
uses: ./.github/actions/setup-env

- name: Start emulated services
run: docker compose -f ../docker-compose-lakefs.yml up -d

- name: Build and install deltalake
run: make develop

- name: Download Data Acceptance Tests (DAT) files
run: make setup-dat

- name: Run tests
run: uv run --no-sync pytest -m '(lakefs and integration)' --doctest-modules

test-pyspark:
name: PySpark Integration Tests
@@ -109,6 +134,7 @@ jobs:
- name: Run tests
run: make test-pyspark


multi-python-running:
name: Running with Python ${{ matrix.python-version }}
runs-on: ubuntu-latest
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -59,6 +59,8 @@ datafusion-sql = { version = "44" }
# serde
serde = { version = "1.0.194", features = ["derive"] }
serde_json = "1"
strum = { version = "*"}


# "stdlib"
bytes = { version = "1" }
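
Note — the workspace now pins strum (with a wildcard version). As a rough, hypothetical illustration of what the crate's derive macros provide; the enum, the pinned version, and the "derive" feature below are assumptions, and this diff does not show how delta-rs actually uses strum.

// Assumes: strum = { version = "0.26", features = ["derive"] }
use std::str::FromStr;
use strum::{Display, EnumString};

#[derive(Debug, PartialEq, Display, EnumString)]
enum StorageBackend {
    S3,
    Azure,
    LakeFs,
}

fn main() {
    // EnumString generates FromStr; Display renders the variant name.
    assert_eq!(StorageBackend::from_str("S3").unwrap(), StorageBackend::S3);
    assert_eq!(StorageBackend::LakeFs.to_string(), "LakeFs");
}
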
16 changes: 11 additions & 5 deletions crates/aws/src/logstore/default_logstore.rs
@@ -11,6 +11,7 @@ use deltalake_core::{
};
use object_store::{Error as ObjectStoreError, ObjectStore};
use url::Url;
use uuid::Uuid;

/// Return the [S3LogStore] implementation with the provided configuration options
pub fn default_s3_logstore(
@@ -30,7 +31,7 @@ pub fn default_s3_logstore(
/// Default [`LogStore`] implementation
#[derive(Debug, Clone)]
pub struct S3LogStore {
pub(crate) storage: Arc<dyn ObjectStore>,
pub(crate) storage: ObjectStoreRef,
config: LogStoreConfig,
}

@@ -53,7 +54,7 @@ impl LogStore for S3LogStore {
}

async fn read_commit_entry(&self, version: i64) -> DeltaResult<Option<Bytes>> {
read_commit_entry(self.storage.as_ref(), version).await
read_commit_entry(self.object_store(None).as_ref(), version).await
}

/// Tries to commit a prepared commit file. Returns [`TransactionError`]
@@ -65,10 +66,14 @@ impl LogStore for S3LogStore {
&self,
version: i64,
commit_or_bytes: CommitOrBytes,
_operation_id: Uuid,
) -> Result<(), TransactionError> {
match commit_or_bytes {
CommitOrBytes::TmpCommit(tmp_commit) => {
Ok(write_commit_entry(&self.object_store(), version, &tmp_commit).await?)
Ok(
write_commit_entry(self.object_store(None).as_ref(), version, &tmp_commit)
.await?,
)
}
_ => unreachable!(), // S3 Log Store should never receive bytes
}
@@ -87,10 +92,11 @@ impl LogStore for S3LogStore {
&self,
version: i64,
commit_or_bytes: CommitOrBytes,
_operation_id: Uuid,
) -> Result<(), TransactionError> {
match &commit_or_bytes {
CommitOrBytes::TmpCommit(tmp_commit) => {
abort_commit_entry(self.storage.as_ref(), version, tmp_commit).await
abort_commit_entry(self.object_store(None).as_ref(), version, tmp_commit).await
}
_ => unreachable!(), // S3 Log Store should never receive bytes
}
@@ -104,7 +110,7 @@ impl LogStore for S3LogStore {
get_earliest_version(self, current_version).await
}

fn object_store(&self) -> Arc<dyn ObjectStore> {
fn object_store(&self, _operation_id: Option<Uuid>) -> Arc<dyn ObjectStore> {
self.storage.clone()
}

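
Note — the edits in this file track a wider signature change: object_store() now takes an optional operation id, and the write/abort commit entry points receive one as well (the implementations here ignore it as _operation_id). A simplified, self-contained sketch of the new shapes; every type below is a stand-in, not the real deltalake_core definition, and the real methods are async.

use std::sync::Arc;

// Stand-in types so the sketch compiles on its own.
type Uuid = u128;
#[allow(dead_code)]
struct TransactionError;
#[allow(dead_code)]
enum CommitOrBytes {
    TmpCommit(String),
    LogBytes(Vec<u8>),
}
trait ObjectStore {}

#[allow(dead_code)]
trait LogStoreSketch {
    // Previously: fn object_store(&self) -> Arc<dyn ObjectStore>;
    // None selects the default store (the S3 implementations above simply ignore the id).
    fn object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore>;

    // Commit and abort now also carry the operation id.
    fn write_commit_entry(
        &self,
        version: i64,
        commit_or_bytes: CommitOrBytes,
        operation_id: Uuid,
    ) -> Result<(), TransactionError>;

    fn abort_commit_entry(
        &self,
        version: i64,
        commit_or_bytes: CommitOrBytes,
        operation_id: Uuid,
    ) -> Result<(), TransactionError>;
}

fn main() {}
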
17 changes: 13 additions & 4 deletions crates/aws/src/logstore/dynamodb_logstore.rs
@@ -18,6 +18,7 @@ use deltalake_core::{
storage::{ObjectStoreRef, StorageOptions},
DeltaResult, DeltaTableError,
};
use uuid::Uuid;

const STORE_NAME: &str = "DeltaS3ObjectStore";
const MAX_REPAIR_RETRIES: i64 = 3;
@@ -93,7 +94,13 @@ impl S3DynamoDbLogStore {
return Ok(RepairLogEntryResult::AlreadyCompleted);
}
for retry in 0..=MAX_REPAIR_RETRIES {
match write_commit_entry(&self.storage, entry.version, &entry.temp_path).await {
match write_commit_entry(
self.object_store(None).as_ref(),
entry.version,
&entry.temp_path,
)
.await
{
Ok(()) => {
debug!("Successfully committed entry for version {}", entry.version);
return self.try_complete_entry(entry, true).await;
@@ -192,7 +199,7 @@ impl LogStore for S3DynamoDbLogStore {
if let Ok(Some(entry)) = entry {
self.repair_entry(&entry).await?;
}
read_commit_entry(&self.storage, version).await
read_commit_entry(self.object_store(None).as_ref(), version).await
}

/// Tries to commit a prepared commit file. Returns [DeltaTableError::VersionAlreadyExists]
@@ -204,6 +211,7 @@ impl LogStore for S3DynamoDbLogStore {
&self,
version: i64,
commit_or_bytes: CommitOrBytes,
_operation_id: Uuid,
) -> Result<(), TransactionError> {
let tmp_commit = match commit_or_bytes {
CommitOrBytes::TmpCommit(tmp_commit) => tmp_commit,
@@ -253,6 +261,7 @@ impl LogStore for S3DynamoDbLogStore {
&self,
version: i64,
commit_or_bytes: CommitOrBytes,
_operation_id: Uuid,
) -> Result<(), TransactionError> {
let tmp_commit = match commit_or_bytes {
CommitOrBytes::TmpCommit(tmp_commit) => tmp_commit,
@@ -278,7 +287,7 @@ impl LogStore for S3DynamoDbLogStore {
},
})?;

abort_commit_entry(&self.storage, version, &tmp_commit).await?;
abort_commit_entry(self.object_store(None).as_ref(), version, &tmp_commit).await?;
Ok(())
}

Expand All @@ -304,7 +313,7 @@ impl LogStore for S3DynamoDbLogStore {
get_earliest_version(self, current_version).await
}

fn object_store(&self) -> ObjectStoreRef {
fn object_store(&self, _operation_id: Option<Uuid>) -> ObjectStoreRef {
self.storage.clone()
}

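
Note — repair_entry() above retries the commit write in a bounded loop (MAX_REPAIR_RETRIES = 3). A self-contained sketch of that shape with stand-in types and a stubbed write; the real code additionally finalizes the DynamoDB entry and treats an already-moved temp commit as completed by another writer.

const MAX_REPAIR_RETRIES: i64 = 3;

#[derive(Debug)]
enum WriteError {
    AlreadyExists,
    Transient(String),
}

// Stub: fail transiently on the first attempts, then succeed.
fn write_commit_entry(attempt: i64) -> Result<(), WriteError> {
    if attempt < 2 {
        Err(WriteError::Transient("timeout".into()))
    } else {
        Ok(())
    }
}

fn repair_entry() -> Result<(), WriteError> {
    for retry in 0..=MAX_REPAIR_RETRIES {
        match write_commit_entry(retry) {
            Ok(()) => return Ok(()),
            // Another worker already moved the temp commit into place.
            Err(WriteError::AlreadyExists) => return Ok(()),
            Err(err) if retry < MAX_REPAIR_RETRIES => {
                eprintln!("repair attempt {retry} failed: {err:?}; retrying");
            }
            Err(err) => return Err(err),
        }
    }
    unreachable!("the loop always returns before exhausting the range")
}

fn main() {
    assert!(repair_entry().is_ok());
}
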
2 changes: 2 additions & 0 deletions crates/aws/src/storage.rs
@@ -830,7 +830,9 @@ mod tests {
}

#[test]
#[serial]
fn test_is_aws() {
clear_env_of_aws_keys();
let options = StorageOptions::default();
assert!(is_aws(&options));

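
Note — the test gains #[serial] (and clears AWS-related env vars first) because it depends on process-wide environment state; such tests race if run concurrently. A minimal sketch of the pattern, assuming the serial_test crate; the env var name and assertions are illustrative, not the real test bodies.

use serial_test::serial;

#[test]
#[serial]
fn defaults_to_aws_when_env_is_clean() {
    std::env::remove_var("AWS_ENDPOINT_URL");
    // ...assert that default StorageOptions are treated as plain AWS here...
}

#[test]
#[serial]
fn respects_custom_endpoint() {
    std::env::set_var("AWS_ENDPOINT_URL", "http://localhost:4566");
    // ...a second env-mutating test, safe only because both are #[serial]...
}
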
32 changes: 20 additions & 12 deletions crates/aws/tests/integration_s3_dynamodb.rs
@@ -24,6 +24,7 @@ use object_store::path::Path;
use serde_json::Value;
use serial_test::serial;
use tracing::log::*;
use uuid::Uuid;

use maplit::hashmap;
use object_store::{PutOptions, PutPayload};
@@ -95,7 +96,7 @@ async fn test_create_s3_table() -> TestResult<()> {
let _ = pretty_env_logger::try_init();
let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
let _client = make_client()?;
let table_name = format!("{}_{}", "create_test", uuid::Uuid::new_v4());
let table_name = format!("{}_{}", "create_test", Uuid::new_v4());
let table_uri = context.uri_for_table(TestTables::Custom(table_name.to_owned()));

let schema = StructType::new(vec![StructField::new(
@@ -113,7 +114,7 @@

let payload = PutPayload::from_static(b"test-drivin");
let _put = log_store
.object_store()
.object_store(None)
.put_opts(
&Path::from("_delta_log/_commit_failed.tmp"),
payload,
@@ -138,10 +139,7 @@ async fn get_missing_item() -> TestResult<()> {
let client = make_client()?;
let version = i64::MAX;
let result = client
.get_commit_entry(
&format!("s3a://my_delta_table_{}", uuid::Uuid::new_v4()),
version,
)
.get_commit_entry(&format!("s3a://my_delta_table_{}", Uuid::new_v4()), version)
.await;
assert_eq!(result.unwrap(), None);
Ok(())
@@ -186,7 +184,7 @@ async fn test_repair_commit_entry() -> TestResult<()> {
// create another incomplete log entry, this time move the temporary file already
let entry = create_incomplete_commit_entry(&table, 2, "unfinished_commit").await?;
log_store
.object_store()
.object_store(None)
.rename_if_not_exists(&entry.temp_path, &commit_uri_from_version(entry.version))
.await?;

@@ -253,6 +251,7 @@ async fn test_abort_commit_entry() -> TestResult<()> {
.abort_commit_entry(
entry.version,
CommitOrBytes::TmpCommit(entry.temp_path.clone()),
Uuid::new_v4(),
)
.await?;

@@ -262,13 +261,17 @@
}
// Temp commit file should have been deleted
assert!(matches!(
log_store.object_store().get(&entry.temp_path).await,
log_store.object_store(None).get(&entry.temp_path).await,
Err(ObjectStoreError::NotFound { .. })
));

// Test abort commit is idempotent - still works if already aborted
log_store
.abort_commit_entry(entry.version, CommitOrBytes::TmpCommit(entry.temp_path))
.abort_commit_entry(
entry.version,
CommitOrBytes::TmpCommit(entry.temp_path),
Uuid::new_v4(),
)
.await?;

Ok(())
Expand Down Expand Up @@ -301,14 +304,19 @@ async fn test_abort_commit_entry_fail_to_delete_entry() -> TestResult<()> {
log_store
.abort_commit_entry(
entry.version,
CommitOrBytes::TmpCommit(entry.temp_path.clone())
CommitOrBytes::TmpCommit(entry.temp_path.clone()),
Uuid::new_v4(),
)
.await,
Err(_),
));

// Check temp commit file still exists
assert!(log_store.object_store().get(&entry.temp_path).await.is_ok());
assert!(log_store
.object_store(None)
.get(&entry.temp_path)
.await
.is_ok());

Ok(())
}
@@ -439,7 +447,7 @@ fn add_action(name: &str) -> Action {
}

async fn prepare_table(context: &IntegrationContext, table_name: &str) -> TestResult<DeltaTable> {
let table_name = format!("{}_{}", table_name, uuid::Uuid::new_v4());
let table_name = format!("{}_{}", table_name, Uuid::new_v4());
let table_uri = context.uri_for_table(TestTables::Custom(table_name.to_owned()));
let schema = StructType::new(vec![StructField::new(
"Id".to_string(),
2 changes: 1 addition & 1 deletion crates/aws/tests/repair_s3_rename_test.rs
@@ -120,7 +120,7 @@ fn create_s3_backend(
.with_allow_http(true)
.build_storage()
.unwrap()
.object_store();
.object_store(None);

let delayed_store = DelayedObjectStore {
inner: store,
2 changes: 1 addition & 1 deletion crates/azure/tests/integration.rs
@@ -75,7 +75,7 @@ async fn read_write_test_onelake(context: &IntegrationContext, path: &Path) -> T
let delta_store = DeltaTableBuilder::from_uri(&context.root_uri())
.with_allow_http(true)
.build_storage()?
.object_store();
.object_store(None);

let expected = Bytes::from_static(b"test world from delta-rs on friday");

1 change: 1 addition & 0 deletions crates/core/Cargo.toml
@@ -49,6 +49,7 @@ datafusion-functions-aggregate = { workspace = true, optional = true }
# serde
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
strum = { workspace = true}

# "stdlib"
bytes = { workspace = true }
4 changes: 2 additions & 2 deletions crates/core/src/delta_datafusion/mod.rs
@@ -282,7 +282,7 @@ impl DeltaTableState {
pub(crate) fn register_store(store: LogStoreRef, env: Arc<RuntimeEnv>) {
let object_store_url = store.object_store_url();
let url: &Url = object_store_url.as_ref();
env.register_object_store(url, store.object_store());
env.register_object_store(url, store.object_store(None));
}

/// The logical schema for a Deltatable is different from the protocol level schema since partition
@@ -2708,7 +2708,7 @@ mod tests {
.unwrap();

let (object_store, mut operations) =
RecordingObjectStore::new(table.log_store().object_store());
RecordingObjectStore::new(table.log_store().object_store(None));
let log_store =
DefaultLogStore::new(Arc::new(object_store), table.log_store().config().clone());
let provider = DeltaTableProvider::try_new(
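
Note — register_store() now passes object_store(None) when wiring the table's store into DataFusion. A small sketch of the same registration call outside delta-rs, assuming the datafusion, object_store, and url crates; the URL and the in-memory store are arbitrary stand-ins.

use std::sync::Arc;

use datafusion::execution::runtime_env::RuntimeEnv;
use object_store::memory::InMemory;
use url::Url;

#[allow(dead_code)]
fn register(env: &RuntimeEnv) {
    // Paths under this scheme/authority are then served by the registered store.
    let url = Url::parse("delta-rs://my-table").unwrap();
    env.register_object_store(&url, Arc::new(InMemory::new()));
}

fn main() {}
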