Skip to content

Commit

Permalink
Added a primitive "waiter" class for simplifying poll-until-condition…
Browse files Browse the repository at this point in the history
…-is-met behavior.

The waiter can execute a function multiple times until the function's response meets a desired state. This is useful for dealing with eventually consistent APIs without having to use arbitrary sleep durations.

Also:
1. Fixed error in lambda test where function was not being uploaded, but test expected it.
2. Retry WAF delete-ip-set multiple times in case the IPs had not yet been deleted on the service side.
  • Loading branch information
millems committed Nov 22, 2017
1 parent b3ad669 commit 0076962
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
* Integration tests of the user APIs of IAM.
*/
public class UserIntegrationTest extends IntegrationTestBase {

@Before
public void PreTestRun() {
IAMUtil.deleteUsersAndGroupsInTestNameSpace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class IntegrationTestBase extends AwsTestBase {
@BeforeClass
public static void setup() throws IOException {
setUpCredentials();
lambda = LambdaAsyncClient.builder().credentialsProvider(CREDENTIALS_PROVIDER_CHAIN).build();
lambda = LambdaAsyncClient.builder().credentialsProvider(CREDENTIALS_PROVIDER_CHAIN).region(Region.US_WEST_2).build();

cloudFuncZip = setupFunctionZip(HELLOWORLD_JS);

Expand Down Expand Up @@ -132,7 +132,7 @@ private static void createLambdaServiceRole() {
}

protected static void createKinesisStream() {
kinesis = KinesisClient.builder().credentialsProvider(CREDENTIALS_PROVIDER_CHAIN).build();
kinesis = KinesisClient.builder().credentialsProvider(CREDENTIALS_PROVIDER_CHAIN).region(Region.US_WEST_2).build();

kinesis.createStream(CreateStreamRequest.builder().streamName(KINESIS_STREAM_NAME).shardCount(1).build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@

package software.amazon.awssdk.services.lambda;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -27,19 +31,20 @@
import software.amazon.awssdk.services.lambda.model.CreateEventSourceMappingResponse;
import software.amazon.awssdk.services.lambda.model.CreateFunctionResponse;
import software.amazon.awssdk.services.lambda.model.DeleteEventSourceMappingRequest;
import software.amazon.awssdk.services.lambda.model.DeleteFunctionRequest;
import software.amazon.awssdk.services.lambda.model.FunctionCode;
import software.amazon.awssdk.services.lambda.model.FunctionConfiguration;
import software.amazon.awssdk.services.lambda.model.GetEventSourceMappingRequest;
import software.amazon.awssdk.services.lambda.model.GetEventSourceMappingResponse;
import software.amazon.awssdk.services.lambda.model.GetFunctionConfigurationRequest;
import software.amazon.awssdk.services.lambda.model.GetFunctionConfigurationResponse;
import software.amazon.awssdk.services.lambda.model.GetFunctionRequest;
import software.amazon.awssdk.services.lambda.model.GetFunctionResponse;
import software.amazon.awssdk.services.lambda.model.InvocationType;
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;
import software.amazon.awssdk.services.lambda.model.ListFunctionsRequest;
import software.amazon.awssdk.services.lambda.model.ListFunctionsResponse;
import software.amazon.awssdk.services.lambda.model.LogType;
import software.amazon.awssdk.services.lambda.model.Runtime;
import software.amazon.awssdk.testutils.retry.RetryRule;
import software.amazon.awssdk.utils.Base64Utils;

Expand All @@ -55,6 +60,34 @@ public static void setUpKinesis() {
IntegrationTestBase.createKinesisStream();
}

@Before
public void uploadFunction() throws IOException {
// Upload function
byte[] functionBits;
InputStream functionZip = new FileInputStream(cloudFuncZip);
try {
functionBits = read(functionZip);
} finally {
functionZip.close();
}

CreateFunctionResponse result = lambda.createFunction(r -> r.description("My cloud function").functionName(FUNCTION_NAME)
.code(FunctionCode.builder().zipFile(ByteBuffer.wrap(functionBits)).build())
.handler("helloworld.handler")
.memorySize(128)
.runtime(Runtime.NODEJS4_3)
.timeout(10)
.role(lambdaServiceRoleArn)).join();

checkValid_CreateFunctionResponse(result);
}

@After
public void deleteFunction() {
lambda.deleteFunction(DeleteFunctionRequest.builder().functionName(FUNCTION_NAME).build());
}


private static void checkValid_CreateFunctionResponse(CreateFunctionResponse result) {

Assert.assertNotNull(result);
Expand Down Expand Up @@ -132,12 +165,12 @@ private static void checkValid_CreateEventSourceMappingResult(CreateEventSourceM
public void testFunctionOperations() throws IOException {

// Get function
GetFunctionResponse getFunc = lambda.getFunction(GetFunctionRequest.builder().functionName(FUNCTION_NAME).build()).join();
GetFunctionResponse getFunc = lambda.getFunction(r -> r.functionName(FUNCTION_NAME)).join();
checkValid_GetFunctionResponse(getFunc);

// Get function configuration
GetFunctionConfigurationResponse getConfig = lambda
.getFunctionConfiguration(GetFunctionConfigurationRequest.builder().functionName(FUNCTION_NAME).build()).join();
GetFunctionConfigurationResponse getConfig = lambda.getFunctionConfiguration(r -> r.functionName(FUNCTION_NAME)).join();

checkValid_GetFunctionConfigurationResponse(getConfig);

// List functions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import software.amazon.awssdk.services.waf.model.ChangeAction;
import software.amazon.awssdk.services.waf.model.CreateIPSetRequest;
import software.amazon.awssdk.services.waf.model.CreateIPSetResponse;
import software.amazon.awssdk.services.waf.model.DeleteIPSetRequest;
import software.amazon.awssdk.services.waf.model.GetChangeTokenRequest;
import software.amazon.awssdk.services.waf.model.GetChangeTokenResponse;
import software.amazon.awssdk.services.waf.model.GetIPSetRequest;
Expand All @@ -37,12 +36,13 @@
import software.amazon.awssdk.services.waf.model.ListIPSetsRequest;
import software.amazon.awssdk.services.waf.model.ListIPSetsResponse;
import software.amazon.awssdk.services.waf.model.UpdateIPSetRequest;
import software.amazon.awssdk.services.waf.model.WAFNonEmptyEntityException;
import software.amazon.awssdk.testutils.Waiter;
import software.amazon.awssdk.testutils.service.AwsTestBase;

public class WafIntegrationTest extends AwsTestBase {

private static final String IP_SET_NAME = "java-sdk-ipset-" + System.currentTimeMillis();
private static final long SLEEP_TIME_MILLIS = 5000;
private static final String IP_ADDRESS_RANGE = "192.0.2.0/24";
private static WAFClient client = null;
private static String ipSetId = null;
Expand All @@ -67,11 +67,9 @@ public static void tearDown() throws IOException {

private static void deleteIpSet() {
if (ipSetId != null) {
final String changeToken = newChangeToken();
client.deleteIPSet(DeleteIPSetRequest.builder()
.ipSetId(ipSetId)
.changeToken(changeToken)
.build());
Waiter.run(() -> client.deleteIPSet(r -> r.ipSetId(ipSetId).changeToken(newChangeToken())))
.ignoring(WAFNonEmptyEntityException.class)
.orFail();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.testutils;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.function.Predicate;
import java.util.function.Supplier;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;

/**
* This retries a particular function multiple times until it returns an expected result (or fails with an exception). Certain
* expected exception types can be ignored.
*/
public final class Waiter<T> {
private static final Logger log = Logger.loggerFor(Waiter.class);

private final Supplier<T> thingToTry;
private Predicate<T> whenToStop = t -> true;
private Set<Class<? extends Throwable>> whatToIgnore = Collections.emptySet();

/**
* @see #run(Supplier)
*/
private Waiter(Supplier<T> thingToTry) {
Validate.paramNotNull(thingToTry, "thingToTry");
this.thingToTry = thingToTry;
}

/**
* Create a waiter that attempts executing the provided function until the condition set with {@link #until(Predicate)} is
* met or until it throws an exception. Expected exception types can be ignored with {@link #ignoring(Class[])}.
*/
public static <T> Waiter<T> run(Supplier<T> thingToTry) {
return new Waiter<>(thingToTry);
}

/**
* Define the condition under which the thing we are trying is complete.
*
* If this isn't set, it will always be true. ie. if the function call succeeds, we stop waiting.
*/
public Waiter<T> until(Predicate<T> whenToStop) {
this.whenToStop = whenToStop;
return this;
}

/**
* Define the exception types that should be ignored if the thing we are trying throws them.
*/
@SafeVarargs
public final Waiter<T> ignoring(Class<? extends Throwable>... whatToIgnore) {
this.whatToIgnore = new HashSet<>(Arrays.asList(whatToIgnore));
return this;
}

/**
* Execute the function, throwing an assertion error if the thing we're trying does not succeed after 30 seconds.
*/
public T orFail() {
return orFailAfter(Duration.ofSeconds(30));
}

/**
* Execute the function, throwing an assertion error if the thing we're trying does not succeed after the provided duration.
*/
public T orFailAfter(Duration howLongToTry) {
Validate.paramNotNull(howLongToTry, "howLongToTry");

Instant start = Instant.now();
int attempt = 0;

while (Duration.between(start, Instant.now()).compareTo(howLongToTry) < 0) {
++attempt;
try {
if (attempt > 1) {
wait(attempt);
}

T result = thingToTry.get();
if (whenToStop.test(result)) {
return result;
}
int unsuccessfulAttempt = attempt;
log.info(() -> "Attempt " + unsuccessfulAttempt + " failed predicate.");
} catch (RuntimeException e) {
Throwable t = e instanceof CompletionException ? e.getCause() : e;

if (whatToIgnore.contains(t.getClass())) {
int unsuccessfulAttempt = attempt;
log.info(() -> "Attempt " + unsuccessfulAttempt +
" failed with an expected exception (" + t.getClass() + ")");
} else {
throw e;
}
}
}

throw new AssertionError("Condition was not met after " + attempt + " attempts (" +
Duration.between(start, Instant.now()).getSeconds() + " seconds)");
}

private void wait(int attempt) {
int howLongToWaitMs = 250 << Math.min(attempt - 1, 4); // Max = 250 * 2^4 = 4_000.

try {
Thread.sleep(howLongToWaitMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError(e);
}
}
}

0 comments on commit 0076962

Please sign in to comment.