Skip to content

Commit

Permalink
[Enhancement] Stream load/routine support load channel profile (StarRocks#55490)
Browse files Browse the repository at this point in the history

Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy authored Feb 12, 2025
1 parent bd0d111 commit ca8ee31
Show file tree
Hide file tree
Showing 14 changed files with 352 additions and 22 deletions.
24 changes: 21 additions & 3 deletions be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,32 @@ Status OlapTableSink::init(const TDataSink& t_sink, RuntimeState* state) {
_colocate_mv_index = table_sink.enable_colocate_mv_index && config::enable_load_colocate_mv;
}

// Query context is only available for pipeline engine
auto query_ctx = state->query_ctx();
if (query_ctx) {
if (state->query_ctx()) {
// Query context is only available for pipeline engine (insert/broker load)
auto query_ctx = state->query_ctx();
_load_channel_profile_config.set_enable_profile(query_ctx->get_enable_profile_flag());
_load_channel_profile_config.set_big_query_profile_threshold_ns(
query_ctx->get_big_query_profile_threshold_ns());
_load_channel_profile_config.set_runtime_profile_report_interval_ns(
query_ctx->get_runtime_profile_report_interval_ns());
} else {
// For non-pipeline engine (stream load/routine load), get the profile config from query options
auto& query_options = state->query_options();
bool enable_profile = query_options.__isset.enable_profile && query_options.enable_profile;
int64_t load_profile_collect_second =
query_options.__isset.load_profile_collect_second ? query_options.load_profile_collect_second : -1;
// when enable_profile and load_profile_collect_second are set, use big query threshold to control the profile
if (enable_profile && load_profile_collect_second > 0) {
_load_channel_profile_config.set_enable_profile(false);
_load_channel_profile_config.set_big_query_profile_threshold_ns(load_profile_collect_second * 1e9);
} else if (enable_profile) {
_load_channel_profile_config.set_enable_profile(true);
_load_channel_profile_config.set_big_query_profile_threshold_ns(-1);
} else {
_load_channel_profile_config.set_enable_profile(false);
_load_channel_profile_config.set_big_query_profile_threshold_ns(-1);
}
_load_channel_profile_config.set_runtime_profile_report_interval_ns(std::numeric_limits<int64_t>::max());
}
return Status::OK();
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/tablet_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ class OlapTableSink : public AsyncDataSink {

// Returns the raw TabletSinkProfile pointer stored in _ts_profile.
TabletSinkProfile* ts_profile() const { return _ts_profile; }

// Read-only view of the load channel profile config held by this sink.
// The returned reference points at the _load_channel_profile_config member, so it must not
// be used after the sink is destroyed.
const PLoadChannelProfileConfig& load_channel_profile_config() const { return _load_channel_profile_config; }

private:
void _prepare_profile(RuntimeState* state);

Expand Down
14 changes: 10 additions & 4 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ class FragmentExecState {
// Returns a copy of the shared_ptr stored in _runtime_state.
std::shared_ptr<RuntimeState> runtime_state() { return _runtime_state; }

private:
void coordinator_callback(const Status& status, RuntimeProfile* profile, bool done);
void coordinator_callback(const Status& status, RuntimeProfile* profile, RuntimeProfile* load_channel_profile,
bool done);

std::shared_ptr<RuntimeState> _runtime_state = nullptr;

Expand Down Expand Up @@ -169,8 +170,9 @@ FragmentExecState::FragmentExecState(const TUniqueId& query_id, const TUniqueId&
_backend_num(backend_num),
_exec_env(exec_env),
_coord_addr(coord_addr),
_executor(exec_env, std::bind<void>(std::mem_fn(&FragmentExecState::coordinator_callback), this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)) {
// coordinator_callback takes four arguments (status, profile, load_channel_profile, done),
// so the bound callable must forward four distinct placeholders. Repeating _3 would pass the
// load_channel_profile pointer (implicitly converted to bool) as `done` and drop the real flag.
_executor(exec_env,
std::bind<void>(std::mem_fn(&FragmentExecState::coordinator_callback), this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)) {
_start_time = DateTimeValue::local_time();
}

Expand Down Expand Up @@ -230,7 +232,8 @@ std::string FragmentExecState::to_http_path(const std::string& file_name) {
// it is only invoked from the executor's reporting thread.
// Also, the reported status will always reflect the most recent execution status,
// including the final status when execution finishes.
void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfile* profile, bool done) {
void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfile* profile,
RuntimeProfile* load_channel_profile, bool done) {
DCHECK(status.ok() || done); // if !status.ok() => done
Status exec_status = update_status(status);

Expand All @@ -256,6 +259,9 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
profile->to_thrift(&params.profile);
params.__isset.profile = true;

load_channel_profile->to_thrift(&params.load_channel_profile);
params.__isset.load_channel_profile = true;

if (!runtime_state->output_files().empty()) {
params.__isset.delta_urls = true;
for (auto& it : runtime_state->output_files()) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/plan_fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ void PlanFragmentExecutor::send_report(bool done) {
// This will send a report even if we are cancelled. If the query completed correctly
// but fragments still need to be cancelled (e.g. limit reached), the coordinator will
// be waiting for a final report and profile.
_report_status_cb(status, profile(), done || !status.ok());
_report_status_cb(status, profile(), _runtime_state->load_channel_profile(), done || !status.ok());
}

Status PlanFragmentExecutor::_get_next_internal_vectorized(ChunkPtr* chunk) {
Expand Down
4 changes: 3 additions & 1 deletion be/src/runtime/plan_fragment_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ class PlanFragmentExecutor {
// Note: this does not take a const RuntimeProfile&, because it might need to call
// functions like PrettyPrint() or to_thrift(), neither of which is const
// because they take locks.
typedef std::function<void(const Status& status, RuntimeProfile* profile, bool done)> report_status_callback;
typedef std::function<void(const Status& status, RuntimeProfile* profile, RuntimeProfile* load_channel_profile,
bool done)>
report_status_callback;

// if report_status_cb is not empty, is used to report the accumulated profile
// information periodically during execution open().
Expand Down
1 change: 1 addition & 0 deletions be/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ set(EXEC_FILES
./exec/parquet_scanner_test.cpp
./exec/repeat_node_test.cpp
./exec/sorting_test.cpp
./exec/tablet_sink_index_channel_test.cpp
./exec/table_function_node_test.cpp
./exec/olap_scan_prepare_test.cpp
./exprs/agg/json_each_test.cpp
Expand Down
215 changes: 215 additions & 0 deletions be/test/exec/tablet_sink_index_channel_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/tablet_sink_index_channel.h"

#include <gtest/gtest.h>

#include "exec/tablet_info.h"
#include "exec/tablet_sink.h"
#include "runtime/descriptor_helper.h"
#include "storage/chunk_helper.h"
#include "testutil/assert.h"
#include "util/thrift_util.h"

namespace starrocks {

// Test fixture that prepares a minimal thrift description of an OLAP table sink
// (one index, one partition, one tablet, one node) plus the matching descriptor table,
// so individual tests only need to supply a RuntimeState.
class TabletSinkIndexChannelTest : public testing::Test {
public:
    // Rebuilds all shared inputs before every test case.
    void SetUp() override {
        _exec_env = ExecEnv::GetInstance();
        _object_pool = std::make_unique<ObjectPool>();

        _db_id = 1;
        _table_id = 2;
        _txn_id = 3;

        // _build_data_sink() reads the ids and _desc_tbl, so it has to run last.
        _desc_tbl = _build_descriptor_table();
        _data_sink = _build_data_sink();
    }

    // Shared verification routine; defined out of line below the class.
    void test_load_channel_profile_base(RuntimeState* runtime_state, const PLoadChannelProfileConfig& expect_config);

protected:
    // Creates a RuntimeState from the given query options, using a default-constructed
    // fragment id and default-constructed query globals.
    std::unique_ptr<RuntimeState> _build_runtime_state(TQueryOptions& query_options) {
        TQueryGlobals globals;
        TUniqueId fragment_instance_id;
        return std::make_unique<RuntimeState>(fragment_instance_id, query_options, globals, _exec_env);
    }

    // Builds a descriptor table containing a single tuple with two slots:
    // "c1" (TYPE_INT, position 1) and "c2" (TYPE_BIGINT, position 2).
    TDescriptorTable _build_descriptor_table() {
        TDescriptorTableBuilder table_builder;
        TTupleDescriptorBuilder tuple;
        const auto c1_slot = TSlotDescriptorBuilder().type(TYPE_INT).column_name("c1").column_pos(1).build();
        const auto c2_slot = TSlotDescriptorBuilder().type(TYPE_BIGINT).column_name("c2").column_pos(2).build();
        tuple.add_slot(c1_slot);
        tuple.add_slot(c2_slot);
        tuple.build(&table_builder);
        return table_builder.desc_tbl();
    }

    // Assembles the TDataSink handed to OlapTableSink::init(). Fields are assigned directly
    // (no __set_ helpers) except for the final olap_table_sink attachment.
    TDataSink _build_data_sink() {
        TOlapTableSink olap_sink;

        // Top-level sink identity.
        olap_sink.load_id.hi = 0;
        olap_sink.load_id.lo = 0;
        olap_sink.db_id = _db_id;
        olap_sink.db_name = "test";
        olap_sink.table_id = _table_id;
        olap_sink.table_name = "test";
        olap_sink.txn_id = _txn_id;
        olap_sink.num_replicas = 1;
        olap_sink.keys_type = TKeysType::DUP_KEYS;
        olap_sink.tuple_id = _desc_tbl.tupleDescriptors[0].id;

        // Schema: a single index (id 0) over both columns.
        TOlapTableSchemaParam& schema_param = olap_sink.schema;
        schema_param.db_id = _db_id;
        schema_param.table_id = _table_id;
        schema_param.version = 0;
        schema_param.tuple_desc = _desc_tbl.tupleDescriptors[0];
        schema_param.slot_descs = _desc_tbl.slotDescriptors;
        schema_param.indexes.resize(1);
        auto& only_index = schema_param.indexes[0];
        only_index.id = 0;
        only_index.columns = {"c1", "c2"};

        // Partitioning: one partition with one bucket whose index 0 owns tablet 0.
        TOlapTablePartitionParam& partition_param = olap_sink.partition;
        partition_param.db_id = _db_id;
        partition_param.table_id = _table_id;
        partition_param.version = 0;
        partition_param.distributed_columns.push_back("c1");
        partition_param.partitions.resize(1);
        auto& only_partition = partition_param.partitions[0];
        only_partition.id = 0;
        only_partition.num_buckets = 1;
        only_partition.indexes.resize(1);
        only_partition.indexes[0].index_id = 0;
        only_partition.indexes[0].tablets.push_back(0);

        // Location: tablet 0 is placed on node 0.
        TOlapTableLocationParam& location_param = olap_sink.location;
        location_param.db_id = _db_id;
        location_param.table_id = _table_id;
        location_param.version = 0;
        location_param.tablets.resize(1);
        auto& only_tablet = location_param.tablets[0];
        only_tablet.tablet_id = 0;
        only_tablet.node_ids.push_back(0);

        // Node list: a single node with a fixed address.
        TNodesInfo& nodes = olap_sink.nodes_info;
        nodes.version = 0;
        nodes.nodes.resize(1);
        auto& only_node = nodes.nodes[0];
        only_node.id = 0;
        only_node.option = 0;
        only_node.host = "10.128.8.78";
        only_node.async_internal_port = 8060;

        // Attach only after olap_sink is fully populated; the references above alias into it.
        TDataSink result;
        result.__set_olap_table_sink(olap_sink);
        return result;
    }

    // Identifiers assigned in SetUp().
    int64_t _db_id;
    int64_t _table_id;
    int64_t _txn_id;

    ExecEnv* _exec_env;
    std::unique_ptr<ObjectPool> _object_pool;
    TDescriptorTable _desc_tbl;
    TDataSink _data_sink;
};

// Creates an OlapTableSink from the fixture's data sink, runs init() and prepare() on it with
// `runtime_state`, and asserts that the sink's load channel profile config matches `expect_config`.
//
// For each of the three config fields, presence (has_*) must match; the value itself is only
// compared when `expect_config` has the field set.
//
// Note: the ASSERT_* macros only return from this helper on failure, not from the calling test.
void TabletSinkIndexChannelTest::test_load_channel_profile_base(RuntimeState* runtime_state,
                                                                const PLoadChannelProfileConfig& expect_config) {
    DescriptorTbl* desc_tbl = nullptr;
    ASSERT_OK(
            DescriptorTbl::create(runtime_state, _object_pool.get(), _desc_tbl, &desc_tbl, config::vector_chunk_size));
    runtime_state->set_desc_tbl(desc_tbl);

    auto sink = std::make_unique<OlapTableSink>(_object_pool.get(), std::vector<TExpr>(), nullptr, runtime_state);
    ASSERT_OK(sink->init(_data_sink, runtime_state));
    ASSERT_OK(sink->prepare(runtime_state));

    // The accessor returns a const reference into `sink`, which stays alive until the end of
    // this function, so bind it by reference instead of copying the protobuf message.
    const PLoadChannelProfileConfig& actual_config = sink->load_channel_profile_config();

    ASSERT_EQ(expect_config.has_enable_profile(), actual_config.has_enable_profile())
            << "presence mismatch for enable_profile";
    if (expect_config.has_enable_profile()) {
        ASSERT_EQ(expect_config.enable_profile(), actual_config.enable_profile());
    }

    ASSERT_EQ(expect_config.has_big_query_profile_threshold_ns(), actual_config.has_big_query_profile_threshold_ns())
            << "presence mismatch for big_query_profile_threshold_ns";
    if (expect_config.has_big_query_profile_threshold_ns()) {
        ASSERT_EQ(expect_config.big_query_profile_threshold_ns(), actual_config.big_query_profile_threshold_ns());
    }

    ASSERT_EQ(expect_config.has_runtime_profile_report_interval_ns(),
              actual_config.has_runtime_profile_report_interval_ns())
            << "presence mismatch for runtime_profile_report_interval_ns";
    if (expect_config.has_runtime_profile_report_interval_ns()) {
        ASSERT_EQ(expect_config.runtime_profile_report_interval_ns(),
                  actual_config.runtime_profile_report_interval_ns());
    }
}

// Exercises the sink when the RuntimeState has no query context attached (this test never calls
// set_query_ctx), so the profile config is expected to be derived from TQueryOptions alone.
// In every scenario the expected report interval is int64 max.
TEST_F(TabletSinkIndexChannelTest, non_pipeline_load_channel_profile) {
    // Integer arithmetic keeps the expected nanosecond values exact and avoids an implicit
    // double -> int64 conversion at the protobuf setter.
    constexpr int64_t kNanosPerSecond = 1000000000LL;
    constexpr int64_t kNoThreshold = -1;

    // Runs one scenario; SCOPED_TRACE tags any failure raised inside the shared helper with
    // the scenario name so the four cases can be told apart in the test output.
    const auto run_scenario = [this](const char* scenario, TQueryOptions query_options, bool expect_enable_profile,
                                     int64_t expect_threshold_ns) {
        SCOPED_TRACE(scenario);
        auto runtime_state = _build_runtime_state(query_options);
        PLoadChannelProfileConfig expect_config;
        expect_config.set_enable_profile(expect_enable_profile);
        expect_config.set_big_query_profile_threshold_ns(expect_threshold_ns);
        expect_config.set_runtime_profile_report_interval_ns(std::numeric_limits<int64_t>::max());
        test_load_channel_profile_base(runtime_state.get(), expect_config);
    };

    {
        // not set enable_profile and load_profile_collect_second
        TQueryOptions query_options;
        run_scenario("neither option set", query_options, false, kNoThreshold);
    }

    {
        // only set load_profile_collect_second
        TQueryOptions query_options;
        query_options.__set_load_profile_collect_second(10);
        run_scenario("only load_profile_collect_second set", query_options, false, kNoThreshold);
    }

    {
        // only set enable_profile
        TQueryOptions query_options;
        query_options.__set_enable_profile(true);
        run_scenario("only enable_profile set", query_options, true, kNoThreshold);
    }

    {
        // set both enable_profile and load_profile_collect_second:
        // profiling is expected to be gated by the threshold instead of being always on
        TQueryOptions query_options;
        query_options.__set_enable_profile(true);
        query_options.__set_load_profile_collect_second(10);
        run_scenario("enable_profile and load_profile_collect_second set", query_options, false,
                     10 * kNanosPerSecond);
    }
}

// Exercises the sink when a pipeline QueryContext is attached to the RuntimeState: the profile
// config is expected to mirror the values configured on the query context.
TEST_F(TabletSinkIndexChannelTest, pipeline_load_channel_profile) {
    // Integer arithmetic keeps the expected nanosecond values exact and avoids an implicit
    // double -> int64 conversion at the protobuf setters.
    constexpr int64_t kNanosPerSecond = 1000000000LL;

    TQueryOptions query_options;
    // Declared before runtime_state so that it outlives the RuntimeState that points at it.
    pipeline::QueryContext query_ctx;
    query_ctx.set_enable_profile();
    query_ctx.set_big_query_profile_threshold(10, TTimeUnit::SECOND);
    // NOTE(review): the expectation below treats this argument as seconds -- confirm against QueryContext.
    query_ctx.set_runtime_profile_report_interval(5);
    auto runtime_state = _build_runtime_state(query_options);
    runtime_state->set_query_ctx(&query_ctx);

    PLoadChannelProfileConfig expect_config;
    expect_config.set_enable_profile(true);
    expect_config.set_big_query_profile_threshold_ns(10 * kNanosPerSecond);
    expect_config.set_runtime_profile_report_interval_ns(5 * kNanosPerSecond);
    test_load_channel_profile_base(runtime_state.get(), expect_config);
}

} // namespace starrocks
Original file line number Diff line number Diff line change
Expand Up @@ -1105,10 +1105,8 @@ public void collectProfile(boolean isAborted) {
if (coord.getQueryProfile() != null) {
if (!isSyncStreamLoad()) {
coord.collectProfileSync();
profile.addChild(coord.buildQueryProfile(true));
} else {
profile.addChild(coord.getQueryProfile());
}
profile.addChild(coord.buildQueryProfile(true));
}

ProfileManager.getInstance().pushProfile(null, profile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ public void initFragmentProfiles(int numFragments) {
}
}

/**
 * Returns the {@code fragmentProfiles} list held by this object.
 * The list itself is returned, not a copy.
 */
public List<RuntimeProfile> getFragmentProfiles() {
return fragmentProfiles;
}

/**
 * Returns the {@code deltaUrls} list held by this object.
 * The list itself is returned, not a copy.
 */
public List<String> getDeltaUrls() {
return deltaUrls;
}
Expand Down Expand Up @@ -405,9 +409,12 @@ public void updateLoadInformation(FragmentInstanceExecState execState, TReportEx
}

public RuntimeProfile buildQueryProfile(boolean needMerge) {
if (!needMerge || !jobSpec.isEnablePipeline()) {
if (!needMerge) {
return queryProfile;
}
if (!jobSpec.isEnablePipeline()) {
return mergeNonPipelineProfile();
}

RuntimeProfile newQueryProfile = new RuntimeProfile(queryProfile.getName());
long start = System.nanoTime();
Expand Down Expand Up @@ -616,6 +623,21 @@ public RuntimeProfile buildQueryProfile(boolean needMerge) {
return newQueryProfile;
}

/**
 * Builds the query profile for the non-pipeline path.
 *
 * <p>If no load channel profile has been collected, the existing {@code queryProfile} is
 * returned unchanged. Otherwise a new profile with the same name is created, the info strings
 * and counters of {@code queryProfile} are copied into it, every fragment profile is attached
 * as a child, and finally the merged load channel profile (when one is produced) is appended.
 *
 * @return {@code queryProfile} itself, or a newly assembled profile as described above
 */
RuntimeProfile mergeNonPipelineProfile() {
    // Nothing to merge in: hand back the original profile as-is.
    if (loadChannelProfile.isEmpty()) {
        return queryProfile;
    }

    final RuntimeProfile assembled = new RuntimeProfile(queryProfile.getName());
    assembled.copyAllInfoStringsFrom(queryProfile, null);
    assembled.copyAllCountersFrom(queryProfile);

    // Fragment profiles come first, the load channel profile (if any) goes last.
    fragmentProfiles.forEach(assembled::addChild);

    final Optional<RuntimeProfile> loadChannelPart = mergeLoadChannelProfile();
    if (loadChannelPart.isPresent()) {
        assembled.addChild(loadChannelPart.get());
    }
    return assembled;
}

Optional<RuntimeProfile> mergeLoadChannelProfile() {
if (loadChannelProfile.isEmpty()) {
return Optional.empty();
Expand Down
Loading

0 comments on commit ca8ee31

Please sign in to comment.