Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First with time #12235

Merged
merged 29 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
7488f97
new test framework candidate
gortiz Jan 2, 2024
e634ea7
Improved test system
gortiz Jan 3, 2024
a132466
Improve framework to be able to specify segments as strings
gortiz Jan 3, 2024
67f24fd
fix headers
gortiz Jan 4, 2024
001293f
Improve assertions when there are nulls
gortiz Jan 8, 2024
ff6f1ff
Improve error text
gortiz Jan 8, 2024
b0380c5
Improvements in the framework
gortiz Jan 8, 2024
8084db5
Add a base class single input aggregation operations can extend to su…
gortiz Jan 8, 2024
965497f
Create a test for Mode
gortiz Jan 5, 2024
9335d98
Create a base to support null handling in more aggregations functions
gortiz Jan 5, 2024
837ce9e
Add test with current behavior
gortiz Jan 8, 2024
3ed7ed8
Support nullHandling in FirstWithTime
gortiz Jan 8, 2024
00e08ac
Add last_with_time as well
gortiz Jan 8, 2024
80abaf2
Add headers
gortiz Jan 9, 2024
0147343
Add a base class single input aggregation operations can extend to su…
gortiz Jan 8, 2024
b0562d6
Fix issue in NullableSingleInputAggregationFunction.forEachNotNullInt
gortiz Jan 9, 2024
9d9cdf1
Improve error message in NullEnabledQueriesTest
gortiz Jan 9, 2024
a421ab4
Add new schema family
gortiz Jan 9, 2024
0dd672e
Rename test schemas and table config
gortiz Jan 9, 2024
985ad03
Split AllNullQueriesTest into on test per query
gortiz Jan 10, 2024
365839a
Fix error in last with time
gortiz Jan 10, 2024
a82f663
Revert change in AllNullQueriesTest that belongs to mode-null-support…
gortiz Jan 10, 2024
d40db25
Merge branch 'null-aggr' into first-with-time
gortiz Jan 10, 2024
1eab299
Merge remote-tracking branch 'origin/master' into null-aggr
gortiz Jan 25, 2024
a8ee44c
Merge branch 'null-aggr' into first-with-time
gortiz Jan 25, 2024
3b17d9f
Merge remote-tracking branch 'origin/master' into null-aggr
gortiz Jan 29, 2024
dc8e65c
Merge branch 'null-aggr' into first-with-time
gortiz Jan 29, 2024
8d8b8b7
Merge branch 'master' into first-with-time
gortiz Mar 18, 2024
02a6464
Adapted to new framework
gortiz Mar 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -221,17 +221,19 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
DataType dataType = DataType.valueOf(dataTypeExp.getLiteral().getStringValue().toUpperCase());
switch (dataType) {
case BOOLEAN:
return new FirstIntValueWithTimeAggregationFunction(firstArgument, timeCol, true);
return new FirstIntValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled,
true);
case INT:
return new FirstIntValueWithTimeAggregationFunction(firstArgument, timeCol, false);
return new FirstIntValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled,
false);
case LONG:
return new FirstLongValueWithTimeAggregationFunction(firstArgument, timeCol);
return new FirstLongValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled);
case FLOAT:
return new FirstFloatValueWithTimeAggregationFunction(firstArgument, timeCol);
return new FirstFloatValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled);
case DOUBLE:
return new FirstDoubleValueWithTimeAggregationFunction(firstArgument, timeCol);
return new FirstDoubleValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled);
case STRING:
return new FirstStringValueWithTimeAggregationFunction(firstArgument, timeCol);
return new FirstStringValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled);
default:
throw new IllegalArgumentException("Unsupported data type for FIRST_WITH_TIME: " + dataType);
}
Expand Down Expand Up @@ -300,17 +302,17 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
DataType dataType = DataType.valueOf(dataTypeExp.getLiteral().getStringValue().toUpperCase());
switch (dataType) {
case BOOLEAN:
return new LastIntValueWithTimeAggregationFunction(firstArgument, timeCol, true);
return new LastIntValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled, true);
case INT:
return new LastIntValueWithTimeAggregationFunction(firstArgument, timeCol, false);
return new LastIntValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled, false);
case LONG:
return new LastLongValueWithTimeAggregationFunction(firstArgument, timeCol);
return new LastLongValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled);
case FLOAT:
return new LastFloatValueWithTimeAggregationFunction(firstArgument, timeCol);
return new LastFloatValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled);
case DOUBLE:
return new LastDoubleValueWithTimeAggregationFunction(firstArgument, timeCol);
return new LastDoubleValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled);
case STRING:
return new LastStringValueWithTimeAggregationFunction(firstArgument, timeCol);
return new LastStringValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled);
default:
throw new IllegalArgumentException("Unsupported data type for LAST_WITH_TIME: " + dataType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.segment.local.customobject.DoubleLongPair;
import org.apache.pinot.segment.local.customobject.ValueLongPair;
import org.roaringbitmap.IntIterator;


/**
Expand All @@ -41,8 +41,9 @@
public class FirstDoubleValueWithTimeAggregationFunction extends FirstWithTimeAggregationFunction<Double> {
private final static ValueLongPair<Double> DEFAULT_VALUE_TIME_PAIR = new DoubleLongPair(Double.NaN, Long.MAX_VALUE);

public FirstDoubleValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol) {
super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE);
public FirstDoubleValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol,
boolean nullHandlingEnabled) {
super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE, nullHandlingEnabled);
}

@Override
Expand All @@ -56,48 +57,42 @@ public ValueLongPair<Double> getDefaultValueTimePair() {
}

@Override
public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
BlockValSet blockValSet, BlockValSet timeValSet) {
ValueLongPair<Double> defaultValueLongPair = getDefaultValueTimePair();
Double firstData = defaultValueLongPair.getValue();
long firstTime = defaultValueLongPair.getTime();
double[] doubleValues = blockValSet.getDoubleValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
for (int i = 0; i < length; i++) {
double data = doubleValues[i];
long time = timeValues[i];
if (time <= firstTime) {
firstTime = time;
firstData = data;
}
}
setAggregationResult(aggregationResultHolder, firstData, firstTime);
public Double readCell(BlockValSet block, int docId) {
return block.getDoubleValuesSV()[docId];
}

@Override
public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) {
double[] doubleValues = blockValSet.getDoubleValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
for (int i = 0; i < length; i++) {
double data = doubleValues[i];
long time = timeValues[i];
setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
}

IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
forEachNotNull(length, nullIdxIterator, (from, to) -> {
for (int i = from; i < to; i++) {
double data = doubleValues[i];
long time = timeValues[i];
setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
}
});
}

@Override
public void aggregateGroupResultWithRawDataMv(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) {
double[] doubleValues = blockValSet.getDoubleValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
for (int i = 0; i < length; i++) {
double value = doubleValues[i];
long time = timeValues[i];
for (int groupKey : groupKeysArray[i]) {
setGroupByResult(groupKey, groupByResultHolder, value, time);

IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
forEachNotNull(length, nullIdxIterator, (from, to) -> {
for (int i = from; i < to; i++) {
double value = doubleValues[i];
long time = timeValues[i];
for (int groupKey : groupKeysArray[i]) {
setGroupByResult(groupKey, groupByResultHolder, value, time);
}
}
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.segment.local.customobject.FloatLongPair;
import org.apache.pinot.segment.local.customobject.ValueLongPair;
import org.roaringbitmap.IntIterator;


/**
Expand All @@ -41,8 +41,9 @@
public class FirstFloatValueWithTimeAggregationFunction extends FirstWithTimeAggregationFunction<Float> {
private final static ValueLongPair<Float> DEFAULT_VALUE_TIME_PAIR = new FloatLongPair(Float.NaN, Long.MAX_VALUE);

public FirstFloatValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol) {
super(dataCol, timeCol, ObjectSerDeUtils.FLOAT_LONG_PAIR_SER_DE);
public FirstFloatValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol,
boolean nullHandlingEnabled) {
super(dataCol, timeCol, ObjectSerDeUtils.FLOAT_LONG_PAIR_SER_DE, nullHandlingEnabled);
}

@Override
Expand All @@ -56,48 +57,42 @@ public ValueLongPair<Float> getDefaultValueTimePair() {
}

@Override
public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
BlockValSet blockValSet, BlockValSet timeValSet) {
ValueLongPair<Float> defaultValueLongPair = getDefaultValueTimePair();
Float firstData = defaultValueLongPair.getValue();
long firstTime = defaultValueLongPair.getTime();
float[] floatValues = blockValSet.getFloatValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
for (int i = 0; i < length; i++) {
float data = floatValues[i];
long time = timeValues[i];
if (time <= firstTime) {
firstTime = time;
firstData = data;
}
}
setAggregationResult(aggregationResultHolder, firstData, firstTime);
public Float readCell(BlockValSet block, int docId) {
return block.getFloatValuesSV()[docId];
}

@Override
public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) {
float[] floatValues = blockValSet.getFloatValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
for (int i = 0; i < length; i++) {
float data = floatValues[i];
long time = timeValues[i];
setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
}

IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
forEachNotNull(length, nullIdxIterator, (from, to) -> {
for (int i = from; i < to; i++) {
float data = floatValues[i];
long time = timeValues[i];
setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
}
});
}

@Override
public void aggregateGroupResultWithRawDataMv(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) {
float[] floatValues = blockValSet.getFloatValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
for (int i = 0; i < length; i++) {
float value = floatValues[i];
long time = timeValues[i];
for (int groupKey : groupKeysArray[i]) {
setGroupByResult(groupKey, groupByResultHolder, value, time);

IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
forEachNotNull(length, nullIdxIterator, (from, to) -> {
for (int i = from; i < to; i++) {
float value = floatValues[i];
long time = timeValues[i];
for (int groupKey : groupKeysArray[i]) {
setGroupByResult(groupKey, groupByResultHolder, value, time);
}
}
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.segment.local.customobject.IntLongPair;
import org.apache.pinot.segment.local.customobject.ValueLongPair;
import org.roaringbitmap.IntIterator;


/**
Expand All @@ -46,8 +46,8 @@ public class FirstIntValueWithTimeAggregationFunction extends FirstWithTimeAggre
private final boolean _isBoolean;

public FirstIntValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol,
boolean isBoolean) {
super(dataCol, timeCol, ObjectSerDeUtils.INT_LONG_PAIR_SER_DE);
boolean nullHandlingEnabled, boolean isBoolean) {
super(dataCol, timeCol, ObjectSerDeUtils.INT_LONG_PAIR_SER_DE, nullHandlingEnabled);
_isBoolean = isBoolean;
}

Expand All @@ -62,48 +62,42 @@ public ValueLongPair<Integer> getDefaultValueTimePair() {
}

@Override
public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
BlockValSet blockValSet, BlockValSet timeValSet) {
ValueLongPair<Integer> defaultValueLongPair = getDefaultValueTimePair();
Integer firstData = defaultValueLongPair.getValue();
long firstTime = defaultValueLongPair.getTime();
int[] intValues = blockValSet.getIntValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
for (int i = 0; i < length; i++) {
int data = intValues[i];
long time = timeValues[i];
if (time <= firstTime) {
firstTime = time;
firstData = data;
}
}
setAggregationResult(aggregationResultHolder, firstData, firstTime);
public Integer readCell(BlockValSet block, int docId) {
return block.getIntValuesSV()[docId];
}

@Override
public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) {
int[] intValues = blockValSet.getIntValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
for (int i = 0; i < length; i++) {
int data = intValues[i];
long time = timeValues[i];
setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
}

IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
forEachNotNull(length, nullIdxIterator, (from, to) -> {
for (int i = from; i < to; i++) {
int data = intValues[i];
long time = timeValues[i];
setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
}
});
}

@Override
public void aggregateGroupResultWithRawDataMv(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) {
int[] intValues = blockValSet.getIntValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
for (int i = 0; i < length; i++) {
int value = intValues[i];
long time = timeValues[i];
for (int groupKey : groupKeysArray[i]) {
setGroupByResult(groupKey, groupByResultHolder, value, time);

IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
forEachNotNull(length, nullIdxIterator, (from, to) -> {
for (int i = from; i < to; i++) {
int value = intValues[i];
long time = timeValues[i];
for (int groupKey : groupKeysArray[i]) {
setGroupByResult(groupKey, groupByResultHolder, value, time);
}
}
}
});
}

@Override
Expand Down
Loading
Loading