Skip to content

Commit

Permalink
endpoint slicing support (#10313)
Browse files Browse the repository at this point in the history
This change allows to make logical slice of endpoints. Only endpoints with same endpointId will be used in discovery result for request which served via endpoint with endpointId

This allows to create configuration where nodes have multiple endpoints in different networks.

Changelog category

New feature
  • Loading branch information
dcherednik authored Oct 11, 2024
1 parent 6e6f79a commit edc1471
Show file tree
Hide file tree
Showing 18 changed files with 137 additions and 10 deletions.
19 changes: 17 additions & 2 deletions ydb/core/discovery/discovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ namespace NDiscovery {
return false;
}

bool CheckEndpointId(const TString& endpointId, const NKikimrStateStorage::TEndpointBoardEntry &entry) {
if (endpointId.empty() && !entry.HasEndpointId())
return true;

if (entry.HasEndpointId() && entry.GetEndpointId() == endpointId)
return true;

return false;
}

bool IsSafeLocationMarker(TStringBuf location) {
const ui8* isrc = reinterpret_cast<const ui8*>(location.data());
for (auto idx : xrange(location.size())) {
Expand Down Expand Up @@ -128,6 +138,7 @@ namespace NDiscovery {
const TMap<TActorId, TEvStateStorage::TBoardInfoEntry>& prevInfoEntries,
TMap<TActorId, TEvStateStorage::TBoardInfoEntry> newInfoEntries,
TSet<TString> services,
TString endpointId,
const THolder<TEvInterconnect::TEvNodeInfo>& nameserviceResponse) {
TMap<TActorId, TEvStateStorage::TBoardInfoEntry> infoEntries;
if (prevInfoEntries.empty()) {
Expand Down Expand Up @@ -170,6 +181,10 @@ namespace NDiscovery {
continue;
}

if (!CheckEndpointId(endpointId, entry)) {
continue;
}

if (entry.GetSsl()) {
AddEndpoint(cachedMessageSsl, statesSsl, entry);
} else {
Expand Down Expand Up @@ -264,7 +279,7 @@ namespace NDiscoveryPrivate {

currentCachedMessage = std::make_shared<NDiscovery::TCachedMessageData>(
NDiscovery::CreateCachedMessage(
currentCachedMessage->InfoEntries, std::move(msg->Updates), {}, NameserviceResponse)
currentCachedMessage->InfoEntries, std::move(msg->Updates), {}, {}, NameserviceResponse)
);

auto it = Requested.find(path);
Expand All @@ -278,7 +293,7 @@ namespace NDiscoveryPrivate {
const auto& path = msg->Path;

auto newCachedData = std::make_shared<NDiscovery::TCachedMessageData>(
NDiscovery::CreateCachedMessage({}, std::move(msg->InfoEntries), {}, NameserviceResponse)
NDiscovery::CreateCachedMessage({}, std::move(msg->InfoEntries), {}, {}, NameserviceResponse)
);
newCachedData->Status = msg->Status;

Expand Down
1 change: 1 addition & 0 deletions ydb/core/discovery/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ namespace NDiscovery {
const TMap<TActorId, TEvStateStorage::TBoardInfoEntry>&,
TMap<TActorId, TEvStateStorage::TBoardInfoEntry>,
TSet<TString>,
TString,
const THolder<TEvInterconnect::TEvNodeInfo>&);
}

Expand Down
12 changes: 12 additions & 0 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1692,6 +1692,9 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se
stringsFromProto(desc->AddressesV6, config.GetPublicAddressesV6());

desc->ServedServices.insert(desc->ServedServices.end(), config.GetServices().begin(), config.GetServices().end());
if (config.HasEndpointId()) {
desc->EndpointId = config.GetEndpointId();
}
endpoints.push_back(std::move(desc));
}

Expand All @@ -1706,6 +1709,9 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se
desc->TargetNameOverride = config.GetPublicTargetNameOverride();

desc->ServedServices.insert(desc->ServedServices.end(), config.GetServices().begin(), config.GetServices().end());
if (config.HasEndpointId()) {
desc->EndpointId = config.GetEndpointId();
}
endpoints.push_back(std::move(desc));
}

Expand All @@ -1721,6 +1727,9 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se
stringsFromProto(desc->AddressesV6, sx.GetPublicAddressesV6());

desc->ServedServices.insert(desc->ServedServices.end(), sx.GetServices().begin(), sx.GetServices().end());
if (sx.HasEndpointId()) {
desc->EndpointId = sx.GetEndpointId();
}
endpoints.push_back(std::move(desc));
}

Expand All @@ -1735,6 +1744,9 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se
desc->TargetNameOverride = sx.GetPublicTargetNameOverride();

desc->ServedServices.insert(desc->ServedServices.end(), sx.GetServices().begin(), sx.GetServices().end());
if (sx.HasEndpointId()) {
desc->EndpointId = sx.GetEndpointId();
}
endpoints.push_back(std::move(desc));
}
}
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/driver_lib/run/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -940,6 +940,10 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
opts.SetKeepAliveEnable(false);
}

if (grpcConfig.HasEndpointId()) {
opts.SetEndpointId(grpcConfig.GetEndpointId());
}

NConsole::SetGRpcLibraryFunction();

#define GET_PATH_TO_FILE(GRPC_CONFIG, PRIMARY_FIELD, SECONDARY_FIELD) \
Expand Down Expand Up @@ -981,6 +985,10 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
if (ex.GetHost())
xopts.SetHost(ex.GetHost());

if (ex.HasEndpointId()) {
xopts.SetEndpointId(ex.GetEndpointId());
}

GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(xopts) });
fillFn(ex, *GRpcServers.back().second, xopts);
}
Expand All @@ -989,6 +997,9 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
NYdbGrpc::TServerOptions xopts = opts;
xopts.SetPort(ex.GetSslPort());

if (ex.HasEndpointId()) {
xopts.SetEndpointId(ex.GetEndpointId());
}

NYdbGrpc::TSslData sslData;

Expand Down
4 changes: 4 additions & 0 deletions ydb/core/grpc_services/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -1318,6 +1318,10 @@ class TGRpcRequestWrapperImpl
Ctx_->ReplyError(code, msg, details);
}

TString GetEndpointId() const {
return Ctx_->GetEndpointId();
}

private:
void Reply(NProtoBuf::Message *resp, ui32 status) override {
// End Of Request for non streaming requests
Expand Down
1 change: 1 addition & 0 deletions ydb/core/grpc_services/grpc_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct TGrpcEndpointDescription : public TThrRefBase {

TVector<TString> ServedServices;
TVector<TString> ServedDatabases;
TString EndpointId;
};

IActor* CreateGrpcEndpointPublishActor(TGrpcEndpointDescription *description);
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/grpc_services/grpc_endpoint_publish_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ class TGRpcEndpointPublishActor : public TActorBootstrapped<TGRpcEndpointPublish
if (Description->TargetNameOverride) {
entry.SetTargetNameOverride(Description->TargetNameOverride);
}
if (Description->EndpointId) {
entry.SetEndpointId(Description->EndpointId);
}
for (const auto &service : Description->ServedServices)
entry.AddServices(service);

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/grpc_services/local_grpc/local_grpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class TContextBase : public NYdbGrpc::IRequestContextBase {
return GRPC_COMPRESS_LEVEL_NONE;
}

TString GetEndpointId() const override { return {}; }

google::protobuf::Arena* GetArena() override {
return &Arena_;
}
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/grpc_services/rpc_discovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,16 @@ class TListEndpointsRPC : public TActorBootstrapped<TListEndpointsRPC> {

TString cachedMessage, cachedMessageSsl;

if (services.empty() && !LookupResponse->CachedMessageData->CachedMessage.empty() &&
TString endpointId = Request->GetEndpointId();

if (endpointId.empty() && services.empty() && !LookupResponse->CachedMessageData->CachedMessage.empty() &&
!LookupResponse->CachedMessageData->CachedMessageSsl.empty()) {
cachedMessage = LookupResponse->CachedMessageData->CachedMessage;
cachedMessageSsl = LookupResponse->CachedMessageData->CachedMessageSsl;
} else {
auto cachedMessageData = NDiscovery::CreateCachedMessage(
{}, std::move(LookupResponse->CachedMessageData->InfoEntries),
std::move(services), NameserviceResponse);
std::move(services), std::move(endpointId), NameserviceResponse);
cachedMessage = std::move(cachedMessageData.CachedMessage);
cachedMessageSsl = std::move(cachedMessageData.CachedMessageSsl);
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,7 @@ message TGRpcConfig {

optional uint32 GRpcProxyCount = 106 [default = 2];
optional bool EnableGRpcMemoryQuota = 107 [default = false];
optional string EndpointId = 108;

repeated TGRpcConfig ExtEndpoints = 200; // run specific services on separate endpoints
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/statestorage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -137,5 +137,6 @@ message TEndpointBoardEntry {
repeated string AddressesV4 = 8;
repeated string AddressesV6 = 9;
optional string TargetNameOverride = 10;
optional string EndpointId = 11;
};

2 changes: 2 additions & 0 deletions ydb/core/public_http/grpc_request_context_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,6 @@ namespace NKikimr::NPublicHttp {
return RequestContext.GetPeer();
}

TString TGrpcRequestContextWrapper::GetEndpointId() const { return {}; }

} // namespace NKikimr::NPublicHttp
1 change: 1 addition & 0 deletions ydb/core/public_http/grpc_request_context_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class TGrpcRequestContextWrapper : public NYdbGrpc::IRequestContextBase {
virtual TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const;
virtual TVector<TStringBuf> FindClientCert() const {return {};}
virtual grpc_compression_level GetCompressionLevel() const { return GRPC_COMPRESS_LEVEL_NONE; }
virtual TString GetEndpointId() const;

virtual google::protobuf::Arena* GetArena();

Expand Down
4 changes: 4 additions & 0 deletions ydb/library/grpc/server/grpc_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ class TGRpcRequestImpl
return TBaseAsyncContext<TService>::GetCompressionLevel();
}

TString GetEndpointId() const override {
return Server_->GetEndpointId();
}

//! Get pointer to the request's message.
const NProtoBuf::Message* GetRequest() const override {
return Request_;
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/grpc/server/grpc_request_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ class IRequestContextBase: public TThrRefBase {

//! Returns true if client was not interested in result (but we still must send response to make grpc happy)
virtual bool IsClientLost() const = 0;

virtual TString GetEndpointId() const = 0;
};

} // namespace NYdbGrpc
10 changes: 10 additions & 0 deletions ydb/library/grpc/server/grpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ struct TServerOptions {
//! Logger which will be used to write logs about requests handling (iff appropriate log level is enabled).
DECLARE_FIELD(Logger, TLoggerPtr, nullptr);

DECLARE_FIELD(EndpointId, TString, "");

#undef DECLARE_FIELD
};

Expand Down Expand Up @@ -203,6 +205,8 @@ class IGRpcService: public TThrRefBase {
* service to inspect server options and initialize accordingly.
*/
virtual void SetServerOptions(const TServerOptions& options) = 0;

virtual TString GetEndpointId() const = 0;
};

class TGrpcServiceProtectiable: public IGRpcService {
Expand Down Expand Up @@ -282,6 +286,11 @@ class TGrpcServiceProtectiable: public IGRpcService {
void SetServerOptions(const TServerOptions& options) override {
SslServer_ = bool(options.SslData);
NeedAuth_ = options.UseAuth;
EndpointId_ = options.EndpointId;
}

TString GetEndpointId() const override {
return EndpointId_;
}

//! Check if the server is going to shut down.
Expand All @@ -303,6 +312,7 @@ class TGrpcServiceProtectiable: public IGRpcService {

bool SslServer_ = false;
bool NeedAuth_ = false;
TString EndpointId_;

struct TShard {
TAdaptiveLock Lock_;
Expand Down
62 changes: 57 additions & 5 deletions ydb/tests/functional/api/test_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ class TestDiscoveryExtEndpoint(object):
@classmethod
def setup_class(cls):
conf = KikimrConfigGenerator()
cls.ext_port = conf.port_allocator.get_node_port_allocator(0).ext_port
conf.clone_grpc_as_ext_endpoint(cls.ext_port)
cls.ext_port_1 = conf.port_allocator.get_node_port_allocator(0).ext_port
cls.ext_port_2 = conf.port_allocator.get_node_port_allocator(1).ext_port
conf.clone_grpc_as_ext_endpoint(cls.ext_port_1, "extserv1")
conf.clone_grpc_as_ext_endpoint(cls.ext_port_2, "extserv2")
cls.cluster = kikimr_cluster_factory(
configurator=conf
)
Expand All @@ -42,18 +44,68 @@ def teardown_class(cls):
cls.cluster.stop()

def test_scenario(self):
ext_port = TestDiscoveryExtEndpoint.ext_port
ext_port_1 = TestDiscoveryExtEndpoint.ext_port_1
ext_port_2 = TestDiscoveryExtEndpoint.ext_port_2
driver_config = ydb.DriverConfig(
"%s:%s" % (self.cluster.nodes[1].host, ext_port), self.database_name)
"%s:%s" % (self.cluster.nodes[1].host, self.cluster.nodes[1].port), self.database_name)
resolver = ydb.DiscoveryEndpointsResolver(driver_config)
driver = ydb.Driver(driver_config)
driver.wait(timeout=10)

endpoint_ports = [endpoint.port for endpoint in resolver.resolve().endpoints]
# Discovery has been performed using default endpoint
# but ext endpoint marked with label
# such ext endpoint should not present in discovery
assert_that(ext_port_1 not in endpoint_ports)
assert_that(ext_port_2 not in endpoint_ports)

for slot in self.cluster.slots.values():
assert_that(slot.grpc_port in endpoint_ports)
assert_that(slot.grpc_port != ext_port)
assert_that(slot.grpc_port != ext_port_1)
assert_that(slot.grpc_port != ext_port_2)

driver_config = ydb.DriverConfig(
"%s:%s" % (self.cluster.nodes[1].host, ext_port_1), self.database_name)
resolver = ydb.DiscoveryEndpointsResolver(driver_config)
driver = ydb.Driver(driver_config)
driver.wait(timeout=10)

endpoint_ports = [endpoint.port for endpoint in resolver.resolve().endpoints]
# Discovery has been performed using external endpoint with label
# only endpoint with such label expected
assert_that(ext_port_1 in endpoint_ports)
assert_that(ext_port_2 not in endpoint_ports)

for slot in self.cluster.slots.values():
assert_that(slot.grpc_port not in endpoint_ports)

# Just check again to cover discovery cache issue
driver_config = ydb.DriverConfig(
"%s:%s" % (self.cluster.nodes[1].host, ext_port_1), self.database_name)
resolver = ydb.DiscoveryEndpointsResolver(driver_config)
driver = ydb.Driver(driver_config)
driver.wait(timeout=10)

endpoint_ports = [endpoint.port for endpoint in resolver.resolve().endpoints]
assert_that(ext_port_1 in endpoint_ports)
assert_that(ext_port_2 not in endpoint_ports)

for slot in self.cluster.slots.values():
assert_that(slot.grpc_port not in endpoint_ports)

# Repeat using other ext endpoint
driver_config = ydb.DriverConfig(
"%s:%s" % (self.cluster.nodes[1].host, ext_port_2), self.database_name)
resolver = ydb.DiscoveryEndpointsResolver(driver_config)
driver = ydb.Driver(driver_config)
driver.wait(timeout=10)

endpoint_ports = [endpoint.port for endpoint in resolver.resolve().endpoints]
assert_that(ext_port_1 not in endpoint_ports)
assert_that(ext_port_2 in endpoint_ports)

for slot in self.cluster.slots.values():
assert_that(slot.grpc_port not in endpoint_ports)


@six.add_metaclass(abc.ABCMeta)
Expand Down
5 changes: 4 additions & 1 deletion ydb/tests/library/harness/kikimr_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,13 +546,16 @@ def write_proto_configs(self, configs_path):
with open(os.path.join(configs_path, "config.yaml"), "w") as writer:
writer.write(yaml.safe_dump(self.yaml_config))

def clone_grpc_as_ext_endpoint(self, port):
def clone_grpc_as_ext_endpoint(self, port, endpoint_id=None):
cur_grpc_config = copy.deepcopy(self.yaml_config['grpc_config'])
if 'ext_endpoints' in cur_grpc_config:
del cur_grpc_config['ext_endpoints']

cur_grpc_config['port'] = port

if endpoint_id is not None:
cur_grpc_config['endpoint_id'] = endpoint_id

if 'ext_endpoints' not in self.yaml_config['grpc_config']:
self.yaml_config['grpc_config']['ext_endpoints'] = []

Expand Down

0 comments on commit edc1471

Please sign in to comment.