Skip to content

Commit

Permalink
Support nullHandling in FirstWithTime
Browse files Browse the repository at this point in the history
  • Loading branch information
gortiz committed Jan 8, 2024
1 parent 837ce9e commit 3ed7ed8
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 211 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,17 +221,19 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
DataType dataType = DataType.valueOf(dataTypeExp.getLiteral().getStringValue().toUpperCase());
switch (dataType) {
case BOOLEAN:
return new FirstIntValueWithTimeAggregationFunction(firstArgument, timeCol, true);
return new FirstIntValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled,
true);
case INT:
return new FirstIntValueWithTimeAggregationFunction(firstArgument, timeCol, false);
return new FirstIntValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled,
false);
case LONG:
return new FirstLongValueWithTimeAggregationFunction(firstArgument, timeCol);
return new FirstLongValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled);
case FLOAT:
return new FirstFloatValueWithTimeAggregationFunction(firstArgument, timeCol);
return new FirstFloatValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled);
case DOUBLE:
return new FirstDoubleValueWithTimeAggregationFunction(firstArgument, timeCol);
return new FirstDoubleValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled);
case STRING:
return new FirstStringValueWithTimeAggregationFunction(firstArgument, timeCol);
return new FirstStringValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled);
default:
throw new IllegalArgumentException("Unsupported data type for FIRST_WITH_TIME: " + dataType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.segment.local.customobject.DoubleLongPair;
import org.apache.pinot.segment.local.customobject.ValueLongPair;
import org.roaringbitmap.IntIterator;


/**
Expand All @@ -41,8 +41,9 @@
public class FirstDoubleValueWithTimeAggregationFunction extends FirstWithTimeAggregationFunction<Double> {
private final static ValueLongPair<Double> DEFAULT_VALUE_TIME_PAIR = new DoubleLongPair(Double.NaN, Long.MAX_VALUE);

public FirstDoubleValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol) {
super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE);
// Constructor for the Double variant: forwards the data and time expressions, the
// DOUBLE_LONG_PAIR_SER_DE serde and the nullHandlingEnabled flag to the
// FirstWithTimeAggregationFunction superclass constructor. No other state is set here.
public FirstDoubleValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol,
boolean nullHandlingEnabled) {
super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE, nullHandlingEnabled);
}

@Override
Expand All @@ -56,48 +57,42 @@ public ValueLongPair<Double> getDefaultValueTimePair() {
}

@Override
public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
BlockValSet blockValSet, BlockValSet timeValSet) {
ValueLongPair<Double> defaultValueLongPair = getDefaultValueTimePair();
Double firstData = defaultValueLongPair.getValue();
long firstTime = defaultValueLongPair.getTime();
double[] doubleValues = blockValSet.getDoubleValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
for (int i = 0; i < length; i++) {
double data = doubleValues[i];
long time = timeValues[i];
if (time <= firstTime) {
firstTime = time;
firstData = data;
}
}
setAggregationResult(aggregationResultHolder, firstData, firstTime);
public Double readCell(BlockValSet block, int docId) {
  // Pull the block's single-value double array, then box the entry stored at index docId.
  double[] cells = block.getDoubleValuesSV();
  double cell = cells[docId];
  return cell;
}

@Override
// Group-by aggregation where each row has exactly one group key (groupKeyArray[i]): every
// (value, time) pair read from the two value sets is handed to setGroupByResult for that key.
public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) {
double[] doubleValues = blockValSet.getDoubleValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
// NOTE(review): this text comes from a commit diff rendered without +/- markers. The plain
// loop below and the forEachNotNull block after it both call setGroupByResult for the same
// row indices; presumably this plain loop is the removed pre-change code - confirm against
// the real file before relying on it.
for (int i = 0; i < length; i++) {
double data = doubleValues[i];
long time = timeValues[i];
setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
}

// orNullIterator and forEachNotNull are defined outside this view. Presumably they merge the
// null positions of both columns and run the lambda on each [from, to) range that contains
// no null row - TODO confirm in FirstWithTimeAggregationFunction.
IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
forEachNotNull(length, nullIdxIterator, (from, to) -> {
for (int i = from; i < to; i++) {
double data = doubleValues[i];
long time = timeValues[i];
setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
}
});
}

@Override
// Group-by aggregation where a row can belong to several groups (groupKeysArray[i] is an
// array of keys): the row's (value, time) pair is handed to setGroupByResult once per key.
public void aggregateGroupResultWithRawDataMv(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) {
double[] doubleValues = blockValSet.getDoubleValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
// NOTE(review): this text comes from a commit diff rendered without +/- markers. The nested
// loop below is never closed before the orNullIterator line, so the braces do not balance as
// shown; presumably these five lines are the removed pre-change code - confirm against the
// real file.
for (int i = 0; i < length; i++) {
double value = doubleValues[i];
long time = timeValues[i];
for (int groupKey : groupKeysArray[i]) {
setGroupByResult(groupKey, groupByResultHolder, value, time);

// orNullIterator and forEachNotNull are defined outside this view. Presumably the lambda is
// invoked on each [from, to) range of rows where neither column is null - TODO confirm.
IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
forEachNotNull(length, nullIdxIterator, (from, to) -> {
for (int i = from; i < to; i++) {
double value = doubleValues[i];
long time = timeValues[i];
for (int groupKey : groupKeysArray[i]) {
setGroupByResult(groupKey, groupByResultHolder, value, time);
}
}
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.segment.local.customobject.FloatLongPair;
import org.apache.pinot.segment.local.customobject.ValueLongPair;
import org.roaringbitmap.IntIterator;


/**
Expand All @@ -41,8 +41,9 @@
public class FirstFloatValueWithTimeAggregationFunction extends FirstWithTimeAggregationFunction<Float> {
private final static ValueLongPair<Float> DEFAULT_VALUE_TIME_PAIR = new FloatLongPair(Float.NaN, Long.MAX_VALUE);

public FirstFloatValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol) {
super(dataCol, timeCol, ObjectSerDeUtils.FLOAT_LONG_PAIR_SER_DE);
// Constructor for the Float variant: forwards the data and time expressions, the
// FLOAT_LONG_PAIR_SER_DE serde and the nullHandlingEnabled flag to the
// FirstWithTimeAggregationFunction superclass constructor. No other state is set here.
public FirstFloatValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol,
boolean nullHandlingEnabled) {
super(dataCol, timeCol, ObjectSerDeUtils.FLOAT_LONG_PAIR_SER_DE, nullHandlingEnabled);
}

@Override
Expand All @@ -56,48 +57,42 @@ public ValueLongPair<Float> getDefaultValueTimePair() {
}

@Override
public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
BlockValSet blockValSet, BlockValSet timeValSet) {
ValueLongPair<Float> defaultValueLongPair = getDefaultValueTimePair();
Float firstData = defaultValueLongPair.getValue();
long firstTime = defaultValueLongPair.getTime();
float[] floatValues = blockValSet.getFloatValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
for (int i = 0; i < length; i++) {
float data = floatValues[i];
long time = timeValues[i];
if (time <= firstTime) {
firstTime = time;
firstData = data;
}
}
setAggregationResult(aggregationResultHolder, firstData, firstTime);
public Float readCell(BlockValSet block, int docId) {
  // Pull the block's single-value float array, then box the entry stored at index docId.
  float[] cells = block.getFloatValuesSV();
  float cell = cells[docId];
  return cell;
}

@Override
// Group-by aggregation where each row has exactly one group key (groupKeyArray[i]): every
// (value, time) pair read from the two value sets is handed to setGroupByResult for that key.
public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) {
float[] floatValues = blockValSet.getFloatValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
// NOTE(review): this text comes from a commit diff rendered without +/- markers. The plain
// loop below and the forEachNotNull block after it both call setGroupByResult for the same
// row indices; presumably this plain loop is the removed pre-change code - confirm against
// the real file before relying on it.
for (int i = 0; i < length; i++) {
float data = floatValues[i];
long time = timeValues[i];
setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
}

// orNullIterator and forEachNotNull are defined outside this view. Presumably they merge the
// null positions of both columns and run the lambda on each [from, to) range that contains
// no null row - TODO confirm in FirstWithTimeAggregationFunction.
IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
forEachNotNull(length, nullIdxIterator, (from, to) -> {
for (int i = from; i < to; i++) {
float data = floatValues[i];
long time = timeValues[i];
setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
}
});
}

@Override
// Group-by aggregation where a row can belong to several groups (groupKeysArray[i] is an
// array of keys): the row's (value, time) pair is handed to setGroupByResult once per key.
public void aggregateGroupResultWithRawDataMv(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) {
float[] floatValues = blockValSet.getFloatValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
// NOTE(review): this text comes from a commit diff rendered without +/- markers. The nested
// loop below is never closed before the orNullIterator line, so the braces do not balance as
// shown; presumably these five lines are the removed pre-change code - confirm against the
// real file.
for (int i = 0; i < length; i++) {
float value = floatValues[i];
long time = timeValues[i];
for (int groupKey : groupKeysArray[i]) {
setGroupByResult(groupKey, groupByResultHolder, value, time);

// orNullIterator and forEachNotNull are defined outside this view. Presumably the lambda is
// invoked on each [from, to) range of rows where neither column is null - TODO confirm.
IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
forEachNotNull(length, nullIdxIterator, (from, to) -> {
for (int i = from; i < to; i++) {
float value = floatValues[i];
long time = timeValues[i];
for (int groupKey : groupKeysArray[i]) {
setGroupByResult(groupKey, groupByResultHolder, value, time);
}
}
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.segment.local.customobject.IntLongPair;
import org.apache.pinot.segment.local.customobject.ValueLongPair;
import org.roaringbitmap.IntIterator;


/**
Expand All @@ -46,8 +46,8 @@ public class FirstIntValueWithTimeAggregationFunction extends FirstWithTimeAggre
private final boolean _isBoolean;

// Constructor for the Int variant: passes the data and time expressions and the
// INT_LONG_PAIR_SER_DE serde to the superclass constructor, then stores isBoolean in _isBoolean.
// NOTE(review): this text comes from a commit diff rendered without +/- markers, so two
// parameter-list tails and two super(...) calls appear back to back. Presumably the
// "boolean isBoolean) {" line and the three-argument super call are the removed pre-change
// lines, and the version taking nullHandlingEnabled is the current one - confirm against the
// real file.
public FirstIntValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol,
boolean isBoolean) {
super(dataCol, timeCol, ObjectSerDeUtils.INT_LONG_PAIR_SER_DE);
boolean nullHandlingEnabled, boolean isBoolean) {
super(dataCol, timeCol, ObjectSerDeUtils.INT_LONG_PAIR_SER_DE, nullHandlingEnabled);
_isBoolean = isBoolean;
}

Expand All @@ -62,48 +62,42 @@ public ValueLongPair<Integer> getDefaultValueTimePair() {
}

@Override
public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
BlockValSet blockValSet, BlockValSet timeValSet) {
ValueLongPair<Integer> defaultValueLongPair = getDefaultValueTimePair();
Integer firstData = defaultValueLongPair.getValue();
long firstTime = defaultValueLongPair.getTime();
int[] intValues = blockValSet.getIntValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
for (int i = 0; i < length; i++) {
int data = intValues[i];
long time = timeValues[i];
if (time <= firstTime) {
firstTime = time;
firstData = data;
}
}
setAggregationResult(aggregationResultHolder, firstData, firstTime);
public Integer readCell(BlockValSet block, int docId) {
  // Pull the block's single-value int array, then box the entry stored at index docId.
  int[] cells = block.getIntValuesSV();
  int cell = cells[docId];
  return cell;
}

@Override
// Group-by aggregation where each row has exactly one group key (groupKeyArray[i]): every
// (value, time) pair read from the two value sets is handed to setGroupByResult for that key.
public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) {
int[] intValues = blockValSet.getIntValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
// NOTE(review): this text comes from a commit diff rendered without +/- markers. The plain
// loop below and the forEachNotNull block after it both call setGroupByResult for the same
// row indices; presumably this plain loop is the removed pre-change code - confirm against
// the real file before relying on it.
for (int i = 0; i < length; i++) {
int data = intValues[i];
long time = timeValues[i];
setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
}

// orNullIterator and forEachNotNull are defined outside this view. Presumably they merge the
// null positions of both columns and run the lambda on each [from, to) range that contains
// no null row - TODO confirm in FirstWithTimeAggregationFunction.
IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
forEachNotNull(length, nullIdxIterator, (from, to) -> {
for (int i = from; i < to; i++) {
int data = intValues[i];
long time = timeValues[i];
setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
}
});
}

@Override
// Group-by aggregation where a row can belong to several groups (groupKeysArray[i] is an
// array of keys): the row's (value, time) pair is handed to setGroupByResult once per key.
public void aggregateGroupResultWithRawDataMv(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) {
int[] intValues = blockValSet.getIntValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
// NOTE(review): this text comes from a commit diff rendered without +/- markers. The nested
// loop below is never closed before the orNullIterator line, so the braces do not balance as
// shown; presumably these five lines are the removed pre-change code - confirm against the
// real file.
for (int i = 0; i < length; i++) {
int value = intValues[i];
long time = timeValues[i];
for (int groupKey : groupKeysArray[i]) {
setGroupByResult(groupKey, groupByResultHolder, value, time);

// orNullIterator and forEachNotNull are defined outside this view. Presumably the lambda is
// invoked on each [from, to) range of rows where neither column is null - TODO confirm.
IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
forEachNotNull(length, nullIdxIterator, (from, to) -> {
for (int i = from; i < to; i++) {
int value = intValues[i];
long time = timeValues[i];
for (int groupKey : groupKeysArray[i]) {
setGroupByResult(groupKey, groupByResultHolder, value, time);
}
}
}
});
}

@Override
Expand Down
Loading

0 comments on commit 3ed7ed8

Please sign in to comment.