From 23ec7a9e40f97fe10e45cd7d99545fd0136e7d39 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 3 Dec 2022 10:50:33 -0800 Subject: [PATCH 01/19] can build lance core with duckdb ext --- cpp/src/lance/format/CMakeLists.txt | 2 +- integration/duckdb/CMakeLists.txt | 11 ++++- .../duckdb/src/lance/duckdb/lance-extension.h | 2 + .../duckdb/src/lance/duckdb/lance_reader.cc | 44 +++++++++++++++++++ .../duckdb/src/lance/duckdb/lance_reader.h | 27 ++++++++++++ 5 files changed, 84 insertions(+), 2 deletions(-) create mode 100644 integration/duckdb/src/lance/duckdb/lance_reader.cc create mode 100644 integration/duckdb/src/lance/duckdb/lance_reader.h diff --git a/cpp/src/lance/format/CMakeLists.txt b/cpp/src/lance/format/CMakeLists.txt index dc8eb312b2..865b8b3be9 100644 --- a/cpp/src/lance/format/CMakeLists.txt +++ b/cpp/src/lance/format/CMakeLists.txt @@ -16,7 +16,7 @@ protobuf_generate_cpp( PROTO_SRCS PROTO_HDRS - ${CMAKE_SOURCE_DIR}/../protos/format.proto + ${PROJECT_SOURCE_DIR}/../protos/format.proto ) add_library( diff --git a/integration/duckdb/CMakeLists.txt b/integration/duckdb/CMakeLists.txt index 934a0d9779..0e529e4637 100644 --- a/integration/duckdb/CMakeLists.txt +++ b/integration/duckdb/CMakeLists.txt @@ -109,12 +109,21 @@ if(LANCE_BUILD_PYTORCH) include_directories(${OpenCV_INCLUDE_DIRS}) endif() +# Add lance +include_directories(${CMAKE_BINARY_DIR}/lance/src) +add_subdirectory(../../cpp lance) + + include_directories(src) set(LANCE_EXT_SOURCE_COMMON src/lance/duckdb/lance-extension.cc src/lance/duckdb/list_functions.cc - src/lance/duckdb/vector_functions.cc) + src/lance/duckdb/list_functions.h + src/lance/duckdb/vector_functions.cc + src/lance/duckdb/lance_reader.h + src/lance/duckdb/lance_reader.cc +) set(LANCE_EXT_SOURCE_ML src/lance/duckdb/ml/catalog.cc diff --git a/integration/duckdb/src/lance/duckdb/lance-extension.h b/integration/duckdb/src/lance/duckdb/lance-extension.h index 57ff598a8e..1178111f42 100644 --- a/integration/duckdb/src/lance/duckdb/lance-extension.h +++ b/integration/duckdb/src/lance/duckdb/lance-extension.h @@ -22,7 +22,9 @@ namespace duckdb { class LanceExtension : public Extension { public: + void Load(DuckDB &db) override; + std::string Name() override; }; diff --git a/integration/duckdb/src/lance/duckdb/lance_reader.cc b/integration/duckdb/src/lance/duckdb/lance_reader.cc new file mode 100644 index 0000000000..d5a107b240 --- /dev/null +++ b/integration/duckdb/src/lance/duckdb/lance_reader.cc @@ -0,0 +1,44 @@ +// Copyright 2022 Lance Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include "lance/duckdb/lance_reader.h" + +#include + +namespace lance::duckdb { + +struct GlobalLanceReaderFunctionState : public ::duckdb::GlobalTableFunctionState {}; + +struct LocalLanceReaderFunctionState : public ::duckdb::LocalTableFunctionState { + +}; + +void LanceScan(::duckdb::ClientContext &context, + ::duckdb::TableFunctionInput &input, + ::duckdb::DataChunk &output) {} + +::duckdb::TableFunctionSet GetLanceReaderFunction() { + ::duckdb::TableFunctionSet func_set("lance_scan"); + + ::duckdb::TableFunction table_function({::duckdb::LogicalType::VARCHAR}, LanceScan); + table_function.projection_pushdown = true; + table_function.filter_pushdown = true; + table_function.filter_prune = true; + + func_set.AddFunction(table_function); + return func_set; +} + +} // namespace lance::duckdb \ No newline at end of file diff --git a/integration/duckdb/src/lance/duckdb/lance_reader.h b/integration/duckdb/src/lance/duckdb/lance_reader.h new file mode 100644 index 0000000000..c80c1102f6 --- /dev/null +++ b/integration/duckdb/src/lance/duckdb/lance_reader.h @@ -0,0 +1,27 @@ +// Copyright 2022 Lance Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#pragma once + +#include + +namespace lance::duckdb { + +/// Get lance reader: +/// +/// SELECT * from lance_scan("s3://path/to"); +::duckdb::TableFunctionSet GetLanceReaderFunction(); + +} From 0cadabc27e22599170fbe9e5f64a2272453e41d4 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 3 Dec 2022 11:41:26 -0800 Subject: [PATCH 02/19] table bind --- integration/duckdb/CMakeLists.txt | 8 +++-- .../src/lance/duckdb/lance-extension.cc | 9 +++-- .../duckdb/src/lance/duckdb/lance_reader.cc | 33 +++++++++++++++++-- 3 files changed, 42 insertions(+), 8 deletions(-) diff --git a/integration/duckdb/CMakeLists.txt b/integration/duckdb/CMakeLists.txt index 0e529e4637..07c89016fc 100644 --- a/integration/duckdb/CMakeLists.txt +++ b/integration/duckdb/CMakeLists.txt @@ -109,11 +109,12 @@ if(LANCE_BUILD_PYTORCH) include_directories(${OpenCV_INCLUDE_DIRS}) endif() -# Add lance -include_directories(${CMAKE_BINARY_DIR}/lance/src) +# Add lance core as dependency +find_package(Arrow REQUIRED) +find_package(ArrowDataset REQUIRED) +include_directories(${CMAKE_BINARY_DIR}/lance/src ../../cpp/include) add_subdirectory(../../cpp lance) - include_directories(src) set(LANCE_EXT_SOURCE_COMMON @@ -141,6 +142,7 @@ endif() # add_library(lance_extension STATIC ${LANCE_EXT_SOURCES}) set(PARAMETERS "-warnings") build_loadable_extension(lance ${PARAMETERS} ${LANCE_EXT_SOURCES}) +target_link_libraries(lance_loadable_extension lance ArrowDataset::arrow_dataset_shared) if(LANCE_BUILD_PYTORCH) target_link_libraries(lance_loadable_extension "${TORCH_LIBRARIES}" diff --git a/integration/duckdb/src/lance/duckdb/lance-extension.cc b/integration/duckdb/src/lance/duckdb/lance-extension.cc index 30bbe6f46b..2def507532 100644 --- a/integration/duckdb/src/lance/duckdb/lance-extension.cc +++ b/integration/duckdb/src/lance/duckdb/lance-extension.cc @@ -18,9 +18,10 @@ #include +#include "lance/duckdb/lance_reader.h" #include "lance/duckdb/list_functions.h" -#include "lance/duckdb/vector_functions.h" #include "lance/duckdb/ml/functions.h" +#include "lance/duckdb/vector_functions.h" namespace duckdb { @@ -46,11 +47,15 @@ void LanceExtension::Load(::duckdb::DuckDB &db) { catalog.CreateTableFunction(context, func.get()); } + auto scan_func = lance::duckdb::GetLanceReaderFunction(); + ::duckdb::CreateTableFunctionInfo scan(scan_func); + catalog.CreateTableFunction(context, &scan); + con.Commit(); } std::string LanceExtension::Name() { return {"lance"}; } -}; +}; // namespace duckdb extern "C" { diff --git a/integration/duckdb/src/lance/duckdb/lance_reader.cc b/integration/duckdb/src/lance/duckdb/lance_reader.cc index d5a107b240..1c8022d331 100644 --- a/integration/duckdb/src/lance/duckdb/lance_reader.cc +++ b/integration/duckdb/src/lance/duckdb/lance_reader.cc @@ -15,16 +15,42 @@ #include "lance/duckdb/lance_reader.h" +#include + +#include #include +#include +#include namespace lance::duckdb { -struct GlobalLanceReaderFunctionState : public ::duckdb::GlobalTableFunctionState {}; +struct GlobalScanState : public ::duckdb::GlobalTableFunctionState { + std::shared_ptr dataset; +}; -struct LocalLanceReaderFunctionState : public ::duckdb::LocalTableFunctionState { +struct LocalScanState : public ::duckdb::LocalTableFunctionState {}; +/// BindData for Lance Scan +struct ScanBindData : public ::duckdb::TableFunctionData { + std::shared_ptr dataset; }; +std::unique_ptr<::duckdb::FunctionData> LanceScanBind( + ::duckdb::ClientContext &context, + ::duckdb::TableFunctionBindInput &input, + std::vector<::duckdb::LogicalType> &return_types, + std::vector &names) { + auto dataset_uri = input.inputs[0].GetValue(); + printf("Dataset uri is: %s\n", dataset_uri.c_str()); +// auto result = + names.emplace_back("abc"); + return_types.emplace_back(::duckdb::LogicalType::INTEGER); + + auto bind_data = std::make_unique(); + bind_data->column_ids.emplace_back(0); + return std::move(bind_data); +} + void LanceScan(::duckdb::ClientContext &context, ::duckdb::TableFunctionInput &input, ::duckdb::DataChunk &output) {} @@ -32,7 +58,8 @@ void LanceScan(::duckdb::ClientContext &context, ::duckdb::TableFunctionSet GetLanceReaderFunction() { ::duckdb::TableFunctionSet func_set("lance_scan"); - ::duckdb::TableFunction table_function({::duckdb::LogicalType::VARCHAR}, LanceScan); + ::duckdb::TableFunction table_function( + {::duckdb::LogicalType::VARCHAR}, LanceScan, LanceScanBind); table_function.projection_pushdown = true; table_function.filter_pushdown = true; table_function.filter_prune = true; From bc7873dc44ee5f746066051112f188af184d3645 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 3 Dec 2022 13:22:37 -0800 Subject: [PATCH 03/19] parse schema --- integration/duckdb/CMakeLists.txt | 6 +- integration/duckdb/src/lance/duckdb/lance.cc | 90 +++++++++++++++++++ integration/duckdb/src/lance/duckdb/lance.h | 38 ++++++++ .../duckdb/src/lance/duckdb/lance_reader.cc | 17 +++- 4 files changed, 145 insertions(+), 6 deletions(-) create mode 100644 integration/duckdb/src/lance/duckdb/lance.cc create mode 100644 integration/duckdb/src/lance/duckdb/lance.h diff --git a/integration/duckdb/CMakeLists.txt b/integration/duckdb/CMakeLists.txt index 07c89016fc..a2e228d598 100644 --- a/integration/duckdb/CMakeLists.txt +++ b/integration/duckdb/CMakeLists.txt @@ -118,12 +118,14 @@ add_subdirectory(../../cpp lance) include_directories(src) set(LANCE_EXT_SOURCE_COMMON + src/lance/duckdb/lance_reader.cc + src/lance/duckdb/lance_reader.h src/lance/duckdb/lance-extension.cc + src/lance/duckdb/lance.cc + src/lance/duckdb/lance.h src/lance/duckdb/list_functions.cc src/lance/duckdb/list_functions.h src/lance/duckdb/vector_functions.cc - src/lance/duckdb/lance_reader.h - src/lance/duckdb/lance_reader.cc ) set(LANCE_EXT_SOURCE_ML diff --git a/integration/duckdb/src/lance/duckdb/lance.cc b/integration/duckdb/src/lance/duckdb/lance.cc new file mode 100644 index 0000000000..944495905d --- /dev/null +++ b/integration/duckdb/src/lance/duckdb/lance.cc @@ -0,0 +1,90 @@ +// Copyright 2022 Lance Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include "lance/duckdb/lance.h" + +#include + +#include +#include + +namespace lance::duckdb { + +namespace { + +inline ::duckdb::LogicalType ArrowDictionaryTypeToLogicalType(const ::arrow::DataType& dtype) { + auto& dict_type = dynamic_cast(dtype); + return ToLogicalType(*dict_type.value_type()); +} + +inline ::duckdb::LogicalType ArrowStructTypeToLogicalType(const ::arrow::DataType& dtype) { + auto& struct_type = dynamic_cast(dtype); + ::duckdb::child_list_t<::duckdb::LogicalType> children; + for (auto& child : struct_type.fields()) { + children.emplace_back(std::make_pair(child->name(), ToLogicalType(*child->type()))); + } + return ::duckdb::LogicalType::STRUCT(children); +} + +template +inline ::duckdb::LogicalType ArrowListTypeToLogicalType(const ::arrow::DataType& dtype) { + auto& list_type = dynamic_cast(dtype); + auto child_type = ToLogicalType(*list_type.value_type()); + return ::duckdb::LogicalType::LIST(child_type); +} + +} // namespace +::duckdb::LogicalType ToLogicalType(const ::arrow::DataType& arrow_type) { + switch (arrow_type.id()) { + case ::arrow::Type::BOOL: + return ::duckdb::LogicalType::BOOLEAN; + case ::arrow::Type::INT8: + return ::duckdb::LogicalType::TINYINT; + case ::arrow::Type::UINT8: + return ::duckdb::LogicalType::UTINYINT; + case ::arrow::Type::INT16: + return ::duckdb::LogicalType::SMALLINT; + case ::arrow::Type::UINT16: + return ::duckdb::LogicalType::USMALLINT; + case ::arrow::Type::INT32: + return ::duckdb::LogicalType::INTEGER; + case ::arrow::Type::UINT64: + return ::duckdb::LogicalType::UINTEGER; + case ::arrow::Type::FLOAT: + case ::arrow::Type::HALF_FLOAT: + return ::duckdb::LogicalType::FLOAT; + case ::arrow::Type::DOUBLE: + return ::duckdb::LogicalType::DOUBLE; + case ::arrow::Type::STRING: + case ::arrow::Type::LARGE_STRING: + return ::duckdb::LogicalType::VARCHAR; + case ::arrow::Type::BINARY: + case ::arrow::Type::LARGE_BINARY: + return ::duckdb::LogicalType::BLOB; + case ::arrow::Type::DICTIONARY: + return ArrowDictionaryTypeToLogicalType(arrow_type); + case ::arrow::Type::STRUCT: + return ArrowStructTypeToLogicalType(arrow_type); + case ::arrow::Type::LIST: + return ArrowListTypeToLogicalType<::arrow::ListType>(arrow_type); + case ::arrow::Type::FIXED_SIZE_LIST: + return ArrowListTypeToLogicalType<::arrow::FixedSizeListType>(arrow_type); + default: + throw ::duckdb::InvalidInputException("Does not support type: %s", + arrow_type.ToString().c_str()); + } +} + +} // namespace lance::duckdb \ No newline at end of file diff --git a/integration/duckdb/src/lance/duckdb/lance.h b/integration/duckdb/src/lance/duckdb/lance.h new file mode 100644 index 0000000000..8e1834a0e3 --- /dev/null +++ b/integration/duckdb/src/lance/duckdb/lance.h @@ -0,0 +1,38 @@ +// Copyright 2022 Lance Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#pragma once + +/// \brief Lance Core Adaptors and utilities + +#include +#include + +#include +#include + +namespace lance::duckdb { + +template +T GetResult(::arrow::Result&& result) { + if (result.ok()) { + return std::move(result.ValueOrDie()); + } + throw E(result.status().message()); +} + +::duckdb::LogicalType ToLogicalType(const ::arrow::DataType& arrow_type); + +} // namespace lance::duckdb diff --git a/integration/duckdb/src/lance/duckdb/lance_reader.cc b/integration/duckdb/src/lance/duckdb/lance_reader.cc index 1c8022d331..fdbfb7a3e5 100644 --- a/integration/duckdb/src/lance/duckdb/lance_reader.cc +++ b/integration/duckdb/src/lance/duckdb/lance_reader.cc @@ -15,6 +15,7 @@ #include "lance/duckdb/lance_reader.h" +#include #include #include @@ -22,6 +23,8 @@ #include #include +#include "lance/duckdb/lance.h" + namespace lance::duckdb { struct GlobalScanState : public ::duckdb::GlobalTableFunctionState { @@ -42,12 +45,18 @@ std::unique_ptr<::duckdb::FunctionData> LanceScanBind( std::vector &names) { auto dataset_uri = input.inputs[0].GetValue(); printf("Dataset uri is: %s\n", dataset_uri.c_str()); -// auto result = - names.emplace_back("abc"); - return_types.emplace_back(::duckdb::LogicalType::INTEGER); + std::string path; + auto fs = GetResult(::arrow::fs::FileSystemFromUriOrPath(dataset_uri, &path)); + auto dataset = GetResult(lance::arrow::LanceDataset::Make(std::move(fs), path)); + auto schema = dataset->schema(); auto bind_data = std::make_unique(); - bind_data->column_ids.emplace_back(0); + for (int i = 0; i < schema->fields().size(); ++i) { + const auto& field = schema->field(i); + names.emplace_back(field->name()); + return_types.emplace_back(ToLogicalType(*field->type())); + bind_data->column_ids.emplace_back(i); + } return std::move(bind_data); } From a7769048db5b2e05df4517851633e23f36bff4ad Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 3 Dec 2022 14:19:06 -0800 Subject: [PATCH 04/19] init batch reader --- integration/duckdb/CMakeLists.txt | 2 +- integration/duckdb/src/lance/duckdb/lance.h | 9 ++++ .../duckdb/src/lance/duckdb/lance_reader.cc | 47 +++++++++++++++++-- .../duckdb/src/lance/duckdb/lance_reader.h | 2 +- 4 files changed, 53 insertions(+), 7 deletions(-) diff --git a/integration/duckdb/CMakeLists.txt b/integration/duckdb/CMakeLists.txt index a2e228d598..8a7d3e48fb 100644 --- a/integration/duckdb/CMakeLists.txt +++ b/integration/duckdb/CMakeLists.txt @@ -144,7 +144,7 @@ endif() # add_library(lance_extension STATIC ${LANCE_EXT_SOURCES}) set(PARAMETERS "-warnings") build_loadable_extension(lance ${PARAMETERS} ${LANCE_EXT_SOURCES}) -target_link_libraries(lance_loadable_extension lance ArrowDataset::arrow_dataset_shared) +target_link_libraries(lance_loadable_extension lance ArrowDataset::arrow_dataset_shared fmt::fmt) if(LANCE_BUILD_PYTORCH) target_link_libraries(lance_loadable_extension "${TORCH_LIBRARIES}" diff --git a/integration/duckdb/src/lance/duckdb/lance.h b/integration/duckdb/src/lance/duckdb/lance.h index 8e1834a0e3..0fa4f7dede 100644 --- a/integration/duckdb/src/lance/duckdb/lance.h +++ b/integration/duckdb/src/lance/duckdb/lance.h @@ -18,6 +18,7 @@ /// \brief Lance Core Adaptors and utilities #include +#include #include #include @@ -33,6 +34,14 @@ T GetResult(::arrow::Result&& result) { throw E(result.status().message()); } +template +void CheckStatus(const ::arrow::Status& status) { + if (!status.ok()) { + throw E(status.message()); + } +} + +/// Convert Arrow and Lance types into DuckDB logical type ::duckdb::LogicalType ToLogicalType(const ::arrow::DataType& arrow_type); } // namespace lance::duckdb diff --git a/integration/duckdb/src/lance/duckdb/lance_reader.cc b/integration/duckdb/src/lance/duckdb/lance_reader.cc index fdbfb7a3e5..1624fcdf52 100644 --- a/integration/duckdb/src/lance/duckdb/lance_reader.cc +++ b/integration/duckdb/src/lance/duckdb/lance_reader.cc @@ -17,18 +17,24 @@ #include #include +#include #include #include #include #include +#include +#include #include "lance/duckdb/lance.h" namespace lance::duckdb { +namespace { + struct GlobalScanState : public ::duckdb::GlobalTableFunctionState { std::shared_ptr dataset; + ::arrow::dataset::TaggedRecordBatchGenerator batch_generator; }; struct LocalScanState : public ::duckdb::LocalTableFunctionState {}; @@ -44,15 +50,14 @@ std::unique_ptr<::duckdb::FunctionData> LanceScanBind( std::vector<::duckdb::LogicalType> &return_types, std::vector &names) { auto dataset_uri = input.inputs[0].GetValue(); - printf("Dataset uri is: %s\n", dataset_uri.c_str()); - std::string path; auto fs = GetResult(::arrow::fs::FileSystemFromUriOrPath(dataset_uri, &path)); auto dataset = GetResult(lance::arrow::LanceDataset::Make(std::move(fs), path)); auto schema = dataset->schema(); auto bind_data = std::make_unique(); + bind_data->dataset = std::move(dataset); for (int i = 0; i < schema->fields().size(); ++i) { - const auto& field = schema->field(i); + const auto &field = schema->field(i); names.emplace_back(field->name()); return_types.emplace_back(ToLogicalType(*field->type())); bind_data->column_ids.emplace_back(i); @@ -60,15 +65,47 @@ std::unique_ptr<::duckdb::FunctionData> LanceScanBind( return std::move(bind_data); } +std::unique_ptr<::duckdb::GlobalTableFunctionState> InitGlobal( + ::duckdb::ClientContext &context, ::duckdb::TableFunctionInitInput &input) { + auto bind_data = dynamic_cast(input.bind_data); + assert(bind_data != nullptr); + + auto state = std::make_unique(); + state->dataset = bind_data->dataset; + + auto schema = state->dataset->schema(); + std::vector columns; + for (auto& column_id : input.column_ids) { + columns.emplace_back(schema->field(column_id)->name()); + } + + auto builder = GetResult(state->dataset->NewScan()); + CheckStatus(builder->Project(columns)); + auto scanner = GetResult(builder->Finish()); + state->batch_generator = GetResult(scanner->ScanBatchesAsync()); + fmt::print("Columns ids: {}\n", input.column_ids); + return state; +} + void LanceScan(::duckdb::ClientContext &context, ::duckdb::TableFunctionInput &input, - ::duckdb::DataChunk &output) {} + ::duckdb::DataChunk &output) { + auto global_state = dynamic_cast(input.global_state); + auto fut = global_state->batch_generator(); + auto batch = GetResult(fut.MoveResult()); + if (batch.record_batch == nullptr) { + return; + } + fmt::print("Batch: {}\n", batch.record_batch->ToString()); +} + +} // namespace ::duckdb::TableFunctionSet GetLanceReaderFunction() { ::duckdb::TableFunctionSet func_set("lance_scan"); ::duckdb::TableFunction table_function( - {::duckdb::LogicalType::VARCHAR}, LanceScan, LanceScanBind); + {::duckdb::LogicalType::VARCHAR}, LanceScan, LanceScanBind, InitGlobal); table_function.projection_pushdown = true; table_function.filter_pushdown = true; table_function.filter_prune = true; diff --git a/integration/duckdb/src/lance/duckdb/lance_reader.h b/integration/duckdb/src/lance/duckdb/lance_reader.h index c80c1102f6..f33bb2ac14 100644 --- a/integration/duckdb/src/lance/duckdb/lance_reader.h +++ b/integration/duckdb/src/lance/duckdb/lance_reader.h @@ -21,7 +21,7 @@ namespace lance::duckdb { /// Get lance reader: /// -/// SELECT * from lance_scan("s3://path/to"); +/// SELECT * from lance_scan("s3://path/to/dataset"); ::duckdb::TableFunctionSet GetLanceReaderFunction(); } From 19faf07b9cbd6ebaa7f8a772e1e424d084f63dd1 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 3 Dec 2022 15:35:23 -0800 Subject: [PATCH 05/19] convert array to vector --- .../duckdb/src/lance/duckdb/lance_reader.cc | 79 +++++++++++++++++-- 1 file changed, 74 insertions(+), 5 deletions(-) diff --git a/integration/duckdb/src/lance/duckdb/lance_reader.cc b/integration/duckdb/src/lance/duckdb/lance_reader.cc index 1624fcdf52..2aa5b94213 100644 --- a/integration/duckdb/src/lance/duckdb/lance_reader.cc +++ b/integration/duckdb/src/lance/duckdb/lance_reader.cc @@ -15,16 +15,19 @@ #include "lance/duckdb/lance_reader.h" +#include +#include #include +#include +#include +#include +#include #include -#include #include #include #include #include -#include -#include #include "lance/duckdb/lance.h" @@ -75,7 +78,7 @@ std::unique_ptr<::duckdb::GlobalTableFunctionState> InitGlobal( auto schema = state->dataset->schema(); std::vector columns; - for (auto& column_id : input.column_ids) { + for (auto &column_id : input.column_ids) { columns.emplace_back(schema->field(column_id)->name()); } @@ -87,16 +90,82 @@ std::unique_ptr<::duckdb::GlobalTableFunctionState> InitGlobal( return state; } +template +void NumericArrayToVector(const std::shared_ptr<::arrow::Array> &arr, ::duckdb::Vector *out) { + fmt::print("Numberic to vector, out={} arr->type={} template={} arr={}\n", + fmt::ptr(out), + arr->type()->ToString(), + ArrowType().ToString(), + fmt::ptr(arr)); + auto array = std::dynamic_pointer_cast::ArrayType>(arr); + assert(array != nullptr); + // TODO: Use zero copy + // out->SetVectorType(::duckdb::VectorType::FLAT_VECTOR); + fmt::print("Add data: out={} length={}\n", fmt::ptr(out), fmt::ptr(array)); + for (int i = 0; i < array->length(); ++i) { + out->SetValue(i, ::duckdb::Value::CreateValue(array->Value(i))); + } +} + +/// Convert a `arrow::Array` to `duckdb::Vector`. +::duckdb::Vector ArrowArrayToVector(const std::shared_ptr<::arrow::Array> &arr) { + // TODO: optimize it for zero copy + auto logical_type = ToLogicalType(*arr->type()); + ::duckdb::Vector result(logical_type); + switch (arr->type_id()) { + case ::arrow::Type::UINT8: + NumericArrayToVector<::arrow::UInt8Type>(arr, &result); + break; + case ::arrow::Type::INT8: + NumericArrayToVector<::arrow::Int8Type>(arr, &result); + break; + case ::arrow::Type::UINT16: + NumericArrayToVector<::arrow::UInt16Type>(arr, &result); + break; + case ::arrow::Type::INT16: + NumericArrayToVector<::arrow::Int16Type>(arr, &result); + break; + case ::arrow::Type::UINT32: + NumericArrayToVector<::arrow::UInt32Type>(arr, &result); + break; + case ::arrow::Type::INT32: + NumericArrayToVector<::arrow::Int32Type>(arr, &result); + break; + case ::arrow::Type::UINT64: + NumericArrayToVector<::arrow::UInt64Type>(arr, &result); + break; + case ::arrow::Type::INT64: + NumericArrayToVector<::arrow::Int64Type>(arr, &result); + break; + case ::arrow::Type::FLOAT: + NumericArrayToVector<::arrow::FloatType>(arr, &result); + break; + case ::arrow::Type::DOUBLE: + NumericArrayToVector<::arrow::FloatType>(arr, &result); + break; + default: + throw ::duckdb::IOException("Unsupported type: " + arr->type()->ToString()); + } + return std::move(result); +} + void LanceScan(::duckdb::ClientContext &context, ::duckdb::TableFunctionInput &input, ::duckdb::DataChunk &output) { - auto global_state = dynamic_cast(input.global_state); + auto global_state = dynamic_cast(input.global_state); auto fut = global_state->batch_generator(); auto batch = GetResult(fut.MoveResult()); if (batch.record_batch == nullptr) { return; } fmt::print("Batch: {}\n", batch.record_batch->ToString()); + output.SetCapacity(batch.record_batch->num_rows()); + for (auto &col : batch.record_batch->columns()) { + // auto vec = ArrowArrayToVector(col); + fmt::print("Convert to array: {}\n", col->ToString()); + output.data.emplace_back(ArrowArrayToVector(col)); + fmt::print("After convert\n"); + } } } // namespace From bba8168c48c5114ae9ddca2b8a7cf2587ceb402e Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 3 Dec 2022 17:14:52 -0800 Subject: [PATCH 06/19] read primitive columns --- .../duckdb/src/lance/duckdb/lance_reader.cc | 51 ++++++++----------- 1 file changed, 20 insertions(+), 31 deletions(-) diff --git a/integration/duckdb/src/lance/duckdb/lance_reader.cc b/integration/duckdb/src/lance/duckdb/lance_reader.cc index 2aa5b94213..4ea1447be4 100644 --- a/integration/duckdb/src/lance/duckdb/lance_reader.cc +++ b/integration/duckdb/src/lance/duckdb/lance_reader.cc @@ -90,63 +90,55 @@ std::unique_ptr<::duckdb::GlobalTableFunctionState> InitGlobal( return state; } +/// Convert numeric array to duckdb vector. template void NumericArrayToVector(const std::shared_ptr<::arrow::Array> &arr, ::duckdb::Vector *out) { - fmt::print("Numberic to vector, out={} arr->type={} template={} arr={}\n", - fmt::ptr(out), - arr->type()->ToString(), - ArrowType().ToString(), - fmt::ptr(arr)); - auto array = std::dynamic_pointer_cast::ArrayType>(arr); + // TODO: dynamic_pointer_cast does not work here, IDK why. + auto array = std::static_pointer_cast::ArrayType>(arr); assert(array != nullptr); - // TODO: Use zero copy - // out->SetVectorType(::duckdb::VectorType::FLAT_VECTOR); - fmt::print("Add data: out={} length={}\n", fmt::ptr(out), fmt::ptr(array)); + // TODO: How to use zero copy to move data from arrow to duckdb. for (int i = 0; i < array->length(); ++i) { out->SetValue(i, ::duckdb::Value::CreateValue(array->Value(i))); } + out->SetVectorType(::duckdb::VectorType::FLAT_VECTOR); } /// Convert a `arrow::Array` to `duckdb::Vector`. -::duckdb::Vector ArrowArrayToVector(const std::shared_ptr<::arrow::Array> &arr) { - // TODO: optimize it for zero copy - auto logical_type = ToLogicalType(*arr->type()); - ::duckdb::Vector result(logical_type); +void ArrowArrayToVector(const std::shared_ptr<::arrow::Array> &arr, ::duckdb::Vector *out) { switch (arr->type_id()) { case ::arrow::Type::UINT8: - NumericArrayToVector<::arrow::UInt8Type>(arr, &result); + NumericArrayToVector<::arrow::UInt8Type>(arr, out); break; case ::arrow::Type::INT8: - NumericArrayToVector<::arrow::Int8Type>(arr, &result); + NumericArrayToVector<::arrow::Int8Type>(arr, out); break; case ::arrow::Type::UINT16: - NumericArrayToVector<::arrow::UInt16Type>(arr, &result); + NumericArrayToVector<::arrow::UInt16Type>(arr, out); break; case ::arrow::Type::INT16: - NumericArrayToVector<::arrow::Int16Type>(arr, &result); + NumericArrayToVector<::arrow::Int16Type>(arr, out); break; case ::arrow::Type::UINT32: - NumericArrayToVector<::arrow::UInt32Type>(arr, &result); + NumericArrayToVector<::arrow::UInt32Type>(arr, out); break; case ::arrow::Type::INT32: - NumericArrayToVector<::arrow::Int32Type>(arr, &result); + NumericArrayToVector<::arrow::Int32Type>(arr, out); break; case ::arrow::Type::UINT64: - NumericArrayToVector<::arrow::UInt64Type>(arr, &result); + NumericArrayToVector<::arrow::UInt64Type>(arr, out); break; case ::arrow::Type::INT64: - NumericArrayToVector<::arrow::Int64Type>(arr, &result); + NumericArrayToVector<::arrow::Int64Type>(arr, out); break; case ::arrow::Type::FLOAT: - NumericArrayToVector<::arrow::FloatType>(arr, &result); + NumericArrayToVector<::arrow::FloatType>(arr, out); break; case ::arrow::Type::DOUBLE: - NumericArrayToVector<::arrow::FloatType>(arr, &result); + NumericArrayToVector<::arrow::FloatType>(arr, out); break; default: throw ::duckdb::IOException("Unsupported type: " + arr->type()->ToString()); } - return std::move(result); } void LanceScan(::duckdb::ClientContext &context, @@ -158,13 +150,10 @@ void LanceScan(::duckdb::ClientContext &context, if (batch.record_batch == nullptr) { return; } - fmt::print("Batch: {}\n", batch.record_batch->ToString()); - output.SetCapacity(batch.record_batch->num_rows()); - for (auto &col : batch.record_batch->columns()) { - // auto vec = ArrowArrayToVector(col); - fmt::print("Convert to array: {}\n", col->ToString()); - output.data.emplace_back(ArrowArrayToVector(col)); - fmt::print("After convert\n"); + output.SetCardinality(batch.record_batch->num_rows()); + for (int i = 0; i < output.data.size(); ++i) { + auto col = batch.record_batch->column(i); + ArrowArrayToVector(col, &output.data[i]); } } From b796ce609bed50065e5f244eeae12a7073e1cb3a Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 3 Dec 2022 17:26:36 -0800 Subject: [PATCH 07/19] read string and primtive arr --- .../duckdb/src/lance/duckdb/lance_reader.cc | 38 +++++++++++++------ 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/integration/duckdb/src/lance/duckdb/lance_reader.cc b/integration/duckdb/src/lance/duckdb/lance_reader.cc index 4ea1447be4..416333f72f 100644 --- a/integration/duckdb/src/lance/duckdb/lance_reader.cc +++ b/integration/duckdb/src/lance/duckdb/lance_reader.cc @@ -92,7 +92,7 @@ std::unique_ptr<::duckdb::GlobalTableFunctionState> InitGlobal( /// Convert numeric array to duckdb vector. template -void NumericArrayToVector(const std::shared_ptr<::arrow::Array> &arr, ::duckdb::Vector *out) { +void ToVector(const std::shared_ptr<::arrow::Array> &arr, ::duckdb::Vector *out) { // TODO: dynamic_pointer_cast does not work here, IDK why. auto array = std::static_pointer_cast::ArrayType>(arr); assert(array != nullptr); @@ -103,38 +103,54 @@ void NumericArrayToVector(const std::shared_ptr<::arrow::Array> &arr, ::duckdb:: out->SetVectorType(::duckdb::VectorType::FLAT_VECTOR); } +template <> +void ToVector<::arrow::StringType>(const std::shared_ptr<::arrow::Array> &arr, + ::duckdb::Vector *out) { + auto array = + std::static_pointer_cast::ArrayType>(arr); + assert(array != nullptr); + // TODO: How to use zero copy to move data from arrow to duckdb. + for (int i = 0; i < array->length(); ++i) { + out->SetValue(i, std::string(array->Value(i))); + } + out->SetVectorType(::duckdb::VectorType::FLAT_VECTOR); +} + /// Convert a `arrow::Array` to `duckdb::Vector`. void ArrowArrayToVector(const std::shared_ptr<::arrow::Array> &arr, ::duckdb::Vector *out) { switch (arr->type_id()) { case ::arrow::Type::UINT8: - NumericArrayToVector<::arrow::UInt8Type>(arr, out); + ToVector<::arrow::UInt8Type>(arr, out); break; case ::arrow::Type::INT8: - NumericArrayToVector<::arrow::Int8Type>(arr, out); + ToVector<::arrow::Int8Type>(arr, out); break; case ::arrow::Type::UINT16: - NumericArrayToVector<::arrow::UInt16Type>(arr, out); + ToVector<::arrow::UInt16Type>(arr, out); break; case ::arrow::Type::INT16: - NumericArrayToVector<::arrow::Int16Type>(arr, out); + ToVector<::arrow::Int16Type>(arr, out); break; case ::arrow::Type::UINT32: - NumericArrayToVector<::arrow::UInt32Type>(arr, out); + ToVector<::arrow::UInt32Type>(arr, out); break; case ::arrow::Type::INT32: - NumericArrayToVector<::arrow::Int32Type>(arr, out); + ToVector<::arrow::Int32Type>(arr, out); break; case ::arrow::Type::UINT64: - NumericArrayToVector<::arrow::UInt64Type>(arr, out); + ToVector<::arrow::UInt64Type>(arr, out); break; case ::arrow::Type::INT64: - NumericArrayToVector<::arrow::Int64Type>(arr, out); + ToVector<::arrow::Int64Type>(arr, out); break; case ::arrow::Type::FLOAT: - NumericArrayToVector<::arrow::FloatType>(arr, out); + ToVector<::arrow::FloatType>(arr, out); break; case ::arrow::Type::DOUBLE: - NumericArrayToVector<::arrow::FloatType>(arr, out); + ToVector<::arrow::FloatType>(arr, out); + break; + case ::arrow::Type::STRING: + ToVector<::arrow::StringType>(arr, out); break; default: throw ::duckdb::IOException("Unsupported type: " + arr->type()->ToString()); From 8e0ec62d8fd8dd66226af88e0442cf8cfd8c8e71 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 3 Dec 2022 19:44:46 -0800 Subject: [PATCH 08/19] dictionary to array --- integration/duckdb/CMakeLists.txt | 2 +- .../duckdb/src/lance/duckdb/lance_reader.cc | 26 ++++++++++++++++--- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/integration/duckdb/CMakeLists.txt b/integration/duckdb/CMakeLists.txt index 8a7d3e48fb..4769e74bfe 100644 --- a/integration/duckdb/CMakeLists.txt +++ b/integration/duckdb/CMakeLists.txt @@ -4,7 +4,7 @@ if(POLICY CMP0135) cmake_policy(SET CMP0135 NEW) endif() -add_compile_options(-mf16c) # opencv +#add_compile_options(-mf16c) # opencv project(lance_duckdb CXX) option(LANCE_BUILD_PYTORCH "Build with PyTorch" TRUE) diff --git a/integration/duckdb/src/lance/duckdb/lance_reader.cc b/integration/duckdb/src/lance/duckdb/lance_reader.cc index 416333f72f..598778dce1 100644 --- a/integration/duckdb/src/lance/duckdb/lance_reader.cc +++ b/integration/duckdb/src/lance/duckdb/lance_reader.cc @@ -103,11 +103,11 @@ void ToVector(const std::shared_ptr<::arrow::Array> &arr, ::duckdb::Vector *out) out->SetVectorType(::duckdb::VectorType::FLAT_VECTOR); } +/// Convert a String array into duckdb vector. template <> void ToVector<::arrow::StringType>(const std::shared_ptr<::arrow::Array> &arr, ::duckdb::Vector *out) { - auto array = - std::static_pointer_cast::ArrayType>(arr); + auto array = std::static_pointer_cast<::arrow::StringArray>(arr); assert(array != nullptr); // TODO: How to use zero copy to move data from arrow to duckdb. for (int i = 0; i < array->length(); ++i) { @@ -116,6 +116,23 @@ void ToVector<::arrow::StringType>(const std::shared_ptr<::arrow::Array> &arr, out->SetVectorType(::duckdb::VectorType::FLAT_VECTOR); } +template <> +void ToVector<::arrow::DictionaryType>(const std::shared_ptr<::arrow::Array> &arr, + ::duckdb::Vector *out) { + auto array = std::static_pointer_cast<::arrow::DictionaryArray>(arr); + // TODO: zero copy + out->SetVectorType(::duckdb::VectorType::FLAT_VECTOR); + auto dict_arr = std::dynamic_pointer_cast<::arrow::StringArray>(array->dictionary()); + out->Print(); + fmt::print("Logical type: {}\n", out->GetType().ToString()); + auto indices_arr = std::dynamic_pointer_cast<::arrow::Int16Array>(array->indices()); + fmt::print("Index arr: {}\n", fmt::ptr(indices_arr)); + for (int i = 0; i < indices_arr->length(); ++i) { + auto idx = indices_arr->Value(i); + out->SetValue(i, std::string(dict_arr->Value(idx))); + } +} + /// Convert a `arrow::Array` to `duckdb::Vector`. void ArrowArrayToVector(const std::shared_ptr<::arrow::Array> &arr, ::duckdb::Vector *out) { switch (arr->type_id()) { @@ -152,8 +169,11 @@ void ArrowArrayToVector(const std::shared_ptr<::arrow::Array> &arr, ::duckdb::Ve case ::arrow::Type::STRING: ToVector<::arrow::StringType>(arr, out); break; + case ::arrow::Type::DICTIONARY: + ToVector<::arrow::DictionaryType>(arr, out); + break; default: - throw ::duckdb::IOException("Unsupported type: " + arr->type()->ToString()); + throw ::duckdb::IOException("Unsupported Arrow Type: " + arr->type()->ToString()); } } From b1abcba26a1a020521ccb804f6f0eac6c7f488b6 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 3 Dec 2022 21:04:50 -0800 Subject: [PATCH 09/19] struct --- integration/duckdb/src/lance/duckdb/arrow.h | 21 ++++++++++++++ .../duckdb/src/lance/duckdb/lance_reader.cc | 28 ++++++++++++++++++- 2 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 integration/duckdb/src/lance/duckdb/arrow.h diff --git a/integration/duckdb/src/lance/duckdb/arrow.h b/integration/duckdb/src/lance/duckdb/arrow.h new file mode 100644 index 0000000000..10aa6e8c40 --- /dev/null +++ b/integration/duckdb/src/lance/duckdb/arrow.h @@ -0,0 +1,21 @@ +// Copyright 2022 Lance Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#pragma once + +namespace lance::duckdb { + + +} diff --git a/integration/duckdb/src/lance/duckdb/lance_reader.cc b/integration/duckdb/src/lance/duckdb/lance_reader.cc index 598778dce1..fd5c597ca3 100644 --- a/integration/duckdb/src/lance/duckdb/lance_reader.cc +++ b/integration/duckdb/src/lance/duckdb/lance_reader.cc @@ -35,6 +35,9 @@ namespace lance::duckdb { namespace { +// Forward declaration +void ArrowArrayToVector(const std::shared_ptr<::arrow::Array> &arr, ::duckdb::Vector *out); + struct GlobalScanState : public ::duckdb::GlobalTableFunctionState { std::shared_ptr dataset; ::arrow::dataset::TaggedRecordBatchGenerator batch_generator; @@ -120,12 +123,13 @@ template <> void ToVector<::arrow::DictionaryType>(const std::shared_ptr<::arrow::Array> &arr, ::duckdb::Vector *out) { auto array = std::static_pointer_cast<::arrow::DictionaryArray>(arr); + fmt::print("arr type->{}\n", arr->type()->ToString()); // TODO: zero copy out->SetVectorType(::duckdb::VectorType::FLAT_VECTOR); auto dict_arr = std::dynamic_pointer_cast<::arrow::StringArray>(array->dictionary()); out->Print(); fmt::print("Logical type: {}\n", out->GetType().ToString()); - auto indices_arr = std::dynamic_pointer_cast<::arrow::Int16Array>(array->indices()); + auto indices_arr = std::static_pointer_cast<::arrow::Int8Array>(array->indices()); fmt::print("Index arr: {}\n", fmt::ptr(indices_arr)); for (int i = 0; i < indices_arr->length(); ++i) { auto idx = indices_arr->Value(i); @@ -133,6 +137,25 @@ void ToVector<::arrow::DictionaryType>(const std::shared_ptr<::arrow::Array> &ar } } +template <> +void ToVector<::arrow::StructType>(const std::shared_ptr<::arrow::Array> &arr, + ::duckdb::Vector *out) { + assert(arr->type_id() == ::arrow::Type::STRUCT); + auto struct_arr = std::static_pointer_cast<::arrow::StructArray>(arr); + auto &vector_children = ::duckdb::StructVector::GetEntries(*out); + + // Sanity checks + if (struct_arr->num_fields() != vector_children.size()) { + throw ::duckdb::InvalidInputException("Struct fields are not expected: %lu != %lu", + struct_arr->num_fields(), + vector_children.size()); + } + + for (int i = 0; i < struct_arr->num_fields(); i++) { + ArrowArrayToVector(struct_arr->field(i), vector_children[i].get()); + } +} + /// Convert a `arrow::Array` to `duckdb::Vector`. void ArrowArrayToVector(const std::shared_ptr<::arrow::Array> &arr, ::duckdb::Vector *out) { switch (arr->type_id()) { @@ -172,6 +195,9 @@ void ArrowArrayToVector(const std::shared_ptr<::arrow::Array> &arr, ::duckdb::Ve case ::arrow::Type::DICTIONARY: ToVector<::arrow::DictionaryType>(arr, out); break; + case ::arrow::Type::STRUCT: + ToVector<::arrow::StructType>(arr, out); + break; default: throw ::duckdb::IOException("Unsupported Arrow Type: " + arr->type()->ToString()); } From 9be4e7386b87d4a3f04cd0ee17509b21959244e9 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 3 Dec 2022 22:43:13 -0800 Subject: [PATCH 10/19] read binary and fixed size list --- .clang-format | 1 + .../duckdb/src/lance/duckdb/lance_reader.cc | 60 +++++++++++++++++-- 2 files changed, 56 insertions(+), 5 deletions(-) diff --git a/.clang-format b/.clang-format index 9f379d799b..8dab6cde60 100644 --- a/.clang-format +++ b/.clang-format @@ -2,6 +2,7 @@ BasedOnStyle: Google ColumnLimit: 100 BinPackArguments: false BinPackParameters: false +ReferenceAlignment: Left --- Language: Proto BasedOnStyle: Google \ No newline at end of file diff --git a/integration/duckdb/src/lance/duckdb/lance_reader.cc b/integration/duckdb/src/lance/duckdb/lance_reader.cc index fd5c597ca3..95fe49817d 100644 --- a/integration/duckdb/src/lance/duckdb/lance_reader.cc +++ b/integration/duckdb/src/lance/duckdb/lance_reader.cc @@ -89,7 +89,6 @@ std::unique_ptr<::duckdb::GlobalTableFunctionState> InitGlobal( CheckStatus(builder->Project(columns)); auto scanner = GetResult(builder->Finish()); state->batch_generator = GetResult(scanner->ScanBatchesAsync()); - fmt::print("Columns ids: {}\n", input.column_ids); return state; } @@ -119,24 +118,35 @@ void ToVector<::arrow::StringType>(const std::shared_ptr<::arrow::Array> &arr, out->SetVectorType(::duckdb::VectorType::FLAT_VECTOR); } +/// Convert a String array into duckdb vector. +template <> +void ToVector<::arrow::BinaryType>(const std::shared_ptr<::arrow::Array> &arr, + ::duckdb::Vector *out) { + auto array = std::static_pointer_cast<::arrow::BinaryArray>(arr); + assert(array != nullptr); + // TODO: How to use zero copy to move data from arrow to duckdb. + for (int i = 0; i < array->length(); ++i) { + auto val = array->Value(i); + out->SetValue(i, ::duckdb::Value::BLOB((::duckdb::data_ptr_t)val.data(), val.size())); + } + out->SetVectorType(::duckdb::VectorType::FLAT_VECTOR); +} + template <> void ToVector<::arrow::DictionaryType>(const std::shared_ptr<::arrow::Array> &arr, ::duckdb::Vector *out) { auto array = std::static_pointer_cast<::arrow::DictionaryArray>(arr); - fmt::print("arr type->{}\n", arr->type()->ToString()); // TODO: zero copy out->SetVectorType(::duckdb::VectorType::FLAT_VECTOR); auto dict_arr = std::dynamic_pointer_cast<::arrow::StringArray>(array->dictionary()); - out->Print(); - fmt::print("Logical type: {}\n", out->GetType().ToString()); auto indices_arr = std::static_pointer_cast<::arrow::Int8Array>(array->indices()); - fmt::print("Index arr: {}\n", fmt::ptr(indices_arr)); for (int i = 0; i < indices_arr->length(); ++i) { auto idx = indices_arr->Value(i); out->SetValue(i, std::string(dict_arr->Value(idx))); } } +/// Convert `arrow::Array` to duckdb Struct Vector. template <> void ToVector<::arrow::StructType>(const std::shared_ptr<::arrow::Array> &arr, ::duckdb::Vector *out) { @@ -156,9 +166,40 @@ void ToVector<::arrow::StructType>(const std::shared_ptr<::arrow::Array> &arr, } } +template <> +void ToVector<::arrow::ListType>(const std::shared_ptr<::arrow::Array> &arr, + ::duckdb::Vector *out) { + /// TODO: zero copy vector construction. + assert(arr->type_id() == ::arrow::Type::LIST); + auto list_arr = std::static_pointer_cast<::arrow::ListArray>(arr); + for (int i = 0; i < list_arr->length(); ++i) { + auto scalar = GetResult(list_arr->GetScalar(i)); + auto list_scalar = std::static_pointer_cast<::arrow::ListScalar>(scalar); + ::duckdb::Vector elem_vector(ToLogicalType(*list_scalar->value->type())); + ArrowArrayToVector(list_scalar->value, &elem_vector); + } +} + +template <> +void ToVector<::arrow::FixedSizeListType>(const std::shared_ptr<::arrow::Array> &arr, + ::duckdb::Vector *out) { + /// TODO: zero copy vector construction. + assert(arr->type_id() == ::arrow::Type::FIXED_SIZE_LIST); + auto list_arr = std::static_pointer_cast<::arrow::FixedSizeListArray>(arr); + for (int i = 0; i < list_arr->length(); ++i) { + auto scalar = GetResult(list_arr->GetScalar(i)); + auto list_scalar = std::static_pointer_cast<::arrow::FixedSizeListScalar>(scalar); + ::duckdb::Vector elem_vector(ToLogicalType(*list_scalar->value->type())); + ArrowArrayToVector(list_scalar->value, &elem_vector); + } +} + /// Convert a `arrow::Array` to `duckdb::Vector`. void ArrowArrayToVector(const std::shared_ptr<::arrow::Array> &arr, ::duckdb::Vector *out) { switch (arr->type_id()) { + case ::arrow::Type::BOOL: + ToVector<::arrow::BooleanType>(arr, out); + break; case ::arrow::Type::UINT8: ToVector<::arrow::UInt8Type>(arr, out); break; @@ -192,12 +233,21 @@ void ArrowArrayToVector(const std::shared_ptr<::arrow::Array> &arr, ::duckdb::Ve case ::arrow::Type::STRING: ToVector<::arrow::StringType>(arr, out); break; + case ::arrow::Type::BINARY: + ToVector<::arrow::BinaryType>(arr, out); + break; case ::arrow::Type::DICTIONARY: ToVector<::arrow::DictionaryType>(arr, out); break; case ::arrow::Type::STRUCT: ToVector<::arrow::StructType>(arr, out); break; + case ::arrow::Type::LIST: + ToVector<::arrow::ListType>(arr, out); + break; + case ::arrow::Type::FIXED_SIZE_LIST: + ToVector<::arrow::FixedSizeListType>(arr, out); + break; default: throw ::duckdb::IOException("Unsupported Arrow Type: " + arr->type()->ToString()); } From bc0e24e95b3dc95ed8a1e5aee64289cdb8915c9f Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 3 Dec 2022 22:51:04 -0800 Subject: [PATCH 11/19] use duckdb 20 --- integration/duckdb/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/duckdb/CMakeLists.txt b/integration/duckdb/CMakeLists.txt index 4769e74bfe..064065ab8f 100644 --- a/integration/duckdb/CMakeLists.txt +++ b/integration/duckdb/CMakeLists.txt @@ -88,7 +88,7 @@ endif() FetchContent_MakeAvailable(${available_contents}) -set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD 20) set(CMAKE_CXX_STANDARD_REQUIRED True) include_directories(${duckdb_SOURCE_DIR}/src/include) From 6751dada9dbca27ddc729f1c9a14598b10bc0f76 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 3 Dec 2022 23:06:52 -0800 Subject: [PATCH 12/19] use cpp/src --- integration/duckdb/CMakeLists.txt | 2 +- integration/duckdb/src/lance/duckdb/lance_reader.cc | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/integration/duckdb/CMakeLists.txt b/integration/duckdb/CMakeLists.txt index 064065ab8f..b27cb2cdb6 100644 --- a/integration/duckdb/CMakeLists.txt +++ b/integration/duckdb/CMakeLists.txt @@ -112,7 +112,7 @@ endif() # Add lance core as dependency find_package(Arrow REQUIRED) find_package(ArrowDataset REQUIRED) -include_directories(${CMAKE_BINARY_DIR}/lance/src ../../cpp/include) +include_directories(${CMAKE_BINARY_DIR}/lance/src ../../cpp/include ../../cpp/src) add_subdirectory(../../cpp lance) include_directories(src) diff --git a/integration/duckdb/src/lance/duckdb/lance_reader.cc b/integration/duckdb/src/lance/duckdb/lance_reader.cc index 95fe49817d..25e9ad1faf 100644 --- a/integration/duckdb/src/lance/duckdb/lance_reader.cc +++ b/integration/duckdb/src/lance/duckdb/lance_reader.cc @@ -29,6 +29,7 @@ #include #include +#include "lance/arrow/type.h" #include "lance/duckdb/lance.h" namespace lance::duckdb { @@ -93,10 +94,10 @@ std::unique_ptr<::duckdb::GlobalTableFunctionState> InitGlobal( } /// Convert numeric array to duckdb vector. -template +template void ToVector(const std::shared_ptr<::arrow::Array> &arr, ::duckdb::Vector *out) { // TODO: dynamic_pointer_cast does not work here, IDK why. - auto array = std::static_pointer_cast::ArrayType>(arr); + auto array = std::static_pointer_cast::ArrayType>(arr); assert(array != nullptr); // TODO: How to use zero copy to move data from arrow to duckdb. for (int i = 0; i < array->length(); ++i) { From 323130e6fa831949ca837aab59ab63fc4878467e Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 3 Dec 2022 23:12:20 -0800 Subject: [PATCH 13/19] install arrow in duckdb CI --- .github/workflows/duckdb.yml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/.github/workflows/duckdb.yml b/.github/workflows/duckdb.yml index ed3a646641..91a616b3c1 100644 --- a/.github/workflows/duckdb.yml +++ b/.github/workflows/duckdb.yml @@ -18,6 +18,14 @@ jobs: - uses: actions/checkout@v2 - name: ccache uses: hendrikmuhs/ccache-action@v1 + - name: Install dependencies + run: | + sudo apt update + sudo apt install -y -V ca-certificates lsb-release wget + wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb + sudo apt install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb + sudo apt update + sudo apt install -y -V libarrow-dev=${ArrowVersion} libarrow-dataset-dev=${ArrowVersion} libparquet-dev=${ArrowVersion} - name: Cmake run: cmake -B build - name: Build @@ -30,6 +38,12 @@ jobs: working-directory: ./integration/duckdb steps: - uses: actions/checkout@v2 + - name: Install dependencies + run: | + brew update + cd $(brew --repository) + git checkout 3.6.8 # Arrow 10.0 + brew install apache-arrow - name: Cmake run: cmake -B build - name: Build From 4cb234b0db577c1fdfb737864a802d3971c8c87e Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 3 Dec 2022 23:29:33 -0800 Subject: [PATCH 14/19] has replacement --- .github/workflows/duckdb.yml | 2 ++ .../src/lance/duckdb/lance-extension.cc | 3 +++ .../duckdb/src/lance/duckdb/lance_reader.cc | 19 +++++++++++++++++++ .../duckdb/src/lance/duckdb/lance_reader.h | 10 +++++++++- 4 files changed, 33 insertions(+), 1 deletion(-) diff --git a/.github/workflows/duckdb.yml b/.github/workflows/duckdb.yml index 91a616b3c1..8b15ec07ce 100644 --- a/.github/workflows/duckdb.yml +++ b/.github/workflows/duckdb.yml @@ -14,6 +14,8 @@ jobs: defaults: run: working-directory: ./integration/duckdb + env: + ArrowVersion: 10.0.1-1 steps: - uses: actions/checkout@v2 - name: ccache diff --git a/integration/duckdb/src/lance/duckdb/lance-extension.cc b/integration/duckdb/src/lance/duckdb/lance-extension.cc index 2def507532..57c37c63ed 100644 --- a/integration/duckdb/src/lance/duckdb/lance-extension.cc +++ b/integration/duckdb/src/lance/duckdb/lance-extension.cc @@ -30,6 +30,7 @@ void LanceExtension::Load(::duckdb::DuckDB &db) { con.BeginTransaction(); auto &context = *con.context; auto &catalog = ::duckdb::Catalog::GetCatalog(context); + auto &config = DBConfig::GetConfig(*db.instance); for (auto &func : lance::duckdb::GetListFunctions()) { catalog.CreateFunction(context, func.get()); @@ -51,6 +52,8 @@ void LanceExtension::Load(::duckdb::DuckDB &db) { ::duckdb::CreateTableFunctionInfo scan(scan_func); catalog.CreateTableFunction(context, &scan); + config.replacement_scans.emplace_back(lance::duckdb::LanceScanReplacement); + con.Commit(); } diff --git a/integration/duckdb/src/lance/duckdb/lance_reader.cc b/integration/duckdb/src/lance/duckdb/lance_reader.cc index 25e9ad1faf..d9e90b9cb4 100644 --- a/integration/duckdb/src/lance/duckdb/lance_reader.cc +++ b/integration/duckdb/src/lance/duckdb/lance_reader.cc @@ -25,6 +25,8 @@ #include #include +#include +#include #include #include #include @@ -285,4 +287,21 @@ ::duckdb::TableFunctionSet GetLanceReaderFunction() { return func_set; } +std::unique_ptr<::duckdb::TableFunctionRef> LanceScanReplacement( + ::duckdb::ClientContext &context, + const ::std::string &table_name, + ::duckdb::ReplacementScanData *data) { + auto lower_name = ::duckdb::StringUtil::Lower(table_name); + if (!::duckdb::StringUtil::EndsWith(lower_name, ".lance")) { + return nullptr; + } + auto table_function = ::duckdb::make_unique<::duckdb::TableFunctionRef>(); + ::std::vector<::std::unique_ptr<::duckdb::ParsedExpression>> children; + children.emplace_back( + ::std::make_unique<::duckdb::ConstantExpression>(::duckdb::Value(table_name))); + table_function->function = + ::std::make_unique<::duckdb::FunctionExpression>("lance_scan", ::std::move(children)); + return table_function; +} + } // namespace lance::duckdb \ No newline at end of file diff --git a/integration/duckdb/src/lance/duckdb/lance_reader.h b/integration/duckdb/src/lance/duckdb/lance_reader.h index f33bb2ac14..f3a6f4d00b 100644 --- a/integration/duckdb/src/lance/duckdb/lance_reader.h +++ b/integration/duckdb/src/lance/duckdb/lance_reader.h @@ -16,6 +16,9 @@ #pragma once #include +#include +#include +#include namespace lance::duckdb { @@ -24,4 +27,9 @@ namespace lance::duckdb { /// SELECT * from lance_scan("s3://path/to/dataset"); ::duckdb::TableFunctionSet GetLanceReaderFunction(); -} +std::unique_ptr<::duckdb::TableFunctionRef> LanceScanReplacement( + ::duckdb::ClientContext &context, + const ::std::string &table_name, + ::duckdb::ReplacementScanData *data); + +} // namespace lance::duckdb From 1e0d39399e178681ea7b32dfa56b08f6488a0040 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 3 Dec 2022 23:31:56 -0800 Subject: [PATCH 15/19] remove unused file --- integration/duckdb/src/lance/duckdb/arrow.h | 21 --------------------- 1 file changed, 21 deletions(-) delete mode 100644 integration/duckdb/src/lance/duckdb/arrow.h diff --git a/integration/duckdb/src/lance/duckdb/arrow.h b/integration/duckdb/src/lance/duckdb/arrow.h deleted file mode 100644 index 10aa6e8c40..0000000000 --- a/integration/duckdb/src/lance/duckdb/arrow.h +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright 2022 Lance Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -#pragma once - -namespace lance::duckdb { - - -} From 5af9bf0774f8eff0c01b1da6fe046b918f4ae863 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 3 Dec 2022 23:36:40 -0800 Subject: [PATCH 16/19] format --- integration/duckdb/src/lance/duckdb/lance.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/integration/duckdb/src/lance/duckdb/lance.cc b/integration/duckdb/src/lance/duckdb/lance.cc index 944495905d..d39b8bd7ad 100644 --- a/integration/duckdb/src/lance/duckdb/lance.cc +++ b/integration/duckdb/src/lance/duckdb/lance.cc @@ -46,6 +46,7 @@ inline ::duckdb::LogicalType ArrowListTypeToLogicalType(const ::arrow::DataType& } } // namespace + ::duckdb::LogicalType ToLogicalType(const ::arrow::DataType& arrow_type) { switch (arrow_type.id()) { case ::arrow::Type::BOOL: From fcbd479a8206c29bae3e08b3a6fa5d98fd25310b Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 3 Dec 2022 23:40:02 -0800 Subject: [PATCH 17/19] add time/date logical type --- integration/duckdb/src/lance/duckdb/lance.cc | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/integration/duckdb/src/lance/duckdb/lance.cc b/integration/duckdb/src/lance/duckdb/lance.cc index d39b8bd7ad..fd1132a33e 100644 --- a/integration/duckdb/src/lance/duckdb/lance.cc +++ b/integration/duckdb/src/lance/duckdb/lance.cc @@ -74,6 +74,14 @@ ::duckdb::LogicalType ToLogicalType(const ::arrow::DataType& arrow_type) { case ::arrow::Type::BINARY: case ::arrow::Type::LARGE_BINARY: return ::duckdb::LogicalType::BLOB; + case ::arrow::Type::TIME32: + case ::arrow::Type::TIME64: + return ::duckdb::LogicalType::TIME; + case ::arrow::Type::TIMESTAMP: + return ::duckdb::LogicalType::TIMESTAMP; + case ::arrow::Type::DATE32: + case ::arrow::Type::DATE64: + return ::duckdb::LogicalType::DATE; case ::arrow::Type::DICTIONARY: return ArrowDictionaryTypeToLogicalType(arrow_type); case ::arrow::Type::STRUCT: From d8703d6bda0a3a8834c37e16ba5be98f95c2ca99 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sun, 4 Dec 2022 14:52:13 -0800 Subject: [PATCH 18/19] use override --- integration/duckdb/src/lance/duckdb/lance.cc | 23 ++++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/integration/duckdb/src/lance/duckdb/lance.cc b/integration/duckdb/src/lance/duckdb/lance.cc index fd1132a33e..371f710eef 100644 --- a/integration/duckdb/src/lance/duckdb/lance.cc +++ b/integration/duckdb/src/lance/duckdb/lance.cc @@ -24,24 +24,23 @@ namespace lance::duckdb { namespace { -inline ::duckdb::LogicalType ArrowDictionaryTypeToLogicalType(const ::arrow::DataType& dtype) { - auto& dict_type = dynamic_cast(dtype); - return ToLogicalType(*dict_type.value_type()); +inline ::duckdb::LogicalType ToLogicalType(const ::arrow::DictionaryType& dtype) { + return lance::duckdb::ToLogicalType(*dtype.value_type()); } -inline ::duckdb::LogicalType ArrowStructTypeToLogicalType(const ::arrow::DataType& dtype) { - auto& struct_type = dynamic_cast(dtype); +inline ::duckdb::LogicalType ToLogicalType(const ::arrow::StructType& struct_type) { ::duckdb::child_list_t<::duckdb::LogicalType> children; for (auto& child : struct_type.fields()) { - children.emplace_back(std::make_pair(child->name(), ToLogicalType(*child->type()))); + children.emplace_back( + std::make_pair(child->name(), lance::duckdb::ToLogicalType(*child->type()))); } return ::duckdb::LogicalType::STRUCT(children); } template -inline ::duckdb::LogicalType ArrowListTypeToLogicalType(const ::arrow::DataType& dtype) { +inline ::duckdb::LogicalType ToLogicalType(const ::arrow::DataType& dtype) { auto& list_type = dynamic_cast(dtype); - auto child_type = ToLogicalType(*list_type.value_type()); + auto child_type = lance::duckdb::ToLogicalType(*list_type.value_type()); return ::duckdb::LogicalType::LIST(child_type); } @@ -83,13 +82,13 @@ ::duckdb::LogicalType ToLogicalType(const ::arrow::DataType& arrow_type) { case ::arrow::Type::DATE64: return ::duckdb::LogicalType::DATE; case ::arrow::Type::DICTIONARY: - return ArrowDictionaryTypeToLogicalType(arrow_type); + return ToLogicalType(dynamic_cast(arrow_type)); case ::arrow::Type::STRUCT: - return ArrowStructTypeToLogicalType(arrow_type); + return ToLogicalType(dynamic_cast(arrow_type)); case ::arrow::Type::LIST: - return ArrowListTypeToLogicalType<::arrow::ListType>(arrow_type); + return ToLogicalType<::arrow::ListType>(arrow_type); case ::arrow::Type::FIXED_SIZE_LIST: - return ArrowListTypeToLogicalType<::arrow::FixedSizeListType>(arrow_type); + return ToLogicalType<::arrow::FixedSizeListType>(arrow_type); default: throw ::duckdb::InvalidInputException("Does not support type: %s", arrow_type.ToString().c_str()); From 9a3711020706ea2dc9fcb91f3ccc63d903604892 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sun, 4 Dec 2022 18:14:51 -0800 Subject: [PATCH 19/19] fix typo --- integration/duckdb/src/lance/duckdb/lance_reader.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/duckdb/src/lance/duckdb/lance_reader.cc b/integration/duckdb/src/lance/duckdb/lance_reader.cc index d9e90b9cb4..9bc93f7a14 100644 --- a/integration/duckdb/src/lance/duckdb/lance_reader.cc +++ b/integration/duckdb/src/lance/duckdb/lance_reader.cc @@ -121,7 +121,7 @@ void ToVector<::arrow::StringType>(const std::shared_ptr<::arrow::Array> &arr, out->SetVectorType(::duckdb::VectorType::FLAT_VECTOR); } -/// Convert a String array into duckdb vector. +/// Convert a Binary array into duckdb vector. template <> void ToVector<::arrow::BinaryType>(const std::shared_ptr<::arrow::Array> &arr, ::duckdb::Vector *out) {