Add support for 'maxTotalRows' to incremental publishing kafka indexing task and appenderator based realtime task #6129

Merged
merged 4 commits into from
Sep 7, 2018
11 changes: 6 additions & 5 deletions docs/content/development/extensions-core/kafka-ingestion.md
@@ -117,7 +117,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|`type`|String|The indexing task type, this should always be `kafka`.|yes|
|`maxRowsInMemory`|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists). Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.|no (default == 1000000)|
|`maxBytesInMemory`|Long|The number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. Normally this is computed internally and user does not need to set it. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists). |no (default == One-sixth of max JVM memory)|
-|`maxRowsPerSegment`|Integer|The number of rows to aggregate into a segment; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == 5000000)|
+|`maxRowsPerSegment`|Integer|The number of rows to aggregate into a segment; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == 5000000)|
+|`maxTotalRows`|Long|The number of rows to aggregate across all segments; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == unlimited)|
|`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)|
|`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)|
|`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no|
@@ -131,7 +132,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|`shutdownTimeout`|ISO8601 Period|How long to wait for the supervisor to attempt a graceful shutdown of tasks before exiting.|no (default == PT80S)|
|`offsetFetchPeriod`|ISO8601 Period|How often the supervisor queries Kafka and the indexing tasks to fetch current offsets and calculate lag.|no (default == PT30S, min == PT5S)|
|`segmentWriteOutMediumFactory`|String|Segment write-out medium to use when creating segments. See [Additional Peon Configuration: SegmentWriteOutMediumFactory](../../configuration/index.html#segmentwriteoutmediumfactory) for explanation and available options.|no (not specified by default, the value from `druid.peon.defaultSegmentWriteOutMediumFactory` is used)|
-|`intermediateHandoffPeriod`|ISO8601 Period|How often the tasks should hand off segments. Handoff will happen either if `maxRowsPerSegment` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == P2147483647D)|
+|`intermediateHandoffPeriod`|ISO8601 Period|How often the tasks should hand off segments. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == P2147483647D)|

#### IndexSpec

@@ -314,10 +315,10 @@ In this way, configuration changes can be applied without requiring any pause in
### On the Subject of Segments

Each Kafka Indexing Task puts events consumed from Kafka partitions assigned to it in a single segment for each segment
-granular interval until maxRowsPerSegment or intermediateHandoffPeriod limit is reached, at this point a new partition
+granular interval until maxRowsPerSegment, maxTotalRows or intermediateHandoffPeriod limit is reached, at this point a new partition
for this segment granularity is created for further events. Kafka Indexing Task also does incremental hand-offs which
-means that all the segments created by a task will not be held up till the task duration is over. As soon as maxRowsPerSegment
-or intermediateHandoffPeriod limit is hit, all the segments held by the task at that point in time will be handed-off
+means that all the segments created by a task will not be held up till the task duration is over. As soon as maxRowsPerSegment,
+maxTotalRows or intermediateHandoffPeriod limit is hit, all the segments held by the task at that point in time will be handed-off
and new set of segments will be created for further events. This means that the task can run for longer durations of time
without accumulating old segments locally on Middle Manager nodes and it is encouraged to do so.

@@ -431,9 +431,6 @@ public void run()
// if stop is requested or task's end offset is set by call to setEndOffsets method with finish set to true
if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) {
status = Status.PUBLISHING;
-}
-
-if (stopRequested.get()) {
break;
}

@@ -530,10 +527,8 @@ public void run()
if (addResult.isOk()) {
// If the number of rows in the segment exceeds the threshold after adding a row,
// move the segment out from the active segments of BaseAppenderatorDriver to make a new segment.
-if (addResult.getNumRowsInSegment() > tuningConfig.getMaxRowsPerSegment()) {
-if (!sequenceToUse.isCheckpointed()) {
-sequenceToCheckpoint = sequenceToUse;
-}
+if (addResult.isPushRequired(tuningConfig) && !sequenceToUse.isCheckpointed()) {
+sequenceToCheckpoint = sequenceToUse;
}
isPersistRequired |= addResult.isPersistRequired();
} else {
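
Note on the hunk above: the body of `isPushRequired` is not part of the lines shown here, so the following is only a minimal sketch of the check the updated docs describe (push when either `maxRowsPerSegment` or `maxTotalRows` is hit). The class `PushCheckSketch`, the `Limits` holder, and the parameter names are hypothetical, and the strict `>` comparison simply mirrors the removed line; the real method may compare differently.

```java
public class PushCheckSketch {
    /** Hypothetical stand-in for the tuning config; only the two row limits are modeled. */
    static final class Limits {
        final int maxRowsPerSegment;
        final Long maxTotalRows; // null is treated as "unlimited" (the documented default)

        Limits(int maxRowsPerSegment, Long maxTotalRows) {
            this.maxRowsPerSegment = maxRowsPerSegment;
            this.maxTotalRows = maxTotalRows;
        }
    }

    /** Returns true when either row limit is exceeded; a null total limit never triggers. */
    static boolean isPushRequired(int numRowsInSegment, long totalNumRows, Limits limits) {
        boolean overSegmentLimit = numRowsInSegment > limits.maxRowsPerSegment;
        boolean overTotalLimit = limits.maxTotalRows != null && totalNumRows > limits.maxTotalRows;
        return overSegmentLimit || overTotalLimit;
    }

    public static void main(String[] args) {
        Limits unlimitedTotal = new Limits(5_000_000, null);
        Limits cappedTotal = new Limits(5_000_000, 20_000_000L);
        // Same row counts, different outcome depending on whether maxTotalRows is set.
        System.out.println(isPushRequired(1_000, 30_000_000L, unlimitedTotal)); // false
        System.out.println(isPushRequired(1_000, 30_000_000L, cappedTotal));    // true
    }
}
```

Keeping the total-rows limit nullable lets an unset `maxTotalRows` preserve the previous behavior, where only the per-segment limit and `intermediateHandoffPeriod` trigger a handoff.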
@@ -40,6 +40,8 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
private final int maxRowsInMemory;
private final long maxBytesInMemory;
private final int maxRowsPerSegment;
+@Nullable
+private final Long maxTotalRows;
private final Period intermediatePersistPeriod;
private final File basePersistDirectory;
@Deprecated
@@ -61,6 +63,7 @@ public KafkaTuningConfig(
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
+@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
@JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod,
@JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory,
@JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
@@ -85,6 +88,7 @@ public KafkaTuningConfig(
// initializing this to 0, it will be lazily initialized to a value
// @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
+this.maxTotalRows = maxTotalRows;
this.intermediatePersistPeriod = intermediatePersistPeriod == null
? defaults.getIntermediatePersistPeriod()
: intermediatePersistPeriod;
@@ -123,6 +127,7 @@ public static KafkaTuningConfig copyOf(KafkaTuningConfig config)
config.maxRowsInMemory,
config.maxBytesInMemory,
config.maxRowsPerSegment,
+config.maxTotalRows,
config.intermediatePersistPeriod,
config.basePersistDirectory,
config.maxPendingPersists,
@@ -153,12 +158,22 @@ public long getMaxBytesInMemory()
return maxBytesInMemory;
}

+@Override
@JsonProperty
public int getMaxRowsPerSegment()
{
return maxRowsPerSegment;
}

+
+@JsonProperty
+@Override
+@Nullable
+public Long getMaxTotalRows()
+{
+return maxTotalRows;
+}
+
@Override
@JsonProperty
public Period getIntermediatePersistPeriod()
@@ -255,6 +270,7 @@ public KafkaTuningConfig withBasePersistDirectory(File dir)
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,
+maxTotalRows,
intermediatePersistPeriod,
dir,
maxPendingPersists,
@@ -284,6 +300,7 @@ public boolean equals(Object o)
return maxRowsInMemory == that.maxRowsInMemory &&
maxRowsPerSegment == that.maxRowsPerSegment &&
maxBytesInMemory == that.maxBytesInMemory &&
+Objects.equals(maxTotalRows, that.maxTotalRows) &&
maxPendingPersists == that.maxPendingPersists &&
reportParseExceptions == that.reportParseExceptions &&
handoffConditionTimeout == that.handoffConditionTimeout &&
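
Note on the `equals` hunk above: the primitive fields are compared with `==`, while the new `maxTotalRows` field is a nullable `Long`, where `==` would compare object references and a direct `.equals` call could throw on `null`. The sketch below, with a hypothetical class and helper name, shows the null-safe, value-based comparison that `Objects.equals` provides.

```java
import java.util.Objects;

public class NullableLimitEquality {
    /** Hypothetical helper: compares two optional row limits by value, treating null as "unset". */
    static boolean sameLimit(Long a, Long b) {
        return Objects.equals(a, b);
    }

    public static void main(String[] args) {
        System.out.println(sameLimit(null, null));               // true: both unset
        System.out.println(sameLimit(20_000_000L, 20_000_000L)); // true: equal values
        System.out.println(sameLimit(null, 20_000_000L));        // false: one side unset
    }
}
```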
@@ -305,6 +322,7 @@ public int hashCode()
maxRowsInMemory,
maxRowsPerSegment,
maxBytesInMemory,
+maxTotalRows,
intermediatePersistPeriod,
basePersistDirectory,
maxPendingPersists,
@@ -326,6 +344,7 @@ public String toString()
return "KafkaTuningConfig{" +
"maxRowsInMemory=" + maxRowsInMemory +
", maxRowsPerSegment=" + maxRowsPerSegment +
+", maxTotalRows=" + maxTotalRows +
", maxBytesInMemory=" + maxBytesInMemory +
", intermediatePersistPeriod=" + intermediatePersistPeriod +
", basePersistDirectory=" + basePersistDirectory +
@@ -97,6 +97,7 @@ public KafkaSupervisorSpec(
null,
null,
null,
+null,
null
);
this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
@@ -43,6 +43,7 @@ public KafkaSupervisorTuningConfig(
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
+@JsonProperty("maxTotalRows") Long maxTotalRows,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@@ -69,6 +70,7 @@ public KafkaSupervisorTuningConfig(
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,
+maxTotalRows,
intermediatePersistPeriod,
basePersistDirectory,
maxPendingPersists,
@@ -134,6 +136,7 @@ public String toString()
return "KafkaSupervisorTuningConfig{" +
"maxRowsInMemory=" + getMaxRowsInMemory() +
", maxRowsPerSegment=" + getMaxRowsPerSegment() +
+", maxTotalRows=" + getMaxTotalRows() +
", maxBytesInMemory=" + TuningConfigs.getMaxBytesInMemoryOrDefault(getMaxBytesInMemory()) +
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
", basePersistDirectory=" + getBasePersistDirectory() +