discover nested columns when using nested column indexer for schemaless ingestion #13672

Merged (15 commits) on Jan 18, 2023
core/src/main/java/org/apache/druid/data/input/InputRowSchema.java (12 additions & 2 deletions)
@@ -34,6 +34,9 @@ public class InputRowSchema
private final TimestampSpec timestampSpec;
private final DimensionsSpec dimensionsSpec;
private final ColumnsFilter columnsFilter;

private final boolean discoverNestedColumns;

/**
* Set of metric names for further downstream processing by {@link InputSource}.
* Empty set if no metric given.
@@ -47,20 +50,22 @@ public InputRowSchema(
final ColumnsFilter columnsFilter
)
{
this(timestampSpec, dimensionsSpec, columnsFilter, ImmutableSet.of());
this(timestampSpec, dimensionsSpec, columnsFilter, ImmutableSet.of(), false);
}

public InputRowSchema(
final TimestampSpec timestampSpec,
final DimensionsSpec dimensionsSpec,
final ColumnsFilter columnsFilter,
final Set<String> metricNames
final Set<String> metricNames,
boolean discoverNestedColumns
)
{
this.timestampSpec = timestampSpec;
this.dimensionsSpec = dimensionsSpec;
this.columnsFilter = columnsFilter;
this.metricNames = metricNames == null ? ImmutableSet.of() : metricNames;
this.discoverNestedColumns = discoverNestedColumns;
}

@NotNull
@@ -92,4 +97,9 @@ public ColumnsFilter getColumnsFilter()
{
return columnsFilter;
}

public boolean shouldDiscoverNestedColumns()
{
return discoverNestedColumns;
}
}
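
For orientation (not part of this diff), here is a minimal sketch of how a caller might build a schemaless InputRowSchema with the new flag enabled. Only the five-argument constructor comes from the change itself; the TimestampSpec, DimensionsSpec, and ColumnsFilter construction shown here is an assumed example.

import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;

public class NestedDiscoverySchemaSketch
{
  public static InputRowSchema schemalessSchema()
  {
    return new InputRowSchema(
        new TimestampSpec("timestamp", "auto", null), // timestamp column with auto-detected format
        DimensionsSpec.builder().build(),             // no explicit dimensions: rely on discovery
        ColumnsFilter.all(),                          // keep every input column
        ImmutableSet.of(),                            // no metric names
        true                                          // new flag: also discover nested (object-typed) columns
    );
  }
}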
@@ -59,7 +59,10 @@ public class JsonLineReader extends TextReader
)
{
super(inputRowSchema, source);
this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns));
this.flattener = ObjectFlatteners.create(
flattenSpec,
new JSONFlattenerMaker(keepNullColumns, inputRowSchema.shouldDiscoverNestedColumns())
);
this.mapper = mapper;
}

@@ -80,7 +80,10 @@ public class JsonNodeReader extends IntermediateRowParsingReader<JsonNode>
{
this.inputRowSchema = inputRowSchema;
this.source = source;
this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns));
this.flattener = ObjectFlatteners.create(
flattenSpec,
new JSONFlattenerMaker(keepNullColumns, inputRowSchema.shouldDiscoverNestedColumns())
);
this.mapper = mapper;
this.jsonFactory = new JsonFactory();
}
@@ -77,7 +77,10 @@ public class JsonReader extends IntermediateRowParsingReader<String>
{
this.inputRowSchema = inputRowSchema;
this.source = source;
this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns));
this.flattener = ObjectFlatteners.create(
flattenSpec,
new JSONFlattenerMaker(keepNullColumns, inputRowSchema.shouldDiscoverNestedColumns())
);
this.mapper = mapper;
this.jsonFactory = new JsonFactory();
}
@@ -57,15 +57,21 @@ public class JSONFlattenerMaker implements ObjectFlatteners.FlattenerMaker<JsonNode>
private final CharsetEncoder enc = StandardCharsets.UTF_8.newEncoder();
private final boolean keepNullValues;

private final boolean discoverNestedFields;

public JSONFlattenerMaker(boolean keepNullValues)

public JSONFlattenerMaker(boolean keepNullValues, boolean discoverNestedFields)
{
this.keepNullValues = keepNullValues;
this.discoverNestedFields = discoverNestedFields;
}

@Override
public Iterable<String> discoverRootFields(final JsonNode obj)
{
if (discoverNestedFields) {
Contributor: can you drop a one-line comment here? I am assuming it's like this since each top-level field is a field of its own if we are allowing nested columns.

Member (author): do you want similar comments for all the other FlattenerMaker implementations?

Member (author): went ahead and added comments to all.

return obj::fieldNames;
}
return FluentIterable.from(obj::fields)
.filter(
entry -> {
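
For context (not part of the diff), a minimal sketch of the behavioral difference this flag introduces, mirroring the JSONFlattenerMakerTest further down; the package for JSONFlattenerMaker is assumed from this PR's file layout.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker;

public class JsonDiscoverySketch
{
  public static void main(String[] args) throws Exception
  {
    JsonNode node = new ObjectMapper().readTree("{\"x\": 1, \"list\": [1, 2], \"nested\": {\"y\": 2}}");

    // discoverNestedFields = false: only primitive fields and lists of primitives are reported
    System.out.println(ImmutableSet.copyOf(new JSONFlattenerMaker(true, false).discoverRootFields(node)));
    // expected: [x, list]

    // discoverNestedFields = true: every top-level field name, including object-typed ones
    System.out.println(ImmutableSet.copyOf(new JSONFlattenerMaker(true, true).discoverRootFields(node)));
    // expected: [x, list, nested]
  }
}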
@@ -42,7 +42,10 @@ public class JSONPathParser implements Parser<String, Object>
public JSONPathParser(JSONPathSpec flattenSpec, ObjectMapper mapper, boolean keepNullColumns)
{
this.mapper = mapper == null ? new ObjectMapper() : mapper;
this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns));
this.flattener = ObjectFlatteners.create(
flattenSpec,
new JSONFlattenerMaker(keepNullColumns, false)
);
}

@Override
@@ -218,7 +218,7 @@ public interface FlattenerMaker<T>
{
JsonProvider getJsonProvider();
/**
* List all "root" primitive properties and primitive lists (no nested objects, no lists of objects)
* List all "root" fields, optionally filtering to include only fields that contain primitive and lists of primitive values
*/
Iterable<String> discoverRootFields(T obj);

@@ -25,6 +25,7 @@
import com.fasterxml.jackson.databind.node.BinaryNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.StringUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -37,7 +38,8 @@ public class JSONFlattenerMakerTest
{
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private static final JSONFlattenerMaker FLATTENER_MAKER = new JSONFlattenerMaker(true);
private static final JSONFlattenerMaker FLATTENER_MAKER = new JSONFlattenerMaker(true, false);
private static final JSONFlattenerMaker FLATTENER_MAKER_NESTED = new JSONFlattenerMaker(true, true);

@Test
public void testStrings() throws JsonProcessingException
@@ -169,4 +171,32 @@ public void testNested() throws JsonProcessingException
result = FLATTENER_MAKER.finalizeConversionForMap(node);
Assert.assertEquals(expectedList, result);
}

@Test
public void testDiscovery() throws JsonProcessingException
{
Map<String, Object> theMap =
ImmutableMap.<String, Object>builder()
.put("bool", true)
.put("int", 1)
.put("long", 1L)
.put("float", 0.11f)
.put("double", 0.33)
.put("binary", new byte[]{0x01, 0x02, 0x03})
.put("list", ImmutableList.of("foo", "bar", "baz"))
.put("anotherList", ImmutableList.of(1, 2, 3))
.put("nested", ImmutableMap.of("x", 1L, "y", 2L, "z", 3L))
.build();

JsonNode node = OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsString(theMap));
Assert.assertTrue(node.isObject());
Assert.assertEquals(
ImmutableSet.of("bool", "int", "long", "float", "double", "binary", "list", "anotherList"),
ImmutableSet.copyOf(FLATTENER_MAKER.discoverRootFields(node))
);
Assert.assertEquals(
ImmutableSet.of("bool", "int", "long", "float", "double", "binary", "list", "anotherList", "nested"),
ImmutableSet.copyOf(FLATTENER_MAKER_NESTED.discoverRootFields(node))
);
}
}
@@ -34,7 +34,7 @@ public class ObjectFlattenersTest
{
private static final String SOME_JSON = "{\"foo\": null, \"bar\": 1}";

private static final ObjectFlatteners.FlattenerMaker FLATTENER_MAKER = new JSONFlattenerMaker(true);
private static final ObjectFlatteners.FlattenerMaker FLATTENER_MAKER = new JSONFlattenerMaker(true, false);
private static final ObjectFlattener FLATTENER = ObjectFlatteners.create(
new JSONPathSpec(
true,
@@ -92,14 +92,22 @@ private static boolean isFieldPrimitive(Schema.Field field)
private final boolean fromPigAvroStorage;
private final boolean binaryAsString;

private final boolean discoverNestedFields;

/**
* @param fromPigAvroStorage boolean to specify the data file is stored using AvroStorage
* @param binaryAsString boolean to encode the byte[] as a string.
Contributor: javadocs need an update.

Member (author): fixed.

*/
public AvroFlattenerMaker(final boolean fromPigAvroStorage, final boolean binaryAsString, final boolean extractUnionsByType)
public AvroFlattenerMaker(
final boolean fromPigAvroStorage,
final boolean binaryAsString,
final boolean extractUnionsByType,
final boolean discoverNestedFields
)
{
this.fromPigAvroStorage = fromPigAvroStorage;
this.binaryAsString = binaryAsString;
this.discoverNestedFields = discoverNestedFields;

this.avroJsonProvider = new GenericAvroJsonProvider(extractUnionsByType);
this.jsonPathConfiguration =
@@ -113,6 +121,9 @@ public AvroFlattenerMaker(final boolean fromPigAvroStorage, final boolean binary
@Override
public Set<String> discoverRootFields(final GenericRecord obj)
{
if (discoverNestedFields) {
return obj.getSchema().getFields().stream().map(Schema.Field::name).collect(Collectors.toSet());
}
return obj.getSchema()
.getFields()
.stream()
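
The Avro flattener gains the same toggle. A rough, self-contained sketch of the difference (assumed usage, not part of this PR): per the existing isFieldPrimitive filter, map- and record-typed fields are only reported when the new flag is set.

import com.google.common.collect.ImmutableMap;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.druid.data.input.avro.AvroFlattenerMaker;

public class AvroDiscoverySketch
{
  public static void main(String[] args)
  {
    // A toy schema with one primitive field and one map-typed (i.e. nested) field.
    Schema schema = SchemaBuilder.record("Event").fields()
        .requiredLong("id")
        .name("nested").type().map().values().longType().noDefault()
        .endRecord();

    GenericRecord record = new GenericData.Record(schema);
    record.put("id", 1L);
    record.put("nested", ImmutableMap.of("x", 1L));

    // discoverNestedFields = false: complex-typed fields are filtered out -> only "id"
    System.out.println(new AvroFlattenerMaker(false, false, false, false).discoverRootFields(record));

    // discoverNestedFields = true: every schema field name is reported -> "id" and "nested"
    System.out.println(new AvroFlattenerMaker(false, false, false, true).discoverRootFields(record));
  }
}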
@@ -64,7 +64,15 @@ public class AvroOCFReader extends IntermediateRowParsingReader<GenericRecord>
this.source = source;
this.temporaryDirectory = temporaryDirectory;
this.readerSchema = readerSchema;
this.recordFlattener = ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(false, binaryAsString, extractUnionsByType));
this.recordFlattener = ObjectFlatteners.create(
flattenSpec,
new AvroFlattenerMaker(
false,
binaryAsString,
extractUnionsByType,
inputRowSchema.shouldDiscoverNestedColumns()
)
);
}

@Override
@@ -50,7 +50,10 @@ public static ObjectFlattener<GenericRecord> makeFlattener(
flattenSpec = JSONPathSpec.DEFAULT;
}

return ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(fromPigAvroStorage, binaryAsString, extractUnionsByType));
return ObjectFlatteners.create(
flattenSpec,
new AvroFlattenerMaker(fromPigAvroStorage, binaryAsString, extractUnionsByType, false)
);
}

public static List<InputRow> parseGenericRecord(
@@ -60,7 +60,15 @@ public class AvroStreamReader extends IntermediateRowParsingReader<GenericRecord>
this.inputRowSchema = inputRowSchema;
this.source = source;
this.avroBytesDecoder = avroBytesDecoder;
this.recordFlattener = ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(false, binaryAsString, extractUnionsByType));
this.recordFlattener = ObjectFlatteners.create(
flattenSpec,
new AvroFlattenerMaker(
false,
binaryAsString,
extractUnionsByType,
inputRowSchema.shouldDiscoverNestedColumns()
)
);
}

@Override
@@ -19,6 +19,7 @@

package org.apache.druid.data.input.avro;

import com.google.common.collect.ImmutableSet;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.druid.data.input.AvroStreamInputRowParserTest;
@@ -40,9 +41,9 @@
public class AvroFlattenerMakerTest
{
private static final AvroFlattenerMaker FLATTENER_WITHOUT_EXTRACT_UNION_BY_TYPE =
new AvroFlattenerMaker(false, false, false);
new AvroFlattenerMaker(false, false, false, false);
private static final AvroFlattenerMaker FLATTENER_WITH_EXTRACT_UNION_BY_TYPE =
new AvroFlattenerMaker(false, false, true);
new AvroFlattenerMaker(false, false, true, false);

private static final SomeAvroDatum RECORD = AvroStreamInputRowParserTest.buildSomeAvroDatum();

@@ -77,7 +78,7 @@ public void makeJsonPathExtractor_flattenerWithExtractUnionsByType()
@Test
public void jsonPathExtractorExtractUnionsByType()
{
final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false, true);
final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false, true, false);

// Unnamed types are accessed by type

@@ -156,6 +157,59 @@ public void makeJsonQueryExtractor_flattenerWithExtractUnionsByType()
);
}

@Test
public void testDiscovery()
{
final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false, true, false);
final AvroFlattenerMaker flattenerNested = new AvroFlattenerMaker(false, false, true, true);

SomeAvroDatum input = AvroStreamInputRowParserTest.buildSomeAvroDatum();

Assert.assertEquals(
ImmutableSet.of(
"someOtherId",
"someStringArray",
"someIntArray",
"someFloat",
"eventType",
"someFixed",
"someBytes",
"someUnion",
"id",
"someEnum",
"someLong",
"someInt",
"timestamp"
),
ImmutableSet.copyOf(flattener.discoverRootFields(input))
);
Assert.assertEquals(
ImmutableSet.of(
"someStringValueMap",
"someOtherId",
"someStringArray",
"someIntArray",
"someFloat",
"isValid",
"someIntValueMap",
"eventType",
"someFixed",
"someBytes",
"someRecord",
"someMultiMemberUnion",
"someNull",
"someRecordArray",
"someUnion",
"id",
"someEnum",
"someLong",
"someInt",
"timestamp"
),
ImmutableSet.copyOf(flattenerNested.discoverRootFields(input))
);
}

private void getRootField_common(final SomeAvroDatum record, final AvroFlattenerMaker flattener)
{
Assert.assertEquals(
@@ -82,7 +82,13 @@ public boolean isSplittable()
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
SettableByteEntity<KafkaRecordEntity> settableByteEntitySource = (SettableByteEntity<KafkaRecordEntity>) source;
InputRowSchema newInputRowSchema = new InputRowSchema(dummyTimestampSpec, inputRowSchema.getDimensionsSpec(), inputRowSchema.getColumnsFilter());
InputRowSchema newInputRowSchema = new InputRowSchema(
dummyTimestampSpec,
inputRowSchema.getDimensionsSpec(),
inputRowSchema.getColumnsFilter(),
inputRowSchema.getMetricNames(),
inputRowSchema.shouldDiscoverNestedColumns()
);
return new KafkaInputReader(
inputRowSchema,
settableByteEntitySource,
@@ -55,7 +55,10 @@ public OrcHadoopInputRowParser(
} else {
flattenSpec = JSONPathSpec.DEFAULT;
}
this.orcStructFlattener = ObjectFlatteners.create(flattenSpec, new OrcStructFlattenerMaker(this.binaryAsString));
this.orcStructFlattener = ObjectFlatteners.create(
flattenSpec,
new OrcStructFlattenerMaker(this.binaryAsString, false)
);
this.parser = new MapInputRowParser(parseSpec);
}

@@ -70,7 +70,10 @@ public class OrcReader extends IntermediateRowParsingReader<OrcStruct>
this.inputRowSchema = inputRowSchema;
this.source = source;
this.temporaryDirectory = temporaryDirectory;
this.orcStructFlattener = ObjectFlatteners.create(flattenSpec, new OrcStructFlattenerMaker(binaryAsString));
this.orcStructFlattener = ObjectFlatteners.create(
flattenSpec,
new OrcStructFlattenerMaker(binaryAsString, inputRowSchema.shouldDiscoverNestedColumns())
);
}

@Override