diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/CloudWatchMetricsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/CloudWatchMetricsPublisher.java index f931cb322..24a6e4644 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/CloudWatchMetricsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/CloudWatchMetricsPublisher.java @@ -14,14 +14,20 @@ */ package software.amazon.kinesis.metrics; -import java.util.ArrayList; -import java.util.List; - import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.cloudwatch.model.CloudWatchException; import software.amazon.awssdk.services.cloudwatch.model.MetricDatum; import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest; +import software.amazon.kinesis.retrieval.AWSExceptionManager; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; /** * Publisher that contains the logic to publish metrics. @@ -30,12 +36,17 @@ public class CloudWatchMetricsPublisher { // CloudWatch API has a limit of 20 MetricDatums per request private static final int BATCH_SIZE = 20; + private static final int PUT_TIMEOUT_MILLIS = 5000; + private static final AWSExceptionManager CW_EXCEPTION_MANAGER = new AWSExceptionManager(); + static { + CW_EXCEPTION_MANAGER.add(CloudWatchException.class, t -> t); + } private final String namespace; - private final CloudWatchAsyncClient cloudWatchClient; + private final CloudWatchAsyncClient cloudWatchAsyncClient; public CloudWatchMetricsPublisher(CloudWatchAsyncClient cloudWatchClient, String namespace) { - this.cloudWatchClient = cloudWatchClient; + this.cloudWatchAsyncClient = cloudWatchClient; this.namespace = namespace; } @@ -56,16 +67,28 @@ public void publishMetrics(List> dataToP for (int i = startIndex; i < endIndex; i++) { metricData.add(dataToPublish.get(i).datum); } - request = request.metricData(metricData); - try { - cloudWatchClient.putMetricData(request.build()); - - log.debug("Successfully published {} datums.", endIndex - startIndex); - } catch (CloudWatchException e) { + PutMetricDataRequest.Builder finalRequest = request; + // This needs to be blocking. Making it asynchronous leads to increased throttling. + blockingExecute(cloudWatchAsyncClient.putMetricData(finalRequest.build()), PUT_TIMEOUT_MILLIS, + CW_EXCEPTION_MANAGER); + } catch(CloudWatchException | TimeoutException e) { log.warn("Could not publish {} datums to CloudWatch", endIndex - startIndex, e); + } catch (Exception e) { + log.error("Unknown exception while publishing {} datums to CloudWatch", endIndex - startIndex, e); } } } + + private static void blockingExecute(CompletableFuture future, long timeOutMillis, + AWSExceptionManager exceptionManager) throws TimeoutException { + try { + future.get(timeOutMillis, MILLISECONDS); + } catch (ExecutionException e) { + throw exceptionManager.apply(e.getCause()); + } catch (InterruptedException e) { + log.info("Thread interrupted."); + } + } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/metrics/CloudWatchMetricsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/metrics/CloudWatchMetricsPublisherTest.java index 79d2d15c2..7f40266b1 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/metrics/CloudWatchMetricsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/metrics/CloudWatchMetricsPublisherTest.java @@ -18,6 +18,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import org.junit.Assert; import org.junit.Before; @@ -31,8 +32,12 @@ import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.cloudwatch.model.MetricDatum; import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest; +import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataResponse; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + @RunWith(MockitoJUnitRunner.class) public class CloudWatchMetricsPublisherTest { private static final String NAMESPACE = "fakeNamespace"; @@ -51,6 +56,10 @@ public void setup() { */ @Test public void testMetricsPublisher() { + final CompletableFuture putResponseFuture = new CompletableFuture<>(); + putResponseFuture.complete(PutMetricDataResponse.builder().build()); + when(cloudWatchClient.putMetricData(any(PutMetricDataRequest.class))).thenReturn(putResponseFuture); + List> dataToPublish = constructMetricDatumWithKeyList(25); List> expectedData = constructMetricDatumListMap(dataToPublish); publisher.publishMetrics(dataToPublish);