Handle null values in Range Partition dimension distribution #11973

Merged · 4 commits · Nov 24, 2021
@@ -68,8 +68,9 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
{
public static final String TYPE = "partial_dimension_distribution";

// Future work: StringDistribution does not handle inserting NULLs. This is the same behavior as hadoop indexing.
private static final boolean SKIP_NULL = true;
// Do not skip nulls as StringDistribution can handle null values.
// This behavior is different from hadoop indexing.
private static final boolean SKIP_NULL = false;
Contributor
Can this be selectively turned on only when more than one dimension is being used? I don't know for certain what the impact of not skipping nulls will be, but that way the impact would be limited to the new range partitioning only. Alternatively, it could be controlled by a flag passed via the context. Thoughts?

Contributor Author
@kfaraz Nov 23, 2021
I think we should be fine without the flag.

The following effects would be observed on single dim:

  1. Partitioning will now also work on a dimension column that is always null, although it will create just one partition.
  2. Estimation of partition boundaries will also take null values into account, so the algorithm will do a better job of estimating the size of the first partition (it will be closer to the target rows). The sizes of later partitions are not affected, although the same data ingested before and after this change could produce different partition boundaries: the first boundary might shift, and the others would shift with it.

With the addition of (multi-dimension) range partitioning, single dim is inevitably affected, as it now goes through the multi-dim flow itself. So this would only be another part of that overall effect.
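The boundary-shifting effect described above can be illustrated with a toy model. This is hypothetical code, not part of this PR or of Druid: it just splits a pre-sorted list of dimension values (nulls sorting first) at the first partition boundary, with and without the old `SKIP_NULL` behavior.

```java
import java.util.Arrays;
import java.util.List;

// Toy model (not Druid code): shows how counting null rows moves the
// first partition boundary closer to the target partition size.
public class NullBoundaryExample
{
  // Returns the row index of the first partition boundary: the point at
  // which targetRows rows have been counted, optionally skipping nulls
  // the way the old SKIP_NULL = true behavior did.
  static int firstBoundary(List<String> sortedValues, int targetRows, boolean skipNulls)
  {
    int counted = 0;
    for (int i = 0; i < sortedValues.size(); i++) {
      if (skipNulls && sortedValues.get(i) == null) {
        continue;
      }
      counted++;
      if (counted == targetRows) {
        return i + 1;
      }
    }
    return sortedValues.size();
  }

  public static void main(String[] args)
  {
    // Nulls sort first in the distribution.
    List<String> rows = Arrays.asList(null, null, null, "a", "b", "c", "d", "e");
    // Skipping nulls: the first partition silently absorbs the 3 null rows.
    System.out.println(firstBoundary(rows, 3, true));  // 6
    // Counting nulls: the first partition matches the target of 3 rows.
    System.out.println(firstBoundary(rows, 3, false)); // 3
  }
}
```

With nulls skipped, the first partition ends up oversized (6 rows instead of the target 3); counting them keeps the estimate accurate, and all later boundaries shift accordingly.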


private final int numAttempts;
private final ParallelIndexIngestionSpec ingestionSchema;
@@ -276,9 +277,10 @@ private Map<Interval, StringDistribution> determineDistribution(
}
String[] values = new String[partitionDimensions.size()];
for (int i = 0; i < partitionDimensions.size(); ++i) {
values[i] = Iterables.getOnlyElement(
inputRow.getDimension(partitionDimensions.get(i))
);
List<String> dimensionValues = inputRow.getDimension(partitionDimensions.get(i));
if (dimensionValues != null && !dimensionValues.isEmpty()) {
values[i] = Iterables.getOnlyElement(dimensionValues);
}
}
final StringTuple partitionDimensionValues = StringTuple.create(values);
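The extraction change in this hunk can be sketched standalone. This is hypothetical plain-Java code (no Druid or Guava classes, with a `Map` standing in for `InputRow`): a missing or empty value list now leaves the corresponding tuple slot null, where the old `Iterables.getOnlyElement` call would throw on an empty list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the null-safe dimension-value extraction above.
public class NullSafeTupleExample
{
  static String[] extract(Map<String, List<String>> row, List<String> partitionDimensions)
  {
    String[] values = new String[partitionDimensions.size()];
    for (int i = 0; i < partitionDimensions.size(); ++i) {
      List<String> dimensionValues = row.get(partitionDimensions.get(i));
      // A missing dimension leaves the slot null instead of throwing.
      if (dimensionValues != null && !dimensionValues.isEmpty()) {
        values[i] = dimensionValues.get(0);
      }
    }
    return values;
  }

  public static void main(String[] args)
  {
    Map<String, List<String>> row = Map.of("country", List.of("IN"));
    System.out.println(Arrays.toString(extract(row, List.of("country", "city"))));
    // [IN, null]
  }
}
```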

@@ -33,7 +33,7 @@
*/
public class ArrayOfStringTuplesSerDe extends ArrayOfItemsSerDe<StringTuple>
{
private static final ArrayOfStringsSerDe STRINGS_SERDE = new ArrayOfStringsSerDe();
private static final ArrayOfStringsNullSafeSerde STRINGS_SERDE = new ArrayOfStringsNullSafeSerde();

@Override
public byte[] serializeToByteArray(StringTuple[] items)
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common.task.batch.parallel.distribution;

import org.apache.datasketches.ArrayOfItemsSerDe;
import org.apache.datasketches.ArrayOfStringsSerDe;
import org.apache.datasketches.Util;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.data.input.StringTuple;

import java.nio.charset.StandardCharsets;

/**
* Serde for {@link StringTuple}.
* <p>
* The implementation is the same as {@link ArrayOfStringsSerDe}, except this
* class handles null String values as well.
*/
public class ArrayOfStringsNullSafeSerde extends ArrayOfItemsSerDe<String>
{

@Override
public byte[] serializeToByteArray(final String[] items)
{
int length = 0;
final byte[][] itemsBytes = new byte[items.length][];
for (int i = 0; i < items.length; i++) {
// If the String is null, make the byte array also null
itemsBytes[i] = items[i] == null ? null : items[i].getBytes(StandardCharsets.UTF_8);
length += (itemsBytes[i] == null ? 0 : itemsBytes[i].length) + Integer.BYTES;
}
final byte[] bytes = new byte[length];
final WritableMemory mem = WritableMemory.writableWrap(bytes);
long offsetBytes = 0;
for (int i = 0; i < items.length; i++) {
if (itemsBytes[i] != null) {
// Write the length of the array and the array itself
mem.putInt(offsetBytes, itemsBytes[i].length);
offsetBytes += Integer.BYTES;
mem.putByteArray(offsetBytes, itemsBytes[i], 0, itemsBytes[i].length);
offsetBytes += itemsBytes[i].length;
} else {
// If the byte array is null, write the length as -1
mem.putInt(offsetBytes, -1);
offsetBytes += Integer.BYTES;
}
}
return bytes;
}

@Override
public String[] deserializeFromMemory(final Memory mem, final int numItems)
{
final String[] array = new String[numItems];
long offsetBytes = 0;
for (int i = 0; i < numItems; i++) {
// Read the length of the byte array
Util.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity());
final int arrayLength = mem.getInt(offsetBytes);
offsetBytes += Integer.BYTES;

// Negative arrayLength represents a null byte array and a null String
if (arrayLength >= 0) {
final byte[] bytes = new byte[arrayLength];
Util.checkBounds(offsetBytes, arrayLength, mem.getCapacity());
mem.getByteArray(offsetBytes, bytes, 0, arrayLength);
offsetBytes += arrayLength;
array[i] = new String(bytes, StandardCharsets.UTF_8);
}
}
return array;
}

}
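The length-prefixed, null-safe layout used by `ArrayOfStringsNullSafeSerde` can be demonstrated without the DataSketches dependency. The class below is a hypothetical standalone sketch of the same encoding against plain `java.nio.ByteBuffer` instead of `WritableMemory`/`Memory`:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the serde's encoding: each item is an int length
// prefix followed by UTF-8 bytes; a null String is encoded as length -1.
public class NullSafeStringCodec
{
  public static byte[] serialize(String[] items)
  {
    List<byte[]> encoded = new ArrayList<>();
    int length = 0;
    for (String item : items) {
      byte[] b = item == null ? null : item.getBytes(StandardCharsets.UTF_8);
      encoded.add(b);
      length += Integer.BYTES + (b == null ? 0 : b.length);
    }
    ByteBuffer buf = ByteBuffer.allocate(length);
    for (byte[] b : encoded) {
      if (b == null) {
        buf.putInt(-1);          // -1 length marks a null String
      } else {
        buf.putInt(b.length);    // length prefix, then the UTF-8 bytes
        buf.put(b);
      }
    }
    return buf.array();
  }

  public static String[] deserialize(byte[] bytes, int numItems)
  {
    ByteBuffer buf = ByteBuffer.wrap(bytes);
    String[] out = new String[numItems];
    for (int i = 0; i < numItems; i++) {
      int len = buf.getInt();
      if (len >= 0) {            // negative length: leave the slot null
        byte[] b = new byte[len];
        buf.get(b);
        out[i] = new String(b, StandardCharsets.UTF_8);
      }
    }
    return out;
  }

  public static void main(String[] args)
  {
    String[] input = {"abc", null, ""};
    String[] roundTrip = deserialize(serialize(input), input.length);
    System.out.println(java.util.Arrays.toString(roundTrip)); // [abc, null, ]
  }
}
```

Note that the empty string round-trips as length 0, which keeps it distinguishable from null (length -1) — the distinction the original `ArrayOfStringsSerDe` could not make.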

@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common.task.batch.parallel.distribution;

import org.apache.datasketches.memory.Memory;
import org.junit.Assert;
import org.junit.Test;

public class ArrayOfStringsNullSafeSerdeTest
{

private final ArrayOfStringsNullSafeSerde serde = new ArrayOfStringsNullSafeSerde();

@Test
public void testStringArray()
{
testSerde("abc", "def", "xyz");
testSerde("abc", "123", "456.0");
}

@Test
public void testSingletonArray()
{
testSerde("abc");
testSerde("xyz");
}

@Test
public void testEmptyArray()
{
testSerde();
}

@Test
public void testArrayWithNullString()
{
testSerde((String) null);
testSerde("abc", null, "def");
testSerde(null, null, null);
}

@Test
public void testArrayWithEmptyString()
{
testSerde("");
testSerde("abc", "def", "");
testSerde("", "", "");
}

private void testSerde(String... inputArray)
{
byte[] bytes = serde.serializeToByteArray(inputArray);
String[] deserialized = serde.deserializeFromMemory(Memory.wrap(bytes), inputArray.length);
Assert.assertArrayEquals(inputArray, deserialized);
}

}