Skip to content

Commit

Permalink
API: Handle NaN lower or upper bound in stats evaluators (#2069)
Browse files Browse the repository at this point in the history
  • Loading branch information
yyanyy authored Jan 20, 2021
1 parent c75ac35 commit b018e21
Show file tree
Hide file tree
Showing 3 changed files with 473 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.BinaryUtil;
import org.apache.iceberg.util.NaNUtil;

import static org.apache.iceberg.expressions.Expressions.rewriteNot;

Expand All @@ -44,6 +45,11 @@
* Files are passed to {@link #eval(ContentFile)}, which returns true if the file may contain matching
* rows and false if the file cannot contain matching rows. Files may be skipped if and only if the
* return value of {@code eval} is false.
* <p>
* Due to the comparison implementation of ORC stats, for float/double columns in ORC files, if the first
* value in a file is NaN, metrics of this file will report NaN for both upper and lower bound even though
* the column could contain non-NaN data. Thus in some scenarios explicit checks for NaN are necessary
* in order to not skip files that may contain matching data.
*/
public class InclusiveMetricsEvaluator {
private static final int IN_PREDICATE_LIMIT = 200;
Expand Down Expand Up @@ -184,13 +190,18 @@ public <T> Boolean notNaN(BoundReference<T> ref) {
public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
Integer id = ref.fieldId();

if (containsNullsOnly(id)) {
if (containsNullsOnly(id) || containsNaNsOnly(id)) {
return ROWS_CANNOT_MATCH;
}

if (lowerBounds != null && lowerBounds.containsKey(id)) {
T lower = Conversions.fromByteBuffer(ref.type(), lowerBounds.get(id));

if (NaNUtil.isNaN(lower)) {
// NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
return ROWS_MIGHT_MATCH;
}

int cmp = lit.comparator().compare(lower, lit.value());
if (cmp >= 0) {
return ROWS_CANNOT_MATCH;
Expand All @@ -204,13 +215,18 @@ public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
Integer id = ref.fieldId();

if (containsNullsOnly(id)) {
if (containsNullsOnly(id) || containsNaNsOnly(id)) {
return ROWS_CANNOT_MATCH;
}

if (lowerBounds != null && lowerBounds.containsKey(id)) {
T lower = Conversions.fromByteBuffer(ref.type(), lowerBounds.get(id));

if (NaNUtil.isNaN(lower)) {
// NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
return ROWS_MIGHT_MATCH;
}

int cmp = lit.comparator().compare(lower, lit.value());
if (cmp > 0) {
return ROWS_CANNOT_MATCH;
Expand All @@ -224,7 +240,7 @@ public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
Integer id = ref.fieldId();

if (containsNullsOnly(id)) {
if (containsNullsOnly(id) || containsNaNsOnly(id)) {
return ROWS_CANNOT_MATCH;
}

Expand All @@ -244,7 +260,7 @@ public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
Integer id = ref.fieldId();

if (containsNullsOnly(id)) {
if (containsNullsOnly(id) || containsNaNsOnly(id)) {
return ROWS_CANNOT_MATCH;
}

Expand All @@ -264,13 +280,18 @@ public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
Integer id = ref.fieldId();

if (containsNullsOnly(id)) {
if (containsNullsOnly(id) || containsNaNsOnly(id)) {
return ROWS_CANNOT_MATCH;
}

if (lowerBounds != null && lowerBounds.containsKey(id)) {
T lower = Conversions.fromByteBuffer(ref.type(), lowerBounds.get(id));

if (NaNUtil.isNaN(lower)) {
// NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
return ROWS_MIGHT_MATCH;
}

int cmp = lit.comparator().compare(lower, lit.value());
if (cmp > 0) {
return ROWS_CANNOT_MATCH;
Expand Down Expand Up @@ -300,7 +321,7 @@ public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
Integer id = ref.fieldId();

if (containsNullsOnly(id)) {
if (containsNullsOnly(id) || containsNaNsOnly(id)) {
return ROWS_CANNOT_MATCH;
}

Expand All @@ -313,6 +334,12 @@ public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {

if (lowerBounds != null && lowerBounds.containsKey(id)) {
T lower = Conversions.fromByteBuffer(ref.type(), lowerBounds.get(id));

if (NaNUtil.isNaN(lower)) {
// NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
return ROWS_MIGHT_MATCH;
}

literals = literals.stream().filter(v -> ref.comparator().compare(lower, v) <= 0).collect(Collectors.toList());
if (literals.isEmpty()) { // if all values are less than lower bound, rows cannot match.
return ROWS_CANNOT_MATCH;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.NaNUtil;

import static org.apache.iceberg.expressions.Expressions.rewriteNot;

Expand All @@ -44,6 +45,11 @@
* <p>
* Files are passed to {@link #eval(ContentFile)}, which returns true if all rows in the file must
* contain matching rows and false if the file may contain rows that do not match.
* <p>
* Due to the comparison implementation of ORC stats, for float/double columns in ORC files, if the first
* value in a file is NaN, metrics of this file will report NaN for both upper and lower bound even though
* the column could contain non-NaN data. Thus in some scenarios explicit checks for NaN are necessary
* in order to not include files that may contain rows that don't match.
*/
public class StrictMetricsEvaluator {
private final Schema schema;
Expand Down Expand Up @@ -179,7 +185,7 @@ public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
Types.NestedField field = struct.field(id);
Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", schema.findField(id));

if (canContainNulls(id)) {
if (canContainNulls(id) || canContainNaNs(id)) {
return ROWS_MIGHT_NOT_MATCH;
}

Expand All @@ -202,7 +208,7 @@ public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
Types.NestedField field = struct.field(id);
Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", schema.findField(id));

if (canContainNulls(id)) {
if (canContainNulls(id) || canContainNaNs(id)) {
return ROWS_MIGHT_NOT_MATCH;
}

Expand All @@ -225,13 +231,18 @@ public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
Types.NestedField field = struct.field(id);
Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", schema.findField(id));

if (canContainNulls(id)) {
if (canContainNulls(id) || canContainNaNs(id)) {
return ROWS_MIGHT_NOT_MATCH;
}

if (lowerBounds != null && lowerBounds.containsKey(id)) {
T lower = Conversions.fromByteBuffer(field.type(), lowerBounds.get(id));

if (NaNUtil.isNaN(lower)) {
// NaN indicates unreliable bounds. See the StrictMetricsEvaluator docs for more.
return ROWS_MIGHT_NOT_MATCH;
}

int cmp = lit.comparator().compare(lower, lit.value());
if (cmp > 0) {
return ROWS_MUST_MATCH;
Expand All @@ -248,13 +259,18 @@ public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
Types.NestedField field = struct.field(id);
Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", schema.findField(id));

if (canContainNulls(id)) {
if (canContainNulls(id) || canContainNaNs(id)) {
return ROWS_MIGHT_NOT_MATCH;
}

if (lowerBounds != null && lowerBounds.containsKey(id)) {
T lower = Conversions.fromByteBuffer(field.type(), lowerBounds.get(id));

if (NaNUtil.isNaN(lower)) {
// NaN indicates unreliable bounds. See the StrictMetricsEvaluator docs for more.
return ROWS_MIGHT_NOT_MATCH;
}

int cmp = lit.comparator().compare(lower, lit.value());
if (cmp >= 0) {
return ROWS_MUST_MATCH;
Expand All @@ -271,7 +287,7 @@ public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
Types.NestedField field = struct.field(id);
Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", schema.findField(id));

if (canContainNulls(id)) {
if (canContainNulls(id) || canContainNaNs(id)) {
return ROWS_MIGHT_NOT_MATCH;
}

Expand Down Expand Up @@ -304,13 +320,18 @@ public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
Types.NestedField field = struct.field(id);
Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", schema.findField(id));

if (containsNullsOnly(id)) {
if (containsNullsOnly(id) || containsNaNsOnly(id)) {
return ROWS_MUST_MATCH;
}

if (lowerBounds != null && lowerBounds.containsKey(id)) {
T lower = Conversions.fromByteBuffer(struct.field(id).type(), lowerBounds.get(id));

if (NaNUtil.isNaN(lower)) {
// NaN indicates unreliable bounds. See the StrictMetricsEvaluator docs for more.
return ROWS_MIGHT_NOT_MATCH;
}

int cmp = lit.comparator().compare(lower, lit.value());
if (cmp > 0) {
return ROWS_MUST_MATCH;
Expand All @@ -335,7 +356,7 @@ public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
Types.NestedField field = struct.field(id);
Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", schema.findField(id));

if (canContainNulls(id)) {
if (canContainNulls(id) || canContainNaNs(id)) {
return ROWS_MIGHT_NOT_MATCH;
}

Expand Down Expand Up @@ -371,7 +392,7 @@ public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
Types.NestedField field = struct.field(id);
Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", schema.findField(id));

if (containsNullsOnly(id)) {
if (containsNullsOnly(id) || containsNaNsOnly(id)) {
return ROWS_MUST_MATCH;
}

Expand All @@ -380,6 +401,11 @@ public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
if (lowerBounds != null && lowerBounds.containsKey(id)) {
T lower = Conversions.fromByteBuffer(struct.field(id).type(), lowerBounds.get(id));

if (NaNUtil.isNaN(lower)) {
// NaN indicates unreliable bounds. See the StrictMetricsEvaluator docs for more.
return ROWS_MIGHT_NOT_MATCH;
}

literals = literals.stream().filter(v -> ref.comparator().compare(lower, v) <= 0).collect(Collectors.toList());
if (literals.isEmpty()) { // if all values are less than lower bound, rows must match (notIn).
return ROWS_MUST_MATCH;
Expand All @@ -406,6 +432,11 @@ private boolean canContainNulls(Integer id) {
return nullCounts == null || (nullCounts.containsKey(id) && nullCounts.get(id) > 0);
}

/**
 * Reports whether the NaN-count metrics record at least one NaN for the given field.
 *
 * @param id field id to look up in the NaN-count map
 * @return true only when a NaN count is present for {@code id} and is greater than zero
 */
private boolean canContainNaNs(Integer id) {
// Older writers may not populate NaN counters at all, so a missing map or a missing
// entry is treated as "no NaN recorded" rather than "might contain NaN".
if (nanCounts == null || !nanCounts.containsKey(id)) {
return false;
}

return nanCounts.get(id) > 0;
}

private boolean containsNullsOnly(Integer id) {
return valueCounts != null && valueCounts.containsKey(id) &&
nullCounts != null && nullCounts.containsKey(id) &&
Expand Down
Loading

0 comments on commit b018e21

Please sign in to comment.