Skip to content

Commit

Permalink
[native] Add list all task endpoint to server operation
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Oct 4, 2023
1 parent ed5727c commit 92a4592
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 27 deletions.
61 changes: 51 additions & 10 deletions presto-native-execution/presto_cpp/main/PrestoServerOperations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,12 @@ void PrestoServerOperations::runOperation(
case ServerOperation::Target::kVeloxQueryConfig:
http::sendOkResponse(
downstream, veloxQueryConfigOperation(op, message));
case ServerOperation::Target::kDebug:
http::sendOkResponse(downstream, debugOperation(op, message));
break;
case ServerOperation::Target::kTask:
http::sendOkResponse(downstream, taskOperation(op, message));
break;
case ServerOperation::Target::kServer:
http::sendOkResponse(downstream, serverOperation(op, message));
break;
}
} catch (const velox::VeloxUserError& ex) {
Expand Down Expand Up @@ -185,22 +189,59 @@ std::string PrestoServerOperations::veloxQueryConfigOperation(
return unsupportedAction(op);
}

std::string PrestoServerOperations::debugOperation(
std::string PrestoServerOperations::taskOperation(
const ServerOperation& op,
proxygen::HTTPMessage* message) {
if (taskManager_ == nullptr) {
return "Task Manager not found";
}
const auto taskMap = taskManager_->tasks();
switch (op.action) {
case ServerOperation::Action::kTask: {
case ServerOperation::Action::kGetDetail: {
const auto id = message->getQueryParam("id");
if (!taskManager_) {
return "Task Manager not found";
}
const auto& map = taskManager_->tasks();
const auto& task = map.find(id);
if (task == map.end()) {
const auto& task = taskMap.find(id);
if (task == taskMap.end()) {
return fmt::format("No task found with id {}", id);
}
return task->second->toJsonString();
}
case ServerOperation::Action::kListAll: {
uint32_t limit;
try {
const auto& limitStr = message->getQueryParam("limit");
limit = limitStr == proxygen::empty_string
? std::numeric_limits<uint32_t>::max()
: stoi(limitStr);
} catch (std::exception& ex) {
VELOX_USER_FAIL(ex.what());
}
std::stringstream oss;
oss << "[";
uint32_t count = 0;
for (const auto& task : taskMap) {
const auto& veloxTask = task.second->task;
if (++count > limit) {
oss << "... " << (taskMap.size() - limit) << " more tasks ...\n";
break;
}
oss << task.first << "("
<< (veloxTask == nullptr ? "null"
: taskStateString(veloxTask->state()))
<< "),\n";
}
oss << "]";
return oss.str();
}
default:
break;
}
return unsupportedAction(op);
}

std::string PrestoServerOperations::serverOperation(
const ServerOperation& op,
proxygen::HTTPMessage* /* unused */) {
switch (op.action) {
case ServerOperation::Action::kTrace: {
return velox::process::TraceContext::statusLine();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ class PrestoServerOperations {
const ServerOperation& op,
proxygen::HTTPMessage* message);

std::string debugOperation(
std::string taskOperation(
const ServerOperation& op,
proxygen::HTTPMessage* message);

std::string serverOperation(
const ServerOperation& op,
proxygen::HTTPMessage* message);

Expand Down
20 changes: 10 additions & 10 deletions presto-native-execution/presto_cpp/main/ServerOperation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,35 @@ const folly::F14FastMap<std::string, ServerOperation::Action>
{"getCacheStats", ServerOperation::Action::kGetCacheStats},
{"setProperty", ServerOperation::Action::kSetProperty},
{"getProperty", ServerOperation::Action::kGetProperty},
{"task", ServerOperation::Action::kTask},
{"trace", ServerOperation::Action::kTrace},
};
{"getDetail", ServerOperation::Action::kGetDetail},
{"listAll", ServerOperation::Action::kListAll},
{"trace", ServerOperation::Action::kTrace}};

const folly::F14FastMap<ServerOperation::Action, std::string>
ServerOperation::kReverseActionLookup{
{ServerOperation::Action::kClearCache, "clearCache"},
{ServerOperation::Action::kGetCacheStats, "getCacheStats"},
{ServerOperation::Action::kSetProperty, "setProperty"},
{ServerOperation::Action::kGetProperty, "getProperty"},
{ServerOperation::Action::kTask, "task"},
{ServerOperation::Action::kTrace, "trace"},
};
{ServerOperation::Action::kGetDetail, "getDetail"},
{ServerOperation::Action::kListAll, "listAll"},
{ServerOperation::Action::kTrace, "trace"}};

const folly::F14FastMap<std::string, ServerOperation::Target>
ServerOperation::kTargetLookup{
{"connector", ServerOperation::Target::kConnector},
{"systemConfig", ServerOperation::Target::kSystemConfig},
{"veloxQueryConfig", ServerOperation::Target::kVeloxQueryConfig},
{"debug", ServerOperation::Target::kDebug},
};
{"task", ServerOperation::Target::kTask},
{"server", ServerOperation::Target::kServer}};

const folly::F14FastMap<ServerOperation::Target, std::string>
ServerOperation::kReverseTargetLookup{
{ServerOperation::Target::kConnector, "connector"},
{ServerOperation::Target::kSystemConfig, "systemConfig"},
{ServerOperation::Target::kVeloxQueryConfig, "veloxQueryConfig"},
{ServerOperation::Target::kDebug, "debug"},
};
{ServerOperation::Target::kTask, "task"},
{ServerOperation::Target::kServer, "server"}};

ServerOperation::Target ServerOperation::targetFromString(
const std::string& str) {
Expand Down
9 changes: 6 additions & 3 deletions presto-native-execution/presto_cpp/main/ServerOperation.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@

namespace facebook::presto {

/// Defines a server operation.
/// Defines a server operation. A server operation is accessed through http
/// endpoint /v1/operation/<target>/<action>?param1=value1&param2=value2...
struct ServerOperation {
/// The target this operation is operating upon
enum class Target {
kConnector,
kSystemConfig,
kVeloxQueryConfig,
kDebug,
kTask,
kServer,
};

/// The action this operation is trying to take
Expand All @@ -34,7 +36,8 @@ struct ServerOperation {
kGetCacheStats,
kSetProperty,
kGetProperty,
kTask,
kGetDetail,
kListAll,
kTrace,
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,17 @@ TEST_F(ServerOperationTest, buildServerOp) {
EXPECT_EQ(ServerOperation::Target::kSystemConfig, op.target);
EXPECT_EQ(ServerOperation::Action::kSetProperty, op.action);

op = buildServerOpFromHttpMsgPath("/v1/operation/debug/task");
EXPECT_EQ(ServerOperation::Target::kDebug, op.target);
EXPECT_EQ(ServerOperation::Action::kTask, op.action);
op = buildServerOpFromHttpMsgPath("/v1/operation/task/getDetail");
EXPECT_EQ(ServerOperation::Target::kTask, op.target);
EXPECT_EQ(ServerOperation::Action::kGetDetail, op.action);

op = buildServerOpFromHttpMsgPath("/v1/operation/task/listAll");
EXPECT_EQ(ServerOperation::Target::kTask, op.target);
EXPECT_EQ(ServerOperation::Action::kListAll, op.action);

op = buildServerOpFromHttpMsgPath("/v1/operation/server/trace");
EXPECT_EQ(ServerOperation::Target::kServer, op.target);
EXPECT_EQ(ServerOperation::Action::kTrace, op.action);

EXPECT_THROW(
op = buildServerOpFromHttpMsgPath("/v1/operation/whatzit/setProperty"),
Expand Down

0 comments on commit 92a4592

Please sign in to comment.