Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Serve] Define BackendConfig protobuf and adapt it in Java #17201

Merged
merged 30 commits into from
Aug 6, 2021
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
6b71de0
Define Java Backend.
Jun 1, 2021
18e89bd
Add test case.
Jun 2, 2021
32829f3
Fix RayServeReplicaTest
Jun 2, 2021
380b0cc
Fix code style.
Jun 8, 2021
55de40a
Add ray serve test in bazel.
Jun 30, 2021
cbd548c
revert maven repo.
Jun 30, 2021
f575b51
Fix BUILD.bazel's format.
Jun 30, 2021
1f18d2f
Fix RayServeReplicaTest.
Jul 1, 2021
c7e1919
Fix RayServeReplicaTest(add Ray.shutdown)
Jul 1, 2021
4354294
Delete SINGLE_PROCESS in RayServeReplicaTest.
Jul 1, 2021
a5ef9d3
Merge branch 'master' of github.com:alipay/ray into define-java-backend
Jul 12, 2021
4a4d5c8
Merge pull request #178 from alipay/define-java-backend
liuyang-my Jul 12, 2021
062cbd0
Add BackendConfig protobuf define.
Jul 12, 2021
6e513a4
Add BackendConfig protobuf.
Jul 13, 2021
f69adc2
Revert maven repo
Jul 13, 2021
abc778b
Merge origin/master and fix conflict.
Jul 20, 2021
c74adde
Fix serve.proto
Jul 20, 2021
45a0ef7
User msgpack serializer.
Aug 3, 2021
8fe6135
Move the getUserConfig method to BackendConfigUtil.
Aug 3, 2021
caf80c6
Add protobuf maven in bazel file.
Aug 4, 2021
9c8f117
Add protobuf dependency in pom for serve.
Aug 4, 2021
044bdd7
Add java/serve/generated file to gitigonre.
Aug 4, 2021
aa59cd5
Fix CI:all_test_deploy problem of serve.
Aug 4, 2021
5b002ed
Delete unused deps in serve.
Aug 4, 2021
f9f935b
Add serve/generated exclude in java pom.
Aug 4, 2021
a05a85b
Java checkstyle filter add serve/generated.
Aug 4, 2021
56a9089
Format serve.proto file.
Aug 4, 2021
f22e8be
Fix bazel file's format.
Aug 4, 2021
1cec6f6
Fix RayServeReplicaTest.java
Aug 4, 2021
127eadd
Merge branch 'master' of github.com:alipay/ray into java-backend-conf…
Aug 6, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions java/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ define_java_module(
"@maven//:com_google_guava_guava",
"@maven//:org_slf4j_slf4j_api",
"@maven//:org_testng_testng",
"@maven//:com_caucho_hessian",
],
visibility = ["//visibility:public"],
deps = [
Expand All @@ -152,6 +153,7 @@ define_java_module(
"@maven//:com_google_guava_guava",
"@maven//:org_apache_commons_commons_lang3",
"@maven//:org_slf4j_slf4j_api",
"@maven//:com_caucho_hessian",
],
)

Expand All @@ -178,6 +180,11 @@ java_proto_compile(
deps = ["@//src/ray/protobuf:gcs_proto"],
)

# Compiles the serve protobuf target (//src/ray/protobuf:serve_proto) for Java.
# The outputs of this target are unzipped into java/serve/src/main/java by the
# genrule further down in this file.
java_proto_compile(
name = "serve_java_proto",
deps = ["@//src/ray/protobuf:serve_proto"],
)

filegroup(
name = "all_java_proto",
srcs = [
Expand All @@ -201,6 +208,7 @@ genrule(
srcs = [
":all_java_proto",
":copy_pom_file",
":serve_java_proto",
],
outs = ["cp_java_generated.out"],
cmd = """
Expand All @@ -210,6 +218,10 @@ genrule(
for f in $(locations //java:all_java_proto); do
unzip "$$f" -x META-INF/MANIFEST.MF -d "$$WORK_DIR/java/runtime/src/main/java"
done
rm -rf "$$WORK_DIR/java/serve/src/main/java/io/ray/serve/generated"
for f in $(locations //java:serve_java_proto); do
unzip "$$f" -x META-INF/MANIFEST.MF -d "$$WORK_DIR/java/serve/src/main/java"
done
date > $@
""",
local = 1,
Expand Down
1 change: 1 addition & 0 deletions java/dependencies.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def gen_java_deps():
"com.lmax:disruptor:3.3.4",
"org.yaml:snakeyaml:1.26",
"net.java.dev.jna:jna:5.5.0",
"com.caucho:hessian:4.0.62",
maven.artifact(
group = "org.testng",
artifact = "testng",
Expand Down
5 changes: 5 additions & 0 deletions java/serve/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
<artifactId>ray-runtime</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>4.0.62</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
86 changes: 0 additions & 86 deletions java/serve/src/main/java/io/ray/serve/BackendConfig.java

This file was deleted.

7 changes: 4 additions & 3 deletions java/serve/src/main/java/io/ray/serve/RayServeReplica.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.ray.runtime.metric.MetricConfig;
import io.ray.runtime.metric.Metrics;
import io.ray.serve.api.Serve;
import io.ray.serve.generated.BackendConfig;
import io.ray.serve.poll.KeyListener;
import io.ray.serve.poll.KeyType;
import io.ray.serve.poll.LongPollClient;
Expand Down Expand Up @@ -190,7 +191,7 @@ private Method getRunnerMethod(Query query) {
public void drainPendingQueries() {
while (true) {
try {
Thread.sleep(config.getExperimentalGracefulShutdownWaitLoopS() * 1000);
Thread.sleep((long) (config.getExperimentalGracefulShutdownWaitLoopS() * 1000));
} catch (InterruptedException e) {
LOGGER.error(
"Replica {} was interrupted in sheep when draining pending queries", replicaTag);
Expand All @@ -212,8 +213,8 @@ public void drainPendingQueries() {
*
* @param userConfig new user's configuration
*/
private void reconfigure(Object userConfig) {
if (userConfig == null) {
private void reconfigure(com.google.protobuf.ByteString userConfig) {
if (userConfig == null || userConfig.size() == 0) {
return;
}
try {
Expand Down
38 changes: 33 additions & 5 deletions java/serve/src/main/java/io/ray/serve/RayServeWrappedReplica.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
import io.ray.api.BaseActorHandle;
import io.ray.api.Ray;
import io.ray.serve.api.Serve;
import io.ray.serve.generated.BackendConfig;
import io.ray.serve.serializer.Hessian2Seserializer;
import io.ray.serve.util.BackendConfigUtil;
import io.ray.serve.util.ReflectUtil;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -19,11 +23,17 @@ public RayServeWrappedReplica(
String backendTag,
String replicaTag,
String backendDef,
Object[] initArgs,
BackendConfig backendConfig,
byte[] initArgsbytes,
byte[] backendConfigBytes,
String controllerName)
throws ClassNotFoundException, NoSuchMethodException, InstantiationException,
IllegalAccessException, IllegalArgumentException, InvocationTargetException {
IllegalAccessException, IllegalArgumentException, InvocationTargetException, IOException {

// Parse BackendConfig.
BackendConfig backendConfig = BackendConfigUtil.parseFrom(backendConfigBytes);

// Parse init args.
Object[] initArgs = parseInitArgs(initArgsbytes, backendConfig);

// Instantiate the object defined by backendDef.
Class backendClass = Class.forName(backendDef);
Expand All @@ -43,6 +53,24 @@ public RayServeWrappedReplica(
backend = new RayServeReplica(callable, backendConfig, optional.get());
}

private Object[] parseInitArgs(byte[] initArgsbytes, BackendConfig backendConfig)
throws IOException {

if (initArgsbytes == null || initArgsbytes.length == 0) {
return new Object[0];
}

if (!backendConfig.getIsCrossLanguage()) {
// If the construction request is from Java API, deserialize initArgsbytes to Object[]
// directly.
Object result = Hessian2Seserializer.decode(initArgsbytes);
return (Object[]) result;
} else {
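// TODO: deserialize init args sent by a cross-language (non-Java) caller; until this is implemented, an empty argument array is returned.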
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

explain the TODO?

return new Object[0];
}
}

/**
* The entry method to process the request.
*
Expand All @@ -51,7 +79,7 @@ public RayServeWrappedReplica(
* backendDef.
* @return the result of request being processed
*/
public Object handle_request(RequestMetadata requestMetadata, Object[] requestArgs) {
public Object handleRequest(RequestMetadata requestMetadata, Object[] requestArgs) {
return backend.handleRequest(new Query(requestArgs, requestMetadata));
}

Expand All @@ -61,7 +89,7 @@ public void ready() {
}

/** Wait until there is no request in processing. It is used for stopping replica gracefully. */
public void drain_pending_queries() {
public void drainPendingQueries() {
backend.drainPendingQueries();
}
}
2 changes: 1 addition & 1 deletion java/serve/src/main/java/io/ray/serve/RequestMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class RequestMetadata implements Serializable {

private String endpoint;

private String callMethod = "__call__";
private String callMethod = "call";

private String httpMethod;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.ray.serve.serializer;

import com.caucho.hessian.io.Hessian2Input;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we use Hessian as the serialization format here? If it's expected to be used cross-language, we should use MessagePack. If not, is Hessian a standard serialization protocol for Java? If so, does Ray core use it?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hessian's compatibility is better when adding new properties to the object, but MsgPack performs better in cross-language scenarios. I think MsgPack is the right choice here.

import com.caucho.hessian.io.Hessian2Output;
import com.caucho.hessian.io.SerializerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
 * Static helpers that encode objects to byte arrays and decode them back using Hessian's {@link
 * Hessian2Output} and {@link Hessian2Input}.
 */
public class Hessian2Seserializer {

  /**
   * Factory used by the overloads that do not take an explicit {@link SerializerFactory}. Declared
   * {@code final} so the shared default can never be reassigned.
   */
  private static final SerializerFactory DEFAULT_FACTORY = new SerializerFactory();

  /**
   * Encodes {@code data} with the default serializer factory.
   *
   * @param data the object to encode
   * @return the encoded bytes
   * @throws IOException if writing the object fails
   */
  public static byte[] encode(Object data) throws IOException {
    return encode(data, DEFAULT_FACTORY);
  }

  /**
   * Encodes {@code data} with the given serializer factory.
   *
   * @param data the object to encode
   * @param serializerFactory the factory installed on the output before writing
   * @return the encoded bytes
   * @throws IOException if writing the object fails
   */
  public static byte[] encode(Object data, SerializerFactory serializerFactory) throws IOException {
    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
    Hessian2Output hessian2Output = new Hessian2Output(byteArrayOutputStream);
    hessian2Output.setSerializerFactory(serializerFactory);
    hessian2Output.writeObject(data);
    // Flush before reading the byte array so everything written so far reaches the stream.
    hessian2Output.flush();
    return byteArrayOutputStream.toByteArray();
  }

  /**
   * Decodes an object from {@code data} with the default serializer factory.
   *
   * @param data the encoded bytes
   * @return the decoded object
   * @throws IOException if reading the object fails
   */
  public static Object decode(byte[] data) throws IOException {
    return decode(data, DEFAULT_FACTORY);
  }

  /**
   * Decodes an object from {@code data} with the given serializer factory.
   *
   * @param data the encoded bytes
   * @param serializerFactory the factory installed on the input before reading
   * @return the decoded object
   * @throws IOException if reading the object fails
   */
  public static Object decode(byte[] data, SerializerFactory serializerFactory) throws IOException {
    ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data);
    Hessian2Input hessian2Input = new Hessian2Input(byteArrayInputStream);
    hessian2Input.setSerializerFactory(serializerFactory);
    return hessian2Input.readObject();
  }
}
69 changes: 69 additions & 0 deletions java/serve/src/main/java/io/ray/serve/util/BackendConfigUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package io.ray.serve.util;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
import io.ray.serve.RayServeException;
import io.ray.serve.generated.BackendConfig;
import io.ray.serve.generated.BackendLanguage;

/** Utility for building a {@link BackendConfig} from its serialized form. */
public class BackendConfigUtil {

  /**
   * Parses a serialized {@link BackendConfig} and copies it into a new instance, replacing fields
   * that are zero with defaults: 1 replica, 100 max concurrent queries, a graceful-shutdown wait
   * loop of 2 and a graceful-shutdown timeout of 20. The user config, cross-language flag and
   * backend language are copied as they are.
   *
   * @param backendConfigBytes the serialized config
   * @return the config with defaults applied, or {@code null} if parsing yields {@code null}
   * @throws InvalidProtocolBufferException if the bytes cannot be parsed
   * @throws IllegalArgumentException if max concurrent queries is negative
   * @throws RayServeException if the backend language is {@code UNRECOGNIZED}
   */
  public static BackendConfig parseFrom(byte[] backendConfigBytes)
      throws InvalidProtocolBufferException {

    BackendConfig parsed = BackendConfig.parseFrom(backendConfigBytes);
    if (parsed == null) {
      return null;
    }

    // Validate first. The argument check runs before the language check so that, when both
    // fail, the same exception is raised as before.
    Preconditions.checkArgument(
        parsed.getMaxConcurrentQueries() >= 0, "max_concurrent_queries must be >= 0");

    if (parsed.getBackendLanguage() == BackendLanguage.UNRECOGNIZED) {
      throw new RayServeException(
          LogUtil.format(
              "Unrecognized backend language {}. Backend language must be in {}.",
              parsed.getBackendLanguageValue(),
              Lists.newArrayList(BackendLanguage.values())));
    }

    // Copy every field, swapping in the default wherever the parsed value is zero.
    return BackendConfig.newBuilder()
        .setNumReplicas(parsed.getNumReplicas() == 0 ? 1 : parsed.getNumReplicas())
        .setMaxConcurrentQueries(
            parsed.getMaxConcurrentQueries() == 0 ? 100 : parsed.getMaxConcurrentQueries())
        .setUserConfig(parsed.getUserConfig())
        .setExperimentalGracefulShutdownWaitLoopS(
            parsed.getExperimentalGracefulShutdownWaitLoopS() == 0
                ? 2
                : parsed.getExperimentalGracefulShutdownWaitLoopS())
        .setExperimentalGracefulShutdownTimeoutS(
            parsed.getExperimentalGracefulShutdownTimeoutS() == 0
                ? 20
                : parsed.getExperimentalGracefulShutdownTimeoutS())
        .setIsCrossLanguage(parsed.getIsCrossLanguage())
        .setBackendLanguage(parsed.getBackendLanguage())
        .build();
  }
}
Loading