Skip to content

Commit

Permalink
Added getters for configs, labels for distribution metric. (#15)
Browse files Browse the repository at this point in the history
* Added getters for configs, labels for distribution metric.

* Addressed review comments

* Removed extra brackets in JsonProperty.
  • Loading branch information
apoorvmittal10 authored and harinirajendran committed Apr 27, 2021
1 parent 65c6a90 commit 0ab2bcc
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -85,6 +86,24 @@ public ParseSpec getParseSpec()
return parseSpec;
}

@JsonProperty
public String getMetricDimension()
{
return metricDimension;
}

@JsonProperty
public String getMetricLabelPrefix()
{
return metricLabelPrefix;
}

@JsonProperty
public String getResourceLabelPrefix()
{
return resourceLabelPrefix;
}

@Override
public OpenCensusProtobufInputRowParser withParseSpec(ParseSpec parseSpec)
{
Expand Down Expand Up @@ -113,7 +132,7 @@ public List<InputRow> parseBatch(ByteBuffer input)
.collect(Collectors.toList());

// Process resource labels map.
Map<String, Object> resourceLabelsMap = metric.getResource().getLabelsMap().entrySet().stream()
Map<String, String> resourceLabelsMap = metric.getResource().getLabelsMap().entrySet().stream()
.collect(Collectors.toMap(entry -> this.resourceLabelPrefix + entry.getKey(),
Map.Entry::getValue));

Expand Down Expand Up @@ -190,12 +209,14 @@ public List<InputRow> parseBatch(ByteBuffer input)
case DISTRIBUTION_VALUE:
// count
Map<String, Object> distCount = new HashMap<>();
distCount.putAll(labels);
distCount.put(metricDimension, metric.getMetricDescriptor().getName() + SEPARATOR + "count");
distCount.put(VALUE, point.getDistributionValue().getCount());
addDerivedMetricsRow(distCount, dimensions, rows);

// sum
Map<String, Object> distSum = new HashMap<>();
distSum.putAll(labels);
distSum.put(metricDimension, metric.getMetricDescriptor().getName() + SEPARATOR + "sum");
distSum.put(VALUE, point.getDistributionValue().getSum());
addDerivedMetricsRow(distSum, dimensions, rows);
Expand All @@ -218,4 +239,26 @@ private void addDerivedMetricsRow(Map<String, Object> derivedMetrics, List<Strin
));
}

@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof OpenCensusProtobufInputRowParser)) {
return false;
}
final OpenCensusProtobufInputRowParser that = (OpenCensusProtobufInputRowParser) o;
return Objects.equals(parseSpec, that.parseSpec) &&
Objects.equals(metricDimension, that.metricDimension) &&
Objects.equals(metricLabelPrefix, that.metricLabelPrefix) &&
Objects.equals(resourceLabelPrefix, that.resourceLabelPrefix);
}

@Override
public int hashCode()
{
return Objects.hash(parseSpec, metricDimension, metricLabelPrefix, resourceLabelPrefix);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,23 @@

package org.apache.druid.data.input.opencensus.protobuf;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.protobuf.DoubleValue;
import com.google.protobuf.Int64Value;
import com.google.protobuf.Timestamp;
import io.opencensus.proto.metrics.v1.DistributionValue;
import io.opencensus.proto.metrics.v1.LabelKey;
import io.opencensus.proto.metrics.v1.LabelValue;
import io.opencensus.proto.metrics.v1.Metric;
import io.opencensus.proto.metrics.v1.MetricDescriptor;
import io.opencensus.proto.metrics.v1.MetricDescriptor.Type;
import io.opencensus.proto.metrics.v1.Point;
import io.opencensus.proto.metrics.v1.SummaryValue;
import io.opencensus.proto.metrics.v1.TimeSeries;
import io.opencensus.proto.resource.v1.Resource;
import org.apache.druid.data.input.ByteBufferInputRowParser;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JSONParseSpec;
Expand Down Expand Up @@ -97,9 +101,8 @@ public void setUp()


@Test
public void testGaugeParse() throws Exception
public void testDoubleGaugeParse() throws Exception
{

//configure parser with desc file
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, "");

Expand All @@ -108,8 +111,6 @@ public void testGaugeParse() throws Exception
Timestamp timestamp = Timestamp.newBuilder().setSeconds(dateTime.getMillis() / 1000)
.setNanos((int) ((dateTime.getMillis() % 1000) * 1000000)).build();

System.out.println(timestamp.getSeconds() * 1000);

Metric metric = doubleGaugeMetric(timestamp);
ByteArrayOutputStream out = new ByteArrayOutputStream();
metric.writeTo(out);
Expand All @@ -124,6 +125,30 @@ public void testGaugeParse() throws Exception
Assert.assertEquals(2000, row.getMetric("value").doubleValue(), 0.0);
}

@Test
public void testIntGaugeParse() throws Exception
{
//configure parser with desc file
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, "");

DateTime dateTime = new DateTime(2019, 07, 12, 9, 30, ISOChronology.getInstanceUTC());

Timestamp timestamp = Timestamp.newBuilder().setSeconds(dateTime.getMillis() / 1000)
.setNanos((int) ((dateTime.getMillis() % 1000) * 1000000)).build();

Metric metric = intGaugeMetric(timestamp);
ByteArrayOutputStream out = new ByteArrayOutputStream();
metric.writeTo(out);

InputRow row = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0);
Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());

assertDimensionEquals(row, "name", "metric_gauge_int64");
assertDimensionEquals(row, "foo_key", "foo_value");

Assert.assertEquals(1000, row.getMetric("value").intValue());
}

@Test
public void testSummaryParse() throws Exception
{
Expand Down Expand Up @@ -154,7 +179,38 @@ public void testSummaryParse() throws Exception
assertDimensionEquals(row, "name", "metric_summary-sum");
assertDimensionEquals(row, "foo_key", "foo_value");
Assert.assertEquals(10, row.getMetric("value").doubleValue(), 0.0);
}

@Test
public void testDistributionParse() throws Exception
{
//configure parser with desc file
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, "");

DateTime dateTime = new DateTime(2019, 07, 12, 9, 30, ISOChronology.getInstanceUTC());

Timestamp timestamp = Timestamp.newBuilder().setSeconds(dateTime.getMillis() / 1000)
.setNanos((int) ((dateTime.getMillis() % 1000) * 1000000)).build();

Metric metric = distributionMetric(timestamp);
ByteArrayOutputStream out = new ByteArrayOutputStream();
metric.writeTo(out);

List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(out.toByteArray()));

Assert.assertEquals(2, rows.size());

InputRow row = rows.get(0);
Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());
assertDimensionEquals(row, "name", "metric_distribution-count");
assertDimensionEquals(row, "foo_key", "foo_value");
Assert.assertEquals(100, row.getMetric("value").intValue());

row = rows.get(1);
Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());
assertDimensionEquals(row, "name", "metric_distribution-sum");
assertDimensionEquals(row, "foo_key", "foo_value");
Assert.assertEquals(500, row.getMetric("value").doubleValue(), 0.0);
}

@Test
Expand Down Expand Up @@ -302,6 +358,20 @@ public void testCustomPrefix() throws Exception
assertDimensionEquals(row, "custom.env_key", "env_val");
}

@Test
public void testSerde() throws Exception
{
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, "metric.name", "descriptor.", "custom.");

final ObjectMapper jsonMapper = new ObjectMapper();
jsonMapper.registerModules(new OpenCensusProtobufExtensionsModule().getJacksonModules());

Assert.assertEquals(parser, jsonMapper.readValue(
jsonMapper.writeValueAsString(parser),
ByteBufferInputRowParser.class
));
}

private void assertDimensionEquals(InputRow row, String dimension, Object expected)
{
List<String> values = row.getDimension(dimension);
Expand All @@ -312,43 +382,29 @@ private void assertDimensionEquals(InputRow row, String dimension, Object expect

private Metric doubleGaugeMetric(Timestamp timestamp)
{
Metric dist = Metric.newBuilder()
.setMetricDescriptor(
MetricDescriptor.newBuilder()
.setName("metric_gauge_double")
.setDescription("metric_gauge_double_description")
.setUnit("ms")
.setType(
MetricDescriptor.Type.GAUGE_DOUBLE)
.addLabelKeys(
LabelKey.newBuilder()
.setKey("foo_key")
.build())
.build())
.setResource(
Resource.newBuilder()
.setType("env")
.putAllLabels(Collections.singletonMap("env_key", "env_val"))
.build())
.addTimeseries(
TimeSeries.newBuilder()
.setStartTimestamp(timestamp)
.addLabelValues(
LabelValue.newBuilder()
.setHasValue(true)
.setValue("foo_value")
.build())
.addPoints(
Point.newBuilder()
.setTimestamp(timestamp)
.setDoubleValue(2000)
.build())
.build())
.build();

return dist;
return getMetric(
"metric_gauge_double",
"metric_gauge_double_description",
Type.GAUGE_DOUBLE,
Point.newBuilder()
.setTimestamp(timestamp)
.setDoubleValue(2000)
.build(),
timestamp);
}

private Metric intGaugeMetric(Timestamp timestamp)
{
return getMetric(
"metric_gauge_int64",
"metric_gauge_int64_description",
MetricDescriptor.Type.GAUGE_INT64,
Point.newBuilder()
.setTimestamp(timestamp)
.setInt64Value(1000)
.build(),
timestamp);
}

private Metric summaryMetric(Timestamp timestamp)
{
Expand Down Expand Up @@ -387,15 +443,44 @@ private Metric summaryMetric(Timestamp timestamp)
.setSnapshot(snapshot)
.build();

return getMetric(
"metric_summary",
"metric_summary_description",
MetricDescriptor.Type.SUMMARY,
Point.newBuilder()
.setTimestamp(timestamp)
.setSummaryValue(summaryValue)
.build(),
timestamp);
}

private Metric distributionMetric(Timestamp timestamp)
{
DistributionValue distributionValue = DistributionValue.newBuilder()
.setCount(100)
.setSum(500)
.build();

return getMetric(
"metric_distribution",
"metric_distribution_description",
MetricDescriptor.Type.GAUGE_DISTRIBUTION,
Point.newBuilder()
.setTimestamp(timestamp)
.setDistributionValue(distributionValue)
.build(),
timestamp);
}

private Metric getMetric(String name, String description, MetricDescriptor.Type type, Point point, Timestamp timestamp)
{
Metric dist = Metric.newBuilder()
.setMetricDescriptor(
MetricDescriptor.newBuilder()
.setName("metric_summary")
.setDescription("metric_summary_description")
.setName(name)
.setDescription(description)
.setUnit("ms")
.setType(
MetricDescriptor.Type.SUMMARY)
.setType(type)
.addLabelKeys(
LabelKey.newBuilder()
.setKey("foo_key")
Expand All @@ -414,16 +499,11 @@ private Metric summaryMetric(Timestamp timestamp)
.setHasValue(true)
.setValue("foo_value")
.build())
.addPoints(
Point.newBuilder()
.setTimestamp(timestamp)
.setSummaryValue(summaryValue)
.build())
.addPoints(point)
.build())
.build();

return dist;
}


}

0 comments on commit 0ab2bcc

Please sign in to comment.