Introduce otlp/http support #5322

Draft · wants to merge 20 commits into base: main

@@ -6,15 +6,16 @@
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

class GrpcRetryInfoCalculator {
// todo tlongo rename
public class GrpcRetryInfoCalculator {

private final Duration minimumDelay;
private final Duration maximumDelay;

private final AtomicReference<Instant> lastTimeCalled;
private final AtomicReference<Duration> nextDelay;

GrpcRetryInfoCalculator(Duration minimumDelay, Duration maximumDelay) {
public GrpcRetryInfoCalculator(Duration minimumDelay, Duration maximumDelay) {
this.minimumDelay = minimumDelay;
this.maximumDelay = maximumDelay;
// Create a cushion so that the calculator treats a first quick exception (after prepper startup) as a normal request (i.e. it does not calculate a backoff)
@@ -34,7 +35,7 @@ private static com.google.protobuf.Duration.Builder mapDuration(Duration duratio
return com.google.protobuf.Duration.newBuilder().setSeconds(duration.getSeconds()).setNanos(duration.getNano());
}

RetryInfo createRetryInfo() {
public RetryInfo createRetryInfo() {
Instant now = Instant.now();
// Is the last time we got called longer ago than the next delay?
if (lastTimeCalled.getAndSet(now).isBefore(now.minus(nextDelay.get()))) {

@@ -45,8 +45,6 @@
import io.opentelemetry.proto.resource.v1.Resource;
import io.opentelemetry.proto.trace.v1.ScopeSpans;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.proto.trace.v1.InstrumentationLibrarySpans;
import io.opentelemetry.proto.common.v1.InstrumentationLibrary;
import io.opentelemetry.proto.common.v1.InstrumentationScope;
import io.opentelemetry.proto.metrics.v1.Gauge;
import io.opentelemetry.proto.metrics.v1.Sum;
@@ -407,24 +405,9 @@ private ExportTraceServiceRequest createExportTraceRequest() {
.build())
.build();

final InstrumentationLibrarySpans ilSpans = InstrumentationLibrarySpans.newBuilder()
.setInstrumentationLibrary(InstrumentationLibrary.newBuilder()
.setName(ilName)
.setVersion(ilVersion)
.build())
.addSpans(io.opentelemetry.proto.trace.v1.Span.newBuilder()
.setTraceId(ByteString.copyFrom(TraceId2.getBytes()))
.setSpanId(ByteString.copyFrom(SpanId2.getBytes()))
.setKind(io.opentelemetry.proto.trace.v1.Span.SpanKind.SPAN_KIND_INTERNAL)
.setName(ilSpanName)
.setStartTimeUnixNano(currentUnixTimeNano)
.setEndTimeUnixNano(currentUnixTimeNano+TIME_DELTA*1000_000_000)
.build())
.build();
ResourceSpans resourceSpans = ResourceSpans.newBuilder()
.setResource(resource)
.addScopeSpans(scopeSpans)
.addInstrumentationLibrarySpans(ilSpans)
Collaborator: I know that the instrumentation library was migrated to the instrumentation scope. Is this removal required to address the changed data model?

@TomasLongo (Author), Jan 17, 2025: Yes, InstrumentationLibrarySpans has been removed. Basically, the hierarchy changed: in 0.16.x, InstrumentationLibrarySpans carried the span information; now the instrumentation information lives next to the span information inside the ScopeSpans object (see the sketch after this hunk).

.build();

return ExportTraceServiceRequest.newBuilder()
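For illustration, a minimal sketch of the new hierarchy described in the review thread above, assuming only the opentelemetry-proto classes already used in these tests; the class name and literal values are placeholders, not code from this PR. The instrumentation metadata that used to live in InstrumentationLibrary (inside InstrumentationLibrarySpans) is now an InstrumentationScope attached directly to ScopeSpans, next to the spans themselves:

import io.opentelemetry.proto.common.v1.InstrumentationScope;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.proto.trace.v1.ScopeSpans;
import io.opentelemetry.proto.trace.v1.Span;

public class ScopeSpansSketch {
    public static void main(String[] args) {
        // Scope metadata (name/version) sits directly on ScopeSpans...
        ScopeSpans scopeSpans = ScopeSpans.newBuilder()
                .setScope(InstrumentationScope.newBuilder()
                        .setName("example-instrumentation") // placeholder name
                        .setVersion("1.0.0")                // placeholder version
                        .build())
                // ...next to the spans that this scope produced.
                .addSpans(Span.newBuilder()
                        .setName("example-span")
                        .build())
                .build();

        // ResourceSpans then aggregates ScopeSpans, as in the updated test above.
        ResourceSpans resourceSpans = ResourceSpans.newBuilder()
                .addScopeSpans(scopeSpans)
                .build();

        System.out.println(resourceSpans);
    }
}

In 0.16.x the same information was carried by InstrumentationLibrarySpans with a nested InstrumentationLibrary; as noted in the reply above, that type has since been removed from the proto definitions.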

@@ -12,9 +12,10 @@
import io.micrometer.core.instrument.Timer;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse;
import io.opentelemetry.proto.logs.v1.InstrumentationLibraryLogs;
import io.opentelemetry.proto.logs.v1.LogRecord;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.proto.logs.v1.ScopeLogs;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -61,9 +62,8 @@
public class OTelLogsGrpcServiceTest {
private static final ExportLogsServiceRequest LOGS_REQUEST = ExportLogsServiceRequest.newBuilder()
.addResourceLogs(ResourceLogs.newBuilder()
.addInstrumentationLibraryLogs(InstrumentationLibraryLogs.newBuilder()
.addLogRecords(LogRecord.newBuilder())
.build())).build();
.addScopeLogs(ScopeLogs.newBuilder().addLogRecords(LogRecord.newBuilder()).build()))
.build();

private static PluginSetting pluginSetting;
private final int bufferWriteTimeoutInMillis = 100000;

@@ -8,30 +8,21 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.InstrumentationLibrary;
import io.opentelemetry.proto.common.v1.InstrumentationScope;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
import io.opentelemetry.proto.resource.v1.Resource;
import org.apache.commons.codec.binary.Hex;
import org.opensearch.dataprepper.model.metric.Bucket;
import org.opensearch.dataprepper.model.metric.DefaultBucket;
import org.opensearch.dataprepper.model.metric.DefaultExemplar;
import org.opensearch.dataprepper.model.metric.DefaultQuantile;
import org.opensearch.dataprepper.model.metric.Exemplar;
import org.opensearch.dataprepper.model.metric.Quantile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -40,14 +31,7 @@ public final class OTelMetricsProtoHelper {

private static final Logger LOG = LoggerFactory.getLogger(OTelMetricsProtoHelper.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String SERVICE_NAME = "service.name";
private static final String METRIC_ATTRIBUTES = "metric.attributes";
static final String RESOURCE_ATTRIBUTES = "resource.attributes";
static final String EXEMPLAR_ATTRIBUTES = "exemplar.attributes";
static final String INSTRUMENTATION_LIBRARY_NAME = "instrumentationLibrary.name";
static final String INSTRUMENTATION_LIBRARY_VERSION = "instrumentationLibrary.version";
static final String INSTRUMENTATION_SCOPE_NAME = "instrumentationScope.name";
static final String INSTRUMENTATION_SCOPE_VERSION = "instrumentationScope.version";

/**
* To make it ES friendly we will replace '.' in keys with '@' in all the Keys in {@link io.opentelemetry.proto.common.v1.KeyValue}
@@ -60,8 +44,6 @@ public final class OTelMetricsProtoHelper {
* Span and Resource attributes are essential for kibana so they should not be nested. SO we will prefix them with "metric.attributes"
* and "resource.attributes" and "exemplar.attributes".
*/
public static final Function<String, String> PREFIX_AND_METRIC_ATTRIBUTES_REPLACE_DOT_WITH_AT = i -> METRIC_ATTRIBUTES + DOT + i.replace(DOT, AT);
public static final Function<String, String> PREFIX_AND_RESOURCE_ATTRIBUTES_REPLACE_DOT_WITH_AT = i -> RESOURCE_ATTRIBUTES + DOT + i.replace(DOT, AT);
public static final Function<String, String> PREFIX_AND_EXEMPLAR_ATTRIBUTES_REPLACE_DOT_WITH_AT = i -> EXEMPLAR_ATTRIBUTES + DOT + i.replace(DOT, AT);

private OTelMetricsProtoHelper() {
@@ -111,31 +93,6 @@ public static Object convertAnyValue(final AnyValue value) {
}
}

/**
* Converts the keys of all attributes in the {@link NumberDataPoint}.
* Also, casts the underlying data into its actual type
*
* @param numberDataPoint The point to process
* @return A Map containing all attributes of `numberDataPoint` with keys converted into an OS-friendly format
*/
public static Map<String, Object> convertKeysOfDataPointAttributes(final NumberDataPoint numberDataPoint) {
return numberDataPoint.getAttributesList().stream()
.collect(Collectors.toMap(i -> PREFIX_AND_METRIC_ATTRIBUTES_REPLACE_DOT_WITH_AT.apply(i.getKey()), i -> convertAnyValue(i.getValue())));
}

/**
* Unpacks the List of {@link KeyValue} object into a Map.
* <p>
* Converts the keys into an os friendly format and casts the underlying data into its actual type?
*
* @param attributesList The list of {@link KeyValue} objects to process
* @return A Map containing unpacked {@link KeyValue} data
*/
public static Map<String, Object> unpackKeyValueList(List<KeyValue> attributesList) {
return attributesList.stream()
.collect(Collectors.toMap(i -> PREFIX_AND_METRIC_ATTRIBUTES_REPLACE_DOT_WITH_AT.apply(i.getKey()), i -> convertAnyValue(i.getValue())));
}

/**
* Unpacks the List of {@link KeyValue} object into a Map.
* <p>
@@ -186,77 +143,11 @@ public static Double getExemplarValueAsDouble(final io.opentelemetry.proto.metri
}
}

public static Map<String, Object> getResourceAttributes(final Resource resource) {
return resource.getAttributesList().stream()
.collect(Collectors.toMap(i -> PREFIX_AND_RESOURCE_ATTRIBUTES_REPLACE_DOT_WITH_AT.apply(i.getKey()), i -> convertAnyValue(i.getValue())));
}

/**
* Extracts the name and version of the used instrumentation library used
*
* @return A map, containing information about the instrumentation library
*/
public static Map<String, Object> getInstrumentationLibraryAttributes(final InstrumentationLibrary instrumentationLibrary) {
final Map<String, Object> instrumentationAttr = new HashMap<>();
if (!instrumentationLibrary.getName().isEmpty()) {
instrumentationAttr.put(INSTRUMENTATION_LIBRARY_NAME, instrumentationLibrary.getName());
}
if (!instrumentationLibrary.getVersion().isEmpty()) {
instrumentationAttr.put(INSTRUMENTATION_LIBRARY_VERSION, instrumentationLibrary.getVersion());
}
return instrumentationAttr;
}

/**
* Extracts the name and version of the used instrumentation scope used
*
* @return A map, containing information about the instrumentation scope
*/
public static Map<String, Object> getInstrumentationScopeAttributes(final InstrumentationScope instrumentationScope) {
final Map<String, Object> instrumentationScopeAttr = new HashMap<>();
if (!instrumentationScope.getName().isEmpty()) {
instrumentationScopeAttr.put(INSTRUMENTATION_SCOPE_NAME, instrumentationScope.getName());
}
if (!instrumentationScope.getVersion().isEmpty()) {
instrumentationScopeAttr.put(INSTRUMENTATION_SCOPE_VERSION, instrumentationScope.getVersion());
}
return instrumentationScopeAttr;
}


public static String convertUnixNanosToISO8601(final long unixNano) {
return Instant.ofEpochSecond(0L, unixNano).toString();
}

public static String getStartTimeISO8601(final NumberDataPoint numberDataPoint) {
return convertUnixNanosToISO8601(numberDataPoint.getStartTimeUnixNano());
}

public static String getTimeISO8601(final NumberDataPoint ndp) {
return convertUnixNanosToISO8601(ndp.getTimeUnixNano());
}

public static Optional<String> getServiceName(final Resource resource) {
return resource.getAttributesList().stream()
.filter(keyValue -> keyValue.getKey().equals(SERVICE_NAME) && !keyValue.getValue().getStringValue().isEmpty())
.findFirst()
.map(i -> i.getValue().getStringValue());
}


public static Map<String, Object> mergeAllAttributes(final Collection<Map<String, Object>> attributes) {
return attributes.stream()
.flatMap(map -> map.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}


public static List<Quantile> getQuantileValues(List<SummaryDataPoint.ValueAtQuantile> quantileValues) {
return quantileValues.stream()
.map(q -> new DefaultQuantile(q.getQuantile(), q.getValue()))
.collect(Collectors.toList());
}

/**
* Create the buckets, see <a href="https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/metrics/v1/metrics.proto">
* the OTel metrics proto spec</a>

@@ -12,8 +12,8 @@
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.ExponentialHistogram;
import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint;
import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
import io.opentelemetry.proto.resource.v1.Resource;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
@@ -139,7 +139,7 @@ private ExportMetricsServiceRequest fillServiceRequest(ExponentialHistogram hist
.setName("name")
.setDescription("description")
.build();
InstrumentationLibraryMetrics instLib = InstrumentationLibraryMetrics.newBuilder()
ScopeMetrics scopeMetrics = ScopeMetrics.newBuilder()
.addMetrics(metric).build();

Resource resource = Resource.newBuilder()
@@ -149,7 +149,7 @@ private ExportMetricsServiceRequest fillServiceRequest(ExponentialHistogram hist
).build();
ResourceMetrics resourceMetrics = ResourceMetrics.newBuilder()
.setResource(resource)
.addInstrumentationLibraryMetrics(instLib)
.addScopeMetrics(scopeMetrics)
.build();
return ExportMetricsServiceRequest.newBuilder().addResourceMetrics(resourceMetrics).build();
}

@@ -10,12 +10,10 @@
import com.google.protobuf.ByteString;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.InstrumentationLibrary;
import io.opentelemetry.proto.common.v1.InstrumentationScope;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.Exemplar;
import io.opentelemetry.proto.metrics.v1.Gauge;
import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
@@ -71,60 +69,6 @@ void init() {
rawProcessor = new OTelMetricsRawProcessor(testsettings, new OtelMetricsRawProcessorConfig());
}

@Test
void testInstrumentationLibrary() throws JsonProcessingException {
Collaborator: Wouldn't this test still be required for backward compatibility?

Collaborator: @KarstenSchnitter the spec change happened about 4 years ago. I do not think we need to provide backward compatibility. @dlvenable what do you think?

Collaborator: I am not sure whether any customers are using the older protocol. In fact, the behavior has been broken for new agents for a long time.

Collaborator: If it is broken anyway, I am fine with dropping backward compatibility.

NumberDataPoint.Builder p1 = NumberDataPoint.newBuilder().setAsInt(4);
Gauge gauge = Gauge.newBuilder().addDataPoints(p1).build();

io.opentelemetry.proto.metrics.v1.Metric.Builder metric = io.opentelemetry.proto.metrics.v1.Metric.newBuilder()
.setGauge(gauge)
.setUnit("seconds")
.setName("name")
.setDescription("description");

InstrumentationLibraryMetrics isntLib = InstrumentationLibraryMetrics.newBuilder()
.addMetrics(metric)
.setInstrumentationLibrary(InstrumentationLibrary.newBuilder()
.setName("ilname")
.setVersion("ilversion")
.build())
.build();

Resource resource = Resource.newBuilder()
.addAttributes(KeyValue.newBuilder()
.setKey("service.name")
.setValue(AnyValue.newBuilder().setStringValue("service").build())
).build();

ResourceMetrics resourceMetrics = ResourceMetrics.newBuilder()
.addInstrumentationLibraryMetrics(isntLib)
.setResource(resource)
.build();

ExportMetricsServiceRequest exportMetricRequest = ExportMetricsServiceRequest.newBuilder()
.addResourceMetrics(resourceMetrics).build();

Record<ExportMetricsServiceRequest> record = new Record<>(exportMetricRequest);

Collection<Record<? extends Metric>> records = rawProcessor.doExecute(Collections.singletonList(record));
List<Record<? extends Metric>> list = new ArrayList<>(records);

Record<? extends Metric> dataPrepperResult = list.get(0);
ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> map = objectMapper.readValue(dataPrepperResult.getData().toJsonString(), Map.class);
assertThat(map).contains(entry("kind", Metric.KIND.GAUGE.toString()));
assertThat(map).contains(entry("unit", "seconds"));
assertThat(map).contains(entry("serviceName", "service"));
assertThat(map).contains(entry("resource.attributes.service@name", "service"));
assertThat(map).contains(entry("description", "description"));
assertThat(map).contains(entry("value", 4.0D));
assertThat(map).contains(entry("startTime", "1970-01-01T00:00:00Z"));
assertThat(map).contains(entry("time", "1970-01-01T00:00:00Z"));
assertThat(map).contains(entry("instrumentationLibrary.name", "ilname"));
assertThat(map).contains(entry("instrumentationLibrary.version", "ilversion"));

}

@Test
void testScopeMetricsLibrary() throws JsonProcessingException {
NumberDataPoint.Builder p1 = NumberDataPoint.newBuilder().setAsInt(4);

@@ -12,8 +12,8 @@
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.Histogram;
import io.opentelemetry.proto.metrics.v1.HistogramDataPoint;
import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
import io.opentelemetry.proto.resource.v1.Resource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -119,7 +119,7 @@ private ExportMetricsServiceRequest fillServiceRequest(Histogram histogram) {
.setName("name")
.setDescription("description")
.build();
InstrumentationLibraryMetrics instLib = InstrumentationLibraryMetrics.newBuilder()
ScopeMetrics scopeMetrics = ScopeMetrics.newBuilder()
.addMetrics(metric).build();

Resource resource = Resource.newBuilder()
@@ -129,7 +129,7 @@ private ExportMetricsServiceRequest fillServiceRequest(Histogram histogram) {
).build();
ResourceMetrics resourceMetrics = ResourceMetrics.newBuilder()
.setResource(resource)
.addInstrumentationLibraryMetrics(instLib)
.addScopeMetrics(scopeMetrics)
.build();
return ExportMetricsServiceRequest.newBuilder().addResourceMetrics(resourceMetrics).build();
}