Skip to content

Commit

Permalink
Add option to disable tracing for individual commands #2373
Browse files Browse the repository at this point in the history
ObservationPredicate can now be used to filter unwanted commands.
  • Loading branch information
mp911de committed Oct 26, 2023
1 parent 9055a69 commit 8f43ee1
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 79 deletions.
1 change: 0 additions & 1 deletion src/main/java/io/lettuce/core/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,6 @@ private void attachTracing(ChannelHandlerContext ctx, RedisCommand<?, ?, ?> comm

if (traced != null) {
traced.setSpan(span);

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,40 +33,94 @@ public class LettuceObservationContext extends SenderContext<Object> {

private volatile Endpoint endpoint;

/**
* Create a new {@code LettuceObservationContext} given the {@code serviceName}.
*
* @param serviceName service name.
*/
public LettuceObservationContext(String serviceName) {

super((carrier, key, value) -> {
}, Kind.CLIENT);

setRemoteServiceName(serviceName);
}

/**
* Returns the required {@link RedisCommand} or throws {@link IllegalStateException} if no command is associated with the
* context. Use {@link #hasCommand()} to check if the command is available.
*
* @return the required {@link RedisCommand}.
* @throws IllegalStateException if no command is associated with the context.
*/
public RedisCommand<?, ?, ?> getRequiredCommand() {

RedisCommand<?, ?, ?> local = command;

if (local == null) {
throw new IllegalArgumentException("LettuceObservationContext is not associated with a Command");
throw new IllegalStateException("LettuceObservationContext is not associated with a Command");
}

return local;
}

/**
* Set the {@link RedisCommand}.
*
* @param command the traced command.
*/
public void setCommand(RedisCommand<?, ?, ?> command) {
this.command = command;
}

/**
* @return {@code true} if the command is available;{@code false} otherwise.
*/
public boolean hasCommand() {
return this.command != null;
}

/**
* Returns the required {@link Endpoint} or throws {@link IllegalStateException} if no endpoint is associated with the
* context.
*
* @return the required {@link Endpoint}.
* @throws IllegalStateException if no endpoint is associated with the context.
*/
public Endpoint getRequiredEndpoint() {

Endpoint local = endpoint;

if (local == null) {
throw new IllegalArgumentException("LettuceObservationContext is not associated with a Endpoint");
throw new IllegalStateException("LettuceObservationContext is not associated with a Endpoint");
}

return local;
}

/**
* Set the {@link Endpoint}.
*
* @param endpoint the traced endpoint.
*/
public void setEndpoint(Endpoint endpoint) {
this.endpoint = endpoint;
}

@Override
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append(getClass().getSimpleName());
sb.append(" [name=").append(getName());
sb.append(", contextualName=").append(getContextualName());
sb.append(", error=").append(getError());
sb.append(", lowCardinalityKeyValues=").append(getLowCardinalityKeyValues());
sb.append(", highCardinalityKeyValues=").append(getHighCardinalityKeyValues());
sb.append(", parentObservation=").append(getParentObservation());
sb.append(", command=").append(command);
sb.append(", endpoint=").append(endpoint);
sb.append(']');
return sb.toString();
}

}
103 changes: 32 additions & 71 deletions src/main/java/io/lettuce/core/tracing/MicrometerTracing.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import static io.lettuce.core.tracing.RedisObservation.*;

import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.protocol.CompleteableCommand;
Expand Down Expand Up @@ -137,7 +140,7 @@ public MicrometerTracer(ObservationRegistry observationRegistry) {

@Override
public Span nextSpan() {
return this.postProcessSpan(createObservation());
return new MicrometerSpan(serviceName, this::createObservation);
}

@Override
Expand All @@ -151,67 +154,16 @@ public Span nextSpan(TraceContext traceContext) {
return nextSpan();
}

return postProcessSpan(createObservation().parentObservation(micrometerTraceContext.getObservation()));
return new MicrometerSpan(serviceName,
context -> createObservation(context).parentObservation(micrometerTraceContext.getObservation()));
}

return nextSpan();
}

private Observation createObservation() {
return REDIS_COMMAND_OBSERVATION.observation(observationRegistry, () -> new LettuceObservationContext(serviceName));
}

private Span postProcessSpan(Observation observation) {

return observation != null && !observation.isNoop()
? new MicrometerSpan(observation.observationConvention(observationConvention))
: NoOpSpan.INSTANCE;
}

}

/**
* No-op {@link Span} implemementation.
*/
static class NoOpSpan extends Span {

static final NoOpSpan INSTANCE = new NoOpSpan();

public NoOpSpan() {
}

@Override
public Span start(RedisCommand<?, ?, ?> command) {
return this;
}

@Override
public Span name(String name) {
return this;
}

@Override
public Span annotate(String value) {
return this;
}

@Override
public Span tag(String key, String value) {
return this;
}

@Override
public Span error(Throwable throwable) {
return this;
}

@Override
public Span remoteEndpoint(Endpoint endpoint) {
return this;
}

@Override
public void finish() {
private Observation createObservation(LettuceObservationContext context) {
return REDIS_COMMAND_OBSERVATION.observation(observationRegistry, () -> context)
.observationConvention(observationConvention);
}

}
Expand All @@ -221,20 +173,27 @@ public void finish() {
*/
static class MicrometerSpan extends Span {

private final Observation observation;
private final LettuceObservationContext context;

private RedisCommand<?, ?, ?> command;
private final Function<LettuceObservationContext, Observation> observationFactory;

public MicrometerSpan(Observation observation) {
this.observation = observation;
private Map<String, String> highCardinalityKeyValue;

private Observation observation;

public MicrometerSpan(String serviceName, Function<LettuceObservationContext, Observation> observationFactory) {
this.context = new LettuceObservationContext(serviceName);
this.observationFactory = observationFactory;
}

@Override
public Span start(RedisCommand<?, ?, ?> command) {

((LettuceObservationContext) observation.getContext()).setCommand(command);

this.command = command;
this.context.setCommand(command);
this.observation = observationFactory.apply(context);
if (this.highCardinalityKeyValue != null) {
this.highCardinalityKeyValue.forEach(this.observation::highCardinalityKeyValue);
}

if (command instanceof CompleteableCommand<?>) {

Expand All @@ -246,7 +205,7 @@ public Span start(RedisCommand<?, ?, ?> command) {

String error = command.getOutput().getError();
if (error != null) {
observation.highCardinalityKeyValue(HighCardinalityCommandKeyNames.ERROR.withValue(error));
this.observation.highCardinalityKeyValue(HighCardinalityCommandKeyNames.ERROR.withValue(error));
} else if (throwable != null) {
error(throwable);
}
Expand All @@ -259,7 +218,7 @@ public Span start(RedisCommand<?, ?, ?> command) {
+ " must implement CompleteableCommand to attach Span completion to command completion");
}

observation.start();
this.observation.start();
return this;
}

Expand All @@ -275,26 +234,28 @@ public Span annotate(String annotation) {

@Override
public Span tag(String key, String value) {
observation.highCardinalityKeyValue(key, value);
if (this.highCardinalityKeyValue == null) {
this.highCardinalityKeyValue = new HashMap<>();
}
this.highCardinalityKeyValue.put(key, value);
return this;
}

@Override
public Span error(Throwable throwable) {
observation.error(throwable);
this.observation.error(throwable);
return this;
}

@Override
public Span remoteEndpoint(Endpoint endpoint) {

((LettuceObservationContext) observation.getContext()).setEndpoint(endpoint);
this.context.setEndpoint(endpoint);
return this;
}

@Override
public void finish() {
observation.stop();
this.observation.stop();
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,6 @@ void aclLog() {
assertThatThrownBy(() -> redis.auth("non-existing2", "foobar"));
assertThat(redis.aclLog()).hasSize(2).first().hasFieldOrProperty("reason");
assertThat(redis.aclLog(1)).hasSize(1);
assertThat(redis.aclLogReset()).isEqualTo("OK");
assertThat(redis.aclLog()).isEmpty();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@

import static org.assertj.core.api.Assertions.*;

import java.util.concurrent.LinkedBlockingQueue;

import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.protocol.CommandType;
import io.lettuce.core.protocol.RedisCommand;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.test.resource.FastShutdown;
import io.lettuce.test.settings.TestSettings;
Expand Down Expand Up @@ -53,8 +57,18 @@ protected ObservationRegistry createObservationRegistry() {
@Override
public SampleTestRunnerConsumer yourCode() {

LinkedBlockingQueue<RedisCommand<?, ?, ?>> commands = new LinkedBlockingQueue<>();
ObservationRegistry observationRegistry = createObservationRegistry();
observationRegistry.observationConfig().observationPredicate((s, context) -> {

if (context instanceof LettuceObservationContext) {
commands.add(((LettuceObservationContext) context).getRequiredCommand());
}

return true;
});
ClientResources clientResources = ClientResources.builder()
.tracing(new MicrometerTracing(createObservationRegistry(), "Redis", true)).build();
.tracing(new MicrometerTracing(observationRegistry, "Redis", true)).build();

return (tracer, meterRegistry) -> {

Expand All @@ -65,6 +79,8 @@ public SampleTestRunnerConsumer yourCode() {
connection.sync().ping();

connection.close();
FastShutdown.shutdown(redisClient);
FastShutdown.shutdown(clientResources);

assertThat(tracer.getFinishedSpans()).isNotEmpty();
System.out.println(((SimpleMeterRegistry) meterRegistry).getMetersAsString());
Expand All @@ -78,8 +94,7 @@ public SampleTestRunnerConsumer yourCode() {
assertThat(finishedSpan.getTags()).containsKeys("db.operation");
}

FastShutdown.shutdown(redisClient);
FastShutdown.shutdown(clientResources);
assertThat(commands).extracting(RedisCommand::getType).contains(CommandType.PING, CommandType.HELLO);
};
}

Expand Down

0 comments on commit 8f43ee1

Please sign in to comment.