Skip to content

Commit

Permalink
Modularize and refactor code
Browse files Browse the repository at this point in the history
  • Loading branch information
Ananto30 committed Apr 2, 2021
1 parent 31b5fdc commit 8da4506
Show file tree
Hide file tree
Showing 11 changed files with 213 additions and 75 deletions.
7 changes: 5 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ FROM openjdk:11-jre-slim

COPY target/starter-1.0.0-SNAPSHOT-fat.jar /app.jar

ENV CLUSTER_PUBLIC_PORT 17001
ENV CLUSTER_PUBLIC_PORT 15701
ENV CLUSTER_PUBLIC_HOST 192.168.0.100

CMD java -jar /app.jar -cluster -cluster-public-port $CLUSTER_PUBLIC_PORT -cluster-public-host $CLUSTER_PUBLIC_HOST -instance 4
#CMD java -jar /app.jar -cluster -cluster-public-port $CLUSTER_PUBLIC_PORT -cluster-public-host $CLUSTER_PUBLIC_HOST -instance 4
#CMD java -jar /app.jar -cluster -cluster-port $CLUSTER_PUBLIC_PORT -cluster-host $CLUSTER_PUBLIC_HOST -instance 4
#CMD java -jar /app.jar -cluster -cluster-host $CLUSTER_PUBLIC_HOST -instance 4
CMD java -jar /app.jar -cluster -instance 4
4 changes: 3 additions & 1 deletion java_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@

./mvnw clean package -Dmaven.test.skip=true

java -jar target/starter-1.0.0-SNAPSHOT-fat.jar -cluster -cluster-host 192.168.0.100 -instance 4
#java -jar target/starter-1.0.0-SNAPSHOT-fat.jar -cluster -cluster-host 192.168.0.100 -instance 4

java -jar target/starter-1.0.0-SNAPSHOT-fat.jar -cluster -cluster-public-host 192.168.0.100 -cluster-public-port 15701 -instance 4
8 changes: 5 additions & 3 deletions make_two_docker_instances.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ docker rm -f vertx-async-gateway-2
./mvnw clean package -Dmaven.test.skip=true

docker build -t ananto30/vertx-example:vertx-async-gateway .
docker push ananto30/vertx-example:vertx-async-gateway
#docker push ananto30/vertx-example:vertx-async-gateway

docker run --name vertx-async-gateway-1 -p 8888:8888 -e CLUSTER_PUBLIC_HOST=192.168.0.119 -d ananto30/vertx-example:vertx-async-gateway
docker run --name vertx-async-gateway-2 -p 8889:8888 -e CLUSTER_PUBLIC_HOST=192.168.0.100 -d vertx-async-gateway
docker run --name vertx-async-gateway-1 -p 8888:8888 -p 5701:5701 -e CLUSTER_PUBLIC_HOST=192.168.0.100 -d ananto30/vertx-example:vertx-async-gateway
docker run --name vertx-async-gateway-2 -p 8889:8888 -e CLUSTER_PUBLIC_HOST=192.168.0.100 -d ananto30/vertx-example:vertx-async-gateway

# https://github.com/docker/for-mac/issues/67#issuecomment-241997148
18 changes: 18 additions & 0 deletions src/main/java/com/ananto/asyncgateway/Constants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.ananto.asyncgateway;

/**
* @author Azizul Haque Ananto
* @since 2/4/21
*/


public enum Constants {
ASYNC_RESULT_EVENT("ASYNC_RESULT_EVENT"),
ASYNC_CALLBACK_IDENTIFIER("id");

public final String val;

Constants(String val) {
this.val = val;
}
}
89 changes: 20 additions & 69 deletions src/main/java/com/ananto/asyncgateway/MainVerticle.java
Original file line number Diff line number Diff line change
@@ -1,90 +1,41 @@
package com.ananto.asyncgateway;

import com.ananto.asyncgateway.verticles.AsyncVerticle;
import com.ananto.asyncgateway.verticles.WebVerticle;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.handler.LoggerHandler;
import io.vertx.ext.web.handler.TimeoutHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ConcurrentHashMap;

public class MainVerticle extends AbstractVerticle {

private static final int PORT = 8888;
private static final String ASYNC_RESULT_EVENT = "async.result";
private static final long DEFAULT_REQUEST_TIMEOUT = 10000L;

private static final Logger logger = LoggerFactory.getLogger(MainVerticle.class);


@Override
public void start(Promise<Void> startPromise) throws Exception {

// keep track of the requests (actually response), to send them later when arrives in callback
ConcurrentHashMap<String, HttpServerResponse> requests = new ConcurrentHashMap<>();

HttpServer server = vertx.createHttpServer();

Router router = Router.router(vertx);
router.route().handler(LoggerHandler.create());
router.route().handler(BodyHandler.create());
router.route().handler(TimeoutHandler.create(DEFAULT_REQUEST_TIMEOUT)); // change default timeout to 10sec
router.route().failureHandler(ctx -> {
logger.error(ctx.failure().getMessage(), ctx.failure());
ctx.next();
});

router.get("/health").handler(ctx -> {
HttpServerResponse response = ctx.response();
response.end("OK");
});

router.get("/async/:id").blockingHandler(ctx -> asyncHandler(ctx, requests));
router.post("/callback/:id").handler(this::callbackHandler);

server.requestHandler(router).listen(PORT, http -> {
if (http.succeeded()) {
startPromise.complete();
logger.info("HTTP server started on port " + PORT);
requestProcessor(requests);
} else {
startPromise.fail(http.cause());
}
CompositeFuture.all(
deploy(WebVerticle.class.getName()),
deploy(AsyncVerticle.class.getName())
).onComplete(result -> {
if (result.succeeded()) startPromise.complete();
else startPromise.fail(result.cause());
});
}

private void requestProcessor(ConcurrentHashMap<String, HttpServerResponse> requests) {
vertx.eventBus().consumer(ASYNC_RESULT_EVENT).handler(data -> {
var body = (JsonObject) data.body();
var id = body.getString("id");
logger.info(String.format("Event received | %s : %s%n", id, body));
var entry = requests.get(id);
if (entry != null) {
if (entry.ended()) requests.remove(id);// timout request
else entry.end(body.toString());
private Future<Void> deploy(String name) {
final Promise<Void> promise = Promise.promise();
vertx.deployVerticle(name, res -> {
if (res.failed()) {
logger.error("Failed to deploy verticle " + name);
promise.fail(res.cause());
} else {
logger.info("Deployed verticle " + name);
promise.complete();
}
});
}

private void callbackHandler(io.vertx.ext.web.RoutingContext ctx) {
var body = ctx.getBodyAsJson();
var id = ctx.pathParam("id");
body.put("id", id);
vertx.eventBus().publish(ASYNC_RESULT_EVENT, body);
ctx.response().end("OK");
}

private void asyncHandler(io.vertx.ext.web.RoutingContext ctx, ConcurrentHashMap<String, HttpServerResponse> requests) {
HttpServerResponse response = ctx.response();
response.putHeader("content-type", "application/json");
var id = ctx.pathParam("id");
requests.put(id, response);
return promise.future();
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.ananto.asyncgateway.handlers;

import com.ananto.asyncgateway.Constants;
import io.vertx.core.Handler;
import io.vertx.ext.web.RoutingContext;

/**
* @author Azizul Haque Ananto
* @since 2/4/21
*/
public class CallbackHandler implements Handler<RoutingContext> {
@Override
public void handle(RoutingContext routingContext) {
var body = routingContext.getBodyAsJson();
routingContext.vertx().eventBus().publish(Constants.ASYNC_RESULT_EVENT.val, body);
routingContext.response().end("OK");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.ananto.asyncgateway.handlers;

import com.ananto.asyncgateway.store.Store;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.ext.web.RoutingContext;

/**
* @author Azizul Haque Ananto
* @since 2/4/21
*/
public abstract class CommonAsyncHandler implements Handler<RoutingContext> {

abstract String handleRequest(RoutingContext routingContext);

@Override
public void handle(RoutingContext routingContext) {
HttpServerResponse response = routingContext.response();
response.putHeader("content-type", "application/json"); // Please remove this if all responses are not json
var id = handleRequest(routingContext);
Store.requests.put(id, response);
}
}
16 changes: 16 additions & 0 deletions src/main/java/com/ananto/asyncgateway/handlers/ExampleHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.ananto.asyncgateway.handlers;

import com.ananto.asyncgateway.Constants;
import io.vertx.ext.web.RoutingContext;

/**
* @author Azizul Haque Ananto
* @since 2/4/21
*/
public class ExampleHandler extends CommonAsyncHandler {

@Override
String handleRequest(RoutingContext routingContext) {
return routingContext.pathParam(Constants.ASYNC_CALLBACK_IDENTIFIER.val);
}
}
15 changes: 15 additions & 0 deletions src/main/java/com/ananto/asyncgateway/store/Store.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.ananto.asyncgateway.store;

import io.vertx.core.http.HttpServerResponse;

import java.util.concurrent.ConcurrentHashMap;

/**
* @author Azizul Haque Ananto
* @since 2/4/21
*/
public class Store {

// keep track of the requests (actually response), to send them later when arrives in callback
public static ConcurrentHashMap<String, HttpServerResponse> requests = new ConcurrentHashMap<>();
}
30 changes: 30 additions & 0 deletions src/main/java/com/ananto/asyncgateway/verticles/AsyncVerticle.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.ananto.asyncgateway.verticles;

import com.ananto.asyncgateway.Constants;
import com.ananto.asyncgateway.store.Store;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author Azizul Haque Ananto
* @since 2/4/21
*/

public class AsyncVerticle extends AbstractVerticle {

private static final Logger logger = LoggerFactory.getLogger(AsyncVerticle.class);

@Override
public void start() throws Exception {
vertx.eventBus().consumer(Constants.ASYNC_RESULT_EVENT.val).handler(data -> {
var body = (JsonObject) data.body();
var id = body.getString(Constants.ASYNC_CALLBACK_IDENTIFIER.val);
logger.info(String.format("Event received | %s : %s%n", id, body));
var request = Store.requests.remove(id);
if (request != null) request.end(body.toBuffer());
});
}

}
60 changes: 60 additions & 0 deletions src/main/java/com/ananto/asyncgateway/verticles/WebVerticle.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.ananto.asyncgateway.verticles;

import com.ananto.asyncgateway.handlers.CallbackHandler;
import com.ananto.asyncgateway.handlers.ExampleHandler;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.handler.LoggerHandler;
import io.vertx.ext.web.handler.TimeoutHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author Azizul Haque Ananto
* @since 2/4/21
*/
public class WebVerticle extends AbstractVerticle {

private static final int PORT = 8888;
private static final long DEFAULT_REQUEST_TIMEOUT = 10000L;

private static final Logger logger = LoggerFactory.getLogger(WebVerticle.class);

@Override
public void start(Promise<Void> startPromise) throws Exception {

HttpServer server = vertx.createHttpServer();

Router router = Router.router(vertx);
router.route().handler(LoggerHandler.create());
router.route().handler(BodyHandler.create());
router.route().handler(TimeoutHandler.create(DEFAULT_REQUEST_TIMEOUT)); // change default timeout to 10sec
router.route().failureHandler(ctx -> {
logger.error(ctx.failure().getMessage(), ctx.failure());
ctx.next();
});

router.get("/health").handler(ctx -> {
HttpServerResponse response = ctx.response();
response.end("OK");
});

router.get("/async/:id").blockingHandler(new ExampleHandler());
router.post("/callback").handler(new CallbackHandler());

server.requestHandler(router).listen(PORT, http -> {
if (http.succeeded()) {
startPromise.complete();
logger.info("HTTP server started on port " + PORT);
} else {
startPromise.fail(http.cause());
}
});

}

}

0 comments on commit 8da4506

Please sign in to comment.