Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RequestStreamHandler functionality added to Amazon Lambda #7866

Merged
merged 1 commit into from
Mar 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ public final class AmazonLambdaBuildItem extends MultiBuildItem {

private final String handlerClass;
private final String name;
private final boolean streamHandler;

public AmazonLambdaBuildItem(String handlerClass, String name) {
public AmazonLambdaBuildItem(String handlerClass, String name, boolean streamHandler) {
this.handlerClass = handlerClass;
this.name = name;
this.streamHandler = streamHandler;
}

public String getHandlerClass() {
Expand All @@ -19,4 +21,8 @@ public String getHandlerClass() {
public String getName() {
return name;
}

public boolean isStreamHandler() {
return streamHandler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.joda.time.DateTime;

import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;

import io.quarkus.amazon.lambda.runtime.AmazonLambdaRecorder;
import io.quarkus.amazon.lambda.runtime.FunctionError;
Expand Down Expand Up @@ -49,6 +50,7 @@ public final class AmazonLambdaProcessor {
public static final String AWS_LAMBDA_EVENTS_ARCHIVE_MARKERS = "com/amazonaws/services/lambda/runtime/events";

private static final DotName REQUEST_HANDLER = DotName.createSimple(RequestHandler.class.getName());
private static final DotName REQUEST_STREAM_HANDLER = DotName.createSimple(RequestStreamHandler.class.getName());

private static final DotName NAMED = DotName.createSimple(Named.class.getName());
private static final Logger log = Logger.getLogger(AmazonLambdaProcessor.class);
Expand All @@ -70,6 +72,10 @@ List<AmazonLambdaBuildItem> discover(CombinedIndexBuildItem combinedIndexBuildIt
BuildProducer<ReflectiveHierarchyBuildItem> reflectiveHierarchy,
BuildProducer<ReflectiveClassBuildItem> reflectiveClassBuildItemBuildProducer) throws BuildException {
Collection<ClassInfo> allKnownImplementors = combinedIndexBuildItem.getIndex().getAllKnownImplementors(REQUEST_HANDLER);
Collection<ClassInfo> allKnownStreamImplementors = combinedIndexBuildItem.getIndex()
.getAllKnownImplementors(REQUEST_STREAM_HANDLER);

allKnownImplementors.addAll(allKnownStreamImplementors);
if (allKnownImplementors.size() > 0 && providedLambda.isPresent()) {
throw new BuildException(
"Multiple handler classes. You have a custom handler class and the " + providedLambda.get().getProvider()
Expand All @@ -90,24 +96,30 @@ List<AmazonLambdaBuildItem> discover(CombinedIndexBuildItem combinedIndexBuildIt
}

final String lambda = name.toString();
ret.add(new AmazonLambdaBuildItem(lambda, cdiName));
reflectiveClassBuildItemBuildProducer.produce(new ReflectiveClassBuildItem(true, false, lambda));

ClassInfo current = info;
boolean done = false;
boolean streamHandler = false;
while (current != null && !done) {
for (MethodInfo method : current.methods()) {
if (method.name().equals("handleRequest")
&& method.parameters().size() == 2
&& !method.parameters().get(0).name().equals(DotName.createSimple(Object.class.getName()))) {
reflectiveHierarchy.produce(new ReflectiveHierarchyBuildItem(method.parameters().get(0)));
reflectiveHierarchy.produce(new ReflectiveHierarchyBuildItem(method.returnType()));
done = true;
break;
if (method.name().equals("handleRequest")) {
if (method.parameters().size() == 3) {
streamHandler = true;
done = true;
break;
} else if (method.parameters().size() == 2
&& !method.parameters().get(0).name().equals(DotName.createSimple(Object.class.getName()))) {
reflectiveHierarchy.produce(new ReflectiveHierarchyBuildItem(method.parameters().get(0)));
reflectiveHierarchy.produce(new ReflectiveHierarchyBuildItem(method.returnType()));
done = true;
break;
}
}
}
current = combinedIndexBuildItem.getIndex().getClassByName(current.superName());
}
ret.add(new AmazonLambdaBuildItem(lambda, cdiName, streamHandler));
}
additionalBeanBuildItemBuildProducer.produce(builder.build());
reflectiveClassBuildItemBuildProducer
Expand Down Expand Up @@ -159,21 +171,50 @@ public void recordHandlerClass(List<AmazonLambdaBuildItem> lambdas,
List<ServiceStartBuildItem> orderServicesFirst, // try to order this after service recorders
RecorderContext context) {
if (providedLambda.isPresent()) {
Class<? extends RequestHandler<?, ?>> handlerClass = (Class<? extends RequestHandler<?, ?>>) context
.classProxy(providedLambda.get().getHandlerClass().getName());
recorder.setHandlerClass(handlerClass, beanContainerBuildItem.getValue());
boolean useStreamHandler = false;
for (Class handleInterface : providedLambda.get().getHandlerClass().getInterfaces()) {
if (handleInterface.getName().equals(RequestStreamHandler.class.getName())) {
useStreamHandler = true;
}
}

if (useStreamHandler) {
Class<? extends RequestStreamHandler> handlerClass = (Class<? extends RequestStreamHandler>) context
.classProxy(providedLambda.get().getHandlerClass().getName());
recorder.setStreamHandlerClass(handlerClass, beanContainerBuildItem.getValue());
} else {
Class<? extends RequestHandler<?, ?>> handlerClass = (Class<? extends RequestHandler<?, ?>>) context
.classProxy(providedLambda.get().getHandlerClass().getName());

recorder.setHandlerClass(handlerClass, beanContainerBuildItem.getValue());
}
} else if (lambdas != null) {
List<Class<? extends RequestHandler<?, ?>>> unnamed = new ArrayList<>();
Map<String, Class<? extends RequestHandler<?, ?>>> named = new HashMap<>();

List<Class<? extends RequestStreamHandler>> unnamedStreamHandler = new ArrayList<>();
Map<String, Class<? extends RequestStreamHandler>> namedStreamHandler = new HashMap<>();

for (AmazonLambdaBuildItem i : lambdas) {
if (i.getName() == null) {
unnamed.add((Class<? extends RequestHandler<?, ?>>) context.classProxy(i.getHandlerClass()));
if (i.isStreamHandler()) {
if (i.getName() == null) {
unnamedStreamHandler
.add((Class<? extends RequestStreamHandler>) context.classProxy(i.getHandlerClass()));
} else {
namedStreamHandler.put(i.getName(),
(Class<? extends RequestStreamHandler>) context.classProxy(i.getHandlerClass()));
}
} else {
named.put(i.getName(), (Class<? extends RequestHandler<?, ?>>) context.classProxy(i.getHandlerClass()));
if (i.getName() == null) {
unnamed.add((Class<? extends RequestHandler<?, ?>>) context.classProxy(i.getHandlerClass()));
} else {
named.put(i.getName(), (Class<? extends RequestHandler<?, ?>>) context.classProxy(i.getHandlerClass()));
}
}
}
recorder.chooseHandlerClass(unnamed, named, beanContainerBuildItem.getValue(), config);

recorder.chooseHandlerClass(unnamed, named, unnamedStreamHandler, namedStreamHandler,
beanContainerBuildItem.getValue(), config);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ function cmd_create() {
--handler ${HANDLER} \
--runtime ${RUNTIME} \
--role ${LAMBDA_ROLE_ARN} \
--timeout 15 \
--memory-size 256 \
${LAMBDA_META}
# Enable and move this param above ${LAMBDA_META}, if using AWS X-Ray
# --tracing-config Mode=Active \
}

function cmd_delete() {
Expand All @@ -27,6 +31,8 @@ function cmd_invoke() {
--log-type Tail \
--query 'LogResult' \
--output text | base64 -d
{ set +x; } 2>/dev/null
cat response.txt && rm -f response.txt
}

function cmd_update() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-amazon-lambda</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-test-amazon-lambda</artifactId>
<version>${quarkus.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package ${package};

import javax.inject.Inject;
import javax.inject.Named;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;

import java.io.InputStream;
import java.io.OutputStream;
import java.io.IOException;


@Named("stream")
public class StreamLambda implements RequestStreamHandler {

@Override
public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) throws IOException {
int letter;
while ((letter = inputStream.read()) != -1) {
int character = Character.toUpperCase(letter);
outputStream.write(character);
}
}
}
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
quarkus.lambda.handler=test

quarkus.lambda.enable-polling-jvm-mode=true
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package ${package};

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import io.quarkus.amazon.lambda.test.LambdaClient;
import io.quarkus.test.junit.QuarkusTest;

@QuarkusTest
public class LambdaHandlerTest {

@Test
public void testSimpleLambdaSuccess() throws Exception {
InputObject in = new InputObject();
in.setGreeting("Hello");
in.setName("Stu");
OutputObject out = LambdaClient.invoke(OutputObject.class, in);
Assertions.assertEquals("Hello Stu", out.getResult());
Assertions.assertTrue(out.getRequestId().matches("aws-request-\\d"), "Expected requestId as 'aws-request-<number>'");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
quarkus.lambda.handler=test

quarkus.lambda.enable-polling-jvm-mode=true


Loading