diff --git a/src/yb/yql/pgwrapper/libpq_test_base.cc b/src/yb/yql/pgwrapper/libpq_test_base.cc index e65373e24bfd..07c38954d99d 100644 --- a/src/yb/yql/pgwrapper/libpq_test_base.cc +++ b/src/yb/yql/pgwrapper/libpq_test_base.cc @@ -15,6 +15,7 @@ #include "yb/common/common.pb.h" #include "yb/common/pgsql_error.h" +#include "yb/util/backoff_waiter.h" #include "yb/util/monotime.h" #include "yb/util/size_literals.h" #include "yb/yql/pgwrapper/libpq_utils.h" @@ -90,22 +91,44 @@ Result GetDatabaseOid(PGConn* conn, const std::string& db_name) { Format("SELECT oid FROM pg_database WHERE datname = '$0'", db_name)); } -Status LibPqTestBase::BumpCatalogVersion(int num_versions, PGConn* conn, +// Bump catalog version num_bumps times using conn. After each bump, wait for the new catalog +// version to propagate to conn in order to avoid catalog version mismatch errors. +// Prerequisites: +// - conn should not be in the middle of a transaction +// - there should be no other concurrent catalog version bumps +Status LibPqTestBase::BumpCatalogVersion(int num_bumps, PGConn* conn, const std::string& alter_value) { - LOG(INFO) << "Do " << num_versions << " breaking catalog version bumps"; + const auto query = "SELECT catalog_version FROM pg_stat_activity WHERE pid = pg_backend_pid()"; + auto initial_catalog_version = VERIFY_RESULT(conn->FetchRow(query)); + LOG(INFO) << "Do " << num_bumps << " breaking catalog version bumps starting at " + << initial_catalog_version; if (alter_value.empty()) { - for (int i = 0; i < num_versions; ++i) { + for (int i = 1; i <= num_bumps; ++i) { RETURN_NOT_OK(IncrementAllDBCatalogVersions( *conn, IsBreakingCatalogVersionChange::kTrue /* is_breaking */)); - // Here we call increment_all_db_catalog_versions to bump the catalog version. - // Add a sleep for the new catalog version to propagate to avoid catalog version - // mismatch error when we have back-to-back calls to increment_all_db_catalog_versions. - SleepFor(2s); + auto target_catalog_version = initial_catalog_version + i; + RETURN_NOT_OK(LoggedWaitFor( + [conn, target_catalog_version, &query]() -> Result { + auto current_catalog_version = VERIFY_RESULT(conn->FetchRow(query)); + if (current_catalog_version == target_catalog_version) { + return true; + } + if (current_catalog_version < target_catalog_version) { + return false; + } + return STATUS_FORMAT( + IllegalState, + "unexpected catalog version $0 > target $1:" + " does the test do concurrent DDLs without synchronization?", + current_catalog_version, target_catalog_version); + }, + 10s, + Format("wait for catalog version $0 to propagate", target_catalog_version))); } return Status::OK(); } - // Some tests cannot tolerate the added sleep if using increment_all_db_catalog_versions. - SCHECK_EQ(num_versions, 1, InvalidArgument, "cannot bump more than one version with alter_value"); + // Some tests cannot tolerate the added wait if using increment_all_db_catalog_versions. + SCHECK_EQ(num_bumps, 1, InvalidArgument, "cannot bump more than one version with alter_value"); return conn->ExecuteFormat("ALTER ROLE yugabyte $0", alter_value); } diff --git a/src/yb/yql/pgwrapper/libpq_test_base.h b/src/yb/yql/pgwrapper/libpq_test_base.h index b2e7d2fe202c..0dc5d6a93ff7 100644 --- a/src/yb/yql/pgwrapper/libpq_test_base.h +++ b/src/yb/yql/pgwrapper/libpq_test_base.h @@ -39,7 +39,7 @@ class LibPqTestBase : public PgWrapperTestBase { Result ConnectToDBWithReplication(const std::string& db_name); void SerializableColoringHelper(int min_duration_seconds = 0); static bool TransactionalFailure(const Status& status); - static Status BumpCatalogVersion(int num_versions, PGConn* conn, + static Status BumpCatalogVersion(int num_bumps, PGConn* conn, const std::string& alter_value = ""); static void UpdateMiniClusterFailOnConflict(ExternalMiniClusterOptions* options); };