OBSDATA-440 Adding SegmentMetadataEvent and publishing them via Kafka…SegmentMetadataEmitter (#117)
harinirajendran committed May 22, 2023
1 parent 1d1454b commit 1bc702e
Showing 12 changed files with 424 additions and 65 deletions.
23 changes: 14 additions & 9 deletions docs/development/extensions-contrib/kafka-emitter.md
@@ -36,20 +36,25 @@ to monitor the status of your Druid cluster with this extension.

All the configuration parameters for the Kafka emitter are under `druid.emitter.kafka`.

|property|description|required?|default|
|--------|-----------|---------|-------|
|`druid.emitter.kafka.bootstrap.servers`|Comma-separated Kafka broker. (`[hostname:port],[hostname:port]...`)|yes|none|
|`druid.emitter.kafka.metric.topic`|Kafka topic name for emitter's target to emit service metric.|yes|none|
|`druid.emitter.kafka.alert.topic`|Kafka topic name for emitter's target to emit alert.|yes|none|
|`druid.emitter.kafka.request.topic`|Kafka topic name for emitter's target to emit request logs. If left empty then request logs will not be sent to the Kafka topic.|no|none|
|`druid.emitter.kafka.producer.config`|JSON formatted configuration which user want to set additional properties to Kafka producer.|no|none|
|`druid.emitter.kafka.clusterName`|Optional value to specify name of your druid cluster. It can help make groups in your monitoring environment. |no|none|
| Property | Description | Required | Default |
|----------|-------------|----------|---------|
| `druid.emitter.kafka.bootstrap.servers` | Comma-separated Kafka brokers. (`[hostname:port],[hostname:port]...`) | yes | none |
| `druid.emitter.kafka.event.types` | Comma-separated event types. Supported types are `alerts`, `metrics`, `requests`, and `segmentMetadata`. | no | `["metrics", "alerts"]` |
| `druid.emitter.kafka.metric.topic` | Kafka topic to which service metrics are emitted. If `event.types` contains `metrics`, this field cannot be empty. | no | none |
| `druid.emitter.kafka.alert.topic` | Kafka topic to which alerts are emitted. If `event.types` contains `alerts`, this field cannot be empty. | no | none |
| `druid.emitter.kafka.request.topic` | Kafka topic to which request logs are emitted. If `event.types` contains `requests`, this field cannot be empty. | no | none |
| `druid.emitter.kafka.segmentMetadata.topic` | Kafka topic to which segment-related metadata is emitted. If `event.types` contains `segmentMetadata`, this field cannot be empty. | no | none |
| `druid.emitter.kafka.producer.config` | JSON-formatted configuration used to set additional properties on the Kafka producer. | no | none |
| `druid.emitter.kafka.clusterName` | Optional name for your Druid cluster; useful for grouping in your monitoring environment. | no | none |

### Example

```
druid.emitter.kafka.bootstrap.servers=hostname1:9092,hostname2:9092
druid.emitter.kafka.metric.topic=druid-metric
druid.emitter.kafka.event.types=["alerts", "requests", "segmentMetadata"]
druid.emitter.kafka.alert.topic=druid-alert
druid.emitter.kafka.request.topic=druid-request-logs
druid.emitter.kafka.segmentMetadata.topic=druid-segment-metadata
druid.emitter.kafka.producer.config={"max.block.ms":10000}
```
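
To sanity-check the emitter end to end, you can tail one of the configured topics with a plain Kafka consumer. A minimal sketch, assuming the broker and topic names from the example above (the `group.id` is hypothetical); the emitter writes each event map as UTF-8 JSON bytes, so a string deserializer is sufficient:

```
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SegmentMetadataTailer
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    props.put("bootstrap.servers", "hostname1:9092,hostname2:9092"); // from the example above
    props.put("group.id", "kafka-emitter-smoke-test");               // hypothetical consumer group
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("druid-segment-metadata"));
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
          System.out.println(record.value()); // one JSON event map per record
        }
      }
    }
  }
}
```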

@@ -26,6 +26,7 @@
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -137,6 +138,8 @@ public void emit(Event event)
for (Emitter emitter : emitterList) {
emitter.emit(event);
}
} else if (event instanceof SegmentMetadataEvent) {
// do nothing. Ignore this event type
} else {
throw new ISE("unknown event type [%s]", event.getClass());
}
@@ -33,6 +33,7 @@
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;

import java.util.LinkedHashMap;
@@ -127,6 +128,8 @@ public void emit(Event event)
for (Emitter emitter : alertEmitters) {
emitter.emit(event);
}
} else if (event instanceof SegmentMetadataEvent) {
// do nothing. Ignore this event type
} else {
throw new ISE("unknown event type [%s]", event.getClass());
}
@@ -28,6 +28,7 @@
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.log.RequestLogEvent;

@@ -139,6 +140,8 @@ public void emit(Event event)
"The following alert is dropped, description is [%s], severity is [%s]",
alertEvent.getDescription(), alertEvent.getSeverity()
);
} else if (event instanceof SegmentMetadataEvent) {
// do nothing. Ignore this event type
} else {
log.error("unknown event type [%s]", event.getClass());
}
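Each of the three emitters patched above gains the same no-op branch, so a `SegmentMetadataEvent` is deliberately ignored rather than falling through to the unknown-event error path. A self-contained sketch of the shared dispatch shape (types stubbed for illustration; not the committed code):

```
// Sketch of the dispatch pattern the three emitters above now share:
// a known-but-unsupported event type is silently dropped instead of
// being treated as an unknown event.
public class DispatchSketch
{
  interface Event {}
  static class ServiceMetricEvent implements Event {}
  static class AlertEvent implements Event {}
  static class SegmentMetadataEvent implements Event {}

  public void emit(Event event)
  {
    if (event instanceof ServiceMetricEvent || event instanceof AlertEvent) {
      // forward to this emitter's sink
    } else if (event instanceof SegmentMetadataEvent) {
      // do nothing: this emitter has no sink for segment metadata
    } else {
      throw new IllegalStateException("unknown event type: " + event.getClass());
    }
  }
}
```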
1 change: 0 additions & 1 deletion extensions-contrib/kafka-emitter/pom.xml
@@ -86,7 +86,6 @@
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -19,27 +19,29 @@

package org.apache.druid.emitter.kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.emitter.kafka.KafkaEmitterConfig.EventType;
import org.apache.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.core.EventMap;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.log.RequestLogEvent;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -55,14 +57,16 @@ public class KafkaEmitter implements Emitter
private final AtomicLong metricLost;
private final AtomicLong alertLost;
private final AtomicLong requestLost;
private final AtomicLong segmentMetadataLost;
private final AtomicLong invalidLost;

private final KafkaEmitterConfig config;
private final Producer<String, String> producer;
private final Producer<String, byte[]> producer;
private final ObjectMapper jsonMapper;
private final MemoryBoundLinkedBlockingQueue<String> metricQueue;
private final MemoryBoundLinkedBlockingQueue<String> alertQueue;
private final MemoryBoundLinkedBlockingQueue<String> requestQueue;
private final MemoryBoundLinkedBlockingQueue<byte[]> metricQueue;
private final MemoryBoundLinkedBlockingQueue<byte[]> alertQueue;
private final MemoryBoundLinkedBlockingQueue<byte[]> requestQueue;
private final MemoryBoundLinkedBlockingQueue<byte[]> segmentMetadataQueue;
private final ScheduledExecutorService scheduler;

protected int sendInterval = DEFAULT_SEND_INTERVAL_SECONDS;
@@ -78,10 +82,12 @@ public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper)
this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.segmentMetadataQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.scheduler = Executors.newScheduledThreadPool(4);
this.metricLost = new AtomicLong(0L);
this.alertLost = new AtomicLong(0L);
this.requestLost = new AtomicLong(0L);
this.segmentMetadataLost = new AtomicLong(0L);
this.invalidLost = new AtomicLong(0L);
}

@@ -96,7 +102,7 @@ private Callback setProducerCallback(AtomicLong lostCouter)
}
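
The body of `setProducerCallback` is collapsed in this view. Judging from the call sites in the `send*ToKafka` methods below, it plausibly returns a producer `Callback` that bumps the supplied counter on failed sends — a reconstruction under that assumption, not the visible diff:

```
// Plausible shape of the collapsed method (an assumption based on its
// call sites): a Kafka producer Callback that increments the given
// lost counter whenever a send fails.
private Callback setProducerCallback(AtomicLong lostCouter)
{
  return (recordMetadata, e) -> {
    if (e != null) {
      lostCouter.incrementAndGet();
    }
  };
}
```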

@VisibleForTesting
protected Producer<String, String> setKafkaProducer()
protected Producer<String, byte[]> setKafkaProducer()
{
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
@@ -105,7 +111,7 @@ protected Producer<String, String> setKafkaProducer()
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
props.put(ProducerConfig.RETRIES_CONFIG, DEFAULT_RETRIES);
props.putAll(config.getKafkaProducerConfig());

@@ -119,18 +125,26 @@
@Override
public void start()
{
scheduler.schedule(this::sendMetricToKafka, sendInterval, TimeUnit.SECONDS);
scheduler.schedule(this::sendAlertToKafka, sendInterval, TimeUnit.SECONDS);
if (config.getRequestTopic() != null) {
Set<EventType> eventTypes = config.getEventTypes();
if (eventTypes.contains(EventType.METRICS)) {
scheduler.schedule(this::sendMetricToKafka, sendInterval, TimeUnit.SECONDS);
}
if (eventTypes.contains(EventType.ALERTS)) {
scheduler.schedule(this::sendAlertToKafka, sendInterval, TimeUnit.SECONDS);
}
if (eventTypes.contains(EventType.REQUESTS)) {
scheduler.schedule(this::sendRequestToKafka, sendInterval, TimeUnit.SECONDS);
}
if (eventTypes.contains(EventType.SEGMENTMETADATA)) {
scheduler.schedule(this::sendSegmentMetadataToKafka, sendInterval, TimeUnit.SECONDS);
}
scheduler.scheduleWithFixedDelay(() -> {
log.info(
"Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], invalidLost=[%d]",
log.info("Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], invalidLost=[%d] segmentMetadataLost=[%d]",
metricLost.get(),
alertLost.get(),
requestLost.get(),
invalidLost.get()
invalidLost.get(),
segmentMetadataLost.get()
);
}, DEFAULT_SEND_LOST_INTERVAL_MINUTES, DEFAULT_SEND_LOST_INTERVAL_MINUTES, TimeUnit.MINUTES);
log.info("Starting Kafka Emitter.");
@@ -151,9 +165,14 @@ private void sendRequestToKafka()
sendToKafka(config.getRequestTopic(), requestQueue, setProducerCallback(requestLost));
}

private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
private void sendSegmentMetadataToKafka()
{
sendToKafka(config.getSegmentMetadataTopic(), segmentMetadataQueue, setProducerCallback(segmentMetadataLost));
}

private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<byte[]> recordQueue, Callback callback)
{
ObjectContainer<String> objectToSend;
ObjectContainer<byte[]> objectToSend;
try {
while (true) {
objectToSend = recordQueue.take();
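
`MemoryBoundLinkedBlockingQueue` itself is not part of this diff. Conceptually it is a `LinkedBlockingQueue` bounded by total payload bytes rather than element count, which is why `emit()` wraps each serialized event in an `ObjectContainer` carrying its byte size. A simplified approximation of that contract (not the Druid class):

```
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Simplified approximation: offer() is rejected once queued payload bytes
// exceed the budget, which is what makes a failed offer() in emit() count
// as a "lost" event.
public class MemoryBoundQueueSketch<T>
{
  public static class ObjectContainer<V>
  {
    final V data;
    final long size; // payload size in bytes, e.g. resultBytes.length

    public ObjectContainer(V data, long size)
    {
      this.data = data;
      this.size = size;
    }
  }

  private final long memoryBound;
  private final AtomicLong currentMemory = new AtomicLong(0L);
  private final LinkedBlockingQueue<ObjectContainer<T>> queue = new LinkedBlockingQueue<>();

  public MemoryBoundQueueSketch(long memoryBound)
  {
    this.memoryBound = memoryBound;
  }

  public boolean offer(ObjectContainer<T> item)
  {
    if (currentMemory.addAndGet(item.size) <= memoryBound && queue.offer(item)) {
      return true;
    }
    currentMemory.addAndGet(-item.size); // roll back the reservation
    return false;
  }

  public ObjectContainer<T> take() throws InterruptedException
  {
    ObjectContainer<T> item = queue.take();
    currentMemory.addAndGet(-item.size);
    return item;
  }
}
```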
@@ -173,34 +192,39 @@ public void emit(final Event event)
EventMap map = event.toMap();
if (config.getClusterName() != null) {
map = map.asBuilder()
.put("clusterName", config.getClusterName())
.build();
.put("clusterName", config.getClusterName())
.build();
}

String resultJson = jsonMapper.writeValueAsString(map);

ObjectContainer<String> objectContainer = new ObjectContainer<>(
resultJson,
StringUtils.toUtf8(resultJson).length
byte[] resultBytes = jsonMapper.writeValueAsBytes(map);
ObjectContainer<byte[]> objectContainer = new ObjectContainer<>(
resultBytes,
resultBytes.length
);
Set<EventType> eventTypes = config.getEventTypes();
if (event instanceof ServiceMetricEvent) {
if (!metricQueue.offer(objectContainer)) {
if (!eventTypes.contains(EventType.METRICS) || !metricQueue.offer(objectContainer)) {
metricLost.incrementAndGet();
}
} else if (event instanceof AlertEvent) {
if (!alertQueue.offer(objectContainer)) {
if (!eventTypes.contains(EventType.ALERTS) || !alertQueue.offer(objectContainer)) {
alertLost.incrementAndGet();
}
} else if (event instanceof RequestLogEvent) {
if (config.getRequestTopic() == null || !requestQueue.offer(objectContainer)) {
if (!eventTypes.contains(EventType.REQUESTS) || !requestQueue.offer(objectContainer)) {
requestLost.incrementAndGet();
}
} else if (event instanceof SegmentMetadataEvent) {
if (!eventTypes.contains(EventType.SEGMENTMETADATA) || !segmentMetadataQueue.offer(objectContainer)) {
segmentMetadataLost.incrementAndGet();
}
} else {
invalidLost.incrementAndGet();
}
}
catch (JsonProcessingException e) {
catch (Exception e) {
invalidLost.incrementAndGet();
log.warn(e, "Exception while serializing event");
}
}
}
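
One subtlety of the guards above: an event whose type is not enabled in `event.types` short-circuits before the queue `offer()` and is tallied in the corresponding lost counter, so "lost" covers both queue overflow and disabled event types. A toy illustration of the guard (names illustrative):

```
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

public class EventTypeGatingDemo
{
  enum EventType { METRICS, ALERTS, REQUESTS, SEGMENTMETADATA }

  public static void main(String[] args)
  {
    Set<EventType> enabled = EnumSet.of(EventType.METRICS, EventType.ALERTS);
    AtomicLong segmentMetadataLost = new AtomicLong(0L);

    // Mirrors: if (!eventTypes.contains(SEGMENTMETADATA) || !queue.offer(c)) lost++
    boolean queued = enabled.contains(EventType.SEGMENTMETADATA) && tryOffer();
    if (!queued) {
      segmentMetadataLost.incrementAndGet(); // dropped silently, not an error
    }
    System.out.println("segmentMetadataLost=" + segmentMetadataLost.get()); // prints 1
  }

  private static boolean tryOffer()
  {
    return true; // stand-in for segmentMetadataQueue.offer(objectContainer)
  }
}
```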
@@ -238,4 +262,9 @@ public long getInvalidLostCount()
{
return invalidLost.get();
}

public long getSegmentMetadataLostCount()
{
return segmentMetadataLost.get();
}
}