Skip to content

Commit

Permalink
EvWrite codes unification with kqp (ydb-platform#9698)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jan 5, 2025
1 parent b2a7f87 commit dea369d
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 37 deletions.
36 changes: 6 additions & 30 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <ydb/core/kqp/runtime/kqp_transport.h>
#include <ydb/core/persqueue/events/global.h>
#include <ydb/core/tx/columnshard/columnshard.h>
#include <ydb/core/tx/data_events/common/error_codes.h>
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/tx/long_tx_service/public/events.h>
#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
Expand Down Expand Up @@ -840,37 +841,12 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
void ShardError(const NKikimrDataEvents::TEvWriteResult& result) {
NYql::TIssues issues;
NYql::IssuesFromMessage(result.GetIssues(), issues);

switch (result.GetStatus()) {
case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED:
case NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED:
case NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED: {
YQL_ENSURE(false);
}
case NKikimrDataEvents::TEvWriteResult::STATUS_ABORTED: {
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, issues);
}
case NKikimrDataEvents::TEvWriteResult::STATUS_DISK_SPACE_EXHAUSTED:
case NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR: {
return ReplyErrorAndDie(Ydb::StatusIds::INTERNAL_ERROR, issues);
}
case NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED: {
return ReplyErrorAndDie(Ydb::StatusIds::OVERLOADED, issues);
}
case NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED: {
return ReplyErrorAndDie(Ydb::StatusIds::CANCELLED, issues);
}
case NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST: {
return ReplyErrorAndDie(Ydb::StatusIds::BAD_REQUEST, issues);
}
case NKikimrDataEvents::TEvWriteResult::STATUS_SCHEME_CHANGED: {
return ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, issues);
}
case NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN: {
issues.AddIssue(NYql::YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, "Transaction locks invalidated."));
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, issues);
}
auto statusConclusion = NEvWrite::NErrorCodes::TOperator::GetStatusInfo(result.GetStatus());
AFL_ENSURE(statusConclusion.IsSuccess())("error", statusConclusion.GetErrorMessage());
if (result.GetStatus() == NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN) {
issues.AddIssue(NYql::YqlIssue({}, statusConclusion->GetIssueCode(), statusConclusion->GetIssueGeneralText()));
}
return ReplyErrorAndDie(statusConclusion->GetYdbStatusCode(), issues);
}

void PQTabletError(const NKikimrPQ::TEvProposeTransactionResult& result) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7659,13 +7659,13 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schemaWithNull));
tableInserter.AddRow().Add(1).Add("test_res_1").AddNull();
tableInserter.AddRow().Add(2).Add("test_res_2").Add(123);
testHelper.BulkUpsert(testTable, tableInserter, Ydb::StatusIds::GENERIC_ERROR);
testHelper.BulkUpsert(testTable, tableInserter, Ydb::StatusIds::BAD_REQUEST);
}
{
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schemaWithNull));
tableInserter.AddRow().Add(1).Add("test_res_1").AddNull();
tableInserter.AddRow().Add(2).Add("test_res_2").Add(123);
testHelper.BulkUpsert(testTable, tableInserter, Ydb::StatusIds::GENERIC_ERROR);
testHelper.BulkUpsert(testTable, tableInserter, Ydb::StatusIds::BAD_REQUEST);
}

testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ bool TTxWrite::CommitOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
NOlap::TCommittedData commitData(userData, Self->GetLastPlannedSnapshot(), Self->Generation(), writeId);
if (Self->TablesManager.HasTable(userData->GetPathId())) {
Self->InsertTable->CommitEphemeral(dbTable, std::move(commitData));
auto counters = Self->InsertTable->CommitEphemeral(dbTable, std::move(commitData));
Self->Counters.GetTabletCounters()->OnWriteCommitted(counters);
}
Self->UpdateInsertTableCounters();
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,7 @@ class TPortionBuckets {
Y_UNUSED(LeftBucket->Actualize(currentInstant));
AddBucketToRating(LeftBucket);
for (auto&& i : Buckets) {
const i64 rating = i.second->GetWeight();
const i64 rating = i.second->GetLastWeight();
if (i.second->Actualize(currentInstant)) {
RemoveBucketFromRating(i.second, rating);
AddBucketToRating(i.second);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ TConclusionStatus TBuildBatchesTask::DoExecute(const std::shared_ptr<ITask>& /*t
ActualSchema->PrepareForModification(batchConclusion.DetachResult(), WriteData.GetWriteMeta().GetModificationType());
if (preparedConclusion.IsFail()) {
ReplyError("cannot prepare incoming batch: " + preparedConclusion.GetErrorMessage(),
NColumnShard::TEvPrivate::TEvWriteBlobsResult::EErrorClass::Internal);
NColumnShard::TEvPrivate::TEvWriteBlobsResult::EErrorClass::Request);
return TConclusionStatus::Fail("cannot prepare incoming batch: " + preparedConclusion.GetErrorMessage());
}
auto batch = preparedConclusion.DetachResult();
Expand Down
32 changes: 32 additions & 0 deletions ydb/core/tx/data_events/common/error_codes.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#include "error_codes.h"

namespace NKikimr::NEvWrite::NErrorCodes {

TConclusion<NErrorCodes::TOperator::TYdbStatusInfo> TOperator::GetStatusInfo(
const NKikimrDataEvents::TEvWriteResult::EStatus value) {
switch (value) {
case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED:
case NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED:
case NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED:
return TConclusionStatus::Fail("Incorrect status for interpretation to YdbStatus");
case NKikimrDataEvents::TEvWriteResult::STATUS_ABORTED:
return TYdbStatusInfo(Ydb::StatusIds::ABORTED, NYql::TIssuesIds::KIKIMR_OPERATION_ABORTED, "Request aborted");
case NKikimrDataEvents::TEvWriteResult::STATUS_DISK_SPACE_EXHAUSTED:
return TYdbStatusInfo(Ydb::StatusIds::INTERNAL_ERROR, NYql::TIssuesIds::KIKIMR_DISK_SPACE_EXHAUSTED, "Disk space exhausted");
case NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR:
return TYdbStatusInfo(Ydb::StatusIds::INTERNAL_ERROR, NYql::TIssuesIds::KIKIMR_INTERNAL_ERROR, "Request aborted");
case NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED:
return TYdbStatusInfo(Ydb::StatusIds::OVERLOADED, NYql::TIssuesIds::KIKIMR_OVERLOADED, "System overloaded");
case NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED:
return TYdbStatusInfo(Ydb::StatusIds::CANCELLED, NYql::TIssuesIds::KIKIMR_OPERATION_CANCELLED, "Request cancelled");
case NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST:
return TYdbStatusInfo(Ydb::StatusIds::BAD_REQUEST, NYql::TIssuesIds::KIKIMR_BAD_REQUEST, "Incorrect request");
case NKikimrDataEvents::TEvWriteResult::STATUS_SCHEME_CHANGED:
return TYdbStatusInfo(Ydb::StatusIds::SCHEME_ERROR, NYql::TIssuesIds::KIKIMR_SCHEMA_CHANGED, "Schema changed");
case NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN: {
return TYdbStatusInfo(Ydb::StatusIds::ABORTED, NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, "Transaction locks invalidated.");
}
}
}

}
31 changes: 31 additions & 0 deletions ydb/core/tx/data_events/common/error_codes.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once
#include <ydb/core/protos/data_events.pb.h>
#include <ydb/core/protos/tx_columnshard.pb.h>

#include <ydb/library/accessor/accessor.h>
#include <ydb/library/conclusion/result.h>
#include <ydb/library/yql/core/issue/protos/issue_id.pb.h>
#include <ydb/public/api/protos/ydb_status_codes.pb.h>

namespace NKikimr::NEvWrite::NErrorCodes {

class TOperator {
public:
class TYdbStatusInfo {
private:
YDB_READONLY(Ydb::StatusIds::StatusCode, YdbStatusCode, Ydb::StatusIds::STATUS_CODE_UNSPECIFIED);
YDB_READONLY(NYql::TIssuesIds::EIssueCode, IssueCode, NYql::TIssuesIds::UNEXPECTED);
YDB_READONLY_DEF(TString, IssueGeneralText);

public:
TYdbStatusInfo(const Ydb::StatusIds::StatusCode code, const NYql::TIssuesIds::EIssueCode issueCode, const TString& issueMessage)
: YdbStatusCode(code)
, IssueCode(issueCode)
, IssueGeneralText(issueMessage) {
}
};

static TConclusion<TYdbStatusInfo> GetStatusInfo(const NKikimrDataEvents::TEvWriteResult::EStatus value);
};

} // namespace NKikimr::NEvWrite::NErrorCodes
3 changes: 3 additions & 0 deletions ydb/core/tx/data_events/common/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ LIBRARY()

PEERDIR(
ydb/core/protos
ydb/library/yql/core/issue/protos
ydb/public/api/protos
)

SRCS(
modification_type.cpp
error_codes.cpp
)

END()
6 changes: 4 additions & 2 deletions ydb/core/tx/data_events/shard_writer.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "shard_writer.h"
#include "common/error_codes.h"

#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/base/tablet_pipecache.h>
Expand Down Expand Up @@ -86,8 +87,9 @@ namespace NKikimr::NEvWrite {

auto gPassAway = PassAwayGuard();
if (ydbStatus != NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED) {
ExternalController->OnFail(Ydb::StatusIds::GENERIC_ERROR,
TStringBuilder() << "Cannot write data into shard " << ShardId << " in longTx " <<
auto statusInfo = NEvWrite::NErrorCodes::TOperator::GetStatusInfo(ydbStatus).DetachResult();
ExternalController->OnFail(statusInfo.GetYdbStatusCode(),
TStringBuilder() << "Cannot write data into shard(" << statusInfo.GetIssueGeneralText() << ") " << ShardId << " in longTx " <<
ExternalController->GetLongTxId().ToString());
return;
}
Expand Down
3 changes: 3 additions & 0 deletions ydb/library/yql/core/issue/protos/issue_id.proto
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ message TIssuesIds {
KIKIMR_UNSUPPORTED = 2030;
KIKIMR_BAD_COLUMN_TYPE = 2031;
KIKIMR_NO_COLUMN_DEFAULT_VALUE = 2032;
KIKIMR_DISK_SPACE_EXHAUSTED = 2033;
KIKIMR_SCHEMA_CHANGED = 2034;
KIKIMR_INTERNAL_ERROR = 2035;

// kikimr warnings
KIKIMR_READ_MODIFIED_TABLE = 2500;
Expand Down
12 changes: 12 additions & 0 deletions ydb/library/yql/core/issue/yql_issue.txt
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,18 @@ ids {
code: KIKIMR_NO_COLUMN_DEFAULT_VALUE
severity: S_ERROR
}
ids {
code: KIKIMR_DISK_SPACE_EXHAUSTED
severity: S_ERROR
}
ids {
code: KIKIMR_SCHEMA_CHANGED
severity: S_ERROR
}
ids {
code: KIKIMR_INTERNAL_ERROR
severity: S_ERROR
}
ids {
code: YQL_PRAGMA_WARNING_MSG
severity: S_WARNING
Expand Down

0 comments on commit dea369d

Please sign in to comment.