Skip to content

Commit

Permalink
OBSDATA-483: Decide which reader to instantiate on read between opencensus and opentelemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
kkonstantine committed Oct 13, 2022
1 parent 34a1a40 commit ac49c60
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,42 +74,11 @@ public boolean isSplittable()
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
  // Format detection (OpenCensus vs OpenTelemetry via the Kafka "v" header) now
  // happens lazily inside OpenCensusProtobufReader.newReader(), so this method
  // only wraps the source. The cast is safe in practice: the kafka indexing
  // service always hands a SettableByteEntity to input formats.
  return new OpenCensusProtobufReader(
      inputRowSchema.getDimensionsSpec(),
      (SettableByteEntity<? extends ByteEntity>) source,
      metricDimension,
      valueDimension,
      metricLabelPrefix,
      resourceLabelPrefix
  );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,13 @@ public OpenCensusProtobufInputRowParser withParseSpec(ParseSpec parseSpec)
@Override
public List<InputRow> parseBatch(ByteBuffer input)
{
  // The reader now consumes a SettableByteEntity, so wrap the raw buffer in one
  // before delegating. valueDimension is null here: the legacy parser never
  // exposed a separate value dimension.
  SettableByteEntity<ByteEntity> settableByteEntity = new SettableByteEntity<>();
  settableByteEntity.setEntity(new ByteEntity(input));
  return new OpenCensusProtobufReader(
      parseSpec.getDimensionsSpec(),
      settableByteEntity,
      metricDimension,
      null,
      metricLabelPrefix,
      resourceLabelPrefix
  ).readAsList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,21 @@
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.KafkaUtils;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.opentelemetry.protobuf.OpenTelemetryMetricsProtobufReader;
import org.apache.druid.indexing.seekablestream.SettableByteEntity;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.CollectionUtils;

import java.io.IOException;
import java.lang.invoke.MethodHandle;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashSet;
Expand All @@ -54,24 +60,31 @@ public class OpenCensusProtobufReader implements InputEntityReader
{
private static final String SEPARATOR = "-";
private static final String VALUE_COLUMN = "value";
// Kafka record header key carrying the wire-format version.
private static final String VERSION_HEADER_KEY = "v";
// Header value that selects the OpenTelemetry reader; anything else (or no
// header) falls back to OpenCensus.
private static final int OPENTELEMETRY_FORMAT_VERSION = 1;

private final DimensionsSpec dimensionsSpec;
private final SettableByteEntity<? extends ByteEntity> source;
private final String metricDimension;
private final String valueDimension;
private final String metricLabelPrefix;
private final String resourceLabelPrefix;

// Memoized so the header inspection in newReader() runs at most once per reader instance.
final Supplier<InputEntityReader> readerSupplier = Suppliers.memoize(this::newReader);

/**
 * @param dimensionsSpec      dimensions to parse into each row
 * @param source              settable entity wrapping the raw record payload
 * @param metricDimension     output dimension name for the metric name
 * @param valueDimension      output dimension name for the metric value; may be
 *                            null (the legacy row parser passes null)
 * @param metricLabelPrefix   prefix applied to metric label dimensions
 * @param resourceLabelPrefix prefix applied to resource label dimensions
 */
public OpenCensusProtobufReader(
    DimensionsSpec dimensionsSpec,
    SettableByteEntity<? extends ByteEntity> source,
    String metricDimension,
    String valueDimension,
    String metricLabelPrefix,
    String resourceLabelPrefix
)
{
  this.dimensionsSpec = dimensionsSpec;
  this.source = source;
  this.metricDimension = metricDimension;
  this.valueDimension = valueDimension;
  this.metricLabelPrefix = metricLabelPrefix;
  this.resourceLabelPrefix = resourceLabelPrefix;
}
Expand All @@ -82,27 +95,75 @@ private interface LabelContext
}

@Override
public CloseableIterator<InputRow> read()
public CloseableIterator<InputRow> read() throws IOException
{
Supplier<Iterator<InputRow>> supplier = Suppliers.memoize(() -> readAsList().iterator());
return CloseableIterators.withEmptyBaggage(new Iterator<InputRow>() {
@Override
public boolean hasNext()
{
return supplier.get().hasNext();
}
@Override
public InputRow next()
{
return supplier.get().next();
return readerSupplier.get().read();
}

/**
 * Reader used when the payload is determined to be OpenCensus-formatted.
 * Parsing is deferred until the returned iterator is first consumed.
 */
class OpenCensusProtobufReaderInternal implements InputEntityReader
{
  @Override
  public CloseableIterator<InputRow> read()
  {
    // Memoize so the protobuf payload is parsed at most once, and only on demand.
    Supplier<Iterator<InputRow>> rows = Suppliers.memoize(() -> readAsList().iterator());
    Iterator<InputRow> lazyIterator = new Iterator<InputRow>()
    {
      @Override
      public boolean hasNext()
      {
        return rows.get().hasNext();
      }

      @Override
      public InputRow next()
      {
        return rows.get().next();
      }
    };
    return CloseableIterators.withEmptyBaggage(lazyIterator);
  }

  @Override
  public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
  {
    // Sampling is format-independent; reuse the outer reader's implementation.
    return OpenCensusProtobufReader.this.sample();
  }
}

public InputEntityReader newReader() {
// assume InputEntity is always defined in a single classloader (the kafka-indexing-service classloader)
// so we only have to look it up once. To be completely correct we should cache the method based on classloader

MethodHandle getHeaderMethod = KafkaUtils.lookupGetHeaderMethod(
source.getEntity().getClass().getClassLoader(),
VERSION_HEADER_KEY
);

try {
byte[] versionHeader = (byte[]) getHeaderMethod.invoke(source.getEntity());
if (versionHeader != null) {
int version =
ByteBuffer.wrap(versionHeader).order(ByteOrder.LITTLE_ENDIAN).getInt();
if (version == OPENTELEMETRY_FORMAT_VERSION) {
return new OpenTelemetryMetricsProtobufReader(
dimensionsSpec,
source,
metricDimension,
valueDimension,
metricLabelPrefix,
resourceLabelPrefix
);
}
}
});
}
catch (Throwable t) {
// assume input is opencensus if something went wrong
}

return new OpenCensusProtobufReaderInternal();
}

List<InputRow> readAsList()
{
try {
return parseMetric(Metric.parseFrom(source.getBuffer()));
return parseMetric(Metric.parseFrom(source.getEntity().getBuffer()));
}
catch (InvalidProtocolBufferException e) {
throw new ParseException(null, e, "Protobuf message could not be parsed");
Expand Down Expand Up @@ -218,7 +279,7 @@ private void addPointRows(Point point, Metric metric, LabelContext labelContext)
}

@Override
public CloseableIterator<InputRowListPlusRawValues> sample()
public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
{
return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent()));
}
Expand Down
Loading

0 comments on commit ac49c60

Please sign in to comment.