Add support for 'maxTotalRows' to incremental publishing kafka indexing task and appenderator based realtime task #6129
Conversation
(force-pushed from 17f6b6b to 1f4d2be)
@@ -429,9 +429,6 @@ public void run()
// if stop is requested or task's end offset is set by call to setEndOffsets method with finish set to true
if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) {
What's the rationale behind `break`ing here if the last sequence is checkpointed?
I discussed this with @jihoonson. I noticed that previously, if stop wasn't user requested, it was setting the state to publishing and then going through a bunch of logic not related to publishing, and breaking here is what was intended to happen. As I understand it, currently it will harmlessly but needlessly fall through the logic below before breaking out and publishing.
Got it. Seems fine, since we haven't read anything from the Kafka consumer yet.
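For context, here's a minimal sketch of the control flow under discussion; the condition comes from the diff above, while the `Status.PUBLISHING` assignment is an assumption about the surrounding runner, not a quote of it:

// Sketch only: leave the read loop as soon as publishing should begin,
// instead of falling through the per-record logic below.
if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) {
  status = Status.PUBLISHING; // assumed field; see caveat above
  break;
}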
/**
 * Maximum number of rows in memory before persisting to local storage
 *
 * @return
Please get rid of the `@return` if it's not going to have anything useful on it; here, and in a few other places in the file.
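For example, the suggested cleanup could look like this (a sketch; the method shown is assumed from the surrounding interface):

/**
 * Maximum number of rows in memory before persisting to local storage.
 */
int getMaxRowsInMemory();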
long getMaxBytesInMemory();

The extra newline here isn't needed.
 * @return
 */
@Nullable
default Long getMaxTotalRows()
Why `null` instead of `Long.MAX_VALUE`? That's more rows than one machine could possibly store anyway.
This should never be used afaik; everything that uses this value gets it from JSON. This default is so I didn't have to add it to `RealtimeTuningConfig`, which implements `AppenderatorConfig`.
If it's never going to be used, how about throwing `UnsupportedOperationException` and marking it non-nullable?
It's nullable to be consistent with how `IndexTask` was using it. I don't have strong opinions about `null` vs `Long.MAX_VALUE`; I'll rework where this is used to get rid of nullable.
Oh, I didn't read the comment on that; it looks like `IndexTask` needs this to be nullable. I'll throw `UnsupportedOperationException` in the default method at least.
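For reference, a sketch of what that default method could look like (assuming the `AppenderatorConfig` default-method approach described above):

@Nullable
default Long getMaxTotalRows()
{
  // Configs that support incremental publishing override this; others
  // (e.g. RealtimeTuningConfig) should never have it called.
  throw new UnsupportedOperationException("maxTotalRows is not implemented.");
}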
@@ -611,6 +603,7 @@ public Object doCall() throws IOException
}
theSinks.put(identifier, sink);
sink.finishWriting();
totalRows.addAndGet(-sink.getNumRows());
It would make more sense to decrement this at the same time as `rowsCurrentlyInMemory` and `bytesCurrentlyInMemory`, rather than here.
Hmm, where are you suggesting? Those values are decremented at persist time; this one happens at publish time.
Oh, yeah, you're right. Let me re-read this with non-dumb eyes.
Ok, I re-read it, less dumbly this time, and it looks good to me. But consider the `finishWriting`-returning-boolean thing.
@@ -1118,16 +1094,18 @@ public String apply(SegmentIdentifier input)
final boolean removeOnDiskData
)
{
if (sink.isWritable()) {
What's the rationale behind moving this block?
For one, it seemed more legit to check that, to match the "we only count active sinks" comment, which implies writable to me. Additionally, since there are a couple of paths to decrementing the `totalRows` counter, I wanted to make sure I never double decremented it (its other decrement is also tied to `finishWriting`).
Hmm, to prevent races (`abandonSegment` could be called in a separate thread), how about having `sink.finishWriting()` return true or false, corresponding to its writability state before the method was called? Then, only decrement the counters if it returns true.
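Something like this, perhaps (a simplified, hypothetical `Sink` to illustrate the idea; field and lock names are made up):

class Sink
{
  private final Object lock = new Object();
  private boolean writable = true;

  // Returns true only for the call that actually performed the
  // writable -> finished transition, so racing callers (publish vs.
  // abandonSegment) can't both decrement the shared counters.
  public boolean finishWriting()
  {
    synchronized (lock) {
      if (!writable) {
        return false;
      }
      writable = false;
      return true;
    }
  }
}

The caller would then guard all counter decrements with `if (sink.finishWriting()) { ... }`.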
👍
👍 after CI
I'm reviewing this PR.
@@ -117,7 +117,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|`type`|String|The indexing task type, this should always be `kafka`.|yes|
|`maxRowsInMemory`|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists). Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.|no (default == 1000000)|
|`maxBytesInMemory`|Long|The number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. Normally this is computed internally and user does not need to set it. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists).|no (default == One-sixth of max JVM memory)|
|`maxRowsPerSegment`|Integer|The number of rows to aggregate into a segment; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == 5000000)|
|`maxRowsPerSegment`|Integer|The number of rows to aggregate into a segment; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == 5000000)|
|`maxTotalRows`|Integer|The number of rows to aggregate across all segments; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == unlimited)|
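For example, a tuningConfig using the new property might look like this (values are illustrative, not recommendations):

"tuningConfig": {
  "type": "kafka",
  "maxRowsInMemory": 100000,
  "maxRowsPerSegment": 5000000,
  "maxTotalRows": 20000000
}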
@@ -281,6 +282,7 @@ public KafkaIndexTaskTest(boolean isIncrementalHandoffSupported)
);
}

Please remove this.
@@ -476,7 +478,7 @@ public void testIncrementalHandOff() throws Exception
}
final String baseSequenceName = "sequence0";
// as soon as any segment has more than one record, incremental publishing should happen
maxRowsPerSegment = 1;
maxRowsPerSegment = 2;
@clintropolis would you tell me why this change is needed?
This PR also matches the behavior of #6125, which changes a `>` to a `>=` and whose logic was moved here in this PR. So, to not modify the test a ton, I upped the count (since with the new behavior the test scenario would be pushing every row).
I updated the main description of the PR to reflect this; thanks for the reminder, I forgot 👍
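In other words (a hypothetical shape of the check, not the exact code):

// After #6125, '>' became '>=':
boolean shouldPublish = rowsInSegment >= maxRowsPerSegment;
// With maxRowsPerSegment = 1 this is true after every row, so the test would
// publish per-row; maxRowsPerSegment = 2 keeps multi-row segments in play.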
@@ -40,6 +40,7 @@
private final int maxRowsInMemory;
private final long maxBytesInMemory;
private final int maxRowsPerSegment;
private final Long maxTotalRows;
Please add `@Nullable`.
@JsonProperty
@Override
public Long getMaxTotalRows()
`@Nullable`.
private final long maxBytesInMemory;
private final int maxRowsPerSegment;
private final Long maxTotalRows;
`@Nullable`.
@JsonProperty
public int getMaxRowsPerSegment()
{
  return maxRowsPerSegment;
}

@Override
@JsonProperty
public Long getMaxTotalRows()
`@Nullable`.
@@ -1485,6 +1485,7 @@ public long getMaxBytesInMemory()
}

@JsonProperty
@Override
Please add `@Nullable` here too.
Done (but the changed line is below this comment, so GitHub isn't squashing this one in the UI).
sink.finishWriting();
if (sink.finishWriting()) {
  // Decrement this sink's rows from rowsCurrentlyInMemory (we only count active sinks).
  rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory());
Hmm, is this valid? It looks like this would not be called if this is executed first.
@jihoonson do you have any additional comments? I believe I've addressed all existing review comments.
LGTM. @clintropolis thanks!
(force-pushed from bcfd2d9 to e1af7cb)
… kafka index task and appenderator based realtime indexing task, as available in IndexTask
(force-pushed from c1d0c52 to 036f510)
Resolves #5898 by adding `getMaxTotalRows` and `getMaxRowsPerSegment` to `AppenderatorConfig`, extending the model used by `IndexTask` to `IncrementalPublishingKafkaIndexTaskRunner` and `AppenderatorDriverRealtimeTask`. Additionally, this tweaks `maxRowsPerSegment` behavior of Kafka indexing to match the appenderator based realtime indexing change in #6125.