
OBSDATA-483: Adapt OpenCensus and OpenTelemetry extensions to the introduction of SettableByteEntity #113

Merged
17 commits
5b84d27
OBSDATA-483: Adapt opencensus extension to the introduction of Settab…
kkonstantine Oct 11, 2022
29c58c0
OBSDATA-483: Adapt opentelemetry extension to the introduction of Set…
kkonstantine Oct 13, 2022
ea8ad43
OBSDATA-483: Decide which reader to instantiate on read between openc…
kkonstantine Oct 13, 2022
f4f2afa
OBSDATA-483: Add logger config in opencensus tests
kkonstantine Oct 13, 2022
0689681
OBSDATA-483: Fix issue with opening the byte entity
kkonstantine Oct 26, 2022
83fdd00
OBSDATA-483: Instantiate the right iterator in every read request
kkonstantine Oct 27, 2022
42ab806
OBSDATA-483: Add comments
kkonstantine Nov 1, 2022
163b878
OBSDATA-483: Address Xavier's comments
kkonstantine Dec 12, 2022
efa5794
OBSDATA-483: Remove unused member fields
kkonstantine Dec 15, 2022
ecd10e3
OBSDATA-483: Rename enum
kkonstantine Dec 16, 2022
714405c
OBSDATA-483: Fix trace log to actually print the argument
kkonstantine Jan 3, 2023
1e9f30e
OBSDATA-483: Keep passing the underlying byte buffer and move its pos…
kkonstantine Jan 3, 2023
262f759
OBSDATA-483: Fix checkstyle issues
kkonstantine Jan 3, 2023
f5534f0
OBSDATA-483: Add back handling of InvalidProtocolBufferException
kkonstantine Jan 4, 2023
29a63d0
OBSDATA-483: Extend the semaphore workflow execution time to 2 hours
kkonstantine Jan 4, 2023
79c22bc
Revert "OBSDATA-483: Extend the semaphore workflow execution time to …
kkonstantine Jan 4, 2023
ebc5909
OBSDATA-483: Don't close iterator in sample
kkonstantine Jan 4, 2023
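
The thread running through these commits: the Kafka indexing path now builds one InputEntityReader up front and pushes each record's bytes through a mutable SettableByteEntity, instead of constructing a fresh reader per record. A minimal sketch of that consumption pattern, assuming only the Druid APIs visible in the diffs below (the payload list and row handler are hypothetical stand-ins, not Druid code):

import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.indexing.seekablestream.SettableByteEntity;
import org.apache.druid.java.util.common.parsers.CloseableIterator;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Consumer;

class SettableByteEntitySketch
{
  static void consume(
      InputFormat format,
      InputRowSchema schema,
      File tmpDir,
      List<ByteBuffer> payloads,     // hypothetical stand-in for polled Kafka record values
      Consumer<InputRow> handler     // hypothetical downstream row handler
  ) throws IOException
  {
    // One entity and one reader for the whole stream.
    SettableByteEntity<ByteEntity> entity = new SettableByteEntity<>();
    InputEntityReader reader = format.createReader(schema, entity, tmpDir);
    for (ByteBuffer payload : payloads) {
      // Swap the next record's bytes into the same entity...
      entity.setEntity(new ByteEntity(payload));
      // ...and read again; the reader observes the new record through the wrapper.
      try (CloseableIterator<InputRow> rows = reader.read()) {
        while (rows.hasNext()) {
          handler.accept(rows.next());
        }
      }
    }
  }
}

Because the reader holds only a reference to the wrapper, each read() must re-fetch the current record via source.getEntity(), which is exactly what the new HybridProtobufReader below does on every call.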
6 changes: 6 additions & 0 deletions extensions-contrib/opencensus-extensions/pom.xml
@@ -55,6 +55,12 @@
       <version>${project.parent.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-indexing-service</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>io.opentelemetry.proto</groupId>
       <artifactId>opentelemetry-proto</artifactId>
137 changes: 137 additions & 0 deletions extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/HybridProtobufReader.java
@@ -0,0 +1,137 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.data.input.opencensus.protobuf;

import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.KafkaUtils;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.opentelemetry.protobuf.OpenTelemetryMetricsProtobufReader;
import org.apache.druid.indexing.seekablestream.SettableByteEntity;
import org.apache.druid.java.util.common.parsers.CloseableIterator;

import java.io.IOException;
import java.lang.invoke.MethodHandle;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class HybridProtobufReader implements InputEntityReader
{
  private static final String VERSION_HEADER_KEY = "v";
  private static final int OPENTELEMETRY_FORMAT_VERSION = 1;

  private final DimensionsSpec dimensionsSpec;
  private final SettableByteEntity<? extends ByteEntity> source;
  private final String metricDimension;
  private final String valueDimension;
  private final String metricLabelPrefix;
  private final String resourceLabelPrefix;

  private volatile MethodHandle getHeaderMethod = null;

  enum ProtobufReader
  {
    OPENCENSUS,
    OPENTELEMETRY
  }

  public HybridProtobufReader(
      DimensionsSpec dimensionsSpec,
      SettableByteEntity<? extends ByteEntity> source,
      String metricDimension,
      String valueDimension,
      String metricLabelPrefix,
      String resourceLabelPrefix
  )
  {
    this.dimensionsSpec = dimensionsSpec;
    this.source = source;
    this.metricDimension = metricDimension;
    this.valueDimension = valueDimension;
    this.metricLabelPrefix = metricLabelPrefix;
    this.resourceLabelPrefix = resourceLabelPrefix;
  }

  @Override
  public CloseableIterator<InputRow> read() throws IOException
  {
    return newReader(whichReader()).read();
  }

  public InputEntityReader newReader(ProtobufReader which)
  {
    switch (which) {
      case OPENTELEMETRY:
        return new OpenTelemetryMetricsProtobufReader(
            dimensionsSpec,
            source,
            metricDimension,
            valueDimension,
            metricLabelPrefix,
            resourceLabelPrefix
        );
      case OPENCENSUS:
      default:
        return new OpenCensusProtobufReader(
            dimensionsSpec,
            source,
            metricDimension,
            metricLabelPrefix,
            resourceLabelPrefix
        );
    }
  }

  public ProtobufReader whichReader()
  {
    // assume InputEntity is always defined in a single classloader (the kafka-indexing-service classloader)
    // so we only have to look it up once. To be completely correct we should cache the method based on classloader
    if (getHeaderMethod == null) {
      getHeaderMethod = KafkaUtils.lookupGetHeaderMethod(
          source.getEntity().getClass().getClassLoader(),
          VERSION_HEADER_KEY
      );
    }

    try {
      byte[] versionHeader = (byte[]) getHeaderMethod.invoke(source.getEntity());
      if (versionHeader != null) {
        int version =
            ByteBuffer.wrap(versionHeader).order(ByteOrder.LITTLE_ENDIAN).getInt();
        if (version == OPENTELEMETRY_FORMAT_VERSION) {
          return ProtobufReader.OPENTELEMETRY;
        }
      }
    }
    catch (Throwable t) {
      // assume input is opencensus if something went wrong
    }
    return ProtobufReader.OPENCENSUS;
  }

  @Override
  public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
  {
    return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent()));
  }
}
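
For reference, whichReader() keys off a Kafka record header named "v" that carries the format version as a little-endian 32-bit integer. A producer-side sketch of tagging a record as OpenTelemetry format version 1, assuming the standard Kafka client API (the topic name and payload are placeholders, not part of this PR):

import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

class VersionHeaderSketch
{
  static ProducerRecord<byte[], byte[]> tagAsOpenTelemetry(byte[] payload)
  {
    // Little-endian int 1 matches OPENTELEMETRY_FORMAT_VERSION above.
    byte[] version = ByteBuffer.allocate(Integer.BYTES)
                               .order(ByteOrder.LITTLE_ENDIAN)
                               .putInt(1)
                               .array();
    ProducerRecord<byte[], byte[]> record =
        new ProducerRecord<>("metrics-topic", payload);  // hypothetical topic
    record.headers().add("v", version);
    return record;
  }
}

Records without the header, and any failure while reading it, fall back to the OpenCensus reader, mirroring the catch (Throwable t) above.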
extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputFormat.java
@@ -24,33 +24,25 @@
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRowSchema;
-import org.apache.druid.data.input.KafkaUtils;
 import org.apache.druid.data.input.impl.ByteEntity;
-import org.apache.druid.data.input.opentelemetry.protobuf.OpenTelemetryMetricsProtobufReader;
+import org.apache.druid.indexing.seekablestream.SettableByteEntity;
 import org.apache.druid.java.util.common.StringUtils;
 
 import javax.annotation.Nullable;
 import java.io.File;
-import java.lang.invoke.MethodHandle;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.util.Objects;
 
 public class OpenCensusProtobufInputFormat implements InputFormat
 {
   private static final String DEFAULT_METRIC_DIMENSION = "name";
   private static final String DEFAULT_RESOURCE_PREFIX = "resource.";
   private static final String DEFAULT_VALUE_DIMENSION = "value";
-  private static final String VERSION_HEADER_KEY = "v";
-  private static final int OPENTELEMETRY_FORMAT_VERSION = 1;
 
   private final String metricDimension;
   private final String valueDimension;
   private final String metricLabelPrefix;
   private final String resourceLabelPrefix;
 
-  private volatile MethodHandle getHeaderMethod = null;
-
   public OpenCensusProtobufInputFormat(
       @JsonProperty("metricDimension") String metricDimension,
       @JsonProperty("valueDimension") @Nullable String valueDimension,
@@ -73,41 +65,21 @@ public boolean isSplittable()
   @Override
   public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
   {
-    // assume InputEntity is always defined in a single classloader (the kafka-indexing-service classloader)
-    // so we only have to look it up once. To be completely correct we should cache the method based on classloader
-    if (getHeaderMethod == null) {
-      getHeaderMethod = KafkaUtils.lookupGetHeaderMethod(
-          source.getClass().getClassLoader(),
-          OpenCensusProtobufInputFormat.VERSION_HEADER_KEY
-      );
-    }
-
-    try {
-      byte[] versionHeader = (byte[]) getHeaderMethod.invoke(source);
-      if (versionHeader != null) {
-        int version =
-            ByteBuffer.wrap(versionHeader).order(ByteOrder.LITTLE_ENDIAN).getInt();
-        if (version == OPENTELEMETRY_FORMAT_VERSION) {
-          return new OpenTelemetryMetricsProtobufReader(
-              inputRowSchema.getDimensionsSpec(),
-              (ByteEntity) source,
-              metricDimension,
-              valueDimension,
-              metricLabelPrefix,
-              resourceLabelPrefix
-          );
-        }
-      }
-    }
-    catch (Throwable t) {
-      // assume input is opencensus if something went wrong
-    }
-
-    return new OpenCensusProtobufReader(
+    // Sampler passes a KafkaRecordEntity directly, while the normal code path wraps the same entity in a
+    // SettableByteEntity
+    SettableByteEntity<? extends ByteEntity> settableEntity;
+    if (source instanceof SettableByteEntity) {
+      settableEntity = (SettableByteEntity<? extends ByteEntity>) source;
+    } else {
+      SettableByteEntity<ByteEntity> wrapper = new SettableByteEntity<>();
+      wrapper.setEntity((ByteEntity) source);
+      settableEntity = wrapper;
+    }
+    return new HybridProtobufReader(
         inputRowSchema.getDimensionsSpec(),
-        (ByteEntity) source,
+        settableEntity,
         metricDimension,
         valueDimension,
         metricLabelPrefix,
         resourceLabelPrefix
     );
extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputRowParser.java
@@ -26,6 +26,7 @@
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.impl.ByteEntity;
 import org.apache.druid.data.input.impl.ParseSpec;
+import org.apache.druid.indexing.seekablestream.SettableByteEntity;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;

@@ -103,9 +104,11 @@ public OpenCensusProtobufInputRowParser withParseSpec(ParseSpec parseSpec)
   @Override
   public List<InputRow> parseBatch(ByteBuffer input)
   {
+    SettableByteEntity<ByteEntity> settableByteEntity = new SettableByteEntity<>();
+    settableByteEntity.setEntity(new ByteEntity(input));
     return new OpenCensusProtobufReader(
         parseSpec.getDimensionsSpec(),
-        new ByteEntity(input),
+        settableByteEntity,
         metricDimension,
         metricLabelPrefix,
         resourceLabelPrefix
extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufReader.java
@@ -36,11 +36,13 @@
 import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.data.input.impl.ByteEntity;
 import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.indexing.seekablestream.SettableByteEntity;
 import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.utils.CollectionUtils;
 
+import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -55,14 +57,14 @@ public class OpenCensusProtobufReader implements InputEntityReader
   private static final String VALUE_COLUMN = "value";
 
   private final DimensionsSpec dimensionsSpec;
-  private final ByteEntity source;
+  private final SettableByteEntity<? extends ByteEntity> source;
   private final String metricDimension;
   private final String metricLabelPrefix;
   private final String resourceLabelPrefix;
 
   public OpenCensusProtobufReader(
       DimensionsSpec dimensionsSpec,
-      ByteEntity source,
+      SettableByteEntity<? extends ByteEntity> source,
       String metricDimension,
       String metricLabelPrefix,
       String resourceLabelPrefix
@@ -101,7 +103,12 @@ public InputRow next()
   List<InputRow> readAsList()
   {
     try {
-      return parseMetric(Metric.parseFrom(source.getBuffer()));
+      ByteBuffer buffer = source.getEntity().getBuffer();
+      List<InputRow> rows = parseMetric(Metric.parseFrom(buffer));
+      // Explicitly move the position assuming that all the remaining bytes have been consumed because the protobuf
+      // parser does not update the position itself
+      buffer.position(buffer.limit());
+      return rows;
     }
     catch (InvalidProtocolBufferException e) {
       throw new ParseException(null, e, "Protobuf message could not be parsed");
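
The position bookkeeping in readAsList() exists because, per the comment in the change above, the generated Metric.parseFrom(ByteBuffer) reads between the buffer's position and limit without advancing the position, while SettableByteEntity uses the position to track how much of the record has been consumed. A small sketch isolating that pitfall (the class and method names are illustrative; the Metric import assumes the opencensus-proto package this extension builds against):

import com.google.protobuf.InvalidProtocolBufferException;
import io.opencensus.proto.metrics.v1.Metric;

import java.nio.ByteBuffer;

class BufferPositionSketch
{
  static Metric parseAndConsume(ByteBuffer buffer) throws InvalidProtocolBufferException
  {
    int before = buffer.position();
    Metric metric = Metric.parseFrom(buffer);  // parses position..limit
    assert buffer.position() == before;        // the parser leaves position untouched
    buffer.position(buffer.limit());           // so mark the bytes consumed explicitly
    return metric;
  }
}

Without the explicit position update, a wrapper that keeps handing out the same buffer would re-parse the same record on the next read.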