diff --git a/ydb/core/discovery/discovery.cpp b/ydb/core/discovery/discovery.cpp index a882c3d7f26a..60fcbf49b62f 100644 --- a/ydb/core/discovery/discovery.cpp +++ b/ydb/core/discovery/discovery.cpp @@ -46,6 +46,16 @@ namespace NDiscovery { return false; } + bool CheckEndpointId(const TString& endpointId, const NKikimrStateStorage::TEndpointBoardEntry &entry) { + if (endpointId.empty() && !entry.HasEndpointId()) + return true; + + if (entry.HasEndpointId() && entry.GetEndpointId() == endpointId) + return true; + + return false; + } + bool IsSafeLocationMarker(TStringBuf location) { const ui8* isrc = reinterpret_cast(location.data()); for (auto idx : xrange(location.size())) { @@ -128,6 +138,7 @@ namespace NDiscovery { const TMap& prevInfoEntries, TMap newInfoEntries, TSet services, + TString endpointId, const THolder& nameserviceResponse) { TMap infoEntries; if (prevInfoEntries.empty()) { @@ -170,6 +181,10 @@ namespace NDiscovery { continue; } + if (!CheckEndpointId(endpointId, entry)) { + continue; + } + if (entry.GetSsl()) { AddEndpoint(cachedMessageSsl, statesSsl, entry); } else { @@ -264,7 +279,7 @@ namespace NDiscoveryPrivate { currentCachedMessage = std::make_shared( NDiscovery::CreateCachedMessage( - currentCachedMessage->InfoEntries, std::move(msg->Updates), {}, NameserviceResponse) + currentCachedMessage->InfoEntries, std::move(msg->Updates), {}, {}, NameserviceResponse) ); auto it = Requested.find(path); @@ -278,7 +293,7 @@ namespace NDiscoveryPrivate { const auto& path = msg->Path; auto newCachedData = std::make_shared( - NDiscovery::CreateCachedMessage({}, std::move(msg->InfoEntries), {}, NameserviceResponse) + NDiscovery::CreateCachedMessage({}, std::move(msg->InfoEntries), {}, {}, NameserviceResponse) ); newCachedData->Status = msg->Status; diff --git a/ydb/core/discovery/discovery.h b/ydb/core/discovery/discovery.h index 2602abb325e4..2eab01dae01a 100644 --- a/ydb/core/discovery/discovery.h +++ b/ydb/core/discovery/discovery.h @@ -59,6 +59,7 @@ namespace NDiscovery { const TMap&, TMap, TSet, + TString, const THolder&); } diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 5aa970c3cd04..d230a0164d21 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -1692,6 +1692,9 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se stringsFromProto(desc->AddressesV6, config.GetPublicAddressesV6()); desc->ServedServices.insert(desc->ServedServices.end(), config.GetServices().begin(), config.GetServices().end()); + if (config.HasEndpointId()) { + desc->EndpointId = config.GetEndpointId(); + } endpoints.push_back(std::move(desc)); } @@ -1706,6 +1709,9 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se desc->TargetNameOverride = config.GetPublicTargetNameOverride(); desc->ServedServices.insert(desc->ServedServices.end(), config.GetServices().begin(), config.GetServices().end()); + if (config.HasEndpointId()) { + desc->EndpointId = config.GetEndpointId(); + } endpoints.push_back(std::move(desc)); } @@ -1721,6 +1727,9 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se stringsFromProto(desc->AddressesV6, sx.GetPublicAddressesV6()); desc->ServedServices.insert(desc->ServedServices.end(), sx.GetServices().begin(), sx.GetServices().end()); + if (sx.HasEndpointId()) { + desc->EndpointId = sx.GetEndpointId(); + } endpoints.push_back(std::move(desc)); } @@ -1735,6 +1744,9 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se desc->TargetNameOverride = sx.GetPublicTargetNameOverride(); desc->ServedServices.insert(desc->ServedServices.end(), sx.GetServices().begin(), sx.GetServices().end()); + if (sx.HasEndpointId()) { + desc->EndpointId = sx.GetEndpointId(); + } endpoints.push_back(std::move(desc)); } } diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 930e80aedbbc..5f583f420638 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -940,6 +940,10 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { opts.SetKeepAliveEnable(false); } + if (grpcConfig.HasEndpointId()) { + opts.SetEndpointId(grpcConfig.GetEndpointId()); + } + NConsole::SetGRpcLibraryFunction(); #define GET_PATH_TO_FILE(GRPC_CONFIG, PRIMARY_FIELD, SECONDARY_FIELD) \ @@ -981,6 +985,10 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { if (ex.GetHost()) xopts.SetHost(ex.GetHost()); + if (ex.HasEndpointId()) { + xopts.SetEndpointId(ex.GetEndpointId()); + } + GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(xopts) }); fillFn(ex, *GRpcServers.back().second, xopts); } @@ -989,6 +997,9 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { NYdbGrpc::TServerOptions xopts = opts; xopts.SetPort(ex.GetSslPort()); + if (ex.HasEndpointId()) { + xopts.SetEndpointId(ex.GetEndpointId()); + } NYdbGrpc::TSslData sslData; diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index a576ecd99337..5008c31cc06d 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -1318,6 +1318,10 @@ class TGRpcRequestWrapperImpl Ctx_->ReplyError(code, msg, details); } + TString GetEndpointId() const { + return Ctx_->GetEndpointId(); + } + private: void Reply(NProtoBuf::Message *resp, ui32 status) override { // End Of Request for non streaming requests diff --git a/ydb/core/grpc_services/grpc_endpoint.h b/ydb/core/grpc_services/grpc_endpoint.h index b0f592abf8a7..0f107803f426 100644 --- a/ydb/core/grpc_services/grpc_endpoint.h +++ b/ydb/core/grpc_services/grpc_endpoint.h @@ -17,6 +17,7 @@ struct TGrpcEndpointDescription : public TThrRefBase { TVector ServedServices; TVector ServedDatabases; + TString EndpointId; }; IActor* CreateGrpcEndpointPublishActor(TGrpcEndpointDescription *description); diff --git a/ydb/core/grpc_services/grpc_endpoint_publish_actor.cpp b/ydb/core/grpc_services/grpc_endpoint_publish_actor.cpp index 1a6aeea134ce..19e0e4871a68 100644 --- a/ydb/core/grpc_services/grpc_endpoint_publish_actor.cpp +++ b/ydb/core/grpc_services/grpc_endpoint_publish_actor.cpp @@ -48,6 +48,9 @@ class TGRpcEndpointPublishActor : public TActorBootstrappedTargetNameOverride) { entry.SetTargetNameOverride(Description->TargetNameOverride); } + if (Description->EndpointId) { + entry.SetEndpointId(Description->EndpointId); + } for (const auto &service : Description->ServedServices) entry.AddServices(service); diff --git a/ydb/core/grpc_services/local_grpc/local_grpc.h b/ydb/core/grpc_services/local_grpc/local_grpc.h index 6da8e538097e..21bce5d19b07 100644 --- a/ydb/core/grpc_services/local_grpc/local_grpc.h +++ b/ydb/core/grpc_services/local_grpc/local_grpc.h @@ -54,6 +54,8 @@ class TContextBase : public NYdbGrpc::IRequestContextBase { return GRPC_COMPRESS_LEVEL_NONE; } + TString GetEndpointId() const override { return {}; } + google::protobuf::Arena* GetArena() override { return &Arena_; } diff --git a/ydb/core/grpc_services/rpc_discovery.cpp b/ydb/core/grpc_services/rpc_discovery.cpp index bfa4adc83056..3f7d5f92a930 100644 --- a/ydb/core/grpc_services/rpc_discovery.cpp +++ b/ydb/core/grpc_services/rpc_discovery.cpp @@ -134,14 +134,16 @@ class TListEndpointsRPC : public TActorBootstrapped { TString cachedMessage, cachedMessageSsl; - if (services.empty() && !LookupResponse->CachedMessageData->CachedMessage.empty() && + TString endpointId = Request->GetEndpointId(); + + if (endpointId.empty() && services.empty() && !LookupResponse->CachedMessageData->CachedMessage.empty() && !LookupResponse->CachedMessageData->CachedMessageSsl.empty()) { cachedMessage = LookupResponse->CachedMessageData->CachedMessage; cachedMessageSsl = LookupResponse->CachedMessageData->CachedMessageSsl; } else { auto cachedMessageData = NDiscovery::CreateCachedMessage( {}, std::move(LookupResponse->CachedMessageData->InfoEntries), - std::move(services), NameserviceResponse); + std::move(services), std::move(endpointId), NameserviceResponse); cachedMessage = std::move(cachedMessageData.CachedMessage); cachedMessageSsl = std::move(cachedMessageData.CachedMessageSsl); } diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 72a8ef05b96c..a44ef0b6c876 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -707,6 +707,7 @@ message TGRpcConfig { optional uint32 GRpcProxyCount = 106 [default = 2]; optional bool EnableGRpcMemoryQuota = 107 [default = false]; + optional string EndpointId = 108; repeated TGRpcConfig ExtEndpoints = 200; // run specific services on separate endpoints } diff --git a/ydb/core/protos/statestorage.proto b/ydb/core/protos/statestorage.proto index ecbbf83fde36..bc0bf4014a96 100644 --- a/ydb/core/protos/statestorage.proto +++ b/ydb/core/protos/statestorage.proto @@ -137,5 +137,6 @@ message TEndpointBoardEntry { repeated string AddressesV4 = 8; repeated string AddressesV6 = 9; optional string TargetNameOverride = 10; + optional string EndpointId = 11; }; diff --git a/ydb/core/public_http/grpc_request_context_wrapper.cpp b/ydb/core/public_http/grpc_request_context_wrapper.cpp index 42ef99493631..d2df3c8899c2 100644 --- a/ydb/core/public_http/grpc_request_context_wrapper.cpp +++ b/ydb/core/public_http/grpc_request_context_wrapper.cpp @@ -78,4 +78,6 @@ namespace NKikimr::NPublicHttp { return RequestContext.GetPeer(); } + TString TGrpcRequestContextWrapper::GetEndpointId() const { return {}; } + } // namespace NKikimr::NPublicHttp diff --git a/ydb/core/public_http/grpc_request_context_wrapper.h b/ydb/core/public_http/grpc_request_context_wrapper.h index eb90dd65eb3c..86d62ab7eac0 100644 --- a/ydb/core/public_http/grpc_request_context_wrapper.h +++ b/ydb/core/public_http/grpc_request_context_wrapper.h @@ -34,6 +34,7 @@ class TGrpcRequestContextWrapper : public NYdbGrpc::IRequestContextBase { virtual TVector GetPeerMetaValues(TStringBuf key) const; virtual TVector FindClientCert() const {return {};} virtual grpc_compression_level GetCompressionLevel() const { return GRPC_COMPRESS_LEVEL_NONE; } + virtual TString GetEndpointId() const; virtual google::protobuf::Arena* GetArena(); diff --git a/ydb/library/grpc/server/grpc_request.h b/ydb/library/grpc/server/grpc_request.h index 67435f502984..f120615583f9 100644 --- a/ydb/library/grpc/server/grpc_request.h +++ b/ydb/library/grpc/server/grpc_request.h @@ -185,6 +185,10 @@ class TGRpcRequestImpl return TBaseAsyncContext::GetCompressionLevel(); } + TString GetEndpointId() const override { + return Server_->GetEndpointId(); + } + //! Get pointer to the request's message. const NProtoBuf::Message* GetRequest() const override { return Request_; diff --git a/ydb/library/grpc/server/grpc_request_base.h b/ydb/library/grpc/server/grpc_request_base.h index a43c1c41d618..0f1b877470e0 100644 --- a/ydb/library/grpc/server/grpc_request_base.h +++ b/ydb/library/grpc/server/grpc_request_base.h @@ -125,6 +125,8 @@ class IRequestContextBase: public TThrRefBase { //! Returns true if client was not interested in result (but we still must send response to make grpc happy) virtual bool IsClientLost() const = 0; + + virtual TString GetEndpointId() const = 0; }; } // namespace NYdbGrpc diff --git a/ydb/library/grpc/server/grpc_server.h b/ydb/library/grpc/server/grpc_server.h index 4a6c31c23781..3e97ef56abb5 100644 --- a/ydb/library/grpc/server/grpc_server.h +++ b/ydb/library/grpc/server/grpc_server.h @@ -108,6 +108,8 @@ struct TServerOptions { //! Logger which will be used to write logs about requests handling (iff appropriate log level is enabled). DECLARE_FIELD(Logger, TLoggerPtr, nullptr); + DECLARE_FIELD(EndpointId, TString, ""); + #undef DECLARE_FIELD }; @@ -203,6 +205,8 @@ class IGRpcService: public TThrRefBase { * service to inspect server options and initialize accordingly. */ virtual void SetServerOptions(const TServerOptions& options) = 0; + + virtual TString GetEndpointId() const = 0; }; class TGrpcServiceProtectiable: public IGRpcService { @@ -282,6 +286,11 @@ class TGrpcServiceProtectiable: public IGRpcService { void SetServerOptions(const TServerOptions& options) override { SslServer_ = bool(options.SslData); NeedAuth_ = options.UseAuth; + EndpointId_ = options.EndpointId; + } + + TString GetEndpointId() const override { + return EndpointId_; } //! Check if the server is going to shut down. @@ -303,6 +312,7 @@ class TGrpcServiceProtectiable: public IGRpcService { bool SslServer_ = false; bool NeedAuth_ = false; + TString EndpointId_; struct TShard { TAdaptiveLock Lock_; diff --git a/ydb/tests/functional/api/test_discovery.py b/ydb/tests/functional/api/test_discovery.py index 0988fc5e29d3..e4aec1011593 100644 --- a/ydb/tests/functional/api/test_discovery.py +++ b/ydb/tests/functional/api/test_discovery.py @@ -19,8 +19,10 @@ class TestDiscoveryExtEndpoint(object): @classmethod def setup_class(cls): conf = KikimrConfigGenerator() - cls.ext_port = conf.port_allocator.get_node_port_allocator(0).ext_port - conf.clone_grpc_as_ext_endpoint(cls.ext_port) + cls.ext_port_1 = conf.port_allocator.get_node_port_allocator(0).ext_port + cls.ext_port_2 = conf.port_allocator.get_node_port_allocator(1).ext_port + conf.clone_grpc_as_ext_endpoint(cls.ext_port_1, "extserv1") + conf.clone_grpc_as_ext_endpoint(cls.ext_port_2, "extserv2") cls.cluster = kikimr_cluster_factory( configurator=conf ) @@ -42,18 +44,68 @@ def teardown_class(cls): cls.cluster.stop() def test_scenario(self): - ext_port = TestDiscoveryExtEndpoint.ext_port + ext_port_1 = TestDiscoveryExtEndpoint.ext_port_1 + ext_port_2 = TestDiscoveryExtEndpoint.ext_port_2 driver_config = ydb.DriverConfig( - "%s:%s" % (self.cluster.nodes[1].host, ext_port), self.database_name) + "%s:%s" % (self.cluster.nodes[1].host, self.cluster.nodes[1].port), self.database_name) resolver = ydb.DiscoveryEndpointsResolver(driver_config) driver = ydb.Driver(driver_config) driver.wait(timeout=10) endpoint_ports = [endpoint.port for endpoint in resolver.resolve().endpoints] + # Discovery has been performed using default endpoint + # but ext endpoint marked with label + # such ext endpoint should not present in discovery + assert_that(ext_port_1 not in endpoint_ports) + assert_that(ext_port_2 not in endpoint_ports) for slot in self.cluster.slots.values(): assert_that(slot.grpc_port in endpoint_ports) - assert_that(slot.grpc_port != ext_port) + assert_that(slot.grpc_port != ext_port_1) + assert_that(slot.grpc_port != ext_port_2) + + driver_config = ydb.DriverConfig( + "%s:%s" % (self.cluster.nodes[1].host, ext_port_1), self.database_name) + resolver = ydb.DiscoveryEndpointsResolver(driver_config) + driver = ydb.Driver(driver_config) + driver.wait(timeout=10) + + endpoint_ports = [endpoint.port for endpoint in resolver.resolve().endpoints] + # Discovery has been performed using external endpoint with label + # only endpoint with such label expected + assert_that(ext_port_1 in endpoint_ports) + assert_that(ext_port_2 not in endpoint_ports) + + for slot in self.cluster.slots.values(): + assert_that(slot.grpc_port not in endpoint_ports) + + # Just check again to cover discovery cache issue + driver_config = ydb.DriverConfig( + "%s:%s" % (self.cluster.nodes[1].host, ext_port_1), self.database_name) + resolver = ydb.DiscoveryEndpointsResolver(driver_config) + driver = ydb.Driver(driver_config) + driver.wait(timeout=10) + + endpoint_ports = [endpoint.port for endpoint in resolver.resolve().endpoints] + assert_that(ext_port_1 in endpoint_ports) + assert_that(ext_port_2 not in endpoint_ports) + + for slot in self.cluster.slots.values(): + assert_that(slot.grpc_port not in endpoint_ports) + + # Repeat using other ext endpoint + driver_config = ydb.DriverConfig( + "%s:%s" % (self.cluster.nodes[1].host, ext_port_2), self.database_name) + resolver = ydb.DiscoveryEndpointsResolver(driver_config) + driver = ydb.Driver(driver_config) + driver.wait(timeout=10) + + endpoint_ports = [endpoint.port for endpoint in resolver.resolve().endpoints] + assert_that(ext_port_1 not in endpoint_ports) + assert_that(ext_port_2 in endpoint_ports) + + for slot in self.cluster.slots.values(): + assert_that(slot.grpc_port not in endpoint_ports) @six.add_metaclass(abc.ABCMeta) diff --git a/ydb/tests/library/harness/kikimr_config.py b/ydb/tests/library/harness/kikimr_config.py index e572e6388e2d..fd405ed58933 100644 --- a/ydb/tests/library/harness/kikimr_config.py +++ b/ydb/tests/library/harness/kikimr_config.py @@ -546,13 +546,16 @@ def write_proto_configs(self, configs_path): with open(os.path.join(configs_path, "config.yaml"), "w") as writer: writer.write(yaml.safe_dump(self.yaml_config)) - def clone_grpc_as_ext_endpoint(self, port): + def clone_grpc_as_ext_endpoint(self, port, endpoint_id=None): cur_grpc_config = copy.deepcopy(self.yaml_config['grpc_config']) if 'ext_endpoints' in cur_grpc_config: del cur_grpc_config['ext_endpoints'] cur_grpc_config['port'] = port + if endpoint_id is not None: + cur_grpc_config['endpoint_id'] = endpoint_id + if 'ext_endpoints' not in self.yaml_config['grpc_config']: self.yaml_config['grpc_config']['ext_endpoints'] = []