Skip to content

Commit

Permalink
Merge 53e039a into 7037279
Browse files Browse the repository at this point in the history
  • Loading branch information
mxkovalev authored Dec 11, 2024
2 parents 7037279 + 53e039a commit f7f4c32
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 2 deletions.
5 changes: 3 additions & 2 deletions ydb/library/yql/providers/dq/actors/executer_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class TDqExecuter: public TRichActor<TDqExecuter>, NYql::TCounters {
issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR);
Issues.AddIssues({issue});
*ExecutionTimeoutCounter += 1;
Finish(NYql::NDqProto::StatusIds::LIMIT_EXCEEDED);
Finish(NYql::NDqProto::StatusIds::LIMIT_EXCEEDED, true);
})
cFunc(TEvents::TEvWakeup::EventType, OnWakeup)
})
Expand Down Expand Up @@ -279,7 +279,7 @@ class TDqExecuter: public TRichActor<TDqExecuter>, NYql::TCounters {
Send(ev->Sender, response.Release());
}

void Finish(NYql::NDqProto::StatusIds::StatusCode statusCode)
void Finish(NYql::NDqProto::StatusIds::StatusCode statusCode, bool timeout = false)
{
YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ << " with status=" << static_cast<int>(statusCode) << " issues=" << Issues.ToString();
if (Finished) {
Expand All @@ -292,6 +292,7 @@ class TDqExecuter: public TRichActor<TDqExecuter>, NYql::TCounters {
}
IssuesToMessage(Issues, result.MutableIssues());
result.SetStatusCode(statusCode);
result.SetTimeout(timeout);
Send(ControlId, MakeHolder<TEvQueryResponse>(std::move(result)));
Finished = true;
}
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/dq/api/protos/dqs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ message TQueryResponse {
uint64 RowsCount = 8;
NYql.NDqProto.StatusIds.StatusCode StatusCode = 9;
repeated NDqProto.TData Sample = 10;
bool Timeout = 11;
}

message TDqFailure {
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/dq/api/protos/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ message ExecuteGraphResponse {
Ydb.Operations.Operation Operation = 1;
repeated ResponseMetric Metric = 2;
bool Truncated = 3;
bool Timeout = 4;
}

message SvnRevisionRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1062,6 +1062,9 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
state->Statistics[state->MetricId++] = res.Statistics;

if (res.Fallback) {
if (res.Timeout) {
NotifyDqTimeout(state);
}
if (state->Settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) == EFallbackPolicy::Never || state->TypeCtx->ForceDq) {
auto issues = TIssues{TIssue(ctx.GetPosition(input->Pos()), "Gateway Error").SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING)};
issues.AddIssues(res.Issues());
Expand Down Expand Up @@ -1496,6 +1499,9 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
state->Metrics->IncCounter("dq", "Fallback");
}
state->Statistics[state->MetricId++].Entries.push_back(TOperationStatistics::TEntry("Fallback", 0, 0, 0, 0, 1));
if (res.Timeout) {
NotifyDqTimeout(state);
}
// never fallback will be captured in yql_facade
auto issues = TIssues{TIssue(ctx.GetPosition(input->Pos()), "Gateway Error").SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING)};
issues.AddIssues(res.Issues());
Expand Down Expand Up @@ -1998,6 +2004,9 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
state->Metrics->IncCounter("dq", "Fallback");
}
state->Statistics[state->MetricId++].Entries.push_back(TOperationStatistics::TEntry("Fallback", 0, 0, 0, 0, 1));
if (res.Timeout) {
NotifyDqTimeout(state);
}
}

CompleteNode(execState, node, [resIssues = res.Issues(), fallback = res.Fallback](const TExprNode::TPtr& input, TExprNode::TPtr&, TExprContext& ctx) -> IGraphTransformer::TStatus {
Expand Down Expand Up @@ -2086,6 +2095,17 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
return status;
}

static void NotifyDqTimeout(const TDqStatePtr& state) {
for (const auto& provider : state->TypeCtx->DataSources) {
if (auto dqIntegration = provider->GetDqIntegration()) {
dqIntegration->NotifyDqTimeout();
}
}
if (state->Metrics) {
state->Metrics->IncCounter("dq", "Timeout");
}
}

private:
TDqStatePtr State;
ISkiffConverterPtr SkiffConverter;
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ class TDqGatewaySession: public std::enable_shared_from_this<TDqGatewaySession>

bool error = false;
bool fallback = false;
result.Timeout = resp.GetTimeout();

if (status.Ok()) {
YQL_CLOG(TRACE, ProviderDq) << "TDqGateway::Ok";
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/dq/provider/yql_dq_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class IDqGateway : public TThrRefBase {
bool Retriable = false;
bool Truncated = false;
ui64 RowsCount = 0;
bool Timeout = false;

TOperationStatistics Statistics;

Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/dq/service/grpc_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ namespace NYql::NDqs {
operation.Mutableresult()->PackFrom(queryResult);
*operation.Mutableissues() = result.GetIssues();
ResponseBuffer.SetTruncated(result.GetTruncated());
ResponseBuffer.SetTimeout(result.GetTimeout());

Reply(Ydb::StatusIds::SUCCESS, statusCode > 1 || result.GetIssues().size() > 0);
}
Expand Down

0 comments on commit f7f4c32

Please sign in to comment.