OBSDATA-440 Adding SegmentMetadataEvent and publishing them via KafkaSegmentMetadataEmitter #117

Merged
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.java.util.emitter.service;

import com.fasterxml.jackson.annotation.JsonValue;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.core.EventMap;
import org.joda.time.DateTime;

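/**
 * Event emitted on the {@code segment_metadata} feed, carrying basic metadata about a segment:
 * data source, creation time, interval start and end, version, and whether the segment is the
 * result of compaction.
 */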
public class SegmentMetadataEvent implements Event
{
public static final String FEED = "feed";
public static final String DATASOURCE = "dataSource";
public static final String CREATED_TIME = "createdTime";
public static final String START_TIME = "startTime";
public static final String END_TIME = "endTime";
public static final String VERSION = "version";
public static final String IS_COMPACTED = "isCompacted";

private final DateTime createdTime;
private final String dataSource;
private final DateTime startTime;
private final DateTime endTime;
private final String version;
private final boolean isCompacted;

public SegmentMetadataEvent(
String dataSource,
DateTime createdTime,
DateTime startTime,
DateTime endTime,
String version,
boolean isCompacted
)
{
this.dataSource = dataSource;
this.createdTime = createdTime;
this.startTime = startTime;
this.endTime = endTime;
this.version = version;
this.isCompacted = isCompacted;
}

@Override
public String getFeed()
{
return "segment_metadata";
}

public DateTime getCreatedTime()
{
return createdTime;
}

public DateTime getStartTime()
{
return startTime;
}

public DateTime getEndTime()
{
return endTime;
}

public String getDataSource()
{
return dataSource;
}

public String getVersion()
{
return version;
}

public boolean isCompacted()
{
return isCompacted;
}

@Override
@JsonValue
public EventMap toMap()
{

return EventMap.builder()
.put(FEED, getFeed())
.put(DATASOURCE, dataSource)
.put(CREATED_TIME, createdTime)
.put(START_TIME, startTime)
.put(END_TIME, endTime)
.put(VERSION, version)
.put(IS_COMPACTED, isCompacted)
.build();
}
}
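The KafkaEmitter changes that consume this event are not shown in the hunks above. For orientation, here is a minimal, hypothetical sketch of how an emitter could route `SegmentMetadataEvent` instances to a dedicated topic; the class name `SegmentMetadataRoutingSketch`, the constructor arguments, and the Joda-aware `ObjectMapper` setup are illustrative assumptions, not the extension's actual implementation.

```java
// Hypothetical routing sketch only; names and configuration are assumptions.
import java.util.Properties;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.joda.JodaModule;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SegmentMetadataRoutingSketch
{
  private final Producer<String, byte[]> producer;
  private final ObjectMapper jsonMapper = new ObjectMapper().registerModule(new JodaModule());
  private final String segmentMetadataTopic;

  public SegmentMetadataRoutingSketch(Properties producerConfig, String segmentMetadataTopic)
  {
    // producerConfig is expected to carry bootstrap.servers plus key/value serializer settings.
    this.producer = new KafkaProducer<>(producerConfig);
    this.segmentMetadataTopic = segmentMetadataTopic;
  }

  public void emit(Event event) throws Exception
  {
    if (event instanceof SegmentMetadataEvent) {
      // toMap() produces the EventMap built in SegmentMetadataEvent#toMap.
      byte[] payload = jsonMapper.writeValueAsBytes(event.toMap());
      producer.send(new ProducerRecord<>(segmentMetadataTopic, payload));
    }
    // Metrics, alerts, and request logs would be routed to their own topics.
  }
}
```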
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.java.util.emitter.service;

import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.DateTimes;
import org.junit.Assert;
import org.junit.Test;

public class SegmentMetadataEventTest
{
@Test
public void testBasicEvent()
{
SegmentMetadataEvent event = new SegmentMetadataEvent(
"dummy_datasource",
DateTimes.of("2001-01-01T00:00:00.000Z"),
DateTimes.of("2001-01-02T00:00:00.000Z"),
DateTimes.of("2001-01-03T00:00:00.000Z"),
"dummy_version",
true
);

Assert.assertEquals(
ImmutableMap.<String, Object>builder()
.put(SegmentMetadataEvent.FEED, "segment_metadata")
.put(SegmentMetadataEvent.DATASOURCE, "dummy_datasource")
.put(SegmentMetadataEvent.CREATED_TIME, DateTimes.of("2001-01-01T00:00:00.000Z"))
.put(SegmentMetadataEvent.START_TIME, DateTimes.of("2001-01-02T00:00:00.000Z"))
.put(SegmentMetadataEvent.END_TIME, DateTimes.of("2001-01-03T00:00:00.000Z"))
.put(SegmentMetadataEvent.VERSION, "dummy_version")
.put(SegmentMetadataEvent.IS_COMPACTED, true)
.build(),
event.toMap()
);
}
}
26 changes: 17 additions & 9 deletions docs/development/extensions-contrib/kafka-emitter.md
@@ -36,20 +36,28 @@ to monitor the status of your Druid cluster with this extension.

All the configuration parameters for the Kafka emitter are under `druid.emitter.kafka`.

|property|description|required?|default|
|--------|-----------|---------|-------|
|`druid.emitter.kafka.bootstrap.servers`|Comma-separated Kafka broker. (`[hostname:port],[hostname:port]...`)|yes|none|
|`druid.emitter.kafka.metric.topic`|Kafka topic name for emitter's target to emit service metric.|yes|none|
|`druid.emitter.kafka.alert.topic`|Kafka topic name for emitter's target to emit alert.|yes|none|
|`druid.emitter.kafka.request.topic`|Kafka topic name for emitter's target to emit request logs. If left empty then request logs will not be sent to the Kafka topic.|no|none|
|`druid.emitter.kafka.producer.config`|JSON formatted configuration which user want to set additional properties to Kafka producer.|no|none|
|`druid.emitter.kafka.clusterName`|Optional value to specify name of your druid cluster. It can help make groups in your monitoring environment. |no|none|
|property|description|required?|default|
|--------|-----------|---------|-------|
|`druid.emitter.kafka.bootstrap.servers`|Comma-separated list of Kafka brokers. (`[hostname:port],[hostname:port]...`)|yes|none|
|`druid.emitter.kafka.event.types`|Comma-separated event types.<br/>Choices: `alerts`, `metrics`, `requests`, `segmentMetadata`.|no|`["metrics", "alerts"]`|
|`druid.emitter.kafka.metric.topic`|Kafka topic to which service metrics are emitted. Cannot be left empty if `event.types` contains `metrics`.|no|none|
|`druid.emitter.kafka.alert.topic`|Kafka topic to which alerts are emitted. Cannot be left empty if `event.types` contains `alerts`.|no|none|
|`druid.emitter.kafka.request.topic`|Kafka topic to which request logs are emitted. Cannot be left empty if `event.types` contains `requests`.|no|none|
|`druid.emitter.kafka.segmentMetadata.topic`|Kafka topic to which segment metadata is emitted. Cannot be left empty if `event.types` contains `segmentMetadata`.|no|none|
|`druid.emitter.kafka.segmentMetadata.topic.format`|Format in which segment metadata is emitted.<br/>Choices: `json`, `protobuf`.<br/>If set to `protobuf`, segment metadata is emitted in the `DruidSegmentEvent.proto` format.|no|`json`|
|`druid.emitter.kafka.producer.config`|JSON configuration for setting additional properties on the Kafka producer.|no|none|
|`druid.emitter.kafka.clusterName`|Optional name of your Druid cluster. It can help you group events in your monitoring environment.|no|none|

### Example

```
druid.emitter.kafka.bootstrap.servers=hostname1:9092,hostname2:9092
druid.emitter.kafka.metric.topic=druid-metric
druid.emitter.kafka.event.types=["alerts", "requests", "segmentMetadata"]
druid.emitter.kafka.alert.topic=druid-alert
druid.emitter.kafka.request.topic=druid-request-logs
druid.emitter.kafka.segmentMetadata.topic=druid-segment-metadata
druid.emitter.kafka.segmentMetadata.topic.format=protobuf
druid.emitter.kafka.producer.config={"max.block.ms":10000}
```
Whenever the `druid.emitter.kafka.segmentMetadata.topic.format` field is updated, it is recommended to also update `druid.emitter.kafka.segmentMetadata.topic`, so that the same topic is not polluted with segment metadata in different formats.
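
As an illustration (not part of the documentation change itself), with the default `json` format a record published to the segment metadata topic carries the fields built in `SegmentMetadataEvent#toMap` from this PR. Assuming ISO-8601 timestamp serialization and made-up values, a record would look roughly like:

```
{
  "feed": "segment_metadata",
  "dataSource": "wikipedia",
  "createdTime": "2023-01-01T00:00:00.000Z",
  "startTime": "2023-01-01T00:00:00.000Z",
  "endTime": "2023-01-02T00:00:00.000Z",
  "version": "2023-01-01T00:05:00.000Z",
  "isCompacted": false
}
```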

47 changes: 46 additions & 1 deletion extensions-contrib/kafka-emitter/pom.xml
@@ -81,7 +81,12 @@
<artifactId>jackson-core</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.10.5</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -112,5 +117,45 @@
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.0</version>
<executions>
<execution>
<phase>initialize</phase>
<goals>
<goal>detect</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
</configuration>
</plugin>
</plugins>
</build>
</project>