diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp index 42ac4412f195..bca17e24fcff 100644 --- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp @@ -93,7 +93,7 @@ class TDqExecuter: public TRichActor, NYql::TCounters { issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR); Issues.AddIssues({issue}); *ExecutionTimeoutCounter += 1; - Finish(NYql::NDqProto::StatusIds::LIMIT_EXCEEDED); + Finish(NYql::NDqProto::StatusIds::LIMIT_EXCEEDED, true); }) cFunc(TEvents::TEvWakeup::EventType, OnWakeup) }) @@ -279,7 +279,7 @@ class TDqExecuter: public TRichActor, NYql::TCounters { Send(ev->Sender, response.Release()); } - void Finish(NYql::NDqProto::StatusIds::StatusCode statusCode) + void Finish(NYql::NDqProto::StatusIds::StatusCode statusCode, bool timeout = false) { YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ << " with status=" << static_cast(statusCode) << " issues=" << Issues.ToString(); if (Finished) { @@ -292,6 +292,7 @@ class TDqExecuter: public TRichActor, NYql::TCounters { } IssuesToMessage(Issues, result.MutableIssues()); result.SetStatusCode(statusCode); + result.SetTimeout(timeout); Send(ControlId, MakeHolder(std::move(result))); Finished = true; } diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto index 41eaeedf109d..b488e07f65c1 100644 --- a/ydb/library/yql/providers/dq/api/protos/dqs.proto +++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto @@ -184,6 +184,7 @@ message TQueryResponse { uint64 RowsCount = 8; NYql.NDqProto.StatusIds.StatusCode StatusCode = 9; repeated NDqProto.TData Sample = 10; + bool Timeout = 11; } message TDqFailure { diff --git a/ydb/library/yql/providers/dq/api/protos/service.proto b/ydb/library/yql/providers/dq/api/protos/service.proto index 1ed427ddb116..2ae1c3786d57 100644 --- a/ydb/library/yql/providers/dq/api/protos/service.proto +++ b/ydb/library/yql/providers/dq/api/protos/service.proto @@ -98,6 +98,7 @@ message ExecuteGraphResponse { Ydb.Operations.Operation Operation = 1; repeated ResponseMetric Metric = 2; bool Truncated = 3; + bool Timeout = 4; } message SvnRevisionRequest { diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index f1daa3b7604a..1f2f224fbee2 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -1062,6 +1062,9 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters state->Statistics[state->MetricId++] = res.Statistics; if (res.Fallback) { + if (res.Timeout) { + NotifyDqTimeout(state); + } if (state->Settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) == EFallbackPolicy::Never || state->TypeCtx->ForceDq) { auto issues = TIssues{TIssue(ctx.GetPosition(input->Pos()), "Gateway Error").SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING)}; issues.AddIssues(res.Issues()); @@ -1496,6 +1499,9 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters state->Metrics->IncCounter("dq", "Fallback"); } state->Statistics[state->MetricId++].Entries.push_back(TOperationStatistics::TEntry("Fallback", 0, 0, 0, 0, 1)); + if (res.Timeout) { + NotifyDqTimeout(state); + } // never fallback will be captured in yql_facade auto issues = TIssues{TIssue(ctx.GetPosition(input->Pos()), "Gateway Error").SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING)}; issues.AddIssues(res.Issues()); @@ -1998,6 +2004,9 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters state->Metrics->IncCounter("dq", "Fallback"); } state->Statistics[state->MetricId++].Entries.push_back(TOperationStatistics::TEntry("Fallback", 0, 0, 0, 0, 1)); + if (res.Timeout) { + NotifyDqTimeout(state); + } } CompleteNode(execState, node, [resIssues = res.Issues(), fallback = res.Fallback](const TExprNode::TPtr& input, TExprNode::TPtr&, TExprContext& ctx) -> IGraphTransformer::TStatus { @@ -2086,6 +2095,14 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters return status; } + static void NotifyDqTimeout(const TDqStatePtr& state) { + auto integrations = GetUniqueIntegrations(*state->TypeCtx); + std::for_each(integrations.cbegin(), integrations.cend(), std::bind(&IDqIntegration::NotifyDqTimeout, std::placeholders::_1)); + if (state->Metrics) { + state->Metrics->IncCounter("dq", "Timeout"); + } + } + private: TDqStatePtr State; ISkiffConverterPtr SkiffConverter; diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp index 4d6fc4804a8d..b8710aac78ea 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp @@ -176,6 +176,7 @@ class TDqGatewaySession: public std::enable_shared_from_this bool error = false; bool fallback = false; + result.Timeout = resp.GetTimeout(); if (status.Ok()) { YQL_CLOG(TRACE, ProviderDq) << "TDqGateway::Ok"; diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h index cab6c3fdc6df..5b7c27cfe1ff 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h +++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h @@ -78,6 +78,7 @@ class IDqGateway : public TThrRefBase { bool Retriable = false; bool Truncated = false; ui64 RowsCount = 0; + bool Timeout = false; TOperationStatistics Statistics; diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp index 14c9a29e3c2f..3d0da9ce955e 100644 --- a/ydb/library/yql/providers/dq/service/grpc_service.cpp +++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp @@ -178,6 +178,7 @@ namespace NYql::NDqs { operation.Mutableresult()->PackFrom(queryResult); *operation.Mutableissues() = result.GetIssues(); ResponseBuffer.SetTruncated(result.GetTruncated()); + ResponseBuffer.SetTimeout(result.GetTimeout()); Reply(Ydb::StatusIds::SUCCESS, statusCode > 1 || result.GetIssues().size() > 0); }