auto columns fixes (#14422)
changes:
* auto columns no longer participate in generic 'null column' handling; trying to support this was a mistake and caused ingestion failures due to mismatched ColumnFormat. It will be replaced in the future with nested common format constant column functionality (not in this PR)
* fix bugs with auto columns that contain empty objects, empty arrays, or primitive types mixed with either of these empty constructs (sample rows are sketched just after this list)
* fix a bug in the bound filter when the upper bound is the null equivalent but the bound is strict
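
For the second bullet, a minimal sketch of the problematic input shapes (hypothetical rows; the column names "ts" and "auto" are illustrative, not from this PR). An auto-discovered column that sees only empty objects, empty arrays, or primitives mixed with them is the case that previously failed with mismatched ColumnFormat:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class EmptyConstructRows
{
  public static void main(String[] args)
  {
    // column "auto" mixes an empty object, an empty array, and a primitive
    // across rows; Map.of (Java 9+) is used here purely for brevity
    List<Map<String, Object>> rows = Arrays.asList(
        Map.of("ts", 1L, "auto", Collections.emptyMap()),   // {}  empty object
        Map.of("ts", 2L, "auto", Collections.emptyList()),  // []  empty array
        Map.of("ts", 3L, "auto", "plain string")            // mixed-in primitive
    );
    rows.forEach(System.out::println);
  }
}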
clintropolis authored Jun 14, 2023
1 parent be5a659 commit 8454cc6
Showing 30 changed files with 722 additions and 137 deletions.
---
@@ -84,7 +84,7 @@ public void testDiscoveredTypes()
new LongDimensionSchema("long"),
new DoubleDimensionSchema("double"),
new StringDimensionSchema("bool"),
new AutoTypeColumnSchema("variant"),
new StringDimensionSchema("variant"),
new AutoTypeColumnSchema("array"),
new AutoTypeColumnSchema("nested")
),
@@ -110,7 +110,7 @@ public void testDiscoveredTypes()
.add("long", ColumnType.LONG)
.add("double", ColumnType.DOUBLE)
.add("bool", ColumnType.STRING)
.add("variant", ColumnType.NESTED_DATA)
.add("variant", ColumnType.STRING)
.add("array", ColumnType.LONG_ARRAY)
.add("nested", ColumnType.NESTED_DATA)
.build(),
@@ -147,7 +147,7 @@ public void testDiscoveredTypesStrictBooleans()
new LongDimensionSchema("long"),
new DoubleDimensionSchema("double"),
new LongDimensionSchema("bool"),
new AutoTypeColumnSchema("variant"),
new StringDimensionSchema("variant"),
new AutoTypeColumnSchema("array"),
new AutoTypeColumnSchema("nested")
),
@@ -173,7 +173,7 @@ public void testDiscoveredTypesStrictBooleans()
.add("long", ColumnType.LONG)
.add("double", ColumnType.DOUBLE)
.add("bool", ColumnType.LONG)
.add("variant", ColumnType.NESTED_DATA)
.add("variant", ColumnType.STRING)
.add("array", ColumnType.LONG_ARRAY)
.add("nested", ColumnType.NESTED_DATA)
.build(),
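
A note on the expectation changes above, inferred from the getLogicalType() rework further down: a root-only column that mixes scalar types is now folded with ColumnType.leastRestrictiveType instead of being reported as NESTED_DATA, which is why "variant" is now expected to be a plain string dimension. A runnable sketch using only calls that appear elsewhere in this diff:

import org.apache.druid.segment.column.ColumnType;

public class VariantTypeFold
{
  public static void main(String[] args)
  {
    // fold mixed root scalar types the way the new logical-type logic does;
    // STRING is the least restrictive scalar, so LONG folded with STRING
    // yields STRING
    ColumnType logicalType = null;
    logicalType = ColumnType.leastRestrictiveType(logicalType, ColumnType.LONG);
    logicalType = ColumnType.leastRestrictiveType(logicalType, ColumnType.STRING);
    System.out.println(logicalType); // expected: STRING
  }
}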
---
@@ -346,6 +346,11 @@ private ColumnAnalysis analyzeComplexColumn(
.withTypeName(typeName);

try (final BaseColumn theColumn = columnHolder != null ? columnHolder.getColumn() : null) {
if (capabilities != null) {
bob.hasMultipleValues(capabilities.hasMultipleValues().isTrue())
.hasNulls(capabilities.hasNulls().isMaybeTrue());
}

if (theColumn != null && !(theColumn instanceof ComplexColumn)) {
return bob.withErrorMessage(
StringUtils.format(
@@ -358,9 +363,6 @@
}
final ComplexColumn complexColumn = (ComplexColumn) theColumn;

bob.hasMultipleValues(capabilities.hasMultipleValues().isTrue())
.hasNulls(capabilities.hasNulls().isMaybeTrue());

long size = 0;
if (analyzingSize() && complexColumn != null) {

---
@@ -34,6 +34,7 @@
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnFormat;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ColumnTypeFactory;
import org.apache.druid.segment.data.CloseableIndexed;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexRowHolder;
@@ -56,7 +57,19 @@

public class AutoTypeColumnIndexer implements DimensionIndexer<StructuredData, StructuredData, StructuredData>
{
/**
* have we seen any null values?
*/
protected volatile boolean hasNulls = false;
/**
* Have we seen any objects? Empty objects in particular are sneaky, they don't have any nested paths, so we also
* broadly track if we have processed any objects {@link StructuredDataProcessor.ProcessResults#hasObjects()}
*/
protected volatile boolean hasNestedData = false;
protected volatile boolean isConstant = true;
@Nullable
protected volatile Object constantValue = null;
private volatile boolean firstRow = true;

protected SortedMap<String, FieldIndexer> fieldIndexers = new TreeMap<>();
protected final ValueDictionary globalDictionary = new ValueDictionary();
@@ -111,6 +124,12 @@ public EncodedKeyComponent<StructuredData> processRowValsToUnsortedEncodedKeyCom
boolean reportParseExceptions
)
{
if (firstRow) {
constantValue = dimValues;
firstRow = false;
} else if (isConstant) {
isConstant = Objects.equals(dimValues, constantValue);
}
final long oldDictSizeInBytes = globalDictionary.sizeInBytes();
final int oldFieldKeySize = estimatedFieldKeySize;
final StructuredData data;
@@ -122,7 +141,12 @@ public EncodedKeyComponent<StructuredData> processRowValsToUnsortedEncodedKeyCom
} else {
data = new StructuredData(dimValues);
}
StructuredDataProcessor.ProcessResults info = indexerProcessor.processFields(data == null ? null : data.getValue());
final StructuredDataProcessor.ProcessResults info = indexerProcessor.processFields(
data == null ? null : data.getValue()
);
if (info.hasObjects()) {
hasNestedData = true;
}
// 'raw' data is currently preserved 'as-is', and not replaced with object references to the global dictionaries
long effectiveSizeBytes = info.getEstimatedSize();
// then, we add the delta of size change to the global dictionaries to account for any new space added by the
@@ -136,6 +160,12 @@ public EncodedKeyComponent<StructuredData> processRowValsToUnsortedEncodedKeyCom
public void setSparseIndexed()
{
this.hasNulls = true;
if (firstRow) {
firstRow = false;
} else if (constantValue != null) {
constantValue = null;
isConstant = false;
}
}

@Override
@@ -164,10 +194,6 @@ public SortedMap<String, FieldTypeInfo.MutableTypeSet> getFieldTypeInfo()
fields.put(entry.getKey(), entry.getValue().getTypes());
}
}
// special handling for when column only has arrays with null elements, treat it as a string array
if (fields.isEmpty() && fieldIndexers.size() == 1) {
fields.put(fieldIndexers.firstKey(), new FieldTypeInfo.MutableTypeSet().add(ColumnType.STRING_ARRAY));
}
return fields;
}

@@ -284,20 +310,43 @@ public ColumnCapabilities getColumnCapabilities()
.setHasNulls(hasNulls);
}

private ColumnType getLogicalType()
public ColumnType getLogicalType()
{
if (fieldIndexers.isEmpty()) {
if (hasNestedData) {
return ColumnType.NESTED_DATA;
}
if (isConstant && constantValue == null) {
// we didn't see anything, so we can be anything, so why not a string?
return ColumnType.STRING;
}
if (fieldIndexers.size() == 1 && fieldIndexers.containsKey(NestedPathFinder.JSON_PATH_ROOT)) {
FieldIndexer rootField = fieldIndexers.get(NestedPathFinder.JSON_PATH_ROOT);
ColumnType singleType = rootField.getTypes().getSingleType();
return singleType == null ? ColumnType.NESTED_DATA : singleType;
ColumnType logicalType = null;
for (ColumnType type : FieldTypeInfo.convertToSet(rootField.getTypes().getByteValue())) {
logicalType = ColumnType.leastRestrictiveType(logicalType, type);
}
if (logicalType != null) {
// special handle empty arrays
if (!rootField.getTypes().hasUntypedArray() || logicalType.isArray()) {
return logicalType;
}
return ColumnTypeFactory.getInstance().ofArray(logicalType);
}
}
return ColumnType.NESTED_DATA;
}

public boolean isConstant()
{
return isConstant;
}

@Nullable
public Object getConstantValue()
{
return constantValue;
}

@Override
public ColumnFormat getFormat()
{
@@ -517,26 +566,27 @@ private StructuredDataProcessor.ProcessedValue<?> processValue(ExprEval<?> eval)
eval.type()
);
}

final Object[] theArray = eval.asArray();
switch (columnType.getElementType().getType()) {
case LONG:
typeSet.add(ColumnType.LONG_ARRAY);
final Object[] longArray = eval.asArray();
sizeEstimate = valueDictionary.addLongArray(longArray);
return new StructuredDataProcessor.ProcessedValue<>(longArray, sizeEstimate);
sizeEstimate = valueDictionary.addLongArray(theArray);
return new StructuredDataProcessor.ProcessedValue<>(theArray, sizeEstimate);
case DOUBLE:
typeSet.add(ColumnType.DOUBLE_ARRAY);
final Object[] doubleArray = eval.asArray();
sizeEstimate = valueDictionary.addDoubleArray(doubleArray);
return new StructuredDataProcessor.ProcessedValue<>(doubleArray, sizeEstimate);
sizeEstimate = valueDictionary.addDoubleArray(theArray);
return new StructuredDataProcessor.ProcessedValue<>(theArray, sizeEstimate);
case STRING:
final Object[] stringArray = eval.asArray();
// empty arrays and arrays with all nulls are detected as string arrays, but dont count them as part of
// the type set
if (stringArray.length > 0 && !Arrays.stream(stringArray).allMatch(Objects::isNull)) {
// empty arrays and arrays with all nulls are detected as string arrays, but don't count them as part of
// the type set yet, we'll handle that later when serializing
if (theArray.length == 0 || Arrays.stream(theArray).allMatch(Objects::isNull)) {
typeSet.addUntypedArray();
} else {
typeSet.add(ColumnType.STRING_ARRAY);
}
sizeEstimate = valueDictionary.addStringArray(stringArray);
return new StructuredDataProcessor.ProcessedValue<>(stringArray, sizeEstimate);
sizeEstimate = valueDictionary.addStringArray(theArray);
return new StructuredDataProcessor.ProcessedValue<>(theArray, sizeEstimate);
default:
throw new IAE("Unhandled type: %s", columnType);
}
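
Two behaviors in the indexer changes above benefit from a worked example: a column that only ever saw empty objects is kept as NESTED_DATA via the new hasNestedData flag, while a column that mixes scalars with empty or all-null arrays has its folded scalar type promoted to the matching array type. The sketch below mirrors that promotion step, assuming the untyped-array flag was set by rows such as [] or [null]:

import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ColumnTypeFactory;

public class UntypedArrayPromotion
{
  public static void main(String[] args)
  {
    // suppose the root field saw the scalar 1.0 plus some empty arrays:
    // the folded logical type is DOUBLE, but the untyped-array flag is set
    ColumnType logicalType = ColumnType.DOUBLE;
    boolean hasUntypedArray = true; // set by rows such as [] or [null]

    // mirror of the new special case: a non-array logical type seen alongside
    // untyped arrays is promoted to ARRAY<logicalType>
    if (hasUntypedArray && !logicalType.isArray()) {
      logicalType = ColumnTypeFactory.getInstance().ofArray(logicalType);
    }
    System.out.println(logicalType); // expected: ARRAY<DOUBLE>
  }
}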
---
@@ -27,6 +27,7 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.column.ColumnDescriptor;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ColumnTypeFactory;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.FrontCodedIntArrayIndexedWriter;
import org.apache.druid.segment.data.Indexed;
@@ -82,7 +83,6 @@ public class AutoTypeColumnMerger implements DimensionMergerV9

private ColumnType logicalType;
private boolean isVariantType = false;
private boolean hasOnlyNulls = false;

public AutoTypeColumnMerger(
String name,
@@ -113,6 +113,11 @@ public void writeMergedValueDictionary(List<IndexableAdapter> adapters) throws I

final SortedMap<String, FieldTypeInfo.MutableTypeSet> mergedFields = new TreeMap<>();

boolean forceNested = false;
Object constantValue = null;
boolean hasArrays = false;
boolean isConstant = true;

for (int i = 0; i < adapters.size(); i++) {
final IndexableAdapter adapter = adapters.get(i);
final IndexableAdapter.NestedColumnMergable mergable = closer.register(
@@ -121,31 +126,47 @@ public void writeMergedValueDictionary(List<IndexableAdapter> adapters) throws I
if (mergable == null) {
continue;
}
forceNested = forceNested || mergable.isForceNestedType();
isConstant = isConstant && mergable.isConstant();
constantValue = mergable.getConstantValue();

final SortedValueDictionary dimValues = mergable.getValueDictionary();

boolean allNulls = dimValues == null || dimValues.allNull();
sortedLookup = dimValues;
if (!allNulls) {
sortedLookup = dimValues;
mergable.mergeFieldsInto(mergedFields);
sortedLookups[i] = dimValues.getSortedStrings();
sortedLongLookups[i] = dimValues.getSortedLongs();
sortedDoubleLookups[i] = dimValues.getSortedDoubles();
sortedArrayLookups[i] = dimValues.getSortedArrays();
hasArrays = sortedArrayLookups[i].size() > 0;
numMergeIndex++;
}
}

// no data, we don't need to write this column
if (numMergeIndex == 0 && mergedFields.size() == 0) {
hasOnlyNulls = true;
return;
}

// check to see if we can specialize the serializer after merging all the adapters
final FieldTypeInfo.MutableTypeSet rootTypes = mergedFields.get(NestedPathFinder.JSON_PATH_ROOT);
final boolean rootOnly = mergedFields.size() == 1 && rootTypes != null;
if (rootOnly && rootTypes.getSingleType() != null) {


// for backwards compat; remove this constant handling in druid 28 along with
// indexSpec.optimizeJsonConstantColumns in favor of always writing constant columns
// we also handle the numMergeIndex == 0 here, which also indicates that the column is a null constant
if (!forceNested && ((isConstant && constantValue == null) || numMergeIndex == 0)) {
logicalType = ColumnType.STRING;
serializer = new ScalarStringColumnSerializer(
name,
indexSpec,
segmentWriteOutMedium,
closer
);
} else if (!forceNested && rootOnly && rootTypes.getSingleType() != null) {
logicalType = rootTypes.getSingleType();
// empty arrays can be missed since they don't have a type, so handle them here
if (!logicalType.isArray() && hasArrays) {
logicalType = ColumnTypeFactory.getInstance().ofArray(logicalType);
}
switch (logicalType.getType()) {
case LONG:
serializer = new ScalarLongColumnSerializer(
@@ -187,13 +208,17 @@ public void writeMergedValueDictionary(List<IndexableAdapter> adapters) throws I
logicalType
);
}
} else if (rootOnly) {
} else if (!forceNested && rootOnly) {
// mixed type column, but only root path, we can use VariantArrayColumnSerializer
// pick the least restrictive type for the logical type
isVariantType = true;
for (ColumnType type : FieldTypeInfo.convertToSet(rootTypes.getByteValue())) {
logicalType = ColumnType.leastRestrictiveType(logicalType, type);
}
// empty arrays can be missed since they don't have a type, so handle them here
if (!logicalType.isArray() && hasArrays) {
logicalType = ColumnTypeFactory.getInstance().ofArray(logicalType);
}
serializer = new VariantColumnSerializer(
name,
rootTypes.getByteValue(),
@@ -307,7 +332,8 @@ public void writeIndexes(@Nullable List<IntBuffer> segmentRowNumConversions)
@Override
public boolean hasOnlyNulls()
{
return hasOnlyNulls;
// we handle this internally using a constant column instead of using the generic null part serde
return false;
}

@Override
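
Tying the merger changes back to the first bullet of the commit message: an auto column that stayed constant-null across every segment, or that contributed no merge indexes at all, is now written by this merger as a constant STRING column, so hasOnlyNulls() can unconditionally return false and auto columns never reach the generic 'null column' serde. A condensed restatement of that branch (simplified; the real method also constructs the ScalarStringColumnSerializer):

import org.apache.druid.segment.column.ColumnType;

public class ConstantNullSpecialization
{
  // simplified from the diff above; parameter names match the merger's locals
  static ColumnType logicalTypeForMerge(boolean forceNested, boolean isConstant, Object constantValue, int numMergeIndex)
  {
    if (!forceNested && ((isConstant && constantValue == null) || numMergeIndex == 0)) {
      // backwards-compatible constant-null handling: serialize as a scalar
      // STRING column instead of using the generic null part serde
      return ColumnType.STRING;
    }
    // otherwise the merger falls through to the specialized scalar, variant,
    // or nested serializer paths shown above
    return ColumnType.NESTED_DATA;
  }

  public static void main(String[] args)
  {
    System.out.println(logicalTypeForMerge(false, true, null, 0)); // STRING
  }
}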
[diff truncated: the remaining 26 of the 30 changed files are not shown]
