From f99f062eeffcb2d872a7f26edeffa74ef8ea39a0 Mon Sep 17 00:00:00 2001 From: Dmitry Uspenskiy <47734295+d-uspenskiy@users.noreply.github.com> Date: Wed, 18 Mar 2020 22:49:21 +0300 Subject: [PATCH] #3991 [YSQL] Handle transaction conflict status in case of parallel read operations Summary: For now `SELECT` command can generate multiple `PgDocReadOp` and execute them in single batch by calling `PgSession::RunAsync` method which takes list of operations. These read operations can conflicts with running transaction. Status for these operations will be `Status::TryAgain` with appropriate error code(s). It could be `YBPgErrorCode`, `TransactionErrorCode` or other. `PgSession` return single status as a result of `PgSession::RunAsync` method. Base of this status upper level will try to restart operations in case no data was sent to users. In case all failed operations has status with same code and same additional error codes single status is returned with same this code and additional error codes. Message of new status enumerates all error statuses with prefix `Multiple homogeneous errors`. Example: single `TryAgain` status constructed from 2 homogeneous statuses (status messages may be different): ``` Operation failed. Try again. (yb/yql/pggate/pg_session.cc:169): Multiple homogeneous errors: [ Operation failed. Try again. (yb/tablet/running_transaction.cc:385): Transaction aborted: b8f9ae55-3c84-4ed6-8354-fafbb8cb803f (pgsql error 25P02), Operation failed. Try again. (yb/tablet/transaction_participant.cc:263): Unknown transaction, could be recently aborted: b8f9ae55-3c84-4ed6-8354-fafbb8cb803f (pgsql error 25P02)] (pgsql error 25P02) ``` Test Plan: Run existed unit test `./yb_build.sh --java-test org.yb.pgsql.TestPgReadRestarts#selectCountPrepared` Reviewers: raju, mihnea, alex Reviewed By: alex Subscribers: yql Differential Revision: https://phabricator.dev.yugabyte.com/D8144 --- src/yb/yql/pggate/pg_session.cc | 82 +++++++++++++++++++-------------- 1 file changed, 47 insertions(+), 35 deletions(-) diff --git a/src/yb/yql/pggate/pg_session.cc b/src/yb/yql/pggate/pg_session.cc index 5f5332f00dd2..e72b6a46b39a 100644 --- a/src/yb/yql/pggate/pg_session.cc +++ b/src/yb/yql/pggate/pg_session.cc @@ -14,6 +14,7 @@ //-------------------------------------------------------------------------------------------------- #include +#include #include "yb/yql/pggate/pg_expr.h" #include "yb/yql/pggate/pg_session.h" @@ -31,7 +32,6 @@ #include "yb/client/yb_op.h" #include "yb/common/pgsql_error.h" -#include "yb/common/ql_protocol_util.h" #include "yb/common/ql_value.h" #include "yb/common/row_mark.h" #include "yb/common/transaction_error.h" @@ -107,41 +107,46 @@ string GetStatusStringSet(const client::CollectedErrors& errors) { return RangeToString(status_strings.begin(), status_strings.end()); } +bool IsHomogeneousErrors(const client::CollectedErrors& errors) { + if (errors.size() < 2) { + return true; + } + auto i = errors.begin(); + const auto& status = (**i).status(); + const auto codes = status.ErrorCodesSlice(); + for (++i; i != errors.end(); ++i) { + const auto& s = (**i).status(); + if (s.code() != status.code() || codes != s.ErrorCodesSlice()) { + return false; + } + } + return true; +} + +boost::optional PsqlErrorCode(const Status& status) { + const uint8_t* err_data = status.ErrorData(PgsqlErrorTag::kCategory); + if (err_data) { + return PgsqlErrorTag::Decode(err_data); + } + return boost::none; +} + // Get a common Postgres error code from the status and all errors, and append it to a previous // result. // If any of those have different conflicting error codes, previous result is returned as-is. -Status AppendPsqlErrorCode(const Status& prev_result, - const Status& status, +Status AppendPsqlErrorCode(const Status& status, const client::CollectedErrors& errors) { - auto DecodeError = [](const Status& s) { - boost::optional err_code_option; - const uint8_t* err_data = s.ErrorData(PgsqlErrorTag::kCategory); - if (err_data) { - err_code_option = PgsqlErrorTag::Decode(err_data); - } - return err_code_option; - }; - - auto err_code_option = DecodeError(status); - - for (const auto& error : errors) { - auto err_code_option_2 = DecodeError(error->status()); - if (err_code_option_2) { - if (!err_code_option) { - // Replacing no code with a given error code. - err_code_option = err_code_option_2; - } else if (err_code_option != err_code_option_2) { - // We have multiple conflicting error codes, don't set any. - return prev_result; - } + boost::optional common_psql_error = boost::make_optional(false, YBPgErrorCode()); + for(const auto& error : errors) { + const auto psql_error = PsqlErrorCode(error->status()); + if (!common_psql_error) { + common_psql_error = psql_error; + } else if (psql_error && common_psql_error != psql_error) { + common_psql_error = boost::none; + break; } } - - if (err_code_option) { - return prev_result.CloneAndAddErrorCode(PgsqlError(*err_code_option)); - } else { - return prev_result; - } + return common_psql_error ? status.CloneAndAddErrorCode(PgsqlError(*common_psql_error)) : status; } // Given a set of errors from operations, this function attempts to combine them into one status @@ -154,8 +159,17 @@ Status CombineErrorsToStatus(client::CollectedErrors errors, Status status) { // TODO: move away from string comparison here and use a more specific status than IOError. // See https://github.com/YugaByte/yugabyte-db/issues/702 status.message() == client::internal::Batcher::kErrorReachingOutToTServersMsg && - errors.size() == 1) { - return errors.front()->status(); + IsHomogeneousErrors(errors)) { + const auto& result = errors.front()->status(); + if (errors.size() == 1) { + return result; + } + return Status(result.code(), + __FILE__, + __LINE__, + "Multiple homogeneous errors: " + GetStatusStringSet(errors), + result.ErrorCodesSlice(), + DupFileName::kFalse); } Status result = @@ -163,9 +177,7 @@ Status CombineErrorsToStatus(client::CollectedErrors errors, Status status) { ? STATUS(InternalError, GetStatusStringSet(errors)) : status.CloneAndAppend(". Errors from tablet servers: " + GetStatusStringSet(errors)); - result = AppendPsqlErrorCode(result, status, errors); - - return result; + return AppendPsqlErrorCode(result, errors); } docdb::PrimitiveValue NullValue(ColumnSchema::SortingType sorting) {