Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add drop to dataset #3184

Merged
merged 8 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions java/core/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use crate::error::{Error, Result};
use crate::ffi::JNIEnvExt;
use crate::traits::FromJString;
use crate::utils::{extract_write_params, get_index_params};
use crate::utils::{extract_storage_options, extract_write_params, get_index_params};
use crate::{traits::IntoJava, RT};
use arrow::array::RecordBatchReader;
use arrow::datatypes::Schema;
Expand All @@ -30,7 +30,7 @@ use jni::{objects::JObject, JNIEnv};
use lance::dataset::builder::DatasetBuilder;
use lance::dataset::transaction::Operation;
use lance::dataset::{Dataset, ReadParams, WriteParams};
use lance::io::ObjectStoreParams;
use lance::io::{ObjectStore, ObjectStoreParams};
use lance::table::format::Fragment;
use lance::table::format::Index;
use lance_index::DatasetIndexExt;
Expand All @@ -48,6 +48,23 @@ pub struct BlockingDataset {
}

impl BlockingDataset {
/// Permanently delete the dataset at `uri`, removing its directory and all
/// files beneath it.
///
/// `storage_options` are forwarded to the object-store layer (e.g. cloud
/// credentials / endpoint overrides). Returns an `Error::io_error` wrapping
/// any failure from store resolution or deletion.
pub fn drop(uri: &str, storage_options: HashMap<String, String>) -> Result<()> {
    RT.block_on(async move {
        let registry = Arc::new(ObjectStoreRegistry::default());
        let object_store_params = ObjectStoreParams {
            // `storage_options` is owned and not used again below, so it can
            // be moved directly — the original `.clone()` here was redundant.
            storage_options: Some(storage_options),
            ..Default::default()
        };
        // Resolve the concrete store (local FS, S3, ...) plus the root path
        // of the dataset from the URI.
        let (object_store, path) =
            ObjectStore::from_uri_and_params(registry, uri, &object_store_params)
                .await
                .map_err(|e| Error::io_error(e.to_string()))?;
        // Recursively remove everything under the dataset root.
        object_store
            .remove_dir_all(path)
            .await
            .map_err(|e| Error::io_error(e.to_string()))
    })
}
pub fn write(
reader: impl RecordBatchReader + Send + 'static,
uri: &str,
Expand Down Expand Up @@ -199,6 +216,20 @@ fn inner_create_with_ffi_schema<'local>(
)
}

/// JNI entry point for `com.lancedb.lance.Dataset#drop(String, Map)`.
///
/// Extracts the dataset path and storage options from the Java arguments and
/// delegates to [`BlockingDataset::drop`]. On any failure `ok_or_throw!`
/// raises a Java exception and returns early with a null object.
#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_Dataset_drop<'local>(
    mut env: JNIEnv<'local>,
    _obj: JObject,
    path: JString<'local>,
    storage_options_obj: JObject<'local>,
) -> JObject<'local> {
    let path_str = ok_or_throw!(env, path.extract(&mut env));
    let storage_options =
        ok_or_throw!(env, extract_storage_options(&mut env, &storage_options_obj));
    ok_or_throw!(env, BlockingDataset::drop(&path_str, storage_options));
    // The Java signature is effectively void; hand back a null reference as the
    // tail expression (an explicit `return ...;` here is non-idiomatic Rust).
    JObject::null()
}

#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_Dataset_createWithFfiStream<'local>(
mut env: JNIEnv<'local>,
Expand Down
35 changes: 22 additions & 13 deletions java/core/lance-jni/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,26 @@ use crate::ffi::JNIEnvExt;
use lance_index::vector::Query;
use std::collections::HashMap;

pub fn extract_storage_options(
env: &mut JNIEnv,
storage_options_obj: &JObject,
) -> Result<HashMap<String, String>> {
let jmap = JMap::from_env(env, storage_options_obj)?;
let storage_options: HashMap<String, String> = env.with_local_frame(16, |env| {
let mut map = HashMap::new();
let mut iter = jmap.iter(env)?;
while let Some((key, value)) = iter.next(env)? {
let key_jstring = JString::from(key);
let value_jstring = JString::from(value);
let key_string: String = env.get_string(&key_jstring)?.into();
let value_string: String = env.get_string(&value_jstring)?.into();
map.insert(key_string, value_string);
}
Ok::<_, Error>(map)
})?;
Ok(storage_options)
}

pub fn extract_write_params(
env: &mut JNIEnv,
max_rows_per_file: &JObject,
Expand All @@ -58,19 +78,8 @@ pub fn extract_write_params(
}
// Java code always sets the data storage version to stable for now
write_params.data_storage_version = Some(LanceFileVersion::Stable);
let jmap = JMap::from_env(env, storage_options_obj)?;
let storage_options: HashMap<String, String> = env.with_local_frame(16, |env| {
let mut map = HashMap::new();
let mut iter = jmap.iter(env)?;
while let Some((key, value)) = iter.next(env)? {
let key_jstring = JString::from(key);
let value_jstring = JString::from(value);
let key_string: String = env.get_string(&key_jstring)?.into();
let value_string: String = env.get_string(&value_jstring)?.into();
map.insert(key_string, value_string);
}
Ok::<_, Error>(map)
})?;
let storage_options: HashMap<String, String> =
extract_storage_options(env, storage_options_obj)?;

write_params.store_params = Some(ObjectStoreParams {
storage_options: Some(storage_options),
Expand Down
9 changes: 9 additions & 0 deletions java/core/src/main/java/com/lancedb/lance/Dataset.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,15 @@ public static Dataset commit(BufferAllocator allocator, String path, FragmentOpe
public static native Dataset commitAppend(String path, Optional<Long> readVersion,
List<String> fragmentsMetadata, Map<String, String> storageOptions);

/**
 * Drop (permanently delete) the dataset at the given path, removing all of
 * its files. Implemented natively in lance-jni.
 *
 * @param path The file path of the dataset
 * @param storageOptions Storage options (e.g. cloud credentials); may be empty
 */
public static native void drop(String path, Map<String, String> storageOptions);

/**
* Create a new Dataset Scanner.
*
Expand Down
14 changes: 14 additions & 0 deletions java/core/src/test/java/com/lancedb/lance/DatasetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.util.HashMap;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -183,4 +185,16 @@ void testGetSchemaWithClosedDataset() {
assertThrows(RuntimeException.class, dataset::getSchema);
}
}

@Test
void testDropPath() {
  String testMethodName = new Object() {}.getClass().getEnclosingMethod().getName();
  String datasetPath = tempDir.resolve(testMethodName).toString();
  try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
    TestUtils.SimpleTestDataset testDataset =
        new TestUtils.SimpleTestDataset(allocator, datasetPath);
    dataset = testDataset.createEmptyDataset();
    Dataset.drop(datasetPath, new HashMap<>());
    // The original test only checked that drop() does not throw; also verify
    // the dataset directory is actually gone. Fully-qualified names avoid
    // touching the import block.
    org.junit.jupiter.api.Assertions.assertFalse(
        new java.io.File(datasetPath).exists(),
        "dataset directory should be deleted by drop()");
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchT

@Override
public boolean dropTable(Identifier ident) {
  // Deletes the Lance dataset backing this table identifier.
  // NOTE(review): this always reports success. Spark's TableCatalog contract
  // expects `false` when the table does not exist; confirm what
  // LanceDatasetAdapter.dropDataset does for a missing dataset (throw vs.
  // no-op) and adjust the return value accordingly.
  LanceConfig config = LanceConfig.from(options, ident.name());
  LanceDatasetAdapter.dropDataset(config);
  return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,10 @@ public static void createDataset(String datasetUri, StructType sparkSchema, Writ
ArrowUtils.toArrowSchema(sparkSchema, ZoneId.systemDefault().getId(), true, false),
params).close();
}

/** Permanently delete the dataset described by {@code config}, including all files. */
public static void dropDataset(LanceConfig config) {
  ReadOptions options = SparkOptions.genReadOptionFromConfig(config);
  Dataset.drop(config.getDatasetUri(), options.getStorageOptions());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ static void setup() {
List<Row> data = Arrays.asList(row1, row2);

testData = spark.createDataFrame(data, schema);
testData.createOrReplaceTempView("tmp_view");
}

@AfterAll
Expand Down Expand Up @@ -192,4 +193,13 @@ private void validateData(String datasetName, int iteration) {
assertEquals("Bob", row.getString(0));
}
}
}

@Test
public void dropAndReplaceTable(TestInfo testInfo) {
  // Smoke test for catalog drop support: CREATE OR REPLACE must tolerate an
  // already-existing table (drop + recreate) and DROP TABLE must succeed
  // afterwards. There are no result assertions — the test passes as long as
  // none of the SQL statements throw.
  String datasetName = testInfo.getTestMethod().get().getName();
  String path = LanceConfig.getDatasetUri(dbPath.toString(), datasetName);
  spark.sql("CREATE OR REPLACE TABLE lance.`" + path + "` AS SELECT * FROM tmp_view");
  spark.sql("CREATE OR REPLACE TABLE lance.`" + path + "` AS SELECT * FROM tmp_view");
  spark.sql("DROP TABLE lance.`" + path + "`");
}
}
6 changes: 6 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2353,6 +2353,12 @@ def stats(self) -> "LanceStats":
"""
return LanceStats(self._ds)

@staticmethod
def drop(
    base_uri: Union[str, Path], storage_options: Optional[Dict[str, str]] = None
) -> None:
    """Permanently delete the dataset at ``base_uri``, removing all its files.

    Parameters
    ----------
    base_uri : str or Path
        Base URI (or local path) of the dataset to delete.
    storage_options : dict of str to str, optional
        Extra options passed to the storage backend
        (e.g. cloud credentials / endpoint configuration).
    """
    _Dataset.drop(str(base_uri), storage_options)


class BulkCommitResult(TypedDict):
dataset: LanceDataset
Expand Down
8 changes: 8 additions & 0 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2715,3 +2715,11 @@ def test_detached_commits(tmp_path: Path):
)

assert detached2.to_table() == pa.table({"x": [0, 1, 3]})


def test_dataset_drop(tmp_path: Path):
    # Write a one-row dataset, drop it, and verify its directory is removed.
    lance.write_dataset(pa.table({"x": [0]}), tmp_path)
    assert tmp_path.exists()  # tmp_path is already a Path; no wrapping needed
    lance.LanceDataset.drop(tmp_path)
    assert not tmp_path.exists()
11 changes: 11 additions & 0 deletions python/python/tests/test_s3_ddb.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,3 +287,14 @@ def test_file_writer_reader(s3_bucket: str):
bytes(reader.read_global_buffer(global_buffer_pos)).decode()
== global_buffer_text
)


@pytest.mark.integration
def test_s3_drop(s3_bucket: str):
    # Integration test: drop must work against an S3-backed dataset, passing
    # through the same storage options (credentials/endpoint) used at write time.
    storage_options = copy.deepcopy(CONFIG)
    table_name = uuid.uuid4().hex
    tmp_path = f"s3://{s3_bucket}/{table_name}.lance"
    table = pa.table({"x": [0]})
    dataset = lance.write_dataset(table, tmp_path, storage_options=storage_options)
    dataset.validate()
    # NOTE(review): nothing asserts the S3 objects are actually gone — this
    # only verifies that drop() completes without raising.
    lance.LanceDataset.drop(tmp_path, storage_options=storage_options)
14 changes: 14 additions & 0 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ use snafu::{location, Location};
use uuid::Uuid;

use crate::error::PythonErrorExt;
use crate::file::object_store_from_uri_or_path;
use crate::fragment::{FileFragment, FragmentMetadata};
use crate::schema::LanceSchema;
use crate::session::Session;
Expand Down Expand Up @@ -1407,6 +1408,19 @@ impl Dataset {
Session::new(self.ds.session())
}

#[allow(clippy::too_many_arguments)]
wjones127 marked this conversation as resolved.
Show resolved Hide resolved
#[staticmethod]
fn drop(dest: String, storage_options: Option<HashMap<String, String>>) -> PyResult<()> {
RT.spawn(None, async move {
let (object_store, path) =
object_store_from_uri_or_path(&dest, storage_options).await?;
object_store
.remove_dir_all(path)
.await
.map_err(|e| PyIOError::new_err(e.to_string()))
})?
}

#[allow(clippy::too_many_arguments)]
#[staticmethod]
fn commit(
Expand Down
Loading