Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NEWRELIC-4191 Properly link tokens so transaction is available to user code #1105

Merged
merged 3 commits into from
Dec 20, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion instrumentation/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ subprojects {
test {
// FIXME: these are all flaky on GHA so temporarily disabling
// It is likely they can be fixed with test containers
exclude '**/BasicRequestsTest.class'
//exclude '**/BasicRequestsTest.class'
exclude '**/BlazeClientTest'
exclude '**/CassandraTest.class'
exclude '**/DefaultDynamoDbClient_InstrumentationTest.class'
Expand Down
16 changes: 7 additions & 9 deletions instrumentation/grpc-1.40.0/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ buildscript {
mavenCentral()
}
dependencies {
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.13'
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.9.1'

}
}
apply plugin: 'com.google.protobuf'

def grpcVersion = '1.43.2'
def protobufVersion = '3.19.1'

dependencies {
implementation(project(":agent-bridge"))
implementation("io.grpc:grpc-all:1.40.1")
implementation("com.google.protobuf:protobuf-java:3.7.0")
implementation("io.grpc:grpc-protobuf:1.40.1")
implementation("io.grpc:grpc-all:${grpcVersion}")
implementation("com.google.protobuf:protobuf-java:${protobufVersion}")
implementation("io.grpc:grpc-protobuf:${grpcVersion}")
implementation("io.perfmark:perfmark-api:0.23.0")
}

Expand All @@ -26,13 +28,9 @@ verifyInstrumentation {
passesOnly 'io.grpc:grpc-all:[1.40.0,)'
}

def grpcVersion = '1.41.0' // CURRENT_GRPC_VERSION
def protobufVersion = '3.7.0'
def protocVersion = protobufVersion

// to generate the proto classes, run ./gradlew generateTestProto
protobuf {
protoc { artifact = "com.google.protobuf:protoc:${protocVersion}" }
protoc { artifact = "com.google.protobuf:protoc:${protobufVersion}" }
plugins {
grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
*
* * Copyright 2022 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.agent.instrumentation.grpc;

import com.newrelic.agent.bridge.Token;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Transaction;
import io.grpc.Metadata;
import io.grpc.Status;

public class GrpcUtil {

/**
* Finalize the transaction when a Stream closed or is cancelled by linking the supplied token,
* setting the appropriate headers and marking the transaction as sent.
*
* @param token The {@link Token} to expire
* @param status The {@link Status} of the completed/cancelled operation
* @param metadata Operation {@link Metadata} to be included with the transaction
*/
public static void finalizeTransaction(Token token, Status status, Metadata metadata) {
if (token != null) {
token.link();
Transaction transaction = NewRelic.getAgent().getTransaction();
transaction.setWebResponse(new GrpcResponse(status, metadata));
transaction.addOutboundResponseHeaders();
transaction.markResponseSent();
}
}

/**
* Set the response.status attribute and error cause (if applicable) on the transaction
* when the ServerStream is closed or cancelled.
*
* @param status The {@link Status} of the completed/cancelled operation
*/
public static void setServerStreamResponseStatus(Status status) {
if (status != null) {
NewRelic.addCustomParameter("response.status", status.getCode().value());
if (GrpcConfig.errorsEnabled && status.getCause() != null) {
// If an error occurred during the close of this server call we should record it
NewRelic.noticeError(status.getCause());
}
}
}

/**
* Expire the supplied {@link Token} instance if it's non-null.
*
* @param token The {@link Token} to expire
*/
public static void expireToken(Token token) {
if (token != null) {
token.expire();
token = null;
jtduffy marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ public abstract class ServerCallListener_Instrumentation {

@Trace(async = true)
public void onHalfClose() {
// onHalfClose gets executed right before we enter customer code. This helps ensure that they will have a transaction available on the thread
// onHalfClose gets executed right before we enter customer code.
// This helps ensure that they will have a transaction available on the thread
if (token != null) {
token.linkAndExpire();
token.link();
this.token = null;
}
Weaver.callOriginal();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,46 @@

package io.grpc.internal;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.agent.bridge.Token;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.weaver.NewField;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import io.grpc.CompressorRegistry;
import io.grpc.Context;
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCallListener_Instrumentation;
import io.perfmark.Tag;

@Weave(originalName = "io.grpc.internal.ServerCallImpl")
final class ServerCallImpl_Instrumentation {
final class ServerCallImpl_Instrumentation<ReqT, RespT> {

@NewField
Token token;

/**
* We use the constructor to capture the token created in the dispatcher transaction, which is
* available on the supplied stream variable. This is later used to assign the token
* to the listener when the newServerStreamListener method is called.
*/
ServerCallImpl_Instrumentation(ServerStream_Instrumentation stream, MethodDescriptor<ReqT, RespT> method,
Metadata inboundHeaders, Context.CancellableContext context,
DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry,
CallTracer serverCallTracer, Tag tag) {
this.token = stream.token;
}

@Trace(async = true)
ServerStreamListener newServerStreamListener(ServerCallListener_Instrumentation listener) {
// This is the point where a request comes into grpc
// Store a token on the listener so we can bring the transaction into customer code
listener.token = AgentBridge.getAgent().getTransaction().getToken();
listener.token = this.token;

if (token != null) {
token.link();
token = null;
}

return Weaver.callOriginal();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.newrelic.api.agent.weaver.Weaver;
import com.nr.agent.instrumentation.grpc.GrpcConfig;
import com.nr.agent.instrumentation.grpc.GrpcResponse;
import com.nr.agent.instrumentation.grpc.GrpcUtil;
import io.grpc.Metadata;
import io.grpc.Status;

Expand All @@ -28,56 +29,24 @@ public abstract class ServerStream_Instrumentation {
public Token token;

@Trace(async = true)
public void close(Status status, Metadata trailers) {
if (token != null) {
token.link();
Transaction transaction = NewRelic.getAgent().getTransaction();
transaction.setWebResponse(new GrpcResponse(status, trailers));
transaction.addOutboundResponseHeaders();
transaction.markResponseSent();
}

if (status != null) {
NewRelic.addCustomParameter("response.status", status.getCode().value());
if (GrpcConfig.errorsEnabled && status.getCause() != null) {
// If an error occurred during the close of this server call we should record it
NewRelic.noticeError(status.getCause());
}
}
public void close(Status status, Metadata metadata) {
GrpcUtil.finalizeTransaction(token, status, metadata);
GrpcUtil.setServerStreamResponseStatus(status);

Weaver.callOriginal();

if (token != null) {
token.expire();
token = null;
}
GrpcUtil.expireToken(token);
jtduffy marked this conversation as resolved.
Show resolved Hide resolved
}

// server had an internal error
@Trace(async = true)
public void cancel(Status status) {
if (token != null) {
token.link();
Transaction transaction = token.getTransaction();
transaction.setWebResponse(new GrpcResponse(status, new Metadata()));
transaction.addOutboundResponseHeaders();
transaction.markResponseSent();
}

if (status != null) {
NewRelic.addCustomParameter("response.status", status.getCode().value());
if (GrpcConfig.errorsEnabled && status.getCause() != null) {
// If an error occurred during the close of this server call we should record it
NewRelic.noticeError(status.getCause());
}
}
GrpcUtil.finalizeTransaction(token, status, new Metadata());
GrpcUtil.setServerStreamResponseStatus(status);

Weaver.callOriginal();

if (token != null) {
token.expire();
token = null;
}
GrpcUtil.expireToken(token);
jtduffy marked this conversation as resolved.
Show resolved Hide resolved
}

public abstract String getAuthority();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.newrelic.agent.introspec.Introspector;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;

Expand Down Expand Up @@ -76,6 +77,9 @@ public void testAsyncRequest() {
}

@Test
@Ignore
// Failing test -- related to another ticket for fixing a streaming request bug. Originally the entire
// test class was excluded in the parent gradle file.
public void testStreamingRequest() {
client.helloStreaming("Streaming");

Expand Down