OBSDATA-440 Adding SegmentMetadataEvent and publishing them via KafkaSegmentMetadataEmitter (#117)
harinirajendran committed May 16, 2023
1 parent bbbb031 commit 45d9d1f
Showing 14 changed files with 572 additions and 65 deletions.
26 changes: 17 additions & 9 deletions docs/development/extensions-contrib/kafka-emitter.md
@@ -36,20 +36,28 @@ to monitor the status of your Druid cluster with this extension.

All the configuration parameters for the Kafka emitter are under `druid.emitter.kafka`.

|property|description|required?|default|
|--------|-----------|---------|-------|
|`druid.emitter.kafka.bootstrap.servers`|Comma-separated list of Kafka brokers (`[hostname:port],[hostname:port]...`).|yes|none|
|`druid.emitter.kafka.metric.topic`|Kafka topic to which service metrics are emitted.|yes|none|
|`druid.emitter.kafka.alert.topic`|Kafka topic to which alerts are emitted.|yes|none|
|`druid.emitter.kafka.request.topic`|Kafka topic to which request logs are emitted. If left empty, request logs are not sent to Kafka.|no|none|
|`druid.emitter.kafka.producer.config`|JSON configuration for setting additional properties on the Kafka producer.|no|none|
|`druid.emitter.kafka.clusterName`|Optional name for your Druid cluster; useful for grouping events in your monitoring environment.|no|none|
| property | description | required? | default |
|----------|-------------|-----------|---------|
| `druid.emitter.kafka.bootstrap.servers` | Comma-separated list of Kafka brokers (`[hostname:port],[hostname:port]...`). | yes | none |
| `druid.emitter.kafka.event.types` | Comma-separated list of event types to emit. <br/>Choices: `alerts`, `metrics`, `requests`, `segmentMetadata`. | no | `["metrics", "alerts"]` |
| `druid.emitter.kafka.metric.topic` | Kafka topic to which service metrics are emitted. Required if `event.types` contains `metrics`. | no | none |
| `druid.emitter.kafka.alert.topic` | Kafka topic to which alerts are emitted. Required if `event.types` contains `alerts`. | no | none |
| `druid.emitter.kafka.request.topic` | Kafka topic to which request logs are emitted. Required if `event.types` contains `requests`. | no | none |
| `druid.emitter.kafka.segmentMetadata.topic` | Kafka topic to which segment-related metadata is emitted. Required if `event.types` contains `segmentMetadata`. | no | none |
| `druid.emitter.kafka.segmentMetadata.topic.format` | Format in which segment-related metadata is emitted. <br/>Choices: `json`, `protobuf`. If set to `protobuf`, segment metadata is emitted in the `DruidSegmentEvent.proto` format. | no | `json` |
| `druid.emitter.kafka.producer.config` | JSON configuration for setting additional properties on the Kafka producer. | no | none |
| `druid.emitter.kafka.clusterName` | Optional name for your Druid cluster; useful for grouping events in your monitoring environment. | no | none |

### Example

```
druid.emitter.kafka.bootstrap.servers=hostname1:9092,hostname2:9092
druid.emitter.kafka.metric.topic=druid-metric
druid.emitter.kafka.event.types=["alerts", "requests", "segmentMetadata"]
druid.emitter.kafka.alert.topic=druid-alert
druid.emitter.kafka.request.topic=druid-request-logs
druid.emitter.kafka.segmentMetadata.topic=druid-segment-metadata
druid.emitter.kafka.segmentMetadata.topic.format=protobuf
druid.emitter.kafka.producer.config={"max.block.ms":10000}
```
Whenever the `druid.emitter.kafka.segmentMetadata.topic.format` field is updated, it is recommended to also update `druid.emitter.kafka.segmentMetadata.topic`, so that a single topic is not polluted with segment metadata in mixed formats.
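For example, when switching the format from `protobuf` back to `json`, pointing the emitter at a fresh topic keeps each topic single-format. A minimal sketch of the paired update (the new topic name below is only illustrative):

```
druid.emitter.kafka.segmentMetadata.topic.format=json
druid.emitter.kafka.segmentMetadata.topic=druid-segment-metadata-json
```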

@@ -26,6 +26,7 @@
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -137,6 +138,8 @@ public void emit(Event event)
for (Emitter emitter : emitterList) {
emitter.emit(event);
}
} else if (event instanceof SegmentMetadataEvent) {
// do nothing. Ignore this event type
} else {
throw new ISE("unknown event type [%s]", event.getClass());
}
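The hunk above and the two analogous hunks below add the same no-op branch to emitters that previously threw on any unrecognized event type. A minimal, self-contained sketch of that shared dispatch shape — the event classes here are stand-ins for the real Druid types, `IllegalStateException` stands in for Druid's `ISE`, and the downstream routing is elided:

```java
// Sketch of the emit() dispatch pattern these emitters share: route known
// event types, silently drop segment metadata, reject everything else.
interface Event {}
class MetricEvent implements Event {}
class AlertEvent implements Event {}
class SegmentMetadataEvent implements Event {}

class DispatchingEmitter
{
  void emit(Event event)
  {
    if (event instanceof MetricEvent) {
      // route to downstream metric emitters
    } else if (event instanceof AlertEvent) {
      // route to downstream alert emitters
    } else if (event instanceof SegmentMetadataEvent) {
      // deliberately ignored: this emitter has no segment-metadata sink
    } else {
      throw new IllegalStateException("unknown event type: " + event.getClass());
    }
  }
}
```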
@@ -33,6 +33,7 @@
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;

import java.util.LinkedHashMap;
@@ -127,6 +128,8 @@ public void emit(Event event)
for (Emitter emitter : alertEmitters) {
emitter.emit(event);
}
} else if (event instanceof SegmentMetadataEvent) {
// do nothing. Ignore this event type
} else {
throw new ISE("unknown event type [%s]", event.getClass());
}
@@ -28,6 +28,7 @@
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.log.RequestLogEvent;

@@ -139,6 +140,8 @@ public void emit(Event event)
"The following alert is dropped, description is [%s], severity is [%s]",
alertEvent.getDescription(), alertEvent.getSeverity()
);
} else if (event instanceof SegmentMetadataEvent) {
// do nothing. Ignore this event type
} else {
log.error("unknown event type [%s]", event.getClass());
}
46 changes: 45 additions & 1 deletion extensions-contrib/kafka-emitter/pom.xml
@@ -86,7 +86,11 @@
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -111,5 +115,45 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.0</version>
<executions>
<execution>
<phase>initialize</phase>
<goals>
<goal>detect</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
</configuration>
</plugin>
</plugins>
</build>
</project>
@@ -19,27 +19,32 @@

package org.apache.druid.emitter.kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import org.apache.druid.emitter.kafka.KafkaEmitterConfig.EventType;
import org.apache.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.emitter.proto.DruidSegmentEvent;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.core.EventMap;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.log.RequestLogEvent;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -55,14 +60,16 @@ public class KafkaEmitter implements Emitter
private final AtomicLong metricLost;
private final AtomicLong alertLost;
private final AtomicLong requestLost;
private final AtomicLong segmentMetadataLost;
private final AtomicLong invalidLost;

private final KafkaEmitterConfig config;
private final Producer<String, String> producer;
private final Producer<String, byte[]> producer;
private final ObjectMapper jsonMapper;
private final MemoryBoundLinkedBlockingQueue<String> metricQueue;
private final MemoryBoundLinkedBlockingQueue<String> alertQueue;
private final MemoryBoundLinkedBlockingQueue<String> requestQueue;
private final MemoryBoundLinkedBlockingQueue<byte[]> metricQueue;
private final MemoryBoundLinkedBlockingQueue<byte[]> alertQueue;
private final MemoryBoundLinkedBlockingQueue<byte[]> requestQueue;
private final MemoryBoundLinkedBlockingQueue<byte[]> segmentMetadataQueue;
private final ScheduledExecutorService scheduler;

protected int sendInterval = DEFAULT_SEND_INTERVAL_SECONDS;
@@ -78,10 +85,12 @@ public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper)
this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.segmentMetadataQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.scheduler = Executors.newScheduledThreadPool(4);
this.metricLost = new AtomicLong(0L);
this.alertLost = new AtomicLong(0L);
this.requestLost = new AtomicLong(0L);
this.segmentMetadataLost = new AtomicLong(0L);
this.invalidLost = new AtomicLong(0L);
}
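The constructor above gives each event type its own `MemoryBoundLinkedBlockingQueue`, which bounds buffered bytes rather than element count; when an `offer` would exceed the bound it fails, and `emit()` (further below) bumps the matching lost counter. A rough, self-contained sketch of that bounding idea — this class is hypothetical and much simpler than Druid's real queue, which wraps payloads in size-tracking `ObjectContainer`s and supports a blocking `take`:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: reject offers once buffered payload bytes exceed a bound.
class ByteBoundedQueue
{
  private final long memoryBound;
  private final AtomicLong usedBytes = new AtomicLong();
  private final Queue<byte[]> delegate = new ConcurrentLinkedQueue<>();

  ByteBoundedQueue(long memoryBound)
  {
    this.memoryBound = memoryBound;
  }

  boolean offer(byte[] payload)
  {
    // Reserve space first; roll back the reservation if the bound is exceeded.
    if (usedBytes.addAndGet(payload.length) > memoryBound) {
      usedBytes.addAndGet(-payload.length);
      return false;
    }
    delegate.add(payload);
    return true;
  }

  byte[] poll()
  {
    byte[] payload = delegate.poll();
    if (payload != null) {
      usedBytes.addAndGet(-payload.length);
    }
    return payload;
  }
}
```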

@@ -96,7 +105,7 @@ private Callback setProducerCallback(AtomicLong lostCouter)
}

@VisibleForTesting
protected Producer<String, String> setKafkaProducer()
protected Producer<String, byte[]> setKafkaProducer()
{
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
@@ -105,7 +114,7 @@ protected Producer<String, byte[]> setKafkaProducer()
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
props.put(ProducerConfig.RETRIES_CONFIG, DEFAULT_RETRIES);
props.putAll(config.getKafkaProducerConfig());

@@ -119,18 +128,26 @@
@Override
public void start()
{
scheduler.schedule(this::sendMetricToKafka, sendInterval, TimeUnit.SECONDS);
scheduler.schedule(this::sendAlertToKafka, sendInterval, TimeUnit.SECONDS);
if (config.getRequestTopic() != null) {
Set<EventType> eventTypes = config.getEventTypes();
if (eventTypes.contains(EventType.METRICS)) {
scheduler.schedule(this::sendMetricToKafka, sendInterval, TimeUnit.SECONDS);
}
if (eventTypes.contains(EventType.ALERTS)) {
scheduler.schedule(this::sendAlertToKafka, sendInterval, TimeUnit.SECONDS);
}
if (eventTypes.contains(EventType.REQUESTS)) {
scheduler.schedule(this::sendRequestToKafka, sendInterval, TimeUnit.SECONDS);
}
if (eventTypes.contains(EventType.SEGMENTMETADATA)) {
scheduler.schedule(this::sendSegmentMetadataToKafka, sendInterval, TimeUnit.SECONDS);
}
scheduler.scheduleWithFixedDelay(() -> {
log.info(
"Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], invalidLost=[%d]",
log.info("Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], invalidLost=[%d] segmentMetadataLost=[%d]",
metricLost.get(),
alertLost.get(),
requestLost.get(),
invalidLost.get()
invalidLost.get(),
segmentMetadataLost.get()
);
}, DEFAULT_SEND_LOST_INTERVAL_MINUTES, DEFAULT_SEND_LOST_INTERVAL_MINUTES, TimeUnit.MINUTES);
log.info("Starting Kafka Emitter.");
@@ -151,9 +168,14 @@ private void sendRequestToKafka()
sendToKafka(config.getRequestTopic(), requestQueue, setProducerCallback(requestLost));
}

private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
private void sendSegmentMetadataToKafka()
{
sendToKafka(config.getSegmentMetadataTopic(), segmentMetadataQueue, setProducerCallback(segmentMetadataLost));
}

private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<byte[]> recordQueue, Callback callback)
{
ObjectContainer<String> objectToSend;
ObjectContainer<byte[]> objectToSend;
try {
while (true) {
objectToSend = recordQueue.take();
@@ -173,38 +195,87 @@ public void emit(final Event event)
EventMap map = event.toMap();
if (config.getClusterName() != null) {
map = map.asBuilder()
.put("clusterName", config.getClusterName())
.build();
.put("clusterName", config.getClusterName())
.build();
}

String resultJson = jsonMapper.writeValueAsString(map);

ObjectContainer<String> objectContainer = new ObjectContainer<>(
resultJson,
StringUtils.toUtf8(resultJson).length
byte[] resultBytes = jsonMapper.writeValueAsBytes(map);
ObjectContainer<byte[]> objectContainer = new ObjectContainer<>(
resultBytes,
resultBytes.length
);
Set<EventType> eventTypes = config.getEventTypes();
if (event instanceof ServiceMetricEvent) {
if (!metricQueue.offer(objectContainer)) {
if (!eventTypes.contains(EventType.METRICS) || !metricQueue.offer(objectContainer)) {
metricLost.incrementAndGet();
}
} else if (event instanceof AlertEvent) {
if (!alertQueue.offer(objectContainer)) {
if (!eventTypes.contains(EventType.ALERTS) || !alertQueue.offer(objectContainer)) {
alertLost.incrementAndGet();
}
} else if (event instanceof RequestLogEvent) {
if (config.getRequestTopic() == null || !requestQueue.offer(objectContainer)) {
if (!eventTypes.contains(EventType.REQUESTS) || !requestQueue.offer(objectContainer)) {
requestLost.incrementAndGet();
}
} else if (event instanceof SegmentMetadataEvent) {
if (!eventTypes.contains(EventType.SEGMENTMETADATA)) {
segmentMetadataLost.incrementAndGet();
} else {
switch (config.getSegmentMetadataTopicFormat()) {
case PROTOBUF:
resultBytes = convertMetadataEventToProto((SegmentMetadataEvent) event, segmentMetadataLost);
objectContainer = new ObjectContainer<>(
resultBytes,
resultBytes.length
);
break;
case JSON:
// Do Nothing. We already have the JSON object stored in objectContainer
break;
default:
throw new UnsupportedOperationException("segmentMetadata.topic.format has an invalid value " + config.getSegmentMetadataTopicFormat().toString());
}
if (!segmentMetadataQueue.offer(objectContainer)) {
segmentMetadataLost.incrementAndGet();
}
}
} else {
invalidLost.incrementAndGet();
}
}
catch (JsonProcessingException e) {
catch (Exception e) {
invalidLost.incrementAndGet();
log.warn(e, "Exception while serializing event");
}
}
}

private byte[] convertMetadataEventToProto(SegmentMetadataEvent event, AtomicLong segmentMetadataLost)
{
try {
Timestamp createdTimeTs = Timestamps.fromMillis(event.getCreatedTime().getMillis());
Timestamp startTimeTs = Timestamps.fromMillis(event.getStartTime().getMillis());
Timestamp endTimeTs = Timestamps.fromMillis(event.getEndTime().getMillis());

DruidSegmentEvent.Builder druidSegmentEventBuilder = DruidSegmentEvent.newBuilder()
.setDataSource(event.getDataSource())
.setCreatedTime(createdTimeTs)
.setStartTime(startTimeTs)
.setEndTime(endTimeTs)
.setVersion(event.getVersion())
.setIsCompacted(event.isCompacted());
if (config.getClusterName() != null) {
druidSegmentEventBuilder.setClusterName(config.getClusterName());
}
DruidSegmentEvent druidSegmentEvent = druidSegmentEventBuilder.build();
return druidSegmentEvent.toByteArray();
}
catch (Exception e) {
log.warn(e, "Exception while serializing SegmentMetadataEvent");
throw e;
}
}

@Override
public void flush()
{
@@ -238,4 +309,9 @@ public long getInvalidLostCount()
{
return invalidLost.get();
}

public long getSegmentMetadataLostCount()
{
return segmentMetadataLost.get();
}
}
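On the consuming side, the `DruidSegmentEvent` class generated from `DruidSegmentEvent.proto` (one of the 14 files in this commit, not expanded above) can decode the protobuf payloads via the generated `parseFrom` method, mirroring `convertMetadataEventToProto` above. A hedged sketch of a reader — the broker address, group id, and topic name (taken from the docs example) are assumptions:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.druid.emitter.proto.DruidSegmentEvent;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Sketch: read protobuf-encoded segment metadata events from the emitter's topic.
public class SegmentMetadataConsumer
{
  public static void main(String[] args) throws Exception
  {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
    props.put("group.id", "segment-metadata-reader"); // assumption: any group id works
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("druid-segment-metadata"));
      while (true) {
        ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, byte[]> record : records) {
          // parseFrom is generated by protoc for every message type.
          DruidSegmentEvent event = DruidSegmentEvent.parseFrom(record.value());
          System.out.printf("%s %s%n", event.getDataSource(), event.getVersion());
        }
      }
    }
  }
}
```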