Skip to content

Commit

Permalink
feat: implementation of useAbortSignal option for grpc-web (#777)
Browse files Browse the repository at this point in the history
* implement abort option

* fix format

* update example

* add useAbortSignal option

* make more readable

* add useAbortSignal option

* move abortSignal from 2nd to 3rd arguments

---------

Co-authored-by: Francois HERSENT <[email protected]>
  • Loading branch information
hersentino and hersentino authored Feb 22, 2023
1 parent be5465f commit 7a3d429
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 15 deletions.
26 changes: 22 additions & 4 deletions integration/grpc-web-no-streaming-observable/example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,11 @@ export const Empty = {
* but with the streaming method removed.
*/
export interface DashState {
UserSettings(request: DeepPartial<Empty>, metadata?: grpc.Metadata): Observable<DashUserSettingsState>;
UserSettings(
request: DeepPartial<Empty>,
metadata?: grpc.Metadata,
abortSignal?: AbortSignal,
): Observable<DashUserSettingsState>;
}

export class DashStateClientImpl implements DashState {
Expand All @@ -332,8 +336,12 @@ export class DashStateClientImpl implements DashState {
this.UserSettings = this.UserSettings.bind(this);
}

UserSettings(request: DeepPartial<Empty>, metadata?: grpc.Metadata): Observable<DashUserSettingsState> {
return this.rpc.unary(DashStateUserSettingsDesc, Empty.fromPartial(request), metadata);
UserSettings(
request: DeepPartial<Empty>,
metadata?: grpc.Metadata,
abortSignal?: AbortSignal,
): Observable<DashUserSettingsState> {
return this.rpc.unary(DashStateUserSettingsDesc, Empty.fromPartial(request), abortSignal, metadata);
}
}

Expand Down Expand Up @@ -374,6 +382,7 @@ interface Rpc {
methodDesc: T,
request: any,
metadata: grpc.Metadata | undefined,
abortSignal?: AbortSignal,
): Observable<any>;
}

Expand Down Expand Up @@ -405,13 +414,14 @@ export class GrpcWebImpl {
methodDesc: T,
_request: any,
metadata: grpc.Metadata | undefined,
abortSignal?: AbortSignal,
): Observable<any> {
const request = { ..._request, ...methodDesc.requestType };
const maybeCombinedMetadata = metadata && this.options.metadata
? new BrowserHeaders({ ...this.options?.metadata.headersMap, ...metadata?.headersMap })
: metadata || this.options.metadata;
return new Observable((observer) => {
grpc.unary(methodDesc, {
const client = grpc.unary(methodDesc, {
request,
host: this.host,
metadata: maybeCombinedMetadata,
Expand All @@ -427,6 +437,14 @@ export class GrpcWebImpl {
}
},
});

const abortHandler = () => {
observer.error("Aborted");
client.close();
};
if (abortSignal) {
abortSignal.addEventListener("abort", abortHandler);
}
}).pipe(take(1));
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
outputClientImpl=grpc-web,returnObservable=true
outputClientImpl=grpc-web,returnObservable=true,useAbortSignal=true
6 changes: 5 additions & 1 deletion integration/grpc-web/example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,11 @@ export class GrpcWebImpl {
}
},
});
observer.add(() => client.close());
observer.add(() => {
if (!observer.closed) {
return client.close();
}
});
});
upStream();
}).pipe(share());
Expand Down
70 changes: 64 additions & 6 deletions src/generate-grpc-web.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ export function generateGrpcClientImpl(
/** Creates the RPC methods that client code actually calls. */
function generateRpcMethod(ctx: Context, serviceDesc: ServiceDescriptorProto, methodDesc: MethodDescriptorProto) {
assertInstanceOf(methodDesc, FormattedMethodDescriptor);
const { options } = ctx;
const { useAbortSignal } = options;
const requestMessage = rawRequestType(ctx, methodDesc);
const inputType = requestType(ctx, methodDesc, true);
const returns = responsePromiseOrObservable(ctx, methodDesc);
Expand All @@ -57,6 +59,7 @@ function generateRpcMethod(ctx: Context, serviceDesc: ServiceDescriptorProto, me
${methodDesc.formattedName}(
request: ${inputType},
metadata?: grpc.Metadata,
${useAbortSignal ? "abortSignal?: AbortSignal," : ""}
): ${returns} {
throw new Error('ts-proto does not yet support client streaming!');
}
Expand All @@ -68,10 +71,12 @@ function generateRpcMethod(ctx: Context, serviceDesc: ServiceDescriptorProto, me
${methodDesc.formattedName}(
request: ${inputType},
metadata?: grpc.Metadata,
${useAbortSignal ? "abortSignal?: AbortSignal," : ""}
): ${returns} {
return this.rpc.${method}(
${methodDescName(serviceDesc, methodDesc)},
${requestMessage}.fromPartial(request),
${useAbortSignal ? "abortSignal," : ""}
metadata,
);
}
Expand Down Expand Up @@ -165,6 +170,8 @@ export function addGrpcWebMisc(ctx: Context, hasStreamingMethods: boolean): Code
/** Makes an `Rpc` interface to decouple from the low-level grpc-web `grpc.invoke and grpc.unary`/etc. methods. */
function generateGrpcWebRpcType(ctx: Context, returnObservable: boolean, hasStreamingMethods: boolean): Code {
const chunks: Code[] = [];
const { options } = ctx;
const { useAbortSignal } = options;

chunks.push(code`interface Rpc {`);

Expand All @@ -174,6 +181,7 @@ function generateGrpcWebRpcType(ctx: Context, returnObservable: boolean, hasStre
methodDesc: T,
request: any,
metadata: grpc.Metadata | undefined,
${useAbortSignal ? "abortSignal?: AbortSignal," : ""}
): ${wrapper}<any>;
`);

Expand All @@ -183,6 +191,7 @@ function generateGrpcWebRpcType(ctx: Context, returnObservable: boolean, hasStre
methodDesc: T,
request: any,
metadata: grpc.Metadata | undefined,
${useAbortSignal ? "abortSignal?: AbortSignal," : ""}
): ${observableType(ctx)}<any>;
`);
}
Expand Down Expand Up @@ -230,19 +239,33 @@ function generateGrpcWebImpl(ctx: Context, returnObservable: boolean, hasStreami
}

function createPromiseUnaryMethod(ctx: Context): Code {
const { options } = ctx;
const { useAbortSignal } = options;

const maybeAbortSignal = useAbortSignal
? `
const abortHandler = () => {
client.close();
reject(new Error("Aborted"));
}
if (abortSignal) abortSignal.addEventListener("abort", abortHandler);`
: "";

return code`
unary<T extends UnaryMethodDefinitionish>(
methodDesc: T,
_request: any,
metadata: grpc.Metadata | undefined
metadata: grpc.Metadata | undefined,
${useAbortSignal ? "abortSignal?: AbortSignal," : ""}
): Promise<any> {
const request = { ..._request, ...methodDesc.requestType };
const maybeCombinedMetadata =
metadata && this.options.metadata
? new ${BrowserHeaders}({ ...this.options?.metadata.headersMap, ...metadata?.headersMap })
: metadata || this.options.metadata;
return new Promise((resolve, reject) => {
${grpc}.unary(methodDesc, {
${useAbortSignal ? `const client =` : ""} ${grpc}.unary(methodDesc, {
request,
host: this.host,
metadata: maybeCombinedMetadata,
Expand All @@ -257,25 +280,39 @@ function createPromiseUnaryMethod(ctx: Context): Code {
}
},
});
${maybeAbortSignal}
});
}
`;
}

function createObservableUnaryMethod(ctx: Context): Code {
const { options } = ctx;
const { useAbortSignal } = options;

const maybeAbortSignal = useAbortSignal
? `
const abortHandler = () => {
observer.error("Aborted");
client.close();
};
if (abortSignal) abortSignal.addEventListener("abort", abortHandler);`
: "";
return code`
unary<T extends UnaryMethodDefinitionish>(
methodDesc: T,
_request: any,
metadata: grpc.Metadata | undefined
metadata: grpc.Metadata | undefined,
${useAbortSignal ? "abortSignal?: AbortSignal," : ""}
): ${observableType(ctx)}<any> {
const request = { ..._request, ...methodDesc.requestType };
const maybeCombinedMetadata =
metadata && this.options.metadata
? new ${BrowserHeaders}({ ...this.options?.metadata.headersMap, ...metadata?.headersMap })
: metadata || this.options.metadata;
return new Observable(observer => {
${grpc}.unary(methodDesc, {
${useAbortSignal ? `const client =` : ""} ${grpc}.unary(methodDesc, {
request,
host: this.host,
metadata: maybeCombinedMetadata,
Expand All @@ -291,17 +328,34 @@ function createObservableUnaryMethod(ctx: Context): Code {
}
},
});
${maybeAbortSignal}
}).pipe(${take}(1));
}
`;
}

function createInvokeMethod(ctx: Context) {
const { options } = ctx;
const { useAbortSignal } = options;

const maybeAbortSignal = useAbortSignal
? `
const abortHandler = () => {
observer.error("Aborted");
client.close();
};
if (abortSignal) abortSignal.addEventListener("abort", abortHandler);`
: "";

return code`
invoke<T extends UnaryMethodDefinitionish>(
methodDesc: T,
_request: any,
metadata: grpc.Metadata | undefined
metadata: grpc.Metadata | undefined,
${useAbortSignal ? "abortSignal?: AbortSignal," : ""}
): ${observableType(ctx)}<any> {
const upStreamCodes = this.options.upStreamRetryCodes || [];
const DEFAULT_TIMEOUT_TIME: number = 3_000;
Expand Down Expand Up @@ -332,7 +386,11 @@ function createInvokeMethod(ctx: Context) {
}
},
});
observer.add(() => client.close());
observer.add(() => {
if (!observer.closed) return client.close()
});
${maybeAbortSignal}
});
upStream();
}).pipe(${share}());
Expand Down
6 changes: 3 additions & 3 deletions src/generate-services.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ export function generateService(
const partialInput = options.outputClientImpl === "grpc-web";
const inputType = requestType(ctx, methodDesc, partialInput);
params.push(code`request: ${inputType}`);
if (options.useAbortSignal) {
params.push(code`abortSignal?: AbortSignal`);
}

// Use metadata as last argument for interface only configuration
if (options.outputClientImpl === "grpc-web") {
Expand All @@ -78,6 +75,9 @@ export function generateService(
const Metadata = imp(options.metadataType);
params.push(code`metadata?: ${Metadata}`);
}
if (options.useAbortSignal) {
params.push(code`abortSignal?: AbortSignal`);
}
if (options.addNestjsRestParameter) {
params.push(code`...rest: any`);
}
Expand Down

0 comments on commit 7a3d429

Please sign in to comment.