Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C++] Add columns from in-memory table #337

Merged
merged 20 commits into from
Dec 1, 2022
51 changes: 51 additions & 0 deletions cpp/include/lance/arrow/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,57 @@ class LanceDataset : public ::arrow::dataset::Dataset {
::arrow::Result<std::shared_ptr<UpdaterBuilder>> NewUpdate(
const std::shared_ptr<::arrow::Field>& new_field) const;

::arrow::Result<std::shared_ptr<UpdaterBuilder>> NewUpdate(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does either this or the single-field addition work for new subfield of nested field? If yes is there a test? If not, should it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure i understand your question. You mean add some tests for a column with nested data? or something else?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm just asking what the behavior is (i forgot)

const std::shared_ptr<::arrow::Schema>& new_columns) const;

/// Merge an in-memory table, except the "right_on" column.
///
/// The algorithm follows the semantic of `LEFT JOIN` in SQL.
/// The difference to LEFT JOIN is that this function does not allow one row
/// on the left ("this" dataset) maps to two distinct rows on the right ("other").
changhiskhan marked this conversation as resolved.
Show resolved Hide resolved
/// However, if it can not find a matched row on the right side, a NULL value is provided.
///
/// For example,
///
/// \code
/// dataset (left) = {
/// "id": [1, 2, 3, 4],
/// "vals": ["a", "b", "c", "d"],
/// }
/// table (right) = {
/// "id": [5, 1, 10, 3, 8],
/// "attrs": [5.0, 1.0, 10.0, 3.0, 8.0],
/// }
///
/// dataset.AddColumn(table, on="id") =>
/// {
/// "id": [1, 2, 3, 4],
/// "vals": ["a", "b", "c", "d"],
/// "attrs": [1.0, Null, 3.0, Null],
/// }
/// \endcode
///
/// \param right the table to merge with this dataset.
/// \param left_on the column in this dataset be compared to.
/// \param right_on the column in the table to be compared to.
/// This column must exist in both side and have the same data type.
/// \param pool memory pool
/// \return `::arrow::Status::OK` if success.
///
::arrow::Result<std::shared_ptr<LanceDataset>> Merge(
const std::shared_ptr<::arrow::Table>& right,
const std::string& left_on,
const std::string& right_on,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool());

/// Merge an in-memory table, both sides must have the same column specified by the "on" name.
///
/// See `Merge(right, left_on, right_on, pool)` for details.
::arrow::Result<std::shared_ptr<LanceDataset>> Merge(
const std::shared_ptr<::arrow::Table>& right,
const std::string& on,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool());

::arrow::Result<std::shared_ptr<::arrow::dataset::Dataset>> ReplaceSchema(
std::shared_ptr<::arrow::Schema> schema) const override;

Expand Down
12 changes: 8 additions & 4 deletions cpp/include/lance/arrow/updater.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,24 @@ class Updater {
/// The array must has the same length as the batch returned previously via `Next()`.
::arrow::Status UpdateBatch(const std::shared_ptr<::arrow::Array>& arr);

/// Update the values to new values, presented in a `RecordBatch`.
/// The batch must has the same length as the batch returned previously via `Next()`.
::arrow::Status UpdateBatch(const std::shared_ptr<::arrow::RecordBatch>& batch);

/// Finish the update and returns a new version of dataset.
::arrow::Result<std::shared_ptr<LanceDataset>> Finish();

private:
/// Make a new Updater
///
/// \param dataset The dataset to be updated.
/// \param field the (new) column to update.
/// \param schema the (new) columns to update.
/// \param projection_columns the columns to read from source dataset.
///
/// \return an Updater if success.
static ::arrow::Result<std::shared_ptr<Updater>> Make(
std::shared_ptr<LanceDataset> dataset,
const std::shared_ptr<::arrow::Field>& field,
const std::shared_ptr<::arrow::Schema>& schema,
const std::vector<std::string>& projection_columns);

/// PIMPL
Expand All @@ -99,7 +103,7 @@ class Updater {
/// parameters to build a Updater.
class UpdaterBuilder {
public:
UpdaterBuilder(std::shared_ptr<LanceDataset> dataset, std::shared_ptr<::arrow::Field> field);
UpdaterBuilder(std::shared_ptr<LanceDataset> dataset, std::shared_ptr<::arrow::Schema> schema);

/// Set the projection columns from the source dataset.
void Project(std::vector<std::string> columns);
Expand All @@ -109,7 +113,7 @@ class UpdaterBuilder {
private:
std::shared_ptr<LanceDataset> dataset_;

std::shared_ptr<::arrow::Field> field_;
std::shared_ptr<::arrow::Schema> schema_;

std::vector<std::string> projection_columns_;
};
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/lance/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ add_library(
file_lance_ext.h
fragment.cc
fragment.h
hash_merger.cc
hash_merger.h
scanner.cc
stl.h
type.cc
Expand All @@ -39,6 +41,7 @@ add_lance_test(api_test)
add_lance_test(arrow_dataset_test)
add_lance_test(dataset_test)
add_lance_test(fragment_test)
add_lance_test(hash_merger_test)
add_lance_test(scanner_test)
add_lance_test(type_test)
add_lance_test(updater_test)
Expand Down
71 changes: 69 additions & 2 deletions cpp/src/lance/arrow/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,22 @@
#include <arrow/array/concatenate.h>
#include <arrow/status.h>
#include <arrow/table.h>
#include <arrow/type_traits.h>
#include <fmt/format.h>

#include <algorithm>
#include <filesystem>
#include <mutex>
#include <range/v3/all.hpp>
#include <string>
#include <string_view>
#include <unordered_map>
#include <utility>

#include "lance/arrow/dataset_ext.h"
#include "lance/arrow/file_lance.h"
#include "lance/arrow/fragment.h"
#include "lance/arrow/hash_merger.h"
#include "lance/arrow/updater.h"
#include "lance/arrow/utils.h"
#include "lance/format/manifest.h"
Expand Down Expand Up @@ -313,8 +318,12 @@ DatasetVersion LanceDataset::version() const { return impl_->manifest->GetDatase

::arrow::Result<std::shared_ptr<UpdaterBuilder>> LanceDataset::NewUpdate(
const std::shared_ptr<::arrow::Field>& new_field) const {
return std::make_shared<UpdaterBuilder>(std::make_shared<LanceDataset>(*this),
std::move(new_field));
return NewUpdate(::arrow::schema({new_field}));
}

::arrow::Result<std::shared_ptr<UpdaterBuilder>> LanceDataset::NewUpdate(
const std::shared_ptr<::arrow::Schema>& new_columns) const {
return std::make_shared<UpdaterBuilder>(std::make_shared<LanceDataset>(*this), new_columns);
}

::arrow::Result<std::shared_ptr<LanceDataset>> LanceDataset::AddColumn(
Expand Down Expand Up @@ -382,4 +391,62 @@ ::arrow::Result<::arrow::dataset::FragmentIterator> LanceDataset::GetFragmentsIm
return ::arrow::MakeVectorIterator(fragments);
}

::arrow::Result<std::shared_ptr<LanceDataset>> LanceDataset::Merge(
const std::shared_ptr<::arrow::Table>& other,
changhiskhan marked this conversation as resolved.
Show resolved Hide resolved
const std::string& on,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure we'll always have the luxury of the same name on both. Can change this later tho i guess

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can change the signature to AddColumns(other, left_on, right_on)? I tried, but could not come up with a good function signature.

Suggestions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left_on and right_on seems fine. In python we can have optional on= if the column name is indeed the same ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

::arrow::MemoryPool* pool) {
return Merge(other, on, on, pool);
}

::arrow::Result<std::shared_ptr<LanceDataset>> LanceDataset::Merge(
const std::shared_ptr<::arrow::Table>& right,
const std::string& left_on,
const std::string& right_on,
::arrow::MemoryPool* pool) {
/// Sanity checks
auto left_column = schema_->GetFieldByName(left_on);
if (left_column == nullptr) {
return ::arrow::Status::Invalid(
fmt::format("Column {} does not exist in the dataset.", left_on));
}
auto right_column = right->GetColumnByName(right_on);
if (right_column == nullptr) {
return ::arrow::Status::Invalid(
fmt::format("Column {} does not exist in the table.", right_on));
}

auto& left_type = left_column->type();
auto& right_type = right_column->type();
if (!left_type->Equals(right_type)) {
return ::arrow::Status::Invalid("LanceDataset::AddColumns: types are not equal: ",
left_type->ToString(),
" != ",
right_type->ToString());
}

// First phase, build hash table (in memory for simplicity)
auto merger = HashMerger(right, right_on, pool);
ARROW_RETURN_NOT_OK(merger.Init());

// Second phase
auto table_schema = right->schema();
ARROW_ASSIGN_OR_RAISE(auto incoming_schema,
table_schema->RemoveField(table_schema->GetFieldIndex(right_on)));
ARROW_ASSIGN_OR_RAISE(auto update_builder, NewUpdate(std::move(incoming_schema)));
update_builder->Project({left_on});
ARROW_ASSIGN_OR_RAISE(auto updater, update_builder->Finish());

while (true) {
ARROW_ASSIGN_OR_RAISE(auto batch, updater->Next());
if (!batch) {
break;
}
assert(batch->schema()->Equals(::arrow::schema({left_column})));
auto index_arr = batch->GetColumnByName(left_on);
ARROW_ASSIGN_OR_RAISE(auto right_batch, merger.Collect(index_arr));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fancypants

ARROW_RETURN_NOT_OK(updater->UpdateBatch(right_batch));
}
return updater->Finish();
}

} // namespace lance::arrow
33 changes: 33 additions & 0 deletions cpp/src/lance/arrow/dataset_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,4 +283,37 @@ TEST_CASE("Dataset add column with a function call") {
::arrow::field("doubles", ::arrow::float64())}),
{ids, doubles});
CHECK(table2->Equals(*expected_table));
}

TEST_CASE("Dataset add columns with a table") {
auto ids = ToArray({1, 2, 3, 4, 5}).ValueOrDie();
auto values = ToArray({"one", "two", "three", "four", "five"}).ValueOrDie();
auto schema = ::arrow::schema(
{::arrow::field("id", ::arrow::int32()), ::arrow::field("value", ::arrow::utf8())});
auto table = ::arrow::Table::Make(schema, {ids, values});
auto base_uri = WriteTable(table);

auto fs = std::make_shared<::arrow::fs::LocalFileSystem>();
auto dataset = lance::arrow::LanceDataset::Make(fs, base_uri).ValueOrDie();
CHECK(dataset->version().version() == 1);

auto added_ids = ToArray({5, 4, 3, 10, 12, 1}).ValueOrDie();
auto added_values = ToArray({50, 40, 30, 100, 120, 10}).ValueOrDie();
auto added_table =
::arrow::Table::Make(::arrow::schema({::arrow::field("id", ::arrow::int32()),
::arrow::field("new_value", ::arrow::int32())}),
{added_ids, added_values});
auto new_dataset = dataset->Merge(added_table, "id").ValueOrDie();
CHECK(new_dataset->version().version() == 2);
auto new_table =
new_dataset->NewScan().ValueOrDie()->Finish().ValueOrDie()->ToTable().ValueOrDie();

// TODO: Plain array does not support null yet, so arr[1] = 0 instead of Null.
auto new_values = ToArray({10, 0, 30, 40, 50}).ValueOrDie();
auto expected_table =
::arrow::Table::Make(::arrow::schema({::arrow::field("id", ::arrow::int32()),
::arrow::field("value", ::arrow::utf8()),
::arrow::field("new_value", ::arrow::int32())}),
{ids, values, new_values});
CHECK(new_table->Equals(*expected_table));
}
Loading