Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: udsink bidirectional streaming #141

Merged
merged 2 commits into from
Sep 26, 2024
Merged

feat: udsink bidirectional streaming #141

merged 2 commits into from
Sep 26, 2024

Conversation

yhl25
Copy link
Contributor

@yhl25 yhl25 commented Sep 26, 2024

No description provided.

Copy link

codecov bot commented Sep 26, 2024

Codecov Report

Attention: Patch coverage is 60.46512% with 34 lines in your changes missing coverage. Please review.

Please upload report for BASE (main@babc8e1). Learn more about missing BASE report.

Files with missing lines Patch % Lines
...ava/io/numaproj/numaflow/sinker/SinkerTestKit.java 0.00% 25 Missing ⚠️
...main/java/io/numaproj/numaflow/sinker/Service.java 89.58% 2 Missing and 3 partials ⚠️
...ain/java/io/numaproj/numaflow/sourcer/Service.java 66.66% 2 Missing and 2 partials ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##             main     #141   +/-   ##
=======================================
  Coverage        ?   59.87%           
  Complexity      ?      362           
=======================================
  Files           ?      122           
  Lines           ?     2472           
  Branches        ?      175           
=======================================
  Hits            ?     1480           
  Misses          ?      863           
  Partials        ?      129           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

// make sure the handshake is done before processing the messages
if (!handshakeDone) {
if (!request.getHandshake().getSot()) {
responseObserver.onError(new Exception("Handshake request not received"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please cover L48-49

if (request.hasHandshake() && request.getHandshake().getSot()) {
// make sure that the handshake is done before processing the ack requests
if (!handshakeDone) {
if (!request.getHandshake().getSot()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we keep the hasHandshake check? it could be null. Same for sink.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added it back.

responseObserver.onError(new Exception("Handshake request not received"));
return;
}
responseObserver.onNext(SinkOuterClass.SinkResponse.newBuilder()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In go sdk, we also set result status to success.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need to, I will change that in go sdk.

Signed-off-by: Yashash H L <[email protected]>
@yhl25 yhl25 requested a review from KeranYang September 26, 2024 16:33
@yhl25 yhl25 merged commit f68f9f2 into main Sep 26, 2024
3 checks passed
@yhl25 yhl25 deleted the bidi-sink branch September 26, 2024 16:42
@@ -113,11 +113,13 @@ public Client(String host, int port) {
* @return response from the server as a ResponseList
*/
public ResponseList sendRequest(DatumIterator datumIterator) {
CompletableFuture<SinkOuterClass.SinkResponse> future = new CompletableFuture<>();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious, as to why SinkerTestKit lives in this package?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is for enabling users do "Component testing".

responseListBuilder.addResponse(Response.responseOK(result.getId()));
} else if (result.getStatus() == SinkOuterClass.Status.FALLBACK) {
for (SinkOuterClass.SinkResponse result : outputResponses) {
if (result.getHandshake().getSot()) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does getSot do?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sot means start of the transmission. when set to true, it means the handshake is successful.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants