From eff081ad21a27730f902ffbd45f86922b0a502c3 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 13 May 2024 14:43:04 +0200 Subject: [PATCH 1/4] Split RequestOptions in CallRequestOptions, to split headers from idempotency key. --- .../src/main/resources/templates/Client.hbs | 16 ++--- .../src/main/resources/templates/Client.hbs | 4 +- .../kotlin/dev/restate/sdk/kotlin/ingress.kt | 6 +- .../sdk/client/CallRequestOptions.java | 70 +++++++++++++++++++ .../sdk/client/DefaultIngressClient.java | 6 +- .../dev/restate/sdk/client/IngressClient.java | 24 ++++--- .../restate/sdk/client/RequestOptions.java | 46 +++++------- .../workflow/impl/WorkflowCodegenUtil.java | 14 ++-- 8 files changed, 126 insertions(+), 60 deletions(-) create mode 100644 sdk-common/src/main/java/dev/restate/sdk/client/CallRequestOptions.java diff --git a/sdk-api-gen/src/main/resources/templates/Client.hbs b/sdk-api-gen/src/main/resources/templates/Client.hbs index ac3c80fcb..dcc265829 100644 --- a/sdk-api-gen/src/main/resources/templates/Client.hbs +++ b/sdk-api-gen/src/main/resources/templates/Client.hbs @@ -82,10 +82,10 @@ public class {{generatedClassSimpleName}} { public {{#if outputEmpty}}void{{else}}{{{outputFqcn}}}{{/if}} {{methodName}}({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) { {{^outputEmpty}}return {{/outputEmpty}}this.{{methodName}}( {{^inputEmpty}}req, {{/inputEmpty}} - dev.restate.sdk.client.RequestOptions.DEFAULT); + dev.restate.sdk.client.CallRequestOptions.DEFAULT); } - public {{#if outputEmpty}}void{{else}}{{{outputFqcn}}}{{/if}} {{methodName}}({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.RequestOptions requestOptions) { + public {{#if outputEmpty}}void{{else}}{{{outputFqcn}}}{{/if}} {{methodName}}({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.CallRequestOptions requestOptions) { {{^outputEmpty}}return {{/outputEmpty}}this.ingressClient.call( {{#if isObject}}Target.virtualObject({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, this.key, "{{name}}"){{else}}Target.service({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, "{{name}}"){{/if}}, {{inputSerdeRef}}, @@ -97,10 +97,10 @@ public class {{generatedClassSimpleName}} { public {{#if outputEmpty}}java.util.concurrent.CompletableFuture{{else}}java.util.concurrent.CompletableFuture<{{{boxedOutputFqcn}}}>{{/if}} {{methodName}}Async({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) { return this.{{methodName}}Async( {{^inputEmpty}}req, {{/inputEmpty}} - dev.restate.sdk.client.RequestOptions.DEFAULT); + dev.restate.sdk.client.CallRequestOptions.DEFAULT); } - public {{#if outputEmpty}}java.util.concurrent.CompletableFuture{{else}}java.util.concurrent.CompletableFuture<{{{boxedOutputFqcn}}}>{{/if}} {{methodName}}Async({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.RequestOptions requestOptions) { + public {{#if outputEmpty}}java.util.concurrent.CompletableFuture{{else}}java.util.concurrent.CompletableFuture<{{{boxedOutputFqcn}}}>{{/if}} {{methodName}}Async({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.CallRequestOptions requestOptions) { return this.ingressClient.callAsync( {{#if isObject}}Target.virtualObject({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, this.key, "{{name}}"){{else}}Target.service({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, "{{name}}"){{/if}}, {{inputSerdeRef}}, @@ -129,10 +129,10 @@ public class {{generatedClassSimpleName}} { public String {{methodName}}({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) { return this.{{methodName}}( {{^inputEmpty}}req, {{/inputEmpty}} - dev.restate.sdk.client.RequestOptions.DEFAULT); + dev.restate.sdk.client.CallRequestOptions.DEFAULT); } - public String {{methodName}}({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.RequestOptions requestOptions) { + public String {{methodName}}({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.CallRequestOptions requestOptions) { return IngressClient.this.ingressClient.send( {{#if isObject}}Target.virtualObject({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, IngressClient.this.key, "{{name}}"){{else}}Target.service({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, "{{name}}"){{/if}}, {{inputSerdeRef}}, @@ -144,10 +144,10 @@ public class {{generatedClassSimpleName}} { public java.util.concurrent.CompletableFuture {{methodName}}Async({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) { return this.{{methodName}}Async( {{^inputEmpty}}req, {{/inputEmpty}} - dev.restate.sdk.client.RequestOptions.DEFAULT); + dev.restate.sdk.client.CallRequestOptions.DEFAULT); } - public java.util.concurrent.CompletableFuture {{methodName}}Async({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.RequestOptions requestOptions) { + public java.util.concurrent.CompletableFuture {{methodName}}Async({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.CallRequestOptions requestOptions) { return IngressClient.this.ingressClient.sendAsync( {{#if isObject}}Target.virtualObject({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, IngressClient.this.key, "{{name}}"){{else}}Target.service({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, "{{name}}"){{/if}}, {{inputSerdeRef}}, diff --git a/sdk-api-kotlin-gen/src/main/resources/templates/Client.hbs b/sdk-api-kotlin-gen/src/main/resources/templates/Client.hbs index 5d5bfd4ad..315df5829 100644 --- a/sdk-api-kotlin-gen/src/main/resources/templates/Client.hbs +++ b/sdk-api-kotlin-gen/src/main/resources/templates/Client.hbs @@ -52,7 +52,7 @@ object {{generatedClassSimpleName}} { class IngressClient(private val ingressClient: dev.restate.sdk.client.IngressClient{{#isObject}}, private val key: String{{/isObject}}) { {{#handlers}} - suspend fun {{methodName}}({{^inputEmpty}}req: {{{inputFqcn}}}, {{/inputEmpty}}requestOptions: dev.restate.sdk.client.RequestOptions = dev.restate.sdk.client.RequestOptions.DEFAULT): {{{boxedOutputFqcn}}} { + suspend fun {{methodName}}({{^inputEmpty}}req: {{{inputFqcn}}}, {{/inputEmpty}}requestOptions: dev.restate.sdk.client.CallRequestOptions = dev.restate.sdk.client.CallRequestOptions.DEFAULT): {{{boxedOutputFqcn}}} { return this.ingressClient.callSuspend( {{#if isObject}}Target.virtualObject({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, this.key, "{{name}}"){{else}}Target.service({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, "{{name}}"){{/if}}, {{inputSerdeRef}}, @@ -67,7 +67,7 @@ object {{generatedClassSimpleName}} { inner class Send(private val delay: Duration) { {{#handlers}} - suspend fun {{methodName}}({{^inputEmpty}}req: {{{inputFqcn}}}, {{/inputEmpty}}requestOptions: dev.restate.sdk.client.RequestOptions = dev.restate.sdk.client.RequestOptions.DEFAULT): String { + suspend fun {{methodName}}({{^inputEmpty}}req: {{{inputFqcn}}}, {{/inputEmpty}}requestOptions: dev.restate.sdk.client.CallRequestOptions = dev.restate.sdk.client.CallRequestOptions.DEFAULT): String { return this@IngressClient.ingressClient.sendSuspend( {{#if isObject}}Target.virtualObject({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, this@IngressClient.key, "{{name}}"){{else}}Target.service({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, "{{name}}"){{/if}}, {{inputSerdeRef}}, diff --git a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ingress.kt b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ingress.kt index df1accb63..eb24e04c9 100644 --- a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ingress.kt +++ b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ingress.kt @@ -8,8 +8,8 @@ // https://github.com/restatedev/sdk-java/blob/main/LICENSE package dev.restate.sdk.kotlin +import dev.restate.sdk.client.CallRequestOptions import dev.restate.sdk.client.IngressClient -import dev.restate.sdk.client.RequestOptions import dev.restate.sdk.common.Serde import dev.restate.sdk.common.Target import kotlin.time.Duration @@ -23,7 +23,7 @@ suspend fun IngressClient.callSuspend( reqSerde: Serde, resSerde: Serde, req: Req, - options: RequestOptions = RequestOptions.DEFAULT + options: CallRequestOptions = CallRequestOptions.DEFAULT ): Res { return this.callAsync(target, reqSerde, resSerde, req, options).await() } @@ -33,7 +33,7 @@ suspend fun IngressClient.sendSuspend( reqSerde: Serde, req: Req, delay: Duration = Duration.ZERO, - options: RequestOptions = RequestOptions.DEFAULT + options: CallRequestOptions = CallRequestOptions.DEFAULT ): String { return this.sendAsync(target, reqSerde, req, delay.toJavaDuration(), options).await() } diff --git a/sdk-common/src/main/java/dev/restate/sdk/client/CallRequestOptions.java b/sdk-common/src/main/java/dev/restate/sdk/client/CallRequestOptions.java new file mode 100644 index 000000000..8710c5a10 --- /dev/null +++ b/sdk-common/src/main/java/dev/restate/sdk/client/CallRequestOptions.java @@ -0,0 +1,70 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate Java SDK, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-java/blob/main/LICENSE +package dev.restate.sdk.client; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +public final class CallRequestOptions extends RequestOptions { + + public static final CallRequestOptions DEFAULT = new CallRequestOptions(); + + private final String idempotencyKey; + + public CallRequestOptions() { + this(new HashMap<>(), null); + } + + public CallRequestOptions(Map additionalHeaders, String idempotencyKey) { + super(additionalHeaders); + this.idempotencyKey = idempotencyKey; + } + + public CallRequestOptions withIdempotency(String idempotencyKey) { + return new CallRequestOptions(new HashMap<>(this.additionalHeaders), idempotencyKey); + } + + public String getIdempotencyKey() { + return idempotencyKey; + } + + @Override + public CallRequestOptions copy() { + return new CallRequestOptions(new HashMap<>(this.additionalHeaders), this.idempotencyKey); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + + CallRequestOptions that = (CallRequestOptions) o; + return Objects.equals(idempotencyKey, that.idempotencyKey); + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + Objects.hashCode(idempotencyKey); + return result; + } + + @Override + public String toString() { + return "CallRequestOptions{" + + "idempotencyKey='" + + idempotencyKey + + '\'' + + ", additionalHeaders=" + + additionalHeaders + + '}'; + } +} diff --git a/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java b/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java index ebff7dc96..257991379 100644 --- a/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java +++ b/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java @@ -45,7 +45,7 @@ public CompletableFuture callAsync( Serde reqSerde, Serde resSerde, Req req, - RequestOptions requestOptions) { + CallRequestOptions requestOptions) { HttpRequest request = prepareHttpRequest(target, false, reqSerde, req, null, requestOptions); return httpClient .sendAsync(request, HttpResponse.BodyHandlers.ofByteArray()) @@ -70,7 +70,7 @@ public CompletableFuture callAsync( @Override public CompletableFuture sendAsync( - Target target, Serde reqSerde, Req req, Duration delay, RequestOptions options) { + Target target, Serde reqSerde, Req req, Duration delay, CallRequestOptions options) { HttpRequest request = prepareHttpRequest(target, true, reqSerde, req, delay, options); return httpClient .sendAsync(request, HttpResponse.BodyHandlers.ofByteArray()) @@ -186,7 +186,7 @@ private HttpRequest prepareHttpRequest( Serde reqSerde, Req req, Duration delay, - RequestOptions options) { + CallRequestOptions options) { var reqBuilder = HttpRequest.newBuilder().uri(toRequestURI(target, isSend, delay)); // Add content-type diff --git a/sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java b/sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java index a82510b1b..488672654 100644 --- a/sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java +++ b/sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java @@ -22,15 +22,15 @@ public interface IngressClient { CompletableFuture callAsync( - Target target, Serde reqSerde, Serde resSerde, Req req, RequestOptions options); + Target target, Serde reqSerde, Serde resSerde, Req req, CallRequestOptions options); default CompletableFuture callAsync( Target target, Serde reqSerde, Serde resSerde, Req req) { - return callAsync(target, reqSerde, resSerde, req, RequestOptions.DEFAULT); + return callAsync(target, reqSerde, resSerde, req, CallRequestOptions.DEFAULT); } default Res call( - Target target, Serde reqSerde, Serde resSerde, Req req, RequestOptions options) + Target target, Serde reqSerde, Serde resSerde, Req req, CallRequestOptions options) throws IngressException { try { return callAsync(target, reqSerde, resSerde, req, options).join(); @@ -44,7 +44,7 @@ default Res call( default Res call(Target target, Serde reqSerde, Serde resSerde, Req req) throws IngressException { - return call(target, reqSerde, resSerde, req, RequestOptions.DEFAULT); + return call(target, reqSerde, resSerde, req, CallRequestOptions.DEFAULT); } CompletableFuture sendAsync( @@ -52,19 +52,23 @@ CompletableFuture sendAsync( Serde reqSerde, Req req, @Nullable Duration delay, - RequestOptions options); + CallRequestOptions options); default CompletableFuture sendAsync( Target target, Serde reqSerde, Req req, @Nullable Duration delay) { - return sendAsync(target, reqSerde, req, delay, RequestOptions.DEFAULT); + return sendAsync(target, reqSerde, req, delay, CallRequestOptions.DEFAULT); } default CompletableFuture sendAsync(Target target, Serde reqSerde, Req req) { - return sendAsync(target, reqSerde, req, null, RequestOptions.DEFAULT); + return sendAsync(target, reqSerde, req, null, CallRequestOptions.DEFAULT); } default String send( - Target target, Serde reqSerde, Req req, @Nullable Duration delay, RequestOptions options) + Target target, + Serde reqSerde, + Req req, + @Nullable Duration delay, + CallRequestOptions options) throws IngressException { try { return sendAsync(target, reqSerde, req, delay, options).join(); @@ -78,11 +82,11 @@ default String send( default String send(Target target, Serde reqSerde, Req req, @Nullable Duration delay) throws IngressException { - return send(target, reqSerde, req, delay, RequestOptions.DEFAULT); + return send(target, reqSerde, req, delay, CallRequestOptions.DEFAULT); } default String send(Target target, Serde reqSerde, Req req) throws IngressException { - return send(target, reqSerde, req, null, RequestOptions.DEFAULT); + return send(target, reqSerde, req, null, CallRequestOptions.DEFAULT); } /** diff --git a/sdk-common/src/main/java/dev/restate/sdk/client/RequestOptions.java b/sdk-common/src/main/java/dev/restate/sdk/client/RequestOptions.java index 62f4679aa..ac5dd6e55 100644 --- a/sdk-common/src/main/java/dev/restate/sdk/client/RequestOptions.java +++ b/sdk-common/src/main/java/dev/restate/sdk/client/RequestOptions.java @@ -10,64 +10,56 @@ import java.util.HashMap; import java.util.Map; -import java.util.Objects; public class RequestOptions { - public static final RequestOptions DEFAULT = new RequestOptions(); - private String idempotencyKey; - private final Map additionalHeaders = new HashMap<>(); + final Map additionalHeaders; - public RequestOptions withIdempotency(String idempotencyKey) { - this.idempotencyKey = idempotencyKey; - return this; + public RequestOptions() { + this(new HashMap<>()); } - public RequestOptions withHeader(String name, String value) { - this.additionalHeaders.put(name, value); - return this; + public RequestOptions(Map additionalHeaders) { + this.additionalHeaders = additionalHeaders; } - public RequestOptions withHeaders(Map additionalHeaders) { - this.additionalHeaders.putAll(additionalHeaders); - return this; + public RequestOptions withHeader(String name, String value) { + RequestOptions newOptions = this.copy(); + newOptions.additionalHeaders.put(name, value); + return newOptions; } - public String getIdempotencyKey() { - return idempotencyKey; + public RequestOptions withHeaders(Map additionalHeaders) { + RequestOptions newOptions = this.copy(); + newOptions.additionalHeaders.putAll(additionalHeaders); + return newOptions; } public Map getAdditionalHeaders() { return additionalHeaders; } + public RequestOptions copy() { + return new RequestOptions(new HashMap<>(this.additionalHeaders)); + } + @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; RequestOptions that = (RequestOptions) o; - - if (!Objects.equals(idempotencyKey, that.idempotencyKey)) return false; return additionalHeaders.equals(that.additionalHeaders); } @Override public int hashCode() { - int result = idempotencyKey != null ? idempotencyKey.hashCode() : 0; - result = 31 * result + additionalHeaders.hashCode(); - return result; + return additionalHeaders.hashCode(); } @Override public String toString() { - return "RequestOptions{" - + "idempotencyKey='" - + idempotencyKey - + '\'' - + ", additionalHeaders=" - + additionalHeaders - + '}'; + return "RequestOptions{" + "additionalHeaders=" + additionalHeaders + '}'; } } diff --git a/sdk-workflow-api/src/main/java/dev/restate/sdk/workflow/impl/WorkflowCodegenUtil.java b/sdk-workflow-api/src/main/java/dev/restate/sdk/workflow/impl/WorkflowCodegenUtil.java index 25070acc0..937531480 100644 --- a/sdk-workflow-api/src/main/java/dev/restate/sdk/workflow/impl/WorkflowCodegenUtil.java +++ b/sdk-workflow-api/src/main/java/dev/restate/sdk/workflow/impl/WorkflowCodegenUtil.java @@ -12,8 +12,8 @@ import dev.restate.sdk.Awaitable; import dev.restate.sdk.Context; +import dev.restate.sdk.client.CallRequestOptions; import dev.restate.sdk.client.IngressClient; -import dev.restate.sdk.client.RequestOptions; import dev.restate.sdk.common.*; import dev.restate.sdk.workflow.WorkflowExecutionState; import dev.restate.sdk.workflow.generated.GetOutputResponse; @@ -153,7 +153,7 @@ public static WorkflowExecutionState submit( WorkflowImpl.INVOKE_REQUEST_SERDE, WorkflowImpl.WORKFLOW_EXECUTION_STATE_SERDE, InvokeRequest.fromAny(workflowKey, payload), - RequestOptions.DEFAULT); + CallRequestOptions.DEFAULT); } public static Optional getOutput( @@ -165,7 +165,7 @@ public static Optional getOutput( CoreSerdes.VOID, WorkflowImpl.GET_OUTPUT_RESPONSE_SERDE, null, - RequestOptions.DEFAULT); + CallRequestOptions.DEFAULT); if (response.hasNotCompleted()) { return Optional.empty(); } @@ -185,7 +185,7 @@ public static boolean isCompleted( CoreSerdes.VOID, WorkflowImpl.GET_OUTPUT_RESPONSE_SERDE, null, - RequestOptions.DEFAULT); + CallRequestOptions.DEFAULT); if (response.hasFailure()) { throw new TerminalException( response.getFailure().getCode(), response.getFailure().getMessage()); @@ -205,7 +205,7 @@ public static T invokeShared( WorkflowImpl.INVOKE_REQUEST_SERDE, resSerde, InvokeRequest.fromAny(workflowKey, payload), - RequestOptions.DEFAULT); + CallRequestOptions.DEFAULT); } public static void invokeSharedSend( @@ -219,7 +219,7 @@ public static void invokeSharedSend( WorkflowImpl.INVOKE_REQUEST_SERDE, InvokeRequest.fromAny(workflowKey, payload), null, - RequestOptions.DEFAULT); + CallRequestOptions.DEFAULT); } public static Optional getState( @@ -231,7 +231,7 @@ public static Optional getState( CoreSerdes.JSON_STRING, WorkflowImpl.GET_STATE_RESPONSE_SERDE, key.name(), - RequestOptions.DEFAULT); + CallRequestOptions.DEFAULT); if (response.hasEmpty()) { return Optional.empty(); } From dc8b1ab78d8a1da0fb64942f7e935572649b1f7b Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 13 May 2024 15:17:40 +0200 Subject: [PATCH 2/4] Add InvocationHandle to attach/get output of invocation --- .../kotlin/dev/restate/sdk/kotlin/ingress.kt | 32 ++++++-- .../sdk/client/DefaultIngressClient.java | 71 +++++++++++++++- .../dev/restate/sdk/client/IngressClient.java | 80 +++++++++++++++++-- 3 files changed, 170 insertions(+), 13 deletions(-) diff --git a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ingress.kt b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ingress.kt index eb24e04c9..cc516c449 100644 --- a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ingress.kt +++ b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ingress.kt @@ -10,11 +10,12 @@ package dev.restate.sdk.kotlin import dev.restate.sdk.client.CallRequestOptions import dev.restate.sdk.client.IngressClient +import dev.restate.sdk.client.RequestOptions import dev.restate.sdk.common.Serde import dev.restate.sdk.common.Target +import kotlinx.coroutines.future.await import kotlin.time.Duration import kotlin.time.toJavaDuration -import kotlinx.coroutines.future.await // Extension methods for the IngressClient @@ -38,10 +39,31 @@ suspend fun IngressClient.sendSuspend( return this.sendAsync(target, reqSerde, req, delay.toJavaDuration(), options).await() } -suspend fun IngressClient.AwakeableHandle.resolveSuspend(serde: Serde, payload: T) { - this.resolveAsync(serde, payload).await() +suspend fun IngressClient.AwakeableHandle.resolveSuspend( + serde: Serde, + payload: T, + options: RequestOptions = RequestOptions.DEFAULT +) { + this.resolveAsync(serde, payload, options).await() +} + +suspend fun IngressClient.AwakeableHandle.rejectSuspend( + reason: String, + options: RequestOptions = RequestOptions.DEFAULT +) { + this.rejectAsync(reason, options).await() +} + +suspend fun IngressClient.InvocationHandle.attachSuspend( + resSerde: Serde, + options: RequestOptions = RequestOptions.DEFAULT +) { + this.attachAsync(resSerde, options).await() } -suspend fun IngressClient.AwakeableHandle.rejectSuspend(reason: String) { - this.rejectAsync(reason).await() +suspend fun IngressClient.InvocationHandle.getOutputSuspend( + resSerde: Serde, + options: RequestOptions = RequestOptions.DEFAULT +) { + this.getOutputAsync(resSerde, options).await() } diff --git a/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java b/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java index 257991379..22b15c00e 100644 --- a/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java +++ b/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java @@ -98,7 +98,8 @@ public CompletableFuture sendAsync( public AwakeableHandle awakeableHandle(String id) { return new AwakeableHandle() { @Override - public CompletableFuture resolveAsync(Serde serde, @NonNull T payload) { + public CompletableFuture resolveAsync( + Serde serde, @NonNull T payload, RequestOptions options) { // Prepare request var reqBuilder = HttpRequest.newBuilder().uri(baseUri.resolve("/restate/awakeables/" + id + "/resolve")); @@ -110,6 +111,7 @@ public CompletableFuture resolveAsync(Serde serde, @NonNull T paylo // Add headers headers.forEach(reqBuilder::header); + options.getAdditionalHeaders().forEach(reqBuilder::header); // Build and Send request HttpRequest request = @@ -133,7 +135,7 @@ public CompletableFuture resolveAsync(Serde serde, @NonNull T paylo } @Override - public CompletableFuture rejectAsync(String reason) { + public CompletableFuture rejectAsync(String reason, RequestOptions options) { // Prepare request var reqBuilder = HttpRequest.newBuilder() @@ -142,6 +144,7 @@ public CompletableFuture rejectAsync(String reason) { // Add headers headers.forEach(reqBuilder::header); + options.getAdditionalHeaders().forEach(reqBuilder::header); // Build and Send request HttpRequest request = reqBuilder.POST(HttpRequest.BodyPublishers.ofString(reason)).build(); @@ -163,6 +166,70 @@ public CompletableFuture rejectAsync(String reason) { }; } + @Override + public InvocationHandle invocationHandle(String invocationId) { + return new InvocationHandle() { + @Override + public CompletableFuture attachAsync(Serde resSerde, RequestOptions options) { + // Prepare request + var reqBuilder = + HttpRequest.newBuilder() + .uri(baseUri.resolve("/restate/invocation/" + invocationId + "/attach")); + + // Add headers + headers.forEach(reqBuilder::header); + options.getAdditionalHeaders().forEach(reqBuilder::header); + + // Build and Send request + HttpRequest request = reqBuilder.GET().build(); + return httpClient + .sendAsync(request, HttpResponse.BodyHandlers.ofByteArray()) + .handle( + (response, throwable) -> { + if (throwable != null) { + throw new IngressException("Error when executing the request", throwable); + } + + if (response.statusCode() >= 300) { + handleNonSuccessResponse(response); + } + + return null; + }); + } + + @Override + public CompletableFuture getOutputAsync( + Serde resSerde, RequestOptions options) { + // Prepare request + var reqBuilder = + HttpRequest.newBuilder() + .uri(baseUri.resolve("/restate/invocation/" + invocationId + "/output")); + + // Add headers + headers.forEach(reqBuilder::header); + options.getAdditionalHeaders().forEach(reqBuilder::header); + + // Build and Send request + HttpRequest request = reqBuilder.GET().build(); + return httpClient + .sendAsync(request, HttpResponse.BodyHandlers.ofByteArray()) + .handle( + (response, throwable) -> { + if (throwable != null) { + throw new IngressException("Error when executing the request", throwable); + } + + if (response.statusCode() >= 300) { + handleNonSuccessResponse(response); + } + + return null; + }); + } + }; + } + private URI toRequestURI(Target target, boolean isSend, Duration delay) { StringBuilder builder = new StringBuilder(); builder.append("/").append(target.getService()); diff --git a/sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java b/sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java index 488672654..dfbac7b38 100644 --- a/sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java +++ b/sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java @@ -101,8 +101,26 @@ default String send(Target target, Serde reqSerde, Req req) throws In * ingress */ interface AwakeableHandle { + /** Same as {@link #resolve(Serde, Object)} but async with options. */ + CompletableFuture resolveAsync( + Serde serde, @NonNull T payload, RequestOptions options); + /** Same as {@link #resolve(Serde, Object)} but async. */ - CompletableFuture resolveAsync(Serde serde, @NonNull T payload); + default CompletableFuture resolveAsync(Serde serde, @NonNull T payload) { + return resolveAsync(serde, payload, RequestOptions.DEFAULT); + } + + /** Same as {@link #resolve(Serde, Object)} with options. */ + default void resolve(Serde serde, @NonNull T payload, RequestOptions options) { + try { + resolveAsync(serde, payload, options).join(); + } catch (CompletionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } + throw new RuntimeException(e.getCause()); + } + } /** * Complete with success the Awakeable. @@ -111,8 +129,21 @@ interface AwakeableHandle { * @param payload the result payload. MUST NOT be null. */ default void resolve(Serde serde, @NonNull T payload) { + this.resolve(serde, payload, RequestOptions.DEFAULT); + } + + /** Same as {@link #reject(String)} but async with options. */ + CompletableFuture rejectAsync(String reason, RequestOptions options); + + /** Same as {@link #reject(String)} but async. */ + default CompletableFuture rejectAsync(String reason) { + return rejectAsync(reason, RequestOptions.DEFAULT); + } + + /** Same as {@link #reject(String)} with options. */ + default void reject(String reason, RequestOptions options) { try { - resolveAsync(serde, payload).join(); + rejectAsync(reason, options).join(); } catch (CompletionException e) { if (e.getCause() instanceof RuntimeException) { throw (RuntimeException) e.getCause(); @@ -121,17 +152,28 @@ default void resolve(Serde serde, @NonNull T payload) { } } - /** Same as {@link #reject(String)} but async. */ - CompletableFuture rejectAsync(String reason); - /** * Complete with failure the Awakeable. * * @param reason the rejection reason. MUST NOT be null. */ default void reject(String reason) { + this.reject(reason, RequestOptions.DEFAULT); + } + } + + InvocationHandle invocationHandle(String invocationId); + + interface InvocationHandle { + CompletableFuture attachAsync(Serde resSerde, RequestOptions options); + + default CompletableFuture attachAsync(Serde resSerde) { + return attachAsync(resSerde, RequestOptions.DEFAULT); + } + + default Res attach(Serde resSerde, RequestOptions options) throws IngressException { try { - rejectAsync(reason).join(); + return attachAsync(resSerde, options).join(); } catch (CompletionException e) { if (e.getCause() instanceof RuntimeException) { throw (RuntimeException) e.getCause(); @@ -139,6 +181,32 @@ default void reject(String reason) { throw new RuntimeException(e.getCause()); } } + + default Res attach(Serde resSerde) throws IngressException { + return attach(resSerde, RequestOptions.DEFAULT); + } + + CompletableFuture getOutputAsync(Serde resSerde, RequestOptions options); + + default CompletableFuture getOutputAsync(Serde resSerde) { + return getOutputAsync(resSerde, RequestOptions.DEFAULT); + } + + default Res getOutput(Serde resSerde, RequestOptions options) + throws IngressException { + try { + return getOutputAsync(resSerde, options).join(); + } catch (CompletionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } + throw new RuntimeException(e.getCause()); + } + } + + default Res getOutput(Serde resSerde) throws IngressException { + return getOutput(resSerde, RequestOptions.DEFAULT); + } } static IngressClient defaultClient(String baseUri) { From cd7332d77985325604dc648cdc3e37f758b4b0ed Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 13 May 2024 15:38:39 +0200 Subject: [PATCH 3/4] Fix CallRequestOptions#withHeader --- .../dev/restate/sdk/client/CallRequestOptions.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/sdk-common/src/main/java/dev/restate/sdk/client/CallRequestOptions.java b/sdk-common/src/main/java/dev/restate/sdk/client/CallRequestOptions.java index 8710c5a10..1870de676 100644 --- a/sdk-common/src/main/java/dev/restate/sdk/client/CallRequestOptions.java +++ b/sdk-common/src/main/java/dev/restate/sdk/client/CallRequestOptions.java @@ -31,6 +31,20 @@ public CallRequestOptions withIdempotency(String idempotencyKey) { return new CallRequestOptions(new HashMap<>(this.additionalHeaders), idempotencyKey); } + @Override + public CallRequestOptions withHeader(String name, String value) { + CallRequestOptions newOptions = this.copy(); + newOptions.additionalHeaders.put(name, value); + return newOptions; + } + + @Override + public CallRequestOptions withHeaders(Map additionalHeaders) { + CallRequestOptions newOptions = this.copy(); + newOptions.additionalHeaders.putAll(additionalHeaders); + return newOptions; + } + public String getIdempotencyKey() { return idempotencyKey; } From 1411eff4a28e65c470f4c6a0f110f1ebbb3b275d Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 13 May 2024 15:48:57 +0200 Subject: [PATCH 4/4] Fix response of attach/getOutput --- .../kotlin/dev/restate/sdk/kotlin/ingress.kt | 2 +- .../sdk/client/DefaultIngressClient.java | 20 +++++++++++++++++-- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ingress.kt b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ingress.kt index cc516c449..c11610eb0 100644 --- a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ingress.kt +++ b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ingress.kt @@ -13,9 +13,9 @@ import dev.restate.sdk.client.IngressClient import dev.restate.sdk.client.RequestOptions import dev.restate.sdk.common.Serde import dev.restate.sdk.common.Target -import kotlinx.coroutines.future.await import kotlin.time.Duration import kotlin.time.toJavaDuration +import kotlinx.coroutines.future.await // Extension methods for the IngressClient diff --git a/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java b/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java index 22b15c00e..2cc1497b3 100644 --- a/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java +++ b/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java @@ -194,7 +194,15 @@ public CompletableFuture attachAsync(Serde resSerde, RequestOpti handleNonSuccessResponse(response); } - return null; + try { + return resSerde.deserialize(response.body()); + } catch (Exception e) { + throw new IngressException( + "Cannot deserialize the response", + response.statusCode(), + response.body(), + e); + } }); } @@ -224,7 +232,15 @@ public CompletableFuture getOutputAsync( handleNonSuccessResponse(response); } - return null; + try { + return resSerde.deserialize(response.body()); + } catch (Exception e) { + throw new IngressException( + "Cannot deserialize the response", + response.statusCode(), + response.body(), + e); + } }); } };