Skip to content

Commit

Permalink
[#9936] Remove all direct YBClient usage from PgSession
Browse files Browse the repository at this point in the history
Summary:
Moved the following functionality:
- AlterDatabase
- BackfillIndex
- CreateSequencesDataTable
- CreateTablegroup
- DropDatabase
- DropTable
- DropTablegroup
- GetCatalogMasterVersion
- GetDatabaseInfo
- ListLiveTabletServers
- TabletServerCount
- TruncateTable

Test Plan: Jenkins

Reviewers: dmitry

Reviewed By: dmitry

Subscribers: bogdan, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D13219
  • Loading branch information
spolitov committed Sep 28, 2021
1 parent cffaae8 commit 99e2395
Show file tree
Hide file tree
Showing 30 changed files with 856 additions and 503 deletions.
6 changes: 3 additions & 3 deletions src/postgres/src/backend/utils/misc/pg_yb_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -1392,17 +1392,17 @@ yb_servers(PG_FUNCTION_ARGS)
HeapTuple tuple;
int cntr = funcctx->call_cntr;
YBCServerDescriptor *server = (YBCServerDescriptor *)funcctx->user_fctx + cntr;
bool is_primary = server->isPrimary;
bool is_primary = server->is_primary;
const char *node_type = is_primary ? "primary" : "read_replica";
// TODO: Remove hard coding of port and num_connections
values[0] = CStringGetTextDatum(server->host);
values[1] = Int64GetDatum(server->pgPort);
values[1] = Int64GetDatum(server->pg_port);
values[2] = Int64GetDatum(0);
values[3] = CStringGetTextDatum(node_type);
values[4] = CStringGetTextDatum(server->cloud);
values[5] = CStringGetTextDatum(server->region);
values[6] = CStringGetTextDatum(server->zone);
values[7] = CStringGetTextDatum(server->publicIp);
values[7] = CStringGetTextDatum(server->public_ip);
memset(nulls, 0, sizeof(nulls));
tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
Expand Down
6 changes: 5 additions & 1 deletion src/yb/client/client-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,11 @@ YB_CLIENT_SPECIALIZE_SIMPLE(IsDeleteNamespaceDone);
YBClient::Data::Data()
: leader_master_rpc_(rpcs_.InvalidHandle()),
latest_observed_hybrid_time_(YBClient::kNoHybridTime),
id_(ClientId::GenerateRandom()) {}
id_(ClientId::GenerateRandom()) {
for(auto& cache : tserver_count_cached_) {
cache.store(0, std::memory_order_relaxed);
}
}

YBClient::Data::~Data() {
rpcs_.Shutdown();
Expand Down
2 changes: 1 addition & 1 deletion src/yb/client/client-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ class YBClient::Data {
simple_spinlock tablet_requests_mutex_;
std::unordered_map<TabletId, TabletRequests> tablet_requests_;

std::atomic<int> tserver_count_cached_{0};
std::array<std::atomic<int>, 2> tserver_count_cached_;

// The proxy for the node local tablet server.
std::shared_ptr<tserver::TabletServerForwardServiceProxy> node_local_forward_proxy_;
Expand Down
7 changes: 3 additions & 4 deletions src/yb/client/client-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -663,8 +663,7 @@ TEST_F(ClientTest, TestListTables) {
}

TEST_F(ClientTest, TestListTabletServers) {
std::vector<std::unique_ptr<YBTabletServer>> tss;
ASSERT_OK(client_->ListTabletServers(&tss));
auto tss = ASSERT_RESULT(client_->ListTabletServers());
ASSERT_EQ(3, tss.size());
set<string> actual_ts_uuids;
set<string> actual_ts_hostnames;
Expand All @@ -673,9 +672,9 @@ TEST_F(ClientTest, TestListTabletServers) {
for (int i = 0; i < tss.size(); ++i) {
auto server = cluster_->mini_tablet_server(i)->server();
expected_ts_uuids.insert(server->instance_pb().permanent_uuid());
actual_ts_uuids.insert(tss[i]->uuid());
actual_ts_uuids.insert(tss[i].uuid);
expected_ts_hostnames.insert(server->options().broadcast_addresses[0].host());
actual_ts_hostnames.insert(tss[i]->hostname());
actual_ts_hostnames.insert(tss[i].hostname);
}
ASSERT_EQ(expected_ts_uuids, actual_ts_uuids);
ASSERT_EQ(expected_ts_hostnames, actual_ts_hostnames);
Expand Down
97 changes: 49 additions & 48 deletions src/yb/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -517,13 +517,12 @@ Status YBClient::DeleteTable(const YBTableName& table_name, bool wait) {
wait);
}

Status YBClient::DeleteTable(const string& table_id, bool wait) {
auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
Status YBClient::DeleteTable(const string& table_id, bool wait, CoarseTimePoint deadline) {
return data_->DeleteTable(this,
YBTableName(),
table_id,
false /* is_index_table */,
deadline,
PatchAdminDeadline(deadline),
nullptr /* indexed_table_name */,
wait);
}
Expand All @@ -543,13 +542,13 @@ Status YBClient::DeleteIndexTable(const YBTableName& table_name,

Status YBClient::DeleteIndexTable(const string& table_id,
YBTableName* indexed_table_name,
bool wait) {
auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
bool wait,
CoarseTimePoint deadline) {
return data_->DeleteTable(this,
YBTableName(),
table_id,
true /* is_index_table */,
deadline,
PatchAdminDeadline(deadline),
indexed_table_name,
wait);
}
Expand Down Expand Up @@ -742,9 +741,7 @@ Status YBClient::CreateNamespace(const std::string& namespace_name,
txn->ToPB(req.mutable_transaction());
}
req.set_colocated(colocated);
if (deadline == CoarseTimePoint()) {
deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
}
deadline = PatchAdminDeadline(deadline);
Status s = data_->SyncLeaderMasterRpc<CreateNamespaceRequestPB, CreateNamespaceResponsePB>(
deadline, req, &resp, nullptr, "CreateNamespace", &MasterServiceProxy::CreateNamespace);
if (resp.has_error()) {
Expand Down Expand Up @@ -797,7 +794,8 @@ Status YBClient::IsCreateNamespaceInProgress(const std::string& namespace_name,

Status YBClient::DeleteNamespace(const std::string& namespace_name,
const boost::optional<YQLDatabase>& database_type,
const std::string& namespace_id) {
const std::string& namespace_id,
CoarseTimePoint deadline) {
DeleteNamespaceRequestPB req;
DeleteNamespaceResponsePB resp;
req.mutable_namespace_()->set_name(namespace_name);
Expand All @@ -808,7 +806,7 @@ Status YBClient::DeleteNamespace(const std::string& namespace_name,
req.set_database_type(*database_type);
req.mutable_namespace_()->set_database_type(*database_type);
}
auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
deadline = PatchAdminDeadline(deadline);
Status s = data_->SyncLeaderMasterRpc<DeleteNamespaceRequestPB, DeleteNamespaceResponsePB>(
deadline, req, &resp, nullptr, "DeleteNamespace", &MasterServiceProxy::DeleteNamespace);
if (resp.has_error()) {
Expand Down Expand Up @@ -1451,7 +1449,8 @@ void YBClient::GetTableLocations(
}

Status YBClient::TabletServerCount(int *tserver_count, bool primary_only, bool use_cache) {
int tserver_count_cached = data_->tserver_count_cached_.load(std::memory_order_acquire);
int tserver_count_cached = data_->tserver_count_cached_[primary_only].load(
std::memory_order_acquire);
if (use_cache && tserver_count_cached > 0) {
*tserver_count = tserver_count_cached;
return Status::OK();
Expand All @@ -1461,69 +1460,64 @@ Status YBClient::TabletServerCount(int *tserver_count, bool primary_only, bool u
ListTabletServersResponsePB resp;
req.set_primary_only(primary_only);
CALL_SYNC_LEADER_MASTER_RPC(req, resp, ListTabletServers);
data_->tserver_count_cached_.store(resp.servers_size(), std::memory_order_release);
data_->tserver_count_cached_[primary_only].store(resp.servers_size(), std::memory_order_release);
*tserver_count = resp.servers_size();
return Status::OK();
}

Status YBClient::ListTabletServers(vector<std::unique_ptr<YBTabletServer>>* tablet_servers) {
Result<std::vector<YBTabletServer>> YBClient::ListTabletServers() {
ListTabletServersRequestPB req;
ListTabletServersResponsePB resp;
std::vector<YBTabletServer> result;
CALL_SYNC_LEADER_MASTER_RPC(req, resp, ListTabletServers);
result.reserve(resp.servers_size());
for (int i = 0; i < resp.servers_size(); i++) {
const ListTabletServersResponsePB_Entry& e = resp.servers(i);
auto ts = std::make_unique<YBTabletServer>(
e.instance_id().permanent_uuid(),
DesiredHostPort(e.registration().common(), data_->cloud_info_pb_).host(),
e.registration().common().placement_uuid());
tablet_servers->push_back(std::move(ts));
result.push_back(YBTabletServer::FromPB(e, data_->cloud_info_pb_));
}
return Status::OK();
return result;
}

Status YBClient::ListLiveTabletServers(TabletServersInfo* tablet_servers, bool primary_only) {
Result<TabletServersInfo> YBClient::ListLiveTabletServers(bool primary_only) {
ListLiveTabletServersRequestPB req;
if (primary_only) req.set_primary_only(true);
if (primary_only) {
req.set_primary_only(true);
}
ListLiveTabletServersResponsePB resp;
CALL_SYNC_LEADER_MASTER_RPC(req, resp, ListLiveTabletServers);

TabletServersInfo result;
result.resize(resp.servers_size());
for (int i = 0; i < resp.servers_size(); i++) {
const ListLiveTabletServersResponsePB_Entry& entry = resp.servers(i);
CloudInfoPB cloud_info = entry.registration().common().cloud_info();
std::string cloud = "";
std::string region = "";
std::string zone = "";
int broadcast_sz = entry.registration().common().broadcast_addresses().size();
int private_ip_addresses_sz = entry.registration().common().private_rpc_addresses().size();

const auto privateIp =
private_ip_addresses_sz > 0
? entry.registration().common().private_rpc_addresses().Get(0).host()
: DesiredHostPort(entry.registration().common(), data_->cloud_info_pb_).host();

std::string publicIp = "";
if (broadcast_sz > 0) {
publicIp = entry.registration().common().broadcast_addresses().Get(0).host();
auto& out = result[i];
out.server = YBTabletServer::FromPB(entry, data_->cloud_info_pb_);
const CloudInfoPB& cloud_info = entry.registration().common().cloud_info();

const auto& private_addresses = entry.registration().common().private_rpc_addresses();
if (!private_addresses.empty()) {
out.server.hostname = private_addresses.Get(0).host();
}

const auto& broadcast_addresses = entry.registration().common().broadcast_addresses();
if (!broadcast_addresses.empty()) {
out.public_ip = broadcast_addresses.Get(0).host();
}

bool isPrimary = !entry.isfromreadreplica();
out.is_primary = !entry.isfromreadreplica();
if (cloud_info.has_placement_cloud()) {
cloud = cloud_info.placement_cloud();
out.cloud = cloud_info.placement_cloud();
if (cloud_info.has_placement_region()) {
region = cloud_info.placement_region();
out.region = cloud_info.placement_region();
}
if (cloud_info.has_placement_zone()) {
zone = cloud_info.placement_zone();
out.zone = cloud_info.placement_zone();
}
}

auto ts = std::make_unique<YBTabletServerPlacementInfo>(
entry.instance_id().permanent_uuid(), privateIp,
entry.registration().common().placement_uuid(), cloud, region, zone, isPrimary,
publicIp, entry.registration().common().pg_port());
tablet_servers->push_back(std::move(ts));
out.pg_port = entry.registration().common().pg_port();
}
return Status::OK();

return result;
}

void YBClient::SetLocalTabletServer(const string& ts_uuid,
Expand Down Expand Up @@ -1998,5 +1992,12 @@ void YBClient::SetLatestObservedHybridTime(uint64_t ht_hybrid_time) {
data_->UpdateLatestObservedHybridTime(ht_hybrid_time);
}

CoarseTimePoint YBClient::PatchAdminDeadline(CoarseTimePoint deadline) const {
if (deadline != CoarseTimePoint()) {
return deadline;
}
return CoarseMonoClock::Now() + default_admin_operation_timeout();
}

} // namespace client
} // namespace yb
20 changes: 12 additions & 8 deletions src/yb/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ class YBClientBuilder {
DISALLOW_COPY_AND_ASSIGN(YBClientBuilder);
};

using TabletServersInfo = std::vector<YBTabletServerPlacementInfo>;

// The YBClient represents a connection to a cluster. From the user
// perspective, they should only need to create one of these in their
// application, likely a singleton -- but it's not a singleton in YB in any
Expand All @@ -262,8 +264,6 @@ class YBClientBuilder {
// This class is thread-safe.
class YBClient {
public:
using TabletServersInfo = std::vector<std::unique_ptr<yb::client::YBTabletServerPlacementInfo>>;

~YBClient();

std::unique_ptr<YBTableCreator> NewTableCreator();
Expand Down Expand Up @@ -292,7 +292,8 @@ class YBClient {
// Delete the specified table.
// Set 'wait' to true if the call must wait for the table to be fully deleted before returning.
CHECKED_STATUS DeleteTable(const YBTableName& table_name, bool wait = true);
CHECKED_STATUS DeleteTable(const std::string& table_id, bool wait = true);
CHECKED_STATUS DeleteTable(
const std::string& table_id, bool wait = true, CoarseTimePoint deadline = CoarseTimePoint());

// Delete the specified index table.
// Set 'wait' to true if the call must wait for the table to be fully deleted before returning.
Expand All @@ -302,7 +303,8 @@ class YBClient {

CHECKED_STATUS DeleteIndexTable(const std::string& table_id,
YBTableName* indexed_table_name = nullptr,
bool wait = true);
bool wait = true,
CoarseTimePoint deadline = CoarseTimePoint());

// Flush or compact the specified tables.
CHECKED_STATUS FlushTables(const std::vector<TableId>& table_ids,
Expand Down Expand Up @@ -401,7 +403,8 @@ class YBClient {
// Delete namespace with the given name.
CHECKED_STATUS DeleteNamespace(const std::string& namespace_name,
const boost::optional<YQLDatabase>& database_type = boost::none,
const std::string& namespace_id = "");
const std::string& namespace_id = "",
CoarseTimePoint deadline = CoarseTimePoint());

// Set 'delete_in_progress' to true if a DeleteNamespace operation is in-progress.
CHECKED_STATUS IsDeleteNamespaceInProgress(const std::string& namespace_name,
Expand Down Expand Up @@ -564,10 +567,9 @@ class YBClient {
CHECKED_STATUS TabletServerCount(int *tserver_count, bool primary_only = false,
bool use_cache = false);

CHECKED_STATUS ListTabletServers(std::vector<std::unique_ptr<YBTabletServer>>* tablet_servers);
Result<std::vector<YBTabletServer>> ListTabletServers();

CHECKED_STATUS ListLiveTabletServers(TabletServersInfo* tablet_servers,
bool primary_only = false);
Result<TabletServersInfo> ListLiveTabletServers(bool primary_only = false);

// Sets local tserver and its proxy.
void SetLocalTabletServer(const std::string& ts_uuid,
Expand Down Expand Up @@ -811,6 +813,8 @@ class YBClient {
friend std::future<Result<internal::RemoteTabletPtr>> LookupFirstTabletFuture(
YBClient* client, const YBTablePtr& table);

CoarseTimePoint PatchAdminDeadline(CoarseTimePoint deadline) const;

YBClient();

ThreadPool* callback_threadpool();
Expand Down
4 changes: 2 additions & 2 deletions src/yb/client/client_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ class YBSchema;
class YBTableAlterer;
class YBTableCreator;
class YBTableName;
class YBTabletServer;
class YBTabletServerPlacementInfo;
class UniverseKeyClient;

struct YBTableInfo;
struct YBTabletServer;
struct YBTabletServerPlacementInfo;

typedef std::function<void(std::vector<const TabletId*>*)> LocalTabletFilter;

Expand Down
5 changes: 2 additions & 3 deletions src/yb/client/namespace_alterer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ YBNamespaceAlterer::YBNamespaceAlterer(

YBNamespaceAlterer::~YBNamespaceAlterer() {}

Status YBNamespaceAlterer::Alter() {
Status YBNamespaceAlterer::Alter(CoarseTimePoint deadline) {
master::AlterNamespaceRequestPB req;
RETURN_NOT_OK(ToRequest(&req));
return client_->data_->AlterNamespace(
client_, req, CoarseMonoClock::Now() + client_->default_admin_operation_timeout());
return client_->data_->AlterNamespace(client_, req, client_->PatchAdminDeadline(deadline));
}

YBNamespaceAlterer* YBNamespaceAlterer::RenameTo(const std::string& new_name) {
Expand Down
6 changes: 4 additions & 2 deletions src/yb/client/namespace_alterer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
#include <boost/optional.hpp>

#include "yb/client/client_fwd.h"
#include "yb/util/status.h"

#include "yb/master/master.pb.h"

#include "yb/util/monotime.h"
#include "yb/util/status.h"

namespace yb {
namespace client {

Expand All @@ -33,7 +35,7 @@ class YBNamespaceAlterer {
YBNamespaceAlterer* RenameTo(const std::string& new_name);
YBNamespaceAlterer* SetDatabaseType(YQLDatabase type);

CHECKED_STATUS Alter();
CHECKED_STATUS Alter(CoarseTimePoint deadline = CoarseTimePoint());

private:
friend class YBClient;
Expand Down
Loading

0 comments on commit 99e2395

Please sign in to comment.