Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EvWrite codes unification with kqp #9698

Merged
merged 6 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 6 additions & 30 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <ydb/core/kqp/runtime/kqp_transport.h>
#include <ydb/core/kqp/opt/kqp_query_plan.h>
#include <ydb/core/tx/columnshard/columnshard.h>
#include <ydb/core/tx/data_events/common/error_codes.h>
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/tx/long_tx_service/public/events.h>
#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
Expand Down Expand Up @@ -862,37 +863,12 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
void ShardError(const NKikimrDataEvents::TEvWriteResult& result) {
NYql::TIssues issues;
NYql::IssuesFromMessage(result.GetIssues(), issues);

switch (result.GetStatus()) {
case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED:
case NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED:
case NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED: {
YQL_ENSURE(false);
}
case NKikimrDataEvents::TEvWriteResult::STATUS_ABORTED: {
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, issues);
}
case NKikimrDataEvents::TEvWriteResult::STATUS_DISK_SPACE_EXHAUSTED:
case NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR: {
return ReplyErrorAndDie(Ydb::StatusIds::INTERNAL_ERROR, issues);
}
case NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED: {
return ReplyErrorAndDie(Ydb::StatusIds::OVERLOADED, issues);
}
case NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED: {
return ReplyErrorAndDie(Ydb::StatusIds::CANCELLED, issues);
}
case NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST: {
return ReplyErrorAndDie(Ydb::StatusIds::BAD_REQUEST, issues);
}
case NKikimrDataEvents::TEvWriteResult::STATUS_SCHEME_CHANGED: {
return ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, issues);
}
case NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN: {
issues.AddIssue(NYql::YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, "Transaction locks invalidated."));
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, issues);
}
auto statusConclusion = NEvWrite::NErrorCodes::TOperator::GetStatusInfo(result.GetStatus());
AFL_ENSURE(statusConclusion.IsSuccess())("error", statusConclusion.GetErrorMessage());
if (result.GetStatus() == NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN) {
issues.AddIssue(NYql::YqlIssue({}, statusConclusion->GetIssueCode(), statusConclusion->GetIssueGeneralText()));
}
return ReplyErrorAndDie(statusConclusion->GetYdbStatusCode(), issues);
}

void PQTabletError(const NKikimrPQ::TEvProposeTransactionResult& result) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7953,13 +7953,13 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schemaWithNull));
tableInserter.AddRow().Add(1).Add("test_res_1").AddNull();
tableInserter.AddRow().Add(2).Add("test_res_2").Add(123);
testHelper.BulkUpsert(testTable, tableInserter, Ydb::StatusIds::GENERIC_ERROR);
testHelper.BulkUpsert(testTable, tableInserter, Ydb::StatusIds::BAD_REQUEST);
}
{
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schemaWithNull));
tableInserter.AddRow().Add(1).Add("test_res_1").AddNull();
tableInserter.AddRow().Add(2).Add("test_res_2").Add(123);
testHelper.BulkUpsert(testTable, tableInserter, Ydb::StatusIds::GENERIC_ERROR);
testHelper.BulkUpsert(testTable, tableInserter, Ydb::StatusIds::BAD_REQUEST);
}

testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ bool TTxWrite::CommitOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
NOlap::TCommittedData commitData(userData, Self->GetLastPlannedSnapshot(), Self->Generation(), writeId);
if (Self->TablesManager.HasTable(userData->GetPathId())) {
Self->InsertTable->CommitEphemeral(dbTable, std::move(commitData));
auto counters = Self->InsertTable->CommitEphemeral(dbTable, std::move(commitData));
Self->Counters.GetTabletCounters()->OnWriteCommitted(counters);
}
Self->UpdateInsertTableCounters();
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,7 @@ class TPortionBuckets {
Y_UNUSED(LeftBucket->Actualize(currentInstant));
AddBucketToRating(LeftBucket);
for (auto&& i : Buckets) {
const i64 rating = i.second->GetWeight();
const i64 rating = i.second->GetLastWeight();
if (i.second->Actualize(currentInstant)) {
RemoveBucketFromRating(i.second, rating);
AddBucketToRating(i.second);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ TConclusionStatus TBuildBatchesTask::DoExecute(const std::shared_ptr<ITask>& /*t
ActualSchema->PrepareForModification(batchConclusion.DetachResult(), WriteData.GetWriteMeta().GetModificationType());
if (preparedConclusion.IsFail()) {
ReplyError("cannot prepare incoming batch: " + preparedConclusion.GetErrorMessage(),
NColumnShard::TEvPrivate::TEvWriteBlobsResult::EErrorClass::Internal);
NColumnShard::TEvPrivate::TEvWriteBlobsResult::EErrorClass::Request);
return TConclusionStatus::Fail("cannot prepare incoming batch: " + preparedConclusion.GetErrorMessage());
}
auto batch = preparedConclusion.DetachResult();
Expand Down
32 changes: 32 additions & 0 deletions ydb/core/tx/data_events/common/error_codes.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#include "error_codes.h"

namespace NKikimr::NEvWrite::NErrorCodes {

TConclusion<NErrorCodes::TOperator::TYdbStatusInfo> TOperator::GetStatusInfo(
const NKikimrDataEvents::TEvWriteResult::EStatus value) {
switch (value) {
case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED:
case NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED:
case NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED:
return TConclusionStatus::Fail("Incorrect status for interpretation to YdbStatus");
case NKikimrDataEvents::TEvWriteResult::STATUS_ABORTED:
return TYdbStatusInfo(Ydb::StatusIds::ABORTED, NYql::TIssuesIds::KIKIMR_OPERATION_ABORTED, "Request aborted");
case NKikimrDataEvents::TEvWriteResult::STATUS_DISK_SPACE_EXHAUSTED:
return TYdbStatusInfo(Ydb::StatusIds::INTERNAL_ERROR, NYql::TIssuesIds::KIKIMR_DISK_SPACE_EXHAUSTED, "Disk space exhausted");
case NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR:
return TYdbStatusInfo(Ydb::StatusIds::INTERNAL_ERROR, NYql::TIssuesIds::KIKIMR_INTERNAL_ERROR, "Request aborted");
case NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED:
return TYdbStatusInfo(Ydb::StatusIds::OVERLOADED, NYql::TIssuesIds::KIKIMR_OVERLOADED, "System overloaded");
case NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED:
return TYdbStatusInfo(Ydb::StatusIds::CANCELLED, NYql::TIssuesIds::KIKIMR_OPERATION_CANCELLED, "Request cancelled");
case NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST:
return TYdbStatusInfo(Ydb::StatusIds::BAD_REQUEST, NYql::TIssuesIds::KIKIMR_BAD_REQUEST, "Incorrect request");
case NKikimrDataEvents::TEvWriteResult::STATUS_SCHEME_CHANGED:
return TYdbStatusInfo(Ydb::StatusIds::SCHEME_ERROR, NYql::TIssuesIds::KIKIMR_SCHEMA_CHANGED, "Schema changed");
case NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN: {
return TYdbStatusInfo(Ydb::StatusIds::ABORTED, NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, "Transaction locks invalidated.");
}
}
}

}
31 changes: 31 additions & 0 deletions ydb/core/tx/data_events/common/error_codes.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once
#include <ydb/core/protos/data_events.pb.h>
#include <ydb/core/protos/tx_columnshard.pb.h>

#include <ydb/library/accessor/accessor.h>
#include <ydb/library/conclusion/result.h>
#include <ydb/library/yql/core/issue/protos/issue_id.pb.h>
#include <ydb/public/api/protos/ydb_status_codes.pb.h>

namespace NKikimr::NEvWrite::NErrorCodes {

class TOperator {
public:
class TYdbStatusInfo {
private:
YDB_READONLY(Ydb::StatusIds::StatusCode, YdbStatusCode, Ydb::StatusIds::STATUS_CODE_UNSPECIFIED);
YDB_READONLY(NYql::TIssuesIds::EIssueCode, IssueCode, NYql::TIssuesIds::UNEXPECTED);
YDB_READONLY_DEF(TString, IssueGeneralText);

public:
TYdbStatusInfo(const Ydb::StatusIds::StatusCode code, const NYql::TIssuesIds::EIssueCode issueCode, const TString& issueMessage)
: YdbStatusCode(code)
, IssueCode(issueCode)
, IssueGeneralText(issueMessage) {
}
};

static TConclusion<TYdbStatusInfo> GetStatusInfo(const NKikimrDataEvents::TEvWriteResult::EStatus value);
};

} // namespace NKikimr::NEvWrite::NErrorCodes
3 changes: 3 additions & 0 deletions ydb/core/tx/data_events/common/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ LIBRARY()

PEERDIR(
ydb/core/protos
ydb/library/yql/core/issue/protos
ydb/public/api/protos
)

SRCS(
modification_type.cpp
error_codes.cpp
)

END()
6 changes: 4 additions & 2 deletions ydb/core/tx/data_events/shard_writer.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "shard_writer.h"
#include "common/error_codes.h"

#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/base/tablet_pipecache.h>
Expand Down Expand Up @@ -86,8 +87,9 @@ namespace NKikimr::NEvWrite {

auto gPassAway = PassAwayGuard();
if (ydbStatus != NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED) {
ExternalController->OnFail(Ydb::StatusIds::GENERIC_ERROR,
TStringBuilder() << "Cannot write data into shard " << ShardId << " in longTx " <<
auto statusInfo = NEvWrite::NErrorCodes::TOperator::GetStatusInfo(ydbStatus).DetachResult();
ExternalController->OnFail(statusInfo.GetYdbStatusCode(),
TStringBuilder() << "Cannot write data into shard(" << statusInfo.GetIssueGeneralText() << ") " << ShardId << " in longTx " <<
ExternalController->GetLongTxId().ToString());
return;
}
Expand Down
3 changes: 3 additions & 0 deletions ydb/library/yql/core/issue/protos/issue_id.proto
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ message TIssuesIds {
KIKIMR_UNSUPPORTED = 2030;
KIKIMR_BAD_COLUMN_TYPE = 2031;
KIKIMR_NO_COLUMN_DEFAULT_VALUE = 2032;
KIKIMR_DISK_SPACE_EXHAUSTED = 2033;
KIKIMR_SCHEMA_CHANGED = 2034;
KIKIMR_INTERNAL_ERROR = 2035;

// kikimr warnings
KIKIMR_READ_MODIFIED_TABLE = 2500;
Expand Down
12 changes: 12 additions & 0 deletions ydb/library/yql/core/issue/yql_issue.txt
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,18 @@ ids {
code: KIKIMR_NO_COLUMN_DEFAULT_VALUE
severity: S_ERROR
}
ids {
code: KIKIMR_DISK_SPACE_EXHAUSTED
severity: S_ERROR
}
ids {
code: KIKIMR_SCHEMA_CHANGED
severity: S_ERROR
}
ids {
code: KIKIMR_INTERNAL_ERROR
severity: S_ERROR
}
ids {
code: YQL_PRAGMA_WARNING_MSG
severity: S_WARNING
Expand Down
Loading