From 741957fe2dfda1d031332de3043f9be3aa2f64ce Mon Sep 17 00:00:00 2001
From: Lei Xu
Date: Thu, 1 Dec 2022 15:33:42 -0800
Subject: [PATCH] [Python] append column with an in-memory PyArrow Table (#338)

---
 python/lance/_lib.pyx                       | 29 ++++++++++++++--
 python/lance/tests/test_schema_evolution.py | 38 +++++++++++++++++----
 2 files changed, 57 insertions(+), 10 deletions(-)

diff --git a/python/lance/_lib.pyx b/python/lance/_lib.pyx
index 3a0cd09be4..2b37d54cda 100644
--- a/python/lance/_lib.pyx
+++ b/python/lance/_lib.pyx
@@ -33,6 +33,7 @@ from pyarrow.lib cimport (
     CExpression,
     CField,
     CRecordBatch,
+    CTable,
     Field,
     GetResultValue,
     RecordBatchReader,
@@ -40,6 +41,7 @@ from pyarrow.lib cimport (
     pyarrow_wrap_batch,
     pyarrow_unwrap_field,
     pyarrow_unwrap_array,
+    pyarrow_unwrap_table,
 )
 from pyarrow.lib import tobytes
 from pyarrow.util import _stringify_path
@@ -226,6 +228,10 @@ cdef extern from "lance/arrow/dataset.h" namespace "lance::arrow" nogil:
         CResult[shared_ptr[CLanceDataset]] AddColumn(
             const shared_ptr[CField]& field, CExpression expression);
 
+        CResult[shared_ptr[CLanceDataset]] Merge(
+            const shared_ptr[CTable]& table, const string& left_on, const string& right_on
+        )
+
 cdef _dataset_version_to_json(CDatasetVersion cdv):
     return {
         "version": cdv.version(),
@@ -286,19 +292,19 @@ cdef class FileSystemDataset(Dataset):
 
     def append_column(
         self,
-        field: Field,
         value: Union[Callable[[pyarrow.Table], pyarrow.Array], Expression],
+        field: Optional[Field] = None,
         columns: Optional[List[str]] = None,
     ) -> FileSystemDataset:
         """Append a new column.
 
         Parameters
         ----------
-        field : pyarrow.Field
-            The name and schema of the newly added column.
         value : Callback[[pyarrow.Table], pyarrow.Array], pyarrow.compute.Expression
             A function / callback that takes in a Batch and produces an Array. The generated array must
             have the same length as the input batch.
+        field : pyarrow.Field, optional
+            The name and schema of the newly added column.
         columns : list of strs, optional.
             The list of columns to read from the source dataset.
         """
@@ -323,6 +329,23 @@ cdef class FileSystemDataset(Dataset):
         else:
             raise ValueError(f"Value does not accept type: {type(value)}")
 
+    def merge(self, right: pyarrow.Table, left_on: str, right_on: str) -> FileSystemDataset:
+        """Merge another table into this dataset using a left join.
+
+        Parameters:
+        right : pyarrow.Table
+            The table to merge into this dataset.
+        left_on : str
+            The name of the column in this dataset to join on.
+        right_on : str
+            The name of the column in the right table to join on.
+        """
+        cdef shared_ptr[CTable] c_table = pyarrow_unwrap_table(right)
+        cdef shared_ptr[CLanceDataset] dataset = GetResultValue(
+            self.lance_dataset.Merge(c_table, tobytes(left_on), tobytes(right_on))
+        )
+        return FileSystemDataset.wrap(static_pointer_cast[CDataset, CLanceDataset](dataset))
+
 def _lance_dataset_write(
     Dataset data,
     object base_dir not None,
diff --git a/python/lance/tests/test_schema_evolution.py b/python/lance/tests/test_schema_evolution.py
index 0e806f6a89..7920517238 100644
--- a/python/lance/tests/test_schema_evolution.py
+++ b/python/lance/tests/test_schema_evolution.py
@@ -15,11 +15,11 @@
 from pathlib import Path
 
 import pandas as pd
-
-import lance
 import pyarrow as pa
 import pyarrow.compute as pc
 
+import lance
+
 
 def test_write_versioned_dataset(tmp_path: Path):
     table1 = pa.Table.from_pylist([{"a": 1, "b": 2}, {"a": 10, "b": 20}])
@@ -27,7 +27,10 @@ def test_write_versioned_dataset(tmp_path: Path):
     lance.write_dataset(table1, base_dir)
 
     dataset = lance.dataset(base_dir)
-    new_dataset = dataset.append_column(pa.field("c", pa.utf8()), lambda x: pa.array([f"a{i}" for i in range(len(x))]))
+    new_dataset = dataset.append_column(
+        lambda x: pa.array([f"a{i}" for i in range(len(x))]),
+        field=pa.field("c", pa.utf8()),
+    )
 
     actual_df = new_dataset.to_table().to_pandas()
     expected_df = pd.DataFrame({"a": [1, 10], "b": [2, 20], "c": ["a0", "a1"]})
@@ -46,7 +49,9 @@ def value_func(x: pa.Table):
         assert x.column_names == ["a"]
         return pa.array([str(i) for i in x.column("a")])
 
-    new_dataset = dataset.append_column(pa.field("c", pa.utf8()), value_func, columns=["a"])
+    new_dataset = dataset.append_column(
+        value_func, field=pa.field("c", pa.utf8()), columns=["a"]
+    )
 
     actual_df = new_dataset.to_table().to_pandas()
     expected_df = pd.DataFrame({"a": [1, 10], "b": [2, 20], "c": ["1", "10"]})
@@ -59,7 +64,9 @@ def test_add_column_with_literal(tmp_path: Path):
     base_dir = tmp_path / "test"
     lance.write_dataset(table, base_dir)
     dataset = lance.dataset(base_dir)
-    new_dataset = dataset.append_column(pa.field("b", pa.float64()), pc.scalar(0.5))
+    new_dataset = dataset.append_column(
+        pc.scalar(0.5), field=pa.field("b", pa.float64())
+    )
     assert new_dataset.version["version"] == 2
 
     actual_df = new_dataset.to_table().to_pandas()
@@ -74,11 +81,28 @@ def test_add_column_with_compute(tmp_path: Path):
     base_dir = tmp_path / "test"
     lance.write_dataset(table, base_dir)
     dataset = lance.dataset(base_dir)
-    new_dataset = dataset.append_column(pa.field("b", pa.int64()),
-                                        pc.Expression._call("power", [pc.field("a"), pc.scalar(2)]))
+    new_dataset = dataset.append_column(
+        pc.Expression._call("power", [pc.field("a"), pc.scalar(2)]),
+        field=pa.field("b", pa.int64()),
+    )
     assert new_dataset.version["version"] == 2
 
     actual_df = new_dataset.to_table().to_pandas()
     expected_df = table.to_pandas()
     expected_df["b"] = expected_df["a"] * expected_df["a"]
     pd.testing.assert_frame_equal(actual_df, expected_df)
+
+
+def test_add_column_with_table(tmp_path: Path):
+    table = pa.Table.from_pylist([{"a": i, "b": str(i)} for i in range(10)])
+    base_dir = tmp_path / "test"
+    lance.write_dataset(table, base_dir)
+    dataset = lance.dataset(base_dir)
+
+    new_table = pa.Table.from_pylist([{"a": i, "c": i * 10} for i in range(10)])
+    new_dataset = dataset.merge(new_table, left_on="a", right_on="a")
+    assert new_dataset.version["version"] == 2
+    actual_df = new_dataset.to_table().to_pandas()
+    expected_df = table.to_pandas()
+    expected_df["c"] = new_table.column("c").to_numpy()
+    pd.testing.assert_frame_equal(actual_df, expected_df)
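
For reference, a minimal usage sketch of the merge() API added by this patch, mirroring test_add_column_with_table above. The on-disk path "/tmp/demo.lance" and the column names are illustrative only, not part of the patch:

    import lance
    import pyarrow as pa

    # Write a small dataset with columns "a" and "b" (path is illustrative).
    table = pa.Table.from_pylist([{"a": i, "b": str(i)} for i in range(10)])
    lance.write_dataset(table, "/tmp/demo.lance")

    # Left-join an in-memory table on column "a"; this appends column "c"
    # and produces a new dataset version.
    dataset = lance.dataset("/tmp/demo.lance")
    extra = pa.Table.from_pylist([{"a": i, "c": i * 10} for i in range(10)])
    merged = dataset.merge(extra, left_on="a", right_on="a")

    assert merged.version["version"] == 2
    print(merged.to_table().to_pandas())  # columns: a, b, c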