Skip to content

Commit

Permalink
Merge pull request #2197 from mkouba/aws-lambda-cleanup
Browse files Browse the repository at this point in the history
Amazon Lambda - minor cleanup
  • Loading branch information
gsmet authored Apr 25, 2019
2 parents 39fe286 + 1a72573 commit e3c431b
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ List<AmazonLambdaBuildItem> process(List<AmazonLambdaClassNameBuildItem> items,
List<AmazonLambdaBuildItem> ret = new ArrayList<>();
for (AmazonLambdaClassNameBuildItem i : items) {
ret.add(new AmazonLambdaBuildItem(i.getClassName(),
template.discoverParameterTypes((Class<? extends RequestHandler>) context.classProxy(i.getClassName()))));
template.discoverParameterTypes(
(Class<? extends RequestHandler<?, ?>>) context.classProxy(i.getClassName()))));
}
return ret;
}
Expand Down Expand Up @@ -119,7 +120,8 @@ public void servlets(List<AmazonLambdaBuildItem> lambdas,
}
AmazonLambdaBuildItem lambda = lambdas.get(0);

template.start((Class<? extends RequestHandler>) context.classProxy(lambda.getHandlerClass()), shutdownContextBuildItem,
template.start((Class<? extends RequestHandler<?, ?>>) context.classProxy(lambda.getHandlerClass()),
shutdownContextBuildItem,
lambda.getTargetType(), beanContainerBuildItem.getValue());

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package io.quarkus.amazon.lambda.runtime;

import java.net.MalformedURLException;
import java.net.URL;

/**
* Various constants and util methods used for communication with the AWS API.
*/
public class AmazonLambdaApi {

// Response Headers
public static final String LAMBDA_RUNTIME_AWS_REQUEST_ID = "Lambda-Runtime-Aws-Request-Id";
public static final String LAMBDA_RUNTIME_INVOKED_FUNCTION_ARN = "Lambda-Runtime-Invoked-Function-Arn";
public static final String LAMBDA_RUNTIME_COGNITO_IDENTITY = "Lambda-Runtime-Cognito-Identity";
public static final String LAMBDA_RUNTIME_CLIENT_CONTEXT = "Lambda-Runtime-Client-Context";
public static final String LAMBDA_RUNTIME_DEADLINE_MS = "Lambda-Runtime-Deadline-Ms";

// Test API
public static final String QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API = "quarkus-internal.aws-lambda.test-api";

// API paths
public static final String API_PROTOCOL = "http://";
public static final String API_PATH_RUNTIME = "/2018-06-01/runtime/";
public static final String API_PATH_INVOCATION = API_PATH_RUNTIME + "invocation/";
public static final String API_PATH_INVOCATION_NEXT = API_PATH_INVOCATION + "next";
public static final String API_PATH_INIT_ERROR = API_PATH_RUNTIME + "init/error";
public static final String API_PATH_ERROR = "/error";
public static final String API_PATH_RESPONSE = "/response";

static URL invocationNext() throws MalformedURLException {
return new URL(API_PROTOCOL + runtimeApi() + API_PATH_INVOCATION_NEXT);
}

static URL invocationError(String requestId) throws MalformedURLException {
return new URL(API_PROTOCOL + runtimeApi() + API_PATH_INVOCATION + requestId + API_PATH_ERROR);
}

static URL invocationResponse(String requestId) throws MalformedURLException {
return new URL(API_PROTOCOL + runtimeApi() + API_PATH_INVOCATION + requestId + API_PATH_RESPONSE);
}

static URL initError() throws MalformedURLException {
return new URL(API_PROTOCOL + runtimeApi() + API_PATH_INIT_ERROR);
}

static String logGroupName() {
return System.getenv("AWS_LAMBDA_LOG_GROUP_NAME");
}

static String functionMemorySize() {
return System.getenv("AWS_LAMBDA_FUNCTION_MEMORY_SIZE");
}

static String logStreamName() {
return System.getenv("AWS_LAMBDA_LOG_STREAM_NAME");
}

static String functionName() {
return System.getenv("AWS_LAMBDA_FUNCTION_NAME");
}

static String functionVersion() {
return System.getenv("AWS_LAMBDA_FUNCTION_VERSION");
}

private static String runtimeApi() {
String testApi = System.getProperty(QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API);
if (testApi != null) {
return testApi;
}
return System.getenv("AWS_LAMBDA_RUNTIME_API");
}

}
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
package io.quarkus.amazon.lambda.runtime;

import static io.quarkus.amazon.lambda.runtime.AmazonLambdaApi.LAMBDA_RUNTIME_AWS_REQUEST_ID;
import static io.quarkus.amazon.lambda.runtime.AmazonLambdaApi.LAMBDA_RUNTIME_CLIENT_CONTEXT;
import static io.quarkus.amazon.lambda.runtime.AmazonLambdaApi.LAMBDA_RUNTIME_COGNITO_IDENTITY;
import static io.quarkus.amazon.lambda.runtime.AmazonLambdaApi.LAMBDA_RUNTIME_DEADLINE_MS;
import static io.quarkus.amazon.lambda.runtime.AmazonLambdaApi.LAMBDA_RUNTIME_INVOKED_FUNCTION_ARN;
import static io.quarkus.amazon.lambda.runtime.AmazonLambdaApi.functionMemorySize;
import static io.quarkus.amazon.lambda.runtime.AmazonLambdaApi.functionName;
import static io.quarkus.amazon.lambda.runtime.AmazonLambdaApi.functionVersion;
import static io.quarkus.amazon.lambda.runtime.AmazonLambdaApi.logGroupName;
import static io.quarkus.amazon.lambda.runtime.AmazonLambdaApi.logStreamName;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.Date;
import java.util.StringJoiner;

import com.amazonaws.services.lambda.runtime.ClientContext;
import com.amazonaws.services.lambda.runtime.CognitoIdentity;
Expand All @@ -28,27 +37,27 @@ public class AmazonLambdaContext implements Context {

public AmazonLambdaContext(HttpURLConnection request, ObjectReader cognitoReader, ObjectReader clientCtxReader)
throws IOException {
awsRequestId = request.getHeaderField("Lambda-Runtime-Aws-Request-Id");
logGroupName = System.getenv("AWS_LAMBDA_LOG_GROUP_NAME");
logStreamName = System.getenv("AWS_LAMBDA_LOG_STREAM_NAME");
functionName = System.getenv("AWS_LAMBDA_FUNCTION_NAME");
functionVersion = System.getenv("AWS_LAMBDA_FUNCTION_VERSION");
invokedFunctionArn = request.getHeaderField("Lambda-Runtime-Invoked-Function-Arn");

String cognitoIdentityHeader = request.getHeaderField("Lambda-Runtime-Cognito-Identity");
awsRequestId = request.getHeaderField(LAMBDA_RUNTIME_AWS_REQUEST_ID);
logGroupName = logGroupName();
logStreamName = logStreamName();
functionName = functionName();
functionVersion = functionVersion();
invokedFunctionArn = request.getHeaderField(LAMBDA_RUNTIME_INVOKED_FUNCTION_ARN);

String cognitoIdentityHeader = request.getHeaderField(LAMBDA_RUNTIME_COGNITO_IDENTITY);
if (cognitoIdentityHeader != null) {
cognitoIdentity = cognitoReader.readValue(cognitoIdentityHeader);
}

String clientContextHeader = request.getHeaderField("Lambda-Runtime-Client-Context");
String clientContextHeader = request.getHeaderField(LAMBDA_RUNTIME_CLIENT_CONTEXT);
if (clientContextHeader != null) {
clientContext = clientCtxReader.readValue(clientContextHeader);
}

String functionMemorySize = System.getenv("AWS_LAMBDA_FUNCTION_MEMORY_SIZE");
String functionMemorySize = functionMemorySize();
memoryLimitInMB = functionMemorySize != null ? Integer.valueOf(functionMemorySize) : 0;

String runtimeDeadline = request.getHeaderField("Lambda-Runtime-Deadline-Ms");
String runtimeDeadline = request.getHeaderField(LAMBDA_RUNTIME_DEADLINE_MS);
if (runtimeDeadline != null) {
runtimeDeadlineMs = Long.valueOf(runtimeDeadline);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ public class AmazonLambdaTemplate {

private static final Logger log = Logger.getLogger(AmazonLambdaTemplate.class);

protected static final String QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API = "quarkus-internal.aws-lambda.test-api";

public void start(Class<? extends RequestHandler> handlerClass,
@SuppressWarnings("rawtypes")
public void start(Class<? extends RequestHandler<?, ?>> handlerClass,
ShutdownContext context,
RuntimeValue<Class<?>> handlerType,
BeanContainer beanContainer) {

RequestHandler handler = beanContainer.instance(handlerClass);

final ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
Expand All @@ -47,17 +47,17 @@ public void run() {
});

Thread t = new Thread(new Runnable() {
@SuppressWarnings("unchecked")
@Override
public void run() {

try {
URL requestUrl = new URL(
"http://" + runtimeApi() + "/2018-06-01/runtime/invocation/next");
URL requestUrl = AmazonLambdaApi.invocationNext();
while (running.get()) {

HttpURLConnection requestConnection = (HttpURLConnection) requestUrl.openConnection();
try {
String requestId = requestConnection.getHeaderField("Lambda-Runtime-Aws-Request-Id");
String requestId = requestConnection.getHeaderField(AmazonLambdaApi.LAMBDA_RUNTIME_AWS_REQUEST_ID);
Object response;
try {
Object val = objectReader.readValue(requestConnection.getInputStream());
Expand All @@ -66,31 +66,24 @@ public void run() {
} catch (Exception e) {
log.error("Failed to run lambda", e);
FunctionError fe = new FunctionError(e.getClass().getName(), e.getMessage());
URL responseUrl = new URL(
"http://" + runtimeApi() + "/2018-06-01/runtime/invocation/"
+ requestId + "/error");

URL responseUrl = AmazonLambdaApi.invocationError(requestId);
HttpURLConnection responseConnection = (HttpURLConnection) responseUrl.openConnection();
responseConnection.setDoOutput(true);
responseConnection.setRequestMethod("POST");
mapper.writeValue(responseConnection.getOutputStream(), fe);
while (responseConnection.getInputStream().read() != -1) {

// Read data
}

continue;
}

URL responseUrl = new URL(
"http://" + runtimeApi() + "/2018-06-01/runtime/invocation/"
+ requestId + "/response");

URL responseUrl = AmazonLambdaApi.invocationResponse(requestId);
HttpURLConnection responseConnection = (HttpURLConnection) responseUrl.openConnection();
responseConnection.setDoOutput(true);
responseConnection.setRequestMethod("POST");
mapper.writeValue(responseConnection.getOutputStream(), response);
while (responseConnection.getInputStream().read() != -1) {

// Read data
}
} catch (Exception e) {
log.error("Error running lambda", e);
Expand All @@ -108,8 +101,7 @@ public void run() {
} catch (Exception e) {
try {
log.error("Lambda init error", e);
URL errorUrl = new URL(
"http://" + runtimeApi() + "/2018-06-01/runtime/init/error");
URL errorUrl = AmazonLambdaApi.initError();
HttpURLConnection responseConnection = (HttpURLConnection) errorUrl.openConnection();
responseConnection.setDoOutput(true);
responseConnection.setRequestMethod("POST");
Expand All @@ -134,15 +126,7 @@ public void run() {

}

private String runtimeApi() {
String testApi = System.getProperty(QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API);
if (testApi != null) {
return testApi;
}
return System.getenv("AWS_LAMBDA_RUNTIME_API");
}

public RuntimeValue<Class<?>> discoverParameterTypes(Class<? extends RequestHandler> handlerClass) {
public RuntimeValue<Class<?>> discoverParameterTypes(Class<? extends RequestHandler<?, ?>> handlerClass) {
final Method[] methods = handlerClass.getMethods();
Method method = null;
for (int i = 0; i < methods.length && method == null; i++) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.quarkus.amazon.lambda.test;

@SuppressWarnings("serial")
public class LambdaException extends RuntimeException {

final String type;

public LambdaException(String type, String message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;

import io.quarkus.amazon.lambda.runtime.AmazonLambdaApi;
import io.quarkus.amazon.lambda.runtime.FunctionError;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.undertow.Undertow;
Expand All @@ -20,8 +21,6 @@

public class LambdaResourceManager implements QuarkusTestResourceLifecycleManager {

protected static final String QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API = "quarkus-internal.aws-lambda.test-api";

private volatile Undertow undertow;

public static final int PORT = Integer.getInteger("quarkus-internal.aws-lambda.test-port", 5387);
Expand All @@ -30,7 +29,7 @@ public class LambdaResourceManager implements QuarkusTestResourceLifecycleManage
public Map<String, String> start() {

RoutingHandler routingHandler = new RoutingHandler(true);
routingHandler.add("GET", "/2018-06-01/runtime/invocation/next", new HttpHandler() {
routingHandler.add("GET", AmazonLambdaApi.API_PATH_INVOCATION_NEXT, new HttpHandler() {
@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
LambdaStartedNotifier.started = true;
Expand All @@ -41,44 +40,46 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {
return;
}
}
exchange.getResponseHeaders().put(new HttpString("Lambda-Runtime-Aws-Request-Id"), req.id);
exchange.getResponseHeaders().put(new HttpString(AmazonLambdaApi.LAMBDA_RUNTIME_AWS_REQUEST_ID), req.id);
exchange.getResponseSender().send(req.json);
}
});
routingHandler.add("POST", "/2018-06-01/runtime/invocation/{req}/response", new HttpHandler() {
@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
String id = exchange.getQueryParameters().get("req").getFirst();
exchange.getRequestReceiver().receiveFullString(new Receiver.FullStringCallback() {
routingHandler.add("POST", AmazonLambdaApi.API_PATH_INVOCATION + "{req}" + AmazonLambdaApi.API_PATH_RESPONSE,
new HttpHandler() {
@Override
public void handle(HttpServerExchange exchange, String message) {
LambdaClient.REQUESTS.get(id).complete(message);
public void handleRequest(HttpServerExchange exchange) throws Exception {
String id = exchange.getQueryParameters().get("req").getFirst();
exchange.getRequestReceiver().receiveFullString(new Receiver.FullStringCallback() {
@Override
public void handle(HttpServerExchange exchange, String message) {
LambdaClient.REQUESTS.get(id).complete(message);
}
});
}
});
}
});

routingHandler.add("POST", "/2018-06-01/runtime/invocation/{req}/error", new HttpHandler() {
@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
String id = exchange.getQueryParameters().get("req").getFirst();
exchange.getRequestReceiver().receiveFullString(new Receiver.FullStringCallback() {
routingHandler.add("POST", AmazonLambdaApi.API_PATH_INVOCATION + "{req}" + AmazonLambdaApi.API_PATH_ERROR,
new HttpHandler() {
@Override
public void handle(HttpServerExchange exchange, String message) {
ObjectMapper mapper = new ObjectMapper();
try {
FunctionError result = mapper.readerFor(FunctionError.class).readValue(message);
public void handleRequest(HttpServerExchange exchange) throws Exception {
String id = exchange.getQueryParameters().get("req").getFirst();
exchange.getRequestReceiver().receiveFullString(new Receiver.FullStringCallback() {
@Override
public void handle(HttpServerExchange exchange, String message) {
ObjectMapper mapper = new ObjectMapper();
try {
FunctionError result = mapper.readerFor(FunctionError.class).readValue(message);

LambdaClient.REQUESTS.get(id).completeExceptionally(
new LambdaException(result.getErrorType(), result.getErrorMessage()));
} catch (IOException e) {
throw new RuntimeException(e);
}
LambdaClient.REQUESTS.get(id).completeExceptionally(
new LambdaException(result.getErrorType(), result.getErrorMessage()));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
}
});
}
});
routingHandler.add("POST", "/2018-06-01/runtime/init/error", new HttpHandler() {
routingHandler.add("POST", AmazonLambdaApi.API_PATH_INIT_ERROR, new HttpHandler() {
@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
exchange.getRequestReceiver().receiveFullString(new Receiver.FullStringCallback() {
Expand All @@ -103,7 +104,7 @@ public void handle(HttpServerExchange exchange, String message) {
.setHandler(new BlockingHandler(routingHandler))
.build();
undertow.start();
return Collections.singletonMap(QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API, "localhost:" + PORT);
return Collections.singletonMap(AmazonLambdaApi.QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API, "localhost:" + PORT);
}

@Override
Expand Down

0 comments on commit e3c431b

Please sign in to comment.