From 4d8b90fa5607044f7c8593172a86186dc6ce4442 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Thu, 18 Apr 2024 16:43:21 +0200 Subject: [PATCH] Implement Shared handlers annotation for virtual objects --- .../java/my/restate/sdk/examples/Counter.java | 5 +- .../my/restate/sdk/examples/CounterKt.kt | 5 +- .../dev/restate/sdk/gen/ElementConverter.java | 3 ++ .../resources/templates/BindableService.hbs | 2 +- .../java/dev/restate/sdk/CodegenTest.java | 14 ++++-- .../sdk/kotlin/gen/KElementConverter.kt | 12 +++-- .../resources/templates/BindableService.hbs | 2 +- .../dev/restate/sdk/kotlin/CodegenTest.kt | 14 ++++-- .../kotlin/dev/restate/sdk/kotlin/Service.kt | 30 ++++++++---- .../main/kotlin/dev/restate/sdk/kotlin/api.kt | 9 +++- .../sdk/kotlin/KotlinCoroutinesTests.kt | 2 +- .../java/dev/restate/sdk/ObjectContext.java | 28 +---------- .../main/java/dev/restate/sdk/Service.java | 16 +++++-- .../dev/restate/sdk/SharedObjectContext.java | 48 +++++++++++++++++++ .../dev/restate/sdk/JavaBlockingTests.java | 2 +- .../dev/restate/sdk/common/HandlerType.java | 14 ++++++ .../common/syscalls/HandlerDefinition.java | 13 ++++- .../restate/sdk/core/DeploymentManifest.java | 18 ++++++- .../core/ComponentDiscoveryHandlerTest.java | 5 +- .../vertx/testservices/GreeterKtComponent.kt | 2 +- .../testservices/KotlinCounterService.kt | 4 +- .../restate/sdk/workflow/WorkflowBuilder.java | 5 +- .../sdk/workflow/impl/WorkflowImpl.java | 20 ++++---- 23 files changed, 201 insertions(+), 72 deletions(-) create mode 100644 sdk-api/src/main/java/dev/restate/sdk/SharedObjectContext.java create mode 100644 sdk-common/src/main/java/dev/restate/sdk/common/HandlerType.java diff --git a/examples/src/main/java/my/restate/sdk/examples/Counter.java b/examples/src/main/java/my/restate/sdk/examples/Counter.java index 8ee4e9d1..1b014a38 100644 --- a/examples/src/main/java/my/restate/sdk/examples/Counter.java +++ b/examples/src/main/java/my/restate/sdk/examples/Counter.java @@ -9,7 +9,9 @@ package my.restate.sdk.examples; import dev.restate.sdk.ObjectContext; +import dev.restate.sdk.SharedObjectContext; import dev.restate.sdk.annotation.Handler; +import dev.restate.sdk.annotation.Shared; import dev.restate.sdk.annotation.VirtualObject; import dev.restate.sdk.common.CoreSerdes; import dev.restate.sdk.common.StateKey; @@ -36,8 +38,9 @@ public void add(ObjectContext ctx, Long request) { ctx.set(TOTAL, newValue); } + @Shared @Handler - public Long get(ObjectContext ctx) { + public Long get(SharedObjectContext ctx) { return ctx.get(TOTAL).orElse(0L); } diff --git a/examples/src/main/kotlin/my/restate/sdk/examples/CounterKt.kt b/examples/src/main/kotlin/my/restate/sdk/examples/CounterKt.kt index d2633592..9940e3da 100644 --- a/examples/src/main/kotlin/my/restate/sdk/examples/CounterKt.kt +++ b/examples/src/main/kotlin/my/restate/sdk/examples/CounterKt.kt @@ -9,10 +9,12 @@ package my.restate.sdk.examples import dev.restate.sdk.annotation.Handler +import dev.restate.sdk.annotation.Shared import dev.restate.sdk.annotation.VirtualObject import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder import dev.restate.sdk.kotlin.KtStateKey import dev.restate.sdk.kotlin.ObjectContext +import dev.restate.sdk.kotlin.SharedObjectContext import kotlinx.serialization.Serializable import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.Logger @@ -40,7 +42,8 @@ class CounterKt { } @Handler - suspend fun get(ctx: ObjectContext): Long { + @Shared + suspend fun get(ctx: SharedObjectContext): Long { return ctx.get(TOTAL) ?: 0L } diff --git a/sdk-api-gen/src/main/java/dev/restate/sdk/gen/ElementConverter.java b/sdk-api-gen/src/main/java/dev/restate/sdk/gen/ElementConverter.java index a2238585..9b37f3b7 100644 --- a/sdk-api-gen/src/main/java/dev/restate/sdk/gen/ElementConverter.java +++ b/sdk-api-gen/src/main/java/dev/restate/sdk/gen/ElementConverter.java @@ -10,6 +10,7 @@ import dev.restate.sdk.Context; import dev.restate.sdk.ObjectContext; +import dev.restate.sdk.SharedObjectContext; import dev.restate.sdk.annotation.Exclusive; import dev.restate.sdk.annotation.Shared; import dev.restate.sdk.annotation.Workflow; @@ -233,6 +234,8 @@ private void validateMethodSignature( case SHARED: if (serviceType == ServiceType.WORKFLOW) { validateFirstParameterType(WorkflowSharedContext.class, element); + } else if (serviceType == ServiceType.VIRTUAL_OBJECT) { + validateFirstParameterType(SharedObjectContext.class, element); } else { messager.printMessage( Diagnostic.Kind.ERROR, diff --git a/sdk-api-gen/src/main/resources/templates/BindableService.hbs b/sdk-api-gen/src/main/resources/templates/BindableService.hbs index ad4c7e5b..132adc65 100644 --- a/sdk-api-gen/src/main/resources/templates/BindableService.hbs +++ b/sdk-api-gen/src/main/resources/templates/BindableService.hbs @@ -13,7 +13,7 @@ public class {{generatedClassSimpleName}} implements dev.restate.sdk.common.Bind public {{generatedClassSimpleName}}({{originalClassFqcn}} bindableService, dev.restate.sdk.Service.Options options) { this.service = dev.restate.sdk.Service.{{#if isObject}}virtualObject{{else}}service{{/if}}(SERVICE_NAME) {{#handlers}} - .with( + .{{#if isShared}}withShared{{else if isExclusive}}withExclusive{{else}}with{{/if}}( dev.restate.sdk.Service.HandlerSignature.of("{{name}}", {{{inputSerdeDecl}}}, {{{outputSerdeDecl}}}), (ctx, req) -> { {{#if outputEmpty}} diff --git a/sdk-api-gen/src/test/java/dev/restate/sdk/CodegenTest.java b/sdk-api-gen/src/test/java/dev/restate/sdk/CodegenTest.java index 72974ffc..eeae2221 100644 --- a/sdk-api-gen/src/test/java/dev/restate/sdk/CodegenTest.java +++ b/sdk-api-gen/src/test/java/dev/restate/sdk/CodegenTest.java @@ -12,10 +12,8 @@ import static dev.restate.sdk.core.TestDefinitions.testInvocation; import com.google.protobuf.ByteString; -import dev.restate.sdk.annotation.Exclusive; -import dev.restate.sdk.annotation.Handler; +import dev.restate.sdk.annotation.*; import dev.restate.sdk.annotation.Service; -import dev.restate.sdk.annotation.VirtualObject; import dev.restate.sdk.common.CoreSerdes; import dev.restate.sdk.common.Target; import dev.restate.sdk.core.ProtoUtils; @@ -39,6 +37,12 @@ static class ObjectGreeter { String greet(ObjectContext context, String request) { return request; } + + @Handler + @Shared + String sharedGreet(SharedObjectContext context, String request) { + return request; + } } @VirtualObject @@ -113,6 +117,10 @@ public Stream definitions() { .withInput(startMessage(1, "slinkydeveloper"), inputMessage("Francesco")) .onlyUnbuffered() .expectingOutput(outputMessage("Francesco"), END_MESSAGE), + testInvocation(ObjectGreeter::new, "sharedGreet") + .withInput(startMessage(1, "slinkydeveloper"), inputMessage("Francesco")) + .onlyUnbuffered() + .expectingOutput(outputMessage("Francesco"), END_MESSAGE), testInvocation(ObjectGreeterImplementedFromInterface::new, "greet") .withInput(startMessage(1, "slinkydeveloper"), inputMessage("Francesco")) .onlyUnbuffered() diff --git a/sdk-api-kotlin-gen/src/main/kotlin/dev/restate/sdk/kotlin/gen/KElementConverter.kt b/sdk-api-kotlin-gen/src/main/kotlin/dev/restate/sdk/kotlin/gen/KElementConverter.kt index c08f8e18..5c335f23 100644 --- a/sdk-api-kotlin-gen/src/main/kotlin/dev/restate/sdk/kotlin/gen/KElementConverter.kt +++ b/sdk-api-kotlin-gen/src/main/kotlin/dev/restate/sdk/kotlin/gen/KElementConverter.kt @@ -23,6 +23,7 @@ import dev.restate.sdk.gen.model.PayloadType import dev.restate.sdk.gen.model.Service import dev.restate.sdk.kotlin.Context import dev.restate.sdk.kotlin.ObjectContext +import dev.restate.sdk.kotlin.SharedObjectContext import java.util.regex.Pattern import kotlin.reflect.KClass @@ -128,7 +129,7 @@ class KElementConverter(private val logger: KSPLogger, private val builtIns: KSB } val isAnnotatedWithShared = - function.isAnnotationPresent(dev.restate.sdk.annotation.Service::class) + function.isAnnotationPresent(dev.restate.sdk.annotation.Shared::class) val isAnnotatedWithExclusive = function.isAnnotationPresent(dev.restate.sdk.annotation.Exclusive::class) @@ -190,8 +191,13 @@ class KElementConverter(private val logger: KSPLogger, private val builtIns: KSB } when (handlerType) { HandlerType.SHARED -> - logger.error( - "The annotation @Shared is not supported by the service type $serviceType", function) + if (serviceType == ServiceType.VIRTUAL_OBJECT) { + validateFirstParameterType(SharedObjectContext::class, function) + } else { + logger.error( + "The annotation @Shared is not supported by the service type $serviceType", + function) + } HandlerType.EXCLUSIVE -> if (serviceType == ServiceType.VIRTUAL_OBJECT) { validateFirstParameterType(ObjectContext::class, function) diff --git a/sdk-api-kotlin-gen/src/main/resources/templates/BindableService.hbs b/sdk-api-kotlin-gen/src/main/resources/templates/BindableService.hbs index 5e7d044f..bdbb106d 100644 --- a/sdk-api-kotlin-gen/src/main/resources/templates/BindableService.hbs +++ b/sdk-api-kotlin-gen/src/main/resources/templates/BindableService.hbs @@ -11,7 +11,7 @@ class {{generatedClassSimpleName}}( val service: dev.restate.sdk.kotlin.Service = dev.restate.sdk.kotlin.Service.{{#if isObject}}virtualObject{{else}}service{{/if}}(SERVICE_NAME, options) { {{#handlers}} - handler(dev.restate.sdk.kotlin.Service.HandlerSignature("{{name}}", {{{inputSerdeDecl}}}, {{{outputSerdeDecl}}})) { ctx, req -> + {{#if isShared}}sharedHandler{{else if isExclusive}}exclusiveHandler{{else}}handler{{/if}}(dev.restate.sdk.kotlin.Service.HandlerSignature("{{name}}", {{{inputSerdeDecl}}}, {{{outputSerdeDecl}}})) { ctx, req -> {{#if inputEmpty}}bindableService.{{name}}(ctx){{else}}bindableService.{{name}}(ctx, req){{/if}} } {{/handlers}} diff --git a/sdk-api-kotlin-gen/src/test/kotlin/dev/restate/sdk/kotlin/CodegenTest.kt b/sdk-api-kotlin-gen/src/test/kotlin/dev/restate/sdk/kotlin/CodegenTest.kt index eb7bb94e..d2c2f5b4 100644 --- a/sdk-api-kotlin-gen/src/test/kotlin/dev/restate/sdk/kotlin/CodegenTest.kt +++ b/sdk-api-kotlin-gen/src/test/kotlin/dev/restate/sdk/kotlin/CodegenTest.kt @@ -9,10 +9,8 @@ package dev.restate.sdk.kotlin import com.google.protobuf.ByteString -import dev.restate.sdk.annotation.Exclusive -import dev.restate.sdk.annotation.Handler +import dev.restate.sdk.annotation.* import dev.restate.sdk.annotation.Service -import dev.restate.sdk.annotation.VirtualObject import dev.restate.sdk.common.CoreSerdes import dev.restate.sdk.common.Target import dev.restate.sdk.core.ProtoUtils.* @@ -36,6 +34,12 @@ class CodegenTest : TestDefinitions.TestSuite { suspend fun greet(context: ObjectContext, request: String): String { return request } + + @Handler + @Shared + suspend fun sharedGreet(context: SharedObjectContext, request: String): String { + return request + } } @VirtualObject @@ -104,6 +108,10 @@ class CodegenTest : TestDefinitions.TestSuite { .withInput(startMessage(1, "slinkydeveloper"), inputMessage("Francesco")) .onlyUnbuffered() .expectingOutput(outputMessage("Francesco"), END_MESSAGE), + testInvocation({ ObjectGreeter() }, "sharedGreet") + .withInput(startMessage(1, "slinkydeveloper"), inputMessage("Francesco")) + .onlyUnbuffered() + .expectingOutput(outputMessage("Francesco"), END_MESSAGE), testInvocation({ ObjectGreeterImplementedFromInterface() }, "greet") .withInput(startMessage(1, "slinkydeveloper"), inputMessage("Francesco")) .onlyUnbuffered() diff --git a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/Service.kt b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/Service.kt index 78f18cb8..da5cddfe 100644 --- a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/Service.kt +++ b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/Service.kt @@ -9,10 +9,7 @@ package dev.restate.sdk.kotlin import com.google.protobuf.ByteString -import dev.restate.sdk.common.BindableService -import dev.restate.sdk.common.Serde -import dev.restate.sdk.common.ServiceType -import dev.restate.sdk.common.TerminalException +import dev.restate.sdk.common.* import dev.restate.sdk.common.syscalls.* import kotlin.coroutines.CoroutineContext import kotlinx.coroutines.CoroutineScope @@ -65,18 +62,31 @@ private constructor( class VirtualObjectBuilder internal constructor(private val name: String) { private val handlers: MutableMap> = mutableMapOf() - fun handler( + fun sharedHandler( sig: HandlerSignature, runner: suspend (ObjectContext, REQ) -> RES ): VirtualObjectBuilder { - handlers[sig.name] = Handler(sig, runner) + handlers[sig.name] = Handler(sig, HandlerType.SHARED, runner) return this } - inline fun handler( + inline fun sharedHandler( name: String, noinline runner: suspend (ObjectContext, REQ) -> RES - ) = this.handler(HandlerSignature(name, KtSerdes.json(), KtSerdes.json()), runner) + ) = this.sharedHandler(HandlerSignature(name, KtSerdes.json(), KtSerdes.json()), runner) + + fun exclusiveHandler( + sig: HandlerSignature, + runner: suspend (ObjectContext, REQ) -> RES + ): VirtualObjectBuilder { + handlers[sig.name] = Handler(sig, HandlerType.EXCLUSIVE, runner) + return this + } + + inline fun exclusiveHandler( + name: String, + noinline runner: suspend (ObjectContext, REQ) -> RES + ) = this.exclusiveHandler(HandlerSignature(name, KtSerdes.json(), KtSerdes.json()), runner) fun build(options: Options) = Service(this.name, true, this.handlers, options) } @@ -88,7 +98,7 @@ private constructor( sig: HandlerSignature, runner: suspend (Context, REQ) -> RES ): ServiceBuilder { - handlers[sig.name] = Handler(sig, runner) + handlers[sig.name] = Handler(sig, HandlerType.SHARED, runner) return this } @@ -102,6 +112,7 @@ private constructor( class Handler( private val handlerSignature: HandlerSignature, + private val handlerType: HandlerType, private val runner: suspend (CTX, REQ) -> RES, ) : InvocationHandler { @@ -112,6 +123,7 @@ private constructor( fun toHandlerDefinition() = HandlerDefinition( handlerSignature.name, + handlerType, handlerSignature.requestSerde.schema(), handlerSignature.responseSerde.schema(), this) diff --git a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt index 926a7b9c..12084ce8 100644 --- a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt +++ b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt @@ -247,7 +247,7 @@ suspend inline fun Context.awakeable(): Awakeable { * This interface extends [Context] adding access to the virtual object instance key-value state * storage. */ -sealed interface ObjectContext : Context { +sealed interface SharedObjectContext : Context { /** @return the key of this object */ fun key(): String @@ -267,6 +267,13 @@ sealed interface ObjectContext : Context { * @return the immutable collection of known state keys. */ suspend fun stateKeys(): Collection +} + +/** + * This interface extends [Context] adding access to the virtual object instance key-value state + * storage. + */ +sealed interface ObjectContext : SharedObjectContext { /** * Sets the given value under the given key, serializing the value using the [StateKey.serde]. diff --git a/sdk-api-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/KotlinCoroutinesTests.kt b/sdk-api-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/KotlinCoroutinesTests.kt index 9bc8d00b..9a1e0ad9 100644 --- a/sdk-api-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/KotlinCoroutinesTests.kt +++ b/sdk-api-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/KotlinCoroutinesTests.kt @@ -51,7 +51,7 @@ class KotlinCoroutinesTests : TestRunner() { ): TestInvocationBuilder { return TestDefinitions.testInvocation( Service.virtualObject(name, Service.Options(Dispatchers.Unconfined)) { - handler("run", runner) + exclusiveHandler("run", runner) }, "run") } diff --git a/sdk-api/src/main/java/dev/restate/sdk/ObjectContext.java b/sdk-api/src/main/java/dev/restate/sdk/ObjectContext.java index 887ac603..6e87bce4 100644 --- a/sdk-api/src/main/java/dev/restate/sdk/ObjectContext.java +++ b/sdk-api/src/main/java/dev/restate/sdk/ObjectContext.java @@ -9,8 +9,6 @@ package dev.restate.sdk; import dev.restate.sdk.common.*; -import java.util.Collection; -import java.util.Optional; import org.jspecify.annotations.NonNull; /** @@ -22,31 +20,7 @@ * * @see Context */ -public interface ObjectContext extends Context { - - /** - * @return the key of this object - */ - String key(); - - /** - * Gets the state stored under key, deserializing the raw value using the {@link Serde} in the - * {@link StateKey}. - * - * @param key identifying the state to get and its type. - * @return an {@link Optional} containing the stored state deserialized or an empty {@link - * Optional} if not set yet. - * @throws RuntimeException when the state cannot be deserialized. - */ - Optional get(StateKey key); - - /** - * Gets all the known state keys for this virtual object instance. - * - * @return the immutable collection of known state keys. - */ - Collection stateKeys(); - +public interface ObjectContext extends SharedObjectContext { /** * Clears the state stored under key. * diff --git a/sdk-api/src/main/java/dev/restate/sdk/Service.java b/sdk-api/src/main/java/dev/restate/sdk/Service.java index cb2391b7..3f0ccc14 100644 --- a/sdk-api/src/main/java/dev/restate/sdk/Service.java +++ b/sdk-api/src/main/java/dev/restate/sdk/Service.java @@ -71,9 +71,15 @@ public static class VirtualObjectBuilder extends AbstractServiceBuilder { super(name); } - public VirtualObjectBuilder with( + public VirtualObjectBuilder withShared( + HandlerSignature sig, BiFunction runner) { + this.handlers.put(sig.getName(), new Handler<>(sig, HandlerType.SHARED, runner)); + return this; + } + + public VirtualObjectBuilder withExclusive( HandlerSignature sig, BiFunction runner) { - this.handlers.put(sig.getName(), new Handler<>(sig, runner)); + this.handlers.put(sig.getName(), new Handler<>(sig, HandlerType.EXCLUSIVE, runner)); return this; } @@ -90,7 +96,7 @@ public static class ServiceBuilder extends AbstractServiceBuilder { public ServiceBuilder with( HandlerSignature sig, BiFunction runner) { - this.handlers.put(sig.getName(), new Handler<>(sig, runner)); + this.handlers.put(sig.getName(), new Handler<>(sig, HandlerType.SHARED, runner)); return this; } @@ -102,14 +108,17 @@ public Service build(Service.Options options) { @SuppressWarnings("unchecked") public static class Handler implements InvocationHandler { private final HandlerSignature handlerSignature; + private final HandlerType handlerType; private final BiFunction runner; private static final Logger LOG = LogManager.getLogger(Handler.class); public Handler( HandlerSignature handlerSignature, + HandlerType handlerType, BiFunction runner) { this.handlerSignature = handlerSignature; + this.handlerType = handlerType; this.runner = (BiFunction) runner; } @@ -124,6 +133,7 @@ public BiFunction getRunner() { public HandlerDefinition toHandlerDefinition() { return new HandlerDefinition<>( this.handlerSignature.name, + this.handlerType, this.handlerSignature.requestSerde.schema(), this.handlerSignature.responseSerde.schema(), this); diff --git a/sdk-api/src/main/java/dev/restate/sdk/SharedObjectContext.java b/sdk-api/src/main/java/dev/restate/sdk/SharedObjectContext.java new file mode 100644 index 00000000..2f095c09 --- /dev/null +++ b/sdk-api/src/main/java/dev/restate/sdk/SharedObjectContext.java @@ -0,0 +1,48 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate Java SDK, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-java/blob/main/LICENSE +package dev.restate.sdk; + +import dev.restate.sdk.common.Serde; +import dev.restate.sdk.common.StateKey; +import java.util.Collection; +import java.util.Optional; + +/** + * This interface extends {@link Context} adding access to the virtual object instance key-value + * state storage + * + *

NOTE: This interface MUST NOT be accessed concurrently since it can lead to different + * orderings of user actions, corrupting the execution of the invocation. + * + * @see Context + */ +public interface SharedObjectContext extends Context { + /** + * @return the key of this object + */ + String key(); + + /** + * Gets the state stored under key, deserializing the raw value using the {@link Serde} in the + * {@link StateKey}. + * + * @param key identifying the state to get and its type. + * @return an {@link Optional} containing the stored state deserialized or an empty {@link + * Optional} if not set yet. + * @throws RuntimeException when the state cannot be deserialized. + */ + Optional get(StateKey key); + + /** + * Gets all the known state keys for this virtual object instance. + * + * @return the immutable collection of known state keys. + */ + Collection stateKeys(); +} diff --git a/sdk-api/src/test/java/dev/restate/sdk/JavaBlockingTests.java b/sdk-api/src/test/java/dev/restate/sdk/JavaBlockingTests.java index fdca53a7..82d0e023 100644 --- a/sdk-api/src/test/java/dev/restate/sdk/JavaBlockingTests.java +++ b/sdk-api/src/test/java/dev/restate/sdk/JavaBlockingTests.java @@ -58,7 +58,7 @@ public static TestInvocationBuilder testDefinitionForVirtualObject( String name, Serde reqSerde, Serde resSerde, BiFunction runner) { return TestDefinitions.testInvocation( Service.virtualObject(name) - .with(Service.HandlerSignature.of("run", reqSerde, resSerde), runner) + .withExclusive(Service.HandlerSignature.of("run", reqSerde, resSerde), runner) .build(Service.Options.DEFAULT), "run"); } diff --git a/sdk-common/src/main/java/dev/restate/sdk/common/HandlerType.java b/sdk-common/src/main/java/dev/restate/sdk/common/HandlerType.java new file mode 100644 index 00000000..d08d32bd --- /dev/null +++ b/sdk-common/src/main/java/dev/restate/sdk/common/HandlerType.java @@ -0,0 +1,14 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate Java SDK, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-java/blob/main/LICENSE +package dev.restate.sdk.common; + +public enum HandlerType { + SHARED, + EXCLUSIVE +} diff --git a/sdk-common/src/main/java/dev/restate/sdk/common/syscalls/HandlerDefinition.java b/sdk-common/src/main/java/dev/restate/sdk/common/syscalls/HandlerDefinition.java index 53fae8db..64287eb5 100644 --- a/sdk-common/src/main/java/dev/restate/sdk/common/syscalls/HandlerDefinition.java +++ b/sdk-common/src/main/java/dev/restate/sdk/common/syscalls/HandlerDefinition.java @@ -8,17 +8,24 @@ // https://github.com/restatedev/sdk-java/blob/main/LICENSE package dev.restate.sdk.common.syscalls; +import dev.restate.sdk.common.HandlerType; import java.util.Objects; public final class HandlerDefinition { private final String name; + private final HandlerType handlerType; private final Object inputSchema; private final Object outputSchema; private final InvocationHandler handler; public HandlerDefinition( - String name, Object inputSchema, Object outputSchema, InvocationHandler handler) { + String name, + HandlerType handlerType, + Object inputSchema, + Object outputSchema, + InvocationHandler handler) { this.name = name; + this.handlerType = handlerType; this.inputSchema = inputSchema; this.outputSchema = outputSchema; this.handler = handler; @@ -28,6 +35,10 @@ public String getName() { return name; } + public HandlerType getHandlerType() { + return handlerType; + } + public Object getInputSchema() { return inputSchema; } diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/DeploymentManifest.java b/sdk-core/src/main/java/dev/restate/sdk/core/DeploymentManifest.java index f99d9e65..bbe8d1fe 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/DeploymentManifest.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/DeploymentManifest.java @@ -8,6 +8,7 @@ // https://github.com/restatedev/sdk-java/blob/main/LICENSE package dev.restate.sdk.core; +import dev.restate.sdk.common.HandlerType; import dev.restate.sdk.common.ServiceType; import dev.restate.sdk.common.syscalls.ServiceDefinition; import dev.restate.sdk.core.manifest.Component; @@ -36,7 +37,12 @@ public DeploymentManifest( .withComponentType(convertServiceType(svc.getServiceType())) .withHandlers( svc.getHandlers().stream() - .map(method -> new Handler().withName(method.getName())) + .map( + method -> + new Handler() + .withHandlerType( + convertHandlerType(method.getHandlerType())) + .withName(method.getName())) .collect(Collectors.toList()))) .collect(Collectors.toList())); } @@ -55,4 +61,14 @@ private static Component.ComponentType convertServiceType(ServiceType serviceTyp } throw new IllegalStateException(); } + + private static Handler.HandlerType convertHandlerType(HandlerType handlerType) { + switch (handlerType) { + case EXCLUSIVE: + return Handler.HandlerType.EXCLUSIVE; + case SHARED: + return Handler.HandlerType.SHARED; + } + throw new IllegalStateException(); + } } diff --git a/sdk-core/src/test/java/dev/restate/sdk/core/ComponentDiscoveryHandlerTest.java b/sdk-core/src/test/java/dev/restate/sdk/core/ComponentDiscoveryHandlerTest.java index c85d4bdc..e095dbb3 100644 --- a/sdk-core/src/test/java/dev/restate/sdk/core/ComponentDiscoveryHandlerTest.java +++ b/sdk-core/src/test/java/dev/restate/sdk/core/ComponentDiscoveryHandlerTest.java @@ -10,6 +10,7 @@ import static org.assertj.core.api.Assertions.assertThat; +import dev.restate.sdk.common.HandlerType; import dev.restate.sdk.common.ServiceType; import dev.restate.sdk.common.syscalls.HandlerDefinition; import dev.restate.sdk.common.syscalls.ServiceDefinition; @@ -31,7 +32,9 @@ void handleWithMultipleServices() { new ServiceDefinition<>( "MyGreeter", ServiceType.SERVICE, - List.of(new HandlerDefinition<>("greet", null, null, null))))); + List.of( + new HandlerDefinition<>( + "greet", HandlerType.EXCLUSIVE, null, null, null))))); DeploymentManifestSchema manifest = deploymentManifest.manifest(); diff --git a/sdk-http-vertx/src/test/kotlin/dev/restate/sdk/http/vertx/testservices/GreeterKtComponent.kt b/sdk-http-vertx/src/test/kotlin/dev/restate/sdk/http/vertx/testservices/GreeterKtComponent.kt index d1231cf8..77fac03c 100644 --- a/sdk-http-vertx/src/test/kotlin/dev/restate/sdk/http/vertx/testservices/GreeterKtComponent.kt +++ b/sdk-http-vertx/src/test/kotlin/dev/restate/sdk/http/vertx/testservices/GreeterKtComponent.kt @@ -19,7 +19,7 @@ private val COUNTER: StateKey = BlockingGreeter.COUNTER fun greeter(): BindableService<*> = Service.virtualObject("KtGreeter") { - handler("greet") { ctx, request: String -> + exclusiveHandler("greet") { ctx, request: String -> LOG.info("Greet invoked!") val count = (ctx.get(COUNTER) ?: 0) + 1 diff --git a/sdk-lambda/src/test/kotlin/dev/restate/sdk/lambda/testservices/KotlinCounterService.kt b/sdk-lambda/src/test/kotlin/dev/restate/sdk/lambda/testservices/KotlinCounterService.kt index 93641e7d..6b878da0 100644 --- a/sdk-lambda/src/test/kotlin/dev/restate/sdk/lambda/testservices/KotlinCounterService.kt +++ b/sdk-lambda/src/test/kotlin/dev/restate/sdk/lambda/testservices/KotlinCounterService.kt @@ -22,4 +22,6 @@ private val COUNTER: StateKey = { v: ByteArray? -> String(v!!, StandardCharsets.UTF_8).toLong() })) fun counter(): BindableService<*> = - Service.virtualObject("KtCounter") { handler("get") { ctx, _: Unit -> ctx.get(COUNTER) ?: -1 } } + Service.virtualObject("KtCounter") { + exclusiveHandler("get") { ctx, _: Unit -> ctx.get(COUNTER) ?: -1 } + } diff --git a/sdk-workflow-api/src/main/java/dev/restate/sdk/workflow/WorkflowBuilder.java b/sdk-workflow-api/src/main/java/dev/restate/sdk/workflow/WorkflowBuilder.java index 0f62d787..da2749bf 100644 --- a/sdk-workflow-api/src/main/java/dev/restate/sdk/workflow/WorkflowBuilder.java +++ b/sdk-workflow-api/src/main/java/dev/restate/sdk/workflow/WorkflowBuilder.java @@ -10,6 +10,7 @@ import dev.restate.sdk.Service; import dev.restate.sdk.common.BindableService; +import dev.restate.sdk.common.HandlerType; import dev.restate.sdk.workflow.impl.WorkflowImpl; import java.util.HashMap; import java.util.function.BiFunction; @@ -28,7 +29,7 @@ private WorkflowBuilder(String name, Service.Handler workflowMethod) { public WorkflowBuilder withShared( Service.HandlerSignature sig, BiFunction runner) { - this.sharedMethods.put(sig.getName(), new Service.Handler<>(sig, runner)); + this.sharedMethods.put(sig.getName(), new Service.Handler<>(sig, HandlerType.SHARED, runner)); return this; } @@ -40,6 +41,6 @@ public static WorkflowBuilder named( String name, Service.HandlerSignature sig, BiFunction runner) { - return new WorkflowBuilder(name, new Service.Handler<>(sig, runner)); + return new WorkflowBuilder(name, new Service.Handler<>(sig, HandlerType.SHARED, runner)); } } diff --git a/sdk-workflow-api/src/main/java/dev/restate/sdk/workflow/impl/WorkflowImpl.java b/sdk-workflow-api/src/main/java/dev/restate/sdk/workflow/impl/WorkflowImpl.java index 9b2237b7..e5484dd4 100644 --- a/sdk-workflow-api/src/main/java/dev/restate/sdk/workflow/impl/WorkflowImpl.java +++ b/sdk-workflow-api/src/main/java/dev/restate/sdk/workflow/impl/WorkflowImpl.java @@ -354,22 +354,22 @@ public List> definitions() { // Prepare workflow manager service Service workflowManager = Service.virtualObject(workflowManagerObjectName(name)) - .with( + .withExclusive( HandlerSignature.of("getState", CoreSerdes.JSON_STRING, GET_STATE_RESPONSE_SERDE), this::getState) - .with( + .withExclusive( HandlerSignature.of("setState", SET_STATE_REQUEST_SERDE, CoreSerdes.VOID), (context, setStateRequest) -> { this.setState(context, setStateRequest); return null; }) - .with( + .withExclusive( HandlerSignature.of("clearState", CoreSerdes.JSON_STRING, CoreSerdes.VOID), (context, s) -> { this.clearState(context, s); return null; }) - .with( + .withExclusive( HandlerSignature.of( "waitDurablePromiseCompletion", WAIT_DURABLE_PROMISE_COMPLETION_REQUEST_SERDE, @@ -378,13 +378,13 @@ public List> definitions() { this.waitDurablePromiseCompletion(context, waitDurablePromiseCompletionRequest); return null; }) - .with( + .withExclusive( HandlerSignature.of( "getDurablePromiseCompletion", CoreSerdes.JSON_STRING, MAYBE_DURABLE_PROMISE_COMPLETION_SERDE), this::getDurablePromiseCompletion) - .with( + .withExclusive( HandlerSignature.of( "completeDurablePromise", COMPLETE_DURABLE_PROMISE_REQUEST_SERDE, @@ -393,19 +393,19 @@ public List> definitions() { this.completeDurablePromise(context, completeDurablePromiseRequest); return null; }) - .with( + .withExclusive( HandlerSignature.of("tryStart", CoreSerdes.VOID, WORKFLOW_EXECUTION_STATE_SERDE), (context, unused) -> this.tryStart(context)) - .with( + .withExclusive( HandlerSignature.of("getOutput", CoreSerdes.VOID, GET_OUTPUT_RESPONSE_SERDE), (context, unused) -> this.getOutput(context)) - .with( + .withExclusive( HandlerSignature.of("setOutput", SET_OUTPUT_REQUEST_SERDE, CoreSerdes.VOID), (context, setOutputRequest) -> { this.setOutput(context, setOutputRequest); return null; }) - .with( + .withExclusive( HandlerSignature.of("cleanup", CoreSerdes.VOID, CoreSerdes.VOID), (context, unused) -> { this.cleanup(context);