Skip to content

Commit

Permalink
confluent-extensions with custom transform specs (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
xvrl authored and m-ghazanfar committed Jun 6, 2023
1 parent c2534d6 commit 7426bcd
Show file tree
Hide file tree
Showing 10 changed files with 339 additions and 0 deletions.
4 changes: 4 additions & 0 deletions codestyle/checkstyle-suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,8 @@
<suppress checks="ImportOrder" files="[\\/]target[\\/]generated-test-sources[\\/]" />
<suppress checks="EmptyLineSeparator" files="[\\/]target[\\/]generated-test-sources[\\/]" />
<suppress checks="AvoidStaticImport" files="[\\/]src[\\/](test)[\\/]"/>

<!--
Confluent Extensions: sources whose path contains an io/confluent directory
are exempt from the Header and PackageName checks.
NOTE(review): presumably because those files carry a Confluent copyright
header and live in the io.confluent package rather than the project's usual
header and package; confirm against the checkstyle configuration.
-->
<suppress checks="Header" files="[\\/]io[\\/]confluent[\\/]" />
<suppress checks="PackageName" files="[\\/]io[\\/]confluent[\\/]" />
</suppressions>
2 changes: 2 additions & 0 deletions distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,8 @@
<argument>org.apache.druid.extensions.contrib:opentelemetry-emitter</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:druid-opencensus-extensions</argument>
<argument>-c</argument>
<argument>io.confluent.druid.extensions:confluent-extensions</argument>
</arguments>
</configuration>
</execution>
Expand Down
44 changes: 44 additions & 0 deletions extensions-contrib/confluent-extensions/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>

<!--
~ Copyright 2020 Confluent Inc.
-->

<!-- Maven build descriptor for the confluent-extensions module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- Coordinates of this module; the version is not declared here and is therefore inherited from the parent. -->
<groupId>io.confluent.druid.extensions</groupId>
<artifactId>confluent-extensions</artifactId>
<name>confluent-extensions</name>
<description>confluent-extensions</description>

<!--
Inherits from the Druid root pom located two directories up.
NOTE(review): the parent version is hard-coded; confirm it matches the
version declared in ../../pom.xml, otherwise Maven will not resolve the
parent from the relative path.
-->
<parent>
<artifactId>druid</artifactId>
<groupId>org.apache.druid</groupId>
<version>0.17.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<dependencies>
<!--
Druid modules are pinned to the parent's version and use "provided" scope:
they are on the compile classpath but are not packaged with this artifact.
-->
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-core</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<!-- test -->
<!-- No junit version is given here; presumably it is managed by the parent pom (verify). -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2020 Confluent Inc.
*/

package io.confluent.druid;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import io.confluent.druid.transform.ExtractTenantTopicTransform;
import io.confluent.druid.transform.ExtractTenantTransform;
import org.apache.druid.initialization.DruidModule;

import java.util.Collections;
import java.util.List;

/**
 * Druid extension module that registers the Confluent transforms with Jackson.
 *
 * <p>Two subtypes are registered by name: {@code extractTenant} maps to
 * {@link ExtractTenantTransform} and {@code extractTenantTopic} maps to
 * {@link ExtractTenantTopicTransform}.
 */
public class ConfluentExtensionsModule implements DruidModule
{
  /**
   * Builds the single Jackson module contributed by this extension.
   *
   * @return an immutable one-element list holding the module that carries the
   *         named transform subtypes
   */
  @Override
  public List<? extends Module> getJacksonModules()
  {
    final SimpleModule transformsModule = new SimpleModule("ConfluentTransformsModule");
    transformsModule.registerSubtypes(
        new NamedType(ExtractTenantTransform.class, "extractTenant"),
        new NamedType(ExtractTenantTopicTransform.class, "extractTenantTopic")
    );
    return Collections.singletonList(transformsModule);
  }

  /**
   * Does nothing: this extension contributes no bindings to the given binder.
   *
   * @param binder the binder supplied by the caller; unused
   */
  @Override
  public void configure(Binder binder)
  {
    // Intentionally empty.
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2020 Confluent Inc.
*/

package io.confluent.druid.transform;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.segment.transform.RowFunction;
import org.apache.druid.segment.transform.Transform;

/**
 * Transform that fills the output column {@code name} with the result of
 * {@link TenantUtils#extractTenantTopic(String)} applied to the string form of
 * the input column {@code fieldName}.
 *
 * <p>Both configuration values are exposed through {@link JsonProperty}-annotated
 * getters so that a serialized instance carries everything the constructor
 * needs. Druid's ObjectMapper does not auto-detect plain getters, so without the
 * annotations {@code name} and {@code fieldName} would be dropped when a spec
 * containing this transform is written back to JSON.
 */
public class ExtractTenantTopicTransform implements Transform
{
  private final String fieldName;
  private final String name;

  /**
   * @param name      name of the output column produced by this transform
   * @param fieldName name of the input column holding the prefixed topic
   */
  public ExtractTenantTopicTransform(
      @JsonProperty("name") final String name,
      @JsonProperty("fieldName") final String fieldName
  )
  {
    this.fieldName = fieldName;
    this.name = name;
  }

  /**
   * @return the name of the output column
   */
  @JsonProperty
  @Override
  public String getName()
  {
    return name;
  }

  /**
   * @return the name of the input column the value is read from
   */
  @JsonProperty
  public String getFieldName()
  {
    return fieldName;
  }

  /**
   * Returns the function evaluated for each row.
   *
   * <p>If the row already has a non-null raw value under {@code name}, that
   * value is returned unchanged. Otherwise the raw value under {@code fieldName}
   * is read; a missing value yields {@code null}, anything else is converted
   * with {@code toString()} and handed to
   * {@link TenantUtils#extractTenantTopic(String)}.
   */
  @Override
  public RowFunction getRowFunction()
  {
    return row -> {
      Object existing = row.getRaw(name);
      // do not overwrite existing values if present
      if (existing != null) {
        return existing;
      }

      Object value = row.getRaw(fieldName);
      return value == null ? null : TenantUtils.extractTenantTopic(value.toString());
    };
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final ExtractTenantTopicTransform that = (ExtractTenantTopicTransform) o;
    // java.util.Objects is referenced by its qualified name so that the file's
    // import list does not have to change.
    return java.util.Objects.equals(name, that.name)
           && java.util.Objects.equals(fieldName, that.fieldName);
  }

  @Override
  public int hashCode()
  {
    return java.util.Objects.hash(name, fieldName);
  }

  @Override
  public String toString()
  {
    return "ExtractTenantTopicTransform{name='" + name + "', fieldName='" + fieldName + "'}";
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2020 Confluent Inc.
*/

package io.confluent.druid.transform;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.segment.transform.RowFunction;
import org.apache.druid.segment.transform.Transform;

/**
 * Transform that fills the output column {@code name} with the result of
 * {@link TenantUtils#extractTenant(String)} applied to the string form of the
 * input column {@code fieldName}.
 *
 * <p>Both configuration values are exposed through {@link JsonProperty}-annotated
 * getters so that a serialized instance carries everything the constructor
 * needs. Druid's ObjectMapper does not auto-detect plain getters, so without the
 * annotations {@code name} and {@code fieldName} would be dropped when a spec
 * containing this transform is written back to JSON.
 */
public class ExtractTenantTransform implements Transform
{
  private final String fieldName;
  private final String name;

  /**
   * @param name      name of the output column produced by this transform
   * @param fieldName name of the input column holding the prefixed topic
   */
  public ExtractTenantTransform(
      @JsonProperty("name") final String name,
      @JsonProperty("fieldName") final String fieldName
  )
  {
    this.fieldName = fieldName;
    this.name = name;
  }

  /**
   * @return the name of the output column
   */
  @JsonProperty
  @Override
  public String getName()
  {
    return name;
  }

  /**
   * @return the name of the input column the value is read from
   */
  @JsonProperty
  public String getFieldName()
  {
    return fieldName;
  }

  /**
   * Returns the function evaluated for each row.
   *
   * <p>If the row already has a non-null raw value under {@code name}, that
   * value is returned unchanged. Otherwise the raw value under {@code fieldName}
   * is read; a missing value yields {@code null}, anything else is converted
   * with {@code toString()} and handed to
   * {@link TenantUtils#extractTenant(String)}.
   */
  @Override
  public RowFunction getRowFunction()
  {
    return row -> {
      Object existing = row.getRaw(name);
      // do not overwrite existing values if present
      if (existing != null) {
        return existing;
      }

      Object value = row.getRaw(fieldName);
      return value == null ? null : TenantUtils.extractTenant(value.toString());
    };
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final ExtractTenantTransform that = (ExtractTenantTransform) o;
    // java.util.Objects is referenced by its qualified name so that the file's
    // import list does not have to change.
    return java.util.Objects.equals(name, that.name)
           && java.util.Objects.equals(fieldName, that.fieldName);
  }

  @Override
  public int hashCode()
  {
    return java.util.Objects.hash(name, fieldName);
  }

  @Override
  public String toString()
  {
    return "ExtractTenantTransform{name='" + name + "', fieldName='" + fieldName + "'}";
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2020 Confluent Inc.
*/

package io.confluent.druid.transform;

import javax.annotation.Nullable;

/**
 * Static helpers that split a prefixed topic name at its first
 * {@code '_'} character: the part before it is the tenant, the part after it is
 * the tenant's topic.
 *
 * <p>This class only holds static methods and cannot be instantiated.
 */
public final class TenantUtils
{
  private static final char DELIMITER = '_';

  private TenantUtils()
  {
    // static utility class; no instances
  }

  /**
   * Returns the portion of {@code prefixedTopic} that precedes the first
   * delimiter.
   *
   * @param prefixedTopic the topic name to split; may be {@code null}
   * @return the text before the first {@code '_'} (empty when the string starts
   *         with the delimiter), or {@code null} when the input is {@code null}
   *         or contains no delimiter
   */
  @Nullable
  public static String extractTenant(@Nullable String prefixedTopic)
  {
    if (prefixedTopic == null) {
      return null;
    }
    int i = prefixedTopic.indexOf(DELIMITER);
    return i < 0 ? null : prefixedTopic.substring(0, i);
  }

  /**
   * Returns the portion of {@code prefixedTopic} that follows the first
   * delimiter. Later delimiters are kept as part of the result.
   *
   * @param prefixedTopic the topic name to split; may be {@code null}
   * @return the text after the first {@code '_'} (empty when the delimiter is
   *         the last character), or {@code null} when the input is {@code null}
   *         or contains no delimiter
   */
  @Nullable
  public static String extractTenantTopic(@Nullable String prefixedTopic)
  {
    if (prefixedTopic == null) {
      return null;
    }
    int i = prefixedTopic.indexOf(DELIMITER);
    return i < 0 ? null : prefixedTopic.substring(i + 1);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Copyright 2020 Confluent Inc.

io.confluent.druid.ConfluentExtensionsModule
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright 2020 Confluent Inc.
*/

package io.confluent.druid.transform;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.transform.TransformSpec;
import org.junit.Assert;
import org.junit.Test;

import java.util.Map;

/**
 * Tests for {@link ExtractTenantTransform} and {@link ExtractTenantTopicTransform}
 * when both are applied through a {@link TransformSpec} that decorates a map
 * row parser.
 */
public class ExtractTransformTest
{
  // Parser declaring "topic" and "tenant" as dimensions; "tenant_topic" is not declared.
  private static final MapInputRowParser PARSER = new MapInputRowParser(
      new TimeAndDimsParseSpec(
          new TimestampSpec("t", "auto", DateTimes.of("2020-01-01")),
          new DimensionsSpec(
              DimensionsSpec.getDefaultSchemas(ImmutableList.of("topic", "tenant")),
              null,
              null
          )
      )
  );

  // Only a prefixed topic: both derived columns have to be computed.
  private static final Map<String, Object> ROW1 = ImmutableMap.of(
      "topic", "lkc-abc123_mytopic"
  );

  // Derived columns already present: they must win over the values computed from "topic".
  private static final Map<String, Object> ROW2 = ImmutableMap.of(
      "tenant", "lkc-xyz789",
      "tenant_topic", "topic0",
      "topic", "lkc-abc123_mytopic"
  );

  // Topic without the '_' delimiter.
  private static final Map<String, Object> ROW3 = ImmutableMap.of(
      "topic", "invalid-topic"
  );

  // No topic at all.
  private static final Map<String, Object> ROW4 = ImmutableMap.of();

  /**
   * Parses {@code event} with {@link #PARSER} decorated by a freshly built
   * transform spec containing both tenant transforms, and returns the first
   * row of the resulting batch.
   */
  private static InputRow parseWithTenantTransforms(final Map<String, Object> event)
  {
    final TransformSpec tenantTransforms = new TransformSpec(
        null,
        ImmutableList.of(
            new ExtractTenantTransform("tenant", "topic"),
            new ExtractTenantTopicTransform("tenant_topic", "topic")
        )
    );
    final InputRowParser<Map<String, Object>> decorated = tenantTransforms.decorate(PARSER);
    return decorated.parseBatch(event).get(0);
  }

  @Test
  public void testExtraction()
  {
    final InputRow parsed = parseWithTenantTransforms(ROW1);

    Assert.assertNotNull(parsed);
    Assert.assertEquals(ImmutableList.of("topic", "tenant"), parsed.getDimensions());
    Assert.assertEquals(ImmutableList.of("lkc-abc123"), parsed.getDimension("tenant"));
    Assert.assertEquals(ImmutableList.of("mytopic"), parsed.getDimension("tenant_topic"));
  }

  @Test
  public void testPreserveExistingFields()
  {
    final InputRow parsed = parseWithTenantTransforms(ROW2);

    Assert.assertNotNull(parsed);
    Assert.assertEquals(ImmutableList.of("topic", "tenant"), parsed.getDimensions());
    Assert.assertEquals(ImmutableList.of("lkc-xyz789"), parsed.getDimension("tenant"));
    Assert.assertEquals(ImmutableList.of("topic0"), parsed.getDimension("tenant_topic"));
  }

  @Test
  public void testInvalidTopics()
  {
    final InputRow parsed = parseWithTenantTransforms(ROW3);

    Assert.assertNotNull(parsed);
    Assert.assertEquals(ImmutableList.of("topic", "tenant"), parsed.getDimensions());
    Assert.assertNull(parsed.getRaw("tenant"));
    Assert.assertNull(parsed.getRaw("tenant_topic"));
  }

  @Test
  public void testNullTopic()
  {
    final InputRow parsed = parseWithTenantTransforms(ROW4);

    Assert.assertNotNull(parsed);
    Assert.assertEquals(ImmutableList.of("topic", "tenant"), parsed.getDimensions());
    Assert.assertNull(parsed.getRaw("tenant"));
    Assert.assertNull(parsed.getRaw("tenant_topic"));
  }
}
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@
<module>extensions-contrib/opentelemetry-emitter</module>
<module>extensions-contrib/kubernetes-overlord-extensions</module>
<module>extensions-contrib/opencensus-extensions</module>
<module>extensions-contrib/confluent-extensions</module>
<!-- distribution packaging -->
<module>distribution</module>
<!-- Revised integration tests -->
Expand Down

0 comments on commit 7426bcd

Please sign in to comment.