[VL] Update generic benchmark usage and doc #4485

Closed · wants to merge 3 commits
2 changes: 1 addition & 1 deletion .github/workflows/velox_be.yml
@@ -91,7 +91,7 @@ jobs:
- name: Run micro benchmarks
run: |
$PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh 'cd /opt/gluten/cpp/build/velox/benchmarks && \
-          ./generic_benchmark --with-shuffle --threads 1 --iterations 1'
+          ./generic_benchmark --run-example --with-shuffle --threads 1 --iterations 1'
- name: Exit docker container
if: ${{ always() }}
run: |
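Note: with this change, CI passes --run-example explicitly instead of relying on the old zero-argument fallback. A minimal local sketch of the same run, assuming the benchmark binary was built under cpp/build/velox/benchmarks as in the job above (adjust paths to your build tree):

  # Paths are illustrative; they assume a local Gluten cpp build.
  cd /path/to/gluten/cpp/build/velox/benchmarks
  ./generic_benchmark --run-example --with-shuffle --threads 1 --iterations 1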
NativeBenchmarkPlanGenerator.scala
@@ -19,6 +19,8 @@ package io.glutenproject.benchmarks
import io.glutenproject.GlutenConfig
import io.glutenproject.execution.{VeloxWholeStageTransformerSuite, WholeStageTransformer}

+import org.apache.spark.SparkConf
+import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec}
import org.apache.spark.sql.internal.SQLConf

@@ -50,6 +52,12 @@ class NativeBenchmarkPlanGenerator extends VeloxWholeStageTransformerSuite {
createTPCHNotNullTables()
}

+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.gluten.sql.debug", "true")
+      .set("spark.gluten.sql.columnar.backend.velox.glogSeverityLevel", "0")
+  }

test("Test plan json non-empty - AQE off") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
@@ -98,6 +106,7 @@ class NativeBenchmarkPlanGenerator extends VeloxWholeStageTransformerSuite {
SQLConf.SHUFFLE_PARTITIONS.key -> "2",
GlutenConfig.CACHE_WHOLE_STAGE_TRANSFORMER_CONTEXT.key -> "true"
) {
+      logWarning(s"Generating inputs for micro benchmark to $generatedPlanDir")
val q4_lineitem = spark
.sql(s"""
|select l_orderkey from lineitem where l_commitdate < l_receiptdate
@@ -128,27 +137,29 @@
spark.sql("""
|select * from q4_orders left semi join q4_lineitem on l_orderkey = o_orderkey
|""".stripMargin)

-      val executedPlan = df.queryExecution.executedPlan
-      executedPlan.execute()

-      val finalPlan =
-        executedPlan match {
-          case aqe: AdaptiveSparkPlanExec =>
-            aqe.executedPlan match {
-              case s: ShuffleQueryStageExec => s.shuffle.child
-              case other => other
-            }
-          case plan => plan
-        }
-      val lastStageTransformer = finalPlan.find(_.isInstanceOf[WholeStageTransformer])
-      assert(lastStageTransformer.nonEmpty)
-      val plan =
-        lastStageTransformer.get.asInstanceOf[WholeStageTransformer].substraitPlanJson.split('\n')

-      val exampleJsonFile = Paths.get(generatedPlanDir, "example.json")
-      Files.write(exampleJsonFile, plan.toList.asJava, StandardCharsets.UTF_8)
+      generateSubstraitJson(df, "example.json")
}
spark.sparkContext.setLogLevel(logLevel)
}

+  def generateSubstraitJson(df: DataFrame, file: String): Unit = {
+    val executedPlan = df.queryExecution.executedPlan
+    executedPlan.execute()
+    val finalPlan =
+      executedPlan match {
+        case aqe: AdaptiveSparkPlanExec =>
+          aqe.executedPlan match {
+            case s: ShuffleQueryStageExec => s.shuffle.child
+            case other => other
+          }
+        case plan => plan
+      }
+    val lastStageTransformer = finalPlan.find(_.isInstanceOf[WholeStageTransformer])
+    assert(lastStageTransformer.nonEmpty)
+    val plan =
+      lastStageTransformer.get.asInstanceOf[WholeStageTransformer].substraitPlanJson.split('\n')
+
+    val exampleJsonFile = Paths.get(generatedPlanDir, file)
+    Files.write(exampleJsonFile, plan.toList.asJava, StandardCharsets.UTF_8)
+  }
}
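The artifacts consumed by --run-example (example.json plus the example_orders and example_lineitem data under the generated-plan directory) are produced by this suite. A hedged sketch of the generation step, assuming the scalatest-maven-plugin wildcardSuites mechanism applies to this module — the profile names and module path here are assumptions, and docs/developers/MicroBenchmarks.md has the authoritative command:

  # Illustrative only: profiles and module path are assumed, not taken from this PR.
  mvn test -Pbackends-velox -pl backends-velox -am -Dtest=none \
    -DwildcardSuites=io.glutenproject.benchmarks.NativeBenchmarkPlanGenerator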
101 changes: 67 additions & 34 deletions cpp/velox/benchmarks/GenericBenchmark.cc
@@ -38,14 +38,18 @@
using namespace gluten;

namespace {
-DEFINE_bool(skip_input, false, "Skip specifying input files.");
DEFINE_bool(with_shuffle, false, "Add shuffle split at end.");
DEFINE_string(partitioning, "rr", "Short partitioning name. Valid options are rr, hash, range, single");
DEFINE_bool(zstd, false, "Use ZSTD as shuffle compression codec");
DEFINE_bool(qat_gzip, false, "Use QAT GZIP as shuffle compression codec");
DEFINE_bool(qat_zstd, false, "Use QAT ZSTD as shuffle compression codec");
DEFINE_bool(iaa_gzip, false, "Use IAA GZIP as shuffle compression codec");
DEFINE_int32(shuffle_partitions, 200, "Number of shuffle split (reducer) partitions");
+DEFINE_bool(run_example, false, "Run the example and exit.");

+DEFINE_string(plan, "", "Path to input json file of the substrait plan.");
+DEFINE_string(split, "", "Path to input json file of the splits. Only valid for simulating the first stage.");
+DEFINE_string(data, "", "Path to input data files in parquet format. Only valid for simulating the middle stage.");

struct WriterMetrics {
int64_t splitTime;
@@ -238,44 +242,72 @@ int main(int argc, char** argv) {
::benchmark::Initialize(&argc, argv);
gflags::ParseCommandLineFlags(&argc, &argv, true);

-  std::string substraitJsonFile;
-  std::string splitFile;
-  std::vector<std::string> inputFiles;
+  // Init Velox backend.
std::unordered_map<std::string, std::string> conf;

conf.insert({gluten::kSparkBatchSize, std::to_string(FLAGS_batch_size)});
+  conf.insert({kDebugModeEnabled, "true"});
initVeloxBackend(conf);

-  try {
-    if (argc < 2) {
-      LOG(INFO)
-          << "No input args. Usage: " << std::endl
-          << "./generic_benchmark /absolute-path/to/substrait_json_file /absolute-path/to/split_json_file(optional)"
-          << " /absolute-path/to/data_file_1 /absolute-path/to/data_file_2 ...";
-      LOG(INFO) << "Running example...";
-      inputFiles.resize(2);
+  // Parse substrait plan, split file and data files.
+  std::string substraitJsonFile = FLAGS_plan;
+  std::string splitFile = FLAGS_split;
+  std::vector<std::string> inputFiles{};
+  if (!FLAGS_data.empty()) {
+    inputFiles.emplace_back(FLAGS_data);
+  }
+  // Parse the remaining data files.
+  for (auto i = 1; i < argc; ++i) {
+    inputFiles.emplace_back(std::string(argv[i]));
+  }
+
+  if (FLAGS_run_example) {
+    LOG(INFO) << "Running example...";
+    inputFiles.resize(2);
+    try {
      substraitJsonFile = getGeneratedFilePath("example.json");
      inputFiles[0] = getGeneratedFilePath("example_orders");
      inputFiles[1] = getGeneratedFilePath("example_lineitem");
-    } else {
-      substraitJsonFile = argv[1];
-      splitFile = argv[2];
-      abortIfFileNotExists(substraitJsonFile);
-      LOG(INFO) << "Using substrait json file: " << std::endl << substraitJsonFile;
-      LOG(INFO) << "Using " << argc - 2 << " input data file(s): ";
-      for (auto i = 3; i < argc; ++i) {
-        inputFiles.emplace_back(argv[i]);
-        abortIfFileNotExists(inputFiles.back());
-        LOG(INFO) << inputFiles.back();
-      }
+    } catch (const std::exception& e) {
+      LOG(ERROR) << "Failed to run example. " << e.what();
+    }
-  } catch (const std::exception& e) {
-    LOG(INFO) << "Failed to run benchmark: " << e.what();
-    ::benchmark::Shutdown();
-  }

+  // Validate input args.
+  std::string errorMsg{};
+  if (substraitJsonFile.empty()) {
+    errorMsg = "Missing '--plan' option.";
+  } else if (splitFile.empty() && inputFiles.empty()) {
+    errorMsg = "Missing '--split' or '--data' option.";
+  } else if (!splitFile.empty() && !inputFiles.empty()) {
+    errorMsg = "Duplicated option '--split' and '--data'.";
+  }
+  if (!errorMsg.empty()) {
+    LOG(ERROR) << "Incorrect usage: " << errorMsg << std::endl
+               << "If simulating a first stage, the usage is:" << std::endl
+               << "./generic_benchmark "
+               << "--plan /absolute-path/to/substrait_json_file "
+               << "--split /absolute-path/to/split_json_file" << std::endl
+               << "If simulating a middle stage, the usage is:" << std::endl
+               << "./generic_benchmark "
+               << "--plan /absolute-path/to/substrait_json_file "
+               << "--data /absolute-path/to/data_file_1 /absolute-path/to/data_file_2 ...";
+    LOG(ERROR) << "*** Please check docs/developers/MicroBenchmarks.md for the full usage. ***";
+    std::exit(EXIT_FAILURE);
+  }

+  // Check whether input files exist.
+  LOG(INFO) << "Using substrait json file: " << std::endl << substraitJsonFile;
+  abortIfFileNotExists(substraitJsonFile);
+  if (!splitFile.empty()) {
+    LOG(INFO) << "Using split json file: " << std::endl << splitFile;
+    abortIfFileNotExists(splitFile);
+  }
+  LOG(INFO) << "Using " << inputFiles.size() << " input data file(s): ";
+  for (const auto& dataFile : inputFiles) {
+    LOG(INFO) << dataFile;
+    abortIfFileNotExists(dataFile);
+  }

#define GENERIC_BENCHMARK(NAME, READER_TYPE) \
do { \
auto* bm = \
@@ -292,14 +324,15 @@ int main(int argc, char** argv) {
} \
} while (0)

-  DLOG(INFO) << "FLAGS_threads:" << FLAGS_threads;
-  DLOG(INFO) << "FLAGS_iterations:" << FLAGS_iterations;
-  DLOG(INFO) << "FLAGS_cpu:" << FLAGS_cpu;
-  DLOG(INFO) << "FLAGS_print_result:" << FLAGS_print_result;
-  DLOG(INFO) << "FLAGS_write_file:" << FLAGS_write_file;
-  DLOG(INFO) << "FLAGS_batch_size:" << FLAGS_batch_size;
+  LOG(INFO) << "Using options: ";
+  LOG(INFO) << "threads: " << FLAGS_threads;
+  LOG(INFO) << "iterations: " << FLAGS_iterations;
+  LOG(INFO) << "cpu: " << FLAGS_cpu;
+  LOG(INFO) << "print_result: " << FLAGS_print_result;
+  LOG(INFO) << "write_file: " << FLAGS_write_file;
+  LOG(INFO) << "batch_size: " << FLAGS_batch_size;

-  if (FLAGS_skip_input) {
+  if (!splitFile.empty()) {
GENERIC_BENCHMARK("SkipInput", FileReaderType::kNone);
} else {
GENERIC_BENCHMARK("InputFromBatchVector", FileReaderType::kBuffered);
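Taken together, the old positional arguments are replaced by gflags, and the two simulation modes mirror the usage text above. A sketch of both invocations (all paths are placeholders):

  # First-stage simulation: a Substrait plan plus a split file.
  ./generic_benchmark --plan /absolute-path/to/substrait_json_file \
    --split /absolute-path/to/split_json_file

  # Middle-stage simulation: a Substrait plan plus parquet input data.
  # Extra positional files are appended to the input list by the argv loop above.
  ./generic_benchmark --plan /absolute-path/to/substrait_json_file \
    --data /absolute-path/to/data_file_1 /absolute-path/to/data_file_2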
9 changes: 0 additions & 9 deletions cpp/velox/benchmarks/data/generic_q5/q5_first_stage_0.json
@@ -138,15 +138,6 @@
}
]
}
},
"localFiles": {
"items": [
{
"uriFile": "LINEITEM",
"length": "1863237",
"parquet": {}
}
]
}
}
},
@@ -0,0 +1,9 @@
+{
+  "items": [
+    {
+      "uriFile": "LINEITEM",
+      "length": "1863237",
+      "parquet": {}
+    }
+  ]
+}
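This new file carries the localFiles split deleted from q5_first_stage_0.json above, so the split is now supplied separately via --split rather than embedded in the plan. Assuming the new file sits alongside the plan (its name is not shown in this view), the Q5 first stage would be driven roughly like:

  # The split-file name is a placeholder; this diff view does not show it.
  ./generic_benchmark \
    --plan /path/to/cpp/velox/benchmarks/data/generic_q5/q5_first_stage_0.json \
    --split /path/to/the-new-split-file.json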