From bc49dc59ad304daa8ddf21c2595b1819c6c6a0af Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Sun, 19 Jun 2022 20:14:56 +0800 Subject: [PATCH] hdfs support Signed-off-by: Yuan Zhou --- CMakeLists.txt | 11 +- .../hive/storage_adapters/CMakeLists.txt | 3 + .../hive/storage_adapters/hdfs/CMakeLists.txt | 22 ++ .../storage_adapters/hdfs/HdfsFileSystem.cpp | 116 ++++++ .../storage_adapters/hdfs/HdfsFileSystem.h | 55 +++ .../storage_adapters/hdfs/HdfsReadFile.cpp | 103 +++++ .../hive/storage_adapters/hdfs/HdfsReadFile.h | 45 +++ .../hdfs/tests/CMakeLists.txt | 28 ++ .../hdfs/tests/HdfsFileSystemTest.cpp | 360 ++++++++++++++++++ .../hdfs/tests/HdfsMiniCluster.cpp | 115 ++++++ .../hdfs/tests/HdfsMiniCluster.h | 65 ++++ velox/exec/tests/utils/TempFilePath.h | 8 + 12 files changed, 930 insertions(+), 1 deletion(-) create mode 100644 velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt create mode 100644 velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp create mode 100644 velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h create mode 100644 velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp create mode 100644 velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h create mode 100644 velox/connectors/hive/storage_adapters/hdfs/tests/CMakeLists.txt create mode 100644 velox/connectors/hive/storage_adapters/hdfs/tests/HdfsFileSystemTest.cpp create mode 100644 velox/connectors/hive/storage_adapters/hdfs/tests/HdfsMiniCluster.cpp create mode 100644 velox/connectors/hive/storage_adapters/hdfs/tests/HdfsMiniCluster.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 48937fe374fc..682ccde123b2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -44,7 +44,8 @@ option(VELOX_ENABLE_SUBSTRAIT "Buid Substrait-to-Velox converter." OFF) option(VELOX_ENABLE_BENCHMARKS "Build velox top level benchmarks." OFF) option(VELOX_ENABLE_BENCHMARKS_BASIC "Build velox basic benchmarks." OFF) option(VELOX_ENABLE_S3 "Build S3 Connector" OFF) -option(VELOX_ENABLE_PARQUET "Enable Parquet support" OFF) +option(VELOX_ENABLE_HDFS "Build Hdfs Connector" ON) +option(VELOX_ENABLE_PARQUET "Enable Parquet support" ON) option(VELOX_BUILD_TEST_UTILS "Enable Velox test utilities" OFF) if(${VELOX_BUILD_MINIMAL}) @@ -106,6 +107,14 @@ if(VELOX_ENABLE_S3) add_definitions(-DVELOX_ENABLE_S3) endif() +if(VELOX_ENABLE_HDFS) + find_library( + LIBHDFS3 + NAMES libhdfs3.so libhdfs3.dylib + HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED) + add_definitions(-DVELOX_ENABLE_HDFS3) +endif() + if(VELOX_ENABLE_PARQUET) add_definitions(-DVELOX_ENABLE_PARQUET) endif() diff --git a/velox/connectors/hive/storage_adapters/CMakeLists.txt b/velox/connectors/hive/storage_adapters/CMakeLists.txt index e836f1ada192..c07ba92d178c 100644 --- a/velox/connectors/hive/storage_adapters/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/CMakeLists.txt @@ -15,3 +15,6 @@ if(VELOX_ENABLE_S3) add_subdirectory(s3fs) endif() +if(VELOX_ENABLE_HDFS) + add_subdirectory(hdfs) +endif() diff --git a/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt b/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt new file mode 100644 index 000000000000..4427191524a0 --- /dev/null +++ b/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt @@ -0,0 +1,22 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# for generated headers
+
+add_library(velox_hdfs HdfsFileSystem.cpp HdfsReadFile.cpp)
+target_link_libraries(velox_hdfs ${LIBHDFS3})
+
+if(${VELOX_BUILD_TESTING})
+  add_subdirectory(tests)
+endif()
diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp
new file mode 100644
index 000000000000..c528bd0dd0da
--- /dev/null
+++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp
@@ -0,0 +1,116 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "HdfsFileSystem.h"
+#include <folly/synchronization/CallOnce.h>
+#include <hdfs/hdfs.h>
+#include "HdfsReadFile.h"
+#include "velox/common/file/FileSystems.h"
+#include "velox/core/Context.h"
+
+namespace facebook::velox::filesystems {
+folly::once_flag hdfsInitiationFlag;
+std::string_view HdfsFileSystem::kScheme("hdfs://");
+
+class HdfsFileSystem::Impl {
+ public:
+  explicit Impl(const Config* config) {
+    auto endpointInfo = getServiceEndpoint(config);
+    auto builder = hdfsNewBuilder();
+    hdfsBuilderSetNameNode(builder, endpointInfo.host.c_str());
+    hdfsBuilderSetNameNodePort(builder, endpointInfo.port);
+    hdfsClient_ = hdfsBuilderConnect(builder);
+    VELOX_CHECK_NOT_NULL(
+        hdfsClient_,
+        "Unable to connect to HDFS, got error: {}.",
+        hdfsGetLastError());
+  }
+
+  ~Impl() {
+    LOG(INFO) << "Disconnecting HDFS file system";
+    int disconnectResult = hdfsDisconnect(hdfsClient_);
+    if (disconnectResult != 0) {
+      LOG(WARNING) << "hdfs disconnect failure in HdfsFileSystem destructor: "
+                   << errno;
+    }
+  }
+
+  static HdfsServiceEndpoint getServiceEndpoint(const Config* config) {
+    auto hdfsHost = config->get<std::string>("hive.hdfs.host");
+    VELOX_CHECK(
+        hdfsHost.hasValue(),
+        "hdfsHost is empty, configuration missing for hdfs host");
+    auto hdfsPort = config->get<std::string>("hive.hdfs.port");
+    VELOX_CHECK(
+        hdfsPort.hasValue(),
+        "hdfsPort is empty, configuration missing for hdfs port");
+    HdfsServiceEndpoint endpoint{*hdfsHost, atoi(hdfsPort->data())};
+    return endpoint;
+  }
+
+  hdfsFS hdfsClient() {
+    return hdfsClient_;
+  }
+
+ private:
+  hdfsFS hdfsClient_;
+};
+
+HdfsFileSystem::HdfsFileSystem(const std::shared_ptr<const Config>& config)
+    : FileSystem(config) {
+  impl_ = std::make_shared<Impl>(config.get());
+}
+
+std::string HdfsFileSystem::name() const {
+  return "HDFS";
+}
+
+std::unique_ptr<ReadFile> HdfsFileSystem::openFileForRead(
+    std::string_view path) {
+  // Strip the "hdfs://" scheme and the host:port authority, leaving the
+  // absolute file path.
+  if (path.find(kScheme) == 0) {
+    path.remove_prefix(kScheme.length());
+  }
+  if (auto index = path.find('/')) {
+    path.remove_prefix(index);
+  }
+
+  return std::make_unique<HdfsReadFile>(impl_->hdfsClient(), path);
+}
+
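+// Illustrative example (values taken from the tests in this patch): for
+// "hdfs://localhost:7878/test_file.txt", openFileForRead() above strips the
+// scheme and the "localhost:7878" authority and opens "/test_file.txt" on
+// the already-connected namenode.
+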
+std::unique_ptr<WriteFile> HdfsFileSystem::openFileForWrite(
+    std::string_view path) {
+  VELOX_UNSUPPORTED("Write to HDFS is unsupported");
+}
+
+bool HdfsFileSystem::isHdfsFile(const std::string_view filename) {
+  return filename.find(kScheme) == 0;
+}
+
+static std::function<std::shared_ptr<FileSystem>(std::shared_ptr<const Config>)>
+    filesystemGenerator = [](std::shared_ptr<const Config> properties) {
+      static std::shared_ptr<FileSystem> filesystem;
+      folly::call_once(hdfsInitiationFlag, [&properties]() {
+        filesystem = std::make_shared<HdfsFileSystem>(properties);
+      });
+      return filesystem;
+    };
+
+void HdfsFileSystem::remove(std::string_view path) {
+  VELOX_UNSUPPORTED("Does not support removing files from hdfs");
+}
+
+void registerHdfsFileSystem() {
+  registerFileSystem(HdfsFileSystem::isHdfsFile, filesystemGenerator);
+}
+} // namespace facebook::velox::filesystems
diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h
new file mode 100644
index 000000000000..6c189f766f96
--- /dev/null
+++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "velox/common/file/FileSystems.h"
+
+namespace facebook::velox::filesystems {
+struct HdfsServiceEndpoint {
+  std::string host;
+  int port;
+};
+
+/**
+ * You can configure HDFS settings (timeouts etc.) via a configuration file,
+ * whose location is given by the environment variable LIBHDFS3_CONF, or which
+ * defaults to "hdfs-client.xml" in the working directory.
+ *
+ * Internally, hdfsBuilderConfSetStr can be used to configure the client.
+ */
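+// Illustrative sketch only, not part of this patch: a minimal
+// hdfs-client.xml. The property name below is an assumed libhdfs3 tunable,
+// shown only to indicate the file format:
+//
+//   <configuration>
+//     <property>
+//       <name>rpc.client.connect.timeout</name>
+//       <value>600</value>
+//     </property>
+//   </configuration>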
+class HdfsFileSystem : public FileSystem {
+ private:
+  static std::string_view kScheme;
+
+ public:
+  explicit HdfsFileSystem(const std::shared_ptr<const Config>& config);
+
+  std::string name() const override;
+
+  std::unique_ptr<ReadFile> openFileForRead(std::string_view path) override;
+
+  std::unique_ptr<WriteFile> openFileForWrite(std::string_view path) override;
+
+  void remove(std::string_view path) override;
+
+  static bool isHdfsFile(std::string_view filename);
+
+ protected:
+  class Impl;
+  std::shared_ptr<Impl> impl_;
+};
+
+// Registers the HDFS file system.
+void registerHdfsFileSystem();
+} // namespace facebook::velox::filesystems
diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp
new file mode 100644
index 000000000000..fed6df767928
--- /dev/null
+++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "HdfsReadFile.h"
+#include <fcntl.h>
+#include <glog/logging.h>
+
+namespace facebook::velox {
+
+HdfsReadFile::HdfsReadFile(hdfsFS hdfs, const std::string_view path)
+    : hdfsClient_(hdfs), filePath_(path) {
+  fileInfo_ = hdfsGetPathInfo(hdfsClient_, filePath_.data());
+  VELOX_CHECK_NOT_NULL(
+      fileInfo_,
+      "Unable to get file path info for file: {}. got error: {}",
+      filePath_,
+      hdfsGetLastError());
+}
+
+void HdfsReadFile::preadInternal(uint64_t offset, uint64_t length, char* pos)
+    const {
+  checkFileReadParameters(offset, length);
+  auto file = hdfsOpenFile(hdfsClient_, filePath_.data(), O_RDONLY, 0, 0, 0);
+  VELOX_CHECK_NOT_NULL(
+      file,
+      "Unable to open file {}. got error: {}",
+      filePath_,
+      hdfsGetLastError());
+  seekToPosition(file, offset);
+  // hdfsRead may return fewer bytes than requested, so loop until the whole
+  // range has been read.
+  uint64_t totalBytesRead = 0;
+  while (totalBytesRead < length) {
+    auto bytesRead = hdfsRead(hdfsClient_, file, pos, length - totalBytesRead);
+    VELOX_CHECK(bytesRead >= 0, "Read failure in HDFSReadFile::preadInternal.");
+    totalBytesRead += bytesRead;
+    pos += bytesRead;
+  }
+
+  if (hdfsCloseFile(hdfsClient_, file) == -1) {
+    LOG(ERROR) << "Unable to close file, errno: " << errno;
+  }
+}
+
+void HdfsReadFile::seekToPosition(hdfsFile file, uint64_t offset) const {
+  auto seekStatus = hdfsSeek(hdfsClient_, file, offset);
+  VELOX_CHECK_EQ(
+      seekStatus,
+      0,
+      "Cannot seek through HDFS file: {}, error: {}",
+      filePath_,
+      std::string(hdfsGetLastError()));
+}
+
+std::string_view
+HdfsReadFile::pread(uint64_t offset, uint64_t length, void* buf) const {
+  preadInternal(offset, length, static_cast<char*>(buf));
+  return {static_cast<char*>(buf), length};
+}
+
+std::string HdfsReadFile::pread(uint64_t offset, uint64_t length) const {
+  std::string result(length, 0);
+  char* pos = result.data();
+  preadInternal(offset, length, pos);
+  return result;
+}
+
+uint64_t HdfsReadFile::size() const {
+  return fileInfo_->mSize;
+}
+
+uint64_t HdfsReadFile::memoryUsage() const {
+  return fileInfo_->mBlockSize;
+}
+
+bool HdfsReadFile::shouldCoalesce() const {
+  return false;
+}
+
+void HdfsReadFile::checkFileReadParameters(uint64_t offset, uint64_t length)
+    const {
+  auto fileSize = size();
+  auto endPoint = offset + length;
+  VELOX_CHECK_GE(
+      fileSize,
+      endPoint,
+      "Cannot read HDFS file beyond its size: {}, offset: {}, end point: {}",
+      fileSize,
+      offset,
+      endPoint);
+}
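+
+// Usage sketch (illustrative only; assumes an hdfsFS connected via
+// hdfsBuilderConnect, as in HdfsFileSystem::Impl):
+//   HdfsReadFile file(fs, "/test_file.txt");
+//   std::string head = file.pread(0, 10); // reads the first 10 bytes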
+} // namespace facebook::velox
diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h
new file mode 100644
index 000000000000..d076df52ca74
--- /dev/null
+++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hdfs/hdfs.h>
+#include "velox/common/file/File.h"
+
+namespace facebook::velox {
+
+class HdfsReadFile final : public ReadFile {
+ public:
+  explicit HdfsReadFile(hdfsFS hdfs, std::string_view path);
+
+  std::string_view pread(uint64_t offset, uint64_t length, void* buf)
+      const final;
+
+  std::string pread(uint64_t offset, uint64_t length) const final;
+
+  uint64_t size() const final;
+
+  uint64_t memoryUsage() const final;
+
+  bool shouldCoalesce() const final;
+
+ private:
+  void preadInternal(uint64_t offset, uint64_t length, char* pos) const;
+  void seekToPosition(hdfsFile file, uint64_t offset) const;
+  void checkFileReadParameters(uint64_t offset, uint64_t length) const;
+  hdfsFS hdfsClient_;
+  hdfsFileInfo* fileInfo_;
+  std::string filePath_;
+};
+} // namespace facebook::velox
diff --git a/velox/connectors/hive/storage_adapters/hdfs/tests/CMakeLists.txt b/velox/connectors/hive/storage_adapters/hdfs/tests/CMakeLists.txt
new file mode 100644
index 000000000000..dea1e458a514
--- /dev/null
+++ b/velox/connectors/hive/storage_adapters/hdfs/tests/CMakeLists.txt
@@ -0,0 +1,28 @@
+# Copyright (c) Facebook, Inc. and its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+add_executable(velox_hdfs_file_test HdfsFileSystemTest.cpp HdfsMiniCluster.cpp)
+add_test(velox_hdfs_file_test velox_hdfs_file_test)
+target_link_libraries(
+  velox_hdfs_file_test
+  velox_file
+  velox_hdfs
+  velox_core
+  velox_exec_test_util
+  velox_hive_connector
+  velox_dwio_common_exception
+  velox_exec
+  gtest
+  gtest_main
+  gmock)
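+
+# Note: velox_hdfs_file_test starts a Hadoop minicluster, so it requires the
+# hadoop executable to be on the PATH (see HdfsMiniCluster.cpp).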
diff --git a/velox/connectors/hive/storage_adapters/hdfs/tests/HdfsFileSystemTest.cpp b/velox/connectors/hive/storage_adapters/hdfs/tests/HdfsFileSystemTest.cpp
new file mode 100644
index 000000000000..80719afb60f6
--- /dev/null
+++ b/velox/connectors/hive/storage_adapters/hdfs/tests/HdfsFileSystemTest.cpp
@@ -0,0 +1,360 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h"
+#include <boost/format.hpp>
+#include <gmock/gmock-matchers.h>
+#include <hdfs/hdfs.h>
+#include <atomic>
+#include <random>
+#include <thread>
+#include "HdfsMiniCluster.h"
+#include "gtest/gtest.h"
+#include "velox/common/file/File.h"
+#include "velox/common/file/FileSystems.h"
+#include "velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h"
+#include "velox/exec/tests/utils/TempFilePath.h"
+
+using namespace facebook::velox;
+
+constexpr int kOneMB = 1 << 20;
+static const std::string destinationPath = "/test_file.txt";
+static const std::string hdfsPort = "7878";
+static const std::string localhost = "localhost";
+static const std::string fullDestinationPath =
+    "hdfs://" + localhost + ":" + hdfsPort + destinationPath;
+static const std::unordered_map<std::string, std::string> configurationValues(
+    {{"hive.hdfs.host", localhost}, {"hive.hdfs.port", hdfsPort}});
+
+class HdfsFileSystemTest : public testing::Test {
+ public:
+  static void SetUpTestSuite() {
+    if (miniCluster == nullptr) {
+      miniCluster = std::make_shared<
+          facebook::velox::filesystems::test::HdfsMiniCluster>();
+      miniCluster->start();
+      auto tempFile = createFile();
+      miniCluster->addFile(tempFile->path, destinationPath);
+    }
+  }
+
+  void SetUp() override {
+    if (!miniCluster->isRunning()) {
+      miniCluster->start();
+    }
+  }
+
+  static void TearDownTestSuite() {
+    miniCluster->stop();
+  }
+  static std::atomic<bool> startThreads;
+  static std::shared_ptr<facebook::velox::filesystems::test::HdfsMiniCluster>
+      miniCluster;
+
+ private:
+  static std::shared_ptr<::exec::test::TempFilePath> createFile() {
+    auto tempFile = ::exec::test::TempFilePath::create();
+    tempFile->append("aaaaa");
+    tempFile->append("bbbbb");
+    tempFile->append(std::string(kOneMB, 'c'));
+    tempFile->append("ddddd");
+    return tempFile;
+  }
+};
+
+std::shared_ptr<facebook::velox::filesystems::test::HdfsMiniCluster>
+    HdfsFileSystemTest::miniCluster = nullptr;
+std::atomic<bool> HdfsFileSystemTest::startThreads = false;
+
+void readData(ReadFile* readFile) {
+  ASSERT_EQ(readFile->size(), 15 + kOneMB);
+  char buffer1[5];
+  ASSERT_EQ(readFile->pread(10 + kOneMB, 5, &buffer1), "ddddd");
+  char buffer2[10];
+  ASSERT_EQ(readFile->pread(0, 10, &buffer2), "aaaaabbbbb");
+  auto buffer3 = new char[kOneMB];
+  ASSERT_EQ(readFile->pread(10, kOneMB, buffer3), std::string(kOneMB, 'c'));
+  delete[] buffer3;
+  ASSERT_EQ(readFile->size(), 15 + kOneMB);
+  char buffer4[10];
+  const std::string_view arf = readFile->pread(5, 10, &buffer4);
+  const std::string zarf = readFile->pread(kOneMB, 15);
+  auto buf = std::make_unique<char[]>(8);
+  const std::string_view warf = readFile->pread(4, 8, buf.get());
+  const std::string_view warfFromBuf(buf.get(), 8);
+  ASSERT_EQ(arf, "bbbbbccccc");
+  ASSERT_EQ(zarf, "ccccccccccddddd");
+  ASSERT_EQ(warf, "abbbbbcc");
+  ASSERT_EQ(warfFromBuf, "abbbbbcc");
+}
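+
+// createFile() writes "aaaaa" + "bbbbb" + one MB of 'c' + "ddddd", so the
+// file is 15 + kOneMB bytes long; the offsets asserted in readData() above
+// follow from that layout.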
+
+void checkReadErrorMessages(
+    ReadFile* readFile,
+    std::string errorMessage,
+    int endpoint) {
+  try {
+    readFile->pread(10 + kOneMB, endpoint);
+    FAIL() << "expected VeloxException";
+  } catch (facebook::velox::VeloxException const& error) {
+    EXPECT_THAT(error.message(), testing::HasSubstr(errorMessage));
+  }
+  try {
+    auto buf = std::make_unique<char[]>(8);
+    readFile->pread(10 + kOneMB, endpoint, buf.get());
+    FAIL() << "expected VeloxException";
+  } catch (facebook::velox::VeloxException const& error) {
+    EXPECT_THAT(error.message(), testing::HasSubstr(errorMessage));
+  }
+}
+
+void verifyFailures(ReadFile* readFile) {
+  auto startPoint = 10 + kOneMB;
+  auto size = 15 + kOneMB;
+  auto endpoint = 10 + 2 * kOneMB;
+  auto offsetErrorMessage =
+      (boost::format(
+           "(%d vs. %d) Cannot read HDFS file beyond its size: %d, offset: %d, end point: %d") %
+       size % endpoint % size % startPoint % endpoint)
+          .str();
+  auto serverAddress = (boost::format("%s:%s") % localhost % hdfsPort).str();
+  auto readFailErrorMessage =
+      (boost::format(
+           "Unable to open file %s. got error: HdfsIOException: InputStreamImpl: cannot open file: %s.\t"
+           "Caused by: Hdfs::HdfsRpcException: HdfsFailoverException: Failed to invoke RPC call \"getBlockLocations\" on server \"%s\"\t\t"
+           "Caused by: HdfsNetworkConnectException: Connect to \"%s\" failed") %
+       destinationPath % destinationPath % serverAddress % serverAddress)
+          .str();
+  auto builderErrorMessage =
+      (boost::format(
+           "Unable to connect to HDFS, got error: Hdfs::HdfsRpcException: HdfsFailoverException: "
+           "Failed to invoke RPC call \"getFsStats\" on server \"%s\"\tCaused by: "
+           "HdfsNetworkConnectException: Connect to \"%s\" failed") %
+       serverAddress % serverAddress)
+          .str();
+  checkReadErrorMessages(readFile, offsetErrorMessage, kOneMB);
+  HdfsFileSystemTest::miniCluster->stop();
+  checkReadErrorMessages(readFile, readFailErrorMessage, 1);
+  try {
+    auto memConfig =
+        std::make_shared<core::MemConfig>(configurationValues);
+    facebook::velox::filesystems::HdfsFileSystem hdfsFileSystem(memConfig);
+    FAIL() << "expected VeloxException";
+  } catch (facebook::velox::VeloxException const& error) {
+    EXPECT_THAT(error.message(), testing::HasSubstr(builderErrorMessage));
+  }
+}
+
+TEST_F(HdfsFileSystemTest, read) {
+  struct hdfsBuilder* builder = hdfsNewBuilder();
+  hdfsBuilderSetNameNode(builder, localhost.c_str());
+  hdfsBuilderSetNameNodePort(builder, 7878);
+  auto hdfs = hdfsBuilderConnect(builder);
+  HdfsReadFile readFile(hdfs, destinationPath);
+  readData(&readFile);
+}
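+
+// The read test above drives HdfsReadFile directly through a libhdfs3
+// connection; the tests below exercise the same file through the registered
+// velox FileSystem instead.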
+
+TEST_F(HdfsFileSystemTest, viaFileSystem) {
+  facebook::velox::filesystems::registerHdfsFileSystem();
+  auto memConfig = std::make_shared<core::MemConfig>(configurationValues);
+  auto hdfsFileSystem =
+      filesystems::getFileSystem(fullDestinationPath, memConfig);
+  auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath);
+  readData(readFile.get());
+}
+
+TEST_F(HdfsFileSystemTest, missingFileViaFileSystem) {
+  try {
+    facebook::velox::filesystems::registerHdfsFileSystem();
+    auto memConfig =
+        std::make_shared<core::MemConfig>(configurationValues);
+    auto hdfsFileSystem =
+        filesystems::getFileSystem(fullDestinationPath, memConfig);
+    auto readFile = hdfsFileSystem->openFileForRead(
+        "hdfs://localhost:7777/path/that/does/not/exist");
+    FAIL() << "expected VeloxException";
+  } catch (facebook::velox::VeloxException const& error) {
+    EXPECT_THAT(
+        error.message(),
+        testing::HasSubstr(
+            "Unable to get file path info for file: /path/that/does/not/exist. got error: FileNotFoundException: Path /path/that/does/not/exist does not exist."));
+  }
+}
+
+TEST_F(HdfsFileSystemTest, missingHost) {
+  try {
+    facebook::velox::filesystems::registerHdfsFileSystem();
+    std::unordered_map<std::string, std::string> missingHostConfiguration(
+        {{"hive.hdfs.port", hdfsPort}});
+    auto memConfig =
+        std::make_shared<core::MemConfig>(missingHostConfiguration);
+    facebook::velox::filesystems::HdfsFileSystem hdfsFileSystem(memConfig);
+    FAIL() << "expected VeloxException";
+  } catch (facebook::velox::VeloxException const& error) {
+    EXPECT_THAT(
+        error.message(),
+        testing::HasSubstr(
+            "hdfsHost is empty, configuration missing for hdfs host"));
+  }
+}
+
+TEST_F(HdfsFileSystemTest, missingPort) {
+  try {
+    facebook::velox::filesystems::registerHdfsFileSystem();
+    std::unordered_map<std::string, std::string> missingPortConfiguration(
+        {{"hive.hdfs.host", localhost}});
+    auto memConfig =
+        std::make_shared<core::MemConfig>(missingPortConfiguration);
+    facebook::velox::filesystems::HdfsFileSystem hdfsFileSystem(memConfig);
+    FAIL() << "expected VeloxException";
+  } catch (facebook::velox::VeloxException const& error) {
+    EXPECT_THAT(
+        error.message(),
+        testing::HasSubstr(
+            "hdfsPort is empty, configuration missing for hdfs port"));
+  }
+}
+
+TEST_F(HdfsFileSystemTest, missingFileViaReadFile) {
+  try {
+    struct hdfsBuilder* builder = hdfsNewBuilder();
+    hdfsBuilderSetNameNode(builder, localhost.c_str());
+    hdfsBuilderSetNameNodePort(builder, std::stoi(hdfsPort));
+    auto hdfs = hdfsBuilderConnect(builder);
+    HdfsReadFile readFile(hdfs, "/path/that/does/not/exist");
+    FAIL() << "expected VeloxException";
+  } catch (facebook::velox::VeloxException const& error) {
+    EXPECT_THAT(
+        error.message(),
+        testing::HasSubstr(
+            "Unable to get file path info for file: /path/that/does/not/exist. got error: FileNotFoundException: Path /path/that/does/not/exist does not exist."));
+  }
+}
+
+TEST_F(HdfsFileSystemTest, schemeMatching) {
+  try {
+    auto fs =
+        std::dynamic_pointer_cast<facebook::velox::filesystems::HdfsFileSystem>(
+            filesystems::getFileSystem("/", nullptr));
+    FAIL() << "expected VeloxException";
+  } catch (facebook::velox::VeloxException const& error) {
+    EXPECT_THAT(
+        error.message(),
+        testing::HasSubstr(
+            "No registered file system matched with filename '/'"));
+  }
+  auto fs =
+      std::dynamic_pointer_cast<facebook::velox::filesystems::HdfsFileSystem>(
+          filesystems::getFileSystem(fullDestinationPath, nullptr));
+  ASSERT_TRUE(fs->isHdfsFile(fullDestinationPath));
+}
+
+TEST_F(HdfsFileSystemTest, writeNotSupported) {
+  try {
+    facebook::velox::filesystems::registerHdfsFileSystem();
+    auto memConfig =
+        std::make_shared<core::MemConfig>(configurationValues);
+    auto hdfsFileSystem =
+        filesystems::getFileSystem(fullDestinationPath, memConfig);
+    hdfsFileSystem->openFileForWrite("/path");
+    FAIL() << "expected VeloxException";
+  } catch (facebook::velox::VeloxException const& error) {
+    EXPECT_EQ(error.message(), "Write to HDFS is unsupported");
+  }
+}
+
+TEST_F(HdfsFileSystemTest, removeNotSupported) {
+  try {
+    facebook::velox::filesystems::registerHdfsFileSystem();
+    auto memConfig =
+        std::make_shared<core::MemConfig>(configurationValues);
+    auto hdfsFileSystem =
+        filesystems::getFileSystem(fullDestinationPath, memConfig);
+    hdfsFileSystem->remove("/path");
+    FAIL() << "expected VeloxException";
+  } catch (facebook::velox::VeloxException const& error) {
+    EXPECT_EQ(error.message(), "Does not support removing files from hdfs");
+  }
+}
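+
+// The two tests below gate 25 worker threads on the startThreads flag so
+// that reads begin roughly together, each after a randomized delay.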
+TEST_F(HdfsFileSystemTest, multipleThreadsWithReadFile) {
+  startThreads = false;
+  struct hdfsBuilder* builder = hdfsNewBuilder();
+  hdfsBuilderSetNameNode(builder, localhost.c_str());
+  hdfsBuilderSetNameNodePort(builder, 7878);
+  auto hdfs = hdfsBuilderConnect(builder);
+  HdfsReadFile readFile(hdfs, destinationPath);
+  std::vector<std::thread> threads;
+  std::mt19937 generator(std::random_device{}());
+  std::vector<int> sleepTimesInMicroseconds = {0, 500, 50000};
+  std::uniform_int_distribution<int> distribution(
+      0, sleepTimesInMicroseconds.size() - 1);
+  for (int i = 0; i < 25; i++) {
+    auto thread = std::thread(
+        [&readFile, &distribution, &generator, &sleepTimesInMicroseconds] {
+          int index = distribution(generator);
+          while (!HdfsFileSystemTest::startThreads) {
+            std::this_thread::yield();
+          }
+          std::this_thread::sleep_for(
+              std::chrono::microseconds(sleepTimesInMicroseconds[index]));
+          readData(&readFile);
+        });
+    threads.emplace_back(std::move(thread));
+  }
+  startThreads = true;
+  for (auto& thread : threads) {
+    thread.join();
+  }
+}
+
+TEST_F(HdfsFileSystemTest, multipleThreadsWithFileSystem) {
+  startThreads = false;
+  facebook::velox::filesystems::registerHdfsFileSystem();
+  auto memConfig = std::make_shared<core::MemConfig>(configurationValues);
+  auto hdfsFileSystem =
+      filesystems::getFileSystem(fullDestinationPath, memConfig);
+
+  std::vector<std::thread> threads;
+  std::mt19937 generator(std::random_device{}());
+  std::vector<int> sleepTimesInMicroseconds = {0, 500, 50000};
+  std::uniform_int_distribution<int> distribution(
+      0, sleepTimesInMicroseconds.size() - 1);
+  for (int i = 0; i < 25; i++) {
+    auto thread = std::thread([&hdfsFileSystem,
+                               &distribution,
+                               &generator,
+                               &sleepTimesInMicroseconds] {
+      int index = distribution(generator);
+      while (!HdfsFileSystemTest::startThreads) {
+        std::this_thread::yield();
+      }
+      std::this_thread::sleep_for(
+          std::chrono::microseconds(sleepTimesInMicroseconds[index]));
+      auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath);
+      readData(readFile.get());
+    });
+    threads.emplace_back(std::move(thread));
+  }
+  startThreads = true;
+  for (auto& thread : threads) {
+    thread.join();
+  }
+}
+
+TEST_F(HdfsFileSystemTest, readFailures) {
+  struct hdfsBuilder* builder = hdfsNewBuilder();
+  hdfsBuilderSetNameNode(builder, localhost.c_str());
+  hdfsBuilderSetNameNodePort(builder, stoi(hdfsPort));
+  auto hdfs = hdfsBuilderConnect(builder);
+  HdfsReadFile readFile(hdfs, destinationPath);
+  verifyFailures(&readFile);
+}
diff --git a/velox/connectors/hive/storage_adapters/hdfs/tests/HdfsMiniCluster.cpp b/velox/connectors/hive/storage_adapters/hdfs/tests/HdfsMiniCluster.cpp
new file mode 100644
index 000000000000..10ee508ba638
--- /dev/null
+++ b/velox/connectors/hive/storage_adapters/hdfs/tests/HdfsMiniCluster.cpp
@@ -0,0 +1,115 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "HdfsMiniCluster.h"
+
+namespace facebook::velox::filesystems::test {
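+// start() below is equivalent to running (argument constants are defined in
+// HdfsMiniCluster.h):
+//   hadoop jar \
+//     $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.10.1-tests.jar \
+//     minicluster -nomr -format -nnhttpport 7676 -nnport 7878 \
+//     -D dfs.permissions=false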
+void HdfsMiniCluster::start() {
+  try {
+    serverProcess_ = std::make_unique<boost::process::child>(
+        env_,
+        exePath_,
+        jarCommand,
+        env_["HADOOP_HOME"].to_string() + miniclusterJar,
+        miniclusterCommand,
+        noMapReduceOption,
+        formatNameNodeOption,
+        httpPortOption,
+        httpPort,
+        nameNodePortOption,
+        nameNodePort,
+        configurationOption,
+        turnOffPermissions);
+    serverProcess_->wait_for(std::chrono::duration<int, std::milli>(60000));
+    VELOX_CHECK_EQ(
+        serverProcess_->exit_code(),
+        383,
+        "Minicluster process exited, code: ",
+        serverProcess_->exit_code());
+  } catch (const std::exception& e) {
+    VELOX_FAIL("Failed to launch Minicluster server: {}", e.what());
+  }
+}
+
+void HdfsMiniCluster::stop() {
+  if (serverProcess_ && serverProcess_->valid()) {
+    serverProcess_->terminate();
+    serverProcess_->wait();
+    serverProcess_.reset();
+  }
+}
+
+bool HdfsMiniCluster::isRunning() {
+  if (serverProcess_) {
+    return true;
+  }
+  return false;
+}
+
+// Requires the hadoop executable to be on the PATH.
+HdfsMiniCluster::HdfsMiniCluster() {
+  env_ = (boost::process::environment)boost::this_process::environment();
+  env_["PATH"] = env_["PATH"].to_string() + hadoopSearchPath;
+  auto path = env_["PATH"].to_vector();
+  exePath_ = boost::process::search_path(
+      miniClusterExecutableName,
+      std::vector<boost::filesystem::path>(path.begin(), path.end()));
+  if (exePath_.empty()) {
+    VELOX_FAIL(
+        "Failed to find minicluster executable {}", miniClusterExecutableName);
+  }
+  boost::filesystem::path hadoopHomeDirectory = exePath_;
+  hadoopHomeDirectory.remove_leaf().remove_leaf();
+  setupEnvironment(hadoopHomeDirectory.string());
+}
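+
+// addFile() is equivalent to running:
+//   hadoop fs -fs hdfs://localhost:7878 -put <source> <destination>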
+void HdfsMiniCluster::addFile(std::string source, std::string destination) {
+  auto filePutProcess = std::make_shared<boost::process::child>(
+      env_,
+      exePath_,
+      filesystemCommand,
+      filesystemUrlOption,
+      filesystemUrl,
+      filePutOption,
+      source,
+      destination);
+  bool isExited =
+      filePutProcess->wait_for(std::chrono::duration<int, std::milli>(5000));
+  if (!isExited) {
+    VELOX_FAIL(
+        "Failed to add file to hdfs, exit code: {}",
+        filePutProcess->exit_code());
+  }
+}
+
+HdfsMiniCluster::~HdfsMiniCluster() {
+  stop();
+}
+
+void HdfsMiniCluster::setupEnvironment(const std::string& homeDirectory) {
+  env_["HADOOP_HOME"] = homeDirectory;
+  env_["HADOOP_INSTALL"] = homeDirectory;
+  env_["HADOOP_MAPRED_HOME"] = homeDirectory;
+  env_["HADOOP_COMMON_HOME"] = homeDirectory;
+  env_["HADOOP_HDFS_HOME"] = homeDirectory;
+  env_["YARN_HOME"] = homeDirectory;
+  env_["HADOOP_COMMON_LIB_NATIVE_DIR"] = homeDirectory + "/lib/native";
+  env_["HADOOP_PREFIX"] = homeDirectory;
+  env_["HADOOP_LIBEXEC_DIR"] = homeDirectory + "/libexec";
+  env_["HADOOP_CONF_DIR"] = homeDirectory + "/etc/hadoop";
+}
+} // namespace facebook::velox::filesystems::test
diff --git a/velox/connectors/hive/storage_adapters/hdfs/tests/HdfsMiniCluster.h b/velox/connectors/hive/storage_adapters/hdfs/tests/HdfsMiniCluster.h
new file mode 100644
index 000000000000..f48abd749a47
--- /dev/null
+++ b/velox/connectors/hive/storage_adapters/hdfs/tests/HdfsMiniCluster.h
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/core/Context.h"
+#include "velox/exec/tests/utils/TempDirectoryPath.h"
+
+#include <boost/filesystem.hpp>
+#include <iostream>
+#include <string>
+#include "boost/process.hpp"
+
+namespace facebook::velox::filesystems::test {
+static const std::string miniClusterExecutableName{"hadoop"};
+static const std::string hadoopSearchPath{":/usr/local/hadoop-2.10.1/bin"};
+static const std::string jarCommand{"jar"};
+static const std::string miniclusterJar{
+    "/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.10.1-tests.jar"};
+static const std::string miniclusterCommand{"minicluster"};
+static const std::string noMapReduceOption{"-nomr"};
+static const std::string formatNameNodeOption{"-format"};
+static const std::string httpPortOption{"-nnhttpport"};
+static const std::string httpPort{"7676"};
+static const std::string nameNodePortOption{"-nnport"};
+static const std::string nameNodePort{"7878"};
+static const std::string configurationOption{"-D"};
+static const std::string turnOffPermissions{"dfs.permissions=false"};
+static const std::string filesystemCommand{"fs"};
+static const std::string filesystemUrlOption{"-fs"};
+static const std::string filesystemUrl{"hdfs://localhost:" + nameNodePort};
+static const std::string filePutOption{"-put"};
+
+class HdfsMiniCluster {
+ public:
+  HdfsMiniCluster();
+
+  void start();
+
+  void stop();
+
+  bool isRunning();
+
+  void addFile(std::string source, std::string destination);
+  virtual ~HdfsMiniCluster();
+
+ private:
+  void setupEnvironment(const std::string& homeDirectory);
+
+  std::unique_ptr<::boost::process::child> serverProcess_;
+  boost::filesystem::path exePath_;
+  boost::process::environment env_;
+};
+} // namespace facebook::velox::filesystems::test
diff --git a/velox/exec/tests/utils/TempFilePath.h b/velox/exec/tests/utils/TempFilePath.h
index 936d83e17676..cf615b413138 100644
--- a/velox/exec/tests/utils/TempFilePath.h
+++ b/velox/exec/tests/utils/TempFilePath.h
@@ -17,6 +17,7 @@
 #include <fcntl.h>
 #include <stdio.h>
+#include <fstream>
 #include <memory>
 #include <string>
 
@@ -39,6 +40,13 @@ class TempFilePath {
   TempFilePath(const TempFilePath&) = delete;
   TempFilePath& operator=(const TempFilePath&) = delete;
 
+  void append(std::string data) {
+    std::ofstream file(path, std::ios_base::app);
+    file << data;
+    file.flush();
+    file.close();
+  }
+
  private:
   int fd