-
Notifications
You must be signed in to change notification settings - Fork 245
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[C++] Add columns from in-memory table #337
Changes from all commits
5219708
5e8ca96
12c5cbf
f26c663
25c8016
3c97c43
8068ac3
b5a4e2c
77d611a
37d6389
24930ec
80ce0d9
4a57900
3ec88a2
4063567
c01b652
f433d7c
db6d628
3042752
5ef309a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,17 +18,22 @@ | |
#include <arrow/array/concatenate.h> | ||
#include <arrow/status.h> | ||
#include <arrow/table.h> | ||
#include <arrow/type_traits.h> | ||
#include <fmt/format.h> | ||
|
||
#include <algorithm> | ||
#include <filesystem> | ||
#include <mutex> | ||
#include <range/v3/all.hpp> | ||
#include <string> | ||
#include <string_view> | ||
#include <unordered_map> | ||
#include <utility> | ||
|
||
#include "lance/arrow/dataset_ext.h" | ||
#include "lance/arrow/file_lance.h" | ||
#include "lance/arrow/fragment.h" | ||
#include "lance/arrow/hash_merger.h" | ||
#include "lance/arrow/updater.h" | ||
#include "lance/arrow/utils.h" | ||
#include "lance/format/manifest.h" | ||
|
@@ -313,8 +318,12 @@ DatasetVersion LanceDataset::version() const { return impl_->manifest->GetDatase | |
|
||
/// Create an updater builder that adds a single new column to this dataset.
///
/// Convenience overload: wraps `new_field` in a one-field schema and delegates
/// to the schema-based `NewUpdate` overload, so both entry points share one
/// construction path.
///
/// \param new_field the field describing the column to add.
/// \return whatever the schema-based `NewUpdate` overload returns.
::arrow::Result<std::shared_ptr<UpdaterBuilder>> LanceDataset::NewUpdate(
    const std::shared_ptr<::arrow::Field>& new_field) const {
  return NewUpdate(::arrow::schema({new_field}));
}
|
||
/// Create an updater builder for the columns described by `new_columns`.
///
/// The builder receives a shared pointer to a copy-constructed `LanceDataset`
/// made from `*this`, together with the schema of the columns to add.
///
/// \param new_columns schema of the columns to be added.
/// \return the newly created `UpdaterBuilder`.
::arrow::Result<std::shared_ptr<UpdaterBuilder>> LanceDataset::NewUpdate(
    const std::shared_ptr<::arrow::Schema>& new_columns) const {
  auto dataset_copy = std::make_shared<LanceDataset>(*this);
  return std::make_shared<UpdaterBuilder>(std::move(dataset_copy), new_columns);
}
|
||
::arrow::Result<std::shared_ptr<LanceDataset>> LanceDataset::AddColumn( | ||
|
@@ -382,4 +391,62 @@ ::arrow::Result<::arrow::dataset::FragmentIterator> LanceDataset::GetFragmentsIm | |
return ::arrow::MakeVectorIterator(fragments); | ||
} | ||
|
||
/// Merge an in-memory table into this dataset when the key column has the same
/// name on both sides.
///
/// Forwards to the four-argument `Merge` overload, passing `on` as both the
/// dataset-side and the table-side key column name. Use that overload directly
/// when the key column is named differently in the dataset and in the table.
///
/// \param other the in-memory table to merge.
/// \param on name of the key column, used for both the dataset and `other`.
/// \param pool memory pool forwarded to the four-argument overload.
/// \return the result of the four-argument `Merge` overload.
::arrow::Result<std::shared_ptr<LanceDataset>> LanceDataset::Merge(
    const std::shared_ptr<::arrow::Table>& other,
    const std::string& on,
    ::arrow::MemoryPool* pool) {
  const std::string& shared_key = on;
  return Merge(other, /*left_on=*/shared_key, /*right_on=*/shared_key, pool);
}
|
||
/// Merge the columns of an in-memory table into this dataset, matching rows
/// through a key column.
///
/// \param right the in-memory table supplying the new columns.
/// \param left_on name of the key column in this dataset's schema.
/// \param right_on name of the key column in `right`.
/// \param pool memory pool handed to the `HashMerger`.
/// \return the dataset returned by the updater's `Finish()`. Returns
///   `Status::Invalid` when `right` is null, when either key column cannot be
///   looked up by name, or when the two key columns have different types.
///   Errors from the merger, the update builder and the updater are propagated.
::arrow::Result<std::shared_ptr<LanceDataset>> LanceDataset::Merge(
    const std::shared_ptr<::arrow::Table>& right,
    const std::string& left_on,
    const std::string& right_on,
    ::arrow::MemoryPool* pool) {
  /// Sanity checks
  if (right == nullptr) {
    // Report a null table as an error instead of dereferencing it below.
    return ::arrow::Status::Invalid("LanceDataset::Merge: right table is null.");
  }
  auto left_column = schema_->GetFieldByName(left_on);
  if (left_column == nullptr) {
    return ::arrow::Status::Invalid(
        fmt::format("Column {} does not exist in the dataset.", left_on));
  }
  auto right_column = right->GetColumnByName(right_on);
  if (right_column == nullptr) {
    return ::arrow::Status::Invalid(
        fmt::format("Column {} does not exist in the table.", right_on));
  }

  // The key columns must have identical types on both sides.
  auto& left_type = left_column->type();
  auto& right_type = right_column->type();
  if (!left_type->Equals(right_type)) {
    return ::arrow::Status::Invalid("LanceDataset::Merge: types are not equal: ",
                                    left_type->ToString(),
                                    " != ",
                                    right_type->ToString());
  }

  // First phase, build hash table (in memory for simplicity)
  auto merger = HashMerger(right, right_on, pool);
  ARROW_RETURN_NOT_OK(merger.Init());

  // Second phase
  // Every column of `right` except the key column is passed to the updater
  // as a new column.
  auto table_schema = right->schema();
  ARROW_ASSIGN_OR_RAISE(auto incoming_schema,
                        table_schema->RemoveField(table_schema->GetFieldIndex(right_on)));
  ARROW_ASSIGN_OR_RAISE(auto update_builder, NewUpdate(std::move(incoming_schema)));
  // Only the key column is requested from the existing dataset.
  update_builder->Project({left_on});
  ARROW_ASSIGN_OR_RAISE(auto updater, update_builder->Finish());

  while (true) {
    ARROW_ASSIGN_OR_RAISE(auto batch, updater->Next());
    if (!batch) {
      // A null batch ends the loop.
      break;
    }
    assert(batch->schema()->Equals(::arrow::schema({left_column})));
    auto index_arr = batch->GetColumnByName(left_on);
    // NOTE(review): Collect presumably returns the rows of `right` matching each
    // key in `index_arr` -- confirm against HashMerger.
    ARROW_ASSIGN_OR_RAISE(auto right_batch, merger.Collect(index_arr));
    ARROW_RETURN_NOT_OK(updater->UpdateBatch(right_batch));
  }
  return updater->Finish();
}
|
||
} // namespace lance::arrow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does either this or the single-field addition work for a new subfield of a nested field? If yes, is there a test? If not, should it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I understand your question. Do you mean adding some tests for a column with nested data, or something else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm just asking what the behavior is (I forgot).