Skip to content

Commit

Permalink
Refactoring the data source before unnest (#13085)
Browse files Browse the repository at this point in the history
* First set of changes for framework

* Second set of changes to move segment map function to data source

* Minor change to server manager

* Removing the createSegmentMapFunction from JoinableFactoryWrapper and moving to JoinDataSource

* Checkstyle fixes

* Patching Eric's fix for injection

* Checkstyle and fixing some CI issues

* Fixing code inspections and some failed tests and one injector for test in avatica

* Another set of changes for CI...almost there

* Equals and hashcode part update

* Fixing injector from Eric + refactoring for broadcastJoinHelper

* Updating second injector. Might revert later if better way found

* Fixing guice issue in JoinableFactory

* Addressing review comments part 1

* Temp changes refactoring

* Revert "Temp changes refactoring"

This reverts commit 9da42a9.

* temp

* Temp discussions

* Refactoring temp

* Refactoring the query rewrite to refer to a datasource

* Refactoring getCacheKey by moving it inside data source

* Nullable annotation check in injector

* Addressing some comments, removing 2 analysis.isJoin() checks and correcting the benchmark files

* Minor changes for refactoring

* Addressing reviews part 1

* Refactoring part 2 with new test cases for broadcast join

* Set for nullables

* removing instance of checks

* Storing nullables in guice to avoid checking on reruns

* Fixing a test case and removing an irrelevant line

* Addressing the atomic reference review comments
  • Loading branch information
somu-imply authored Oct 26, 2022
1 parent 72c1609 commit affc522
Show file tree
Hide file tree
Showing 59 changed files with 1,320 additions and 1,024 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,8 @@ public void setup()
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of())
new CalciteRulesManager(ImmutableSet.of()),
CalciteTests.createJoinableFactoryWrapper()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,8 @@ public void setup()
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of())
new CalciteRulesManager(ImmutableSet.of()),
CalciteTests.createJoinableFactoryWrapper()
);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,8 @@ public void setup()
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of())
new CalciteRulesManager(ImmutableSet.of()),
CalciteTests.createJoinableFactoryWrapper()
);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public void setup()
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of())
new CalciteRulesManager(ImmutableSet.of()),
CalciteTests.createJoinableFactoryWrapper()
);
groupByQuery = GroupByQuery
.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.fasterxml.jackson.databind.AnnotationIntrospector;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector;
Expand Down Expand Up @@ -96,14 +95,12 @@ public ObjectMapper getSmileMapper(Injector injector)
return smileMapper;
}

/**
 * Wires Guice into the given Jackson mapper: installs a {@link GuiceInjectableValues} backed by {@code injector}
 * as the mapper's injectable values, then registers a {@link GuiceAnnotationIntrospector} on the same mapper via
 * {@code setupAnnotationIntrospector}.
 *
 * @param injector injector handed to the {@link GuiceInjectableValues} installed on the mapper
 * @param mapper   mapper to configure; it is modified in place
 */
@VisibleForTesting
public static void setupJackson(Injector injector, final ObjectMapper mapper)
{
  final GuiceInjectableValues guiceValues = new GuiceInjectableValues(injector);
  mapper.setInjectableValues(guiceValues);

  final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
  setupAnnotationIntrospector(mapper, guiceIntrospector);
}

@VisibleForTesting
public static void setupAnnotationIntrospector(
final ObjectMapper mapper,
final AnnotationIntrospector annotationIntrospector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,27 @@
import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.InjectableValues;
import com.google.inject.ConfigurationException;
import com.google.inject.Injector;
import com.google.inject.Key;
import org.apache.druid.java.util.common.IAE;

import javax.annotation.Nullable;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicReference;

/**
*
*/
public class GuiceInjectableValues extends InjectableValues
{
private final Injector injector;
private final AtomicReference<HashSet<Key>> nullables;

public GuiceInjectableValues(Injector injector)
{
this.injector = injector;
this.nullables = new AtomicReference<>(new HashSet<>());
}

@Override
Expand All @@ -49,8 +57,22 @@ public Object findInjectableValue(
// whatever provider needs"
// Currently we should only be dealing with `Key` instances, and anything more advanced should be handled with
// great care
if (valueId instanceof Key) {
return injector.getInstance((Key) valueId);
if (nullables.get().contains((Key) valueId)) {
return null;
} else if (valueId instanceof Key) {
try {
return injector.getInstance((Key) valueId);
}
catch (ConfigurationException ce) {
// check if nullable annotation is present for this
if (forProperty.getAnnotation(Nullable.class) != null) {
HashSet<Key> encounteredNullables = new HashSet<>(nullables.get());
encounteredNullables.add((Key) valueId);
nullables.set(encounteredNullables);
return null;
}
throw ce;
}
}
throw new IAE(
"Unknown class type [%s] for valueId [%s]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

public abstract class BaseLeafFrameProcessor implements FrameProcessor<Long>
Expand Down Expand Up @@ -88,6 +89,58 @@ protected BaseLeafFrameProcessor(
this.broadcastJoinHelper = inputChannelsAndBroadcastJoinHelper.rhs;
}

/**
 * Helper that enables implementations of {@link BaseLeafFrameProcessorFactory} to set up their primary and side
 * channels.
 *
 * The returned list holds the base input's channel first (when it has one), followed by every side channel in the
 * iteration order of {@code sideChannels}. A {@link BroadcastJoinHelper} is only created when {@code dataSource}
 * is a {@link JoinDataSource}; for any other data source the right-hand side of the returned pair is null.
 *
 * @throws ISE if side channels are supplied for a data source that is not a {@link JoinDataSource}
 */
private static Pair<List<ReadableFrameChannel>, BroadcastJoinHelper> makeInputChannelsAndBroadcastJoinHelper(
    final DataSource dataSource,
    final ReadableInput baseInput,
    final Int2ObjectMap<ReadableInput> sideChannels,
    final JoinableFactoryWrapper joinableFactory,
    final long memoryReservedForBroadcastJoin
)
{
  final boolean isJoin = dataSource instanceof JoinDataSource;

  if (!isJoin && !sideChannels.isEmpty()) {
    throw new ISE("Did not expect side channels for dataSource [%s]", dataSource);
  }

  final List<ReadableFrameChannel> channels = new ArrayList<>();

  if (baseInput.hasChannel()) {
    channels.add(baseInput.getChannel());
  }

  if (!isJoin) {
    // Not a join: there is nothing for a BroadcastJoinHelper to do.
    return Pair.of(channels, null);
  }

  final Int2IntMap processorChannelByInputNumber = new Int2IntOpenHashMap();
  final List<FrameReader> readers = new ArrayList<>();

  if (baseInput.hasChannel()) {
    // BroadcastJoinHelper doesn't need to read the base channel, so stub in a null reader.
    readers.add(null);
  }

  for (Int2ObjectMap.Entry<ReadableInput> entry : sideChannels.int2ObjectEntrySet()) {
    final ReadableInput sideInput = entry.getValue();

    // Record the list size before appending: it is the index this side channel is about to occupy.
    processorChannelByInputNumber.put(entry.getIntKey(), channels.size());
    channels.add(sideInput.getChannel());
    readers.add(sideInput.getChannelFrameReader());
  }

  return Pair.of(
      channels,
      new BroadcastJoinHelper(
          processorChannelByInputNumber,
          channels,
          readers,
          joinableFactory,
          memoryReservedForBroadcastJoin
      )
  );
}

@Override
public List<ReadableFrameChannel> inputChannels()
{
Expand Down Expand Up @@ -146,71 +199,19 @@ protected SegmentReference mapSegment(final Segment segment)

/**
 * Initializes {@code segmentMapFn} if it has not been set yet.
 *
 * <ul>
 *   <li>If {@code segmentMapFn} is already set, nothing is done.</li>
 *   <li>If there is no {@code broadcastJoinHelper}, the identity function is used.</li>
 *   <li>Otherwise the broadcast join helper is asked to continue building its tables from
 *   {@code readableInputs}; only when it returns true is the function created from the query's data source with
 *   channel data inlined.</li>
 * </ul>
 *
 * @param readableInputs forwarded to {@code BroadcastJoinHelper#buildBroadcastTablesIncrementally}
 *
 * @return true if {@code segmentMapFn} is set when this method returns; otherwise the (false) result of
 * {@code buildBroadcastTablesIncrementally}, presumably meaning more broadcast input is still needed
 */
private boolean initializeSegmentMapFn(final IntSet readableInputs)
{
  if (segmentMapFn != null) {
    return true;
  } else if (broadcastJoinHelper == null) {
    segmentMapFn = Function.identity();
    return true;
  } else {
    final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
    final DataSource inlineChannelDataSource = broadcastJoinHelper.inlineChannelData(query.getDataSource());

    if (retVal) {
      // The data source itself builds the segment map function; the CPU-time accumulator is only needed here,
      // so it is allocated at the point of use rather than on every call.
      segmentMapFn = inlineChannelDataSource.createSegmentMapFunction(query, new AtomicLong());
    }

    return retVal;
  }
}

/**
 * Helper that enables implementations of {@link BaseLeafFrameProcessorFactory} to set up their primary and side
 * channels.
 *
 * The returned list holds the base input's channel first (when it has one), followed by every side channel in the
 * iteration order of {@code sideChannels}. A {@link BroadcastJoinHelper} is only created when {@code dataSource}
 * is a {@link JoinDataSource}; for any other data source the right-hand side of the returned pair is null.
 *
 * @throws ISE if side channels are supplied for a data source that is not a {@link JoinDataSource}
 */
private static Pair<List<ReadableFrameChannel>, BroadcastJoinHelper> makeInputChannelsAndBroadcastJoinHelper(
    final DataSource dataSource,
    final ReadableInput baseInput,
    final Int2ObjectMap<ReadableInput> sideChannels,
    final JoinableFactoryWrapper joinableFactory,
    final long memoryReservedForBroadcastJoin
)
{
  final boolean isJoin = dataSource instanceof JoinDataSource;

  if (!isJoin && !sideChannels.isEmpty()) {
    throw new ISE("Did not expect side channels for dataSource [%s]", dataSource);
  }

  final List<ReadableFrameChannel> channels = new ArrayList<>();

  if (baseInput.hasChannel()) {
    channels.add(baseInput.getChannel());
  }

  if (!isJoin) {
    // Not a join: there is nothing for a BroadcastJoinHelper to do.
    return Pair.of(channels, null);
  }

  final Int2IntMap processorChannelByInputNumber = new Int2IntOpenHashMap();
  final List<FrameReader> readers = new ArrayList<>();

  if (baseInput.hasChannel()) {
    // BroadcastJoinHelper doesn't need to read the base channel, so stub in a null reader.
    readers.add(null);
  }

  for (Int2ObjectMap.Entry<ReadableInput> entry : sideChannels.int2ObjectEntrySet()) {
    final ReadableInput sideInput = entry.getValue();

    // Record the list size before appending: it is the index this side channel is about to occupy.
    processorChannelByInputNumber.put(entry.getIntKey(), channels.size());
    channels.add(sideInput.getChannel());
    readers.add(sideInput.getChannelFrameReader());
  }

  return Pair.of(
      channels,
      new BroadcastJoinHelper(
          processorChannelByInputNumber,
          channels,
          readers,
          joinableFactory,
          memoryReservedForBroadcastJoin
      )
  );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.msq.querykit;

import com.google.common.annotations.VisibleForTesting;
import it.unimi.dsi.fastutil.ints.Int2IntMap;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
Expand All @@ -32,18 +31,12 @@
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.join.JoinableFactoryWrapper;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

public class BroadcastJoinHelper
Expand Down Expand Up @@ -138,21 +131,9 @@ public IntSet getSideChannelNumbers()
return sideChannelNumbers;
}

/**
 * Builds the segment mapping function for {@code query}.
 *
 * The query's data source is first passed through {@code inlineChannelData}, the result is analyzed, and the
 * function is then obtained from {@code joinableFactory} using the analysis' join base table filter (null when
 * absent), its pre-joinable clauses, a fresh CPU-time accumulator, and the analysis' base query (falling back to
 * {@code query} itself when there is none).
 *
 * @param query query whose data source is inlined and analyzed
 *
 * @return the function produced by {@code joinableFactory.createSegmentMapFn}
 */
public Function<SegmentReference, SegmentReference> makeSegmentMapFn(final Query<?> query)
{
  final DataSource inlined = inlineChannelData(query.getDataSource());
  final DataSourceAnalysis inlinedAnalysis = DataSourceAnalysis.forDataSource(inlined);
  final AtomicLong cpuTimeAccumulator = new AtomicLong();

  return joinableFactory.createSegmentMapFn(
      inlinedAnalysis.getJoinBaseTableFilter().map(Filters::toFilter).orElse(null),
      inlinedAnalysis.getPreJoinableClauses(),
      cpuTimeAccumulator,
      inlinedAnalysis.getBaseQuery().orElse(query)
  );
}

@VisibleForTesting
DataSource inlineChannelData(final DataSource originalDataSource)

public DataSource inlineChannelData(final DataSource originalDataSource)
{
if (originalDataSource instanceof InputNumberDataSource) {
final int inputNumber = ((InputNumberDataSource) originalDataSource).getInputNumber();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,9 @@ private static DataSourcePlan forJoin(
clause.getPrefix(),
clause.getCondition(),
clause.getJoinType(),

// First JoinDataSource (i == 0) involves the base table, so we need to propagate the base table filter.
i == 0 ? analysis.getJoinBaseTableFilter().orElse(null) : null
i == 0 ? analysis.getJoinBaseTableFilter().orElse(null) : null,
dataSource.getJoinableFactoryWrapper()
);
inputSpecs.addAll(clausePlan.getInputSpecs());
clausePlan.getBroadcastInputs().intStream().forEach(n -> broadcastInputs.add(n + shift));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Query;
import org.apache.druid.segment.SegmentReference;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

@JsonTypeName("inputNumber")
public class InputNumberDataSource implements DataSource
Expand Down Expand Up @@ -81,6 +85,27 @@ public boolean isConcrete()
return false;
}

/**
 * Returns the identity function, so segments are passed through unchanged. Neither {@code query} nor
 * {@code cpuTimeAcc} is consulted.
 */
@Override
public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
    Query query,
    AtomicLong cpuTimeAcc
)
{
  final Function<SegmentReference, SegmentReference> passThrough = Function.identity();
  return passThrough;
}

/**
 * Returns {@code newSource} as-is; nothing from this data source is carried over into the result.
 */
@Override
public DataSource withUpdatedDataSource(DataSource newSource)
{
return newSource;
}

/**
 * Always returns null.
 *
 * NOTE(review): presumably a null key signals to callers that this data source has no usable cache key --
 * confirm against the contract of the overridden method.
 */
@Override
public byte[] getCacheKey()
{
return null;
}

@JsonProperty
public int getInputNumber()
{
Expand Down
Loading

0 comments on commit affc522

Please sign in to comment.