Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][venice-router] Add store d2 discovery metric #1408

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.ReferenceCountUtil;
import io.tehuti.metrics.MetricsRepository;
import io.tehuti.metrics.Sensor;
import io.tehuti.metrics.stats.Count;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.cert.CertificateExpiredException;
Expand Down Expand Up @@ -155,6 +158,7 @@ public class MetaDataHandler extends SimpleChannelInboundHandler<HttpRequest> {
static final String REQUEST_BLOB_DISCOVERY_ERROR_PUSH_STORE =
"Blob Discovery: failed to get the live node hostNames for store:%s version:%s partition:%s";
private final VeniceVersionFinder veniceVersionFinder;
private final MetricsRepository metricsRepository;

public MetaDataHandler(
HelixCustomizedViewOfflinePushRepository routingDataRepository,
Expand All @@ -169,7 +173,8 @@ public MetaDataHandler(
String kafkaBootstrapServers,
boolean isSslToKafka,
VeniceVersionFinder versionFinder,
PushStatusStoreReader pushStatusStoreReader) {
PushStatusStoreReader pushStatusStoreReader,
MetricsRepository metricsRepository) {
super();
this.routingDataRepository = routingDataRepository;
this.schemaRepo = schemaRepo;
Expand All @@ -184,6 +189,7 @@ public MetaDataHandler(
this.isSslToKafka = isSslToKafka;
this.veniceVersionFinder = versionFinder;
this.pushStatusStoreReader = pushStatusStoreReader;
this.metricsRepository = metricsRepository;
}

@Override
Expand Down Expand Up @@ -476,6 +482,16 @@ private void handleD2ServiceLookup(ChannelHandlerContext ctx, VenicePathParserHe
setupErrorD2DiscoveryResponseAndFlush(NOT_FOUND, errorMsg, ctx);
return;
}

// Only create metrics for valid stores
Sensor d2DiscoverySensor = metricsRepository.sensor("venice.router.d2_discovery." + storeName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's follow the existing patterns: create a new class that extends AbstractVeniceStats and use its sensor-registration methods, instead of calling metricsRepository.sensor directly and checking metricsRepository.metrics().containsKey. See existing usages of AbstractVeniceStats and AbstractVeniceAggStats for examples of store-level stats.

String d2DiscoveryRequestCountMetric = "venice.router.d2_discovery." + storeName + ".request.count";
// Check if metric already exists before adding
if (!metricsRepository.metrics().containsKey(d2DiscoveryRequestCountMetric)) {
d2DiscoverySensor.add(d2DiscoveryRequestCountMetric, new Count());
}
d2DiscoverySensor.record();

String clusterName = config.get().getCluster();
String d2Service = getD2ServiceByClusterName(clusterName);
if (StringUtils.isEmpty(d2Service)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,8 @@ public boolean startInner() throws Exception {
config.getKafkaBootstrapServers(),
config.isSslToKafka(),
versionFinder,
pushStatusStoreReader);
pushStatusStoreReader,
metricsRepository);

// Setup stat tracking for exceptional case
RouterExceptionAndTrackingUtils.setRouterStats(routerStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.tehuti.metrics.MetricsRepository;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand All @@ -98,6 +99,8 @@ public class TestMetaDataHandler {
private final HelixHybridStoreQuotaRepository hybridStoreQuotaRepository =
Mockito.mock(HelixHybridStoreQuotaRepository.class);

private MetricsRepository metricsRepository = new MetricsRepository();

public FullHttpResponse passRequestToMetadataHandler(
String requestUri,
HelixCustomizedViewOfflinePushRepository routingDataRepository,
Expand Down Expand Up @@ -191,7 +194,8 @@ public FullHttpResponse passRequestToMetadataHandler(
KAFKA_BOOTSTRAP_SERVERS,
false,
null,
pushStatusStoreReader);
pushStatusStoreReader,
metricsRepository);
handler.channelRead0(ctx, httpRequest);
ArgumentCaptor<Object> captor = ArgumentCaptor.forClass(Object.class);
Mockito.verify(ctx).writeAndFlush(captor.capture());
Expand Down Expand Up @@ -691,6 +695,10 @@ public void testD2ServiceLookup() throws IOException {
Assert.assertEquals(d2ServiceResponse.getName(), storeName);
Assert.assertFalse(d2ServiceResponse.isError());

Assert.assertEquals(
metricsRepository.getMetric("venice.router.d2_discovery." + storeName + ".request.count").value(),
1.0,
"Request count should be 1 after first request");
FullHttpResponse response2 = passRequestToMetadataHandler(
"http://myRouterHost:4567/discover_cluster?store_name=" + storeName,
null,
Expand All @@ -707,6 +715,11 @@ public void testD2ServiceLookup() throws IOException {
Assert.assertEquals(d2ServiceResponse2.getD2Service(), d2Service);
Assert.assertEquals(d2ServiceResponse2.getName(), storeName);
Assert.assertFalse(d2ServiceResponse2.isError());

Assert.assertEquals(
metricsRepository.getMetric("venice.router.d2_discovery." + storeName + ".request.count").value(),
2.0,
"Request count should be 2 after second request");
}

@Test
Expand Down Expand Up @@ -964,7 +977,8 @@ public void testStorageRequest() throws IOException {
KAFKA_BOOTSTRAP_SERVERS,
false,
null,
pushStatusStoreReader);
pushStatusStoreReader,
metricsRepository);
handler.channelRead0(ctx, httpRequest);
// '/storage' request should be handled by upstream, instead of current MetaDataHandler
Mockito.verify(ctx, Mockito.times(1)).fireChannelRead(Mockito.any());
Expand Down
Loading