Skip to content

Commit

Permalink
le Plug-in contextual logging. With this change in place we now injec…
Browse files Browse the repository at this point in the history
…t restateServiceMethod and restateInvocationId in log4j2 context. These two variables can then be used in the log pattern, in the json appender, etc.
  • Loading branch information
slinkydeveloper committed Oct 30, 2023
1 parent 26105f1 commit 67096de
Show file tree
Hide file tree
Showing 16 changed files with 137 additions and 11 deletions.
2 changes: 1 addition & 1 deletion examples/http/src/main/resources/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
appender.testlogger.layout.pattern = %-4r %-5p [$${ctx:restateServiceMethod}][$${ctx:restateInvocationId}] [%t] %c - %m%n
3 changes: 3 additions & 0 deletions sdk-core-impl/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ dependencies {
implementation(coreLibs.grpc.protobuf)
implementation(coreLibs.log4j.api)

// We don't want a hard-dependency on it
compileOnly(coreLibs.log4j.core)

implementation(platform(coreLibs.opentelemetry.bom))
implementation(coreLibs.opentelemetry.api)
implementation(coreLibs.opentelemetry.semconv)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package dev.restate.sdk.core.impl;

import dev.restate.sdk.core.InvocationId;
import java.util.Map;
import org.apache.logging.log4j.core.util.ContextDataProvider;

/**
* Log4j2 ContextDataProvider inferring context from the Grpc context.
*
* <p>This is used to propagate the context to the user code, such that log statements from the user
* will contain the restate logging context variables.
*/
public class GrpcContextDataProvider implements ContextDataProvider {
@Override
public Map<String, String> supplyContextData() {
InvocationId invocationId = InvocationId.INVOCATION_ID_KEY.get();
String serviceMethod = RestateGrpcServer.SERVICE_METHOD.get();

// We use Map.of constructors to avoid allocating hashmaps
if (invocationId == null && serviceMethod == null) {
return Map.of();
} else if (invocationId == null) {
return Map.of(RestateGrpcServer.LoggingContextSetter.SERVICE_METHOD_KEY, serviceMethod);
} else {
return Map.of(
RestateGrpcServer.LoggingContextSetter.INVOCATION_ID_KEY,
invocationId.toString(),
RestateGrpcServer.LoggingContextSetter.SERVICE_METHOD_KEY,
serviceMethod);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
import javax.annotation.Nullable;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;

public class RestateGrpcServer {

private static final Logger LOG = LogManager.getLogger(RestateGrpcServer.class);
static final Context.Key<String> SERVICE_METHOD =
Context.key("restate.dev/logging_service_method");

private final Map<String, ServerServiceDefinition> services;
private final Serde serde;
Expand All @@ -46,6 +49,7 @@ public InvocationHandler resolve(
String serviceName,
String methodName,
io.opentelemetry.context.Context otelContext,
LoggingContextSetter loggingContextSetter,
@Nullable Executor syscallExecutor,
@Nullable Executor serverCallListenerExecutor)
throws ProtocolException {
Expand All @@ -54,9 +58,9 @@ public InvocationHandler resolve(
if (svc == null) {
throw ProtocolException.methodNotFound(serviceName, methodName);
}
String serviceMethodName = serviceName + "/" + methodName;
ServerMethodDefinition<MessageLite, MessageLite> method =
(ServerMethodDefinition<MessageLite, MessageLite>)
svc.getMethod(serviceName + "/" + methodName);
(ServerMethodDefinition<MessageLite, MessageLite>) svc.getMethod(serviceMethodName);
if (method == null) {
throw ProtocolException.methodNotFound(serviceName, methodName);
}
Expand All @@ -72,6 +76,9 @@ public InvocationHandler resolve(
.setAttribute(SemanticAttributes.RPC_METHOD, methodName)
.startSpan();

// Setup logging context
loggingContextSetter.setServiceMethod(serviceMethodName);

// Instantiate state machine, syscall and grpc bridge
InvocationStateMachine stateMachine = new InvocationStateMachine(serviceName, span);
SyscallsInternal syscalls =
Expand Down Expand Up @@ -100,10 +107,14 @@ public void start() {
LOG.debug("Start processing call to {}/{}", serviceName, methodName);
stateMachine.start(
invocationId -> {
// Set invocation id in logging context
loggingContextSetter.setInvocationId(invocationId.toString());

// Create the listener and create the decorators chain
ServerCall.Listener<MessageLite> grpcListener =
Contexts.interceptCall(
Context.current()
.withValue(SERVICE_METHOD, serviceMethodName)
.withValue(InvocationId.INVOCATION_ID_KEY, invocationId)
.withValue(Syscalls.SYSCALLS_KEY, syscalls),
bridge,
Expand Down Expand Up @@ -180,4 +191,34 @@ public RestateGrpcServer build() {
tracer);
}
}

/**
* Interface to abstract setting the logging context variables.
*
* <p>In classic multithreaded environments, you can just use {@link
* LoggingContextSetter#THREAD_LOCAL_INSTANCE}, though the caller of {@link RestateGrpcServer}
* must take care of the cleanup of the thread local map.
*/
public interface LoggingContextSetter {

String INVOCATION_ID_KEY = "restateInvocationId";
String SERVICE_METHOD_KEY = "restateServiceMethod";

LoggingContextSetter THREAD_LOCAL_INSTANCE =
new LoggingContextSetter() {
@Override
public void setServiceMethod(String serviceMethod) {
ThreadContext.put(INVOCATION_ID_KEY, serviceMethod);
}

@Override
public void setInvocationId(String id) {
ThreadContext.put(SERVICE_METHOD_KEY, id);
}
};

void setServiceMethod(String serviceMethod);

void setInvocationId(String id);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
dev.restate.sdk.core.impl.GrpcContextDataProvider
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.apache.logging.log4j.ThreadContext;

public final class MockMultiThreaded implements TestDefinitions.TestExecutor {

Expand Down Expand Up @@ -38,6 +39,7 @@ public void executeTest(TestDefinitions.TestDefinition definition) {
svc.getServiceDescriptor().getName(),
definition.getMethod(),
io.opentelemetry.context.Context.current(),
RestateGrpcServer.LoggingContextSetter.THREAD_LOCAL_INSTANCE,
syscallsExecutor,
userExecutor);

Expand Down Expand Up @@ -65,5 +67,8 @@ public void executeTest(TestDefinitions.TestDefinition definition) {
.succeedsWithin(Duration.ofSeconds(1))
.satisfies(definition.getOutputAssert());
assertThat(inputPublisher.isSubscriptionCancelled()).isTrue();

// Clean logging
ThreadContext.clearAll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import dev.restate.sdk.core.impl.TestDefinitions.TestExecutor;
import io.grpc.ServerServiceDefinition;
import java.time.Duration;
import org.apache.logging.log4j.ThreadContext;

public final class MockSingleThread implements TestExecutor {

Expand Down Expand Up @@ -35,6 +36,7 @@ public void executeTest(TestDefinition definition) {
svc.getServiceDescriptor().getName(),
definition.getMethod(),
io.opentelemetry.context.Context.current(),
RestateGrpcServer.LoggingContextSetter.THREAD_LOCAL_INSTANCE,
null,
null);

Expand All @@ -54,5 +56,8 @@ public void executeTest(TestDefinition definition) {
.succeedsWithin(Duration.ZERO)
.satisfies(definition.getOutputAssert());
assertThat(inputPublisher.isSubscriptionCancelled()).isTrue();

// Clean logging
ThreadContext.clearAll();
}
}
2 changes: 1 addition & 1 deletion sdk-core-impl/src/test/resources/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
appender.testlogger.layout.pattern = %-4r %-5p %notEmpty{[%X{restateServiceMethod}]}%notEmpty{[%X{restateInvocationId}]} [%t] %c - %m%n
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
public interface InvocationId {

/** gRPC {@link Context} key for invocation id. */
Context.Key<InvocationId> INVOCATION_ID_KEY = Context.key("restate.dev/service_invocation_id");
Context.Key<InvocationId> INVOCATION_ID_KEY = Context.key("restate.dev/invocation_id");

/**
* @return the current invocation id from the current gRPC {@link Context}.
Expand Down
1 change: 1 addition & 0 deletions sdk-http-vertx/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {
implementation(platform(coreLibs.opentelemetry.bom))
implementation(coreLibs.opentelemetry.api)
implementation(coreLibs.log4j.api)
implementation("io.reactiverse:reactiverse-contextual-logging:1.1.2")

testImplementation(project(":sdk-java-blocking"))
testImplementation(project(":sdk-kotlin"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.netty.util.AsciiString;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.reactiverse.contextual.logging.ContextualData;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
Expand Down Expand Up @@ -106,6 +107,18 @@ public void handle(HttpServerRequest request) {
serviceName,
methodName,
otelContext,
new RestateGrpcServer.LoggingContextSetter() {
@Override
public void setServiceMethod(String serviceMethod) {
ContextualData.put(
RestateGrpcServer.LoggingContextSetter.SERVICE_METHOD_KEY, serviceMethod);
}

@Override
public void setInvocationId(String id) {
ContextualData.put(RestateGrpcServer.LoggingContextSetter.INVOCATION_ID_KEY, id);
}
},
isBlockingService ? currentContextExecutor(vertxCurrentContext) : null,
isBlockingService ? blockingExecutor(serviceName) : null);
} catch (ProtocolException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.reactiverse.contextual.logging.VertxContextDataProvider
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@
import io.grpc.stub.StreamObserver;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class BlockingGreeterService extends GreeterGrpc.GreeterImplBase
implements RestateBlockingService {

private static final Logger LOG = LogManager.getLogger(BlockingGreeterService.class);
public static final StateKey<Long> COUNTER =
StateKey.of(
"counter",
Expand All @@ -24,6 +27,8 @@ public class BlockingGreeterService extends GreeterGrpc.GreeterImplBase
public void greet(GreetingRequest request, StreamObserver<GreetingResponse> responseObserver) {
// restateContext() is invoked everytime to make sure context propagation works!

LOG.info("Greet invoked!");

var count = restateContext().get(COUNTER).orElse(0L) + 1;
restateContext().set(COUNTER, count);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,16 @@ import dev.restate.sdk.core.impl.testservices.greetingResponse
import dev.restate.sdk.kotlin.RestateCoroutineService
import kotlin.coroutines.CoroutineContext
import kotlin.time.Duration.Companion.seconds
import org.apache.logging.log4j.LogManager

class GreeterKtService(coroutineContext: CoroutineContext) :
GreeterGrpcKt.GreeterCoroutineImplBase(coroutineContext), RestateCoroutineService {

private val LOG = LogManager.getLogger(GreeterKtService::class.java)

override suspend fun greet(request: GreetingRequest): GreetingResponse {
LOG.info("Greet invoked!")

val count = (restateContext().get(BlockingGreeterService.COUNTER) ?: 0) + 1
restateContext().set(BlockingGreeterService.COUNTER, count)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import javax.annotation.Nullable;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;

public final class LambdaRestateServer {

Expand Down Expand Up @@ -106,7 +107,14 @@ private APIGatewayProxyResponseEvent handleInvoke(APIGatewayProxyRequestEvent in
// Resolve handler
InvocationHandler handler;
try {
handler = this.restateGrpcServer.resolve(service, method, otelContext, null, null);
handler =
this.restateGrpcServer.resolve(
service,
method,
otelContext,
RestateGrpcServer.LoggingContextSetter.THREAD_LOCAL_INSTANCE,
null,
null);
} catch (ProtocolException e) {
LOG.warn("Error when resolving the grpc handler", e);
return new APIGatewayProxyResponseEvent()
Expand All @@ -127,6 +135,9 @@ private APIGatewayProxyResponseEvent handleInvoke(APIGatewayProxyRequestEvent in
// computation. Hence, we should have a result available at this point.
byte[] responseBody = subscriber.getResult();

// Clear logging
ThreadContext.clearAll();

final APIGatewayProxyResponseEvent response = new APIGatewayProxyResponseEvent();
response.setHeaders(INVOKE_RESPONSE_HEADERS);
response.setIsBase64Encoded(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@
import dev.restate.generated.sdk.java.Java;
import dev.restate.generated.service.discovery.Discovery;
import dev.restate.generated.service.protocol.Protocol;
import dev.restate.sdk.core.impl.InvocationFlow;
import dev.restate.sdk.core.impl.InvocationHandler;
import dev.restate.sdk.core.impl.MessageHeader;
import dev.restate.sdk.core.impl.RestateGrpcServer;
import dev.restate.sdk.core.impl.*;
import io.grpc.*;
import io.grpc.protobuf.ProtoMethodDescriptorSupplier;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -99,7 +96,13 @@ private void handle(

// Create invocation handler on the side of the service
InvocationHandler serviceInvocationStateMachineHandler =
server.resolve(serviceName, method, io.opentelemetry.context.Context.current(), null, null);
server.resolve(
serviceName,
method,
io.opentelemetry.context.Context.current(),
RestateGrpcServer.LoggingContextSetter.THREAD_LOCAL_INSTANCE,
null,
null);

// Get the key of the instance. Either value of key, random value (unkeyed service) or empty
// value (singleton).
Expand Down

0 comments on commit 67096de

Please sign in to comment.