Skip to content

Commit

Permalink
feat: add configuration for minimum number of channels (#136)
Browse files Browse the repository at this point in the history
* feat: add configuration for minimum number of channels

* add a test and regenerate protos
  • Loading branch information
igorbernstein2 authored Apr 5, 2022
1 parent a05c3dd commit 2140577
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 12 deletions.
2 changes: 2 additions & 0 deletions protos/grpc_gcp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ message ApiConfig {
message ChannelPoolConfig {
// The max number of channels in the pool.
uint32 max_size = 1;
// The min number of channels in the pool.
uint32 min_size = 4;
// The idle timeout (seconds) of channels without bound affinity sessions.
uint64 idle_timeout = 2;
// The low watermark of max number of concurrent streams in a channel.
Expand Down
43 changes: 31 additions & 12 deletions src/gcp_channel_factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export function getGcpChannelFactoryClass(
* A channel management factory that implements grpc.Channel APIs.
*/
return class GcpChannelFactory implements GcpChannelFactoryInterface {
private minSize: number;
private maxSize: number;
private maxConcurrentStreamsLowWatermark: number;
private options: {};
Expand Down Expand Up @@ -80,17 +81,23 @@ export function getGcpChannelFactoryClass(
'Channel options must be an object with string keys and integer or string values'
);
}
this.minSize = 1;
this.maxSize = 10;
this.maxConcurrentStreamsLowWatermark = 100;
const gcpApiConfig = options.gcpApiConfig;
if (gcpApiConfig) {
if (gcpApiConfig.channelPool) {
const channelPool = gcpApiConfig.channelPool;
if (channelPool.minSize) this.minSize = channelPool.minSize;
if (channelPool.maxSize) this.maxSize = channelPool.maxSize;
if (channelPool.maxConcurrentStreamsLowWatermark) {
this.maxConcurrentStreamsLowWatermark =
channelPool.maxConcurrentStreamsLowWatermark;
}

if (this.maxSize < this.minSize) {
throw new Error('Invalid channelPool config: minSize must <= maxSize')
}
}
this.initMethodToAffinityMap(gcpApiConfig);
}
Expand All @@ -99,8 +106,11 @@ export function getGcpChannelFactoryClass(
this.options = options;
this.target = address;
this.credentials = credentials;
// Initialize channel in the pool to avoid empty pool.
this.getChannelRef();

// Create initial channels
for (let i = 0; i < this.minSize; i++) {
this.addChannel();
}
}

getChannelzRef() {
Expand Down Expand Up @@ -154,21 +164,30 @@ export function getGcpChannelFactoryClass(
// If all existing channels are busy, and channel pool still has capacity,
// create a new channel in the pool.
if (size < this.maxSize) {
const channelOptions = Object.assign(
return this.addChannel();
} else {
return this.channelRefs[0];
}
}

/**
* Create a new channel and add it to the pool.
* @private
*/
private addChannel() : ChannelRef {
const size = this.channelRefs.length;
const channelOptions = Object.assign(
{[CLIENT_CHANNEL_ID]: size},
this.options
);
const grpcChannel = new grpc.Channel(
);
const grpcChannel = new grpc.Channel(
this.target,
this.credentials,
channelOptions
);
const channelRef = new ChannelRef(grpcChannel, size);
this.channelRefs.push(channelRef);
return channelRef;
} else {
return this.channelRefs[0];
}
);
const channelRef = new ChannelRef(grpcChannel, size);
this.channelRefs.push(channelRef);
return channelRef;
}

/**
Expand Down
6 changes: 6 additions & 0 deletions src/generated/grpc_gcp.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ export namespace grpc {
/** ChannelPoolConfig maxSize */
maxSize?: (number|null);

/** ChannelPoolConfig minSize */
minSize?: (number|null);

/** ChannelPoolConfig idleTimeout */
idleTimeout?: (number|Long|null);

Expand All @@ -126,6 +129,9 @@ export namespace grpc {
/** ChannelPoolConfig maxSize. */
public maxSize: number;

/** ChannelPoolConfig minSize. */
public minSize: number;

/** ChannelPoolConfig idleTimeout. */
public idleTimeout: (number|Long);

Expand Down
22 changes: 22 additions & 0 deletions src/generated/grpc_gcp.js
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ $root.grpc = (function() {
* @memberof grpc.gcp
* @interface IChannelPoolConfig
* @property {number|null} [maxSize] ChannelPoolConfig maxSize
* @property {number|null} [minSize] ChannelPoolConfig minSize
* @property {number|Long|null} [idleTimeout] ChannelPoolConfig idleTimeout
* @property {number|null} [maxConcurrentStreamsLowWatermark] ChannelPoolConfig maxConcurrentStreamsLowWatermark
*/
Expand Down Expand Up @@ -297,6 +298,14 @@ $root.grpc = (function() {
*/
ChannelPoolConfig.prototype.maxSize = 0;

/**
* ChannelPoolConfig minSize.
* @member {number} minSize
* @memberof grpc.gcp.ChannelPoolConfig
* @instance
*/
ChannelPoolConfig.prototype.minSize = 0;

/**
* ChannelPoolConfig idleTimeout.
* @member {number|Long} idleTimeout
Expand Down Expand Up @@ -343,6 +352,8 @@ $root.grpc = (function() {
writer.uint32(/* id 2, wireType 0 =*/16).uint64(message.idleTimeout);
if (message.maxConcurrentStreamsLowWatermark != null && Object.hasOwnProperty.call(message, "maxConcurrentStreamsLowWatermark"))
writer.uint32(/* id 3, wireType 0 =*/24).uint32(message.maxConcurrentStreamsLowWatermark);
if (message.minSize != null && Object.hasOwnProperty.call(message, "minSize"))
writer.uint32(/* id 4, wireType 0 =*/32).uint32(message.minSize);
return writer;
};

Expand Down Expand Up @@ -380,6 +391,9 @@ $root.grpc = (function() {
case 1:
message.maxSize = reader.uint32();
break;
case 4:
message.minSize = reader.uint32();
break;
case 2:
message.idleTimeout = reader.uint64();
break;
Expand Down Expand Up @@ -424,6 +438,9 @@ $root.grpc = (function() {
if (message.maxSize != null && message.hasOwnProperty("maxSize"))
if (!$util.isInteger(message.maxSize))
return "maxSize: integer expected";
if (message.minSize != null && message.hasOwnProperty("minSize"))
if (!$util.isInteger(message.minSize))
return "minSize: integer expected";
if (message.idleTimeout != null && message.hasOwnProperty("idleTimeout"))
if (!$util.isInteger(message.idleTimeout) && !(message.idleTimeout && $util.isInteger(message.idleTimeout.low) && $util.isInteger(message.idleTimeout.high)))
return "idleTimeout: integer|Long expected";
Expand All @@ -447,6 +464,8 @@ $root.grpc = (function() {
var message = new $root.grpc.gcp.ChannelPoolConfig();
if (object.maxSize != null)
message.maxSize = object.maxSize >>> 0;
if (object.minSize != null)
message.minSize = object.minSize >>> 0;
if (object.idleTimeout != null)
if ($util.Long)
(message.idleTimeout = $util.Long.fromValue(object.idleTimeout)).unsigned = true;
Expand Down Expand Up @@ -482,6 +501,7 @@ $root.grpc = (function() {
} else
object.idleTimeout = options.longs === String ? "0" : 0;
object.maxConcurrentStreamsLowWatermark = 0;
object.minSize = 0;
}
if (message.maxSize != null && message.hasOwnProperty("maxSize"))
object.maxSize = message.maxSize;
Expand All @@ -492,6 +512,8 @@ $root.grpc = (function() {
object.idleTimeout = options.longs === String ? $util.Long.prototype.toString.call(message.idleTimeout) : options.longs === Number ? new $util.LongBits(message.idleTimeout.low >>> 0, message.idleTimeout.high >>> 0).toNumber(true) : message.idleTimeout;
if (message.maxConcurrentStreamsLowWatermark != null && message.hasOwnProperty("maxConcurrentStreamsLowWatermark"))
object.maxConcurrentStreamsLowWatermark = message.maxConcurrentStreamsLowWatermark;
if (message.minSize != null && message.hasOwnProperty("minSize"))
object.minSize = message.minSize;
return object;
};

Expand Down
15 changes: 15 additions & 0 deletions test/unit/channel_factory_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,21 @@ for (const grpcLibName of ['grpc', '@grpc/grpc-js']) {
assert(channel instanceof grpcGcp.GcpChannelFactory);
});
});

it('should build min channels', () => {
const channel = new grpcGcp.GcpChannelFactory(
'hostname',
insecureCreds,
{
gcpApiConfig: grpcGcp.createGcpApiConfig({
"channelPool": {
"minSize": 3
}
})
}
);
assert.equal(channel.channelRefs.length, 3);
})
});
describe('close', () => {
let channel;
Expand Down

0 comments on commit 2140577

Please sign in to comment.