Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add attach and get output to IngressClient #310

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions sdk-api-gen/src/main/resources/templates/Client.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ public class {{generatedClassSimpleName}} {
public {{#if outputEmpty}}void{{else}}{{{outputFqcn}}}{{/if}} {{methodName}}({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) {
{{^outputEmpty}}return {{/outputEmpty}}this.{{methodName}}(
{{^inputEmpty}}req, {{/inputEmpty}}
dev.restate.sdk.client.RequestOptions.DEFAULT);
dev.restate.sdk.client.CallRequestOptions.DEFAULT);
}

public {{#if outputEmpty}}void{{else}}{{{outputFqcn}}}{{/if}} {{methodName}}({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.RequestOptions requestOptions) {
public {{#if outputEmpty}}void{{else}}{{{outputFqcn}}}{{/if}} {{methodName}}({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.CallRequestOptions requestOptions) {
{{^outputEmpty}}return {{/outputEmpty}}this.ingressClient.call(
{{#if isObject}}Target.virtualObject({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, this.key, "{{name}}"){{else}}Target.service({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, "{{name}}"){{/if}},
{{inputSerdeRef}},
Expand All @@ -97,10 +97,10 @@ public class {{generatedClassSimpleName}} {
public {{#if outputEmpty}}java.util.concurrent.CompletableFuture<Void>{{else}}java.util.concurrent.CompletableFuture<{{{boxedOutputFqcn}}}>{{/if}} {{methodName}}Async({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) {
return this.{{methodName}}Async(
{{^inputEmpty}}req, {{/inputEmpty}}
dev.restate.sdk.client.RequestOptions.DEFAULT);
dev.restate.sdk.client.CallRequestOptions.DEFAULT);
}

public {{#if outputEmpty}}java.util.concurrent.CompletableFuture<Void>{{else}}java.util.concurrent.CompletableFuture<{{{boxedOutputFqcn}}}>{{/if}} {{methodName}}Async({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.RequestOptions requestOptions) {
public {{#if outputEmpty}}java.util.concurrent.CompletableFuture<Void>{{else}}java.util.concurrent.CompletableFuture<{{{boxedOutputFqcn}}}>{{/if}} {{methodName}}Async({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.CallRequestOptions requestOptions) {
return this.ingressClient.callAsync(
{{#if isObject}}Target.virtualObject({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, this.key, "{{name}}"){{else}}Target.service({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, "{{name}}"){{/if}},
{{inputSerdeRef}},
Expand Down Expand Up @@ -129,10 +129,10 @@ public class {{generatedClassSimpleName}} {
public String {{methodName}}({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) {
return this.{{methodName}}(
{{^inputEmpty}}req, {{/inputEmpty}}
dev.restate.sdk.client.RequestOptions.DEFAULT);
dev.restate.sdk.client.CallRequestOptions.DEFAULT);
}

public String {{methodName}}({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.RequestOptions requestOptions) {
public String {{methodName}}({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.CallRequestOptions requestOptions) {
return IngressClient.this.ingressClient.send(
{{#if isObject}}Target.virtualObject({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, IngressClient.this.key, "{{name}}"){{else}}Target.service({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, "{{name}}"){{/if}},
{{inputSerdeRef}},
Expand All @@ -144,10 +144,10 @@ public class {{generatedClassSimpleName}} {
public java.util.concurrent.CompletableFuture<String> {{methodName}}Async({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) {
return this.{{methodName}}Async(
{{^inputEmpty}}req, {{/inputEmpty}}
dev.restate.sdk.client.RequestOptions.DEFAULT);
dev.restate.sdk.client.CallRequestOptions.DEFAULT);
}

public java.util.concurrent.CompletableFuture<String> {{methodName}}Async({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.RequestOptions requestOptions) {
public java.util.concurrent.CompletableFuture<String> {{methodName}}Async({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.CallRequestOptions requestOptions) {
return IngressClient.this.ingressClient.sendAsync(
{{#if isObject}}Target.virtualObject({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, IngressClient.this.key, "{{name}}"){{else}}Target.service({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, "{{name}}"){{/if}},
{{inputSerdeRef}},
Expand Down
4 changes: 2 additions & 2 deletions sdk-api-kotlin-gen/src/main/resources/templates/Client.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ object {{generatedClassSimpleName}} {
class IngressClient(private val ingressClient: dev.restate.sdk.client.IngressClient{{#isObject}}, private val key: String{{/isObject}}) {

{{#handlers}}
suspend fun {{methodName}}({{^inputEmpty}}req: {{{inputFqcn}}}, {{/inputEmpty}}requestOptions: dev.restate.sdk.client.RequestOptions = dev.restate.sdk.client.RequestOptions.DEFAULT): {{{boxedOutputFqcn}}} {
suspend fun {{methodName}}({{^inputEmpty}}req: {{{inputFqcn}}}, {{/inputEmpty}}requestOptions: dev.restate.sdk.client.CallRequestOptions = dev.restate.sdk.client.CallRequestOptions.DEFAULT): {{{boxedOutputFqcn}}} {
return this.ingressClient.callSuspend(
{{#if isObject}}Target.virtualObject({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, this.key, "{{name}}"){{else}}Target.service({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, "{{name}}"){{/if}},
{{inputSerdeRef}},
Expand All @@ -67,7 +67,7 @@ object {{generatedClassSimpleName}} {

inner class Send(private val delay: Duration) {
{{#handlers}}
suspend fun {{methodName}}({{^inputEmpty}}req: {{{inputFqcn}}}, {{/inputEmpty}}requestOptions: dev.restate.sdk.client.RequestOptions = dev.restate.sdk.client.RequestOptions.DEFAULT): String {
suspend fun {{methodName}}({{^inputEmpty}}req: {{{inputFqcn}}}, {{/inputEmpty}}requestOptions: dev.restate.sdk.client.CallRequestOptions = dev.restate.sdk.client.CallRequestOptions.DEFAULT): String {
return [email protected](
{{#if isObject}}Target.virtualObject({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, [email protected], "{{name}}"){{else}}Target.service({{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME, "{{name}}"){{/if}},
{{inputSerdeRef}},
Expand Down
34 changes: 28 additions & 6 deletions sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ingress.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.kotlin

import dev.restate.sdk.client.CallRequestOptions
import dev.restate.sdk.client.IngressClient
import dev.restate.sdk.client.RequestOptions
import dev.restate.sdk.common.Serde
Expand All @@ -23,7 +24,7 @@ suspend fun <Req, Res> IngressClient.callSuspend(
reqSerde: Serde<Req>,
resSerde: Serde<Res>,
req: Req,
options: RequestOptions = RequestOptions.DEFAULT
options: CallRequestOptions = CallRequestOptions.DEFAULT
): Res {
return this.callAsync(target, reqSerde, resSerde, req, options).await()
}
Expand All @@ -33,15 +34,36 @@ suspend fun <Req> IngressClient.sendSuspend(
reqSerde: Serde<Req>,
req: Req,
delay: Duration = Duration.ZERO,
options: RequestOptions = RequestOptions.DEFAULT
options: CallRequestOptions = CallRequestOptions.DEFAULT
): String {
return this.sendAsync(target, reqSerde, req, delay.toJavaDuration(), options).await()
}

suspend fun <T> IngressClient.AwakeableHandle.resolveSuspend(serde: Serde<T>, payload: T) {
this.resolveAsync(serde, payload).await()
suspend fun <T> IngressClient.AwakeableHandle.resolveSuspend(
serde: Serde<T>,
payload: T,
options: RequestOptions = RequestOptions.DEFAULT
) {
this.resolveAsync(serde, payload, options).await()
}

suspend fun IngressClient.AwakeableHandle.rejectSuspend(
reason: String,
options: RequestOptions = RequestOptions.DEFAULT
) {
this.rejectAsync(reason, options).await()
}

suspend fun IngressClient.AwakeableHandle.rejectSuspend(reason: String) {
this.rejectAsync(reason).await()
suspend fun <T> IngressClient.InvocationHandle.attachSuspend(
resSerde: Serde<T>,
options: RequestOptions = RequestOptions.DEFAULT
) {
this.attachAsync(resSerde, options).await()
}

suspend fun <T> IngressClient.InvocationHandle.getOutputSuspend(
resSerde: Serde<T>,
options: RequestOptions = RequestOptions.DEFAULT
) {
this.getOutputAsync(resSerde, options).await()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate Java SDK,
// which is released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.client;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public final class CallRequestOptions extends RequestOptions {

public static final CallRequestOptions DEFAULT = new CallRequestOptions();

private final String idempotencyKey;

public CallRequestOptions() {
this(new HashMap<>(), null);
}

public CallRequestOptions(Map<String, String> additionalHeaders, String idempotencyKey) {
super(additionalHeaders);
this.idempotencyKey = idempotencyKey;
}

public CallRequestOptions withIdempotency(String idempotencyKey) {
return new CallRequestOptions(new HashMap<>(this.additionalHeaders), idempotencyKey);
}

@Override
public CallRequestOptions withHeader(String name, String value) {
CallRequestOptions newOptions = this.copy();
newOptions.additionalHeaders.put(name, value);
return newOptions;
}

@Override
public CallRequestOptions withHeaders(Map<? extends String, ? extends String> additionalHeaders) {
CallRequestOptions newOptions = this.copy();
newOptions.additionalHeaders.putAll(additionalHeaders);
return newOptions;
}

public String getIdempotencyKey() {
return idempotencyKey;
}

@Override
public CallRequestOptions copy() {
return new CallRequestOptions(new HashMap<>(this.additionalHeaders), this.idempotencyKey);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;

CallRequestOptions that = (CallRequestOptions) o;
return Objects.equals(idempotencyKey, that.idempotencyKey);
}

@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + Objects.hashCode(idempotencyKey);
return result;
}

@Override
public String toString() {
return "CallRequestOptions{"
+ "idempotencyKey='"
+ idempotencyKey
+ '\''
+ ", additionalHeaders="
+ additionalHeaders
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public <Req, Res> CompletableFuture<Res> callAsync(
Serde<Req> reqSerde,
Serde<Res> resSerde,
Req req,
RequestOptions requestOptions) {
CallRequestOptions requestOptions) {
HttpRequest request = prepareHttpRequest(target, false, reqSerde, req, null, requestOptions);
return httpClient
.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray())
Expand All @@ -70,7 +70,7 @@ public <Req, Res> CompletableFuture<Res> callAsync(

@Override
public <Req> CompletableFuture<String> sendAsync(
Target target, Serde<Req> reqSerde, Req req, Duration delay, RequestOptions options) {
Target target, Serde<Req> reqSerde, Req req, Duration delay, CallRequestOptions options) {
HttpRequest request = prepareHttpRequest(target, true, reqSerde, req, delay, options);
return httpClient
.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray())
Expand Down Expand Up @@ -98,7 +98,8 @@ public <Req> CompletableFuture<String> sendAsync(
public AwakeableHandle awakeableHandle(String id) {
return new AwakeableHandle() {
@Override
public <T> CompletableFuture<Void> resolveAsync(Serde<T> serde, @NonNull T payload) {
public <T> CompletableFuture<Void> resolveAsync(
Serde<T> serde, @NonNull T payload, RequestOptions options) {
// Prepare request
var reqBuilder =
HttpRequest.newBuilder().uri(baseUri.resolve("/restate/awakeables/" + id + "/resolve"));
Expand All @@ -110,6 +111,7 @@ public <T> CompletableFuture<Void> resolveAsync(Serde<T> serde, @NonNull T paylo

// Add headers
headers.forEach(reqBuilder::header);
options.getAdditionalHeaders().forEach(reqBuilder::header);

// Build and Send request
HttpRequest request =
Expand All @@ -133,7 +135,7 @@ public <T> CompletableFuture<Void> resolveAsync(Serde<T> serde, @NonNull T paylo
}

@Override
public CompletableFuture<Void> rejectAsync(String reason) {
public CompletableFuture<Void> rejectAsync(String reason, RequestOptions options) {
// Prepare request
var reqBuilder =
HttpRequest.newBuilder()
Expand All @@ -142,6 +144,7 @@ public CompletableFuture<Void> rejectAsync(String reason) {

// Add headers
headers.forEach(reqBuilder::header);
options.getAdditionalHeaders().forEach(reqBuilder::header);

// Build and Send request
HttpRequest request = reqBuilder.POST(HttpRequest.BodyPublishers.ofString(reason)).build();
Expand All @@ -163,6 +166,86 @@ public CompletableFuture<Void> rejectAsync(String reason) {
};
}

@Override
public InvocationHandle invocationHandle(String invocationId) {
return new InvocationHandle() {
@Override
public <Res> CompletableFuture<Res> attachAsync(Serde<Res> resSerde, RequestOptions options) {
// Prepare request
var reqBuilder =
HttpRequest.newBuilder()
.uri(baseUri.resolve("/restate/invocation/" + invocationId + "/attach"));

// Add headers
headers.forEach(reqBuilder::header);
options.getAdditionalHeaders().forEach(reqBuilder::header);

// Build and Send request
HttpRequest request = reqBuilder.GET().build();
return httpClient
.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray())
.handle(
(response, throwable) -> {
if (throwable != null) {
throw new IngressException("Error when executing the request", throwable);
}

if (response.statusCode() >= 300) {
handleNonSuccessResponse(response);
}

try {
return resSerde.deserialize(response.body());
} catch (Exception e) {
throw new IngressException(
"Cannot deserialize the response",
response.statusCode(),
response.body(),
e);
}
});
}

@Override
public <Res> CompletableFuture<Res> getOutputAsync(
Serde<Res> resSerde, RequestOptions options) {
// Prepare request
var reqBuilder =
HttpRequest.newBuilder()
.uri(baseUri.resolve("/restate/invocation/" + invocationId + "/output"));

// Add headers
headers.forEach(reqBuilder::header);
options.getAdditionalHeaders().forEach(reqBuilder::header);

// Build and Send request
HttpRequest request = reqBuilder.GET().build();
return httpClient
.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray())
.handle(
(response, throwable) -> {
if (throwable != null) {
throw new IngressException("Error when executing the request", throwable);
}

if (response.statusCode() >= 300) {
handleNonSuccessResponse(response);
}

try {
return resSerde.deserialize(response.body());
} catch (Exception e) {
throw new IngressException(
"Cannot deserialize the response",
response.statusCode(),
response.body(),
e);
}
});
}
};
}

private URI toRequestURI(Target target, boolean isSend, Duration delay) {
StringBuilder builder = new StringBuilder();
builder.append("/").append(target.getService());
Expand All @@ -186,7 +269,7 @@ private <Req> HttpRequest prepareHttpRequest(
Serde<Req> reqSerde,
Req req,
Duration delay,
RequestOptions options) {
CallRequestOptions options) {
var reqBuilder = HttpRequest.newBuilder().uri(toRequestURI(target, isSend, delay));

// Add content-type
Expand Down
Loading
Loading