Skip to content

Commit

Permalink
[Serve] Define BackendConfig protobuf and adapt it in Java (ray-proje…
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyang-my authored Aug 6, 2021
1 parent ac9a1a2 commit 12bd904
Show file tree
Hide file tree
Showing 14 changed files with 223 additions and 102 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
/src/ray/object_manager/format/*_generated.h
/src/ray/raylet/format/*_generated.h
/java/runtime/src/main/java/io/ray/runtime/generated/*
/java/serve/src/main/java/io/ray/serve/generated/*

# Files genrated by c++ worker should be ignored.
/cpp/example/thirdparty/
Expand Down
21 changes: 20 additions & 1 deletion java/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,20 @@ define_java_module(

define_java_module(
name = "serve",
additional_srcs = [
":serve_java_proto",
],
define_test_lib = True,
exclude_srcs = [
"serve/src/main/java/io/ray/serve/generated/*.java",
],
test_deps = [
":io_ray_ray_api",
":io_ray_ray_runtime",
":io_ray_ray_serve",
"@maven//:org_apache_commons_commons_lang3",
"@maven//:com_google_guava_guava",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:org_apache_commons_commons_lang3",
"@maven//:org_slf4j_slf4j_api",
"@maven//:org_testng_testng",
],
Expand All @@ -150,6 +158,7 @@ define_java_module(
":io_ray_ray_api",
":io_ray_ray_runtime",
"@maven//:com_google_guava_guava",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:org_apache_commons_commons_lang3",
"@maven//:org_slf4j_slf4j_api",
],
Expand Down Expand Up @@ -178,6 +187,11 @@ java_proto_compile(
deps = ["@//src/ray/protobuf:gcs_proto"],
)

java_proto_compile(
name = "serve_java_proto",
deps = ["@//src/ray/protobuf:serve_proto"],
)

filegroup(
name = "all_java_proto",
srcs = [
Expand All @@ -201,6 +215,7 @@ genrule(
srcs = [
":all_java_proto",
":copy_pom_file",
":serve_java_proto",
],
outs = ["cp_java_generated.out"],
cmd = """
Expand All @@ -210,6 +225,10 @@ genrule(
for f in $(locations //java:all_java_proto); do
unzip "$$f" -x META-INF/MANIFEST.MF -d "$$WORK_DIR/java/runtime/src/main/java"
done
rm -rf "$$WORK_DIR/java/serve/src/main/java/io/ray/serve/generated"
for f in $(locations //java:serve_java_proto); do
unzip "$$f" -x META-INF/MANIFEST.MF -d "$$WORK_DIR/java/serve/src/main/java"
done
date > $@
""",
local = 1,
Expand Down
2 changes: 1 addition & 1 deletion java/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<!-- exclude the generated files from being parsed, which make checkstyle check more fast -->
<module name="BeforeExecutionExclusionFileFilter">
<property name="fileNamePattern"
value="io[\\/]ray[\\/].*runtime[\\/]generated[\\/]|RayCall.java|ActorCall.java"/>
value="io[\\/]ray[\\/].*runtime[\\/]generated[\\/]|RayCall.java|ActorCall.java|io[\\/]ray[\\/].*serve[\\/]generated[\\/]"/>
</module>

<module name="TreeWalker">
Expand Down
1 change: 1 addition & 0 deletions java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@
<exclude>**/ActorCall.java</exclude>
<exclude>**/PyActorCall.java</exclude>
<exclude>**/runtime/generated/**/*.*</exclude>
<exclude>**/serve/generated/**/*.*</exclude>
</excludes>
<googleJavaFormat>
<version>1.7</version>
Expand Down
5 changes: 5 additions & 0 deletions java/serve/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
<artifactId>guava</artifactId>
<version>30.0-jre</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
Expand Down
86 changes: 0 additions & 86 deletions java/serve/src/main/java/io/ray/serve/BackendConfig.java

This file was deleted.

6 changes: 4 additions & 2 deletions java/serve/src/main/java/io/ray/serve/RayServeReplica.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
import io.ray.runtime.metric.MetricConfig;
import io.ray.runtime.metric.Metrics;
import io.ray.serve.api.Serve;
import io.ray.serve.generated.BackendConfig;
import io.ray.serve.poll.KeyListener;
import io.ray.serve.poll.KeyType;
import io.ray.serve.poll.LongPollClient;
import io.ray.serve.poll.LongPollNamespace;
import io.ray.serve.util.BackendConfigUtil;
import io.ray.serve.util.LogUtil;
import io.ray.serve.util.ReflectUtil;
import java.lang.reflect.Method;
Expand Down Expand Up @@ -57,7 +59,7 @@ public RayServeReplica(
this.replicaTag = Serve.getReplicaContext().getReplicaTag();
this.callable = callable;
this.config = backendConfig;
this.reconfigure(backendConfig.getUserConfig());
this.reconfigure(BackendConfigUtil.getUserConfig(backendConfig));

Map<KeyType, KeyListener> keyListeners = new HashMap<>();
keyListeners.put(
Expand Down Expand Up @@ -190,7 +192,7 @@ private Method getRunnerMethod(Query query) {
public void drainPendingQueries() {
while (true) {
try {
Thread.sleep(config.getExperimentalGracefulShutdownWaitLoopS() * 1000);
Thread.sleep((long) (config.getExperimentalGracefulShutdownWaitLoopS() * 1000));
} catch (InterruptedException e) {
LOGGER.error(
"Replica {} was interrupted in sheep when draining pending queries", replicaTag);
Expand Down
37 changes: 32 additions & 5 deletions java/serve/src/main/java/io/ray/serve/RayServeWrappedReplica.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
import com.google.common.base.Preconditions;
import io.ray.api.BaseActorHandle;
import io.ray.api.Ray;
import io.ray.runtime.serializer.MessagePackSerializer;
import io.ray.serve.api.Serve;
import io.ray.serve.generated.BackendConfig;
import io.ray.serve.util.BackendConfigUtil;
import io.ray.serve.util.ReflectUtil;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -19,11 +23,17 @@ public RayServeWrappedReplica(
String backendTag,
String replicaTag,
String backendDef,
Object[] initArgs,
BackendConfig backendConfig,
byte[] initArgsbytes,
byte[] backendConfigBytes,
String controllerName)
throws ClassNotFoundException, NoSuchMethodException, InstantiationException,
IllegalAccessException, IllegalArgumentException, InvocationTargetException {
IllegalAccessException, IllegalArgumentException, InvocationTargetException, IOException {

// Parse BackendConfig.
BackendConfig backendConfig = BackendConfigUtil.parseFrom(backendConfigBytes);

// Parse init args.
Object[] initArgs = parseInitArgs(initArgsbytes, backendConfig);

// Instantiate the object defined by backendDef.
Class backendClass = Class.forName(backendDef);
Expand All @@ -43,6 +53,23 @@ public RayServeWrappedReplica(
backend = new RayServeReplica(callable, backendConfig, optional.get());
}

private Object[] parseInitArgs(byte[] initArgsbytes, BackendConfig backendConfig)
throws IOException {

if (initArgsbytes == null || initArgsbytes.length == 0) {
return new Object[0];
}

if (!backendConfig.getIsCrossLanguage()) {
// If the construction request is from Java API, deserialize initArgsbytes to Object[]
// directly.
return MessagePackSerializer.decode(initArgsbytes, Object[].class);
} else {
// For other language like Python API, not support Array type.
return new Object[] {MessagePackSerializer.decode(initArgsbytes, Object.class)};
}
}

/**
* The entry method to process the request.
*
Expand All @@ -51,7 +78,7 @@ public RayServeWrappedReplica(
* backendDef.
* @return the result of request being processed
*/
public Object handle_request(RequestMetadata requestMetadata, Object[] requestArgs) {
public Object handleRequest(RequestMetadata requestMetadata, Object[] requestArgs) {
return backend.handleRequest(new Query(requestArgs, requestMetadata));
}

Expand All @@ -61,7 +88,7 @@ public void ready() {
}

/** Wait until there is no request in processing. It is used for stopping replica gracefully. */
public void drain_pending_queries() {
public void drainPendingQueries() {
backend.drainPendingQueries();
}
}
2 changes: 1 addition & 1 deletion java/serve/src/main/java/io/ray/serve/RequestMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class RequestMetadata implements Serializable {

private String endpoint;

private String callMethod = "__call__";
private String callMethod = "call";

private String httpMethod;

Expand Down
76 changes: 76 additions & 0 deletions java/serve/src/main/java/io/ray/serve/util/BackendConfigUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.ray.serve.util;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
import io.ray.runtime.serializer.MessagePackSerializer;
import io.ray.serve.RayServeException;
import io.ray.serve.generated.BackendConfig;
import io.ray.serve.generated.BackendLanguage;

public class BackendConfigUtil {

public static BackendConfig parseFrom(byte[] backendConfigBytes)
throws InvalidProtocolBufferException {

// Parse BackendConfig from byte[].
BackendConfig inputBackendConfig = BackendConfig.parseFrom(backendConfigBytes);
if (inputBackendConfig == null) {
return null;
}

// Set default values.
BackendConfig.Builder builder = BackendConfig.newBuilder();

if (inputBackendConfig.getNumReplicas() == 0) {
builder.setNumReplicas(1);
} else {
builder.setNumReplicas(inputBackendConfig.getNumReplicas());
}

Preconditions.checkArgument(
inputBackendConfig.getMaxConcurrentQueries() >= 0, "max_concurrent_queries must be >= 0");
if (inputBackendConfig.getMaxConcurrentQueries() == 0) {
builder.setMaxConcurrentQueries(100);
} else {
builder.setMaxConcurrentQueries(inputBackendConfig.getMaxConcurrentQueries());
}

builder.setUserConfig(inputBackendConfig.getUserConfig());

if (inputBackendConfig.getExperimentalGracefulShutdownWaitLoopS() == 0) {
builder.setExperimentalGracefulShutdownWaitLoopS(2);
} else {
builder.setExperimentalGracefulShutdownWaitLoopS(
inputBackendConfig.getExperimentalGracefulShutdownWaitLoopS());
}

if (inputBackendConfig.getExperimentalGracefulShutdownTimeoutS() == 0) {
builder.setExperimentalGracefulShutdownTimeoutS(20);
} else {
builder.setExperimentalGracefulShutdownTimeoutS(
inputBackendConfig.getExperimentalGracefulShutdownTimeoutS());
}

builder.setIsCrossLanguage(inputBackendConfig.getIsCrossLanguage());

if (inputBackendConfig.getBackendLanguage() == BackendLanguage.UNRECOGNIZED) {
throw new RayServeException(
LogUtil.format(
"Unrecognized backend language {}. Backend language must be in {}.",
inputBackendConfig.getBackendLanguageValue(),
Lists.newArrayList(BackendLanguage.values())));
} else {
builder.setBackendLanguage(inputBackendConfig.getBackendLanguage());
}

return builder.build();
}

public static Object getUserConfig(BackendConfig backendConfig) {
if (backendConfig.getUserConfig() == null || backendConfig.getUserConfig().size() == 0) {
return null;
}
return MessagePackSerializer.decode(backendConfig.getUserConfig().toByteArray(), Object.class);
}
}
Loading

0 comments on commit 12bd904

Please sign in to comment.