Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove protobuf from the public api of sdk-common #321

Merged
merged 1 commit into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk-api-kotlin/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies {
testImplementation(testingLibs.junit.jupiter)
testImplementation(testingLibs.assertj)
testImplementation(coreLibs.log4j.core)
testImplementation(coreLibs.protobuf.java)

testImplementation(project(":sdk-core", "testArchive"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.kotlin

import com.google.protobuf.ByteString
import dev.restate.sdk.common.Serde
import dev.restate.sdk.common.syscalls.Deferred
import dev.restate.sdk.common.syscalls.Result
import dev.restate.sdk.common.syscalls.Syscalls
import java.nio.ByteBuffer
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.suspendCancellableCoroutine

Expand Down Expand Up @@ -79,14 +79,14 @@ internal abstract class BaseSingleMappedAwaitableImpl<T : Any, U : Any>(
internal open class SingleSerdeAwaitableImpl<T : Any>
internal constructor(
syscalls: Syscalls,
deferred: Deferred<ByteString>,
deferred: Deferred<ByteBuffer>,
private val serde: Serde<T>,
) :
BaseSingleMappedAwaitableImpl<ByteString, T>(
BaseSingleMappedAwaitableImpl<ByteBuffer, T>(
SingleAwaitableImpl(syscalls, deferred),
) {
@Suppress("UNCHECKED_CAST")
override suspend fun map(res: Result<ByteString>): Result<T> {
override suspend fun map(res: Result<ByteBuffer>): Result<T> {
return if (res.isSuccess) {
// This propagates exceptions as non-terminal
Result.success(serde.deserializeWrappingException(syscalls, res.value!!))
Expand Down Expand Up @@ -151,7 +151,7 @@ internal fun wrapAnyAwaitable(awaitables: List<Awaitable<*>>): AnyAwaitable {
internal class AwakeableImpl<T : Any>
internal constructor(
syscalls: Syscalls,
deferred: Deferred<ByteString>,
deferred: Deferred<ByteBuffer>,
serde: Serde<T>,
override val id: String
) : SingleSerdeAwaitableImpl<T>(syscalls, deferred, serde), Awakeable<T> {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.kotlin

import com.google.protobuf.ByteString
import dev.restate.sdk.common.*
import dev.restate.sdk.common.Target
import dev.restate.sdk.common.syscalls.Deferred
import dev.restate.sdk.common.syscalls.EnterSideEffectSyscallCallback
import dev.restate.sdk.common.syscalls.ExitSideEffectSyscallCallback
import dev.restate.sdk.common.syscalls.Syscalls
import java.nio.ByteBuffer
import kotlin.coroutines.resume
import kotlin.time.Duration
import kotlin.time.toJavaDuration
Expand All @@ -33,8 +33,8 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
}

override suspend fun <T : Any> get(key: StateKey<T>): T? {
val deferred: Deferred<ByteString> =
suspendCancellableCoroutine { cont: CancellableContinuation<Deferred<ByteString>> ->
val deferred: Deferred<ByteBuffer> =
suspendCancellableCoroutine { cont: CancellableContinuation<Deferred<ByteBuffer>> ->
syscalls.get(key.name(), completingContinuation(cont))
}

Expand Down Expand Up @@ -109,8 +109,8 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
): Awaitable<R> {
val input = inputSerde.serializeWrappingException(syscalls, parameter)

val deferred: Deferred<ByteString> =
suspendCancellableCoroutine { cont: CancellableContinuation<Deferred<ByteString>> ->
val deferred: Deferred<ByteBuffer> =
suspendCancellableCoroutine { cont: CancellableContinuation<Deferred<ByteBuffer>> ->
syscalls.call(target, input, completingContinuation(cont))
}

Expand All @@ -136,19 +136,19 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
block: suspend () -> T
): T {
val exitResult =
suspendCancellableCoroutine { cont: CancellableContinuation<CompletableDeferred<ByteString>>
suspendCancellableCoroutine { cont: CancellableContinuation<CompletableDeferred<ByteBuffer>>
->
syscalls.enterSideEffectBlock(
name,
object : EnterSideEffectSyscallCallback {
override fun onSuccess(t: ByteString?) {
val deferred: CompletableDeferred<ByteString> = CompletableDeferred()
override fun onSuccess(t: ByteBuffer?) {
val deferred: CompletableDeferred<ByteBuffer> = CompletableDeferred()
deferred.complete(t!!)
cont.resume(deferred)
}

override fun onFailure(t: TerminalException) {
val deferred: CompletableDeferred<ByteString> = CompletableDeferred()
val deferred: CompletableDeferred<ByteBuffer> = CompletableDeferred()
deferred.completeExceptionally(t)
cont.resume(deferred)
}
Expand Down Expand Up @@ -182,7 +182,7 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)

val exitCallback =
object : ExitSideEffectSyscallCallback {
override fun onSuccess(t: ByteString?) {
override fun onSuccess(t: ByteBuffer?) {
exitResult.complete(t!!)
}

Expand All @@ -208,7 +208,7 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
override suspend fun <T : Any> awakeable(serde: Serde<T>): Awakeable<T> {
val (aid, deferredResult) =
suspendCancellableCoroutine {
cont: CancellableContinuation<Map.Entry<String, Deferred<ByteString>>> ->
cont: CancellableContinuation<Map.Entry<String, Deferred<ByteBuffer>>> ->
syscalls.awakeable(completingContinuation(cont))
}

Expand All @@ -234,17 +234,17 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
inner class DurablePromiseImpl<T : Any>(private val key: DurablePromiseKey<T>) :
DurablePromise<T> {
override suspend fun awaitable(): Awaitable<T> {
val deferred: Deferred<ByteString> =
suspendCancellableCoroutine { cont: CancellableContinuation<Deferred<ByteString>> ->
val deferred: Deferred<ByteBuffer> =
suspendCancellableCoroutine { cont: CancellableContinuation<Deferred<ByteBuffer>> ->
syscalls.promise(key.name(), completingContinuation(cont))
}

return SingleSerdeAwaitableImpl(syscalls, deferred, key.serde())
}

override suspend fun peek(): T? {
val deferred: Deferred<ByteString> =
suspendCancellableCoroutine { cont: CancellableContinuation<Deferred<ByteString>> ->
val deferred: Deferred<ByteBuffer> =
suspendCancellableCoroutine { cont: CancellableContinuation<Deferred<ByteBuffer>> ->
syscalls.peekPromise(key.name(), completingContinuation(cont))
}

Expand All @@ -265,8 +265,8 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
}

override suspend fun isCompleted(): Boolean {
val deferred: Deferred<ByteString> =
suspendCancellableCoroutine { cont: CancellableContinuation<Deferred<ByteString>> ->
val deferred: Deferred<ByteBuffer> =
suspendCancellableCoroutine { cont: CancellableContinuation<Deferred<ByteBuffer>> ->
syscalls.peekPromise(key.name(), completingContinuation(cont))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.kotlin

import com.google.protobuf.ByteString
import dev.restate.sdk.common.TerminalException
import dev.restate.sdk.common.syscalls.HandlerSpecification
import dev.restate.sdk.common.syscalls.SyscallCallback
import dev.restate.sdk.common.syscalls.Syscalls
import io.opentelemetry.extension.kotlin.asContextElement
import java.nio.ByteBuffer
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
Expand Down Expand Up @@ -45,7 +45,7 @@ internal constructor(
handlerSpecification: HandlerSpecification<REQ, RES>,
syscalls: Syscalls,
options: Options?,
callback: SyscallCallback<ByteString>
callback: SyscallCallback<ByteBuffer>
) {
val ctx: Context = ContextImpl(syscalls)

Expand All @@ -57,7 +57,7 @@ internal constructor(
.asContextElement(syscalls) +
syscalls.request().otelContext()!!.asContextElement())
scope.launch {
val serializedResult: ByteString
val serializedResult: ByteBuffer

try {
// Parse input
Expand All @@ -77,7 +77,7 @@ internal constructor(

// Serialize output
try {
serializedResult = handlerSpecification.responseSerde.serializeToByteString(res)
serializedResult = handlerSpecification.responseSerde.serializeToByteBuffer(res)
} catch (e: Error) {
throw e
} catch (e: Throwable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.kotlin

import com.google.protobuf.ByteString
import dev.restate.sdk.common.DurablePromiseKey
import dev.restate.sdk.common.Serde
import dev.restate.sdk.common.StateKey
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import kotlin.reflect.typeOf
import kotlinx.serialization.KSerializer
Expand Down Expand Up @@ -51,15 +51,15 @@ object KtSerdes {
return ByteArray(0)
}

override fun serializeToByteString(value: Unit?): ByteString {
return ByteString.EMPTY
override fun serializeToByteBuffer(value: Unit?): ByteBuffer {
return ByteBuffer.allocate(0)
}

override fun deserialize(value: ByteArray) {
return
}

override fun deserialize(byteString: ByteString) {
override fun deserialize(byteBuffer: ByteBuffer) {
return
}

Expand All @@ -71,12 +71,12 @@ object KtSerdes {
/** Creates a [Serde] implementation using the `kotlinx.serialization` json module. */
fun <T> json(serializer: KSerializer<T>): Serde<T> {
return object : Serde<T> {
override fun serialize(value: T?): ByteArray {
override fun serialize(value: T): ByteArray {
return Json.encodeToString(serializer, value!!).encodeToByteArray()
}

override fun deserialize(value: ByteArray?): T {
return Json.decodeFromString(serializer, String(value!!, StandardCharsets.UTF_8))
override fun deserialize(value: ByteArray): T {
return Json.decodeFromString(serializer, String(value, StandardCharsets.UTF_8))
}

override fun contentType(): String {
Expand Down
10 changes: 5 additions & 5 deletions sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/Util.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.kotlin

import com.google.protobuf.ByteString
import dev.restate.sdk.common.Serde
import dev.restate.sdk.common.syscalls.SyscallCallback
import dev.restate.sdk.common.syscalls.Syscalls
import java.nio.ByteBuffer
import kotlin.coroutines.resume
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CancellationException
Expand All @@ -32,9 +32,9 @@ internal fun completingUnitContinuation(
internal fun <T : Any?> Serde<T>.serializeWrappingException(
syscalls: Syscalls,
value: T?
): ByteString? {
): ByteBuffer {
return try {
this.serializeToByteString(value)
this.serializeToByteBuffer(value)
} catch (e: Exception) {
syscalls.fail(e)
throw CancellationException("Failed serialization", e)
Expand All @@ -43,10 +43,10 @@ internal fun <T : Any?> Serde<T>.serializeWrappingException(

internal fun <T : Any?> Serde<T>.deserializeWrappingException(
syscalls: Syscalls,
byteString: ByteString
ByteBuffer: ByteBuffer
): T {
return try {
this.deserialize(byteString)
this.deserialize(ByteBuffer)
} catch (e: Exception) {
syscalls.fail(e)
throw CancellationException("Failed deserialization", e)
Expand Down
6 changes: 3 additions & 3 deletions sdk-api/src/main/java/dev/restate/sdk/Awakeable.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk;

import com.google.protobuf.ByteString;
import dev.restate.sdk.common.Serde;
import dev.restate.sdk.common.syscalls.Deferred;
import dev.restate.sdk.common.syscalls.Result;
import dev.restate.sdk.common.syscalls.Syscalls;
import java.nio.ByteBuffer;

/**
* An {@link Awakeable} is a special type of {@link Awaitable} which can be arbitrarily completed by
Expand All @@ -28,11 +28,11 @@
* <p>NOTE: This interface MUST NOT be accessed concurrently since it can lead to different
* orderings of user actions, corrupting the execution of the invocation.
*/
public final class Awakeable<T> extends Awaitable.MappedAwaitable<ByteString, T> {
public final class Awakeable<T> extends Awaitable.MappedAwaitable<ByteBuffer, T> {

private final String identifier;

Awakeable(Syscalls syscalls, Deferred<ByteString> deferred, Serde<T> serde, String identifier) {
Awakeable(Syscalls syscalls, Deferred<ByteBuffer> deferred, Serde<T> serde, String identifier) {
super(
Awaitable.single(syscalls, deferred),
res -> {
Expand Down
Loading
Loading