Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Amazon Lambda - minor cleanup #2197

Merged
merged 1 commit into from
Apr 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ List<AmazonLambdaBuildItem> process(List<AmazonLambdaClassNameBuildItem> items,
List<AmazonLambdaBuildItem> ret = new ArrayList<>();
for (AmazonLambdaClassNameBuildItem i : items) {
ret.add(new AmazonLambdaBuildItem(i.getClassName(),
template.discoverParameterTypes((Class<? extends RequestHandler>) context.classProxy(i.getClassName()))));
template.discoverParameterTypes(
(Class<? extends RequestHandler<?, ?>>) context.classProxy(i.getClassName()))));
}
return ret;
}
Expand Down Expand Up @@ -119,7 +120,8 @@ public void servlets(List<AmazonLambdaBuildItem> lambdas,
}
AmazonLambdaBuildItem lambda = lambdas.get(0);

template.start((Class<? extends RequestHandler>) context.classProxy(lambda.getHandlerClass()), shutdownContextBuildItem,
template.start((Class<? extends RequestHandler<?, ?>>) context.classProxy(lambda.getHandlerClass()),
shutdownContextBuildItem,
lambda.getTargetType(), beanContainerBuildItem.getValue());

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package io.quarkus.amazon.lambda.runtime;

import java.net.MalformedURLException;
import java.net.URL;

/**
* Various constants and util methods used for communication with the AWS API.
*/
public class AmazonLambdaApi {

// Response Headers
public static final String LAMBDA_RUNTIME_AWS_REQUEST_ID = "Lambda-Runtime-Aws-Request-Id";
public static final String LAMBDA_RUNTIME_INVOKED_FUNCTION_ARN = "Lambda-Runtime-Invoked-Function-Arn";
public static final String LAMBDA_RUNTIME_COGNITO_IDENTITY = "Lambda-Runtime-Cognito-Identity";
public static final String LAMBDA_RUNTIME_CLIENT_CONTEXT = "Lambda-Runtime-Client-Context";
public static final String LAMBDA_RUNTIME_DEADLINE_MS = "Lambda-Runtime-Deadline-Ms";

// Test API
public static final String QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API = "quarkus-internal.aws-lambda.test-api";

// API paths
public static final String API_PROTOCOL = "http://";
public static final String API_PATH_RUNTIME = "/2018-06-01/runtime/";
public static final String API_PATH_INVOCATION = API_PATH_RUNTIME + "invocation/";
public static final String API_PATH_INVOCATION_NEXT = API_PATH_INVOCATION + "next";
public static final String API_PATH_INIT_ERROR = API_PATH_RUNTIME + "init/error";
public static final String API_PATH_ERROR = "/error";
public static final String API_PATH_RESPONSE = "/response";

static URL invocationNext() throws MalformedURLException {
return new URL(API_PROTOCOL + runtimeApi() + API_PATH_INVOCATION_NEXT);
}

static URL invocationError(String requestId) throws MalformedURLException {
return new URL(API_PROTOCOL + runtimeApi() + API_PATH_INVOCATION + requestId + API_PATH_ERROR);
}

static URL invocationResponse(String requestId) throws MalformedURLException {
return new URL(API_PROTOCOL + runtimeApi() + API_PATH_INVOCATION + requestId + API_PATH_RESPONSE);
}

static URL initError() throws MalformedURLException {
return new URL(API_PROTOCOL + runtimeApi() + API_PATH_INIT_ERROR);
}

static String logGroupName() {
return System.getenv("AWS_LAMBDA_LOG_GROUP_NAME");
}

static String functionMemorySize() {
return System.getenv("AWS_LAMBDA_FUNCTION_MEMORY_SIZE");
}

static String logStreamName() {
return System.getenv("AWS_LAMBDA_LOG_STREAM_NAME");
}

static String functionName() {
return System.getenv("AWS_LAMBDA_FUNCTION_NAME");
}

static String functionVersion() {
return System.getenv("AWS_LAMBDA_FUNCTION_VERSION");
}

private static String runtimeApi() {
String testApi = System.getProperty(QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API);
if (testApi != null) {
return testApi;
}
return System.getenv("AWS_LAMBDA_RUNTIME_API");
}

}
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
package io.quarkus.amazon.lambda.runtime;

import static io.quarkus.amazon.lambda.runtime.AmazonLambdaApi.LAMBDA_RUNTIME_AWS_REQUEST_ID;
import static io.quarkus.amazon.lambda.runtime.AmazonLambdaApi.LAMBDA_RUNTIME_CLIENT_CONTEXT;
import static io.quarkus.amazon.lambda.runtime.AmazonLambdaApi.LAMBDA_RUNTIME_COGNITO_IDENTITY;
import static io.quarkus.amazon.lambda.runtime.AmazonLambdaApi.LAMBDA_RUNTIME_DEADLINE_MS;
import static io.quarkus.amazon.lambda.runtime.AmazonLambdaApi.LAMBDA_RUNTIME_INVOKED_FUNCTION_ARN;
import static io.quarkus.amazon.lambda.runtime.AmazonLambdaApi.functionMemorySize;
import static io.quarkus.amazon.lambda.runtime.AmazonLambdaApi.functionName;
import static io.quarkus.amazon.lambda.runtime.AmazonLambdaApi.functionVersion;
import static io.quarkus.amazon.lambda.runtime.AmazonLambdaApi.logGroupName;
import static io.quarkus.amazon.lambda.runtime.AmazonLambdaApi.logStreamName;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.Date;
import java.util.StringJoiner;

import com.amazonaws.services.lambda.runtime.ClientContext;
import com.amazonaws.services.lambda.runtime.CognitoIdentity;
Expand All @@ -28,27 +37,27 @@ public class AmazonLambdaContext implements Context {

public AmazonLambdaContext(HttpURLConnection request, ObjectReader cognitoReader, ObjectReader clientCtxReader)
throws IOException {
awsRequestId = request.getHeaderField("Lambda-Runtime-Aws-Request-Id");
logGroupName = System.getenv("AWS_LAMBDA_LOG_GROUP_NAME");
logStreamName = System.getenv("AWS_LAMBDA_LOG_STREAM_NAME");
functionName = System.getenv("AWS_LAMBDA_FUNCTION_NAME");
functionVersion = System.getenv("AWS_LAMBDA_FUNCTION_VERSION");
invokedFunctionArn = request.getHeaderField("Lambda-Runtime-Invoked-Function-Arn");

String cognitoIdentityHeader = request.getHeaderField("Lambda-Runtime-Cognito-Identity");
awsRequestId = request.getHeaderField(LAMBDA_RUNTIME_AWS_REQUEST_ID);
logGroupName = logGroupName();
logStreamName = logStreamName();
functionName = functionName();
functionVersion = functionVersion();
invokedFunctionArn = request.getHeaderField(LAMBDA_RUNTIME_INVOKED_FUNCTION_ARN);

String cognitoIdentityHeader = request.getHeaderField(LAMBDA_RUNTIME_COGNITO_IDENTITY);
if (cognitoIdentityHeader != null) {
cognitoIdentity = cognitoReader.readValue(cognitoIdentityHeader);
}

String clientContextHeader = request.getHeaderField("Lambda-Runtime-Client-Context");
String clientContextHeader = request.getHeaderField(LAMBDA_RUNTIME_CLIENT_CONTEXT);
if (clientContextHeader != null) {
clientContext = clientCtxReader.readValue(clientContextHeader);
}

String functionMemorySize = System.getenv("AWS_LAMBDA_FUNCTION_MEMORY_SIZE");
String functionMemorySize = functionMemorySize();
memoryLimitInMB = functionMemorySize != null ? Integer.valueOf(functionMemorySize) : 0;

String runtimeDeadline = request.getHeaderField("Lambda-Runtime-Deadline-Ms");
String runtimeDeadline = request.getHeaderField(LAMBDA_RUNTIME_DEADLINE_MS);
if (runtimeDeadline != null) {
runtimeDeadlineMs = Long.valueOf(runtimeDeadline);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ public class AmazonLambdaTemplate {

private static final Logger log = Logger.getLogger(AmazonLambdaTemplate.class);

protected static final String QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API = "quarkus-internal.aws-lambda.test-api";

public void start(Class<? extends RequestHandler> handlerClass,
@SuppressWarnings("rawtypes")
public void start(Class<? extends RequestHandler<?, ?>> handlerClass,
ShutdownContext context,
RuntimeValue<Class<?>> handlerType,
BeanContainer beanContainer) {

RequestHandler handler = beanContainer.instance(handlerClass);

final ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
Expand All @@ -47,17 +47,17 @@ public void run() {
});

Thread t = new Thread(new Runnable() {
@SuppressWarnings("unchecked")
@Override
public void run() {

try {
URL requestUrl = new URL(
"http://" + runtimeApi() + "/2018-06-01/runtime/invocation/next");
URL requestUrl = AmazonLambdaApi.invocationNext();
while (running.get()) {

HttpURLConnection requestConnection = (HttpURLConnection) requestUrl.openConnection();
try {
String requestId = requestConnection.getHeaderField("Lambda-Runtime-Aws-Request-Id");
String requestId = requestConnection.getHeaderField(AmazonLambdaApi.LAMBDA_RUNTIME_AWS_REQUEST_ID);
Object response;
try {
Object val = objectReader.readValue(requestConnection.getInputStream());
Expand All @@ -66,31 +66,24 @@ public void run() {
} catch (Exception e) {
log.error("Failed to run lambda", e);
FunctionError fe = new FunctionError(e.getClass().getName(), e.getMessage());
URL responseUrl = new URL(
"http://" + runtimeApi() + "/2018-06-01/runtime/invocation/"
+ requestId + "/error");

URL responseUrl = AmazonLambdaApi.invocationError(requestId);
HttpURLConnection responseConnection = (HttpURLConnection) responseUrl.openConnection();
responseConnection.setDoOutput(true);
responseConnection.setRequestMethod("POST");
mapper.writeValue(responseConnection.getOutputStream(), fe);
while (responseConnection.getInputStream().read() != -1) {

// Read data
}

continue;
}

URL responseUrl = new URL(
"http://" + runtimeApi() + "/2018-06-01/runtime/invocation/"
+ requestId + "/response");

URL responseUrl = AmazonLambdaApi.invocationResponse(requestId);
HttpURLConnection responseConnection = (HttpURLConnection) responseUrl.openConnection();
responseConnection.setDoOutput(true);
responseConnection.setRequestMethod("POST");
mapper.writeValue(responseConnection.getOutputStream(), response);
while (responseConnection.getInputStream().read() != -1) {

// Read data
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's happening here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I think it reads the data until the end of the stream is reached. I've just added a comment because an empty while loop looks weird to me ;-).

}
} catch (Exception e) {
log.error("Error running lambda", e);
Expand All @@ -108,8 +101,7 @@ public void run() {
} catch (Exception e) {
try {
log.error("Lambda init error", e);
URL errorUrl = new URL(
"http://" + runtimeApi() + "/2018-06-01/runtime/init/error");
URL errorUrl = AmazonLambdaApi.initError();
HttpURLConnection responseConnection = (HttpURLConnection) errorUrl.openConnection();
responseConnection.setDoOutput(true);
responseConnection.setRequestMethod("POST");
Expand All @@ -134,15 +126,7 @@ public void run() {

}

private String runtimeApi() {
String testApi = System.getProperty(QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API);
if (testApi != null) {
return testApi;
}
return System.getenv("AWS_LAMBDA_RUNTIME_API");
}

public RuntimeValue<Class<?>> discoverParameterTypes(Class<? extends RequestHandler> handlerClass) {
public RuntimeValue<Class<?>> discoverParameterTypes(Class<? extends RequestHandler<?, ?>> handlerClass) {
final Method[] methods = handlerClass.getMethods();
Method method = null;
for (int i = 0; i < methods.length && method == null; i++) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.quarkus.amazon.lambda.test;

@SuppressWarnings("serial")
public class LambdaException extends RuntimeException {

final String type;

public LambdaException(String type, String message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;

import io.quarkus.amazon.lambda.runtime.AmazonLambdaApi;
import io.quarkus.amazon.lambda.runtime.FunctionError;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.undertow.Undertow;
Expand All @@ -20,8 +21,6 @@

public class LambdaResourceManager implements QuarkusTestResourceLifecycleManager {

protected static final String QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API = "quarkus-internal.aws-lambda.test-api";

private volatile Undertow undertow;

public static final int PORT = Integer.getInteger("quarkus-internal.aws-lambda.test-port", 5387);
Expand All @@ -30,7 +29,7 @@ public class LambdaResourceManager implements QuarkusTestResourceLifecycleManage
public Map<String, String> start() {

RoutingHandler routingHandler = new RoutingHandler(true);
routingHandler.add("GET", "/2018-06-01/runtime/invocation/next", new HttpHandler() {
routingHandler.add("GET", AmazonLambdaApi.API_PATH_INVOCATION_NEXT, new HttpHandler() {
@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
LambdaStartedNotifier.started = true;
Expand All @@ -41,44 +40,46 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {
return;
}
}
exchange.getResponseHeaders().put(new HttpString("Lambda-Runtime-Aws-Request-Id"), req.id);
exchange.getResponseHeaders().put(new HttpString(AmazonLambdaApi.LAMBDA_RUNTIME_AWS_REQUEST_ID), req.id);
exchange.getResponseSender().send(req.json);
}
});
routingHandler.add("POST", "/2018-06-01/runtime/invocation/{req}/response", new HttpHandler() {
@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
String id = exchange.getQueryParameters().get("req").getFirst();
exchange.getRequestReceiver().receiveFullString(new Receiver.FullStringCallback() {
routingHandler.add("POST", AmazonLambdaApi.API_PATH_INVOCATION + "{req}" + AmazonLambdaApi.API_PATH_RESPONSE,
new HttpHandler() {
@Override
public void handle(HttpServerExchange exchange, String message) {
LambdaClient.REQUESTS.get(id).complete(message);
public void handleRequest(HttpServerExchange exchange) throws Exception {
String id = exchange.getQueryParameters().get("req").getFirst();
exchange.getRequestReceiver().receiveFullString(new Receiver.FullStringCallback() {
@Override
public void handle(HttpServerExchange exchange, String message) {
LambdaClient.REQUESTS.get(id).complete(message);
}
});
}
});
}
});

routingHandler.add("POST", "/2018-06-01/runtime/invocation/{req}/error", new HttpHandler() {
@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
String id = exchange.getQueryParameters().get("req").getFirst();
exchange.getRequestReceiver().receiveFullString(new Receiver.FullStringCallback() {
routingHandler.add("POST", AmazonLambdaApi.API_PATH_INVOCATION + "{req}" + AmazonLambdaApi.API_PATH_ERROR,
new HttpHandler() {
@Override
public void handle(HttpServerExchange exchange, String message) {
ObjectMapper mapper = new ObjectMapper();
try {
FunctionError result = mapper.readerFor(FunctionError.class).readValue(message);
public void handleRequest(HttpServerExchange exchange) throws Exception {
String id = exchange.getQueryParameters().get("req").getFirst();
exchange.getRequestReceiver().receiveFullString(new Receiver.FullStringCallback() {
@Override
public void handle(HttpServerExchange exchange, String message) {
ObjectMapper mapper = new ObjectMapper();
try {
FunctionError result = mapper.readerFor(FunctionError.class).readValue(message);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should cache the reader here. It can be done in another PR though as it was already preexisting. I don't know if we use this pattern elsewhere in the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is safe to share a reader then +1 for a new PR ;-)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I missed it's just in the tests. We don't really care then.


LambdaClient.REQUESTS.get(id).completeExceptionally(
new LambdaException(result.getErrorType(), result.getErrorMessage()));
} catch (IOException e) {
throw new RuntimeException(e);
}
LambdaClient.REQUESTS.get(id).completeExceptionally(
new LambdaException(result.getErrorType(), result.getErrorMessage()));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
}
});
}
});
routingHandler.add("POST", "/2018-06-01/runtime/init/error", new HttpHandler() {
routingHandler.add("POST", AmazonLambdaApi.API_PATH_INIT_ERROR, new HttpHandler() {
@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
exchange.getRequestReceiver().receiveFullString(new Receiver.FullStringCallback() {
Expand All @@ -103,7 +104,7 @@ public void handle(HttpServerExchange exchange, String message) {
.setHandler(new BlockingHandler(routingHandler))
.build();
undertow.start();
return Collections.singletonMap(QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API, "localhost:" + PORT);
return Collections.singletonMap(AmazonLambdaApi.QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API, "localhost:" + PORT);
}

@Override
Expand Down