Skip to content

Commit

Permalink
[Python] append column with an in-memory PyArrow Table (#338)
Browse files Browse the repository at this point in the history
  • Loading branch information
eddyxu authored Dec 1, 2022
1 parent 1592cbd commit 741957f
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 10 deletions.
29 changes: 26 additions & 3 deletions python/lance/_lib.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ from pyarrow.lib cimport (
CExpression,
CField,
CRecordBatch,
CTable,
Field,
GetResultValue,
RecordBatchReader,
check_status,
pyarrow_wrap_batch,
pyarrow_unwrap_field,
pyarrow_unwrap_array,
pyarrow_unwrap_table,
)
from pyarrow.lib import tobytes
from pyarrow.util import _stringify_path
Expand Down Expand Up @@ -226,6 +228,10 @@ cdef extern from "lance/arrow/dataset.h" namespace "lance::arrow" nogil:
CResult[shared_ptr[CLanceDataset]] AddColumn(
const shared_ptr[CField]& field, CExpression expression);

CResult[shared_ptr[CLanceDataset]] Merge(
const shared_ptr[CTable]& table, const string& left_on, const string& right_on
)

cdef _dataset_version_to_json(CDatasetVersion cdv):
return {
"version": cdv.version(),
Expand Down Expand Up @@ -286,19 +292,19 @@ cdef class FileSystemDataset(Dataset):

def append_column(
self,
field: Field,
value: Union[Callable[[pyarrow.Table], pyarrow.Array], Expression],
field: Optional[Field] = None,
columns: Optional[List[str]] = None,
) -> FileSystemDataset:
"""Append a new column.

Parameters
----------
field : pyarrow.Field
The name and schema of the newly added column.
value : Callback[[pyarrow.Table], pyarrow.Array], pyarrow.compute.Expression
A function / callback that takes in a Batch and produces an Array. The generated array must
have the same length as the input batch.
field : pyarrow.Field, optional
The name and schema of the newly added column.
columns : list of strs, optional.
The list of columns to read from the source dataset.
"""
Expand All @@ -323,6 +329,23 @@ cdef class FileSystemDataset(Dataset):
else:
raise ValueError(f"Value does not accept type: {type(value)}")

def merge(self, right: pyarrow.Table, left_on: str, right_on: str) -> FileSystemDataset:
    """Merge another table into this dataset using a left join.

    Rows of ``right`` are matched against this dataset on the join keys;
    the merge is performed in the native (C++) core and produces a new
    dataset version.

    Parameters
    ----------
    right : pyarrow.Table
        The in-memory table to merge into this dataset.
    left_on : str
        The name of the column in this dataset to be compared during merge.
    right_on : str
        The name of the column in the right table to be compared during merge.

    Returns
    -------
    FileSystemDataset
        A new dataset wrapping the merged result.
    """
    # Unwrap the Python-level table to its underlying C++ Table handle so it
    # can be passed across the Cython/C++ boundary without copying.
    cdef shared_ptr[CTable] c_table = pyarrow_unwrap_table(right)
    # GetResultValue raises a Python exception if the C++ Result holds an error.
    cdef shared_ptr[CLanceDataset] dataset = GetResultValue(
        self.lance_dataset.Merge(c_table, tobytes(left_on), tobytes(right_on))
    )
    # Upcast to the generic CDataset before wrapping back into a Python object.
    return FileSystemDataset.wrap(static_pointer_cast[CDataset, CLanceDataset](dataset))

def _lance_dataset_write(
Dataset data,
object base_dir not None,
Expand Down
38 changes: 31 additions & 7 deletions python/lance/tests/test_schema_evolution.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,22 @@
from pathlib import Path

import pandas as pd

import lance
import pyarrow as pa
import pyarrow.compute as pc

import lance


def test_write_versioned_dataset(tmp_path: Path):
table1 = pa.Table.from_pylist([{"a": 1, "b": 2}, {"a": 10, "b": 20}])
base_dir = tmp_path / "test"
lance.write_dataset(table1, base_dir)

dataset = lance.dataset(base_dir)
new_dataset = dataset.append_column(pa.field("c", pa.utf8()), lambda x: pa.array([f"a{i}" for i in range(len(x))]))
new_dataset = dataset.append_column(
lambda x: pa.array([f"a{i}" for i in range(len(x))]),
field=pa.field("c", pa.utf8()),
)

actual_df = new_dataset.to_table().to_pandas()
expected_df = pd.DataFrame({"a": [1, 10], "b": [2, 20], "c": ["a0", "a1"]})
Expand All @@ -46,7 +49,9 @@ def value_func(x: pa.Table):
assert x.column_names == ["a"]
return pa.array([str(i) for i in x.column("a")])

new_dataset = dataset.append_column(pa.field("c", pa.utf8()), value_func, columns=["a"])
new_dataset = dataset.append_column(
value_func, field=pa.field("c", pa.utf8()), columns=["a"]
)

actual_df = new_dataset.to_table().to_pandas()
expected_df = pd.DataFrame({"a": [1, 10], "b": [2, 20], "c": ["1", "10"]})
Expand All @@ -59,7 +64,9 @@ def test_add_column_with_literal(tmp_path: Path):
base_dir = tmp_path / "test"
lance.write_dataset(table, base_dir)
dataset = lance.dataset(base_dir)
new_dataset = dataset.append_column(pa.field("b", pa.float64()), pc.scalar(0.5))
new_dataset = dataset.append_column(
pc.scalar(0.5), field=pa.field("b", pa.float64())
)

assert new_dataset.version["version"] == 2
actual_df = new_dataset.to_table().to_pandas()
Expand All @@ -74,11 +81,28 @@ def test_add_column_with_compute(tmp_path: Path):
base_dir = tmp_path / "test"
lance.write_dataset(table, base_dir)
dataset = lance.dataset(base_dir)
new_dataset = dataset.append_column(pa.field("b", pa.int64()),
pc.Expression._call("power", [pc.field("a"), pc.scalar(2)]))
new_dataset = dataset.append_column(
pc.Expression._call("power", [pc.field("a"), pc.scalar(2)]),
field=pa.field("b", pa.int64()),
)

assert new_dataset.version["version"] == 2
actual_df = new_dataset.to_table().to_pandas()
expected_df = table.to_pandas()
expected_df["b"] = expected_df["a"] * expected_df["a"]
pd.testing.assert_frame_equal(actual_df, expected_df)


def test_add_column_with_table(tmp_path: Path):
    """Left-merge an in-memory table into a stored dataset and verify the result."""
    # Seed a dataset with two columns keyed by "a".
    source = pa.Table.from_pylist([{"a": i, "b": str(i)} for i in range(10)])
    base_dir = tmp_path / "test"
    lance.write_dataset(source, base_dir)
    dataset = lance.dataset(base_dir)

    # Merge a second table that shares the "a" key and contributes column "c".
    right = pa.Table.from_pylist([{"a": i, "c": i * 10} for i in range(10)])
    merged = dataset.merge(right, left_on="a", right_on="a")

    # The merge creates a new dataset version on top of the original write.
    assert merged.version["version"] == 2

    expected_df = source.to_pandas()
    expected_df["c"] = right.column("c").to_numpy()
    pd.testing.assert_frame_equal(merged.to_table().to_pandas(), expected_df)

0 comments on commit 741957f

Please sign in to comment.