Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update and Fix Tests #37225

Merged
merged 13 commits into from
Dec 7, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "java",
"TagPrefix": "java/monitor/azure-monitor-opentelemetry-exporter",
"Tag": "java/monitor/azure-monitor-opentelemetry-exporter_61683f4a82"
"Tag": "java/monitor/azure-monitor-opentelemetry-exporter_558a81e425"
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,12 @@

package com.azure.monitor.opentelemetry.exporter;

import com.azure.core.http.*;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.util.FluxUtil;
import com.azure.monitor.opentelemetry.exporter.implementation.MockHttpResponse;
import com.azure.core.http.HttpPipeline;
import com.azure.monitor.opentelemetry.exporter.implementation.models.MessageData;
import com.azure.monitor.opentelemetry.exporter.implementation.models.MetricsData;
import com.azure.monitor.opentelemetry.exporter.implementation.models.RemoteDependencyData;
import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryItem;
import com.azure.monitor.opentelemetry.exporter.implementation.utils.TestUtils;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
Expand All @@ -26,18 +20,11 @@
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.zip.GZIPInputStream;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -50,10 +37,6 @@ public class AzureMonitorExportersEndToEndTest extends MonitorExporterClientTest
+ "IngestionEndpoint=https://test.in.applicationinsights.azure.com/;"
+ "LiveEndpoint=https://test.livediagnostics.monitor.azure.com/";

private static final String STATSBEAT_CONNECTION_STRING =
"InstrumentationKey=00000000-0000-0000-0000-000000000000;"
+ "IngestionEndpoint=https://westus-0.in.applicationinsights.azure.com/;"
+ "LiveEndpoint=https://westus.livediagnostics.monitor.azure.com/";
private static final String INSTRUMENTATION_KEY = "00000000-0000-0000-0000-000000000000";

@Test
Expand All @@ -70,13 +53,13 @@ public void testBuildTraceExporter() throws Exception {

// wait for export
countDownLatch.await(10, SECONDS);
assertThat(customValidationPolicy.url)
assertThat(customValidationPolicy.getUrl())
.isEqualTo(new URL("https://test.in.applicationinsights.azure.com/v2.1/track"));
assertThat(customValidationPolicy.actualTelemetryItems.size()).isEqualTo(2);
assertThat(customValidationPolicy.getActualTelemetryItems().size()).isEqualTo(2);

// validate span
TelemetryItem spanTelemetryItem =
customValidationPolicy.actualTelemetryItems.stream()
customValidationPolicy.getActualTelemetryItems().stream()
.filter(item -> item.getName().equals("RemoteDependency"))
.findFirst()
.get();
Expand All @@ -100,13 +83,13 @@ public void testBuildMetricExporter() throws Exception {

// wait for export
countDownLatch.await(10, SECONDS);
assertThat(customValidationPolicy.url)
assertThat(customValidationPolicy.getUrl())
.isEqualTo(new URL("https://test.in.applicationinsights.azure.com/v2.1/track"));
assertThat(customValidationPolicy.actualTelemetryItems.size()).isEqualTo(2);
assertThat(customValidationPolicy.getActualTelemetryItems().size()).isEqualTo(2);

// validate metric
TelemetryItem metricTelemetryItem =
customValidationPolicy.actualTelemetryItems.stream()
customValidationPolicy.getActualTelemetryItems().stream()
.filter(item -> item.getName().equals("Metric"))
.filter(item -> {
MetricsData metricsData = (MetricsData) item.getData().getBaseData();
Expand All @@ -131,13 +114,13 @@ public void testBuildLogExporter() throws Exception {

// wait for export
countDownLatch.await(10, SECONDS);
assertThat(customValidationPolicy.url)
assertThat(customValidationPolicy.getUrl())
.isEqualTo(new URL("https://test.in.applicationinsights.azure.com/v2.1/track"));
assertThat(customValidationPolicy.actualTelemetryItems.size()).isEqualTo(2);
assertThat(customValidationPolicy.getActualTelemetryItems().size()).isEqualTo(2);

// validate log
TelemetryItem logTelemetryItem =
customValidationPolicy.actualTelemetryItems.stream()
customValidationPolicy.getActualTelemetryItems().stream()
.filter(item -> item.getName().equals("Message"))
.findFirst()
.get();
Expand All @@ -164,18 +147,18 @@ public void testBuildTraceMetricLogExportersConsecutively() throws Exception {

// wait for export
countDownLatch.await(10, SECONDS);
assertThat(customValidationPolicy.url)
assertThat(customValidationPolicy.getUrl())
.isEqualTo(new URL("https://test.in.applicationinsights.azure.com/v2.1/track"));
assertThat(customValidationPolicy.actualTelemetryItems.size()).isEqualTo(6);
assertThat(customValidationPolicy.getActualTelemetryItems().size()).isEqualTo(6);

// validate telemetry
TelemetryItem spanTelemetryItem =
customValidationPolicy.actualTelemetryItems.stream()
customValidationPolicy.getActualTelemetryItems().stream()
.filter(item -> item.getName().equals("RemoteDependency"))
.findFirst()
.get();
TelemetryItem metricTelemetryItem =
customValidationPolicy.actualTelemetryItems.stream()
customValidationPolicy.getActualTelemetryItems().stream()
.filter(item -> item.getName().equals("Metric"))
.filter(item -> {
MetricsData metricsData = (MetricsData) item.getData().getBaseData();
Expand All @@ -184,7 +167,7 @@ public void testBuildTraceMetricLogExportersConsecutively() throws Exception {
.findFirst()
.get();
TelemetryItem logTelemetryItem =
customValidationPolicy.actualTelemetryItems.stream()
customValidationPolicy.getActualTelemetryItems().stream()
.filter(item -> item.getName().equals("Message"))
.findFirst()
.get();
Expand All @@ -195,94 +178,6 @@ public void testBuildTraceMetricLogExportersConsecutively() throws Exception {
// TODO (trask) also export and validate logs in this test
}

@Test
public void testStatsbeat() throws Exception {
// create the OpenTelemetry SDK
CountDownLatch countDownLatch = new CountDownLatch(1);
CustomValidationPolicy customValidationPolicy = new CustomValidationPolicy(countDownLatch);
OpenTelemetrySdk openTelemetry =
TestUtils.createOpenTelemetrySdk(
getHttpPipeline(customValidationPolicy), getStatsbeatConfiguration(), STATSBEAT_CONNECTION_STRING);

// generate a metric
generateMetric(openTelemetry);

// close to flush
openTelemetry.close();

Thread.sleep(2000);

// wait for export
countDownLatch.await(10, SECONDS);
assertThat(customValidationPolicy.url)
.isEqualTo(new URL("https://westus-0.in.applicationinsights.azure.com/v2.1/track"));

TelemetryItem attachStatsbeat =
customValidationPolicy.actualTelemetryItems.stream()
.filter(item -> item.getName().equals("Statsbeat"))
.filter(item -> {
MetricsData metricsData = (MetricsData) item.getData().getBaseData();
return metricsData.getMetrics().stream().allMatch(metricDataPoint -> metricDataPoint.getName().equals("Attach"));
})
.findFirst()
.get();
validateAttachStatsbeat(attachStatsbeat);

TelemetryItem featureStatsbeat =
customValidationPolicy.actualTelemetryItems.stream()
.filter(item -> item.getName().equals("Statsbeat"))
.filter(item -> {
MetricsData metricsData = (MetricsData) item.getData().getBaseData();
return metricsData.getMetrics().stream().allMatch(metricDataPoint -> metricDataPoint.getName().equals("Feature"));
})
.findFirst()
.get();
validateFeatureStatsbeat(featureStatsbeat);
}

@Test
public void testStatsbeatShutdownWhen400InvalidIKeyReturned() throws Exception {
String fakeBody = "{\"itemsReceived\":1,\"itemsAccepted\":0,\"errors\":[{\"index\":0,\"statusCode\":400,\"message\":\"Invalid instrumentation key\"}]}";
MockedHttpClient mockedHttpClient =
new MockedHttpClient(
request -> {
return Mono.just(new MockHttpResponse(request, 400, new HttpHeaders(), fakeBody.getBytes()));
});

// create OpenTelemetrySdk
CountDownLatch countDownLatch = new CountDownLatch(1);
AzureMonitorExportersEndToEndTest.CustomValidationPolicy customValidationPolicy = new AzureMonitorExportersEndToEndTest.CustomValidationPolicy(countDownLatch);
OpenTelemetrySdk openTelemetrySdk =
TestUtils.createOpenTelemetrySdk(
getHttpPipeline(customValidationPolicy, mockedHttpClient), getConfiguration(), STATSBEAT_CONNECTION_STRING);

generateMetric(openTelemetrySdk);

// close to flush
openTelemetrySdk.close();

Thread.sleep(1000);

// wait for export
countDownLatch.await(10, SECONDS);
assertThat(customValidationPolicy.url)
.isEqualTo(new URL("https://westus-0.in.applicationinsights.azure.com/v2.1/track"));
assertThat(customValidationPolicy.actualTelemetryItems.stream().filter(item -> item.getName().equals("Statsbeat")).count()).isEqualTo(0);
}

private static Map<String, String> getConfiguration() {
return Collections.singletonMap("APPLICATIONINSIGHTS_CONNECTION_STRING", STATSBEAT_CONNECTION_STRING);
}

private static Map<String, String> getStatsbeatConfiguration() {
Map<String, String> map = new HashMap<>(3);
map.put("APPLICATIONINSIGHTS_CONNECTION_STRING", CONNECTION_STRING_ENV);
map.put("STATSBEAT_LONG_INTERVAL_SECONDS_PROPERTY_NAME", "1");
map.put("STATSBEAT_SHORT_INTERVAL_SECONDS_PROPERTY_NAME", "1");
return map;
}


@SuppressWarnings("try")
private static void generateSpan(OpenTelemetry openTelemetry) {
Tracer tracer = openTelemetry.getTracer("Sample");
Expand Down Expand Up @@ -327,22 +222,6 @@ private static void validateSpan(TelemetryItem telemetryItem) {
.containsExactly(entry("color", "red"), entry("name", "apple"));
}

private static void validateAttachStatsbeat(TelemetryItem telemetryItem) {
assertThat(telemetryItem.getData().getBaseType()).isEqualTo("MetricData");
MetricsData actualMetricsData = (MetricsData) telemetryItem.getData().getBaseData();
assertThat(actualMetricsData.getMetrics().get(0).getName()).isEqualTo("Attach");
assertThat(actualMetricsData.getProperties()).contains(entry("rp", "unknown"), entry("attach", "Manual"), entry("language", "java"));
assertThat(actualMetricsData.getProperties()).containsKeys("attach", "cikey", "language", "os", "rp", "runtimeVersion", "version");
}

private static void validateFeatureStatsbeat(TelemetryItem telemetryItem) {
assertThat(telemetryItem.getData().getBaseType()).isEqualTo("MetricData");
MetricsData actualMetricsData = (MetricsData) telemetryItem.getData().getBaseData();
assertThat(actualMetricsData.getMetrics().get(0).getName()).isEqualTo("Feature");
assertThat(actualMetricsData.getProperties()).contains(entry("type", "0"), entry("language", "java"));
assertThat(actualMetricsData.getProperties()).containsKeys("feature", "cikey", "language", "os", "rp", "runtimeVersion", "version");
}

private static void validateMetric(TelemetryItem telemetryItem) {
assertThat(telemetryItem.getInstrumentationKey()).isEqualTo(INSTRUMENTATION_KEY);
assertThat(telemetryItem.getTags()).containsEntry("ai.cloud.role", "unknown_service:java");
Expand Down Expand Up @@ -373,83 +252,7 @@ private static void validateLog(TelemetryItem telemetryItem) {
entry("name", "apple"));
}

private static class CustomValidationPolicy implements HttpPipelinePolicy {

private final CountDownLatch countDown;
private volatile URL url;
private final List<TelemetryItem> actualTelemetryItems = new CopyOnWriteArrayList<>();

CustomValidationPolicy(CountDownLatch countDown) {
this.countDown = countDown;
}

@Override
public Mono<HttpResponse> process(
HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
url = context.getHttpRequest().getUrl();
Mono<String> asyncBytes =
FluxUtil.collectBytesInByteBufferStream(context.getHttpRequest().getBody())
.map(CustomValidationPolicy::ungzip);
asyncBytes.subscribe(
value -> {
ObjectMapper objectMapper = createObjectMapper();
try (MappingIterator<TelemetryItem> i =
objectMapper.readerFor(TelemetryItem.class).readValues(value)) {
while (i.hasNext()) {
actualTelemetryItems.add(i.next());
}
countDown.countDown();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
return next.process();
}

// decode gzipped request raw bytes back to original request body
private static String ungzip(byte[] rawBytes) {
if (rawBytes.length == 0) {
return "";
}
try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(rawBytes))) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] data = new byte[1024];
int read;
while ((read = in.read(data, 0, data.length)) != -1) {
baos.write(data, 0, read);
}
return new String(baos.toByteArray(), StandardCharsets.UTF_8);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static ObjectMapper createObjectMapper() {
ObjectMapper objectMapper = new ObjectMapper();
// handle JSR-310 (java 8) dates with Jackson by configuring ObjectMapper to use this
// dependency and not (de)serialize Instant as timestamps that it does by default
objectMapper.findAndRegisterModules().disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
return objectMapper;
}
}

private static class MockedHttpClient implements HttpClient {

private final AtomicInteger count = new AtomicInteger();
private final Function<HttpRequest, Mono<HttpResponse>> handler;

MockedHttpClient(Function<HttpRequest, Mono<HttpResponse>> handler) {
this.handler = handler;
}

@Override
public Mono<HttpResponse> send(HttpRequest httpRequest) {
count.getAndIncrement();
return handler.apply(httpRequest);
}

int getCount() {
return count.get();
}
private static Map<String, String> getConfiguration() {
return Collections.singletonMap("APPLICATIONINSIGHTS_CONNECTION_STRING", CONNECTION_STRING_ENV);
}
}
Loading