Skip to content

Commit

Permalink
Merge pull request #2792 from murgatroid99/grpc-js_1.10.11_upmerge
Browse files Browse the repository at this point in the history
grpc-js: Merge 1.10.x branch into master
  • Loading branch information
murgatroid99 authored Jul 12, 2024
2 parents f867643 + 9ea4187 commit b35896b
Show file tree
Hide file tree
Showing 15 changed files with 603 additions and 314 deletions.
2 changes: 1 addition & 1 deletion packages/grpc-js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.10.8",
"version": "1.10.11",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
Expand Down
9 changes: 5 additions & 4 deletions packages/grpc-js/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ export class Client {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
onReceiveMessage(message: any) {
if (responseMessage !== null) {
call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
call.cancelWithStatus(Status.UNIMPLEMENTED, 'Too many responses received');
}
responseMessage = message;
},
Expand All @@ -345,7 +345,7 @@ export class Client {
callProperties.callback!(
callErrorFromStatus(
{
code: Status.INTERNAL,
code: Status.UNIMPLEMENTED,
details: 'No message received',
metadata: status.metadata,
},
Expand Down Expand Up @@ -463,9 +463,10 @@ export class Client {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
onReceiveMessage(message: any) {
if (responseMessage !== null) {
call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
call.cancelWithStatus(Status.UNIMPLEMENTED, 'Too many responses received');
}
responseMessage = message;
call.startRead();
},
onReceiveStatus(status: StatusObject) {
if (receivedStatus) {
Expand All @@ -478,7 +479,7 @@ export class Client {
callProperties.callback!(
callErrorFromStatus(
{
code: Status.INTERNAL,
code: Status.UNIMPLEMENTED,
details: 'No message received',
metadata: status.metadata,
},
Expand Down
75 changes: 59 additions & 16 deletions packages/grpc-js/src/compression-filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { WriteObject, WriteFlags } from './call-interface';
import { Channel } from './channel';
import { ChannelOptions } from './channel-options';
import { CompressionAlgorithms } from './compression-algorithms';
import { LogVerbosity } from './constants';
import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, DEFAULT_MAX_SEND_MESSAGE_LENGTH, LogVerbosity, Status } from './constants';
import { BaseFilter, Filter, FilterFactory } from './filter';
import * as logging from './logging';
import { Metadata, MetadataValue } from './metadata';
Expand Down Expand Up @@ -98,6 +98,10 @@ class IdentityHandler extends CompressionHandler {
}

class DeflateHandler extends CompressionHandler {
constructor(private maxRecvMessageLength: number) {
super();
}

compressMessage(message: Buffer) {
return new Promise<Buffer>((resolve, reject) => {
zlib.deflate(message, (err, output) => {
Expand All @@ -112,18 +116,34 @@ class DeflateHandler extends CompressionHandler {

decompressMessage(message: Buffer) {
return new Promise<Buffer>((resolve, reject) => {
zlib.inflate(message, (err, output) => {
if (err) {
reject(err);
} else {
resolve(output);
let totalLength = 0;
const messageParts: Buffer[] = [];
const decompresser = zlib.createInflate();
decompresser.on('data', (chunk: Buffer) => {
messageParts.push(chunk);
totalLength += chunk.byteLength;
if (this.maxRecvMessageLength !== -1 && totalLength > this.maxRecvMessageLength) {
decompresser.destroy();
reject({
code: Status.RESOURCE_EXHAUSTED,
details: `Received message that decompresses to a size larger than ${this.maxRecvMessageLength}`
});
}
});
decompresser.on('end', () => {
resolve(Buffer.concat(messageParts));
});
decompresser.write(message);
decompresser.end();
});
}
}

class GzipHandler extends CompressionHandler {
constructor(private maxRecvMessageLength: number) {
super();
}

compressMessage(message: Buffer) {
return new Promise<Buffer>((resolve, reject) => {
zlib.gzip(message, (err, output) => {
Expand All @@ -138,13 +158,25 @@ class GzipHandler extends CompressionHandler {

decompressMessage(message: Buffer) {
return new Promise<Buffer>((resolve, reject) => {
zlib.unzip(message, (err, output) => {
if (err) {
reject(err);
} else {
resolve(output);
let totalLength = 0;
const messageParts: Buffer[] = [];
const decompresser = zlib.createGunzip();
decompresser.on('data', (chunk: Buffer) => {
messageParts.push(chunk);
totalLength += chunk.byteLength;
if (this.maxRecvMessageLength !== -1 && totalLength > this.maxRecvMessageLength) {
decompresser.destroy();
reject({
code: Status.RESOURCE_EXHAUSTED,
details: `Received message that decompresses to a size larger than ${this.maxRecvMessageLength}`
});
}
});
decompresser.on('end', () => {
resolve(Buffer.concat(messageParts));
});
decompresser.write(message);
decompresser.end();
});
}
}
Expand All @@ -169,14 +201,14 @@ class UnknownHandler extends CompressionHandler {
}
}

function getCompressionHandler(compressionName: string): CompressionHandler {
function getCompressionHandler(compressionName: string, maxReceiveMessageSize: number): CompressionHandler {
switch (compressionName) {
case 'identity':
return new IdentityHandler();
case 'deflate':
return new DeflateHandler();
return new DeflateHandler(maxReceiveMessageSize);
case 'gzip':
return new GzipHandler();
return new GzipHandler(maxReceiveMessageSize);
default:
return new UnknownHandler(compressionName);
}
Expand All @@ -186,6 +218,8 @@ export class CompressionFilter extends BaseFilter implements Filter {
private sendCompression: CompressionHandler = new IdentityHandler();
private receiveCompression: CompressionHandler = new IdentityHandler();
private currentCompressionAlgorithm: CompressionAlgorithm = 'identity';
private maxReceiveMessageLength: number;
private maxSendMessageLength: number;

constructor(
channelOptions: ChannelOptions,
Expand All @@ -195,6 +229,8 @@ export class CompressionFilter extends BaseFilter implements Filter {

const compressionAlgorithmKey =
channelOptions['grpc.default_compression_algorithm'];
this.maxReceiveMessageLength = channelOptions['grpc.max_receive_message_length'] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;
this.maxSendMessageLength = channelOptions['grpc.max_send_message_length'] ?? DEFAULT_MAX_SEND_MESSAGE_LENGTH;
if (compressionAlgorithmKey !== undefined) {
if (isCompressionAlgorithmKey(compressionAlgorithmKey)) {
const clientSelectedEncoding = CompressionAlgorithms[
Expand All @@ -215,7 +251,8 @@ export class CompressionFilter extends BaseFilter implements Filter {
) {
this.currentCompressionAlgorithm = clientSelectedEncoding;
this.sendCompression = getCompressionHandler(
this.currentCompressionAlgorithm
this.currentCompressionAlgorithm,
-1
);
}
} else {
Expand Down Expand Up @@ -247,7 +284,7 @@ export class CompressionFilter extends BaseFilter implements Filter {
if (receiveEncoding.length > 0) {
const encoding: MetadataValue = receiveEncoding[0];
if (typeof encoding === 'string') {
this.receiveCompression = getCompressionHandler(encoding);
this.receiveCompression = getCompressionHandler(encoding, this.maxReceiveMessageLength);
}
}
metadata.remove('grpc-encoding');
Expand Down Expand Up @@ -279,6 +316,12 @@ export class CompressionFilter extends BaseFilter implements Filter {
* and the output is a framed and possibly compressed message. For this
* reason, this filter should be at the bottom of the filter stack */
const resolvedMessage: WriteObject = await message;
if (this.maxSendMessageLength !== -1 && resolvedMessage.message.length > this.maxSendMessageLength) {
throw {
code: Status.RESOURCE_EXHAUSTED,
details: `Attempted to send message with a size larger than ${this.maxSendMessageLength}`
};
}
let compress: boolean;
if (this.sendCompression instanceof IdentityHandler) {
compress = false;
Expand Down
33 changes: 29 additions & 4 deletions packages/grpc-js/src/internal-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { ChannelOptions } from './channel-options';
import { ResolvingLoadBalancer } from './resolving-load-balancer';
import { SubchannelPool, getSubchannelPool } from './subchannel-pool';
import { ChannelControlHelper } from './load-balancer';
import { UnavailablePicker, Picker, QueuePicker } from './picker';
import { UnavailablePicker, Picker, QueuePicker, PickArgs, PickResult, PickResultType } from './picker';
import { Metadata } from './metadata';
import { Status, LogVerbosity, Propagate } from './constants';
import { FilterStackFactory } from './filter-stack';
Expand All @@ -33,7 +33,6 @@ import {
} from './resolver';
import { trace } from './logging';
import { SubchannelAddress } from './subchannel-address';
import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
import { mapProxyName } from './http_proxy';
import { GrpcUri, parseUri, uriToString } from './uri-parser';
import { ServerSurfaceCall } from './server-call';
Expand Down Expand Up @@ -144,6 +143,22 @@ class ChannelSubchannelWrapper
}
}

class ShutdownPicker implements Picker {
pick(pickArgs: PickArgs): PickResult {
return {
pickResultType: PickResultType.DROP,
status: {
code: Status.UNAVAILABLE,
details: 'Channel closed before call started',
metadata: new Metadata()
},
subchannel: null,
onCallStarted: null,
onCallEnded: null
}
}
}

export class InternalChannel {
private readonly resolvingLoadBalancer: ResolvingLoadBalancer;
private readonly subchannelPool: SubchannelPool;
Expand Down Expand Up @@ -402,7 +417,6 @@ export class InternalChannel {
}
);
this.filterStackFactory = new FilterStackFactory([
new MaxMessageSizeFilterFactory(this.options),
new CompressionFilterFactory(this, this.options),
]);
this.trace(
Expand Down Expand Up @@ -538,7 +552,9 @@ export class InternalChannel {
}

getConfig(method: string, metadata: Metadata): GetConfigResult {
this.resolvingLoadBalancer.exitIdle();
if (this.connectivityState !== ConnectivityState.SHUTDOWN) {
this.resolvingLoadBalancer.exitIdle();
}
if (this.configSelector) {
return {
type: 'SUCCESS',
Expand Down Expand Up @@ -747,6 +763,15 @@ export class InternalChannel {
close() {
this.resolvingLoadBalancer.destroy();
this.updateState(ConnectivityState.SHUTDOWN);
this.currentPicker = new ShutdownPicker();
for (const call of this.configSelectionQueue) {
call.cancelWithStatus(Status.UNAVAILABLE, 'Channel closed before call started');
}
this.configSelectionQueue = [];
for (const call of this.pickQueue) {
call.cancelWithStatus(Status.UNAVAILABLE, 'Channel closed before call started');
}
this.pickQueue = [];
clearInterval(this.callRefTimer);
if (this.idleTimer) {
clearTimeout(this.idleTimer);
Expand Down
17 changes: 11 additions & 6 deletions packages/grpc-js/src/load-balancer-pick-first.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import {
PickResultType,
UnavailablePicker,
} from './picker';
import { Endpoint, SubchannelAddress } from './subchannel-address';
import { Endpoint, SubchannelAddress, subchannelAddressToString } from './subchannel-address';
import * as logging from './logging';
import { LogVerbosity } from './constants';
import {
Expand Down Expand Up @@ -348,7 +348,6 @@ export class PickFirstLoadBalancer implements LoadBalancer {
if (newState !== ConnectivityState.READY) {
this.removeCurrentPick();
this.calculateAndReportNewState();
this.requestReresolution();
}
return;
}
Expand Down Expand Up @@ -483,6 +482,15 @@ export class PickFirstLoadBalancer implements LoadBalancer {
subchannel: this.channelControlHelper.createSubchannel(address, {}),
hasReportedTransientFailure: false,
}));
trace('connectToAddressList([' + addressList.map(address => subchannelAddressToString(address)) + '])');
for (const { subchannel } of newChildrenList) {
if (subchannel.getConnectivityState() === ConnectivityState.READY) {
this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef());
subchannel.addConnectivityStateListener(this.subchannelStateListener);
this.pickSubchannel(subchannel);
return;
}
}
/* Ref each subchannel before resetting the list, to ensure that
* subchannels shared between the list don't drop to 0 refs during the
* transition. */
Expand All @@ -494,10 +502,6 @@ export class PickFirstLoadBalancer implements LoadBalancer {
this.children = newChildrenList;
for (const { subchannel } of this.children) {
subchannel.addConnectivityStateListener(this.subchannelStateListener);
if (subchannel.getConnectivityState() === ConnectivityState.READY) {
this.pickSubchannel(subchannel);
return;
}
}
for (const child of this.children) {
if (
Expand Down Expand Up @@ -527,6 +531,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
const rawAddressList = ([] as SubchannelAddress[]).concat(
...endpointList.map(endpoint => endpoint.addresses)
);
trace('updateAddressList([' + rawAddressList.map(address => subchannelAddressToString(address)) + '])');
if (rawAddressList.length === 0) {
throw new Error('No addresses in endpoint list passed to pick_first');
}
Expand Down
Loading

0 comments on commit b35896b

Please sign in to comment.