Temp changes refactoring
somu-imply committed Oct 3, 2022
1 parent 1b75daf commit 9da42a9
Showing 15 changed files with 139 additions and 275 deletions.
@@ -103,6 +103,12 @@ public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
return analysis.getDataSource().createSegmentMapFunction(query, new AtomicLong());
}

@Override
public byte[] getCacheKey(DataSourceAnalysis dataSourceAnalysis)
{
return new byte[]{};
}

@JsonProperty
public int getInputNumber()
{
@@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.SegmentReference;

import java.util.List;
@@ -99,4 +100,12 @@ public interface DataSource
* @return the segment function
*/
Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc);

/**
 * Computes a cache key for this datasource based on the given analysis.
 *
 * @return a non-empty byte array if a join datasource is involved and caching is possible; an empty byte
 *         array if there is no join, or if there is a join but caching is not possible (for example,
 *         because one of the datasources participating in the join is not cacheable).
 */
byte[] getCacheKey(DataSourceAnalysis dataSourceAnalysis);
}
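
For context, here is roughly how a caller of this new contract might behave. This is a minimal, hypothetical sketch, not part of this commit: the method name is made up, and it assumes DataSourceAnalysis.isJoin() to detect the join case, with the empty-array convention taken from the implementations in this diff.

// Hypothetical caller-side sketch: fold the datasource-level key into a broader
// query cache key, treating an empty key on a join as "caching not possible".
@Nullable
static byte[] computeDataSourceCacheKeyPrefix(Query<?> query, DataSourceAnalysis analysis)
{
  final byte[] dataSourceKey = query.getDataSource().getCacheKey(analysis);
  if (analysis.isJoin() && dataSourceKey.length == 0) {
    return null; // a join is present but at least one participating datasource is not cacheable
  }
  return dataSourceKey;
}
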
@@ -26,6 +26,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.RowAdapter;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.column.ColumnType;
@@ -243,6 +244,12 @@ public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
return Function.identity();
}

@Override
public byte[] getCacheKey(DataSourceAnalysis dataSourceAnalysis)
{
return new byte[]{};
}

/**
* Returns the row signature (map of column name to type) for this inline datasource. Note that types may
* be null, meaning we know we have a column with a certain name, but we don't know what its type is.
@@ -33,7 +33,9 @@
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.planning.DataSourceAnalysis;
@@ -59,6 +61,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
@@ -86,6 +89,8 @@ public class JoinDataSource implements DataSource
private final String rightPrefix;
private final JoinConditionAnalysis conditionAnalysis;
private final JoinType joinType;
private static final byte JOIN_OPERATION = 0x1;
private static final Logger log = new Logger(JoinDataSource.class);
// An optional filter on the left side, applied when the left side is direct table access
@Nullable
private final DimFilter leftFilter;
@@ -425,4 +430,32 @@ public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
);
return segmentMapFn;
}

@Override
public byte[] getCacheKey(DataSourceAnalysis dataSourceAnalysis)
{
final List<PreJoinableClause> clauses = dataSourceAnalysis.getPreJoinableClauses();
if (clauses.isEmpty()) {
throw new IAE("No join clauses to build the cache key for data source [%s]", dataSourceAnalysis.getDataSource());
}

final CacheKeyBuilder keyBuilder = new CacheKeyBuilder(JOIN_OPERATION);
if (dataSourceAnalysis.getJoinBaseTableFilter().isPresent()) {
keyBuilder.appendCacheable(dataSourceAnalysis.getJoinBaseTableFilter().get());
}
for (PreJoinableClause clause : clauses) {
Optional<byte[]> bytes = joinableFactoryWrapper.getJoinableFactory().computeJoinCacheKey(clause.getDataSource(), clause.getCondition());
if (!bytes.isPresent()) {
// Encountered a datasource that does not support caching yet
log.debug("skipping caching for join since [%s] does not support caching", clause.getDataSource());
return new byte[]{};
}
keyBuilder.appendByteArray(bytes.get());
keyBuilder.appendString(clause.getCondition().getOriginalExpression());
keyBuilder.appendString(clause.getPrefix());
keyBuilder.appendString(clause.getJoinType().name());
}
return keyBuilder.build();
}
}
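
The per-clause fields appended in getCacheKey above are what keep distinct joins from colliding: two joins over the same joinable but with a different condition, prefix, or join type must produce different keys. A minimal illustration of the same CacheKeyBuilder pattern follows; it is not part of this commit, and the literal values are made up.

// Illustrative only: 0x1 mirrors JOIN_OPERATION above, and joinableKeyBytes
// stands in for the value returned by JoinableFactory.computeJoinCacheKey().
static byte[] exampleClauseKey(byte[] joinableKeyBytes)
{
  return new CacheKeyBuilder((byte) 0x1)
      .appendByteArray(joinableKeyBytes)   // the joinable's own cache key
      .appendString("x == \"j.x\"")        // original condition expression
      .appendString("j.")                  // clause prefix
      .appendString(JoinType.LEFT.name())  // join type
      .build();
}
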
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.SegmentReference;

import java.util.Collections;
@@ -108,6 +109,12 @@ public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
return Function.identity();
}

@Override
public byte[] getCacheKey(DataSourceAnalysis dataSourceAnalysis)
{
return new byte[]{};
}

@Override
public boolean equals(Object o)
{
@@ -25,6 +25,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.SegmentReference;

import java.util.Collections;
@@ -100,6 +101,12 @@ public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
return Function.identity();
}

@Override
public byte[] getCacheKey(DataSourceAnalysis dataSourceAnalysis)
{
return new byte[]{};
}

@Override
public String toString()
{
@@ -24,6 +24,7 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.SegmentReference;

import java.util.Collections;
@@ -105,6 +106,12 @@ public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
return Function.identity();
}

@Override
public byte[] getCacheKey(DataSourceAnalysis dataSourceAnalysis)
{
return new byte[]{};
}

@Override
public String toString()
{
@@ -26,6 +26,7 @@
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.SegmentReference;

import java.util.List;
@@ -117,6 +118,12 @@ public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
return Function.identity();
}

@Override
public byte[] getCacheKey(DataSourceAnalysis dataSourceAnalysis)
{
return new byte[]{};
}

@Override
public boolean equals(Object o)
{
@@ -22,20 +22,32 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.filter.TrueDimFilter;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.segment.join.NoopDataSource;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.easymock.EasyMock;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;

import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;


public class JoinDataSourceTest
@@ -180,6 +192,33 @@ public void test_serde() throws Exception
Assert.assertEquals(joinDataSource, deserialized);
}

@Test
public void test_computeJoinDataSourceCacheKey_noClauses()
{
DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
JoinDataSource joinDataSource = JoinDataSource.create(
new TableDataSource("table1"),
new TableDataSource("table2"),
"j.",
"x == \"j.x\"",
JoinType.LEFT,
TrueDimFilter.instance(),
ExprMacroTable.nil(),
joinableFactoryWrapper
);
EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.emptyList());
EasyMock.expect(analysis.getJoinBaseTableFilter()).andReturn(Optional.empty());
EasyMock.expect(analysis.getDataSource()).andReturn(joinDataSource);
EasyMock.replay(analysis);

expectedException.expect(IAE.class);
expectedException.expectMessage(StringUtils.format(
"No join clauses to build the cache key for data source [%s]",
joinDataSource
));
joinDataSource.getCacheKey(analysis);
}

@Test
public void testException_leftFilterOnNonTableSource()
{