Skip to content

Commit

Permalink
Revert "Fix memory reservation accounting problem about InMemoryHashA…
Browse files Browse the repository at this point in the history
…ggregationBuilder inside SpillableHashAggregationBuilder."

This reverts commit d79e4ad.
  • Loading branch information
vermapratyush committed Oct 12, 2023
1 parent adb705b commit 627165a
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ private void initializeAggregationBuilderIfNeeded()
maxPartialMemory,
joinCompiler,
true,
useSystemMemory ? ReserveType.SYSTEM : ReserveType.USER);
useSystemMemory);
}
else {
verify(!useSystemMemory, "using system memory in spillable aggregations is not supported");
Expand Down Expand Up @@ -667,11 +667,4 @@ private static long calculateDefaultOutputHash(List<Type> groupByChannels, int g
}
return result;
}

public enum ReserveType
{
USER,
SYSTEM,
REVOCABLE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.facebook.presto.common.type.Type;
import com.facebook.presto.memory.context.LocalMemoryContext;
import com.facebook.presto.operator.GroupByHash;
import com.facebook.presto.operator.HashAggregationOperator.ReserveType;
import com.facebook.presto.operator.HashCollisionsCounter;
import com.facebook.presto.operator.OperatorContext;
import com.facebook.presto.operator.TransformWork;
Expand All @@ -45,7 +44,6 @@
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.Consumer;

import static com.facebook.presto.SystemSessionProperties.isDictionaryAggregationEnabled;
import static com.facebook.presto.common.type.BigintType.BIGINT;
Expand All @@ -63,8 +61,7 @@ public class InMemoryHashAggregationBuilder
private final OptionalLong maxPartialMemory;
private final LocalMemoryContext systemMemoryContext;
private final LocalMemoryContext localUserMemoryContext;
private final ReserveType reserveType;
private final Consumer<Long> memoryConsumer;
private final boolean useSystemMemory;

private boolean full;

Expand All @@ -79,7 +76,7 @@ public InMemoryHashAggregationBuilder(
Optional<DataSize> maxPartialMemory,
JoinCompiler joinCompiler,
boolean yieldForMemoryReservation,
ReserveType reserveType)
boolean useSystemMemory)
{
this(accumulatorFactories,
step,
Expand All @@ -92,36 +89,7 @@ public InMemoryHashAggregationBuilder(
Optional.empty(),
joinCompiler,
yieldForMemoryReservation,
reserveType,
Optional.empty());
}

public InMemoryHashAggregationBuilder(
List<AccumulatorFactory> accumulatorFactories,
Step step,
int expectedGroups,
List<Type> groupByTypes,
List<Integer> groupByChannels,
Optional<Integer> hashChannel,
OperatorContext operatorContext,
Optional<DataSize> maxPartialMemory,
JoinCompiler joinCompiler,
boolean yieldForMemoryReservation,
Optional<Consumer<Long>> memoryConsumer)
{
this(accumulatorFactories,
step,
expectedGroups,
groupByTypes,
groupByChannels,
hashChannel,
operatorContext,
maxPartialMemory,
Optional.empty(),
joinCompiler,
yieldForMemoryReservation,
ReserveType.REVOCABLE,
memoryConsumer);
useSystemMemory);
}

public InMemoryHashAggregationBuilder(
Expand All @@ -136,31 +104,16 @@ public InMemoryHashAggregationBuilder(
Optional<Integer> overwriteIntermediateChannelOffset,
JoinCompiler joinCompiler,
boolean yieldForMemoryReservation,
ReserveType reserveType,
Optional<Consumer<Long>> memoryConsumer)
boolean useSystemMemory)
{
// reserveType is REVOCABLE implies current InMemoryHashAggregationBuilder is built from SpillableHashAggregationBuilder
// and it will accept a customized memoryConsumer for memory update
if (reserveType == ReserveType.REVOCABLE) {
checkArgument(memoryConsumer.isPresent(),
"memoryConsumer must be present when reserve type is REVOCABLE");
}

this.reserveType = reserveType;
if (memoryConsumer.isPresent()) {
this.memoryConsumer = memoryConsumer.get();
}
else {
this.memoryConsumer = this::updateMemory;
}

UpdateMemory updateMemory;
if (yieldForMemoryReservation) {
updateMemory = this::updateMemoryWithYieldInfo;
}
else {
// Report memory usage but do not yield for memory.
// This is specially used for spillable hash aggregation operator.
// TODO: revisit this when spillable hash aggregation operator is turned on
updateMemory = () -> {
updateMemoryWithYieldInfo();
return true;
Expand All @@ -179,6 +132,7 @@ public InMemoryHashAggregationBuilder(
this.maxPartialMemory = maxPartialMemory.map(dataSize -> OptionalLong.of(dataSize.toBytes())).orElseGet(OptionalLong::empty);
this.systemMemoryContext = operatorContext.newLocalSystemMemoryContext(InMemoryHashAggregationBuilder.class.getSimpleName());
this.localUserMemoryContext = operatorContext.localUserMemoryContext();
this.useSystemMemory = useSystemMemory;

// wrapper each function with an aggregator
ImmutableList.Builder<Aggregator> builder = ImmutableList.builder();
Expand All @@ -197,7 +151,7 @@ public InMemoryHashAggregationBuilder(
@Override
public void close()
{
memoryConsumer.accept(0L);
updateMemory(0);
}

@Override
Expand Down Expand Up @@ -372,28 +326,24 @@ private boolean updateMemoryWithYieldInfo()
{
long memorySize = getSizeInMemory();
if (partial && maxPartialMemory.isPresent()) {
memoryConsumer.accept(memorySize);
updateMemory(memorySize);
full = (memorySize > maxPartialMemory.getAsLong());
return true;
}
// Operator/driver will be blocked on memory after we call setBytes.
// If memory is not available, once we return, this operator will be blocked until memory is available.
memoryConsumer.accept(memorySize);
updateMemory(memorySize);
// If memory is not available, inform the caller that we cannot proceed for allocation.
return operatorContext.isWaitingForMemory().isDone();
}

private void updateMemory(long memorySize)
{
switch (reserveType) {
case USER:
localUserMemoryContext.setBytes(memorySize);
break;
case SYSTEM:
systemMemoryContext.setBytes(memorySize);
break;
default:
throw new AssertionError("InMemoryHashAggregationBuilder do not support reserve type: " + reserveType);
if (useSystemMemory) {
systemMemoryContext.setBytes(memorySize);
}
else {
localUserMemoryContext.setBytes(memorySize);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.facebook.presto.common.Page;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.memory.context.LocalMemoryContext;
import com.facebook.presto.operator.HashAggregationOperator.ReserveType;
import com.facebook.presto.operator.OperatorContext;
import com.facebook.presto.operator.WorkProcessor;
import com.facebook.presto.operator.WorkProcessor.Transformation;
Expand Down Expand Up @@ -151,7 +150,6 @@ private void rebuildHashAggregationBuilder()
Optional.of(overwriteIntermediateChannelOffset),
joinCompiler,
false,
ReserveType.USER,
Optional.empty());
false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ private void rebuildHashAggregationBuilder()
Optional.of(DataSize.succinctBytes(0)),
joinCompiler,
false,
Optional.of((memorySize) -> localRevocableMemoryContext.setBytes(memorySize)));
false);
emptyHashAggregationBuilderSize = hashAggregationBuilder.getSizeInMemory();
}
}

0 comments on commit 627165a

Please sign in to comment.