Skip to content

Commit

Permalink
Add range filtering support for iceberg ingestion (apache#15782)
Browse files Browse the repository at this point in the history
* Add range filtering support for iceberg ingestion

* Docs formatting

* Spelling
  • Loading branch information
a2l007 authored and abhishekrb19 committed Feb 2, 2024
1 parent d27c38b commit 085f76d
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 5 deletions.
11 changes: 11 additions & 0 deletions docs/ingestion/input-sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,17 @@ This input source provides the following filters: `and`, `equals`, `interval`, a
|type|Set this value to `not`.|yes|
|filter|The iceberg filter on which logical NOT is applied|yes|

`range` Filter:

|Property|Description|Default|Required|
|--------|-----------|-------|--------|
|type|Set this value to `range`.|None|yes|
|filterColumn|The column name from the iceberg table schema based on which range filtering needs to happen.|None|yes|
|lower|Lower bound value to match.|None|no. At least one of `lower` or `upper` must not be null.|
|upper|Upper bound value to match. |None|no. At least one of `lower` or `upper` must not be null.|
|lowerOpen|Boolean indicating if lower bound is open in the interval of values defined by the range (">" instead of ">="). |false|no|
|upperOpen|Boolean indicating if upper bound is open on the interval of values defined by range ("<" instead of "<="). |false|no|

## Delta Lake input source

:::info
Expand Down
4 changes: 0 additions & 4 deletions extensions-contrib/druid-iceberg-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,6 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.re2j</groupId>
<artifactId>re2j</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
@JsonSubTypes.Type(name = "equals", value = IcebergEqualsFilter.class),
@JsonSubTypes.Type(name = "and", value = IcebergAndFilter.class),
@JsonSubTypes.Type(name = "not", value = IcebergNotFilter.class),
@JsonSubTypes.Type(name = "or", value = IcebergOrFilter.class)
@JsonSubTypes.Type(name = "or", value = IcebergOrFilter.class),
@JsonSubTypes.Type(name = "range", value = IcebergRangeFilter.class)
})
public interface IcebergFilter
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.iceberg.filter;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;

public class IcebergRangeFilter implements IcebergFilter
{
@JsonProperty
private final String filterColumn;
@JsonProperty
private final Boolean lowerOpen;
@JsonProperty
private final Boolean upperOpen;
@JsonProperty
private final Object lower;
@JsonProperty
private final Object upper;

@JsonCreator
public IcebergRangeFilter(
@JsonProperty("filterColumn") String filterColumn,
@JsonProperty("lower") @Nullable Object lower,
@JsonProperty("upper") @Nullable Object upper,
@JsonProperty("lowerOpen") @Nullable Boolean lowerOpen,
@JsonProperty("upperOpen") @Nullable Boolean upperOpen
)
{
Preconditions.checkNotNull(filterColumn, "You must specify a filter column on the range filter");
Preconditions.checkArgument(lower != null || upper != null, "Both lower and upper bounds cannot be empty");
this.filterColumn = filterColumn;
this.lowerOpen = lowerOpen != null ? lowerOpen : false;
this.upperOpen = upperOpen != null ? upperOpen : false;
this.lower = lower;
this.upper = upper;
}


@Override
public TableScan filter(TableScan tableScan)
{
return tableScan.filter(getFilterExpression());
}

@Override
public Expression getFilterExpression()
{
List<Expression> expressions = new ArrayList<>();

if (lower != null) {
Expression lowerExp = lowerOpen
? Expressions.greaterThan(filterColumn, lower)
: Expressions.greaterThanOrEqual(filterColumn, lower);
expressions.add(lowerExp);
}
if (upper != null) {
Expression upperExp = upperOpen
? Expressions.lessThan(filterColumn, upper)
: Expressions.lessThanOrEqual(filterColumn, upper);
expressions.add(upperExp);
}
if (expressions.size() == 2) {
return Expressions.and(expressions.get(0), expressions.get(1));
}
return expressions.get(0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.iceberg.filter;

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.junit.Assert;
import org.junit.Test;

public class IcebergRangeFilterTest
{
private final String TEST_COLUMN = "column1";

@Test
public void testUpperOpenFilter()
{
Expression expectedExpression = Expressions.and(
Expressions.greaterThanOrEqual(TEST_COLUMN, 45),
Expressions.lessThan(TEST_COLUMN, 50)
);
IcebergRangeFilter rangeFilter = new IcebergRangeFilter(TEST_COLUMN, 45, 50, false, true);
Assert.assertEquals(expectedExpression.toString(), rangeFilter.getFilterExpression().toString());
}

@Test
public void testLowerOpenFilter()
{
Expression expectedExpression = Expressions.and(
Expressions.greaterThan(TEST_COLUMN, 45),
Expressions.lessThanOrEqual(TEST_COLUMN, 50)
);
IcebergRangeFilter rangeFilter = new IcebergRangeFilter(TEST_COLUMN, 45, 50, true, false);
Assert.assertEquals(expectedExpression.toString(), rangeFilter.getFilterExpression().toString());
}

@Test
public void testNoLowerFilter()
{
Expression expectedExpression = Expressions.lessThanOrEqual(TEST_COLUMN, 50);
IcebergRangeFilter rangeFilter = new IcebergRangeFilter(TEST_COLUMN, null, 50, null, false);
Assert.assertEquals(expectedExpression.toString(), rangeFilter.getFilterExpression().toString());
}

@Test
public void testNoUpperFilter()
{
Expression expectedExpression = Expressions.greaterThanOrEqual(TEST_COLUMN, 100);
IcebergRangeFilter rangeFilter = new IcebergRangeFilter(TEST_COLUMN, 100, null, null, null);
Assert.assertEquals(expectedExpression.toString(), rangeFilter.getFilterExpression().toString());
}
}
2 changes: 2 additions & 0 deletions website/.spelling
Original file line number Diff line number Diff line change
Expand Up @@ -1306,9 +1306,11 @@ kafka.topic
keyColumnName
keyFormat
listDelimiter
lowerOpen
timestamp
timestampColumnName
timestampSpec
upperOpen
urls
valueFormat
1GB
Expand Down

0 comments on commit 085f76d

Please sign in to comment.