Skip to content

Commit

Permalink
Implement FORMAT option in LOAD FROM clause. (#4613)
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin authored and ray6080 committed Dec 17, 2024
1 parent 212da4d commit 6d6aa45
Show file tree
Hide file tree
Showing 27 changed files with 159 additions and 78 deletions.
Binary file added dataset/copy-test/node/parquet/types_50k_0.par
Binary file not shown.
Binary file added dataset/npy-1d/one_dim_double.jpy
Binary file not shown.
6 changes: 6 additions & 0 deletions dataset/tinysnb/vPerson.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
id,fname,Gender,ISStudent,isWorker,age,eyeSight,birthdate,registerTime,lastJobDuration,workedHours,usedNames,courseScoresPerTerm,grades,height,u
0,Alice,1,true,false,35,5.0,1900-01-01,2011-08-20 11:25:30Z+00:00,3 years 2 days 13 hours 2 minutes,"[10,5]","[Aida]","[[10,8],[6,7,8]]","[96,54,86,92]",1.731,A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A11
2,Bob,2,true,false,30,5.1,1900-01-01,2008-11-03 13:25:30.000526-02:00,10 years 5 months 13 hours 24 us,"[12,8]","[Bobby]","[[8,9],[9,10]]","[98,42,93,88]",0.99,{a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a12}
3,Carol,1,false,true,45,5.0,1940-06-22,1911-08-20 02:32:21,48 hours 24 minutes 11 seconds,"[4,5]","[Carmen,Fred]","[[8,10]]","[91,75,21,95]",1.00,a0eebc999c0b4ef8bb6d6bb9bd380a13
5,Dan,2,false,true,20,4.8,1950-7-23,2031-11-30 12:25:30Z,10 years 5 months 13 hours 24 us,"[1,9]","[Wolfeschlegelstein,Daniel]","[[7,4],[8,8],[9]]","[76,88,99,89]",1.30,a0ee-bc99-9c0b-4ef8-bb6d-6bb9-bd38-0a14
7,Elizabeth,1,false,true,20,4.7,1980-10-26,1976-12-23 11:21:42,48 hours 24 minutes 11 seconds,"[2]","[Ein]","[[6],[7],[8]]","[96,59,65,88]",1.463,{a0eebc99-9c0b4ef8-bb6d6bb9-bd380a15}
26 changes: 15 additions & 11 deletions extension/delta/src/function/delta_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,24 @@
#include "connector/delta_connector.h"
#include "connector/duckdb_result_converter.h"
#include "connector/duckdb_type_converter.h"
#include "function/table/scan_functions.h"

namespace kuzu {
namespace delta_extension {

using namespace function;
using namespace common;

struct DeltaScanBindData : public CallTableFuncBindData {
struct DeltaScanBindData : public ScanBindData {
std::string query;
std::shared_ptr<DeltaConnector> connector;
duckdb_extension::DuckDBResultConverter converter;

DeltaScanBindData(std::string query, std::shared_ptr<DeltaConnector> connector,
duckdb_extension::DuckDBResultConverter converter, std::vector<LogicalType> returnTypes,
std::vector<std::string> returnColumnNames)
: CallTableFuncBindData{std::move(returnTypes), std::move(returnColumnNames),
1 /* maxOffset */},
std::vector<std::string> returnColumnNames, ReaderConfig config, main::ClientContext* ctx)
: ScanBindData{std::move(returnTypes), std::move(returnColumnNames), std::move(config),
ctx},
query{std::move(query)}, connector{std::move(connector)},
converter{std::move(converter)} {}

Expand All @@ -38,27 +39,30 @@ static std::unique_ptr<TableFuncBindData> bindFunc(main::ClientContext* context,
input->inputs[0].getValue<std::string>());
auto result = connector->executeQuery(query + " LIMIT 1");
std::vector<LogicalType> returnTypes;
std::vector<std::string> returnColumnNames;
std::vector<std::string> returnColumnNames = input->expectedColumnNames;
for (auto type : result->types) {
returnTypes.push_back(
duckdb_extension::DuckDBTypeConverter::convertDuckDBType(type.ToString()));
}

for (auto name : result->names) {
returnColumnNames.push_back(name);
if (input->expectedColumnNames.empty()) {
for (auto name : result->names) {
returnColumnNames.push_back(name);
}
}
KU_ASSERT(returnTypes.size() == returnColumnNames.size());
return std::make_unique<DeltaScanBindData>(std::move(query), connector,
duckdb_extension::DuckDBResultConverter{returnTypes}, copyVector(returnTypes),
std::move(returnColumnNames));
std::move(returnColumnNames), ReaderConfig{}, context);
}

struct DeltaScanSharedState : public function::TableFuncSharedState {
struct DeltaScanSharedState : public BaseScanSharedState {
explicit DeltaScanSharedState(std::unique_ptr<duckdb::MaterializedQueryResult> queryResult)
: queryResult{std::move(queryResult)} {}
: BaseScanSharedState{}, queryResult{std::move(queryResult)} {}

uint64_t getNumRows() const override { return queryResult->RowCount(); }

std::unique_ptr<duckdb::MaterializedQueryResult> queryResult;
std::mutex lock;
};

std::unique_ptr<TableFuncSharedState> initDeltaScanSharedState(TableFunctionInitInput& input) {
Expand Down
49 changes: 37 additions & 12 deletions extension/delta/test/test_files/delta.test
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,21 @@
-STATEMENT load extension "${KUZU_ROOT_DIRECTORY}/extension/delta/build/libdelta.kuzu_extension"
---- ok

-LOG LocalDeltaTable
-STATEMENT CALL DELTA_SCAN('${KUZU_ROOT_DIRECTORY}/extension/delta/test/delta_tables/person_table') RETURN *
-LOG LoadFromLocalDeltaTable
-STATEMENT LOAD FROM '${KUZU_ROOT_DIRECTORY}/extension/delta/test/delta_tables/person_table'(file_format='delta') RETURN *
---- 5
Alice|25|1000.500000|1999-01-15 00:00:00
Bob|30|1500.750000|1994-06-25 00:00:00
Charlie|35|2000.000000|1988-12-05 00:00:00
Diana|40|2500.300000|1983-07-20 00:00:00
Ethan|28|1800.600000|1996-03-10 00:00:00

-LOG CopyFromLocalDeltaTable
-STATEMENT CREATE NODE TABLE person(fname STRING, ID INT64, salary double, birth_date timestamp, PRIMARY KEY(fname));
---- ok
-STATEMENT COPY person FROM '${KUZU_ROOT_DIRECTORY}/extension/delta/test/delta_tables/person_table'(file_format='delta');
---- ok
-STATEMENT MATCH (p:person) RETURN p.*;
---- 5
Alice|25|1000.500000|1999-01-15 00:00:00
Bob|30|1500.750000|1994-06-25 00:00:00
Expand All @@ -28,7 +41,7 @@ Ethan|28|1800.600000|1996-03-10 00:00:00
---- ok
-STATEMENT CALL s3_region='US'
---- ok
-STATEMENT CALL DELTA_SCAN('s3://kuzu-test/delta/university') RETURN *
-STATEMENT LOAD FROM 's3://kuzu-test/delta/university' (file_format='delta') RETURN *
---- 6
Cambridge|5|280.200000
Harvard|1|210.500000
Expand All @@ -38,23 +51,35 @@ Stanford|2|250.300000
Yale|6|190.700000

-LOG TableOfDifferentTypes
-STATEMENT CALL DELTA_SCAN('${KUZU_ROOT_DIRECTORY}/extension/delta/test/delta_tables/student') RETURN *
-STATEMENT LOAD FROM '${KUZU_ROOT_DIRECTORY}/extension/delta/test/delta_tables/student' (file_format='delta') RETURN *
---- 3
Alice|[52,24,31]|True|2020-05-17 00:00:00|{age: 25, gender: female}
Bob|[15,66,72]|False|2011-03-22 00:00:00|{age: 22, gender: male}
Carol|[29,24,11]||2001-04-15 00:00:00|{age: 33, gender: female}

-LOG InvalidDeltaPath
-STATEMENT CALL DELTA_SCAN('${KUZU_ROOT_DIRECTORY}/student') RETURN *
---- error
IO Error: Hit DeltaKernel FFI error (from: get_engine_interface_builder for path ${KUZU_ROOT_DIRECTORY}/student/): Hit error: 27 (InvalidTableLocation) with message (Invalid table location: Path does not exist: "${KUZU_ROOT_DIRECTORY}/student/".)

-LOG InvalidDeltaPath
-STATEMENT CALL DELTA_SCAN('${KUZU_ROOT_DIRECTORY}/student') RETURN *
-STATEMENT LOAD FROM '${KUZU_ROOT_DIRECTORY}/student'(file_format='delta') RETURN *
---- error
IO Error: Hit DeltaKernel FFI error (from: get_engine_interface_builder for path ${KUZU_ROOT_DIRECTORY}/student/): Hit error: 27 (InvalidTableLocation) with message (Invalid table location: Path does not exist: "${KUZU_ROOT_DIRECTORY}/student/".)
Binder exception: No file found that matches the pattern: ${KUZU_ROOT_DIRECTORY}/student.

-LOG InvalidDeltaTable
-STATEMENT CALL DELTA_SCAN('${KUZU_ROOT_DIRECTORY}/src') RETURN *
-STATEMENT LOAD FROM '${KUZU_ROOT_DIRECTORY}/src' (file_format='delta') RETURN *
---- error
IO Error: Hit DeltaKernel FFI error (from: While trying to read from delta table: '${KUZU_ROOT_DIRECTORY}/src/'): Hit error: 15 (MissingVersionError) with message (No table version found.)

-LOG LoadDeltaTableWithOutType
-STATEMENT LOAD FROM '${KUZU_ROOT_DIRECTORY}/extension/delta/test/delta_tables/person_table' RETURN *
---- error
Binder exception: Cannot infer the format of the given file. Please set the file format explicitly by (file_format=<type>).

-LOG CopyFromInvalidPath
-STATEMENT COPY person FROM '${KUZU_ROOT_DIRECTORY}/'(file_format='delta');
---- error
IO Error: Hit DeltaKernel FFI error (from: While trying to read from delta table: '${KUZU_ROOT_DIRECTORY}/'): Hit error: 15 (MissingVersionError) with message (No table version found.)
-STATEMENT MATCH (p:person) RETURN p.*;
---- 5
Alice|25|1000.500000|1999-01-15 00:00:00
Bob|30|1500.750000|1994-06-25 00:00:00
Charlie|35|2000.000000|1988-12-05 00:00:00
Diana|40|2500.300000|1983-07-20 00:00:00
Ethan|28|1800.600000|1996-03-10 00:00:00
4 changes: 2 additions & 2 deletions extension/json/src/functions/table_functions/json_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ struct JsonScanConfig {
int64_t breadth = JsonConstant::DEFAULT_JSON_DETECT_BREADTH;
bool autoDetect = JsonConstant::DEFAULT_AUTO_DETECT_VALUE;

explicit JsonScanConfig(const std::unordered_map<std::string, Value>& options);
explicit JsonScanConfig(const common::case_insensitive_map_t<Value>& options);
};

JsonScanConfig::JsonScanConfig(const std::unordered_map<std::string, Value>& options) {
JsonScanConfig::JsonScanConfig(const common::case_insensitive_map_t<Value>& options) {
for (const auto& i : options) {
if (i.first == "FORMAT") {
if (i.second.getDataType().getLogicalTypeID() != LogicalTypeID::STRING) {
Expand Down
2 changes: 1 addition & 1 deletion extension/json/src/include/json_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace kuzu {
namespace json_extension {

// Entry point descriptor for the JSON table-scan function. The function is
// registered under the literal name "JSON_SCAN" so the binder's generic
// stringFormat("{}_SCAN", fileTypeStr) lookup resolves it for file_format='json'
// without needing a shared constant in common/constants.h.
struct JsonScan {
    static constexpr const char* name = "JSON_SCAN";

    static std::unique_ptr<function::TableFunction> getFunction();
};
Expand Down
2 changes: 1 addition & 1 deletion src/binder/bind/bind_export_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ static std::vector<ExportedTableData> getExportInfo(const Catalog& catalog, Tran
return exportData;
}

FileTypeInfo getFileType(std::unordered_map<std::string, common::Value>& options) {
FileTypeInfo getFileType(case_insensitive_map_t<common::Value>& options) {
auto fileTypeInfo = FileTypeInfo{FileType::CSV, "CSV"};
if (options.find("FORMAT") != options.end()) {
auto value = options.at("FORMAT");
Expand Down
29 changes: 23 additions & 6 deletions src/binder/bind/bind_file_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "common/exception/binder.h"
#include "common/exception/copy.h"
#include "common/exception/message.h"
#include "common/file_system/local_file_system.h"
#include "common/file_system/virtual_file_system.h"
#include "common/string_format.h"
#include "common/string_utils.h"
Expand All @@ -24,7 +25,8 @@ namespace binder {
// Infers a file's type from its extension (e.g. ".csv" -> CSV) and returns the
// FileTypeInfo pair {type, extension-without-leading-dot}. A file with no
// extension yields FileType::UNKNOWN with an empty type string, letting the
// caller surface a "cannot infer format" binder error instead of crashing.
FileTypeInfo bindSingleFileType(main::ClientContext* context, const std::string& filePath) {
    std::filesystem::path fileName(filePath);
    auto extension = context->getVFSUnsafe()->getFileExtension(fileName);
    // Clamp the substr offset: substr(1) on an empty extension would throw
    // std::out_of_range.
    return FileTypeInfo{FileTypeUtils::getFileTypeFromExtension(extension),
        extension.substr(std::min<uint64_t>(1, extension.length()))};
}

FileTypeInfo Binder::bindFileTypeInfo(const std::vector<std::string>& filePaths) {
Expand All @@ -43,6 +45,14 @@ FileTypeInfo Binder::bindFileTypeInfo(const std::vector<std::string>& filePaths)
std::vector<std::string> Binder::bindFilePaths(const std::vector<std::string>& filePaths) {
std::vector<std::string> boundFilePaths;
for (auto& filePath : filePaths) {
// This is a temporary workaround because we use duckdb to read from iceberg/delta.
// When we read delta/iceberg tables from s3/httpfs, we don't have the httpfs extension
// loaded meaning that we cannot handle remote paths. So we pass the file path to duckdb
// for validation when we bindFileScanSource.
if (!LocalFileSystem::isLocalPath(filePath)) {
boundFilePaths.push_back(filePath);
continue;
}
auto globbedFilePaths = clientContext->getVFSUnsafe()->glob(clientContext, filePath);
if (globbedFilePaths.empty()) {
throw BinderException{
Expand All @@ -55,9 +65,8 @@ std::vector<std::string> Binder::bindFilePaths(const std::vector<std::string>& f
return boundFilePaths;
}

std::unordered_map<std::string, Value> Binder::bindParsingOptions(
const parser::options_t& parsingOptions) {
std::unordered_map<std::string, Value> options;
case_insensitive_map_t<Value> Binder::bindParsingOptions(const parser::options_t& parsingOptions) {
case_insensitive_map_t<Value> options;
for (auto& option : parsingOptions) {
auto name = option.first;
common::StringUtils::toUpper(name);
Expand Down Expand Up @@ -92,9 +101,17 @@ std::unique_ptr<BoundBaseScanSource> Binder::bindFileScanSource(const BaseScanSo
const std::vector<LogicalType>& columnTypes) {
auto fileSource = scanSource.constPtrCast<FileScanSource>();
auto filePaths = bindFilePaths(fileSource->filePaths);
auto fileTypeInfo = bindFileTypeInfo(filePaths);
auto parsingOptions = bindParsingOptions(options);
FileTypeInfo fileTypeInfo;
if (parsingOptions.contains(ReaderConfig::FILE_FORMAT_OPTION_NAME)) {
auto fileFormat = parsingOptions.at(ReaderConfig::FILE_FORMAT_OPTION_NAME).toString();
fileTypeInfo = FileTypeInfo{FileTypeUtils::fromString(fileFormat), fileFormat};
parsingOptions.erase(ReaderConfig::FILE_FORMAT_OPTION_NAME);
} else {
fileTypeInfo = bindFileTypeInfo(filePaths);
}
auto config = std::make_unique<ReaderConfig>(std::move(fileTypeInfo), std::move(filePaths));
config->options = bindParsingOptions(options);
config->options = std::move(parsingOptions);
auto func = getScanFunction(config->fileTypeInfo, *config);
auto bindInput = std::make_unique<ScanTableFuncBindInput>(config->copy(), columnNames,
LogicalType::copy(columnTypes), clientContext, &func);
Expand Down
1 change: 0 additions & 1 deletion src/binder/bind/read/bind_load_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ std::unique_ptr<BoundReadingClause> Binder::bindLoadFrom(const ReadingClause& re
if (filePaths.size() > 1) {
throw BinderException("Load from multiple files is not supported.");
}
auto fileTypeInfo = bindFileTypeInfo(filePaths);
auto boundScanSource =
bindFileScanSource(*source, loadFrom.getParsingOptions(), columnNames, columnTypes);
auto& scanInfo = boundScanSource->constCast<BoundTableScanSource>().info;
Expand Down
8 changes: 6 additions & 2 deletions src/binder/binder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,16 @@ function::TableFunction Binder::getScanFunction(FileTypeInfo typeInfo, const Rea
csvConfig.parallel ? ParallelCSVScan::name : SerialCSVScan::name, inputTypes,
functions);
} break;
case FileType::TURTLE:
case FileType::UNKNOWN: {
try {
func = function::BuiltInFunctionsUtils::matchFunction(clientContext->getTx(),
SCAN_JSON_FUNC_NAME, inputTypes, functions);
common::stringFormat("{}_SCAN", typeInfo.fileTypeStr), inputTypes, functions);
} catch (...) {
if (typeInfo.fileTypeStr == "") {
throw common::BinderException{
"Cannot infer the format of the given file. Please "
"set the file format explicitly by (file_format=<type>)."};
}
throw common::BinderException{
common::stringFormat("Cannot load from file type {}.", typeInfo.fileTypeStr)};
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/copier_config/csv_reader_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ static bool isValidBooleanOptionValue(const Value& value, const std::string& nam
}

CSVReaderConfig CSVReaderConfig::construct(
const std::unordered_map<std::string, common::Value>& options) {
const common::case_insensitive_map_t<common::Value>& options) {
auto config = CSVReaderConfig();
for (auto& op : options) {
auto name = op.first;
Expand Down
24 changes: 4 additions & 20 deletions src/common/copier_config/reader_config.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include "common/copier_config/reader_config.h"

#include "common/assert.h"
#include "common/exception/binder.h"
#include "common/string_utils.h"

namespace kuzu {
namespace common {
Expand All @@ -16,15 +16,6 @@ FileType FileTypeUtils::getFileTypeFromExtension(std::string_view extension) {
if (extension == ".npy") {
return FileType::NPY;
}
if (extension == ".ttl") {
return FileType::TURTLE;
}
if (extension == ".nq") {
return FileType::NQUADS;
}
if (extension == ".nt") {
return FileType::NTRIPLES;
}
return FileType::UNKNOWN;
}

Expand All @@ -42,30 +33,23 @@ std::string FileTypeUtils::toString(FileType fileType) {
case FileType::NPY: {
return "NPY";
}
case FileType::TURTLE: {
return "TURTLE";
}
case FileType::NQUADS: {
return "NQUADS";
}
case FileType::NTRIPLES: {
return "NTRIPLES";
}
default: {
KU_UNREACHABLE;
}
}
}

// Parses a user-supplied file-format string (case-insensitive) into a FileType.
// Unrecognized formats map to FileType::UNKNOWN rather than throwing, so the
// binder can fall back to the generic "<FORMAT>_SCAN" function lookup and
// report a proper binder error there when no matching scan function exists.
FileType FileTypeUtils::fromString(std::string fileType) {
    fileType = common::StringUtils::getUpper(fileType);
    if (fileType == "CSV") {
        return FileType::CSV;
    }
    if (fileType == "PARQUET") {
        return FileType::PARQUET;
    }
    if (fileType == "NPY") {
        return FileType::NPY;
    }
    return FileType::UNKNOWN;
}

Expand Down
5 changes: 5 additions & 0 deletions src/common/file_system/local_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,11 @@ std::string LocalFileSystem::expandPath(main::ClientContext* context,
return fullPath;
}

// Returns true when `path` is not a remote path. A path counts as remote when
// it begins with one of the known remote-filesystem schemes; everything else
// (relative or absolute local paths) is treated as local.
bool LocalFileSystem::isLocalPath(const std::string& path) {
    for (const char* scheme : {"s3://", "http://", "https://"}) {
        if (path.rfind(scheme, 0) == 0) {
            return false;
        }
    }
    return true;
}

void LocalFileSystem::readFromFile(FileInfo& fileInfo, void* buffer, uint64_t numBytes,
uint64_t position) const {
auto localFileInfo = fileInfo.constPtrCast<LocalFileInfo>();
Expand Down
2 changes: 1 addition & 1 deletion src/function/export/export_parquet_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ struct ParquetOptions {
kuzu_parquet::format::CompressionCodec::type codec =
kuzu_parquet::format::CompressionCodec::SNAPPY;

explicit ParquetOptions(std::unordered_map<std::string, common::Value> parsingOptions) {
explicit ParquetOptions(case_insensitive_map_t<common::Value> parsingOptions) {
for (auto& [name, value] : parsingOptions) {
if (name == "COMPRESSION") {
setCompression(value);
Expand Down
2 changes: 1 addition & 1 deletion src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class Binder {
const std::vector<std::string>& columnNames,
const std::vector<common::LogicalType>& columnTypes);

std::unordered_map<std::string, common::Value> bindParsingOptions(
common::case_insensitive_map_t<common::Value> bindParsingOptions(
const parser::options_t& parsingOptions);
common::FileTypeInfo bindFileTypeInfo(const std::vector<std::string>& filePaths);
std::vector<std::string> bindFilePaths(const std::vector<std::string>& filePaths);
Expand Down
4 changes: 2 additions & 2 deletions src/include/binder/bound_export_database.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class BoundExportDatabase final : public BoundStatement {
public:
BoundExportDatabase(std::string filePath, common::FileTypeInfo fileTypeInfo,
std::vector<ExportedTableData> exportData,
std::unordered_map<std::string, common::Value> csvOption)
common::case_insensitive_map_t<common::Value> csvOption)
: BoundStatement{common::StatementType::EXPORT_DATABASE,
BoundStatementResult::createSingleStringColumnResult()},
exportData(std::move(exportData)),
Expand All @@ -31,7 +31,7 @@ class BoundExportDatabase final : public BoundStatement {

std::string getFilePath() const { return boundFileInfo.filePaths[0]; }
common::FileType getFileType() const { return boundFileInfo.fileTypeInfo.fileType; }
std::unordered_map<std::string, common::Value> getExportOptions() const {
common::case_insensitive_map_t<common::Value> getExportOptions() const {
return boundFileInfo.options;
}
const common::ReaderConfig* getBoundFileInfo() const { return &boundFileInfo; }
Expand Down
2 changes: 0 additions & 2 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,5 @@ static constexpr char LOCAL_DB_NAME[] = "local(kuzu)";

constexpr auto DECIMAL_PRECISION_LIMIT = 38;

static constexpr char SCAN_JSON_FUNC_NAME[] = "READ_JSON";

} // namespace common
} // namespace kuzu
Loading

0 comments on commit 6d6aa45

Please sign in to comment.