Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#5] feat(server): Add basic Jetty server support for Graviton #31

Merged
merged 12 commits into from
May 30, 2023
2 changes: 2 additions & 0 deletions core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ dependencies {
}
implementation(libs.substrait.java.core) {
exclude("org.slf4j")
exclude("com.fasterxml.jackson.core")
exclude("com.fasterxml.jackson.datatype")
}
implementation(libs.jackson.databind)
implementation(libs.jackson.annotations)
Expand Down
13 changes: 12 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
junit = "5.8.1"
protoc = "3.17.3"
substrait = "0.9.0"
jackson = "2.13.4"
jackson = "2.14.1"
guava = "29.0-jre"
lombok = "1.18.20"
slf4j = "2.0.7"
log4j = "2.20.0"
jetty = "9.4.51.v20230217"
jersey = "2.39.1"

protobuf-plugin = "0.9.2"
spotless-plugin = '6.11.0'
Expand All @@ -31,9 +33,18 @@ log4j-slf4j2-impl = { group = "org.apache.logging.log4j", name = "log4j-slf4j2-i
log4j-api = { group = "org.apache.logging.log4j", name = "log4j-api", version.ref = "log4j" }
log4j-core = { group = "org.apache.logging.log4j", name = "log4j-core", version.ref = "log4j" }
log4j-12-api = { group = "org.apache.logging.log4j", name = "log4j-1.2-api", version.ref = "log4j" }
jetty-server = { group = "org.eclipse.jetty", name = "jetty-server", version.ref = "jetty" }
jetty-servlet = { group = "org.eclipse.jetty", name = "jetty-servlet", version.ref = "jetty" }
jersey-server = { group = "org.glassfish.jersey.core", name = "jersey-server", version.ref = "jersey" }
jersey-container-servlet-core = { group = "org.glassfish.jersey.containers", name = "jersey-container-servlet-core", version.ref = "jersey" }
jersey-container-jetty-http = { group = "org.glassfish.jersey.containers", name = "jersey-container-jetty-http", version.ref = "jersey" }
jersey-media-json-jackson = { group = "org.glassfish.jersey.media", name = "jersey-media-json-jackson", version.ref = "jersey" }
jersey-hk2 = { group = "org.glassfish.jersey.inject", name = "jersey-hk2", version.ref = "jersey" }

[bundles]
log4j = ["slf4j-api", "log4j-slf4j2-impl", "log4j-api", "log4j-core", "log4j-12-api"]
jetty = ["jetty-server", "jetty-servlet"]
jersey = ["jersey-server", "jersey-container-servlet-core", "jersey-container-jetty-http", "jersey-media-json-jackson", "jersey-hk2"]

[plugins]
protobuf = { id = "com.google.protobuf", version.ref = "protobuf-plugin" }
Expand Down
2 changes: 2 additions & 0 deletions schema/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ dependencies {
implementation(libs.protobuf.java)
implementation(libs.substrait.java.core) {
exclude("org.slf4j")
exclude("com.fasterxml.jackson.core")
exclude("com.fasterxml.jackson.datatype")
}
}

Expand Down
27 changes: 27 additions & 0 deletions server/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
plugins {
`maven-publish`
id("java")
id("idea")
id("com.diffplug.spotless")
}

dependencies {
implementation(project(":core"));
implementation(libs.jackson.databind)
implementation(libs.jackson.annotations)
implementation(libs.jackson.datatype.jdk8)
implementation(libs.jackson.datatype.jsr310)
implementation(libs.guava)
implementation(libs.bundles.log4j)
implementation(libs.bundles.jetty)
implementation(libs.bundles.jersey)

compileOnly(libs.lombok)
annotationProcessor(libs.lombok)
testCompileOnly(libs.lombok)
testAnnotationProcessor(libs.lombok)

testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
testRuntimeOnly(libs.junit.jupiter.engine)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.datastrato.graviton.server;

public class GravitonServerException extends RuntimeException {

public GravitonServerException(String exception) {
super(exception);
}

public GravitonServerException(String exception, Throwable cause) {
super(exception, cause);
}

public GravitonServerException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.datastrato.graviton.server;

import com.datastrato.graviton.Config;
import com.datastrato.graviton.config.ConfigBuilder;
import com.datastrato.graviton.config.ConfigEntry;

public class ServerConfig extends Config {

public static final ConfigEntry<String> WEBSERVER_HOST =
new ConfigBuilder("graviton.server.webserver.host")
.doc("The host name of the built-in web server")
.version("0.1.0")
.stringConf()
.createWithDefault("0.0.0.0");

public static final ConfigEntry<Integer> WEBSERVER_HTTP_PORT =
new ConfigBuilder("graviton.server.webserver.httpPort")
.doc("The http port number of the built-in web server")
.version("0.1.0")
.intConf()
.createWithDefault(8090);

public static final ConfigEntry<Integer> WEBSERVER_CORE_THREADS =
new ConfigBuilder("graviton.server.webserver.coreThreads")
.doc("The core thread size of the built-in web server")
.version("0.1.0")
.intConf()
.createWithDefault(Math.min(Runtime.getRuntime().availableProcessors() * 2, 100));

public static final ConfigEntry<Integer> WEBSERVER_MAX_THREADS =
new ConfigBuilder("graviton.server.webserver.maxThreads")
.doc("The max thread size of the built-in web server")
.version("0.1.0")
.intConf()
.createWithDefault(Math.max(Runtime.getRuntime().availableProcessors() * 4, 400));

public static final ConfigEntry<Long> WEBSERVER_STOP_IDLE_TIMEOUT =
new ConfigBuilder("graviton.server.webserver.stopIdleTimeout")
.doc("The stop idle timeout of the built-in web server")
.version("0.1.0")
.longConf()
.createWithDefault(30 * 1000L);

public static final ConfigEntry<Integer> WEBSERVER_REQUEST_HEADER_SIZE =
new ConfigBuilder("graviton.server.webserver.requestHeaderSize")
.doc("The request header size of the built-in web server")
.version("0.1.0")
.intConf()
.createWithDefault(128 * 1024);

public static final ConfigEntry<Integer> WEBSERVER_RESPONSE_HEADER_SIZE =
new ConfigBuilder("graviton.server.webserver.responseHeaderSize")
.doc("The response header size of the built-in web server")
.version("0.1.0")
.intConf()
.createWithDefault(128 * 1024);

public ServerConfig(boolean loadDefaults) {
super(loadDefaults);
}

public ServerConfig() {
this(true);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package com.datastrato.graviton.server.web;

import com.datastrato.graviton.Config;
import com.datastrato.graviton.server.GravitonServerException;
import com.datastrato.graviton.server.ServerConfig;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.net.BindException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.servlet.Servlet;
import org.eclipse.jetty.server.*;
import org.eclipse.jetty.server.handler.ErrorHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class JettyServer {

private static final Logger LOG = LoggerFactory.getLogger(JettyServer.class);

private Server server;

private String host;

private int httpPort;

private ServletContextHandler servletContextHandler;

public JettyServer() {}

public synchronized void initialize(Config config) {
int coreThreads = config.get(ServerConfig.WEBSERVER_CORE_THREADS);
int maxThreads = config.get(ServerConfig.WEBSERVER_MAX_THREADS);
ExecutorThreadPool threadPool = createThreadPool(coreThreads, maxThreads);

// Create and config Jetty Server
server = new Server(threadPool);
server.setStopAtShutdown(true);
server.setStopTimeout(config.get(ServerConfig.WEBSERVER_STOP_IDLE_TIMEOUT));

// Set error handler for Jetty Server
ErrorHandler errorHandler = new ErrorHandler();
errorHandler.setShowStacks(true);
errorHandler.setServer(server);
server.addBean(errorHandler);

// Create and set Http ServerConnector
int reqHeaderSize = config.get(ServerConfig.WEBSERVER_REQUEST_HEADER_SIZE);
int respHeaderSize = config.get(ServerConfig.WEBSERVER_RESPONSE_HEADER_SIZE);
host = config.get(ServerConfig.WEBSERVER_HOST);
httpPort = config.get(ServerConfig.WEBSERVER_HTTP_PORT);
ServerConnector httpConnector =
createHttpServerConnector(server, reqHeaderSize, respHeaderSize, host, httpPort);
server.addConnector(httpConnector);

// TODO. Create and set https connector @jerry

// Initialize ServletContextHandler
initializeServletContextHandler(server);
}

public synchronized void start() throws GravitonServerException {
try {
server.start();
} catch (BindException e) {
LOG.error(
"Failed to start web server on host {} port {}, which is already in use.",
host,
httpPort,
e);
throw new GravitonServerException("Failed to start web server.", e);

} catch (Exception e) {
LOG.error("Failed to start web server.", e);
throw new GravitonServerException("Failed to start web server.", e);
}

LOG.info("Graviton web server started on host {} port {}.", host, httpPort);
}

public synchronized void join() {
try {
server.join();
} catch (InterruptedException e) {
LOG.info("Interrupted while web server is joining.");
}
}

public synchronized void stop() {
if (server != null) {
try {
// Referring from Spark's implementation to avoid the issues.
ThreadPool threadPool = server.getThreadPool();
if (threadPool instanceof QueuedThreadPool) {
((QueuedThreadPool) threadPool).setStopTimeout(0);
}

server.stop();

if (threadPool instanceof LifeCycle) {
((LifeCycle) threadPool).stop();
}

LOG.info("Graviton web server stopped on host {} port {}.", host, httpPort);
} catch (Exception e) {
// Swallow the exception.
LOG.warn("Failed to stop web server.", e);
}

server = null;
}
}

public void addServlet(Servlet servlet, String pathSpec) {
servletContextHandler.addServlet(new ServletHolder(servlet), pathSpec);
}

private void initializeServletContextHandler(Server server) {
this.servletContextHandler = new ServletContextHandler();
servletContextHandler.setContextPath("/");
servletContextHandler.addServlet(DefaultServlet.class, "/");

HandlerCollection handlers = new HandlerCollection();
handlers.addHandler(servletContextHandler);

server.setHandler(handlers);
}

private ServerConnector createHttpServerConnector(
Server server, int reqHeaderSize, int respHeaderSize, String host, int port) {
HttpConfiguration httpConfig = new HttpConfiguration();
httpConfig.setRequestHeaderSize(reqHeaderSize);
httpConfig.setResponseHeaderSize(respHeaderSize);
httpConfig.setSendServerVersion(true);

HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(httpConfig);
ServerConnector connector =
creatorServerConnector(server, new ConnectionFactory[] {httpConnectionFactory});
connector.setHost(host);
connector.setPort(port);
connector.setReuseAddress(true);

return connector;
}

private ServerConnector creatorServerConnector(
Server server, ConnectionFactory[] connectionFactories) {
Scheduler serverExecutor =
new ScheduledExecutorScheduler("graviton-webserver-JettyScheduler", true);

return new ServerConnector(server, null, serverExecutor, null, -1, -1, connectionFactories);
}

private ExecutorThreadPool createThreadPool(int coreThreads, int maxThreads) {
return new ExecutorThreadPool(
new ThreadPoolExecutor(
coreThreads,
maxThreads,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("jetty-webserver-%d")
.build()));
}
}