Skip to content

Commit

Permalink
introduce a "tree" type to the flattenSpec (#12177)
Browse files Browse the repository at this point in the history
* introduce a "tree" type to the flattenSpec

* feedback - rename exprs to nodes, use CollectionsUtils.isNullOrEmpty for guard

* feedback - expand docs to more clearly capture limitations of "tree" flattenSpec

* feedback - fix for typo on docs

* introduce a comment to explain defensive copy, tweak null handling

* fix: part of rebase

* mark ObjectFlatteners.FlattenerMaker as an ExtensionPoint and provide default for new tree type

* fix: objectflattener restore previous behavior to call getRootField for root type

* docs: ingestion/data-formats add note that ORC only supports path expressions

* chore: linter remove unused import

* fix: use correct newer form for empty DimensionsSpec in FlattenJSONBenchmark
  • Loading branch information
jasonk000 authored Nov 1, 2022
1 parent 675fd98 commit 0d03ce4
Show file tree
Hide file tree
Showing 14 changed files with 311 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public class FlattenJSONBenchmark
Parser flatParser;
Parser nestedParser;
Parser jqParser;
Parser treeJqParser;
Parser treeTreeParser;
Parser fieldDiscoveryParser;
Parser forcedPathParser;
int flatCounter = 0;
Expand All @@ -82,6 +84,8 @@ public void prepare() throws Exception
flatParser = gen.getFlatParser();
nestedParser = gen.getNestedParser();
jqParser = gen.getJqParser();
treeJqParser = gen.getTreeJqParser();
treeTreeParser = gen.getTreeTreeParser();
fieldDiscoveryParser = gen.getFieldDiscoveryParser();
forcedPathParser = gen.getForcedPathParser();
}
Expand Down Expand Up @@ -112,6 +116,32 @@ public Map<String, Object> flatten(final Blackhole blackhole)
return parsed;
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public Map<String, Object> treejqflatten(final Blackhole blackhole)
{
Map<String, Object> parsed = treeJqParser.parseToMap(nestedInputs.get(jqCounter));
for (String s : parsed.keySet()) {
blackhole.consume(parsed.get(s));
}
jqCounter = (jqCounter + 1) % NUM_EVENTS;
return parsed;
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public Map<String, Object> treetreeflatten(final Blackhole blackhole)
{
Map<String, Object> parsed = treeTreeParser.parseToMap(nestedInputs.get(jqCounter));
for (String s : parsed.keySet()) {
blackhole.consume(parsed.get(s));
}
jqCounter = (jqCounter + 1) % NUM_EVENTS;
return parsed;
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.druid.java.util.common.parsers.Parser;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

Expand Down Expand Up @@ -209,6 +210,69 @@ public Parser getJqParser()
return spec.makeParser();
}

public Parser getTreeJqParser()
{
List<JSONPathFieldSpec> fields = new ArrayList<>();
fields.add(JSONPathFieldSpec.createRootField("ts"));

fields.add(JSONPathFieldSpec.createRootField("d1"));
fields.add(JSONPathFieldSpec.createJqField("e1.d1", ".e1.d1"));
fields.add(JSONPathFieldSpec.createJqField("e1.d2", ".e1.d2"));
fields.add(JSONPathFieldSpec.createJqField("e2.d3", ".e2.d3"));
fields.add(JSONPathFieldSpec.createJqField("e2.d4", ".e2.d4"));
fields.add(JSONPathFieldSpec.createJqField("e2.d5", ".e2.d5"));
fields.add(JSONPathFieldSpec.createJqField("e2.d6", ".e2.d6"));

fields.add(JSONPathFieldSpec.createRootField("m3"));
fields.add(JSONPathFieldSpec.createJqField("e3.m1", ".e3.m1"));
fields.add(JSONPathFieldSpec.createJqField("e3.m2", ".e3.m2"));
fields.add(JSONPathFieldSpec.createJqField("e3.m3", ".e3.m3"));
fields.add(JSONPathFieldSpec.createJqField("e3.m4", ".e3.m4"));

JSONPathSpec flattenSpec = new JSONPathSpec(false, fields);
JSONParseSpec spec = new JSONParseSpec(
new TimestampSpec("ts", "iso", null),
DimensionsSpec.EMPTY,
flattenSpec,
null,
null
);

return spec.makeParser();
}


public Parser getTreeTreeParser()
{
List<JSONPathFieldSpec> fields = new ArrayList<>();
fields.add(JSONPathFieldSpec.createRootField("ts"));

fields.add(JSONPathFieldSpec.createRootField("d1"));
fields.add(JSONPathFieldSpec.createTreeField("e1.d1", Arrays.asList("e1", "d1")));
fields.add(JSONPathFieldSpec.createTreeField("e1.d2", Arrays.asList("e1", "d2")));
fields.add(JSONPathFieldSpec.createTreeField("e2.d3", Arrays.asList("e2", "d3")));
fields.add(JSONPathFieldSpec.createTreeField("e2.d4", Arrays.asList("e2", "d4")));
fields.add(JSONPathFieldSpec.createTreeField("e2.d5", Arrays.asList("e2", "d5")));
fields.add(JSONPathFieldSpec.createTreeField("e2.d6", Arrays.asList("e2", "d6")));

fields.add(JSONPathFieldSpec.createRootField("m3"));
fields.add(JSONPathFieldSpec.createTreeField("e3.m1", Arrays.asList("e3", "m1")));
fields.add(JSONPathFieldSpec.createTreeField("e3.m2", Arrays.asList("e3", "m2")));
fields.add(JSONPathFieldSpec.createTreeField("e3.m3", Arrays.asList("e3", "m3")));
fields.add(JSONPathFieldSpec.createTreeField("e3.m4", Arrays.asList("e3", "m4")));

JSONPathSpec flattenSpec = new JSONPathSpec(false, fields);
JSONParseSpec spec = new JSONParseSpec(
new TimestampSpec("ts", "iso", null),
DimensionsSpec.EMPTY,
flattenSpec,
null,
null
);

return spec.makeParser();
}

public String generateFlatEvent() throws Exception
{
String nestedEvent = generateNestedEvent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,24 @@ public Function<JsonNode, Object> makeJsonQueryExtractor(final String expr)
}
}

@Override
public Function<JsonNode, Object> makeJsonTreeExtractor(final List<String> nodes)
{
// create a defensive copy
final String[] keyNames = nodes.toArray(new String[0]);

return jsonNode -> {
JsonNode targetNode = jsonNode;
for (String keyName : keyNames) {
if (targetNode == null) {
return null;
}
targetNode = targetNode.get(keyName);
}
return finalizeConversionForMap(targetNode);
};
}

@Override
public JsonProvider getJsonProvider()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,33 +22,59 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.utils.CollectionUtils;

import java.util.List;
import java.util.Objects;

public class JSONPathFieldSpec
{
private final JSONPathFieldType type;
private final String name;
private final String expr;
private final List<String> nodes;

@JsonCreator
public JSONPathFieldSpec(
@JsonProperty("type") JSONPathFieldType type,
@JsonProperty("name") String name,
@JsonProperty("expr") String expr
@JsonProperty("expr") String expr,
@JsonProperty("nodes") List<String> nodes
)
{
this.type = type;
this.name = Preconditions.checkNotNull(name, "Missing 'name' in field spec");

// If expr is null and type is root, use the name as the expr too.
if (expr == null && type == JSONPathFieldType.ROOT) {
this.expr = name;
} else {
this.expr = Preconditions.checkNotNull(expr, "Missing 'expr' for field[%s]", name);
// Validate required fields are present
switch (type) {
case ROOT:
this.expr = (expr == null) ? name : expr;
this.nodes = null;
break;

case TREE:
this.expr = null;
Preconditions.checkArgument(
!CollectionUtils.isNullOrEmpty(nodes),
"Missing 'nodes' for field[%s], was [%s]", name, nodes);
this.nodes = nodes;
break;

default:
this.expr = Preconditions.checkNotNull(expr, "Missing 'expr' for field[%s]", name);
this.nodes = null;
}
}

public JSONPathFieldSpec(
JSONPathFieldType type,
String name,
String expr
)
{
this(type, name, expr, null);
}

@JsonProperty
public JSONPathFieldType getType()
{
Expand All @@ -67,6 +93,12 @@ public String getExpr()
return expr;
}

@JsonProperty
public List<String> getNodes()
{
return nodes;
}

@JsonCreator
public static JSONPathFieldSpec fromString(String name)
{
Expand All @@ -88,6 +120,11 @@ public static JSONPathFieldSpec createRootField(String name)
return new JSONPathFieldSpec(JSONPathFieldType.ROOT, name, null);
}

public static JSONPathFieldSpec createTreeField(String name, List<String> nodes)
{
return new JSONPathFieldSpec(JSONPathFieldType.TREE, name, null, nodes);
}

@Override
public boolean equals(final Object o)
{
Expand All @@ -100,13 +137,14 @@ public boolean equals(final Object o)
final JSONPathFieldSpec that = (JSONPathFieldSpec) o;
return type == that.type &&
Objects.equals(name, that.name) &&
Objects.equals(expr, that.expr);
Objects.equals(expr, that.expr) &&
Objects.equals(nodes, that.nodes);
}

@Override
public int hashCode()
{
return Objects.hash(type, name, expr);
return Objects.hash(type, name, expr, nodes);
}

@Override
Expand All @@ -116,6 +154,7 @@ public String toString()
"type=" + type +
", name='" + name + '\'' +
", expr='" + expr + '\'' +
", nodes='" + nodes + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ public enum JSONPathFieldType
{
ROOT,
PATH,
JQ;
JQ,
TREE;

@JsonValue
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.collect.Iterables;
import com.jayway.jsonpath.spi.json.JsonProvider;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.UOE;

Expand Down Expand Up @@ -64,6 +65,9 @@ public static <T> ObjectFlattener<T> create(
case JQ:
extractor = flattenerMaker.makeJsonQueryExtractor(fieldSpec.getExpr());
break;
case TREE:
extractor = flattenerMaker.makeJsonTreeExtractor(fieldSpec.getNodes());
break;
default:
throw new UOE("Unsupported field type[%s]", fieldSpec.getType());
}
Expand Down Expand Up @@ -208,6 +212,7 @@ public Map<String, Object> toMap(T obj)
};
}

@ExtensionPoint
public interface FlattenerMaker<T>
{
JsonProvider getJsonProvider();
Expand All @@ -231,6 +236,14 @@ public interface FlattenerMaker<T>
*/
Function<T, Object> makeJsonQueryExtractor(String expr);

/**
* Create a "field" extractor for nested json expressions
*/
default Function<T, Object> makeJsonTreeExtractor(List<String> nodes)
{
throw new UOE("makeJsonTreeExtractor has not been implemented.");
}

/**
* Convert object to Java {@link Map} using {@link #getJsonProvider()} and {@link #finalizeConversionForMap} to
* extract and convert data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;

public class JsonInputFormatTest
{
Expand All @@ -48,7 +49,9 @@ public void testSerde() throws IOException
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2"),
new JSONPathFieldSpec(JSONPathFieldType.TREE, "tree_omg", null, Arrays.asList("o", "mg")),
new JSONPathFieldSpec(JSONPathFieldType.TREE, "tree_omg2", null, Arrays.asList("o", "mg2"))
)
),
ImmutableMap.of(Feature.ALLOW_COMMENTS.name(), true, Feature.ALLOW_UNQUOTED_FIELD_NAMES.name(), false),
Expand Down
Loading

0 comments on commit 0d03ce4

Please sign in to comment.