Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(GRPC): Stream decorator and stream handler pass-through for Controllers to support GRPC Duplex streams defined with Protobuf #1568

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 73 additions & 4 deletions integration/microservices/e2e/sum-grpc.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,21 @@ import * as express from 'express';
import { join } from 'path';
import * as request from 'supertest';
import { GrpcController } from '../src/grpc/grpc.controller';
import * as ProtoLoader from '@grpc/proto-loader';
import * as GRPC from 'grpc';
import { expect } from 'chai';
import { fail } from 'assert';

describe('GRPC transport', () => {
let server;
let app: INestApplication;
let client: any;

beforeEach(async () => {
before(async () => {
const module = await Test.createTestingModule({
controllers: [GrpcController],
}).compile();

// Create gRPC + HTTP server
server = express();
app = module.createNestApplication(server);
app.connectMicroservice({
Expand All @@ -24,18 +29,82 @@ describe('GRPC transport', () => {
protoPath: join(__dirname, '../src/grpc/math.proto'),
},
});
// Start gRPC microservice
await app.startAllMicroservicesAsync();
await app.init();
// Load proto-buffers for test gRPC dispatch
const proto = ProtoLoader
.loadSync(join(__dirname, '../src/grpc/math.proto')) as any;
// Create Raw gRPC client object
const protoGRPC = GRPC.loadPackageDefinition(proto) as any;
// Create client connected to started services at standard 5000 port
client = new protoGRPC.math.Math(
'localhost:5000',
GRPC.credentials.createInsecure(),
);

});

it(`/POST`, () => {
it(`GRPC Sending and Receiving HTTP POST`, () => {
return request(server)
.post('/')
.send([1, 2, 3, 4, 5])
.expect(200, { result: 15 });
});

afterEach(async () => {
it('GRPC Sending and receiving Stream from RX handler', async () => {

const callHandler = client.SumStream();

callHandler.on('data', (msg: number) => {
// Do deep comparison (to.eql)
expect(msg).to.eql({result: 15});
callHandler.cancel();
});

callHandler.on('error', (err: any) => {
// We want to fail only on real errors while Cancellation error
// is expected
if (String(err).toLowerCase().indexOf('cancelled') === -1) {
fail('gRPC Stream error happened, error: ' + err);
}
});

return new Promise((resolve, reject) => {
callHandler.write({data: [1, 2, 3, 4, 5]});
setTimeout(() => resolve(), 1000);
});

});

it('GRPC Sending and receiving Stream from Call Passthrough handler', async () => {

const callHandler = client.SumStreamPass();

callHandler.on('data', (msg: number) => {
// Do deep comparison (to.eql)
expect(msg).to.eql({result: 15});
callHandler.cancel();
});

callHandler.on('error', (err: any) => {
// We want to fail only on real errors while Cancellation error
// is expected
if (String(err).toLowerCase().indexOf('cancelled') === -1) {
fail('gRPC Stream error happened, error: ' + err);
}
});

return new Promise((resolve, reject) => {
callHandler.write({data: [1, 2, 3, 4, 5]});
setTimeout(() => resolve(), 1000);
});

});

after(async () => {
await app.close();
client.close;
});

});
28 changes: 27 additions & 1 deletion integration/microservices/src/grpc/grpc.controller.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { Body, Controller, HttpCode, Post } from '@nestjs/common';
import { Client, ClientGrpc, GrpcMethod, Transport } from '@nestjs/microservices';
import {
Client, ClientGrpc, GrpcMethod,
GrpcStreamMethod, GrpcStreamCall, Transport,
} from '@nestjs/microservices';
import { join } from 'path';
import { Observable, of } from 'rxjs';

Expand Down Expand Up @@ -27,4 +30,27 @@ export class GrpcController {
result: data.reduce((a, b) => a + b),
});
}

@GrpcStreamMethod('Math')
async sumStream(messages: Observable<any>): Promise<any> {
// Form a resulting promise
return new Promise<any>((resolve, reject) => {
// Subscribe for a message to for a test answer
messages.subscribe(msg => {
// Resolve with reduce function
resolve({
result: msg.data.reduce((a, b) => a + b),
});
}, err => {
reject(err);
});
});
}

@GrpcStreamCall('Math')
async sumStreamPass(stream: any) {
stream.on('data', (msg: any) => {
stream.write({result: msg.data.reduce((a, b) => a + b)});
});
}
}
4 changes: 3 additions & 1 deletion integration/microservices/src/grpc/math.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ syntax = "proto3";
package math;

service Math {
rpc Sum (RequestSum) returns (SumResult) {}
rpc Sum (RequestSum) returns (SumResult);
rpc SumStream(stream RequestSum) returns(stream SumResult);
rpc SumStreamPass(stream RequestSum) returns(stream SumResult);
}

message SumResult {
Expand Down
70 changes: 67 additions & 3 deletions packages/microservices/decorators/pattern.decorator.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { PATTERN_METADATA, PATTERN_HANDLER_METADATA } from '../constants';
import { PatternMetadata } from '../interfaces/pattern-metadata.interface';
// Required import for Reflect for assure that global namespace is imported
// for this particular implementations (allows testing only this file)
import 'reflect-metadata';

/**
* Subscribes to incoming messages which fulfils chosen pattern.
Expand All @@ -26,21 +29,82 @@ export function GrpcMethod(service: string, method?: string) {
};
}

/**
* Registers gRPC call through RX handler for service and method
*
* @param service : String parameter reflecting the name of service
* definition from proto file
* @constructor
*/
export function GrpcStreamMethod(service?: string);
/**
*
* @param service : String parameter reflecting the name of service
* definition from proto file
* @param method : Optional string parameter reflecting the name of
* method inside of a service definition coming after
* rpc keyword
* @constructor
*/
export function GrpcStreamMethod(service: string, method?: string);
export function GrpcStreamMethod(service: string, method?: string) {
return (target, key, descriptor: PropertyDescriptor) => {
const metadata = createMethodMetadata(
target, key, service, method, GrpcMethodStreamingType.RX_STREAMING
);
return MessagePattern(metadata)(target, key, descriptor);
};
}

/**
* Registers gRPC call pass through handler for service and method
*
* @param service : String parameter reflecting the name of service
* definition from proto file
* @constructor
*/
export function GrpcStreamCall(service?: string);
/**
*
* @param service : String parameter reflecting the name of service
* definition from proto file
* @param method : Optional string parameter reflecting the name of
* method inside of a service definition coming after
* rpc keyword
* @constructor
*/
export function GrpcStreamCall(service: string, method?: string);
export function GrpcStreamCall(service: string, method?: string) {
return (target, key, descriptor: PropertyDescriptor) => {
const metadata = createMethodMetadata(
target, key, service, method, GrpcMethodStreamingType.PT_STREAMING
);
return MessagePattern(metadata)(target, key, descriptor);
};
}

export function createMethodMetadata(
target: any,
key: string,
service: string | undefined,
method: string | undefined,
streaming = GrpcMethodStreamingType.NO_STREAMING
) {
const capitalizeFirstLetter = (str: string) =>
str.charAt(0).toUpperCase() + str.slice(1);

if (!service) {
const { name } = target.constructor;
return { service: name, rpc: capitalizeFirstLetter(key) };
return { service: name, rpc: capitalizeFirstLetter(key), streaming};
}
if (service && !method) {
return { service, rpc: capitalizeFirstLetter(key) };
return { service, rpc: capitalizeFirstLetter(key), streaming};
}
return { service, rpc: method };
return { service, rpc: method, streaming};
}

export enum GrpcMethodStreamingType {
NO_STREAMING = 'no_stream',
RX_STREAMING = 'rx_stream',
PT_STREAMING = 'pt_stream'
}
Loading