Skip to content

Commit

Permalink
Update and Fix Tests (#37225)
Browse files Browse the repository at this point in the history
* Add more tests

* Fix wrong httpclient for statsbeat tests

* Apply do not record

* Revert

* Fix

* Move statsbeat tests out of end to end

* Fix tests

* Update records

* Fix imports

* Ingore intellij suggestion

* Fix json
  • Loading branch information
heyams authored Dec 7, 2023
1 parent f4e9922 commit 3a0c2f1
Show file tree
Hide file tree
Showing 6 changed files with 317 additions and 224 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "java",
"TagPrefix": "java/monitor/azure-monitor-opentelemetry-exporter",
"Tag": "java/monitor/azure-monitor-opentelemetry-exporter_61683f4a82"
"Tag": "java/monitor/azure-monitor-opentelemetry-exporter_558a81e425"
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,12 @@

package com.azure.monitor.opentelemetry.exporter;

import com.azure.core.http.*;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.util.FluxUtil;
import com.azure.monitor.opentelemetry.exporter.implementation.MockHttpResponse;
import com.azure.core.http.HttpPipeline;
import com.azure.monitor.opentelemetry.exporter.implementation.models.MessageData;
import com.azure.monitor.opentelemetry.exporter.implementation.models.MetricsData;
import com.azure.monitor.opentelemetry.exporter.implementation.models.RemoteDependencyData;
import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryItem;
import com.azure.monitor.opentelemetry.exporter.implementation.utils.TestUtils;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
Expand All @@ -26,18 +20,11 @@
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.zip.GZIPInputStream;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -50,10 +37,6 @@ public class AzureMonitorExportersEndToEndTest extends MonitorExporterClientTest
+ "IngestionEndpoint=https://test.in.applicationinsights.azure.com/;"
+ "LiveEndpoint=https://test.livediagnostics.monitor.azure.com/";

private static final String STATSBEAT_CONNECTION_STRING =
"InstrumentationKey=00000000-0000-0000-0000-000000000000;"
+ "IngestionEndpoint=https://westus-0.in.applicationinsights.azure.com/;"
+ "LiveEndpoint=https://westus.livediagnostics.monitor.azure.com/";
private static final String INSTRUMENTATION_KEY = "00000000-0000-0000-0000-000000000000";

@Test
Expand All @@ -70,13 +53,13 @@ public void testBuildTraceExporter() throws Exception {

// wait for export
countDownLatch.await(10, SECONDS);
assertThat(customValidationPolicy.url)
assertThat(customValidationPolicy.getUrl())
.isEqualTo(new URL("https://test.in.applicationinsights.azure.com/v2.1/track"));
assertThat(customValidationPolicy.actualTelemetryItems.size()).isEqualTo(2);
assertThat(customValidationPolicy.getActualTelemetryItems().size()).isEqualTo(2);

// validate span
TelemetryItem spanTelemetryItem =
customValidationPolicy.actualTelemetryItems.stream()
customValidationPolicy.getActualTelemetryItems().stream()
.filter(item -> item.getName().equals("RemoteDependency"))
.findFirst()
.get();
Expand All @@ -100,13 +83,13 @@ public void testBuildMetricExporter() throws Exception {

// wait for export
countDownLatch.await(10, SECONDS);
assertThat(customValidationPolicy.url)
assertThat(customValidationPolicy.getUrl())
.isEqualTo(new URL("https://test.in.applicationinsights.azure.com/v2.1/track"));
assertThat(customValidationPolicy.actualTelemetryItems.size()).isEqualTo(2);
assertThat(customValidationPolicy.getActualTelemetryItems().size()).isEqualTo(2);

// validate metric
TelemetryItem metricTelemetryItem =
customValidationPolicy.actualTelemetryItems.stream()
customValidationPolicy.getActualTelemetryItems().stream()
.filter(item -> item.getName().equals("Metric"))
.filter(item -> {
MetricsData metricsData = (MetricsData) item.getData().getBaseData();
Expand All @@ -131,13 +114,13 @@ public void testBuildLogExporter() throws Exception {

// wait for export
countDownLatch.await(10, SECONDS);
assertThat(customValidationPolicy.url)
assertThat(customValidationPolicy.getUrl())
.isEqualTo(new URL("https://test.in.applicationinsights.azure.com/v2.1/track"));
assertThat(customValidationPolicy.actualTelemetryItems.size()).isEqualTo(2);
assertThat(customValidationPolicy.getActualTelemetryItems().size()).isEqualTo(2);

// validate log
TelemetryItem logTelemetryItem =
customValidationPolicy.actualTelemetryItems.stream()
customValidationPolicy.getActualTelemetryItems().stream()
.filter(item -> item.getName().equals("Message"))
.findFirst()
.get();
Expand All @@ -164,18 +147,18 @@ public void testBuildTraceMetricLogExportersConsecutively() throws Exception {

// wait for export
countDownLatch.await(10, SECONDS);
assertThat(customValidationPolicy.url)
assertThat(customValidationPolicy.getUrl())
.isEqualTo(new URL("https://test.in.applicationinsights.azure.com/v2.1/track"));
assertThat(customValidationPolicy.actualTelemetryItems.size()).isEqualTo(6);
assertThat(customValidationPolicy.getActualTelemetryItems().size()).isEqualTo(6);

// validate telemetry
TelemetryItem spanTelemetryItem =
customValidationPolicy.actualTelemetryItems.stream()
customValidationPolicy.getActualTelemetryItems().stream()
.filter(item -> item.getName().equals("RemoteDependency"))
.findFirst()
.get();
TelemetryItem metricTelemetryItem =
customValidationPolicy.actualTelemetryItems.stream()
customValidationPolicy.getActualTelemetryItems().stream()
.filter(item -> item.getName().equals("Metric"))
.filter(item -> {
MetricsData metricsData = (MetricsData) item.getData().getBaseData();
Expand All @@ -184,7 +167,7 @@ public void testBuildTraceMetricLogExportersConsecutively() throws Exception {
.findFirst()
.get();
TelemetryItem logTelemetryItem =
customValidationPolicy.actualTelemetryItems.stream()
customValidationPolicy.getActualTelemetryItems().stream()
.filter(item -> item.getName().equals("Message"))
.findFirst()
.get();
Expand All @@ -195,94 +178,6 @@ public void testBuildTraceMetricLogExportersConsecutively() throws Exception {
// TODO (trask) also export and validate logs in this test
}

@Test
public void testStatsbeat() throws Exception {
// create the OpenTelemetry SDK
CountDownLatch countDownLatch = new CountDownLatch(1);
CustomValidationPolicy customValidationPolicy = new CustomValidationPolicy(countDownLatch);
OpenTelemetrySdk openTelemetry =
TestUtils.createOpenTelemetrySdk(
getHttpPipeline(customValidationPolicy), getStatsbeatConfiguration(), STATSBEAT_CONNECTION_STRING);

// generate a metric
generateMetric(openTelemetry);

// close to flush
openTelemetry.close();

Thread.sleep(2000);

// wait for export
countDownLatch.await(10, SECONDS);
assertThat(customValidationPolicy.url)
.isEqualTo(new URL("https://westus-0.in.applicationinsights.azure.com/v2.1/track"));

TelemetryItem attachStatsbeat =
customValidationPolicy.actualTelemetryItems.stream()
.filter(item -> item.getName().equals("Statsbeat"))
.filter(item -> {
MetricsData metricsData = (MetricsData) item.getData().getBaseData();
return metricsData.getMetrics().stream().allMatch(metricDataPoint -> metricDataPoint.getName().equals("Attach"));
})
.findFirst()
.get();
validateAttachStatsbeat(attachStatsbeat);

TelemetryItem featureStatsbeat =
customValidationPolicy.actualTelemetryItems.stream()
.filter(item -> item.getName().equals("Statsbeat"))
.filter(item -> {
MetricsData metricsData = (MetricsData) item.getData().getBaseData();
return metricsData.getMetrics().stream().allMatch(metricDataPoint -> metricDataPoint.getName().equals("Feature"));
})
.findFirst()
.get();
validateFeatureStatsbeat(featureStatsbeat);
}

@Test
public void testStatsbeatShutdownWhen400InvalidIKeyReturned() throws Exception {
String fakeBody = "{\"itemsReceived\":1,\"itemsAccepted\":0,\"errors\":[{\"index\":0,\"statusCode\":400,\"message\":\"Invalid instrumentation key\"}]}";
MockedHttpClient mockedHttpClient =
new MockedHttpClient(
request -> {
return Mono.just(new MockHttpResponse(request, 400, new HttpHeaders(), fakeBody.getBytes()));
});

// create OpenTelemetrySdk
CountDownLatch countDownLatch = new CountDownLatch(1);
AzureMonitorExportersEndToEndTest.CustomValidationPolicy customValidationPolicy = new AzureMonitorExportersEndToEndTest.CustomValidationPolicy(countDownLatch);
OpenTelemetrySdk openTelemetrySdk =
TestUtils.createOpenTelemetrySdk(
getHttpPipeline(customValidationPolicy, mockedHttpClient), getConfiguration(), STATSBEAT_CONNECTION_STRING);

generateMetric(openTelemetrySdk);

// close to flush
openTelemetrySdk.close();

Thread.sleep(1000);

// wait for export
countDownLatch.await(10, SECONDS);
assertThat(customValidationPolicy.url)
.isEqualTo(new URL("https://westus-0.in.applicationinsights.azure.com/v2.1/track"));
assertThat(customValidationPolicy.actualTelemetryItems.stream().filter(item -> item.getName().equals("Statsbeat")).count()).isEqualTo(0);
}

private static Map<String, String> getConfiguration() {
return Collections.singletonMap("APPLICATIONINSIGHTS_CONNECTION_STRING", STATSBEAT_CONNECTION_STRING);
}

private static Map<String, String> getStatsbeatConfiguration() {
Map<String, String> map = new HashMap<>(3);
map.put("APPLICATIONINSIGHTS_CONNECTION_STRING", CONNECTION_STRING_ENV);
map.put("STATSBEAT_LONG_INTERVAL_SECONDS_PROPERTY_NAME", "1");
map.put("STATSBEAT_SHORT_INTERVAL_SECONDS_PROPERTY_NAME", "1");
return map;
}


@SuppressWarnings("try")
private static void generateSpan(OpenTelemetry openTelemetry) {
Tracer tracer = openTelemetry.getTracer("Sample");
Expand Down Expand Up @@ -327,22 +222,6 @@ private static void validateSpan(TelemetryItem telemetryItem) {
.containsExactly(entry("color", "red"), entry("name", "apple"));
}

private static void validateAttachStatsbeat(TelemetryItem telemetryItem) {
assertThat(telemetryItem.getData().getBaseType()).isEqualTo("MetricData");
MetricsData actualMetricsData = (MetricsData) telemetryItem.getData().getBaseData();
assertThat(actualMetricsData.getMetrics().get(0).getName()).isEqualTo("Attach");
assertThat(actualMetricsData.getProperties()).contains(entry("rp", "unknown"), entry("attach", "Manual"), entry("language", "java"));
assertThat(actualMetricsData.getProperties()).containsKeys("attach", "cikey", "language", "os", "rp", "runtimeVersion", "version");
}

private static void validateFeatureStatsbeat(TelemetryItem telemetryItem) {
assertThat(telemetryItem.getData().getBaseType()).isEqualTo("MetricData");
MetricsData actualMetricsData = (MetricsData) telemetryItem.getData().getBaseData();
assertThat(actualMetricsData.getMetrics().get(0).getName()).isEqualTo("Feature");
assertThat(actualMetricsData.getProperties()).contains(entry("type", "0"), entry("language", "java"));
assertThat(actualMetricsData.getProperties()).containsKeys("feature", "cikey", "language", "os", "rp", "runtimeVersion", "version");
}

private static void validateMetric(TelemetryItem telemetryItem) {
assertThat(telemetryItem.getInstrumentationKey()).isEqualTo(INSTRUMENTATION_KEY);
assertThat(telemetryItem.getTags()).containsEntry("ai.cloud.role", "unknown_service:java");
Expand Down Expand Up @@ -373,83 +252,7 @@ private static void validateLog(TelemetryItem telemetryItem) {
entry("name", "apple"));
}

private static class CustomValidationPolicy implements HttpPipelinePolicy {

private final CountDownLatch countDown;
private volatile URL url;
private final List<TelemetryItem> actualTelemetryItems = new CopyOnWriteArrayList<>();

CustomValidationPolicy(CountDownLatch countDown) {
this.countDown = countDown;
}

@Override
public Mono<HttpResponse> process(
HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
url = context.getHttpRequest().getUrl();
Mono<String> asyncBytes =
FluxUtil.collectBytesInByteBufferStream(context.getHttpRequest().getBody())
.map(CustomValidationPolicy::ungzip);
asyncBytes.subscribe(
value -> {
ObjectMapper objectMapper = createObjectMapper();
try (MappingIterator<TelemetryItem> i =
objectMapper.readerFor(TelemetryItem.class).readValues(value)) {
while (i.hasNext()) {
actualTelemetryItems.add(i.next());
}
countDown.countDown();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
return next.process();
}

// decode gzipped request raw bytes back to original request body
private static String ungzip(byte[] rawBytes) {
if (rawBytes.length == 0) {
return "";
}
try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(rawBytes))) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] data = new byte[1024];
int read;
while ((read = in.read(data, 0, data.length)) != -1) {
baos.write(data, 0, read);
}
return new String(baos.toByteArray(), StandardCharsets.UTF_8);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static ObjectMapper createObjectMapper() {
ObjectMapper objectMapper = new ObjectMapper();
// handle JSR-310 (java 8) dates with Jackson by configuring ObjectMapper to use this
// dependency and not (de)serialize Instant as timestamps that it does by default
objectMapper.findAndRegisterModules().disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
return objectMapper;
}
}

private static class MockedHttpClient implements HttpClient {

private final AtomicInteger count = new AtomicInteger();
private final Function<HttpRequest, Mono<HttpResponse>> handler;

MockedHttpClient(Function<HttpRequest, Mono<HttpResponse>> handler) {
this.handler = handler;
}

@Override
public Mono<HttpResponse> send(HttpRequest httpRequest) {
count.getAndIncrement();
return handler.apply(httpRequest);
}

int getCount() {
return count.get();
}
private static Map<String, String> getConfiguration() {
return Collections.singletonMap("APPLICATIONINSIGHTS_CONNECTION_STRING", CONNECTION_STRING_ENV);
}
}
Loading

0 comments on commit 3a0c2f1

Please sign in to comment.