-
-
Notifications
You must be signed in to change notification settings - Fork 7.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(GRPC): Stream decorator and stream handler pass-through for Controllers to support GRPC Duplex streams defined with Protobuf #1568
feat(GRPC): Stream decorator and stream handler pass-through for Controllers to support GRPC Duplex streams defined with Protobuf #1568
Conversation
Up to date with the latest master release
- GRPC Method added with definition of streaming parameter for mapping it to appropriate service - GRPC Stream annotation added - Commentary added
- createService method updated to differentiate handler depending on type of GRPC method definition - stream pass-through method added - explanatory comments added
- Result calculation updated for Controller - Test file updated with Test and expectations - GRPC Raw Stream connection added to test
Create pattern signature test updated to handle new signature for patterns.
Pull Request Test Coverage Report for Build 1543
💛 - Coveralls |
Pull Request Test Coverage Report for Build 1609
💛 - Coveralls |
@anton-alation First of all, GREAT WORK! I noticed that you decided to go with |
@mkaufmaner @kamilmysliwiec Thank you. So here should be some consensus on what we trying to achieve because converting the stream to Observable would be just excessive wrapping of an already simple interface with just However if let's say I would add something like: return async (call) => {
const req = new Subject<any>();
// Pass data to request
call.on('data', (m: any) => req.next(m));
// React on error
call.on('error', (e: any) => {
// Check if error means that stream ended on other end
if (String(e).toLowerCase().indexOf('cancelled') > -1) {
call.end();
req.complete();
return;
}
// If another error then just pass it along
req.error(e);
});
// Pass request to handler
const handler = methodHandler(req.asObservable());
// Receive back observable
const observable = this.transformToObservable(await handler);
// Write until cancelled event happened
await observable.pipe(takeUntil(fromEvent(call, CANCEL_EVENT)))
.forEach(msg => call.write(msg));
call.end();
}; Would if be more satisfactory behavior? Only significant benefit of that I see in having predefined Observable interface rather than maybe least known GRPC interface Duplex stream: @GrpcStream('TestMethod')
async testStream(stream: Observable<TestMessage>) {
// Implementation
} Please thoughts and arguments toward one or other approach. For my current projects, it's cheaper to have PassThrough just because this handler passed down to Abstracted executor layer which do all of the Job. But if we need to be a bit fancier we can work on as PassThrough, as well on Rx Observable passed down. |
@anton-alation I think we should have both the passthrough and the observable. The observable does simplify implementation with GRPC while also providing a rich feature set. return async (call, methodHandler) => {
const req = new Subject<any>();
// Pass data to request
call.on('data', (m: any) => req.next(m));
// React on error
call.on('error', (e: any) => {
// Check if error means that stream ended on other end
if (String(e).toLowerCase().indexOf('cancelled') > -1) {
call.end();
return;
}
// If another error then just pass it along
req.error(e);
});
call.on('end', () => req.complete());
// Pass request to handler
const handler = methodHandler(req.asObservable());
// Receive back observable
const res = this.transformToObservable(await handler);
// Write until cancelled event happened
await res.pipe(takeUntil(fromEvent(call, CANCEL_EVENT))).forEach(m => call.write(m));
call.end();
}; |
@mkaufmaner Let me add additional annotation and write tests to check that Rx behavior will work fine wrapping stream things. Later this week I will provide an update. |
- GrpcStream now renamed to GrpcStreamMethod - Grpc call passthrough now GrpcStreamCall
…mCall - Pattern updated - Handler selector updated - Duplex RX dispatch added - Duplex call dispatch added
- RX Streaming test - Call streaming test
@mkaufmaner @kamilmysliwiec Solution is in CI smoke now, locally it was fine. Will be editing PR documentation on naming. Feel free to review :) |
@mkaufmaner @kamilmysliwiec any news or updates? |
Hey @anton-alation, |
Thanks :) wasn't sure if patterns would be reviewed in close future, while the feature is very important. I understand the load :) |
Hey @kamilmysliwiec, this feature is really important for me so if you need some help i can help you asap. |
@PhilipMantrov with a few hacks (runtime-patches) you can go around current Nest behavior, but because we added this PR probably it's also very reasonable to just wait for the feature to merge. |
Hey @anton-alation, |
Thanks @kamilmysliwiec , please excuse me on losing track on master branch changes :) Sure, let me do this updates to markdown tomorrow :) |
No worries, it's my fault, I should look at this PR earlier!
Awesome :) |
@kamilmysliwiec Started with the doc here: commit for Docs, need to fulfill few outstanding topics which we made through the winter:
Need a few more days. Any thoughts appreciated :) |
Amazing @anton-alation, looking forward! |
Did you have some time to look at the docs @anton-alation? Specifically at streaming part :) (required to merge this PR) Thanks! |
@kamilmysliwiec Yeah, I will be finalizing today and tomorrow. Last few weeks have collided with the GA release cycle of our product, so I was pretty much into making all things smooth and silky, which can be a bit exhaustive sometimes :) |
So at the moment, I am writing tests to check that documentation I writing (almost wrote) would have the correct behavior within the nest, hope to finish tomorrow or Sunday. |
@kamilmysliwiec created docs PR: nestjs/docs.nestjs.com#378 probably need for review. As well I created a branch with actual tests related to those docs: #2075 targeting branch with the master updates. |
@kamilmysliwiec Hi Kamil, can we expect this to be released soon? :) |
@kamilmysliwiec any news on that one? |
@anton-alation There seems to be some conflicts with the PR. Can you resolve? |
Hi Michael, I believe Kamil dedicated his efforts to resolve conflicts here: Quoting:
As well as of discussion above there were updates to tests and docs. Do you want me to synchronize exactly this PR branch to resolve conflicts? |
@anton-alation I finished that part, no worries |
Anything I can do for help with this feature or tests or docs? |
This PR will be a part of the next 6.3.0 release :) |
Amazing! :) Thank you. If you need me to change something on the docs update, please tell :) |
I've updated my example using this new Streaming feature here: |
This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs. |
PR Checklist
Please check if your PR fulfills the following requirements:
PR Type
What kind of change does this PR introduce?
What is the current behavior?
Issue Number: #1286
Issue Number: #1264
Previous PR: #1292
Currently when protobuf rpc is defined like this:
Nest has no ability to pass a stream object down to the Controller handler, so there is no option to have full-duplex message interaction.
What is the new behavior?
Two new behaviors added, both are aiming same goals but serves different approach towards reaching the goal. Let's enumerate those:
RX Object approach
Developer receives RX object as the message and can return Promise or RX object as the stream
GRPC Call Object approach
Developer receives GRPC Stream wrapper and can operate it in the style of how streams are operated:
Does this PR introduce a breaking change?
Other information