Skip to content

Commit

Permalink
Merge pull request #1523 from Netflix/josh/servo
Browse files Browse the repository at this point in the history
replace servo with spectator in the client
  • Loading branch information
joshgord authored Jan 3, 2024
2 parents bae5c36 + ed4f0d8 commit c72e465
Show file tree
Hide file tree
Showing 22 changed files with 193 additions and 327 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ allprojects {
commonsConfigurationVersion = '1.10'
jsr305Version = '3.0.2'
guiceVersion = '4.1.0'
servoVersion = '0.12.21'
spectatorVersion = '1.7.3'
slf4jVersion = '1.7.36'
governatorVersion = '1.17.5'
archaiusVersion = '0.7.6'
jacksonVersion = '2.10.5'
Expand Down
3 changes: 3 additions & 0 deletions eureka-client-archaius2/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
apply plugin: 'nebula.test-jar'

def archaius2Version = '2.1.7'
def slf4jVersion = '1.7.36'

sourceSets {
test {
Expand All @@ -14,6 +15,8 @@ dependencies {
// archaius2
compile "com.netflix.archaius:archaius2-core:${archaius2Version}"
compile "com.netflix.archaius:archaius2-api:${archaius2Version}"
compile "org.slf4j:slf4j-api:${slf4jVersion}"
compile "com.netflix.spectator:spectator-api:${spectatorVersion}"

testCompile project(':eureka-test-utils')

Expand Down
2 changes: 2 additions & 0 deletions eureka-client-jersey2/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ dependencies {
compile project(':eureka-client')
compile 'org.glassfish.jersey.core:jersey-client:2.23.1'
compile 'org.glassfish.jersey.connectors:jersey-apache-connector:2.23.1'
compile "org.slf4j:slf4j-api:1.7.36"
compile 'com.netflix.spectator:spectator-api:1.7.3'

testCompile (project(':eureka-test-utils')) {
// exclude all transitives to avoid bringing in jersey1 eureka-core
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

import static com.netflix.discovery.util.DiscoveryBuildInfo.buildVersion;

import com.netflix.discovery.util.ServoUtil;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Spectator;
import com.netflix.spectator.api.Timer;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyStore;
Expand Down Expand Up @@ -36,12 +40,6 @@
import com.netflix.discovery.converters.wrappers.DecoderWrapper;
import com.netflix.discovery.converters.wrappers.EncoderWrapper;
import com.netflix.discovery.provider.DiscoveryJerseyProvider;
import com.netflix.servo.monitor.BasicCounter;
import com.netflix.servo.monitor.BasicTimer;
import com.netflix.servo.monitor.Counter;
import com.netflix.servo.monitor.MonitorConfig;
import com.netflix.servo.monitor.Monitors;
import com.netflix.servo.monitor.Stopwatch;

/**
* @author Tomasz Bak
Expand Down Expand Up @@ -325,24 +323,19 @@ private PoolingHttpClientConnectionManager createCustomSslCM() {
private class ConnectionCleanerTask implements Runnable {

private final int connectionIdleTimeout;
private final BasicTimer executionTimeStats;
private final Timer executionTimeStats;
private final Counter cleanupFailed;

private ConnectionCleanerTask(int connectionIdleTimeout) {
this.connectionIdleTimeout = connectionIdleTimeout;
MonitorConfig.Builder monitorConfigBuilder = MonitorConfig.builder("Eureka-Connection-Cleaner-Time");
executionTimeStats = new BasicTimer(monitorConfigBuilder.build());
cleanupFailed = new BasicCounter(MonitorConfig.builder("Eureka-Connection-Cleaner-Failure").build());
try {
Monitors.registerObject(this);
} catch (Exception e) {
s_logger.error("Unable to register with servo.", e);
}
final com.netflix.spectator.api.Registry registry = Spectator.globalRegistry();
executionTimeStats = registry.timer("Eureka-Connection-Cleaner-Time");
cleanupFailed = registry.counter("Eureka-Connection-Cleaner-Failure");
}

@Override
public void run() {
Stopwatch start = executionTimeStats.start();
long monotonicTime = ServoUtil.time(executionTimeStats);
try {
HttpClientConnectionManager cm = (HttpClientConnectionManager) apacheHttpClient
.getConfiguration()
Expand All @@ -352,11 +345,8 @@ public void run() {
s_logger.error("Cannot clean connections", e);
cleanupFailed.increment();
} finally {
if (null != start) {
start.stop();
}
ServoUtil.record(executionTimeStats, monotonicTime);
}

}
}
}
6 changes: 4 additions & 2 deletions eureka-client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ configurations.all {
}

dependencies {
compile "com.netflix.netflix-commons:netflix-eventbus:0.3.0"
compile 'com.thoughtworks.xstream:xstream:1.4.19'
compile "com.netflix.archaius:archaius-core:${archaiusVersion}"
compile 'javax.ws.rs:jsr311-api:1.1.1'
compile "com.netflix.servo:servo-core:${servoVersion}"
implementation "com.netflix.spectator:spectator-api:${spectatorVersion}"
implementation "org.slf4j:slf4j-api:${slf4jVersion}"
implementation "com.netflix.netflix-commons:netflix-eventbus:0.3.0"
implementation "javax.annotation:javax.annotation-api:1.2"
compile "com.sun.jersey:jersey-core:${jerseyVersion}"
compile "com.sun.jersey:jersey-client:${jerseyVersion}"
compile "com.sun.jersey.contribs:jersey-apache-client4:${jerseyVersion}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,14 @@

import static com.netflix.discovery.EurekaClientNames.METRIC_REGISTRATION_PREFIX;
import static com.netflix.discovery.EurekaClientNames.METRIC_REGISTRY_PREFIX;

import static com.netflix.spectator.api.Spectator.globalRegistry;

import com.netflix.discovery.util.ServoUtil;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Spectator;
import com.netflix.spectator.api.Timer;
import com.netflix.spectator.api.patterns.PolledMeter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -83,10 +90,6 @@
import com.netflix.discovery.shared.transport.jersey.Jersey1TransportClientFactories;
import com.netflix.discovery.shared.transport.jersey.TransportClientFactories;
import com.netflix.discovery.util.ThresholdLevelsMetric;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.monitor.Counter;
import com.netflix.servo.monitor.Monitors;
import com.netflix.servo.monitor.Stopwatch;

/**
* The class that is instrumental for interactions with <tt>Eureka Server</tt>.
Expand Down Expand Up @@ -130,11 +133,10 @@ public class DiscoveryClient implements EurekaClient {

// Timers
private static final String PREFIX = "DiscoveryClient_";
private final Counter RECONCILE_HASH_CODES_MISMATCH = Monitors.newCounter(PREFIX + "ReconcileHashCodeMismatch");
private final com.netflix.servo.monitor.Timer FETCH_REGISTRY_TIMER = Monitors
.newTimer(PREFIX + "FetchRegistry");
private final Counter REREGISTER_COUNTER = Monitors.newCounter(PREFIX
+ "Reregister");
private final Counter RECONCILE_HASH_CODES_MISMATCH = globalRegistry().counter(PREFIX + "ReconcileHashCodeMismatch");
private final Timer FETCH_REGISTRY_TIMER = globalRegistry().timer(PREFIX + "FetchRegistry");
private final Counter REREGISTER_COUNTER = globalRegistry().counter(PREFIX
+ "Reregister");

// instance variables
/**
Expand Down Expand Up @@ -178,7 +180,10 @@ public class DiscoveryClient implements EurekaClient {

private InstanceInfoReplicator instanceInfoReplicator;

private volatile int registrySize = 0;
private final AtomicInteger registrySize = PolledMeter
.using(Spectator.globalRegistry())
.withName(METRIC_REGISTRY_PREFIX + "localRegistrySize")
.monitorValue(new AtomicInteger());
private volatile long lastSuccessfulRegistryFetchTimestamp = -1;
private volatile long lastSuccessfulHeartbeatTimestamp = -1;
private final ThresholdLevelsMetric heartbeatStalenessMonitor;
Expand Down Expand Up @@ -362,12 +367,19 @@ public synchronized BackupRegistry get() {
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
final Registry registry = globalRegistry();
PolledMeter.using(registry)
.withName(METRIC_REGISTRY_PREFIX + "lastSuccessfulRegistryFetchTimePeriod")
.monitorValue(this, DiscoveryClient::getLastSuccessfulRegistryFetchTimePeriodInternal);

if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
PolledMeter.using(registry)
.withName(METRIC_REGISTRATION_PREFIX + "lastSuccessfulHeartbeatTimePeriod")
.monitorValue(this, DiscoveryClient::getLastSuccessfulHeartbeatTimePeriodInternal);

logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

Expand All @@ -386,7 +398,7 @@ public synchronized BackupRegistry get() {

initTimestampMs = System.currentTimeMillis();
initRegistrySize = this.getApplications().size();
registrySize = initRegistrySize;
registrySize.set(initRegistrySize);
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, initRegistrySize);

Expand Down Expand Up @@ -475,20 +487,14 @@ public synchronized BackupRegistry get() {
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
initScheduledTasks();

try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}

// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);

initTimestampMs = System.currentTimeMillis();
initRegistrySize = this.getApplications().size();
registrySize = initRegistrySize;
registrySize.set(initRegistrySize);
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, initRegistrySize);
}
Expand Down Expand Up @@ -952,11 +958,6 @@ public synchronized void shutdown() {
eurekaTransport.shutdown();
}

heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();

Monitors.unregisterObject(this);

logger.info("Completed shut down of DiscoveryClient");
}
}
Expand Down Expand Up @@ -990,7 +991,7 @@ void unregister() {
* @return true if the registry was fetched
*/
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
long monotonicTime = ServoUtil.time(FETCH_REGISTRY_TIMER);

try {
// If the delta is disabled or if it is the first time, get all
Expand Down Expand Up @@ -1022,9 +1023,7 @@ private boolean fetchRegistry(boolean forceFullRegistryFetch) {
appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e));
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
ServoUtil.record(FETCH_REGISTRY_TIMER, monotonicTime);
}

// Notify about cache refresh before updating the instance remote status
Expand Down Expand Up @@ -1530,7 +1529,7 @@ void refreshRegistry() {

boolean success = fetchRegistry(remoteRegionsModified);
if (success) {
registrySize = localRegionApps.get().size();
registrySize.set(localRegionApps.get().size());
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}

Expand Down Expand Up @@ -1730,8 +1729,6 @@ public long getLastSuccessfulRegistryFetchTimePeriod() {
: System.currentTimeMillis() - lastSuccessfulRegistryFetchTimestamp;
}

@com.netflix.servo.annotations.Monitor(name = METRIC_REGISTRATION_PREFIX + "lastSuccessfulHeartbeatTimePeriod",
description = "How much time has passed from last successful heartbeat", type = DataSourceType.GAUGE)
private long getLastSuccessfulHeartbeatTimePeriodInternal() {
final long delay = (!clientConfig.shouldRegisterWithEureka() || isShutdown.get())
? 0
Expand All @@ -1742,8 +1739,6 @@ private long getLastSuccessfulHeartbeatTimePeriodInternal() {
}

// for metrics only
@com.netflix.servo.annotations.Monitor(name = METRIC_REGISTRY_PREFIX + "lastSuccessfulRegistryFetchTimePeriod",
description = "How much time has passed from last successful local registry update", type = DataSourceType.GAUGE)
private long getLastSuccessfulRegistryFetchTimePeriodInternal() {
final long delay = (!clientConfig.shouldFetchRegistry() || isShutdown.get())
? 0
Expand All @@ -1753,13 +1748,6 @@ private long getLastSuccessfulRegistryFetchTimePeriodInternal() {
return delay;
}

@com.netflix.servo.annotations.Monitor(name = METRIC_REGISTRY_PREFIX + "localRegistrySize",
description = "Count of instances in the local registry", type = DataSourceType.GAUGE)
public int localRegistrySize() {
return registrySize;
}


private long computeStalenessMonitorDelay(long delay) {
if (delay < 0) {
return System.currentTimeMillis() - initTimestampMs;
Expand Down Expand Up @@ -1794,7 +1782,7 @@ public long initTimestampMs() {
}

public int localRegistrySize() {
return registrySize;
return registrySize.get();
}

public long lastSuccessfulRegistryFetchTimestampMs() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package com.netflix.discovery;

import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Spectator;
import com.netflix.spectator.api.patterns.PolledMeter;
import java.util.TimerTask;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
Expand All @@ -9,10 +13,6 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import com.netflix.servo.monitor.Counter;
import com.netflix.servo.monitor.LongGauge;
import com.netflix.servo.monitor.MonitorConfig;
import com.netflix.servo.monitor.Monitors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,7 +29,7 @@ public class TimedSupervisorTask extends TimerTask {
private final Counter timeoutCounter;
private final Counter rejectedCounter;
private final Counter throwableCounter;
private final LongGauge threadPoolLevelGauge;
private final AtomicLong threadPoolLevelGauge;

private final String name;
private final ScheduledExecutorService scheduler;
Expand All @@ -51,12 +51,12 @@ public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, Thre
this.maxDelay = timeoutMillis * expBackOffBound;

// Initialize the counters and register.
successCounter = Monitors.newCounter("success");
timeoutCounter = Monitors.newCounter("timeouts");
rejectedCounter = Monitors.newCounter("rejectedExecutions");
throwableCounter = Monitors.newCounter("throwables");
threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
Monitors.registerObject(name, this);
final Registry registry = Spectator.globalRegistry();
successCounter = registry.counter("success");
timeoutCounter = registry.counter("timeouts");
rejectedCounter = registry.counter("rejectedExecutions");
throwableCounter = registry.counter("throwables");
threadPoolLevelGauge = PolledMeter.using(registry).withName("threadPoolUsed").monitorValue(new AtomicLong());
}

@Override
Expand Down Expand Up @@ -106,7 +106,6 @@ public void run() {

@Override
public boolean cancel() {
Monitors.unregisterObject(name, this);
return super.cancel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.netflix.discovery.converters;

import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Spectator;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
Expand All @@ -35,8 +37,6 @@
import com.netflix.discovery.shared.Application;
import com.netflix.discovery.shared.Applications;
import com.netflix.discovery.util.StringCache;
import com.netflix.servo.monitor.Counter;
import com.netflix.servo.monitor.Monitors;
import com.thoughtworks.xstream.converters.Converter;
import com.thoughtworks.xstream.converters.MarshallingContext;
import com.thoughtworks.xstream.converters.UnmarshallingContext;
Expand Down Expand Up @@ -73,7 +73,7 @@ public final class Converters {

private static final Logger logger = LoggerFactory.getLogger(Converters.class);

private static final Counter UNMARSHALL_ERROR_COUNTER = Monitors.newCounter(UNMARSHAL_ERROR);
private static final Counter UNMARSHALL_ERROR_COUNTER = Spectator.globalRegistry().counter(UNMARSHAL_ERROR);

/**
* Serialize/deserialize {@link Applications} object types.
Expand Down
Loading

0 comments on commit c72e465

Please sign in to comment.