diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala index fbd40812a7d5..3690e4e81db0 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala @@ -47,11 +47,6 @@ import scala.collection.JavaConverters._ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { - /** - * Generate native row partition. - * - * @return - */ override def genSplitInfo( partition: InputPartition, partitionSchema: StructType, @@ -95,6 +90,26 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { } } + /** + * Generate native partitions from the split infos of each scan. + * + * @return + */ + override def genPartitions( + wsCtx: WholeStageTransformContext, + splitInfos: Seq[Seq[SplitInfo]]): Seq[BaseGlutenPartition] = { + splitInfos.zipWithIndex.map { + case (splitInfos, index) => + wsCtx.substraitContext.initSplitInfosIndex(0) + wsCtx.substraitContext.setSplitInfos(splitInfos) + val substraitPlan = wsCtx.root.toProtobuf + GlutenPartition( + index, + substraitPlan.toByteArray, + locations = splitInfos.flatMap(_.preferredLocations().asScala).toArray) + } + } + /** * Generate Iterator[ColumnarBatch] for first stage. * * @return */ @@ -252,7 +267,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { GlutenPartition( index, substraitPlan.toByteArray, - splitInfo.preferredLocations().asScala.toArray) + locations = splitInfo.preferredLocations().asScala.toArray) } }(t => logInfo(s"Generating the Substrait plan took: $t ms.")) diff --git a/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala b/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala index 21a64187f857..1428b5c0a6d2 100644 --- a/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala @@ -49,7 +49,7 @@ class MixedAffinitySuite extends QueryTest with SharedSparkSession { } val partition = GlutenMergeTreePartition(0, "", "", "", "fakePath", 0, 0) val locations = affinity.getNativeMergeTreePartitionLocations(partition) - val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations) + val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations) assertResult(Set("forced_host_host-0")) { nativePartition.preferredLocations().toSet } diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala index baa2e904c315..e2bb878ffa3d 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala @@ -21,7 +21,7 @@ import io.glutenproject.backendsapi.IteratorApi import io.glutenproject.execution._ import io.glutenproject.metrics.IMetrics import io.glutenproject.substrait.plan.PlanNode -import io.glutenproject.substrait.rel.{LocalFilesBuilder, SplitInfo} +import io.glutenproject.substrait.rel.{LocalFilesBuilder, LocalFilesNode, SplitInfo} import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat import io.glutenproject.utils.Iterators import io.glutenproject.vectorized._ @@
-47,16 +47,12 @@ import java.net.URLDecoder import java.nio.charset.StandardCharsets import java.time.ZoneOffset import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap} +import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ class IteratorApiImpl extends IteratorApi with Logging { - /** - * Generate native row partition. - * - * @return - */ override def genSplitInfo( partition: InputPartition, partitionSchema: StructType, @@ -80,6 +76,24 @@ class IteratorApiImpl extends IteratorApi with Logging { } } + /** Generate native partitions from the split infos of each scan. */ + override def genPartitions( + wsCtx: WholeStageTransformContext, + splitInfos: Seq[Seq[SplitInfo]]): Seq[BaseGlutenPartition] = { + // Only serialize the plan once; this saves lots of time when the plan is complex. + val planByteArray = wsCtx.root.toProtobuf.toByteArray + + splitInfos.zipWithIndex.map { + case (splitInfos, index) => + GlutenPartition( + index, + planByteArray, + splitInfos.map(_.asInstanceOf[LocalFilesNode].toProtobuf.toByteArray).toArray, + splitInfos.flatMap(_.preferredLocations().asScala).toArray + ) + } + } + private def constructSplitInfo(schema: StructType, files: Array[PartitionedFile]) = { val paths = new JArrayList[String]() val starts = new JArrayList[JLong] @@ -121,11 +135,7 @@ class IteratorApiImpl extends IteratorApi with Logging { transKernel.injectWriteFilesTempPath(path) } - /** - * Generate Iterator[ColumnarBatch] for first stage. - * - * @return - */ + /** Generate Iterator[ColumnarBatch] for first stage. */ override def genFirstStageIterator( inputPartition: BaseGlutenPartition, context: TaskContext, @@ -133,13 +143,26 @@ class IteratorApiImpl extends IteratorApi with Logging { updateInputMetrics: (InputMetricsWrapper) => Unit, updateNativeMetrics: IMetrics => Unit, inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()): Iterator[ColumnarBatch] = { + assert( + inputPartition.isInstanceOf[GlutenPartition], + "Velox backend only accepts GlutenPartition.") + + val beforeBuild = System.nanoTime() val columnarNativeIterators = new JArrayList[GeneralInIterator](inputIterators.map { iter => new ColumnarBatchInIterator(iter.asJava) }.asJava) val transKernel = NativePlanEvaluator.create() + + val splitInfoByteArray = inputPartition + .asInstanceOf[GlutenPartition] + .splitInfosByteArray val resIter: GeneralOutIterator = - transKernel.createKernelWithBatchIterator(inputPartition.plan, columnarNativeIterators) + transKernel.createKernelWithBatchIterator( + inputPartition.plan, + splitInfoByteArray, + columnarNativeIterators) + pipelineTime += TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beforeBuild) Iterators .wrap(resIter.asScala) @@ -157,11 +180,7 @@ class IteratorApiImpl extends IteratorApi with Logging { // scalastyle:off argcount - /** - * Generate Iterator[ColumnarBatch] for final stage. - * - * @return - */ + /** Generate Iterator[ColumnarBatch] for final stage. */ override def genFinalStageIterator( context: TaskContext, inputIterators: Seq[Iterator[ColumnarBatch]], @@ -183,7 +202,10 @@ class IteratorApiImpl extends IteratorApi with Logging { val nativeResultIterator = transKernel.createKernelWithBatchIterator( rootNode.toProtobuf.toByteArray, - columnarNativeIterator) + // The final-stage iterator does not contain a scan split, so pass an empty split info array to native here.
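+ // First-stage scan splits travel separately through GlutenPartition.splitInfosByteArray.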
+ new Array[Array[Byte]](0), + columnarNativeIterator + ) Iterators .wrap(nativeResultIterator.asScala) diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h index b8134e833a5b..24f386b63aff 100644 --- a/cpp/core/compute/Runtime.h +++ b/cpp/core/compute/Runtime.h @@ -72,6 +72,8 @@ class Runtime : public std::enable_shared_from_this<Runtime> { virtual void injectWriteFilesTempPath(const std::string& path) = 0; + virtual void parseSplitInfo(const uint8_t* data, int32_t size) = 0; + // Just for benchmark ::substrait::Plan& getPlan() { return substraitPlan_; } @@ -140,6 +142,7 @@ protected: std::unique_ptr<ObjectStore> objStore_ = ObjectStore::create(); ::substrait::Plan substraitPlan_; + std::vector<::substrait::ReadRel_LocalFiles> localFiles_; std::optional<std::string> writeFilesTempPath_; SparkTaskInfo taskInfo_; // Session conf map diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 4da006db80c8..83d1a6285dce 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -368,6 +368,7 @@ Java_io_glutenproject_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWithI jobject wrapper, jlong memoryManagerHandle, jbyteArray planArr, + jobjectArray splitInfosArr, jobjectArray iterArr, jint stageId, jint partitionId, @@ -381,10 +382,17 @@ Java_io_glutenproject_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWithI auto spillDirStr = jStringToCString(env, spillDir); + for (jsize i = 0, splitInfoArraySize = env->GetArrayLength(splitInfosArr); i < splitInfoArraySize; i++) { + jbyteArray splitInfoArray = static_cast<jbyteArray>(env->GetObjectArrayElement(splitInfosArr, i)); + jsize splitInfoSize = env->GetArrayLength(splitInfoArray); + auto splitInfoData = reinterpret_cast<const uint8_t*>(env->GetByteArrayElements(splitInfoArray, nullptr)); + ctx->parseSplitInfo(splitInfoData, splitInfoSize); + } + auto planData = reinterpret_cast<const uint8_t*>(env->GetByteArrayElements(planArr, nullptr)); auto planSize = env->GetArrayLength(planArr); - ctx->parsePlan(planData, planSize, {stageId, partitionId, taskId}); + auto& conf = ctx->getConfMap(); // Handle the Java iters diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc index 428d39bacf37..9bbc22f522ec 100644 --- a/cpp/velox/benchmarks/GenericBenchmark.cc +++ b/cpp/velox/benchmarks/GenericBenchmark.cc @@ -109,7 +109,8 @@ void populateWriterMetrics( } // namespace auto BM_Generic = [](::benchmark::State& state, - const std::string& substraitJsonFile, + const std::string& planFile, + const std::string& splitFile, const std::vector<std::string>& inputFiles, const std::unordered_map<std::string, std::string>& conf, FileReaderType readerType) { @@ -119,11 +120,15 @@ auto BM_Generic = [](::benchmark::State& state, } else { setCpu(state.thread_index()); } + bool emptySplit = splitFile.empty(); memory::MemoryManager::testingSetInstance({}); auto memoryManager = getDefaultMemoryManager(); auto runtime = Runtime::create(kVeloxRuntimeKind, conf); - const auto& filePath = substraitJsonFile; - auto plan = getPlanFromFile(filePath); + auto plan = getPlanFromFile("Plan", planFile); + std::string split; + if (!emptySplit) { + split = getPlanFromFile("ReadRel.LocalFiles", splitFile); + } auto startTime = std::chrono::steady_clock::now(); int64_t collectBatchTime = 0; WriterMetrics writerMetrics{}; @@ -145,6 +150,9 @@ auto BM_Generic = [](::benchmark::State& state, } runtime->parsePlan(reinterpret_cast<const uint8_t*>(plan.data()), plan.size(), {}); + if (!emptySplit) { + runtime->parseSplitInfo(reinterpret_cast<const uint8_t*>(split.data()), split.size()); + } auto
resultIter = runtime->createResultIterator(memoryManager.get(), "/tmp/test-spill", std::move(inputIters), conf); auto veloxPlan = dynamic_cast<VeloxRuntime*>(runtime)->getVeloxPlan(); @@ -231,6 +239,7 @@ int main(int argc, char** argv) { gflags::ParseCommandLineFlags(&argc, &argv, true); std::string substraitJsonFile; + std::string splitFile; std::vector<std::string> inputFiles; std::unordered_map<std::string, std::string> conf; @@ -242,7 +251,8 @@ if (argc < 2) { LOG(INFO) << "No input args. Usage: " << std::endl - << "./generic_benchmark /absolute-path/to/substrait_json_file /absolute-path/to/data_file_1 /absolute-path/to/data_file_2 ..."; + << "./generic_benchmark /absolute-path/to/substrait_json_file /absolute-path/to/split_json_file(optional)" + << " /absolute-path/to/data_file_1 /absolute-path/to/data_file_2 ..."; LOG(INFO) << "Running example..."; inputFiles.resize(2); substraitJsonFile = getGeneratedFilePath("example.json"); inputFiles[0] = getGeneratedFilePath("example_orders"); inputFiles[1] = getGeneratedFilePath("example_lineitem"); } else { substraitJsonFile = argv[1]; + splitFile = argv[2]; abortIfFileNotExists(substraitJsonFile); LOG(INFO) << "Using substrait json file: " << std::endl << substraitJsonFile; LOG(INFO) << "Using " << argc - 2 << " input data file(s): "; - for (auto i = 2; i < argc; ++i) { + for (auto i = 3; i < argc; ++i) { inputFiles.emplace_back(argv[i]); abortIfFileNotExists(inputFiles.back()); LOG(INFO) << inputFiles.back(); @@ -265,19 +276,20 @@ std::exit(EXIT_FAILURE); } -#define GENERIC_BENCHMARK(NAME, READER_TYPE) \ - do { \ - auto* bm = ::benchmark::RegisterBenchmark(NAME, BM_Generic, substraitJsonFile, inputFiles, conf, READER_TYPE) \ - ->MeasureProcessCPUTime() \ - ->UseRealTime(); \ - if (FLAGS_threads > 0) { \ - bm->Threads(FLAGS_threads); \ - } else { \ - bm->ThreadRange(1, std::thread::hardware_concurrency()); \ - } \ - if (FLAGS_iterations > 0) { \ - bm->Iterations(FLAGS_iterations); \ - } \ +#define GENERIC_BENCHMARK(NAME, READER_TYPE) \ + do { \ + auto* bm = \ + ::benchmark::RegisterBenchmark(NAME, BM_Generic, substraitJsonFile, splitFile, inputFiles, conf, READER_TYPE) \ + ->MeasureProcessCPUTime() \ + ->UseRealTime(); \ + if (FLAGS_threads > 0) { \ + bm->Threads(FLAGS_threads); \ + } else { \ + bm->ThreadRange(1, std::thread::hardware_concurrency()); \ + } \ + if (FLAGS_iterations > 0) { \ + bm->Iterations(FLAGS_iterations); \ + } \ } while (0) DLOG(INFO) << "FLAGS_threads:" << FLAGS_threads; diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.cc b/cpp/velox/benchmarks/common/BenchmarkUtils.cc index 2620d2b9b1eb..888df451adb9 100644 --- a/cpp/velox/benchmarks/common/BenchmarkUtils.cc +++ b/cpp/velox/benchmarks/common/BenchmarkUtils.cc @@ -47,14 +47,14 @@ void initVeloxBackend() { initVeloxBackend(bmConfMap); } -std::string getPlanFromFile(const std::string& filePath) { +std::string getPlanFromFile(const std::string& type, const std::string& filePath) { // Read json file and resume the binary data.
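// "type" names the Substrait message to deserialize, e.g. "Plan" or "ReadRel.LocalFiles".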
std::ifstream msgJson(filePath); std::stringstream buffer; buffer << msgJson.rdbuf(); std::string msgData = buffer.str(); - return gluten::substraitFromJsonToPb("Plan", msgData); + return gluten::substraitFromJsonToPb(type, msgData); } velox::dwio::common::FileFormat getFileFormat(const std::string& fileFormat) { diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.h b/cpp/velox/benchmarks/common/BenchmarkUtils.h index 04823896f622..0bd09b947874 100644 --- a/cpp/velox/benchmarks/common/BenchmarkUtils.h +++ b/cpp/velox/benchmarks/common/BenchmarkUtils.h @@ -72,7 +72,7 @@ inline std::string getGeneratedFilePath(const std::string& fileName) { } /// Read binary data from a json file. -std::string getPlanFromFile(const std::string& filePath); +std::string getPlanFromFile(const std::string& type, const std::string& filePath); /// Get the file paths, starts, lengths from a directory. /// Use fileFormat to specify the format to read, eg., orc, parquet. diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index cef4cd039ed3..aaa59958cf68 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -18,7 +18,6 @@ #include "VeloxPlanConverter.h" #include -#include "arrow/c/bridge.h" #include "compute/ResultIterator.h" #include "config/GlutenConfig.h" #include "operators/plannodes/RowVectorStream.h" @@ -34,183 +33,86 @@ VeloxPlanConverter::VeloxPlanConverter( const std::unordered_map<std::string, std::string>& confMap, const std::optional<std::string> writeFilesTempPath, bool validationMode) - : inputIters_(inputIters), - validationMode_(validationMode), + : validationMode_(validationMode), substraitVeloxPlanConverter_(veloxPool, confMap, writeFilesTempPath, validationMode), - pool_(veloxPool) {} -void VeloxPlanConverter::setInputPlanNode(const ::substrait::WriteRel& writeRel) { - if (writeRel.has_input()) { - setInputPlanNode(writeRel.input()); - } else { - throw std::runtime_error("Child expected"); - } -} - -void VeloxPlanConverter::setInputPlanNode(const ::substrait::FetchRel& fetchRel) { - if (fetchRel.has_input()) { - setInputPlanNode(fetchRel.input()); - } else { - throw std::runtime_error("Child expected"); - } -} - -void VeloxPlanConverter::setInputPlanNode(const ::substrait::ExpandRel& sexpand) { - if (sexpand.has_input()) { - setInputPlanNode(sexpand.input()); - } else { - throw std::runtime_error("Child expected"); - } -} - -void VeloxPlanConverter::setInputPlanNode(const ::substrait::GenerateRel& sGenerate) { - if (sGenerate.has_input()) { - setInputPlanNode(sGenerate.input()); - } else { - throw std::runtime_error("Child expected"); - } + pool_(veloxPool) { + // Avoid including RowVectorStream.h in SubstraitToVeloxPlan.cpp; it may cause a redefinition conflict with abi.h.
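+ // The factory defers ValueStreamNode creation: the plan converter invokes it with a stream index whenever it meets an iterator-backed ReadRel.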
+ auto factory = [inputIters = std::move(inputIters)]( + std::string nodeId, memory::MemoryPool* pool, int32_t streamIdx, RowTypePtr outputType) { + VELOX_CHECK_LT(streamIdx, inputIters.size(), "Could not find stream index {} in input iterator list.", streamIdx); + auto vectorStream = std::make_shared<RowVectorStream>(pool, inputIters[streamIdx], outputType); + return std::make_shared<ValueStreamNode>(nodeId, outputType, std::move(vectorStream)); + }; + substraitVeloxPlanConverter_.setValueStreamNodeFactory(std::move(factory)); } -void VeloxPlanConverter::setInputPlanNode(const ::substrait::SortRel& ssort) { - if (ssort.has_input()) { - setInputPlanNode(ssort.input()); - } else { - throw std::runtime_error("Child expected"); - } -} - -void VeloxPlanConverter::setInputPlanNode(const ::substrait::WindowRel& swindow) { - if (swindow.has_input()) { - setInputPlanNode(swindow.input()); - } else { - throw std::runtime_error("Child expected"); +namespace { +std::shared_ptr<SplitInfo> parseScanSplitInfo( + const google::protobuf::RepeatedPtrField<::substrait::ReadRel_LocalFiles_FileOrFiles>& fileList) { + using SubstraitFileFormatCase = ::substrait::ReadRel_LocalFiles_FileOrFiles::FileFormatCase; + + auto splitInfo = std::make_shared<SplitInfo>(); + splitInfo->paths.reserve(fileList.size()); + splitInfo->starts.reserve(fileList.size()); + splitInfo->lengths.reserve(fileList.size()); + splitInfo->partitionColumns.reserve(fileList.size()); + for (const auto& file : fileList) { + // Expect all partitions to share the same index. + splitInfo->partitionIndex = file.partition_index(); + + std::unordered_map<std::string, std::string> partitionColumnMap; + for (const auto& partitionColumn : file.partition_columns()) { + partitionColumnMap[partitionColumn.key()] = partitionColumn.value(); + } + splitInfo->partitionColumns.emplace_back(partitionColumnMap); + + splitInfo->paths.emplace_back(file.uri_file()); + splitInfo->starts.emplace_back(file.start()); + splitInfo->lengths.emplace_back(file.length()); + switch (file.file_format_case()) { + case SubstraitFileFormatCase::kOrc: + splitInfo->format = dwio::common::FileFormat::ORC; + break; + case SubstraitFileFormatCase::kDwrf: + splitInfo->format = dwio::common::FileFormat::DWRF; + break; + case SubstraitFileFormatCase::kParquet: + splitInfo->format = dwio::common::FileFormat::PARQUET; + break; + case SubstraitFileFormatCase::kText: + splitInfo->format = dwio::common::FileFormat::TEXT; + break; + default: + splitInfo->format = dwio::common::FileFormat::UNKNOWN; + break; + } } + return splitInfo; } -void VeloxPlanConverter::setInputPlanNode(const ::substrait::AggregateRel& sagg) { - if (sagg.has_input()) { - setInputPlanNode(sagg.input()); - } else { - throw std::runtime_error("Child expected"); - } -} +void parseLocalFileNodes( + SubstraitToVeloxPlanConverter* planConverter, + std::vector<::substrait::ReadRel_LocalFiles>& localFiles) { + std::vector<std::shared_ptr<SplitInfo>> splitInfos; + splitInfos.reserve(localFiles.size()); + for (int32_t i = 0; i < localFiles.size(); i++) { + const auto& localFile = localFiles[i]; + const auto& fileList = localFile.items(); -void VeloxPlanConverter::setInputPlanNode(const ::substrait::ProjectRel& sproject) { - if (sproject.has_input()) { - setInputPlanNode(sproject.input()); - } else { - throw std::runtime_error("Child expected"); + splitInfos.push_back(std::move(parseScanSplitInfo(fileList))); } -} -void VeloxPlanConverter::setInputPlanNode(const ::substrait::FilterRel& sfilter) { - if (sfilter.has_input()) { - setInputPlanNode(sfilter.input()); - } else { - throw std::runtime_error("Child expected"); - } + planConverter->setSplitInfos(std::move(splitInfos)); } +} //
namespace -void VeloxPlanConverter::setInputPlanNode(const ::substrait::JoinRel& sjoin) { - if (sjoin.has_left()) { - setInputPlanNode(sjoin.left()); - } else { - throw std::runtime_error("Left child expected"); - } - - if (sjoin.has_right()) { - setInputPlanNode(sjoin.right()); - } else { - throw std::runtime_error("Right child expected"); - } -} - -void VeloxPlanConverter::setInputPlanNode(const ::substrait::ReadRel& sread) { - int32_t iterIdx = substraitVeloxPlanConverter_.getStreamIndex(sread); - if (iterIdx == -1) { - return; - } - - // Get the input schema of this iterator. - uint64_t colNum = 0; - std::vector<TypePtr> veloxTypeList; - if (sread.has_base_schema()) { - const auto& baseSchema = sread.base_schema(); - // Input names is not used. Instead, new input/output names will be created - // because the ValueStreamNode in Velox does not support name change. - colNum = baseSchema.names().size(); - veloxTypeList = SubstraitParser::parseNamedStruct(baseSchema); - } - - std::vector<std::string> outNames; - outNames.reserve(colNum); - for (int idx = 0; idx < colNum; idx++) { - auto colName = SubstraitParser::makeNodeName(planNodeId_, idx); - outNames.emplace_back(colName); - } - - std::shared_ptr<ResultIterator> iterator; +std::shared_ptr<const core::PlanNode> VeloxPlanConverter::toVeloxPlan( + const ::substrait::Plan& substraitPlan, + std::vector<::substrait::ReadRel_LocalFiles> localFiles) { if (!validationMode_) { - if (inputIters_.size() == 0) { - throw std::runtime_error("Invalid input iterator."); - } - iterator = inputIters_[iterIdx]; - } - - auto outputType = ROW(std::move(outNames), std::move(veloxTypeList)); - auto vectorStream = std::make_shared<RowVectorStream>(pool_, std::move(iterator), outputType); - auto valuesNode = std::make_shared<ValueStreamNode>(nextPlanNodeId(), outputType, std::move(vectorStream)); - substraitVeloxPlanConverter_.insertInputNode(iterIdx, valuesNode, planNodeId_); -} - -void VeloxPlanConverter::setInputPlanNode(const ::substrait::Rel& srel) { - if (srel.has_aggregate()) { - setInputPlanNode(srel.aggregate()); - } else if (srel.has_project()) { - setInputPlanNode(srel.project()); - } else if (srel.has_filter()) { - setInputPlanNode(srel.filter()); - } else if (srel.has_read()) { - setInputPlanNode(srel.read()); - } else if (srel.has_join()) { - setInputPlanNode(srel.join()); - } else if (srel.has_sort()) { - setInputPlanNode(srel.sort()); - } else if (srel.has_expand()) { - setInputPlanNode(srel.expand()); - } else if (srel.has_fetch()) { - setInputPlanNode(srel.fetch()); - } else if (srel.has_window()) { - setInputPlanNode(srel.window()); - } else if (srel.has_generate()) { - setInputPlanNode(srel.generate()); - } else if (srel.has_write()) { - setInputPlanNode(srel.write()); - } else { - throw std::runtime_error("Rel is not supported: " + srel.DebugString()); + parseLocalFileNodes(&substraitVeloxPlanConverter_, localFiles); } -void VeloxPlanConverter::setInputPlanNode(const ::substrait::RelRoot& sroot) { - // Output names can be got from RelRoot, but are not used currently. - if (sroot.has_input()) { - setInputPlanNode(sroot.input()); - } else { - throw std::runtime_error("Input is expected in RelRoot."); - } -} - -std::shared_ptr<const core::PlanNode> VeloxPlanConverter::toVeloxPlan( - ::substrait::Plan& substraitPlan) { - // In fact, only one RelRoot is expected here.
- for (auto& srel : substraitPlan.relations()) { - if (srel.has_root()) { - setInputPlanNode(srel.root()); - } - if (srel.has_rel()) { - setInputPlanNode(srel.rel()); - } - } auto veloxPlan = substraitVeloxPlanConverter_.toVeloxPlan(substraitPlan); DLOG(INFO) << "Plan Node: " << std::endl << veloxPlan->toString(true, true); return veloxPlan; diff --git a/cpp/velox/compute/VeloxPlanConverter.h b/cpp/velox/compute/VeloxPlanConverter.h index f8833728f65c..4cc0a88b6abe 100644 --- a/cpp/velox/compute/VeloxPlanConverter.h +++ b/cpp/velox/compute/VeloxPlanConverter.h @@ -36,45 +36,19 @@ class VeloxPlanConverter { const std::optional<std::string> writeFilesTempPath = std::nullopt, bool validationMode = false); - std::shared_ptr<const facebook::velox::core::PlanNode> toVeloxPlan(::substrait::Plan& substraitPlan); + std::shared_ptr<const facebook::velox::core::PlanNode> toVeloxPlan( + const ::substrait::Plan& substraitPlan, + std::vector<::substrait::ReadRel_LocalFiles> localFiles); const std::unordered_map<facebook::velox::core::PlanNodeId, std::shared_ptr<SplitInfo>>& splitInfos() { return substraitVeloxPlanConverter_.splitInfos(); } private: - void setInputPlanNode(const ::substrait::WriteRel& writeRel); - - void setInputPlanNode(const ::substrait::FetchRel& fetchRel); - - void setInputPlanNode(const ::substrait::ExpandRel& sExpand); - - void setInputPlanNode(const ::substrait::GenerateRel& sGenerate); - - void setInputPlanNode(const ::substrait::SortRel& sSort); - - void setInputPlanNode(const ::substrait::WindowRel& s); - - void setInputPlanNode(const ::substrait::AggregateRel& sagg); - - void setInputPlanNode(const ::substrait::ProjectRel& sproject); - - void setInputPlanNode(const ::substrait::FilterRel& sfilter); - - void setInputPlanNode(const ::substrait::JoinRel& sJoin); - - void setInputPlanNode(const ::substrait::ReadRel& sread); - - void setInputPlanNode(const ::substrait::Rel& srel); - - void setInputPlanNode(const ::substrait::RelRoot& sroot); std::string nextPlanNodeId(); int planNodeId_ = 0; - std::vector<std::shared_ptr<ResultIterator>> inputIters_; - bool validationMode_; SubstraitToVeloxPlanConverter substraitVeloxPlanConverter_; diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index ff7bd2543958..48593c95d076 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -51,6 +51,21 @@ void VeloxRuntime::parsePlan(const uint8_t* data, int32_t size, SparkTaskInfo ta GLUTEN_CHECK(parseProtobuf(data, size, &substraitPlan_) == true, "Parse substrait plan failed"); } +void VeloxRuntime::parseSplitInfo(const uint8_t* data, int32_t size) { + if (debugModeEnabled(confMap_)) { + try { + auto jsonPlan = substraitFromPbToJson("ReadRel.LocalFiles", data, size); + LOG(INFO) << std::string(50, '#') << " received substrait::ReadRel.LocalFiles:"; + LOG(INFO) << std::endl << jsonPlan; + } catch (const std::exception& e) { + LOG(WARNING) << "Error converting Substrait split info to JSON: " << e.what(); + } + } + ::substrait::ReadRel_LocalFiles localFile; + GLUTEN_CHECK(parseProtobuf(data, size, &localFile) == true, "Parse substrait split info failed"); + localFiles_.push_back(localFile); +} + void VeloxRuntime::getInfoAndIds( const std::unordered_map<velox::core::PlanNodeId, std::shared_ptr<SplitInfo>>& splitInfoMap, const std::unordered_set<velox::core::PlanNodeId>& leafPlanNodeIds, @@ -76,7 +91,7 @@ std::string VeloxRuntime::planString(bool details, const std::unordered_map<std::string, std::string>& sessionConf) { std::vector<std::shared_ptr<ResultIterator>> inputs; auto veloxMemoryPool = gluten::defaultLeafVeloxMemoryPool(); VeloxPlanConverter veloxPlanConverter(inputs, veloxMemoryPool.get(), sessionConf, std::nullopt, true); - auto veloxPlan = veloxPlanConverter.toVeloxPlan(substraitPlan_); + auto veloxPlan = veloxPlanConverter.toVeloxPlan(substraitPlan_, localFiles_); return
veloxPlan->toString(details, true); } @@ -95,7 +110,7 @@ std::shared_ptr<ResultIterator> VeloxRuntime::createResultIterator( VeloxPlanConverter veloxPlanConverter( inputs, getLeafVeloxPool(memoryManager).get(), sessionConf, writeFilesTempPath_); - veloxPlan_ = veloxPlanConverter.toVeloxPlan(substraitPlan_); + veloxPlan_ = veloxPlanConverter.toVeloxPlan(substraitPlan_, std::move(localFiles_)); // Scan node can be required. std::vector<std::shared_ptr<SplitInfo>> scanInfos; diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h index ed9d4f145af1..0c62bf966161 100644 --- a/cpp/velox/compute/VeloxRuntime.h +++ b/cpp/velox/compute/VeloxRuntime.h @@ -37,6 +37,8 @@ class VeloxRuntime final : public Runtime { void parsePlan(const uint8_t* data, int32_t size, SparkTaskInfo taskInfo) override; + void parseSplitInfo(const uint8_t* data, int32_t size) override; + static std::shared_ptr<facebook::velox::memory::MemoryPool> getAggregateVeloxPool(MemoryManager* memoryManager) { return toVeloxMemoryManager(memoryManager)->getAggregateMemoryPool(); } diff --git a/cpp/velox/operators/plannodes/RowVectorStream.h b/cpp/velox/operators/plannodes/RowVectorStream.h index b55d37380941..0975b9975fb5 100644 --- a/cpp/velox/operators/plannodes/RowVectorStream.h +++ b/cpp/velox/operators/plannodes/RowVectorStream.h @@ -17,7 +17,6 @@ #pragma once -#include #include "compute/ResultIterator.h" #include "memory/VeloxColumnarBatch.h" #include "velox/exec/Driver.h" @@ -30,7 +29,7 @@ class RowVectorStream { facebook::velox::memory::MemoryPool* pool, std::shared_ptr<ResultIterator> iterator, const facebook::velox::RowTypePtr& outputType) - : iterator_(iterator), outputType_(outputType), pool_(pool) {} + : iterator_(std::move(iterator)), outputType_(outputType), pool_(pool) {} bool hasNext() { return iterator_->hasNext(); diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 6717a3714bb2..aeb575cd431a 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -960,6 +960,36 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: } } +core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode( + const ::substrait::ReadRel& readRel, + int32_t streamIdx) { + // Get the input schema of this iterator. + uint64_t colNum = 0; + std::vector<TypePtr> veloxTypeList; + if (readRel.has_base_schema()) { + const auto& baseSchema = readRel.base_schema(); + // Input names are not used. Instead, new input/output names will be created + // because the ValueStreamNode in Velox does not support name change.
+ colNum = baseSchema.names().size(); + veloxTypeList = SubstraitParser::parseNamedStruct(baseSchema); + } + + std::vector<std::string> outNames; + outNames.reserve(colNum); + for (int idx = 0; idx < colNum; idx++) { + auto colName = SubstraitParser::makeNodeName(planNodeId_, idx); + outNames.emplace_back(colName); + } + + auto outputType = ROW(std::move(outNames), std::move(veloxTypeList)); + auto node = valueStreamNodeFactory_(nextPlanNodeId(), pool_, streamIdx, outputType); + + auto splitInfo = std::make_shared<SplitInfo>(); + splitInfo->isStream = true; + splitInfoMap_[node->id()] = splitInfo; + return node; +} + core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::ReadRel& readRel) { // emit is not allowed in TableScanNode and ValuesNode related // outputs @@ -968,21 +998,19 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: !readRel.common().has_emit(), "Emit not supported for ValuesNode and TableScanNode related Substrait plans."); } - // Check if the ReadRel specifies an input of stream. If yes, the pre-built - // input node will be used as the data source. - auto splitInfo = std::make_shared<SplitInfo>(); + // Check if the ReadRel specifies an input of stream. If yes, build ValueStreamNode as the data source. auto streamIdx = getStreamIndex(readRel); if (streamIdx >= 0) { - if (inputNodesMap_.find(streamIdx) == inputNodesMap_.end()) { - VELOX_FAIL("Could not find source index {} in input nodes map.", streamIdx); - } - auto streamNode = inputNodesMap_[streamIdx]; - splitInfo->isStream = true; - splitInfoMap_[streamNode->id()] = splitInfo; - return streamNode; + return constructValueStreamNode(readRel, streamIdx); } // Otherwise, will create TableScan node for ReadRel. + auto splitInfo = std::make_shared<SplitInfo>(); + if (!validationMode_) { + VELOX_CHECK_LT(splitInfoIdx_, splitInfos_.size(), "Plan must have readRel and related split info."); + splitInfo = splitInfos_[splitInfoIdx_++]; + } + // Get output names and types. std::vector<std::string> colNameList; std::vector<TypePtr> veloxTypeList; @@ -1005,48 +1033,6 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: isPartitionColumns = SubstraitParser::parsePartitionColumns(baseSchema); } - // Parse local files and construct split info. - if (readRel.has_local_files()) { - using SubstraitFileFormatCase = ::substrait::ReadRel_LocalFiles_FileOrFiles::FileFormatCase; - const auto& fileList = readRel.local_files().items(); - splitInfo->paths.reserve(fileList.size()); - splitInfo->starts.reserve(fileList.size()); - splitInfo->lengths.reserve(fileList.size()); - splitInfo->partitionColumns.reserve(fileList.size()); - for (const auto& file : fileList) { - // Expect all Partitions share the same index.
- splitInfo->partitionIndex = file.partition_index(); - - std::unordered_map<std::string, std::string> partitionColumnMap; - for (const auto& partitionColumn : file.partition_columns()) { - partitionColumnMap[partitionColumn.key()] = partitionColumn.value(); - } - splitInfo->partitionColumns.emplace_back(partitionColumnMap); - - splitInfo->paths.emplace_back(file.uri_file()); - splitInfo->starts.emplace_back(file.start()); - splitInfo->lengths.emplace_back(file.length()); - switch (file.file_format_case()) { - case SubstraitFileFormatCase::kOrc: - splitInfo->format = dwio::common::FileFormat::ORC; - break; - case SubstraitFileFormatCase::kDwrf: - splitInfo->format = dwio::common::FileFormat::DWRF; - break; - case SubstraitFileFormatCase::kParquet: - splitInfo->format = dwio::common::FileFormat::PARQUET; - break; - case SubstraitFileFormatCase::kText: - splitInfo->format = dwio::common::FileFormat::TEXT; - break; - default: - splitInfo->format = dwio::common::FileFormat::UNKNOWN; - break; - } - - fileFormat_ = splitInfo->format; - } - } // Do not hard-code connector ID and allow for connectors other than Hive. static const std::string kHiveConnectorId = "test-hive"; @@ -1286,6 +1272,9 @@ std::string SubstraitToVeloxPlanConverter::findFuncSpec(uint64_t id) { } int32_t SubstraitToVeloxPlanConverter::getStreamIndex(const ::substrait::ReadRel& sRead) { + if (validationMode_) { + return -1; + } if (sRead.has_local_files()) { const auto& fileList = sRead.local_files().items(); if (fileList.size() == 0) { @@ -1309,10 +1298,7 @@ int32_t SubstraitToVeloxPlanConverter::getStreamIndex(const ::substrait::ReadRel VELOX_FAIL(err.what()); } } - if (validationMode_) { - return -1; - } - VELOX_FAIL("Local file is expected."); + return -1; } void SubstraitToVeloxPlanConverter::extractJoinKeys( diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h index 4650383f6fbc..c0892bd587c2 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h @@ -19,12 +19,12 @@ #include "SubstraitToVeloxExpr.h" #include "TypeUtils.h" -#include "velox/connectors/hive/HiveConnector.h" #include "velox/connectors/hive/TableHandle.h" #include "velox/core/PlanNode.h" #include "velox/dwio/common/Options.h" namespace gluten { +class ResultIterator; // Holds names of Spark OffsetWindowFunctions. static const std::unordered_set<std::string> kOffsetWindowFunctions = {"nth_value"}; @@ -102,6 +102,8 @@ class SubstraitToVeloxPlanConverter { /// Lengths: the lengths in byte to read from the items. core::PlanNodePtr toVeloxPlan(const ::substrait::ReadRel& sRead); + core::PlanNodePtr constructValueStreamNode(const ::substrait::ReadRel& sRead, int32_t streamIdx); + /// Used to convert Substrait Rel into Velox PlanNode. core::PlanNodePtr toVeloxPlan(const ::substrait::Rel& sRel); @@ -138,6 +140,15 @@ planNodeId_ = planNodeId; } + void setSplitInfos(std::vector<std::shared_ptr<SplitInfo>> splitInfos) { + splitInfos_ = splitInfos; + } + + void setValueStreamNodeFactory( + std::function<core::PlanNodePtr(std::string, memory::MemoryPool*, int32_t, RowTypePtr)> factory) { + valueStreamNodeFactory_ = std::move(factory); + } + /// Used to check if ReadRel specifies an input of stream. /// If yes, the index of input stream will be returned. /// If not, -1 will be returned. @@ -527,9 +538,6 @@ class SubstraitToVeloxPlanConverter { /// The unique identification for each PlanNode.
int planNodeId_ = 0; - // used to check whether IsNotNull Filter is support - facebook::velox::dwio::common::FileFormat fileFormat_ = facebook::velox::dwio::common::FileFormat::UNKNOWN; /// The map storing the relations between the function id and the function /// name. Will be constructed based on the Substrait representation. std::unordered_map<uint64_t, std::string> functionMap_; @@ -537,11 +545,16 @@ /// The map storing the split stats for each PlanNode. std::unordered_map<core::PlanNodeId, std::shared_ptr<SplitInfo>> splitInfoMap_; + std::function<core::PlanNodePtr(std::string, memory::MemoryPool*, int32_t, RowTypePtr)> valueStreamNodeFactory_; + /// The map storing the pre-built plan nodes which can be accessed through /// index. This map is only used when the computation of a Substrait plan /// depends on other input nodes. std::unordered_map<uint64_t, core::PlanNodePtr> inputNodesMap_; + int32_t splitInfoIdx_{0}; + std::vector<std::shared_ptr<SplitInfo>> splitInfos_; + /// The Expression converter used to convert Substrait representations into /// Velox expressions. std::unique_ptr<SubstraitVeloxExprConverter> exprConverter_; diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc index 96a387beddc3..3355ac86ead0 100644 --- a/cpp/velox/tests/RuntimeTest.cc +++ b/cpp/velox/tests/RuntimeTest.cc @@ -27,6 +27,8 @@ class DummyRuntime final : public Runtime { void parsePlan(const uint8_t* data, int32_t size, SparkTaskInfo taskInfo) override {} + void parseSplitInfo(const uint8_t* data, int32_t size) override {} + std::shared_ptr<ResultIterator> createResultIterator( MemoryManager* memoryManager, const std::string& spillDir, diff --git a/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc b/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc index 0c2cda2fb5c6..8e4f2ee94bff 100644 --- a/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc +++ b/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc @@ -18,15 +18,14 @@ #include "JsonToProtoConverter.h" #include +#include "compute/VeloxPlanConverter.h" #include "memory/VeloxMemoryManager.h" #include "substrait/SubstraitToVeloxPlan.h" -#include "substrait/SubstraitToVeloxPlanValidator.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/dwio/common/tests/utils/DataFiles.h" #include "velox/dwio/dwrf/reader/DwrfReader.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" -#include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/type/Type.h" @@ -41,9 +40,8 @@ namespace gluten { class Substrait2VeloxPlanConversionTest : public exec::test::HiveConnectorTestBase { protected: std::vector<std::shared_ptr<velox::connector::ConnectorSplit>> makeSplits( - const SubstraitToVeloxPlanConverter& converter, std::shared_ptr<const core::PlanNode> planNode) { - const auto& splitInfos = converter.splitInfos(); + const auto& splitInfos = planConverter_->splitInfos(); auto leafPlanNodeIds = planNode->leafPlanNodeIds(); // Only one leaf node is expected here. EXPECT_EQ(1, leafPlanNodeIds.size()); @@ -72,8 +70,10 @@ } std::shared_ptr<exec::test::TempDirectoryPath> tmpDir_{exec::test::TempDirectoryPath::create()}; - std::shared_ptr<SubstraitToVeloxPlanConverter> planConverter_ = - std::make_shared<SubstraitToVeloxPlanConverter>(gluten::defaultLeafVeloxMemoryPool().get()); + std::shared_ptr<VeloxPlanConverter> planConverter_ = std::make_shared<VeloxPlanConverter>( + std::vector<std::shared_ptr<ResultIterator>>(), + gluten::defaultLeafVeloxMemoryPool().get(), + std::unordered_map<std::string, std::string>()); }; // This test will firstly generate mock TPC-H lineitem ORC file. Then, Velox's @@ -229,29 +229,34 @@ TEST_F(Substrait2VeloxPlanConversionTest, q6) { // Find and deserialize Substrait plan json file.
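// The scan splits now live in a companion *_split.json file instead of inside the plan JSON.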
std::string subPlanPath = FilePathGenerator::getDataFilePath("q6_first_stage.json"); + std::string splitPath = FilePathGenerator::getDataFilePath("q6_first_stage_split.json"); // Read q6_first_stage.json and resume the Substrait plan. ::substrait::Plan substraitPlan; JsonToProtoConverter::readFromFile(subPlanPath, substraitPlan); + ::substrait::ReadRel_LocalFiles split; + JsonToProtoConverter::readFromFile(splitPath, split); // Convert to Velox PlanNode. - auto planNode = planConverter_->toVeloxPlan(substraitPlan); - + auto planNode = planConverter_->toVeloxPlan(substraitPlan, std::vector<::substrait::ReadRel_LocalFiles>{split}); auto expectedResult = makeRowVector({ makeFlatVector<double>(1, [](auto /*row*/) { return 13613.1921; }), }); - exec::test::AssertQueryBuilder(planNode).splits(makeSplits(*planConverter_, planNode)).assertResults(expectedResult); + exec::test::AssertQueryBuilder(planNode).splits(makeSplits(planNode)).assertResults(expectedResult); } TEST_F(Substrait2VeloxPlanConversionTest, ifthenTest) { std::string subPlanPath = FilePathGenerator::getDataFilePath("if_then.json"); + std::string splitPath = FilePathGenerator::getDataFilePath("if_then_split.json"); ::substrait::Plan substraitPlan; JsonToProtoConverter::readFromFile(subPlanPath, substraitPlan); + ::substrait::ReadRel_LocalFiles split; + JsonToProtoConverter::readFromFile(splitPath, split); // Convert to Velox PlanNode. - auto planNode = planConverter_->toVeloxPlan(substraitPlan); + auto planNode = planConverter_->toVeloxPlan(substraitPlan, std::vector<::substrait::ReadRel_LocalFiles>{split}); ASSERT_EQ( "-- Project[expressions: ] -> \n " "-- TableScan[table: hive_table, range filters: [(hd_demo_sk, Filter(IsNotNull, deterministic, null not allowed))," @@ -264,12 +269,15 @@ TEST_F(Substrait2VeloxPlanConversionTest, filterUpper) { std::string subPlanPath = FilePathGenerator::getDataFilePath("filter_upper.json"); + std::string splitPath = FilePathGenerator::getDataFilePath("filter_upper_split.json"); ::substrait::Plan substraitPlan; JsonToProtoConverter::readFromFile(subPlanPath, substraitPlan); + ::substrait::ReadRel_LocalFiles split; + JsonToProtoConverter::readFromFile(splitPath, split); // Convert to Velox PlanNode.
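// toVeloxPlan now receives the deserialized ReadRel.LocalFiles splits alongside the Substrait plan.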
- auto planNode = planConverter_->toVeloxPlan(substraitPlan); + auto planNode = planConverter_->toVeloxPlan(substraitPlan, std::vector<::substrait::ReadRel_LocalFiles>{split}); ASSERT_EQ( "-- Project[expressions: ] -> \n -- TableScan[table: hive_table, range filters: " "[(key, BigintRange: [-2147483648, 2] no nulls)]] -> n0_0:INTEGER\n", diff --git a/cpp/velox/tests/data/filter_upper.json b/cpp/velox/tests/data/filter_upper.json index f21fbe86db2c..22032b6d934a 100644 --- a/cpp/velox/tests/data/filter_upper.json +++ b/cpp/velox/tests/data/filter_upper.json @@ -96,14 +96,6 @@ } ] } - }, - "localFiles": { - "items": [{ - "uriFile": "file:///tmp/file.parquet", - "length": "1486", - "parquet": {} - } - ] } } }, diff --git a/cpp/velox/tests/data/filter_upper_split.json b/cpp/velox/tests/data/filter_upper_split.json new file mode 100644 index 000000000000..1a2e818d3d37 --- /dev/null +++ b/cpp/velox/tests/data/filter_upper_split.json @@ -0,0 +1,8 @@ +{ + "items": [{ + "uriFile": "file:///tmp/file.parquet", + "length": "1486", + "parquet": {} + } + ] +} \ No newline at end of file diff --git a/cpp/velox/tests/data/if_then.json b/cpp/velox/tests/data/if_then.json index ddac339b28a5..58f696215824 100644 --- a/cpp/velox/tests/data/if_then.json +++ b/cpp/velox/tests/data/if_then.json @@ -367,14 +367,6 @@ } ] } - }, - "localFiles": { - "items": [{ - "uriFile": "file:///tmp/tmp_file", - "length": "31979", - "parquet": {} - } - ] } } }, diff --git a/cpp/velox/tests/data/if_then_split.json b/cpp/velox/tests/data/if_then_split.json new file mode 100644 index 000000000000..f2352fb3137a --- /dev/null +++ b/cpp/velox/tests/data/if_then_split.json @@ -0,0 +1,8 @@ +{ + "items": [{ + "uriFile": "file:///tmp/tmp_file", + "length": "31979", + "parquet": {} + } + ] +} \ No newline at end of file diff --git a/cpp/velox/tests/data/q6_first_stage.json b/cpp/velox/tests/data/q6_first_stage.json index 24733dd0f185..031793a2b181 100644 --- a/cpp/velox/tests/data/q6_first_stage.json +++ b/cpp/velox/tests/data/q6_first_stage.json @@ -484,17 +484,6 @@ } } } - }, - "local_files": { - "items": [ - { - "partition_index": "0", - "start": "0", - "length": "3719", - "uri_file": "/mock_lineitem.dwrf", - "dwrf": {} - } - ] } } }, diff --git a/cpp/velox/tests/data/q6_first_stage_split.json b/cpp/velox/tests/data/q6_first_stage_split.json new file mode 100644 index 000000000000..7b02e4dc01d2 --- /dev/null +++ b/cpp/velox/tests/data/q6_first_stage_split.json @@ -0,0 +1,11 @@ +{ + "items": [ + { + "partition_index": "0", + "start": "0", + "length": "3719", + "uri_file": "/mock_lineitem.dwrf", + "dwrf": {} + } + ] +} \ No newline at end of file diff --git a/docs/developers/MicroBenchmarks.md b/docs/developers/MicroBenchmarks.md index 6826feae6a43..f3dc564308ca 100644 --- a/docs/developers/MicroBenchmarks.md +++ b/docs/developers/MicroBenchmarks.md @@ -12,7 +12,7 @@ A micro benchmark for Velox backend is provided in Gluten Cpp to simulate the ex It serves as a more convenient alternative to debug in Gluten Cpp comparing with directly debugging in a Spark job. Developers can use it to create their own workloads, debug in native process, profile the hotspot and do optimizations. -To simulate a first stage, you need to dump the Substrait plan into a JSON file. The input URIs should be exising file locations, which can be either local or HDFS paths. +To simulate a first stage, you need to dump the Substrait plan and split info into two JSON files. The input URIs should be existing file locations, which can be either local or HDFS paths.
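+For reference, the split JSON file is simply a `ReadRel.LocalFiles` message in JSON form. A minimal example (the path below is only an illustration) looks like: + +```json +{ + "items": [{ + "uriFile": "file:///tmp/example.parquet", + "length": "1486", + "parquet": {} + } + ] +} +```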
To simulate a middle stage, in addition to the JSON file, you also need to save the input data of this stage into Parquet files. The benchmark will load the data into Arrow format, then add Arrow2Velox to feed @@ -116,20 +116,15 @@ Task stageId: 2, partitionId: 855, taskId: 857; {"extensions":[{"extensionFuncti Save the Substrait plan to a JSON file, suppose the name is "plan.json". -If you are simulating a first stage, the inputs should be exising file locations, which can be either local or HDFS paths. +If you are simulating a first stage, the executor's stdout should contain the split info, which includes the input file locations; these can be either local or HDFS paths. -```json -"localFiles": { - "items": [ - { - "uriFile": "file:///path_to_gluten/cpp/velox/benchmarks/data/tpch_sf10m/lineitem/part-00000-6c374e0a-7d76-401b-8458-a8e31f8ab704-c000.snappy.parquet", - "length": "1863237", - "parquet": {} - } - ] -} +```shell +################################################## received substrait::ReadRel.LocalFiles: +{"items":[{"uriFile":"hdfs://host:9000/user/hadoop/tpch_sf10/lineitem/part-00000-34fbf25c-7909-476d-a63a-b2b56f281c07-c000.snappy.parquet","partitionIndex":"2","start":"302582158","length":"151291079","parquet":{},"schema":{}}]} ``` +Dump it to a JSON file, suppose the name is "split.json". + Run benchmark. The first arg is the absolute path to JSON file. You should use `--skip-input` to skip specifying input data files used for first stages. By default, the result will be printed to stdout. You can use `--noprint-result` to suppress this output. ```shell cd /path_to_gluten/cpp/build/velox/benchmarks ./generic_benchmark \ /plan/to/plan.json \ +/plan/to/split.json \ --threads 1 --skip-input --noprint-result ``` @@ -154,7 +150,7 @@ You need to re-run the query with below configuraions to dump the input data fil Suppose the dumped input files are /tmp/save/input_34_0_1.parquet and /tmp/save/input_34_0_2.parquet. Please use spark to combine the 2 files to 1 file. -```java +```scala val df = spark.read.format("parquet").load("/tmp/save") df.repartition(1).write.format("parquet").save("/tmp/new_save") ``` diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java index f781ed2b04e3..7be14fdde309 100644 --- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java +++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java @@ -108,7 +108,7 @@ public List<String> preferredLocations() { public ReadRel.LocalFiles toProtobuf() { ReadRel.LocalFiles.Builder localFilesBuilder = ReadRel.LocalFiles.newBuilder(); // The input is iterator, and the path is in the format of: Iterator:index.
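// For example, "Iterator:0" refers to the first upstream input iterator.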
- if (iterAsInput && paths.size() > 0) { + if (iterAsInput && !paths.isEmpty()) { ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder = ReadRel.LocalFiles.FileOrFiles.newBuilder(); fileBuilder.setUriFile(paths.get(0)); diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/RelBuilder.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/RelBuilder.java index 0584317bd736..5bbf796a093b 100644 --- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/RelBuilder.java +++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/RelBuilder.java @@ -93,6 +93,7 @@ public static RelNode makeAggregateRel( return new AggregateRelNode(input, groupings, aggregateFunctionNodes, filters, extensionNode); } + // CH public static RelNode makeReadRel( List<TypeNode> types, List<String> names, ExpressionNode filter, SubstraitContext context, Long operatorId) { return makeReadRel(types, names, null, filter, context, operatorId); } @@ -102,6 +103,7 @@ + // VL public static RelNode makeReadRel( List<TypeNode> types, List<String> names, diff --git a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto index ef1542c08c9b..2f88ab5ab522 100644 --- a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto +++ b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto @@ -159,16 +159,16 @@ message ReadRel { DwrfReadOptions dwrf = 13; TextReadOptions text = 14; JsonReadOptions json = 15; - } + } - message partitionColumn { + message partitionColumn { string key = 1; string value = 2; - } - repeated partitionColumn partition_columns = 16; + } + repeated partitionColumn partition_columns = 16; - /// File schema - NamedStruct schema = 17; + /// File schema + NamedStruct schema = 17; } } } diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala index 688755fb8ae2..f38466d92f99 100644 --- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala +++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala @@ -34,16 +34,16 @@ import org.apache.spark.sql.vectorized.ColumnarBatch trait IteratorApi { - /** - * Generate native row partition. - * - * @return - */ def genSplitInfo( partition: InputPartition, partitionSchema: StructType, fileFormat: ReadFileFormat): SplitInfo + /** Generate native partitions from the split infos of each scan. */ + def genPartitions( + wsCtx: WholeStageTransformContext, + splitInfos: Seq[Seq[SplitInfo]]): Seq[BaseGlutenPartition] + /** * Inject the task attempt temporary path for native write files, this method should be called * before `genFirstStageIterator` or `genFinalStageIterator` @@ -53,8 +53,6 @@ trait IteratorApi { /** * Generate Iterator[ColumnarBatch] for first stage. ("first" means it does not depend on other * SCAN inputs) - * - * @return */ def genFirstStageIterator( inputPartition: BaseGlutenPartition, @@ -68,8 +66,6 @@ trait IteratorApi { /** * Generate Iterator[ColumnarBatch] for final stage.
("Final" means it depends on other SCAN * inputs, maybe it was a mistake to use the word "final") - * - * @return */ // scalastyle:off argcount def genFinalStageIterator( diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala b/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala index 34b0b8f599ff..0aa4eec7fff7 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala @@ -38,6 +38,7 @@ trait BaseGlutenPartition extends Partition with InputPartition { case class GlutenPartition( index: Int, plan: Array[Byte], + splitInfosByteArray: Array[Array[Byte]] = Array.empty[Array[Byte]], locations: Array[String] = Array.empty[String]) extends BaseGlutenPartition { @@ -111,9 +112,11 @@ class GlutenWholeStageColumnarRDD( } override protected def getPartitions: Array[Partition] = { - Array.tabulate[Partition](inputPartitions.size) { - i => FirstZippedPartitionsPartition(i, inputPartitions(i), rdds.getPartitions(i)) - } + inputPartitions.zipWithIndex + .map { + case (partition, i) => FirstZippedPartitionsPartition(i, partition, rdds.getPartitions(i)) + } + .toArray[Partition] } override protected def clearDependencies(): Unit = { diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala index 95249fcf0ca9..229ea26e74bc 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala @@ -28,7 +28,7 @@ import io.glutenproject.substrait.plan.{PlanBuilder, PlanNode} import io.glutenproject.substrait.rel.{RelNode, SplitInfo} import io.glutenproject.utils.SubstraitPlanPrinterUtil -import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkConf, TaskContext} +import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} @@ -39,7 +39,6 @@ import org.apache.spark.sql.vectorized.ColumnarBatch import com.google.common.collect.Lists -import scala.collection.JavaConverters._ import scala.collection.mutable case class TransformContext( @@ -292,23 +291,14 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f * care of SCAN there won't be any other RDD for SCAN. 
As a result, genFirstStageIterator * rather than genFinalStageIterator will be invoked */ val allScanSplitInfos = getSplitInfosFromScanTransformer(basicScanExecTransformers) - val (wsCxt, substraitPlanPartitions) = GlutenTimeMetric.withMillisTime { - val wsCxt = doWholeStageTransform() - - // generate each partition of all scan exec - val substraitPlanPartitions = allScanSplitInfos.zipWithIndex.map { - case (splitInfos, index) => - wsCxt.substraitContext.initSplitInfosIndex(0) - wsCxt.substraitContext.setSplitInfos(splitInfos) - val substraitPlan = wsCxt.root.toProtobuf - GlutenPartition( - index, - substraitPlan.toByteArray, - splitInfos.flatMap(_.preferredLocations().asScala).toArray) - } - (wsCxt, substraitPlanPartitions) + + val (wsCtx, inputPartitions) = GlutenTimeMetric.withMillisTime { + val wsCtx = doWholeStageTransform() + val partitions = + BackendsApiManager.getIteratorApiInstance.genPartitions(wsCtx, allScanSplitInfos) + + (wsCtx, partitions) }( t => logOnLevel( @@ -317,15 +307,15 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f new GlutenWholeStageColumnarRDD( sparkContext, - substraitPlanPartitions, + inputPartitions, inputRDDs, pipelineTime, leafMetricsUpdater().updateInputMetrics, BackendsApiManager.getMetricsApiInstance.metricsUpdatingFunction( child, - wsCxt.substraitContext.registeredRelMap, - wsCxt.substraitContext.registeredJoinParams, - wsCxt.substraitContext.registeredAggregationParams + wsCtx.substraitContext.registeredRelMap, + wsCtx.substraitContext.registeredJoinParams, + wsCtx.substraitContext.registeredAggregationParams ) ) } else { diff --git a/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala b/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala index bcf00b1bfb41..1c514ddfd8da 100644 --- a/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala +++ b/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala @@ -64,7 +64,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate partition.files.map(_.filePath.toString), partition.preferredLocations()) - val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations) + val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations) assertResult(Set("host-1", "host-2", "host-3")) { nativePartition.preferredLocations().toSet } @@ -95,7 +95,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate partition.files.map(_.filePath.toString), partition.preferredLocations()) - val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations) + val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations) assertResult(Set("host-1", "host-4", "host-5")) { nativePartition.preferredLocations().toSet @@ -127,7 +127,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate partition.files.map(_.filePath.toString), partition.preferredLocations()) - val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations) + val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations) assertResult(Set("executor_host-2_2", "executor_host-1_0")) { nativePartition.preferredLocations().toSet @@ -150,7 +150,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate partition.files.map(_.filePath.toString), partition.preferredLocations()) - val nativePartition = GlutenPartition(0,
PlanBuilder.EMPTY_PLAN, locations) + val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations) assertResult(Set("executor_host-1_1")) { nativePartition.preferredLocations().toSet @@ -182,7 +182,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate partition.files.map(_.filePath.toString), partition.preferredLocations()) - val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations) + val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations) assertResult(Set("host-1", "host-5", "host-6")) { nativePartition.preferredLocations().toSet diff --git a/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java b/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java index eac84f5ae3db..e8e5243cdd91 100644 --- a/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java +++ b/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java @@ -66,7 +66,8 @@ public void injectWriteFilesTempPath(String path) { // Used by WholeStageTransform to create the native computing pipeline and // return a columnar result iterator. public GeneralOutIterator createKernelWithBatchIterator( - byte[] wsPlan, List<GeneralInIterator> iterList) throws RuntimeException, IOException { + byte[] wsPlan, byte[][] splitInfo, List<GeneralInIterator> iterList) + throws RuntimeException, IOException { final AtomicReference<GeneralOutIterator> outIterator = new AtomicReference<>(); final NativeMemoryManager nmm = NativeMemoryManagers.create( @@ -102,6 +103,7 @@ public Set<Phase> applicablePhases() { jniWrapper.nativeCreateKernelWithIterator( memoryManagerHandle, wsPlan, + splitInfo, iterList.toArray(new GeneralInIterator[0]), TaskContext.get().stageId(), TaskContext.getPartitionId(), diff --git a/gluten-data/src/main/java/io/glutenproject/vectorized/PlanEvaluatorJniWrapper.java b/gluten-data/src/main/java/io/glutenproject/vectorized/PlanEvaluatorJniWrapper.java index 46115c191555..6fbee973fd09 100644 --- a/gluten-data/src/main/java/io/glutenproject/vectorized/PlanEvaluatorJniWrapper.java +++ b/gluten-data/src/main/java/io/glutenproject/vectorized/PlanEvaluatorJniWrapper.java @@ -67,6 +67,7 @@ public long handle() { public native long nativeCreateKernelWithIterator( long memoryManagerHandle, byte[] wsPlan, + byte[][] splitInfo, GeneralInIterator[] batchItr, int stageId, int partitionId,