Skip to content

Commit

Permalink
feat: add ability to periodically request debug headers (#139)
Browse files Browse the repository at this point in the history
* feat: add ability to periodically request debug headers
  • Loading branch information
igorbernstein2 authored Apr 6, 2022
1 parent 2140577 commit e551d92
Show file tree
Hide file tree
Showing 8 changed files with 181 additions and 6 deletions.
8 changes: 8 additions & 0 deletions protos/grpc_gcp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ message ChannelPoolConfig {
// New channel will be created once it get hit, until we reach the max size
// of the channel pool.
uint32 max_concurrent_streams_low_watermark = 3;

// This will request debug response headers from the load balancer.
// The headers are meant to help diagnose issues when connecting to GCP
// services. The headers are primarily useful to support engineers that will
// be able to decrypt them. The headers have a fairly large payload (~1kib),
// so will be requested at most once per this period. A negative number will
// request the headers for every request, 0 will never request headers.
uint32 debug_header_interval_secs = 5;
}

message MethodConfig {
Expand Down
27 changes: 27 additions & 0 deletions src/channel_ref.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ export class ChannelRef {
private readonly channelId: number;
private affinityCount: number;
private activeStreamsCount: number;
private debugHeadersRequestedAt: Date | null;
private shouldForceDebugHeadersOnNextRequest: boolean;
private closed: boolean;

/**
* @param channel The underlying grpc channel.
Expand All @@ -44,6 +47,18 @@ export class ChannelRef {
this.channelId = channelId;
this.affinityCount = affinityCount ? affinityCount : 0;
this.activeStreamsCount = activeStreamsCount ? activeStreamsCount : 0;
this.debugHeadersRequestedAt = null;
this.shouldForceDebugHeadersOnNextRequest = false;
this.closed = false;
}

close() {
this.closed = true;
this.channel.close();
}

isClosed() {
return this.closed;
}

affinityCountIncr() {
Expand All @@ -70,6 +85,18 @@ export class ChannelRef {
return this.activeStreamsCount;
}

forceDebugHeadersOnNextRequest() {
this.shouldForceDebugHeadersOnNextRequest = true;
}
notifyDebugHeadersRequested() {
this.debugHeadersRequestedAt = new Date();
this.shouldForceDebugHeadersOnNextRequest = false;
}

getDebugHeadersRequestedAt(): Date | null {
return this.debugHeadersRequestedAt;
}

getChannel() {
return this.channel;
}
Expand Down
41 changes: 39 additions & 2 deletions src/gcp_channel_factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {promisify} from 'util';

import {ChannelRef} from './channel_ref';
import * as protoRoot from './generated/grpc_gcp';

import {connectivityState} from '@grpc/grpc-js';
import ApiConfig = protoRoot.grpc.gcp.ApiConfig;
import IAffinityConfig = protoRoot.grpc.gcp.IAffinityConfig;

Expand All @@ -34,6 +34,8 @@ export interface GcpChannelFactoryInterface extends grpcType.ChannelInterface {
getAffinityConfig(methodName: string): IAffinityConfig;
bind(channelRef: ChannelRef, affinityKey: string): void;
unbind(boundKey?: string): void;
shouldRequestDebugHeaders(lastRequested: Date | null) : boolean;

}

export interface GcpChannelFactoryConstructor {
Expand Down Expand Up @@ -61,6 +63,7 @@ export function getGcpChannelFactoryClass(
private channelRefs: ChannelRef[] = [];
private target: string;
private credentials: grpcType.ChannelCredentials;
private debugHeaderIntervalSecs: number;

/**
* @param address The address of the server to connect to.
Expand All @@ -84,6 +87,7 @@ export function getGcpChannelFactoryClass(
this.minSize = 1;
this.maxSize = 10;
this.maxConcurrentStreamsLowWatermark = 100;
this.debugHeaderIntervalSecs = 0;
const gcpApiConfig = options.gcpApiConfig;
if (gcpApiConfig) {
if (gcpApiConfig.channelPool) {
Expand All @@ -98,6 +102,7 @@ export function getGcpChannelFactoryClass(
if (this.maxSize < this.minSize) {
throw new Error('Invalid channelPool config: minSize must <= maxSize')
}
this.debugHeaderIntervalSecs = channelPool.debugHeaderIntervalSecs || 0;
}
this.initMethodToAffinityMap(gcpApiConfig);
}
Expand Down Expand Up @@ -187,9 +192,33 @@ export function getGcpChannelFactoryClass(
);
const channelRef = new ChannelRef(grpcChannel, size);
this.channelRefs.push(channelRef);

if (this.debugHeaderIntervalSecs) {
this.setupDebugHeadersOnChannelTransition(channelRef);
}

return channelRef;
}

private setupDebugHeadersOnChannelTransition(channel: ChannelRef) {
const self = this;

if (channel.isClosed()) {
return;
}

let currentState = channel.getChannel().getConnectivityState(false);
if (currentState == connectivityState.SHUTDOWN) {
return;
}

channel.getChannel().watchConnectivityState(currentState, Infinity, (e) => {
channel.forceDebugHeadersOnNextRequest();
self.setupDebugHeadersOnChannelTransition(channel);
});
}


/**
* Get AffinityConfig associated with a certain method.
* @param methodName Method name of the request.
Expand All @@ -198,6 +227,14 @@ export function getGcpChannelFactoryClass(
return this.methodToAffinity[methodName];
}

shouldRequestDebugHeaders(lastRequested: Date | null) : boolean {
if (this.debugHeaderIntervalSecs < 0) return true;
else if (this.debugHeaderIntervalSecs == 0) return false;
else if (!lastRequested) return true;

return new Date().getTime() - lastRequested.getTime() > this.debugHeaderIntervalSecs * 1000;
}

/**
* Bind channel with affinity key.
* @param channelRef ChannelRef instance that contains the grpc channel.
Expand Down Expand Up @@ -232,7 +269,7 @@ export function getGcpChannelFactoryClass(
*/
close(): void {
this.channelRefs.forEach(ref => {
ref.getChannel().close();
ref.close();
});
}

Expand Down
6 changes: 6 additions & 0 deletions src/generated/grpc_gcp.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ export namespace grpc {

/** ChannelPoolConfig maxConcurrentStreamsLowWatermark */
maxConcurrentStreamsLowWatermark?: (number|null);

/** ChannelPoolConfig debugHeaderIntervalSecs */
debugHeaderIntervalSecs?: (number|null);
}

/** Represents a ChannelPoolConfig. */
Expand All @@ -138,6 +141,9 @@ export namespace grpc {
/** ChannelPoolConfig maxConcurrentStreamsLowWatermark. */
public maxConcurrentStreamsLowWatermark: number;

/** ChannelPoolConfig debugHeaderIntervalSecs. */
public debugHeaderIntervalSecs: number;

/**
* Creates a new ChannelPoolConfig instance using the specified properties.
* @param [properties] Properties to set
Expand Down
22 changes: 22 additions & 0 deletions src/generated/grpc_gcp.js
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ $root.grpc = (function() {
* @property {number|null} [minSize] ChannelPoolConfig minSize
* @property {number|Long|null} [idleTimeout] ChannelPoolConfig idleTimeout
* @property {number|null} [maxConcurrentStreamsLowWatermark] ChannelPoolConfig maxConcurrentStreamsLowWatermark
* @property {number|null} [debugHeaderIntervalSecs] ChannelPoolConfig debugHeaderIntervalSecs
*/

/**
Expand Down Expand Up @@ -322,6 +323,14 @@ $root.grpc = (function() {
*/
ChannelPoolConfig.prototype.maxConcurrentStreamsLowWatermark = 0;

/**
* ChannelPoolConfig debugHeaderIntervalSecs.
* @member {number} debugHeaderIntervalSecs
* @memberof grpc.gcp.ChannelPoolConfig
* @instance
*/
ChannelPoolConfig.prototype.debugHeaderIntervalSecs = 0;

/**
* Creates a new ChannelPoolConfig instance using the specified properties.
* @function create
Expand Down Expand Up @@ -354,6 +363,8 @@ $root.grpc = (function() {
writer.uint32(/* id 3, wireType 0 =*/24).uint32(message.maxConcurrentStreamsLowWatermark);
if (message.minSize != null && Object.hasOwnProperty.call(message, "minSize"))
writer.uint32(/* id 4, wireType 0 =*/32).uint32(message.minSize);
if (message.debugHeaderIntervalSecs != null && Object.hasOwnProperty.call(message, "debugHeaderIntervalSecs"))
writer.uint32(/* id 5, wireType 0 =*/40).uint32(message.debugHeaderIntervalSecs);
return writer;
};

Expand Down Expand Up @@ -400,6 +411,9 @@ $root.grpc = (function() {
case 3:
message.maxConcurrentStreamsLowWatermark = reader.uint32();
break;
case 5:
message.debugHeaderIntervalSecs = reader.uint32();
break;
default:
reader.skipType(tag & 7);
break;
Expand Down Expand Up @@ -447,6 +461,9 @@ $root.grpc = (function() {
if (message.maxConcurrentStreamsLowWatermark != null && message.hasOwnProperty("maxConcurrentStreamsLowWatermark"))
if (!$util.isInteger(message.maxConcurrentStreamsLowWatermark))
return "maxConcurrentStreamsLowWatermark: integer expected";
if (message.debugHeaderIntervalSecs != null && message.hasOwnProperty("debugHeaderIntervalSecs"))
if (!$util.isInteger(message.debugHeaderIntervalSecs))
return "debugHeaderIntervalSecs: integer expected";
return null;
};

Expand Down Expand Up @@ -477,6 +494,8 @@ $root.grpc = (function() {
message.idleTimeout = new $util.LongBits(object.idleTimeout.low >>> 0, object.idleTimeout.high >>> 0).toNumber(true);
if (object.maxConcurrentStreamsLowWatermark != null)
message.maxConcurrentStreamsLowWatermark = object.maxConcurrentStreamsLowWatermark >>> 0;
if (object.debugHeaderIntervalSecs != null)
message.debugHeaderIntervalSecs = object.debugHeaderIntervalSecs >>> 0;
return message;
};

Expand All @@ -502,6 +521,7 @@ $root.grpc = (function() {
object.idleTimeout = options.longs === String ? "0" : 0;
object.maxConcurrentStreamsLowWatermark = 0;
object.minSize = 0;
object.debugHeaderIntervalSecs = 0;
}
if (message.maxSize != null && message.hasOwnProperty("maxSize"))
object.maxSize = message.maxSize;
Expand All @@ -514,6 +534,8 @@ $root.grpc = (function() {
object.maxConcurrentStreamsLowWatermark = message.maxConcurrentStreamsLowWatermark;
if (message.minSize != null && message.hasOwnProperty("minSize"))
object.minSize = message.minSize;
if (message.debugHeaderIntervalSecs != null && message.hasOwnProperty("debugHeaderIntervalSecs"))
object.debugHeaderIntervalSecs = message.debugHeaderIntervalSecs;
return object;
};

Expand Down
10 changes: 8 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,13 @@ export = (grpc: GrpcModule) => {
function gcpCallInvocationTransformer<RequestType, ResponseType>(
callProperties: grpcType.CallProperties<RequestType, ResponseType>
): grpcType.CallProperties<RequestType, ResponseType> {
const channelFactory = callProperties.channel;
if (!channelFactory || !(channelFactory instanceof GcpChannelFactory)) {
if (!callProperties.channel || !(callProperties.channel instanceof GcpChannelFactory)) {
// The gcpCallInvocationTransformer needs to use gcp channel factory.
return callProperties;
}

const channelFactory = callProperties.channel as GcpChannelFactoryInterface;

const argument = callProperties.argument;
const metadata = callProperties.metadata;
const call = callProperties.call;
Expand Down Expand Up @@ -152,6 +153,11 @@ export = (grpc: GrpcModule) => {
: [];
newCallOptions.interceptors = interceptors.concat([postProcessInterceptor]);

if (channelFactory.shouldRequestDebugHeaders(channelRef.getDebugHeadersRequestedAt())) {
metadata.set('x-return-encrypted-headers', 'all_response');
channelRef.notifyDebugHeadersRequested();
}

return {
argument,
metadata,
Expand Down
69 changes: 69 additions & 0 deletions test/integration/local_service_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
const protoLoader = require('@grpc/proto-loader');
const assert = require('assert');
const getGrpcGcpObjects = require('../../build/src');
const { promisify } = require('util')

const PROTO_PATH = __dirname + '/../../protos/test_service.proto';
const packageDef = protoLoader.loadSync(PROTO_PATH);
Expand Down Expand Up @@ -129,6 +130,74 @@ for (const grpcLibName of ['grpc', '@grpc/grpc-js']) {
});
});

// describe('Debug headers', () => {
// let client;
// beforeEach(() => {
// const channelOptions = {
// channelFactoryOverride: grpcGcp.gcpChannelFactoryOverride,
// callInvocationTransformer: grpcGcp.gcpCallInvocationTransformer,
// gcpApiConfig: grpcGcp.createGcpApiConfig({
// channelPool: {
// maxSize: 1,
// maxConcurrentStreamsLowWatermark: 1,
// debugHeaderIntervalSecs: 1
// },
// }),
// };

// client = new Client(
// 'localhost:' + port,
// grpc.credentials.createInsecure(),
// channelOptions
// );
// });
// afterEach(() => {
// client.close();
// });
// it('with unary call', async () => {
// function makeCallAndReturnMeta() {
// return new Promise((resolve, reject) => {
// let lastMeta = null;

// const call = client.unary({}, new grpc.Metadata(), (err, data) => {
// if (err) reject(err);
// else resolve(lastMeta);
// });

// call.on('metadata', meta => lastMeta = meta);
// });
// }
// let m1 = await makeCallAndReturnMeta();
// assert.deepStrictEqual(m1.get('x-return-encrypted-headers'), ['all_response']);

// let m2 = await makeCallAndReturnMeta();
// assert.deepStrictEqual(m2.get('x-return-encrypted-headers'), []);

// await promisify(setTimeout)(1100);

// let m3 = await makeCallAndReturnMeta();
// assert.deepStrictEqual(m3.get('x-return-encrypted-headers'), ['all_response']);
// });
// it('with server streaming call', async () => {
// function makeCallAndReturnMeta() {
// return new Promise((resolve, reject) => {
// const call = client.serverStream({}, new grpc.Metadata());
// call.on('metadata', meta => resolve(meta));
// call.on('data', (d) => {})
// });
// }
// let m1 = await makeCallAndReturnMeta();
// assert.deepStrictEqual(m1.get('x-return-encrypted-headers'), ['all_response']);

// let m2 = await makeCallAndReturnMeta();
// assert.deepStrictEqual(m2.get('x-return-encrypted-headers'), []);

// await promisify(setTimeout)(1100);

// let m3 = await makeCallAndReturnMeta();
// assert.deepStrictEqual(m3.get('x-return-encrypted-headers'), ['all_response']);
// });
// });
describe('Echo metadata', () => {
let metadata;
let client;
Expand Down
4 changes: 2 additions & 2 deletions test/integration/spanner_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ const grpcLibName = process.argv[argIndex + 1];

describe('Using ' + grpcLibName, () => {
before(async function () {
this.timeout(60000);
this.timeout(120000);
// Create test instance.
console.log(`Creating instance ${instance.formattedName_}.`);
const [, instOp] = await instance.create({
Expand Down Expand Up @@ -101,7 +101,7 @@ describe('Using ' + grpcLibName, () => {
});

after(async function () {
this.timeout(60000);
this.timeout(120000);
// Delete test instance.
console.log(`Deleting instance ${instance.id}...`);
await instance.delete();
Expand Down

0 comments on commit e551d92

Please sign in to comment.