Skip to content

Commit

Permalink
adding hdfs support
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouyuan authored and rui-mo committed May 11, 2022
1 parent cbd85fd commit 874b563
Show file tree
Hide file tree
Showing 12 changed files with 883 additions and 0 deletions.
9 changes: 9 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ option(VELOX_ENABLE_SUBSTRAIT "Buid Substrait-to-Velox converter." OFF)
# Optional build features; every option listed here defaults to OFF.
option(VELOX_ENABLE_BENCHMARKS "Build velox top level benchmarks." OFF)
option(VELOX_ENABLE_BENCHMARKS_BASIC "Build velox basic benchmarks." OFF)
option(VELOX_ENABLE_S3 "Build S3 Connector" OFF)
# Enables the HDFS storage adapter (requires libhdfs3 to be installed).
option(VELOX_ENABLE_HDFS "Build Hdfs Connector" OFF)
option(VELOX_ENABLE_PARQUET "Enable Parquet support" OFF)
option(VELOX_BUILD_TEST_UTILS "Enable Velox test utilities" OFF)

Expand Down Expand Up @@ -106,6 +107,14 @@ if(VELOX_ENABLE_S3)
add_definitions(-DVELOX_ENABLE_S3)
endif()

# When the HDFS connector is enabled, locate the libhdfs3 shared library and
# store its path in LIBHDFS3 (consumed by the velox_hdfs target).
if(VELOX_ENABLE_HDFS)
# REQUIRED makes configuration fail if libhdfs3.so cannot be found; "/usr/local"
# is only a search hint, the default search locations are still consulted.
find_library(
LIBHDFS3
NAMES libhdfs3.so
HINTS "/usr/local" REQUIRED)
# NOTE: the compile definition is VELOX_ENABLE_HDFS3 (with the trailing 3),
# which differs from the name of the CMake option above.
add_definitions(-DVELOX_ENABLE_HDFS3)
endif()

if(VELOX_ENABLE_PARQUET)
add_definitions(-DVELOX_ENABLE_PARQUET)
endif()
Expand Down
3 changes: 3 additions & 0 deletions velox/connectors/hive/storage_adapters/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,6 @@
if(VELOX_ENABLE_S3)
add_subdirectory(s3fs)
endif()
# Build the HDFS storage adapter only when the HDFS connector is enabled.
if(VELOX_ENABLE_HDFS)
add_subdirectory(hdfs)
endif()
22 changes: 22 additions & 0 deletions velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# HDFS storage adapter library, linked against the libhdfs3 library whose
# path is held in the LIBHDFS3 variable.

add_library(velox_hdfs HdfsFileSystem.cpp HdfsReadFile.cpp)
target_link_libraries(velox_hdfs ${LIBHDFS3})

# The tests subdirectory is currently disabled: the add_subdirectory call
# below is commented out, so this branch is a no-op.
if(${VELOX_BUILD_TESTING})
#add_subdirectory(tests)
endif()
115 changes: 115 additions & 0 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "HdfsFileSystem.h"
#include <hdfs/hdfs.h>
#include "HdfsReadFile.h"
#include "velox/common/file/FileSystems.h"
#include "velox/core/Context.h"

namespace facebook::velox::filesystems {
// Guards the one-time construction of the HdfsFileSystem instance created by
// filesystemGenerator.
folly::once_flag hdfsInitiationFlag;
// Path prefix that identifies HDFS paths; used by isHdfsFile and stripped by
// openFileForRead.
std::string_view HdfsFileSystem::kScheme("hdfs:/");

// Owns the libhdfs3 connection handle used by HdfsFileSystem. The connection
// is opened in the constructor from the "hdfs_host" and "hdfs_port"
// configuration entries and closed in the destructor.
class HdfsFileSystem::Impl {
 public:
  // Connects to the name node described by 'config'.
  // Fails via VELOX_CHECK when the host or port entry is missing, when the
  // builder cannot be created, or when the connection cannot be established.
  explicit Impl(const Config* config) {
    auto endpointInfo = getServiceEndpoint(config);
    auto builder = hdfsNewBuilder();
    VELOX_CHECK_NOT_NULL(
        builder,
        "Unable to create HDFS builder, got error:{}.",
        hdfsGetLastError());
    hdfsBuilderSetNameNode(builder, endpointInfo.host.c_str());
    hdfsBuilderSetNameNodePort(builder, endpointInfo.port);
    hdfsClient_ = hdfsBuilderConnect(builder);
    // libhdfs3 leaves the builder allocated after connecting, so release it
    // here (before the check below, which may throw) to avoid leaking it.
    hdfsFreeBuilder(builder);
    VELOX_CHECK_NOT_NULL(
        hdfsClient_,
        "Unable to connect to HDFS, got error:{}.",
        hdfsGetLastError());
  }

  // The connection handle is a raw resource released in the destructor;
  // copying would lead to disconnecting the same handle twice.
  Impl(const Impl&) = delete;
  Impl& operator=(const Impl&) = delete;

  // Disconnects from HDFS. A failure is only logged because destructors must
  // not throw.
  ~Impl() {
    LOG(INFO) << "Disconnecting HDFS file system";
    int disconnectResult = hdfsDisconnect(hdfsClient_);
    if (disconnectResult != 0) {
      LOG(WARNING) << "hdfs disconnect failure in HdfsFileSystem destructor: "
                   << errno;
    }
  }

  // Reads the name node host and port from 'config'.
  // Both "hdfs_host" and "hdfs_port" must be present. The port is parsed
  // with atoi, so a non-numeric value yields port 0.
  static HdfsServiceEndpoint getServiceEndpoint(const Config* config) {
    auto hdfsHost = config->get("hdfs_host");
    VELOX_CHECK(
        hdfsHost.hasValue(),
        "hdfsHost is empty, configuration missing for hdfs host");
    auto hdfsPort = config->get("hdfs_port");
    VELOX_CHECK(
        hdfsPort.hasValue(),
        "hdfsPort is empty, configuration missing for hdfs port");
    HdfsServiceEndpoint endpoint{*hdfsHost, atoi(hdfsPort->data())};
    return endpoint;
  }

  // Returns the connection handle; ownership stays with this object.
  hdfsFS hdfsClient() {
    return hdfsClient_;
  }

 private:
  hdfsFS hdfsClient_;
};

// Creates the file system and its Impl, which is constructed from 'config'.
HdfsFileSystem::HdfsFileSystem(const std::shared_ptr<const Config>& config)
    : FileSystem(config), impl_(std::make_shared<Impl>(config.get())) {}

// Returns the fixed display name of this file system.
std::string HdfsFileSystem::name() const {
  return std::string{"HDFS"};
}

std::unique_ptr<ReadFile> HdfsFileSystem::openFileForRead(
std::string_view path) {
if (path.find(kScheme) == 0) {
path.remove_prefix(kScheme.length());
}

return std::make_unique<HdfsReadFile>(impl_->hdfsClient(), path);
}

// Writing is not implemented for HDFS; always fails via VELOX_UNSUPPORTED.
std::unique_ptr<WriteFile> HdfsFileSystem::openFileForWrite(
    std::string_view /*path*/) {
  VELOX_UNSUPPORTED("Write to HDFS is unsupported");
}

// Returns true when 'filename' begins with the kScheme prefix.
bool HdfsFileSystem::isHdfsFile(const std::string_view filename) {
  const auto prefixLength = kScheme.size();
  return filename.compare(0, prefixLength, kScheme) == 0;
}

static std::function<std::shared_ptr<FileSystem>(std::shared_ptr<const Config>)>
filesystemGenerator = [](std::shared_ptr<const Config> properties) {
static std::shared_ptr<FileSystem> filesystem;
folly::call_once(hdfsInitiationFlag, [&properties]() {
filesystem = std::make_shared<HdfsFileSystem>(properties);
});
return filesystem;
};

// Removing files is not implemented for HDFS; always fails via
// VELOX_UNSUPPORTED.
void HdfsFileSystem::remove(std::string_view /*path*/) {
  VELOX_UNSUPPORTED("Does not support removing files from hdfs");
}

// Registers the HDFS file system: isHdfsFile selects the paths it handles and
// filesystemGenerator supplies the instance.
void registerHdfsFileSystem() {
  registerFileSystem(&HdfsFileSystem::isHdfsFile, filesystemGenerator);
}
} // namespace facebook::velox::filesystems

48 changes: 48 additions & 0 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "velox/common/file/FileSystems.h"

namespace facebook::velox::filesystems {
// Host name and port of the HDFS name node to connect to.
struct HdfsServiceEndpoint {
  std::string host;
  // Zero-initialized so a default-constructed endpoint never carries an
  // indeterminate port. Aggregate initialization ({host, port}) still works.
  int port{0};
};

// FileSystem implementation for HDFS paths. Reading is supported through
// openFileForRead; openFileForWrite and remove are declared here but report
// the operation as unsupported in their definitions.
class HdfsFileSystem : public FileSystem {
 public:
  // Builds the file system from 'config'.
  explicit HdfsFileSystem(const std::shared_ptr<const Config>& config);

  // Display name of this file system.
  std::string name() const override;

  // Opens 'path' for reading.
  std::unique_ptr<ReadFile> openFileForRead(std::string_view path) override;

  // Opens 'path' for writing.
  std::unique_ptr<WriteFile> openFileForWrite(std::string_view path) override;

  // Removes 'path'.
  void remove(std::string_view path) override;

  // Tells whether 'filename' is an HDFS path.
  static bool isHdfsFile(std::string_view filename);

 protected:
  // Implementation details are kept out of the header; Impl is defined in the
  // .cpp file.
  class Impl;
  std::shared_ptr<Impl> impl_;

 private:
  // Path prefix that marks a path as belonging to HDFS.
  static std::string_view kScheme;
};

// Registers the HDFS file system with the file system registry so that paths
// accepted by HdfsFileSystem::isHdfsFile are served by HdfsFileSystem.
void registerHdfsFileSystem();
} // namespace facebook::velox::filesystems
103 changes: 103 additions & 0 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "HdfsReadFile.h"
#include <folly/synchronization/CallOnce.h>
#include <hdfs/hdfs.h>

namespace facebook::velox {

// Binds the reader to 'path' on the given connection and fetches the file's
// metadata up front; fails via VELOX_CHECK_NOT_NULL when the metadata cannot
// be obtained.
HdfsReadFile::HdfsReadFile(hdfsFS hdfs, const std::string_view path)
    : hdfsClient_(hdfs), filePath_(path) {
  // Assigned in the body (not the initializer list) because it needs
  // filePath_ to be constructed first.
  auto* const pathInfo = hdfsGetPathInfo(hdfsClient_, filePath_.data());
  fileInfo_ = pathInfo;
  VELOX_CHECK_NOT_NULL(
      fileInfo_,
      "Unable to get file path info for file: {}. got error: {}",
      filePath_,
      hdfsGetLastError());
}

void HdfsReadFile::preadInternal(uint64_t offset, uint64_t length, char* pos)
const {
checkFileReadParameters(offset, length);
auto file = hdfsOpenFile(hdfsClient_, filePath_.data(), O_RDONLY, 0, 0, 0);
VELOX_CHECK_NOT_NULL(
file,
"Unable to open file {}. got error: {}",
filePath_,
hdfsGetLastError());
seekToPosition(file, offset);
uint64_t totalBytesRead = 0;
while (totalBytesRead < length) {
auto bytesRead = hdfsRead(hdfsClient_, file, pos, length - totalBytesRead);
VELOX_CHECK(bytesRead >= 0, "Read failure in HDFSReadFile::preadInternal.")
totalBytesRead += bytesRead;
pos += bytesRead;
}

if (hdfsCloseFile(hdfsClient_, file) == -1) {
LOG(ERROR) << "Unable to close file, errno: " << errno;
}
}

// Moves the read position of 'file' to 'offset'; fails via VELOX_CHECK_EQ
// when hdfsSeek does not return 0.
void HdfsReadFile::seekToPosition(hdfsFile file, uint64_t offset) const {
  const auto rc = hdfsSeek(hdfsClient_, file, offset);
  VELOX_CHECK_EQ(
      rc,
      0,
      "Cannot seek through HDFS file: {}, error: {}",
      filePath_,
      std::string(hdfsGetLastError()));
}

// Reads 'length' bytes at 'offset' into the caller-provided 'buf' and returns
// a view over those bytes in 'buf'.
std::string_view
HdfsReadFile::pread(uint64_t offset, uint64_t length, void* buf) const {
  auto* const out = static_cast<char*>(buf);
  preadInternal(offset, length, out);
  return std::string_view(out, length);
}

// Reads 'length' bytes at 'offset' into a newly allocated string and returns
// it.
std::string HdfsReadFile::pread(uint64_t offset, uint64_t length) const {
  // Pre-size the buffer with zero bytes; preadInternal overwrites it in place.
  std::string buffer(length, 0);
  preadInternal(offset, length, buffer.data());
  return buffer;
}

// Returns the file size recorded in the metadata fetched by the constructor.
uint64_t HdfsReadFile::size() const {
  const uint64_t fileSize = fileInfo_->mSize;
  return fileSize;
}

// Reports the file's HDFS block size (from the metadata fetched by the
// constructor) as this reader's memory usage.
uint64_t HdfsReadFile::memoryUsage() const {
  const uint64_t blockSize = fileInfo_->mBlockSize;
  return blockSize;
}

// Always returns false for HDFS files.
bool HdfsReadFile::shouldCoalesce() const {
  constexpr bool kCoalesce = false;
  return kCoalesce;
}

void HdfsReadFile::checkFileReadParameters(uint64_t offset, uint64_t length)
const {
auto fileSize = size();
auto endPoint = offset + length;
VELOX_CHECK_GE(
fileSize,
endPoint,
"Cannot read HDFS file beyond its size: {}, offset: {}, end point: {}",
fileSize,
offset,
endPoint)
}
} // namespace facebook::velox
45 changes: 45 additions & 0 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <hdfs/hdfs.h>
#include "velox/common/file/File.h"

namespace facebook::velox {

class HdfsReadFile final : public ReadFile {
public:
explicit HdfsReadFile(hdfsFS hdfs, std::string_view path);

std::string_view pread(uint64_t offset, uint64_t length, void* buf)
const final;

std::string pread(uint64_t offset, uint64_t length) const final;

uint64_t size() const final;

uint64_t memoryUsage() const final;

bool shouldCoalesce() const final;

private:
void preadInternal(uint64_t offset, uint64_t length, char* pos) const;
void seekToPosition(hdfsFile file, uint64_t offset) const;
void checkFileReadParameters(uint64_t offset, uint64_t length) const;
hdfsFS hdfsClient_;
hdfsFileInfo* fileInfo_;
std::string filePath_;
};
} // namespace facebook::velox
27 changes: 27 additions & 0 deletions velox/connectors/hive/storage_adapters/hdfs/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Test executable for the HDFS file system, built from the file system test
# and the mini-cluster helper, and registered with CTest under the same name.
add_executable(velox_hdfs_file_test HdfsFileSystemTest.cpp HdfsMiniCluster.cpp)
add_test(velox_hdfs_file_test velox_hdfs_file_test)
# Links the adapter under test (velox_hdfs) together with the Velox libraries
# and GoogleTest used by the test sources.
target_link_libraries(
velox_hdfs_file_test
velox_file
velox_hdfs
velox_core
velox_exec_test_util
velox_hive_connector
velox_dwio_common_exception
velox_exec
gtest
gtest_main)
Loading

0 comments on commit 874b563

Please sign in to comment.