From 1bc702e89793199d4f370797ce651272252e6a26 Mon Sep 17 00:00:00 2001 From: Harini Rajendran Date: Wed, 2 Nov 2022 11:51:56 -0500 Subject: [PATCH] OBSDATA-440 Adding SegmentMetadataEvent and publishing them via KafkaSegmentMetadataEmitter (#117) --- .../extensions-contrib/kafka-emitter.md | 23 ++-- .../ambari/metrics/AmbariMetricsEmitter.java | 3 + .../emitter/dropwizard/DropwizardEmitter.java | 3 + .../emitter/graphite/GraphiteEmitter.java | 3 + extensions-contrib/kafka-emitter/pom.xml | 1 - .../druid/emitter/kafka/KafkaEmitter.java | 83 ++++++++----- .../emitter/kafka/KafkaEmitterConfig.java | 106 +++++++++++++++-- .../emitter/kafka/KafkaEmitterConfigTest.java | 41 +++++-- .../druid/emitter/kafka/KafkaEmitterTest.java | 41 +++++-- .../SegmentTransactionalInsertAction.java | 19 +++ .../emitter/service/SegmentMetadataEvent.java | 112 ++++++++++++++++++ .../service/SegmentMetadataEventTest.java | 54 +++++++++ 12 files changed, 424 insertions(+), 65 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java create mode 100644 processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java diff --git a/docs/development/extensions-contrib/kafka-emitter.md b/docs/development/extensions-contrib/kafka-emitter.md index 85b8f10a7e10a..b06cd903c472e 100644 --- a/docs/development/extensions-contrib/kafka-emitter.md +++ b/docs/development/extensions-contrib/kafka-emitter.md @@ -36,20 +36,25 @@ to monitor the status of your Druid cluster with this extension. All the configuration parameters for the Kafka emitter are under `druid.emitter.kafka`. -|property|description|required?|default| -|--------|-----------|---------|-------| -|`druid.emitter.kafka.bootstrap.servers`|Comma-separated Kafka broker. (`[hostname:port],[hostname:port]...`)|yes|none| -|`druid.emitter.kafka.metric.topic`|Kafka topic name for emitter's target to emit service metric.|yes|none| -|`druid.emitter.kafka.alert.topic`|Kafka topic name for emitter's target to emit alert.|yes|none| -|`druid.emitter.kafka.request.topic`|Kafka topic name for emitter's target to emit request logs. If left empty then request logs will not be sent to the Kafka topic.|no|none| -|`druid.emitter.kafka.producer.config`|JSON formatted configuration which user want to set additional properties to Kafka producer.|no|none| -|`druid.emitter.kafka.clusterName`|Optional value to specify name of your druid cluster. It can help make groups in your monitoring environment. |no|none| +| Property | Description | Required | Default | +|----------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------|-----------------------| +| `druid.emitter.kafka.bootstrap.servers` | Comma-separated Kafka broker. (`[hostname:port],[hostname:port]...`) | yes | none | +| `druid.emitter.kafka.event.types` | Comma-separated event types.
Supported types are `alerts`, `metrics`, `requests`, and `segmentMetadata`. | no | `["metrics", "alerts"]` | +| `druid.emitter.kafka.metric.topic` | Kafka topic name for emitter's target to emit service metric. If `event.types` contains `metrics`, this field cannot be empty. | no | none | +| `druid.emitter.kafka.alert.topic` | Kafka topic name for emitter's target to emit alerts. If `event.types` contains `alerts`, this field cannot empty. | no | none | +| `druid.emitter.kafka.request.topic` | Kafka topic name for emitter's target to emit request logs. If `event.types` contains `requests`, this field cannot be empty. | no | none | +| `druid.emitter.kafka.segmentMetadata.topic` | Kafka topic name for emitter's target to emit segments related metadata. If `event.types` contains `segmentMetadata`, this field cannot be empty. | no | none | +| `druid.emitter.kafka.producer.config` | JSON formatted configuration to set additional properties to Kafka producer. | no | none | +| `druid.emitter.kafka.clusterName` | Optional value to specify the name of your Druid cluster. It can help make groups in your monitoring environment. | no | none | ### Example ``` druid.emitter.kafka.bootstrap.servers=hostname1:9092,hostname2:9092 -druid.emitter.kafka.metric.topic=druid-metric +druid.emitter.kafka.event.types=["alerts", "requests", "segmentMetadata"] druid.emitter.kafka.alert.topic=druid-alert +druid.emitter.kafka.request.topic=druid-request-logs +druid.emitter.kafka.segmentMetadata.topic=druid-segment-metadata druid.emitter.kafka.producer.config={"max.block.ms":10000} ``` + diff --git a/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitter.java b/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitter.java index 905b6cffc0136..11dea07585db3 100644 --- a/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitter.java +++ b/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitter.java @@ -26,6 +26,7 @@ import org.apache.druid.java.util.emitter.core.Emitter; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.AlertEvent; +import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; @@ -137,6 +138,8 @@ public void emit(Event event) for (Emitter emitter : emitterList) { emitter.emit(event); } + } else if (event instanceof SegmentMetadataEvent) { + // do nothing. Ignore this event type } else { throw new ISE("unknown event type [%s]", event.getClass()); } diff --git a/extensions-contrib/dropwizard-emitter/src/main/java/org/apache/druid/emitter/dropwizard/DropwizardEmitter.java b/extensions-contrib/dropwizard-emitter/src/main/java/org/apache/druid/emitter/dropwizard/DropwizardEmitter.java index 5baa1b5da2458..e22c373f89fde 100644 --- a/extensions-contrib/dropwizard-emitter/src/main/java/org/apache/druid/emitter/dropwizard/DropwizardEmitter.java +++ b/extensions-contrib/dropwizard-emitter/src/main/java/org/apache/druid/emitter/dropwizard/DropwizardEmitter.java @@ -33,6 +33,7 @@ import org.apache.druid.java.util.emitter.core.Emitter; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.AlertEvent; +import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import java.util.LinkedHashMap; @@ -127,6 +128,8 @@ public void emit(Event event) for (Emitter emitter : alertEmitters) { emitter.emit(event); } + } else if (event instanceof SegmentMetadataEvent) { + // do nothing. Ignore this event type } else { throw new ISE("unknown event type [%s]", event.getClass()); } diff --git a/extensions-contrib/graphite-emitter/src/main/java/org/apache/druid/emitter/graphite/GraphiteEmitter.java b/extensions-contrib/graphite-emitter/src/main/java/org/apache/druid/emitter/graphite/GraphiteEmitter.java index b3739ab9d15f8..10bfe1e869fca 100644 --- a/extensions-contrib/graphite-emitter/src/main/java/org/apache/druid/emitter/graphite/GraphiteEmitter.java +++ b/extensions-contrib/graphite-emitter/src/main/java/org/apache/druid/emitter/graphite/GraphiteEmitter.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.emitter.core.Emitter; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.AlertEvent; +import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.server.log.RequestLogEvent; @@ -139,6 +140,8 @@ public void emit(Event event) "The following alert is dropped, description is [%s], severity is [%s]", alertEvent.getDescription(), alertEvent.getSeverity() ); + } else if (event instanceof SegmentMetadataEvent) { + // do nothing. Ignore this event type } else { log.error("unknown event type [%s]", event.getClass()); } diff --git a/extensions-contrib/kafka-emitter/pom.xml b/extensions-contrib/kafka-emitter/pom.xml index 7159dbb2ac98e..b14eba2ea7d17 100644 --- a/extensions-contrib/kafka-emitter/pom.xml +++ b/extensions-contrib/kafka-emitter/pom.xml @@ -86,7 +86,6 @@ slf4j-api provided - junit junit diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java index 129a374b5849a..051e5f10d4f68 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java @@ -19,17 +19,17 @@ package org.apache.druid.emitter.kafka; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; +import org.apache.druid.emitter.kafka.KafkaEmitterConfig.EventType; import org.apache.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.core.Emitter; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.core.EventMap; import org.apache.druid.java.util.emitter.service.AlertEvent; +import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.server.log.RequestLogEvent; import org.apache.kafka.clients.producer.Callback; @@ -37,9 +37,11 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; +import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -55,14 +57,16 @@ public class KafkaEmitter implements Emitter private final AtomicLong metricLost; private final AtomicLong alertLost; private final AtomicLong requestLost; + private final AtomicLong segmentMetadataLost; private final AtomicLong invalidLost; private final KafkaEmitterConfig config; - private final Producer producer; + private final Producer producer; private final ObjectMapper jsonMapper; - private final MemoryBoundLinkedBlockingQueue metricQueue; - private final MemoryBoundLinkedBlockingQueue alertQueue; - private final MemoryBoundLinkedBlockingQueue requestQueue; + private final MemoryBoundLinkedBlockingQueue metricQueue; + private final MemoryBoundLinkedBlockingQueue alertQueue; + private final MemoryBoundLinkedBlockingQueue requestQueue; + private final MemoryBoundLinkedBlockingQueue segmentMetadataQueue; private final ScheduledExecutorService scheduler; protected int sendInterval = DEFAULT_SEND_INTERVAL_SECONDS; @@ -78,10 +82,12 @@ public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper) this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); + this.segmentMetadataQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); this.scheduler = Executors.newScheduledThreadPool(4); this.metricLost = new AtomicLong(0L); this.alertLost = new AtomicLong(0L); this.requestLost = new AtomicLong(0L); + this.segmentMetadataLost = new AtomicLong(0L); this.invalidLost = new AtomicLong(0L); } @@ -96,7 +102,7 @@ private Callback setProducerCallback(AtomicLong lostCouter) } @VisibleForTesting - protected Producer setKafkaProducer() + protected Producer setKafkaProducer() { ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); try { @@ -105,7 +111,7 @@ protected Producer setKafkaProducer() Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); props.put(ProducerConfig.RETRIES_CONFIG, DEFAULT_RETRIES); props.putAll(config.getKafkaProducerConfig()); @@ -119,18 +125,26 @@ protected Producer setKafkaProducer() @Override public void start() { - scheduler.schedule(this::sendMetricToKafka, sendInterval, TimeUnit.SECONDS); - scheduler.schedule(this::sendAlertToKafka, sendInterval, TimeUnit.SECONDS); - if (config.getRequestTopic() != null) { + Set eventTypes = config.getEventTypes(); + if (eventTypes.contains(EventType.METRICS)) { + scheduler.schedule(this::sendMetricToKafka, sendInterval, TimeUnit.SECONDS); + } + if (eventTypes.contains(EventType.ALERTS)) { + scheduler.schedule(this::sendAlertToKafka, sendInterval, TimeUnit.SECONDS); + } + if (eventTypes.contains(EventType.REQUESTS)) { scheduler.schedule(this::sendRequestToKafka, sendInterval, TimeUnit.SECONDS); } + if (eventTypes.contains(EventType.SEGMENTMETADATA)) { + scheduler.schedule(this::sendSegmentMetadataToKafka, sendInterval, TimeUnit.SECONDS); + } scheduler.scheduleWithFixedDelay(() -> { - log.info( - "Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], invalidLost=[%d]", + log.info("Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], invalidLost=[%d] segmentMetadataLost=[%d]", metricLost.get(), alertLost.get(), requestLost.get(), - invalidLost.get() + invalidLost.get(), + segmentMetadataLost.get() ); }, DEFAULT_SEND_LOST_INTERVAL_MINUTES, DEFAULT_SEND_LOST_INTERVAL_MINUTES, TimeUnit.MINUTES); log.info("Starting Kafka Emitter."); @@ -151,9 +165,14 @@ private void sendRequestToKafka() sendToKafka(config.getRequestTopic(), requestQueue, setProducerCallback(requestLost)); } - private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue recordQueue, Callback callback) + private void sendSegmentMetadataToKafka() + { + sendToKafka(config.getSegmentMetadataTopic(), segmentMetadataQueue, setProducerCallback(segmentMetadataLost)); + } + + private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue recordQueue, Callback callback) { - ObjectContainer objectToSend; + ObjectContainer objectToSend; try { while (true) { objectToSend = recordQueue.take(); @@ -173,34 +192,39 @@ public void emit(final Event event) EventMap map = event.toMap(); if (config.getClusterName() != null) { map = map.asBuilder() - .put("clusterName", config.getClusterName()) - .build(); + .put("clusterName", config.getClusterName()) + .build(); } - String resultJson = jsonMapper.writeValueAsString(map); - - ObjectContainer objectContainer = new ObjectContainer<>( - resultJson, - StringUtils.toUtf8(resultJson).length + byte[] resultBytes = jsonMapper.writeValueAsBytes(map); + ObjectContainer objectContainer = new ObjectContainer<>( + resultBytes, + resultBytes.length ); + Set eventTypes = config.getEventTypes(); if (event instanceof ServiceMetricEvent) { - if (!metricQueue.offer(objectContainer)) { + if (!eventTypes.contains(EventType.METRICS) || !metricQueue.offer(objectContainer)) { metricLost.incrementAndGet(); } } else if (event instanceof AlertEvent) { - if (!alertQueue.offer(objectContainer)) { + if (!eventTypes.contains(EventType.ALERTS) || !alertQueue.offer(objectContainer)) { alertLost.incrementAndGet(); } } else if (event instanceof RequestLogEvent) { - if (config.getRequestTopic() == null || !requestQueue.offer(objectContainer)) { + if (!eventTypes.contains(EventType.REQUESTS) || !requestQueue.offer(objectContainer)) { requestLost.incrementAndGet(); } + } else if (event instanceof SegmentMetadataEvent) { + if (!eventTypes.contains(EventType.SEGMENTMETADATA) || !segmentMetadataQueue.offer(objectContainer)) { + segmentMetadataLost.incrementAndGet(); + } } else { invalidLost.incrementAndGet(); } } - catch (JsonProcessingException e) { + catch (Exception e) { invalidLost.incrementAndGet(); + log.warn(e, "Exception while serializing event"); } } } @@ -238,4 +262,9 @@ public long getInvalidLostCount() { return invalidLost.get(); } + + public long getSegmentMetadataLostCount() + { + return segmentMetadataLost.get(); + } } diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java index ed7b9ea0e9d1d..e437afff1173c 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java @@ -21,53 +21,115 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonValue; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.java.util.common.StringUtils; import org.apache.kafka.clients.producer.ProducerConfig; import javax.annotation.Nullable; +import java.util.HashSet; import java.util.Map; +import java.util.Set; public class KafkaEmitterConfig { + public enum EventType + { + METRICS, + ALERTS, + REQUESTS, + SEGMENTMETADATA { + @Override + public String toString() + { + return "segmentMetadata"; + } + }; + + @JsonValue + @Override + public String toString() + { + return StringUtils.toLowerCase(this.name()); + } + @JsonCreator + public static EventType fromString(String name) + { + return valueOf(StringUtils.toUpperCase(name)); + } + } + + public static final Set DEFAULT_EVENT_TYPES = ImmutableSet.of(EventType.ALERTS, EventType.METRICS); @JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) private final String bootstrapServers; - @JsonProperty("metric.topic") + @Nullable @JsonProperty("event.types") + private final Set eventTypes; + @Nullable @JsonProperty("metric.topic") private final String metricTopic; - @JsonProperty("alert.topic") + @Nullable @JsonProperty("alert.topic") private final String alertTopic; @Nullable @JsonProperty("request.topic") private final String requestTopic; + @Nullable @JsonProperty("segmentMetadata.topic") + private final String segmentMetadataTopic; @JsonProperty private final String clusterName; @JsonProperty("producer.config") - private Map kafkaProducerConfig; + private final Map kafkaProducerConfig; @JsonCreator public KafkaEmitterConfig( @JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String bootstrapServers, - @JsonProperty("metric.topic") String metricTopic, - @JsonProperty("alert.topic") String alertTopic, + @Nullable @JsonProperty("event.types") Set eventTypes, + @Nullable @JsonProperty("metric.topic") String metricTopic, + @Nullable @JsonProperty("alert.topic") String alertTopic, @Nullable @JsonProperty("request.topic") String requestTopic, + @Nullable @JsonProperty("segmentMetadata.topic") String segmentMetadataTopic, @JsonProperty("clusterName") String clusterName, @JsonProperty("producer.config") @Nullable Map kafkaProducerConfig ) { this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null"); - this.metricTopic = Preconditions.checkNotNull(metricTopic, "metric.topic can not be null"); - this.alertTopic = Preconditions.checkNotNull(alertTopic, "alert.topic can not be null"); - this.requestTopic = requestTopic; + this.eventTypes = validateEventTypes(eventTypes, requestTopic); + + this.metricTopic = this.eventTypes.contains(EventType.METRICS) ? Preconditions.checkNotNull(metricTopic, "metric.topic can not be null") : null; + this.alertTopic = this.eventTypes.contains(EventType.ALERTS) ? Preconditions.checkNotNull(alertTopic, "alert.topic can not be null") : null; + this.requestTopic = this.eventTypes.contains(EventType.REQUESTS) ? Preconditions.checkNotNull(requestTopic, "request.topic can not be null") : null; + this.segmentMetadataTopic = this.eventTypes.contains(EventType.SEGMENTMETADATA) ? Preconditions.checkNotNull(segmentMetadataTopic, "segmentMetadata.topic can not be null") : null; this.clusterName = clusterName; this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() : kafkaProducerConfig; } + private Set validateEventTypes(Set eventTypes, String requestTopic) + { + // Unless explicitly overridden, kafka emitter will always emit metrics and alerts + if (eventTypes == null) { + Set defaultEventTypes = new HashSet<>(DEFAULT_EVENT_TYPES); + // To maintain backwards compatibility, if eventTypes is not set, then requests are sent out or not + // based on the `request.topic` config + if (requestTopic != null) { + defaultEventTypes.add(EventType.REQUESTS); + } + return defaultEventTypes; + } + return eventTypes; + } + @JsonProperty public String getBootstrapServers() { return bootstrapServers; } + @JsonProperty + public Set getEventTypes() + { + return eventTypes; + } + @JsonProperty public String getMetricTopic() { @@ -92,6 +154,12 @@ public String getRequestTopic() return requestTopic; } + @Nullable + public String getSegmentMetadataTopic() + { + return segmentMetadataTopic; + } + @JsonProperty public Map getKafkaProducerConfig() { @@ -113,10 +181,16 @@ public boolean equals(Object o) if (!getBootstrapServers().equals(that.getBootstrapServers())) { return false; } - if (!getMetricTopic().equals(that.getMetricTopic())) { + + if (getEventTypes() != null ? !getEventTypes().equals(that.getEventTypes()) : that.getEventTypes() != null) { return false; } - if (!getAlertTopic().equals(that.getAlertTopic())) { + + if (getMetricTopic() != null ? !getMetricTopic().equals(that.getMetricTopic()) : that.getMetricTopic() != null) { + return false; + } + + if (getAlertTopic() != null ? !getAlertTopic().equals(that.getAlertTopic()) : that.getAlertTopic() != null) { return false; } @@ -124,6 +198,10 @@ public boolean equals(Object o) return false; } + if (getSegmentMetadataTopic() != null ? !getSegmentMetadataTopic().equals(that.getSegmentMetadataTopic()) : that.getSegmentMetadataTopic() != null) { + return false; + } + if (getClusterName() != null ? !getClusterName().equals(that.getClusterName()) : that.getClusterName() != null) { return false; } @@ -134,9 +212,11 @@ public boolean equals(Object o) public int hashCode() { int result = getBootstrapServers().hashCode(); - result = 31 * result + getMetricTopic().hashCode(); - result = 31 * result + getAlertTopic().hashCode(); + result = 31 * result + (getEventTypes() != null ? getEventTypes().hashCode() : 0); + result = 31 * result + (getMetricTopic() != null ? getMetricTopic().hashCode() : 0); + result = 31 * result + (getAlertTopic() != null ? getAlertTopic().hashCode() : 0); result = 31 * result + (getRequestTopic() != null ? getRequestTopic().hashCode() : 0); + result = 31 * result + (getSegmentMetadataTopic() != null ? getSegmentMetadataTopic().hashCode() : 0); result = 31 * result + (getClusterName() != null ? getClusterName().hashCode() : 0); result = 31 * result + getKafkaProducerConfig().hashCode(); return result; @@ -147,9 +227,11 @@ public String toString() { return "KafkaEmitterConfig{" + "bootstrap.servers='" + bootstrapServers + '\'' + + ", event.types='" + eventTypes.toString() + '\'' + ", metric.topic='" + metricTopic + '\'' + ", alert.topic='" + alertTopic + '\'' + ", request.topic='" + requestTopic + '\'' + + ", segmentMetadata.topic='" + segmentMetadataTopic + '\'' + ", clusterName='" + clusterName + '\'' + ", Producer.config=" + kafkaProducerConfig + '}'; diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java index 55ecdbaeb8a9b..56c6ddb452f07 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java @@ -19,15 +19,18 @@ package org.apache.druid.emitter.kafka; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.exc.ValueInstantiationException; import com.google.common.collect.ImmutableMap; import org.apache.druid.jackson.DefaultObjectMapper; import org.junit.Assert; import org.junit.Before; import org.junit.Test; - import java.io.IOException; +import java.util.HashSet; +import java.util.Set; public class KafkaEmitterConfigTest { @@ -42,8 +45,8 @@ public void setUp() @Test public void testSerDeserKafkaEmitterConfig() throws IOException { - KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest", - "alertTest", "requestTest", + KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", null, "metricTest", + "alertTest", "requestTest", "metadataTest", "clusterNameTest", ImmutableMap.builder() .put("testKey", "testValue").build() ); @@ -56,8 +59,24 @@ public void testSerDeserKafkaEmitterConfig() throws IOException @Test public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws IOException { - KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest", - "alertTest", null, + KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", null, "metricTest", + "alertTest", null, "metadataTest", + "clusterNameTest", ImmutableMap.builder() + .put("testKey", "testValue").build() + ); + String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); + KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class) + .readValue(kafkaEmitterConfigString); + Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig); + } + + @Test + public void testSerDeserKafkaEmitterConfigNullMetricsTopic() throws IOException + { + Set eventTypeSet = new HashSet(); + eventTypeSet.add(KafkaEmitterConfig.EventType.SEGMENTMETADATA); + KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", eventTypeSet, null, + null, null, "metadataTest", "clusterNameTest", ImmutableMap.builder() .put("testKey", "testValue").build() ); @@ -70,8 +89,8 @@ public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws IOException @Test public void testSerDeNotRequiredKafkaProducerConfig() { - KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", "metricTest", - "alertTest", null, + KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", null, "metricTest", + "alertTest", null, "metadataTest", "clusterNameTest", null ); try { @@ -83,6 +102,14 @@ public void testSerDeNotRequiredKafkaProducerConfig() } } + @Test + public void testDeserializeEventTypesWithDifferentCase() throws JsonProcessingException + { + Assert.assertEquals(KafkaEmitterConfig.EventType.SEGMENTMETADATA, mapper.readValue("\"segmentMetadata\"", KafkaEmitterConfig.EventType.class)); + Assert.assertEquals(KafkaEmitterConfig.EventType.ALERTS, mapper.readValue("\"alerts\"", KafkaEmitterConfig.EventType.class)); + Assert.assertThrows(ValueInstantiationException.class, () -> mapper.readValue("\"segment_metadata\"", KafkaEmitterConfig.EventType.class)); + } + @Test public void testJacksonModules() { diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java index 422d18a7f153c..66532601c5e9c 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java @@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.AlertEvent; +import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.server.QueryStats; import org.apache.druid.server.RequestLogLine; @@ -37,7 +38,10 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CountDownLatch; import static org.mockito.ArgumentMatchers.any; @@ -47,15 +51,18 @@ @RunWith(Parameterized.class) public class KafkaEmitterTest { - @Parameterized.Parameter + @Parameterized.Parameter(0) + public Set eventsType; + + @Parameterized.Parameter(1) public String requestTopic; - @Parameterized.Parameters(name = "{index}: requestTopic - {0}") + @Parameterized.Parameters(name = "{index}: eventTypes - {0}, requestTopic - {1}") public static Object[] data() { - return new Object[] { - "requests", - null + return new Object[][] { + {new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.METRICS, KafkaEmitterConfig.EventType.REQUESTS, KafkaEmitterConfig.EventType.ALERTS, KafkaEmitterConfig.EventType.SEGMENTMETADATA)), "requests"}, + {new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.METRICS, KafkaEmitterConfig.EventType.ALERTS, KafkaEmitterConfig.EventType.SEGMENTMETADATA)), null} }; } @@ -77,19 +84,31 @@ public void testKafkaEmitter() throws InterruptedException ).build("service", "host") ); - int totalEvents = serviceMetricEvents.size() + alertEvents.size() + requestLogEvents.size(); + final List segmentMetadataEvents = ImmutableList.of( + new SegmentMetadataEvent( + "dummy_datasource", + DateTimes.of("2001-01-01T00:00:00.000Z"), + DateTimes.of("2001-01-02T00:00:00.000Z"), + DateTimes.of("2001-01-03T00:00:00.000Z"), + "dummy_version", + true + ) + ); + + int totalEvents = serviceMetricEvents.size() + alertEvents.size() + requestLogEvents.size() + segmentMetadataEvents.size(); int totalEventsExcludingRequestLogEvents = totalEvents - requestLogEvents.size(); final CountDownLatch countDownSentEvents = new CountDownLatch( requestTopic == null ? totalEventsExcludingRequestLogEvents : totalEvents); - final KafkaProducer producer = mock(KafkaProducer.class); + + final KafkaProducer producer = mock(KafkaProducer.class); final KafkaEmitter kafkaEmitter = new KafkaEmitter( - new KafkaEmitterConfig("", "metrics", "alerts", requestTopic, "test-cluster", null), + new KafkaEmitterConfig("", eventsType, "metrics", "alerts", requestTopic, "metadata", "test-cluster", null), new ObjectMapper() ) { @Override - protected Producer setKafkaProducer() + protected Producer setKafkaProducer() { // override send interval to 1 second sendInterval = 1; @@ -113,10 +132,14 @@ protected Producer setKafkaProducer() for (Event event : requestLogEvents) { kafkaEmitter.emit(event); } + for (Event event : segmentMetadataEvents) { + kafkaEmitter.emit(event); + } countDownSentEvents.await(); Assert.assertEquals(0, kafkaEmitter.getMetricLostCount()); Assert.assertEquals(0, kafkaEmitter.getAlertLostCount()); + Assert.assertEquals(0, kafkaEmitter.getSegmentMetadataLostCount()); Assert.assertEquals(requestTopic == null ? requestLogEvents.size() : 0, kafkaEmitter.getRequestLostCount()); Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount()); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java index 233739eb7b778..fded752eb9f67 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java @@ -33,10 +33,13 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.timeline.DataSegment; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -257,11 +260,27 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) segment.getShardSpec() == null ? null : segment.getShardSpec().getType() ); toolbox.getEmitter().emit(metricBuilder.build("segment/added/bytes", segment.getSize())); + // Emit the segment related metadata using the configured emitters + this.emitSegmentMetadata(segment, toolbox); } return retVal; } + private void emitSegmentMetadata(DataSegment segment, TaskActionToolbox toolbox) + { + SegmentMetadataEvent event = new SegmentMetadataEvent( + segment.getDataSource(), + DateTime.now(DateTimeZone.UTC), + segment.getInterval().getStart(), + segment.getInterval().getEnd(), + segment.getVersion(), + segment.getLastCompactionState() != null + ); + + toolbox.getEmitter().emit(event); + } + private void checkWithSegmentLock() { final Map> oldSegmentsMap = groupSegmentsByIntervalAndSort(segmentsToBeOverwritten); diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java b/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java new file mode 100644 index 0000000000000..01577c47f6b39 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.emitter.service; + +import com.fasterxml.jackson.annotation.JsonValue; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.core.EventMap; +import org.joda.time.DateTime; + +public class SegmentMetadataEvent implements Event +{ + public static final String FEED = "feed"; + public static final String DATASOURCE = "dataSource"; + public static final String CREATED_TIME = "createdTime"; + public static final String START_TIME = "startTime"; + public static final String END_TIME = "endTime"; + public static final String VERSION = "version"; + public static final String IS_COMPACTED = "isCompacted"; + + private final DateTime createdTime; + private final String dataSource; + private final DateTime startTime; + private final DateTime endTime; + private final String version; + private final boolean isCompacted; + + public SegmentMetadataEvent( + String dataSource, + DateTime createdTime, + DateTime startTime, + DateTime endTime, + String version, + boolean isCompacted + ) + { + this.dataSource = dataSource; + this.createdTime = createdTime; + this.startTime = startTime; + this.endTime = endTime; + this.version = version; + this.isCompacted = isCompacted; + } + + @Override + public String getFeed() + { + return "segment_metadata"; + } + + public DateTime getCreatedTime() + { + return createdTime; + } + + public DateTime getStartTime() + { + return startTime; + } + + public DateTime getEndTime() + { + return endTime; + } + + public String getDataSource() + { + return dataSource; + } + + public String getVersion() + { + return version; + } + + public boolean isCompacted() + { + return isCompacted; + } + + @Override + @JsonValue + public EventMap toMap() + { + + return EventMap.builder() + .put(FEED, getFeed()) + .put(DATASOURCE, dataSource) + .put(CREATED_TIME, createdTime) + .put(START_TIME, startTime) + .put(END_TIME, endTime) + .put(VERSION, version) + .put(IS_COMPACTED, isCompacted) + .build(); + } +} diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java new file mode 100644 index 0000000000000..83a4fcba7dc5e --- /dev/null +++ b/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.emitter.service; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.common.DateTimes; +import org.junit.Assert; +import org.junit.Test; + +public class SegmentMetadataEventTest +{ + @Test + public void testBasicEvent() + { + SegmentMetadataEvent event = new SegmentMetadataEvent( + "dummy_datasource", + DateTimes.of("2001-01-01T00:00:00.000Z"), + DateTimes.of("2001-01-02T00:00:00.000Z"), + DateTimes.of("2001-01-03T00:00:00.000Z"), + "dummy_version", + true + ); + + Assert.assertEquals( + ImmutableMap.builder() + .put(SegmentMetadataEvent.FEED, "segment_metadata") + .put(SegmentMetadataEvent.DATASOURCE, "dummy_datasource") + .put(SegmentMetadataEvent.CREATED_TIME, DateTimes.of("2001-01-01T00:00:00.000Z")) + .put(SegmentMetadataEvent.START_TIME, DateTimes.of("2001-01-02T00:00:00.000Z")) + .put(SegmentMetadataEvent.END_TIME, DateTimes.of("2001-01-03T00:00:00.000Z")) + .put(SegmentMetadataEvent.VERSION, "dummy_version") + .put(SegmentMetadataEvent.IS_COMPACTED, true) + .build(), + event.toMap() + ); + } +}